Reuven and Kenneth,

Thanks for the tip!

Now I can get window information without having to modify the type of my
aggregator :-)

Best,

Dongwon

On Mon, Aug 24, 2020 at 3:16 AM Reuven Lax <re...@google.com> wrote:

> Kenn - shouldn't the Reify happen before the rewindow?
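Reuven's point is about ordering. Reify.windowsInValue() copies the window an element is in at that moment, so if it runs after (E) it can only see the global window. Running (G) before (E), in the order (A), (B), (C), (G), (E), (D), keeps the fixed window from (B) inside each value. The sketch below is a toy, in-memory Java model of that ordering effect, not Beam code. ReifyOrderModel, Windowed and the helper methods are hypothetical names.

```java
// Toy model (not Beam's API): an element tagged with the window it is currently assigned to.
final class ReifyOrderModel {
  // Hypothetical stand-in for a keyed element plus its current window assignment.
  record Windowed(String key, String value, String window) {}

  // Mimics Window.into(FixedWindows.of(size)) with no offset: window is [start, start + size).
  static Windowed intoFixed(String key, String value, long ts, long size) {
    long start = ts - Math.floorMod(ts, size);
    return new Windowed(key, value, "[" + start + "," + (start + size) + ")");
  }

  // Mimics Window.into(new GlobalWindows()): the previous window assignment is dropped.
  static Windowed intoGlobal(Windowed w) {
    return new Windowed(w.key(), w.value(), "GLOBAL");
  }

  // Mimics Reify.windowsInValue(): copies the *current* window into the value.
  static Windowed reify(Windowed w) {
    return new Windowed(w.key(), w.value() + "@" + w.window(), w.window());
  }

  public static void main(String[] args) {
    Windowed fixed = intoFixed("k", "42", 125, 60);
    // Reify, then rewindow: the fixed window survives in the value.
    System.out.println(intoGlobal(reify(fixed)).value()); // 42@[120,180)
    // Rewindow, then reify: only the global window is left to copy.
    System.out.println(reify(intoGlobal(fixed)).value()); // 42@GLOBAL
  }
}
```

In the real pipeline, (G) turns each KV<K, V> into KV<K, ValueInSingleWindow<V>>, and MyDoFn can read the original window via getWindow().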
>
> On Sun, Aug 23, 2020 at 11:08 AM Kenneth Knowles <k...@apache.org> wrote:
>
>>
>>
>> On Sun, Aug 23, 2020 at 1:12 PM Dongwon Kim <eastcirc...@gmail.com>
>> wrote:
>>
>>> Hi Reuven,
>>>
>>> You and Kenneth are right; I had thought that GlobalWindows in unbounded
>>> streams needed triggers.
>>>
>>> p.apply(WithKeys.of(...).withKeyType(...))   // (A)
>>>   .apply(Window.into(FixedWindows.of(...)))  // (B)
>>>   .apply(Combine.perKey(new MyCombinFn()))   // (C)
>>>   .apply(Window.into(new GlobalWindows()))   // (E)
>>>   .apply(ParDo.of(new MyDoFn()))             // (D)
>>>
>>>
>>> So simply adding (E) puts all elements into the single global window and
>>> makes the state defined in MyDoFn (D) per-key state.
>>> I hope I've understood you and Kenneth correctly this time.
>>>
>>
>> That is correct. However, I think you may want:
>>
>> p.apply(WithKeys.of(...).withKeyType(...))   // (A)
>>   .apply(Window.into(FixedWindows.of(...)))  // (B)
>>   .apply(Combine.perKey(new MyCombinFn()))   // (C)
>>   .apply(Window.into(new GlobalWindows()))   // (E)
>>   .apply(Reify.windowsInValue())             // (G)
>>   .apply(ParDo.of(new MyDoFn()))             // (D)
>>
>> Reify.windowsInValue():
>> <https://beam.apache.org/releases/javadoc/2.23.0/index.html?org/apache/beam/sdk/transforms/Reify.html>
>>
>>
>> This will make the window information from (B) and (C) available to
>> MyDoFn in (D).
>>
>> Kenn
>>
>>
>>>
>>> Best,
>>>
>>> Dongwon
>>>
>>> On Mon, Aug 24, 2020 at 1:51 AM Reuven Lax <re...@google.com> wrote:
>>>
>>>> You could simply window into GlobalWindows and add a stateful DoFn
>>>> afterwards. No need for the triggering and GroupByKey.
>>>>
>>>> On Sun, Aug 23, 2020 at 9:45 AM Dongwon Kim <eastcirc...@gmail.com>
>>>> wrote:
>>>>
>>>>> Hi Kenneth,
>>>>>
>>>>> According to your suggestion, I modified my pipeline as follows:
>>>>>
>>>>> p.apply(WithKeys.of(...).withKeyType(...))                    // (A)
>>>>>   .apply(Window.into(FixedWindows.of(...)))                   // (B)
>>>>>   .apply(Combine.perKey(new MyCombinFn()))                    // (C)
>>>>>   .apply(
>>>>>     Window
>>>>>       .into(new GlobalWindows())                              // (E1)
>>>>>       .triggering(
>>>>>         Repeatedly.forever(AfterPane.elementCountAtLeast(1))) // (E2)
>>>>>       .accumulatingFiredPanes()                               // (E3)
>>>>>   )
>>>>>   .apply(GroupByKey.create())                                 // (F)
>>>>>   .apply(ParDo.of(new MyDoFn()))                              // (D)
>>>>>
>>>>>
>>>>> I had to include (E1), (E2), (E3), and (F) so that MyDoFn (D) can
>>>>> iterate over all the output records from (C) that share the same key.
>>>>> This way I can achieve the same effect without needing per-key state
>>>>> at (D).
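A rough way to see what (E1) to (E3) plus (F) produce: with an element-count trigger of 1 and accumulating panes in the global window, each firing for a key re-emits every value buffered so far for that key, and because the global window never closes on an unbounded stream, that buffer keeps growing. The toy model below is plain Java, not Beam, and AccumulatingPanesModel is a hypothetical name. It simplifies by firing exactly once per element, whereas a runner may fire less often. This growth is one reason the stateful DoFn in the global window that Reuven suggests can be the leaner option.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Toy model (not Beam's API) of GroupByKey in the global window with
// Repeatedly.forever(AfterPane.elementCountAtLeast(1)) and accumulatingFiredPanes().
final class AccumulatingPanesModel {
  // Buffered values per key; never cleared, because fired panes accumulate.
  private final Map<String, List<Long>> buffers = new HashMap<>();

  // Simplification: every arriving element fires one pane holding all values seen so far for its key.
  List<Long> onElement(String key, long value) {
    List<Long> buf = buffers.computeIfAbsent(key, k -> new ArrayList<>());
    buf.add(value);
    return List.copyOf(buf);
  }

  public static void main(String[] args) {
    AccumulatingPanesModel gbk = new AccumulatingPanesModel();
    System.out.println(gbk.onElement("k", 1)); // [1]
    System.out.println(gbk.onElement("k", 2)); // [1, 2]
    System.out.println(gbk.onElement("k", 3)); // [1, 2, 3]
  }
}
```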
>>>>>
>>>>> Have I understood your intention correctly?
>>>>> If not, could you give me some hints?
>>>>>
>>>>> Thanks,
>>>>>
>>>>> Dongwon
>>>>>
>>>>>
>>>>> On Sun, Aug 23, 2020 at 5:10 AM Kenneth Knowles <k...@apache.org>
>>>>> wrote:
>>>>>
>>>>>> Hi Dongwon,
>>>>>>
>>>>>> On Sat, Aug 22, 2020 at 2:46 PM Dongwon Kim <eastcirc...@gmail.com>
>>>>>> wrote:
>>>>>>
>>>>>>> Hi all,
>>>>>>>
>>>>>>> I'm using Beam 2.23.0 with FlinkRunner, and part of my unbounded
>>>>>>> pipeline looks like this:
>>>>>>>
>>>>>>> p.apply(WithKeys.of(...).withKeyType(...))   // (A)
>>>>>>>   .apply(Window.into(FixedWindows.of(...)))  // (B)
>>>>>>>   .apply(Combine.perKey(new MyCombinFn()))   // (C)
>>>>>>>   .apply(ParDo.of(new MyDoFn()))             // (D)
>>>>>>>
>>>>>>>
>>>>>>> What I want to do is:
>>>>>>> (1) group data by key (A) and window (B),
>>>>>>> (2) do some aggregation (C), and
>>>>>>> (3) perform the final computation on each group (D).
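Steps (1) and (2) can be pictured as assigning each element to the fixed window that contains its timestamp and then combining per (key, window). Below is a minimal plain-Java sketch, assuming a sum as a stand-in for MyCombinFn, whose actual logic the thread doesn't show. FixedWindowCombineModel and its methods are hypothetical, and window starts assume fixed windows with no offset.

```java
import java.util.Map;
import java.util.TreeMap;

// Toy model (not Beam's API): fixed-window assignment, then a per-(key, window) sum.
final class FixedWindowCombineModel {
  // Start of the fixed window of the given size (no offset) that contains timestamp ts.
  static long windowStart(long ts, long size) {
    return ts - Math.floorMod(ts, size);
  }

  // Groups by "key@windowStart" and sums values; the sum stands in for a real combine fn.
  static Map<String, Long> sumPerKeyAndWindow(
      String[] keys, long[] timestamps, long[] values, long size) {
    Map<String, Long> sums = new TreeMap<>(); // sorted, for deterministic output
    for (int i = 0; i < keys.length; i++) {
      String group = keys[i] + "@" + windowStart(timestamps[i], size);
      sums.merge(group, values[i], Long::sum);
    }
    return sums;
  }

  public static void main(String[] args) {
    String[] keys = {"a", "a", "a", "b"};
    long[] ts = {5, 30, 65, 10};
    long[] vals = {1, 2, 3, 4};
    System.out.println(sumPerKeyAndWindow(keys, ts, vals, 60)); // {a@0=3, a@60=3, b@0=4}
  }
}
```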
>>>>>>>
>>>>>>> I've noticed that a ValueState for a particular key is null whenever
>>>>>>> a new window for that key arrives, which suggests that, after windowing,
>>>>>>> Beam supports only per-key+window state, not per-key state.
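The null ValueState is what you would expect if state cells are keyed by (key, window): the first element of a new window looks up a cell that has never been written. Below is a toy model of that scoping in plain Java. StateScopeModel is a hypothetical name, and in Beam itself this state would be a ValueState declared in a stateful DoFn.

```java
import java.util.HashMap;
import java.util.Map;

// Toy model (not Beam's API): state cells scoped by (key, window), matching the behavior
// observed for a stateful DoFn that runs after Window.into(FixedWindows.of(...)).
final class StateScopeModel {
  private final Map<String, Long> cells = new HashMap<>();

  // Composite cell id: the same key in a different window maps to a different cell.
  private static String cellId(String key, String window) {
    return key + "|" + window;
  }

  // Returns the stored value, or null if this (key, window) cell was never written.
  Long read(String key, String window) {
    return cells.get(cellId(key, window));
  }

  void write(String key, String window, long value) {
    cells.put(cellId(key, window), value);
  }

  public static void main(String[] args) {
    StateScopeModel state = new StateScopeModel();
    state.write("k", "[0,60)", 7);
    System.out.println(state.read("k", "[0,60)"));   // 7
    System.out.println(state.read("k", "[60,120)")); // null: new window, fresh cell
  }
}
```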
>>>>>>>
>>>>>>> I usually work with the Flink DataStream API, and Flink supports both
>>>>>>> per-key state and per-key+window state [1].
>>>>>>>
>>>>>>> Does Beam support per-key state, rather than per-key+window state, after
>>>>>>> windowing (i.e., at (D))? If I'm missing something, please correct me.
>>>>>>>
>>>>>>
>>>>>> You understand correctly: Beam does not include per-key state that
>>>>>> crosses window boundaries. If I understand your goal correctly, you can
>>>>>> achieve the same effect by copying the window metadata into the element
>>>>>> and then re-windowing into the global window before (D).
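Here is a toy model of Kenn's suggestion. Once the window start has been copied into each element and everything sits in the single global window, state is scoped by key alone, so it survives from one fixed window's aggregate to the next. The computation (the change in a key's aggregate between consecutive windows) is a hypothetical example of per-key logic, not Dongwon's actual MyDoFn, and PerKeyStateModel and WindowedAggregate are illustrative names. The model also assumes aggregates for a key arrive in window order, which Beam does not guarantee in general, so a real DoFn might compare windowStart before trusting the stored value.

```java
import java.util.HashMap;
import java.util.Map;

// Toy model (not Beam's API): per-key state in the global window, fed by aggregates
// that carry their original fixed-window start as data.
final class PerKeyStateModel {
  // Hypothetical element shape: combined value plus the copied (reified) window start.
  record WindowedAggregate(String key, long windowStart, long sum) {}

  // Per-key state: the previous aggregate for each key, kept across window boundaries.
  private final Map<String, WindowedAggregate> previous = new HashMap<>();

  // Returns the change from the previous window's sum for this key (0 for the key's first window).
  // Assumes aggregates for a key arrive in window order.
  long process(WindowedAggregate current) {
    WindowedAggregate prev = previous.put(current.key(), current);
    return prev == null ? 0 : current.sum() - prev.sum();
  }

  public static void main(String[] args) {
    PerKeyStateModel fn = new PerKeyStateModel();
    System.out.println(fn.process(new WindowedAggregate("k", 0, 10)));  // 0
    System.out.println(fn.process(new WindowedAggregate("k", 60, 15))); // 5
    System.out.println(fn.process(new WindowedAggregate("j", 0, 3)));   // 0
  }
}
```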
>>>>>>
>>>>>> Kenn
>>>>>>
>>>>>>
>>>>>>>
>>>>>>> [1]
>>>>>>> https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/operators/windows.html#using-per-window-state-in-processwindowfunction
>>>>>>>
>>>>>>> Best,
>>>>>>>
>>>>>>> Dongwon
>>>>>>>
>>>>>>>
