On Sun, Aug 23, 2020 at 1:12 PM Dongwon Kim <eastcirc...@gmail.com> wrote:
> Hi Reuven,
>
> You and Kenneth are right; I thought GlobalWindows in unbounded streams
> needed triggers.
>
>   p.apply(WithKeys.of(...).withKeyType(...))    // (A)
>    .apply(Window.into(FixedWindows.of(...)))    // (B)
>    .apply(Combine.perKey(new MyCombinFn()))      // (C)
>    .apply(Window.into(new GlobalWindows()))      // (E)
>    .apply(ParDo.of(new MyDoFn()))                // (D)
>
> So just adding (E) blurs windows and makes the state defined in MyDoFn (D)
> a per-key state.
> Hope I understand you and Kenneth correctly this time.

That is correct. However, I think you may want:

  p.apply(WithKeys.of(...).withKeyType(...))    // (A)
   .apply(Window.into(FixedWindows.of(...)))    // (B)
   .apply(Combine.perKey(new MyCombinFn()))      // (C)
   .apply(Reify.windowsInValue())                // (G)
   .apply(Window.into(new GlobalWindows()))      // (E)
   .apply(ParDo.of(new MyDoFn()))                // (D)

Reify: https://beam.apache.org/releases/javadoc/2.23.0/index.html?org/apache/beam/sdk/transforms/Reify.html

This will make the window information from (B) & (C) available to MyDoFn in (D).
(G) has to come before (E): once the elements have been re-windowed into the
global window, Reify.windowsInValue() would only capture the global window.

Kenn

>
> Best,
>
> Dongwon
>
> On Mon, Aug 24, 2020 at 1:51 AM Reuven Lax <re...@google.com> wrote:
>
>> You could simply window into GlobalWindows and add a stateful DoFn
>> afterwards. No need for the triggering and GroupByKey.
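Reuven's suggestion (re-window into GlobalWindows, then use a stateful DoFn), combined with the Reify.windowsInValue() step from Kenn's reply, can be sketched as a small runnable Beam Java program. This is a minimal sketch under stated assumptions, not code from the thread: it assumes the Beam Java SDK with the DirectRunner on the classpath, and the class names (PerKeyStateAcrossWindows, RunningTotalFn), the sample data, and Sum.longsPerKey() as a stand-in for MyCombinFn are all hypothetical. The window is reified before re-windowing, so the stateful DoFn gets one state cell per key while each element still carries its original fixed window.

```java
import java.util.Arrays;
import java.util.List;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.coders.KvCoder;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.coders.VarLongCoder;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.state.StateSpec;
import org.apache.beam.sdk.state.StateSpecs;
import org.apache.beam.sdk.state.ValueState;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.Reify;
import org.apache.beam.sdk.transforms.Sum;
import org.apache.beam.sdk.transforms.windowing.FixedWindows;
import org.apache.beam.sdk.transforms.windowing.GlobalWindows;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.TimestampedValue;
import org.apache.beam.sdk.values.ValueInSingleWindow;
import org.joda.time.Duration;
import org.joda.time.Instant;

public class PerKeyStateAcrossWindows {

  /** Hypothetical stand-in for MyDoFn (D): a running total per key across fixed windows. */
  static class RunningTotalFn
      extends DoFn<KV<String, ValueInSingleWindow<Long>>, KV<String, Long>> {

    @StateId("total")
    private final StateSpec<ValueState<Long>> totalSpec = StateSpecs.value(VarLongCoder.of());

    @ProcessElement
    public void processElement(
        @Element KV<String, ValueInSingleWindow<Long>> element,
        @StateId("total") ValueState<Long> total,
        OutputReceiver<KV<String, Long>> out) {
      // element.getValue().getWindow() still returns the FixedWindows window from (B).
      Long previous = total.read(); // null only for the first element of this key
      long updated = (previous == null ? 0L : previous) + element.getValue().getValue();
      total.write(updated);
      out.output(KV.of(element.getKey(), updated));
    }
  }

  static PCollection<KV<String, Long>> runningTotals(Pipeline p) {
    // Hypothetical sample data: key "a" spans two one-minute windows, each summing to 2.
    List<TimestampedValue<KV<String, Long>>> events =
        Arrays.asList(
            TimestampedValue.of(KV.of("a", 1L), new Instant(0L)),
            TimestampedValue.of(KV.of("a", 1L), new Instant(10_000L)),
            TimestampedValue.of(KV.of("a", 2L), new Instant(60_000L)),
            TimestampedValue.of(KV.of("b", 5L), new Instant(0L)));
    return p.apply(
            Create.timestamped(events)
                .withCoder(KvCoder.of(StringUtf8Coder.of(), VarLongCoder.of())))
        .apply(Window.<KV<String, Long>>into(FixedWindows.of(Duration.standardMinutes(1)))) // (B)
        .apply(Sum.<String>longsPerKey()) // stand-in for (C)
        .apply(Reify.<String, Long>windowsInValue()) // (G): copy the window into the value first
        .apply(Window.<KV<String, ValueInSingleWindow<Long>>>into(new GlobalWindows())) // (E)
        .apply(ParDo.of(new RunningTotalFn())); // (D)
  }

  public static void main(String[] args) {
    Pipeline p = Pipeline.create(PipelineOptionsFactory.fromArgs(args).create());
    runningTotals(p);
    p.run().waitUntilFinish();
  }
}
```

With this data, key "a" emits 2 and then 4 and key "b" emits 5, because the state cell is shared across the original windows. The global window never expires in an unbounded pipeline, so Beam will not garbage-collect this state on its own; a real MyDoFn may need to clear it explicitly.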
>>
>> On Sun, Aug 23, 2020 at 9:45 AM Dongwon Kim <eastcirc...@gmail.com>
>> wrote:
>>
>>> Hi Kenneth,
>>>
>>> Following your suggestion, I modified my pipeline as follows:
>>>
>>>   p.apply(WithKeys.of(...).withKeyType(...))                          // (A)
>>>    .apply(Window.into(FixedWindows.of(...)))                          // (B)
>>>    .apply(Combine.perKey(new MyCombinFn()))                            // (C)
>>>    .apply(
>>>        Window
>>>            .into(new GlobalWindows())                                  // (E1)
>>>            .triggering(
>>>                Repeatedly.forever(AfterPane.elementCountAtLeast(1)))  // (E2)
>>>            .accumulatingFiredPanes())                                  // (E3)
>>>    .apply(GroupByKey.create())                                         // (F)
>>>    .apply(ParDo.of(new MyDoFn()))                                      // (D)
>>>
>>> I had to include (E1), (E2), (E3), and (F) so that MyDoFn (D) can
>>> iterate over a list of output records from (C) sharing the same key.
>>> This way I can achieve the same effect without having a per-key state at
>>> (D).
>>>
>>> Do I understand your intention correctly?
>>> If not, please give me some hints.
>>>
>>> Thanks,
>>>
>>> Dongwon
>>>
>>> On Sun, Aug 23, 2020 at 5:10 AM Kenneth Knowles <k...@apache.org> wrote:
>>>
>>>> Hi Dongwon,
>>>>
>>>> On Sat, Aug 22, 2020 at 2:46 PM Dongwon Kim <eastcirc...@gmail.com>
>>>> wrote:
>>>>
>>>>> Hi all,
>>>>>
>>>>> I'm using Beam 2.23.0 with FlinkRunner, and part of my unbounded
>>>>> pipeline looks like this:
>>>>>
>>>>>   p.apply(WithKeys.of(...).withKeyType(...))    // (A)
>>>>>    .apply(Window.into(FixedWindows.of(...)))    // (B)
>>>>>    .apply(Combine.perKey(new MyCombinFn()))      // (C)
>>>>>    .apply(ParDo.of(new MyDoFn()))                // (D)
>>>>>
>>>>> What I want to do is
>>>>> (1) to group data by key (A) and window (B),
>>>>> (2) to do some aggregation (C), and
>>>>> (3) to perform the final computation on each group (D).
>>>>>
>>>>> I've noticed that a ValueState for a particular key is null whenever a
>>>>> new window for the key arrives, which gives me the feeling that Beam
>>>>> supports only per-key+window state, not per-key state, after windowing.
>>>>>
>>>>> I usually work with the Flink DataStream API, and Flink supports both
>>>>> per-key state and per-key+window state [1].
>>>>>
>>>>> Does Beam support per-key state, rather than per-key+window state, after
>>>>> windowing (D)? If I am missing something, please correct me.
>>>>>
>>>>
>>>> You understand correctly: Beam does not include per-key state that
>>>> crosses window boundaries. If I understand your goal correctly, you can
>>>> achieve the same effect by copying the window metadata into the element and
>>>> then re-windowing into the global window before (D).
>>>>
>>>> Kenn
>>>>
>>>>
>>>>>
>>>>> [1]
>>>>> https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/operators/windows.html#using-per-window-state-in-processwindowfunction
>>>>>
>>>>> Best,
>>>>>
>>>>> Dongwon
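The behavior described in the original question (a ValueState that reads null again whenever a key enters a new window) can be reproduced with a minimal sketch. It rests on the same kind of assumptions: the Beam Java SDK with the DirectRunner, hypothetical class names (PerKeyAndWindowState, RunningTotalFn), made-up sample data, and Sum.longsPerKey() standing in for MyCombinFn. The only structural difference from the thread's pipeline is that nothing re-windows the data, so the stateful DoFn runs while elements are still in their fixed windows.

```java
import java.util.Arrays;
import java.util.List;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.coders.KvCoder;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.coders.VarLongCoder;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.state.StateSpec;
import org.apache.beam.sdk.state.StateSpecs;
import org.apache.beam.sdk.state.ValueState;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.Sum;
import org.apache.beam.sdk.transforms.windowing.FixedWindows;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.TimestampedValue;
import org.joda.time.Duration;
import org.joda.time.Instant;

public class PerKeyAndWindowState {

  /** Hypothetical stand-in for MyDoFn (D), applied while elements are still in fixed windows. */
  static class RunningTotalFn extends DoFn<KV<String, Long>, KV<String, Long>> {

    @StateId("total")
    private final StateSpec<ValueState<Long>> totalSpec = StateSpecs.value(VarLongCoder.of());

    @ProcessElement
    public void processElement(
        @Element KV<String, Long> element,
        @StateId("total") ValueState<Long> total,
        OutputReceiver<KV<String, Long>> out) {
      // State is scoped to (key, window), so this reads null at the start of every window.
      Long previous = total.read();
      long updated = (previous == null ? 0L : previous) + element.getValue();
      total.write(updated);
      out.output(KV.of(element.getKey(), updated));
    }
  }

  static PCollection<KV<String, Long>> runningTotals(Pipeline p) {
    // Hypothetical sample data: key "a" spans two one-minute windows, each summing to 2.
    List<TimestampedValue<KV<String, Long>>> events =
        Arrays.asList(
            TimestampedValue.of(KV.of("a", 1L), new Instant(0L)),
            TimestampedValue.of(KV.of("a", 1L), new Instant(10_000L)),
            TimestampedValue.of(KV.of("a", 2L), new Instant(60_000L)),
            TimestampedValue.of(KV.of("b", 5L), new Instant(0L)));
    return p.apply(
            Create.timestamped(events)
                .withCoder(KvCoder.of(StringUtf8Coder.of(), VarLongCoder.of())))
        .apply(Window.<KV<String, Long>>into(FixedWindows.of(Duration.standardMinutes(1)))) // (B)
        .apply(Sum.<String>longsPerKey()) // stand-in for (C)
        .apply(ParDo.of(new RunningTotalFn())); // (D), no re-windowing beforehand
  }

  public static void main(String[] args) {
    Pipeline p = Pipeline.create(PipelineOptionsFactory.fromArgs(args).create());
    runningTotals(p);
    p.run().waitUntilFinish();
  }
}
```

With this data, key "a" emits 2 in each of its two windows rather than a cumulative 4, and key "b" emits 5. This matches Kenn's first reply: the state cell is keyed by both key and window.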