Re: synchronizing two streams

2016-05-12 Thread Matthias J. Sax
I see. But even if you had an operator (A,B)->(A,B), it would not be possible to block A when B does not deliver any data, because of Flink's internal design. You will need a custom solution: something like a map (one for each stream) that uses a side-communication channel (i.e.,
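One possible shape for the per-stream maps with a side-communication channel is a shared gate that each map calls before forwarding a record. This is a sketch under loud assumptions: `SkewGate`, `awaitTurn` and `maxSkewMs` are hypothetical names, both maps are assumed to run in the same JVM (across JVMs the channel would have to be an external service), and the timeout is our own addition so that a stream stuck for hours (the situation described in the reply) cannot block the other one indefinitely. Blocking like this stalls the thread running that map.

```java
import java.util.concurrent.TimeUnit;

// Hypothetical side channel shared by the two per-stream maps (side 0 and side 1).
// Assumes both maps run in the same JVM; across JVMs an external service would be needed.
class SkewGate {
    private final long maxSkewMs;
    private final long[] latest = {Long.MIN_VALUE, Long.MIN_VALUE}; // latest timestamp per side

    SkewGate(long maxSkewMs) {
        this.maxSkewMs = maxSkewMs;
    }

    /**
     * Called by the map for `side` before it forwards a record with timestamp ts.
     * Blocks while that record is more than maxSkewMs ahead of the other side.
     * Returns false if it gave up after timeoutMs (or was interrupted).
     */
    synchronized boolean awaitTurn(int side, long ts, long timeoutMs) {
        latest[side] = Math.max(latest[side], ts);
        notifyAll(); // our progress may unblock a waiting map on the other side
        long deadline = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(timeoutMs);
        while (ts > latest[1 - side] + maxSkewMs) {
            long remaining = deadline - System.nanoTime();
            if (remaining <= 0) {
                return false; // other side silent for too long: give up instead of blocking forever
            }
            try {
                TimeUnit.NANOSECONDS.timedWait(this, remaining); // releases the monitor while waiting
            } catch (InterruptedException ie) {
                Thread.currentThread().interrupt();
                return false;
            }
        }
        return true;
    }
}
```

The two sides cannot block each other at the same time: that would require each side to be more than `maxSkewMs` ahead of the other, which is impossible for a non-negative skew.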

Re: synchronizing two streams

2016-05-12 Thread Alexander Gryzlov
Yes, this is generally a viable design, and it is actually what we started with. The problem in our case, however, is that either stream can occasionally (due to issues with an external producer) get stuck for an arbitrary period of time, up to several hours. Buffering the other one

Re: synchronizing two streams

2016-05-12 Thread Matthias J. Sax
That is correct. But there is no reason to throttle an input stream. If you implement an outer join, you will have two in-memory buffers holding the records of each stream for your "time window". Each time you receive a watermark, you can remove all "expired" records from the buffer of the other
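A minimal plain-Java sketch of the two-buffer outer join described above, with no Flink dependency so it runs standalone. All names (`IntervalOuterJoin`, `processElementA`, `processElementB`, `advanceWatermark`, `bound`) are hypothetical. It assumes Flink-style watermark semantics, where a watermark W promises that no further record with timestamp <= W will arrive, and it ignores late data. Records join when their keys match and their timestamps differ by at most `bound` ms, as in the several-second interval from the original question.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

// Hypothetical, Flink-free sketch: one buffer per input, watermark-driven eviction.
class IntervalOuterJoin {
    // A buffered record: join key, event-time timestamp (ms), and whether it ever matched.
    static final class Event {
        final String key;
        final long ts;
        boolean matched;

        Event(String key, long ts) {
            this.key = key;
            this.ts = ts;
        }
    }

    private final long bound; // max allowed |tsA - tsB| for a join, in ms
    private final List<Event> bufferA = new ArrayList<>();
    private final List<Event> bufferB = new ArrayList<>();
    final List<String> output = new ArrayList<>(); // results rendered as "key:tsA|tsB"

    IntervalOuterJoin(long bound) {
        this.bound = bound;
    }

    void processElementA(String key, long ts) {
        probeAndStore(new Event(key, ts), bufferA, bufferB, true);
    }

    void processElementB(String key, long ts) {
        probeAndStore(new Event(key, ts), bufferB, bufferA, false);
    }

    // Join the new record with every buffered record of the other input, then buffer it.
    private void probeAndStore(Event e, List<Event> own, List<Event> other, boolean fromA) {
        for (Event o : other) {
            if (o.key.equals(e.key) && Math.abs(o.ts - e.ts) <= bound) {
                e.matched = true;
                o.matched = true;
                output.add(fromA ? render(e.key, e.ts, o.ts) : render(e.key, o.ts, e.ts));
            }
        }
        own.add(e);
    }

    // Called with the combined watermark of both inputs.
    void advanceWatermark(long watermark) {
        evict(bufferA, watermark, true);
        evict(bufferB, watermark, false);
    }

    // A record with timestamp ts can only match records with timestamp <= ts + bound.
    // Once ts + bound <= watermark, no such record can still arrive, so the record has
    // expired; if it never matched, emit it with a null partner (the "outer" part).
    private void evict(List<Event> buffer, long watermark, boolean fromA) {
        Iterator<Event> it = buffer.iterator();
        while (it.hasNext()) {
            Event e = it.next();
            if (e.ts + bound <= watermark) {
                if (!e.matched) {
                    output.add(fromA ? render(e.key, e.ts, null) : render(e.key, null, e.ts));
                }
                it.remove();
            }
        }
    }

    private static String render(String key, Long tsA, Long tsB) {
        return key + ":" + tsA + "|" + tsB;
    }
}
```

If one input stalls, the watermark handed to `advanceWatermark` cannot move past it, so the other buffer keeps growing. That is the hours-long stall scenario from the earlier reply: the buffers avoid throttling, but memory then has to absorb the backlog.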

Re: synchronizing two streams

2016-05-12 Thread Alexander Gryzlov
Hmm, I probably don't really get how Flink's execution model works. As far as I understand, the preferred way to slow down stream consumption is simply to have an operator with a conditional Thread.sleep() inside. Wouldn't calling sleep() in either of TwoInputStreamOperator's

Re: synchronizing two streams

2016-05-12 Thread Matthias J. Sax
I cannot follow completely. TwoInputStreamOperator defines two methods to process watermarks, one for each input stream. So you can sync both streams within the outer join operator you plan to implement. -Matthias
On 05/11/2016 05:00 PM, Alexander Gryzlov wrote:
> Hello,
>
> We're implementing a
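To make the watermark-based synchronization concrete: the operator's event-time clock can be taken as the minimum of the latest watermark seen on each input, so it never runs ahead of the slower stream. In Flink's `TwoInputStreamOperator` the two callbacks are `processWatermark1` and `processWatermark2`, and as far as we know Flink's own two-input operators combine them in a similar min-based way. The sketch below uses no Flink types; `TwoInputWatermarkTracker`, `onWatermark1` and `onWatermark2` are hypothetical names meant to be called from those two callbacks.

```java
// Hypothetical, Flink-free tracker for the combined watermark of a two-input operator.
class TwoInputWatermarkTracker {
    private long input1 = Long.MIN_VALUE; // latest watermark seen on input 1
    private long input2 = Long.MIN_VALUE; // latest watermark seen on input 2
    private long combined = Long.MIN_VALUE;

    // Each returns true if the combined watermark advanced and should be forwarded downstream.
    boolean onWatermark1(long wm) {
        input1 = Math.max(input1, wm); // guard against a watermark moving backwards
        return update();
    }

    boolean onWatermark2(long wm) {
        input2 = Math.max(input2, wm);
        return update();
    }

    long combined() {
        return combined;
    }

    // Event time for the operator is the minimum over both inputs.
    private boolean update() {
        long next = Math.min(input1, input2);
        if (next > combined) {
            combined = next;
            return true;
        }
        return false;
    }
}
```

Until both inputs have reported a watermark, the combined value stays at `Long.MIN_VALUE`, and if one input stops emitting watermarks the combined value stops advancing too. The tracker synchronizes event time; on its own it does not bound how much data piles up on the faster side.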

synchronizing two streams

2016-05-11 Thread Alexander Gryzlov
Hello, We're implementing a streaming outer join operator based on a TwoInputStreamOperator with an internal buffer. In our use case, only items whose timestamps are within a several-second interval of each other can join, so we need to synchronize the two input streams to ensure maximal