This is an automated email from the ASF dual-hosted git repository. dwysakowicz pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push: new 01cdc703ee6 [FLINK-24239] Event time temporal join should support values from array, map, row, etc. as join key (#24253) 01cdc703ee6 is described below commit 01cdc703ee6fa56bdfdf799d016c0e882e9e5d99 Author: Dawid Wysakowicz <dwysakow...@apache.org> AuthorDate: Thu Feb 8 15:26:16 2024 +0100 [FLINK-24239] Event time temporal join should support values from array, map, row, etc. as join key (#24253) --- ...gicalCorrelateToJoinFromTemporalTableRule.scala | 151 +++--- .../nodes/exec/stream/TemporalJoinRestoreTest.java | 2 + .../exec/stream/TemporalJoinTestPrograms.java | 82 +++ .../temporal-join-table-join-key-from-map.json | 569 +++++++++++++++++++ .../savepoint/_metadata | Bin 0 -> 14977 bytes .../plan/temporal-join-table-join-nested-key.json | 600 +++++++++++++++++++++ .../savepoint/_metadata | Bin 0 -> 14973 bytes 7 files changed, 1336 insertions(+), 68 deletions(-) diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/logical/LogicalCorrelateToJoinFromTemporalTableRule.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/logical/LogicalCorrelateToJoinFromTemporalTableRule.scala index 217b9597561..25f1d29d8ea 100644 --- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/logical/LogicalCorrelateToJoinFromTemporalTableRule.scala +++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/logical/LogicalCorrelateToJoinFromTemporalTableRule.scala @@ -26,11 +26,12 @@ import org.apache.flink.table.planner.plan.schema.{LegacyTableSourceTable, Table import org.apache.flink.table.planner.plan.utils.TemporalJoinUtil import org.apache.flink.table.sources.LookupableTableSource -import org.apache.calcite.plan.{RelOptRule, RelOptRuleCall, RelOptRuleOperand} +import org.apache.calcite.plan.{RelOptRule, RelOptRuleCall, RelOptRuleOperand, RelOptUtil} import org.apache.calcite.plan.RelOptRule.{any, operand} import org.apache.calcite.plan.hep.{HepPlanner, HepRelVertex} +import org.apache.calcite.rel.`type`.RelDataType import org.apache.calcite.rel.RelNode -import org.apache.calcite.rel.core.TableScan +import org.apache.calcite.rel.core.{CorrelationId, TableScan} import org.apache.calcite.rel.logical._ import org.apache.calcite.rex._ @@ -141,6 +142,30 @@ abstract class LogicalCorrelateToJoinFromTemporalTableRule( } case _ => false } + + protected def decorrelate( + rexNode: RexNode, + leftRowType: RelDataType, + correlationId: CorrelationId): RexNode = { + rexNode.accept(new RexShuttle() { + // change correlate variable expression to normal RexInputRef (which is from left side) + override def visitFieldAccess(fieldAccess: RexFieldAccess): RexNode = { + fieldAccess.getReferenceExpr match { + case corVar: RexCorrelVariable => + require(correlationId.equals(corVar.id)) + val index = leftRowType.getFieldList.indexOf(fieldAccess.getField) + RexInputRef.of(index, leftRowType) + case _ => super.visitFieldAccess(fieldAccess) + } + } + + // update the field index from right side + override def visitInputRef(inputRef: RexInputRef): RexNode = { + val rightIndex = leftRowType.getFieldCount + inputRef.getIndex + new RexInputRef(rightIndex, inputRef.getType) + } + }) + } } /** @@ -161,24 +186,7 @@ abstract class LogicalCorrelateToJoinFromLookupTemporalTableRule( validateSnapshotInCorrelate(snapshot, correlate) val leftRowType = leftInput.getRowType - val joinCondition = filterCondition.accept(new RexShuttle() { - // change correlate variable expression to normal RexInputRef (which is from left side) - override def visitFieldAccess(fieldAccess: RexFieldAccess): RexNode = { - fieldAccess.getReferenceExpr match { - case corVar: RexCorrelVariable => - require(correlate.getCorrelationId.equals(corVar.id)) - val index = leftRowType.getFieldList.indexOf(fieldAccess.getField) - RexInputRef.of(index, leftRowType) - case _ => super.visitFieldAccess(fieldAccess) - } - } - - // update the field index from right side - override def visitInputRef(inputRef: RexInputRef): RexNode = { - val rightIndex = leftRowType.getFieldCount + inputRef.getIndex - new RexInputRef(rightIndex, inputRef.getType) - } - }) + val joinCondition = decorrelate(filterCondition, leftRowType, correlate.getCorrelationId) val builder = call.builder() builder.push(leftInput) @@ -198,8 +206,8 @@ abstract class LogicalCorrelateToJoinFromGeneralTemporalTableRule( protected def extractRightEventTimeInputRef( leftInput: RelNode, - snapshot: LogicalSnapshot): Option[RexNode] = { - val rightFields = snapshot.getRowType.getFieldList.asScala + rightInput: RelNode): Option[RexNode] = { + val rightFields = rightInput.getRowType.getFieldList.asScala val timeAttributeFields = rightFields.filter( f => f.getType.isInstanceOf[TimeIndicatorRelDataType] && @@ -209,7 +217,7 @@ abstract class LogicalCorrelateToJoinFromGeneralTemporalTableRule( val timeColIndex = leftInput.getRowType.getFieldCount + rightFields.indexOf(timeAttributeFields.get(0)) val timeColDataType = timeAttributeFields.get(0).getType - val rexBuilder = snapshot.getCluster.getRexBuilder + val rexBuilder = rightInput.getCluster.getRexBuilder Some(rexBuilder.makeInputRef(timeColDataType, timeColIndex)) } else { None @@ -237,57 +245,32 @@ abstract class LogicalCorrelateToJoinFromGeneralTemporalTableRule( val snapshot = getLogicalSnapshot(call) val leftRowType = leftInput.getRowType - val joinCondition = filterCondition.accept(new RexShuttle() { - // change correlate variable expression to normal RexInputRef (which is from left side) - override def visitFieldAccess(fieldAccess: RexFieldAccess): RexNode = { - fieldAccess.getReferenceExpr match { - case corVar: RexCorrelVariable => - require(correlate.getCorrelationId.equals(corVar.id)) - val index = leftRowType.getFieldList.indexOf(fieldAccess.getField) - RexInputRef.of(index, leftRowType) - case _ => super.visitFieldAccess(fieldAccess) - } - } - - // update the field index from right side - override def visitInputRef(inputRef: RexInputRef): RexNode = { - val rightIndex = leftRowType.getFieldCount + inputRef.getIndex - new RexInputRef(rightIndex, inputRef.getType) - } - }) + val joinCondition = decorrelate(filterCondition, leftRowType, correlate.getCorrelationId) validateSnapshotInCorrelate(snapshot, correlate) val rexBuilder = correlate.getCluster.getRexBuilder - val (leftJoinKey, rightJoinKey) = { - val relBuilder = call.builder() - relBuilder.push(leftInput) - relBuilder.push(snapshot) - val rewriteJoin = relBuilder.join(correlate.getJoinType, joinCondition).build() - val joinInfo = rewriteJoin.asInstanceOf[LogicalJoin].analyzeCondition() - val leftJoinKey = joinInfo.leftKeys.map(i => rexBuilder.makeInputRef(leftInput, i)) - val leftFieldCnt = leftInput.getRowType.getFieldCount - val rightJoinKey = joinInfo.rightKeys.map( - i => { - val leftKeyType = snapshot.getRowType.getFieldList.get(i).getType - rexBuilder.makeInputRef(leftKeyType, leftFieldCnt + i) - }) - if (leftJoinKey.length == 0 || rightJoinKey.length == 0) { - throw new ValidationException( - "Currently the join key in Temporal Table Join " + - "can not be empty.") - } - (leftJoinKey, rightJoinKey) + val relBuilder = call.builder() + relBuilder.push(leftInput) + relBuilder.push(snapshot) + val nonPushedJoin = + relBuilder.join(correlate.getJoinType, joinCondition).build().asInstanceOf[LogicalJoin] + val rewriteJoin = RelOptUtil.pushDownJoinConditions(nonPushedJoin, relBuilder) + val actualJoin = rewriteJoin match { + case _: LogicalJoin => rewriteJoin.asInstanceOf[LogicalJoin] + case _ => rewriteJoin.getInput(0).asInstanceOf[LogicalJoin] } - val snapshotTimeInputRef = extractSnapshotTimeInputRef(leftInput, snapshot) + val (leftJoinKey, rightJoinKey) = extractJoinKeys(actualJoin) + + val snapshotTimeInputRef = extractSnapshotTimeInputRef(actualJoin.getLeft, snapshot) .getOrElse( throw new ValidationException( "Temporal Table Join requires time " + "attribute in the left table, but no time attribute found.")) val temporalCondition = if (isRowTimeTemporalTableJoin(snapshot)) { - val rightTimeInputRef = extractRightEventTimeInputRef(leftInput, snapshot) + val rightTimeInputRef = extractRightEventTimeInputRef(actualJoin.getLeft, actualJoin.getRight) if (rightTimeInputRef.isEmpty || !isRowtimeIndicatorType(rightTimeInputRef.get.getType)) { throw new ValidationException( "Event-Time Temporal Table Join requires both" + @@ -323,15 +306,47 @@ abstract class LogicalCorrelateToJoinFromGeneralTemporalTableRule( } val builder = call.builder() - val condition = builder.and(joinCondition, temporalCondition) - - builder.push(leftInput) - builder.push(snapshot) - builder.join(correlate.getJoinType, condition) - val temporalJoin = builder.build() + val condition = builder.and(actualJoin.getCondition, temporalCondition) + + val joinWithTemporalCondition = actualJoin.copy( + actualJoin.getTraitSet, + condition, + actualJoin.getLeft, + actualJoin.getRight, + actualJoin.getJoinType, + actualJoin.isSemiJoinDone) + + val temporalJoin = if (actualJoin != rewriteJoin) { + rewriteJoin.replaceInput(0, joinWithTemporalCondition) + rewriteJoin + } else { + joinWithTemporalCondition + } call.transformTo(temporalJoin) } + private def extractJoinKeys(actualJoin: LogicalJoin): (Seq[RexNode], Seq[RexNode]) = { + + val joinInfo = actualJoin.analyzeCondition() + val leftInput = actualJoin.getInput(0) + val rightInput = actualJoin.getInput(1) + val rexBuilder = actualJoin.getCluster.getRexBuilder + + val leftJoinKey = joinInfo.leftKeys.map(i => rexBuilder.makeInputRef(leftInput, i)) + val leftFieldCnt = leftInput.getRowType.getFieldCount + val rightJoinKey = joinInfo.rightKeys.map( + i => { + val rightKeyType = rightInput.getRowType.getFieldList.get(i).getType + rexBuilder.makeInputRef(rightKeyType, leftFieldCnt + i) + }) + if (leftJoinKey.isEmpty || rightJoinKey.isEmpty) { + throw new ValidationException( + "Currently the join key in Temporal Table Join " + + "can not be empty.") + } + (leftJoinKey, rightJoinKey) + } + private def isRowTimeTemporalTableJoin(snapshot: LogicalSnapshot): Boolean = snapshot.getPeriod.getType match { case t: TimeIndicatorRelDataType if t.isEventTime => true diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/TemporalJoinRestoreTest.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/TemporalJoinRestoreTest.java index 44f48fc24c3..a8584ac951f 100644 --- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/TemporalJoinRestoreTest.java +++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/TemporalJoinRestoreTest.java @@ -35,6 +35,8 @@ public class TemporalJoinRestoreTest extends RestoreTestBase { public List<TableTestProgram> programs() { return Arrays.asList( TemporalJoinTestPrograms.TEMPORAL_JOIN_TABLE_JOIN, + TemporalJoinTestPrograms.TEMPORAL_JOIN_TABLE_JOIN_NESTED_KEY, + TemporalJoinTestPrograms.TEMPORAL_JOIN_TABLE_JOIN_KEY_FROM_MAP, TemporalJoinTestPrograms.TEMPORAL_JOIN_TEMPORAL_FUNCTION); } } diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/TemporalJoinTestPrograms.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/TemporalJoinTestPrograms.java index 883d628fbd9..ed83169c74a 100644 --- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/TemporalJoinTestPrograms.java +++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/TemporalJoinTestPrograms.java @@ -23,6 +23,9 @@ import org.apache.flink.table.test.program.SourceTestStep; import org.apache.flink.table.test.program.TableTestProgram; import org.apache.flink.types.Row; +import java.util.HashMap; +import java.util.Map; + import static org.apache.flink.table.api.Expressions.$; /** {@link TableTestProgram} definitions for testing {@link StreamExecTemporalJoin}. */ @@ -45,6 +48,49 @@ public class TemporalJoinTestPrograms { Row.of(1L, "USD", "2020-10-10 00:00:58")) .build(); + static final SourceTestStep ORDERS_WITH_NESTED_ID = + SourceTestStep.newBuilder("OrdersNestedId") + .addSchema( + "amount bigint", + "nested_row ROW<currency STRING>", + "nested_map MAP<STRING NOT NULL, STRING>", + "order_time STRING", + "rowtime as TO_TIMESTAMP(order_time) ", + "WATERMARK FOR rowtime AS rowtime") + .producedBeforeRestore( + Row.of( + 2L, + Row.of("Euro"), + mapOf("currency", "Euro"), + "2020-10-10 00:00:42"), + Row.of( + 1L, + Row.of("usd"), + mapOf("currency", "USD"), + "2020-10-10 00:00:43"), + Row.of( + 50L, + Row.of("Yen"), + mapOf("currency", "Yen"), + "2020-10-10 00:00:44"), + Row.of( + 3L, + Row.of("Euro"), + mapOf("currency", "Euro"), + "2020-10-10 00:00:45")) + .producedAfterRestore( + Row.of( + 1L, + Row.of("Euro"), + mapOf("currency", "Euro"), + "2020-10-10 00:00:58"), + Row.of( + 1L, + Row.of("usd"), + mapOf("currency", "USD"), + "2020-10-10 00:00:58")) + .build(); + static final SourceTestStep RATES = SourceTestStep.newBuilder("RatesHistory") .addSchema( @@ -84,6 +130,36 @@ public class TemporalJoinTestPrograms { + "ON o.currency = r.currency ") .build(); + static final TableTestProgram TEMPORAL_JOIN_TABLE_JOIN_NESTED_KEY = + TableTestProgram.of( + "temporal-join-table-join-nested-key", + "validates temporal join with a table when the join keys comes from a nested row") + .setupTableSource(ORDERS_WITH_NESTED_ID) + .setupTableSource(RATES) + .setupTableSink(AMOUNTS) + .runSql( + "INSERT INTO MySink " + + "SELECT amount * r.rate " + + "FROM OrdersNestedId AS o " + + "JOIN RatesHistory FOR SYSTEM_TIME AS OF o.rowtime AS r " + + "ON (case when o.nested_row.currency = 'usd' then upper(o.nested_row.currency) ELSE o.nested_row.currency END) = r.currency ") + .build(); + + static final TableTestProgram TEMPORAL_JOIN_TABLE_JOIN_KEY_FROM_MAP = + TableTestProgram.of( + "temporal-join-table-join-key-from-map", + "validates temporal join with a table when the join key comes from a map value") + .setupTableSource(ORDERS_WITH_NESTED_ID) + .setupTableSource(RATES) + .setupTableSink(AMOUNTS) + .runSql( + "INSERT INTO MySink " + + "SELECT amount * r.rate " + + "FROM OrdersNestedId AS o " + + "JOIN RatesHistory FOR SYSTEM_TIME AS OF o.rowtime AS r " + + "ON o.nested_map['currency'] = r.currency ") + .build(); + static final TableTestProgram TEMPORAL_JOIN_TEMPORAL_FUNCTION = TableTestProgram.of( "temporal-join-temporal-function", @@ -100,4 +176,10 @@ public class TemporalJoinTestPrograms { + "LATERAL TABLE (Rates(o.rowtime)) AS r " + "WHERE o.currency = r.currency ") .build(); + + private static Map<String, String> mapOf(String key, String value) { + final HashMap<String, String> map = new HashMap<>(); + map.put(key, value); + return map; + } } diff --git a/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-temporal-join_1/temporal-join-table-join-key-from-map/plan/temporal-join-table-join-key-from-map.json b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-temporal-join_1/temporal-join-table-join-key-from-map/plan/temporal-join-table-join-key-from-map.json new file mode 100644 index 00000000000..130b8af8d4c --- /dev/null +++ b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-temporal-join_1/temporal-join-table-join-key-from-map/plan/temporal-join-table-join-key-from-map.json @@ -0,0 +1,569 @@ +{ + "flinkVersion" : "1.19", + "nodes" : [ { + "id" : 24, + "type" : "stream-exec-table-source-scan_1", + "scanTableSource" : { + "table" : { + "identifier" : "`default_catalog`.`default_database`.`OrdersNestedId`", + "resolvedTable" : { + "schema" : { + "columns" : [ { + "name" : "amount", + "dataType" : "BIGINT" + }, { + "name" : "nested_row", + "dataType" : "ROW<`currency` VARCHAR(2147483647)>" + }, { + "name" : "nested_map", + "dataType" : "MAP<VARCHAR(2147483647) NOT NULL, VARCHAR(2147483647)>" + }, { + "name" : "order_time", + "dataType" : "VARCHAR(2147483647)" + }, { + "name" : "rowtime", + "kind" : "COMPUTED", + "expression" : { + "rexNode" : { + "kind" : "CALL", + "internalName" : "$TO_TIMESTAMP$1", + "operands" : [ { + "kind" : "INPUT_REF", + "inputIndex" : 3, + "type" : "VARCHAR(2147483647)" + } ], + "type" : "TIMESTAMP(3)" + }, + "serializableString" : "TO_TIMESTAMP(`order_time`)" + } + } ], + "watermarkSpecs" : [ { + "rowtimeAttribute" : "rowtime", + "expression" : { + "rexNode" : { + "kind" : "INPUT_REF", + "inputIndex" : 4, + "type" : "TIMESTAMP(3)" + }, + "serializableString" : "`rowtime`" + } + } ] + }, + "partitionKeys" : [ ] + } + }, + "abilities" : [ { + "type" : "ProjectPushDown", + "projectedFields" : [ [ 0 ], [ 3 ], [ 2 ] ], + "producedType" : "ROW<`amount` BIGINT, `order_time` VARCHAR(2147483647), `nested_map` MAP<VARCHAR(2147483647) NOT NULL, VARCHAR(2147483647)>> NOT NULL" + }, { + "type" : "ReadingMetadata", + "metadataKeys" : [ ], + "producedType" : "ROW<`amount` BIGINT, `order_time` VARCHAR(2147483647), `nested_map` MAP<VARCHAR(2147483647) NOT NULL, VARCHAR(2147483647)>> NOT NULL" + } ] + }, + "outputType" : "ROW<`amount` BIGINT, `order_time` VARCHAR(2147483647), `nested_map` MAP<VARCHAR(2147483647) NOT NULL, VARCHAR(2147483647)>>", + "description" : "TableSourceScan(table=[[default_catalog, default_database, OrdersNestedId, project=[amount, order_time, nested_map], metadata=[]]], fields=[amount, order_time, nested_map])", + "inputProperties" : [ ] + }, { + "id" : 25, + "type" : "stream-exec-calc_1", + "projection" : [ { + "kind" : "INPUT_REF", + "inputIndex" : 0, + "type" : "BIGINT" + }, { + "kind" : "CALL", + "internalName" : "$TO_TIMESTAMP$1", + "operands" : [ { + "kind" : "INPUT_REF", + "inputIndex" : 1, + "type" : "VARCHAR(2147483647)" + } ], + "type" : "TIMESTAMP(3)" + }, { + "kind" : "INPUT_REF", + "inputIndex" : 2, + "type" : "MAP<VARCHAR(2147483647) NOT NULL, VARCHAR(2147483647)>" + } ], + "condition" : null, + "inputProperties" : [ { + "requiredDistribution" : { + "type" : "UNKNOWN" + }, + "damBehavior" : "PIPELINED", + "priority" : 0 + } ], + "outputType" : "ROW<`amount` BIGINT, `rowtime` TIMESTAMP(3), `nested_map` MAP<VARCHAR(2147483647) NOT NULL, VARCHAR(2147483647)>>", + "description" : "Calc(select=[amount, TO_TIMESTAMP(order_time) AS rowtime, nested_map])" + }, { + "id" : 26, + "type" : "stream-exec-watermark-assigner_1", + "watermarkExpr" : { + "kind" : "INPUT_REF", + "inputIndex" : 1, + "type" : "TIMESTAMP(3)" + }, + "rowtimeFieldIndex" : 1, + "inputProperties" : [ { + "requiredDistribution" : { + "type" : "UNKNOWN" + }, + "damBehavior" : "PIPELINED", + "priority" : 0 + } ], + "outputType" : { + "type" : "ROW", + "fields" : [ { + "name" : "amount", + "fieldType" : "BIGINT" + }, { + "name" : "rowtime", + "fieldType" : { + "type" : "TIMESTAMP_WITHOUT_TIME_ZONE", + "precision" : 3, + "kind" : "ROWTIME" + } + }, { + "name" : "nested_map", + "fieldType" : "MAP<VARCHAR(2147483647) NOT NULL, VARCHAR(2147483647)>" + } ] + }, + "description" : "WatermarkAssigner(rowtime=[rowtime], watermark=[rowtime])" + }, { + "id" : 27, + "type" : "stream-exec-calc_1", + "projection" : [ { + "kind" : "INPUT_REF", + "inputIndex" : 0, + "type" : "BIGINT" + }, { + "kind" : "INPUT_REF", + "inputIndex" : 1, + "type" : { + "type" : "TIMESTAMP_WITHOUT_TIME_ZONE", + "precision" : 3, + "kind" : "ROWTIME" + } + }, { + "kind" : "CALL", + "syntax" : "SPECIAL", + "internalName" : "$ITEM$1", + "operands" : [ { + "kind" : "INPUT_REF", + "inputIndex" : 2, + "type" : "MAP<VARCHAR(2147483647) NOT NULL, VARCHAR(2147483647)>" + }, { + "kind" : "LITERAL", + "value" : "currency", + "type" : "CHAR(8) NOT NULL" + } ], + "type" : "VARCHAR(2147483647)" + } ], + "condition" : null, + "inputProperties" : [ { + "requiredDistribution" : { + "type" : "UNKNOWN" + }, + "damBehavior" : "PIPELINED", + "priority" : 0 + } ], + "outputType" : { + "type" : "ROW", + "fields" : [ { + "name" : "amount", + "fieldType" : "BIGINT" + }, { + "name" : "rowtime", + "fieldType" : { + "type" : "TIMESTAMP_WITHOUT_TIME_ZONE", + "precision" : 3, + "kind" : "ROWTIME" + } + }, { + "name" : "$f5", + "fieldType" : "VARCHAR(2147483647)" + } ] + }, + "description" : "Calc(select=[amount, rowtime, ITEM(nested_map, 'currency') AS $f5])" + }, { + "id" : 28, + "type" : "stream-exec-exchange_1", + "inputProperties" : [ { + "requiredDistribution" : { + "type" : "HASH", + "keys" : [ 2 ] + }, + "damBehavior" : "PIPELINED", + "priority" : 0 + } ], + "outputType" : { + "type" : "ROW", + "fields" : [ { + "name" : "amount", + "fieldType" : "BIGINT" + }, { + "name" : "rowtime", + "fieldType" : { + "type" : "TIMESTAMP_WITHOUT_TIME_ZONE", + "precision" : 3, + "kind" : "ROWTIME" + } + }, { + "name" : "$f5", + "fieldType" : "VARCHAR(2147483647)" + } ] + }, + "description" : "Exchange(distribution=[hash[$f5]])" + }, { + "id" : 29, + "type" : "stream-exec-table-source-scan_1", + "scanTableSource" : { + "table" : { + "identifier" : "`default_catalog`.`default_database`.`RatesHistory`", + "resolvedTable" : { + "schema" : { + "columns" : [ { + "name" : "currency", + "dataType" : "VARCHAR(2147483647) NOT NULL" + }, { + "name" : "rate", + "dataType" : "BIGINT" + }, { + "name" : "rate_time", + "dataType" : "VARCHAR(2147483647)" + }, { + "name" : "rowtime", + "kind" : "COMPUTED", + "expression" : { + "rexNode" : { + "kind" : "CALL", + "internalName" : "$TO_TIMESTAMP$1", + "operands" : [ { + "kind" : "INPUT_REF", + "inputIndex" : 2, + "type" : "VARCHAR(2147483647)" + } ], + "type" : "TIMESTAMP(3)" + }, + "serializableString" : "TO_TIMESTAMP(`rate_time`)" + } + } ], + "watermarkSpecs" : [ { + "rowtimeAttribute" : "rowtime", + "expression" : { + "rexNode" : { + "kind" : "INPUT_REF", + "inputIndex" : 3, + "type" : "TIMESTAMP(3)" + }, + "serializableString" : "`rowtime`" + } + } ], + "primaryKey" : { + "name" : "PK_currency", + "type" : "PRIMARY_KEY", + "columns" : [ "currency" ] + } + }, + "partitionKeys" : [ ] + } + } + }, + "outputType" : "ROW<`currency` VARCHAR(2147483647) NOT NULL, `rate` BIGINT, `rate_time` VARCHAR(2147483647)>", + "description" : "TableSourceScan(table=[[default_catalog, default_database, RatesHistory]], fields=[currency, rate, rate_time])", + "inputProperties" : [ ] + }, { + "id" : 30, + "type" : "stream-exec-calc_1", + "projection" : [ { + "kind" : "INPUT_REF", + "inputIndex" : 0, + "type" : "VARCHAR(2147483647) NOT NULL" + }, { + "kind" : "INPUT_REF", + "inputIndex" : 1, + "type" : "BIGINT" + }, { + "kind" : "CALL", + "internalName" : "$TO_TIMESTAMP$1", + "operands" : [ { + "kind" : "INPUT_REF", + "inputIndex" : 2, + "type" : "VARCHAR(2147483647)" + } ], + "type" : "TIMESTAMP(3)" + } ], + "condition" : null, + "inputProperties" : [ { + "requiredDistribution" : { + "type" : "UNKNOWN" + }, + "damBehavior" : "PIPELINED", + "priority" : 0 + } ], + "outputType" : "ROW<`currency` VARCHAR(2147483647) NOT NULL, `rate` BIGINT, `rowtime` TIMESTAMP(3)>", + "description" : "Calc(select=[currency, rate, TO_TIMESTAMP(rate_time) AS rowtime])" + }, { + "id" : 31, + "type" : "stream-exec-watermark-assigner_1", + "watermarkExpr" : { + "kind" : "INPUT_REF", + "inputIndex" : 2, + "type" : "TIMESTAMP(3)" + }, + "rowtimeFieldIndex" : 2, + "inputProperties" : [ { + "requiredDistribution" : { + "type" : "UNKNOWN" + }, + "damBehavior" : "PIPELINED", + "priority" : 0 + } ], + "outputType" : { + "type" : "ROW", + "fields" : [ { + "name" : "currency", + "fieldType" : "VARCHAR(2147483647) NOT NULL" + }, { + "name" : "rate", + "fieldType" : "BIGINT" + }, { + "name" : "rowtime", + "fieldType" : { + "type" : "TIMESTAMP_WITHOUT_TIME_ZONE", + "precision" : 3, + "kind" : "ROWTIME" + } + } ] + }, + "description" : "WatermarkAssigner(rowtime=[rowtime], watermark=[rowtime])" + }, { + "id" : 32, + "type" : "stream-exec-exchange_1", + "inputProperties" : [ { + "requiredDistribution" : { + "type" : "HASH", + "keys" : [ 0 ] + }, + "damBehavior" : "PIPELINED", + "priority" : 0 + } ], + "outputType" : { + "type" : "ROW", + "fields" : [ { + "name" : "currency", + "fieldType" : "VARCHAR(2147483647) NOT NULL" + }, { + "name" : "rate", + "fieldType" : "BIGINT" + }, { + "name" : "rowtime", + "fieldType" : { + "type" : "TIMESTAMP_WITHOUT_TIME_ZONE", + "precision" : 3, + "kind" : "ROWTIME" + } + } ] + }, + "description" : "Exchange(distribution=[hash[currency]])" + }, { + "id" : 33, + "type" : "stream-exec-temporal-join_1", + "joinSpec" : { + "joinType" : "INNER", + "leftKeys" : [ 2 ], + "rightKeys" : [ 0 ], + "filterNulls" : [ true ], + "nonEquiCondition" : null + }, + "isTemporalFunctionJoin" : false, + "leftTimeAttributeIndex" : 1, + "rightTimeAttributeIndex" : 2, + "inputProperties" : [ { + "requiredDistribution" : { + "type" : "UNKNOWN" + }, + "damBehavior" : "PIPELINED", + "priority" : 0 + }, { + "requiredDistribution" : { + "type" : "UNKNOWN" + }, + "damBehavior" : "PIPELINED", + "priority" : 0 + } ], + "outputType" : { + "type" : "ROW", + "fields" : [ { + "name" : "amount", + "fieldType" : "BIGINT" + }, { + "name" : "rowtime", + "fieldType" : { + "type" : "TIMESTAMP_WITHOUT_TIME_ZONE", + "precision" : 3, + "kind" : "ROWTIME" + } + }, { + "name" : "$f5", + "fieldType" : "VARCHAR(2147483647)" + }, { + "name" : "currency", + "fieldType" : "VARCHAR(2147483647) NOT NULL" + }, { + "name" : "rate", + "fieldType" : "BIGINT" + }, { + "name" : "rowtime0", + "fieldType" : { + "type" : "TIMESTAMP_WITHOUT_TIME_ZONE", + "precision" : 3, + "kind" : "ROWTIME" + } + } ] + }, + "description" : "TemporalJoin(joinType=[InnerJoin], where=[(($f5 = currency) AND __TEMPORAL_JOIN_CONDITION(rowtime, rowtime0, __TEMPORAL_JOIN_CONDITION_PRIMARY_KEY(currency), __TEMPORAL_JOIN_LEFT_KEY($f5), __TEMPORAL_JOIN_RIGHT_KEY(currency)))], select=[amount, rowtime, $f5, currency, rate, rowtime0])" + }, { + "id" : 34, + "type" : "stream-exec-calc_1", + "projection" : [ { + "kind" : "CALL", + "syntax" : "BINARY", + "internalName" : "$*$1", + "operands" : [ { + "kind" : "INPUT_REF", + "inputIndex" : 0, + "type" : "BIGINT" + }, { + "kind" : "INPUT_REF", + "inputIndex" : 4, + "type" : "BIGINT" + } ], + "type" : "BIGINT" + } ], + "condition" : null, + "inputProperties" : [ { + "requiredDistribution" : { + "type" : "UNKNOWN" + }, + "damBehavior" : "PIPELINED", + "priority" : 0 + } ], + "outputType" : "ROW<`EXPR$0` BIGINT>", + "description" : "Calc(select=[(amount * rate) AS EXPR$0])" + }, { + "id" : 35, + "type" : "stream-exec-sink_1", + "configuration" : { + "table.exec.sink.keyed-shuffle" : "AUTO", + "table.exec.sink.not-null-enforcer" : "ERROR", + "table.exec.sink.rowtime-inserter" : "ENABLED", + "table.exec.sink.type-length-enforcer" : "IGNORE", + "table.exec.sink.upsert-materialize" : "AUTO" + }, + "dynamicTableSink" : { + "table" : { + "identifier" : "`default_catalog`.`default_database`.`MySink`", + "resolvedTable" : { + "schema" : { + "columns" : [ { + "name" : "amount", + "dataType" : "BIGINT" + } ], + "watermarkSpecs" : [ ] + }, + "partitionKeys" : [ ] + } + } + }, + "inputChangelogMode" : [ "INSERT" ], + "inputProperties" : [ { + "requiredDistribution" : { + "type" : "UNKNOWN" + }, + "damBehavior" : "PIPELINED", + "priority" : 0 + } ], + "outputType" : "ROW<`EXPR$0` BIGINT>", + "description" : "Sink(table=[default_catalog.default_database.MySink], fields=[EXPR$0])" + } ], + "edges" : [ { + "source" : 24, + "target" : 25, + "shuffle" : { + "type" : "FORWARD" + }, + "shuffleMode" : "PIPELINED" + }, { + "source" : 25, + "target" : 26, + "shuffle" : { + "type" : "FORWARD" + }, + "shuffleMode" : "PIPELINED" + }, { + "source" : 26, + "target" : 27, + "shuffle" : { + "type" : "FORWARD" + }, + "shuffleMode" : "PIPELINED" + }, { + "source" : 27, + "target" : 28, + "shuffle" : { + "type" : "FORWARD" + }, + "shuffleMode" : "PIPELINED" + }, { + "source" : 29, + "target" : 30, + "shuffle" : { + "type" : "FORWARD" + }, + "shuffleMode" : "PIPELINED" + }, { + "source" : 30, + "target" : 31, + "shuffle" : { + "type" : "FORWARD" + }, + "shuffleMode" : "PIPELINED" + }, { + "source" : 31, + "target" : 32, + "shuffle" : { + "type" : "FORWARD" + }, + "shuffleMode" : "PIPELINED" + }, { + "source" : 28, + "target" : 33, + "shuffle" : { + "type" : "FORWARD" + }, + "shuffleMode" : "PIPELINED" + }, { + "source" : 32, + "target" : 33, + "shuffle" : { + "type" : "FORWARD" + }, + "shuffleMode" : "PIPELINED" + }, { + "source" : 33, + "target" : 34, + "shuffle" : { + "type" : "FORWARD" + }, + "shuffleMode" : "PIPELINED" + }, { + "source" : 34, + "target" : 35, + "shuffle" : { + "type" : "FORWARD" + }, + "shuffleMode" : "PIPELINED" + } ] +} \ No newline at end of file diff --git a/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-temporal-join_1/temporal-join-table-join-key-from-map/savepoint/_metadata b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-temporal-join_1/temporal-join-table-join-key-from-map/savepoint/_metadata new file mode 100644 index 00000000000..ac6d04137e8 Binary files /dev/null and b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-temporal-join_1/temporal-join-table-join-key-from-map/savepoint/_metadata differ diff --git a/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-temporal-join_1/temporal-join-table-join-nested-key/plan/temporal-join-table-join-nested-key.json b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-temporal-join_1/temporal-join-table-join-nested-key/plan/temporal-join-table-join-nested-key.json new file mode 100644 index 00000000000..679b3835273 --- /dev/null +++ b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-temporal-join_1/temporal-join-table-join-nested-key/plan/temporal-join-table-join-nested-key.json @@ -0,0 +1,600 @@ +{ + "flinkVersion" : "1.19", + "nodes" : [ { + "id" : 12, + "type" : "stream-exec-table-source-scan_1", + "scanTableSource" : { + "table" : { + "identifier" : "`default_catalog`.`default_database`.`OrdersNestedId`", + "resolvedTable" : { + "schema" : { + "columns" : [ { + "name" : "amount", + "dataType" : "BIGINT" + }, { + "name" : "nested_row", + "dataType" : "ROW<`currency` VARCHAR(2147483647)>" + }, { + "name" : "nested_map", + "dataType" : "MAP<VARCHAR(2147483647) NOT NULL, VARCHAR(2147483647)>" + }, { + "name" : "order_time", + "dataType" : "VARCHAR(2147483647)" + }, { + "name" : "rowtime", + "kind" : "COMPUTED", + "expression" : { + "rexNode" : { + "kind" : "CALL", + "internalName" : "$TO_TIMESTAMP$1", + "operands" : [ { + "kind" : "INPUT_REF", + "inputIndex" : 3, + "type" : "VARCHAR(2147483647)" + } ], + "type" : "TIMESTAMP(3)" + }, + "serializableString" : "TO_TIMESTAMP(`order_time`)" + } + } ], + "watermarkSpecs" : [ { + "rowtimeAttribute" : "rowtime", + "expression" : { + "rexNode" : { + "kind" : "INPUT_REF", + "inputIndex" : 4, + "type" : "TIMESTAMP(3)" + }, + "serializableString" : "`rowtime`" + } + } ] + }, + "partitionKeys" : [ ] + } + }, + "abilities" : [ { + "type" : "ProjectPushDown", + "projectedFields" : [ [ 0 ], [ 3 ], [ 1 ] ], + "producedType" : "ROW<`amount` BIGINT, `order_time` VARCHAR(2147483647), `nested_row` ROW<`currency` VARCHAR(2147483647)>> NOT NULL" + }, { + "type" : "ReadingMetadata", + "metadataKeys" : [ ], + "producedType" : "ROW<`amount` BIGINT, `order_time` VARCHAR(2147483647), `nested_row` ROW<`currency` VARCHAR(2147483647)>> NOT NULL" + } ] + }, + "outputType" : "ROW<`amount` BIGINT, `order_time` VARCHAR(2147483647), `nested_row` ROW<`currency` VARCHAR(2147483647)>>", + "description" : "TableSourceScan(table=[[default_catalog, default_database, OrdersNestedId, project=[amount, order_time, nested_row], metadata=[]]], fields=[amount, order_time, nested_row])", + "inputProperties" : [ ] + }, { + "id" : 13, + "type" : "stream-exec-calc_1", + "projection" : [ { + "kind" : "INPUT_REF", + "inputIndex" : 0, + "type" : "BIGINT" + }, { + "kind" : "CALL", + "internalName" : "$TO_TIMESTAMP$1", + "operands" : [ { + "kind" : "INPUT_REF", + "inputIndex" : 1, + "type" : "VARCHAR(2147483647)" + } ], + "type" : "TIMESTAMP(3)" + }, { + "kind" : "INPUT_REF", + "inputIndex" : 2, + "type" : "ROW<`currency` VARCHAR(2147483647)>" + } ], + "condition" : null, + "inputProperties" : [ { + "requiredDistribution" : { + "type" : "UNKNOWN" + }, + "damBehavior" : "PIPELINED", + "priority" : 0 + } ], + "outputType" : "ROW<`amount` BIGINT, `rowtime` TIMESTAMP(3), `nested_row` ROW<`currency` VARCHAR(2147483647)>>", + "description" : "Calc(select=[amount, TO_TIMESTAMP(order_time) AS rowtime, nested_row])" + }, { + "id" : 14, + "type" : "stream-exec-watermark-assigner_1", + "watermarkExpr" : { + "kind" : "INPUT_REF", + "inputIndex" : 1, + "type" : "TIMESTAMP(3)" + }, + "rowtimeFieldIndex" : 1, + "inputProperties" : [ { + "requiredDistribution" : { + "type" : "UNKNOWN" + }, + "damBehavior" : "PIPELINED", + "priority" : 0 + } ], + "outputType" : { + "type" : "ROW", + "fields" : [ { + "name" : "amount", + "fieldType" : "BIGINT" + }, { + "name" : "rowtime", + "fieldType" : { + "type" : "TIMESTAMP_WITHOUT_TIME_ZONE", + "precision" : 3, + "kind" : "ROWTIME" + } + }, { + "name" : "nested_row", + "fieldType" : "ROW<`currency` VARCHAR(2147483647)>" + } ] + }, + "description" : "WatermarkAssigner(rowtime=[rowtime], watermark=[rowtime])" + }, { + "id" : 15, + "type" : "stream-exec-calc_1", + "projection" : [ { + "kind" : "INPUT_REF", + "inputIndex" : 0, + "type" : "BIGINT" + }, { + "kind" : "INPUT_REF", + "inputIndex" : 1, + "type" : { + "type" : "TIMESTAMP_WITHOUT_TIME_ZONE", + "precision" : 3, + "kind" : "ROWTIME" + } + }, { + "kind" : "CALL", + "syntax" : "SPECIAL", + "internalName" : "$CASE$1", + "operands" : [ { + "kind" : "CALL", + "syntax" : "BINARY", + "internalName" : "$=$1", + "operands" : [ { + "kind" : "FIELD_ACCESS", + "name" : "currency", + "expr" : { + "kind" : "INPUT_REF", + "inputIndex" : 2, + "type" : "ROW<`currency` VARCHAR(2147483647)>" + } + }, { + "kind" : "LITERAL", + "value" : "usd", + "type" : "VARCHAR(2147483647) NOT NULL" + } ], + "type" : "BOOLEAN" + }, { + "kind" : "CALL", + "internalName" : "$UPPER$1", + "operands" : [ { + "kind" : "FIELD_ACCESS", + "name" : "currency", + "expr" : { + "kind" : "INPUT_REF", + "inputIndex" : 2, + "type" : "ROW<`currency` VARCHAR(2147483647)>" + } + } ], + "type" : "VARCHAR(2147483647)" + }, { + "kind" : "FIELD_ACCESS", + "name" : "currency", + "expr" : { + "kind" : "INPUT_REF", + "inputIndex" : 2, + "type" : "ROW<`currency` VARCHAR(2147483647)>" + } + } ], + "type" : "VARCHAR(2147483647)" + } ], + "condition" : null, + "inputProperties" : [ { + "requiredDistribution" : { + "type" : "UNKNOWN" + }, + "damBehavior" : "PIPELINED", + "priority" : 0 + } ], + "outputType" : { + "type" : "ROW", + "fields" : [ { + "name" : "amount", + "fieldType" : "BIGINT" + }, { + "name" : "rowtime", + "fieldType" : { + "type" : "TIMESTAMP_WITHOUT_TIME_ZONE", + "precision" : 3, + "kind" : "ROWTIME" + } + }, { + "name" : "$f5", + "fieldType" : "VARCHAR(2147483647)" + } ] + }, + "description" : "Calc(select=[amount, rowtime, CASE((nested_row.currency = 'usd'), UPPER(nested_row.currency), nested_row.currency) AS $f5])" + }, { + "id" : 16, + "type" : "stream-exec-exchange_1", + "inputProperties" : [ { + "requiredDistribution" : { + "type" : "HASH", + "keys" : [ 2 ] + }, + "damBehavior" : "PIPELINED", + "priority" : 0 + } ], + "outputType" : { + "type" : "ROW", + "fields" : [ { + "name" : "amount", + "fieldType" : "BIGINT" + }, { + "name" : "rowtime", + "fieldType" : { + "type" : "TIMESTAMP_WITHOUT_TIME_ZONE", + "precision" : 3, + "kind" : "ROWTIME" + } + }, { + "name" : "$f5", + "fieldType" : "VARCHAR(2147483647)" + } ] + }, + "description" : "Exchange(distribution=[hash[$f5]])" + }, { + "id" : 17, + "type" : "stream-exec-table-source-scan_1", + "scanTableSource" : { + "table" : { + "identifier" : "`default_catalog`.`default_database`.`RatesHistory`", + "resolvedTable" : { + "schema" : { + "columns" : [ { + "name" : "currency", + "dataType" : "VARCHAR(2147483647) NOT NULL" + }, { + "name" : "rate", + "dataType" : "BIGINT" + }, { + "name" : "rate_time", + "dataType" : "VARCHAR(2147483647)" + }, { + "name" : "rowtime", + "kind" : "COMPUTED", + "expression" : { + "rexNode" : { + "kind" : "CALL", + "internalName" : "$TO_TIMESTAMP$1", + "operands" : [ { + "kind" : "INPUT_REF", + "inputIndex" : 2, + "type" : "VARCHAR(2147483647)" + } ], + "type" : "TIMESTAMP(3)" + }, + "serializableString" : "TO_TIMESTAMP(`rate_time`)" + } + } ], + "watermarkSpecs" : [ { + "rowtimeAttribute" : "rowtime", + "expression" : { + "rexNode" : { + "kind" : "INPUT_REF", + "inputIndex" : 3, + "type" : "TIMESTAMP(3)" + }, + "serializableString" : "`rowtime`" + } + } ], + "primaryKey" : { + "name" : "PK_currency", + "type" : "PRIMARY_KEY", + "columns" : [ "currency" ] + } + }, + "partitionKeys" : [ ] + } + } + }, + "outputType" : "ROW<`currency` VARCHAR(2147483647) NOT NULL, `rate` BIGINT, `rate_time` VARCHAR(2147483647)>", + "description" : "TableSourceScan(table=[[default_catalog, default_database, RatesHistory]], fields=[currency, rate, rate_time])", + "inputProperties" : [ ] + }, { + "id" : 18, + "type" : "stream-exec-calc_1", + "projection" : [ { + "kind" : "INPUT_REF", + "inputIndex" : 0, + "type" : "VARCHAR(2147483647) NOT NULL" + }, { + "kind" : "INPUT_REF", + "inputIndex" : 1, + "type" : "BIGINT" + }, { + "kind" : "CALL", + "internalName" : "$TO_TIMESTAMP$1", + "operands" : [ { + "kind" : "INPUT_REF", + "inputIndex" : 2, + "type" : "VARCHAR(2147483647)" + } ], + "type" : "TIMESTAMP(3)" + } ], + "condition" : null, + "inputProperties" : [ { + "requiredDistribution" : { + "type" : "UNKNOWN" + }, + "damBehavior" : "PIPELINED", + "priority" : 0 + } ], + "outputType" : "ROW<`currency` VARCHAR(2147483647) NOT NULL, `rate` BIGINT, `rowtime` TIMESTAMP(3)>", + "description" : "Calc(select=[currency, rate, TO_TIMESTAMP(rate_time) AS rowtime])" + }, { + "id" : 19, + "type" : "stream-exec-watermark-assigner_1", + "watermarkExpr" : { + "kind" : "INPUT_REF", + "inputIndex" : 2, + "type" : "TIMESTAMP(3)" + }, + "rowtimeFieldIndex" : 2, + "inputProperties" : [ { + "requiredDistribution" : { + "type" : "UNKNOWN" + }, + "damBehavior" : "PIPELINED", + "priority" : 0 + } ], + "outputType" : { + "type" : "ROW", + "fields" : [ { + "name" : "currency", + "fieldType" : "VARCHAR(2147483647) NOT NULL" + }, { + "name" : "rate", + "fieldType" : "BIGINT" + }, { + "name" : "rowtime", + "fieldType" : { + "type" : "TIMESTAMP_WITHOUT_TIME_ZONE", + "precision" : 3, + "kind" : "ROWTIME" + } + } ] + }, + "description" : "WatermarkAssigner(rowtime=[rowtime], watermark=[rowtime])" + }, { + "id" : 20, + "type" : "stream-exec-exchange_1", + "inputProperties" : [ { + "requiredDistribution" : { + "type" : "HASH", + "keys" : [ 0 ] + }, + "damBehavior" : "PIPELINED", + "priority" : 0 + } ], + "outputType" : { + "type" : "ROW", + "fields" : [ { + "name" : "currency", + "fieldType" : "VARCHAR(2147483647) NOT NULL" + }, { + "name" : "rate", + "fieldType" : "BIGINT" + }, { + "name" : "rowtime", + "fieldType" : { + "type" : "TIMESTAMP_WITHOUT_TIME_ZONE", + "precision" : 3, + "kind" : "ROWTIME" + } + } ] + }, + "description" : "Exchange(distribution=[hash[currency]])" + }, { + "id" : 21, + "type" : "stream-exec-temporal-join_1", + "joinSpec" : { + "joinType" : "INNER", + "leftKeys" : [ 2 ], + "rightKeys" : [ 0 ], + "filterNulls" : [ true ], + "nonEquiCondition" : null + }, + "isTemporalFunctionJoin" : false, + "leftTimeAttributeIndex" : 1, + "rightTimeAttributeIndex" : 2, + "inputProperties" : [ { + "requiredDistribution" : { + "type" : "UNKNOWN" + }, + "damBehavior" : "PIPELINED", + "priority" : 0 + }, { + "requiredDistribution" : { + "type" : "UNKNOWN" + }, + "damBehavior" : "PIPELINED", + "priority" : 0 + } ], + "outputType" : { + "type" : "ROW", + "fields" : [ { + "name" : "amount", + "fieldType" : "BIGINT" + }, { + "name" : "rowtime", + "fieldType" : { + "type" : "TIMESTAMP_WITHOUT_TIME_ZONE", + "precision" : 3, + "kind" : "ROWTIME" + } + }, { + "name" : "$f5", + "fieldType" : "VARCHAR(2147483647)" + }, { + "name" : "currency", + "fieldType" : "VARCHAR(2147483647) NOT NULL" + }, { + "name" : "rate", + "fieldType" : "BIGINT" + }, { + "name" : "rowtime0", + "fieldType" : { + "type" : "TIMESTAMP_WITHOUT_TIME_ZONE", + "precision" : 3, + "kind" : "ROWTIME" + } + } ] + }, + "description" : "TemporalJoin(joinType=[InnerJoin], where=[(($f5 = currency) AND __TEMPORAL_JOIN_CONDITION(rowtime, rowtime0, __TEMPORAL_JOIN_CONDITION_PRIMARY_KEY(currency), __TEMPORAL_JOIN_LEFT_KEY($f5), __TEMPORAL_JOIN_RIGHT_KEY(currency)))], select=[amount, rowtime, $f5, currency, rate, rowtime0])" + }, { + "id" : 22, + "type" : "stream-exec-calc_1", + "projection" : [ { + "kind" : "CALL", + "syntax" : "BINARY", + "internalName" : "$*$1", + "operands" : [ { + "kind" : "INPUT_REF", + "inputIndex" : 0, + "type" : "BIGINT" + }, { + "kind" : "INPUT_REF", + "inputIndex" : 4, + "type" : "BIGINT" + } ], + "type" : "BIGINT" + } ], + "condition" : null, + "inputProperties" : [ { + "requiredDistribution" : { + "type" : "UNKNOWN" + }, + "damBehavior" : "PIPELINED", + "priority" : 0 + } ], + "outputType" : "ROW<`EXPR$0` BIGINT>", + "description" : "Calc(select=[(amount * rate) AS EXPR$0])" + }, { + "id" : 23, + "type" : "stream-exec-sink_1", + "configuration" : { + "table.exec.sink.keyed-shuffle" : "AUTO", + "table.exec.sink.not-null-enforcer" : "ERROR", + "table.exec.sink.rowtime-inserter" : "ENABLED", + "table.exec.sink.type-length-enforcer" : "IGNORE", + "table.exec.sink.upsert-materialize" : "AUTO" + }, + "dynamicTableSink" : { + "table" : { + "identifier" : "`default_catalog`.`default_database`.`MySink`", + "resolvedTable" : { + "schema" : { + "columns" : [ { + "name" : "amount", + "dataType" : "BIGINT" + } ], + "watermarkSpecs" : [ ] + }, + "partitionKeys" : [ ] + } + } + }, + "inputChangelogMode" : [ "INSERT" ], + "inputProperties" : [ { + "requiredDistribution" : { + "type" : "UNKNOWN" + }, + "damBehavior" : "PIPELINED", + "priority" : 0 + } ], + "outputType" : "ROW<`EXPR$0` BIGINT>", + "description" : "Sink(table=[default_catalog.default_database.MySink], fields=[EXPR$0])" + } ], + "edges" : [ { + "source" : 12, + "target" : 13, + "shuffle" : { + "type" : "FORWARD" + }, + "shuffleMode" : "PIPELINED" + }, { + "source" : 13, + "target" : 14, + "shuffle" : { + "type" : "FORWARD" + }, + "shuffleMode" : "PIPELINED" + }, { + "source" : 14, + "target" : 15, + "shuffle" : { + "type" : "FORWARD" + }, + "shuffleMode" : "PIPELINED" + }, { + "source" : 15, + "target" : 16, + "shuffle" : { + "type" : "FORWARD" + }, + "shuffleMode" : "PIPELINED" + }, { + "source" : 17, + "target" : 18, + "shuffle" : { + "type" : "FORWARD" + }, + "shuffleMode" : "PIPELINED" + }, { + "source" : 18, + "target" : 19, + "shuffle" : { + "type" : "FORWARD" + }, + "shuffleMode" : "PIPELINED" + }, { + "source" : 19, + "target" : 20, + "shuffle" : { + "type" : "FORWARD" + }, + "shuffleMode" : "PIPELINED" + }, { + "source" : 16, + "target" : 21, + "shuffle" : { + "type" : "FORWARD" + }, + "shuffleMode" : "PIPELINED" + }, { + "source" : 20, + "target" : 21, + "shuffle" : { + "type" : "FORWARD" + }, + "shuffleMode" : "PIPELINED" + }, { + "source" : 21, + "target" : 22, + "shuffle" : { + "type" : "FORWARD" + }, + "shuffleMode" : "PIPELINED" + }, { + "source" : 22, + "target" : 23, + "shuffle" : { + "type" : "FORWARD" + }, + "shuffleMode" : "PIPELINED" + } ] +} \ No newline at end of file diff --git a/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-temporal-join_1/temporal-join-table-join-nested-key/savepoint/_metadata b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-temporal-join_1/temporal-join-table-join-nested-key/savepoint/_metadata new file mode 100644 index 00000000000..c70770ac5db Binary files /dev/null and b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-temporal-join_1/temporal-join-table-join-nested-key/savepoint/_metadata differ