philipnee commented on code in PR #13678:
URL: https://github.com/apache/kafka/pull/13678#discussion_r1191665112


##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java:
##########
@@ -1223,6 +1233,26 @@ public void maybeAutoCommitOffsetsAsync(long now) {
         }
     }
 
+    private boolean waitForPendingAsyncCommits(Timer timer) {

Review Comment:
   The `sendOffsetCommitRequest` method basically does three things: 1. if the 
offsets are empty, it completes the future and returns immediately; 2. if 
there's no coordinator (`checkAndGetCoordinator`), it completes the future 
with an exception (like completeExceptionally) and returns; otherwise 3. it 
creates the request data and sends it: `client.send(coordinator, 
builder).compose(new OffsetCommitResponseHandler(offsets, generation));` 
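   
   The three branches above can be sketched roughly as follows. This is a 
simplified stand-in, not the actual Kafka code: `OffsetCommitSketch`, the 
`String` coordinator and the `send` helper are hypothetical, and 
`CompletableFuture` is used in place of Kafka's `RequestFuture`.

```java
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;

// Hypothetical stand-in for the three branches described above; not the real ConsumerCoordinator.
class OffsetCommitSketch {
    // Assumed placeholder for the result of the coordinator lookup (checkAndGetCoordinator in the real class).
    private final Optional<String> coordinator;

    OffsetCommitSketch(Optional<String> coordinator) {
        this.coordinator = coordinator;
    }

    CompletableFuture<Void> sendOffsetCommitRequest(Map<String, Long> offsets) {
        // 1. Nothing to commit: complete the future and return immediately.
        if (offsets.isEmpty()) {
            return CompletableFuture.completedFuture(null);
        }
        // 2. No coordinator: complete the future exceptionally and return.
        if (!coordinator.isPresent()) {
            CompletableFuture<Void> failed = new CompletableFuture<>();
            failed.completeExceptionally(new IllegalStateException("coordinator not available"));
            return failed;
        }
        // 3. Otherwise build the request and send it; the future stays pending
        //    until a response handler completes it.
        return send(coordinator.get(), offsets);
    }

    // Placeholder for client.send(...).compose(...): returns a future that is still pending.
    private CompletableFuture<Void> send(String node, Map<String, Long> offsets) {
        return new CompletableFuture<>();
    }
}
```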
   
   What I'm suggesting is not to change that method, but to change 
`commitOffsetsSync` instead.  My suggestions are:
   
   Currently, it returns immediately if the offsets are empty. But we don't 
want that, because we also want to check whether there are any 
inflightAsyncCommits. 
   
   Now, if we can't return immediately, we will need to send these commits, 
and the requirement for that is the `coordinatorUnknownAndUnreadySync` check.  
This check is already in place, so I think you just need to try to send the 
async commit after this check.
   
   So there's not very much code change there :) 
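   
   One possible reading of this flow, as a minimal sketch. Everything here is 
hypothetical: `SyncCommitFlowSketch`, the `coordinatorReady` flag (standing in 
for the `coordinatorUnknownAndUnreadySync` check) and the body of 
`waitForPendingAsyncCommits` are placeholders, not the actual implementation.

```java
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical model of the suggested control flow; bodies are placeholders.
class SyncCommitFlowSketch {
    final AtomicInteger inflightAsyncCommits = new AtomicInteger();
    boolean coordinatorReady = true; // stand-in for the coordinatorUnknownAndUnreadySync check passing
    int requestsSent = 0;

    boolean commitOffsetsSync(Map<String, Long> offsets) {
        // Only return early when there is nothing to commit AND nothing in flight.
        if (offsets.isEmpty() && inflightAsyncCommits.get() == 0) {
            return true;
        }
        // Existing check: give up if the coordinator is not ready.
        if (!coordinatorReady) {
            return false;
        }
        // After the check: send the commit (if any), then wait for pending async commits.
        if (!offsets.isEmpty()) {
            requestsSent++;
        }
        return waitForPendingAsyncCommits();
    }

    // Placeholder: real code would poll the network client until the count
    // reaches zero or the timer expires.
    private boolean waitForPendingAsyncCommits() {
        inflightAsyncCommits.set(0);
        return true;
    }
}
```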
   
   The main concern David and I have is managing two atomic ints, because at 
some point someone might forget to update one of them and cause some weird 
bug.
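   
   To illustrate the concern: with a single counter, the increment and the 
decrement each live in exactly one place, so there is less to keep in sync. 
This is only a sketch under that assumption; `InflightCounterSketch` and 
`track` are hypothetical names, and `CompletableFuture` stands in for Kafka's 
`RequestFuture`.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical helper: one counter tracks every async commit that has not completed yet.
class InflightCounterSketch {
    private final AtomicInteger inflightAsyncCommits = new AtomicInteger();

    // Increment when the commit is sent; decrement once, whether it succeeds or fails.
    CompletableFuture<Void> track(CompletableFuture<Void> commit) {
        inflightAsyncCommits.incrementAndGet();
        return commit.whenComplete((result, error) -> inflightAsyncCommits.decrementAndGet());
    }

    int inflight() {
        return inflightAsyncCommits.get();
    }
}
```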



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
