This is an automated email from the ASF dual-hosted git repository.
karan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git
The following commit(s) were added to refs/heads/master by this push:
new a0d49baad6 MSQ: Fix issue with rollup ingestion and aggregators with
multiple names. (#14367)
a0d49baad6 is described below
commit a0d49baad6a23f58b6591f78358baf2887339364
Author: Gian Merlino <[email protected]>
AuthorDate: Mon Jun 5 21:58:41 2023 -0700
MSQ: Fix issue with rollup ingestion and aggregators with multiple names.
(#14367)
The same aggregator can have two output names for a SQL statement like:
INSERT INTO foo
SELECT x, COUNT(*) AS y, COUNT(*) AS z
FROM t
GROUP BY 1
PARTITIONED BY ALL
In this case, the SQL planner will create a query with a single "count"
aggregator mapped to output names "y" and "z". The prior MSQ code did
not properly handle this case, instead throwing an error like:
Expected single output for query column[a0] but got [[1, 2]]
---
.../org/apache/druid/msq/exec/ControllerImpl.java | 22 ++++++------
.../org/apache/druid/msq/exec/MSQInsertTest.java | 40 ++++++++++++++++++++++
2 files changed, 50 insertions(+), 12 deletions(-)
diff --git
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java
index b39249d811..db7a0f38b0 100644
---
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java
+++
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java
@@ -1859,18 +1859,16 @@ public class ControllerImpl implements Controller
if (isRollupQuery) {
// Populate aggregators from the native query when doing an ingest in
rollup mode.
for (AggregatorFactory aggregatorFactory : ((GroupByQuery)
query).getAggregatorSpecs()) {
- final int outputColumn = CollectionUtils.getOnlyElement(
-
columnMappings.getOutputColumnsForQueryColumn(aggregatorFactory.getName()),
- xs -> new ISE("Expected single output for query column[%s] but
got[%s]", aggregatorFactory.getName(), xs)
- );
- final String outputColumnName =
columnMappings.getOutputColumnName(outputColumn);
- if (outputColumnAggregatorFactories.containsKey(outputColumnName)) {
- throw new ISE("There can only be one aggregation for column [%s].",
outputColumn);
- } else {
- outputColumnAggregatorFactories.put(
- outputColumnName,
-
aggregatorFactory.withName(outputColumnName).getCombiningFactory()
- );
+ for (final int outputColumn :
columnMappings.getOutputColumnsForQueryColumn(aggregatorFactory.getName())) {
+ final String outputColumnName =
columnMappings.getOutputColumnName(outputColumn);
+ if (outputColumnAggregatorFactories.containsKey(outputColumnName)) {
+ throw new ISE("There can only be one aggregation for column
[%s].", outputColumn);
+ } else {
+ outputColumnAggregatorFactories.put(
+ outputColumnName,
+
aggregatorFactory.withName(outputColumnName).getCombiningFactory()
+ );
+ }
}
}
}
diff --git
a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQInsertTest.java
b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQInsertTest.java
index 479bc4de0d..548e5e0166 100644
---
a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQInsertTest.java
+++
b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQInsertTest.java
@@ -37,6 +37,7 @@ import org.apache.druid.msq.test.MSQTestBase;
import org.apache.druid.msq.test.MSQTestFileUtils;
import org.apache.druid.msq.util.MultiStageQueryContext;
import org.apache.druid.query.NestedDataTestUtils;
+import org.apache.druid.query.QueryContexts;
import org.apache.druid.query.aggregation.LongSumAggregatorFactory;
import
org.apache.druid.query.aggregation.hyperloglog.HyperUniquesAggregatorFactory;
import org.apache.druid.segment.column.ColumnType;
@@ -295,6 +296,45 @@ public class MSQInsertTest extends MSQTestBase
}
+ @Test
+ public void testInsertOnFoo1WithTwoCountAggregatorsWithRollupContext()
+ {
+ final List<Object[]> expectedRows = expectedFooRows();
+
+ // Add 1L to each expected row, since we have two count aggregators.
+ for (int i = 0; i < expectedRows.size(); i++) {
+ final Object[] expectedRow = expectedRows.get(i);
+ final Object[] newExpectedRow = new Object[expectedRow.length + 1];
+ System.arraycopy(expectedRow, 0, newExpectedRow, 0, expectedRow.length);
+ newExpectedRow[expectedRow.length] = 1L;
+ expectedRows.set(i, newExpectedRow);
+ }
+
+ RowSignature rowSignature = RowSignature.builder()
+ .add("__time", ColumnType.LONG)
+ .add("dim1", ColumnType.STRING)
+ .add("cnt", ColumnType.LONG)
+ .add("cnt2", ColumnType.LONG)
+ .build();
+
+ testIngestQuery().setSql(
+ "insert into foo1\n"
+ + "select __time, dim1 , count(*) as cnt, count(*)
as cnt2\n"
+ + "from foo\n"
+ + "where dim1 is not null\n"
+ + "group by 1, 2\n"
+ + "PARTITIONED by All")
+ .setExpectedDataSource("foo1")
+ .setQueryContext(QueryContexts.override(context,
ROLLUP_CONTEXT_PARAMS))
+ .setExpectedRowSignature(rowSignature)
+ .setExpectedSegment(ImmutableSet.of(SegmentId.of("foo1",
Intervals.ETERNITY, "test", 0)))
+ .setExpectedResultRows(expectedRows)
+ .setExpectedRollUp(true)
+ .addExpectedAggregatorFactory(new
LongSumAggregatorFactory("cnt", "cnt"))
+ .addExpectedAggregatorFactory(new
LongSumAggregatorFactory("cnt2", "cnt2"))
+ .verifyResults();
+ }
+
@Test
public void testInsertOnFoo1WithGroupByLimitWithClusterBy()
{
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]