lianetm commented on code in PR #18737:
URL: https://github.com/apache/kafka/pull/18737#discussion_r1949460839


##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java:
##########
@@ -846,9 +845,18 @@ private CompletableFuture<Map<TopicPartition, 
OffsetAndMetadata>> commit(final C
         }
 
         applicationEventHandler.add(commitEvent);
+        awaitOffsetsReady(commitEvent.offsetsReady());
         return commitEvent.future();
     }
 
+    private void awaitOffsetsReady(CompletableFuture<Void> offsetsReady) {

Review Comment:
   I would add a java doc here to show that this will block until the 
background thread has retrieved the allConsumed positions to commit if no 
offsets have been specified. This will ensure that the offsets to commit are 
not affected by fetches that may start after this. 



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java:
##########
@@ -846,9 +845,18 @@ private CompletableFuture<Map<TopicPartition, 
OffsetAndMetadata>> commit(final C
         }
 
         applicationEventHandler.add(commitEvent);
+        awaitOffsetsReady(commitEvent.offsetsReady());
         return commitEvent.future();
     }
 
+    private void awaitOffsetsReady(CompletableFuture<Void> offsetsReady) {
+        try {
+            offsetsReady.get();
+        } catch (Throwable e) {
+            throw new KafkaException(e);
+        }

Review Comment:
   should we reuse the existing `ConsumerUtils.getResult(future);` ? (and if we 
do then maybe simpler to just call it directly and remove this func?) 



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractMembershipManager.java:
##########
@@ -1384,7 +1386,7 @@ public void registerStateListener(MemberStateListener 
listener) {
      */
     public NetworkClientDelegate.PollResult poll(final long currentTimeMs) {
         if (state == MemberState.RECONCILING) {

Review Comment:
   should we move this check inside the `maybeReconcile`, now that it's called 
from other places too? 



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessor.java:
##########
@@ -206,8 +207,12 @@ public void process(ApplicationEvent event) {
     }
 
     private void process(final PollEvent event) {
+        // In order to ensure certain positions before reconciliation, we only 
trigger full process of reconcile by PollEvent
+        
requestManagers.consumerMembershipManager.ifPresent(consumerMembershipManager ->
+            consumerMembershipManager.maybeReconcile(true));

Review Comment:
   this means that we will only trigger the commit before revocation from here 
(before any fetch starts). So do we still need to use a snapshot to 
`maybeAutoCommitSyncBeforeRevocation`? or could simplify and trust the 
`subscriptions.allConsumed` as we used to?



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessor.java:
##########
@@ -234,7 +239,10 @@ private void process(final AsyncCommitEvent event) {
 
         try {
             CommitRequestManager manager = 
requestManagers.commitRequestManager.get();
-            CompletableFuture<Map<TopicPartition, OffsetAndMetadata>> future = 
manager.commitAsync(event.offsets());
+            Optional<Map<TopicPartition, OffsetAndMetadata>> offsets = 
event.offsets().isEmpty() ?
+                Optional.of(subscriptions.allConsumed()) : event.offsets();
+            event.markOffsetsReady();

Review Comment:
   I think one improvement is that we need to make this consistent for both 
commits, sync and async. They both need to retrieve the `allConsumed` if the 
`event.offsets` is empty, and `markOffsetsReady` (this is key, the app thread 
will block on this for all commits, and we're not completing it on the sync 
path it seems).
   
   Let's please add unit tests to ensure that processing SyncCommitEvent and 
AsyncCommitEvents completes the offsetsReady future no matter what. 



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractMembershipManager.java:
##########
@@ -818,6 +818,8 @@ void maybeReconcile() {
             return;
         }
 
+        if (!canCommit) return;

Review Comment:
   should we 
   ```
     if (autoCommitEnabled  && !canCommit) return; 
   ```
   Seems much clearer why we cannot start to reconcile, and less error prone 
(to avoid shortcircuiting here by mistake) 



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java:
##########
@@ -267,7 +268,7 @@ private CompletableFuture<Map<TopicPartition, 
OffsetAndMetadata>> requestAutoCom
     public void maybeAutoCommitAsync() {
         if (autoCommitEnabled() && autoCommitState.get().shouldAutoCommit()) {
             OffsetCommitRequestState requestState = createOffsetCommitRequest(
-                subscriptions.allConsumed(),
+                latestPartitionOffsets,

Review Comment:
   I guess here we could have the same situation @junrao described for the 
commit before revocation right below:
   
   > However, when we get here, it's possible that a batch of records have just 
been returned to the application thread before the first step, but those 
records haven't been processed yet. So latestPartitionOffsets is not up to date 
yet. We need to wait for the next setLatestPartitionOffsets() call to happen. 
At that point, we know any record returned to the application will have been 
processed and no more records can be given to the application. So, it's safe to 
commit the offset at that point.
   
   I wonder if we should fix here in the same way we're tackling the commit 
before revocation, by fixing **when** we auto-commit, not **what** we 
auto-commit? 
   
   Before this PR, both auto-commits (on the interval and before revocation) 
were triggered freely in the background thread when polling the managers 
(opening door for race with fetch). If we move to triggering those commits 
before fetching (when processing `PollEvent`), wouldn't we solve the 
correctness/race issue, without needing to keep any snapshot of the 
`subscriptions.allConsumed`?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to