This is an automated email from the ASF dual-hosted git repository.
snuyanzin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new 9d2daa88ebc [FLINK-38776][table] Fix incorrect auxiliary group field names
9d2daa88ebc is described below
commit 9d2daa88ebc569f2a11c39874f4d0a323cdd75ce
Author: dylanhz <[email protected]>
AuthorDate: Tue Dec 16 04:55:31 2025 +0800
[FLINK-38776][table] Fix incorrect auxiliary group field names
---
.../batch/BatchPhysicalWindowAggregateRule.scala | 2 +-
.../table/planner/plan/utils/RelExplainUtil.scala | 2 +-
.../batch/sql/agg/AggregateReduceGroupingTest.xml | 35 ++++++++++++++++++----
.../logical/AggregateReduceGroupingRuleTest.xml | 21 +++++++++++++
.../common/AggregateReduceGroupingTestBase.scala | 8 +++++
5 files changed, 61 insertions(+), 7 deletions(-)
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/batch/BatchPhysicalWindowAggregateRule.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/batch/BatchPhysicalWindowAggregateRule.scala
index 4b23f840431..cbecfa93b06 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/batch/BatchPhysicalWindowAggregateRule.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/batch/BatchPhysicalWindowAggregateRule.scala
@@ -397,7 +397,7 @@ class BatchPhysicalWindowAggregateRule
case (udf, aggIndex) =>
aggBufferFieldNames(aggIndex) = udf match {
case _: AggregateFunction[_, _] =>
- Array(aggNames(aggIndex))
+ Array(aggNames(aggIndex + auxGroupSet.length))
case agf: DeclarativeAggregateFunction =>
agf.aggBufferAttributes.map {
attr =>
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/utils/RelExplainUtil.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/utils/RelExplainUtil.scala
index 970ed8fae50..90aed57acce 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/utils/RelExplainUtil.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/utils/RelExplainUtil.scala
@@ -825,7 +825,7 @@ object RelExplainUtil {
val inNames = grouping.map(inFields(_)) ++ auxGrouping.map(inFields(_)) ++
aggStrings
val outNames = grouping.indices.map(outFields(_)) ++
- (grouping.length + 1 until grouping.length + 1 + auxGrouping.length).map(outFields(_)) ++
+ (grouping.length until grouping.length + auxGrouping.length).map(outFields(_)) ++
outFieldNames
inNames
.zip(outNames)
diff --git a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/agg/AggregateReduceGroupingTest.xml b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/agg/AggregateReduceGroupingTest.xml
index 8cc7192a86d..ffbce7d6679 100644
--- a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/agg/AggregateReduceGroupingTest.xml
+++ b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/agg/AggregateReduceGroupingTest.xml
@@ -381,7 +381,7 @@ LogicalProject(a4=[$0], b4=[$1], EXPR$2=[$3])
</Resource>
<Resource name="optimized exec plan">
<![CDATA[
-HashWindowAggregate(groupBy=[a4], auxGrouping=[b4],
window=[TumblingGroupWindow('w$, d4, 900000)], select=[a4, b4 AS EXPR$2,
COUNT(c4) AS EXPR$2])
+HashWindowAggregate(groupBy=[a4], auxGrouping=[b4],
window=[TumblingGroupWindow('w$, d4, 900000)], select=[a4, b4, COUNT(c4) AS
EXPR$2])
+- Exchange(distribution=[hash[a4]])
+- LegacyTableSourceScan(table=[[default_catalog, default_database, T4,
source: [TestTableSource(a4, b4, c4, d4)]]], fields=[a4, b4, c4, d4])
]]>
@@ -401,7 +401,7 @@ LogicalProject(a4=[$0], c4=[$1], EXPR$2=[$3], EXPR$3=[$4])
</Resource>
<Resource name="optimized exec plan">
<![CDATA[
-HashWindowAggregate(groupBy=[a4], auxGrouping=[c4],
window=[TumblingGroupWindow('w$, d4, 900000)], select=[a4, c4 AS EXPR$2,
COUNT(b4) AS EXPR$2, AVG(b4) AS EXPR$3])
+HashWindowAggregate(groupBy=[a4], auxGrouping=[c4],
window=[TumblingGroupWindow('w$, d4, 900000)], select=[a4, c4, COUNT(b4) AS
EXPR$2, AVG(b4) AS EXPR$3])
+- Exchange(distribution=[hash[a4]])
+- LegacyTableSourceScan(table=[[default_catalog, default_database, T4,
source: [TestTableSource(a4, b4, c4, d4)]]], fields=[a4, b4, c4, d4])
]]>
@@ -428,7 +428,7 @@ Calc(select=[a4, c4, s, EXPR$3])
+- Exchange(distribution=[hash[a4, s]])
+- LocalHashAggregate(groupBy=[a4, s], auxGrouping=[c4], select=[a4, s,
c4, Partial_COUNT(b4) AS count$0])
+- Calc(select=[a4, c4, w$start AS s, CAST((($f2 - (($f3 * $f3) /
$f4)) / $f4) AS INTEGER) AS b4])
- +- HashWindowAggregate(groupBy=[a4], auxGrouping=[c4],
window=[TumblingGroupWindow('w$, d4, 900000)], properties=[w$start, w$end,
w$rowtime], select=[a4, c4 AS $f2, SUM($f4) AS $f2, SUM(b4) AS $f3, COUNT(b4)
AS $f4])
+ +- HashWindowAggregate(groupBy=[a4], auxGrouping=[c4],
window=[TumblingGroupWindow('w$, d4, 900000)], properties=[w$start, w$end,
w$rowtime], select=[a4, c4, SUM($f4) AS $f2, SUM(b4) AS $f3, COUNT(b4) AS $f4])
+- Exchange(distribution=[keep_input_as_is[hash[a4]]])
+- Calc(select=[a4, c4, d4, b4, (b4 * b4) AS $f4])
+- Exchange(distribution=[hash[a4]])
@@ -457,7 +457,7 @@ Calc(select=[a4, c4, e, EXPR$3])
+- Exchange(distribution=[hash[a4, e]])
+- LocalHashAggregate(groupBy=[a4, e], auxGrouping=[c4], select=[a4, e,
c4, Partial_COUNT(b4) AS count$0])
+- Calc(select=[a4, c4, w$end AS e, CAST((($f2 - (($f3 * $f3) / $f4))
/ $f4) AS INTEGER) AS b4])
- +- HashWindowAggregate(groupBy=[a4], auxGrouping=[c4],
window=[TumblingGroupWindow('w$, d4, 900000)], properties=[w$start, w$end,
w$rowtime], select=[a4, c4 AS $f2, SUM($f4) AS $f2, SUM(b4) AS $f3, COUNT(b4)
AS $f4])
+ +- HashWindowAggregate(groupBy=[a4], auxGrouping=[c4],
window=[TumblingGroupWindow('w$, d4, 900000)], properties=[w$start, w$end,
w$rowtime], select=[a4, c4, SUM($f4) AS $f2, SUM(b4) AS $f3, COUNT(b4) AS $f4])
+- Exchange(distribution=[keep_input_as_is[hash[a4]]])
+- Calc(select=[a4, c4, d4, b4, (b4 * b4) AS $f4])
+- Exchange(distribution=[hash[a4]])
@@ -485,7 +485,7 @@ HashAggregate(isMerge=[true], groupBy=[a4, b4],
auxGrouping=[c4], select=[a4, b4
+- Exchange(distribution=[hash[a4, b4]])
+- LocalHashAggregate(groupBy=[a4, b4], auxGrouping=[c4], select=[a4, b4,
c4, Partial_COUNT(*) AS count1$0])
+- Calc(select=[a4, CAST((($f2 - (($f3 * $f3) / $f4)) / $f4) AS INTEGER)
AS b4, c4])
- +- HashWindowAggregate(groupBy=[a4], auxGrouping=[c4],
window=[TumblingGroupWindow('w$, d4, 900000)], properties=[w$start, w$end,
w$rowtime], select=[a4, c4 AS $f2, SUM($f4) AS $f2, SUM(b4) AS $f3, COUNT(b4)
AS $f4])
+ +- HashWindowAggregate(groupBy=[a4], auxGrouping=[c4],
window=[TumblingGroupWindow('w$, d4, 900000)], properties=[w$start, w$end,
w$rowtime], select=[a4, c4, SUM($f4) AS $f2, SUM(b4) AS $f3, COUNT(b4) AS $f4])
+- Exchange(distribution=[keep_input_as_is[hash[a4]]])
+- Calc(select=[a4, c4, d4, b4, (b4 * b4) AS $f4])
+- Exchange(distribution=[hash[a4]])
@@ -635,6 +635,31 @@ HashAggregate(isMerge=[true], groupBy=[a3, b3],
select=[a3, b3, Final_COUNT(coun
+- LocalHashAggregate(groupBy=[a3, b3], select=[a3, b3, Partial_COUNT(c3)
AS count$0])
+- Calc(select=[a3, b3, c3])
+- LegacyTableSourceScan(table=[[default_catalog, default_database,
T3, source: [TestTableSource(a3, b3, c3, d3)]]], fields=[a3, b3, c3, d3])
+]]>
+ </Resource>
+ </TestCase>
+ <TestCase name="testImperativeAggWithAuxiliaryGrouping">
+ <Resource name="sql">
+ <![CDATA[SELECT a4, c4, COUNT(b4) FROM (SELECT a4, c4, ARRAY_AGG(b4) AS
b4 FROM T4 GROUP BY a4, c4, TUMBLE(d4, INTERVAL '15' MINUTE)) t GROUP BY a4,
c4]]>
+ </Resource>
+ <Resource name="ast">
+ <![CDATA[
+LogicalAggregate(group=[{0, 1}], EXPR$2=[COUNT($2)])
++- LogicalProject(a4=[$0], c4=[$1], b4=[$3])
+ +- LogicalAggregate(group=[{0, 1, 2}], b4=[ARRAY_AGG($3)])
+ +- LogicalProject(a4=[$0], c4=[$2], $f2=[$TUMBLE($3, 900000:INTERVAL
MINUTE)], b4=[$1])
+ +- LogicalTableScan(table=[[default_catalog, default_database, T4,
source: [TestTableSource(a4, b4, c4, d4)]]])
+]]>
+ </Resource>
+ <Resource name="optimized exec plan">
+ <![CDATA[
+HashAggregate(isMerge=[false], groupBy=[a4], auxGrouping=[c4], select=[a4, c4,
COUNT(b4) AS EXPR$2])
++- Exchange(distribution=[hash[a4]])
+ +- SortWindowAggregate(groupBy=[a4], auxGrouping=[c4],
window=[TumblingGroupWindow('w$, d4, 900000)], select=[a4, c4, ARRAY_AGG(b4) AS
b4])
+ +- Exchange(distribution=[forward])
+ +- Sort(orderBy=[a4 ASC, d4 ASC])
+ +- Exchange(distribution=[hash[a4]])
+ +- LegacyTableSourceScan(table=[[default_catalog,
default_database, T4, source: [TestTableSource(a4, b4, c4, d4)]]], fields=[a4,
b4, c4, d4])
]]>
</Resource>
</TestCase>
diff --git a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/rules/logical/AggregateReduceGroupingRuleTest.xml b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/rules/logical/AggregateReduceGroupingRuleTest.xml
index e65c28bdba1..77f9b7e23f1 100644
--- a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/rules/logical/AggregateReduceGroupingRuleTest.xml
+++ b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/rules/logical/AggregateReduceGroupingRuleTest.xml
@@ -572,6 +572,27 @@ LogicalAggregate(group=[{0, 1}], EXPR$2=[COUNT($2)])
FlinkLogicalAggregate(group=[{0, 1}], EXPR$2=[COUNT($2)])
+- FlinkLogicalCalc(select=[a3, b3, c3])
+- FlinkLogicalLegacyTableSourceScan(table=[[default_catalog,
default_database, T3, source: [TestTableSource(a3, b3, c3, d3)]]], fields=[a3,
b3, c3, d3])
+]]>
+ </Resource>
+ </TestCase>
+ <TestCase name="testImperativeAggWithAuxiliaryGrouping">
+ <Resource name="sql">
+ <![CDATA[SELECT a4, c4, COUNT(b4) FROM (SELECT a4, c4, ARRAY_AGG(b4) AS
b4 FROM T4 GROUP BY a4, c4, TUMBLE(d4, INTERVAL '15' MINUTE)) t GROUP BY a4,
c4]]>
+ </Resource>
+ <Resource name="ast">
+ <![CDATA[
+LogicalAggregate(group=[{0, 1}], EXPR$2=[COUNT($2)])
++- LogicalProject(a4=[$0], c4=[$1], b4=[$3])
+ +- LogicalAggregate(group=[{0, 1, 2}], b4=[ARRAY_AGG($3)])
+ +- LogicalProject(a4=[$0], c4=[$2], $f2=[$TUMBLE($3, 900000:INTERVAL
MINUTE)], b4=[$1])
+ +- LogicalTableScan(table=[[default_catalog, default_database, T4,
source: [TestTableSource(a4, b4, c4, d4)]]])
+]]>
+ </Resource>
+ <Resource name="optimized rel plan">
+ <![CDATA[
+FlinkLogicalAggregate(group=[{0}], c4=[AUXILIARY_GROUP($1)],
EXPR$2=[COUNT($2)])
++- FlinkLogicalWindowAggregate(group=[{0}], c4=[AUXILIARY_GROUP($2)],
b4=[ARRAY_AGG($1)], window=[TumblingGroupWindow('w$, d4, 900000)],
properties=[])
+ +- FlinkLogicalLegacyTableSourceScan(table=[[default_catalog,
default_database, T4, source: [TestTableSource(a4, b4, c4, d4)]]], fields=[a4,
b4, c4, d4])
]]>
</Resource>
</TestCase>
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/common/AggregateReduceGroupingTestBase.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/common/AggregateReduceGroupingTestBase.scala
index 9fb0ee322f2..7a25cd25048 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/common/AggregateReduceGroupingTestBase.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/common/AggregateReduceGroupingTestBase.scala
@@ -342,6 +342,14 @@ abstract class AggregateReduceGroupingTestBase(withExecPlan: Boolean) extends Ta
"SELECT a1, d1, COUNT(DISTINCT c1), MAX(DISTINCT b1), SUM(b1) FROM T1
GROUP BY a1, d1")
}
+ @Test
+ def testImperativeAggWithAuxiliaryGrouping(): Unit = { // Regression test for FLINK-38776: auxiliary group field names with an imperative aggregate.
+ verifyPlan( // Inner query runs ARRAY_AGG (the imperative agg) in a TUMBLE window; the expected plans reduce c4 to AUXILIARY_GROUP.
+ "SELECT a4, c4, COUNT(b4) FROM " +
+ "(SELECT a4, c4, ARRAY_AGG(b4) AS b4 FROM T4 " +
+ "GROUP BY a4, c4, TUMBLE(d4, INTERVAL '15' MINUTE)) t GROUP BY a4, c4")
+ }
+
def verifyPlan(sqlQuery: String): Unit = {
if (withExecPlan) {
util.verifyExecPlan(sqlQuery)