Intro
Imagine that you have to use external systems to retrieve data or you deal with communication in distributed architecture (communication between modules within application). In this article I would like to tackle one possible problem, but please note that this approach can be used in a many different scenarios.
Please note that I don’t want to provide another “CompletableFuture guide”. If you are interested in specific options and possibilities of CompletableFuture I recommend you great article by Tomasz Nurkiewicz.
Problem
We would like to write simple application that will provide us best possible exchange rate from public exchange rate systems (let’s assume that bigger exchange rate value is better as we want to sell). What we want is to convert from our “local” currency (in my example it will be PLN) to selected one (using simple rest call). Response should inform us only about highest rate.
So if I run my application on localhost:8080 and I would like to check lowest exchange rate for GBP then it can looks like:
1 2 3 4 5 6 7 8 9 |
╭─dpokusa@venture ~ ╰─➤ http :8080/GBP HTTP/1.1 200 OK Content-Type: application/json;charset=UTF-8 Date: Sat, 05 Nov 2016 17:20:25 GMT Transfer-Encoding: chunked X-Application-Context: application:8081 4.8631 |
(In example above I used httpie. I also recommend postman if you prefer more GUI-oriented tools :))
Another example (USD):
1 2 3 4 5 6 7 8 9 |
╭─dpokusa@venture ~ ╰─➤ http :8080/USD HTTP/1.1 200 OK Content-Type: application/json;charset=UTF-8 Date: Sat, 05 Nov 2016 17:25:12 GMT Transfer-Encoding: chunked X-Application-Context: application:8081 3.8933 |
In our example we will check 3 external services for exchange rates:
- NBP (Polish National Bank) – http://api.nbp.pl/en.html
- Fixer – http://fixer.io
- Yahoo Finance API
Concept
First of all as you can imagine we will make a few external calls to described above systems (every call will take some time). Best option is to call asynchronously to every exchange rate API and after receiving all rates just check which one is the highest and finally make a response.
We don’t want to make every request synchronously, because we will increase response time from our application.
Solution
Basic solution
Let’s start with some general interface for every API call:
1 2 3 |
interface ExchangeRate { BigDecimal retrieve(); } |
Then let’s provide some implementation for one of calls:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 |
class NBPExchangeRate implements ExchangeRate { private static final Logger log = LoggerFactory.getLogger(NBPExchangeRate.class); private static final String NBP_API_CURRENCY_RATE_PATH = "http://api.nbp.pl/api/exchangerates/rates/a/%s?format=json"; private final String url; private final String currency; NBPExchangeRate(String currency) { this.currency = currency; this.url = StringFormatter.format(NBP_API_CURRENCY_RATE_PATH, currency).getValue(); } @Override public BigDecimal retrieve() { RestTemplate restTemplate = new RestTemplate(); NBPApiResponse result = restTemplate.getForObject(url, NBPApiResponse.class); BigDecimal currentRate = result.getRates()[0].getMid(); log.info("Current rate for {} is {}", currency, currentRate); return currentRate; } private static class NBPApiResponse { private String table; private String currency; private String code; private Rates[] rates; // getters&setters here } private static class Rates { private String no; private BigDecimal mid; @JsonDeserialize(using = LocalDateDeserializer.class) private LocalDate effectiveDate; // getters&setters here } } |
As you can see there, it’s very simple standard code. I use Spring’s RestTemplate to prepare and make external call. What’s important here – so far we don’t use any CompletableFuture so we can test this kind of classes as usual.
What I want to show is that you can provide functionality without thinking about async invocations. You can add this after finishing basic implementation.
Lets go async
Let’s look what we can achieve by using CompletableFuture . This class provides you possibility of creating object that will do some operations “in background” (We will talk about executors which do this operations later in this article). This gives you possibility to create non blocking operations (such as external calls) for future usage. Once you want to use information from CompletableFuture it start blocking until Future is completed. However, if operation is already completed then it will not block thread at all.
This can help you in a couple of scenarios:
- You want to do some expensive operation and use it later. Then you can create CompletableFuture at first, make other required operations and then use completed Future . In case it’s not completed your app will just wait for it
- You want to do some expensive operations concurrently and then combine them in some way (this is what we want, and this is most popular scenario)
How do you create CompletableFuture ? The easiest way is to use static factory methods provided by this class. For example: CompletableFuture.supplyAsync(Supplier<u> supplier)</u> . Supplier class is a @FunctionalInterface so you can use lambdas 🙂
Lets try then to provide some Factory for creating our asynchronous calls. This is not the best option, however in this case it can be a good starting point:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 |
@Component public class ExchangeRateFactory { public CompletableFuture<BigDecimal> nbpRateFor(String currency) { return prepareFuture(new NBPExchangeRate(currency)); } public CompletableFuture<BigDecimal> fixerRateFor(String currency) { return prepareFuture(new FixerExchangeRate(currency)); } public CompletableFuture<BigDecimal> yahooRateFor(String currency) { return prepareFuture(new YahooFinanceExchangeRate(currency)); } private CompletableFuture<BigDecimal> prepareFuture(ExchangeRate exchangeRate) { return CompletableFuture .supplyAsync(() -> exchangeRate.retrieve()) .exceptionally(t -> BigDecimal.ZERO); } } |
I also added exceptionally here for each created CompletableFuture – this means that in case that some exception will stop completion of particular Future then provided method will return value as a result. Receiving Throwable t gives you possibility to adjust result to situation. In this simple example it will return 0.
Finally. we are ready to use it!
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 |
@RestController public class BestCurrencyController { private static final Logger log = LoggerFactory.getLogger(BestCurrencyController.class); @Autowired private ExchangeRateFactory exchangeRateFactory; @RequestMapping(value = "/{currency}", method = RequestMethod.GET) public BigDecimal bestCurrencyRate(@PathVariable String currency) { CompletableFuture<BigDecimal> nbpRate = exchangeRateFactory.nbpRateFor(currency); CompletableFuture<BigDecimal> fixerRate = exchangeRateFactory.fixerRateFor(currency); CompletableFuture<BigDecimal> yahooRate = exchangeRateFactory.yahooRateFor(currency); CompletableFuture<BigDecimal> best = nbpRate .thenCombineAsync(fixerRate, (nbp, fixer) -> selectBest(nbp, fixer)) .thenCombineAsync(yahooRate, (combined, yahoo) -> selectBest(combined, yahoo)); try { return best.get(1000, TimeUnit.MILLISECONDS); } catch (InterruptedException | ExecutionException | TimeoutException e) { log.error("Can't retrieve best currency rate ", e); return BigDecimal.ZERO; } } private BigDecimal selectBest(BigDecimal first, BigDecimal second) { log.info("selecting best between " + first + " and " + second); return first.max(second); } } |
Completable Futures are being created in lines 12-14, then (lines 16-19) we take nbpRate as our basic value, combine result with fixerRate and select better option (higher), then again we combine this result with yahooRate . This will create another CompletableFuture – let’s call it best.
Finally, as we don’t have anything more to do we use get() method on our Future . This method as I wrote before is blocking- so we will wait until our Futures will complete and we will have our solution. Then we will just return it as a result.
Now you can see why I said that previous fabric its not our best option. If you have to add new service (and its very likely in our problem) then you will have to add here new lines and implementation which breaks Open/Close principle. I use this approach for sake of simplicity of explanation.
Executors
You can run CompletableFuture by using default Executor (as in all examples in this article) what can be fine for simple solutions. But for real world complex requirements this probably will be not effective and can lead to serious problems. If you want to learn more about default executors, Oleg Schelajev did great article about choosing right executor for your tasks.
However, if you are using spring you probably should provide TaskExecutor which has similar interface as java.util.concurrent.Executor , but Spring provides a few nice and configurable implementations of it.
Take an example:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 |
@Configuration public class ExecutorConfiguration { private static int CORE_POOL_SIZE = 2; private static int MAX_POOL_SIZE = 10; @Bean public TaskExecutor exchangeRateTaskExecutor() { ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor(); ThreadFactory threadFactory = new CustomizableThreadFactory("ExchangeRateTaskExecutor-"); executor.setThreadFacstory(threadFactory); executor.setCorePoolSize(CORE_POOL_SIZE); executor.setMaxPoolSize(MAX_POOL_SIZE); TaskExecutor taskExecutor = new ConcurrentTaskExecutor(executor); executor.initialize(); return taskExecutor; } } |
This configuration provides ThreadPoolTaskExecutor which uses CustomizableThreadFactory . One can set max pool size (max number of threads that will be created), core pool size (base number of thread that should be in use) and of course more options.
Don’t use default executor. If you use spring, just go with your own configured TaskExecutor . If you use JEE or other frameworks pick up one of available executors in java.util.concurrent package. Concurrent programming is very challenging so if you can keep something under control then definitely you should do it 🙂
To use particular executor just use second implementation of supplyAsync method:
1 2 3 |
CompletableFuture .supplyAsync(() -> exchangeRate.retrieve(), executor) .exceptionally(t -> BigDecimal.ZERO); |
Afterwords
There are more problems around this topic, for example, you need to figure out how to deal with situation when external service is down. We also need to consider how to test our solution and how to simplify future maintenance. There is always a need of providing a good balance- we should always provide sufficient, smallest and simplest possible solution- it’s not easy task to do though 🙂
Most important criteria that should be considered before providing final solution:
- Reliability
- Resilience
- Maintainability
- Testability
- Performance