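As for writing tests in the face of non-determinism, you should look into TestStream, which lets a test control element arrival and watermark advancement explicitly. Either way, MyStatefulDoFn still needs to be updated so that it does not assume its input arrives in order. One way to do this is to buffer incoming elements in state and set event-time timers: when a timer fires you are guaranteed (modulo late data) to have seen all data up to that timestamp.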
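To make the timer idea a bit more concrete, here is a minimal plain-Java sketch of the buffering logic only. It does not use the Beam API: EventTimeBuffer, add and flushUpTo are hypothetical names made up for illustration. In an actual DoFn the pending elements would presumably live in something like a BagState, and the flush would be driven by an event-time timer callback rather than called by hand.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical helper, not a Beam class: buffers values by event timestamp
// and releases them in timestamp order once the watermark has passed them.
class EventTimeBuffer<T> {
    // TreeMap keeps timestamps sorted; one timestamp may hold several values.
    private final TreeMap<Long, List<T>> pending = new TreeMap<>();

    // Called for every arriving element, in whatever order it is delivered.
    void add(long timestampMillis, T value) {
        pending.computeIfAbsent(timestampMillis, k -> new ArrayList<>()).add(value);
    }

    // Called when a timer fires at watermarkMillis: returns and removes all
    // buffered values with timestamp <= watermarkMillis, in event-time order.
    List<T> flushUpTo(long watermarkMillis) {
        List<T> ready = new ArrayList<>();
        Map<Long, List<T>> head = pending.headMap(watermarkMillis, true);
        for (List<T> values : head.values()) {
            ready.addAll(values);
        }
        head.clear(); // the view is backed by the map, so flushed entries are removed
        return ready;
    }
}

class EventTimeBufferDemo {
    public static void main(String[] args) {
        EventTimeBuffer<String> buffer = new EventTimeBuffer<>();
        // Window results arrive out of order, as they did after (B).
        buffer.add(30L, "window-3");
        buffer.add(10L, "window-1");
        buffer.add(20L, "window-2");
        System.out.println(buffer.flushUpTo(20L)); // [window-1, window-2]
        System.out.println(buffer.flushUpTo(40L)); // [window-3]
    }
}
```

With this shape, (C) would update its per-key stats only from what a flush returns, so the arrival order of the window results no longer matters.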

On Mon, Aug 24, 2020 at 8:56 AM Reuven Lax <re...@google.com> wrote:
>
> Generally you should not rely on PCollection being ordered, though there have
> been discussions about adding some time-ordering semantics.
>
> On Sun, Aug 23, 2020 at 9:06 PM Rui Wang <ruw...@google.com> wrote:
>>
>> Current Beam model does not guarantee an ordering after a GBK (i.e.
>> Combine.perKey() in your pipeline). So you cannot expect that the C step sees
>> elements in a specific order.
>>
>> As I recall on Dataflow runner, there is very limited ordering support. Hi
>> +Reuven Lax can share your insights about it?
>>
>> -Rui
>>
>> On Sun, Aug 23, 2020 at 8:32 PM Dongwon Kim <eastcirc...@gmail.com> wrote:
>>>
>>> Hi,
>>>
>>> My Beam pipeline is designed to work with an unbounded source KafkaIO.
>>> It roughly looks like below:
>>>
>>>   p.apply(KafkaIO.read() ...)                  // (A-1)
>>>    .apply(WithKeys.of(...).withKeyType(...))
>>>    .apply(Window.into(FixedWindows.of(...)))
>>>    .apply(Combine.perKey(...))                 // (B)
>>>    .apply(Window.into(new GlobalWindows()))    // to have per-key stats in (C)
>>>    .apply(ParDo.of(new MyStatefulDoFn()))      // (C)
>>>
>>> Note that (C) has its own state which is expected to be fetched and updated
>>> by window results (B) in order of event-time.
>>>
>>> Now I'm writing an integration test where (A-1) is replaced by (A-2):
>>>>
>>>> p.apply(TextIO.read().from("test.txt")) // (A-2)
>>>
>>> "test.txt" contains samples having a single key.
>>>
>>> I get a wrong result and it turns out that window results didn't feed into
>>> (C) in order.
>>> Is it because (A-2) makes the pipeline a bounded one?
>>>
>>> Q1. How to prevent this from happening?
>>> Q2. How do you guys usually write an integration test for an unbounded one
>>> with stateful function?
>>>
>>> Best,
>>>
>>> Dongwon