lianetm commented on code in PR #18737:
URL: https://github.com/apache/kafka/pull/18737#discussion_r1958555476


##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java:
##########
@@ -748,9 +748,14 @@ public ConsumerRecords<K, V> poll(final Duration timeout) {
             }
 
             do {
-
+                PollEvent event = new PollEvent(timer.currentTimeMs());
                 // Make sure to let the background thread know that we are still polling.
-                applicationEventHandler.add(new PollEvent(timer.currentTimeMs()));
+                // This will trigger async auto-commits of consumed positions when hitting
+                // the interval time or reconciling new assignments
+                applicationEventHandler.add(event);
+                // Wait for reconciliation and auto-commit to be triggered, to ensure all commit requests
+                // retrieve the positions to commit before proceeding with fetching new records
+                ConsumerUtils.getResult(event.reconcileAndAutoCommit());

Review Comment:
   Should we pass the default API timeout here? Under normal execution, this 
will just complete right away (these are local actions in the background), but 
if a background failure prevents the event from being processed, the consumer 
would hang here indefinitely instead of timing out. Note that I suggest the 
default API timeout rather than the timeout passed as a param because we could 
have poll(ZERO), and that 0 shouldn't apply to the inter-thread communication, 
which is what we're doing here. The same applies to the blocking call we added 
for offsetsReady.
   
   We already take this approach in other API calls, e.g. seek. Makes sense?
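
To illustrate the concern, here is a minimal standalone sketch, not the actual consumer code. It uses a plain `CompletableFuture` in place of the event's future; `BoundedWaitSketch`, `completesWithin`, `completeAfter`, and the shortened `DEFAULT_API_TIMEOUT` value are all hypothetical names and values chosen for the demo. It compares waiting with a zero timeout, waiting with a default API timeout, and waiting on a future that the background thread never completes.

```java
import java.time.Duration;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class BoundedWaitSketch {

    // Hypothetical stand-in for the consumer's default API timeout, shortened for the demo.
    public static final Duration DEFAULT_API_TIMEOUT = Duration.ofSeconds(2);

    // Blocks for at most `timeout`; returns true only if the future completed in time.
    public static boolean completesWithin(CompletableFuture<?> future, Duration timeout) {
        try {
            future.get(timeout.toMillis(), TimeUnit.MILLISECONDS);
            return true;
        } catch (TimeoutException e) {
            return false;
        } catch (ExecutionException e) {
            throw new IllegalStateException(e.getCause());
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }

    // Simulates a background thread that finishes its local work after a short delay.
    public static CompletableFuture<Void> completeAfter(long delayMs) {
        CompletableFuture<Void> future = new CompletableFuture<>();
        Thread worker = new Thread(() -> {
            try {
                Thread.sleep(delayMs);
                future.complete(null);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        worker.setDaemon(true);
        worker.start();
        return future;
    }

    public static void main(String[] args) {
        // A zero timeout, as in poll(ZERO), gives the background thread no time to respond.
        System.out.println("zero timeout completed: "
                + completesWithin(completeAfter(100), Duration.ZERO));
        // The default API timeout lets the normal, fast case finish.
        System.out.println("default timeout completed: "
                + completesWithin(completeAfter(100), DEFAULT_API_TIMEOUT));
        // A future that is never completed times out instead of blocking forever.
        System.out.println("stuck background completed: "
                + completesWithin(new CompletableFuture<Void>(), Duration.ofMillis(200)));
    }
}
```

An untimed `future.get()` in the last case would block indefinitely, which is the hang described above. The bounded wait turns it into a timeout that the caller can surface.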


