Spring Boot – Asynchronous Implementation
This article shows how asynchronous behavior can be achieved in Spring Boot. But first of all, let’s see the difference between synchronous and asynchronous programming.
- Synchronous Programming: In synchronous programming, tasks are performed one at a time, and the next task is unblocked only when the current one has completed.
- Asynchronous Programming: In asynchronous programming, multiple tasks can be executed simultaneously. You can move on to another task before the previous one finishes.
In Spring Boot, we can achieve asynchronous behavior using the @EnableAsync and @Async annotations.
- @EnableAsync - enables async support; it is placed on the main application class.
- @Async - marks the method that should run asynchronously.
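When you annotate a method with @Async, Spring creates a proxy for that bean; the “proxyTargetClass” property decides whether the proxy is class-based or interface-based. Because the call has to pass through the proxy, the method must be invoked from another bean: calling it from inside the same class bypasses the proxy and runs synchronously.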
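When Spring executes this method, it looks for an associated thread pool definition: either a unique Spring TaskExecutor bean in the context, or an Executor bean named “taskExecutor”. If neither of the two is resolvable, it falls back to Spring's SimpleAsyncTaskExecutor, which starts a new thread for each task instead of reusing pooled threads. In a Spring Boot application an auto-configured ThreadPoolTaskExecutor is normally available, which is why the async threads in the output further down are named task-1 and task-2.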
Example – Without Async
@RestController
public class DecomController {

    @Autowired
    DecomService service;

    @GetMapping("/decomTrial")
    public String decomClinicalTrial() {
        System.out.println(" *** decomClinicalTrial :: Started*** ");
        service.ArchiveUsers();
        service.ArchiveReports();
        System.out.println(" *** decomClinicalTrial :: Completed.*** ");
        return "decomClinicalTrial Completed.";
    }
}
@Service
public class DecomService {

    public void ArchiveUsers() {
        System.out.println(Thread.currentThread().getName() + " :ArchiveUsers :Start " + new Date());
        try {
            Thread.sleep(5000); // simulate five seconds of work
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println(Thread.currentThread().getName() + " :ArchiveUsers :End " + new Date());
    }

    public void ArchiveReports() {
        System.out.println(Thread.currentThread().getName() + ":ArchiveReports : Start" + new Date());
        try {
            Thread.sleep(5000); // simulate five seconds of work
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println(Thread.currentThread().getName() + " :ArchiveReports : End" + new Date());
    }
}
@SpringBootApplication
public class TrialServicesApplication {

    public static void main(String[] args) {
        SpringApplication.run(TrialServicesApplication.class, args);
        System.out.println("*************************************");
        System.out.println("Trial Services Started ... ");
        System.out.println("*************************************");
    }
}
*** decomClinicalTrial: Started***
http-nio-8080-exec-1:ArchiveUsers :Start 24 Sep 2021 07:52:32 GMT
http-nio-8080-exec-1:ArchiveUsers :End 24 Sep 2021 07:52:37 GMT
http-nio-8080-exec-1:ArchiveReports :Start24 Sep 2021 07:52:37 GMT
http-nio-8080-exec-1:ArchiveReports :End24 Sep 2021 07:52:42 GMT
*** decomClinicalTrial: Completed.***
Here all methods are executed one after another by a single thread, http-nio-8080-exec-1, so the request takes about ten seconds.
With @Async
Place @EnableAsync on top of the Spring Boot main class, TrialServicesApplication.
@EnableAsync
@SpringBootApplication
public class TrialServicesApplication {

    public static void main(String[] args) {
        SpringApplication.run(TrialServicesApplication.class, args);
        System.out.println("*************************************");
        System.out.println("Trial Services Started ... ");
        System.out.println("*************************************");
    }
}
Place @Async on top of each method that we want to execute in parallel.
@Service
public class DecomService {

    @Async
    public void ArchiveUsers() {
        System.out.println(Thread.currentThread().getName() + " :ArchiveUsers :Start " + new Date());
        try {
            Thread.sleep(5000); // simulate five seconds of work
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println(Thread.currentThread().getName() + " :ArchiveUsers :End " + new Date());
    }

    @Async
    public void ArchiveReports() {
        System.out.println(Thread.currentThread().getName() + ":ArchiveReports : Start" + new Date());
        try {
            Thread.sleep(5000); // simulate five seconds of work
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println(Thread.currentThread().getName() + " :ArchiveReports : End" + new Date());
    }
}
*** decomClinicalTrial :: Started***
*** decomClinicalTrial :: Completed.***
task-1 :ArchiveUsers : Start 24 Sep 2021 08:01:20 GMT
task-2 :ArchiveReports : Start24 Sep 2021 08:01:20 GMT
task-2 :ArchiveReports : End24 Sep 2021 08:01:25 GMT
task-1 :ArchiveUsers :End 24 Sep 2021 08:01:25 GMT
In the output above, ArchiveUsers and ArchiveReports are executed in parallel by two different threads, task-1 and task-2, and the controller returns without waiting for either of them.
Async methods with Return Types
So far, the async methods have only used a void return type, and in my opinion that is the best way to write self-sufficient asynchronous functions. However, if you want to handle the result of an @Async method, you are in luck: Spring supports this out of the box through the Future type. The method wraps its result in an AsyncResult and returns it. Newer Spring versions deprecate AsyncResult in favor of returning a CompletableFuture directly.
Asynchronous with Future
For example:
- I have a list of 100 User objects.
- I have a FixedThreadPool of 10 threads.
- I want to submit these 100 objects to the 10 threads to save them in the database; each save returns the same User object with the userId that was stored in the DB.
- I will collect each saved User object in a Future.
Expectation
I thought the 10 threads would process the 100 User objects in parallel.
Here is the problem
Future.get() is a blocking method: it blocks the current thread, and the next iteration of the loop does not start until this future (the one on which get() is called) returns. This happens in every iteration, which makes the code non-parallel.
class User {
    int uid;        // DB primary key, generated by the database
    String name;
    String address;
}

public List<User> saveUsers(List<User> users) throws InterruptedException, ExecutionException {
    List<User> savedUsers = new ArrayList<>();
    ExecutorService executorService = Executors.newFixedThreadPool(10);
    for (User user : users) {
        Callable<User> task = () -> userservice.saveAndReturn(user);
        Future<User> future = executorService.submit(task);
        User saved = future.get(); // blocking: the next task is not submitted until this one finishes
        savedUsers.add(saved);
    }
    executorService.shutdown();
    return savedUsers;
}
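One way to keep plain Future and still get parallelism is to submit every task first and call get() only afterwards. The sketch below is a minimal illustration of that idea, not the original service: saveAndReturn here is a hypothetical stand-in that sleeps for 200 ms and returns a made-up id.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

class SubmitThenCollect {

    // Hypothetical stand-in for userservice.saveAndReturn(user):
    // pretends to hit the database and returns a generated id.
    static int saveAndReturn(int userNumber) {
        try {
            Thread.sleep(200);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return 1000 + userNumber;
    }

    static List<Integer> saveAll(int userCount) {
        ExecutorService pool = Executors.newFixedThreadPool(10);
        try {
            // Phase 1: submit everything; submit() returns immediately.
            List<Future<Integer>> futures = new ArrayList<>();
            for (int i = 0; i < userCount; i++) {
                final int userNumber = i;
                Callable<Integer> task = () -> saveAndReturn(userNumber);
                futures.add(pool.submit(task));
            }
            // Phase 2: collect. get() still blocks, but all tasks are already queued or running.
            List<Integer> ids = new ArrayList<>();
            for (Future<Integer> future : futures) {
                try {
                    ids.add(future.get());
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    throw new IllegalStateException(e);
                } catch (ExecutionException e) {
                    throw new IllegalStateException(e.getCause());
                }
            }
            return ids;
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        long start = System.currentTimeMillis();
        List<Integer> ids = saveAll(20);
        System.out.println(ids.size() + " users saved in "
                + (System.currentTimeMillis() - start) + " ms");
    }
}
```

With 10 threads and 20 tasks the tasks run in two batches instead of twenty sequential steps; the results come back in submission order because the futures are read in that order.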
Someone on Stack Overflow asked almost the same question:
The reason I ask this is because in the below code the main thread would submit a task to be executed in a different thread. After submitting the task in the queue, it blocks on Future.get() method for the task to return the value. I would rather have the task executed in the main thread rather than submitting to a different thread and waiting for the results. What is that I gained by executing the task in a new thread?
I am aware that you could wait for a limited time etc, but then what if I really care about the result? The problem gets worse if there are multiple tasks to be executed. It seems to me that we are just doing the work synchronously. I am aware of the Guava library which provides a non blocking listener interface
We have a few solutions
The Future API itself
Future offers the method isDone(), which does not block and returns true if the computation has completed, false otherwise. Future.get() is used to retrieve the result of the computation.
You have a couple of options:
- call isDone() and, if the result is ready, ask for it by invoking get(); notice how there is no blocking
- block indefinitely with get()
- block for a specified timeout with get(long timeout, TimeUnit unit)
The whole Future API is there to provide an easy way of obtaining values from threads executing parallel tasks. This can be done synchronously or asynchronously if you prefer, as described in the bullets above.
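The options listed above can be sketched as follows. This is only an illustration: the class and method names are made up, and the sleeping tasks stand in for real work.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

class FuturePolling {

    // Polls isDone() and calls get() only once the result is ready.
    static String pollUntilDone() {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        try {
            Callable<String> slowTask = () -> {
                Thread.sleep(300);
                return "done";
            };
            Future<String> future = pool.submit(slowTask);
            while (!future.isDone()) {
                Thread.sleep(20); // the caller is free to do other work here
            }
            return future.get(); // result is ready, so this returns at once
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            pool.shutdownNow();
        }
    }

    // Waits at most 50 ms for a task that needs 500 ms.
    static boolean timesOut() {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        try {
            Callable<String> slowTask = () -> {
                Thread.sleep(500);
                return "late";
            };
            Future<String> future = pool.submit(slowTask);
            try {
                future.get(50, TimeUnit.MILLISECONDS);
                return false;
            } catch (TimeoutException e) {
                future.cancel(true); // give up and interrupt the task
                return true;
            }
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            pool.shutdownNow();
        }
    }

    public static void main(String[] args) {
        System.out.println("polled result: " + pollUntilDone());
        System.out.println("timed get gave up: " + timesOut());
    }
}
```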
You can try the following:
- If you really want to block until you get the result, use the blocking get().
- If you can wait for a specific period instead of blocking indefinitely, use get() with a time-out.
- If you can continue without analysing the result immediately and inspect it at a later time, use CompletableFuture (Java 8): “A Future that may be explicitly completed (setting its value and status), and may be used as a CompletionStage, supporting dependent functions and actions that trigger upon its completion.”
- You can implement a callback mechanism from your Runnable/Callable. Have a look at this SE question: “Java executors: how to be notified, without blocking, when a task completes?”
CompletableFuture
In Java 8 you can use CompletableFuture. Here’s an example I had in my code where I’m using it to fetch users from my user service, map them to my view objects and then update my view or show an error dialog :
CompletableFuture.supplyAsync(
userService::listUsers
).thenApply(
this::mapUsersToUserViews
).thenAccept(
this::updateView
).exceptionally(
throwable -> { showErrorDialogFor(throwable); return null; }
);
It executes asynchronously. I’m using two private methods: mapUsersToUserViews and updateView.
Order processing problem with Future
Java has a Future interface for this. A Future represents the result of an asynchronous operation. Long running methods, external service calls and database hits are the good candidates for asynchronous operations.
The result is read with get(), which is a blocking call: it waits, if necessary, for the computation to complete and then retrieves the result.
We can also set a time limit on how long the asynchronous task may take by passing a timeout and a TimeUnit to get(); this overload throws TimeoutException in addition to InterruptedException and ExecutionException.
The Future interface also has isDone(), which checks whether the asynchronous operation has completed; isCancelled(), which checks whether the task was cancelled; and cancel(), which attempts to cancel the task and fails if the task has already completed, has already been cancelled, or could not be cancelled for some other reason.
If we have a list of objects, the code blocks at get() for the first object and waits until it completes.
Let’s say we have an Order object that passes through multiple methods before we get the final Order object with all its details.
If we have 100 Order objects and each Order has to go through 5 stages, a Future-based implementation blocks at every get() call, and the processing is not asynchronous any more.
Limitations of Future interface
- Futures cannot be completed manually.
- There is no built-in way to run multiple futures in parallel and then combine their results.
- There are no exception handling constructs for Future.
- Future doesn’t have a mechanism to create multiple stages of processing that can be chained together. It needs to be done manually.
- Future doesn’t have a mechanism to notify you when a task completes.
Fortunately, CompletableFuture, released with Java 8, addresses all of the above problems and provides a much better asynchronous programming approach in Java.
CompletableFuture
While Future Interface was a great improvement over Threads when it came to asynchronous programming, it still had several limitations most notable of which was the blocking call.
When CompletableFuture was introduced, it overcame some of Future Interface’s shortcomings and resolved several of its limitations. Namely:
- Non-blocking
- Ability to programmatically complete a future
- Error handling
- Ability to chain several futures
- Ability to combine results of multiple futures (that run in parallel)
If we submit 100 Orders to a CompletableFuture-based order process, it works like this:
- One thread picks up the first Order, runs the first stage asynchronously, and passes the result on to the next stage, and so on.
- In parallel, other threads process the other Orders, so all orders are executed in parallel without blocking.
The only difference with the Async variants (for example thenApplyAsync) is that we can pass another ExecutorService, so that particular method call is processed by a thread of that ExecutorService.
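The order flow described above might look roughly like the following sketch. The stage methods enrich and pay are hypothetical placeholders for real processing steps, and orders are represented as plain strings to keep the example short.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

class OrderPipeline {

    // Hypothetical processing stages; real ones would call services or a database.
    static String enrich(String order) {
        return order + ":enriched";
    }

    static String pay(String order) {
        return order + ":paid";
    }

    static List<String> processAll(int count) {
        ExecutorService pool = Executors.newFixedThreadPool(10);
        try {
            // Build one pipeline per order; nothing blocks while the pipelines are created.
            List<CompletableFuture<String>> pipelines = new ArrayList<>();
            for (int i = 1; i <= count; i++) {
                String order = "order-" + i;
                pipelines.add(CompletableFuture
                        .supplyAsync(() -> enrich(order), pool)     // stage 1
                        .thenApplyAsync(OrderPipeline::pay, pool)); // stage 2, also on the pool
            }
            // Wait once, at the very end, for every pipeline to finish.
            List<String> results = new ArrayList<>();
            for (CompletableFuture<String> pipeline : pipelines) {
                results.add(pipeline.join());
            }
            return results;
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        List<String> results = processAll(100);
        System.out.println(results.size() + " orders processed, first: " + results.get(0));
    }
}
```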
CompletableFuture has over 50 different methods for composing, combining, and executing asynchronous computation steps and handling errors.
First, the CompletableFuture class implements the Future interface so that we can use it as a Future implementation but with additional completion logic.
Here we will go through some of the most common methods
Creating a CompletableFuture
1. The trivial example -
You can create a CompletableFuture simply by using the following no-arg constructor -
CompletableFuture<String> completableFuture = new CompletableFuture<String>();
This is the simplest CompletableFuture that you can have. All the clients who want to get the result of this CompletableFuture can call the CompletableFuture.get() method -
String result = completableFuture.get();
The get() method blocks until the Future is complete. So, the above call will block forever because the Future is never completed.
You can use the CompletableFuture.complete() method to manually complete a Future -
completableFuture.complete("Future's Result");
All the clients waiting for this Future will get the specified result, and subsequent calls to completableFuture.complete() will be ignored.
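To see manual completion end to end, here is a small sketch in which a second thread completes the future after a short delay. The class name and the 100 ms delay are arbitrary choices for the illustration.

```java
import java.util.concurrent.CompletableFuture;

class ManualCompletion {

    static String demo() {
        CompletableFuture<String> future = new CompletableFuture<>();

        // Another thread completes the future after 100 ms.
        Thread producer = new Thread(() -> {
            try {
                Thread.sleep(100);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            future.complete("Future's Result");
        });
        producer.start();

        String result = future.join();                     // blocks until complete() is called
        boolean secondAccepted = future.complete("Other"); // already completed, so this returns false
        return result + " / second complete accepted: " + secondAccepted;
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```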
2. Running asynchronous computation using runAsync() -
If you want to run some background task asynchronously and don’t want to return anything from the task, then you can use the CompletableFuture.runAsync() method. It takes a Runnable object and returns CompletableFuture<Void>.
// Run a task specified by a Runnable Object asynchronously.
CompletableFuture<Void> future = CompletableFuture.runAsync(new Runnable() {
@Override
public void run() {
// Simulate a long-running Job
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
throw new IllegalStateException(e);
}
System.out.println("I'll run in a separate thread than the main thread.");
}
});
// Block and wait for the future to complete
future.get();
You can also pass the Runnable object in the form of a lambda expression -
// Using Lambda Expression
CompletableFuture<Void> future = CompletableFuture.runAsync(() -> {
// Simulate a long-running Job
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
throw new IllegalStateException(e);
}
System.out.println("I'll run in a separate thread than the main thread.");
});
In this post, I’ll use lambda expressions very frequently, and you should use it too if you’re not already using it in your Java code.
3. Run a task asynchronously and return the result using supplyAsync() -
CompletableFuture.runAsync() is useful for tasks that don’t return anything. But what if you want to return some result from your background task?
Well, CompletableFuture.supplyAsync() is your companion. It takes a Supplier<T> and returns CompletableFuture<T>, where T is the type of the value obtained by calling the given supplier -
// Run a task specified by a Supplier object asynchronously
CompletableFuture<String> future = CompletableFuture.supplyAsync(new Supplier<String>() {
@Override
public String get() {
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
throw new IllegalStateException(e);
}
return "Result of the asynchronous computation";
}
});
// Block and get the result of the Future
String result = future.get();
System.out.println(result);
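A Supplier<T> is a simple functional interface that represents a supplier of results. It has a single get() method in which you can write your background task.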
Once again, you can use Java 8’s lambda expression to make the above code more concise -
// Using Lambda Expression
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
throw new IllegalStateException(e);
}
return "Result of the asynchronous computation";
});
A note about Executor and Thread Pool -
You might be wondering: I know that the runAsync() and supplyAsync() methods execute their tasks in a separate thread, but we never created a thread, right?
Yes! CompletableFuture executes these tasks in a thread obtained from the global ForkJoinPool.commonPool().
But you can also create a thread pool and pass it to the runAsync() and supplyAsync() methods to let them execute their tasks in a thread obtained from your thread pool.
The asynchronous methods in the CompletableFuture API have two variants: one which accepts an Executor as an argument and one which doesn’t -
// Variations of runAsync() and supplyAsync() methods
static CompletableFuture<Void> runAsync(Runnable runnable)
static CompletableFuture<Void> runAsync(Runnable runnable, Executor executor)
static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier)
static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier, Executor executor)
Here’s how you can create a thread pool and pass it to one of these methods -
Executor executor = Executors.newFixedThreadPool(10);
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
    try {
        TimeUnit.SECONDS.sleep(1);
    } catch (InterruptedException e) {
        throw new IllegalStateException(e);
    }
    return "Result of the asynchronous computation";
}, executor);
Transforming and acting on a CompletableFuture
The CompletableFuture.get() method is blocking. It waits until the Future is completed and returns the result after its completion.
But, that’s not what we want right? For building asynchronous systems we should be able to attach a callback to the CompletableFuture which should automatically get called when the Future completes.
That way, we won’t need to wait for the result, and we can write the logic that needs to be executed after the completion of the Future inside our callback function.
You can attach a callback to the CompletableFuture using the thenApply(), thenAccept() and thenRun() methods -
1. thenApply()
You can use the thenApply() method to process and transform the result of a CompletableFuture when it arrives. It takes a Function<T,R> as an argument. Function<T,R> is a simple functional interface representing a function that accepts an argument of type T and produces a result of type R -
// Create a CompletableFuture
CompletableFuture<String> whatsYourNameFuture = CompletableFuture.supplyAsync(() -> {
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
throw new IllegalStateException(e);
}
return "Rajeev";
});
// Attach a callback to the Future using thenApply()
CompletableFuture<String> greetingFuture = whatsYourNameFuture.thenApply(name -> {
return "Hello " + name;
});
// Block and get the result of the future.
System.out.println(greetingFuture.get()); // Hello Rajeev
You can also write a sequence of transformations on the CompletableFuture by attaching a series of thenApply() callback methods. The result of one thenApply() method is passed to the next in the series -
CompletableFuture<String> welcomeText = CompletableFuture.supplyAsync(() -> {
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
throw new IllegalStateException(e);
}
return "Rajeev";
}).thenApply(name -> {
return "Hello " + name;
}).thenApply(greeting -> {
return greeting + ", Welcome to the CalliCoder Blog";
});
System.out.println(welcomeText.get());
// Prints - Hello Rajeev, Welcome to the CalliCoder Blog
2. thenAccept() and thenRun()
If you don’t want to return anything from your callback function and just want to run some piece of code after the completion of the Future, then you can use the thenAccept() and thenRun() methods. These methods are consumers and are often used as the last callback in the callback chain.
CompletableFuture.thenAccept() takes a Consumer<T> and returns CompletableFuture<Void>. It has access to the result of the CompletableFuture on which it is attached.
// thenAccept() example
CompletableFuture.supplyAsync(() -> {
return ProductService.getProductDetail(productId);
}).thenAccept(product -> {
    System.out.println("Got product detail from remote service " + product.getName());
});
While thenAccept() has access to the result of the CompletableFuture on which it is attached, thenRun() doesn’t even have access to the Future’s result. It takes a Runnable and returns CompletableFuture<Void> -
// thenRun() example
CompletableFuture.supplyAsync(() -> {
    // Run some computation
    return "Some result"; // a Supplier has to return a value
}).thenRun(() -> {
    // Computation finished.
});
A note about async callback methods -
All the callback methods provided by CompletableFuture have two async variants -
// thenApply() variants
<U> CompletableFuture<U> thenApply(Function<? super T,? extends U> fn)
<U> CompletableFuture<U> thenApplyAsync(Function<? super T,? extends U> fn)
<U> CompletableFuture<U> thenApplyAsync(Function<? super T,? extends U> fn, Executor executor)
These async callback variations help you further parallelize your computations by executing the callback tasks in a separate thread.
Consider the following example -
CompletableFuture.supplyAsync(() -> {
    try {
        TimeUnit.SECONDS.sleep(1);
    } catch (InterruptedException e) {
        throw new IllegalStateException(e);
    }
    return "Some Result";
}).thenApply(result -> {
    /*
      Executed in the same thread where the supplyAsync() task is executed,
      or in the main thread if the supplyAsync() task completes immediately
      (remove the sleep() call to verify)
    */
    return "Processed Result";
});
In the above case, the task inside thenApply() is executed in the same thread where the supplyAsync() task is executed, or in the main thread if the supplyAsync() task completes immediately (try removing the sleep() call to verify).
To have more control over the thread that executes the callback task, you can use async callbacks. If you use the thenApplyAsync() callback, then it will be executed in a thread obtained from ForkJoinPool.commonPool() -
CompletableFuture.supplyAsync(() -> {
    return "Some Result";
}).thenApplyAsync(result -> {
    // Executed in a thread from ForkJoinPool.commonPool()
    return "Processed Result";
});
Moreover, if you pass an Executor to the thenApplyAsync() callback, then the task will be executed in a thread obtained from the Executor’s thread pool.
Executor executor = Executors.newFixedThreadPool(2);
CompletableFuture.supplyAsync(() -> {
    return "Some result";
}).thenApplyAsync(result -> {
    // Executed in a thread obtained from the executor
    return "Processed Result";
}, executor);
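A quick way to check which thread runs a callback is to return the thread name from it. The sketch below assumes a pool whose threads are all named callback-pool (an arbitrary name chosen for the demo), so the result shows that the executor passed to thenApplyAsync() really runs the callback.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

class CallbackThreads {

    static String callbackThreadName() {
        // Every thread of this pool gets the same recognisable name.
        ExecutorService executor = Executors.newFixedThreadPool(2, runnable -> {
            Thread thread = new Thread(runnable, "callback-pool");
            thread.setDaemon(true);
            return thread;
        });
        try {
            return CompletableFuture
                    .supplyAsync(() -> "Some result") // runs on ForkJoinPool.commonPool()
                    .thenApplyAsync(result -> Thread.currentThread().getName(), executor)
                    .join();
        } finally {
            executor.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println("callback ran on: " + callbackThreadName());
    }
}
```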
Combining two CompletableFutures together
1. Combine two dependent futures using thenCompose() -
Let’s say that you want to fetch the details of a user from a remote API service and once the user’s detail is available, you want to fetch his Credit rating from another service.
Consider the following implementations of the getUserDetail() and getCreditRating() methods -
CompletableFuture<User> getUserDetail(String userId) {
return CompletableFuture.supplyAsync(() -> {
return UserService.getUserDetails(userId);
});
}
CompletableFuture<Double> getCreditRating(User user) {
return CompletableFuture.supplyAsync(() -> {
return CreditRatingService.getCreditRating(user);
});
}
Now, let’s understand what will happen if we use thenApply() to achieve the desired result -
CompletableFuture<CompletableFuture<Double>> result = getUserDetail(userId)
    .thenApply(user -> getCreditRating(user));
In earlier examples, the function passed to the thenApply() callback returned a simple value, but in this case it returns a CompletableFuture. Therefore, the final result in the above case is a nested CompletableFuture.
If you want the final result to be a top-level Future, use the thenCompose() method instead -
CompletableFuture<Double> result = getUserDetail(userId)
    .thenCompose(user -> getCreditRating(user));
So, rule of thumb here - if your callback function returns a CompletableFuture, and you want a flattened result from the CompletableFuture chain (which in most cases you would), then use thenCompose().
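For a version you can run without any remote service, here is a sketch with stub implementations. The user is reduced to a String and the credit rating is computed from the string length, both purely as placeholders.

```java
import java.util.concurrent.CompletableFuture;

class ComposeDemo {

    // Stub: a real implementation would call a user service.
    static CompletableFuture<String> getUserDetail(String userId) {
        return CompletableFuture.supplyAsync(() -> "user:" + userId);
    }

    // Stub: a real implementation would call a credit rating service.
    static CompletableFuture<Double> getCreditRating(String user) {
        return CompletableFuture.supplyAsync(() -> user.length() * 10.0);
    }

    static double ratingWithCompose(String userId) {
        CompletableFuture<Double> flat =
                getUserDetail(userId).thenCompose(ComposeDemo::getCreditRating);
        return flat.join();
    }

    static double ratingWithApply(String userId) {
        // thenApply() wraps the inner future, so two join() calls are needed.
        CompletableFuture<CompletableFuture<Double>> nested =
                getUserDetail(userId).thenApply(ComposeDemo::getCreditRating);
        return nested.join().join();
    }

    public static void main(String[] args) {
        System.out.println("thenCompose: " + ratingWithCompose("42"));
        System.out.println("thenApply:   " + ratingWithApply("42"));
    }
}
```

Both methods produce the same number; the difference is only in the type you have to carry around.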
2. Combine two independent futures using thenCombine() -
While thenCompose() is used to combine two Futures where one future is dependent on the other, thenCombine() is used when you want two Futures to run independently and do something after both are complete.
System.out.println("Retrieving weight.");
CompletableFuture<Double> weightInKgFuture = CompletableFuture.supplyAsync(() -> {
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
throw new IllegalStateException(e);
}
return 65.0;
});
System.out.println("Retrieving height.");
CompletableFuture<Double> heightInCmFuture = CompletableFuture.supplyAsync(() -> {
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
throw new IllegalStateException(e);
}
return 177.8;
});
System.out.println("Calculating BMI.");
CompletableFuture<Double> combinedFuture = weightInKgFuture
.thenCombine(heightInCmFuture, (weightInKg, heightInCm) -> {
Double heightInMeter = heightInCm/100;
return weightInKg/(heightInMeter*heightInMeter);
});
System.out.println("Your BMI is - " + combinedFuture.get());
The callback function passed to thenCombine() will be called when both the Futures are complete.
Combining multiple CompletableFutures together
We used thenCompose() and thenCombine() to combine two CompletableFutures together. Now, what if you want to combine an arbitrary number of CompletableFutures? Well, you can use the following methods to combine any number of CompletableFutures -
static CompletableFuture<Void> allOf(CompletableFuture<?>... cfs)
static CompletableFuture<Object> anyOf(CompletableFuture<?>... cfs)
1. CompletableFuture.allOf()
CompletableFuture.allOf() is used in scenarios when you have a List of independent futures that you want to run in parallel and do something after all of them are complete.
Let’s say that you want to download the contents of 100 different web pages of a website. You can do this operation sequentially but this will take a lot of time. So, you have written a function which takes a web page link, and returns a CompletableFuture, i.e. It downloads the web page’s content asynchronously -
CompletableFuture<String> downloadWebPage(String pageLink) {
return CompletableFuture.supplyAsync(() -> {
// Code to download and return the web page's content
});
}
Now, when all the web pages are downloaded, you want to count the number of web pages that contain a keyword - ‘CompletableFuture’. Let’s use CompletableFuture.allOf() to achieve this -
List<String> webPageLinks = Arrays.asList(...); // A list of 100 web page links
// Download contents of all the web pages asynchronously
List<CompletableFuture<String>> pageContentFutures = webPageLinks.stream()
.map(webPageLink -> downloadWebPage(webPageLink))
.collect(Collectors.toList());
// Create a combined Future using allOf()
CompletableFuture<Void> allFutures = CompletableFuture.allOf(
pageContentFutures.toArray(new CompletableFuture[pageContentFutures.size()])
);
The problem with CompletableFuture.allOf() is that it returns CompletableFuture<Void>. But we can get the results of all the wrapped CompletableFutures by writing a few additional lines of code -
// When all the Futures are completed, call `future.join()` to get their results and collect the results in a list -
CompletableFuture<List<String>> allPageContentsFuture = allFutures.thenApply(v -> {
return pageContentFutures.stream()
.map(pageContentFuture -> pageContentFuture.join())
.collect(Collectors.toList());
});
Take a moment to understand the above code snippet. Since we’re calling future.join() when all the futures are complete, we’re not blocking anywhere :-)
The join() method is similar to get(). The only difference is that it throws an unchecked exception if the underlying CompletableFuture completes exceptionally.
Let’s now count the number of web pages that contain our keyword -
// Count the number of web pages having the "CompletableFuture" keyword.
CompletableFuture<Long> countFuture = allPageContentsFuture.thenApply(pageContents -> {
return pageContents.stream()
.filter(pageContent -> pageContent.contains("CompletableFuture"))
.count();
});
System.out.println("Number of Web Pages having CompletableFuture keyword - " +
countFuture.get());
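The snippets above can be put together into one runnable sketch. The download is faked here: downloadWebPage returns canned text based on the link instead of making an HTTP request, and the example.com links are placeholders.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;

class AllOfDemo {

    // Fake download: returns canned content instead of performing an HTTP request.
    static CompletableFuture<String> downloadWebPage(String pageLink) {
        return CompletableFuture.supplyAsync(() ->
                pageLink.endsWith("/async") ? "All about CompletableFuture" : "Nothing to see");
    }

    static long countPagesWithKeyword(List<String> webPageLinks) {
        List<CompletableFuture<String>> pageContentFutures = webPageLinks.stream()
                .map(AllOfDemo::downloadWebPage)
                .collect(Collectors.toList());

        CompletableFuture<Void> allFutures =
                CompletableFuture.allOf(pageContentFutures.toArray(new CompletableFuture[0]));

        // join() inside thenApply() does not wait: every future is already complete here.
        return allFutures.thenApply(v -> pageContentFutures.stream()
                        .map(CompletableFuture::join)
                        .filter(content -> content.contains("CompletableFuture"))
                        .count())
                .join();
    }

    public static void main(String[] args) {
        List<String> links = Arrays.asList(
                "https://example.com/async",
                "https://example.com/home",
                "https://example.com/docs/async");
        System.out.println("Pages with keyword: " + countPagesWithKeyword(links));
    }
}
```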
2. CompletableFuture.anyOf()
CompletableFuture.anyOf(), as the name suggests, returns a new CompletableFuture which is completed when any of the given CompletableFutures complete, with the same result.
Consider the following example -
CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> {
try {
TimeUnit.SECONDS.sleep(2);
} catch (InterruptedException e) {
throw new IllegalStateException(e);
}
return "Result of Future 1";
});
CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> {
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
throw new IllegalStateException(e);
}
return "Result of Future 2";
});
CompletableFuture<String> future3 = CompletableFuture.supplyAsync(() -> {
try {
TimeUnit.SECONDS.sleep(3);
} catch (InterruptedException e) {
throw new IllegalStateException(e);
}
return "Result of Future 3";
});
CompletableFuture<Object> anyOfFuture = CompletableFuture.anyOf(future1, future2, future3);
System.out.println(anyOfFuture.get()); // Result of Future 2
In the above example, the anyOfFuture is completed when any of the three CompletableFutures complete. Since future2 has the least amount of sleep time, it will complete first, and the final result will be - Result of Future 2.
CompletableFuture.anyOf() takes a varargs of Futures and returns CompletableFuture<Object>. The problem with CompletableFuture.anyOf() is that if you have CompletableFutures that return results of different types, then you won’t know the type of your final CompletableFuture.
CompletableFuture Exception Handling
We explored how to create CompletableFutures, transform them, and combine multiple CompletableFutures. Now let’s understand what to do when anything goes wrong.
Let’s first understand how errors are propagated in a callback chain. Consider the following CompletableFuture callback chain -
CompletableFuture.supplyAsync(() -> {
// Code which might throw an exception
return "Some result";
}).thenApply(result -> {
return "processed result";
}).thenApply(result -> {
return "result after further processing";
}).thenAccept(result -> {
// do something with the final result
});
If an error occurs in the original supplyAsync() task, then none of the thenApply() callbacks will be called and the future will be resolved with the exception that occurred. If an error occurs in the first thenApply() callback, then the 2nd and 3rd callbacks won’t be called and the future will be resolved with the exception that occurred, and so on.
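The propagation rule can be observed with a counter that each callback increments. In this sketch the supplier always throws, so the counter should stay at zero; the class name and the "boom" message are made up for the demo.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.atomic.AtomicInteger;

class ErrorPropagation {

    static String run() {
        AtomicInteger callbacksRun = new AtomicInteger();

        CompletableFuture<String> chain = CompletableFuture
                .<String>supplyAsync(() -> {
                    throw new IllegalStateException("boom");
                })
                .thenApply(result -> {
                    callbacksRun.incrementAndGet(); // skipped because the first stage failed
                    return result + " processed";
                })
                .thenApply(result -> {
                    callbacksRun.incrementAndGet(); // skipped as well
                    return result + " further";
                });

        try {
            chain.join();
            return "no error";
        } catch (CompletionException e) {
            // join() wraps the original exception; getCause() gives it back.
            return e.getCause().getMessage() + ", callbacks run: " + callbacksRun.get();
        }
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```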
1. Handle exceptions using exceptionally() callback
The exceptionally() callback gives you a chance to recover from errors generated from the original Future. You can log the exception here and return a default value.
Integer age = -1;
CompletableFuture<String> maturityFuture = CompletableFuture.supplyAsync(() -> {
if(age < 0) {
throw new IllegalArgumentException("Age can not be negative");
}
if(age > 18) {
return "Adult";
} else {
return "Child";
}
}).exceptionally(ex -> {
System.out.println("Oops! We have an exception - " + ex.getMessage());
return "Unknown!";
});
System.out.println("Maturity : " + maturityFuture.get());
Note that, the error will not be propagated further in the callback chain if you handle it once.
2. Handle exceptions using the generic handle() method
The API also provides a more generic method - handle() - to recover from exceptions. It is called whether or not an exception occurs.
Integer age = -1;
CompletableFuture<String> maturityFuture = CompletableFuture.supplyAsync(() -> {
if(age < 0) {
throw new IllegalArgumentException("Age can not be negative");
}
if(age > 18) {
return "Adult";
} else {
return "Child";
}
}).handle((res, ex) -> {
if(ex != null) {
System.out.println("Oops! We have an exception - " + ex.getMessage());
return "Unknown!";
}
return res;
});
System.out.println("Maturity : " + maturityFuture.get());
If an exception occurs, then the res argument will be null; otherwise, the ex argument will be null.
Customize No. of Threads Used by CompletableFuture
When you provide a task to CompletableFuture.supplyAsync(), it goes by default to the common ForkJoinPool, whose size follows the number of CPUs on your machine (its default parallelism is the number of available processors minus one).
- static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier): returns a new CompletableFuture that is asynchronously completed by a task running in the ForkJoinPool.commonPool() with the value obtained by calling the given Supplier.
- static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier, Executor executor): returns a new CompletableFuture that is asynchronously completed by a task running in the given executor with the value obtained by calling the given Supplier.
So the single-argument supplyAsync(Supplier<U> supplier) uses ForkJoinPool.commonPool() by default, and the number of worker threads depends on the number of cores available on the running machine.
We can set the maximum number of threads by passing an executor to the two-argument overload, supplyAsync(Supplier<U> supplier, Executor executor).
ForkJoinPool implements Executor. Therefore, you can write your code like this:
int threadCount = 3;
ForkJoinPool myPool = new ForkJoinPool(threadCount);
CompletableFuture cf = CompletableFuture.supplyAsync(mySup, myPool);
or
final int NUM_OF_THREADS = 10;
ExecutorService executor = Executors.newFixedThreadPool(
Math.min(tasks.size(), NUM_OF_THREADS));
List<CompletableFuture<Integer>> futures = tasks.stream()
.map(tmpTask -> {
return CompletableFuture.supplyAsync(() -> {
return tmpTask.calculate();
}, executor);
})
.collect(Collectors.toList());
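To confirm that the pool size really caps the number of threads, the following sketch records the names of the threads that ran the tasks. The pool size of 3 and the 12 short tasks are arbitrary values for the demonstration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ForkJoinPool;

class PoolSizeDemo {

    static int distinctThreadsUsed(int poolSize, int taskCount) {
        ExecutorService pool = Executors.newFixedThreadPool(poolSize);
        try {
            Set<String> threadNames = ConcurrentHashMap.newKeySet();
            List<CompletableFuture<Void>> futures = new ArrayList<>();
            for (int i = 0; i < taskCount; i++) {
                futures.add(CompletableFuture.runAsync(() -> {
                    threadNames.add(Thread.currentThread().getName());
                    try {
                        Thread.sleep(20); // keep the thread busy for a moment
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                }, pool));
            }
            CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).join();
            return threadNames.size();
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println("common pool parallelism: "
                + ForkJoinPool.commonPool().getParallelism());
        System.out.println("threads used by a pool of 3: " + distinctThreadsUsed(3, 12));
    }
}
```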
RealTime Example
ExecutorService executor = Executors.newFixedThreadPool(THREADPOOL_SIZE);
// taskStatusBo is the TaskStatus being tracked; it is assumed to exist before this point
CompletableFuture<TaskStatus> taskFuture = CompletableFuture.supplyAsync(() -> {
    try {
        // This is the async job
        taskService.doSomeTask(taskStatusBo, SomeObject);
    } catch (Exception e) {
        log.error(e.getMessage(), e);
        taskStatusBo.setStatus(STATUS_FAILED);
        taskStatusRepository.save(taskStatusBo);
    }
    return taskStatusBo;
}, executor)
.completeOnTimeout(taskStatusBo, TimeOut, TimeUnit.SECONDS) // Java 9+: completes with taskStatusBo if the job exceeds the timeout (30 sec)
.whenComplete((result, exception) -> {
    // Called when the job finishes or when the timeout fires, whichever happens first
    if (exception != null) {
        log.error(exception.getMessage(), exception);
        if (!(exception instanceof TimeoutException)) {
            if (null != taskStatusBo) {
                taskStatusBo.setStatus(STATUS_FAILED);
                taskStatusRepository.save(taskStatusBo);
            }
        }
    }
});