La classe CompletableFuture fait partie des nouveautés de Java 8, dans le package java.util.concurrent. Dans un premier temps, je n’y ai pas beaucoup prêté attention, parce que j’avais assez à faire pour prendre en main les lambdas et l’API Stream, et aussi parce que je trouvais sa javadoc assez confuse.

Récemment, je suis tombé sur NoBlogDefFound où Tomasz Nurkiewicz explique plein de choses sur les classes de concurrence de Java SE. Plusieurs articles traitent de CompletableFuture et ça m’a bien éclairé sur le sujet. Je vais essayer de résumer ce que j’en ai compris et ce qui m’a plu.

Future

Tout d’abord, CompletableFuture implémente Future, ce qui signifie qu’on peut l’utiliser pour lancer une tâche et récupérer un résultat plus tard avec la méthode get().

CompletableFuture vide

Evidemment, pour que ça ait un intérêt, CompletableFuture est un future avec quelques petites choses en plus… comme par exemple d’être completable, c’est à dire qu’on peut le démarrer sans lui associer de tâche et qu’on peut le terminer explicitement.

CompletableFuture<Integer> future = new CompletableFuture<>()
future.complete(42)

On peut utiliser CompletableFuture à la place de CountdownLatch : plusieurs threads se mettent en attente en appelant future.get() et on les libère en appelant future.complete(). En bonus par rapport à CountdownLatch, on passe un objet en paramètre qui sera récupéré par les threads.

CompletableFuture<Integer> future = new CompletableFuture<>()
launchThreads(future);
future.complete(42);
System.out.println("Done with value " + future.getNow(0))<;

La méthode launchThreads démarre des threads et appelle future.get().

new Thread(() -> {
  try {
    Integer value = future.get();
    System.out.println("Thread " + threadNumber + " : " + value);
  } catch (Exception e) {
    ...
  }
}).start();

On peut aussi envoyer une exception.

future.completeExceptionally(new RuntimeException("Arghh"));

Elle sera récupérée dans une ExecutionException :

try {
    Integer value = future.get();
    System.out.println("Thread " + threadNumber + " : " + value);
} catch (ExecutionException e) {
    ...
}

L’appel de get peut être remplacé par join. Dans ce cas, on ne doit pas gérer des exceptions checked et ExecutionException est remplacée par CompletionException, qui est une RuntimeException.

new Thread(() -> {
    Integer value = future.join();
    System.out.println("Thread " + threadNumber + " : " + value);
}).start();

Bref, c’est comme un Future, mais avec des méthodes complete, et une méthode join plus pratique que get.

CompletableFuture, Runnable et lambda

On peut aussi créer un CompletableFuture à partir d’un Runnable, comme on le faisait déjà avec FutureTask, sauf que, comme les constructeurs sont moins à la mode, on utilise une méthode de fabrique runAsync(Runnable). Et contrairement à FutureTask, on n’a pas de variante avec Callable. A la place, on a une méthode supplyAsync(Supplier<U> supplier), ce qui revient strictement au même si on utilise les expressions lambda.

CompletionStage

Le principal intérêt de CompletableFuture, c’est qu’on peut facilement enchainer les tâches. Ce sont les méthodes définies dans l’interface CompletionStage qui permettent ça. C’est sympa, ça ressemble à la programmation avec les streams, ou avec un Optional, à la différence prêt qu’ici on est en mode asynchrone.

thenApply applique une fonction et renvoie un nouveau CompletableFuture, avec un nouveau type de valeur. thenAccept applique un consommateur et renvoie un CompletableFuture sans valeur (<Void>). thenRun execute un Runnable ; ces variantes avec Runnable existent pour la plupart des fonctionnalités de CompletableFuture, mais pour alléger le billet je n’en parlerai plus.

CompletableFuture<Integer> future = start
      .thenApply(val -> val + 1)
      .thenApply(val -> 2 * val);

future.thenApply(val -> "Accepted " + val)
      .thenAccept(System.out::println);

Toutes ces méthodes thenXxx ont trois variantes : la simple qui s’exécute dans le même thread, async qui s’exécute dans le common pool et async avec un executor.

thenCompose est un peu le flatMap de CompletableFuture. Il utilise une fonction qui reçoit le résultat du CompletableFuture et renvoie un nouveau CompletableFuture. C’est un peu comme thenApply avec lequel on construit soi-même le nouveau CompletableFuture.

Combinaison de CompletableFuture

Il y a deux façons de combiner des CompletableFuture : prendre celui qui se termine en premier, ou celui qui se termine en dernier.

La méthode statique allOf prend un ensemble de CompletableFuture pour en faire un nouveau qui rendra la main lorsque tous seront terminés. Ça fonctionne comme une Barrier. La méthode thenCombine fait presque la même chose, avec seulement deux futures et avec une fonction qui combine les deux résultats. thenAcceptBoth fait aussi la même chose, mais en consommant les deux résultats, sans fournir de nouveau résultat.

Avec anyOf, la main sera rendue au premier terminé. La méthode applyToEither fait presque la même chose, avec seulement deux futures mais en ajoutant une transformation. applyEither fait la même chose en consommant le résultat, sans valeur de retour.

anyOf(future1, future2)
        .thenApply(val -> "Done with anyOf " + val)
        .thenAccept(System.out::println);

Gestion des erreurs

La méthode exceptionally est appelée lorsqu’un CompletableFuture se termine en exception ; elle peut être utilisée de deux façons.

On peut enchainer des then après la prise en compte de l’exception.

future.exceptionally(ex -> ...)
      .thenApply(val -> ...);

Dans ce cas, exceptionally intercepte l’exception et renvoie un nouveau CompletableFuture avec une valeur qui sera transmise à thenApply.

Par contre, si on appelle exceptionally sans enchainer, sa valeur de retour est perdu (on peut mettre null).

future.exceptionally(ex -> ...);
future.thenApply(val -> ...);

Dans notre exemple, thenApply n’est pas appelé en cas d’exception. On a simulé un try - catch dans un CompletableFuture, et pour respecter l’ordre (try puis catch), on peut aussi échanger les deux lignes sans changer le comportement.

future.thenApply(val -> ...);
future.exceptionally(ex -> ...);

Conclusions

Je confirme ma première impression : je ne trouve toujours pas cette classe très lisible. Il y a beaucoup de choses pour une seule classe (7 méthodes statiques, un cinquantaine de méthodes d’instance).

Je ne vois toujours pas très bien l’intérêt d’avoir au même endroit l’aspect complete et le coté reactive-like, d’autant que le nom insiste sur la première fonctionnalité qui ne me semble pas la plus importante. Est-ce que ça n’aurait pas pu être dans des classes, ou au moins des interfaces, différentes ? Guava a bien fait une distinction entre ListenableFuture et SettableFuture.

Et puis, l’API n’est pas très cohérente. Par exemple, getNow ne renvoie pas les mêmes exceptions que get, mais les mêmes que join. Pourquoi ne pas l’appeler joinNow ? Et que dire de la méthode allOf est qui ne renvoie pas de valeur. Et du piège tendu par la méthode cancel qui n’a pas le même comportement que pour les autres Future ?

Malgré tout, CompletableFuture est bien plus intéressante que je ne pensais au départ, à condition de connaitre et savoir utiliser les expressions lambda pour l’exploiter correctement. Avec ça, notre code peut prendre des allures Reactive. D’ailleurs, on peut trouver quelques similitudes avec la classe Observable de RxJava. Pour la peine, je vais ajouter un paragraphe sur le sujet dans ma toute nouvelle formation "Programmation parallèle et concurrente en Java".

Je l’ai déjà signalé en introduction, Tomasz Nurkiewicz dit des choses bien plus intéressantes et plus approfondies, alors n’hésitez pas à consulter les articles originaux.