bowenli-oai commented on code in PR #277:
URL: https://github.com/apache/flink-connector-kafka/pull/277#discussion_r3470452697


##########
flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/dynamic/source/DynamicKafkaSourceITTest.java:
##########
@@ -1243,6 +1246,12 @@ private Set<String> findMetrics(InMemoryReporter inMemoryReporter, String groupP
                     .collect(Collectors.toSet());
         }
 
+        private Set<String> findKafkaClusterMetrics(InMemoryReporter inMemoryReporter) {
+            return findMetrics(inMemoryReporter, DYNAMIC_KAFKA_SOURCE_METRIC_GROUP).stream()

Review Comment:
   Could we avoid selecting a single arbitrary `DynamicKafkaSource` group here? 
This PR registers `activeSplitCount` directly on the parent group, so 
`findGroup(DYNAMIC_KAFKA_SOURCE_METRIC_GROUP)` can return that parent; 
`getMetricsByGroup()` then contains only the root gauge, and the 
`.kafkaCluster.` filter yields an empty set. That is consistent with the 
current 2.2.1/JDK 21 failure at line 1125. Please aggregate the metrics of all 
matching groups (for example via `findGroups(...)`) or explicitly select the 
kafka-cluster child groups. The same single-group pattern in 
`hasKafkaClusterMetrics()` can also make disappearance checks pass prematurely.
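
   To illustrate the aggregation suggested above, here is a minimal sketch that 
does not use the Flink test utilities. The `Map<String, List<String>>` is a 
hypothetical stand-in for what the reporter exposes per metric group, and the 
class name, scope strings, and metric identifiers are made up for the example. 
The point is only that the `.kafkaCluster.` filter runs over every matching 
group instead of over whichever single group is found first.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

// Hypothetical stand-in for the reporter: each key is a metric group's scope,
// each value lists the metric identifiers registered directly on that group.
final class KafkaClusterMetricLookup {

    // Collects kafka-cluster metrics from ALL groups whose scope matches the
    // pattern, so a parent group holding only a root gauge cannot hide them.
    static Set<String> findKafkaClusterMetrics(
            Map<String, List<String>> metricsByGroup, String groupPattern) {
        Set<String> result = new TreeSet<>();
        for (Map.Entry<String, List<String>> entry : metricsByGroup.entrySet()) {
            if (!entry.getKey().contains(groupPattern)) {
                continue; // unrelated group
            }
            for (String metricId : entry.getValue()) {
                if (metricId.contains(".kafkaCluster.")) {
                    result.add(metricId);
                }
            }
        }
        return result;
    }

    public static void main(String[] args) {
        Map<String, List<String>> groups = new LinkedHashMap<>();
        // Parent group: only the root gauge is registered here.
        groups.put(
                "source.DynamicKafkaSource",
                List.of("source.DynamicKafkaSource.activeSplitCount"));
        // Child group: this is where the per-cluster metrics live (made-up name).
        groups.put(
                "source.DynamicKafkaSource.kafkaCluster.c0",
                List.of("source.DynamicKafkaSource.kafkaCluster.c0.exampleMetric"));
        System.out.println(findKafkaClusterMetrics(groups, "DynamicKafkaSource"));
    }
}
```

   With a single-group lookup that happens to return the parent, the same 
filter would see only `activeSplitCount` and return an empty set.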



##########
flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/dynamic/source/reader/DynamicKafkaSourceReader.java:
##########
@@ -630,6 +643,20 @@ private void closeAllReadersAndClearState() {
         }
         clusterReaderMap.clear();
         clustersProperties.clear();
+        activeSplitCount.set(0);

Review Comment:
   Can we avoid publishing `0` from the intermediate clear step? A normal 
metadata rebuild can preserve assigned splits, but metric reporters read this 
`AtomicInteger` concurrently and can sample `0` after this line before 
`refreshActiveSplitCount()` publishes the rebuilt count at line 357. For an 
autoscaler-facing stable signal, that transient is indistinguishable from “all 
local splits removed.” Since this method’s only caller refreshes after 
rebuilding, it seems safer to publish only the final count and keep the 
explicit zero in `close()`.
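
   A minimal sketch of the "publish only the final count" idea, independent of 
the actual reader class. `SplitCountPublisher`, `rebuild`, `gaugeValue`, and the 
per-cluster map are hypothetical names for illustration; only 
`activeSplitCount`, `refreshActiveSplitCount()`, and `close()` mirror names 
mentioned in this thread, and their bodies here are assumptions.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical model of a reader that tracks splits per cluster and exposes
// the total through an AtomicInteger read concurrently by metric reporters.
final class SplitCountPublisher {
    private final Map<String, Integer> splitsPerCluster = new HashMap<>();
    private final AtomicInteger activeSplitCount = new AtomicInteger();

    // Rebuild after a metadata change. The intermediate clear does NOT touch
    // activeSplitCount, so a concurrent reader never samples a transient 0.
    void rebuild(Map<String, Integer> rebuiltSplits) {
        splitsPerCluster.clear();
        splitsPerCluster.putAll(rebuiltSplits);
        refreshActiveSplitCount(); // single publication of the final value
    }

    // Recomputes the total and publishes it with one atomic write.
    void refreshActiveSplitCount() {
        int total = 0;
        for (int count : splitsPerCluster.values()) {
            total += count;
        }
        activeSplitCount.set(total);
    }

    // Shutdown is the only place that publishes an explicit zero.
    void close() {
        splitsPerCluster.clear();
        activeSplitCount.set(0);
    }

    // What a metric reporter would sample.
    int gaugeValue() {
        return activeSplitCount.get();
    }
}
```

   In this shape a reporter observes either the old count or the rebuilt count 
during a rebuild, and `0` only after `close()` or when no splits remain.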



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

