This is an automated email from the ASF dual-hosted git repository.

godfrey pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new bdffb6bd4ae [FLINK-28986][table-planner] UNNEST function with nested filter fails to generate plan
bdffb6bd4ae is described below

commit bdffb6bd4ae3ea32bdcd6ec6bce6c6e0e8b92a11
Author: Jane Chan <qingyue....@gmail.com>
AuthorDate: Tue Aug 16 17:35:54 2022 +0800

    [FLINK-28986][table-planner] UNNEST function with nested filter fails to generate plan
    
    This closes #20601
---
 .../planner/plan/rules/FlinkBatchRuleSets.scala    |  2 ++
 .../planner/plan/rules/FlinkStreamRuleSets.scala   |  2 ++
 .../table/planner/plan/batch/sql/UnnestTest.xml    | 36 +++++++++++++++++++
 .../plan/rules/logical/LogicalUnnestRuleTest.xml   | 42 ++++++++++++++++++++++
 .../table/planner/plan/stream/sql/UnnestTest.xml   | 36 +++++++++++++++++++
 .../table/planner/plan/common/UnnestTestBase.scala | 17 +++++++++
 .../planner/runtime/stream/sql/UnnestITCase.scala  | 31 ++++++++++++++++
 7 files changed, 166 insertions(+)

diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/FlinkBatchRuleSets.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/FlinkBatchRuleSets.scala
index a5b1b197dec..691ca43f339 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/FlinkBatchRuleSets.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/FlinkBatchRuleSets.scala
@@ -116,6 +116,8 @@ object FlinkBatchRuleSets {
         ConvertToNotInOrInRule.INSTANCE,
         // optimize limit 0
         FlinkLimit0RemoveRule.INSTANCE,
+        // fix: FLINK-28986 nested filter pattern causes unnest rule mismatch
+        CoreRules.FILTER_MERGE,
         // unnest rule
         LogicalUnnestRule.INSTANCE,
         // Wrap arguments for JSON aggregate functions
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/FlinkStreamRuleSets.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/FlinkStreamRuleSets.scala
index d5462682964..97312367287 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/FlinkStreamRuleSets.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/FlinkStreamRuleSets.scala
@@ -122,6 +122,8 @@ object FlinkStreamRuleSets {
           ConvertToNotInOrInRule.INSTANCE,
           // optimize limit 0
           FlinkLimit0RemoveRule.INSTANCE,
+          // fix: FLINK-28986 nested filter pattern causes unnest rule mismatch
+          CoreRules.FILTER_MERGE,
           // unnest rule
           LogicalUnnestRule.INSTANCE,
           // rewrite constant table function scan to correlate
diff --git a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/UnnestTest.xml b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/UnnestTest.xml
index 8896c7ee62d..ef3bc2e29d0 100644
--- a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/UnnestTest.xml
+++ b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/UnnestTest.xml
@@ -291,6 +291,42 @@ LogicalProject(a=[$0], b=[$1], _1=[$2], _2=[$3])
       <![CDATA[
 Correlate(invocation=[$UNNEST_ROWS$1($cor0.b)], 
correlate=[table($UNNEST_ROWS$1($cor0.b))], select=[a,b,_1,_2], 
rowType=[RecordType(INTEGER a, RecordType:peek_no_expand(INTEGER _1, 
VARCHAR(2147483647) _2) ARRAY b, INTEGER _1, VARCHAR(2147483647) _2)], 
joinType=[INNER], condition=[>($0, 1)])
 +- LegacyTableSourceScan(table=[[default_catalog, default_database, MyTable, 
source: [TestTableSource(a, b)]]], fields=[a, b])
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="testUnnestWithNestedFilter">
+    <Resource name="sql">
+      <![CDATA[
+SELECT * FROM (
+   SELECT a, b1, b2 FROM
+       (SELECT a, b FROM MyTable) T
+       CROSS JOIN
+       UNNEST(T.b) AS S(b1, b2)
+       WHERE S.b1 >= 12
+   ) tmp
+WHERE b2 <> 'Hello'
+    ]]>
+    </Resource>
+    <Resource name="ast">
+      <![CDATA[
+LogicalProject(a=[$0], b1=[$1], b2=[$2])
++- LogicalFilter(condition=[<>($2, _UTF-16LE'Hello')])
+   +- LogicalProject(a=[$0], b1=[$2], b2=[$3])
+      +- LogicalFilter(condition=[>=($2, 12)])
+         +- LogicalCorrelate(correlation=[$cor0], joinType=[inner], 
requiredColumns=[{1}])
+            :- LogicalProject(a=[$0], b=[$1])
+            :  +- LogicalTableScan(table=[[default_catalog, default_database, 
MyTable, source: [TestTableSource(a, b)]]])
+            +- LogicalProject(b1=[$0], b2=[$1])
+               +- Uncollect
+                  +- LogicalProject(b=[$cor0.b])
+                     +- LogicalValues(tuples=[[{ 0 }]])
+]]>
+    </Resource>
+    <Resource name="optimized exec plan">
+      <![CDATA[
+Calc(select=[a, _1 AS b1, _2 AS b2])
++- Correlate(invocation=[$UNNEST_ROWS$1($cor0.b)], 
correlate=[table($UNNEST_ROWS$1($cor0.b))], select=[a,b,_1,_2], 
rowType=[RecordType(INTEGER a, RecordType:peek_no_expand(INTEGER _1, 
VARCHAR(2147483647) _2) ARRAY b, INTEGER _1, VARCHAR(2147483647) _2)], 
joinType=[INNER], condition=[AND(>=($0, 12), <>($1, _UTF-16LE'Hello'))])
+   +- LegacyTableSourceScan(table=[[default_catalog, default_database, 
MyTable, source: [TestTableSource(a, b)]]], fields=[a, b])
 ]]>
     </Resource>
   </TestCase>
diff --git a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/rules/logical/LogicalUnnestRuleTest.xml b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/rules/logical/LogicalUnnestRuleTest.xml
index ddbdb8d61e3..d2e281ba526 100644
--- a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/rules/logical/LogicalUnnestRuleTest.xml
+++ b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/rules/logical/LogicalUnnestRuleTest.xml
@@ -308,6 +308,48 @@ LogicalProject(a=[$0], b=[$1], _1=[$2], _2=[$3])
    +- LogicalCorrelate(correlation=[$cor0], joinType=[inner], 
requiredColumns=[{1}])
       :- LogicalTableScan(table=[[default_catalog, default_database, MyTable, 
source: [TestTableSource(a, b)]]])
       +- LogicalTableFunctionScan(invocation=[$UNNEST_ROWS$1($cor0.b)], 
rowType=[RecordType:peek_no_expand(INTEGER _1, VARCHAR(2147483647) _2)])
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="testUnnestWithNestedFilter">
+    <Resource name="sql">
+      <![CDATA[
+SELECT * FROM (
+   SELECT a, b1, b2 FROM
+       (SELECT a, b FROM MyTable) T
+       CROSS JOIN
+       UNNEST(T.b) AS S(b1, b2)
+       WHERE S.b1 >= 12
+   ) tmp
+WHERE b2 <> 'Hello'
+    ]]>
+    </Resource>
+    <Resource name="ast">
+      <![CDATA[
+LogicalProject(a=[$0], b1=[$1], b2=[$2])
++- LogicalFilter(condition=[<>($2, _UTF-16LE'Hello')])
+   +- LogicalProject(a=[$0], b1=[$2], b2=[$3])
+      +- LogicalFilter(condition=[>=($2, 12)])
+         +- LogicalCorrelate(correlation=[$cor0], joinType=[inner], 
requiredColumns=[{1}])
+            :- LogicalProject(a=[$0], b=[$1])
+            :  +- LogicalTableScan(table=[[default_catalog, default_database, 
MyTable, source: [TestTableSource(a, b)]]])
+            +- LogicalProject(b1=[$0], b2=[$1])
+               +- Uncollect
+                  +- LogicalProject(b=[$cor0.b])
+                     +- LogicalValues(tuples=[[{ 0 }]])
+]]>
+    </Resource>
+    <Resource name="optimized rel plan">
+      <![CDATA[
+LogicalProject(a=[$0], b1=[$1], b2=[$2])
++- LogicalFilter(condition=[<>($2, _UTF-16LE'Hello')])
+   +- LogicalProject(a=[$0], b1=[$2], b2=[$3])
+      +- LogicalFilter(condition=[>=($2, 12)])
+         +- LogicalCorrelate(correlation=[$cor0], joinType=[inner], 
requiredColumns=[{1}])
+            :- LogicalProject(a=[$0], b=[$1])
+            :  +- LogicalTableScan(table=[[default_catalog, default_database, 
MyTable, source: [TestTableSource(a, b)]]])
+            +- LogicalProject(b1=[$0], b2=[$1])
+               +- 
LogicalTableFunctionScan(invocation=[$UNNEST_ROWS$1($cor0.b)], 
rowType=[RecordType:peek_no_expand(INTEGER _1, VARCHAR(2147483647) _2)])
 ]]>
     </Resource>
   </TestCase>
diff --git a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/UnnestTest.xml b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/UnnestTest.xml
index 1dbe3458b14..f266ab21419 100644
--- a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/UnnestTest.xml
+++ b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/UnnestTest.xml
@@ -282,6 +282,42 @@ LogicalProject(a=[$0], b=[$1], _1=[$2], _2=[$3])
       <![CDATA[
 Correlate(invocation=[$UNNEST_ROWS$1($cor0.b)], 
correlate=[table($UNNEST_ROWS$1($cor0.b))], select=[a,b,_1,_2], 
rowType=[RecordType(INTEGER a, RecordType:peek_no_expand(INTEGER _1, 
VARCHAR(2147483647) _2) ARRAY b, INTEGER _1, VARCHAR(2147483647) _2)], 
joinType=[INNER], condition=[>($0, 1)])
 +- LegacyTableSourceScan(table=[[default_catalog, default_database, MyTable, 
source: [TestTableSource(a, b)]]], fields=[a, b])
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="testUnnestWithNestedFilter">
+    <Resource name="sql">
+      <![CDATA[
+SELECT * FROM (
+   SELECT a, b1, b2 FROM
+       (SELECT a, b FROM MyTable) T
+       CROSS JOIN
+       UNNEST(T.b) AS S(b1, b2)
+       WHERE S.b1 >= 12
+   ) tmp
+WHERE b2 <> 'Hello'
+    ]]>
+    </Resource>
+    <Resource name="ast">
+      <![CDATA[
+LogicalProject(a=[$0], b1=[$1], b2=[$2])
++- LogicalFilter(condition=[<>($2, _UTF-16LE'Hello')])
+   +- LogicalProject(a=[$0], b1=[$2], b2=[$3])
+      +- LogicalFilter(condition=[>=($2, 12)])
+         +- LogicalCorrelate(correlation=[$cor0], joinType=[inner], 
requiredColumns=[{1}])
+            :- LogicalProject(a=[$0], b=[$1])
+            :  +- LogicalTableScan(table=[[default_catalog, default_database, 
MyTable, source: [TestTableSource(a, b)]]])
+            +- LogicalProject(b1=[$0], b2=[$1])
+               +- Uncollect
+                  +- LogicalProject(b=[$cor0.b])
+                     +- LogicalValues(tuples=[[{ 0 }]])
+]]>
+    </Resource>
+    <Resource name="optimized exec plan">
+      <![CDATA[
+Calc(select=[a, _1 AS b1, _2 AS b2])
++- Correlate(invocation=[$UNNEST_ROWS$1($cor0.b)], 
correlate=[table($UNNEST_ROWS$1($cor0.b))], select=[a,b,_1,_2], 
rowType=[RecordType(INTEGER a, RecordType:peek_no_expand(INTEGER _1, 
VARCHAR(2147483647) _2) ARRAY b, INTEGER _1, VARCHAR(2147483647) _2)], 
joinType=[INNER], condition=[AND(>=($0, 12), <>($1, _UTF-16LE'Hello'))])
+   +- LegacyTableSourceScan(table=[[default_catalog, default_database, 
MyTable, source: [TestTableSource(a, b)]]], fields=[a, b])
 ]]>
     </Resource>
   </TestCase>
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/common/UnnestTestBase.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/common/UnnestTestBase.scala
index 04403b4e96c..c7d76a4c8da 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/common/UnnestTestBase.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/common/UnnestTestBase.scala
@@ -121,6 +121,23 @@ abstract class UnnestTestBase(withExecPlan: Boolean) extends TableTestBase {
     verifyPlan("SELECT a, b, A._1, A._2 FROM MyTable, UNNEST(MyTable.b) AS A where A._1 > 1")
   }
 
+  @Test
+  def testUnnestWithNestedFilter(): Unit = {
+    util.addTableSource[(Int, Array[(Int, String)])]("MyTable", 'a, 'b)
+    val sqlQuery =
+      """
+        |SELECT * FROM (
+        |   SELECT a, b1, b2 FROM
+        |       (SELECT a, b FROM MyTable) T
+        |       CROSS JOIN
+        |       UNNEST(T.b) AS S(b1, b2)
+        |       WHERE S.b1 >= 12
+        |   ) tmp
+        |WHERE b2 <> 'Hello'
+    """.stripMargin
+    verifyPlan(sqlQuery)
+  }
+
   private def verifyPlan(sql: String): Unit = {
     if (withExecPlan) {
       util.verifyExecPlan(sql)
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/UnnestITCase.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/UnnestITCase.scala
index 863bf3984f8..81214a4cf2e 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/UnnestITCase.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/UnnestITCase.scala
@@ -313,4 +313,35 @@ class UnnestITCase(mode: StateBackendMode) extends StreamingWithStateTestBase(mo
     assertEquals(expected.sorted, sink.getAppendResults.sorted)
   }
 
+  @Test
+  def testUnnestWithNestedFilter(): Unit = {
+    val data = List(
+      (1, Array((12, "45.6"), (12, "45.612"))),
+      (2, Array((13, "41.6"), (14, "45.2136"))),
+      (3, Array((18, "42.6")))
+    )
+    val t = env.fromCollection(data).toTable(tEnv, 'a, 'b)
+    tEnv.registerTable("MyTable", t)
+
+    val sqlQuery =
+      """
+        |SELECT * FROM (
+        |   SELECT a, b1, b2 FROM
+        |       (SELECT a, b FROM MyTable) T
+        |       CROSS JOIN
+        |       UNNEST(T.b) as S(b1, b2)
+        |       WHERE S.b1 >= 12
+        |   ) tmp
+        |WHERE b2 <> '42.6'
+    """.stripMargin
+
+    val result = tEnv.sqlQuery(sqlQuery).toAppendStream[Row]
+    val sink = new TestingAppendSink
+    result.addSink(sink)
+    env.execute()
+
+    val expected = List("1,12,45.612", "1,12,45.6", "2,13,41.6", "2,14,45.2136")
+    assertEquals(expected.sorted, sink.getAppendResults.sorted)
+  }
+
 }

Reply via email to