Hi Till,

Thank you for the very helpful hints. You are right, I already see 
backpressure. In my case, that’s OK because it throttles the Kafka source. 
Speaking of which: you mentioned putting the rate limiting mechanism into the 
source. How can I do this with a Kafka source? Should I just extend the Kafka 
consumer, or is there a better mechanism to hook into the connector?

Cheers,
Florian


> On 20.01.2017 at 16:58, Till Rohrmann <trohrm...@apache.org> wrote:
> 
> Hi Florian,
> 
> Blocking the user code thread is generally not a good idea, because 
> checkpointing happens under the very same lock that guards the user code 
> invocation. A checkpoint barrier arriving at the operator can therefore only 
> trigger the checkpoint once the blocking is over. Even worse, if the blocking 
> happens in a downstream operator (not a source), it can cause backpressure. 
> Since the checkpoint barriers flow with the events and are processed in 
> order, that backpressure then also lengthens the checkpointing time.
> 
> So if you want to limit the rate, you should do it at the sources without 
> blocking the source thread. You could, for example, count how many elements 
> you've emitted in the past second and, if that exceeds your maximum, not emit 
> the next element to downstream operators until enough time has passed. This 
> might end up as a busy loop, but it allows the checkpointing to claim the 
> lock.
> 
> Cheers,
> Till
> 
> On Fri, Jan 20, 2017 at 12:18 PM, Yassine MARZOUGUI 
> <y.marzou...@mindlytix.com> wrote:
> Hi,
> 
> You might find this similar thread from the mailing list archive helpful: 
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/throttled-stream-td6138.html.
> 
> Best,
> Yassine
> 
> On 2017-01-20 10:53 GMT+01:00, Florian König <florian.koe...@micardo.com> wrote:
> Hi,
> 
> I need to limit the rate of processing in a Flink streaming application. 
> Specifically, the number of items processed in a .map() operation has to stay 
> under a certain maximum per second.
> 
> At the moment, I have another .map() operation before the actual processing, 
> which just sleeps for a certain time (e.g., 250 ms for a limit of 4 requests/s) 
> and returns the item unchanged:
> 
> …
> 
> public T map(final T value) throws Exception {
>         // delay = 1000 ms / max items per second, e.g. 250 ms for 4 items/s
>         Thread.sleep(delay);
>         return value;
> }
> 
> …
> 
> This works as expected, but it is a rather crude approach. Checkpointing the 
> job takes a very long time: minutes for a state of a few kB, whereas other 
> jobs finish checkpointing in a few milliseconds. I assume that letting the 
> whole thread sleep most of the time interferes with checkpointing. Not good!
> 
> Would using a different synchronization mechanism (e.g., 
> https://google.github.io/guava/releases/19.0/api/docs/index.html?com/google/common/util/concurrent/RateLimiter.html)
> help to make checkpointing work better?
> 
> Or, preferably, is there a mechanism inside Flink that I can use to 
> accomplish the desired rate limiting? I haven’t found anything in the docs.
> 
> Cheers,
> Florian
> 
> 
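Till's suggestion in the quoted reply above can be sketched in plain Java without a Flink dependency. The idea is to count the elements emitted in the past second and hold back the next one, without waiting while the lock is held.

This is a minimal sketch under stated assumptions, not Flink or Kafka connector API:
- LOCK stands in for the checkpoint lock that a Flink SourceFunction obtains from SourceContext#getCheckpointLock().
- Adding to the emitted list stands in for SourceContext#collect.
- The checkpointer thread only imitates a checkpoint taking that lock.
- All class, field and method names are hypothetical.

How to wire this into the Kafka source is not shown.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch: source-side rate limiting that never waits while holding the lock.
// LOCK, emitted and the checkpointer thread stand in for a Flink source's checkpoint
// lock, SourceContext#collect and a checkpoint, respectively.
public class RateLimitedSourceSketch {

    /** Counts emissions in the past second (sliding window log); never blocks. */
    public static final class SlidingWindowLimiter {
        private static final long WINDOW_NANOS = 1_000_000_000L;
        private final int maxPerSecond;
        private final ArrayDeque<Long> emitTimes = new ArrayDeque<>();

        public SlidingWindowLimiter(int maxPerSecond) {
            this.maxPerSecond = maxPerSecond;
        }

        /** Returns true and records the emission if the limit allows one now. */
        public boolean tryAcquire(long nowNanos) {
            // Forget emissions that are at least one second old.
            while (!emitTimes.isEmpty() && nowNanos - emitTimes.peekFirst() >= WINDOW_NANOS) {
                emitTimes.pollFirst();
            }
            if (emitTimes.size() < maxPerSecond) {
                emitTimes.addLast(nowNanos);
                return true;
            }
            return false;
        }
    }

    static final Object LOCK = new Object();               // stand-in for the checkpoint lock
    static final List<Integer> emitted = new ArrayList<>(); // stand-in for downstream output
    static volatile long maxCheckpointWaitNanos = 0L;

    /** Emits 0..elements-1, waiting outside the lock and emitting inside it. */
    static void runSource(int elements, int maxPerSecond) {
        SlidingWindowLimiter limiter = new SlidingWindowLimiter(maxPerSecond);
        for (int i = 0; i < elements; i++) {
            // Busy loop while over the limit. The lock is free here, so a
            // checkpoint can acquire it between two emissions.
            while (!limiter.tryAcquire(System.nanoTime())) {
                Thread.yield();
            }
            synchronized (LOCK) {
                emitted.add(i); // stand-in for ctx.collect(i)
            }
        }
    }

    public static void main(String[] args) throws InterruptedException {
        // Simulated checkpoints: take the lock every 10 ms and record how long that took.
        Thread checkpointer = new Thread(() -> {
            while (!Thread.currentThread().isInterrupted()) {
                long start = System.nanoTime();
                synchronized (LOCK) {
                    long waited = System.nanoTime() - start;
                    maxCheckpointWaitNanos = Math.max(maxCheckpointWaitNanos, waited);
                }
                try {
                    Thread.sleep(10);
                } catch (InterruptedException e) {
                    return;
                }
            }
        });
        checkpointer.start();
        runSource(10, 4); // 10 elements at 4 per second: takes roughly 2 seconds
        checkpointer.interrupt();
        checkpointer.join();
        System.out.println("emitted " + emitted.size() + " elements");
        System.out.println("longest lock wait: " + maxCheckpointWaitNanos / 1_000_000 + " ms");
    }
}
```

All waiting happens in the busy loop, outside synchronized (LOCK), so the simulated checkpoint can take the lock between emissions. If a Thread.sleep(250) were placed inside the synchronized block instead, every checkpoint arriving during the sleep would have to wait for it to finish. That is the effect Till describes for the sleeping .map(). The sliding-window log keeps at most maxPerSecond timestamps. The price, as Till notes, is a busy loop that burns CPU while the source is throttled.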

