Hi Johannes,

EventTimeSessionWindows [1] use the EventTimeTrigger [2] as default trigger
(see EventTimeSessionWindows.getDefaultTrigger()).

I would take the EventTimeTrigger and extend it with early firing
functionality.
However, there are a few things to consider
* you need to be aware that session window can be merged, i.e., two session
windows A, B with gap 10: A [20,25), B [37, 45), will be merged when a
record at 32 is received.
* windows store all records in a list. For every firing, you need to
iterate the full list and also track which records you joined already to
avoid duplicates. Maybe you can migrate records from the window state into
a custom state defined in a ProcessWindowFunction.

Best, Fabian





2018-06-13 13:43 GMT+02:00 Johannes Schulte <johannes.schu...@gmail.com>:

> Hi,
>
> I am joining two streams with a session window and want to emit a joined
> (early) result for every element arriving on one of the streams.
>
> Currently the code looks like this:
>
> s1.join(s2)
> .where(s1.id).equalTo(s2.id)
> .window(EventTimeSessionWindows.withGap(Time.minutes(15)))
> // trigger(?)
> .apply(...custom code..)
>
> What I am missing is the right trigger ala "withEarlyFiring" - do I need
> to implement my on trigger for this and if yes, what kind of functionality
> must be present to not break the session window semantics?
>
> Thanks in advance,
>
> Johannes
>
>

Reply via email to