Generally you should not rely on a PCollection being ordered, though there
have been discussions about adding some time-ordering semantics.
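One common workaround is to buffer elements per key and release them in timestamp order only after the watermark has passed them. In Beam Java that would typically be a stateful DoFn holding a BagState and setting an event-time Timer. The sketch below models only that buffering logic in plain Java, with no Beam dependency, under these assumptions: the names (EventTimeReorderBuffer, advanceWatermark, ReorderDemo) are hypothetical, the watermark is passed in by hand rather than supplied by a runner, and timestamps are plain longs.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical, Beam-free model of a per-key buffer that releases elements in
// event-time order once the (manually supplied) watermark has passed them.
final class EventTimeReorderBuffer {

    // One buffered element: its event-time timestamp and its payload.
    static final class Timestamped {
        final long timestamp;
        final String value;

        Timestamped(long timestamp, String value) {
            this.timestamp = timestamp;
            this.value = value;
        }

        @Override
        public String toString() {
            return value + "@" + timestamp;
        }
    }

    // Per-key buffers; stands in for a BagState scoped to each key.
    private final Map<String, List<Timestamped>> buffers = new HashMap<>();

    // Buffer an element in whatever order it arrives.
    void add(String key, long timestamp, String value) {
        buffers.computeIfAbsent(key, k -> new ArrayList<>())
               .add(new Timestamped(timestamp, value));
    }

    // Return, sorted by timestamp, every buffered element of `key` whose
    // timestamp is <= watermark; keep the rest buffered for a later call.
    List<Timestamped> advanceWatermark(String key, long watermark) {
        List<Timestamped> ready = new ArrayList<>();
        List<Timestamped> pending = new ArrayList<>();
        for (Timestamped t : buffers.getOrDefault(key, new ArrayList<>())) {
            if (t.timestamp <= watermark) {
                ready.add(t);
            } else {
                pending.add(t);
            }
        }
        ready.sort(Comparator.comparingLong((Timestamped t) -> t.timestamp));
        buffers.put(key, pending);
        return ready;
    }
}

// Usage: window results arrive out of order, as they may after Combine.perKey.
class ReorderDemo {
    public static void main(String[] args) {
        EventTimeReorderBuffer buf = new EventTimeReorderBuffer();
        buf.add("k", 20, "w2");
        buf.add("k", 10, "w1");
        buf.add("k", 30, "w3");
        System.out.println(buf.advanceWatermark("k", 25)); // prints [w1@10, w2@20]
        System.out.println(buf.advanceWatermark("k", 40)); // prints [w3@30]
    }
}
```

Late elements, i.e. ones arriving after the watermark has already passed their timestamp, are not handled here; a real DoFn would need some policy for them, for example dropping them or routing them to a separate output.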



On Sun, Aug 23, 2020 at 9:06 PM Rui Wang <ruw...@google.com> wrote:

> The current Beam model does not guarantee any ordering after a GBK (i.e.
> Combine.perKey() in your pipeline), so you cannot expect the (C) step to see
> elements in a specific order.
>
> As I recall, the Dataflow runner has very limited ordering support. Hi
> +Reuven Lax <re...@google.com>, can you share your insights about it?
>
>
> -Rui
>
>
>
> On Sun, Aug 23, 2020 at 8:32 PM Dongwon Kim <eastcirc...@gmail.com> wrote:
>
>> Hi,
>>
>> My Beam pipeline is designed to work with an unbounded source, KafkaIO.
>> It roughly looks like this:
>> p.apply(KafkaIO.read() ...)                      // (A-1)
>>   .apply(WithKeys.of(...).withKeyType(...))
>>   .apply(Window.into(FixedWindows.of(...)))
>>   .apply(Combine.perKey(...))                    // (B)
>>   .apply(Window.into(new GlobalWindows()))       // to have per-key stats in (C)
>>   .apply(ParDo.of(new MyStatefulDoFn()))         // (C)
>> Note that (C) has its own state, which is expected to be fetched and
>> updated by the window results from (B) in event-time order.
>>
>> Now I'm writing an integration test where (A-1) is replaced by (A-2):
>>
>> p.apply(TextIO.read().from("test.txt"))          // (A-2)
>>
>> "test.txt" contains samples with a single key.
>>
>> I get a wrong result, and it turns out that the window results were not
>> fed into (C) in order.
>> Is it because (A-2) makes the pipeline a bounded one?
>>
>> Q1. How can I prevent this from happening?
>> Q2. How do you guys usually write an integration test for an unbounded
>> pipeline with a stateful function?
>>
>> Best,
>>
>> Dongwon
>>
>
