[ 
https://issues.apache.org/jira/browse/FLINK-31013?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jane Chan updated FLINK-31013:
------------------------------
    Description: 
{code:sql}
-- test against Flink 1.16.0

create catalog fscat with (
    'type' = 'table-store',
    'warehouse' = 'file:///tmp/fscat'
);


use catalog fscat;
create table events (
  `id` int, 
  `type` string, 
  `date` TIMESTAMP(3), 
  watermark for `date` AS `date`);
  
insert into events 
values (1, 'T1', to_timestamp('2018-01-24', 'yyyy-MM-dd')), 
(2, 'T1', to_timestamp('2018-01-26', 'yyyy-MM-dd')), 
(1, 'T2', to_timestamp('2018-01-28', 'yyyy-MM-dd')), 
(1, 'T2', to_timestamp('2018-01-28', 'yyyy-MM-dd'));  

-- no output
select `id`,
    `type`, 
    COUNT(1) as event_cnt, 
    session_start(`date`, interval '1' DAY) as ss, 
    session_end(`date`, interval '1' DAY) as se 
from events group by `id`, `type`, session(`date`, interval '1' DAY); 

-- explain plan
== Abstract Syntax Tree ==
LogicalProject(id=[$0], type=[$1], event_cnt=[$3], ss=[SESSION_START($2)], 
se=[SESSION_END($2)])
+- LogicalAggregate(group=[{0, 1, 2}], event_cnt=[COUNT()])
   +- LogicalProject(id=[$0], type=[$1], $f2=[$SESSION($2, 86400000:INTERVAL 
DAY)])
      +- LogicalWatermarkAssigner(rowtime=[date], watermark=[$2])
         +- LogicalTableScan(table=[[fscat, default, events]])


== Optimized Physical Plan ==
Calc(select=[id, type, event_cnt, w$start AS ss, w$end AS se])
+- GroupWindowAggregate(groupBy=[id, type], window=[SessionGroupWindow('w$, 
date, 86400000)], properties=[w$start, w$end, w$rowtime, w$proctime], 
select=[id, type, COUNT(*) AS event_cnt, start('w$) AS w$start, end('w$) AS 
w$end, rowtime('w$) AS w$rowtime, proctime('w$) AS w$proctime])
   +- Exchange(distribution=[hash[id, type]])
      +- TableSourceScan(table=[[fscat, default, events, watermark=[date]]], 
fields=[id, type, date])


== Optimized Execution Plan ==
Calc(select=[id, type, event_cnt, w$start AS ss, w$end AS se])
+- GroupWindowAggregate(groupBy=[id, type], window=[SessionGroupWindow('w$, 
date, 86400000)], properties=[w$start, w$end, w$rowtime, w$proctime], 
select=[id, type, COUNT(*) AS event_cnt, start('w$) AS w$start, end('w$) AS 
w$end, rowtime('w$) AS w$rowtime, proctime('w$) AS w$proctime])
   +- Exchange(distribution=[hash[id, type]])
      +- TableSourceScan(table=[[fscat, default, events, watermark=[date]]], 
fields=[id, type, date])

-- however, if switch to filesystem source, the window can be triggered normally

CREATE TEMPORARY TABLE `fscat`.`default`.`event_file_source` (
  `id` INT,
  `type` VARCHAR(2147483647),
  `date` TIMESTAMP(3),
  WATERMARK FOR `date` AS `date`
) WITH (
  'format' = 'csv',
  'path' = '/tmp/events.csv',
  'source.monitor-interval' = '1 min',
  'connector' = 'filesystem'
);

// cat events.csv                                
1,T1,2018-01-24 00:00:00.000
2,T1,2018-01-26 00:00:00.000
1,T2,2018-01-28 00:00:00.000
1,T2,2018-01-28 00:00:00.000


-- same query using filesystem source
select `id`, `type`, COUNT(1) as event_cnt, session_start(`date`, interval '1' 
DAY) as ss, session_end(`date`, interval '1' DAY) as se from event_file_source 
group by `id`, `type`, session(`date`, interval '1' DAY);

-- output

          id                           type            event_cnt                
      ss                      se
           1                             T1                    1 2018-01-24 
00:00:00.000 2018-01-25 00:00:00.000
           2                             T1                    1 2018-01-26 
00:00:00.000 2018-01-27 00:00:00.000{code}

  was:
{code:java}
-- test against Flink 1.16.0

create catalog fscat with (
    'type' = 'table-store',
    'warehouse' = 'file:///tmp/fscat'
);


use catalog fscat;
create table events (
  `id` int, 
  `type` string, 
  `date` TIMESTAMP(3), 
  watermark for `date` AS `date`);
  
insert into events 
values (1, 'T1', to_timestamp('2018-01-24', 'yyyy-MM-dd')), 
(2, 'T1', to_timestamp('2018-01-26', 'yyyy-MM-dd')), 
(1, 'T2', to_timestamp('2018-01-28', 'yyyy-MM-dd')), 
(1, 'T2', to_timestamp('2018-01-28', 'yyyy-MM-dd'));  

-- no output
select `id`,
    `type`, 
    COUNT(1) as event_cnt, 
    session_start(`date`, interval '1' DAY) as ss, 
    session_end(`date`, interval '1' DAY) as se 
from events group by `id`, `type`, session(`date`, interval '1' DAY); 

-- explain plan
== Abstract Syntax Tree ==
LogicalProject(id=[$0], type=[$1], event_cnt=[$3], ss=[SESSION_START($2)], 
se=[SESSION_END($2)])
+- LogicalAggregate(group=[{0, 1, 2}], event_cnt=[COUNT()])
   +- LogicalProject(id=[$0], type=[$1], $f2=[$SESSION($2, 86400000:INTERVAL 
DAY)])
      +- LogicalWatermarkAssigner(rowtime=[date], watermark=[$2])
         +- LogicalTableScan(table=[[fscat, default, events]])


== Optimized Physical Plan ==
Calc(select=[id, type, event_cnt, w$start AS ss, w$end AS se])
+- GroupWindowAggregate(groupBy=[id, type], window=[SessionGroupWindow('w$, 
date, 86400000)], properties=[w$start, w$end, w$rowtime, w$proctime], 
select=[id, type, COUNT(*) AS event_cnt, start('w$) AS w$start, end('w$) AS 
w$end, rowtime('w$) AS w$rowtime, proctime('w$) AS w$proctime])
   +- Exchange(distribution=[hash[id, type]])
      +- TableSourceScan(table=[[fscat, default, events, watermark=[date]]], 
fields=[id, type, date])


== Optimized Execution Plan ==
Calc(select=[id, type, event_cnt, w$start AS ss, w$end AS se])
+- GroupWindowAggregate(groupBy=[id, type], window=[SessionGroupWindow('w$, 
date, 86400000)], properties=[w$start, w$end, w$rowtime, w$proctime], 
select=[id, type, COUNT(*) AS event_cnt, start('w$) AS w$start, end('w$) AS 
w$end, rowtime('w$) AS w$rowtime, proctime('w$) AS w$proctime])
   +- Exchange(distribution=[hash[id, type]])
      +- TableSourceScan(table=[[fscat, default, events, watermark=[date]]], 
fields=[id, type, date])

-- however, if switch to filesystem source, the window can be triggered normally

CREATE TEMPORARY TABLE `fscat`.`default`.`event_file_source` (
  `id` INT,
  `type` VARCHAR(2147483647),
  `date` TIMESTAMP(3),
  WATERMARK FOR `date` AS `date`
) WITH (
  'format' = 'csv',
  'path' = '/tmp/events.csv',
  'source.monitor-interval' = '1 min',
  'connector' = 'filesystem'
);

// cat events.csv                                
1,T1,2018-01-24 00:00:00.000
2,T1,2018-01-26 00:00:00.000
1,T2,2018-01-28 00:00:00.000
1,T2,2018-01-28 00:00:00.000


-- same query using filesystem source
select `id`, `type`, COUNT(1) as event_cnt, session_start(`date`, interval '1' 
DAY) as ss, session_end(`date`, interval '1' DAY) as se from event_file_source 
group by `id`, `type`, session(`date`, interval '1' DAY);

-- output

          id                           type            event_cnt                
      ss                      se
           1                             T1                    1 2018-01-24 
00:00:00.000 2018-01-25 00:00:00.000
           2                             T1                    1 2018-01-26 
00:00:00.000 2018-01-27 00:00:00.000{code}


> Session window aggregation cannot trigger window using event time
> -----------------------------------------------------------------
>
>                 Key: FLINK-31013
>                 URL: https://issues.apache.org/jira/browse/FLINK-31013
>             Project: Flink
>          Issue Type: Bug
>          Components: Table Store
>    Affects Versions: table-store-0.4.0
>            Reporter: Jane Chan
>            Priority: Major
>             Fix For: table-store-0.4.0
>
>
> {code:sql}
> -- test against Flink 1.16.0
> create catalog fscat with (
>     'type' = 'table-store',
>     'warehouse' = 'file:///tmp/fscat'
> );
> use catalog fscat;
> create table events (
>   `id` int, 
>   `type` string, 
>   `date` TIMESTAMP(3), 
>   watermark for `date` AS `date`);
>   
> insert into events 
> values (1, 'T1', to_timestamp('2018-01-24', 'yyyy-MM-dd')), 
> (2, 'T1', to_timestamp('2018-01-26', 'yyyy-MM-dd')), 
> (1, 'T2', to_timestamp('2018-01-28', 'yyyy-MM-dd')), 
> (1, 'T2', to_timestamp('2018-01-28', 'yyyy-MM-dd'));  
> -- no output
> select `id`,
>     `type`, 
>     COUNT(1) as event_cnt, 
>     session_start(`date`, interval '1' DAY) as ss, 
>     session_end(`date`, interval '1' DAY) as se 
> from events group by `id`, `type`, session(`date`, interval '1' DAY); 
> -- explain plan
> == Abstract Syntax Tree ==
> LogicalProject(id=[$0], type=[$1], event_cnt=[$3], ss=[SESSION_START($2)], 
> se=[SESSION_END($2)])
> +- LogicalAggregate(group=[{0, 1, 2}], event_cnt=[COUNT()])
>    +- LogicalProject(id=[$0], type=[$1], $f2=[$SESSION($2, 86400000:INTERVAL 
> DAY)])
>       +- LogicalWatermarkAssigner(rowtime=[date], watermark=[$2])
>          +- LogicalTableScan(table=[[fscat, default, events]])
> == Optimized Physical Plan ==
> Calc(select=[id, type, event_cnt, w$start AS ss, w$end AS se])
> +- GroupWindowAggregate(groupBy=[id, type], window=[SessionGroupWindow('w$, 
> date, 86400000)], properties=[w$start, w$end, w$rowtime, w$proctime], 
> select=[id, type, COUNT(*) AS event_cnt, start('w$) AS w$start, end('w$) AS 
> w$end, rowtime('w$) AS w$rowtime, proctime('w$) AS w$proctime])
>    +- Exchange(distribution=[hash[id, type]])
>       +- TableSourceScan(table=[[fscat, default, events, watermark=[date]]], 
> fields=[id, type, date])
> == Optimized Execution Plan ==
> Calc(select=[id, type, event_cnt, w$start AS ss, w$end AS se])
> +- GroupWindowAggregate(groupBy=[id, type], window=[SessionGroupWindow('w$, 
> date, 86400000)], properties=[w$start, w$end, w$rowtime, w$proctime], 
> select=[id, type, COUNT(*) AS event_cnt, start('w$) AS w$start, end('w$) AS 
> w$end, rowtime('w$) AS w$rowtime, proctime('w$) AS w$proctime])
>    +- Exchange(distribution=[hash[id, type]])
>       +- TableSourceScan(table=[[fscat, default, events, watermark=[date]]], 
> fields=[id, type, date])
> -- however, if switch to filesystem source, the window can be triggered 
> normally
> CREATE TEMPORARY TABLE `fscat`.`default`.`event_file_source` (
>   `id` INT,
>   `type` VARCHAR(2147483647),
>   `date` TIMESTAMP(3),
>   WATERMARK FOR `date` AS `date`
> ) WITH (
>   'format' = 'csv',
>   'path' = '/tmp/events.csv',
>   'source.monitor-interval' = '1 min',
>   'connector' = 'filesystem'
> );
> // cat events.csv                                
> 1,T1,2018-01-24 00:00:00.000
> 2,T1,2018-01-26 00:00:00.000
> 1,T2,2018-01-28 00:00:00.000
> 1,T2,2018-01-28 00:00:00.000
> -- same query using filesystem source
> select `id`, `type`, COUNT(1) as event_cnt, session_start(`date`, interval 
> '1' DAY) as ss, session_end(`date`, interval '1' DAY) as se from 
> event_file_source group by `id`, `type`, session(`date`, interval '1' DAY);
> -- output
>           id                           type            event_cnt              
>         ss                      se
>            1                             T1                    1 2018-01-24 
> 00:00:00.000 2018-01-25 00:00:00.000
>            2                             T1                    1 2018-01-26 
> 00:00:00.000 2018-01-27 00:00:00.000{code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

Reply via email to