This is an automated email from the ASF dual-hosted git repository.

dwysakowicz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 413aed084974efe708833b3e1cfeb7a3f5ce544c
Author: bvarghese1 <bvargh...@confluent.io>
AuthorDate: Wed Jan 3 17:25:34 2024 -0800

    [FLINK-33979] Remove TableSink Json Plan & Json IT tests
    
    - These are covered by the new restore tests
---
 .../nodes/exec/stream/TableSinkJsonPlanTest.java   | 149 --------------------
 .../stream/jsonplan/TableSinkJsonPlanITCase.java   |  75 ----------
 ...WithNonDeterministicFuncSinkWithDifferentPk.out | 153 ---------------------
 .../testOverwrite.out                              |  93 -------------
 .../testPartialInsert.out                          | 150 --------------------
 .../testPartitioning.out                           | 137 ------------------
 .../testWritingMetadata.out                        |  95 -------------
 7 files changed, 852 deletions(-)

diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/TableSinkJsonPlanTest.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/TableSinkJsonPlanTest.java
deleted file mode 100644
index 3c321b5c59a..00000000000
--- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/TableSinkJsonPlanTest.java
+++ /dev/null
@@ -1,149 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.planner.plan.nodes.exec.stream;
-
-import org.apache.flink.table.api.TableConfig;
-import org.apache.flink.table.api.TableEnvironment;
-import org.apache.flink.table.planner.runtime.utils.JavaUserDefinedScalarFunctions;
-import org.apache.flink.table.planner.utils.StreamTableTestUtil;
-import org.apache.flink.table.planner.utils.TableTestBase;
-
-import org.junit.jupiter.api.BeforeEach;
-import org.junit.jupiter.api.Test;
-
-/** Test json serialization/deserialization for table sink. */
-class TableSinkJsonPlanTest extends TableTestBase {
-
-    private StreamTableTestUtil util;
-    private TableEnvironment tEnv;
-
-    @BeforeEach
-    void setup() {
-        util = streamTestUtil(TableConfig.getDefault());
-        tEnv = util.getTableEnv();
-
-        String srcTableDdl =
-                "CREATE TABLE MyTable (\n"
-                        + "  a bigint,\n"
-                        + "  b int,\n"
-                        + "  c varchar\n"
-                        + ") with (\n"
-                        + "  'connector' = 'values',\n"
-                        + "  'bounded' = 'false')";
-        tEnv.executeSql(srcTableDdl);
-    }
-
-    @Test
-    void testOverwrite() {
-        String sinkTableDdl =
-                "CREATE TABLE MySink (\n"
-                        + "  a bigint,\n"
-                        + "  b int,\n"
-                        + "  c varchar\n"
-                        + ") with (\n"
-                        + "  'connector' = 'filesystem',\n"
-                        + "  'format' = 'testcsv',\n"
-                        + "  'path' = '/tmp')";
-        tEnv.executeSql(sinkTableDdl);
-        util.verifyJsonPlan("insert overwrite MySink select * from MyTable");
-    }
-
-    @Test
-    void testPartitioning() {
-        String sinkTableDdl =
-                "CREATE TABLE MySink (\n"
-                        + "  a bigint,\n"
-                        + "  b int,\n"
-                        + "  c varchar\n"
-                        + ") partitioned by (c) with (\n"
-                        + "  'connector' = 'filesystem',\n"
-                        + "  'format' = 'testcsv',\n"
-                        + "  'path' = '/tmp')";
-        tEnv.executeSql(sinkTableDdl);
-        util.verifyJsonPlan("insert into MySink partition (c='A') select a, b from MyTable");
-    }
-
-    @Test
-    void testWritingMetadata() {
-        String sinkTableDdl =
-                "CREATE TABLE MySink (\n"
-                        + "  a bigint,\n"
-                        + "  b int,\n"
-                        + "  m varchar METADATA\n"
-                        + ") with (\n"
-                        + "  'connector' = 'values',\n"
-                        + "  'writable-metadata' = 'm:STRING')";
-        tEnv.executeSql(sinkTableDdl);
-        util.verifyJsonPlan("insert into MySink select * from MyTable");
-    }
-
-    @Test
-    void testCdcWithNonDeterministicFuncSinkWithDifferentPk() {
-        tEnv.createTemporaryFunction(
-                "ndFunc", new JavaUserDefinedScalarFunctions.NonDeterministicUdf());
-
-        String cdcDdl =
-                "CREATE TABLE users (\n"
-                        + "  user_id STRING,\n"
-                        + "  user_name STRING,\n"
-                        + "  email STRING,\n"
-                        + "  balance DECIMAL(18,2),\n"
-                        + "  primary key (user_id) not enforced\n"
-                        + ") WITH (\n"
-                        + " 'connector' = 'values',\n"
-                        + " 'changelog-mode' = 'I,UA,UB,D'\n"
-                        + ")";
-
-        String sinkTableDdl =
-                "CREATE TABLE sink (\n"
-                        + "  user_id STRING,\n"
-                        + "  user_name STRING,\n"
-                        + "  email STRING,\n"
-                        + "  balance DECIMAL(18,2),\n"
-                        + "  PRIMARY KEY(email) NOT ENFORCED\n"
-                        + ") WITH(\n"
-                        + "  'connector' = 'values',\n"
-                        + "  'sink-insert-only' = 'false'\n"
-                        + ")";
-        tEnv.executeSql(cdcDdl);
-        tEnv.executeSql(sinkTableDdl);
-
-        util.verifyJsonPlan(
-                "insert into sink select user_id, ndFunc(user_name), email, balance from users");
-    }
-
-    @Test
-    void testPartialInsert() {
-        String sinkTableDdl =
-                "CREATE TABLE MySink (\n"
-                        + "  a bigint,\n"
-                        + "  b int,\n"
-                        + "  c varchar,\n"
-                        + "  d int,\n"
-                        + "  e double,\n"
-                        + "  f varchar\n"
-                        + ") partitioned by (c) with (\n"
-                        + "  'connector' = 'filesystem',\n"
-                        + "  'format' = 'testcsv',\n"
-                        + "  'path' = '/tmp')";
-        tEnv.executeSql(sinkTableDdl);
-        util.verifyJsonPlan(
-                "insert into MySink partition (c='A') (f,a,b) select c, a, b from MyTable");
-    }
-}
diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/jsonplan/TableSinkJsonPlanITCase.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/jsonplan/TableSinkJsonPlanITCase.java
deleted file mode 100644
index 0ba61358fbf..00000000000
--- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/jsonplan/TableSinkJsonPlanITCase.java
+++ /dev/null
@@ -1,75 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.planner.runtime.stream.jsonplan;
-
-import org.apache.flink.table.planner.factories.TestValuesTableFactory;
-import org.apache.flink.table.planner.utils.JsonPlanTestBase;
-
-import org.junit.jupiter.api.BeforeEach;
-import org.junit.jupiter.api.Test;
-
-import java.io.File;
-import java.util.Arrays;
-import java.util.HashMap;
-import java.util.List;
-
-/** Test for table sink json plan. */
-class TableSinkJsonPlanITCase extends JsonPlanTestBase {
-
-    List<String> data = Arrays.asList("1,1,hi", "2,1,hello", "3,2,hello world");
-
-    @BeforeEach
-    @Override
-    protected void setup() throws Exception {
-        super.setup();
-        createTestCsvSourceTable("MyTable", data, "a bigint", "b int", "c varchar");
-    }
-
-    @Test
-    void testPartitioning() throws Exception {
-        File sinkPath =
-                createTestCsvSinkTable(
-                        "MySink",
-                        new String[] {"a bigint", "p int not null", "b int", "c varchar"},
-                        "b");
-
-        compileSqlAndExecutePlan("insert into MySink partition (b=3) select * from MyTable")
-                .await();
-
-        assertResult(data, sinkPath);
-    }
-
-    @Test
-    void testWritingMetadata() throws Exception {
-        createTestValuesSinkTable(
-                "MySink",
-                new String[] {"a bigint", "b int", "c varchar METADATA"},
-                new HashMap<String, String>() {
-                    {
-                        put("writable-metadata", "c:STRING");
-                    }
-                });
-
-        compileSqlAndExecutePlan("insert into MySink select * from MyTable").await();
-
-        List<String> result = TestValuesTableFactory.getResultsAsStrings("MySink");
-        assertResult(
-                Arrays.asList("+I[1, 1, hi]", "+I[2, 1, hello]", "+I[3, 2, hello world]"), result);
-    }
-}
diff --git a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/TableSinkJsonPlanTest_jsonplan/testCdcWithNonDeterministicFuncSinkWithDifferentPk.out b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/TableSinkJsonPlanTest_jsonplan/testCdcWithNonDeterministicFuncSinkWithDifferentPk.out
deleted file mode 100644
index 1c25e479325..00000000000
--- a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/TableSinkJsonPlanTest_jsonplan/testCdcWithNonDeterministicFuncSinkWithDifferentPk.out
+++ /dev/null
@@ -1,153 +0,0 @@
-{
-  "flinkVersion" : "",
-  "nodes" : [ {
-    "id" : 1,
-    "type" : "stream-exec-table-source-scan_1",
-    "scanTableSource" : {
-      "table" : {
-        "identifier" : "`default_catalog`.`default_database`.`users`",
-        "resolvedTable" : {
-          "schema" : {
-            "columns" : [ {
-              "name" : "user_id",
-              "dataType" : "VARCHAR(2147483647) NOT NULL"
-            }, {
-              "name" : "user_name",
-              "dataType" : "VARCHAR(2147483647)"
-            }, {
-              "name" : "email",
-              "dataType" : "VARCHAR(2147483647)"
-            }, {
-              "name" : "balance",
-              "dataType" : "DECIMAL(18, 2)"
-            } ],
-            "watermarkSpecs" : [ ],
-            "primaryKey" : {
-              "name" : "PK_user_id",
-              "type" : "PRIMARY_KEY",
-              "columns" : [ "user_id" ]
-            }
-          },
-          "partitionKeys" : [ ],
-          "options" : {
-            "changelog-mode" : "I,UA,UB,D",
-            "connector" : "values"
-          }
-        }
-      }
-    },
-    "outputType" : "ROW<`user_id` VARCHAR(2147483647) NOT NULL, `user_name` VARCHAR(2147483647), `email` VARCHAR(2147483647), `balance` DECIMAL(18, 2)>",
-    "description" : "TableSourceScan(table=[[default_catalog, default_database, users]], fields=[user_id, user_name, email, balance])",
-    "inputProperties" : [ ]
-  }, {
-    "id" : 2,
-    "type" : "stream-exec-calc_1",
-    "projection" : [ {
-      "kind" : "INPUT_REF",
-      "inputIndex" : 0,
-      "type" : "VARCHAR(2147483647) NOT NULL"
-    }, {
-      "kind" : "CALL",
-      "catalogName" : "`default_catalog`.`default_database`.`ndFunc`",
-      "operands" : [ {
-        "kind" : "INPUT_REF",
-        "inputIndex" : 1,
-        "type" : "VARCHAR(2147483647)"
-      } ],
-      "type" : "VARCHAR(2147483647)"
-    }, {
-      "kind" : "INPUT_REF",
-      "inputIndex" : 2,
-      "type" : "VARCHAR(2147483647)"
-    }, {
-      "kind" : "INPUT_REF",
-      "inputIndex" : 3,
-      "type" : "DECIMAL(18, 2)"
-    } ],
-    "condition" : null,
-    "inputProperties" : [ {
-      "requiredDistribution" : {
-        "type" : "UNKNOWN"
-      },
-      "damBehavior" : "PIPELINED",
-      "priority" : 0
-    } ],
-    "outputType" : "ROW<`user_id` VARCHAR(2147483647) NOT NULL, `EXPR$1` VARCHAR(2147483647), `email` VARCHAR(2147483647), `balance` DECIMAL(18, 2)>",
-    "description" : "Calc(select=[user_id, ndFunc(user_name) AS EXPR$1, email, balance])"
-  }, {
-    "id" : 3,
-    "type" : "stream-exec-sink_1",
-    "configuration" : {
-      "table.exec.sink.keyed-shuffle" : "AUTO",
-      "table.exec.sink.not-null-enforcer" : "ERROR",
-      "table.exec.sink.rowtime-inserter" : "ENABLED",
-      "table.exec.sink.type-length-enforcer" : "IGNORE",
-      "table.exec.sink.upsert-materialize" : "AUTO"
-    },
-    "dynamicTableSink" : {
-      "table" : {
-        "identifier" : "`default_catalog`.`default_database`.`sink`",
-        "resolvedTable" : {
-          "schema" : {
-            "columns" : [ {
-              "name" : "user_id",
-              "dataType" : "VARCHAR(2147483647)"
-            }, {
-              "name" : "user_name",
-              "dataType" : "VARCHAR(2147483647)"
-            }, {
-              "name" : "email",
-              "dataType" : "VARCHAR(2147483647) NOT NULL"
-            }, {
-              "name" : "balance",
-              "dataType" : "DECIMAL(18, 2)"
-            } ],
-            "watermarkSpecs" : [ ],
-            "primaryKey" : {
-              "name" : "PK_email",
-              "type" : "PRIMARY_KEY",
-              "columns" : [ "email" ]
-            }
-          },
-          "partitionKeys" : [ ],
-          "options" : {
-            "connector" : "values",
-            "sink-insert-only" : "false"
-          }
-        }
-      }
-    },
-    "inputChangelogMode" : [ "INSERT", "UPDATE_BEFORE", "UPDATE_AFTER", "DELETE" ],
-    "requireUpsertMaterialize" : true,
-    "state" : [ {
-      "index" : 0,
-      "ttl" : "0 ms",
-      "name" : "sinkMaterializeState"
-    } ],
-    "inputUpsertKey" : [ 0 ],
-    "inputProperties" : [ {
-      "requiredDistribution" : {
-        "type" : "UNKNOWN"
-      },
-      "damBehavior" : "PIPELINED",
-      "priority" : 0
-    } ],
-    "outputType" : "ROW<`user_id` VARCHAR(2147483647) NOT NULL, `EXPR$1` VARCHAR(2147483647), `email` VARCHAR(2147483647), `balance` DECIMAL(18, 2)>",
-    "description" : "Sink(table=[default_catalog.default_database.sink], fields=[user_id, EXPR$1, email, balance], upsertMaterialize=[true])"
-  } ],
-  "edges" : [ {
-    "source" : 1,
-    "target" : 2,
-    "shuffle" : {
-      "type" : "FORWARD"
-    },
-    "shuffleMode" : "PIPELINED"
-  }, {
-    "source" : 2,
-    "target" : 3,
-    "shuffle" : {
-      "type" : "FORWARD"
-    },
-    "shuffleMode" : "PIPELINED"
-  } ]
-}
diff --git a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/TableSinkJsonPlanTest_jsonplan/testOverwrite.out b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/TableSinkJsonPlanTest_jsonplan/testOverwrite.out
deleted file mode 100644
index 18fc2687ff3..00000000000
--- a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/TableSinkJsonPlanTest_jsonplan/testOverwrite.out
+++ /dev/null
@@ -1,93 +0,0 @@
-{
-  "flinkVersion" : "",
-  "nodes" : [ {
-    "id" : 1,
-    "type" : "stream-exec-table-source-scan_1",
-    "scanTableSource" : {
-      "table" : {
-        "identifier" : "`default_catalog`.`default_database`.`MyTable`",
-        "resolvedTable" : {
-          "schema" : {
-            "columns" : [ {
-              "name" : "a",
-              "dataType" : "BIGINT"
-            }, {
-              "name" : "b",
-              "dataType" : "INT"
-            }, {
-              "name" : "c",
-              "dataType" : "VARCHAR(2147483647)"
-            } ],
-            "watermarkSpecs" : [ ]
-          },
-          "partitionKeys" : [ ],
-          "options" : {
-            "bounded" : "false",
-            "connector" : "values"
-          }
-        }
-      }
-    },
-    "outputType" : "ROW<`a` BIGINT, `b` INT, `c` VARCHAR(2147483647)>",
-    "description" : "TableSourceScan(table=[[default_catalog, default_database, MyTable]], fields=[a, b, c])",
-    "inputProperties" : [ ]
-  }, {
-    "id" : 2,
-    "type" : "stream-exec-sink_1",
-    "configuration" : {
-      "table.exec.sink.keyed-shuffle" : "AUTO",
-      "table.exec.sink.not-null-enforcer" : "ERROR",
-      "table.exec.sink.rowtime-inserter" : "ENABLED",
-      "table.exec.sink.type-length-enforcer" : "IGNORE",
-      "table.exec.sink.upsert-materialize" : "AUTO"
-    },
-    "dynamicTableSink" : {
-      "table" : {
-        "identifier" : "`default_catalog`.`default_database`.`MySink`",
-        "resolvedTable" : {
-          "schema" : {
-            "columns" : [ {
-              "name" : "a",
-              "dataType" : "BIGINT"
-            }, {
-              "name" : "b",
-              "dataType" : "INT"
-            }, {
-              "name" : "c",
-              "dataType" : "VARCHAR(2147483647)"
-            } ],
-            "watermarkSpecs" : [ ]
-          },
-          "partitionKeys" : [ ],
-          "options" : {
-            "connector" : "filesystem",
-            "format" : "testcsv",
-            "path" : "/tmp"
-          }
-        }
-      },
-      "abilities" : [ {
-        "type" : "Overwrite",
-        "overwrite" : true
-      } ]
-    },
-    "inputChangelogMode" : [ "INSERT" ],
-    "inputProperties" : [ {
-      "requiredDistribution" : {
-        "type" : "UNKNOWN"
-      },
-      "damBehavior" : "PIPELINED",
-      "priority" : 0
-    } ],
-    "outputType" : "ROW<`a` BIGINT, `b` INT, `c` VARCHAR(2147483647)>",
-    "description" : "Sink(table=[default_catalog.default_database.MySink], fields=[a, b, c])"
-  } ],
-  "edges" : [ {
-    "source" : 1,
-    "target" : 2,
-    "shuffle" : {
-      "type" : "FORWARD"
-    },
-    "shuffleMode" : "PIPELINED"
-  } ]
-}
diff --git a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/TableSinkJsonPlanTest_jsonplan/testPartialInsert.out b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/TableSinkJsonPlanTest_jsonplan/testPartialInsert.out
deleted file mode 100644
index db7fba66429..00000000000
--- a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/TableSinkJsonPlanTest_jsonplan/testPartialInsert.out
+++ /dev/null
@@ -1,150 +0,0 @@
-{
-  "flinkVersion" : "",
-  "nodes" : [ {
-    "id" : 1,
-    "type" : "stream-exec-table-source-scan_1",
-    "scanTableSource" : {
-      "table" : {
-        "identifier" : "`default_catalog`.`default_database`.`MyTable`",
-        "resolvedTable" : {
-          "schema" : {
-            "columns" : [ {
-              "name" : "a",
-              "dataType" : "BIGINT"
-            }, {
-              "name" : "b",
-              "dataType" : "INT"
-            }, {
-              "name" : "c",
-              "dataType" : "VARCHAR(2147483647)"
-            } ],
-            "watermarkSpecs" : [ ]
-          },
-          "partitionKeys" : [ ],
-          "options" : {
-            "bounded" : "false",
-            "connector" : "values"
-          }
-        }
-      }
-    },
-    "outputType" : "ROW<`a` BIGINT, `b` INT, `c` VARCHAR(2147483647)>",
-    "description" : "TableSourceScan(table=[[default_catalog, default_database, MyTable]], fields=[a, b, c])",
-    "inputProperties" : [ ]
-  }, {
-    "id" : 2,
-    "type" : "stream-exec-calc_1",
-    "projection" : [ {
-      "kind" : "INPUT_REF",
-      "inputIndex" : 0,
-      "type" : "BIGINT"
-    }, {
-      "kind" : "INPUT_REF",
-      "inputIndex" : 1,
-      "type" : "INT"
-    }, {
-      "kind" : "LITERAL",
-      "value" : "A",
-      "type" : "VARCHAR(2147483647) NOT NULL"
-    }, {
-      "kind" : "LITERAL",
-      "value" : null,
-      "type" : "INT"
-    }, {
-      "kind" : "LITERAL",
-      "value" : null,
-      "type" : "DOUBLE"
-    }, {
-      "kind" : "INPUT_REF",
-      "inputIndex" : 2,
-      "type" : "VARCHAR(2147483647)"
-    } ],
-    "condition" : null,
-    "inputProperties" : [ {
-      "requiredDistribution" : {
-        "type" : "UNKNOWN"
-      },
-      "damBehavior" : "PIPELINED",
-      "priority" : 0
-    } ],
-    "outputType" : "ROW<`a` BIGINT, `b` INT, `EXPR$2` VARCHAR(2147483647) NOT NULL, `EXPR$3` INT, `EXPR$4` DOUBLE, `c` VARCHAR(2147483647)>",
-    "description" : "Calc(select=[a, b, 'A' AS EXPR$2, null:INTEGER AS EXPR$3, null:DOUBLE AS EXPR$4, c])"
-  }, {
-    "id" : 3,
-    "type" : "stream-exec-sink_1",
-    "configuration" : {
-      "table.exec.sink.keyed-shuffle" : "AUTO",
-      "table.exec.sink.not-null-enforcer" : "ERROR",
-      "table.exec.sink.rowtime-inserter" : "ENABLED",
-      "table.exec.sink.type-length-enforcer" : "IGNORE",
-      "table.exec.sink.upsert-materialize" : "AUTO"
-    },
-    "dynamicTableSink" : {
-      "table" : {
-        "identifier" : "`default_catalog`.`default_database`.`MySink`",
-        "resolvedTable" : {
-          "schema" : {
-            "columns" : [ {
-              "name" : "a",
-              "dataType" : "BIGINT"
-            }, {
-              "name" : "b",
-              "dataType" : "INT"
-            }, {
-              "name" : "c",
-              "dataType" : "VARCHAR(2147483647)"
-            }, {
-              "name" : "d",
-              "dataType" : "INT"
-            }, {
-              "name" : "e",
-              "dataType" : "DOUBLE"
-            }, {
-              "name" : "f",
-              "dataType" : "VARCHAR(2147483647)"
-            } ],
-            "watermarkSpecs" : [ ]
-          },
-          "partitionKeys" : [ "c" ],
-          "options" : {
-            "connector" : "filesystem",
-            "format" : "testcsv",
-            "path" : "/tmp"
-          }
-        }
-      },
-      "abilities" : [ {
-        "type" : "Partitioning",
-        "partition" : {
-          "c" : "A"
-        }
-      } ],
-      "targetColumns" : [ [ 5 ], [ 0 ], [ 1 ] ]
-    },
-    "inputChangelogMode" : [ "INSERT" ],
-    "inputProperties" : [ {
-      "requiredDistribution" : {
-        "type" : "UNKNOWN"
-      },
-      "damBehavior" : "PIPELINED",
-      "priority" : 0
-    } ],
-    "outputType" : "ROW<`a` BIGINT, `b` INT, `EXPR$2` VARCHAR(2147483647) NOT NULL, `EXPR$3` INT, `EXPR$4` DOUBLE, `c` VARCHAR(2147483647)>",
-    "description" : "Sink(table=[default_catalog.default_database.MySink], targetColumns=[[5],[0],[1]], fields=[a, b, EXPR$2, EXPR$3, EXPR$4, c])"
-  } ],
-  "edges" : [ {
-    "source" : 1,
-    "target" : 2,
-    "shuffle" : {
-      "type" : "FORWARD"
-    },
-    "shuffleMode" : "PIPELINED"
-  }, {
-    "source" : 2,
-    "target" : 3,
-    "shuffle" : {
-      "type" : "FORWARD"
-    },
-    "shuffleMode" : "PIPELINED"
-  } ]
-}
diff --git a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/TableSinkJsonPlanTest_jsonplan/testPartitioning.out b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/TableSinkJsonPlanTest_jsonplan/testPartitioning.out
deleted file mode 100644
index d4c1415e67a..00000000000
--- a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/TableSinkJsonPlanTest_jsonplan/testPartitioning.out
+++ /dev/null
@@ -1,137 +0,0 @@
-{
-  "flinkVersion" : "",
-  "nodes" : [ {
-    "id" : 1,
-    "type" : "stream-exec-table-source-scan_1",
-    "scanTableSource" : {
-      "table" : {
-        "identifier" : "`default_catalog`.`default_database`.`MyTable`",
-        "resolvedTable" : {
-          "schema" : {
-            "columns" : [ {
-              "name" : "a",
-              "dataType" : "BIGINT"
-            }, {
-              "name" : "b",
-              "dataType" : "INT"
-            }, {
-              "name" : "c",
-              "dataType" : "VARCHAR(2147483647)"
-            } ],
-            "watermarkSpecs" : [ ]
-          },
-          "partitionKeys" : [ ],
-          "options" : {
-            "bounded" : "false",
-            "connector" : "values"
-          }
-        }
-      },
-      "abilities" : [ {
-        "type" : "ProjectPushDown",
-        "projectedFields" : [ [ 0 ], [ 1 ] ],
-        "producedType" : "ROW<`a` BIGINT, `b` INT> NOT NULL"
-      }, {
-        "type" : "ReadingMetadata",
-        "metadataKeys" : [ ],
-        "producedType" : "ROW<`a` BIGINT, `b` INT> NOT NULL"
-      } ]
-    },
-    "outputType" : "ROW<`a` BIGINT, `b` INT>",
-    "description" : "TableSourceScan(table=[[default_catalog, default_database, MyTable, project=[a, b], metadata=[]]], fields=[a, b])",
-    "inputProperties" : [ ]
-  }, {
-    "id" : 2,
-    "type" : "stream-exec-calc_1",
-    "projection" : [ {
-      "kind" : "INPUT_REF",
-      "inputIndex" : 0,
-      "type" : "BIGINT"
-    }, {
-      "kind" : "INPUT_REF",
-      "inputIndex" : 1,
-      "type" : "INT"
-    }, {
-      "kind" : "LITERAL",
-      "value" : "A",
-      "type" : "VARCHAR(2147483647) NOT NULL"
-    } ],
-    "condition" : null,
-    "inputProperties" : [ {
-      "requiredDistribution" : {
-        "type" : "UNKNOWN"
-      },
-      "damBehavior" : "PIPELINED",
-      "priority" : 0
-    } ],
-    "outputType" : "ROW<`a` BIGINT, `b` INT, `EXPR$2` VARCHAR(2147483647) NOT NULL>",
-    "description" : "Calc(select=[a, b, 'A' AS EXPR$2])"
-  }, {
-    "id" : 3,
-    "type" : "stream-exec-sink_1",
-    "configuration" : {
-      "table.exec.sink.keyed-shuffle" : "AUTO",
-      "table.exec.sink.not-null-enforcer" : "ERROR",
-      "table.exec.sink.rowtime-inserter" : "ENABLED",
-      "table.exec.sink.type-length-enforcer" : "IGNORE",
-      "table.exec.sink.upsert-materialize" : "AUTO"
-    },
-    "dynamicTableSink" : {
-      "table" : {
-        "identifier" : "`default_catalog`.`default_database`.`MySink`",
-        "resolvedTable" : {
-          "schema" : {
-            "columns" : [ {
-              "name" : "a",
-              "dataType" : "BIGINT"
-            }, {
-              "name" : "b",
-              "dataType" : "INT"
-            }, {
-              "name" : "c",
-              "dataType" : "VARCHAR(2147483647)"
-            } ],
-            "watermarkSpecs" : [ ]
-          },
-          "partitionKeys" : [ "c" ],
-          "options" : {
-            "connector" : "filesystem",
-            "format" : "testcsv",
-            "path" : "/tmp"
-          }
-        }
-      },
-      "abilities" : [ {
-        "type" : "Partitioning",
-        "partition" : {
-          "c" : "A"
-        }
-      } ]
-    },
-    "inputChangelogMode" : [ "INSERT" ],
-    "inputProperties" : [ {
-      "requiredDistribution" : {
-        "type" : "UNKNOWN"
-      },
-      "damBehavior" : "PIPELINED",
-      "priority" : 0
-    } ],
-    "outputType" : "ROW<`a` BIGINT, `b` INT, `EXPR$2` VARCHAR(2147483647) NOT NULL>",
-    "description" : "Sink(table=[default_catalog.default_database.MySink], fields=[a, b, EXPR$2])"
-  } ],
-  "edges" : [ {
-    "source" : 1,
-    "target" : 2,
-    "shuffle" : {
-      "type" : "FORWARD"
-    },
-    "shuffleMode" : "PIPELINED"
-  }, {
-    "source" : 2,
-    "target" : 3,
-    "shuffle" : {
-      "type" : "FORWARD"
-    },
-    "shuffleMode" : "PIPELINED"
-  } ]
-}
diff --git a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/TableSinkJsonPlanTest_jsonplan/testWritingMetadata.out b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/TableSinkJsonPlanTest_jsonplan/testWritingMetadata.out
deleted file mode 100644
index 3508d24f689..00000000000
--- a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/TableSinkJsonPlanTest_jsonplan/testWritingMetadata.out
+++ /dev/null
@@ -1,95 +0,0 @@
-{
-  "flinkVersion" : "",
-  "nodes" : [ {
-    "id" : 1,
-    "type" : "stream-exec-table-source-scan_1",
-    "scanTableSource" : {
-      "table" : {
-        "identifier" : "`default_catalog`.`default_database`.`MyTable`",
-        "resolvedTable" : {
-          "schema" : {
-            "columns" : [ {
-              "name" : "a",
-              "dataType" : "BIGINT"
-            }, {
-              "name" : "b",
-              "dataType" : "INT"
-            }, {
-              "name" : "c",
-              "dataType" : "VARCHAR(2147483647)"
-            } ],
-            "watermarkSpecs" : [ ]
-          },
-          "partitionKeys" : [ ],
-          "options" : {
-            "bounded" : "false",
-            "connector" : "values"
-          }
-        }
-      }
-    },
-    "outputType" : "ROW<`a` BIGINT, `b` INT, `c` VARCHAR(2147483647)>",
-    "description" : "TableSourceScan(table=[[default_catalog, default_database, MyTable]], fields=[a, b, c])",
-    "inputProperties" : [ ]
-  }, {
-    "id" : 2,
-    "type" : "stream-exec-sink_1",
-    "configuration" : {
-      "table.exec.sink.keyed-shuffle" : "AUTO",
-      "table.exec.sink.not-null-enforcer" : "ERROR",
-      "table.exec.sink.rowtime-inserter" : "ENABLED",
-      "table.exec.sink.type-length-enforcer" : "IGNORE",
-      "table.exec.sink.upsert-materialize" : "AUTO"
-    },
-    "dynamicTableSink" : {
-      "table" : {
-        "identifier" : "`default_catalog`.`default_database`.`MySink`",
-        "resolvedTable" : {
-          "schema" : {
-            "columns" : [ {
-              "name" : "a",
-              "dataType" : "BIGINT"
-            }, {
-              "name" : "b",
-              "dataType" : "INT"
-            }, {
-              "name" : "m",
-              "kind" : "METADATA",
-              "dataType" : "VARCHAR(2147483647)",
-              "isVirtual" : false
-            } ],
-            "watermarkSpecs" : [ ]
-          },
-          "partitionKeys" : [ ],
-          "options" : {
-            "connector" : "values",
-            "writable-metadata" : "m:STRING"
-          }
-        }
-      },
-      "abilities" : [ {
-        "type" : "WritingMetadata",
-        "metadataKeys" : [ "m" ],
-        "consumedType" : "ROW<`a` BIGINT, `b` INT, `m` VARCHAR(2147483647)> NOT NULL"
-      } ]
-    },
-    "inputChangelogMode" : [ "INSERT" ],
-    "inputProperties" : [ {
-      "requiredDistribution" : {
-        "type" : "UNKNOWN"
-      },
-      "damBehavior" : "PIPELINED",
-      "priority" : 0
-    } ],
-    "outputType" : "ROW<`a` BIGINT, `b` INT, `c` VARCHAR(2147483647)>",
-    "description" : "Sink(table=[default_catalog.default_database.MySink], fields=[a, b, c])"
-  } ],
-  "edges" : [ {
-    "source" : 1,
-    "target" : 2,
-    "shuffle" : {
-      "type" : "FORWARD"
-    },
-    "shuffleMode" : "PIPELINED"
-  } ]
-}

Reply via email to