kirktrue commented on code in PR #15585:
URL: https://github.com/apache/kafka/pull/15585#discussion_r1539435022


##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManager.java:
##########
@@ -550,6 +550,11 @@ public ConsumerGroupHeartbeatRequestData 
buildRequestData() {
             } else {
                 // SubscribedTopicRegex - only sent if has changed since the 
last heartbeat
                 //                      - not supported yet

Review Comment:
   This comment would go away, if we do end up keeping a separate `else` block.



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManager.java:
##########
@@ -550,6 +550,11 @@ public ConsumerGroupHeartbeatRequestData 
buildRequestData() {
             } else {
                 // SubscribedTopicRegex - only sent if has changed since the 
last heartbeat
                 //                      - not supported yet
+                TreeSet<String> subscribedTopicNames = new 
TreeSet<>(this.subscriptions.subscription());
+                if (sendAllFields || 
!subscribedTopicNames.equals(sentFields.subscribedTopicNames)) {
+                    data.setSubscribedTopicNames(new 
ArrayList<>(this.subscriptions.subscription()));
+                    sentFields.subscribedTopicNames = subscribedTopicNames;
+                }

Review Comment:
   Thanks for catching this, @lianetm. I agree that if the logic is identical, 
there's no need for duplication. Any speculation as to the reason the `if` 
statement existed in the first place?



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java:
##########
@@ -1667,6 +1667,9 @@ private void updateLastSeenEpochIfNewer(TopicPartition 
topicPartition, OffsetAnd
     public boolean updateAssignmentMetadataIfNeeded(Timer timer) {
         maybeThrowFencedInstanceException();
         maybeInvokeCommitCallbacks();
+        if (subscriptions.hasPatternSubscription()) {
+            updatePatternSubscription(metadata.fetch());
+        }

Review Comment:
   Good call, @lianetm.



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java:
##########
@@ -1750,8 +1753,14 @@ private void subscribeInternal(Pattern pattern, 
Optional<ConsumerRebalanceListen
             throwIfNoAssignorsConfigured();
             log.info("Subscribed to pattern: '{}'", pattern);
             subscriptions.subscribe(pattern, listener);
-            updatePatternSubscription(metadata.fetch());
             metadata.requestUpdateForNewTopics();
+            Cluster cache = metadata.fetch();
+
+            while (cache == metadata.fetch()) {
+                log.info("Waiting for new metadata update");
+            }

Review Comment:
   @Phuc-Hong-Tran—why was this `while` loop added? Did you see issues in your 
testing, or were you preemptively wanting to make the code more robust?



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java:
##########
@@ -1750,8 +1753,14 @@ private void subscribeInternal(Pattern pattern, 
Optional<ConsumerRebalanceListen
             throwIfNoAssignorsConfigured();
             log.info("Subscribed to pattern: '{}'", pattern);
             subscriptions.subscribe(pattern, listener);
-            updatePatternSubscription(metadata.fetch());
             metadata.requestUpdateForNewTopics();
+            Cluster cache = metadata.fetch();
+
+            while (cache == metadata.fetch()) {
+                log.info("Waiting for new metadata update");
+            }

Review Comment:
   I'm wondering why we call `metadata.requestUpdateForNewTopics` here when 
that's already in `updatePatternSubscription`? 🤔 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to