[jira] [Commented] (FLINK-31013) Session window aggregation cannot trigger window using event time

2023-02-16 Thread Jane Chan (Jira)


[ https://issues.apache.org/jira/browse/FLINK-31013?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17690177#comment-17690177 ]

Jane Chan commented on FLINK-31013:
---

[~xzw0223] What's your point?


Given the following table data
||id||type||date||
|1|T1|2018-01-24 00:00:00.000|
|2|T1|2018-01-26 00:00:00.000|
|1|T2|2018-01-28 00:00:00.000|
|1|T2|2018-01-28 00:00:00.000|

 
And the following query with a session window gap of 1 DAY, the first two windows should be triggered:
{code:sql}
select `id`,
    `type`, 
    COUNT(1) as event_cnt, 
    session_start(`date`, interval '1' DAY) as ss, 
    session_end(`date`, interval '1' DAY) as se 
from events group by `id`, `type`, session(`date`, interval '1' DAY); 
{code}
 

Expected Output
||id||type||event_cnt||ss||se||
|1|T1|1|2018-01-24 00:00:00.000|2018-01-25 00:00:00.000|
|2|T1|1|2018-01-26 00:00:00.000|2018-01-27 00:00:00.000|

 

Table Store Actual Output

No output
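For reference, the expected output follows directly from the 1-day gap semantics. Here is a minimal plain-Python sketch (not Flink code; the record values are taken from the table above) that derives the session windows per (id, type) key:

```python
from datetime import datetime, timedelta
from itertools import groupby

GAP = timedelta(days=1)

# (id, type, event time) -- the four rows inserted into `events`
rows = [
    (1, "T1", datetime(2018, 1, 24)),
    (2, "T1", datetime(2018, 1, 26)),
    (1, "T2", datetime(2018, 1, 28)),
    (1, "T2", datetime(2018, 1, 28)),
]

def key(row):
    return (row[0], row[1])

windows = []  # (id, type, event_cnt, session_start, session_end)
for (rid, rtype), group in groupby(sorted(rows, key=key), key=key):
    times = sorted(ts for _, _, ts in group)
    start, last, cnt = times[0], times[0], 1
    for ts in times[1:]:
        if ts - last >= GAP:
            # gap exceeded: close the current session and start a new one
            windows.append((rid, rtype, cnt, start, last + GAP))
            start, cnt = ts, 0
        last, cnt = ts, cnt + 1
    windows.append((rid, rtype, cnt, start, last + GAP))

# Three sessions exist; the (1, T2) session only closes once the watermark
# passes 2018-01-29, which never happens here -- hence only two expected rows.
```

Per this model, (1, T1) and (2, T1) each form a closed single-row session, matching the two expected rows, while the (1, T2) session stays open because no later record advances the watermark past its end.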

> Session window aggregation cannot trigger window using event time
> -
>
> Key: FLINK-31013
> URL: https://issues.apache.org/jira/browse/FLINK-31013
> Project: Flink
>  Issue Type: Bug
>  Components: Table Store
>Affects Versions: table-store-0.4.0
>Reporter: Jane Chan
>Priority: Major
> Fix For: table-store-0.4.0
>
>
> {code:sql}
> -- test against Flink 1.16.0
> create catalog fscat with (
>     'type' = 'table-store',
>     'warehouse' = 'file:///tmp/fscat'
> );
> use catalog fscat;
> create table events (
>   `id` int, 
>   `type` string, 
>   `date` TIMESTAMP(3), 
>   watermark for `date` AS `date`);
>   
> insert into events 
> values (1, 'T1', to_timestamp('2018-01-24', 'yyyy-MM-dd')), 
> (2, 'T1', to_timestamp('2018-01-26', 'yyyy-MM-dd')), 
> (1, 'T2', to_timestamp('2018-01-28', 'yyyy-MM-dd')), 
> (1, 'T2', to_timestamp('2018-01-28', 'yyyy-MM-dd'));  
> -- no output
> select `id`,
>     `type`, 
>     COUNT(1) as event_cnt, 
>     session_start(`date`, interval '1' DAY) as ss, 
>     session_end(`date`, interval '1' DAY) as se 
> from events group by `id`, `type`, session(`date`, interval '1' DAY); 
> -- explain plan
> == Abstract Syntax Tree ==
> LogicalProject(id=[$0], type=[$1], event_cnt=[$3], ss=[SESSION_START($2)], 
> se=[SESSION_END($2)])
> +- LogicalAggregate(group=[{0, 1, 2}], event_cnt=[COUNT()])
>    +- LogicalProject(id=[$0], type=[$1], $f2=[$SESSION($2, 86400000:INTERVAL 
> DAY)])
>       +- LogicalWatermarkAssigner(rowtime=[date], watermark=[$2])
>          +- LogicalTableScan(table=[[fscat, default, events]])
> == Optimized Physical Plan ==
> Calc(select=[id, type, event_cnt, w$start AS ss, w$end AS se])
> +- GroupWindowAggregate(groupBy=[id, type], window=[SessionGroupWindow('w$, 
> date, 86400000)], properties=[w$start, w$end, w$rowtime, w$proctime], 
> select=[id, type, COUNT(*) AS event_cnt, start('w$) AS w$start, end('w$) AS 
> w$end, rowtime('w$) AS w$rowtime, proctime('w$) AS w$proctime])
>    +- Exchange(distribution=[hash[id, type]])
>       +- TableSourceScan(table=[[fscat, default, events, watermark=[date]]], 
> fields=[id, type, date])
> == Optimized Execution Plan ==
> Calc(select=[id, type, event_cnt, w$start AS ss, w$end AS se])
> +- GroupWindowAggregate(groupBy=[id, type], window=[SessionGroupWindow('w$, 
> date, 86400000)], properties=[w$start, w$end, w$rowtime, w$proctime], 
> select=[id, type, COUNT(*) AS event_cnt, start('w$) AS w$start, end('w$) AS 
> w$end, rowtime('w$) AS w$rowtime, proctime('w$) AS w$proctime])
>    +- Exchange(distribution=[hash[id, type]])
>       +- TableSourceScan(table=[[fscat, default, events, watermark=[date]]], 
> fields=[id, type, date])
> -- however, when switching to a filesystem source, the window is triggered 
> -- normally
> CREATE TEMPORARY TABLE `fscat`.`default`.`event_file_source` (
>   `id` INT,
>   `type` VARCHAR(2147483647),
>   `date` TIMESTAMP(3),
>   WATERMARK FOR `date` AS `date`
> ) WITH (
>   'format' = 'csv',
>   'path' = '/tmp/events.csv',
>   'source.monitor-interval' = '1 min',
>   'connector' = 'filesystem'
> );
> // cat events.csv                                
> 1,T1,2018-01-24 00:00:00.000
> 2,T1,2018-01-26 00:00:00.000
> 1,T2,2018-01-28 00:00:00.000
> 1,T2,2018-01-28 00:00:00.000
> -- same query using filesystem source
> select `id`, `type`, COUNT(1) as event_cnt, session_start(`date`, interval 
> '1' DAY) as ss, session_end(`date`, interval '1' DAY) as se from 
> event_file_source group by `id`, `type`, session(`date`, interval '1' DAY);
> -- output
>           id           type      event_cnt                      ss                      se
>            1             T1              1 2018-01-24 00:00:00.000 2018-01-25 00:00:00.000
>            2             T1              1 2018-01-26 00:00:00.000 2018-01-27 00:00:00.000
> {code}

[jira] [Commented] (FLINK-31013) Session window aggregation cannot trigger window using event time

2023-02-14 Thread xzw0223 (Jira)


[ https://issues.apache.org/jira/browse/FLINK-31013?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17688777#comment-17688777 ]

xzw0223 commented on FLINK-31013:
-

Please refer to the documentation on group window aggregations:
[https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/dev/table/sql/queries/window-agg/#group-window-aggregation]

A session window does not split the stream at fixed intervals; it only fires after the specified gap has elapsed with no new data, i.e. once the session has become inactive.

 

This is your data
1,T1,2018-01-24 00:00:00.000
2,T1,2018-01-26 00:00:00.000
1,T2,2018-01-28 00:00:00.000
1,T2,2018-01-28 00:00:00.000
 

When the second record is received, its event time exceeds the gap of the first 
window, so the (1, T1) window is triggered.

When the third record is received, its event time again exceeds the gap, so the 
(2, T1) window is triggered.

The fourth record carries the same timestamp as the third, so both are assigned 
to the same session window. Since no further data arrives, that window is never 
closed: it will only fire once a record whose event time exceeds the window gap 
is received.
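The firing behavior described above can be simulated in a few lines of plain Python (a simplified model, not Flink's implementation; here the watermark simply equals the latest event time, matching WATERMARK FOR `date` AS `date`):

```python
from datetime import datetime, timedelta

GAP = timedelta(days=1)

records = [
    (1, "T1", datetime(2018, 1, 24)),
    (2, "T1", datetime(2018, 1, 26)),
    (1, "T2", datetime(2018, 1, 28)),
    (1, "T2", datetime(2018, 1, 28)),
]

open_sessions = {}  # (id, type) -> [start, end, count]
fired = []          # sessions whose end fell behind the watermark

for rid, rtype, ts in records:
    watermark = ts  # watermark advances with each record's event time
    # fire every open session whose end (last ts + gap) the watermark has passed
    for k, (start, end, cnt) in list(open_sessions.items()):
        if end <= watermark:
            fired.append((k, start, end, cnt))
            del open_sessions[k]
    k = (rid, rtype)
    if k in open_sessions:
        # record belongs to the still-open session: extend it
        start, end, cnt = open_sessions[k]
        open_sessions[k] = [start, max(end, ts + GAP), cnt + 1]
    else:
        open_sessions[k] = [ts, ts + GAP, 1]

# fired: (1, T1) and (2, T1); (1, T2) remains open with 2 records
```

In this model, record 2 fires the (1, T1) window, record 3 fires the (2, T1) window, and the (1, T2) window stays open because nothing ever advances the watermark past 2018-01-29.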

 

 
