Hi Jonas,

A few things to clarify first:

> Stream A has a rate of 100k tuples/s. After processing the whole Kafka queue,
> the rate drops to 10 tuples/s.

From this description, it seems the job re-reads the topic from the beginning,
and once it reaches the latest record at the head of the queue, you get the
normal input rate again. Is that correct?

> What I now want is that while tuples from A are being processed in flatMap1,
> the stream B in flatMap2 should wait until the rate of the A stream has
> dropped, and only then should flatMap2 be called.

So what you are looking for is that flatMap2 for stream B only does work after
the job reaches the latest record in stream A?

If that’s the case, I would not rely on detecting a drop below a threshold
rate. That isn’t reliable, because it depends on stream A’s actual input rate,
which naturally changes over time.

I’m not sure if it’s the best solution, but this is what I have in mind:
You could insert a special marker event into stream A every time you start
running this job.
Your job can have an operator before the co-flatMap operator that expects this
special marker. When it receives the marker (which is when the head of stream A
is reached), it broadcasts a special event to the co-flatMap, to be processed
by flatMap2.
Then, once flatMap2 is invoked with the special event, you can toggle the logic
in flatMap2 so that it actually starts doing work.
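
A rough sketch of the toggle, written in plain Scala without the Flink API so
that it stays self-contained. All names here (GatedCoFlatMap, ARecord, BRecord,
StartProcessing) are made up for illustration, and buffering B tuples until the
special event arrives is an assumption based on your description. In a real
job, the same logic would live in your RichCoFlatMapFunction, and the buffer
and flag would probably need to be kept in Flink state.

```scala
import scala.collection.mutable.ArrayBuffer

// Hypothetical record type for stream A.
final case class ARecord(id: String, value: Long)

// Hypothetical input type for stream B: either a normal record or the
// special event broadcast once the marker in stream A has been seen.
sealed trait BEvent
final case class BRecord(id: String, payload: String) extends BEvent
case object StartProcessing extends BEvent

// Stand-in for the co-flatMap operator (not the Flink API).
class GatedCoFlatMap {
  private var started = false                          // toggled by the special event
  private var sum = 0L                                 // stand-in for the state built from A
  private val pending = ArrayBuffer.empty[BRecord]     // B tuples held back so far

  // Stream A: keep building state, regardless of the toggle.
  def flatMap1(a: ARecord, out: String => Unit): Unit =
    sum += a.value

  // Stream B: hold tuples back until the special event arrives.
  def flatMap2(event: BEvent, out: String => Unit): Unit = event match {
    case StartProcessing =>
      started = true
      pending.foreach(b => out(process(b)))            // drain what was held back
      pending.clear()
    case b: BRecord =>
      if (started) out(process(b)) else pending += b
  }

  // Placeholder for whatever flatMap2 actually does with B and the state.
  private def process(b: BRecord): String = s"${b.id}:${b.payload}:$sum"
}
```

With this shape, the rate of stream A never has to be measured. The only
signal flatMap2 reacts to is the special event.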

Cheers,
Gordon
On February 9, 2017 at 8:09:33 PM, Jonas (jo...@huntun.de) wrote:

Hi! I have a job that uses a RichCoFlatMapFunction of two streams: A and B.
A
    .connect(B)
    .keyBy(_.id, _.id)
    .flatMap(new MyOp)
In MyOp, the A stream tuples are combined to form a state using a
ValueStateDescriptor. Stream A is usually started from the beginning of a Kafka
topic. Stream A has a rate of 100k tuples/s. After processing the whole Kafka
queue, the rate drops to 10 tuples/s. A big drop.

What I now want is that while tuples from A are being processed in flatMap1,
the stream B in flatMap2 should wait until the rate of the A stream has
dropped, and only then should flatMap2 be called. Ideally, this behaviour would
be captured in a separate operator, like RateBasedStreamValve or something like
that :)

To solve this, my idea is to add a counter/timer in the RichCoFlatMapFunction
that counts how many tuples have been processed from A. If the rate drops below
a threshold (here maybe 15 tuples/s), flatMap2, which processes tuples from B,
empties the buffer. However, this would make my RichCoFlatMapFunction much
bigger and would not allow for operator reuse in other scenarios.

I'm of course happy to answer if something is unclear.

-- Jonas