vamossagar12 commented on PR #13801: URL: https://github.com/apache/kafka/pull/13801#issuecomment-1908434176
@C0urante, I did some analysis today of the version that used `CompletableFuture` [here](https://github.com/apache/kafka/blob/062591757a04647eb4837348f59b0e5736b6372f/connect/runtime/src/main/java/org/apache/kafka/connect/storage/ConnectorOffsetBackingStore.java#L312-L330). I wrote a small program to mimic that behaviour, i.e. a method that returns a future built by chaining possibly blocking callbacks:

```
package org.example;

import java.time.Duration;
import java.util.concurrent.*;

public class CompletableFutureTest {

    private static Future<Void> getFuture() {
        CompletableFuture<Void> overallFuture = CompletableFuture.completedFuture(null);
        // Non-async thenRun on an already-completed stage: the lambda runs right here, in the calling thread
        overallFuture = overallFuture.thenRun(() -> {
            try {
                System.out.println("Function 1 - Thread::" + Thread.currentThread());
                long initTime = System.currentTimeMillis();
                System.out.println("Function 1 - Initial Time::" + initTime);
                Thread.sleep(Duration.ofSeconds(30).toMillis());
                System.out.println("Function 1 - Time Elapsed::" + (System.currentTimeMillis() - initTime) / 1000);
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            }
        });
        overallFuture = overallFuture.thenRun(() -> {
            long initTime = System.currentTimeMillis();
            System.out.println("Function 2 - Thread::" + Thread.currentThread());
            System.out.println("Function 2 - Initial Time::" + initTime);
            try {
                Thread.sleep(Duration.ofSeconds(40).toMillis());
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            }
            System.out.println("Function 2 - Time Elapsed::" + (System.currentTimeMillis() - initTime) / 1000);
        });
        return overallFuture;
    }

    public static void main(String[] args) {
        System.out.println("Begin main");
        System.out.println("Main - Thread::" + Thread.currentThread());
        Future<Void> future = getFuture();
        try {
            // Measures only how long get() blocks, not how long getFuture() took
            long initTime = System.currentTimeMillis();
            future.get();
            System.out.println("Main - Time Elapsed::" + (System.currentTimeMillis() - initTime) / 1000);
            System.out.println("End of main");
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        } catch (ExecutionException e) {
            throw new RuntimeException(e.getCause());
        }
    }
}
```

and this is the output I get:

```
Begin main
Main - Thread::Thread[main,5,main]
Function 1 - Thread::Thread[main,5,main]
Function 1 - Initial Time::1706111085236
Function 1 - Time Elapsed::30
Function 2 - Thread::Thread[main,5,main]
Function 2 - Initial Time::1706111115248
Function 2 - Time Elapsed::40
Main - Time Elapsed::0
End of main
```

So essentially, the callbacks are executed as soon as `getFuture()` is invoked. By the time we reach the `get()` call in `main`, the future is already resolved and `get()` returns immediately (as you pointed out). I get a very similar result when I change the definition of `overallFuture` to:

```
CompletableFuture<Void> overallFuture = CompletableFuture.supplyAsync(() -> null);
```

This is presumably because the trivial `supplyAsync` task has usually finished by the time `thenRun` is attached, so the callbacks again run in `main`.

However, if I change the callbacks to use `thenRunAsync`, I get totally different results (which is what we expect):

```
Begin main
Main - Thread::Thread[main,5,main]
Function 1 - Thread::Thread[ForkJoinPool.commonPool-worker-1,5,main]
Function 1 - Initial Time::1706111422731
Function 1 - Time Elapsed::30
Function 2 - Thread::Thread[ForkJoinPool.commonPool-worker-1,5,main]
Function 2 - Initial Time::1706111452740
Function 2 - Time Elapsed::40
Main - Time Elapsed::70
End of main
```

This time, `getFuture` returns immediately and the lambdas run on a common `ForkJoinPool` worker thread. The `get()` call in `main` then blocks for about 70 seconds until both of them have finished.

I think the reason is that the non-async methods in `CompletableFuture`, such as `thenRun`, run their action immediately in the calling thread when the stage they are attached to has already completed. Otherwise, the action runs in whichever thread completes that stage. Only the `xxxAsync` variants hand the action to an executor. By default this is the common `ForkJoinPool`, as the printed thread name shows, or it can be an `Executor` passed as an extra argument. This behaviour is similar to other reactive libraries like `rx-java`, IIRC.
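Here is a minimal sketch that compares the three execution contexts without the 30/40-second sleeps. Its names are hypothetical and chosen only for illustration: `ExecutionContextDemo`, the `completer` thread and the `offset-writer` executor. None of them come from `ConnectorOffsetBackingStore`.

The sketch relies only on documented `CompletableFuture` behaviour:
* Actions of non-async methods may be performed by the thread that completes the stage, or by any other caller of a completion method.
* The `thenRunAsync(action, executor)` overload runs the action on the given `Executor`.

A `CountDownLatch` stands in for the slow callback so the `isDone()` checks are deterministic.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical demo class, not part of Kafka.
public class ExecutionContextDemo {

    // Case 1: thenRun on an already-completed stage runs the action
    // synchronously in the calling thread, before thenRun returns.
    static String thenRunOnCompleted() {
        AtomicReference<String> ran = new AtomicReference<>();
        CompletableFuture<Void> f = CompletableFuture.completedFuture(null)
                .thenRun(() -> ran.set(Thread.currentThread().getName()));
        // Already done here, which is why future.get() in main() returned at once.
        if (!f.isDone()) throw new IllegalStateException("expected a completed future");
        return ran.get();
    }

    // Case 2: thenRun on a pending stage. The action is queued and later runs
    // in the thread that completes the stage (here, the "completer" thread).
    static String thenRunOnPending() {
        AtomicReference<String> ran = new AtomicReference<>();
        CompletableFuture<Void> pending = new CompletableFuture<>();
        CompletableFuture<Void> dependent =
                pending.thenRun(() -> ran.set(Thread.currentThread().getName()));
        new Thread(() -> pending.complete(null), "completer").start();
        dependent.join(); // waits until the queued action has run
        return ran.get();
    }

    // Case 3: thenRunAsync with an explicit Executor. The action runs on the
    // executor's thread, and the caller gets back a future that is not yet done.
    static String thenRunAsyncOn(ExecutorService executor) {
        AtomicReference<String> ran = new AtomicReference<>();
        CountDownLatch release = new CountDownLatch(1);
        CompletableFuture<Void> f = CompletableFuture.completedFuture(null)
                .thenRunAsync(() -> {
                    ran.set(Thread.currentThread().getName());
                    try {
                        release.await(); // stand-in for the slow callback
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                }, executor);
        // The action is blocked on the latch, so f cannot be complete yet.
        if (f.isDone()) throw new IllegalStateException("expected a pending future");
        release.countDown();
        f.join();
        return ran.get();
    }

    public static void main(String[] args) {
        System.out.println("thenRun (completed stage): " + thenRunOnCompleted());
        System.out.println("thenRun (pending stage):   " + thenRunOnPending());
        // Hypothetical single-threaded executor standing in for a dedicated thread pool.
        ExecutorService executor =
                Executors.newSingleThreadExecutor(r -> new Thread(r, "offset-writer"));
        try {
            System.out.println("thenRunAsync (executor):   " + thenRunAsyncOn(executor));
        } finally {
            executor.shutdown();
        }
    }
}
```

Running `main` should print three thread names:
* `main` for the first case.
* `completer` for the second.
* `offset-writer` for the third.

The second case shows that, with a non-async `thenRun`, where the callback runs depends on whether the previous stage has already finished. That is also why the `supplyAsync(() -> null)` variant can behave like the `completedFuture(null)` one.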
I believe the offset write happens on the same thread that invokes the `set()` method, and we would want to keep that behaviour, so I am not sure we want to use the `xxxAsync` variants. I also tried `KafkaFutureImpl`, Kafka's internal future implementation, but got very similar results. Let me know what you think. And by the way, thanks for the `thenRun()` suggestion; it makes sense in this case.