see-quick commented on code in PR #20672:
URL: https://github.com/apache/kafka/pull/20672#discussion_r2480681973


##########
streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/KafkaStreamsTelemetryBase.java:
##########
@@ -249,28 +277,68 @@ public void shouldPushMetricsToBroker(final String 
recordingLevel, final String
                         return "org.apache.kafka." + group + "." + name;
                     }).filter(name -> 
!name.equals("org.apache.kafka.stream.thread.state"))// telemetry reporter 
filters out string metrics
                     .sorted().toList();
-            final List<String> actualMetrics = new 
ArrayList<>(TelemetryPlugin.SUBSCRIBED_METRICS.get(mainConsumerInstanceId));
+            final List<String> actualMetrics = new 
ArrayList<>(getSubscribedMetricsMap().get(mainConsumerInstanceId));
             assertEquals(expectedMetrics, actualMetrics);
 
             TestUtils.waitForCondition(
-                () -> 
!TelemetryPlugin.SUBSCRIBED_METRICS.getOrDefault(adminInstanceId, 
Collections.emptyList()).isEmpty(),
+                () -> !getSubscribedMetricsMap().getOrDefault(adminInstanceId, 
Collections.emptyList()).isEmpty(),
                 30_000,
                 "Never received subscribed metrics"
             );
-            final List<String> actualInstanceMetrics = 
TelemetryPlugin.SUBSCRIBED_METRICS.get(adminInstanceId);
+            final List<String> actualInstanceMetrics = 
getSubscribedMetricsMap().get(adminInstanceId);
             final List<String> expectedInstanceMetrics = Arrays.asList(
                 "org.apache.kafka.stream.alive.stream.threads",
                 "org.apache.kafka.stream.client.state",
                 "org.apache.kafka.stream.failed.stream.threads",
                 "org.apache.kafka.stream.recording.level");
-            
+
             assertEquals(expectedInstanceMetrics, actualInstanceMetrics);
 
-            TestUtils.waitForCondition(() -> TelemetryPlugin.processId != null,
+            TestUtils.waitForCondition(() -> getProcessId() != null,
                     30_000,
                     "Never received the process id");
 
-            assertEquals(expectedProcessId, TelemetryPlugin.processId);
+            assertEquals(expectedProcessId, getProcessId());
+        }
+    }
+
+    @Test
+    public void shouldReceivePushInterval() throws Exception {

Review Comment:
   > I think that just switching the integration tests to use the new APIs is 
probably enough here.
   
   Okay. 
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to