Reuven and Kenneth,

Thanks for the tip!
Now I can get window information without having to modify the type of my aggregator :-)

Best,

Dongwon

On Mon, Aug 24, 2020 at 3:16 AM Reuven Lax <re...@google.com> wrote:
> Kenn - shouldn't the Reify happen before the rewindow?
>
> On Sun, Aug 23, 2020 at 11:08 AM Kenneth Knowles <k...@apache.org> wrote:
>> On Sun, Aug 23, 2020 at 1:12 PM Dongwon Kim <eastcirc...@gmail.com> wrote:
>>> Hi Reuven,
>>>
>>> You and Kenneth are right; I thought GlobalWindows in unbounded streams
>>> needed triggers.
>>>
>>> p.apply(WithKeys.of(...).withKeyType(...))     // (A)
>>>     .apply(Window.into(FixedWindows.of(...)))  // (B)
>>>     .apply(Combine.perKey(new MyCombinFn()))   // (C)
>>>     .apply(Window.into(new GlobalWindows()))   // (E)
>>>     .apply(ParDo.of(new MyDoFn()))             // (D)
>>>
>>> So just adding (E) reassigns everything to the single global window,
>>> which makes the state defined in MyDoFn (D) per-key state.
>>> I hope I understand you and Kenneth correctly this time.
>>
>> That is correct. However, I think you may want:
>>
>> p.apply(WithKeys.of(...).withKeyType(...))     // (A)
>>     .apply(Window.into(FixedWindows.of(...)))  // (B)
>>     .apply(Combine.perKey(new MyCombinFn()))   // (C)
>>     .apply(Window.into(new GlobalWindows()))   // (E)
>>     .apply(Reify.windowsInValue())             // (G)
>>     .apply(ParDo.of(new MyDoFn()))             // (D)
>>
>> (Reify.windowsInValue(): <https://beam.apache.org/releases/javadoc/2.23.0/index.html?org/apache/beam/sdk/transforms/Reify.html>)
>>
>> This will make the window information from (B) and (C) available to
>> MyDoFn in (D).
>>
>> Kenn
>>
>>> Best,
>>>
>>> Dongwon
>>>
>>> On Mon, Aug 24, 2020 at 1:51 AM Reuven Lax <re...@google.com> wrote:
>>>> You could simply window into GlobalWindows and add a stateful DoFn
>>>> afterwards. No need for the triggering and GroupByKey.
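Reuven's question is about ordering: a reify step copies whatever window the element is in at that moment, so applying it after `Window.into(new GlobalWindows())` would record the global window instead of the fixed window from (B). The plain-Java toy model below is a sketch, not Beam code: `Elem`, `ReifyOrderModel`, and the window labels are hypothetical names for illustration, and it assumes only what the thread states, namely that a stateful DoFn's state is scoped to (key, window).

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Toy model, NOT the Beam API: an element is a key, a value, the window it is
// currently assigned to, and (optionally) a window copied into its value.
final class Elem {
    final String key;
    final long value;
    final String window;        // current window assignment
    final String reifiedWindow; // window copied into the value; null until reified

    Elem(String key, long value, String window, String reifiedWindow) {
        this.key = key;
        this.value = value;
        this.window = window;
        this.reifiedWindow = reifiedWindow;
    }
}

final class ReifyOrderModel {
    static final String GLOBAL = "GlobalWindow";

    // Stand-in for Reify.windowsInValue(): copy the element's current window into its value.
    static Elem reify(Elem e) {
        return new Elem(e.key, e.value, e.window, e.window);
    }

    // Stand-in for Window.into(new GlobalWindows()): move the element to the global window.
    static Elem rewindowGlobal(Elem e) {
        return new Elem(e.key, e.value, GLOBAL, e.reifiedWindow);
    }

    // Stand-in for a stateful DoFn: state cells are keyed by (key, window),
    // and each cell records the reified windows of the elements it received.
    static Map<String, List<String>> statefulDoFn(List<Elem> in) {
        Map<String, List<String>> cells = new LinkedHashMap<>();
        for (Elem e : in) {
            cells.computeIfAbsent(e.key + "@" + e.window, c -> new ArrayList<>())
                 .add(e.reifiedWindow);
        }
        return cells;
    }

    // Runs the tail of the pipeline with either ordering of (G) and (E).
    static Map<String, List<String>> run(List<Elem> combined, boolean reifyBeforeRewindow) {
        List<Elem> out = new ArrayList<>();
        for (Elem e : combined) {
            out.add(reifyBeforeRewindow ? rewindowGlobal(reify(e)) : reify(rewindowGlobal(e)));
        }
        return statefulDoFn(out);
    }

    // Hypothetical output of Combine.perKey (C): one result per key and fixed window.
    static List<Elem> sampleCombined() {
        List<Elem> in = new ArrayList<>();
        in.add(new Elem("a", 3, "[0,60)", null));
        in.add(new Elem("a", 5, "[60,120)", null));
        return in;
    }

    public static void main(String[] args) {
        System.out.println(run(sampleCombined(), true));
        // {a@GlobalWindow=[[0,60), [60,120)]}
        System.out.println(run(sampleCombined(), false));
        // {a@GlobalWindow=[GlobalWindow, GlobalWindow]}
    }
}
```

In both runs the two results for key `a` land in one state cell, which is the per-key behavior Reuven describes. Only when the reify step runs first do they still carry their fixed windows, which suggests placing (G) between (C) and (E).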
>>>>
>>>> On Sun, Aug 23, 2020 at 9:45 AM Dongwon Kim <eastcirc...@gmail.com> wrote:
>>>>> Hi Kenneth,
>>>>>
>>>>> Following your suggestion, I modified my pipeline as follows:
>>>>>
>>>>> p.apply(WithKeys.of(...).withKeyType(...))                          // (A)
>>>>>     .apply(Window.into(FixedWindows.of(...)))                       // (B)
>>>>>     .apply(Combine.perKey(new MyCombinFn()))                        // (C)
>>>>>     .apply(
>>>>>         Window
>>>>>             .into(new GlobalWindows())                              // (E1)
>>>>>             .triggering(
>>>>>                 Repeatedly.forever(AfterPane.elementCountAtLeast(1))) // (E2)
>>>>>             .accumulatingFiredPanes())                              // (E3)
>>>>>     .apply(GroupByKey.create())                                     // (F)
>>>>>     .apply(ParDo.of(new MyDoFn()))                                  // (D)
>>>>>
>>>>> I had to include (E1), (E2), (E3), and (F) so that MyDoFn (D) can
>>>>> iterate over the list of output records from (C) that share the same key.
>>>>> This way I can achieve the same effect without per-key state at (D).
>>>>>
>>>>> Do I understand your intention correctly?
>>>>> If not, please give me some hints.
>>>>>
>>>>> Thanks,
>>>>>
>>>>> Dongwon
>>>>>
>>>>> On Sun, Aug 23, 2020 at 5:10 AM Kenneth Knowles <k...@apache.org> wrote:
>>>>>> Hi Dongwon,
>>>>>>
>>>>>> On Sat, Aug 22, 2020 at 2:46 PM Dongwon Kim <eastcirc...@gmail.com> wrote:
>>>>>>> Hi all,
>>>>>>>
>>>>>>> I'm using Beam 2.23.0 with FlinkRunner, and part of my unbounded
>>>>>>> pipeline looks like this:
>>>>>>>
>>>>>>> p.apply(WithKeys.of(...).withKeyType(...))     // (A)
>>>>>>>     .apply(Window.into(FixedWindows.of(...)))  // (B)
>>>>>>>     .apply(Combine.perKey(new MyCombinFn()))   // (C)
>>>>>>>     .apply(ParDo.of(new MyDoFn()))             // (D)
>>>>>>>
>>>>>>> What I want to do is
>>>>>>> (1) group data by key (A) and window (B),
>>>>>>> (2) do some aggregation (C), and
>>>>>>> (3) perform the final computation on each group (D).
>>>>>>>
>>>>>>> I've noticed that a ValueState for a particular key is null whenever
>>>>>>> a new window for that key arrives, which suggests that Beam supports
>>>>>>> only per-key+window state, not per-key state, after windowing.
>>>>>>>
>>>>>>> I usually work with the Flink DataStream API, and Flink supports both
>>>>>>> per-key state and per-key+window state [1].
>>>>>>>
>>>>>>> Does Beam support per-key state, rather than per-key+window state,
>>>>>>> after windowing (D)? If I'm missing something, please correct me.
>>>>>>
>>>>>> You understand correctly - Beam does not include per-key state that
>>>>>> crosses window boundaries. If I understand your goal correctly, you can
>>>>>> achieve the same effect by copying the window metadata into the element
>>>>>> and then re-windowing into the global window before (D).
>>>>>>
>>>>>> Kenn
>>>>>>
>>>>>>> [1]
>>>>>>> https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/operators/windows.html#using-per-window-state-in-processwindowfunction
>>>>>>>
>>>>>>> Best,
>>>>>>>
>>>>>>> Dongwon
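Dongwon's observation and Kenn's answer both come down to which state cell an element reaches. The plain-Java toy model below sketches a ValueState-style running total; it is not Beam code, and `StateScopeModel`, `Result`, the window labels, and the values are hypothetical. It assumes, as Kenn confirms, that with fixed windows the state cell is (key, window), so the first read in each new window returns null, while after re-windowing into the global window all results for a key share one cell.

```java
// Toy model, NOT the Beam API: a ValueState<Long>-like running total whose
// state cell is chosen by a scope rule, mimicking how Beam scopes state.
final class StateScopeModel {
    // One combined result emitted by (C): key, fixed window label, aggregated value.
    static final class Result {
        final String key;
        final String window;
        final long value;

        Result(String key, String window, long value) {
            this.key = key;
            this.window = window;
            this.value = value;
        }
    }

    // Feeds results through a "stateful DoFn" that keeps a running total per
    // state cell. perKeyWindow = true models state scoped to (key, window), as
    // after (B); false models state after rewindowing into the global window,
    // where every result for a key shares one cell.
    static long[] runningTotals(Result[] results, boolean perKeyWindow) {
        java.util.Map<String, Long> state = new java.util.HashMap<>();
        long[] out = new long[results.length];
        for (int i = 0; i < results.length; i++) {
            Result r = results[i];
            String cell = perKeyWindow ? r.key + "@" + r.window : r.key;
            Long previous = state.get(cell); // null on the first element of a cell
            long total = (previous == null ? 0L : previous) + r.value;
            state.put(cell, total);
            out[i] = total;
        }
        return out;
    }

    public static void main(String[] args) {
        Result[] results = {
            new Result("a", "[0,60)", 3),
            new Result("a", "[60,120)", 5),
            new Result("a", "[120,180)", 2),
        };
        System.out.println(java.util.Arrays.toString(runningTotals(results, true)));  // [3, 5, 2]
        System.out.println(java.util.Arrays.toString(runningTotals(results, false))); // [3, 8, 10]
    }
}
```

In the first run the total restarts in every window, matching the null ValueState Dongwon describes; in the second it carries across windows, which is the per-key behavior that re-windowing into the global window provides.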