junrao commented on code in PR #18737:
URL: https://github.com/apache/kafka/pull/18737#discussion_r1945291682
##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessor.java:
##########
@@ -414,7 +415,15 @@ private void process(final ResetOffsetEvent event) {
*/
private void process(final CheckAndUpdatePositionsEvent event) {
CompletableFuture<Boolean> future =
requestManagers.offsetsRequestManager.updateFetchPositions(event.deadlineMs());
- future.whenComplete(complete(event.future()));
+ future.whenComplete((value, exception) -> {
+ if (exception != null)
+ event.future().completeExceptionally(exception);
+ else {
+ requestManagers.commitRequestManager.ifPresent(commitRequestManager ->
+ commitRequestManager.setLatestPartitionOffsets(subscriptions.allConsumed()));
Review Comment:
When the user calls `commitAsync()` with no offsets, we append a
`SyncCommitEvent` to the background thread without waiting for the processing
to complete. The background thread will eventually call
`subscriptions.allConsumed` to grab the offsets to commit. However, since this
happens while the application thread is still running, it's possible that some
of the committed offsets have not actually been fully consumed by the
application.
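
To make the concern above concrete, here is a minimal single-threaded sketch of the timing issue. It is not Kafka code: `positions`, `backgroundEvents`, `commitAsyncLazy`, and `commitAsyncSnapshot` are hypothetical stand-ins, and the "background thread" is simulated by draining a queue after the application has moved on. It only illustrates why reading the positions late can commit offsets the application has not processed yet, and one possible alternative (copying the positions at call time); it is not a proposal for how this PR should be changed.

```java
import java.util.ArrayDeque;
import java.util.HashMap;
import java.util.Map;
import java.util.Queue;

public class CommitSnapshotSketch {
    // Hypothetical stand-in for the consumer's tracked positions (not SubscriptionState).
    static final Map<String, Long> positions = new HashMap<>();
    // Hypothetical stand-in for the event queue drained by the background thread.
    static final Queue<Runnable> backgroundEvents = new ArrayDeque<>();
    // Offsets that ended up being "committed" in this simulation.
    static final Map<String, Long> committed = new HashMap<>();

    // Lazy variant: positions are read only when the event is processed.
    static void commitAsyncLazy() {
        backgroundEvents.add(() -> committed.putAll(positions));
    }

    // Snapshot variant: positions are copied on the calling thread, at call time.
    static void commitAsyncSnapshot() {
        Map<String, Long> snapshot = new HashMap<>(positions);
        backgroundEvents.add(() -> committed.putAll(snapshot));
    }

    // Simulates the background thread catching up on queued events.
    static void drainBackground() {
        Runnable event;
        while ((event = backgroundEvents.poll()) != null) {
            event.run();
        }
    }

    // Returns the offset committed for "topic-0" under the chosen variant.
    static long run(boolean useSnapshot) {
        positions.clear();
        committed.clear();
        backgroundEvents.clear();

        positions.put("topic-0", 10L);   // application has processed up to offset 10
        if (useSnapshot) {
            commitAsyncSnapshot();
        } else {
            commitAsyncLazy();
        }
        positions.put("topic-0", 20L);   // a later poll advances the position; records not yet processed
        drainBackground();               // the event is processed only now
        return committed.get("topic-0");
    }

    public static void main(String[] args) {
        // prints "lazy=20 snapshot=10"
        System.out.println("lazy=" + run(false) + " snapshot=" + run(true));
    }
}
```

In the lazy variant the committed offset (20) is ahead of what the application had processed when it asked to commit (10), which is the situation the comment describes.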
##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java:
##########
@@ -637,6 +638,15 @@ private void maybeUpdateLastSeenEpochIfNewer(final Map<TopicPartition, OffsetAnd
});
}
+ public Map<TopicPartition, OffsetAndMetadata> latestPartitionOffsets() {
Review Comment:
This seems unused?