lucasbru commented on code in PR #15613:
URL: https://github.com/apache/kafka/pull/15613#discussion_r1560885552


##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java:
##########
@@ -1952,10 +1988,6 @@ private void maybeThrowFencedInstanceException() {
         }
     }
 
-    private void maybeInvokeCommitCallbacks() {
-        offsetCommitCallbackInvoker.executeCallbacks();
-    }
-

Review Comment:
   For me, abstracting this one-liner is more obfuscating than it is helping, 
but if you insist, I can bring it back.



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java:
##########
@@ -1388,6 +1393,37 @@ public void commitSync(Map<TopicPartition, 
OffsetAndMetadata> offsets, Duration
         }
     }
 
+    private void awaitPendingAsyncCommitsAndExecuteCommitCallbacks(Timer 
timer, boolean disableWakeup) {
+        if (lastPendingAsyncCommit == null) {
+            return;
+        }
+
+        try {
+            CompletableFuture<Void> futureToAwait;
+            if (!disableWakeup) {
+                // We don't want the wake-up trigger to complete our pending 
async commit future,
+                // so create new future here.
+                futureToAwait = new CompletableFuture<>();
+                lastPendingAsyncCommit.whenComplete((v, t) -> {
+                    if (t != null) {
+                        futureToAwait.completeExceptionally(t);
+                    } else {
+                        futureToAwait.complete(v);
+                    }
+                });
+                wakeupTrigger.setActiveTask(futureToAwait);
+            } else {
+                futureToAwait = lastPendingAsyncCommit;
+            }
+            ConsumerUtils.getResult(futureToAwait, timer);
+            lastPendingAsyncCommit = null;
+        } finally {
+            if (!disableWakeup) wakeupTrigger.clearTask();
+            timer.update();
+        }

Review Comment:
   I think always clearing it in `finally` would mean that 
`lastPendingAsyncCommit` is cleared even though we timed out or were woken up 
while waiting for it.
   
   However, this brought up another issue - what happens when the async commit 
future completes exceptionally? We'd throw the exception here, but we shouldn't 
- the error will be handled inside the future. So basically here we want to 
wait for the async commit, not worrying about return value or exception. And 
then, the only cases why we fail here should be wake-up or time out, and in 
both cases, we should check again for the future to be completed the next time 
we trigger commit sync.



##########
clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java:
##########
@@ -157,8 +157,10 @@ public class AsyncKafkaConsumerTest {
     @AfterEach
     public void resetAll() {
         backgroundEventQueue.clear();
-        if (consumer != null) {
+        try {
             consumer.close(Duration.ZERO);
+        } catch (Exception e) {
+            // ignore

Review Comment:
   Done



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to