Hi Shay,

This sounds very much like the off-by-one bug described by FLINK-9857 [1].
The problem was identified in another recent thread on the user mailing list
and has been fixed for Flink 1.5.2 and 1.6.0.

Best, Fabian

[1] https://issues.apache.org/jira/browse/FLINK-9857

2018-07-18 19:00 GMT+02:00 Andrey Zagrebin <and...@data-artisans.com>:

> Hi Shay,
>
> I would suggest trying allowed lateness, e.g. the 500 ms you mention:
> https://ci.apache.org/projects/flink/flink-docs-release-1.5/dev/stream/operators/windows.html#allowed-lateness
> It might also work for processing time.
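A minimal plain-Java sketch of what delaying the cleanup would buy in the scenario from Shay's mail below. This is a hypothetical model, not Flink code: `DelayedCleanupSketch`, `scheduledCleanupAt` and `stillAggregated` are illustrative names, times are in milliseconds, and we assume the cleanup timer fires at the window end plus a grace period. Whether allowed lateness actually delays the cleanup of processing-time windows should be checked against the linked documentation.

```java
// Hypothetical model (NOT Flink code) of delaying window cleanup by a grace
// period, in the spirit of the 500 ms mentioned above. Times are in ms.
class DelayedCleanupSketch {

    // Assumed time at which the cleanup timer for a window ending at
    // `windowEnd` fires: the window end plus the grace period.
    static long scheduledCleanupAt(long windowEnd, long graceMillis) {
        return windowEnd + graceMillis;
    }

    // True if an element already assigned to the window can still be
    // aggregated at time `aggregateAt`, i.e. the cleanup has not fired yet.
    static boolean stillAggregated(long windowEnd, long graceMillis, long aggregateAt) {
        return aggregateAt < scheduledCleanupAt(windowEnd, graceMillis);
    }

    public static void main(String[] args) {
        long windowEnd = 120_000;   // window [60 s, 120 s)
        long aggregateAt = 120_001; // assigned at 119 999 ms, aggregated 2 ms later
        System.out.println(stillAggregated(windowEnd, 0, aggregateAt));   // false: state already cleared
        System.out.println(stillAggregated(windowEnd, 500, aggregateAt)); // true: cleanup delayed by 500 ms
    }
}
```

The grace period only needs to cover the gap between assigning an element and aggregating it, so a small value such as 500 ms should be ample in this model.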
>
> Cheers,
> Andrey
>
> On 18 Jul 2018, at 17:22, Shimony, Shay <shay.shim...@citi.com> wrote:
>
> Hi,
>
> It seems we are encountering a race condition between the aggregation
> thread and the Time Trigger thread.
> It might not be a bug, but it still seems strange to us, and we would like
> your help to fix it or work around it, please.
>
> First, a few details about our use case and system:
> · We are working with processing time.
> · We are using Flink 1.4.
> · We use our own customized sliding window with a size of 1 minute and a
>   slide of 10 seconds. We think the problem can also happen with a tumbling
>   window, so for simplicity, let’s assume a tumbling window of 1 minute.
> · Our window trigger FIREs on each element.
> · We have a constant, balanced incoming rate of 2k messages/sec.
> · By “window state” I simply mean the aggregation value held in the window.
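To make the window setup concrete, here is a plain-Java sketch of which windows an element with a given (logical) timestamp falls into. It is a hypothetical model, not Flink code; it assumes zero window offset, non-negative timestamps and half-open windows [start, end), so the window written [60, 120] in this thread appears as [60, 120). The loop mirrors, as far as we understand it, how Flink's sliding window assigners enumerate windows; a tumbling window is the special case slide == size.

```java
import java.util.ArrayList;
import java.util.List;

// Plain-Java model of processing-time window assignment (NOT Flink code).
// Assumes zero offset, timestamps >= 0 and half-open windows [start, end).
class WindowAssignmentSketch {

    // A window covering [start, end).
    static final class Window {
        final long start;
        final long end;

        Window(long start, long end) {
            this.start = start;
            this.end = end;
        }

        @Override
        public String toString() {
            return "[" + start + ", " + end + ")";
        }
    }

    // Returns every window of the given size and slide containing `timestamp`.
    static List<Window> assign(long timestamp, long size, long slide) {
        List<Window> windows = new ArrayList<>();
        // Start of the latest window that contains the timestamp.
        long lastStart = timestamp - (timestamp % slide);
        // Walk back one slide at a time while the window still covers the timestamp.
        for (long start = lastStart; start > timestamp - size; start -= slide) {
            windows.add(new Window(start, start + size));
        }
        return windows;
    }

    public static void main(String[] args) {
        // Simplified case: tumbling window of 60 at time 119.
        System.out.println(assign(119, 60, 60)); // [[60, 120)]
        // Actual setup: sliding window of size 60, slide 10.
        System.out.println(assign(119, 60, 10));
    }
}
```

With the sliding setup, an element at time 119 lands in six overlapping windows, from [110, 170) down to [60, 120), so the race described below can affect the oldest of them.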
>
> If the timestamp of an element is very close to the end of the window, the
> element is of course assigned to that window. Occasionally, however, the
> window times out and is cleared before the element has been aggregated into
> the window state. As a result, we lose the previous aggregation value and
> get a new aggregation state that contains only the element's value.
>
> Below is the sequence of events as seen by the two threads.
> Timestamps are logical.
>
> Suppose we are at the beginning of WindowOperator.processElement.
> Current time: 119 (nearly 120)
>
> 1. Reducer thread: assigns the element to window [60, 120], because
>    context.getCurrentProcessingTime() returned 119 (in assignWindows).
> 2. Time Trigger thread: time is 120 -> clears the window state.
> 3. Reducer thread: adds the element value to the state of window [60, 120]
>    (which now starts from a new, empty state).
>
> Our questions:
> 1. Is this a legitimate race? We expected that (1) assigning an element to
>    a window and aggregating it into the window state, and (2) clearing the
>    window, would be atomic with respect to each other. That is, if an element
>    is valid for a window, it is assigned to that window and fully aggregated
>    into its state, and only then can the window be cleared.
> 2. How could we make the Time Trigger thread delay the window cleanup a
>    little, e.g. by adding 500 ms to the scheduled cleanup time?
>    We thought of overriding WindowOperator.cleanupTime; is it possible to
>    easily replace WindowOperator with our own implementation?
> 3. Do you perhaps have a different idea for working around it?
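The atomicity expected in question 1 can be illustrated with a single shared lock in plain Java. This is a hypothetical sketch, not a description of how WindowOperator is implemented: `AtomicWindowSketch` is an illustrative name, and a `CountDownLatch` forces the cleanup thread to try to run right after the element is assigned. Because assignment, aggregation and firing all happen while the lock is held, the cleanup blocks until they finish, so the fired value always includes the previous aggregate.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CountDownLatch;

// Hypothetical model (NOT Flink code): assign + aggregate + fire and the
// window cleanup take the same lock, so cleanup cannot run in between.
class AtomicWindowSketch {
    private final Object lock = new Object();
    private final Map<Long, Long> windowState = new HashMap<>();
    private Long lastFired; // value emitted by the per-element FIRE

    // Reducer: assign from `now`, aggregate and fire, all under the lock.
    void processElement(long now, long size, long value, CountDownLatch assigned) {
        synchronized (lock) {
            long windowEnd = now - (now % size) + size;
            assigned.countDown(); // let the cleanup thread try to run now
            windowState.merge(windowEnd, value, Long::sum);
            lastFired = windowState.get(windowEnd);
        }
    }

    // Time Trigger thread: clear the window state under the same lock.
    void clear(long windowEnd) {
        synchronized (lock) {
            windowState.remove(windowEnd);
        }
    }

    static long run(long previousSum, long elementValue) {
        AtomicWindowSketch op = new AtomicWindowSketch();
        op.windowState.put(120L, previousSum); // earlier elements of [60, 120)
        CountDownLatch assigned = new CountDownLatch(1);
        Thread timer = new Thread(() -> {
            try {
                assigned.await(); // start only once the element is assigned
                op.clear(120L);   // blocks until processElement releases the lock
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        timer.start();
        op.processElement(119, 60, elementValue, assigned); // calling thread = reducer
        try {
            timer.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return op.lastFired; // written by this thread, so no visibility issue
    }

    public static void main(String[] args) {
        System.out.println(run(1000, 7)); // 1007
    }
}
```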
>
> Thanks!
> Shay
>