chia7712 commented on a change in pull request #9878:
URL: https://github.com/apache/kafka/pull/9878#discussion_r663755587



##########
File path: 
clients/src/main/java/org/apache/kafka/common/internals/KafkaCompletableFuture.java
##########
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.internals;
+
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Supplier;
+
+/**
+ * This internal class exists because CompletableFuture exposes complete(), completeExceptionally() and
+ * other methods which would allow erroneous completion by user code of a KafkaFuture returned from a
+ * Kafka API to a client application.
+ * @param <T> The type of the future value.
+ */
+public class KafkaCompletableFuture<T> extends CompletableFuture<T> {

Review comment:
       > allow interop with 3rd party APIs which might require it
   
   fair enough :)

##########
File path: 
clients/src/main/java/org/apache/kafka/common/internals/KafkaCompletableFuture.java
##########
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.internals;
+
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Supplier;
+
+/**
+ * This internal class exists because CompletableFuture exposes complete(), completeExceptionally() and
+ * other methods which would allow erroneous completion by user code of a KafkaFuture returned from a
+ * Kafka API to a client application.
+ * @param <T> The type of the future value.
+ */
+public class KafkaCompletableFuture<T> extends CompletableFuture<T> {
+
+    boolean kafkaComplete(T value) {

Review comment:
       > The idea is that all Kafka code knows it's using a KafkaCompletableFuture and knows to complete it via the kafka*() methods.
   
   That is a good point. Could you add this as a code comment?
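
A minimal sketch of the convention described in the quoted reply, assuming the public completion methods are overridden to throw while internal code calls package-private kafka*() methods instead. Only `kafkaComplete` appears in the diff above; `GuardedFuture`, `kafkaCompleteExceptionally`, the choice of `UnsupportedOperationException`, and the demo class are illustrative assumptions, not the PR's code.

```java
import java.util.concurrent.CompletableFuture;

/**
 * Hypothetical stand-in for the class under review: a CompletableFuture whose
 * public completion methods are rejected, so only code that knows about the
 * kafka*() methods can complete it.
 */
class GuardedFuture<T> extends CompletableFuture<T> {

    // Internal completion path: delegates to the real CompletableFuture logic.
    boolean kafkaComplete(T value) {
        return super.complete(value);
    }

    // Assumed counterpart for failures; this name is not shown in the diff.
    boolean kafkaCompleteExceptionally(Throwable cause) {
        return super.completeExceptionally(cause);
    }

    // Public completion path: rejected, because user code should not complete
    // a future that was handed out by the library.
    @Override
    public boolean complete(T value) {
        throw new UnsupportedOperationException("complete() is reserved for internal code");
    }

    @Override
    public boolean completeExceptionally(Throwable cause) {
        throw new UnsupportedOperationException("completeExceptionally() is reserved for internal code");
    }
}

class GuardedFutureDemo {

    // What a caller sees when it tries to complete the future itself.
    static String userCompletionOutcome() {
        GuardedFuture<String> future = new GuardedFuture<>();
        try {
            future.complete("from user code");
            return "accepted";
        } catch (UnsupportedOperationException e) {
            return "rejected";
        }
    }

    // What internal code sees when it uses the kafka*() path.
    static String internalCompletionOutcome() {
        GuardedFuture<String> future = new GuardedFuture<>();
        future.kafkaComplete("ok");
        return future.join();
    }

    public static void main(String[] args) {
        System.out.println(userCompletionOutcome());
        System.out.println(internalCompletionOutcome());
    }
}
```

The guard is a convention rather than an enforcement mechanism: anything in the same package can still call the kafka*() methods, which is why a comment documenting the rule is useful.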

##########
File path: 
clients/src/main/java/org/apache/kafka/common/internals/KafkaFutureImpl.java
##########
@@ -267,50 +180,82 @@ public T get() throws InterruptedException, ExecutionException {
     @Override
     public T get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException,
             TimeoutException {
-        SingleWaiter<T> waiter = new SingleWaiter<>();
-        addWaiter(waiter);
-        return waiter.await(timeout, unit);
+        try {
+            return completableFuture.get(timeout, unit);
+        } catch (ExecutionException e) {
+            maybeThrowCancellationException(e.getCause());
+            throw e;
+        }
     }
 
     /**
      * Returns the result value (or throws any encountered exception) if completed, else returns
      * the given valueIfAbsent.
      */
     @Override
-    public synchronized T getNow(T valueIfAbsent) throws InterruptedException, ExecutionException {
-        if (exception != null)
-            wrapAndThrow(exception);
-        if (done)
-            return value;
-        return valueIfAbsent;
+    public synchronized T getNow(T valueIfAbsent) throws ExecutionException {

Review comment:
       > But this wasn't described in the KIP and wouldn't be a source compatible change (existing code with a catch (InterruptedException)
   
   Since this can break compatibility, we should deprecate the method and add a new one (for example, get(T value)) to replace it. That can be discussed in a separate issue :)
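
The deprecate-then-replace approach suggested here could look roughly like the sketch below. It assumes the old `getNow` keeps its full `throws` clause so that existing `catch (InterruptedException)` blocks still compile, and that a new method carries the narrower clause. The name `get(T valueIfAbsent)` follows the reviewer's example; `SketchFuture`, `FixedFuture`, and `DeprecationDemo` are hypothetical and not part of Kafka.

```java
import java.util.concurrent.ExecutionException;

/** Hypothetical future type used only to illustrate the migration path. */
abstract class SketchFuture<T> {

    /**
     * Old signature, left unchanged so callers that catch InterruptedException
     * keep compiling. It simply delegates to the replacement.
     */
    @Deprecated
    public T getNow(T valueIfAbsent) throws InterruptedException, ExecutionException {
        return get(valueIfAbsent);
    }

    /** Replacement with the narrower throws clause (name taken from the review comment). */
    public abstract T get(T valueIfAbsent) throws ExecutionException;
}

/** Trivial implementation: either holds a value or is still pending. */
final class FixedFuture<T> extends SketchFuture<T> {
    private final T value;
    private final boolean done;

    FixedFuture(T value, boolean done) {
        this.value = value;
        this.done = done;
    }

    @Override
    public T get(T valueIfAbsent) {
        return done ? value : valueIfAbsent;
    }
}

class DeprecationDemo {

    // Existing caller code: the catch (InterruptedException) block is only legal
    // because getNow still declares that exception.
    @SuppressWarnings("deprecation")
    static String legacyCall(SketchFuture<String> future) {
        try {
            return future.getNow("absent");
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return "interrupted";
        } catch (ExecutionException e) {
            return "failed";
        }
    }

    // Migrated caller code: only ExecutionException needs handling.
    static String newCall(SketchFuture<String> future) {
        try {
            return future.get("absent");
        } catch (ExecutionException e) {
            return "failed";
        }
    }

    public static void main(String[] args) {
        System.out.println(legacyCall(new FixedFuture<>("value", true)));
        System.out.println(newCall(new FixedFuture<>("value", false)));
    }
}
```

If `InterruptedException` were removed from `getNow` directly, `legacyCall` would stop compiling, because Java rejects a catch block for a checked exception that the try body can no longer throw.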

##########
File path: clients/src/test/java/org/apache/kafka/common/KafkaFutureTest.java
##########
@@ -17,68 +17,265 @@
 package org.apache.kafka.common;
 
 import org.apache.kafka.common.internals.KafkaFutureImpl;
+import org.apache.kafka.common.utils.Java;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.Timeout;
 
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
 import java.util.ArrayList;
 import java.util.List;
+import java.util.concurrent.CancellationException;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
+import java.util.concurrent.CompletionStage;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
+import java.util.function.Supplier;
 
+import static org.junit.jupiter.api.Assertions.assertNotEquals;
 import static org.junit.jupiter.api.Assertions.assertNull;
 import static org.junit.jupiter.api.Assertions.assertThrows;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 import static org.junit.jupiter.api.Assertions.assertFalse;
 import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.fail;
 
 /**
  * A unit test for KafkaFuture.
  */
 @Timeout(120)
 public class KafkaFutureTest {
 
+    /** Asserts that the given future is done, didn't fail and wasn't cancelled. */
+    private void assertIsSuccessful(KafkaFuture<?> future) {
+        assertTrue(future.isDone());
+        assertFalse(future.isCompletedExceptionally());
+        assertFalse(future.isCancelled());
+    }
+
+    /** Asserts that the given future is done, failed and wasn't cancelled. */
+    private void assertIsFailed(KafkaFuture<?> future) {
+        assertTrue(future.isDone());
+        assertFalse(future.isCancelled());
+        assertTrue(future.isCompletedExceptionally());
+    }
+
+    /** Asserts that the given future is done, didn't fail and was cancelled. */
+    private void assertIsCancelled(KafkaFuture<?> future) {
+        assertTrue(future.isDone());
+        assertTrue(future.isCancelled());
+        assertTrue(future.isCompletedExceptionally());
+        assertThrows(CancellationException.class, () -> future.getNow(null));
+        assertThrows(CancellationException.class, () -> future.get(0, TimeUnit.MILLISECONDS));
+    }
+
+    private <T> void awaitAndAssertResult(KafkaFuture<T> future,
+                                          T expectedResult,
+                                          T alternativeValue) {
+        assertNotEquals(expectedResult, alternativeValue);
+        try {
+            assertEquals(expectedResult, future.get(5, TimeUnit.MINUTES));
+        } catch (Exception e) {
+            throw new AssertionError("Unexpected exception", e);
+        }
+        try {
+            assertEquals(expectedResult, future.get());
+        } catch (Exception e) {
+            throw new AssertionError("Unexpected exception", e);
+        }
+        try {
+            assertEquals(expectedResult, future.getNow(alternativeValue));
+        } catch (Exception e) {
+            throw new AssertionError("Unexpected exception", e);
+        }
+    }
+
+    private Throwable awaitAndAssertFailure(KafkaFuture<?> future,
+                                            Class<? extends Throwable> expectedException,
+                                            String expectedMessage) {
+        try {
+            future.get(5, TimeUnit.MINUTES);
+            fail("Expected an exception");
+        } catch (ExecutionException e) {
+            assertEquals(expectedException, e.getCause().getClass());
+            assertEquals(expectedMessage, e.getCause().getMessage());
+        } catch (Exception e) {
+            throw new AssertionError("Unexpected exception", e);
+        }
+        try {
+            future.get();
+            fail("Expected an exception");
+        } catch (ExecutionException e) {
+            assertEquals(expectedException, e.getCause().getClass());
+            assertEquals(expectedMessage, e.getCause().getMessage());
+        } catch (Exception e) {
+            throw new AssertionError("Unexpected exception", e);
+        }
+        try {
+            future.getNow(null);
+            fail("Expected an exception");
+        } catch (ExecutionException e) {
+            assertEquals(expectedException, e.getCause().getClass());
+            assertEquals(expectedMessage, e.getCause().getMessage());
+            return e.getCause();
+        } catch (Exception e) {
+            throw new AssertionError("Unexpected exception", e);
+        }
+        throw new AssertionError("Unexpected lack of exception");
+    }
+
+
+    private void awaitAndAssertCancelled(KafkaFuture<?> future, String expectedMessage) {
+        try {
+            future.get(5, TimeUnit.MINUTES);
+            fail("Expected an exception");
+        } catch (CancellationException e) {
+            assertEquals(CancellationException.class, e.getClass());
+            assertEquals(expectedMessage, e.getMessage());
+        } catch (Exception e) {
+            throw new AssertionError("Unexpected exception", e);
+        }
+        try {
+            future.get();
+            fail("Expected an exception");
+        } catch (CancellationException e) {
+            assertEquals(CancellationException.class, e.getClass());
+            assertEquals(expectedMessage, e.getMessage());
+        } catch (Exception e) {
+            throw new AssertionError("Unexpected exception", e);
+        }
+        try {
+            future.getNow(null);

Review comment:
       How about using the following style?
   
   ```java
           ExecutionException e = assertThrows(ExecutionException.class, () -> future.getNow(null));
           assertEquals(expectedException, e.getCause().getClass());
           assertEquals(expectedMessage, e.getCause().getMessage());
           return e.getCause();
   ```

##########
File path: clients/src/main/java/org/apache/kafka/common/KafkaFuture.java
##########
@@ -184,7 +194,7 @@ public abstract T get(long timeout, TimeUnit unit) throws InterruptedException,
      * Returns the result value (or throws any encountered exception) if completed, else returns
      * the given valueIfAbsent.
      */
-    public abstract T getNow(T valueIfAbsent) throws InterruptedException, ExecutionException;
+    public abstract T getNow(T valueIfAbsent) throws ExecutionException;

Review comment:
       Could you please revert this change, since you mentioned that it can cause compatibility issues?

##########
File path: 
clients/src/main/java/org/apache/kafka/common/internals/KafkaFutureImpl.java
##########
@@ -27,217 +28,111 @@
 
 /**
  * A flexible future which supports call chaining and other asynchronous programming patterns.
- * This will eventually become a thin shim on top of Java 8's CompletableFuture.
  */
 public class KafkaFutureImpl<T> extends KafkaFuture<T> {
-    /**
-     * A convenience method that throws the current exception, wrapping it if needed.
-     *
-     * In general, KafkaFuture throws CancellationException and InterruptedException directly, and
-     * wraps all other exceptions in an ExecutionException.
-     */
-    private static void wrapAndThrow(Throwable t) throws InterruptedException, ExecutionException {
-        if (t instanceof CancellationException) {
-            throw (CancellationException) t;
-        } else if (t instanceof InterruptedException) {
-            throw (InterruptedException) t;
-        } else {
-            throw new ExecutionException(t);
-        }
-    }
 
-    private static class Applicant<A, B> implements BiConsumer<A, Throwable> {
-        private final BaseFunction<A, B> function;
-        private final KafkaFutureImpl<B> future;
+    private final KafkaCompletableFuture<T> completableFuture;
 
-        Applicant(BaseFunction<A, B> function, KafkaFutureImpl<B> future) {
-            this.function = function;
-            this.future = future;
-        }
+    private final boolean isDependant;
 
-        @Override
-        public void accept(A a, Throwable exception) {
-            if (exception != null) {
-                future.completeExceptionally(exception);
-            } else {
-                try {
-                    B b = function.apply(a);
-                    future.complete(b);
-                } catch (Throwable t) {
-                    future.completeExceptionally(t);
-                }
-            }
-        }
+    public KafkaFutureImpl() {
+        this(false, new KafkaCompletableFuture<>());
     }
 
-    private static class SingleWaiter<R> implements BiConsumer<R, Throwable> {
-        private R value = null;
-        private Throwable exception = null;
-        private boolean done = false;
-
-        @Override
-        public synchronized void accept(R newValue, Throwable newException) {
-            this.value = newValue;
-            this.exception = newException;
-            this.done = true;
-            this.notifyAll();
-        }
-
-        synchronized R await() throws InterruptedException, ExecutionException {
-            while (true) {
-                if (exception != null)
-                    wrapAndThrow(exception);
-                if (done)
-                    return value;
-                this.wait();
-            }
-        }
-
-        R await(long timeout, TimeUnit unit)
-                throws InterruptedException, ExecutionException, TimeoutException {
-            long startMs = System.currentTimeMillis();
-            long waitTimeMs = unit.toMillis(timeout);
-            long delta = 0;
-            synchronized (this) {
-                while (true) {
-                    if (exception != null)
-                        wrapAndThrow(exception);
-                    if (done)
-                        return value;
-                    if (delta >= waitTimeMs) {
-                        throw new TimeoutException();
-                    }
-                    this.wait(waitTimeMs - delta);
-                    delta = System.currentTimeMillis() - startMs;
-                }
-            }
-        }
+    public KafkaFutureImpl(KafkaCompletableFuture<T> completableFuture) {

Review comment:
       Is this method still useful?

##########
File path: 
clients/src/main/java/org/apache/kafka/common/internals/KafkaFutureImpl.java
##########
@@ -267,50 +180,82 @@ public T get() throws InterruptedException, ExecutionException {
     @Override
     public T get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException,
             TimeoutException {
-        SingleWaiter<T> waiter = new SingleWaiter<>();
-        addWaiter(waiter);
-        return waiter.await(timeout, unit);
+        try {
+            return completableFuture.get(timeout, unit);
+        } catch (ExecutionException e) {
+            maybeThrowCancellationException(e.getCause());
+            throw e;
+        }
     }
 
     /**
      * Returns the result value (or throws any encountered exception) if completed, else returns
      * the given valueIfAbsent.
      */
     @Override
-    public synchronized T getNow(T valueIfAbsent) throws InterruptedException, ExecutionException {
-        if (exception != null)
-            wrapAndThrow(exception);
-        if (done)
-            return value;
-        return valueIfAbsent;
+    public synchronized T getNow(T valueIfAbsent) throws ExecutionException {

Review comment:
       > A new method also means their code is then not binary compatible with older client versions.
   
   Yep, binary compatibility could be broken. What I really care about is source compatibility. It seems to me that a public API should stay source compatible until the next major release :)
   
   This patch is good to go, and we need more time (rather than vespene gas ... StarCraft is my favorite game :) ) to reach consensus.
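
For context on the `get(long, TimeUnit)` hunk quoted with this comment, here is a small sketch of why the cause of an `ExecutionException` is inspected before rethrowing. When a `CompletableFuture` is cancelled, a stage derived from it via `thenApply` reports the cancellation wrapped in an `ExecutionException` rather than throwing `CancellationException` directly. The body of `maybeThrowCancellationException` is not shown in the diff, so the version below is an assumption about its behaviour, and `CancellationUnwrapSketch` is a hypothetical name.

```java
import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

class CancellationUnwrapSketch {

    // Assumed behaviour of the helper named in the diff: rethrow the cause
    // directly when it is a CancellationException, otherwise do nothing.
    static void maybeThrowCancellationException(Throwable cause) {
        if (cause instanceof CancellationException) {
            throw (CancellationException) cause;
        }
    }

    // Mirrors the shape of the new get(long, TimeUnit) body in the hunk.
    static <T> T get(CompletableFuture<T> future, long timeout, TimeUnit unit)
            throws InterruptedException, ExecutionException, TimeoutException {
        try {
            return future.get(timeout, unit);
        } catch (ExecutionException e) {
            maybeThrowCancellationException(e.getCause());
            throw e;
        }
    }

    // Builds a dependent stage, cancels its source, and returns it.
    static CompletableFuture<String> dependentOfCancelledSource() {
        CompletableFuture<String> source = new CompletableFuture<>();
        CompletableFuture<String> dependent = source.thenApply(String::trim);
        source.cancel(true);
        return dependent;
    }

    // Plain CompletableFuture.get: the cancellation arrives wrapped.
    static String rawOutcome() {
        try {
            dependentOfCancelledSource().get(0, TimeUnit.MILLISECONDS);
            return "completed";
        } catch (CancellationException e) {
            return "cancelled";
        } catch (ExecutionException e) {
            return "wrapped:" + e.getCause().getClass().getSimpleName();
        } catch (InterruptedException | TimeoutException e) {
            return "other";
        }
    }

    // With the unwrapping step: the caller sees CancellationException directly.
    static String unwrappedOutcome() {
        try {
            get(dependentOfCancelledSource(), 0, TimeUnit.MILLISECONDS);
            return "completed";
        } catch (CancellationException e) {
            return "cancelled";
        } catch (ExecutionException e) {
            return "wrapped:" + e.getCause().getClass().getSimpleName();
        } catch (InterruptedException | TimeoutException e) {
            return "other";
        }
    }

    public static void main(String[] args) {
        System.out.println(rawOutcome());
        System.out.println(unwrappedOutcome());
    }
}
```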




