This is an automated email from the ASF dual-hosted git repository.

lincoln pushed a commit to branch release-1.20
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.20 by this push:
     new acc348613e5 [FLINK-35804][table-planner] Fix incorrect calc merge during decorrelate phase
acc348613e5 is described below

commit acc348613e5c0f955e34e5fd456d3cbd5a29b5de
Author: lincoln lee <[email protected]>
AuthorDate: Mon Jul 15 15:17:58 2024 +0800

    [FLINK-35804][table-planner] Fix incorrect calc merge during decorrelate phase
    
    This closes #25084
    
    Co-authored-by: zhaorongsheng <[email protected]>
---
 .../apache/calcite/sql2rel/RelDecorrelator.java    | 38 ++++++++++++----------
 .../logical/FlinkFilterProjectTransposeRule.java   |  4 +++
 .../table/planner/plan/batch/sql/CalcTest.xml      | 31 ++++++++++++++++++
 .../table/planner/plan/stream/sql/CalcTest.xml     | 31 ++++++++++++++++++
 .../table/planner/plan/batch/sql/CalcTest.scala    | 16 +++++++++
 .../table/planner/plan/stream/sql/CalcTest.scala   | 16 +++++++++
 6 files changed, 119 insertions(+), 17 deletions(-)

diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/calcite/sql2rel/RelDecorrelator.java b/flink-table/flink-table-planner/src/main/java/org/apache/calcite/sql2rel/RelDecorrelator.java
index 10c2764a807..8aed985a7b0 100644
--- 
a/flink-table/flink-table-planner/src/main/java/org/apache/calcite/sql2rel/RelDecorrelator.java
+++ 
b/flink-table/flink-table-planner/src/main/java/org/apache/calcite/sql2rel/RelDecorrelator.java
@@ -16,6 +16,8 @@
  */
 package org.apache.calcite.sql2rel;
 
+import 
org.apache.flink.table.planner.plan.rules.logical.FlinkFilterProjectTransposeRule;
+
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.ImmutableSet;
@@ -125,9 +127,9 @@ import static 
org.apache.calcite.linq4j.Nullness.castNonNull;
  * Copied to fix calcite issues. FLINK modifications are at lines
  *
  * <ol>
- *   <li>Was changed within FLINK-29280, FLINK-28682: Line 216 ~ 223
- *   <li>Should be removed after fix of FLINK-29540: Line 289 ~ 295
- *   <li>Should be removed after fix of FLINK-29540: Line 307 ~ 313
+ *   <li>Was changed within FLINK-29280, FLINK-28682, FLINK-35804: Line 218 ~ 225, Line 273 ~ 288
+ *   <li>Should be removed after fix of FLINK-29540: Line 293 ~ 299
+ *   <li>Should be removed after fix of FLINK-29540: Line 311 ~ 317
  * </ol>
  */
 public class RelDecorrelator implements ReflectiveVisitor {
@@ -268,20 +270,22 @@ public class RelDecorrelator implements ReflectiveVisitor 
{
                                                         
.FilterIntoJoinRuleConfig.class)
                                         .toRule())
                         .addRuleInstance(
-                                CoreRules.FILTER_PROJECT_TRANSPOSE
-                                        .config
-                                        .withRelBuilderFactory(f)
-                                        
.as(FilterProjectTransposeRule.Config.class)
-                                        .withOperandFor(
-                                                Filter.class,
-                                                filter ->
-                                                        
!RexUtil.containsCorrelation(
-                                                                
filter.getCondition()),
-                                                Project.class,
-                                                project -> true)
-                                        .withCopyFilter(true)
-                                        .withCopyProject(true)
-                                        .toRule())
+                                // ----- FLINK MODIFICATION BEGIN -----
+                                FlinkFilterProjectTransposeRule.build(
+                                        CoreRules.FILTER_PROJECT_TRANSPOSE
+                                                .config
+                                                .withRelBuilderFactory(f)
+                                                
.as(FilterProjectTransposeRule.Config.class)
+                                                .withOperandFor(
+                                                        Filter.class,
+                                                        filter ->
+                                                                
!RexUtil.containsCorrelation(
+                                                                        
filter.getCondition()),
+                                                        Project.class,
+                                                        project -> true)
+                                                .withCopyFilter(true)
+                                                .withCopyProject(true)))
+                        // ----- FLINK MODIFICATION END -----
                         .addRuleInstance(
                                 FilterCorrelateRule.Config.DEFAULT
                                         .withRelBuilderFactory(f)
diff --git 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/logical/FlinkFilterProjectTransposeRule.java
 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/logical/FlinkFilterProjectTransposeRule.java
index fdca581b612..54d6f277c8c 100644
--- 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/logical/FlinkFilterProjectTransposeRule.java
+++ 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/logical/FlinkFilterProjectTransposeRule.java
@@ -39,6 +39,10 @@ public class FlinkFilterProjectTransposeRule extends 
FilterProjectTransposeRule
 
     public static final RelOptRule INSTANCE = new 
FlinkFilterProjectTransposeRule(Config.DEFAULT);
 
+    public static FlinkFilterProjectTransposeRule build(Config config) {
+        return new FlinkFilterProjectTransposeRule(config);
+    }
+
     protected FlinkFilterProjectTransposeRule(Config config) {
         super(config);
     }
diff --git 
a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/CalcTest.xml
 
b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/CalcTest.xml
index 0b04d0a7924..69692846ffd 100644
--- 
a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/CalcTest.xml
+++ 
b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/CalcTest.xml
@@ -69,6 +69,37 @@ LogicalProject(a=[$0])
       <![CDATA[
 Calc(select=[a], where=[>(random_udf(b), 10)])
 +- LegacyTableSourceScan(table=[[default_catalog, default_database, MyTable, 
source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="testCalcMergeWithCorrelate">
+    <Resource name="sql">
+      <![CDATA[
+SELECT a, r FROM (
+ SELECT a, random_udf(b) r FROM (
+  select a, b, c1 FROM MyTable, LATERAL TABLE(str_split(c)) AS T(c1)
+ ) t
+)
+WHERE r > 10
+]]>
+    </Resource>
+    <Resource name="ast">
+      <![CDATA[
+LogicalProject(a=[$0], r=[$1])
++- LogicalFilter(condition=[>($1, 10)])
+   +- LogicalProject(a=[$0], r=[random_udf($1)])
+      +- LogicalProject(a=[$0], b=[$1], c1=[$3])
+         +- LogicalCorrelate(correlation=[$cor0], joinType=[inner], 
requiredColumns=[{2}])
+            :- LogicalTableScan(table=[[default_catalog, default_database, 
MyTable, source: [TestTableSource(a, b, c)]]])
+            +- LogicalTableFunctionScan(invocation=[str_split($cor0.c)], 
rowType=[RecordType(VARCHAR(2147483647) EXPR$0)])
+]]>
+    </Resource>
+    <Resource name="optimized rel plan">
+      <![CDATA[
+Calc(select=[a, r], where=[>(r, 10)])
++- Calc(select=[a, random_udf(b) AS r])
+   +- Correlate(invocation=[str_split($cor0.c)], 
correlate=[table(str_split($cor0.c))], select=[a,b,c,EXPR$0], 
rowType=[RecordType(BIGINT a, INTEGER b, VARCHAR(2147483647) c, 
VARCHAR(2147483647) EXPR$0)], joinType=[INNER])
+      +- LegacyTableSourceScan(table=[[default_catalog, default_database, 
MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
 ]]>
     </Resource>
   </TestCase>
diff --git 
a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/CalcTest.xml
 
b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/CalcTest.xml
index 4397895cebc..215a6aa8eea 100644
--- 
a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/CalcTest.xml
+++ 
b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/CalcTest.xml
@@ -69,6 +69,37 @@ LogicalProject(a=[$0])
       <![CDATA[
 Calc(select=[a], where=[>(random_udf(b), 10)])
 +- LegacyTableSourceScan(table=[[default_catalog, default_database, MyTable, 
source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="testCalcMergeWithCorrelate">
+    <Resource name="sql">
+      <![CDATA[
+SELECT a, r FROM (
+ SELECT a, random_udf(b) r FROM (
+  select a, b, c1 FROM MyTable, LATERAL TABLE(str_split(c)) AS T(c1)
+ ) t
+)
+WHERE r > 10
+]]>
+    </Resource>
+    <Resource name="ast">
+      <![CDATA[
+LogicalProject(a=[$0], r=[$1])
++- LogicalFilter(condition=[>($1, 10)])
+   +- LogicalProject(a=[$0], r=[random_udf($1)])
+      +- LogicalProject(a=[$0], b=[$1], c1=[$3])
+         +- LogicalCorrelate(correlation=[$cor0], joinType=[inner], 
requiredColumns=[{2}])
+            :- LogicalTableScan(table=[[default_catalog, default_database, 
MyTable, source: [TestTableSource(a, b, c)]]])
+            +- LogicalTableFunctionScan(invocation=[str_split($cor0.c)], 
rowType=[RecordType(VARCHAR(2147483647) EXPR$0)])
+]]>
+    </Resource>
+    <Resource name="optimized rel plan">
+       <![CDATA[
+Calc(select=[a, r], where=[>(r, 10)])
++- Calc(select=[a, random_udf(b) AS r])
+   +- Correlate(invocation=[str_split($cor0.c)], 
correlate=[table(str_split($cor0.c))], select=[a,b,c,EXPR$0], 
rowType=[RecordType(BIGINT a, INTEGER b, VARCHAR(2147483647) c, 
VARCHAR(2147483647) EXPR$0)], joinType=[INNER])
+      +- LegacyTableSourceScan(table=[[default_catalog, default_database, 
MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
 ]]>
     </Resource>
   </TestCase>
diff --git 
a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/batch/sql/CalcTest.scala
 
b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/batch/sql/CalcTest.scala
index abd196d34af..49bd11af112 100644
--- 
a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/batch/sql/CalcTest.scala
+++ 
b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/batch/sql/CalcTest.scala
@@ -23,6 +23,7 @@ import org.apache.flink.api.scala._
 import org.apache.flink.table.api._
 import org.apache.flink.table.planner.plan.utils.MyPojo
 import 
org.apache.flink.table.planner.runtime.utils.JavaUserDefinedScalarFunctions.NonDeterministicUdf
+import 
org.apache.flink.table.planner.runtime.utils.JavaUserDefinedTableFunctions.{JavaTableFunc1,
 StringSplit}
 import org.apache.flink.table.planner.utils.TableTestBase
 
 import org.assertj.core.api.Assertions.assertThatExceptionOfType
@@ -207,4 +208,19 @@ class CalcTest extends TableTestBase {
     val sqlQuery = "SELECT a FROM (SELECT a, b FROM MyTable) t WHERE 
random_udf(b) > 10"
     util.verifyRelPlan(sqlQuery)
   }
+
+  @Test
+  def testCalcMergeWithCorrelate(): Unit = {
+    util.addTemporarySystemFunction("str_split", new StringSplit())
+    val sqlQuery =
+      """
+        |SELECT a, r FROM (
+        | SELECT a, random_udf(b) r FROM (
+        |  select a, b, c1 FROM MyTable, LATERAL TABLE(str_split(c)) AS T(c1)
+        | ) t
+        |)
+        |WHERE r > 10
+        |""".stripMargin
+    util.verifyRelPlan(sqlQuery)
+  }
 }
diff --git 
a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/CalcTest.scala
 
b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/CalcTest.scala
index a0643c21e62..1c62fc054e2 100644
--- 
a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/CalcTest.scala
+++ 
b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/CalcTest.scala
@@ -23,6 +23,7 @@ import org.apache.flink.api.scala._
 import org.apache.flink.table.api._
 import org.apache.flink.table.planner.plan.utils.MyPojo
 import 
org.apache.flink.table.planner.runtime.utils.JavaUserDefinedScalarFunctions.NonDeterministicUdf
+import 
org.apache.flink.table.planner.runtime.utils.JavaUserDefinedTableFunctions.{JavaTableFunc1,
 StringSplit}
 import org.apache.flink.table.planner.utils.TableTestBase
 
 import org.assertj.core.api.Assertions.assertThatExceptionOfType
@@ -201,4 +202,19 @@ class CalcTest extends TableTestBase {
     val sqlQuery = "SELECT a FROM (SELECT a, b FROM MyTable) t WHERE 
random_udf(b) > 10"
     util.verifyRelPlan(sqlQuery)
   }
+
+  @Test
+  def testCalcMergeWithCorrelate(): Unit = {
+    util.addTemporarySystemFunction("str_split", new StringSplit())
+    val sqlQuery =
+      """
+        |SELECT a, r FROM (
+        | SELECT a, random_udf(b) r FROM (
+        |  select a, b, c1 FROM MyTable, LATERAL TABLE(str_split(c)) AS T(c1)
+        | ) t
+        |)
+        |WHERE r > 10
+        |""".stripMargin
+    util.verifyRelPlan(sqlQuery)
+  }
 }

Reply via email to