showuon commented on a change in pull request #11340:
URL: https://github.com/apache/kafka/pull/11340#discussion_r713737164



##########
File path: clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
##########
@@ -66,15 +66,7 @@
 import org.slf4j.Logger;
 
 import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Objects;
-import java.util.Optional;
-import java.util.Set;
+import java.util.*;

Review comment:
       Is there a reason for switching to a wildcard import of `java.util`? I 
think we should import only the classes this file actually uses.

##########
File path: clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
##########
@@ -505,7 +497,7 @@ public boolean poll(Timer timer, boolean waitForJoinGroup) {
                 }
 
                 // if not wait for join group, we would just use a timer of 0
-                if (!ensureActiveGroup(waitForJoinGroup ? timer : time.timer(0L))) {
+                if (!ensureActiveGroup(waitForJoinGroup ? timer : time.timer(0L), timer)) {

Review comment:
       I don't think we should pass `timer` as the second argument when 
`waitForJoinGroup` is false. In that case we should not do `onJoinPrepare`, so 
we should pass `time.timer(0L)` there, too. What do you think?
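
       To make the suggestion concrete, here is a minimal, self-contained 
sketch. It assumes the same timer is chosen once and then used for both 
arguments. The nested `Timer` class and the `joinTimer` helper are hypothetical 
stand-ins for illustration only; they are not the real Kafka classes or part of 
this PR.

```java
public class JoinTimerSketch {
    // Hypothetical stand-in for Kafka's Timer; only the remaining time
    // matters for this illustration.
    static final class Timer {
        final long remainingMs;

        Timer(long remainingMs) {
            this.remainingMs = remainingMs;
        }
    }

    // Hypothetical helper: choose one timer up front so that the join and the
    // join-prepare step share it. When the caller does not want to wait for
    // the join, both get a zero-length timer.
    static Timer joinTimer(boolean waitForJoinGroup, Timer pollTimer) {
        return waitForJoinGroup ? pollTimer : new Timer(0L);
    }

    public static void main(String[] args) {
        Timer pollTimer = new Timer(5000L);
        // Waiting for the join: the caller's poll timer is used.
        System.out.println(joinTimer(true, pollTimer).remainingMs);
        // Not waiting: a zero timer, so nothing can block on it.
        System.out.println(joinTimer(false, pollTimer).remainingMs);
    }
}
```

       In the real call site this would amount to computing the timer once and 
passing it for both parameters of `ensureActiveGroup`.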

##########
File path: clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
##########
@@ -670,10 +662,11 @@ private void validateCooperativeAssignment(final Map<String, List<TopicPartition
     }
 
     @Override
-    protected void onJoinPrepare(int generation, String memberId) {
+    protected void onJoinPrepare(int generation, String memberId, final Timer pollTimer) {
         log.debug("Executing onJoinPrepare with generation {} and memberId {}", generation, memberId);
         // commit offsets prior to rebalance if auto-commit enabled
-        maybeAutoCommitOffsetsSync(time.timer(rebalanceConfig.rebalanceTimeoutMs));
+        //The timer whose commitOffset timed out is no longer time.timer(rebalanceConfig.rebalanceTimeoutMs), and is changed to the timer passed by the customer
+        maybeAutoCommitOffsetsSync(pollTimer);

Review comment:
       Do we have any null handling for `pollTimer`? Maybe we can fall back to 
the old `time.timer(rebalanceConfig.rebalanceTimeoutMs)` when it is null?
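
       Something along these lines is what I have in mind. This is only a 
sketch: the nested `Timer` class, the `orFallback` helper, and the 60000 ms 
value are hypothetical placeholders for the real Kafka `Timer`, whatever form 
the null check takes, and `rebalanceConfig.rebalanceTimeoutMs`.

```java
import java.util.function.Supplier;

public class PollTimerFallbackSketch {
    // Hypothetical stand-in for Kafka's Timer.
    static final class Timer {
        final long remainingMs;

        Timer(long remainingMs) {
            this.remainingMs = remainingMs;
        }
    }

    // Hypothetical helper: use the caller's timer when present, otherwise
    // build the fallback lazily (the old rebalance-timeout behavior).
    static Timer orFallback(Timer pollTimer, Supplier<Timer> fallback) {
        return pollTimer != null ? pollTimer : fallback.get();
    }

    public static void main(String[] args) {
        long rebalanceTimeoutMs = 60000L; // placeholder value for illustration
        Supplier<Timer> oldBehavior = () -> new Timer(rebalanceTimeoutMs);

        // A non-null poll timer is used as-is.
        System.out.println(orFallback(new Timer(1000L), oldBehavior).remainingMs);
        // A null poll timer falls back to the rebalance timeout.
        System.out.println(orFallback(null, oldBehavior).remainingMs);
    }
}
```

       The supplier keeps the fallback timer from being created unless it is 
actually needed.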




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.