The current Beam model does not guarantee any ordering after a GBK (i.e. after Combine.perKey() in your pipeline), so you cannot expect the (C) step to see elements in a specific order.
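If (C) really needs to see the (B) results in event-time order, one common workaround is to buffer elements in state and only process them, sorted by timestamp, when an event-time timer fires. Below is a minimal sketch of that pattern; the class name, the `Long` value type, and the one-minute flush delay are placeholder assumptions, not anything from your pipeline:

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

import org.apache.beam.sdk.coders.VarLongCoder;
import org.apache.beam.sdk.state.BagState;
import org.apache.beam.sdk.state.StateSpec;
import org.apache.beam.sdk.state.StateSpecs;
import org.apache.beam.sdk.state.TimeDomain;
import org.apache.beam.sdk.state.Timer;
import org.apache.beam.sdk.state.TimerSpec;
import org.apache.beam.sdk.state.TimerSpecs;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.TimestampedValue;
import org.joda.time.Duration;
import org.joda.time.Instant;

/** Sketch: buffer per-key window results and replay them in event-time order. */
class SortingStatefulDoFn extends DoFn<KV<String, Long>, KV<String, Long>> {

  // Unordered buffer of (value, event timestamp) pairs for the current key.
  @StateId("buffer")
  private final StateSpec<BagState<TimestampedValue<Long>>> bufferSpec =
      StateSpecs.bag(TimestampedValue.TimestampedValueCoder.of(VarLongCoder.of()));

  // Event-time timer that fires once the watermark has moved past the
  // buffered elements, so no earlier element can still arrive.
  @TimerId("flush")
  private final TimerSpec flushSpec = TimerSpecs.timer(TimeDomain.EVENT_TIME);

  @ProcessElement
  public void process(
      ProcessContext c,
      @Timestamp Instant ts,
      @StateId("buffer") BagState<TimestampedValue<Long>> buffer,
      @TimerId("flush") Timer flush) {
    buffer.add(TimestampedValue.of(c.element().getValue(), ts));
    // Each new element pushes the flush out; one minute is an arbitrary slack.
    flush.set(ts.plus(Duration.standardMinutes(1)));
  }

  @OnTimer("flush")
  public void onFlush(
      OnTimerContext c,
      @Key String key,
      @StateId("buffer") BagState<TimestampedValue<Long>> buffer) {
    List<TimestampedValue<Long>> sorted = new ArrayList<>();
    buffer.read().forEach(sorted::add);
    sorted.sort(Comparator.comparing(TimestampedValue::getTimestamp));
    for (TimestampedValue<Long> tv : sorted) {
      // Here you would update your per-key stats state in order, then emit.
      c.output(KV.of(key, tv.getValue()));
    }
    buffer.clear();
  }
}
```

Note this trades latency for ordering: elements sit in state until the timer fires, and very late data (beyond the allowed lateness) is still not covered.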
As I recall, the Dataflow runner has very limited ordering support.

Hi +Reuven Lax <re...@google.com>, can you share your insights about this?

-Rui

On Sun, Aug 23, 2020 at 8:32 PM Dongwon Kim <eastcirc...@gmail.com> wrote:
> Hi,
>
> My Beam pipeline is designed to work with an unbounded source, KafkaIO.
> It roughly looks like below:
>
> p.apply(KafkaIO.read() ...)                  // (A-1)
>  .apply(WithKeys.of(...).withKeyType(...))
>  .apply(Window.into(FixedWindows.of(...)))
>  .apply(Combine.perKey(...))                 // (B)
>  .apply(Window.into(new GlobalWindows()))    // to have per-key stats in (C)
>  .apply(ParDo.of(new MyStatefulDoFn()))      // (C)
>
> Note that (C) has its own state, which is expected to be fetched and
> updated by the window results from (B) in order of event time.
>
> Now I'm writing an integration test where (A-1) is replaced by (A-2):
>
>> p.apply(TextIO.read().from("test.txt"))     // (A-2)
>
> "test.txt" contains samples having a single key.
>
> I get a wrong result, and it turns out that the window results didn't
> feed into (C) in order.
> Is it because (A-2) makes the pipeline a bounded one?
>
> Q1. How can I prevent this from happening?
> Q2. How do you usually write an integration test for an unbounded
> pipeline with a stateful function?
>
> Best,
>
> Dongwon
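Regarding Q2: the usual approach is to replace the unbounded source with TestStream rather than TextIO, because TestStream lets the test control element timestamps and watermark advancement, so windows fire in a deterministic event-time order on the DirectRunner. A minimal sketch follows; the keys, values, one-minute window, and Sum.longsPerKey() (standing in for your Combine.perKey(...) and stateful step) are all illustrative assumptions:

```java
import org.apache.beam.sdk.coders.KvCoder;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.coders.VarLongCoder;
import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.testing.TestStream;
import org.apache.beam.sdk.transforms.Sum;
import org.apache.beam.sdk.transforms.windowing.FixedWindows;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.TimestampedValue;
import org.joda.time.Duration;
import org.joda.time.Instant;

public class OrderedWindowsTest {
  public void testWindowsFireInOrder() {
    TestPipeline p = TestPipeline.create();

    // Emit two elements in the first one-minute window, advance the
    // watermark so that window closes, then emit one more element.
    TestStream<KV<String, Long>> events =
        TestStream.create(KvCoder.of(StringUtf8Coder.of(), VarLongCoder.of()))
            .addElements(
                TimestampedValue.of(KV.of("k1", 1L), new Instant(1_000)),
                TimestampedValue.of(KV.of("k1", 2L), new Instant(2_000)))
            .advanceWatermarkTo(new Instant(60_000)) // first window fires here
            .addElements(
                TimestampedValue.of(KV.of("k1", 3L), new Instant(61_000)))
            .advanceWatermarkToInfinity();

    PCollection<KV<String, Long>> sums =
        p.apply(events)
            .apply(Window.into(FixedWindows.of(Duration.standardMinutes(1))))
            .apply(Sum.longsPerKey()); // placeholder for your (B) combine

    PAssert.that(sums).containsInAnyOrder(KV.of("k1", 3L), KV.of("k1", 3L));
    p.run().waitUntilFinish();
  }
}
```

Because TestStream only advances the watermark when the test says so, downstream steps observe the window-close order the test script dictates, which a bounded TextIO source does not guarantee. TestStream is supported on the DirectRunner, so this runs as a regular JUnit-style test without Kafka.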