[ https://issues.apache.org/jira/browse/FLINK-26498?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17505973#comment-17505973 ]
Jark Wu commented on FLINK-26498:
---------------------------------

Hi [~paul8263], as I mentioned before, we can introduce a configuration to control this behavior. But I think always emitting the result on cleanup does change the semantics and is not a good idea.

> The window result may not have been emitted when use window emit feature and
> set allow-latency
> ------------------------------------------------------------------------------------------------
>
>                 Key: FLINK-26498
>                 URL: https://issues.apache.org/jira/browse/FLINK-26498
>             Project: Flink
>          Issue Type: Improvement
>          Components: Table SQL / Planner
>            Reporter: hehuiyuan
>            Priority: Major
>              Labels: pull-request-available
>         Attachments: image-2022-03-05-23-53-37-086.png,
> image-2022-03-05-23-53-44-196.png, image-2022-03-06-00-03-11-670.png
>
>
> The SQL of the job:
> {code:java}
> CREATE TABLE tableSource(
>     name string,
>     age int not null,
>     sex string,
>     dt TIMESTAMP(3),
>     WATERMARK FOR dt AS dt
> ) WITH (
> );
> CREATE TABLE tableSink(
>     windowstart timestamp(3),
>     windowend timestamp(3),
>     name string,
>     age int,
>     cou bigint
> )
> WITH (
> );
> INSERT INTO tableSink
> SELECT
>     TUMBLE_START(dt, INTERVAL '1' HOUR),
>     TUMBLE_END(dt, INTERVAL '1' HOUR),
>     name,
>     age,
>     count(sex)
> FROM tableSource
> GROUP BY TUMBLE(dt, INTERVAL '1' HOUR), name, age {code}
>
> and the table config:
> {code:java}
> table.exec.emit.allow-lateness = 1 hour
> table.exec.emit.late-fire.delay = 1 min
> table.exec.emit.early-fire.delay = 1 min{code}
>
> The data:
> {code:java}
> >hehuiyuan1,22,woman,2022-03-05 00:30:22.000
> >hehuiyuan1,22,woman,2022-03-05 00:40:22.000
> // pause, wait for the window trigger for earlyTrigger 1 min
> >hehuiyuan1,22,woman,2022-03-05 00:50:22.000
> >hehuiyuan1,22,woman,2022-03-05 00:56:22.000
> // pause, wait for the window trigger for earlyTrigger 1 min
> >hehuiyuan1,22,woman,2022-03-05 01:00:00.000
> // pause, wait for the window trigger for earlyTrigger 1 min
> >hehuiyuan1,22,woman,2022-03-05 00:59:20.000 -- late data
> // pause, wait for the window trigger for earlyTrigger 1 min
> >hehuiyuan1,22,woman,2022-03-05 00:59:20.100 -- late data
> >hehuiyuan1,22,woman,2022-03-05 02:00:00.000 -- window state cleaned for
> >[0:00:00, 1:00:00]
> >hehuiyuan1,22,woman,2022-03-05 02:10:00.000 {code}
>
> The result:
> {code:java}
> > +I(+I[2022-03-05T00:00, 2022-03-05T01:00, hehuiyuan1, 22, 2])
> > -U(-U[2022-03-05T00:00, 2022-03-05T01:00, hehuiyuan1, 22, 2])
> > +U(+U[2022-03-05T00:00, 2022-03-05T01:00, hehuiyuan1, 22, 4])
> > +I(+I[2022-03-05T01:00, 2022-03-05T02:00, hehuiyuan1, 22, 1])
> > -U(-U[2022-03-05T00:00, 2022-03-05T01:00, hehuiyuan1, 22, 4])
> > +U(+U[2022-03-05T00:00, 2022-03-05T01:00, hehuiyuan1, 22, 5])
> >
> > +I(+I[2022-03-05T02:00, 2022-03-05T03:00, hehuiyuan1, 22, 2]) {code}
>
> The window result is lost: when `hehuiyuan1,22,woman,2022-03-05 00:59:20.100`
> arrives, the late trigger has not fired yet, and the window
> [0:00:00, 1:00:00] is cleaned up when the record
> `hehuiyuan1,22,woman,2022-03-05 02:00:00.000` arrives and advances the
> watermark.
>
> The window [0:00:00, 1:00:00] contains 6 records, but the last emitted count
> is 5.
> The trigger is AfterEndOfWindowEarlyAndLate.
> So WindowOperator may need to emit the result when the window's cleanup timer
> calls onEventTime.
>
> I think the correct result is as follows:
> {code:java}
> > +I(+I[2022-03-05T00:00, 2022-03-05T01:00, hehuiyuan1, 22, 2])
> > -U(-U[2022-03-05T00:00, 2022-03-05T01:00, hehuiyuan1, 22, 2])
> > +U(+U[2022-03-05T00:00, 2022-03-05T01:00, hehuiyuan1, 22, 4])
> > +I(+I[2022-03-05T01:00, 2022-03-05T02:00, hehuiyuan1, 22, 1])
> > -U(-U[2022-03-05T00:00, 2022-03-05T01:00, hehuiyuan1, 22, 4])
> > +U(+U[2022-03-05T00:00, 2022-03-05T01:00, hehuiyuan1, 22, 5])
> > -U(-U[2022-03-05T00:00, 2022-03-05T01:00, hehuiyuan1, 22, 5])
> > +U(+U[2022-03-05T00:00, 2022-03-05T01:00, hehuiyuan1, 22, 6])
> > +I(+I[2022-03-05T02:00, 2022-03-05T03:00, hehuiyuan1, 22, 2]) {code}
>

--
This message was sent by Atlassian Jira
(v8.20.1#820001)
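
For readers following the thread, the lost update in window [0:00:00, 1:00:00] can be replayed with a small toy model. This is only a sketch under stated assumptions: `ToyWindow`, `emitOnCleanup`, and `replay` are hypothetical names invented for illustration, the class does not use or mirror Flink's actual WindowOperator code, and the `emitOnCleanup` flag merely stands in for the kind of configuration switch discussed in the comment above.

```java
import java.util.ArrayList;
import java.util.List;

public class CleanupEmitSketch {

    /** Toy model of a single window's count state; NOT Flink's WindowOperator. */
    static final class ToyWindow {
        final boolean emitOnCleanup;              // hypothetical switch (assumption)
        final List<Long> emitted = new ArrayList<>();
        long count = 0;
        long lastEmitted = -1;
        boolean cleaned = false;

        ToyWindow(boolean emitOnCleanup) {
            this.emitOnCleanup = emitOnCleanup;
        }

        /** A record is assigned to the window; ignored once state is cleaned. */
        void add() {
            if (!cleaned) {
                count++;
            }
        }

        /** Early or late fire: emit only if the count changed since the last emit. */
        void fire() {
            if (!cleaned && count != lastEmitted) {
                emitted.add(count);
                lastEmitted = count;
            }
        }

        /** Cleanup timer: optionally flush the pending result, then drop state. */
        void cleanup() {
            if (emitOnCleanup) {
                fire();
            }
            cleaned = true;
        }
    }

    /** Replays the records from the issue that fall into window [0:00:00, 1:00:00]. */
    public static List<Long> replay(boolean emitOnCleanup) {
        ToyWindow w = new ToyWindow(emitOnCleanup);
        w.add(); w.add(); w.fire();   // 00:30:22, 00:40:22, then early fire
        w.add(); w.add(); w.fire();   // 00:50:22, 00:56:22, then early fire
        w.add(); w.fire();            // 00:59:20.000 (late), then late fire
        w.add();                      // 00:59:20.100 (late), late fire still pending
        w.cleanup();                  // 02:00:00.000 advances the watermark past end + lateness
        w.fire();                     // the pending late fire finds no state left
        return w.emitted;
    }

    public static void main(String[] args) {
        System.out.println(replay(false)); // prints [2, 4, 5]
        System.out.println(replay(true));  // prints [2, 4, 5, 6]
    }
}
```

With the flag off, the emitted counts match the result reported in the issue (2, 4, 5); with it on, they match the result the reporter expects (2, 4, 5, 6). Whether emitting on cleanup should ever be the default is exactly the semantic question raised in the comment.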