frankvicky commented on code in PR #18737:
URL: https://github.com/apache/kafka/pull/18737#discussion_r1938459944


##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java:
##########
@@ -324,7 +323,7 @@ public CompletableFuture<Void> 
maybeAutoCommitSyncBeforeRevocation(final long de
 
         CompletableFuture<Void> result = new CompletableFuture<>();
         OffsetCommitRequestState requestState =
-            createOffsetCommitRequest(subscriptions.allConsumed(), deadlineMs);
+            createOffsetCommitRequest(latestPartitionOffsets, deadlineMs);

Review Comment:
   Hi @junrao
   Thanks for the review.
   
   Yes, this is indeed a problem. Since assignment reconciliation is triggered 
from a different path (`ConsumerGroupHeartbeat`) and not in the normal user app 
consume loop, I think we could update `latestPartitionOffsets` in 
`ConsumerMembershipMananger#signalReconciliationStarted().` 
   
   In this way, we could get the following benefits:
   - Getting the latest `latestPartitionOffsets` after marking the revoked 
partition as `pendingRevocation`.
   - We could avoid `createOffsetCommitRequest` and 
`autoCommitSyncBeforeRevocationWithRetries` always invoking 
`subscription#allConsumed`, which will lead to the gap between the app thread 
and the background thread.
   
   WDYT?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to