This is an automated email from the ASF dual-hosted git repository.

gianm pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git


The following commit(s) were added to refs/heads/master by this push:
     new 4378f10333b perf: Optimize aggregators for groupBy queries. (#19423)
4378f10333b is described below

commit 4378f10333b598ed3d3777f96ccea009fc327bab
Author: Gian Merlino <[email protected]>
AuthorDate: Thu May 7 01:13:41 2026 -0700

    perf: Optimize aggregators for groupBy queries. (#19423)
    
    Previously aggregators were only optimized for timeseries and topN.
    This patch adds optimization for groupBy too. It also adds a context
    parameter "optimizeAggregators" that can be used to switch off
    optimization, in case this is useful for debugging purposes.
---
 .../querykit/groupby/GroupByPreShuffleFrameProcessor.java |  6 +++++-
 .../druid/query/PerSegmentQueryOptimizationContext.java   | 15 +--------------
 .../main/java/org/apache/druid/query/QueryContext.java    |  8 ++++++++
 .../main/java/org/apache/druid/query/QueryContexts.java   | 15 ++++++++++++---
 .../query/aggregation/FilteredAggregatorFactory.java      |  2 +-
 .../java/org/apache/druid/query/groupby/GroupByQuery.java | 14 ++++++++++++++
 .../apache/druid/query/timeseries/TimeseriesQuery.java    |  3 +++
 .../main/java/org/apache/druid/query/topn/TopNQuery.java  |  3 +++
 8 files changed, 47 insertions(+), 19 deletions(-)

diff --git 
a/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/groupby/GroupByPreShuffleFrameProcessor.java
 
b/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/groupby/GroupByPreShuffleFrameProcessor.java
index ea342efdb6f..e960e9cff79 100644
--- 
a/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/groupby/GroupByPreShuffleFrameProcessor.java
+++ 
b/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/groupby/GroupByPreShuffleFrameProcessor.java
@@ -49,6 +49,7 @@ import org.apache.druid.msq.input.table.SegmentsInputSlice;
 import org.apache.druid.msq.querykit.BaseLeafFrameProcessor;
 import org.apache.druid.msq.querykit.ReadableInput;
 import org.apache.druid.msq.querykit.SegmentReferenceHolder;
+import org.apache.druid.query.PerSegmentQueryOptimizationContext;
 import org.apache.druid.query.QueryToolChest;
 import org.apache.druid.query.aggregation.MetricManipulatorFns;
 import org.apache.druid.query.groupby.GroupByQuery;
@@ -193,8 +194,11 @@ public class GroupByPreShuffleFrameProcessor extends 
BaseLeafFrameProcessor
         rowSequence = 
Sequences.simple(List.of(GroupByTimeBoundaryUtils.computeTimeBoundaryResult(query,
 tbi)));
       } else {
         // Resolve this query using a cursor.
+        final GroupByQuery segmentQuery = (GroupByQuery) query
+            .withQuerySegmentSpec(new 
SpecificSegmentSpec(segmentHolder.getDescriptor()))
+            .optimizeForSegment(new 
PerSegmentQueryOptimizationContext(segmentHolder.getDescriptor()));
         rowSequence = groupingEngine.process(
-            query.withQuerySegmentSpec(new 
SpecificSegmentSpec(segmentHolder.getDescriptor())),
+            segmentQuery,
             Objects.requireNonNull(segment.as(CursorFactory.class)),
             tbi,
             bufferPool,
diff --git 
a/processing/src/main/java/org/apache/druid/query/PerSegmentQueryOptimizationContext.java
 
b/processing/src/main/java/org/apache/druid/query/PerSegmentQueryOptimizationContext.java
index 46a58776fa3..0d1257cbe46 100644
--- 
a/processing/src/main/java/org/apache/druid/query/PerSegmentQueryOptimizationContext.java
+++ 
b/processing/src/main/java/org/apache/druid/query/PerSegmentQueryOptimizationContext.java
@@ -25,19 +25,6 @@ package org.apache.druid.query;
  *
  * @see PerSegmentOptimizingQueryRunner
  */
-public class PerSegmentQueryOptimizationContext
+public record PerSegmentQueryOptimizationContext(SegmentDescriptor 
segmentDescriptor)
 {
-  private final SegmentDescriptor segmentDescriptor;
-
-  public PerSegmentQueryOptimizationContext(
-      SegmentDescriptor segmentDescriptor
-  )
-  {
-    this.segmentDescriptor = segmentDescriptor;
-  }
-
-  public SegmentDescriptor getSegmentDescriptor()
-  {
-    return segmentDescriptor;
-  }
 }
diff --git a/processing/src/main/java/org/apache/druid/query/QueryContext.java 
b/processing/src/main/java/org/apache/druid/query/QueryContext.java
index 42ed66978ff..39f325d1089 100644
--- a/processing/src/main/java/org/apache/druid/query/QueryContext.java
+++ b/processing/src/main/java/org/apache/druid/query/QueryContext.java
@@ -461,6 +461,14 @@ public class QueryContext
     );
   }
 
+  public boolean isOptimizeAggregators()
+  {
+    return getBoolean(
+        QueryContexts.OPTIMIZE_AGGREGATORS_KEY,
+        QueryContexts.DEFAULT_OPTIMIZE_AGGREGATORS
+    );
+  }
+
   public long getMaxQueuedBytes(long defaultValue)
   {
     return getLong(QueryContexts.MAX_QUEUED_BYTES_KEY, defaultValue);
diff --git a/processing/src/main/java/org/apache/druid/query/QueryContexts.java 
b/processing/src/main/java/org/apache/druid/query/QueryContexts.java
index 6df392d9571..1cb8aa24cf4 100644
--- a/processing/src/main/java/org/apache/druid/query/QueryContexts.java
+++ b/processing/src/main/java/org/apache/druid/query/QueryContexts.java
@@ -27,6 +27,7 @@ import org.apache.druid.java.util.common.IAE;
 import org.apache.druid.java.util.common.ISE;
 import org.apache.druid.java.util.common.Numbers;
 import org.apache.druid.java.util.common.StringUtils;
+import org.apache.druid.query.aggregation.AggregatorFactory;
 
 import javax.annotation.Nullable;
 import java.math.BigDecimal;
@@ -66,9 +67,16 @@ public class QueryContexts
   public static final String MAX_NUMERIC_IN_FILTERS = "maxNumericInFilters";
   public static final String CURSOR_AUTO_ARRANGE_FILTERS = 
"cursorAutoArrangeFilters";
   public static final String CLONE_QUERY_MODE = "cloneQueryMode";
-  // This flag controls whether a SQL join query with left scan should be 
attempted to be run as direct table access
-  // instead of being wrapped inside a query. With direct table access 
enabled, Druid can push down the join operation to
-  // data servers.
+  /**
+   * This flag controls whether {@link 
AggregatorFactory#optimizeForSegment(PerSegmentQueryOptimizationContext)}
+   * is used. It is undocumented because its main purpose is to help 
developers debug issues with the optimizations.
+   */
+  public static final String OPTIMIZE_AGGREGATORS_KEY = "optimizeAggregators";
+  /**
+   * This flag controls whether a SQL join query with left scan should be 
attempted to be run as direct table access
+   * instead of being wrapped inside a query. With direct table access 
enabled, Druid can push down the join operation to
+   * data servers.
+   */
   public static final String SQL_JOIN_LEFT_SCAN_DIRECT = 
"enableJoinLeftTableScanDirect";
   public static final String USE_FILTER_CNF_KEY = "useFilterCNF";
   public static final String NUM_RETRIES_ON_MISSING_SEGMENTS_KEY = 
"numRetriesOnMissingSegments";
@@ -176,6 +184,7 @@ public class QueryContexts
   public static final boolean DEFAULT_CATALOG_VALIDATION_ENABLED = true;
   public static final boolean DEFAULT_USE_NESTED_FOR_UNKNOWN_TYPE_IN_SUBQUERY 
= false;
   public static final boolean DEFAULT_EXTENDED_FILTERED_SUM_REWRITE_ENABLED = 
true;
+  public static final boolean DEFAULT_OPTIMIZE_AGGREGATORS = true;
   public static final boolean DEFAULT_CTX_FULL_REPORT = false;
 
 
diff --git 
a/processing/src/main/java/org/apache/druid/query/aggregation/FilteredAggregatorFactory.java
 
b/processing/src/main/java/org/apache/druid/query/aggregation/FilteredAggregatorFactory.java
index 9b8c0d20381..e6ed1cb3da3 100644
--- 
a/processing/src/main/java/org/apache/druid/query/aggregation/FilteredAggregatorFactory.java
+++ 
b/processing/src/main/java/org/apache/druid/query/aggregation/FilteredAggregatorFactory.java
@@ -234,7 +234,7 @@ public class FilteredAggregatorFactory extends 
AggregatorFactory
         return this;
       }
 
-      Interval segmentInterval = 
optimizationContext.getSegmentDescriptor().getInterval();
+      Interval segmentInterval = 
optimizationContext.segmentDescriptor().getInterval();
       List<Interval> filterIntervals = intervalDimFilter.getIntervals();
       List<Interval> excludedFilterIntervals = new ArrayList<>();
       List<Interval> effectiveFilterIntervals = new ArrayList<>();
diff --git 
a/processing/src/main/java/org/apache/druid/query/groupby/GroupByQuery.java 
b/processing/src/main/java/org/apache/druid/query/groupby/GroupByQuery.java
index d31f130f665..64a8ca0b5f6 100644
--- a/processing/src/main/java/org/apache/druid/query/groupby/GroupByQuery.java
+++ b/processing/src/main/java/org/apache/druid/query/groupby/GroupByQuery.java
@@ -44,6 +44,7 @@ import org.apache.druid.java.util.common.guava.Sequences;
 import org.apache.druid.query.BaseQuery;
 import org.apache.druid.query.DataSource;
 import org.apache.druid.query.DimensionComparisonUtils;
+import org.apache.druid.query.PerSegmentQueryOptimizationContext;
 import org.apache.druid.query.Queries;
 import org.apache.druid.query.Query;
 import org.apache.druid.query.QueryDataSource;
@@ -874,6 +875,19 @@ public class GroupByQuery extends BaseQuery<ResultRow>
     return new 
Builder(this).setPostAggregatorSpecs(postAggregatorSpecs).build();
   }
 
+  @Override
+  public Query<ResultRow> 
optimizeForSegment(PerSegmentQueryOptimizationContext optimizationContext)
+  {
+    if (!context().isOptimizeAggregators()) {
+      return this;
+    }
+    final List<AggregatorFactory> optimizedAggs = new 
ArrayList<>(aggregatorSpecs.size());
+    for (AggregatorFactory aggregatorFactory : aggregatorSpecs) {
+      
optimizedAggs.add(aggregatorFactory.optimizeForSegment(optimizationContext));
+    }
+    return withAggregatorSpecs(optimizedAggs);
+  }
+
   private static void verifyOutputNames(
       List<DimensionSpec> dimensions,
       List<AggregatorFactory> aggregators,
diff --git 
a/processing/src/main/java/org/apache/druid/query/timeseries/TimeseriesQuery.java
 
b/processing/src/main/java/org/apache/druid/query/timeseries/TimeseriesQuery.java
index b2165cd5b70..7db72689d1b 100644
--- 
a/processing/src/main/java/org/apache/druid/query/timeseries/TimeseriesQuery.java
+++ 
b/processing/src/main/java/org/apache/druid/query/timeseries/TimeseriesQuery.java
@@ -230,6 +230,9 @@ public class TimeseriesQuery extends 
BaseQuery<Result<TimeseriesResultValue>>
   @Override
   public Query<Result<TimeseriesResultValue>> 
optimizeForSegment(PerSegmentQueryOptimizationContext optimizationContext)
   {
+    if (!context().isOptimizeAggregators()) {
+      return this;
+    }
     return 
Druids.TimeseriesQueryBuilder.copy(this).aggregators(optimizeAggs(optimizationContext)).build();
   }
 
diff --git 
a/processing/src/main/java/org/apache/druid/query/topn/TopNQuery.java 
b/processing/src/main/java/org/apache/druid/query/topn/TopNQuery.java
index aee76989db3..679acd43e8c 100644
--- a/processing/src/main/java/org/apache/druid/query/topn/TopNQuery.java
+++ b/processing/src/main/java/org/apache/druid/query/topn/TopNQuery.java
@@ -226,6 +226,9 @@ public class TopNQuery extends 
BaseQuery<Result<TopNResultValue>>
   @Override
   public Query<Result<TopNResultValue>> 
optimizeForSegment(PerSegmentQueryOptimizationContext optimizationContext)
   {
+    if (!context().isOptimizeAggregators()) {
+      return this;
+    }
     return new 
TopNQueryBuilder(this).aggregators(optimizeAggs(optimizationContext)).build();
   }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to