chia7712 commented on a change in pull request #9878: URL: https://github.com/apache/kafka/pull/9878#discussion_r663936311
########## File path: clients/src/test/java/org/apache/kafka/common/KafkaFutureTest.java ########## @@ -17,68 +17,265 @@ package org.apache.kafka.common; import org.apache.kafka.common.internals.KafkaFutureImpl; +import org.apache.kafka.common.utils.Java; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.Timeout; +import java.lang.reflect.InvocationTargetException; +import java.lang.reflect.Method; import java.util.ArrayList; import java.util.List; +import java.util.concurrent.CancellationException; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionException; +import java.util.concurrent.CompletionStage; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; +import java.util.function.Supplier; +import static org.junit.jupiter.api.Assertions.assertNotEquals; import static org.junit.jupiter.api.Assertions.assertNull; import static org.junit.jupiter.api.Assertions.assertThrows; import static org.junit.jupiter.api.Assertions.assertTrue; import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.fail; /** * A unit test for KafkaFuture. */ @Timeout(120) public class KafkaFutureTest { + /** Asserts that the given future is done, didn't fail and wasn't cancelled. */ + private void assertIsSuccessful(KafkaFuture<?> future) { + assertTrue(future.isDone()); + assertFalse(future.isCompletedExceptionally()); + assertFalse(future.isCancelled()); + } + + /** Asserts that the given future is done, failed and wasn't cancelled. */ + private void assertIsFailed(KafkaFuture<?> future) { + assertTrue(future.isDone()); + assertFalse(future.isCancelled()); + assertTrue(future.isCompletedExceptionally()); + } + + /** Asserts that the given future is done, didn't fail and was cancelled. */ + private void assertIsCancelled(KafkaFuture<?> future) { + assertTrue(future.isDone()); + assertTrue(future.isCancelled()); + assertTrue(future.isCompletedExceptionally()); + assertThrows(CancellationException.class, () -> future.getNow(null)); + assertThrows(CancellationException.class, () -> future.get(0, TimeUnit.MILLISECONDS)); + } + + private <T> void awaitAndAssertResult(KafkaFuture<T> future, + T expectedResult, + T alternativeValue) { + assertNotEquals(expectedResult, alternativeValue); + try { + assertEquals(expectedResult, future.get(5, TimeUnit.MINUTES)); + } catch (Exception e) { + throw new AssertionError("Unexpected exception", e); + } + try { + assertEquals(expectedResult, future.get()); + } catch (Exception e) { + throw new AssertionError("Unexpected exception", e); + } + try { + assertEquals(expectedResult, future.getNow(alternativeValue)); + } catch (Exception e) { + throw new AssertionError("Unexpected exception", e); + } + } + + private Throwable awaitAndAssertFailure(KafkaFuture<?> future, + Class<? extends Throwable> expectedException, + String expectedMessage) { + try { + future.get(5, TimeUnit.MINUTES); + fail("Expected an exception"); + } catch (ExecutionException e) { + assertEquals(expectedException, e.getCause().getClass()); + assertEquals(expectedMessage, e.getCause().getMessage()); + } catch (Exception e) { + throw new AssertionError("Unexpected exception", e); + } + try { + future.get(); + fail("Expected an exception"); + } catch (ExecutionException e) { + assertEquals(expectedException, e.getCause().getClass()); + assertEquals(expectedMessage, e.getCause().getMessage()); + } catch (Exception e) { + throw new AssertionError("Unexpected exception", e); + } + try { + future.getNow(null); + fail("Expected an exception"); + } catch (ExecutionException e) { + assertEquals(expectedException, e.getCause().getClass()); + assertEquals(expectedMessage, e.getCause().getMessage()); + return e.getCause(); + } catch (Exception e) { + throw new AssertionError("Unexpected exception", e); + } + throw new AssertionError("Unexpected lack of exception"); + } + + + private void awaitAndAssertCancelled(KafkaFuture<?> future, String expectedMessage) { + try { + future.get(5, TimeUnit.MINUTES); + fail("Expected an exception"); + } catch (CancellationException e) { + assertEquals(CancellationException.class, e.getClass()); + assertEquals(expectedMessage, e.getMessage()); + } catch (Exception e) { + throw new AssertionError("Unexpected exception", e); + } + try { + future.get(); + fail("Expected an exception"); + } catch (CancellationException e) { + assertEquals(CancellationException.class, e.getClass()); + assertEquals(expectedMessage, e.getMessage()); + } catch (Exception e) { + throw new AssertionError("Unexpected exception", e); + } + try { + future.getNow(null); Review comment: How about using following style. ```java ExecutionException e = assertThrows(ExecutionException.class, () -> future.getNow(null)); assertEquals(expectedException, e.getCause().getClass()); assertEquals(expectedMessage, e.getCause().getMessage()); return e.getCause(); ``` ########## File path: clients/src/main/java/org/apache/kafka/common/KafkaFuture.java ########## @@ -184,7 +194,7 @@ public abstract T get(long timeout, TimeUnit unit) throws InterruptedException, * Returns the result value (or throws any encountered exception) if completed, else returns * the given valueIfAbsent. */ - public abstract T getNow(T valueIfAbsent) throws InterruptedException, ExecutionException; + public abstract T getNow(T valueIfAbsent) throws ExecutionException; Review comment: Could you please revert this change as you mentioned that it can cause compatible issue. ########## File path: clients/src/main/java/org/apache/kafka/common/internals/KafkaFutureImpl.java ########## @@ -27,217 +28,111 @@ /** * A flexible future which supports call chaining and other asynchronous programming patterns. - * This will eventually become a thin shim on top of Java 8's CompletableFuture. */ public class KafkaFutureImpl<T> extends KafkaFuture<T> { - /** - * A convenience method that throws the current exception, wrapping it if needed. - * - * In general, KafkaFuture throws CancellationException and InterruptedException directly, and - * wraps all other exceptions in an ExecutionException. - */ - private static void wrapAndThrow(Throwable t) throws InterruptedException, ExecutionException { - if (t instanceof CancellationException) { - throw (CancellationException) t; - } else if (t instanceof InterruptedException) { - throw (InterruptedException) t; - } else { - throw new ExecutionException(t); - } - } - private static class Applicant<A, B> implements BiConsumer<A, Throwable> { - private final BaseFunction<A, B> function; - private final KafkaFutureImpl<B> future; + private final KafkaCompletableFuture<T> completableFuture; - Applicant(BaseFunction<A, B> function, KafkaFutureImpl<B> future) { - this.function = function; - this.future = future; - } + private final boolean isDependant; - @Override - public void accept(A a, Throwable exception) { - if (exception != null) { - future.completeExceptionally(exception); - } else { - try { - B b = function.apply(a); - future.complete(b); - } catch (Throwable t) { - future.completeExceptionally(t); - } - } - } + public KafkaFutureImpl() { + this(false, new KafkaCompletableFuture<>()); } - private static class SingleWaiter<R> implements BiConsumer<R, Throwable> { - private R value = null; - private Throwable exception = null; - private boolean done = false; - - @Override - public synchronized void accept(R newValue, Throwable newException) { - this.value = newValue; - this.exception = newException; - this.done = true; - this.notifyAll(); - } - - synchronized R await() throws InterruptedException, ExecutionException { - while (true) { - if (exception != null) - wrapAndThrow(exception); - if (done) - return value; - this.wait(); - } - } - - R await(long timeout, TimeUnit unit) - throws InterruptedException, ExecutionException, TimeoutException { - long startMs = System.currentTimeMillis(); - long waitTimeMs = unit.toMillis(timeout); - long delta = 0; - synchronized (this) { - while (true) { - if (exception != null) - wrapAndThrow(exception); - if (done) - return value; - if (delta >= waitTimeMs) { - throw new TimeoutException(); - } - this.wait(waitTimeMs - delta); - delta = System.currentTimeMillis() - startMs; - } - } - } + public KafkaFutureImpl(KafkaCompletableFuture<T> completableFuture) { Review comment: Is this method still useful? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org