This is an automated email from the ASF dual-hosted git repository.

snuyanzin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 9d2daa88ebc [FLINK-38776][table] Fix incorrect auxiliary group field names
9d2daa88ebc is described below

commit 9d2daa88ebc569f2a11c39874f4d0a323cdd75ce
Author: dylanhz <[email protected]>
AuthorDate: Tue Dec 16 04:55:31 2025 +0800

    [FLINK-38776][table] Fix incorrect auxiliary group field names
---
 .../batch/BatchPhysicalWindowAggregateRule.scala   |  2 +-
 .../table/planner/plan/utils/RelExplainUtil.scala  |  2 +-
 .../batch/sql/agg/AggregateReduceGroupingTest.xml  | 35 ++++++++++++++++++----
 .../logical/AggregateReduceGroupingRuleTest.xml    | 21 +++++++++++++
 .../common/AggregateReduceGroupingTestBase.scala   |  8 +++++
 5 files changed, 61 insertions(+), 7 deletions(-)

diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/batch/BatchPhysicalWindowAggregateRule.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/batch/BatchPhysicalWindowAggregateRule.scala
index 4b23f840431..cbecfa93b06 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/batch/BatchPhysicalWindowAggregateRule.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/batch/BatchPhysicalWindowAggregateRule.scala
@@ -397,7 +397,7 @@ class BatchPhysicalWindowAggregateRule
       case (udf, aggIndex) =>
         aggBufferFieldNames(aggIndex) = udf match {
           case _: AggregateFunction[_, _] =>
-            Array(aggNames(aggIndex))
+            Array(aggNames(aggIndex + auxGroupSet.length))
           case agf: DeclarativeAggregateFunction =>
             agf.aggBufferAttributes.map {
               attr =>
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/utils/RelExplainUtil.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/utils/RelExplainUtil.scala
index 970ed8fae50..90aed57acce 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/utils/RelExplainUtil.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/utils/RelExplainUtil.scala
@@ -825,7 +825,7 @@ object RelExplainUtil {
 
     val inNames = grouping.map(inFields(_)) ++ auxGrouping.map(inFields(_)) ++ aggStrings
     val outNames = grouping.indices.map(outFields(_)) ++
-      (grouping.length + 1 until grouping.length + 1 + auxGrouping.length).map(outFields(_)) ++
+      (grouping.length until grouping.length + auxGrouping.length).map(outFields(_)) ++
       outFieldNames
     inNames
       .zip(outNames)
diff --git a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/agg/AggregateReduceGroupingTest.xml b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/agg/AggregateReduceGroupingTest.xml
index 8cc7192a86d..ffbce7d6679 100644
--- a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/agg/AggregateReduceGroupingTest.xml
+++ b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/agg/AggregateReduceGroupingTest.xml
@@ -381,7 +381,7 @@ LogicalProject(a4=[$0], b4=[$1], EXPR$2=[$3])
     </Resource>
     <Resource name="optimized exec plan">
       <![CDATA[
-HashWindowAggregate(groupBy=[a4], auxGrouping=[b4], 
window=[TumblingGroupWindow('w$, d4, 900000)], select=[a4, b4 AS EXPR$2, 
COUNT(c4) AS EXPR$2])
+HashWindowAggregate(groupBy=[a4], auxGrouping=[b4], 
window=[TumblingGroupWindow('w$, d4, 900000)], select=[a4, b4, COUNT(c4) AS 
EXPR$2])
 +- Exchange(distribution=[hash[a4]])
    +- LegacyTableSourceScan(table=[[default_catalog, default_database, T4, 
source: [TestTableSource(a4, b4, c4, d4)]]], fields=[a4, b4, c4, d4])
 ]]>
@@ -401,7 +401,7 @@ LogicalProject(a4=[$0], c4=[$1], EXPR$2=[$3], EXPR$3=[$4])
     </Resource>
     <Resource name="optimized exec plan">
       <![CDATA[
-HashWindowAggregate(groupBy=[a4], auxGrouping=[c4], 
window=[TumblingGroupWindow('w$, d4, 900000)], select=[a4, c4 AS EXPR$2, 
COUNT(b4) AS EXPR$2, AVG(b4) AS EXPR$3])
+HashWindowAggregate(groupBy=[a4], auxGrouping=[c4], 
window=[TumblingGroupWindow('w$, d4, 900000)], select=[a4, c4, COUNT(b4) AS 
EXPR$2, AVG(b4) AS EXPR$3])
 +- Exchange(distribution=[hash[a4]])
    +- LegacyTableSourceScan(table=[[default_catalog, default_database, T4, 
source: [TestTableSource(a4, b4, c4, d4)]]], fields=[a4, b4, c4, d4])
 ]]>
@@ -428,7 +428,7 @@ Calc(select=[a4, c4, s, EXPR$3])
    +- Exchange(distribution=[hash[a4, s]])
       +- LocalHashAggregate(groupBy=[a4, s], auxGrouping=[c4], select=[a4, s, 
c4, Partial_COUNT(b4) AS count$0])
          +- Calc(select=[a4, c4, w$start AS s, CAST((($f2 - (($f3 * $f3) / 
$f4)) / $f4) AS INTEGER) AS b4])
-            +- HashWindowAggregate(groupBy=[a4], auxGrouping=[c4], 
window=[TumblingGroupWindow('w$, d4, 900000)], properties=[w$start, w$end, 
w$rowtime], select=[a4, c4 AS $f2, SUM($f4) AS $f2, SUM(b4) AS $f3, COUNT(b4) 
AS $f4])
+            +- HashWindowAggregate(groupBy=[a4], auxGrouping=[c4], 
window=[TumblingGroupWindow('w$, d4, 900000)], properties=[w$start, w$end, 
w$rowtime], select=[a4, c4, SUM($f4) AS $f2, SUM(b4) AS $f3, COUNT(b4) AS $f4])
                +- Exchange(distribution=[keep_input_as_is[hash[a4]]])
                   +- Calc(select=[a4, c4, d4, b4, (b4 * b4) AS $f4])
                      +- Exchange(distribution=[hash[a4]])
@@ -457,7 +457,7 @@ Calc(select=[a4, c4, e, EXPR$3])
    +- Exchange(distribution=[hash[a4, e]])
       +- LocalHashAggregate(groupBy=[a4, e], auxGrouping=[c4], select=[a4, e, 
c4, Partial_COUNT(b4) AS count$0])
          +- Calc(select=[a4, c4, w$end AS e, CAST((($f2 - (($f3 * $f3) / $f4)) 
/ $f4) AS INTEGER) AS b4])
-            +- HashWindowAggregate(groupBy=[a4], auxGrouping=[c4], 
window=[TumblingGroupWindow('w$, d4, 900000)], properties=[w$start, w$end, 
w$rowtime], select=[a4, c4 AS $f2, SUM($f4) AS $f2, SUM(b4) AS $f3, COUNT(b4) 
AS $f4])
+            +- HashWindowAggregate(groupBy=[a4], auxGrouping=[c4], 
window=[TumblingGroupWindow('w$, d4, 900000)], properties=[w$start, w$end, 
w$rowtime], select=[a4, c4, SUM($f4) AS $f2, SUM(b4) AS $f3, COUNT(b4) AS $f4])
                +- Exchange(distribution=[keep_input_as_is[hash[a4]]])
                   +- Calc(select=[a4, c4, d4, b4, (b4 * b4) AS $f4])
                      +- Exchange(distribution=[hash[a4]])
@@ -485,7 +485,7 @@ HashAggregate(isMerge=[true], groupBy=[a4, b4], 
auxGrouping=[c4], select=[a4, b4
 +- Exchange(distribution=[hash[a4, b4]])
    +- LocalHashAggregate(groupBy=[a4, b4], auxGrouping=[c4], select=[a4, b4, 
c4, Partial_COUNT(*) AS count1$0])
       +- Calc(select=[a4, CAST((($f2 - (($f3 * $f3) / $f4)) / $f4) AS INTEGER) 
AS b4, c4])
-         +- HashWindowAggregate(groupBy=[a4], auxGrouping=[c4], 
window=[TumblingGroupWindow('w$, d4, 900000)], properties=[w$start, w$end, 
w$rowtime], select=[a4, c4 AS $f2, SUM($f4) AS $f2, SUM(b4) AS $f3, COUNT(b4) 
AS $f4])
+         +- HashWindowAggregate(groupBy=[a4], auxGrouping=[c4], 
window=[TumblingGroupWindow('w$, d4, 900000)], properties=[w$start, w$end, 
w$rowtime], select=[a4, c4, SUM($f4) AS $f2, SUM(b4) AS $f3, COUNT(b4) AS $f4])
             +- Exchange(distribution=[keep_input_as_is[hash[a4]]])
                +- Calc(select=[a4, c4, d4, b4, (b4 * b4) AS $f4])
                   +- Exchange(distribution=[hash[a4]])
@@ -635,6 +635,31 @@ HashAggregate(isMerge=[true], groupBy=[a3, b3], 
select=[a3, b3, Final_COUNT(coun
    +- LocalHashAggregate(groupBy=[a3, b3], select=[a3, b3, Partial_COUNT(c3) 
AS count$0])
       +- Calc(select=[a3, b3, c3])
          +- LegacyTableSourceScan(table=[[default_catalog, default_database, 
T3, source: [TestTableSource(a3, b3, c3, d3)]]], fields=[a3, b3, c3, d3])
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="testImperativeAggWithAuxiliaryGrouping">
+    <Resource name="sql">
+      <![CDATA[SELECT a4, c4, COUNT(b4) FROM (SELECT a4, c4, ARRAY_AGG(b4) AS 
b4 FROM T4 GROUP BY a4, c4, TUMBLE(d4, INTERVAL '15' MINUTE)) t GROUP BY a4, 
c4]]>
+    </Resource>
+    <Resource name="ast">
+      <![CDATA[
+LogicalAggregate(group=[{0, 1}], EXPR$2=[COUNT($2)])
++- LogicalProject(a4=[$0], c4=[$1], b4=[$3])
+   +- LogicalAggregate(group=[{0, 1, 2}], b4=[ARRAY_AGG($3)])
+      +- LogicalProject(a4=[$0], c4=[$2], $f2=[$TUMBLE($3, 900000:INTERVAL 
MINUTE)], b4=[$1])
+         +- LogicalTableScan(table=[[default_catalog, default_database, T4, 
source: [TestTableSource(a4, b4, c4, d4)]]])
+]]>
+    </Resource>
+    <Resource name="optimized exec plan">
+      <![CDATA[
+HashAggregate(isMerge=[false], groupBy=[a4], auxGrouping=[c4], select=[a4, c4, 
COUNT(b4) AS EXPR$2])
++- Exchange(distribution=[hash[a4]])
+   +- SortWindowAggregate(groupBy=[a4], auxGrouping=[c4], 
window=[TumblingGroupWindow('w$, d4, 900000)], select=[a4, c4, ARRAY_AGG(b4) AS 
b4])
+      +- Exchange(distribution=[forward])
+         +- Sort(orderBy=[a4 ASC, d4 ASC])
+            +- Exchange(distribution=[hash[a4]])
+               +- LegacyTableSourceScan(table=[[default_catalog, 
default_database, T4, source: [TestTableSource(a4, b4, c4, d4)]]], fields=[a4, 
b4, c4, d4])
 ]]>
     </Resource>
   </TestCase>
diff --git a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/rules/logical/AggregateReduceGroupingRuleTest.xml b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/rules/logical/AggregateReduceGroupingRuleTest.xml
index e65c28bdba1..77f9b7e23f1 100644
--- a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/rules/logical/AggregateReduceGroupingRuleTest.xml
+++ b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/rules/logical/AggregateReduceGroupingRuleTest.xml
@@ -572,6 +572,27 @@ LogicalAggregate(group=[{0, 1}], EXPR$2=[COUNT($2)])
 FlinkLogicalAggregate(group=[{0, 1}], EXPR$2=[COUNT($2)])
 +- FlinkLogicalCalc(select=[a3, b3, c3])
    +- FlinkLogicalLegacyTableSourceScan(table=[[default_catalog, 
default_database, T3, source: [TestTableSource(a3, b3, c3, d3)]]], fields=[a3, 
b3, c3, d3])
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="testImperativeAggWithAuxiliaryGrouping">
+    <Resource name="sql">
+      <![CDATA[SELECT a4, c4, COUNT(b4) FROM (SELECT a4, c4, ARRAY_AGG(b4) AS 
b4 FROM T4 GROUP BY a4, c4, TUMBLE(d4, INTERVAL '15' MINUTE)) t GROUP BY a4, 
c4]]>
+    </Resource>
+    <Resource name="ast">
+      <![CDATA[
+LogicalAggregate(group=[{0, 1}], EXPR$2=[COUNT($2)])
++- LogicalProject(a4=[$0], c4=[$1], b4=[$3])
+   +- LogicalAggregate(group=[{0, 1, 2}], b4=[ARRAY_AGG($3)])
+      +- LogicalProject(a4=[$0], c4=[$2], $f2=[$TUMBLE($3, 900000:INTERVAL 
MINUTE)], b4=[$1])
+         +- LogicalTableScan(table=[[default_catalog, default_database, T4, 
source: [TestTableSource(a4, b4, c4, d4)]]])
+]]>
+    </Resource>
+    <Resource name="optimized rel plan">
+      <![CDATA[
+FlinkLogicalAggregate(group=[{0}], c4=[AUXILIARY_GROUP($1)], 
EXPR$2=[COUNT($2)])
++- FlinkLogicalWindowAggregate(group=[{0}], c4=[AUXILIARY_GROUP($2)], 
b4=[ARRAY_AGG($1)], window=[TumblingGroupWindow('w$, d4, 900000)], 
properties=[])
+   +- FlinkLogicalLegacyTableSourceScan(table=[[default_catalog, 
default_database, T4, source: [TestTableSource(a4, b4, c4, d4)]]], fields=[a4, 
b4, c4, d4])
 ]]>
     </Resource>
   </TestCase>
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/common/AggregateReduceGroupingTestBase.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/common/AggregateReduceGroupingTestBase.scala
index 9fb0ee322f2..7a25cd25048 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/common/AggregateReduceGroupingTestBase.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/common/AggregateReduceGroupingTestBase.scala
@@ -342,6 +342,14 @@ abstract class AggregateReduceGroupingTestBase(withExecPlan: Boolean) extends Ta
       "SELECT a1, d1, COUNT(DISTINCT c1), MAX(DISTINCT b1), SUM(b1) FROM T1 GROUP BY a1, d1")
   }
 
+  @Test
+  def testImperativeAggWithAuxiliaryGrouping(): Unit = {
+    verifyPlan(
+      "SELECT a4, c4, COUNT(b4) FROM " +
+        "(SELECT a4, c4, ARRAY_AGG(b4) AS b4 FROM T4 " +
+        "GROUP BY a4, c4, TUMBLE(d4, INTERVAL '15' MINUTE)) t GROUP BY a4, c4")
+  }
+
   def verifyPlan(sqlQuery: String): Unit = {
     if (withExecPlan) {
       util.verifyExecPlan(sqlQuery)

Reply via email to