This is an automated email from the ASF dual-hosted git repository.

shengkai pushed a commit to branch release-1.18
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.18 by this push:
     new f66679bfe5f [FLINK-32952][table-planner] Fix scan reuse with readable 
metadata and watermark push down get wrong watermark error (#23338)
f66679bfe5f is described below

commit f66679bfe5f5b344eec71a7579504762cc3c04ae
Author: yunhong <337361...@qq.com>
AuthorDate: Tue Sep 12 11:07:23 2023 +0800

    [FLINK-32952][table-planner] Fix scan reuse with readable metadata and 
watermark push down get wrong watermark error (#23338)
    
    Co-authored-by: zhengyunhong.zyh <zhengyunhong....@alibaba-inc.com>
---
 .../flink/table/planner/plan/reuse/ScanReuser.java |  16 +-
 .../table/planner/plan/reuse/ScanReuserUtils.java  |  12 +-
 .../table/planner/plan/optimize/ScanReuseTest.java |  76 ++++++++
 .../table/planner/plan/optimize/ScanReuseTest.xml  | 214 +++++++++++++++------
 4 files changed, 243 insertions(+), 75 deletions(-)

diff --git 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/reuse/ScanReuser.java
 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/reuse/ScanReuser.java
index 1ef3bf2f2b1..9128e110346 100644
--- 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/reuse/ScanReuser.java
+++ 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/reuse/ScanReuser.java
@@ -170,12 +170,18 @@ public class ScanReuser {
             // 2. Create new source.
             List<SourceAbilitySpec> specs = 
abilitySpecsWithoutEscaped(pickTable);
 
-            // 2.1 Apply projections
-            List<SourceAbilitySpec> newSpecs = new ArrayList<>();
+            // 2.1 Create the produced type.
+            // The source produced type is the input type into the runtime. Its layout is:
+            // PHYSICAL COLUMNS + METADATA COLUMNS. When re-computing the source ability specs for
+            // a source with metadata, we must distinguish between the schema type and the produced
+            // type, because source ability specs use the produced type instead of the schema type.
             RowType originType =
                     DynamicSourceUtils.createProducedType(
                             
pickTable.contextResolvedTable().getResolvedSchema(),
                             pickTable.tableSource());
+
+            // 2.2 Apply projections
+            List<SourceAbilitySpec> newSpecs = new ArrayList<>();
             RowType newSourceType =
                     applyPhysicalAndMetadataPushDown(
                             pickTable.tableSource(),
@@ -190,15 +196,15 @@ public class ScanReuser {
                             allMetaKeys);
             specs.addAll(newSpecs);
 
-            // 2.2 Watermark spec
+            // 2.3 Watermark spec
             Optional<WatermarkPushDownSpec> watermarkSpec =
-                    getAdjustedWatermarkSpec(pickTable, newSourceType);
+                    getAdjustedWatermarkSpec(pickTable, originType, 
newSourceType);
             if (watermarkSpec.isPresent()) {
                 specs.add(watermarkSpec.get());
                 newSourceType = watermarkSpec.get().getProducedType().get();
             }
 
-            // 2.3 Create a new ScanTableSource. ScanTableSource can not be 
pushed down twice.
+            // 2.4 Create a new ScanTableSource. ScanTableSource can not be 
pushed down twice.
             DynamicTableSourceSpec tableSourceSpec =
                     new 
DynamicTableSourceSpec(pickTable.contextResolvedTable(), specs);
             ScanTableSource newTableSource =
diff --git 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/reuse/ScanReuserUtils.java
 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/reuse/ScanReuserUtils.java
index 5c79cd4cd25..43a00d720a2 100644
--- 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/reuse/ScanReuserUtils.java
+++ 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/reuse/ScanReuserUtils.java
@@ -170,24 +170,18 @@ public class ScanReuserUtils {
 
     /** Watermark push down must be after projection push down, so we need to 
adjust its index. */
     public static Optional<WatermarkPushDownSpec> getAdjustedWatermarkSpec(
-            TableSourceTable table, RowType newSourceType) {
-        RowType producedType =
-                (RowType)
-                        table.contextResolvedTable()
-                                .getResolvedSchema()
-                                .toSourceRowDataType()
-                                .getLogicalType();
+            TableSourceTable table, RowType oldSourceType, RowType 
newSourceType) {
         for (SourceAbilitySpec spec : table.abilitySpecs()) {
             if (spec instanceof WatermarkPushDownSpec) {
                 return Optional.of(
                         adjustWatermarkIndex(
                                 
table.contextResolvedTable().getResolvedSchema(),
-                                producedType,
+                                oldSourceType,
                                 newSourceType,
                                 (WatermarkPushDownSpec) spec));
             }
             if (spec.getProducedType().isPresent()) {
-                producedType = spec.getProducedType().get();
+                oldSourceType = spec.getProducedType().get();
             }
         }
         return Optional.empty();
diff --git 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/optimize/ScanReuseTest.java
 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/optimize/ScanReuseTest.java
index a0a2c9e480d..7016e15d15e 100644
--- 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/optimize/ScanReuseTest.java
+++ 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/optimize/ScanReuseTest.java
@@ -30,6 +30,8 @@ import org.junit.runners.Parameterized;
 import java.util.Arrays;
 import java.util.Collection;
 
+import static org.junit.Assume.assumeTrue;
+
 /** Test push project into source with sub plan reuse. */
 @RunWith(Parameterized.class)
 public class ScanReuseTest extends TableTestBase {
@@ -324,4 +326,78 @@ public class ScanReuseTest extends TableTestBase {
             util.verifyExecPlan(sqlQuery);
         }
     }
+
+    @Test
+    public void testReuseWithReadMetadataAndWatermarkPushDown1() {
+        assumeTrue(isStreaming);
+        String ddl =
+                "CREATE TABLE MyTable1 (\n"
+                        + "  metadata_0 int METADATA VIRTUAL,\n"
+                        + "  a0 int,\n"
+                        + "  a1 int,\n"
+                        + "  a2 int,\n"
+                        + "  ts STRING,\n "
+                        + "  rowtime as TO_TIMESTAMP(`ts`),\n"
+                        + "  WATERMARK FOR rowtime AS rowtime - INTERVAL '1' 
SECOND\n"
+                        + ") WITH (\n"
+                        + " 'connector' = 'values',\n"
+                        + " 'bounded' = 'false',\n"
+                        + " 'readable-metadata' = 'metadata_0:int',\n"
+                        + " 'enable-watermark-push-down' = 'true',\n"
+                        + " 'disable-lookup' = 'true'"
+                        + ")";
+        util.tableEnv().executeSql(ddl);
+
+        // join left side value source without projection spec.
+        String sqlQuery =
+                "SELECT T1.a1, T1.a2 FROM"
+                        + " (SELECT a0, window_start, window_end,"
+                        + " MIN(a1) as a1, MIN(a2) as a2, MIN(metadata_0) as 
metadata_0"
+                        + " FROM TABLE("
+                        + "   TUMBLE(TABLE MyTable1, DESCRIPTOR(rowtime), 
INTERVAL '1' SECOND)) "
+                        + " GROUP BY a0, window_start, window_end) T1,"
+                        + " (SELECT a0, window_start, window_end, MIN(a1) as 
a1"
+                        + "  FROM TABLE("
+                        + "   TUMBLE(TABLE MyTable1, DESCRIPTOR(rowtime), 
INTERVAL '1' SECOND)) "
+                        + " GROUP BY a0, window_start, window_end) T2"
+                        + " WHERE T1.a1 = T2.a1";
+        util.verifyExecPlan(sqlQuery);
+    }
+
+    @Test
+    public void testReuseWithReadMetadataAndWatermarkPushDown2() {
+        assumeTrue(isStreaming);
+        String ddl =
+                "CREATE TABLE MyTable1 (\n"
+                        + "  metadata_0 int METADATA VIRTUAL,\n"
+                        + "  a0 int,\n"
+                        + "  a1 int,\n"
+                        + "  a2 int,\n"
+                        + "  ts STRING,\n "
+                        + "  rowtime as TO_TIMESTAMP(`ts`),\n"
+                        + "  WATERMARK FOR rowtime AS rowtime - INTERVAL '1' 
SECOND\n"
+                        + ") WITH (\n"
+                        + " 'connector' = 'values',\n"
+                        + " 'bounded' = 'false',\n"
+                        + " 'readable-metadata' = 'metadata_0:int',\n"
+                        + " 'enable-watermark-push-down' = 'true',\n"
+                        + " 'disable-lookup' = 'true'"
+                        + ")";
+        util.tableEnv().executeSql(ddl);
+
+        // join right side value source without projection spec.
+        String sqlQuery =
+                "SELECT T1.a1, T2.a2 FROM"
+                        + " (SELECT a0, window_start, window_end, MIN(a1) as 
a1"
+                        + "  FROM TABLE("
+                        + "   TUMBLE(TABLE MyTable1, DESCRIPTOR(rowtime), 
INTERVAL '1' SECOND)) "
+                        + " GROUP BY a0, window_start, window_end) T1,"
+                        + " (SELECT a0, window_start, window_end,"
+                        + " MIN(a1) as a1, MIN(a2) as a2, MIN(metadata_0) as 
metadata_0"
+                        + " FROM TABLE("
+                        + "   TUMBLE(TABLE MyTable1, DESCRIPTOR(rowtime), 
INTERVAL '1' SECOND)) "
+                        + " GROUP BY a0, window_start, window_end) T2"
+                        + " WHERE T1.a1 = T2.a1";
+        util.verifyExecPlan(sqlQuery);
+    }
 }
diff --git 
a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/optimize/ScanReuseTest.xml
 
b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/optimize/ScanReuseTest.xml
index 3fca3c9ded3..345da2c2604 100644
--- 
a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/optimize/ScanReuseTest.xml
+++ 
b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/optimize/ScanReuseTest.xml
@@ -154,37 +154,6 @@ Calc(select=[a, c, c0])
    :  +- TableSourceScan(table=[[default_catalog, default_database, MyTable, 
project=[a, c], metadata=[]]], fields=[a, c])(reuse_id=[1])
    +- Exchange(distribution=[hash[a]])
       +- Reused(reference_id=[1])
-]]>
-    </Resource>
-  </TestCase>
-  <TestCase name="testProjectWithExpr[isStreaming: true]">
-    <Resource name="sql">
-      <![CDATA[SELECT T1.a, T1.b, T2.c FROM (SELECT a, b + 1 as b FROM 
MyTable) T1, MyTable T2 WHERE T1.a = T2.a]]>
-    </Resource>
-    <Resource name="ast">
-      <![CDATA[
-LogicalProject(a=[$0], b=[$1], c=[$4])
-+- LogicalFilter(condition=[=($0, $2)])
-   +- LogicalJoin(condition=[true], joinType=[inner])
-      :- LogicalProject(a=[$0], b=[+($1, 1)])
-      :  +- LogicalWatermarkAssigner(rowtime=[rtime], watermark=[-($7, 
5000:INTERVAL SECOND)])
-      :     +- LogicalProject(a=[$0], b=[$1], c=[$2], nested=[$3], 
metadata_1=[$4], compute_metadata=[*($4, 2)], metadata_2=[$5], 
rtime=[TO_TIMESTAMP($2, $3.s)])
-      :        +- LogicalTableScan(table=[[default_catalog, default_database, 
MyTable]])
-      +- LogicalWatermarkAssigner(rowtime=[rtime], watermark=[-($7, 
5000:INTERVAL SECOND)])
-         +- LogicalProject(a=[$0], b=[$1], c=[$2], nested=[$3], 
metadata_1=[$4], compute_metadata=[*($4, 2)], metadata_2=[$5], 
rtime=[TO_TIMESTAMP($2, $3.s)])
-            +- LogicalTableScan(table=[[default_catalog, default_database, 
MyTable]])
-]]>
-    </Resource>
-    <Resource name="optimized exec plan">
-      <![CDATA[
-Calc(select=[a, b, c])
-+- Join(joinType=[InnerJoin], where=[(a = a0)], select=[a, b, a0, c], 
leftInputSpec=[NoUniqueKey], rightInputSpec=[NoUniqueKey])
-   :- Exchange(distribution=[hash[a]])
-   :  +- Calc(select=[a, (b + 1) AS b])
-   :     +- TableSourceScan(table=[[default_catalog, default_database, 
MyTable, project=[a, b, c, nested_s], metadata=[], watermark=[-(TO_TIMESTAMP(c, 
nested_s), 5000:INTERVAL SECOND)], watermarkEmitStrategy=[on-periodic]]], 
fields=[a, b, c, nested_s])(reuse_id=[1])
-   +- Exchange(distribution=[hash[a]])
-      +- Calc(select=[a, c])
-         +- Reused(reference_id=[1])
 ]]>
     </Resource>
   </TestCase>
@@ -736,64 +705,65 @@ Calc(select=[a, b, c])
 ]]>
     </Resource>
   </TestCase>
-  <TestCase name="testProjectWithFilter[isStreaming: false]">
+  <TestCase name="testProjectWithExpr[isStreaming: true]">
     <Resource name="sql">
-      <![CDATA[SELECT T1.a, T1.b, T2.c FROM (SELECT * FROM MyTable WHERE b = 
2) T1, (SELECT * FROM MyTable WHERE b = 3) T2 WHERE T1.a = T2.a]]>
+      <![CDATA[SELECT T1.a, T1.b, T2.c FROM (SELECT a, b + 1 as b FROM 
MyTable) T1, MyTable T2 WHERE T1.a = T2.a]]>
     </Resource>
     <Resource name="ast">
       <![CDATA[
-LogicalProject(a=[$0], b=[$1], c=[$9])
-+- LogicalFilter(condition=[=($0, $7)])
+LogicalProject(a=[$0], b=[$1], c=[$4])
++- LogicalFilter(condition=[=($0, $2)])
    +- LogicalJoin(condition=[true], joinType=[inner])
-      :- LogicalProject(a=[$0], b=[$1], c=[$2], nested=[$3], metadata_1=[$4], 
compute_metadata=[$5], metadata_2=[$6])
-      :  +- LogicalFilter(condition=[=($1, 2)])
-      :     +- LogicalProject(a=[$0], b=[$1], c=[$2], nested=[$3], 
metadata_1=[$4], compute_metadata=[*($4, 2)], metadata_2=[$5])
+      :- LogicalProject(a=[$0], b=[+($1, 1)])
+      :  +- LogicalWatermarkAssigner(rowtime=[rtime], watermark=[-($7, 
5000:INTERVAL SECOND)])
+      :     +- LogicalProject(a=[$0], b=[$1], c=[$2], nested=[$3], 
metadata_1=[$4], compute_metadata=[*($4, 2)], metadata_2=[$5], 
rtime=[TO_TIMESTAMP($2, $3.s)])
       :        +- LogicalTableScan(table=[[default_catalog, default_database, 
MyTable]])
-      +- LogicalProject(a=[$0], b=[$1], c=[$2], nested=[$3], metadata_1=[$4], 
compute_metadata=[$5], metadata_2=[$6])
-         +- LogicalFilter(condition=[=($1, 3)])
-            +- LogicalProject(a=[$0], b=[$1], c=[$2], nested=[$3], 
metadata_1=[$4], compute_metadata=[*($4, 2)], metadata_2=[$5])
-               +- LogicalTableScan(table=[[default_catalog, default_database, 
MyTable]])
+      +- LogicalWatermarkAssigner(rowtime=[rtime], watermark=[-($7, 
5000:INTERVAL SECOND)])
+         +- LogicalProject(a=[$0], b=[$1], c=[$2], nested=[$3], 
metadata_1=[$4], compute_metadata=[*($4, 2)], metadata_2=[$5], 
rtime=[TO_TIMESTAMP($2, $3.s)])
+            +- LogicalTableScan(table=[[default_catalog, default_database, 
MyTable]])
 ]]>
     </Resource>
     <Resource name="optimized exec plan">
       <![CDATA[
 Calc(select=[a, b, c])
-+- HashJoin(joinType=[InnerJoin], where=[(a = a0)], select=[a, b, a0, c], 
build=[left])
++- Join(joinType=[InnerJoin], where=[(a = a0)], select=[a, b, a0, c], 
leftInputSpec=[NoUniqueKey], rightInputSpec=[NoUniqueKey])
    :- Exchange(distribution=[hash[a]])
-   :  +- Calc(select=[a, CAST(2 AS BIGINT) AS b], where=[(b = 2)])
-   :     +- TableSourceScan(table=[[default_catalog, default_database, 
MyTable, filter=[], project=[a, b, c], metadata=[]]], fields=[a, b, 
c])(reuse_id=[1])
+   :  +- Calc(select=[a, (b + 1) AS b])
+   :     +- TableSourceScan(table=[[default_catalog, default_database, 
MyTable, project=[a, b, c, nested_s], metadata=[], watermark=[-(TO_TIMESTAMP(c, 
nested_s), 5000:INTERVAL SECOND)], watermarkEmitStrategy=[on-periodic]]], 
fields=[a, b, c, nested_s])(reuse_id=[1])
    +- Exchange(distribution=[hash[a]])
-      +- Calc(select=[a, c], where=[(b = 3)])
+      +- Calc(select=[a, c])
          +- Reused(reference_id=[1])
 ]]>
     </Resource>
   </TestCase>
-  <TestCase name="testProjectWithMetaAndCompute[isStreaming: true]">
+  <TestCase name="testProjectWithFilter[isStreaming: false]">
     <Resource name="sql">
-      <![CDATA[SELECT T1.a, T1.b, T1.metadata_1, T1.compute_metadata, T2.c, 
T2.metadata_2 FROM MyTable T1, MyTable T2 WHERE T1.a = T2.a]]>
+      <![CDATA[SELECT T1.a, T1.b, T2.c FROM (SELECT * FROM MyTable WHERE b = 
2) T1, (SELECT * FROM MyTable WHERE b = 3) T2 WHERE T1.a = T2.a]]>
     </Resource>
     <Resource name="ast">
       <![CDATA[
-LogicalProject(a=[$0], b=[$1], metadata_1=[$4], compute_metadata=[$5], 
c=[$10], metadata_2=[$14])
-+- LogicalFilter(condition=[=($0, $8)])
+LogicalProject(a=[$0], b=[$1], c=[$9])
++- LogicalFilter(condition=[=($0, $7)])
    +- LogicalJoin(condition=[true], joinType=[inner])
-      :- LogicalWatermarkAssigner(rowtime=[rtime], watermark=[-($7, 
5000:INTERVAL SECOND)])
-      :  +- LogicalProject(a=[$0], b=[$1], c=[$2], nested=[$3], 
metadata_1=[$4], compute_metadata=[*($4, 2)], metadata_2=[$5], 
rtime=[TO_TIMESTAMP($2, $3.s)])
-      :     +- LogicalTableScan(table=[[default_catalog, default_database, 
MyTable]])
-      +- LogicalWatermarkAssigner(rowtime=[rtime], watermark=[-($7, 
5000:INTERVAL SECOND)])
-         +- LogicalProject(a=[$0], b=[$1], c=[$2], nested=[$3], 
metadata_1=[$4], compute_metadata=[*($4, 2)], metadata_2=[$5], 
rtime=[TO_TIMESTAMP($2, $3.s)])
-            +- LogicalTableScan(table=[[default_catalog, default_database, 
MyTable]])
+      :- LogicalProject(a=[$0], b=[$1], c=[$2], nested=[$3], metadata_1=[$4], 
compute_metadata=[$5], metadata_2=[$6])
+      :  +- LogicalFilter(condition=[=($1, 2)])
+      :     +- LogicalProject(a=[$0], b=[$1], c=[$2], nested=[$3], 
metadata_1=[$4], compute_metadata=[*($4, 2)], metadata_2=[$5])
+      :        +- LogicalTableScan(table=[[default_catalog, default_database, 
MyTable]])
+      +- LogicalProject(a=[$0], b=[$1], c=[$2], nested=[$3], metadata_1=[$4], 
compute_metadata=[$5], metadata_2=[$6])
+         +- LogicalFilter(condition=[=($1, 3)])
+            +- LogicalProject(a=[$0], b=[$1], c=[$2], nested=[$3], 
metadata_1=[$4], compute_metadata=[*($4, 2)], metadata_2=[$5])
+               +- LogicalTableScan(table=[[default_catalog, default_database, 
MyTable]])
 ]]>
     </Resource>
     <Resource name="optimized exec plan">
       <![CDATA[
-Calc(select=[a, b, metadata_1, compute_metadata, c, metadata_2])
-+- Join(joinType=[InnerJoin], where=[(a = a0)], select=[a, b, metadata_1, 
compute_metadata, a0, c, metadata_2], leftInputSpec=[NoUniqueKey], 
rightInputSpec=[NoUniqueKey])
+Calc(select=[a, b, c])
++- HashJoin(joinType=[InnerJoin], where=[(a = a0)], select=[a, b, a0, c], 
build=[left])
    :- Exchange(distribution=[hash[a]])
-   :  +- Calc(select=[a, b, metadata_1, (metadata_1 * 2) AS compute_metadata])
-   :     +- TableSourceScan(table=[[default_catalog, default_database, 
MyTable, project=[a, b, c, nested_s, metadata_1, metadata_2], metadata=[], 
watermark=[-(TO_TIMESTAMP(c, nested_s), 5000:INTERVAL SECOND)], 
watermarkEmitStrategy=[on-periodic]]], fields=[a, b, c, nested_s, metadata_1, 
metadata_2])(reuse_id=[1])
+   :  +- Calc(select=[a, CAST(2 AS BIGINT) AS b], where=[(b = 2)])
+   :     +- TableSourceScan(table=[[default_catalog, default_database, 
MyTable, filter=[], project=[a, b, c], metadata=[]]], fields=[a, b, 
c])(reuse_id=[1])
    +- Exchange(distribution=[hash[a]])
-      +- Calc(select=[a, c, metadata_2])
+      +- Calc(select=[a, c], where=[(b = 3)])
          +- Reused(reference_id=[1])
 ]]>
     </Resource>
@@ -1226,6 +1196,36 @@ Calc(select=[a, b, metadata_1, compute_metadata, c, 
metadata_2])
    +- Exchange(distribution=[hash[a]])
       +- Calc(select=[a, b, metadata_1])
          +- Reused(reference_id=[1])
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="testProjectWithMetaAndCompute[isStreaming: true]">
+    <Resource name="sql">
+      <![CDATA[SELECT T1.a, T1.b, T1.metadata_1, T1.compute_metadata, T2.c, 
T2.metadata_2 FROM MyTable T1, MyTable T2 WHERE T1.a = T2.a]]>
+    </Resource>
+    <Resource name="ast">
+      <![CDATA[
+LogicalProject(a=[$0], b=[$1], metadata_1=[$4], compute_metadata=[$5], 
c=[$10], metadata_2=[$14])
++- LogicalFilter(condition=[=($0, $8)])
+   +- LogicalJoin(condition=[true], joinType=[inner])
+      :- LogicalWatermarkAssigner(rowtime=[rtime], watermark=[-($7, 
5000:INTERVAL SECOND)])
+      :  +- LogicalProject(a=[$0], b=[$1], c=[$2], nested=[$3], 
metadata_1=[$4], compute_metadata=[*($4, 2)], metadata_2=[$5], 
rtime=[TO_TIMESTAMP($2, $3.s)])
+      :     +- LogicalTableScan(table=[[default_catalog, default_database, 
MyTable]])
+      +- LogicalWatermarkAssigner(rowtime=[rtime], watermark=[-($7, 
5000:INTERVAL SECOND)])
+         +- LogicalProject(a=[$0], b=[$1], c=[$2], nested=[$3], 
metadata_1=[$4], compute_metadata=[*($4, 2)], metadata_2=[$5], 
rtime=[TO_TIMESTAMP($2, $3.s)])
+            +- LogicalTableScan(table=[[default_catalog, default_database, 
MyTable]])
+]]>
+    </Resource>
+    <Resource name="optimized exec plan">
+      <![CDATA[
+Calc(select=[a, b, metadata_1, compute_metadata, c, metadata_2])
++- Join(joinType=[InnerJoin], where=[(a = a0)], select=[a, b, metadata_1, 
compute_metadata, a0, c, metadata_2], leftInputSpec=[NoUniqueKey], 
rightInputSpec=[NoUniqueKey])
+   :- Exchange(distribution=[hash[a]])
+   :  +- Calc(select=[a, b, metadata_1, (metadata_1 * 2) AS compute_metadata])
+   :     +- TableSourceScan(table=[[default_catalog, default_database, 
MyTable, project=[a, b, c, nested_s, metadata_1, metadata_2], metadata=[], 
watermark=[-(TO_TIMESTAMP(c, nested_s), 5000:INTERVAL SECOND)], 
watermarkEmitStrategy=[on-periodic]]], fields=[a, b, c, nested_s, metadata_1, 
metadata_2])(reuse_id=[1])
+   +- Exchange(distribution=[hash[a]])
+      +- Calc(select=[a, c, metadata_2])
+         +- Reused(reference_id=[1])
 ]]>
     </Resource>
   </TestCase>
@@ -1256,6 +1256,98 @@ Calc(select=[a, c, c0])
    :  +- TableSourceScan(table=[[default_catalog, default_database, MyTable, 
partitions=[{c=1}], project=[a], metadata=[]]], fields=[a], hints=[[[OPTIONS 
options:{partition-list=c:1;c:2}]]])
    +- Exchange(distribution=[hash[a]])
       +- TableSourceScan(table=[[default_catalog, default_database, MyTable, 
partitions=[{c=2}], project=[a], metadata=[]]], fields=[a], hints=[[[OPTIONS 
options:{partition-list=c:1;c:2}]]])
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="testReuseWithReadMetadataAndWatermarkPushDown1[isStreaming: 
true]">
+    <Resource name="sql">
+      <![CDATA[SELECT T1.a1, T1.a2 FROM (SELECT a0, window_start, window_end, 
MIN(a1) as a1, MIN(a2) as a2, MIN(metadata_0) as metadata_0 FROM TABLE(   
TUMBLE(TABLE MyTable1, DESCRIPTOR(rowtime), INTERVAL '1' SECOND))  GROUP BY a0, 
window_start, window_end) T1, (SELECT a0, window_start, window_end, MIN(a1) as 
a1  FROM TABLE(   TUMBLE(TABLE MyTable1, DESCRIPTOR(rowtime), INTERVAL '1' 
SECOND))  GROUP BY a0, window_start, window_end) T2 WHERE T1.a1 = T2.a1]]>
+    </Resource>
+    <Resource name="ast">
+      <![CDATA[
+LogicalProject(a1=[$3], a2=[$4])
++- LogicalFilter(condition=[=($3, $9)])
+   +- LogicalJoin(condition=[true], joinType=[inner])
+      :- LogicalAggregate(group=[{0, 1, 2}], a1=[MIN($3)], a2=[MIN($4)], 
metadata_0=[MIN($5)])
+      :  +- LogicalProject(a0=[$1], window_start=[$6], window_end=[$7], 
a1=[$2], a2=[$3], metadata_0=[$0])
+      :     +- LogicalTableFunctionScan(invocation=[TUMBLE($5, DESCRIPTOR($5), 
1000:INTERVAL SECOND)], rowType=[RecordType(INTEGER metadata_0, INTEGER a0, 
INTEGER a1, INTEGER a2, VARCHAR(2147483647) ts, TIMESTAMP(3) *ROWTIME* rowtime, 
TIMESTAMP(3) window_start, TIMESTAMP(3) window_end, TIMESTAMP(3) *ROWTIME* 
window_time)])
+      :        +- LogicalProject(metadata_0=[$0], a0=[$1], a1=[$2], a2=[$3], 
ts=[$4], rowtime=[$5])
+      :           +- LogicalWatermarkAssigner(rowtime=[rowtime], 
watermark=[-($5, 1000:INTERVAL SECOND)])
+      :              +- LogicalProject(metadata_0=[$4], a0=[$0], a1=[$1], 
a2=[$2], ts=[$3], rowtime=[TO_TIMESTAMP($3)])
+      :                 +- LogicalTableScan(table=[[default_catalog, 
default_database, MyTable1]])
+      +- LogicalAggregate(group=[{0, 1, 2}], a1=[MIN($3)])
+         +- LogicalProject(a0=[$1], window_start=[$6], window_end=[$7], 
a1=[$2])
+            +- LogicalTableFunctionScan(invocation=[TUMBLE($5, DESCRIPTOR($5), 
1000:INTERVAL SECOND)], rowType=[RecordType(INTEGER metadata_0, INTEGER a0, 
INTEGER a1, INTEGER a2, VARCHAR(2147483647) ts, TIMESTAMP(3) *ROWTIME* rowtime, 
TIMESTAMP(3) window_start, TIMESTAMP(3) window_end, TIMESTAMP(3) *ROWTIME* 
window_time)])
+               +- LogicalProject(metadata_0=[$0], a0=[$1], a1=[$2], a2=[$3], 
ts=[$4], rowtime=[$5])
+                  +- LogicalWatermarkAssigner(rowtime=[rowtime], 
watermark=[-($5, 1000:INTERVAL SECOND)])
+                     +- LogicalProject(metadata_0=[$4], a0=[$0], a1=[$1], 
a2=[$2], ts=[$3], rowtime=[TO_TIMESTAMP($3)])
+                        +- LogicalTableScan(table=[[default_catalog, 
default_database, MyTable1]])
+]]>
+    </Resource>
+    <Resource name="optimized exec plan">
+      <![CDATA[
+Calc(select=[a1, a2])
++- Join(joinType=[InnerJoin], where=[(a1 = a10)], select=[a1, a2, a10], 
leftInputSpec=[NoUniqueKey], rightInputSpec=[NoUniqueKey])
+   :- Exchange(distribution=[hash[a1]])
+   :  +- Calc(select=[a1, a2])
+   :     +- GlobalWindowAggregate(groupBy=[a0], 
window=[TUMBLE(slice_end=[$slice_end], size=[1 s])], select=[a0, MIN(min$0) AS 
a1, MIN(min$1) AS a2, start('w$) AS window_start, end('w$) AS window_end])
+   :        +- Exchange(distribution=[hash[a0]])
+   :           +- LocalWindowAggregate(groupBy=[a0], 
window=[TUMBLE(time_col=[rowtime], size=[1 s])], select=[a0, MIN(a1) AS min$0, 
MIN(a2) AS min$1, slice_end('w$) AS $slice_end])
+   :              +- Calc(select=[a0, a1, a2, metadata_0, 
Reinterpret(TO_TIMESTAMP(ts)) AS rowtime])
+   :                 +- TableSourceScan(table=[[default_catalog, 
default_database, MyTable1, project=[a0, a1, a2, ts, metadata_0], 
metadata=[metadata_0], watermark=[-(TO_TIMESTAMP(ts), 1000:INTERVAL SECOND)], 
watermarkEmitStrategy=[on-periodic]]], fields=[a0, a1, a2, ts, 
metadata_0])(reuse_id=[1])
+   +- Exchange(distribution=[hash[a1]])
+      +- Calc(select=[a1])
+         +- GlobalWindowAggregate(groupBy=[a0], 
window=[TUMBLE(slice_end=[$slice_end], size=[1 s])], select=[a0, MIN(min$0) AS 
a1, start('w$) AS window_start, end('w$) AS window_end])
+            +- Exchange(distribution=[hash[a0]])
+               +- LocalWindowAggregate(groupBy=[a0], 
window=[TUMBLE(time_col=[rowtime], size=[1 s])], select=[a0, MIN(a1) AS min$0, 
slice_end('w$) AS $slice_end])
+                  +- Calc(select=[a0, a1, Reinterpret(TO_TIMESTAMP(ts)) AS 
rowtime])
+                     +- Reused(reference_id=[1])
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="testReuseWithReadMetadataAndWatermarkPushDown2[isStreaming: 
true]">
+    <Resource name="sql">
+      <![CDATA[SELECT T1.a1, T2.a2 FROM (SELECT a0, window_start, window_end, 
MIN(a1) as a1  FROM TABLE(   TUMBLE(TABLE MyTable1, DESCRIPTOR(rowtime), 
INTERVAL '1' SECOND))  GROUP BY a0, window_start, window_end) T1, (SELECT a0, 
window_start, window_end, MIN(a1) as a1, MIN(a2) as a2, MIN(metadata_0) as 
metadata_0 FROM TABLE(   TUMBLE(TABLE MyTable1, DESCRIPTOR(rowtime), INTERVAL 
'1' SECOND))  GROUP BY a0, window_start, window_end) T2 WHERE T1.a1 = T2.a1]]>
+    </Resource>
+    <Resource name="ast">
+      <![CDATA[
+LogicalProject(a1=[$3], a2=[$8])
++- LogicalFilter(condition=[=($3, $7)])
+   +- LogicalJoin(condition=[true], joinType=[inner])
+      :- LogicalAggregate(group=[{0, 1, 2}], a1=[MIN($3)])
+      :  +- LogicalProject(a0=[$1], window_start=[$6], window_end=[$7], 
a1=[$2])
+      :     +- LogicalTableFunctionScan(invocation=[TUMBLE($5, DESCRIPTOR($5), 
1000:INTERVAL SECOND)], rowType=[RecordType(INTEGER metadata_0, INTEGER a0, 
INTEGER a1, INTEGER a2, VARCHAR(2147483647) ts, TIMESTAMP(3) *ROWTIME* rowtime, 
TIMESTAMP(3) window_start, TIMESTAMP(3) window_end, TIMESTAMP(3) *ROWTIME* 
window_time)])
+      :        +- LogicalProject(metadata_0=[$0], a0=[$1], a1=[$2], a2=[$3], 
ts=[$4], rowtime=[$5])
+      :           +- LogicalWatermarkAssigner(rowtime=[rowtime], 
watermark=[-($5, 1000:INTERVAL SECOND)])
+      :              +- LogicalProject(metadata_0=[$4], a0=[$0], a1=[$1], 
a2=[$2], ts=[$3], rowtime=[TO_TIMESTAMP($3)])
+      :                 +- LogicalTableScan(table=[[default_catalog, 
default_database, MyTable1]])
+      +- LogicalAggregate(group=[{0, 1, 2}], a1=[MIN($3)], a2=[MIN($4)], 
metadata_0=[MIN($5)])
+         +- LogicalProject(a0=[$1], window_start=[$6], window_end=[$7], 
a1=[$2], a2=[$3], metadata_0=[$0])
+            +- LogicalTableFunctionScan(invocation=[TUMBLE($5, DESCRIPTOR($5), 
1000:INTERVAL SECOND)], rowType=[RecordType(INTEGER metadata_0, INTEGER a0, 
INTEGER a1, INTEGER a2, VARCHAR(2147483647) ts, TIMESTAMP(3) *ROWTIME* rowtime, 
TIMESTAMP(3) window_start, TIMESTAMP(3) window_end, TIMESTAMP(3) *ROWTIME* 
window_time)])
+               +- LogicalProject(metadata_0=[$0], a0=[$1], a1=[$2], a2=[$3], 
ts=[$4], rowtime=[$5])
+                  +- LogicalWatermarkAssigner(rowtime=[rowtime], 
watermark=[-($5, 1000:INTERVAL SECOND)])
+                     +- LogicalProject(metadata_0=[$4], a0=[$0], a1=[$1], 
a2=[$2], ts=[$3], rowtime=[TO_TIMESTAMP($3)])
+                        +- LogicalTableScan(table=[[default_catalog, 
default_database, MyTable1]])
+]]>
+    </Resource>
+    <Resource name="optimized exec plan">
+      <![CDATA[
+Calc(select=[a1, a2])
++- Join(joinType=[InnerJoin], where=[(a1 = a10)], select=[a1, a10, a2], 
leftInputSpec=[NoUniqueKey], rightInputSpec=[NoUniqueKey])
+   :- Exchange(distribution=[hash[a1]])
+   :  +- Calc(select=[a1])
+   :     +- GlobalWindowAggregate(groupBy=[a0], 
window=[TUMBLE(slice_end=[$slice_end], size=[1 s])], select=[a0, MIN(min$0) AS 
a1, start('w$) AS window_start, end('w$) AS window_end])
+   :        +- Exchange(distribution=[hash[a0]])
+   :           +- LocalWindowAggregate(groupBy=[a0], 
window=[TUMBLE(time_col=[rowtime], size=[1 s])], select=[a0, MIN(a1) AS min$0, 
slice_end('w$) AS $slice_end])
+   :              +- Calc(select=[a0, a1, Reinterpret(TO_TIMESTAMP(ts)) AS 
rowtime])
+   :                 +- TableSourceScan(table=[[default_catalog, 
default_database, MyTable1, project=[a0, a1, a2, ts, metadata_0], 
metadata=[metadata_0], watermark=[-(TO_TIMESTAMP(ts), 1000:INTERVAL SECOND)], 
watermarkEmitStrategy=[on-periodic]]], fields=[a0, a1, a2, ts, 
metadata_0])(reuse_id=[1])
+   +- Exchange(distribution=[hash[a1]])
+      +- Calc(select=[a1, a2])
+         +- GlobalWindowAggregate(groupBy=[a0], 
window=[TUMBLE(slice_end=[$slice_end], size=[1 s])], select=[a0, MIN(min$0) AS 
a1, MIN(min$1) AS a2, start('w$) AS window_start, end('w$) AS window_end])
+            +- Exchange(distribution=[hash[a0]])
+               +- LocalWindowAggregate(groupBy=[a0], 
window=[TUMBLE(time_col=[rowtime], size=[1 s])], select=[a0, MIN(a1) AS min$0, 
MIN(a2) AS min$1, slice_end('w$) AS $slice_end])
+                  +- Calc(select=[a0, a1, a2, metadata_0, 
Reinterpret(TO_TIMESTAMP(ts)) AS rowtime])
+                     +- Reused(reference_id=[1])
 ]]>
     </Resource>
   </TestCase>

Reply via email to