You could simulate the Samza approach with a RichFlatMapFunction over
cogrouped streams that maintains the sliding window in its ListState. As I
understand it, the drawback is that the list state is not kept in managed
memory.
I'm interested to hear about the right way to implement this.
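
To make the idea concrete, here is a minimal sketch of the per-key logic such
a function would have to carry. It is plain Java, not Flink API:
WindowedLookupJoin, Entry, onFirst and onSecond are hypothetical names, the
HashMap stands in for keyed state, and it assumes elements for a given key
arrive in timestamp order.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical helper (not a Flink class): windowed lookup join on x = z. */
class WindowedLookupJoin<K, V> {

    /** One buffered (x, y) element of the first stream with its timestamp. */
    static final class Entry<K, V> {
        final K x;
        final V y;
        final long timestamp;

        Entry(K x, V y, long timestamp) {
            this.x = x;
            this.y = y;
            this.timestamp = timestamp;
        }
    }

    private final long windowMillis;
    // Stand-in for keyed state: one time-ordered buffer per key x.
    private final Map<K, Deque<Entry<K, V>>> state = new HashMap<>();

    WindowedLookupJoin(long windowMillis) {
        this.windowMillis = windowMillis;
    }

    /** Buffers an element (x, y) from the first stream. */
    void onFirst(K x, V y, long timestamp) {
        state.computeIfAbsent(x, k -> new ArrayDeque<>())
             .addLast(new Entry<>(x, y, timestamp));
    }

    /**
     * Handles a value z from the second stream: evicts entries that have
     * fallen out of the window, then returns every buffered (x, y) with
     * x = z and a strictly earlier timestamp.
     */
    List<Entry<K, V>> onSecond(K z, long timestamp) {
        List<Entry<K, V>> matches = new ArrayList<>();
        Deque<Entry<K, V>> window = state.get(z);
        if (window == null) {
            return matches;
        }
        long cutoff = timestamp - windowMillis;
        while (!window.isEmpty() && window.peekFirst().timestamp < cutoff) {
            window.removeFirst();
        }
        if (window.isEmpty()) {
            state.remove(z); // do not keep empty buffers around
            return matches;
        }
        for (Entry<K, V> e : window) {
            if (e.timestamp < timestamp) {
                matches.add(e);
            }
        }
        return matches;
    }
}
```

Each (x, y) is buffered once and emitted once per matching z, so there are no
overlapping window panes to dedup. In a real job the buffer would have to live
in checkpointed state and eviction would presumably be driven by event time
rather than by the arrival of the next z.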

On Wed, Apr 13, 2016 at 3:53 PM, Elias Levy <fearsome.lucid...@gmail.com>
wrote:

> I am wondering how you would implement the following function in Flink.
> The function takes as input two streams.  One stream can be viewed as a
> tuple with two values *(x, y)*; the second stream is a stream of
> individual values *z*.  The function keeps a time based window on the
> first input (say, 24 hours).  Whenever it receives an element from the
> second stream, it compares the value *z* against the *x* element of each
> tuple in the window, and for each match it emits *(x, y)*.  You are
> basically doing a join on *x=z*.  Note that values from the second stream
> are not windowed and they are only matched to values from the first stream
> with earlier timestamps.
>
> This was relatively easy to implement in Samza.  Consume off two topics,
> the first keyed by *x* and the second by *z*.  Consume both topics in a
> job.  Messages with the same key would be consumed by the same task.  The
> task could maintain a window of messages from the first stream in its local
> state.  Whenever a message came in via the second stream, it could look up
> in the local state for matching messages, and if it found any, send them to
> the output stream.  Obviously, with Samza you don't have the luxury of the
> system handling event time for you, but this works well and it is easy to
> implement.
>
> I am not clear how this would be implemented in Flink.
>
> It is easy enough to partition either stream by key, and to window the
> first stream using a sliding window, but from there out things get
> complicated.
>
> You can join the two streams by key, but you must do so using the same
> window for both streams.  That means events from the first stream may be
> matched to older events of the second stream, which is not what we want.  I
> suppose if both included a timestamp, you could later add a filter to
> remove such events from the merged stream.  But you would also have to deal
> with duplicates, as the window is a sliding window and the same two
> elements may match across all window panes that contain the matching
> elements.  So you need to dedup as well.
>
> coGroup seems like it would suffer from the same issues.
>
> Maybe the answer is connected streams, but there is scant documentation on
> the semantics of ConnectedStreams.  There isn't even an example that I
> could find that makes use of them.
>
> Thoughts?
