On Sun, Aug 23, 2020 at 1:12 PM Dongwon Kim <eastcirc...@gmail.com> wrote:

> Hi Reuven,
>
> You and Kenneth are right; I had assumed that GlobalWindows on an unbounded
> stream requires a trigger.
>
> p.apply(WithKeys.of(...).withKeyType(...))    // (A)
>   .apply(Window.into(FixedWindows.of(...)))   // (B)
>   .apply(Combine.perKey(new MyCombinFn()))    // (C)
>   .apply(Window.into(new GlobalWindows()))    // (E)
>   .apply(ParDo.of(new MyDoFn()))              // (D)
>
>
> So just adding (E) puts every element into the single global window, which
> makes the state defined in MyDoFn (D) effectively a per-key state.
> I hope I have understood you and Kenneth correctly this time.
>

That is correct. However, I think you may want:

p.apply(WithKeys.of(...).withKeyType(...))    // (A)
  .apply(Window.into(FixedWindows.of(...)))   // (B)
  .apply(Combine.perKey(new MyCombinFn()))    // (C)
  .apply(Reify.windowsInValue())              // (G)
  .apply(Window.into(new GlobalWindows()))    // (E)
  .apply(ParDo.of(new MyDoFn()))              // (D)

Reify:
<https://beam.apache.org/releases/javadoc/2.23.0/index.html?org/apache/beam/sdk/transforms/Reify.html>

(G) has to come before (E): once the elements have been re-windowed into the
global window, the fixed-window information from (B) is no longer attached to
them. This will make the window information from (B) & (C) available to
MyDoFn in (D).
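
To make the effect concrete, here is a minimal plain-Java model of what (D) can do once each combined value carries its original window and the state is scoped by key only. It does not use the Beam API: the `Windowed` class, its fields, and the `deltas` method are hypothetical stand-ins, and the sketch assumes the combined values of a key arrive in window order, which a real pipeline does not guarantee.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

final class ReifiedWindowSketch {
    /** Hypothetical stand-in for a combined value that carries the start of its fixed window. */
    static final class Windowed {
        final String key;
        final long windowStartMillis;
        final long sum;

        Windowed(String key, long windowStartMillis, long sum) {
            this.key = key;
            this.windowStartMillis = windowStartMillis;
            this.sum = sum;
        }
    }

    /**
     * Emits "key@windowStart: delta", where delta is the change from the
     * previous sum seen for the same key. One state cell per key models
     * state in the global window.
     */
    static List<String> deltas(List<Windowed> inputs) {
        Map<String, Long> previousSum = new HashMap<>();
        List<String> out = new ArrayList<>();
        for (Windowed w : inputs) {
            // put() returns the value stored for this key before the update, or null.
            Long prev = previousSum.put(w.key, w.sum);
            long delta = (prev == null) ? w.sum : w.sum - prev;
            out.add(w.key + "@" + w.windowStartMillis + ": " + delta);
        }
        return out;
    }
}
```

The point of the sketch is that the state lookup ignores the window, while the window start is still available as ordinary data in the element.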

Kenn


>
> Best,
>
> Dongwon
>
> On Mon, Aug 24, 2020 at 1:51 AM Reuven Lax <re...@google.com> wrote:
>
>> You could simply window into GlobalWindows and add a stateful DoFn
>> afterwards. No need for the triggering and GroupByKey.
>>
>> On Sun, Aug 23, 2020 at 9:45 AM Dongwon Kim <eastcirc...@gmail.com>
>> wrote:
>>
>>> Hi Kenneth,
>>>
>>> According to your suggestion, I modified my pipeline as follows:
>>>
>>> p.apply(WithKeys.of(...).withKeyType(...))                     // (A)
>>>   .apply(Window.into(FixedWindows.of(...)))                    // (B)
>>>   .apply(Combine.perKey(new MyCombinFn()))                     // (C)
>>>   .apply(
>>>     Window
>>>       .into(new GlobalWindows())                               // (E1)
>>>       .triggering(
>>>         Repeatedly.forever(AfterPane.elementCountAtLeast(1))   // (E2)
>>>       )
>>>       .accumulatingFiredPanes()                                // (E3)
>>>   )
>>>   .apply(GroupByKey.create())                                  // (F)
>>>   .apply(ParDo.of(new MyDoFn()))                               // (D)
>>>
>>>
>>> I had to include (E1), (E2), (E3), and (F) so that MyDoFn (D) can
>>> iterate over a list of output records from (C) sharing the same key.
>>> This way I can achieve the same effect without having a per-key state at
>>> (D).
>>>
>>> Do I understand your intention correctly?
>>> If not, please give me some hints.
>>>
>>> Thanks,
>>>
>>> Dongwon
>>>
>>>
>>> On Sun, Aug 23, 2020 at 5:10 AM Kenneth Knowles <k...@apache.org> wrote:
>>>
>>>> Hi Dongwon,
>>>>
>>>> On Sat, Aug 22, 2020 at 2:46 PM Dongwon Kim <eastcirc...@gmail.com>
>>>> wrote:
>>>>
>>>>> Hi all,
>>>>>
>>>>> I'm using Beam 2.23.0 with FlinkRunner and a part of my unbounded
>>>>> pipeline looks like below:
>>>>>
>>>>> p.apply(WithKeys.of(...).withKeyType(...))    // (A)
>>>>>   .apply(Window.into(FixedWindows.of(...)))   // (B)
>>>>>   .apply(Combine.perKey(new MyCombinFn()))    // (C)
>>>>>   .apply(ParDo.of(new MyDoFn()))              // (D)
>>>>>
>>>>>
>>>>> What I want to do is
>>>>> (1) group data by key (A) and window (B),
>>>>> (2) do some aggregation (C), and
>>>>> (3) perform the final computation on each group (D).
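
Steps (1) and (2) can be pictured with a small plain-Java model. It is only a sketch and does not use the Beam API: the `Event` class, the summing aggregation (standing in for MyCombinFn), and the assumption that fixed windows are aligned to epoch 0 are all illustrative choices.

```java
import java.util.Map;
import java.util.TreeMap;

final class FixedWindowSumSketch {
    /** One keyed input record; the field names are hypothetical. */
    static final class Event {
        final String key;
        final long timestampMillis;
        final long value;

        Event(String key, long timestampMillis, long value) {
            this.key = key;
            this.timestampMillis = timestampMillis;
            this.value = value;
        }
    }

    /** Start of the fixed window containing the timestamp (windows assumed aligned to epoch 0). */
    static long windowStart(long timestampMillis, long sizeMillis) {
        // floorMod keeps the result correct for negative timestamps as well.
        return timestampMillis - Math.floorMod(timestampMillis, sizeMillis);
    }

    /** Step (1): group by key and window start; step (2): sum the values of each group. */
    static Map<String, Map<Long, Long>> sumPerKeyAndWindow(Iterable<Event> events, long sizeMillis) {
        Map<String, Map<Long, Long>> sums = new TreeMap<>();
        for (Event e : events) {
            long start = windowStart(e.timestampMillis, sizeMillis);
            sums.computeIfAbsent(e.key, k -> new TreeMap<>()).merge(start, e.value, Long::sum);
        }
        return sums;
    }
}
```

Each (key, window start) pair ends up with its own aggregate, which is the grouping that step (3) then consumes.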
>>>>>
>>>>> I've noticed that a ValueState for a particular key is null whenever an
>>>>> element arrives in a new window for that key, which suggests that, after
>>>>> windowing, Beam supports only per-key+window state, not per-key state.
>>>>>
>>>>> I usually work with Flink DataStream API and Flink supports both
>>>>> per-key state and per-key+window state [1].
>>>>>
>>>>> Does Beam support per-key state, as opposed to per-key+window state,
>>>>> after windowing (D)? If I am missing something, please correct me.
>>>>>
>>>>
>>>> You understand correctly - Beam does not include per-key state that
>>>> crosses window boundaries. If I understand your goal correctly, you can
>>>> achieve the same effect by copying the window metadata into the element and
>>>> then re-windowing into the global window before (D).
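
The scoping rule behind this can be sketched in plain Java. This is a toy model rather than Beam's implementation: the `StateScopeSketch` class, the string scope key, and the `GLOBAL_WINDOW` constant are all hypothetical.

```java
import java.util.HashMap;
import java.util.Map;

final class StateScopeSketch {
    /** Hypothetical marker for "the one window every element shares". */
    static final long GLOBAL_WINDOW = Long.MIN_VALUE;

    // Each state cell is addressed by key AND window, never by key alone.
    private final Map<String, Long> cells = new HashMap<>();

    private static String scope(String key, long windowStartMillis) {
        return key + "@" + windowStartMillis;
    }

    /** Returns the stored value, or null if this (key, window) pair has no state yet. */
    Long read(String key, long windowStartMillis) {
        return cells.get(scope(key, windowStartMillis));
    }

    void write(String key, long windowStartMillis, long value) {
        cells.put(scope(key, windowStartMillis), value);
    }
}
```

In this model, a value written for key "a" in one fixed window is invisible when "a" shows up in the next window, which matches the null ValueState described above. If every element uses `GLOBAL_WINDOW`, the window part of the scope is constant and the state behaves as per-key state.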
>>>>
>>>> Kenn
>>>>
>>>>
>>>>>
>>>>> [1]
>>>>> https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/operators/windows.html#using-per-window-state-in-processwindowfunction
>>>>>
>>>>> Best,
>>>>>
>>>>> Dongwon
>>>>>
>>>>>
