lianetm commented on code in PR #18737:
URL: https://github.com/apache/kafka/pull/18737#discussion_r1958289291


##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java:
##########
@@ -748,9 +748,14 @@ public ConsumerRecords<K, V> poll(final Duration timeout) {
             }
 
             do {
-
+                PollEvent event = new PollEvent(timer.currentTimeMs());
                 // Make sure to let the background thread know that we are still polling.
-                applicationEventHandler.add(new PollEvent(timer.currentTimeMs()));
+                // This will trigger async auto-commits of consumed positions when hitting
+                // the interval time or on partition revocation
+                applicationEventHandler.add(event);
+                // Wait for reconciliation and auto-commit to complete to ensure all commit requests are processed

Review Comment:
   ```suggestion
                   // Wait for reconciliation and auto-commit to be triggered, to ensure all commit requests retrieve the positions to commit
   ```



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java:
##########
@@ -748,9 +748,14 @@ public ConsumerRecords<K, V> poll(final Duration timeout) {
             }
 
             do {
-
+                PollEvent event = new PollEvent(timer.currentTimeMs());
                 // Make sure to let the background thread know that we are still polling.
-                applicationEventHandler.add(new PollEvent(timer.currentTimeMs()));
+                // This will trigger async auto-commits of consumed positions when hitting
+                // the interval time or on partition revocation

Review Comment:
   ```suggestion
                   // the interval time or reconciling new assignments
   ```



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/PollEvent.java:
##########
@@ -16,10 +16,26 @@
  */
 package org.apache.kafka.clients.consumer.internals.events;
 
+import java.util.concurrent.CompletableFuture;
+
 public class PollEvent extends ApplicationEvent {
 
     private final long pollTimeMs;
 
+    /**
+     * A future that represents the completion of reconciliation and auto-commit
+     * processing.
+     * This future is completed when all commit request generation points have
+     * been passed, including:
+     * <ul>
+     *   <li>auto-commit on revocation</li>
+     *   <li>auto-commit</li>

Review Comment:
   ```suggestion
        *   <li>auto-commit on rebalance</li>
        *   <li>auto-commit on the interval</li>
   ```
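
To illustrate the future described in the Javadoc above, here is a minimal sketch. It assumes hypothetical names (`SketchPollEvent`, `commitsTriggered`, `markCommitsTriggered`) that are not taken from the PR, and it stands in for the background thread with a plain single-thread executor. The application thread hands over the event and then blocks, with a timeout, until the background side signals that both commit generation points (on rebalance and on the interval) have been passed.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

class PollEventSketch {

    /** Hypothetical stand-in for PollEvent; this is not the real Kafka class. */
    static final class SketchPollEvent {
        private final long pollTimeMs;
        // Completed once all commit request generation points have been passed.
        private final CompletableFuture<Void> commitsTriggered = new CompletableFuture<>();

        SketchPollEvent(long pollTimeMs) {
            this.pollTimeMs = pollTimeMs;
        }

        long pollTimeMs() {
            return pollTimeMs;
        }

        CompletableFuture<Void> commitsTriggered() {
            return commitsTriggered;
        }

        // Called by the background side after reconciliation and the
        // interval-based auto-commit check have both run for this poll.
        void markCommitsTriggered() {
            commitsTriggered.complete(null);
        }
    }

    public static void main(String[] args) throws Exception {
        // Assumption: a single-thread executor plays the role of the background thread.
        ExecutorService background = Executors.newSingleThreadExecutor();
        try {
            SketchPollEvent event = new SketchPollEvent(System.currentTimeMillis());
            // "Enqueue" the event; the background side processes it and signals.
            background.execute(event::markCommitsTriggered);
            // The application thread waits, bounded by a timeout, before it
            // goes on to update positions.
            event.commitsTriggered().get(5, TimeUnit.SECONDS);
            System.out.println("commit requests triggered: " + event.commitsTriggered().isDone());
        } finally {
            background.shutdown();
        }
    }
}
```

The point of the wait is ordering: commit requests capture the positions to commit before the application thread moves them forward.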



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java:
##########
@@ -186,7 +186,6 @@ public NetworkClientDelegate.PollResult poll(final long currentTimeMs) {
             return drainPendingOffsetCommitRequests();
         }
 
-        maybeAutoCommitAsync();

Review Comment:
   The Javadoc was not updated with this change (it still reads `The function will also try to autocommit the offsets, if feature is enabled.`)



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]