Alexander Sibiryakov created KAFKA-10335:
--------------------------------------------

             Summary: Blocking of producer IO thread when calling send() from callback
                 Key: KAFKA-10335
                 URL: https://issues.apache.org/jira/browse/KAFKA-10335
             Project: Kafka
          Issue Type: Bug
          Components: clients, producer 
            Reporter: Alexander Sibiryakov


We had an application that was supposed to use KafkaProducer to deliver the 
results of some work. Sometimes delivery of the results wasn't successful 
because of network connectivity errors or maintenance happening on the broker 
side. In such cases we wanted the application to send an event with the error 
and the original message details. All good, but we wanted errors to be 
delivered to a separate topic. So we passed a callback to send(), and from that 
callback called send() again on the same producer instance.

This application worked for some time, but then we found that its producer was 
stuck: almost no CPU consumption, and batches expiring for hours. After 
attaching a debugger, it turned out that the sender IO thread was blocked. 
When a record expired, its callback was invoked, and the callback contained a 
call to send() for a new topic whose metadata was not present in the 
producer's metadata cache. When send() is missing metadata, it is allowed to 
block for up to max.block.ms, which is 60 seconds by default. Because the 
metadata update itself is carried out by the sender IO thread, a send() that 
waits on that thread can only end by timing out. If the application is active, 
a large number of records accumulates quickly, and every expired record blocks 
the IO thread for 60 s. The sender IO thread is therefore essentially blocked. 
In the producer, only the sender IO thread calls the client's poll() method, 
which is responsible for all network communication and metadata updates. If 
poll() is executed with a significant delay, the result is errors related to 
various timeouts. So we end up with a stuck producer that has little chance of 
recovering.
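
The stall can be illustrated without Kafka at all. The following is only a toy 
model using the JDK: a single-threaded executor stands in for the sender IO 
thread, a sleeping task stands in for a callback whose send() waits for 
metadata, and a final task stands in for poll(). The class and method names 
are made up for this sketch and are not part of the Kafka client.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Toy model only: none of these names come from the Kafka client.
class IoThreadStallDemo {

    /**
     * Queues `callbacks` blocking callbacks and then one poll() stand-in on a
     * single thread. Returns how many ms passed before the poll stand-in ran.
     */
    static long pollDelayMillis(int callbacks, long blockMillis) {
        ExecutorService ioThread = Executors.newSingleThreadExecutor();
        long[] pollStartedAt = new long[1];
        long start = System.nanoTime();
        try {
            for (int i = 0; i < callbacks; i++) {
                // Stand-in for a callback whose send() waits for metadata.
                ioThread.submit(() -> sleep(blockMillis));
            }
            // Stand-in for poll(): it cannot start until the callbacks finish.
            ioThread.submit(() -> { pollStartedAt[0] = System.nanoTime(); }).get();
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            ioThread.shutdown();
        }
        return TimeUnit.NANOSECONDS.toMillis(pollStartedAt[0] - start);
    }

    private static void sleep(long millis) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    public static void main(String[] args) {
        // 3 callbacks blocking 100 ms each stand in for 3 x max.block.ms.
        System.out.println("poll delayed by ~" + pollDelayMillis(3, 100) + " ms");
    }
}
```

The delay grows linearly with the number of queued callbacks. With the real 
default of 60000 ms per callback, even a handful of expired records keeps the 
poll() stand-in from running for minutes.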

To summarise, the prerequisites for the problem are sending from a callback, 
using the same producer instance, and using a topic that the producer hasn't 
seen before.

I think it is important to decide whether the issue is misuse of KafkaProducer 
or a bug in it. Code in callbacks shouldn't block, that is clear, but at the 
same time, no one expects an already initialised producer to block.
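
If this is treated as misuse, one way an application might keep its callback 
from blocking is to hand the error record to another thread and return 
immediately. Below is a minimal sketch of that idea using only the JDK. The 
class name, the "error-sender" thread name, and the Consumer<String> that 
stands in for the possibly blocking producer.send() call are all assumptions 
made for illustration.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;

// Hypothetical helper: not part of the Kafka client.
class CallbackHandOff {

    // A dedicated daemon thread that absorbs any blocking.
    private final ExecutorService errorSender = Executors.newSingleThreadExecutor(r -> {
        Thread t = new Thread(r, "error-sender");
        t.setDaemon(true);
        return t;
    });

    // Stand-in for a call that may block, e.g. send() to an unseen topic.
    private final Consumer<String> blockingSend;

    CallbackHandOff(Consumer<String> blockingSend) {
        this.blockingSend = blockingSend;
    }

    /** Meant to be called from the completion callback; only enqueues work. */
    Future<?> onFailure(String errorPayload) {
        return errorSender.submit(() -> blockingSend.accept(errorPayload));
    }

    void close() {
        errorSender.shutdown();
    }

    /** Demo helper: reports which thread ended up running the blocking send. */
    static String demoSendThread() {
        AtomicReference<String> ranOn = new AtomicReference<>();
        CallbackHandOff handOff =
                new CallbackHandOff(payload -> ranOn.set(Thread.currentThread().getName()));
        try {
            handOff.onFailure("delivery failed").get();
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            handOff.close();
        }
        return ranOn.get();
    }

    public static void main(String[] args) {
        System.out.println("blocking send ran on: " + demoSendThread());
    }
}
```

In this sketch the blocking call still waits, but it does so on the 
"error-sender" thread rather than on the thread that invoked the callback. The 
executor's queue is unbounded here, so a real application would probably want 
to bound it or drop error events under sustained failure.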

Depending on the decision, I could produce a fix. It could be either a warning 
when the user calls send() from a callback, or a reduction of the maximum 
allowed blocking time for a metadata update. It could be just a docs change, 
or even nothing.
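
Independently of any change in the client, an application can already shorten 
the wait on its side by lowering max.block.ms. The helper below is a 
hypothetical sketch (the class and method names are mine, and the 1000 ms in 
the usage example is an arbitrary value); max.block.ms itself is the existing 
producer setting shown in the config dump further down.

```java
import java.util.Properties;

// Hypothetical helper: not part of the Kafka client.
class ShortBlockConfig {

    /** Returns a copy of `base` with max.block.ms overridden. */
    static Properties withMaxBlockMs(Properties base, long maxBlockMs) {
        Properties copy = new Properties();
        copy.putAll(base);
        copy.setProperty("max.block.ms", Long.toString(maxBlockMs));
        return copy;
    }

    public static void main(String[] args) {
        Properties base = new Properties();
        base.setProperty("delivery.timeout.ms", "5000");
        Properties tuned = withMaxBlockMs(base, 1000);
        System.out.println(tuned.getProperty("max.block.ms"));
    }
}
```

This only shortens each stall, it does not remove it, and the setting applies 
to every send() call on the producer, not just to calls made from callbacks.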

Here is code to reproduce the issue; the output it produces follows the code 
snippet. Tested on Confluent Cloud, from my desktop with a 100 Mbps 
connection.
{code:java}
import java.io.FileReader;
import java.io.IOException;
import java.util.Properties;

import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

// The class name is illustrative; the original snippet showed only main().
public class SendFromCallbackRepro {
    public static void main(String[] args) throws IOException {
        byte[] blob = new byte[262144];
        Properties properties = new Properties();
        properties.load(new FileReader("kafka-staging.properties"));
        properties.setProperty("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        properties.setProperty("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
        // Short timeouts so that batches expire quickly and callbacks fire with an exception.
        properties.setProperty("request.timeout.ms", "5000");
        properties.setProperty("delivery.timeout.ms", "5000");
        KafkaProducer<String, byte[]> producer = new KafkaProducer<>(properties);
        while (true) {
            ProducerRecord<String, byte[]> record = new ProducerRecord<>("alex-test-valid-data", blob);
            producer.send(record, new Callback() {
                @Override
                public void onCompletion(RecordMetadata metadata, Exception exception) {
                    if (exception != null) {
                        System.err.println(exception);
                        long start = System.currentTimeMillis();
                        // Send the error event to a topic the producer has not seen yet.
                        ProducerRecord<String, byte[]> errorRecord = new ProducerRecord<>("alex-test-errors", blob);
                        producer.send(errorRecord);  // blocking caused by metadata update
                        long timeElapsed = System.currentTimeMillis() - start;
                        System.err.println("time spent blocking IO thread: " + timeElapsed);
                    }
                }
            });
        }
    }
}
{code}
{noformat}
[2020-07-31 14:35:51,936: INFO/main] (AbstractConfig.java:347) - ProducerConfig values: 
        acks = 1
        batch.size = 16384
        bootstrap.servers = [pkc-l915e.europe-west1.gcp.confluent.cloud:9092]
        buffer.memory = 33554432
        client.dns.lookup = default
        client.id = 
        compression.type = none
        connections.max.idle.ms = 540000
        delivery.timeout.ms = 5000
        enable.idempotence = false
        interceptor.classes = []
        key.serializer = class org.apache.kafka.common.serialization.StringSerializer
        linger.ms = 0
        max.block.ms = 60000
        max.in.flight.requests.per.connection = 5
        max.request.size = 1048576
        metadata.max.age.ms = 300000
        metric.reporters = []
        metrics.num.samples = 2
        metrics.recording.level = INFO
        metrics.sample.window.ms = 30000
        partitioner.class = class org.apache.kafka.clients.producer.internals.DefaultPartitioner
        receive.buffer.bytes = 32768
        reconnect.backoff.max.ms = 1000
        reconnect.backoff.ms = 50
        request.timeout.ms = 5000
        retries = 2147483647
        retry.backoff.ms = 100
        sasl.client.callback.handler.class = null
        sasl.jaas.config = [hidden]
        sasl.kerberos.kinit.cmd = /usr/bin/kinit
        sasl.kerberos.min.time.before.relogin = 60000
        sasl.kerberos.service.name = null
        sasl.kerberos.ticket.renew.jitter = 0.05
        sasl.kerberos.ticket.renew.window.factor = 0.8
        sasl.login.callback.handler.class = null
        sasl.login.class = null
        sasl.login.refresh.buffer.seconds = 300
        sasl.login.refresh.min.period.seconds = 60
        sasl.login.refresh.window.factor = 0.8
        sasl.login.refresh.window.jitter = 0.05
        sasl.mechanism = PLAIN
        security.protocol = SASL_SSL
        security.providers = null
        send.buffer.bytes = 131072
        ssl.cipher.suites = null
        ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
        ssl.endpoint.identification.algorithm = https
        ssl.key.password = null
        ssl.keymanager.algorithm = SunX509
        ssl.keystore.location = null
        ssl.keystore.password = null
        ssl.keystore.type = JKS
        ssl.protocol = TLS
        ssl.provider = null
        ssl.secure.random.implementation = null
        ssl.trustmanager.algorithm = PKIX
        ssl.truststore.location = null
        ssl.truststore.password = null
        ssl.truststore.type = JKS
        transaction.timeout.ms = 60000
        transactional.id = null
        value.serializer = class org.apache.kafka.common.serialization.ByteArraySerializer

[2020-07-31 14:35:52,099: INFO/main] (AbstractLogin.java:61) - Successfully logged in.
[2020-07-31 14:35:52,291: INFO/main] (AppInfoParser.java:117) - Kafka version: 5.4.0-ccs
[2020-07-31 14:35:52,291: INFO/main] (AppInfoParser.java:118) - Kafka commitId: f4201a82bea68cc7
[2020-07-31 14:35:52,291: INFO/main] (AppInfoParser.java:119) - Kafka startTimeMs: 1596198952287
[2020-07-31 14:35:52,853: INFO/kafka-producer-network-thread | producer-1] (Metadata.java:261) - [Producer clientId=producer-1] Cluster ID: lkc-43m2m
org.apache.kafka.common.errors.TimeoutException: Expiring 1 record(s) for alex-test-valid-data-0:5001 ms has passed since batch creation
org.apache.kafka.common.errors.TimeoutException: Failed to allocate memory within the configured max blocking time 60000 ms.
time spent blocking IO thread: 60001
org.apache.kafka.common.errors.NetworkException: The server disconnected before a response was received.
time spent blocking IO thread: 60002
time spent blocking IO thread: 60017
org.apache.kafka.common.errors.NetworkException: The server disconnected before a response was received.
[2020-07-31 14:38:07,219: WARN/kafka-producer-network-thread | producer-1] (Sender.java:682) - [Producer clientId=producer-1] Received invalid metadata error in produce request on partition alex-test-valid-data-3 due to org.apache.kafka.common.errors.NetworkException: The server disconnected before a response was received.. Going to request metadata update now
org.apache.kafka.common.errors.TimeoutException: Failed to allocate memory within the configured max blocking time 60000 ms.
time spent blocking IO thread: 60003
[2020-07-31 14:39:07,223: WARN/kafka-producer-network-thread | producer-1] (Sender.java:682) - [Producer clientId=producer-1] Received invalid metadata error in produce request on partition alex-test-valid-data-0 due to org.apache.kafka.common.errors.NetworkException: The server disconnected before a response was received.. Going to request metadata update now
org.apache.kafka.common.errors.NetworkException: The server disconnected before a response was received.
time spent blocking IO thread: 60002
time spent blocking IO thread: 60001
[2020-07-31 14:40:07,224: WARN/kafka-producer-network-thread | producer-1] (Sender.java:682) - [Producer clientId=producer-1] Received invalid metadata error in produce request on partition alex-test-valid-data-5 due to org.apache.kafka.common.errors.NetworkException: The server disconnected before a response was received.. Going to request metadata update now
org.apache.kafka.common.errors.NetworkException: The server disconnected before a response was received.
org.apache.kafka.common.errors.TimeoutException: Failed to allocate memory within the configured max blocking time 60000 ms.
time spent blocking IO thread: 60001
[2020-07-31 14:41:07,225: WARN/kafka-producer-network-thread | producer-1] (Sender.java:682) - [Producer clientId=producer-1] Received invalid metadata error in produce request on partition alex-test-valid-data-1 due to org.apache.kafka.common.errors.NetworkException: The server disconnected before a response was received.. Going to request metadata update now
org.apache.kafka.common.errors.NetworkException: The server disconnected before a response was received.
time spent blocking IO thread: 60004
time spent blocking IO thread: 60004
[2020-07-31 14:42:07,229: WARN/kafka-producer-network-thread | producer-1] (Sender.java:682) - [Producer clientId=producer-1] Received invalid metadata error in produce request on partition alex-test-valid-data-4 due to org.apache.kafka.common.errors.NetworkException: The server disconnected before a response was received.. Going to request metadata update now
org.apache.kafka.common.errors.NetworkException: The server disconnected before a response was received.
org.apache.kafka.common.errors.TimeoutException: Failed to allocate memory within the configured max blocking time 60000 ms.
time spent blocking IO thread: 60000
[2020-07-31 14:43:07,229: WARN/kafka-producer-network-thread | producer-1] (Sender.java:682) - [Producer clientId=producer-1] Received invalid metadata error in produce request on partition alex-test-valid-data-2 due to org.apache.kafka.common.errors.NetworkException: The server disconnected before a response was received.. Going to request metadata update now
org.apache.kafka.common.errors.TimeoutException: Expiring 1 record(s) for alex-test-valid-data-5:422600 ms has passed since batch creation
time spent blocking IO thread: 60003
time spent blocking IO thread: 60001
org.apache.kafka.common.errors.TimeoutException: Expiring 1 record(s) for alex-test-valid-data-5:422490 ms has passed since batch creation
org.apache.kafka.common.errors.TimeoutException: Failed to allocate memory within the configured max blocking time 60000 ms.
time spent blocking IO thread: 60002
org.apache.kafka.common.errors.TimeoutException: Expiring 1 record(s) for alex-test-valid-data-5:422315 ms has passed since batch creation
time spent blocking IO thread: 60003
time spent blocking IO thread: 60003
org.apache.kafka.common.errors.TimeoutException: Expiring 1 record(s) for alex-test-valid-data-5:422124 ms has passed since batch creation
{noformat}



