This is an automated email from the ASF dual-hosted git repository.

jtuglu1 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git


The following commit(s) were added to refs/heads/master by this push:
     new f357a4b86af fix: Peon metrics emit same value (#19484)
f357a4b86af is described below

commit f357a4b86af4cbb2632be49120a80da3a6cfa7c4
Author: Virushade <[email protected]>
AuthorDate: Thu May 21 02:27:58 2026 +0800

    fix: Peon metrics emit same value (#19484)
    
    This PR fixes incorrect Peon sink-level query metric emission in 
SinkQuerySegmentWalker. This bug has existed since v32, when it was introduced by 
https://github.com/apache/druid/pull/17170.
    
    The existing code iterated over METRICS_TO_REPORT and used a switch without 
break statements. Because reportMetric.getValue() is bound to the current 
metric reporter, fallthrough caused later accumulator values to be reported 
through the wrong reporter. For example, the query/segment/time reporter could 
be called with segment time, then wait time, then segment-and-cache time before 
emit(). Since DefaultQueryMetrics stores metric values by metric name before 
emission, the last value overwrote the earlier ones [...]
---
 .../appenderator/SinkQuerySegmentWalker.java       | 25 +++++-----------------
 .../appenderator/StreamAppenderatorTest.java       | 15 +++++++------
 2 files changed, 13 insertions(+), 27 deletions(-)

diff --git 
a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/SinkQuerySegmentWalker.java
 
b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/SinkQuerySegmentWalker.java
index 0fbff897711..2b5a6210b2c 100644
--- 
a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/SinkQuerySegmentWalker.java
+++ 
b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/SinkQuerySegmentWalker.java
@@ -22,7 +22,6 @@ package org.apache.druid.segment.realtime.appenderator;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
-import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.ImmutableSet;
 import org.apache.druid.client.CachingQueryRunner;
 import org.apache.druid.client.cache.Cache;
@@ -90,7 +89,6 @@ import java.util.Optional;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.atomic.AtomicLong;
-import java.util.function.ObjLongConsumer;
 import java.util.stream.Collectors;
 
 /**
@@ -107,13 +105,6 @@ public class SinkQuerySegmentWalker implements 
QuerySegmentWalker
       DefaultQueryMetrics.QUERY_SEGMENT_AND_CACHE_TIME
   );
 
-  private static final Map<String, ObjLongConsumer<? super QueryMetrics<?>>> 
METRICS_TO_REPORT =
-      ImmutableMap.of(
-          DefaultQueryMetrics.QUERY_SEGMENT_TIME, 
QueryMetrics::reportSegmentTime,
-          DefaultQueryMetrics.QUERY_SEGMENT_AND_CACHE_TIME, 
QueryMetrics::reportSegmentAndCacheTime,
-          DefaultQueryMetrics.QUERY_WAIT_TIME, QueryMetrics::reportWaitTime
-      );
-
   private final String dataSource;
 
   // Maintain a timeline of ids and Sinks for all the segments including the 
base and upgraded versions
@@ -539,17 +530,11 @@ public class SinkQuerySegmentWalker implements 
QuerySegmentWalker
                 for (Map.Entry<String, SegmentMetrics> segmentAndMetrics : 
segmentMetricsAccumulator.entrySet()) {
                   queryMetrics.segment(segmentAndMetrics.getKey());
 
-                  for (Map.Entry<String, ObjLongConsumer<? super 
QueryMetrics<?>>> reportMetric : METRICS_TO_REPORT.entrySet()) {
-                    final String metricName = reportMetric.getKey();
-                    switch (metricName) {
-                      case DefaultQueryMetrics.QUERY_SEGMENT_TIME:
-                        reportMetric.getValue().accept(queryMetrics, 
segmentAndMetrics.getValue().getSegmentTime());
-                      case DefaultQueryMetrics.QUERY_WAIT_TIME:
-                        reportMetric.getValue().accept(queryMetrics, 
segmentAndMetrics.getValue().getWaitTime());
-                      case DefaultQueryMetrics.QUERY_SEGMENT_AND_CACHE_TIME:
-                        reportMetric.getValue().accept(queryMetrics, 
segmentAndMetrics.getValue().getSegmentAndCacheTime());
-                    }
-                  }
+                  final SegmentMetrics segmentMetrics = 
segmentAndMetrics.getValue();
+
+                  
queryMetrics.reportSegmentTime(segmentMetrics.getSegmentTime());
+                  queryMetrics.reportWaitTime(segmentMetrics.getWaitTime());
+                  
queryMetrics.reportSegmentAndCacheTime(segmentMetrics.getSegmentAndCacheTime());
 
                   try {
                     queryMetrics.emit(emitter);
diff --git 
a/server/src/test/java/org/apache/druid/segment/realtime/appenderator/StreamAppenderatorTest.java
 
b/server/src/test/java/org/apache/druid/segment/realtime/appenderator/StreamAppenderatorTest.java
index 6f903017802..904d99b7cde 100644
--- 
a/server/src/test/java/org/apache/druid/segment/realtime/appenderator/StreamAppenderatorTest.java
+++ 
b/server/src/test/java/org/apache/druid/segment/realtime/appenderator/StreamAppenderatorTest.java
@@ -35,6 +35,7 @@ import org.apache.druid.java.util.common.Pair;
 import org.apache.druid.java.util.common.granularity.Granularities;
 import org.apache.druid.java.util.metrics.StubServiceEmitter;
 import org.apache.druid.metadata.PendingSegmentRecord;
+import org.apache.druid.query.DefaultQueryMetrics;
 import org.apache.druid.query.Druids;
 import org.apache.druid.query.Order;
 import org.apache.druid.query.QueryPlus;
@@ -2296,14 +2297,14 @@ public class StreamAppenderatorTest extends 
InitializedNullHandlingTest
   private void verifySinkMetrics(StubServiceEmitter emitter, Set<String> 
segmentIds)
   {
     int segments = segmentIds.size();
-    emitter.verifyEmitted("query/cpu/time", 1);
-    Assert.assertEquals(segments, 
emitter.getMetricEvents("query/segment/time").size());
-    Assert.assertEquals(segments, 
emitter.getMetricEvents("query/segmentAndCache/time").size());
-    Assert.assertEquals(segments, 
emitter.getMetricEvents("query/wait/time").size());
+    emitter.verifyEmitted(DefaultQueryMetrics.QUERY_CPU_TIME, 1);
+    Assert.assertEquals(segments, 
emitter.getMetricEvents(DefaultQueryMetrics.QUERY_SEGMENT_TIME).size());
+    Assert.assertEquals(segments, 
emitter.getMetricEvents(DefaultQueryMetrics.QUERY_SEGMENT_AND_CACHE_TIME).size());
+    Assert.assertEquals(segments, 
emitter.getMetricEvents(DefaultQueryMetrics.QUERY_WAIT_TIME).size());
     for (String id : segmentIds) {
-      
Assert.assertTrue(emitter.getMetricEvents("query/segment/time").stream().anyMatch(value
 -> value.getUserDims().containsValue(id)));
-      
Assert.assertTrue(emitter.getMetricEvents("query/segmentAndCache/time").stream().anyMatch(value
 -> value.getUserDims().containsValue(id)));
-      
Assert.assertTrue(emitter.getMetricEvents("query/wait/time").stream().anyMatch(value
 -> value.getUserDims().containsValue(id)));
+      
Assert.assertTrue(emitter.getMetricEvents(DefaultQueryMetrics.QUERY_SEGMENT_TIME).stream().anyMatch(value
 -> value.getUserDims().containsValue(id)));
+      
Assert.assertTrue(emitter.getMetricEvents(DefaultQueryMetrics.QUERY_SEGMENT_AND_CACHE_TIME).stream().anyMatch(value
 -> value.getUserDims().containsValue(id)));
+      
Assert.assertTrue(emitter.getMetricEvents(DefaultQueryMetrics.QUERY_WAIT_TIME).stream().anyMatch(value
 -> value.getUserDims().containsValue(id)));
     }
   }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to