[ 
https://issues.apache.org/jira/browse/FLINK-26498?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17504810#comment-17504810
 ] 

Jark Wu commented on FLINK-26498:
---------------------------------

[~hehuiyuan], [~paul8263], the current behavior works as expect with the given 
configuraitons and watermark definition. So I don't think there is a bug there. 

There is no guarantee to emit results when window cleanup, no configuraiton or 
semantics provides this guarantee. 
If you want to emit result when window cleanup, you are talking about a new 
feature which may need to introduce a new configuration (e.g. 
{{table.exec.emit.on-cleanup}}) to control this behavior.

However, I'm not so sure about this, because no records will be dropped by 
having a better watermark definition (e.g. ts -INTERVAL '5' MINUTE).
Introducing a new emit configuraiton will increase the window complexity.

> The window result may not have been  emitted when use window emit feature and 
> set allow-latency 
> ------------------------------------------------------------------------------------------------
>
>                 Key: FLINK-26498
>                 URL: https://issues.apache.org/jira/browse/FLINK-26498
>             Project: Flink
>          Issue Type: Improvement
>          Components: Table SQL / Planner
>            Reporter: hehuiyuan
>            Priority: Major
>              Labels: pull-request-available
>         Attachments: image-2022-03-05-23-53-37-086.png, 
> image-2022-03-05-23-53-44-196.png, image-2022-03-06-00-03-11-670.png
>
>
> the sql of job :
> {code:java}
> CREATE TABLE tableSource(
>     name string,
>     age int not null,
>     sex string,
>     dt TIMESTAMP(3),
>     WATERMARK FOR dt AS dt 
> ) WITH (
> );
> CREATE TABLE tableSink(
>     windowstart timestamp(3),
>     windowend timestamp(3),
>     name string,
>     age int,
>     cou bigint
> )
> WITH (
> );
> INSERT INTO tablesink
>   SELECT
>     TUMBLE_START(dt, INTERVAL '1' HOUR),
>     TUMBLE_END(dt, INTERVAL '1' HOUR),
>     name,
>     age,
>     count(sex)
> FROM tableSource
> GROUP BY TUMBLE(dt, INTERVAL '1' HOUR), name,age {code}
>  
> and table config:
> {code:java}
> table.exec.emit.allow-lateness = 1 hour 
> table.exec.emit.late-fire.delay = 1 min
> table.exec.emit.early-fire.delay = 1min{code}
>  
> The data:
> {code:java}
> >hehuiyuan1,22,woman,2022-03-05 00:30:22.000
> >hehuiyuan1,22,woman,2022-03-05 00:40:22.000
>  //pause ,wait for the window trigger for earlyTrigger 1 min
> >hehuiyuan1,22,woman,2022-03-05 00:50:22.000
> >hehuiyuan1,22,woman,2022-03-05 00:56:22.000
> //pause ,wait for the window trigger for earlyTrigger 1 min 
> >hehuiyuan1,22,woman,2022-03-05 01:00:00.000
> //pause ,wait for the window trigger for earlyTrigger 1 min 
> >hehuiyuan1,22,woman,2022-03-05 00:59:20.000 --latency data
> //pause ,wait for the window trigger for earlyTrigger 1 min 
> >hehuiyuan1,22,woman,2022-03-05 00:59:20.100 --latency data 
> >hehuiyuan1,22,woman,2022-03-05 02:00:00.000 -- window state clean for 
> >[0:00:00 1:00:00]
> >hehuiyuan1,22,woman,2022-03-05 02:10:00.000 {code}
>  
> The result:
> {code:java}
> > +I(+I[2022-03-05T00:00, 2022-03-05T01:00, hehuiyuan1, 22, 2])
> > -U(-U[2022-03-05T00:00, 2022-03-05T01:00, hehuiyuan1, 22, 2]) 
> > +U(+U[2022-03-05T00:00, 2022-03-05T01:00, hehuiyuan1, 22, 4]) 
> > +I(+I[2022-03-05T01:00, 2022-03-05T02:00, hehuiyuan1, 22, 1])
> > -U(-U[2022-03-05T00:00, 2022-03-05T01:00, hehuiyuan1, 22, 4]) 
> > +U(+U[2022-03-05T00:00, 2022-03-05T01:00, hehuiyuan1, 22, 5])
>  
>  
> > +I(+I[2022-03-05T02:00, 2022-03-05T03:00, hehuiyuan1, 22, 2]) {code}
>  
> The window result  is lost when `hehuiyuan1,22,woman,2022-03-05 00:59:20.100` 
>  arrived, the lateTrigger is not trigger and the window[0:00:00 ,1:00:00] is 
> cleaned when the data `hehuiyuan1,22,woman,2022-03-05 02:00:00.000` arrived 
> that updated watermark.
>  
> The window[0:00:00 ,1:00:00]   has 6 pieces of data, but we got 5.
> The trigger is AfterEndOfWindowEarlyAndLate .
> So WindowOpearator may need to emit reuslt when the window cleanupTimer call 
> onEventTime.
>  
> I think the correct result is as follows:
> {code:java}
> > +I(+I[2022-03-05T00:00, 2022-03-05T01:00, hehuiyuan1, 22, 2])
> > -U(-U[2022-03-05T00:00, 2022-03-05T01:00, hehuiyuan1, 22, 2])
> > +U(+U[2022-03-05T00:00, 2022-03-05T01:00, hehuiyuan1, 22, 4])
> > +I(+I[2022-03-05T01:00, 2022-03-05T02:00, hehuiyuan1, 22, 1])
> > -U(-U[2022-03-05T00:00, 2022-03-05T01:00, hehuiyuan1, 22, 4])
> > +U(+U[2022-03-05T00:00, 2022-03-05T01:00, hehuiyuan1, 22, 5])
> > -U(-U[2022-03-05T00:00, 2022-03-05T01:00, hehuiyuan1, 22, 5])
> > +U(+U[2022-03-05T00:00, 2022-03-05T01:00, hehuiyuan1, 22, 6])
> > +I(+I[2022-03-05T02:00, 2022-03-05T03:00, hehuiyuan1, 22, 2]) {code}
>  



--
This message was sent by Atlassian Jira
(v8.20.1#820001)

Reply via email to