Hi Jonas,

A few things to clarify first:
> Stream A has a rate of 100k tuples/s. After processing the whole Kafka queue, the rate drops to 10 tuples/s.

From this description, it seems the job re-reads the topic from the beginning, and once it reaches the latest record at the head of the queue, you get the normal input rate again. Is that correct?

> What I now want is that while tuples from A are being processed in flatMap1, the stream B in flatMap2 should wait until the rate of the A stream has dropped, and only then should flatMap2 be called.

So what you are looking for is that flatMap2 for stream B only starts doing work after the job reaches the latest record in stream A?

If that’s the case, I would not rely on detecting a drop below a threshold rate. It isn’t reliable, because it depends on stream A’s actual input rate, which, as with any stream, naturally changes over time.

I’m not sure if it’s the best solution, but this is what I have in mind: you could insert a special marker event into stream A every time you start running this job. Your job can have an operator before your co-flatMap operator that expects this special marker and, when it receives it (which is when the head of stream A is reached), broadcasts a special event to the co-flatMap, to be processed in flatMap2. Then, once flatMap2 is invoked with the special event, you can toggle the logic in flatMap2 to actually start processing.

Cheers,
Gordon

On February 9, 2017 at 8:09:33 PM, Jonas (jo...@huntun.de) wrote:

Hi!

I have a job that uses a RichCoFlatMapFunction of two streams: A and B.

    A
      .connect(B)
      .keyBy(_.id, _.id)
      .flatMap(new MyOp)

In MyOp, the A stream tuples are combined to form a state using a ValueStateDescriptor. Stream A is usually started from the beginning of a Kafka topic. Stream A has a rate of 100k tuples/s. After processing the whole Kafka queue, the rate drops to 10 tuples/s. A big drop.
What I now want is that while tuples from A are being processed in flatMap1, stream B in flatMap2 should wait until the rate of the A stream has dropped, and only then should flatMap2 be called. Ideally, this behaviour would be captured in a separate operator, like RateBasedStreamValve or something like that :)

To solve this, my idea is to add a counter/timer in the RichCoFlatMapFunction that counts how many tuples have been processed from A. If the rate drops below a threshold (here maybe 15 tuples/s), flatMap2, which processes tuples from B, empties the buffer. However, this would make my RichCoFlatMapFunction much bigger and would not allow for operator reuse in other scenarios.

I'm of course happy to answer if something is unclear.

-- Jonas
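
For reference, the marker-event idea from Gordon's reply can be sketched roughly as follows. This is a plain-Scala illustration of the gating logic only, with no Flink dependency. The names ARecord, BRecord, StartProcessing and MarkerGatedOp are hypothetical and do not come from the thread or from Flink, and summing values per id is just a stand-in for whatever state MyOp builds from stream A. Buffering B tuples until the special event arrives follows Jonas's "empties the buffer" idea.

```scala
import scala.collection.mutable

// Hypothetical record type for stream A (assumption, not from the thread).
final case class ARecord(id: String, value: Long)

// Hypothetical events on the B side: normal records plus the special event
// that an upstream operator would broadcast once stream A's marker is seen.
sealed trait BEvent
final case class BRecord(id: String, payload: String) extends BEvent
case object StartProcessing extends BEvent

// Plain-Scala stand-in for the co-flatMap logic; it does not use Flink's API.
final class MarkerGatedOp {
  private val stateById = mutable.Map.empty[String, Long] // state built from A
  private val buffered = mutable.Queue.empty[BRecord]     // B tuples held back
  private var gateOpen = false

  // Stream A only builds state (a running sum per id, purely for illustration).
  def flatMap1(a: ARecord): Unit = {
    stateById(a.id) = stateById.getOrElse(a.id, 0L) + a.value
  }

  // Stream B is buffered until the special event toggles the gate.
  def flatMap2(event: BEvent, out: String => Unit): Unit = event match {
    case StartProcessing =>
      gateOpen = true
      while (buffered.nonEmpty) emit(buffered.dequeue(), out)
    case b: BRecord if gateOpen => emit(b, out)
    case b: BRecord             => buffered.enqueue(b)
  }

  private def emit(b: BRecord, out: String => Unit): Unit =
    out(s"${b.id}:${b.payload}:${stateById.getOrElse(b.id, 0L)}")
}

object MarkerGatedOpDemo {
  def main(args: Array[String]): Unit = {
    val op = new MarkerGatedOp
    val sink: String => Unit = s => println(s)

    op.flatMap2(BRecord("k1", "early"), sink) // buffered, nothing printed yet
    op.flatMap1(ARecord("k1", 5L))
    op.flatMap1(ARecord("k1", 7L))
    op.flatMap2(StartProcessing, sink)        // prints "k1:early:12"
    op.flatMap2(BRecord("k1", "late"), sink)  // prints "k1:late:12"
  }
}
```

In an actual Flink job, the gate flag and the buffer would presumably need to live in managed state rather than in plain fields, and the special event would have to reach every parallel instance of the co-flatMap, so treat this only as an outline of the control flow.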