This is an automated email from the ASF dual-hosted git repository.

chia7712 pushed a commit to branch 4.0
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/4.0 by this push:
     new f99db0804e5 KAFKA-19275 client-state and thread-state metrics are 
always "Unavailable" (#19712)
f99db0804e5 is described below

commit f99db0804e52644b3e05aedb96d9209246f3fe60
Author: Kuan-Po Tseng <[email protected]>
AuthorDate: Wed May 14 14:07:32 2025 +0800

    KAFKA-19275 client-state and thread-state metrics are always "Unavailable" 
(#19712)
    
    Fix the issue where JMC (JDK Mission Control) is unable to correctly
    display the client-state and thread-state metrics. The root cause is
    that these two metrics return the `State` enum object directly to JMX.
    Unless the user has set up an RMI server, JMC and other monitoring
    tools are unable to interpret the `State` class. To resolve this, these
    two metrics now return the string representation of the state (via
    `name()`) instead of the `State` object itself.
    
    Reviewers: Luke Chen <[email protected]>, Ken Huang
     <[email protected]>, Chia-Ping Tsai <[email protected]>
---
 .../kafka/streams/integration/MetricsIntegrationTest.java  | 14 +++++++-------
 .../main/java/org/apache/kafka/streams/KafkaStreams.java   |  2 +-
 .../kafka/streams/internals/metrics/ClientMetrics.java     |  3 +--
 .../kafka/streams/processor/internals/StreamThread.java    |  2 +-
 .../streams/processor/internals/metrics/ThreadMetrics.java |  3 +--
 .../kafka/streams/internals/metrics/ClientMetricsTest.java |  2 +-
 .../processor/internals/metrics/ThreadMetricsTest.java     |  2 +-
 7 files changed, 13 insertions(+), 15 deletions(-)

diff --git 
a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/MetricsIntegrationTest.java
 
b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/MetricsIntegrationTest.java
index c6dc962d6d6..a56723abc33 100644
--- 
a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/MetricsIntegrationTest.java
+++ 
b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/MetricsIntegrationTest.java
@@ -272,7 +272,7 @@ public class MetricsIntegrationTest {
         kafkaStreams = new KafkaStreams(topology, streamsConfiguration);
 
         verifyAliveStreamThreadsMetric();
-        verifyStateMetric(State.CREATED);
+        verifyStateMetric(State.CREATED.name());
         verifyTopologyDescriptionMetric(topology.describe().toString());
         verifyApplicationIdMetric();
 
@@ -283,7 +283,7 @@ public class MetricsIntegrationTest {
             () -> "Kafka Streams application did not reach state RUNNING in " 
+ timeout + " ms");
 
         verifyAliveStreamThreadsMetric();
-        verifyStateMetric(State.RUNNING);
+        verifyStateMetric(State.RUNNING.name());
     }
 
     private void produceRecordsForTwoSegments(final Duration segmentInterval) {
@@ -357,7 +357,7 @@ public class MetricsIntegrationTest {
             .to(STREAM_OUTPUT_4);
         startApplication();
 
-        verifyStateMetric(State.RUNNING);
+        verifyStateMetric(State.RUNNING.name());
         checkClientLevelMetrics();
         checkThreadLevelMetrics();
         checkTaskLevelMetrics();
@@ -392,7 +392,7 @@ public class MetricsIntegrationTest {
         produceRecordsForClosingWindow(windowSize);
         startApplication();
 
-        verifyStateMetric(State.RUNNING);
+        verifyStateMetric(State.RUNNING.name());
 
         checkWindowStoreAndSuppressionBufferMetrics();
 
@@ -421,7 +421,7 @@ public class MetricsIntegrationTest {
 
         startApplication();
 
-        verifyStateMetric(State.RUNNING);
+        verifyStateMetric(State.RUNNING.name());
 
         checkSessionStoreMetrics();
 
@@ -439,14 +439,14 @@ public class MetricsIntegrationTest {
         assertThat(metricsList.get(0).metricValue(), is(NUM_THREADS));
     }
 
-    private void verifyStateMetric(final State state) {
+    private void verifyStateMetric(final String state) {
         final List<Metric> metricsList = new 
ArrayList<Metric>(kafkaStreams.metrics().values()).stream()
             .filter(m -> m.metricName().name().equals(STATE) &&
                 m.metricName().group().equals(STREAM_CLIENT_NODE_METRICS))
             .collect(Collectors.toList());
         assertThat(metricsList.size(), is(1));
         assertThat(metricsList.get(0).metricValue(), is(state));
-        assertThat(metricsList.get(0).metricValue().toString(), 
is(state.toString()));
+        assertThat(metricsList.get(0).metricValue().toString(), is(state));
     }
 
     private void verifyTopologyDescriptionMetric(final String 
topologyDescription) {
diff --git a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java 
b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
index 00e9ede261f..1fbf307426c 100644
--- a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
+++ b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
@@ -983,7 +983,7 @@ public class KafkaStreams implements AutoCloseable {
         ClientMetrics.addCommitIdMetric(streamsMetrics);
         ClientMetrics.addApplicationIdMetric(streamsMetrics, 
applicationConfigs.getString(StreamsConfig.APPLICATION_ID_CONFIG));
         ClientMetrics.addTopologyDescriptionMetric(streamsMetrics, 
(metricsConfig, now) -> this.topologyMetadata.topologyDescriptionString());
-        ClientMetrics.addStateMetric(streamsMetrics, (metricsConfig, now) -> 
state);
+        ClientMetrics.addStateMetric(streamsMetrics, (metricsConfig, now) -> 
state.name());
         ClientMetrics.addClientStateTelemetryMetric(streamsMetrics, 
(metricsConfig, now) -> state.ordinal());
         ClientMetrics.addClientRecordingLevelMetric(streamsMetrics, 
calculateMetricsRecordingLevel());
         threads = Collections.synchronizedList(new LinkedList<>());
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/internals/metrics/ClientMetrics.java
 
b/streams/src/main/java/org/apache/kafka/streams/internals/metrics/ClientMetrics.java
index 22e09042e16..21bac269d5a 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/internals/metrics/ClientMetrics.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/internals/metrics/ClientMetrics.java
@@ -19,7 +19,6 @@ package org.apache.kafka.streams.internals.metrics;
 import org.apache.kafka.common.metrics.Gauge;
 import org.apache.kafka.common.metrics.Sensor;
 import org.apache.kafka.common.metrics.Sensor.RecordingLevel;
-import org.apache.kafka.streams.KafkaStreams.State;
 import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
 
 import org.slf4j.Logger;
@@ -118,7 +117,7 @@ public class ClientMetrics {
     }
 
     public static void addStateMetric(final StreamsMetricsImpl streamsMetrics,
-                                      final Gauge<State> stateProvider) {
+                                      final Gauge<String> stateProvider) {
         streamsMetrics.addClientLevelMutableMetric(
             STATE,
             STATE_DESCRIPTION,
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
index 2a83f0b6123..20d49b44dbd 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
@@ -628,7 +628,7 @@ public class StreamThread extends Thread implements 
ProcessingThread {
         ThreadMetrics.addThreadStateMetric(
             threadId,
             streamsMetrics,
-            (metricConfig, now) -> this.state());
+            (metricConfig, now) -> this.state().name());
         ThreadMetrics.addThreadBlockedTimeMetric(
             threadId,
             new StreamThreadTotalBlockedTime(
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/metrics/ThreadMetrics.java
 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/metrics/ThreadMetrics.java
index bddd27c5905..b45bde5ddd3 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/metrics/ThreadMetrics.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/metrics/ThreadMetrics.java
@@ -19,7 +19,6 @@ package org.apache.kafka.streams.processor.internals.metrics;
 import org.apache.kafka.common.metrics.Gauge;
 import org.apache.kafka.common.metrics.Sensor;
 import org.apache.kafka.common.metrics.Sensor.RecordingLevel;
-import org.apache.kafka.streams.processor.internals.StreamThread;
 import 
org.apache.kafka.streams.processor.internals.StreamThreadTotalBlockedTime;
 
 import java.util.Collections;
@@ -313,7 +312,7 @@ public class ThreadMetrics {
 
     public static void addThreadStateMetric(final String threadId,
                                             final StreamsMetricsImpl 
streamsMetrics,
-                                            final Gauge<StreamThread.State> 
threadStateProvider) {
+                                            final Gauge<String> 
threadStateProvider) {
         streamsMetrics.addThreadLevelMutableMetric(
             STATE,
             THREAD_STATE_DESCRIPTION,
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/internals/metrics/ClientMetricsTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/internals/metrics/ClientMetricsTest.java
index 21e65ce892e..9142835b92c 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/internals/metrics/ClientMetricsTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/internals/metrics/ClientMetricsTest.java
@@ -89,7 +89,7 @@ public class ClientMetricsTest {
     public void shouldAddStateMetric() {
         final String name = "state";
         final String description = "The state of the Kafka Streams client";
-        final Gauge<State> stateProvider = (config, now) -> State.RUNNING;
+        final Gauge<String> stateProvider = (config, now) -> 
State.RUNNING.name();
         setUpAndVerifyMutableMetric(
             name,
             description,
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/metrics/ThreadMetricsTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/metrics/ThreadMetricsTest.java
index a24a250c9e9..87891ab3897 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/metrics/ThreadMetricsTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/metrics/ThreadMetricsTest.java
@@ -435,7 +435,7 @@ public class ThreadMetricsTest {
 
     @Test
     public void shouldAddThreadStateJmxMetric() {
-        final Gauge<StreamThread.State> threadStateProvider = (streamsMetrics, 
startTime) -> StreamThread.State.RUNNING;
+        final Gauge<String> threadStateProvider = (streamsMetrics, startTime) 
-> StreamThread.State.RUNNING.name();
         ThreadMetrics.addThreadStateMetric(
                 THREAD_ID,
                 streamsMetrics,

Reply via email to