You could simply re-window into GlobalWindows and apply a stateful DoFn
afterwards. There is no need for the trigger and the GroupByKey.
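
A minimal, stdlib-only Java sketch of why this works. It assumes, matching Kenn's description in the quoted thread, that a stateful DoFn's state is scoped to a (key, window) pair. `StateScopingModel` and `readAndAdd` are hypothetical names, and this is a toy model, not Beam's implementation. In Beam's Java SDK the real step would be `Window.into(new GlobalWindows())` followed by `ParDo.of(...)` of a DoFn that declares a `ValueState` via `@StateId` and `StateSpecs.value(...)`.

```java
import java.util.HashMap;
import java.util.Map;

// Toy model (not Beam code): state cells are keyed by (key, window), which is
// the assumed scoping of a stateful DoFn's ValueState.
class StateScopingModel {
  static final String GLOBAL_WINDOW = "GlobalWindow";

  // One cell per (key, window) pair, playing the role of a ValueState<Long>.
  private final Map<String, Long> cells = new HashMap<>();

  // Returns the cell's previous value (null if never written, i.e. the
  // "ValueState is null" case) and stores previous + increment.
  Long readAndAdd(String key, String window, long increment) {
    String namespace = key + "@" + window;
    Long previous = cells.get(namespace);
    cells.put(namespace, (previous == null ? 0L : previous) + increment);
    return previous;
  }

  public static void main(String[] args) {
    StateScopingModel fixed = new StateScopingModel();
    // Same key, two different fixed windows: each window gets a fresh cell.
    System.out.println(fixed.readAndAdd("k", "[0,60)", 5));   // null
    System.out.println(fixed.readAndAdd("k", "[60,120)", 7)); // null

    StateScopingModel global = new StateScopingModel();
    // After re-windowing into the global window, both elements share one cell.
    System.out.println(global.readAndAdd("k", GLOBAL_WINDOW, 5)); // null
    System.out.println(global.readAndAdd("k", GLOBAL_WINDOW, 7)); // 5
  }
}
```

With fixed windows, every new window for key `k` starts from an empty cell; in the single global window the cell persists across what used to be window boundaries. Since the global window never expires, a real DoFn may need to clear such state itself (for example from a timer).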

On Sun, Aug 23, 2020 at 9:45 AM Dongwon Kim <eastcirc...@gmail.com> wrote:

> Hi Kenneth,
>
> Following your suggestion, I modified my pipeline as follows:
>
> p.apply(WithKeys.of(...).withKeyType(...))                      // (A)
>   .apply(Window.into(FixedWindows.of(...)))                     // (B)
>   .apply(Combine.perKey(new MyCombinFn()))                      // (C)
>   .apply(
>     Window
>       .into(new GlobalWindows())                                // (E1)
>       .triggering(
>         Repeatedly.forever(AfterPane.elementCountAtLeast(1)))   // (E2)
>       .accumulatingFiredPanes()                                 // (E3)
>   )
>   .apply(GroupByKey.create())                                   // (F)
>   .apply(ParDo.of(new MyDoFn()));                               // (D)
>
>
> I had to include (E1), (E2), (E3), and (F) so that MyDoFn (D) can iterate
> over a list of output records from (C) sharing the same key.
> This way I can achieve the same effect without having a per-key state at
> (D).
>
> Do I understand your intention correctly?
> If not, could you give me some hints?
>
> Thanks,
>
> Dongwon
>
>
> On Sun, Aug 23, 2020 at 5:10 AM Kenneth Knowles <k...@apache.org> wrote:
>
>> Hi Dongwon,
>>
>> On Sat, Aug 22, 2020 at 2:46 PM Dongwon Kim <eastcirc...@gmail.com>
>> wrote:
>>
>>> Hi all,
>>>
>>> I'm using Beam 2.23.0 with FlinkRunner, and part of my unbounded
>>> pipeline looks like this:
>>>
>>> p.apply(WithKeys.of(...).withKeyType(...))   // (A)
>>>   .apply(Window.into(FixedWindows.of(...)))  // (B)
>>>   .apply(Combine.perKey(new MyCombinFn()))   // (C)
>>>   .apply(ParDo.of(new MyDoFn()));            // (D)
>>>
>>>
>>> What I want to do is
>>> (1) group data by key (A) and window (B),
>>> (2) do some aggregation (C), and
>>> (3) perform the final computation on each group (D).
>>>
>>> I've noticed that the ValueState for a particular key is null whenever a
>>> new window for that key arrives, which suggests that, after windowing,
>>> Beam supports only per-key+window state, not per-key state.
>>>
>>> I usually work with the Flink DataStream API, which supports both
>>> per-key state and per-key+window state [1].
>>>
>>> Does Beam support per-key state, rather than per-key+window state, after
>>> windowing (D)? If I'm missing something, please correct me.
>>>
>>
>> You understand correctly - Beam does not include per-key state that
>> crosses window boundaries. If I understand your goal correctly, you can
>> achieve the same effect by copying the window metadata into the element and
>> then re-windowing into the global window before (D).
>>
>> Kenn
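
A stdlib-only Java sketch of the "copy the window metadata into the element" idea, written as a toy model rather than Beam code. The element type `WindowedResult`, the class `PerKeyChangeTracker`, and the policy of emitting the change from the key's previous window are all hypothetical, chosen only for illustration. In a real pipeline the window end could be read before re-windowing from a DoFn's `BoundedWindow` parameter (for example `maxTimestamp()`). Because Beam does not guarantee the order in which elements reach a DoFn, the sketch uses the carried window end to drop results for windows that are not newer than the latest one seen for that key.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.OptionalLong;

// Toy model (not Beam code) of a per-key stateful step running in the global
// window after the fixed-window end has been copied into each element.
class PerKeyChangeTracker {

  // Hypothetical element: the per-window aggregate from (C) plus the end of
  // the fixed window it was computed in.
  static final class WindowedResult {
    final String key;
    final long windowEndMillis;
    final long aggregate;

    WindowedResult(String key, long windowEndMillis, long aggregate) {
      this.key = key;
      this.windowEndMillis = windowEndMillis;
      this.aggregate = aggregate;
    }
  }

  // Per-key "state": latest window end seen and that window's aggregate.
  private final Map<String, Long> latestWindowEnd = new HashMap<>();
  private final Map<String, Long> latestAggregate = new HashMap<>();

  // Returns the change from the key's previous window. Returns empty for the
  // key's first window, and also for an element whose window is not newer
  // than the latest one seen (late or duplicate); such elements are ignored.
  OptionalLong process(WindowedResult r) {
    Long lastEnd = latestWindowEnd.get(r.key);
    if (lastEnd != null && r.windowEndMillis <= lastEnd) {
      return OptionalLong.empty();
    }
    Long lastAggregate = latestAggregate.get(r.key);
    latestWindowEnd.put(r.key, r.windowEndMillis);
    latestAggregate.put(r.key, r.aggregate);
    return lastAggregate == null
        ? OptionalLong.empty()
        : OptionalLong.of(r.aggregate - lastAggregate);
  }

  public static void main(String[] args) {
    PerKeyChangeTracker tracker = new PerKeyChangeTracker();
    // First window for "k": nothing to compare against.
    System.out.println(tracker.process(new WindowedResult("k", 60_000, 10)));
    // Next window for "k": change of +5.
    System.out.println(tracker.process(new WindowedResult("k", 120_000, 15)));
    // A result for an older window arriving late: ignored.
    System.out.println(tracker.process(new WindowedResult("k", 60_000, 99)));
  }
}
```

The late element leaves the state untouched, so the next window for `k` is still compared against the aggregate 15.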
>>
>>
>>>
>>> [1]
>>> https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/operators/windows.html#using-per-window-state-in-processwindowfunction
>>>
>>> Best,
>>>
>>> Dongwon
>>>
>>>
