Hey guys,

As Aljoscha highlighted earlier, the current window join semantics in the
streaming API don't follow the changes in the windowing API. More
precisely, we currently only support joins over time windows of equal size
on both streams. The reason is that we take one window from each of the
two streams and join these pairs. This would be a blocking operation if
the windows were not closed at exactly the same time (and since we don't
want that, we only allow time windows).

I talked with Peter, who came up with the initial idea for an alternative
approach to stream joins, which works as follows:

Instead of pairing windows for joins, we do element-against-window joins.
This means that whenever we receive an element from one of the streams,
we join it with the current window (which is constantly updated) of the
other stream. This does not block for any window definition, as we don't
have to wait for windows to complete, and we can use it with any of our
predefined policies like Time.of(...), Count.of(...) and Delta.of(...).
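To make the idea concrete, here is a minimal plain-Java sketch of an
element-against-window join, outside of Flink. It is only an illustration
under simplifying assumptions: both windows are count-based (keep the last
n elements), and the names (ElementWindowJoin, onElementA, onElementB) are
hypothetical, not our operators or the actual prototype.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;
import java.util.Objects;
import java.util.function.BiFunction;
import java.util.function.Function;

// Hypothetical sketch of an element-against-window join; not Flink code.
// Both sides use count-based windows ("keep the last n elements") for brevity.
class ElementWindowJoin<A, B, K, R> {
    private final Deque<A> windowA = new ArrayDeque<>();
    private final Deque<B> windowB = new ArrayDeque<>();
    private final int sizeA;
    private final int sizeB;
    private final Function<A, K> keyA;        // join key of stream A ("where")
    private final Function<B, K> keyB;        // join key of stream B ("equalTo")
    private final BiFunction<A, B, R> joiner; // join function ("with")

    ElementWindowJoin(int sizeA, int sizeB, Function<A, K> keyA,
                      Function<B, K> keyB, BiFunction<A, B, R> joiner) {
        this.sizeA = sizeA;
        this.sizeB = sizeB;
        this.keyA = keyA;
        this.keyB = keyB;
        this.joiner = joiner;
    }

    // Join a new A element with B's *current* window right away, then add it
    // to A's window, evicting the oldest element once the window is full.
    List<R> onElementA(A a) {
        List<R> out = new ArrayList<>();
        K k = keyA.apply(a);
        for (B b : windowB) {
            if (Objects.equals(k, keyB.apply(b))) {
                out.add(joiner.apply(a, b));
            }
        }
        windowA.addLast(a);
        if (windowA.size() > sizeA) {
            windowA.removeFirst();
        }
        return out;
    }

    // Symmetric case: a new B element is joined with A's current window.
    List<R> onElementB(B b) {
        List<R> out = new ArrayList<>();
        K k = keyB.apply(b);
        for (A a : windowA) {
            if (Objects.equals(keyA.apply(a), k)) {
                out.add(joiner.apply(a, b));
            }
        }
        windowB.addLast(b);
        if (windowB.size() > sizeB) {
            windowB.removeFirst();
        }
        return out;
    }
}
```

With windows of size 2 and "x % 10" as the join key on both sides, feeding
B:1, A:11, B:21, B:2, A:31 yields [], [11-1], [11-21], [], [31-21]. Each
pair is emitted once, as soon as its later element arrives, and no window
ever has to close. In this sketch a time- or delta-based policy would only
change the eviction check.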

Additionally, this allows a very flexible way of defining window joins.
With this we could also define grouped windowing inside of a join. An
example of this would be: join all elements of Stream1 with the last 5
elements of Stream2 for each window key, on some join key.
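The grouped case could look roughly like this, again as a hypothetical
plain-Java sketch (GroupedWindowJoin and its parameters are invented
names): Stream2 keeps a separate count window per window key, and each
Stream1 element is joined against all of these windows on the join key.
The example keeps the last 2 elements per key instead of 5 so the test
stays short.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.function.BiFunction;
import java.util.function.Function;

// Hypothetical sketch: "join every Stream1 element with the last n elements
// of Stream2 per window key, on a join key". Not Flink code.
class GroupedWindowJoin<A, B, W, K, R> {
    private final Map<W, Deque<B>> windowsB = new HashMap<>(); // one window per window key
    private final int sizePerGroup;
    private final Function<B, W> windowKeyB; // groups Stream2's windows ("by")
    private final Function<A, K> joinKeyA;   // "where"
    private final Function<B, K> joinKeyB;   // "equalTo"
    private final BiFunction<A, B, R> joiner;

    GroupedWindowJoin(int sizePerGroup, Function<B, W> windowKeyB,
                      Function<A, K> joinKeyA, Function<B, K> joinKeyB,
                      BiFunction<A, B, R> joiner) {
        this.sizePerGroup = sizePerGroup;
        this.windowKeyB = windowKeyB;
        this.joinKeyA = joinKeyA;
        this.joinKeyB = joinKeyB;
        this.joiner = joiner;
    }

    // A Stream2 element only updates the window of its own group, so a busy
    // group cannot evict the elements of a quiet one.
    void onElementB(B b) {
        Deque<B> window =
            windowsB.computeIfAbsent(windowKeyB.apply(b), key -> new ArrayDeque<>());
        window.addLast(b);
        if (window.size() > sizePerGroup) {
            window.removeFirst();
        }
    }

    // A Stream1 element is probed against the current window of every group.
    List<R> onElementA(A a) {
        List<R> out = new ArrayList<>();
        K k = joinKeyA.apply(a);
        for (Deque<B> window : windowsB.values()) {
            for (B b : window) {
                if (Objects.equals(k, joinKeyB.apply(b))) {
                    out.add(joiner.apply(a, b));
                }
            }
        }
        return out;
    }
}
```

With Stream2 elements "x:1", "x:2", "x:3", "y:1" (window key before the
colon, join key after it), "x:1" has been evicted from group x, while
"y:1" is still available because its group has its own window. With a
single ungrouped window of size 2, "y:1" would have pushed "x:2" out
instead.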

This feature can easily be implemented on top of the current operators,
and I already have a working prototype for the simple, non-grouped case.
My only concern is the API; the best I could come up with is something
like this:

stream_A.join(stream_B)
        .onWindow(windowDefA, windowDefB)
        .by(windowKey1, windowKey2)
        .where(...)
        .equalTo(...)
        .with(...)

(the user can omit the "by" and "with" calls)
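One way to make "by" and "with" optional is to encode the chain as a
sequence of step types, where an optional step either returns the same
step type or replaces a default. The sketch below is purely hypothetical:
none of these types exist in Flink, window definitions are plain strings,
onWindow(...) stands in for stream_A.join(stream_B).onWindow(...), and the
default join function simply formats the pair as a string.

```java
import java.util.Objects;
import java.util.function.BiFunction;
import java.util.function.Function;

// Hypothetical type-state sketch of the proposed fluent chain; not Flink API.
final class JoinApiSketch {

    // Stands in for stream_A.join(stream_B).onWindow(windowDefA, windowDefB).
    static <A, B> Windowed<A, B> onWindow(String windowDefA, String windowDefB) {
        return new Windowed<>(windowDefA, windowDefB);
    }

    static final class Windowed<A, B> {
        final String windowDefA;
        final String windowDefB;
        Function<A, ?> windowKeyA; // stays null when "by" is omitted
        Function<B, ?> windowKeyB;

        Windowed(String windowDefA, String windowDefB) {
            this.windowDefA = windowDefA;
            this.windowDefB = windowDefB;
        }

        // Optional step: returns the same type, so callers may skip it.
        Windowed<A, B> by(Function<A, ?> keyA, Function<B, ?> keyB) {
            this.windowKeyA = keyA;
            this.windowKeyB = keyB;
            return this;
        }

        <K> Keyed<A, B, K> where(Function<A, K> joinKeyA) {
            return new Keyed<>(this, joinKeyA);
        }
    }

    static final class Keyed<A, B, K> {
        final Windowed<A, B> windowed;
        final Function<A, K> joinKeyA;

        Keyed(Windowed<A, B> windowed, Function<A, K> joinKeyA) {
            this.windowed = windowed;
            this.joinKeyA = joinKeyA;
        }

        // Already a complete spec, using a default pair-formatting join function.
        Spec<A, B, K, String> equalTo(Function<B, K> joinKeyB) {
            return new Spec<>(windowed, joinKeyA, joinKeyB, (a, b) -> "(" + a + "," + b + ")");
        }
    }

    static final class Spec<A, B, K, R> {
        final Windowed<A, B> windowed;
        final Function<A, K> joinKeyA;
        final Function<B, K> joinKeyB;
        final BiFunction<A, B, R> joiner;

        Spec(Windowed<A, B> windowed, Function<A, K> joinKeyA,
             Function<B, K> joinKeyB, BiFunction<A, B, R> joiner) {
            this.windowed = windowed;
            this.joinKeyA = joinKeyA;
            this.joinKeyB = joinKeyB;
            this.joiner = joiner;
        }

        // Optional step: swaps in a user-defined join function.
        <R2> Spec<A, B, K, R2> with(BiFunction<A, B, R2> fn) {
            return new Spec<>(windowed, joinKeyA, joinKeyB, fn);
        }

        boolean isGrouped() { return windowed.windowKeyA != null; }

        boolean matches(A a, B b) {
            return Objects.equals(joinKeyA.apply(a), joinKeyB.apply(b));
        }

        R join(A a, B b) { return joiner.apply(a, b); }
    }
}
```

Because by(...) returns the same step type, skipping it simply yields a
non-grouped join; and because equalTo(...) already returns a complete
specification, with(...) only replaces the default join function.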

I think this new approach would live up to our "flexible windowing" much
better than the current one.

Regards,
Gyula
