Quick note for anyone else who comes across a similar use case: this
looks like a prime candidate for stateful DoFns (state and timers). See
https://beam.apache.org/documentation/programming-guide/#state-and-timers
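
For what it's worth, here is a rough plain-Python sketch of the per-key
logic such a DoFn would have to keep. It is not Beam code and calls no Beam
APIs: `KeyState`, `WindowSimulator`, `process` and `advance_to` are made-up
names, the message list stands in for a bag state, and `flush_at` stands in
for a timer that is reset on every accepted element. It assumes elements are
fed in timestamp order and that both *X* and *Y* are measured from the most
recent accepted message for that ID, which the original question leaves open.

```python
from dataclasses import dataclass, field
from typing import Dict, List, Optional, Tuple


@dataclass
class KeyState:
    """Per-ID state (hypothetical stand-in for Beam state cells and a timer)."""
    messages: List[str] = field(default_factory=list)  # buffered payloads
    flush_at: Optional[float] = None    # when the X-second timer would fire
    last_seen: Optional[float] = None   # timestamp of the last accepted message


class WindowSimulator:
    """Simulates 'flush X seconds after the latest element, drop until Y'."""

    def __init__(self, x: float, y: float) -> None:
        self.x = x
        self.y = y
        self.state: Dict[str, KeyState] = {}
        self.output: List[Tuple[str, str]] = []

    def advance_to(self, now: float) -> None:
        """Fire every timer that is due at or before `now`, oldest first."""
        due = sorted(
            (st.flush_at, key)
            for key, st in self.state.items()
            if st.flush_at is not None and st.flush_at <= now
        )
        for _, key in due:
            st = self.state[key]
            self.output.append((key, "".join(st.messages).strip()))
            st.messages = []
            st.flush_at = None  # window closed; last_seen is kept for the Y check

    def process(self, key: str, message: str, ts: float) -> None:
        """Handle one element; elements must arrive in timestamp order."""
        self.advance_to(ts)
        st = self.state.setdefault(key, KeyState())
        window_closed = st.flush_at is None and st.last_seen is not None
        if window_closed and ts - st.last_seen < self.y:
            return  # more than X but less than Y since the last message: drop
        st.messages.append(message)
        st.last_seen = ts
        st.flush_at = ts + self.x  # (re)set the X-second timer


# The example from the question, plus one late and one re-used ID.
sim = WindowSimulator(x=1.0, y=10.0)
events = [
    (0.0, "1", "this "), (0.1, "1", "is "), (0.2, "1", "event "),
    (0.3, "2", "two "), (0.4, "1", "one "), (0.5, "2", "chiming "),
    (0.6, "2", "in "),
    (3.0, "1", "late "),    # after X, before Y: discarded
    (20.0, "1", "again "),  # after Y: starts a new window
]
for ts, key, msg in events:
    sim.process(key, msg, ts)
sim.advance_to(100.0)  # let the remaining timer fire
print(sim.output)
```

With *X* = 1 and *Y* = 10 this emits "this is event one" for ID 1, "two
chiming in" for ID 2, drops the late message, and emits "again" as a separate
result for the re-used ID 1. In a real pipeline the same bookkeeping would
live in per-key state with an event-time or processing-time timer; which of
the two is appropriate depends on how the source assigns timestamps.
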
On 2021/03/26 22:34:35, Tomas Gareau <[email protected]> wrote:
> I'm trying to build a Beam pipeline to aggregate events from an
> unbounded source.
>
> These events work as follows:
>
> 1. Some event with a given ID occurs
>    * This ID may be re-used but will _never_ be re-used within *Y*
>      seconds
> 2. Any number of listeners report that event -- the *event* times will
>    all be within a couple ms of each other
> 3. These events need to be aggregated by `id` to form some output
>    message
>    * The order of the messages is not important, just the fact that
>      they are included
>
> For example, given an input message in CSV format of `id,message`, I
> would expect the inputs below:
>
> "1,this "
> "1,is "
> "1,event "
> "2,two "
> "1,one "
> "2,chiming "
> "2,in "
>
> to give the outputs:
>
> "1,this is event one"
> "2,two chiming in"
>
>
> With network latency, messages will have different amounts of lag
> getting to Beam. I'm more concerned with latency than completeness --
> it's fine if we miss a couple messages from listeners -- and the
> downstream processing is not currently equipped to re-process events
> if late data comes in, so I'm happy to completely discard late data.
>
> What I'd like to express in Beam is as follows:
>
> * A message with event ID *1* arrives → start a new window, wait *X*
>   seconds to see if another message with event ID *1* arrives
>   o A message with event ID *1* arrives in _less than_ *X* seconds →
>     add it to the window and wait another *X* seconds
>   o A message with event ID *1* arrives in _more than_ *X* seconds
>     but _less than_ *Y* seconds → discard it
>   o A message with event ID *1* arrives in _more than_ *Y* seconds →
>     start a new window
> * If the *X* second timer for a window expires, trigger a pane with
>   whatever events have happened to come in
>
> At first, session windows seem like a great candidate: I can set the
> gap duration to *X* and events within *X* seconds of each other will
> be assigned to the same window -- great! However, I'm unable to drop
> events that are more than *X* but less than *Y* seconds apart -- Beam
> will instead consider this a new session.
>
> How could I configure Beam to trigger *X* seconds after the most
> recent element and drop elements that arrive *X* < `arrival time` < *Y*?
>
> Cheers,
> Tomas
>
> _Note: it feels like I'm fighting the framework here, which is often a
> sign that I could maybe approach it from a framework point-of-view --
> if there are better ways to address this use-case that are more in
> line with Beam's philosophy I'd love to hear them too!_
>
>