chia7712 commented on a change in pull request #9878:
URL: https://github.com/apache/kafka/pull/9878#discussion_r663136853



##########
File path: clients/src/main/java/org/apache/kafka/common/KafkaFuture.java
##########
@@ -19,75 +19,57 @@
 import org.apache.kafka.common.annotation.InterfaceStability;
 import org.apache.kafka.common.internals.KafkaFutureImpl;
 
+import java.util.Arrays;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionStage;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 
 /**
- * A flexible future which supports call chaining and other asynchronous 
programming patterns. This will
- * eventually become a thin shim on top of Java 8's CompletableFuture.
+ * A flexible future which supports call chaining and other asynchronous 
programming patterns.
  *
- * The API for this class is still evolving and we may break compatibility in 
minor releases, if necessary.
+ * <h3>Relation to {@code CompletionStage}</h3>
+ * <p>It is possible to obtain a {@code CompletionStage} from a
+ * {@code KafkaFuture} instance by calling {@link #toCompletionStage()}.
+ * If converting {@link KafkaFuture#whenComplete(BiConsumer)} or {@link 
KafkaFuture#thenApply(BaseFunction)} to
+ * {@link CompletableFuture#whenComplete(java.util.function.BiConsumer)} or
+ * {@link CompletableFuture#thenApply(java.util.function.Function)} be aware 
that the returned
+ * {@code KafkaFuture} will fail with an {@code ExecutionException}, whereas a 
{@code CompletionStage} fails
+ * with a {@code CompletionException}.
  */
 @InterfaceStability.Evolving

Review comment:
       Should we remove this annotation.
   
   KIP-707 has following description.
   
   ```
   Remove the @InterfaceStability.Evolving annotation on KafkaFuture to reflect 
the reality that changing this class incompatibly would cause of too much user 
code to break.
   ```
   

##########
File path: 
clients/src/main/java/org/apache/kafka/common/internals/KafkaFutureImpl.java
##########
@@ -267,50 +180,82 @@ public T get() throws InterruptedException, 
ExecutionException {
     @Override
     public T get(long timeout, TimeUnit unit) throws InterruptedException, 
ExecutionException,
             TimeoutException {
-        SingleWaiter<T> waiter = new SingleWaiter<>();
-        addWaiter(waiter);
-        return waiter.await(timeout, unit);
+        try {
+            return completableFuture.get(timeout, unit);
+        } catch (ExecutionException e) {
+            maybeThrowCancellationException(e.getCause());
+            throw e;
+        }
     }
 
     /**
      * Returns the result value (or throws any encountered exception) if 
completed, else returns
      * the given valueIfAbsent.
      */
     @Override
-    public synchronized T getNow(T valueIfAbsent) throws InterruptedException, 
ExecutionException {
-        if (exception != null)
-            wrapAndThrow(exception);
-        if (done)
-            return value;
-        return valueIfAbsent;
+    public synchronized T getNow(T valueIfAbsent) throws ExecutionException {

Review comment:
       Does it need to remove `InterruptedException` from other get methods 
also? For another, why we don't remove `InterruptedException` from 
`KafkaFuture#getNow`?

##########
File path: 
clients/src/main/java/org/apache/kafka/common/internals/KafkaCompletableFuture.java
##########
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.internals;
+
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Supplier;
+
+/**
+ * This internal class exists because CompletableFuture exposes complete(), 
completeExceptionally() and
+ * other methods which would allow erroneous completion by user code of a 
KafkaFuture returned from a
+ * Kafka API to a client application.
+ * @param <T> The type of the future value.
+ */
+public class KafkaCompletableFuture<T> extends CompletableFuture<T> {
+
+    boolean kafkaComplete(T value) {
+        return super.complete(value);
+    }
+
+    boolean kafkaCompleteExceptionally(Throwable throwable) {
+        return super.completeExceptionally(throwable);
+    }
+
+    @Override
+    public boolean complete(T value) {
+        throw new UnsupportedOperationException();

Review comment:
       How about adding some comments to this exception?

##########
File path: 
clients/src/main/java/org/apache/kafka/common/internals/KafkaFutureImpl.java
##########
@@ -247,17 +142,35 @@ public boolean completeExceptionally(Throwable 
newException) {
      */
     @Override
     public synchronized boolean cancel(boolean mayInterruptIfRunning) {

Review comment:
       Do we still need `synchronized`? It seems to me `CompletableFuture` can 
deal with thread issue.

##########
File path: 
clients/src/main/java/org/apache/kafka/common/internals/KafkaCompletableFuture.java
##########
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.internals;
+
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Supplier;
+
+/**
+ * This internal class exists because CompletableFuture exposes complete(), 
completeExceptionally() and
+ * other methods which would allow erroneous completion by user code of a 
KafkaFuture returned from a
+ * Kafka API to a client application.
+ * @param <T> The type of the future value.
+ */
+public class KafkaCompletableFuture<T> extends CompletableFuture<T> {

Review comment:
       Why not throwing exception when users call `toCompletableFuture`? If 
users want to use blocking method (ex. `get()`), they ought to call 
`KafkaFuture#get` instead. What is the benefit if we expose 
`toCompletableFuture`? 
   

##########
File path: 
clients/src/main/java/org/apache/kafka/common/internals/KafkaFutureImpl.java
##########
@@ -267,50 +180,82 @@ public T get() throws InterruptedException, 
ExecutionException {
     @Override
     public T get(long timeout, TimeUnit unit) throws InterruptedException, 
ExecutionException,
             TimeoutException {
-        SingleWaiter<T> waiter = new SingleWaiter<>();
-        addWaiter(waiter);
-        return waiter.await(timeout, unit);
+        try {
+            return completableFuture.get(timeout, unit);
+        } catch (ExecutionException e) {
+            maybeThrowCancellationException(e.getCause());
+            throw e;
+        }
     }
 
     /**
      * Returns the result value (or throws any encountered exception) if 
completed, else returns
      * the given valueIfAbsent.
      */
     @Override
-    public synchronized T getNow(T valueIfAbsent) throws InterruptedException, 
ExecutionException {
-        if (exception != null)
-            wrapAndThrow(exception);
-        if (done)
-            return value;
-        return valueIfAbsent;
+    public synchronized T getNow(T valueIfAbsent) throws ExecutionException {
+        try {
+            return completableFuture.getNow(valueIfAbsent);
+        } catch (CompletionException e) {
+            maybeThrowCancellationException(e.getCause());
+            // Note, unlike CompletableFuture#get() which throws 
ExecutionException, CompletableFuture#getNow()
+            // throws CompletionException, thus needs rewrapping to conform to 
KafkaFuture API,
+            // where KafkaFuture#getNow() throws ExecutionException.
+            throw new ExecutionException(e.getCause());
+        }
     }
 
     /**
      * Returns true if this CompletableFuture was cancelled before it 
completed normally.
      */
     @Override
     public synchronized boolean isCancelled() {
-        return exception instanceof CancellationException;
+        if (isDependant) {
+            // Having isCancelled() for a dependent future just return
+            // CompletableFuture.isCancelled() would break the historical 
KafkaFuture behaviour because
+            // CompleteableFuture#isCancelled() just checks for the exception 
being CancellationException
+            // whereas it will be a CompletionException wrapping a 
CancellationException
+            // due needing to compensate for CompleteableFuture's 
CompletionException unwrapping

Review comment:
       typo: CompleteableFuture -> CompletableFuture

##########
File path: 
clients/src/main/java/org/apache/kafka/common/internals/KafkaFutureImpl.java
##########
@@ -267,50 +180,82 @@ public T get() throws InterruptedException, 
ExecutionException {
     @Override
     public T get(long timeout, TimeUnit unit) throws InterruptedException, 
ExecutionException,
             TimeoutException {
-        SingleWaiter<T> waiter = new SingleWaiter<>();
-        addWaiter(waiter);
-        return waiter.await(timeout, unit);
+        try {
+            return completableFuture.get(timeout, unit);
+        } catch (ExecutionException e) {
+            maybeThrowCancellationException(e.getCause());
+            throw e;
+        }
     }
 
     /**
      * Returns the result value (or throws any encountered exception) if 
completed, else returns
      * the given valueIfAbsent.
      */
     @Override
-    public synchronized T getNow(T valueIfAbsent) throws InterruptedException, 
ExecutionException {
-        if (exception != null)
-            wrapAndThrow(exception);
-        if (done)
-            return value;
-        return valueIfAbsent;
+    public synchronized T getNow(T valueIfAbsent) throws ExecutionException {
+        try {
+            return completableFuture.getNow(valueIfAbsent);
+        } catch (CompletionException e) {
+            maybeThrowCancellationException(e.getCause());
+            // Note, unlike CompletableFuture#get() which throws 
ExecutionException, CompletableFuture#getNow()
+            // throws CompletionException, thus needs rewrapping to conform to 
KafkaFuture API,
+            // where KafkaFuture#getNow() throws ExecutionException.
+            throw new ExecutionException(e.getCause());
+        }
     }
 
     /**
      * Returns true if this CompletableFuture was cancelled before it 
completed normally.
      */
     @Override
     public synchronized boolean isCancelled() {
-        return exception instanceof CancellationException;
+        if (isDependant) {
+            // Having isCancelled() for a dependent future just return
+            // CompletableFuture.isCancelled() would break the historical 
KafkaFuture behaviour because
+            // CompleteableFuture#isCancelled() just checks for the exception 
being CancellationException

Review comment:
       typo: CompleteableFuture -> CompletableFuture

##########
File path: 
clients/src/main/java/org/apache/kafka/common/internals/KafkaCompletableFuture.java
##########
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.internals;
+
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Supplier;
+
+/**
+ * This internal class exists because CompletableFuture exposes complete(), 
completeExceptionally() and
+ * other methods which would allow erroneous completion by user code of a 
KafkaFuture returned from a
+ * Kafka API to a client application.
+ * @param <T> The type of the future value.
+ */
+public class KafkaCompletableFuture<T> extends CompletableFuture<T> {
+
+    boolean kafkaComplete(T value) {

Review comment:
       why it needs prefix "kafka"? 




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to