This is an automated email from the ASF dual-hosted git repository.
twalthr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new de207e1deaf [FLINK-39515][table-planner] Fix compiled plan restore for
built-in PTFs with default args
de207e1deaf is described below
commit de207e1deaf38ab3412ebfd8474f2d46afd3767c
Author: Gustavo de Morais <[email protected]>
AuthorDate: Thu Apr 23 10:17:49 2026 +0200
[FLINK-39515][table-planner] Fix compiled plan restore for built-in PTFs
with default args
This closes #27996.
---
.../nodes/exec/serde/RexNodeJsonDeserializer.java | 9 +-
.../nodes/exec/serde/RexNodeJsonSerdeTest.java | 8 +-
.../exec/stream/FromChangelogRestoreTest.java | 40 +++++++
.../exec/stream/FromChangelogTestPrograms.java | 40 +++++++
.../nodes/exec/stream/ToChangelogRestoreTest.java | 40 +++++++
.../nodes/exec/stream/ToChangelogTestPrograms.java | 32 +++++
.../plan/from-changelog-retract-restore.json | 128 ++++++++++++++++++++
.../savepoint/_metadata | Bin 0 -> 7310 bytes
.../plan/to-changelog-retract-restore.json | 133 +++++++++++++++++++++
.../savepoint/_metadata | Bin 0 -> 8315 bytes
10 files changed, 428 insertions(+), 2 deletions(-)
diff --git
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/RexNodeJsonDeserializer.java
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/RexNodeJsonDeserializer.java
index 49720d39cc0..0c6f3ace423 100644
---
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/RexNodeJsonDeserializer.java
+++
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/RexNodeJsonDeserializer.java
@@ -32,6 +32,7 @@ import
org.apache.flink.table.planner.calcite.RexTableArgCall.SortOrder;
import
org.apache.flink.table.planner.functions.bridging.BridgingSqlAggFunction;
import org.apache.flink.table.planner.functions.bridging.BridgingSqlFunction;
import org.apache.flink.table.planner.functions.sql.BuiltInSqlOperator;
+import org.apache.flink.table.planner.functions.sql.SqlDefaultArgOperator;
import org.apache.flink.table.planner.typeutils.SymbolUtil.SerializableSymbol;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonParser;
@@ -374,7 +375,13 @@ final class RexNodeJsonDeserializer extends
StdDeserializer<RexNode> {
} else {
callType = serdeContext.getRexBuilder().deriveReturnType(operator,
rexOperands);
}
- return serdeContext.getRexBuilder().makeCall(callType, operator,
rexOperands);
+ // SqlDefaultArgOperator is constructed per-call site by FlinkSqlCallBinding and not
+ // registered in any operator table. Rebuild the typed Flink instance here.
+ final SqlOperator effectiveOperator =
+ operator.getKind() == SqlKind.DEFAULT
+ ? new SqlDefaultArgOperator(callType)
+ : operator;
+ return serdeContext.getRexBuilder().makeCall(callType,
effectiveOperator, rexOperands);
}
// --------------------------------------------------------------------------------------------
diff --git
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/serde/RexNodeJsonSerdeTest.java
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/serde/RexNodeJsonSerdeTest.java
index d5fdb0709df..2cd692ca097 100644
---
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/serde/RexNodeJsonSerdeTest.java
+++
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/serde/RexNodeJsonSerdeTest.java
@@ -45,6 +45,7 @@ import org.apache.flink.table.planner.calcite.RexTableArgCall;
import org.apache.flink.table.planner.calcite.RexTableArgCall.SortOrder;
import org.apache.flink.table.planner.functions.bridging.BridgingSqlFunction;
import org.apache.flink.table.planner.functions.sql.FlinkSqlOperatorTable;
+import org.apache.flink.table.planner.functions.sql.SqlDefaultArgOperator;
import org.apache.flink.table.planner.functions.utils.UserDefinedFunctionUtils;
import org.apache.flink.table.types.inference.TypeInference;
import org.apache.flink.table.types.inference.TypeStrategies;
@@ -86,6 +87,7 @@ import java.io.IOException;
import java.math.BigDecimal;
import java.util.Arrays;
import java.util.Collections;
+import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
@@ -841,7 +843,11 @@ public class RexNodeJsonSerdeTest {
0,
new int[] {1},
new int[] {0},
- new SortOrder[] {SortOrder.ASC_NULLS_LAST}));
+ new SortOrder[] {SortOrder.ASC_NULLS_LAST}),
+ rexBuilder.makeCall(
+ FACTORY.createSqlType(SqlTypeName.VARCHAR),
+ new
SqlDefaultArgOperator(FACTORY.createSqlType(SqlTypeName.VARCHAR)),
+ List.of()));
}
// --------------------------------------------------------------------------------------------
diff --git
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/FromChangelogRestoreTest.java
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/FromChangelogRestoreTest.java
new file mode 100644
index 00000000000..7cd25c35ab5
--- /dev/null
+++
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/FromChangelogRestoreTest.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.nodes.exec.stream;
+
+import
org.apache.flink.table.planner.plan.nodes.exec.testutils.RestoreTestBase;
+import org.apache.flink.table.test.program.TableTestProgram;
+
+import java.util.List;
+
+/**
+ * Restore tests for the built-in {@link
+ * org.apache.flink.table.functions.BuiltInFunctionDefinitions#FROM_CHANGELOG} PTF.
+ */
+public class FromChangelogRestoreTest extends RestoreTestBase {
+
+ public FromChangelogRestoreTest() {
+ super(StreamExecProcessTableFunction.class);
+ }
+
+ @Override
+ public List<TableTestProgram> programs() {
+ return List.of(FromChangelogTestPrograms.RETRACT_RESTORE);
+ }
+}
diff --git
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/FromChangelogTestPrograms.java
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/FromChangelogTestPrograms.java
index 56f2c422b1a..725b4a3d2d0 100644
---
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/FromChangelogTestPrograms.java
+++
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/FromChangelogTestPrograms.java
@@ -183,6 +183,46 @@ public class FromChangelogTestPrograms {
+ "input => TABLE changelog_view)")
.build();
+ // --------------------------------------------------------------------------------------------
+ // Restore tests
+ // --------------------------------------------------------------------------------------------
+
+ /**
+ * Append source with retract op codes through FROM_CHANGELOG, split across a compiled-plan +
+ * savepoint restore.
+ */
+ public static final TableTestProgram RETRACT_RESTORE =
+ TableTestProgram.of(
+ "from-changelog-retract-restore",
+ "FROM_CHANGELOG over an append CDC source restores
via compiled plan "
+ + "+ savepoint")
+ .setupTableSource(
+ SourceTestStep.newBuilder("cdc_stream")
+ .addSchema(SIMPLE_CDC_SCHEMA)
+ .producedBeforeRestore(
+ Row.of(1, "INSERT", "Alice"),
+ Row.of(2, "INSERT", "Bob"))
+ .producedAfterRestore(
+ Row.of(1, "UPDATE_BEFORE",
"Alice"),
+ Row.of(1, "UPDATE_AFTER",
"Alice2"),
+ Row.of(2, "DELETE", "Bob"))
+ .build())
+ .setupTableSink(
+ SinkTestStep.newBuilder("sink")
+ .addSchema("id INT", "name STRING")
+ .consumedBeforeRestore(
+ Row.ofKind(RowKind.INSERT, 1,
"Alice"),
+ Row.ofKind(RowKind.INSERT, 2,
"Bob"))
+ .consumedAfterRestore(
+ Row.ofKind(RowKind.UPDATE_BEFORE,
1, "Alice"),
+ Row.ofKind(RowKind.UPDATE_AFTER,
1, "Alice2"),
+ Row.ofKind(RowKind.DELETE, 2,
"Bob"))
+ .build())
+ .runSql(
+ "INSERT INTO sink SELECT * FROM FROM_CHANGELOG("
+ + "input => TABLE cdc_stream)")
+ .build();
+
// --------------------------------------------------------------------------------------------
// Error validation tests
// --------------------------------------------------------------------------------------------
diff --git
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/ToChangelogRestoreTest.java
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/ToChangelogRestoreTest.java
new file mode 100644
index 00000000000..5b7258946d7
--- /dev/null
+++
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/ToChangelogRestoreTest.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.nodes.exec.stream;
+
+import
org.apache.flink.table.planner.plan.nodes.exec.testutils.RestoreTestBase;
+import org.apache.flink.table.test.program.TableTestProgram;
+
+import java.util.List;
+
+/**
+ * Restore tests for the built-in {@link
+ * org.apache.flink.table.functions.BuiltInFunctionDefinitions#TO_CHANGELOG} PTF.
+ */
+public class ToChangelogRestoreTest extends RestoreTestBase {
+
+ public ToChangelogRestoreTest() {
+ super(StreamExecProcessTableFunction.class);
+ }
+
+ @Override
+ public List<TableTestProgram> programs() {
+ return List.of(ToChangelogTestPrograms.RETRACT_RESTORE);
+ }
+}
diff --git
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/ToChangelogTestPrograms.java
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/ToChangelogTestPrograms.java
index 4056b652a79..d6da85f8c0c 100644
---
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/ToChangelogTestPrograms.java
+++
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/ToChangelogTestPrograms.java
@@ -89,6 +89,38 @@ public class ToChangelogTestPrograms {
.runSql("INSERT INTO sink SELECT * FROM TO_CHANGELOG(input
=> TABLE t)")
.build();
+ /** Retract input through TO_CHANGELOG, split across a compiled-plan + savepoint restore. */
+ public static final TableTestProgram RETRACT_RESTORE =
+ TableTestProgram.of(
+ "to-changelog-retract-restore",
+ "TO_CHANGELOG over a retract source restores via
compiled plan + "
+ + "savepoint")
+ .setupTableSource(
+ SourceTestStep.newBuilder("t")
+ .addSchema(
+ "name STRING PRIMARY KEY NOT
ENFORCED", "score BIGINT")
+ .addMode(ChangelogMode.all())
+ .producedBeforeRestore(
+ Row.ofKind(RowKind.INSERT,
"Alice", 10L),
+ Row.ofKind(RowKind.INSERT, "Bob",
20L))
+ .producedAfterRestore(
+ Row.ofKind(RowKind.UPDATE_BEFORE,
"Alice", 10L),
+ Row.ofKind(RowKind.UPDATE_AFTER,
"Alice", 30L),
+ Row.ofKind(RowKind.DELETE, "Bob",
20L))
+ .build())
+ .setupTableSink(
+ SinkTestStep.newBuilder("sink")
+ .addSchema("op STRING", "name STRING",
"score BIGINT")
+ .consumedBeforeRestore(
+ "+I[INSERT, Alice, 10]",
"+I[INSERT, Bob, 20]")
+ .consumedAfterRestore(
+ "+I[UPDATE_BEFORE, Alice, 10]",
+ "+I[UPDATE_AFTER, Alice, 30]",
+ "+I[DELETE, Bob, 20]")
+ .build())
+ .runSql("INSERT INTO sink SELECT * FROM TO_CHANGELOG(input
=> TABLE t)")
+ .build();
+
public static final TableTestProgram UPSERT_INPUT =
TableTestProgram.of(
"to-changelog-upsert-input",
diff --git
a/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-process-table-function_1/from-changelog-retract-restore/plan/from-changelog-retract-restore.json
b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-process-table-function_1/from-changelog-retract-restore/plan/from-changelog-retract-restore.json
new file mode 100644
index 00000000000..8cdaa6e5449
--- /dev/null
+++
b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-process-table-function_1/from-changelog-retract-restore/plan/from-changelog-retract-restore.json
@@ -0,0 +1,128 @@
+{
+ "flinkVersion" : "2.4",
+ "nodes" : [ {
+ "id" : 1,
+ "type" : "stream-exec-table-source-scan_2",
+ "scanTableSource" : {
+ "table" : {
+ "identifier" : "`default_catalog`.`default_database`.`cdc_stream`",
+ "resolvedTable" : {
+ "schema" : {
+ "columns" : [ {
+ "name" : "id",
+ "dataType" : "INT"
+ }, {
+ "name" : "op",
+ "dataType" : "VARCHAR(2147483647)"
+ }, {
+ "name" : "name",
+ "dataType" : "VARCHAR(2147483647)"
+ } ]
+ }
+ }
+ }
+ },
+ "outputType" : "ROW<`id` INT, `op` VARCHAR(2147483647), `name`
VARCHAR(2147483647)>",
+ "description" : "TableSourceScan(table=[[default_catalog,
default_database, cdc_stream]], fields=[id, op, name])"
+ }, {
+ "id" : 2,
+ "type" : "stream-exec-process-table-function_1",
+ "inputProperties" : [ {
+ "requiredDistribution" : {
+ "type" : "UNKNOWN"
+ },
+ "damBehavior" : "PIPELINED",
+ "priority" : 0
+ } ],
+ "outputType" : "ROW<`id` INT, `name` VARCHAR(2147483647)>",
+ "description" :
"ProcessTableFunction(invocation=[FROM_CHANGELOG(TABLE(#0), DEFAULT(),
DEFAULT(), DEFAULT(), DEFAULT())], uid=[null], select=[id,name],
rowType=[RecordType(INTEGER id, VARCHAR(2147483647) name)])",
+ "uid" : null,
+ "functionCall" : {
+ "kind" : "CALL",
+ "internalName" : "$FROM_CHANGELOG$1",
+ "operands" : [ {
+ "kind" : "TABLE_ARG_CALL",
+ "inputIndex" : 0,
+ "partitionKeys" : [ ],
+ "orderKeys" : [ ],
+ "orderDirections" : [ ],
+ "type" : "ROW<`id` INT, `op` VARCHAR(2147483647), `name`
VARCHAR(2147483647)> NOT NULL"
+ }, {
+ "kind" : "CALL",
+ "syntax" : "SPECIAL",
+ "internalName" : "$DEFAULT$1",
+ "type" : "DESCRIPTOR"
+ }, {
+ "kind" : "CALL",
+ "syntax" : "SPECIAL",
+ "internalName" : "$DEFAULT$1",
+ "type" : "MAP<VARCHAR(2147483647), VARCHAR(2147483647)>"
+ }, {
+ "kind" : "CALL",
+ "syntax" : "SPECIAL",
+ "internalName" : "$DEFAULT$1",
+ "type" : "DESCRIPTOR"
+ }, {
+ "kind" : "CALL",
+ "syntax" : "SPECIAL",
+ "internalName" : "$DEFAULT$1",
+ "type" : "VARCHAR(2147483647)"
+ } ],
+ "type" : "ROW<`id` INT, `name` VARCHAR(2147483647)> NOT NULL"
+ },
+ "inputChangelogModes" : [ [ "INSERT" ] ],
+ "outputChangelogMode" : [ "INSERT", "UPDATE_BEFORE", "UPDATE_AFTER",
"DELETE" ]
+ }, {
+ "id" : 3,
+ "type" : "stream-exec-sink_2",
+ "configuration" : {
+ "table.exec.sink.keyed-shuffle" : "AUTO",
+ "table.exec.sink.not-null-enforcer" : "ERROR",
+ "table.exec.sink.rowtime-inserter" : "ENABLED",
+ "table.exec.sink.type-length-enforcer" : "IGNORE",
+ "table.exec.sink.upsert-materialize" : "AUTO"
+ },
+ "dynamicTableSink" : {
+ "table" : {
+ "identifier" : "`default_catalog`.`default_database`.`sink`",
+ "resolvedTable" : {
+ "schema" : {
+ "columns" : [ {
+ "name" : "id",
+ "dataType" : "INT"
+ }, {
+ "name" : "name",
+ "dataType" : "VARCHAR(2147483647)"
+ } ]
+ }
+ }
+ }
+ },
+ "inputChangelogMode" : [ "INSERT", "UPDATE_BEFORE", "UPDATE_AFTER",
"DELETE" ],
+ "upsertMaterializeStrategy" : "ADAPTIVE",
+ "inputProperties" : [ {
+ "requiredDistribution" : {
+ "type" : "UNKNOWN"
+ },
+ "damBehavior" : "PIPELINED",
+ "priority" : 0
+ } ],
+ "outputType" : "ROW<`id` INT, `name` VARCHAR(2147483647)>",
+ "description" : "Sink(table=[default_catalog.default_database.sink],
fields=[id, name])"
+ } ],
+ "edges" : [ {
+ "source" : 1,
+ "target" : 2,
+ "shuffle" : {
+ "type" : "FORWARD"
+ },
+ "shuffleMode" : "PIPELINED"
+ }, {
+ "source" : 2,
+ "target" : 3,
+ "shuffle" : {
+ "type" : "FORWARD"
+ },
+ "shuffleMode" : "PIPELINED"
+ } ]
+}
\ No newline at end of file
diff --git
a/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-process-table-function_1/from-changelog-retract-restore/savepoint/_metadata
b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-process-table-function_1/from-changelog-retract-restore/savepoint/_metadata
new file mode 100644
index 00000000000..ccf7b073b66
Binary files /dev/null and
b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-process-table-function_1/from-changelog-retract-restore/savepoint/_metadata
differ
diff --git
a/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-process-table-function_1/to-changelog-retract-restore/plan/to-changelog-retract-restore.json
b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-process-table-function_1/to-changelog-retract-restore/plan/to-changelog-retract-restore.json
new file mode 100644
index 00000000000..959c9473118
--- /dev/null
+++
b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-process-table-function_1/to-changelog-retract-restore/plan/to-changelog-retract-restore.json
@@ -0,0 +1,133 @@
+{
+ "flinkVersion" : "2.4",
+ "nodes" : [ {
+ "id" : 1,
+ "type" : "stream-exec-table-source-scan_2",
+ "scanTableSource" : {
+ "table" : {
+ "identifier" : "`default_catalog`.`default_database`.`t`",
+ "resolvedTable" : {
+ "schema" : {
+ "columns" : [ {
+ "name" : "name",
+ "dataType" : "VARCHAR(2147483647) NOT NULL"
+ }, {
+ "name" : "score",
+ "dataType" : "BIGINT"
+ } ],
+ "primaryKey" : {
+ "name" : "PK_name",
+ "type" : "PRIMARY_KEY",
+ "columns" : [ "name" ]
+ }
+ }
+ }
+ }
+ },
+ "outputType" : "ROW<`name` VARCHAR(2147483647) NOT NULL, `score` BIGINT>",
+ "description" : "TableSourceScan(table=[[default_catalog,
default_database, t]], fields=[name, score])"
+ }, {
+ "id" : 2,
+ "type" : "stream-exec-process-table-function_1",
+ "inputProperties" : [ {
+ "requiredDistribution" : {
+ "type" : "UNKNOWN"
+ },
+ "damBehavior" : "PIPELINED",
+ "priority" : 0
+ } ],
+ "outputType" : "ROW<`op` VARCHAR(2147483647), `name` VARCHAR(2147483647)
NOT NULL, `score` BIGINT>",
+ "description" : "ProcessTableFunction(invocation=[TO_CHANGELOG(TABLE(#0),
DEFAULT(), DEFAULT(), DEFAULT(), DEFAULT())], uid=[null],
select=[op,name,score], rowType=[RecordType(VARCHAR(2147483647) op,
VARCHAR(2147483647) name, BIGINT score)])",
+ "uid" : null,
+ "functionCall" : {
+ "kind" : "CALL",
+ "internalName" : "$TO_CHANGELOG$1",
+ "operands" : [ {
+ "kind" : "TABLE_ARG_CALL",
+ "inputIndex" : 0,
+ "partitionKeys" : [ ],
+ "orderKeys" : [ ],
+ "orderDirections" : [ ],
+ "type" : "ROW<`name` VARCHAR(2147483647) NOT NULL, `score` BIGINT> NOT
NULL"
+ }, {
+ "kind" : "CALL",
+ "syntax" : "SPECIAL",
+ "internalName" : "$DEFAULT$1",
+ "type" : "DESCRIPTOR"
+ }, {
+ "kind" : "CALL",
+ "syntax" : "SPECIAL",
+ "internalName" : "$DEFAULT$1",
+ "type" : "MAP<VARCHAR(2147483647), VARCHAR(2147483647)>"
+ }, {
+ "kind" : "CALL",
+ "syntax" : "SPECIAL",
+ "internalName" : "$DEFAULT$1",
+ "type" : "DESCRIPTOR"
+ }, {
+ "kind" : "CALL",
+ "syntax" : "SPECIAL",
+ "internalName" : "$DEFAULT$1",
+ "type" : "VARCHAR(2147483647)"
+ } ],
+ "type" : "ROW<`op` VARCHAR(2147483647), `name` VARCHAR(2147483647) NOT
NULL, `score` BIGINT> NOT NULL"
+ },
+ "inputChangelogModes" : [ [ "INSERT", "UPDATE_BEFORE", "UPDATE_AFTER",
"DELETE" ] ],
+ "outputChangelogMode" : [ "INSERT" ]
+ }, {
+ "id" : 3,
+ "type" : "stream-exec-sink_2",
+ "configuration" : {
+ "table.exec.sink.keyed-shuffle" : "AUTO",
+ "table.exec.sink.not-null-enforcer" : "ERROR",
+ "table.exec.sink.rowtime-inserter" : "ENABLED",
+ "table.exec.sink.type-length-enforcer" : "IGNORE",
+ "table.exec.sink.upsert-materialize" : "AUTO"
+ },
+ "dynamicTableSink" : {
+ "table" : {
+ "identifier" : "`default_catalog`.`default_database`.`sink`",
+ "resolvedTable" : {
+ "schema" : {
+ "columns" : [ {
+ "name" : "op",
+ "dataType" : "VARCHAR(2147483647)"
+ }, {
+ "name" : "name",
+ "dataType" : "VARCHAR(2147483647)"
+ }, {
+ "name" : "score",
+ "dataType" : "BIGINT"
+ } ]
+ }
+ }
+ }
+ },
+ "inputChangelogMode" : [ "INSERT" ],
+ "upsertMaterializeStrategy" : "ADAPTIVE",
+ "inputProperties" : [ {
+ "requiredDistribution" : {
+ "type" : "UNKNOWN"
+ },
+ "damBehavior" : "PIPELINED",
+ "priority" : 0
+ } ],
+ "outputType" : "ROW<`op` VARCHAR(2147483647), `name` VARCHAR(2147483647)
NOT NULL, `score` BIGINT>",
+ "description" : "Sink(table=[default_catalog.default_database.sink],
fields=[op, name, score])"
+ } ],
+ "edges" : [ {
+ "source" : 1,
+ "target" : 2,
+ "shuffle" : {
+ "type" : "FORWARD"
+ },
+ "shuffleMode" : "PIPELINED"
+ }, {
+ "source" : 2,
+ "target" : 3,
+ "shuffle" : {
+ "type" : "FORWARD"
+ },
+ "shuffleMode" : "PIPELINED"
+ } ]
+}
\ No newline at end of file
diff --git
a/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-process-table-function_1/to-changelog-retract-restore/savepoint/_metadata
b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-process-table-function_1/to-changelog-retract-restore/savepoint/_metadata
new file mode 100644
index 00000000000..fd50f78d013
Binary files /dev/null and
b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-process-table-function_1/to-changelog-retract-restore/savepoint/_metadata
differ