Re: Out-of-orderness of window results when testing stateful operators with TextIO

2020-08-26 Thread Luke Cwik
Splitting is part of the issue.

Other example issues are:
* "sources" that input data into the pipeline have no requirement to
produce records in a time ordered manner.
* timers can hold the output watermark and produce records out of order
with time.

All of this time ordering has a cost in performance and throughput, so being
explicit that something needs time-ordered input is useful.
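
To make that concrete, here is a rough, untested sketch of a stateful DoFn
that makes its ordering requirement explicit: it buffers input in state and
only emits, sorted by timestamp, once an event-time timer says the watermark
has passed. The element type KV<String, Long> and the class/state/timer names
are invented for illustration, not taken from Dongwon's pipeline.

import java.util.ArrayList;
import java.util.List;
import org.apache.beam.sdk.coders.KvCoder;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.coders.VarLongCoder;
import org.apache.beam.sdk.state.BagState;
import org.apache.beam.sdk.state.StateSpec;
import org.apache.beam.sdk.state.StateSpecs;
import org.apache.beam.sdk.state.TimeDomain;
import org.apache.beam.sdk.state.Timer;
import org.apache.beam.sdk.state.TimerSpec;
import org.apache.beam.sdk.state.TimerSpecs;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.TimestampedValue;

// Buffers keyed input and emits it in timestamp order once the event-time
// watermark has passed, so downstream logic never sees out-of-order records
// (late data aside).
class TimeSortingDoFn extends DoFn<KV<String, Long>, KV<String, Long>> {

  @StateId("buffer")
  private final StateSpec<BagState<TimestampedValue<KV<String, Long>>>> bufferSpec =
      StateSpecs.bag(
          TimestampedValue.TimestampedValueCoder.of(
              KvCoder.of(StringUtf8Coder.of(), VarLongCoder.of())));

  @TimerId("flush")
  private final TimerSpec flushSpec = TimerSpecs.timer(TimeDomain.EVENT_TIME);

  @ProcessElement
  public void process(
      ProcessContext ctx,
      @StateId("buffer") BagState<TimestampedValue<KV<String, Long>>> buffer,
      @TimerId("flush") Timer flush) {
    // Nothing is emitted here; the element is parked until the watermark
    // guarantees that all earlier data (modulo late data) has been seen.
    buffer.add(TimestampedValue.of(ctx.element(), ctx.timestamp()));
    flush.set(ctx.timestamp());
  }

  @OnTimer("flush")
  public void flush(
      OnTimerContext ctx,
      @StateId("buffer") BagState<TimestampedValue<KV<String, Long>>> buffer,
      @TimerId("flush") Timer flush) {
    List<TimestampedValue<KV<String, Long>>> pending = new ArrayList<>();
    buffer.read().forEach(pending::add);
    pending.sort((a, b) -> a.getTimestamp().compareTo(b.getTimestamp()));
    buffer.clear();
    boolean rearmed = false;
    for (TimestampedValue<KV<String, Long>> tv : pending) {
      if (!tv.getTimestamp().isAfter(ctx.timestamp())) {
        // The watermark is already past this timestamp, so it is safe to emit.
        ctx.output(tv.getValue());
      } else {
        // Not yet covered by the watermark: keep it and re-arm the timer at
        // the earliest remaining timestamp.
        buffer.add(tv);
        if (!rearmed) {
          flush.set(tv.getTimestamp());
          rearmed = true;
        }
      }
    }
  }
}

A production version would also want to hold the output watermark (e.g. with
Timer#withOutputTimestamp in newer SDKs) so that emitted elements can keep
their original timestamps, but the overall shape is the same.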

On Mon, Aug 24, 2020 at 9:07 PM Dongwon Kim  wrote:

> Thanks Reuven for the input, and thanks Rui for CC'ing Reuven.
>
> Generally you should not rely on PCollection being ordered
>
> Is it because Beam splits a PCollection into multiple input splits and tries
> to process them as efficiently as possible without considering time?
> This is very confusing to me, as I've been using Flink for a long time;
> AFAIK, the Flink DataStream API guarantees ordering for the same key between
> two different tasks.
>
> Best,
>
> Dongwon
>
> On Tue, Aug 25, 2020 at 12:56 AM Reuven Lax  wrote:
>
>> Generally you should not rely on PCollection being ordered, though there
>> have been discussions about adding some time-ordering semantics.
>>
>>
>>
>> On Sun, Aug 23, 2020 at 9:06 PM Rui Wang  wrote:
>>
> >>> The current Beam model does not guarantee an ordering after a GBK (i.e.
> >>> Combine.perKey() in your pipeline), so you cannot expect the (C) step to
> >>> see elements in a specific order.
>>>
> >>> As I recall, the Dataflow runner has very limited ordering support.
> >>> Hi +Reuven Lax, can you share your insights about it?
>>>
>>>
>>> -Rui
>>>
>>>
>>>
>>> On Sun, Aug 23, 2020 at 8:32 PM Dongwon Kim 
>>> wrote:
>>>
 Hi,

 My Beam pipeline is designed to work with an unbounded source, KafkaIO.
 It roughly looks like the following:
 p.apply(KafkaIO.read() ...)   // (A-1)
   .apply(WithKeys.of(...).withKeyType(...))
   .apply(Window.into(FixedWindows.of(...)))
   .apply(Combine.perKey(...))  // (B)
   .apply(Window.into(new GlobalWindows())) // to have per-key stats
 in (C)
   .apply(ParDo.of(new MyStatefulDoFn()))  // (C)
 Note that (C) has its own state, which is expected to be fetched and
 updated by the window results of (B) in event-time order.

 Now I'm writing an integration test where (A-1) is replaced by (A-2):

> p.apply(TextIO.read().from("test.txt"))  // (A-2)

 "text.txt" contains samples having a single key.

 I get a wrong result, and it turns out that the window results were not fed
 into (C) in order.
 Is it because (A-2) makes the pipeline a bounded one?

 Q1. How can I prevent this from happening?
 Q2. How do you guys usually write an integration test for an unbounded
 pipeline with a stateful function?

 Best,

 Dongwon

>>>


Re: Out-of-orderness of window results when testing stateful operators with TextIO

2020-08-24 Thread Dongwon Kim
Thanks Reuven for the input, and thanks Rui for CC'ing Reuven.

Generally you should not rely on PCollection being ordered

Is it because Beam splits a PCollection into multiple input splits and tries
to process them as efficiently as possible without considering time?
This is very confusing to me, as I've been using Flink for a long time; AFAIK,
the Flink DataStream API guarantees ordering for the same key between two
different tasks.

Best,

Dongwon

On Tue, Aug 25, 2020 at 12:56 AM Reuven Lax  wrote:

> Generally you should not rely on PCollection being ordered, though there
> have been discussions about adding some time-ordering semantics.
>
>
>
> On Sun, Aug 23, 2020 at 9:06 PM Rui Wang  wrote:
>
>> The current Beam model does not guarantee an ordering after a GBK (i.e.
>> Combine.perKey() in your pipeline), so you cannot expect the (C) step to
>> see elements in a specific order.
>>
>> As I recall, the Dataflow runner has very limited ordering support.
>> Hi +Reuven Lax, can you share your insights about it?
>>
>>
>> -Rui
>>
>>
>>
>> On Sun, Aug 23, 2020 at 8:32 PM Dongwon Kim 
>> wrote:
>>
>>> Hi,
>>>
>>> My Beam pipeline is designed to work with an unbounded source, KafkaIO.
>>> It roughly looks like the following:
>>> p.apply(KafkaIO.read() ...)   // (A-1)
>>>   .apply(WithKeys.of(...).withKeyType(...))
>>>   .apply(Window.into(FixedWindows.of(...)))
>>>   .apply(Combine.perKey(...))  // (B)
>>>   .apply(Window.into(new GlobalWindows())) // to have per-key stats
>>> in (C)
>>>   .apply(ParDo.of(new MyStatefulDoFn()))  // (C)
>>> Note that (C) has its own state, which is expected to be fetched and
>>> updated by the window results of (B) in event-time order.
>>>
>>> Now I'm writing an integration test where (A-1) is replaced by (A-2):
>>>
 p.apply(TextIO.read().from("test.txt"))  // (A-2)
>>>
>>> "text.txt" contains samples having a single key.
>>>
>>> I get a wrong result, and it turns out that the window results were not fed
>>> into (C) in order.
>>> Is it because (A-2) makes the pipeline a bounded one?
>>>
>>> Q1. How can I prevent this from happening?
>>> Q2. How do you guys usually write an integration test for an unbounded
>>> pipeline with a stateful function?
>>>
>>> Best,
>>>
>>> Dongwon
>>>
>>


Re: Out-of-orderness of window results when testing stateful operators with TextIO

2020-08-24 Thread Robert Bradshaw
As for the question of writing tests in the face of non-determinism,
you should look into TestStream. MyStatefulDoFn still needs to be
updated to not assume an ordering. (This can be done by setting timers
that provide guarantees that (modulo late data) one has seen all data
up to a certain timestamp.)
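
To sketch what the test side can look like (the element type, window size,
and Sum combiner below are stand-ins, not Dongwon's actual transforms),
TestStream lets the test control element timestamps and watermark
advancement, so window results reach the downstream steps deterministically:

import org.apache.beam.sdk.coders.KvCoder;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.coders.VarLongCoder;
import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.testing.TestStream;
import org.apache.beam.sdk.transforms.Sum;
import org.apache.beam.sdk.transforms.windowing.FixedWindows;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.TimestampedValue;
import org.joda.time.Duration;
import org.joda.time.Instant;
import org.junit.Rule;
import org.junit.Test;

public class WindowedSumTest {

  @Rule public final transient TestPipeline p = TestPipeline.create();

  @Test
  public void windowsFireAsTheWatermarkAdvances() {
    TestStream<KV<String, Long>> source =
        TestStream.create(KvCoder.of(StringUtf8Coder.of(), VarLongCoder.of()))
            // Two elements in the first one-minute window.
            .addElements(
                TimestampedValue.of(KV.of("k", 1L), new Instant(0L)),
                TimestampedValue.of(KV.of("k", 2L), new Instant(30_000L)))
            // Advancing the watermark closes the first window before the
            // element belonging to the second window is added.
            .advanceWatermarkTo(new Instant(60_000L))
            .addElements(TimestampedValue.of(KV.of("k", 4L), new Instant(90_000L)))
            .advanceWatermarkToInfinity();

    PCollection<KV<String, Long>> sums =
        p.apply(source)  // stands in for (A-1)/(A-2)
            .apply(Window.<KV<String, Long>>into(
                FixedWindows.of(Duration.standardMinutes(1))))
            .apply(Sum.longsPerKey());  // stands in for Combine.perKey(...) (B)

    PAssert.that(sums).containsInAnyOrder(KV.of("k", 3L), KV.of("k", 4L));
    p.run().waitUntilFinish();
  }
}

The same TestStream source can then feed the GlobalWindows re-window and the
stateful DoFn, with PAssert (or the DoFn's own output) checking the per-key
state transitions.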

On Mon, Aug 24, 2020 at 8:56 AM Reuven Lax  wrote:
>
> Generally you should not rely on PCollection being ordered, though there have 
> been discussions about adding some time-ordering semantics.
>
>
>
> On Sun, Aug 23, 2020 at 9:06 PM Rui Wang  wrote:
>>
>> The current Beam model does not guarantee an ordering after a GBK (i.e.
>> Combine.perKey() in your pipeline), so you cannot expect the (C) step to
>> see elements in a specific order.
>>
>> As I recall, the Dataflow runner has very limited ordering support.
>> Hi +Reuven Lax, can you share your insights about it?
>>
>>
>> -Rui
>>
>>
>>
>> On Sun, Aug 23, 2020 at 8:32 PM Dongwon Kim  wrote:
>>>
>>> Hi,
>>>
>>> My Beam pipeline is designed to work with an unbounded source, KafkaIO.
>>> It roughly looks like the following:
>>> p.apply(KafkaIO.read() ...)   // (A-1)
>>>   .apply(WithKeys.of(...).withKeyType(...))
>>>   .apply(Window.into(FixedWindows.of(...)))
>>>   .apply(Combine.perKey(...))  // (B)
>>>   .apply(Window.into(new GlobalWindows())) // to have per-key stats in 
>>> (C)
>>>   .apply(ParDo.of(new MyStatefulDoFn()))  // (C)
>>> Note that (C) has its own state, which is expected to be fetched and
>>> updated by the window results of (B) in event-time order.
>>>
>>> Now I'm writing an integration test where (A-1) is replaced by (A-2):

 p.apply(TextIO.read().from("test.txt"))  // (A-2)
>>>
>>> "text.txt" contains samples having a single key.
>>>
>>> I get a wrong result, and it turns out that the window results were not fed
>>> into (C) in order.
>>> Is it because (A-2) makes the pipeline a bounded one?
>>>
>>> Q1. How can I prevent this from happening?
>>> Q2. How do you guys usually write an integration test for an unbounded
>>> pipeline with a stateful function?
>>>
>>> Best,
>>>
>>> Dongwon


Re: Out-of-orderness of window results when testing stateful operators with TextIO

2020-08-24 Thread Reuven Lax
Generally you should not rely on PCollection being ordered, though there
have been discussions about adding some time-ordering semantics.



On Sun, Aug 23, 2020 at 9:06 PM Rui Wang  wrote:

> The current Beam model does not guarantee an ordering after a GBK (i.e.
> Combine.perKey() in your pipeline), so you cannot expect the (C) step to
> see elements in a specific order.
>
> As I recall, the Dataflow runner has very limited ordering support.
> Hi +Reuven Lax, can you share your insights about it?
>
>
> -Rui
>
>
>
> On Sun, Aug 23, 2020 at 8:32 PM Dongwon Kim  wrote:
>
>> Hi,
>>
>> My Beam pipeline is designed to work with an unbounded source, KafkaIO.
>> It roughly looks like the following:
>> p.apply(KafkaIO.read() ...)   // (A-1)
>>   .apply(WithKeys.of(...).withKeyType(...))
>>   .apply(Window.into(FixedWindows.of(...)))
>>   .apply(Combine.perKey(...))  // (B)
>>   .apply(Window.into(new GlobalWindows())) // to have per-key stats
>> in (C)
>>   .apply(ParDo.of(new MyStatefulDoFn()))  // (C)
>> Note that (C) has its own state, which is expected to be fetched and
>> updated by the window results of (B) in event-time order.
>>
>> Now I'm writing an integration test where (A-1) is replaced by (A-2):
>>
>>> p.apply(TextIO.read().from("test.txt"))  // (A-2)
>>
>> "text.txt" contains samples having a single key.
>>
>> I get a wrong result, and it turns out that the window results were not fed
>> into (C) in order.
>> Is it because (A-2) makes the pipeline a bounded one?
>>
>> Q1. How can I prevent this from happening?
>> Q2. How do you guys usually write an integration test for an unbounded
>> pipeline with a stateful function?
>>
>> Best,
>>
>> Dongwon
>>
>


Re: Out-of-orderness of window results when testing stateful operators with TextIO

2020-08-23 Thread Rui Wang
The current Beam model does not guarantee an ordering after a GBK (i.e.
Combine.perKey() in your pipeline), so you cannot expect the (C) step to see
elements in a specific order.

As I recall, the Dataflow runner has very limited ordering support.
Hi +Reuven Lax, can you share your insights about it?


-Rui



On Sun, Aug 23, 2020 at 8:32 PM Dongwon Kim  wrote:

> Hi,
>
> My Beam pipeline is designed to work with an unbounded source, KafkaIO.
> It roughly looks like the following:
> p.apply(KafkaIO.read() ...)   // (A-1)
>   .apply(WithKeys.of(...).withKeyType(...))
>   .apply(Window.into(FixedWindows.of(...)))
>   .apply(Combine.perKey(...))  // (B)
>   .apply(Window.into(new GlobalWindows())) // to have per-key stats in
> (C)
>   .apply(ParDo.of(new MyStatefulDoFn()))  // (C)
> Note that (C) has its own state, which is expected to be fetched and
> updated by the window results of (B) in event-time order.
>
> Now I'm writing an integration test where (A-1) is replaced by (A-2):
>
>> p.apply(TextIO.read().from("test.txt"))  // (A-2)
>
> "text.txt" contains samples having a single key.
>
> I get a wrong result, and it turns out that the window results were not fed
> into (C) in order.
> Is it because (A-2) makes the pipeline a bounded one?
>
> Q1. How can I prevent this from happening?
> Q2. How do you guys usually write an integration test for an unbounded
> pipeline with a stateful function?
>
> Best,
>
> Dongwon
>


Out-of-orderness of window results when testing stateful operators with TextIO

2020-08-23 Thread Dongwon Kim
Hi,

My Beam pipeline is designed to work with an unbounded source, KafkaIO.
It roughly looks like the following:
p.apply(KafkaIO.read() ...)   // (A-1)
  .apply(WithKeys.of(...).withKeyType(...))
  .apply(Window.into(FixedWindows.of(...)))
  .apply(Combine.perKey(...))  // (B)
  .apply(Window.into(new GlobalWindows())) // to have per-key stats in (C)
  .apply(ParDo.of(new MyStatefulDoFn()))  // (C)
Note that (C) has its own state, which is expected to be fetched and updated
by the window results of (B) in event-time order.

Now I'm writing an integration test where (A-1) is replaced by (A-2):

> p.apply(TextIO.read().from("test.txt"))  // (A-2)

"text.txt" contains samples having a single key.

I get a wrong result, and it turns out that the window results were not fed
into (C) in order.
Is it because (A-2) makes the pipeline a bounded one?

Q1. How can I prevent this from happening?
Q2. How do you guys usually write an integration test for an unbounded
pipeline with a stateful function?

Best,

Dongwon