Alexander Sibiryakov created KAFKA-10335:
--------------------------------------------
Summary: Blocking of producer IO thread when calling send() from callback
Key: KAFKA-10335
URL: https://issues.apache.org/jira/browse/KAFKA-10335
Project: Kafka
Issue Type: Bug
Components: clients, producer
Reporter: Alexander Sibiryakov
We had an application that was supposed to use KafkaProducer to deliver the
results of some work. Sometimes delivery of results was unsuccessful because of
network connectivity errors or maintenance happening on the broker side. In
such cases we wanted the application to send an event with the error and the
original message details. All good, but we wanted errors to be delivered to a
separate topic. So we implemented a callback in the send() method that used the
same producer instance and called send() from there.
This application worked for some time, but then we found that its producer was
stuck: almost no CPU consumption, and batches expiring for hours. After
attaching a debugger, it turned out that the sender IO thread was blocked.
When a record expired, its callback was invoked, and that callback called
send() for a new topic whose metadata was not present in the producer client's
cache. When send() is missing metadata, it is allowed to block for up to
max.block.ms, which is 60 s by default. If the application is active, this
quickly results in a large number of accumulated records, and the callback of
each failed record blocks the IO thread for 60 s. Therefore the sender IO
thread is essentially blocked. In the producer, only the Sender IO thread calls
the client's poll() method, which is responsible for all network communication
and metadata updates. If poll() is executed with a significant delay, the
result is errors related to various timeouts. That's how we ended up with a
stuck producer that has little chance to recover.
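To make the arithmetic concrete, here is a toy model in plain Java. It is not Kafka code, and the class and method names are hypothetical. A single thread runs each completion callback back to back before it can return to poll(), and a sleep stands in for send() waiting up to max.block.ms for metadata, so the delay before the next poll() grows as N * max.block.ms.

```java
import java.util.concurrent.TimeUnit;

// Toy model, not Kafka code: one "IO thread" must finish every completion callback
// before it can get back to poll(). The sleep stands in for send() waiting up to
// max.block.ms for metadata of a topic it has not seen before.
class IoThreadStallModel {

    // Hypothetical callback that blocks like the send() in the reproducer.
    static void blockingCallback(long maxBlockMs) {
        try {
            Thread.sleep(maxBlockMs);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt status
        }
    }

    // Runs one callback per expired record, sequentially on the current thread,
    // and returns how many milliseconds passed before poll() could run again.
    static long pollDelayMs(int expiredRecords, long maxBlockMs) {
        long start = System.nanoTime();
        for (int i = 0; i < expiredRecords; i++) {
            blockingCallback(maxBlockMs);
        }
        return TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
    }

    // Stall if every callback waits the full max.block.ms: N * max.block.ms.
    static long stallIfEveryCallbackBlocksMs(int expiredRecords, long maxBlockMs) {
        return expiredRecords * maxBlockMs;
    }

    public static void main(String[] args) {
        // Scaled down: three callbacks of 50 ms each delay poll() by roughly 150 ms.
        System.out.println("poll() delayed by ~" + pollDelayMs(3, 50) + " ms");
        // With the default max.block.ms = 60000, ten expired records stall it for 600000 ms.
        System.out.println("10 records: " + stallIfEveryCallbackBlocksMs(10, 60_000) + " ms");
    }
}
```

With the default max.block.ms of 60000, ten expired records would keep the IO thread away from poll() for about ten minutes, which is consistent with the repeated ~60000 ms "time spent blocking IO thread" lines in the output below.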
To summarise, the prerequisites for the problem are: sending from a callback,
using the same producer instance, and using a topic that hasn't been seen before.
I think it is important to decide whether the issue is KafkaProducer misuse or
a bug. Code in callbacks shouldn't block, that is clear, but at the same time,
no one expects an already initialised producer to block.
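One general way to keep a callback non-blocking (a common workaround, not something proposed in this issue) is to hand the error report off to a separate thread. The sketch below uses only the JDK: `sendError` is a hypothetical stand-in for a wrapper around producer.send() on the errors topic, so the callback returns immediately and any metadata wait happens on the dedicated executor thread instead of the producer IO thread.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

// Sketch: the completion callback only enqueues the error report; the possibly
// blocking send happens on a dedicated thread instead of the producer IO thread.
class NonBlockingErrorReporter implements AutoCloseable {
    private final ExecutorService errorSender = Executors.newSingleThreadExecutor();
    private final Consumer<String> sendError; // hypothetical wrapper around producer.send(...)

    NonBlockingErrorReporter(Consumer<String> sendError) {
        this.sendError = sendError;
    }

    // What onCompletion() would call when exception != null: returns immediately.
    void onFailure(String errorDetails) {
        errorSender.execute(() -> sendError.accept(errorDetails));
    }

    // Stops accepting work and waits briefly for queued error reports to finish.
    @Override
    public void close() {
        errorSender.shutdown();
        try {
            errorSender.awaitTermination(10, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}
```

A single-thread executor keeps error reports in order. The trade-off is that error events are sent asynchronously and can queue up if the errors topic is unreachable as well.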
Depending on the decision, I could produce a fix. It could be either a warning
when the user tries to call send() from a callback, or a reduction of the
maximum allowed blocking time for metadata updates. It could also be just a
docs change, or nothing at all.
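For the warning option, a hypothetical check could inspect the calling thread before a potentially blocking send(). This sketch assumes the IO thread can be recognised by the name prefix visible in the log in this issue (`kafka-producer-network-thread | producer-1`) and takes a `metadataCached` flag as input; neither the class nor its methods are existing KafkaProducer API.

```java
// Hypothetical guard for the "warning" option. It judges only by the thread name
// prefix seen in the log, which is an assumption rather than a documented contract.
class SendFromCallbackGuard {
    // Assumption: the producer IO thread name starts with this prefix.
    static final String IO_THREAD_PREFIX = "kafka-producer-network-thread";

    static boolean isProducerIoThread(String threadName) {
        return threadName != null && threadName.startsWith(IO_THREAD_PREFIX);
    }

    // Returns a warning to log before a potentially blocking send(), or null if none is needed.
    static String warningFor(String threadName, boolean metadataCached, long maxBlockMs) {
        if (isProducerIoThread(threadName) && !metadataCached) {
            return "send() called from producer IO thread '" + threadName
                    + "' for a topic without cached metadata; may block up to "
                    + maxBlockMs + " ms";
        }
        return null;
    }

    public static void main(String[] args) {
        System.out.println(warningFor("kafka-producer-network-thread | producer-1", false, 60_000));
        System.out.println(warningFor(Thread.currentThread().getName(), false, 60_000));
    }
}
```

Matching on a thread name is fragile; a real fix inside the producer would more likely compare against the Sender thread object directly.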
Here is code to reproduce the issue; the output it produces follows the code
snippet. Tested against Confluent Cloud from my desktop over a 100 Mbps
connection.
{code:java}
import java.io.FileReader;
import java.io.IOException;
import java.util.Properties;

import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

public class SendFromCallbackReproducer {
    public static void main(String[] args) throws IOException {
        byte[] blob = new byte[262144]; // 256 KiB payload per record
        Properties properties = new Properties();
        try (FileReader reader = new FileReader("kafka-staging.properties")) {
            properties.load(reader);
        }
        properties.setProperty("key.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");
        properties.setProperty("value.serializer",
                "org.apache.kafka.common.serialization.ByteArraySerializer");
        // Short timeouts so that batches expire quickly and the callback fires
        properties.setProperty("request.timeout.ms", "5000");
        properties.setProperty("delivery.timeout.ms", "5000");
        KafkaProducer<String, byte[]> producer = new KafkaProducer<>(properties);
        while (true) {
            ProducerRecord<String, byte[]> record =
                    new ProducerRecord<>("alex-test-valid-data", blob);
            producer.send(record, new Callback() {
                @Override
                public void onCompletion(RecordMetadata metadata, Exception exception) {
                    // Generally invoked on the producer's IO (Sender) thread
                    if (exception != null) {
                        System.err.println(exception);
                        long start = System.currentTimeMillis();
                        ProducerRecord<String, byte[]> errorRecord =
                                new ProducerRecord<>("alex-test-errors", blob);
                        producer.send(errorRecord); // blocking caused by metadata update
                        long timeElapsed = System.currentTimeMillis() - start;
                        System.err.println("time spent blocking IO thread: " + timeElapsed);
                    }
                }
            });
        }
    }
}
{code}
{noformat}
[2020-07-31 14:35:51,936: INFO/main] (AbstractConfig.java:347) - ProducerConfig values:
acks = 1
batch.size = 16384
bootstrap.servers = [pkc-l915e.europe-west1.gcp.confluent.cloud:9092]
buffer.memory = 33554432
client.dns.lookup = default
client.id =
compression.type = none
connections.max.idle.ms = 540000
delivery.timeout.ms = 5000
enable.idempotence = false
interceptor.classes = []
key.serializer = class org.apache.kafka.common.serialization.StringSerializer
linger.ms = 0
max.block.ms = 60000
max.in.flight.requests.per.connection = 5
max.request.size = 1048576
metadata.max.age.ms = 300000
metric.reporters = []
metrics.num.samples = 2
metrics.recording.level = INFO
metrics.sample.window.ms = 30000
partitioner.class = class org.apache.kafka.clients.producer.internals.DefaultPartitioner
receive.buffer.bytes = 32768
reconnect.backoff.max.ms = 1000
reconnect.backoff.ms = 50
request.timeout.ms = 5000
retries = 2147483647
retry.backoff.ms = 100
sasl.client.callback.handler.class = null
sasl.jaas.config = [hidden]
sasl.kerberos.kinit.cmd = /usr/bin/kinit
sasl.kerberos.min.time.before.relogin = 60000
sasl.kerberos.service.name = null
sasl.kerberos.ticket.renew.jitter = 0.05
sasl.kerberos.ticket.renew.window.factor = 0.8
sasl.login.callback.handler.class = null
sasl.login.class = null
sasl.login.refresh.buffer.seconds = 300
sasl.login.refresh.min.period.seconds = 60
sasl.login.refresh.window.factor = 0.8
sasl.login.refresh.window.jitter = 0.05
sasl.mechanism = PLAIN
security.protocol = SASL_SSL
security.providers = null
send.buffer.bytes = 131072
ssl.cipher.suites = null
ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
ssl.endpoint.identification.algorithm = https
ssl.key.password = null
ssl.keymanager.algorithm = SunX509
ssl.keystore.location = null
ssl.keystore.password = null
ssl.keystore.type = JKS
ssl.protocol = TLS
ssl.provider = null
ssl.secure.random.implementation = null
ssl.trustmanager.algorithm = PKIX
ssl.truststore.location = null
ssl.truststore.password = null
ssl.truststore.type = JKS
transaction.timeout.ms = 60000
transactional.id = null
value.serializer = class org.apache.kafka.common.serialization.ByteArraySerializer
[2020-07-31 14:35:52,099: INFO/main] (AbstractLogin.java:61) - Successfully logged in.
[2020-07-31 14:35:52,291: INFO/main] (AppInfoParser.java:117) - Kafka version: 5.4.0-ccs
[2020-07-31 14:35:52,291: INFO/main] (AppInfoParser.java:118) - Kafka commitId: f4201a82bea68cc7
[2020-07-31 14:35:52,291: INFO/main] (AppInfoParser.java:119) - Kafka startTimeMs: 1596198952287
[2020-07-31 14:35:52,853: INFO/kafka-producer-network-thread | producer-1] (Metadata.java:261) - [Producer clientId=producer-1] Cluster ID: lkc-43m2m
org.apache.kafka.common.errors.TimeoutException: Expiring 1 record(s) for alex-test-valid-data-0:5001 ms has passed since batch creation
org.apache.kafka.common.errors.TimeoutException: Failed to allocate memory within the configured max blocking time 60000 ms.
time spent blocking IO thread: 60001
org.apache.kafka.common.errors.NetworkException: The server disconnected before a response was received.
time spent blocking IO thread: 60002
time spent blocking IO thread: 60017
org.apache.kafka.common.errors.NetworkException: The server disconnected before a response was received.
[2020-07-31 14:38:07,219: WARN/kafka-producer-network-thread | producer-1] (Sender.java:682) - [Producer clientId=producer-1] Received invalid metadata error in produce request on partition alex-test-valid-data-3 due to org.apache.kafka.common.errors.NetworkException: The server disconnected before a response was received.. Going to request metadata update now
org.apache.kafka.common.errors.TimeoutException: Failed to allocate memory within the configured max blocking time 60000 ms.
time spent blocking IO thread: 60003
[2020-07-31 14:39:07,223: WARN/kafka-producer-network-thread | producer-1] (Sender.java:682) - [Producer clientId=producer-1] Received invalid metadata error in produce request on partition alex-test-valid-data-0 due to org.apache.kafka.common.errors.NetworkException: The server disconnected before a response was received.. Going to request metadata update now
org.apache.kafka.common.errors.NetworkException: The server disconnected before a response was received.
time spent blocking IO thread: 60002
time spent blocking IO thread: 60001
[2020-07-31 14:40:07,224: WARN/kafka-producer-network-thread | producer-1] (Sender.java:682) - [Producer clientId=producer-1] Received invalid metadata error in produce request on partition alex-test-valid-data-5 due to org.apache.kafka.common.errors.NetworkException: The server disconnected before a response was received.. Going to request metadata update now
org.apache.kafka.common.errors.NetworkException: The server disconnected before a response was received.
org.apache.kafka.common.errors.TimeoutException: Failed to allocate memory within the configured max blocking time 60000 ms.
time spent blocking IO thread: 60001
[2020-07-31 14:41:07,225: WARN/kafka-producer-network-thread | producer-1] (Sender.java:682) - [Producer clientId=producer-1] Received invalid metadata error in produce request on partition alex-test-valid-data-1 due to org.apache.kafka.common.errors.NetworkException: The server disconnected before a response was received.. Going to request metadata update now
org.apache.kafka.common.errors.NetworkException: The server disconnected before a response was received.
time spent blocking IO thread: 60004
time spent blocking IO thread: 60004
[2020-07-31 14:42:07,229: WARN/kafka-producer-network-thread | producer-1] (Sender.java:682) - [Producer clientId=producer-1] Received invalid metadata error in produce request on partition alex-test-valid-data-4 due to org.apache.kafka.common.errors.NetworkException: The server disconnected before a response was received.. Going to request metadata update now
org.apache.kafka.common.errors.NetworkException: The server disconnected before a response was received.
org.apache.kafka.common.errors.TimeoutException: Failed to allocate memory within the configured max blocking time 60000 ms.
time spent blocking IO thread: 60000
[2020-07-31 14:43:07,229: WARN/kafka-producer-network-thread | producer-1] (Sender.java:682) - [Producer clientId=producer-1] Received invalid metadata error in produce request on partition alex-test-valid-data-2 due to org.apache.kafka.common.errors.NetworkException: The server disconnected before a response was received.. Going to request metadata update now
org.apache.kafka.common.errors.TimeoutException: Expiring 1 record(s) for alex-test-valid-data-5:422600 ms has passed since batch creation
time spent blocking IO thread: 60003
time spent blocking IO thread: 60001
org.apache.kafka.common.errors.TimeoutException: Expiring 1 record(s) for alex-test-valid-data-5:422490 ms has passed since batch creation
org.apache.kafka.common.errors.TimeoutException: Failed to allocate memory within the configured max blocking time 60000 ms.
time spent blocking IO thread: 60002
org.apache.kafka.common.errors.TimeoutException: Expiring 1 record(s) for alex-test-valid-data-5:422315 ms has passed since batch creation
time spent blocking IO thread: 60003
time spent blocking IO thread: 60003
org.apache.kafka.common.errors.TimeoutException: Expiring 1 record(s) for alex-test-valid-data-5:422124 ms has passed since batch creation
{noformat}
--
This message was sent by Atlassian Jira
(v8.3.4#803005)