This is an automated email from the ASF dual-hosted git repository. dwysakowicz pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit 413aed084974efe708833b3e1cfeb7a3f5ce544c Author: bvarghese1 <bvargh...@confluent.io> AuthorDate: Wed Jan 3 17:25:34 2024 -0800 [FLINK-33979] Remove TableSink Json Plan & Json IT tests - These are covered by the new restore tests --- .../nodes/exec/stream/TableSinkJsonPlanTest.java | 149 -------------------- .../stream/jsonplan/TableSinkJsonPlanITCase.java | 75 ---------- ...WithNonDeterministicFuncSinkWithDifferentPk.out | 153 --------------------- .../testOverwrite.out | 93 ------------- .../testPartialInsert.out | 150 -------------------- .../testPartitioning.out | 137 ------------------ .../testWritingMetadata.out | 95 ------------- 7 files changed, 852 deletions(-) diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/TableSinkJsonPlanTest.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/TableSinkJsonPlanTest.java deleted file mode 100644 index 3c321b5c59a..00000000000 --- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/TableSinkJsonPlanTest.java +++ /dev/null @@ -1,149 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.table.planner.plan.nodes.exec.stream; - -import org.apache.flink.table.api.TableConfig; -import org.apache.flink.table.api.TableEnvironment; -import org.apache.flink.table.planner.runtime.utils.JavaUserDefinedScalarFunctions; -import org.apache.flink.table.planner.utils.StreamTableTestUtil; -import org.apache.flink.table.planner.utils.TableTestBase; - -import org.junit.jupiter.api.BeforeEach; -import org.junit.jupiter.api.Test; - -/** Test json serialization/deserialization for table sink. */ -class TableSinkJsonPlanTest extends TableTestBase { - - private StreamTableTestUtil util; - private TableEnvironment tEnv; - - @BeforeEach - void setup() { - util = streamTestUtil(TableConfig.getDefault()); - tEnv = util.getTableEnv(); - - String srcTableDdl = - "CREATE TABLE MyTable (\n" - + " a bigint,\n" - + " b int,\n" - + " c varchar\n" - + ") with (\n" - + " 'connector' = 'values',\n" - + " 'bounded' = 'false')"; - tEnv.executeSql(srcTableDdl); - } - - @Test - void testOverwrite() { - String sinkTableDdl = - "CREATE TABLE MySink (\n" - + " a bigint,\n" - + " b int,\n" - + " c varchar\n" - + ") with (\n" - + " 'connector' = 'filesystem',\n" - + " 'format' = 'testcsv',\n" - + " 'path' = '/tmp')"; - tEnv.executeSql(sinkTableDdl); - util.verifyJsonPlan("insert overwrite MySink select * from MyTable"); - } - - @Test - void testPartitioning() { - String sinkTableDdl = - "CREATE TABLE MySink (\n" - + " a bigint,\n" - + " b int,\n" - + " c varchar\n" - + ") partitioned by (c) with (\n" - + " 'connector' = 'filesystem',\n" - + " 'format' = 'testcsv',\n" - + " 'path' = '/tmp')"; - tEnv.executeSql(sinkTableDdl); - util.verifyJsonPlan("insert into MySink partition (c='A') select a, b from MyTable"); - } - - @Test - void testWritingMetadata() { - String sinkTableDdl = - "CREATE TABLE MySink (\n" - + " a bigint,\n" - + " b int,\n" - + " m varchar METADATA\n" - + ") with (\n" - + " 'connector' = 'values',\n" - + " 'writable-metadata' = 'm:STRING')"; - tEnv.executeSql(sinkTableDdl); - util.verifyJsonPlan("insert into MySink select * from MyTable"); - } - - @Test - void testCdcWithNonDeterministicFuncSinkWithDifferentPk() { - tEnv.createTemporaryFunction( - "ndFunc", new JavaUserDefinedScalarFunctions.NonDeterministicUdf()); - - String cdcDdl = - "CREATE TABLE users (\n" - + " user_id STRING,\n" - + " user_name STRING,\n" - + " email STRING,\n" - + " balance DECIMAL(18,2),\n" - + " primary key (user_id) not enforced\n" - + ") WITH (\n" - + " 'connector' = 'values',\n" - + " 'changelog-mode' = 'I,UA,UB,D'\n" - + ")"; - - String sinkTableDdl = - "CREATE TABLE sink (\n" - + " user_id STRING,\n" - + " user_name STRING,\n" - + " email STRING,\n" - + " balance DECIMAL(18,2),\n" - + " PRIMARY KEY(email) NOT ENFORCED\n" - + ") WITH(\n" - + " 'connector' = 'values',\n" - + " 'sink-insert-only' = 'false'\n" - + ")"; - tEnv.executeSql(cdcDdl); - tEnv.executeSql(sinkTableDdl); - - util.verifyJsonPlan( - "insert into sink select user_id, ndFunc(user_name), email, balance from users"); - } - - @Test - void testPartialInsert() { - String sinkTableDdl = - "CREATE TABLE MySink (\n" - + " a bigint,\n" - + " b int,\n" - + " c varchar,\n" - + " d int,\n" - + " e double,\n" - + " f varchar\n" - + ") partitioned by (c) with (\n" - + " 'connector' = 'filesystem',\n" - + " 'format' = 'testcsv',\n" - + " 'path' = '/tmp')"; - tEnv.executeSql(sinkTableDdl); - util.verifyJsonPlan( - "insert into MySink partition (c='A') (f,a,b) select c, a, b from MyTable"); - } -} diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/jsonplan/TableSinkJsonPlanITCase.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/jsonplan/TableSinkJsonPlanITCase.java deleted file mode 100644 index 0ba61358fbf..00000000000 --- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/jsonplan/TableSinkJsonPlanITCase.java +++ /dev/null @@ -1,75 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.table.planner.runtime.stream.jsonplan; - -import org.apache.flink.table.planner.factories.TestValuesTableFactory; -import org.apache.flink.table.planner.utils.JsonPlanTestBase; - -import org.junit.jupiter.api.BeforeEach; -import org.junit.jupiter.api.Test; - -import java.io.File; -import java.util.Arrays; -import java.util.HashMap; -import java.util.List; - -/** Test for table sink json plan. */ -class TableSinkJsonPlanITCase extends JsonPlanTestBase { - - List<String> data = Arrays.asList("1,1,hi", "2,1,hello", "3,2,hello world"); - - @BeforeEach - @Override - protected void setup() throws Exception { - super.setup(); - createTestCsvSourceTable("MyTable", data, "a bigint", "b int", "c varchar"); - } - - @Test - void testPartitioning() throws Exception { - File sinkPath = - createTestCsvSinkTable( - "MySink", - new String[] {"a bigint", "p int not null", "b int", "c varchar"}, - "b"); - - compileSqlAndExecutePlan("insert into MySink partition (b=3) select * from MyTable") - .await(); - - assertResult(data, sinkPath); - } - - @Test - void testWritingMetadata() throws Exception { - createTestValuesSinkTable( - "MySink", - new String[] {"a bigint", "b int", "c varchar METADATA"}, - new HashMap<String, String>() { - { - put("writable-metadata", "c:STRING"); - } - }); - - compileSqlAndExecutePlan("insert into MySink select * from MyTable").await(); - - List<String> result = TestValuesTableFactory.getResultsAsStrings("MySink"); - assertResult( - Arrays.asList("+I[1, 1, hi]", "+I[2, 1, hello]", "+I[3, 2, hello world]"), result); - } -} diff --git a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/TableSinkJsonPlanTest_jsonplan/testCdcWithNonDeterministicFuncSinkWithDifferentPk.out b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/TableSinkJsonPlanTest_jsonplan/testCdcWithNonDeterministicFuncSinkWithDifferentPk.out deleted file mode 100644 index 1c25e479325..00000000000 --- a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/TableSinkJsonPlanTest_jsonplan/testCdcWithNonDeterministicFuncSinkWithDifferentPk.out +++ /dev/null @@ -1,153 +0,0 @@ -{ - "flinkVersion" : "", - "nodes" : [ { - "id" : 1, - "type" : "stream-exec-table-source-scan_1", - "scanTableSource" : { - "table" : { - "identifier" : "`default_catalog`.`default_database`.`users`", - "resolvedTable" : { - "schema" : { - "columns" : [ { - "name" : "user_id", - "dataType" : "VARCHAR(2147483647) NOT NULL" - }, { - "name" : "user_name", - "dataType" : "VARCHAR(2147483647)" - }, { - "name" : "email", - "dataType" : "VARCHAR(2147483647)" - }, { - "name" : "balance", - "dataType" : "DECIMAL(18, 2)" - } ], - "watermarkSpecs" : [ ], - "primaryKey" : { - "name" : "PK_user_id", - "type" : "PRIMARY_KEY", - "columns" : [ "user_id" ] - } - }, - "partitionKeys" : [ ], - "options" : { - "changelog-mode" : "I,UA,UB,D", - "connector" : "values" - } - } - } - }, - "outputType" : "ROW<`user_id` VARCHAR(2147483647) NOT NULL, `user_name` VARCHAR(2147483647), `email` VARCHAR(2147483647), `balance` DECIMAL(18, 2)>", - "description" : "TableSourceScan(table=[[default_catalog, default_database, users]], fields=[user_id, user_name, email, balance])", - "inputProperties" : [ ] - }, { - "id" : 2, - "type" : "stream-exec-calc_1", - "projection" : [ { - "kind" : "INPUT_REF", - "inputIndex" : 0, - "type" : "VARCHAR(2147483647) NOT NULL" - }, { - "kind" : "CALL", - "catalogName" : "`default_catalog`.`default_database`.`ndFunc`", - "operands" : [ { - "kind" : "INPUT_REF", - "inputIndex" : 1, - "type" : "VARCHAR(2147483647)" - } ], - "type" : "VARCHAR(2147483647)" - }, { - "kind" : "INPUT_REF", - "inputIndex" : 2, - "type" : "VARCHAR(2147483647)" - }, { - "kind" : "INPUT_REF", - "inputIndex" : 3, - "type" : "DECIMAL(18, 2)" - } ], - "condition" : null, - "inputProperties" : [ { - "requiredDistribution" : { - "type" : "UNKNOWN" - }, - "damBehavior" : "PIPELINED", - "priority" : 0 - } ], - "outputType" : "ROW<`user_id` VARCHAR(2147483647) NOT NULL, `EXPR$1` VARCHAR(2147483647), `email` VARCHAR(2147483647), `balance` DECIMAL(18, 2)>", - "description" : "Calc(select=[user_id, ndFunc(user_name) AS EXPR$1, email, balance])" - }, { - "id" : 3, - "type" : "stream-exec-sink_1", - "configuration" : { - "table.exec.sink.keyed-shuffle" : "AUTO", - "table.exec.sink.not-null-enforcer" : "ERROR", - "table.exec.sink.rowtime-inserter" : "ENABLED", - "table.exec.sink.type-length-enforcer" : "IGNORE", - "table.exec.sink.upsert-materialize" : "AUTO" - }, - "dynamicTableSink" : { - "table" : { - "identifier" : "`default_catalog`.`default_database`.`sink`", - "resolvedTable" : { - "schema" : { - "columns" : [ { - "name" : "user_id", - "dataType" : "VARCHAR(2147483647)" - }, { - "name" : "user_name", - "dataType" : "VARCHAR(2147483647)" - }, { - "name" : "email", - "dataType" : "VARCHAR(2147483647) NOT NULL" - }, { - "name" : "balance", - "dataType" : "DECIMAL(18, 2)" - } ], - "watermarkSpecs" : [ ], - "primaryKey" : { - "name" : "PK_email", - "type" : "PRIMARY_KEY", - "columns" : [ "email" ] - } - }, - "partitionKeys" : [ ], - "options" : { - "connector" : "values", - "sink-insert-only" : "false" - } - } - } - }, - "inputChangelogMode" : [ "INSERT", "UPDATE_BEFORE", "UPDATE_AFTER", "DELETE" ], - "requireUpsertMaterialize" : true, - "state" : [ { - "index" : 0, - "ttl" : "0 ms", - "name" : "sinkMaterializeState" - } ], - "inputUpsertKey" : [ 0 ], - "inputProperties" : [ { - "requiredDistribution" : { - "type" : "UNKNOWN" - }, - "damBehavior" : "PIPELINED", - "priority" : 0 - } ], - "outputType" : "ROW<`user_id` VARCHAR(2147483647) NOT NULL, `EXPR$1` VARCHAR(2147483647), `email` VARCHAR(2147483647), `balance` DECIMAL(18, 2)>", - "description" : "Sink(table=[default_catalog.default_database.sink], fields=[user_id, EXPR$1, email, balance], upsertMaterialize=[true])" - } ], - "edges" : [ { - "source" : 1, - "target" : 2, - "shuffle" : { - "type" : "FORWARD" - }, - "shuffleMode" : "PIPELINED" - }, { - "source" : 2, - "target" : 3, - "shuffle" : { - "type" : "FORWARD" - }, - "shuffleMode" : "PIPELINED" - } ] -} diff --git a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/TableSinkJsonPlanTest_jsonplan/testOverwrite.out b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/TableSinkJsonPlanTest_jsonplan/testOverwrite.out deleted file mode 100644 index 18fc2687ff3..00000000000 --- a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/TableSinkJsonPlanTest_jsonplan/testOverwrite.out +++ /dev/null @@ -1,93 +0,0 @@ -{ - "flinkVersion" : "", - "nodes" : [ { - "id" : 1, - "type" : "stream-exec-table-source-scan_1", - "scanTableSource" : { - "table" : { - "identifier" : "`default_catalog`.`default_database`.`MyTable`", - "resolvedTable" : { - "schema" : { - "columns" : [ { - "name" : "a", - "dataType" : "BIGINT" - }, { - "name" : "b", - "dataType" : "INT" - }, { - "name" : "c", - "dataType" : "VARCHAR(2147483647)" - } ], - "watermarkSpecs" : [ ] - }, - "partitionKeys" : [ ], - "options" : { - "bounded" : "false", - "connector" : "values" - } - } - } - }, - "outputType" : "ROW<`a` BIGINT, `b` INT, `c` VARCHAR(2147483647)>", - "description" : "TableSourceScan(table=[[default_catalog, default_database, MyTable]], fields=[a, b, c])", - "inputProperties" : [ ] - }, { - "id" : 2, - "type" : "stream-exec-sink_1", - "configuration" : { - "table.exec.sink.keyed-shuffle" : "AUTO", - "table.exec.sink.not-null-enforcer" : "ERROR", - "table.exec.sink.rowtime-inserter" : "ENABLED", - "table.exec.sink.type-length-enforcer" : "IGNORE", - "table.exec.sink.upsert-materialize" : "AUTO" - }, - "dynamicTableSink" : { - "table" : { - "identifier" : "`default_catalog`.`default_database`.`MySink`", - "resolvedTable" : { - "schema" : { - "columns" : [ { - "name" : "a", - "dataType" : "BIGINT" - }, { - "name" : "b", - "dataType" : "INT" - }, { - "name" : "c", - "dataType" : "VARCHAR(2147483647)" - } ], - "watermarkSpecs" : [ ] - }, - "partitionKeys" : [ ], - "options" : { - "connector" : "filesystem", - "format" : "testcsv", - "path" : "/tmp" - } - } - }, - "abilities" : [ { - "type" : "Overwrite", - "overwrite" : true - } ] - }, - "inputChangelogMode" : [ "INSERT" ], - "inputProperties" : [ { - "requiredDistribution" : { - "type" : "UNKNOWN" - }, - "damBehavior" : "PIPELINED", - "priority" : 0 - } ], - "outputType" : "ROW<`a` BIGINT, `b` INT, `c` VARCHAR(2147483647)>", - "description" : "Sink(table=[default_catalog.default_database.MySink], fields=[a, b, c])" - } ], - "edges" : [ { - "source" : 1, - "target" : 2, - "shuffle" : { - "type" : "FORWARD" - }, - "shuffleMode" : "PIPELINED" - } ] -} diff --git a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/TableSinkJsonPlanTest_jsonplan/testPartialInsert.out b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/TableSinkJsonPlanTest_jsonplan/testPartialInsert.out deleted file mode 100644 index db7fba66429..00000000000 --- a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/TableSinkJsonPlanTest_jsonplan/testPartialInsert.out +++ /dev/null @@ -1,150 +0,0 @@ -{ - "flinkVersion" : "", - "nodes" : [ { - "id" : 1, - "type" : "stream-exec-table-source-scan_1", - "scanTableSource" : { - "table" : { - "identifier" : "`default_catalog`.`default_database`.`MyTable`", - "resolvedTable" : { - "schema" : { - "columns" : [ { - "name" : "a", - "dataType" : "BIGINT" - }, { - "name" : "b", - "dataType" : "INT" - }, { - "name" : "c", - "dataType" : "VARCHAR(2147483647)" - } ], - "watermarkSpecs" : [ ] - }, - "partitionKeys" : [ ], - "options" : { - "bounded" : "false", - "connector" : "values" - } - } - } - }, - "outputType" : "ROW<`a` BIGINT, `b` INT, `c` VARCHAR(2147483647)>", - "description" : "TableSourceScan(table=[[default_catalog, default_database, MyTable]], fields=[a, b, c])", - "inputProperties" : [ ] - }, { - "id" : 2, - "type" : "stream-exec-calc_1", - "projection" : [ { - "kind" : "INPUT_REF", - "inputIndex" : 0, - "type" : "BIGINT" - }, { - "kind" : "INPUT_REF", - "inputIndex" : 1, - "type" : "INT" - }, { - "kind" : "LITERAL", - "value" : "A", - "type" : "VARCHAR(2147483647) NOT NULL" - }, { - "kind" : "LITERAL", - "value" : null, - "type" : "INT" - }, { - "kind" : "LITERAL", - "value" : null, - "type" : "DOUBLE" - }, { - "kind" : "INPUT_REF", - "inputIndex" : 2, - "type" : "VARCHAR(2147483647)" - } ], - "condition" : null, - "inputProperties" : [ { - "requiredDistribution" : { - "type" : "UNKNOWN" - }, - "damBehavior" : "PIPELINED", - "priority" : 0 - } ], - "outputType" : "ROW<`a` BIGINT, `b` INT, `EXPR$2` VARCHAR(2147483647) NOT NULL, `EXPR$3` INT, `EXPR$4` DOUBLE, `c` VARCHAR(2147483647)>", - "description" : "Calc(select=[a, b, 'A' AS EXPR$2, null:INTEGER AS EXPR$3, null:DOUBLE AS EXPR$4, c])" - }, { - "id" : 3, - "type" : "stream-exec-sink_1", - "configuration" : { - "table.exec.sink.keyed-shuffle" : "AUTO", - "table.exec.sink.not-null-enforcer" : "ERROR", - "table.exec.sink.rowtime-inserter" : "ENABLED", - "table.exec.sink.type-length-enforcer" : "IGNORE", - "table.exec.sink.upsert-materialize" : "AUTO" - }, - "dynamicTableSink" : { - "table" : { - "identifier" : "`default_catalog`.`default_database`.`MySink`", - "resolvedTable" : { - "schema" : { - "columns" : [ { - "name" : "a", - "dataType" : "BIGINT" - }, { - "name" : "b", - "dataType" : "INT" - }, { - "name" : "c", - "dataType" : "VARCHAR(2147483647)" - }, { - "name" : "d", - "dataType" : "INT" - }, { - "name" : "e", - "dataType" : "DOUBLE" - }, { - "name" : "f", - "dataType" : "VARCHAR(2147483647)" - } ], - "watermarkSpecs" : [ ] - }, - "partitionKeys" : [ "c" ], - "options" : { - "connector" : "filesystem", - "format" : "testcsv", - "path" : "/tmp" - } - } - }, - "abilities" : [ { - "type" : "Partitioning", - "partition" : { - "c" : "A" - } - } ], - "targetColumns" : [ [ 5 ], [ 0 ], [ 1 ] ] - }, - "inputChangelogMode" : [ "INSERT" ], - "inputProperties" : [ { - "requiredDistribution" : { - "type" : "UNKNOWN" - }, - "damBehavior" : "PIPELINED", - "priority" : 0 - } ], - "outputType" : "ROW<`a` BIGINT, `b` INT, `EXPR$2` VARCHAR(2147483647) NOT NULL, `EXPR$3` INT, `EXPR$4` DOUBLE, `c` VARCHAR(2147483647)>", - "description" : "Sink(table=[default_catalog.default_database.MySink], targetColumns=[[5],[0],[1]], fields=[a, b, EXPR$2, EXPR$3, EXPR$4, c])" - } ], - "edges" : [ { - "source" : 1, - "target" : 2, - "shuffle" : { - "type" : "FORWARD" - }, - "shuffleMode" : "PIPELINED" - }, { - "source" : 2, - "target" : 3, - "shuffle" : { - "type" : "FORWARD" - }, - "shuffleMode" : "PIPELINED" - } ] -} diff --git a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/TableSinkJsonPlanTest_jsonplan/testPartitioning.out b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/TableSinkJsonPlanTest_jsonplan/testPartitioning.out deleted file mode 100644 index d4c1415e67a..00000000000 --- a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/TableSinkJsonPlanTest_jsonplan/testPartitioning.out +++ /dev/null @@ -1,137 +0,0 @@ -{ - "flinkVersion" : "", - "nodes" : [ { - "id" : 1, - "type" : "stream-exec-table-source-scan_1", - "scanTableSource" : { - "table" : { - "identifier" : "`default_catalog`.`default_database`.`MyTable`", - "resolvedTable" : { - "schema" : { - "columns" : [ { - "name" : "a", - "dataType" : "BIGINT" - }, { - "name" : "b", - "dataType" : "INT" - }, { - "name" : "c", - "dataType" : "VARCHAR(2147483647)" - } ], - "watermarkSpecs" : [ ] - }, - "partitionKeys" : [ ], - "options" : { - "bounded" : "false", - "connector" : "values" - } - } - }, - "abilities" : [ { - "type" : "ProjectPushDown", - "projectedFields" : [ [ 0 ], [ 1 ] ], - "producedType" : "ROW<`a` BIGINT, `b` INT> NOT NULL" - }, { - "type" : "ReadingMetadata", - "metadataKeys" : [ ], - "producedType" : "ROW<`a` BIGINT, `b` INT> NOT NULL" - } ] - }, - "outputType" : "ROW<`a` BIGINT, `b` INT>", - "description" : "TableSourceScan(table=[[default_catalog, default_database, MyTable, project=[a, b], metadata=[]]], fields=[a, b])", - "inputProperties" : [ ] - }, { - "id" : 2, - "type" : "stream-exec-calc_1", - "projection" : [ { - "kind" : "INPUT_REF", - "inputIndex" : 0, - "type" : "BIGINT" - }, { - "kind" : "INPUT_REF", - "inputIndex" : 1, - "type" : "INT" - }, { - "kind" : "LITERAL", - "value" : "A", - "type" : "VARCHAR(2147483647) NOT NULL" - } ], - "condition" : null, - "inputProperties" : [ { - "requiredDistribution" : { - "type" : "UNKNOWN" - }, - "damBehavior" : "PIPELINED", - "priority" : 0 - } ], - "outputType" : "ROW<`a` BIGINT, `b` INT, `EXPR$2` VARCHAR(2147483647) NOT NULL>", - "description" : "Calc(select=[a, b, 'A' AS EXPR$2])" - }, { - "id" : 3, - "type" : "stream-exec-sink_1", - "configuration" : { - "table.exec.sink.keyed-shuffle" : "AUTO", - "table.exec.sink.not-null-enforcer" : "ERROR", - "table.exec.sink.rowtime-inserter" : "ENABLED", - "table.exec.sink.type-length-enforcer" : "IGNORE", - "table.exec.sink.upsert-materialize" : "AUTO" - }, - "dynamicTableSink" : { - "table" : { - "identifier" : "`default_catalog`.`default_database`.`MySink`", - "resolvedTable" : { - "schema" : { - "columns" : [ { - "name" : "a", - "dataType" : "BIGINT" - }, { - "name" : "b", - "dataType" : "INT" - }, { - "name" : "c", - "dataType" : "VARCHAR(2147483647)" - } ], - "watermarkSpecs" : [ ] - }, - "partitionKeys" : [ "c" ], - "options" : { - "connector" : "filesystem", - "format" : "testcsv", - "path" : "/tmp" - } - } - }, - "abilities" : [ { - "type" : "Partitioning", - "partition" : { - "c" : "A" - } - } ] - }, - "inputChangelogMode" : [ "INSERT" ], - "inputProperties" : [ { - "requiredDistribution" : { - "type" : "UNKNOWN" - }, - "damBehavior" : "PIPELINED", - "priority" : 0 - } ], - "outputType" : "ROW<`a` BIGINT, `b` INT, `EXPR$2` VARCHAR(2147483647) NOT NULL>", - "description" : "Sink(table=[default_catalog.default_database.MySink], fields=[a, b, EXPR$2])" - } ], - "edges" : [ { - "source" : 1, - "target" : 2, - "shuffle" : { - "type" : "FORWARD" - }, - "shuffleMode" : "PIPELINED" - }, { - "source" : 2, - "target" : 3, - "shuffle" : { - "type" : "FORWARD" - }, - "shuffleMode" : "PIPELINED" - } ] -} diff --git a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/TableSinkJsonPlanTest_jsonplan/testWritingMetadata.out b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/TableSinkJsonPlanTest_jsonplan/testWritingMetadata.out deleted file mode 100644 index 3508d24f689..00000000000 --- a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/TableSinkJsonPlanTest_jsonplan/testWritingMetadata.out +++ /dev/null @@ -1,95 +0,0 @@ -{ - "flinkVersion" : "", - "nodes" : [ { - "id" : 1, - "type" : "stream-exec-table-source-scan_1", - "scanTableSource" : { - "table" : { - "identifier" : "`default_catalog`.`default_database`.`MyTable`", - "resolvedTable" : { - "schema" : { - "columns" : [ { - "name" : "a", - "dataType" : "BIGINT" - }, { - "name" : "b", - "dataType" : "INT" - }, { - "name" : "c", - "dataType" : "VARCHAR(2147483647)" - } ], - "watermarkSpecs" : [ ] - }, - "partitionKeys" : [ ], - "options" : { - "bounded" : "false", - "connector" : "values" - } - } - } - }, - "outputType" : "ROW<`a` BIGINT, `b` INT, `c` VARCHAR(2147483647)>", - "description" : "TableSourceScan(table=[[default_catalog, default_database, MyTable]], fields=[a, b, c])", - "inputProperties" : [ ] - }, { - "id" : 2, - "type" : "stream-exec-sink_1", - "configuration" : { - "table.exec.sink.keyed-shuffle" : "AUTO", - "table.exec.sink.not-null-enforcer" : "ERROR", - "table.exec.sink.rowtime-inserter" : "ENABLED", - "table.exec.sink.type-length-enforcer" : "IGNORE", - "table.exec.sink.upsert-materialize" : "AUTO" - }, - "dynamicTableSink" : { - "table" : { - "identifier" : "`default_catalog`.`default_database`.`MySink`", - "resolvedTable" : { - "schema" : { - "columns" : [ { - "name" : "a", - "dataType" : "BIGINT" - }, { - "name" : "b", - "dataType" : "INT" - }, { - "name" : "m", - "kind" : "METADATA", - "dataType" : "VARCHAR(2147483647)", - "isVirtual" : false - } ], - "watermarkSpecs" : [ ] - }, - "partitionKeys" : [ ], - "options" : { - "connector" : "values", - "writable-metadata" : "m:STRING" - } - } - }, - "abilities" : [ { - "type" : "WritingMetadata", - "metadataKeys" : [ "m" ], - "consumedType" : "ROW<`a` BIGINT, `b` INT, `m` VARCHAR(2147483647)> NOT NULL" - } ] - }, - "inputChangelogMode" : [ "INSERT" ], - "inputProperties" : [ { - "requiredDistribution" : { - "type" : "UNKNOWN" - }, - "damBehavior" : "PIPELINED", - "priority" : 0 - } ], - "outputType" : "ROW<`a` BIGINT, `b` INT, `c` VARCHAR(2147483647)>", - "description" : "Sink(table=[default_catalog.default_database.MySink], fields=[a, b, c])" - } ], - "edges" : [ { - "source" : 1, - "target" : 2, - "shuffle" : { - "type" : "FORWARD" - }, - "shuffleMode" : "PIPELINED" - } ] -}