showuon commented on a change in pull request #11340: URL: https://github.com/apache/kafka/pull/11340#discussion_r713737164
########## File path: clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java ########## @@ -66,15 +66,7 @@ import org.slf4j.Logger; import java.nio.ByteBuffer; -import java.util.ArrayList; -import java.util.Collections; -import java.util.HashMap; -import java.util.HashSet; -import java.util.List; -import java.util.Map; -import java.util.Objects; -import java.util.Optional; -import java.util.Set; +import java.util.*; Review comment: Any reason you change to import all classes under `java.util`? I think we should import what we used in this class only. ########## File path: clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java ########## @@ -505,7 +497,7 @@ public boolean poll(Timer timer, boolean waitForJoinGroup) { } // if not wait for join group, we would just use a timer of 0 - if (!ensureActiveGroup(waitForJoinGroup ? timer : time.timer(0L))) { + if (!ensureActiveGroup(waitForJoinGroup ? timer : time.timer(0L), timer)) { Review comment: I don't think we should also pass `timer` when not `waitForJoinGroup` here. I think when not `waitForJoinGroup`, we should not do `onJoinPrepare`, so we should pass `time.timer(0L)`, too. What do you think? ########## File path: clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java ########## @@ -670,10 +662,11 @@ private void validateCooperativeAssignment(final Map<String, List<TopicPartition } @Override - protected void onJoinPrepare(int generation, String memberId) { + protected void onJoinPrepare(int generation, String memberId, final Timer pollTimer) { log.debug("Executing onJoinPrepare with generation {} and memberId {}", generation, memberId); // commit offsets prior to rebalance if auto-commit enabled - maybeAutoCommitOffsetsSync(time.timer(rebalanceConfig.rebalanceTimeoutMs)); + //The timer whose commitOffset timed out is no longer time.timer(rebalanceConfig.rebalanceTimeoutMs), and is changed to the timer passed by the customer + maybeAutoCommitOffsetsSync(pollTimer); Review comment: Did we have any null handling for the `pollTimer`? Maybe we can pass the old `time.timer(rebalanceConfig.rebalanceTimeoutMs)` for null case here? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org