You could simply re-window into GlobalWindows after (C) and add a stateful DoFn afterwards. There is no need for the triggering or the GroupByKey.
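A toy model may make the difference concrete. The plain-Java sketch below is not Beam code and not how FlinkRunner actually stores state. It only assumes, as described in this thread, that user state is scoped by key and window. Under fixed windows each new window starts with an empty cell, so a read returns null. After re-windowing into the single global window, every element for a key shares one cell. The class `StateScopeModel`, its methods, and the `windowStart` field (standing in for copying the window metadata into the element) are all hypothetical.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/**
 * Toy, single-threaded model (NOT Beam code) of user state that is scoped by
 * window and key. All names are hypothetical.
 */
public class StateScopeModel {

    /** A combined result from step (C); windowStart is copied from its fixed window. */
    public static final class Combined {
        final String key;
        final long windowStart;
        final long aggregate;

        public Combined(String key, long windowStart, long aggregate) {
            this.key = key;
            this.windowStart = windowStart;
            this.aggregate = aggregate;
        }
    }

    // window -> (key -> stored value): one independent state cell per (window, key)
    private final Map<String, Map<String, Long>> valueState = new HashMap<>();

    /** Returns the cell's previous value (null if empty), then stores the new one. */
    private Long readThenWrite(String window, String key, long value) {
        Map<String, Long> cells = valueState.computeIfAbsent(window, w -> new HashMap<>());
        Long previous = cells.get(key);
        cells.put(key, value);
        return previous;
    }

    /** Feeds the input through the model and records what each read returned. */
    public static List<Long> run(List<Combined> input, boolean rewindowIntoGlobal) {
        StateScopeModel model = new StateScopeModel();
        List<Long> reads = new ArrayList<>();
        for (Combined c : input) {
            // Fixed windows: each window start is its own namespace.
            // Global window: one shared namespace, so state survives across fixed windows.
            String window = rewindowIntoGlobal ? "GlobalWindow" : "FixedWindow@" + c.windowStart;
            reads.add(model.readThenWrite(window, c.key, c.aggregate));
        }
        return reads;
    }

    public static void main(String[] args) {
        List<Combined> input = List.of(
                new Combined("k", 0, 10),
                new Combined("k", 60, 25),
                new Combined("k", 120, 7));
        System.out.println(run(input, false)); // [null, null, null]
        System.out.println(run(input, true));  // [null, 10, 25]
    }
}
```

In the actual pipeline the corresponding steps would be a `Window.into(new GlobalWindows())` transform after (C), followed by a `DoFn` that declares a `ValueState` with `@StateId`. Because the global window does not expire in an unbounded pipeline, the runner effectively never garbage-collects that state on its own, so a real DoFn would typically clear it once it is no longer needed.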
On Sun, Aug 23, 2020 at 9:45 AM Dongwon Kim <eastcirc...@gmail.com> wrote:
> Hi Kenneth,
>
> Following your suggestion, I modified my pipeline as follows:
>
> p.apply(WithKeys.of(...).withKeyType(...))                                   // (A)
>  .apply(Window.into(FixedWindows.of(...)))                                    // (B)
>  .apply(Combine.perKey(new MyCombinFn()))                                     // (C)
>  .apply(
>      Window
>          .into(new GlobalWindows())                                           // (E1)
>          .triggering(Repeatedly.forever(AfterPane.elementCountAtLeast(1)))    // (E2)
>          .accumulatingFiredPanes()                                            // (E3)
>  )
>  .apply(GroupByKey.create())                                                  // (F)
>  .apply(ParDo.of(new MyDoFn()));                                              // (D)
>
> I had to include (E1), (E2), (E3), and (F) so that MyDoFn (D) can iterate
> over the list of output records from (C) that share the same key.
> This way I can achieve the same effect without per-key state at (D).
>
> Do I understand your intention correctly?
> If not, please give me some hints.
>
> Thanks,
>
> Dongwon
>
> On Sun, Aug 23, 2020 at 5:10 AM Kenneth Knowles <k...@apache.org> wrote:
>
>> Hi Dongwon,
>>
>> On Sat, Aug 22, 2020 at 2:46 PM Dongwon Kim <eastcirc...@gmail.com> wrote:
>>
>>> Hi all,
>>>
>>> I'm using Beam 2.23.0 with FlinkRunner, and part of my unbounded
>>> pipeline looks like this:
>>>
>>> p.apply(WithKeys.of(...).withKeyType(...))   // (A)
>>>  .apply(Window.into(FixedWindows.of(...)))   // (B)
>>>  .apply(Combine.perKey(new MyCombinFn()))    // (C)
>>>  .apply(ParDo.of(new MyDoFn()));             // (D)
>>>
>>> What I want to do is:
>>> (1) group data by key (A) and window (B),
>>> (2) do some aggregation (C), and
>>> (3) perform the final computation on each group (D).
>>>
>>> I've noticed that a ValueState for a particular key is NULL whenever a
>>> new window for that key arrives, which gives me the impression that,
>>> after windowing, Beam supports only per-key+window state, not per-key
>>> state.
>>>
>>> I usually work with the Flink DataStream API, and Flink supports both
>>> per-key state and per-key+window state [1].
>>>
>>> Does Beam support per-key state, as opposed to per-key+window state,
>>> after windowing (D)? If I am missing something, please correct me.
>>
>> You understand correctly: Beam does not include per-key state that
>> crosses window boundaries. If I understand your goal correctly, you can
>> achieve the same effect by copying the window metadata into the element and
>> then re-windowing into the global window before (D).
>>
>> Kenn
>>
>>> [1]
>>> https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/operators/windows.html#using-per-window-state-in-processwindowfunction
>>>
>>> Best,
>>>
>>> Dongwon
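One reason the GroupByKey variant is heavier than a stateful DoFn: with `accumulatingFiredPanes()` each pane for a key contains every record seen so far in the global window, and `AfterPane.elementCountAtLeast(1)` allows a firing after each new record. The toy cost model below (not Beam code; the class and method names are hypothetical) counts how many records (D) would iterate over for n combined results per key in the worst case, where every arrival gets its own firing. A streaming runner may batch several arrivals into one firing, so the real count can be lower; the quadratic figure is an upper bound.

```java
/**
 * Toy cost model (NOT Beam code): how many records step (D) iterates over when
 * n combined results arrive for one key. Names are hypothetical.
 */
public class PaneCostModel {

    /**
     * Worst case for GlobalWindows + Repeatedly.forever(elementCountAtLeast(1))
     * + accumulatingFiredPanes + GroupByKey: every arrival fires its own pane,
     * and pane k re-delivers all k records seen so far.
     */
    public static long accumulatingGbkWork(int n) {
        long total = 0;
        for (int pane = 1; pane <= n; pane++) {
            total += pane;
        }
        return total; // equals n * (n + 1) / 2
    }

    /** Stateful DoFn in the global window: each record is processed once. */
    public static long statefulDoFnWork(int n) {
        return n;
    }

    public static void main(String[] args) {
        System.out.println(accumulatingGbkWork(1000)); // 500500
        System.out.println(statefulDoFnWork(1000));    // 1000
    }
}
```

Switching to discarding panes would keep each pane small, but then (D) would no longer see the earlier records for the key, which is exactly the gap that per-key state in the global window fills.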