kirktrue commented on code in PR #15613: URL: https://github.com/apache/kafka/pull/15613#discussion_r1559907295
########## clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java: ########## @@ -157,8 +157,10 @@ public class AsyncKafkaConsumerTest { @AfterEach public void resetAll() { backgroundEventQueue.clear(); - if (consumer != null) { + try { consumer.close(Duration.ZERO); + } catch (Exception e) { + // ignore Review Comment: I'm a little leery about swallowing the exception here. Can we validate the exception type is something we expect? e.g.: ```suggestion } catch (Exception e) { assertInstanceOf(KafkaException.class, e); ``` ########## clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java: ########## @@ -1388,6 +1393,37 @@ public void commitSync(Map<TopicPartition, OffsetAndMetadata> offsets, Duration } } + private void awaitPendingAsyncCommitsAndExecuteCommitCallbacks(Timer timer, boolean disableWakeup) { + if (lastPendingAsyncCommit == null) { + return; + } + + try { + CompletableFuture<Void> futureToAwait; + if (!disableWakeup) { + // We don't want the wake-up trigger to complete our pending async commit future, + // so create new future here. 
+ futureToAwait = new CompletableFuture<>(); + lastPendingAsyncCommit.whenComplete((v, t) -> { + if (t != null) { + futureToAwait.completeExceptionally(t); + } else { + futureToAwait.complete(v); + } + }); + wakeupTrigger.setActiveTask(futureToAwait); + } else { + futureToAwait = lastPendingAsyncCommit; + } + ConsumerUtils.getResult(futureToAwait, timer); + lastPendingAsyncCommit = null; + } finally { + if (!disableWakeup) wakeupTrigger.clearTask(); + timer.update(); + } Review Comment: Do we want to clear out `lastPendingAsyncCommit` in the `finally` block, so that it is reset even if `ConsumerUtils.getResult` throws? e.g.: ```suggestion ConsumerUtils.getResult(futureToAwait, timer); } finally { lastPendingAsyncCommit = null; if (!disableWakeup) wakeupTrigger.clearTask(); timer.update(); } ``` ########## clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java: ########## @@ -1952,10 +1988,6 @@ private void maybeThrowFencedInstanceException() { } } - private void maybeInvokeCommitCallbacks() { - offsetCommitCallbackInvoker.executeCallbacks(); - } - Review Comment: Any reason we don't want to keep this method abstraction? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org