That's a neat example!

The trigger you have there will emit a ton of output. What is your
accumulation mode? I assume it must be accumulatingFiredPanes(); otherwise
you would not actually have access to the prior 6 days of input.

The only trigger that is based on completeness of data is the
AfterWatermark.pastEndOfWindow() trigger, so you have to use that to
capture the 6 days of data:

    prior6days = input.apply(
        Window.into(<6 day windows sliding one day>)
            .triggering(AfterWatermark.pastEndOfWindow()))

Now if you GBK this collection, each group will have a timestamp that is
the end of the 6 day period. You can use ParDo with outputWithTimestamp to
move the timestamp up to any timestamp in the following day, yielding a
PCollection of 6 day grouping of data with a timestamp in the last day of
the 7. If the 6 days of data is large you may hit size limits (either hard
limits or perf problems) and have to do something fancier.
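
To make the timestamp arithmetic concrete, here is a rough plain-Java
sketch (no Beam dependency). It only models which 6 day windows an element
lands in and where the grouped output's timestamp would be moved. The class
and method names are made up for illustration, and it assumes windows are
aligned to UTC midnight, which may not match your calendar window function.

```java
import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of 6 day windows sliding by one day (not Beam API).
final class SlidingSixDayModel {
    static final Duration SIZE = Duration.ofDays(6);
    static final Duration PERIOD = Duration.ofDays(1);

    // Start of every 6 day window [start, start + 6d) that contains ts,
    // newest window first. Assumes alignment to UTC midnight.
    static List<Instant> windowStarts(Instant ts) {
        long periodMs = PERIOD.toMillis();
        long tsMs = ts.toEpochMilli();
        long lastStart = Math.floorDiv(tsMs, periodMs) * periodMs;
        List<Instant> starts = new ArrayList<>();
        for (long start = lastStart; start > tsMs - SIZE.toMillis(); start -= periodMs) {
            starts.add(Instant.ofEpochMilli(start));
        }
        return starts;
    }

    // Timestamp to give the grouped 6 days of data: the first instant of
    // the following (seventh) day, i.e. the exclusive end of the window.
    static Instant shiftedTimestamp(Instant windowStart) {
        return windowStart.plus(SIZE);
    }
}
```

In the pipeline itself the shift would happen in the ParDo after the GBK,
via outputWithTimestamp; any timestamp inside the seventh day would do, the
start of that day is just the simplest choice.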

Flatten this with the input PCollection and window into FixedWindows(<one
day>) and trigger however you like, again with accumulatingFiredPanes().
There is no guarantee that the 6 days of past data arrives prior to
elements in the last day. In fact, since it will be delayed by an extra
shuffle, you would expect it to often show up later. So this is a heuristic
approach, equivalent to what it sounds like you are already doing, but it
should lower the cost.

If you want a guarantee that the 6 day buffer arrives prior to the other
elements, you will need to do something else. You could write a WindowFn
that assigns all 7 days of data to a window that only spans the first 6
days, then trigger at end of window while allowing late data (no early
firings). Then every firing would be guaranteed by the watermark to have
the first 6 days of data plus whatever else has shown up. (I assume part of
your spec is that you do want data to be processed as it arrives, versus
waiting until the end of the 7 day window.)
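
As a rough illustration of that assignment rule, here is a plain-Java
sketch (again no Beam dependency, hypothetical names, and assuming 7 day
periods aligned to UTC midnight). It is only a model of what such a
WindowFn's assignment logic might compute, not an actual WindowFn.

```java
import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
import java.util.List;

// Hypothetical model: each 7 day period starting at S is represented by a
// window [S, S + 6d), so the window ends before the seventh day begins.
final class FirstSixDaysAssignment {
    static final long DAY_MS = Duration.ofDays(1).toMillis();

    // Starts of the 7 windows an element is assigned to, newest first:
    // one for every 7 day period that contains ts.
    static List<Instant> windowStarts(Instant ts) {
        long dayStart = Math.floorDiv(ts.toEpochMilli(), DAY_MS) * DAY_MS;
        List<Instant> starts = new ArrayList<>();
        for (int daysBack = 0; daysBack < 7; daysBack++) {
            starts.add(Instant.ofEpochMilli(dayStart - daysBack * DAY_MS));
        }
        return starts;
    }

    // Exclusive end of the shortened window: six days after its start.
    static Instant windowEnd(Instant windowStart) {
        return windowStart.plus(Duration.ofDays(6));
    }

    // True when ts falls in the seventh day of the period, i.e. at or
    // after the end of the window it was assigned to.
    static boolean isSeventhDayElement(Instant ts, Instant windowStart) {
        return !ts.isBefore(windowEnd(windowStart));
    }
}
```

Elements for which isSeventhDayElement is true are the ones that would
show up as late data in that window, so the allowed lateness would
presumably need to cover at least that extra day.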

I am just writing this without coding, so I could certainly have missed
something or gotten it wrong.

Kenn

On Fri, Mar 26, 2021 at 1:47 PM Raman Gupta <[email protected]> wrote:

> I have a 7-day sliding calendar window, sliding by 1 day. The intent is to
> process only elements that fall into the last day of a window, but still
> have access to the elements from the preceding six days.
>
> I created a sliding calendar window function, and trigger it like this:
>
> AfterWatermark.pastEndOfWindow()
>   .withEarlyFirings(AfterProcessingTime.pastFirstElementInPane())
>   .withLateFirings(AfterPane.elementCountAtLeast(1))
>
> Downstream of this pipeline I have a GBK and a DoFn that basically ignores
> elements until at least some of them are in the last day of the window.
>
> The above trigger works and the pipeline produces the expected output, but
> runs the GBK and downstream logic many more times than is necessary.
>
> Is there a way I can optimize the triggering here such that the early
> firings begin only when the watermark moves into the last day of the 7-day
> window?
>
> Thanks,
> Raman
>
>
