On Mon, Mar 29, 2021 at 1:17 PM Kenneth Knowles <k...@apache.org> wrote:

> That's a neat example!
>
> The trigger you have there will emit a ton of output. What is your
> accumulation mode? I assume it must be accumulatingFiredPanes() otherwise
> you would not actually have access to the prior 6 days of input.
>

Yup, accumulating.


>
> The only trigger that is based on completeness of data is the
> AfterWatermark.pastEndOfWindow() trigger, so you have to use that to
> capture the 6 days of data:
>
>     prior6days = input.apply(
>         Window.into(<6 day windows sliding one day>)
>             .triggering(AfterWatermark.pastEndOfWindow()))
>
> Now if you GBK this collection, each group will have a timestamp that is
> the end of the 6 day period. You can use ParDo with outputWithTimestamp to
> move the timestamp up to any timestamp in the following day, yielding a
> PCollection of 6 day groupings of data, each with a timestamp in the last
> day of the 7. If the 6 days of data are large you may hit size limits
> (either hard limits or perf problems) and have to do something fancier.
>
> Flatten this with the input PCollection and window into FixedWindows(<one
> day>) and trigger however you like, again with accumulatingFiredPanes().
> There is no guarantee that the 6 days of past data arrives prior to
> elements in the last day. In fact since it will be delayed by an extra
> shuffle you would expect it to often show up later. So this is a heuristic
> approach equivalent to what it sounds like you are already doing that
> should lower the cost.
>

Ah interesting. Yes, this would likely have worked for me.


>
> If you want a guarantee that the 6 day buffer arrives prior to the other
> elements you will need to do something else. You could write a WindowFn
> that assigned all 7 days of data to a window that only spanned the first 6
> days, then trigger at end of window plus allowing late data (no early
> firings). Then every firing would be guaranteed by the watermark to have
> the first 6 days of data plus whatever else has shown up. (I assume part of
> your spec is that you do want data to be processed as it arrives, versus
> waiting until the end of the 7 day window).
>
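
A minimal sketch of the window assignment described above, written in plain Java without the Beam SDK so that it stays self-contained. All names here (ContextWindowSketch, DayWindow, assign) are hypothetical, and calendar days are assumed to be plain LocalDate values; a real version would presumably be a Beam WindowFn working on element timestamps. Each element is assigned to the seven 7-day sliding windows it falls into, but every window is reported as spanning only its first 6 days.

```java
import java.time.LocalDate;
import java.util.ArrayList;
import java.util.List;

// Hypothetical names throughout; this is not the Beam WindowFn API.
final class ContextWindowSketch {

    /** Half-open interval of calendar days: [start, endExclusive). */
    static final class DayWindow {
        final LocalDate start;
        final LocalDate endExclusive;

        DayWindow(LocalDate start, LocalDate endExclusive) {
            this.start = start;
            this.endExclusive = endExclusive;
        }

        boolean contains(LocalDate day) {
            return !day.isBefore(start) && day.isBefore(endExclusive);
        }
    }

    /**
     * An element on elementDay belongs to the 7-day sliding windows that
     * start between 6 days earlier and the element's own day. Each of those
     * is emitted as a window covering only its first 6 days, so the window
     * ends (and the watermark trigger fires) before the 7th day begins.
     */
    static List<DayWindow> assign(LocalDate elementDay) {
        List<DayWindow> windows = new ArrayList<>();
        for (int daysBack = 6; daysBack >= 0; daysBack--) {
            LocalDate start = elementDay.minusDays(daysBack);
            windows.add(new DayWindow(start, start.plusDays(6)));
        }
        return windows;
    }

    public static void main(String[] args) {
        LocalDate day = LocalDate.of(2021, 3, 29);
        for (DayWindow w : assign(day)) {
            System.out.println(w.start + " .. " + w.endExclusive
                + " contains element day: " + w.contains(day));
        }
    }
}
```

Note that the first window returned does not contain the element's own day: the element sits in the 7th day, past the end of the interval it was assigned to. Anything downstream that reads the window bounds has to account for that.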

I was curious about this option, and tried it. One issue I ran into was
that the downstream logic had to deal with some "odd" windows, because the
window interval did not properly reflect its contents. As a result, some of
that downstream logic wasn't as encapsulated as it should be.

I therefore created a PTransform "DailyWindowsWithContext" that first does
the contextual windowing ("ContextualCalendarDayWindow") and a GBK. It then
"re-windows" the elements + their context by filtering out the
"context-only" groups, setting the timestamps of the remaining groups based
on the max of the element timestamps, and then outputting them into a fixed
daily window, followed by another GBK and flatten.
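
To make the re-windowing step concrete, here is a rough sketch in plain Java. It is not the code from the gist and does not use the Beam SDK. The names (RewindowSketch, Rewindowed, rewindow) are hypothetical, and it assumes UTC calendar days, that a group is "context-only" when none of its elements fall in the 7th day, and that the new timestamp is simply the max element timestamp.

```java
import java.time.Instant;
import java.time.LocalDate;
import java.time.ZoneOffset;
import java.util.List;
import java.util.Optional;

// Hypothetical names throughout; a sketch of the idea, not the gist's code.
final class RewindowSketch {

    /** A group after re-windowing: new timestamp, its daily window, and the elements. */
    static final class Rewindowed {
        final Instant timestamp;
        final LocalDate day;
        final List<Instant> elements;

        Rewindowed(Instant timestamp, LocalDate day, List<Instant> elements) {
            this.timestamp = timestamp;
            this.day = day;
            this.elements = elements;
        }
    }

    /**
     * contextStart is the first day of a 6-day context window, so the day
     * being processed is contextStart + 6. Groups with no element in that
     * day are "context-only" and dropped (assumption stated above).
     */
    static Optional<Rewindowed> rewindow(LocalDate contextStart, List<Instant> elementTimestamps) {
        Instant lastDayStart = contextStart.plusDays(6).atStartOfDay(ZoneOffset.UTC).toInstant();

        // Find the max element timestamp in the group.
        Instant max = null;
        for (Instant ts : elementTimestamps) {
            if (max == null || ts.isAfter(max)) {
                max = ts;
            }
        }

        // Empty group, or nothing has arrived in the 7th day yet: context only.
        if (max == null || max.isBefore(lastDayStart)) {
            return Optional.empty();
        }

        // Re-stamp the group with the max timestamp and place it in the
        // fixed daily window (UTC calendar day) containing that timestamp.
        LocalDate day = max.atZone(ZoneOffset.UTC).toLocalDate();
        return Optional.of(new Rewindowed(max, day, elementTimestamps));
    }
}
```

With a context window starting 2021-03-23, a group holding elements from 03-25 and 03-29 would be kept and placed in the 03-29 daily window, while a group holding only the 03-25 element would be filtered out.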

This seems to work quite well with my set of unit tests, though I haven't
used it extensively yet. If anyone is curious about the code (written in
Kotlin), see here:

https://gist.github.com/rocketraman/543f066813fc89590f23ff5dacf43f01

Feedback on this code would be more than welcome.

Regards,
Raman



>
> I am just writing this without coding, so I could certainly have missed
> something or gotten it wrong.
>
> Kenn
>
> On Fri, Mar 26, 2021 at 1:47 PM Raman Gupta <rocketra...@gmail.com> wrote:
>
>> I have a 7-day sliding calendar window, sliding by 1 day. The intent is
>> to process only elements that fall into the last day of a window, but still
>> have access to the elements from the preceding six days.
>>
>> I created a sliding calendar window function, and trigger it like this:
>>
>> AfterWatermark.pastEndOfWindow()
>>   .withEarlyFirings(AfterProcessingTime.pastFirstElementInPane())
>>   .withLateFirings(AfterPane.elementCountAtLeast(1))
>>
>> Downstream of this pipeline I have a GBK and a DoFn that basically
>> ignores elements until at least some of them are in the last day of
>> the window.
>>
>> The above trigger works and the pipeline produces the expected output,
>> but runs the GBK and downstream logic many more times than is necessary.
>>
>> Is there a way I can optimize the triggering here such that the early
>> firings begin only when the watermark moves into the last day of the 7-day
>> window?
>>
>> Thanks,
>> Raman
>>
>>
