showuon commented on a change in pull request #11340:
URL: https://github.com/apache/kafka/pull/11340#discussion_r716160145



##########
File path: clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
##########
@@ -1069,6 +1074,37 @@ private void maybeAutoCommitOffsetsSync(Timer timer) {
         }
     }
 
+    private void cleanUpConsumedOffsets(Map<TopicPartition, OffsetAndMetadata> willCommitOffsets) {
+
+        if (willCommitOffsets.isEmpty())
+            return;
+
+        Set<String> validTopics = metadata.fetch().topics();
+        Set<TopicPartition> toGiveUpTopicPartitions = new HashSet<>();
+
+        Iterator<Map.Entry<TopicPartition, OffsetAndMetadata>> iterator = willCommitOffsets.entrySet().iterator();

Review comment:
       We only need the entry key, so this could be changed to `willCommitOffsets.keySet().iterator();`
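
A minimal sketch of what this suggestion buys us, assuming `String` keys stand in for `TopicPartition` and `Long` values for `OffsetAndMetadata` (the names `pruneUnknownTopics`, `validTopics` are hypothetical, not the PR's actual code): the key-set iterator writes through to the backing map, so `Iterator.remove()` still drops the whole entry.

```java
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;

public class KeySetIteratorSketch {
    // Remove offsets whose topic is no longer known.
    // Iterator.remove() on keySet() also removes the entry from the map.
    static void pruneUnknownTopics(Map<String, Long> willCommitOffsets, Set<String> validTopics) {
        Iterator<String> iterator = willCommitOffsets.keySet().iterator();
        while (iterator.hasNext()) {
            String topic = iterator.next();
            if (!validTopics.contains(topic)) {
                iterator.remove();
            }
        }
    }

    public static void main(String[] args) {
        Map<String, Long> offsets = new HashMap<>();
        offsets.put("live-topic", 42L);
        offsets.put("deleted-topic", 7L);
        pruneUnknownTopics(offsets, Set.of("live-topic"));
        System.out.println(offsets);
    }
}
```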

##########
File path: clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
##########
@@ -343,7 +344,7 @@ protected synchronized long timeToNextHeartbeat(long now) {
      * Ensure that the group is active (i.e. joined and synced)
      */
     public void ensureActiveGroup() {
-        while (!ensureActiveGroup(time.timer(Long.MAX_VALUE))) {
+        while (!ensureActiveGroup(time.timer(Long.MAX_VALUE), null)) {

Review comment:
       Actually, I don't think the `null` here makes sense. Currently we treat `null` as `timer(0)` (a single attempt), right? But in this case we pass `Long.MAX_VALUE` precisely so we can wait for a long time, so I think we should still use the original `rebalanceTimeout` as the timer here.

##########
File path: clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
##########
@@ -1121,7 +1121,7 @@ public void testNormalJoinGroupFollower() {
                     sync.groupAssignments().isEmpty();
         }, syncGroupResponse(assigned, Errors.NONE));
 
-        coordinator.joinGroupIfNeeded(time.timer(Long.MAX_VALUE));
+        coordinator.joinGroupIfNeeded(time.timer(Long.MAX_VALUE),null);

Review comment:
       We should pass in a reasonable timer as the 2nd parameter (just like the 1st parameter). Same comment for the occurrences below.

##########
File path: clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
##########
@@ -1054,6 +1056,9 @@ private void doAutoCommitOffsetsAsync() {
     private void maybeAutoCommitOffsetsSync(Timer timer) {
         if (autoCommitEnabled) {
            Map<TopicPartition, OffsetAndMetadata> allConsumedOffsets = subscriptions.allConsumed();
+
+            cleanUpConsumedOffsets(allConsumedOffsets);

Review comment:
       Could we do this only after an `UnknownTopicOrPartitionException` has actually happened? Since this issue rarely occurs, we can clean it up "lazily": move this line into the `catch` block below (and add an `UnknownTopicOrPartitionException` case to it).
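
A minimal sketch of the "lazy cleanup" shape being suggested. All names here (`tryCommit`, `UnknownTopicException`, `commitWithLazyCleanup`) are hypothetical stand-ins, not the actual `ConsumerCoordinator` API; the point is only that pruning happens inside the `catch`, after the rare failure, rather than on every commit.

```java
import java.util.Map;
import java.util.Set;

public class LazyCleanupSketch {
    // Hypothetical stand-in for the broker rejecting a commit that includes
    // a deleted topic (UnknownTopicOrPartitionException in real Kafka).
    static class UnknownTopicException extends RuntimeException {}

    static void tryCommit(Map<String, Long> offsets, Set<String> validTopics) {
        for (String topic : offsets.keySet()) {
            if (!validTopics.contains(topic)) {
                throw new UnknownTopicException();
            }
        }
        // a real implementation would send the OffsetCommit request here
    }

    // Clean up only after the (rare) failure, then retry once.
    static boolean commitWithLazyCleanup(Map<String, Long> offsets, Set<String> validTopics) {
        try {
            tryCommit(offsets, validTopics);
            return true;
        } catch (UnknownTopicException e) {
            offsets.keySet().removeIf(topic -> !validTopics.contains(topic));
            tryCommit(offsets, validTopics);
            return true;
        }
    }
}
```

This keeps the happy path allocation-free: the metadata scan and map mutation only run when a commit has already failed.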

##########
File path: clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
##########
@@ -1069,6 +1074,37 @@ private void maybeAutoCommitOffsetsSync(Timer timer) {
         }
     }
 
+    private void cleanUpConsumedOffsets(Map<TopicPartition, OffsetAndMetadata> willCommitOffsets) {
+
+        if (willCommitOffsets.isEmpty())
+            return;
+
+        Set<String> validTopics = metadata.fetch().topics();
+        Set<TopicPartition> toGiveUpTopicPartitions = new HashSet<>();
+
+        Iterator<Map.Entry<TopicPartition, OffsetAndMetadata>> iterator = willCommitOffsets.entrySet().iterator();
+
+        while (iterator.hasNext()) {
+
+            Map.Entry<TopicPartition, OffsetAndMetadata> entry = iterator.next();
+
+            if (!validTopics.contains(entry.getKey().topic())) {
+
+                toGiveUpTopicPartitions.add(entry.getKey());

Review comment:
       I don't think we need the extra `toGiveUpTopicPartitions` set to store the partitions to be deleted. We can log the warning message from L1103 here directly.
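
A sketch of the single-pass shape this comment is asking for, with hypothetical names (`pruneAndWarn`; `String` keys stand in for `TopicPartition`): warn and remove inside the same loop, with no intermediate collection.

```java
import java.util.Iterator;
import java.util.Map;
import java.util.Set;

public class PruneAndLogSketch {
    // Single pass: warn and remove together, no "to give up" set.
    // Returns the number of entries dropped, for easy checking.
    static int pruneAndWarn(Map<String, Long> offsets, Set<String> validTopics) {
        int removed = 0;
        Iterator<Map.Entry<String, Long>> it = offsets.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<String, Long> entry = it.next();
            if (!validTopics.contains(entry.getKey())) {
                // the real code would call log.warn(...) here
                System.out.println("Skipping commit of offset " + entry.getValue()
                        + " for removed topic " + entry.getKey());
                it.remove();
                removed++;
            }
        }
        return removed;
    }
}
```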

##########
File path: clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
##########
@@ -1069,6 +1074,37 @@ private void maybeAutoCommitOffsetsSync(Timer timer) {
         }
     }
 
+    private void cleanUpConsumedOffsets(Map<TopicPartition, OffsetAndMetadata> willCommitOffsets) {

Review comment:
       nit: Do you think the variable name `partitionOffsetsToBeCommitted` would be better than `willCommitOffsets`?

##########
File path: clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
##########
@@ -670,10 +671,11 @@ private void validateCooperativeAssignment(final Map<String, List<TopicPartition
     }
 
     @Override
-    protected void onJoinPrepare(int generation, String memberId) {
+    protected void onJoinPrepare(int generation, String memberId, final Timer pollTimer) {
        log.debug("Executing onJoinPrepare with generation {} and memberId {}", generation, memberId);
         // commit offsets prior to rebalance if auto-commit enabled
-        maybeAutoCommitOffsetsSync(time.timer(rebalanceConfig.rebalanceTimeoutMs));
+        //The timer whose commitOffset timed out is no longer time.timer(rebalanceConfig.rebalanceTimeoutMs), and is changed to the timer passed by the customer

Review comment:
       This comment is not necessary IMO.

##########
File path: clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
##########
@@ -343,7 +344,7 @@ protected synchronized long timeToNextHeartbeat(long now) {
      * Ensure that the group is active (i.e. joined and synced)
      */
     public void ensureActiveGroup() {
-        while (!ensureActiveGroup(time.timer(Long.MAX_VALUE))) {
+        while (!ensureActiveGroup(time.timer(Long.MAX_VALUE), null)) {

Review comment:
       Could we pass in a timer here directly, instead of null? 

##########
File path: clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
##########
@@ -420,7 +423,7 @@ boolean joinGroupIfNeeded(final Timer timer) {
                // need to set the flag before calling onJoinPrepare since the user callback may throw
                // exception, in which case upon retry we should not retry onJoinPrepare either.
                needsJoinPrepare = false;
-                onJoinPrepare(generation.generationId, generation.memberId);
+                onJoinPrepare(generation.generationId, generation.memberId, pollTimer == null ? time.timer(0L) : pollTimer);

Review comment:
       I'm thinking the `pollTimer` should not be null here; that makes more sense. We should pass a non-null timer into these methods.
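
A minimal sketch of the non-null contract being suggested, with hypothetical names: the nested `Timer` class here is a stand-in, not Kafka's `org.apache.kafka.common.utils.Timer`. Instead of mapping `null` to `time.timer(0L)` at each call site, the method boundary rejects `null` outright, so every caller must decide on a real timeout.

```java
import java.util.Objects;

public class NonNullTimerSketch {
    // Hypothetical minimal Timer stand-in for illustration only.
    static final class Timer {
        final long remainingMs;
        Timer(long remainingMs) { this.remainingMs = remainingMs; }
    }

    // Make the non-null contract explicit rather than defaulting null
    // to a zero-wait timer inside the method.
    static long onJoinPrepare(int generation, String memberId, Timer pollTimer) {
        Objects.requireNonNull(pollTimer, "pollTimer must be supplied by the caller");
        // rebalance preparation would consume pollTimer here
        return pollTimer.remainingMs;
    }
}
```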




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

