This is an automated email from the ASF dual-hosted git repository.
chia7712 pushed a commit to branch 4.0
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/4.0 by this push:
new f99db0804e5 KAFKA-19275 client-state and thread-state metrics are
always "Unavailable" (#19712)
f99db0804e5 is described below
commit f99db0804e52644b3e05aedb96d9209246f3fe60
Author: Kuan-Po Tseng <[email protected]>
AuthorDate: Wed May 14 14:07:32 2025 +0800
KAFKA-19275 client-state and thread-state metrics are always "Unavailable"
(#19712)
Fix the issue where JMC (JDK Mission Control) is unable to correctly
display the client-state and thread-state metrics. The root cause is that
these two metrics return the `State` enum value itself to JMX. If the
user has not set up the RMI server, JMC or other monitoring tools will be
unable to interpret the `State` class. To resolve this, these two metrics
now return a string representation of the state (`State.name()`) instead
of the `State` object.
Reviewers: Luke Chen <[email protected]>, Ken Huang
<[email protected]>, Chia-Ping Tsai <[email protected]>
---
.../kafka/streams/integration/MetricsIntegrationTest.java | 14 +++++++-------
.../main/java/org/apache/kafka/streams/KafkaStreams.java | 2 +-
.../kafka/streams/internals/metrics/ClientMetrics.java | 3 +--
.../kafka/streams/processor/internals/StreamThread.java | 2 +-
.../streams/processor/internals/metrics/ThreadMetrics.java | 3 +--
.../kafka/streams/internals/metrics/ClientMetricsTest.java | 2 +-
.../processor/internals/metrics/ThreadMetricsTest.java | 2 +-
7 files changed, 13 insertions(+), 15 deletions(-)
diff --git
a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/MetricsIntegrationTest.java
b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/MetricsIntegrationTest.java
index c6dc962d6d6..a56723abc33 100644
---
a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/MetricsIntegrationTest.java
+++
b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/MetricsIntegrationTest.java
@@ -272,7 +272,7 @@ public class MetricsIntegrationTest {
kafkaStreams = new KafkaStreams(topology, streamsConfiguration);
verifyAliveStreamThreadsMetric();
- verifyStateMetric(State.CREATED);
+ verifyStateMetric(State.CREATED.name());
verifyTopologyDescriptionMetric(topology.describe().toString());
verifyApplicationIdMetric();
@@ -283,7 +283,7 @@ public class MetricsIntegrationTest {
() -> "Kafka Streams application did not reach state RUNNING in "
+ timeout + " ms");
verifyAliveStreamThreadsMetric();
- verifyStateMetric(State.RUNNING);
+ verifyStateMetric(State.RUNNING.name());
}
private void produceRecordsForTwoSegments(final Duration segmentInterval) {
@@ -357,7 +357,7 @@ public class MetricsIntegrationTest {
.to(STREAM_OUTPUT_4);
startApplication();
- verifyStateMetric(State.RUNNING);
+ verifyStateMetric(State.RUNNING.name());
checkClientLevelMetrics();
checkThreadLevelMetrics();
checkTaskLevelMetrics();
@@ -392,7 +392,7 @@ public class MetricsIntegrationTest {
produceRecordsForClosingWindow(windowSize);
startApplication();
- verifyStateMetric(State.RUNNING);
+ verifyStateMetric(State.RUNNING.name());
checkWindowStoreAndSuppressionBufferMetrics();
@@ -421,7 +421,7 @@ public class MetricsIntegrationTest {
startApplication();
- verifyStateMetric(State.RUNNING);
+ verifyStateMetric(State.RUNNING.name());
checkSessionStoreMetrics();
@@ -439,14 +439,14 @@ public class MetricsIntegrationTest {
assertThat(metricsList.get(0).metricValue(), is(NUM_THREADS));
}
- private void verifyStateMetric(final State state) {
+ private void verifyStateMetric(final String state) {
final List<Metric> metricsList = new
ArrayList<Metric>(kafkaStreams.metrics().values()).stream()
.filter(m -> m.metricName().name().equals(STATE) &&
m.metricName().group().equals(STREAM_CLIENT_NODE_METRICS))
.collect(Collectors.toList());
assertThat(metricsList.size(), is(1));
assertThat(metricsList.get(0).metricValue(), is(state));
- assertThat(metricsList.get(0).metricValue().toString(),
is(state.toString()));
+ assertThat(metricsList.get(0).metricValue().toString(), is(state));
}
private void verifyTopologyDescriptionMetric(final String
topologyDescription) {
diff --git a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
index 00e9ede261f..1fbf307426c 100644
--- a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
+++ b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
@@ -983,7 +983,7 @@ public class KafkaStreams implements AutoCloseable {
ClientMetrics.addCommitIdMetric(streamsMetrics);
ClientMetrics.addApplicationIdMetric(streamsMetrics,
applicationConfigs.getString(StreamsConfig.APPLICATION_ID_CONFIG));
ClientMetrics.addTopologyDescriptionMetric(streamsMetrics,
(metricsConfig, now) -> this.topologyMetadata.topologyDescriptionString());
- ClientMetrics.addStateMetric(streamsMetrics, (metricsConfig, now) ->
state);
+ ClientMetrics.addStateMetric(streamsMetrics, (metricsConfig, now) ->
state.name());
ClientMetrics.addClientStateTelemetryMetric(streamsMetrics,
(metricsConfig, now) -> state.ordinal());
ClientMetrics.addClientRecordingLevelMetric(streamsMetrics,
calculateMetricsRecordingLevel());
threads = Collections.synchronizedList(new LinkedList<>());
diff --git
a/streams/src/main/java/org/apache/kafka/streams/internals/metrics/ClientMetrics.java
b/streams/src/main/java/org/apache/kafka/streams/internals/metrics/ClientMetrics.java
index 22e09042e16..21bac269d5a 100644
---
a/streams/src/main/java/org/apache/kafka/streams/internals/metrics/ClientMetrics.java
+++
b/streams/src/main/java/org/apache/kafka/streams/internals/metrics/ClientMetrics.java
@@ -19,7 +19,6 @@ package org.apache.kafka.streams.internals.metrics;
import org.apache.kafka.common.metrics.Gauge;
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.metrics.Sensor.RecordingLevel;
-import org.apache.kafka.streams.KafkaStreams.State;
import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
import org.slf4j.Logger;
@@ -118,7 +117,7 @@ public class ClientMetrics {
}
public static void addStateMetric(final StreamsMetricsImpl streamsMetrics,
- final Gauge<State> stateProvider) {
+ final Gauge<String> stateProvider) {
streamsMetrics.addClientLevelMutableMetric(
STATE,
STATE_DESCRIPTION,
diff --git
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
index 2a83f0b6123..20d49b44dbd 100644
---
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
+++
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
@@ -628,7 +628,7 @@ public class StreamThread extends Thread implements
ProcessingThread {
ThreadMetrics.addThreadStateMetric(
threadId,
streamsMetrics,
- (metricConfig, now) -> this.state());
+ (metricConfig, now) -> this.state().name());
ThreadMetrics.addThreadBlockedTimeMetric(
threadId,
new StreamThreadTotalBlockedTime(
diff --git
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/metrics/ThreadMetrics.java
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/metrics/ThreadMetrics.java
index bddd27c5905..b45bde5ddd3 100644
---
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/metrics/ThreadMetrics.java
+++
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/metrics/ThreadMetrics.java
@@ -19,7 +19,6 @@ package org.apache.kafka.streams.processor.internals.metrics;
import org.apache.kafka.common.metrics.Gauge;
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.metrics.Sensor.RecordingLevel;
-import org.apache.kafka.streams.processor.internals.StreamThread;
import
org.apache.kafka.streams.processor.internals.StreamThreadTotalBlockedTime;
import java.util.Collections;
@@ -313,7 +312,7 @@ public class ThreadMetrics {
public static void addThreadStateMetric(final String threadId,
final StreamsMetricsImpl
streamsMetrics,
- final Gauge<StreamThread.State>
threadStateProvider) {
+ final Gauge<String>
threadStateProvider) {
streamsMetrics.addThreadLevelMutableMetric(
STATE,
THREAD_STATE_DESCRIPTION,
diff --git
a/streams/src/test/java/org/apache/kafka/streams/internals/metrics/ClientMetricsTest.java
b/streams/src/test/java/org/apache/kafka/streams/internals/metrics/ClientMetricsTest.java
index 21e65ce892e..9142835b92c 100644
---
a/streams/src/test/java/org/apache/kafka/streams/internals/metrics/ClientMetricsTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/internals/metrics/ClientMetricsTest.java
@@ -89,7 +89,7 @@ public class ClientMetricsTest {
public void shouldAddStateMetric() {
final String name = "state";
final String description = "The state of the Kafka Streams client";
- final Gauge<State> stateProvider = (config, now) -> State.RUNNING;
+ final Gauge<String> stateProvider = (config, now) ->
State.RUNNING.name();
setUpAndVerifyMutableMetric(
name,
description,
diff --git
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/metrics/ThreadMetricsTest.java
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/metrics/ThreadMetricsTest.java
index a24a250c9e9..87891ab3897 100644
---
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/metrics/ThreadMetricsTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/metrics/ThreadMetricsTest.java
@@ -435,7 +435,7 @@ public class ThreadMetricsTest {
@Test
public void shouldAddThreadStateJmxMetric() {
- final Gauge<StreamThread.State> threadStateProvider = (streamsMetrics,
startTime) -> StreamThread.State.RUNNING;
+ final Gauge<String> threadStateProvider = (streamsMetrics, startTime)
-> StreamThread.State.RUNNING.name();
ThreadMetrics.addThreadStateMetric(
THREAD_ID,
streamsMetrics,