This is an automated email from the ASF dual-hosted git repository.

twalthr pushed a commit to branch release-2.3
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-2.3 by this push:
     new 1f8f870b389 [FLINK-39515][table-planner] Fix compiled plan restore for 
built-in PTFs with default args
1f8f870b389 is described below

commit 1f8f870b389044f7a7bd98dd3006c229427416b0
Author: Gustavo de Morais <[email protected]>
AuthorDate: Thu Apr 23 15:09:38 2026 +0200

    [FLINK-39515][table-planner] Fix compiled plan restore for built-in PTFs 
with default args
    
    This closes #28007.
---
 .../nodes/exec/serde/RexNodeJsonDeserializer.java  |   9 +-
 .../nodes/exec/serde/RexNodeJsonSerdeTest.java     |   8 +-
 .../exec/stream/FromChangelogRestoreTest.java      |  40 +++++++
 .../exec/stream/FromChangelogTestPrograms.java     |  40 +++++++
 .../nodes/exec/stream/ToChangelogRestoreTest.java  |  40 +++++++
 .../nodes/exec/stream/ToChangelogTestPrograms.java |  32 +++++
 .../plan/from-changelog-retract-restore.json       | 127 ++++++++++++++++++++
 .../savepoint/_metadata                            | Bin 0 -> 7308 bytes
 .../plan/to-changelog-retract-restore.json         | 132 +++++++++++++++++++++
 .../savepoint/_metadata                            | Bin 0 -> 8317 bytes
 10 files changed, 426 insertions(+), 2 deletions(-)

diff --git 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/RexNodeJsonDeserializer.java
 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/RexNodeJsonDeserializer.java
index 49720d39cc0..0c6f3ace423 100644
--- 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/RexNodeJsonDeserializer.java
+++ 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/RexNodeJsonDeserializer.java
@@ -32,6 +32,7 @@ import 
org.apache.flink.table.planner.calcite.RexTableArgCall.SortOrder;
 import 
org.apache.flink.table.planner.functions.bridging.BridgingSqlAggFunction;
 import org.apache.flink.table.planner.functions.bridging.BridgingSqlFunction;
 import org.apache.flink.table.planner.functions.sql.BuiltInSqlOperator;
+import org.apache.flink.table.planner.functions.sql.SqlDefaultArgOperator;
 import org.apache.flink.table.planner.typeutils.SymbolUtil.SerializableSymbol;
 
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonParser;
@@ -374,7 +375,13 @@ final class RexNodeJsonDeserializer extends 
StdDeserializer<RexNode> {
         } else {
             callType = serdeContext.getRexBuilder().deriveReturnType(operator, 
rexOperands);
         }
-        return serdeContext.getRexBuilder().makeCall(callType, operator, 
rexOperands);
+        // SqlDefaultArgOperator is constructed per-call site by 
FlinkSqlCallBinding and not
+        // registered in any operator table. Rebuild the typed Flink instance 
here.
+        final SqlOperator effectiveOperator =
+                operator.getKind() == SqlKind.DEFAULT
+                        ? new SqlDefaultArgOperator(callType)
+                        : operator;
+        return serdeContext.getRexBuilder().makeCall(callType, 
effectiveOperator, rexOperands);
     }
 
     // 
--------------------------------------------------------------------------------------------
diff --git 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/serde/RexNodeJsonSerdeTest.java
 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/serde/RexNodeJsonSerdeTest.java
index d5fdb0709df..2cd692ca097 100644
--- 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/serde/RexNodeJsonSerdeTest.java
+++ 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/serde/RexNodeJsonSerdeTest.java
@@ -45,6 +45,7 @@ import org.apache.flink.table.planner.calcite.RexTableArgCall;
 import org.apache.flink.table.planner.calcite.RexTableArgCall.SortOrder;
 import org.apache.flink.table.planner.functions.bridging.BridgingSqlFunction;
 import org.apache.flink.table.planner.functions.sql.FlinkSqlOperatorTable;
+import org.apache.flink.table.planner.functions.sql.SqlDefaultArgOperator;
 import org.apache.flink.table.planner.functions.utils.UserDefinedFunctionUtils;
 import org.apache.flink.table.types.inference.TypeInference;
 import org.apache.flink.table.types.inference.TypeStrategies;
@@ -86,6 +87,7 @@ import java.io.IOException;
 import java.math.BigDecimal;
 import java.util.Arrays;
 import java.util.Collections;
+import java.util.List;
 import java.util.Map;
 import java.util.Optional;
 import java.util.Set;
@@ -841,7 +843,11 @@ public class RexNodeJsonSerdeTest {
                         0,
                         new int[] {1},
                         new int[] {0},
-                        new SortOrder[] {SortOrder.ASC_NULLS_LAST}));
+                        new SortOrder[] {SortOrder.ASC_NULLS_LAST}),
+                rexBuilder.makeCall(
+                        FACTORY.createSqlType(SqlTypeName.VARCHAR),
+                        new 
SqlDefaultArgOperator(FACTORY.createSqlType(SqlTypeName.VARCHAR)),
+                        List.of()));
     }
 
     // 
--------------------------------------------------------------------------------------------
diff --git 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/FromChangelogRestoreTest.java
 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/FromChangelogRestoreTest.java
new file mode 100644
index 00000000000..7cd25c35ab5
--- /dev/null
+++ 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/FromChangelogRestoreTest.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.nodes.exec.stream;
+
+import 
org.apache.flink.table.planner.plan.nodes.exec.testutils.RestoreTestBase;
+import org.apache.flink.table.test.program.TableTestProgram;
+
+import java.util.List;
+
+/**
+ * Restore tests for the built-in {@link
+ * org.apache.flink.table.functions.BuiltInFunctionDefinitions#FROM_CHANGELOG} 
PTF.
+ */
+public class FromChangelogRestoreTest extends RestoreTestBase {
+
+    public FromChangelogRestoreTest() {
+        super(StreamExecProcessTableFunction.class);
+    }
+
+    @Override
+    public List<TableTestProgram> programs() {
+        return List.of(FromChangelogTestPrograms.RETRACT_RESTORE);
+    }
+}
diff --git 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/FromChangelogTestPrograms.java
 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/FromChangelogTestPrograms.java
index 56f2c422b1a..725b4a3d2d0 100644
--- 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/FromChangelogTestPrograms.java
+++ 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/FromChangelogTestPrograms.java
@@ -183,6 +183,46 @@ public class FromChangelogTestPrograms {
                                     + "input => TABLE changelog_view)")
                     .build();
 
+    // 
--------------------------------------------------------------------------------------------
+    // Restore tests
+    // 
--------------------------------------------------------------------------------------------
+
+    /**
+     * Append source with retract op codes through FROM_CHANGELOG, split 
across a compiled-plan +
+     * savepoint restore.
+     */
+    public static final TableTestProgram RETRACT_RESTORE =
+            TableTestProgram.of(
+                            "from-changelog-retract-restore",
+                            "FROM_CHANGELOG over an append CDC source restores 
via compiled plan "
+                                    + "+ savepoint")
+                    .setupTableSource(
+                            SourceTestStep.newBuilder("cdc_stream")
+                                    .addSchema(SIMPLE_CDC_SCHEMA)
+                                    .producedBeforeRestore(
+                                            Row.of(1, "INSERT", "Alice"),
+                                            Row.of(2, "INSERT", "Bob"))
+                                    .producedAfterRestore(
+                                            Row.of(1, "UPDATE_BEFORE", 
"Alice"),
+                                            Row.of(1, "UPDATE_AFTER", 
"Alice2"),
+                                            Row.of(2, "DELETE", "Bob"))
+                                    .build())
+                    .setupTableSink(
+                            SinkTestStep.newBuilder("sink")
+                                    .addSchema("id INT", "name STRING")
+                                    .consumedBeforeRestore(
+                                            Row.ofKind(RowKind.INSERT, 1, 
"Alice"),
+                                            Row.ofKind(RowKind.INSERT, 2, 
"Bob"))
+                                    .consumedAfterRestore(
+                                            Row.ofKind(RowKind.UPDATE_BEFORE, 
1, "Alice"),
+                                            Row.ofKind(RowKind.UPDATE_AFTER, 
1, "Alice2"),
+                                            Row.ofKind(RowKind.DELETE, 2, 
"Bob"))
+                                    .build())
+                    .runSql(
+                            "INSERT INTO sink SELECT * FROM FROM_CHANGELOG("
+                                    + "input => TABLE cdc_stream)")
+                    .build();
+
     // 
--------------------------------------------------------------------------------------------
     // Error validation tests
     // 
--------------------------------------------------------------------------------------------
diff --git 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/ToChangelogRestoreTest.java
 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/ToChangelogRestoreTest.java
new file mode 100644
index 00000000000..5b7258946d7
--- /dev/null
+++ 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/ToChangelogRestoreTest.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.nodes.exec.stream;
+
+import 
org.apache.flink.table.planner.plan.nodes.exec.testutils.RestoreTestBase;
+import org.apache.flink.table.test.program.TableTestProgram;
+
+import java.util.List;
+
+/**
+ * Restore tests for the built-in {@link
+ * org.apache.flink.table.functions.BuiltInFunctionDefinitions#TO_CHANGELOG} 
PTF.
+ */
+public class ToChangelogRestoreTest extends RestoreTestBase {
+
+    public ToChangelogRestoreTest() {
+        super(StreamExecProcessTableFunction.class);
+    }
+
+    @Override
+    public List<TableTestProgram> programs() {
+        return List.of(ToChangelogTestPrograms.RETRACT_RESTORE);
+    }
+}
diff --git 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/ToChangelogTestPrograms.java
 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/ToChangelogTestPrograms.java
index 4056b652a79..d6da85f8c0c 100644
--- 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/ToChangelogTestPrograms.java
+++ 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/ToChangelogTestPrograms.java
@@ -89,6 +89,38 @@ public class ToChangelogTestPrograms {
                     .runSql("INSERT INTO sink SELECT * FROM TO_CHANGELOG(input 
=> TABLE t)")
                     .build();
 
+    /** Retract input through TO_CHANGELOG, split across a compiled-plan + 
savepoint restore. */
+    public static final TableTestProgram RETRACT_RESTORE =
+            TableTestProgram.of(
+                            "to-changelog-retract-restore",
+                            "TO_CHANGELOG over a retract source restores via 
compiled plan + "
+                                    + "savepoint")
+                    .setupTableSource(
+                            SourceTestStep.newBuilder("t")
+                                    .addSchema(
+                                            "name STRING PRIMARY KEY NOT 
ENFORCED", "score BIGINT")
+                                    .addMode(ChangelogMode.all())
+                                    .producedBeforeRestore(
+                                            Row.ofKind(RowKind.INSERT, 
"Alice", 10L),
+                                            Row.ofKind(RowKind.INSERT, "Bob", 
20L))
+                                    .producedAfterRestore(
+                                            Row.ofKind(RowKind.UPDATE_BEFORE, 
"Alice", 10L),
+                                            Row.ofKind(RowKind.UPDATE_AFTER, 
"Alice", 30L),
+                                            Row.ofKind(RowKind.DELETE, "Bob", 
20L))
+                                    .build())
+                    .setupTableSink(
+                            SinkTestStep.newBuilder("sink")
+                                    .addSchema("op STRING", "name STRING", 
"score BIGINT")
+                                    .consumedBeforeRestore(
+                                            "+I[INSERT, Alice, 10]", 
"+I[INSERT, Bob, 20]")
+                                    .consumedAfterRestore(
+                                            "+I[UPDATE_BEFORE, Alice, 10]",
+                                            "+I[UPDATE_AFTER, Alice, 30]",
+                                            "+I[DELETE, Bob, 20]")
+                                    .build())
+                    .runSql("INSERT INTO sink SELECT * FROM TO_CHANGELOG(input 
=> TABLE t)")
+                    .build();
+
     public static final TableTestProgram UPSERT_INPUT =
             TableTestProgram.of(
                             "to-changelog-upsert-input",
diff --git 
a/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-process-table-function_1/from-changelog-retract-restore/plan/from-changelog-retract-restore.json
 
b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-process-table-function_1/from-changelog-retract-restore/plan/from-changelog-retract-restore.json
new file mode 100644
index 00000000000..b0e21b27bd1
--- /dev/null
+++ 
b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-process-table-function_1/from-changelog-retract-restore/plan/from-changelog-retract-restore.json
@@ -0,0 +1,127 @@
+{
+  "flinkVersion" : "2.3",
+  "nodes" : [ {
+    "id" : 1,
+    "type" : "stream-exec-table-source-scan_2",
+    "scanTableSource" : {
+      "table" : {
+        "identifier" : "`default_catalog`.`default_database`.`cdc_stream`",
+        "resolvedTable" : {
+          "schema" : {
+            "columns" : [ {
+              "name" : "id",
+              "dataType" : "INT"
+            }, {
+              "name" : "op",
+              "dataType" : "VARCHAR(2147483647)"
+            }, {
+              "name" : "name",
+              "dataType" : "VARCHAR(2147483647)"
+            } ]
+          }
+        }
+      }
+    },
+    "outputType" : "ROW<`id` INT, `op` VARCHAR(2147483647), `name` 
VARCHAR(2147483647)>",
+    "description" : "TableSourceScan(table=[[default_catalog, 
default_database, cdc_stream]], fields=[id, op, name])"
+  }, {
+    "id" : 2,
+    "type" : "stream-exec-process-table-function_1",
+    "inputProperties" : [ {
+      "requiredDistribution" : {
+        "type" : "UNKNOWN"
+      },
+      "damBehavior" : "PIPELINED",
+      "priority" : 0
+    } ],
+    "outputType" : "ROW<`id` INT, `name` VARCHAR(2147483647)>",
+    "description" : 
"ProcessTableFunction(invocation=[FROM_CHANGELOG(TABLE(#0), DEFAULT(), 
DEFAULT(), DEFAULT(), DEFAULT())], uid=[null], select=[id,name], 
rowType=[RecordType(INTEGER id, VARCHAR(2147483647) name)])",
+    "uid" : null,
+    "functionCall" : {
+      "kind" : "CALL",
+      "internalName" : "$FROM_CHANGELOG$1",
+      "operands" : [ {
+        "kind" : "TABLE_ARG_CALL",
+        "inputIndex" : 0,
+        "partitionKeys" : [ ],
+        "orderKeys" : [ ],
+        "orderDirections" : [ ],
+        "type" : "ROW<`id` INT, `op` VARCHAR(2147483647), `name` 
VARCHAR(2147483647)> NOT NULL"
+      }, {
+        "kind" : "CALL",
+        "syntax" : "SPECIAL",
+        "internalName" : "$DEFAULT$1",
+        "type" : "DESCRIPTOR"
+      }, {
+        "kind" : "CALL",
+        "syntax" : "SPECIAL",
+        "internalName" : "$DEFAULT$1",
+        "type" : "MAP<VARCHAR(2147483647), VARCHAR(2147483647)>"
+      }, {
+        "kind" : "CALL",
+        "syntax" : "SPECIAL",
+        "internalName" : "$DEFAULT$1",
+        "type" : "DESCRIPTOR"
+      }, {
+        "kind" : "CALL",
+        "syntax" : "SPECIAL",
+        "internalName" : "$DEFAULT$1",
+        "type" : "VARCHAR(2147483647)"
+      } ],
+      "type" : "ROW<`id` INT, `name` VARCHAR(2147483647)> NOT NULL"
+    },
+    "inputChangelogModes" : [ [ "INSERT" ] ],
+    "outputChangelogMode" : [ "INSERT", "UPDATE_BEFORE", "UPDATE_AFTER", 
"DELETE" ]
+  }, {
+    "id" : 3,
+    "type" : "stream-exec-sink_2",
+    "configuration" : {
+      "table.exec.sink.keyed-shuffle" : "AUTO",
+      "table.exec.sink.not-null-enforcer" : "ERROR",
+      "table.exec.sink.rowtime-inserter" : "ENABLED",
+      "table.exec.sink.type-length-enforcer" : "IGNORE",
+      "table.exec.sink.upsert-materialize" : "AUTO"
+    },
+    "dynamicTableSink" : {
+      "table" : {
+        "identifier" : "`default_catalog`.`default_database`.`sink`",
+        "resolvedTable" : {
+          "schema" : {
+            "columns" : [ {
+              "name" : "id",
+              "dataType" : "INT"
+            }, {
+              "name" : "name",
+              "dataType" : "VARCHAR(2147483647)"
+            } ]
+          }
+        }
+      }
+    },
+    "inputChangelogMode" : [ "INSERT", "UPDATE_BEFORE", "UPDATE_AFTER", 
"DELETE" ],
+    "inputProperties" : [ {
+      "requiredDistribution" : {
+        "type" : "UNKNOWN"
+      },
+      "damBehavior" : "PIPELINED",
+      "priority" : 0
+    } ],
+    "outputType" : "ROW<`id` INT, `name` VARCHAR(2147483647)>",
+    "description" : "Sink(table=[default_catalog.default_database.sink], 
fields=[id, name])"
+  } ],
+  "edges" : [ {
+    "source" : 1,
+    "target" : 2,
+    "shuffle" : {
+      "type" : "FORWARD"
+    },
+    "shuffleMode" : "PIPELINED"
+  }, {
+    "source" : 2,
+    "target" : 3,
+    "shuffle" : {
+      "type" : "FORWARD"
+    },
+    "shuffleMode" : "PIPELINED"
+  } ]
+}
\ No newline at end of file
diff --git 
a/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-process-table-function_1/from-changelog-retract-restore/savepoint/_metadata
 
b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-process-table-function_1/from-changelog-retract-restore/savepoint/_metadata
new file mode 100644
index 00000000000..c204f7502ec
Binary files /dev/null and 
b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-process-table-function_1/from-changelog-retract-restore/savepoint/_metadata
 differ
diff --git 
a/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-process-table-function_1/to-changelog-retract-restore/plan/to-changelog-retract-restore.json
 
b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-process-table-function_1/to-changelog-retract-restore/plan/to-changelog-retract-restore.json
new file mode 100644
index 00000000000..eff7936df3a
--- /dev/null
+++ 
b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-process-table-function_1/to-changelog-retract-restore/plan/to-changelog-retract-restore.json
@@ -0,0 +1,132 @@
+{
+  "flinkVersion" : "2.3",
+  "nodes" : [ {
+    "id" : 1,
+    "type" : "stream-exec-table-source-scan_2",
+    "scanTableSource" : {
+      "table" : {
+        "identifier" : "`default_catalog`.`default_database`.`t`",
+        "resolvedTable" : {
+          "schema" : {
+            "columns" : [ {
+              "name" : "name",
+              "dataType" : "VARCHAR(2147483647) NOT NULL"
+            }, {
+              "name" : "score",
+              "dataType" : "BIGINT"
+            } ],
+            "primaryKey" : {
+              "name" : "PK_name",
+              "type" : "PRIMARY_KEY",
+              "columns" : [ "name" ]
+            }
+          }
+        }
+      }
+    },
+    "outputType" : "ROW<`name` VARCHAR(2147483647) NOT NULL, `score` BIGINT>",
+    "description" : "TableSourceScan(table=[[default_catalog, 
default_database, t]], fields=[name, score])"
+  }, {
+    "id" : 2,
+    "type" : "stream-exec-process-table-function_1",
+    "inputProperties" : [ {
+      "requiredDistribution" : {
+        "type" : "UNKNOWN"
+      },
+      "damBehavior" : "PIPELINED",
+      "priority" : 0
+    } ],
+    "outputType" : "ROW<`op` VARCHAR(2147483647), `name` VARCHAR(2147483647) 
NOT NULL, `score` BIGINT>",
+    "description" : "ProcessTableFunction(invocation=[TO_CHANGELOG(TABLE(#0), 
DEFAULT(), DEFAULT(), DEFAULT(), DEFAULT())], uid=[null], 
select=[op,name,score], rowType=[RecordType(VARCHAR(2147483647) op, 
VARCHAR(2147483647) name, BIGINT score)])",
+    "uid" : null,
+    "functionCall" : {
+      "kind" : "CALL",
+      "internalName" : "$TO_CHANGELOG$1",
+      "operands" : [ {
+        "kind" : "TABLE_ARG_CALL",
+        "inputIndex" : 0,
+        "partitionKeys" : [ ],
+        "orderKeys" : [ ],
+        "orderDirections" : [ ],
+        "type" : "ROW<`name` VARCHAR(2147483647) NOT NULL, `score` BIGINT> NOT 
NULL"
+      }, {
+        "kind" : "CALL",
+        "syntax" : "SPECIAL",
+        "internalName" : "$DEFAULT$1",
+        "type" : "DESCRIPTOR"
+      }, {
+        "kind" : "CALL",
+        "syntax" : "SPECIAL",
+        "internalName" : "$DEFAULT$1",
+        "type" : "MAP<VARCHAR(2147483647), VARCHAR(2147483647)>"
+      }, {
+        "kind" : "CALL",
+        "syntax" : "SPECIAL",
+        "internalName" : "$DEFAULT$1",
+        "type" : "DESCRIPTOR"
+      }, {
+        "kind" : "CALL",
+        "syntax" : "SPECIAL",
+        "internalName" : "$DEFAULT$1",
+        "type" : "VARCHAR(2147483647)"
+      } ],
+      "type" : "ROW<`op` VARCHAR(2147483647), `name` VARCHAR(2147483647) NOT 
NULL, `score` BIGINT> NOT NULL"
+    },
+    "inputChangelogModes" : [ [ "INSERT", "UPDATE_BEFORE", "UPDATE_AFTER", 
"DELETE" ] ],
+    "outputChangelogMode" : [ "INSERT" ]
+  }, {
+    "id" : 3,
+    "type" : "stream-exec-sink_2",
+    "configuration" : {
+      "table.exec.sink.keyed-shuffle" : "AUTO",
+      "table.exec.sink.not-null-enforcer" : "ERROR",
+      "table.exec.sink.rowtime-inserter" : "ENABLED",
+      "table.exec.sink.type-length-enforcer" : "IGNORE",
+      "table.exec.sink.upsert-materialize" : "AUTO"
+    },
+    "dynamicTableSink" : {
+      "table" : {
+        "identifier" : "`default_catalog`.`default_database`.`sink`",
+        "resolvedTable" : {
+          "schema" : {
+            "columns" : [ {
+              "name" : "op",
+              "dataType" : "VARCHAR(2147483647)"
+            }, {
+              "name" : "name",
+              "dataType" : "VARCHAR(2147483647)"
+            }, {
+              "name" : "score",
+              "dataType" : "BIGINT"
+            } ]
+          }
+        }
+      }
+    },
+    "inputChangelogMode" : [ "INSERT" ],
+    "inputProperties" : [ {
+      "requiredDistribution" : {
+        "type" : "UNKNOWN"
+      },
+      "damBehavior" : "PIPELINED",
+      "priority" : 0
+    } ],
+    "outputType" : "ROW<`op` VARCHAR(2147483647), `name` VARCHAR(2147483647) 
NOT NULL, `score` BIGINT>",
+    "description" : "Sink(table=[default_catalog.default_database.sink], 
fields=[op, name, score])"
+  } ],
+  "edges" : [ {
+    "source" : 1,
+    "target" : 2,
+    "shuffle" : {
+      "type" : "FORWARD"
+    },
+    "shuffleMode" : "PIPELINED"
+  }, {
+    "source" : 2,
+    "target" : 3,
+    "shuffle" : {
+      "type" : "FORWARD"
+    },
+    "shuffleMode" : "PIPELINED"
+  } ]
+}
\ No newline at end of file
diff --git 
a/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-process-table-function_1/to-changelog-retract-restore/savepoint/_metadata
 
b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-process-table-function_1/to-changelog-retract-restore/savepoint/_metadata
new file mode 100644
index 00000000000..06eb683537d
Binary files /dev/null and 
b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-process-table-function_1/to-changelog-retract-restore/savepoint/_metadata
 differ

Reply via email to