Hi Shay,

This sounds very much like the off-by-one bug described in FLINK-9857 [1]. The problem was identified in another recent thread on the user mailing list and has been fixed for Flink 1.5.2 and 1.6.0.

Best, Fabian

[1] https://issues.apache.org/jira/browse/FLINK-9857

2018-07-18 19:00 GMT+02:00 Andrey Zagrebin <and...@data-artisans.com>:

> Hi Shay,
>
> I would suggest trying Allowed Lateness, with something like the 500 ms you mention:
> https://ci.apache.org/projects/flink/flink-docs-release-1.5/dev/stream/operators/windows.html#allowed-lateness
> It might also work for processing time.
>
> Cheers,
> Andrey
>
> On 18 Jul 2018, at 17:22, Shimony, Shay <shay.shim...@citi.com> wrote:
>
> Hi,
>
> It seems that we have encountered a race between the aggregation thread
> and the Time Trigger thread.
> It might not be a bug, but it still seems strange to us, and we would like
> your help to fix it or work around it, please.
>
> First, a few details about our use case and system:
> · We are working with processing time.
> · We are using Flink 1.4.
> · We use our own customized sliding window of size 1 minute, slide 10 seconds.
>   But we think it can also happen in a tumbling window. So for simplicity,
>   let’s assume a tumbling window of 1 minute.
> · Our window Trigger does FIRE upon each element.
> · We have a constant, balanced rate of 2k incoming messages per second.
> · When I say “window state” I simply mean our aggregation value in it.
>
> If the timestamp of an element is very close to the end of the window,
> then it is of course assigned to that window, but occasionally the window
> times out and is cleared before the element is aggregated into the window
> state. We thus lose the previous aggregation value and get a new
> aggregation state containing only the element value.
>
> Below is the story as seen by the threads.
> Timestamps are logical.
>
> Suppose we are at the beginning of WindowOperator.processElement.
> Current time: 119 (nearly 120)
>
> Reducer thread                              | Time Trigger thread
> --------------------------------------------+----------------------------------
> Assign element to window [60, 120], because |
> context.getCurrentProcessingTime() returned |
> 119 (in assignWindows)                      |
>                                             | Time is 120 -> clear window state
> Add the element value to window state       |
> [60, 120] (it starts from new state)        |
>
> Our questions:
> 1. Is it a legitimate race? We expected that (1) assigning an element to a
>    window and aggregating it into its state, and (2) clearing the window,
>    would be atomic with respect to each other. That is, if an element is
>    valid for a window, it is assigned to it and fully aggregated into its
>    state, and only then can the window be cleared.
> 2. How could we make the Time Trigger thread wait a little before cleaning
>    the window, for example by adding 500 ms to the window cleanup schedule?
>    We thought of overriding WindowOperator.cleanupTime, so is it possible
>    to easily replace WindowOperator with our own?
> 3. Maybe you have a different idea for working around it?
>
> Thanks!
> Shay
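
For readers of the archive, the interleaving in the quoted message can be replayed with a small model. This is only a sketch, not Flink's code. It assumes half-open windows [start, end) whose last valid timestamp is end - 1, and a cleanup timer registered for that last timestamp. The names assign_window, max_timestamp, timer_fires and replay are made up for illustration. With the off-by-one, the timer fires as soon as the clock reads 119, although an element stamped 119 can still arrive. Without it, the timer fires only once 119 has passed.

```python
# Simplified model of a processing-time tumbling window; NOT Flink's implementation.
# All function names here are hypothetical and exist only for this illustration.

WINDOW_SIZE = 60  # logical time units, matching the example in the thread


def assign_window(timestamp, size=WINDOW_SIZE):
    """Return the half-open window (start, end) that contains the timestamp."""
    start = timestamp - (timestamp % size)
    return start, start + size


def max_timestamp(window):
    """Largest timestamp that still belongs to the window (assumed: end - 1)."""
    return window[1] - 1


def timer_fires(now, timer_timestamp, off_by_one_fix):
    """Decide whether a timer registered for timer_timestamp fires at clock reading now."""
    if off_by_one_fix:
        # Fire only after the registered instant has fully passed.
        return now > timer_timestamp
    # Fire as soon as the clock reaches the registered instant (too early).
    return now >= timer_timestamp


def replay(steps, off_by_one_fix):
    """Replay (clock, kind, value) steps and return the sum emitted per element."""
    state = {}    # window -> running sum
    emitted = []  # the trigger FIREs on every element, so record each result
    for now, kind, value in steps:
        if kind == "timer":
            # Cleanup pass: drop the state of every window whose timer fires.
            for window in list(state):
                if timer_fires(now, max_timestamp(window), off_by_one_fix):
                    del state[window]
        else:
            window = assign_window(now)
            state[window] = state.get(window, 0) + value
            emitted.append(state[window])
    return emitted


steps = [
    (100, "element", 5),   # aggregated into window (60, 120)
    (119, "timer", None),  # timer thread runs while the clock reads 119
    (119, "element", 7),   # element assigned at 119, still inside the window
    (120, "timer", None),  # clock has moved past the window
]

print(replay(steps, off_by_one_fix=False))  # [5, 7]  -> the earlier 5 was lost
print(replay(steps, off_by_one_fix=True))   # [5, 12] -> aggregated as expected
```

In this model the only difference between the two runs is the comparison in timer_fires, which is why upgrading to a release containing the fix is preferable to delaying the cleanup by hand.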