This is an automated email from the ASF dual-hosted git repository.

godfrey pushed a commit to branch release-1.13
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.13 by this push:
     new a321f65  [hotfix][table-planner-blink] Fix bug for window join: plan is wrong if join condition contains 'IS NOT DISTINCT FROM'
a321f65 is described below

commit a321f6550c24a10b598122e7e65d4b2d96f9db81
Author: Jing Zhang <beyond1...@126.com>
AuthorDate: Wed Apr 21 12:31:20 2021 +0800

    [hotfix][table-planner-blink] Fix bug for window join: plan is wrong if join condition contains 'IS NOT DISTINCT FROM'
    
    Fix a bug in FLINK-22098 caused by a mistake when rebasing
    
    This closes #15695
    
    (cherry picked from commit a0124007d6ed33988a4def18809c18baced0f45b)
---
 .../table/planner/plan/utils/WindowJoinUtil.scala  | 44 ++++++++++++----------
 .../plan/stream/sql/join/WindowJoinTest.xml        |  2 +-
 2 files changed, 26 insertions(+), 20 deletions(-)

diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/utils/WindowJoinUtil.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/utils/WindowJoinUtil.scala
index b9453f5..509fb27 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/utils/WindowJoinUtil.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/utils/WindowJoinUtil.scala
@@ -86,43 +86,49 @@ object WindowJoinUtil {
       windowEndEqualityRightKeys) =
       excludeWindowStartEqualityAndEndEqualityFromJoinInfoPairs(join)
 
-    val joinInfo = join.analyzeCondition()
+    val joinSpec = JoinUtil.createJoinSpec(join)
     val (remainLeftKeys, remainRightKeys, remainCondition) = if (
       windowStartEqualityLeftKeys.nonEmpty || 
windowEndEqualityLeftKeys.nonEmpty) {
       val leftChildFieldsType = join.getLeft.getRowType.getFieldList
       val rightChildFieldsType = join.getRight.getRowType.getFieldList
       val leftFieldCnt = join.getLeft.getRowType.getFieldCount
       val rexBuilder = join.getCluster.getRexBuilder
-      val remainEquals = mutable.ArrayBuffer[RexNode]()
+      val remainingConditions = mutable.ArrayBuffer[RexNode]()
       val remainLeftKeysArray = mutable.ArrayBuffer[Int]()
       val remainRightKeysArray = mutable.ArrayBuffer[Int]()
       // convert remain pairs to RexInputRef tuple for building 
SqlStdOperatorTable.EQUALS calls
       // or SqlStdOperatorTable.IS_NOT_DISTINCT_FROM
-      joinInfo.pairs().foreach { p =>
-        if (!windowStartEqualityLeftKeys.contains(p.source) &&
-          !windowEndEqualityLeftKeys.contains(p.source)) {
-          val leftFieldType = leftChildFieldsType.get(p.source).getType
-          val leftInputRef = new RexInputRef(p.source, leftFieldType)
-          val rightFieldType = rightChildFieldsType.get(p.target).getType
-          val rightIndex = leftFieldCnt + p.target
+      joinSpec.getLeftKeys.zip(joinSpec.getRightKeys).
+        zip(joinSpec.getFilterNulls).foreach { case ((source, target), 
filterNull) =>
+        if (!windowStartEqualityLeftKeys.contains(source) &&
+          !windowEndEqualityLeftKeys.contains(source)) {
+          val leftFieldType = leftChildFieldsType.get(source).getType
+          val leftInputRef = new RexInputRef(source, leftFieldType)
+          val rightFieldType = rightChildFieldsType.get(target).getType
+          val rightIndex = leftFieldCnt + target
           val rightInputRef = new RexInputRef(rightIndex, rightFieldType)
-          val remainEqual = rexBuilder.makeCall(
-            SqlStdOperatorTable.EQUALS,
-            leftInputRef,
-            rightInputRef)
-          remainEquals.add(remainEqual)
-          remainLeftKeysArray.add(p.source)
-          remainRightKeysArray.add(p.target)
+          val op = if (filterNull) {
+            SqlStdOperatorTable.EQUALS
+          } else {
+            SqlStdOperatorTable.IS_NOT_DISTINCT_FROM
+          }
+          val remainEqual = rexBuilder.makeCall(op, leftInputRef, 
rightInputRef)
+          remainingConditions += remainEqual
+          remainLeftKeysArray += source
+          remainRightKeysArray += target
         }
       }
-      val remainAnds = remainEquals ++ joinInfo.nonEquiConditions
+      val notEquiCondition = joinSpec.getNonEquiCondition
+      if (notEquiCondition.isPresent) {
+        remainingConditions += notEquiCondition.get()
+      }
       (
         remainLeftKeysArray.toArray,
         remainRightKeysArray.toArray,
         // build a new condition
-        RexUtil.composeConjunction(rexBuilder, remainAnds.toList))
+        RexUtil.composeConjunction(rexBuilder, remainingConditions.toList))
     } else {
-      (joinInfo.leftKeys.toIntArray, joinInfo.rightKeys.toIntArray, 
join.getCondition)
+      (joinSpec.getLeftKeys, joinSpec.getRightKeys, join.getCondition)
     }
 
     (
diff --git a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/join/WindowJoinTest.xml b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/join/WindowJoinTest.xml
index 0210a61..e083914 100644
--- a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/join/WindowJoinTest.xml
+++ b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/join/WindowJoinTest.xml
@@ -537,7 +537,7 @@ LogicalProject(a=[$0], window_start=[$1], window_end=[$2], 
window_time=[$3], cnt
     </Resource>
     <Resource name="optimized rel plan">
       <![CDATA[
-WindowJoin(leftWindow=[TUMBLE(win_start=[window_start], win_end=[window_end], 
size=[15 min])], rightWindow=[TUMBLE(win_start=[window_start], 
win_end=[window_end], size=[15 min])], joinType=[InnerJoin], where=[=(a, a0)], 
select=[a, window_start, window_end, window_time, cnt, uv, a0, window_start0, 
window_end0, window_time0, cnt0, uv0])
+WindowJoin(leftWindow=[TUMBLE(win_start=[window_start], win_end=[window_end], 
size=[15 min])], rightWindow=[TUMBLE(win_start=[window_start], 
win_end=[window_end], size=[15 min])], joinType=[InnerJoin], where=[IS NOT 
DISTINCT FROM(a, a0)], select=[a, window_start, window_end, window_time, cnt, 
uv, a0, window_start0, window_end0, window_time0, cnt0, uv0])
 :- Exchange(distribution=[hash[a]])
 :  +- Calc(select=[a, window_start, window_end, window_time, cnt, uv])
 :     +- GlobalWindowAggregate(groupBy=[a], 
window=[TUMBLE(slice_end=[$slice_end], size=[15 min])], select=[a, 
COUNT(count1$0) AS cnt, COUNT(distinct$0 count$1) AS uv, start('w$) AS 
window_start, end('w$) AS window_end, rowtime('w$) AS window_time])

Reply via email to