[ 
https://issues.apache.org/jira/browse/FLINK-31013?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17690177#comment-17690177
 ] 

Jane Chan commented on FLINK-31013:
-----------------------------------

[~xzw0223] What's your point?


According to the table schema
||id||type||date||
|1|T1|2018-01-24 00:00:00.000|
|2|T1|2018-01-26 00:00:00.000|
|1|T2|2018-01-28 00:00:00.000|
|1|T2|2018-01-28 00:00:00.000|

 
And the query with a session window gap of 1 DAY, the first two windows should 
be triggered.
{code:sql}
select `id`,
    `type`, 
    COUNT(1) as event_cnt, 
    session_start(`date`, interval '1' DAY) as ss, 
    session_end(`date`, interval '1' DAY) as se 
from events group by `id`, `type`, session(`date`, interval '1' DAY); 
{code}
 

Expected Output
||id||type||event_cnt||ss||se||
|1|T1|1|2018-01-24 00:00:00.000|2018-01-25 00:00:00.000|
|2|T1|1|2018-01-26 00:00:00.000|2018-01-27 00:00:00.000|

 

Table Store Actual Output

No output

> Session window aggregation cannot trigger window using event time
> -----------------------------------------------------------------
>
>                 Key: FLINK-31013
>                 URL: https://issues.apache.org/jira/browse/FLINK-31013
>             Project: Flink
>          Issue Type: Bug
>          Components: Table Store
>    Affects Versions: table-store-0.4.0
>            Reporter: Jane Chan
>            Priority: Major
>             Fix For: table-store-0.4.0
>
>
> {code:sql}
> -- test against Flink 1.16.0
> create catalog fscat with (
>     'type' = 'table-store',
>     'warehouse' = 'file:///tmp/fscat'
> );
> use catalog fscat;
> create table events (
>   `id` int, 
>   `type` string, 
>   `date` TIMESTAMP(3), 
>   watermark for `date` AS `date`);
>   
> insert into events 
> values (1, 'T1', to_timestamp('2018-01-24', 'yyyy-MM-dd')), 
> (2, 'T1', to_timestamp('2018-01-26', 'yyyy-MM-dd')), 
> (1, 'T2', to_timestamp('2018-01-28', 'yyyy-MM-dd')), 
> (1, 'T2', to_timestamp('2018-01-28', 'yyyy-MM-dd'));  
> -- no output
> select `id`,
>     `type`, 
>     COUNT(1) as event_cnt, 
>     session_start(`date`, interval '1' DAY) as ss, 
>     session_end(`date`, interval '1' DAY) as se 
> from events group by `id`, `type`, session(`date`, interval '1' DAY); 
> -- explain plan
> == Abstract Syntax Tree ==
> LogicalProject(id=[$0], type=[$1], event_cnt=[$3], ss=[SESSION_START($2)], 
> se=[SESSION_END($2)])
> +- LogicalAggregate(group=[{0, 1, 2}], event_cnt=[COUNT()])
>    +- LogicalProject(id=[$0], type=[$1], $f2=[$SESSION($2, 86400000:INTERVAL 
> DAY)])
>       +- LogicalWatermarkAssigner(rowtime=[date], watermark=[$2])
>          +- LogicalTableScan(table=[[fscat, default, events]])
> == Optimized Physical Plan ==
> Calc(select=[id, type, event_cnt, w$start AS ss, w$end AS se])
> +- GroupWindowAggregate(groupBy=[id, type], window=[SessionGroupWindow('w$, 
> date, 86400000)], properties=[w$start, w$end, w$rowtime, w$proctime], 
> select=[id, type, COUNT(*) AS event_cnt, start('w$) AS w$start, end('w$) AS 
> w$end, rowtime('w$) AS w$rowtime, proctime('w$) AS w$proctime])
>    +- Exchange(distribution=[hash[id, type]])
>       +- TableSourceScan(table=[[fscat, default, events, watermark=[date]]], 
> fields=[id, type, date])
> == Optimized Execution Plan ==
> Calc(select=[id, type, event_cnt, w$start AS ss, w$end AS se])
> +- GroupWindowAggregate(groupBy=[id, type], window=[SessionGroupWindow('w$, 
> date, 86400000)], properties=[w$start, w$end, w$rowtime, w$proctime], 
> select=[id, type, COUNT(*) AS event_cnt, start('w$) AS w$start, end('w$) AS 
> w$end, rowtime('w$) AS w$rowtime, proctime('w$) AS w$proctime])
>    +- Exchange(distribution=[hash[id, type]])
>       +- TableSourceScan(table=[[fscat, default, events, watermark=[date]]], 
> fields=[id, type, date])
> -- however, if switch to filesystem source, the window can be triggered 
> normally
> CREATE TEMPORARY TABLE `fscat`.`default`.`event_file_source` (
>   `id` INT,
>   `type` VARCHAR(2147483647),
>   `date` TIMESTAMP(3),
>   WATERMARK FOR `date` AS `date`
> ) WITH (
>   'format' = 'csv',
>   'path' = '/tmp/events.csv',
>   'source.monitor-interval' = '1 min',
>   'connector' = 'filesystem'
> );
> // cat events.csv                                
> 1,T1,2018-01-24 00:00:00.000
> 2,T1,2018-01-26 00:00:00.000
> 1,T2,2018-01-28 00:00:00.000
> 1,T2,2018-01-28 00:00:00.000
> -- same query using filesystem source
> select `id`, `type`, COUNT(1) as event_cnt, session_start(`date`, interval 
> '1' DAY) as ss, session_end(`date`, interval '1' DAY) as se from 
> event_file_source group by `id`, `type`, session(`date`, interval '1' DAY);
> -- output
>           id                           type            event_cnt              
>         ss                      se
>            1                             T1                    1 2018-01-24 
> 00:00:00.000 2018-01-25 00:00:00.000
>            2                             T1                    1 2018-01-26 
> 00:00:00.000 2018-01-27 00:00:00.000{code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

Reply via email to