dajac commented on a change in pull request #9878:
URL: https://github.com/apache/kafka/pull/9878#discussion_r663902068



##########
File path: 
clients/src/main/java/org/apache/kafka/common/internals/KafkaFutureImpl.java
##########
@@ -27,217 +28,111 @@
 
 /**
  * A flexible future which supports call chaining and other asynchronous 
programming patterns.
- * This will eventually become a thin shim on top of Java 8's 
CompletableFuture.
  */
 public class KafkaFutureImpl<T> extends KafkaFuture<T> {
-    /**
-     * A convenience method that throws the current exception, wrapping it if 
needed.
-     *
-     * In general, KafkaFuture throws CancellationException and 
InterruptedException directly, and
-     * wraps all other exceptions in an ExecutionException.
-     */
-    private static void wrapAndThrow(Throwable t) throws InterruptedException, 
ExecutionException {
-        if (t instanceof CancellationException) {
-            throw (CancellationException) t;
-        } else if (t instanceof InterruptedException) {
-            throw (InterruptedException) t;
-        } else {
-            throw new ExecutionException(t);
-        }
-    }
 
-    private static class Applicant<A, B> implements BiConsumer<A, Throwable> {
-        private final BaseFunction<A, B> function;
-        private final KafkaFutureImpl<B> future;
+    private final KafkaCompletableFuture<T> completableFuture;
 
-        Applicant(BaseFunction<A, B> function, KafkaFutureImpl<B> future) {
-            this.function = function;
-            this.future = future;
-        }
+    private final boolean isDependant;
 
-        @Override
-        public void accept(A a, Throwable exception) {
-            if (exception != null) {
-                future.completeExceptionally(exception);
-            } else {
-                try {
-                    B b = function.apply(a);
-                    future.complete(b);
-                } catch (Throwable t) {
-                    future.completeExceptionally(t);
-                }
-            }
-        }
+    public KafkaFutureImpl() {
+        this(false, new KafkaCompletableFuture<>());
     }
 
-    private static class SingleWaiter<R> implements BiConsumer<R, Throwable> {
-        private R value = null;
-        private Throwable exception = null;
-        private boolean done = false;
-
-        @Override
-        public synchronized void accept(R newValue, Throwable newException) {
-            this.value = newValue;
-            this.exception = newException;
-            this.done = true;
-            this.notifyAll();
-        }
-
-        synchronized R await() throws InterruptedException, ExecutionException 
{
-            while (true) {
-                if (exception != null)
-                    wrapAndThrow(exception);
-                if (done)
-                    return value;
-                this.wait();
-            }
-        }
-
-        R await(long timeout, TimeUnit unit)
-                throws InterruptedException, ExecutionException, 
TimeoutException {
-            long startMs = System.currentTimeMillis();
-            long waitTimeMs = unit.toMillis(timeout);
-            long delta = 0;
-            synchronized (this) {
-                while (true) {
-                    if (exception != null)
-                        wrapAndThrow(exception);
-                    if (done)
-                        return value;
-                    if (delta >= waitTimeMs) {
-                        throw new TimeoutException();
-                    }
-                    this.wait(waitTimeMs - delta);
-                    delta = System.currentTimeMillis() - startMs;
-                }
-            }
-        }
+    public KafkaFutureImpl(KafkaCompletableFuture<T> completableFuture) {
+        this(false, completableFuture);
     }
 
-    /**
-     * True if this future is done.
-     */
-    private boolean done = false;
-
-    /**
-     * The value of this future, or null.  Protected by the object monitor.
-     */
-    private T value = null;
-
-    /**
-     * The exception associated with this future, or null.  Protected by the 
object monitor.
-     */
-    private Throwable exception = null;
+    private KafkaFutureImpl(boolean isDependant, KafkaCompletableFuture<T> 
completableFuture) {
+        this.isDependant = isDependant;
+        this.completableFuture = completableFuture;
+    }
 
-    /**
-     * A list of objects waiting for this future to complete (either 
successfully or
-     * exceptionally).  Protected by the object monitor.
-     */
-    private List<BiConsumer<? super T, ? super Throwable>> waiters = new 
ArrayList<>();
+    @Override
+    public CompletionStage<T> toCompletionStage() {
+        return completableFuture;
+    }
 
     /**
      * Returns a new KafkaFuture that, when this future completes normally, is 
executed with this
      * futures's result as the argument to the supplied function.
      */
     @Override
     public <R> KafkaFuture<R> thenApply(BaseFunction<T, R> function) {
-        KafkaFutureImpl<R> future = new KafkaFutureImpl<>();
-        addWaiter(new Applicant<>(function, future));
-        return future;
+        CompletableFuture<R> appliedFuture = completableFuture.thenApply(value 
-> {
+            try {
+                return function.apply(value);
+            } catch (Throwable t) {
+                if (t instanceof CompletionException) {
+                    // KafkaFuture#thenApply, when the function threw 
CompletionException should return
+                    // an ExecutionException wrapping a CompletionException 
wrapping the exception thrown by the
+                    // function. CompletableFuture#thenApply will just return 
ExecutionException wrapping the
+                    // exception thrown by the function, so we add an extra 
CompletionException here to
+                    // maintain the KafkaFuture behaviour.
+                    throw new CompletionException(t);
+                } else {
+                    throw t;
+                }
+            }
+        });
+        return new KafkaFutureImpl<>(true, 
toKafkaCompletableFuture(appliedFuture));
     }
 
-    public <R> void copyWith(KafkaFuture<R> future, BaseFunction<R, T> 
function) {
-        KafkaFutureImpl<R> futureImpl = (KafkaFutureImpl<R>) future;
-        futureImpl.addWaiter(new Applicant<>(function, this));
+    private static <U> KafkaCompletableFuture<U> 
toKafkaCompletableFuture(CompletableFuture<U> completableFuture) {
+        if (completableFuture instanceof KafkaCompletableFuture) {
+            return (KafkaCompletableFuture<U>) completableFuture;
+        } else {
+            final KafkaCompletableFuture<U> result = new 
KafkaCompletableFuture<>();
+            completableFuture.whenComplete((x, y) -> {
+                if (y != null) {
+                    result.kafkaCompleteExceptionally(y);
+                } else {
+                    result.kafkaComplete(x);
+                }
+            });
+            return result;
+        }
     }
 
     /**
      * @see KafkaFutureImpl#thenApply(BaseFunction)
+     * @deprecated Since Kafka 3.0.
      */
+    @Deprecated
     @Override
     public <R> KafkaFuture<R> thenApply(Function<T, R> function) {
         return thenApply((BaseFunction<T, R>) function);
     }
 
-    private static class WhenCompleteBiConsumer<T> implements BiConsumer<T, 
Throwable> {
-        private final KafkaFutureImpl<T> future;
-        private final BiConsumer<? super T, ? super Throwable> biConsumer;
-
-        WhenCompleteBiConsumer(KafkaFutureImpl<T> future, BiConsumer<? super 
T, ? super Throwable> biConsumer) {
-            this.future = future;
-            this.biConsumer = biConsumer;
-        }
-
-        @Override
-        public void accept(T val, Throwable exception) {
+    @Override
+    public KafkaFuture<T> whenComplete(final BiConsumer<? super T, ? super 
Throwable> biConsumer) {
+        CompletableFuture<T> tCompletableFuture = 
completableFuture.whenComplete((java.util.function.BiConsumer<? super T, ? 
super Throwable>) (a, b) -> {
             try {
-                if (exception != null) {
-                    biConsumer.accept(null, exception);
+                biConsumer.accept(a, b);
+            } catch (Throwable t) {
+                if (t instanceof CompletionException) {
+                    throw new CompletionException(t);
                 } else {
-                    biConsumer.accept(val, null);
-                }
-            } catch (Throwable e) {
-                if (exception == null) {
-                    exception = e;
+                    throw t;
                 }
             }
-            if (exception != null) {
-                future.completeExceptionally(exception);
-            } else {
-                future.complete(val);
-            }
-        }
+        });
+        return new KafkaFutureImpl<>(true, 
toKafkaCompletableFuture(tCompletableFuture));
     }
 
-    @Override
-    public KafkaFuture<T> whenComplete(final BiConsumer<? super T, ? super 
Throwable> biConsumer) {
-        final KafkaFutureImpl<T> future = new KafkaFutureImpl<>();
-        addWaiter(new WhenCompleteBiConsumer<>(future, biConsumer));
-        return future;
-    }
-
-    protected synchronized void addWaiter(BiConsumer<? super T, ? super 
Throwable> action) {
-        if (exception != null) {
-            action.accept(null, exception);
-        } else if (done) {
-            action.accept(value, null);
-        } else {
-            waiters.add(action);
-        }
-    }
 
     @Override
     public synchronized boolean complete(T newValue) {

Review comment:
       Could we also remove `synchronized` here? 

##########
File path: 
clients/src/main/java/org/apache/kafka/common/internals/KafkaFutureImpl.java
##########
@@ -267,50 +180,82 @@ public T get() throws InterruptedException, 
ExecutionException {
     @Override
     public T get(long timeout, TimeUnit unit) throws InterruptedException, 
ExecutionException,
             TimeoutException {
-        SingleWaiter<T> waiter = new SingleWaiter<>();
-        addWaiter(waiter);
-        return waiter.await(timeout, unit);
+        try {
+            return completableFuture.get(timeout, unit);
+        } catch (ExecutionException e) {
+            maybeThrowCancellationException(e.getCause());
+            throw e;
+        }
     }
 
     /**
      * Returns the result value (or throws any encountered exception) if 
completed, else returns
      * the given valueIfAbsent.
      */
     @Override
-    public synchronized T getNow(T valueIfAbsent) throws InterruptedException, 
ExecutionException {

Review comment:
       Could we also remove `synchronized` here? 

##########
File path: 
clients/src/main/java/org/apache/kafka/common/internals/KafkaFutureImpl.java
##########
@@ -267,50 +180,82 @@ public T get() throws InterruptedException, 
ExecutionException {
     @Override
     public T get(long timeout, TimeUnit unit) throws InterruptedException, 
ExecutionException,
             TimeoutException {
-        SingleWaiter<T> waiter = new SingleWaiter<>();
-        addWaiter(waiter);
-        return waiter.await(timeout, unit);
+        try {
+            return completableFuture.get(timeout, unit);
+        } catch (ExecutionException e) {
+            maybeThrowCancellationException(e.getCause());
+            throw e;
+        }
     }
 
     /**
      * Returns the result value (or throws any encountered exception) if 
completed, else returns
      * the given valueIfAbsent.
      */
     @Override
-    public synchronized T getNow(T valueIfAbsent) throws InterruptedException, 
ExecutionException {
-        if (exception != null)
-            wrapAndThrow(exception);
-        if (done)
-            return value;
-        return valueIfAbsent;
+    public synchronized T getNow(T valueIfAbsent) throws ExecutionException {

Review comment:
       Good question. AK 3.0 is a good opportunity to make such breaking changes 
so I would be in favour of doing it. Let's see what others think.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to