Generally, you should not rely on a PCollection being ordered, though there have been discussions about adding some time-ordering semantics.
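If step (C) needs event-time order, one common workaround is to buffer incoming results and only process them, sorted by timestamp, once the watermark has passed them. Below is a minimal, Beam-free sketch of that buffering idea. `EventTimeBuffer`, `add` and `flushUpTo` are hypothetical names made up for illustration, not Beam APIs. In a stateful DoFn the same idea would typically sit on top of a bag state plus an event-time timer; that wiring is omitted here.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.NavigableMap;
import java.util.TreeMap;

/** Hypothetical helper (not part of Beam): buffers values and releases them in event-time order. */
final class EventTimeBuffer<T> {
    // Pending values keyed by event time in milliseconds; TreeMap keeps the keys sorted.
    private final TreeMap<Long, List<T>> pending = new TreeMap<>();

    /** Stores a value that may have arrived out of order. */
    void add(long eventTimeMillis, T value) {
        pending.computeIfAbsent(eventTimeMillis, k -> new ArrayList<>()).add(value);
    }

    /**
     * Removes and returns, in ascending event-time order, every buffered value
     * whose timestamp is less than or equal to the given watermark.
     */
    List<T> flushUpTo(long watermarkMillis) {
        List<T> ready = new ArrayList<>();
        // headMap is a live view, so clearing it removes the entries from 'pending'.
        NavigableMap<Long, List<T>> head = pending.headMap(watermarkMillis, true);
        for (List<T> values : head.values()) {
            ready.addAll(values);
        }
        head.clear();
        return ready;
    }
}
```

The state update in (C) would then run over the list returned by `flushUpTo` rather than on each element as it arrives. Values that share a timestamp come out in arrival order, which is an assumption of this sketch and not something the Beam model promises.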
On Sun, Aug 23, 2020 at 9:06 PM Rui Wang <ruw...@google.com> wrote:

> The current Beam model does not guarantee an ordering after a GBK (i.e.
> Combine.perKey() in your pipeline). So you cannot expect that the (C) step
> sees elements in a specific order.
>
> As I recall, on the Dataflow runner there is very limited ordering support.
> +Reuven Lax <re...@google.com>, can you share your insights about it?
>
> -Rui
>
> On Sun, Aug 23, 2020 at 8:32 PM Dongwon Kim <eastcirc...@gmail.com> wrote:
>
>> Hi,
>>
>> My Beam pipeline is designed to work with an unbounded source, KafkaIO.
>> It roughly looks like below:
>>
>> p.apply(KafkaIO.read() ...) // (A-1)
>>   .apply(WithKeys.of(...).withKeyType(...))
>>   .apply(Window.into(FixedWindows.of(...)))
>>   .apply(Combine.perKey(...)) // (B)
>>   .apply(Window.into(new GlobalWindows())) // to have per-key stats in (C)
>>   .apply(ParDo.of(new MyStatefulDoFn())) // (C)
>>
>> Note that (C) has its own state, which is expected to be fetched and
>> updated by the window results from (B) in order of event time.
>>
>> Now I'm writing an integration test where (A-1) is replaced by (A-2):
>>
>>> p.apply(TextIO.read().from("test.txt")) // (A-2)
>>
>> "test.txt" contains samples having a single key.
>>
>> I get a wrong result, and it turns out that the window results were not
>> fed into (C) in order.
>> Is it because (A-2) makes the pipeline a bounded one?
>>
>> Q1. How can I prevent this from happening?
>> Q2. How do you usually write an integration test for an unbounded
>> pipeline with a stateful function?
>>
>> Best,
>>
>> Dongwon