Re: Out-of-orderness of window results when testing stateful operators with TextIO
Thanks Reuven for the input, and Rui Wang for CC'ing Reuven.

> Generally you should not rely on PCollection being ordered

Is it because Beam splits a PCollection into multiple input splits and tries to process them as efficiently as possible without considering timestamps? This is very confusing to me because I've been using Flink for a long time; AFAIK, the Flink DataStream API guarantees ordering for the same key between two different tasks.

Best,

Dongwon

On Tue, Aug 25, 2020 at 12:56 AM Reuven Lax wrote:

> Generally you should not rely on PCollection being ordered, though there
> have been discussions about adding some time-ordering semantics.
>
> On Sun, Aug 23, 2020 at 9:06 PM Rui Wang wrote:
>
>> The current Beam model does not guarantee an ordering after a GBK (i.e.
>> Combine.perKey() in your case). So you cannot expect that the (C) step
>> sees elements in a specific order.
>>
>> As I recall, on the Dataflow runner there is very limited ordering
>> support. Hi +Reuven Lax, can you share your insights about it?
>>
>> -Rui
>>
>> On Sun, Aug 23, 2020 at 8:32 PM Dongwon Kim wrote:
>>
>>> Hi,
>>>
>>> My Beam pipeline is designed to work with an unbounded source KafkaIO.
>>> It roughly looks like below:
>>>
>>>   p.apply(KafkaIO.read() ...)                 // (A-1)
>>>    .apply(WithKeys.of(...).withKeyType(...))
>>>    .apply(Window.into(FixedWindows.of(...)))
>>>    .apply(Combine.perKey(...))                // (B)
>>>    .apply(Window.into(new GlobalWindows()))   // to have per-key stats in (C)
>>>    .apply(ParDo.of(new MyStatefulDoFn()))     // (C)
>>>
>>> Note that (C) has its own state, which is expected to be fetched and
>>> updated by the window results from (B) in order of event time.
>>>
>>> Now I'm writing an integration test where (A-1) is replaced by (A-2):
>>>
>>>   p.apply(TextIO.read().from("test.txt"))     // (A-2)
>>>
>>> "test.txt" contains samples having a single key.
>>>
>>> I get a wrong result, and it turns out that the window results didn't
>>> feed into (C) in order.
>>> Is it because (A-2) makes the pipeline a bounded one?
>>>
>>> Q1. How can I prevent this from happening?
>>> Q2. How do you usually write an integration test for an unbounded
>>> pipeline with a stateful function?
>>>
>>> Best,
>>>
>>> Dongwon
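For Q2, one common approach in Beam (a sketch, not taken from this thread's replies) is to drive the test with TestStream, which lets a test add elements and advance the watermark in a controlled event-time order instead of reading a bounded file; runner support varies (the DirectRunner supports it). The coder, timestamps, sample values, and the 1-minute window assumed below are illustrative, and MyStatefulDoFn is the placeholder name from the pipeline above.

```java
// Hedged sketch: replacing TextIO (A-2) with a TestStream so window results
// reach MyStatefulDoFn in event-time order. Transform arguments written as
// "..." are placeholders from the original pipeline.
TestStream<String> source =
    TestStream.create(StringUtf8Coder.of())
        // elements of the first (assumed 1-minute) fixed window
        .addElements(
            TimestampedValue.of("key1,1", new Instant(0L)),
            TimestampedValue.of("key1,2", new Instant(30_000L)))
        // advance the watermark past the first window so it fires first
        .advanceWatermarkTo(new Instant(60_000L))
        // second window's elements arrive only afterwards
        .addElements(TimestampedValue.of("key1,3", new Instant(60_000L)))
        .advanceWatermarkToInfinity();

p.apply(source)                                  // replaces (A-2)
 .apply(WithKeys.of(...).withKeyType(...))
 .apply(Window.into(FixedWindows.of(...)))
 .apply(Combine.perKey(...))                     // (B)
 .apply(Window.into(new GlobalWindows()))
 .apply(ParDo.of(new MyStatefulDoFn()));         // (C)
```

With a TextIO source the runner may emit all elements and then advance the watermark to infinity at once, so per-window results can reach (C) in any order; TestStream makes the watermark progression explicit.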
Out-of-orderness of window results when testing stateful operators with TextIO
Hi,

My Beam pipeline is designed to work with an unbounded source KafkaIO. It roughly looks like below:

  p.apply(KafkaIO.read() ...)                 // (A-1)
   .apply(WithKeys.of(...).withKeyType(...))
   .apply(Window.into(FixedWindows.of(...)))
   .apply(Combine.perKey(...))                // (B)
   .apply(Window.into(new GlobalWindows()))   // to have per-key stats in (C)
   .apply(ParDo.of(new MyStatefulDoFn()))     // (C)

Note that (C) has its own state, which is expected to be fetched and updated by the window results from (B) in order of event time.

Now I'm writing an integration test where (A-1) is replaced by (A-2):

  p.apply(TextIO.read().from("test.txt"))     // (A-2)

"test.txt" contains samples having a single key.

I get a wrong result, and it turns out that the window results didn't feed into (C) in order.
Is it because (A-2) makes the pipeline a bounded one?

Q1. How can I prevent this from happening?
Q2. How do you usually write an integration test for an unbounded pipeline with a stateful function?

Best,

Dongwon
Re: Support of per-key state after windowing
Reuven and Kenneth,

Thanks for the tip! Now I can get the window information without having to modify the type of my aggregator :-)

Best,

Dongwon

On Mon, Aug 24, 2020 at 3:16 AM Reuven Lax wrote:

> Kenn - shouldn't the Reify happen before the rewindow?
>
> On Sun, Aug 23, 2020 at 11:08 AM Kenneth Knowles wrote:
>
>> On Sun, Aug 23, 2020 at 1:12 PM Dongwon Kim wrote:
>>
>>> Hi Reuven,
>>>
>>> You and Kenneth are right; I thought GlobalWindows in unbounded streams
>>> need triggers.
>>>
>>>   p.apply(WithKeys.of(...).withKeyType(...))  // (A)
>>>    .apply(Window.into(FixedWindows.of(...)))  // (B)
>>>    .apply(Combine.perKey(new MyCombinFn()))   // (C)
>>>    .apply(Window.into(new GlobalWindows()))   // (E)
>>>    .apply(ParDo.of(new MyDoFn()))             // (D)
>>>
>>> So just adding (E) blurs windows and makes the state defined in MyDoFn
>>> (D) a per-key state.
>>> I hope I understand you and Kenneth correctly this time.
>>
>> That is correct. However, I think you may want:
>>
>>   p.apply(WithKeys.of(...).withKeyType(...))  // (A)
>>    .apply(Window.into(FixedWindows.of(...)))  // (B)
>>    .apply(Combine.perKey(new MyCombinFn()))   // (C)
>>    .apply(Window.into(new GlobalWindows()))   // (E)
>>    .apply(Reify.windowsInValue())             // (G)
>>    .apply(ParDo.of(new MyDoFn()))             // (D)
>>
>> (Reify javadoc:
>> https://beam.apache.org/releases/javadoc/2.23.0/index.html?org/apache/beam/sdk/transforms/Reify.html)
>>
>> This will make the window information from (B) & (C) available to MyDoFn
>> in (D).
>>
>> Kenn
>>
>>> Best,
>>>
>>> Dongwon
>>>
>>> On Mon, Aug 24, 2020 at 1:51 AM Reuven Lax wrote:
>>>
>>>> You could simply window into GlobalWindows and add a stateful DoFn
>>>> afterwards. No need for the triggering and GroupByKey.
>>>>
>>>> On Sun, Aug 23, 2020 at 9:45 AM Dongwon Kim wrote:
>>>>
>>>>> Hi Kenneth,
>>>>>
>>>>> According to your suggestion, I modified my pipeline as follows:
>>>>>
>>>>>   p.apply(WithKeys.of(...).withKeyType(...))                    // (A)
>>>>>    .apply(Window.into(FixedWindows.of(...)))                    // (B)
>>>>>    .apply(Combine.perKey(new MyCombinFn()))                     // (C)
>>>>>    .apply(
>>>>>        Window.into(new GlobalWindows())                         // (E1)
>>>>>              .triggering(
>>>>>                  Repeatedly.forever(
>>>>>                      AfterPane.elementCountAtLeast(1)))         // (E2)
>>>>>              .accumulatingFiredPanes())                         // (E3)
>>>>>    .apply(GroupByKey.create())                                  // (F)
>>>>>    .apply(ParDo.of(new MyDoFn()))                               // (D)
>>>>>
>>>>> I had to include (E1), (E2), (E3), and (F) so that MyDoFn (D) can
>>>>> iterate over a list of output records from (C) sharing the same key.
>>>>> This way I can achieve the same effect without having a per-key state
>>>>> at (D).
>>>>>
>>>>> Do I understand your intention correctly?
>>>>> If not, please advise me with some hints.
>>>>>
>>>>> Thanks,
>>>>>
>>>>> Dongwon
>>>>>
>>>>> On Sun, Aug 23, 2020 at 5:10 AM Kenneth Knowles wrote:
>>>>>
>>>>>> Hi Dongwon,
>>>>>>
>>>>>> On Sat, Aug 22, 2020 at 2:46 PM Dongwon Kim wrote:
>>>>>>
>>>>>>> Hi all,
>>>>>>>
>>>>>>> I'm using Beam 2.23.0 with FlinkRunner, and a part of my unbounded
>>>>>>> pipeline looks like below:
>>>>>>>
>>>>>>>   p.apply(WithKeys.of(...).withKeyType(...))  // (A)
>>>>>>>    .apply(Window.into(FixedWindows.of(...)))  // (B)
>>>>>>>    .apply(Combine.perKey(new MyCombinFn()))   // (C)
>>>>>>>    .apply(ParDo.of(new MyDoFn()))             // (D)
>>>>>>>
>>>>>>> What I want to do is
>>>>>>> (1) to group data by key (A) and window (B),
>>>>>>> (2) to do some aggregation (C), and
>>>>>>> (3) to perform the final computation on each group (D).
>>>>>>>
>>>>>>> I've noticed that a ValueState for a particular key is NULL whenever
>>>>>>> a new window for the key arrives, which gives me the feeling that
>>>>>>> Beam supports only per-key+window state, not per-key state, after
>>>>>>> windowing.
>>>>>>>
>>>>>>> I usually work with the Flink DataStream API, and Flink supports
>>>>>>> both per-key state and per-key+window state [1].
>>>>>>>
>>>>>>> Does Beam support per-key states, not per-key+window states, after
>>>>>>> windowing (D)? If I miss something, please correct me.
>>>>>>
>>>>>> You understand correctly - Beam does not include per-key state that
>>>>>> crosses window boundaries. If I understand your goal correctly, you
>>>>>> can achieve the same effect by copying the window metadata into the
>>>>>> element and then re-windowing into the global window before (D).
>>>>>>
>>>>>> Kenn
>>>>>>
>>>>>>> [1]
>>>>>>> https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/operators/windows.html#using-per-window-state-in-processwindowfunction
>>>>>>>
>>>>>>> Best,
>>>>>>>
>>>>>>> Dongwon
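Putting Kenn's snippet and Reuven's correction together (the Reify before the rewindow), the thread's final shape would look roughly like the sketch below; after Reify.windowsInValue(), each value carries its original window, pane, and timestamp, so MyDoFn can keep per-key state in the global window while still knowing which fixed window each aggregate came from. Transform arguments written as `...` are placeholders from the thread.

```java
// Sketch only: the thread's suggested pipeline with window metadata reified
// into the value before rewindowing into the global window.
p.apply(WithKeys.of(...).withKeyType(...))   // (A)
 .apply(Window.into(FixedWindows.of(...)))   // (B)
 .apply(Combine.perKey(new MyCombinFn()))    // (C)
 .apply(Reify.windowsInValue())              // (G) KV<K, V> -> KV<K, ValueInSingleWindow<V>>
 .apply(Window.into(new GlobalWindows()))    // (E)
 .apply(ParDo.of(new MyDoFn()));             // (D) per-key state; window info inside each element
```

Ordering Reify before the GlobalWindows rewindow matters: it captures the FixedWindows assignment from (B)/(C) before that assignment is replaced by the global window.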
Re: Support of per-key state after windowing
Hi Reuven,

You and Kenneth are right; I thought GlobalWindows in unbounded streams need triggers.

  p.apply(WithKeys.of(...).withKeyType(...))  // (A)
   .apply(Window.into(FixedWindows.of(...)))  // (B)
   .apply(Combine.perKey(new MyCombinFn()))   // (C)
   .apply(Window.into(new GlobalWindows()))   // (E)
   .apply(ParDo.of(new MyDoFn()))             // (D)

So just adding (E) blurs windows and makes the state defined in MyDoFn (D) a per-key state.
I hope I understand you and Kenneth correctly this time.

Best,

Dongwon

On Mon, Aug 24, 2020 at 1:51 AM Reuven Lax wrote:

> You could simply window into GlobalWindows and add a stateful DoFn
> afterwards. No need for the triggering and GroupByKey.
Re: Support of per-key state after windowing
Hi Kenneth,

According to your suggestion, I modified my pipeline as follows:

  p.apply(WithKeys.of(...).withKeyType(...))                    // (A)
   .apply(Window.into(FixedWindows.of(...)))                    // (B)
   .apply(Combine.perKey(new MyCombinFn()))                     // (C)
   .apply(
       Window.into(new GlobalWindows())                         // (E1)
             .triggering(
                 Repeatedly.forever(
                     AfterPane.elementCountAtLeast(1)))         // (E2)
             .accumulatingFiredPanes())                         // (E3)
   .apply(GroupByKey.create())                                  // (F)
   .apply(ParDo.of(new MyDoFn()))                               // (D)

I had to include (E1), (E2), (E3), and (F) so that MyDoFn (D) can iterate over a list of output records from (C) sharing the same key.
This way I can achieve the same effect without having a per-key state at (D).

Do I understand your intention correctly?
If not, please advise me with some hints.

Thanks,

Dongwon

On Sun, Aug 23, 2020 at 5:10 AM Kenneth Knowles wrote:

> Hi Dongwon,
>
> You understand correctly - Beam does not include per-key state that
> crosses window boundaries. If I understand your goal correctly, you can
> achieve the same effect by copying the window metadata into the element
> and then re-windowing into the global window before (D).
>
> Kenn
Support of per-key state after windowing
Hi all,

I'm using Beam 2.23.0 with FlinkRunner, and a part of my unbounded pipeline looks like below:

  p.apply(WithKeys.of(...).withKeyType(...))  // (A)
   .apply(Window.into(FixedWindows.of(...)))  // (B)
   .apply(Combine.perKey(new MyCombinFn()))   // (C)
   .apply(ParDo.of(new MyDoFn()))             // (D)

What I want to do is
(1) to group data by key (A) and window (B),
(2) to do some aggregation (C), and
(3) to perform the final computation on each group (D).

I've noticed that a ValueState for a particular key is NULL whenever a new window for the key arrives, which gives me the feeling that Beam supports only per-key+window state, not per-key state, after windowing.

I usually work with the Flink DataStream API, and Flink supports both per-key state and per-key+window state [1].

Does Beam support per-key states, not per-key+window states, after windowing (D)? If I miss something, please correct me.

[1] https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/operators/windows.html#using-per-window-state-in-processwindowfunction

Best,

Dongwon
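For concreteness, here is a minimal sketch (not from the thread) of the kind of stateful DoFn being discussed. State declared with a StateSpec this way is scoped to the key *and* the window, which is why the ValueState reads as null when the first element of a new window arrives for an already-seen key. The String/Long element types and the coder are illustrative assumptions.

```java
// Sketch: in Beam, ValueState in a stateful DoFn is per-key-and-window.
static class MyDoFn extends DoFn<KV<String, Long>, String> {

  @StateId("lastValue")
  private final StateSpec<ValueState<Long>> lastValueSpec =
      StateSpecs.value(VarLongCoder.of());

  @ProcessElement
  public void process(
      ProcessContext c,
      @StateId("lastValue") ValueState<Long> lastValue) {
    // Reads null for the first element of each key *per window*,
    // even if the same key already appeared in an earlier window.
    Long previous = lastValue.read();
    lastValue.write(c.element().getValue());
    c.output(c.element().getKey() + ": previous=" + previous);
  }
}
```

This is the behavior the thread works around by rewindowing into GlobalWindows before the stateful ParDo, so that one (global) window, and hence one state cell per key, spans the whole stream.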