This is an automated email from the ASF dual-hosted git repository.

jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 8dc28e0  Allow Metadata and Dictionary Based Plans for No Op Filters 
(#7563)
8dc28e0 is described below

commit 8dc28e010be4a9870db27da8765ec0b825c2cef9
Author: Atri Sharma <atri.j...@gmail.com>
AuthorDate: Tue Oct 26 06:27:48 2021 +0530

    Allow Metadata and Dictionary Based Plans for No Op Filters (#7563)
    
    When building aggregation plans, we should also account for the fact
    that some filters might be no-ops on certain segments. For those segments,
    we should consider using metadata- or dictionary-based plans.
---
 .../plan/AggregationGroupByOrderByPlanNode.java    | 12 ++-
 .../core/plan/AggregationGroupByPlanNode.java      | 12 ++-
 .../pinot/core/plan/AggregationPlanNode.java       | 15 ++--
 .../apache/pinot/core/plan/DocIdSetPlanNode.java   | 11 ++-
 .../org/apache/pinot/core/plan/FilterPlanNode.java | 49 +++++++----
 .../apache/pinot/core/plan/ProjectionPlanNode.java |  9 +-
 .../apache/pinot/core/plan/TransformPlanNode.java  | 11 ++-
 .../apache/pinot/core/startree/StarTreeUtils.java  | 21 +++--
 ...adataAndDictionaryAggregationPlanMakerTest.java | 53 +++++++-----
 .../pinot/core/startree/v2/BaseStarTreeV2Test.java | 11 +--
 .../pinot/queries/DistinctCountQueriesTest.java    | 77 ++++++++++-------
 ...SegmentPartitionedDistinctCountQueriesTest.java | 97 ++++++++++++----------
 12 files changed, 238 insertions(+), 140 deletions(-)

diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/plan/AggregationGroupByOrderByPlanNode.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/plan/AggregationGroupByOrderByPlanNode.java
index 81d3857..58118f4 100644
--- 
a/pinot-core/src/main/java/org/apache/pinot/core/plan/AggregationGroupByOrderByPlanNode.java
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/plan/AggregationGroupByOrderByPlanNode.java
@@ -22,6 +22,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import org.apache.pinot.common.request.context.ExpressionContext;
+import org.apache.pinot.core.operator.filter.BaseFilterOperator;
 import org.apache.pinot.core.operator.query.AggregationGroupByOrderByOperator;
 import org.apache.pinot.core.operator.transform.TransformOperator;
 import org.apache.pinot.core.query.aggregation.function.AggregationFunction;
@@ -58,6 +59,9 @@ public class AggregationGroupByOrderByPlanNode implements 
PlanNode {
     AggregationFunction[] aggregationFunctions = 
_queryContext.getAggregationFunctions();
     ExpressionContext[] groupByExpressions = 
_queryContext.getGroupByExpressions().toArray(new ExpressionContext[0]);
 
+    FilterPlanNode filterPlanNode = new FilterPlanNode(_indexSegment, 
_queryContext);
+    BaseFilterOperator filterOperator = filterPlanNode.run();
+
     // Use star-tree to solve the query if possible
     List<StarTreeV2> starTrees = _indexSegment.getStarTrees();
     if (starTrees != null && !StarTreeUtils.isStarTreeDisabled(_queryContext)) 
{
@@ -65,7 +69,8 @@ public class AggregationGroupByOrderByPlanNode implements 
PlanNode {
           StarTreeUtils.extractAggregationFunctionPairs(aggregationFunctions);
       if (aggregationFunctionColumnPairs != null) {
         Map<String, List<CompositePredicateEvaluator>> predicateEvaluatorsMap =
-            StarTreeUtils.extractPredicateEvaluatorsMap(_indexSegment, 
_queryContext.getFilter());
+            StarTreeUtils.extractPredicateEvaluatorsMap(_indexSegment, 
_queryContext.getFilter(),
+                filterPlanNode.getPredicateEvaluatorMap());
         if (predicateEvaluatorsMap != null) {
           for (StarTreeV2 starTreeV2 : starTrees) {
             if (StarTreeUtils.isFitForStarTree(starTreeV2.getMetadata(), 
aggregationFunctionColumnPairs,
@@ -83,8 +88,9 @@ public class AggregationGroupByOrderByPlanNode implements 
PlanNode {
 
     Set<ExpressionContext> expressionsToTransform =
         
AggregationFunctionUtils.collectExpressionsToTransform(aggregationFunctions, 
groupByExpressions);
-    TransformOperator transformPlanNode = new TransformPlanNode(_indexSegment, 
_queryContext, expressionsToTransform,
-        DocIdSetPlanNode.MAX_DOC_PER_CALL).run();
+    TransformOperator transformPlanNode =
+        new TransformPlanNode(_indexSegment, _queryContext, 
expressionsToTransform, DocIdSetPlanNode.MAX_DOC_PER_CALL,
+            filterOperator).run();
     return new AggregationGroupByOrderByOperator(aggregationFunctions, 
groupByExpressions, transformPlanNode,
         numTotalDocs, _queryContext, false);
   }
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/plan/AggregationGroupByPlanNode.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/plan/AggregationGroupByPlanNode.java
index 33d83de..f9d0e09 100644
--- 
a/pinot-core/src/main/java/org/apache/pinot/core/plan/AggregationGroupByPlanNode.java
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/plan/AggregationGroupByPlanNode.java
@@ -22,6 +22,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import org.apache.pinot.common.request.context.ExpressionContext;
+import org.apache.pinot.core.operator.filter.BaseFilterOperator;
 import org.apache.pinot.core.operator.query.AggregationGroupByOperator;
 import org.apache.pinot.core.operator.transform.TransformOperator;
 import org.apache.pinot.core.query.aggregation.function.AggregationFunction;
@@ -58,6 +59,9 @@ public class AggregationGroupByPlanNode implements PlanNode {
     AggregationFunction[] aggregationFunctions = 
_queryContext.getAggregationFunctions();
     ExpressionContext[] groupByExpressions = 
_queryContext.getGroupByExpressions().toArray(new ExpressionContext[0]);
 
+    FilterPlanNode filterPlanNode = new FilterPlanNode(_indexSegment, 
_queryContext);
+    BaseFilterOperator filterOperator = filterPlanNode.run();
+
     // Use star-tree to solve the query if possible
     List<StarTreeV2> starTrees = _indexSegment.getStarTrees();
     if (starTrees != null && !StarTreeUtils.isStarTreeDisabled(_queryContext)) 
{
@@ -65,7 +69,8 @@ public class AggregationGroupByPlanNode implements PlanNode {
           StarTreeUtils.extractAggregationFunctionPairs(aggregationFunctions);
       if (aggregationFunctionColumnPairs != null) {
         Map<String, List<CompositePredicateEvaluator>> predicateEvaluatorsMap =
-            StarTreeUtils.extractPredicateEvaluatorsMap(_indexSegment, 
_queryContext.getFilter());
+            StarTreeUtils.extractPredicateEvaluatorsMap(_indexSegment, 
_queryContext.getFilter(),
+                filterPlanNode.getPredicateEvaluatorMap());
         if (predicateEvaluatorsMap != null) {
           for (StarTreeV2 starTreeV2 : starTrees) {
             if (StarTreeUtils.isFitForStarTree(starTreeV2.getMetadata(), 
aggregationFunctionColumnPairs,
@@ -83,8 +88,9 @@ public class AggregationGroupByPlanNode implements PlanNode {
 
     Set<ExpressionContext> expressionsToTransform =
         
AggregationFunctionUtils.collectExpressionsToTransform(aggregationFunctions, 
groupByExpressions);
-    TransformOperator transformPlanNode = new TransformPlanNode(_indexSegment, 
_queryContext, expressionsToTransform,
-        DocIdSetPlanNode.MAX_DOC_PER_CALL).run();
+    TransformOperator transformPlanNode =
+        new TransformPlanNode(_indexSegment, _queryContext, 
expressionsToTransform, DocIdSetPlanNode.MAX_DOC_PER_CALL,
+            filterOperator).run();
     return new AggregationGroupByOperator(_queryContext, groupByExpressions, 
transformPlanNode, numTotalDocs, false);
   }
 }
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/plan/AggregationPlanNode.java 
b/pinot-core/src/main/java/org/apache/pinot/core/plan/AggregationPlanNode.java
index c1cec0b..d7318d9 100644
--- 
a/pinot-core/src/main/java/org/apache/pinot/core/plan/AggregationPlanNode.java
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/plan/AggregationPlanNode.java
@@ -26,6 +26,7 @@ import java.util.Set;
 import org.apache.pinot.common.request.context.ExpressionContext;
 import org.apache.pinot.core.common.Operator;
 import org.apache.pinot.core.operator.blocks.IntermediateResultsBlock;
+import org.apache.pinot.core.operator.filter.BaseFilterOperator;
 import org.apache.pinot.core.operator.query.AggregationOperator;
 import org.apache.pinot.core.operator.query.DictionaryBasedAggregationOperator;
 import org.apache.pinot.core.operator.query.MetadataBasedAggregationOperator;
@@ -64,10 +65,12 @@ public class AggregationPlanNode implements PlanNode {
     int numTotalDocs = _indexSegment.getSegmentMetadata().getTotalDocs();
     AggregationFunction[] aggregationFunctions = 
_queryContext.getAggregationFunctions();
 
+    FilterPlanNode filterPlanNode = new FilterPlanNode(_indexSegment, 
_queryContext);
+    BaseFilterOperator filterOperator = filterPlanNode.run();
+
     // Use metadata/dictionary to solve the query if possible
-    // NOTE: Skip the segment with valid doc index because the valid doc index 
is equivalent to a filter
     // TODO: Use the same operator for both of them so that COUNT(*), MAX(col) 
can be optimized
-    if (_queryContext.getFilter() == null && _indexSegment.getValidDocIds() == 
null) {
+    if (filterOperator.isResultMatchingAll()) {
       if (isFitForMetadataBasedPlan(aggregationFunctions)) {
         return new MetadataBasedAggregationOperator(aggregationFunctions, 
_indexSegment.getSegmentMetadata(),
             Collections.emptyMap());
@@ -88,7 +91,8 @@ public class AggregationPlanNode implements PlanNode {
           StarTreeUtils.extractAggregationFunctionPairs(aggregationFunctions);
       if (aggregationFunctionColumnPairs != null) {
         Map<String, List<CompositePredicateEvaluator>> predicateEvaluatorsMap =
-            StarTreeUtils.extractPredicateEvaluatorsMap(_indexSegment, 
_queryContext.getFilter());
+            StarTreeUtils.extractPredicateEvaluatorsMap(_indexSegment, 
_queryContext.getFilter(),
+                filterPlanNode.getPredicateEvaluatorMap());
         if (predicateEvaluatorsMap != null) {
           for (StarTreeV2 starTreeV2 : starTrees) {
             if (StarTreeUtils.isFitForStarTree(starTreeV2.getMetadata(), 
aggregationFunctionColumnPairs, null,
@@ -105,8 +109,9 @@ public class AggregationPlanNode implements PlanNode {
 
     Set<ExpressionContext> expressionsToTransform =
         
AggregationFunctionUtils.collectExpressionsToTransform(aggregationFunctions, 
null);
-    TransformOperator transformOperator = new TransformPlanNode(_indexSegment, 
_queryContext, expressionsToTransform,
-        DocIdSetPlanNode.MAX_DOC_PER_CALL).run();
+    TransformOperator transformOperator =
+        new TransformPlanNode(_indexSegment, _queryContext, 
expressionsToTransform, DocIdSetPlanNode.MAX_DOC_PER_CALL,
+            filterOperator).run();
     return new AggregationOperator(aggregationFunctions, transformOperator, 
numTotalDocs, false);
   }
 
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/plan/DocIdSetPlanNode.java 
b/pinot-core/src/main/java/org/apache/pinot/core/plan/DocIdSetPlanNode.java
index 6745c5f..fd58e8f 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/plan/DocIdSetPlanNode.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/plan/DocIdSetPlanNode.java
@@ -18,7 +18,9 @@
  */
 package org.apache.pinot.core.plan;
 
+import javax.annotation.Nullable;
 import org.apache.pinot.core.operator.DocIdSetOperator;
+import org.apache.pinot.core.operator.filter.BaseFilterOperator;
 import org.apache.pinot.core.query.request.context.QueryContext;
 import org.apache.pinot.segment.spi.IndexSegment;
 
@@ -29,17 +31,22 @@ public class DocIdSetPlanNode implements PlanNode {
   private final IndexSegment _indexSegment;
   private final QueryContext _queryContext;
   private final int _maxDocPerCall;
+  private final BaseFilterOperator _filterOperator;
 
-  public DocIdSetPlanNode(IndexSegment indexSegment, QueryContext 
queryContext, int maxDocPerCall) {
+  public DocIdSetPlanNode(IndexSegment indexSegment, QueryContext 
queryContext, int maxDocPerCall,
+      @Nullable BaseFilterOperator filterOperator) {
     assert maxDocPerCall > 0 && maxDocPerCall <= MAX_DOC_PER_CALL;
 
     _indexSegment = indexSegment;
     _queryContext = queryContext;
     _maxDocPerCall = maxDocPerCall;
+    _filterOperator = filterOperator;
   }
 
   @Override
   public DocIdSetOperator run() {
-    return new DocIdSetOperator(new FilterPlanNode(_indexSegment, 
_queryContext).run(), _maxDocPerCall);
+    return new DocIdSetOperator(
+        _filterOperator != null ? _filterOperator : new 
FilterPlanNode(_indexSegment, _queryContext).run(),
+        _maxDocPerCall);
   }
 }
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/plan/FilterPlanNode.java 
b/pinot-core/src/main/java/org/apache/pinot/core/plan/FilterPlanNode.java
index da818f1..276cc57 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/plan/FilterPlanNode.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/plan/FilterPlanNode.java
@@ -21,9 +21,9 @@ package org.apache.pinot.core.plan;
 import com.google.common.base.Preconditions;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
-import javax.annotation.Nullable;
 import org.apache.pinot.common.request.context.ExpressionContext;
 import org.apache.pinot.common.request.context.FilterContext;
 import org.apache.pinot.common.request.context.FunctionContext;
@@ -60,6 +60,9 @@ public class FilterPlanNode implements PlanNode {
   private final QueryContext _queryContext;
   private final int _numDocs;
 
+  // Cache the predicate evaluators
+  private final Map<Predicate, PredicateEvaluator> _predicateEvaluatorMap = 
new HashMap<>();
+
   public FilterPlanNode(IndexSegment indexSegment, QueryContext queryContext) {
     _indexSegment = indexSegment;
     _queryContext = queryContext;
@@ -74,7 +77,7 @@ public class FilterPlanNode implements PlanNode {
     ThreadSafeMutableRoaringBitmap validDocIds = 
_indexSegment.getValidDocIds();
     boolean applyValidDocIds = validDocIds != null && 
!QueryOptionsUtils.isSkipUpsert(_queryContext.getQueryOptions());
     if (filter != null) {
-      BaseFilterOperator filterOperator = constructPhysicalOperator(filter, 
_queryContext.getDebugOptions());
+      BaseFilterOperator filterOperator = constructPhysicalOperator(filter);
       if (applyValidDocIds) {
         BaseFilterOperator validDocFilter =
             new 
BitmapBasedFilterOperator(validDocIds.getMutableRoaringBitmap(), false, 
_numDocs);
@@ -91,6 +94,13 @@ public class FilterPlanNode implements PlanNode {
   }
 
   /**
+   * Returns a map from predicates to their evaluators.
+   */
+  public Map<Predicate, PredicateEvaluator> getPredicateEvaluatorMap() {
+    return _predicateEvaluatorMap;
+  }
+
+  /**
    * H3 index can be applied iff:
    * <ul>
    *   <li>Predicate is of type RANGE</li>
@@ -126,14 +136,13 @@ public class FilterPlanNode implements PlanNode {
   /**
    * Helper method to build the operator tree from the filter.
    */
-  private BaseFilterOperator constructPhysicalOperator(FilterContext filter,
-      @Nullable Map<String, String> debugOptions) {
+  private BaseFilterOperator constructPhysicalOperator(FilterContext filter) {
     switch (filter.getType()) {
       case AND:
         List<FilterContext> childFilters = filter.getChildren();
         List<BaseFilterOperator> childFilterOperators = new 
ArrayList<>(childFilters.size());
         for (FilterContext childFilter : childFilters) {
-          BaseFilterOperator childFilterOperator = 
constructPhysicalOperator(childFilter, debugOptions);
+          BaseFilterOperator childFilterOperator = 
constructPhysicalOperator(childFilter);
           if (childFilterOperator.isResultEmpty()) {
             // Return empty filter operator if any of the child filter 
operator's result is empty
             return EmptyFilterOperator.getInstance();
@@ -142,12 +151,13 @@ public class FilterPlanNode implements PlanNode {
             childFilterOperators.add(childFilterOperator);
           }
         }
-        return FilterOperatorUtils.getAndFilterOperator(childFilterOperators, 
_numDocs, debugOptions);
+        return FilterOperatorUtils.getAndFilterOperator(childFilterOperators, 
_numDocs,
+            _queryContext.getDebugOptions());
       case OR:
         childFilters = filter.getChildren();
         childFilterOperators = new ArrayList<>(childFilters.size());
         for (FilterContext childFilter : childFilters) {
-          BaseFilterOperator childFilterOperator = 
constructPhysicalOperator(childFilter, debugOptions);
+          BaseFilterOperator childFilterOperator = 
constructPhysicalOperator(childFilter);
           if (childFilterOperator.isResultMatchingAll()) {
             // Return match all filter operator if any of the child filter 
operator matches all records
             return new MatchAllFilterOperator(_numDocs);
@@ -156,7 +166,7 @@ public class FilterPlanNode implements PlanNode {
             childFilterOperators.add(childFilterOperator);
           }
         }
-        return FilterOperatorUtils.getOrFilterOperator(childFilterOperators, 
_numDocs, debugOptions);
+        return FilterOperatorUtils.getOrFilterOperator(childFilterOperators, 
_numDocs, _queryContext.getDebugOptions());
       case PREDICATE:
         Predicate predicate = filter.getPredicate();
         ExpressionContext lhs = predicate.getLhs();
@@ -170,6 +180,10 @@ public class FilterPlanNode implements PlanNode {
         } else {
           String column = lhs.getIdentifier();
           DataSource dataSource = _indexSegment.getDataSource(column);
+          PredicateEvaluator predicateEvaluator = 
_predicateEvaluatorMap.get(predicate);
+          if (predicateEvaluator != null) {
+            return 
FilterOperatorUtils.getLeafFilterOperator(predicateEvaluator, dataSource, 
_numDocs);
+          }
           switch (predicate.getType()) {
             case TEXT_MATCH:
               return new TextMatchFilterOperator(dataSource.getTextIndex(), 
((TextMatchPredicate) predicate).getValue(),
@@ -183,19 +197,21 @@ public class FilterPlanNode implements PlanNode {
               //
               // Consuming segments: When FST is enabled, use 
AutomatonBasedEvaluator so that regexp matching logic is
               // similar to that of FSTBasedEvaluator, else use regular flow 
of getting predicate evaluator.
-              PredicateEvaluator evaluator;
               if (dataSource.getFSTIndex() != null) {
-                evaluator = 
FSTBasedRegexpPredicateEvaluatorFactory.newFSTBasedEvaluator(dataSource.getFSTIndex(),
-                    dataSource.getDictionary(), ((RegexpLikePredicate) 
predicate).getValue());
+                predicateEvaluator =
+                    
FSTBasedRegexpPredicateEvaluatorFactory.newFSTBasedEvaluator(dataSource.getFSTIndex(),
+                        dataSource.getDictionary(), ((RegexpLikePredicate) 
predicate).getValue());
               } else if (dataSource instanceof MutableDataSource && 
((MutableDataSource) dataSource).isFSTEnabled()) {
-                evaluator =
+                predicateEvaluator =
                     
FSTBasedRegexpPredicateEvaluatorFactory.newAutomatonBasedEvaluator(dataSource.getDictionary(),
                         ((RegexpLikePredicate) predicate).getValue());
               } else {
-                evaluator = 
PredicateEvaluatorProvider.getPredicateEvaluator(predicate, 
dataSource.getDictionary(),
-                    dataSource.getDataSourceMetadata().getDataType());
+                predicateEvaluator =
+                    
PredicateEvaluatorProvider.getPredicateEvaluator(predicate, 
dataSource.getDictionary(),
+                        dataSource.getDataSourceMetadata().getDataType());
               }
-              return FilterOperatorUtils.getLeafFilterOperator(evaluator, 
dataSource, _numDocs);
+              _predicateEvaluatorMap.put(predicate, predicateEvaluator);
+              return 
FilterOperatorUtils.getLeafFilterOperator(predicateEvaluator, dataSource, 
_numDocs);
             case JSON_MATCH:
               JsonIndexReader jsonIndex = dataSource.getJsonIndex();
               Preconditions.checkState(jsonIndex != null, "Cannot apply 
JSON_MATCH on column: %s without json index",
@@ -216,9 +232,10 @@ public class FilterPlanNode implements PlanNode {
                 return new MatchAllFilterOperator(_numDocs);
               }
             default:
-              PredicateEvaluator predicateEvaluator =
+              predicateEvaluator =
                   PredicateEvaluatorProvider.getPredicateEvaluator(predicate, 
dataSource.getDictionary(),
                       dataSource.getDataSourceMetadata().getDataType());
+              _predicateEvaluatorMap.put(predicate, predicateEvaluator);
               return 
FilterOperatorUtils.getLeafFilterOperator(predicateEvaluator, dataSource, 
_numDocs);
           }
         }
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/plan/ProjectionPlanNode.java 
b/pinot-core/src/main/java/org/apache/pinot/core/plan/ProjectionPlanNode.java
index 27db4bf..5f286b4 100644
--- 
a/pinot-core/src/main/java/org/apache/pinot/core/plan/ProjectionPlanNode.java
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/plan/ProjectionPlanNode.java
@@ -21,8 +21,10 @@ package org.apache.pinot.core.plan;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.Set;
+import javax.annotation.Nullable;
 import org.apache.pinot.core.operator.DocIdSetOperator;
 import org.apache.pinot.core.operator.ProjectionOperator;
+import org.apache.pinot.core.operator.filter.BaseFilterOperator;
 import org.apache.pinot.core.query.request.context.QueryContext;
 import org.apache.pinot.segment.spi.IndexSegment;
 import org.apache.pinot.segment.spi.datasource.DataSource;
@@ -37,13 +39,15 @@ public class ProjectionPlanNode implements PlanNode {
   private final QueryContext _queryContext;
   private final Set<String> _projectionColumns;
   private final int _maxDocsPerCall;
+  private final BaseFilterOperator _filterOperator;
 
   public ProjectionPlanNode(IndexSegment indexSegment, QueryContext 
queryContext, Set<String> projectionColumns,
-      int maxDocsPerCall) {
+      int maxDocsPerCall, @Nullable BaseFilterOperator filterOperator) {
     _indexSegment = indexSegment;
     _queryContext = queryContext;
     _projectionColumns = projectionColumns;
     _maxDocsPerCall = maxDocsPerCall;
+    _filterOperator = filterOperator;
   }
 
   @Override
@@ -54,7 +58,8 @@ public class ProjectionPlanNode implements PlanNode {
     }
     // NOTE: Skip creating DocIdSetOperator when maxDocsPerCall is 0 (for 
selection query with LIMIT 0)
     DocIdSetOperator docIdSetOperator =
-        _maxDocsPerCall > 0 ? new DocIdSetPlanNode(_indexSegment, 
_queryContext, _maxDocsPerCall).run() : null;
+        _maxDocsPerCall > 0 ? new DocIdSetPlanNode(_indexSegment, 
_queryContext, _maxDocsPerCall, _filterOperator).run()
+            : null;
     return new ProjectionOperator(dataSourceMap, docIdSetOperator);
   }
 }
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/plan/TransformPlanNode.java 
b/pinot-core/src/main/java/org/apache/pinot/core/plan/TransformPlanNode.java
index 6a60adb..f6f750c 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/plan/TransformPlanNode.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/plan/TransformPlanNode.java
@@ -21,8 +21,10 @@ package org.apache.pinot.core.plan;
 import java.util.Collection;
 import java.util.HashSet;
 import java.util.Set;
+import javax.annotation.Nullable;
 import org.apache.pinot.common.request.context.ExpressionContext;
 import org.apache.pinot.core.operator.ProjectionOperator;
+import org.apache.pinot.core.operator.filter.BaseFilterOperator;
 import org.apache.pinot.core.operator.transform.PassThroughTransformOperator;
 import org.apache.pinot.core.operator.transform.TransformOperator;
 import org.apache.pinot.core.query.request.context.QueryContext;
@@ -37,13 +39,20 @@ public class TransformPlanNode implements PlanNode {
   private final QueryContext _queryContext;
   private final Collection<ExpressionContext> _expressions;
   private final int _maxDocsPerCall;
+  private final BaseFilterOperator _filterOperator;
 
   public TransformPlanNode(IndexSegment indexSegment, QueryContext 
queryContext,
       Collection<ExpressionContext> expressions, int maxDocsPerCall) {
+    this(indexSegment, queryContext, expressions, maxDocsPerCall, null);
+  }
+
+  public TransformPlanNode(IndexSegment indexSegment, QueryContext 
queryContext,
+      Collection<ExpressionContext> expressions, int maxDocsPerCall, @Nullable 
BaseFilterOperator filterOperator) {
     _indexSegment = indexSegment;
     _queryContext = queryContext;
     _expressions = expressions;
     _maxDocsPerCall = maxDocsPerCall;
+    _filterOperator = filterOperator;
   }
 
   @Override
@@ -57,7 +66,7 @@ public class TransformPlanNode implements PlanNode {
       }
     }
     ProjectionOperator projectionOperator =
-        new ProjectionPlanNode(_indexSegment, _queryContext, 
projectionColumns, _maxDocsPerCall).run();
+        new ProjectionPlanNode(_indexSegment, _queryContext, 
projectionColumns, _maxDocsPerCall, _filterOperator).run();
     if (hasNonIdentifierExpression) {
       return new TransformOperator(projectionOperator, _expressions);
     } else {
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/startree/StarTreeUtils.java 
b/pinot-core/src/main/java/org/apache/pinot/core/startree/StarTreeUtils.java
index 0dee1e4..049da07 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/startree/StarTreeUtils.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/startree/StarTreeUtils.java
@@ -33,7 +33,6 @@ import 
org.apache.pinot.common.request.context.ExpressionContext;
 import org.apache.pinot.common.request.context.FilterContext;
 import org.apache.pinot.common.request.context.predicate.Predicate;
 import org.apache.pinot.core.operator.filter.predicate.PredicateEvaluator;
-import 
org.apache.pinot.core.operator.filter.predicate.PredicateEvaluatorProvider;
 import org.apache.pinot.core.query.aggregation.function.AggregationFunction;
 import 
org.apache.pinot.core.query.aggregation.function.AggregationFunctionUtils;
 import org.apache.pinot.core.query.request.context.QueryContext;
@@ -90,10 +89,12 @@ public class StarTreeUtils {
    * (d1 > 50 AND (d2 > 10 OR d2 < 35)).
    * This method represents a list of CompositePredicates per dimension. For 
each dimension, all CompositePredicates in
    * the list are implicitly ANDed together. Any OR predicates are nested 
within a CompositePredicate.
+   *
+   * A map from predicates to their evaluators is passed in to accelerate the 
computation.
    */
   @Nullable
   public static Map<String, List<CompositePredicateEvaluator>> 
extractPredicateEvaluatorsMap(IndexSegment indexSegment,
-      @Nullable FilterContext filter) {
+      @Nullable FilterContext filter, Map<Predicate, PredicateEvaluator> 
predicateEvaluatorMap) {
     if (filter == null) {
       return Collections.emptyMap();
     }
@@ -108,7 +109,8 @@ public class StarTreeUtils {
           queue.addAll(filterNode.getChildren());
           break;
         case OR:
-          Pair<String, List<PredicateEvaluator>> pair = 
isOrClauseValidForStarTree(indexSegment, filterNode);
+          Pair<String, List<PredicateEvaluator>> pair =
+              isOrClauseValidForStarTree(indexSegment, filterNode, 
predicateEvaluatorMap);
           if (pair == null) {
             return null;
           }
@@ -121,8 +123,9 @@ public class StarTreeUtils {
           break;
         case PREDICATE:
           Predicate predicate = filterNode.getPredicate();
-          PredicateEvaluator predicateEvaluator = 
getPredicateEvaluatorForPredicate(indexSegment, predicate);
+          PredicateEvaluator predicateEvaluator = 
getPredicateEvaluator(indexSegment, predicate, predicateEvaluatorMap);
           if (predicateEvaluator == null) {
+            // The predicate cannot be solved with star-tree
             return null;
           }
           if (!predicateEvaluator.isAlwaysTrue()) {
@@ -181,7 +184,7 @@ public class StarTreeUtils {
    */
   @Nullable
   private static Pair<String, List<PredicateEvaluator>> 
isOrClauseValidForStarTree(IndexSegment indexSegment,
-      FilterContext filter) {
+      FilterContext filter, Map<Predicate, PredicateEvaluator> 
predicateEvaluatorMap) {
     assert filter.getType() == FilterContext.Type.OR;
 
     List<Predicate> predicates = new ArrayList<>();
@@ -190,7 +193,7 @@ public class StarTreeUtils {
     String identifier = null;
     List<PredicateEvaluator> predicateEvaluators = new ArrayList<>();
     for (Predicate predicate : predicates) {
-      PredicateEvaluator predicateEvaluator = 
getPredicateEvaluatorForPredicate(indexSegment, predicate);
+      PredicateEvaluator predicateEvaluator = 
getPredicateEvaluator(indexSegment, predicate, predicateEvaluatorMap);
       if (predicateEvaluator == null) {
         // The predicate cannot be solved with star-tree
         return null;
@@ -246,7 +249,8 @@ public class StarTreeUtils {
    * star-tree.
    */
   @Nullable
-  private static PredicateEvaluator 
getPredicateEvaluatorForPredicate(IndexSegment indexSegment, Predicate 
predicate) {
+  private static PredicateEvaluator getPredicateEvaluator(IndexSegment 
indexSegment, Predicate predicate,
+      Map<Predicate, PredicateEvaluator> predicateEvaluatorMap) {
     ExpressionContext lhs = predicate.getLhs();
     if (lhs.getType() != ExpressionContext.Type.IDENTIFIER) {
       // Star-tree does not support non-identifier expression
@@ -271,7 +275,6 @@ public class StarTreeUtils {
       default:
         break;
     }
-    return PredicateEvaluatorProvider
-        .getPredicateEvaluator(predicate, dictionary, 
dataSource.getDataSourceMetadata().getDataType());
+    return predicateEvaluatorMap.get(predicate);
   }
 }
diff --git 
a/pinot-core/src/test/java/org/apache/pinot/core/plan/maker/MetadataAndDictionaryAggregationPlanMakerTest.java
 
b/pinot-core/src/test/java/org/apache/pinot/core/plan/maker/MetadataAndDictionaryAggregationPlanMakerTest.java
index d9ad71a..2b5b401 100644
--- 
a/pinot-core/src/test/java/org/apache/pinot/core/plan/maker/MetadataAndDictionaryAggregationPlanMakerTest.java
+++ 
b/pinot-core/src/test/java/org/apache/pinot/core/plan/maker/MetadataAndDictionaryAggregationPlanMakerTest.java
@@ -151,54 +151,63 @@ public class 
MetadataAndDictionaryAggregationPlanMakerTest {
   @DataProvider(name = "testPlanMakerDataProvider")
   public Object[][] testPlanMakerDataProvider() {
     List<Object[]> entries = new ArrayList<>();
+    // Selection
     entries.add(new Object[]{
-        "select * from testTable", /*selection query*/
-        SelectionOnlyOperator.class, SelectionOnlyOperator.class
+        "select * from testTable", SelectionOnlyOperator.class, 
SelectionOnlyOperator.class
     });
+    // Selection
     entries.add(new Object[]{
-        "select column1,column5 from testTable", /*selection query*/
-        SelectionOnlyOperator.class, SelectionOnlyOperator.class
+        "select column1,column5 from testTable", SelectionOnlyOperator.class, 
SelectionOnlyOperator.class
     });
+    // Selection with filter
     entries.add(new Object[]{
-        "select * from testTable where daysSinceEpoch > 100", /*selection 
query with filters*/
-        SelectionOnlyOperator.class, SelectionOnlyOperator.class
+        "select * from testTable where daysSinceEpoch > 100", 
SelectionOnlyOperator.class, SelectionOnlyOperator.class
     });
+    // COUNT from metadata
     entries.add(new Object[]{
-        "select count(*) from testTable", /*count(*) from metadata*/
-        MetadataBasedAggregationOperator.class, AggregationOperator.class
+        "select count(*) from testTable", 
MetadataBasedAggregationOperator.class, AggregationOperator.class
     });
+    // COUNT from metadata with match all filter
     entries.add(new Object[]{
-        "select max(daysSinceEpoch),min(daysSinceEpoch) from testTable", /*min 
max from dictionary*/
-        DictionaryBasedAggregationOperator.class, AggregationOperator.class
+        "select count(*) from testTable where column1 > 10", 
MetadataBasedAggregationOperator.class,
+        AggregationOperator.class
     });
+    // MIN/MAX from dictionary
     entries.add(new Object[]{
-        "select minmaxrange(daysSinceEpoch) from testTable", /*min max from 
dictionary*/
-        DictionaryBasedAggregationOperator.class, AggregationOperator.class
+        "select max(daysSinceEpoch),min(daysSinceEpoch) from testTable", 
DictionaryBasedAggregationOperator.class,
+        AggregationOperator.class
     });
+    // MIN/MAX from dictionary with match all filter
     entries.add(new Object[]{
-        "select max(column17),min(column17) from testTable", /* minmax from 
dictionary*/
+        "select max(daysSinceEpoch),min(daysSinceEpoch) from testTable where 
column1 > 10",
         DictionaryBasedAggregationOperator.class, AggregationOperator.class
     });
+    // MINMAXRANGE from dictionary
     entries.add(new Object[]{
-        "select minmaxrange(column17) from testTable", /*no minmax metadata, 
go to dictionary*/
+        "select minmaxrange(daysSinceEpoch) from testTable", 
DictionaryBasedAggregationOperator.class,
+        AggregationOperator.class
+    });
+    // MINMAXRANGE from dictionary with match all filter
+    entries.add(new Object[]{
+        "select minmaxrange(daysSinceEpoch) from testTable where column1 > 10",
         DictionaryBasedAggregationOperator.class, AggregationOperator.class
     });
+    // Aggregation
     entries.add(new Object[]{
-        "select sum(column1) from testTable", /*aggregation query*/
-        AggregationOperator.class, AggregationOperator.class
+        "select sum(column1) from testTable", AggregationOperator.class, 
AggregationOperator.class
     });
+    // Aggregation group-by
     entries.add(new Object[]{
-        "select sum(column1) from testTable group by daysSinceEpoch", 
/*aggregation with group by query*/
-        AggregationGroupByOperator.class, AggregationGroupByOperator.class
+        "select sum(column1) from testTable group by daysSinceEpoch", 
AggregationGroupByOperator.class,
+        AggregationGroupByOperator.class
     });
+    // COUNT from metadata, MIN from dictionary
     entries.add(new Object[]{
-        "select count(*),min(column17) from testTable",
-        /*multiple aggregations query, one from metadata, one from dictionary*/
-        AggregationOperator.class, AggregationOperator.class
+        "select count(*),min(column17) from testTable", 
AggregationOperator.class, AggregationOperator.class
     });
+    // Aggregation group-by
     entries.add(new Object[]{
         "select count(*),min(daysSinceEpoch) from testTable group by 
daysSinceEpoch",
-        /*multiple aggregations with group by*/
         AggregationGroupByOperator.class, AggregationGroupByOperator.class
     });
 
diff --git 
a/pinot-core/src/test/java/org/apache/pinot/core/startree/v2/BaseStarTreeV2Test.java
 
b/pinot-core/src/test/java/org/apache/pinot/core/startree/v2/BaseStarTreeV2Test.java
index 648934e..8cb2e7a 100644
--- 
a/pinot-core/src/test/java/org/apache/pinot/core/startree/v2/BaseStarTreeV2Test.java
+++ 
b/pinot-core/src/test/java/org/apache/pinot/core/startree/v2/BaseStarTreeV2Test.java
@@ -31,7 +31,6 @@ import java.util.Random;
 import java.util.Set;
 import org.apache.commons.io.FileUtils;
 import org.apache.pinot.common.request.context.ExpressionContext;
-import org.apache.pinot.common.request.context.FilterContext;
 import org.apache.pinot.core.common.BlockDocIdIterator;
 import org.apache.pinot.core.plan.FilterPlanNode;
 import org.apache.pinot.core.plan.PlanNode;
@@ -224,9 +223,11 @@ abstract class BaseStarTreeV2Test<R, A> {
     List<String> groupByColumns = new ArrayList<>(groupByColumnSet);
 
     // Filter
-    FilterContext filter = queryContext.getFilter();
+    FilterPlanNode filterPlanNode = new FilterPlanNode(_indexSegment, 
queryContext);
+    filterPlanNode.run();
     Map<String, List<CompositePredicateEvaluator>> predicateEvaluatorsMap =
-        StarTreeUtils.extractPredicateEvaluatorsMap(_indexSegment, filter);
+        StarTreeUtils.extractPredicateEvaluatorsMap(_indexSegment, 
queryContext.getFilter(),
+            filterPlanNode.getPredicateEvaluatorMap());
     assertNotNull(predicateEvaluatorsMap);
 
     // Extract values with star-tree
@@ -234,8 +235,8 @@ abstract class BaseStarTreeV2Test<R, A> {
         new StarTreeFilterPlanNode(_starTreeV2, predicateEvaluatorsMap, 
groupByColumnSet, null);
     List<ForwardIndexReader> starTreeAggregationColumnReaders = new 
ArrayList<>(numAggregations);
     for (AggregationFunctionColumnPair aggregationFunctionColumnPair : 
aggregationFunctionColumnPairs) {
-      starTreeAggregationColumnReaders
-          
.add(_starTreeV2.getDataSource(aggregationFunctionColumnPair.toColumnName()).getForwardIndex());
+      starTreeAggregationColumnReaders.add(
+          
_starTreeV2.getDataSource(aggregationFunctionColumnPair.toColumnName()).getForwardIndex());
     }
     List<ForwardIndexReader> starTreeGroupByColumnReaders = new 
ArrayList<>(numGroupByColumns);
     for (String groupByColumn : groupByColumns) {
diff --git 
a/pinot-core/src/test/java/org/apache/pinot/queries/DistinctCountQueriesTest.java
 
b/pinot-core/src/test/java/org/apache/pinot/queries/DistinctCountQueriesTest.java
index afa2181..3c045bc 100644
--- 
a/pinot-core/src/test/java/org/apache/pinot/queries/DistinctCountQueriesTest.java
+++ 
b/pinot-core/src/test/java/org/apache/pinot/queries/DistinctCountQueriesTest.java
@@ -96,7 +96,7 @@ public class DistinctCountQueriesTest extends BaseQueriesTest 
{
 
   @Override
   protected String getFilter() {
-    // NOTE: Use a match all filter to switch between 
DictionaryBasedAggregationOperator and AggregationOperator
+    // NOTE: This is a match all filter
     return " WHERE intColumn >= 0";
   }
 
@@ -185,31 +185,23 @@ public class DistinctCountQueriesTest extends 
BaseQueriesTest {
 
   @Test
   public void testAggregationOnly() {
+    // Dictionary based
     String query =
         "SELECT DISTINCTCOUNT(intColumn), DISTINCTCOUNT(longColumn), 
DISTINCTCOUNT(floatColumn), DISTINCTCOUNT"
             + "(doubleColumn), DISTINCTCOUNT(stringColumn), 
DISTINCTCOUNT(bytesColumn) FROM testTable";
 
     // Inner segment
-    Operator operator = getOperatorForPqlQuery(query);
-    assertTrue(operator instanceof DictionaryBasedAggregationOperator);
-    IntermediateResultsBlock resultsBlock = 
((DictionaryBasedAggregationOperator) operator).nextBlock();
-    QueriesTestUtils
-        
.testInnerSegmentExecutionStatistics(operator.getExecutionStatistics(), 
NUM_RECORDS, 0, 0, NUM_RECORDS);
-    List<Object> aggregationResult = resultsBlock.getAggregationResult();
-
-    operator = getOperatorForPqlQueryWithFilter(query);
-    assertTrue(operator instanceof AggregationOperator);
-    IntermediateResultsBlock resultsBlockWithFilter = ((AggregationOperator) 
operator).nextBlock();
-    QueriesTestUtils
-        
.testInnerSegmentExecutionStatistics(operator.getExecutionStatistics(), 
NUM_RECORDS, 0, 6 * NUM_RECORDS,
-            NUM_RECORDS);
-    List<Object> aggregationResultWithFilter = 
resultsBlockWithFilter.getAggregationResult();
-
-    assertNotNull(aggregationResult);
-    assertNotNull(aggregationResultWithFilter);
-    assertEquals(aggregationResult, aggregationResultWithFilter);
-    for (int i = 0; i < 6; i++) {
-      assertEquals(((Set) aggregationResult.get(i)).size(), _values.size());
+    for (Object operator : Arrays.asList(getOperatorForSqlQuery(query), 
getOperatorForSqlQueryWithFilter(query))) {
+      assertTrue(operator instanceof DictionaryBasedAggregationOperator);
+      IntermediateResultsBlock resultsBlock = 
((DictionaryBasedAggregationOperator) operator).nextBlock();
+      QueriesTestUtils.testInnerSegmentExecutionStatistics(((Operator) 
operator).getExecutionStatistics(), NUM_RECORDS,
+          0, 0, NUM_RECORDS);
+      List<Object> aggregationResult = resultsBlock.getAggregationResult();
+      assertNotNull(aggregationResult);
+      assertEquals(aggregationResult.size(), 6);
+      for (int i = 0; i < 6; i++) {
+        assertEquals(((Set) aggregationResult.get(i)).size(), _values.size());
+      }
     }
 
     // Inter segments
@@ -217,13 +209,39 @@ public class DistinctCountQueriesTest extends 
BaseQueriesTest {
     for (int i = 0; i < 6; i++) {
       expectedResults[i] = Integer.toString(_values.size());
     }
+    for (BrokerResponseNative brokerResponse : 
Arrays.asList(getBrokerResponseForPqlQuery(query),
+        getBrokerResponseForPqlQueryWithFilter(query))) {
+      QueriesTestUtils.testInterSegmentAggregationResult(brokerResponse, 4 * 
NUM_RECORDS, 0, 0, 4 * NUM_RECORDS,
+          expectedResults);
+    }
+
+    // Regular aggregation
+    query = query + " WHERE intColumn >= 500";
+
+    // Inner segment
+    int expectedResult = 0;
+    for (Integer value : _values) {
+      if (value >= 500) {
+        expectedResult++;
+      }
+    }
+    Operator operator = getOperatorForSqlQuery(query);
+    assertTrue(operator instanceof AggregationOperator);
+    List<Object> aggregationResult = ((AggregationOperator) 
operator).nextBlock().getAggregationResult();
+    assertNotNull(aggregationResult);
+    assertEquals(aggregationResult.size(), 6);
+    for (int i = 0; i < 6; i++) {
+      assertEquals(((Set) aggregationResult.get(i)).size(), expectedResult);
+    }
+
+    // Inter segment
     BrokerResponseNative brokerResponse = getBrokerResponseForPqlQuery(query);
-    QueriesTestUtils
-        .testInterSegmentAggregationResult(brokerResponse, 4 * NUM_RECORDS, 0, 
0, 4 * NUM_RECORDS, expectedResults);
-    brokerResponse = getBrokerResponseForPqlQueryWithFilter(query);
-    QueriesTestUtils
-        .testInterSegmentAggregationResult(brokerResponse, 4 * NUM_RECORDS, 0, 
4 * 6 * NUM_RECORDS, 4 * NUM_RECORDS,
-            expectedResults);
+    List<AggregationResult> aggregationResults = 
brokerResponse.getAggregationResults();
+    assertNotNull(aggregationResults);
+    assertEquals(aggregationResults.size(), 6);
+    for (int i = 0; i < 6; i++) {
+      assertEquals(aggregationResults.get(i).getValue(), 
Integer.toString(expectedResult));
+    }
   }
 
   @Test
@@ -237,9 +255,8 @@ public class DistinctCountQueriesTest extends 
BaseQueriesTest {
     Operator operator = getOperatorForPqlQuery(query);
     assertTrue(operator instanceof AggregationGroupByOperator);
     IntermediateResultsBlock resultsBlock = ((AggregationGroupByOperator) 
operator).nextBlock();
-    QueriesTestUtils
-        
.testInnerSegmentExecutionStatistics(operator.getExecutionStatistics(), 
NUM_RECORDS, 0, 6 * NUM_RECORDS,
-            NUM_RECORDS);
+    
QueriesTestUtils.testInnerSegmentExecutionStatistics(operator.getExecutionStatistics(),
 NUM_RECORDS, 0,
+        6 * NUM_RECORDS, NUM_RECORDS);
     AggregationGroupByResult aggregationGroupByResult = 
resultsBlock.getAggregationGroupByResult();
     assertNotNull(aggregationGroupByResult);
     int numGroups = 0;
diff --git 
a/pinot-core/src/test/java/org/apache/pinot/queries/SegmentPartitionedDistinctCountQueriesTest.java
 
b/pinot-core/src/test/java/org/apache/pinot/queries/SegmentPartitionedDistinctCountQueriesTest.java
index 3bc4148..0520086 100644
--- 
a/pinot-core/src/test/java/org/apache/pinot/queries/SegmentPartitionedDistinctCountQueriesTest.java
+++ 
b/pinot-core/src/test/java/org/apache/pinot/queries/SegmentPartitionedDistinctCountQueriesTest.java
@@ -92,13 +92,12 @@ public class SegmentPartitionedDistinctCountQueriesTest 
extends BaseQueriesTest
       new 
TableConfigBuilder(TableType.OFFLINE).setTableName(RAW_TABLE_NAME).build();
 
   private Set<Integer> _values;
-  private long _expectedResult;
   private IndexSegment _indexSegment;
   private List<IndexSegment> _indexSegments;
 
   @Override
   protected String getFilter() {
-    // NOTE: Use a match all filter to switch between 
DictionaryBasedAggregationOperator and AggregationOperator
+    // NOTE: This is a match all filter
     return " WHERE intColumn >= 0";
   }
 
@@ -135,7 +134,6 @@ public class SegmentPartitionedDistinctCountQueriesTest 
extends BaseQueriesTest
       record.putValue(BYTES_COLUMN, bytesValue);
       records.add(record);
     }
-    _expectedResult = _values.size();
 
     SegmentGeneratorConfig segmentGeneratorConfig = new 
SegmentGeneratorConfig(TABLE_CONFIG, SCHEMA);
     segmentGeneratorConfig.setTableName(RAW_TABLE_NAME);
@@ -153,64 +151,79 @@ public class SegmentPartitionedDistinctCountQueriesTest 
extends BaseQueriesTest
 
   @Test
   public void testAggregationOnly() {
-    String query =
-        "SELECT SEGMENTPARTITIONEDDISTINCTCOUNT(intColumn), 
SEGMENTPARTITIONEDDISTINCTCOUNT(longColumn), "
-            + "SEGMENTPARTITIONEDDISTINCTCOUNT(floatColumn), 
SEGMENTPARTITIONEDDISTINCTCOUNT(doubleColumn), "
-            + "SEGMENTPARTITIONEDDISTINCTCOUNT(stringColumn), 
SEGMENTPARTITIONEDDISTINCTCOUNT(bytesColumn) FROM "
-            + "testTable";
+    // Dictionary based
+    String query = "SELECT SEGMENTPARTITIONEDDISTINCTCOUNT(intColumn), 
SEGMENTPARTITIONEDDISTINCTCOUNT(longColumn), "
+        + "SEGMENTPARTITIONEDDISTINCTCOUNT(floatColumn), 
SEGMENTPARTITIONEDDISTINCTCOUNT(doubleColumn), "
+        + "SEGMENTPARTITIONEDDISTINCTCOUNT(stringColumn), 
SEGMENTPARTITIONEDDISTINCTCOUNT(bytesColumn) FROM "
+        + "testTable";
 
     // Inner segment
-    Operator operator = getOperatorForPqlQuery(query);
-    assertTrue(operator instanceof DictionaryBasedAggregationOperator);
-    IntermediateResultsBlock resultsBlock = 
((DictionaryBasedAggregationOperator) operator).nextBlock();
-    QueriesTestUtils
-        
.testInnerSegmentExecutionStatistics(operator.getExecutionStatistics(), 
NUM_RECORDS, 0, 0, NUM_RECORDS);
-    List<Object> aggregationResult = resultsBlock.getAggregationResult();
+    for (Object operator : Arrays.asList(getOperatorForSqlQuery(query), 
getOperatorForSqlQueryWithFilter(query))) {
+      assertTrue(operator instanceof DictionaryBasedAggregationOperator);
+      IntermediateResultsBlock resultsBlock = 
((DictionaryBasedAggregationOperator) operator).nextBlock();
+      QueriesTestUtils.testInnerSegmentExecutionStatistics(((Operator) 
operator).getExecutionStatistics(), NUM_RECORDS,
+          0, 0, NUM_RECORDS);
+      List<Object> aggregationResult = resultsBlock.getAggregationResult();
+      assertNotNull(aggregationResult);
+      assertEquals(aggregationResult.size(), 6);
+      for (int i = 0; i < 6; i++) {
+        assertEquals(((Long) aggregationResult.get(i)).intValue(), 
_values.size());
+      }
+    }
 
-    operator = getOperatorForPqlQueryWithFilter(query);
-    assertTrue(operator instanceof AggregationOperator);
-    IntermediateResultsBlock resultsBlockWithFilter = ((AggregationOperator) 
operator).nextBlock();
-    QueriesTestUtils
-        
.testInnerSegmentExecutionStatistics(operator.getExecutionStatistics(), 
NUM_RECORDS, 0, 6 * NUM_RECORDS,
-            NUM_RECORDS);
-    List<Object> aggregationResultWithFilter = 
resultsBlockWithFilter.getAggregationResult();
+    // Inter segments (expect 4 * inner segment result)
+    String[] expectedResults = new String[6];
+    for (int i = 0; i < 6; i++) {
+      expectedResults[i] = Integer.toString(4 * _values.size());
+    }
+    for (BrokerResponseNative brokerResponse : 
Arrays.asList(getBrokerResponseForPqlQuery(query),
+        getBrokerResponseForPqlQueryWithFilter(query))) {
+      QueriesTestUtils.testInterSegmentAggregationResult(brokerResponse, 4 * 
NUM_RECORDS, 0, 0, 4 * NUM_RECORDS,
+          expectedResults);
+    }
 
+    // Regular aggregation
+    query = query + " WHERE intColumn >= 500";
+
+    // Inner segment
+    int expectedResult = 0;
+    for (Integer value : _values) {
+      if (value >= 500) {
+        expectedResult++;
+      }
+    }
+    Operator operator = getOperatorForSqlQuery(query);
+    assertTrue(operator instanceof AggregationOperator);
+    List<Object> aggregationResult = ((AggregationOperator) 
operator).nextBlock().getAggregationResult();
     assertNotNull(aggregationResult);
-    assertNotNull(aggregationResultWithFilter);
-    assertEquals(aggregationResult, aggregationResultWithFilter);
+    assertEquals(aggregationResult.size(), 6);
     for (int i = 0; i < 6; i++) {
-      assertEquals((long) aggregationResult.get(i), _expectedResult);
+      assertEquals(((Long) aggregationResult.get(i)).intValue(), 
expectedResult);
     }
 
-    // Inter segments (expect 4 * inner segment result)
-    String[] expectedResults = new String[6];
+    // Inter segment (expect 4 * inner segment result)
+    BrokerResponseNative brokerResponse = getBrokerResponseForPqlQuery(query);
+    List<AggregationResult> aggregationResults = 
brokerResponse.getAggregationResults();
+    assertNotNull(aggregationResults);
+    assertEquals(aggregationResults.size(), 6);
     for (int i = 0; i < 6; i++) {
-      expectedResults[i] = Long.toString(4 * _expectedResult);
+      assertEquals(aggregationResults.get(i).getValue(), Integer.toString(4 * 
expectedResult));
     }
-    BrokerResponseNative brokerResponse = getBrokerResponseForPqlQuery(query);
-    QueriesTestUtils
-        .testInterSegmentAggregationResult(brokerResponse, 4 * NUM_RECORDS, 0, 
0, 4 * NUM_RECORDS, expectedResults);
-    brokerResponse = getBrokerResponseForPqlQueryWithFilter(query);
-    QueriesTestUtils
-        .testInterSegmentAggregationResult(brokerResponse, 4 * NUM_RECORDS, 0, 
4 * 6 * NUM_RECORDS, 4 * NUM_RECORDS,
-            expectedResults);
   }
 
   @Test
   public void testAggregationGroupBy() {
-    String query =
-        "SELECT SEGMENTPARTITIONEDDISTINCTCOUNT(intColumn), 
SEGMENTPARTITIONEDDISTINCTCOUNT(longColumn), "
-            + "SEGMENTPARTITIONEDDISTINCTCOUNT(floatColumn), 
SEGMENTPARTITIONEDDISTINCTCOUNT(doubleColumn), "
-            + "SEGMENTPARTITIONEDDISTINCTCOUNT(stringColumn), 
SEGMENTPARTITIONEDDISTINCTCOUNT(bytesColumn) FROM "
-            + "testTable GROUP BY intColumn";
+    String query = "SELECT SEGMENTPARTITIONEDDISTINCTCOUNT(intColumn), 
SEGMENTPARTITIONEDDISTINCTCOUNT(longColumn), "
+        + "SEGMENTPARTITIONEDDISTINCTCOUNT(floatColumn), 
SEGMENTPARTITIONEDDISTINCTCOUNT(doubleColumn), "
+        + "SEGMENTPARTITIONEDDISTINCTCOUNT(stringColumn), 
SEGMENTPARTITIONEDDISTINCTCOUNT(bytesColumn) FROM "
+        + "testTable GROUP BY intColumn";
 
     // Inner segment
     Operator operator = getOperatorForPqlQuery(query);
     assertTrue(operator instanceof AggregationGroupByOperator);
     IntermediateResultsBlock resultsBlock = ((AggregationGroupByOperator) 
operator).nextBlock();
-    QueriesTestUtils
-        
.testInnerSegmentExecutionStatistics(operator.getExecutionStatistics(), 
NUM_RECORDS, 0, 6 * NUM_RECORDS,
-            NUM_RECORDS);
+    
QueriesTestUtils.testInnerSegmentExecutionStatistics(operator.getExecutionStatistics(),
 NUM_RECORDS, 0,
+        6 * NUM_RECORDS, NUM_RECORDS);
     AggregationGroupByResult aggregationGroupByResult = 
resultsBlock.getAggregationGroupByResult();
     assertNotNull(aggregationGroupByResult);
     int numGroups = 0;

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org

Reply via email to