This is an automated email from the ASF dual-hosted git repository. dwysakowicz pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit 46d817d8d297b50fe91b5fb9471bda791a6f4319 Author: Jim Hughes <jhug...@confluent.io> AuthorDate: Mon Dec 11 13:26:42 2023 -0500 [FLINK-33767] Implement restore tests for TemporalJoin node This closes #23916 --- .../flink/table/test/program/TableTestProgram.java | 22 + .../test/program/TemporalFunctionTestStep.java | 67 +++ .../apache/flink/table/test/program/TestStep.java | 1 + .../nodes/exec/stream/TemporalJoinRestoreTest.java | 40 ++ .../exec/stream/TemporalJoinTestPrograms.java | 103 +++++ .../plan/nodes/exec/testutils/RestoreTestBase.java | 3 + .../testTemporalTableJoin.json | 421 ++++++++++++++++++ .../plan/temporal-join-table-join.json | 494 +++++++++++++++++++++ .../temporal-join-table-join/savepoint/_metadata | Bin 0 -> 14926 bytes .../plan/temporal-join-temporal-function.json | 494 +++++++++++++++++++++ .../savepoint/_metadata | Bin 0 -> 14926 bytes 11 files changed, 1645 insertions(+) diff --git a/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/test/program/TableTestProgram.java b/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/test/program/TableTestProgram.java index 7777f5323b0..7d4c4b45eb5 100644 --- a/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/test/program/TableTestProgram.java +++ b/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/test/program/TableTestProgram.java @@ -20,6 +20,7 @@ package org.apache.flink.table.test.program; import org.apache.flink.configuration.ConfigOption; import org.apache.flink.table.api.Table; +import org.apache.flink.table.expressions.Expression; import org.apache.flink.table.functions.UserDefinedFunction; import org.apache.flink.table.test.program.FunctionTestStep.FunctionBehavior; import org.apache.flink.table.test.program.FunctionTestStep.FunctionPersistence; @@ -176,6 +177,14 @@ public class TableTestProgram { .collect(Collectors.toList()); } + /** Convenience method to avoid casting. 
It assumes that the order of steps is not important. */ + public List<TemporalFunctionTestStep> getSetupTemporalFunctionTestSteps() { + return setupSteps.stream() + .filter(s -> s.getKind() == TestKind.TEMPORAL_FUNCTION) + .map(TemporalFunctionTestStep.class::cast) + .collect(Collectors.toList()); + } + /** * Convenience method to avoid boilerplate code. It assumes that only a single SQL statement is * tested. @@ -231,6 +240,19 @@ public class TableTestProgram { return this; } + /** Setup step for registering a temporary system temporal table function. */ + public Builder setupTemporarySystemTemporalTableFunction( + String name, String table, Expression timeAttribute, Expression primaryKey) { + this.setupSteps.add( + new TemporalFunctionTestStep( + TemporalFunctionTestStep.FunctionBehavior.SYSTEM, + name, + table, + timeAttribute, + primaryKey)); + return this; + } + /** Setup step for registering a temporary catalog function. */ public Builder setupTemporaryCatalogFunction( String name, Class<? extends UserDefinedFunction> function) { diff --git a/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/test/program/TemporalFunctionTestStep.java b/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/test/program/TemporalFunctionTestStep.java new file mode 100644 index 00000000000..206f7fa38c1 --- /dev/null +++ b/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/test/program/TemporalFunctionTestStep.java @@ -0,0 +1,67 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.test.program; + +import org.apache.flink.table.api.TableEnvironment; +import org.apache.flink.table.expressions.Expression; +import org.apache.flink.table.functions.TemporalTableFunction; + +/** Test step for registering a temporal table function (temporary system or catalog). */ +public final class TemporalFunctionTestStep implements TestStep { + + /** Whether the function is registered as a temporary system or catalog function. */ + enum FunctionBehavior { + SYSTEM, + CATALOG + } + + public final FunctionBehavior behavior; + public final String name; + public final String table; + public final Expression timeAttribute; + public final Expression primaryKey; + + TemporalFunctionTestStep( + FunctionBehavior behavior, + String name, + String table, + Expression timeAttribute, + Expression primaryKey) { + this.behavior = behavior; + this.name = name; + this.table = table; + this.timeAttribute = timeAttribute; + this.primaryKey = primaryKey; + } + + @Override + public TestKind getKind() { + return TestKind.TEMPORAL_FUNCTION; + } + + public void apply(TableEnvironment env) { + TemporalTableFunction function = + env.from(table).createTemporalTableFunction(timeAttribute, primaryKey); + if (behavior == FunctionBehavior.SYSTEM) { + env.createTemporarySystemFunction(name, function); + } else { + env.createTemporaryFunction(name, function); + } + } +} diff --git a/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/test/program/TestStep.java b/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/test/program/TestStep.java 
index 6839a66067c..293789a2353 100644 --- a/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/test/program/TestStep.java +++ b/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/test/program/TestStep.java @@ -43,6 +43,7 @@ public interface TestStep { STATEMENT_SET, CONFIG, FUNCTION, + TEMPORAL_FUNCTION, SOURCE_WITHOUT_DATA, SOURCE_WITH_DATA, SOURCE_WITH_RESTORE_DATA, diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/TemporalJoinRestoreTest.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/TemporalJoinRestoreTest.java new file mode 100644 index 00000000000..44f48fc24c3 --- /dev/null +++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/TemporalJoinRestoreTest.java @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.table.planner.plan.nodes.exec.stream; + +import org.apache.flink.table.planner.plan.nodes.exec.testutils.RestoreTestBase; +import org.apache.flink.table.test.program.TableTestProgram; + +import java.util.Arrays; +import java.util.List; + +/** Restore tests for {@link StreamExecTemporalJoin}. */ +public class TemporalJoinRestoreTest extends RestoreTestBase { + + public TemporalJoinRestoreTest() { + super(StreamExecTemporalJoin.class); + } + + @Override + public List<TableTestProgram> programs() { + return Arrays.asList( + TemporalJoinTestPrograms.TEMPORAL_JOIN_TABLE_JOIN, + TemporalJoinTestPrograms.TEMPORAL_JOIN_TEMPORAL_FUNCTION); + } +} diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/TemporalJoinTestPrograms.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/TemporalJoinTestPrograms.java new file mode 100644 index 00000000000..883d628fbd9 --- /dev/null +++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/TemporalJoinTestPrograms.java @@ -0,0 +1,103 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.table.planner.plan.nodes.exec.stream; + +import org.apache.flink.table.test.program.SinkTestStep; +import org.apache.flink.table.test.program.SourceTestStep; +import org.apache.flink.table.test.program.TableTestProgram; +import org.apache.flink.types.Row; + +import static org.apache.flink.table.api.Expressions.$; + +/** {@link TableTestProgram} definitions for testing {@link StreamExecTemporalJoin}. */ +public class TemporalJoinTestPrograms { + static final SourceTestStep ORDERS = + SourceTestStep.newBuilder("Orders") + .addSchema( + "amount bigint", + "currency STRING", + "order_time STRING", + "rowtime as TO_TIMESTAMP(order_time) ", + "WATERMARK FOR rowtime AS rowtime") + .producedBeforeRestore( + Row.of(2L, "Euro", "2020-10-10 00:00:42"), + Row.of(1L, "USD", "2020-10-10 00:00:43"), + Row.of(50L, "Yen", "2020-10-10 00:00:44"), + Row.of(3L, "Euro", "2020-10-10 00:00:45")) + .producedAfterRestore( + Row.of(1L, "Euro", "2020-10-10 00:00:58"), + Row.of(1L, "USD", "2020-10-10 00:00:58")) + .build(); + + static final SourceTestStep RATES = + SourceTestStep.newBuilder("RatesHistory") + .addSchema( + "currency STRING", + "rate bigint", + "rate_time STRING", + "rowtime as TO_TIMESTAMP(rate_time) ", + "WATERMARK FOR rowtime AS rowtime", + "PRIMARY KEY(currency) NOT ENFORCED") + .producedBeforeRestore( + Row.of("USD", 102L, "2020-10-10 00:00:41"), + Row.of("Euro", 114L, "2020-10-10 00:00:41"), + Row.of("Yen", 1L, "2020-10-10 00:00:41"), + Row.of("Euro", 116L, "2020-10-10 00:00:45"), + Row.of("Euro", 119L, "2020-10-10 00:00:47")) + .producedAfterRestore( + Row.of("USD", 103L, "2020-10-10 00:00:58"), + Row.of("Euro", 120L, "2020-10-10 00:00:59")) + .build(); + + static final SinkTestStep AMOUNTS = + SinkTestStep.newBuilder("MySink") + .addSchema("amount bigint") + .consumedBeforeRestore("+I[102]", "+I[228]", "+I[348]", "+I[50]") + .consumedAfterRestore("+I[103]", "+I[119]") + .build(); + static final TableTestProgram TEMPORAL_JOIN_TABLE_JOIN = 
+ TableTestProgram.of("temporal-join-table-join", "validates temporal join with a table") + .setupTableSource(ORDERS) + .setupTableSource(RATES) + .setupTableSink(AMOUNTS) + .runSql( + "INSERT INTO MySink " + + "SELECT amount * r.rate " + + "FROM Orders AS o " + + "JOIN RatesHistory FOR SYSTEM_TIME AS OF o.rowtime AS r " + + "ON o.currency = r.currency ") + .build(); + + static final TableTestProgram TEMPORAL_JOIN_TEMPORAL_FUNCTION = + TableTestProgram.of( + "temporal-join-temporal-function", + "validates temporal join with a temporal function") + .setupTableSource(ORDERS) + .setupTableSource(RATES) + .setupTemporarySystemTemporalTableFunction( + "Rates", "RatesHistory", $("rowtime"), $("currency")) + .setupTableSink(AMOUNTS) + .runSql( + "INSERT INTO MySink " + + "SELECT amount * r.rate " + + "FROM Orders AS o, " + + "LATERAL TABLE (Rates(o.rowtime)) AS r " + + "WHERE o.currency = r.currency ") + .build(); +} diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/testutils/RestoreTestBase.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/testutils/RestoreTestBase.java index 8dd4646df10..619c3b3ba67 100644 --- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/testutils/RestoreTestBase.java +++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/testutils/RestoreTestBase.java @@ -114,6 +114,7 @@ public abstract class RestoreTestBase implements TableTestProgramRunner { return EnumSet.of( TestKind.CONFIG, TestKind.FUNCTION, + TestKind.TEMPORAL_FUNCTION, TestKind.SOURCE_WITH_RESTORE_DATA, TestKind.SINK_WITH_RESTORE_DATA); } @@ -207,6 +208,7 @@ public abstract class RestoreTestBase implements TableTestProgramRunner { } program.getSetupFunctionTestSteps().forEach(s -> s.apply(tEnv)); + program.getSetupTemporalFunctionTestSteps().forEach(s -> s.apply(tEnv)); final SqlTestStep 
sqlTestStep = program.getRunSqlTestStep(); @@ -271,6 +273,7 @@ public abstract class RestoreTestBase implements TableTestProgramRunner { } program.getSetupFunctionTestSteps().forEach(s -> s.apply(tEnv)); + program.getSetupTemporalFunctionTestSteps().forEach(s -> s.apply(tEnv)); final CompiledPlan compiledPlan = tEnv.loadPlan(PlanReference.fromFile(getPlanPath(program, metadata))); diff --git a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/TemporalJoinJsonPlanTest_jsonplan/testTemporalTableJoin.json b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/TemporalJoinJsonPlanTest_jsonplan/testTemporalTableJoin.json new file mode 100644 index 00000000000..bbfd1524d3b --- /dev/null +++ b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/TemporalJoinJsonPlanTest_jsonplan/testTemporalTableJoin.json @@ -0,0 +1,421 @@ +{ + "flinkVersion" : "", + "nodes" : [ { + "id" : 1, + "type" : "stream-exec-table-source-scan_1", + "scanTableSource" : { + "table" : { + "identifier" : "`default_catalog`.`default_database`.`Orders`", + "resolvedTable" : { + "schema" : { + "columns" : [ { + "name" : "amount", + "dataType" : "INT" + }, { + "name" : "currency", + "dataType" : "VARCHAR(2147483647)" + }, { + "name" : "rowtime", + "dataType" : { + "type" : "TIMESTAMP_WITHOUT_TIME_ZONE", + "precision" : 3, + "kind" : "ROWTIME" + } + }, { + "name" : "proctime", + "kind" : "COMPUTED", + "expression" : { + "rexNode" : { + "kind" : "CALL", + "internalName" : "$PROCTIME$1", + "operands" : [ ], + "type" : { + "type" : "TIMESTAMP_WITH_LOCAL_TIME_ZONE", + "nullable" : false, + "precision" : 3, + "kind" : "PROCTIME" + } + }, + "serializableString" : "PROCTIME()" + } + } ], + "watermarkSpecs" : [ { + "rowtimeAttribute" : "rowtime", + "expression" : { + "rexNode" : { + "kind" : "INPUT_REF", + "inputIndex" : 2, + "type" : "TIMESTAMP(3)" + }, + 
"serializableString" : "`rowtime`" + } + } ] + }, + "partitionKeys" : [ ], + "options" : { + "connector" : "values" + } + } + } + }, + "outputType" : "ROW<`amount` INT, `currency` VARCHAR(2147483647), `rowtime` TIMESTAMP(3)>", + "description" : "TableSourceScan(table=[[default_catalog, default_database, Orders]], fields=[amount, currency, rowtime])", + "inputProperties" : [ ] + }, { + "id" : 2, + "type" : "stream-exec-watermark-assigner_1", + "watermarkExpr" : { + "kind" : "INPUT_REF", + "inputIndex" : 2, + "type" : "TIMESTAMP(3)" + }, + "rowtimeFieldIndex" : 2, + "inputProperties" : [ { + "requiredDistribution" : { + "type" : "UNKNOWN" + }, + "damBehavior" : "PIPELINED", + "priority" : 0 + } ], + "outputType" : { + "type" : "ROW", + "fields" : [ { + "name" : "amount", + "fieldType" : "INT" + }, { + "name" : "currency", + "fieldType" : "VARCHAR(2147483647)" + }, { + "name" : "rowtime", + "fieldType" : { + "type" : "TIMESTAMP_WITHOUT_TIME_ZONE", + "precision" : 3, + "kind" : "ROWTIME" + } + } ] + }, + "description" : "WatermarkAssigner(rowtime=[rowtime], watermark=[rowtime])" + }, { + "id" : 3, + "type" : "stream-exec-exchange_1", + "inputProperties" : [ { + "requiredDistribution" : { + "type" : "HASH", + "keys" : [ 1 ] + }, + "damBehavior" : "PIPELINED", + "priority" : 0 + } ], + "outputType" : { + "type" : "ROW", + "fields" : [ { + "name" : "amount", + "fieldType" : "INT" + }, { + "name" : "currency", + "fieldType" : "VARCHAR(2147483647)" + }, { + "name" : "rowtime", + "fieldType" : { + "type" : "TIMESTAMP_WITHOUT_TIME_ZONE", + "precision" : 3, + "kind" : "ROWTIME" + } + } ] + }, + "description" : "Exchange(distribution=[hash[currency]])" + }, { + "id" : 4, + "type" : "stream-exec-table-source-scan_1", + "scanTableSource" : { + "table" : { + "identifier" : "`default_catalog`.`default_database`.`RatesHistory`", + "resolvedTable" : { + "schema" : { + "columns" : [ { + "name" : "currency", + "dataType" : "VARCHAR(2147483647) NOT NULL" + }, { + "name" : "rate", + 
"dataType" : "INT" + }, { + "name" : "rowtime", + "dataType" : { + "type" : "TIMESTAMP_WITHOUT_TIME_ZONE", + "precision" : 3, + "kind" : "ROWTIME" + } + } ], + "watermarkSpecs" : [ { + "rowtimeAttribute" : "rowtime", + "expression" : { + "rexNode" : { + "kind" : "INPUT_REF", + "inputIndex" : 2, + "type" : "TIMESTAMP(3)" + }, + "serializableString" : "`rowtime`" + } + } ], + "primaryKey" : { + "name" : "PK_currency", + "type" : "PRIMARY_KEY", + "columns" : [ "currency" ] + } + }, + "partitionKeys" : [ ], + "options" : { + "connector" : "values" + } + } + } + }, + "outputType" : "ROW<`currency` VARCHAR(2147483647) NOT NULL, `rate` INT, `rowtime` TIMESTAMP(3)>", + "description" : "TableSourceScan(table=[[default_catalog, default_database, RatesHistory]], fields=[currency, rate, rowtime])", + "inputProperties" : [ ] + }, { + "id" : 5, + "type" : "stream-exec-watermark-assigner_1", + "watermarkExpr" : { + "kind" : "INPUT_REF", + "inputIndex" : 2, + "type" : "TIMESTAMP(3)" + }, + "rowtimeFieldIndex" : 2, + "inputProperties" : [ { + "requiredDistribution" : { + "type" : "UNKNOWN" + }, + "damBehavior" : "PIPELINED", + "priority" : 0 + } ], + "outputType" : { + "type" : "ROW", + "fields" : [ { + "name" : "currency", + "fieldType" : "VARCHAR(2147483647) NOT NULL" + }, { + "name" : "rate", + "fieldType" : "INT" + }, { + "name" : "rowtime", + "fieldType" : { + "type" : "TIMESTAMP_WITHOUT_TIME_ZONE", + "precision" : 3, + "kind" : "ROWTIME" + } + } ] + }, + "description" : "WatermarkAssigner(rowtime=[rowtime], watermark=[rowtime])" + }, { + "id" : 6, + "type" : "stream-exec-exchange_1", + "inputProperties" : [ { + "requiredDistribution" : { + "type" : "HASH", + "keys" : [ 0 ] + }, + "damBehavior" : "PIPELINED", + "priority" : 0 + } ], + "outputType" : { + "type" : "ROW", + "fields" : [ { + "name" : "currency", + "fieldType" : "VARCHAR(2147483647) NOT NULL" + }, { + "name" : "rate", + "fieldType" : "INT" + }, { + "name" : "rowtime", + "fieldType" : { + "type" : 
"TIMESTAMP_WITHOUT_TIME_ZONE", + "precision" : 3, + "kind" : "ROWTIME" + } + } ] + }, + "description" : "Exchange(distribution=[hash[currency]])" + }, { + "id" : 7, + "type" : "stream-exec-temporal-join_1", + "joinSpec" : { + "joinType" : "INNER", + "leftKeys" : [ 1 ], + "rightKeys" : [ 0 ], + "filterNulls" : [ true ], + "nonEquiCondition" : null + }, + "isTemporalFunctionJoin" : false, + "leftTimeAttributeIndex" : 2, + "rightTimeAttributeIndex" : 2, + "inputProperties" : [ { + "requiredDistribution" : { + "type" : "UNKNOWN" + }, + "damBehavior" : "PIPELINED", + "priority" : 0 + }, { + "requiredDistribution" : { + "type" : "UNKNOWN" + }, + "damBehavior" : "PIPELINED", + "priority" : 0 + } ], + "outputType" : { + "type" : "ROW", + "fields" : [ { + "name" : "amount", + "fieldType" : "INT" + }, { + "name" : "currency", + "fieldType" : "VARCHAR(2147483647)" + }, { + "name" : "rowtime", + "fieldType" : { + "type" : "TIMESTAMP_WITHOUT_TIME_ZONE", + "precision" : 3, + "kind" : "ROWTIME" + } + }, { + "name" : "currency0", + "fieldType" : "VARCHAR(2147483647) NOT NULL" + }, { + "name" : "rate", + "fieldType" : "INT" + }, { + "name" : "rowtime0", + "fieldType" : { + "type" : "TIMESTAMP_WITHOUT_TIME_ZONE", + "precision" : 3, + "kind" : "ROWTIME" + } + } ] + }, + "description" : "TemporalJoin(joinType=[InnerJoin], where=[((currency = currency0) AND __TEMPORAL_JOIN_CONDITION(rowtime, rowtime0, __TEMPORAL_JOIN_CONDITION_PRIMARY_KEY(currency0), __TEMPORAL_JOIN_LEFT_KEY(currency), __TEMPORAL_JOIN_RIGHT_KEY(currency0)))], select=[amount, currency, rowtime, currency0, rate, rowtime0])" + }, { + "id" : 8, + "type" : "stream-exec-calc_1", + "projection" : [ { + "kind" : "CALL", + "syntax" : "BINARY", + "internalName" : "$*$1", + "operands" : [ { + "kind" : "INPUT_REF", + "inputIndex" : 0, + "type" : "INT" + }, { + "kind" : "INPUT_REF", + "inputIndex" : 4, + "type" : "INT" + } ], + "type" : "INT" + } ], + "condition" : null, + "inputProperties" : [ { + "requiredDistribution" : { + 
"type" : "UNKNOWN" + }, + "damBehavior" : "PIPELINED", + "priority" : 0 + } ], + "outputType" : "ROW<`EXPR$0` INT>", + "description" : "Calc(select=[(amount * rate) AS EXPR$0])" + }, { + "id" : 9, + "type" : "stream-exec-sink_1", + "configuration" : { + "table.exec.sink.keyed-shuffle" : "AUTO", + "table.exec.sink.not-null-enforcer" : "ERROR", + "table.exec.sink.rowtime-inserter" : "ENABLED", + "table.exec.sink.type-length-enforcer" : "IGNORE", + "table.exec.sink.upsert-materialize" : "AUTO" + }, + "dynamicTableSink" : { + "table" : { + "identifier" : "`default_catalog`.`default_database`.`MySink`", + "resolvedTable" : { + "schema" : { + "columns" : [ { + "name" : "a", + "dataType" : "INT" + } ], + "watermarkSpecs" : [ ] + }, + "partitionKeys" : [ ], + "options" : { + "connector" : "values", + "table-sink-class" : "DEFAULT" + } + } + } + }, + "inputChangelogMode" : [ "INSERT" ], + "inputProperties" : [ { + "requiredDistribution" : { + "type" : "UNKNOWN" + }, + "damBehavior" : "PIPELINED", + "priority" : 0 + } ], + "outputType" : "ROW<`EXPR$0` INT>", + "description" : "Sink(table=[default_catalog.default_database.MySink], fields=[EXPR$0])" + } ], + "edges" : [ { + "source" : 1, + "target" : 2, + "shuffle" : { + "type" : "FORWARD" + }, + "shuffleMode" : "PIPELINED" + }, { + "source" : 2, + "target" : 3, + "shuffle" : { + "type" : "FORWARD" + }, + "shuffleMode" : "PIPELINED" + }, { + "source" : 4, + "target" : 5, + "shuffle" : { + "type" : "FORWARD" + }, + "shuffleMode" : "PIPELINED" + }, { + "source" : 5, + "target" : 6, + "shuffle" : { + "type" : "FORWARD" + }, + "shuffleMode" : "PIPELINED" + }, { + "source" : 3, + "target" : 7, + "shuffle" : { + "type" : "FORWARD" + }, + "shuffleMode" : "PIPELINED" + }, { + "source" : 6, + "target" : 7, + "shuffle" : { + "type" : "FORWARD" + }, + "shuffleMode" : "PIPELINED" + }, { + "source" : 7, + "target" : 8, + "shuffle" : { + "type" : "FORWARD" + }, + "shuffleMode" : "PIPELINED" + }, { + "source" : 8, + "target" : 9, + "shuffle" 
: { + "type" : "FORWARD" + }, + "shuffleMode" : "PIPELINED" + } ] +} diff --git a/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-temporal-join_1/temporal-join-table-join/plan/temporal-join-table-join.json b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-temporal-join_1/temporal-join-table-join/plan/temporal-join-table-join.json new file mode 100644 index 00000000000..741b5a53df3 --- /dev/null +++ b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-temporal-join_1/temporal-join-table-join/plan/temporal-join-table-join.json @@ -0,0 +1,494 @@ +{ + "flinkVersion" : "1.19", + "nodes" : [ { + "id" : 1, + "type" : "stream-exec-table-source-scan_1", + "scanTableSource" : { + "table" : { + "identifier" : "`default_catalog`.`default_database`.`Orders`", + "resolvedTable" : { + "schema" : { + "columns" : [ { + "name" : "amount", + "dataType" : "BIGINT" + }, { + "name" : "currency", + "dataType" : "VARCHAR(2147483647)" + }, { + "name" : "order_time", + "dataType" : "VARCHAR(2147483647)" + }, { + "name" : "rowtime", + "kind" : "COMPUTED", + "expression" : { + "rexNode" : { + "kind" : "CALL", + "internalName" : "$TO_TIMESTAMP$1", + "operands" : [ { + "kind" : "INPUT_REF", + "inputIndex" : 2, + "type" : "VARCHAR(2147483647)" + } ], + "type" : "TIMESTAMP(3)" + }, + "serializableString" : "TO_TIMESTAMP(`order_time`)" + } + } ], + "watermarkSpecs" : [ { + "rowtimeAttribute" : "rowtime", + "expression" : { + "rexNode" : { + "kind" : "INPUT_REF", + "inputIndex" : 3, + "type" : "TIMESTAMP(3)" + }, + "serializableString" : "`rowtime`" + } + } ] + }, + "partitionKeys" : [ ] + } + } + }, + "outputType" : "ROW<`amount` BIGINT, `currency` VARCHAR(2147483647), `order_time` VARCHAR(2147483647)>", + "description" : "TableSourceScan(table=[[default_catalog, default_database, Orders]], fields=[amount, currency, order_time])", + "inputProperties" : [ ] + }, { + "id" : 2, + "type" : "stream-exec-calc_1", + 
"projection" : [ { + "kind" : "INPUT_REF", + "inputIndex" : 0, + "type" : "BIGINT" + }, { + "kind" : "INPUT_REF", + "inputIndex" : 1, + "type" : "VARCHAR(2147483647)" + }, { + "kind" : "CALL", + "internalName" : "$TO_TIMESTAMP$1", + "operands" : [ { + "kind" : "INPUT_REF", + "inputIndex" : 2, + "type" : "VARCHAR(2147483647)" + } ], + "type" : "TIMESTAMP(3)" + } ], + "condition" : null, + "inputProperties" : [ { + "requiredDistribution" : { + "type" : "UNKNOWN" + }, + "damBehavior" : "PIPELINED", + "priority" : 0 + } ], + "outputType" : "ROW<`amount` BIGINT, `currency` VARCHAR(2147483647), `rowtime` TIMESTAMP(3)>", + "description" : "Calc(select=[amount, currency, TO_TIMESTAMP(order_time) AS rowtime])" + }, { + "id" : 3, + "type" : "stream-exec-watermark-assigner_1", + "watermarkExpr" : { + "kind" : "INPUT_REF", + "inputIndex" : 2, + "type" : "TIMESTAMP(3)" + }, + "rowtimeFieldIndex" : 2, + "inputProperties" : [ { + "requiredDistribution" : { + "type" : "UNKNOWN" + }, + "damBehavior" : "PIPELINED", + "priority" : 0 + } ], + "outputType" : { + "type" : "ROW", + "fields" : [ { + "name" : "amount", + "fieldType" : "BIGINT" + }, { + "name" : "currency", + "fieldType" : "VARCHAR(2147483647)" + }, { + "name" : "rowtime", + "fieldType" : { + "type" : "TIMESTAMP_WITHOUT_TIME_ZONE", + "precision" : 3, + "kind" : "ROWTIME" + } + } ] + }, + "description" : "WatermarkAssigner(rowtime=[rowtime], watermark=[rowtime])" + }, { + "id" : 4, + "type" : "stream-exec-exchange_1", + "inputProperties" : [ { + "requiredDistribution" : { + "type" : "HASH", + "keys" : [ 1 ] + }, + "damBehavior" : "PIPELINED", + "priority" : 0 + } ], + "outputType" : { + "type" : "ROW", + "fields" : [ { + "name" : "amount", + "fieldType" : "BIGINT" + }, { + "name" : "currency", + "fieldType" : "VARCHAR(2147483647)" + }, { + "name" : "rowtime", + "fieldType" : { + "type" : "TIMESTAMP_WITHOUT_TIME_ZONE", + "precision" : 3, + "kind" : "ROWTIME" + } + } ] + }, + "description" : 
"Exchange(distribution=[hash[currency]])" + }, { + "id" : 5, + "type" : "stream-exec-table-source-scan_1", + "scanTableSource" : { + "table" : { + "identifier" : "`default_catalog`.`default_database`.`RatesHistory`", + "resolvedTable" : { + "schema" : { + "columns" : [ { + "name" : "currency", + "dataType" : "VARCHAR(2147483647) NOT NULL" + }, { + "name" : "rate", + "dataType" : "BIGINT" + }, { + "name" : "rate_time", + "dataType" : "VARCHAR(2147483647)" + }, { + "name" : "rowtime", + "kind" : "COMPUTED", + "expression" : { + "rexNode" : { + "kind" : "CALL", + "internalName" : "$TO_TIMESTAMP$1", + "operands" : [ { + "kind" : "INPUT_REF", + "inputIndex" : 2, + "type" : "VARCHAR(2147483647)" + } ], + "type" : "TIMESTAMP(3)" + }, + "serializableString" : "TO_TIMESTAMP(`rate_time`)" + } + } ], + "watermarkSpecs" : [ { + "rowtimeAttribute" : "rowtime", + "expression" : { + "rexNode" : { + "kind" : "INPUT_REF", + "inputIndex" : 3, + "type" : "TIMESTAMP(3)" + }, + "serializableString" : "`rowtime`" + } + } ], + "primaryKey" : { + "name" : "PK_currency", + "type" : "PRIMARY_KEY", + "columns" : [ "currency" ] + } + }, + "partitionKeys" : [ ] + } + } + }, + "outputType" : "ROW<`currency` VARCHAR(2147483647) NOT NULL, `rate` BIGINT, `rate_time` VARCHAR(2147483647)>", + "description" : "TableSourceScan(table=[[default_catalog, default_database, RatesHistory]], fields=[currency, rate, rate_time])", + "inputProperties" : [ ] + }, { + "id" : 6, + "type" : "stream-exec-calc_1", + "projection" : [ { + "kind" : "INPUT_REF", + "inputIndex" : 0, + "type" : "VARCHAR(2147483647) NOT NULL" + }, { + "kind" : "INPUT_REF", + "inputIndex" : 1, + "type" : "BIGINT" + }, { + "kind" : "CALL", + "internalName" : "$TO_TIMESTAMP$1", + "operands" : [ { + "kind" : "INPUT_REF", + "inputIndex" : 2, + "type" : "VARCHAR(2147483647)" + } ], + "type" : "TIMESTAMP(3)" + } ], + "condition" : null, + "inputProperties" : [ { + "requiredDistribution" : { + "type" : "UNKNOWN" + }, + "damBehavior" : "PIPELINED", 
+ "priority" : 0 + } ], + "outputType" : "ROW<`currency` VARCHAR(2147483647) NOT NULL, `rate` BIGINT, `rowtime` TIMESTAMP(3)>", + "description" : "Calc(select=[currency, rate, TO_TIMESTAMP(rate_time) AS rowtime])" + }, { + "id" : 7, + "type" : "stream-exec-watermark-assigner_1", + "watermarkExpr" : { + "kind" : "INPUT_REF", + "inputIndex" : 2, + "type" : "TIMESTAMP(3)" + }, + "rowtimeFieldIndex" : 2, + "inputProperties" : [ { + "requiredDistribution" : { + "type" : "UNKNOWN" + }, + "damBehavior" : "PIPELINED", + "priority" : 0 + } ], + "outputType" : { + "type" : "ROW", + "fields" : [ { + "name" : "currency", + "fieldType" : "VARCHAR(2147483647) NOT NULL" + }, { + "name" : "rate", + "fieldType" : "BIGINT" + }, { + "name" : "rowtime", + "fieldType" : { + "type" : "TIMESTAMP_WITHOUT_TIME_ZONE", + "precision" : 3, + "kind" : "ROWTIME" + } + } ] + }, + "description" : "WatermarkAssigner(rowtime=[rowtime], watermark=[rowtime])" + }, { + "id" : 8, + "type" : "stream-exec-exchange_1", + "inputProperties" : [ { + "requiredDistribution" : { + "type" : "HASH", + "keys" : [ 0 ] + }, + "damBehavior" : "PIPELINED", + "priority" : 0 + } ], + "outputType" : { + "type" : "ROW", + "fields" : [ { + "name" : "currency", + "fieldType" : "VARCHAR(2147483647) NOT NULL" + }, { + "name" : "rate", + "fieldType" : "BIGINT" + }, { + "name" : "rowtime", + "fieldType" : { + "type" : "TIMESTAMP_WITHOUT_TIME_ZONE", + "precision" : 3, + "kind" : "ROWTIME" + } + } ] + }, + "description" : "Exchange(distribution=[hash[currency]])" + }, { + "id" : 9, + "type" : "stream-exec-temporal-join_1", + "joinSpec" : { + "joinType" : "INNER", + "leftKeys" : [ 1 ], + "rightKeys" : [ 0 ], + "filterNulls" : [ true ], + "nonEquiCondition" : null + }, + "isTemporalFunctionJoin" : false, + "leftTimeAttributeIndex" : 2, + "rightTimeAttributeIndex" : 2, + "inputProperties" : [ { + "requiredDistribution" : { + "type" : "UNKNOWN" + }, + "damBehavior" : "PIPELINED", + "priority" : 0 + }, { + "requiredDistribution" : { + 
"type" : "UNKNOWN" + }, + "damBehavior" : "PIPELINED", + "priority" : 0 + } ], + "outputType" : { + "type" : "ROW", + "fields" : [ { + "name" : "amount", + "fieldType" : "BIGINT" + }, { + "name" : "currency", + "fieldType" : "VARCHAR(2147483647)" + }, { + "name" : "rowtime", + "fieldType" : { + "type" : "TIMESTAMP_WITHOUT_TIME_ZONE", + "precision" : 3, + "kind" : "ROWTIME" + } + }, { + "name" : "currency0", + "fieldType" : "VARCHAR(2147483647) NOT NULL" + }, { + "name" : "rate", + "fieldType" : "BIGINT" + }, { + "name" : "rowtime0", + "fieldType" : { + "type" : "TIMESTAMP_WITHOUT_TIME_ZONE", + "precision" : 3, + "kind" : "ROWTIME" + } + } ] + }, + "description" : "TemporalJoin(joinType=[InnerJoin], where=[((currency = currency0) AND __TEMPORAL_JOIN_CONDITION(rowtime, rowtime0, __TEMPORAL_JOIN_CONDITION_PRIMARY_KEY(currency0), __TEMPORAL_JOIN_LEFT_KEY(currency), __TEMPORAL_JOIN_RIGHT_KEY(currency0)))], select=[amount, currency, rowtime, currency0, rate, rowtime0])" + }, { + "id" : 10, + "type" : "stream-exec-calc_1", + "projection" : [ { + "kind" : "CALL", + "syntax" : "BINARY", + "internalName" : "$*$1", + "operands" : [ { + "kind" : "INPUT_REF", + "inputIndex" : 0, + "type" : "BIGINT" + }, { + "kind" : "INPUT_REF", + "inputIndex" : 4, + "type" : "BIGINT" + } ], + "type" : "BIGINT" + } ], + "condition" : null, + "inputProperties" : [ { + "requiredDistribution" : { + "type" : "UNKNOWN" + }, + "damBehavior" : "PIPELINED", + "priority" : 0 + } ], + "outputType" : "ROW<`EXPR$0` BIGINT>", + "description" : "Calc(select=[(amount * rate) AS EXPR$0])" + }, { + "id" : 11, + "type" : "stream-exec-sink_1", + "configuration" : { + "table.exec.sink.keyed-shuffle" : "AUTO", + "table.exec.sink.not-null-enforcer" : "ERROR", + "table.exec.sink.rowtime-inserter" : "ENABLED", + "table.exec.sink.type-length-enforcer" : "IGNORE", + "table.exec.sink.upsert-materialize" : "AUTO" + }, + "dynamicTableSink" : { + "table" : { + "identifier" : "`default_catalog`.`default_database`.`MySink`", 
+ "resolvedTable" : { + "schema" : { + "columns" : [ { + "name" : "amount", + "dataType" : "BIGINT" + } ], + "watermarkSpecs" : [ ] + }, + "partitionKeys" : [ ] + } + } + }, + "inputChangelogMode" : [ "INSERT" ], + "inputProperties" : [ { + "requiredDistribution" : { + "type" : "UNKNOWN" + }, + "damBehavior" : "PIPELINED", + "priority" : 0 + } ], + "outputType" : "ROW<`EXPR$0` BIGINT>", + "description" : "Sink(table=[default_catalog.default_database.MySink], fields=[EXPR$0])" + } ], + "edges" : [ { + "source" : 1, + "target" : 2, + "shuffle" : { + "type" : "FORWARD" + }, + "shuffleMode" : "PIPELINED" + }, { + "source" : 2, + "target" : 3, + "shuffle" : { + "type" : "FORWARD" + }, + "shuffleMode" : "PIPELINED" + }, { + "source" : 3, + "target" : 4, + "shuffle" : { + "type" : "FORWARD" + }, + "shuffleMode" : "PIPELINED" + }, { + "source" : 5, + "target" : 6, + "shuffle" : { + "type" : "FORWARD" + }, + "shuffleMode" : "PIPELINED" + }, { + "source" : 6, + "target" : 7, + "shuffle" : { + "type" : "FORWARD" + }, + "shuffleMode" : "PIPELINED" + }, { + "source" : 7, + "target" : 8, + "shuffle" : { + "type" : "FORWARD" + }, + "shuffleMode" : "PIPELINED" + }, { + "source" : 4, + "target" : 9, + "shuffle" : { + "type" : "FORWARD" + }, + "shuffleMode" : "PIPELINED" + }, { + "source" : 8, + "target" : 9, + "shuffle" : { + "type" : "FORWARD" + }, + "shuffleMode" : "PIPELINED" + }, { + "source" : 9, + "target" : 10, + "shuffle" : { + "type" : "FORWARD" + }, + "shuffleMode" : "PIPELINED" + }, { + "source" : 10, + "target" : 11, + "shuffle" : { + "type" : "FORWARD" + }, + "shuffleMode" : "PIPELINED" + } ] +} \ No newline at end of file diff --git a/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-temporal-join_1/temporal-join-table-join/savepoint/_metadata b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-temporal-join_1/temporal-join-table-join/savepoint/_metadata new file mode 100644 index 00000000000..4b11a62993e Binary 
files /dev/null and b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-temporal-join_1/temporal-join-table-join/savepoint/_metadata differ diff --git a/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-temporal-join_1/temporal-join-temporal-function/plan/temporal-join-temporal-function.json b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-temporal-join_1/temporal-join-temporal-function/plan/temporal-join-temporal-function.json new file mode 100644 index 00000000000..c6db00d4ef2 --- /dev/null +++ b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-temporal-join_1/temporal-join-temporal-function/plan/temporal-join-temporal-function.json @@ -0,0 +1,494 @@ +{ + "flinkVersion" : "1.19", + "nodes" : [ { + "id" : 12, + "type" : "stream-exec-table-source-scan_1", + "scanTableSource" : { + "table" : { + "identifier" : "`default_catalog`.`default_database`.`Orders`", + "resolvedTable" : { + "schema" : { + "columns" : [ { + "name" : "amount", + "dataType" : "BIGINT" + }, { + "name" : "currency", + "dataType" : "VARCHAR(2147483647)" + }, { + "name" : "order_time", + "dataType" : "VARCHAR(2147483647)" + }, { + "name" : "rowtime", + "kind" : "COMPUTED", + "expression" : { + "rexNode" : { + "kind" : "CALL", + "internalName" : "$TO_TIMESTAMP$1", + "operands" : [ { + "kind" : "INPUT_REF", + "inputIndex" : 2, + "type" : "VARCHAR(2147483647)" + } ], + "type" : "TIMESTAMP(3)" + }, + "serializableString" : "TO_TIMESTAMP(`order_time`)" + } + } ], + "watermarkSpecs" : [ { + "rowtimeAttribute" : "rowtime", + "expression" : { + "rexNode" : { + "kind" : "INPUT_REF", + "inputIndex" : 3, + "type" : "TIMESTAMP(3)" + }, + "serializableString" : "`rowtime`" + } + } ] + }, + "partitionKeys" : [ ] + } + } + }, + "outputType" : "ROW<`amount` BIGINT, `currency` VARCHAR(2147483647), `order_time` VARCHAR(2147483647)>", + "description" : "TableSourceScan(table=[[default_catalog, 
default_database, Orders]], fields=[amount, currency, order_time])", + "inputProperties" : [ ] + }, { + "id" : 13, + "type" : "stream-exec-calc_1", + "projection" : [ { + "kind" : "INPUT_REF", + "inputIndex" : 0, + "type" : "BIGINT" + }, { + "kind" : "INPUT_REF", + "inputIndex" : 1, + "type" : "VARCHAR(2147483647)" + }, { + "kind" : "CALL", + "internalName" : "$TO_TIMESTAMP$1", + "operands" : [ { + "kind" : "INPUT_REF", + "inputIndex" : 2, + "type" : "VARCHAR(2147483647)" + } ], + "type" : "TIMESTAMP(3)" + } ], + "condition" : null, + "inputProperties" : [ { + "requiredDistribution" : { + "type" : "UNKNOWN" + }, + "damBehavior" : "PIPELINED", + "priority" : 0 + } ], + "outputType" : "ROW<`amount` BIGINT, `currency` VARCHAR(2147483647), `rowtime` TIMESTAMP(3)>", + "description" : "Calc(select=[amount, currency, TO_TIMESTAMP(order_time) AS rowtime])" + }, { + "id" : 14, + "type" : "stream-exec-watermark-assigner_1", + "watermarkExpr" : { + "kind" : "INPUT_REF", + "inputIndex" : 2, + "type" : "TIMESTAMP(3)" + }, + "rowtimeFieldIndex" : 2, + "inputProperties" : [ { + "requiredDistribution" : { + "type" : "UNKNOWN" + }, + "damBehavior" : "PIPELINED", + "priority" : 0 + } ], + "outputType" : { + "type" : "ROW", + "fields" : [ { + "name" : "amount", + "fieldType" : "BIGINT" + }, { + "name" : "currency", + "fieldType" : "VARCHAR(2147483647)" + }, { + "name" : "rowtime", + "fieldType" : { + "type" : "TIMESTAMP_WITHOUT_TIME_ZONE", + "precision" : 3, + "kind" : "ROWTIME" + } + } ] + }, + "description" : "WatermarkAssigner(rowtime=[rowtime], watermark=[rowtime])" + }, { + "id" : 15, + "type" : "stream-exec-exchange_1", + "inputProperties" : [ { + "requiredDistribution" : { + "type" : "HASH", + "keys" : [ 1 ] + }, + "damBehavior" : "PIPELINED", + "priority" : 0 + } ], + "outputType" : { + "type" : "ROW", + "fields" : [ { + "name" : "amount", + "fieldType" : "BIGINT" + }, { + "name" : "currency", + "fieldType" : "VARCHAR(2147483647)" + }, { + "name" : "rowtime", + "fieldType" : 
{ + "type" : "TIMESTAMP_WITHOUT_TIME_ZONE", + "precision" : 3, + "kind" : "ROWTIME" + } + } ] + }, + "description" : "Exchange(distribution=[hash[currency]])" + }, { + "id" : 16, + "type" : "stream-exec-table-source-scan_1", + "scanTableSource" : { + "table" : { + "identifier" : "`default_catalog`.`default_database`.`RatesHistory`", + "resolvedTable" : { + "schema" : { + "columns" : [ { + "name" : "currency", + "dataType" : "VARCHAR(2147483647) NOT NULL" + }, { + "name" : "rate", + "dataType" : "BIGINT" + }, { + "name" : "rate_time", + "dataType" : "VARCHAR(2147483647)" + }, { + "name" : "rowtime", + "kind" : "COMPUTED", + "expression" : { + "rexNode" : { + "kind" : "CALL", + "internalName" : "$TO_TIMESTAMP$1", + "operands" : [ { + "kind" : "INPUT_REF", + "inputIndex" : 2, + "type" : "VARCHAR(2147483647)" + } ], + "type" : "TIMESTAMP(3)" + }, + "serializableString" : "TO_TIMESTAMP(`rate_time`)" + } + } ], + "watermarkSpecs" : [ { + "rowtimeAttribute" : "rowtime", + "expression" : { + "rexNode" : { + "kind" : "INPUT_REF", + "inputIndex" : 3, + "type" : "TIMESTAMP(3)" + }, + "serializableString" : "`rowtime`" + } + } ], + "primaryKey" : { + "name" : "PK_currency", + "type" : "PRIMARY_KEY", + "columns" : [ "currency" ] + } + }, + "partitionKeys" : [ ] + } + } + }, + "outputType" : "ROW<`currency` VARCHAR(2147483647) NOT NULL, `rate` BIGINT, `rate_time` VARCHAR(2147483647)>", + "description" : "TableSourceScan(table=[[default_catalog, default_database, RatesHistory]], fields=[currency, rate, rate_time])", + "inputProperties" : [ ] + }, { + "id" : 17, + "type" : "stream-exec-calc_1", + "projection" : [ { + "kind" : "INPUT_REF", + "inputIndex" : 0, + "type" : "VARCHAR(2147483647) NOT NULL" + }, { + "kind" : "INPUT_REF", + "inputIndex" : 1, + "type" : "BIGINT" + }, { + "kind" : "CALL", + "internalName" : "$TO_TIMESTAMP$1", + "operands" : [ { + "kind" : "INPUT_REF", + "inputIndex" : 2, + "type" : "VARCHAR(2147483647)" + } ], + "type" : "TIMESTAMP(3)" + } ], + "condition" : 
null, + "inputProperties" : [ { + "requiredDistribution" : { + "type" : "UNKNOWN" + }, + "damBehavior" : "PIPELINED", + "priority" : 0 + } ], + "outputType" : "ROW<`currency` VARCHAR(2147483647) NOT NULL, `rate` BIGINT, `rowtime` TIMESTAMP(3)>", + "description" : "Calc(select=[currency, rate, TO_TIMESTAMP(rate_time) AS rowtime])" + }, { + "id" : 18, + "type" : "stream-exec-watermark-assigner_1", + "watermarkExpr" : { + "kind" : "INPUT_REF", + "inputIndex" : 2, + "type" : "TIMESTAMP(3)" + }, + "rowtimeFieldIndex" : 2, + "inputProperties" : [ { + "requiredDistribution" : { + "type" : "UNKNOWN" + }, + "damBehavior" : "PIPELINED", + "priority" : 0 + } ], + "outputType" : { + "type" : "ROW", + "fields" : [ { + "name" : "currency", + "fieldType" : "VARCHAR(2147483647) NOT NULL" + }, { + "name" : "rate", + "fieldType" : "BIGINT" + }, { + "name" : "rowtime", + "fieldType" : { + "type" : "TIMESTAMP_WITHOUT_TIME_ZONE", + "precision" : 3, + "kind" : "ROWTIME" + } + } ] + }, + "description" : "WatermarkAssigner(rowtime=[rowtime], watermark=[rowtime])" + }, { + "id" : 19, + "type" : "stream-exec-exchange_1", + "inputProperties" : [ { + "requiredDistribution" : { + "type" : "HASH", + "keys" : [ 0 ] + }, + "damBehavior" : "PIPELINED", + "priority" : 0 + } ], + "outputType" : { + "type" : "ROW", + "fields" : [ { + "name" : "currency", + "fieldType" : "VARCHAR(2147483647) NOT NULL" + }, { + "name" : "rate", + "fieldType" : "BIGINT" + }, { + "name" : "rowtime", + "fieldType" : { + "type" : "TIMESTAMP_WITHOUT_TIME_ZONE", + "precision" : 3, + "kind" : "ROWTIME" + } + } ] + }, + "description" : "Exchange(distribution=[hash[currency]])" + }, { + "id" : 20, + "type" : "stream-exec-temporal-join_1", + "joinSpec" : { + "joinType" : "INNER", + "leftKeys" : [ 1 ], + "rightKeys" : [ 0 ], + "filterNulls" : [ true ], + "nonEquiCondition" : null + }, + "isTemporalFunctionJoin" : true, + "leftTimeAttributeIndex" : 2, + "rightTimeAttributeIndex" : 2, + "inputProperties" : [ { + 
"requiredDistribution" : { + "type" : "UNKNOWN" + }, + "damBehavior" : "PIPELINED", + "priority" : 0 + }, { + "requiredDistribution" : { + "type" : "UNKNOWN" + }, + "damBehavior" : "PIPELINED", + "priority" : 0 + } ], + "outputType" : { + "type" : "ROW", + "fields" : [ { + "name" : "amount", + "fieldType" : "BIGINT" + }, { + "name" : "currency", + "fieldType" : "VARCHAR(2147483647)" + }, { + "name" : "rowtime", + "fieldType" : { + "type" : "TIMESTAMP_WITHOUT_TIME_ZONE", + "precision" : 3, + "kind" : "ROWTIME" + } + }, { + "name" : "currency0", + "fieldType" : "VARCHAR(2147483647) NOT NULL" + }, { + "name" : "rate", + "fieldType" : "BIGINT" + }, { + "name" : "rowtime0", + "fieldType" : { + "type" : "TIMESTAMP_WITHOUT_TIME_ZONE", + "precision" : 3, + "kind" : "ROWTIME" + } + } ] + }, + "description" : "TemporalJoin(joinType=[InnerJoin], where=[(__TEMPORAL_JOIN_CONDITION(rowtime, rowtime0, __TEMPORAL_JOIN_CONDITION_PRIMARY_KEY(currency0)) AND (currency = currency0))], select=[amount, currency, rowtime, currency0, rate, rowtime0])" + }, { + "id" : 21, + "type" : "stream-exec-calc_1", + "projection" : [ { + "kind" : "CALL", + "syntax" : "BINARY", + "internalName" : "$*$1", + "operands" : [ { + "kind" : "INPUT_REF", + "inputIndex" : 0, + "type" : "BIGINT" + }, { + "kind" : "INPUT_REF", + "inputIndex" : 4, + "type" : "BIGINT" + } ], + "type" : "BIGINT" + } ], + "condition" : null, + "inputProperties" : [ { + "requiredDistribution" : { + "type" : "UNKNOWN" + }, + "damBehavior" : "PIPELINED", + "priority" : 0 + } ], + "outputType" : "ROW<`EXPR$0` BIGINT>", + "description" : "Calc(select=[(amount * rate) AS EXPR$0])" + }, { + "id" : 22, + "type" : "stream-exec-sink_1", + "configuration" : { + "table.exec.sink.keyed-shuffle" : "AUTO", + "table.exec.sink.not-null-enforcer" : "ERROR", + "table.exec.sink.rowtime-inserter" : "ENABLED", + "table.exec.sink.type-length-enforcer" : "IGNORE", + "table.exec.sink.upsert-materialize" : "AUTO" + }, + "dynamicTableSink" : { + "table" : { + 
"identifier" : "`default_catalog`.`default_database`.`MySink`", + "resolvedTable" : { + "schema" : { + "columns" : [ { + "name" : "amount", + "dataType" : "BIGINT" + } ], + "watermarkSpecs" : [ ] + }, + "partitionKeys" : [ ] + } + } + }, + "inputChangelogMode" : [ "INSERT" ], + "inputProperties" : [ { + "requiredDistribution" : { + "type" : "UNKNOWN" + }, + "damBehavior" : "PIPELINED", + "priority" : 0 + } ], + "outputType" : "ROW<`EXPR$0` BIGINT>", + "description" : "Sink(table=[default_catalog.default_database.MySink], fields=[EXPR$0])" + } ], + "edges" : [ { + "source" : 12, + "target" : 13, + "shuffle" : { + "type" : "FORWARD" + }, + "shuffleMode" : "PIPELINED" + }, { + "source" : 13, + "target" : 14, + "shuffle" : { + "type" : "FORWARD" + }, + "shuffleMode" : "PIPELINED" + }, { + "source" : 14, + "target" : 15, + "shuffle" : { + "type" : "FORWARD" + }, + "shuffleMode" : "PIPELINED" + }, { + "source" : 16, + "target" : 17, + "shuffle" : { + "type" : "FORWARD" + }, + "shuffleMode" : "PIPELINED" + }, { + "source" : 17, + "target" : 18, + "shuffle" : { + "type" : "FORWARD" + }, + "shuffleMode" : "PIPELINED" + }, { + "source" : 18, + "target" : 19, + "shuffle" : { + "type" : "FORWARD" + }, + "shuffleMode" : "PIPELINED" + }, { + "source" : 15, + "target" : 20, + "shuffle" : { + "type" : "FORWARD" + }, + "shuffleMode" : "PIPELINED" + }, { + "source" : 19, + "target" : 20, + "shuffle" : { + "type" : "FORWARD" + }, + "shuffleMode" : "PIPELINED" + }, { + "source" : 20, + "target" : 21, + "shuffle" : { + "type" : "FORWARD" + }, + "shuffleMode" : "PIPELINED" + }, { + "source" : 21, + "target" : 22, + "shuffle" : { + "type" : "FORWARD" + }, + "shuffleMode" : "PIPELINED" + } ] +} \ No newline at end of file diff --git a/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-temporal-join_1/temporal-join-temporal-function/savepoint/_metadata 
b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-temporal-join_1/temporal-join-temporal-function/savepoint/_metadata new file mode 100644 index 00000000000..e3307ab0c9e Binary files /dev/null and b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-temporal-join_1/temporal-join-temporal-function/savepoint/_metadata differ