The current Beam model does not guarantee any ordering after a GBK (i.e.,
Combine.perKey() in your pipeline), so you cannot expect the (C) step to
see elements in a specific order.
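One common workaround is to impose the ordering yourself inside the stateful step: buffer incoming elements keyed by event time and only release them, sorted, once the watermark has passed their timestamps. Below is a minimal, Beam-free sketch of that idea (the class name OrderedBuffer and the explicit watermark argument are illustrative assumptions, not part of the Beam API; in a real DoFn you would use a BagState plus an event-time timer instead):

```java
import java.util.*;

// Simplified sketch: buffer elements by event time and emit them in
// order once the "watermark" has passed. In Beam this role is played
// by a BagState and an event-time timer inside a stateful DoFn.
public class OrderedBuffer {
    private final TreeMap<Long, List<String>> buffer = new TreeMap<>();

    // Buffer an element with its event timestamp; arrival order may be arbitrary.
    public void add(long eventTime, String element) {
        buffer.computeIfAbsent(eventTime, t -> new ArrayList<>()).add(element);
    }

    // Emit, in event-time order, every element at or before the watermark,
    // removing it from the buffer.
    public List<String> emitUpTo(long watermark) {
        List<String> out = new ArrayList<>();
        Iterator<Map.Entry<Long, List<String>>> it =
            buffer.headMap(watermark, true).entrySet().iterator();
        while (it.hasNext()) {
            out.addAll(it.next().getValue());
            it.remove();
        }
        return out;
    }
}
```

Elements added as 30, 10, 20 would then come out as 10, 20, 30 regardless of arrival order, which is exactly the property the (C) step needs.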

As I recall, the Dataflow runner has only very limited ordering
support. Hi +Reuven
Lax <re...@google.com>, can you share your insights on this?


-Rui



On Sun, Aug 23, 2020 at 8:32 PM Dongwon Kim <eastcirc...@gmail.com> wrote:

> Hi,
>
> My Beam pipeline is designed to work with an unbounded source KafkaIO.
> It roughly looks like below:
> p.apply(KafkaIO.read() ...)                                   // (A-1)
>   .apply(WithKeys.of(...).withKeyType(...))
>   .apply(Window.into(FixedWindows.of(...)))
>   .apply(Combine.perKey(...))                              // (B)
>   .apply(Window.into(new GlobalWindows()))     // to have per-key stats in (C)
>   .apply(ParDo.of(new MyStatefulDoFn()))          // (C)
> Note that (C) has its own state which is expected to be fetched and
> updated by window results (B) in order of event-time.
>
> Now I'm writing an integration test where (A-1) is replaced by (A-2):
>
>> p.apply(TextIO.read().from("test.txt"))                  // (A-2)
>
> "text.txt" contains samples having a single key.
>
> I get a wrong result, and it turns out that the window results were not fed
> into (C) in order.
> Is it because (A-2) makes the pipeline a bounded one?
>
> Q1. How to prevent this from happening?
> Q2. How do you usually write an integration test for an unbounded pipeline
> with a stateful function?
>
> Best,
>
> Dongwon
>
