dajac commented on code in PR #13678:
URL: https://github.com/apache/kafka/pull/13678#discussion_r1190765038


##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java:
##########
@@ -186,6 +187,7 @@ public ConsumerCoordinator(GroupRebalanceConfig 
rebalanceConfig,
         this.completedOffsetCommits = new ConcurrentLinkedQueue<>();
         this.sensors = new ConsumerCoordinatorMetrics(metrics, 
metricGrpPrefix);
         this.interceptors = interceptors;
+        this.inFlightAsyncCommits = new AtomicInteger();
         this.pendingAsyncCommits = new AtomicInteger();

Review Comment:
   @philipnee @kirktrue @erikvanoosten I was looking at how we use 
`pendingAsyncCommits` in `close()`. My understanding is that we use it to wait 
in close until all the in-flight commit requests are done to ensure that their 
callbacks are called. Knowing this, isn't it weird that we decrement 
`pendingAsyncCommits` when we resolve the coordinator and not when the response 
of the offset commit is received? I may be missing something but this is not 
clear to me.
   
   I was looking into this because I was hoping that we could consolidate 
`inFlightAsyncCommits` and `pendingAsyncCommits`.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to