kirktrue commented on code in PR #14680: URL: https://github.com/apache/kafka/pull/14680#discussion_r1380596565
########## clients/src/main/java/org/apache/kafka/clients/consumer/internals/PrototypeAsyncConsumer.java: ########## @@ -381,20 +394,33 @@ public void commitAsync(OffsetCommitCallback callback) { @Override public void commitAsync(Map<TopicPartition, OffsetAndMetadata> offsets, OffsetCommitCallback callback) { CompletableFuture<Void> future = commit(offsets, false); - final OffsetCommitCallback commitCallback = callback == null ? new DefaultOffsetCommitCallback() : callback; future.whenComplete((r, t) -> { - if (t != null) { - commitCallback.onComplete(offsets, new KafkaException(t)); - } else { - commitCallback.onComplete(offsets, null); + if (callback == null) { + if (t != null) { + log.error("Offset commit with offsets {} failed", offsets, t); + } + return; } - }).exceptionally(e -> { - throw new KafkaException(e); + + Runnable task = () -> { + if (t instanceof RetriableException) { + callback.onComplete(offsets, new RetriableCommitFailedException(t)); + } else if (t instanceof FencedInstanceIdException) { + fencedInstance.set(true); + callback.onComplete(offsets, (Exception) t); + } else if (t != null) { + callback.onComplete(offsets, (Exception) t); + } else { + callback.onComplete(offsets, null); + } + }; + invoker.submit(task); // Store the callback task for future execution }); Review Comment: The code passed to `whenComplete` will be run on the background thread. Therefore, it's "acceptable" (IMO) to enqueue a new event onto the background event queue. Then, when it's later run in the background event processor, the event processing should trigger the code you have here. ########## clients/src/main/java/org/apache/kafka/clients/consumer/internals/PrototypeAsyncConsumer.java: ########## @@ -326,6 +337,8 @@ public PrototypeAsyncConsumer(LogContext logContext, */ @Override public ConsumerRecords<K, V> poll(final Duration timeout) { + maybeInvokeCallbacks(); + maybeThrowFencedInstanceException(); Review Comment: Ideally, these checks should be added to the logic in `BackgroundEventProcessor`. Right now, `BackgroundEventProcessor` is a separate class, which makes it a bit awkward, since you need access to the consumer's private instance variables. In a PR that I have in review, the `BackgroundEventProcessor` is an inner class of the consumer, making getting and setting the instance variable values trivial. ########## clients/src/main/java/org/apache/kafka/clients/consumer/internals/PrototypeAsyncConsumer.java: ########## @@ -147,6 +152,10 @@ public class PrototypeAsyncConsumer<K, V> implements Consumer<K, V> { // to keep from repeatedly scanning subscriptions in poll(), cache the result during metadata updates private boolean cachedSubscriptionHasAllFetchPositions; private final WakeupTrigger wakeupTrigger = new WakeupTrigger(); + private final AtomicBoolean fencedInstance = new AtomicBoolean(false); Review Comment: ```suggestion private volatile boolean isFenced = false; ``` Any reason this needs to be anything other than a `boolean`? ########## clients/src/test/java/org/apache/kafka/clients/consumer/internals/PrototypeAsyncConsumerTest.java: ########## @@ -168,6 +200,45 @@ public void testCommitted_ExceptionThrown() { } } + @Test + public void testEnsureCallbackExecutedByApplicationThread() { + final String currentThread = Thread.currentThread().getName(); + ExecutorService backgroundExecutor = Executors.newSingleThreadExecutor(); + MockCommitCallback callback = new MockCommitCallback(); + CountDownLatch latch = new CountDownLatch(1); // Initialize the latch with a count of 1 + try { + CompletableFuture<Void> future = new CompletableFuture<>(); + doReturn(future).when(consumer).commit(new HashMap<>(), false); + assertDoesNotThrow(() -> consumer.commitAsync(new HashMap<>(), callback)); + // Simulating some background work + backgroundExecutor.submit(() -> { + future.complete(null); + latch.countDown(); + }); + latch.await(); + assertEquals(1, consumer.callbacks()); + consumer.maybeInvokeCallbacks(); + assertEquals(currentThread, callback.completionThread); + } catch (Exception e) { + fail("Not expecting an exception"); + } finally { + backgroundExecutor.shutdown(); + } + } + + private static class MockCommitCallback implements OffsetCommitCallback { + public int invoked = 0; + public Exception exception = null; + public String completionThread; + + @Override + public void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets, Exception exception) { + System.out.println("invoke"); Review Comment: We should remove the `System.out.println()` from committed code. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org