No problem, and I'm glad that it's solving your problem :)

Piotrek
Wed, 5 May 2021 at 08:58 Wenhao Ji <predator....@gmail.com> wrote:

> Thanks Piotr for your reply!
> It is a nice solution! By restricting the buffer using these properties, I
> think the maxConcurrentRequests attribute is indeed not necessary anymore.
>
> On Tue, May 4, 2021 at 11:52 PM Piotr Nowojski <piotr.nowoj...@gmail.com>
> wrote:
>
> > Hi Wenhao,
> >
> > As far as I know this is different compared to FLINK-9083, as
> > KafkaProducer itself can back-pressure writes if its internal buffers are
> > exhausted [1]:
> >
> > > The buffer.memory controls the total amount of memory available to the
> > > producer for buffering. If records are sent faster than they can be
> > > transmitted to the server then this buffer space will be exhausted. When
> > > the buffer space is exhausted additional send calls will block. The
> > > threshold for time to block is determined by max.block.ms after which it
> > > throws a TimeoutException.
> >
> > If you want to limit the number of concurrent requests, you can do it by
> > reducing the `buffer.memory` option passed to the KafkaProducer (via
> > FlinkKafkaProducer's "Properties producerConfig").
> >
> > Piotrek
> >
> > [1]
> > https://kafka.apache.org/24/javadoc/org/apache/kafka/clients/producer/KafkaProducer.html
> >
> > Fri, 23 Apr 2021 at 11:15 Wenhao Ji <predator....@gmail.com> wrote:
> >
> > > Hi everyone,
> > >
> > > I recently came across the following exception while dealing with the
> > > failure of a job that uses Kafka as its sink.
> > >
> > > ```
> > > org.apache.flink.streaming.connectors.kafka.FlinkKafkaException: Failed
> > > to send data to Kafka: Expiring #N record(s) for TOPIC-PARTITION: #N ms
> > > has passed since batch creation
> > > ```
> > >
> > > After I dug into the source code of FlinkKafkaProducer, I found out that
> > > FlinkKafkaProducer does not have any kind of backpressure mechanism, if
> > > I am correct. Incoming records are simply sent using KafkaProducer#send
> > > without synchronization (at FlinkKafkaProducer.java#L915
> > > <https://github.com/apache/flink/blob/74078914a153a8cc1d6af2c5069e6042432ad13d/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer.java#L915>).
> > > If the parallelism of the producer is not set correctly for its upstream
> > > throughput, or writes to the leader of a topic partition perform badly,
> > > the accumulator in KafkaProducer will fill up with unsent records and
> > > finally cause record expiration like the exception above.
> > >
> > > I have seen a similar ticket, FLINK-9083
> > > <https://issues.apache.org/jira/browse/FLINK-9083>, filed before for
> > > the Cassandra connector. Shall we have the same improvement for the
> > > Kafka connector? Maybe we can also have a maxConcurrentRequests
> > > attribute in FlinkKafkaProducer and use a semaphore to limit requests?
> > >
> > > Thanks,
> > > Wenhao
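For reference, the `buffer.memory` suggestion quoted above amounts to shrinking the producer's accumulator so that KafkaProducer#send blocks the sink task instead of letting records pile up and expire. A minimal sketch of the Properties involved (the broker address, buffer size, and timeout are illustrative placeholders, not recommendations):

```java
import java.util.Properties;

public class BoundedBufferConfig {

    // Builds producer Properties that cap the producer's in-memory buffer.
    public static Properties producerConfig() {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "localhost:9092"); // placeholder
        // Cap the record accumulator; once it is full, send() blocks and
        // back-pressures the caller. 4 MiB here is purely illustrative.
        props.setProperty("buffer.memory", String.valueOf(4 * 1024 * 1024));
        // How long a blocked send() waits before throwing TimeoutException.
        props.setProperty("max.block.ms", "60000");
        return props;
    }

    public static void main(String[] args) {
        // These Properties would then be handed to FlinkKafkaProducer's
        // "Properties producerConfig" constructor argument.
        System.out.println(producerConfig().getProperty("buffer.memory"));
    }
}
```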
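And for completeness, the maxConcurrentRequests idea from the original mail could be sketched roughly as below. The class and method names are hypothetical (this is not Flink code), and the Supplier stands in for an asynchronous KafkaProducer#send call that completes when the broker acknowledges the record:

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Semaphore;
import java.util.function.Supplier;

// Hypothetical sketch of a maxConcurrentRequests limiter.
public class BoundedInFlight {

    private final Semaphore permits;

    public BoundedInFlight(int maxConcurrentRequests) {
        this.permits = new Semaphore(maxConcurrentRequests);
    }

    // Acquire a permit before sending; release it when the send is acked.
    public void send(Supplier<CompletableFuture<Void>> sender) throws InterruptedException {
        permits.acquire(); // blocks the caller once the limit is reached
        sender.get().whenComplete((ok, err) -> permits.release());
    }

    public int available() {
        return permits.availablePermits();
    }

    public static void main(String[] args) throws InterruptedException {
        BoundedInFlight limiter = new BoundedInFlight(2);
        limiter.send(() -> new CompletableFuture<>());               // never acknowledged
        limiter.send(() -> CompletableFuture.completedFuture(null)); // acked immediately
        System.out.println(limiter.available()); // prints 1: one request still in flight
    }
}
```

As the reply above notes, though, capping `buffer.memory` achieves a similar effect without adding a new attribute.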