Guozhang Wang created KAFKA-8135:
------------------------------------
Summary: Kafka Producer deadlocked on flush call with intermittent
broker unavailability
Key: KAFKA-8135
URL: https://issues.apache.org/jira/browse/KAFKA-8135
Project: Kafka
Issue Type: Improvement
Components: clients
Affects Versions: 2.1.0
Reporter: Guozhang Wang
In KIP-91 we added the config {{delivery.timeout.ms}} to replace {{retries}},
with a default value of 2 minutes. We've observed that when it is set to
MAX_VALUE (e.g. in Kafka Streams, when EOS is turned on), the
{{producer.flush()}} call can at times block while its destination brokers are
undergoing some unavailability:
{code}
java.lang.Thread.State: WAITING (parking)
    at jdk.internal.misc.Unsafe.park(java.base/Native Method)
    - parking to wait for <0x00000006aeb21a00> (a java.util.concurrent.CountDownLatch$Sync)
    at java.util.concurrent.locks.LockSupport.park(java.base/Unknown Source)
    at java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(java.base/Unknown Source)
    at java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireSharedInterruptibly(java.base/Unknown Source)
    at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireSharedInterruptibly(java.base/Unknown Source)
    at java.util.concurrent.CountDownLatch.await(java.base/Unknown Source)
    at org.apache.kafka.clients.producer.internals.ProduceRequestResult.await(ProduceRequestResult.java:76)
    at org.apache.kafka.clients.producer.internals.RecordAccumulator.awaitFlushCompletion(RecordAccumulator.java:693)
    at org.apache.kafka.clients.producer.KafkaProducer.flush(KafkaProducer.java:1066)
    at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.flush(RecordCollectorImpl.java:259)
    at org.apache.kafka.streams.processor.internals.StreamTask.flushState(StreamTask.java:520)
    at org.apache.kafka.streams.processor.internals.StreamTask.commit(StreamTask.java:470)
    at org.apache.kafka.streams.processor.internals.StreamTask.commit(StreamTask.java:458)
    at org.apache.kafka.streams.processor.internals.AssignedTasks.commit(AssignedTasks.java:286)
    at org.apache.kafka.streams.processor.internals.TaskManager.commitAll(TaskManager.java:412)
    at org.apache.kafka.streams.processor.internals.StreamThread.maybeCommit(StreamThread.java:1057)
    at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:911)
    at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:805)
    at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:774)
{code}
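For reference, a minimal standalone sketch of the kind of configuration under
which we hit this (the topic name and bootstrap address are placeholders; in
Kafka Streams these producer settings are applied internally when EOS is on,
rather than set by hand like this):
{code}
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

public class FlushHangSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        // Effectively "retry forever", as Streams configures when EOS is on.
        props.put(ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG, Integer.MAX_VALUE);
        props.put(ProducerConfig.RETRIES_CONFIG, Integer.MAX_VALUE);

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            producer.send(new ProducerRecord<>("some-topic", "key", "value"));
            // If the destination broker becomes unavailable while the batch is
            // in flight, this call can park forever on the batch's
            // ProduceRequestResult latch (see the stack trace above).
            producer.flush();
        }
    }
}
{code}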
Even after the broker went back to normal, producers would still be blocked.
One suspicion is that when the broker is not able to handle the request in
time, the response is somehow dropped inside the Sender, and hence whoever is
waiting on that response would be blocked forever.
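To illustrate why a dropped response would block the caller forever:
{{ProduceRequestResult}} gates {{await()}} on a {{CountDownLatch}} that only
the Sender counts down when it completes the batch. A simplified analogue (not
the actual class, just the shape of the mechanism):
{code}
import java.util.concurrent.CountDownLatch;

// Simplified analogue of ProduceRequestResult: await() blocks on a latch
// that is only released when the Sender marks the batch as done.
class PendingResult {
    private final CountDownLatch latch = new CountDownLatch(1);

    // Called by the Sender when the broker's response is processed.
    void done() {
        latch.countDown();
    }

    // Reached from KafkaProducer.flush() via awaitFlushCompletion(): if the
    // response is dropped and done() is never invoked, this parks forever.
    void await() throws InterruptedException {
        latch.await();
    }
}
{code}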
We've observed such scenarios when 1) the broker failed transiently for a
while, 2) the network was transiently partitioned, and 3) a bad broker config
(e.g. ACLs) caused it to be unable to handle requests for a while.
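As a stopgap, callers that cannot tolerate an unbounded flush could bound the
wait externally, since {{KafkaProducer.flush()}} itself takes no timeout. A
hedged sketch (the helper name is ours, and note the underlying flush keeps
running after a timeout; only the caller is unblocked):
{code}
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.apache.kafka.clients.producer.KafkaProducer;

class BoundedFlush {
    // flush() has no timed variant, so run it on a helper thread and bound
    // how long the caller waits on it.
    static void flushWithTimeout(KafkaProducer<?, ?> producer,
                                 long timeout, TimeUnit unit)
            throws InterruptedException, TimeoutException {
        ExecutorService es = Executors.newSingleThreadExecutor();
        try {
            Future<?> f = es.submit(producer::flush);
            try {
                f.get(timeout, unit);
            } catch (ExecutionException e) {
                throw new RuntimeException(e.getCause());
            }
        } finally {
            es.shutdownNow();
        }
    }
}
{code}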