This is an automated email from the ASF dual-hosted git repository.
snuyanzin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new aa97d86c1ed [FLINK-38317][table] Make PushCalcPastChangelogNormalizeRule tolerant to trimmed fields
aa97d86c1ed is described below
commit aa97d86c1ed252d92c3cc4e31718d42b50408f64
Author: Sergey Nuyanzin <[email protected]>
AuthorDate: Thu Sep 4 09:31:07 2025 +0200
[FLINK-38317][table] Make PushCalcPastChangelogNormalizeRule tolerant to trimmed fields
---
.../stream/PushCalcPastChangelogNormalizeRule.java | 7 +-
.../PushCalcPastChangelogNormalizeRuleTest.java | 20 ++++++
.../PushCalcPastChangelogNormalizeRuleTest.xml | 82 ++++++++++++++++++++++
3 files changed, 107 insertions(+), 2 deletions(-)
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/physical/stream/PushCalcPastChangelogNormalizeRule.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/physical/stream/PushCalcPastChangelogNormalizeRule.java
index bbe64d2e5ac..da152485828 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/physical/stream/PushCalcPastChangelogNormalizeRule.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/physical/stream/PushCalcPastChangelogNormalizeRule.java
@@ -326,8 +326,11 @@ public class PushCalcPastChangelogNormalizeRule
}
if (!conditions.isEmpty()) {
final RexNode condition = relBuilder.and(conditions);
- programBuilder.addCondition(
- FlinkRexUtil.simplify(relBuilder.getRexBuilder(), condition, rexExecutor));
+ final RexNode simplifiedCondition =
+ FlinkRexUtil.simplify(relBuilder.getRexBuilder(), condition, rexExecutor);
+ if (!condition.isAlwaysTrue()) {
+ programBuilder.addCondition(adjustInputRef(simplifiedCondition, inputRefMapping));
+ }
}
final RexProgram newProgram = programBuilder.getProgram();
diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/rules/physical/stream/PushCalcPastChangelogNormalizeRuleTest.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/rules/physical/stream/PushCalcPastChangelogNormalizeRuleTest.java
index 3e466f40fc0..a31f9c574c0 100644
--- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/rules/physical/stream/PushCalcPastChangelogNormalizeRuleTest.java
+++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/rules/physical/stream/PushCalcPastChangelogNormalizeRuleTest.java
@@ -217,4 +217,24 @@ class PushCalcPastChangelogNormalizeRuleTest extends TableTestBase {
util.tableEnv().createTable("T", sourceDescriptorWithTwoPrimaryKeys);
util.verifyRelPlan("SELECT f1, f5 FROM T WHERE f1 < 1 AND f3 IS NOT NULL");
}
+
+ @Test
+ void testPartialPushDownWithTrimmedFieldsAndDifferentProjection() {
+ util.tableEnv().createTable("T", sourceDescriptorWithTwoPrimaryKeys);
+ // verifyExecPlan is intended here as it will show whether the node is reused or not
+ util.verifyExecPlan(
+ "SELECT f3 FROM T WHERE f2 < 1 AND f2 > 0\n"
+ + " UNION SELECT f3 FROM T WHERE f2 < 3 AND f2 > 0\n"
+ + " INTERSECT SELECT f3 FROM T WHERE f2 > 0 AND f2 < 10");
+ }
+
+ @Test
+ void testPartialPushDownWithTrimmedFields() {
+ util.tableEnv().createTable("T", sourceDescriptorWithTwoPrimaryKeys);
+ // verifyExecPlan is intended here as it will show whether the node is reused or not
+ util.verifyExecPlan(
+ "SELECT f2 FROM T WHERE f2 < 1 AND f2 > 0\n"
+ + " UNION SELECT f2 FROM T WHERE f2 < 3 AND f2 > 0\n"
+ + " INTERSECT SELECT f2 FROM T WHERE f2 > 0 AND f2 < 10");
+ }
}
diff --git a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/rules/physical/stream/PushCalcPastChangelogNormalizeRuleTest.xml b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/rules/physical/stream/PushCalcPastChangelogNormalizeRuleTest.xml
index 76fb44c599a..506a22b2afd 100644
--- a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/rules/physical/stream/PushCalcPastChangelogNormalizeRuleTest.xml
+++ b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/rules/physical/stream/PushCalcPastChangelogNormalizeRuleTest.xml
@@ -292,6 +292,88 @@ Calc(select=[f1, f5])
+- Exchange(distribution=[hash[f1, f2]])
+- Calc(select=[f1, f2, f3, f5], where=[<(f1, 1)])
+- TableSourceScan(table=[[default_catalog, default_database, T]], fields=[f0, f1, f2, f3, f4, f5, f6, f7])
+]]>
+ </Resource>
+ </TestCase>
+ <TestCase name="testPartialPushDownWithTrimmedFields">
+ <Resource name="sql">
+ <![CDATA[SELECT f2 FROM T WHERE f2 < 1 AND f2 > 0
+ UNION SELECT f2 FROM T WHERE f2 < 3 AND f2 > 0
+ INTERSECT SELECT f2 FROM T WHERE f2 > 0 AND f2 < 10]]>
+ </Resource>
+ <Resource name="ast">
+ <![CDATA[
+LogicalUnion(all=[false])
+:- LogicalProject(f2=[$2])
+: +- LogicalFilter(condition=[AND(<($2, 1), >($2, 0))])
+: +- LogicalTableScan(table=[[default_catalog, default_database, T]])
++- LogicalIntersect(all=[false])
+ :- LogicalProject(f2=[$2])
+ : +- LogicalFilter(condition=[AND(<($2, 3), >($2, 0))])
+ : +- LogicalTableScan(table=[[default_catalog, default_database, T]])
+ +- LogicalProject(f2=[$2])
+ +- LogicalFilter(condition=[AND(>($2, 0), <($2, 10))])
+ +- LogicalTableScan(table=[[default_catalog, default_database, T]])
+]]>
+ </Resource>
+ <Resource name="optimized exec plan">
+ <![CDATA[
+GroupAggregate(groupBy=[f2], select=[f2])
++- Exchange(distribution=[hash[f2]])
+ +- Union(all=[true], union=[f2])
+ :- Calc(select=[f2], where=[(f2 < 1)])
+ : +- ChangelogNormalize(key=[f1, f2])(reuse_id=[1])
+ : +- Exchange(distribution=[hash[f1, f2]])
+ : +- Calc(select=[f1, f2], where=[(f2 > 0)])
+ : +- TableSourceScan(table=[[default_catalog, default_database, T]], fields=[f0, f1, f2, f3, f4, f5, f6, f7])
+ +- Join(joinType=[LeftSemiJoin], where=[(f2 = f20)], select=[f2], leftInputSpec=[NoUniqueKey], rightInputSpec=[NoUniqueKey])
+ :- Exchange(distribution=[hash[f2]])
+ : +- Calc(select=[f2], where=[(f2 < 3)])
+ : +- Reused(reference_id=[1])
+ +- Exchange(distribution=[hash[f2]])
+ +- Calc(select=[f2], where=[(f2 < 10)])
+ +- Reused(reference_id=[1])
+]]>
+ </Resource>
+ </TestCase>
+ <TestCase name="testPartialPushDownWithTrimmedFieldsAndDifferentProjection">
+ <Resource name="sql">
+ <![CDATA[SELECT f3 FROM T WHERE f2 < 1 AND f2 > 0
+ UNION SELECT f3 FROM T WHERE f2 < 3 AND f2 > 0
+ INTERSECT SELECT f3 FROM T WHERE f2 > 0 AND f2 < 10]]>
+ </Resource>
+ <Resource name="ast">
+ <![CDATA[
+LogicalUnion(all=[false])
+:- LogicalProject(f3=[$3])
+: +- LogicalFilter(condition=[AND(<($2, 1), >($2, 0))])
+: +- LogicalTableScan(table=[[default_catalog, default_database, T]])
++- LogicalIntersect(all=[false])
+ :- LogicalProject(f3=[$3])
+ : +- LogicalFilter(condition=[AND(<($2, 3), >($2, 0))])
+ : +- LogicalTableScan(table=[[default_catalog, default_database, T]])
+ +- LogicalProject(f3=[$3])
+ +- LogicalFilter(condition=[AND(>($2, 0), <($2, 10))])
+ +- LogicalTableScan(table=[[default_catalog, default_database, T]])
+]]>
+ </Resource>
+ <Resource name="optimized exec plan">
+ <![CDATA[
+GroupAggregate(groupBy=[f3], select=[f3])
++- Exchange(distribution=[hash[f3]])
+ +- Union(all=[true], union=[f3])
+ :- Calc(select=[f3], where=[(f2 < 1)])
+ : +- ChangelogNormalize(key=[f1, f2])(reuse_id=[1])
+ : +- Exchange(distribution=[hash[f1, f2]])
+ : +- Calc(select=[f1, f2, f3], where=[(f2 > 0)])
+ : +- TableSourceScan(table=[[default_catalog, default_database, T]], fields=[f0, f1, f2, f3, f4, f5, f6, f7])
+ +- Join(joinType=[LeftSemiJoin], where=[IS NOT DISTINCT FROM(f3, f30)], select=[f3], leftInputSpec=[NoUniqueKey], rightInputSpec=[NoUniqueKey])
+ :- Exchange(distribution=[hash[f3]])
+ : +- Calc(select=[f3], where=[(f2 < 3)])
+ : +- Reused(reference_id=[1])
+ +- Exchange(distribution=[hash[f3]])
+ +- Calc(select=[f3], where=[(f2 < 10)])
+ +- Reused(reference_id=[1])
]]>
</Resource>
</TestCase>