This is an automated email from the ASF dual-hosted git repository.
lincoln-lil pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new 8d83511bbf0 [FLINK-39720][table] SubQueryDecorrelator produces
incorrect plans for correlated EXISTS with HAVING on aggregate outputs
8d83511bbf0 is described below
commit 8d83511bbf037f4816b4fdcf8e1732c57bebc413
Author: lincoln lee <[email protected]>
AuthorDate: Mon May 25 14:40:28 2026 +0800
[FLINK-39720][table] SubQueryDecorrelator produces incorrect plans for
correlated EXISTS with HAVING on aggregate outputs
This closes #28217.
---
.../plan/rules/logical/SubQueryDecorrelator.java | 13 +-
.../logical/subquery/SubQuerySemiJoinTest.xml | 153 +++++++++++++++++++++
.../logical/subquery/SubQuerySemiJoinTest.scala | 45 ++++++
3 files changed, 210 insertions(+), 1 deletion(-)
diff --git
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/logical/SubQueryDecorrelator.java
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/logical/SubQueryDecorrelator.java
index 04832a80798..9ac57e63973 100644
---
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/logical/SubQueryDecorrelator.java
+++
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/logical/SubQueryDecorrelator.java
@@ -532,9 +532,20 @@ public class SubQueryDecorrelator extends RelShuttleImpl {
unsupportedCorConditions);
assert unsupportedCorConditions.isEmpty();
- final RexNode remainingCondition =
+ RexNode remainingCondition =
RexUtil.composeConjunction(rexBuilder, nonCorConditions,
false);
+ // Re-index the remaining (non-correlated) condition against the
rewritten input.
+ // The child may have shifted its row type during decorrelation
(e.g. an Aggregate
+ // injects correlated columns into its group key), so RexInputRefs
in HAVING /
+ // Filter predicates that survive in nonCorConditions must be
remapped through
+ // frame.oldToNewOutputs. Otherwise they silently point at the
wrong column.
+ if (remainingCondition != null) {
+ remainingCondition =
+ adjustInputRefs(
+ remainingCondition, frame.oldToNewOutputs,
frame.r.getRowType());
+ }
+
// Using LogicalFilter.create instead of RelBuilder.filter to
create Filter
// because RelBuilder.filter method does not have VariablesSet arg.
final RelNode newFilter =
diff --git
a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/rules/logical/subquery/SubQuerySemiJoinTest.xml
b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/rules/logical/subquery/SubQuerySemiJoinTest.xml
index a63e9c9cf6d..53a20710540 100644
---
a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/rules/logical/subquery/SubQuerySemiJoinTest.xml
+++
b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/rules/logical/subquery/SubQuerySemiJoinTest.xml
@@ -366,6 +366,35 @@ LogicalProject(a=[$0], b=[$1], c=[$2])
+- LogicalProject(e=[$1], f=[$2])
+- LogicalFilter(condition=[true])
+- LogicalTableScan(table=[[default_catalog, default_database,
r]])
+]]>
+ </Resource>
+ </TestCase>
+ <TestCase name="testExistsWithCorrelatedOnWhere_Aggregate_LocalWhere">
+ <Resource name="sql">
+ <![CDATA[SELECT * FROM l WHERE EXISTS (SELECT 1 FROM r WHERE l.a = r.d
AND r.e > 10 GROUP BY r.f)]]>
+ </Resource>
+ <Resource name="ast">
+ <![CDATA[
+LogicalProject(a=[$0], b=[$1], c=[$2])
++- LogicalFilter(condition=[EXISTS({
+LogicalAggregate(group=[{0}])
+ LogicalProject(f=[$2])
+ LogicalFilter(condition=[AND(=($cor0.a, $0), >($1, 10))])
+ LogicalTableScan(table=[[default_catalog, default_database, r]])
+})], variablesSet=[[$cor0]])
+ +- LogicalTableScan(table=[[default_catalog, default_database, l]])
+]]>
+ </Resource>
+ <Resource name="optimized rel plan">
+ <![CDATA[
+LogicalProject(a=[$0], b=[$1], c=[$2])
++- LogicalJoin(condition=[=($0, $3)], joinType=[semi])
+ :- LogicalTableScan(table=[[default_catalog, default_database, l]])
+ +- LogicalProject(d=[$1])
+ +- LogicalAggregate(group=[{0, 1}])
+ +- LogicalProject(f=[$2], d=[$0])
+ +- LogicalFilter(condition=[>($1, 10)])
+ +- LogicalTableScan(table=[[default_catalog, default_database,
r]])
]]>
</Resource>
</TestCase>
@@ -446,6 +475,130 @@ LogicalProject(a=[$0], b=[$1])
+- LogicalProject(d=[$1])
+- LogicalFilter(condition=[true])
+- LogicalTableScan(table=[[default_catalog, default_database, y]])
+]]>
+ </Resource>
+ </TestCase>
+ <TestCase name="testExistsWithCorrelatedOnWhere_Having1">
+ <Resource name="sql">
+ <![CDATA[SELECT * FROM l WHERE EXISTS (SELECT 1 FROM r WHERE l.a = r.d
GROUP BY r.f HAVING SUM(r.e) >= 3)]]>
+ </Resource>
+ <Resource name="ast">
+ <![CDATA[
+LogicalProject(a=[$0], b=[$1], c=[$2])
++- LogicalFilter(condition=[EXISTS({
+LogicalFilter(condition=[>=($1, 3)])
+ LogicalAggregate(group=[{0}], agg#0=[SUM($1)])
+ LogicalProject(f=[$2], e=[$1])
+ LogicalFilter(condition=[=($cor0.a, $0)])
+ LogicalTableScan(table=[[default_catalog, default_database, r]])
+})], variablesSet=[[$cor0]])
+ +- LogicalTableScan(table=[[default_catalog, default_database, l]])
+]]>
+ </Resource>
+ <Resource name="optimized rel plan">
+ <![CDATA[
+LogicalProject(a=[$0], b=[$1], c=[$2])
++- LogicalJoin(condition=[=($0, $3)], joinType=[semi])
+ :- LogicalTableScan(table=[[default_catalog, default_database, l]])
+ +- LogicalProject(d=[$1])
+ +- LogicalFilter(condition=[>=($2, 3)])
+ +- LogicalAggregate(group=[{0, 1}], agg#0=[SUM($2)])
+ +- LogicalProject(f=[$2], d=[$0], e=[$1])
+ +- LogicalFilter(condition=[true])
+ +- LogicalTableScan(table=[[default_catalog,
default_database, r]])
+]]>
+ </Resource>
+ </TestCase>
+ <TestCase name="testExistsWithCorrelatedOnWhere_Having2">
+ <Resource name="sql">
+ <![CDATA[SELECT * FROM l WHERE EXISTS (SELECT 1 FROM r WHERE l.a = r.d
GROUP BY r.f HAVING SUM(r.e) >= 3 AND MAX(r.e) < 100)]]>
+ </Resource>
+ <Resource name="ast">
+ <![CDATA[
+LogicalProject(a=[$0], b=[$1], c=[$2])
++- LogicalFilter(condition=[EXISTS({
+LogicalFilter(condition=[AND(>=($1, 3), <($2, 100))])
+ LogicalAggregate(group=[{0}], agg#0=[SUM($1)], agg#1=[MAX($1)])
+ LogicalProject(f=[$2], e=[$1])
+ LogicalFilter(condition=[=($cor0.a, $0)])
+ LogicalTableScan(table=[[default_catalog, default_database, r]])
+})], variablesSet=[[$cor0]])
+ +- LogicalTableScan(table=[[default_catalog, default_database, l]])
+]]>
+ </Resource>
+ <Resource name="optimized rel plan">
+ <![CDATA[
+LogicalProject(a=[$0], b=[$1], c=[$2])
++- LogicalJoin(condition=[=($0, $3)], joinType=[semi])
+ :- LogicalTableScan(table=[[default_catalog, default_database, l]])
+ +- LogicalProject(d=[$1])
+ +- LogicalFilter(condition=[AND(>=($2, 3), <($3, 100))])
+ +- LogicalAggregate(group=[{0, 1}], agg#0=[SUM($2)], agg#1=[MAX($2)])
+ +- LogicalProject(f=[$2], d=[$0], e=[$1])
+ +- LogicalFilter(condition=[true])
+ +- LogicalTableScan(table=[[default_catalog,
default_database, r]])
+]]>
+ </Resource>
+ </TestCase>
+ <TestCase name="testExistsWithCorrelatedOnWhere_Having3">
+ <Resource name="sql">
+ <![CDATA[SELECT * FROM l WHERE EXISTS (SELECT 1 FROM r WHERE l.a = r.d
GROUP BY r.f HAVING SUM(r.e) >= 3 AND COUNT(*) > 1)]]>
+ </Resource>
+ <Resource name="ast">
+ <![CDATA[
+LogicalProject(a=[$0], b=[$1], c=[$2])
++- LogicalFilter(condition=[EXISTS({
+LogicalFilter(condition=[AND(>=($1, 3), >($2, 1))])
+ LogicalAggregate(group=[{0}], agg#0=[SUM($1)], agg#1=[COUNT()])
+ LogicalProject(f=[$2], e=[$1])
+ LogicalFilter(condition=[=($cor0.a, $0)])
+ LogicalTableScan(table=[[default_catalog, default_database, r]])
+})], variablesSet=[[$cor0]])
+ +- LogicalTableScan(table=[[default_catalog, default_database, l]])
+]]>
+ </Resource>
+ <Resource name="optimized rel plan">
+ <![CDATA[
+LogicalProject(a=[$0], b=[$1], c=[$2])
++- LogicalJoin(condition=[=($0, $3)], joinType=[semi])
+ :- LogicalTableScan(table=[[default_catalog, default_database, l]])
+ +- LogicalProject(d=[$1])
+ +- LogicalFilter(condition=[AND(>=($2, 3), >($3, 1))])
+ +- LogicalAggregate(group=[{0, 1}], agg#0=[SUM($2)], agg#1=[COUNT()])
+ +- LogicalProject(f=[$2], d=[$0], e=[$1])
+ +- LogicalFilter(condition=[true])
+ +- LogicalTableScan(table=[[default_catalog,
default_database, r]])
+]]>
+ </Resource>
+ </TestCase>
+ <TestCase name="testExistsWithCorrelatedOnWhere_Having4">
+ <Resource name="sql">
+ <![CDATA[SELECT * FROM l WHERE EXISTS (SELECT 1 FROM r WHERE l.a = r.d
AND l.b = r.e GROUP BY r.f HAVING COUNT(r.d) >= 2)]]>
+ </Resource>
+ <Resource name="ast">
+ <![CDATA[
+LogicalProject(a=[$0], b=[$1], c=[$2])
++- LogicalFilter(condition=[EXISTS({
+LogicalFilter(condition=[>=($1, 2)])
+ LogicalAggregate(group=[{0}], agg#0=[COUNT($1)])
+ LogicalProject(f=[$2], d=[$0])
+ LogicalFilter(condition=[AND(=($cor0.a, $0), =($cor0.b, $1))])
+ LogicalTableScan(table=[[default_catalog, default_database, r]])
+})], variablesSet=[[$cor0]])
+ +- LogicalTableScan(table=[[default_catalog, default_database, l]])
+]]>
+ </Resource>
+ <Resource name="optimized rel plan">
+ <![CDATA[
+LogicalProject(a=[$0], b=[$1], c=[$2])
++- LogicalJoin(condition=[AND(=($0, $3), =($1, $4))], joinType=[semi])
+ :- LogicalTableScan(table=[[default_catalog, default_database, l]])
+ +- LogicalProject(d=[$1], e=[$2])
+ +- LogicalFilter(condition=[>=($3, 2)])
+ +- LogicalAggregate(group=[{0, 1, 2}], agg#0=[COUNT($1)])
+ +- LogicalProject(f=[$2], d=[$0], e=[$1])
+ +- LogicalFilter(condition=[true])
+ +- LogicalTableScan(table=[[default_catalog,
default_database, r]])
]]>
</Resource>
</TestCase>
diff --git
a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/rules/logical/subquery/SubQuerySemiJoinTest.scala
b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/rules/logical/subquery/SubQuerySemiJoinTest.scala
index 8785e5a77f6..5b0802ccf59 100644
---
a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/rules/logical/subquery/SubQuerySemiJoinTest.scala
+++
b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/rules/logical/subquery/SubQuerySemiJoinTest.scala
@@ -1323,6 +1323,51 @@ class SubQuerySemiJoinTest extends SubQueryTestBase {
util.verifyRelPlanNotExpected(sqlQuery, "joinType=[semi]")
}
+ @Test
+ def testExistsWithCorrelatedOnWhere_Having1(): Unit = {
+ // Correlated WHERE plus HAVING on a single aggregate output.
+ // Regression for SubQueryDecorrelator: the non-correlated HAVING
predicate must be
+ // re-indexed against the rewritten Aggregate (which receives the
correlated column
+ // injected into its group key).
+ val sqlQuery = "SELECT * FROM l WHERE EXISTS " +
+ "(SELECT 1 FROM r WHERE l.a = r.d GROUP BY r.f HAVING SUM(r.e) >= 3)"
+ util.verifyRelPlan(sqlQuery)
+ }
+
+ @Test
+ def testExistsWithCorrelatedOnWhere_Having2(): Unit = {
+ // Compound HAVING with multiple aggregate refs.
+ val sqlQuery = "SELECT * FROM l WHERE EXISTS " +
+ "(SELECT 1 FROM r WHERE l.a = r.d GROUP BY r.f HAVING SUM(r.e) >= 3 AND
MAX(r.e) < 100)"
+ util.verifyRelPlan(sqlQuery)
+ }
+
+ @Test
+ def testExistsWithCorrelatedOnWhere_Having3(): Unit = {
+ // HAVING that mixes an aggregate ref with COUNT(*).
+ val sqlQuery = "SELECT * FROM l WHERE EXISTS " +
+ "(SELECT 1 FROM r WHERE l.a = r.d GROUP BY r.f HAVING SUM(r.e) >= 3 AND
COUNT(*) > 1)"
+ util.verifyRelPlan(sqlQuery)
+ }
+
+ @Test
+ def testExistsWithCorrelatedOnWhere_Having4(): Unit = {
+ // Multiple correlated WHERE columns combined with a HAVING on aggregate
output.
+ val sqlQuery = "SELECT * FROM l WHERE EXISTS " +
+ "(SELECT 1 FROM r WHERE l.a = r.d AND l.b = r.e GROUP BY r.f HAVING
COUNT(r.d) >= 2)"
+ util.verifyRelPlan(sqlQuery)
+ }
+
+ @Test
+ def testExistsWithCorrelatedOnWhere_Aggregate_LocalWhere(): Unit = {
+ // Mixed correlated + local WHERE, no HAVING. Guards against an over-eager
fix:
+ // the local predicate `r.e > 10` sits below the Aggregate, so its
RexInputRef must
+ // remain stable through decorrelation.
+ val sqlQuery = "SELECT * FROM l WHERE EXISTS " +
+ "(SELECT 1 FROM r WHERE l.a = r.d AND r.e > 10 GROUP BY r.f)"
+ util.verifyRelPlan(sqlQuery)
+ }
+
@Test
def testExistsWithCorrelatedOnWhere_UnsupportedAggregate1(): Unit = {
util.addTableSource[(Int, Long)]("l1", 'a, 'b)