jeffkbkim commented on code in PR #13901:
URL: https://github.com/apache/kafka/pull/13901#discussion_r1245682672


##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##########
@@ -728,6 +794,81 @@ public void replay(
             }
             consumerGroup.removeMember(memberId);
         }
+
+        updateGroupsByTopics(groupId, oldSubscribedTopicNames, 
consumerGroup.subscribedTopicNames());

Review Comment:
   how does `groupsByTopics` (and `groups`) know that the changes made here are 
already committed (and won't be reverted)?
   
   i think i'm confused because in api handling (i.e. consumer group heartbeat) 
once we modify the timeline data structures we generate records to commit the 
offset in the timeline but here we do it in reverse



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroup.java:
##########
@@ -423,6 +456,47 @@ public Map<String, TopicMetadata> 
computeSubscriptionMetadata(
         return Collections.unmodifiableMap(newSubscriptionMetadata);
     }
 
+    /**
+     * Updates the metadata refresh deadline.
+     *
+     * @param deadlineMs The next time in milliseconds.
+     * @param groupEpoch The associated group epoch.
+     */
+    public void setMetadataRefreshDeadline(
+        long deadlineMs,
+        int groupEpoch
+    ) {
+        this.metadataRefreshDeadline = new DeadlineAndEpoch(deadlineMs, 
groupEpoch);
+    }
+
+    /**
+     * Requests a metadata refresh.
+     */
+    public void requestMetadataRefresh() {
+        this.metadataRefreshDeadline = DeadlineAndEpoch.EMPTY;
+    }
+
+    /**
+     * Checks if a metadata refresh is required. A refresh is required in two 
cases:
+     * 1) The deadline is smaller or equals to the current time;

Review Comment:
   nit: "or equal to"



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroup.java:
##########
@@ -423,6 +456,47 @@ public Map<String, TopicMetadata> 
computeSubscriptionMetadata(
         return Collections.unmodifiableMap(newSubscriptionMetadata);
     }
 
+    /**
+     * Updates the next metadata refresh time.
+     *
+     * @param nextTimeMs The next time in milliseconds.
+     * @param groupEpoch The associated group epoch.
+     */
+    public void setNextMetadataRefreshTime(
+        long nextTimeMs,
+        int groupEpoch
+    ) {
+        this.nextMetadataRefreshTime = new TimeAndEpoch(nextTimeMs, 
groupEpoch);
+    }
+
+    /**
+     * Resets the next metadata refresh.
+     */
+    public void resetNextMetadataRefreshTime() {
+        this.nextMetadataRefreshTime = TimeAndEpoch.EMPTY;
+    }
+
+    /**
+     * Checks if a metadata refresh is required. A refresh is required in two 
cases:
+     * 1) The next update time is smaller or equals to the current time;
+     * 2) The group epoch associated with the next update time is smaller than

Review Comment:
   shouldn't it be "associated with the next update time is larger than"?
   
   the "current group epoch" is `groupEpoch()` right



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##########
@@ -728,6 +794,81 @@ public void replay(
             }
             consumerGroup.removeMember(memberId);
         }
+
+        updateGroupsByTopics(groupId, oldSubscribedTopicNames, 
consumerGroup.subscribedTopicNames());
+    }
+
+    /**
+     * @return The set of groups subscribed to the topic.
+     */
+    public Set<String> groupsSubscribedToTopic(String topicName) {
+        Set<String> groups = groupsByTopics.get(topicName);

Review Comment:
   any reason we don't use `getOrDefault()`?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to