This is an automated email from the ASF dual-hosted git repository. dwysakowicz pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit 7e4a2f3fe6c558a08ec68dcb8e21ba43e85f3cf1 Author: bvarghese1 <bvargh...@confluent.io> AuthorDate: Mon Nov 27 10:47:56 2023 -0800 [FLINK-33647] Implement restore tests for LookupJoin node --- .../nodes/exec/stream/LookupJoinRestoreTest.java | 46 +++ .../nodes/exec/stream/LookupJoinTestPrograms.java | 388 +++++++++++++++++++++ .../plan/nodes/exec/testutils/RestoreTestBase.java | 3 - .../plan/lookup-join-async-hint.json | 251 +++++++++++++ .../lookup-join-async-hint/savepoint/_metadata | Bin 0 -> 15900 bytes .../plan/lookup-join-filter-pushdown.json | 264 ++++++++++++++ .../savepoint/_metadata | Bin 0 -> 13735 bytes .../plan/lookup-join-left-join.json | 279 +++++++++++++++ .../lookup-join-left-join/savepoint/_metadata | Bin 0 -> 14069 bytes .../plan/lookup-join-post-filter.json | 266 ++++++++++++++ .../lookup-join-post-filter/savepoint/_metadata | Bin 0 -> 14021 bytes .../plan/lookup-join-pre-filter.json | 279 +++++++++++++++ .../lookup-join-pre-filter/savepoint/_metadata | Bin 0 -> 14005 bytes .../plan/lookup-join-pre-post-filter.json | 281 +++++++++++++++ .../savepoint/_metadata | Bin 0 -> 13957 bytes .../plan/lookup-join-project-pushdown.json | 248 +++++++++++++ .../savepoint/_metadata | Bin 0 -> 13139 bytes .../plan/lookup-join-retry-hint.json | 252 +++++++++++++ .../lookup-join-retry-hint/savepoint/_metadata | Bin 0 -> 14135 bytes 19 files changed, 2554 insertions(+), 3 deletions(-) diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/LookupJoinRestoreTest.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/LookupJoinRestoreTest.java new file mode 100644 index 00000000000..42ca645162b --- /dev/null +++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/LookupJoinRestoreTest.java @@ -0,0 +1,46 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.planner.plan.nodes.exec.stream; + +import org.apache.flink.table.planner.plan.nodes.exec.testutils.RestoreTestBase; +import org.apache.flink.table.test.program.TableTestProgram; + +import java.util.Arrays; +import java.util.List; + +/** Restore tests for {@link StreamExecLookupJoin}. */ +public class LookupJoinRestoreTest extends RestoreTestBase { + + public LookupJoinRestoreTest() { + super(StreamExecLookupJoin.class); + } + + @Override + public List<TableTestProgram> programs() { + return Arrays.asList( + LookupJoinTestPrograms.LOOKUP_JOIN_PROJECT_PUSHDOWN, + LookupJoinTestPrograms.LOOKUP_JOIN_FILTER_PUSHDOWN, + LookupJoinTestPrograms.LOOKUP_JOIN_LEFT_JOIN, + LookupJoinTestPrograms.LOOKUP_JOIN_PRE_FILTER, + LookupJoinTestPrograms.LOOKUP_JOIN_POST_FILTER, + LookupJoinTestPrograms.LOOKUP_JOIN_PRE_POST_FILTER, + LookupJoinTestPrograms.LOOKUP_JOIN_ASYNC_HINT, + LookupJoinTestPrograms.LOOKUP_JOIN_RETRY_HINT); + } +} diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/LookupJoinTestPrograms.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/LookupJoinTestPrograms.java new file mode 100644 index 00000000000..9b5c18b3f98 --- /dev/null +++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/LookupJoinTestPrograms.java @@ -0,0 +1,388 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.planner.plan.nodes.exec.stream; + +import org.apache.flink.table.test.program.SinkTestStep; +import org.apache.flink.table.test.program.SourceTestStep; +import org.apache.flink.table.test.program.TableTestProgram; +import org.apache.flink.types.Row; + +import java.util.Arrays; +import java.util.List; +import java.util.stream.Collectors; + +/** {@link TableTestProgram} definitions for testing {@link StreamExecLookupJoin}. */ +public class LookupJoinTestPrograms { + + static final String[] CUSTOMERS_SCHEMA = + new String[] { + "id INT PRIMARY KEY NOT ENFORCED", + "name STRING", + "age INT", + "city STRING", + "state STRING", + "zipcode INT" + }; + + static final Row[] CUSTOMERS_BEFORE_DATA = + new Row[] { + Row.of(1, "Bob", 28, "Mountain View", "California", 94043), + Row.of(2, "Alice", 32, "San Francisco", "California", 95016), + Row.of(3, "Claire", 37, "Austin", "Texas", 73301), + Row.of(4, "Shannon", 29, "Boise", "Idaho", 83701), + Row.of(5, "Jake", 42, "New York City", "New York", 10001) + }; + + static final Row[] CUSTOMERS_AFTER_DATA = + new Row[] { + Row.of(1, "Bob", 28, "San Jose", "California", 94089), + Row.of(6, "Joana", 54, "Atlanta", "Georgia", 30033) + }; + + static final SourceTestStep CUSTOMERS = + SourceTestStep.newBuilder("customers_t") + .addOption("disable-lookup", "false") // static/lookup table + .addOption("filterable-fields", "age") + .addSchema(CUSTOMERS_SCHEMA) + // Note: The before/after data is used to initialize the lookup table + // The data is not consumed like regular tables since lookup tables are + // external tables with data already present in them. + // Therefore, no state is persisted for lookup tables + .producedBeforeRestore(CUSTOMERS_BEFORE_DATA) + .producedAfterRestore(CUSTOMERS_AFTER_DATA) + .build(); + + static final SourceTestStep CUSTOMERS_ASYNC = + SourceTestStep.newBuilder("customers_t") + .addOption("disable-lookup", "false") // static/lookup table + .addOption("filterable-fields", "age") + .addOption("async", "true") + .addSchema(CUSTOMERS_SCHEMA) + // Note: The before/after data is used to initialize the lookup table + // The data is not consumed like regular tables since lookup tables are + // external tables with data already present in them. + // Therefore, no state is persisted for lookup tables + .producedBeforeRestore(CUSTOMERS_BEFORE_DATA) + .producedAfterRestore(CUSTOMERS_AFTER_DATA) + .build(); + + static final SourceTestStep ORDERS = + SourceTestStep.newBuilder("orders_t") + .addOption("filterable-fields", "customer_id") + .addSchema( + "order_id INT", + "customer_id INT", + "total DOUBLE", + "order_time STRING", + "proc_time AS PROCTIME()") + .producedBeforeRestore( + Row.of(1, 3, 44.44, "2020-10-10 00:00:01"), + Row.of(2, 5, 100.02, "2020-10-10 00:00:02"), + Row.of(4, 2, 92.61, "2020-10-10 00:00:04"), + Row.of(3, 1, 23.89, "2020-10-10 00:00:03"), + Row.of(6, 4, 7.65, "2020-10-10 00:00:06"), + Row.of(5, 2, 12.78, "2020-10-10 00:00:05")) + .producedAfterRestore( + Row.of(7, 6, 17.58, "2020-10-10 00:00:07"), // new customer + Row.of(9, 1, 143.21, "2020-10-10 00:00:08") // updated zip code + ) + .build(); + + static final List<String> SINK_SCHEMA = + Arrays.asList( + "order_id INT", + "total DOUBLE", + "id INT", + "name STRING", + "age INT", + "city STRING", + "state STRING", + "zipcode INT"); + + static final TableTestProgram LOOKUP_JOIN_PROJECT_PUSHDOWN = + TableTestProgram.of( + "lookup-join-project-pushdown", + "validates lookup join with project pushdown") + .setupTableSource(CUSTOMERS) + .setupTableSource(ORDERS) + .setupTableSink( + SinkTestStep.newBuilder("sink_t") + .addSchema( + SINK_SCHEMA.stream() + .filter(field -> !field.equals("age INT")) + .collect(Collectors.toList())) + .consumedBeforeRestore( + "+I[1, 44.44, 3, Claire, Austin, Texas, 73301]", + "+I[2, 100.02, 5, Jake, New York City, New York, 10001]", + "+I[4, 92.61, 2, Alice, San Francisco, California, 95016]", + "+I[3, 23.89, 1, Bob, Mountain View, California, 94043]", + "+I[6, 7.65, 4, Shannon, Boise, Idaho, 83701]", + "+I[5, 12.78, 2, Alice, San Francisco, California, 95016]") + .consumedAfterRestore( + "+I[7, 17.58, 6, Joana, Atlanta, Georgia, 30033]", + "+I[9, 143.21, 1, Bob, San Jose, California, 94089]") + .build()) + .runSql( + "INSERT INTO sink_t SELECT " + + "O.order_id, " + + "O.total, " + + "C.id, " + + "C.name, " + + "C.city, " + + "C.state, " + + "C.zipcode " + + "FROM orders_t as O " + + "JOIN customers_t FOR SYSTEM_TIME AS OF O.proc_time AS C " + + "ON O.customer_id = C.id") + .build(); + + static final TableTestProgram LOOKUP_JOIN_FILTER_PUSHDOWN = + TableTestProgram.of( + "lookup-join-filter-pushdown", + "validates lookup join with filter pushdown") + .setupTableSource(CUSTOMERS) + .setupTableSource(ORDERS) + .setupTableSink( + SinkTestStep.newBuilder("sink_t") + .addSchema(SINK_SCHEMA) + .consumedBeforeRestore( + "+I[1, 44.44, 3, Claire, 37, Austin, Texas, 73301]", + "+I[2, 100.02, 5, Jake, 42, New York City, New York, 10001]", + "+I[4, 92.61, 2, Alice, 32, San Francisco, California, 95016]", + "+I[5, 12.78, 2, Alice, 32, San Francisco, California, 95016]") + .consumedAfterRestore( + "+I[7, 17.58, 6, Joana, 54, Atlanta, Georgia, 30033]") + .build()) + .runSql( + "INSERT INTO sink_t SELECT " + + "O.order_id, " + + "O.total, " + + "C.id, " + + "C.name, " + + "C.age, " + + "C.city, " + + "C.state, " + + "C.zipcode " + + "FROM orders_t as O " + + "JOIN customers_t FOR SYSTEM_TIME AS OF O.proc_time AS C " + + "ON O.customer_id = C.id AND C.age > 30") + .build(); + + static final TableTestProgram LOOKUP_JOIN_PRE_FILTER = + TableTestProgram.of("lookup-join-pre-filter", "validates lookup join with pre filter") + .setupTableSource(CUSTOMERS) + .setupTableSource(ORDERS) + .setupTableSink( + SinkTestStep.newBuilder("sink_t") + .addSchema(SINK_SCHEMA) + .consumedBeforeRestore( + "+I[1, 44.44, 3, Claire, 37, Austin, Texas, 73301]", + "+I[2, 100.02, 5, Jake, 42, New York City, New York, 10001]", + "+I[4, 92.61, 2, Alice, 32, San Francisco, California, 95016]", + "+I[3, 23.89, null, null, null, null, null, null]", + "+I[6, 7.65, null, null, null, null, null, null]", + "+I[5, 12.78, null, null, null, null, null, null]") + .consumedAfterRestore( + "+I[7, 17.58, 6, Joana, 54, Atlanta, Georgia, 30033]", + "+I[9, 143.21, null, null, null, null, null, null]") + .build()) + .runSql( + "INSERT INTO sink_t SELECT " + + "O.order_id, " + + "O.total, " + + "C.id, " + + "C.name, " + + "C.age, " + + "C.city, " + + "C.state, " + + "C.zipcode " + + "FROM orders_t as O " + + "LEFT JOIN customers_t FOR SYSTEM_TIME AS OF O.proc_time AS C " + + "ON O.customer_id = C.id AND C.age > 30 AND O.total > 15.3") + .build(); + + static final TableTestProgram LOOKUP_JOIN_POST_FILTER = + TableTestProgram.of("lookup-join-post-filter", "validates lookup join with post filter") + .setupTableSource(CUSTOMERS) + .setupTableSource(ORDERS) + .setupTableSink( + SinkTestStep.newBuilder("sink_t") + .addSchema(SINK_SCHEMA) + .consumedBeforeRestore( + "+I[1, 44.44, null, null, null, null, null, null]", + "+I[2, 100.02, null, null, null, null, null, null]", + "+I[4, 92.61, null, null, null, null, null, null]", + "+I[3, 23.89, 1, Bob, 28, Mountain View, California, 94043]", + "+I[6, 7.65, 4, Shannon, 29, Boise, Idaho, 83701]", + "+I[5, 12.78, 2, Alice, 32, San Francisco, California, 95016]") + .consumedAfterRestore( + "+I[7, 17.58, 6, Joana, 54, Atlanta, Georgia, 30033]", + "+I[9, 143.21, null, null, null, null, null, null]") + .build()) + .runSql( + "INSERT INTO sink_t SELECT " + + "O.order_id, " + + "O.total, " + + "C.id, " + + "C.name, " + + "C.age, " + + "C.city, " + + "C.state, " + + "C.zipcode " + + "FROM orders_t as O " + + "LEFT JOIN customers_t FOR SYSTEM_TIME AS OF O.proc_time AS C " + + "ON O.customer_id = C.id AND CAST(O.total AS INT) < C.age") + .build(); + + static final TableTestProgram LOOKUP_JOIN_PRE_POST_FILTER = + TableTestProgram.of( + "lookup-join-pre-post-filter", + "validates lookup join with pre and post filters") + .setupTableSource(CUSTOMERS) + .setupTableSource(ORDERS) + .setupTableSink( + SinkTestStep.newBuilder("sink_t") + .addSchema(SINK_SCHEMA) + .consumedBeforeRestore( + "+I[1, 44.44, null, null, null, null, null, null]", + "+I[2, 100.02, null, null, null, null, null, null]", + "+I[4, 92.61, null, null, null, null, null, null]", + "+I[3, 23.89, 1, Bob, 28, Mountain View, California, 94043]", + "+I[6, 7.65, null, null, null, null, null, null]", + "+I[5, 12.78, null, null, null, null, null, null]") + .consumedAfterRestore( + "+I[7, 17.58, 6, Joana, 54, Atlanta, Georgia, 30033]", + "+I[9, 143.21, null, null, null, null, null, null]") + .build()) + .runSql( + "INSERT INTO sink_t SELECT " + + "O.order_id, " + + "O.total, " + + "C.id, " + + "C.name, " + + "C.age, " + + "C.city, " + + "C.state, " + + "C.zipcode " + + "FROM orders_t as O " + + "LEFT JOIN customers_t FOR SYSTEM_TIME AS OF O.proc_time AS C " + + "ON O.customer_id = C.id AND O.total > 15.3 AND CAST(O.total AS INT) < C.age") + .build(); + + static final TableTestProgram LOOKUP_JOIN_LEFT_JOIN = + TableTestProgram.of("lookup-join-left-join", "validates lookup join with left join") + .setupTableSource(CUSTOMERS) + .setupTableSource(ORDERS) + .setupTableSink( + SinkTestStep.newBuilder("sink_t") + .addSchema(SINK_SCHEMA) + .consumedBeforeRestore( + "+I[1, 44.44, null, null, null, null, null, null]", + "+I[2, 100.02, 5, Jake, 42, New York City, New York, 10001]", + "+I[4, 92.61, 2, Alice, 32, San Francisco, California, 95016]", + "+I[3, 23.89, null, null, null, null, null, null]", + "+I[6, 7.65, null, null, null, null, null, null]", + "+I[5, 12.78, 2, Alice, 32, San Francisco, California, 95016]") + .consumedAfterRestore( + "+I[7, 17.58, 6, Joana, 54, Atlanta, Georgia, 30033]", + "+I[9, 143.21, null, null, null, null, null, null]") + .build()) + .runSql( + "INSERT INTO sink_t SELECT " + + "O.order_id, " + + "O.total, " + + "C.id, " + + "C.name, " + + "C.age, " + + "C.city, " + + "C.state, " + + "C.zipcode " + + "FROM orders_t as O " + + "LEFT JOIN customers_t FOR SYSTEM_TIME AS OF O.proc_time AS C " + + "ON O.customer_id = C.id AND C.age > 30 AND O.customer_id <> 3") + .build(); + + static final TableTestProgram LOOKUP_JOIN_ASYNC_HINT = + TableTestProgram.of("lookup-join-async-hint", "validates lookup join with async hint") + .setupTableSource(CUSTOMERS_ASYNC) + .setupTableSource(ORDERS) + .setupTableSink( + SinkTestStep.newBuilder("sink_t") + .addSchema(SINK_SCHEMA) + .consumedBeforeRestore( + "+I[1, 44.44, 3, Claire, 37, Austin, Texas, 73301]", + "+I[2, 100.02, 5, Jake, 42, New York City, New York, 10001]", + "+I[4, 92.61, 2, Alice, 32, San Francisco, California, 95016]", + "+I[3, 23.89, 1, Bob, 28, Mountain View, California, 94043]", + "+I[6, 7.65, 4, Shannon, 29, Boise, Idaho, 83701]", + "+I[5, 12.78, 2, Alice, 32, San Francisco, California, 95016]") + .consumedAfterRestore( + "+I[7, 17.58, 6, Joana, 54, Atlanta, Georgia, 30033]", + "+I[9, 143.21, 1, Bob, 28, San Jose, California, 94089]") + .build()) + .runSql( + "INSERT INTO sink_t SELECT " + + "/*+ LOOKUP('table'='C', 'async'='true', 'output-mode'='allow_unordered', 'timeout'='500s', 'capacity'='2000') */ " + + "O.order_id, " + + "O.total, " + + "C.id, " + + "C.name, " + + "C.age, " + + "C.city, " + + "C.state, " + + "C.zipcode " + + "FROM orders_t as O " + + "JOIN customers_t FOR SYSTEM_TIME AS OF O.proc_time AS C " + + "ON O.customer_id = C.id") + .build(); + + static final TableTestProgram LOOKUP_JOIN_RETRY_HINT = + TableTestProgram.of("lookup-join-retry-hint", "validates lookup join with retry hint") + .setupTableSource(CUSTOMERS) + .setupTableSource(ORDERS) + .setupTableSink( + SinkTestStep.newBuilder("sink_t") + .addSchema(SINK_SCHEMA) + .consumedBeforeRestore( + "+I[1, 44.44, 3, Claire, 37, Austin, Texas, 73301]", + "+I[2, 100.02, 5, Jake, 42, New York City, New York, 10001]", + "+I[4, 92.61, 2, Alice, 32, San Francisco, California, 95016]", + "+I[3, 23.89, 1, Bob, 28, Mountain View, California, 94043]", + "+I[6, 7.65, 4, Shannon, 29, Boise, Idaho, 83701]", + "+I[5, 12.78, 2, Alice, 32, San Francisco, California, 95016]") + .consumedAfterRestore( + "+I[7, 17.58, 6, Joana, 54, Atlanta, Georgia, 30033]", + "+I[9, 143.21, 1, Bob, 28, San Jose, California, 94089]") + .build()) + .runSql( + "INSERT INTO sink_t SELECT " + + "/*+ LOOKUP('table'='C', 'retry-predicate'='lookup_miss', 'retry-strategy'='fixed_delay', 'fixed-delay'='10s', 'max-attempts'='3') */ " + + "O.order_id, " + + "O.total, " + + "C.id, " + + "C.name, " + + "C.age, " + + "C.city, " + + "C.state, " + + "C.zipcode " + + "FROM orders_t as O " + + "JOIN customers_t FOR SYSTEM_TIME AS OF O.proc_time AS C " + + "ON O.customer_id = C.id") + .build(); +} diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/testutils/RestoreTestBase.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/testutils/RestoreTestBase.java index 7c07bf9bf51..8dd4646df10 100644 --- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/testutils/RestoreTestBase.java +++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/testutils/RestoreTestBase.java @@ -193,7 +193,6 @@ public abstract class RestoreTestBase implements TableTestProgramRunner { options.put("connector", "values"); options.put("data-id", id); options.put("terminating", "false"); - options.put("disable-lookup", "true"); options.put("runtime-source", "NewSource"); sourceTestStep.apply(tEnv, options); } @@ -203,7 +202,6 @@ public abstract class RestoreTestBase implements TableTestProgramRunner { registerSinkObserver(futures, sinkTestStep, true); final Map<String, String> options = new HashMap<>(); options.put("connector", "values"); - options.put("disable-lookup", "true"); options.put("sink-insert-only", "false"); sinkTestStep.apply(tEnv, options); } @@ -252,7 +250,6 @@ public abstract class RestoreTestBase implements TableTestProgramRunner { final Map<String, String> options = new HashMap<>(); options.put("connector", "values"); options.put("data-id", id); - options.put("disable-lookup", "true"); options.put("runtime-source", "NewSource"); if (afterRestoreSource == AfterRestoreSource.INFINITE) { options.put("terminating", "false"); diff --git a/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-lookup-join_1/lookup-join-async-hint/plan/lookup-join-async-hint.json b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-lookup-join_1/lookup-join-async-hint/plan/lookup-join-async-hint.json new file mode 100644 index 00000000000..8538f0cffe1 --- /dev/null +++ b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-lookup-join_1/lookup-join-async-hint/plan/lookup-join-async-hint.json @@ -0,0 +1,251 @@ +{ + "flinkVersion" : "1.19", + "nodes" : [ { + "id" : 25, + "type" : "stream-exec-table-source-scan_1", + "scanTableSource" : { + "table" : { + "identifier" : "`default_catalog`.`default_database`.`orders_t`", + "resolvedTable" : { + "schema" : { + "columns" : [ { + "name" : "order_id", + "dataType" : "INT" + }, { + "name" : "customer_id", + "dataType" : "INT" + }, { + "name" : "total", + "dataType" : "DOUBLE" + }, { + "name" : "order_time", + "dataType" : "VARCHAR(2147483647)" + }, { + "name" : "proc_time", + "kind" : "COMPUTED", + "expression" : { + "rexNode" : { + "kind" : "CALL", + "internalName" : "$PROCTIME$1", + "operands" : [ ], + "type" : { + "type" : "TIMESTAMP_WITH_LOCAL_TIME_ZONE", + "nullable" : false, + "precision" : 3, + "kind" : "PROCTIME" + } + }, + "serializableString" : "PROCTIME()" + } + } ], + "watermarkSpecs" : [ ] + }, + "partitionKeys" : [ ] + } + }, + "abilities" : [ { + "type" : "ProjectPushDown", + "projectedFields" : [ [ 0 ], [ 1 ], [ 2 ] ], + "producedType" : "ROW<`order_id` INT, `customer_id` INT, `total` DOUBLE> NOT NULL" + }, { + "type" : "ReadingMetadata", + "metadataKeys" : [ ], + "producedType" : "ROW<`order_id` INT, `customer_id` INT, `total` DOUBLE> NOT NULL" + } ] + }, + "outputType" : "ROW<`order_id` INT, `customer_id` INT, `total` DOUBLE>", + "description" : "TableSourceScan(table=[[default_catalog, default_database, orders_t, project=[order_id, customer_id, total], metadata=[]]], fields=[order_id, customer_id, total])", + "inputProperties" : [ ] + }, { + "id" : 26, + "type" : "stream-exec-lookup-join_1", + "joinType" : "INNER", + "joinCondition" : null, + "temporalTable" : { + "lookupTableSource" : { + "table" : { + "identifier" : "`default_catalog`.`default_database`.`customers_t`", + "resolvedTable" : { + "schema" : { + "columns" : [ { + "name" : "id", + "dataType" : "INT NOT NULL" + }, { + "name" : "name", + "dataType" : "VARCHAR(2147483647)" + }, { + "name" : "age", + "dataType" : "INT" + }, { + "name" : "city", + "dataType" : "VARCHAR(2147483647)" + }, { + "name" : "state", + "dataType" : "VARCHAR(2147483647)" + }, { + "name" : "zipcode", + "dataType" : "INT" + } ], + "watermarkSpecs" : [ ], + "primaryKey" : { + "name" : "PK_id", + "type" : "PRIMARY_KEY", + "columns" : [ "id" ] + } + }, + "partitionKeys" : [ ] + } + } + }, + "outputType" : "ROW<`id` INT NOT NULL, `name` VARCHAR(2147483647), `age` INT, `city` VARCHAR(2147483647), `state` VARCHAR(2147483647), `zipcode` INT> NOT NULL" + }, + "lookupKeys" : { + "0" : { + "type" : "FieldRef", + "index" : 1 + } + }, + "projectionOnTemporalTable" : null, + "filterOnTemporalTable" : null, + "lookupKeyContainsPrimaryKey" : true, + "asyncOptions" : { + "capacity " : 2000, + "timeout" : 500000, + "output-mode" : "UNORDERED" + }, + "inputChangelogMode" : [ "INSERT" ], + "inputProperties" : [ { + "requiredDistribution" : { + "type" : "UNKNOWN" + }, + "damBehavior" : "PIPELINED", + "priority" : 0 + } ], + "outputType" : "ROW<`order_id` INT, `customer_id` INT, `total` DOUBLE, `id` INT NOT NULL, `name` VARCHAR(2147483647), `age` INT, `city` VARCHAR(2147483647), `state` VARCHAR(2147483647), `zipcode` INT>", + "description" : "LookupJoin(table=[default_catalog.default_database.customers_t], joinType=[InnerJoin], lookup=[id=customer_id], select=[order_id, customer_id, total, id, name, age, city, state, zipcode], async=[UNORDERED, 500000ms, 2000])" + }, { + "id" : 27, + "type" : "stream-exec-calc_1", + "projection" : [ { + "kind" : "INPUT_REF", + "inputIndex" : 0, + "type" : "INT" + }, { + "kind" : "INPUT_REF", + "inputIndex" : 2, + "type" : "DOUBLE" + }, { + "kind" : "INPUT_REF", + "inputIndex" : 3, + "type" : "INT NOT NULL" + }, { + "kind" : "INPUT_REF", + "inputIndex" : 4, + "type" : "VARCHAR(2147483647)" + }, { + "kind" : "INPUT_REF", + "inputIndex" : 5, + "type" : "INT" + }, { + "kind" : "INPUT_REF", + "inputIndex" : 6, + "type" : "VARCHAR(2147483647)" + }, { + "kind" : "INPUT_REF", + "inputIndex" : 7, + "type" : "VARCHAR(2147483647)" + }, { + "kind" : "INPUT_REF", + "inputIndex" : 8, + "type" : "INT" + } ], + "condition" : null, + "inputProperties" : [ { + "requiredDistribution" : { + "type" : "UNKNOWN" + }, + "damBehavior" : "PIPELINED", + "priority" : 0 + } ], + "outputType" : "ROW<`order_id` INT, `total` DOUBLE, `id` INT NOT NULL, `name` VARCHAR(2147483647), `age` INT, `city` VARCHAR(2147483647), `state` VARCHAR(2147483647), `zipcode` INT>", + "description" : "Calc(select=[order_id, total, id, name, age, city, state, zipcode])" + }, { + "id" : 28, + "type" : "stream-exec-sink_1", + "configuration" : { + "table.exec.sink.keyed-shuffle" : "AUTO", + "table.exec.sink.not-null-enforcer" : "ERROR", + "table.exec.sink.rowtime-inserter" : "ENABLED", + "table.exec.sink.type-length-enforcer" : "IGNORE", + "table.exec.sink.upsert-materialize" : "AUTO" + }, + "dynamicTableSink" : { + "table" : { + "identifier" : "`default_catalog`.`default_database`.`sink_t`", + "resolvedTable" : { + "schema" : { + "columns" : [ { + "name" : "order_id", + "dataType" : "INT" + }, { + "name" : "total", + "dataType" : "DOUBLE" + }, { + "name" : "id", + "dataType" : "INT" + }, { + "name" : "name", + "dataType" : "VARCHAR(2147483647)" + }, { + "name" : "age", + "dataType" : "INT" + }, { + "name" : "city", + "dataType" : "VARCHAR(2147483647)" + }, { + "name" : "state", + "dataType" : "VARCHAR(2147483647)" + }, { + "name" : "zipcode", + "dataType" : "INT" + } ], + "watermarkSpecs" : [ ] + }, + "partitionKeys" : [ ] + } + } + }, + "inputChangelogMode" : [ "INSERT" ], + "inputProperties" : [ { + "requiredDistribution" : { + "type" : "UNKNOWN" + }, + "damBehavior" : "PIPELINED", + "priority" : 0 + } ], + "outputType" : "ROW<`order_id` INT, `total` DOUBLE, `id` INT NOT NULL, `name` VARCHAR(2147483647), `age` INT, `city` VARCHAR(2147483647), `state` VARCHAR(2147483647), `zipcode` INT>", + "description" : "Sink(table=[default_catalog.default_database.sink_t], fields=[order_id, total, id, name, age, city, state, zipcode])" + } ], + "edges" : [ { + "source" : 25, + "target" : 26, + "shuffle" : { + "type" : "FORWARD" + }, + "shuffleMode" : "PIPELINED" + }, { + "source" : 26, + "target" : 27, + "shuffle" : { + "type" : "FORWARD" + }, + "shuffleMode" : "PIPELINED" + }, { + "source" : 27, + "target" : 28, + "shuffle" : { + "type" : "FORWARD" + }, + "shuffleMode" : "PIPELINED" + } ] +} diff --git a/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-lookup-join_1/lookup-join-async-hint/savepoint/_metadata b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-lookup-join_1/lookup-join-async-hint/savepoint/_metadata new file mode 100644 index 00000000000..f811fb0517d Binary files /dev/null and b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-lookup-join_1/lookup-join-async-hint/savepoint/_metadata differ diff --git a/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-lookup-join_1/lookup-join-filter-pushdown/plan/lookup-join-filter-pushdown.json b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-lookup-join_1/lookup-join-filter-pushdown/plan/lookup-join-filter-pushdown.json new file mode 100644 index 00000000000..325f14c90a3 --- /dev/null +++ b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-lookup-join_1/lookup-join-filter-pushdown/plan/lookup-join-filter-pushdown.json @@ -0,0 +1,264 @@ +{ + "flinkVersion" : "1.19", + "nodes" : [ { + "id" : 5, + "type" : "stream-exec-table-source-scan_1", + "scanTableSource" : { + "table" : { + "identifier" : "`default_catalog`.`default_database`.`orders_t`", + "resolvedTable" : { + "schema" : { + "columns" : [ { + "name" : "order_id", + "dataType" : "INT" + }, { + "name" : "customer_id", + "dataType" : "INT" + }, { + "name" : "total", + "dataType" : "DOUBLE" + }, { + "name" : "order_time", + "dataType" : "VARCHAR(2147483647)" + }, { + "name" : "proc_time", + "kind" : "COMPUTED", + "expression" : { + "rexNode" : { + "kind" : "CALL", + "internalName" : "$PROCTIME$1", + "operands" : [ ], + "type" : { + "type" : "TIMESTAMP_WITH_LOCAL_TIME_ZONE", + "nullable" : false, + "precision" : 3, + "kind" : "PROCTIME" + } + }, + "serializableString" : "PROCTIME()" + } + } ], + "watermarkSpecs" : [ ] + }, + "partitionKeys" : [ ] + } + }, + "abilities" : [ { + "type" : "ProjectPushDown", + "projectedFields" : [ [ 0 ], [ 1 ], [ 2 ] ], + "producedType" : "ROW<`order_id` INT, `customer_id` INT, `total` DOUBLE> NOT NULL" + }, { + "type" : "ReadingMetadata", + "metadataKeys" : [ ], + "producedType" : "ROW<`order_id` INT, `customer_id` INT, `total` DOUBLE> NOT NULL" + } ] + }, + "outputType" : "ROW<`order_id` INT, `customer_id` INT, `total` DOUBLE>", + "description" : "TableSourceScan(table=[[default_catalog, default_database, orders_t, project=[order_id, customer_id, total], metadata=[]]], fields=[order_id, customer_id, total])", + "inputProperties" : [ ] + }, { + "id" : 6, + "type" : "stream-exec-lookup-join_1", + "joinType" : "INNER", + "joinCondition" : null, + "temporalTable" : { + "lookupTableSource" : { + "table" : { + "identifier" : "`default_catalog`.`default_database`.`customers_t`", + "resolvedTable" : { + "schema" : { + "columns" : [ { + "name" : "id", + "dataType" : "INT NOT NULL" + }, { + "name" : "name", + "dataType" : "VARCHAR(2147483647)" + }, { + "name" : "age", + "dataType" : "INT" + }, { + "name" : "city", + "dataType" : "VARCHAR(2147483647)" + }, { + "name" : "state", + "dataType" : "VARCHAR(2147483647)" + }, { + "name" : "zipcode", + "dataType" : "INT" + } ], + "watermarkSpecs" : [ ], + "primaryKey" : { + "name" : "PK_id", + "type" : "PRIMARY_KEY", + "columns" : [ "id" ] + } + }, + "partitionKeys" : [ ] + } + }, + "abilities" : [ { + "type" : "FilterPushDown", + "predicates" : [ { + "kind" : "CALL", + "syntax" : "BINARY", + "internalName" : "$>$1", + "operands" : [ { + "kind" : "INPUT_REF", + "inputIndex" : 2, + "type" : "INT" + }, { + "kind" : "LITERAL", + "value" : 30, + "type" : "INT NOT NULL" + } ], + "type" : "BOOLEAN" + } ] + } ] + }, + "outputType" : "ROW<`id` INT NOT NULL, `name` VARCHAR(2147483647), `age` INT, `city` VARCHAR(2147483647), `state` VARCHAR(2147483647), `zipcode` INT> NOT NULL" + }, + "lookupKeys" : { + "0" : { + "type" : "FieldRef", + "index" : 1 + } + }, + "projectionOnTemporalTable" : null, + "filterOnTemporalTable" : null, + "lookupKeyContainsPrimaryKey" : true, + "inputChangelogMode" : [ "INSERT" ], + "inputProperties" : [ { + "requiredDistribution" : { + "type" : "UNKNOWN" + }, + "damBehavior" : "PIPELINED", + "priority" : 0 + } ], + "outputType" : "ROW<`order_id` INT, `customer_id` INT, `total` DOUBLE, `id` INT NOT NULL, `name` VARCHAR(2147483647), `age` INT, `city` VARCHAR(2147483647), `state` VARCHAR(2147483647), `zipcode` INT>", + "description" : "LookupJoin(table=[default_catalog.default_database.customers_t], joinType=[InnerJoin], lookup=[id=customer_id], select=[order_id, customer_id, total, id, name, age, city, state, zipcode])" + }, { + "id" : 7, + "type" : "stream-exec-calc_1", + "projection" : [ { + "kind" : "INPUT_REF", + "inputIndex" : 0, + "type" : "INT" + }, { + "kind" : "INPUT_REF", + "inputIndex" : 2, + "type" : "DOUBLE" + }, { + "kind" : "INPUT_REF", + "inputIndex" : 3, + "type" : "INT NOT NULL" + }, { + "kind" : "INPUT_REF", + "inputIndex" : 4, + "type" : "VARCHAR(2147483647)" + }, { + "kind" : "INPUT_REF", + "inputIndex" : 5, + "type" : "INT" + }, { + "kind" : "INPUT_REF", + "inputIndex" : 6, + "type" : "VARCHAR(2147483647)" + }, { + "kind" : "INPUT_REF", + "inputIndex" : 7, + "type" : "VARCHAR(2147483647)" + }, { + "kind" : "INPUT_REF", + "inputIndex" : 8, + "type" : "INT" + } ], + "condition" : null, + "inputProperties" : [ { + "requiredDistribution" : { + "type" : "UNKNOWN" + }, + "damBehavior" : "PIPELINED", + "priority" : 0 + } ], + "outputType" : "ROW<`order_id` INT, `total` DOUBLE, `id` INT NOT NULL, `name` VARCHAR(2147483647), `age` INT, `city` VARCHAR(2147483647), `state` VARCHAR(2147483647), `zipcode` INT>", + "description" : "Calc(select=[order_id, total, id, name, age, city, state, zipcode])" + }, { + "id" : 8, + "type" : "stream-exec-sink_1", + "configuration" : { + "table.exec.sink.keyed-shuffle" : "AUTO", + "table.exec.sink.not-null-enforcer" : "ERROR", + "table.exec.sink.rowtime-inserter" : "ENABLED", + "table.exec.sink.type-length-enforcer" : "IGNORE", + "table.exec.sink.upsert-materialize" : "AUTO" + }, + "dynamicTableSink" : { + "table" : { + "identifier" : "`default_catalog`.`default_database`.`sink_t`", + "resolvedTable" : { + "schema" : { + "columns" : [ { + "name" : "order_id", + "dataType" : "INT" + }, { + "name" : "total", + "dataType" : "DOUBLE" + }, { + "name" : "id", + "dataType" : "INT" + }, { + "name" : "name", + "dataType" : "VARCHAR(2147483647)" + }, { + "name" : "age", + "dataType" : "INT" + }, { + "name" : "city", + "dataType" : "VARCHAR(2147483647)" + }, { + "name" : "state", + "dataType" : "VARCHAR(2147483647)" + }, { + "name" : "zipcode", + "dataType" : "INT" + } ], + "watermarkSpecs" : [ ] + }, + "partitionKeys" : [ ] + } + } + }, + "inputChangelogMode" : [ "INSERT" ], + "inputProperties" : [ { + "requiredDistribution" : { + "type" : "UNKNOWN" + }, + "damBehavior" : "PIPELINED", + "priority" : 0 + } ], + "outputType" : "ROW<`order_id` INT, `total` DOUBLE, `id` INT NOT NULL, `name` VARCHAR(2147483647), `age` INT, `city` VARCHAR(2147483647), `state` VARCHAR(2147483647), `zipcode` INT>", + "description" : "Sink(table=[default_catalog.default_database.sink_t], fields=[order_id, total, id, name, age, city, state, zipcode])" + } ], + "edges" : [ { + "source" : 5, + "target" : 6, + "shuffle" : { + "type" : "FORWARD" + }, + "shuffleMode" : "PIPELINED" + }, { + "source" : 6, + "target" : 7, + "shuffle" : { + "type" : "FORWARD" + }, + "shuffleMode" : "PIPELINED" + }, { + "source" : 7, + "target" : 8, + "shuffle" : { + "type" : "FORWARD" + }, + "shuffleMode" : "PIPELINED" + } ] +} diff --git a/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-lookup-join_1/lookup-join-filter-pushdown/savepoint/_metadata b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-lookup-join_1/lookup-join-filter-pushdown/savepoint/_metadata new file mode 100644 index 00000000000..fa7a32bb113 Binary files /dev/null and b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-lookup-join_1/lookup-join-filter-pushdown/savepoint/_metadata differ diff --git a/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-lookup-join_1/lookup-join-left-join/plan/lookup-join-left-join.json b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-lookup-join_1/lookup-join-left-join/plan/lookup-join-left-join.json new file mode 100644 index 00000000000..80faae47702 --- /dev/null +++ b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-lookup-join_1/lookup-join-left-join/plan/lookup-join-left-join.json @@ -0,0 +1,279 @@ +{ + "flinkVersion" : "1.19", + "nodes" : [ { + "id" : 9, + "type" : "stream-exec-table-source-scan_1", + "scanTableSource" : { + "table" : { + "identifier" : "`default_catalog`.`default_database`.`orders_t`", + "resolvedTable" : { + "schema" : { + "columns" : [ { + "name" : "order_id", + "dataType" : "INT" + }, { + "name" : "customer_id", + "dataType" : "INT" + }, { + "name" : "total", + "dataType" : "DOUBLE" + }, { + "name" : "order_time", + "dataType" : "VARCHAR(2147483647)" + }, { + "name" : "proc_time", + "kind" : "COMPUTED", + "expression" : { + "rexNode" : { + "kind" : "CALL", + "internalName" : "$PROCTIME$1", + "operands" : [ ], + "type" : { + "type" : "TIMESTAMP_WITH_LOCAL_TIME_ZONE", + "nullable" : false, + "precision" : 3, + "kind" : "PROCTIME" + } + }, + "serializableString" : "PROCTIME()" + } + } ], + "watermarkSpecs" : [ ] + }, + "partitionKeys" : [ ] + } + }, + "abilities" : [ { + "type" : "ProjectPushDown", + "projectedFields" : [ [ 0 ], [ 1 ], [ 2 ] ], + "producedType" : "ROW<`order_id` INT, `customer_id` INT, `total` DOUBLE> NOT NULL" + }, { + "type" : "ReadingMetadata", + "metadataKeys" : [ ], + "producedType" : "ROW<`order_id` INT, `customer_id` INT, `total` DOUBLE> NOT NULL" + } ] + }, + "outputType" : "ROW<`order_id` INT, `customer_id` INT, `total` DOUBLE>", + "description" : "TableSourceScan(table=[[default_catalog, default_database, orders_t, project=[order_id, customer_id, total], metadata=[]]], fields=[order_id, customer_id, total])", + "inputProperties" : [ ] + }, { + "id" : 10, + "type" : "stream-exec-lookup-join_1", + "joinType" : "LEFT", + "preFilterCondition" : { + "kind" : "CALL", + "syntax" : "BINARY", + "internalName" : "$<>$1", + "operands" : [ { + "kind" : "INPUT_REF", + "inputIndex" : 1, + "type" : "INT" + }, { + "kind" : "LITERAL", + "value" : 3, + "type" : "INT NOT NULL" + } ], + "type" : "BOOLEAN" + }, + "joinCondition" : null, + "temporalTable" : { + "lookupTableSource" : { + "table" : { + "identifier" : "`default_catalog`.`default_database`.`customers_t`", + "resolvedTable" : { + "schema" : { + "columns" : [ { + "name" : "id", + "dataType" : "INT NOT NULL" + }, { + "name" : "name", + "dataType" : "VARCHAR(2147483647)" + }, { + "name" : "age", + "dataType" : "INT" + }, { + "name" : "city", + "dataType" : "VARCHAR(2147483647)" + }, { + "name" : "state", + "dataType" : "VARCHAR(2147483647)" + }, { + "name" : "zipcode", + "dataType" : "INT" + } ], + "watermarkSpecs" : [ ], + "primaryKey" : { + "name" : "PK_id", + "type" : "PRIMARY_KEY", + "columns" : [ "id" ] + } + }, + "partitionKeys" : [ ] + } + }, + "abilities" : [ { + "type" : "FilterPushDown", + "predicates" : [ { + "kind" : "CALL", + "syntax" : "BINARY", + "internalName" : "$>$1", + "operands" : [ { + "kind" : "INPUT_REF", + "inputIndex" : 2, + "type" : "INT" + }, { + "kind" : "LITERAL", + "value" : 30, + "type" : "INT NOT NULL" + } ], + "type" : "BOOLEAN" + } ] + } ] + }, + "outputType" : "ROW<`id` INT NOT NULL, `name` VARCHAR(2147483647), `age` INT, `city` VARCHAR(2147483647), `state` VARCHAR(2147483647), `zipcode` INT> NOT NULL" + }, + "lookupKeys" : { + "0" : { + "type" : "FieldRef", + "index" : 1 + } + }, + "projectionOnTemporalTable" : null, + "filterOnTemporalTable" : null, + "lookupKeyContainsPrimaryKey" : true, + "inputChangelogMode" : [ "INSERT" ], + "inputProperties" : [ { + "requiredDistribution" : { + "type" : "UNKNOWN" + }, + "damBehavior" : "PIPELINED", + "priority" : 0 + } ], + "outputType" : "ROW<`order_id` INT, `customer_id` INT, `total` DOUBLE, `id` INT, `name` VARCHAR(2147483647), `age` INT, `city` VARCHAR(2147483647), `state` VARCHAR(2147483647), `zipcode` INT>", + "description" : "LookupJoin(table=[default_catalog.default_database.customers_t], joinType=[LeftOuterJoin], lookup=[id=customer_id], joinCondition=[(customer_id <> 3)], select=[order_id, customer_id, total, id, name, age, city, state, zipcode])" + }, { + "id" : 11, + "type" : "stream-exec-calc_1", + "projection" : [ { + "kind" : "INPUT_REF", + "inputIndex" : 0, + "type" : "INT" + }, { + "kind" : "INPUT_REF", + "inputIndex" : 2, + "type" : "DOUBLE" + }, { + "kind" : "INPUT_REF", + "inputIndex" : 3, + "type" : "INT" + }, { + "kind" : "INPUT_REF", + "inputIndex" : 4, + "type" : "VARCHAR(2147483647)" + }, { + "kind" : "INPUT_REF", + "inputIndex" : 5, + "type" : "INT" + }, { + "kind" : "INPUT_REF", + "inputIndex" : 6, + "type" : "VARCHAR(2147483647)" + }, { + "kind" : "INPUT_REF", + "inputIndex" : 7, + "type" : "VARCHAR(2147483647)" + }, { + "kind" : "INPUT_REF", + "inputIndex" : 8, + "type" : "INT" + } ], + "condition" : null, + "inputProperties" : [ { + "requiredDistribution" : { + "type" : "UNKNOWN" + }, + "damBehavior" : "PIPELINED", + "priority" : 0 + } ], + "outputType" : "ROW<`order_id` INT, `total` DOUBLE, `id` INT, `name` VARCHAR(2147483647), `age` INT, `city` VARCHAR(2147483647), `state` VARCHAR(2147483647), `zipcode` INT>", + "description" : "Calc(select=[order_id, total, id, name, age, city, state, zipcode])" + }, { + "id" : 12, + "type" : "stream-exec-sink_1", + "configuration" : { + "table.exec.sink.keyed-shuffle" : "AUTO", + "table.exec.sink.not-null-enforcer" : "ERROR", + "table.exec.sink.rowtime-inserter" : "ENABLED", + "table.exec.sink.type-length-enforcer" : "IGNORE", + "table.exec.sink.upsert-materialize" : "AUTO" + }, + "dynamicTableSink" : { + "table" : { + "identifier" : "`default_catalog`.`default_database`.`sink_t`", + "resolvedTable" : { + "schema" : { + "columns" : [ { + "name" : "order_id", + "dataType" : "INT" + }, { + "name" : "total", + "dataType" : "DOUBLE" + }, { + "name" : "id", + "dataType" : "INT" + }, { + "name" : "name", + "dataType" : "VARCHAR(2147483647)" + }, { + "name" : "age", + "dataType" : "INT" + }, { + "name" : "city", + "dataType" : "VARCHAR(2147483647)" + }, { + "name" : "state", + "dataType" : "VARCHAR(2147483647)" + }, { + "name" : "zipcode", + "dataType" : "INT" + } ], + "watermarkSpecs" : [ ] + }, + "partitionKeys" : [ ] + } + } + }, + "inputChangelogMode" : [ "INSERT" ], + "inputProperties" : [ { + "requiredDistribution" : { + "type" : "UNKNOWN" + }, + "damBehavior" : "PIPELINED", + "priority" : 0 + } ], + "outputType" : "ROW<`order_id` INT, `total` DOUBLE, `id` INT, `name` VARCHAR(2147483647), `age` INT, `city` VARCHAR(2147483647), `state` VARCHAR(2147483647), `zipcode` INT>", + "description" : "Sink(table=[default_catalog.default_database.sink_t], fields=[order_id, total, id, name, age, city, state, zipcode])" + } ], + "edges" : [ { + "source" : 9, + "target" : 10, + "shuffle" : { + "type" : "FORWARD" + }, + "shuffleMode" : "PIPELINED" + }, { + "source" : 10, + "target" : 11, + "shuffle" : { + "type" : "FORWARD" + }, + "shuffleMode" : "PIPELINED" + }, { + "source" : 11, + "target" : 12, + "shuffle" : { + "type" : "FORWARD" + }, + "shuffleMode" : "PIPELINED" + } ] +} diff --git a/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-lookup-join_1/lookup-join-left-join/savepoint/_metadata b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-lookup-join_1/lookup-join-left-join/savepoint/_metadata new file mode 100644 index 00000000000..ac7367cc552 Binary files /dev/null and b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-lookup-join_1/lookup-join-left-join/savepoint/_metadata differ diff --git a/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-lookup-join_1/lookup-join-post-filter/plan/lookup-join-post-filter.json b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-lookup-join_1/lookup-join-post-filter/plan/lookup-join-post-filter.json new file mode 100644 index 00000000000..fd2f1825e57 --- /dev/null +++ b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-lookup-join_1/lookup-join-post-filter/plan/lookup-join-post-filter.json @@ -0,0 +1,266 @@ +{ + "flinkVersion" : "1.19", + "nodes" : [ { + "id" : 17, + "type" : "stream-exec-table-source-scan_1", + "scanTableSource" : { + "table" : { + "identifier" : "`default_catalog`.`default_database`.`orders_t`", + "resolvedTable" : { + "schema" : { + "columns" : [ { + "name" : "order_id", + "dataType" : "INT" + }, { + "name" : "customer_id", + "dataType" : "INT" + }, { + "name" : "total", + "dataType" : "DOUBLE" + }, { + "name" : "order_time", + "dataType" : "VARCHAR(2147483647)" + }, { + "name" : "proc_time", + "kind" : "COMPUTED", + "expression" : { + "rexNode" : { + "kind" : "CALL", + "internalName" : "$PROCTIME$1", + "operands" : [ ], + "type" : { + "type" : "TIMESTAMP_WITH_LOCAL_TIME_ZONE", + "nullable" : false, + "precision" : 3, + "kind" : "PROCTIME" + } + }, + "serializableString" : "PROCTIME()" + } + } ], + "watermarkSpecs" : [ ] + }, + "partitionKeys" : [ ] + } + }, + "abilities" : [ { + "type" : "ProjectPushDown", + "projectedFields" : [ [ 0 ], [ 1 ], [ 2 ] ], + "producedType" : "ROW<`order_id` INT, `customer_id` INT, `total` DOUBLE> NOT NULL" + }, { + "type" : "ReadingMetadata", + "metadataKeys" : [ ], + "producedType" : "ROW<`order_id` INT, `customer_id` INT, `total` DOUBLE> NOT NULL" + } ] + }, + "outputType" : "ROW<`order_id` INT, `customer_id` INT, `total` DOUBLE>", + "description" : "TableSourceScan(table=[[default_catalog, default_database, orders_t, project=[order_id, customer_id, total], metadata=[]]], fields=[order_id, customer_id, total])", + "inputProperties" : [ ] + }, { + "id" : 18, + "type" : "stream-exec-lookup-join_1", + "joinType" : "LEFT", + "joinCondition" : { + "kind" : "CALL", + "syntax" : "BINARY", + "internalName" : "$<$1", + "operands" : [ { + "kind" : "CALL", + "syntax" : "SPECIAL", + "internalName" : "$CAST$1", + "operands" : [ { + "kind" : "INPUT_REF", + "inputIndex" : 2, + "type" : "DOUBLE" + } ], + "type" : "INT" + }, { + "kind" : "INPUT_REF", + "inputIndex" : 5, + "type" : "INT" + } ], + "type" : "BOOLEAN" + }, + "temporalTable" : { + "lookupTableSource" : { + "table" : { + "identifier" : "`default_catalog`.`default_database`.`customers_t`", + "resolvedTable" : { + "schema" : { + "columns" : [ { + "name" : "id", + "dataType" : "INT NOT NULL" + }, { + "name" : "name", + "dataType" : "VARCHAR(2147483647)" + }, { + "name" : "age", + "dataType" : "INT" + }, { + "name" : "city", + "dataType" : "VARCHAR(2147483647)" + }, { + "name" : "state", + "dataType" : "VARCHAR(2147483647)" + }, { + "name" : "zipcode", + "dataType" : "INT" + } ], + "watermarkSpecs" : [ ], + "primaryKey" : { + "name" : "PK_id", + "type" : "PRIMARY_KEY", + "columns" : [ "id" ] + } + }, + "partitionKeys" : [ ] + } + } + }, + "outputType" : "ROW<`id` INT NOT NULL, `name` VARCHAR(2147483647), `age` INT, `city` VARCHAR(2147483647), `state` VARCHAR(2147483647), `zipcode` INT> NOT NULL" + }, + "lookupKeys" : { + "0" : { + "type" : "FieldRef", + "index" : 1 + } + }, + "projectionOnTemporalTable" : null, + "filterOnTemporalTable" : null, + "lookupKeyContainsPrimaryKey" : true, + "inputChangelogMode" : [ "INSERT" ], + "inputProperties" : [ { + "requiredDistribution" : { + "type" : "UNKNOWN" + }, + "damBehavior" : "PIPELINED", + "priority" : 0 + } ], + "outputType" : "ROW<`order_id` INT, `customer_id` INT, `total` DOUBLE, `id` INT, `name` VARCHAR(2147483647), `age` INT, `city` VARCHAR(2147483647), `state` VARCHAR(2147483647), `zipcode` INT>", + "description" : "LookupJoin(table=[default_catalog.default_database.customers_t], joinType=[LeftOuterJoin], lookup=[id=customer_id], joinCondition=[(CAST(total AS INTEGER) < age)], select=[order_id, customer_id, total, id, name, age, city, state, zipcode])" + }, { + "id" : 19, + "type" : "stream-exec-calc_1", + "projection" : [ { + "kind" : "INPUT_REF", + "inputIndex" : 0, + "type" : "INT" + }, { + "kind" : "INPUT_REF", + "inputIndex" : 2, + "type" : "DOUBLE" + }, { + "kind" : "INPUT_REF", + "inputIndex" : 3, + "type" : "INT" + }, { + "kind" : "INPUT_REF", + "inputIndex" : 4, + "type" : "VARCHAR(2147483647)" + }, { + "kind" : "INPUT_REF", + "inputIndex" : 5, + "type" : "INT" + }, { + "kind" : "INPUT_REF", + "inputIndex" : 6, + "type" : "VARCHAR(2147483647)" + }, { + "kind" : "INPUT_REF", + "inputIndex" : 7, + "type" : "VARCHAR(2147483647)" + }, { + "kind" : "INPUT_REF", + "inputIndex" : 8, + "type" : "INT" + } ], + "condition" : null, + "inputProperties" : [ { + "requiredDistribution" : { + "type" : "UNKNOWN" + }, + "damBehavior" : "PIPELINED", + "priority" : 0 + } ], + "outputType" : "ROW<`order_id` INT, `total` DOUBLE, `id` INT, `name` VARCHAR(2147483647), `age` INT, `city` VARCHAR(2147483647), `state` VARCHAR(2147483647), `zipcode` INT>", + "description" : "Calc(select=[order_id, total, id, name, age, city, state, zipcode])" + }, { + "id" : 20, + "type" : "stream-exec-sink_1", + "configuration" : { + "table.exec.sink.keyed-shuffle" : "AUTO", + "table.exec.sink.not-null-enforcer" : "ERROR", + "table.exec.sink.rowtime-inserter" : "ENABLED", + "table.exec.sink.type-length-enforcer" : "IGNORE", + "table.exec.sink.upsert-materialize" : "AUTO" + }, + "dynamicTableSink" : { + "table" : { + "identifier" : "`default_catalog`.`default_database`.`sink_t`", + "resolvedTable" : { + "schema" : { + "columns" : [ { + "name" : "order_id", + "dataType" : "INT" + }, { + "name" : "total", + "dataType" : "DOUBLE" + }, { + "name" : "id", + "dataType" : "INT" + }, { + "name" : "name", + "dataType" : "VARCHAR(2147483647)" + }, { + "name" : "age", + "dataType" : "INT" + }, { + "name" : "city", + "dataType" : "VARCHAR(2147483647)" + }, { + "name" : "state", + "dataType" : "VARCHAR(2147483647)" + }, { + "name" : "zipcode", + "dataType" : "INT" + } ], + "watermarkSpecs" : [ ] + }, + "partitionKeys" : [ ] + } + } + }, + "inputChangelogMode" : [ "INSERT" ], + "inputProperties" : [ { + "requiredDistribution" : { + "type" : "UNKNOWN" + }, + "damBehavior" : "PIPELINED", + "priority" : 0 + } ], + "outputType" : "ROW<`order_id` INT, `total` DOUBLE, `id` INT, `name` VARCHAR(2147483647), `age` INT, `city` VARCHAR(2147483647), `state` VARCHAR(2147483647), `zipcode` INT>", + "description" : "Sink(table=[default_catalog.default_database.sink_t], fields=[order_id, total, id, name, age, city, state, zipcode])" + } ], + "edges" : [ { + "source" : 17, + "target" : 18, + "shuffle" : { + "type" : "FORWARD" + }, + "shuffleMode" : "PIPELINED" + }, { + "source" : 18, + "target" : 19, + "shuffle" : { + "type" : "FORWARD" + }, + "shuffleMode" : "PIPELINED" + }, { + "source" : 19, + "target" : 20, + "shuffle" : { + "type" : "FORWARD" + }, + "shuffleMode" : "PIPELINED" + } ] +} diff --git a/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-lookup-join_1/lookup-join-post-filter/savepoint/_metadata b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-lookup-join_1/lookup-join-post-filter/savepoint/_metadata new file mode 100644 index 00000000000..4aea6a1a2d9 Binary files /dev/null and b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-lookup-join_1/lookup-join-post-filter/savepoint/_metadata differ diff --git a/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-lookup-join_1/lookup-join-pre-filter/plan/lookup-join-pre-filter.json b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-lookup-join_1/lookup-join-pre-filter/plan/lookup-join-pre-filter.json new file mode 100644 index 00000000000..a7c551450f3 --- /dev/null +++ b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-lookup-join_1/lookup-join-pre-filter/plan/lookup-join-pre-filter.json @@ -0,0 +1,279 @@ +{ + "flinkVersion" : "1.19", + "nodes" : [ { + "id" : 13, + "type" : "stream-exec-table-source-scan_1", + "scanTableSource" : { + "table" : { + "identifier" : "`default_catalog`.`default_database`.`orders_t`", + "resolvedTable" : { + "schema" : { + "columns" : [ { + "name" : "order_id", + "dataType" : "INT" + }, { + "name" : "customer_id", + "dataType" : "INT" + }, { + "name" : "total", + "dataType" : "DOUBLE" + }, { + "name" : "order_time", + "dataType" : "VARCHAR(2147483647)" + }, { + "name" : "proc_time", + "kind" : "COMPUTED", + "expression" : { + "rexNode" : { + "kind" : "CALL", + "internalName" : "$PROCTIME$1", + "operands" : [ ], + "type" : { + "type" : "TIMESTAMP_WITH_LOCAL_TIME_ZONE", + "nullable" : false, + "precision" : 3, + "kind" : "PROCTIME" + } + }, + "serializableString" : "PROCTIME()" + } + } ], + "watermarkSpecs" : [ ] + }, + "partitionKeys" : [ ] + } + }, + "abilities" : [ { + "type" : "ProjectPushDown", + "projectedFields" : [ [ 0 ], [ 1 ], [ 2 ] ], + "producedType" : "ROW<`order_id` INT, `customer_id` INT, `total` DOUBLE> NOT NULL" + }, { + "type" : "ReadingMetadata", + "metadataKeys" : [ ], + "producedType" : "ROW<`order_id` INT, `customer_id` INT, `total` DOUBLE> NOT NULL" + } ] + }, + "outputType" : "ROW<`order_id` INT, `customer_id` INT, `total` DOUBLE>", + "description" : "TableSourceScan(table=[[default_catalog, default_database, orders_t, project=[order_id, customer_id, total], metadata=[]]], fields=[order_id, customer_id, total])", + "inputProperties" : [ ] + }, { + "id" : 14, + "type" : "stream-exec-lookup-join_1", + "joinType" : "LEFT", + "preFilterCondition" : { + "kind" : "CALL", + "syntax" : "BINARY", + "internalName" : "$>$1", + "operands" : [ { + "kind" : "INPUT_REF", + "inputIndex" : 2, + "type" : "DOUBLE" + }, { + "kind" : "LITERAL", + "value" : "15.3", + "type" : "DECIMAL(3, 1) NOT NULL" + } ], + "type" : "BOOLEAN" + }, + "joinCondition" : null, + "temporalTable" : { + "lookupTableSource" : { + "table" : { + "identifier" : "`default_catalog`.`default_database`.`customers_t`", + "resolvedTable" : { + "schema" : { + "columns" : [ { + "name" : "id", + "dataType" : "INT NOT NULL" + }, { + "name" : "name", + "dataType" : "VARCHAR(2147483647)" + }, { + "name" : "age", + "dataType" : "INT" + }, { + "name" : "city", + "dataType" : "VARCHAR(2147483647)" + }, { + "name" : "state", + "dataType" : "VARCHAR(2147483647)" + }, { + "name" : "zipcode", + "dataType" : "INT" + } ], + "watermarkSpecs" : [ ], + "primaryKey" : { + "name" : "PK_id", + "type" : "PRIMARY_KEY", + "columns" : [ "id" ] + } + }, + "partitionKeys" : [ ] + } + }, + "abilities" : [ { + "type" : "FilterPushDown", + "predicates" : [ { + "kind" : "CALL", + "syntax" : "BINARY", + "internalName" : "$>$1", + "operands" : [ { + "kind" : "INPUT_REF", + "inputIndex" : 2, + "type" : "INT" + }, { + "kind" : "LITERAL", + "value" : 30, + "type" : "INT NOT NULL" + } ], + "type" : "BOOLEAN" + } ] + } ] + }, + "outputType" : "ROW<`id` INT NOT NULL, `name` VARCHAR(2147483647), `age` INT, `city` VARCHAR(2147483647), `state` VARCHAR(2147483647), `zipcode` INT> NOT NULL" + }, + "lookupKeys" : { + "0" : { + "type" : "FieldRef", + "index" : 1 + } + }, + "projectionOnTemporalTable" : null, + "filterOnTemporalTable" : null, + "lookupKeyContainsPrimaryKey" : true, + "inputChangelogMode" : [ "INSERT" ], + "inputProperties" : [ { + "requiredDistribution" : { + "type" : "UNKNOWN" + }, + "damBehavior" : "PIPELINED", + "priority" : 0 + } ], + "outputType" : "ROW<`order_id` INT, `customer_id` INT, `total` DOUBLE, `id` INT, `name` VARCHAR(2147483647), `age` INT, `city` VARCHAR(2147483647), `state` VARCHAR(2147483647), `zipcode` INT>", + "description" : "LookupJoin(table=[default_catalog.default_database.customers_t], joinType=[LeftOuterJoin], lookup=[id=customer_id], joinCondition=[(total > 15.3)], select=[order_id, customer_id, total, id, name, age, city, state, zipcode])" + }, { + "id" : 15, + "type" : "stream-exec-calc_1", + "projection" : [ { + "kind" : "INPUT_REF", + "inputIndex" : 0, + "type" : "INT" + }, { + "kind" : "INPUT_REF", + "inputIndex" : 2, + "type" : "DOUBLE" + }, { + "kind" : "INPUT_REF", + "inputIndex" : 3, + "type" : "INT" + }, { + "kind" : "INPUT_REF", + "inputIndex" : 4, + "type" : "VARCHAR(2147483647)" + }, { + "kind" : "INPUT_REF", + "inputIndex" : 5, + "type" : "INT" + }, { + "kind" : "INPUT_REF", + "inputIndex" : 6, + "type" : "VARCHAR(2147483647)" + }, { + "kind" : "INPUT_REF", + "inputIndex" : 7, + "type" : "VARCHAR(2147483647)" + }, { + "kind" : "INPUT_REF", + "inputIndex" : 8, + "type" : "INT" + } ], + "condition" : null, + "inputProperties" : [ { + "requiredDistribution" : { + "type" : "UNKNOWN" + }, + "damBehavior" : "PIPELINED", + "priority" : 0 + } ], + "outputType" : "ROW<`order_id` INT, `total` DOUBLE, `id` INT, `name` VARCHAR(2147483647), `age` INT, `city` VARCHAR(2147483647), `state` VARCHAR(2147483647), `zipcode` INT>", + "description" : "Calc(select=[order_id, total, id, name, age, city, state, zipcode])" + }, { + "id" : 16, + "type" : "stream-exec-sink_1", + "configuration" : { + "table.exec.sink.keyed-shuffle" : "AUTO", + "table.exec.sink.not-null-enforcer" : "ERROR", + "table.exec.sink.rowtime-inserter" : "ENABLED", + "table.exec.sink.type-length-enforcer" : "IGNORE", + "table.exec.sink.upsert-materialize" : "AUTO" + }, + "dynamicTableSink" : { + "table" : { + "identifier" : "`default_catalog`.`default_database`.`sink_t`", + "resolvedTable" : { + "schema" : { + "columns" : [ { + "name" : "order_id", + "dataType" : "INT" + }, { + "name" : "total", + "dataType" : "DOUBLE" + }, { + "name" : "id", + "dataType" : "INT" + }, { + "name" : "name", + "dataType" : "VARCHAR(2147483647)" + }, { + "name" : "age", + "dataType" : "INT" + }, { + "name" : "city", + "dataType" : "VARCHAR(2147483647)" + }, { + "name" : "state", + "dataType" : "VARCHAR(2147483647)" + }, { + "name" : "zipcode", + "dataType" : "INT" + } ], + "watermarkSpecs" : [ ] + }, + "partitionKeys" : [ ] + } + } + }, + "inputChangelogMode" : [ "INSERT" ], + "inputProperties" : [ { + "requiredDistribution" : { + "type" : "UNKNOWN" + }, + "damBehavior" : "PIPELINED", + "priority" : 0 + } ], + "outputType" : "ROW<`order_id` INT, `total` DOUBLE, `id` INT, `name` VARCHAR(2147483647), `age` INT, `city` VARCHAR(2147483647), `state` VARCHAR(2147483647), `zipcode` INT>", + "description" : "Sink(table=[default_catalog.default_database.sink_t], fields=[order_id, total, id, name, age, city, state, zipcode])" + } ], + "edges" : [ { + "source" : 13, + "target" : 14, + "shuffle" : { + "type" : "FORWARD" + }, + "shuffleMode" : "PIPELINED" + }, { + "source" : 14, + "target" : 15, + "shuffle" : { + "type" : "FORWARD" + }, + "shuffleMode" : "PIPELINED" + }, { + "source" : 15, + "target" : 16, + "shuffle" : { + "type" : "FORWARD" + }, + "shuffleMode" : "PIPELINED" + } ] +} diff --git a/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-lookup-join_1/lookup-join-pre-filter/savepoint/_metadata b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-lookup-join_1/lookup-join-pre-filter/savepoint/_metadata new file mode 100644 index 00000000000..14c370e5f13 Binary files /dev/null and b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-lookup-join_1/lookup-join-pre-filter/savepoint/_metadata differ diff --git a/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-lookup-join_1/lookup-join-pre-post-filter/plan/lookup-join-pre-post-filter.json b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-lookup-join_1/lookup-join-pre-post-filter/plan/lookup-join-pre-post-filter.json new file mode 100644 index 00000000000..df07598a99c --- /dev/null +++ b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-lookup-join_1/lookup-join-pre-post-filter/plan/lookup-join-pre-post-filter.json @@ -0,0 +1,281 @@ +{ + "flinkVersion" : "1.19", + "nodes" : [ { + "id" : 21, + "type" : "stream-exec-table-source-scan_1", + "scanTableSource" : { + "table" : { + "identifier" : "`default_catalog`.`default_database`.`orders_t`", + "resolvedTable" : { + "schema" : { + "columns" : [ { + "name" : "order_id", + "dataType" : "INT" + }, { + "name" : "customer_id", + "dataType" : "INT" + }, { + "name" : "total", + "dataType" : "DOUBLE" + }, { + "name" : "order_time", + "dataType" : "VARCHAR(2147483647)" + }, { + "name" : "proc_time", + "kind" : "COMPUTED", + "expression" : { + "rexNode" : { + "kind" : "CALL", + "internalName" : "$PROCTIME$1", + "operands" : [ ], + "type" : { + "type" : "TIMESTAMP_WITH_LOCAL_TIME_ZONE", + "nullable" : false, + "precision" : 3, + "kind" : "PROCTIME" + } + }, + "serializableString" : "PROCTIME()" + } + } ], + "watermarkSpecs" : [ ] + }, + "partitionKeys" : [ ] + } + }, + "abilities" : [ { + "type" : "ProjectPushDown", + "projectedFields" : [ [ 0 ], [ 1 ], [ 2 ] ], + "producedType" : "ROW<`order_id` INT, `customer_id` INT, `total` DOUBLE> NOT NULL" + }, { + "type" : "ReadingMetadata", + "metadataKeys" : [ ], + "producedType" : "ROW<`order_id` INT, `customer_id` INT, `total` DOUBLE> NOT NULL" + } ] + }, + "outputType" : "ROW<`order_id` INT, `customer_id` INT, `total` DOUBLE>", + "description" : "TableSourceScan(table=[[default_catalog, default_database, orders_t, project=[order_id, customer_id, total], metadata=[]]], fields=[order_id, customer_id, total])", + "inputProperties" : [ ] + }, { + "id" : 22, + "type" : "stream-exec-lookup-join_1", + "joinType" : "LEFT", + "preFilterCondition" : { + "kind" : "CALL", + "syntax" : "BINARY", + "internalName" : "$>$1", + "operands" : [ { + "kind" : "INPUT_REF", + "inputIndex" : 2, + "type" : "DOUBLE" + }, { + "kind" : "LITERAL", + "value" : "15.3", + "type" : "DECIMAL(3, 1) NOT NULL" + } ], + "type" : "BOOLEAN" + }, + "joinCondition" : { + "kind" : "CALL", + "syntax" : "BINARY", + "internalName" : "$<$1", + "operands" : [ { + "kind" : "CALL", + "syntax" : "SPECIAL", + "internalName" : "$CAST$1", + "operands" : [ { + "kind" : "INPUT_REF", + "inputIndex" : 2, + "type" : "DOUBLE" + } ], + "type" : "INT" + }, { + "kind" : "INPUT_REF", + "inputIndex" : 5, + "type" : "INT" + } ], + "type" : "BOOLEAN" + }, + "temporalTable" : { + "lookupTableSource" : { + "table" : { + "identifier" : "`default_catalog`.`default_database`.`customers_t`", + "resolvedTable" : { + "schema" : { + "columns" : [ { + "name" : "id", + "dataType" : "INT NOT NULL" + }, { + "name" : "name", + "dataType" : "VARCHAR(2147483647)" + }, { + "name" : "age", + "dataType" : "INT" + }, { + "name" : "city", + "dataType" : "VARCHAR(2147483647)" + }, { + "name" : "state", + "dataType" : "VARCHAR(2147483647)" + }, { + "name" : "zipcode", + "dataType" : "INT" + } ], + "watermarkSpecs" : [ ], + "primaryKey" : { + "name" : "PK_id", + "type" : "PRIMARY_KEY", + "columns" : [ "id" ] + } + }, + "partitionKeys" : [ ] + } + } + }, + "outputType" : "ROW<`id` INT NOT NULL, `name` VARCHAR(2147483647), `age` INT, `city` VARCHAR(2147483647), `state` VARCHAR(2147483647), `zipcode` INT> NOT NULL" + }, + "lookupKeys" : { + "0" : { + "type" : "FieldRef", + "index" : 1 + } + }, + "projectionOnTemporalTable" : null, + "filterOnTemporalTable" : null, + "lookupKeyContainsPrimaryKey" : true, + "inputChangelogMode" : [ "INSERT" ], + "inputProperties" : [ { + "requiredDistribution" : { + "type" : "UNKNOWN" + }, + "damBehavior" : "PIPELINED", + "priority" : 0 + } ], + "outputType" : "ROW<`order_id` INT, `customer_id` INT, `total` DOUBLE, `id` INT, `name` VARCHAR(2147483647), `age` INT, `city` VARCHAR(2147483647), `state` VARCHAR(2147483647), `zipcode` INT>", + "description" : "LookupJoin(table=[default_catalog.default_database.customers_t], joinType=[LeftOuterJoin], lookup=[id=customer_id], joinCondition=[(total > 15.3)(CAST(total AS INTEGER) < age)], select=[order_id, customer_id, total, id, name, age, city, state, zipcode])" + }, { + "id" : 23, + "type" : "stream-exec-calc_1", + "projection" : [ { + "kind" : "INPUT_REF", + "inputIndex" : 0, + "type" : "INT" + }, { + "kind" : "INPUT_REF", + "inputIndex" : 2, + "type" : "DOUBLE" + }, { + "kind" : "INPUT_REF", + "inputIndex" : 3, + "type" : "INT" + }, { + "kind" : "INPUT_REF", + "inputIndex" : 4, + "type" : "VARCHAR(2147483647)" + }, { + "kind" : "INPUT_REF", + "inputIndex" : 5, + "type" : "INT" + }, { + "kind" : "INPUT_REF", + "inputIndex" : 6, + "type" : "VARCHAR(2147483647)" + }, { + "kind" : "INPUT_REF", + "inputIndex" : 7, + "type" : "VARCHAR(2147483647)" + }, { + "kind" : "INPUT_REF", + "inputIndex" : 8, + "type" : "INT" + } ], + "condition" : null, + "inputProperties" : [ { + "requiredDistribution" : { + "type" : "UNKNOWN" + }, + "damBehavior" : "PIPELINED", + "priority" : 0 + } ], + "outputType" : "ROW<`order_id` INT, `total` DOUBLE, `id` INT, `name` VARCHAR(2147483647), `age` INT, `city` VARCHAR(2147483647), `state` VARCHAR(2147483647), `zipcode` INT>", + "description" : "Calc(select=[order_id, total, id, name, age, city, state, zipcode])" + }, { + "id" : 24, + "type" : "stream-exec-sink_1", + "configuration" : { + "table.exec.sink.keyed-shuffle" : "AUTO", + "table.exec.sink.not-null-enforcer" : "ERROR", + "table.exec.sink.rowtime-inserter" : "ENABLED", + "table.exec.sink.type-length-enforcer" : "IGNORE", + "table.exec.sink.upsert-materialize" : "AUTO" + }, + "dynamicTableSink" : { + "table" : { + "identifier" : "`default_catalog`.`default_database`.`sink_t`", + "resolvedTable" : { + "schema" : { + "columns" : [ { + "name" : "order_id", + "dataType" : "INT" + }, { + "name" : "total", + "dataType" : "DOUBLE" + }, { + "name" : "id", + "dataType" : "INT" + }, { + "name" : "name", + "dataType" : "VARCHAR(2147483647)" + }, { + "name" : "age", + "dataType" : "INT" + }, { + "name" : "city", + "dataType" : "VARCHAR(2147483647)" + }, { + "name" : "state", + "dataType" : "VARCHAR(2147483647)" + }, { + "name" : "zipcode", + "dataType" : "INT" + } ], + "watermarkSpecs" : [ ] + }, + "partitionKeys" : [ ] + } + } + }, + "inputChangelogMode" : [ "INSERT" ], + "inputProperties" : [ { + "requiredDistribution" : { + "type" : "UNKNOWN" + }, + "damBehavior" : "PIPELINED", + "priority" : 0 + } ], + "outputType" : "ROW<`order_id` INT, `total` DOUBLE, `id` INT, `name` VARCHAR(2147483647), `age` INT, `city` VARCHAR(2147483647), `state` VARCHAR(2147483647), `zipcode` INT>", + "description" : "Sink(table=[default_catalog.default_database.sink_t], fields=[order_id, total, id, name, age, city, state, zipcode])" + } ], + "edges" : [ { + "source" : 21, + "target" : 22, + "shuffle" : { + "type" : "FORWARD" + }, + "shuffleMode" : "PIPELINED" + }, { + "source" : 22, + "target" : 23, + "shuffle" : { + "type" : "FORWARD" + }, + "shuffleMode" : "PIPELINED" + }, { + "source" : 23, + "target" : 24, + "shuffle" : { + "type" : "FORWARD" + }, + "shuffleMode" : "PIPELINED" + } ] +} diff --git a/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-lookup-join_1/lookup-join-pre-post-filter/savepoint/_metadata b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-lookup-join_1/lookup-join-pre-post-filter/savepoint/_metadata new file mode 100644 index 00000000000..e755cc09068 Binary files /dev/null and b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-lookup-join_1/lookup-join-pre-post-filter/savepoint/_metadata differ diff --git a/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-lookup-join_1/lookup-join-project-pushdown/plan/lookup-join-project-pushdown.json b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-lookup-join_1/lookup-join-project-pushdown/plan/lookup-join-project-pushdown.json new file mode 100644 index 00000000000..91502b1858a --- /dev/null +++ b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-lookup-join_1/lookup-join-project-pushdown/plan/lookup-join-project-pushdown.json @@ -0,0 +1,248 @@ +{ + "flinkVersion" : "1.19", + "nodes" : [ { + "id" : 1, + "type" : "stream-exec-table-source-scan_1", + "scanTableSource" : { + "table" : { + "identifier" : "`default_catalog`.`default_database`.`orders_t`", + "resolvedTable" : { + "schema" : { + "columns" : [ { + "name" : "order_id", + "dataType" : "INT" + }, { + "name" : "customer_id", + "dataType" : "INT" + }, { + "name" : "total", + "dataType" : "DOUBLE" + }, { + "name" : "order_time", + "dataType" : "VARCHAR(2147483647)" + }, { + "name" : "proc_time", + "kind" : "COMPUTED", + "expression" : { + "rexNode" : { + "kind" : "CALL", + "internalName" : "$PROCTIME$1", + "operands" : [ ], + "type" : { + "type" : "TIMESTAMP_WITH_LOCAL_TIME_ZONE", + "nullable" : false, + "precision" : 3, + "kind" : "PROCTIME" + } + }, + "serializableString" : "PROCTIME()" + } + } ], + "watermarkSpecs" : [ ] + }, + "partitionKeys" : [ ] + } + }, + "abilities" : [ { + "type" : "ProjectPushDown", + "projectedFields" : [ [ 0 ], [ 1 ], [ 2 ] ], + "producedType" : "ROW<`order_id` INT, `customer_id` INT, `total` DOUBLE> NOT NULL" + }, { + "type" : "ReadingMetadata", + "metadataKeys" : [ ], + "producedType" : "ROW<`order_id` INT, `customer_id` INT, `total` DOUBLE> NOT NULL" + } ] + }, + "outputType" : "ROW<`order_id` INT, `customer_id` INT, `total` DOUBLE>", + "description" : "TableSourceScan(table=[[default_catalog, default_database, orders_t, project=[order_id, customer_id, total], metadata=[]]], fields=[order_id, customer_id, total])", + "inputProperties" : [ ] + }, { + "id" : 2, + "type" : "stream-exec-lookup-join_1", + "joinType" : "INNER", + "joinCondition" : null, + "temporalTable" : { + "lookupTableSource" : { + "table" : { + "identifier" : "`default_catalog`.`default_database`.`customers_t`", + "resolvedTable" : { + "schema" : { + "columns" : [ { + "name" : "id", + "dataType" : "INT NOT NULL" + }, { + "name" : "name", + "dataType" : "VARCHAR(2147483647)" + }, { + "name" : "age", + "dataType" : "INT" + }, { + "name" : "city", + "dataType" : "VARCHAR(2147483647)" + }, { + "name" : "state", + "dataType" : "VARCHAR(2147483647)" + }, { + "name" : "zipcode", + "dataType" : "INT" + } ], + "watermarkSpecs" : [ ], + "primaryKey" : { + "name" : "PK_id", + "type" : "PRIMARY_KEY", + "columns" : [ "id" ] + } + }, + "partitionKeys" : [ ] + } + }, + "abilities" : [ { + "type" : "ProjectPushDown", + "projectedFields" : [ [ 0 ], [ 1 ], [ 3 ], [ 4 ], [ 5 ] ], + "producedType" : "ROW<`id` INT NOT NULL, `name` VARCHAR(2147483647), `city` VARCHAR(2147483647), `state` VARCHAR(2147483647), `zipcode` INT> NOT NULL" + }, { + "type" : "ReadingMetadata", + "metadataKeys" : [ ], + "producedType" : "ROW<`id` INT NOT NULL, `name` VARCHAR(2147483647), `city` VARCHAR(2147483647), `state` VARCHAR(2147483647), `zipcode` INT> NOT NULL" + } ] + }, + "outputType" : "ROW<`id` INT NOT NULL, `name` VARCHAR(2147483647), `city` VARCHAR(2147483647), `state` VARCHAR(2147483647), `zipcode` INT> NOT NULL" + }, + "lookupKeys" : { + "0" : { + "type" : "FieldRef", + "index" : 1 + } + }, + "projectionOnTemporalTable" : null, + "filterOnTemporalTable" : null, + "lookupKeyContainsPrimaryKey" : true, + "inputChangelogMode" : [ "INSERT" ], + "inputProperties" : [ { + "requiredDistribution" : { + "type" : "UNKNOWN" + }, + "damBehavior" : "PIPELINED", + "priority" : 0 + } ], + "outputType" : "ROW<`order_id` INT, `customer_id` INT, `total` DOUBLE, `id` INT NOT NULL, `name` VARCHAR(2147483647), `city` VARCHAR(2147483647), `state` VARCHAR(2147483647), `zipcode` INT>", + "description" : "LookupJoin(table=[default_catalog.default_database.customers_t], joinType=[InnerJoin], lookup=[id=customer_id], select=[order_id, customer_id, total, id, name, city, state, zipcode])" + }, { + "id" : 3, + "type" : "stream-exec-calc_1", + "projection" : [ { + "kind" : "INPUT_REF", + "inputIndex" : 0, + "type" : "INT" + }, { + "kind" : "INPUT_REF", + "inputIndex" : 2, + "type" : "DOUBLE" + }, { + "kind" : "INPUT_REF", + "inputIndex" : 3, + "type" : "INT NOT NULL" + }, { + "kind" : "INPUT_REF", + "inputIndex" : 4, + "type" : "VARCHAR(2147483647)" + }, { + "kind" : "INPUT_REF", + "inputIndex" : 5, + "type" : "VARCHAR(2147483647)" + }, { + "kind" : "INPUT_REF", + "inputIndex" : 6, + "type" : "VARCHAR(2147483647)" + }, { + "kind" : "INPUT_REF", + "inputIndex" : 7, + "type" : "INT" + } ], + "condition" : null, + "inputProperties" : [ { + "requiredDistribution" : { + "type" : "UNKNOWN" + }, + "damBehavior" : "PIPELINED", + "priority" : 0 + } ], + "outputType" : "ROW<`order_id` INT, `total` DOUBLE, `id` INT NOT NULL, `name` VARCHAR(2147483647), `city` VARCHAR(2147483647), `state` VARCHAR(2147483647), `zipcode` INT>", + "description" : "Calc(select=[order_id, total, id, name, city, state, zipcode])" + }, { + "id" : 4, + "type" : "stream-exec-sink_1", + "configuration" : { + "table.exec.sink.keyed-shuffle" : "AUTO", + "table.exec.sink.not-null-enforcer" : "ERROR", + "table.exec.sink.rowtime-inserter" : "ENABLED", + "table.exec.sink.type-length-enforcer" : "IGNORE", + "table.exec.sink.upsert-materialize" : "AUTO" + }, + "dynamicTableSink" : { + "table" : { + "identifier" : "`default_catalog`.`default_database`.`sink_t`", + "resolvedTable" : { + "schema" : { + "columns" : [ { + "name" : "order_id", + "dataType" : "INT" + }, { + "name" : "total", + "dataType" : "DOUBLE" + }, { + "name" : "id", + "dataType" : "INT" + }, { + "name" : "name", + "dataType" : "VARCHAR(2147483647)" + }, { + "name" : "city", + "dataType" : "VARCHAR(2147483647)" + }, { + "name" : "state", + "dataType" : "VARCHAR(2147483647)" + }, { + "name" : "zipcode", + "dataType" : "INT" + } ], + "watermarkSpecs" : [ ] + }, + "partitionKeys" : [ ] + } + } + }, + "inputChangelogMode" : [ "INSERT" ], + "inputProperties" : [ { + "requiredDistribution" : { + "type" : "UNKNOWN" + }, + "damBehavior" : "PIPELINED", + "priority" : 0 + } ], + "outputType" : "ROW<`order_id` INT, `total` DOUBLE, `id` INT NOT NULL, `name` VARCHAR(2147483647), `city` VARCHAR(2147483647), `state` VARCHAR(2147483647), `zipcode` INT>", + "description" : "Sink(table=[default_catalog.default_database.sink_t], fields=[order_id, total, id, name, city, state, zipcode])" + } ], + "edges" : [ { + "source" : 1, + "target" : 2, + "shuffle" : { + "type" : "FORWARD" + }, + "shuffleMode" : "PIPELINED" + }, { + "source" : 2, + "target" : 3, + "shuffle" : { + "type" : "FORWARD" + }, + "shuffleMode" : "PIPELINED" + }, { + "source" : 3, + "target" : 4, + "shuffle" : { + "type" : "FORWARD" + }, + "shuffleMode" : "PIPELINED" + } ] +} diff --git a/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-lookup-join_1/lookup-join-project-pushdown/savepoint/_metadata b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-lookup-join_1/lookup-join-project-pushdown/savepoint/_metadata new file mode 100644 index 00000000000..6cda43d12c3 Binary files /dev/null and b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-lookup-join_1/lookup-join-project-pushdown/savepoint/_metadata differ diff --git a/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-lookup-join_1/lookup-join-retry-hint/plan/lookup-join-retry-hint.json b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-lookup-join_1/lookup-join-retry-hint/plan/lookup-join-retry-hint.json new file mode 100644 index 00000000000..6463fbf848e --- /dev/null +++ b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-lookup-join_1/lookup-join-retry-hint/plan/lookup-join-retry-hint.json @@ -0,0 +1,252 @@ +{ + "flinkVersion" : "1.19", + "nodes" : [ { + "id" : 29, + "type" : "stream-exec-table-source-scan_1", + "scanTableSource" : { + "table" : { + "identifier" : "`default_catalog`.`default_database`.`orders_t`", + "resolvedTable" : { + "schema" : { + "columns" : [ { + "name" : "order_id", + "dataType" : "INT" + }, { + "name" : "customer_id", + "dataType" : "INT" + }, { + "name" : "total", + "dataType" : "DOUBLE" + }, { + "name" : "order_time", + "dataType" : "VARCHAR(2147483647)" + }, { + "name" : "proc_time", + "kind" : "COMPUTED", + "expression" : { + "rexNode" : { + "kind" : "CALL", + "internalName" : "$PROCTIME$1", + "operands" : [ ], + "type" : { + "type" : "TIMESTAMP_WITH_LOCAL_TIME_ZONE", + "nullable" : false, + "precision" : 3, + "kind" : "PROCTIME" + } + }, + "serializableString" : "PROCTIME()" + } + } ], + "watermarkSpecs" : [ ] + }, + "partitionKeys" : [ ] + } + }, + "abilities" : [ { + "type" : "ProjectPushDown", + "projectedFields" : [ [ 0 ], [ 1 ], [ 2 ] ], + "producedType" : "ROW<`order_id` INT, `customer_id` INT, `total` DOUBLE> NOT NULL" + }, { + "type" : "ReadingMetadata", + "metadataKeys" : [ ], + "producedType" : "ROW<`order_id` INT, `customer_id` INT, `total` DOUBLE> NOT NULL" + } ] + }, + "outputType" : "ROW<`order_id` INT, `customer_id` INT, `total` DOUBLE>", + "description" : "TableSourceScan(table=[[default_catalog, default_database, orders_t, project=[order_id, customer_id, total], metadata=[]]], fields=[order_id, customer_id, total])", + "inputProperties" : [ ] + }, { + "id" : 30, + "type" : "stream-exec-lookup-join_1", + "joinType" : "INNER", + "joinCondition" : null, + "temporalTable" : { + "lookupTableSource" : { + "table" : { + "identifier" : "`default_catalog`.`default_database`.`customers_t`", + "resolvedTable" : { + "schema" : { + "columns" : [ { + "name" : "id", + "dataType" : "INT NOT NULL" + }, { + "name" : "name", + "dataType" : "VARCHAR(2147483647)" + }, { + "name" : "age", + "dataType" : "INT" + }, { + "name" : "city", + "dataType" : "VARCHAR(2147483647)" + }, { + "name" : "state", + "dataType" : "VARCHAR(2147483647)" + }, { + "name" : "zipcode", + "dataType" : "INT" + } ], + "watermarkSpecs" : [ ], + "primaryKey" : { + "name" : "PK_id", + "type" : "PRIMARY_KEY", + "columns" : [ "id" ] + } + }, + "partitionKeys" : [ ] + } + } + }, + "outputType" : "ROW<`id` INT NOT NULL, `name` VARCHAR(2147483647), `age` INT, `city` VARCHAR(2147483647), `state` VARCHAR(2147483647), `zipcode` INT> NOT NULL" + }, + "lookupKeys" : { + "0" : { + "type" : "FieldRef", + "index" : 1 + } + }, + "projectionOnTemporalTable" : null, + "filterOnTemporalTable" : null, + "lookupKeyContainsPrimaryKey" : true, + "retryOptions" : { + "retry-predicate" : "lookup_miss", + "retry-strategy" : "FIXED_DELAY", + "fixed-delay" : 10000, + "max-attempts" : 3 + }, + "inputChangelogMode" : [ "INSERT" ], + "inputProperties" : [ { + "requiredDistribution" : { + "type" : "UNKNOWN" + }, + "damBehavior" : "PIPELINED", + "priority" : 0 + } ], + "outputType" : "ROW<`order_id` INT, `customer_id` INT, `total` DOUBLE, `id` INT NOT NULL, `name` VARCHAR(2147483647), `age` INT, `city` VARCHAR(2147483647), `state` VARCHAR(2147483647), `zipcode` INT>", + "description" : "LookupJoin(table=[default_catalog.default_database.customers_t], joinType=[InnerJoin], lookup=[id=customer_id], select=[order_id, customer_id, total, id, name, age, city, state, zipcode], retry=[lookup_miss, FIXED_DELAY, 10000ms, 3])" + }, { + "id" : 31, + "type" : "stream-exec-calc_1", + "projection" : [ { + "kind" : "INPUT_REF", + "inputIndex" : 0, + "type" : "INT" + }, { + "kind" : "INPUT_REF", + "inputIndex" : 2, + "type" : "DOUBLE" + }, { + "kind" : "INPUT_REF", + "inputIndex" : 3, + "type" : "INT NOT NULL" + }, { + "kind" : "INPUT_REF", + "inputIndex" : 4, + "type" : "VARCHAR(2147483647)" + }, { + "kind" : "INPUT_REF", + "inputIndex" : 5, + "type" : "INT" + }, { + "kind" : "INPUT_REF", + "inputIndex" : 6, + "type" : "VARCHAR(2147483647)" + }, { + "kind" : "INPUT_REF", + "inputIndex" : 7, + "type" : "VARCHAR(2147483647)" + }, { + "kind" : "INPUT_REF", + "inputIndex" : 8, + "type" : "INT" + } ], + "condition" : null, + "inputProperties" : [ { + "requiredDistribution" : { + "type" : "UNKNOWN" + }, + "damBehavior" : "PIPELINED", + "priority" : 0 + } ], + "outputType" : "ROW<`order_id` INT, `total` DOUBLE, `id` INT NOT NULL, `name` VARCHAR(2147483647), `age` INT, `city` VARCHAR(2147483647), `state` VARCHAR(2147483647), `zipcode` INT>", + "description" : "Calc(select=[order_id, total, id, name, age, city, state, zipcode])" + }, { + "id" : 32, + "type" : "stream-exec-sink_1", + "configuration" : { + "table.exec.sink.keyed-shuffle" : "AUTO", + "table.exec.sink.not-null-enforcer" : "ERROR", + "table.exec.sink.rowtime-inserter" : "ENABLED", + "table.exec.sink.type-length-enforcer" : "IGNORE", + "table.exec.sink.upsert-materialize" : "AUTO" + }, + "dynamicTableSink" : { + "table" : { + "identifier" : "`default_catalog`.`default_database`.`sink_t`", + "resolvedTable" : { + "schema" : { + "columns" : [ { + "name" : "order_id", + "dataType" : "INT" + }, { + "name" : "total", + "dataType" : "DOUBLE" + }, { + "name" : "id", + "dataType" : "INT" + }, { + "name" : "name", + "dataType" : "VARCHAR(2147483647)" + }, { + "name" : "age", + "dataType" : "INT" + }, { + "name" : "city", + "dataType" : "VARCHAR(2147483647)" + }, { + "name" : "state", + "dataType" : "VARCHAR(2147483647)" + }, { + "name" : "zipcode", + "dataType" : "INT" + } ], + "watermarkSpecs" : [ ] + }, + "partitionKeys" : [ ] + } + } + }, + "inputChangelogMode" : [ "INSERT" ], + "inputProperties" : [ { + "requiredDistribution" : { + "type" : "UNKNOWN" + }, + "damBehavior" : "PIPELINED", + "priority" : 0 + } ], + "outputType" : "ROW<`order_id` INT, `total` DOUBLE, `id` INT NOT NULL, `name` VARCHAR(2147483647), `age` INT, `city` VARCHAR(2147483647), `state` VARCHAR(2147483647), `zipcode` INT>", + "description" : "Sink(table=[default_catalog.default_database.sink_t], fields=[order_id, total, id, name, age, city, state, zipcode])" + } ], + "edges" : [ { + "source" : 29, + "target" : 30, + "shuffle" : { + "type" : "FORWARD" + }, + "shuffleMode" : "PIPELINED" + }, { + "source" : 30, + "target" : 31, + "shuffle" : { + "type" : "FORWARD" + }, + "shuffleMode" : "PIPELINED" + }, { + "source" : 31, + "target" : 32, + "shuffle" : { + "type" : "FORWARD" + }, + "shuffleMode" : "PIPELINED" + } ] +} diff --git a/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-lookup-join_1/lookup-join-retry-hint/savepoint/_metadata b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-lookup-join_1/lookup-join-retry-hint/savepoint/_metadata new file mode 100644 index 00000000000..1d7e42191d8 Binary files /dev/null and b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-lookup-join_1/lookup-join-retry-hint/savepoint/_metadata differ