OK, I've now read your pipeline. In the minimal pipeline, are you saying that you see logs with "STATE <event>" but then never a corresponding "STATE TIMER"?
There are a couple of other things to mention:

- Reading data from Pubsub and then using WithTimestamps is not as good as
  using the timestamp attribute control on PubsubIO, so you will potentially
  create late / dropped data.
- You have a processing time timer, but when a window expires it will not
  fire, so you also need to set an event time timer for the end of the
  window + allowed lateness.

Kenn
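A minimal sketch of the first suggestion, assuming the publisher writes each
event's timestamp into a Pubsub message attribute (the "ts" attribute name
and the topic path are placeholders, not taken from the pipeline in
question):

    import org.apache.beam.sdk.Pipeline;
    import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO;
    import org.apache.beam.sdk.options.PipelineOptionsFactory;
    import org.apache.beam.sdk.values.PCollection;

    Pipeline pipeline = Pipeline.create(PipelineOptionsFactory.create());

    // PubsubIO itself assigns element timestamps from the attribute, so the
    // source's watermark can account for them. A downstream WithTimestamps
    // step cannot hold the watermark back, and may therefore produce
    // elements that are already late or droppable.
    PCollection<String> events =
        pipeline.apply(
            PubsubIO.readStrings()
                .fromTopic("projects/<project>/topics/<topic>")
                .withTimestampAttribute("ts"));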
On Sun, Mar 17, 2019 at 6:53 AM Amit Ziv-Kenet <amit.zivke...@gmail.com> wrote:

> Hello,
>
> I've been able to produce a (relatively) minimal pipeline which reproduces
> the behavior I previously noticed in one of our production pipelines.
>
> The minimal pipeline repo can be found here:
> https://github.com/azk/late-samples-state
> The actual pipeline is implemented in one big class here:
> https://github.com/azk/late-samples-state/blob/master/src/main/java/late/samples/state/LateSamplesState.java
>
> Basically, this pipeline consumes events from a pubsub topic, which are
> very simple json objects. The events are:
> * deserialized
> * each element's timestamp is extracted from the json payload
> * the elements are keyed and then windowed in a one-minute fixed window
>   with an allowed lateness of 7 days, with processing time triggers and
>   accumulating panes.
>
> The pipeline then branches into 2 branches:
> 1. A simple GroupByKey, followed by a logging step which records which
>    panes were fired.
> 2. A stateful ParDo, which collects elements in a BagState, logs what is
>    currently in the Bag, and sets a one-minute timer which, when it fires,
>    also logs the contents of the BagState.
>
> When running this pipeline in Dataflow, and publishing a few "very late"
> events through the script which is part of the repo:
> https://github.com/azk/late-samples-state/blob/master/scripts/publish_events.py
> the following behavior is observed:
> 1. The very late elements never appear after the GroupByKey step, and are
>    dropped as expected.
> 2. The very late elements are recorded in the stateful transform and added
>    to the BagState.
> 3. When the timer expires, the BagState is empty and the elements that
>    were previously in the Bag seem to disappear.
>
> Is this expected behavior, or is there some subtle issue going on?
> I'd be more than happy to deep dive into this with anyone interested in
> looking into this behavior.
>
> Thank you,
> Amit.
>
>
> On Wed, Mar 6, 2019 at 11:14 PM Amit Ziv-Kenet <amit.zivke...@gmail.com>
> wrote:
>
>> Hi Kenn,
>> Thank you for the explanation!
>>
>> These points make total sense, which is why I was surprised by the
>> observed behavior; it breaks at least points 3 and 4.
>> I'll try to extract and share a minimal working example which
>> demonstrates this behavior.
>>
>> Thank you,
>> Amit.
>>
>> On Wed, Mar 6, 2019 at 8:11 PM Kenneth Knowles <k...@apache.org> wrote:
>>
>>> What you describe is not expected. Here are the relevant points, I think:
>>>
>>> - A window is expired when the watermark is past the end of the
>>>   window + allowed lateness.
>>> - An element is droppable when it is associated with an expired window.
>>> - All droppable elements should be dropped before reaching the stateful
>>>   ParDo step.
>>> - The state and processing time timers for a particular key+window pair
>>>   are garbage collected when the window is expired, because it is known
>>>   that nothing can cause that state to be read.
>>> - A key+window state is not cleared until all event time timers have
>>>   been dispatched.
>>>
>>> Do these make sense? I'd love to see more detail of your pipeline code.
>>> One thing to note is that an element being behind the watermark doesn't
>>> really matter. What matters is how the watermark relates to its window.
>>>
>>> Kenn
>>>
>>> On Wed, Mar 6, 2019 at 4:28 AM Amit Ziv-Kenet <amit.zivke...@gmail.com>
>>> wrote:
>>>
>>>> Hello,
>>>>
>>>> I'm encountering unexpected behavior in one of our pipelines, and I
>>>> hope you can help me make sense of it.
>>>>
>>>> This is a streaming pipeline implemented with the Java SDK (2.10) and
>>>> running on Dataflow.
>>>>
>>>> * In the pipeline a FixedWindow is applied to the data with an allowed
>>>>   lateness of a week (the exact duration doesn't really matter).
>>>> * The windowed PCollection is then used in 2 separate branches: one
>>>>   regular GroupByKey, and a second one which is a transform that makes
>>>>   use of State and Timers.
>>>> * In the stateful transform, incoming elements are added to a BagState
>>>>   container until some condition on the elements is reached, and the
>>>>   elements in the BagState are dispatched downstream. A timer makes
>>>>   sure that the BagState is dispatched after some timeout even if the
>>>>   condition is never met.
>>>>
>>>> Occasionally very late data enters the pipeline, with timestamps older
>>>> than the allowed lateness.
>>>> In these cases, the GroupByKey transform behaves as expected, and there
>>>> aren't any panes with the late data output downstream.
>>>> In the stateful transform, on the other hand, I see that these late
>>>> elements are processed and added to the BagState, but at a later point
>>>> in time (e.g. when the timer is triggered) the elements "disappear"
>>>> from the BagState and are no longer observable. This can happen
>>>> multiple times, i.e. late elements are added to the BagState and then
>>>> disappear at a later time.
>>>>
>>>> Is this expected behavior? Is there some sort of late-data "garbage
>>>> collection" which eventually removes late elements?
>>>>
>>>> Thank you for your help, I hope my description is clear enough.
>>>>
>>>> Regards,
>>>> Amit.
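Putting Kenn's two suggestions together with the transform described in this
thread, a stateful DoFn along the following lines would flush its buffer both
on the processing-time timeout and again just before the window's state is
garbage collected. This is only a sketch under the thread's stated parameters
(string events keyed by string, a one-minute timeout, seven days of allowed
lateness, matching the windowing strategy); the class and identifier names
are illustrative, not the actual pipeline code:

    import org.apache.beam.sdk.state.BagState;
    import org.apache.beam.sdk.state.StateSpec;
    import org.apache.beam.sdk.state.StateSpecs;
    import org.apache.beam.sdk.state.TimeDomain;
    import org.apache.beam.sdk.state.Timer;
    import org.apache.beam.sdk.state.TimerSpec;
    import org.apache.beam.sdk.state.TimerSpecs;
    import org.apache.beam.sdk.transforms.DoFn;
    import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
    import org.apache.beam.sdk.values.KV;
    import org.joda.time.Duration;

    class BufferUntilTimeoutFn extends DoFn<KV<String, String>, String> {
      private static final Duration TIMEOUT = Duration.standardMinutes(1);
      // Must match the allowed lateness of the windowing strategy.
      private static final Duration ALLOWED_LATENESS = Duration.standardDays(7);

      @StateId("buffer")
      private final StateSpec<BagState<String>> bufferSpec = StateSpecs.bag();

      // Normal dispatch path: a processing-time timeout.
      @TimerId("flush")
      private final TimerSpec flushSpec =
          TimerSpecs.timer(TimeDomain.PROCESSING_TIME);

      // Event-time timer at window expiry: processing-time timers are
      // garbage collected along with the window, so without this timer
      // anything still buffered silently vanishes, which matches the
      // behavior reported above.
      @TimerId("expiry")
      private final TimerSpec expirySpec =
          TimerSpecs.timer(TimeDomain.EVENT_TIME);

      @ProcessElement
      public void process(
          ProcessContext c,
          BoundedWindow window,
          @StateId("buffer") BagState<String> buffer,
          @TimerId("flush") Timer flush,
          @TimerId("expiry") Timer expiry) {
        buffer.add(c.element().getValue());
        flush.offset(TIMEOUT).setRelative();
        // End of the window + allowed lateness, as Kenn suggests.
        expiry.set(window.maxTimestamp().plus(ALLOWED_LATENESS));
      }

      @OnTimer("flush")
      public void onFlush(
          OnTimerContext c, @StateId("buffer") BagState<String> buffer) {
        dispatch(c, buffer);
      }

      @OnTimer("expiry")
      public void onExpiry(
          OnTimerContext c, @StateId("buffer") BagState<String> buffer) {
        dispatch(c, buffer);
      }

      // Emit everything currently buffered, then clear the bag.
      private void dispatch(WindowedContext c, BagState<String> buffer) {
        for (String value : buffer.read()) {
          c.output(value);
        }
        buffer.clear();
      }
    }

The "expiry" timer is the crux: per Kenn's points, a key+window's state is
not cleared until all event time timers have been dispatched, so this timer
guarantees the buffer is observed one last time before garbage collection.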