OK, I've now read your pipeline. In the minimal pipeline, are you saying
that you see logs with "STATE <event>" but then never a corresponding
"STATE TIMER"?

There are a couple of other things to mention:

 - reading data from Pubsub and then using WithTimestamps is not as good as
using the timestamp attribute control on PubsubIO, so you may end up creating
late / dropped data (see the first sketch below).
 - you have a processing time timer, but it will not fire when a window
expires, so you also need to set an event time timer for the end of the
window + allowed lateness (see the second sketch below).
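
For the first point, a rough (untested) sketch of what I mean, assuming the
messages are read as strings; the topic path and the "event_ts" attribute name
are just examples and have to match whatever your publisher sets on the Pubsub
message:

    PCollection<String> messages =
        pipeline.apply(
            PubsubIO.readStrings()
                .fromTopic("projects/my-project/topics/events")
                .withTimestampAttribute("event_ts"));

With this, PubsubIO derives the element timestamps and the watermark from the
attribute, instead of WithTimestamps reassigning timestamps after the read and
making elements late relative to the watermark.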
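
For the second point, a sketch of what the extra event time timer could look
like inside your stateful DoFn (an untested fragment; the timer ids, the
allowedLateness field and the BagState element type are illustrative, not
taken from your code):

    @StateId("bag")
    private final StateSpec<BagState<String>> bagSpec =
        StateSpecs.bag(StringUtf8Coder.of());

    @TimerId("flushTimer")
    private final TimerSpec flushSpec = TimerSpecs.timer(TimeDomain.PROCESSING_TIME);

    @TimerId("gcTimer")
    private final TimerSpec gcSpec = TimerSpecs.timer(TimeDomain.EVENT_TIME);

    @ProcessElement
    public void process(
        ProcessContext c,
        BoundedWindow window,
        @StateId("bag") BagState<String> bag,
        @TimerId("flushTimer") Timer flushTimer,
        @TimerId("gcTimer") Timer gcTimer) {
      bag.add(c.element().getValue());
      // periodic flush on processing time, as you already have
      flushTimer.offset(Duration.standardMinutes(1)).setRelative();
      // final flush just before the key+window state becomes expired
      gcTimer.set(window.maxTimestamp().plus(allowedLateness));
    }

    @OnTimer("gcTimer")
    public void onGc(OnTimerContext c, @StateId("bag") BagState<String> bag) {
      // emit whatever is still buffered before the state is garbage collected
      for (String element : bag.read()) {
        c.output(element);
      }
      bag.clear();
    }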

Kenn

On Sun, Mar 17, 2019 at 6:53 AM Amit Ziv-Kenet <amit.zivke...@gmail.com>
wrote:

> Hello,
>
> I've been able to produce a (relatively) minimal pipeline which reproduces
> the behavior I've previously noticed in one of our production pipelines.
>
> The minimal pipeline repo can be found here:
> https://github.com/azk/late-samples-state
> The actual pipeline is implemented in one big class here:
> https://github.com/azk/late-samples-state/blob/master/src/main/java/late/samples/state/LateSamplesState.java
>
> Basically, this pipeline consumes events from a pubsub topic, which are
> very simple json objects. The events are:
> * deserialized
> * each element's timestamp is extracted from the json payload
> * the elements are keyed and then windowed in a one-minute fixed window
> with an allowed lateness of 7 days, with processing time triggers and
> accumulating panes.
>
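> Roughly, the windowing is configured along these lines (an illustrative
> simplification; the element type and the exact trigger delay are
> placeholders, see LateSamplesState.java in the repo for the real code):
>
>     Window.<KV<String, String>>into(FixedWindows.of(Duration.standardMinutes(1)))
>         .triggering(Repeatedly.forever(
>             AfterProcessingTime.pastFirstElementInPane()
>                 .plusDelayOf(Duration.standardSeconds(10))))
>         .withAllowedLateness(Duration.standardDays(7))
>         .accumulatingFiredPanes()
>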
> The pipeline then branches into 2 branches:
> 1. A simple GroupByKey, followed by a logging step which records which
> panes were fired.
> 2. A stateful ParDo, which collects elements in a BagState, logs what is
> currently in the Bag and sets a one minute timer, which also logs the
> contents of the BagState.
>
> When running this pipeline in Dataflow, and publishing a few "very late"
> events through the script which is part of the repo:
> https://github.com/azk/late-samples-state/blob/master/scripts/publish_events.py
> the following behavior is observed:
> 1. The very late elements never appear after the GroupBy step, and are
> dropped as expected.
> 2. The very late elements are recorded in the stateful transform and added
> to the BagState.
> 3. When the timer expires, the BagState is empty and the elements that
> were previously in the Bag seem to disappear.
>
> Is this an expected behavior or is there some subtle issue going on?
> I'd be more than happy to deep dive into this with anyone interested in
> looking into this behavior.
>
> Thank you,
> Amit.
>
>
> On Wed, Mar 6, 2019 at 11:14 PM Amit Ziv-Kenet <amit.zivke...@gmail.com>
> wrote:
>
>> Hi Kenn,
>> Thank you for the explanation!
>>
>> These points make total sense, which is why I was surprised by the observed
>> behavior, which breaks at least points 3 and 4.
>> I'll try to extract and share a minimal working example which
>> demonstrates this behavior.
>>
>> Thank you,
>> Amit.
>>
>> On Wed, Mar 6, 2019 at 8:11 PM Kenneth Knowles <k...@apache.org> wrote:
>>
>>> What you describe is not expected. Here are the relevant points, I think:
>>>
>>>  - A window is expired when the watermark is past the end of the
>>> window + allowed lateness
>>>  - An element is droppable when it is associated to an expired window
>>>  - All droppable elements should be dropped before reaching the stateful
>>> ParDo step
>>>  - The state and processing time timers for a particular key+window pair
>>> are garbage collected when the window is expired, because it is known that
>>> nothing can cause that state to be read
>>>  - A key+window state is not cleared until all event time timers have
>>> been dispatched
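>>>
>>> For instance, the first rule is just (an illustrative helper, not an
>>> actual Beam API):
>>>
>>>     boolean isExpired(Instant watermark, BoundedWindow window, Duration allowedLateness) {
>>>       // the window end is window.maxTimestamp(); the window is expired once
>>>       // the watermark has passed that end plus the allowed lateness
>>>       return watermark.isAfter(window.maxTimestamp().plus(allowedLateness));
>>>     }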
>>>
>>> Do these make sense? I'd love to see more detail of your pipeline code.
>>> One thing to note is that an element being behind the watermark doesn't
>>> really matter. What matters is how the watermark relates to its window.
>>>
>>> Kenn
>>>
>>> On Wed, Mar 6, 2019 at 4:28 AM Amit Ziv-Kenet <amit.zivke...@gmail.com>
>>> wrote:
>>>
>>>> Hello,
>>>>
>>>> I'm encountering an unexpected behavior in one of our pipelines, and I
>>>> hope you might help me make sense of it.
>>>>
>>>> This is a streaming pipeline implemented with the Java SDK (2.10) and
>>>> running on Dataflow.
>>>>
>>>> * In the pipeline a FixedWindow is applied to the data with an allowed
>>>> lateness of a week (doesn't really matter).
>>>> * The windowed PCollection is then used in 2 separate branches: one
>>>> regular GroupByKey and a second one which is a transform that makes use
>>>> of State and Timers.
>>>> * In the stateful transform, incoming elements are added to a BagState
>>>> container until some condition on the elements is met, at which point the
>>>> elements in the BagState are dispatched downstream. A timer makes sure
>>>> that the BagState is dispatched even if the condition is not met after
>>>> some timeout has expired (roughly sketched below).
>>>>
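>>>> Roughly, the stateful transform follows this pattern (a simplified sketch,
>>>> not the actual code; the element type, the flush condition and the helper
>>>> names are placeholders):
>>>>
>>>>     @StateId("buffer")
>>>>     private final StateSpec<BagState<String>> bufferSpec =
>>>>         StateSpecs.bag(StringUtf8Coder.of());
>>>>
>>>>     @TimerId("flush")
>>>>     private final TimerSpec flushSpec = TimerSpecs.timer(TimeDomain.PROCESSING_TIME);
>>>>
>>>>     @ProcessElement
>>>>     public void process(
>>>>         @Element KV<String, String> element,
>>>>         @StateId("buffer") BagState<String> buffer,
>>>>         @TimerId("flush") Timer flushTimer,
>>>>         OutputReceiver<String> out) {
>>>>       buffer.add(element.getValue());
>>>>       if (conditionMet(buffer.read())) {
>>>>         flush(buffer, out);
>>>>       } else {
>>>>         flushTimer.offset(Duration.standardMinutes(1)).setRelative();
>>>>       }
>>>>     }
>>>>
>>>>     @OnTimer("flush")
>>>>     public void onFlush(
>>>>         @StateId("buffer") BagState<String> buffer, OutputReceiver<String> out) {
>>>>       flush(buffer, out);
>>>>     }
>>>>
>>>>     private boolean conditionMet(Iterable<String> elements) {
>>>>       // placeholder: the real pipeline uses a domain-specific condition
>>>>       int n = 0;
>>>>       for (String ignored : elements) {
>>>>         n++;
>>>>       }
>>>>       return n >= 100;
>>>>     }
>>>>
>>>>     private void flush(BagState<String> buffer, OutputReceiver<String> out) {
>>>>       for (String e : buffer.read()) {
>>>>         out.output(e);
>>>>       }
>>>>       buffer.clear();
>>>>     }
>>>>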
>>>> Occasionally very late data enters the pipeline, with timestamps older
>>>> than the allowed lateness.
>>>> In these cases, the GroupByKey transform behaves as expected, and there
>>>> aren't any panes with the late data output downstream.
>>>> In the stateful transform, on the other hand, I see that these late
>>>> elements are processed and added to the BagState, but at a later point in
>>>> time (e.g. when the timer is triggered) the elements "disappear" from the
>>>> BagState and are no longer observable. This can happen multiple times, i.e.
>>>> late elements added to the BagState and then disappearing at a later time.
>>>>
>>>> Is this an expected behavior? Is there some sort of late data "garbage
>>>> collection" which eventually removes late elements?
>>>>
>>>> Thank you for your help, I hope my description is clear enough.
>>>>
>>>> Regards,
>>>> Amit.
>>>>
>>>
