bbejeck commented on code in PR #18953:
URL: https://github.com/apache/kafka/pull/18953#discussion_r1960453578
##########
streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/KafkaStreamsTelemetryIntegrationTest.java:
##########
@@ -259,10 +317,10 @@ public void shouldPassCorrectMetricsDynamicInstances(final boolean stateUpdaterE
         IntegrationTestUtils.startApplicationAndWaitUntilRunning(streamsOne);
         final List<MetricName> streamsTaskMetricNames = streamsOne.metrics().values().stream().map(Metric::metricName)
-                .filter(metricName -> metricName.tags().containsKey("task-id")).collect(Collectors.toList());
+                .filter(metricName -> metricName.tags().containsKey("task-id")).toList();
Review Comment:
side cleanup
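
For reference, a minimal self-contained sketch (names are illustrative, not from the PR) of what this swap changes beyond brevity: `Stream.toList()` (Java 16+) returns an unmodifiable list, while `Collectors.toList()` makes no such guarantee:

```java
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class ToListCleanup {
    public static void main(final String[] args) {
        // Collectors.toList() makes no immutability promise; in practice it
        // currently returns a mutable ArrayList.
        final List<Integer> mutable = Stream.of(1, 2, 3).collect(Collectors.toList());
        mutable.add(4); // allowed

        // Stream.toList() (Java 16+) is shorter and returns an unmodifiable list.
        final List<Integer> unmodifiable = Stream.of(1, 2, 3).toList();
        // unmodifiable.add(4); // would throw UnsupportedOperationException
    }
}
```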
##########
streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/KafkaStreamsTelemetryIntegrationTest.java:
##########
@@ -144,6 +157,51 @@ public void tearDown() throws Exception {
if (!streamsSecondApplicationProperties.isEmpty()) {
IntegrationTestUtils.purgeLocalStreamsState(streamsSecondApplicationProperties);
}
+ if (globalStoreIterator != null) {
+ globalStoreIterator.close();
+ }
+ }
+
+ @ParameterizedTest
+ @ValueSource(strings = {"INFO", "DEBUG", "TRACE"})
+    public void shouldPushGlobalThreadMetricsToBroker(final String recordingLevel) throws Exception {
Review Comment:
New test validating that the broker plugin emits the global thread metrics. I thought of updating the existing broker plugin test, but it's busy enough as is.
##########
streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/KafkaStreamsTelemetryIntegrationTest.java:
##########
@@ -525,7 +619,7 @@ public void exportMetrics(final AuthorizableRequestContext context, final Client
                 .stream()
                 .flatMap(rm -> rm.getScopeMetricsList().stream())
                 .flatMap(sm -> sm.getMetricsList().stream())
-                .map(metric -> metric.getGauge())
+                .map(org.apache.kafka.shaded.io.opentelemetry.proto.metrics.v1.Metric::getGauge)
Review Comment:
Side cleanup as pointed out by IntelliJ. I'm not sure using a method reference here and below is better than using a lambda in this case, due to needing the fully qualified name.
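
To illustrate the tradeoff with a self-contained sketch (using `String::length` as a stand-in for the shaded `Metric::getGauge`, since the real type only differs by dragging its package name into the call site):

```java
import java.util.List;

public class MethodRefVsLambda {
    public static void main(final String[] args) {
        final List<String> words = List.of("gauge", "sum", "histogram");
        // Lambda: the receiver type is inferred, so no class name appears
        // at the call site.
        words.stream().map(w -> w.length()).forEach(System.out::println);
        // Method reference: equivalent behavior. With String the reference is
        // short, but a type from a shaded package (like the Metric class in
        // the diff above) forces its fully qualified name into the pipeline.
        words.stream().map(String::length).forEach(System.out::println);
    }
}
```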
##########
streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/KafkaStreamsTelemetryIntegrationTest.java:
##########
@@ -144,6 +157,51 @@ public void tearDown() throws Exception {
if (!streamsSecondApplicationProperties.isEmpty()) {
IntegrationTestUtils.purgeLocalStreamsState(streamsSecondApplicationProperties);
}
+ if (globalStoreIterator != null) {
+ globalStoreIterator.close();
Review Comment:
Required for the test - explanation below where the global store is added.
##########
streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/KafkaStreamsTelemetryIntegrationTest.java:
##########
@@ -215,21 +273,21 @@ public void shouldPushMetricsToBroker(final String recordingLevel) throws Except
     public void shouldPassMetrics(final String topologyType, final boolean stateUpdaterEnabled) throws Exception {
         // Streams metrics should get passed to Admin and Consumer
         streamsApplicationProperties = props(stateUpdaterEnabled);
-        final Topology topology = topologyType.equals("simple") ? simpleTopology() : complexTopology();
+        final Topology topology = topologyType.equals("simple") ? simpleTopology(false) : complexTopology();
         try (final KafkaStreams streams = new KafkaStreams(topology, streamsApplicationProperties)) {
             IntegrationTestUtils.startApplicationAndWaitUntilRunning(streams);
             final List<MetricName> streamsThreadMetrics = streams.metrics().values().stream().map(Metric::metricName)
-                    .filter(metricName -> metricName.tags().containsKey("thread-id")).collect(Collectors.toList());
+                    .filter(metricName -> metricName.tags().containsKey("thread-id")).toList();
Review Comment:
side cleanup
##########
streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/KafkaStreamsTelemetryIntegrationTest.java:
##########
@@ -215,21 +273,21 @@ public void shouldPushMetricsToBroker(final String recordingLevel) throws Except
     public void shouldPassMetrics(final String topologyType, final boolean stateUpdaterEnabled) throws Exception {
         // Streams metrics should get passed to Admin and Consumer
         streamsApplicationProperties = props(stateUpdaterEnabled);
-        final Topology topology = topologyType.equals("simple") ? simpleTopology() : complexTopology();
+        final Topology topology = topologyType.equals("simple") ? simpleTopology(false) : complexTopology();
         try (final KafkaStreams streams = new KafkaStreams(topology, streamsApplicationProperties)) {
             IntegrationTestUtils.startApplicationAndWaitUntilRunning(streams);
             final List<MetricName> streamsThreadMetrics = streams.metrics().values().stream().map(Metric::metricName)
-                    .filter(metricName -> metricName.tags().containsKey("thread-id")).collect(Collectors.toList());
+                    .filter(metricName -> metricName.tags().containsKey("thread-id")).toList();
             final List<MetricName> streamsClientMetrics = streams.metrics().values().stream().map(Metric::metricName)
-                    .filter(metricName -> metricName.group().equals("stream-metrics")).collect(Collectors.toList());
+                    .filter(metricName -> metricName.group().equals("stream-metrics")).toList();
Review Comment:
side cleanup
##########
streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/KafkaStreamsTelemetryIntegrationTest.java:
##########
@@ -350,10 +408,10 @@ public void passedMetricsShouldNotLeakIntoClientMetrics() throws Exception {
         IntegrationTestUtils.startApplicationAndWaitUntilRunning(streams);
         final List<MetricName> streamsThreadMetrics = streams.metrics().values().stream().map(Metric::metricName)
-                .filter(metricName -> metricName.tags().containsKey("thread-id")).collect(Collectors.toList());
+                .filter(metricName -> metricName.tags().containsKey("thread-id")).toList();
Review Comment:
side cleanup here and the next
##########
streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/KafkaStreamsTelemetryIntegrationTest.java:
##########
@@ -293,24 +351,24 @@ public void shouldPassCorrectMetricsDynamicInstances(final boolean stateUpdaterE
         );
         final List<MetricName> streamsOneTaskMetrics = streamsOne.metrics().values().stream().map(Metric::metricName)
-                .filter(metricName -> metricName.tags().containsKey("task-id")).collect(Collectors.toList());
+                .filter(metricName -> metricName.tags().containsKey("task-id")).toList();
         final List<MetricName> streamsOneStateMetrics = streamsOne.metrics().values().stream().map(Metric::metricName)
-                .filter(metricName -> metricName.group().equals("stream-state-metrics")).collect(Collectors.toList());
+                .filter(metricName -> metricName.group().equals("stream-state-metrics")).toList();
         final List<MetricName> consumerOnePassedTaskMetrics = INTERCEPTING_CONSUMERS.get(FIRST_INSTANCE_CLIENT)
-                .passedMetrics.stream().map(KafkaMetric::metricName).filter(metricName -> metricName.tags().containsKey("task-id")).collect(Collectors.toList());
+                .passedMetrics.stream().map(KafkaMetric::metricName).filter(metricName -> metricName.tags().containsKey("task-id")).toList();
         final List<MetricName> consumerOnePassedStateMetrics = INTERCEPTING_CONSUMERS.get(FIRST_INSTANCE_CLIENT)
-                .passedMetrics.stream().map(KafkaMetric::metricName).filter(metricName -> metricName.group().equals("stream-state-metrics")).collect(Collectors.toList());
+                .passedMetrics.stream().map(KafkaMetric::metricName).filter(metricName -> metricName.group().equals("stream-state-metrics")).toList();
         final List<MetricName> streamsTwoTaskMetrics = streamsTwo.metrics().values().stream().map(Metric::metricName)
-                .filter(metricName -> metricName.tags().containsKey("task-id")).collect(Collectors.toList());
+                .filter(metricName -> metricName.tags().containsKey("task-id")).toList();
         final List<MetricName> streamsTwoStateMetrics = streamsTwo.metrics().values().stream().map(Metric::metricName)
-                .filter(metricName -> metricName.group().equals("stream-state-metrics")).collect(Collectors.toList());
+                .filter(metricName -> metricName.group().equals("stream-state-metrics")).toList();
         final List<MetricName> consumerTwoPassedTaskMetrics = INTERCEPTING_CONSUMERS.get(SECOND_INSTANCE_CLIENT)
-                .passedMetrics.stream().map(KafkaMetric::metricName).filter(metricName -> metricName.tags().containsKey("task-id")).collect(Collectors.toList());
+                .passedMetrics.stream().map(KafkaMetric::metricName).filter(metricName -> metricName.tags().containsKey("task-id")).toList();
         final List<MetricName> consumerTwoPassedStateMetrics = INTERCEPTING_CONSUMERS.get(SECOND_INSTANCE_CLIENT)
-                .passedMetrics.stream().map(KafkaMetric::metricName).filter(metricName -> metricName.group().equals("stream-state-metrics")).collect(Collectors.toList());
+                .passedMetrics.stream().map(KafkaMetric::metricName).filter(metricName -> metricName.group().equals("stream-state-metrics")).toList();
Review Comment:
side cleanup
##########
streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/KafkaStreamsTelemetryIntegrationTest.java:
##########
@@ -259,10 +317,10 @@ public void shouldPassCorrectMetricsDynamicInstances(final boolean stateUpdaterE
         IntegrationTestUtils.startApplicationAndWaitUntilRunning(streamsOne);
         final List<MetricName> streamsTaskMetricNames = streamsOne.metrics().values().stream().map(Metric::metricName)
-                .filter(metricName -> metricName.tags().containsKey("task-id")).collect(Collectors.toList());
+                .filter(metricName -> metricName.tags().containsKey("task-id")).toList();
         final List<MetricName> consumerPassedStreamTaskMetricNames = INTERCEPTING_CONSUMERS.get(FIRST_INSTANCE_CLIENT).passedMetrics.stream().map(KafkaMetric::metricName)
-                .filter(metricName -> metricName.tags().containsKey("task-id")).collect(Collectors.toList());
+                .filter(metricName -> metricName.tags().containsKey("task-id")).toList();
Review Comment:
side cleanup
##########
streams/src/main/java/org/apache/kafka/streams/internals/metrics/StreamsThreadMetricsDelegatingReporter.java:
##########
@@ -37,10 +38,10 @@ public class StreamsThreadMetricsDelegatingReporter implements MetricsReporter {
     private final String stateUpdaterThreadId;
-    public StreamsThreadMetricsDelegatingReporter(final Consumer<byte[], byte[]> consumer, final String threadId, final String stateUpdaterThreadId) {
+    public StreamsThreadMetricsDelegatingReporter(final Consumer<byte[], byte[]> consumer, final String threadId, final Optional<String> stateUpdaterThreadId) {
         this.consumer = Objects.requireNonNull(consumer);
         this.threadId = Objects.requireNonNull(threadId);
-        this.stateUpdaterThreadId = Objects.requireNonNull(stateUpdaterThreadId);
+        this.stateUpdaterThreadId = stateUpdaterThreadId.orElse("");
Review Comment:
Thinking this should be `stateUpdaterThreadId.orElse(null);` instead -
thoughts?
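
A small sketch of why the sentinel choice matters, assuming the reporter matches metrics by comparing a thread-id tag against this field (the actual matching logic in the reporter may differ):

```java
import java.util.Optional;

public class SentinelChoice {
    public static void main(final String[] args) {
        // A global thread has no state updater, so the Optional is empty.
        final Optional<String> stateUpdaterThreadId = Optional.empty();

        // orElse(""): the field is always a non-null String, so calling
        // field.equals(tag) is safe, but an empty thread-id tag would match.
        final String asEmpty = stateUpdaterThreadId.orElse("");
        System.out.println(asEmpty.equals(""));               // true

        // orElse(null): no accidental match on "", but every comparison must
        // be null-safe, e.g. written with a known-non-null receiver.
        final String asNull = stateUpdaterThreadId.orElse(null);
        System.out.println("state-updater-1".equals(asNull)); // false
    }
}
```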
##########
streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/KafkaStreamsTelemetryIntegrationTest.java:
##########
@@ -419,8 +485,36 @@ private Topology complexTopology() {
         return builder.build();
     }
-    private Topology simpleTopology() {
+    private void addGlobalStore(final StreamsBuilder builder) {
+        builder.addGlobalStore(Stores.keyValueStoreBuilder(
+                        Stores.inMemoryKeyValueStore("iq-test-store"),
+                        Serdes.String(),
+                        Serdes.String()
+                ),
+                globalStoreTopic,
+                Consumed.with(Serdes.String(), Serdes.String()),
+                () -> new Processor<>() {
+                    private KeyValueStore<String, String> store;
+
+                    @Override
+                    public void init(final ProcessorContext<Void, Void> context) {
+                        store = context.getStateStore("iq-test-store");
+                    }
+
+                    @Override
+                    public void process(final Record<String, String> record) {
+                        store.put(record.key(), record.value());
+                        globalStoreIterator = store.all();
Review Comment:
The store iterator is intentionally not closed here: it needs to stay open during the test so the Streams app emits the expected `org.apache.kafka.stream.state.oldest.iterator.open.since.ms` metric. Hence `globalStoreIterator` is a global variable (pun not intended), so it can be closed in the `tearDown` method.
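
For contrast, a minimal sketch of the conventional pattern this processor deliberately avoids; closing eagerly would leave no open iterator for the metric to observe:

```java
import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.KeyValueStore;

final class IteratorLifecycle {
    // The usual try-with-resources pattern: the iterator is closed as soon
    // as the scan ends, so nothing would remain open for the
    // oldest-open-iterator metric to report during the test.
    static void scanAndClose(final KeyValueStore<String, String> store) {
        try (final KeyValueIterator<String, String> iter = store.all()) {
            while (iter.hasNext()) {
                iter.next();
            }
        }
    }
}
```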
##########
streams/src/main/java/org/apache/kafka/streams/internals/metrics/StreamsThreadMetricsDelegatingReporter.java:
##########
@@ -37,10 +38,10 @@ public class StreamsThreadMetricsDelegatingReporter implements MetricsReporter {
     private final String stateUpdaterThreadId;
-    public StreamsThreadMetricsDelegatingReporter(final Consumer<byte[], byte[]> consumer, final String threadId, final String stateUpdaterThreadId) {
+    public StreamsThreadMetricsDelegatingReporter(final Consumer<byte[], byte[]> consumer, final String threadId, final Optional<String> stateUpdaterThreadId) {
Review Comment:
Since the reporting process is exactly the same for the global thread as for a stream thread, IMHO it made sense to re-use the existing reporter class. But it seems better to have the `stateUpdaterThreadId` represented as an `Optional` to signal that it may not be present.
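
A minimal sketch of the two call-site shapes under the new signature (the consumer parameter and thread ids here are placeholders, not the actual wiring code from the PR):

```java
import java.util.Optional;

import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.streams.internals.metrics.StreamsThreadMetricsDelegatingReporter;

final class ReporterWiring {
    static void wire(final Consumer<byte[], byte[]> consumer) {
        // Stream thread: a state updater thread id is supplied when present.
        new StreamsThreadMetricsDelegatingReporter(
                consumer, "stream-thread-1", Optional.of("state-updater-1"));
        // Global thread: Optional.empty() signals the absence explicitly
        // instead of relying on a sentinel string.
        new StreamsThreadMetricsDelegatingReporter(
                consumer, "global-stream-thread", Optional.empty());
    }
}
```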
##########
streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/KafkaStreamsTelemetryIntegrationTest.java:
##########
@@ -215,21 +273,21 @@ public void shouldPushMetricsToBroker(final String recordingLevel) throws Except
     public void shouldPassMetrics(final String topologyType, final boolean stateUpdaterEnabled) throws Exception {
         // Streams metrics should get passed to Admin and Consumer
         streamsApplicationProperties = props(stateUpdaterEnabled);
-        final Topology topology = topologyType.equals("simple") ? simpleTopology() : complexTopology();
+        final Topology topology = topologyType.equals("simple") ? simpleTopology(false) : complexTopology();
         try (final KafkaStreams streams = new KafkaStreams(topology, streamsApplicationProperties)) {
             IntegrationTestUtils.startApplicationAndWaitUntilRunning(streams);
             final List<MetricName> streamsThreadMetrics = streams.metrics().values().stream().map(Metric::metricName)
-                    .filter(metricName -> metricName.tags().containsKey("thread-id")).collect(Collectors.toList());
+                    .filter(metricName -> metricName.tags().containsKey("thread-id")).toList();
             final List<MetricName> streamsClientMetrics = streams.metrics().values().stream().map(Metric::metricName)
-                    .filter(metricName -> metricName.group().equals("stream-metrics")).collect(Collectors.toList());
+                    .filter(metricName -> metricName.group().equals("stream-metrics")).toList();
-            final List<MetricName> consumerPassedStreamThreadMetricNames = INTERCEPTING_CONSUMERS.get(FIRST_INSTANCE_CLIENT).passedMetrics.stream().map(KafkaMetric::metricName).collect(Collectors.toList());
-            final List<MetricName> adminPassedStreamClientMetricNames = INTERCEPTING_ADMIN_CLIENTS.get(FIRST_INSTANCE_CLIENT).passedMetrics.stream().map(KafkaMetric::metricName).collect(Collectors.toList());
+            final List<MetricName> consumerPassedStreamThreadMetricNames = INTERCEPTING_CONSUMERS.get(FIRST_INSTANCE_CLIENT).passedMetrics.stream().map(KafkaMetric::metricName).toList();
+            final List<MetricName> adminPassedStreamClientMetricNames = INTERCEPTING_ADMIN_CLIENTS.get(FIRST_INSTANCE_CLIENT).passedMetrics.stream().map(KafkaMetric::metricName).toList();
Review Comment:
side cleanup
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]