The current Beam model does not guarantee any ordering after a GBK (i.e.
Combine.perKey() in your pipeline), so you cannot expect step C to see
elements in a specific order.
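Because step C cannot rely on arrival order, one option is to make the combine order-insensitive and derive any ordering it needs from event timestamps when the result is extracted. A minimal sketch, assuming timestamped elements: it models the four CombineFn methods (createAccumulator, addInput, mergeAccumulators, extractOutput) in plain Java rather than using the Beam API, and the Event and SortingCombine names are hypothetical.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

// Hypothetical element type: an event-time timestamp (millis) and a payload.
final class Event {
    final long timestampMillis;
    final String payload;

    Event(long timestampMillis, String payload) {
        this.timestampMillis = timestampMillis;
        this.payload = payload;
    }
}

// Plain-Java model of a CombineFn whose result does not depend on the order
// in which elements or partial accumulators arrive (not the Beam API).
final class SortingCombine {
    static List<Event> createAccumulator() {
        return new ArrayList<>();
    }

    static List<Event> addInput(List<Event> acc, Event e) {
        acc.add(e);
        return acc;
    }

    static List<Event> mergeAccumulators(List<List<Event>> accs) {
        List<Event> merged = new ArrayList<>();
        for (List<Event> acc : accs) {
            merged.addAll(acc);
        }
        return merged;
    }

    // Ordering is imposed here from event timestamps (payload breaks ties),
    // so any arrival or merge order yields the same output.
    static List<String> extractOutput(List<Event> acc) {
        List<Event> sorted = new ArrayList<>(acc);
        sorted.sort(Comparator.comparingLong((Event e) -> e.timestampMillis)
                .thenComparing((Event e) -> e.payload));
        List<String> out = new ArrayList<>();
        for (Event e : sorted) {
            out.add(e.payload);
        }
        return out;
    }
}
```

Buffering every element costs memory proportional to the window contents; a commutative and associative aggregate such as a sum or a max avoids that and is order-independent by construction.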
As I recall, the Dataflow runner has very limited ordering
support. Hi +Reuven Lax, can you share your insights on this?
-Rui
Hi,
My Beam pipeline is designed to work with an unbounded source, KafkaIO.
It looks roughly like this:
p.apply(KafkaIO.read() ...) // (A-1)
.apply(WithKeys.of(...).withKeyType(...))
.apply(Window.into(FixedWindows.of(...)))
.apply(Combine.perKey(...))
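As background for steps (A) to (C): FixedWindows assigns each element to exactly one window based on its event timestamp, and Combine.perKey then yields one result per key and window. The plain-Java sketch below models that with a sum as the combine function and a zero window offset; KeyedEvent and FixedWindowSumModel are hypothetical names, and it leaves out the watermarks and triggers that decide when Beam actually emits each window's result.

```java
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical input element after WithKeys: key, event timestamp (millis), value.
final class KeyedEvent {
    final String key;
    final long timestampMillis;
    final long value;

    KeyedEvent(String key, long timestampMillis, long value) {
        this.key = key;
        this.timestampMillis = timestampMillis;
        this.value = value;
    }
}

final class FixedWindowSumModel {
    // Start of the fixed window containing the timestamp (zero offset).
    // floorMod keeps the result correct for negative timestamps too.
    static long windowStart(long timestampMillis, long sizeMillis) {
        return timestampMillis - Math.floorMod(timestampMillis, sizeMillis);
    }

    // Models (A) WithKeys -> (B) Window.into(FixedWindows) -> (C) Combine.perKey(sum):
    // one result per (key, window), labelled "key@[start,end)".
    static Map<String, Long> sumPerKeyAndWindow(List<KeyedEvent> events, long sizeMillis) {
        Map<String, Long> results = new TreeMap<>();
        for (KeyedEvent e : events) {
            long start = windowStart(e.timestampMillis, sizeMillis);
            String slot = e.key + "@[" + start + "," + (start + sizeMillis) + ")";
            results.merge(slot, e.value, Long::sum);
        }
        return results;
    }
}
```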
Reuven and Kenneth,
Thanks for the tip!
Now I can get window information without having to modify the type of my
aggregator :-)
Best,
Dongwon
On Mon, Aug 24, 2020 at 3:16 AM Reuven Lax wrote:
> Kenn - shouldn't the Reify happen before the rewindow?
Yes :-)
On Sun, Aug 23, 2020 at 2:16 PM Reuven Lax wrote:
> Kenn - shouldn't the Reify happen before the rewindow?
Kenn - shouldn't the Reify happen before the rewindow?
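Reuven's question can be illustrated with a small model. In Beam the window is metadata attached to each element rather than part of its value, so rewindowing into GlobalWindows replaces it, and reifying afterwards can only capture the global window. This is a plain-Java sketch under that assumption; Windowed, WithWindowInfo and ReifyModel are hypothetical stand-ins for Beam's Reify transforms and Window.into(new GlobalWindows()), and the thread does not say which Reify variant was used.

```java
// Hypothetical model of an element whose window bounds live outside its value.
final class Windowed<T> {
    // Sentinel bounds standing in for Beam's single global window.
    static final long GLOBAL_START = Long.MIN_VALUE;
    static final long GLOBAL_END = Long.MAX_VALUE;

    final T value;
    final long windowStart;
    final long windowEnd;

    Windowed(T value, long windowStart, long windowEnd) {
        this.value = value;
        this.windowStart = windowStart;
        this.windowEnd = windowEnd;
    }
}

// Hypothetical value type carrying a copy of the window it was produced in.
final class WithWindowInfo<T> {
    final T value;
    final long windowStart;
    final long windowEnd;

    WithWindowInfo(T value, long windowStart, long windowEnd) {
        this.value = value;
        this.windowStart = windowStart;
        this.windowEnd = windowEnd;
    }
}

final class ReifyModel {
    // Copies the current window bounds into the value; the element keeps its window.
    static <T> Windowed<WithWindowInfo<T>> reify(Windowed<T> in) {
        return new Windowed<>(
                new WithWindowInfo<T>(in.value, in.windowStart, in.windowEnd),
                in.windowStart, in.windowEnd);
    }

    // Moves the element into the global window; the old bounds are discarded.
    static <T> Windowed<T> rewindowGlobal(Windowed<T> in) {
        return new Windowed<>(in.value, Windowed.GLOBAL_START, Windowed.GLOBAL_END);
    }
}
```

Reifying first keeps the fixed-window bounds inside the value after the rewindow; rewindowing first leaves only the global window's sentinel bounds to copy.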
On Sun, Aug 23, 2020 at 11:08 AM Kenneth Knowles wrote:
>
>
> On Sun, Aug 23, 2020 at 1:12 PM Dongwon Kim wrote:
>
>> Hi Reuven,
>>
>> You and Kenneth are right; I thought GlobalWindows in unbounded streams
>> need triggers.
>>
>>
On Sun, Aug 23, 2020 at 1:12 PM Dongwon Kim wrote:
> Hi Reuven,
>
> You and Kenneth are right; I thought GlobalWindows in unbounded streams
> need triggers.
>
> p.apply(WithKeys.of(...).withKeyType(...)) // (A)
>  .apply(Window.into(FixedWindows.of(...))) // (B)
Hi Reuven,
You and Kenneth are right; I thought GlobalWindows in unbounded streams
need triggers.
p.apply(WithKeys.of(...).withKeyType(...))    // (A)
 .apply(Window.into(FixedWindows.of(...)))    // (B)
 .apply(Combine.perKey(new MyCombinFn()))     // (C)
You could simply window into GlobalWindows and add a stateful DoFn
afterwards. There is no need for triggering or a GroupByKey.
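The stateful-DoFn suggestion can be sketched as follows. In the global window, a stateful DoFn keeps state per key across elements and can emit output as each element is processed, which is why no trigger or GroupByKey is needed. This plain-Java model, assuming a running sum as the per-key state, is illustrative only: PerKeyRunningSum is a hypothetical name, and in Beam the runner (not a HashMap) scopes the state by key and window.

```java
import java.util.HashMap;
import java.util.Map;

// Plain-Java model of a stateful DoFn applied after Window.into(GlobalWindows):
// one piece of state per key, kept across elements, with output per element.
final class PerKeyRunningSum {
    // Stands in for a per-key ValueState<Long>; in Beam the runner scopes state by key.
    private final Map<String, Long> state = new HashMap<>();

    // Models one @ProcessElement call: read the key's state, update it, emit.
    String process(String key, long value) {
        long updated = state.getOrDefault(key, 0L) + value;
        state.put(key, updated);
        return key + "=" + updated;
    }
}
```

Following Rui's point about ordering, the intermediate values emitted here depend on the order in which elements reach each key; only the final per-key total is order-independent.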
On Sun, Aug 23, 2020 at 9:45 AM Dongwon Kim wrote:
> Hi Kenneth,
>
> According to your suggestion, I modified my pipeline as follows:
>
>
Hi Kenneth,
According to your suggestion, I modified my pipeline as follows:
p.apply(WithKeys.of(...).withKeyType(...))    // (A)
 .apply(Window.into(FixedWindows.of(...)))    // (B)
 .apply(Combine.perKey(new MyCombinFn()))     // (C)
 .apply(
     Window