dajac commented on code in PR #14680:
URL: https://github.com/apache/kafka/pull/14680#discussion_r1380330986


##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/PrototypeAsyncConsumer.java:
##########
@@ -1096,4 +1114,47 @@ private void subscribeInternal(Collection<String> topics, Optional<ConsumerRebal
         }
     }
 
+    private void maybeThrowFencedInstanceException() {
+        if (fencedInstance.get()) {
+            throw new FencedInstanceIdException("Get fenced exception for group.instance.id " +
+                groupInstanceId.orElse("null"));
+        }
+    }
+
+    // Visible for testing
+    void maybeInvokeCallbacks() {
+        if (callbacks() > 0) {
+            invoker.executeCallbacks();
+        }
+    }
+
+    // Visible for testing
+    int callbacks() {
+        return invoker.callbackQueue.size();
+    }
+
+    /**
+     * Utility class that helps the application thread to invoke user registered callbacks such as
+     * {@link OffsetCommitCallback}.  This is achieved by having the background thread to register a runnable to the
+     * invoker upon the future completion, and execute the callbacks when user polls the consumer.
+     */
+    private static class CallbackInvoker {
+        // Thread-safe queue to store callbacks
+        private final BlockingQueue<Runnable> callbackQueue = new LinkedBlockingQueue<>();

Review Comment:
   I also wonder if we really need this new queue. Could we rely on the 
existing queue between the background and the foreground threads? I haven't 
looked into it myself, but I wonder...
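
   For reference, the hand-off described in the Javadoc above could be sketched roughly as below. This is only an illustration and not the code in the PR: the class name `CallbackInvokerSketch` and the `submit` method are hypothetical, and a plain `CompletableFuture` stands in for the commit future. The background thread enqueues a runnable when the future completes, and the application thread drains the queue when it polls.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical stand-in for the CallbackInvoker in the diff; not the PR's code.
public class CallbackInvokerSketch {
    // Thread-safe queue: the background thread enqueues, the application thread drains.
    private final BlockingQueue<Runnable> callbackQueue = new LinkedBlockingQueue<>();

    // Assumed helper: called from the background thread once a future completes.
    public void submit(Runnable callback) {
        callbackQueue.add(callback);
    }

    // Called from the application thread, e.g. at the start of poll().
    // Returns how many callbacks were run.
    public int executeCallbacks() {
        List<Runnable> drained = new ArrayList<>();
        callbackQueue.drainTo(drained);
        drained.forEach(Runnable::run);
        return drained.size();
    }

    public static void main(String[] args) throws InterruptedException {
        CallbackInvokerSketch invoker = new CallbackInvokerSketch();
        List<String> invokedOn = new ArrayList<>();

        // Stand-in for an async commit future completed by the background thread.
        CompletableFuture<Void> commit = new CompletableFuture<>();
        commit.whenComplete((result, error) ->
            invoker.submit(() -> invokedOn.add(Thread.currentThread().getName())));

        Thread background = new Thread(() -> commit.complete(null), "background");
        background.start();
        background.join();

        // The user callback only runs here, on the thread that calls executeCallbacks().
        invoker.executeCallbacks();
        System.out.println("callback ran on: " + invokedOn);
    }
}
```

   If the existing background-to-foreground queue can carry an equivalent "run this callback" event, the separate `callbackQueue` above would be redundant, which is the question raised in this comment.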



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

