This is an automated email from the ASF dual-hosted git repository.

dwysakowicz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 78b0a625f49058d0c0213de49d6337a7d4f2ab72
Author: Jim Hughes <jhug...@confluent.io>
AuthorDate: Tue Nov 7 17:14:09 2023 -0500

    [FLINK-33470] Deleting JoinJsonPlanTest.java and JoinJsonPlanITCase.java
    
    This closes #23869
---
 .../plan/nodes/exec/stream/JoinJsonPlanTest.java   | 144 -------
 .../stream/jsonplan/JoinJsonPlanITCase.java        | 168 --------
 .../JoinJsonPlanTest_jsonplan/testInnerJoin.out    | 222 ----------
 .../testInnerJoinWithEqualPk.out                   | 332 ---------------
 .../testInnerJoinWithPk.out                        | 450 ---------------------
 .../testLeftJoinNonEqui.out                        | 266 ------------
 6 files changed, 1582 deletions(-)

diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/JoinJsonPlanTest.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/JoinJsonPlanTest.java
deleted file mode 100644
index dd2771ea7cf..00000000000
--- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/JoinJsonPlanTest.java
+++ /dev/null
@@ -1,144 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.planner.plan.nodes.exec.stream;
-
-import org.apache.flink.table.api.TableConfig;
-import org.apache.flink.table.api.TableEnvironment;
-import org.apache.flink.table.planner.utils.StreamTableTestUtil;
-import org.apache.flink.table.planner.utils.TableTestBase;
-
-import org.junit.jupiter.api.BeforeEach;
-import org.junit.jupiter.api.Test;
-
-/** Test json serialization/deserialization for join. */
-class JoinJsonPlanTest extends TableTestBase {
-
-    private StreamTableTestUtil util;
-    private TableEnvironment tEnv;
-
-    @BeforeEach
-    void setup() {
-        util = streamTestUtil(TableConfig.getDefault());
-        tEnv = util.getTableEnv();
-
-        String srcTableA =
-                "CREATE TABLE A (\n"
-                        + "  a1 int,\n"
-                        + "  a2 bigint,\n"
-                        + "  a3 bigint\n"
-                        + ") with (\n"
-                        + "  'connector' = 'values',\n"
-                        + "  'bounded' = 'false')";
-        String srcTableB =
-                "CREATE TABLE B (\n"
-                        + "  b1 int,\n"
-                        + "  b2 bigint,\n"
-                        + "  b3 bigint\n"
-                        + ") with (\n"
-                        + "  'connector' = 'values',\n"
-                        + "  'bounded' = 'false')";
-        String srcTableT =
-                "CREATE TABLE t (\n"
-                        + "  a int,\n"
-                        + "  b bigint,\n"
-                        + "  c varchar\n"
-                        + ") with (\n"
-                        + "  'connector' = 'values',\n"
-                        + "  'bounded' = 'false')";
-        String srcTableS =
-                "CREATE TABLE s (\n"
-                        + "  x bigint,\n"
-                        + "  y varchar,\n"
-                        + "  z int\n"
-                        + ") with (\n"
-                        + "  'connector' = 'values',\n"
-                        + "  'bounded' = 'false')";
-        tEnv.executeSql(srcTableA);
-        tEnv.executeSql(srcTableB);
-        tEnv.executeSql(srcTableT);
-        tEnv.executeSql(srcTableS);
-    }
-
-    @Test
-    void testInnerJoin() {
-        String sinkTableDdl =
-                "CREATE TABLE MySink (\n"
-                        + "  a1 int,\n"
-                        + "  b1 int\n"
-                        + ") with (\n"
-                        + "  'connector' = 'values',\n"
-                        + "  'table-sink-class' = 'DEFAULT')";
-        tEnv.executeSql(sinkTableDdl);
-        util.verifyJsonPlan("INSERT INTO MySink SELECT a1, b1 FROM A JOIN B ON a1 = b1");
-    }
-
-    @Test
-    void testInnerJoinWithEqualPk() {
-        String query1 = "SELECT SUM(a2) AS a2, a1 FROM A GROUP BY a1";
-        String query2 = "SELECT SUM(b2) AS b2, b1 FROM B GROUP BY b1";
-        String query =
-                String.format("SELECT a1, b1 FROM (%s) JOIN (%s) ON a1 = b1", query1, query2);
-        String sinkTableDdl =
-                "CREATE TABLE MySink (\n"
-                        + "  a1 int,\n"
-                        + "  b1 int\n"
-                        + ") with (\n"
-                        + "  'connector' = 'values',\n"
-                        + "  'sink-insert-only' = 'false',\n"
-                        + "  'table-sink-class' = 'DEFAULT')";
-        tEnv.executeSql(sinkTableDdl);
-        util.verifyJsonPlan(String.format("INSERT INTO MySink %s", query));
-    }
-
-    @Test
-    void testInnerJoinWithPk() {
-        String query1 = "SELECT SUM(a2) AS a2, a1 FROM A GROUP BY a1";
-        String query2 = "SELECT SUM(b2) AS b2, b1 FROM B GROUP BY b1";
-        String query =
-                String.format(
-                        "SELECT a1, a2, b1, b2 FROM (%s) JOIN (%s) ON a2 = b2", query1, query2);
-        String sinkTableDdl =
-                "CREATE TABLE MySink (\n"
-                        + "  a1 int,\n"
-                        + "  a2 bigint,\n"
-                        + "  b1 int,\n"
-                        + "  b2 bigint\n"
-                        + ") with (\n"
-                        + "  'connector' = 'values',\n"
-                        + "  'sink-insert-only' = 'false',\n"
-                        + "  'table-sink-class' = 'DEFAULT')";
-        tEnv.executeSql(sinkTableDdl);
-        util.verifyJsonPlan(String.format("INSERT INTO MySink %s", query));
-    }
-
-    @Test
-    void testLeftJoinNonEqui() {
-        String sinkTableDdl =
-                "CREATE TABLE MySink (\n"
-                        + "  a1 int,\n"
-                        + "  b1 int\n"
-                        + ") with (\n"
-                        + "  'connector' = 'values',\n"
-                        + "  'sink-insert-only' = 'false',\n"
-                        + "  'table-sink-class' = 'DEFAULT')";
-        tEnv.executeSql(sinkTableDdl);
-        util.verifyJsonPlan(
-                "INSERT INTO MySink SELECT a1, b1 FROM A LEFT JOIN B ON a1 = b1 AND a2 > b2");
-    }
-}
diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/jsonplan/JoinJsonPlanITCase.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/jsonplan/JoinJsonPlanITCase.java
deleted file mode 100644
index 539cfc401e8..00000000000
--- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/jsonplan/JoinJsonPlanITCase.java
+++ /dev/null
@@ -1,168 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.planner.runtime.stream.jsonplan;
-
-import org.apache.flink.table.planner.factories.TestValuesTableFactory;
-import org.apache.flink.table.planner.runtime.utils.TestData;
-import org.apache.flink.table.planner.utils.JavaScalaConversionUtil;
-import org.apache.flink.table.planner.utils.JsonPlanTestBase;
-
-import org.junit.jupiter.api.BeforeEach;
-import org.junit.jupiter.api.Test;
-
-import java.io.File;
-import java.util.Arrays;
-import java.util.List;
-
-/** Test for join json plan. */
-class JoinJsonPlanITCase extends JsonPlanTestBase {
-
-    @Override
-    @BeforeEach
-    protected void setup() throws Exception {
-        super.setup();
-        createTestValuesSourceTable(
-                "A",
-                JavaScalaConversionUtil.toJava(TestData.smallData3()),
-                "a1 int",
-                "a2 bigint",
-                "a3 varchar");
-        createTestValuesSourceTable(
-                "B",
-                JavaScalaConversionUtil.toJava(TestData.smallData5()),
-                "b1 int",
-                "b2 bigint",
-                "b3 int",
-                "b4 varchar",
-                "b5 bigint");
-    }
-
-    /** test non-window inner join. * */
-    @Test
-    void testNonWindowInnerJoin() throws Exception {
-        List<String> dataT1 =
-                Arrays.asList(
-                        "1,1,Hi1", "1,2,Hi2", "1,2,Hi2", "1,5,Hi3", "2,7,Hi5", "1,9,Hi6", "1,8,Hi8",
-                        "3,8,Hi9");
-        List<String> dataT2 = Arrays.asList("1,1,HiHi", "2,2,HeHe", "3,2,HeHe");
-        createTestCsvSourceTable("T1", dataT1, "a int", "b bigint", "c varchar");
-        createTestCsvSourceTable("T2", dataT2, "a int", "b bigint", "c varchar");
-        File sinkPath = createTestCsvSinkTable("MySink", "a int", "c1 varchar", "c2 varchar");
-
-        compileSqlAndExecutePlan(
-                        "insert into MySink "
-                                + "SELECT t2.a, t2.c, t1.c\n"
-                                + "FROM (\n"
-                                + " SELECT if(a = 3, cast(null as int), a) as a, b, c FROM T1\n"
-                                + ") as t1\n"
-                                + "JOIN (\n"
-                                + " SELECT if(a = 3, cast(null as int), a) as a, b, c FROM T2\n"
-                                + ") as t2\n"
-                                + "ON t1.a = t2.a AND t1.b > t2.b")
-                .await();
-        List<String> expected =
-                Arrays.asList(
-                        "1,HiHi,Hi2",
-                        "1,HiHi,Hi2",
-                        "1,HiHi,Hi3",
-                        "1,HiHi,Hi6",
-                        "1,HiHi,Hi8",
-                        "2,HeHe,Hi5");
-        assertResult(expected, sinkPath);
-    }
-
-    @Test
-    void testIsNullInnerJoinWithNullCond() throws Exception {
-        List<String> dataT1 =
-                Arrays.asList(
-                        "1,1,Hi1", "1,2,Hi2", "1,2,Hi2", "1,5,Hi3", "2,7,Hi5", "1,9,Hi6", "1,8,Hi8",
-                        "3,8,Hi9");
-        List<String> dataT2 = Arrays.asList("1,1,HiHi", "2,2,HeHe", "3,2,HeHe");
-        createTestCsvSourceTable("T1", dataT1, "a int", "b bigint", "c varchar");
-        createTestCsvSourceTable("T2", dataT2, "a int", "b bigint", "c varchar");
-        createTestValuesSinkTable("MySink", "a int", "c1 varchar", "c2 varchar");
-
-        compileSqlAndExecutePlan(
-                        "insert into MySink "
-                                + "SELECT t2.a, t2.c, t1.c\n"
-                                + "FROM (\n"
-                                + " SELECT if(a = 3, cast(null as int), a) as a, b, c FROM T1\n"
-                                + ") as t1\n"
-                                + "JOIN (\n"
-                                + " SELECT if(a = 3, cast(null as int), a) as a, b, c FROM T2\n"
-                                + ") as t2\n"
-                                + "ON \n"
-                                + "  ((t1.a is null AND t2.a is null) OR\n"
-                                + "  (t1.a = t2.a))\n"
-                                + "  AND t1.b > t2.b")
-                .await();
-        List<String> expected =
-                Arrays.asList(
-                        "+I[1, HiHi, Hi2]",
-                        "+I[1, HiHi, Hi2]",
-                        "+I[1, HiHi, Hi3]",
-                        "+I[1, HiHi, Hi6]",
-                        "+I[1, HiHi, Hi8]",
-                        "+I[2, HeHe, Hi5]",
-                        "+I[null, HeHe, Hi9]");
-        assertResult(expected, TestValuesTableFactory.getResultsAsStrings("MySink"));
-    }
-
-    @Test
-    void testJoin() throws Exception {
-        createTestValuesSinkTable("MySink", "a3 varchar", "b4 varchar");
-        compileSqlAndExecutePlan("insert into MySink \n" + "SELECT a3, b4 FROM A, B WHERE a2 = b2")
-                .await();
-        List<String> expected =
-                Arrays.asList(
-                        "+I[Hello world, Hallo Welt]", "+I[Hello, Hallo Welt]", "+I[Hi, Hallo]");
-        assertResult(expected, TestValuesTableFactory.getResultsAsStrings("MySink"));
-    }
-
-    @Test
-    void testInnerJoin() throws Exception {
-        createTestValuesSinkTable("MySink", "a1 int", "b1 int");
-        compileSqlAndExecutePlan("insert into MySink \n" + "SELECT a1, b1 FROM A JOIN B ON a1 = b1")
-                .await();
-        List<String> expected = Arrays.asList("+I[1, 1]", "+I[2, 2]", "+I[2, 2]");
-        assertResult(expected, TestValuesTableFactory.getResultsAsStrings("MySink"));
-    }
-
-    @Test
-    void testJoinWithFilter() throws Exception {
-        createTestValuesSinkTable("MySink", "a3 varchar", "b4 varchar");
-        compileSqlAndExecutePlan(
-                        "insert into MySink \n"
-                                + "SELECT a3, b4 FROM A, B where a2 = b2 and a2 < 2")
-                .await();
-        List<String> expected = Arrays.asList("+I[Hi, Hallo]");
-        assertResult(expected, TestValuesTableFactory.getResultsAsStrings("MySink"));
-    }
-
-    @Test
-    void testInnerJoinWithDuplicateKey() throws Exception {
-        createTestValuesSinkTable("MySink", "a1 int", "b1 int", "b3 int");
-        compileSqlAndExecutePlan(
-                        "insert into MySink \n"
-                                + "SELECT a1, b1, b3 FROM A JOIN B ON a1 = b1 AND a1 = b3")
-                .await();
-        List<String> expected = Arrays.asList("+I[2, 2, 2]");
-        assertResult(expected, TestValuesTableFactory.getResultsAsStrings("MySink"));
-    }
-}
diff --git a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/JoinJsonPlanTest_jsonplan/testInnerJoin.out b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/JoinJsonPlanTest_jsonplan/testInnerJoin.out
deleted file mode 100644
index b260494e3b3..00000000000
--- a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/JoinJsonPlanTest_jsonplan/testInnerJoin.out
+++ /dev/null
@@ -1,222 +0,0 @@
-{
-  "flinkVersion" : "",
-  "nodes" : [ {
-    "id" : 1,
-    "type" : "stream-exec-table-source-scan_1",
-    "scanTableSource" : {
-      "table" : {
-        "identifier" : "`default_catalog`.`default_database`.`A`",
-        "resolvedTable" : {
-          "schema" : {
-            "columns" : [ {
-              "name" : "a1",
-              "dataType" : "INT"
-            }, {
-              "name" : "a2",
-              "dataType" : "BIGINT"
-            }, {
-              "name" : "a3",
-              "dataType" : "BIGINT"
-            } ],
-            "watermarkSpecs" : [ ]
-          },
-          "partitionKeys" : [ ],
-          "options" : {
-            "bounded" : "false",
-            "connector" : "values"
-          }
-        }
-      },
-      "abilities" : [ {
-        "type" : "ProjectPushDown",
-        "projectedFields" : [ [ 0 ] ],
-        "producedType" : "ROW<`a1` INT> NOT NULL"
-      }, {
-        "type" : "ReadingMetadata",
-        "metadataKeys" : [ ],
-        "producedType" : "ROW<`a1` INT> NOT NULL"
-      } ]
-    },
-    "outputType" : "ROW<`a1` INT>",
-    "description" : "TableSourceScan(table=[[default_catalog, default_database, A, project=[a1], metadata=[]]], fields=[a1])",
-    "inputProperties" : [ ]
-  }, {
-    "id" : 2,
-    "type" : "stream-exec-exchange_1",
-    "inputProperties" : [ {
-      "requiredDistribution" : {
-        "type" : "HASH",
-        "keys" : [ 0 ]
-      },
-      "damBehavior" : "PIPELINED",
-      "priority" : 0
-    } ],
-    "outputType" : "ROW<`a1` INT>",
-    "description" : "Exchange(distribution=[hash[a1]])"
-  }, {
-    "id" : 3,
-    "type" : "stream-exec-table-source-scan_1",
-    "scanTableSource" : {
-      "table" : {
-        "identifier" : "`default_catalog`.`default_database`.`B`",
-        "resolvedTable" : {
-          "schema" : {
-            "columns" : [ {
-              "name" : "b1",
-              "dataType" : "INT"
-            }, {
-              "name" : "b2",
-              "dataType" : "BIGINT"
-            }, {
-              "name" : "b3",
-              "dataType" : "BIGINT"
-            } ],
-            "watermarkSpecs" : [ ]
-          },
-          "partitionKeys" : [ ],
-          "options" : {
-            "bounded" : "false",
-            "connector" : "values"
-          }
-        }
-      },
-      "abilities" : [ {
-        "type" : "ProjectPushDown",
-        "projectedFields" : [ [ 0 ] ],
-        "producedType" : "ROW<`b1` INT> NOT NULL"
-      }, {
-        "type" : "ReadingMetadata",
-        "metadataKeys" : [ ],
-        "producedType" : "ROW<`b1` INT> NOT NULL"
-      } ]
-    },
-    "outputType" : "ROW<`b1` INT>",
-    "description" : "TableSourceScan(table=[[default_catalog, default_database, B, project=[b1], metadata=[]]], fields=[b1])",
-    "inputProperties" : [ ]
-  }, {
-    "id" : 4,
-    "type" : "stream-exec-exchange_1",
-    "inputProperties" : [ {
-      "requiredDistribution" : {
-        "type" : "HASH",
-        "keys" : [ 0 ]
-      },
-      "damBehavior" : "PIPELINED",
-      "priority" : 0
-    } ],
-    "outputType" : "ROW<`b1` INT>",
-    "description" : "Exchange(distribution=[hash[b1]])"
-  }, {
-    "id" : 5,
-    "type" : "stream-exec-join_1",
-    "joinSpec" : {
-      "joinType" : "INNER",
-      "leftKeys" : [ 0 ],
-      "rightKeys" : [ 0 ],
-      "filterNulls" : [ true ],
-      "nonEquiCondition" : null
-    },
-    "state" : [ {
-      "index" : 0,
-      "ttl" : "0 ms",
-      "name" : "leftState"
-    }, {
-      "index" : 1,
-      "ttl" : "0 ms",
-      "name" : "rightState"
-    } ],
-    "inputProperties" : [ {
-      "requiredDistribution" : {
-        "type" : "UNKNOWN"
-      },
-      "damBehavior" : "PIPELINED",
-      "priority" : 0
-    }, {
-      "requiredDistribution" : {
-        "type" : "UNKNOWN"
-      },
-      "damBehavior" : "PIPELINED",
-      "priority" : 0
-    } ],
-    "outputType" : "ROW<`a1` INT, `b1` INT>",
-    "description" : "Join(joinType=[InnerJoin], where=[(a1 = b1)], select=[a1, b1], leftInputSpec=[NoUniqueKey], rightInputSpec=[NoUniqueKey])"
-  }, {
-    "id" : 6,
-    "type" : "stream-exec-sink_1",
-    "configuration" : {
-      "table.exec.sink.keyed-shuffle" : "AUTO",
-      "table.exec.sink.not-null-enforcer" : "ERROR",
-      "table.exec.sink.rowtime-inserter" : "ENABLED",
-      "table.exec.sink.type-length-enforcer" : "IGNORE",
-      "table.exec.sink.upsert-materialize" : "AUTO"
-    },
-    "dynamicTableSink" : {
-      "table" : {
-        "identifier" : "`default_catalog`.`default_database`.`MySink`",
-        "resolvedTable" : {
-          "schema" : {
-            "columns" : [ {
-              "name" : "a1",
-              "dataType" : "INT"
-            }, {
-              "name" : "b1",
-              "dataType" : "INT"
-            } ],
-            "watermarkSpecs" : [ ]
-          },
-          "partitionKeys" : [ ],
-          "options" : {
-            "connector" : "values",
-            "table-sink-class" : "DEFAULT"
-          }
-        }
-      }
-    },
-    "inputChangelogMode" : [ "INSERT" ],
-    "inputProperties" : [ {
-      "requiredDistribution" : {
-        "type" : "UNKNOWN"
-      },
-      "damBehavior" : "PIPELINED",
-      "priority" : 0
-    } ],
-    "outputType" : "ROW<`a1` INT, `b1` INT>",
-    "description" : "Sink(table=[default_catalog.default_database.MySink], fields=[a1, b1])"
-  } ],
-  "edges" : [ {
-    "source" : 1,
-    "target" : 2,
-    "shuffle" : {
-      "type" : "FORWARD"
-    },
-    "shuffleMode" : "PIPELINED"
-  }, {
-    "source" : 3,
-    "target" : 4,
-    "shuffle" : {
-      "type" : "FORWARD"
-    },
-    "shuffleMode" : "PIPELINED"
-  }, {
-    "source" : 2,
-    "target" : 5,
-    "shuffle" : {
-      "type" : "FORWARD"
-    },
-    "shuffleMode" : "PIPELINED"
-  }, {
-    "source" : 4,
-    "target" : 5,
-    "shuffle" : {
-      "type" : "FORWARD"
-    },
-    "shuffleMode" : "PIPELINED"
-  }, {
-    "source" : 5,
-    "target" : 6,
-    "shuffle" : {
-      "type" : "FORWARD"
-    },
-    "shuffleMode" : "PIPELINED"
-  } ]
-}
diff --git a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/JoinJsonPlanTest_jsonplan/testInnerJoinWithEqualPk.out b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/JoinJsonPlanTest_jsonplan/testInnerJoinWithEqualPk.out
deleted file mode 100644
index 7c65bb54840..00000000000
--- a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/JoinJsonPlanTest_jsonplan/testInnerJoinWithEqualPk.out
+++ /dev/null
@@ -1,332 +0,0 @@
-{
-  "flinkVersion" : "",
-  "nodes" : [ {
-    "id" : 1,
-    "type" : "stream-exec-table-source-scan_1",
-    "scanTableSource" : {
-      "table" : {
-        "identifier" : "`default_catalog`.`default_database`.`A`",
-        "resolvedTable" : {
-          "schema" : {
-            "columns" : [ {
-              "name" : "a1",
-              "dataType" : "INT"
-            }, {
-              "name" : "a2",
-              "dataType" : "BIGINT"
-            }, {
-              "name" : "a3",
-              "dataType" : "BIGINT"
-            } ],
-            "watermarkSpecs" : [ ]
-          },
-          "partitionKeys" : [ ],
-          "options" : {
-            "bounded" : "false",
-            "connector" : "values"
-          }
-        }
-      },
-      "abilities" : [ {
-        "type" : "ProjectPushDown",
-        "projectedFields" : [ [ 0 ], [ 1 ] ],
-        "producedType" : "ROW<`a1` INT, `a2` BIGINT> NOT NULL"
-      }, {
-        "type" : "ReadingMetadata",
-        "metadataKeys" : [ ],
-        "producedType" : "ROW<`a1` INT, `a2` BIGINT> NOT NULL"
-      } ]
-    },
-    "outputType" : "ROW<`a1` INT, `a2` BIGINT>",
-    "description" : "TableSourceScan(table=[[default_catalog, default_database, A, project=[a1, a2], metadata=[]]], fields=[a1, a2])",
-    "inputProperties" : [ ]
-  }, {
-    "id" : 2,
-    "type" : "stream-exec-exchange_1",
-    "inputProperties" : [ {
-      "requiredDistribution" : {
-        "type" : "HASH",
-        "keys" : [ 0 ]
-      },
-      "damBehavior" : "PIPELINED",
-      "priority" : 0
-    } ],
-    "outputType" : "ROW<`a1` INT, `a2` BIGINT>",
-    "description" : "Exchange(distribution=[hash[a1]])"
-  }, {
-    "id" : 3,
-    "type" : "stream-exec-group-aggregate_1",
-    "configuration" : {
-      "table.exec.mini-batch.enabled" : "false",
-      "table.exec.mini-batch.size" : "-1"
-    },
-    "grouping" : [ 0 ],
-    "aggCalls" : [ ],
-    "aggCallNeedRetractions" : [ ],
-    "generateUpdateBefore" : true,
-    "needRetraction" : false,
-    "state" : [ {
-      "index" : 0,
-      "ttl" : "0 ms",
-      "name" : "groupAggregateState"
-    } ],
-    "inputProperties" : [ {
-      "requiredDistribution" : {
-        "type" : "UNKNOWN"
-      },
-      "damBehavior" : "PIPELINED",
-      "priority" : 0
-    } ],
-    "outputType" : "ROW<`a1` INT>",
-    "description" : "GroupAggregate(groupBy=[a1], select=[a1])"
-  }, {
-    "id" : 4,
-    "type" : "stream-exec-exchange_1",
-    "inputProperties" : [ {
-      "requiredDistribution" : {
-        "type" : "HASH",
-        "keys" : [ 0 ]
-      },
-      "damBehavior" : "PIPELINED",
-      "priority" : 0
-    } ],
-    "outputType" : "ROW<`a1` INT>",
-    "description" : "Exchange(distribution=[hash[a1]])"
-  }, {
-    "id" : 5,
-    "type" : "stream-exec-table-source-scan_1",
-    "scanTableSource" : {
-      "table" : {
-        "identifier" : "`default_catalog`.`default_database`.`B`",
-        "resolvedTable" : {
-          "schema" : {
-            "columns" : [ {
-              "name" : "b1",
-              "dataType" : "INT"
-            }, {
-              "name" : "b2",
-              "dataType" : "BIGINT"
-            }, {
-              "name" : "b3",
-              "dataType" : "BIGINT"
-            } ],
-            "watermarkSpecs" : [ ]
-          },
-          "partitionKeys" : [ ],
-          "options" : {
-            "bounded" : "false",
-            "connector" : "values"
-          }
-        }
-      },
-      "abilities" : [ {
-        "type" : "ProjectPushDown",
-        "projectedFields" : [ [ 0 ], [ 1 ] ],
-        "producedType" : "ROW<`b1` INT, `b2` BIGINT> NOT NULL"
-      }, {
-        "type" : "ReadingMetadata",
-        "metadataKeys" : [ ],
-        "producedType" : "ROW<`b1` INT, `b2` BIGINT> NOT NULL"
-      } ]
-    },
-    "outputType" : "ROW<`b1` INT, `b2` BIGINT>",
-    "description" : "TableSourceScan(table=[[default_catalog, default_database, B, project=[b1, b2], metadata=[]]], fields=[b1, b2])",
-    "inputProperties" : [ ]
-  }, {
-    "id" : 6,
-    "type" : "stream-exec-exchange_1",
-    "inputProperties" : [ {
-      "requiredDistribution" : {
-        "type" : "HASH",
-        "keys" : [ 0 ]
-      },
-      "damBehavior" : "PIPELINED",
-      "priority" : 0
-    } ],
-    "outputType" : "ROW<`b1` INT, `b2` BIGINT>",
-    "description" : "Exchange(distribution=[hash[b1]])"
-  }, {
-    "id" : 7,
-    "type" : "stream-exec-group-aggregate_1",
-    "configuration" : {
-      "table.exec.mini-batch.enabled" : "false",
-      "table.exec.mini-batch.size" : "-1"
-    },
-    "grouping" : [ 0 ],
-    "aggCalls" : [ ],
-    "aggCallNeedRetractions" : [ ],
-    "generateUpdateBefore" : true,
-    "needRetraction" : false,
-    "state" : [ {
-      "index" : 0,
-      "ttl" : "0 ms",
-      "name" : "groupAggregateState"
-    } ],
-    "inputProperties" : [ {
-      "requiredDistribution" : {
-        "type" : "UNKNOWN"
-      },
-      "damBehavior" : "PIPELINED",
-      "priority" : 0
-    } ],
-    "outputType" : "ROW<`b1` INT>",
-    "description" : "GroupAggregate(groupBy=[b1], select=[b1])"
-  }, {
-    "id" : 8,
-    "type" : "stream-exec-exchange_1",
-    "inputProperties" : [ {
-      "requiredDistribution" : {
-        "type" : "HASH",
-        "keys" : [ 0 ]
-      },
-      "damBehavior" : "PIPELINED",
-      "priority" : 0
-    } ],
-    "outputType" : "ROW<`b1` INT>",
-    "description" : "Exchange(distribution=[hash[b1]])"
-  }, {
-    "id" : 9,
-    "type" : "stream-exec-join_1",
-    "joinSpec" : {
-      "joinType" : "INNER",
-      "leftKeys" : [ 0 ],
-      "rightKeys" : [ 0 ],
-      "filterNulls" : [ true ],
-      "nonEquiCondition" : null
-    },
-    "leftUpsertKeys" : [ [ 0 ] ],
-    "rightUpsertKeys" : [ [ 0 ] ],
-    "state" : [ {
-      "index" : 0,
-      "ttl" : "0 ms",
-      "name" : "leftState"
-    }, {
-      "index" : 1,
-      "ttl" : "0 ms",
-      "name" : "rightState"
-    } ],
-    "inputProperties" : [ {
-      "requiredDistribution" : {
-        "type" : "UNKNOWN"
-      },
-      "damBehavior" : "PIPELINED",
-      "priority" : 0
-    }, {
-      "requiredDistribution" : {
-        "type" : "UNKNOWN"
-      },
-      "damBehavior" : "PIPELINED",
-      "priority" : 0
-    } ],
-    "outputType" : "ROW<`a1` INT, `b1` INT>",
-    "description" : "Join(joinType=[InnerJoin], where=[(a1 = b1)], select=[a1, b1], leftInputSpec=[JoinKeyContainsUniqueKey], rightInputSpec=[JoinKeyContainsUniqueKey])"
-  }, {
-    "id" : 10,
-    "type" : "stream-exec-sink_1",
-    "configuration" : {
-      "table.exec.sink.keyed-shuffle" : "AUTO",
-      "table.exec.sink.not-null-enforcer" : "ERROR",
-      "table.exec.sink.rowtime-inserter" : "ENABLED",
-      "table.exec.sink.type-length-enforcer" : "IGNORE",
-      "table.exec.sink.upsert-materialize" : "AUTO"
-    },
-    "dynamicTableSink" : {
-      "table" : {
-        "identifier" : "`default_catalog`.`default_database`.`MySink`",
-        "resolvedTable" : {
-          "schema" : {
-            "columns" : [ {
-              "name" : "a1",
-              "dataType" : "INT"
-            }, {
-              "name" : "b1",
-              "dataType" : "INT"
-            } ],
-            "watermarkSpecs" : [ ]
-          },
-          "partitionKeys" : [ ],
-          "options" : {
-            "connector" : "values",
-            "sink-insert-only" : "false",
-            "table-sink-class" : "DEFAULT"
-          }
-        }
-      }
-    },
-    "inputChangelogMode" : [ "INSERT", "UPDATE_BEFORE", "UPDATE_AFTER" ],
-    "inputUpsertKey" : [ 0 ],
-    "inputProperties" : [ {
-      "requiredDistribution" : {
-        "type" : "UNKNOWN"
-      },
-      "damBehavior" : "PIPELINED",
-      "priority" : 0
-    } ],
-    "outputType" : "ROW<`a1` INT, `b1` INT>",
-    "description" : "Sink(table=[default_catalog.default_database.MySink], fields=[a1, b1])"
-  } ],
-  "edges" : [ {
-    "source" : 1,
-    "target" : 2,
-    "shuffle" : {
-      "type" : "FORWARD"
-    },
-    "shuffleMode" : "PIPELINED"
-  }, {
-    "source" : 2,
-    "target" : 3,
-    "shuffle" : {
-      "type" : "FORWARD"
-    },
-    "shuffleMode" : "PIPELINED"
-  }, {
-    "source" : 3,
-    "target" : 4,
-    "shuffle" : {
-      "type" : "FORWARD"
-    },
-    "shuffleMode" : "PIPELINED"
-  }, {
-    "source" : 5,
-    "target" : 6,
-    "shuffle" : {
-      "type" : "FORWARD"
-    },
-    "shuffleMode" : "PIPELINED"
-  }, {
-    "source" : 6,
-    "target" : 7,
-    "shuffle" : {
-      "type" : "FORWARD"
-    },
-    "shuffleMode" : "PIPELINED"
-  }, {
-    "source" : 7,
-    "target" : 8,
-    "shuffle" : {
-      "type" : "FORWARD"
-    },
-    "shuffleMode" : "PIPELINED"
-  }, {
-    "source" : 4,
-    "target" : 9,
-    "shuffle" : {
-      "type" : "FORWARD"
-    },
-    "shuffleMode" : "PIPELINED"
-  }, {
-    "source" : 8,
-    "target" : 9,
-    "shuffle" : {
-      "type" : "FORWARD"
-    },
-    "shuffleMode" : "PIPELINED"
-  }, {
-    "source" : 9,
-    "target" : 10,
-    "shuffle" : {
-      "type" : "FORWARD"
-    },
-    "shuffleMode" : "PIPELINED"
-  } ]
-}
diff --git a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/JoinJsonPlanTest_jsonplan/testInnerJoinWithPk.out b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/JoinJsonPlanTest_jsonplan/testInnerJoinWithPk.out
deleted file mode 100644
index 5eb471f3a88..00000000000
--- a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/JoinJsonPlanTest_jsonplan/testInnerJoinWithPk.out
+++ /dev/null
@@ -1,450 +0,0 @@
-{
-  "flinkVersion" : "",
-  "nodes" : [ {
-    "id" : 1,
-    "type" : "stream-exec-table-source-scan_1",
-    "scanTableSource" : {
-      "table" : {
-        "identifier" : "`default_catalog`.`default_database`.`A`",
-        "resolvedTable" : {
-          "schema" : {
-            "columns" : [ {
-              "name" : "a1",
-              "dataType" : "INT"
-            }, {
-              "name" : "a2",
-              "dataType" : "BIGINT"
-            }, {
-              "name" : "a3",
-              "dataType" : "BIGINT"
-            } ],
-            "watermarkSpecs" : [ ]
-          },
-          "partitionKeys" : [ ],
-          "options" : {
-            "bounded" : "false",
-            "connector" : "values"
-          }
-        }
-      },
-      "abilities" : [ {
-        "type" : "ProjectPushDown",
-        "projectedFields" : [ [ 0 ], [ 1 ] ],
-        "producedType" : "ROW<`a1` INT, `a2` BIGINT> NOT NULL"
-      }, {
-        "type" : "ReadingMetadata",
-        "metadataKeys" : [ ],
-        "producedType" : "ROW<`a1` INT, `a2` BIGINT> NOT NULL"
-      } ]
-    },
-    "outputType" : "ROW<`a1` INT, `a2` BIGINT>",
-    "description" : "TableSourceScan(table=[[default_catalog, 
default_database, A, project=[a1, a2], metadata=[]]], fields=[a1, a2])",
-    "inputProperties" : [ ]
-  }, {
-    "id" : 2,
-    "type" : "stream-exec-exchange_1",
-    "inputProperties" : [ {
-      "requiredDistribution" : {
-        "type" : "HASH",
-        "keys" : [ 0 ]
-      },
-      "damBehavior" : "PIPELINED",
-      "priority" : 0
-    } ],
-    "outputType" : "ROW<`a1` INT, `a2` BIGINT>",
-    "description" : "Exchange(distribution=[hash[a1]])"
-  }, {
-    "id" : 3,
-    "type" : "stream-exec-group-aggregate_1",
-    "configuration" : {
-      "table.exec.mini-batch.enabled" : "false",
-      "table.exec.mini-batch.size" : "-1"
-    },
-    "grouping" : [ 0 ],
-    "aggCalls" : [ {
-      "name" : "a2",
-      "internalName" : "$SUM$1",
-      "argList" : [ 1 ],
-      "filterArg" : -1,
-      "distinct" : false,
-      "approximate" : false,
-      "ignoreNulls" : false,
-      "type" : "BIGINT"
-    } ],
-    "aggCallNeedRetractions" : [ false ],
-    "generateUpdateBefore" : true,
-    "needRetraction" : false,
-    "state" : [ {
-      "index" : 0,
-      "ttl" : "0 ms",
-      "name" : "groupAggregateState"
-    } ],
-    "inputProperties" : [ {
-      "requiredDistribution" : {
-        "type" : "UNKNOWN"
-      },
-      "damBehavior" : "PIPELINED",
-      "priority" : 0
-    } ],
-    "outputType" : "ROW<`a1` INT, `a2` BIGINT>",
-    "description" : "GroupAggregate(groupBy=[a1], select=[a1, SUM(a2) AS a2])"
-  }, {
-    "id" : 4,
-    "type" : "stream-exec-calc_1",
-    "projection" : [ {
-      "kind" : "INPUT_REF",
-      "inputIndex" : 1,
-      "type" : "BIGINT"
-    }, {
-      "kind" : "INPUT_REF",
-      "inputIndex" : 0,
-      "type" : "INT"
-    } ],
-    "condition" : null,
-    "inputProperties" : [ {
-      "requiredDistribution" : {
-        "type" : "UNKNOWN"
-      },
-      "damBehavior" : "PIPELINED",
-      "priority" : 0
-    } ],
-    "outputType" : "ROW<`a2` BIGINT, `a1` INT>",
-    "description" : "Calc(select=[a2, a1])"
-  }, {
-    "id" : 5,
-    "type" : "stream-exec-exchange_1",
-    "inputProperties" : [ {
-      "requiredDistribution" : {
-        "type" : "HASH",
-        "keys" : [ 0 ]
-      },
-      "damBehavior" : "PIPELINED",
-      "priority" : 0
-    } ],
-    "outputType" : "ROW<`a2` BIGINT, `a1` INT>",
-    "description" : "Exchange(distribution=[hash[a2]])"
-  }, {
-    "id" : 6,
-    "type" : "stream-exec-table-source-scan_1",
-    "scanTableSource" : {
-      "table" : {
-        "identifier" : "`default_catalog`.`default_database`.`B`",
-        "resolvedTable" : {
-          "schema" : {
-            "columns" : [ {
-              "name" : "b1",
-              "dataType" : "INT"
-            }, {
-              "name" : "b2",
-              "dataType" : "BIGINT"
-            }, {
-              "name" : "b3",
-              "dataType" : "BIGINT"
-            } ],
-            "watermarkSpecs" : [ ]
-          },
-          "partitionKeys" : [ ],
-          "options" : {
-            "bounded" : "false",
-            "connector" : "values"
-          }
-        }
-      },
-      "abilities" : [ {
-        "type" : "ProjectPushDown",
-        "projectedFields" : [ [ 0 ], [ 1 ] ],
-        "producedType" : "ROW<`b1` INT, `b2` BIGINT> NOT NULL"
-      }, {
-        "type" : "ReadingMetadata",
-        "metadataKeys" : [ ],
-        "producedType" : "ROW<`b1` INT, `b2` BIGINT> NOT NULL"
-      } ]
-    },
-    "outputType" : "ROW<`b1` INT, `b2` BIGINT>",
-    "description" : "TableSourceScan(table=[[default_catalog, 
default_database, B, project=[b1, b2], metadata=[]]], fields=[b1, b2])",
-    "inputProperties" : [ ]
-  }, {
-    "id" : 7,
-    "type" : "stream-exec-exchange_1",
-    "inputProperties" : [ {
-      "requiredDistribution" : {
-        "type" : "HASH",
-        "keys" : [ 0 ]
-      },
-      "damBehavior" : "PIPELINED",
-      "priority" : 0
-    } ],
-    "outputType" : "ROW<`b1` INT, `b2` BIGINT>",
-    "description" : "Exchange(distribution=[hash[b1]])"
-  }, {
-    "id" : 8,
-    "type" : "stream-exec-group-aggregate_1",
-    "configuration" : {
-      "table.exec.mini-batch.enabled" : "false",
-      "table.exec.mini-batch.size" : "-1"
-    },
-    "grouping" : [ 0 ],
-    "aggCalls" : [ {
-      "name" : "b2",
-      "internalName" : "$SUM$1",
-      "argList" : [ 1 ],
-      "filterArg" : -1,
-      "distinct" : false,
-      "approximate" : false,
-      "ignoreNulls" : false,
-      "type" : "BIGINT"
-    } ],
-    "aggCallNeedRetractions" : [ false ],
-    "generateUpdateBefore" : true,
-    "needRetraction" : false,
-    "state" : [ {
-      "index" : 0,
-      "ttl" : "0 ms",
-      "name" : "groupAggregateState"
-    } ],
-    "inputProperties" : [ {
-      "requiredDistribution" : {
-        "type" : "UNKNOWN"
-      },
-      "damBehavior" : "PIPELINED",
-      "priority" : 0
-    } ],
-    "outputType" : "ROW<`b1` INT, `b2` BIGINT>",
-    "description" : "GroupAggregate(groupBy=[b1], select=[b1, SUM(b2) AS b2])"
-  }, {
-    "id" : 9,
-    "type" : "stream-exec-calc_1",
-    "projection" : [ {
-      "kind" : "INPUT_REF",
-      "inputIndex" : 1,
-      "type" : "BIGINT"
-    }, {
-      "kind" : "INPUT_REF",
-      "inputIndex" : 0,
-      "type" : "INT"
-    } ],
-    "condition" : null,
-    "inputProperties" : [ {
-      "requiredDistribution" : {
-        "type" : "UNKNOWN"
-      },
-      "damBehavior" : "PIPELINED",
-      "priority" : 0
-    } ],
-    "outputType" : "ROW<`b2` BIGINT, `b1` INT>",
-    "description" : "Calc(select=[b2, b1])"
-  }, {
-    "id" : 10,
-    "type" : "stream-exec-exchange_1",
-    "inputProperties" : [ {
-      "requiredDistribution" : {
-        "type" : "HASH",
-        "keys" : [ 0 ]
-      },
-      "damBehavior" : "PIPELINED",
-      "priority" : 0
-    } ],
-    "outputType" : "ROW<`b2` BIGINT, `b1` INT>",
-    "description" : "Exchange(distribution=[hash[b2]])"
-  }, {
-    "id" : 11,
-    "type" : "stream-exec-join_1",
-    "joinSpec" : {
-      "joinType" : "INNER",
-      "leftKeys" : [ 0 ],
-      "rightKeys" : [ 0 ],
-      "filterNulls" : [ true ],
-      "nonEquiCondition" : null
-    },
-    "leftUpsertKeys" : [ [ 1 ] ],
-    "rightUpsertKeys" : [ [ 1 ] ],
-    "state" : [ {
-      "index" : 0,
-      "ttl" : "0 ms",
-      "name" : "leftState"
-    }, {
-      "index" : 1,
-      "ttl" : "0 ms",
-      "name" : "rightState"
-    } ],
-    "inputProperties" : [ {
-      "requiredDistribution" : {
-        "type" : "UNKNOWN"
-      },
-      "damBehavior" : "PIPELINED",
-      "priority" : 0
-    }, {
-      "requiredDistribution" : {
-        "type" : "UNKNOWN"
-      },
-      "damBehavior" : "PIPELINED",
-      "priority" : 0
-    } ],
-    "outputType" : "ROW<`a2` BIGINT, `a1` INT, `b2` BIGINT, `b1` INT>",
-    "description" : "Join(joinType=[InnerJoin], where=[(a2 = b2)], select=[a2, 
a1, b2, b1], leftInputSpec=[HasUniqueKey], rightInputSpec=[HasUniqueKey])"
-  }, {
-    "id" : 12,
-    "type" : "stream-exec-calc_1",
-    "projection" : [ {
-      "kind" : "INPUT_REF",
-      "inputIndex" : 1,
-      "type" : "INT"
-    }, {
-      "kind" : "INPUT_REF",
-      "inputIndex" : 0,
-      "type" : "BIGINT"
-    }, {
-      "kind" : "INPUT_REF",
-      "inputIndex" : 3,
-      "type" : "INT"
-    }, {
-      "kind" : "INPUT_REF",
-      "inputIndex" : 2,
-      "type" : "BIGINT"
-    } ],
-    "condition" : null,
-    "inputProperties" : [ {
-      "requiredDistribution" : {
-        "type" : "UNKNOWN"
-      },
-      "damBehavior" : "PIPELINED",
-      "priority" : 0
-    } ],
-    "outputType" : "ROW<`a1` INT, `a2` BIGINT, `b1` INT, `b2` BIGINT>",
-    "description" : "Calc(select=[a1, a2, b1, b2])"
-  }, {
-    "id" : 13,
-    "type" : "stream-exec-sink_1",
-    "configuration" : {
-      "table.exec.sink.keyed-shuffle" : "AUTO",
-      "table.exec.sink.not-null-enforcer" : "ERROR",
-      "table.exec.sink.rowtime-inserter" : "ENABLED",
-      "table.exec.sink.type-length-enforcer" : "IGNORE",
-      "table.exec.sink.upsert-materialize" : "AUTO"
-    },
-    "dynamicTableSink" : {
-      "table" : {
-        "identifier" : "`default_catalog`.`default_database`.`MySink`",
-        "resolvedTable" : {
-          "schema" : {
-            "columns" : [ {
-              "name" : "a1",
-              "dataType" : "INT"
-            }, {
-              "name" : "a2",
-              "dataType" : "BIGINT"
-            }, {
-              "name" : "b1",
-              "dataType" : "INT"
-            }, {
-              "name" : "b2",
-              "dataType" : "BIGINT"
-            } ],
-            "watermarkSpecs" : [ ]
-          },
-          "partitionKeys" : [ ],
-          "options" : {
-            "connector" : "values",
-            "sink-insert-only" : "false",
-            "table-sink-class" : "DEFAULT"
-          }
-        }
-      }
-    },
-    "inputChangelogMode" : [ "INSERT", "UPDATE_BEFORE", "UPDATE_AFTER" ],
-    "inputProperties" : [ {
-      "requiredDistribution" : {
-        "type" : "UNKNOWN"
-      },
-      "damBehavior" : "PIPELINED",
-      "priority" : 0
-    } ],
-    "outputType" : "ROW<`a1` INT, `a2` BIGINT, `b1` INT, `b2` BIGINT>",
-    "description" : "Sink(table=[default_catalog.default_database.MySink], 
fields=[a1, a2, b1, b2])"
-  } ],
-  "edges" : [ {
-    "source" : 1,
-    "target" : 2,
-    "shuffle" : {
-      "type" : "FORWARD"
-    },
-    "shuffleMode" : "PIPELINED"
-  }, {
-    "source" : 2,
-    "target" : 3,
-    "shuffle" : {
-      "type" : "FORWARD"
-    },
-    "shuffleMode" : "PIPELINED"
-  }, {
-    "source" : 3,
-    "target" : 4,
-    "shuffle" : {
-      "type" : "FORWARD"
-    },
-    "shuffleMode" : "PIPELINED"
-  }, {
-    "source" : 4,
-    "target" : 5,
-    "shuffle" : {
-      "type" : "FORWARD"
-    },
-    "shuffleMode" : "PIPELINED"
-  }, {
-    "source" : 6,
-    "target" : 7,
-    "shuffle" : {
-      "type" : "FORWARD"
-    },
-    "shuffleMode" : "PIPELINED"
-  }, {
-    "source" : 7,
-    "target" : 8,
-    "shuffle" : {
-      "type" : "FORWARD"
-    },
-    "shuffleMode" : "PIPELINED"
-  }, {
-    "source" : 8,
-    "target" : 9,
-    "shuffle" : {
-      "type" : "FORWARD"
-    },
-    "shuffleMode" : "PIPELINED"
-  }, {
-    "source" : 9,
-    "target" : 10,
-    "shuffle" : {
-      "type" : "FORWARD"
-    },
-    "shuffleMode" : "PIPELINED"
-  }, {
-    "source" : 5,
-    "target" : 11,
-    "shuffle" : {
-      "type" : "FORWARD"
-    },
-    "shuffleMode" : "PIPELINED"
-  }, {
-    "source" : 10,
-    "target" : 11,
-    "shuffle" : {
-      "type" : "FORWARD"
-    },
-    "shuffleMode" : "PIPELINED"
-  }, {
-    "source" : 11,
-    "target" : 12,
-    "shuffle" : {
-      "type" : "FORWARD"
-    },
-    "shuffleMode" : "PIPELINED"
-  }, {
-    "source" : 12,
-    "target" : 13,
-    "shuffle" : {
-      "type" : "FORWARD"
-    },
-    "shuffleMode" : "PIPELINED"
-  } ]
-}
diff --git 
a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/JoinJsonPlanTest_jsonplan/testLeftJoinNonEqui.out
 
b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/JoinJsonPlanTest_jsonplan/testLeftJoinNonEqui.out
deleted file mode 100644
index da25c9e64b0..00000000000
--- 
a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/JoinJsonPlanTest_jsonplan/testLeftJoinNonEqui.out
+++ /dev/null
@@ -1,266 +0,0 @@
-{
-  "flinkVersion" : "",
-  "nodes" : [ {
-    "id" : 1,
-    "type" : "stream-exec-table-source-scan_1",
-    "scanTableSource" : {
-      "table" : {
-        "identifier" : "`default_catalog`.`default_database`.`A`",
-        "resolvedTable" : {
-          "schema" : {
-            "columns" : [ {
-              "name" : "a1",
-              "dataType" : "INT"
-            }, {
-              "name" : "a2",
-              "dataType" : "BIGINT"
-            }, {
-              "name" : "a3",
-              "dataType" : "BIGINT"
-            } ],
-            "watermarkSpecs" : [ ]
-          },
-          "partitionKeys" : [ ],
-          "options" : {
-            "bounded" : "false",
-            "connector" : "values"
-          }
-        }
-      },
-      "abilities" : [ {
-        "type" : "ProjectPushDown",
-        "projectedFields" : [ [ 0 ], [ 1 ] ],
-        "producedType" : "ROW<`a1` INT, `a2` BIGINT> NOT NULL"
-      }, {
-        "type" : "ReadingMetadata",
-        "metadataKeys" : [ ],
-        "producedType" : "ROW<`a1` INT, `a2` BIGINT> NOT NULL"
-      } ]
-    },
-    "outputType" : "ROW<`a1` INT, `a2` BIGINT>",
-    "description" : "TableSourceScan(table=[[default_catalog, 
default_database, A, project=[a1, a2], metadata=[]]], fields=[a1, a2])",
-    "inputProperties" : [ ]
-  }, {
-    "id" : 2,
-    "type" : "stream-exec-exchange_1",
-    "inputProperties" : [ {
-      "requiredDistribution" : {
-        "type" : "HASH",
-        "keys" : [ 0 ]
-      },
-      "damBehavior" : "PIPELINED",
-      "priority" : 0
-    } ],
-    "outputType" : "ROW<`a1` INT, `a2` BIGINT>",
-    "description" : "Exchange(distribution=[hash[a1]])"
-  }, {
-    "id" : 3,
-    "type" : "stream-exec-table-source-scan_1",
-    "scanTableSource" : {
-      "table" : {
-        "identifier" : "`default_catalog`.`default_database`.`B`",
-        "resolvedTable" : {
-          "schema" : {
-            "columns" : [ {
-              "name" : "b1",
-              "dataType" : "INT"
-            }, {
-              "name" : "b2",
-              "dataType" : "BIGINT"
-            }, {
-              "name" : "b3",
-              "dataType" : "BIGINT"
-            } ],
-            "watermarkSpecs" : [ ]
-          },
-          "partitionKeys" : [ ],
-          "options" : {
-            "bounded" : "false",
-            "connector" : "values"
-          }
-        }
-      },
-      "abilities" : [ {
-        "type" : "ProjectPushDown",
-        "projectedFields" : [ [ 0 ], [ 1 ] ],
-        "producedType" : "ROW<`b1` INT, `b2` BIGINT> NOT NULL"
-      }, {
-        "type" : "ReadingMetadata",
-        "metadataKeys" : [ ],
-        "producedType" : "ROW<`b1` INT, `b2` BIGINT> NOT NULL"
-      } ]
-    },
-    "outputType" : "ROW<`b1` INT, `b2` BIGINT>",
-    "description" : "TableSourceScan(table=[[default_catalog, 
default_database, B, project=[b1, b2], metadata=[]]], fields=[b1, b2])",
-    "inputProperties" : [ ]
-  }, {
-    "id" : 4,
-    "type" : "stream-exec-exchange_1",
-    "inputProperties" : [ {
-      "requiredDistribution" : {
-        "type" : "HASH",
-        "keys" : [ 0 ]
-      },
-      "damBehavior" : "PIPELINED",
-      "priority" : 0
-    } ],
-    "outputType" : "ROW<`b1` INT, `b2` BIGINT>",
-    "description" : "Exchange(distribution=[hash[b1]])"
-  }, {
-    "id" : 5,
-    "type" : "stream-exec-join_1",
-    "joinSpec" : {
-      "joinType" : "LEFT",
-      "leftKeys" : [ 0 ],
-      "rightKeys" : [ 0 ],
-      "filterNulls" : [ true ],
-      "nonEquiCondition" : {
-        "kind" : "CALL",
-        "syntax" : "BINARY",
-        "internalName" : "$>$1",
-        "operands" : [ {
-          "kind" : "INPUT_REF",
-          "inputIndex" : 1,
-          "type" : "BIGINT"
-        }, {
-          "kind" : "INPUT_REF",
-          "inputIndex" : 3,
-          "type" : "BIGINT"
-        } ],
-        "type" : "BOOLEAN"
-      }
-    },
-    "state" : [ {
-      "index" : 0,
-      "ttl" : "0 ms",
-      "name" : "leftState"
-    }, {
-      "index" : 1,
-      "ttl" : "0 ms",
-      "name" : "rightState"
-    } ],
-    "inputProperties" : [ {
-      "requiredDistribution" : {
-        "type" : "UNKNOWN"
-      },
-      "damBehavior" : "PIPELINED",
-      "priority" : 0
-    }, {
-      "requiredDistribution" : {
-        "type" : "UNKNOWN"
-      },
-      "damBehavior" : "PIPELINED",
-      "priority" : 0
-    } ],
-    "outputType" : "ROW<`a1` INT, `a2` BIGINT, `b1` INT, `b2` BIGINT>",
-    "description" : "Join(joinType=[LeftOuterJoin], where=[((a1 = b1) AND (a2 
> b2))], select=[a1, a2, b1, b2], leftInputSpec=[NoUniqueKey], 
rightInputSpec=[NoUniqueKey])"
-  }, {
-    "id" : 6,
-    "type" : "stream-exec-calc_1",
-    "projection" : [ {
-      "kind" : "INPUT_REF",
-      "inputIndex" : 0,
-      "type" : "INT"
-    }, {
-      "kind" : "INPUT_REF",
-      "inputIndex" : 2,
-      "type" : "INT"
-    } ],
-    "condition" : null,
-    "inputProperties" : [ {
-      "requiredDistribution" : {
-        "type" : "UNKNOWN"
-      },
-      "damBehavior" : "PIPELINED",
-      "priority" : 0
-    } ],
-    "outputType" : "ROW<`a1` INT, `b1` INT>",
-    "description" : "Calc(select=[a1, b1])"
-  }, {
-    "id" : 7,
-    "type" : "stream-exec-sink_1",
-    "configuration" : {
-      "table.exec.sink.keyed-shuffle" : "AUTO",
-      "table.exec.sink.not-null-enforcer" : "ERROR",
-      "table.exec.sink.rowtime-inserter" : "ENABLED",
-      "table.exec.sink.type-length-enforcer" : "IGNORE",
-      "table.exec.sink.upsert-materialize" : "AUTO"
-    },
-    "dynamicTableSink" : {
-      "table" : {
-        "identifier" : "`default_catalog`.`default_database`.`MySink`",
-        "resolvedTable" : {
-          "schema" : {
-            "columns" : [ {
-              "name" : "a1",
-              "dataType" : "INT"
-            }, {
-              "name" : "b1",
-              "dataType" : "INT"
-            } ],
-            "watermarkSpecs" : [ ]
-          },
-          "partitionKeys" : [ ],
-          "options" : {
-            "connector" : "values",
-            "sink-insert-only" : "false",
-            "table-sink-class" : "DEFAULT"
-          }
-        }
-      }
-    },
-    "inputChangelogMode" : [ "INSERT", "UPDATE_BEFORE", "UPDATE_AFTER", 
"DELETE" ],
-    "inputProperties" : [ {
-      "requiredDistribution" : {
-        "type" : "UNKNOWN"
-      },
-      "damBehavior" : "PIPELINED",
-      "priority" : 0
-    } ],
-    "outputType" : "ROW<`a1` INT, `b1` INT>",
-    "description" : "Sink(table=[default_catalog.default_database.MySink], 
fields=[a1, b1])"
-  } ],
-  "edges" : [ {
-    "source" : 1,
-    "target" : 2,
-    "shuffle" : {
-      "type" : "FORWARD"
-    },
-    "shuffleMode" : "PIPELINED"
-  }, {
-    "source" : 3,
-    "target" : 4,
-    "shuffle" : {
-      "type" : "FORWARD"
-    },
-    "shuffleMode" : "PIPELINED"
-  }, {
-    "source" : 2,
-    "target" : 5,
-    "shuffle" : {
-      "type" : "FORWARD"
-    },
-    "shuffleMode" : "PIPELINED"
-  }, {
-    "source" : 4,
-    "target" : 5,
-    "shuffle" : {
-      "type" : "FORWARD"
-    },
-    "shuffleMode" : "PIPELINED"
-  }, {
-    "source" : 5,
-    "target" : 6,
-    "shuffle" : {
-      "type" : "FORWARD"
-    },
-    "shuffleMode" : "PIPELINED"
-  }, {
-    "source" : 6,
-    "target" : 7,
-    "shuffle" : {
-      "type" : "FORWARD"
-    },
-    "shuffleMode" : "PIPELINED"
-  } ]
-}

Reply via email to