Ryan Cabral created KAFKA-12838:
-----------------------------------
Summary: Kafka Broker - Request threads inefficiently blocking during produce
Key: KAFKA-12838
URL: https://issues.apache.org/jira/browse/KAFKA-12838
Project: Kafka
Issue Type: Improvement
Components: core
Affects Versions: 2.8.0, 2.7.0
Reporter: Ryan Cabral
Hello, I have been running Kafka brokers for a while and have run into a
problem with the way a broker handles produce requests. If there are multiple
producers to the same topic and partition, every request handler thread
handling a produce request for that partition blocks until all requests ahead
of it are done. Request handler threads for the entire broker can become
exhausted waiting on the same partition lock, blocking requests for other
partitions that would not have needed that lock.
Once that starts happening, requests back up, the request queue can reach its
maximum size, and network threads begin to be paused, which cascades the
problem further. Overall performance ends up degraded. I'm less focused on the
cascade at the moment than on the initial contention. Intuitively, I would
expect lock contention on a single partition to affect throughput ONLY on
that partition and not on the entire broker.
The append call within the request handler originates here:
[https://github.com/apache/kafka/blob/2.8.0/core/src/main/scala/kafka/server/KafkaApis.scala#L638]
Further down the stack, the lock held during append is taken here:
[https://github.com/apache/kafka/blob/2.8.0/core/src/main/scala/kafka/log/Log.scala#L1165]
At this point the first request holds the lock during append, and later
requests on the same partition block waiting for it, each tying up an I/O
thread (request handler).
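A minimal model of this starvation, for illustration only. It is not Kafka code: a fixed thread pool stands in for the request handler threads, a plain Java monitor stands in for the partition lock, and the class and method names are made up.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical model, not Kafka code.
class HandlerPoolContentionDemo {

    /** Returns true if a request for an unrelated partition could not run while the hot lock was held. */
    static boolean coldRequestStarved(int handlerThreads) {
        ExecutorService handlers = Executors.newFixedThreadPool(handlerThreads);
        Object hotPartitionLock = new Object();
        CountDownLatch allHandlersBusy = new CountDownLatch(handlerThreads);
        CountDownLatch finishSlowAppend = new CountDownLatch(1);
        try {
            // One produce request per handler thread, all for the same partition.
            for (int i = 0; i < handlerThreads; i++) {
                handlers.submit(() -> {
                    allHandlersBusy.countDown();
                    synchronized (hotPartitionLock) {
                        // The first thread in simulates a slow append; the rest block on the monitor.
                        finishSlowAppend.await();
                    }
                    return null;
                });
            }
            allHandlersBusy.await();

            // A request for a different partition needs no lock, but has no free thread to run on.
            Future<String> cold = handlers.submit(() -> "appended to cold partition");
            boolean starved;
            try {
                cold.get(200, TimeUnit.MILLISECONDS);
                starved = false;
            } catch (TimeoutException e) {
                starved = true;
            }

            finishSlowAppend.countDown();      // let the hot partition drain
            cold.get(5, TimeUnit.SECONDS);     // the cold request now completes
            return starved;
        } catch (InterruptedException | ExecutionException | TimeoutException e) {
            throw new IllegalStateException(e);
        } finally {
            finishSlowAppend.countDown();
            handlers.shutdownNow();
        }
    }

    public static void main(String[] args) {
        System.out.println("cold request starved: " + coldRequestStarved(4));
    }
}
```

The cold request only runs after the slow append is released, even though it never touches the contended lock.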
At first glance, it seems like it would make the most sense to be able (via
config?) to funnel produce requests for the same partition through their own
request queue of sorts and dispatch them such that at most one I/O thread is
tied up at a time for a given partition. There are a number of other places
where the lock could be held too, but this should at least help mitigate the
issue. I'm assuming this is easier said than done and likely requires
significant refactoring to achieve properly, but I'm hoping it is something
that could end up on a long-term roadmap.
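A rough sketch of the dispatch idea, assuming the shared handler pool sits behind a java.util.concurrent.Executor. PerPartitionDispatcher and its methods are hypothetical names, not existing Kafka classes, and the sketch ignores everything else a real broker request path has to do.

```java
import java.util.ArrayDeque;
import java.util.HashMap;
import java.util.Map;
import java.util.Queue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical sketch: serializes work per partition on top of a shared pool.
class PerPartitionDispatcher {
    private final Executor sharedHandlers;
    // A key is present exactly while that partition has a task scheduled or running.
    private final Map<String, Queue<Runnable>> pending = new HashMap<>();

    PerPartitionDispatcher(Executor sharedHandlers) {
        this.sharedHandlers = sharedHandlers;
    }

    /** Queues work for a partition; at most one task per partition occupies a pool thread. */
    synchronized void submit(String topicPartition, Runnable appendTask) {
        Queue<Runnable> queue = pending.get(topicPartition);
        if (queue != null) {
            queue.add(appendTask);             // partition busy: wait in its own queue
            return;
        }
        pending.put(topicPartition, new ArrayDeque<>());
        sharedHandlers.execute(() -> runThenScheduleNext(topicPartition, appendTask));
    }

    private void runThenScheduleNext(String topicPartition, Runnable task) {
        try {
            task.run();
        } finally {
            scheduleNext(topicPartition);
        }
    }

    private synchronized void scheduleNext(String topicPartition) {
        Runnable next = pending.get(topicPartition).poll();
        if (next == null) {
            pending.remove(topicPartition);    // partition is idle again
        } else {
            sharedHandlers.execute(() -> runThenScheduleNext(topicPartition, next));
        }
    }
}

class PerPartitionDispatcherDemo {
    /** Returns true if a cold-partition task finishes while the hot partition is still stuck. */
    static boolean coldCompletesWhileHotBlocked() {
        ExecutorService pool = Executors.newFixedThreadPool(2);
        PerPartitionDispatcher dispatcher = new PerPartitionDispatcher(pool);
        CountDownLatch finishSlowAppend = new CountDownLatch(1);
        CountDownLatch hotDone = new CountDownLatch(4);
        CountDownLatch coldDone = new CountDownLatch(1);
        try {
            dispatcher.submit("topic-0", () -> { awaitQuietly(finishSlowAppend); hotDone.countDown(); });
            for (int i = 0; i < 3; i++) {
                dispatcher.submit("topic-0", hotDone::countDown);
            }
            dispatcher.submit("topic-1", coldDone::countDown);
            boolean coldRanFirst = coldDone.await(5, TimeUnit.SECONDS) && hotDone.getCount() == 4;
            finishSlowAppend.countDown();
            hotDone.await(5, TimeUnit.SECONDS);   // let the hot queue drain before shutdown
            return coldRanFirst;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        } finally {
            finishSlowAppend.countDown();
            pool.shutdown();
        }
    }

    private static void awaitQuietly(CountDownLatch latch) {
        try {
            latch.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    public static void main(String[] args) {
        System.out.println("cold completed while hot blocked: " + coldCompletesWhileHotBlocked());
    }
}
```

With only two pool threads and four queued requests for topic-0, the topic-1 request still gets a thread, because the three waiting topic-0 requests sit in the partition's queue rather than on pool threads.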
Snippet from jstack. Almost all request handler threads (there are 256 of
them, up from 25 to mitigate the issue) in the jstack are blocked waiting on
the same lock due to the number of producers we have.
{noformat}
"data-plane-kafka-request-handler-254" #335 daemon prio=5 os_prio=0 tid=0x00007fb1c9f13000 nid=0x53f1 runnable [0x00007fad35796000]
java.lang.Thread.State: RUNNABLE
at org.apache.kafka.common.record.KafkaLZ4BlockOutputStream.<init>(KafkaLZ4BlockOutputStream.java:82)
at org.apache.kafka.common.record.KafkaLZ4BlockOutputStream.<init>(KafkaLZ4BlockOutputStream.java:125)
at org.apache.kafka.common.record.CompressionType$4.wrapForOutput(CompressionType.java:101)
at org.apache.kafka.common.record.MemoryRecordsBuilder.<init>(MemoryRecordsBuilder.java:134)
at org.apache.kafka.common.record.MemoryRecordsBuilder.<init>(MemoryRecordsBuilder.java:170)
at org.apache.kafka.common.record.MemoryRecords.builder(MemoryRecords.java:508)
at kafka.log.LogValidator$.buildRecordsAndAssignOffsets(LogValidator.scala:500)
at kafka.log.LogValidator$.validateMessagesAndAssignOffsetsCompressed(LogValidator.scala:455)
at kafka.log.LogValidator$.validateMessagesAndAssignOffsets(LogValidator.scala:106)
at kafka.log.Log.$anonfun$append$2(Log.scala:1126)
- locked <0x00000004c9a6fd60> (a java.lang.Object)
at kafka.log.Log.append(Log.scala:2387)
at kafka.log.Log.appendAsLeader(Log.scala:1050)
at kafka.cluster.Partition.$anonfun$appendRecordsToLeader$1(Partition.scala:1079)
at kafka.cluster.Partition.appendRecordsToLeader(Partition.scala:1067)
at kafka.server.ReplicaManager.$anonfun$appendToLocalLog$4(ReplicaManager.scala:953)
at kafka.server.ReplicaManager$$Lambda$1078/1017241486.apply(Unknown Source)
at scala.collection.StrictOptimizedMapOps.map(StrictOptimizedMapOps.scala:28)
at scala.collection.StrictOptimizedMapOps.map$(StrictOptimizedMapOps.scala:27)
at scala.collection.mutable.HashMap.map(HashMap.scala:35)
at kafka.server.ReplicaManager.appendToLocalLog(ReplicaManager.scala:941)
at kafka.server.ReplicaManager.appendRecords(ReplicaManager.scala:621)
at kafka.server.KafkaApis.handleProduceRequest(KafkaApis.scala:625)
at kafka.server.KafkaApis.handle(KafkaApis.scala:137)
at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:74)
at java.lang.Thread.run(Thread.java:748)

"data-plane-kafka-request-handler-253" #334 daemon prio=5 os_prio=0 tid=0x00007fb1c9f11000 nid=0x53f0 waiting for monitor entry [0x00007fad35897000]
java.lang.Thread.State: BLOCKED (on object monitor)
at kafka.log.Log.$anonfun$append$2(Log.scala:1104)
- waiting to lock <0x00000004c9a6fd60> (a java.lang.Object)
at kafka.log.Log.append(Log.scala:2387)
at kafka.log.Log.appendAsLeader(Log.scala:1050)
at kafka.cluster.Partition.$anonfun$appendRecordsToLeader$1(Partition.scala:1079)
at kafka.cluster.Partition.appendRecordsToLeader(Partition.scala:1067)
at kafka.server.ReplicaManager.$anonfun$appendToLocalLog$4(ReplicaManager.scala:953)
at kafka.server.ReplicaManager$$Lambda$1078/1017241486.apply(Unknown Source)
at scala.collection.StrictOptimizedMapOps.map(StrictOptimizedMapOps.scala:28)
at scala.collection.StrictOptimizedMapOps.map$(StrictOptimizedMapOps.scala:27)
at scala.collection.mutable.HashMap.map(HashMap.scala:35)
at kafka.server.ReplicaManager.appendToLocalLog(ReplicaManager.scala:941)
at kafka.server.ReplicaManager.appendRecords(ReplicaManager.scala:621)
at kafka.server.KafkaApis.handleProduceRequest(KafkaApis.scala:625)
at kafka.server.KafkaApis.handle(KafkaApis.scala:137)
at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:74)
at java.lang.Thread.run(Thread.java:748)
{noformat}
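For anyone wanting to check a dump for the same pattern, here is a small sketch that counts how many threads are waiting on each monitor address. It relies only on the "- waiting to lock <0x...>" lines that jstack prints, as seen in the snippet above. JstackLockSummary is a hypothetical helper name, and the sample in main is just two lines taken from that snippet.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical helper for summarizing monitor contention in jstack output.
class JstackLockSummary {
    private static final Pattern WAITING =
            Pattern.compile("- waiting to lock <(0x[0-9a-fA-F]+)>");

    /** Maps each monitor address to the number of threads waiting to lock it. */
    static Map<String, Integer> waitersPerLock(String threadDump) {
        Map<String, Integer> waiters = new LinkedHashMap<>();
        Matcher matcher = WAITING.matcher(threadDump);
        while (matcher.find()) {
            waiters.merge(matcher.group(1), 1, Integer::sum);
        }
        return waiters;
    }

    public static void main(String[] args) {
        // Two lines taken from the snippet above; a real run would pass the full dump text.
        String sample =
                "- locked <0x00000004c9a6fd60> (a java.lang.Object)\n"
              + "- waiting to lock <0x00000004c9a6fd60> (a java.lang.Object)\n";
        waitersPerLock(sample).forEach(
                (lock, count) -> System.out.println(lock + " waiters=" + count));
    }
}
```

A single address with a waiter count close to the number of request handler threads would match the situation described here.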