tombentley commented on a change in pull request #9878:
URL: https://github.com/apache/kafka/pull/9878#discussion_r663962563



##########
File path: 
clients/src/main/java/org/apache/kafka/common/internals/KafkaFutureImpl.java
##########
@@ -27,217 +28,111 @@
 
 /**
  * A flexible future which supports call chaining and other asynchronous 
programming patterns.
- * This will eventually become a thin shim on top of Java 8's 
CompletableFuture.
  */
 public class KafkaFutureImpl<T> extends KafkaFuture<T> {
-    /**
-     * A convenience method that throws the current exception, wrapping it if 
needed.
-     *
-     * In general, KafkaFuture throws CancellationException and 
InterruptedException directly, and
-     * wraps all other exceptions in an ExecutionException.
-     */
-    private static void wrapAndThrow(Throwable t) throws InterruptedException, 
ExecutionException {
-        if (t instanceof CancellationException) {
-            throw (CancellationException) t;
-        } else if (t instanceof InterruptedException) {
-            throw (InterruptedException) t;
-        } else {
-            throw new ExecutionException(t);
-        }
-    }
 
-    private static class Applicant<A, B> implements BiConsumer<A, Throwable> {
-        private final BaseFunction<A, B> function;
-        private final KafkaFutureImpl<B> future;
+    private final KafkaCompletableFuture<T> completableFuture;
 
-        Applicant(BaseFunction<A, B> function, KafkaFutureImpl<B> future) {
-            this.function = function;
-            this.future = future;
-        }
+    private final boolean isDependant;
 
-        @Override
-        public void accept(A a, Throwable exception) {
-            if (exception != null) {
-                future.completeExceptionally(exception);
-            } else {
-                try {
-                    B b = function.apply(a);
-                    future.complete(b);
-                } catch (Throwable t) {
-                    future.completeExceptionally(t);
-                }
-            }
-        }
+    public KafkaFutureImpl() {
+        this(false, new KafkaCompletableFuture<>());
     }
 
-    private static class SingleWaiter<R> implements BiConsumer<R, Throwable> {
-        private R value = null;
-        private Throwable exception = null;
-        private boolean done = false;
-
-        @Override
-        public synchronized void accept(R newValue, Throwable newException) {
-            this.value = newValue;
-            this.exception = newException;
-            this.done = true;
-            this.notifyAll();
-        }
-
-        synchronized R await() throws InterruptedException, ExecutionException 
{
-            while (true) {
-                if (exception != null)
-                    wrapAndThrow(exception);
-                if (done)
-                    return value;
-                this.wait();
-            }
-        }
-
-        R await(long timeout, TimeUnit unit)
-                throws InterruptedException, ExecutionException, 
TimeoutException {
-            long startMs = System.currentTimeMillis();
-            long waitTimeMs = unit.toMillis(timeout);
-            long delta = 0;
-            synchronized (this) {
-                while (true) {
-                    if (exception != null)
-                        wrapAndThrow(exception);
-                    if (done)
-                        return value;
-                    if (delta >= waitTimeMs) {
-                        throw new TimeoutException();
-                    }
-                    this.wait(waitTimeMs - delta);
-                    delta = System.currentTimeMillis() - startMs;
-                }
-            }
-        }
+    public KafkaFutureImpl(KafkaCompletableFuture<T> completableFuture) {

Review comment:
       No, but it makes me wonder whether we'd find factory methods for already 
completed and already failed `KafkaFutures` useful.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to