lianetm commented on code in PR #18737:
URL: https://github.com/apache/kafka/pull/18737#discussion_r1958289291
##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java:
##########
@@ -748,9 +748,14 @@ public ConsumerRecords<K, V> poll(final Duration timeout) {
}
do {
-
+ PollEvent event = new PollEvent(timer.currentTimeMs());
// Make sure to let the background thread know that we are still polling.
- applicationEventHandler.add(new PollEvent(timer.currentTimeMs()));
+ // This will trigger async auto-commits of consumed positions when hitting
+ // the interval time or on partition revocation
+ applicationEventHandler.add(event);
+ // Wait for reconciliation and auto-commit to complete to ensure all commit requests are processed
Review Comment:
```suggestion
// Wait for reconciliation and auto-commit to be triggered, to ensure all commit requests retrieve the positions to commit
```
##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java:
##########
@@ -748,9 +748,14 @@ public ConsumerRecords<K, V> poll(final Duration timeout) {
}
do {
-
+ PollEvent event = new PollEvent(timer.currentTimeMs());
// Make sure to let the background thread know that we are still polling.
- applicationEventHandler.add(new PollEvent(timer.currentTimeMs()));
+ // This will trigger async auto-commits of consumed positions when hitting
+ // the interval time or on partition revocation
Review Comment:
```suggestion
// the interval time or reconciling new assignments
```
##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/PollEvent.java:
##########
@@ -16,10 +16,26 @@
*/
package org.apache.kafka.clients.consumer.internals.events;
+import java.util.concurrent.CompletableFuture;
+
public class PollEvent extends ApplicationEvent {
private final long pollTimeMs;
+ /**
+ * A future that represents the completion of reconciliation and auto-commit
+ * processing.
+ * This future is completed when all commit request generation points have
+ * been passed, including:
+ * <ul>
+ * <li>auto-commit on revocation</li>
+ * <li>auto-commit</li>
Review Comment:
```suggestion
* <li>auto-commit on rebalance</li>
* <li>auto-commit on the interval</li>
```
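To illustrate the pattern this Javadoc describes, here is a minimal sketch of an event that carries a future, which a background thread completes once the commit request generation points have been passed. It is not the PR's implementation: `SketchPollEvent`, `commitPointsPassed`, `pollOnce`, and the queue standing in for the application event handler are all hypothetical names chosen for the example.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

public class PollEventSketch {

    /** Hypothetical stand-in for PollEvent: the poll time plus a completion future. */
    static final class SketchPollEvent {
        final long pollTimeMs;
        // Completed by the background thread once reconciliation and
        // auto-commit have been triggered for this poll.
        final CompletableFuture<Void> commitPointsPassed = new CompletableFuture<>();

        SketchPollEvent(long pollTimeMs) {
            this.pollTimeMs = pollTimeMs;
        }
    }

    /**
     * Enqueues one event, lets a background thread process it, and blocks until
     * the event's future completes or the timeout expires.
     */
    static boolean pollOnce(long timeoutMs) throws Exception {
        // Hypothetical replacement for the application event queue.
        BlockingQueue<SketchPollEvent> queue = new LinkedBlockingQueue<>();

        Thread background = new Thread(() -> {
            try {
                SketchPollEvent event = queue.take();
                // Placeholder for the real work: reconcile the assignment,
                // then trigger auto-commit on rebalance and on the interval.
                event.commitPointsPassed.complete(null);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        background.setDaemon(true);
        background.start();

        SketchPollEvent event = new SketchPollEvent(System.currentTimeMillis());
        queue.add(event);

        // The application thread waits here; get() throws TimeoutException
        // if the background thread has not completed the future in time.
        event.commitPointsPassed.get(timeoutMs, TimeUnit.MILLISECONDS);
        return event.commitPointsPassed.isDone();
    }

    public static void main(String[] args) throws Exception {
        System.out.println(pollOnce(1000));
    }
}
```

In this sketch the future signals only that the commit points were passed, not that any commit request was answered by the broker, which matches the wording proposed in the suggestions above.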
##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java:
##########
@@ -186,7 +186,6 @@ public NetworkClientDelegate.PollResult poll(final long currentTimeMs) {
return drainPendingOffsetCommitRequests();
}
- maybeAutoCommitAsync();
Review Comment:
The Javadoc was not updated with this change: it still reads `The function will also try to autocommit the offsets, if feature is enabled.`