This is an automated email from the ASF dual-hosted git repository.

jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 76b219b6d6 Compute all groups for group by queries with only filtered 
aggregations (#14211)
76b219b6d6 is described below

commit 76b219b6d628d0dbb8904ff86841a01378f30abd
Author: Yash Mayya <[email protected]>
AuthorDate: Fri Oct 18 05:28:03 2024 +0530

    Compute all groups for group by queries with only filtered aggregations 
(#14211)
---
 .../common/utils/config/QueryOptionsUtils.java     |  4 ++
 .../operator/query/FilteredGroupByOperator.java    | 41 ++++++++------------
 .../function/AggregationFunctionUtils.java         | 10 ++++-
 .../pinot/queries/FilteredAggregationsTest.java    |  7 ++--
 .../tests/MultiStageEngineIntegrationTest.java     | 45 +++++++++++++++++++---
 .../tests/OfflineClusterIntegrationTest.java       | 40 +++++++++++++++++++
 .../operator/MultistageGroupByExecutor.java        | 11 ++++++
 .../apache/pinot/spi/utils/CommonConstants.java    |  9 +++++
 8 files changed, 133 insertions(+), 34 deletions(-)

diff --git 
a/pinot-common/src/main/java/org/apache/pinot/common/utils/config/QueryOptionsUtils.java
 
b/pinot-common/src/main/java/org/apache/pinot/common/utils/config/QueryOptionsUtils.java
index 12ad735865..0ec3374aea 100644
--- 
a/pinot-common/src/main/java/org/apache/pinot/common/utils/config/QueryOptionsUtils.java
+++ 
b/pinot-common/src/main/java/org/apache/pinot/common/utils/config/QueryOptionsUtils.java
@@ -234,6 +234,10 @@ public class QueryOptionsUtils {
     return 
Boolean.parseBoolean(queryOptions.get(QueryOptionKey.SERVER_RETURN_FINAL_RESULT_KEY_UNPARTITIONED));
   }
 
+  public static boolean isFilteredAggregationsSkipEmptyGroups(Map<String, 
String> queryOptions) {
+    return 
Boolean.parseBoolean(queryOptions.get(QueryOptionKey.FILTERED_AGGREGATIONS_SKIP_EMPTY_GROUPS));
+  }
+
   @Nullable
   public static String getOrderByAlgorithm(Map<String, String> queryOptions) {
     return queryOptions.get(QueryOptionKey.ORDER_BY_ALGORITHM);
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/operator/query/FilteredGroupByOperator.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/operator/query/FilteredGroupByOperator.java
index 97ce0e4f6b..5c82827cb4 100644
--- 
a/pinot-core/src/main/java/org/apache/pinot/core/operator/query/FilteredGroupByOperator.java
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/operator/query/FilteredGroupByOperator.java
@@ -124,33 +124,24 @@ public class FilteredGroupByOperator extends 
BaseOperator<GroupByResultsBlock> {
 
       // Perform aggregation group-by on all the blocks
       DefaultGroupByExecutor groupByExecutor;
-      if (groupKeyGenerator == null) {
-        // The group key generator should be shared across all 
AggregationFunctions so that agg results can be
-        // aligned. Given that filtered aggregations are stored as an iterable 
of iterables so that all filtered aggs
-        // with the same filter can share transform blocks, rather than a 
singular flat iterable in the case where
-        // aggs are all non-filtered, sharing a GroupKeyGenerator across all 
aggs cannot be accomplished by allowing
-        // the GroupByExecutor to have sole ownership of the 
GroupKeyGenerator. Therefore, we allow constructing a
-        // GroupByExecutor with a pre-existing GroupKeyGenerator so that the 
GroupKeyGenerator can be shared across
-        // loop iterations i.e. across all aggs.
-        if (aggregationInfo.isUseStarTree()) {
-          groupByExecutor =
-              new StarTreeGroupByExecutor(_queryContext, aggregationFunctions, 
_groupByExpressions, projectOperator);
-        } else {
-          groupByExecutor =
-              new DefaultGroupByExecutor(_queryContext, aggregationFunctions, 
_groupByExpressions, projectOperator);
-        }
-        groupKeyGenerator = groupByExecutor.getGroupKeyGenerator();
+
+      if (aggregationInfo.isUseStarTree()) {
+        groupByExecutor =
+            new StarTreeGroupByExecutor(_queryContext, aggregationFunctions, 
_groupByExpressions, projectOperator,
+                groupKeyGenerator);
       } else {
-        if (aggregationInfo.isUseStarTree()) {
-          groupByExecutor =
-              new StarTreeGroupByExecutor(_queryContext, aggregationFunctions, 
_groupByExpressions, projectOperator,
-                  groupKeyGenerator);
-        } else {
-          groupByExecutor =
-              new DefaultGroupByExecutor(_queryContext, aggregationFunctions, 
_groupByExpressions, projectOperator,
-                  groupKeyGenerator);
-        }
+        groupByExecutor =
+            new DefaultGroupByExecutor(_queryContext, aggregationFunctions, 
_groupByExpressions, projectOperator,
+                groupKeyGenerator);
       }
+      // The group key generator should be shared across all 
AggregationFunctions so that agg results can be
+      // aligned. Given that filtered aggregations are stored as an iterable 
of iterables so that all filtered aggs
+      // with the same filter can share transform blocks, rather than a 
singular flat iterable in the case where
+      // aggs are all non-filtered, sharing a GroupKeyGenerator across all 
aggs cannot be accomplished by allowing
+      // the GroupByExecutor to have sole ownership of the GroupKeyGenerator. 
Therefore, we allow constructing a
+      // GroupByExecutor with a pre-existing GroupKeyGenerator so that the 
GroupKeyGenerator can be shared across
+      // loop iterations i.e. across all aggs.
+      groupKeyGenerator = groupByExecutor.getGroupKeyGenerator();
 
       int numDocsScanned = 0;
       ValueBlock valueBlock;
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/AggregationFunctionUtils.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/AggregationFunctionUtils.java
index 95020d0a34..962fc74259 100644
--- 
a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/AggregationFunctionUtils.java
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/AggregationFunctionUtils.java
@@ -39,6 +39,7 @@ import 
org.apache.pinot.common.request.context.ExpressionContext;
 import org.apache.pinot.common.request.context.FilterContext;
 import org.apache.pinot.common.request.context.predicate.Predicate;
 import org.apache.pinot.common.utils.DataSchema.ColumnDataType;
+import org.apache.pinot.common.utils.config.QueryOptionsUtils;
 import org.apache.pinot.core.common.BlockValSet;
 import org.apache.pinot.core.common.ObjectSerDeUtils;
 import org.apache.pinot.core.operator.BaseProjectOperator;
@@ -384,7 +385,14 @@ public class AggregationFunctionUtils {
       }
     }
 
-    if (!nonFilteredFunctions.isEmpty()) {
+    if (!nonFilteredFunctions.isEmpty() || 
((queryContext.getGroupByExpressions() != null)
+        && 
!QueryOptionsUtils.isFilteredAggregationsSkipEmptyGroups(queryContext.getQueryOptions())))
 {
+      // If there are no non-filtered aggregation functions for a group by 
query, we still add a new AggregationInfo
+      // with an empty AggregationFunction array and the main query filter so 
that the GroupByExecutor will compute all
+      // the groups (from the result of applying the main query filter) but no 
unnecessary additional aggregation will
+      // be done since the AggregationFunction array is empty. However, if the 
query option to skip empty groups is
+      // enabled, we don't do this in order to avoid unnecessary computation 
of empty groups (which can be very
+      // expensive if the main filter has high selectivity).
       AggregationFunction[] aggregationFunctions = 
nonFilteredFunctions.toArray(new AggregationFunction[0]);
       aggregationInfos.add(
           buildAggregationInfo(segmentContext, queryContext, 
aggregationFunctions, mainFilter, mainFilterOperator,
diff --git 
a/pinot-core/src/test/java/org/apache/pinot/queries/FilteredAggregationsTest.java
 
b/pinot-core/src/test/java/org/apache/pinot/queries/FilteredAggregationsTest.java
index 02a9a05395..e253dce452 100644
--- 
a/pinot-core/src/test/java/org/apache/pinot/queries/FilteredAggregationsTest.java
+++ 
b/pinot-core/src/test/java/org/apache/pinot/queries/FilteredAggregationsTest.java
@@ -41,6 +41,7 @@ import org.apache.pinot.spi.data.FieldSpec;
 import org.apache.pinot.spi.data.Schema;
 import org.apache.pinot.spi.data.readers.GenericRow;
 import org.apache.pinot.spi.data.readers.RecordReader;
+import org.apache.pinot.spi.utils.CommonConstants;
 import org.apache.pinot.spi.utils.builder.TableConfigBuilder;
 import org.testng.annotations.AfterClass;
 import org.testng.annotations.BeforeClass;
@@ -394,9 +395,9 @@ public class FilteredAggregationsTest extends 
BaseQueriesTest {
 
   @Test
   public void testGroupByMultipleColumns() {
-    String filterQuery =
-        "SELECT SUM(INT_COL) FILTER(WHERE INT_COL > 25000) testSum FROM 
MyTable GROUP BY BOOLEAN_COL, STRING_COL "
-            + "ORDER BY BOOLEAN_COL, STRING_COL";
+    String filterQuery = "SET " + 
CommonConstants.Broker.Request.QueryOptionKey.FILTERED_AGGREGATIONS_SKIP_EMPTY_GROUPS
+        + "=true; SELECT SUM(INT_COL) FILTER(WHERE INT_COL > 25000) testSum 
FROM MyTable GROUP BY BOOLEAN_COL, "
+        + "STRING_COL ORDER BY BOOLEAN_COL, STRING_COL";
     String nonFilterQuery =
         "SELECT SUM(INT_COL) testSum FROM MyTable WHERE INT_COL > 25000 GROUP 
BY BOOLEAN_COL, STRING_COL "
             + "ORDER BY BOOLEAN_COL, STRING_COL";
diff --git 
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/MultiStageEngineIntegrationTest.java
 
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/MultiStageEngineIntegrationTest.java
index 48cd23ea7d..b450612d45 100644
--- 
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/MultiStageEngineIntegrationTest.java
+++ 
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/MultiStageEngineIntegrationTest.java
@@ -140,11 +140,6 @@ public class MultiStageEngineIntegrationTest extends 
BaseClusterIntegrationTestS
       throws IOException {
   }
 
-//  @Override
-//  protected boolean useMultiStageQueryEngine() {
-//    return true;
-//  }
-
   @BeforeMethod
   @Override
   public void resetMultiStage() {
@@ -1043,6 +1038,46 @@ public class MultiStageEngineIntegrationTest extends 
BaseClusterIntegrationTestS
     
assertEquals(jsonNode.get("resultTable").get("rows").get(0).get(0).asInt(), 
15482);
   }
 
+  @Test
+  public void 
testFilteredAggregationWithNoValueMatchingAggregationFilterDefault()
+      throws Exception {
+    // Use a hint to ensure that the aggregation will not be pushed to the 
leaf stage, so that we can test the
+    // MultistageGroupByExecutor
+    String sqlQuery = "SELECT /*+ 
aggOptions(is_skip_leaf_stage_group_by='true') */"
+        + "AirlineID, COUNT(*) FILTER (WHERE Origin = 'garbage') FROM mytable 
WHERE AirlineID > 20000 GROUP BY "
+        + "AirlineID";
+    JsonNode result = postQuery(sqlQuery);
+    assertNoError(result);
+    // Ensure that result set is not empty
+    assertTrue(result.get("numRowsResultSet").asInt() > 0);
+
+    // Ensure that the count is 0 for all groups (because the aggregation 
filter does not match any rows)
+    JsonNode rows = result.get("resultTable").get("rows");
+    for (int i = 0; i < rows.size(); i++) {
+      assertEquals(rows.get(i).get(1).asInt(), 0);
+      // Ensure that the main filter was applied
+      assertTrue(rows.get(i).get(0).asInt() > 20000);
+    }
+  }
+
+  @Test
+  public void 
testFilteredAggregationWithNoValueMatchingAggregationFilterWithOption()
+      throws Exception {
+    // Use a hint to ensure that the aggregation will not be pushed to the 
leaf stage, so that we can test the
+    // MultistageGroupByExecutor
+    String sqlQuery = "SET " + 
CommonConstants.Broker.Request.QueryOptionKey.FILTERED_AGGREGATIONS_SKIP_EMPTY_GROUPS
+        + "=true; SELECT /*+ aggOptions(is_skip_leaf_stage_group_by='true') */"
+        + "AirlineID, COUNT(*) FILTER (WHERE Origin = 'garbage') FROM mytable 
WHERE AirlineID > 20000 GROUP BY "
+        + "AirlineID";
+
+    JsonNode result = postQuery(sqlQuery);
+    assertNoError(result);
+
+    // Result set will be empty since the aggregation filter does not match 
any rows, and we've set the query option to
+    // skip empty groups
+    assertEquals(result.get("numRowsResultSet").asInt(), 0);
+  }
+
   @Override
   protected String getTableName() {
     return _tableName;
diff --git 
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineClusterIntegrationTest.java
 
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineClusterIntegrationTest.java
index 25a75352f7..0fa4868179 100644
--- 
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineClusterIntegrationTest.java
+++ 
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineClusterIntegrationTest.java
@@ -3722,4 +3722,44 @@ public class OfflineClusterIntegrationTest extends 
BaseClusterIntegrationTestSet
     updateTableConfig(tableConfig);
     reloadAllSegments(TEST_UPDATED_RANGE_INDEX_QUERY, true, numTotalDocs);
   }
+
+  @Test(dataProvider = "useBothQueryEngines")
+  public void 
testFilteredAggregationWithNoValueMatchingAggregationFilterDefault(boolean 
useMultiStageQueryEngine)
+      throws Exception {
+    setUseMultiStageQueryEngine(useMultiStageQueryEngine);
+
+    String sqlQuery =
+        "SELECT AirlineID, COUNT(*) FILTER (WHERE Origin = 'garbage') FROM 
mytable WHERE AirlineID > 20000 GROUP BY "
+            + "AirlineID";
+
+    JsonNode result = postQuery(sqlQuery);
+    assertNoError(result);
+
+    // Ensure that result set is not empty since all groups should be computed 
by default
+    assertTrue(result.get("numRowsResultSet").asInt() > 0);
+
+    // Ensure that the count is 0 for all groups (because the aggregation 
filter does not match any rows)
+    JsonNode rows = result.get("resultTable").get("rows");
+    for (int i = 0; i < rows.size(); i++) {
+      assertEquals(rows.get(i).get(1).asInt(), 0);
+      // Ensure that the main filter was applied
+      assertTrue(rows.get(i).get(0).asInt() > 20000);
+    }
+  }
+
+  @Test(dataProvider = "useBothQueryEngines")
+  public void 
testFilteredAggregationWithNoValueMatchingAggregationFilterWithOption(boolean 
useMultiStageQueryEngine)
+      throws Exception {
+    setUseMultiStageQueryEngine(useMultiStageQueryEngine);
+    String sqlQuery =
+        "SET " + 
CommonConstants.Broker.Request.QueryOptionKey.FILTERED_AGGREGATIONS_SKIP_EMPTY_GROUPS
 + "=true; "
+            + "SELECT AirlineID, COUNT(*) FILTER (WHERE Origin = 'garbage') 
FROM mytable WHERE AirlineID > 20000 "
+            + "GROUP BY AirlineID";
+    JsonNode result = postQuery(sqlQuery);
+    assertNoError(result);
+
+    // Result set will be empty since the aggregation filter does not match 
any rows, and we've set the option to skip
+    // empty groups
+    assertEquals(result.get("numRowsResultSet").asInt(), 0);
+  }
 }
diff --git 
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MultistageGroupByExecutor.java
 
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MultistageGroupByExecutor.java
index 1f3f3a20fc..41501f6938 100644
--- 
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MultistageGroupByExecutor.java
+++ 
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MultistageGroupByExecutor.java
@@ -58,6 +58,7 @@ public class MultistageGroupByExecutor {
   private final AggType _aggType;
   private final DataSchema _resultSchema;
   private final int _numGroupsLimit;
+  private final boolean _filteredAggregationsSkipEmptyGroups;
 
   // Group By Result holders for each mode
   private final GroupByResultHolder[] _aggregateResultHolders;
@@ -79,6 +80,10 @@ public class MultistageGroupByExecutor {
     int maxInitialResultHolderCapacity = 
getMaxInitialResultHolderCapacity(opChainMetadata, nodeHint);
     _numGroupsLimit = getNumGroupsLimit(opChainMetadata, nodeHint);
 
+    // By default, we compute all groups for SQL compliant results. However, 
we allow overriding this behavior via
+    // query option for improved performance.
+    _filteredAggregationsSkipEmptyGroups = 
QueryOptionsUtils.isFilteredAggregationsSkipEmptyGroups(opChainMetadata);
+
     int numFunctions = aggFunctions.length;
     if (!aggType.isInputIntermediateFormat()) {
       _aggregateResultHolders = new GroupByResultHolder[numFunctions];
@@ -241,6 +246,12 @@ public class MultistageGroupByExecutor {
           aggFunction.aggregateGroupBySV(numMatchedRows, filteredIntKeys, 
groupByResultHolder, blockValSetMap);
         }
       }
+      if (intKeys == null && !_filteredAggregationsSkipEmptyGroups) {
+        // _groupIdGenerator should still have all the groups even if there 
are only filtered aggregates for SQL
+        // compliant results. However, if the query option to skip empty 
groups is set, we avoid this step for
+        // improved performance.
+        generateGroupByKeys(block);
+      }
     }
   }
 
diff --git 
a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java 
b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java
index 8ccc9dbad0..ff81f6bc4e 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java
@@ -463,6 +463,15 @@ public class CommonConstants {
         // executed in an  Unbounded FCFS fashion. However, secondary 
workloads are executed in a constrainted FCFS
         // fashion with limited compute.
         public static final String IS_SECONDARY_WORKLOAD = 
"isSecondaryWorkload";
+
+        // For group by queries with only filtered aggregations (and no 
non-filtered aggregations), the default behavior
+        // is to compute all groups over the rows matching the main query 
filter. This ensures SQL compliant results,
+        // since empty groups are also expected to be returned in such 
queries. However, this could be quite inefficient
+        // if the main query does not have a filter (since a scan would be 
required to compute all groups). In case
+        // users are okay with skipping empty groups - i.e., only the groups 
matching at least one aggregation filter
+        // will be returned - this query option can be set. This is useful for 
performance, since indexes can be used
+        // for the aggregation filters and a full scan can be avoided.
+        public static final String FILTERED_AGGREGATIONS_SKIP_EMPTY_GROUPS = 
"filteredAggregationsSkipEmptyGroups";
       }
 
       public static class QueryOptionValue {


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to