This is an automated email from the ASF dual-hosted git repository. dwysakowicz pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit 78b0a625f49058d0c0213de49d6337a7d4f2ab72 Author: Jim Hughes <jhug...@confluent.io> AuthorDate: Tue Nov 7 17:14:09 2023 -0500 [FLINK-33470] Deleting JoinJsonPlanTest.java and JoinJsonPlanITCase.java This closes #23869 --- .../plan/nodes/exec/stream/JoinJsonPlanTest.java | 144 ------- .../stream/jsonplan/JoinJsonPlanITCase.java | 168 -------- .../JoinJsonPlanTest_jsonplan/testInnerJoin.out | 222 ---------- .../testInnerJoinWithEqualPk.out | 332 --------------- .../testInnerJoinWithPk.out | 450 --------------------- .../testLeftJoinNonEqui.out | 266 ------------ 6 files changed, 1582 deletions(-) diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/JoinJsonPlanTest.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/JoinJsonPlanTest.java deleted file mode 100644 index dd2771ea7cf..00000000000 --- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/JoinJsonPlanTest.java +++ /dev/null @@ -1,144 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.table.planner.plan.nodes.exec.stream; - -import org.apache.flink.table.api.TableConfig; -import org.apache.flink.table.api.TableEnvironment; -import org.apache.flink.table.planner.utils.StreamTableTestUtil; -import org.apache.flink.table.planner.utils.TableTestBase; - -import org.junit.jupiter.api.BeforeEach; -import org.junit.jupiter.api.Test; - -/** Test json serialization/deserialization for join. */ -class JoinJsonPlanTest extends TableTestBase { - - private StreamTableTestUtil util; - private TableEnvironment tEnv; - - @BeforeEach - void setup() { - util = streamTestUtil(TableConfig.getDefault()); - tEnv = util.getTableEnv(); - - String srcTableA = - "CREATE TABLE A (\n" - + " a1 int,\n" - + " a2 bigint,\n" - + " a3 bigint\n" - + ") with (\n" - + " 'connector' = 'values',\n" - + " 'bounded' = 'false')"; - String srcTableB = - "CREATE TABLE B (\n" - + " b1 int,\n" - + " b2 bigint,\n" - + " b3 bigint\n" - + ") with (\n" - + " 'connector' = 'values',\n" - + " 'bounded' = 'false')"; - String srcTableT = - "CREATE TABLE t (\n" - + " a int,\n" - + " b bigint,\n" - + " c varchar\n" - + ") with (\n" - + " 'connector' = 'values',\n" - + " 'bounded' = 'false')"; - String srcTableS = - "CREATE TABLE s (\n" - + " x bigint,\n" - + " y varchar,\n" - + " z int\n" - + ") with (\n" - + " 'connector' = 'values',\n" - + " 'bounded' = 'false')"; - tEnv.executeSql(srcTableA); - tEnv.executeSql(srcTableB); - tEnv.executeSql(srcTableT); - tEnv.executeSql(srcTableS); - } - - @Test - void testInnerJoin() { - String sinkTableDdl = - "CREATE TABLE MySink (\n" - + " a1 int,\n" - + " b1 int\n" - + ") with (\n" - + " 'connector' = 'values',\n" - + " 'table-sink-class' = 'DEFAULT')"; - tEnv.executeSql(sinkTableDdl); - util.verifyJsonPlan("INSERT INTO MySink SELECT a1, b1 FROM A JOIN B ON a1 = b1"); - } - - @Test - void testInnerJoinWithEqualPk() { - String query1 = "SELECT SUM(a2) AS a2, a1 FROM A GROUP BY a1"; - String query2 = "SELECT SUM(b2) AS b2, b1 FROM B GROUP BY b1"; - String query = - String.format("SELECT a1, b1 FROM (%s) JOIN (%s) ON a1 = b1", query1, query2); - String sinkTableDdl = - "CREATE TABLE MySink (\n" - + " a1 int,\n" - + " b1 int\n" - + ") with (\n" - + " 'connector' = 'values',\n" - + " 'sink-insert-only' = 'false',\n" - + " 'table-sink-class' = 'DEFAULT')"; - tEnv.executeSql(sinkTableDdl); - util.verifyJsonPlan(String.format("INSERT INTO MySink %s", query)); - } - - @Test - void testInnerJoinWithPk() { - String query1 = "SELECT SUM(a2) AS a2, a1 FROM A GROUP BY a1"; - String query2 = "SELECT SUM(b2) AS b2, b1 FROM B GROUP BY b1"; - String query = - String.format( - "SELECT a1, a2, b1, b2 FROM (%s) JOIN (%s) ON a2 = b2", query1, query2); - String sinkTableDdl = - "CREATE TABLE MySink (\n" - + " a1 int,\n" - + " a2 bigint,\n" - + " b1 int,\n" - + " b2 bigint\n" - + ") with (\n" - + " 'connector' = 'values',\n" - + " 'sink-insert-only' = 'false',\n" - + " 'table-sink-class' = 'DEFAULT')"; - tEnv.executeSql(sinkTableDdl); - util.verifyJsonPlan(String.format("INSERT INTO MySink %s", query)); - } - - @Test - void testLeftJoinNonEqui() { - String sinkTableDdl = - "CREATE TABLE MySink (\n" - + " a1 int,\n" - + " b1 int\n" - + ") with (\n" - + " 'connector' = 'values',\n" - + " 'sink-insert-only' = 'false',\n" - + " 'table-sink-class' = 'DEFAULT')"; - tEnv.executeSql(sinkTableDdl); - util.verifyJsonPlan( - "INSERT INTO MySink SELECT a1, b1 FROM A LEFT JOIN B ON a1 = b1 AND a2 > b2"); - } -} diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/jsonplan/JoinJsonPlanITCase.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/jsonplan/JoinJsonPlanITCase.java deleted file mode 100644 index 539cfc401e8..00000000000 --- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/jsonplan/JoinJsonPlanITCase.java +++ /dev/null @@ -1,168 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.table.planner.runtime.stream.jsonplan; - -import org.apache.flink.table.planner.factories.TestValuesTableFactory; -import org.apache.flink.table.planner.runtime.utils.TestData; -import org.apache.flink.table.planner.utils.JavaScalaConversionUtil; -import org.apache.flink.table.planner.utils.JsonPlanTestBase; - -import org.junit.jupiter.api.BeforeEach; -import org.junit.jupiter.api.Test; - -import java.io.File; -import java.util.Arrays; -import java.util.List; - -/** Test for join json plan. */ -class JoinJsonPlanITCase extends JsonPlanTestBase { - - @Override - @BeforeEach - protected void setup() throws Exception { - super.setup(); - createTestValuesSourceTable( - "A", - JavaScalaConversionUtil.toJava(TestData.smallData3()), - "a1 int", - "a2 bigint", - "a3 varchar"); - createTestValuesSourceTable( - "B", - JavaScalaConversionUtil.toJava(TestData.smallData5()), - "b1 int", - "b2 bigint", - "b3 int", - "b4 varchar", - "b5 bigint"); - } - - /** test non-window inner join. * */ - @Test - void testNonWindowInnerJoin() throws Exception { - List<String> dataT1 = - Arrays.asList( - "1,1,Hi1", "1,2,Hi2", "1,2,Hi2", "1,5,Hi3", "2,7,Hi5", "1,9,Hi6", "1,8,Hi8", - "3,8,Hi9"); - List<String> dataT2 = Arrays.asList("1,1,HiHi", "2,2,HeHe", "3,2,HeHe"); - createTestCsvSourceTable("T1", dataT1, "a int", "b bigint", "c varchar"); - createTestCsvSourceTable("T2", dataT2, "a int", "b bigint", "c varchar"); - File sinkPath = createTestCsvSinkTable("MySink", "a int", "c1 varchar", "c2 varchar"); - - compileSqlAndExecutePlan( - "insert into MySink " - + "SELECT t2.a, t2.c, t1.c\n" - + "FROM (\n" - + " SELECT if(a = 3, cast(null as int), a) as a, b, c FROM T1\n" - + ") as t1\n" - + "JOIN (\n" - + " SELECT if(a = 3, cast(null as int), a) as a, b, c FROM T2\n" - + ") as t2\n" - + "ON t1.a = t2.a AND t1.b > t2.b") - .await(); - List<String> expected = - Arrays.asList( - "1,HiHi,Hi2", - "1,HiHi,Hi2", - "1,HiHi,Hi3", - "1,HiHi,Hi6", - "1,HiHi,Hi8", - "2,HeHe,Hi5"); - assertResult(expected, sinkPath); - } - - @Test - void testIsNullInnerJoinWithNullCond() throws Exception { - List<String> dataT1 = - Arrays.asList( - "1,1,Hi1", "1,2,Hi2", "1,2,Hi2", "1,5,Hi3", "2,7,Hi5", "1,9,Hi6", "1,8,Hi8", - "3,8,Hi9"); - List<String> dataT2 = Arrays.asList("1,1,HiHi", "2,2,HeHe", "3,2,HeHe"); - createTestCsvSourceTable("T1", dataT1, "a int", "b bigint", "c varchar"); - createTestCsvSourceTable("T2", dataT2, "a int", "b bigint", "c varchar"); - createTestValuesSinkTable("MySink", "a int", "c1 varchar", "c2 varchar"); - - compileSqlAndExecutePlan( - "insert into MySink " - + "SELECT t2.a, t2.c, t1.c\n" - + "FROM (\n" - + " SELECT if(a = 3, cast(null as int), a) as a, b, c FROM T1\n" - + ") as t1\n" - + "JOIN (\n" - + " SELECT if(a = 3, cast(null as int), a) as a, b, c FROM T2\n" - + ") as t2\n" - + "ON \n" - + " ((t1.a is null AND t2.a is null) OR\n" - + " (t1.a = t2.a))\n" - + " AND t1.b > t2.b") - .await(); - List<String> expected = - Arrays.asList( - "+I[1, HiHi, Hi2]", - "+I[1, HiHi, Hi2]", - "+I[1, HiHi, Hi3]", - "+I[1, HiHi, Hi6]", - "+I[1, HiHi, Hi8]", - "+I[2, HeHe, Hi5]", - "+I[null, HeHe, Hi9]"); - assertResult(expected, TestValuesTableFactory.getResultsAsStrings("MySink")); - } - - @Test - void testJoin() throws Exception { - createTestValuesSinkTable("MySink", "a3 varchar", "b4 varchar"); - compileSqlAndExecutePlan("insert into MySink \n" + "SELECT a3, b4 FROM A, B WHERE a2 = b2") - .await(); - List<String> expected = - Arrays.asList( - "+I[Hello world, Hallo Welt]", "+I[Hello, Hallo Welt]", "+I[Hi, Hallo]"); - assertResult(expected, TestValuesTableFactory.getResultsAsStrings("MySink")); - } - - @Test - void testInnerJoin() throws Exception { - createTestValuesSinkTable("MySink", "a1 int", "b1 int"); - compileSqlAndExecutePlan("insert into MySink \n" + "SELECT a1, b1 FROM A JOIN B ON a1 = b1") - .await(); - List<String> expected = Arrays.asList("+I[1, 1]", "+I[2, 2]", "+I[2, 2]"); - assertResult(expected, TestValuesTableFactory.getResultsAsStrings("MySink")); - } - - @Test - void testJoinWithFilter() throws Exception { - createTestValuesSinkTable("MySink", "a3 varchar", "b4 varchar"); - compileSqlAndExecutePlan( - "insert into MySink \n" - + "SELECT a3, b4 FROM A, B where a2 = b2 and a2 < 2") - .await(); - List<String> expected = Arrays.asList("+I[Hi, Hallo]"); - assertResult(expected, TestValuesTableFactory.getResultsAsStrings("MySink")); - } - - @Test - void testInnerJoinWithDuplicateKey() throws Exception { - createTestValuesSinkTable("MySink", "a1 int", "b1 int", "b3 int"); - compileSqlAndExecutePlan( - "insert into MySink \n" - + "SELECT a1, b1, b3 FROM A JOIN B ON a1 = b1 AND a1 = b3") - .await(); - List<String> expected = Arrays.asList("+I[2, 2, 2]"); - assertResult(expected, TestValuesTableFactory.getResultsAsStrings("MySink")); - } -} diff --git a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/JoinJsonPlanTest_jsonplan/testInnerJoin.out b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/JoinJsonPlanTest_jsonplan/testInnerJoin.out deleted file mode 100644 index b260494e3b3..00000000000 --- a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/JoinJsonPlanTest_jsonplan/testInnerJoin.out +++ /dev/null @@ -1,222 +0,0 @@ -{ - "flinkVersion" : "", - "nodes" : [ { - "id" : 1, - "type" : "stream-exec-table-source-scan_1", - "scanTableSource" : { - "table" : { - "identifier" : "`default_catalog`.`default_database`.`A`", - "resolvedTable" : { - "schema" : { - "columns" : [ { - "name" : "a1", - "dataType" : "INT" - }, { - "name" : "a2", - "dataType" : "BIGINT" - }, { - "name" : "a3", - "dataType" : "BIGINT" - } ], - "watermarkSpecs" : [ ] - }, - "partitionKeys" : [ ], - "options" : { - "bounded" : "false", - "connector" : "values" - } - } - }, - "abilities" : [ { - "type" : "ProjectPushDown", - "projectedFields" : [ [ 0 ] ], - "producedType" : "ROW<`a1` INT> NOT NULL" - }, { - "type" : "ReadingMetadata", - "metadataKeys" : [ ], - "producedType" : "ROW<`a1` INT> NOT NULL" - } ] - }, - "outputType" : "ROW<`a1` INT>", - "description" : "TableSourceScan(table=[[default_catalog, default_database, A, project=[a1], metadata=[]]], fields=[a1])", - "inputProperties" : [ ] - }, { - "id" : 2, - "type" : "stream-exec-exchange_1", - "inputProperties" : [ { - "requiredDistribution" : { - "type" : "HASH", - "keys" : [ 0 ] - }, - "damBehavior" : "PIPELINED", - "priority" : 0 - } ], - "outputType" : "ROW<`a1` INT>", - "description" : "Exchange(distribution=[hash[a1]])" - }, { - "id" : 3, - "type" : "stream-exec-table-source-scan_1", - "scanTableSource" : { - "table" : { - "identifier" : "`default_catalog`.`default_database`.`B`", - "resolvedTable" : { - "schema" : { - "columns" : [ { - "name" : "b1", - "dataType" : "INT" - }, { - "name" : "b2", - "dataType" : "BIGINT" - }, { - "name" : "b3", - "dataType" : "BIGINT" - } ], - "watermarkSpecs" : [ ] - }, - "partitionKeys" : [ ], - "options" : { - "bounded" : "false", - "connector" : "values" - } - } - }, - "abilities" : [ { - "type" : "ProjectPushDown", - "projectedFields" : [ [ 0 ] ], - "producedType" : "ROW<`b1` INT> NOT NULL" - }, { - "type" : "ReadingMetadata", - "metadataKeys" : [ ], - "producedType" : "ROW<`b1` INT> NOT NULL" - } ] - }, - "outputType" : "ROW<`b1` INT>", - "description" : "TableSourceScan(table=[[default_catalog, default_database, B, project=[b1], metadata=[]]], fields=[b1])", - "inputProperties" : [ ] - }, { - "id" : 4, - "type" : "stream-exec-exchange_1", - "inputProperties" : [ { - "requiredDistribution" : { - "type" : "HASH", - "keys" : [ 0 ] - }, - "damBehavior" : "PIPELINED", - "priority" : 0 - } ], - "outputType" : "ROW<`b1` INT>", - "description" : "Exchange(distribution=[hash[b1]])" - }, { - "id" : 5, - "type" : "stream-exec-join_1", - "joinSpec" : { - "joinType" : "INNER", - "leftKeys" : [ 0 ], - "rightKeys" : [ 0 ], - "filterNulls" : [ true ], - "nonEquiCondition" : null - }, - "state" : [ { - "index" : 0, - "ttl" : "0 ms", - "name" : "leftState" - }, { - "index" : 1, - "ttl" : "0 ms", - "name" : "rightState" - } ], - "inputProperties" : [ { - "requiredDistribution" : { - "type" : "UNKNOWN" - }, - "damBehavior" : "PIPELINED", - "priority" : 0 - }, { - "requiredDistribution" : { - "type" : "UNKNOWN" - }, - "damBehavior" : "PIPELINED", - "priority" : 0 - } ], - "outputType" : "ROW<`a1` INT, `b1` INT>", - "description" : "Join(joinType=[InnerJoin], where=[(a1 = b1)], select=[a1, b1], leftInputSpec=[NoUniqueKey], rightInputSpec=[NoUniqueKey])" - }, { - "id" : 6, - "type" : "stream-exec-sink_1", - "configuration" : { - "table.exec.sink.keyed-shuffle" : "AUTO", - "table.exec.sink.not-null-enforcer" : "ERROR", - "table.exec.sink.rowtime-inserter" : "ENABLED", - "table.exec.sink.type-length-enforcer" : "IGNORE", - "table.exec.sink.upsert-materialize" : "AUTO" - }, - "dynamicTableSink" : { - "table" : { - "identifier" : "`default_catalog`.`default_database`.`MySink`", - "resolvedTable" : { - "schema" : { - "columns" : [ { - "name" : "a1", - "dataType" : "INT" - }, { - "name" : "b1", - "dataType" : "INT" - } ], - "watermarkSpecs" : [ ] - }, - "partitionKeys" : [ ], - "options" : { - "connector" : "values", - "table-sink-class" : "DEFAULT" - } - } - } - }, - "inputChangelogMode" : [ "INSERT" ], - "inputProperties" : [ { - "requiredDistribution" : { - "type" : "UNKNOWN" - }, - "damBehavior" : "PIPELINED", - "priority" : 0 - } ], - "outputType" : "ROW<`a1` INT, `b1` INT>", - "description" : "Sink(table=[default_catalog.default_database.MySink], fields=[a1, b1])" - } ], - "edges" : [ { - "source" : 1, - "target" : 2, - "shuffle" : { - "type" : "FORWARD" - }, - "shuffleMode" : "PIPELINED" - }, { - "source" : 3, - "target" : 4, - "shuffle" : { - "type" : "FORWARD" - }, - "shuffleMode" : "PIPELINED" - }, { - "source" : 2, - "target" : 5, - "shuffle" : { - "type" : "FORWARD" - }, - "shuffleMode" : "PIPELINED" - }, { - "source" : 4, - "target" : 5, - "shuffle" : { - "type" : "FORWARD" - }, - "shuffleMode" : "PIPELINED" - }, { - "source" : 5, - "target" : 6, - "shuffle" : { - "type" : "FORWARD" - }, - "shuffleMode" : "PIPELINED" - } ] -} diff --git a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/JoinJsonPlanTest_jsonplan/testInnerJoinWithEqualPk.out b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/JoinJsonPlanTest_jsonplan/testInnerJoinWithEqualPk.out deleted file mode 100644 index 7c65bb54840..00000000000 --- a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/JoinJsonPlanTest_jsonplan/testInnerJoinWithEqualPk.out +++ /dev/null @@ -1,332 +0,0 @@ -{ - "flinkVersion" : "", - "nodes" : [ { - "id" : 1, - "type" : "stream-exec-table-source-scan_1", - "scanTableSource" : { - "table" : { - "identifier" : "`default_catalog`.`default_database`.`A`", - "resolvedTable" : { - "schema" : { - "columns" : [ { - "name" : "a1", - "dataType" : "INT" - }, { - "name" : "a2", - "dataType" : "BIGINT" - }, { - "name" : "a3", - "dataType" : "BIGINT" - } ], - "watermarkSpecs" : [ ] - }, - "partitionKeys" : [ ], - "options" : { - "bounded" : "false", - "connector" : "values" - } - } - }, - "abilities" : [ { - "type" : "ProjectPushDown", - "projectedFields" : [ [ 0 ], [ 1 ] ], - "producedType" : "ROW<`a1` INT, `a2` BIGINT> NOT NULL" - }, { - "type" : "ReadingMetadata", - "metadataKeys" : [ ], - "producedType" : "ROW<`a1` INT, `a2` BIGINT> NOT NULL" - } ] - }, - "outputType" : "ROW<`a1` INT, `a2` BIGINT>", - "description" : "TableSourceScan(table=[[default_catalog, default_database, A, project=[a1, a2], metadata=[]]], fields=[a1, a2])", - "inputProperties" : [ ] - }, { - "id" : 2, - "type" : "stream-exec-exchange_1", - "inputProperties" : [ { - "requiredDistribution" : { - "type" : "HASH", - "keys" : [ 0 ] - }, - "damBehavior" : "PIPELINED", - "priority" : 0 - } ], - "outputType" : "ROW<`a1` INT, `a2` BIGINT>", - "description" : "Exchange(distribution=[hash[a1]])" - }, { - "id" : 3, - "type" : "stream-exec-group-aggregate_1", - "configuration" : { - "table.exec.mini-batch.enabled" : "false", - "table.exec.mini-batch.size" : "-1" - }, - "grouping" : [ 0 ], - "aggCalls" : [ ], - "aggCallNeedRetractions" : [ ], - "generateUpdateBefore" : true, - "needRetraction" : false, - "state" : [ { - "index" : 0, - "ttl" : "0 ms", - "name" : "groupAggregateState" - } ], - "inputProperties" : [ { - "requiredDistribution" : { - "type" : "UNKNOWN" - }, - "damBehavior" : "PIPELINED", - "priority" : 0 - } ], - "outputType" : "ROW<`a1` INT>", - "description" : "GroupAggregate(groupBy=[a1], select=[a1])" - }, { - "id" : 4, - "type" : "stream-exec-exchange_1", - "inputProperties" : [ { - "requiredDistribution" : { - "type" : "HASH", - "keys" : [ 0 ] - }, - "damBehavior" : "PIPELINED", - "priority" : 0 - } ], - "outputType" : "ROW<`a1` INT>", - "description" : "Exchange(distribution=[hash[a1]])" - }, { - "id" : 5, - "type" : "stream-exec-table-source-scan_1", - "scanTableSource" : { - "table" : { - "identifier" : "`default_catalog`.`default_database`.`B`", - "resolvedTable" : { - "schema" : { - "columns" : [ { - "name" : "b1", - "dataType" : "INT" - }, { - "name" : "b2", - "dataType" : "BIGINT" - }, { - "name" : "b3", - "dataType" : "BIGINT" - } ], - "watermarkSpecs" : [ ] - }, - "partitionKeys" : [ ], - "options" : { - "bounded" : "false", - "connector" : "values" - } - } - }, - "abilities" : [ { - "type" : "ProjectPushDown", - "projectedFields" : [ [ 0 ], [ 1 ] ], - "producedType" : "ROW<`b1` INT, `b2` BIGINT> NOT NULL" - }, { - "type" : "ReadingMetadata", - "metadataKeys" : [ ], - "producedType" : "ROW<`b1` INT, `b2` BIGINT> NOT NULL" - } ] - }, - "outputType" : "ROW<`b1` INT, `b2` BIGINT>", - "description" : "TableSourceScan(table=[[default_catalog, default_database, B, project=[b1, b2], metadata=[]]], fields=[b1, b2])", - "inputProperties" : [ ] - }, { - "id" : 6, - "type" : "stream-exec-exchange_1", - "inputProperties" : [ { - "requiredDistribution" : { - "type" : "HASH", - "keys" : [ 0 ] - }, - "damBehavior" : "PIPELINED", - "priority" : 0 - } ], - "outputType" : "ROW<`b1` INT, `b2` BIGINT>", - "description" : "Exchange(distribution=[hash[b1]])" - }, { - "id" : 7, - "type" : "stream-exec-group-aggregate_1", - "configuration" : { - "table.exec.mini-batch.enabled" : "false", - "table.exec.mini-batch.size" : "-1" - }, - "grouping" : [ 0 ], - "aggCalls" : [ ], - "aggCallNeedRetractions" : [ ], - "generateUpdateBefore" : true, - "needRetraction" : false, - "state" : [ { - "index" : 0, - "ttl" : "0 ms", - "name" : "groupAggregateState" - } ], - "inputProperties" : [ { - "requiredDistribution" : { - "type" : "UNKNOWN" - }, - "damBehavior" : "PIPELINED", - "priority" : 0 - } ], - "outputType" : "ROW<`b1` INT>", - "description" : "GroupAggregate(groupBy=[b1], select=[b1])" - }, { - "id" : 8, - "type" : "stream-exec-exchange_1", - "inputProperties" : [ { - "requiredDistribution" : { - "type" : "HASH", - "keys" : [ 0 ] - }, - "damBehavior" : "PIPELINED", - "priority" : 0 - } ], - "outputType" : "ROW<`b1` INT>", - "description" : "Exchange(distribution=[hash[b1]])" - }, { - "id" : 9, - "type" : "stream-exec-join_1", - "joinSpec" : { - "joinType" : "INNER", - "leftKeys" : [ 0 ], - "rightKeys" : [ 0 ], - "filterNulls" : [ true ], - "nonEquiCondition" : null - }, - "leftUpsertKeys" : [ [ 0 ] ], - "rightUpsertKeys" : [ [ 0 ] ], - "state" : [ { - "index" : 0, - "ttl" : "0 ms", - "name" : "leftState" - }, { - "index" : 1, - "ttl" : "0 ms", - "name" : "rightState" - } ], - "inputProperties" : [ { - "requiredDistribution" : { - "type" : "UNKNOWN" - }, - "damBehavior" : "PIPELINED", - "priority" : 0 - }, { - "requiredDistribution" : { - "type" : "UNKNOWN" - }, - "damBehavior" : "PIPELINED", - "priority" : 0 - } ], - "outputType" : "ROW<`a1` INT, `b1` INT>", - "description" : "Join(joinType=[InnerJoin], where=[(a1 = b1)], select=[a1, b1], leftInputSpec=[JoinKeyContainsUniqueKey], rightInputSpec=[JoinKeyContainsUniqueKey])" - }, { - "id" : 10, - "type" : "stream-exec-sink_1", - "configuration" : { - "table.exec.sink.keyed-shuffle" : "AUTO", - "table.exec.sink.not-null-enforcer" : "ERROR", - "table.exec.sink.rowtime-inserter" : "ENABLED", - "table.exec.sink.type-length-enforcer" : "IGNORE", - "table.exec.sink.upsert-materialize" : "AUTO" - }, - "dynamicTableSink" : { - "table" : { - "identifier" : "`default_catalog`.`default_database`.`MySink`", - "resolvedTable" : { - "schema" : { - "columns" : [ { - "name" : "a1", - "dataType" : "INT" - }, { - "name" : "b1", - "dataType" : "INT" - } ], - "watermarkSpecs" : [ ] - }, - "partitionKeys" : [ ], - "options" : { - "connector" : "values", - "sink-insert-only" : "false", - "table-sink-class" : "DEFAULT" - } - } - } - }, - "inputChangelogMode" : [ "INSERT", "UPDATE_BEFORE", "UPDATE_AFTER" ], - "inputUpsertKey" : [ 0 ], - "inputProperties" : [ { - "requiredDistribution" : { - "type" : "UNKNOWN" - }, - "damBehavior" : "PIPELINED", - "priority" : 0 - } ], - "outputType" : "ROW<`a1` INT, `b1` INT>", - "description" : "Sink(table=[default_catalog.default_database.MySink], fields=[a1, b1])" - } ], - "edges" : [ { - "source" : 1, - "target" : 2, - "shuffle" : { - "type" : "FORWARD" - }, - "shuffleMode" : "PIPELINED" - }, { - "source" : 2, - "target" : 3, - "shuffle" : { - "type" : "FORWARD" - }, - "shuffleMode" : "PIPELINED" - }, { - "source" : 3, - "target" : 4, - "shuffle" : { - "type" : "FORWARD" - }, - "shuffleMode" : "PIPELINED" - }, { - "source" : 5, - "target" : 6, - "shuffle" : { - "type" : "FORWARD" - }, - "shuffleMode" : "PIPELINED" - }, { - "source" : 6, - "target" : 7, - "shuffle" : { - "type" : "FORWARD" - }, - "shuffleMode" : "PIPELINED" - }, { - "source" : 7, - "target" : 8, - "shuffle" : { - "type" : "FORWARD" - }, - "shuffleMode" : "PIPELINED" - }, { - "source" : 4, - "target" : 9, - "shuffle" : { - "type" : "FORWARD" - }, - "shuffleMode" : "PIPELINED" - }, { - "source" : 8, - "target" : 9, - "shuffle" : { - "type" : "FORWARD" - }, - "shuffleMode" : "PIPELINED" - }, { - "source" : 9, - "target" : 10, - "shuffle" : { - "type" : "FORWARD" - }, - "shuffleMode" : "PIPELINED" - } ] -} diff --git a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/JoinJsonPlanTest_jsonplan/testInnerJoinWithPk.out b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/JoinJsonPlanTest_jsonplan/testInnerJoinWithPk.out deleted file mode 100644 index 5eb471f3a88..00000000000 --- a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/JoinJsonPlanTest_jsonplan/testInnerJoinWithPk.out +++ /dev/null @@ -1,450 +0,0 @@ -{ - "flinkVersion" : "", - "nodes" : [ { - "id" : 1, - "type" : "stream-exec-table-source-scan_1", - "scanTableSource" : { - "table" : { - "identifier" : "`default_catalog`.`default_database`.`A`", - "resolvedTable" : { - "schema" : { - "columns" : [ { - "name" : "a1", - "dataType" : "INT" - }, { - "name" : "a2", - "dataType" : "BIGINT" - }, { - "name" : "a3", - "dataType" : "BIGINT" - } ], - "watermarkSpecs" : [ ] - }, - "partitionKeys" : [ ], - "options" : { - "bounded" : "false", - "connector" : "values" - } - } - }, - "abilities" : [ { - "type" : "ProjectPushDown", - "projectedFields" : [ [ 0 ], [ 1 ] ], - "producedType" : "ROW<`a1` INT, `a2` BIGINT> NOT NULL" - }, { - "type" : "ReadingMetadata", - "metadataKeys" : [ ], - "producedType" : "ROW<`a1` INT, `a2` BIGINT> NOT NULL" - } ] - }, - "outputType" : "ROW<`a1` INT, `a2` BIGINT>", - "description" : "TableSourceScan(table=[[default_catalog, default_database, A, project=[a1, a2], metadata=[]]], fields=[a1, a2])", - "inputProperties" : [ ] - }, { - "id" : 2, - "type" : "stream-exec-exchange_1", - "inputProperties" : [ { - "requiredDistribution" : { - "type" : "HASH", - "keys" : [ 0 ] - }, - "damBehavior" : "PIPELINED", - "priority" : 0 - } ], - "outputType" : "ROW<`a1` INT, `a2` BIGINT>", - "description" : "Exchange(distribution=[hash[a1]])" - }, { - "id" : 3, - "type" : "stream-exec-group-aggregate_1", - "configuration" : { - "table.exec.mini-batch.enabled" : "false", - "table.exec.mini-batch.size" : "-1" - }, - "grouping" : [ 0 ], - "aggCalls" : [ { - "name" : "a2", - "internalName" : "$SUM$1", - "argList" : [ 1 ], - "filterArg" : -1, - "distinct" : false, - "approximate" : false, - "ignoreNulls" : false, - "type" : "BIGINT" - } ], - "aggCallNeedRetractions" : [ false ], - "generateUpdateBefore" : true, - "needRetraction" : false, - "state" : [ { - "index" : 0, - "ttl" : "0 ms", - "name" : "groupAggregateState" - } ], - "inputProperties" : [ { - "requiredDistribution" : { - "type" : "UNKNOWN" - }, - "damBehavior" : "PIPELINED", - "priority" : 0 - } ], - "outputType" : "ROW<`a1` INT, `a2` BIGINT>", - "description" : "GroupAggregate(groupBy=[a1], select=[a1, SUM(a2) AS a2])" - }, { - "id" : 4, - "type" : "stream-exec-calc_1", - "projection" : [ { - "kind" : "INPUT_REF", - "inputIndex" : 1, - "type" : "BIGINT" - }, { - "kind" : "INPUT_REF", - "inputIndex" : 0, - "type" : "INT" - } ], - "condition" : null, - "inputProperties" : [ { - "requiredDistribution" : { - "type" : "UNKNOWN" - }, - "damBehavior" : "PIPELINED", - "priority" : 0 - } ], - "outputType" : "ROW<`a2` BIGINT, `a1` INT>", - "description" : "Calc(select=[a2, a1])" - }, { - "id" : 5, - "type" : "stream-exec-exchange_1", - "inputProperties" : [ { - "requiredDistribution" : { - "type" : "HASH", - "keys" : [ 0 ] - }, - "damBehavior" : "PIPELINED", - "priority" : 0 - } ], - "outputType" : "ROW<`a2` BIGINT, `a1` INT>", - "description" : "Exchange(distribution=[hash[a2]])" - }, { - "id" : 6, - "type" : "stream-exec-table-source-scan_1", - "scanTableSource" : { - "table" : { - "identifier" : "`default_catalog`.`default_database`.`B`", - "resolvedTable" : { - "schema" : { - "columns" : [ { - "name" : "b1", - "dataType" : "INT" - }, { - "name" : "b2", - "dataType" : "BIGINT" - }, { - "name" : "b3", - "dataType" : "BIGINT" - } ], - "watermarkSpecs" : [ ] - }, - "partitionKeys" : [ ], - "options" : { - "bounded" : "false", - "connector" : "values" - } - } - }, - "abilities" : [ { - "type" : "ProjectPushDown", - "projectedFields" : [ [ 0 ], [ 1 ] ], - "producedType" : "ROW<`b1` INT, `b2` BIGINT> NOT NULL" - }, { - "type" : "ReadingMetadata", - "metadataKeys" : [ ], - "producedType" : "ROW<`b1` INT, `b2` BIGINT> NOT NULL" - } ] - }, - "outputType" : "ROW<`b1` INT, `b2` BIGINT>", - "description" : "TableSourceScan(table=[[default_catalog, default_database, B, project=[b1, b2], metadata=[]]], fields=[b1, b2])", - "inputProperties" : [ ] - }, { - "id" : 7, - "type" : "stream-exec-exchange_1", - "inputProperties" : [ { - "requiredDistribution" : { - "type" : "HASH", - "keys" : [ 0 ] - }, - "damBehavior" : "PIPELINED", - "priority" : 0 - } ], - "outputType" : "ROW<`b1` INT, `b2` BIGINT>", - "description" : "Exchange(distribution=[hash[b1]])" - }, { - "id" : 8, - "type" : "stream-exec-group-aggregate_1", - "configuration" : { - "table.exec.mini-batch.enabled" : "false", - "table.exec.mini-batch.size" : "-1" - }, - "grouping" : [ 0 ], - "aggCalls" : [ { - "name" : "b2", - "internalName" : "$SUM$1", - "argList" : [ 1 ], - "filterArg" : -1, - "distinct" : false, - "approximate" : false, - "ignoreNulls" : false, - "type" : "BIGINT" - } ], - "aggCallNeedRetractions" : [ false ], - "generateUpdateBefore" : true, - "needRetraction" : false, - "state" : [ { - "index" : 0, - "ttl" : "0 ms", - "name" : "groupAggregateState" - } ], - "inputProperties" : [ { - "requiredDistribution" : { - "type" : "UNKNOWN" - }, - "damBehavior" : "PIPELINED", - "priority" : 0 - } ], - "outputType" : "ROW<`b1` INT, `b2` BIGINT>", - "description" : "GroupAggregate(groupBy=[b1], select=[b1, SUM(b2) AS b2])" - }, { - "id" : 9, - "type" : "stream-exec-calc_1", - "projection" : [ { - "kind" : "INPUT_REF", - "inputIndex" : 1, - "type" : "BIGINT" - }, { - "kind" : "INPUT_REF", - "inputIndex" : 0, - "type" : "INT" - } ], - "condition" : null, - "inputProperties" : [ { - "requiredDistribution" : { - "type" : "UNKNOWN" - }, - "damBehavior" : "PIPELINED", - "priority" : 0 - } ], - "outputType" : "ROW<`b2` BIGINT, `b1` INT>", - "description" : "Calc(select=[b2, b1])" - }, { - "id" : 10, - "type" : "stream-exec-exchange_1", - "inputProperties" : [ { - "requiredDistribution" : { - "type" : "HASH", - "keys" : [ 0 ] - }, - "damBehavior" : "PIPELINED", - "priority" : 0 - } ], - "outputType" : "ROW<`b2` BIGINT, `b1` INT>", - "description" : "Exchange(distribution=[hash[b2]])" - }, { - "id" : 11, - "type" : "stream-exec-join_1", - "joinSpec" : { - "joinType" : "INNER", - "leftKeys" : [ 0 ], - "rightKeys" : [ 0 ], - "filterNulls" : [ true ], - "nonEquiCondition" : null - }, - "leftUpsertKeys" : [ [ 1 ] ], - "rightUpsertKeys" : [ [ 1 ] ], - "state" : [ { - "index" : 0, - "ttl" : "0 ms", - "name" : "leftState" - }, { - "index" : 1, - "ttl" : "0 ms", - "name" : "rightState" - } ], - "inputProperties" : [ { - "requiredDistribution" : { - "type" : "UNKNOWN" - }, - "damBehavior" : "PIPELINED", - "priority" : 0 - }, { - "requiredDistribution" : { - "type" : "UNKNOWN" - }, - "damBehavior" : "PIPELINED", - "priority" : 0 - } ], - "outputType" : "ROW<`a2` BIGINT, `a1` INT, `b2` BIGINT, `b1` INT>", - "description" : "Join(joinType=[InnerJoin], where=[(a2 = b2)], select=[a2, a1, b2, b1], leftInputSpec=[HasUniqueKey], rightInputSpec=[HasUniqueKey])" - }, { - "id" : 12, - "type" : "stream-exec-calc_1", - "projection" : [ { - "kind" : "INPUT_REF", - "inputIndex" : 1, - "type" : "INT" - }, { - "kind" : "INPUT_REF", - "inputIndex" : 0, - "type" : "BIGINT" - }, { - "kind" : "INPUT_REF", - "inputIndex" : 3, - "type" : "INT" - }, { - "kind" : "INPUT_REF", - "inputIndex" : 2, - "type" : "BIGINT" - } ], - "condition" : null, - "inputProperties" : [ { - "requiredDistribution" : { - "type" : "UNKNOWN" - }, - "damBehavior" : "PIPELINED", - "priority" : 0 - } ], - "outputType" : "ROW<`a1` INT, `a2` BIGINT, `b1` INT, `b2` BIGINT>", - "description" : "Calc(select=[a1, a2, b1, b2])" - }, { - "id" : 13, - "type" : "stream-exec-sink_1", - "configuration" : { - "table.exec.sink.keyed-shuffle" : "AUTO", - "table.exec.sink.not-null-enforcer" : "ERROR", - "table.exec.sink.rowtime-inserter" : "ENABLED", - "table.exec.sink.type-length-enforcer" : "IGNORE", - "table.exec.sink.upsert-materialize" : "AUTO" - }, - "dynamicTableSink" : { - "table" : { - "identifier" : "`default_catalog`.`default_database`.`MySink`", - "resolvedTable" : { - "schema" : { - "columns" : [ { - "name" : "a1", - "dataType" : "INT" - }, { - "name" : "a2", - "dataType" : "BIGINT" - }, { - "name" : "b1", - "dataType" : "INT" - }, { - "name" : "b2", - "dataType" : "BIGINT" - } ], - "watermarkSpecs" : [ ] - }, - "partitionKeys" : [ ], - "options" : { - "connector" : "values", - "sink-insert-only" : "false", - "table-sink-class" : "DEFAULT" - } - } - } - }, - "inputChangelogMode" : [ "INSERT", "UPDATE_BEFORE", "UPDATE_AFTER" ], - "inputProperties" : [ { - "requiredDistribution" : { - "type" : "UNKNOWN" - }, - "damBehavior" : "PIPELINED", - "priority" : 0 - } ], - "outputType" : "ROW<`a1` INT, `a2` BIGINT, `b1` INT, `b2` BIGINT>", - "description" : "Sink(table=[default_catalog.default_database.MySink], fields=[a1, a2, b1, b2])" - } ], - "edges" : [ { - "source" : 1, - "target" : 2, - "shuffle" : { - "type" : "FORWARD" - }, - "shuffleMode" : "PIPELINED" - }, { - "source" : 2, - "target" : 3, - "shuffle" : { - "type" : "FORWARD" - }, - "shuffleMode" : "PIPELINED" - }, { - "source" : 3, - "target" : 4, - "shuffle" : { - "type" : "FORWARD" - }, - "shuffleMode" : "PIPELINED" - }, { - "source" : 4, - "target" : 5, - "shuffle" : { - "type" : "FORWARD" - }, - "shuffleMode" : "PIPELINED" - }, { - "source" : 6, - "target" : 7, - "shuffle" : { - "type" : "FORWARD" - }, - "shuffleMode" : "PIPELINED" - }, { - "source" : 7, - "target" : 8, - "shuffle" : { - "type" : "FORWARD" - }, - "shuffleMode" : "PIPELINED" - }, { - "source" : 8, - "target" : 9, - "shuffle" : { - "type" : "FORWARD" - }, - "shuffleMode" : "PIPELINED" - }, { - "source" : 9, - "target" : 10, - "shuffle" : { - "type" : "FORWARD" - }, - "shuffleMode" : "PIPELINED" - }, { - "source" : 5, - "target" : 11, - "shuffle" : { - "type" : "FORWARD" - }, - "shuffleMode" : "PIPELINED" - }, { - "source" : 10, - "target" : 11, - "shuffle" : { - "type" : "FORWARD" - }, - "shuffleMode" : "PIPELINED" - }, { - "source" : 11, - "target" : 12, - "shuffle" : { - "type" : "FORWARD" - }, - "shuffleMode" : "PIPELINED" - }, { - "source" : 12, - "target" : 13, - "shuffle" : { - "type" : "FORWARD" - }, - "shuffleMode" : "PIPELINED" - } ] -} diff --git a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/JoinJsonPlanTest_jsonplan/testLeftJoinNonEqui.out b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/JoinJsonPlanTest_jsonplan/testLeftJoinNonEqui.out deleted file mode 100644 index da25c9e64b0..00000000000 --- a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/JoinJsonPlanTest_jsonplan/testLeftJoinNonEqui.out +++ /dev/null @@ -1,266 +0,0 @@ -{ - "flinkVersion" : "", - "nodes" : [ { - "id" : 1, - "type" : "stream-exec-table-source-scan_1", - "scanTableSource" : { - "table" : { - "identifier" : "`default_catalog`.`default_database`.`A`", - "resolvedTable" : { - "schema" : { - "columns" : [ { - "name" : "a1", - "dataType" : "INT" - }, { - "name" : "a2", - "dataType" : "BIGINT" - }, { - "name" : "a3", - "dataType" : "BIGINT" - } ], - "watermarkSpecs" : [ ] - }, - "partitionKeys" : [ ], - "options" : { - "bounded" : "false", - "connector" : "values" - } - } - }, - "abilities" : [ { - "type" : "ProjectPushDown", - "projectedFields" : [ [ 0 ], [ 1 ] ], - "producedType" : "ROW<`a1` INT, `a2` BIGINT> NOT NULL" - }, { - "type" : "ReadingMetadata", - "metadataKeys" : [ ], - "producedType" : "ROW<`a1` INT, `a2` BIGINT> NOT NULL" - } ] - }, - "outputType" : "ROW<`a1` INT, `a2` BIGINT>", - "description" : "TableSourceScan(table=[[default_catalog, default_database, A, project=[a1, a2], metadata=[]]], fields=[a1, a2])", - "inputProperties" : [ ] - }, { - "id" : 2, - "type" : "stream-exec-exchange_1", - "inputProperties" : [ { - "requiredDistribution" : { - "type" : "HASH", - "keys" : [ 0 ] - }, - "damBehavior" : "PIPELINED", - "priority" : 0 - } ], - "outputType" : "ROW<`a1` INT, `a2` BIGINT>", - "description" : "Exchange(distribution=[hash[a1]])" - }, { - "id" : 3, - "type" : "stream-exec-table-source-scan_1", - "scanTableSource" : { - "table" : { - "identifier" : "`default_catalog`.`default_database`.`B`", - "resolvedTable" : { - "schema" : { - "columns" : [ { - "name" : "b1", - "dataType" : "INT" - }, { - "name" : "b2", - "dataType" : "BIGINT" - }, { - "name" : "b3", - "dataType" : "BIGINT" - } ], - "watermarkSpecs" : [ ] - }, - "partitionKeys" : [ ], - "options" : { - "bounded" : "false", - "connector" : "values" - } - } - }, - "abilities" : [ { - "type" : "ProjectPushDown", - "projectedFields" : [ [ 0 ], [ 1 ] ], - "producedType" : "ROW<`b1` INT, `b2` BIGINT> NOT NULL" - }, { - "type" : "ReadingMetadata", - "metadataKeys" : [ ], - "producedType" : "ROW<`b1` INT, `b2` BIGINT> NOT NULL" - } ] - }, - "outputType" : "ROW<`b1` INT, `b2` BIGINT>", - "description" : "TableSourceScan(table=[[default_catalog, default_database, B, project=[b1, b2], metadata=[]]], fields=[b1, b2])", - "inputProperties" : [ ] - }, { - "id" : 4, - "type" : "stream-exec-exchange_1", - "inputProperties" : [ { - "requiredDistribution" : { - "type" : "HASH", - "keys" : [ 0 ] - }, - "damBehavior" : "PIPELINED", - "priority" : 0 - } ], - "outputType" : "ROW<`b1` INT, `b2` BIGINT>", - "description" : "Exchange(distribution=[hash[b1]])" - }, { - "id" : 5, - "type" : "stream-exec-join_1", - "joinSpec" : { - "joinType" : "LEFT", - "leftKeys" : [ 0 ], - "rightKeys" : [ 0 ], - "filterNulls" : [ true ], - "nonEquiCondition" : { - "kind" : "CALL", - "syntax" : "BINARY", - "internalName" : "$>$1", - "operands" : [ { - "kind" : "INPUT_REF", - "inputIndex" : 1, - "type" : "BIGINT" - }, { - "kind" : "INPUT_REF", - "inputIndex" : 3, - "type" : "BIGINT" - } ], - "type" : "BOOLEAN" - } - }, - "state" : [ { - "index" : 0, - "ttl" : "0 ms", - "name" : "leftState" - }, { - "index" : 1, - "ttl" : "0 ms", - "name" : "rightState" - } ], - "inputProperties" : [ { - "requiredDistribution" : { - "type" : "UNKNOWN" - }, - "damBehavior" : "PIPELINED", - "priority" : 0 - }, { - "requiredDistribution" : { - "type" : "UNKNOWN" - }, - "damBehavior" : "PIPELINED", - "priority" : 0 - } ], - "outputType" : "ROW<`a1` INT, `a2` BIGINT, `b1` INT, `b2` BIGINT>", - "description" : "Join(joinType=[LeftOuterJoin], where=[((a1 = b1) AND (a2 > b2))], select=[a1, a2, b1, b2], leftInputSpec=[NoUniqueKey], rightInputSpec=[NoUniqueKey])" - }, { - "id" : 6, - "type" : "stream-exec-calc_1", - "projection" : [ { - "kind" : "INPUT_REF", - "inputIndex" : 0, - "type" : "INT" - }, { - "kind" : "INPUT_REF", - "inputIndex" : 2, - "type" : "INT" - } ], - "condition" : null, - "inputProperties" : [ { - "requiredDistribution" : { - "type" : "UNKNOWN" - }, - "damBehavior" : "PIPELINED", - "priority" : 0 - } ], - "outputType" : "ROW<`a1` INT, `b1` INT>", - "description" : "Calc(select=[a1, b1])" - }, { - "id" : 7, - "type" : "stream-exec-sink_1", - "configuration" : { - "table.exec.sink.keyed-shuffle" : "AUTO", - "table.exec.sink.not-null-enforcer" : "ERROR", - "table.exec.sink.rowtime-inserter" : "ENABLED", - "table.exec.sink.type-length-enforcer" : "IGNORE", - "table.exec.sink.upsert-materialize" : "AUTO" - }, - "dynamicTableSink" : { - "table" : { - "identifier" : "`default_catalog`.`default_database`.`MySink`", - "resolvedTable" : { - "schema" : { - "columns" : [ { - "name" : "a1", - "dataType" : "INT" - }, { - "name" : "b1", - "dataType" : "INT" - } ], - "watermarkSpecs" : [ ] - }, - "partitionKeys" : [ ], - "options" : { - "connector" : "values", - "sink-insert-only" : "false", - "table-sink-class" : "DEFAULT" - } - } - } - }, - "inputChangelogMode" : [ "INSERT", "UPDATE_BEFORE", "UPDATE_AFTER", "DELETE" ], - "inputProperties" : [ { - "requiredDistribution" : { - "type" : "UNKNOWN" - }, - "damBehavior" : "PIPELINED", - "priority" : 0 - } ], - "outputType" : "ROW<`a1` INT, `b1` INT>", - "description" : "Sink(table=[default_catalog.default_database.MySink], fields=[a1, b1])" - } ], - "edges" : [ { - "source" : 1, - "target" : 2, - "shuffle" : { - "type" : "FORWARD" - }, - "shuffleMode" : "PIPELINED" - }, { - "source" : 3, - "target" : 4, - "shuffle" : { - "type" : "FORWARD" - }, - "shuffleMode" : "PIPELINED" - }, { - "source" : 2, - "target" : 5, - "shuffle" : { - "type" : "FORWARD" - }, - "shuffleMode" : "PIPELINED" - }, { - "source" : 4, - "target" : 5, - "shuffle" : { - "type" : "FORWARD" - }, - "shuffleMode" : "PIPELINED" - }, { - "source" : 5, - "target" : 6, - "shuffle" : { - "type" : "FORWARD" - }, - "shuffleMode" : "PIPELINED" - }, { - "source" : 6, - "target" : 7, - "shuffle" : { - "type" : "FORWARD" - }, - "shuffleMode" : "PIPELINED" - } ] -}