[ https://issues.apache.org/jira/browse/FLINK-31013?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Jane Chan updated FLINK-31013: ------------------------------ Description: {code:java} -- test against Flink 1.16.0 create catalog fscat with ( 'type' = 'table-store', 'warehouse' = 'file:///tmp/fscat' ); use catalog fscat; create table events ( `id` int, `type` string, `date` TIMESTAMP(3), watermark for `date` AS `date`); insert into events values (1, 'T1', to_timestamp('2018-01-24', 'yyyy-MM-dd')), (2, 'T1', to_timestamp('2018-01-26', 'yyyy-MM-dd')), (1, 'T2', to_timestamp('2018-01-28', 'yyyy-MM-dd')), (1, 'T2', to_timestamp('2018-01-28', 'yyyy-MM-dd')); -- no output select `id`, `type`, COUNT(1) as event_cnt, session_start(`date`, interval '1' DAY) as ss, session_end(`date`, interval '1' DAY) as se from events group by `id`, `type`, session(`date`, interval '1' DAY); -- explain plan == Abstract Syntax Tree == LogicalProject(id=[$0], type=[$1], event_cnt=[$3], ss=[SESSION_START($2)], se=[SESSION_END($2)]) +- LogicalAggregate(group=[{0, 1, 2}], event_cnt=[COUNT()]) +- LogicalProject(id=[$0], type=[$1], $f2=[$SESSION($2, 86400000:INTERVAL DAY)]) +- LogicalWatermarkAssigner(rowtime=[date], watermark=[$2]) +- LogicalTableScan(table=[[fscat, default, events]]) == Optimized Physical Plan == Calc(select=[id, type, event_cnt, w$start AS ss, w$end AS se]) +- GroupWindowAggregate(groupBy=[id, type], window=[SessionGroupWindow('w$, date, 86400000)], properties=[w$start, w$end, w$rowtime, w$proctime], select=[id, type, COUNT(*) AS event_cnt, start('w$) AS w$start, end('w$) AS w$end, rowtime('w$) AS w$rowtime, proctime('w$) AS w$proctime]) +- Exchange(distribution=[hash[id, type]]) +- TableSourceScan(table=[[fscat, default, events, watermark=[date]]], fields=[id, type, date]) == Optimized Execution Plan == Calc(select=[id, type, event_cnt, w$start AS ss, w$end AS se]) +- GroupWindowAggregate(groupBy=[id, type], window=[SessionGroupWindow('w$, date, 86400000)], properties=[w$start, w$end, w$rowtime, w$proctime], select=[id, type, COUNT(*) AS event_cnt, start('w$) AS w$start, 
end('w$) AS w$end, rowtime('w$) AS w$rowtime, proctime('w$) AS w$proctime]) +- Exchange(distribution=[hash[id, type]]) +- TableSourceScan(table=[[fscat, default, events, watermark=[date]]], fields=[id, type, date]) -- however, if switching to a filesystem source, the window can be triggered normally CREATE TEMPORARY TABLE `fscat`.`default`.`event_file_source` ( `id` INT, `type` VARCHAR(2147483647), `date` TIMESTAMP(3), WATERMARK FOR `date` AS `date` ) WITH ( 'format' = 'csv', 'path' = '/tmp/events.csv', 'source.monitor-interval' = '1 min', 'connector' = 'filesystem' ); // cat events.csv 1,T1,2018-01-24 00:00:00.000 2,T1,2018-01-26 00:00:00.000 1,T2,2018-01-28 00:00:00.000 1,T2,2018-01-28 00:00:00.000 -- same query using filesystem source select `id`, `type`, COUNT(1) as event_cnt, session_start(`date`, interval '1' DAY) as ss, session_end(`date`, interval '1' DAY) as se from event_file_source group by `id`, `type`, session(`date`, interval '1' DAY); -- output id type event_cnt ss se 1 T1 1 2018-01-24 00:00:00.000 2018-01-25 00:00:00.000 2 T1 1 2018-01-26 00:00:00.000 2018-01-27 00:00:00.000{code} was: {code:java} -- test against Flink 1.16.0 create catalog fscat with ( 'type' = 'table-store', 'warehouse' = 'file:///tmp/fscat' ); use catalog fscat; create table events ( `id` int, `type` string, `date` TIMESTAMP(3), watermark for `date` AS `date`); insert into events values (1, 'T1', to_timestamp('2018-01-24', 'yyyy-MM-dd')), (2, 'T1', to_timestamp('2018-01-26', 'yyyy-MM-dd')), (1, 'T2', to_timestamp('2018-01-28', 'yyyy-MM-dd')), (1, 'T2', to_timestamp('2018-01-28', 'yyyy-MM-dd')); -- no output select `id`, `type`, COUNT(1) as event_cnt, session_start(`date`, interval '1' DAY) as ss, session_end(`date`, interval '1' DAY) as se from events group by `id`, `type`, session(`date`, interval '1' DAY); {code} > Session window aggregation cannot trigger window using event time > ----------------------------------------------------------------- > > Key: FLINK-31013 > URL: 
https://issues.apache.org/jira/browse/FLINK-31013 > Project: Flink > Issue Type: Bug > Components: Table Store > Affects Versions: table-store-0.4.0 > Reporter: Jane Chan > Priority: Major > Fix For: table-store-0.4.0 > > > {code:java} > -- test against Flink 1.16.0 > create catalog fscat with ( > 'type' = 'table-store', > 'warehouse' = 'file:///tmp/fscat' > ); > use catalog fscat; > create table events ( > `id` int, > `type` string, > `date` TIMESTAMP(3), > watermark for `date` AS `date`); > > insert into events > values (1, 'T1', to_timestamp('2018-01-24', 'yyyy-MM-dd')), > (2, 'T1', to_timestamp('2018-01-26', 'yyyy-MM-dd')), > (1, 'T2', to_timestamp('2018-01-28', 'yyyy-MM-dd')), > (1, 'T2', to_timestamp('2018-01-28', 'yyyy-MM-dd')); > -- no output > select `id`, > `type`, > COUNT(1) as event_cnt, > session_start(`date`, interval '1' DAY) as ss, > session_end(`date`, interval '1' DAY) as se > from events group by `id`, `type`, session(`date`, interval '1' DAY); > -- explain plan > == Abstract Syntax Tree == > LogicalProject(id=[$0], type=[$1], event_cnt=[$3], ss=[SESSION_START($2)], > se=[SESSION_END($2)]) > +- LogicalAggregate(group=[{0, 1, 2}], event_cnt=[COUNT()]) > +- LogicalProject(id=[$0], type=[$1], $f2=[$SESSION($2, 86400000:INTERVAL > DAY)]) > +- LogicalWatermarkAssigner(rowtime=[date], watermark=[$2]) > +- LogicalTableScan(table=[[fscat, default, events]]) > == Optimized Physical Plan == > Calc(select=[id, type, event_cnt, w$start AS ss, w$end AS se]) > +- GroupWindowAggregate(groupBy=[id, type], window=[SessionGroupWindow('w$, > date, 86400000)], properties=[w$start, w$end, w$rowtime, w$proctime], > select=[id, type, COUNT(*) AS event_cnt, start('w$) AS w$start, end('w$) AS > w$end, rowtime('w$) AS w$rowtime, proctime('w$) AS w$proctime]) > +- Exchange(distribution=[hash[id, type]]) > +- TableSourceScan(table=[[fscat, default, events, watermark=[date]]], > fields=[id, type, date]) > == Optimized Execution Plan == > Calc(select=[id, type, event_cnt, 
w$start AS ss, w$end AS se]) > +- GroupWindowAggregate(groupBy=[id, type], window=[SessionGroupWindow('w$, > date, 86400000)], properties=[w$start, w$end, w$rowtime, w$proctime], > select=[id, type, COUNT(*) AS event_cnt, start('w$) AS w$start, end('w$) AS > w$end, rowtime('w$) AS w$rowtime, proctime('w$) AS w$proctime]) > +- Exchange(distribution=[hash[id, type]]) > +- TableSourceScan(table=[[fscat, default, events, watermark=[date]]], > fields=[id, type, date]) > -- however, if switching to a filesystem source, the window can be triggered > normally > CREATE TEMPORARY TABLE `fscat`.`default`.`event_file_source` ( > `id` INT, > `type` VARCHAR(2147483647), > `date` TIMESTAMP(3), > WATERMARK FOR `date` AS `date` > ) WITH ( > 'format' = 'csv', > 'path' = '/tmp/events.csv', > 'source.monitor-interval' = '1 min', > 'connector' = 'filesystem' > ); > // cat events.csv > 1,T1,2018-01-24 00:00:00.000 > 2,T1,2018-01-26 00:00:00.000 > 1,T2,2018-01-28 00:00:00.000 > 1,T2,2018-01-28 00:00:00.000 > -- same query using filesystem source > select `id`, `type`, COUNT(1) as event_cnt, session_start(`date`, interval > '1' DAY) as ss, session_end(`date`, interval '1' DAY) as se from > event_file_source group by `id`, `type`, session(`date`, interval '1' DAY); > -- output > id type event_cnt > ss se > 1 T1 1 2018-01-24 > 00:00:00.000 2018-01-25 00:00:00.000 > 2 T1 1 2018-01-26 > 00:00:00.000 2018-01-27 00:00:00.000{code} -- This message was sent by Atlassian Jira (v8.20.10#820010)