This is an automated email from the ASF dual-hosted git repository.
snuyanzin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new 6248ceda1d9 [FLINK-37882][table] Add tests for queries where
LITERAL_AGG will appear after upgrade to Calcite 1.35
6248ceda1d9 is described below
commit 6248ceda1d9c311948fa93a25e452b166b4195f7
Author: Sergey Nuyanzin <[email protected]>
AuthorDate: Mon Jun 2 12:44:44 2025 +0200
[FLINK-37882][table] Add tests for queries where LITERAL_AGG will appear
after upgrade to Calcite 1.35
---
.../nodes/exec/common/CorrelateTestPrograms.java | 35 +
.../plan/nodes/exec/common/JoinTestPrograms.java | 36 +
.../nodes/exec/stream/CorrelateRestoreTest.java | 3 +-
.../plan/nodes/exec/stream/JoinRestoreTest.java | 3 +-
.../plan/correlate-with-literal-agg.json | 694 +++++++++++
.../correlate-with-literal-agg/savepoint/_metadata | Bin 0 -> 24525 bytes
.../plan/semi-anti-join-with-literal-agg.json | 1217 ++++++++++++++++++++
.../savepoint/_metadata | Bin 0 -> 44588 bytes
8 files changed, 1986 insertions(+), 2 deletions(-)
diff --git
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/common/CorrelateTestPrograms.java
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/common/CorrelateTestPrograms.java
index 38203f3e384..a41be3af777 100644
---
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/common/CorrelateTestPrograms.java
+++
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/common/CorrelateTestPrograms.java
@@ -202,4 +202,39 @@ public class CorrelateTestPrograms {
.runSql(
"INSERT INTO sink_t SELECT (SELECT name, nested
FROM source_t, UNNEST(arr) AS T(nested)) FROM source_t")
.build();
+
+ public static final TableTestProgram CORRELATE_WITH_LITERAL_AGG =
+ TableTestProgram.of(
+ "correlate-with-literal-agg",
+ "validate correlate with literal aggregate
function")
+ .setupTableSource(
+ SourceTestStep.newBuilder("source_t1")
+ .addSchema("a INTEGER", "b BIGINT", "c
STRING")
+ .producedBeforeRestore(Row.of(1, 2L, "3"))
+ .producedAfterRestore(Row.of(2, 3L, "4"))
+ .build())
+ .setupTableSource(
+ SourceTestStep.newBuilder("source_t2")
+ .addSchema("d INTEGER", "e BIGINT", "f
STRING")
+ .producedBeforeRestore(Row.of(1, 2L, "3"))
+ .producedAfterRestore(Row.of(2, 3L, "4"))
+ .build())
+ .setupTableSource(
+ SourceTestStep.newBuilder("source_t3")
+ .addSchema("i INTEGER", "j BIGINT", "k
STRING")
+ .producedBeforeRestore(Row.of(1, 2L, "3"))
+ .producedAfterRestore(Row.of(2, 3L, "4"))
+ .build())
+ .setupTableSink(
+ SinkTestStep.newBuilder("sink_t")
+ .addSchema("b BIGINT")
+ .consumedBeforeRestore("+I[2]")
+ .consumedAfterRestore(
+ "+I[3]", "-D[2]", "-D[3]",
"+I[2]", "+I[3]")
+ .build())
+ .runSql(
+ "INSERT INTO sink_t SELECT b FROM source_t1 "
+ + " WHERE (CASE WHEN a IN (SELECT 1 FROM
source_t3) THEN 1 ELSE 2 END) "
+ + " IN (SELECT d FROM source_t2 WHERE
source_t1.c = source_t2.f)")
+ .build();
}
diff --git
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/common/JoinTestPrograms.java
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/common/JoinTestPrograms.java
index 1306b449d44..c1024565e0f 100644
---
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/common/JoinTestPrograms.java
+++
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/common/JoinTestPrograms.java
@@ -39,6 +39,7 @@ public class JoinTestPrograms {
public static final TableTestProgram SEMI_JOIN;
public static final TableTestProgram ANTI_JOIN;
public static final TableTestProgram JOIN_WITH_STATE_TTL_HINT;
+ public static final TableTestProgram SEMI_ANTI_JOIN_WITH_LITERAL_AGG;
static final SourceTestStep EMPLOYEE =
SourceTestStep.newBuilder("EMPLOYEE")
@@ -465,5 +466,40 @@ public class JoinTestPrograms {
"INSERT INTO MySink SELECT /*+
STATE_TTL('v1' = '1d', 'v2' = '4d'), STATE_TTL('v2' = '8d') */deptno,
department_num FROM (%s) v1 JOIN (%s) v2 ON deptno = department_num",
query1, query2))
.build();
+
+ SEMI_ANTI_JOIN_WITH_LITERAL_AGG =
+ TableTestProgram.of("semi-anti-join-with-literal-agg", "join
with literal agg")
+ .setupTableSource(
+ SourceTestStep.newBuilder("source_t1")
+ .addSchema("a INTEGER", "b BIGINT", "c
STRING")
+ .producedBeforeRestore(Row.of(1, 2L,
"3"))
+ .producedAfterRestore(Row.of(12, 34L,
"56"))
+ .build())
+ .setupTableSource(
+ SourceTestStep.newBuilder("source_t2")
+ .addSchema("d INTEGER", "e BIGINT", "f
STRING")
+ .producedBeforeRestore(Row.of(1, 2L,
"3"))
+ .producedAfterRestore(Row.of(11, 22L,
"33"))
+ .build())
+ .setupTableSource(
+ SourceTestStep.newBuilder("source_t3")
+ .addSchema("i INTEGER", "j BIGINT", "k
STRING")
+ .producedBeforeRestore(Row.of(1, 2L,
"3"))
+ .producedAfterRestore(Row.of(111,
222L, "333"))
+ .build())
+ .setupTableSink(
+ SinkTestStep.newBuilder("sink_t")
+ .addSchema("b BIGINT")
+ .consumedBeforeRestore("+I[2]")
+ .consumedAfterRestore(
+ "-D[2]", "+I[2]", "+I[34]",
"-D[2]", "-D[34]",
+ "+I[2]", "+I[34]")
+ .build())
+ .runSql(
+ "INSERT INTO sink_t SELECT b FROM source_t1
WHERE"
+ + " (CASE WHEN a NOT IN (SELECT i FROM
source_t3) THEN 1"
+ + " WHEN a NOT IN (SELECT CAST(j AS
INTEGER) FROM source_t3) THEN 2 ELSE 3 END)"
+ + " NOT IN (SELECT d FROM source_t2
WHERE source_t1.c = source_t2.f)")
+ .build();
}
}
diff --git
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/CorrelateRestoreTest.java
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/CorrelateRestoreTest.java
index 02d487a80b5..4ce3c0b1bf7 100644
---
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/CorrelateRestoreTest.java
+++
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/CorrelateRestoreTest.java
@@ -39,6 +39,7 @@ public class CorrelateRestoreTest extends RestoreTestBase {
CorrelateTestPrograms.CORRELATE_SYSTEM_FUNC,
CorrelateTestPrograms.CORRELATE_JOIN_FILTER,
CorrelateTestPrograms.CORRELATE_LEFT_JOIN,
- CorrelateTestPrograms.CORRELATE_CROSS_JOIN_UNNEST);
+ CorrelateTestPrograms.CORRELATE_CROSS_JOIN_UNNEST,
+ CorrelateTestPrograms.CORRELATE_WITH_LITERAL_AGG);
}
}
diff --git
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/JoinRestoreTest.java
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/JoinRestoreTest.java
index 5cea793c91f..4ff137572e1 100644
---
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/JoinRestoreTest.java
+++
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/JoinRestoreTest.java
@@ -48,6 +48,7 @@ public class JoinRestoreTest extends RestoreTestBase {
JoinTestPrograms.RIGHT_JOIN,
JoinTestPrograms.SEMI_JOIN,
JoinTestPrograms.ANTI_JOIN,
- JoinTestPrograms.JOIN_WITH_STATE_TTL_HINT);
+ JoinTestPrograms.JOIN_WITH_STATE_TTL_HINT,
+ JoinTestPrograms.SEMI_ANTI_JOIN_WITH_LITERAL_AGG);
}
}
diff --git
a/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-correlate_1/correlate-with-literal-agg/plan/correlate-with-literal-agg.json
b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-correlate_1/correlate-with-literal-agg/plan/correlate-with-literal-agg.json
new file mode 100644
index 00000000000..960d50cfeee
--- /dev/null
+++
b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-correlate_1/correlate-with-literal-agg/plan/correlate-with-literal-agg.json
@@ -0,0 +1,694 @@
+{
+ "flinkVersion" : "2.1",
+ "nodes" : [ {
+ "id" : 1,
+ "type" : "stream-exec-table-source-scan_1",
+ "scanTableSource" : {
+ "table" : {
+ "identifier" : "`default_catalog`.`default_database`.`source_t1`",
+ "resolvedTable" : {
+ "schema" : {
+ "columns" : [ {
+ "name" : "a",
+ "dataType" : "INT"
+ }, {
+ "name" : "b",
+ "dataType" : "BIGINT"
+ }, {
+ "name" : "c",
+ "dataType" : "VARCHAR(2147483647)"
+ } ],
+ "watermarkSpecs" : [ ]
+ },
+ "partitionKeys" : [ ]
+ }
+ }
+ },
+ "outputType" : "ROW<`a` INT, `b` BIGINT, `c` VARCHAR(2147483647)>",
+ "description" : "TableSourceScan(table=[[default_catalog,
default_database, source_t1]], fields=[a, b, c])",
+ "inputProperties" : [ ]
+ }, {
+ "id" : 2,
+ "type" : "stream-exec-exchange_1",
+ "inputProperties" : [ {
+ "requiredDistribution" : {
+ "type" : "SINGLETON"
+ },
+ "damBehavior" : "PIPELINED",
+ "priority" : 0
+ } ],
+ "outputType" : "ROW<`a` INT, `b` BIGINT, `c` VARCHAR(2147483647)>",
+ "description" : "Exchange(distribution=[single])"
+ }, {
+ "id" : 3,
+ "type" : "stream-exec-table-source-scan_1",
+ "scanTableSource" : {
+ "table" : {
+ "identifier" : "`default_catalog`.`default_database`.`source_t3`",
+ "resolvedTable" : {
+ "schema" : {
+ "columns" : [ {
+ "name" : "i",
+ "dataType" : "INT"
+ }, {
+ "name" : "j",
+ "dataType" : "BIGINT"
+ }, {
+ "name" : "k",
+ "dataType" : "VARCHAR(2147483647)"
+ } ],
+ "watermarkSpecs" : [ ]
+ },
+ "partitionKeys" : [ ]
+ }
+ },
+ "abilities" : [ {
+ "type" : "ProjectPushDown",
+ "projectedFields" : [ [ 0 ], [ 1 ], [ 2 ] ],
+ "producedType" : "ROW<`i` INT, `j` BIGINT, `k` VARCHAR(2147483647)>
NOT NULL"
+ }, {
+ "type" : "ReadingMetadata",
+ "metadataKeys" : [ ],
+ "producedType" : "ROW<`i` INT, `j` BIGINT, `k` VARCHAR(2147483647)>
NOT NULL"
+ } ]
+ },
+ "outputType" : "ROW<`i` INT, `j` BIGINT, `k` VARCHAR(2147483647)>",
+ "description" : "TableSourceScan(table=[[default_catalog,
default_database, source_t3, project=[i, j, k], metadata=[]]], fields=[i, j,
k])",
+ "inputProperties" : [ ]
+ }, {
+ "id" : 4,
+ "type" : "stream-exec-exchange_1",
+ "inputProperties" : [ {
+ "requiredDistribution" : {
+ "type" : "SINGLETON"
+ },
+ "damBehavior" : "PIPELINED",
+ "priority" : 0
+ } ],
+ "outputType" : "ROW<`i` INT, `j` BIGINT, `k` VARCHAR(2147483647)>",
+ "description" : "Exchange(distribution=[single])"
+ }, {
+ "id" : 5,
+ "type" : "stream-exec-group-aggregate_1",
+ "configuration" : {
+ "table.exec.mini-batch.enabled" : "false",
+ "table.exec.mini-batch.size" : "-1"
+ },
+ "grouping" : [ ],
+ "aggCalls" : [ {
+ "name" : "c",
+ "syntax" : "FUNCTION_STAR",
+ "internalName" : "$COUNT$1",
+ "argList" : [ ],
+ "filterArg" : -1,
+ "distinct" : false,
+ "approximate" : false,
+ "ignoreNulls" : false,
+ "type" : "BIGINT NOT NULL"
+ } ],
+ "aggCallNeedRetractions" : [ false ],
+ "generateUpdateBefore" : true,
+ "needRetraction" : false,
+ "state" : [ {
+ "index" : 0,
+ "ttl" : "0 ms",
+ "name" : "groupAggregateState"
+ } ],
+ "inputProperties" : [ {
+ "requiredDistribution" : {
+ "type" : "UNKNOWN"
+ },
+ "damBehavior" : "PIPELINED",
+ "priority" : 0
+ } ],
+ "outputType" : "ROW<`c` BIGINT NOT NULL>",
+ "description" : "GroupAggregate(select=[COUNT(*) AS c])"
+ }, {
+ "id" : 6,
+ "type" : "stream-exec-exchange_1",
+ "inputProperties" : [ {
+ "requiredDistribution" : {
+ "type" : "SINGLETON"
+ },
+ "damBehavior" : "PIPELINED",
+ "priority" : 0
+ } ],
+ "outputType" : "ROW<`c` BIGINT NOT NULL>",
+ "description" : "Exchange(distribution=[single])"
+ }, {
+ "id" : 7,
+ "type" : "stream-exec-join_1",
+ "joinSpec" : {
+ "joinType" : "INNER",
+ "leftKeys" : [ ],
+ "rightKeys" : [ ],
+ "filterNulls" : [ ],
+ "nonEquiCondition" : null
+ },
+ "rightUpsertKeys" : [ [ ] ],
+ "state" : [ {
+ "index" : 0,
+ "ttl" : "0 ms",
+ "name" : "leftState"
+ }, {
+ "index" : 1,
+ "ttl" : "0 ms",
+ "name" : "rightState"
+ } ],
+ "inputProperties" : [ {
+ "requiredDistribution" : {
+ "type" : "UNKNOWN"
+ },
+ "damBehavior" : "PIPELINED",
+ "priority" : 0
+ }, {
+ "requiredDistribution" : {
+ "type" : "UNKNOWN"
+ },
+ "damBehavior" : "PIPELINED",
+ "priority" : 0
+ } ],
+ "outputType" : "ROW<`a` INT, `b` BIGINT, `c` VARCHAR(2147483647), `c0`
BIGINT NOT NULL>",
+ "description" : "Join(joinType=[InnerJoin], where=[true], select=[a, b, c,
c0], leftInputSpec=[NoUniqueKey], rightInputSpec=[JoinKeyContainsUniqueKey])"
+ }, {
+ "id" : 8,
+ "type" : "stream-exec-exchange_1",
+ "inputProperties" : [ {
+ "requiredDistribution" : {
+ "type" : "SINGLETON"
+ },
+ "damBehavior" : "PIPELINED",
+ "priority" : 0
+ } ],
+ "outputType" : "ROW<`a` INT, `b` BIGINT, `c` VARCHAR(2147483647), `c0`
BIGINT NOT NULL>",
+ "description" : "Exchange(distribution=[single])"
+ }, {
+ "id" : 9,
+ "type" : "stream-exec-calc_1",
+ "projection" : [ {
+ "kind" : "LITERAL",
+ "value" : 1,
+ "type" : "INT NOT NULL"
+ } ],
+ "condition" : null,
+ "inputProperties" : [ {
+ "requiredDistribution" : {
+ "type" : "UNKNOWN"
+ },
+ "damBehavior" : "PIPELINED",
+ "priority" : 0
+ } ],
+ "outputType" : "ROW<`EXPR$0` INT NOT NULL>",
+ "description" : "Calc(select=[1 AS EXPR$0])"
+ }, {
+ "id" : 10,
+ "type" : "stream-exec-exchange_1",
+ "inputProperties" : [ {
+ "requiredDistribution" : {
+ "type" : "HASH",
+ "keys" : [ 0 ]
+ },
+ "damBehavior" : "PIPELINED",
+ "priority" : 0
+ } ],
+ "outputType" : "ROW<`EXPR$0` INT NOT NULL>",
+ "description" : "Exchange(distribution=[hash[EXPR$0]])"
+ }, {
+ "id" : 11,
+ "type" : "stream-exec-group-aggregate_1",
+ "configuration" : {
+ "table.exec.mini-batch.enabled" : "false",
+ "table.exec.mini-batch.size" : "-1"
+ },
+ "grouping" : [ 0 ],
+ "aggCalls" : [ ],
+ "aggCallNeedRetractions" : [ ],
+ "generateUpdateBefore" : true,
+ "needRetraction" : false,
+ "state" : [ {
+ "index" : 0,
+ "ttl" : "0 ms",
+ "name" : "groupAggregateState"
+ } ],
+ "inputProperties" : [ {
+ "requiredDistribution" : {
+ "type" : "UNKNOWN"
+ },
+ "damBehavior" : "PIPELINED",
+ "priority" : 0
+ } ],
+ "outputType" : "ROW<`EXPR$0` INT NOT NULL>",
+ "description" : "GroupAggregate(groupBy=[EXPR$0], select=[EXPR$0])"
+ }, {
+ "id" : 12,
+ "type" : "stream-exec-calc_1",
+ "projection" : [ {
+ "kind" : "LITERAL",
+ "value" : true,
+ "type" : "BOOLEAN NOT NULL"
+ } ],
+ "condition" : null,
+ "inputProperties" : [ {
+ "requiredDistribution" : {
+ "type" : "UNKNOWN"
+ },
+ "damBehavior" : "PIPELINED",
+ "priority" : 0
+ } ],
+ "outputType" : "ROW<`i` BOOLEAN NOT NULL>",
+ "description" : "Calc(select=[true AS i])"
+ }, {
+ "id" : 13,
+ "type" : "stream-exec-exchange_1",
+ "inputProperties" : [ {
+ "requiredDistribution" : {
+ "type" : "SINGLETON"
+ },
+ "damBehavior" : "PIPELINED",
+ "priority" : 0
+ } ],
+ "outputType" : "ROW<`i` BOOLEAN NOT NULL>",
+ "description" : "Exchange(distribution=[single])"
+ }, {
+ "id" : 14,
+ "type" : "stream-exec-join_1",
+ "joinSpec" : {
+ "joinType" : "LEFT",
+ "leftKeys" : [ ],
+ "rightKeys" : [ ],
+ "filterNulls" : [ ],
+ "nonEquiCondition" : {
+ "kind" : "CALL",
+ "syntax" : "BINARY",
+ "internalName" : "$=$1",
+ "operands" : [ {
+ "kind" : "INPUT_REF",
+ "inputIndex" : 0,
+ "type" : "INT"
+ }, {
+ "kind" : "LITERAL",
+ "value" : 1,
+ "type" : "INT NOT NULL"
+ } ],
+ "type" : "BOOLEAN"
+ }
+ },
+ "state" : [ {
+ "index" : 0,
+ "ttl" : "0 ms",
+ "name" : "leftState"
+ }, {
+ "index" : 1,
+ "ttl" : "0 ms",
+ "name" : "rightState"
+ } ],
+ "inputProperties" : [ {
+ "requiredDistribution" : {
+ "type" : "UNKNOWN"
+ },
+ "damBehavior" : "PIPELINED",
+ "priority" : 0
+ }, {
+ "requiredDistribution" : {
+ "type" : "UNKNOWN"
+ },
+ "damBehavior" : "PIPELINED",
+ "priority" : 0
+ } ],
+ "outputType" : "ROW<`a` INT, `b` BIGINT, `c` VARCHAR(2147483647), `c0`
BIGINT NOT NULL, `i` BOOLEAN>",
+ "description" : "Join(joinType=[LeftOuterJoin], where=[(a = 1)],
select=[a, b, c, c0, i], leftInputSpec=[NoUniqueKey],
rightInputSpec=[NoUniqueKey])"
+ }, {
+ "id" : 15,
+ "type" : "stream-exec-calc_1",
+ "projection" : [ {
+ "kind" : "INPUT_REF",
+ "inputIndex" : 1,
+ "type" : "BIGINT"
+ }, {
+ "kind" : "INPUT_REF",
+ "inputIndex" : 2,
+ "type" : "VARCHAR(2147483647)"
+ }, {
+ "kind" : "CALL",
+ "syntax" : "SPECIAL",
+ "internalName" : "$CASE$1",
+ "operands" : [ {
+ "kind" : "CALL",
+ "syntax" : "BINARY",
+ "internalName" : "$AND$1",
+ "operands" : [ {
+ "kind" : "CALL",
+ "syntax" : "BINARY",
+ "internalName" : "$<>$1",
+ "operands" : [ {
+ "kind" : "INPUT_REF",
+ "inputIndex" : 3,
+ "type" : "BIGINT NOT NULL"
+ }, {
+ "kind" : "LITERAL",
+ "value" : 0,
+ "type" : "INT NOT NULL"
+ } ],
+ "type" : "BOOLEAN NOT NULL"
+ }, {
+ "kind" : "CALL",
+ "syntax" : "POSTFIX",
+ "internalName" : "$IS NOT NULL$1",
+ "operands" : [ {
+ "kind" : "INPUT_REF",
+ "inputIndex" : 4,
+ "type" : "BOOLEAN"
+ } ],
+ "type" : "BOOLEAN NOT NULL"
+ }, {
+ "kind" : "CALL",
+ "syntax" : "POSTFIX",
+ "internalName" : "$IS NOT NULL$1",
+ "operands" : [ {
+ "kind" : "INPUT_REF",
+ "inputIndex" : 0,
+ "type" : "INT"
+ } ],
+ "type" : "BOOLEAN NOT NULL"
+ } ],
+ "type" : "BOOLEAN NOT NULL"
+ }, {
+ "kind" : "LITERAL",
+ "value" : 1,
+ "type" : "INT NOT NULL"
+ }, {
+ "kind" : "LITERAL",
+ "value" : 2,
+ "type" : "INT NOT NULL"
+ } ],
+ "type" : "INT NOT NULL"
+ } ],
+ "condition" : null,
+ "inputProperties" : [ {
+ "requiredDistribution" : {
+ "type" : "UNKNOWN"
+ },
+ "damBehavior" : "PIPELINED",
+ "priority" : 0
+ } ],
+ "outputType" : "ROW<`b` BIGINT, `c` VARCHAR(2147483647), `$f3` INT NOT
NULL>",
+ "description" : "Calc(select=[b, c, CASE(((c0 <> 0) AND i IS NOT NULL AND
a IS NOT NULL), 1, 2) AS $f3])"
+ }, {
+ "id" : 16,
+ "type" : "stream-exec-exchange_1",
+ "inputProperties" : [ {
+ "requiredDistribution" : {
+ "type" : "HASH",
+ "keys" : [ 2, 1 ]
+ },
+ "damBehavior" : "PIPELINED",
+ "priority" : 0
+ } ],
+ "outputType" : "ROW<`b` BIGINT, `c` VARCHAR(2147483647), `$f3` INT NOT
NULL>",
+ "description" : "Exchange(distribution=[hash[$f3, c]])"
+ }, {
+ "id" : 17,
+ "type" : "stream-exec-table-source-scan_1",
+ "scanTableSource" : {
+ "table" : {
+ "identifier" : "`default_catalog`.`default_database`.`source_t2`",
+ "resolvedTable" : {
+ "schema" : {
+ "columns" : [ {
+ "name" : "d",
+ "dataType" : "INT"
+ }, {
+ "name" : "e",
+ "dataType" : "BIGINT"
+ }, {
+ "name" : "f",
+ "dataType" : "VARCHAR(2147483647)"
+ } ],
+ "watermarkSpecs" : [ ]
+ },
+ "partitionKeys" : [ ]
+ }
+ },
+ "abilities" : [ {
+ "type" : "ProjectPushDown",
+ "projectedFields" : [ [ 0 ], [ 2 ] ],
+ "producedType" : "ROW<`d` INT, `f` VARCHAR(2147483647)> NOT NULL"
+ }, {
+ "type" : "ReadingMetadata",
+ "metadataKeys" : [ ],
+ "producedType" : "ROW<`d` INT, `f` VARCHAR(2147483647)> NOT NULL"
+ } ]
+ },
+ "outputType" : "ROW<`d` INT, `f` VARCHAR(2147483647)>",
+ "description" : "TableSourceScan(table=[[default_catalog,
default_database, source_t2, project=[d, f], metadata=[]]], fields=[d, f])",
+ "inputProperties" : [ ]
+ }, {
+ "id" : 18,
+ "type" : "stream-exec-exchange_1",
+ "inputProperties" : [ {
+ "requiredDistribution" : {
+ "type" : "HASH",
+ "keys" : [ 0, 1 ]
+ },
+ "damBehavior" : "PIPELINED",
+ "priority" : 0
+ } ],
+ "outputType" : "ROW<`d` INT, `f` VARCHAR(2147483647)>",
+ "description" : "Exchange(distribution=[hash[d, f]])"
+ }, {
+ "id" : 19,
+ "type" : "stream-exec-join_1",
+ "joinSpec" : {
+ "joinType" : "SEMI",
+ "leftKeys" : [ 2, 1 ],
+ "rightKeys" : [ 0, 1 ],
+ "filterNulls" : [ true, true ],
+ "nonEquiCondition" : null
+ },
+ "state" : [ {
+ "index" : 0,
+ "ttl" : "0 ms",
+ "name" : "leftState"
+ }, {
+ "index" : 1,
+ "ttl" : "0 ms",
+ "name" : "rightState"
+ } ],
+ "inputProperties" : [ {
+ "requiredDistribution" : {
+ "type" : "UNKNOWN"
+ },
+ "damBehavior" : "PIPELINED",
+ "priority" : 0
+ }, {
+ "requiredDistribution" : {
+ "type" : "UNKNOWN"
+ },
+ "damBehavior" : "PIPELINED",
+ "priority" : 0
+ } ],
+ "outputType" : "ROW<`b` BIGINT, `c` VARCHAR(2147483647), `$f3` INT NOT
NULL>",
+ "description" : "Join(joinType=[LeftSemiJoin], where=[(($f3 = d) AND (c =
f))], select=[b, c, $f3], leftInputSpec=[NoUniqueKey],
rightInputSpec=[NoUniqueKey])"
+ }, {
+ "id" : 20,
+ "type" : "stream-exec-calc_1",
+ "projection" : [ {
+ "kind" : "INPUT_REF",
+ "inputIndex" : 0,
+ "type" : "BIGINT"
+ } ],
+ "condition" : null,
+ "inputProperties" : [ {
+ "requiredDistribution" : {
+ "type" : "UNKNOWN"
+ },
+ "damBehavior" : "PIPELINED",
+ "priority" : 0
+ } ],
+ "outputType" : "ROW<`b` BIGINT>",
+ "description" : "Calc(select=[b])"
+ }, {
+ "id" : 21,
+ "type" : "stream-exec-sink_1",
+ "configuration" : {
+ "table.exec.sink.keyed-shuffle" : "AUTO",
+ "table.exec.sink.not-null-enforcer" : "ERROR",
+ "table.exec.sink.rowtime-inserter" : "ENABLED",
+ "table.exec.sink.type-length-enforcer" : "IGNORE",
+ "table.exec.sink.upsert-materialize" : "AUTO"
+ },
+ "dynamicTableSink" : {
+ "table" : {
+ "identifier" : "`default_catalog`.`default_database`.`sink_t`",
+ "resolvedTable" : {
+ "schema" : {
+ "columns" : [ {
+ "name" : "b",
+ "dataType" : "BIGINT"
+ } ],
+ "watermarkSpecs" : [ ]
+ },
+ "partitionKeys" : [ ]
+ }
+ }
+ },
+ "inputChangelogMode" : [ "INSERT", "UPDATE_BEFORE", "UPDATE_AFTER",
"DELETE" ],
+ "inputProperties" : [ {
+ "requiredDistribution" : {
+ "type" : "UNKNOWN"
+ },
+ "damBehavior" : "PIPELINED",
+ "priority" : 0
+ } ],
+ "outputType" : "ROW<`b` BIGINT>",
+ "description" : "Sink(table=[default_catalog.default_database.sink_t],
fields=[b])"
+ } ],
+ "edges" : [ {
+ "source" : 1,
+ "target" : 2,
+ "shuffle" : {
+ "type" : "FORWARD"
+ },
+ "shuffleMode" : "PIPELINED"
+ }, {
+ "source" : 3,
+ "target" : 4,
+ "shuffle" : {
+ "type" : "FORWARD"
+ },
+ "shuffleMode" : "PIPELINED"
+ }, {
+ "source" : 4,
+ "target" : 5,
+ "shuffle" : {
+ "type" : "FORWARD"
+ },
+ "shuffleMode" : "PIPELINED"
+ }, {
+ "source" : 5,
+ "target" : 6,
+ "shuffle" : {
+ "type" : "FORWARD"
+ },
+ "shuffleMode" : "PIPELINED"
+ }, {
+ "source" : 2,
+ "target" : 7,
+ "shuffle" : {
+ "type" : "FORWARD"
+ },
+ "shuffleMode" : "PIPELINED"
+ }, {
+ "source" : 6,
+ "target" : 7,
+ "shuffle" : {
+ "type" : "FORWARD"
+ },
+ "shuffleMode" : "PIPELINED"
+ }, {
+ "source" : 7,
+ "target" : 8,
+ "shuffle" : {
+ "type" : "FORWARD"
+ },
+ "shuffleMode" : "PIPELINED"
+ }, {
+ "source" : 3,
+ "target" : 9,
+ "shuffle" : {
+ "type" : "FORWARD"
+ },
+ "shuffleMode" : "PIPELINED"
+ }, {
+ "source" : 9,
+ "target" : 10,
+ "shuffle" : {
+ "type" : "FORWARD"
+ },
+ "shuffleMode" : "PIPELINED"
+ }, {
+ "source" : 10,
+ "target" : 11,
+ "shuffle" : {
+ "type" : "FORWARD"
+ },
+ "shuffleMode" : "PIPELINED"
+ }, {
+ "source" : 11,
+ "target" : 12,
+ "shuffle" : {
+ "type" : "FORWARD"
+ },
+ "shuffleMode" : "PIPELINED"
+ }, {
+ "source" : 12,
+ "target" : 13,
+ "shuffle" : {
+ "type" : "FORWARD"
+ },
+ "shuffleMode" : "PIPELINED"
+ }, {
+ "source" : 8,
+ "target" : 14,
+ "shuffle" : {
+ "type" : "FORWARD"
+ },
+ "shuffleMode" : "PIPELINED"
+ }, {
+ "source" : 13,
+ "target" : 14,
+ "shuffle" : {
+ "type" : "FORWARD"
+ },
+ "shuffleMode" : "PIPELINED"
+ }, {
+ "source" : 14,
+ "target" : 15,
+ "shuffle" : {
+ "type" : "FORWARD"
+ },
+ "shuffleMode" : "PIPELINED"
+ }, {
+ "source" : 15,
+ "target" : 16,
+ "shuffle" : {
+ "type" : "FORWARD"
+ },
+ "shuffleMode" : "PIPELINED"
+ }, {
+ "source" : 17,
+ "target" : 18,
+ "shuffle" : {
+ "type" : "FORWARD"
+ },
+ "shuffleMode" : "PIPELINED"
+ }, {
+ "source" : 16,
+ "target" : 19,
+ "shuffle" : {
+ "type" : "FORWARD"
+ },
+ "shuffleMode" : "PIPELINED"
+ }, {
+ "source" : 18,
+ "target" : 19,
+ "shuffle" : {
+ "type" : "FORWARD"
+ },
+ "shuffleMode" : "PIPELINED"
+ }, {
+ "source" : 19,
+ "target" : 20,
+ "shuffle" : {
+ "type" : "FORWARD"
+ },
+ "shuffleMode" : "PIPELINED"
+ }, {
+ "source" : 20,
+ "target" : 21,
+ "shuffle" : {
+ "type" : "FORWARD"
+ },
+ "shuffleMode" : "PIPELINED"
+ } ]
+}
\ No newline at end of file
diff --git
a/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-correlate_1/correlate-with-literal-agg/savepoint/_metadata
b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-correlate_1/correlate-with-literal-agg/savepoint/_metadata
new file mode 100644
index 00000000000..5f1b3b0315d
Binary files /dev/null and
b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-correlate_1/correlate-with-literal-agg/savepoint/_metadata
differ
diff --git
a/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-join_1/semi-anti-join-with-literal-agg/plan/semi-anti-join-with-literal-agg.json
b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-join_1/semi-anti-join-with-literal-agg/plan/semi-anti-join-with-literal-agg.json
new file mode 100644
index 00000000000..8a5580a89d7
--- /dev/null
+++
b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-join_1/semi-anti-join-with-literal-agg/plan/semi-anti-join-with-literal-agg.json
@@ -0,0 +1,1217 @@
+{
+ "flinkVersion" : "2.1",
+ "nodes" : [ {
+ "id" : 115,
+ "type" : "stream-exec-table-source-scan_1",
+ "scanTableSource" : {
+ "table" : {
+ "identifier" : "`default_catalog`.`default_database`.`source_t1`",
+ "resolvedTable" : {
+ "schema" : {
+ "columns" : [ {
+ "name" : "a",
+ "dataType" : "INT"
+ }, {
+ "name" : "b",
+ "dataType" : "BIGINT"
+ }, {
+ "name" : "c",
+ "dataType" : "VARCHAR(2147483647)"
+ } ],
+ "watermarkSpecs" : [ ]
+ },
+ "partitionKeys" : [ ]
+ }
+ }
+ },
+ "outputType" : "ROW<`a` INT, `b` BIGINT, `c` VARCHAR(2147483647)>",
+ "description" : "TableSourceScan(table=[[default_catalog,
default_database, source_t1]], fields=[a, b, c])",
+ "inputProperties" : [ ]
+ }, {
+ "id" : 116,
+ "type" : "stream-exec-exchange_1",
+ "inputProperties" : [ {
+ "requiredDistribution" : {
+ "type" : "SINGLETON"
+ },
+ "damBehavior" : "PIPELINED",
+ "priority" : 0
+ } ],
+ "outputType" : "ROW<`a` INT, `b` BIGINT, `c` VARCHAR(2147483647)>",
+ "description" : "Exchange(distribution=[single])"
+ }, {
+ "id" : 117,
+ "type" : "stream-exec-table-source-scan_1",
+ "scanTableSource" : {
+ "table" : {
+ "identifier" : "`default_catalog`.`default_database`.`source_t3`",
+ "resolvedTable" : {
+ "schema" : {
+ "columns" : [ {
+ "name" : "i",
+ "dataType" : "INT"
+ }, {
+ "name" : "j",
+ "dataType" : "BIGINT"
+ }, {
+ "name" : "k",
+ "dataType" : "VARCHAR(2147483647)"
+ } ],
+ "watermarkSpecs" : [ ]
+ },
+ "partitionKeys" : [ ]
+ }
+ },
+ "abilities" : [ {
+ "type" : "ProjectPushDown",
+ "projectedFields" : [ [ 0 ], [ 1 ] ],
+ "producedType" : "ROW<`i` INT, `j` BIGINT> NOT NULL"
+ }, {
+ "type" : "ReadingMetadata",
+ "metadataKeys" : [ ],
+ "producedType" : "ROW<`i` INT, `j` BIGINT> NOT NULL"
+ } ]
+ },
+ "outputType" : "ROW<`i` INT, `j` BIGINT>",
+ "description" : "TableSourceScan(table=[[default_catalog,
default_database, source_t3, project=[i, j], metadata=[]]], fields=[i, j])",
+ "inputProperties" : [ ]
+ }, {
+ "id" : 118,
+ "type" : "stream-exec-calc_1",
+ "projection" : [ {
+ "kind" : "INPUT_REF",
+ "inputIndex" : 0,
+ "type" : "INT"
+ } ],
+ "condition" : null,
+ "inputProperties" : [ {
+ "requiredDistribution" : {
+ "type" : "UNKNOWN"
+ },
+ "damBehavior" : "PIPELINED",
+ "priority" : 0
+ } ],
+ "outputType" : "ROW<`i` INT>",
+ "description" : "Calc(select=[i])"
+ }, {
+ "id" : 119,
+ "type" : "stream-exec-exchange_1",
+ "inputProperties" : [ {
+ "requiredDistribution" : {
+ "type" : "SINGLETON"
+ },
+ "damBehavior" : "PIPELINED",
+ "priority" : 0
+ } ],
+ "outputType" : "ROW<`i` INT>",
+ "description" : "Exchange(distribution=[single])"
+ }, {
+ "id" : 120,
+ "type" : "stream-exec-group-aggregate_1",
+ "configuration" : {
+ "table.exec.mini-batch.enabled" : "false",
+ "table.exec.mini-batch.size" : "-1"
+ },
+ "grouping" : [ ],
+ "aggCalls" : [ {
+ "name" : "c",
+ "syntax" : "FUNCTION_STAR",
+ "internalName" : "$COUNT$1",
+ "argList" : [ ],
+ "filterArg" : -1,
+ "distinct" : false,
+ "approximate" : false,
+ "ignoreNulls" : false,
+ "type" : "BIGINT NOT NULL"
+ }, {
+ "name" : "ck",
+ "syntax" : "FUNCTION_STAR",
+ "internalName" : "$COUNT$1",
+ "argList" : [ 0 ],
+ "filterArg" : -1,
+ "distinct" : false,
+ "approximate" : false,
+ "ignoreNulls" : false,
+ "type" : "BIGINT NOT NULL"
+ } ],
+ "aggCallNeedRetractions" : [ false, false ],
+ "generateUpdateBefore" : true,
+ "needRetraction" : false,
+ "state" : [ {
+ "index" : 0,
+ "ttl" : "0 ms",
+ "name" : "groupAggregateState"
+ } ],
+ "inputProperties" : [ {
+ "requiredDistribution" : {
+ "type" : "UNKNOWN"
+ },
+ "damBehavior" : "PIPELINED",
+ "priority" : 0
+ } ],
+ "outputType" : "ROW<`c` BIGINT NOT NULL, `ck` BIGINT NOT NULL>",
+ "description" : "GroupAggregate(select=[COUNT(*) AS c, COUNT(i) AS ck])"
+ }, {
+ "id" : 121,
+ "type" : "stream-exec-exchange_1",
+ "inputProperties" : [ {
+ "requiredDistribution" : {
+ "type" : "SINGLETON"
+ },
+ "damBehavior" : "PIPELINED",
+ "priority" : 0
+ } ],
+ "outputType" : "ROW<`c` BIGINT NOT NULL, `ck` BIGINT NOT NULL>",
+ "description" : "Exchange(distribution=[single])"
+ }, {
+ "id" : 122,
+ "type" : "stream-exec-join_1",
+ "joinSpec" : {
+ "joinType" : "INNER",
+ "leftKeys" : [ ],
+ "rightKeys" : [ ],
+ "filterNulls" : [ ],
+ "nonEquiCondition" : null
+ },
+ "rightUpsertKeys" : [ [ ] ],
+ "state" : [ {
+ "index" : 0,
+ "ttl" : "0 ms",
+ "name" : "leftState"
+ }, {
+ "index" : 1,
+ "ttl" : "0 ms",
+ "name" : "rightState"
+ } ],
+ "inputProperties" : [ {
+ "requiredDistribution" : {
+ "type" : "UNKNOWN"
+ },
+ "damBehavior" : "PIPELINED",
+ "priority" : 0
+ }, {
+ "requiredDistribution" : {
+ "type" : "UNKNOWN"
+ },
+ "damBehavior" : "PIPELINED",
+ "priority" : 0
+ } ],
+ "outputType" : "ROW<`a` INT, `b` BIGINT, `c` VARCHAR(2147483647), `c0`
BIGINT NOT NULL, `ck` BIGINT NOT NULL>",
+ "description" : "Join(joinType=[InnerJoin], where=[true], select=[a, b, c,
c0, ck], leftInputSpec=[NoUniqueKey],
rightInputSpec=[JoinKeyContainsUniqueKey])"
+ }, {
+ "id" : 123,
+ "type" : "stream-exec-exchange_1",
+ "inputProperties" : [ {
+ "requiredDistribution" : {
+ "type" : "HASH",
+ "keys" : [ 0 ]
+ },
+ "damBehavior" : "PIPELINED",
+ "priority" : 0
+ } ],
+ "outputType" : "ROW<`a` INT, `b` BIGINT, `c` VARCHAR(2147483647), `c0`
BIGINT NOT NULL, `ck` BIGINT NOT NULL>",
+ "description" : "Exchange(distribution=[hash[a]])"
+ }, {
+ "id" : 124,
+ "type" : "stream-exec-exchange_1",
+ "inputProperties" : [ {
+ "requiredDistribution" : {
+ "type" : "HASH",
+ "keys" : [ 0 ]
+ },
+ "damBehavior" : "PIPELINED",
+ "priority" : 0
+ } ],
+ "outputType" : "ROW<`i` INT>",
+ "description" : "Exchange(distribution=[hash[i]])"
+ }, {
+ "id" : 125,
+ "type" : "stream-exec-group-aggregate_1",
+ "configuration" : {
+ "table.exec.mini-batch.enabled" : "false",
+ "table.exec.mini-batch.size" : "-1"
+ },
+ "grouping" : [ 0 ],
+ "aggCalls" : [ ],
+ "aggCallNeedRetractions" : [ ],
+ "generateUpdateBefore" : true,
+ "needRetraction" : false,
+ "state" : [ {
+ "index" : 0,
+ "ttl" : "0 ms",
+ "name" : "groupAggregateState"
+ } ],
+ "inputProperties" : [ {
+ "requiredDistribution" : {
+ "type" : "UNKNOWN"
+ },
+ "damBehavior" : "PIPELINED",
+ "priority" : 0
+ } ],
+ "outputType" : "ROW<`i` INT>",
+ "description" : "GroupAggregate(groupBy=[i], select=[i])"
+ }, {
+ "id" : 126,
+ "type" : "stream-exec-calc_1",
+ "projection" : [ {
+ "kind" : "INPUT_REF",
+ "inputIndex" : 0,
+ "type" : "INT"
+ }, {
+ "kind" : "LITERAL",
+ "value" : true,
+ "type" : "BOOLEAN NOT NULL"
+ } ],
+ "condition" : null,
+ "inputProperties" : [ {
+ "requiredDistribution" : {
+ "type" : "UNKNOWN"
+ },
+ "damBehavior" : "PIPELINED",
+ "priority" : 0
+ } ],
+ "outputType" : "ROW<`i` INT, `i0` BOOLEAN NOT NULL>",
+ "description" : "Calc(select=[i, true AS i0])"
+ }, {
+ "id" : 127,
+ "type" : "stream-exec-exchange_1",
+ "inputProperties" : [ {
+ "requiredDistribution" : {
+ "type" : "HASH",
+ "keys" : [ 0 ]
+ },
+ "damBehavior" : "PIPELINED",
+ "priority" : 0
+ } ],
+ "outputType" : "ROW<`i` INT, `i0` BOOLEAN NOT NULL>",
+ "description" : "Exchange(distribution=[hash[i]])"
+ }, {
+ "id" : 128,
+ "type" : "stream-exec-join_1",
+ "joinSpec" : {
+ "joinType" : "LEFT",
+ "leftKeys" : [ 0 ],
+ "rightKeys" : [ 0 ],
+ "filterNulls" : [ true ],
+ "nonEquiCondition" : null
+ },
+ "rightUpsertKeys" : [ [ 0 ] ],
+ "state" : [ {
+ "index" : 0,
+ "ttl" : "0 ms",
+ "name" : "leftState"
+ }, {
+ "index" : 1,
+ "ttl" : "0 ms",
+ "name" : "rightState"
+ } ],
+ "inputProperties" : [ {
+ "requiredDistribution" : {
+ "type" : "UNKNOWN"
+ },
+ "damBehavior" : "PIPELINED",
+ "priority" : 0
+ }, {
+ "requiredDistribution" : {
+ "type" : "UNKNOWN"
+ },
+ "damBehavior" : "PIPELINED",
+ "priority" : 0
+ } ],
+ "outputType" : "ROW<`a` INT, `b` BIGINT, `c` VARCHAR(2147483647), `c0`
BIGINT NOT NULL, `ck` BIGINT NOT NULL, `i` INT, `i0` BOOLEAN>",
+ "description" : "Join(joinType=[LeftOuterJoin], where=[(a = i)],
select=[a, b, c, c0, ck, i, i0], leftInputSpec=[NoUniqueKey],
rightInputSpec=[JoinKeyContainsUniqueKey])"
+ }, {
+ "id" : 129,
+ "type" : "stream-exec-calc_1",
+ "projection" : [ {
+ "kind" : "INPUT_REF",
+ "inputIndex" : 0,
+ "type" : "INT"
+ }, {
+ "kind" : "INPUT_REF",
+ "inputIndex" : 1,
+ "type" : "BIGINT"
+ }, {
+ "kind" : "INPUT_REF",
+ "inputIndex" : 2,
+ "type" : "VARCHAR(2147483647)"
+ }, {
+ "kind" : "INPUT_REF",
+ "inputIndex" : 3,
+ "type" : "BIGINT NOT NULL"
+ }, {
+ "kind" : "INPUT_REF",
+ "inputIndex" : 4,
+ "type" : "BIGINT NOT NULL"
+ }, {
+ "kind" : "INPUT_REF",
+ "inputIndex" : 6,
+ "type" : "BOOLEAN"
+ } ],
+ "condition" : null,
+ "inputProperties" : [ {
+ "requiredDistribution" : {
+ "type" : "UNKNOWN"
+ },
+ "damBehavior" : "PIPELINED",
+ "priority" : 0
+ } ],
+ "outputType" : "ROW<`a` INT, `b` BIGINT, `c` VARCHAR(2147483647), `c0`
BIGINT NOT NULL, `ck` BIGINT NOT NULL, `i0` BOOLEAN>",
+ "description" : "Calc(select=[a, b, c, c0, ck, i0])"
+ }, {
+ "id" : 130,
+ "type" : "stream-exec-exchange_1",
+ "inputProperties" : [ {
+ "requiredDistribution" : {
+ "type" : "SINGLETON"
+ },
+ "damBehavior" : "PIPELINED",
+ "priority" : 0
+ } ],
+ "outputType" : "ROW<`a` INT, `b` BIGINT, `c` VARCHAR(2147483647), `c0`
BIGINT NOT NULL, `ck` BIGINT NOT NULL, `i0` BOOLEAN>",
+ "description" : "Exchange(distribution=[single])"
+ }, {
+ "id" : 131,
+ "type" : "stream-exec-calc_1",
+ "projection" : [ {
+ "kind" : "CALL",
+ "syntax" : "SPECIAL",
+ "internalName" : "$CAST$1",
+ "operands" : [ {
+ "kind" : "INPUT_REF",
+ "inputIndex" : 1,
+ "type" : "BIGINT"
+ } ],
+ "type" : "INT"
+ } ],
+ "condition" : null,
+ "inputProperties" : [ {
+ "requiredDistribution" : {
+ "type" : "UNKNOWN"
+ },
+ "damBehavior" : "PIPELINED",
+ "priority" : 0
+ } ],
+ "outputType" : "ROW<`EXPR$0` INT>",
+ "description" : "Calc(select=[CAST(j AS INTEGER) AS EXPR$0])"
+ }, {
+ "id" : 132,
+ "type" : "stream-exec-exchange_1",
+ "inputProperties" : [ {
+ "requiredDistribution" : {
+ "type" : "SINGLETON"
+ },
+ "damBehavior" : "PIPELINED",
+ "priority" : 0
+ } ],
+ "outputType" : "ROW<`EXPR$0` INT>",
+ "description" : "Exchange(distribution=[single])"
+ }, {
+ "id" : 133,
+ "type" : "stream-exec-group-aggregate_1",
+ "configuration" : {
+ "table.exec.mini-batch.enabled" : "false",
+ "table.exec.mini-batch.size" : "-1"
+ },
+ "grouping" : [ ],
+ "aggCalls" : [ {
+ "name" : "c",
+ "syntax" : "FUNCTION_STAR",
+ "internalName" : "$COUNT$1",
+ "argList" : [ ],
+ "filterArg" : -1,
+ "distinct" : false,
+ "approximate" : false,
+ "ignoreNulls" : false,
+ "type" : "BIGINT NOT NULL"
+ }, {
+ "name" : "ck",
+ "syntax" : "FUNCTION_STAR",
+ "internalName" : "$COUNT$1",
+ "argList" : [ 0 ],
+ "filterArg" : -1,
+ "distinct" : false,
+ "approximate" : false,
+ "ignoreNulls" : false,
+ "type" : "BIGINT NOT NULL"
+ } ],
+ "aggCallNeedRetractions" : [ false, false ],
+ "generateUpdateBefore" : true,
+ "needRetraction" : false,
+ "state" : [ {
+ "index" : 0,
+ "ttl" : "0 ms",
+ "name" : "groupAggregateState"
+ } ],
+ "inputProperties" : [ {
+ "requiredDistribution" : {
+ "type" : "UNKNOWN"
+ },
+ "damBehavior" : "PIPELINED",
+ "priority" : 0
+ } ],
+ "outputType" : "ROW<`c` BIGINT NOT NULL, `ck` BIGINT NOT NULL>",
+ "description" : "GroupAggregate(select=[COUNT(*) AS c, COUNT(EXPR$0) AS
ck])"
+ }, {
+ "id" : 134,
+ "type" : "stream-exec-exchange_1",
+ "inputProperties" : [ {
+ "requiredDistribution" : {
+ "type" : "SINGLETON"
+ },
+ "damBehavior" : "PIPELINED",
+ "priority" : 0
+ } ],
+ "outputType" : "ROW<`c` BIGINT NOT NULL, `ck` BIGINT NOT NULL>",
+ "description" : "Exchange(distribution=[single])"
+ }, {
+ "id" : 135,
+ "type" : "stream-exec-join_1",
+ "joinSpec" : {
+ "joinType" : "INNER",
+ "leftKeys" : [ ],
+ "rightKeys" : [ ],
+ "filterNulls" : [ ],
+ "nonEquiCondition" : null
+ },
+ "rightUpsertKeys" : [ [ ] ],
+ "state" : [ {
+ "index" : 0,
+ "ttl" : "0 ms",
+ "name" : "leftState"
+ }, {
+ "index" : 1,
+ "ttl" : "0 ms",
+ "name" : "rightState"
+ } ],
+ "inputProperties" : [ {
+ "requiredDistribution" : {
+ "type" : "UNKNOWN"
+ },
+ "damBehavior" : "PIPELINED",
+ "priority" : 0
+ }, {
+ "requiredDistribution" : {
+ "type" : "UNKNOWN"
+ },
+ "damBehavior" : "PIPELINED",
+ "priority" : 0
+ } ],
+ "outputType" : "ROW<`a` INT, `b` BIGINT, `c` VARCHAR(2147483647), `c0`
BIGINT NOT NULL, `ck` BIGINT NOT NULL, `i0` BOOLEAN, `c1` BIGINT NOT NULL,
`ck0` BIGINT NOT NULL>",
+ "description" : "Join(joinType=[InnerJoin], where=[true], select=[a, b, c,
c0, ck, i0, c1, ck0], leftInputSpec=[NoUniqueKey],
rightInputSpec=[JoinKeyContainsUniqueKey])"
+ }, {
+ "id" : 136,
+ "type" : "stream-exec-exchange_1",
+ "inputProperties" : [ {
+ "requiredDistribution" : {
+ "type" : "HASH",
+ "keys" : [ 0 ]
+ },
+ "damBehavior" : "PIPELINED",
+ "priority" : 0
+ } ],
+ "outputType" : "ROW<`a` INT, `b` BIGINT, `c` VARCHAR(2147483647), `c0`
BIGINT NOT NULL, `ck` BIGINT NOT NULL, `i0` BOOLEAN, `c1` BIGINT NOT NULL,
`ck0` BIGINT NOT NULL>",
+ "description" : "Exchange(distribution=[hash[a]])"
+ }, {
+ "id" : 137,
+ "type" : "stream-exec-exchange_1",
+ "inputProperties" : [ {
+ "requiredDistribution" : {
+ "type" : "HASH",
+ "keys" : [ 0 ]
+ },
+ "damBehavior" : "PIPELINED",
+ "priority" : 0
+ } ],
+ "outputType" : "ROW<`EXPR$0` INT>",
+ "description" : "Exchange(distribution=[hash[EXPR$0]])"
+ }, {
+ "id" : 138,
+ "type" : "stream-exec-group-aggregate_1",
+ "configuration" : {
+ "table.exec.mini-batch.enabled" : "false",
+ "table.exec.mini-batch.size" : "-1"
+ },
+ "grouping" : [ 0 ],
+ "aggCalls" : [ ],
+ "aggCallNeedRetractions" : [ ],
+ "generateUpdateBefore" : true,
+ "needRetraction" : false,
+ "state" : [ {
+ "index" : 0,
+ "ttl" : "0 ms",
+ "name" : "groupAggregateState"
+ } ],
+ "inputProperties" : [ {
+ "requiredDistribution" : {
+ "type" : "UNKNOWN"
+ },
+ "damBehavior" : "PIPELINED",
+ "priority" : 0
+ } ],
+ "outputType" : "ROW<`EXPR$0` INT>",
+ "description" : "GroupAggregate(groupBy=[EXPR$0], select=[EXPR$0])"
+ }, {
+ "id" : 139,
+ "type" : "stream-exec-calc_1",
+ "projection" : [ {
+ "kind" : "INPUT_REF",
+ "inputIndex" : 0,
+ "type" : "INT"
+ }, {
+ "kind" : "LITERAL",
+ "value" : true,
+ "type" : "BOOLEAN NOT NULL"
+ } ],
+ "condition" : null,
+ "inputProperties" : [ {
+ "requiredDistribution" : {
+ "type" : "UNKNOWN"
+ },
+ "damBehavior" : "PIPELINED",
+ "priority" : 0
+ } ],
+ "outputType" : "ROW<`EXPR$0` INT, `i` BOOLEAN NOT NULL>",
+ "description" : "Calc(select=[EXPR$0, true AS i])"
+ }, {
+ "id" : 140,
+ "type" : "stream-exec-exchange_1",
+ "inputProperties" : [ {
+ "requiredDistribution" : {
+ "type" : "HASH",
+ "keys" : [ 0 ]
+ },
+ "damBehavior" : "PIPELINED",
+ "priority" : 0
+ } ],
+ "outputType" : "ROW<`EXPR$0` INT, `i` BOOLEAN NOT NULL>",
+ "description" : "Exchange(distribution=[hash[EXPR$0]])"
+ }, {
+ "id" : 141,
+ "type" : "stream-exec-join_1",
+ "joinSpec" : {
+ "joinType" : "LEFT",
+ "leftKeys" : [ 0 ],
+ "rightKeys" : [ 0 ],
+ "filterNulls" : [ true ],
+ "nonEquiCondition" : null
+ },
+ "rightUpsertKeys" : [ [ 0 ] ],
+ "state" : [ {
+ "index" : 0,
+ "ttl" : "0 ms",
+ "name" : "leftState"
+ }, {
+ "index" : 1,
+ "ttl" : "0 ms",
+ "name" : "rightState"
+ } ],
+ "inputProperties" : [ {
+ "requiredDistribution" : {
+ "type" : "UNKNOWN"
+ },
+ "damBehavior" : "PIPELINED",
+ "priority" : 0
+ }, {
+ "requiredDistribution" : {
+ "type" : "UNKNOWN"
+ },
+ "damBehavior" : "PIPELINED",
+ "priority" : 0
+ } ],
+ "outputType" : "ROW<`a` INT, `b` BIGINT, `c` VARCHAR(2147483647), `c0`
BIGINT NOT NULL, `ck` BIGINT NOT NULL, `i0` BOOLEAN, `c1` BIGINT NOT NULL,
`ck0` BIGINT NOT NULL, `EXPR$0` INT, `i` BOOLEAN>",
+ "description" : "Join(joinType=[LeftOuterJoin], where=[(a = EXPR$0)],
select=[a, b, c, c0, ck, i0, c1, ck0, EXPR$0, i], leftInputSpec=[NoUniqueKey],
rightInputSpec=[JoinKeyContainsUniqueKey])"
+ }, {
+ "id" : 142,
+ "type" : "stream-exec-calc_1",
+ "projection" : [ {
+ "kind" : "INPUT_REF",
+ "inputIndex" : 1,
+ "type" : "BIGINT"
+ }, {
+ "kind" : "INPUT_REF",
+ "inputIndex" : 2,
+ "type" : "VARCHAR(2147483647)"
+ }, {
+ "kind" : "CALL",
+ "syntax" : "SPECIAL",
+ "internalName" : "$CASE$1",
+ "operands" : [ {
+ "kind" : "CALL",
+ "syntax" : "BINARY",
+ "internalName" : "$OR$1",
+ "operands" : [ {
+ "kind" : "CALL",
+ "syntax" : "BINARY",
+ "internalName" : "$=$1",
+ "operands" : [ {
+ "kind" : "INPUT_REF",
+ "inputIndex" : 3,
+ "type" : "BIGINT NOT NULL"
+ }, {
+ "kind" : "LITERAL",
+ "value" : 0,
+ "type" : "INT NOT NULL"
+ } ],
+ "type" : "BOOLEAN NOT NULL"
+ }, {
+ "kind" : "CALL",
+ "syntax" : "BINARY",
+ "internalName" : "$AND$1",
+ "operands" : [ {
+ "kind" : "CALL",
+ "syntax" : "POSTFIX",
+ "internalName" : "$IS NULL$1",
+ "operands" : [ {
+ "kind" : "INPUT_REF",
+ "inputIndex" : 5,
+ "type" : "BOOLEAN"
+ } ],
+ "type" : "BOOLEAN NOT NULL"
+ }, {
+ "kind" : "CALL",
+ "syntax" : "BINARY",
+ "internalName" : "$>=$1",
+ "operands" : [ {
+ "kind" : "INPUT_REF",
+ "inputIndex" : 4,
+ "type" : "BIGINT NOT NULL"
+ }, {
+ "kind" : "INPUT_REF",
+ "inputIndex" : 3,
+ "type" : "BIGINT NOT NULL"
+ } ],
+ "type" : "BOOLEAN NOT NULL"
+ }, {
+ "kind" : "CALL",
+ "syntax" : "POSTFIX",
+ "internalName" : "$IS NOT NULL$1",
+ "operands" : [ {
+ "kind" : "INPUT_REF",
+ "inputIndex" : 0,
+ "type" : "INT"
+ } ],
+ "type" : "BOOLEAN NOT NULL"
+ } ],
+ "type" : "BOOLEAN NOT NULL"
+ } ],
+ "type" : "BOOLEAN NOT NULL"
+ }, {
+ "kind" : "LITERAL",
+ "value" : 1,
+ "type" : "INT NOT NULL"
+ }, {
+ "kind" : "CALL",
+ "syntax" : "BINARY",
+ "internalName" : "$OR$1",
+ "operands" : [ {
+ "kind" : "CALL",
+ "syntax" : "BINARY",
+ "internalName" : "$=$1",
+ "operands" : [ {
+ "kind" : "INPUT_REF",
+ "inputIndex" : 6,
+ "type" : "BIGINT NOT NULL"
+ }, {
+ "kind" : "LITERAL",
+ "value" : 0,
+ "type" : "INT NOT NULL"
+ } ],
+ "type" : "BOOLEAN NOT NULL"
+ }, {
+ "kind" : "CALL",
+ "syntax" : "BINARY",
+ "internalName" : "$AND$1",
+ "operands" : [ {
+ "kind" : "CALL",
+ "syntax" : "POSTFIX",
+ "internalName" : "$IS NULL$1",
+ "operands" : [ {
+ "kind" : "INPUT_REF",
+ "inputIndex" : 9,
+ "type" : "BOOLEAN"
+ } ],
+ "type" : "BOOLEAN NOT NULL"
+ }, {
+ "kind" : "CALL",
+ "syntax" : "BINARY",
+ "internalName" : "$>=$1",
+ "operands" : [ {
+ "kind" : "INPUT_REF",
+ "inputIndex" : 7,
+ "type" : "BIGINT NOT NULL"
+ }, {
+ "kind" : "INPUT_REF",
+ "inputIndex" : 6,
+ "type" : "BIGINT NOT NULL"
+ } ],
+ "type" : "BOOLEAN NOT NULL"
+ }, {
+ "kind" : "CALL",
+ "syntax" : "POSTFIX",
+ "internalName" : "$IS NOT NULL$1",
+ "operands" : [ {
+ "kind" : "INPUT_REF",
+ "inputIndex" : 0,
+ "type" : "INT"
+ } ],
+ "type" : "BOOLEAN NOT NULL"
+ } ],
+ "type" : "BOOLEAN NOT NULL"
+ } ],
+ "type" : "BOOLEAN NOT NULL"
+ }, {
+ "kind" : "LITERAL",
+ "value" : 2,
+ "type" : "INT NOT NULL"
+ }, {
+ "kind" : "LITERAL",
+ "value" : 3,
+ "type" : "INT NOT NULL"
+ } ],
+ "type" : "INT NOT NULL"
+ } ],
+ "condition" : null,
+ "inputProperties" : [ {
+ "requiredDistribution" : {
+ "type" : "UNKNOWN"
+ },
+ "damBehavior" : "PIPELINED",
+ "priority" : 0
+ } ],
+ "outputType" : "ROW<`b` BIGINT, `c` VARCHAR(2147483647), `$f3` INT NOT
NULL>",
+ "description" : "Calc(select=[b, c, CASE(((c0 = 0) OR (i0 IS NULL AND (ck
>= c0) AND a IS NOT NULL)), 1, ((c1 = 0) OR (i IS NULL AND (ck0 >= c1) AND a IS
NOT NULL)), 2, 3) AS $f3])"
+ }, {
+ "id" : 143,
+ "type" : "stream-exec-exchange_1",
+ "inputProperties" : [ {
+ "requiredDistribution" : {
+ "type" : "HASH",
+ "keys" : [ 1 ]
+ },
+ "damBehavior" : "PIPELINED",
+ "priority" : 0
+ } ],
+ "outputType" : "ROW<`b` BIGINT, `c` VARCHAR(2147483647), `$f3` INT NOT
NULL>",
+ "description" : "Exchange(distribution=[hash[c]])"
+ }, {
+ "id" : 144,
+ "type" : "stream-exec-table-source-scan_1",
+ "scanTableSource" : {
+ "table" : {
+ "identifier" : "`default_catalog`.`default_database`.`source_t2`",
+ "resolvedTable" : {
+ "schema" : {
+ "columns" : [ {
+ "name" : "d",
+ "dataType" : "INT"
+ }, {
+ "name" : "e",
+ "dataType" : "BIGINT"
+ }, {
+ "name" : "f",
+ "dataType" : "VARCHAR(2147483647)"
+ } ],
+ "watermarkSpecs" : [ ]
+ },
+ "partitionKeys" : [ ]
+ }
+ },
+ "abilities" : [ {
+ "type" : "ProjectPushDown",
+ "projectedFields" : [ [ 0 ], [ 2 ] ],
+ "producedType" : "ROW<`d` INT, `f` VARCHAR(2147483647)> NOT NULL"
+ }, {
+ "type" : "ReadingMetadata",
+ "metadataKeys" : [ ],
+ "producedType" : "ROW<`d` INT, `f` VARCHAR(2147483647)> NOT NULL"
+ } ]
+ },
+ "outputType" : "ROW<`d` INT, `f` VARCHAR(2147483647)>",
+ "description" : "TableSourceScan(table=[[default_catalog,
default_database, source_t2, project=[d, f], metadata=[]]], fields=[d, f])",
+ "inputProperties" : [ ]
+ }, {
+ "id" : 145,
+ "type" : "stream-exec-exchange_1",
+ "inputProperties" : [ {
+ "requiredDistribution" : {
+ "type" : "HASH",
+ "keys" : [ 1 ]
+ },
+ "damBehavior" : "PIPELINED",
+ "priority" : 0
+ } ],
+ "outputType" : "ROW<`d` INT, `f` VARCHAR(2147483647)>",
+ "description" : "Exchange(distribution=[hash[f]])"
+ }, {
+ "id" : 146,
+ "type" : "stream-exec-join_1",
+ "joinSpec" : {
+ "joinType" : "ANTI",
+ "leftKeys" : [ 1 ],
+ "rightKeys" : [ 1 ],
+ "filterNulls" : [ true ],
+ "nonEquiCondition" : {
+ "kind" : "CALL",
+ "syntax" : "BINARY",
+ "internalName" : "$OR$1",
+ "operands" : [ {
+ "kind" : "CALL",
+ "syntax" : "POSTFIX",
+ "internalName" : "$IS NULL$1",
+ "operands" : [ {
+ "kind" : "INPUT_REF",
+ "inputIndex" : 3,
+ "type" : "INT"
+ } ],
+ "type" : "BOOLEAN NOT NULL"
+ }, {
+ "kind" : "CALL",
+ "syntax" : "BINARY",
+ "internalName" : "$=$1",
+ "operands" : [ {
+ "kind" : "INPUT_REF",
+ "inputIndex" : 2,
+ "type" : "INT NOT NULL"
+ }, {
+ "kind" : "INPUT_REF",
+ "inputIndex" : 3,
+ "type" : "INT"
+ } ],
+ "type" : "BOOLEAN"
+ } ],
+ "type" : "BOOLEAN"
+ }
+ },
+ "state" : [ {
+ "index" : 0,
+ "ttl" : "0 ms",
+ "name" : "leftState"
+ }, {
+ "index" : 1,
+ "ttl" : "0 ms",
+ "name" : "rightState"
+ } ],
+ "inputProperties" : [ {
+ "requiredDistribution" : {
+ "type" : "UNKNOWN"
+ },
+ "damBehavior" : "PIPELINED",
+ "priority" : 0
+ }, {
+ "requiredDistribution" : {
+ "type" : "UNKNOWN"
+ },
+ "damBehavior" : "PIPELINED",
+ "priority" : 0
+ } ],
+ "outputType" : "ROW<`b` BIGINT, `c` VARCHAR(2147483647), `$f3` INT NOT
NULL>",
+ "description" : "Join(joinType=[LeftAntiJoin], where=[((d IS NULL OR ($f3
= d)) AND (c = f))], select=[b, c, $f3], leftInputSpec=[NoUniqueKey],
rightInputSpec=[NoUniqueKey])"
+ }, {
+ "id" : 147,
+ "type" : "stream-exec-calc_1",
+ "projection" : [ {
+ "kind" : "INPUT_REF",
+ "inputIndex" : 0,
+ "type" : "BIGINT"
+ } ],
+ "condition" : null,
+ "inputProperties" : [ {
+ "requiredDistribution" : {
+ "type" : "UNKNOWN"
+ },
+ "damBehavior" : "PIPELINED",
+ "priority" : 0
+ } ],
+ "outputType" : "ROW<`b` BIGINT>",
+ "description" : "Calc(select=[b])"
+ }, {
+ "id" : 148,
+ "type" : "stream-exec-sink_1",
+ "configuration" : {
+ "table.exec.sink.keyed-shuffle" : "AUTO",
+ "table.exec.sink.not-null-enforcer" : "ERROR",
+ "table.exec.sink.rowtime-inserter" : "ENABLED",
+ "table.exec.sink.type-length-enforcer" : "IGNORE",
+ "table.exec.sink.upsert-materialize" : "AUTO"
+ },
+ "dynamicTableSink" : {
+ "table" : {
+ "identifier" : "`default_catalog`.`default_database`.`sink_t`",
+ "resolvedTable" : {
+ "schema" : {
+ "columns" : [ {
+ "name" : "b",
+ "dataType" : "BIGINT"
+ } ],
+ "watermarkSpecs" : [ ]
+ },
+ "partitionKeys" : [ ]
+ }
+ }
+ },
+ "inputChangelogMode" : [ "INSERT", "UPDATE_BEFORE", "UPDATE_AFTER",
"DELETE" ],
+ "inputProperties" : [ {
+ "requiredDistribution" : {
+ "type" : "UNKNOWN"
+ },
+ "damBehavior" : "PIPELINED",
+ "priority" : 0
+ } ],
+ "outputType" : "ROW<`b` BIGINT>",
+ "description" : "Sink(table=[default_catalog.default_database.sink_t],
fields=[b])"
+ } ],
+ "edges" : [ {
+ "source" : 115,
+ "target" : 116,
+ "shuffle" : {
+ "type" : "FORWARD"
+ },
+ "shuffleMode" : "PIPELINED"
+ }, {
+ "source" : 117,
+ "target" : 118,
+ "shuffle" : {
+ "type" : "FORWARD"
+ },
+ "shuffleMode" : "PIPELINED"
+ }, {
+ "source" : 118,
+ "target" : 119,
+ "shuffle" : {
+ "type" : "FORWARD"
+ },
+ "shuffleMode" : "PIPELINED"
+ }, {
+ "source" : 119,
+ "target" : 120,
+ "shuffle" : {
+ "type" : "FORWARD"
+ },
+ "shuffleMode" : "PIPELINED"
+ }, {
+ "source" : 120,
+ "target" : 121,
+ "shuffle" : {
+ "type" : "FORWARD"
+ },
+ "shuffleMode" : "PIPELINED"
+ }, {
+ "source" : 116,
+ "target" : 122,
+ "shuffle" : {
+ "type" : "FORWARD"
+ },
+ "shuffleMode" : "PIPELINED"
+ }, {
+ "source" : 121,
+ "target" : 122,
+ "shuffle" : {
+ "type" : "FORWARD"
+ },
+ "shuffleMode" : "PIPELINED"
+ }, {
+ "source" : 122,
+ "target" : 123,
+ "shuffle" : {
+ "type" : "FORWARD"
+ },
+ "shuffleMode" : "PIPELINED"
+ }, {
+ "source" : 118,
+ "target" : 124,
+ "shuffle" : {
+ "type" : "FORWARD"
+ },
+ "shuffleMode" : "PIPELINED"
+ }, {
+ "source" : 124,
+ "target" : 125,
+ "shuffle" : {
+ "type" : "FORWARD"
+ },
+ "shuffleMode" : "PIPELINED"
+ }, {
+ "source" : 125,
+ "target" : 126,
+ "shuffle" : {
+ "type" : "FORWARD"
+ },
+ "shuffleMode" : "PIPELINED"
+ }, {
+ "source" : 126,
+ "target" : 127,
+ "shuffle" : {
+ "type" : "FORWARD"
+ },
+ "shuffleMode" : "PIPELINED"
+ }, {
+ "source" : 123,
+ "target" : 128,
+ "shuffle" : {
+ "type" : "FORWARD"
+ },
+ "shuffleMode" : "PIPELINED"
+ }, {
+ "source" : 127,
+ "target" : 128,
+ "shuffle" : {
+ "type" : "FORWARD"
+ },
+ "shuffleMode" : "PIPELINED"
+ }, {
+ "source" : 128,
+ "target" : 129,
+ "shuffle" : {
+ "type" : "FORWARD"
+ },
+ "shuffleMode" : "PIPELINED"
+ }, {
+ "source" : 129,
+ "target" : 130,
+ "shuffle" : {
+ "type" : "FORWARD"
+ },
+ "shuffleMode" : "PIPELINED"
+ }, {
+ "source" : 117,
+ "target" : 131,
+ "shuffle" : {
+ "type" : "FORWARD"
+ },
+ "shuffleMode" : "PIPELINED"
+ }, {
+ "source" : 131,
+ "target" : 132,
+ "shuffle" : {
+ "type" : "FORWARD"
+ },
+ "shuffleMode" : "PIPELINED"
+ }, {
+ "source" : 132,
+ "target" : 133,
+ "shuffle" : {
+ "type" : "FORWARD"
+ },
+ "shuffleMode" : "PIPELINED"
+ }, {
+ "source" : 133,
+ "target" : 134,
+ "shuffle" : {
+ "type" : "FORWARD"
+ },
+ "shuffleMode" : "PIPELINED"
+ }, {
+ "source" : 130,
+ "target" : 135,
+ "shuffle" : {
+ "type" : "FORWARD"
+ },
+ "shuffleMode" : "PIPELINED"
+ }, {
+ "source" : 134,
+ "target" : 135,
+ "shuffle" : {
+ "type" : "FORWARD"
+ },
+ "shuffleMode" : "PIPELINED"
+ }, {
+ "source" : 135,
+ "target" : 136,
+ "shuffle" : {
+ "type" : "FORWARD"
+ },
+ "shuffleMode" : "PIPELINED"
+ }, {
+ "source" : 131,
+ "target" : 137,
+ "shuffle" : {
+ "type" : "FORWARD"
+ },
+ "shuffleMode" : "PIPELINED"
+ }, {
+ "source" : 137,
+ "target" : 138,
+ "shuffle" : {
+ "type" : "FORWARD"
+ },
+ "shuffleMode" : "PIPELINED"
+ }, {
+ "source" : 138,
+ "target" : 139,
+ "shuffle" : {
+ "type" : "FORWARD"
+ },
+ "shuffleMode" : "PIPELINED"
+ }, {
+ "source" : 139,
+ "target" : 140,
+ "shuffle" : {
+ "type" : "FORWARD"
+ },
+ "shuffleMode" : "PIPELINED"
+ }, {
+ "source" : 136,
+ "target" : 141,
+ "shuffle" : {
+ "type" : "FORWARD"
+ },
+ "shuffleMode" : "PIPELINED"
+ }, {
+ "source" : 140,
+ "target" : 141,
+ "shuffle" : {
+ "type" : "FORWARD"
+ },
+ "shuffleMode" : "PIPELINED"
+ }, {
+ "source" : 141,
+ "target" : 142,
+ "shuffle" : {
+ "type" : "FORWARD"
+ },
+ "shuffleMode" : "PIPELINED"
+ }, {
+ "source" : 142,
+ "target" : 143,
+ "shuffle" : {
+ "type" : "FORWARD"
+ },
+ "shuffleMode" : "PIPELINED"
+ }, {
+ "source" : 144,
+ "target" : 145,
+ "shuffle" : {
+ "type" : "FORWARD"
+ },
+ "shuffleMode" : "PIPELINED"
+ }, {
+ "source" : 143,
+ "target" : 146,
+ "shuffle" : {
+ "type" : "FORWARD"
+ },
+ "shuffleMode" : "PIPELINED"
+ }, {
+ "source" : 145,
+ "target" : 146,
+ "shuffle" : {
+ "type" : "FORWARD"
+ },
+ "shuffleMode" : "PIPELINED"
+ }, {
+ "source" : 146,
+ "target" : 147,
+ "shuffle" : {
+ "type" : "FORWARD"
+ },
+ "shuffleMode" : "PIPELINED"
+ }, {
+ "source" : 147,
+ "target" : 148,
+ "shuffle" : {
+ "type" : "FORWARD"
+ },
+ "shuffleMode" : "PIPELINED"
+ } ]
+}
\ No newline at end of file
diff --git
a/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-join_1/semi-anti-join-with-literal-agg/savepoint/_metadata
b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-join_1/semi-anti-join-with-literal-agg/savepoint/_metadata
new file mode 100644
index 00000000000..ad0b6d005e4
Binary files /dev/null and
b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-join_1/semi-anti-join-with-literal-agg/savepoint/_metadata
differ