lucasbru commented on code in PR #15613:
URL: https://github.com/apache/kafka/pull/15613#discussion_r1560885552

##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java:
##########
@@ -1952,10 +1988,6 @@ private void maybeThrowFencedInstanceException() {
         }
     }
 
-    private void maybeInvokeCommitCallbacks() {
-        offsetCommitCallbackInvoker.executeCallbacks();
-    }
-

Review Comment:
To me, wrapping this one-liner in its own method obscures more than it helps, but if you insist, I can bring it back.

##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java:
##########
@@ -1388,6 +1393,37 @@ public void commitSync(Map<TopicPartition, OffsetAndMetadata> offsets, Duration
         }
     }
+    private void awaitPendingAsyncCommitsAndExecuteCommitCallbacks(Timer timer, boolean disableWakeup) {
+        if (lastPendingAsyncCommit == null) {
+            return;
+        }
+
+        try {
+            CompletableFuture<Void> futureToAwait;
+            if (!disableWakeup) {
+                // We don't want the wake-up trigger to complete our pending async commit future,
+                // so create new future here.
+                futureToAwait = new CompletableFuture<>();
+                lastPendingAsyncCommit.whenComplete((v, t) -> {
+                    if (t != null) {
+                        futureToAwait.completeExceptionally(t);
+                    } else {
+                        futureToAwait.complete(v);
+                    }
+                });
+                wakeupTrigger.setActiveTask(futureToAwait);
+            } else {
+                futureToAwait = lastPendingAsyncCommit;
+            }
+            ConsumerUtils.getResult(futureToAwait, timer);
+            lastPendingAsyncCommit = null;
+        } finally {
+            if (!disableWakeup) wakeupTrigger.clearTask();
+            timer.update();
+        }

Review Comment:
I think always clearing it in `finally` would mean that `lastPendingAsyncCommit` is cleared even if we timed out or were woken up while waiting for it. However, this brought up another issue: what happens when the async commit future completes exceptionally? We'd throw the exception here, but we shouldn't, because the error will be handled inside the future. So what we want here is to wait for the async commit to finish, without caring about its return value or exception.
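
To illustrate the idea, here is a rough sketch using plain JDK futures rather than the code in this PR. `PendingCommitAwaiter`, `setPending`, `hasPending`, and `awaitPending` are hypothetical names, the timeout is a plain millisecond value instead of Kafka's `Timer`, and wake-up handling is left out entirely. It waits on a derived future that never completes exceptionally, and clears the pending reference only once the commit has actually finished:

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical illustration only; this is not the AsyncKafkaConsumer implementation.
final class PendingCommitAwaiter {
    // Stand-in for the consumer's lastPendingAsyncCommit field.
    private CompletableFuture<Void> lastPendingAsyncCommit;

    void setPending(CompletableFuture<Void> commitFuture) {
        this.lastPendingAsyncCommit = commitFuture;
    }

    boolean hasPending() {
        return lastPendingAsyncCommit != null;
    }

    // Returns true once the pending commit has finished (successfully or not),
    // false if the timeout elapsed first.
    boolean awaitPending(long timeoutMs) throws InterruptedException {
        if (lastPendingAsyncCommit == null) {
            return true;
        }
        // handle() yields a new future that completes normally whether the commit
        // succeeded or failed, so the commit's own error is not rethrown here.
        CompletableFuture<Void> settled = lastPendingAsyncCommit.<Void>handle((v, t) -> null);
        try {
            settled.get(timeoutMs, TimeUnit.MILLISECONDS);
        } catch (TimeoutException e) {
            // Keep the reference so the next call waits for the same commit again.
            return false;
        } catch (ExecutionException e) {
            // Not expected: the function passed to handle() never throws.
            throw new IllegalStateException(e);
        }
        lastPendingAsyncCommit = null;
        return true;
    }

    public static void main(String[] args) throws InterruptedException {
        PendingCommitAwaiter awaiter = new PendingCommitAwaiter();
        CompletableFuture<Void> commit = new CompletableFuture<>();
        awaiter.setPending(commit);
        // Commit still in flight: the wait times out and the reference is kept.
        System.out.println(awaiter.awaitPending(10));
        commit.completeExceptionally(new RuntimeException("commit failed"));
        // Commit finished with an error: the wait succeeds without throwing.
        System.out.println(awaiter.awaitPending(10));
    }
}
```

Because the wait goes through a derived future, anything that completes or cancels `settled` leaves the original commit future untouched, which is the same reason the diff creates a separate future for the wake-up trigger.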
Then the only ways this can fail are a wake-up or a timeout, and in both cases we should check again whether the future has completed the next time commit sync is triggered.

##########
clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java:
##########
@@ -157,8 +157,10 @@ public class AsyncKafkaConsumerTest {
     @AfterEach
     public void resetAll() {
         backgroundEventQueue.clear();
-        if (consumer != null) {
+        try {
             consumer.close(Duration.ZERO);
+        } catch (Exception e) {
+            // ignore

Review Comment:
Done

-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org