This is an automated email from the ASF dual-hosted git repository. godfrey pushed a commit to branch release-1.13 in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/release-1.13 by this push: new a321f65 [hotfix][table-planner-blink] Fix bug for window join: plan is wrong if join condition contains 'IS NOT DISTINCT FROM' a321f65 is described below commit a321f6550c24a10b598122e7e65d4b2d96f9db81 Author: Jing Zhang <beyond1...@126.com> AuthorDate: Wed Apr 21 12:31:20 2021 +0800 [hotfix][table-planner-blink] Fix bug for window join: plan is wrong if join condition contains 'IS NOT DISTINCT FROM' Fix Flink-22098 caused by a mistake when rebasing This closes #15695 (cherry picked from commit a0124007d6ed33988a4def18809c18baced0f45b) --- .../table/planner/plan/utils/WindowJoinUtil.scala | 44 ++++++++++++---------- .../plan/stream/sql/join/WindowJoinTest.xml | 2 +- 2 files changed, 26 insertions(+), 20 deletions(-) diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/utils/WindowJoinUtil.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/utils/WindowJoinUtil.scala index b9453f5..509fb27 100644 --- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/utils/WindowJoinUtil.scala +++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/utils/WindowJoinUtil.scala @@ -86,43 +86,49 @@ object WindowJoinUtil { windowEndEqualityRightKeys) = excludeWindowStartEqualityAndEndEqualityFromJoinInfoPairs(join) - val joinInfo = join.analyzeCondition() + val joinSpec = JoinUtil.createJoinSpec(join) val (remainLeftKeys, remainRightKeys, remainCondition) = if ( windowStartEqualityLeftKeys.nonEmpty || windowEndEqualityLeftKeys.nonEmpty) { val leftChildFieldsType = join.getLeft.getRowType.getFieldList val rightChildFieldsType = join.getRight.getRowType.getFieldList val leftFieldCnt = join.getLeft.getRowType.getFieldCount val rexBuilder = join.getCluster.getRexBuilder - val remainEquals = mutable.ArrayBuffer[RexNode]() + val remainingConditions = mutable.ArrayBuffer[RexNode]() val remainLeftKeysArray = mutable.ArrayBuffer[Int]() val remainRightKeysArray = mutable.ArrayBuffer[Int]() // convert remain pairs to RexInputRef tuple for building SqlStdOperatorTable.EQUALS calls // or SqlStdOperatorTable.IS_NOT_DISTINCT_FROM - joinInfo.pairs().foreach { p => - if (!windowStartEqualityLeftKeys.contains(p.source) && - !windowEndEqualityLeftKeys.contains(p.source)) { - val leftFieldType = leftChildFieldsType.get(p.source).getType - val leftInputRef = new RexInputRef(p.source, leftFieldType) - val rightFieldType = rightChildFieldsType.get(p.target).getType - val rightIndex = leftFieldCnt + p.target + joinSpec.getLeftKeys.zip(joinSpec.getRightKeys). + zip(joinSpec.getFilterNulls).foreach { case ((source, target), filterNull) => + if (!windowStartEqualityLeftKeys.contains(source) && + !windowEndEqualityLeftKeys.contains(source)) { + val leftFieldType = leftChildFieldsType.get(source).getType + val leftInputRef = new RexInputRef(source, leftFieldType) + val rightFieldType = rightChildFieldsType.get(target).getType + val rightIndex = leftFieldCnt + target val rightInputRef = new RexInputRef(rightIndex, rightFieldType) - val remainEqual = rexBuilder.makeCall( - SqlStdOperatorTable.EQUALS, - leftInputRef, - rightInputRef) - remainEquals.add(remainEqual) - remainLeftKeysArray.add(p.source) - remainRightKeysArray.add(p.target) + val op = if (filterNull) { + SqlStdOperatorTable.EQUALS + } else { + SqlStdOperatorTable.IS_NOT_DISTINCT_FROM + } + val remainEqual = rexBuilder.makeCall(op, leftInputRef, rightInputRef) + remainingConditions += remainEqual + remainLeftKeysArray += source + remainRightKeysArray += target } } - val remainAnds = remainEquals ++ joinInfo.nonEquiConditions + val notEquiCondition = joinSpec.getNonEquiCondition + if (notEquiCondition.isPresent) { + remainingConditions += notEquiCondition.get() + } ( remainLeftKeysArray.toArray, remainRightKeysArray.toArray, // build a new condition - RexUtil.composeConjunction(rexBuilder, remainAnds.toList)) + RexUtil.composeConjunction(rexBuilder, remainingConditions.toList)) } else { - (joinInfo.leftKeys.toIntArray, joinInfo.rightKeys.toIntArray, join.getCondition) + (joinSpec.getLeftKeys, joinSpec.getRightKeys, join.getCondition) } ( diff --git a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/join/WindowJoinTest.xml b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/join/WindowJoinTest.xml index 0210a61..e083914 100644 --- a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/join/WindowJoinTest.xml +++ b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/join/WindowJoinTest.xml @@ -537,7 +537,7 @@ LogicalProject(a=[$0], window_start=[$1], window_end=[$2], window_time=[$3], cnt </Resource> <Resource name="optimized rel plan"> <![CDATA[ -WindowJoin(leftWindow=[TUMBLE(win_start=[window_start], win_end=[window_end], size=[15 min])], rightWindow=[TUMBLE(win_start=[window_start], win_end=[window_end], size=[15 min])], joinType=[InnerJoin], where=[=(a, a0)], select=[a, window_start, window_end, window_time, cnt, uv, a0, window_start0, window_end0, window_time0, cnt0, uv0]) +WindowJoin(leftWindow=[TUMBLE(win_start=[window_start], win_end=[window_end], size=[15 min])], rightWindow=[TUMBLE(win_start=[window_start], win_end=[window_end], size=[15 min])], joinType=[InnerJoin], where=[IS NOT DISTINCT FROM(a, a0)], select=[a, window_start, window_end, window_time, cnt, uv, a0, window_start0, window_end0, window_time0, cnt0, uv0]) :- Exchange(distribution=[hash[a]]) : +- Calc(select=[a, window_start, window_end, window_time, cnt, uv]) : +- GlobalWindowAggregate(groupBy=[a], window=[TUMBLE(slice_end=[$slice_end], size=[15 min])], select=[a, COUNT(count1$0) AS cnt, COUNT(distinct$0 count$1) AS uv, start('w$) AS window_start, end('w$) AS window_end, rowtime('w$) AS window_time])