frankvicky commented on code in PR #18737:
URL: https://github.com/apache/kafka/pull/18737#discussion_r1938459944
##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java:
##########
@@ -324,7 +323,7 @@ public CompletableFuture<Void>
maybeAutoCommitSyncBeforeRevocation(final long de
CompletableFuture<Void> result = new CompletableFuture<>();
OffsetCommitRequestState requestState =
- createOffsetCommitRequest(subscriptions.allConsumed(), deadlineMs);
+ createOffsetCommitRequest(latestPartitionOffsets, deadlineMs);
Review Comment:
Hi @junrao
Thanks for the review.
Yes, this is indeed a problem. Since assignment reconciliation is triggered
from a different path (`ConsumerGroupHeartbeat`) and not in the normal user app
consume loop, I think we could update `latestPartitionOffsets` in
`ConsumerMembershipMananger#signalReconciliationStarted().`
In this way, we could get the following benefits:
- Getting the latest `latestPartitionOffsets` after marking the revoked
partition as `pendingRevocation`.
- We could avoid `createOffsetCommitRequest` and
`autoCommitSyncBeforeRevocationWithRetries` always invoking
`subscription#allConsumed`, which will lead to the gap between the app thread
and the background thread.
WDYT?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]