Re: [PR] [FLINK-24239] Event time temporal join should support values from array, map, row, etc. as join key [flink]
dawidwys merged PR #24253: URL: https://github.com/apache/flink/pull/24253 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-24239] Event time temporal join should support values from array, map, row, etc. as join key [flink]
snuyanzin commented on PR #24253: URL: https://github.com/apache/flink/pull/24253#issuecomment-1934213444 yes, also played locally , seems working, thanks -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-24239] Event time temporal join should support values from array, map, row, etc. as join key [flink]
dawidwys commented on code in PR #24253: URL: https://github.com/apache/flink/pull/24253#discussion_r1482693914 ## flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/TemporalJoinTestPrograms.java: ## @@ -84,6 +130,36 @@ public class TemporalJoinTestPrograms { + "ON o.currency = r.currency ") .build(); +static final TableTestProgram TEMPORAL_JOIN_TABLE_JOIN_NESTED_KEY = +TableTestProgram.of( +"temporal-join-table-join-nested-key", +"validates temporal join with a table when the join keys comes from a nested row") +.setupTableSource(ORDERS_WITH_NESTED_ID) +.setupTableSource(RATES) +.setupTableSink(AMOUNTS) +.runSql( +"INSERT INTO MySink " ++ "SELECT amount * r.rate " ++ "FROM OrdersNestedId AS o " ++ "JOIN RatesHistory FOR SYSTEM_TIME AS OF o.rowtime AS r " ++ "ON o.nested_row.currency = r.currency ") +.build(); + +static final TableTestProgram TEMPORAL_JOIN_TABLE_JOIN_KEY_FROM_MAP = +TableTestProgram.of( +"temporal-join-table-join-key-from-map", +"validates temporal join with a table when the join key comes from a map value") +.setupTableSource(ORDERS_WITH_NESTED_ID) +.setupTableSource(RATES) +.setupTableSink(AMOUNTS) +.runSql( +"INSERT INTO MySink " ++ "SELECT amount * r.rate " ++ "FROM OrdersNestedId AS o " ++ "JOIN RatesHistory FOR SYSTEM_TIME AS OF o.rowtime AS r " ++ "ON o.nested_map['currency'] = r.currency ") Review Comment: I tried adding a case with a projection on the ride side of the input. The change introduced here works fine, unfortunately the query fails with a different condition unmet. The temporal table join requires the right side of the join (the versioned table) to be a `PRIMARY KEY`. `PRIMARY KEY` can only be defined on a source table, projections remove that constraint. That means the change improves the situation only for the left side of the join. I'll still add a case with a function on the left side. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-24239] Event time temporal join should support values from array, map, row, etc. as join key [flink]
dawidwys commented on PR #24253: URL: https://github.com/apache/flink/pull/24253#issuecomment-1933701764 @snuyanzin There is a test for using an entry from a map: ``` INSERT INTO MySink SELECT amount * r.rate FROM OrdersNestedId AS o JOIN RatesHistory FOR SYSTEM_TIME AS OF o.rowtime AS r ON o.nested_map['currency'] = r.currency ``` Additional levels of nesting don't change anything imo. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-24239] Event time temporal join should support values from array, map, row, etc. as join key [flink]
snuyanzin commented on PR #24253: URL: https://github.com/apache/flink/pull/24253#issuecomment-1933613412 Out of curiocity: I didn't find tests for nested arrays/maps/row... Are they out of scope? I mean e.g. this ```sql CREATE TABLE A ( a MAP>, ts TIMESTAMP(3), WATERMARK FOR ts AS ts ) WITH ( 'connector' = 'values' ); CREATE TABLE B ( id INT, ts TIMESTAMP(3), WATERMARK FOR ts AS ts ) WITH ( 'connector' = 'values' ); ``` and ```sql SELECT * FROM A LEFT JOIN B FOR SYSTEM_TIME AS OF A.ts AS b ON A.a['ID']['ID_NESTED'] = id ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-24239] Event time temporal join should support values from array, map, row, etc. as join key [flink]
dawidwys commented on code in PR #24253: URL: https://github.com/apache/flink/pull/24253#discussion_r1481739305 ## flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/logical/LogicalCorrelateToJoinFromTemporalTableRule.scala: ## @@ -323,15 +306,47 @@ abstract class LogicalCorrelateToJoinFromGeneralTemporalTableRule( } val builder = call.builder() -val condition = builder.and(joinCondition, temporalCondition) - -builder.push(leftInput) -builder.push(snapshot) -builder.join(correlate.getJoinType, condition) -val temporalJoin = builder.build() +val condition = builder.and(actualJoin.getCondition, temporalCondition) + +val joinWithTemporalCondition = actualJoin.copy( + actualJoin.getTraitSet, + condition, + actualJoin.getLeft, + actualJoin.getRight, + actualJoin.getJoinType, + actualJoin.isSemiJoinDone) + +val temporalJoin = if (actualJoin != rewriteJoin) { + rewriteJoin.replaceInput(0, joinWithTemporalCondition) Review Comment: I admit I had to check this again, but we add the projection if we pushed anything down. If we don't we don't have the extra projection. We have cases for that (basically all tests before this PR). -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-24239] Event time temporal join should support values from array, map, row, etc. as join key [flink]
jnh5y commented on code in PR #24253: URL: https://github.com/apache/flink/pull/24253#discussion_r1481698481 ## flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/logical/LogicalCorrelateToJoinFromTemporalTableRule.scala: ## @@ -323,15 +306,47 @@ abstract class LogicalCorrelateToJoinFromGeneralTemporalTableRule( } val builder = call.builder() -val condition = builder.and(joinCondition, temporalCondition) - -builder.push(leftInput) -builder.push(snapshot) -builder.join(correlate.getJoinType, condition) -val temporalJoin = builder.build() +val condition = builder.and(actualJoin.getCondition, temporalCondition) + +val joinWithTemporalCondition = actualJoin.copy( + actualJoin.getTraitSet, + condition, + actualJoin.getLeft, + actualJoin.getRight, + actualJoin.getJoinType, + actualJoin.isSemiJoinDone) + +val temporalJoin = if (actualJoin != rewriteJoin) { + rewriteJoin.replaceInput(0, joinWithTemporalCondition) Review Comment: Do we have a test cases for both when the `actualJoin` is equal and not equal to the `rewriteJoin`? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-24239] Event time temporal join should support values from array, map, row, etc. as join key [flink]
dawidwys commented on code in PR #24253: URL: https://github.com/apache/flink/pull/24253#discussion_r1481687197 ## flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/logical/LogicalCorrelateToJoinFromTemporalTableRule.scala: ## @@ -323,15 +306,47 @@ abstract class LogicalCorrelateToJoinFromGeneralTemporalTableRule( } val builder = call.builder() -val condition = builder.and(joinCondition, temporalCondition) - -builder.push(leftInput) -builder.push(snapshot) -builder.join(correlate.getJoinType, condition) -val temporalJoin = builder.build() +val condition = builder.and(actualJoin.getCondition, temporalCondition) + +val joinWithTemporalCondition = actualJoin.copy( + actualJoin.getTraitSet, + condition, + actualJoin.getLeft, + actualJoin.getRight, + actualJoin.getJoinType, + actualJoin.isSemiJoinDone) + +val temporalJoin = if (actualJoin != rewriteJoin) { + rewriteJoin.replaceInput(0, joinWithTemporalCondition) Review Comment: What test cases do you have in mind? As far as I can tell we don't have a pushdown through a correlate. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-24239] Event time temporal join should support values from array, map, row, etc. as join key [flink]
dawidwys commented on code in PR #24253: URL: https://github.com/apache/flink/pull/24253#discussion_r1481689031 ## flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/TemporalJoinTestPrograms.java: ## @@ -84,6 +130,36 @@ public class TemporalJoinTestPrograms { + "ON o.currency = r.currency ") .build(); +static final TableTestProgram TEMPORAL_JOIN_TABLE_JOIN_NESTED_KEY = +TableTestProgram.of( +"temporal-join-table-join-nested-key", +"validates temporal join with a table when the join keys comes from a nested row") +.setupTableSource(ORDERS_WITH_NESTED_ID) +.setupTableSource(RATES) +.setupTableSink(AMOUNTS) +.runSql( +"INSERT INTO MySink " ++ "SELECT amount * r.rate " ++ "FROM OrdersNestedId AS o " ++ "JOIN RatesHistory FOR SYSTEM_TIME AS OF o.rowtime AS r " ++ "ON o.nested_row.currency = r.currency ") +.build(); + +static final TableTestProgram TEMPORAL_JOIN_TABLE_JOIN_KEY_FROM_MAP = +TableTestProgram.of( +"temporal-join-table-join-key-from-map", +"validates temporal join with a table when the join key comes from a map value") +.setupTableSource(ORDERS_WITH_NESTED_ID) +.setupTableSource(RATES) +.setupTableSink(AMOUNTS) +.runSql( +"INSERT INTO MySink " ++ "SELECT amount * r.rate " ++ "FROM OrdersNestedId AS o " ++ "JOIN RatesHistory FOR SYSTEM_TIME AS OF o.rowtime AS r " ++ "ON o.nested_map['currency'] = r.currency ") Review Comment: I'll add those cases. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-24239] Event time temporal join should support values from array, map, row, etc. as join key [flink]
dawidwys commented on code in PR #24253: URL: https://github.com/apache/flink/pull/24253#discussion_r1481688174 ## flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/logical/LogicalCorrelateToJoinFromTemporalTableRule.scala: ## @@ -237,57 +245,32 @@ abstract class LogicalCorrelateToJoinFromGeneralTemporalTableRule( val snapshot = getLogicalSnapshot(call) val leftRowType = leftInput.getRowType -val joinCondition = filterCondition.accept(new RexShuttle() { - // change correlate variable expression to normal RexInputRef (which is from left side) - override def visitFieldAccess(fieldAccess: RexFieldAccess): RexNode = { -fieldAccess.getReferenceExpr match { - case corVar: RexCorrelVariable => -require(correlate.getCorrelationId.equals(corVar.id)) -val index = leftRowType.getFieldList.indexOf(fieldAccess.getField) -RexInputRef.of(index, leftRowType) - case _ => super.visitFieldAccess(fieldAccess) -} - } - - // update the field index from right side - override def visitInputRef(inputRef: RexInputRef): RexNode = { -val rightIndex = leftRowType.getFieldCount + inputRef.getIndex -new RexInputRef(rightIndex, inputRef.getType) - } -}) +val joinCondition = decorrelate(filterCondition, leftRowType, correlate.getCorrelationId) validateSnapshotInCorrelate(snapshot, correlate) val rexBuilder = correlate.getCluster.getRexBuilder -val (leftJoinKey, rightJoinKey) = { - val relBuilder = call.builder() - relBuilder.push(leftInput) - relBuilder.push(snapshot) - val rewriteJoin = relBuilder.join(correlate.getJoinType, joinCondition).build() - val joinInfo = rewriteJoin.asInstanceOf[LogicalJoin].analyzeCondition() - val leftJoinKey = joinInfo.leftKeys.map(i => rexBuilder.makeInputRef(leftInput, i)) - val leftFieldCnt = leftInput.getRowType.getFieldCount - val rightJoinKey = joinInfo.rightKeys.map( -i => { - val leftKeyType = snapshot.getRowType.getFieldList.get(i).getType - rexBuilder.makeInputRef(leftKeyType, leftFieldCnt + i) -}) - if (leftJoinKey.length == 0 || rightJoinKey.length == 0) { -throw new ValidationException( - "Currently the join key in Temporal Table Join " + -"can not be empty.") - } - (leftJoinKey, rightJoinKey) +val relBuilder = call.builder() +relBuilder.push(leftInput) +relBuilder.push(snapshot) +val nonPushedJoin = + relBuilder.join(correlate.getJoinType, joinCondition).build().asInstanceOf[LogicalJoin] +val rewriteJoin = RelOptUtil.pushDownJoinConditions(nonPushedJoin, relBuilder) Review Comment: If they cannot they won't. If that results in that we won't have an equi join condition, this will fail as before. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-24239] Event time temporal join should support values from array, map, row, etc. as join key [flink]
dawidwys commented on code in PR #24253: URL: https://github.com/apache/flink/pull/24253#discussion_r1481688174 ## flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/logical/LogicalCorrelateToJoinFromTemporalTableRule.scala: ## @@ -237,57 +245,32 @@ abstract class LogicalCorrelateToJoinFromGeneralTemporalTableRule( val snapshot = getLogicalSnapshot(call) val leftRowType = leftInput.getRowType -val joinCondition = filterCondition.accept(new RexShuttle() { - // change correlate variable expression to normal RexInputRef (which is from left side) - override def visitFieldAccess(fieldAccess: RexFieldAccess): RexNode = { -fieldAccess.getReferenceExpr match { - case corVar: RexCorrelVariable => -require(correlate.getCorrelationId.equals(corVar.id)) -val index = leftRowType.getFieldList.indexOf(fieldAccess.getField) -RexInputRef.of(index, leftRowType) - case _ => super.visitFieldAccess(fieldAccess) -} - } - - // update the field index from right side - override def visitInputRef(inputRef: RexInputRef): RexNode = { -val rightIndex = leftRowType.getFieldCount + inputRef.getIndex -new RexInputRef(rightIndex, inputRef.getType) - } -}) +val joinCondition = decorrelate(filterCondition, leftRowType, correlate.getCorrelationId) validateSnapshotInCorrelate(snapshot, correlate) val rexBuilder = correlate.getCluster.getRexBuilder -val (leftJoinKey, rightJoinKey) = { - val relBuilder = call.builder() - relBuilder.push(leftInput) - relBuilder.push(snapshot) - val rewriteJoin = relBuilder.join(correlate.getJoinType, joinCondition).build() - val joinInfo = rewriteJoin.asInstanceOf[LogicalJoin].analyzeCondition() - val leftJoinKey = joinInfo.leftKeys.map(i => rexBuilder.makeInputRef(leftInput, i)) - val leftFieldCnt = leftInput.getRowType.getFieldCount - val rightJoinKey = joinInfo.rightKeys.map( -i => { - val leftKeyType = snapshot.getRowType.getFieldList.get(i).getType - rexBuilder.makeInputRef(leftKeyType, leftFieldCnt + i) -}) - if (leftJoinKey.length == 0 || rightJoinKey.length == 0) { -throw new ValidationException( - "Currently the join key in Temporal Table Join " + -"can not be empty.") - } - (leftJoinKey, rightJoinKey) +val relBuilder = call.builder() +relBuilder.push(leftInput) +relBuilder.push(snapshot) +val nonPushedJoin = + relBuilder.join(correlate.getJoinType, joinCondition).build().asInstanceOf[LogicalJoin] +val rewriteJoin = RelOptUtil.pushDownJoinConditions(nonPushedJoin, relBuilder) Review Comment: If they cannot they won't. If that results we won't have equi join condition, this will fail as before. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-24239] Event time temporal join should support values from array, map, row, etc. as join key [flink]
dawidwys commented on code in PR #24253: URL: https://github.com/apache/flink/pull/24253#discussion_r1481687197 ## flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/logical/LogicalCorrelateToJoinFromTemporalTableRule.scala: ## @@ -323,15 +306,47 @@ abstract class LogicalCorrelateToJoinFromGeneralTemporalTableRule( } val builder = call.builder() -val condition = builder.and(joinCondition, temporalCondition) - -builder.push(leftInput) -builder.push(snapshot) -builder.join(correlate.getJoinType, condition) -val temporalJoin = builder.build() +val condition = builder.and(actualJoin.getCondition, temporalCondition) + +val joinWithTemporalCondition = actualJoin.copy( + actualJoin.getTraitSet, + condition, + actualJoin.getLeft, + actualJoin.getRight, + actualJoin.getJoinType, + actualJoin.isSemiJoinDone) + +val temporalJoin = if (actualJoin != rewriteJoin) { + rewriteJoin.replaceInput(0, joinWithTemporalCondition) Review Comment: What cases do you have in mind? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-24239] Event time temporal join should support values from array, map, row, etc. as join key [flink]
jnh5y commented on code in PR #24253: URL: https://github.com/apache/flink/pull/24253#discussion_r1481681792 ## flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/logical/LogicalCorrelateToJoinFromTemporalTableRule.scala: ## @@ -237,57 +245,32 @@ abstract class LogicalCorrelateToJoinFromGeneralTemporalTableRule( val snapshot = getLogicalSnapshot(call) val leftRowType = leftInput.getRowType -val joinCondition = filterCondition.accept(new RexShuttle() { - // change correlate variable expression to normal RexInputRef (which is from left side) - override def visitFieldAccess(fieldAccess: RexFieldAccess): RexNode = { -fieldAccess.getReferenceExpr match { - case corVar: RexCorrelVariable => -require(correlate.getCorrelationId.equals(corVar.id)) -val index = leftRowType.getFieldList.indexOf(fieldAccess.getField) -RexInputRef.of(index, leftRowType) - case _ => super.visitFieldAccess(fieldAccess) -} - } - - // update the field index from right side - override def visitInputRef(inputRef: RexInputRef): RexNode = { -val rightIndex = leftRowType.getFieldCount + inputRef.getIndex -new RexInputRef(rightIndex, inputRef.getType) - } -}) +val joinCondition = decorrelate(filterCondition, leftRowType, correlate.getCorrelationId) validateSnapshotInCorrelate(snapshot, correlate) val rexBuilder = correlate.getCluster.getRexBuilder -val (leftJoinKey, rightJoinKey) = { - val relBuilder = call.builder() - relBuilder.push(leftInput) - relBuilder.push(snapshot) - val rewriteJoin = relBuilder.join(correlate.getJoinType, joinCondition).build() - val joinInfo = rewriteJoin.asInstanceOf[LogicalJoin].analyzeCondition() - val leftJoinKey = joinInfo.leftKeys.map(i => rexBuilder.makeInputRef(leftInput, i)) - val leftFieldCnt = leftInput.getRowType.getFieldCount - val rightJoinKey = joinInfo.rightKeys.map( -i => { - val leftKeyType = snapshot.getRowType.getFieldList.get(i).getType - rexBuilder.makeInputRef(leftKeyType, leftFieldCnt + i) -}) - if (leftJoinKey.length == 0 || rightJoinKey.length == 0) { -throw new ValidationException( - "Currently the join key in Temporal Table Join " + -"can not be empty.") - } - (leftJoinKey, rightJoinKey) +val relBuilder = call.builder() +relBuilder.push(leftInput) +relBuilder.push(snapshot) +val nonPushedJoin = + relBuilder.join(correlate.getJoinType, joinCondition).build().asInstanceOf[LogicalJoin] +val rewriteJoin = RelOptUtil.pushDownJoinConditions(nonPushedJoin, relBuilder) Review Comment: In what situations can the join conditions not be pushed down? We need some validation for those situations? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-24239] Event time temporal join should support values from array, map, row, etc. as join key [flink]
jnh5y commented on code in PR #24253: URL: https://github.com/apache/flink/pull/24253#discussion_r1481679233 ## flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/logical/LogicalCorrelateToJoinFromTemporalTableRule.scala: ## @@ -323,15 +306,47 @@ abstract class LogicalCorrelateToJoinFromGeneralTemporalTableRule( } val builder = call.builder() -val condition = builder.and(joinCondition, temporalCondition) - -builder.push(leftInput) -builder.push(snapshot) -builder.join(correlate.getJoinType, condition) -val temporalJoin = builder.build() +val condition = builder.and(actualJoin.getCondition, temporalCondition) + +val joinWithTemporalCondition = actualJoin.copy( + actualJoin.getTraitSet, + condition, + actualJoin.getLeft, + actualJoin.getRight, + actualJoin.getJoinType, + actualJoin.isSemiJoinDone) + +val temporalJoin = if (actualJoin != rewriteJoin) { + rewriteJoin.replaceInput(0, joinWithTemporalCondition) Review Comment: Are there test cases which cover adding the projection? Also, does this run into any issues if there are multiple pushdowns? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-24239] Event time temporal join should support values from array, map, row, etc. as join key [flink]
jnh5y commented on code in PR #24253: URL: https://github.com/apache/flink/pull/24253#discussion_r1481662098 ## flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/TemporalJoinTestPrograms.java: ## @@ -84,6 +130,36 @@ public class TemporalJoinTestPrograms { + "ON o.currency = r.currency ") .build(); +static final TableTestProgram TEMPORAL_JOIN_TABLE_JOIN_NESTED_KEY = +TableTestProgram.of( +"temporal-join-table-join-nested-key", +"validates temporal join with a table when the join keys comes from a nested row") +.setupTableSource(ORDERS_WITH_NESTED_ID) +.setupTableSource(RATES) +.setupTableSink(AMOUNTS) +.runSql( +"INSERT INTO MySink " ++ "SELECT amount * r.rate " ++ "FROM OrdersNestedId AS o " ++ "JOIN RatesHistory FOR SYSTEM_TIME AS OF o.rowtime AS r " ++ "ON o.nested_row.currency = r.currency ") +.build(); + +static final TableTestProgram TEMPORAL_JOIN_TABLE_JOIN_KEY_FROM_MAP = +TableTestProgram.of( +"temporal-join-table-join-key-from-map", +"validates temporal join with a table when the join key comes from a map value") +.setupTableSource(ORDERS_WITH_NESTED_ID) +.setupTableSource(RATES) +.setupTableSink(AMOUNTS) +.runSql( +"INSERT INTO MySink " ++ "SELECT amount * r.rate " ++ "FROM OrdersNestedId AS o " ++ "JOIN RatesHistory FOR SYSTEM_TIME AS OF o.rowtime AS r " ++ "ON o.nested_map['currency'] = r.currency ") Review Comment: Tests are added for covering what happens with the left-hand side of the join. Do we need to verify anything for the right-hand side and/or both sides? Also, can join on the output of a function call? (E.g., `ON UPPERCASE(o.nested_map['currency']) = r.currency`?) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-24239] Event time temporal join should support values from array, map, row, etc. as join key [flink]
flinkbot commented on PR #24253: URL: https://github.com/apache/flink/pull/24253#issuecomment-1924021657 ## CI report: * e06aee059e610ce72fb153eff7c8f466cde82bfa UNKNOWN Bot commands The @flinkbot bot supports the following commands: - `@flinkbot run azure` re-run the last Azure build -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[PR] [FLINK-24239] Event time temporal join should support values from array, map, row, etc. as join key [flink]
dawidwys opened a new pull request, #24253: URL: https://github.com/apache/flink/pull/24253 ## What is the purpose of the change This makes it possible to use more complex keys for temporal join. It tries pushing down join predicates before extracting join keys which makes more cases produce an equi-join condition required for a temporal join. ## Verifying this change Added tests in `TemporalJoinRestoreTest` covering keys from: * a nested row * value from a map ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): (yes / **no**) - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (yes / **no**) - The serializers: (yes / **no** / don't know) - The runtime per-record code paths (performance sensitive): (yes / **no** / don't know) - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: (yes / **no** / don't know) - The S3 file system connector: (yes / **no** / don't know) ## Documentation - Does this pull request introduce a new feature? (yes / **no**) - If yes, how is the feature documented? (**not applicable** / docs / JavaDocs / not documented) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org