no problem and I'm glad that it's solving your problem :)

Piotrek

śr., 5 maj 2021 o 08:58 Wenhao Ji <predator....@gmail.com> napisał(a):

> Thanks Piotr for your reply!
> It is a nice solution! By restricting the buffer using these properties, I
> think maxConcurrentRequests attribute is indeed not necessary anymore.
>
> On Tue, May 4, 2021 at 11:52 PM Piotr Nowojski <piotr.nowoj...@gmail.com>
> wrote:
>
> > Hi Wenhao,
> >
> > As far as I know this is different compared to FLINK-9083, as
> KafkaProducer
> > itself can back pressure writes if internal buffers are exhausted [1].
> >
> > > The buffer.memory controls the total amount of memory available to the
> > producer for buffering. If records are sent faster than they can be
> > transmitted to the server then this buffer space will be exhausted. When
> > the buffer space is exhausted additional send calls will block. The
> > threshold for time to block is determined by max.block.ms after which it
> > throws a TimeoutException.
> >
> > If you want to limit the amount of concurrent requests you can do it via
> > reducing the `buffer.memory` option passed to KafkaProducer (via
> > FlinkKafkaProducer's "Properties producerConfig").
> >
> > Piotrek
> >
> > [1]
> >
> >
> https://kafka.apache.org/24/javadoc/org/apache/kafka/clients/producer/KafkaProducer.html
> >
> > pt., 23 kwi 2021 o 11:15 Wenhao Ji <predator....@gmail.com> napisał(a):
> >
> > > Hi everyone,
> > >
> > > I recently came across the following exception when dealing with a job
> > > failure, which uses the Flink as its sink.
> > >
> > > ```
> > > org.apache.flink.streaming.connectors.kafka.FlinkKafkaException: Failed
> > to
> > > send data to Kafka: Expiring #N record(s) for TOPIC-PARTITION:#N ms has
> > > passed since batch creation
> > > ```
> > >
> > > After I dug into the source code of FlinkKafkaProducer, I found out
> that
> > > FlinkKafkaProducer does not have any kind of backpressure mechanism if
> I
> > am
> > > correct. Incoming records are simply sent using KafkaProducer#send
> > without
> > > synchronization (at FlinkKafkaProducer.java#L915
> > > <
> > >
> >
> https://github.com/apache/flink/blob/74078914a153a8cc1d6af2c5069e6042432ad13d/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer.java#L915
> > > >).
> > > If the parallelism of the producer is not correctly set according to
> its
> > > upstream throughput or write to the leader of a topic partition
> performs
> > > badly, the accumulator in KafkaProducer will be full of unsent records
> > and
> > > finally causes record expiration as the one above.
> > >
> > > I have seen there was a similar ticket FLINK-9083
> > > <https://issues.apache.org/jira/browse/FLINK-9083> before, which is
> for
> > > the Cassandra connector. Shall we have the same improvement for the
> Kafka
> > > connect? Maybe we can also have maxConcurrentRequests attribute in
> > > FlinkKafkaProducer and use a semaphore to limit requests?
> > >
> > > Thanks,
> > > Wenhao
> > >
> >
>

Reply via email to