mjsax commented on code in PR #17820:
URL: https://github.com/apache/kafka/pull/17820#discussion_r1843070888
##########
streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java:
##########
@@ -1250,6 +1253,20 @@ private Optional<String> removeStreamThread(final long
timeoutMs) throws Timeout
return Optional.empty();
}
+ private int calculateMetricsRecordingLevel() {
+ final int recordingLevel;
+ final String recordingLevelString =
applicationConfigs.getString(METRICS_RECORDING_LEVEL_CONFIG);
+ if (recordingLevelString.equals("INFO")) {
+ recordingLevel = 0;
+ } else if (recordingLevelString.equals("DEBUG")) {
+ recordingLevel = 1;
+ } else {
+ // Must be TRACE level
Review Comment:
Might be better to use another `if` and final `else` throw an exception as
safe guard? Would also highlight that we need to update this code, in case we
add a new level?
##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java:
##########
@@ -614,6 +615,12 @@ public StreamThread(final Time time,
streamsMetrics,
time.milliseconds()
);
+ ThreadMetrics.addThreadStateTelemetryMetric(threadId,
+ streamsMetrics,
+ (metricConfig, now) -> this.state().ordinal());
+ ThreadMetrics.addThreadStateMetric(threadId,
Review Comment:
as above
##########
streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java:
##########
@@ -1250,6 +1253,20 @@ private Optional<String> removeStreamThread(final long
timeoutMs) throws Timeout
return Optional.empty();
}
+ private int calculateMetricsRecordingLevel() {
+ final int recordingLevel;
+ final String recordingLevelString =
applicationConfigs.getString(METRICS_RECORDING_LEVEL_CONFIG);
+ if (recordingLevelString.equals("INFO")) {
Review Comment:
Why not use a `switch` statement?
##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/metrics/ThreadMetrics.java:
##########
@@ -290,6 +295,30 @@ public static void addThreadStartTimeMetric(final String
threadId,
);
}
+ public static void addThreadStateTelemetryMetric(final String threadId,
+ final StreamsMetricsImpl
streamsMetrics,
+ final Gauge<Integer>
threadStateProvider) {
+ streamsMetrics.addThreadLevelMutableMetric(
+ THREAD_STATE,
Review Comment:
nit: indention too deep (should only be 4 whitespace, not 8 -- should be an
IDE setting)
##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/metrics/ThreadMetrics.java:
##########
@@ -290,6 +295,30 @@ public static void addThreadStartTimeMetric(final String
threadId,
);
}
+ public static void addThreadStateTelemetryMetric(final String threadId,
+ final StreamsMetricsImpl
streamsMetrics,
+ final Gauge<Integer>
threadStateProvider) {
+ streamsMetrics.addThreadLevelMutableMetric(
+ THREAD_STATE,
+ THREAD_STATE_DESCRIPTION,
+ threadId,
+ threadStateProvider
+ );
+ }
+
+ public static void addThreadStateMetric(final String threadId,
+ final StreamsMetricsImpl
streamsMetrics,
+ final Gauge<String>
threadStateProvider) {
+ streamsMetrics.addThreadLevelMutableMetric(
+ STATE,
Review Comment:
as above
##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java:
##########
@@ -614,6 +615,12 @@ public StreamThread(final Time time,
streamsMetrics,
time.milliseconds()
);
+ ThreadMetrics.addThreadStateTelemetryMetric(threadId,
Review Comment:
nit: `threadId` should be in the next line by itself
##########
streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/KafkaStreamsTelemetryIntegrationTest.java:
##########
@@ -180,15 +180,16 @@ public void shouldPushMetricsToBroker(final String
recordingLevel) throws Except
final String name = mn.name().replace('-', '.');
final String group = mn.group().replace("-metrics",
"").replace('-', '.');
return "org.apache.kafka." + group + "." + name;
- }).sorted().collect(Collectors.toList());
+ }).filter(name ->
!name.equals("org.apache.kafka.stream.thread.state"))// telemetry reporter
filters out string metrics
+ .sorted().collect(Collectors.toList());
final List<String> actualMetrics = new
ArrayList<>(TelemetryPlugin.SUBSCRIBED_METRICS.get(mainConsumerInstanceId));
assertEquals(expectedMetrics, actualMetrics);
TestUtils.waitForCondition(() ->
!TelemetryPlugin.SUBSCRIBED_METRICS.get(adminInstanceId).isEmpty(),
30_000,
"Never received subscribed metrics");
final List<String> actualInstanceMetrics =
TelemetryPlugin.SUBSCRIBED_METRICS.get(adminInstanceId);
- final List<String> expectedInstanceMetrics =
Arrays.asList("org.apache.kafka.stream.alive.stream.threads",
"org.apache.kafka.stream.failed.stream.threads");
+ final List<String> expectedInstanceMetrics =
Arrays.asList("org.apache.kafka.stream.alive.stream.threads",
"org.apache.kafka.stream.client.state",
"org.apache.kafka.stream.failed.stream.threads",
"org.apache.kafka.stream.recording.level");
Review Comment:
Nit: line too long. How about
```
final List<String> expectedInstanceMetrics = Arrays.asList(
"org.apache.kafka.stream.client.state",
"org.apache.kafka.stream.alive.stream.threads",
"org.apache.kafka.stream.failed.stream.threads",
"org.apache.kafka.stream.recording.level"
);
```
##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java:
##########
@@ -614,6 +615,12 @@ public StreamThread(final Time time,
streamsMetrics,
time.milliseconds()
);
+ ThreadMetrics.addThreadStateTelemetryMetric(threadId,
+ streamsMetrics,
+ (metricConfig, now) -> this.state().ordinal());
+ ThreadMetrics.addThreadStateMetric(threadId,
+ streamsMetrics,
+ (metricConfig, now) ->
this.state().name().toLowerCase(Locale.getDefault()));
Review Comment:
For the corresponding client metric we just use
```
ClientMetrics.addStateMetric(streamsMetrics, (metricsConfig, now) -> state);
```
Why so "complicated" -- I am also ok to update the code for the client
metric. But both should be the same?
(Or maybe keep `... -> state` and add a "fancy" `toString()` overload to
both `enum` (for client and thread) which model the state?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]