lucasbru commented on code in PR #14680:
URL: https://github.com/apache/kafka/pull/14680#discussion_r1387628672


##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/PrototypeAsyncConsumer.java:
##########
@@ -278,7 +286,8 @@ public PrototypeAsyncConsumer(final Time time,
         }
     }
 
-    public PrototypeAsyncConsumer(LogContext logContext,
+    // Visible for testing
+    PrototypeAsyncConsumer(LogContext logContext,

Review Comment:
   You will have to fix the indentation of the other parameters.



##########
core/src/test/scala/integration/kafka/api/BaseAsyncConsumerTest.scala:
##########
@@ -16,7 +16,7 @@
  */
 package kafka.api
 
-import kafka.utils.TestUtils.waitUntilTrue
+import kafka.utils.TestUtils.{waitUntilTrue}

Review Comment:
   Do we need curly brackets?



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/PrototypeAsyncConsumer.java:
##########
@@ -381,21 +391,28 @@ public void commitAsync(OffsetCommitCallback callback) {
     @Override
     public void commitAsync(Map<TopicPartition, OffsetAndMetadata> offsets, 
OffsetCommitCallback callback) {
         CompletableFuture<Void> future = commit(offsets, false);
-        final OffsetCommitCallback commitCallback = callback == null ? new 
DefaultOffsetCommitCallback() : callback;
         future.whenComplete((r, t) -> {
-            if (t != null) {
-                commitCallback.onComplete(offsets, new KafkaException(t));
-            } else {
-                commitCallback.onComplete(offsets, null);
+            if (callback == null) {
+                if (t != null) {
+                    log.error("Offset commit with offsets {} failed", offsets, 
t);
+                }
+                return;
             }
-        }).exceptionally(e -> {
-            throw new KafkaException(e);
+
+            invoker.submit(new OffsetCommitCallbackTask(callback, offsets, 
(Exception) t));
         });
     }
 
     // Visible for testing
     CompletableFuture<Void> commit(Map<TopicPartition, OffsetAndMetadata> 
offsets, final boolean isWakeupable) {

Review Comment:
   nit: some parameters are `final` and some are not; please make this consistent.



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/PrototypeAsyncConsumer.java:
##########
@@ -1096,4 +1111,76 @@ private void subscribeInternal(Collection<String> 
topics, Optional<ConsumerRebal
         }
     }
 
-}
\ No newline at end of file
+    private void maybeThrowFencedInstanceException() {
+        if (isFenced) {
+            throw new FencedInstanceIdException("Get fenced exception for 
group.instance.id " +
+                groupInstanceId.orElse("null"));
+        }
+    }
+
+    // Visible for testing
+    void maybeInvokeCallbacks() {

Review Comment:
   nit: how about `maybeInvokeCommitCallbacks`?



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/PrototypeAsyncConsumer.java:
##########
@@ -1096,4 +1111,76 @@ private void subscribeInternal(Collection<String> 
topics, Optional<ConsumerRebal
         }
     }
 
-}
\ No newline at end of file
+    private void maybeThrowFencedInstanceException() {
+        if (isFenced) {
+            throw new FencedInstanceIdException("Get fenced exception for 
group.instance.id " +
+                groupInstanceId.orElse("null"));
+        }
+    }
+
+    // Visible for testing
+    void maybeInvokeCallbacks() {
+        if (callbacks() > 0) {
+            invoker.executeCallbacks();
+        }
+    }
+
+    // Visible for testing
+    int callbacks() {
+        return invoker.callbackQueue.size();
+    }
+
+    /**
+     * Utility class that helps the application thread to invoke user 
registered {@link OffsetCommitCallback}. This is
+     * achieved by having the background thread to register a {@link 
OffsetCommitCallbackTask} to the invoker upon the
+     * future completion, and execute the callbacks when user 
polls/commits/closes the consumer.
+     */
+    private class OffsetCommitCallbackInvoker {
+        // Thread-safe queue to store callbacks
+        private final BlockingQueue<OffsetCommitCallbackTask> callbackQueue = 
new LinkedBlockingQueue<>();
+
+        public void submit(final OffsetCommitCallbackTask callback) {
+            try {
+                callbackQueue.offer(callback);
+            } catch (Exception e) {
+                log.error("Failed to enqueue OffsetCommitCallback", e);
+            }
+        }
+
+        public void executeCallbacks() {
+            LinkedList<OffsetCommitCallbackTask> callbacks = new 
LinkedList<>();
+            callbackQueue.drainTo(callbacks);
+            while (!callbacks.isEmpty()) {
+                OffsetCommitCallbackTask callback = callbacks.poll();
+                if (callback != null) {
+                    callback.invoke();
+                }
+            }
+        }
+    }
+
+    private class OffsetCommitCallbackTask {

Review Comment:
   Should be a static class.
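
A minimal sketch of what "static" changes here, using hypothetical names (`StaticNestedSketch`, `CallbackTask`) rather than the PR's classes: a static nested class carries no implicit reference to an enclosing instance, so it can be created without one. This assumes the task only needs the values passed to its constructor; a nested class that uses instance fields of the outer class (a logger, for example) would need them passed in explicitly.

```java
import java.util.function.Consumer;

public class StaticNestedSketch {

    // Static nested class: no hidden reference to a StaticNestedSketch instance.
    // Everything it needs arrives through the constructor.
    static class CallbackTask {
        private final Consumer<Exception> callback;
        private final Exception error;

        CallbackTask(final Consumer<Exception> callback, final Exception error) {
            this.callback = callback;
            this.error = error;
        }

        // Hands the stored result (null meaning success) to the callback.
        void invoke() {
            callback.accept(error);
        }
    }

    public static void main(String[] args) {
        StringBuilder seen = new StringBuilder();
        // No outer instance is required to construct the task.
        CallbackTask task = new CallbackTask(e -> seen.append(e == null ? "ok" : "failed"), null);
        task.invoke();
        System.out.println(seen); // prints "ok"
    }
}
```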



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/PrototypeAsyncConsumer.java:
##########
@@ -712,6 +729,10 @@ private void close(Duration timeout, boolean 
swallowException) {
         if (applicationEventHandler != null)
             closeQuietly(() -> applicationEventHandler.close(timeout), "Failed 
to close application event handler with a timeout(ms)=" + timeout, 
firstException);
 
+        // Invoke all callbacks after the background thread exists in case if 
there are unsent async
+        // commits
+        maybeInvokeCallbacks();

Review Comment:
   Why do we not check for fenced exceptions here?
   
   The original consumer attempts to complete all in-flight async commit 
requests within `timeout` ms. Will closing the background thread take care of 
this?



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/PrototypeAsyncConsumer.java:
##########
@@ -147,6 +151,10 @@ public class PrototypeAsyncConsumer<K, V> implements 
Consumer<K, V> {
     // to keep from repeatedly scanning subscriptions in poll(), cache the 
result during metadata updates
     private boolean cachedSubscriptionHasAllFetchPositions;
     private final WakeupTrigger wakeupTrigger = new WakeupTrigger();
+    private volatile boolean isFenced = false;

Review Comment:
   I'm wondering why we need this member. It looks like the volatile boolean is 
written and read only from the application thread. Could it then just be a 
boolean? But if so, could we just throw the exception directly and avoid the 
indirection?



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/PrototypeAsyncConsumer.java:
##########
@@ -381,21 +391,28 @@ public void commitAsync(OffsetCommitCallback callback) {
     @Override
     public void commitAsync(Map<TopicPartition, OffsetAndMetadata> offsets, 
OffsetCommitCallback callback) {
         CompletableFuture<Void> future = commit(offsets, false);
-        final OffsetCommitCallback commitCallback = callback == null ? new 
DefaultOffsetCommitCallback() : callback;
         future.whenComplete((r, t) -> {
-            if (t != null) {
-                commitCallback.onComplete(offsets, new KafkaException(t));
-            } else {
-                commitCallback.onComplete(offsets, null);
+            if (callback == null) {
+                if (t != null) {
+                    log.error("Offset commit with offsets {} failed", offsets, 
t);
+                }
+                return;
             }
-        }).exceptionally(e -> {
-            throw new KafkaException(e);
+
+            invoker.submit(new OffsetCommitCallbackTask(callback, offsets, 
(Exception) t));
         });
     }
 
     // Visible for testing
     CompletableFuture<Void> commit(Map<TopicPartition, OffsetAndMetadata> 
offsets, final boolean isWakeupable) {
+        maybeThrowFencedInstanceException();
+        maybeInvokeCallbacks();
         maybeThrowInvalidGroupIdException();
+
+        if (offsets.isEmpty()) {
+            return CompletableFuture.completedFuture(null);
+        }

Review Comment:
   Maybe not related to this concrete PR, but the original commit code has some
   logic around updating the last seen leader epochs here. Can we omit it, or is
   it a follow-up task?



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/PrototypeAsyncConsumer.java:
##########
@@ -1096,4 +1111,76 @@ private void subscribeInternal(Collection<String> 
topics, Optional<ConsumerRebal
         }
     }
 
-}
\ No newline at end of file
+    private void maybeThrowFencedInstanceException() {
+        if (isFenced) {
+            throw new FencedInstanceIdException("Get fenced exception for 
group.instance.id " +
+                groupInstanceId.orElse("null"));
+        }
+    }
+
+    // Visible for testing
+    void maybeInvokeCallbacks() {
+        if (callbacks() > 0) {
+            invoker.executeCallbacks();
+        }
+    }
+
+    // Visible for testing
+    int callbacks() {
+        return invoker.callbackQueue.size();
+    }
+
+    /**
+     * Utility class that helps the application thread to invoke user 
registered {@link OffsetCommitCallback}. This is
+     * achieved by having the background thread to register a {@link 
OffsetCommitCallbackTask} to the invoker upon the
+     * future completion, and execute the callbacks when user 
polls/commits/closes the consumer.
+     */
+    private class OffsetCommitCallbackInvoker {
+        // Thread-safe queue to store callbacks
+        private final BlockingQueue<OffsetCommitCallbackTask> callbackQueue = 
new LinkedBlockingQueue<>();
+
+        public void submit(final OffsetCommitCallbackTask callback) {
+            try {
+                callbackQueue.offer(callback);
+            } catch (Exception e) {
+                log.error("Failed to enqueue OffsetCommitCallback", e);
+            }
+        }
+
+        public void executeCallbacks() {
+            LinkedList<OffsetCommitCallbackTask> callbacks = new 
LinkedList<>();

Review Comment:
   Why do we need to create a copy of the queue here? Are we concerned about
   consumer invocations inside the callback? If so, maybe put a comment here.
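
One possible rationale for the copy, sketched under assumptions: draining into a local list takes a fixed snapshot, so a callback that enqueues more work while it runs (for instance by triggering another async commit) does not extend the current pass. `DrainSnapshotSketch` and its methods are hypothetical and stand in for the invoker; this is not the PR's code.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

public class DrainSnapshotSketch {
    // Hypothetical stand-in for the invoker's callback queue.
    private final BlockingQueue<Runnable> queue = new LinkedBlockingQueue<>();

    void submit(final Runnable task) {
        queue.offer(task);
    }

    int pending() {
        return queue.size();
    }

    // Runs only the tasks that were queued before this call and returns how many ran.
    // Tasks enqueued by a running task stay in the queue for the next pass.
    int executeSnapshot() {
        List<Runnable> snapshot = new ArrayList<>();
        queue.drainTo(snapshot);
        for (Runnable task : snapshot) {
            task.run();
        }
        return snapshot.size();
    }

    public static void main(String[] args) {
        DrainSnapshotSketch invoker = new DrainSnapshotSketch();
        // The first task re-enters the invoker and enqueues a second task.
        invoker.submit(() -> invoker.submit(() -> { }));
        int ran = invoker.executeSnapshot();
        System.out.println("ran=" + ran + " pending=" + invoker.pending()); // ran=1 pending=1
    }
}
```

If re-entrancy is not the concern, polling the queue directly until it is empty would avoid the extra list.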



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/PrototypeAsyncConsumer.java:
##########
@@ -1096,4 +1111,76 @@ private void subscribeInternal(Collection<String> 
topics, Optional<ConsumerRebal
         }
     }
 
-}
\ No newline at end of file
+    private void maybeThrowFencedInstanceException() {
+        if (isFenced) {
+            throw new FencedInstanceIdException("Get fenced exception for 
group.instance.id " +
+                groupInstanceId.orElse("null"));
+        }
+    }
+
+    // Visible for testing
+    void maybeInvokeCallbacks() {
+        if (callbacks() > 0) {
+            invoker.executeCallbacks();
+        }
+    }
+
+    // Visible for testing
+    int callbacks() {
+        return invoker.callbackQueue.size();
+    }
+
+    /**
+     * Utility class that helps the application thread to invoke user 
registered {@link OffsetCommitCallback}. This is
+     * achieved by having the background thread to register a {@link 
OffsetCommitCallbackTask} to the invoker upon the

Review Comment:
   ```suggestion
        * achieved by having the background thread register a {@link 
OffsetCommitCallbackTask} to the invoker upon the
   ```



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/PrototypeAsyncConsumer.java:
##########
@@ -1010,15 +1031,9 @@ private void updateLastSeenEpochIfNewer(TopicPartition 
topicPartition, OffsetAnd
             offsetAndMetadata.leaderEpoch().ifPresent(epoch -> 
metadata.updateLastSeenEpochIfNewer(topicPartition, epoch));
     }
 
-    private class DefaultOffsetCommitCallback implements OffsetCommitCallback {
-        @Override
-        public void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets, 
Exception exception) {
-            if (exception != null)
-                log.error("Offset commit with offsets {} failed", offsets, 
exception);
-        }
-    }
-
     boolean updateAssignmentMetadataIfNeeded(Timer timer) {
+        maybeInvokeCallbacks();

Review Comment:
   Sometimes we check for the fencedInstance first, sometimes we invoke the 
callbacks first.
   
   I think in a previous version of the PR and in the original consumer we 
invoke at the beginning of `poll`. Why did you move this into 
`updateAssignmentMetadataIfNeeded`? 



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/PrototypeAsyncConsumer.java:
##########
@@ -1096,4 +1111,76 @@ private void subscribeInternal(Collection<String> 
topics, Optional<ConsumerRebal
         }
     }
 
-}
\ No newline at end of file
+    private void maybeThrowFencedInstanceException() {
+        if (isFenced) {
+            throw new FencedInstanceIdException("Get fenced exception for 
group.instance.id " +
+                groupInstanceId.orElse("null"));
+        }
+    }
+
+    // Visible for testing
+    void maybeInvokeCallbacks() {
+        if (callbacks() > 0) {
+            invoker.executeCallbacks();
+        }
+    }
+
+    // Visible for testing
+    int callbacks() {
+        return invoker.callbackQueue.size();
+    }
+
+    /**
+     * Utility class that helps the application thread to invoke user 
registered {@link OffsetCommitCallback}. This is
+     * achieved by having the background thread to register a {@link 
OffsetCommitCallbackTask} to the invoker upon the
+     * future completion, and execute the callbacks when user 
polls/commits/closes the consumer.
+     */
+    private class OffsetCommitCallbackInvoker {

Review Comment:
   Should be a static class.



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/PrototypeAsyncConsumer.java:
##########
@@ -1096,4 +1111,76 @@ private void subscribeInternal(Collection<String> 
topics, Optional<ConsumerRebal
         }
     }
 
-}
\ No newline at end of file
+    private void maybeThrowFencedInstanceException() {
+        if (isFenced) {
+            throw new FencedInstanceIdException("Get fenced exception for 
group.instance.id " +
+                groupInstanceId.orElse("null"));
+        }
+    }
+
+    // Visible for testing
+    void maybeInvokeCallbacks() {
+        if (callbacks() > 0) {
+            invoker.executeCallbacks();
+        }
+    }
+
+    // Visible for testing
+    int callbacks() {
+        return invoker.callbackQueue.size();
+    }
+
+    /**
+     * Utility class that helps the application thread to invoke user 
registered {@link OffsetCommitCallback}. This is
+     * achieved by having the background thread to register a {@link 
OffsetCommitCallbackTask} to the invoker upon the
+     * future completion, and execute the callbacks when user 
polls/commits/closes the consumer.
+     */
+    private class OffsetCommitCallbackInvoker {
+        // Thread-safe queue to store callbacks
+        private final BlockingQueue<OffsetCommitCallbackTask> callbackQueue = 
new LinkedBlockingQueue<>();
+
+        public void submit(final OffsetCommitCallbackTask callback) {
+            try {
+                callbackQueue.offer(callback);
+            } catch (Exception e) {
+                log.error("Failed to enqueue OffsetCommitCallback", e);

Review Comment:
   Why do we expect an exception inside offer?
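
For context, a small sketch of how `offer` behaves on a `LinkedBlockingQueue`: it reports a full queue by returning `false` rather than throwing, and a queue created with the no-argument constructor has a capacity of `Integer.MAX_VALUE`. The only exception `offer` documents is `NullPointerException` for a null element. The class and helper names below are hypothetical.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

public class OfferSemanticsSketch {

    // offer never blocks; a full queue is signalled by a false return value.
    static boolean tryOffer(final BlockingQueue<String> queue, final String element) {
        return queue.offer(element);
    }

    // Returns true if offering null raised a NullPointerException.
    static boolean offerNullThrows(final BlockingQueue<String> queue) {
        try {
            queue.offer(null);
            return false;
        } catch (NullPointerException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        BlockingQueue<String> unbounded = new LinkedBlockingQueue<>();
        BlockingQueue<String> bounded = new LinkedBlockingQueue<>(1);

        System.out.println(tryOffer(unbounded, "a"));   // true
        System.out.println(tryOffer(bounded, "a"));     // true
        System.out.println(tryOffer(bounded, "b"));     // false, the queue is full
        System.out.println(offerNullThrows(unbounded)); // true
    }
}
```

Under these semantics, checking the return value (or dropping the `try`/`catch`) may be enough, as long as the submitted task is never null.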



