This is an automated email from the ASF dual-hosted git repository.

dwysakowicz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit c89933e99d5087f81389560663984012733d3bf8
Author: bvarghese1 <bvargh...@confluent.io>
AuthorDate: Mon Jan 8 09:56:25 2024 -0800

    [FLINK-33896] Implement restore tests for Correlate node
---
 .../nodes/exec/stream/CorrelateRestoreTest.java    |  43 +++++
 .../nodes/exec/stream/CorrelateTestPrograms.java   | 174 +++++++++++++++++
 .../plan/correlate-catalog-func.json               | 145 ++++++++++++++
 .../correlate-catalog-func/savepoint/_metadata     | Bin 0 -> 7245 bytes
 .../plan/correlate-cross-join-unnest.json          | 138 ++++++++++++++
 .../savepoint/_metadata                            | Bin 0 -> 7091 bytes
 .../plan/correlate-join-filter.json                | 212 +++++++++++++++++++++
 .../correlate-join-filter/savepoint/_metadata      | Bin 0 -> 7120 bytes
 .../plan/correlate-left-join.json                  | 141 ++++++++++++++
 .../correlate-left-join/savepoint/_metadata        | Bin 0 -> 7245 bytes
 .../plan/correlate-system-func.json                | 145 ++++++++++++++
 .../correlate-system-func/savepoint/_metadata      | Bin 0 -> 7245 bytes
 12 files changed, 998 insertions(+)

diff --git 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/CorrelateRestoreTest.java
 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/CorrelateRestoreTest.java
new file mode 100644
index 00000000000..cc24919cdca
--- /dev/null
+++ 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/CorrelateRestoreTest.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.nodes.exec.stream;
+
+import 
org.apache.flink.table.planner.plan.nodes.exec.testutils.RestoreTestBase;
+import org.apache.flink.table.test.program.TableTestProgram;
+
+import java.util.Arrays;
+import java.util.List;
+
+/** Restore tests for {@link StreamExecCorrelate}. */
+public class CorrelateRestoreTest extends RestoreTestBase {
+
+    public CorrelateRestoreTest() {
+        super(StreamExecCorrelate.class);
+    }
+
+    @Override
+    public List<TableTestProgram> programs() {
+        return Arrays.asList(
+                CorrelateTestPrograms.CORRELATE_CATALOG_FUNC,
+                CorrelateTestPrograms.CORRELATE_SYSTEM_FUNC,
+                CorrelateTestPrograms.CORRELATE_JOIN_FILTER,
+                CorrelateTestPrograms.CORRELATE_LEFT_JOIN,
+                CorrelateTestPrograms.CORRELATE_CROSS_JOIN_UNNEST);
+    }
+}
diff --git 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/CorrelateTestPrograms.java
 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/CorrelateTestPrograms.java
new file mode 100644
index 00000000000..d1a2a1e46e3
--- /dev/null
+++ 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/CorrelateTestPrograms.java
@@ -0,0 +1,174 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.nodes.exec.stream;
+
+import 
org.apache.flink.table.planner.runtime.utils.JavaUserDefinedTableFunctions.StringSplit;
+import org.apache.flink.table.planner.utils.TableFunc1;
+import org.apache.flink.table.test.program.SinkTestStep;
+import org.apache.flink.table.test.program.SourceTestStep;
+import org.apache.flink.table.test.program.TableTestProgram;
+import org.apache.flink.types.Row;
+
+/** {@link TableTestProgram} definitions for testing {@link 
StreamExecCorrelate}. */
+public class CorrelateTestPrograms {
+
+    static final Row[] BEFORE_DATA = {Row.of(1L, 1, "hi#there"), Row.of(2L, 2, 
"hello#world")};
+
+    static final Row[] AFTER_DATA = {
+        Row.of(4L, 4, "foo#bar"), Row.of(3L, 3, "bar#fiz"),
+    };
+
+    static final String[] SOURCE_SCHEMA = {"a BIGINT", "b INT NOT NULL", "c 
VARCHAR"};
+
+    static final TableTestProgram CORRELATE_CATALOG_FUNC =
+            TableTestProgram.of(
+                            "correlate-catalog-func",
+                            "validate correlate with temporary catalog 
function")
+                    .setupTemporaryCatalogFunction("func1", TableFunc1.class)
+                    .setupTableSource(
+                            SourceTestStep.newBuilder("source_t")
+                                    .addSchema(SOURCE_SCHEMA)
+                                    .producedBeforeRestore(BEFORE_DATA)
+                                    .producedAfterRestore(AFTER_DATA)
+                                    .build())
+                    .setupTableSink(
+                            SinkTestStep.newBuilder("sink_t")
+                                    .addSchema("a VARCHAR", "b VARCHAR")
+                                    .consumedBeforeRestore(
+                                            "+I[hi#there, $hi]",
+                                            "+I[hi#there, $there]",
+                                            "+I[hello#world, $hello]",
+                                            "+I[hello#world, $world]")
+                                    .consumedAfterRestore(
+                                            "+I[foo#bar, $foo]",
+                                            "+I[foo#bar, $bar]",
+                                            "+I[bar#fiz, $bar]",
+                                            "+I[bar#fiz, $fiz]")
+                                    .build())
+                    .runSql(
+                            "INSERT INTO sink_t SELECT c, s FROM source_t, 
LATERAL TABLE(func1(c, '$')) AS T(s)")
+                    .build();
+
+    static final TableTestProgram CORRELATE_SYSTEM_FUNC =
+            TableTestProgram.of(
+                            "correlate-system-func",
+                            "validate correlate with temporary system 
function")
+                    .setupTemporarySystemFunction("STRING_SPLIT", 
StringSplit.class)
+                    .setupTableSource(
+                            SourceTestStep.newBuilder("source_t")
+                                    .addSchema(SOURCE_SCHEMA)
+                                    .producedBeforeRestore(BEFORE_DATA)
+                                    .producedAfterRestore(AFTER_DATA)
+                                    .build())
+                    .setupTableSink(
+                            SinkTestStep.newBuilder("sink_t")
+                                    .addSchema("a VARCHAR", "b VARCHAR")
+                                    .consumedBeforeRestore(
+                                            "+I[hi#there, hi]",
+                                            "+I[hi#there, there]",
+                                            "+I[hello#world, hello]",
+                                            "+I[hello#world, world]")
+                                    .consumedAfterRestore(
+                                            "+I[foo#bar, foo]",
+                                            "+I[foo#bar, bar]",
+                                            "+I[bar#fiz, bar]",
+                                            "+I[bar#fiz, fiz]")
+                                    .build())
+                    .runSql(
+                            "INSERT INTO sink_t SELECT c, s FROM source_t, 
LATERAL TABLE(STRING_SPLIT(c, '#')) AS T(s)")
+                    .build();
+
+    static final TableTestProgram CORRELATE_JOIN_FILTER =
+            TableTestProgram.of("correlate-join-filter", "validate correlate 
with join and filter")
+                    .setupTemporaryCatalogFunction("func1", TableFunc1.class)
+                    .setupTableSource(
+                            SourceTestStep.newBuilder("source_t")
+                                    .addSchema(SOURCE_SCHEMA)
+                                    .producedBeforeRestore(BEFORE_DATA)
+                                    .producedAfterRestore(AFTER_DATA)
+                                    .build())
+                    .setupTableSink(
+                            SinkTestStep.newBuilder("sink_t")
+                                    .addSchema("a VARCHAR", "b VARCHAR")
+                                    .consumedBeforeRestore(
+                                            "+I[hello#world, hello]", 
"+I[hello#world, world]")
+                                    .consumedAfterRestore("+I[bar#fiz, bar]", 
"+I[bar#fiz, fiz]")
+                                    .build())
+                    .runSql(
+                            "INSERT INTO sink_t SELECT * FROM (SELECT c, s 
FROM source_t, LATERAL TABLE(func1(c)) AS T(s)) AS T2 WHERE c LIKE '%hello%' OR 
c LIKE '%fiz%'")
+                    .build();
+
+    static final TableTestProgram CORRELATE_LEFT_JOIN =
+            TableTestProgram.of("correlate-left-join", "validate correlate 
with left join")
+                    .setupTemporaryCatalogFunction("func1", TableFunc1.class)
+                    .setupTableSource(
+                            SourceTestStep.newBuilder("source_t")
+                                    .addSchema(SOURCE_SCHEMA)
+                                    .producedBeforeRestore(BEFORE_DATA)
+                                    .producedAfterRestore(AFTER_DATA)
+                                    .build())
+                    .setupTableSink(
+                            SinkTestStep.newBuilder("sink_t")
+                                    .addSchema("a VARCHAR", "b VARCHAR")
+                                    .consumedBeforeRestore(
+                                            "+I[hi#there, hi]",
+                                            "+I[hi#there, there]",
+                                            "+I[hello#world, hello]",
+                                            "+I[hello#world, world]")
+                                    .consumedAfterRestore(
+                                            "+I[foo#bar, foo]",
+                                            "+I[foo#bar, bar]",
+                                            "+I[bar#fiz, bar]",
+                                            "+I[bar#fiz, fiz]")
+                                    .build())
+                    .runSql(
+                            "INSERT INTO sink_t SELECT c, s FROM source_t LEFT 
JOIN LATERAL TABLE(func1(c)) AS T(s) ON TRUE")
+                    .build();
+
+    static final TableTestProgram CORRELATE_CROSS_JOIN_UNNEST =
+            TableTestProgram.of(
+                            "correlate-cross-join-unnest",
+                            "validate correlate with cross join and unnest")
+                    .setupTableSource(
+                            SourceTestStep.newBuilder("source_t")
+                                    .addSchema("name STRING", "arr 
ARRAY<ROW<nested STRING>>")
+                                    .producedBeforeRestore(
+                                            Row.of(
+                                                    "Bob",
+                                                    new Row[] {
+                                                        Row.of("1"), 
Row.of("2"), Row.of("3")
+                                                    }))
+                                    .producedAfterRestore(
+                                            Row.of(
+                                                    "Alice",
+                                                    new Row[] {
+                                                        Row.of("4"), 
Row.of("5"), Row.of("6")
+                                                    }))
+                                    .build())
+                    .setupTableSink(
+                            SinkTestStep.newBuilder("sink_t")
+                                    .addSchema("name STRING", "nested STRING")
+                                    .consumedBeforeRestore("+I[Bob, 1]", 
"+I[Bob, 2]", "+I[Bob, 3]")
+                                    .consumedAfterRestore(
+                                            "+I[Alice, 4]", "+I[Alice, 5]", 
"+I[Alice, 6]")
+                                    .build())
+                    .runSql(
+                            "INSERT INTO sink_t SELECT name, nested FROM 
source_t CROSS JOIN UNNEST(arr) AS T(nested)")
+                    .build();
+}
diff --git 
a/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-correlate_1/correlate-catalog-func/plan/correlate-catalog-func.json
 
b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-correlate_1/correlate-catalog-func/plan/correlate-catalog-func.json
new file mode 100644
index 00000000000..ad40b066e4f
--- /dev/null
+++ 
b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-correlate_1/correlate-catalog-func/plan/correlate-catalog-func.json
@@ -0,0 +1,145 @@
+{
+  "flinkVersion" : "1.19",
+  "nodes" : [ {
+    "id" : 1,
+    "type" : "stream-exec-table-source-scan_1",
+    "scanTableSource" : {
+      "table" : {
+        "identifier" : "`default_catalog`.`default_database`.`source_t`",
+        "resolvedTable" : {
+          "schema" : {
+            "columns" : [ {
+              "name" : "a",
+              "dataType" : "BIGINT"
+            }, {
+              "name" : "b",
+              "dataType" : "INT NOT NULL"
+            }, {
+              "name" : "c",
+              "dataType" : "VARCHAR(2147483647)"
+            } ],
+            "watermarkSpecs" : [ ]
+          },
+          "partitionKeys" : [ ]
+        }
+      }
+    },
+    "outputType" : "ROW<`a` BIGINT, `b` INT NOT NULL, `c` 
VARCHAR(2147483647)>",
+    "description" : "TableSourceScan(table=[[default_catalog, 
default_database, source_t]], fields=[a, b, c])",
+    "inputProperties" : [ ]
+  }, {
+    "id" : 2,
+    "type" : "stream-exec-correlate_1",
+    "joinType" : "INNER",
+    "functionCall" : {
+      "kind" : "CALL",
+      "catalogName" : "`default_catalog`.`default_database`.`func1`",
+      "operands" : [ {
+        "kind" : "FIELD_ACCESS",
+        "name" : "c",
+        "expr" : {
+          "kind" : "CORREL_VARIABLE",
+          "correl" : "$cor0",
+          "type" : "ROW<`a` BIGINT, `b` INT NOT NULL, `c` VARCHAR(2147483647), 
`s` VARCHAR(2147483647)> NOT NULL"
+        }
+      }, {
+        "kind" : "LITERAL",
+        "value" : "$",
+        "type" : "CHAR(1) NOT NULL"
+      } ],
+      "type" : "ROW<`EXPR$0` VARCHAR(2147483647)> NOT NULL"
+    },
+    "condition" : null,
+    "inputProperties" : [ {
+      "requiredDistribution" : {
+        "type" : "UNKNOWN"
+      },
+      "damBehavior" : "PIPELINED",
+      "priority" : 0
+    } ],
+    "outputType" : "ROW<`a` BIGINT, `b` INT NOT NULL, `c` VARCHAR(2147483647), 
`EXPR$0` VARCHAR(2147483647)>",
+    "description" : "Correlate(invocation=[func1($cor0.c, _UTF-16LE'$')], 
correlate=[table(func1($cor0.c,'$'))], select=[a,b,c,EXPR$0], 
rowType=[RecordType(BIGINT a, INTEGER b, VARCHAR(2147483647) c, 
VARCHAR(2147483647) EXPR$0)], joinType=[INNER])"
+  }, {
+    "id" : 3,
+    "type" : "stream-exec-calc_1",
+    "projection" : [ {
+      "kind" : "INPUT_REF",
+      "inputIndex" : 2,
+      "type" : "VARCHAR(2147483647)"
+    }, {
+      "kind" : "INPUT_REF",
+      "inputIndex" : 3,
+      "type" : "VARCHAR(2147483647)"
+    } ],
+    "condition" : null,
+    "inputProperties" : [ {
+      "requiredDistribution" : {
+        "type" : "UNKNOWN"
+      },
+      "damBehavior" : "PIPELINED",
+      "priority" : 0
+    } ],
+    "outputType" : "ROW<`c` VARCHAR(2147483647), `s` VARCHAR(2147483647)>",
+    "description" : "Calc(select=[c, EXPR$0 AS s])"
+  }, {
+    "id" : 4,
+    "type" : "stream-exec-sink_1",
+    "configuration" : {
+      "table.exec.sink.keyed-shuffle" : "AUTO",
+      "table.exec.sink.not-null-enforcer" : "ERROR",
+      "table.exec.sink.rowtime-inserter" : "ENABLED",
+      "table.exec.sink.type-length-enforcer" : "IGNORE",
+      "table.exec.sink.upsert-materialize" : "AUTO"
+    },
+    "dynamicTableSink" : {
+      "table" : {
+        "identifier" : "`default_catalog`.`default_database`.`sink_t`",
+        "resolvedTable" : {
+          "schema" : {
+            "columns" : [ {
+              "name" : "a",
+              "dataType" : "VARCHAR(2147483647)"
+            }, {
+              "name" : "b",
+              "dataType" : "VARCHAR(2147483647)"
+            } ],
+            "watermarkSpecs" : [ ]
+          },
+          "partitionKeys" : [ ]
+        }
+      }
+    },
+    "inputChangelogMode" : [ "INSERT" ],
+    "inputProperties" : [ {
+      "requiredDistribution" : {
+        "type" : "UNKNOWN"
+      },
+      "damBehavior" : "PIPELINED",
+      "priority" : 0
+    } ],
+    "outputType" : "ROW<`c` VARCHAR(2147483647), `s` VARCHAR(2147483647)>",
+    "description" : "Sink(table=[default_catalog.default_database.sink_t], 
fields=[c, s])"
+  } ],
+  "edges" : [ {
+    "source" : 1,
+    "target" : 2,
+    "shuffle" : {
+      "type" : "FORWARD"
+    },
+    "shuffleMode" : "PIPELINED"
+  }, {
+    "source" : 2,
+    "target" : 3,
+    "shuffle" : {
+      "type" : "FORWARD"
+    },
+    "shuffleMode" : "PIPELINED"
+  }, {
+    "source" : 3,
+    "target" : 4,
+    "shuffle" : {
+      "type" : "FORWARD"
+    },
+    "shuffleMode" : "PIPELINED"
+  } ]
+}
\ No newline at end of file
diff --git 
a/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-correlate_1/correlate-catalog-func/savepoint/_metadata
 
b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-correlate_1/correlate-catalog-func/savepoint/_metadata
new file mode 100644
index 00000000000..7866f20d147
Binary files /dev/null and 
b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-correlate_1/correlate-catalog-func/savepoint/_metadata
 differ
diff --git 
a/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-correlate_1/correlate-cross-join-unnest/plan/correlate-cross-join-unnest.json
 
b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-correlate_1/correlate-cross-join-unnest/plan/correlate-cross-join-unnest.json
new file mode 100644
index 00000000000..a72848c4b2e
--- /dev/null
+++ 
b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-correlate_1/correlate-cross-join-unnest/plan/correlate-cross-join-unnest.json
@@ -0,0 +1,138 @@
+{
+  "flinkVersion" : "1.19",
+  "nodes" : [ {
+    "id" : 18,
+    "type" : "stream-exec-table-source-scan_1",
+    "scanTableSource" : {
+      "table" : {
+        "identifier" : "`default_catalog`.`default_database`.`source_t`",
+        "resolvedTable" : {
+          "schema" : {
+            "columns" : [ {
+              "name" : "name",
+              "dataType" : "VARCHAR(2147483647)"
+            }, {
+              "name" : "arr",
+              "dataType" : "ARRAY<ROW<`nested` VARCHAR(2147483647)>>"
+            } ],
+            "watermarkSpecs" : [ ]
+          },
+          "partitionKeys" : [ ]
+        }
+      }
+    },
+    "outputType" : "ROW<`name` VARCHAR(2147483647), `arr` ARRAY<ROW<`nested` 
VARCHAR(2147483647)>>>",
+    "description" : "TableSourceScan(table=[[default_catalog, 
default_database, source_t]], fields=[name, arr])",
+    "inputProperties" : [ ]
+  }, {
+    "id" : 19,
+    "type" : "stream-exec-correlate_1",
+    "joinType" : "INNER",
+    "functionCall" : {
+      "kind" : "CALL",
+      "internalName" : "$UNNEST_ROWS$1",
+      "operands" : [ {
+        "kind" : "FIELD_ACCESS",
+        "name" : "arr",
+        "expr" : {
+          "kind" : "CORREL_VARIABLE",
+          "correl" : "$cor0",
+          "type" : "ROW<`name` VARCHAR(2147483647), `arr` ARRAY<ROW<`nested` 
VARCHAR(2147483647)>>, `nested` VARCHAR(2147483647)> NOT NULL"
+        }
+      } ],
+      "type" : "ROW<`nested` VARCHAR(2147483647)>"
+    },
+    "condition" : null,
+    "inputProperties" : [ {
+      "requiredDistribution" : {
+        "type" : "UNKNOWN"
+      },
+      "damBehavior" : "PIPELINED",
+      "priority" : 0
+    } ],
+    "outputType" : "ROW<`name` VARCHAR(2147483647), `arr` ARRAY<ROW<`nested` 
VARCHAR(2147483647)>>, `nested` VARCHAR(2147483647)>",
+    "description" : "Correlate(invocation=[$UNNEST_ROWS$1($cor0.arr)], 
correlate=[table($UNNEST_ROWS$1($cor0.arr))], select=[name,arr,nested], 
rowType=[RecordType(VARCHAR(2147483647) name, 
RecordType:peek_no_expand(VARCHAR(2147483647) nested) ARRAY arr, 
VARCHAR(2147483647) nested)], joinType=[INNER])"
+  }, {
+    "id" : 20,
+    "type" : "stream-exec-calc_1",
+    "projection" : [ {
+      "kind" : "INPUT_REF",
+      "inputIndex" : 0,
+      "type" : "VARCHAR(2147483647)"
+    }, {
+      "kind" : "INPUT_REF",
+      "inputIndex" : 2,
+      "type" : "VARCHAR(2147483647)"
+    } ],
+    "condition" : null,
+    "inputProperties" : [ {
+      "requiredDistribution" : {
+        "type" : "UNKNOWN"
+      },
+      "damBehavior" : "PIPELINED",
+      "priority" : 0
+    } ],
+    "outputType" : "ROW<`name` VARCHAR(2147483647), `nested` 
VARCHAR(2147483647)>",
+    "description" : "Calc(select=[name, nested])"
+  }, {
+    "id" : 21,
+    "type" : "stream-exec-sink_1",
+    "configuration" : {
+      "table.exec.sink.keyed-shuffle" : "AUTO",
+      "table.exec.sink.not-null-enforcer" : "ERROR",
+      "table.exec.sink.rowtime-inserter" : "ENABLED",
+      "table.exec.sink.type-length-enforcer" : "IGNORE",
+      "table.exec.sink.upsert-materialize" : "AUTO"
+    },
+    "dynamicTableSink" : {
+      "table" : {
+        "identifier" : "`default_catalog`.`default_database`.`sink_t`",
+        "resolvedTable" : {
+          "schema" : {
+            "columns" : [ {
+              "name" : "name",
+              "dataType" : "VARCHAR(2147483647)"
+            }, {
+              "name" : "nested",
+              "dataType" : "VARCHAR(2147483647)"
+            } ],
+            "watermarkSpecs" : [ ]
+          },
+          "partitionKeys" : [ ]
+        }
+      }
+    },
+    "inputChangelogMode" : [ "INSERT" ],
+    "inputProperties" : [ {
+      "requiredDistribution" : {
+        "type" : "UNKNOWN"
+      },
+      "damBehavior" : "PIPELINED",
+      "priority" : 0
+    } ],
+    "outputType" : "ROW<`name` VARCHAR(2147483647), `nested` 
VARCHAR(2147483647)>",
+    "description" : "Sink(table=[default_catalog.default_database.sink_t], 
fields=[name, nested])"
+  } ],
+  "edges" : [ {
+    "source" : 18,
+    "target" : 19,
+    "shuffle" : {
+      "type" : "FORWARD"
+    },
+    "shuffleMode" : "PIPELINED"
+  }, {
+    "source" : 19,
+    "target" : 20,
+    "shuffle" : {
+      "type" : "FORWARD"
+    },
+    "shuffleMode" : "PIPELINED"
+  }, {
+    "source" : 20,
+    "target" : 21,
+    "shuffle" : {
+      "type" : "FORWARD"
+    },
+    "shuffleMode" : "PIPELINED"
+  } ]
+}
\ No newline at end of file
diff --git 
a/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-correlate_1/correlate-cross-join-unnest/savepoint/_metadata
 
b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-correlate_1/correlate-cross-join-unnest/savepoint/_metadata
new file mode 100644
index 00000000000..a83a1f37ab6
Binary files /dev/null and 
b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-correlate_1/correlate-cross-join-unnest/savepoint/_metadata
 differ
diff --git 
a/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-correlate_1/correlate-join-filter/plan/correlate-join-filter.json
 
b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-correlate_1/correlate-join-filter/plan/correlate-join-filter.json
new file mode 100644
index 00000000000..08643e4e9e9
--- /dev/null
+++ 
b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-correlate_1/correlate-join-filter/plan/correlate-join-filter.json
@@ -0,0 +1,212 @@
+{
+  "flinkVersion" : "1.19",
+  "nodes" : [ {
+    "id" : 9,
+    "type" : "stream-exec-table-source-scan_1",
+    "scanTableSource" : {
+      "table" : {
+        "identifier" : "`default_catalog`.`default_database`.`source_t`",
+        "resolvedTable" : {
+          "schema" : {
+            "columns" : [ {
+              "name" : "a",
+              "dataType" : "BIGINT"
+            }, {
+              "name" : "b",
+              "dataType" : "INT NOT NULL"
+            }, {
+              "name" : "c",
+              "dataType" : "VARCHAR(2147483647)"
+            } ],
+            "watermarkSpecs" : [ ]
+          },
+          "partitionKeys" : [ ]
+        }
+      },
+      "abilities" : [ {
+        "type" : "FilterPushDown",
+        "predicates" : [ ]
+      } ]
+    },
+    "outputType" : "ROW<`a` BIGINT, `b` INT NOT NULL, `c` 
VARCHAR(2147483647)>",
+    "description" : "TableSourceScan(table=[[default_catalog, 
default_database, source_t, filter=[]]], fields=[a, b, c])",
+    "inputProperties" : [ ]
+  }, {
+    "id" : 10,
+    "type" : "stream-exec-calc_1",
+    "projection" : [ {
+      "kind" : "INPUT_REF",
+      "inputIndex" : 0,
+      "type" : "BIGINT"
+    }, {
+      "kind" : "INPUT_REF",
+      "inputIndex" : 1,
+      "type" : "INT NOT NULL"
+    }, {
+      "kind" : "INPUT_REF",
+      "inputIndex" : 2,
+      "type" : "VARCHAR(2147483647)"
+    } ],
+    "condition" : {
+      "kind" : "CALL",
+      "syntax" : "BINARY",
+      "internalName" : "$OR$1",
+      "operands" : [ {
+        "kind" : "CALL",
+        "syntax" : "SPECIAL",
+        "internalName" : "$LIKE$1",
+        "operands" : [ {
+          "kind" : "INPUT_REF",
+          "inputIndex" : 2,
+          "type" : "VARCHAR(2147483647)"
+        }, {
+          "kind" : "LITERAL",
+          "value" : "%hello%",
+          "type" : "CHAR(7) NOT NULL"
+        } ],
+        "type" : "BOOLEAN"
+      }, {
+        "kind" : "CALL",
+        "syntax" : "SPECIAL",
+        "internalName" : "$LIKE$1",
+        "operands" : [ {
+          "kind" : "INPUT_REF",
+          "inputIndex" : 2,
+          "type" : "VARCHAR(2147483647)"
+        }, {
+          "kind" : "LITERAL",
+          "value" : "%fiz%",
+          "type" : "CHAR(5) NOT NULL"
+        } ],
+        "type" : "BOOLEAN"
+      } ],
+      "type" : "BOOLEAN"
+    },
+    "inputProperties" : [ {
+      "requiredDistribution" : {
+        "type" : "UNKNOWN"
+      },
+      "damBehavior" : "PIPELINED",
+      "priority" : 0
+    } ],
+    "outputType" : "ROW<`a` BIGINT, `b` INT NOT NULL, `c` 
VARCHAR(2147483647)>",
+    "description" : "Calc(select=[a, b, c], where=[(LIKE(c, '%hello%') OR 
LIKE(c, '%fiz%'))])"
+  }, {
+    "id" : 11,
+    "type" : "stream-exec-correlate_1",
+    "joinType" : "INNER",
+    "functionCall" : {
+      "kind" : "CALL",
+      "catalogName" : "`default_catalog`.`default_database`.`func1`",
+      "operands" : [ {
+        "kind" : "FIELD_ACCESS",
+        "name" : "c",
+        "expr" : {
+          "kind" : "CORREL_VARIABLE",
+          "correl" : "$cor0",
+          "type" : "ROW<`a` BIGINT, `b` INT NOT NULL, `c` VARCHAR(2147483647), 
`s` VARCHAR(2147483647)> NOT NULL"
+        }
+      } ],
+      "type" : "ROW<`EXPR$0` VARCHAR(2147483647)> NOT NULL"
+    },
+    "condition" : null,
+    "inputProperties" : [ {
+      "requiredDistribution" : {
+        "type" : "UNKNOWN"
+      },
+      "damBehavior" : "PIPELINED",
+      "priority" : 0
+    } ],
+    "outputType" : "ROW<`a` BIGINT, `b` INT NOT NULL, `c` VARCHAR(2147483647), 
`EXPR$0` VARCHAR(2147483647)>",
+    "description" : "Correlate(invocation=[func1($cor0.c)], 
correlate=[table(func1($cor0.c))], select=[a,b,c,EXPR$0], 
rowType=[RecordType(BIGINT a, INTEGER b, VARCHAR(2147483647) c, 
VARCHAR(2147483647) EXPR$0)], joinType=[INNER])"
+  }, {
+    "id" : 12,
+    "type" : "stream-exec-calc_1",
+    "projection" : [ {
+      "kind" : "INPUT_REF",
+      "inputIndex" : 2,
+      "type" : "VARCHAR(2147483647)"
+    }, {
+      "kind" : "INPUT_REF",
+      "inputIndex" : 3,
+      "type" : "VARCHAR(2147483647)"
+    } ],
+    "condition" : null,
+    "inputProperties" : [ {
+      "requiredDistribution" : {
+        "type" : "UNKNOWN"
+      },
+      "damBehavior" : "PIPELINED",
+      "priority" : 0
+    } ],
+    "outputType" : "ROW<`c` VARCHAR(2147483647), `s` VARCHAR(2147483647)>",
+    "description" : "Calc(select=[c, EXPR$0 AS s])"
+  }, {
+    "id" : 13,
+    "type" : "stream-exec-sink_1",
+    "configuration" : {
+      "table.exec.sink.keyed-shuffle" : "AUTO",
+      "table.exec.sink.not-null-enforcer" : "ERROR",
+      "table.exec.sink.rowtime-inserter" : "ENABLED",
+      "table.exec.sink.type-length-enforcer" : "IGNORE",
+      "table.exec.sink.upsert-materialize" : "AUTO"
+    },
+    "dynamicTableSink" : {
+      "table" : {
+        "identifier" : "`default_catalog`.`default_database`.`sink_t`",
+        "resolvedTable" : {
+          "schema" : {
+            "columns" : [ {
+              "name" : "a",
+              "dataType" : "VARCHAR(2147483647)"
+            }, {
+              "name" : "b",
+              "dataType" : "VARCHAR(2147483647)"
+            } ],
+            "watermarkSpecs" : [ ]
+          },
+          "partitionKeys" : [ ]
+        }
+      }
+    },
+    "inputChangelogMode" : [ "INSERT" ],
+    "inputProperties" : [ {
+      "requiredDistribution" : {
+        "type" : "UNKNOWN"
+      },
+      "damBehavior" : "PIPELINED",
+      "priority" : 0
+    } ],
+    "outputType" : "ROW<`c` VARCHAR(2147483647), `s` VARCHAR(2147483647)>",
+    "description" : "Sink(table=[default_catalog.default_database.sink_t], 
fields=[c, s])"
+  } ],
+  "edges" : [ {
+    "source" : 9,
+    "target" : 10,
+    "shuffle" : {
+      "type" : "FORWARD"
+    },
+    "shuffleMode" : "PIPELINED"
+  }, {
+    "source" : 10,
+    "target" : 11,
+    "shuffle" : {
+      "type" : "FORWARD"
+    },
+    "shuffleMode" : "PIPELINED"
+  }, {
+    "source" : 11,
+    "target" : 12,
+    "shuffle" : {
+      "type" : "FORWARD"
+    },
+    "shuffleMode" : "PIPELINED"
+  }, {
+    "source" : 12,
+    "target" : 13,
+    "shuffle" : {
+      "type" : "FORWARD"
+    },
+    "shuffleMode" : "PIPELINED"
+  } ]
+}
\ No newline at end of file
diff --git 
a/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-correlate_1/correlate-join-filter/savepoint/_metadata
 
b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-correlate_1/correlate-join-filter/savepoint/_metadata
new file mode 100644
index 00000000000..225619c71e8
Binary files /dev/null and 
b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-correlate_1/correlate-join-filter/savepoint/_metadata
 differ
diff --git 
a/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-correlate_1/correlate-left-join/plan/correlate-left-join.json
 
b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-correlate_1/correlate-left-join/plan/correlate-left-join.json
new file mode 100644
index 00000000000..c70d6ef7fb7
--- /dev/null
+++ 
b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-correlate_1/correlate-left-join/plan/correlate-left-join.json
@@ -0,0 +1,141 @@
+{
+  "flinkVersion" : "1.19",
+  "nodes" : [ {
+    "id" : 14,
+    "type" : "stream-exec-table-source-scan_1",
+    "scanTableSource" : {
+      "table" : {
+        "identifier" : "`default_catalog`.`default_database`.`source_t`",
+        "resolvedTable" : {
+          "schema" : {
+            "columns" : [ {
+              "name" : "a",
+              "dataType" : "BIGINT"
+            }, {
+              "name" : "b",
+              "dataType" : "INT NOT NULL"
+            }, {
+              "name" : "c",
+              "dataType" : "VARCHAR(2147483647)"
+            } ],
+            "watermarkSpecs" : [ ]
+          },
+          "partitionKeys" : [ ]
+        }
+      }
+    },
+    "outputType" : "ROW<`a` BIGINT, `b` INT NOT NULL, `c` 
VARCHAR(2147483647)>",
+    "description" : "TableSourceScan(table=[[default_catalog, 
default_database, source_t]], fields=[a, b, c])",
+    "inputProperties" : [ ]
+  }, {
+    "id" : 15,
+    "type" : "stream-exec-correlate_1",
+    "joinType" : "LEFT",
+    "functionCall" : {
+      "kind" : "CALL",
+      "catalogName" : "`default_catalog`.`default_database`.`func1`",
+      "operands" : [ {
+        "kind" : "FIELD_ACCESS",
+        "name" : "c",
+        "expr" : {
+          "kind" : "CORREL_VARIABLE",
+          "correl" : "$cor0",
+          "type" : "ROW<`a` BIGINT, `b` INT NOT NULL, `c` VARCHAR(2147483647), 
`s` VARCHAR(2147483647)> NOT NULL"
+        }
+      } ],
+      "type" : "ROW<`EXPR$0` VARCHAR(2147483647)> NOT NULL"
+    },
+    "condition" : null,
+    "inputProperties" : [ {
+      "requiredDistribution" : {
+        "type" : "UNKNOWN"
+      },
+      "damBehavior" : "PIPELINED",
+      "priority" : 0
+    } ],
+    "outputType" : "ROW<`a` BIGINT, `b` INT NOT NULL, `c` VARCHAR(2147483647), 
`EXPR$0` VARCHAR(2147483647)>",
+    "description" : "Correlate(invocation=[func1($cor0.c)], 
correlate=[table(func1($cor0.c))], select=[a,b,c,EXPR$0], 
rowType=[RecordType(BIGINT a, INTEGER b, VARCHAR(2147483647) c, 
VARCHAR(2147483647) EXPR$0)], joinType=[LEFT])"
+  }, {
+    "id" : 16,
+    "type" : "stream-exec-calc_1",
+    "projection" : [ {
+      "kind" : "INPUT_REF",
+      "inputIndex" : 2,
+      "type" : "VARCHAR(2147483647)"
+    }, {
+      "kind" : "INPUT_REF",
+      "inputIndex" : 3,
+      "type" : "VARCHAR(2147483647)"
+    } ],
+    "condition" : null,
+    "inputProperties" : [ {
+      "requiredDistribution" : {
+        "type" : "UNKNOWN"
+      },
+      "damBehavior" : "PIPELINED",
+      "priority" : 0
+    } ],
+    "outputType" : "ROW<`c` VARCHAR(2147483647), `s` VARCHAR(2147483647)>",
+    "description" : "Calc(select=[c, EXPR$0 AS s])"
+  }, {
+    "id" : 17,
+    "type" : "stream-exec-sink_1",
+    "configuration" : {
+      "table.exec.sink.keyed-shuffle" : "AUTO",
+      "table.exec.sink.not-null-enforcer" : "ERROR",
+      "table.exec.sink.rowtime-inserter" : "ENABLED",
+      "table.exec.sink.type-length-enforcer" : "IGNORE",
+      "table.exec.sink.upsert-materialize" : "AUTO"
+    },
+    "dynamicTableSink" : {
+      "table" : {
+        "identifier" : "`default_catalog`.`default_database`.`sink_t`",
+        "resolvedTable" : {
+          "schema" : {
+            "columns" : [ {
+              "name" : "a",
+              "dataType" : "VARCHAR(2147483647)"
+            }, {
+              "name" : "b",
+              "dataType" : "VARCHAR(2147483647)"
+            } ],
+            "watermarkSpecs" : [ ]
+          },
+          "partitionKeys" : [ ]
+        }
+      }
+    },
+    "inputChangelogMode" : [ "INSERT" ],
+    "inputProperties" : [ {
+      "requiredDistribution" : {
+        "type" : "UNKNOWN"
+      },
+      "damBehavior" : "PIPELINED",
+      "priority" : 0
+    } ],
+    "outputType" : "ROW<`c` VARCHAR(2147483647), `s` VARCHAR(2147483647)>",
+    "description" : "Sink(table=[default_catalog.default_database.sink_t], 
fields=[c, s])"
+  } ],
+  "edges" : [ {
+    "source" : 14,
+    "target" : 15,
+    "shuffle" : {
+      "type" : "FORWARD"
+    },
+    "shuffleMode" : "PIPELINED"
+  }, {
+    "source" : 15,
+    "target" : 16,
+    "shuffle" : {
+      "type" : "FORWARD"
+    },
+    "shuffleMode" : "PIPELINED"
+  }, {
+    "source" : 16,
+    "target" : 17,
+    "shuffle" : {
+      "type" : "FORWARD"
+    },
+    "shuffleMode" : "PIPELINED"
+  } ]
+}
\ No newline at end of file
diff --git 
a/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-correlate_1/correlate-left-join/savepoint/_metadata
 
b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-correlate_1/correlate-left-join/savepoint/_metadata
new file mode 100644
index 00000000000..b5330ece401
Binary files /dev/null and 
b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-correlate_1/correlate-left-join/savepoint/_metadata
 differ
diff --git 
a/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-correlate_1/correlate-system-func/plan/correlate-system-func.json
 
b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-correlate_1/correlate-system-func/plan/correlate-system-func.json
new file mode 100644
index 00000000000..b05d39229c3
--- /dev/null
+++ 
b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-correlate_1/correlate-system-func/plan/correlate-system-func.json
@@ -0,0 +1,145 @@
+{
+  "flinkVersion" : "1.19",
+  "nodes" : [ {
+    "id" : 5,
+    "type" : "stream-exec-table-source-scan_1",
+    "scanTableSource" : {
+      "table" : {
+        "identifier" : "`default_catalog`.`default_database`.`source_t`",
+        "resolvedTable" : {
+          "schema" : {
+            "columns" : [ {
+              "name" : "a",
+              "dataType" : "BIGINT"
+            }, {
+              "name" : "b",
+              "dataType" : "INT NOT NULL"
+            }, {
+              "name" : "c",
+              "dataType" : "VARCHAR(2147483647)"
+            } ],
+            "watermarkSpecs" : [ ]
+          },
+          "partitionKeys" : [ ]
+        }
+      }
+    },
+    "outputType" : "ROW<`a` BIGINT, `b` INT NOT NULL, `c` 
VARCHAR(2147483647)>",
+    "description" : "TableSourceScan(table=[[default_catalog, 
default_database, source_t]], fields=[a, b, c])",
+    "inputProperties" : [ ]
+  }, {
+    "id" : 6,
+    "type" : "stream-exec-correlate_1",
+    "joinType" : "INNER",
+    "functionCall" : {
+      "kind" : "CALL",
+      "systemName" : "STRING_SPLIT",
+      "operands" : [ {
+        "kind" : "FIELD_ACCESS",
+        "name" : "c",
+        "expr" : {
+          "kind" : "CORREL_VARIABLE",
+          "correl" : "$cor0",
+          "type" : "ROW<`a` BIGINT, `b` INT NOT NULL, `c` VARCHAR(2147483647), 
`s` VARCHAR(2147483647)> NOT NULL"
+        }
+      }, {
+        "kind" : "LITERAL",
+        "value" : "#",
+        "type" : "CHAR(1) NOT NULL"
+      } ],
+      "type" : "ROW<`EXPR$0` VARCHAR(2147483647)> NOT NULL"
+    },
+    "condition" : null,
+    "inputProperties" : [ {
+      "requiredDistribution" : {
+        "type" : "UNKNOWN"
+      },
+      "damBehavior" : "PIPELINED",
+      "priority" : 0
+    } ],
+    "outputType" : "ROW<`a` BIGINT, `b` INT NOT NULL, `c` VARCHAR(2147483647), 
`EXPR$0` VARCHAR(2147483647)>",
+    "description" : "Correlate(invocation=[STRING_SPLIT($cor0.c, 
_UTF-16LE'#')], correlate=[table(STRING_SPLIT($cor0.c,'#'))], 
select=[a,b,c,EXPR$0], rowType=[RecordType(BIGINT a, INTEGER b, 
VARCHAR(2147483647) c, VARCHAR(2147483647) EXPR$0)], joinType=[INNER])"
+  }, {
+    "id" : 7,
+    "type" : "stream-exec-calc_1",
+    "projection" : [ {
+      "kind" : "INPUT_REF",
+      "inputIndex" : 2,
+      "type" : "VARCHAR(2147483647)"
+    }, {
+      "kind" : "INPUT_REF",
+      "inputIndex" : 3,
+      "type" : "VARCHAR(2147483647)"
+    } ],
+    "condition" : null,
+    "inputProperties" : [ {
+      "requiredDistribution" : {
+        "type" : "UNKNOWN"
+      },
+      "damBehavior" : "PIPELINED",
+      "priority" : 0
+    } ],
+    "outputType" : "ROW<`c` VARCHAR(2147483647), `s` VARCHAR(2147483647)>",
+    "description" : "Calc(select=[c, EXPR$0 AS s])"
+  }, {
+    "id" : 8,
+    "type" : "stream-exec-sink_1",
+    "configuration" : {
+      "table.exec.sink.keyed-shuffle" : "AUTO",
+      "table.exec.sink.not-null-enforcer" : "ERROR",
+      "table.exec.sink.rowtime-inserter" : "ENABLED",
+      "table.exec.sink.type-length-enforcer" : "IGNORE",
+      "table.exec.sink.upsert-materialize" : "AUTO"
+    },
+    "dynamicTableSink" : {
+      "table" : {
+        "identifier" : "`default_catalog`.`default_database`.`sink_t`",
+        "resolvedTable" : {
+          "schema" : {
+            "columns" : [ {
+              "name" : "a",
+              "dataType" : "VARCHAR(2147483647)"
+            }, {
+              "name" : "b",
+              "dataType" : "VARCHAR(2147483647)"
+            } ],
+            "watermarkSpecs" : [ ]
+          },
+          "partitionKeys" : [ ]
+        }
+      }
+    },
+    "inputChangelogMode" : [ "INSERT" ],
+    "inputProperties" : [ {
+      "requiredDistribution" : {
+        "type" : "UNKNOWN"
+      },
+      "damBehavior" : "PIPELINED",
+      "priority" : 0
+    } ],
+    "outputType" : "ROW<`c` VARCHAR(2147483647), `s` VARCHAR(2147483647)>",
+    "description" : "Sink(table=[default_catalog.default_database.sink_t], 
fields=[c, s])"
+  } ],
+  "edges" : [ {
+    "source" : 5,
+    "target" : 6,
+    "shuffle" : {
+      "type" : "FORWARD"
+    },
+    "shuffleMode" : "PIPELINED"
+  }, {
+    "source" : 6,
+    "target" : 7,
+    "shuffle" : {
+      "type" : "FORWARD"
+    },
+    "shuffleMode" : "PIPELINED"
+  }, {
+    "source" : 7,
+    "target" : 8,
+    "shuffle" : {
+      "type" : "FORWARD"
+    },
+    "shuffleMode" : "PIPELINED"
+  } ]
+}
\ No newline at end of file
diff --git 
a/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-correlate_1/correlate-system-func/savepoint/_metadata
 
b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-correlate_1/correlate-system-func/savepoint/_metadata
new file mode 100644
index 00000000000..bf0b0864597
Binary files /dev/null and 
b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-correlate_1/correlate-system-func/savepoint/_metadata
 differ


Reply via email to