[ https://issues.apache.org/jira/browse/FLINK-31013?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17690177#comment-17690177 ]
Jane Chan commented on FLINK-31013:
-----------------------------------

[~xzw0223] What's your point? Given the table data
||id||type||date||
|1|T1|2018-01-24 00:00:00.000|
|2|T1|2018-01-26 00:00:00.000|
|1|T2|2018-01-28 00:00:00.000|
|1|T2|2018-01-28 00:00:00.000|
and the query below with a session window gap of 1 DAY, the first two windows should be triggered.
{code:sql}
select `id`,
       `type`,
       COUNT(1) as event_cnt,
       session_start(`date`, interval '1' DAY) as ss,
       session_end(`date`, interval '1' DAY) as se
from events
group by `id`, `type`, session(`date`, interval '1' DAY);
{code}
Expected Output
||id||type||event_cnt||ss||se||
|1|T1|1|2018-01-24 00:00:00.000|2018-01-25 00:00:00.000|
|2|T1|1|2018-01-26 00:00:00.000|2018-01-27 00:00:00.000|

Table Store Actual Output
No output

> Session window aggregation cannot trigger window using event time
> -----------------------------------------------------------------
>
>                 Key: FLINK-31013
>                 URL: https://issues.apache.org/jira/browse/FLINK-31013
>             Project: Flink
>          Issue Type: Bug
>          Components: Table Store
>    Affects Versions: table-store-0.4.0
>            Reporter: Jane Chan
>            Priority: Major
>             Fix For: table-store-0.4.0
>
>
> {code:sql}
> -- tested against Flink 1.16.0
> create catalog fscat with (
>     'type' = 'table-store',
>     'warehouse' = 'file:///tmp/fscat'
> );
> use catalog fscat;
>
> create table events (
>     `id` int,
>     `type` string,
>     `date` TIMESTAMP(3),
>     watermark for `date` AS `date`);
>
> insert into events
> values (1, 'T1', to_timestamp('2018-01-24', 'yyyy-MM-dd')),
>        (2, 'T1', to_timestamp('2018-01-26', 'yyyy-MM-dd')),
>        (1, 'T2', to_timestamp('2018-01-28', 'yyyy-MM-dd')),
>        (1, 'T2', to_timestamp('2018-01-28', 'yyyy-MM-dd'));
>
> -- no output
> select `id`,
>        `type`,
>        COUNT(1) as event_cnt,
>        session_start(`date`, interval '1' DAY) as ss,
>        session_end(`date`, interval '1' DAY) as se
> from events group by `id`, `type`, session(`date`, interval '1' DAY);
>
> -- explain plan
> == Abstract Syntax Tree ==
> LogicalProject(id=[$0], type=[$1], event_cnt=[$3], ss=[SESSION_START($2)], se=[SESSION_END($2)])
> +- LogicalAggregate(group=[{0, 1, 2}], event_cnt=[COUNT()])
>    +- LogicalProject(id=[$0], type=[$1], $f2=[$SESSION($2, 86400000:INTERVAL DAY)])
>       +- LogicalWatermarkAssigner(rowtime=[date], watermark=[$2])
>          +- LogicalTableScan(table=[[fscat, default, events]])
>
> == Optimized Physical Plan ==
> Calc(select=[id, type, event_cnt, w$start AS ss, w$end AS se])
> +- GroupWindowAggregate(groupBy=[id, type], window=[SessionGroupWindow('w$, date, 86400000)], properties=[w$start, w$end, w$rowtime, w$proctime], select=[id, type, COUNT(*) AS event_cnt, start('w$) AS w$start, end('w$) AS w$end, rowtime('w$) AS w$rowtime, proctime('w$) AS w$proctime])
>    +- Exchange(distribution=[hash[id, type]])
>       +- TableSourceScan(table=[[fscat, default, events, watermark=[date]]], fields=[id, type, date])
>
> == Optimized Execution Plan ==
> Calc(select=[id, type, event_cnt, w$start AS ss, w$end AS se])
> +- GroupWindowAggregate(groupBy=[id, type], window=[SessionGroupWindow('w$, date, 86400000)], properties=[w$start, w$end, w$rowtime, w$proctime], select=[id, type, COUNT(*) AS event_cnt, start('w$) AS w$start, end('w$) AS w$end, rowtime('w$) AS w$rowtime, proctime('w$) AS w$proctime])
>    +- Exchange(distribution=[hash[id, type]])
>       +- TableSourceScan(table=[[fscat, default, events, watermark=[date]]], fields=[id, type, date])
>
> -- however, if we switch to a filesystem source, the window is triggered normally
> CREATE TEMPORARY TABLE `fscat`.`default`.`event_file_source` (
>     `id` INT,
>     `type` VARCHAR(2147483647),
>     `date` TIMESTAMP(3),
>     WATERMARK FOR `date` AS `date`
> ) WITH (
>     'format' = 'csv',
>     'path' = '/tmp/events.csv',
>     'source.monitor-interval' = '1 min',
>     'connector' = 'filesystem'
> );
>
> -- cat events.csv
> 1,T1,2018-01-24 00:00:00.000
> 2,T1,2018-01-26 00:00:00.000
> 1,T2,2018-01-28 00:00:00.000
> 1,T2,2018-01-28 00:00:00.000
>
> -- same query using the filesystem source
> select `id`,
>        `type`,
>        COUNT(1) as event_cnt,
>        session_start(`date`, interval '1' DAY) as ss,
>        session_end(`date`, interval '1' DAY) as se
> from event_file_source
> group by `id`, `type`, session(`date`, interval '1' DAY);
>
> -- output
> id  type  event_cnt  ss                       se
> 1   T1    1          2018-01-24 00:00:00.000  2018-01-25 00:00:00.000
> 2   T1    1          2018-01-26 00:00:00.000  2018-01-27 00:00:00.000
> {code}


--
This message was sent by Atlassian Jira
(v8.20.10#820010)
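Editor's note: the expected result in the comment above follows from how event-time session windows close. A window fires only once the watermark passes its end, and with `WATERMARK FOR `date` AS `date`` the watermark tracks the maximum observed event time (2018-01-28). The sketch below is a minimal Python model of this logic, not Flink's implementation; it shows why the (1, T1) and (2, T1) windows should fire while the (1, T2) window, ending 2018-01-29, is still open.

```python
from datetime import datetime, timedelta

# Minimal model of event-time session windows (gap = 1 DAY).
# A sketch of the semantics, not Flink's implementation.
GAP = timedelta(days=1)

events = [  # (id, type, event time), as in the bug report
    (1, "T1", datetime(2018, 1, 24)),
    (2, "T1", datetime(2018, 1, 26)),
    (1, "T2", datetime(2018, 1, 28)),
    (1, "T2", datetime(2018, 1, 28)),
]

# WATERMARK FOR `date` AS `date`: the watermark tracks the max event time.
watermark = max(ts for _, _, ts in events)

# Merge per-key events into sessions: an event inside the current session's
# gap extends it; otherwise it opens a new session [ts, ts + GAP).
sessions = {}  # (id, type) -> list of [start, end] windows
for ev_id, ev_type, ts in sorted(events, key=lambda e: e[2]):
    wins = sessions.setdefault((ev_id, ev_type), [])
    if wins and ts < wins[-1][1]:
        wins[-1][1] = ts + GAP          # extend the open session
    else:
        wins.append([ts, ts + GAP])     # start a new session

# A window fires only once the watermark has reached its end.
triggered = sorted(
    (key, start, end)
    for key, wins in sessions.items()
    for start, end in wins
    if end <= watermark
)
for (ev_id, ev_type), start, end in triggered:
    print(ev_id, ev_type, start, end)
```

Flink's actual trigger condition differs by a millisecond (it fires when the watermark passes the window's max timestamp, i.e. end minus 1 ms), which does not change the outcome here: only the two expected rows are emitted.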