This is an automated email from the ASF dual-hosted git repository. shengkai pushed a commit to branch release-1.18 in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/release-1.18 by this push: new f66679bfe5f [FLINK-32952][table-planner] Fix scan reuse with readable metadata and watermark push down get wrong watermark error (#23338) f66679bfe5f is described below commit f66679bfe5f5b344eec71a7579504762cc3c04ae Author: yunhong <337361...@qq.com> AuthorDate: Tue Sep 12 11:07:23 2023 +0800 [FLINK-32952][table-planner] Fix scan reuse with readable metadata and watermark push down get wrong watermark error (#23338) Co-authored-by: zhengyunhong.zyh <zhengyunhong....@alibaba-inc.com> --- .../flink/table/planner/plan/reuse/ScanReuser.java | 16 +- .../table/planner/plan/reuse/ScanReuserUtils.java | 12 +- .../table/planner/plan/optimize/ScanReuseTest.java | 76 ++++++++ .../table/planner/plan/optimize/ScanReuseTest.xml | 214 +++++++++++++++------ 4 files changed, 243 insertions(+), 75 deletions(-) diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/reuse/ScanReuser.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/reuse/ScanReuser.java index 1ef3bf2f2b1..9128e110346 100644 --- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/reuse/ScanReuser.java +++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/reuse/ScanReuser.java @@ -170,12 +170,18 @@ public class ScanReuser { // 2. Create new source. List<SourceAbilitySpec> specs = abilitySpecsWithoutEscaped(pickTable); - // 2.1 Apply projections - List<SourceAbilitySpec> newSpecs = new ArrayList<>(); + // 2.1 Create produced type. + // The source produced type is the input type into the runtime. The format looks as: + // PHYSICAL COLUMNS + METADATA COLUMNS. While re-compute the source ability specs with + // source metadata, we need to distinguish between schema type and produced type, which + // source ability specs use produced type instead of schema type. RowType originType = DynamicSourceUtils.createProducedType( pickTable.contextResolvedTable().getResolvedSchema(), pickTable.tableSource()); + + // 2.2 Apply projections + List<SourceAbilitySpec> newSpecs = new ArrayList<>(); RowType newSourceType = applyPhysicalAndMetadataPushDown( pickTable.tableSource(), @@ -190,15 +196,15 @@ public class ScanReuser { allMetaKeys); specs.addAll(newSpecs); - // 2.2 Watermark spec + // 2.3 Watermark spec Optional<WatermarkPushDownSpec> watermarkSpec = - getAdjustedWatermarkSpec(pickTable, newSourceType); + getAdjustedWatermarkSpec(pickTable, originType, newSourceType); if (watermarkSpec.isPresent()) { specs.add(watermarkSpec.get()); newSourceType = watermarkSpec.get().getProducedType().get(); } - // 2.3 Create a new ScanTableSource. ScanTableSource can not be pushed down twice. + // 2.4 Create a new ScanTableSource. ScanTableSource can not be pushed down twice. DynamicTableSourceSpec tableSourceSpec = new DynamicTableSourceSpec(pickTable.contextResolvedTable(), specs); ScanTableSource newTableSource = diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/reuse/ScanReuserUtils.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/reuse/ScanReuserUtils.java index 5c79cd4cd25..43a00d720a2 100644 --- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/reuse/ScanReuserUtils.java +++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/reuse/ScanReuserUtils.java @@ -170,24 +170,18 @@ public class ScanReuserUtils { /** Watermark push down must be after projection push down, so we need to adjust its index. */ public static Optional<WatermarkPushDownSpec> getAdjustedWatermarkSpec( - TableSourceTable table, RowType newSourceType) { - RowType producedType = - (RowType) - table.contextResolvedTable() - .getResolvedSchema() - .toSourceRowDataType() - .getLogicalType(); + TableSourceTable table, RowType oldSourceType, RowType newSourceType) { for (SourceAbilitySpec spec : table.abilitySpecs()) { if (spec instanceof WatermarkPushDownSpec) { return Optional.of( adjustWatermarkIndex( table.contextResolvedTable().getResolvedSchema(), - producedType, + oldSourceType, newSourceType, (WatermarkPushDownSpec) spec)); } if (spec.getProducedType().isPresent()) { - producedType = spec.getProducedType().get(); + oldSourceType = spec.getProducedType().get(); } } return Optional.empty(); diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/optimize/ScanReuseTest.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/optimize/ScanReuseTest.java index a0a2c9e480d..7016e15d15e 100644 --- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/optimize/ScanReuseTest.java +++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/optimize/ScanReuseTest.java @@ -30,6 +30,8 @@ import org.junit.runners.Parameterized; import java.util.Arrays; import java.util.Collection; +import static org.junit.Assume.assumeTrue; + /** Test push project into source with sub plan reuse. */ @RunWith(Parameterized.class) public class ScanReuseTest extends TableTestBase { @@ -324,4 +326,78 @@ public class ScanReuseTest extends TableTestBase { util.verifyExecPlan(sqlQuery); } } + + @Test + public void testReuseWithReadMetadataAndWatermarkPushDown1() { + assumeTrue(isStreaming); + String ddl = + "CREATE TABLE MyTable1 (\n" + + " metadata_0 int METADATA VIRTUAL,\n" + + " a0 int,\n" + + " a1 int,\n" + + " a2 int,\n" + + " ts STRING,\n " + + " rowtime as TO_TIMESTAMP(`ts`),\n" + + " WATERMARK FOR rowtime AS rowtime - INTERVAL '1' SECOND\n" + + ") WITH (\n" + + " 'connector' = 'values',\n" + + " 'bounded' = 'false',\n" + + " 'readable-metadata' = 'metadata_0:int',\n" + + " 'enable-watermark-push-down' = 'true',\n" + + " 'disable-lookup' = 'true'" + + ")"; + util.tableEnv().executeSql(ddl); + + // join left side value source without projection spec. + String sqlQuery = + "SELECT T1.a1, T1.a2 FROM" + + " (SELECT a0, window_start, window_end," + + " MIN(a1) as a1, MIN(a2) as a2, MIN(metadata_0) as metadata_0" + + " FROM TABLE(" + + " TUMBLE(TABLE MyTable1, DESCRIPTOR(rowtime), INTERVAL '1' SECOND)) " + + " GROUP BY a0, window_start, window_end) T1," + + " (SELECT a0, window_start, window_end, MIN(a1) as a1" + + " FROM TABLE(" + + " TUMBLE(TABLE MyTable1, DESCRIPTOR(rowtime), INTERVAL '1' SECOND)) " + + " GROUP BY a0, window_start, window_end) T2" + + " WHERE T1.a1 = T2.a1"; + util.verifyExecPlan(sqlQuery); + } + + @Test + public void testReuseWithReadMetadataAndWatermarkPushDown2() { + assumeTrue(isStreaming); + String ddl = + "CREATE TABLE MyTable1 (\n" + + " metadata_0 int METADATA VIRTUAL,\n" + + " a0 int,\n" + + " a1 int,\n" + + " a2 int,\n" + + " ts STRING,\n " + + " rowtime as TO_TIMESTAMP(`ts`),\n" + + " WATERMARK FOR rowtime AS rowtime - INTERVAL '1' SECOND\n" + + ") WITH (\n" + + " 'connector' = 'values',\n" + + " 'bounded' = 'false',\n" + + " 'readable-metadata' = 'metadata_0:int',\n" + + " 'enable-watermark-push-down' = 'true',\n" + + " 'disable-lookup' = 'true'" + + ")"; + util.tableEnv().executeSql(ddl); + + // join right side value source without projection spec. + String sqlQuery = + "SELECT T1.a1, T2.a2 FROM" + + " (SELECT a0, window_start, window_end, MIN(a1) as a1" + + " FROM TABLE(" + + " TUMBLE(TABLE MyTable1, DESCRIPTOR(rowtime), INTERVAL '1' SECOND)) " + + " GROUP BY a0, window_start, window_end) T1," + + " (SELECT a0, window_start, window_end," + + " MIN(a1) as a1, MIN(a2) as a2, MIN(metadata_0) as metadata_0" + + " FROM TABLE(" + + " TUMBLE(TABLE MyTable1, DESCRIPTOR(rowtime), INTERVAL '1' SECOND)) " + + " GROUP BY a0, window_start, window_end) T2" + + " WHERE T1.a1 = T2.a1"; + util.verifyExecPlan(sqlQuery); + } } diff --git a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/optimize/ScanReuseTest.xml b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/optimize/ScanReuseTest.xml index 3fca3c9ded3..345da2c2604 100644 --- a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/optimize/ScanReuseTest.xml +++ b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/optimize/ScanReuseTest.xml @@ -154,37 +154,6 @@ Calc(select=[a, c, c0]) : +- TableSourceScan(table=[[default_catalog, default_database, MyTable, project=[a, c], metadata=[]]], fields=[a, c])(reuse_id=[1]) +- Exchange(distribution=[hash[a]]) +- Reused(reference_id=[1]) -]]> - </Resource> - </TestCase> - <TestCase name="testProjectWithExpr[isStreaming: true]"> - <Resource name="sql"> - <![CDATA[SELECT T1.a, T1.b, T2.c FROM (SELECT a, b + 1 as b FROM MyTable) T1, MyTable T2 WHERE T1.a = T2.a]]> - </Resource> - <Resource name="ast"> - <![CDATA[ -LogicalProject(a=[$0], b=[$1], c=[$4]) -+- LogicalFilter(condition=[=($0, $2)]) - +- LogicalJoin(condition=[true], joinType=[inner]) - :- LogicalProject(a=[$0], b=[+($1, 1)]) - : +- LogicalWatermarkAssigner(rowtime=[rtime], watermark=[-($7, 5000:INTERVAL SECOND)]) - : +- LogicalProject(a=[$0], b=[$1], c=[$2], nested=[$3], metadata_1=[$4], compute_metadata=[*($4, 2)], metadata_2=[$5], rtime=[TO_TIMESTAMP($2, $3.s)]) - : +- LogicalTableScan(table=[[default_catalog, default_database, MyTable]]) - +- LogicalWatermarkAssigner(rowtime=[rtime], watermark=[-($7, 5000:INTERVAL SECOND)]) - +- LogicalProject(a=[$0], b=[$1], c=[$2], nested=[$3], metadata_1=[$4], compute_metadata=[*($4, 2)], metadata_2=[$5], rtime=[TO_TIMESTAMP($2, $3.s)]) - +- LogicalTableScan(table=[[default_catalog, default_database, MyTable]]) -]]> - </Resource> - <Resource name="optimized exec plan"> - <![CDATA[ -Calc(select=[a, b, c]) -+- Join(joinType=[InnerJoin], where=[(a = a0)], select=[a, b, a0, c], leftInputSpec=[NoUniqueKey], rightInputSpec=[NoUniqueKey]) - :- Exchange(distribution=[hash[a]]) - : +- Calc(select=[a, (b + 1) AS b]) - : +- TableSourceScan(table=[[default_catalog, default_database, MyTable, project=[a, b, c, nested_s], metadata=[], watermark=[-(TO_TIMESTAMP(c, nested_s), 5000:INTERVAL SECOND)], watermarkEmitStrategy=[on-periodic]]], fields=[a, b, c, nested_s])(reuse_id=[1]) - +- Exchange(distribution=[hash[a]]) - +- Calc(select=[a, c]) - +- Reused(reference_id=[1]) ]]> </Resource> </TestCase> @@ -736,64 +705,65 @@ Calc(select=[a, b, c]) ]]> </Resource> </TestCase> - <TestCase name="testProjectWithFilter[isStreaming: false]"> + <TestCase name="testProjectWithExpr[isStreaming: true]"> <Resource name="sql"> - <![CDATA[SELECT T1.a, T1.b, T2.c FROM (SELECT * FROM MyTable WHERE b = 2) T1, (SELECT * FROM MyTable WHERE b = 3) T2 WHERE T1.a = T2.a]]> + <![CDATA[SELECT T1.a, T1.b, T2.c FROM (SELECT a, b + 1 as b FROM MyTable) T1, MyTable T2 WHERE T1.a = T2.a]]> </Resource> <Resource name="ast"> <![CDATA[ -LogicalProject(a=[$0], b=[$1], c=[$9]) -+- LogicalFilter(condition=[=($0, $7)]) +LogicalProject(a=[$0], b=[$1], c=[$4]) ++- LogicalFilter(condition=[=($0, $2)]) +- LogicalJoin(condition=[true], joinType=[inner]) - :- LogicalProject(a=[$0], b=[$1], c=[$2], nested=[$3], metadata_1=[$4], compute_metadata=[$5], metadata_2=[$6]) - : +- LogicalFilter(condition=[=($1, 2)]) - : +- LogicalProject(a=[$0], b=[$1], c=[$2], nested=[$3], metadata_1=[$4], compute_metadata=[*($4, 2)], metadata_2=[$5]) + :- LogicalProject(a=[$0], b=[+($1, 1)]) + : +- LogicalWatermarkAssigner(rowtime=[rtime], watermark=[-($7, 5000:INTERVAL SECOND)]) + : +- LogicalProject(a=[$0], b=[$1], c=[$2], nested=[$3], metadata_1=[$4], compute_metadata=[*($4, 2)], metadata_2=[$5], rtime=[TO_TIMESTAMP($2, $3.s)]) : +- LogicalTableScan(table=[[default_catalog, default_database, MyTable]]) - +- LogicalProject(a=[$0], b=[$1], c=[$2], nested=[$3], metadata_1=[$4], compute_metadata=[$5], metadata_2=[$6]) - +- LogicalFilter(condition=[=($1, 3)]) - +- LogicalProject(a=[$0], b=[$1], c=[$2], nested=[$3], metadata_1=[$4], compute_metadata=[*($4, 2)], metadata_2=[$5]) - +- LogicalTableScan(table=[[default_catalog, default_database, MyTable]]) + +- LogicalWatermarkAssigner(rowtime=[rtime], watermark=[-($7, 5000:INTERVAL SECOND)]) + +- LogicalProject(a=[$0], b=[$1], c=[$2], nested=[$3], metadata_1=[$4], compute_metadata=[*($4, 2)], metadata_2=[$5], rtime=[TO_TIMESTAMP($2, $3.s)]) + +- LogicalTableScan(table=[[default_catalog, default_database, MyTable]]) ]]> </Resource> <Resource name="optimized exec plan"> <![CDATA[ Calc(select=[a, b, c]) -+- HashJoin(joinType=[InnerJoin], where=[(a = a0)], select=[a, b, a0, c], build=[left]) ++- Join(joinType=[InnerJoin], where=[(a = a0)], select=[a, b, a0, c], leftInputSpec=[NoUniqueKey], rightInputSpec=[NoUniqueKey]) :- Exchange(distribution=[hash[a]]) - : +- Calc(select=[a, CAST(2 AS BIGINT) AS b], where=[(b = 2)]) - : +- TableSourceScan(table=[[default_catalog, default_database, MyTable, filter=[], project=[a, b, c], metadata=[]]], fields=[a, b, c])(reuse_id=[1]) + : +- Calc(select=[a, (b + 1) AS b]) + : +- TableSourceScan(table=[[default_catalog, default_database, MyTable, project=[a, b, c, nested_s], metadata=[], watermark=[-(TO_TIMESTAMP(c, nested_s), 5000:INTERVAL SECOND)], watermarkEmitStrategy=[on-periodic]]], fields=[a, b, c, nested_s])(reuse_id=[1]) +- Exchange(distribution=[hash[a]]) - +- Calc(select=[a, c], where=[(b = 3)]) + +- Calc(select=[a, c]) +- Reused(reference_id=[1]) ]]> </Resource> </TestCase> - <TestCase name="testProjectWithMetaAndCompute[isStreaming: true]"> + <TestCase name="testProjectWithFilter[isStreaming: false]"> <Resource name="sql"> - <![CDATA[SELECT T1.a, T1.b, T1.metadata_1, T1.compute_metadata, T2.c, T2.metadata_2 FROM MyTable T1, MyTable T2 WHERE T1.a = T2.a]]> + <![CDATA[SELECT T1.a, T1.b, T2.c FROM (SELECT * FROM MyTable WHERE b = 2) T1, (SELECT * FROM MyTable WHERE b = 3) T2 WHERE T1.a = T2.a]]> </Resource> <Resource name="ast"> <![CDATA[ -LogicalProject(a=[$0], b=[$1], metadata_1=[$4], compute_metadata=[$5], c=[$10], metadata_2=[$14]) -+- LogicalFilter(condition=[=($0, $8)]) +LogicalProject(a=[$0], b=[$1], c=[$9]) ++- LogicalFilter(condition=[=($0, $7)]) +- LogicalJoin(condition=[true], joinType=[inner]) - :- LogicalWatermarkAssigner(rowtime=[rtime], watermark=[-($7, 5000:INTERVAL SECOND)]) - : +- LogicalProject(a=[$0], b=[$1], c=[$2], nested=[$3], metadata_1=[$4], compute_metadata=[*($4, 2)], metadata_2=[$5], rtime=[TO_TIMESTAMP($2, $3.s)]) - : +- LogicalTableScan(table=[[default_catalog, default_database, MyTable]]) - +- LogicalWatermarkAssigner(rowtime=[rtime], watermark=[-($7, 5000:INTERVAL SECOND)]) - +- LogicalProject(a=[$0], b=[$1], c=[$2], nested=[$3], metadata_1=[$4], compute_metadata=[*($4, 2)], metadata_2=[$5], rtime=[TO_TIMESTAMP($2, $3.s)]) - +- LogicalTableScan(table=[[default_catalog, default_database, MyTable]]) + :- LogicalProject(a=[$0], b=[$1], c=[$2], nested=[$3], metadata_1=[$4], compute_metadata=[$5], metadata_2=[$6]) + : +- LogicalFilter(condition=[=($1, 2)]) + : +- LogicalProject(a=[$0], b=[$1], c=[$2], nested=[$3], metadata_1=[$4], compute_metadata=[*($4, 2)], metadata_2=[$5]) + : +- LogicalTableScan(table=[[default_catalog, default_database, MyTable]]) + +- LogicalProject(a=[$0], b=[$1], c=[$2], nested=[$3], metadata_1=[$4], compute_metadata=[$5], metadata_2=[$6]) + +- LogicalFilter(condition=[=($1, 3)]) + +- LogicalProject(a=[$0], b=[$1], c=[$2], nested=[$3], metadata_1=[$4], compute_metadata=[*($4, 2)], metadata_2=[$5]) + +- LogicalTableScan(table=[[default_catalog, default_database, MyTable]]) ]]> </Resource> <Resource name="optimized exec plan"> <![CDATA[ -Calc(select=[a, b, metadata_1, compute_metadata, c, metadata_2]) -+- Join(joinType=[InnerJoin], where=[(a = a0)], select=[a, b, metadata_1, compute_metadata, a0, c, metadata_2], leftInputSpec=[NoUniqueKey], rightInputSpec=[NoUniqueKey]) +Calc(select=[a, b, c]) ++- HashJoin(joinType=[InnerJoin], where=[(a = a0)], select=[a, b, a0, c], build=[left]) :- Exchange(distribution=[hash[a]]) - : +- Calc(select=[a, b, metadata_1, (metadata_1 * 2) AS compute_metadata]) - : +- TableSourceScan(table=[[default_catalog, default_database, MyTable, project=[a, b, c, nested_s, metadata_1, metadata_2], metadata=[], watermark=[-(TO_TIMESTAMP(c, nested_s), 5000:INTERVAL SECOND)], watermarkEmitStrategy=[on-periodic]]], fields=[a, b, c, nested_s, metadata_1, metadata_2])(reuse_id=[1]) + : +- Calc(select=[a, CAST(2 AS BIGINT) AS b], where=[(b = 2)]) + : +- TableSourceScan(table=[[default_catalog, default_database, MyTable, filter=[], project=[a, b, c], metadata=[]]], fields=[a, b, c])(reuse_id=[1]) +- Exchange(distribution=[hash[a]]) - +- Calc(select=[a, c, metadata_2]) + +- Calc(select=[a, c], where=[(b = 3)]) +- Reused(reference_id=[1]) ]]> </Resource> @@ -1226,6 +1196,36 @@ Calc(select=[a, b, metadata_1, compute_metadata, c, metadata_2]) +- Exchange(distribution=[hash[a]]) +- Calc(select=[a, b, metadata_1]) +- Reused(reference_id=[1]) +]]> + </Resource> + </TestCase> + <TestCase name="testProjectWithMetaAndCompute[isStreaming: true]"> + <Resource name="sql"> + <![CDATA[SELECT T1.a, T1.b, T1.metadata_1, T1.compute_metadata, T2.c, T2.metadata_2 FROM MyTable T1, MyTable T2 WHERE T1.a = T2.a]]> + </Resource> + <Resource name="ast"> + <![CDATA[ +LogicalProject(a=[$0], b=[$1], metadata_1=[$4], compute_metadata=[$5], c=[$10], metadata_2=[$14]) ++- LogicalFilter(condition=[=($0, $8)]) + +- LogicalJoin(condition=[true], joinType=[inner]) + :- LogicalWatermarkAssigner(rowtime=[rtime], watermark=[-($7, 5000:INTERVAL SECOND)]) + : +- LogicalProject(a=[$0], b=[$1], c=[$2], nested=[$3], metadata_1=[$4], compute_metadata=[*($4, 2)], metadata_2=[$5], rtime=[TO_TIMESTAMP($2, $3.s)]) + : +- LogicalTableScan(table=[[default_catalog, default_database, MyTable]]) + +- LogicalWatermarkAssigner(rowtime=[rtime], watermark=[-($7, 5000:INTERVAL SECOND)]) + +- LogicalProject(a=[$0], b=[$1], c=[$2], nested=[$3], metadata_1=[$4], compute_metadata=[*($4, 2)], metadata_2=[$5], rtime=[TO_TIMESTAMP($2, $3.s)]) + +- LogicalTableScan(table=[[default_catalog, default_database, MyTable]]) +]]> + </Resource> + <Resource name="optimized exec plan"> + <![CDATA[ +Calc(select=[a, b, metadata_1, compute_metadata, c, metadata_2]) ++- Join(joinType=[InnerJoin], where=[(a = a0)], select=[a, b, metadata_1, compute_metadata, a0, c, metadata_2], leftInputSpec=[NoUniqueKey], rightInputSpec=[NoUniqueKey]) + :- Exchange(distribution=[hash[a]]) + : +- Calc(select=[a, b, metadata_1, (metadata_1 * 2) AS compute_metadata]) + : +- TableSourceScan(table=[[default_catalog, default_database, MyTable, project=[a, b, c, nested_s, metadata_1, metadata_2], metadata=[], watermark=[-(TO_TIMESTAMP(c, nested_s), 5000:INTERVAL SECOND)], watermarkEmitStrategy=[on-periodic]]], fields=[a, b, c, nested_s, metadata_1, metadata_2])(reuse_id=[1]) + +- Exchange(distribution=[hash[a]]) + +- Calc(select=[a, c, metadata_2]) + +- Reused(reference_id=[1]) ]]> </Resource> </TestCase> @@ -1256,6 +1256,98 @@ Calc(select=[a, c, c0]) : +- TableSourceScan(table=[[default_catalog, default_database, MyTable, partitions=[{c=1}], project=[a], metadata=[]]], fields=[a], hints=[[[OPTIONS options:{partition-list=c:1;c:2}]]]) +- Exchange(distribution=[hash[a]]) +- TableSourceScan(table=[[default_catalog, default_database, MyTable, partitions=[{c=2}], project=[a], metadata=[]]], fields=[a], hints=[[[OPTIONS options:{partition-list=c:1;c:2}]]]) +]]> + </Resource> + </TestCase> + <TestCase name="testReuseWithReadMetadataAndWatermarkPushDown1[isStreaming: true]"> + <Resource name="sql"> + <![CDATA[SELECT T1.a1, T1.a2 FROM (SELECT a0, window_start, window_end, MIN(a1) as a1, MIN(a2) as a2, MIN(metadata_0) as metadata_0 FROM TABLE( TUMBLE(TABLE MyTable1, DESCRIPTOR(rowtime), INTERVAL '1' SECOND)) GROUP BY a0, window_start, window_end) T1, (SELECT a0, window_start, window_end, MIN(a1) as a1 FROM TABLE( TUMBLE(TABLE MyTable1, DESCRIPTOR(rowtime), INTERVAL '1' SECOND)) GROUP BY a0, window_start, window_end) T2 WHERE T1.a1 = T2.a1]]> + </Resource> + <Resource name="ast"> + <![CDATA[ +LogicalProject(a1=[$3], a2=[$4]) ++- LogicalFilter(condition=[=($3, $9)]) + +- LogicalJoin(condition=[true], joinType=[inner]) + :- LogicalAggregate(group=[{0, 1, 2}], a1=[MIN($3)], a2=[MIN($4)], metadata_0=[MIN($5)]) + : +- LogicalProject(a0=[$1], window_start=[$6], window_end=[$7], a1=[$2], a2=[$3], metadata_0=[$0]) + : +- LogicalTableFunctionScan(invocation=[TUMBLE($5, DESCRIPTOR($5), 1000:INTERVAL SECOND)], rowType=[RecordType(INTEGER metadata_0, INTEGER a0, INTEGER a1, INTEGER a2, VARCHAR(2147483647) ts, TIMESTAMP(3) *ROWTIME* rowtime, TIMESTAMP(3) window_start, TIMESTAMP(3) window_end, TIMESTAMP(3) *ROWTIME* window_time)]) + : +- LogicalProject(metadata_0=[$0], a0=[$1], a1=[$2], a2=[$3], ts=[$4], rowtime=[$5]) + : +- LogicalWatermarkAssigner(rowtime=[rowtime], watermark=[-($5, 1000:INTERVAL SECOND)]) + : +- LogicalProject(metadata_0=[$4], a0=[$0], a1=[$1], a2=[$2], ts=[$3], rowtime=[TO_TIMESTAMP($3)]) + : +- LogicalTableScan(table=[[default_catalog, default_database, MyTable1]]) + +- LogicalAggregate(group=[{0, 1, 2}], a1=[MIN($3)]) + +- LogicalProject(a0=[$1], window_start=[$6], window_end=[$7], a1=[$2]) + +- LogicalTableFunctionScan(invocation=[TUMBLE($5, DESCRIPTOR($5), 1000:INTERVAL SECOND)], rowType=[RecordType(INTEGER metadata_0, INTEGER a0, INTEGER a1, INTEGER a2, VARCHAR(2147483647) ts, TIMESTAMP(3) *ROWTIME* rowtime, TIMESTAMP(3) window_start, TIMESTAMP(3) window_end, TIMESTAMP(3) *ROWTIME* window_time)]) + +- LogicalProject(metadata_0=[$0], a0=[$1], a1=[$2], a2=[$3], ts=[$4], rowtime=[$5]) + +- LogicalWatermarkAssigner(rowtime=[rowtime], watermark=[-($5, 1000:INTERVAL SECOND)]) + +- LogicalProject(metadata_0=[$4], a0=[$0], a1=[$1], a2=[$2], ts=[$3], rowtime=[TO_TIMESTAMP($3)]) + +- LogicalTableScan(table=[[default_catalog, default_database, MyTable1]]) +]]> + </Resource> + <Resource name="optimized exec plan"> + <![CDATA[ +Calc(select=[a1, a2]) ++- Join(joinType=[InnerJoin], where=[(a1 = a10)], select=[a1, a2, a10], leftInputSpec=[NoUniqueKey], rightInputSpec=[NoUniqueKey]) + :- Exchange(distribution=[hash[a1]]) + : +- Calc(select=[a1, a2]) + : +- GlobalWindowAggregate(groupBy=[a0], window=[TUMBLE(slice_end=[$slice_end], size=[1 s])], select=[a0, MIN(min$0) AS a1, MIN(min$1) AS a2, start('w$) AS window_start, end('w$) AS window_end]) + : +- Exchange(distribution=[hash[a0]]) + : +- LocalWindowAggregate(groupBy=[a0], window=[TUMBLE(time_col=[rowtime], size=[1 s])], select=[a0, MIN(a1) AS min$0, MIN(a2) AS min$1, slice_end('w$) AS $slice_end]) + : +- Calc(select=[a0, a1, a2, metadata_0, Reinterpret(TO_TIMESTAMP(ts)) AS rowtime]) + : +- TableSourceScan(table=[[default_catalog, default_database, MyTable1, project=[a0, a1, a2, ts, metadata_0], metadata=[metadata_0], watermark=[-(TO_TIMESTAMP(ts), 1000:INTERVAL SECOND)], watermarkEmitStrategy=[on-periodic]]], fields=[a0, a1, a2, ts, metadata_0])(reuse_id=[1]) + +- Exchange(distribution=[hash[a1]]) + +- Calc(select=[a1]) + +- GlobalWindowAggregate(groupBy=[a0], window=[TUMBLE(slice_end=[$slice_end], size=[1 s])], select=[a0, MIN(min$0) AS a1, start('w$) AS window_start, end('w$) AS window_end]) + +- Exchange(distribution=[hash[a0]]) + +- LocalWindowAggregate(groupBy=[a0], window=[TUMBLE(time_col=[rowtime], size=[1 s])], select=[a0, MIN(a1) AS min$0, slice_end('w$) AS $slice_end]) + +- Calc(select=[a0, a1, Reinterpret(TO_TIMESTAMP(ts)) AS rowtime]) + +- Reused(reference_id=[1]) +]]> + </Resource> + </TestCase> + <TestCase name="testReuseWithReadMetadataAndWatermarkPushDown2[isStreaming: true]"> + <Resource name="sql"> + <![CDATA[SELECT T1.a1, T2.a2 FROM (SELECT a0, window_start, window_end, MIN(a1) as a1 FROM TABLE( TUMBLE(TABLE MyTable1, DESCRIPTOR(rowtime), INTERVAL '1' SECOND)) GROUP BY a0, window_start, window_end) T1, (SELECT a0, window_start, window_end, MIN(a1) as a1, MIN(a2) as a2, MIN(metadata_0) as metadata_0 FROM TABLE( TUMBLE(TABLE MyTable1, DESCRIPTOR(rowtime), INTERVAL '1' SECOND)) GROUP BY a0, window_start, window_end) T2 WHERE T1.a1 = T2.a1]]> + </Resource> + <Resource name="ast"> + <![CDATA[ +LogicalProject(a1=[$3], a2=[$8]) ++- LogicalFilter(condition=[=($3, $7)]) + +- LogicalJoin(condition=[true], joinType=[inner]) + :- LogicalAggregate(group=[{0, 1, 2}], a1=[MIN($3)]) + : +- LogicalProject(a0=[$1], window_start=[$6], window_end=[$7], a1=[$2]) + : +- LogicalTableFunctionScan(invocation=[TUMBLE($5, DESCRIPTOR($5), 1000:INTERVAL SECOND)], rowType=[RecordType(INTEGER metadata_0, INTEGER a0, INTEGER a1, INTEGER a2, VARCHAR(2147483647) ts, TIMESTAMP(3) *ROWTIME* rowtime, TIMESTAMP(3) window_start, TIMESTAMP(3) window_end, TIMESTAMP(3) *ROWTIME* window_time)]) + : +- LogicalProject(metadata_0=[$0], a0=[$1], a1=[$2], a2=[$3], ts=[$4], rowtime=[$5]) + : +- LogicalWatermarkAssigner(rowtime=[rowtime], watermark=[-($5, 1000:INTERVAL SECOND)]) + : +- LogicalProject(metadata_0=[$4], a0=[$0], a1=[$1], a2=[$2], ts=[$3], rowtime=[TO_TIMESTAMP($3)]) + : +- LogicalTableScan(table=[[default_catalog, default_database, MyTable1]]) + +- LogicalAggregate(group=[{0, 1, 2}], a1=[MIN($3)], a2=[MIN($4)], metadata_0=[MIN($5)]) + +- LogicalProject(a0=[$1], window_start=[$6], window_end=[$7], a1=[$2], a2=[$3], metadata_0=[$0]) + +- LogicalTableFunctionScan(invocation=[TUMBLE($5, DESCRIPTOR($5), 1000:INTERVAL SECOND)], rowType=[RecordType(INTEGER metadata_0, INTEGER a0, INTEGER a1, INTEGER a2, VARCHAR(2147483647) ts, TIMESTAMP(3) *ROWTIME* rowtime, TIMESTAMP(3) window_start, TIMESTAMP(3) window_end, TIMESTAMP(3) *ROWTIME* window_time)]) + +- LogicalProject(metadata_0=[$0], a0=[$1], a1=[$2], a2=[$3], ts=[$4], rowtime=[$5]) + +- LogicalWatermarkAssigner(rowtime=[rowtime], watermark=[-($5, 1000:INTERVAL SECOND)]) + +- LogicalProject(metadata_0=[$4], a0=[$0], a1=[$1], a2=[$2], ts=[$3], rowtime=[TO_TIMESTAMP($3)]) + +- LogicalTableScan(table=[[default_catalog, default_database, MyTable1]]) +]]> + </Resource> + <Resource name="optimized exec plan"> + <![CDATA[ +Calc(select=[a1, a2]) ++- Join(joinType=[InnerJoin], where=[(a1 = a10)], select=[a1, a10, a2], leftInputSpec=[NoUniqueKey], rightInputSpec=[NoUniqueKey]) + :- Exchange(distribution=[hash[a1]]) + : +- Calc(select=[a1]) + : +- GlobalWindowAggregate(groupBy=[a0], window=[TUMBLE(slice_end=[$slice_end], size=[1 s])], select=[a0, MIN(min$0) AS a1, start('w$) AS window_start, end('w$) AS window_end]) + : +- Exchange(distribution=[hash[a0]]) + : +- LocalWindowAggregate(groupBy=[a0], window=[TUMBLE(time_col=[rowtime], size=[1 s])], select=[a0, MIN(a1) AS min$0, slice_end('w$) AS $slice_end]) + : +- Calc(select=[a0, a1, Reinterpret(TO_TIMESTAMP(ts)) AS rowtime]) + : +- TableSourceScan(table=[[default_catalog, default_database, MyTable1, project=[a0, a1, a2, ts, metadata_0], metadata=[metadata_0], watermark=[-(TO_TIMESTAMP(ts), 1000:INTERVAL SECOND)], watermarkEmitStrategy=[on-periodic]]], fields=[a0, a1, a2, ts, metadata_0])(reuse_id=[1]) + +- Exchange(distribution=[hash[a1]]) + +- Calc(select=[a1, a2]) + +- GlobalWindowAggregate(groupBy=[a0], window=[TUMBLE(slice_end=[$slice_end], size=[1 s])], select=[a0, MIN(min$0) AS a1, MIN(min$1) AS a2, start('w$) AS window_start, end('w$) AS window_end]) + +- Exchange(distribution=[hash[a0]]) + +- LocalWindowAggregate(groupBy=[a0], window=[TUMBLE(time_col=[rowtime], size=[1 s])], select=[a0, MIN(a1) AS min$0, MIN(a2) AS min$1, slice_end('w$) AS $slice_end]) + +- Calc(select=[a0, a1, a2, metadata_0, Reinterpret(TO_TIMESTAMP(ts)) AS rowtime]) + +- Reused(reference_id=[1]) ]]> </Resource> </TestCase>