vamossagar12 commented on PR #13801:
URL: https://github.com/apache/kafka/pull/13801#issuecomment-1908434176

   @C0urante, I did some analysis today on the earlier version that used `CompletableFuture` [here](https://github.com/apache/kafka/blob/062591757a04647eb4837348f59b0e5736b6372f/connect/runtime/src/main/java/org/apache/kafka/connect/storage/ConnectorOffsetBackingStore.java#L312-L330).
   
   I wrote a small program to mimic that behaviour, i.e. a method that returns a future with possibly blocking callbacks chained onto it:
   
   ```java
   package org.example;
   
   import java.time.Duration;
   import java.util.concurrent.*;
   
   public class CompletableFutureTest {
   
       // Chains two blocking callbacks onto an already-completed future
       // and hands the resulting future back to the caller.
       private static Future<Void> getFuture() {
           CompletableFuture<Void> overallFuture = CompletableFuture.completedFuture(null);
           overallFuture = overallFuture.thenRun(() -> {
               try {
                   System.out.println("Function 1 - Thread::" + Thread.currentThread());
                   long initTime = System.currentTimeMillis();
                   System.out.println("Function 1 - Initial Time::" + initTime);
                   Thread.sleep(Duration.ofSeconds(30).toMillis());
                   System.out.println("Function 1 - Time Elapsed::" + (System.currentTimeMillis() - initTime) / 1000);
               } catch (InterruptedException e) {
                   throw new RuntimeException(e);
               }
           });
           overallFuture = overallFuture.thenRun(() -> {
               long initTime = System.currentTimeMillis();
               System.out.println("Function 2 - Thread::" + Thread.currentThread());
               System.out.println("Function 2 - Initial Time::" + initTime);
               try {
                   Thread.sleep(Duration.ofSeconds(40).toMillis());
               } catch (InterruptedException e) {
                   throw new RuntimeException(e);
               }
               System.out.println("Function 2 - Time Elapsed::" + (System.currentTimeMillis() - initTime) / 1000);
           });
           return overallFuture;
       }
   
       public static void main(String[] args) {
           System.out.println("Begin main");
           System.out.println("Main - Thread::" + Thread.currentThread());
           Future<Void> future = getFuture();
           try {
               // Measures how long main actually blocks waiting on the chain.
               long initTime = System.currentTimeMillis();
               future.get();
               System.out.println("Main - Time Elapsed::" + (System.currentTimeMillis() - initTime) / 1000);
               System.out.println("End of main");
           } catch (InterruptedException e) {
               throw new RuntimeException(e);
           } catch (ExecutionException e) {
               throw new RuntimeException(e.getCause());
           }
       }
   }
   ```
   and this is the output I get:
   
   ```
   Begin main
   Main - Thread::Thread[main,5,main]
   Function 1 - Thread::Thread[main,5,main]
   Function 1 - Initial Time::1706111085236
   Function 1 - Time Elapsed::30
   Function 2 - Thread::Thread[main,5,main]
   Function 2 - Initial Time::1706111115248
   Function 2 - Time Elapsed::40
   Main - Time Elapsed::0
   End of main
   ```
   
   So essentially, the callbacks run synchronously on the main thread while `getFuture()` is still executing, and by the time we reach the `get()` call in main the future is already resolved, so it returns immediately (as you pointed out).
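A minimal sketch that isolates this effect (the class and method names are hypothetical, for illustration only). It relies on OpenJDK's behaviour: when a non-async stage such as `thenRun` is attached to a future that is already complete, the action runs immediately on the calling thread, before `thenRun` returns.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicReference;

// Sketch: a non-async stage attached to an already-completed future
// runs synchronously on the thread that attaches it.
public class SyncStageDemo {

    // Returns the name of the thread that executed the thenRun() action.
    static String threadThatRanCallback() {
        AtomicReference<String> ranOn = new AtomicReference<>();
        CompletableFuture<Void> done = CompletableFuture.completedFuture(null);

        // 'done' is already complete, so the action executes right here,
        // before thenRun() returns.
        CompletableFuture<Void> next =
                done.thenRun(() -> ranOn.set(Thread.currentThread().getName()));

        // Consequently the returned stage is complete as well.
        if (!next.isDone()) {
            throw new IllegalStateException("expected the dependent stage to be complete already");
        }
        return ranOn.get();
    }

    public static void main(String[] args) {
        System.out.println("caller:   " + Thread.currentThread().getName());
        System.out.println("callback: " + threadThatRanCallback());
    }
}
```

Both lines should print the same thread name, which matches the `Thread[main,5,main]` entries for Function 1 and Function 2 in the output above.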
   
   I get a very similar result when I change the definition of `overallFuture` to:
   
   ```java
   CompletableFuture<Void> overallFuture = CompletableFuture.supplyAsync(() -> null);
   ```
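A plausible explanation for this (an assumption, not something I verified) is a race: `supplyAsync(() -> null)` finishes almost instantly on the common pool, so by the time `thenRun` is attached the future is usually already complete and the callbacks run on `main` again. If the source future is still pending when `thenRun` is called, the action is only registered and later runs on whichever thread completes that future. A minimal sketch of the pending case (class and thread names are hypothetical):

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicReference;

// Sketch: a non-async stage attached to a still-pending future only registers
// the action; the action later runs on the thread that completes the future.
public class CompletingThreadDemo {

    // Returns the name of the thread that executed the thenRun() action.
    static String threadThatRanCallback() {
        AtomicReference<String> ranOn = new AtomicReference<>();
        CompletableFuture<Void> pending = new CompletableFuture<>();

        // 'pending' is not complete yet, so this call just registers the action.
        CompletableFuture<Void> next =
                pending.thenRun(() -> ranOn.set(Thread.currentThread().getName()));

        // Complete the source future from a separate, explicitly named thread.
        new Thread(() -> pending.complete(null), "completer-thread").start();

        next.join(); // wait until the dependent action has run
        return ranOn.get();
    }

    public static void main(String[] args) {
        System.out.println("callback ran on: " + threadThatRanCallback());
    }
}
```

Here the callback runs on `completer-thread`, not on `main`, even though `main` is the thread that called `thenRun`.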
   
   However, if I change the callback invocations to use `thenRunAsync`, I get completely different results (which is what we expect):
   
   ```
   Begin main
   Main - Thread::Thread[main,5,main]
   Function 1 - Thread::Thread[ForkJoinPool.commonPool-worker-1,5,main]
   Function 1 - Initial Time::1706111422731
   Function 1 - Time Elapsed::30
   Function 2 - Thread::Thread[ForkJoinPool.commonPool-worker-1,5,main]
   Function 2 - Initial Time::1706111452740
   Function 2 - Time Elapsed::40
   Main - Time Elapsed::70
   End of main
   ```
   
   i.e. this time, the `getFuture` method returns immediately, the lambdas run in the background on a common pool thread, and the `get` call in main blocks (for about 70 seconds here) until they finish. I think the reason is that the non-async methods in `CompletableFuture` run the callback on the thread that completes the previous stage, which is the calling thread itself when that stage is already complete. Only the `xxxAsync` variants hand the callback off to the common `ForkJoinPool` (as the printed thread names show) or, if needed, to an `Executor` passed in explicitly. IIRC, this behaviour is similar to other reactive libraries like `rx-java`. I believe the offset write happens on the same thread that invokes the `set()` method, and we would want to keep that behaviour, so I am not sure we want to use the `xxxAsync` variants. I also tried `KafkaFutureImpl`, which is Kafka's internal implementation for async programming, but got very similar results. Let me know what you think.
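For the "separate thread pool" case, `thenRunAsync(Runnable, Executor)` takes the executor explicitly. The sketch below uses a hypothetical single-thread pool named `callback-pool` (nothing that exists in Connect) and also shows that the future comes back before the action has run, and that blocking on the future is what waits for the work. A latch holds the action so the "not done yet" check is deterministic.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicReference;

// Sketch: thenRunAsync(Runnable, Executor) hands the action to the given
// executor, so the caller gets the future back before the action has run.
public class AsyncStageDemo {

    static final class Result {
        final boolean doneBeforeRelease; // was the future complete right after thenRunAsync returned?
        final String callbackThread;     // thread that executed the action

        Result(boolean doneBeforeRelease, String callbackThread) {
            this.doneBeforeRelease = doneBeforeRelease;
            this.callbackThread = callbackThread;
        }
    }

    static Result run() {
        // Hypothetical dedicated pool; the thread name is only for illustration.
        ExecutorService pool = Executors.newSingleThreadExecutor(r -> new Thread(r, "callback-pool"));
        CountDownLatch release = new CountDownLatch(1);
        AtomicReference<String> ranOn = new AtomicReference<>();
        try {
            CompletableFuture<Void> future = CompletableFuture.completedFuture((Void) null)
                    .thenRunAsync(() -> {
                        try {
                            release.await(); // hold the action until the caller lets it proceed
                        } catch (InterruptedException e) {
                            Thread.currentThread().interrupt();
                            throw new RuntimeException(e);
                        }
                        ranOn.set(Thread.currentThread().getName());
                    }, pool);

            // The action is parked on the latch, so the future cannot be complete yet.
            boolean doneBeforeRelease = future.isDone();
            release.countDown();
            future.join(); // blocks until the action has finished on the pool thread
            return new Result(doneBeforeRelease, ranOn.get());
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        Result r = run();
        System.out.println("done before release? " + r.doneBeforeRelease);
        System.out.println("callback ran on:      " + r.callbackThread);
    }
}
```

This should report that the future was not done before the latch was released and that the callback ran on `callback-pool`, i.e. the write would no longer happen on the thread that called `set()`.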
   
   And btw, thanks for the `thenRun()` suggestion. That makes sense in this 
case.
   
   
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
