This is an automated email from the ASF dual-hosted git repository.
jtuglu1 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git
The following commit(s) were added to refs/heads/master by this push:
new f357a4b86af fix: Peon metrics emit same value (#19484)
f357a4b86af is described below
commit f357a4b86af4cbb2632be49120a80da3a6cfa7c4
Author: Virushade <[email protected]>
AuthorDate: Thu May 21 02:27:58 2026 +0800
fix: Peon metrics emit same value (#19484)
This PR fixes incorrect Peon sink-level query metric emission in
SinkQuerySegmentWalker. The bug has existed since v32, when it was introduced
by https://github.com/apache/druid/pull/17170.
The existing code iterated over METRICS_TO_REPORT and used a switch without
break statements. Because reportMetric.getValue() is bound to the current
metric reporter, fallthrough caused later accumulator values to be reported
through the wrong reporter. For example, the query/segment/time reporter could
be called with segment time, then wait time, then segment-and-cache time before
emit(). Since DefaultQueryMetrics stores metric values by metric name before
emission, the last value ov [...]
---
.../appenderator/SinkQuerySegmentWalker.java | 25 +++++-----------------
.../appenderator/StreamAppenderatorTest.java | 15 +++++++------
2 files changed, 13 insertions(+), 27 deletions(-)
diff --git
a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/SinkQuerySegmentWalker.java
b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/SinkQuerySegmentWalker.java
index 0fbff897711..2b5a6210b2c 100644
---
a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/SinkQuerySegmentWalker.java
+++
b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/SinkQuerySegmentWalker.java
@@ -22,7 +22,6 @@ package org.apache.druid.segment.realtime.appenderator;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
-import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import org.apache.druid.client.CachingQueryRunner;
import org.apache.druid.client.cache.Cache;
@@ -90,7 +89,6 @@ import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;
-import java.util.function.ObjLongConsumer;
import java.util.stream.Collectors;
/**
@@ -107,13 +105,6 @@ public class SinkQuerySegmentWalker implements
QuerySegmentWalker
DefaultQueryMetrics.QUERY_SEGMENT_AND_CACHE_TIME
);
- private static final Map<String, ObjLongConsumer<? super QueryMetrics<?>>>
METRICS_TO_REPORT =
- ImmutableMap.of(
- DefaultQueryMetrics.QUERY_SEGMENT_TIME,
QueryMetrics::reportSegmentTime,
- DefaultQueryMetrics.QUERY_SEGMENT_AND_CACHE_TIME,
QueryMetrics::reportSegmentAndCacheTime,
- DefaultQueryMetrics.QUERY_WAIT_TIME, QueryMetrics::reportWaitTime
- );
-
private final String dataSource;
// Maintain a timeline of ids and Sinks for all the segments including the
base and upgraded versions
@@ -539,17 +530,11 @@ public class SinkQuerySegmentWalker implements
QuerySegmentWalker
for (Map.Entry<String, SegmentMetrics> segmentAndMetrics :
segmentMetricsAccumulator.entrySet()) {
queryMetrics.segment(segmentAndMetrics.getKey());
- for (Map.Entry<String, ObjLongConsumer<? super
QueryMetrics<?>>> reportMetric : METRICS_TO_REPORT.entrySet()) {
- final String metricName = reportMetric.getKey();
- switch (metricName) {
- case DefaultQueryMetrics.QUERY_SEGMENT_TIME:
- reportMetric.getValue().accept(queryMetrics,
segmentAndMetrics.getValue().getSegmentTime());
- case DefaultQueryMetrics.QUERY_WAIT_TIME:
- reportMetric.getValue().accept(queryMetrics,
segmentAndMetrics.getValue().getWaitTime());
- case DefaultQueryMetrics.QUERY_SEGMENT_AND_CACHE_TIME:
- reportMetric.getValue().accept(queryMetrics,
segmentAndMetrics.getValue().getSegmentAndCacheTime());
- }
- }
+ final SegmentMetrics segmentMetrics =
segmentAndMetrics.getValue();
+
+
queryMetrics.reportSegmentTime(segmentMetrics.getSegmentTime());
+ queryMetrics.reportWaitTime(segmentMetrics.getWaitTime());
+
queryMetrics.reportSegmentAndCacheTime(segmentMetrics.getSegmentAndCacheTime());
try {
queryMetrics.emit(emitter);
diff --git
a/server/src/test/java/org/apache/druid/segment/realtime/appenderator/StreamAppenderatorTest.java
b/server/src/test/java/org/apache/druid/segment/realtime/appenderator/StreamAppenderatorTest.java
index 6f903017802..904d99b7cde 100644
---
a/server/src/test/java/org/apache/druid/segment/realtime/appenderator/StreamAppenderatorTest.java
+++
b/server/src/test/java/org/apache/druid/segment/realtime/appenderator/StreamAppenderatorTest.java
@@ -35,6 +35,7 @@ import org.apache.druid.java.util.common.Pair;
import org.apache.druid.java.util.common.granularity.Granularities;
import org.apache.druid.java.util.metrics.StubServiceEmitter;
import org.apache.druid.metadata.PendingSegmentRecord;
+import org.apache.druid.query.DefaultQueryMetrics;
import org.apache.druid.query.Druids;
import org.apache.druid.query.Order;
import org.apache.druid.query.QueryPlus;
@@ -2296,14 +2297,14 @@ public class StreamAppenderatorTest extends
InitializedNullHandlingTest
private void verifySinkMetrics(StubServiceEmitter emitter, Set<String>
segmentIds)
{
int segments = segmentIds.size();
- emitter.verifyEmitted("query/cpu/time", 1);
- Assert.assertEquals(segments,
emitter.getMetricEvents("query/segment/time").size());
- Assert.assertEquals(segments,
emitter.getMetricEvents("query/segmentAndCache/time").size());
- Assert.assertEquals(segments,
emitter.getMetricEvents("query/wait/time").size());
+ emitter.verifyEmitted(DefaultQueryMetrics.QUERY_CPU_TIME, 1);
+ Assert.assertEquals(segments,
emitter.getMetricEvents(DefaultQueryMetrics.QUERY_SEGMENT_TIME).size());
+ Assert.assertEquals(segments,
emitter.getMetricEvents(DefaultQueryMetrics.QUERY_SEGMENT_AND_CACHE_TIME).size());
+ Assert.assertEquals(segments,
emitter.getMetricEvents(DefaultQueryMetrics.QUERY_WAIT_TIME).size());
for (String id : segmentIds) {
-
Assert.assertTrue(emitter.getMetricEvents("query/segment/time").stream().anyMatch(value
-> value.getUserDims().containsValue(id)));
-
Assert.assertTrue(emitter.getMetricEvents("query/segmentAndCache/time").stream().anyMatch(value
-> value.getUserDims().containsValue(id)));
-
Assert.assertTrue(emitter.getMetricEvents("query/wait/time").stream().anyMatch(value
-> value.getUserDims().containsValue(id)));
+
Assert.assertTrue(emitter.getMetricEvents(DefaultQueryMetrics.QUERY_SEGMENT_TIME).stream().anyMatch(value
-> value.getUserDims().containsValue(id)));
+
Assert.assertTrue(emitter.getMetricEvents(DefaultQueryMetrics.QUERY_SEGMENT_AND_CACHE_TIME).stream().anyMatch(value
-> value.getUserDims().containsValue(id)));
+
Assert.assertTrue(emitter.getMetricEvents(DefaultQueryMetrics.QUERY_WAIT_TIME).stream().anyMatch(value
-> value.getUserDims().containsValue(id)));
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]