I've encountered a seemingly related phenomenon with the Dataflow runner (wrote about it previously to the @users list). In my case the "disappearing" BagState elements were late elements (beyond allowed lateness), which I assumed shouldn't make it to the stateful ParDo at all. Instead, they were appearing in the BagState and then disappearing.
I've implemented a small-scale example pipeline which demonstrated this behavior here <https://github.com/azk/late-samples-state>.

On Thu, Apr 11, 2019 at 3:41 AM Steve Niemitz <sniem...@apache.org> wrote:

> This sounds a lot like what I had reported in
> https://issues.apache.org/jira/plugins/servlet/mobile#issue/BEAM-6813
> (sorry for the mobile link).
>
> On Wed, Apr 10, 2019 at 6:59 PM Anton Kedin <ke...@google.com> wrote:
>
>> Hi dev@,
>>
>> I am debugging a flaky test and observing an interesting problem with a
>> BagState. The code in question is here
>> <https://github.com/apache/beam/blob/b953645ed6db837d24284d7fe1fe091e7309f821/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/TestPubsubSignal.java#L363>
>> [1]:
>>
>> - the pipeline
>> <https://github.com/apache/beam/blob/b953645ed6db837d24284d7fe1fe091e7309f821/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/TestPubsubSignal.java#L322>
>> [2] looks like: pcollection of elements -> re-window into global window ->
>> assign the same dummy key -> stateful ParDo;
>>
>> - in the stateful ParDo I add the newly received element to the state,
>> then read all the elements from the state, then do stuff to all elements;
>>
>> - I expect all previous and the new elements to be in the state, as
>> they all are in the same (global) window and have the same key;
>>
>> However I noticed that sometimes I observe that the old elements are gone
>> and I am only able to retrieve the new element I just added. This happens
>> maybe in 10% of executions, without any changes to the code. I see this
>> problem on my local machine in DirectRunner, and I think this is the reason
>> for the test flake that happens in Jenkins as well.
>>
>> I am digging deeper to debug/fix this but wondering if anyone has seen
>> something like this? Or maybe I am missing something or using the state
>> incorrectly?
>>
>> [1]:
>> https://github.com/apache/beam/blob/b953645ed6db837d24284d7fe1fe091e7309f821/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/TestPubsubSignal.java#L363
>> [2]:
>> https://github.com/apache/beam/blob/b953645ed6db837d24284d7fe1fe091e7309f821/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/TestPubsubSignal.java#L322
>>
>> Regards,
>> Anton
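
For anyone skimming the thread, the expectation described in the quoted message (add the new element, then read everything back, with all elements sharing one key and one window) can be modeled roughly as below. This is a plain-Java sketch and not Beam's API: the class BagStateModel and the method addAndRead are hypothetical names made up for illustration, and the code only shows the semantics being assumed. It does not reproduce the flaky behavior.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical plain-Java model of per-key, per-window bag state. Not Beam's API. */
public class BagStateModel {
    // One bag per (key, window) pair, which is the scoping the quoted message assumes.
    // The "|" join is a simplification and would collide for keys containing "|".
    private final Map<String, List<String>> cells = new HashMap<>();

    /** Adds the element to the bag for (key, window), then returns a copy of the whole bag. */
    public List<String> addAndRead(String key, String window, String element) {
        List<String> bag = cells.computeIfAbsent(key + "|" + window, k -> new ArrayList<>());
        bag.add(element);
        return new ArrayList<>(bag);
    }

    public static void main(String[] args) {
        BagStateModel state = new BagStateModel();
        // Same dummy key and same (global) window for every element.
        state.addAndRead("dummy", "GlobalWindow", "a");
        state.addAndRead("dummy", "GlobalWindow", "b");
        // Expected: earlier elements are still present next to the new one.
        System.out.println(state.addAndRead("dummy", "GlobalWindow", "c")); // prints [a, b, c]
    }
}
```

Under this model, a read that returns only the element just added would mean the earlier bag was cleared or a different state cell was addressed, which is the symptom reported in the thread.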