kirktrue commented on code in PR #14680:
URL: https://github.com/apache/kafka/pull/14680#discussion_r1380596565


##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/PrototypeAsyncConsumer.java:
##########
@@ -381,20 +394,33 @@ public void commitAsync(OffsetCommitCallback callback) {
     @Override
     public void commitAsync(Map<TopicPartition, OffsetAndMetadata> offsets, OffsetCommitCallback callback) {
         CompletableFuture<Void> future = commit(offsets, false);
-        final OffsetCommitCallback commitCallback = callback == null ? new DefaultOffsetCommitCallback() : callback;
         future.whenComplete((r, t) -> {
-            if (t != null) {
-                commitCallback.onComplete(offsets, new KafkaException(t));
-            } else {
-                commitCallback.onComplete(offsets, null);
+            if (callback == null) {
+                if (t != null) {
+                    log.error("Offset commit with offsets {} failed", offsets, t);
+                }
+                return;
             }
-        }).exceptionally(e -> {
-            throw new KafkaException(e);
+
+            Runnable task = () -> {
+                if (t instanceof RetriableException) {
+                    callback.onComplete(offsets, new RetriableCommitFailedException(t));
+                } else if (t instanceof FencedInstanceIdException) {
+                    fencedInstance.set(true);
+                    callback.onComplete(offsets, (Exception) t);
+                } else if (t != null) {
+                    callback.onComplete(offsets, (Exception) t);
+                } else {
+                    callback.onComplete(offsets, null);
+                }
+            };
+            invoker.submit(task);  // Store the callback task for future execution
         });

Review Comment:
   The code passed to `whenComplete` runs on the background thread, so it's 
"acceptable" (IMO) to enqueue a new event onto the background event queue from 
there. When the background event processor later handles that event, the 
event processing should trigger the code you have here.
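
To illustrate the flow described above, here is a minimal sketch. It assumes the background event queue is drained on the application thread (for example from `poll()`). The names `CallbackEventSketch`, `backgroundEventQueue`, and `processBackgroundEvents` are hypothetical stand-ins, not the actual Kafka classes or methods.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical stand-in for the consumer; none of these names are Kafka's real API.
class CallbackEventSketch {
    // Filled by the background thread, drained by the application thread.
    private final BlockingQueue<Runnable> backgroundEventQueue = new LinkedBlockingQueue<>();
    // Records which thread actually ran each callback; touched only by the draining thread.
    final List<String> callbackThreads = new ArrayList<>();

    // Called on the application thread; the future is completed later by the background thread.
    void commitAsync(CompletableFuture<Void> future) {
        future.whenComplete((r, t) ->
            // This lambda runs on the thread that completes the future, so it only
            // enqueues an event. The callback body itself runs when the event is processed.
            backgroundEventQueue.add(() -> callbackThreads.add(Thread.currentThread().getName())));
    }

    // Assumed to be called from poll() on the application thread.
    int processBackgroundEvents() {
        List<Runnable> events = new ArrayList<>();
        backgroundEventQueue.drainTo(events);
        events.forEach(Runnable::run);
        return events.size();
    }

    // Returns the name of the thread that ran the callback.
    static String demo() {
        CallbackEventSketch consumer = new CallbackEventSketch();
        CompletableFuture<Void> future = new CompletableFuture<>();
        consumer.commitAsync(future);

        // Simulate the background thread finishing the commit.
        Thread background = new Thread(() -> future.complete(null), "background");
        background.start();
        try {
            background.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }

        consumer.processBackgroundEvents();
        return consumer.callbackThreads.get(0);
    }

    public static void main(String[] args) {
        System.out.println("callback ran on: " + demo());
    }
}
```

In this sketch the `whenComplete` lambda does no user-visible work on the completing thread, which is the property the test in this PR checks for.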



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/PrototypeAsyncConsumer.java:
##########
@@ -326,6 +337,8 @@ public PrototypeAsyncConsumer(LogContext logContext,
      */
     @Override
     public ConsumerRecords<K, V> poll(final Duration timeout) {
+        maybeInvokeCallbacks();
+        maybeThrowFencedInstanceException();

Review Comment:
   Ideally, these checks should be added to the logic in 
`BackgroundEventProcessor`. Right now, `BackgroundEventProcessor` is a separate 
class, which makes this a bit awkward, since it needs access to the consumer's 
private instance variables. In a PR that I have in review, the 
`BackgroundEventProcessor` is an inner class of the consumer, which makes 
reading and writing those instance variables trivial.
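
A rough sketch of the inner-class arrangement mentioned above, under stated assumptions: the event names, the `String`-typed events, and the `InnerProcessorSketch` class are invented for illustration and are not taken from the other PR. `IllegalStateException` stands in for the fenced-instance exception.

```java
import java.util.ArrayDeque;
import java.util.Queue;

// Hypothetical consumer with an inner event processor; not the actual Kafka code.
class InnerProcessorSketch {
    private boolean fenced = false;       // private consumer state
    private int callbacksInvoked = 0;     // private consumer state
    // Single-threaded sketch; a queue shared with a background thread would need to be thread-safe.
    private final Queue<String> events = new ArrayDeque<>();
    private final BackgroundEventProcessor processor = new BackgroundEventProcessor();

    // A non-static inner class can read and write the enclosing instance's private fields directly.
    private class BackgroundEventProcessor {
        void process() {
            String event;
            while ((event = events.poll()) != null) {
                if (event.equals("FENCED")) {
                    fenced = true;
                } else if (event.equals("COMMIT_CALLBACK")) {
                    callbacksInvoked++;
                }
            }
        }
    }

    void enqueue(String event) {
        events.add(event);
    }

    // Stand-in for poll(): the checks live in the processor rather than being inlined here.
    String poll() {
        processor.process();
        if (fenced) {
            throw new IllegalStateException("instance fenced");
        }
        return "records";
    }

    int callbacksInvoked() {
        return callbacksInvoked;
    }
}
```

With a separate top-level processor class, the same logic would need setters or package-private fields on the consumer, which is the awkwardness noted above.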



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/PrototypeAsyncConsumer.java:
##########
@@ -147,6 +152,10 @@ public class PrototypeAsyncConsumer<K, V> implements Consumer<K, V> {
     // to keep from repeatedly scanning subscriptions in poll(), cache the result during metadata updates
     private boolean cachedSubscriptionHasAllFetchPositions;
     private final WakeupTrigger wakeupTrigger = new WakeupTrigger();
+    private final AtomicBoolean fencedInstance = new AtomicBoolean(false);

Review Comment:
   ```suggestion
       private volatile boolean isFenced = false;
   ```
   
   Any reason this needs to be anything other than a `boolean`?
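
As a sketch of why a plain `volatile boolean` may be enough here: the flag is only ever set to `true` by one thread and read by another, so visibility is needed but compare-and-set is not. The class and method names below are hypothetical, and `IllegalStateException` stands in for the exception the consumer would actually throw.

```java
// Hypothetical sketch; not the actual PrototypeAsyncConsumer code.
class FencedFlagSketch {
    // volatile guarantees that a write on one thread is visible to later reads on another.
    private volatile boolean isFenced = false;

    // Assumed to be called on the background thread when fencing is detected.
    void onFenced() {
        isFenced = true;
    }

    // Assumed to be called on the application thread, e.g. at the start of poll().
    void maybeThrowFencedInstanceException() {
        if (isFenced) {
            throw new IllegalStateException("instance fenced");
        }
    }

    // Returns true if the application thread observed the flag set by the background thread.
    static boolean demo() {
        FencedFlagSketch consumer = new FencedFlagSketch();
        Thread background = new Thread(consumer::onFenced, "background");
        background.start();
        try {
            background.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        try {
            consumer.maybeThrowFencedInstanceException();
            return false;
        } catch (IllegalStateException e) {
            return true;
        }
    }
}
```

An `AtomicBoolean` would become relevant only if the code needed an atomic read-modify-write such as `compareAndSet`, which the diff above does not use.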



##########
clients/src/test/java/org/apache/kafka/clients/consumer/internals/PrototypeAsyncConsumerTest.java:
##########
@@ -168,6 +200,45 @@ public void testCommitted_ExceptionThrown() {
         }
     }
 
+    @Test
+    public void testEnsureCallbackExecutedByApplicationThread() {
+        final String currentThread = Thread.currentThread().getName();
+        ExecutorService backgroundExecutor = Executors.newSingleThreadExecutor();
+        MockCommitCallback callback = new MockCommitCallback();
+        CountDownLatch latch = new CountDownLatch(1);  // Initialize the latch with a count of 1
+        try {
+            CompletableFuture<Void> future = new CompletableFuture<>();
+            doReturn(future).when(consumer).commit(new HashMap<>(), false);
+            assertDoesNotThrow(() -> consumer.commitAsync(new HashMap<>(), callback));
+            // Simulating some background work
+            backgroundExecutor.submit(() -> {
+                future.complete(null);
+                latch.countDown();
+            });
+            latch.await();
+            assertEquals(1, consumer.callbacks());
+            consumer.maybeInvokeCallbacks();
+            assertEquals(currentThread, callback.completionThread);
+        } catch (Exception e) {
+            fail("Not expecting an exception");
+        } finally {
+            backgroundExecutor.shutdown();
+        }
+    }
+
+    private static class MockCommitCallback implements OffsetCommitCallback {
+        public int invoked = 0;
+        public Exception exception = null;
+        public String completionThread;
+
+        @Override
+        public void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets, Exception exception) {
+            System.out.println("invoke");

Review Comment:
   We should remove the `System.out.println()` from committed code.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

