This is an automated email from the ASF dual-hosted git repository. dwysakowicz pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit bfcf13b86aa25a2b787e0de812fd0f881fb84975 Author: Dawid Wysakowicz <[email protected]> AuthorDate: Tue Dec 2 15:26:19 2025 +0100 [FLINK-38765] Fix persisted metadata handling in sink --- .../table/tests/jsonplan/testGetJsonPlan.out | 2 +- .../flink/table/test/program/TableTestProgram.java | 15 +++ .../table/planner/connectors/DynamicSinkUtils.java | 2 +- .../plan/nodes/exec/batch/BatchExecSink.java | 26 ++-- .../plan/nodes/exec/common/CommonExecSink.java | 23 +++- .../plan/nodes/exec/stream/StreamExecSink.java | 29 ++++ .../plan/nodes/exec/TransformationsTest.java | 6 +- .../nodes/exec/common/TableSinkTestPrograms.java | 75 +++++++++++ .../nodes/exec/stream/TableSinkRestoreTest.java | 13 +- .../plan/nodes/exec/testutils/RestoreTestBase.java | 25 +++- .../test/resources/jsonplan/testGetJsonPlan.out | 2 +- .../jsonplan/testInvalidTypeJsonPlan.json | 2 +- .../resources/jsonplan/testSinkTableWithHints.out | 2 +- .../jsonplan/testSourceTableWithHints.out | 2 +- .../testPythonCalc.out | 2 +- .../testPythonFunctionInWhereClause.out | 2 +- .../testJoinWithFilter.out | 2 +- .../testPythonTableFunction.out | 2 +- .../tesPythonAggCallsWithGroupBy.out | 2 +- .../testEventTimeHopWindow.out | 2 +- .../testEventTimeSessionWindow.out | 2 +- .../testEventTimeTumbleWindow.out | 2 +- .../testProcTimeHopWindow.out | 2 +- .../testProcTimeSessionWindow.out | 2 +- .../testProcTimeTumbleWindow.out | 2 +- .../testProcTimeBoundedNonPartitionedRangeOver.out | 2 +- .../testProcTimeBoundedPartitionedRangeOver.out | 2 +- ...undedPartitionedRowsOverWithBuiltinProctime.out | 2 +- .../testProcTimeUnboundedPartitionedRangeOver.out | 2 +- .../testRowTimeBoundedPartitionedRowsOver.out | 2 +- .../testJsonPlanWithTableHints.out | 2 +- ...k-upsert-materializer-writable-metadata-v1.json | 149 +++++++++++++++++++++ .../savepoint/_metadata | Bin 0 -> 8665 bytes ...k-upsert-materializer-writable-metadata-v1.json | 149 +++++++++++++++++++++ .../savepoint/_metadata | Bin 0 -> 9499 bytes 
...sink-upsert-materializer-writable-metadata.json | 149 +++++++++++++++++++++ .../savepoint/_metadata | Bin 0 -> 9502 bytes .../sink-bucketing_hash-with-keys-with-count.json | 51 +++---- .../savepoint/_metadata | Bin 0 -> 8439 bytes ...nk-bucketing_range_with_keys_without_count.json | 50 +++---- .../savepoint/_metadata | Bin 0 -> 8437 bytes .../plan/sink-bucketing_with-count.json} | 44 +++--- .../sink-bucketing_with-count/savepoint/_metadata | Bin 0 -> 8435 bytes .../plan/sink-bucketing_with-keys-and-count.json | 51 +++---- .../savepoint/_metadata | Bin 0 -> 8437 bytes .../plan/sink-ndf-primary-key.json} | 94 ++++++------- .../sink-ndf-primary-key/savepoint/_metadata | Bin 0 -> 5212 bytes .../sink-overwrite/plan/sink-overwrite.json | 46 +++---- .../sink-overwrite/savepoint/_metadata | Bin 0 -> 8441 bytes .../plan/sink-partial-insert.json} | 111 ++++++++------- .../sink-partial-insert/savepoint/_metadata | Bin 0 -> 11115 bytes .../sink-partition/plan/sink-partition.json} | 106 +++++++-------- .../sink-partition/savepoint/_metadata | Bin 0 -> 9561 bytes .../sink-upsert/plan/sink-upsert.json | 57 ++++---- .../sink-upsert/savepoint/_metadata | Bin 0 -> 5102 bytes .../plan/sink-writing-metadata.json | 51 +++---- .../sink-writing-metadata/savepoint/_metadata | Bin 0 -> 8393 bytes 57 files changed, 992 insertions(+), 374 deletions(-) diff --git a/flink-python/pyflink/table/tests/jsonplan/testGetJsonPlan.out b/flink-python/pyflink/table/tests/jsonplan/testGetJsonPlan.out index 880fb2cf11d..171e8fefad6 100644 --- a/flink-python/pyflink/table/tests/jsonplan/testGetJsonPlan.out +++ b/flink-python/pyflink/table/tests/jsonplan/testGetJsonPlan.out @@ -30,7 +30,7 @@ "description" : "TableSourceScan(table=[[default_catalog, default_database, MyTable]], fields=[a, b, c])" }, { "id" : 0, - "type" : "stream-exec-sink_1", + "type" : "stream-exec-sink_2", "configuration" : { "table.exec.sink.keyed-shuffle" : "AUTO", "table.exec.sink.not-null-enforcer" : "ERROR", diff --git 
a/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/test/program/TableTestProgram.java b/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/test/program/TableTestProgram.java index ea3925ab06a..37dbfbb5db9 100644 --- a/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/test/program/TableTestProgram.java +++ b/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/test/program/TableTestProgram.java @@ -114,6 +114,21 @@ public class TableTestProgram { return id; } + @Override + public boolean equals(Object o) { + if (o == null || getClass() != o.getClass()) { + return false; + } + + TableTestProgram that = (TableTestProgram) o; + return id.equals(that.id); + } + + @Override + public int hashCode() { + return id.hashCode(); + } + /** * Entrypoint for a {@link TableTestProgram} that forces an identifier and description of the * test program. diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/connectors/DynamicSinkUtils.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/connectors/DynamicSinkUtils.java index 411470f5dbc..3733e7a24df 100644 --- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/connectors/DynamicSinkUtils.java +++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/connectors/DynamicSinkUtils.java @@ -1326,7 +1326,7 @@ public final class DynamicSinkUtils { * * <p>The format looks as follows: {@code PHYSICAL COLUMNS + PERSISTED METADATA COLUMNS} */ - private static RowType createConsumedType(ResolvedSchema schema, DynamicTableSink sink) { + public static RowType createConsumedType(ResolvedSchema schema, DynamicTableSink sink) { final Map<String, DataType> metadataMap = extractMetadataMap(sink); final Stream<RowField> physicalFields = diff --git 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/batch/BatchExecSink.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/batch/BatchExecSink.java index c9a6e8c3c63..8a1dd04bcfe 100644 --- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/batch/BatchExecSink.java +++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/batch/BatchExecSink.java @@ -124,7 +124,7 @@ public class BatchExecSink extends CommonExecSink implements BatchExecNode<Objec } @Override - protected RowType getPhysicalRowType(ResolvedSchema schema) { + protected final RowType getPersistedRowType(ResolvedSchema schema, DynamicTableSink tableSink) { // row-level modification may only write partial columns, // so we try to prune the RowType to get the real RowType containing // the physical columns to be written @@ -132,16 +132,20 @@ public class BatchExecSink extends CommonExecSink implements BatchExecNode<Objec for (SinkAbilitySpec sinkAbilitySpec : tableSinkSpec.getSinkAbilities()) { if (sinkAbilitySpec instanceof RowLevelUpdateSpec) { RowLevelUpdateSpec rowLevelUpdateSpec = (RowLevelUpdateSpec) sinkAbilitySpec; - return getPhysicalRowType( - schema, rowLevelUpdateSpec.getRequiredPhysicalColumnIndices()); + return getPersistedRowType( + schema, + rowLevelUpdateSpec.getRequiredPhysicalColumnIndices(), + tableSink); } else if (sinkAbilitySpec instanceof RowLevelDeleteSpec) { RowLevelDeleteSpec rowLevelDeleteSpec = (RowLevelDeleteSpec) sinkAbilitySpec; - return getPhysicalRowType( - schema, rowLevelDeleteSpec.getRequiredPhysicalColumnIndices()); + return getPersistedRowType( + schema, + rowLevelDeleteSpec.getRequiredPhysicalColumnIndices(), + tableSink); } } } - return (RowType) schema.toPhysicalRowDataType().getLogicalType(); + return super.getPersistedRowType(schema, tableSink); } @Override @@ -183,12 +187,18 @@ public 
class BatchExecSink extends CommonExecSink implements BatchExecNode<Objec } /** Get the physical row type with given column indices. */ - private RowType getPhysicalRowType(ResolvedSchema schema, int[] columnIndices) { + private RowType getPersistedRowType( + ResolvedSchema schema, int[] columnIndices, DynamicTableSink sink) { List<Column> columns = schema.getColumns(); List<Column> requireColumns = new ArrayList<>(); for (int columnIndex : columnIndices) { requireColumns.add(columns.get(columnIndex)); } - return (RowType) ResolvedSchema.of(requireColumns).toPhysicalRowDataType().getLogicalType(); + return super.getPersistedRowType(ResolvedSchema.of(requireColumns), sink); + } + + @Override + protected final boolean legacyPhysicalTypeEnabled() { + return false; } } diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecSink.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecSink.java index 86866a98559..ad333589232 100644 --- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecSink.java +++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecSink.java @@ -52,6 +52,7 @@ import org.apache.flink.table.connector.sink.abilities.SupportsRowLevelDelete; import org.apache.flink.table.connector.sink.abilities.SupportsRowLevelUpdate; import org.apache.flink.table.connector.sink.legacy.SinkFunctionProvider; import org.apache.flink.table.data.RowData; +import org.apache.flink.table.planner.connectors.DynamicSinkUtils; import org.apache.flink.table.planner.lineage.TableLineageUtils; import org.apache.flink.table.planner.lineage.TableSinkLineageVertex; import org.apache.flink.table.planner.lineage.TableSinkLineageVertexImpl; @@ -151,8 +152,8 @@ public abstract class CommonExecSink extends ExecNodeBase<Object> 
tableSink.getSinkRuntimeProvider( new SinkRuntimeProviderContext( isBounded, tableSinkSpec.getTargetColumns())); - final RowType physicalRowType = getPhysicalRowType(schema); - final int[] primaryKeys = getPrimaryKeyIndices(physicalRowType, schema); + final RowType persistedRowType = getPersistedRowType(schema, tableSink); + final int[] primaryKeys = getPrimaryKeyIndices(persistedRowType, schema); final int sinkParallelism = deriveSinkParallelism(inputTransform, runtimeProvider); sinkParallelismConfigured = isParallelismConfigured(runtimeProvider); final int inputParallelism = inputTransform.getParallelism(); @@ -190,7 +191,7 @@ public abstract class CommonExecSink extends ExecNodeBase<Object> final boolean needMaterialization = !inputInsertOnly && upsertMaterialize; Transformation<RowData> sinkTransform = - applyConstraintValidations(inputTransform, config, physicalRowType); + applyConstraintValidations(inputTransform, config, persistedRowType); if (hasPk) { sinkTransform = @@ -212,7 +213,7 @@ public abstract class CommonExecSink extends ExecNodeBase<Object> sinkParallelism, config, classLoader, - physicalRowType, + persistedRowType, inputUpsertKey); } @@ -545,8 +546,16 @@ public abstract class CommonExecSink extends ExecNodeBase<Object> .orElse(new int[0]); } - protected RowType getPhysicalRowType(ResolvedSchema schema) { - return (RowType) schema.toPhysicalRowDataType().getLogicalType(); + /** + * The method recreates the type of the incoming record from the sink's schema. It puts the + * physical columns first, followed by persisted metadata columns. 
+ */ + protected RowType getPersistedRowType(ResolvedSchema schema, DynamicTableSink sink) { + if (legacyPhysicalTypeEnabled()) { + return (RowType) schema.toPhysicalRowDataType().getLogicalType(); + } else { + return DynamicSinkUtils.createConsumedType(schema, sink); + } } /** @@ -574,4 +583,6 @@ public abstract class CommonExecSink extends ExecNodeBase<Object> } return Optional.empty(); } + + protected abstract boolean legacyPhysicalTypeEnabled(); } diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecSink.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecSink.java index e1052a96533..c6947403d31 100644 --- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecSink.java +++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecSink.java @@ -104,6 +104,30 @@ import static org.apache.flink.table.api.config.ExecutionConfigOptions.TABLE_EXE }, minPlanVersion = FlinkVersion.v1_15, minStateVersion = FlinkVersion.v1_15) +// Version 2: Fixed the data type used for creating constraint enforcer and sink upsert +// materializer. Since this version the sink works correctly with persisted metadata columns. +// We introduced a new version, because statements that were never rolling back to a value from +// state could run successfully. We allow those jobs to be upgraded.
Without a new version, such jobs +// would fail on restore, because the state serializer would differ. +@ExecNodeMetadata( + name = "stream-exec-sink", + version = 2, + consumedOptions = { + "table.exec.sink.not-null-enforcer", + "table.exec.sink.type-length-enforcer", + "table.exec.sink.upsert-materialize", + "table.exec.sink.keyed-shuffle", + "table.exec.sink.rowtime-inserter" + }, + producedTransformations = { + CommonExecSink.CONSTRAINT_VALIDATOR_TRANSFORMATION, + CommonExecSink.PARTITIONER_TRANSFORMATION, + CommonExecSink.UPSERT_MATERIALIZE_TRANSFORMATION, + CommonExecSink.TIMESTAMP_INSERTER_TRANSFORMATION, + CommonExecSink.SINK_TRANSFORMATION + }, + minPlanVersion = FlinkVersion.v2_3, + minStateVersion = FlinkVersion.v2_3) public class StreamExecSink extends CommonExecSink implements StreamExecNode<Object> { private static final Logger LOG = LoggerFactory.getLogger(StreamExecSink.class); @@ -391,4 +415,9 @@ public class StreamExecSink extends CommonExecSink implements StreamExecNode<Obj throw new IllegalArgumentException("Unsupported strategy: " + strategy); } } + + @Override + protected final boolean legacyPhysicalTypeEnabled() { + return getVersion() == 1; + } } diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/TransformationsTest.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/TransformationsTest.java index 007bfef4734..49609c1d439 100644 --- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/TransformationsTest.java +++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/TransformationsTest.java @@ -188,8 +188,8 @@ class TransformationsTest { config.set(TABLE_EXEC_UID_FORMAT, "<id>_<type>_<version>_<transformation>"), json -> {}, env -> planFromCurrentFlinkValues(env).asJsonString(), - "\\d+_stream-exec-sink_1_sink", -
"\\d+_stream-exec-sink_1_constraint-validator", + "\\d+_stream-exec-sink_2_sink", + "\\d+_stream-exec-sink_2_constraint-validator", "\\d+_stream-exec-values_1_values"); } @@ -200,7 +200,7 @@ class TransformationsTest { json -> JsonTestUtils.setExecNodeConfig( json, - "stream-exec-sink_1", + "stream-exec-sink_2", TABLE_EXEC_UID_FORMAT.key(), "my_custom_<transformation>_<id>"), env -> planFromCurrentFlinkValues(env).asJsonString(), diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/common/TableSinkTestPrograms.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/common/TableSinkTestPrograms.java index 7939a672415..b1248a2bb1b 100644 --- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/common/TableSinkTestPrograms.java +++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/common/TableSinkTestPrograms.java @@ -18,6 +18,7 @@ package org.apache.flink.table.planner.plan.nodes.exec.common; +import org.apache.flink.table.api.config.ExecutionConfigOptions; import org.apache.flink.table.catalog.TableDistribution; import org.apache.flink.table.planner.runtime.utils.JavaUserDefinedScalarFunctions; import org.apache.flink.table.test.program.SinkTestStep; @@ -28,6 +29,9 @@ import org.apache.flink.types.RowKind; import java.util.Arrays; +import static org.apache.flink.configuration.StateBackendOptions.STATE_BACKEND; +import static org.apache.flink.table.api.config.ExecutionConfigOptions.TABLE_EXEC_SINK_UPSERT_MATERIALIZE_STRATEGY; + /** {@link TableTestProgram} definitions for testing {@link StreamExecSink}. */ public class TableSinkTestPrograms { @@ -222,4 +226,75 @@ public class TableSinkTestPrograms { .build()) .runSql("INSERT INTO sink_t SELECT * FROM source_t") .build(); + + // The queries could run as long as the value was never rolled back to one from state, which is + // possible. 
We validate those can restore and still run + public static final TableTestProgram INSERT_RETRACT_WITH_WRITABLE_METADATA_FOR_LEGACY_TYPE = + getInsertRetractWithWritableMetadata(true); + + public static final TableTestProgram INSERT_RETRACT_WITH_WRITABLE_METADATA = + getInsertRetractWithWritableMetadata(false); + + private static TableTestProgram getInsertRetractWithWritableMetadata( + boolean forLegacyPhysicalType) { + final Row producedAfterRestore; + final String consumedAfterRestore; + if (forLegacyPhysicalType) { + producedAfterRestore = Row.ofKind(RowKind.INSERT, "Bob", 7); + consumedAfterRestore = "+U[BOB, 7, Bob, 7]"; + } else { + // retract the last record, which should roll back to + // the previous state + producedAfterRestore = Row.ofKind(RowKind.DELETE, "Bob", 6); + consumedAfterRestore = "+U[BOB, 5, Bob, 5]"; + } + return TableTestProgram.of( + "insert-into-upsert-with-sink-upsert-materializer-writable-metadata" + + (forLegacyPhysicalType ? "-v1" : ""), + "The query requires a sink upsert materializer and the sink" + + " uses writable metadata columns. The scenario showcases a" + + " bug where a wrong type was used in sinks which did not" + + " consider metadata columns. There needs to be multiple" + + " requirements for the bug to show up. 1. We need to use " + + " rocksdb, so that we use a serializer when putting records" + + " into state in SinkUpsertMaterializer. 2. We need to retract" + + " to a previous value taken from the state, otherwise we" + + " forward the incoming record. 3. 
There need to be persisted" + + " metadata columns.") + .setupConfig( + TABLE_EXEC_SINK_UPSERT_MATERIALIZE_STRATEGY, + ExecutionConfigOptions.SinkUpsertMaterializeStrategy.LEGACY) + .setupConfig(STATE_BACKEND, "rocksdb") + .setupTableSource( + SourceTestStep.newBuilder("source_t") + .addSchema("name STRING", "score INT") + .addOption("changelog-mode", "I,UB,UA,D") + .producedBeforeRestore( + Row.ofKind(RowKind.INSERT, "Bob", 5), + Row.ofKind(RowKind.INSERT, "Bob", 6)) + .producedAfterRestore(producedAfterRestore) + .build()) + .setupTableSink( + SinkTestStep.newBuilder("sink_t") + .addSchema( + "name STRING PRIMARY KEY NOT ENFORCED", + "scoreMetadata BIGINT METADATA", + "score BIGINT", + "nameMetadata STRING METADATA") + .addOption("sink-changelog-mode-enforced", "I,UA,D") + // The test sink lists metadata columns + // (SupportsWritingMetadata#listWritableMetadata) in + // alphabetical order, this is also the order in the record of + // a sink, irrespective of the table schema + .addOption( + "writable-metadata", + "nameMetadata:STRING,scoreMetadata:BIGINT") + // physical columns first, then metadata columns, sorted + // alphabetically by columns name (test sink property) + .consumedBeforeRestore("+I[BOB, 5, Bob, 5]", "+U[BOB, 6, Bob, 6]") + .consumedAfterRestore(consumedAfterRestore) + .build()) + .runSql("INSERT INTO sink_t SELECT UPPER(name), score, score, name FROM source_t") + .build(); + } } diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/TableSinkRestoreTest.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/TableSinkRestoreTest.java index 1ea01803717..2e7197525f7 100644 --- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/TableSinkRestoreTest.java +++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/TableSinkRestoreTest.java @@ -24,6 
+24,7 @@ import org.apache.flink.table.test.program.TableTestProgram; import java.util.Arrays; import java.util.List; +import java.util.Map; /** Restore tests for {@link StreamExecSink}. */ public class TableSinkRestoreTest extends RestoreTestBase { @@ -44,6 +45,16 @@ public class TableSinkRestoreTest extends RestoreTestBase { TableSinkTestPrograms.SINK_WRITING_METADATA, TableSinkTestPrograms.SINK_NDF_PRIMARY_KEY, TableSinkTestPrograms.SINK_PARTIAL_INSERT, - TableSinkTestPrograms.SINK_UPSERT); + TableSinkTestPrograms.SINK_UPSERT, + TableSinkTestPrograms.INSERT_RETRACT_WITH_WRITABLE_METADATA_FOR_LEGACY_TYPE, + TableSinkTestPrograms.INSERT_RETRACT_WITH_WRITABLE_METADATA); + } + + @Override + protected Map<Integer, List<TableTestProgram>> programsToIgnore() { + return Map.of( + // disable the writable metadata test for sink node with version 1. it fails after + // the restore + 1, List.of(TableSinkTestPrograms.INSERT_RETRACT_WITH_WRITABLE_METADATA)); } } diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/testutils/RestoreTestBase.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/testutils/RestoreTestBase.java index 13f91ef9a52..f6ec45fb5a0 100644 --- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/testutils/RestoreTestBase.java +++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/testutils/RestoreTestBase.java @@ -205,6 +205,18 @@ public abstract class RestoreTestBase implements TableTestProgramRunner { savepointPath)))); } + // ==================================================================================== + // Extension points for adjusting test combinations + // ==================================================================================== + + /** + * Can be overridden with a collection of programs that should be ignored for a particular + * version of the 
node under test. + */ + protected Map<Integer, List<TableTestProgram>> programsToIgnore() { + return Collections.emptyMap(); + } + /** * The method can be overridden in a subclass to test multiple savepoint files for a given * program and a node in a particular version. This can be useful e.g. to test a node against @@ -212,9 +224,16 @@ public abstract class RestoreTestBase implements TableTestProgramRunner { */ protected Stream<String> getSavepointPaths( TableTestProgram program, ExecNodeMetadata metadata) { - return Stream.of(getSavepointPath(program, metadata, null)); + if (programsToIgnore() + .getOrDefault(metadata.version(), Collections.emptyList()) + .contains(program)) { + return Stream.empty(); + } else { + return Stream.of(getSavepointPath(program, metadata, null)); + } } + /** Can be used in {@link #getSavepointPaths(TableTestProgram, ExecNodeMetadata)}. */ protected final String getSavepointPath( TableTestProgram program, ExecNodeMetadata metadata, @@ -229,6 +248,10 @@ public abstract class RestoreTestBase implements TableTestProgramRunner { return builder.toString(); } + // ==================================================================================== + // End of extension points + // ==================================================================================== + private void registerSinkObserver( final List<CompletableFuture<?>> futures, final SinkTestStep sinkTestStep, diff --git a/flink-table/flink-table-planner/src/test/resources/jsonplan/testGetJsonPlan.out b/flink-table/flink-table-planner/src/test/resources/jsonplan/testGetJsonPlan.out index 19cd2a29e6c..176a1eb36b4 100644 --- a/flink-table/flink-table-planner/src/test/resources/jsonplan/testGetJsonPlan.out +++ b/flink-table/flink-table-planner/src/test/resources/jsonplan/testGetJsonPlan.out @@ -30,7 +30,7 @@ "description" : "TableSourceScan(table=[[default_catalog, default_database, MyTable]], fields=[a, b, c])" }, { "id" : 2, - "type" : "stream-exec-sink_1", + "type" : 
"stream-exec-sink_2", "configuration" : { "table.exec.sink.keyed-shuffle" : "AUTO", "table.exec.sink.not-null-enforcer" : "ERROR", diff --git a/flink-table/flink-table-planner/src/test/resources/jsonplan/testInvalidTypeJsonPlan.json b/flink-table/flink-table-planner/src/test/resources/jsonplan/testInvalidTypeJsonPlan.json index 957899f9eb1..f04ebbd701d 100644 --- a/flink-table/flink-table-planner/src/test/resources/jsonplan/testInvalidTypeJsonPlan.json +++ b/flink-table/flink-table-planner/src/test/resources/jsonplan/testInvalidTypeJsonPlan.json @@ -129,7 +129,7 @@ }, { "id": 5, - "type": "stream-exec-sink_1", + "type": "stream-exec-sink_2", "configuration": { "table.exec.sink.keyed-shuffle": "AUTO", "table.exec.sink.not-null-enforcer": "ERROR", diff --git a/flink-table/flink-table-planner/src/test/resources/jsonplan/testSinkTableWithHints.out b/flink-table/flink-table-planner/src/test/resources/jsonplan/testSinkTableWithHints.out index 5fa8f04dbda..6018fa8f081 100644 --- a/flink-table/flink-table-planner/src/test/resources/jsonplan/testSinkTableWithHints.out +++ b/flink-table/flink-table-planner/src/test/resources/jsonplan/testSinkTableWithHints.out @@ -30,7 +30,7 @@ "description" : "TableSourceScan(table=[[default_catalog, default_database, MyTable]], fields=[a, b, c])" }, { "id" : 0, - "type" : "stream-exec-sink_1", + "type" : "stream-exec-sink_2", "configuration" : { "table.exec.sink.keyed-shuffle" : "AUTO", "table.exec.sink.not-null-enforcer" : "ERROR", diff --git a/flink-table/flink-table-planner/src/test/resources/jsonplan/testSourceTableWithHints.out b/flink-table/flink-table-planner/src/test/resources/jsonplan/testSourceTableWithHints.out index 4080cb14712..4b1ddd9ad3e 100644 --- a/flink-table/flink-table-planner/src/test/resources/jsonplan/testSourceTableWithHints.out +++ b/flink-table/flink-table-planner/src/test/resources/jsonplan/testSourceTableWithHints.out @@ -31,7 +31,7 @@ "description" : "TableSourceScan(table=[[default_catalog, default_database, 
MyTable]], fields=[a, b, c], hints=[[[OPTIONS options:{scan.parallelism=2, bounded=true}]]])" }, { "id" : 0, - "type" : "stream-exec-sink_1", + "type" : "stream-exec-sink_2", "configuration" : { "table.exec.sink.keyed-shuffle" : "AUTO", "table.exec.sink.not-null-enforcer" : "ERROR", diff --git a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/PythonCalcJsonPlanTest_jsonplan/testPythonCalc.out b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/PythonCalcJsonPlanTest_jsonplan/testPythonCalc.out index 9ebcad236f8..989225dbed2 100644 --- a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/PythonCalcJsonPlanTest_jsonplan/testPythonCalc.out +++ b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/PythonCalcJsonPlanTest_jsonplan/testPythonCalc.out @@ -72,7 +72,7 @@ "description" : "PythonCalc(select=[a, pyFunc(b, b) AS EXPR$1])" }, { "id" : 3, - "type" : "stream-exec-sink_1", + "type" : "stream-exec-sink_2", "configuration" : { "table.exec.sink.keyed-shuffle" : "AUTO", "table.exec.sink.not-null-enforcer" : "ERROR", diff --git a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/PythonCalcJsonPlanTest_jsonplan/testPythonFunctionInWhereClause.out b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/PythonCalcJsonPlanTest_jsonplan/testPythonFunctionInWhereClause.out index c3169f69f9d..11a1628fbca 100644 --- a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/PythonCalcJsonPlanTest_jsonplan/testPythonFunctionInWhereClause.out +++ 
b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/PythonCalcJsonPlanTest_jsonplan/testPythonFunctionInWhereClause.out @@ -141,7 +141,7 @@ "description" : "Calc(select=[a, b], where=[f0])" }, { "id" : 5, - "type" : "stream-exec-sink_1", + "type" : "stream-exec-sink_2", "configuration" : { "table.exec.sink.keyed-shuffle" : "AUTO", "table.exec.sink.not-null-enforcer" : "ERROR", diff --git a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/PythonCorrelateJsonPlanTest_jsonplan/testJoinWithFilter.out b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/PythonCorrelateJsonPlanTest_jsonplan/testJoinWithFilter.out index 195be418c0d..71ecfdb9335 100644 --- a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/PythonCorrelateJsonPlanTest_jsonplan/testJoinWithFilter.out +++ b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/PythonCorrelateJsonPlanTest_jsonplan/testJoinWithFilter.out @@ -189,7 +189,7 @@ "description" : "Calc(select=[x, y], where=[(((y + 1) = (y * y)) AND (x = a))])" }, { "id" : 5, - "type" : "stream-exec-sink_1", + "type" : "stream-exec-sink_2", "configuration" : { "table.exec.sink.keyed-shuffle" : "AUTO", "table.exec.sink.not-null-enforcer" : "ERROR", diff --git a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/PythonCorrelateJsonPlanTest_jsonplan/testPythonTableFunction.out b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/PythonCorrelateJsonPlanTest_jsonplan/testPythonTableFunction.out index 73f72972634..3d3d35aaff2 100644 --- 
a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/PythonCorrelateJsonPlanTest_jsonplan/testPythonTableFunction.out +++ b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/PythonCorrelateJsonPlanTest_jsonplan/testPythonTableFunction.out @@ -135,7 +135,7 @@ "description" : "Calc(select=[x, y])" }, { "id" : 5, - "type" : "stream-exec-sink_1", + "type" : "stream-exec-sink_2", "configuration" : { "table.exec.sink.keyed-shuffle" : "AUTO", "table.exec.sink.not-null-enforcer" : "ERROR", diff --git a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/PythonGroupAggregateJsonPlanTest_jsonplan/tesPythonAggCallsWithGroupBy.out b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/PythonGroupAggregateJsonPlanTest_jsonplan/tesPythonAggCallsWithGroupBy.out index 77a97c2d42a..fd7538d2991 100644 --- a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/PythonGroupAggregateJsonPlanTest_jsonplan/tesPythonAggCallsWithGroupBy.out +++ b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/PythonGroupAggregateJsonPlanTest_jsonplan/tesPythonAggCallsWithGroupBy.out @@ -149,7 +149,7 @@ "description" : "Calc(select=[CAST(b AS BIGINT) AS a, EXPR$1 AS b])" }, { "id" : 6, - "type" : "stream-exec-sink_1", + "type" : "stream-exec-sink_2", "configuration" : { "table.exec.sink.keyed-shuffle" : "AUTO", "table.exec.sink.not-null-enforcer" : "ERROR", diff --git a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/PythonGroupWindowAggregateJsonPlanTest_jsonplan/testEventTimeHopWindow.out 
b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/PythonGroupWindowAggregateJsonPlanTest_jsonplan/testEventTimeHopWindow.out index c788c49e306..711d77a9810 100644 --- a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/PythonGroupWindowAggregateJsonPlanTest_jsonplan/testEventTimeHopWindow.out +++ b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/PythonGroupWindowAggregateJsonPlanTest_jsonplan/testEventTimeHopWindow.out @@ -303,7 +303,7 @@ "description" : "PythonGroupWindowAggregate(groupBy=[b], window=[SlidingGroupWindow('w$, rowtime, 10000, 5000)], select=[b, pyFunc(a, $f3) AS EXPR$1])" }, { "id" : 7, - "type" : "stream-exec-sink_1", + "type" : "stream-exec-sink_2", "configuration" : { "table.exec.sink.keyed-shuffle" : "AUTO", "table.exec.sink.not-null-enforcer" : "ERROR", diff --git a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/PythonGroupWindowAggregateJsonPlanTest_jsonplan/testEventTimeSessionWindow.out b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/PythonGroupWindowAggregateJsonPlanTest_jsonplan/testEventTimeSessionWindow.out index 892f3eb3b9a..80528083d31 100644 --- a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/PythonGroupWindowAggregateJsonPlanTest_jsonplan/testEventTimeSessionWindow.out +++ b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/PythonGroupWindowAggregateJsonPlanTest_jsonplan/testEventTimeSessionWindow.out @@ -301,7 +301,7 @@ "description" : "PythonGroupWindowAggregate(groupBy=[b], window=[SessionGroupWindow('w$, rowtime, 10000)], select=[b, pyFunc(a, $f3) AS EXPR$1])" }, { "id" : 7, - "type" : "stream-exec-sink_1", + "type" : 
"stream-exec-sink_2", "configuration" : { "table.exec.sink.keyed-shuffle" : "AUTO", "table.exec.sink.not-null-enforcer" : "ERROR", diff --git a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/PythonGroupWindowAggregateJsonPlanTest_jsonplan/testEventTimeTumbleWindow.out b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/PythonGroupWindowAggregateJsonPlanTest_jsonplan/testEventTimeTumbleWindow.out index 1bde1c99e43..625404e3afc 100644 --- a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/PythonGroupWindowAggregateJsonPlanTest_jsonplan/testEventTimeTumbleWindow.out +++ b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/PythonGroupWindowAggregateJsonPlanTest_jsonplan/testEventTimeTumbleWindow.out @@ -413,7 +413,7 @@ "description" : "Calc(select=[b, w$start AS window_start, w$end AS window_end, EXPR$3])" }, { "id" : 8, - "type" : "stream-exec-sink_1", + "type" : "stream-exec-sink_2", "configuration" : { "table.exec.sink.keyed-shuffle" : "AUTO", "table.exec.sink.not-null-enforcer" : "ERROR", diff --git a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/PythonGroupWindowAggregateJsonPlanTest_jsonplan/testProcTimeHopWindow.out b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/PythonGroupWindowAggregateJsonPlanTest_jsonplan/testProcTimeHopWindow.out index 66562e9f1f2..e9092e8a395 100644 --- a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/PythonGroupWindowAggregateJsonPlanTest_jsonplan/testProcTimeHopWindow.out +++ 
b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/PythonGroupWindowAggregateJsonPlanTest_jsonplan/testProcTimeHopWindow.out @@ -356,7 +356,7 @@ "description" : "PythonGroupWindowAggregate(groupBy=[b], window=[SlidingGroupWindow('w$, proctime, 600000, 300000)], select=[b, pyFunc(a, $f3) AS EXPR$1])" }, { "id" : 7, - "type" : "stream-exec-sink_1", + "type" : "stream-exec-sink_2", "configuration" : { "table.exec.sink.keyed-shuffle" : "AUTO", "table.exec.sink.not-null-enforcer" : "ERROR", diff --git a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/PythonGroupWindowAggregateJsonPlanTest_jsonplan/testProcTimeSessionWindow.out b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/PythonGroupWindowAggregateJsonPlanTest_jsonplan/testProcTimeSessionWindow.out index 11ee30655dd..ca4a4cbd1a8 100644 --- a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/PythonGroupWindowAggregateJsonPlanTest_jsonplan/testProcTimeSessionWindow.out +++ b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/PythonGroupWindowAggregateJsonPlanTest_jsonplan/testProcTimeSessionWindow.out @@ -354,7 +354,7 @@ "description" : "PythonGroupWindowAggregate(groupBy=[b], window=[SessionGroupWindow('w$, proctime, 600000)], select=[b, pyFunc(a, $f3) AS EXPR$1])" }, { "id" : 7, - "type" : "stream-exec-sink_1", + "type" : "stream-exec-sink_2", "configuration" : { "table.exec.sink.keyed-shuffle" : "AUTO", "table.exec.sink.not-null-enforcer" : "ERROR", diff --git a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/PythonGroupWindowAggregateJsonPlanTest_jsonplan/testProcTimeTumbleWindow.out 
b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/PythonGroupWindowAggregateJsonPlanTest_jsonplan/testProcTimeTumbleWindow.out index 28630581efe..1be4cb3d7a4 100644 --- a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/PythonGroupWindowAggregateJsonPlanTest_jsonplan/testProcTimeTumbleWindow.out +++ b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/PythonGroupWindowAggregateJsonPlanTest_jsonplan/testProcTimeTumbleWindow.out @@ -445,7 +445,7 @@ "description" : "Calc(select=[b, w$end AS window_end, EXPR$2])" }, { "id" : 8, - "type" : "stream-exec-sink_1", + "type" : "stream-exec-sink_2", "configuration" : { "table.exec.sink.keyed-shuffle" : "AUTO", "table.exec.sink.not-null-enforcer" : "ERROR", diff --git a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/PythonOverAggregateJsonPlanTest_jsonplan/testProcTimeBoundedNonPartitionedRangeOver.out b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/PythonOverAggregateJsonPlanTest_jsonplan/testProcTimeBoundedNonPartitionedRangeOver.out index 260321275c3..44dae4c0af9 100644 --- a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/PythonOverAggregateJsonPlanTest_jsonplan/testProcTimeBoundedNonPartitionedRangeOver.out +++ b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/PythonOverAggregateJsonPlanTest_jsonplan/testProcTimeBoundedNonPartitionedRangeOver.out @@ -352,7 +352,7 @@ "description" : "Calc(select=[$2 AS $0, w0$o0 AS $1])" }, { "id" : 8, - "type" : "stream-exec-sink_1", + "type" : "stream-exec-sink_2", "configuration" : { "table.exec.sink.keyed-shuffle" : "AUTO", "table.exec.sink.not-null-enforcer" : "ERROR", diff --git 
a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/PythonOverAggregateJsonPlanTest_jsonplan/testProcTimeBoundedPartitionedRangeOver.out b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/PythonOverAggregateJsonPlanTest_jsonplan/testProcTimeBoundedPartitionedRangeOver.out index 5cad90f407c..eba924fca5e 100644 --- a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/PythonOverAggregateJsonPlanTest_jsonplan/testProcTimeBoundedPartitionedRangeOver.out +++ b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/PythonOverAggregateJsonPlanTest_jsonplan/testProcTimeBoundedPartitionedRangeOver.out @@ -366,7 +366,7 @@ "description" : "Calc(select=[$3 AS $0, w0$o0 AS $1])" }, { "id" : 8, - "type" : "stream-exec-sink_1", + "type" : "stream-exec-sink_2", "configuration" : { "table.exec.sink.keyed-shuffle" : "AUTO", "table.exec.sink.not-null-enforcer" : "ERROR", diff --git a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/PythonOverAggregateJsonPlanTest_jsonplan/testProcTimeBoundedPartitionedRowsOverWithBuiltinProctime.out b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/PythonOverAggregateJsonPlanTest_jsonplan/testProcTimeBoundedPartitionedRowsOverWithBuiltinProctime.out index afc22d4037a..1d7a1d15ce9 100644 --- a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/PythonOverAggregateJsonPlanTest_jsonplan/testProcTimeBoundedPartitionedRowsOverWithBuiltinProctime.out +++ b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/PythonOverAggregateJsonPlanTest_jsonplan/testProcTimeBoundedPartitionedRowsOverWithBuiltinProctime.out @@ -303,7 +303,7 @@ 
"description" : "Calc(select=[$2 AS $0, w0$o0 AS $1])" }, { "id" : 7, - "type" : "stream-exec-sink_1", + "type" : "stream-exec-sink_2", "configuration" : { "table.exec.sink.keyed-shuffle" : "AUTO", "table.exec.sink.not-null-enforcer" : "ERROR", diff --git a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/PythonOverAggregateJsonPlanTest_jsonplan/testProcTimeUnboundedPartitionedRangeOver.out b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/PythonOverAggregateJsonPlanTest_jsonplan/testProcTimeUnboundedPartitionedRangeOver.out index b792d066f59..3252699022c 100644 --- a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/PythonOverAggregateJsonPlanTest_jsonplan/testProcTimeUnboundedPartitionedRangeOver.out +++ b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/PythonOverAggregateJsonPlanTest_jsonplan/testProcTimeUnboundedPartitionedRangeOver.out @@ -356,7 +356,7 @@ "description" : "Calc(select=[$3 AS $0, w0$o0 AS $1])" }, { "id" : 8, - "type" : "stream-exec-sink_1", + "type" : "stream-exec-sink_2", "configuration" : { "table.exec.sink.keyed-shuffle" : "AUTO", "table.exec.sink.not-null-enforcer" : "ERROR", diff --git a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/PythonOverAggregateJsonPlanTest_jsonplan/testRowTimeBoundedPartitionedRowsOver.out b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/PythonOverAggregateJsonPlanTest_jsonplan/testRowTimeBoundedPartitionedRowsOver.out index 1f25a552ba3..d44f5a0aac4 100644 --- a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/PythonOverAggregateJsonPlanTest_jsonplan/testRowTimeBoundedPartitionedRowsOver.out +++ 
b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/PythonOverAggregateJsonPlanTest_jsonplan/testRowTimeBoundedPartitionedRowsOver.out @@ -298,7 +298,7 @@ "description" : "Calc(select=[$3 AS $0, w0$o0 AS $1])" }, { "id" : 7, - "type" : "stream-exec-sink_1", + "type" : "stream-exec-sink_2", "configuration" : { "table.exec.sink.keyed-shuffle" : "AUTO", "table.exec.sink.not-null-enforcer" : "ERROR", diff --git a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/DeltaJoinTest_jsonplan/testJsonPlanWithTableHints.out b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/DeltaJoinTest_jsonplan/testJsonPlanWithTableHints.out index cc6ba9c8b6e..b3db697e6ab 100644 --- a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/DeltaJoinTest_jsonplan/testJsonPlanWithTableHints.out +++ b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/DeltaJoinTest_jsonplan/testJsonPlanWithTableHints.out @@ -263,7 +263,7 @@ } }, { "id" : 6, - "type" : "stream-exec-sink_1", + "type" : "stream-exec-sink_2", "configuration" : { "table.exec.sink.keyed-shuffle" : "AUTO", "table.exec.sink.not-null-enforcer" : "ERROR", diff --git a/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-sink_1/insert-into-upsert-with-sink-upsert-materializer-writable-metadata-v1/plan/insert-into-upsert-with-sink-upsert-materializer-writable-metadata-v1.json b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-sink_1/insert-into-upsert-with-sink-upsert-materializer-writable-metadata-v1/plan/insert-into-upsert-with-sink-upsert-materializer-writable-metadata-v1.json new file mode 100644 index 00000000000..2959e98ad25 --- /dev/null +++ 
b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-sink_1/insert-into-upsert-with-sink-upsert-materializer-writable-metadata-v1/plan/insert-into-upsert-with-sink-upsert-materializer-writable-metadata-v1.json @@ -0,0 +1,149 @@ +{ + "flinkVersion" : "2.3", + "nodes" : [ { + "id" : 1, + "type" : "stream-exec-table-source-scan_2", + "scanTableSource" : { + "table" : { + "identifier" : "`default_catalog`.`default_database`.`source_t`", + "resolvedTable" : { + "schema" : { + "columns" : [ { + "name" : "name", + "dataType" : "VARCHAR(2147483647)" + }, { + "name" : "score", + "dataType" : "INT" + } ] + } + } + } + }, + "outputType" : "ROW<`name` VARCHAR(2147483647), `score` INT>", + "description" : "TableSourceScan(table=[[default_catalog, default_database, source_t]], fields=[name, score])" + }, { + "id" : 2, + "type" : "stream-exec-calc_1", + "projection" : [ { + "kind" : "CALL", + "internalName" : "$UPPER$1", + "operands" : [ { + "kind" : "INPUT_REF", + "inputIndex" : 0, + "type" : "VARCHAR(2147483647)" + } ], + "type" : "VARCHAR(2147483647)" + }, { + "kind" : "CALL", + "syntax" : "SPECIAL", + "internalName" : "$CAST$1", + "operands" : [ { + "kind" : "INPUT_REF", + "inputIndex" : 1, + "type" : "INT" + } ], + "type" : "BIGINT" + }, { + "kind" : "INPUT_REF", + "inputIndex" : 0, + "type" : "VARCHAR(2147483647)" + }, { + "kind" : "CALL", + "syntax" : "SPECIAL", + "internalName" : "$CAST$1", + "operands" : [ { + "kind" : "INPUT_REF", + "inputIndex" : 1, + "type" : "INT" + } ], + "type" : "BIGINT" + } ], + "condition" : null, + "inputProperties" : [ { + "requiredDistribution" : { + "type" : "UNKNOWN" + }, + "damBehavior" : "PIPELINED", + "priority" : 0 + } ], + "outputType" : "ROW<`name` VARCHAR(2147483647), `score` BIGINT, `nameMetadata` VARCHAR(2147483647), `scoreMetadata` BIGINT>", + "description" : "Calc(select=[UPPER(name) AS name, CAST(score AS BIGINT) AS score, name AS nameMetadata, CAST(score AS BIGINT) AS scoreMetadata])" + }, { + "id" : 
3, + "type" : "stream-exec-sink_1", + "configuration" : { + "table.exec.sink.keyed-shuffle" : "AUTO", + "table.exec.sink.not-null-enforcer" : "ERROR", + "table.exec.sink.rowtime-inserter" : "ENABLED", + "table.exec.sink.type-length-enforcer" : "IGNORE", + "table.exec.sink.upsert-materialize" : "AUTO" + }, + "dynamicTableSink" : { + "table" : { + "identifier" : "`default_catalog`.`default_database`.`sink_t`", + "resolvedTable" : { + "schema" : { + "columns" : [ { + "name" : "name", + "dataType" : "VARCHAR(2147483647) NOT NULL" + }, { + "name" : "scoreMetadata", + "kind" : "METADATA", + "dataType" : "BIGINT", + "isVirtual" : false + }, { + "name" : "score", + "dataType" : "BIGINT" + }, { + "name" : "nameMetadata", + "kind" : "METADATA", + "dataType" : "VARCHAR(2147483647)", + "isVirtual" : false + } ], + "primaryKey" : { + "name" : "PK_name", + "type" : "PRIMARY_KEY", + "columns" : [ "name" ] + } + } + } + }, + "abilities" : [ { + "type" : "WritingMetadata", + "metadataKeys" : [ "nameMetadata", "scoreMetadata" ], + "consumedType" : "ROW<`name` VARCHAR(2147483647) NOT NULL, `score` BIGINT, `nameMetadata` VARCHAR(2147483647), `scoreMetadata` BIGINT> NOT NULL" + } ] + }, + "inputChangelogMode" : [ "INSERT", "UPDATE_BEFORE", "UPDATE_AFTER", "DELETE" ], + "requireUpsertMaterialize" : true, + "state" : [ { + "index" : 0, + "ttl" : "0 ms", + "name" : "sinkMaterializeState" + } ], + "inputProperties" : [ { + "requiredDistribution" : { + "type" : "UNKNOWN" + }, + "damBehavior" : "PIPELINED", + "priority" : 0 + } ], + "outputType" : "ROW<`name` VARCHAR(2147483647), `score` BIGINT, `nameMetadata` VARCHAR(2147483647), `scoreMetadata` BIGINT>", + "description" : "Sink(table=[default_catalog.default_database.sink_t], fields=[name, score, nameMetadata, scoreMetadata], upsertMaterialize=[true])" + } ], + "edges" : [ { + "source" : 1, + "target" : 2, + "shuffle" : { + "type" : "FORWARD" + }, + "shuffleMode" : "PIPELINED" + }, { + "source" : 2, + "target" : 3, + "shuffle" : { + "type" 
: "FORWARD" + }, + "shuffleMode" : "PIPELINED" + } ] +} \ No newline at end of file diff --git a/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-sink_1/insert-into-upsert-with-sink-upsert-materializer-writable-metadata-v1/savepoint/_metadata b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-sink_1/insert-into-upsert-with-sink-upsert-materializer-writable-metadata-v1/savepoint/_metadata new file mode 100644 index 00000000000..52dbca2634c Binary files /dev/null and b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-sink_1/insert-into-upsert-with-sink-upsert-materializer-writable-metadata-v1/savepoint/_metadata differ diff --git a/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-sink_2/insert-into-upsert-with-sink-upsert-materializer-writable-metadata-v1/plan/insert-into-upsert-with-sink-upsert-materializer-writable-metadata-v1.json b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-sink_2/insert-into-upsert-with-sink-upsert-materializer-writable-metadata-v1/plan/insert-into-upsert-with-sink-upsert-materializer-writable-metadata-v1.json new file mode 100644 index 00000000000..02743978262 --- /dev/null +++ b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-sink_2/insert-into-upsert-with-sink-upsert-materializer-writable-metadata-v1/plan/insert-into-upsert-with-sink-upsert-materializer-writable-metadata-v1.json @@ -0,0 +1,149 @@ +{ + "flinkVersion" : "2.3", + "nodes" : [ { + "id" : 1, + "type" : "stream-exec-table-source-scan_2", + "scanTableSource" : { + "table" : { + "identifier" : "`default_catalog`.`default_database`.`source_t`", + "resolvedTable" : { + "schema" : { + "columns" : [ { + "name" : "name", + "dataType" : "VARCHAR(2147483647)" + }, { + "name" : "score", + "dataType" : "INT" + } ] + } + } + } + }, + "outputType" : "ROW<`name` VARCHAR(2147483647), `score` INT>", + "description" : 
"TableSourceScan(table=[[default_catalog, default_database, source_t]], fields=[name, score])" + }, { + "id" : 2, + "type" : "stream-exec-calc_1", + "projection" : [ { + "kind" : "CALL", + "internalName" : "$UPPER$1", + "operands" : [ { + "kind" : "INPUT_REF", + "inputIndex" : 0, + "type" : "VARCHAR(2147483647)" + } ], + "type" : "VARCHAR(2147483647)" + }, { + "kind" : "CALL", + "syntax" : "SPECIAL", + "internalName" : "$CAST$1", + "operands" : [ { + "kind" : "INPUT_REF", + "inputIndex" : 1, + "type" : "INT" + } ], + "type" : "BIGINT" + }, { + "kind" : "INPUT_REF", + "inputIndex" : 0, + "type" : "VARCHAR(2147483647)" + }, { + "kind" : "CALL", + "syntax" : "SPECIAL", + "internalName" : "$CAST$1", + "operands" : [ { + "kind" : "INPUT_REF", + "inputIndex" : 1, + "type" : "INT" + } ], + "type" : "BIGINT" + } ], + "condition" : null, + "inputProperties" : [ { + "requiredDistribution" : { + "type" : "UNKNOWN" + }, + "damBehavior" : "PIPELINED", + "priority" : 0 + } ], + "outputType" : "ROW<`name` VARCHAR(2147483647), `score` BIGINT, `nameMetadata` VARCHAR(2147483647), `scoreMetadata` BIGINT>", + "description" : "Calc(select=[UPPER(name) AS name, CAST(score AS BIGINT) AS score, name AS nameMetadata, CAST(score AS BIGINT) AS scoreMetadata])" + }, { + "id" : 3, + "type" : "stream-exec-sink_2", + "configuration" : { + "table.exec.sink.keyed-shuffle" : "AUTO", + "table.exec.sink.not-null-enforcer" : "ERROR", + "table.exec.sink.rowtime-inserter" : "ENABLED", + "table.exec.sink.type-length-enforcer" : "IGNORE", + "table.exec.sink.upsert-materialize" : "AUTO" + }, + "dynamicTableSink" : { + "table" : { + "identifier" : "`default_catalog`.`default_database`.`sink_t`", + "resolvedTable" : { + "schema" : { + "columns" : [ { + "name" : "name", + "dataType" : "VARCHAR(2147483647) NOT NULL" + }, { + "name" : "scoreMetadata", + "kind" : "METADATA", + "dataType" : "BIGINT", + "isVirtual" : false + }, { + "name" : "score", + "dataType" : "BIGINT" + }, { + "name" : "nameMetadata", + 
"kind" : "METADATA", + "dataType" : "VARCHAR(2147483647)", + "isVirtual" : false + } ], + "primaryKey" : { + "name" : "PK_name", + "type" : "PRIMARY_KEY", + "columns" : [ "name" ] + } + } + } + }, + "abilities" : [ { + "type" : "WritingMetadata", + "metadataKeys" : [ "nameMetadata", "scoreMetadata" ], + "consumedType" : "ROW<`name` VARCHAR(2147483647) NOT NULL, `score` BIGINT, `nameMetadata` VARCHAR(2147483647), `scoreMetadata` BIGINT> NOT NULL" + } ] + }, + "inputChangelogMode" : [ "INSERT", "UPDATE_BEFORE", "UPDATE_AFTER", "DELETE" ], + "requireUpsertMaterialize" : true, + "state" : [ { + "index" : 0, + "ttl" : "0 ms", + "name" : "sinkMaterializeState" + } ], + "inputProperties" : [ { + "requiredDistribution" : { + "type" : "UNKNOWN" + }, + "damBehavior" : "PIPELINED", + "priority" : 0 + } ], + "outputType" : "ROW<`name` VARCHAR(2147483647), `score` BIGINT, `nameMetadata` VARCHAR(2147483647), `scoreMetadata` BIGINT>", + "description" : "Sink(table=[default_catalog.default_database.sink_t], fields=[name, score, nameMetadata, scoreMetadata], upsertMaterialize=[true])" + } ], + "edges" : [ { + "source" : 1, + "target" : 2, + "shuffle" : { + "type" : "FORWARD" + }, + "shuffleMode" : "PIPELINED" + }, { + "source" : 2, + "target" : 3, + "shuffle" : { + "type" : "FORWARD" + }, + "shuffleMode" : "PIPELINED" + } ] +} \ No newline at end of file diff --git a/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-sink_2/insert-into-upsert-with-sink-upsert-materializer-writable-metadata-v1/savepoint/_metadata b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-sink_2/insert-into-upsert-with-sink-upsert-materializer-writable-metadata-v1/savepoint/_metadata new file mode 100644 index 00000000000..be3b51c2def Binary files /dev/null and b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-sink_2/insert-into-upsert-with-sink-upsert-materializer-writable-metadata-v1/savepoint/_metadata differ diff --git 
a/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-sink_2/insert-into-upsert-with-sink-upsert-materializer-writable-metadata/plan/insert-into-upsert-with-sink-upsert-materializer-writable-metadata.json b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-sink_2/insert-into-upsert-with-sink-upsert-materializer-writable-metadata/plan/insert-into-upsert-with-sink-upsert-materializer-writable-metadata.json new file mode 100644 index 00000000000..43168424eec --- /dev/null +++ b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-sink_2/insert-into-upsert-with-sink-upsert-materializer-writable-metadata/plan/insert-into-upsert-with-sink-upsert-materializer-writable-metadata.json @@ -0,0 +1,149 @@ +{ + "flinkVersion" : "2.3", + "nodes" : [ { + "id" : 4, + "type" : "stream-exec-table-source-scan_2", + "scanTableSource" : { + "table" : { + "identifier" : "`default_catalog`.`default_database`.`source_t`", + "resolvedTable" : { + "schema" : { + "columns" : [ { + "name" : "name", + "dataType" : "VARCHAR(2147483647)" + }, { + "name" : "score", + "dataType" : "INT" + } ] + } + } + } + }, + "outputType" : "ROW<`name` VARCHAR(2147483647), `score` INT>", + "description" : "TableSourceScan(table=[[default_catalog, default_database, source_t]], fields=[name, score])" + }, { + "id" : 5, + "type" : "stream-exec-calc_1", + "projection" : [ { + "kind" : "CALL", + "internalName" : "$UPPER$1", + "operands" : [ { + "kind" : "INPUT_REF", + "inputIndex" : 0, + "type" : "VARCHAR(2147483647)" + } ], + "type" : "VARCHAR(2147483647)" + }, { + "kind" : "CALL", + "syntax" : "SPECIAL", + "internalName" : "$CAST$1", + "operands" : [ { + "kind" : "INPUT_REF", + "inputIndex" : 1, + "type" : "INT" + } ], + "type" : "BIGINT" + }, { + "kind" : "INPUT_REF", + "inputIndex" : 0, + "type" : "VARCHAR(2147483647)" + }, { + "kind" : "CALL", + "syntax" : "SPECIAL", + "internalName" : "$CAST$1", + "operands" : [ { + "kind" : "INPUT_REF", 
+ "inputIndex" : 1, + "type" : "INT" + } ], + "type" : "BIGINT" + } ], + "condition" : null, + "inputProperties" : [ { + "requiredDistribution" : { + "type" : "UNKNOWN" + }, + "damBehavior" : "PIPELINED", + "priority" : 0 + } ], + "outputType" : "ROW<`name` VARCHAR(2147483647), `score` BIGINT, `nameMetadata` VARCHAR(2147483647), `scoreMetadata` BIGINT>", + "description" : "Calc(select=[UPPER(name) AS name, CAST(score AS BIGINT) AS score, name AS nameMetadata, CAST(score AS BIGINT) AS scoreMetadata])" + }, { + "id" : 6, + "type" : "stream-exec-sink_2", + "configuration" : { + "table.exec.sink.keyed-shuffle" : "AUTO", + "table.exec.sink.not-null-enforcer" : "ERROR", + "table.exec.sink.rowtime-inserter" : "ENABLED", + "table.exec.sink.type-length-enforcer" : "IGNORE", + "table.exec.sink.upsert-materialize" : "AUTO" + }, + "dynamicTableSink" : { + "table" : { + "identifier" : "`default_catalog`.`default_database`.`sink_t`", + "resolvedTable" : { + "schema" : { + "columns" : [ { + "name" : "name", + "dataType" : "VARCHAR(2147483647) NOT NULL" + }, { + "name" : "scoreMetadata", + "kind" : "METADATA", + "dataType" : "BIGINT", + "isVirtual" : false + }, { + "name" : "score", + "dataType" : "BIGINT" + }, { + "name" : "nameMetadata", + "kind" : "METADATA", + "dataType" : "VARCHAR(2147483647)", + "isVirtual" : false + } ], + "primaryKey" : { + "name" : "PK_name", + "type" : "PRIMARY_KEY", + "columns" : [ "name" ] + } + } + } + }, + "abilities" : [ { + "type" : "WritingMetadata", + "metadataKeys" : [ "nameMetadata", "scoreMetadata" ], + "consumedType" : "ROW<`name` VARCHAR(2147483647) NOT NULL, `score` BIGINT, `nameMetadata` VARCHAR(2147483647), `scoreMetadata` BIGINT> NOT NULL" + } ] + }, + "inputChangelogMode" : [ "INSERT", "UPDATE_BEFORE", "UPDATE_AFTER", "DELETE" ], + "requireUpsertMaterialize" : true, + "state" : [ { + "index" : 0, + "ttl" : "0 ms", + "name" : "sinkMaterializeState" + } ], + "inputProperties" : [ { + "requiredDistribution" : { + "type" : "UNKNOWN" + }, + 
"damBehavior" : "PIPELINED", + "priority" : 0 + } ], + "outputType" : "ROW<`name` VARCHAR(2147483647), `score` BIGINT, `nameMetadata` VARCHAR(2147483647), `scoreMetadata` BIGINT>", + "description" : "Sink(table=[default_catalog.default_database.sink_t], fields=[name, score, nameMetadata, scoreMetadata], upsertMaterialize=[true])" + } ], + "edges" : [ { + "source" : 4, + "target" : 5, + "shuffle" : { + "type" : "FORWARD" + }, + "shuffleMode" : "PIPELINED" + }, { + "source" : 5, + "target" : 6, + "shuffle" : { + "type" : "FORWARD" + }, + "shuffleMode" : "PIPELINED" + } ] +} \ No newline at end of file diff --git a/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-sink_2/insert-into-upsert-with-sink-upsert-materializer-writable-metadata/savepoint/_metadata b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-sink_2/insert-into-upsert-with-sink-upsert-materializer-writable-metadata/savepoint/_metadata new file mode 100644 index 00000000000..b2c30bf6e50 Binary files /dev/null and b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-sink_2/insert-into-upsert-with-sink-upsert-materializer-writable-metadata/savepoint/_metadata differ diff --git a/flink-python/pyflink/table/tests/jsonplan/testGetJsonPlan.out b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-sink_2/sink-bucketing_hash-with-keys-with-count/plan/sink-bucketing_hash-with-keys-with-count.json similarity index 68% copy from flink-python/pyflink/table/tests/jsonplan/testGetJsonPlan.out copy to flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-sink_2/sink-bucketing_hash-with-keys-with-count/plan/sink-bucketing_hash-with-keys-with-count.json index 880fb2cf11d..338e923a20e 100644 --- a/flink-python/pyflink/table/tests/jsonplan/testGetJsonPlan.out +++ 
b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-sink_2/sink-bucketing_hash-with-keys-with-count/plan/sink-bucketing_hash-with-keys-with-count.json @@ -1,36 +1,32 @@ { - "flinkVersion" : "", + "flinkVersion" : "2.3", "nodes" : [ { - "id" : 0, + "id" : 5, "type" : "stream-exec-table-source-scan_2", "scanTableSource" : { "table" : { - "identifier" : "`default_catalog`.`default_database`.`MyTable`", + "identifier" : "`default_catalog`.`default_database`.`source_t`", "resolvedTable" : { "schema" : { "columns" : [ { "name" : "a", - "dataType" : "BIGINT" + "dataType" : "INT" }, { "name" : "b", - "dataType" : "INT" + "dataType" : "BIGINT" }, { "name" : "c", "dataType" : "VARCHAR(2147483647)" } ] - }, - "options" : { - "connector" : "datagen", - "number-of-rows" : "5" } } } }, - "outputType" : "ROW<`a` BIGINT, `b` INT, `c` VARCHAR(2147483647)>", - "description" : "TableSourceScan(table=[[default_catalog, default_database, MyTable]], fields=[a, b, c])" + "outputType" : "ROW<`a` INT, `b` BIGINT, `c` VARCHAR(2147483647)>", + "description" : "TableSourceScan(table=[[default_catalog, default_database, source_t]], fields=[a, b, c])" }, { - "id" : 0, - "type" : "stream-exec-sink_1", + "id" : 6, + "type" : "stream-exec-sink_2", "configuration" : { "table.exec.sink.keyed-shuffle" : "AUTO", "table.exec.sink.not-null-enforcer" : "ERROR", @@ -40,27 +36,34 @@ }, "dynamicTableSink" : { "table" : { - "identifier" : "`default_catalog`.`default_database`.`MySink`", + "identifier" : "`default_catalog`.`default_database`.`sink_t`", "resolvedTable" : { "schema" : { "columns" : [ { "name" : "a", - "dataType" : "BIGINT" + "dataType" : "INT" }, { "name" : "b", - "dataType" : "INT" + "dataType" : "BIGINT" }, { "name" : "c", "dataType" : "VARCHAR(2147483647)" } ] }, - "options" : { - "connector" : "blackhole" - } + "distribution" : { + "kind" : "HASH", + "bucketCount" : 3, + "bucketKeys" : [ "a" ] + }, + "partitionKeys" : [ "b" ] } - } + }, + "abilities" : [ { + 
"type" : "Bucketing" + } ] }, "inputChangelogMode" : [ "INSERT" ], + "upsertMaterializeStrategy" : "MAP", "inputProperties" : [ { "requiredDistribution" : { "type" : "UNKNOWN" @@ -68,12 +71,12 @@ "damBehavior" : "PIPELINED", "priority" : 0 } ], - "outputType" : "ROW<`a` BIGINT, `b` INT, `c` VARCHAR(2147483647)>", - "description" : "Sink(table=[default_catalog.default_database.MySink], fields=[a, b, c])" + "outputType" : "ROW<`a` INT, `b` BIGINT, `c` VARCHAR(2147483647)>", + "description" : "Sink(table=[default_catalog.default_database.sink_t], fields=[a, b, c])" } ], "edges" : [ { - "source" : 0, - "target" : 0, + "source" : 5, + "target" : 6, "shuffle" : { "type" : "FORWARD" }, diff --git a/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-sink_2/sink-bucketing_hash-with-keys-with-count/savepoint/_metadata b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-sink_2/sink-bucketing_hash-with-keys-with-count/savepoint/_metadata new file mode 100644 index 00000000000..6998265c31e Binary files /dev/null and b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-sink_2/sink-bucketing_hash-with-keys-with-count/savepoint/_metadata differ diff --git a/flink-python/pyflink/table/tests/jsonplan/testGetJsonPlan.out b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-sink_2/sink-bucketing_range_with_keys_without_count/plan/sink-bucketing_range_with_keys_without_count.json similarity index 69% copy from flink-python/pyflink/table/tests/jsonplan/testGetJsonPlan.out copy to flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-sink_2/sink-bucketing_range_with_keys_without_count/plan/sink-bucketing_range_with_keys_without_count.json index 880fb2cf11d..47ce499372a 100644 --- a/flink-python/pyflink/table/tests/jsonplan/testGetJsonPlan.out +++ 
b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-sink_2/sink-bucketing_range_with_keys_without_count/plan/sink-bucketing_range_with_keys_without_count.json @@ -1,36 +1,32 @@ { - "flinkVersion" : "", + "flinkVersion" : "2.3", "nodes" : [ { - "id" : 0, + "id" : 7, "type" : "stream-exec-table-source-scan_2", "scanTableSource" : { "table" : { - "identifier" : "`default_catalog`.`default_database`.`MyTable`", + "identifier" : "`default_catalog`.`default_database`.`source_t`", "resolvedTable" : { "schema" : { "columns" : [ { "name" : "a", - "dataType" : "BIGINT" + "dataType" : "INT" }, { "name" : "b", - "dataType" : "INT" + "dataType" : "BIGINT" }, { "name" : "c", "dataType" : "VARCHAR(2147483647)" } ] - }, - "options" : { - "connector" : "datagen", - "number-of-rows" : "5" } } } }, - "outputType" : "ROW<`a` BIGINT, `b` INT, `c` VARCHAR(2147483647)>", - "description" : "TableSourceScan(table=[[default_catalog, default_database, MyTable]], fields=[a, b, c])" + "outputType" : "ROW<`a` INT, `b` BIGINT, `c` VARCHAR(2147483647)>", + "description" : "TableSourceScan(table=[[default_catalog, default_database, source_t]], fields=[a, b, c])" }, { - "id" : 0, - "type" : "stream-exec-sink_1", + "id" : 8, + "type" : "stream-exec-sink_2", "configuration" : { "table.exec.sink.keyed-shuffle" : "AUTO", "table.exec.sink.not-null-enforcer" : "ERROR", @@ -40,27 +36,33 @@ }, "dynamicTableSink" : { "table" : { - "identifier" : "`default_catalog`.`default_database`.`MySink`", + "identifier" : "`default_catalog`.`default_database`.`sink_t`", "resolvedTable" : { "schema" : { "columns" : [ { "name" : "a", - "dataType" : "BIGINT" + "dataType" : "INT" }, { "name" : "b", - "dataType" : "INT" + "dataType" : "BIGINT" }, { "name" : "c", "dataType" : "VARCHAR(2147483647)" } ] }, - "options" : { - "connector" : "blackhole" - } + "distribution" : { + "kind" : "HASH", + "bucketKeys" : [ "a" ] + }, + "partitionKeys" : [ "b" ] } - } + }, + "abilities" : [ { + "type" : 
"Bucketing" + } ] }, "inputChangelogMode" : [ "INSERT" ], + "upsertMaterializeStrategy" : "MAP", "inputProperties" : [ { "requiredDistribution" : { "type" : "UNKNOWN" @@ -68,12 +70,12 @@ "damBehavior" : "PIPELINED", "priority" : 0 } ], - "outputType" : "ROW<`a` BIGINT, `b` INT, `c` VARCHAR(2147483647)>", - "description" : "Sink(table=[default_catalog.default_database.MySink], fields=[a, b, c])" + "outputType" : "ROW<`a` INT, `b` BIGINT, `c` VARCHAR(2147483647)>", + "description" : "Sink(table=[default_catalog.default_database.sink_t], fields=[a, b, c])" } ], "edges" : [ { - "source" : 0, - "target" : 0, + "source" : 7, + "target" : 8, "shuffle" : { "type" : "FORWARD" }, diff --git a/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-sink_2/sink-bucketing_range_with_keys_without_count/savepoint/_metadata b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-sink_2/sink-bucketing_range_with_keys_without_count/savepoint/_metadata new file mode 100644 index 00000000000..0fd79331c10 Binary files /dev/null and b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-sink_2/sink-bucketing_range_with_keys_without_count/savepoint/_metadata differ diff --git a/flink-table/flink-table-planner/src/test/resources/jsonplan/testGetJsonPlan.out b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-sink_2/sink-bucketing_with-count/plan/sink-bucketing_with-count.json similarity index 71% copy from flink-table/flink-table-planner/src/test/resources/jsonplan/testGetJsonPlan.out copy to flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-sink_2/sink-bucketing_with-count/plan/sink-bucketing_with-count.json index 19cd2a29e6c..1df0a88d5f2 100644 --- a/flink-table/flink-table-planner/src/test/resources/jsonplan/testGetJsonPlan.out +++ 
b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-sink_2/sink-bucketing_with-count/plan/sink-bucketing_with-count.json @@ -1,36 +1,32 @@ { - "flinkVersion" : "", + "flinkVersion" : "2.3", "nodes" : [ { "id" : 1, "type" : "stream-exec-table-source-scan_2", "scanTableSource" : { "table" : { - "identifier" : "`default_catalog`.`default_database`.`MyTable`", + "identifier" : "`default_catalog`.`default_database`.`source_t`", "resolvedTable" : { "schema" : { "columns" : [ { "name" : "a", - "dataType" : "BIGINT" + "dataType" : "INT" }, { "name" : "b", - "dataType" : "INT" + "dataType" : "BIGINT" }, { "name" : "c", "dataType" : "VARCHAR(2147483647)" } ] - }, - "options" : { - "bounded" : "false", - "connector" : "values" } } } }, - "outputType" : "ROW<`a` BIGINT, `b` INT, `c` VARCHAR(2147483647)>", - "description" : "TableSourceScan(table=[[default_catalog, default_database, MyTable]], fields=[a, b, c])" + "outputType" : "ROW<`a` INT, `b` BIGINT, `c` VARCHAR(2147483647)>", + "description" : "TableSourceScan(table=[[default_catalog, default_database, source_t]], fields=[a, b, c])" }, { "id" : 2, - "type" : "stream-exec-sink_1", + "type" : "stream-exec-sink_2", "configuration" : { "table.exec.sink.keyed-shuffle" : "AUTO", "table.exec.sink.not-null-enforcer" : "ERROR", @@ -40,28 +36,34 @@ }, "dynamicTableSink" : { "table" : { - "identifier" : "`default_catalog`.`default_database`.`MySink`", + "identifier" : "`default_catalog`.`default_database`.`sink_t`", "resolvedTable" : { "schema" : { "columns" : [ { "name" : "a", - "dataType" : "BIGINT" + "dataType" : "INT" }, { "name" : "b", - "dataType" : "INT" + "dataType" : "BIGINT" }, { "name" : "c", "dataType" : "VARCHAR(2147483647)" } ] }, - "options" : { - "connector" : "values", - "table-sink-class" : "DEFAULT" - } + "distribution" : { + "kind" : "UNKNOWN", + "bucketCount" : 3, + "bucketKeys" : [ ] + }, + "partitionKeys" : [ "b" ] } - } + }, + "abilities" : [ { + "type" : "Bucketing" + } ] }, 
"inputChangelogMode" : [ "INSERT" ], + "upsertMaterializeStrategy" : "MAP", "inputProperties" : [ { "requiredDistribution" : { "type" : "UNKNOWN" @@ -69,8 +71,8 @@ "damBehavior" : "PIPELINED", "priority" : 0 } ], - "outputType" : "ROW<`a` BIGINT, `b` INT, `c` VARCHAR(2147483647)>", - "description" : "Sink(table=[default_catalog.default_database.MySink], fields=[a, b, c])" + "outputType" : "ROW<`a` INT, `b` BIGINT, `c` VARCHAR(2147483647)>", + "description" : "Sink(table=[default_catalog.default_database.sink_t], fields=[a, b, c])" } ], "edges" : [ { "source" : 1, diff --git a/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-sink_2/sink-bucketing_with-count/savepoint/_metadata b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-sink_2/sink-bucketing_with-count/savepoint/_metadata new file mode 100644 index 00000000000..ef824265bbb Binary files /dev/null and b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-sink_2/sink-bucketing_with-count/savepoint/_metadata differ diff --git a/flink-python/pyflink/table/tests/jsonplan/testGetJsonPlan.out b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-sink_2/sink-bucketing_with-keys-and-count/plan/sink-bucketing_with-keys-and-count.json similarity index 68% copy from flink-python/pyflink/table/tests/jsonplan/testGetJsonPlan.out copy to flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-sink_2/sink-bucketing_with-keys-and-count/plan/sink-bucketing_with-keys-and-count.json index 880fb2cf11d..9e95055e3ca 100644 --- a/flink-python/pyflink/table/tests/jsonplan/testGetJsonPlan.out +++ b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-sink_2/sink-bucketing_with-keys-and-count/plan/sink-bucketing_with-keys-and-count.json @@ -1,36 +1,32 @@ { - "flinkVersion" : "", + "flinkVersion" : "2.3", "nodes" : [ { - "id" : 0, + "id" : 3, "type" : "stream-exec-table-source-scan_2", 
"scanTableSource" : { "table" : { - "identifier" : "`default_catalog`.`default_database`.`MyTable`", + "identifier" : "`default_catalog`.`default_database`.`source_t`", "resolvedTable" : { "schema" : { "columns" : [ { "name" : "a", - "dataType" : "BIGINT" + "dataType" : "INT" }, { "name" : "b", - "dataType" : "INT" + "dataType" : "BIGINT" }, { "name" : "c", "dataType" : "VARCHAR(2147483647)" } ] - }, - "options" : { - "connector" : "datagen", - "number-of-rows" : "5" } } } }, - "outputType" : "ROW<`a` BIGINT, `b` INT, `c` VARCHAR(2147483647)>", - "description" : "TableSourceScan(table=[[default_catalog, default_database, MyTable]], fields=[a, b, c])" + "outputType" : "ROW<`a` INT, `b` BIGINT, `c` VARCHAR(2147483647)>", + "description" : "TableSourceScan(table=[[default_catalog, default_database, source_t]], fields=[a, b, c])" }, { - "id" : 0, - "type" : "stream-exec-sink_1", + "id" : 4, + "type" : "stream-exec-sink_2", "configuration" : { "table.exec.sink.keyed-shuffle" : "AUTO", "table.exec.sink.not-null-enforcer" : "ERROR", @@ -40,27 +36,34 @@ }, "dynamicTableSink" : { "table" : { - "identifier" : "`default_catalog`.`default_database`.`MySink`", + "identifier" : "`default_catalog`.`default_database`.`sink_t`", "resolvedTable" : { "schema" : { "columns" : [ { "name" : "a", - "dataType" : "BIGINT" + "dataType" : "INT" }, { "name" : "b", - "dataType" : "INT" + "dataType" : "BIGINT" }, { "name" : "c", "dataType" : "VARCHAR(2147483647)" } ] }, - "options" : { - "connector" : "blackhole" - } + "distribution" : { + "kind" : "UNKNOWN", + "bucketCount" : 3, + "bucketKeys" : [ "a" ] + }, + "partitionKeys" : [ "b" ] } - } + }, + "abilities" : [ { + "type" : "Bucketing" + } ] }, "inputChangelogMode" : [ "INSERT" ], + "upsertMaterializeStrategy" : "MAP", "inputProperties" : [ { "requiredDistribution" : { "type" : "UNKNOWN" @@ -68,12 +71,12 @@ "damBehavior" : "PIPELINED", "priority" : 0 } ], - "outputType" : "ROW<`a` BIGINT, `b` INT, `c` VARCHAR(2147483647)>", - "description" 
: "Sink(table=[default_catalog.default_database.MySink], fields=[a, b, c])" + "outputType" : "ROW<`a` INT, `b` BIGINT, `c` VARCHAR(2147483647)>", + "description" : "Sink(table=[default_catalog.default_database.sink_t], fields=[a, b, c])" } ], "edges" : [ { - "source" : 0, - "target" : 0, + "source" : 3, + "target" : 4, "shuffle" : { "type" : "FORWARD" }, diff --git a/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-sink_2/sink-bucketing_with-keys-and-count/savepoint/_metadata b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-sink_2/sink-bucketing_with-keys-and-count/savepoint/_metadata new file mode 100644 index 00000000000..023fa406bf3 Binary files /dev/null and b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-sink_2/sink-bucketing_with-keys-and-count/savepoint/_metadata differ diff --git a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/PythonCalcJsonPlanTest_jsonplan/testPythonCalc.out b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-sink_2/sink-ndf-primary-key/plan/sink-ndf-primary-key.json similarity index 54% copy from flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/PythonCalcJsonPlanTest_jsonplan/testPythonCalc.out copy to flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-sink_2/sink-ndf-primary-key/plan/sink-ndf-primary-key.json index 9ebcad236f8..192ef2898a9 100644 --- a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/PythonCalcJsonPlanTest_jsonplan/testPythonCalc.out +++ b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-sink_2/sink-ndf-primary-key/plan/sink-ndf-primary-key.json @@ -1,66 +1,51 @@ { - "flinkVersion" : "", + "flinkVersion" : "2.3", "nodes" : [ { - "id" : 1, + "id" : 16, "type" : 
"stream-exec-table-source-scan_2", "scanTableSource" : { "table" : { - "identifier" : "`default_catalog`.`default_database`.`MyTable`", + "identifier" : "`default_catalog`.`default_database`.`source_t`", "resolvedTable" : { "schema" : { "columns" : [ { "name" : "a", - "dataType" : "BIGINT" + "dataType" : "INT" }, { "name" : "b", - "dataType" : "INT NOT NULL" + "dataType" : "BIGINT" }, { "name" : "c", "dataType" : "VARCHAR(2147483647)" - }, { - "name" : "d", - "dataType" : "TIMESTAMP(3)" } ] - }, - "options" : { - "bounded" : "false", - "connector" : "values" } } - }, - "abilities" : [ { - "type" : "ProjectPushDown", - "projectedFields" : [ [ 0 ], [ 1 ] ], - "producedType" : "ROW<`a` BIGINT, `b` INT NOT NULL> NOT NULL" - }, { - "type" : "ReadingMetadata", - "metadataKeys" : [ ], - "producedType" : "ROW<`a` BIGINT, `b` INT NOT NULL> NOT NULL" - } ] + } }, - "outputType" : "ROW<`a` BIGINT, `b` INT NOT NULL>", - "description" : "TableSourceScan(table=[[default_catalog, default_database, MyTable, project=[a, b], metadata=[]]], fields=[a, b])" + "outputType" : "ROW<`a` INT, `b` BIGINT, `c` VARCHAR(2147483647)>", + "description" : "TableSourceScan(table=[[default_catalog, default_database, source_t]], fields=[a, b, c])" }, { - "id" : 2, - "type" : "stream-exec-python-calc_1", + "id" : 17, + "type" : "stream-exec-calc_1", "projection" : [ { "kind" : "INPUT_REF", "inputIndex" : 0, + "type" : "INT" + }, { + "kind" : "INPUT_REF", + "inputIndex" : 1, "type" : "BIGINT" }, { "kind" : "CALL", - "catalogName" : "`default_catalog`.`default_database`.`pyFunc`", + "catalogName" : "`default_catalog`.`default_database`.`ndf`", "operands" : [ { "kind" : "INPUT_REF", - "inputIndex" : 1, - "type" : "INT NOT NULL" - }, { - "kind" : "INPUT_REF", - "inputIndex" : 1, - "type" : "INT NOT NULL" + "inputIndex" : 2, + "type" : "VARCHAR(2147483647)" } ], - "type" : "INT NOT NULL" + "type" : "VARCHAR(2147483647)" } ], + "condition" : null, "inputProperties" : [ { "requiredDistribution" : { "type" : 
"UNKNOWN" @@ -68,11 +53,11 @@ "damBehavior" : "PIPELINED", "priority" : 0 } ], - "outputType" : "ROW<`a` BIGINT, `EXPR$1` INT NOT NULL>", - "description" : "PythonCalc(select=[a, pyFunc(b, b) AS EXPR$1])" + "outputType" : "ROW<`a` INT, `b` BIGINT, `EXPR$2` VARCHAR(2147483647)>", + "description" : "Calc(select=[a, b, ndf(c) AS EXPR$2])" }, { - "id" : 3, - "type" : "stream-exec-sink_1", + "id" : 18, + "type" : "stream-exec-sink_2", "configuration" : { "table.exec.sink.keyed-shuffle" : "AUTO", "table.exec.sink.not-null-enforcer" : "ERROR", @@ -82,25 +67,30 @@ }, "dynamicTableSink" : { "table" : { - "identifier" : "`default_catalog`.`default_database`.`MySink`", + "identifier" : "`default_catalog`.`default_database`.`sink_t`", "resolvedTable" : { "schema" : { "columns" : [ { "name" : "a", - "dataType" : "BIGINT" + "dataType" : "INT" }, { "name" : "b", - "dataType" : "INT" - } ] - }, - "options" : { - "connector" : "values", - "table-sink-class" : "DEFAULT" + "dataType" : "BIGINT" + }, { + "name" : "c", + "dataType" : "VARCHAR(2147483647) NOT NULL" + } ], + "primaryKey" : { + "name" : "PK_c", + "type" : "PRIMARY_KEY", + "columns" : [ "c" ] + } } } } }, "inputChangelogMode" : [ "INSERT" ], + "upsertMaterializeStrategy" : "MAP", "inputProperties" : [ { "requiredDistribution" : { "type" : "UNKNOWN" @@ -108,19 +98,19 @@ "damBehavior" : "PIPELINED", "priority" : 0 } ], - "outputType" : "ROW<`a` BIGINT, `EXPR$1` INT NOT NULL>", - "description" : "Sink(table=[default_catalog.default_database.MySink], fields=[a, EXPR$1])" + "outputType" : "ROW<`a` INT, `b` BIGINT, `EXPR$2` VARCHAR(2147483647)>", + "description" : "Sink(table=[default_catalog.default_database.sink_t], fields=[a, b, EXPR$2])" } ], "edges" : [ { - "source" : 1, - "target" : 2, + "source" : 16, + "target" : 17, "shuffle" : { "type" : "FORWARD" }, "shuffleMode" : "PIPELINED" }, { - "source" : 2, - "target" : 3, + "source" : 17, + "target" : 18, "shuffle" : { "type" : "FORWARD" }, diff --git 
a/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-sink_2/sink-ndf-primary-key/savepoint/_metadata b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-sink_2/sink-ndf-primary-key/savepoint/_metadata new file mode 100644 index 00000000000..fb6418009ee Binary files /dev/null and b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-sink_2/sink-ndf-primary-key/savepoint/_metadata differ diff --git a/flink-python/pyflink/table/tests/jsonplan/testGetJsonPlan.out b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-sink_2/sink-overwrite/plan/sink-overwrite.json similarity index 71% copy from flink-python/pyflink/table/tests/jsonplan/testGetJsonPlan.out copy to flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-sink_2/sink-overwrite/plan/sink-overwrite.json index 880fb2cf11d..1be7ade9d61 100644 --- a/flink-python/pyflink/table/tests/jsonplan/testGetJsonPlan.out +++ b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-sink_2/sink-overwrite/plan/sink-overwrite.json @@ -1,36 +1,32 @@ { - "flinkVersion" : "", + "flinkVersion" : "2.3", "nodes" : [ { - "id" : 0, + "id" : 12, "type" : "stream-exec-table-source-scan_2", "scanTableSource" : { "table" : { - "identifier" : "`default_catalog`.`default_database`.`MyTable`", + "identifier" : "`default_catalog`.`default_database`.`source_t`", "resolvedTable" : { "schema" : { "columns" : [ { "name" : "a", - "dataType" : "BIGINT" + "dataType" : "INT" }, { "name" : "b", - "dataType" : "INT" + "dataType" : "BIGINT" }, { "name" : "c", "dataType" : "VARCHAR(2147483647)" } ] - }, - "options" : { - "connector" : "datagen", - "number-of-rows" : "5" } } } }, - "outputType" : "ROW<`a` BIGINT, `b` INT, `c` VARCHAR(2147483647)>", - "description" : "TableSourceScan(table=[[default_catalog, default_database, MyTable]], fields=[a, b, c])" + "outputType" : "ROW<`a` INT, `b` BIGINT, `c` 
VARCHAR(2147483647)>", + "description" : "TableSourceScan(table=[[default_catalog, default_database, source_t]], fields=[a, b, c])" }, { - "id" : 0, - "type" : "stream-exec-sink_1", + "id" : 13, + "type" : "stream-exec-sink_2", "configuration" : { "table.exec.sink.keyed-shuffle" : "AUTO", "table.exec.sink.not-null-enforcer" : "ERROR", @@ -40,27 +36,29 @@ }, "dynamicTableSink" : { "table" : { - "identifier" : "`default_catalog`.`default_database`.`MySink`", + "identifier" : "`default_catalog`.`default_database`.`sink_t`", "resolvedTable" : { "schema" : { "columns" : [ { "name" : "a", - "dataType" : "BIGINT" + "dataType" : "INT" }, { "name" : "b", - "dataType" : "INT" + "dataType" : "BIGINT" }, { "name" : "c", "dataType" : "VARCHAR(2147483647)" } ] - }, - "options" : { - "connector" : "blackhole" } } - } + }, + "abilities" : [ { + "type" : "Overwrite", + "overwrite" : true + } ] }, "inputChangelogMode" : [ "INSERT" ], + "upsertMaterializeStrategy" : "MAP", "inputProperties" : [ { "requiredDistribution" : { "type" : "UNKNOWN" @@ -68,12 +66,12 @@ "damBehavior" : "PIPELINED", "priority" : 0 } ], - "outputType" : "ROW<`a` BIGINT, `b` INT, `c` VARCHAR(2147483647)>", - "description" : "Sink(table=[default_catalog.default_database.MySink], fields=[a, b, c])" + "outputType" : "ROW<`a` INT, `b` BIGINT, `c` VARCHAR(2147483647)>", + "description" : "Sink(table=[default_catalog.default_database.sink_t], fields=[a, b, c])" } ], "edges" : [ { - "source" : 0, - "target" : 0, + "source" : 12, + "target" : 13, "shuffle" : { "type" : "FORWARD" }, diff --git a/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-sink_2/sink-overwrite/savepoint/_metadata b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-sink_2/sink-overwrite/savepoint/_metadata new file mode 100644 index 00000000000..874462a05b3 Binary files /dev/null and 
b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-sink_2/sink-overwrite/savepoint/_metadata differ diff --git a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/PythonCalcJsonPlanTest_jsonplan/testPythonCalc.out b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-sink_2/sink-partial-insert/plan/sink-partial-insert.json similarity index 53% copy from flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/PythonCalcJsonPlanTest_jsonplan/testPythonCalc.out copy to flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-sink_2/sink-partial-insert/plan/sink-partial-insert.json index 9ebcad236f8..ddd0df467ae 100644 --- a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/PythonCalcJsonPlanTest_jsonplan/testPythonCalc.out +++ b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-sink_2/sink-partial-insert/plan/sink-partial-insert.json @@ -1,66 +1,54 @@ { - "flinkVersion" : "", + "flinkVersion" : "2.3", "nodes" : [ { - "id" : 1, + "id" : 19, "type" : "stream-exec-table-source-scan_2", "scanTableSource" : { "table" : { - "identifier" : "`default_catalog`.`default_database`.`MyTable`", + "identifier" : "`default_catalog`.`default_database`.`source_t`", "resolvedTable" : { "schema" : { "columns" : [ { "name" : "a", - "dataType" : "BIGINT" + "dataType" : "INT" }, { "name" : "b", - "dataType" : "INT NOT NULL" + "dataType" : "BIGINT" }, { "name" : "c", "dataType" : "VARCHAR(2147483647)" - }, { - "name" : "d", - "dataType" : "TIMESTAMP(3)" } ] - }, - "options" : { - "bounded" : "false", - "connector" : "values" } } - }, - "abilities" : [ { - "type" : "ProjectPushDown", - "projectedFields" : [ [ 0 ], [ 1 ] ], - "producedType" : "ROW<`a` BIGINT, `b` INT NOT NULL> NOT NULL" - }, { - "type" : "ReadingMetadata", - 
"metadataKeys" : [ ], - "producedType" : "ROW<`a` BIGINT, `b` INT NOT NULL> NOT NULL" - } ] + } }, - "outputType" : "ROW<`a` BIGINT, `b` INT NOT NULL>", - "description" : "TableSourceScan(table=[[default_catalog, default_database, MyTable, project=[a, b], metadata=[]]], fields=[a, b])" + "outputType" : "ROW<`a` INT, `b` BIGINT, `c` VARCHAR(2147483647)>", + "description" : "TableSourceScan(table=[[default_catalog, default_database, source_t]], fields=[a, b, c])" }, { - "id" : 2, - "type" : "stream-exec-python-calc_1", + "id" : 20, + "type" : "stream-exec-calc_1", "projection" : [ { "kind" : "INPUT_REF", "inputIndex" : 0, + "type" : "INT" + }, { + "kind" : "INPUT_REF", + "inputIndex" : 1, "type" : "BIGINT" }, { - "kind" : "CALL", - "catalogName" : "`default_catalog`.`default_database`.`pyFunc`", - "operands" : [ { - "kind" : "INPUT_REF", - "inputIndex" : 1, - "type" : "INT NOT NULL" - }, { - "kind" : "INPUT_REF", - "inputIndex" : 1, - "type" : "INT NOT NULL" - } ], - "type" : "INT NOT NULL" + "kind" : "INPUT_REF", + "inputIndex" : 2, + "type" : "VARCHAR(2147483647)" + }, { + "kind" : "LITERAL", + "value" : null, + "type" : "DECIMAL(10, 2)" + }, { + "kind" : "LITERAL", + "value" : null, + "type" : "DOUBLE" } ], + "condition" : null, "inputProperties" : [ { "requiredDistribution" : { "type" : "UNKNOWN" @@ -68,11 +56,11 @@ "damBehavior" : "PIPELINED", "priority" : 0 } ], - "outputType" : "ROW<`a` BIGINT, `EXPR$1` INT NOT NULL>", - "description" : "PythonCalc(select=[a, pyFunc(b, b) AS EXPR$1])" + "outputType" : "ROW<`a` INT, `b` BIGINT, `c` VARCHAR(2147483647), `EXPR$3` DECIMAL(10, 2), `EXPR$4` DOUBLE>", + "description" : "Calc(select=[a, b, c, null:DECIMAL(10, 2) AS EXPR$3, null:DOUBLE AS EXPR$4])" }, { - "id" : 3, - "type" : "stream-exec-sink_1", + "id" : 21, + "type" : "stream-exec-sink_2", "configuration" : { "table.exec.sink.keyed-shuffle" : "AUTO", "table.exec.sink.not-null-enforcer" : "ERROR", @@ -82,25 +70,36 @@ }, "dynamicTableSink" : { "table" : { - 
"identifier" : "`default_catalog`.`default_database`.`MySink`", + "identifier" : "`default_catalog`.`default_database`.`sink_t`", "resolvedTable" : { "schema" : { "columns" : [ { "name" : "a", - "dataType" : "BIGINT" + "dataType" : "INT" }, { "name" : "b", - "dataType" : "INT" + "dataType" : "BIGINT" + }, { + "name" : "c", + "dataType" : "VARCHAR(2147483647)" + }, { + "name" : "d", + "dataType" : "DECIMAL(10, 2)" + }, { + "name" : "e", + "dataType" : "DOUBLE" } ] - }, - "options" : { - "connector" : "values", - "table-sink-class" : "DEFAULT" } } - } + }, + "abilities" : [ { + "type" : "TargetColumnWriting", + "targetColumns" : [ [ 0 ], [ 1 ], [ 2 ] ] + } ], + "targetColumns" : [ [ 0 ], [ 1 ], [ 2 ] ] }, "inputChangelogMode" : [ "INSERT" ], + "upsertMaterializeStrategy" : "MAP", "inputProperties" : [ { "requiredDistribution" : { "type" : "UNKNOWN" @@ -108,19 +107,19 @@ "damBehavior" : "PIPELINED", "priority" : 0 } ], - "outputType" : "ROW<`a` BIGINT, `EXPR$1` INT NOT NULL>", - "description" : "Sink(table=[default_catalog.default_database.MySink], fields=[a, EXPR$1])" + "outputType" : "ROW<`a` INT, `b` BIGINT, `c` VARCHAR(2147483647), `EXPR$3` DECIMAL(10, 2), `EXPR$4` DOUBLE>", + "description" : "Sink(table=[default_catalog.default_database.sink_t], targetColumns=[[0],[1],[2]], fields=[a, b, c, EXPR$3, EXPR$4])" } ], "edges" : [ { - "source" : 1, - "target" : 2, + "source" : 19, + "target" : 20, "shuffle" : { "type" : "FORWARD" }, "shuffleMode" : "PIPELINED" }, { - "source" : 2, - "target" : 3, + "source" : 20, + "target" : 21, "shuffle" : { "type" : "FORWARD" }, diff --git a/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-sink_2/sink-partial-insert/savepoint/_metadata b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-sink_2/sink-partial-insert/savepoint/_metadata new file mode 100644 index 00000000000..5b02515c833 Binary files /dev/null and 
b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-sink_2/sink-partial-insert/savepoint/_metadata differ diff --git a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/PythonCalcJsonPlanTest_jsonplan/testPythonCalc.out b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-sink_2/sink-partition/plan/sink-partition.json similarity index 53% copy from flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/PythonCalcJsonPlanTest_jsonplan/testPythonCalc.out copy to flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-sink_2/sink-partition/plan/sink-partition.json index 9ebcad236f8..79f725b5f5c 100644 --- a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/PythonCalcJsonPlanTest_jsonplan/testPythonCalc.out +++ b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-sink_2/sink-partition/plan/sink-partition.json @@ -1,66 +1,50 @@ { - "flinkVersion" : "", + "flinkVersion" : "2.3", "nodes" : [ { - "id" : 1, + "id" : 9, "type" : "stream-exec-table-source-scan_2", "scanTableSource" : { "table" : { - "identifier" : "`default_catalog`.`default_database`.`MyTable`", + "identifier" : "`default_catalog`.`default_database`.`source_t`", "resolvedTable" : { "schema" : { "columns" : [ { "name" : "a", - "dataType" : "BIGINT" + "dataType" : "INT" }, { "name" : "b", - "dataType" : "INT NOT NULL" + "dataType" : "BIGINT" }, { "name" : "c", "dataType" : "VARCHAR(2147483647)" - }, { - "name" : "d", - "dataType" : "TIMESTAMP(3)" } ] - }, - "options" : { - "bounded" : "false", - "connector" : "values" } } - }, - "abilities" : [ { - "type" : "ProjectPushDown", - "projectedFields" : [ [ 0 ], [ 1 ] ], - "producedType" : "ROW<`a` BIGINT, `b` INT NOT NULL> NOT NULL" - }, { - "type" : "ReadingMetadata", - "metadataKeys" : [ ], - "producedType" 
: "ROW<`a` BIGINT, `b` INT NOT NULL> NOT NULL" - } ] + } }, - "outputType" : "ROW<`a` BIGINT, `b` INT NOT NULL>", - "description" : "TableSourceScan(table=[[default_catalog, default_database, MyTable, project=[a, b], metadata=[]]], fields=[a, b])" + "outputType" : "ROW<`a` INT, `b` BIGINT, `c` VARCHAR(2147483647)>", + "description" : "TableSourceScan(table=[[default_catalog, default_database, source_t]], fields=[a, b, c])" }, { - "id" : 2, - "type" : "stream-exec-python-calc_1", + "id" : 10, + "type" : "stream-exec-calc_1", "projection" : [ { "kind" : "INPUT_REF", "inputIndex" : 0, + "type" : "INT" + }, { + "kind" : "LITERAL", + "value" : 2, + "type" : "BIGINT NOT NULL" + }, { + "kind" : "INPUT_REF", + "inputIndex" : 1, "type" : "BIGINT" }, { - "kind" : "CALL", - "catalogName" : "`default_catalog`.`default_database`.`pyFunc`", - "operands" : [ { - "kind" : "INPUT_REF", - "inputIndex" : 1, - "type" : "INT NOT NULL" - }, { - "kind" : "INPUT_REF", - "inputIndex" : 1, - "type" : "INT NOT NULL" - } ], - "type" : "INT NOT NULL" + "kind" : "INPUT_REF", + "inputIndex" : 2, + "type" : "VARCHAR(2147483647)" } ], + "condition" : null, "inputProperties" : [ { "requiredDistribution" : { "type" : "UNKNOWN" @@ -68,11 +52,11 @@ "damBehavior" : "PIPELINED", "priority" : 0 } ], - "outputType" : "ROW<`a` BIGINT, `EXPR$1` INT NOT NULL>", - "description" : "PythonCalc(select=[a, pyFunc(b, b) AS EXPR$1])" + "outputType" : "ROW<`a` INT, `EXPR$1` BIGINT NOT NULL, `b` BIGINT, `c` VARCHAR(2147483647)>", + "description" : "Calc(select=[a, 2 AS EXPR$1, b, c])" }, { - "id" : 3, - "type" : "stream-exec-sink_1", + "id" : 11, + "type" : "stream-exec-sink_2", "configuration" : { "table.exec.sink.keyed-shuffle" : "AUTO", "table.exec.sink.not-null-enforcer" : "ERROR", @@ -82,25 +66,35 @@ }, "dynamicTableSink" : { "table" : { - "identifier" : "`default_catalog`.`default_database`.`MySink`", + "identifier" : "`default_catalog`.`default_database`.`sink_t`", "resolvedTable" : { "schema" : { "columns" : 
[ { "name" : "a", - "dataType" : "BIGINT" + "dataType" : "INT" }, { "name" : "b", - "dataType" : "INT" + "dataType" : "BIGINT" + }, { + "name" : "p", + "dataType" : "BIGINT NOT NULL" + }, { + "name" : "c", + "dataType" : "VARCHAR(2147483647)" } ] }, - "options" : { - "connector" : "values", - "table-sink-class" : "DEFAULT" - } + "partitionKeys" : [ "b" ] } - } + }, + "abilities" : [ { + "type" : "Partitioning", + "partition" : { + "b" : "2" + } + } ] }, "inputChangelogMode" : [ "INSERT" ], + "upsertMaterializeStrategy" : "MAP", "inputProperties" : [ { "requiredDistribution" : { "type" : "UNKNOWN" @@ -108,19 +102,19 @@ "damBehavior" : "PIPELINED", "priority" : 0 } ], - "outputType" : "ROW<`a` BIGINT, `EXPR$1` INT NOT NULL>", - "description" : "Sink(table=[default_catalog.default_database.MySink], fields=[a, EXPR$1])" + "outputType" : "ROW<`a` INT, `EXPR$1` BIGINT NOT NULL, `b` BIGINT, `c` VARCHAR(2147483647)>", + "description" : "Sink(table=[default_catalog.default_database.sink_t], fields=[a, EXPR$1, b, c])" } ], "edges" : [ { - "source" : 1, - "target" : 2, + "source" : 9, + "target" : 10, "shuffle" : { "type" : "FORWARD" }, "shuffleMode" : "PIPELINED" }, { - "source" : 2, - "target" : 3, + "source" : 10, + "target" : 11, "shuffle" : { "type" : "FORWARD" }, diff --git a/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-sink_2/sink-partition/savepoint/_metadata b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-sink_2/sink-partition/savepoint/_metadata new file mode 100644 index 00000000000..32a8e76dbe0 Binary files /dev/null and b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-sink_2/sink-partition/savepoint/_metadata differ diff --git a/flink-python/pyflink/table/tests/jsonplan/testGetJsonPlan.out b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-sink_2/sink-upsert/plan/sink-upsert.json similarity index 58% copy from 
flink-python/pyflink/table/tests/jsonplan/testGetJsonPlan.out copy to flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-sink_2/sink-upsert/plan/sink-upsert.json index 880fb2cf11d..53aa33b7e10 100644 --- a/flink-python/pyflink/table/tests/jsonplan/testGetJsonPlan.out +++ b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-sink_2/sink-upsert/plan/sink-upsert.json @@ -1,36 +1,37 @@ { - "flinkVersion" : "", + "flinkVersion" : "2.3", "nodes" : [ { - "id" : 0, + "id" : 22, "type" : "stream-exec-table-source-scan_2", "scanTableSource" : { "table" : { - "identifier" : "`default_catalog`.`default_database`.`MyTable`", + "identifier" : "`default_catalog`.`default_database`.`source_t`", "resolvedTable" : { "schema" : { "columns" : [ { "name" : "a", - "dataType" : "BIGINT" + "dataType" : "INT NOT NULL" }, { "name" : "b", - "dataType" : "INT" + "dataType" : "BIGINT" }, { "name" : "c", "dataType" : "VARCHAR(2147483647)" - } ] - }, - "options" : { - "connector" : "datagen", - "number-of-rows" : "5" + } ], + "primaryKey" : { + "name" : "PK_a", + "type" : "PRIMARY_KEY", + "columns" : [ "a" ] + } } } } }, - "outputType" : "ROW<`a` BIGINT, `b` INT, `c` VARCHAR(2147483647)>", - "description" : "TableSourceScan(table=[[default_catalog, default_database, MyTable]], fields=[a, b, c])" + "outputType" : "ROW<`a` INT NOT NULL, `b` BIGINT, `c` VARCHAR(2147483647)>", + "description" : "TableSourceScan(table=[[default_catalog, default_database, source_t]], fields=[a, b, c])" }, { - "id" : 0, - "type" : "stream-exec-sink_1", + "id" : 23, + "type" : "stream-exec-sink_2", "configuration" : { "table.exec.sink.keyed-shuffle" : "AUTO", "table.exec.sink.not-null-enforcer" : "ERROR", @@ -40,27 +41,31 @@ }, "dynamicTableSink" : { "table" : { - "identifier" : "`default_catalog`.`default_database`.`MySink`", + "identifier" : "`default_catalog`.`default_database`.`sink_t`", "resolvedTable" : { "schema" : { "columns" : [ { "name" : "a", - "dataType" : 
"BIGINT" + "dataType" : "INT NOT NULL" }, { "name" : "b", - "dataType" : "INT" + "dataType" : "BIGINT" }, { "name" : "c", "dataType" : "VARCHAR(2147483647)" - } ] - }, - "options" : { - "connector" : "blackhole" + } ], + "primaryKey" : { + "name" : "PK_a", + "type" : "PRIMARY_KEY", + "columns" : [ "a" ] + } } } } }, - "inputChangelogMode" : [ "INSERT" ], + "inputChangelogMode" : [ "INSERT", "UPDATE_AFTER", "DELETE" ], + "upsertMaterializeStrategy" : "MAP", + "inputUpsertKey" : [ 0 ], "inputProperties" : [ { "requiredDistribution" : { "type" : "UNKNOWN" @@ -68,12 +73,12 @@ "damBehavior" : "PIPELINED", "priority" : 0 } ], - "outputType" : "ROW<`a` BIGINT, `b` INT, `c` VARCHAR(2147483647)>", - "description" : "Sink(table=[default_catalog.default_database.MySink], fields=[a, b, c])" + "outputType" : "ROW<`a` INT NOT NULL, `b` BIGINT, `c` VARCHAR(2147483647)>", + "description" : "Sink(table=[default_catalog.default_database.sink_t], fields=[a, b, c])" } ], "edges" : [ { - "source" : 0, - "target" : 0, + "source" : 22, + "target" : 23, "shuffle" : { "type" : "FORWARD" }, diff --git a/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-sink_2/sink-upsert/savepoint/_metadata b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-sink_2/sink-upsert/savepoint/_metadata new file mode 100644 index 00000000000..e9567fea0c2 Binary files /dev/null and b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-sink_2/sink-upsert/savepoint/_metadata differ diff --git a/flink-python/pyflink/table/tests/jsonplan/testGetJsonPlan.out b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-sink_2/sink-writing-metadata/plan/sink-writing-metadata.json similarity index 65% copy from flink-python/pyflink/table/tests/jsonplan/testGetJsonPlan.out copy to flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-sink_2/sink-writing-metadata/plan/sink-writing-metadata.json index 
880fb2cf11d..405453d3f43 100644 --- a/flink-python/pyflink/table/tests/jsonplan/testGetJsonPlan.out +++ b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-sink_2/sink-writing-metadata/plan/sink-writing-metadata.json @@ -1,36 +1,32 @@ { - "flinkVersion" : "", + "flinkVersion" : "2.3", "nodes" : [ { - "id" : 0, + "id" : 14, "type" : "stream-exec-table-source-scan_2", "scanTableSource" : { "table" : { - "identifier" : "`default_catalog`.`default_database`.`MyTable`", + "identifier" : "`default_catalog`.`default_database`.`source_t`", "resolvedTable" : { "schema" : { "columns" : [ { "name" : "a", - "dataType" : "BIGINT" + "dataType" : "INT" }, { "name" : "b", - "dataType" : "INT" + "dataType" : "BIGINT" }, { "name" : "c", "dataType" : "VARCHAR(2147483647)" } ] - }, - "options" : { - "connector" : "datagen", - "number-of-rows" : "5" } } } }, - "outputType" : "ROW<`a` BIGINT, `b` INT, `c` VARCHAR(2147483647)>", - "description" : "TableSourceScan(table=[[default_catalog, default_database, MyTable]], fields=[a, b, c])" + "outputType" : "ROW<`a` INT, `b` BIGINT, `c` VARCHAR(2147483647)>", + "description" : "TableSourceScan(table=[[default_catalog, default_database, source_t]], fields=[a, b, c])" }, { - "id" : 0, - "type" : "stream-exec-sink_1", + "id" : 15, + "type" : "stream-exec-sink_2", "configuration" : { "table.exec.sink.keyed-shuffle" : "AUTO", "table.exec.sink.not-null-enforcer" : "ERROR", @@ -40,27 +36,32 @@ }, "dynamicTableSink" : { "table" : { - "identifier" : "`default_catalog`.`default_database`.`MySink`", + "identifier" : "`default_catalog`.`default_database`.`sink_t`", "resolvedTable" : { "schema" : { "columns" : [ { "name" : "a", - "dataType" : "BIGINT" + "dataType" : "INT" }, { "name" : "b", - "dataType" : "INT" + "dataType" : "BIGINT" }, { "name" : "c", - "dataType" : "VARCHAR(2147483647)" + "kind" : "METADATA", + "dataType" : "VARCHAR(2147483647)", + "isVirtual" : false } ] - }, - "options" : { - "connector" : "blackhole" } } - 
} + }, + "abilities" : [ { + "type" : "WritingMetadata", + "metadataKeys" : [ "c" ], + "consumedType" : "ROW<`a` INT, `b` BIGINT, `c` VARCHAR(2147483647)> NOT NULL" + } ] }, "inputChangelogMode" : [ "INSERT" ], + "upsertMaterializeStrategy" : "MAP", "inputProperties" : [ { "requiredDistribution" : { "type" : "UNKNOWN" @@ -68,12 +69,12 @@ "damBehavior" : "PIPELINED", "priority" : 0 } ], - "outputType" : "ROW<`a` BIGINT, `b` INT, `c` VARCHAR(2147483647)>", - "description" : "Sink(table=[default_catalog.default_database.MySink], fields=[a, b, c])" + "outputType" : "ROW<`a` INT, `b` BIGINT, `c` VARCHAR(2147483647)>", + "description" : "Sink(table=[default_catalog.default_database.sink_t], fields=[a, b, c])" } ], "edges" : [ { - "source" : 0, - "target" : 0, + "source" : 14, + "target" : 15, "shuffle" : { "type" : "FORWARD" }, diff --git a/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-sink_2/sink-writing-metadata/savepoint/_metadata b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-sink_2/sink-writing-metadata/savepoint/_metadata new file mode 100644 index 00000000000..75224ac197c Binary files /dev/null and b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-sink_2/sink-writing-metadata/savepoint/_metadata differ
