This is an automated email from the ASF dual-hosted git repository.

karan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git


The following commit(s) were added to refs/heads/master by this push:
     new a0d49baad6 MSQ: Fix issue with rollup ingestion and aggregators with 
multiple names. (#14367)
a0d49baad6 is described below

commit a0d49baad6a23f58b6591f78358baf2887339364
Author: Gian Merlino <[email protected]>
AuthorDate: Mon Jun 5 21:58:41 2023 -0700

    MSQ: Fix issue with rollup ingestion and aggregators with multiple names. 
(#14367)
    
    The same aggregator can have two output names for a SQL like:
    
      INSERT INTO foo
      SELECT x, COUNT(*) AS y, COUNT(*) AS z
      FROM t
      GROUP BY 1
      PARTITIONED BY ALL
    
    In this case, the SQL planner will create a query with a single "count"
    aggregator mapped to output names "y" and "z". The prior MSQ code did
    not properly handle this case, instead throwing an error like:
    
      Expected single output for query column[a0] but got [[1, 2]]
---
 .../org/apache/druid/msq/exec/ControllerImpl.java  | 22 ++++++------
 .../org/apache/druid/msq/exec/MSQInsertTest.java   | 40 ++++++++++++++++++++++
 2 files changed, 50 insertions(+), 12 deletions(-)

diff --git 
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java
 
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java
index b39249d811..db7a0f38b0 100644
--- 
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java
+++ 
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java
@@ -1859,18 +1859,16 @@ public class ControllerImpl implements Controller
     if (isRollupQuery) {
       // Populate aggregators from the native query when doing an ingest in 
rollup mode.
       for (AggregatorFactory aggregatorFactory : ((GroupByQuery) 
query).getAggregatorSpecs()) {
-        final int outputColumn = CollectionUtils.getOnlyElement(
-            
columnMappings.getOutputColumnsForQueryColumn(aggregatorFactory.getName()),
-            xs -> new ISE("Expected single output for query column[%s] but 
got[%s]", aggregatorFactory.getName(), xs)
-        );
-        final String outputColumnName = 
columnMappings.getOutputColumnName(outputColumn);
-        if (outputColumnAggregatorFactories.containsKey(outputColumnName)) {
-          throw new ISE("There can only be one aggregation for column [%s].", 
outputColumn);
-        } else {
-          outputColumnAggregatorFactories.put(
-              outputColumnName,
-              
aggregatorFactory.withName(outputColumnName).getCombiningFactory()
-          );
+        for (final int outputColumn : 
columnMappings.getOutputColumnsForQueryColumn(aggregatorFactory.getName())) {
+          final String outputColumnName = 
columnMappings.getOutputColumnName(outputColumn);
+          if (outputColumnAggregatorFactories.containsKey(outputColumnName)) {
+            throw new ISE("There can only be one aggregation for column 
[%s].", outputColumn);
+          } else {
+            outputColumnAggregatorFactories.put(
+                outputColumnName,
+                
aggregatorFactory.withName(outputColumnName).getCombiningFactory()
+            );
+          }
         }
       }
     }
diff --git 
a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQInsertTest.java
 
b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQInsertTest.java
index 479bc4de0d..548e5e0166 100644
--- 
a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQInsertTest.java
+++ 
b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQInsertTest.java
@@ -37,6 +37,7 @@ import org.apache.druid.msq.test.MSQTestBase;
 import org.apache.druid.msq.test.MSQTestFileUtils;
 import org.apache.druid.msq.util.MultiStageQueryContext;
 import org.apache.druid.query.NestedDataTestUtils;
+import org.apache.druid.query.QueryContexts;
 import org.apache.druid.query.aggregation.LongSumAggregatorFactory;
 import 
org.apache.druid.query.aggregation.hyperloglog.HyperUniquesAggregatorFactory;
 import org.apache.druid.segment.column.ColumnType;
@@ -295,6 +296,45 @@ public class MSQInsertTest extends MSQTestBase
 
   }
 
+  @Test
+  public void testInsertOnFoo1WithTwoCountAggregatorsWithRollupContext()
+  {
+    final List<Object[]> expectedRows = expectedFooRows();
+
+    // Add 1L to each expected row, since we have two count aggregators.
+    for (int i = 0; i < expectedRows.size(); i++) {
+      final Object[] expectedRow = expectedRows.get(i);
+      final Object[] newExpectedRow = new Object[expectedRow.length + 1];
+      System.arraycopy(expectedRow, 0, newExpectedRow, 0, expectedRow.length);
+      newExpectedRow[expectedRow.length] = 1L;
+      expectedRows.set(i, newExpectedRow);
+    }
+
+    RowSignature rowSignature = RowSignature.builder()
+                                            .add("__time", ColumnType.LONG)
+                                            .add("dim1", ColumnType.STRING)
+                                            .add("cnt", ColumnType.LONG)
+                                            .add("cnt2", ColumnType.LONG)
+                                            .build();
+
+    testIngestQuery().setSql(
+                         "insert into foo1\n"
+                         + "select  __time, dim1 , count(*) as cnt, count(*) 
as cnt2\n"
+                         + "from foo\n"
+                         + "where dim1 is not null\n"
+                         + "group by 1, 2\n"
+                         + "PARTITIONED by All")
+                     .setExpectedDataSource("foo1")
+                     .setQueryContext(QueryContexts.override(context, 
ROLLUP_CONTEXT_PARAMS))
+                     .setExpectedRowSignature(rowSignature)
+                     .setExpectedSegment(ImmutableSet.of(SegmentId.of("foo1", 
Intervals.ETERNITY, "test", 0)))
+                     .setExpectedResultRows(expectedRows)
+                     .setExpectedRollUp(true)
+                     .addExpectedAggregatorFactory(new 
LongSumAggregatorFactory("cnt", "cnt"))
+                     .addExpectedAggregatorFactory(new 
LongSumAggregatorFactory("cnt2", "cnt2"))
+                     .verifyResults();
+  }
+
   @Test
   public void testInsertOnFoo1WithGroupByLimitWithClusterBy()
   {


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to