Hi Florian,

you can rate-limit the Kafka consumer by implementing a custom DeserializationSchema that sleeps briefly from time to time (or on every deserialization call).
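A minimal sketch of the idea, kept free of Flink dependencies so it runs on its own. The class name ThrottlingDeserializer and the parameters recordsPerPause and pauseMillis are made up for illustration; in a real job the same logic would go into the deserialize(byte[]) method of your custom DeserializationSchema, which also has to be serializable.

```java
import java.nio.charset.StandardCharsets;
import java.util.function.Function;

/**
 * Hypothetical helper (not part of Flink): wraps a byte[] -> T decoder and
 * sleeps for pauseMillis after every recordsPerPause decoded records.
 */
class ThrottlingDeserializer<T> {
    private final Function<byte[], T> decoder;
    private final int recordsPerPause;
    private final long pauseMillis;
    private int sinceLastPause = 0;

    ThrottlingDeserializer(Function<byte[], T> decoder, int recordsPerPause, long pauseMillis) {
        if (recordsPerPause <= 0 || pauseMillis < 0) {
            throw new IllegalArgumentException("recordsPerPause must be > 0 and pauseMillis >= 0");
        }
        this.decoder = decoder;
        this.recordsPerPause = recordsPerPause;
        this.pauseMillis = pauseMillis;
    }

    /** Decodes one message; pauses once the per-pause record count is reached. */
    T deserialize(byte[] message) {
        T value = decoder.apply(message);
        if (++sinceLastPause >= recordsPerPause) {
            sinceLastPause = 0;
            try {
                Thread.sleep(pauseMillis);
            } catch (InterruptedException e) {
                // Restore the interrupt flag so the caller can shut down cleanly.
                Thread.currentThread().interrupt();
            }
        }
        return value;
    }

    /** Example decoder: interprets the message bytes as UTF-8 text. */
    static ThrottlingDeserializer<String> forStrings(int recordsPerPause, long pauseMillis) {
        return new ThrottlingDeserializer<>(
                bytes -> new String(bytes, StandardCharsets.UTF_8), recordsPerPause, pauseMillis);
    }
}
```

For example, ThrottlingDeserializer.forStrings(1, 250) pauses 250 ms after every record, which caps the rate at roughly 4 records per second per consumer subtask. Keep in mind that the sleep still blocks the thread that calls deserialize, so short pauses are preferable to long ones.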
On Tue, Jan 24, 2017 at 1:16 PM, Florian König <florian.koe...@micardo.com> wrote:

> Hi Till,
>
> thank you for the very helpful hints. You are right, I already see backpressure. In my case, that’s ok because it throttles the Kafka source. Speaking of which: You mentioned putting the rate limiting mechanism into the source. How can I do this with a Kafka source? Just extend the Producer, or is there a better mechanism to hook into the connector?
>
> Cheers,
> Florian
>
> > On 20.01.2017 at 16:58, Till Rohrmann <trohrm...@apache.org> wrote:
> >
> > Hi Florian,
> >
> > any blocking of the user code thread is generally not a good idea because the checkpointing happens under the very same lock which also guards the user code invocation. Thus any checkpoint barrier arriving at the operator can only trigger the checkpointing once the blocking is over. Even worse, if the blocking happens in a downstream operator (not a source), then this blocking could cause backpressure. Since the checkpoint barriers flow with the events and are processed in order, the backpressure will then also influence the checkpointing time.
> >
> > So if you want to limit the rate, you should do it at the sources without blocking the source thread. You could for example count how many elements you've emitted in the past second and if it exceeds your maximum, then you don't emit the next element to downstream operators until some time has passed (this might end up in a busy loop but it allows the checkpointing to claim the lock).
> >
> > Cheers,
> > Till
> >
> > On Fri, Jan 20, 2017 at 12:18 PM, Yassine MARZOUGUI <y.marzou...@mindlytix.com> wrote:
> > Hi,
> >
> > You might find this similar thread from the mailing list archive helpful: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/throttled-stream-td6138.html.
> >
> > Best,
> > Yassine
> >
> > 2017-01-20 10:53 GMT+01:00 Florian König <florian.koe...@micardo.com>:
> > Hi,
> >
> > I need to limit the rate of processing in a Flink stream application. Specifically, the number of items processed in a .map() operation has to stay under a certain maximum per second.
> >
> > At the moment, I have another .map() operation before the actual processing, which just sleeps for a certain time (e.g., 250ms for a limit of 4 requests / sec) and returns the item unchanged:
> >
> > …
> >
> > public T map(final T value) throws Exception {
> >     Thread.sleep(delay);
> >     return value;
> > }
> >
> > …
> >
> > This works as expected, but is a rather crude approach. Checkpointing the job takes a very long time: minutes for a state of a few kB, which for other jobs is done in a few milliseconds. I assume that letting the whole thread sleep for most of the time interferes with the checkpointing - not good!
> >
> > Would using a different synchronization mechanism (e.g., https://google.github.io/guava/releases/19.0/api/docs/index.html?com/google/common/util/concurrent/RateLimiter.html) help to make checkpointing work better?
> >
> > Or, preferably, is there a mechanism inside Flink that I can use to accomplish the desired rate limiting? I haven’t found anything in the docs.
> >
> > Cheers,
> > Florian
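For reference, Till's suggestion quoted above (count the elements emitted in the past second and hold back once the maximum is reached, without blocking) could look roughly like the following. This is only a sketch under my own assumptions: the class EmissionBudget and the method tryEmit are hypothetical names, it uses a simple fixed one-second window, and it is not tied to any Flink API.

```java
/**
 * Hypothetical fixed-window counter (not part of Flink): allows at most
 * maxPerSecond emissions per one-second window and never blocks.
 */
class EmissionBudget {
    private final int maxPerSecond;
    private long windowStartMillis;
    private int emittedInWindow = 0;

    EmissionBudget(int maxPerSecond, long nowMillis) {
        if (maxPerSecond <= 0) {
            throw new IllegalArgumentException("maxPerSecond must be > 0");
        }
        this.maxPerSecond = maxPerSecond;
        this.windowStartMillis = nowMillis;
    }

    /**
     * Returns true and counts the emission if one more element may be
     * emitted at nowMillis; returns false if the caller should retry later.
     */
    boolean tryEmit(long nowMillis) {
        if (nowMillis - windowStartMillis >= 1000L) {
            // A full second has passed: start a new window.
            windowStartMillis = nowMillis;
            emittedInWindow = 0;
        }
        if (emittedInWindow < maxPerSecond) {
            emittedInWindow++;
            return true;
        }
        return false;
    }
}
```

A custom source would call tryEmit(System.currentTimeMillis()) before emitting each element and, when it returns false, skip the emission and check again shortly afterwards without holding the checkpoint lock in the meantime. As Till notes, that retry can turn into a busy loop, but it leaves the lock free for checkpointing.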