beyond1920 commented on a change in pull request #16620:
URL: https://github.com/apache/flink/pull/16620#discussion_r678085882



##########
File path: 
flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/logical/LogicalWindowAggregateRuleBase.scala
##########
@@ -327,9 +328,17 @@ abstract class LogicalWindowAggregateRuleBase(description: 
String)
       windowExpression: RexCall): RexNode
 
   /** Returns the expression that replaces the window expression after the 
aggregation. */
-  private[table] def getOutAggregateGroupExpression(
+  private def getOutAggregateGroupExpression(
       rexBuilder: RexBuilder,
-      windowExpression: RexCall): RexNode
+      windowExpression: RexCall): RexNode = {
+    val zeroLiteral = rexBuilder.makeZeroLiteral(windowExpression.getType)
+    if (isTimeIndicatorType(windowExpression.getType)) {
+      // cast zero literal to time indicator field

Review comment:
       In the previous version, this would not happen because the rowtime 
indicator in group key would be materialized to regular timestamp.
   But after we move time indicator materialize after logical_rewrite, the rule 
need to encounter the window expression in group key.
   It's safe to simply cast the literal to time indicator type, because the 
window expression column in group key would be projected out in the successor 
Project node.

##########
File path: 
flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/join/IntervalJoinTest.xml
##########
@@ -255,7 +255,7 @@ LogicalProject(a=[$0], b=[$6])
     <Resource name="optimized exec plan">
       <![CDATA[
 Calc(select=[a, b])
-+- IntervalJoin(joinType=[InnerJoin], windowBounds=[isRowTime=false, 
leftLowerBound=-3600000, leftUpperBound=3600000, leftTimeIndex=1, 
rightTimeIndex=2], where=[((a = a0) AND (PROCTIME_MATERIALIZE(proctime) >= 
(PROCTIME_MATERIALIZE(proctime0) - 3600000:INTERVAL HOUR)) AND 
(PROCTIME_MATERIALIZE(proctime) <= (PROCTIME_MATERIALIZE(proctime0) + 
3600000:INTERVAL HOUR)))], select=[a, proctime, a0, b, proctime0])
++- IntervalJoin(joinType=[InnerJoin], windowBounds=[isRowTime=false, 
leftLowerBound=-3600000, leftUpperBound=3600000, leftTimeIndex=1, 
rightTimeIndex=2], where=[((a = a0) AND (proctime >= (proctime0 - 
3600000:INTERVAL HOUR)) AND (proctime <= (proctime0 + 3600000:INTERVAL 
HOUR)))], select=[a, proctime, a0, b, proctime0])

Review comment:
       The xml is changed because after predicate_pushdown phase, the filter 
would be pushed down into join condition, while join condition conversion 
logical in RelTimeIndicatorConverter is different with filter condition 
conversion in the previous version.
   
   

##########
File path: 
flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/table/JoinTest.xml
##########
@@ -158,12 +158,12 @@ LogicalProject(a=[$0], e=[$5], lptime=[$3])
     <Resource name="optimized exec plan">
       <![CDATA[
 Calc(select=[a, e, PROCTIME_MATERIALIZE(lptime) AS lptime])
-+- Join(joinType=[InnerJoin], where=[((a = d) AND ($f4 = $f40))], select=[a, 
lptime, $f4, d, e, $f40], leftInputSpec=[NoUniqueKey], 
rightInputSpec=[NoUniqueKey])
-   :- Exchange(distribution=[hash[a, $f4]])
-   :  +- Calc(select=[a, lptime, PROCTIME_MATERIALIZE(lptime) AS $f4])
++- IntervalJoin(joinType=[InnerJoin], windowBounds=[isRowTime=false, 
leftLowerBound=0, leftUpperBound=0, leftTimeIndex=1, rightTimeIndex=2], 
where=[((a = d) AND (lptime = rptime))], select=[a, lptime, d, e, rptime])

Review comment:
       The xml is changed because after refactor time indicator 
materialization, the two case would be translated to IntervalJoin as expected.

##########
File path: 
flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/join/IntervalJoinTest.scala
##########
@@ -230,7 +230,6 @@ class IntervalJoinTest extends TableTestBase {
 
   @Test
   def testJoinWithEquiProcTime(): Unit = {
-    // TODO: this should be translated into window join

Review comment:
       done

##########
File path: 
flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/DeduplicateTest.xml
##########
@@ -207,7 +207,7 @@ LogicalProject(a=[$0], b=[$1], c=[$2], proctime=[$3], 
rowtime=[$4], rowNum=[$5])
     </Resource>
     <Resource name="optimized exec plan">
       <![CDATA[
-Calc(select=[a, b, c, PROCTIME_MATERIALIZE(proctime) AS proctime, rowtime, 
1:BIGINT AS rowNum])
+Calc(select=[a, b, c, PROCTIME_MATERIALIZE(proctime) AS proctime, rowtime, 
1:BIGINT AS $5])

Review comment:
       The xml is changed because after  apply `ProjectToWindowRule`, the new 
generated LogicalWindow contains a field which name is w0$o0

##########
File path: 
flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/agg/IncrementalAggregateTest.xml
##########
@@ -76,7 +76,7 @@ GroupAggregate(groupBy=[b], partialFinalType=[FINAL], 
select=[b, FIRST_VALUE_RET
 +- Exchange(distribution=[hash[b]], changelogMode=[I,UB,UA,D])
    +- GroupAggregate(groupBy=[b, $f2], partialFinalType=[PARTIAL], select=[b, 
$f2, FIRST_VALUE_RETRACT(c) AS $f2_0, LAST_VALUE_RETRACT(c) AS $f3, 
COUNT_RETRACT(DISTINCT c) AS $f4], changelogMode=[I,UB,UA,D])
       +- Exchange(distribution=[hash[b, $f2]], changelogMode=[I,UB,UA,D])
-         +- Calc(select=[b, c, MOD(HASH_CODE(c), 1024) AS $f2], 
changelogMode=[I,UB,UA,D])
+         +- Calc(select=[$f1 AS b, $f2 AS c, MOD(HASH_CODE($f2), 1024) AS 
$f2], changelogMode=[I,UB,UA,D])

Review comment:
       The xml is changed because after logical phase, a FlinkLogicalCalc which 
rowtype of it's input node is different with the inputRowType of it's 
RexProgram would be generated.

##########
File path: 
flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/MatchRecognizeTest.xml
##########
@@ -117,9 +117,9 @@ LogicalProject(symbol=[$0], dPrice=[$1], matchRowtime=[$2])
 Match(partitionBy=[symbol], orderBy=[matchRowtime ASC], 
measures=[FINAL(A.price) AS dPrice, FINAL(A.matchRowtime) AS matchRowtime], 
rowsPerMatch=[ONE ROW PER MATCH], after=[SKIP TO NEXT ROW], 
pattern=[_UTF-16LE'A'], define=[{A=>=(PREV(A.$1, 0), -(CURRENT_TIMESTAMP, 
86400000:INTERVAL DAY))}])
 +- Exchange(distribution=[hash[symbol]])
    +- Calc(select=[symbol, matchRowtime, price, w$start AS startTime])
-      +- GroupWindowAggregate(groupBy=[symbol, matchRowtime, price], 
window=[TumblingGroupWindow('w$, matchRowtime0, 3000)], properties=[w$start, 
w$end, w$rowtime, w$proctime], select=[symbol, matchRowtime, price, start('w$) 
AS w$start, end('w$) AS w$end, rowtime('w$) AS w$rowtime, proctime('w$) AS 
w$proctime])
-         +- Exchange(distribution=[hash[symbol, matchRowtime, price]])
-            +- Calc(select=[symbol, CAST(matchRowtime) AS matchRowtime, price, 
matchRowtime AS matchRowtime0])
+      +- GroupWindowAggregate(groupBy=[symbol, price, matchRowtime], 
window=[TumblingGroupWindow('w$, matchRowtime0, 3000)], properties=[w$start, 
w$end, w$rowtime, w$proctime], select=[symbol, price, matchRowtime, start('w$) 
AS w$start, end('w$) AS w$end, rowtime('w$) AS w$rowtime, proctime('w$) AS 
w$proctime])

Review comment:
       The xml is changed because after `logical` phase, project contains 
redundant expression would be simplified.

##########
File path: 
flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/agg/DistinctAggregateTest.xml
##########
@@ -238,7 +238,7 @@ GroupAggregate(groupBy=[b], partialFinalType=[FINAL], 
select=[b, FIRST_VALUE_RET
 +- Exchange(distribution=[hash[b]], changelogMode=[I,UB,UA,D])
    +- GroupAggregate(groupBy=[b, $f2], partialFinalType=[PARTIAL], select=[b, 
$f2, FIRST_VALUE_RETRACT(c) AS $f2_0, LAST_VALUE_RETRACT(c) AS $f3, 
COUNT_RETRACT(DISTINCT c) AS $f4], changelogMode=[I,UB,UA,D])
       +- Exchange(distribution=[hash[b, $f2]], changelogMode=[I,UB,UA,D])
-         +- Calc(select=[b, c, MOD(HASH_CODE(c), 1024) AS $f2], 
changelogMode=[I,UB,UA,D])
+         +- Calc(select=[$f1 AS b, $f2 AS c, MOD(HASH_CODE($f2), 1024) AS 
$f2], changelogMode=[I,UB,UA,D])

Review comment:
       The xml is changed because after logical phase, a FlinkLogicalCalc which 
rowtype of it's input node is different with the inputRowType of it's 
RexProgram would be generated.

##########
File path: 
flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/RelTimeIndicatorConverterTest.xml
##########
@@ -267,9 +267,9 @@ LogicalProject(EXPR$0=[$2], long=[$0])
     <Resource name="optimized exec plan">
       <![CDATA[
 Calc(select=[EXPR$0, long])
-+- GroupWindowAggregate(groupBy=[long], window=[TumblingGroupWindow('w$, 
rowtime, 100)], select=[long, MIN(rowtime0) AS EXPR$0])
++- GroupWindowAggregate(groupBy=[long], window=[TumblingGroupWindow('w$, 
rowtime, 100)], select=[long, MIN(rowtime) AS EXPR$0])

Review comment:
       The xml is changed, because after `project_rewrite` phase, the identify 
`project` is removed. The new project is generated by 
`RelTimeIndicatorConverter` when materialize `Aggregate` node.

##########
File path: 
flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/WindowRankTest.xml
##########
@@ -212,8 +212,8 @@ LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], e=[$4], 
rowtime=[$5], proctime=[$
     </Resource>
     <Resource name="optimized rel plan">
       <![CDATA[
-Calc(select=[a, b, c, d, e, rowtime, PROCTIME_MATERIALIZE(proctime) AS 
proctime, window_start, window_end, window_time, rownum])
-+- WindowRank(window=[CUMULATE(win_start=[window_start], win_end=[window_end], 
max_size=[1 h], step=[10 min])], rankType=[ROW_NUMBER], rankRange=[rankStart=1, 
rankEnd=3], partitionBy=[a], orderBy=[b DESC], select=[a, b, c, d, e, rowtime, 
proctime, window_start, window_end, window_time, rownum])
+Calc(select=[a, b, c, d, e, rowtime, PROCTIME_MATERIALIZE(proctime) AS 
proctime, window_start, window_end, window_time, w0$o0])

Review comment:
       The xml is changed because after apply ProjectToWindowRule, the new 
generated LogicalWindow contains a field which name is w0$o0

##########
File path: 
flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/join/IntervalJoinTest.xml
##########
@@ -42,7 +42,7 @@ LogicalProject(b=[$1], aSum=[$2], bCnt=[$3])
 GroupWindowAggregate(groupBy=[b], window=[TumblingGroupWindow('w$, rowtime, 
21600000)], select=[b, SUM(a0) AS aSum, COUNT(b0) AS bCnt])
 +- Exchange(distribution=[hash[b]])
    +- Calc(select=[rowtime, b, a0, b0])
-      +- IntervalJoin(joinType=[InnerJoin], windowBounds=[isRowTime=true, 
leftLowerBound=-600000, leftUpperBound=3600000, leftTimeIndex=2, 
rightTimeIndex=2], where=[((a = a0) AND (CAST(rowtime) >= (CAST(rowtime0) - 
600000:INTERVAL MINUTE)) AND (CAST(rowtime) <= (CAST(rowtime0) + 
3600000:INTERVAL HOUR)))], select=[a, b, rowtime, a0, b0, rowtime0])
+      +- IntervalJoin(joinType=[InnerJoin], windowBounds=[isRowTime=true, 
leftLowerBound=-600000, leftUpperBound=3600000, leftTimeIndex=2, 
rightTimeIndex=2], where=[((a = a0) AND (rowtime >= (rowtime0 - 600000:INTERVAL 
MINUTE)) AND (rowtime <= (rowtime0 + 3600000:INTERVAL HOUR)))], select=[a, b, 
rowtime, a0, b0, rowtime0])

Review comment:
       The xml is changed because after predicate_pushdown phase, the filter 
would be pushed down into join condition, while join condition conversion 
logical in RelTimeIndicatorConverter is different with filter condition 
conversion in the previous version.

##########
File path: 
flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/agg/WindowAggregateTest.xml
##########
@@ -138,11 +138,11 @@ Calc(select=[a, b, $f3 AS EXPR$2, $f4 AS EXPR$3, $f5 AS 
uv])
       +- Calc(select=[a, b, window_start, window_end, $e, $f8, $f9, $f5 AS 
$f7, $f6 AS $f8_0, $f7 AS $f9_0])
          +- WindowAggregate(groupBy=[a, b, $e, $f8, $f9], 
window=[TUMBLE(win_start=[window_start], win_end=[window_end], size=[15 min])], 
select=[a, b, $e, $f8, $f9, COUNT(*) FILTER $g_3 AS $f5, MAX(d) FILTER $g_1 AS 
$f6, COUNT(DISTINCT window_time) FILTER $g_2 AS $f7, start('w$) AS 
window_start, end('w$) AS window_end])
             +- Exchange(distribution=[hash[a, b, $e, $f8, $f9]])
-               +- Calc(select=[a, b, window_start, window_end, d, $f5, 
window_time, $e, $f8, $f9, =($e_0, 3) AS $g_3, AND(=($e_0, 1), $f5) AS $g_1, 
=($e_0, 2) AS $g_2])
+               +- Calc(select=[a, b, window_start, window_end, d, $f5, 
CAST(window_time) AS window_time, $e, $f8, $f9, =($e_0, 3) AS $g_3, AND(=($e_0, 
1), $f5) AS $g_1, =($e_0, 2) AS $g_2])

Review comment:
       The xml is changed, because after project_rewrite phase, the identify 
project is removed.

##########
File path: 
flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/rules/logical/SplitAggregateRuleTest.xml
##########
@@ -16,6 +16,37 @@ See the License for the specific language governing 
permissions and
 limitations under the License.
 -->
 <Root>
+  <TestCase name="testAggFilterClauseBothWithAvgAndCount">

Review comment:
       The xml is changed because after `logical` phase, a `FlinkLogicalCalc` 
which rowtype of it's input node is different with the inputRowType of it's 
RexProgram would be generated.

##########
File path: 
flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/api/stream/ExplainTest.xml
##########
@@ -433,7 +433,7 @@ 
LegacySink(name=[`default_catalog`.`default_database`.`appendSink1`], fields=[a,
 +- GroupWindowAggregate(groupBy=[id1], window=[TumblingGroupWindow('w$, 
rowtime, 8000)], select=[id1, LISTAGG(text, $f3) AS EXPR$1], 
changelogMode=[I]): rowcount = , cumulative cost = {rows, cpu, io, network, 
memory}
    +- Exchange(distribution=[hash[id1]], changelogMode=[I]): rowcount = , 
cumulative cost = {rows, cpu, io, network, memory}
       +- Calc(select=[id1, rowtime, text, _UTF-16LE'#' AS $f3], 
changelogMode=[I]): rowcount = , cumulative cost = {rows, cpu, io, network, 
memory}
-         +- IntervalJoin(joinType=[InnerJoin], windowBounds=[isRowTime=true, 
leftLowerBound=-299999, leftUpperBound=179999, leftTimeIndex=2, 
rightTimeIndex=4], where=[AND(=(id1, id2), >(CAST(rowtime), -(CAST(rowtime0), 
300000:INTERVAL MINUTE)), <(CAST(rowtime), +(CAST(rowtime0), 180000:INTERVAL 
MINUTE)))], select=[id1, text, rowtime, id2, cnt, name, goods, rowtime0], 
changelogMode=[I]): rowcount = , cumulative cost = {rows, cpu, io, network, 
memory}
+         +- IntervalJoin(joinType=[InnerJoin], windowBounds=[isRowTime=true, 
leftLowerBound=-299999, leftUpperBound=179999, leftTimeIndex=2, 
rightTimeIndex=4], where=[AND(=(id1, id2), >(rowtime, -(rowtime0, 
300000:INTERVAL MINUTE)), <(rowtime, +(rowtime0, 180000:INTERVAL MINUTE)))], 
select=[id1, text, rowtime, id2, cnt, name, goods, rowtime0], 
changelogMode=[I]): rowcount = , cumulative cost = {rows, cpu, io, network, 
memory}

Review comment:
       The xml is changed because after `predicate_pushdown` phase, the filter 
would be pushed down into join condition, while join condition conversion 
logical in `RelTimeIndicatorConverter` is different with filter condition 
conversion in the previous version.
   

##########
File path: 
flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/join/IntervalJoinTest.xml
##########
@@ -314,12 +314,12 @@ LogicalProject(a=[$0], b=[$6])
     <Resource name="optimized exec plan">
       <![CDATA[
 Calc(select=[a, b])
-+- Join(joinType=[InnerJoin], where=[((a = a0) AND ($f5 = $f50))], select=[a, 
$f5, a0, b, $f50], leftInputSpec=[NoUniqueKey], rightInputSpec=[NoUniqueKey])
-   :- Exchange(distribution=[hash[a, $f5]])
-   :  +- Calc(select=[a, PROCTIME_MATERIALIZE(proctime) AS $f5])
++- IntervalJoin(joinType=[InnerJoin], windowBounds=[isRowTime=false, 
leftLowerBound=0, leftUpperBound=0, leftTimeIndex=1, rightTimeIndex=2], 
where=[((a = a0) AND (proctime = proctime0))], select=[a, proctime, a0, b, 
proctime0])

Review comment:
       The xml is changed because after refactor time indicator 
materialization, the two case would be translated to IntervalJoin as expected.

##########
File path: 
flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/join/IntervalJoinTest.xml
##########
@@ -440,7 +440,7 @@ LogicalProject(b=[$1], aSum=[$2], bCnt=[$3])
 GroupWindowAggregate(groupBy=[b0], window=[TumblingGroupWindow('w$, rowtime0, 
21600000)], select=[b0, SUM(a) AS aSum, COUNT(b) AS bCnt])
 +- Exchange(distribution=[hash[b0]])
    +- Calc(select=[rowtime0, b0, a, b])
-      +- IntervalJoin(joinType=[InnerJoin], windowBounds=[isRowTime=true, 
leftLowerBound=-600000, leftUpperBound=3600000, leftTimeIndex=2, 
rightTimeIndex=2], where=[((a = a0) AND (CAST(rowtime) >= (CAST(rowtime0) - 
600000:INTERVAL MINUTE)) AND (CAST(rowtime) <= (CAST(rowtime0) + 
3600000:INTERVAL HOUR)))], select=[a, b, rowtime, a0, b0, rowtime0])
+      +- IntervalJoin(joinType=[InnerJoin], windowBounds=[isRowTime=true, 
leftLowerBound=-600000, leftUpperBound=3600000, leftTimeIndex=2, 
rightTimeIndex=2], where=[((a = a0) AND (rowtime >= (rowtime0 - 600000:INTERVAL 
MINUTE)) AND (rowtime <= (rowtime0 + 3600000:INTERVAL HOUR)))], select=[a, b, 
rowtime, a0, b0, rowtime0])

Review comment:
       The xml is changed because after predicate_pushdown phase, the filter 
would be pushed down into join condition, while join condition conversion 
logical in RelTimeIndicatorConverter is different with filter condition 
conversion in the previous version.
   
   

##########
File path: 
flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/join/TemporalJoinTest.xml
##########
@@ -522,7 +522,7 @@ LogicalProject(amount=[$0], currency=[$1], rowtime=[$2], 
proctime=[$3], currency
     </Resource>
     <Resource name="optimized exec plan">
       <![CDATA[
-Calc(select=[amount, currency, rowtime, PROCTIME_MATERIALIZE(proctime) AS 
proctime, currency0, rate, PROCTIME_MATERIALIZE(proctime0) AS proctime0])
+Calc(select=[amount, currency, rowtime, PROCTIME_MATERIALIZE(proctime) AS 
proctime, currency0, rate, PROCTIME_MATERIALIZE($2) AS $2])

Review comment:
       The xml is changed, because after project_rewrite phase, the identify 
project is removed.
   
   

##########
File path: 
flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/join/IntervalJoinTest.xml
##########
@@ -107,12 +107,12 @@ LogicalProject(a=[$0], b=[$6])
     <Resource name="optimized exec plan">
       <![CDATA[
 Calc(select=[a, b])
-+- Join(joinType=[InnerJoin], where=[((a = a0) AND (rowtime0 = rowtime00))], 
select=[a, rowtime0, a0, b, rowtime00], leftInputSpec=[NoUniqueKey], 
rightInputSpec=[NoUniqueKey])
-   :- Exchange(distribution=[hash[a, rowtime0]])
-   :  +- Calc(select=[a, CAST(rowtime) AS rowtime0])
++- IntervalJoin(joinType=[InnerJoin], windowBounds=[isRowTime=true, 
leftLowerBound=0, leftUpperBound=0, leftTimeIndex=1, rightTimeIndex=2], 
where=[((a = a0) AND (rowtime = rowtime0))], select=[a, rowtime, a0, b, 
rowtime0])

Review comment:
       The xml is changed because after refactor time indicator 
materialization, the two case would be translated to `IntervalJoin` as expected.

##########
File path: 
flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/MiniBatchIntervalInferTest.xml
##########
@@ -357,7 +357,7 @@ 
LegacySink(name=[`default_catalog`.`default_database`.`appendSink1`], fields=[a,
          +- GroupWindowAggregate(groupBy=[id1], 
window=[TumblingGroupWindow('w$, rowtime, 6000)], properties=[w$start, w$end, 
w$rowtime, w$proctime], select=[id1, LISTAGG(text, $f3) AS text, start('w$) AS 
w$start, end('w$) AS w$end, rowtime('w$) AS w$rowtime, proctime('w$) AS 
w$proctime])
             +- Exchange(distribution=[hash[id1]])
                +- Calc(select=[rowtime, id1, text, _UTF-16LE'#' AS $f3])
-                  +- IntervalJoin(joinType=[InnerJoin], 
windowBounds=[isRowTime=true, leftLowerBound=-299999, leftUpperBound=179999, 
leftTimeIndex=1, rightTimeIndex=1], where=[AND(=(id1, id2), >(CAST(rowtime), 
-(CAST(rowtime0), 300000:INTERVAL MINUTE)), <(CAST(rowtime), +(CAST(rowtime0), 
180000:INTERVAL MINUTE)))], select=[id1, rowtime, text, id2, rowtime0, cnt, 
name, goods])
+                  +- IntervalJoin(joinType=[InnerJoin], 
windowBounds=[isRowTime=true, leftLowerBound=-299999, leftUpperBound=179999, 
leftTimeIndex=1, rightTimeIndex=1], where=[AND(=(id1, id2), >(rowtime, 
-(rowtime0, 300000:INTERVAL MINUTE)), <(rowtime, +(rowtime0, 180000:INTERVAL 
MINUTE)))], select=[id1, rowtime, text, id2, rowtime0, cnt, name, goods])

Review comment:
       The xml is changed because after predicate_pushdown phase, the filter 
would be pushed down into join condition, while join condition conversion 
logical in RelTimeIndicatorConverter is different with filter condition 
conversion in the previous version.

##########
File path: 
flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/table/JoinTest.xml
##########
@@ -70,7 +70,7 @@ LogicalFilter(condition=[AND(=($0, $4), >=($3, -($7, 
300000:INTERVAL DAY TO SECO
     </Resource>
     <Resource name="optimized exec plan">
       <![CDATA[
-IntervalJoin(joinType=[InnerJoin], windowBounds=[isRowTime=true, 
leftLowerBound=-300000, leftUpperBound=-1, leftTimeIndex=3, rightTimeIndex=3], 
where=[((a = d) AND (CAST(lrtime) >= (CAST(rrtime) - 300000:INTERVAL DAY TO 
SECOND)) AND (CAST(lrtime) < CAST(rrtime)) AND (CAST(lrtime) > f))], select=[a, 
b, c, lrtime, d, e, f, rrtime])
+IntervalJoin(joinType=[InnerJoin], windowBounds=[isRowTime=true, 
leftLowerBound=-300000, leftUpperBound=-1, leftTimeIndex=3, rightTimeIndex=3], 
where=[((a = d) AND (lrtime >= (rrtime - 300000:INTERVAL DAY TO SECOND)) AND 
(lrtime < rrtime) AND (lrtime > f))], select=[a, b, c, lrtime, d, e, f, rrtime])

Review comment:
       The xml is changed because after predicate_pushdown phase, the filter 
would be pushed down into join condition, while join condition conversion 
logical in RelTimeIndicatorConverter is different with filter condition 
conversion in the previous version.
   
   

##########
File path: 
flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/join/IntervalJoinTest.scala
##########
@@ -242,7 +241,6 @@ class IntervalJoinTest extends TableTestBase {
 
   @Test
   def testJoinWithEquiRowTime(): Unit = {
-    // TODO: this should be translated into window join

Review comment:
       done

##########
File path: 
flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/UnionTest.xml
##########
@@ -16,6 +16,42 @@ See the License for the specific language governing 
permissions and
 limitations under the License.
 -->
 <Root>
+  <TestCase name="testUnionDiffRowTime">

Review comment:
       The xml is changed because the plan of  `UnionTest#testUnionDiffRowTime` 
could be generated now. 
   In `DEFAULT_REWRITE` phase, after apply 
`CoerceInputsRule(classOf[LogicalUnion]`, optimizer would  pre-casts inputs to 
a particular type to ensure union set operator have the same row type.

##########
File path: 
flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/UnionTest.scala
##########
@@ -112,9 +112,6 @@ class UnionTest extends TableTestBase {
 
   @Test
   def testUnionDiffRowTime(): Unit = {
-    expectedException.expectMessage(

Review comment:
       UnionTest#testUnionDiffRowTime could work fine now.
   In DEFAULT_REWRITE phase, after apply 
CoerceInputsRule(classOf[LogicalUnion], optimizer would pre-casts inputs to a 
particular type to ensure union set operator have the same row type.

##########
File path: 
flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/join/IntervalJoinTest.xml
##########
@@ -557,7 +557,7 @@ LogicalProject(a=[$0], b=[$6])
     <Resource name="optimized exec plan">
       <![CDATA[
 Calc(select=[a, b])
-+- IntervalJoin(joinType=[InnerJoin], windowBounds=[isRowTime=true, 
leftLowerBound=-600000, leftUpperBound=3600000, leftTimeIndex=1, 
rightTimeIndex=2], where=[((a = a0) AND (CAST(rowtime) >= (CAST(rowtime0) - 
600000:INTERVAL MINUTE)) AND (CAST(rowtime) <= (CAST(rowtime0) + 
3600000:INTERVAL HOUR)))], select=[a, rowtime, a0, b, rowtime0])
++- IntervalJoin(joinType=[InnerJoin], windowBounds=[isRowTime=true, 
leftLowerBound=-600000, leftUpperBound=3600000, leftTimeIndex=1, 
rightTimeIndex=2], where=[((a = a0) AND (rowtime >= (rowtime0 - 600000:INTERVAL 
MINUTE)) AND (rowtime <= (rowtime0 + 3600000:INTERVAL HOUR)))], select=[a, 
rowtime, a0, b, rowtime0])

Review comment:
       The xml is changed because after predicate_pushdown phase, the filter 
would be pushed down into join condition, while join condition conversion 
logical in RelTimeIndicatorConverter is different with filter condition 
conversion in the previous version.
   
   

##########
File path: 
flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/rules/logical/TemporalJoinRewriteWithUniqueKeyRuleTest.scala
##########
@@ -35,7 +35,7 @@ class TemporalJoinRewriteWithUniqueKeyRuleTest extends 
TableTestBase {
 
   @Before
   def setup(): Unit = {
-    util.buildStreamProgram(LOGICAL_REWRITE)
+    util.buildStreamProgram(PHYSICAL)

Review comment:
       To include `TIME_INDICATOR` phase




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to