Hi Kenn,
Thank you for the explanation!

These points make total sense, which is why I was surprised by the observed
behavior: it breaks at least points 3 and 4.
I'll try to extract and share a minimal working example which demonstrates
this behavior.
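
For reference, the expiry rule from the first two points quoted below can be
sketched in plain Java. This is only an illustration under stated assumptions:
it does not use the Beam API, the class and method names are hypothetical,
fixed windows are assumed to be aligned to the epoch, and reading "past" as
strictly-after at the boundary is my assumption.

```java
import java.time.Duration;
import java.time.Instant;

// Hypothetical helper, not part of Beam: models "expired window" and
// "droppable element" as described in the quoted points.
public class WindowExpiry {

    // End of the epoch-aligned fixed window that contains the timestamp.
    public static Instant fixedWindowEnd(Instant timestamp, Duration size) {
        long sizeMs = size.toMillis();
        long start = Math.floorDiv(timestamp.toEpochMilli(), sizeMs) * sizeMs;
        return Instant.ofEpochMilli(start + sizeMs);
    }

    // A window is expired when the watermark is past
    // end of window + allowed lateness (assumed: strictly after).
    public static boolean isExpired(Instant windowEnd, Duration allowedLateness,
                                    Instant watermark) {
        return watermark.isAfter(windowEnd.plus(allowedLateness));
    }

    // An element is droppable when its window is expired. Note that the
    // element's own timestamp is never compared to the watermark directly.
    public static boolean isDroppable(Instant elementTimestamp, Duration windowSize,
                                      Duration allowedLateness, Instant watermark) {
        Instant end = fixedWindowEnd(elementTimestamp, windowSize);
        return isExpired(end, allowedLateness, watermark);
    }

    public static void main(String[] args) {
        Duration size = Duration.ofHours(1);
        Duration lateness = Duration.ofDays(7);
        Instant element = Instant.parse("2019-03-01T10:30:00Z");

        // Element is days behind this watermark, but its window is still live.
        Instant earlyWatermark = Instant.parse("2019-03-06T12:00:00Z");
        System.out.println(isDroppable(element, size, lateness, earlyWatermark));

        // Watermark is now beyond window end + allowed lateness.
        Instant lateWatermark = Instant.parse("2019-03-09T00:00:00Z");
        System.out.println(isDroppable(element, size, lateness, lateWatermark));
    }
}
```

With this reading, an element is only droppable based on how the watermark
relates to its window, not on how far the element itself is behind the
watermark.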

Thank you,
Amit.

On Wed, Mar 6, 2019 at 8:11 PM Kenneth Knowles <k...@apache.org> wrote:

> What you describe is not expected. Here are the relevant points, I think:
>
>  - A window is expired when the watermark is past the end of the window +
> allowed lateness
>  - An element is droppable when it is associated to an expired window
>  - All droppable elements should be dropped before reaching the stateful
> ParDo step
>  - The state and processing time timers for a particular key+window pair
> are garbage collected when the window is expired, because it is known that
> nothing can cause that state to be read
>  - A key+window state is not cleared until all event time timers have been
> dispatched
>
> Do these make sense? I'd love to see more detail of your pipeline code.
> One thing to note is that an element being behind the watermark doesn't
> really matter. What matters is how the watermark relates to its window.
>
> Kenn
>
> On Wed, Mar 6, 2019 at 4:28 AM Amit Ziv-Kenet <amit.zivke...@gmail.com>
> wrote:
>
>> Hello,
>>
>> I'm encountering unexpected behavior in one of our pipelines, and I hope
>> you can help me make sense of it.
>>
>> This is a streaming pipeline implemented with the Java SDK (2.10) and
>> running on Dataflow.
>>
>> * In the pipeline a FixedWindow is applied to the data with an allowed
>> lateness of a week (doesn't really matter).
>> * The windowed PCollection is then used in 2 separate branches: one is a
>> regular GroupByKey and the second one is a Transform which makes
>> use of State and Timers.
>> * In the stateful transform, incoming elements are added to a BagState
>> container until some condition on the elements is reached, and the elements
>> in the BagState are dispatched downstream. A timer makes sure that the
>> BagState is dispatched even if the condition is not met after some timeout
>> has expired.
>>
>> Occasionally very late data enters the pipeline, with timestamps older
>> than the allowed lateness.
>> In these cases, the GroupByKey transform behaves as expected, and there
>> aren't any panes with the late data output downstream.
>> In the stateful transform, on the other hand, I see that these late
>> elements are processed and added to the BagState, but at a later point in
>> time (e.g. when the timer is triggered) the elements "disappear" from the
>> BagState and are no longer observable. This can happen multiple times, i.e.
>> late elements added to the BagState and then disappearing at a later time.
>>
>> Is this an expected behavior? Is there some sort of late data "garbage
>> collection" which eventually removes late elements?
>>
>> Thank you for your help, I hope my description is clear enough.
>>
>> Regards,
>> Amit.
>>
>
