I'm trying to build a Beam pipeline to aggregate events from an unbounded source.

These events work as follows:

1. An event with a given ID occurs
     * This ID may be re-used but will _never_ be re-used within *Y*
       seconds
2. Any number of listeners report that event; the *event* times will
   all be within a couple of milliseconds of each other
3. These events need to be aggregated by `id` to form a single output message
     * The order of the messages is not important, only that they are all
       included

For example, with input messages in the CSV format `id,message`, I would expect the inputs below:

"1,this "
"1,is "
"1,event "
"2,two "
"1,one "
"2,chiming "
"2,in "

to give the outputs:

"1,this is event one"
"2,two chiming in"
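Ignoring time and windowing entirely, the expected output amounts to grouping by `id` and concatenating the message parts. A minimal Python sketch of just that grouping step (`aggregate_by_id` is a hypothetical helper, not a Beam API):

```python
from collections import defaultdict


def aggregate_by_id(lines):
    """Concatenate the message parts of each `id,message` line, grouped by id.

    Hypothetical helper: no event time or windowing, only the grouping.
    """
    groups = defaultdict(list)
    for line in lines:
        event_id, message = line.split(",", 1)  # split on the first comma only
        groups[event_id].append(message)
    # Join the parts and strip the trailing space each part carries.
    return {event_id: "".join(parts).strip() for event_id, parts in groups.items()}


inputs = ["1,this ", "1,is ", "1,event ", "2,two ", "1,one ", "2,chiming ", "2,in "]
for event_id, text in aggregate_by_id(inputs).items():
    print(f"{event_id},{text}")
# → 1,this is event one
# → 2,two chiming in
```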


Because of network latency, messages reach Beam with varying amounts of lag. I'm more concerned with latency than completeness (it's fine if we miss a couple of messages from listeners), and the downstream processing is not currently equipped to re-process events if late data comes in, so I'm happy to discard late data completely.

What I'd like to express in Beam is as follows:

 * A message with event ID *1* arrives → start a new window and wait *X*
   seconds to see if another message with event ID *1* arrives
     * A message with event ID *1* arrives in _less than_ *X* seconds →
       add it to the window and wait another *X* seconds
     * A message with event ID *1* arrives in _more than_ *X* seconds
       but _less than_ *Y* seconds → discard it
     * A message with event ID *1* arrives in _more than_ *Y* seconds →
       start a new window
 * If the *X* second timer for a window expires, trigger a pane with
   whatever events have arrived so far
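To pin down these rules, here is a plain-Python model of them (no Beam involved). It assumes arrivals are processed in arrival order, that *Y* > *X*, and that both *X* and *Y* are measured from the most recent accepted message with the same ID; `aggregate` and `_KeyState` are hypothetical names. In Beam, per-key logic like this would plausibly live in a stateful `DoFn` with a timer, but that mapping is an untested assumption here.

```python
from dataclasses import dataclass, field


@dataclass
class _KeyState:
    last_accepted: float                        # arrival time of the newest accepted message
    parts: list = field(default_factory=list)   # message fragments collected so far
    is_open: bool = True                        # False once the X-second timer has fired


def aggregate(arrivals, x, y):
    """Model of the desired per-ID windowing; assumes y > x.

    arrivals: iterable of (arrival_time_s, event_id, message), sorted by time.
    Returns a list of (event_id, aggregated_text) in the order windows fire.
    """
    state = {}
    emitted = []

    def fire_expired(now):
        # Emit every open window whose X-second timer expired at or before `now`.
        for event_id, s in state.items():
            if s.is_open and s.last_accepted + x <= now:
                emitted.append((event_id, "".join(s.parts).strip()))
                s.is_open = False

    for t, event_id, message in arrivals:
        fire_expired(t)
        s = state.get(event_id)
        if s is None or t - s.last_accepted >= y:
            # First sighting, or the ID is being re-used: start a new window.
            state[event_id] = _KeyState(last_accepted=t, parts=[message])
        elif s.is_open:
            # Arrived less than X seconds after the previous message:
            # add it and restart the X-second wait.
            s.parts.append(message)
            s.last_accepted = t
        # Otherwise X <= gap < Y: the window has already fired, so drop it.

    fire_expired(float("inf"))  # flush windows still open at end of input
    return emitted


arrivals = [
    (0.0, "1", "this "),
    (0.5, "1", "is "),
    (1.0, "2", "two "),
    (2.0, "1", "event "),
    (2.5, "2", "chiming "),
    (5.0, "1", "late "),    # 3 s after the last "1": between X and Y, dropped
    (13.0, "1", "again "),  # 11 s after the last accepted "1": new window
]
print(aggregate(arrivals, x=2, y=10))
# → [('1', 'this is event'), ('2', 'two chiming'), ('1', 'again')]
```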

At first, session windows seem like a great candidate: I can set the gap duration to *X*, and events within *X* seconds of each other will be assigned to the same window. However, I'm unable to drop events that are more than *X* but less than *Y* seconds apart; Beam instead treats such an event as the start of a new session.
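For comparison, a rough model of how gap-based session windows merge: each element gets the interval [t, t + gap) and overlapping intervals merge. The exact boundary handling in Beam is an assumption here, and `sessions` is a hypothetical helper. With a gap of *X* = 2, an element arriving 4 seconds after the previous one is not dropped; it opens a new session:

```python
def sessions(times, gap):
    """Group timestamps into gap-based sessions (simplified model).

    Each element is assigned the half-open interval [t, t + gap); an element
    whose timestamp falls inside the current session's interval extends it.
    """
    result = []
    for t in sorted(times):
        if result and t < result[-1][1]:
            # Overlaps the current session: extend its end and add the element.
            start, end, members = result[-1]
            result[-1] = (start, max(end, t + gap), members + [t])
        else:
            # No overlap: this element starts a new session.
            result.append((t, t + gap, [t]))
    return result


print(sessions([0, 1, 5], gap=2))
# → [(0, 3, [0, 1]), (5, 7, [5])]
```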

How could I configure Beam to trigger *X* seconds after the most recent element and drop elements for which *X* < `arrival time` < *Y*?

Cheers,
Tomas


_Note: it feels like I'm fighting the framework here, which is often a sign that I should approach the problem from the framework's point of view. If there are better ways to address this use case that are more in line with Beam's philosophy, I'd love to hear them too!_
