philipnee commented on code in PR #13678: URL: https://github.com/apache/kafka/pull/13678#discussion_r1191665112
########## clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java: ########## @@ -1223,6 +1233,26 @@ public void maybeAutoCommitOffsetsAsync(long now) { } } + private boolean waitForPendingAsyncCommits(Timer timer) { Review Comment: the sendOffsetCommitRequest method basically 1. if the offset is empty, complete the future and return immediately, 2. if there's no coordinator `checkAndGetCoordinator` then complete the future with exception (like completeExceptionally) and return, otherwise 3. create the request data and send it `client.send(coordinator, builder).compose(new OffsetCommitResponseHandler(offsets, generation));` What I'm suggesting is not to change that method, but to change the `commitOffsetsSync`. My suggestions are: Currently, it returns immediately if the offset is empty. But we don't want that, because we also want to check if there's any inflightAsyncCommits. Now, if the we can't return immediately, we will need to send these commits, and the requirement is to `coordinatorUnknownAndUnreadySync`. This check is already in place, so I think you just need to try to send the async commit after this check. So there's not very much code change there :) The main concern me and David have is managing 2 atomic int, because at certain point one might forget to update one of it and causes some weird bug. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org