This is an automated email from the ASF dual-hosted git repository.
xuyangzhong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new f49981e0efe [FLINK-38224][table-planner] Do not convert to delta join when the sink is a retract sink (#26907)
f49981e0efe is described below
commit f49981e0efebd1fbc8fd926656198147bd949c71
Author: Xuyang <[email protected]>
AuthorDate: Tue Sep 23 14:53:22 2025 +0800
[FLINK-38224][table-planner] Do not convert to delta join when the sink is a retract sink (#26907)
---
.../physical/stream/DuplicateChangesInferRule.java | 4 +-
.../stream/DuplicateChangesInferRuleTest.java | 102 +++++++------
.../stream/DuplicateChangesInferRuleTest.xml | 161 ++++++++++++---------
.../planner/plan/stream/sql/DeltaJoinTest.xml | 144 ++++++++++++++----
.../testJsonPlanWithTableHints.out | 10 +-
.../planner/plan/stream/sql/DeltaJoinTest.scala | 75 +++++++++-
6 files changed, 341 insertions(+), 155 deletions(-)
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/physical/stream/DuplicateChangesInferRule.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/physical/stream/DuplicateChangesInferRule.java
index 9dda578de6e..7092c4c0385 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/physical/stream/DuplicateChangesInferRule.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/physical/stream/DuplicateChangesInferRule.java
@@ -166,7 +166,9 @@ public class DuplicateChangesInferRule extends RelRule<DuplicateChangesInferRule
try {
ChangelogMode sinkProvidedChangelogMode =
sink.tableSink().getChangelogMode(ChangelogMode.all());
- if (sinkProvidedChangelogMode.containsOnly(RowKind.INSERT)) {
+ boolean sinkIsAppend = sinkProvidedChangelogMode.containsOnly(RowKind.INSERT);
+ boolean sinkIsRetract = sinkProvidedChangelogMode.contains(RowKind.UPDATE_BEFORE);
+ if (sinkIsAppend || sinkIsRetract) {
acceptUpdates = false;
}
} catch (Throwable t) {
diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/rules/physical/stream/DuplicateChangesInferRuleTest.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/rules/physical/stream/DuplicateChangesInferRuleTest.java
index 6827e1403ba..f1a4dc34712 100644
--- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/rules/physical/stream/DuplicateChangesInferRuleTest.java
+++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/rules/physical/stream/DuplicateChangesInferRuleTest.java
@@ -111,7 +111,7 @@ public class DuplicateChangesInferRuleTest extends TableTestBase {
util.tableEnv()
.executeSql(
- "CREATE TABLE pk_snk (\n"
+ "CREATE TABLE pk_upsert_snk (\n"
+ " a int not null,\n"
+ " b string not null,\n"
+ " c bigint not null,\n"
@@ -120,14 +120,22 @@ public class DuplicateChangesInferRuleTest extends
TableTestBase {
+ " with (\n"
+ " 'connector' = 'values',\n"
+ " 'sink-insert-only' = 'false',\n"
- + " 'sink-changelog-mode-enforced' =
'I,UA,UB,D'\n"
+ + " 'sink-changelog-mode-enforced' =
'I,UA,D'\n"
+ + ")");
+
+ util.tableEnv()
+ .executeSql(
+ "CREATE TABLE pk_retract_snk WITH (\n"
+ + " 'sink-changelog-mode-enforced' =
'I,UA,UB,D'"
+ + ") LIKE pk_upsert_snk (\n"
+ + " OVERWRITING OPTIONS\n"
+ ")");
}
@TestTemplate
void testCalc() {
String sql =
- String.format("insert into %s select a,b,c from append_src1",
getSinkTableName());
+ String.format("insert into %s select a, b, c from
append_src1", getSinkTableName());
verifyRelPlanInsert(sql);
}
@@ -135,7 +143,7 @@ public class DuplicateChangesInferRuleTest extends
TableTestBase {
void testCalcWithNonDeterministicFilter1() {
String sql =
String.format(
- "insert into %s select a,b,c from append_src1 where c
< cast(now() as bigint)",
+ "insert into %s select a, b, c from append_src1 where
c < cast(now() as bigint)",
getSinkTableName());
verifyRelPlanInsert(sql);
}
@@ -144,7 +152,7 @@ public class DuplicateChangesInferRuleTest extends
TableTestBase {
void testCalcWithNonDeterministicFilter2() {
String sql =
String.format(
- "insert into %s select a,b,c from append_src1 where a
<> 1 and c < cast(now() as bigint)",
+ "insert into %s select a, b, c from append_src1 where
a <> 1 and c < cast(now() as bigint)",
getSinkTableName());
verifyRelPlanInsert(sql);
}
@@ -153,7 +161,7 @@ public class DuplicateChangesInferRuleTest extends
TableTestBase {
void testCalcWithNestedNonDeterministicFilter() {
String sql =
String.format(
- "insert into %s select a,b,c from append_src1 where c
< cast(cast(now() as int) as bigint)",
+ "insert into %s select a, b, c from append_src1 where
c < cast(cast(now() as int) as bigint)",
getSinkTableName());
verifyRelPlanInsert(sql);
}
@@ -179,7 +187,7 @@ public class DuplicateChangesInferRuleTest extends
TableTestBase {
@TestTemplate
void testAggregate() {
assumeTrue(testSinkWithPk);
- String sql = "insert into pk_snk select a,max(b),sum(c) from
append_src1 group by a";
+ String sql = "insert into pk_upsert_snk select a,max(b),sum(c) from
append_src1 group by a";
verifyRelPlanInsert(sql);
}
@@ -217,12 +225,12 @@ public class DuplicateChangesInferRuleTest extends
TableTestBase {
util.tableEnv()
.executeSql(
- "CREATE TABLE pk_snk_with_time_col (\n"
+ "CREATE TABLE pk_upsert_snk_with_time_col (\n"
+ " w_start timestamp(3),\n"
+ " w_end timestamp(3)\n"
- + ") LIKE pk_snk");
+ + ") LIKE pk_upsert_snk");
String sql =
- "insert into pk_snk_with_time_col select a, b, c,
window_start, window_end "
+ "insert into pk_upsert_snk_with_time_col select a, b, c,
window_start, window_end "
+ "from TABLE(TUMBLE(TABLE append_src1,
DESCRIPTOR(rt), INTERVAL '1' MINUTE))";
verifyRelPlanInsert(sql);
}
@@ -232,7 +240,7 @@ public class DuplicateChangesInferRuleTest extends
TableTestBase {
assumeTrue(testSinkWithPk);
String sql =
- "insert into pk_snk select a, b, c "
+ "insert into pk_upsert_snk select a, b, c "
+ "from ( "
+ " select a, b, c, "
+ " ROW_NUMBER() OVER (PARTITION BY a ORDER BY c
DESC) as rank_num "
@@ -247,12 +255,12 @@ public class DuplicateChangesInferRuleTest extends
TableTestBase {
util.tableEnv()
.executeSql(
- "CREATE TABLE pk_snk_with_time_col (\n"
+ "CREATE TABLE pk_upsert_snk_with_time_col (\n"
+ " w_start timestamp(3),\n"
+ " w_end timestamp(3)\n"
- + ") LIKE pk_snk");
+ + ") LIKE pk_upsert_snk");
String sql =
- "insert into pk_snk_with_time_col select a, b, c,
window_start, window_end "
+ "insert into pk_upsert_snk_with_time_col select a, b, c,
window_start, window_end "
+ "from ( "
+ " select a, b, c, window_start, window_end, "
+ " ROW_NUMBER() OVER (PARTITION BY a,
window_start, window_end ORDER BY c DESC) as rank_num "
@@ -267,7 +275,7 @@ public class DuplicateChangesInferRuleTest extends
TableTestBase {
enableMiniBatch();
- String sql = "insert into pk_snk select a,max(b),sum(c) from
append_src1 group by a";
+ String sql = "insert into pk_upsert_snk select a,max(b),sum(c) from
append_src1 group by a";
verifyRelPlanInsert(sql);
}
@@ -284,7 +292,7 @@ public class DuplicateChangesInferRuleTest extends
TableTestBase {
.set(OptimizerConfigOptions.TABLE_OPTIMIZER_DISTINCT_AGG_SPLIT_ENABLED, true);
String sql =
- "insert into pk_snk select a, max(b), count(distinct c) from
append_src1 group by a";
+ "insert into pk_upsert_snk select a, max(b), count(distinct c)
from append_src1 group by a";
verifyRelPlanInsert(sql);
}
@@ -292,15 +300,7 @@ public class DuplicateChangesInferRuleTest extends
TableTestBase {
void testDropUpdateBefore() {
assumeTrue(testSinkWithPk);
- util.tableEnv()
- .executeSql(
- "CREATE TABLE upsert_snk WITH (\n"
- + " 'sink-changelog-mode-enforced' = 'I,UA,D'"
- + ") LIKE pk_snk (\n"
- + " OVERWRITING OPTIONS\n"
- + ")");
-
- String sql = "insert into pk_snk select a,b,c from retract_src";
+ String sql = "insert into pk_upsert_snk select a, b, c from
retract_src";
verifyRelPlanInsert(sql);
}
@@ -310,13 +310,21 @@ public class DuplicateChangesInferRuleTest extends
TableTestBase {
util.tableEnv()
.executeSql(
- "CREATE TABLE another_pk_snk (\n"
+ "CREATE TABLE another_pk_upsert_snk (\n"
+ " primary key (b) not enforced\n"
- + ") LIKE pk_snk (\n"
+ + ") LIKE pk_upsert_snk (\n"
+ " EXCLUDING CONSTRAINTS\n"
+ ")");
- String sql = "insert into another_pk_snk select a,b,c from
retract_src";
+ String sql = "insert into another_pk_upsert_snk select a, b, c from
retract_src";
+ verifyRelPlanInsert(sql);
+ }
+
+ @TestTemplate
+ void testRetractSink() {
+ assumeTrue(testSinkWithPk);
+
+ String sql = "insert into pk_retract_snk select a, b, c from
retract_src";
verifyRelPlanInsert(sql);
}
@@ -324,7 +332,7 @@ public class DuplicateChangesInferRuleTest extends
TableTestBase {
void testChangelogNormalize() {
assumeTrue(testSinkWithPk);
- String sql = "insert into pk_snk select a,b,c from upsert_src";
+ String sql = "insert into pk_retract_snk select a, b, c from
upsert_src";
verifyRelPlanInsert(sql);
}
@@ -375,7 +383,7 @@ public class DuplicateChangesInferRuleTest extends
TableTestBase {
void testLimit() {
String sql =
String.format(
- "insert into %s select a,b,c from append_src1 limit
10",
+ "insert into %s select a, b, c from append_src1 limit
10",
getSinkTableName());
verifyRelPlanInsert(sql);
}
@@ -429,7 +437,7 @@ public class DuplicateChangesInferRuleTest extends
TableTestBase {
void testUnion() {
String sql =
String.format(
- "insert into %s select a,b,c from append_src1 union
all select a,b,c from append_src2",
+ "insert into %s select a, b, c from append_src1 union
all select a, b, c from append_src2",
getSinkTableName());
verifyRelPlanInsert(sql);
}
@@ -445,14 +453,14 @@ public class DuplicateChangesInferRuleTest extends
TableTestBase {
true);
util.tableEnv().executeSql("CREATE VIEW my_view as select a, b, c+1 as
c from append_src1");
- util.tableEnv().executeSql("CREATE TABLE pk_snk2 LIKE pk_snk");
+ util.tableEnv().executeSql("CREATE TABLE pk_upsert_snk2 LIKE
pk_upsert_snk");
// left: allow
// right: disallow
// merged: disallow
StatementSet stmtSet = util.tableEnv().createStatementSet();
- stmtSet.addInsertSql("insert into pk_snk select a, b, c/2 from
my_view");
+ stmtSet.addInsertSql("insert into pk_upsert_snk select a, b, c/2 from
my_view");
stmtSet.addInsertSql(
- "insert into pk_snk2 select a, max(b), sum(c) from my_view
group by a");
+ "insert into pk_upsert_snk2 select a, max(b), sum(c) from
my_view group by a");
verifyRelPlanInsert(stmtSet);
}
@@ -467,14 +475,15 @@ public class DuplicateChangesInferRuleTest extends
TableTestBase {
true);
util.tableEnv().executeSql("CREATE VIEW my_view as select a, b, c+1 as
c from append_src1");
- util.tableEnv().executeSql("CREATE TABLE pk_snk2 LIKE pk_snk");
+ util.tableEnv().executeSql("CREATE TABLE pk_upsert_snk2 LIKE
pk_upsert_snk");
// left: disallow
// right: allow
// merged: disallow
StatementSet stmtSet = util.tableEnv().createStatementSet();
- stmtSet.addInsertSql("insert into pk_snk select a, max(b), sum(c) from
my_view group by a");
- stmtSet.addInsertSql("insert into pk_snk2 select a, b, c/2 from
my_view");
+ stmtSet.addInsertSql(
+ "insert into pk_upsert_snk select a, max(b), sum(c) from
my_view group by a");
+ stmtSet.addInsertSql("insert into pk_upsert_snk2 select a, b, c/2 from
my_view");
verifyRelPlanInsert(stmtSet);
}
@@ -489,14 +498,14 @@ public class DuplicateChangesInferRuleTest extends
TableTestBase {
true);
util.tableEnv().executeSql("CREATE VIEW my_view as select a, b, c+1 as
c from append_src1");
- util.tableEnv().executeSql("CREATE TABLE pk_snk2 LIKE pk_snk");
+ util.tableEnv().executeSql("CREATE TABLE pk_upsert_snk2 LIKE
pk_upsert_snk");
// left: allow
// right: allow
// merged: allow
StatementSet stmtSet = util.tableEnv().createStatementSet();
- stmtSet.addInsertSql("insert into pk_snk select a, b, c/3 from
my_view");
- stmtSet.addInsertSql("insert into pk_snk2 select a, b, c/2 from
my_view");
+ stmtSet.addInsertSql("insert into pk_upsert_snk select a, b, c/3 from
my_view");
+ stmtSet.addInsertSql("insert into pk_upsert_snk2 select a, b, c/2 from
my_view");
verifyRelPlanInsert(stmtSet);
}
@@ -511,15 +520,16 @@ public class DuplicateChangesInferRuleTest extends
TableTestBase {
true);
util.tableEnv().executeSql("CREATE VIEW my_view as select a, b, c+1 as
c from append_src1");
- util.tableEnv().executeSql("CREATE TABLE pk_snk2 LIKE pk_snk");
+ util.tableEnv().executeSql("CREATE TABLE pk_upsert_snk2 LIKE
pk_upsert_snk");
// left: disallow
// right: disallow
// merged: disallow
StatementSet stmtSet = util.tableEnv().createStatementSet();
- stmtSet.addInsertSql("insert into pk_snk select a, max(b), sum(c) from
my_view group by a");
stmtSet.addInsertSql(
- "insert into pk_snk2 select a, min(b), max(c) from my_view
group by a");
+ "insert into pk_upsert_snk select a, max(b), sum(c) from
my_view group by a");
+ stmtSet.addInsertSql(
+ "insert into pk_upsert_snk2 select a, min(b), max(c) from
my_view group by a");
verifyRelPlanInsert(stmtSet);
}
@@ -529,13 +539,13 @@ public class DuplicateChangesInferRuleTest extends
TableTestBase {
util.tableEnv()
.executeSql(
- "CREATE TABLE append_only_sink_with_pk (\n"
+ "CREATE TABLE pk_append_sink (\n"
+ " primary key (a) not enforced\n"
+ ") LIKE no_pk_snk (\n"
+ " OVERWRITING OPTIONS\n"
+ ")");
- String sql = "insert into append_only_sink_with_pk select a, b, c from
append_src1";
+ String sql = "insert into pk_append_sink select a, b, c from
append_src1";
verifyRelPlanInsert(sql);
}
@@ -557,7 +567,7 @@ public class DuplicateChangesInferRuleTest extends
TableTestBase {
}
private String getSinkTableName() {
- return testSinkWithPk ? "pk_snk" : "no_pk_snk";
+ return testSinkWithPk ? "pk_upsert_snk" : "no_pk_snk";
}
private void enableMiniBatch() {
diff --git a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/rules/physical/stream/DuplicateChangesInferRuleTest.xml b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/rules/physical/stream/DuplicateChangesInferRuleTest.xml
index b2fccab63ef..e762799c559 100644
--- a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/rules/physical/stream/DuplicateChangesInferRuleTest.xml
+++ b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/rules/physical/stream/DuplicateChangesInferRuleTest.xml
@@ -19,7 +19,7 @@ limitations under the License.
<TestCase name="testAggregate[testSinkWithPk = true]">
<Resource name="ast">
<![CDATA[
-LogicalSink(table=[default_catalog.default_database.pk_snk], fields=[a,
EXPR$1, EXPR$2])
+LogicalSink(table=[default_catalog.default_database.pk_upsert_snk], fields=[a,
EXPR$1, EXPR$2])
+- LogicalAggregate(group=[{0}], EXPR$1=[MAX($1)], EXPR$2=[SUM($2)])
+- LogicalProject(a=[$0], b=[$1], c=[$2])
+- LogicalWatermarkAssigner(rowtime=[rt], watermark=[-($3, 1000:INTERVAL
SECOND)])
@@ -28,7 +28,7 @@ LogicalSink(table=[default_catalog.default_database.pk_snk],
fields=[a, EXPR$1,
</Resource>
<Resource name="optimized rel plan">
<![CDATA[
-Sink(table=[default_catalog.default_database.pk_snk], fields=[a, EXPR$1,
EXPR$2], duplicateChanges=[NONE])
+Sink(table=[default_catalog.default_database.pk_upsert_snk], fields=[a,
EXPR$1, EXPR$2], duplicateChanges=[NONE])
+- GroupAggregate(groupBy=[a], select=[a, MAX(b) AS EXPR$1, SUM(c) AS EXPR$2],
duplicateChanges=[ALLOW])
+- Exchange(distribution=[hash[a]], duplicateChanges=[DISALLOW])
+- Calc(select=[a, b, c], duplicateChanges=[DISALLOW])
@@ -40,7 +40,7 @@ Sink(table=[default_catalog.default_database.pk_snk],
fields=[a, EXPR$1, EXPR$2]
<TestCase name="testAppendOnlySinkWithPk[testSinkWithPk = false]">
<Resource name="ast">
<![CDATA[
-LogicalSink(table=[default_catalog.default_database.append_only_sink_with_pk],
fields=[a, b, c])
+LogicalSink(table=[default_catalog.default_database.pk_append_sink],
fields=[a, b, c])
+- LogicalProject(a=[$0], b=[$1], c=[$2])
+- LogicalWatermarkAssigner(rowtime=[rt], watermark=[-($3, 1000:INTERVAL
SECOND)])
+- LogicalTableScan(table=[[default_catalog, default_database,
append_src1]])
@@ -48,7 +48,7 @@
LogicalSink(table=[default_catalog.default_database.append_only_sink_with_pk], f
</Resource>
<Resource name="optimized rel plan">
<![CDATA[
-Sink(table=[default_catalog.default_database.append_only_sink_with_pk],
fields=[a, b, c], duplicateChanges=[NONE])
+Sink(table=[default_catalog.default_database.pk_append_sink], fields=[a, b,
c], duplicateChanges=[NONE])
+- Calc(select=[a, b, c], duplicateChanges=[DISALLOW])
+- WatermarkAssigner(rowtime=[rt], watermark=[-(rt, 1000:INTERVAL SECOND)],
duplicateChanges=[DISALLOW])
+- TableSourceScan(table=[[default_catalog, default_database,
append_src1]], fields=[a, b, c, rt], duplicateChanges=[DISALLOW])
@@ -76,7 +76,7 @@ Sink(table=[default_catalog.default_database.no_pk_snk],
fields=[a, b, c], dupli
<TestCase name="testCalc[testSinkWithPk = true]">
<Resource name="ast">
<![CDATA[
-LogicalSink(table=[default_catalog.default_database.pk_snk], fields=[a, b, c])
+LogicalSink(table=[default_catalog.default_database.pk_upsert_snk], fields=[a,
b, c])
+- LogicalProject(a=[$0], b=[$1], c=[$2])
+- LogicalWatermarkAssigner(rowtime=[rt], watermark=[-($3, 1000:INTERVAL
SECOND)])
+- LogicalTableScan(table=[[default_catalog, default_database,
append_src1]])
@@ -84,7 +84,7 @@ LogicalSink(table=[default_catalog.default_database.pk_snk],
fields=[a, b, c])
</Resource>
<Resource name="optimized rel plan">
<![CDATA[
-Sink(table=[default_catalog.default_database.pk_snk], fields=[a, b, c],
duplicateChanges=[NONE])
+Sink(table=[default_catalog.default_database.pk_upsert_snk], fields=[a, b, c],
duplicateChanges=[NONE])
+- Calc(select=[a, b, c], duplicateChanges=[ALLOW])
+- WatermarkAssigner(rowtime=[rt], watermark=[-(rt, 1000:INTERVAL SECOND)],
duplicateChanges=[ALLOW])
+- TableSourceScan(table=[[default_catalog, default_database,
append_src1]], fields=[a, b, c, rt], duplicateChanges=[ALLOW])
@@ -113,7 +113,7 @@ Sink(table=[default_catalog.default_database.no_pk_snk],
fields=[a, b, c], dupli
<TestCase name="testCalcWithNestedNonDeterministicFilter[testSinkWithPk =
true]">
<Resource name="ast">
<![CDATA[
-LogicalSink(table=[default_catalog.default_database.pk_snk], fields=[a, b, c])
+LogicalSink(table=[default_catalog.default_database.pk_upsert_snk], fields=[a,
b, c])
+- LogicalProject(a=[$0], b=[$1], c=[$2])
+- LogicalFilter(condition=[<($2, CAST(CAST(NOW()):INTEGER NOT NULL):BIGINT
NOT NULL)])
+- LogicalWatermarkAssigner(rowtime=[rt], watermark=[-($3, 1000:INTERVAL
SECOND)])
@@ -122,7 +122,7 @@
LogicalSink(table=[default_catalog.default_database.pk_snk], fields=[a, b, c])
</Resource>
<Resource name="optimized rel plan">
<![CDATA[
-Sink(table=[default_catalog.default_database.pk_snk], fields=[a, b, c],
duplicateChanges=[NONE])
+Sink(table=[default_catalog.default_database.pk_upsert_snk], fields=[a, b, c],
duplicateChanges=[NONE])
+- Calc(select=[a, b, c], where=[<(c, CAST(CAST(NOW() AS INTEGER) AS
BIGINT))], duplicateChanges=[ALLOW])
+- WatermarkAssigner(rowtime=[rt], watermark=[-(rt, 1000:INTERVAL SECOND)],
duplicateChanges=[DISALLOW])
+- TableSourceScan(table=[[default_catalog, default_database,
append_src1]], fields=[a, b, c, rt], duplicateChanges=[DISALLOW])
@@ -150,7 +150,7 @@ Sink(table=[default_catalog.default_database.no_pk_snk],
fields=[a, b, EXPR$2],
<TestCase name="testCalcWithNestedNonDeterministicProjection[testSinkWithPk
= true]">
<Resource name="ast">
<![CDATA[
-LogicalSink(table=[default_catalog.default_database.pk_snk], fields=[a, b,
EXPR$2])
+LogicalSink(table=[default_catalog.default_database.pk_upsert_snk], fields=[a,
b, EXPR$2])
+- LogicalProject(a=[$0], b=[$1], EXPR$2=[CAST(CAST(NOW()):INTEGER NOT
NULL):BIGINT NOT NULL])
+- LogicalWatermarkAssigner(rowtime=[rt], watermark=[-($3, 1000:INTERVAL
SECOND)])
+- LogicalTableScan(table=[[default_catalog, default_database,
append_src1]])
@@ -158,7 +158,7 @@
LogicalSink(table=[default_catalog.default_database.pk_snk], fields=[a, b, EXPR$
</Resource>
<Resource name="optimized rel plan">
<![CDATA[
-Sink(table=[default_catalog.default_database.pk_snk], fields=[a, b, EXPR$2],
duplicateChanges=[NONE])
+Sink(table=[default_catalog.default_database.pk_upsert_snk], fields=[a, b,
EXPR$2], duplicateChanges=[NONE])
+- Calc(select=[a, b, CAST(CAST(NOW() AS INTEGER) AS BIGINT) AS EXPR$2],
duplicateChanges=[ALLOW])
+- WatermarkAssigner(rowtime=[rt], watermark=[-(rt, 1000:INTERVAL SECOND)],
duplicateChanges=[DISALLOW])
+- TableSourceScan(table=[[default_catalog, default_database,
append_src1, project=[a, b, rt], metadata=[]]], fields=[a, b, rt],
duplicateChanges=[DISALLOW])
@@ -187,7 +187,7 @@ Sink(table=[default_catalog.default_database.no_pk_snk],
fields=[a, b, c], dupli
<TestCase name="testCalcWithNonDeterministicFilter1[testSinkWithPk = true]">
<Resource name="ast">
<![CDATA[
-LogicalSink(table=[default_catalog.default_database.pk_snk], fields=[a, b, c])
+LogicalSink(table=[default_catalog.default_database.pk_upsert_snk], fields=[a,
b, c])
+- LogicalProject(a=[$0], b=[$1], c=[$2])
+- LogicalFilter(condition=[<($2, CAST(NOW()):BIGINT NOT NULL)])
+- LogicalWatermarkAssigner(rowtime=[rt], watermark=[-($3, 1000:INTERVAL
SECOND)])
@@ -196,7 +196,7 @@
LogicalSink(table=[default_catalog.default_database.pk_snk], fields=[a, b, c])
</Resource>
<Resource name="optimized rel plan">
<![CDATA[
-Sink(table=[default_catalog.default_database.pk_snk], fields=[a, b, c],
duplicateChanges=[NONE])
+Sink(table=[default_catalog.default_database.pk_upsert_snk], fields=[a, b, c],
duplicateChanges=[NONE])
+- Calc(select=[a, b, c], where=[<(c, CAST(NOW() AS BIGINT))],
duplicateChanges=[ALLOW])
+- WatermarkAssigner(rowtime=[rt], watermark=[-(rt, 1000:INTERVAL SECOND)],
duplicateChanges=[DISALLOW])
+- TableSourceScan(table=[[default_catalog, default_database,
append_src1]], fields=[a, b, c, rt], duplicateChanges=[DISALLOW])
@@ -225,7 +225,7 @@ Sink(table=[default_catalog.default_database.no_pk_snk],
fields=[a, b, c], dupli
<TestCase name="testCalcWithNonDeterministicFilter2[testSinkWithPk = true]">
<Resource name="ast">
<![CDATA[
-LogicalSink(table=[default_catalog.default_database.pk_snk], fields=[a, b, c])
+LogicalSink(table=[default_catalog.default_database.pk_upsert_snk], fields=[a,
b, c])
+- LogicalProject(a=[$0], b=[$1], c=[$2])
+- LogicalFilter(condition=[AND(<>($0, 1), <($2, CAST(NOW()):BIGINT NOT
NULL))])
+- LogicalWatermarkAssigner(rowtime=[rt], watermark=[-($3, 1000:INTERVAL
SECOND)])
@@ -234,7 +234,7 @@
LogicalSink(table=[default_catalog.default_database.pk_snk], fields=[a, b, c])
</Resource>
<Resource name="optimized rel plan">
<![CDATA[
-Sink(table=[default_catalog.default_database.pk_snk], fields=[a, b, c],
duplicateChanges=[NONE])
+Sink(table=[default_catalog.default_database.pk_upsert_snk], fields=[a, b, c],
duplicateChanges=[NONE])
+- Calc(select=[a, b, c], where=[AND(<>(a, 1), <(c, CAST(NOW() AS BIGINT)))],
duplicateChanges=[ALLOW])
+- WatermarkAssigner(rowtime=[rt], watermark=[-(rt, 1000:INTERVAL SECOND)],
duplicateChanges=[DISALLOW])
+- TableSourceScan(table=[[default_catalog, default_database,
append_src1]], fields=[a, b, c, rt], duplicateChanges=[DISALLOW])
@@ -262,7 +262,7 @@ Sink(table=[default_catalog.default_database.no_pk_snk],
fields=[a, b, EXPR$2],
<TestCase name="testCalcWithNonDeterministicProjection[testSinkWithPk =
true]">
<Resource name="ast">
<![CDATA[
-LogicalSink(table=[default_catalog.default_database.pk_snk], fields=[a, b,
EXPR$2])
+LogicalSink(table=[default_catalog.default_database.pk_upsert_snk], fields=[a,
b, EXPR$2])
+- LogicalProject(a=[$0], b=[$1], EXPR$2=[CAST(NOW()):BIGINT NOT NULL])
+- LogicalWatermarkAssigner(rowtime=[rt], watermark=[-($3, 1000:INTERVAL
SECOND)])
+- LogicalTableScan(table=[[default_catalog, default_database,
append_src1]])
@@ -270,7 +270,7 @@
LogicalSink(table=[default_catalog.default_database.pk_snk], fields=[a, b, EXPR$
</Resource>
<Resource name="optimized rel plan">
<![CDATA[
-Sink(table=[default_catalog.default_database.pk_snk], fields=[a, b, EXPR$2],
duplicateChanges=[NONE])
+Sink(table=[default_catalog.default_database.pk_upsert_snk], fields=[a, b,
EXPR$2], duplicateChanges=[NONE])
+- Calc(select=[a, b, CAST(NOW() AS BIGINT) AS EXPR$2],
duplicateChanges=[ALLOW])
+- WatermarkAssigner(rowtime=[rt], watermark=[-(rt, 1000:INTERVAL SECOND)],
duplicateChanges=[DISALLOW])
+- TableSourceScan(table=[[default_catalog, default_database,
append_src1, project=[a, b, rt], metadata=[]]], fields=[a, b, rt],
duplicateChanges=[DISALLOW])
@@ -280,7 +280,7 @@ Sink(table=[default_catalog.default_database.pk_snk],
fields=[a, b, EXPR$2], dup
<TestCase name="testChangelogNormalize[testSinkWithPk = true]">
<Resource name="ast">
<![CDATA[
-LogicalSink(table=[default_catalog.default_database.pk_snk], fields=[a, b, c])
+LogicalSink(table=[default_catalog.default_database.pk_retract_snk],
fields=[a, b, c])
+- LogicalProject(a=[$0], b=[$1], c=[$2])
+- LogicalWatermarkAssigner(rowtime=[rt], watermark=[-($3, 1000:INTERVAL
SECOND)])
+- LogicalTableScan(table=[[default_catalog, default_database,
upsert_src]])
@@ -288,8 +288,8 @@
LogicalSink(table=[default_catalog.default_database.pk_snk], fields=[a, b, c])
</Resource>
<Resource name="optimized rel plan">
<![CDATA[
-Sink(table=[default_catalog.default_database.pk_snk], fields=[a, b, c],
duplicateChanges=[NONE])
-+- ChangelogNormalize(key=[a], duplicateChanges=[ALLOW])
+Sink(table=[default_catalog.default_database.pk_retract_snk], fields=[a, b,
c], duplicateChanges=[NONE])
++- ChangelogNormalize(key=[a], duplicateChanges=[DISALLOW])
+- Exchange(distribution=[hash[a]], duplicateChanges=[DISALLOW])
+- Calc(select=[a, b, c], duplicateChanges=[DISALLOW])
+- WatermarkAssigner(rowtime=[rt], watermark=[-(rt, 1000:INTERVAL
SECOND)], duplicateChanges=[DISALLOW])
@@ -300,7 +300,7 @@ Sink(table=[default_catalog.default_database.pk_snk],
fields=[a, b, c], duplicat
<TestCase name="testDropUpdateBefore[testSinkWithPk = true]">
<Resource name="ast">
<![CDATA[
-LogicalSink(table=[default_catalog.default_database.pk_snk], fields=[a, b, c])
+LogicalSink(table=[default_catalog.default_database.pk_upsert_snk], fields=[a,
b, c])
+- LogicalProject(a=[$0], b=[$1], c=[$2])
+- LogicalWatermarkAssigner(rowtime=[rt], watermark=[-($3, 1000:INTERVAL
SECOND)])
+- LogicalTableScan(table=[[default_catalog, default_database,
retract_src]])
@@ -308,17 +308,18 @@
LogicalSink(table=[default_catalog.default_database.pk_snk], fields=[a, b, c])
</Resource>
<Resource name="optimized rel plan">
<![CDATA[
-Sink(table=[default_catalog.default_database.pk_snk], fields=[a, b, c],
duplicateChanges=[NONE])
+Sink(table=[default_catalog.default_database.pk_upsert_snk], fields=[a, b, c],
duplicateChanges=[NONE])
+- Calc(select=[a, b, c], duplicateChanges=[ALLOW])
+- WatermarkAssigner(rowtime=[rt], watermark=[-(rt, 1000:INTERVAL SECOND)],
duplicateChanges=[ALLOW])
- +- TableSourceScan(table=[[default_catalog, default_database,
retract_src]], fields=[a, b, c, rt], duplicateChanges=[ALLOW])
+ +- DropUpdateBefore(duplicateChanges=[ALLOW])
+ +- TableSourceScan(table=[[default_catalog, default_database,
retract_src]], fields=[a, b, c, rt], duplicateChanges=[ALLOW])
]]>
</Resource>
</TestCase>
<TestCase name="testExpandAndIncrementalAggregate[testSinkWithPk = true]">
<Resource name="ast">
<![CDATA[
-LogicalSink(table=[default_catalog.default_database.pk_snk], fields=[a,
EXPR$1, EXPR$2])
+LogicalSink(table=[default_catalog.default_database.pk_upsert_snk], fields=[a,
EXPR$1, EXPR$2])
+- LogicalAggregate(group=[{0}], EXPR$1=[MAX($1)], EXPR$2=[COUNT(DISTINCT $2)])
+- LogicalProject(a=[$0], b=[$1], c=[$2])
+- LogicalWatermarkAssigner(rowtime=[rt], watermark=[-($3, 1000:INTERVAL
SECOND)])
@@ -327,7 +328,7 @@
LogicalSink(table=[default_catalog.default_database.pk_snk], fields=[a, EXPR$1,
</Resource>
<Resource name="optimized rel plan">
<![CDATA[
-Sink(table=[default_catalog.default_database.pk_snk], fields=[a, $f1, $f2],
duplicateChanges=[NONE])
+Sink(table=[default_catalog.default_database.pk_upsert_snk], fields=[a, $f1,
$f2], duplicateChanges=[NONE])
+- Calc(select=[a, CAST($f1 AS VARCHAR(2147483647)) AS $f1, $f2],
duplicateChanges=[ALLOW])
+- GlobalGroupAggregate(groupBy=[a], partialFinalType=[FINAL], select=[a,
MAX(max$0) AS $f1, $SUM0(count$1) AS $f2], duplicateChanges=[ALLOW])
+- Exchange(distribution=[hash[a]], duplicateChanges=[DISALLOW])
@@ -346,7 +347,7 @@ Sink(table=[default_catalog.default_database.pk_snk],
fields=[a, $f1, $f2], dupl
<TestCase name="testIntervalJoin[testSinkWithPk = true]">
<Resource name="ast">
<![CDATA[
-LogicalSink(table=[default_catalog.default_database.pk_snk], fields=[a, b, c])
+LogicalSink(table=[default_catalog.default_database.pk_upsert_snk], fields=[a,
b, c])
+- LogicalProject(a=[$0], b=[$5], c=[$2])
+- LogicalJoin(condition=[AND(=($0, $4), >($3, -($7, 60000:INTERVAL
MINUTE)), <($3, +($7, 60000:INTERVAL MINUTE)))], joinType=[inner])
:- LogicalWatermarkAssigner(rowtime=[rt], watermark=[-($3, 1000:INTERVAL
SECOND)])
@@ -357,7 +358,7 @@
LogicalSink(table=[default_catalog.default_database.pk_snk], fields=[a, b, c])
</Resource>
<Resource name="optimized rel plan">
<![CDATA[
-Sink(table=[default_catalog.default_database.pk_snk], fields=[a, b, c],
duplicateChanges=[NONE])
+Sink(table=[default_catalog.default_database.pk_upsert_snk], fields=[a, b, c],
duplicateChanges=[NONE])
+- Calc(select=[a, b, c], duplicateChanges=[ALLOW])
+- IntervalJoin(joinType=[InnerJoin], windowBounds=[isRowTime=true,
leftLowerBound=-59999, leftUpperBound=59999, leftTimeIndex=2,
rightTimeIndex=2], where=[AND(=(a, a0), >(rt, -(rt0, 60000:INTERVAL MINUTE)),
<(rt, +(rt0, 60000:INTERVAL MINUTE)))], select=[a, c, rt, a0, b, rt0],
duplicateChanges=[ALLOW])
:- Exchange(distribution=[hash[a]], duplicateChanges=[DISALLOW])
@@ -400,7 +401,7 @@ Sink(table=[default_catalog.default_database.no_pk_snk],
fields=[a, b, c], dupli
<TestCase name="testJoin[testSinkWithPk = true]">
<Resource name="ast">
<![CDATA[
-LogicalSink(table=[default_catalog.default_database.pk_snk], fields=[a, b, c])
+LogicalSink(table=[default_catalog.default_database.pk_upsert_snk], fields=[a,
b, c])
+- LogicalProject(a=[$0], b=[$5], c=[$2])
+- LogicalJoin(condition=[=($0, $4)], joinType=[inner])
:- LogicalWatermarkAssigner(rowtime=[rt], watermark=[-($3, 1000:INTERVAL
SECOND)])
@@ -411,7 +412,7 @@
LogicalSink(table=[default_catalog.default_database.pk_snk], fields=[a, b, c])
</Resource>
<Resource name="optimized rel plan">
<![CDATA[
-Sink(table=[default_catalog.default_database.pk_snk], fields=[a, b, c],
duplicateChanges=[NONE])
+Sink(table=[default_catalog.default_database.pk_upsert_snk], fields=[a, b, c],
duplicateChanges=[NONE])
+- Calc(select=[a, b, c], duplicateChanges=[ALLOW])
+- Join(joinType=[InnerJoin], where=[=(a, a0)], select=[a, c, a0, b],
leftInputSpec=[NoUniqueKey], rightInputSpec=[NoUniqueKey],
duplicateChanges=[ALLOW])
:- Exchange(distribution=[hash[a]], duplicateChanges=[DISALLOW])
@@ -449,7 +450,7 @@ Sink(table=[default_catalog.default_database.no_pk_snk],
fields=[a, b, c], dupli
<TestCase name="testLimit[testSinkWithPk = true]">
<Resource name="ast">
<![CDATA[
-LogicalSink(table=[default_catalog.default_database.pk_snk], fields=[a, b, c])
+LogicalSink(table=[default_catalog.default_database.pk_upsert_snk], fields=[a,
b, c])
+- LogicalSort(fetch=[10])
+- LogicalProject(a=[$0], b=[$1], c=[$2])
+- LogicalWatermarkAssigner(rowtime=[rt], watermark=[-($3, 1000:INTERVAL
SECOND)])
@@ -458,7 +459,7 @@
LogicalSink(table=[default_catalog.default_database.pk_snk], fields=[a, b, c])
</Resource>
<Resource name="optimized rel plan">
<![CDATA[
-Sink(table=[default_catalog.default_database.pk_snk], fields=[a, b, c],
duplicateChanges=[NONE])
+Sink(table=[default_catalog.default_database.pk_upsert_snk], fields=[a, b, c],
duplicateChanges=[NONE])
+- Calc(select=[a, b, c], duplicateChanges=[ALLOW])
+- Limit(offset=[0], fetch=[10], duplicateChanges=[ALLOW])
+- Exchange(distribution=[single], duplicateChanges=[DISALLOW])
@@ -495,7 +496,7 @@ Sink(table=[default_catalog.default_database.no_pk_snk],
fields=[a, b, c], dupli
<TestCase name="testLookupJoin[testSinkWithPk = true]">
<Resource name="ast">
<![CDATA[
-LogicalSink(table=[default_catalog.default_database.pk_snk], fields=[a, b, c])
+LogicalSink(table=[default_catalog.default_database.pk_upsert_snk], fields=[a,
b, c])
+- LogicalProject(a=[$0], b=[$1], c=[$2])
+- LogicalCorrelate(correlation=[$cor0], joinType=[inner],
requiredColumns=[{0, 4}])
:- LogicalProject(a=[$0], b=[$1], c=[$2], rt=[$3], pt=[PROCTIME()])
@@ -508,7 +509,7 @@
LogicalSink(table=[default_catalog.default_database.pk_snk], fields=[a, b, c])
</Resource>
<Resource name="optimized rel plan">
<![CDATA[
-Sink(table=[default_catalog.default_database.pk_snk], fields=[a, b, c],
duplicateChanges=[NONE])
+Sink(table=[default_catalog.default_database.pk_upsert_snk], fields=[a, b, c],
duplicateChanges=[NONE])
+- Calc(select=[a, b, c], duplicateChanges=[ALLOW])
+- LookupJoin(table=[default_catalog.default_database.dim_src],
joinType=[InnerJoin], lookup=[a=a], select=[a, b, c, a0],
duplicateChanges=[ALLOW])
+- Calc(select=[a, b, c], duplicateChanges=[DISALLOW])
@@ -520,7 +521,7 @@ Sink(table=[default_catalog.default_database.pk_snk],
fields=[a, b, c], duplicat
<TestCase name="testMiniBatchTwoStageAggregate[testSinkWithPk = true]">
<Resource name="ast">
<![CDATA[
-LogicalSink(table=[default_catalog.default_database.pk_snk], fields=[a,
EXPR$1, EXPR$2])
+LogicalSink(table=[default_catalog.default_database.pk_upsert_snk], fields=[a,
EXPR$1, EXPR$2])
+- LogicalAggregate(group=[{0}], EXPR$1=[MAX($1)], EXPR$2=[SUM($2)])
+- LogicalProject(a=[$0], b=[$1], c=[$2])
+- LogicalWatermarkAssigner(rowtime=[rt], watermark=[-($3, 1000:INTERVAL
SECOND)])
@@ -529,7 +530,7 @@
LogicalSink(table=[default_catalog.default_database.pk_snk], fields=[a, EXPR$1,
</Resource>
<Resource name="optimized rel plan">
<![CDATA[
-Sink(table=[default_catalog.default_database.pk_snk], fields=[a, EXPR$1,
EXPR$2], duplicateChanges=[NONE])
+Sink(table=[default_catalog.default_database.pk_upsert_snk], fields=[a,
EXPR$1, EXPR$2], duplicateChanges=[NONE])
+- GlobalGroupAggregate(groupBy=[a], select=[a, MAX(max$0) AS EXPR$1,
SUM(sum$1) AS EXPR$2], duplicateChanges=[ALLOW])
+- Exchange(distribution=[hash[a]], duplicateChanges=[DISALLOW])
+- LocalGroupAggregate(groupBy=[a], select=[a, MAX(b) AS max$0, SUM(c)
AS sum$1], duplicateChanges=[DISALLOW])
@@ -543,13 +544,13 @@ Sink(table=[default_catalog.default_database.pk_snk],
fields=[a, EXPR$1, EXPR$2]
<TestCase name="testMultiSink1[testSinkWithPk = true]">
<Resource name="ast">
<![CDATA[
-LogicalSink(table=[default_catalog.default_database.pk_snk], fields=[a, b,
EXPR$2])
+LogicalSink(table=[default_catalog.default_database.pk_upsert_snk], fields=[a,
b, EXPR$2])
+- LogicalProject(a=[$0], b=[$1], EXPR$2=[/($2, 2)])
+- LogicalProject(a=[$0], b=[$1], c=[+($2, 1)])
+- LogicalWatermarkAssigner(rowtime=[rt], watermark=[-($3, 1000:INTERVAL
SECOND)])
+- LogicalTableScan(table=[[default_catalog, default_database,
append_src1]])
-LogicalSink(table=[default_catalog.default_database.pk_snk2], fields=[a,
EXPR$1, EXPR$2])
+LogicalSink(table=[default_catalog.default_database.pk_upsert_snk2],
fields=[a, EXPR$1, EXPR$2])
+- LogicalAggregate(group=[{0}], EXPR$1=[MAX($1)], EXPR$2=[SUM($2)])
+- LogicalProject(a=[$0], b=[$1], c=[+($2, 1)])
+- LogicalWatermarkAssigner(rowtime=[rt], watermark=[-($3, 1000:INTERVAL
SECOND)])
@@ -558,13 +559,13 @@
LogicalSink(table=[default_catalog.default_database.pk_snk2], fields=[a, EXPR$1,
</Resource>
<Resource name="optimized rel plan">
<![CDATA[
-Sink(table=[default_catalog.default_database.pk_snk], fields=[a, b, EXPR$2],
duplicateChanges=[NONE])
+Sink(table=[default_catalog.default_database.pk_upsert_snk], fields=[a, b,
EXPR$2], duplicateChanges=[NONE])
+- Calc(select=[a, b, /(c, 2) AS EXPR$2], duplicateChanges=[ALLOW])
+- Calc(select=[a, b, +(c, 1) AS c], duplicateChanges=[DISALLOW])
+- WatermarkAssigner(rowtime=[rt], watermark=[-(rt, 1000:INTERVAL
SECOND)], duplicateChanges=[DISALLOW])
+- TableSourceScan(table=[[default_catalog, default_database,
append_src1]], fields=[a, b, c, rt], duplicateChanges=[DISALLOW])
-Sink(table=[default_catalog.default_database.pk_snk2], fields=[a, EXPR$1,
EXPR$2], duplicateChanges=[NONE])
+Sink(table=[default_catalog.default_database.pk_upsert_snk2], fields=[a,
EXPR$1, EXPR$2], duplicateChanges=[NONE])
+- GroupAggregate(groupBy=[a], select=[a, MAX(b) AS EXPR$1, SUM(c) AS EXPR$2],
duplicateChanges=[ALLOW])
+- Exchange(distribution=[hash[a]], duplicateChanges=[DISALLOW])
+- Calc(select=[a, b, +(c, 1) AS c], duplicateChanges=[DISALLOW])
@@ -576,13 +577,13 @@ Sink(table=[default_catalog.default_database.pk_snk2],
fields=[a, EXPR$1, EXPR$2
<TestCase name="testMultiSink2[testSinkWithPk = true]">
<Resource name="ast">
<![CDATA[
-LogicalSink(table=[default_catalog.default_database.pk_snk], fields=[a,
EXPR$1, EXPR$2])
+LogicalSink(table=[default_catalog.default_database.pk_upsert_snk], fields=[a,
EXPR$1, EXPR$2])
+- LogicalAggregate(group=[{0}], EXPR$1=[MAX($1)], EXPR$2=[SUM($2)])
+- LogicalProject(a=[$0], b=[$1], c=[+($2, 1)])
+- LogicalWatermarkAssigner(rowtime=[rt], watermark=[-($3, 1000:INTERVAL
SECOND)])
+- LogicalTableScan(table=[[default_catalog, default_database,
append_src1]])
-LogicalSink(table=[default_catalog.default_database.pk_snk2], fields=[a, b,
EXPR$2])
+LogicalSink(table=[default_catalog.default_database.pk_upsert_snk2],
fields=[a, b, EXPR$2])
+- LogicalProject(a=[$0], b=[$1], EXPR$2=[/($2, 2)])
+- LogicalProject(a=[$0], b=[$1], c=[+($2, 1)])
+- LogicalWatermarkAssigner(rowtime=[rt], watermark=[-($3, 1000:INTERVAL
SECOND)])
@@ -591,14 +592,14 @@
LogicalSink(table=[default_catalog.default_database.pk_snk2], fields=[a, b, EXPR
</Resource>
<Resource name="optimized rel plan">
<![CDATA[
-Sink(table=[default_catalog.default_database.pk_snk], fields=[a, EXPR$1,
EXPR$2], duplicateChanges=[NONE])
+Sink(table=[default_catalog.default_database.pk_upsert_snk], fields=[a,
EXPR$1, EXPR$2], duplicateChanges=[NONE])
+- GroupAggregate(groupBy=[a], select=[a, MAX(b) AS EXPR$1, SUM(c) AS EXPR$2],
duplicateChanges=[ALLOW])
+- Exchange(distribution=[hash[a]], duplicateChanges=[DISALLOW])
+- Calc(select=[a, b, +(c, 1) AS c], duplicateChanges=[DISALLOW])
+- WatermarkAssigner(rowtime=[rt], watermark=[-(rt, 1000:INTERVAL
SECOND)], duplicateChanges=[DISALLOW])
+- TableSourceScan(table=[[default_catalog, default_database,
append_src1]], fields=[a, b, c, rt], duplicateChanges=[DISALLOW])
-Sink(table=[default_catalog.default_database.pk_snk2], fields=[a, b, EXPR$2],
duplicateChanges=[NONE])
+Sink(table=[default_catalog.default_database.pk_upsert_snk2], fields=[a, b,
EXPR$2], duplicateChanges=[NONE])
+- Calc(select=[a, b, /(c, 2) AS EXPR$2], duplicateChanges=[ALLOW])
+- Calc(select=[a, b, +(c, 1) AS c], duplicateChanges=[DISALLOW])
+- WatermarkAssigner(rowtime=[rt], watermark=[-(rt, 1000:INTERVAL
SECOND)], duplicateChanges=[DISALLOW])
@@ -609,13 +610,13 @@ Sink(table=[default_catalog.default_database.pk_snk2],
fields=[a, b, EXPR$2], du
<TestCase name="testMultiSink3[testSinkWithPk = true]">
<Resource name="ast">
<![CDATA[
-LogicalSink(table=[default_catalog.default_database.pk_snk], fields=[a, b,
EXPR$2])
+LogicalSink(table=[default_catalog.default_database.pk_upsert_snk], fields=[a,
b, EXPR$2])
+- LogicalProject(a=[$0], b=[$1], EXPR$2=[/($2, 3)])
+- LogicalProject(a=[$0], b=[$1], c=[+($2, 1)])
+- LogicalWatermarkAssigner(rowtime=[rt], watermark=[-($3, 1000:INTERVAL
SECOND)])
+- LogicalTableScan(table=[[default_catalog, default_database,
append_src1]])
-LogicalSink(table=[default_catalog.default_database.pk_snk2], fields=[a, b,
EXPR$2])
+LogicalSink(table=[default_catalog.default_database.pk_upsert_snk2],
fields=[a, b, EXPR$2])
+- LogicalProject(a=[$0], b=[$1], EXPR$2=[/($2, 2)])
+- LogicalProject(a=[$0], b=[$1], c=[+($2, 1)])
+- LogicalWatermarkAssigner(rowtime=[rt], watermark=[-($3, 1000:INTERVAL
SECOND)])
@@ -624,13 +625,13 @@
LogicalSink(table=[default_catalog.default_database.pk_snk2], fields=[a, b, EXPR
</Resource>
<Resource name="optimized rel plan">
<![CDATA[
-Sink(table=[default_catalog.default_database.pk_snk], fields=[a, b, EXPR$2],
duplicateChanges=[NONE])
+Sink(table=[default_catalog.default_database.pk_upsert_snk], fields=[a, b,
EXPR$2], duplicateChanges=[NONE])
+- Calc(select=[a, b, /(c, 3) AS EXPR$2], duplicateChanges=[ALLOW])
+- Calc(select=[a, b, +(c, 1) AS c], duplicateChanges=[ALLOW])
+- WatermarkAssigner(rowtime=[rt], watermark=[-(rt, 1000:INTERVAL
SECOND)], duplicateChanges=[ALLOW])
+- TableSourceScan(table=[[default_catalog, default_database,
append_src1]], fields=[a, b, c, rt], duplicateChanges=[ALLOW])
-Sink(table=[default_catalog.default_database.pk_snk2], fields=[a, b, EXPR$2],
duplicateChanges=[NONE])
+Sink(table=[default_catalog.default_database.pk_upsert_snk2], fields=[a, b,
EXPR$2], duplicateChanges=[NONE])
+- Calc(select=[a, b, /(c, 2) AS EXPR$2], duplicateChanges=[ALLOW])
+- Calc(select=[a, b, +(c, 1) AS c], duplicateChanges=[ALLOW])
+- WatermarkAssigner(rowtime=[rt], watermark=[-(rt, 1000:INTERVAL
SECOND)], duplicateChanges=[ALLOW])
@@ -641,13 +642,13 @@ Sink(table=[default_catalog.default_database.pk_snk2],
fields=[a, b, EXPR$2], du
<TestCase name="testMultiSink4[testSinkWithPk = true]">
<Resource name="ast">
<![CDATA[
-LogicalSink(table=[default_catalog.default_database.pk_snk], fields=[a,
EXPR$1, EXPR$2])
+LogicalSink(table=[default_catalog.default_database.pk_upsert_snk], fields=[a,
EXPR$1, EXPR$2])
+- LogicalAggregate(group=[{0}], EXPR$1=[MAX($1)], EXPR$2=[SUM($2)])
+- LogicalProject(a=[$0], b=[$1], c=[+($2, 1)])
+- LogicalWatermarkAssigner(rowtime=[rt], watermark=[-($3, 1000:INTERVAL
SECOND)])
+- LogicalTableScan(table=[[default_catalog, default_database,
append_src1]])
-LogicalSink(table=[default_catalog.default_database.pk_snk2], fields=[a,
EXPR$1, EXPR$2])
+LogicalSink(table=[default_catalog.default_database.pk_upsert_snk2],
fields=[a, EXPR$1, EXPR$2])
+- LogicalAggregate(group=[{0}], EXPR$1=[MIN($1)], EXPR$2=[MAX($2)])
+- LogicalProject(a=[$0], b=[$1], c=[+($2, 1)])
+- LogicalWatermarkAssigner(rowtime=[rt], watermark=[-($3, 1000:INTERVAL
SECOND)])
@@ -656,14 +657,14 @@
LogicalSink(table=[default_catalog.default_database.pk_snk2], fields=[a, EXPR$1,
</Resource>
<Resource name="optimized rel plan">
<![CDATA[
-Sink(table=[default_catalog.default_database.pk_snk], fields=[a, EXPR$1,
EXPR$2], duplicateChanges=[NONE])
+Sink(table=[default_catalog.default_database.pk_upsert_snk], fields=[a,
EXPR$1, EXPR$2], duplicateChanges=[NONE])
+- GroupAggregate(groupBy=[a], select=[a, MAX(b) AS EXPR$1, SUM(c) AS EXPR$2],
duplicateChanges=[ALLOW])
+- Exchange(distribution=[hash[a]], duplicateChanges=[DISALLOW])
+- Calc(select=[a, b, +(c, 1) AS c], duplicateChanges=[DISALLOW])
+- WatermarkAssigner(rowtime=[rt], watermark=[-(rt, 1000:INTERVAL
SECOND)], duplicateChanges=[DISALLOW])
+- TableSourceScan(table=[[default_catalog, default_database,
append_src1]], fields=[a, b, c, rt], duplicateChanges=[DISALLOW])
-Sink(table=[default_catalog.default_database.pk_snk2], fields=[a, EXPR$1,
EXPR$2], duplicateChanges=[NONE])
+Sink(table=[default_catalog.default_database.pk_upsert_snk2], fields=[a,
EXPR$1, EXPR$2], duplicateChanges=[NONE])
+- GroupAggregate(groupBy=[a], select=[a, MIN(b) AS EXPR$1, MAX(c) AS EXPR$2],
duplicateChanges=[ALLOW])
+- Exchange(distribution=[hash[a]], duplicateChanges=[DISALLOW])
+- Calc(select=[a, b, +(c, 1) AS c], duplicateChanges=[DISALLOW])
@@ -699,7 +700,7 @@ Sink(table=[default_catalog.default_database.no_pk_snk],
fields=[a, EXPR$1, EXPR
<TestCase name="testOneStageWindowAggregate[testSinkWithPk = true]">
<Resource name="ast">
<![CDATA[
-LogicalSink(table=[default_catalog.default_database.pk_snk], fields=[a,
EXPR$1, EXPR$2])
+LogicalSink(table=[default_catalog.default_database.pk_upsert_snk], fields=[a,
EXPR$1, EXPR$2])
+- LogicalProject(a=[$0], EXPR$1=[$3], EXPR$2=[$4])
+- LogicalAggregate(group=[{0, 1, 2}], EXPR$1=[MAX($3)], EXPR$2=[MAX($4)])
+- LogicalProject(a=[$0], window_start=[$4], window_end=[$5], b=[$1],
c=[$2])
@@ -711,7 +712,7 @@
LogicalSink(table=[default_catalog.default_database.pk_snk], fields=[a, EXPR$1,
</Resource>
<Resource name="optimized rel plan">
<![CDATA[
-Sink(table=[default_catalog.default_database.pk_snk], fields=[a, EXPR$1,
EXPR$2], duplicateChanges=[NONE])
+Sink(table=[default_catalog.default_database.pk_upsert_snk], fields=[a,
EXPR$1, EXPR$2], duplicateChanges=[NONE])
+- Calc(select=[a, EXPR$1, EXPR$2], duplicateChanges=[ALLOW])
+- WindowAggregate(groupBy=[a], window=[TUMBLE(time_col=[rt], size=[1
min])], select=[a, MAX(b) AS EXPR$1, MAX(c) AS EXPR$2, start('w$) AS
window_start, end('w$) AS window_end], duplicateChanges=[ALLOW])
+- Exchange(distribution=[hash[a]], duplicateChanges=[DISALLOW])
@@ -723,7 +724,7 @@ Sink(table=[default_catalog.default_database.pk_snk],
fields=[a, EXPR$1, EXPR$2]
<TestCase name="testRank[testSinkWithPk = true]">
<Resource name="ast">
<![CDATA[
-LogicalSink(table=[default_catalog.default_database.pk_snk], fields=[a, b, c])
+LogicalSink(table=[default_catalog.default_database.pk_upsert_snk], fields=[a,
b, c])
+- LogicalProject(a=[$0], b=[$1], c=[$2])
+- LogicalFilter(condition=[<=($3, 1)])
+- LogicalProject(a=[$0], b=[$1], c=[$2], rank_num=[ROW_NUMBER() OVER
(PARTITION BY $0 ORDER BY $2 DESC NULLS LAST)])
@@ -733,19 +734,37 @@
LogicalSink(table=[default_catalog.default_database.pk_snk], fields=[a, b, c])
</Resource>
<Resource name="optimized rel plan">
<![CDATA[
-Sink(table=[default_catalog.default_database.pk_snk], fields=[a, b, c],
duplicateChanges=[NONE])
+Sink(table=[default_catalog.default_database.pk_upsert_snk], fields=[a, b, c],
duplicateChanges=[NONE])
+- Rank(strategy=[AppendFastStrategy], rankType=[ROW_NUMBER],
rankRange=[rankStart=1, rankEnd=1], partitionBy=[a], orderBy=[c DESC],
select=[a, b, c], duplicateChanges=[ALLOW])
+- Exchange(distribution=[hash[a]], duplicateChanges=[DISALLOW])
+- Calc(select=[a, b, c], duplicateChanges=[DISALLOW])
+- WatermarkAssigner(rowtime=[rt], watermark=[-(rt, 1000:INTERVAL
SECOND)], duplicateChanges=[DISALLOW])
+- TableSourceScan(table=[[default_catalog, default_database,
append_src1]], fields=[a, b, c, rt], duplicateChanges=[DISALLOW])
+]]>
+ </Resource>
+ </TestCase>
+ <TestCase name="testRetractSink[testSinkWithPk = true]">
+ <Resource name="ast">
+ <![CDATA[
+LogicalSink(table=[default_catalog.default_database.pk_retract_snk],
fields=[a, b, c])
++- LogicalProject(a=[$0], b=[$1], c=[$2])
+ +- LogicalWatermarkAssigner(rowtime=[rt], watermark=[-($3, 1000:INTERVAL
SECOND)])
+ +- LogicalTableScan(table=[[default_catalog, default_database,
retract_src]])
+]]>
+ </Resource>
+ <Resource name="optimized rel plan">
+ <![CDATA[
+Sink(table=[default_catalog.default_database.pk_retract_snk], fields=[a, b,
c], duplicateChanges=[NONE])
++- Calc(select=[a, b, c], duplicateChanges=[DISALLOW])
+ +- WatermarkAssigner(rowtime=[rt], watermark=[-(rt, 1000:INTERVAL SECOND)],
duplicateChanges=[DISALLOW])
+ +- TableSourceScan(table=[[default_catalog, default_database,
retract_src]], fields=[a, b, c, rt], duplicateChanges=[DISALLOW])
]]>
</Resource>
</TestCase>
<TestCase name="testSinkWithMaterialize[testSinkWithPk = true]">
<Resource name="ast">
<![CDATA[
-LogicalSink(table=[default_catalog.default_database.another_pk_snk],
fields=[a, b, c])
+LogicalSink(table=[default_catalog.default_database.another_pk_upsert_snk],
fields=[a, b, c])
+- LogicalProject(a=[$0], b=[$1], c=[$2])
+- LogicalWatermarkAssigner(rowtime=[rt], watermark=[-($3, 1000:INTERVAL
SECOND)])
+- LogicalTableScan(table=[[default_catalog, default_database,
retract_src]])
@@ -753,10 +772,10 @@
LogicalSink(table=[default_catalog.default_database.another_pk_snk], fields=[a,
</Resource>
<Resource name="optimized rel plan">
<![CDATA[
-Sink(table=[default_catalog.default_database.another_pk_snk], fields=[a, b,
c], duplicateChanges=[NONE])
-+- Calc(select=[a, b, c], duplicateChanges=[ALLOW])
- +- WatermarkAssigner(rowtime=[rt], watermark=[-(rt, 1000:INTERVAL SECOND)],
duplicateChanges=[ALLOW])
- +- TableSourceScan(table=[[default_catalog, default_database,
retract_src]], fields=[a, b, c, rt], duplicateChanges=[ALLOW])
+Sink(table=[default_catalog.default_database.another_pk_upsert_snk],
fields=[a, b, c], upsertMaterialize=[true], duplicateChanges=[NONE])
++- Calc(select=[a, b, c], duplicateChanges=[DISALLOW])
+ +- WatermarkAssigner(rowtime=[rt], watermark=[-(rt, 1000:INTERVAL SECOND)],
duplicateChanges=[DISALLOW])
+ +- TableSourceScan(table=[[default_catalog, default_database,
retract_src]], fields=[a, b, c, rt], duplicateChanges=[DISALLOW])
]]>
</Resource>
</TestCase>
@@ -794,7 +813,7 @@ Sink(table=[default_catalog.default_database.no_pk_snk],
fields=[a, b, c], dupli
<TestCase name="testTemporalJoin[testSinkWithPk = true]">
<Resource name="ast">
<![CDATA[
-LogicalSink(table=[default_catalog.default_database.pk_snk], fields=[a, b, c])
+LogicalSink(table=[default_catalog.default_database.pk_upsert_snk], fields=[a,
b, c])
+- LogicalProject(a=[$0], b=[$1], c=[$2])
+- LogicalCorrelate(correlation=[$cor0], joinType=[inner],
requiredColumns=[{0, 4}])
:- LogicalProject(a=[$0], b=[$1], c=[$2], rt=[$3], pt=[PROCTIME()])
@@ -808,7 +827,7 @@
LogicalSink(table=[default_catalog.default_database.pk_snk], fields=[a, b, c])
</Resource>
<Resource name="optimized rel plan">
<![CDATA[
-Sink(table=[default_catalog.default_database.pk_snk], fields=[a, b, c],
duplicateChanges=[NONE])
+Sink(table=[default_catalog.default_database.pk_upsert_snk], fields=[a, b, c],
duplicateChanges=[NONE])
+- Calc(select=[a, b, c], duplicateChanges=[ALLOW])
+- TemporalJoin(joinType=[InnerJoin], where=[AND(=(a, a0),
__TEMPORAL_JOIN_CONDITION(pt, __TEMPORAL_JOIN_CONDITION_PRIMARY_KEY(a0),
__TEMPORAL_JOIN_LEFT_KEY(a), __TEMPORAL_JOIN_RIGHT_KEY(a0)))], select=[a, b, c,
pt, a0], duplicateChanges=[ALLOW])
:- Exchange(distribution=[hash[a]], duplicateChanges=[DISALLOW])
@@ -850,7 +869,7 @@ Sink(table=[default_catalog.default_database.no_pk_snk],
fields=[a, EXPR$1, EXPR
<TestCase name="testTwoStageWindowAggregate[testSinkWithPk = true]">
<Resource name="ast">
<![CDATA[
-LogicalSink(table=[default_catalog.default_database.pk_snk], fields=[a,
EXPR$1, EXPR$2])
+LogicalSink(table=[default_catalog.default_database.pk_upsert_snk], fields=[a,
EXPR$1, EXPR$2])
+- LogicalProject(a=[$0], EXPR$1=[$3], EXPR$2=[$4])
+- LogicalAggregate(group=[{0, 1, 2}], EXPR$1=[MAX($3)], EXPR$2=[MAX($4)])
+- LogicalProject(a=[$0], window_start=[$4], window_end=[$5], b=[$1],
c=[$2])
@@ -862,7 +881,7 @@
LogicalSink(table=[default_catalog.default_database.pk_snk], fields=[a, EXPR$1,
</Resource>
<Resource name="optimized rel plan">
<![CDATA[
-Sink(table=[default_catalog.default_database.pk_snk], fields=[a, EXPR$1,
EXPR$2], duplicateChanges=[NONE])
+Sink(table=[default_catalog.default_database.pk_upsert_snk], fields=[a,
EXPR$1, EXPR$2], duplicateChanges=[NONE])
+- Calc(select=[a, EXPR$1, EXPR$2], duplicateChanges=[ALLOW])
+- GlobalWindowAggregate(groupBy=[a],
window=[TUMBLE(slice_end=[$slice_end], size=[1 min])], select=[a, MAX(max$0) AS
EXPR$1, MAX(max$1) AS EXPR$2, start('w$) AS window_start, end('w$) AS
window_end], duplicateChanges=[ALLOW])
+- Exchange(distribution=[hash[a]], duplicateChanges=[DISALLOW])
@@ -901,7 +920,7 @@ Sink(table=[default_catalog.default_database.no_pk_snk],
fields=[a, b, c], dupli
<TestCase name="testUnion[testSinkWithPk = true]">
<Resource name="ast">
<![CDATA[
-LogicalSink(table=[default_catalog.default_database.pk_snk], fields=[a, b, c])
+LogicalSink(table=[default_catalog.default_database.pk_upsert_snk], fields=[a,
b, c])
+- LogicalUnion(all=[true])
:- LogicalProject(a=[$0], b=[$1], c=[$2])
: +- LogicalWatermarkAssigner(rowtime=[rt], watermark=[-($3, 1000:INTERVAL
SECOND)])
@@ -913,7 +932,7 @@
LogicalSink(table=[default_catalog.default_database.pk_snk], fields=[a, b, c])
</Resource>
<Resource name="optimized rel plan">
<![CDATA[
-Sink(table=[default_catalog.default_database.pk_snk], fields=[a, b, c],
duplicateChanges=[NONE])
+Sink(table=[default_catalog.default_database.pk_upsert_snk], fields=[a, b, c],
duplicateChanges=[NONE])
+- Union(all=[true], union=[a, b, c], duplicateChanges=[ALLOW])
:- Calc(select=[a, b, c], duplicateChanges=[DISALLOW])
: +- WatermarkAssigner(rowtime=[rt], watermark=[-(rt, 1000:INTERVAL
SECOND)], duplicateChanges=[DISALLOW])
@@ -927,7 +946,7 @@ Sink(table=[default_catalog.default_database.pk_snk],
fields=[a, b, c], duplicat
<TestCase name="testWindowTVF[testSinkWithPk = true]">
<Resource name="ast">
<![CDATA[
-LogicalSink(table=[default_catalog.default_database.pk_snk_with_time_col],
fields=[a, b, c, window_start, window_end])
+LogicalSink(table=[default_catalog.default_database.pk_upsert_snk_with_time_col],
fields=[a, b, c, window_start, window_end])
+- LogicalProject(a=[$0], b=[$1], c=[$2], window_start=[$4], window_end=[$5])
+- LogicalTableFunctionScan(invocation=[TUMBLE(TABLE(#0),
DESCRIPTOR(_UTF-16LE'rt'), 60000:INTERVAL MINUTE)], rowType=[RecordType(INTEGER
a, VARCHAR(2147483647) b, BIGINT c, TIMESTAMP(3) *ROWTIME* rt, TIMESTAMP(3)
window_start, TIMESTAMP(3) window_end, TIMESTAMP(3) *ROWTIME* window_time)])
+- LogicalProject(a=[$0], b=[$1], c=[$2], rt=[$3])
@@ -937,7 +956,7 @@
LogicalSink(table=[default_catalog.default_database.pk_snk_with_time_col], field
</Resource>
<Resource name="optimized rel plan">
<![CDATA[
-Sink(table=[default_catalog.default_database.pk_snk_with_time_col], fields=[a,
b, c, window_start, window_end], duplicateChanges=[NONE])
+Sink(table=[default_catalog.default_database.pk_upsert_snk_with_time_col],
fields=[a, b, c, window_start, window_end], duplicateChanges=[NONE])
+- Calc(select=[a, b, c, window_start, window_end], duplicateChanges=[ALLOW])
+- WindowTableFunction(window=[TUMBLE(time_col=[rt], size=[1 min])],
duplicateChanges=[ALLOW])
+- WatermarkAssigner(rowtime=[rt], watermark=[-(rt, 1000:INTERVAL
SECOND)], duplicateChanges=[DISALLOW])
@@ -948,7 +967,7 @@
Sink(table=[default_catalog.default_database.pk_snk_with_time_col], fields=[a, b
<TestCase name="testWindowJoin[testSinkWithPk = true]">
<Resource name="ast">
<![CDATA[
-LogicalSink(table=[default_catalog.default_database.pk_snk], fields=[a, b, c])
+LogicalSink(table=[default_catalog.default_database.pk_upsert_snk], fields=[a,
b, c])
+- LogicalProject(a=[$0], b=[$8], c=[$2])
+- LogicalJoin(condition=[AND(=($0, $7), =($4, $11), =($5, $12))],
joinType=[inner])
:- LogicalProject(a=[$0], b=[$1], c=[$2], rt=[$3], window_start=[$4],
window_end=[$5], window_time=[$6])
@@ -965,7 +984,7 @@
LogicalSink(table=[default_catalog.default_database.pk_snk], fields=[a, b, c])
</Resource>
<Resource name="optimized rel plan">
<![CDATA[
-Sink(table=[default_catalog.default_database.pk_snk], fields=[a, b, c],
duplicateChanges=[NONE])
+Sink(table=[default_catalog.default_database.pk_upsert_snk], fields=[a, b, c],
duplicateChanges=[NONE])
+- Calc(select=[a, b, c], duplicateChanges=[ALLOW])
+- WindowJoin(leftWindow=[TUMBLE(win_start=[window_start],
win_end=[window_end], size=[1 min])],
rightWindow=[TUMBLE(win_start=[window_start], win_end=[window_end], size=[1
min])], joinType=[InnerJoin], where=[=(a, a0)], select=[a, c, window_start,
window_end, a0, b, window_start0, window_end0], duplicateChanges=[ALLOW])
:- Exchange(distribution=[hash[a]], duplicateChanges=[DISALLOW])
@@ -984,7 +1003,7 @@ Sink(table=[default_catalog.default_database.pk_snk],
fields=[a, b, c], duplicat
<TestCase name="testWindowRank[testSinkWithPk = true]">
<Resource name="ast">
<![CDATA[
-LogicalSink(table=[default_catalog.default_database.pk_snk_with_time_col],
fields=[a, b, c, window_start, window_end])
+LogicalSink(table=[default_catalog.default_database.pk_upsert_snk_with_time_col],
fields=[a, b, c, window_start, window_end])
+- LogicalProject(a=[$0], b=[$1], c=[$2], window_start=[$3], window_end=[$4])
+- LogicalFilter(condition=[<=($5, 1)])
+- LogicalProject(a=[$0], b=[$1], c=[$2], window_start=[$4],
window_end=[$5], rank_num=[ROW_NUMBER() OVER (PARTITION BY $0, $4, $5 ORDER BY
$2 DESC NULLS LAST)])
@@ -996,7 +1015,7 @@
LogicalSink(table=[default_catalog.default_database.pk_snk_with_time_col], field
</Resource>
<Resource name="optimized rel plan">
<![CDATA[
-Sink(table=[default_catalog.default_database.pk_snk_with_time_col], fields=[a,
b, c, window_start, window_end], duplicateChanges=[NONE])
+Sink(table=[default_catalog.default_database.pk_upsert_snk_with_time_col],
fields=[a, b, c, window_start, window_end], duplicateChanges=[NONE])
+- WindowRank(window=[TUMBLE(win_start=[window_start], win_end=[window_end],
size=[1 min])], rankType=[ROW_NUMBER], rankRange=[rankStart=1, rankEnd=1],
partitionBy=[a], orderBy=[c DESC], select=[a, b, c, window_start, window_end],
duplicateChanges=[ALLOW])
+- Exchange(distribution=[hash[a]], duplicateChanges=[DISALLOW])
+- Calc(select=[a, b, c, window_start, window_end],
duplicateChanges=[DISALLOW])
diff --git
a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/DeltaJoinTest.xml
b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/DeltaJoinTest.xml
index 201f4ec63bf..7cd943cb20a 100644
---
a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/DeltaJoinTest.xml
+++
b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/DeltaJoinTest.xml
@@ -16,6 +16,30 @@ See the License for the specific language governing
permissions and
limitations under the License.
-->
<Root>
+ <TestCase name="testAppendSink">
+ <Resource name="sql">
+ <![CDATA[insert into append_snk select * from src1 join src2 on src1.a1
= src2.b1 and src1.a2 = src2.b2]]>
+ </Resource>
+ <Resource name="ast">
+ <![CDATA[
+LogicalSink(table=[default_catalog.default_database.append_snk], fields=[a0,
a1, a2, a3, b0, b2, b1])
++- LogicalProject(a0=[$0], a1=[$1], a2=[$2], a3=[$3], b0=[$4], b2=[$5],
b1=[$6])
+ +- LogicalJoin(condition=[AND(=($1, $6), =($2, $5))], joinType=[inner])
+ :- LogicalTableScan(table=[[default_catalog, default_database, src1]])
+ +- LogicalTableScan(table=[[default_catalog, default_database, src2]])
+]]>
+ </Resource>
+ <Resource name="optimized rel plan">
+ <![CDATA[
+Sink(table=[default_catalog.default_database.append_snk], fields=[a0, a1, a2,
a3, b0, b2, b1])
++- Join(joinType=[InnerJoin], where=[AND(=(a1, b1), =(a2, b2))], select=[a0,
a1, a2, a3, b0, b2, b1], leftInputSpec=[NoUniqueKey],
rightInputSpec=[NoUniqueKey])
+ :- Exchange(distribution=[hash[a1, a2]])
+ : +- TableSourceScan(table=[[default_catalog, default_database, src1]],
fields=[a0, a1, a2, a3])
+ +- Exchange(distribution=[hash[b1, b2]])
+ +- TableSourceScan(table=[[default_catalog, default_database, src2]],
fields=[b0, b2, b1])
+]]>
+ </Resource>
+ </TestCase>
<TestCase name="testCdcSource">
<Resource name="sql">
<![CDATA[insert into snk select * from src1 join cdc_src on src1.a1 =
cdc_src.b1 and src1.a2 = cdc_src.b2]]>
@@ -349,7 +373,43 @@ Sink(table=[default_catalog.default_database.snk],
fields=[a0, a1, a2, a3, b0, b
]]>
</Resource>
</TestCase>
- <TestCase name="testMultiRootsWithoutReusingDeltaJoin">
+ <TestCase name="testMultiRootsWithoutReusingDeltaJoin1">
+ <Resource name="ast">
+ <![CDATA[
+LogicalSink(table=[default_catalog.default_database.snk], fields=[a0, a1, a2,
a3, b0, b2, b1])
++- LogicalProject(a0=[$0], a1=[$1], a2=[$2], a3=[$3], b0=[$4], b2=[$5],
b1=[$6])
+ +- LogicalJoin(condition=[AND(=($1, $6), =($2, $5))], joinType=[inner])
+ :- LogicalTableScan(table=[[default_catalog, default_database, src1]])
+ +- LogicalTableScan(table=[[default_catalog, default_database, src2]])
+
+LogicalSink(table=[default_catalog.default_database.snk2], fields=[a0, a1, a2,
a3, b0, b2, b1])
++- LogicalProject(a0=[$0], a1=[$1], a2=[$2], a3=[$3], b0=[$4], b2=[$5],
b1=[$6])
+ +- LogicalJoin(condition=[AND(=($1, $6), =($2, $5))], joinType=[inner])
+ :- LogicalTableScan(table=[[default_catalog, default_database, src1]])
+ +- LogicalTableScan(table=[[default_catalog, default_database, src2]])
+]]>
+ </Resource>
+ <Resource name="optimized exec plan">
+ <![CDATA[
+Exchange(distribution=[hash[a1, a2]])(reuse_id=[1])
++- TableSourceScan(table=[[default_catalog, default_database, src1]],
fields=[a0, a1, a2, a3])
+
+Exchange(distribution=[hash[b1, b2]])(reuse_id=[2])
++- TableSourceScan(table=[[default_catalog, default_database, src2]],
fields=[b0, b2, b1])
+
+Sink(table=[default_catalog.default_database.snk], fields=[a0, a1, a2, a3, b0,
b2, b1])
++- DeltaJoin(joinType=[InnerJoin], where=[((a1 = b1) AND (a2 = b2))],
select=[a0, a1, a2, a3, b0, b2, b1])
+ :- Reused(reference_id=[1])
+ +- Reused(reference_id=[2])
+
+Sink(table=[default_catalog.default_database.snk2], fields=[a0, a1, a2, a3,
b0, b2, b1])
++- Join(joinType=[InnerJoin], where=[((a1 = b1) AND (a2 = b2))], select=[a0,
a1, a2, a3, b0, b2, b1], leftInputSpec=[NoUniqueKey],
rightInputSpec=[NoUniqueKey])
+ :- Reused(reference_id=[1])
+ +- Reused(reference_id=[2])
+]]>
+ </Resource>
+ </TestCase>
+ <TestCase name="testMultiRootsWithoutReusingDeltaJoin2">
<Resource name="ast">
<![CDATA[
LogicalSink(table=[default_catalog.default_database.snk], fields=[a0, a1, a2,
a3, b0, b2, b1])
@@ -582,35 +642,6 @@ Sink(table=[default_catalog.default_database.snk],
fields=[a0, a1, a2, a3, b0, b
: +- TableSourceScan(table=[[default_catalog, default_database, src1]],
fields=[a0, a1, a2, a3])
+- Exchange(distribution=[hash[b1, b2]])
+- TableSourceScan(table=[[default_catalog, default_database,
non_index_src]], fields=[b0, b2, b1])
-]]>
- </Resource>
- </TestCase>
- <TestCase name="testWithAggregatingAfterJoin">
- <Resource name="sql">
- <![CDATA[insert into snk select a0, max(a1), max(a2), max(a3), max(b0),
max(b2), b1 from src1 join src2 on src1.a1 = src2.b1 and src1.a2 = src2.b2
group by a0, b1]]>
- </Resource>
- <Resource name="ast">
- <![CDATA[
-LogicalSink(table=[default_catalog.default_database.snk], fields=[a0, EXPR$1,
EXPR$2, EXPR$3, EXPR$4, EXPR$5, b1])
-+- LogicalProject(a0=[$0], EXPR$1=[$2], EXPR$2=[$3], EXPR$3=[$4], EXPR$4=[$5],
EXPR$5=[$6], b1=[$1])
- +- LogicalAggregate(group=[{0, 1}], EXPR$1=[MAX($2)], EXPR$2=[MAX($3)],
EXPR$3=[MAX($4)], EXPR$4=[MAX($5)], EXPR$5=[MAX($6)])
- +- LogicalProject(a0=[$0], b1=[$6], a1=[$1], a2=[$2], a3=[$3], b0=[$4],
b2=[$5])
- +- LogicalJoin(condition=[AND(=($1, $6), =($2, $5))],
joinType=[inner])
- :- LogicalTableScan(table=[[default_catalog, default_database,
src1]])
- +- LogicalTableScan(table=[[default_catalog, default_database,
src2]])
-]]>
- </Resource>
- <Resource name="optimized rel plan">
- <![CDATA[
-Sink(table=[default_catalog.default_database.snk], fields=[a0, EXPR$1, EXPR$2,
EXPR$3, EXPR$4, EXPR$5, b1])
-+- Calc(select=[a0, EXPR$1, EXPR$2, EXPR$3, EXPR$4, EXPR$5, b1])
- +- GroupAggregate(groupBy=[a0, b1], select=[a0, b1, MAX(a1) AS EXPR$1,
MAX(a2) AS EXPR$2, MAX(a3) AS EXPR$3, MAX(b0) AS EXPR$4, MAX(b2) AS EXPR$5])
- +- Exchange(distribution=[hash[a0, b1]])
- +- Join(joinType=[InnerJoin], where=[AND(=(a1, b1), =(a2, b2))],
select=[a0, a1, a2, a3, b0, b2, b1], leftInputSpec=[NoUniqueKey],
rightInputSpec=[NoUniqueKey])
- :- Exchange(distribution=[hash[a1, a2]])
- : +- TableSourceScan(table=[[default_catalog, default_database,
src1]], fields=[a0, a1, a2, a3])
- +- Exchange(distribution=[hash[b1, b2]])
- +- TableSourceScan(table=[[default_catalog, default_database,
src2]], fields=[b0, b2, b1])
]]>
</Resource>
</TestCase>
@@ -641,6 +672,59 @@ Sink(table=[default_catalog.default_database.tmp_snk], fields=[a0, a1, a2, a3, b
: +- TableSourceScan(table=[[default_catalog, default_database,
src2]], fields=[b0, b2, b1])
+- Exchange(distribution=[hash[b1, b2]])
+- TableSourceScan(table=[[default_catalog, default_database, src3]],
fields=[b0, b2, b1])
+]]>
+ </Resource>
+ </TestCase>
+ <TestCase name="testRetractSink">
+ <Resource name="sql">
+ <![CDATA[insert into retract_snk select * from src1 join src2 on src1.a1 = src2.b1 and src1.a2 = src2.b2]]>
+ </Resource>
+ <Resource name="ast">
+ <![CDATA[
+LogicalSink(table=[default_catalog.default_database.retract_snk], fields=[a0, a1, a2, a3, b0, b2, b1])
++- LogicalProject(a0=[$0], a1=[$1], a2=[$2], a3=[$3], b0=[$4], b2=[$5], b1=[$6])
+ +- LogicalJoin(condition=[AND(=($1, $6), =($2, $5))], joinType=[inner])
+ :- LogicalTableScan(table=[[default_catalog, default_database, src1]])
+ +- LogicalTableScan(table=[[default_catalog, default_database, src2]])
+]]>
+ </Resource>
+ <Resource name="optimized rel plan">
+ <![CDATA[
+Sink(table=[default_catalog.default_database.retract_snk], fields=[a0, a1, a2, a3, b0, b2, b1])
++- Join(joinType=[InnerJoin], where=[AND(=(a1, b1), =(a2, b2))], select=[a0, a1, a2, a3, b0, b2, b1], leftInputSpec=[NoUniqueKey], rightInputSpec=[NoUniqueKey])
+ :- Exchange(distribution=[hash[a1, a2]])
+ : +- TableSourceScan(table=[[default_catalog, default_database, src1]], fields=[a0, a1, a2, a3])
+ +- Exchange(distribution=[hash[b1, b2]])
+ +- TableSourceScan(table=[[default_catalog, default_database, src2]], fields=[b0, b2, b1])
+]]>
+ </Resource>
+ </TestCase>
+ <TestCase name="testWithAggregatingAfterJoin">
+ <Resource name="sql">
+ <![CDATA[insert into snk select a0, max(a1), max(a2), max(a3), max(b0),
max(b2), b1 from src1 join src2 on src1.a1 = src2.b1 and src1.a2 = src2.b2
group by a0, b1]]>
+ </Resource>
+ <Resource name="ast">
+ <![CDATA[
+LogicalSink(table=[default_catalog.default_database.snk], fields=[a0, EXPR$1,
EXPR$2, EXPR$3, EXPR$4, EXPR$5, b1])
++- LogicalProject(a0=[$0], EXPR$1=[$2], EXPR$2=[$3], EXPR$3=[$4], EXPR$4=[$5],
EXPR$5=[$6], b1=[$1])
+ +- LogicalAggregate(group=[{0, 1}], EXPR$1=[MAX($2)], EXPR$2=[MAX($3)],
EXPR$3=[MAX($4)], EXPR$4=[MAX($5)], EXPR$5=[MAX($6)])
+ +- LogicalProject(a0=[$0], b1=[$6], a1=[$1], a2=[$2], a3=[$3], b0=[$4],
b2=[$5])
+ +- LogicalJoin(condition=[AND(=($1, $6), =($2, $5))],
joinType=[inner])
+ :- LogicalTableScan(table=[[default_catalog, default_database,
src1]])
+ +- LogicalTableScan(table=[[default_catalog, default_database,
src2]])
+]]>
+ </Resource>
+ <Resource name="optimized rel plan">
+ <![CDATA[
+Sink(table=[default_catalog.default_database.snk], fields=[a0, EXPR$1, EXPR$2,
EXPR$3, EXPR$4, EXPR$5, b1])
++- Calc(select=[a0, EXPR$1, EXPR$2, EXPR$3, EXPR$4, EXPR$5, b1])
+ +- GroupAggregate(groupBy=[a0, b1], select=[a0, b1, MAX(a1) AS EXPR$1,
MAX(a2) AS EXPR$2, MAX(a3) AS EXPR$3, MAX(b0) AS EXPR$4, MAX(b2) AS EXPR$5])
+ +- Exchange(distribution=[hash[a0, b1]])
+ +- Join(joinType=[InnerJoin], where=[AND(=(a1, b1), =(a2, b2))],
select=[a0, a1, a2, a3, b0, b2, b1], leftInputSpec=[NoUniqueKey],
rightInputSpec=[NoUniqueKey])
+ :- Exchange(distribution=[hash[a1, a2]])
+ : +- TableSourceScan(table=[[default_catalog, default_database,
src1]], fields=[a0, a1, a2, a3])
+ +- Exchange(distribution=[hash[b1, b2]])
+ +- TableSourceScan(table=[[default_catalog, default_database,
src2]], fields=[b0, b2, b1])
]]>
</Resource>
</TestCase>
diff --git a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/DeltaJoinTest_jsonplan/testJsonPlanWithTableHints.out b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/DeltaJoinTest_jsonplan/testJsonPlanWithTableHints.out
index 7822ff82dea..a285abe7cad 100644
--- a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/DeltaJoinTest_jsonplan/testJsonPlanWithTableHints.out
+++ b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/DeltaJoinTest_jsonplan/testJsonPlanWithTableHints.out
@@ -32,7 +32,7 @@
"bounded" : "false",
"connector" : "values",
"failing-source" : "true",
- "sink-changelog-mode-enforced" : "I,UA,UB,D",
+ "sink-changelog-mode-enforced" : "I,UA,D",
"sink-insert-only" : "false"
}
}
@@ -82,7 +82,7 @@
"bounded" : "false",
"connector" : "values",
"failing-source" : "true",
- "sink-changelog-mode-enforced" : "I,UA,UB,D",
+ "sink-changelog-mode-enforced" : "I,UA,D",
"sink-insert-only" : "false"
}
}
@@ -140,7 +140,7 @@
"bounded" : "false",
"connector" : "values",
"failing-source" : "true",
- "sink-changelog-mode-enforced" : "I,UA,UB,D",
+ "sink-changelog-mode-enforced" : "I,UA,D",
"sink-insert-only" : "false"
}
}
@@ -206,7 +206,7 @@
"bounded" : "false",
"connector" : "values",
"failing-source" : "true",
- "sink-changelog-mode-enforced" : "I,UA,UB,D",
+ "sink-changelog-mode-enforced" : "I,UA,D",
"sink-insert-only" : "false"
}
}
@@ -309,7 +309,7 @@
"async" : "true",
"bounded" : "false",
"connector" : "values",
- "sink-changelog-mode-enforced" : "I,UA,UB,D",
+ "sink-changelog-mode-enforced" : "I,UA,D",
"sink-insert-only" : "false"
}
}
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/DeltaJoinTest.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/DeltaJoinTest.scala
index 8c92844dec2..50347c42915 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/DeltaJoinTest.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/DeltaJoinTest.scala
@@ -42,7 +42,7 @@ class DeltaJoinTest extends TableTestBase {
options.put("connector", "values")
options.put("bounded", "false")
options.put("sink-insert-only", "false")
- options.put("sink-changelog-mode-enforced", "I,UA,UB,D")
+ options.put("sink-changelog-mode-enforced", "I,UA,D")
options.put("async", "true")
options
}
@@ -254,7 +254,8 @@ class DeltaJoinTest extends TableTestBase {
}
@Test
- def testMultiRootsWithoutReusingDeltaJoin(): Unit = {
+ def testMultiRootsWithoutReusingDeltaJoin1(): Unit = {
+ // one sink has pk but another doesn't
util.tableEnv.executeSql(
"create table snk2 like snk(" +
" EXCLUDING CONSTRAINTS" +
@@ -274,6 +275,31 @@ class DeltaJoinTest extends TableTestBase {
util.verifyExecPlan(stmt)
}
+ @Test
+ def testMultiRootsWithoutReusingDeltaJoin2(): Unit = {
+ // one sink is an upsert sink but another is a retract sink
+ util.tableEnv
+ .executeSql(
+ "CREATE TABLE snk2 WITH (\n"
+ + " 'sink-changelog-mode-enforced' = 'I,UA,UB,D'"
+ + ") LIKE snk (\n"
+ + " OVERWRITING OPTIONS\n"
+ + ")")
+
+ val stmt = tEnv.createStatementSet()
+ stmt.addInsertSql(
+ "insert into snk select * from src1 join src2 " +
+ "on src1.a1 = src2.b1 " +
+ "and src1.a2 = src2.b2")
+
+ stmt.addInsertSql(
+ "insert into snk2 select * from src1 join src2 " +
+ "on src1.a1 = src2.b1 " +
+ "and src1.a2 = src2.b2")
+
+ util.verifyExecPlan(stmt)
+ }
+
@Test
def testExplainPlanAdvice(): Unit = {
util.verifyExplainInsert(
@@ -347,6 +373,10 @@ class DeltaJoinTest extends TableTestBase {
@Test
def testCdcSource(): Unit = {
+ util.tableConfig.set(
+ ExecutionConfigOptions.TABLE_EXEC_SINK_UPSERT_MATERIALIZE,
+ UpsertMaterialize.NONE)
+
util.tableEnv.executeSql(
"create table cdc_src with ('changelog-mode' = 'I,UA,UB,D') " +
"like src2 (OVERWRITING OPTIONS)")
@@ -372,6 +402,10 @@ class DeltaJoinTest extends TableTestBase {
@Test
def testWithAggregatingSourceTableBeforeJoin(): Unit = {
+ util.tableConfig.set(
+ ExecutionConfigOptions.TABLE_EXEC_SINK_UPSERT_MATERIALIZE,
+ UpsertMaterialize.NONE)
+
util.verifyRelPlanInsert(
"insert into snk select * from ( " +
" select distinct max(a0) as a0, a1, max(a2) as a2, max(a3) as a3 from src1 group by a1" +
@@ -382,6 +416,10 @@ class DeltaJoinTest extends TableTestBase {
@Test
def testWithAggregatingAfterJoin(): Unit = {
+ util.tableConfig.set(
+ ExecutionConfigOptions.TABLE_EXEC_SINK_UPSERT_MATERIALIZE,
+ UpsertMaterialize.NONE)
+
util.verifyRelPlanInsert(
"insert into snk " +
"select a0, max(a1), max(a2), max(a3), max(b0), max(b2), b1 from src1 join src2 " +
@@ -545,6 +583,39 @@ class DeltaJoinTest extends TableTestBase {
util.verifyRelPlanInsert("insert into tmp_snk select a0, a1 from src1")
}
+ @Test
+ def testRetractSink(): Unit = {
+ util.tableEnv
+ .executeSql(
+ "CREATE TABLE retract_snk WITH (\n"
+ + " 'sink-changelog-mode-enforced' = 'I,UA,UB,D'"
+ + ") LIKE snk (\n"
+ + " OVERWRITING OPTIONS\n"
+ + ")")
+
+ util.verifyRelPlanInsert(
+ "insert into retract_snk select * from src1 join src2 " +
+ "on src1.a1 = src2.b1 " +
+ "and src1.a2 = src2.b2")
+ }
+
+ @Test
+ def testAppendSink(): Unit = {
+ util.tableEnv
+ .executeSql(
+ "CREATE TABLE append_snk WITH (\n"
+ + " 'sink-insert-only' = 'true',\n"
+ + " 'sink-changelog-mode-enforced' = 'I'\n"
+ + ") LIKE snk (\n"
+ + " OVERWRITING OPTIONS\n"
+ + ")")
+
+ util.verifyRelPlanInsert(
+ "insert into append_snk select * from src1 join src2 " +
+ "on src1.a1 = src2.b1 " +
+ "and src1.a2 = src2.b2")
+ }
+
private def addTable(
tableName: String,
schema: Schema,