xuyangzhong commented on code in PR #24162:
URL: https://github.com/apache/flink/pull/24162#discussion_r1467143192


##########
flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/agg/WindowAggregateTest.xml:
##########
@@ -2170,6 +2360,386 @@ Sink(table=[default_catalog.default_database.sink], 
fields=[ws, we, b, c])
       +- Exchange(distribution=[hash[b]])
          +- Calc(select=[b, CASE((a = 11), 1, null:INTEGER) AS $f3, PROCTIME() 
AS proctime], where=[SEARCH(a, Sarg[1, 5, 7, 9, 11])])
             +- TableSourceScan(table=[[default_catalog, default_database, 
source, project=[a, b], metadata=[]]], fields=[a, b])
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase 
name="testSession_DistinctSplitEnabled[aggPhaseEnforcer=ONE_PHASE]">
+    <Resource name="sql">
+      <![CDATA[
+SELECT
+   a,
+   window_start,
+   window_end,
+   count(*),
+   sum(d),
+   max(d) filter (where b > 1000),
+   count(distinct c) AS uv
+FROM TABLE(
+  SESSION(TABLE MyTable PARTITION BY a, DESCRIPTOR(proctime), INTERVAL '5' 
MINUTE))
+GROUP BY a, window_start, window_end
+      ]]>
+    </Resource>
+    <Resource name="ast">
+      <![CDATA[
+LogicalAggregate(group=[{0, 1, 2}], EXPR$3=[COUNT()], EXPR$4=[SUM($3)], 
EXPR$5=[MAX($3) FILTER $4], uv=[COUNT(DISTINCT $5)])
++- LogicalProject(a=[$0], window_start=[$7], window_end=[$8], d=[$3], $f4=[IS 
TRUE(>($1, 1000))], c=[$2])
+   +- LogicalTableFunctionScan(invocation=[SESSION(PARTITION BY($0), 
DESCRIPTOR($6), 300000:INTERVAL MINUTE)], rowType=[RecordType(INTEGER a, BIGINT 
b, VARCHAR(2147483647) c, DECIMAL(10, 3) d, BIGINT e, TIMESTAMP(3) *ROWTIME* 
rowtime, TIMESTAMP_LTZ(3) *PROCTIME* proctime, TIMESTAMP(3) window_start, 
TIMESTAMP(3) window_end, TIMESTAMP_LTZ(3) *PROCTIME* window_time)])
+      +- LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], e=[$4], rowtime=[$5], 
proctime=[$6])
+         +- LogicalWatermarkAssigner(rowtime=[rowtime], watermark=[-($5, 
1000:INTERVAL SECOND)])
+            +- LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], e=[$4], 
rowtime=[$5], proctime=[PROCTIME()])
+               +- LogicalTableScan(table=[[default_catalog, default_database, 
MyTable]])
+]]>
+    </Resource>
+    <Resource name="optimized rel plan">
+      <![CDATA[
+Calc(select=[a, window_start, window_end, EXPR$3, EXPR$4, EXPR$5, uv])
++- WindowAggregate(groupBy=[a], window=[SESSION(time_col=[proctime], gap=[5 
min], partition keys=[a])], select=[a, COUNT(*) AS EXPR$3, SUM(d) AS EXPR$4, 
MAX(d) FILTER $f4 AS EXPR$5, COUNT(DISTINCT c) AS uv, start('w$) AS 
window_start, end('w$) AS window_end])
+   +- Exchange(distribution=[hash[a]])
+      +- Calc(select=[a, d, IS TRUE(>(b, 1000)) AS $f4, c, proctime])
+         +- WatermarkAssigner(rowtime=[rowtime], watermark=[-(rowtime, 
1000:INTERVAL SECOND)])
+            +- Calc(select=[a, b, c, d, PROCTIME() AS proctime, rowtime])
+               +- TableSourceScan(table=[[default_catalog, default_database, 
MyTable, project=[a, b, c, d, rowtime], metadata=[]]], fields=[a, b, c, d, 
rowtime])
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase 
name="testSession_DistinctSplitEnabled[aggPhaseEnforcer=TWO_PHASE]">
+    <Resource name="sql">
+      <![CDATA[
+SELECT
+   a,
+   window_start,
+   window_end,
+   count(*),
+   sum(d),
+   max(d) filter (where b > 1000),
+   count(distinct c) AS uv
+FROM TABLE(
+  SESSION(TABLE MyTable PARTITION BY a, DESCRIPTOR(proctime), INTERVAL '5' 
MINUTE))
+GROUP BY a, window_start, window_end
+      ]]>
+    </Resource>
+    <Resource name="ast">
+      <![CDATA[
+LogicalAggregate(group=[{0, 1, 2}], EXPR$3=[COUNT()], EXPR$4=[SUM($3)], 
EXPR$5=[MAX($3) FILTER $4], uv=[COUNT(DISTINCT $5)])
++- LogicalProject(a=[$0], window_start=[$7], window_end=[$8], d=[$3], $f4=[IS 
TRUE(>($1, 1000))], c=[$2])
+   +- LogicalTableFunctionScan(invocation=[SESSION(PARTITION BY($0), 
DESCRIPTOR($6), 300000:INTERVAL MINUTE)], rowType=[RecordType(INTEGER a, BIGINT 
b, VARCHAR(2147483647) c, DECIMAL(10, 3) d, BIGINT e, TIMESTAMP(3) *ROWTIME* 
rowtime, TIMESTAMP_LTZ(3) *PROCTIME* proctime, TIMESTAMP(3) window_start, 
TIMESTAMP(3) window_end, TIMESTAMP_LTZ(3) *PROCTIME* window_time)])
+      +- LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], e=[$4], rowtime=[$5], 
proctime=[$6])
+         +- LogicalWatermarkAssigner(rowtime=[rowtime], watermark=[-($5, 
1000:INTERVAL SECOND)])
+            +- LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], e=[$4], 
rowtime=[$5], proctime=[PROCTIME()])
+               +- LogicalTableScan(table=[[default_catalog, default_database, 
MyTable]])
+]]>
+    </Resource>
+    <Resource name="optimized rel plan">
+      <![CDATA[
+Calc(select=[a, window_start, window_end, EXPR$3, EXPR$4, EXPR$5, uv])
++- WindowAggregate(groupBy=[a], window=[SESSION(time_col=[proctime], gap=[5 
min], partition keys=[a])], select=[a, COUNT(*) AS EXPR$3, SUM(d) AS EXPR$4, 
MAX(d) FILTER $f4 AS EXPR$5, COUNT(DISTINCT c) AS uv, start('w$) AS 
window_start, end('w$) AS window_end])
+   +- Exchange(distribution=[hash[a]])
+      +- Calc(select=[a, d, IS TRUE(>(b, 1000)) AS $f4, c, proctime])
+         +- WatermarkAssigner(rowtime=[rowtime], watermark=[-(rowtime, 
1000:INTERVAL SECOND)])
+            +- Calc(select=[a, b, c, d, PROCTIME() AS proctime, rowtime])
+               +- TableSourceScan(table=[[default_catalog, default_database, 
MyTable, project=[a, b, c, d, rowtime], metadata=[]]], fields=[a, b, c, d, 
rowtime])
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="testSession_OnProctime[aggPhaseEnforcer=TWO_PHASE]">
+    <Resource name="sql">
+      <![CDATA[
+SELECT
+   a,
+   window_start,
+   window_end,
+   count(*),
+   sum(d),
+   max(d) filter (where b > 1000),
+   weightedAvg(b, e) AS wAvg,
+   count(distinct c) AS uv
+FROM TABLE(
+  SESSION(TABLE MyTable PARTITION BY a, DESCRIPTOR(proctime), INTERVAL '5' 
MINUTE))
+GROUP BY a, window_start, window_end
+      ]]>
+    </Resource>
+    <Resource name="ast">
+      <![CDATA[
+LogicalAggregate(group=[{0, 1, 2}], EXPR$3=[COUNT()], EXPR$4=[SUM($3)], 
EXPR$5=[MAX($3) FILTER $4], wAvg=[weightedAvg($5, $6)], uv=[COUNT(DISTINCT $7)])
++- LogicalProject(a=[$0], window_start=[$7], window_end=[$8], d=[$3], $f4=[IS 
TRUE(>($1, 1000))], b=[$1], e=[$4], c=[$2])
+   +- LogicalTableFunctionScan(invocation=[SESSION(PARTITION BY($0), 
DESCRIPTOR($6), 300000:INTERVAL MINUTE)], rowType=[RecordType(INTEGER a, BIGINT 
b, VARCHAR(2147483647) c, DECIMAL(10, 3) d, BIGINT e, TIMESTAMP(3) *ROWTIME* 
rowtime, TIMESTAMP_LTZ(3) *PROCTIME* proctime, TIMESTAMP(3) window_start, 
TIMESTAMP(3) window_end, TIMESTAMP_LTZ(3) *PROCTIME* window_time)])
+      +- LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], e=[$4], rowtime=[$5], 
proctime=[$6])
+         +- LogicalWatermarkAssigner(rowtime=[rowtime], watermark=[-($5, 
1000:INTERVAL SECOND)])
+            +- LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], e=[$4], 
rowtime=[$5], proctime=[PROCTIME()])
+               +- LogicalTableScan(table=[[default_catalog, default_database, 
MyTable]])
+]]>
+    </Resource>
+    <Resource name="optimized rel plan">
+      <![CDATA[
+Calc(select=[a, window_start, window_end, EXPR$3, EXPR$4, EXPR$5, wAvg, uv])
++- WindowAggregate(groupBy=[a], window=[SESSION(time_col=[proctime], gap=[5 
min], partition keys=[a])], select=[a, COUNT(*) AS EXPR$3, SUM(d) AS EXPR$4, 
MAX(d) FILTER $f4 AS EXPR$5, weightedAvg(b, e) AS wAvg, COUNT(DISTINCT c) AS 
uv, start('w$) AS window_start, end('w$) AS window_end])
+   +- Exchange(distribution=[hash[a]])
+      +- Calc(select=[a, d, IS TRUE(>(b, 1000)) AS $f4, b, e, c, proctime])
+         +- WatermarkAssigner(rowtime=[rowtime], watermark=[-(rowtime, 
1000:INTERVAL SECOND)])
+            +- Calc(select=[a, b, c, d, e, rowtime, PROCTIME() AS proctime])
+               +- TableSourceScan(table=[[default_catalog, default_database, 
MyTable]], fields=[a, b, c, d, e, rowtime])
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="testSession_OnRowtime[aggPhaseEnforcer=ONE_PHASE]">
+    <Resource name="sql">
+      <![CDATA[
+SELECT
+   a,
+   window_start,
+   window_end,
+   count(*),
+   sum(d),
+   max(d) filter (where b > 1000),
+   weightedAvg(b, e) AS wAvg,
+   count(distinct c) AS uv
+FROM TABLE(
+  SESSION(TABLE MyTable PARTITION BY a, DESCRIPTOR(rowtime), INTERVAL '5' 
MINUTE))
+GROUP BY a, window_start, window_end
+      ]]>
+    </Resource>
+    <Resource name="ast">
+      <![CDATA[
+LogicalAggregate(group=[{0, 1, 2}], EXPR$3=[COUNT()], EXPR$4=[SUM($3)], 
EXPR$5=[MAX($3) FILTER $4], wAvg=[weightedAvg($5, $6)], uv=[COUNT(DISTINCT $7)])
++- LogicalProject(a=[$0], window_start=[$7], window_end=[$8], d=[$3], $f4=[IS 
TRUE(>($1, 1000))], b=[$1], e=[$4], c=[$2])
+   +- LogicalTableFunctionScan(invocation=[SESSION(PARTITION BY($0), 
DESCRIPTOR($5), 300000:INTERVAL MINUTE)], rowType=[RecordType(INTEGER a, BIGINT 
b, VARCHAR(2147483647) c, DECIMAL(10, 3) d, BIGINT e, TIMESTAMP(3) *ROWTIME* 
rowtime, TIMESTAMP_LTZ(3) *PROCTIME* proctime, TIMESTAMP(3) window_start, 
TIMESTAMP(3) window_end, TIMESTAMP(3) *ROWTIME* window_time)])
+      +- LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], e=[$4], rowtime=[$5], 
proctime=[$6])
+         +- LogicalWatermarkAssigner(rowtime=[rowtime], watermark=[-($5, 
1000:INTERVAL SECOND)])
+            +- LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], e=[$4], 
rowtime=[$5], proctime=[PROCTIME()])
+               +- LogicalTableScan(table=[[default_catalog, default_database, 
MyTable]])
+]]>
+    </Resource>
+    <Resource name="optimized rel plan">
+      <![CDATA[
+Calc(select=[a, window_start, window_end, EXPR$3, EXPR$4, EXPR$5, wAvg, uv])
++- WindowAggregate(groupBy=[a], window=[SESSION(time_col=[rowtime], gap=[5 
min], partition keys=[a])], select=[a, COUNT(*) AS EXPR$3, SUM(d) AS EXPR$4, 
MAX(d) FILTER $f4 AS EXPR$5, weightedAvg(b, e) AS wAvg, COUNT(DISTINCT c) AS 
uv, start('w$) AS window_start, end('w$) AS window_end])
+   +- Exchange(distribution=[hash[a]])
+      +- Calc(select=[a, d, IS TRUE(>(b, 1000)) AS $f4, b, e, c, rowtime])
+         +- WatermarkAssigner(rowtime=[rowtime], watermark=[-(rowtime, 
1000:INTERVAL SECOND)])
+            +- TableSourceScan(table=[[default_catalog, default_database, 
MyTable]], fields=[a, b, c, d, e, rowtime])
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="testSession_OnRowtime[aggPhaseEnforcer=TWO_PHASE]">
+    <Resource name="sql">
+      <![CDATA[
+SELECT
+   a,
+   window_start,
+   window_end,
+   count(*),
+   sum(d),
+   max(d) filter (where b > 1000),
+   weightedAvg(b, e) AS wAvg,
+   count(distinct c) AS uv
+FROM TABLE(
+  SESSION(TABLE MyTable PARTITION BY a, DESCRIPTOR(rowtime), INTERVAL '5' 
MINUTE))
+GROUP BY a, window_start, window_end
+      ]]>
+    </Resource>
+    <Resource name="ast">
+      <![CDATA[
+LogicalAggregate(group=[{0, 1, 2}], EXPR$3=[COUNT()], EXPR$4=[SUM($3)], 
EXPR$5=[MAX($3) FILTER $4], wAvg=[weightedAvg($5, $6)], uv=[COUNT(DISTINCT $7)])
++- LogicalProject(a=[$0], window_start=[$7], window_end=[$8], d=[$3], $f4=[IS 
TRUE(>($1, 1000))], b=[$1], e=[$4], c=[$2])
+   +- LogicalTableFunctionScan(invocation=[SESSION(PARTITION BY($0), 
DESCRIPTOR($5), 300000:INTERVAL MINUTE)], rowType=[RecordType(INTEGER a, BIGINT 
b, VARCHAR(2147483647) c, DECIMAL(10, 3) d, BIGINT e, TIMESTAMP(3) *ROWTIME* 
rowtime, TIMESTAMP_LTZ(3) *PROCTIME* proctime, TIMESTAMP(3) window_start, 
TIMESTAMP(3) window_end, TIMESTAMP(3) *ROWTIME* window_time)])
+      +- LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], e=[$4], rowtime=[$5], 
proctime=[$6])
+         +- LogicalWatermarkAssigner(rowtime=[rowtime], watermark=[-($5, 
1000:INTERVAL SECOND)])
+            +- LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], e=[$4], 
rowtime=[$5], proctime=[PROCTIME()])
+               +- LogicalTableScan(table=[[default_catalog, default_database, 
MyTable]])
+]]>
+    </Resource>
+    <Resource name="optimized rel plan">
+      <![CDATA[
+Calc(select=[a, window_start, window_end, EXPR$3, EXPR$4, EXPR$5, wAvg, uv])
++- WindowAggregate(groupBy=[a], window=[SESSION(time_col=[rowtime], gap=[5 
min], partition keys=[a])], select=[a, COUNT(*) AS EXPR$3, SUM(d) AS EXPR$4, 
MAX(d) FILTER $f4 AS EXPR$5, weightedAvg(b, e) AS wAvg, COUNT(DISTINCT c) AS 
uv, start('w$) AS window_start, end('w$) AS window_end])
+   +- Exchange(distribution=[hash[a]])
+      +- Calc(select=[a, d, IS TRUE(>(b, 1000)) AS $f4, b, e, c, rowtime])
+         +- WatermarkAssigner(rowtime=[rowtime], watermark=[-(rowtime, 
1000:INTERVAL SECOND)])
+            +- TableSourceScan(table=[[default_catalog, default_database, 
MyTable]], fields=[a, b, c, d, e, rowtime])
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase 
name="testSessionWindowTVFWithoutPartitionKeyWhenCantMerge[aggPhaseEnforcer=ONE_PHASE]">
+    <Resource name="sql">
+      <![CDATA[
+SELECT
+   window_start,
+   window_end,
+   count(*),
+   sum(d),
+   max(d) filter (where b > 1000),
+   weightedAvg(b, e) AS wAvg,
+   count(distinct c) AS uv
+FROM (
+  SELECT window_start, rowtime, d, proctime, e, b, c, window_end, window_time, 
a
+  FROM TABLE(SESSION(TABLE MyTable, DESCRIPTOR(rowtime), INTERVAL '5' MINUTE))
+  WHERE window_start >= TIMESTAMP '2021-01-01 10:10:00.000'
+)
+GROUP BY window_start, window_end
+      ]]>
+    </Resource>
+    <Resource name="ast">
+      <![CDATA[
+LogicalAggregate(group=[{0, 1}], EXPR$2=[COUNT()], EXPR$3=[SUM($2)], 
EXPR$4=[MAX($2) FILTER $3], wAvg=[weightedAvg($4, $5)], uv=[COUNT(DISTINCT $6)])
++- LogicalProject(window_start=[$0], window_end=[$7], d=[$2], $f3=[IS 
TRUE(>($5, 1000))], b=[$5], e=[$4], c=[$6])
+   +- LogicalProject(window_start=[$7], rowtime=[$5], d=[$3], proctime=[$6], 
e=[$4], b=[$1], c=[$2], window_end=[$8], window_time=[$9], a=[$0])
+      +- LogicalFilter(condition=[>=($7, 2021-01-01 10:10:00)])
+         +- LogicalTableFunctionScan(invocation=[SESSION(DESCRIPTOR($5), 
300000:INTERVAL MINUTE)], rowType=[RecordType(INTEGER a, BIGINT b, 
VARCHAR(2147483647) c, DECIMAL(10, 3) d, BIGINT e, TIMESTAMP(3) *ROWTIME* 
rowtime, TIMESTAMP_LTZ(3) *PROCTIME* proctime, TIMESTAMP(3) window_start, 
TIMESTAMP(3) window_end, TIMESTAMP(3) *ROWTIME* window_time)])
+            +- LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], e=[$4], 
rowtime=[$5], proctime=[$6])
+               +- LogicalWatermarkAssigner(rowtime=[rowtime], watermark=[-($5, 
1000:INTERVAL SECOND)])
+                  +- LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], e=[$4], 
rowtime=[$5], proctime=[PROCTIME()])
+                     +- LogicalTableScan(table=[[default_catalog, 
default_database, MyTable]])
+]]>
+    </Resource>
+    <Resource name="optimized rel plan">
+      <![CDATA[
+Calc(select=[window_start, window_end, EXPR$2, EXPR$3, EXPR$4, wAvg, uv])
++- WindowAggregate(window=[SESSION(win_start=[window_start], 
win_end=[window_end], gap=[5 min])], select=[COUNT(*) AS EXPR$2, SUM(d) AS 
EXPR$3, MAX(d) FILTER $f3 AS EXPR$4, weightedAvg(b, e) AS wAvg, COUNT(DISTINCT 
c) AS uv, start('w$) AS window_start, end('w$) AS window_end])
+   +- Exchange(distribution=[single])
+      +- Calc(select=[window_start, window_end, d, IS TRUE(>(b, 1000)) AS $f3, 
b, e, c], where=[>=(window_start, 2021-01-01 10:10:00)])
+         +- WindowTableFunction(window=[SESSION(time_col=[rowtime], gap=[5 
min])])
+            +- Exchange(distribution=[single])
+               +- WatermarkAssigner(rowtime=[rowtime], watermark=[-(rowtime, 
1000:INTERVAL SECOND)])
+                  +- TableSourceScan(table=[[default_catalog, 
default_database, MyTable, project=[b, c, d, e, rowtime], metadata=[]]], 
fields=[b, c, d, e, rowtime])
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase 
name="testSessionWindowTVFWithoutPartitionKeyWhenCantMerge[aggPhaseEnforcer=TWO_PHASE]">
+    <Resource name="sql">
+      <![CDATA[
+SELECT
+   window_start,
+   window_end,
+   count(*),
+   sum(d),
+   max(d) filter (where b > 1000),
+   weightedAvg(b, e) AS wAvg,
+   count(distinct c) AS uv
+FROM (
+  SELECT window_start, rowtime, d, proctime, e, b, c, window_end, window_time, 
a
+  FROM TABLE(SESSION(TABLE MyTable, DESCRIPTOR(rowtime), INTERVAL '5' MINUTE))
+  WHERE window_start >= TIMESTAMP '2021-01-01 10:10:00.000'
+)
+GROUP BY window_start, window_end
+      ]]>
+    </Resource>
+    <Resource name="ast">
+      <![CDATA[
+LogicalAggregate(group=[{0, 1}], EXPR$2=[COUNT()], EXPR$3=[SUM($2)], 
EXPR$4=[MAX($2) FILTER $3], wAvg=[weightedAvg($4, $5)], uv=[COUNT(DISTINCT $6)])
++- LogicalProject(window_start=[$0], window_end=[$7], d=[$2], $f3=[IS 
TRUE(>($5, 1000))], b=[$5], e=[$4], c=[$6])
+   +- LogicalProject(window_start=[$7], rowtime=[$5], d=[$3], proctime=[$6], 
e=[$4], b=[$1], c=[$2], window_end=[$8], window_time=[$9], a=[$0])
+      +- LogicalFilter(condition=[>=($7, 2021-01-01 10:10:00)])
+         +- LogicalTableFunctionScan(invocation=[SESSION(DESCRIPTOR($5), 
300000:INTERVAL MINUTE)], rowType=[RecordType(INTEGER a, BIGINT b, 
VARCHAR(2147483647) c, DECIMAL(10, 3) d, BIGINT e, TIMESTAMP(3) *ROWTIME* 
rowtime, TIMESTAMP_LTZ(3) *PROCTIME* proctime, TIMESTAMP(3) window_start, 
TIMESTAMP(3) window_end, TIMESTAMP(3) *ROWTIME* window_time)])
+            +- LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], e=[$4], 
rowtime=[$5], proctime=[$6])
+               +- LogicalWatermarkAssigner(rowtime=[rowtime], watermark=[-($5, 
1000:INTERVAL SECOND)])
+                  +- LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], e=[$4], 
rowtime=[$5], proctime=[PROCTIME()])
+                     +- LogicalTableScan(table=[[default_catalog, 
default_database, MyTable]])
+]]>
+    </Resource>
+    <Resource name="optimized rel plan">
+      <![CDATA[
+Calc(select=[window_start, window_end, EXPR$2, EXPR$3, EXPR$4, wAvg, uv])
++- WindowAggregate(window=[SESSION(win_start=[window_start], 
win_end=[window_end], gap=[5 min])], select=[COUNT(*) AS EXPR$2, SUM(d) AS 
EXPR$3, MAX(d) FILTER $f3 AS EXPR$4, weightedAvg(b, e) AS wAvg, COUNT(DISTINCT 
c) AS uv, start('w$) AS window_start, end('w$) AS window_end])
+   +- Exchange(distribution=[single])
+      +- Calc(select=[window_start, window_end, d, IS TRUE(>(b, 1000)) AS $f3, 
b, e, c], where=[>=(window_start, 2021-01-01 10:10:00)])
+         +- WindowTableFunction(window=[SESSION(time_col=[rowtime], gap=[5 
min])])
+            +- Exchange(distribution=[single])
+               +- WatermarkAssigner(rowtime=[rowtime], watermark=[-(rowtime, 
1000:INTERVAL SECOND)])
+                  +- TableSourceScan(table=[[default_catalog, 
default_database, MyTable, project=[b, c, d, e, rowtime], metadata=[]]], 
fields=[b, c, d, e, rowtime])
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase 
name="testSessionWindowTVFWithPartitionKeyWhenCantMerge[aggPhaseEnforcer=ONE_PHASE]">
+    <Resource name="sql">
+      <![CDATA[
+SELECT
+   window_start,
+   window_end,
+   a,
+   count(*),
+   sum(d),
+   max(d) filter (where b > 1000),
+   weightedAvg(b, e) AS wAvg,
+   count(distinct c) AS uv
+FROM (
+  SELECT window_start, rowtime, d, proctime, e, b, c, window_end, window_time, 
a
+  FROM TABLE(SESSION(TABLE MyTable PARTITION BY a, DESCRIPTOR(rowtime), 
INTERVAL '5' MINUTE))
+  WHERE window_start >= TIMESTAMP '2021-01-01 10:10:00.000'
+)
+GROUP BY a, window_start, window_end
+      ]]>
+    </Resource>
+    <Resource name="ast">
+      <![CDATA[
+LogicalProject(window_start=[$1], window_end=[$2], a=[$0], EXPR$3=[$3], 
EXPR$4=[$4], EXPR$5=[$5], wAvg=[$6], uv=[$7])
++- LogicalAggregate(group=[{0, 1, 2}], EXPR$3=[COUNT()], EXPR$4=[SUM($3)], 
EXPR$5=[MAX($3) FILTER $4], wAvg=[weightedAvg($5, $6)], uv=[COUNT(DISTINCT $7)])
+   +- LogicalProject(a=[$9], window_start=[$0], window_end=[$7], d=[$2], 
$f4=[IS TRUE(>($5, 1000))], b=[$5], e=[$4], c=[$6])
+      +- LogicalProject(window_start=[$7], rowtime=[$5], d=[$3], 
proctime=[$6], e=[$4], b=[$1], c=[$2], window_end=[$8], window_time=[$9], 
a=[$0])
+         +- LogicalFilter(condition=[>=($7, 2021-01-01 10:10:00)])
+            +- LogicalTableFunctionScan(invocation=[SESSION(PARTITION BY($0), 
DESCRIPTOR($5), 300000:INTERVAL MINUTE)], rowType=[RecordType(INTEGER a, BIGINT 
b, VARCHAR(2147483647) c, DECIMAL(10, 3) d, BIGINT e, TIMESTAMP(3) *ROWTIME* 
rowtime, TIMESTAMP_LTZ(3) *PROCTIME* proctime, TIMESTAMP(3) window_start, 
TIMESTAMP(3) window_end, TIMESTAMP(3) *ROWTIME* window_time)])
+               +- LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], e=[$4], 
rowtime=[$5], proctime=[$6])
+                  +- LogicalWatermarkAssigner(rowtime=[rowtime], 
watermark=[-($5, 1000:INTERVAL SECOND)])
+                     +- LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], e=[$4], 
rowtime=[$5], proctime=[PROCTIME()])
+                        +- LogicalTableScan(table=[[default_catalog, 
default_database, MyTable]])
+]]>
+    </Resource>
+    <Resource name="optimized rel plan">
+      <![CDATA[
+Calc(select=[window_start, window_end, a, EXPR$3, EXPR$4, EXPR$5, wAvg, uv])
++- WindowAggregate(groupBy=[a], window=[SESSION(win_start=[window_start], 
win_end=[window_end], gap=[5 min], partition keys=[a])], select=[a, COUNT(*) AS 
EXPR$3, SUM(d) AS EXPR$4, MAX(d) FILTER $f4 AS EXPR$5, weightedAvg(b, e) AS 
wAvg, COUNT(DISTINCT c) AS uv, start('w$) AS window_start, end('w$) AS 
window_end])
+   +- Exchange(distribution=[hash[a]])

Review Comment:
   Sure. https://issues.apache.org/jira/browse/FLINK-34238



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to