Hi Till,

thank you for the very helpful hints. You are right, I already see backpressure. In my case, that's OK, because it throttles the Kafka source. Speaking of which: you mentioned putting the rate-limiting mechanism into the source. How can I do this with a Kafka source? Just extend the consumer, or is there a better mechanism to hook into the connector?
Cheers,
Florian

> On 20.01.2017 at 16:58, Till Rohrmann <trohrm...@apache.org> wrote:
>
> Hi Florian,
>
> blocking the user code thread is generally a bad idea, because
> checkpointing happens under the very same lock that also guards the user
> code invocation. Thus, a checkpoint barrier arriving at the operator can
> only trigger the checkpoint once the blocking is over. Even worse, if the
> blocking happens in a downstream operator (not a source), it can cause
> backpressure. Since the checkpoint barriers flow with the events and are
> processed in order, the backpressure will then also increase the
> checkpointing time.
>
> So if you want to limit the rate, you should do it at the sources, without
> blocking the source thread. For example, you could count how many elements
> you've emitted in the past second and, once that count reaches your maximum,
> not emit the next element to downstream operators until some time has passed
> (this might end up in a busy loop, but it allows the checkpointing to claim
> the lock).
>
> Cheers,
> Till
>
> On Fri, Jan 20, 2017 at 12:18 PM, Yassine MARZOUGUI
> <y.marzou...@mindlytix.com> wrote:
>
> Hi,
>
> You might find this similar thread from the mailing list archive helpful:
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/throttled-stream-td6138.html
>
> Best,
> Yassine
>
> 2017-01-20 10:53 GMT+01:00 Florian König <florian.koe...@micardo.com>:
>
> Hi,
>
> I need to limit the rate of processing in a Flink stream application.
> Specifically, the number of items processed in a .map() operation has to
> stay under a certain maximum per second.
>
> At the moment, I have another .map() operation before the actual
> processing, which just sleeps for a certain time (e.g., 250 ms for a limit
> of 4 requests per second) and returns the item unchanged:
>
> …
>
>     public T map(final T value) throws Exception {
>         Thread.sleep(delay);
>         return value;
>     }
>
> …
>
> This works as expected, but it is a rather crude approach. Checkpointing
> the job takes a very long time: minutes for a state of a few kB, whereas
> for other jobs it is done in a few milliseconds. I assume that letting the
> whole thread sleep for most of the time interferes with checkpointing. Not
> good!
>
> Would using a different synchronization mechanism (e.g.,
> https://google.github.io/guava/releases/19.0/api/docs/index.html?com/google/common/util/concurrent/RateLimiter.html)
> help checkpointing work better?
>
> Or, preferably, is there a mechanism inside Flink that I can use to
> accomplish the desired rate limiting? I haven't found anything in the docs.
>
> Cheers,
> Florian
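Till's suggestion (count the elements emitted in the past second and, once the maximum is reached, hold back the next element without blocking while holding the checkpoint lock) might look roughly like the plain-Java sketch below. It is not Flink code: `checkpointLock` only stands in for the lock a Flink source gets from `SourceContext#getCheckpointLock()`, the `run`/`cancel` pair loosely mirrors the `SourceFunction` contract, and the names `CountingRateLimitedSource`, `WindowCounter`, `tryAcquireAt` and `emitted` are hypothetical. The limiter assumes a fixed one-second window.

```java
import java.util.ArrayList;
import java.util.List;

class CountingRateLimitedSource {

    /** Fixed-window counter: at most maxPerWindow permits per windowMillis (hypothetical helper). */
    static final class WindowCounter {
        private final int maxPerWindow;
        private final long windowMillis;
        private long windowStart;
        private int count;

        WindowCounter(int maxPerWindow, long windowMillis, long nowMillis) {
            this.maxPerWindow = maxPerWindow;
            this.windowMillis = windowMillis;
            this.windowStart = nowMillis;
        }

        /** Counts and allows one element if the current window still has room. */
        boolean tryAcquireAt(long nowMillis) {
            if (nowMillis - windowStart >= windowMillis) {
                windowStart = nowMillis;   // previous window is over: start a new one
                count = 0;
            }
            if (count < maxPerWindow) {
                count++;
                return true;
            }
            return false;                  // limit reached: do not emit yet
        }
    }

    // Stand-in for Flink's checkpoint lock (assumption: not the real SourceContext).
    final Object checkpointLock = new Object();
    final List<Integer> emitted = new ArrayList<>();
    private volatile boolean running = true;

    /** Emits 0..n-1, at most maxPerSecond elements per one-second window. */
    void run(int n, int maxPerSecond) {
        WindowCounter limiter = new WindowCounter(maxPerSecond, 1000, System.currentTimeMillis());
        int next = 0;
        while (running && next < n) {
            if (limiter.tryAcquireAt(System.currentTimeMillis())) {
                // Emit under the lock, as Flink sources do, so a checkpoint
                // cannot interleave with an emission.
                synchronized (checkpointLock) {
                    emitted.add(next++);
                }
            } else {
                // Over the limit: back off briefly WITHOUT holding the lock,
                // so a checkpoint can acquire it in the meantime.
                try {
                    Thread.sleep(1);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    return;                // treat interruption like cancel()
                }
            }
        }
    }

    void cancel() {
        running = false;
    }
}
```

A fixed window allows a burst of up to `maxPerSecond` elements at the start of each window. The 1 ms sleep outside the lock is an assumption made here to avoid spinning the CPU; Till's busy-loop variant would simply retry immediately. Either way, the lock is never held while waiting.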
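Florian's sleeping map() spaces items 1000 ms / 4 = 250 ms apart. Guava's `RateLimiter`, which he asks about, offers both a blocking `acquire()` and a non-blocking `tryAcquire()`; only a non-blocking check fits Till's advice. The class below is a hypothetical, much simpler stand-in (it does not reproduce Guava's semantics such as stored permits or warm-up): it keeps the same 250 ms spacing but only reports whether an element may be emitted now, so a source could skip emitting and retry instead of sleeping. The name `IntervalLimiter` and its methods are assumptions for illustration.

```java
/**
 * Hypothetical non-blocking interval limiter: permits are spaced
 * 1000 / permitsPerSecond ms apart, but tryAcquireAt() never sleeps.
 */
final class IntervalLimiter {
    private final long intervalMillis;
    private long nextFreeMillis;   // earliest time the next permit is available

    IntervalLimiter(double permitsPerSecond, long nowMillis) {
        if (permitsPerSecond <= 0) {
            throw new IllegalArgumentException("rate must be positive");
        }
        this.intervalMillis = Math.round(1000.0 / permitsPerSecond);   // 4/s -> 250 ms
        this.nextFreeMillis = nowMillis;
    }

    /** Returns true (and reserves the slot) if an element may be emitted at nowMillis. */
    boolean tryAcquireAt(long nowMillis) {
        if (nowMillis < nextFreeMillis) {
            return false;          // too early: caller skips emitting and retries later
        }
        nextFreeMillis = nowMillis + intervalMillis;
        return true;
    }

    long intervalMillis() {
        return intervalMillis;
    }
}
```

The current time is passed in explicitly so the behaviour can be checked deterministically; inside a source it would typically be read from the system clock on each loop iteration.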