lianetm commented on code in PR #15613:
URL: https://github.com/apache/kafka/pull/15613#discussion_r1561902955


##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java:
##########
@@ -1388,6 +1393,33 @@ public void commitSync(Map<TopicPartition, OffsetAndMetadata> offsets, Duration
         }
     }
 
+    private void awaitPendingAsyncCommitsAndExecuteCommitCallbacks(Timer timer, boolean disableWakeup) {
+        if (lastPendingAsyncCommit == null) {
+            return;
+        }
+
+        try {
+            final CompletableFuture<Void> futureToAwait = new CompletableFuture<>();
+            // We don't want the wake-up trigger to complete our pending async commit future,
+            // so create new future here. Any errors in the pending async commit will be handled
+            // by the async commit future / the commit callback - here, we just want to wait for it to complete.
+            lastPendingAsyncCommit.whenComplete((v, t) -> {

Review Comment:
   nit: we could lose the braces and inline this
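
   A minimal sketch of what the nit could look like. The lambda body is not visible in the hunk above, so this assumes it is a single statement that completes `futureToAwait`. The class `InlineLambdaSketch` and the helper `mirrorCompletion` are hypothetical names used only to make the snippet self-contained; they are not part of `AsyncKafkaConsumer`.

```java
import java.util.concurrent.CompletableFuture;

public class InlineLambdaSketch {

    // Hypothetical helper: returns a fresh future that completes normally once
    // `pending` finishes, regardless of whether `pending` succeeded or failed.
    static CompletableFuture<Void> mirrorCompletion(CompletableFuture<Void> pending) {
        final CompletableFuture<Void> futureToAwait = new CompletableFuture<>();
        // Assumed single-statement body, so the braces can be dropped and the
        // lambda written as an expression on one line.
        pending.whenComplete((v, t) -> futureToAwait.complete(null));
        return futureToAwait;
    }

    public static void main(String[] args) {
        CompletableFuture<Void> pending = new CompletableFuture<>();
        CompletableFuture<Void> futureToAwait = mirrorCompletion(pending);

        // A failure in the pending future still completes the awaited future normally.
        pending.completeExceptionally(new IllegalStateException("commit failed"));
        System.out.println(futureToAwait.isDone());
        System.out.println(futureToAwait.isCompletedExceptionally());
    }
}
```

   The expression form is legal here because a method invocation is a valid body for a void-compatible lambda, so the `boolean` returned by `complete` is simply discarded. Completing or cancelling `futureToAwait` has no effect on `pending`, which is the separation the code comment in the diff describes.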


