beyond1920 commented on a change in pull request #16620: URL: https://github.com/apache/flink/pull/16620#discussion_r678085882
########## File path: flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/logical/LogicalWindowAggregateRuleBase.scala ########## @@ -327,9 +328,17 @@ abstract class LogicalWindowAggregateRuleBase(description: String) windowExpression: RexCall): RexNode /** Returns the expression that replaces the window expression after the aggregation. */ - private[table] def getOutAggregateGroupExpression( + private def getOutAggregateGroupExpression( rexBuilder: RexBuilder, - windowExpression: RexCall): RexNode + windowExpression: RexCall): RexNode = { + val zeroLiteral = rexBuilder.makeZeroLiteral(windowExpression.getType) + if (isTimeIndicatorType(windowExpression.getType)) { + // cast zero literal to time indicator field Review comment: In the previous version, this would not happen because the rowtime indicator in group key would be materialized to regular timestamp. But after we move time indicator materialize after logical_rewrite, the rule need to encounter the window expression in group key. It's safe to simply cast the literal to time indicator type, because the window expression column in group key would be projected out in the successor Project node. ########## File path: flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/join/IntervalJoinTest.xml ########## @@ -255,7 +255,7 @@ LogicalProject(a=[$0], b=[$6]) <Resource name="optimized exec plan"> <![CDATA[ Calc(select=[a, b]) -+- IntervalJoin(joinType=[InnerJoin], windowBounds=[isRowTime=false, leftLowerBound=-3600000, leftUpperBound=3600000, leftTimeIndex=1, rightTimeIndex=2], where=[((a = a0) AND (PROCTIME_MATERIALIZE(proctime) >= (PROCTIME_MATERIALIZE(proctime0) - 3600000:INTERVAL HOUR)) AND (PROCTIME_MATERIALIZE(proctime) <= (PROCTIME_MATERIALIZE(proctime0) + 3600000:INTERVAL HOUR)))], select=[a, proctime, a0, b, proctime0]) ++- IntervalJoin(joinType=[InnerJoin], windowBounds=[isRowTime=false, leftLowerBound=-3600000, leftUpperBound=3600000, leftTimeIndex=1, rightTimeIndex=2], where=[((a = a0) AND (proctime >= (proctime0 - 3600000:INTERVAL HOUR)) AND (proctime <= (proctime0 + 3600000:INTERVAL HOUR)))], select=[a, proctime, a0, b, proctime0]) Review comment: The xml is changed because after predicate_pushdown phase, the filter would be pushed down into join condition, while join condition conversion logical in RelTimeIndicatorConverter is different with filter condition conversion in the previous version. ########## File path: flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/table/JoinTest.xml ########## @@ -158,12 +158,12 @@ LogicalProject(a=[$0], e=[$5], lptime=[$3]) <Resource name="optimized exec plan"> <![CDATA[ Calc(select=[a, e, PROCTIME_MATERIALIZE(lptime) AS lptime]) -+- Join(joinType=[InnerJoin], where=[((a = d) AND ($f4 = $f40))], select=[a, lptime, $f4, d, e, $f40], leftInputSpec=[NoUniqueKey], rightInputSpec=[NoUniqueKey]) - :- Exchange(distribution=[hash[a, $f4]]) - : +- Calc(select=[a, lptime, PROCTIME_MATERIALIZE(lptime) AS $f4]) ++- IntervalJoin(joinType=[InnerJoin], windowBounds=[isRowTime=false, leftLowerBound=0, leftUpperBound=0, leftTimeIndex=1, rightTimeIndex=2], where=[((a = d) AND (lptime = rptime))], select=[a, lptime, d, e, rptime]) Review comment: The xml is changed because after refactor time indicator materialization, the two case would be translated to IntervalJoin as expected. ########## File path: flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/join/IntervalJoinTest.scala ########## @@ -230,7 +230,6 @@ class IntervalJoinTest extends TableTestBase { @Test def testJoinWithEquiProcTime(): Unit = { - // TODO: this should be translated into window join Review comment: done ########## File path: flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/DeduplicateTest.xml ########## @@ -207,7 +207,7 @@ LogicalProject(a=[$0], b=[$1], c=[$2], proctime=[$3], rowtime=[$4], rowNum=[$5]) </Resource> <Resource name="optimized exec plan"> <![CDATA[ -Calc(select=[a, b, c, PROCTIME_MATERIALIZE(proctime) AS proctime, rowtime, 1:BIGINT AS rowNum]) +Calc(select=[a, b, c, PROCTIME_MATERIALIZE(proctime) AS proctime, rowtime, 1:BIGINT AS $5]) Review comment: The xml is changed because after apply `ProjectToWindowRule`, the new generated LogicalWindow contains a field which name is w0$o0 ########## File path: flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/agg/IncrementalAggregateTest.xml ########## @@ -76,7 +76,7 @@ GroupAggregate(groupBy=[b], partialFinalType=[FINAL], select=[b, FIRST_VALUE_RET +- Exchange(distribution=[hash[b]], changelogMode=[I,UB,UA,D]) +- GroupAggregate(groupBy=[b, $f2], partialFinalType=[PARTIAL], select=[b, $f2, FIRST_VALUE_RETRACT(c) AS $f2_0, LAST_VALUE_RETRACT(c) AS $f3, COUNT_RETRACT(DISTINCT c) AS $f4], changelogMode=[I,UB,UA,D]) +- Exchange(distribution=[hash[b, $f2]], changelogMode=[I,UB,UA,D]) - +- Calc(select=[b, c, MOD(HASH_CODE(c), 1024) AS $f2], changelogMode=[I,UB,UA,D]) + +- Calc(select=[$f1 AS b, $f2 AS c, MOD(HASH_CODE($f2), 1024) AS $f2], changelogMode=[I,UB,UA,D]) Review comment: The xml is changed because after logical phase, a FlinkLogicalCalc which rowtype of it's input node is different with the inputRowType of it's RexProgram would be generated. ########## File path: flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/MatchRecognizeTest.xml ########## @@ -117,9 +117,9 @@ LogicalProject(symbol=[$0], dPrice=[$1], matchRowtime=[$2]) Match(partitionBy=[symbol], orderBy=[matchRowtime ASC], measures=[FINAL(A.price) AS dPrice, FINAL(A.matchRowtime) AS matchRowtime], rowsPerMatch=[ONE ROW PER MATCH], after=[SKIP TO NEXT ROW], pattern=[_UTF-16LE'A'], define=[{A=>=(PREV(A.$1, 0), -(CURRENT_TIMESTAMP, 86400000:INTERVAL DAY))}]) +- Exchange(distribution=[hash[symbol]]) +- Calc(select=[symbol, matchRowtime, price, w$start AS startTime]) - +- GroupWindowAggregate(groupBy=[symbol, matchRowtime, price], window=[TumblingGroupWindow('w$, matchRowtime0, 3000)], properties=[w$start, w$end, w$rowtime, w$proctime], select=[symbol, matchRowtime, price, start('w$) AS w$start, end('w$) AS w$end, rowtime('w$) AS w$rowtime, proctime('w$) AS w$proctime]) - +- Exchange(distribution=[hash[symbol, matchRowtime, price]]) - +- Calc(select=[symbol, CAST(matchRowtime) AS matchRowtime, price, matchRowtime AS matchRowtime0]) + +- GroupWindowAggregate(groupBy=[symbol, price, matchRowtime], window=[TumblingGroupWindow('w$, matchRowtime0, 3000)], properties=[w$start, w$end, w$rowtime, w$proctime], select=[symbol, price, matchRowtime, start('w$) AS w$start, end('w$) AS w$end, rowtime('w$) AS w$rowtime, proctime('w$) AS w$proctime]) Review comment: The xml is changed because after `logical` phase, project contains redundant expression would be simplified. ########## File path: flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/agg/DistinctAggregateTest.xml ########## @@ -238,7 +238,7 @@ GroupAggregate(groupBy=[b], partialFinalType=[FINAL], select=[b, FIRST_VALUE_RET +- Exchange(distribution=[hash[b]], changelogMode=[I,UB,UA,D]) +- GroupAggregate(groupBy=[b, $f2], partialFinalType=[PARTIAL], select=[b, $f2, FIRST_VALUE_RETRACT(c) AS $f2_0, LAST_VALUE_RETRACT(c) AS $f3, COUNT_RETRACT(DISTINCT c) AS $f4], changelogMode=[I,UB,UA,D]) +- Exchange(distribution=[hash[b, $f2]], changelogMode=[I,UB,UA,D]) - +- Calc(select=[b, c, MOD(HASH_CODE(c), 1024) AS $f2], changelogMode=[I,UB,UA,D]) + +- Calc(select=[$f1 AS b, $f2 AS c, MOD(HASH_CODE($f2), 1024) AS $f2], changelogMode=[I,UB,UA,D]) Review comment: The xml is changed because after logical phase, a FlinkLogicalCalc which rowtype of it's input node is different with the inputRowType of it's RexProgram would be generated. ########## File path: flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/RelTimeIndicatorConverterTest.xml ########## @@ -267,9 +267,9 @@ LogicalProject(EXPR$0=[$2], long=[$0]) <Resource name="optimized exec plan"> <![CDATA[ Calc(select=[EXPR$0, long]) -+- GroupWindowAggregate(groupBy=[long], window=[TumblingGroupWindow('w$, rowtime, 100)], select=[long, MIN(rowtime0) AS EXPR$0]) ++- GroupWindowAggregate(groupBy=[long], window=[TumblingGroupWindow('w$, rowtime, 100)], select=[long, MIN(rowtime) AS EXPR$0]) Review comment: The xml is changed, because after `project_rewrite` phase, the identify `project` is removed. The new project is generated by `RelTimeIndicatorConverter` when materialize `Aggregate` node. ########## File path: flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/WindowRankTest.xml ########## @@ -212,8 +212,8 @@ LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], e=[$4], rowtime=[$5], proctime=[$ </Resource> <Resource name="optimized rel plan"> <![CDATA[ -Calc(select=[a, b, c, d, e, rowtime, PROCTIME_MATERIALIZE(proctime) AS proctime, window_start, window_end, window_time, rownum]) -+- WindowRank(window=[CUMULATE(win_start=[window_start], win_end=[window_end], max_size=[1 h], step=[10 min])], rankType=[ROW_NUMBER], rankRange=[rankStart=1, rankEnd=3], partitionBy=[a], orderBy=[b DESC], select=[a, b, c, d, e, rowtime, proctime, window_start, window_end, window_time, rownum]) +Calc(select=[a, b, c, d, e, rowtime, PROCTIME_MATERIALIZE(proctime) AS proctime, window_start, window_end, window_time, w0$o0]) Review comment: The xml is changed because after apply ProjectToWindowRule, the new generated LogicalWindow contains a field which name is w0$o0 ########## File path: flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/join/IntervalJoinTest.xml ########## @@ -42,7 +42,7 @@ LogicalProject(b=[$1], aSum=[$2], bCnt=[$3]) GroupWindowAggregate(groupBy=[b], window=[TumblingGroupWindow('w$, rowtime, 21600000)], select=[b, SUM(a0) AS aSum, COUNT(b0) AS bCnt]) +- Exchange(distribution=[hash[b]]) +- Calc(select=[rowtime, b, a0, b0]) - +- IntervalJoin(joinType=[InnerJoin], windowBounds=[isRowTime=true, leftLowerBound=-600000, leftUpperBound=3600000, leftTimeIndex=2, rightTimeIndex=2], where=[((a = a0) AND (CAST(rowtime) >= (CAST(rowtime0) - 600000:INTERVAL MINUTE)) AND (CAST(rowtime) <= (CAST(rowtime0) + 3600000:INTERVAL HOUR)))], select=[a, b, rowtime, a0, b0, rowtime0]) + +- IntervalJoin(joinType=[InnerJoin], windowBounds=[isRowTime=true, leftLowerBound=-600000, leftUpperBound=3600000, leftTimeIndex=2, rightTimeIndex=2], where=[((a = a0) AND (rowtime >= (rowtime0 - 600000:INTERVAL MINUTE)) AND (rowtime <= (rowtime0 + 3600000:INTERVAL HOUR)))], select=[a, b, rowtime, a0, b0, rowtime0]) Review comment: The xml is changed because after predicate_pushdown phase, the filter would be pushed down into join condition, while join condition conversion logical in RelTimeIndicatorConverter is different with filter condition conversion in the previous version. ########## File path: flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/agg/WindowAggregateTest.xml ########## @@ -138,11 +138,11 @@ Calc(select=[a, b, $f3 AS EXPR$2, $f4 AS EXPR$3, $f5 AS uv]) +- Calc(select=[a, b, window_start, window_end, $e, $f8, $f9, $f5 AS $f7, $f6 AS $f8_0, $f7 AS $f9_0]) +- WindowAggregate(groupBy=[a, b, $e, $f8, $f9], window=[TUMBLE(win_start=[window_start], win_end=[window_end], size=[15 min])], select=[a, b, $e, $f8, $f9, COUNT(*) FILTER $g_3 AS $f5, MAX(d) FILTER $g_1 AS $f6, COUNT(DISTINCT window_time) FILTER $g_2 AS $f7, start('w$) AS window_start, end('w$) AS window_end]) +- Exchange(distribution=[hash[a, b, $e, $f8, $f9]]) - +- Calc(select=[a, b, window_start, window_end, d, $f5, window_time, $e, $f8, $f9, =($e_0, 3) AS $g_3, AND(=($e_0, 1), $f5) AS $g_1, =($e_0, 2) AS $g_2]) + +- Calc(select=[a, b, window_start, window_end, d, $f5, CAST(window_time) AS window_time, $e, $f8, $f9, =($e_0, 3) AS $g_3, AND(=($e_0, 1), $f5) AS $g_1, =($e_0, 2) AS $g_2]) Review comment: The xml is changed, because after project_rewrite phase, the identify project is removed. ########## File path: flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/rules/logical/SplitAggregateRuleTest.xml ########## @@ -16,6 +16,37 @@ See the License for the specific language governing permissions and limitations under the License. --> <Root> + <TestCase name="testAggFilterClauseBothWithAvgAndCount"> Review comment: The xml is changed because after `logical` phase, a `FlinkLogicalCalc` which rowtype of it's input node is different with the inputRowType of it's RexProgram would be generated. ########## File path: flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/api/stream/ExplainTest.xml ########## @@ -433,7 +433,7 @@ LegacySink(name=[`default_catalog`.`default_database`.`appendSink1`], fields=[a, +- GroupWindowAggregate(groupBy=[id1], window=[TumblingGroupWindow('w$, rowtime, 8000)], select=[id1, LISTAGG(text, $f3) AS EXPR$1], changelogMode=[I]): rowcount = , cumulative cost = {rows, cpu, io, network, memory} +- Exchange(distribution=[hash[id1]], changelogMode=[I]): rowcount = , cumulative cost = {rows, cpu, io, network, memory} +- Calc(select=[id1, rowtime, text, _UTF-16LE'#' AS $f3], changelogMode=[I]): rowcount = , cumulative cost = {rows, cpu, io, network, memory} - +- IntervalJoin(joinType=[InnerJoin], windowBounds=[isRowTime=true, leftLowerBound=-299999, leftUpperBound=179999, leftTimeIndex=2, rightTimeIndex=4], where=[AND(=(id1, id2), >(CAST(rowtime), -(CAST(rowtime0), 300000:INTERVAL MINUTE)), <(CAST(rowtime), +(CAST(rowtime0), 180000:INTERVAL MINUTE)))], select=[id1, text, rowtime, id2, cnt, name, goods, rowtime0], changelogMode=[I]): rowcount = , cumulative cost = {rows, cpu, io, network, memory} + +- IntervalJoin(joinType=[InnerJoin], windowBounds=[isRowTime=true, leftLowerBound=-299999, leftUpperBound=179999, leftTimeIndex=2, rightTimeIndex=4], where=[AND(=(id1, id2), >(rowtime, -(rowtime0, 300000:INTERVAL MINUTE)), <(rowtime, +(rowtime0, 180000:INTERVAL MINUTE)))], select=[id1, text, rowtime, id2, cnt, name, goods, rowtime0], changelogMode=[I]): rowcount = , cumulative cost = {rows, cpu, io, network, memory} Review comment: The xml is changed because after `predicate_pushdown` phase, the filter would be pushed down into join condition, while join condition conversion logical in `RelTimeIndicatorConverter` is different with filter condition conversion in the previous version. ########## File path: flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/join/IntervalJoinTest.xml ########## @@ -314,12 +314,12 @@ LogicalProject(a=[$0], b=[$6]) <Resource name="optimized exec plan"> <![CDATA[ Calc(select=[a, b]) -+- Join(joinType=[InnerJoin], where=[((a = a0) AND ($f5 = $f50))], select=[a, $f5, a0, b, $f50], leftInputSpec=[NoUniqueKey], rightInputSpec=[NoUniqueKey]) - :- Exchange(distribution=[hash[a, $f5]]) - : +- Calc(select=[a, PROCTIME_MATERIALIZE(proctime) AS $f5]) ++- IntervalJoin(joinType=[InnerJoin], windowBounds=[isRowTime=false, leftLowerBound=0, leftUpperBound=0, leftTimeIndex=1, rightTimeIndex=2], where=[((a = a0) AND (proctime = proctime0))], select=[a, proctime, a0, b, proctime0]) Review comment: The xml is changed because after refactor time indicator materialization, the two case would be translated to IntervalJoin as expected. ########## File path: flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/join/IntervalJoinTest.xml ########## @@ -440,7 +440,7 @@ LogicalProject(b=[$1], aSum=[$2], bCnt=[$3]) GroupWindowAggregate(groupBy=[b0], window=[TumblingGroupWindow('w$, rowtime0, 21600000)], select=[b0, SUM(a) AS aSum, COUNT(b) AS bCnt]) +- Exchange(distribution=[hash[b0]]) +- Calc(select=[rowtime0, b0, a, b]) - +- IntervalJoin(joinType=[InnerJoin], windowBounds=[isRowTime=true, leftLowerBound=-600000, leftUpperBound=3600000, leftTimeIndex=2, rightTimeIndex=2], where=[((a = a0) AND (CAST(rowtime) >= (CAST(rowtime0) - 600000:INTERVAL MINUTE)) AND (CAST(rowtime) <= (CAST(rowtime0) + 3600000:INTERVAL HOUR)))], select=[a, b, rowtime, a0, b0, rowtime0]) + +- IntervalJoin(joinType=[InnerJoin], windowBounds=[isRowTime=true, leftLowerBound=-600000, leftUpperBound=3600000, leftTimeIndex=2, rightTimeIndex=2], where=[((a = a0) AND (rowtime >= (rowtime0 - 600000:INTERVAL MINUTE)) AND (rowtime <= (rowtime0 + 3600000:INTERVAL HOUR)))], select=[a, b, rowtime, a0, b0, rowtime0]) Review comment: The xml is changed because after predicate_pushdown phase, the filter would be pushed down into join condition, while join condition conversion logical in RelTimeIndicatorConverter is different with filter condition conversion in the previous version. ########## File path: flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/join/TemporalJoinTest.xml ########## @@ -522,7 +522,7 @@ LogicalProject(amount=[$0], currency=[$1], rowtime=[$2], proctime=[$3], currency </Resource> <Resource name="optimized exec plan"> <![CDATA[ -Calc(select=[amount, currency, rowtime, PROCTIME_MATERIALIZE(proctime) AS proctime, currency0, rate, PROCTIME_MATERIALIZE(proctime0) AS proctime0]) +Calc(select=[amount, currency, rowtime, PROCTIME_MATERIALIZE(proctime) AS proctime, currency0, rate, PROCTIME_MATERIALIZE($2) AS $2]) Review comment: The xml is changed, because after project_rewrite phase, the identify project is removed. ########## File path: flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/join/IntervalJoinTest.xml ########## @@ -107,12 +107,12 @@ LogicalProject(a=[$0], b=[$6]) <Resource name="optimized exec plan"> <![CDATA[ Calc(select=[a, b]) -+- Join(joinType=[InnerJoin], where=[((a = a0) AND (rowtime0 = rowtime00))], select=[a, rowtime0, a0, b, rowtime00], leftInputSpec=[NoUniqueKey], rightInputSpec=[NoUniqueKey]) - :- Exchange(distribution=[hash[a, rowtime0]]) - : +- Calc(select=[a, CAST(rowtime) AS rowtime0]) ++- IntervalJoin(joinType=[InnerJoin], windowBounds=[isRowTime=true, leftLowerBound=0, leftUpperBound=0, leftTimeIndex=1, rightTimeIndex=2], where=[((a = a0) AND (rowtime = rowtime0))], select=[a, rowtime, a0, b, rowtime0]) Review comment: The xml is changed because after refactor time indicator materialization, the two case would be translated to `IntervalJoin` as expected. ########## File path: flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/MiniBatchIntervalInferTest.xml ########## @@ -357,7 +357,7 @@ LegacySink(name=[`default_catalog`.`default_database`.`appendSink1`], fields=[a, +- GroupWindowAggregate(groupBy=[id1], window=[TumblingGroupWindow('w$, rowtime, 6000)], properties=[w$start, w$end, w$rowtime, w$proctime], select=[id1, LISTAGG(text, $f3) AS text, start('w$) AS w$start, end('w$) AS w$end, rowtime('w$) AS w$rowtime, proctime('w$) AS w$proctime]) +- Exchange(distribution=[hash[id1]]) +- Calc(select=[rowtime, id1, text, _UTF-16LE'#' AS $f3]) - +- IntervalJoin(joinType=[InnerJoin], windowBounds=[isRowTime=true, leftLowerBound=-299999, leftUpperBound=179999, leftTimeIndex=1, rightTimeIndex=1], where=[AND(=(id1, id2), >(CAST(rowtime), -(CAST(rowtime0), 300000:INTERVAL MINUTE)), <(CAST(rowtime), +(CAST(rowtime0), 180000:INTERVAL MINUTE)))], select=[id1, rowtime, text, id2, rowtime0, cnt, name, goods]) + +- IntervalJoin(joinType=[InnerJoin], windowBounds=[isRowTime=true, leftLowerBound=-299999, leftUpperBound=179999, leftTimeIndex=1, rightTimeIndex=1], where=[AND(=(id1, id2), >(rowtime, -(rowtime0, 300000:INTERVAL MINUTE)), <(rowtime, +(rowtime0, 180000:INTERVAL MINUTE)))], select=[id1, rowtime, text, id2, rowtime0, cnt, name, goods]) Review comment: The xml is changed because after predicate_pushdown phase, the filter would be pushed down into join condition, while join condition conversion logical in RelTimeIndicatorConverter is different with filter condition conversion in the previous version. ########## File path: flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/table/JoinTest.xml ########## @@ -70,7 +70,7 @@ LogicalFilter(condition=[AND(=($0, $4), >=($3, -($7, 300000:INTERVAL DAY TO SECO </Resource> <Resource name="optimized exec plan"> <![CDATA[ -IntervalJoin(joinType=[InnerJoin], windowBounds=[isRowTime=true, leftLowerBound=-300000, leftUpperBound=-1, leftTimeIndex=3, rightTimeIndex=3], where=[((a = d) AND (CAST(lrtime) >= (CAST(rrtime) - 300000:INTERVAL DAY TO SECOND)) AND (CAST(lrtime) < CAST(rrtime)) AND (CAST(lrtime) > f))], select=[a, b, c, lrtime, d, e, f, rrtime]) +IntervalJoin(joinType=[InnerJoin], windowBounds=[isRowTime=true, leftLowerBound=-300000, leftUpperBound=-1, leftTimeIndex=3, rightTimeIndex=3], where=[((a = d) AND (lrtime >= (rrtime - 300000:INTERVAL DAY TO SECOND)) AND (lrtime < rrtime) AND (lrtime > f))], select=[a, b, c, lrtime, d, e, f, rrtime]) Review comment: The xml is changed because after predicate_pushdown phase, the filter would be pushed down into join condition, while join condition conversion logical in RelTimeIndicatorConverter is different with filter condition conversion in the previous version. ########## File path: flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/join/IntervalJoinTest.scala ########## @@ -242,7 +241,6 @@ class IntervalJoinTest extends TableTestBase { @Test def testJoinWithEquiRowTime(): Unit = { - // TODO: this should be translated into window join Review comment: done ########## File path: flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/UnionTest.xml ########## @@ -16,6 +16,42 @@ See the License for the specific language governing permissions and limitations under the License. --> <Root> + <TestCase name="testUnionDiffRowTime"> Review comment: The xml is changed because the plan of `UnionTest#testUnionDiffRowTime` could be generated now. In `DEFAULT_REWRITE` phase, after apply `CoerceInputsRule(classOf[LogicalUnion]`, optimizer would pre-casts inputs to a particular type to ensure union set operator have the same row type. ########## File path: flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/UnionTest.scala ########## @@ -112,9 +112,6 @@ class UnionTest extends TableTestBase { @Test def testUnionDiffRowTime(): Unit = { - expectedException.expectMessage( Review comment: UnionTest#testUnionDiffRowTime could work fine now. In DEFAULT_REWRITE phase, after apply CoerceInputsRule(classOf[LogicalUnion], optimizer would pre-casts inputs to a particular type to ensure union set operator have the same row type. ########## File path: flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/join/IntervalJoinTest.xml ########## @@ -557,7 +557,7 @@ LogicalProject(a=[$0], b=[$6]) <Resource name="optimized exec plan"> <![CDATA[ Calc(select=[a, b]) -+- IntervalJoin(joinType=[InnerJoin], windowBounds=[isRowTime=true, leftLowerBound=-600000, leftUpperBound=3600000, leftTimeIndex=1, rightTimeIndex=2], where=[((a = a0) AND (CAST(rowtime) >= (CAST(rowtime0) - 600000:INTERVAL MINUTE)) AND (CAST(rowtime) <= (CAST(rowtime0) + 3600000:INTERVAL HOUR)))], select=[a, rowtime, a0, b, rowtime0]) ++- IntervalJoin(joinType=[InnerJoin], windowBounds=[isRowTime=true, leftLowerBound=-600000, leftUpperBound=3600000, leftTimeIndex=1, rightTimeIndex=2], where=[((a = a0) AND (rowtime >= (rowtime0 - 600000:INTERVAL MINUTE)) AND (rowtime <= (rowtime0 + 3600000:INTERVAL HOUR)))], select=[a, rowtime, a0, b, rowtime0]) Review comment: The xml is changed because after predicate_pushdown phase, the filter would be pushed down into join condition, while join condition conversion logical in RelTimeIndicatorConverter is different with filter condition conversion in the previous version. ########## File path: flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/rules/logical/TemporalJoinRewriteWithUniqueKeyRuleTest.scala ########## @@ -35,7 +35,7 @@ class TemporalJoinRewriteWithUniqueKeyRuleTest extends TableTestBase { @Before def setup(): Unit = { - util.buildStreamProgram(LOGICAL_REWRITE) + util.buildStreamProgram(PHYSICAL) Review comment: To include `TIME_INDICATOR` phase -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org