Hi Florian,

You can rate-limit the Kafka consumer by implementing a custom
DeserializationSchema that sleeps briefly from time to time (or at each
deserialization step).
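
A minimal sketch of that idea in plain Java, without the Flink dependency
so that it runs on its own. ThrottledDeserializer and its parameters are
hypothetical names, not part of Flink; in a real job the same pacing logic
would presumably sit inside the deserialize(byte[]) method of your custom
DeserializationSchema.

```java
import java.util.function.Function;

/** Hypothetical helper: wraps a deserializer and sleeps to cap calls per second. */
final class ThrottledDeserializer<T> {
    private final Function<byte[], T> inner; // the actual deserialization logic
    private final long minIntervalNanos;     // minimum spacing between two calls
    private long nextAllowedNanos;           // earliest time the next call may proceed

    ThrottledDeserializer(Function<byte[], T> inner, double maxPerSecond) {
        if (maxPerSecond <= 0) {
            throw new IllegalArgumentException("maxPerSecond must be positive");
        }
        this.inner = inner;
        this.minIntervalNanos = (long) (1_000_000_000L / maxPerSecond);
        this.nextAllowedNanos = System.nanoTime();
    }

    /** Sleeps until the next slot is free, then deserializes the message. */
    T deserialize(byte[] message) {
        long now = System.nanoTime();
        long waitNanos = nextAllowedNanos - now;
        if (waitNanos > 0) {
            try {
                Thread.sleep(waitNanos / 1_000_000L, (int) (waitNanos % 1_000_000L));
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // preserve the interrupt flag
                throw new IllegalStateException("interrupted while throttling", e);
            }
        }
        // Schedule the next slot relative to the later of "now" and the old slot.
        nextAllowedNanos = Math.max(now, nextAllowedNanos) + minIntervalNanos;
        return inner.apply(message);
    }
}
```

With maxPerSecond = 4 this spaces calls about 250 ms apart, which matches
the limit from your original mail.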

On Tue, Jan 24, 2017 at 1:16 PM, Florian König <florian.koe...@micardo.com>
wrote:

> Hi Till,
>
> thank you for the very helpful hints. You are right, I already see
> backpressure. In my case, that’s ok because it throttles the Kafka source.
> Speaking of which: You mentioned putting the rate limiting mechanism into
> the source. How can I do this with a Kafka source? Just extend the
> Producer, or is there a better mechanism to hook into the connector?
>
> Cheers,
> Florian
>
>
> > On 20.01.2017 at 16:58, Till Rohrmann <trohrm...@apache.org> wrote:
> >
> > Hi Florian,
> >
> > any blocking of the user code thread is generally not a good idea,
> > because checkpointing happens under the very same lock that guards the
> > user code invocation. Thus any checkpoint barrier arriving at the operator
> > can only trigger the checkpoint once the blocking is over. Even worse, if
> > the blocking happens in a downstream operator (not a source), it can
> > cause backpressure. Since the checkpoint barriers flow with the events and
> > are processed in order, the backpressure then also increases the
> > checkpointing time.
> >
> > So if you want to limit the rate, you should do it at the sources without
> > blocking the source thread. You could, for example, count how many
> > elements you've emitted in the past second and, if the count exceeds your
> > maximum, not emit the next element to downstream operators until some
> > time has passed (this might end up in a busy loop, but it allows the
> > checkpointing to claim the lock).
> >
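
The counting scheme described above could look roughly like the sketch
below. EmissionBudget and tryAcquire are hypothetical names, not Flink API,
and the one-second fixed window is an assumption about how "the past
second" is measured. The check never blocks; the caller decides what to do
when it returns false.

```java
/** Hypothetical helper: allows at most maxPerSecond emissions per one-second window. */
final class EmissionBudget {
    private final int maxPerSecond;
    private long windowStartMillis; // start of the current one-second window
    private int emittedInWindow;    // elements emitted since windowStartMillis

    EmissionBudget(int maxPerSecond, long nowMillis) {
        this.maxPerSecond = maxPerSecond;
        this.windowStartMillis = nowMillis;
        this.emittedInWindow = 0;
    }

    /** Returns true if one more element may be emitted at nowMillis; never blocks. */
    boolean tryAcquire(long nowMillis) {
        if (nowMillis - windowStartMillis >= 1000L) {
            // A full second has passed: start a fresh window.
            windowStartMillis = nowMillis;
            emittedInWindow = 0;
        }
        if (emittedInWindow < maxPerSecond) {
            emittedInWindow++;
            return true;
        }
        return false; // budget used up, caller should retry later
    }
}
```

A source loop would call tryAcquire(System.currentTimeMillis()) before each
emit and simply retry when it returns false, so the checkpoint lock is only
held for the emit itself.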
> > Cheers,
> > Till
> >
> > On Fri, Jan 20, 2017 at 12:18 PM, Yassine MARZOUGUI <y.marzou...@mindlytix.com> wrote:
> > Hi,
> >
> > You might find this similar thread from the mailing list archive helpful:
> > http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/throttled-stream-td6138.html
> >
> > Best,
> > Yassine
> >
> > 2017-01-20 10:53 GMT+01:00 Florian König <florian.koe...@micardo.com>:
> > Hi,
> >
> > I need to limit the rate of processing in a Flink stream application.
> > Specifically, the number of items processed in a .map() operation has to
> > stay under a certain maximum per second.
> >
> > At the moment, I have another .map() operation before the actual
> > processing, which just sleeps for a certain time (e.g., 250 ms for a limit
> > of 4 requests/sec) and returns the item unchanged:
> >
> > …
> >
> > public T map(final T value) throws Exception {
> >         Thread.sleep(delay);
> >         return value;
> > }
> >
> > …
> >
> > This works as expected, but is a rather crude approach. Checkpointing the
> > job takes a very long time: minutes for a state of a few kB, which for
> > other jobs is done in a few milliseconds. I assume that letting the whole
> > thread sleep for most of the time interferes with the checkpointing - not
> > good!
> >
> > Would using a different synchronization mechanism (e.g.,
> > https://google.github.io/guava/releases/19.0/api/docs/index.html?com/google/common/util/concurrent/RateLimiter.html)
> > help to make checkpointing work better?
> >
> > Or, preferably, is there a mechanism inside Flink that I can use to
> > accomplish the desired rate limiting? I haven’t found anything in the docs.
> >
> > Cheers,
> > Florian
> >
> >
>
>
>
