Fabian Hueske created FLINK-6858:
------------------------------------

             Summary: Unbounded event time Over Window emits incorrect timestamps
                 Key: FLINK-6858
                 URL: https://issues.apache.org/jira/browse/FLINK-6858
             Project: Flink
          Issue Type: Bug
          Components: Table API & SQL
    Affects Versions: 1.3.0
            Reporter: Fabian Hueske
            Priority: Critical


The unbounded event time OVER windows emit records with incorrect timestamps.

OVER aggregates "enrich" each input row with aggregates computed over 
neighboring rows, i.e., they produce one output row for each input row. The 
(event-time) timestamp of each input row should be preserved and not modified.
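
To illustrate these semantics, here is a toy sketch in plain Java. It is not 
Flink code, and the names {{OverSemantics}}, {{InputRow}}, {{EnrichedRow}} and 
{{runningSum()}} are hypothetical. It computes an unbounded running SUM, 
roughly {{SUM(value) OVER (ORDER BY ts ROWS BETWEEN UNBOUNDED PRECEDING AND 
CURRENT ROW)}}. It assumes the input is already sorted by event time and copies 
each input timestamp to its output row unchanged.

```java
import java.util.ArrayList;
import java.util.List;

// Toy model (hypothetical, not Flink code) of OVER aggregation semantics.
final class OverSemantics {

    // An input row: event-time timestamp plus a single value.
    static final class InputRow {
        final long ts;
        final long value;

        InputRow(long ts, long value) {
            this.ts = ts;
            this.value = value;
        }
    }

    // The same row "enriched" with an aggregate over its preceding rows.
    static final class EnrichedRow {
        final long ts;
        final long value;
        final long runningSum;

        EnrichedRow(long ts, long value, long runningSum) {
            this.ts = ts;
            this.value = value;
            this.runningSum = runningSum;
        }
    }

    // Assumes rowsInTimeOrder is already sorted by ts.
    static List<EnrichedRow> runningSum(List<InputRow> rowsInTimeOrder) {
        List<EnrichedRow> out = new ArrayList<>();
        long sum = 0L;
        for (InputRow r : rowsInTimeOrder) {
            sum += r.value;
            // Exactly one output row per input row; the timestamp is copied as-is.
            out.add(new EnrichedRow(r.ts, r.value, sum));
        }
        return out;
    }
}
```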

All OVER window aggregates are implemented using the {{ProcessFunction}} 
interface. The interface has two methods {{processElement()}} and {{onTimer()}} 
that can produce output records. Records emitted by {{processElement()}} are 
emitted with the timestamp of the record that was given as an argument to the 
method. Records emitted by {{onTimer()}} are emitted with the timestamp of the 
timer that triggered the call of the method.

The implementation of the unbounded event-time OVER window registers a new 
timer for {{currentWatermark + 1}} whenever {{processElement()}} is called. 
When the timer fires, {{onTimer()}} processes all rows that were received 
between this and the previous {{onTimer()}} call and whose timestamps are 
smaller than the current watermark. However, this means that all emitted rows 
carry the timestamp {{currentWatermark + 1}} (the watermark at the time the 
timer was registered) instead of their own timestamps, which is not what we 
want.
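
A toy, single-key simulation of this behavior in plain Java may help. It is not 
the actual Flink operator, and {{WatermarkPlusOneSim}}, {{Emitted}} and 
{{advanceWatermark()}} are hypothetical names. It assumes that the watermark 
starts at 0, that a row is complete once its timestamp is not larger than the 
watermark, and that {{onTimer()}} re-registers a timer while rows are still 
pending. A {{TreeSet}} stands in for the timer service, keeping at most one 
timer per timestamp.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.TreeSet;

// Toy single-key model (hypothetical, not Flink code) of the current strategy:
// one timer at currentWatermark + 1, and every row emitted from onTimer()
// carries the timer's timestamp instead of its own.
final class WatermarkPlusOneSim {

    // An emitted record: the timestamp attached to it vs. the row's own time.
    static final class Emitted {
        final long emitTs;
        final long rowTs;

        Emitted(long emitTs, long rowTs) {
            this.emitTs = emitTs;
            this.rowTs = rowTs;
        }
    }

    private long currentWatermark = 0L;                   // assumed start value
    private final List<Long> buffer = new ArrayList<>();   // buffered row timestamps
    private final TreeSet<Long> timers = new TreeSet<>();  // one timer per timestamp
    final List<Emitted> output = new ArrayList<>();

    // Mirrors processElement(): buffer the row, register a timer at watermark + 1.
    void processElement(long rowTs) {
        buffer.add(rowTs);
        timers.add(currentWatermark + 1);
    }

    // A new watermark fires every timer whose timestamp is <= the watermark.
    void advanceWatermark(long wm) {
        currentWatermark = wm;
        while (!timers.isEmpty() && timers.first() <= wm) {
            onTimer(timers.pollFirst());
        }
    }

    // Mirrors onTimer(): emit all complete rows in event-time order.
    private void onTimer(long timerTs) {
        Collections.sort(buffer);
        List<Long> remaining = new ArrayList<>();
        for (long rowTs : buffer) {
            if (rowTs <= currentWatermark) {
                // The problem: the record gets timerTs, not rowTs.
                output.add(new Emitted(timerTs, rowTs));
            } else {
                remaining.add(rowTs);
            }
        }
        buffer.clear();
        buffer.addAll(remaining);
        if (!buffer.isEmpty()) {
            timers.add(currentWatermark + 1); // assumed: retry leftover rows later
        }
    }
}
```

Feeding rows with timestamps 5, 3 and 8 while the watermark is 0 and then 
advancing the watermark to 10 emits all three rows with timestamp 1, i.e., the 
watermark at registration time plus one, rather than 3, 5 and 8.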

The bounded event-time OVER window operators follow a different strategy and 
register a timer for the timestamp of each row that was processed by 
{{processElement()}} and emit the corresponding rows when {{onTimer()}} is 
called. Hence, they emit the rows with correct timestamps.
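
The per-row timer strategy can be sketched with the same kind of toy model. This 
is again a hypothetical sketch rather than Flink code: {{PerRowTimerSim}} and 
its members are illustrative names, and it assumes that late rows (rows not 
newer than the watermark) are dropped. Each distinct row timestamp doubles as a 
registered timer, so {{onTimer()}} is called with the rows' own timestamp, and 
the unbounded running SUM is emitted with the correct time.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Toy single-key model (hypothetical, not Flink code) of per-row timers.
final class PerRowTimerSim {

    // An emitted record: its timestamp, the row's value, and the running SUM.
    static final class Emitted {
        final long ts;
        final long value;
        final long runningSum;

        Emitted(long ts, long value, long runningSum) {
            this.ts = ts;
            this.value = value;
            this.runningSum = runningSum;
        }
    }

    private long currentWatermark = 0L; // assumed start value
    private long runningSum = 0L;
    // Values buffered per event time; each key stands in for one registered timer.
    private final TreeMap<Long, List<Long>> pending = new TreeMap<>();
    final List<Emitted> output = new ArrayList<>();

    // Mirrors processElement(): buffer the row and register a timer at rowTs.
    void processElement(long rowTs, long value) {
        if (rowTs <= currentWatermark) {
            return; // assumed: late rows are dropped
        }
        pending.computeIfAbsent(rowTs, k -> new ArrayList<>()).add(value);
    }

    // A new watermark fires the timers of all timestamps <= the watermark, in order.
    void advanceWatermark(long wm) {
        currentWatermark = wm;
        while (!pending.isEmpty() && pending.firstKey() <= wm) {
            Map.Entry<Long, List<Long>> timer = pending.pollFirstEntry();
            onTimer(timer.getKey(), timer.getValue());
        }
    }

    // Mirrors onTimer(): timerTs equals the rows' own timestamp.
    private void onTimer(long timerTs, List<Long> values) {
        for (long v : values) {
            runningSum += v;
            output.add(new Emitted(timerTs, v, runningSum));
        }
    }
}
```

Feeding rows with timestamps 5, 3, 8 and 12 and advancing the watermark to 10 
emits three rows stamped 3, 5 and 8; the row with timestamp 12 stays buffered 
until the watermark reaches 12. The trade-off is presumably more timer state 
(one timer per distinct timestamp and key) in exchange for correct timestamps.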

I think we should change the implementation of the unbounded event-time OVER 
aggregates to follow the same strategy as the bounded event-time OVER aggregates.

What do you think [~Yuhong_kyo] [~sunjincheng121]?



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)
