Re: [PR] [FLINK-24239] Event time temporal join should support values from array, map, row, etc. as join key [flink]

2024-02-08 Thread via GitHub


dawidwys merged PR #24253:
URL: https://github.com/apache/flink/pull/24253


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-24239] Event time temporal join should support values from array, map, row, etc. as join key [flink]

2024-02-08 Thread via GitHub


snuyanzin commented on PR #24253:
URL: https://github.com/apache/flink/pull/24253#issuecomment-1934213444

   Yes, I also played with it locally; it seems to be working. Thanks.





Re: [PR] [FLINK-24239] Event time temporal join should support values from array, map, row, etc. as join key [flink]

2024-02-08 Thread via GitHub


dawidwys commented on code in PR #24253:
URL: https://github.com/apache/flink/pull/24253#discussion_r1482693914


##
flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/TemporalJoinTestPrograms.java:
##
@@ -84,6 +130,36 @@ public class TemporalJoinTestPrograms {
 + "ON o.currency = r.currency ")
 .build();
 
+static final TableTestProgram TEMPORAL_JOIN_TABLE_JOIN_NESTED_KEY =
+TableTestProgram.of(
+"temporal-join-table-join-nested-key",
+"validates temporal join with a table when the join keys comes from a nested row")
+.setupTableSource(ORDERS_WITH_NESTED_ID)
+.setupTableSource(RATES)
+.setupTableSink(AMOUNTS)
+.runSql(
+"INSERT INTO MySink "
++ "SELECT amount * r.rate "
++ "FROM OrdersNestedId AS o "
++ "JOIN RatesHistory FOR SYSTEM_TIME AS OF o.rowtime AS r "
++ "ON o.nested_row.currency = r.currency ")
+.build();
+
+static final TableTestProgram TEMPORAL_JOIN_TABLE_JOIN_KEY_FROM_MAP =
+TableTestProgram.of(
+"temporal-join-table-join-key-from-map",
+"validates temporal join with a table when the join key comes from a map value")
+.setupTableSource(ORDERS_WITH_NESTED_ID)
+.setupTableSource(RATES)
+.setupTableSink(AMOUNTS)
+.runSql(
+"INSERT INTO MySink "
++ "SELECT amount * r.rate "
++ "FROM OrdersNestedId AS o "
++ "JOIN RatesHistory FOR SYSTEM_TIME AS OF o.rowtime AS r "
++ "ON o.nested_map['currency'] = r.currency ")

Review Comment:
   I tried adding a case with a projection on the right side of the input.
   
   The change introduced here works fine. Unfortunately, the query then fails because a different condition is not met. A temporal table join requires the right side of the join (the versioned table) to have a `PRIMARY KEY`. A `PRIMARY KEY` can only be defined on a source table, and projections remove that constraint. That means the change improves the situation only for the left side of the join.
   
   I'll still add a case with a function on the left side. 






Re: [PR] [FLINK-24239] Event time temporal join should support values from array, map, row, etc. as join key [flink]

2024-02-08 Thread via GitHub


dawidwys commented on PR #24253:
URL: https://github.com/apache/flink/pull/24253#issuecomment-1933701764

   @snuyanzin There is a test for using an entry from a map:
   
   ```
   INSERT INTO MySink
   SELECT amount * r.rate
   FROM OrdersNestedId AS o
   JOIN RatesHistory FOR SYSTEM_TIME AS OF o.rowtime AS r 
   ON o.nested_map['currency'] = r.currency
   ```
   
   Additional levels of nesting don't change anything imo.
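   Here is a minimal sketch of why extra nesting should not matter. It assumes rows are modeled as `Map<String, Object>`; the class and method names are hypothetical, not Flink API. A key taken from a map is a function of the input row, and each extra level composes one more accessor. The join therefore still compares a single flat value.
   
   ```java
   import java.util.Map;
   import java.util.function.Function;
   
   // Hypothetical sketch: a key read from a nested structure is a function of the
   // input row. Extra nesting composes another accessor; the value the join
   // compares is still a single flat value.
   final class NestedKeys {
       // Reads one entry of a map-typed value, or null if it is absent or not a map.
       private static Object entry(Object value, String key) {
           return value instanceof Map<?, ?> map ? map.get(key) : null;
       }
   
       // Models o.nested_map['currency'] for a row represented as Map<String, Object>.
       static Function<Map<String, Object>, Object> mapEntry(String column, String key) {
           return row -> entry(row.get(column), key);
       }
   
       // Adds one more level of access, e.g. a['ID'] followed by ['ID_NESTED'].
       static Function<Map<String, Object>, Object> thenEntry(
               Function<Map<String, Object>, Object> outer, String key) {
           return outer.andThen(value -> entry(value, key));
       }
   
       public static void main(String[] args) {
           Map<String, Object> order = Map.of("nested_map", Map.of("currency", "EUR"));
           Map<String, Object> a = Map.of("a", Map.of("ID", Map.of("ID_NESTED", 42)));
           System.out.println(mapEntry("nested_map", "currency").apply(order)); // EUR
           System.out.println(thenEntry(mapEntry("a", "ID"), "ID_NESTED").apply(a)); // 42
       }
   }
   ```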





Re: [PR] [FLINK-24239] Event time temporal join should support values from array, map, row, etc. as join key [flink]

2024-02-08 Thread via GitHub


snuyanzin commented on PR #24253:
URL: https://github.com/apache/flink/pull/24253#issuecomment-1933613412

   Out of curiosity: I didn't find tests for nested arrays/maps/rows. Are they out of scope?
   
   I mean, e.g., this:
   ```sql
 CREATE TABLE A (
   a MAP>,
   ts TIMESTAMP(3),
   WATERMARK FOR ts AS ts
 ) WITH (
   'connector' = 'values'
 );
 
 CREATE TABLE B (
   id INT,
   ts TIMESTAMP(3),
   WATERMARK FOR ts AS ts
 ) WITH (
   'connector' = 'values'
 );
   ```
   and 
   ```sql
   SELECT * FROM A LEFT JOIN B FOR SYSTEM_TIME AS OF A.ts AS b ON A.a['ID']['ID_NESTED'] = id
   ```





Re: [PR] [FLINK-24239] Event time temporal join should support values from array, map, row, etc. as join key [flink]

2024-02-07 Thread via GitHub


dawidwys commented on code in PR #24253:
URL: https://github.com/apache/flink/pull/24253#discussion_r1481739305


##
flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/logical/LogicalCorrelateToJoinFromTemporalTableRule.scala:
##
@@ -323,15 +306,47 @@ abstract class LogicalCorrelateToJoinFromGeneralTemporalTableRule(
 }
 
 val builder = call.builder()
-val condition = builder.and(joinCondition, temporalCondition)
-
-builder.push(leftInput)
-builder.push(snapshot)
-builder.join(correlate.getJoinType, condition)
-val temporalJoin = builder.build()
+val condition = builder.and(actualJoin.getCondition, temporalCondition)
+
+val joinWithTemporalCondition = actualJoin.copy(
+  actualJoin.getTraitSet,
+  condition,
+  actualJoin.getLeft,
+  actualJoin.getRight,
+  actualJoin.getJoinType,
+  actualJoin.isSemiJoinDone)
+
+val temporalJoin = if (actualJoin != rewriteJoin) {
+  rewriteJoin.replaceInput(0, joinWithTemporalCondition)

Review Comment:
   I admit I had to check this again: we add the projection only if we pushed anything down. If we didn't, there is no extra projection. We have cases for that (basically all tests from before this PR).
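   Here is a rough sketch of the shape described above. It uses a hypothetical mini plan model instead of Calcite's `RelNode` classes, and all names are made up for illustration. When the left key is a plain column, nothing is pushed down and the join is returned as-is. When the key is an expression, it is computed as an extra column under the join, and a projection on top restores the original columns. In the rule, that top projection roughly corresponds to `rewriteJoin` and the inner join to `actualJoin`.
   
   ```java
   import java.util.ArrayList;
   import java.util.List;
   
   // Hypothetical mini plan model (not Calcite's RelNode API).
   final class PushDownShape {
       interface Plan {}
       record Scan(String table, List<String> columns) implements Plan {}
       record Project(Plan input, List<String> exprs) implements Plan {}
       record Join(Plan left, Plan right, String condition) implements Plan {}
   
       // Simplification: a bare identifier is a column reference; anything else is an expression.
       static boolean isColumnRef(String expr) {
           return expr.matches("[A-Za-z_][A-Za-z0-9_]*");
       }
   
       static Plan buildJoin(Scan left, Scan right, String leftKeyExpr, String rightKey) {
           if (isColumnRef(leftKeyExpr)) {
               // Nothing to push down: the join is the result, no extra projection.
               return new Join(left, right, leftKeyExpr + " = " + rightKey);
           }
           // Push the expression below the join as an extra column $k0 ...
           List<String> widened = new ArrayList<>(left.columns());
           widened.add(leftKeyExpr + " AS $k0");
           Join join = new Join(new Project(left, widened), right, "$k0 = " + rightKey);
           // ... and project it away on top so the output columns stay unchanged.
           List<String> output = new ArrayList<>(left.columns());
           output.addAll(right.columns());
           return new Project(join, output);
       }
   
       public static void main(String[] args) {
           Scan orders = new Scan("Orders", List.of("amount", "currency", "nested_map", "rowtime"));
           // Right-side columns are written with an "r." prefix for readability.
           Scan rates = new Scan("RatesHistory", List.of("r.currency", "r.rate"));
           System.out.println(buildJoin(orders, rates, "currency", "r.currency"));
           System.out.println(buildJoin(orders, rates, "nested_map['currency']", "r.currency"));
       }
   }
   ```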






Re: [PR] [FLINK-24239] Event time temporal join should support values from array, map, row, etc. as join key [flink]

2024-02-07 Thread via GitHub


jnh5y commented on code in PR #24253:
URL: https://github.com/apache/flink/pull/24253#discussion_r1481698481


##
flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/logical/LogicalCorrelateToJoinFromTemporalTableRule.scala:
##
@@ -323,15 +306,47 @@ abstract class LogicalCorrelateToJoinFromGeneralTemporalTableRule(
 }
 
 val builder = call.builder()
-val condition = builder.and(joinCondition, temporalCondition)
-
-builder.push(leftInput)
-builder.push(snapshot)
-builder.join(correlate.getJoinType, condition)
-val temporalJoin = builder.build()
+val condition = builder.and(actualJoin.getCondition, temporalCondition)
+
+val joinWithTemporalCondition = actualJoin.copy(
+  actualJoin.getTraitSet,
+  condition,
+  actualJoin.getLeft,
+  actualJoin.getRight,
+  actualJoin.getJoinType,
+  actualJoin.isSemiJoinDone)
+
+val temporalJoin = if (actualJoin != rewriteJoin) {
+  rewriteJoin.replaceInput(0, joinWithTemporalCondition)

Review Comment:
   Do we have test cases both for when `actualJoin` is equal to `rewriteJoin` and for when it is not?






Re: [PR] [FLINK-24239] Event time temporal join should support values from array, map, row, etc. as join key [flink]

2024-02-07 Thread via GitHub


dawidwys commented on code in PR #24253:
URL: https://github.com/apache/flink/pull/24253#discussion_r1481687197


##
flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/logical/LogicalCorrelateToJoinFromTemporalTableRule.scala:
##
@@ -323,15 +306,47 @@ abstract class LogicalCorrelateToJoinFromGeneralTemporalTableRule(
 }
 
 val builder = call.builder()
-val condition = builder.and(joinCondition, temporalCondition)
-
-builder.push(leftInput)
-builder.push(snapshot)
-builder.join(correlate.getJoinType, condition)
-val temporalJoin = builder.build()
+val condition = builder.and(actualJoin.getCondition, temporalCondition)
+
+val joinWithTemporalCondition = actualJoin.copy(
+  actualJoin.getTraitSet,
+  condition,
+  actualJoin.getLeft,
+  actualJoin.getRight,
+  actualJoin.getJoinType,
+  actualJoin.isSemiJoinDone)
+
+val temporalJoin = if (actualJoin != rewriteJoin) {
+  rewriteJoin.replaceInput(0, joinWithTemporalCondition)

Review Comment:
   What test cases do you have in mind? As far as I can tell, we don't have a pushdown through a correlate.






Re: [PR] [FLINK-24239] Event time temporal join should support values from array, map, row, etc. as join key [flink]

2024-02-07 Thread via GitHub


dawidwys commented on code in PR #24253:
URL: https://github.com/apache/flink/pull/24253#discussion_r1481689031


##
flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/TemporalJoinTestPrograms.java:
##
@@ -84,6 +130,36 @@ public class TemporalJoinTestPrograms {
 + "ON o.currency = r.currency ")
 .build();
 
+static final TableTestProgram TEMPORAL_JOIN_TABLE_JOIN_NESTED_KEY =
+TableTestProgram.of(
+"temporal-join-table-join-nested-key",
+"validates temporal join with a table when the join keys comes from a nested row")
+.setupTableSource(ORDERS_WITH_NESTED_ID)
+.setupTableSource(RATES)
+.setupTableSink(AMOUNTS)
+.runSql(
+"INSERT INTO MySink "
++ "SELECT amount * r.rate "
++ "FROM OrdersNestedId AS o "
++ "JOIN RatesHistory FOR SYSTEM_TIME AS OF o.rowtime AS r "
++ "ON o.nested_row.currency = r.currency ")
+.build();
+
+static final TableTestProgram TEMPORAL_JOIN_TABLE_JOIN_KEY_FROM_MAP =
+TableTestProgram.of(
+"temporal-join-table-join-key-from-map",
+"validates temporal join with a table when the join key comes from a map value")
+.setupTableSource(ORDERS_WITH_NESTED_ID)
+.setupTableSource(RATES)
+.setupTableSink(AMOUNTS)
+.runSql(
+"INSERT INTO MySink "
++ "SELECT amount * r.rate "
++ "FROM OrdersNestedId AS o "
++ "JOIN RatesHistory FOR SYSTEM_TIME AS OF o.rowtime AS r "
++ "ON o.nested_map['currency'] = r.currency ")

Review Comment:
   I'll add those cases.






Re: [PR] [FLINK-24239] Event time temporal join should support values from array, map, row, etc. as join key [flink]

2024-02-07 Thread via GitHub


dawidwys commented on code in PR #24253:
URL: https://github.com/apache/flink/pull/24253#discussion_r1481688174


##
flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/logical/LogicalCorrelateToJoinFromTemporalTableRule.scala:
##
@@ -237,57 +245,32 @@ abstract class LogicalCorrelateToJoinFromGeneralTemporalTableRule(
 val snapshot = getLogicalSnapshot(call)
 
 val leftRowType = leftInput.getRowType
-val joinCondition = filterCondition.accept(new RexShuttle() {
-  // change correlate variable expression to normal RexInputRef (which is from left side)
-  override def visitFieldAccess(fieldAccess: RexFieldAccess): RexNode = {
-fieldAccess.getReferenceExpr match {
-  case corVar: RexCorrelVariable =>
-require(correlate.getCorrelationId.equals(corVar.id))
-val index = leftRowType.getFieldList.indexOf(fieldAccess.getField)
-RexInputRef.of(index, leftRowType)
-  case _ => super.visitFieldAccess(fieldAccess)
-}
-  }
-
-  // update the field index from right side
-  override def visitInputRef(inputRef: RexInputRef): RexNode = {
-val rightIndex = leftRowType.getFieldCount + inputRef.getIndex
-new RexInputRef(rightIndex, inputRef.getType)
-  }
-})
+val joinCondition = decorrelate(filterCondition, leftRowType, correlate.getCorrelationId)
 
 validateSnapshotInCorrelate(snapshot, correlate)
 
 val rexBuilder = correlate.getCluster.getRexBuilder
-val (leftJoinKey, rightJoinKey) = {
-  val relBuilder = call.builder()
-  relBuilder.push(leftInput)
-  relBuilder.push(snapshot)
-  val rewriteJoin = relBuilder.join(correlate.getJoinType, joinCondition).build()
-  val joinInfo = rewriteJoin.asInstanceOf[LogicalJoin].analyzeCondition()
-  val leftJoinKey = joinInfo.leftKeys.map(i => rexBuilder.makeInputRef(leftInput, i))
-  val leftFieldCnt = leftInput.getRowType.getFieldCount
-  val rightJoinKey = joinInfo.rightKeys.map(
-i => {
-  val leftKeyType = snapshot.getRowType.getFieldList.get(i).getType
-  rexBuilder.makeInputRef(leftKeyType, leftFieldCnt + i)
-})
-  if (leftJoinKey.length == 0 || rightJoinKey.length == 0) {
-throw new ValidationException(
-  "Currently the join key in Temporal Table Join " +
-"can not be empty.")
-  }
-  (leftJoinKey, rightJoinKey)
+val relBuilder = call.builder()
+relBuilder.push(leftInput)
+relBuilder.push(snapshot)
+val nonPushedJoin =
+  relBuilder.join(correlate.getJoinType, joinCondition).build().asInstanceOf[LogicalJoin]
+val rewriteJoin = RelOptUtil.pushDownJoinConditions(nonPushedJoin, relBuilder)

Review Comment:
   If they cannot be pushed down, they won't be. If that means we don't end up with an equi-join condition, this will fail as before.
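   Here is a minimal sketch of a check along those lines. It assumes the condition has already been split into AND-ed conjuncts. The types are hypothetical, and `IllegalStateException` stands in for Flink's `ValidationException`; the message is copied from the removed lines in the diff above. Only conjuncts of the form `<left column> = <right column>` yield join keys. If none remain after the pushdown, the query is rejected.
   
   ```java
   import java.util.ArrayList;
   import java.util.List;
   
   // Hypothetical model of a join condition split into AND-ed conjuncts.
   final class EquiKeys {
       interface Conjunct {}
       // <left field> = <right field>, both plain input references.
       record ColumnEquals(int leftField, int rightField) implements Conjunct {}
       // Anything else, e.g. a non-equi comparison or an expression left in the condition.
       record Other(String text) implements Conjunct {}
   
       // Returns {leftField, rightField} pairs; rejects conditions without any equi key.
       static List<int[]> extractKeys(List<Conjunct> conjuncts) {
           List<int[]> keys = new ArrayList<>();
           for (Conjunct conjunct : conjuncts) {
               if (conjunct instanceof ColumnEquals eq) {
                   keys.add(new int[] {eq.leftField(), eq.rightField()});
               }
           }
           if (keys.isEmpty()) {
               throw new IllegalStateException(
                       "Currently the join key in Temporal Table Join can not be empty.");
           }
           return keys;
       }
   
       public static void main(String[] args) {
           // E.g. after pushdown, a map-value key became a comparison of left field 4
           // with right field 0; the extra non-equi conjunct yields no key.
           List<int[]> keys = extractKeys(List.of(new ColumnEquals(4, 0), new Other("amount > 10")));
           System.out.println(keys.size()); // 1
       }
   }
   ```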






Re: [PR] [FLINK-24239] Event time temporal join should support values from array, map, row, etc. as join key [flink]

2024-02-07 Thread via GitHub


jnh5y commented on code in PR #24253:
URL: https://github.com/apache/flink/pull/24253#discussion_r1481681792


##
flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/logical/LogicalCorrelateToJoinFromTemporalTableRule.scala:
##
@@ -237,57 +245,32 @@ abstract class LogicalCorrelateToJoinFromGeneralTemporalTableRule(
 val snapshot = getLogicalSnapshot(call)
 
 val leftRowType = leftInput.getRowType
-val joinCondition = filterCondition.accept(new RexShuttle() {
-  // change correlate variable expression to normal RexInputRef (which is from left side)
-  override def visitFieldAccess(fieldAccess: RexFieldAccess): RexNode = {
-fieldAccess.getReferenceExpr match {
-  case corVar: RexCorrelVariable =>
-require(correlate.getCorrelationId.equals(corVar.id))
-val index = leftRowType.getFieldList.indexOf(fieldAccess.getField)
-RexInputRef.of(index, leftRowType)
-  case _ => super.visitFieldAccess(fieldAccess)
-}
-  }
-
-  // update the field index from right side
-  override def visitInputRef(inputRef: RexInputRef): RexNode = {
-val rightIndex = leftRowType.getFieldCount + inputRef.getIndex
-new RexInputRef(rightIndex, inputRef.getType)
-  }
-})
+val joinCondition = decorrelate(filterCondition, leftRowType, correlate.getCorrelationId)
 
 validateSnapshotInCorrelate(snapshot, correlate)
 
 val rexBuilder = correlate.getCluster.getRexBuilder
-val (leftJoinKey, rightJoinKey) = {
-  val relBuilder = call.builder()
-  relBuilder.push(leftInput)
-  relBuilder.push(snapshot)
-  val rewriteJoin = relBuilder.join(correlate.getJoinType, joinCondition).build()
-  val joinInfo = rewriteJoin.asInstanceOf[LogicalJoin].analyzeCondition()
-  val leftJoinKey = joinInfo.leftKeys.map(i => rexBuilder.makeInputRef(leftInput, i))
-  val leftFieldCnt = leftInput.getRowType.getFieldCount
-  val rightJoinKey = joinInfo.rightKeys.map(
-i => {
-  val leftKeyType = snapshot.getRowType.getFieldList.get(i).getType
-  rexBuilder.makeInputRef(leftKeyType, leftFieldCnt + i)
-})
-  if (leftJoinKey.length == 0 || rightJoinKey.length == 0) {
-throw new ValidationException(
-  "Currently the join key in Temporal Table Join " +
-"can not be empty.")
-  }
-  (leftJoinKey, rightJoinKey)
+val relBuilder = call.builder()
+relBuilder.push(leftInput)
+relBuilder.push(snapshot)
+val nonPushedJoin =
+  relBuilder.join(correlate.getJoinType, joinCondition).build().asInstanceOf[LogicalJoin]
+val rewriteJoin = RelOptUtil.pushDownJoinConditions(nonPushedJoin, relBuilder)

Review Comment:
   In what situations can the join conditions not be pushed down?
   
   Do we need some validation for those situations?






Re: [PR] [FLINK-24239] Event time temporal join should support values from array, map, row, etc. as join key [flink]

2024-02-07 Thread via GitHub


jnh5y commented on code in PR #24253:
URL: https://github.com/apache/flink/pull/24253#discussion_r1481679233


##
flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/logical/LogicalCorrelateToJoinFromTemporalTableRule.scala:
##
@@ -323,15 +306,47 @@ abstract class LogicalCorrelateToJoinFromGeneralTemporalTableRule(
 }
 
 val builder = call.builder()
-val condition = builder.and(joinCondition, temporalCondition)
-
-builder.push(leftInput)
-builder.push(snapshot)
-builder.join(correlate.getJoinType, condition)
-val temporalJoin = builder.build()
+val condition = builder.and(actualJoin.getCondition, temporalCondition)
+
+val joinWithTemporalCondition = actualJoin.copy(
+  actualJoin.getTraitSet,
+  condition,
+  actualJoin.getLeft,
+  actualJoin.getRight,
+  actualJoin.getJoinType,
+  actualJoin.isSemiJoinDone)
+
+val temporalJoin = if (actualJoin != rewriteJoin) {
+  rewriteJoin.replaceInput(0, joinWithTemporalCondition)

Review Comment:
   Are there test cases which cover adding the projection?  
   
   Also, does this run into any issues if there are multiple pushdowns?






Re: [PR] [FLINK-24239] Event time temporal join should support values from array, map, row, etc. as join key [flink]

2024-02-07 Thread via GitHub


jnh5y commented on code in PR #24253:
URL: https://github.com/apache/flink/pull/24253#discussion_r1481662098


##
flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/TemporalJoinTestPrograms.java:
##
@@ -84,6 +130,36 @@ public class TemporalJoinTestPrograms {
 + "ON o.currency = r.currency ")
 .build();
 
+static final TableTestProgram TEMPORAL_JOIN_TABLE_JOIN_NESTED_KEY =
+TableTestProgram.of(
+"temporal-join-table-join-nested-key",
+"validates temporal join with a table when the join keys comes from a nested row")
+.setupTableSource(ORDERS_WITH_NESTED_ID)
+.setupTableSource(RATES)
+.setupTableSink(AMOUNTS)
+.runSql(
+"INSERT INTO MySink "
++ "SELECT amount * r.rate "
++ "FROM OrdersNestedId AS o "
++ "JOIN RatesHistory FOR SYSTEM_TIME AS OF o.rowtime AS r "
++ "ON o.nested_row.currency = r.currency ")
+.build();
+
+static final TableTestProgram TEMPORAL_JOIN_TABLE_JOIN_KEY_FROM_MAP =
+TableTestProgram.of(
+"temporal-join-table-join-key-from-map",
+"validates temporal join with a table when the join key comes from a map value")
+.setupTableSource(ORDERS_WITH_NESTED_ID)
+.setupTableSource(RATES)
+.setupTableSink(AMOUNTS)
+.runSql(
+"INSERT INTO MySink "
++ "SELECT amount * r.rate "
++ "FROM OrdersNestedId AS o "
++ "JOIN RatesHistory FOR SYSTEM_TIME AS OF o.rowtime AS r "
++ "ON o.nested_map['currency'] = r.currency ")

Review Comment:
   The added tests cover what happens on the left-hand side of the join.
   
   Do we need to verify anything for the right-hand side and/or both sides?
   
   Also, can we join on the output of a function call? (E.g., `ON UPPER(o.nested_map['currency']) = r.currency`?)






Re: [PR] [FLINK-24239] Event time temporal join should support values from array, map, row, etc. as join key [flink]

2024-02-02 Thread via GitHub


flinkbot commented on PR #24253:
URL: https://github.com/apache/flink/pull/24253#issuecomment-1924021657

   
   ## CI report:
   
   * e06aee059e610ce72fb153eff7c8f466cde82bfa UNKNOWN
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run azure` re-run the last Azure build
   





[PR] [FLINK-24239] Event time temporal join should support values from array, map, row, etc. as join key [flink]

2024-02-02 Thread via GitHub


dawidwys opened a new pull request, #24253:
URL: https://github.com/apache/flink/pull/24253

   ## What is the purpose of the change
   
   This makes it possible to use more complex keys in a temporal join. Before extracting the join keys, it tries to push down the join predicates. As a result, more queries produce the equi-join condition that a temporal join requires.
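   Here is a simplified, in-memory sketch of the idea. It is not the planner code: the `Order` record, the `join` method and the data layout are assumptions for illustration. The key expression (e.g. `o.nested_map['currency']`) is evaluated first, which is what pushing it into a projection below the join achieves. The temporal join then only matches a plain key value against the versioned table's key, picking the version valid at the order's `rowtime`.
   
   ```java
   import java.util.ArrayList;
   import java.util.HashMap;
   import java.util.List;
   import java.util.Map;
   import java.util.TreeMap;
   import java.util.function.Function;
   
   final class TemporalJoinSketch {
       record Order(double amount, Map<String, String> nestedMap, long rowtime) {}
   
       // rates: primary key -> (version time -> rate), i.e. a toy RatesHistory.
       static List<Double> join(
               List<Order> orders,
               Function<Order, String> keyExpr,
               Map<String, TreeMap<Long, Double>> rates) {
           List<Double> amounts = new ArrayList<>();
           for (Order order : orders) {
               // "Pushed-down projection": evaluate the key expression per left row.
               String key = keyExpr.apply(order);
               TreeMap<Long, Double> history = key == null ? null : rates.get(key);
               // Event-time lookup: latest version at or before the order's rowtime.
               Map.Entry<Long, Double> version =
                       history == null ? null : history.floorEntry(order.rowtime());
               // Inner join semantics: orders without a matching version are dropped.
               if (version != null) {
                   amounts.add(order.amount() * version.getValue());
               }
           }
           return amounts;
       }
   
       public static void main(String[] args) {
           TreeMap<Long, Double> eur = new TreeMap<>();
           eur.put(1L, 2.0);
           eur.put(5L, 3.0);
           Map<String, TreeMap<Long, Double>> rates = new HashMap<>();
           rates.put("EUR", eur);
           List<Order> orders = List.of(
                   new Order(10.0, Map.of("currency", "EUR"), 3L),
                   new Order(10.0, Map.of("currency", "EUR"), 6L),
                   new Order(10.0, Map.of("currency", "USD"), 6L));
           // Key taken from a map value, as in o.nested_map['currency'] = r.currency.
           System.out.println(join(orders, o -> o.nestedMap().get("currency"), rates)); // [20.0, 30.0]
       }
   }
   ```
   
   The key function is only an illustration. Any deterministic expression over the left row, such as a function call, could be plugged in the same way.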
   
   
   ## Verifying this change
   
   Added tests in `TemporalJoinRestoreTest` covering keys from:
   * a nested row
   * a value from a map
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): (yes / **no**)
 - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (yes / **no**)
 - The serializers: (yes / **no** / don't know)
 - The runtime per-record code paths (performance sensitive): (yes / **no** / don't know)
 - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: (yes / **no** / don't know)
 - The S3 file system connector: (yes / **no** / don't know)
   
   ## Documentation
   
 - Does this pull request introduce a new feature? (yes / **no**)
 - If yes, how is the feature documented? (**not applicable** / docs / JavaDocs / not documented)
   

