Re: Out-of-orderness of window results when testing stateful operators with TextIO

2020-08-24 Thread Dongwon Kim
Thanks Reuven for the input, and Rui Wang for CC'ing Reuven.

Generally you should not rely on PCollection being ordered

Is it because Beam splits a PCollection into multiple input splits and tries
to process them as efficiently as possible without considering event time?
This is quite confusing to me as I've been using Flink for a long time; AFAIK,
the Flink DataStream API guarantees the ordering of records with the same key
between two directly connected tasks.
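
For reference, one way to make a stateful DoFn like (C) tolerant of out-of-order
input is to buffer elements in a BagState and only process them, sorted by
timestamp, from an event-time timer. A rough sketch follows; the String/Long key
and value types and the class name SortingDoFn are assumptions, and the flush is
simplified (it empties the whole buffer when the timer fires, whereas a production
version would only flush elements at or before the firing timestamp):

import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import org.apache.beam.sdk.coders.KvCoder;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.coders.VarLongCoder;
import org.apache.beam.sdk.state.BagState;
import org.apache.beam.sdk.state.StateSpec;
import org.apache.beam.sdk.state.StateSpecs;
import org.apache.beam.sdk.state.TimeDomain;
import org.apache.beam.sdk.state.Timer;
import org.apache.beam.sdk.state.TimerSpec;
import org.apache.beam.sdk.state.TimerSpecs;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.TimestampedValue;
import org.joda.time.Instant;

// Buffers per-key window results and visits them in event-time order.
class SortingDoFn extends DoFn<KV<String, Long>, KV<String, Long>> {

  @StateId("buffer")
  private final StateSpec<BagState<TimestampedValue<KV<String, Long>>>> bufferSpec =
      StateSpecs.bag(TimestampedValue.TimestampedValueCoder.of(
          KvCoder.of(StringUtf8Coder.of(), VarLongCoder.of())));

  @TimerId("flush")
  private final TimerSpec flushSpec = TimerSpecs.timer(TimeDomain.EVENT_TIME);

  @ProcessElement
  public void process(
      @Element KV<String, Long> element,
      @Timestamp Instant ts,
      @StateId("buffer") BagState<TimestampedValue<KV<String, Long>>> buffer,
      @TimerId("flush") Timer flush) {
    buffer.add(TimestampedValue.of(element, ts));
    flush.set(ts);  // fire once the watermark passes this element's timestamp
  }

  @OnTimer("flush")
  public void flush(
      OnTimerContext ctx,
      @StateId("buffer") BagState<TimestampedValue<KV<String, Long>>> buffer) {
    List<TimestampedValue<KV<String, Long>>> sorted = new ArrayList<>();
    buffer.read().forEach(sorted::add);
    sorted.sort(Comparator.comparing(TimestampedValue::getTimestamp));
    for (TimestampedValue<KV<String, Long>> tv : sorted) {
      // the per-key, event-time-ordered computation of (C) would go here
      ctx.output(tv.getValue());
    }
    buffer.clear();
  }
}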

Best,

Dongwon

On Tue, Aug 25, 2020 at 12:56 AM Reuven Lax  wrote:

> Generally you should not rely on PCollection being ordered, though there
> have been discussions about adding some time-ordering semantics.
>
>
>
> On Sun, Aug 23, 2020 at 9:06 PM Rui Wang  wrote:
>
>> The current Beam model does not guarantee ordering after a GBK (i.e. the
>> Combine.perKey() in your pipeline), so you cannot expect the (C) step to
>> see elements in a specific order.
>>
>> As I recall, the Dataflow runner has only very limited ordering support.
>> Hi +Reuven Lax, can you share your insights about it?
>>
>>
>> -Rui
>>
>>
>>
>> On Sun, Aug 23, 2020 at 8:32 PM Dongwon Kim 
>> wrote:
>>
>>> Hi,
>>>
>>> My Beam pipeline is designed to work with an unbounded source, KafkaIO.
>>> It roughly looks like the following:
>>> p.apply(KafkaIO.read() ...)   // (A-1)
>>>   .apply(WithKeys.of(...).withKeyType(...))
>>>   .apply(Window.into(FixedWindows.of(...)))
>>>   .apply(Combine.perKey(...))  // (B)
>>>   .apply(Window.into(new GlobalWindows())) // to have per-key stats in (C)
>>>   .apply(ParDo.of(new MyStatefulDoFn()))  // (C)
>>> Note that (C) has its own state, which is expected to be fetched and
>>> updated by the window results from (B) in event-time order.
>>>
>>> Now I'm writing an integration test where (A-1) is replaced by (A-2):
>>>
>>>> p.apply(TextIO.read().from("test.txt"))  // (A-2)
>>>
>>> "text.txt" contains samples having a single key.
>>>
>>> I get a wrong result, and it turns out that the window results were not
>>> fed into (C) in order.
>>> Is it because (A-2) makes the pipeline a bounded one?
>>>
>>> Q1. How can I prevent this from happening?
>>> Q2. How do you usually write an integration test for an unbounded
>>> pipeline with a stateful function?
>>>
>>> Best,
>>>
>>> Dongwon
>>>
>>


Out-of-orderness of window results when testing stateful operators with TextIO

2020-08-23 Thread Dongwon Kim
Hi,

My Beam pipeline is designed to work with an unbounded source, KafkaIO.
It roughly looks like the following:
p.apply(KafkaIO.read() ...)   // (A-1)
  .apply(WithKeys.of(...).withKeyType(...))
  .apply(Window.into(FixedWindows.of(...)))
  .apply(Combine.perKey(...))  // (B)
  .apply(Window.into(new GlobalWindows())) // to have per-key stats in (C)
  .apply(ParDo.of(new MyStatefulDoFn()))  // (C)
Note that (C) has its own state, which is expected to be fetched and updated
by the window results from (B) in event-time order.

Now I'm writing an integration test where (A-1) is replaced by (A-2):

> p.apply(TextIO.read().from("test.txt"))  // (A-2)

"text.txt" contains samples having a single key.

I get a wrong result, and it turns out that the window results were not fed
into (C) in order.
Is it because (A-2) makes the pipeline a bounded one?

Q1. How can I prevent this from happening?
Q2. How do you usually write an integration test for an unbounded pipeline
with a stateful function?
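
Regarding Q2: one option (not discussed in this thread, so treat it as a
suggestion) is to replace TextIO in the test with TestStream, which lets the
test assign element timestamps and advance the watermark explicitly, so the
results of (B) reach (C) deterministically. A minimal sketch, assuming
one-minute fixed windows and made-up sample strings; TestStream needs a runner
that supports it (the DirectRunner does):

import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.testing.TestStream;
import org.apache.beam.sdk.values.TimestampedValue;
import org.joda.time.Duration;
import org.joda.time.Instant;

// (A-3) a test source with explicit timestamps and watermark control
TestStream<String> samples =
    TestStream.create(StringUtf8Coder.of())
        // elements belonging to the first one-minute window
        .addElements(
            TimestampedValue.of("key1,1", new Instant(0)),
            TimestampedValue.of("key1,2", new Instant(30_000)))
        // advance the watermark past the first window so (B) emits its result
        // before the second window's data is even added
        .advanceWatermarkTo(new Instant(0).plus(Duration.standardMinutes(1)))
        // elements belonging to the second one-minute window
        .addElements(TimestampedValue.of("key1,3", new Instant(90_000)))
        .advanceWatermarkToInfinity();

p.apply(samples)  // replaces (A-2); the rest of the pipeline stays unchanged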

Best,

Dongwon


Re: Support of per-key state after windowing

2020-08-23 Thread Dongwon Kim
Reuven and Kenneth,

Thanks for the tip!

Now I can get window information without having to modify the type of my
aggregator :-)
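
A minimal sketch of the arrangement being discussed (with Reify placed before
the re-window, per Reuven's note below); the String key and Long combine-output
types are assumptions:

import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.Reify;
import org.apache.beam.sdk.transforms.windowing.GlobalWindows;
import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.ValueInSingleWindow;

// ... (A) key, (B) fixed windows, (C) Combine.perKey as before ...
    .apply(Reify.<String, Long>windowsInValue())  // (G): KV<String, ValueInSingleWindow<Long>>
    .apply(Window.into(new GlobalWindows()))      // (E): state in (D) now spans fixed windows
    .apply(ParDo.of(new DoFn<KV<String, ValueInSingleWindow<Long>>, Void>() {
      @ProcessElement
      public void process(@Element KV<String, ValueInSingleWindow<Long>> e) {
        // the element still carries the fixed window it was aggregated in,
        // so a per-key state declared on this DoFn can be updated window by window
        IntervalWindow fixedWindow = (IntervalWindow) e.getValue().getWindow();
        Long aggregate = e.getValue().getValue();
        // ... per-key logic using fixedWindow.start()/end() and aggregate ...
      }
    }));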

Best,

Dongwon

On Mon, Aug 24, 2020 at 3:16 AM Reuven Lax  wrote:

> Kenn - shouldn't the Reify happen before the rewindow?
>
> On Sun, Aug 23, 2020 at 11:08 AM Kenneth Knowles  wrote:
>
>>
>>
>> On Sun, Aug 23, 2020 at 1:12 PM Dongwon Kim 
>> wrote:
>>
>>> Hi Reuven,
>>>
>>> You and Kenneth are right; I had thought that GlobalWindows on unbounded
>>> streams required triggers.
>>>
>>> p.apply(WithKeys.of(...).withKeyType(...))   // (A)
>>>>   .apply(Window.into(FixedWindows.of(...)))// (B)
>>>>   .apply(Combine.perKey(new MyCombinFn()))  // (C)
>>>>   .apply(Window.into(new GlobalWindows()))  // (E)
>>>>   .apply(ParDo.of(new MyDoFn()))  // (D)
>>>
>>>
>>> So just adding (E) erases the window boundaries and makes the state defined
>>> in MyDoFn (D) a per-key state.
>>> I hope I understand you and Kenneth correctly this time.
>>>
>>
>> That is correct. However, I think you may want:
>>
>> p.apply(WithKeys.of(...).withKeyType(...))   // (A)
>>>   .apply(Window.into(FixedWindows.of(...)))// (B)
>>>   .apply(Combine.perKey(new MyCombinFn()))  // (C)
>>>   .apply(Window.into(new GlobalWindows()))  // (E)
>>
>>
>> .apply(Reify.windowsInValue())  // (G)
>> (Javadoc: https://beam.apache.org/releases/javadoc/2.23.0/index.html?org/apache/beam/sdk/transforms/Reify.html)
>>
>>
>>>   .apply(ParDo.of(new MyDoFn()))  // (D)
>>
>>
>> This will make the window information from (B) & (C) available to MyDoFn
>> in (D)
>>
>> Kenn
>>
>>
>>>
>>> Best,
>>>
>>> Dongwon
>>>
>>> On Mon, Aug 24, 2020 at 1:51 AM Reuven Lax  wrote:
>>>
>>>> You could simply window into GlobalWindows and add a stateful DoFn
>>>> afterwards. No need for the triggering and GroupByKey.
>>>>
>>>> On Sun, Aug 23, 2020 at 9:45 AM Dongwon Kim 
>>>> wrote:
>>>>
>>>>> Hi Kenneth,
>>>>>
>>>>> According to your suggestion, I modified my pipeline as follows:
>>>>>
>>>>> p.apply(WithKeys.of(...).withKeyType(...))                     // (A)
>>>>>>   .apply(Window.into(FixedWindows.of(...)))                    // (B)
>>>>>>   .apply(Combine.perKey(new MyCombinFn()))                     // (C)
>>>>>>   .apply(
>>>>>>     Window
>>>>>>       .into(new GlobalWindows())                               // (E1)
>>>>>>       .triggering(
>>>>>>         Repeatedly.forever(AfterPane.elementCountAtLeast(1))   // (E2)
>>>>>>       )
>>>>>>       .accumulatingFiredPanes()                                // (E3)
>>>>>>   )
>>>>>>   .apply(GroupByKey.create())                                  // (F)
>>>>>>   .apply(ParDo.of(new MyDoFn()))                               // (D)
>>>>>
>>>>>
>>>>> I had to include (E1), (E2), (E3), and (F) so that MyDoFn (D) can
>>>>> iterate over a list of output records from (C) sharing the same key.
>>>>> This way I can achieve the same effect without having a per-key state
>>>>> at (D).
>>>>>
>>>>> Do I understand your intention correctly?
>>>>> If not, please give me some hints.
>>>>>
>>>>> Thanks,
>>>>>
>>>>> Dongwon
>>>>>
>>>>>
>>>>> On Sun, Aug 23, 2020 at 5:10 AM Kenneth Knowles 
>>>>> wrote:
>>>>>
>>>>>> Hi Dongwon,
>>>>>>
>>>>>> On Sat, Aug 22, 2020 at 2:46 PM Dongwon Kim 
>>>>>> wrote:
>>>>>>
>>>>>>> Hi all,
>>>>>>>
>>>>>>> I'm using Beam 2.23.0 with the FlinkRunner, and a part of my unbounded
>>>>>>> pipeline looks like the following:
>>>>>>>
>>>>>>>> p.apply(WithKeys.of(...).withKeyType(...))  // (A)
>>>>>>>>   .apply(Window.into(FixedWindows.of(...)))  // (B)
>>>>>>>>   .apply(Combine.perKey(new MyCombinFn()))   // (C)
>>>>>>>>   .apply(ParDo.of(new MyDoFn()))             // (D)
>>>>>>>
>>>>>>>
>>>>>>> What I want to do is
>>>>>>> (1) to group data by key (A) and window (B),
>>>>>>> (2) to do some aggregation (C)
>>>>>>> (3) to perform the final computation on each group (D)
>>>>>>>
>>>>>>> I've noticed that a ValueState for a particular key is NULL whenever
>>>>>>> a new window for the key arrives, which gives me the impression that
>>>>>>> Beam supports only per-key+window state, not per-key state, after
>>>>>>> windowing.
>>>>>>>
>>>>>>> I usually work with Flink DataStream API and Flink supports both
>>>>>>> per-key state and per-key+window state [1].
>>>>>>>
>>>>>>> Does Beam support per-key states, not per-key+window states, after
>>>>>>> windowing (D)? If I'm missing something, please correct me.
>>>>>>>
>>>>>>
>>>>>> You understand correctly - Beam does not include per-key state that
>>>>>> crosses window boundaries. If I understand your goal correctly, you can
>>>>>> achieve the same effect by copying the window metadata into the element and
>>>>>> then re-windowing into the global window before (D).
>>>>>>
>>>>>> Kenn
>>>>>>
>>>>>>
>>>>>>>
>>>>>>> [1]
>>>>>>> https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/operators/windows.html#using-per-window-state-in-processwindowfunction
>>>>>>>
>>>>>>> Best,
>>>>>>>
>>>>>>> Dongwon
>>>>>>>
>>>>>>>


Re: Support of per-key state after windowing

2020-08-23 Thread Dongwon Kim
Hi Reuven,

You and Kenneth are right; I had thought that GlobalWindows on unbounded
streams required triggers.

p.apply(WithKeys.of(...).withKeyType(...))   // (A)
>   .apply(Window.into(FixedWindows.of(...)))// (B)
>   .apply(Combine.perKey(new MyCombinFn()))  // (C)
>   .apply(Window.into(new GlobalWindows()))  // (E)
>   .apply(ParDo.of(new MyDoFn()))  // (D)


So just adding (E) erases the window boundaries and makes the state defined in
MyDoFn (D) a per-key state.
I hope I understand you and Kenneth correctly this time.
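
To make that concrete: a rough sketch of what a MyDoFn with per-key state could
look like once (E) has put everything into the global window. The String/Long
types and the running-total logic are just assumptions for illustration:

import org.apache.beam.sdk.coders.VarLongCoder;
import org.apache.beam.sdk.state.StateSpec;
import org.apache.beam.sdk.state.StateSpecs;
import org.apache.beam.sdk.state.ValueState;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.values.KV;

class MyDoFn extends DoFn<KV<String, Long>, KV<String, Long>> {

  // After (E) every element lives in the single global window, so this state is
  // effectively per key and survives across the fixed windows produced by (B)/(C).
  @StateId("runningTotal")
  private final StateSpec<ValueState<Long>> totalSpec = StateSpecs.value(VarLongCoder.of());

  @ProcessElement
  public void process(
      @Element KV<String, Long> windowResult,
      @StateId("runningTotal") ValueState<Long> total,
      OutputReceiver<KV<String, Long>> out) {
    Long previous = total.read();  // null the first time this key is seen
    long updated = (previous == null ? 0L : previous) + windowResult.getValue();
    total.write(updated);
    out.output(KV.of(windowResult.getKey(), updated));
  }
}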

Best,

Dongwon

On Mon, Aug 24, 2020 at 1:51 AM Reuven Lax  wrote:

> You could simply window into GlobalWindows and add a stateful DoFn
> afterwards. No need for the triggering and GroupByKey.
>
> On Sun, Aug 23, 2020 at 9:45 AM Dongwon Kim  wrote:
>
>> Hi Kenneth,
>>
>> According to your suggestion, I modified my pipeline as follows:
>>
>> p.apply(WithKeys.of(...).withKeyType(...))                     // (A)
>>>   .apply(Window.into(FixedWindows.of(...)))                    // (B)
>>>   .apply(Combine.perKey(new MyCombinFn()))                     // (C)
>>>   .apply(
>>>     Window
>>>       .into(new GlobalWindows())                               // (E1)
>>>       .triggering(
>>>         Repeatedly.forever(AfterPane.elementCountAtLeast(1))   // (E2)
>>>       )
>>>       .accumulatingFiredPanes()                                // (E3)
>>>   )
>>>   .apply(GroupByKey.create())                                  // (F)
>>>   .apply(ParDo.of(new MyDoFn()))                               // (D)
>>
>>
>> I had to include (E1), (E2), (E3), and (F) so that MyDoFn (D) can iterate
>> over a list of output records from (C) sharing the same key.
>> This way I can achieve the same effect without having a per-key state at
>> (D).
>>
>> Do I understand your intention correctly?
>> If not, please give me some hints.
>>
>> Thanks,
>>
>> Dongwon
>>
>>
>> On Sun, Aug 23, 2020 at 5:10 AM Kenneth Knowles  wrote:
>>
>>> Hi Dongwon,
>>>
>>> On Sat, Aug 22, 2020 at 2:46 PM Dongwon Kim 
>>> wrote:
>>>
>>>> Hi all,
>>>>
>>>> I'm using Beam 2.23.0 with the FlinkRunner, and a part of my unbounded
>>>> pipeline looks like the following:
>>>>
>>>>> p.apply(WithKeys.of(...).withKeyType(...))  // (A)
>>>>>   .apply(Window.into(FixedWindows.of(...)))  // (B)
>>>>>   .apply(Combine.perKey(new MyCombinFn()))   // (C)
>>>>>   .apply(ParDo.of(new MyDoFn()))             // (D)
>>>>
>>>>
>>>> What I want to do is
>>>> (1) to group data by key (A) and window (B),
>>>> (2) to do some aggregation (C)
>>>> (3) to perform the final computation on each group (D)
>>>>
>>>> I've noticed that a ValueState for a particular key is NULL whenever a
>>>> new window for the key arrives, which gives me the impression that Beam
>>>> supports only per-key+window state, not per-key state, after
>>>> windowing.
>>>>
>>>> I usually work with Flink DataStream API and Flink supports both
>>>> per-key state and per-key+window state [1].
>>>>
>>>> Does Beam support per-key states, not per-key+window states, after
>>>> windowing (D)? If I'm missing something, please correct me.
>>>>
>>>
>>> You understand correctly - Beam does not include per-key state that
>>> crosses window boundaries. If I understand your goal correctly, you can
>>> achieve the same effect by copying the window metadata into the element and
>>> then re-windowing into the global window before (D).
>>>
>>> Kenn
>>>
>>>
>>>>
>>>> [1]
>>>> https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/operators/windows.html#using-per-window-state-in-processwindowfunction
>>>>
>>>> Best,
>>>>
>>>> Dongwon
>>>>
>>>>


Re: Support of per-key state after windowing

2020-08-23 Thread Dongwon Kim
Hi Kenneth,

According to your suggestion, I modified my pipeline as follows:

p.apply(WithKeys.of(...).withKeyType(...))                     // (A)
>   .apply(Window.into(FixedWindows.of(...)))                    // (B)
>   .apply(Combine.perKey(new MyCombinFn()))                     // (C)
>   .apply(
>     Window
>       .into(new GlobalWindows())                               // (E1)
>       .triggering(
>         Repeatedly.forever(AfterPane.elementCountAtLeast(1))   // (E2)
>       )
>       .accumulatingFiredPanes()                                // (E3)
>   )
>   .apply(GroupByKey.create())                                  // (F)
>   .apply(ParDo.of(new MyDoFn()))                               // (D)


I had to include (E1), (E2), (E3), and (F) so that MyDoFn (D) can iterate
over a list of output records from (C) sharing the same key.
This way I can achieve the same effect without having a per-key state at
(D).

Do I understand your intention correctly?
If not, please give me some hints.

Thanks,

Dongwon
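
For comparison with Kenneth's suggestion quoted below (copy the window metadata
into the element, then re-window, with no trigger or GroupByKey), here is a
rough sketch that keeps just the fixed window's maximum timestamp; the
String/Long types are assumptions, MyDoFn would then take
KV<String, KV<Instant, Long>> as input, and Reify.windowsInValue(), suggested
elsewhere in the thread, is the more complete built-in alternative:

import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.GlobalWindows;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.values.KV;
import org.joda.time.Instant;

// ... (A), (B), (C) as before ...
    // still windowed by (B): copy the fixed window's end timestamp into the value
    .apply(ParDo.of(new DoFn<KV<String, Long>, KV<String, KV<Instant, Long>>>() {
      @ProcessElement
      public void process(ProcessContext c, BoundedWindow window) {
        c.output(KV.of(c.element().getKey(),
                       KV.of(window.maxTimestamp(), c.element().getValue())));
      }
    }))
    .apply(Window.into(new GlobalWindows()))  // MyDoFn's state now becomes per-key
    .apply(ParDo.of(new MyDoFn()))            // each element still records its source window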


On Sun, Aug 23, 2020 at 5:10 AM Kenneth Knowles  wrote:

> Hi Dongwon,
>
> On Sat, Aug 22, 2020 at 2:46 PM Dongwon Kim  wrote:
>
>> Hi all,
>>
>> I'm using Beam 2.23.0 with the FlinkRunner, and a part of my unbounded
>> pipeline looks like the following:
>>
>>> p.apply(WithKeys.of(...).withKeyType(...))  // (A)
>>>   .apply(Window.into(FixedWindows.of(...)))  // (B)
>>>   .apply(Combine.perKey(new MyCombinFn()))   // (C)
>>>   .apply(ParDo.of(new MyDoFn()))             // (D)
>>
>>
>> What I want to do is
>> (1) to group data by key (A) and window (B),
>> (2) to do some aggregation (C)
>> (3) to perform the final computation on each group (D)
>>
>> I've noticed that a ValueState for a particular key is NULL whenever a
>> new window for the key arrives, which gives me the impression that Beam
>> supports only per-key+window state, not per-key state, after
>> windowing.
>>
>> I usually work with Flink DataStream API and Flink supports both per-key
>> state and per-key+window state [1].
>>
>> Does Beam support per-key states, not per-key+window states, after
>> windowing (D)? If I'm missing something, please correct me.
>>
>
> You understand correctly - Beam does not include per-key state that
> crosses window boundaries. If I understand your goal correctly, you can
> achieve the same effect by copying the window metadata into the element and
> then re-windowing into the global window before (D).
>
> Kenn
>
>
>>
>> [1]
>> https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/operators/windows.html#using-per-window-state-in-processwindowfunction
>>
>> Best,
>>
>> Dongwon
>>
>>


Support of per-key state after windowing

2020-08-22 Thread Dongwon Kim
Hi all,

I'm using Beam 2.23.0 with the FlinkRunner, and a part of my unbounded pipeline
looks like the following:

> p.apply(WithKeys.of(...).withKeyType(...))  // (A)
>   .apply(Window.into(FixedWindows.of(...)))  // (B)
>   .apply(Combine.perKey(new MyCombinFn()))   // (C)
>   .apply(ParDo.of(new MyDoFn()))             // (D)


What I want to do is
(1) to group data by key (A) and window (B),
(2) to do some aggregation (C)
(3) to perform the final computation on each group (D)

I've noticed that a ValueState for a particular key is NULL whenever a new
window for the key arrives, which gives me the impression that Beam supports
only per-key+window state, not per-key state, after windowing.

I usually work with Flink DataStream API and Flink supports both per-key
state and per-key+window state [1].

Does Beam support per-key states, not per-key+window states, after
windowing (D)? If I'm missing something, please correct me.

[1]
https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/operators/windows.html#using-per-window-state-in-processwindowfunction

Best,

Dongwon