This is an automated email from the ASF dual-hosted git repository.
jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new 76b219b6d6 Compute all groups for group by queries with only filtered
aggregations (#14211)
76b219b6d6 is described below
commit 76b219b6d628d0dbb8904ff86841a01378f30abd
Author: Yash Mayya <[email protected]>
AuthorDate: Fri Oct 18 05:28:03 2024 +0530
Compute all groups for group by queries with only filtered aggregations
(#14211)
---
.../common/utils/config/QueryOptionsUtils.java | 4 ++
.../operator/query/FilteredGroupByOperator.java | 41 ++++++++------------
.../function/AggregationFunctionUtils.java | 10 ++++-
.../pinot/queries/FilteredAggregationsTest.java | 7 ++--
.../tests/MultiStageEngineIntegrationTest.java | 45 +++++++++++++++++++---
.../tests/OfflineClusterIntegrationTest.java | 40 +++++++++++++++++++
.../operator/MultistageGroupByExecutor.java | 11 ++++++
.../apache/pinot/spi/utils/CommonConstants.java | 9 +++++
8 files changed, 133 insertions(+), 34 deletions(-)
diff --git
a/pinot-common/src/main/java/org/apache/pinot/common/utils/config/QueryOptionsUtils.java
b/pinot-common/src/main/java/org/apache/pinot/common/utils/config/QueryOptionsUtils.java
index 12ad735865..0ec3374aea 100644
---
a/pinot-common/src/main/java/org/apache/pinot/common/utils/config/QueryOptionsUtils.java
+++
b/pinot-common/src/main/java/org/apache/pinot/common/utils/config/QueryOptionsUtils.java
@@ -234,6 +234,10 @@ public class QueryOptionsUtils {
return
Boolean.parseBoolean(queryOptions.get(QueryOptionKey.SERVER_RETURN_FINAL_RESULT_KEY_UNPARTITIONED));
}
+ public static boolean isFilteredAggregationsSkipEmptyGroups(Map<String,
String> queryOptions) {
+ return
Boolean.parseBoolean(queryOptions.get(QueryOptionKey.FILTERED_AGGREGATIONS_SKIP_EMPTY_GROUPS));
+ }
+
@Nullable
public static String getOrderByAlgorithm(Map<String, String> queryOptions) {
return queryOptions.get(QueryOptionKey.ORDER_BY_ALGORITHM);
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/operator/query/FilteredGroupByOperator.java
b/pinot-core/src/main/java/org/apache/pinot/core/operator/query/FilteredGroupByOperator.java
index 97ce0e4f6b..5c82827cb4 100644
---
a/pinot-core/src/main/java/org/apache/pinot/core/operator/query/FilteredGroupByOperator.java
+++
b/pinot-core/src/main/java/org/apache/pinot/core/operator/query/FilteredGroupByOperator.java
@@ -124,33 +124,24 @@ public class FilteredGroupByOperator extends
BaseOperator<GroupByResultsBlock> {
// Perform aggregation group-by on all the blocks
DefaultGroupByExecutor groupByExecutor;
- if (groupKeyGenerator == null) {
- // The group key generator should be shared across all
AggregationFunctions so that agg results can be
- // aligned. Given that filtered aggregations are stored as an iterable
of iterables so that all filtered aggs
- // with the same filter can share transform blocks, rather than a
singular flat iterable in the case where
- // aggs are all non-filtered, sharing a GroupKeyGenerator across all
aggs cannot be accomplished by allowing
- // the GroupByExecutor to have sole ownership of the
GroupKeyGenerator. Therefore, we allow constructing a
- // GroupByExecutor with a pre-existing GroupKeyGenerator so that the
GroupKeyGenerator can be shared across
- // loop iterations i.e. across all aggs.
- if (aggregationInfo.isUseStarTree()) {
- groupByExecutor =
- new StarTreeGroupByExecutor(_queryContext, aggregationFunctions,
_groupByExpressions, projectOperator);
- } else {
- groupByExecutor =
- new DefaultGroupByExecutor(_queryContext, aggregationFunctions,
_groupByExpressions, projectOperator);
- }
- groupKeyGenerator = groupByExecutor.getGroupKeyGenerator();
+
+ if (aggregationInfo.isUseStarTree()) {
+ groupByExecutor =
+ new StarTreeGroupByExecutor(_queryContext, aggregationFunctions,
_groupByExpressions, projectOperator,
+ groupKeyGenerator);
} else {
- if (aggregationInfo.isUseStarTree()) {
- groupByExecutor =
- new StarTreeGroupByExecutor(_queryContext, aggregationFunctions,
_groupByExpressions, projectOperator,
- groupKeyGenerator);
- } else {
- groupByExecutor =
- new DefaultGroupByExecutor(_queryContext, aggregationFunctions,
_groupByExpressions, projectOperator,
- groupKeyGenerator);
- }
+ groupByExecutor =
+ new DefaultGroupByExecutor(_queryContext, aggregationFunctions,
_groupByExpressions, projectOperator,
+ groupKeyGenerator);
}
+ // The group key generator should be shared across all
AggregationFunctions so that agg results can be
+ // aligned. Given that filtered aggregations are stored as an iterable
of iterables so that all filtered aggs
+ // with the same filter can share transform blocks, rather than a
singular flat iterable in the case where
+ // aggs are all non-filtered, sharing a GroupKeyGenerator across all
aggs cannot be accomplished by allowing
+ // the GroupByExecutor to have sole ownership of the GroupKeyGenerator.
Therefore, we allow constructing a
+ // GroupByExecutor with a pre-existing GroupKeyGenerator so that the
GroupKeyGenerator can be shared across
+ // loop iterations i.e. across all aggs.
+ groupKeyGenerator = groupByExecutor.getGroupKeyGenerator();
int numDocsScanned = 0;
ValueBlock valueBlock;
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/AggregationFunctionUtils.java
b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/AggregationFunctionUtils.java
index 95020d0a34..962fc74259 100644
---
a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/AggregationFunctionUtils.java
+++
b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/AggregationFunctionUtils.java
@@ -39,6 +39,7 @@ import
org.apache.pinot.common.request.context.ExpressionContext;
import org.apache.pinot.common.request.context.FilterContext;
import org.apache.pinot.common.request.context.predicate.Predicate;
import org.apache.pinot.common.utils.DataSchema.ColumnDataType;
+import org.apache.pinot.common.utils.config.QueryOptionsUtils;
import org.apache.pinot.core.common.BlockValSet;
import org.apache.pinot.core.common.ObjectSerDeUtils;
import org.apache.pinot.core.operator.BaseProjectOperator;
@@ -384,7 +385,14 @@ public class AggregationFunctionUtils {
}
}
- if (!nonFilteredFunctions.isEmpty()) {
+ if (!nonFilteredFunctions.isEmpty() ||
((queryContext.getGroupByExpressions() != null)
+ &&
!QueryOptionsUtils.isFilteredAggregationsSkipEmptyGroups(queryContext.getQueryOptions())))
{
+ // If there are no non-filtered aggregation functions for a group by
query, we still add a new AggregationInfo
+ // with an empty AggregationFunction array and the main query filter so
that the GroupByExecutor will compute all
+ // the groups (from the result of applying the main query filter) but no
unnecessary additional aggregation will
+ // be done since the AggregationFunction array is empty. However, if the
query option to skip empty groups is
+ // enabled, we don't do this in order to avoid unnecessary computation
of empty groups (which can be very
+ // expensive if the main filter has high selectivity).
AggregationFunction[] aggregationFunctions =
nonFilteredFunctions.toArray(new AggregationFunction[0]);
aggregationInfos.add(
buildAggregationInfo(segmentContext, queryContext,
aggregationFunctions, mainFilter, mainFilterOperator,
diff --git
a/pinot-core/src/test/java/org/apache/pinot/queries/FilteredAggregationsTest.java
b/pinot-core/src/test/java/org/apache/pinot/queries/FilteredAggregationsTest.java
index 02a9a05395..e253dce452 100644
---
a/pinot-core/src/test/java/org/apache/pinot/queries/FilteredAggregationsTest.java
+++
b/pinot-core/src/test/java/org/apache/pinot/queries/FilteredAggregationsTest.java
@@ -41,6 +41,7 @@ import org.apache.pinot.spi.data.FieldSpec;
import org.apache.pinot.spi.data.Schema;
import org.apache.pinot.spi.data.readers.GenericRow;
import org.apache.pinot.spi.data.readers.RecordReader;
+import org.apache.pinot.spi.utils.CommonConstants;
import org.apache.pinot.spi.utils.builder.TableConfigBuilder;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
@@ -394,9 +395,9 @@ public class FilteredAggregationsTest extends
BaseQueriesTest {
@Test
public void testGroupByMultipleColumns() {
- String filterQuery =
- "SELECT SUM(INT_COL) FILTER(WHERE INT_COL > 25000) testSum FROM
MyTable GROUP BY BOOLEAN_COL, STRING_COL "
- + "ORDER BY BOOLEAN_COL, STRING_COL";
+ String filterQuery = "SET " +
CommonConstants.Broker.Request.QueryOptionKey.FILTERED_AGGREGATIONS_SKIP_EMPTY_GROUPS
+ + "=true; SELECT SUM(INT_COL) FILTER(WHERE INT_COL > 25000) testSum
FROM MyTable GROUP BY BOOLEAN_COL, "
+ + "STRING_COL ORDER BY BOOLEAN_COL, STRING_COL";
String nonFilterQuery =
"SELECT SUM(INT_COL) testSum FROM MyTable WHERE INT_COL > 25000 GROUP
BY BOOLEAN_COL, STRING_COL "
+ "ORDER BY BOOLEAN_COL, STRING_COL";
diff --git
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/MultiStageEngineIntegrationTest.java
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/MultiStageEngineIntegrationTest.java
index 48cd23ea7d..b450612d45 100644
---
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/MultiStageEngineIntegrationTest.java
+++
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/MultiStageEngineIntegrationTest.java
@@ -140,11 +140,6 @@ public class MultiStageEngineIntegrationTest extends
BaseClusterIntegrationTestS
throws IOException {
}
-// @Override
-// protected boolean useMultiStageQueryEngine() {
-// return true;
-// }
-
@BeforeMethod
@Override
public void resetMultiStage() {
@@ -1043,6 +1038,46 @@ public class MultiStageEngineIntegrationTest extends
BaseClusterIntegrationTestS
assertEquals(jsonNode.get("resultTable").get("rows").get(0).get(0).asInt(),
15482);
}
+ @Test
+ public void
testFilteredAggregationWithNoValueMatchingAggregationFilterDefault()
+ throws Exception {
+ // Use a hint to ensure that the aggregation will not be pushed to the
leaf stage, so that we can test the
+ // MultistageGroupByExecutor
+ String sqlQuery = "SELECT /*+
aggOptions(is_skip_leaf_stage_group_by='true') */"
+ + "AirlineID, COUNT(*) FILTER (WHERE Origin = 'garbage') FROM mytable
WHERE AirlineID > 20000 GROUP BY "
+ + "AirlineID";
+ JsonNode result = postQuery(sqlQuery);
+ assertNoError(result);
+ // Ensure that result set is not empty
+ assertTrue(result.get("numRowsResultSet").asInt() > 0);
+
+ // Ensure that the count is 0 for all groups (because the aggregation
filter does not match any rows)
+ JsonNode rows = result.get("resultTable").get("rows");
+ for (int i = 0; i < rows.size(); i++) {
+ assertEquals(rows.get(i).get(1).asInt(), 0);
+ // Ensure that the main filter was applied
+ assertTrue(rows.get(i).get(0).asInt() > 20000);
+ }
+ }
+
+ @Test
+ public void
testFilteredAggregationWithNoValueMatchingAggregationFilterWithOption()
+ throws Exception {
+ // Use a hint to ensure that the aggregation will not be pushed to the
leaf stage, so that we can test the
+ // MultistageGroupByExecutor
+ String sqlQuery = "SET " +
CommonConstants.Broker.Request.QueryOptionKey.FILTERED_AGGREGATIONS_SKIP_EMPTY_GROUPS
+ + "=true; SELECT /*+ aggOptions(is_skip_leaf_stage_group_by='true') */"
+ + "AirlineID, COUNT(*) FILTER (WHERE Origin = 'garbage') FROM mytable
WHERE AirlineID > 20000 GROUP BY "
+ + "AirlineID";
+
+ JsonNode result = postQuery(sqlQuery);
+ assertNoError(result);
+
+ // Result set will be empty since the aggregation filter does not match
any rows, and we've set the query option to
+ // skip empty groups
+ assertEquals(result.get("numRowsResultSet").asInt(), 0);
+ }
+
@Override
protected String getTableName() {
return _tableName;
diff --git
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineClusterIntegrationTest.java
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineClusterIntegrationTest.java
index 25a75352f7..0fa4868179 100644
---
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineClusterIntegrationTest.java
+++
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineClusterIntegrationTest.java
@@ -3722,4 +3722,44 @@ public class OfflineClusterIntegrationTest extends
BaseClusterIntegrationTestSet
updateTableConfig(tableConfig);
reloadAllSegments(TEST_UPDATED_RANGE_INDEX_QUERY, true, numTotalDocs);
}
+
+ @Test(dataProvider = "useBothQueryEngines")
+ public void
testFilteredAggregationWithNoValueMatchingAggregationFilterDefault(boolean
useMultiStageQueryEngine)
+ throws Exception {
+ setUseMultiStageQueryEngine(useMultiStageQueryEngine);
+
+ String sqlQuery =
+ "SELECT AirlineID, COUNT(*) FILTER (WHERE Origin = 'garbage') FROM
mytable WHERE AirlineID > 20000 GROUP BY "
+ + "AirlineID";
+
+ JsonNode result = postQuery(sqlQuery);
+ assertNoError(result);
+
+ // Ensure that result set is not empty since all groups should be computed
by default
+ assertTrue(result.get("numRowsResultSet").asInt() > 0);
+
+ // Ensure that the count is 0 for all groups (because the aggregation
filter does not match any rows)
+ JsonNode rows = result.get("resultTable").get("rows");
+ for (int i = 0; i < rows.size(); i++) {
+ assertEquals(rows.get(i).get(1).asInt(), 0);
+ // Ensure that the main filter was applied
+ assertTrue(rows.get(i).get(0).asInt() > 20000);
+ }
+ }
+
+ @Test(dataProvider = "useBothQueryEngines")
+ public void
testFilteredAggregationWithNoValueMatchingAggregationFilterWithOption(boolean
useMultiStageQueryEngine)
+ throws Exception {
+ setUseMultiStageQueryEngine(useMultiStageQueryEngine);
+ String sqlQuery =
+ "SET " +
CommonConstants.Broker.Request.QueryOptionKey.FILTERED_AGGREGATIONS_SKIP_EMPTY_GROUPS
+ "=true; "
+ + "SELECT AirlineID, COUNT(*) FILTER (WHERE Origin = 'garbage')
FROM mytable WHERE AirlineID > 20000 "
+ + "GROUP BY AirlineID";
+ JsonNode result = postQuery(sqlQuery);
+ assertNoError(result);
+
+ // Result set will be empty since the aggregation filter does not match
any rows, and we've set the option to skip
+ // empty groups
+ assertEquals(result.get("numRowsResultSet").asInt(), 0);
+ }
}
diff --git
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MultistageGroupByExecutor.java
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MultistageGroupByExecutor.java
index 1f3f3a20fc..41501f6938 100644
---
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MultistageGroupByExecutor.java
+++
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MultistageGroupByExecutor.java
@@ -58,6 +58,7 @@ public class MultistageGroupByExecutor {
private final AggType _aggType;
private final DataSchema _resultSchema;
private final int _numGroupsLimit;
+ private final boolean _filteredAggregationsSkipEmptyGroups;
// Group By Result holders for each mode
private final GroupByResultHolder[] _aggregateResultHolders;
@@ -79,6 +80,10 @@ public class MultistageGroupByExecutor {
int maxInitialResultHolderCapacity =
getMaxInitialResultHolderCapacity(opChainMetadata, nodeHint);
_numGroupsLimit = getNumGroupsLimit(opChainMetadata, nodeHint);
+ // By default, we compute all groups for SQL compliant results. However,
we allow overriding this behavior via
+ // query option for improved performance.
+ _filteredAggregationsSkipEmptyGroups =
QueryOptionsUtils.isFilteredAggregationsSkipEmptyGroups(opChainMetadata);
+
int numFunctions = aggFunctions.length;
if (!aggType.isInputIntermediateFormat()) {
_aggregateResultHolders = new GroupByResultHolder[numFunctions];
@@ -241,6 +246,12 @@ public class MultistageGroupByExecutor {
aggFunction.aggregateGroupBySV(numMatchedRows, filteredIntKeys,
groupByResultHolder, blockValSetMap);
}
}
+ if (intKeys == null && !_filteredAggregationsSkipEmptyGroups) {
+ // _groupIdGenerator should still have all the groups even if there
are only filtered aggregates for SQL
+ // compliant results. However, if the query option to skip empty
groups is set, we avoid this step for
+ // improved performance.
+ generateGroupByKeys(block);
+ }
}
}
diff --git
a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java
b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java
index 8ccc9dbad0..ff81f6bc4e 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java
@@ -463,6 +463,15 @@ public class CommonConstants {
// executed in an Unbounded FCFS fashion. However, secondary
workloads are executed in a constrained FCFS
// fashion with limited compute.
public static final String IS_SECONDARY_WORKLOAD =
"isSecondaryWorkload";
+
+ // For group by queries with only filtered aggregations (and no
non-filtered aggregations), the default behavior
+ // is to compute all groups over the rows matching the main query
filter. This ensures SQL compliant results,
+ // since empty groups are also expected to be returned in such
queries. However, this could be quite inefficient
+ // if the main query does not have a filter (since a scan would be
required to compute all groups). In case
+ // users are okay with skipping empty groups - i.e., only the groups
matching at least one aggregation filter
+ // will be returned - this query option can be set. This is useful for
performance, since indexes can be used
+ // for the aggregation filters and a full scan can be avoided.
+ public static final String FILTERED_AGGREGATIONS_SKIP_EMPTY_GROUPS =
"filteredAggregationsSkipEmptyGroups";
}
public static class QueryOptionValue {
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]