dajac commented on a change in pull request #9878: URL: https://github.com/apache/kafka/pull/9878#discussion_r662791694
########## File path: clients/src/main/java/org/apache/kafka/common/internals/KafkaFutureImpl.java ########## @@ -27,217 +28,107 @@ /** * A flexible future which supports call chaining and other asynchronous programming patterns. - * This will eventually become a thin shim on top of Java 8's CompletableFuture. */ public class KafkaFutureImpl<T> extends KafkaFuture<T> { - /** - * A convenience method that throws the current exception, wrapping it if needed. - * - * In general, KafkaFuture throws CancellationException and InterruptedException directly, and - * wraps all other exceptions in an ExecutionException. - */ - private static void wrapAndThrow(Throwable t) throws InterruptedException, ExecutionException { - if (t instanceof CancellationException) { - throw (CancellationException) t; - } else if (t instanceof InterruptedException) { - throw (InterruptedException) t; - } else { - throw new ExecutionException(t); - } - } - private static class Applicant<A, B> implements BiConsumer<A, Throwable> { - private final BaseFunction<A, B> function; - private final KafkaFutureImpl<B> future; + private final KafkaCompletableFuture<T> completableFuture; - Applicant(BaseFunction<A, B> function, KafkaFutureImpl<B> future) { - this.function = function; - this.future = future; - } + private final boolean isDependant; - @Override - public void accept(A a, Throwable exception) { - if (exception != null) { - future.completeExceptionally(exception); - } else { - try { - B b = function.apply(a); - future.complete(b); - } catch (Throwable t) { - future.completeExceptionally(t); - } - } - } + public KafkaFutureImpl() { + this(false, new KafkaCompletableFuture<>()); } - private static class SingleWaiter<R> implements BiConsumer<R, Throwable> { - private R value = null; - private Throwable exception = null; - private boolean done = false; - - @Override - public synchronized void accept(R newValue, Throwable newException) { - this.value = newValue; - this.exception = newException; - this.done = true; - this.notifyAll(); - } - - synchronized R await() throws InterruptedException, ExecutionException { - while (true) { - if (exception != null) - wrapAndThrow(exception); - if (done) - return value; - this.wait(); - } - } - - R await(long timeout, TimeUnit unit) - throws InterruptedException, ExecutionException, TimeoutException { - long startMs = System.currentTimeMillis(); - long waitTimeMs = unit.toMillis(timeout); - long delta = 0; - synchronized (this) { - while (true) { - if (exception != null) - wrapAndThrow(exception); - if (done) - return value; - if (delta >= waitTimeMs) { - throw new TimeoutException(); - } - this.wait(waitTimeMs - delta); - delta = System.currentTimeMillis() - startMs; - } - } - } + public KafkaFutureImpl(KafkaCompletableFuture<T> completableFuture) { + this(false, completableFuture); } - /** - * True if this future is done. - */ - private boolean done = false; - - /** - * The value of this future, or null. Protected by the object monitor. - */ - private T value = null; - - /** - * The exception associated with this future, or null. Protected by the object monitor. - */ - private Throwable exception = null; + private KafkaFutureImpl(boolean isDependant, KafkaCompletableFuture<T> completableFuture) { + this.isDependant = isDependant; + this.completableFuture = completableFuture; + } - /** - * A list of objects waiting for this future to complete (either successfully or - * exceptionally). Protected by the object monitor. - */ - private List<BiConsumer<? super T, ? super Throwable>> waiters = new ArrayList<>(); + @Override + public CompletionStage<T> toCompletionStage() { + return completableFuture; + } /** * Returns a new KafkaFuture that, when this future completes normally, is executed with this * futures's result as the argument to the supplied function. */ @Override public <R> KafkaFuture<R> thenApply(BaseFunction<T, R> function) { - KafkaFutureImpl<R> future = new KafkaFutureImpl<>(); - addWaiter(new Applicant<>(function, future)); - return future; + CompletableFuture<R> completableFuture = this.completableFuture.thenApply((java.util.function.Function<? super T, ? extends R>) value -> { + try { + return function.apply(value); + } catch (Throwable t) { + if (t instanceof CompletionException) { + throw new CompletionException(t); + } else { + throw t; + } + } + }); + return new KafkaFutureImpl<>(true, toKafkaCompletableFuture(completableFuture)); } - public <R> void copyWith(KafkaFuture<R> future, BaseFunction<R, T> function) { - KafkaFutureImpl<R> futureImpl = (KafkaFutureImpl<R>) future; - futureImpl.addWaiter(new Applicant<>(function, this)); + private <U> KafkaCompletableFuture<U> toKafkaCompletableFuture(CompletableFuture<U> tCompletableFuture) { + KafkaCompletableFuture<U> res; + if (tCompletableFuture instanceof KafkaCompletableFuture) { + res = (KafkaCompletableFuture<U>) tCompletableFuture; Review comment: nit: We could omit `res` and return directly in the two places below. ########## File path: clients/src/main/java/org/apache/kafka/common/KafkaFuture.java ########## @@ -98,14 +80,45 @@ private void maybeComplete() { * an exception, which one gets returned is arbitrarily chosen. */ public static KafkaFuture<Void> allOf(KafkaFuture<?>... futures) { - KafkaFuture<Void> allOfFuture = new KafkaFutureImpl<>(); - AllOfAdapter<Object> allOfWaiter = new AllOfAdapter<>(futures.length, allOfFuture); - for (KafkaFuture<?> future : futures) { - future.addWaiter(allOfWaiter); - } - return allOfFuture; + KafkaFutureImpl<Void> result = new KafkaFutureImpl<>(); + CompletableFuture.allOf(Arrays.stream(futures) + .map(kafkaFuture -> { + // Safe since KafkaFuture's only subclass is KafkaFuture for which toCompletionStage() + // always return a CF. Review comment: nit: I would replace `CF` by `CompletableFuture`. ########## File path: clients/src/main/java/org/apache/kafka/common/internals/KafkaFutureImpl.java ########## @@ -27,217 +28,107 @@ /** * A flexible future which supports call chaining and other asynchronous programming patterns. - * This will eventually become a thin shim on top of Java 8's CompletableFuture. */ public class KafkaFutureImpl<T> extends KafkaFuture<T> { - /** - * A convenience method that throws the current exception, wrapping it if needed. - * - * In general, KafkaFuture throws CancellationException and InterruptedException directly, and - * wraps all other exceptions in an ExecutionException. - */ - private static void wrapAndThrow(Throwable t) throws InterruptedException, ExecutionException { - if (t instanceof CancellationException) { - throw (CancellationException) t; - } else if (t instanceof InterruptedException) { - throw (InterruptedException) t; - } else { - throw new ExecutionException(t); - } - } - private static class Applicant<A, B> implements BiConsumer<A, Throwable> { - private final BaseFunction<A, B> function; - private final KafkaFutureImpl<B> future; + private final KafkaCompletableFuture<T> completableFuture; - Applicant(BaseFunction<A, B> function, KafkaFutureImpl<B> future) { - this.function = function; - this.future = future; - } + private final boolean isDependant; - @Override - public void accept(A a, Throwable exception) { - if (exception != null) { - future.completeExceptionally(exception); - } else { - try { - B b = function.apply(a); - future.complete(b); - } catch (Throwable t) { - future.completeExceptionally(t); - } - } - } + public KafkaFutureImpl() { + this(false, new KafkaCompletableFuture<>()); } - private static class SingleWaiter<R> implements BiConsumer<R, Throwable> { - private R value = null; - private Throwable exception = null; - private boolean done = false; - - @Override - public synchronized void accept(R newValue, Throwable newException) { - this.value = newValue; - this.exception = newException; - this.done = true; - this.notifyAll(); - } - - synchronized R await() throws InterruptedException, ExecutionException { - while (true) { - if (exception != null) - wrapAndThrow(exception); - if (done) - return value; - this.wait(); - } - } - - R await(long timeout, TimeUnit unit) - throws InterruptedException, ExecutionException, TimeoutException { - long startMs = System.currentTimeMillis(); - long waitTimeMs = unit.toMillis(timeout); - long delta = 0; - synchronized (this) { - while (true) { - if (exception != null) - wrapAndThrow(exception); - if (done) - return value; - if (delta >= waitTimeMs) { - throw new TimeoutException(); - } - this.wait(waitTimeMs - delta); - delta = System.currentTimeMillis() - startMs; - } - } - } + public KafkaFutureImpl(KafkaCompletableFuture<T> completableFuture) { + this(false, completableFuture); } - /** - * True if this future is done. - */ - private boolean done = false; - - /** - * The value of this future, or null. Protected by the object monitor. - */ - private T value = null; - - /** - * The exception associated with this future, or null. Protected by the object monitor. - */ - private Throwable exception = null; + private KafkaFutureImpl(boolean isDependant, KafkaCompletableFuture<T> completableFuture) { + this.isDependant = isDependant; + this.completableFuture = completableFuture; + } - /** - * A list of objects waiting for this future to complete (either successfully or - * exceptionally). Protected by the object monitor. - */ - private List<BiConsumer<? super T, ? super Throwable>> waiters = new ArrayList<>(); + @Override + public CompletionStage<T> toCompletionStage() { + return completableFuture; + } /** * Returns a new KafkaFuture that, when this future completes normally, is executed with this * futures's result as the argument to the supplied function. */ @Override public <R> KafkaFuture<R> thenApply(BaseFunction<T, R> function) { - KafkaFutureImpl<R> future = new KafkaFutureImpl<>(); - addWaiter(new Applicant<>(function, future)); - return future; + CompletableFuture<R> completableFuture = this.completableFuture.thenApply((java.util.function.Function<? super T, ? extends R>) value -> { Review comment: nit: Do we really need to specify `(java.util.function.Function<? super T, ? extends R>)` here? ########## File path: clients/src/main/java/org/apache/kafka/common/internals/KafkaFutureImpl.java ########## @@ -247,17 +138,32 @@ public boolean completeExceptionally(Throwable newException) { */ @Override public synchronized boolean cancel(boolean mayInterruptIfRunning) { - return completeExceptionally(new CancellationException()) || exception instanceof CancellationException; + return completableFuture.cancel(mayInterruptIfRunning); + } + + // We need to deal with differences between KF's historic API and the API of CF: + // CF#get() does not wrap CancellationException in ExecutionException (nor does KF). + // CF#get() always wraps the _cause_ of a CompletionException in ExecutionException (which KF does not). + // + // The semantics for KafkaFuture are that all exceptional completions of the future (via #completeExceptionally() or exceptions from dependants) + // manifest as ExecutionException, as observed via both get() and getNow(). + private void maybeRewrapAndThrow(Throwable cause) { + if (cause instanceof CancellationException) { + throw (CancellationException) cause; + } } /** * Waits if necessary for this future to complete, and then returns its result. */ @Override public T get() throws InterruptedException, ExecutionException { - SingleWaiter<T> waiter = new SingleWaiter<>(); - addWaiter(waiter); - return waiter.await(); + try { + return completableFuture.get(); + } catch (ExecutionException e) { + maybeRewrapAndThrow(e.getCause()); + throw e; Review comment: nit: Would it make sense to move `throw e` into `maybeRewrapAndThrow` to let `maybeRewrapAndThrow` throw in both cases? More generally, I wonder if we could handle all the case in `maybeRewrapAndThrow` and use it everywhere. ########## File path: clients/src/main/java/org/apache/kafka/common/internals/KafkaFutureImpl.java ########## @@ -27,217 +28,107 @@ /** * A flexible future which supports call chaining and other asynchronous programming patterns. - * This will eventually become a thin shim on top of Java 8's CompletableFuture. */ public class KafkaFutureImpl<T> extends KafkaFuture<T> { - /** - * A convenience method that throws the current exception, wrapping it if needed. - * - * In general, KafkaFuture throws CancellationException and InterruptedException directly, and - * wraps all other exceptions in an ExecutionException. - */ - private static void wrapAndThrow(Throwable t) throws InterruptedException, ExecutionException { - if (t instanceof CancellationException) { - throw (CancellationException) t; - } else if (t instanceof InterruptedException) { - throw (InterruptedException) t; - } else { - throw new ExecutionException(t); - } - } - private static class Applicant<A, B> implements BiConsumer<A, Throwable> { - private final BaseFunction<A, B> function; - private final KafkaFutureImpl<B> future; + private final KafkaCompletableFuture<T> completableFuture; - Applicant(BaseFunction<A, B> function, KafkaFutureImpl<B> future) { - this.function = function; - this.future = future; - } + private final boolean isDependant; - @Override - public void accept(A a, Throwable exception) { - if (exception != null) { - future.completeExceptionally(exception); - } else { - try { - B b = function.apply(a); - future.complete(b); - } catch (Throwable t) { - future.completeExceptionally(t); - } - } - } + public KafkaFutureImpl() { + this(false, new KafkaCompletableFuture<>()); } - private static class SingleWaiter<R> implements BiConsumer<R, Throwable> { - private R value = null; - private Throwable exception = null; - private boolean done = false; - - @Override - public synchronized void accept(R newValue, Throwable newException) { - this.value = newValue; - this.exception = newException; - this.done = true; - this.notifyAll(); - } - - synchronized R await() throws InterruptedException, ExecutionException { - while (true) { - if (exception != null) - wrapAndThrow(exception); - if (done) - return value; - this.wait(); - } - } - - R await(long timeout, TimeUnit unit) - throws InterruptedException, ExecutionException, TimeoutException { - long startMs = System.currentTimeMillis(); - long waitTimeMs = unit.toMillis(timeout); - long delta = 0; - synchronized (this) { - while (true) { - if (exception != null) - wrapAndThrow(exception); - if (done) - return value; - if (delta >= waitTimeMs) { - throw new TimeoutException(); - } - this.wait(waitTimeMs - delta); - delta = System.currentTimeMillis() - startMs; - } - } - } + public KafkaFutureImpl(KafkaCompletableFuture<T> completableFuture) { + this(false, completableFuture); } - /** - * True if this future is done. - */ - private boolean done = false; - - /** - * The value of this future, or null. Protected by the object monitor. - */ - private T value = null; - - /** - * The exception associated with this future, or null. Protected by the object monitor. - */ - private Throwable exception = null; + private KafkaFutureImpl(boolean isDependant, KafkaCompletableFuture<T> completableFuture) { + this.isDependant = isDependant; + this.completableFuture = completableFuture; + } - /** - * A list of objects waiting for this future to complete (either successfully or - * exceptionally). Protected by the object monitor. - */ - private List<BiConsumer<? super T, ? super Throwable>> waiters = new ArrayList<>(); + @Override + public CompletionStage<T> toCompletionStage() { + return completableFuture; + } /** * Returns a new KafkaFuture that, when this future completes normally, is executed with this * futures's result as the argument to the supplied function. */ @Override public <R> KafkaFuture<R> thenApply(BaseFunction<T, R> function) { - KafkaFutureImpl<R> future = new KafkaFutureImpl<>(); - addWaiter(new Applicant<>(function, future)); - return future; + CompletableFuture<R> completableFuture = this.completableFuture.thenApply((java.util.function.Function<? super T, ? extends R>) value -> { + try { + return function.apply(value); + } catch (Throwable t) { + if (t instanceof CompletionException) { + throw new CompletionException(t); Review comment: I am not sure that I follow why we wrap it here. Could you elaborate? ########## File path: clients/src/main/java/org/apache/kafka/common/internals/KafkaFutureImpl.java ########## @@ -247,17 +138,32 @@ public boolean completeExceptionally(Throwable newException) { */ @Override public synchronized boolean cancel(boolean mayInterruptIfRunning) { - return completeExceptionally(new CancellationException()) || exception instanceof CancellationException; + return completableFuture.cancel(mayInterruptIfRunning); + } + + // We need to deal with differences between KF's historic API and the API of CF: + // CF#get() does not wrap CancellationException in ExecutionException (nor does KF). + // CF#get() always wraps the _cause_ of a CompletionException in ExecutionException (which KF does not). + // + // The semantics for KafkaFuture are that all exceptional completions of the future (via #completeExceptionally() or exceptions from dependants) + // manifest as ExecutionException, as observed via both get() and getNow(). Review comment: nit: Could we turn this into a javadoc or perhaps move it into the method? ########## File path: clients/src/main/java/org/apache/kafka/common/internals/KafkaFutureImpl.java ########## @@ -27,217 +28,107 @@ /** * A flexible future which supports call chaining and other asynchronous programming patterns. - * This will eventually become a thin shim on top of Java 8's CompletableFuture. */ public class KafkaFutureImpl<T> extends KafkaFuture<T> { - /** - * A convenience method that throws the current exception, wrapping it if needed. - * - * In general, KafkaFuture throws CancellationException and InterruptedException directly, and - * wraps all other exceptions in an ExecutionException. - */ - private static void wrapAndThrow(Throwable t) throws InterruptedException, ExecutionException { - if (t instanceof CancellationException) { - throw (CancellationException) t; - } else if (t instanceof InterruptedException) { - throw (InterruptedException) t; - } else { - throw new ExecutionException(t); - } - } - private static class Applicant<A, B> implements BiConsumer<A, Throwable> { - private final BaseFunction<A, B> function; - private final KafkaFutureImpl<B> future; + private final KafkaCompletableFuture<T> completableFuture; - Applicant(BaseFunction<A, B> function, KafkaFutureImpl<B> future) { - this.function = function; - this.future = future; - } + private final boolean isDependant; - @Override - public void accept(A a, Throwable exception) { - if (exception != null) { - future.completeExceptionally(exception); - } else { - try { - B b = function.apply(a); - future.complete(b); - } catch (Throwable t) { - future.completeExceptionally(t); - } - } - } + public KafkaFutureImpl() { + this(false, new KafkaCompletableFuture<>()); } - private static class SingleWaiter<R> implements BiConsumer<R, Throwable> { - private R value = null; - private Throwable exception = null; - private boolean done = false; - - @Override - public synchronized void accept(R newValue, Throwable newException) { - this.value = newValue; - this.exception = newException; - this.done = true; - this.notifyAll(); - } - - synchronized R await() throws InterruptedException, ExecutionException { - while (true) { - if (exception != null) - wrapAndThrow(exception); - if (done) - return value; - this.wait(); - } - } - - R await(long timeout, TimeUnit unit) - throws InterruptedException, ExecutionException, TimeoutException { - long startMs = System.currentTimeMillis(); - long waitTimeMs = unit.toMillis(timeout); - long delta = 0; - synchronized (this) { - while (true) { - if (exception != null) - wrapAndThrow(exception); - if (done) - return value; - if (delta >= waitTimeMs) { - throw new TimeoutException(); - } - this.wait(waitTimeMs - delta); - delta = System.currentTimeMillis() - startMs; - } - } - } + public KafkaFutureImpl(KafkaCompletableFuture<T> completableFuture) { + this(false, completableFuture); } - /** - * True if this future is done. - */ - private boolean done = false; - - /** - * The value of this future, or null. Protected by the object monitor. - */ - private T value = null; - - /** - * The exception associated with this future, or null. Protected by the object monitor. - */ - private Throwable exception = null; + private KafkaFutureImpl(boolean isDependant, KafkaCompletableFuture<T> completableFuture) { + this.isDependant = isDependant; + this.completableFuture = completableFuture; + } - /** - * A list of objects waiting for this future to complete (either successfully or - * exceptionally). Protected by the object monitor. - */ - private List<BiConsumer<? super T, ? super Throwable>> waiters = new ArrayList<>(); + @Override + public CompletionStage<T> toCompletionStage() { + return completableFuture; + } /** * Returns a new KafkaFuture that, when this future completes normally, is executed with this * futures's result as the argument to the supplied function. */ @Override public <R> KafkaFuture<R> thenApply(BaseFunction<T, R> function) { - KafkaFutureImpl<R> future = new KafkaFutureImpl<>(); - addWaiter(new Applicant<>(function, future)); - return future; + CompletableFuture<R> completableFuture = this.completableFuture.thenApply((java.util.function.Function<? super T, ? extends R>) value -> { + try { + return function.apply(value); + } catch (Throwable t) { + if (t instanceof CompletionException) { + throw new CompletionException(t); + } else { + throw t; + } + } + }); + return new KafkaFutureImpl<>(true, toKafkaCompletableFuture(completableFuture)); } - public <R> void copyWith(KafkaFuture<R> future, BaseFunction<R, T> function) { - KafkaFutureImpl<R> futureImpl = (KafkaFutureImpl<R>) future; - futureImpl.addWaiter(new Applicant<>(function, this)); + private <U> KafkaCompletableFuture<U> toKafkaCompletableFuture(CompletableFuture<U> tCompletableFuture) { + KafkaCompletableFuture<U> res; + if (tCompletableFuture instanceof KafkaCompletableFuture) { + res = (KafkaCompletableFuture<U>) tCompletableFuture; + } else { + KafkaCompletableFuture<U> result = new KafkaCompletableFuture<>(); + tCompletableFuture.whenComplete((x, y) -> { + if (y != null) { + result.kafkaCompleteExceptionally(y); + } else { + result.kafkaComplete(x); + } + }); + res = result; + } + return res; } /** * @see KafkaFutureImpl#thenApply(BaseFunction) + * @deprecated Since Kafka 3.0. */ + @Deprecated @Override public <R> KafkaFuture<R> thenApply(Function<T, R> function) { return thenApply((BaseFunction<T, R>) function); } - private static class WhenCompleteBiConsumer<T> implements BiConsumer<T, Throwable> { - private final KafkaFutureImpl<T> future; - private final BiConsumer<? super T, ? super Throwable> biConsumer; - - WhenCompleteBiConsumer(KafkaFutureImpl<T> future, BiConsumer<? super T, ? super Throwable> biConsumer) { - this.future = future; - this.biConsumer = biConsumer; - } - - @Override - public void accept(T val, Throwable exception) { + @Override + public KafkaFuture<T> whenComplete(final BiConsumer<? super T, ? super Throwable> biConsumer) { + CompletableFuture<T> tCompletableFuture = completableFuture.whenComplete((java.util.function.BiConsumer<? super T, ? super Throwable>) (a, b) -> { try { - if (exception != null) { - biConsumer.accept(null, exception); + biConsumer.accept(a, b); + } catch (Throwable t) { + if (t instanceof CompletionException) { + throw new CompletionException(t); } else { - biConsumer.accept(val, null); + throw t; } - } catch (Throwable e) { - if (exception == null) { - exception = e; - } - } - if (exception != null) { - future.completeExceptionally(exception); - } else { - future.complete(val); } - } + }); + return new KafkaFutureImpl<>(true, toKafkaCompletableFuture(tCompletableFuture)); } - @Override - public KafkaFuture<T> whenComplete(final BiConsumer<? super T, ? super Throwable> biConsumer) { - final KafkaFutureImpl<T> future = new KafkaFutureImpl<>(); - addWaiter(new WhenCompleteBiConsumer<>(future, biConsumer)); - return future; - } - - protected synchronized void addWaiter(BiConsumer<? super T, ? super Throwable> action) { - if (exception != null) { - action.accept(null, exception); - } else if (done) { - action.accept(value, null); - } else { - waiters.add(action); - } - } @Override public synchronized boolean complete(T newValue) { - List<BiConsumer<? super T, ? super Throwable>> oldWaiters; - synchronized (this) { - if (done) - return false; - value = newValue; - done = true; - oldWaiters = waiters; - waiters = null; - } - for (BiConsumer<? super T, ? super Throwable> waiter : oldWaiters) { - waiter.accept(newValue, null); - } - return true; + return completableFuture.kafkaComplete(newValue); } @Override public boolean completeExceptionally(Throwable newException) { - List<BiConsumer<? super T, ? super Throwable>> oldWaiters; - synchronized (this) { - if (done) - return false; - exception = newException; - done = true; - oldWaiters = waiters; - waiters = null; - } - for (BiConsumer<? super T, ? super Throwable> waiter : oldWaiters) { - waiter.accept(null, newException); - } - return true; + // CF#get() always wraps the _cause_ of a CompletionException in EE (which KF does not) Review comment: nit: I would rather use the full name instead of using acronyms. ########## File path: clients/src/main/java/org/apache/kafka/common/internals/KafkaFutureImpl.java ########## @@ -267,50 +173,77 @@ public T get() throws InterruptedException, ExecutionException { @Override public T get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException { - SingleWaiter<T> waiter = new SingleWaiter<>(); - addWaiter(waiter); - return waiter.await(timeout, unit); + try { + return completableFuture.get(timeout, unit); + } catch (ExecutionException e) { + maybeRewrapAndThrow(e.getCause()); + throw e; + } } /** * Returns the result value (or throws any encountered exception) if completed, else returns * the given valueIfAbsent. */ @Override - public synchronized T getNow(T valueIfAbsent) throws InterruptedException, ExecutionException { - if (exception != null) - wrapAndThrow(exception); - if (done) - return value; - return valueIfAbsent; + public synchronized T getNow(T valueIfAbsent) throws ExecutionException { + try { + return completableFuture.getNow(valueIfAbsent); + } catch (CompletionException e) { + maybeRewrapAndThrow(e.getCause()); + // Note, unlike CF#get() which throws ExecutionException, CF#getNow() throws CompletionException + // thus needs rewrapping to conform to KafkaFuture API, where KF#getNow() throws ExecutionException. + throw new ExecutionException(e.getCause()); + } } /** * Returns true if this CompletableFuture was cancelled before it completed normally. */ @Override public synchronized boolean isCancelled() { - return exception instanceof CancellationException; + if (isDependant) { + Throwable exception; + try { + completableFuture.getNow(null); + return false; + } catch (Exception e) { + exception = e; + } + return exception instanceof CompletionException + && exception.getCause() instanceof CancellationException; Review comment: I am curious here. Would it be possible to let the `CancellationException` propagate from the upstream future to this one when the upstream future is cancelled? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org