This is an automated email from the ASF dual-hosted git repository.

snuyanzin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 6248ceda1d9 [FLINK-37882][table] Add tests for queries where LITERAL_AGG will appear after upgrade to Calcite 1.35
6248ceda1d9 is described below

commit 6248ceda1d9c311948fa93a25e452b166b4195f7
Author: Sergey Nuyanzin <[email protected]>
AuthorDate: Mon Jun 2 12:44:44 2025 +0200

    [FLINK-37882][table] Add tests for queries where LITERAL_AGG will appear after upgrade to Calcite 1.35
---
 .../nodes/exec/common/CorrelateTestPrograms.java   |   35 +
 .../plan/nodes/exec/common/JoinTestPrograms.java   |   36 +
 .../nodes/exec/stream/CorrelateRestoreTest.java    |    3 +-
 .../plan/nodes/exec/stream/JoinRestoreTest.java    |    3 +-
 .../plan/correlate-with-literal-agg.json           |  694 +++++++++++
 .../correlate-with-literal-agg/savepoint/_metadata |  Bin 0 -> 24525 bytes
 .../plan/semi-anti-join-with-literal-agg.json      | 1217 ++++++++++++++++++++
 .../savepoint/_metadata                            |  Bin 0 -> 44588 bytes
 8 files changed, 1986 insertions(+), 2 deletions(-)

diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/common/CorrelateTestPrograms.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/common/CorrelateTestPrograms.java
index 38203f3e384..a41be3af777 100644
--- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/common/CorrelateTestPrograms.java
+++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/common/CorrelateTestPrograms.java
@@ -202,4 +202,39 @@ public class CorrelateTestPrograms {
                     .runSql(
                             "INSERT INTO sink_t SELECT (SELECT name, nested FROM source_t, UNNEST(arr) AS T(nested)) FROM source_t")
                     .build();
+
+    public static final TableTestProgram CORRELATE_WITH_LITERAL_AGG =
+            TableTestProgram.of(
+                            "correlate-with-literal-agg",
+                            "validate correlate with literal aggregate function")
+                    .setupTableSource(
+                            SourceTestStep.newBuilder("source_t1")
+                                    .addSchema("a INTEGER", "b BIGINT", "c STRING")
+                                    .producedBeforeRestore(Row.of(1, 2L, "3"))
+                                    .producedAfterRestore(Row.of(2, 3L, "4"))
+                                    .build())
+                    .setupTableSource(
+                            SourceTestStep.newBuilder("source_t2")
+                                    .addSchema("d INTEGER", "e BIGINT", "f STRING")
+                                    .producedBeforeRestore(Row.of(1, 2L, "3"))
+                                    .producedAfterRestore(Row.of(2, 3L, "4"))
+                                    .build())
+                    .setupTableSource(
+                            SourceTestStep.newBuilder("source_t3")
+                                    .addSchema("i INTEGER", "j BIGINT", "k STRING")
+                                    .producedBeforeRestore(Row.of(1, 2L, "3"))
+                                    .producedAfterRestore(Row.of(2, 3L, "4"))
+                                    .build())
+                    .setupTableSink(
+                            SinkTestStep.newBuilder("sink_t")
+                                    .addSchema("b BIGINT")
+                                    .consumedBeforeRestore("+I[2]")
+                                    .consumedAfterRestore(
+                                            "+I[3]", "-D[2]", "-D[3]", "+I[2]", "+I[3]")
+                                    .build())
+                    .runSql(
+                            "INSERT INTO sink_t SELECT b FROM source_t1 "
+                                    + " WHERE (CASE WHEN a IN (SELECT 1 FROM source_t3) THEN 1 ELSE 2 END) "
+                                    + " IN (SELECT d FROM source_t2 WHERE source_t1.c = source_t2.f)")
+                    .build();
 }
diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/common/JoinTestPrograms.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/common/JoinTestPrograms.java
index 1306b449d44..c1024565e0f 100644
--- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/common/JoinTestPrograms.java
+++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/common/JoinTestPrograms.java
@@ -39,6 +39,7 @@ public class JoinTestPrograms {
     public static final TableTestProgram SEMI_JOIN;
     public static final TableTestProgram ANTI_JOIN;
     public static final TableTestProgram JOIN_WITH_STATE_TTL_HINT;
+    public static final TableTestProgram SEMI_ANTI_JOIN_WITH_LITERAL_AGG;
 
     static final SourceTestStep EMPLOYEE =
             SourceTestStep.newBuilder("EMPLOYEE")
@@ -465,5 +466,40 @@ public class JoinTestPrograms {
                                         "INSERT INTO MySink SELECT /*+ STATE_TTL('v1' = '1d', 'v2' = '4d'), STATE_TTL('v2' = '8d') */deptno, department_num FROM (%s) v1 JOIN (%s) v2 ON deptno = department_num",
                                         query1, query2))
                         .build();
+
+        SEMI_ANTI_JOIN_WITH_LITERAL_AGG =
+                TableTestProgram.of("semi-anti-join-with-literal-agg", "join with literal agg")
+                        .setupTableSource(
+                                SourceTestStep.newBuilder("source_t1")
+                                        .addSchema("a INTEGER", "b BIGINT", "c STRING")
+                                        .producedBeforeRestore(Row.of(1, 2L, "3"))
+                                        .producedAfterRestore(Row.of(12, 34L, "56"))
+                                        .build())
+                        .setupTableSource(
+                                SourceTestStep.newBuilder("source_t2")
+                                        .addSchema("d INTEGER", "e BIGINT", "f STRING")
+                                        .producedBeforeRestore(Row.of(1, 2L, "3"))
+                                        .producedAfterRestore(Row.of(11, 22L, "33"))
+                                        .build())
+                        .setupTableSource(
+                                SourceTestStep.newBuilder("source_t3")
+                                        .addSchema("i INTEGER", "j BIGINT", "k STRING")
+                                        .producedBeforeRestore(Row.of(1, 2L, "3"))
+                                        .producedAfterRestore(Row.of(111, 222L, "333"))
+                                        .build())
+                        .setupTableSink(
+                                SinkTestStep.newBuilder("sink_t")
+                                        .addSchema("b BIGINT")
+                                        .consumedBeforeRestore("+I[2]")
+                                        .consumedAfterRestore(
+                                                "-D[2]", "+I[2]", "+I[34]", "-D[2]", "-D[34]",
+                                                "+I[2]", "+I[34]")
+                                        .build())
+                        .runSql(
+                                "INSERT INTO sink_t SELECT b FROM source_t1 WHERE"
+                                        + " (CASE WHEN a NOT IN (SELECT i FROM source_t3) THEN 1"
+                                        + " WHEN a NOT IN (SELECT CAST(j AS INTEGER) FROM source_t3) THEN 2 ELSE 3 END)"
+                                        + " NOT IN (SELECT d FROM source_t2 WHERE source_t1.c = source_t2.f)")
+                        .build();
     }
 }
diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/CorrelateRestoreTest.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/CorrelateRestoreTest.java
index 02d487a80b5..4ce3c0b1bf7 100644
--- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/CorrelateRestoreTest.java
+++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/CorrelateRestoreTest.java
@@ -39,6 +39,7 @@ public class CorrelateRestoreTest extends RestoreTestBase {
                 CorrelateTestPrograms.CORRELATE_SYSTEM_FUNC,
                 CorrelateTestPrograms.CORRELATE_JOIN_FILTER,
                 CorrelateTestPrograms.CORRELATE_LEFT_JOIN,
-                CorrelateTestPrograms.CORRELATE_CROSS_JOIN_UNNEST);
+                CorrelateTestPrograms.CORRELATE_CROSS_JOIN_UNNEST,
+                CorrelateTestPrograms.CORRELATE_WITH_LITERAL_AGG);
     }
 }
diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/JoinRestoreTest.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/JoinRestoreTest.java
index 5cea793c91f..4ff137572e1 100644
--- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/JoinRestoreTest.java
+++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/JoinRestoreTest.java
@@ -48,6 +48,7 @@ public class JoinRestoreTest extends RestoreTestBase {
                 JoinTestPrograms.RIGHT_JOIN,
                 JoinTestPrograms.SEMI_JOIN,
                 JoinTestPrograms.ANTI_JOIN,
-                JoinTestPrograms.JOIN_WITH_STATE_TTL_HINT);
+                JoinTestPrograms.JOIN_WITH_STATE_TTL_HINT,
+                JoinTestPrograms.SEMI_ANTI_JOIN_WITH_LITERAL_AGG);
     }
 }
diff --git a/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-correlate_1/correlate-with-literal-agg/plan/correlate-with-literal-agg.json b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-correlate_1/correlate-with-literal-agg/plan/correlate-with-literal-agg.json
new file mode 100644
index 00000000000..960d50cfeee
--- /dev/null
+++ b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-correlate_1/correlate-with-literal-agg/plan/correlate-with-literal-agg.json
@@ -0,0 +1,694 @@
+{
+  "flinkVersion" : "2.1",
+  "nodes" : [ {
+    "id" : 1,
+    "type" : "stream-exec-table-source-scan_1",
+    "scanTableSource" : {
+      "table" : {
+        "identifier" : "`default_catalog`.`default_database`.`source_t1`",
+        "resolvedTable" : {
+          "schema" : {
+            "columns" : [ {
+              "name" : "a",
+              "dataType" : "INT"
+            }, {
+              "name" : "b",
+              "dataType" : "BIGINT"
+            }, {
+              "name" : "c",
+              "dataType" : "VARCHAR(2147483647)"
+            } ],
+            "watermarkSpecs" : [ ]
+          },
+          "partitionKeys" : [ ]
+        }
+      }
+    },
+    "outputType" : "ROW<`a` INT, `b` BIGINT, `c` VARCHAR(2147483647)>",
+    "description" : "TableSourceScan(table=[[default_catalog, default_database, source_t1]], fields=[a, b, c])",
+    "inputProperties" : [ ]
+  }, {
+    "id" : 2,
+    "type" : "stream-exec-exchange_1",
+    "inputProperties" : [ {
+      "requiredDistribution" : {
+        "type" : "SINGLETON"
+      },
+      "damBehavior" : "PIPELINED",
+      "priority" : 0
+    } ],
+    "outputType" : "ROW<`a` INT, `b` BIGINT, `c` VARCHAR(2147483647)>",
+    "description" : "Exchange(distribution=[single])"
+  }, {
+    "id" : 3,
+    "type" : "stream-exec-table-source-scan_1",
+    "scanTableSource" : {
+      "table" : {
+        "identifier" : "`default_catalog`.`default_database`.`source_t3`",
+        "resolvedTable" : {
+          "schema" : {
+            "columns" : [ {
+              "name" : "i",
+              "dataType" : "INT"
+            }, {
+              "name" : "j",
+              "dataType" : "BIGINT"
+            }, {
+              "name" : "k",
+              "dataType" : "VARCHAR(2147483647)"
+            } ],
+            "watermarkSpecs" : [ ]
+          },
+          "partitionKeys" : [ ]
+        }
+      },
+      "abilities" : [ {
+        "type" : "ProjectPushDown",
+        "projectedFields" : [ [ 0 ], [ 1 ], [ 2 ] ],
+        "producedType" : "ROW<`i` INT, `j` BIGINT, `k` VARCHAR(2147483647)> NOT NULL"
+      }, {
+        "type" : "ReadingMetadata",
+        "metadataKeys" : [ ],
+        "producedType" : "ROW<`i` INT, `j` BIGINT, `k` VARCHAR(2147483647)> NOT NULL"
+      } ]
+    },
+    "outputType" : "ROW<`i` INT, `j` BIGINT, `k` VARCHAR(2147483647)>",
+    "description" : "TableSourceScan(table=[[default_catalog, default_database, source_t3, project=[i, j, k], metadata=[]]], fields=[i, j, k])",
+    "inputProperties" : [ ]
+  }, {
+    "id" : 4,
+    "type" : "stream-exec-exchange_1",
+    "inputProperties" : [ {
+      "requiredDistribution" : {
+        "type" : "SINGLETON"
+      },
+      "damBehavior" : "PIPELINED",
+      "priority" : 0
+    } ],
+    "outputType" : "ROW<`i` INT, `j` BIGINT, `k` VARCHAR(2147483647)>",
+    "description" : "Exchange(distribution=[single])"
+  }, {
+    "id" : 5,
+    "type" : "stream-exec-group-aggregate_1",
+    "configuration" : {
+      "table.exec.mini-batch.enabled" : "false",
+      "table.exec.mini-batch.size" : "-1"
+    },
+    "grouping" : [ ],
+    "aggCalls" : [ {
+      "name" : "c",
+      "syntax" : "FUNCTION_STAR",
+      "internalName" : "$COUNT$1",
+      "argList" : [ ],
+      "filterArg" : -1,
+      "distinct" : false,
+      "approximate" : false,
+      "ignoreNulls" : false,
+      "type" : "BIGINT NOT NULL"
+    } ],
+    "aggCallNeedRetractions" : [ false ],
+    "generateUpdateBefore" : true,
+    "needRetraction" : false,
+    "state" : [ {
+      "index" : 0,
+      "ttl" : "0 ms",
+      "name" : "groupAggregateState"
+    } ],
+    "inputProperties" : [ {
+      "requiredDistribution" : {
+        "type" : "UNKNOWN"
+      },
+      "damBehavior" : "PIPELINED",
+      "priority" : 0
+    } ],
+    "outputType" : "ROW<`c` BIGINT NOT NULL>",
+    "description" : "GroupAggregate(select=[COUNT(*) AS c])"
+  }, {
+    "id" : 6,
+    "type" : "stream-exec-exchange_1",
+    "inputProperties" : [ {
+      "requiredDistribution" : {
+        "type" : "SINGLETON"
+      },
+      "damBehavior" : "PIPELINED",
+      "priority" : 0
+    } ],
+    "outputType" : "ROW<`c` BIGINT NOT NULL>",
+    "description" : "Exchange(distribution=[single])"
+  }, {
+    "id" : 7,
+    "type" : "stream-exec-join_1",
+    "joinSpec" : {
+      "joinType" : "INNER",
+      "leftKeys" : [ ],
+      "rightKeys" : [ ],
+      "filterNulls" : [ ],
+      "nonEquiCondition" : null
+    },
+    "rightUpsertKeys" : [ [ ] ],
+    "state" : [ {
+      "index" : 0,
+      "ttl" : "0 ms",
+      "name" : "leftState"
+    }, {
+      "index" : 1,
+      "ttl" : "0 ms",
+      "name" : "rightState"
+    } ],
+    "inputProperties" : [ {
+      "requiredDistribution" : {
+        "type" : "UNKNOWN"
+      },
+      "damBehavior" : "PIPELINED",
+      "priority" : 0
+    }, {
+      "requiredDistribution" : {
+        "type" : "UNKNOWN"
+      },
+      "damBehavior" : "PIPELINED",
+      "priority" : 0
+    } ],
+    "outputType" : "ROW<`a` INT, `b` BIGINT, `c` VARCHAR(2147483647), `c0` BIGINT NOT NULL>",
+    "description" : "Join(joinType=[InnerJoin], where=[true], select=[a, b, c, c0], leftInputSpec=[NoUniqueKey], rightInputSpec=[JoinKeyContainsUniqueKey])"
+  }, {
+    "id" : 8,
+    "type" : "stream-exec-exchange_1",
+    "inputProperties" : [ {
+      "requiredDistribution" : {
+        "type" : "SINGLETON"
+      },
+      "damBehavior" : "PIPELINED",
+      "priority" : 0
+    } ],
+    "outputType" : "ROW<`a` INT, `b` BIGINT, `c` VARCHAR(2147483647), `c0` BIGINT NOT NULL>",
+    "description" : "Exchange(distribution=[single])"
+  }, {
+    "id" : 9,
+    "type" : "stream-exec-calc_1",
+    "projection" : [ {
+      "kind" : "LITERAL",
+      "value" : 1,
+      "type" : "INT NOT NULL"
+    } ],
+    "condition" : null,
+    "inputProperties" : [ {
+      "requiredDistribution" : {
+        "type" : "UNKNOWN"
+      },
+      "damBehavior" : "PIPELINED",
+      "priority" : 0
+    } ],
+    "outputType" : "ROW<`EXPR$0` INT NOT NULL>",
+    "description" : "Calc(select=[1 AS EXPR$0])"
+  }, {
+    "id" : 10,
+    "type" : "stream-exec-exchange_1",
+    "inputProperties" : [ {
+      "requiredDistribution" : {
+        "type" : "HASH",
+        "keys" : [ 0 ]
+      },
+      "damBehavior" : "PIPELINED",
+      "priority" : 0
+    } ],
+    "outputType" : "ROW<`EXPR$0` INT NOT NULL>",
+    "description" : "Exchange(distribution=[hash[EXPR$0]])"
+  }, {
+    "id" : 11,
+    "type" : "stream-exec-group-aggregate_1",
+    "configuration" : {
+      "table.exec.mini-batch.enabled" : "false",
+      "table.exec.mini-batch.size" : "-1"
+    },
+    "grouping" : [ 0 ],
+    "aggCalls" : [ ],
+    "aggCallNeedRetractions" : [ ],
+    "generateUpdateBefore" : true,
+    "needRetraction" : false,
+    "state" : [ {
+      "index" : 0,
+      "ttl" : "0 ms",
+      "name" : "groupAggregateState"
+    } ],
+    "inputProperties" : [ {
+      "requiredDistribution" : {
+        "type" : "UNKNOWN"
+      },
+      "damBehavior" : "PIPELINED",
+      "priority" : 0
+    } ],
+    "outputType" : "ROW<`EXPR$0` INT NOT NULL>",
+    "description" : "GroupAggregate(groupBy=[EXPR$0], select=[EXPR$0])"
+  }, {
+    "id" : 12,
+    "type" : "stream-exec-calc_1",
+    "projection" : [ {
+      "kind" : "LITERAL",
+      "value" : true,
+      "type" : "BOOLEAN NOT NULL"
+    } ],
+    "condition" : null,
+    "inputProperties" : [ {
+      "requiredDistribution" : {
+        "type" : "UNKNOWN"
+      },
+      "damBehavior" : "PIPELINED",
+      "priority" : 0
+    } ],
+    "outputType" : "ROW<`i` BOOLEAN NOT NULL>",
+    "description" : "Calc(select=[true AS i])"
+  }, {
+    "id" : 13,
+    "type" : "stream-exec-exchange_1",
+    "inputProperties" : [ {
+      "requiredDistribution" : {
+        "type" : "SINGLETON"
+      },
+      "damBehavior" : "PIPELINED",
+      "priority" : 0
+    } ],
+    "outputType" : "ROW<`i` BOOLEAN NOT NULL>",
+    "description" : "Exchange(distribution=[single])"
+  }, {
+    "id" : 14,
+    "type" : "stream-exec-join_1",
+    "joinSpec" : {
+      "joinType" : "LEFT",
+      "leftKeys" : [ ],
+      "rightKeys" : [ ],
+      "filterNulls" : [ ],
+      "nonEquiCondition" : {
+        "kind" : "CALL",
+        "syntax" : "BINARY",
+        "internalName" : "$=$1",
+        "operands" : [ {
+          "kind" : "INPUT_REF",
+          "inputIndex" : 0,
+          "type" : "INT"
+        }, {
+          "kind" : "LITERAL",
+          "value" : 1,
+          "type" : "INT NOT NULL"
+        } ],
+        "type" : "BOOLEAN"
+      }
+    },
+    "state" : [ {
+      "index" : 0,
+      "ttl" : "0 ms",
+      "name" : "leftState"
+    }, {
+      "index" : 1,
+      "ttl" : "0 ms",
+      "name" : "rightState"
+    } ],
+    "inputProperties" : [ {
+      "requiredDistribution" : {
+        "type" : "UNKNOWN"
+      },
+      "damBehavior" : "PIPELINED",
+      "priority" : 0
+    }, {
+      "requiredDistribution" : {
+        "type" : "UNKNOWN"
+      },
+      "damBehavior" : "PIPELINED",
+      "priority" : 0
+    } ],
+    "outputType" : "ROW<`a` INT, `b` BIGINT, `c` VARCHAR(2147483647), `c0` BIGINT NOT NULL, `i` BOOLEAN>",
+    "description" : "Join(joinType=[LeftOuterJoin], where=[(a = 1)], select=[a, b, c, c0, i], leftInputSpec=[NoUniqueKey], rightInputSpec=[NoUniqueKey])"
+  }, {
+    "id" : 15,
+    "type" : "stream-exec-calc_1",
+    "projection" : [ {
+      "kind" : "INPUT_REF",
+      "inputIndex" : 1,
+      "type" : "BIGINT"
+    }, {
+      "kind" : "INPUT_REF",
+      "inputIndex" : 2,
+      "type" : "VARCHAR(2147483647)"
+    }, {
+      "kind" : "CALL",
+      "syntax" : "SPECIAL",
+      "internalName" : "$CASE$1",
+      "operands" : [ {
+        "kind" : "CALL",
+        "syntax" : "BINARY",
+        "internalName" : "$AND$1",
+        "operands" : [ {
+          "kind" : "CALL",
+          "syntax" : "BINARY",
+          "internalName" : "$<>$1",
+          "operands" : [ {
+            "kind" : "INPUT_REF",
+            "inputIndex" : 3,
+            "type" : "BIGINT NOT NULL"
+          }, {
+            "kind" : "LITERAL",
+            "value" : 0,
+            "type" : "INT NOT NULL"
+          } ],
+          "type" : "BOOLEAN NOT NULL"
+        }, {
+          "kind" : "CALL",
+          "syntax" : "POSTFIX",
+          "internalName" : "$IS NOT NULL$1",
+          "operands" : [ {
+            "kind" : "INPUT_REF",
+            "inputIndex" : 4,
+            "type" : "BOOLEAN"
+          } ],
+          "type" : "BOOLEAN NOT NULL"
+        }, {
+          "kind" : "CALL",
+          "syntax" : "POSTFIX",
+          "internalName" : "$IS NOT NULL$1",
+          "operands" : [ {
+            "kind" : "INPUT_REF",
+            "inputIndex" : 0,
+            "type" : "INT"
+          } ],
+          "type" : "BOOLEAN NOT NULL"
+        } ],
+        "type" : "BOOLEAN NOT NULL"
+      }, {
+        "kind" : "LITERAL",
+        "value" : 1,
+        "type" : "INT NOT NULL"
+      }, {
+        "kind" : "LITERAL",
+        "value" : 2,
+        "type" : "INT NOT NULL"
+      } ],
+      "type" : "INT NOT NULL"
+    } ],
+    "condition" : null,
+    "inputProperties" : [ {
+      "requiredDistribution" : {
+        "type" : "UNKNOWN"
+      },
+      "damBehavior" : "PIPELINED",
+      "priority" : 0
+    } ],
+    "outputType" : "ROW<`b` BIGINT, `c` VARCHAR(2147483647), `$f3` INT NOT NULL>",
+    "description" : "Calc(select=[b, c, CASE(((c0 <> 0) AND i IS NOT NULL AND a IS NOT NULL), 1, 2) AS $f3])"
+  }, {
+    "id" : 16,
+    "type" : "stream-exec-exchange_1",
+    "inputProperties" : [ {
+      "requiredDistribution" : {
+        "type" : "HASH",
+        "keys" : [ 2, 1 ]
+      },
+      "damBehavior" : "PIPELINED",
+      "priority" : 0
+    } ],
+    "outputType" : "ROW<`b` BIGINT, `c` VARCHAR(2147483647), `$f3` INT NOT NULL>",
+    "description" : "Exchange(distribution=[hash[$f3, c]])"
+  }, {
+    "id" : 17,
+    "type" : "stream-exec-table-source-scan_1",
+    "scanTableSource" : {
+      "table" : {
+        "identifier" : "`default_catalog`.`default_database`.`source_t2`",
+        "resolvedTable" : {
+          "schema" : {
+            "columns" : [ {
+              "name" : "d",
+              "dataType" : "INT"
+            }, {
+              "name" : "e",
+              "dataType" : "BIGINT"
+            }, {
+              "name" : "f",
+              "dataType" : "VARCHAR(2147483647)"
+            } ],
+            "watermarkSpecs" : [ ]
+          },
+          "partitionKeys" : [ ]
+        }
+      },
+      "abilities" : [ {
+        "type" : "ProjectPushDown",
+        "projectedFields" : [ [ 0 ], [ 2 ] ],
+        "producedType" : "ROW<`d` INT, `f` VARCHAR(2147483647)> NOT NULL"
+      }, {
+        "type" : "ReadingMetadata",
+        "metadataKeys" : [ ],
+        "producedType" : "ROW<`d` INT, `f` VARCHAR(2147483647)> NOT NULL"
+      } ]
+    },
+    "outputType" : "ROW<`d` INT, `f` VARCHAR(2147483647)>",
+    "description" : "TableSourceScan(table=[[default_catalog, default_database, source_t2, project=[d, f], metadata=[]]], fields=[d, f])",
+    "inputProperties" : [ ]
+  }, {
+    "id" : 18,
+    "type" : "stream-exec-exchange_1",
+    "inputProperties" : [ {
+      "requiredDistribution" : {
+        "type" : "HASH",
+        "keys" : [ 0, 1 ]
+      },
+      "damBehavior" : "PIPELINED",
+      "priority" : 0
+    } ],
+    "outputType" : "ROW<`d` INT, `f` VARCHAR(2147483647)>",
+    "description" : "Exchange(distribution=[hash[d, f]])"
+  }, {
+    "id" : 19,
+    "type" : "stream-exec-join_1",
+    "joinSpec" : {
+      "joinType" : "SEMI",
+      "leftKeys" : [ 2, 1 ],
+      "rightKeys" : [ 0, 1 ],
+      "filterNulls" : [ true, true ],
+      "nonEquiCondition" : null
+    },
+    "state" : [ {
+      "index" : 0,
+      "ttl" : "0 ms",
+      "name" : "leftState"
+    }, {
+      "index" : 1,
+      "ttl" : "0 ms",
+      "name" : "rightState"
+    } ],
+    "inputProperties" : [ {
+      "requiredDistribution" : {
+        "type" : "UNKNOWN"
+      },
+      "damBehavior" : "PIPELINED",
+      "priority" : 0
+    }, {
+      "requiredDistribution" : {
+        "type" : "UNKNOWN"
+      },
+      "damBehavior" : "PIPELINED",
+      "priority" : 0
+    } ],
+    "outputType" : "ROW<`b` BIGINT, `c` VARCHAR(2147483647), `$f3` INT NOT NULL>",
+    "description" : "Join(joinType=[LeftSemiJoin], where=[(($f3 = d) AND (c = f))], select=[b, c, $f3], leftInputSpec=[NoUniqueKey], rightInputSpec=[NoUniqueKey])"
+  }, {
+    "id" : 20,
+    "type" : "stream-exec-calc_1",
+    "projection" : [ {
+      "kind" : "INPUT_REF",
+      "inputIndex" : 0,
+      "type" : "BIGINT"
+    } ],
+    "condition" : null,
+    "inputProperties" : [ {
+      "requiredDistribution" : {
+        "type" : "UNKNOWN"
+      },
+      "damBehavior" : "PIPELINED",
+      "priority" : 0
+    } ],
+    "outputType" : "ROW<`b` BIGINT>",
+    "description" : "Calc(select=[b])"
+  }, {
+    "id" : 21,
+    "type" : "stream-exec-sink_1",
+    "configuration" : {
+      "table.exec.sink.keyed-shuffle" : "AUTO",
+      "table.exec.sink.not-null-enforcer" : "ERROR",
+      "table.exec.sink.rowtime-inserter" : "ENABLED",
+      "table.exec.sink.type-length-enforcer" : "IGNORE",
+      "table.exec.sink.upsert-materialize" : "AUTO"
+    },
+    "dynamicTableSink" : {
+      "table" : {
+        "identifier" : "`default_catalog`.`default_database`.`sink_t`",
+        "resolvedTable" : {
+          "schema" : {
+            "columns" : [ {
+              "name" : "b",
+              "dataType" : "BIGINT"
+            } ],
+            "watermarkSpecs" : [ ]
+          },
+          "partitionKeys" : [ ]
+        }
+      }
+    },
+    "inputChangelogMode" : [ "INSERT", "UPDATE_BEFORE", "UPDATE_AFTER", "DELETE" ],
+    "inputProperties" : [ {
+      "requiredDistribution" : {
+        "type" : "UNKNOWN"
+      },
+      "damBehavior" : "PIPELINED",
+      "priority" : 0
+    } ],
+    "outputType" : "ROW<`b` BIGINT>",
+    "description" : "Sink(table=[default_catalog.default_database.sink_t], fields=[b])"
+  } ],
+  "edges" : [ {
+    "source" : 1,
+    "target" : 2,
+    "shuffle" : {
+      "type" : "FORWARD"
+    },
+    "shuffleMode" : "PIPELINED"
+  }, {
+    "source" : 3,
+    "target" : 4,
+    "shuffle" : {
+      "type" : "FORWARD"
+    },
+    "shuffleMode" : "PIPELINED"
+  }, {
+    "source" : 4,
+    "target" : 5,
+    "shuffle" : {
+      "type" : "FORWARD"
+    },
+    "shuffleMode" : "PIPELINED"
+  }, {
+    "source" : 5,
+    "target" : 6,
+    "shuffle" : {
+      "type" : "FORWARD"
+    },
+    "shuffleMode" : "PIPELINED"
+  }, {
+    "source" : 2,
+    "target" : 7,
+    "shuffle" : {
+      "type" : "FORWARD"
+    },
+    "shuffleMode" : "PIPELINED"
+  }, {
+    "source" : 6,
+    "target" : 7,
+    "shuffle" : {
+      "type" : "FORWARD"
+    },
+    "shuffleMode" : "PIPELINED"
+  }, {
+    "source" : 7,
+    "target" : 8,
+    "shuffle" : {
+      "type" : "FORWARD"
+    },
+    "shuffleMode" : "PIPELINED"
+  }, {
+    "source" : 3,
+    "target" : 9,
+    "shuffle" : {
+      "type" : "FORWARD"
+    },
+    "shuffleMode" : "PIPELINED"
+  }, {
+    "source" : 9,
+    "target" : 10,
+    "shuffle" : {
+      "type" : "FORWARD"
+    },
+    "shuffleMode" : "PIPELINED"
+  }, {
+    "source" : 10,
+    "target" : 11,
+    "shuffle" : {
+      "type" : "FORWARD"
+    },
+    "shuffleMode" : "PIPELINED"
+  }, {
+    "source" : 11,
+    "target" : 12,
+    "shuffle" : {
+      "type" : "FORWARD"
+    },
+    "shuffleMode" : "PIPELINED"
+  }, {
+    "source" : 12,
+    "target" : 13,
+    "shuffle" : {
+      "type" : "FORWARD"
+    },
+    "shuffleMode" : "PIPELINED"
+  }, {
+    "source" : 8,
+    "target" : 14,
+    "shuffle" : {
+      "type" : "FORWARD"
+    },
+    "shuffleMode" : "PIPELINED"
+  }, {
+    "source" : 13,
+    "target" : 14,
+    "shuffle" : {
+      "type" : "FORWARD"
+    },
+    "shuffleMode" : "PIPELINED"
+  }, {
+    "source" : 14,
+    "target" : 15,
+    "shuffle" : {
+      "type" : "FORWARD"
+    },
+    "shuffleMode" : "PIPELINED"
+  }, {
+    "source" : 15,
+    "target" : 16,
+    "shuffle" : {
+      "type" : "FORWARD"
+    },
+    "shuffleMode" : "PIPELINED"
+  }, {
+    "source" : 17,
+    "target" : 18,
+    "shuffle" : {
+      "type" : "FORWARD"
+    },
+    "shuffleMode" : "PIPELINED"
+  }, {
+    "source" : 16,
+    "target" : 19,
+    "shuffle" : {
+      "type" : "FORWARD"
+    },
+    "shuffleMode" : "PIPELINED"
+  }, {
+    "source" : 18,
+    "target" : 19,
+    "shuffle" : {
+      "type" : "FORWARD"
+    },
+    "shuffleMode" : "PIPELINED"
+  }, {
+    "source" : 19,
+    "target" : 20,
+    "shuffle" : {
+      "type" : "FORWARD"
+    },
+    "shuffleMode" : "PIPELINED"
+  }, {
+    "source" : 20,
+    "target" : 21,
+    "shuffle" : {
+      "type" : "FORWARD"
+    },
+    "shuffleMode" : "PIPELINED"
+  } ]
+}
\ No newline at end of file
diff --git a/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-correlate_1/correlate-with-literal-agg/savepoint/_metadata b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-correlate_1/correlate-with-literal-agg/savepoint/_metadata
new file mode 100644
index 00000000000..5f1b3b0315d
Binary files /dev/null and b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-correlate_1/correlate-with-literal-agg/savepoint/_metadata differ
diff --git a/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-join_1/semi-anti-join-with-literal-agg/plan/semi-anti-join-with-literal-agg.json b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-join_1/semi-anti-join-with-literal-agg/plan/semi-anti-join-with-literal-agg.json
new file mode 100644
index 00000000000..8a5580a89d7
--- /dev/null
+++ b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-join_1/semi-anti-join-with-literal-agg/plan/semi-anti-join-with-literal-agg.json
@@ -0,0 +1,1217 @@
+{
+  "flinkVersion" : "2.1",
+  "nodes" : [ {
+    "id" : 115,
+    "type" : "stream-exec-table-source-scan_1",
+    "scanTableSource" : {
+      "table" : {
+        "identifier" : "`default_catalog`.`default_database`.`source_t1`",
+        "resolvedTable" : {
+          "schema" : {
+            "columns" : [ {
+              "name" : "a",
+              "dataType" : "INT"
+            }, {
+              "name" : "b",
+              "dataType" : "BIGINT"
+            }, {
+              "name" : "c",
+              "dataType" : "VARCHAR(2147483647)"
+            } ],
+            "watermarkSpecs" : [ ]
+          },
+          "partitionKeys" : [ ]
+        }
+      }
+    },
+    "outputType" : "ROW<`a` INT, `b` BIGINT, `c` VARCHAR(2147483647)>",
+    "description" : "TableSourceScan(table=[[default_catalog, 
default_database, source_t1]], fields=[a, b, c])",
+    "inputProperties" : [ ]
+  }, {
+    "id" : 116,
+    "type" : "stream-exec-exchange_1",
+    "inputProperties" : [ {
+      "requiredDistribution" : {
+        "type" : "SINGLETON"
+      },
+      "damBehavior" : "PIPELINED",
+      "priority" : 0
+    } ],
+    "outputType" : "ROW<`a` INT, `b` BIGINT, `c` VARCHAR(2147483647)>",
+    "description" : "Exchange(distribution=[single])"
+  }, {
+    "id" : 117,
+    "type" : "stream-exec-table-source-scan_1",
+    "scanTableSource" : {
+      "table" : {
+        "identifier" : "`default_catalog`.`default_database`.`source_t3`",
+        "resolvedTable" : {
+          "schema" : {
+            "columns" : [ {
+              "name" : "i",
+              "dataType" : "INT"
+            }, {
+              "name" : "j",
+              "dataType" : "BIGINT"
+            }, {
+              "name" : "k",
+              "dataType" : "VARCHAR(2147483647)"
+            } ],
+            "watermarkSpecs" : [ ]
+          },
+          "partitionKeys" : [ ]
+        }
+      },
+      "abilities" : [ {
+        "type" : "ProjectPushDown",
+        "projectedFields" : [ [ 0 ], [ 1 ] ],
+        "producedType" : "ROW<`i` INT, `j` BIGINT> NOT NULL"
+      }, {
+        "type" : "ReadingMetadata",
+        "metadataKeys" : [ ],
+        "producedType" : "ROW<`i` INT, `j` BIGINT> NOT NULL"
+      } ]
+    },
+    "outputType" : "ROW<`i` INT, `j` BIGINT>",
+    "description" : "TableSourceScan(table=[[default_catalog, 
default_database, source_t3, project=[i, j], metadata=[]]], fields=[i, j])",
+    "inputProperties" : [ ]
+  }, {
+    "id" : 118,
+    "type" : "stream-exec-calc_1",
+    "projection" : [ {
+      "kind" : "INPUT_REF",
+      "inputIndex" : 0,
+      "type" : "INT"
+    } ],
+    "condition" : null,
+    "inputProperties" : [ {
+      "requiredDistribution" : {
+        "type" : "UNKNOWN"
+      },
+      "damBehavior" : "PIPELINED",
+      "priority" : 0
+    } ],
+    "outputType" : "ROW<`i` INT>",
+    "description" : "Calc(select=[i])"
+  }, {
+    "id" : 119,
+    "type" : "stream-exec-exchange_1",
+    "inputProperties" : [ {
+      "requiredDistribution" : {
+        "type" : "SINGLETON"
+      },
+      "damBehavior" : "PIPELINED",
+      "priority" : 0
+    } ],
+    "outputType" : "ROW<`i` INT>",
+    "description" : "Exchange(distribution=[single])"
+  }, {
+    "id" : 120,
+    "type" : "stream-exec-group-aggregate_1",
+    "configuration" : {
+      "table.exec.mini-batch.enabled" : "false",
+      "table.exec.mini-batch.size" : "-1"
+    },
+    "grouping" : [ ],
+    "aggCalls" : [ {
+      "name" : "c",
+      "syntax" : "FUNCTION_STAR",
+      "internalName" : "$COUNT$1",
+      "argList" : [ ],
+      "filterArg" : -1,
+      "distinct" : false,
+      "approximate" : false,
+      "ignoreNulls" : false,
+      "type" : "BIGINT NOT NULL"
+    }, {
+      "name" : "ck",
+      "syntax" : "FUNCTION_STAR",
+      "internalName" : "$COUNT$1",
+      "argList" : [ 0 ],
+      "filterArg" : -1,
+      "distinct" : false,
+      "approximate" : false,
+      "ignoreNulls" : false,
+      "type" : "BIGINT NOT NULL"
+    } ],
+    "aggCallNeedRetractions" : [ false, false ],
+    "generateUpdateBefore" : true,
+    "needRetraction" : false,
+    "state" : [ {
+      "index" : 0,
+      "ttl" : "0 ms",
+      "name" : "groupAggregateState"
+    } ],
+    "inputProperties" : [ {
+      "requiredDistribution" : {
+        "type" : "UNKNOWN"
+      },
+      "damBehavior" : "PIPELINED",
+      "priority" : 0
+    } ],
+    "outputType" : "ROW<`c` BIGINT NOT NULL, `ck` BIGINT NOT NULL>",
+    "description" : "GroupAggregate(select=[COUNT(*) AS c, COUNT(i) AS ck])"
+  }, {
+    "id" : 121,
+    "type" : "stream-exec-exchange_1",
+    "inputProperties" : [ {
+      "requiredDistribution" : {
+        "type" : "SINGLETON"
+      },
+      "damBehavior" : "PIPELINED",
+      "priority" : 0
+    } ],
+    "outputType" : "ROW<`c` BIGINT NOT NULL, `ck` BIGINT NOT NULL>",
+    "description" : "Exchange(distribution=[single])"
+  }, {
+    "id" : 122,
+    "type" : "stream-exec-join_1",
+    "joinSpec" : {
+      "joinType" : "INNER",
+      "leftKeys" : [ ],
+      "rightKeys" : [ ],
+      "filterNulls" : [ ],
+      "nonEquiCondition" : null
+    },
+    "rightUpsertKeys" : [ [ ] ],
+    "state" : [ {
+      "index" : 0,
+      "ttl" : "0 ms",
+      "name" : "leftState"
+    }, {
+      "index" : 1,
+      "ttl" : "0 ms",
+      "name" : "rightState"
+    } ],
+    "inputProperties" : [ {
+      "requiredDistribution" : {
+        "type" : "UNKNOWN"
+      },
+      "damBehavior" : "PIPELINED",
+      "priority" : 0
+    }, {
+      "requiredDistribution" : {
+        "type" : "UNKNOWN"
+      },
+      "damBehavior" : "PIPELINED",
+      "priority" : 0
+    } ],
+    "outputType" : "ROW<`a` INT, `b` BIGINT, `c` VARCHAR(2147483647), `c0` 
BIGINT NOT NULL, `ck` BIGINT NOT NULL>",
+    "description" : "Join(joinType=[InnerJoin], where=[true], select=[a, b, c, 
c0, ck], leftInputSpec=[NoUniqueKey], 
rightInputSpec=[JoinKeyContainsUniqueKey])"
+  }, {
+    "id" : 123,
+    "type" : "stream-exec-exchange_1",
+    "inputProperties" : [ {
+      "requiredDistribution" : {
+        "type" : "HASH",
+        "keys" : [ 0 ]
+      },
+      "damBehavior" : "PIPELINED",
+      "priority" : 0
+    } ],
+    "outputType" : "ROW<`a` INT, `b` BIGINT, `c` VARCHAR(2147483647), `c0` 
BIGINT NOT NULL, `ck` BIGINT NOT NULL>",
+    "description" : "Exchange(distribution=[hash[a]])"
+  }, {
+    "id" : 124,
+    "type" : "stream-exec-exchange_1",
+    "inputProperties" : [ {
+      "requiredDistribution" : {
+        "type" : "HASH",
+        "keys" : [ 0 ]
+      },
+      "damBehavior" : "PIPELINED",
+      "priority" : 0
+    } ],
+    "outputType" : "ROW<`i` INT>",
+    "description" : "Exchange(distribution=[hash[i]])"
+  }, {
+    "id" : 125,
+    "type" : "stream-exec-group-aggregate_1",
+    "configuration" : {
+      "table.exec.mini-batch.enabled" : "false",
+      "table.exec.mini-batch.size" : "-1"
+    },
+    "grouping" : [ 0 ],
+    "aggCalls" : [ ],
+    "aggCallNeedRetractions" : [ ],
+    "generateUpdateBefore" : true,
+    "needRetraction" : false,
+    "state" : [ {
+      "index" : 0,
+      "ttl" : "0 ms",
+      "name" : "groupAggregateState"
+    } ],
+    "inputProperties" : [ {
+      "requiredDistribution" : {
+        "type" : "UNKNOWN"
+      },
+      "damBehavior" : "PIPELINED",
+      "priority" : 0
+    } ],
+    "outputType" : "ROW<`i` INT>",
+    "description" : "GroupAggregate(groupBy=[i], select=[i])"
+  }, {
+    "id" : 126,
+    "type" : "stream-exec-calc_1",
+    "projection" : [ {
+      "kind" : "INPUT_REF",
+      "inputIndex" : 0,
+      "type" : "INT"
+    }, {
+      "kind" : "LITERAL",
+      "value" : true,
+      "type" : "BOOLEAN NOT NULL"
+    } ],
+    "condition" : null,
+    "inputProperties" : [ {
+      "requiredDistribution" : {
+        "type" : "UNKNOWN"
+      },
+      "damBehavior" : "PIPELINED",
+      "priority" : 0
+    } ],
+    "outputType" : "ROW<`i` INT, `i0` BOOLEAN NOT NULL>",
+    "description" : "Calc(select=[i, true AS i0])"
+  }, {
+    "id" : 127,
+    "type" : "stream-exec-exchange_1",
+    "inputProperties" : [ {
+      "requiredDistribution" : {
+        "type" : "HASH",
+        "keys" : [ 0 ]
+      },
+      "damBehavior" : "PIPELINED",
+      "priority" : 0
+    } ],
+    "outputType" : "ROW<`i` INT, `i0` BOOLEAN NOT NULL>",
+    "description" : "Exchange(distribution=[hash[i]])"
+  }, {
+    "id" : 128,
+    "type" : "stream-exec-join_1",
+    "joinSpec" : {
+      "joinType" : "LEFT",
+      "leftKeys" : [ 0 ],
+      "rightKeys" : [ 0 ],
+      "filterNulls" : [ true ],
+      "nonEquiCondition" : null
+    },
+    "rightUpsertKeys" : [ [ 0 ] ],
+    "state" : [ {
+      "index" : 0,
+      "ttl" : "0 ms",
+      "name" : "leftState"
+    }, {
+      "index" : 1,
+      "ttl" : "0 ms",
+      "name" : "rightState"
+    } ],
+    "inputProperties" : [ {
+      "requiredDistribution" : {
+        "type" : "UNKNOWN"
+      },
+      "damBehavior" : "PIPELINED",
+      "priority" : 0
+    }, {
+      "requiredDistribution" : {
+        "type" : "UNKNOWN"
+      },
+      "damBehavior" : "PIPELINED",
+      "priority" : 0
+    } ],
+    "outputType" : "ROW<`a` INT, `b` BIGINT, `c` VARCHAR(2147483647), `c0` 
BIGINT NOT NULL, `ck` BIGINT NOT NULL, `i` INT, `i0` BOOLEAN>",
+    "description" : "Join(joinType=[LeftOuterJoin], where=[(a = i)], 
select=[a, b, c, c0, ck, i, i0], leftInputSpec=[NoUniqueKey], 
rightInputSpec=[JoinKeyContainsUniqueKey])"
+  }, {
+    "id" : 129,
+    "type" : "stream-exec-calc_1",
+    "projection" : [ {
+      "kind" : "INPUT_REF",
+      "inputIndex" : 0,
+      "type" : "INT"
+    }, {
+      "kind" : "INPUT_REF",
+      "inputIndex" : 1,
+      "type" : "BIGINT"
+    }, {
+      "kind" : "INPUT_REF",
+      "inputIndex" : 2,
+      "type" : "VARCHAR(2147483647)"
+    }, {
+      "kind" : "INPUT_REF",
+      "inputIndex" : 3,
+      "type" : "BIGINT NOT NULL"
+    }, {
+      "kind" : "INPUT_REF",
+      "inputIndex" : 4,
+      "type" : "BIGINT NOT NULL"
+    }, {
+      "kind" : "INPUT_REF",
+      "inputIndex" : 6,
+      "type" : "BOOLEAN"
+    } ],
+    "condition" : null,
+    "inputProperties" : [ {
+      "requiredDistribution" : {
+        "type" : "UNKNOWN"
+      },
+      "damBehavior" : "PIPELINED",
+      "priority" : 0
+    } ],
+    "outputType" : "ROW<`a` INT, `b` BIGINT, `c` VARCHAR(2147483647), `c0` 
BIGINT NOT NULL, `ck` BIGINT NOT NULL, `i0` BOOLEAN>",
+    "description" : "Calc(select=[a, b, c, c0, ck, i0])"
+  }, {
+    "id" : 130,
+    "type" : "stream-exec-exchange_1",
+    "inputProperties" : [ {
+      "requiredDistribution" : {
+        "type" : "SINGLETON"
+      },
+      "damBehavior" : "PIPELINED",
+      "priority" : 0
+    } ],
+    "outputType" : "ROW<`a` INT, `b` BIGINT, `c` VARCHAR(2147483647), `c0` 
BIGINT NOT NULL, `ck` BIGINT NOT NULL, `i0` BOOLEAN>",
+    "description" : "Exchange(distribution=[single])"
+  }, {
+    "id" : 131,
+    "type" : "stream-exec-calc_1",
+    "projection" : [ {
+      "kind" : "CALL",
+      "syntax" : "SPECIAL",
+      "internalName" : "$CAST$1",
+      "operands" : [ {
+        "kind" : "INPUT_REF",
+        "inputIndex" : 1,
+        "type" : "BIGINT"
+      } ],
+      "type" : "INT"
+    } ],
+    "condition" : null,
+    "inputProperties" : [ {
+      "requiredDistribution" : {
+        "type" : "UNKNOWN"
+      },
+      "damBehavior" : "PIPELINED",
+      "priority" : 0
+    } ],
+    "outputType" : "ROW<`EXPR$0` INT>",
+    "description" : "Calc(select=[CAST(j AS INTEGER) AS EXPR$0])"
+  }, {
+    "id" : 132,
+    "type" : "stream-exec-exchange_1",
+    "inputProperties" : [ {
+      "requiredDistribution" : {
+        "type" : "SINGLETON"
+      },
+      "damBehavior" : "PIPELINED",
+      "priority" : 0
+    } ],
+    "outputType" : "ROW<`EXPR$0` INT>",
+    "description" : "Exchange(distribution=[single])"
+  }, {
+    "id" : 133,
+    "type" : "stream-exec-group-aggregate_1",
+    "configuration" : {
+      "table.exec.mini-batch.enabled" : "false",
+      "table.exec.mini-batch.size" : "-1"
+    },
+    "grouping" : [ ],
+    "aggCalls" : [ {
+      "name" : "c",
+      "syntax" : "FUNCTION_STAR",
+      "internalName" : "$COUNT$1",
+      "argList" : [ ],
+      "filterArg" : -1,
+      "distinct" : false,
+      "approximate" : false,
+      "ignoreNulls" : false,
+      "type" : "BIGINT NOT NULL"
+    }, {
+      "name" : "ck",
+      "syntax" : "FUNCTION_STAR",
+      "internalName" : "$COUNT$1",
+      "argList" : [ 0 ],
+      "filterArg" : -1,
+      "distinct" : false,
+      "approximate" : false,
+      "ignoreNulls" : false,
+      "type" : "BIGINT NOT NULL"
+    } ],
+    "aggCallNeedRetractions" : [ false, false ],
+    "generateUpdateBefore" : true,
+    "needRetraction" : false,
+    "state" : [ {
+      "index" : 0,
+      "ttl" : "0 ms",
+      "name" : "groupAggregateState"
+    } ],
+    "inputProperties" : [ {
+      "requiredDistribution" : {
+        "type" : "UNKNOWN"
+      },
+      "damBehavior" : "PIPELINED",
+      "priority" : 0
+    } ],
+    "outputType" : "ROW<`c` BIGINT NOT NULL, `ck` BIGINT NOT NULL>",
+    "description" : "GroupAggregate(select=[COUNT(*) AS c, COUNT(EXPR$0) AS 
ck])"
+  }, {
+    "id" : 134,
+    "type" : "stream-exec-exchange_1",
+    "inputProperties" : [ {
+      "requiredDistribution" : {
+        "type" : "SINGLETON"
+      },
+      "damBehavior" : "PIPELINED",
+      "priority" : 0
+    } ],
+    "outputType" : "ROW<`c` BIGINT NOT NULL, `ck` BIGINT NOT NULL>",
+    "description" : "Exchange(distribution=[single])"
+  }, {
+    "id" : 135,
+    "type" : "stream-exec-join_1",
+    "joinSpec" : {
+      "joinType" : "INNER",
+      "leftKeys" : [ ],
+      "rightKeys" : [ ],
+      "filterNulls" : [ ],
+      "nonEquiCondition" : null
+    },
+    "rightUpsertKeys" : [ [ ] ],
+    "state" : [ {
+      "index" : 0,
+      "ttl" : "0 ms",
+      "name" : "leftState"
+    }, {
+      "index" : 1,
+      "ttl" : "0 ms",
+      "name" : "rightState"
+    } ],
+    "inputProperties" : [ {
+      "requiredDistribution" : {
+        "type" : "UNKNOWN"
+      },
+      "damBehavior" : "PIPELINED",
+      "priority" : 0
+    }, {
+      "requiredDistribution" : {
+        "type" : "UNKNOWN"
+      },
+      "damBehavior" : "PIPELINED",
+      "priority" : 0
+    } ],
+    "outputType" : "ROW<`a` INT, `b` BIGINT, `c` VARCHAR(2147483647), `c0` 
BIGINT NOT NULL, `ck` BIGINT NOT NULL, `i0` BOOLEAN, `c1` BIGINT NOT NULL, 
`ck0` BIGINT NOT NULL>",
+    "description" : "Join(joinType=[InnerJoin], where=[true], select=[a, b, c, 
c0, ck, i0, c1, ck0], leftInputSpec=[NoUniqueKey], 
rightInputSpec=[JoinKeyContainsUniqueKey])"
+  }, {
+    "id" : 136,
+    "type" : "stream-exec-exchange_1",
+    "inputProperties" : [ {
+      "requiredDistribution" : {
+        "type" : "HASH",
+        "keys" : [ 0 ]
+      },
+      "damBehavior" : "PIPELINED",
+      "priority" : 0
+    } ],
+    "outputType" : "ROW<`a` INT, `b` BIGINT, `c` VARCHAR(2147483647), `c0` 
BIGINT NOT NULL, `ck` BIGINT NOT NULL, `i0` BOOLEAN, `c1` BIGINT NOT NULL, 
`ck0` BIGINT NOT NULL>",
+    "description" : "Exchange(distribution=[hash[a]])"
+  }, {
+    "id" : 137,
+    "type" : "stream-exec-exchange_1",
+    "inputProperties" : [ {
+      "requiredDistribution" : {
+        "type" : "HASH",
+        "keys" : [ 0 ]
+      },
+      "damBehavior" : "PIPELINED",
+      "priority" : 0
+    } ],
+    "outputType" : "ROW<`EXPR$0` INT>",
+    "description" : "Exchange(distribution=[hash[EXPR$0]])"
+  }, {
+    "id" : 138,
+    "type" : "stream-exec-group-aggregate_1",
+    "configuration" : {
+      "table.exec.mini-batch.enabled" : "false",
+      "table.exec.mini-batch.size" : "-1"
+    },
+    "grouping" : [ 0 ],
+    "aggCalls" : [ ],
+    "aggCallNeedRetractions" : [ ],
+    "generateUpdateBefore" : true,
+    "needRetraction" : false,
+    "state" : [ {
+      "index" : 0,
+      "ttl" : "0 ms",
+      "name" : "groupAggregateState"
+    } ],
+    "inputProperties" : [ {
+      "requiredDistribution" : {
+        "type" : "UNKNOWN"
+      },
+      "damBehavior" : "PIPELINED",
+      "priority" : 0
+    } ],
+    "outputType" : "ROW<`EXPR$0` INT>",
+    "description" : "GroupAggregate(groupBy=[EXPR$0], select=[EXPR$0])"
+  }, {
+    "id" : 139,
+    "type" : "stream-exec-calc_1",
+    "projection" : [ {
+      "kind" : "INPUT_REF",
+      "inputIndex" : 0,
+      "type" : "INT"
+    }, {
+      "kind" : "LITERAL",
+      "value" : true,
+      "type" : "BOOLEAN NOT NULL"
+    } ],
+    "condition" : null,
+    "inputProperties" : [ {
+      "requiredDistribution" : {
+        "type" : "UNKNOWN"
+      },
+      "damBehavior" : "PIPELINED",
+      "priority" : 0
+    } ],
+    "outputType" : "ROW<`EXPR$0` INT, `i` BOOLEAN NOT NULL>",
+    "description" : "Calc(select=[EXPR$0, true AS i])"
+  }, {
+    "id" : 140,
+    "type" : "stream-exec-exchange_1",
+    "inputProperties" : [ {
+      "requiredDistribution" : {
+        "type" : "HASH",
+        "keys" : [ 0 ]
+      },
+      "damBehavior" : "PIPELINED",
+      "priority" : 0
+    } ],
+    "outputType" : "ROW<`EXPR$0` INT, `i` BOOLEAN NOT NULL>",
+    "description" : "Exchange(distribution=[hash[EXPR$0]])"
+  }, {
+    "id" : 141,
+    "type" : "stream-exec-join_1",
+    "joinSpec" : {
+      "joinType" : "LEFT",
+      "leftKeys" : [ 0 ],
+      "rightKeys" : [ 0 ],
+      "filterNulls" : [ true ],
+      "nonEquiCondition" : null
+    },
+    "rightUpsertKeys" : [ [ 0 ] ],
+    "state" : [ {
+      "index" : 0,
+      "ttl" : "0 ms",
+      "name" : "leftState"
+    }, {
+      "index" : 1,
+      "ttl" : "0 ms",
+      "name" : "rightState"
+    } ],
+    "inputProperties" : [ {
+      "requiredDistribution" : {
+        "type" : "UNKNOWN"
+      },
+      "damBehavior" : "PIPELINED",
+      "priority" : 0
+    }, {
+      "requiredDistribution" : {
+        "type" : "UNKNOWN"
+      },
+      "damBehavior" : "PIPELINED",
+      "priority" : 0
+    } ],
+    "outputType" : "ROW<`a` INT, `b` BIGINT, `c` VARCHAR(2147483647), `c0` 
BIGINT NOT NULL, `ck` BIGINT NOT NULL, `i0` BOOLEAN, `c1` BIGINT NOT NULL, 
`ck0` BIGINT NOT NULL, `EXPR$0` INT, `i` BOOLEAN>",
+    "description" : "Join(joinType=[LeftOuterJoin], where=[(a = EXPR$0)], 
select=[a, b, c, c0, ck, i0, c1, ck0, EXPR$0, i], leftInputSpec=[NoUniqueKey], 
rightInputSpec=[JoinKeyContainsUniqueKey])"
+  }, {
+    "id" : 142,
+    "type" : "stream-exec-calc_1",
+    "projection" : [ {
+      "kind" : "INPUT_REF",
+      "inputIndex" : 1,
+      "type" : "BIGINT"
+    }, {
+      "kind" : "INPUT_REF",
+      "inputIndex" : 2,
+      "type" : "VARCHAR(2147483647)"
+    }, {
+      "kind" : "CALL",
+      "syntax" : "SPECIAL",
+      "internalName" : "$CASE$1",
+      "operands" : [ {
+        "kind" : "CALL",
+        "syntax" : "BINARY",
+        "internalName" : "$OR$1",
+        "operands" : [ {
+          "kind" : "CALL",
+          "syntax" : "BINARY",
+          "internalName" : "$=$1",
+          "operands" : [ {
+            "kind" : "INPUT_REF",
+            "inputIndex" : 3,
+            "type" : "BIGINT NOT NULL"
+          }, {
+            "kind" : "LITERAL",
+            "value" : 0,
+            "type" : "INT NOT NULL"
+          } ],
+          "type" : "BOOLEAN NOT NULL"
+        }, {
+          "kind" : "CALL",
+          "syntax" : "BINARY",
+          "internalName" : "$AND$1",
+          "operands" : [ {
+            "kind" : "CALL",
+            "syntax" : "POSTFIX",
+            "internalName" : "$IS NULL$1",
+            "operands" : [ {
+              "kind" : "INPUT_REF",
+              "inputIndex" : 5,
+              "type" : "BOOLEAN"
+            } ],
+            "type" : "BOOLEAN NOT NULL"
+          }, {
+            "kind" : "CALL",
+            "syntax" : "BINARY",
+            "internalName" : "$>=$1",
+            "operands" : [ {
+              "kind" : "INPUT_REF",
+              "inputIndex" : 4,
+              "type" : "BIGINT NOT NULL"
+            }, {
+              "kind" : "INPUT_REF",
+              "inputIndex" : 3,
+              "type" : "BIGINT NOT NULL"
+            } ],
+            "type" : "BOOLEAN NOT NULL"
+          }, {
+            "kind" : "CALL",
+            "syntax" : "POSTFIX",
+            "internalName" : "$IS NOT NULL$1",
+            "operands" : [ {
+              "kind" : "INPUT_REF",
+              "inputIndex" : 0,
+              "type" : "INT"
+            } ],
+            "type" : "BOOLEAN NOT NULL"
+          } ],
+          "type" : "BOOLEAN NOT NULL"
+        } ],
+        "type" : "BOOLEAN NOT NULL"
+      }, {
+        "kind" : "LITERAL",
+        "value" : 1,
+        "type" : "INT NOT NULL"
+      }, {
+        "kind" : "CALL",
+        "syntax" : "BINARY",
+        "internalName" : "$OR$1",
+        "operands" : [ {
+          "kind" : "CALL",
+          "syntax" : "BINARY",
+          "internalName" : "$=$1",
+          "operands" : [ {
+            "kind" : "INPUT_REF",
+            "inputIndex" : 6,
+            "type" : "BIGINT NOT NULL"
+          }, {
+            "kind" : "LITERAL",
+            "value" : 0,
+            "type" : "INT NOT NULL"
+          } ],
+          "type" : "BOOLEAN NOT NULL"
+        }, {
+          "kind" : "CALL",
+          "syntax" : "BINARY",
+          "internalName" : "$AND$1",
+          "operands" : [ {
+            "kind" : "CALL",
+            "syntax" : "POSTFIX",
+            "internalName" : "$IS NULL$1",
+            "operands" : [ {
+              "kind" : "INPUT_REF",
+              "inputIndex" : 9,
+              "type" : "BOOLEAN"
+            } ],
+            "type" : "BOOLEAN NOT NULL"
+          }, {
+            "kind" : "CALL",
+            "syntax" : "BINARY",
+            "internalName" : "$>=$1",
+            "operands" : [ {
+              "kind" : "INPUT_REF",
+              "inputIndex" : 7,
+              "type" : "BIGINT NOT NULL"
+            }, {
+              "kind" : "INPUT_REF",
+              "inputIndex" : 6,
+              "type" : "BIGINT NOT NULL"
+            } ],
+            "type" : "BOOLEAN NOT NULL"
+          }, {
+            "kind" : "CALL",
+            "syntax" : "POSTFIX",
+            "internalName" : "$IS NOT NULL$1",
+            "operands" : [ {
+              "kind" : "INPUT_REF",
+              "inputIndex" : 0,
+              "type" : "INT"
+            } ],
+            "type" : "BOOLEAN NOT NULL"
+          } ],
+          "type" : "BOOLEAN NOT NULL"
+        } ],
+        "type" : "BOOLEAN NOT NULL"
+      }, {
+        "kind" : "LITERAL",
+        "value" : 2,
+        "type" : "INT NOT NULL"
+      }, {
+        "kind" : "LITERAL",
+        "value" : 3,
+        "type" : "INT NOT NULL"
+      } ],
+      "type" : "INT NOT NULL"
+    } ],
+    "condition" : null,
+    "inputProperties" : [ {
+      "requiredDistribution" : {
+        "type" : "UNKNOWN"
+      },
+      "damBehavior" : "PIPELINED",
+      "priority" : 0
+    } ],
+    "outputType" : "ROW<`b` BIGINT, `c` VARCHAR(2147483647), `$f3` INT NOT 
NULL>",
+    "description" : "Calc(select=[b, c, CASE(((c0 = 0) OR (i0 IS NULL AND (ck 
>= c0) AND a IS NOT NULL)), 1, ((c1 = 0) OR (i IS NULL AND (ck0 >= c1) AND a IS 
NOT NULL)), 2, 3) AS $f3])"
+  }, {
+    "id" : 143,
+    "type" : "stream-exec-exchange_1",
+    "inputProperties" : [ {
+      "requiredDistribution" : {
+        "type" : "HASH",
+        "keys" : [ 1 ]
+      },
+      "damBehavior" : "PIPELINED",
+      "priority" : 0
+    } ],
+    "outputType" : "ROW<`b` BIGINT, `c` VARCHAR(2147483647), `$f3` INT NOT 
NULL>",
+    "description" : "Exchange(distribution=[hash[c]])"
+  }, {
+    "id" : 144,
+    "type" : "stream-exec-table-source-scan_1",
+    "scanTableSource" : {
+      "table" : {
+        "identifier" : "`default_catalog`.`default_database`.`source_t2`",
+        "resolvedTable" : {
+          "schema" : {
+            "columns" : [ {
+              "name" : "d",
+              "dataType" : "INT"
+            }, {
+              "name" : "e",
+              "dataType" : "BIGINT"
+            }, {
+              "name" : "f",
+              "dataType" : "VARCHAR(2147483647)"
+            } ],
+            "watermarkSpecs" : [ ]
+          },
+          "partitionKeys" : [ ]
+        }
+      },
+      "abilities" : [ {
+        "type" : "ProjectPushDown",
+        "projectedFields" : [ [ 0 ], [ 2 ] ],
+        "producedType" : "ROW<`d` INT, `f` VARCHAR(2147483647)> NOT NULL"
+      }, {
+        "type" : "ReadingMetadata",
+        "metadataKeys" : [ ],
+        "producedType" : "ROW<`d` INT, `f` VARCHAR(2147483647)> NOT NULL"
+      } ]
+    },
+    "outputType" : "ROW<`d` INT, `f` VARCHAR(2147483647)>",
+    "description" : "TableSourceScan(table=[[default_catalog, 
default_database, source_t2, project=[d, f], metadata=[]]], fields=[d, f])",
+    "inputProperties" : [ ]
+  }, {
+    "id" : 145,
+    "type" : "stream-exec-exchange_1",
+    "inputProperties" : [ {
+      "requiredDistribution" : {
+        "type" : "HASH",
+        "keys" : [ 1 ]
+      },
+      "damBehavior" : "PIPELINED",
+      "priority" : 0
+    } ],
+    "outputType" : "ROW<`d` INT, `f` VARCHAR(2147483647)>",
+    "description" : "Exchange(distribution=[hash[f]])"
+  }, {
+    "id" : 146,
+    "type" : "stream-exec-join_1",
+    "joinSpec" : {
+      "joinType" : "ANTI",
+      "leftKeys" : [ 1 ],
+      "rightKeys" : [ 1 ],
+      "filterNulls" : [ true ],
+      "nonEquiCondition" : {
+        "kind" : "CALL",
+        "syntax" : "BINARY",
+        "internalName" : "$OR$1",
+        "operands" : [ {
+          "kind" : "CALL",
+          "syntax" : "POSTFIX",
+          "internalName" : "$IS NULL$1",
+          "operands" : [ {
+            "kind" : "INPUT_REF",
+            "inputIndex" : 3,
+            "type" : "INT"
+          } ],
+          "type" : "BOOLEAN NOT NULL"
+        }, {
+          "kind" : "CALL",
+          "syntax" : "BINARY",
+          "internalName" : "$=$1",
+          "operands" : [ {
+            "kind" : "INPUT_REF",
+            "inputIndex" : 2,
+            "type" : "INT NOT NULL"
+          }, {
+            "kind" : "INPUT_REF",
+            "inputIndex" : 3,
+            "type" : "INT"
+          } ],
+          "type" : "BOOLEAN"
+        } ],
+        "type" : "BOOLEAN"
+      }
+    },
+    "state" : [ {
+      "index" : 0,
+      "ttl" : "0 ms",
+      "name" : "leftState"
+    }, {
+      "index" : 1,
+      "ttl" : "0 ms",
+      "name" : "rightState"
+    } ],
+    "inputProperties" : [ {
+      "requiredDistribution" : {
+        "type" : "UNKNOWN"
+      },
+      "damBehavior" : "PIPELINED",
+      "priority" : 0
+    }, {
+      "requiredDistribution" : {
+        "type" : "UNKNOWN"
+      },
+      "damBehavior" : "PIPELINED",
+      "priority" : 0
+    } ],
+    "outputType" : "ROW<`b` BIGINT, `c` VARCHAR(2147483647), `$f3` INT NOT 
NULL>",
+    "description" : "Join(joinType=[LeftAntiJoin], where=[((d IS NULL OR ($f3 
= d)) AND (c = f))], select=[b, c, $f3], leftInputSpec=[NoUniqueKey], 
rightInputSpec=[NoUniqueKey])"
+  }, {
+    "id" : 147,
+    "type" : "stream-exec-calc_1",
+    "projection" : [ {
+      "kind" : "INPUT_REF",
+      "inputIndex" : 0,
+      "type" : "BIGINT"
+    } ],
+    "condition" : null,
+    "inputProperties" : [ {
+      "requiredDistribution" : {
+        "type" : "UNKNOWN"
+      },
+      "damBehavior" : "PIPELINED",
+      "priority" : 0
+    } ],
+    "outputType" : "ROW<`b` BIGINT>",
+    "description" : "Calc(select=[b])"
+  }, {
+    "id" : 148,
+    "type" : "stream-exec-sink_1",
+    "configuration" : {
+      "table.exec.sink.keyed-shuffle" : "AUTO",
+      "table.exec.sink.not-null-enforcer" : "ERROR",
+      "table.exec.sink.rowtime-inserter" : "ENABLED",
+      "table.exec.sink.type-length-enforcer" : "IGNORE",
+      "table.exec.sink.upsert-materialize" : "AUTO"
+    },
+    "dynamicTableSink" : {
+      "table" : {
+        "identifier" : "`default_catalog`.`default_database`.`sink_t`",
+        "resolvedTable" : {
+          "schema" : {
+            "columns" : [ {
+              "name" : "b",
+              "dataType" : "BIGINT"
+            } ],
+            "watermarkSpecs" : [ ]
+          },
+          "partitionKeys" : [ ]
+        }
+      }
+    },
+    "inputChangelogMode" : [ "INSERT", "UPDATE_BEFORE", "UPDATE_AFTER", 
"DELETE" ],
+    "inputProperties" : [ {
+      "requiredDistribution" : {
+        "type" : "UNKNOWN"
+      },
+      "damBehavior" : "PIPELINED",
+      "priority" : 0
+    } ],
+    "outputType" : "ROW<`b` BIGINT>",
+    "description" : "Sink(table=[default_catalog.default_database.sink_t], 
fields=[b])"
+  } ],
+  "edges" : [ {
+    "source" : 115,
+    "target" : 116,
+    "shuffle" : {
+      "type" : "FORWARD"
+    },
+    "shuffleMode" : "PIPELINED"
+  }, {
+    "source" : 117,
+    "target" : 118,
+    "shuffle" : {
+      "type" : "FORWARD"
+    },
+    "shuffleMode" : "PIPELINED"
+  }, {
+    "source" : 118,
+    "target" : 119,
+    "shuffle" : {
+      "type" : "FORWARD"
+    },
+    "shuffleMode" : "PIPELINED"
+  }, {
+    "source" : 119,
+    "target" : 120,
+    "shuffle" : {
+      "type" : "FORWARD"
+    },
+    "shuffleMode" : "PIPELINED"
+  }, {
+    "source" : 120,
+    "target" : 121,
+    "shuffle" : {
+      "type" : "FORWARD"
+    },
+    "shuffleMode" : "PIPELINED"
+  }, {
+    "source" : 116,
+    "target" : 122,
+    "shuffle" : {
+      "type" : "FORWARD"
+    },
+    "shuffleMode" : "PIPELINED"
+  }, {
+    "source" : 121,
+    "target" : 122,
+    "shuffle" : {
+      "type" : "FORWARD"
+    },
+    "shuffleMode" : "PIPELINED"
+  }, {
+    "source" : 122,
+    "target" : 123,
+    "shuffle" : {
+      "type" : "FORWARD"
+    },
+    "shuffleMode" : "PIPELINED"
+  }, {
+    "source" : 118,
+    "target" : 124,
+    "shuffle" : {
+      "type" : "FORWARD"
+    },
+    "shuffleMode" : "PIPELINED"
+  }, {
+    "source" : 124,
+    "target" : 125,
+    "shuffle" : {
+      "type" : "FORWARD"
+    },
+    "shuffleMode" : "PIPELINED"
+  }, {
+    "source" : 125,
+    "target" : 126,
+    "shuffle" : {
+      "type" : "FORWARD"
+    },
+    "shuffleMode" : "PIPELINED"
+  }, {
+    "source" : 126,
+    "target" : 127,
+    "shuffle" : {
+      "type" : "FORWARD"
+    },
+    "shuffleMode" : "PIPELINED"
+  }, {
+    "source" : 123,
+    "target" : 128,
+    "shuffle" : {
+      "type" : "FORWARD"
+    },
+    "shuffleMode" : "PIPELINED"
+  }, {
+    "source" : 127,
+    "target" : 128,
+    "shuffle" : {
+      "type" : "FORWARD"
+    },
+    "shuffleMode" : "PIPELINED"
+  }, {
+    "source" : 128,
+    "target" : 129,
+    "shuffle" : {
+      "type" : "FORWARD"
+    },
+    "shuffleMode" : "PIPELINED"
+  }, {
+    "source" : 129,
+    "target" : 130,
+    "shuffle" : {
+      "type" : "FORWARD"
+    },
+    "shuffleMode" : "PIPELINED"
+  }, {
+    "source" : 117,
+    "target" : 131,
+    "shuffle" : {
+      "type" : "FORWARD"
+    },
+    "shuffleMode" : "PIPELINED"
+  }, {
+    "source" : 131,
+    "target" : 132,
+    "shuffle" : {
+      "type" : "FORWARD"
+    },
+    "shuffleMode" : "PIPELINED"
+  }, {
+    "source" : 132,
+    "target" : 133,
+    "shuffle" : {
+      "type" : "FORWARD"
+    },
+    "shuffleMode" : "PIPELINED"
+  }, {
+    "source" : 133,
+    "target" : 134,
+    "shuffle" : {
+      "type" : "FORWARD"
+    },
+    "shuffleMode" : "PIPELINED"
+  }, {
+    "source" : 130,
+    "target" : 135,
+    "shuffle" : {
+      "type" : "FORWARD"
+    },
+    "shuffleMode" : "PIPELINED"
+  }, {
+    "source" : 134,
+    "target" : 135,
+    "shuffle" : {
+      "type" : "FORWARD"
+    },
+    "shuffleMode" : "PIPELINED"
+  }, {
+    "source" : 135,
+    "target" : 136,
+    "shuffle" : {
+      "type" : "FORWARD"
+    },
+    "shuffleMode" : "PIPELINED"
+  }, {
+    "source" : 131,
+    "target" : 137,
+    "shuffle" : {
+      "type" : "FORWARD"
+    },
+    "shuffleMode" : "PIPELINED"
+  }, {
+    "source" : 137,
+    "target" : 138,
+    "shuffle" : {
+      "type" : "FORWARD"
+    },
+    "shuffleMode" : "PIPELINED"
+  }, {
+    "source" : 138,
+    "target" : 139,
+    "shuffle" : {
+      "type" : "FORWARD"
+    },
+    "shuffleMode" : "PIPELINED"
+  }, {
+    "source" : 139,
+    "target" : 140,
+    "shuffle" : {
+      "type" : "FORWARD"
+    },
+    "shuffleMode" : "PIPELINED"
+  }, {
+    "source" : 136,
+    "target" : 141,
+    "shuffle" : {
+      "type" : "FORWARD"
+    },
+    "shuffleMode" : "PIPELINED"
+  }, {
+    "source" : 140,
+    "target" : 141,
+    "shuffle" : {
+      "type" : "FORWARD"
+    },
+    "shuffleMode" : "PIPELINED"
+  }, {
+    "source" : 141,
+    "target" : 142,
+    "shuffle" : {
+      "type" : "FORWARD"
+    },
+    "shuffleMode" : "PIPELINED"
+  }, {
+    "source" : 142,
+    "target" : 143,
+    "shuffle" : {
+      "type" : "FORWARD"
+    },
+    "shuffleMode" : "PIPELINED"
+  }, {
+    "source" : 144,
+    "target" : 145,
+    "shuffle" : {
+      "type" : "FORWARD"
+    },
+    "shuffleMode" : "PIPELINED"
+  }, {
+    "source" : 143,
+    "target" : 146,
+    "shuffle" : {
+      "type" : "FORWARD"
+    },
+    "shuffleMode" : "PIPELINED"
+  }, {
+    "source" : 145,
+    "target" : 146,
+    "shuffle" : {
+      "type" : "FORWARD"
+    },
+    "shuffleMode" : "PIPELINED"
+  }, {
+    "source" : 146,
+    "target" : 147,
+    "shuffle" : {
+      "type" : "FORWARD"
+    },
+    "shuffleMode" : "PIPELINED"
+  }, {
+    "source" : 147,
+    "target" : 148,
+    "shuffle" : {
+      "type" : "FORWARD"
+    },
+    "shuffleMode" : "PIPELINED"
+  } ]
+}
\ No newline at end of file
diff --git 
a/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-join_1/semi-anti-join-with-literal-agg/savepoint/_metadata
 
b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-join_1/semi-anti-join-with-literal-agg/savepoint/_metadata
new file mode 100644
index 00000000000..ad0b6d005e4
Binary files /dev/null and 
b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-join_1/semi-anti-join-with-literal-agg/savepoint/_metadata
 differ

Reply via email to