Hi Kenn,

Thank you for the explanation! These points make total sense, which is why I was surprised by the observed behavior: it breaks at least points 3 and 4. I'll try to extract and share a minimal working example that demonstrates this behavior.
Thank you,
Amit.

On Wed, Mar 6, 2019 at 8:11 PM Kenneth Knowles <k...@apache.org> wrote:

> What you describe is not expected. Here are the relevant points, I think:
>
> - A window is expired when the watermark is past the end of the window +
> allowed lateness
> - An element is droppable when it is associated with an expired window
> - All droppable elements should be dropped before reaching the stateful
> ParDo step
> - The state and processing time timers for a particular key+window pair
> are garbage collected when the window is expired, because it is known that
> nothing can cause that state to be read
> - A key+window state is not cleared until all event time timers have been
> dispatched
>
> Do these make sense? I'd love to see more detail of your pipeline code.
> One thing to note is that an element being behind the watermark doesn't
> really matter. What matters is how the watermark relates to its window.
>
> Kenn
>
> On Wed, Mar 6, 2019 at 4:28 AM Amit Ziv-Kenet <amit.zivke...@gmail.com>
> wrote:
>
>> Hello,
>>
>> I'm encountering an unexpected behavior in one of our pipelines; I hope
>> you might help me make sense of it.
>>
>> This is a streaming pipeline implemented with the Java SDK (2.10) and
>> running on Dataflow.
>>
>> * In the pipeline, FixedWindows are applied to the data with an allowed
>> lateness of a week (doesn't really matter).
>> * The windowed PCollection is then used in 2 separate branches: one
>> regular GroupByKey and a second one which is a Transform that makes use
>> of State and Timers.
>> * In the stateful transform, incoming elements are added to a BagState
>> container until some condition on the elements is reached, and the elements
>> in the BagState are dispatched downstream. A timer makes sure that the
>> BagState is dispatched even if the condition is not met after some timeout
>> has expired.
>>
>> Occasionally, very late data enters the pipeline, with timestamps older
>> than the allowed lateness.
>> In these cases, the GroupByKey transform behaves as expected, and there
>> aren't any panes with the late data output downstream.
>> In the stateful transform, on the other hand, I see that these late
>> elements are processed and added to the BagState, but at a later point in
>> time (e.g. when the timer is triggered) the elements "disappear" from the
>> BagState and are no longer observable. This can happen multiple times, i.e.
>> late elements added to the BagState and then disappearing at a later time.
>>
>> Is this an expected behavior? Is there some sort of late data "garbage
>> collection" which eventually removes late elements?
>>
>> Thank you for your help, I hope my description is clear enough.
>>
>> Regards,
>> Amit.
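The first two rules in Kenn's list (window expiry and droppability) can be modeled in a few lines of plain Java. This is a simplified sketch, not Beam's API: it assumes epoch-aligned fixed windows (as with FixedWindows and no offset) and treats the watermark as a given input. The class WindowExpiry and its methods are hypothetical names used only for illustration.

```java
import java.time.Duration;
import java.time.Instant;

// Hypothetical, simplified model of the expiry rules quoted above; NOT Beam's API.
// Assumes epoch-aligned fixed windows of a given size (like FixedWindows with no offset).
final class WindowExpiry {

    // End (exclusive) of the epoch-aligned fixed window that contains `timestamp`.
    static Instant windowEnd(Instant timestamp, Duration size) {
        long sizeMillis = size.toMillis();
        long start = Math.floorDiv(timestamp.toEpochMilli(), sizeMillis) * sizeMillis;
        return Instant.ofEpochMilli(start + sizeMillis);
    }

    // A window is expired once the watermark is past its end plus the allowed lateness.
    static boolean isExpired(Instant windowEnd, Instant watermark, Duration allowedLateness) {
        return watermark.isAfter(windowEnd.plus(allowedLateness));
    }

    // An element is droppable when its window is expired; how far the element
    // lags behind the watermark does not matter on its own.
    static boolean isDroppable(Instant timestamp, Instant watermark,
                               Duration size, Duration allowedLateness) {
        return isExpired(windowEnd(timestamp, size), watermark, allowedLateness);
    }

    public static void main(String[] args) {
        Duration size = Duration.ofHours(1);
        Duration lateness = Duration.ofDays(7);
        Instant element = Instant.parse("2019-03-01T10:15:00Z");

        // About five days behind the watermark, but window [10:00, 11:00) + 7 days is not yet passed.
        System.out.println(isDroppable(element, Instant.parse("2019-03-06T12:00:00Z"), size, lateness));
        // Watermark is past 11:00 + 7 days, so the window is expired and the element is droppable.
        System.out.println(isDroppable(element, Instant.parse("2019-03-09T00:00:00Z"), size, lateness));
    }
}
```

Running main prints `false` and then `true`: an element that is days behind the watermark is still accepted as long as the watermark has not passed its window's end plus the allowed lateness, which is the point about the watermark's relation to the element's window rather than to the element itself. The sketch does not model state or timer garbage collection.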