[ https://issues.apache.org/jira/browse/KAFKA-12838?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17350654#comment-17350654 ]

Ryan Cabral commented on KAFKA-12838:
-------------------------------------

Yes, increasing the number of partitions can help reduce contention, but it won't 
eliminate it. Like increasing the number of request / io threads, it only mitigates 
the issue. The core of the problem is that requests for the same partition are 
dispatched to handler threads simultaneously: each one ties up a request / io 
thread while it waits on the partition lock, so requests for other partitions queue 
behind them even though they depend on a different lock. That means just two 
producers writing to the same partition can degrade an entire broker's overall 
throughput rather than just that partition's throughput.
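
To illustrate what I mean, here is a toy model (plain Scala, not broker code) of a 
fixed handler pool with one monitor per partition: once enough concurrent produces 
hit partition 0, every handler is either holding its lock, blocked on it, or queued 
behind it, so a produce for partition 1 has to wait for a free handler even though 
its own lock is uncontended.

{noformat}
import java.util.concurrent.{Executors, TimeUnit}

// Toy model of the contention, NOT Kafka code: 8 "request handler" threads,
// one monitor per partition, and a produce that holds the monitor while it works.
object HandlerPoolContention extends App {
  val handlerPool   = Executors.newFixedThreadPool(8)   // stands in for the request handler (io) pool
  val partitionLock = Array.fill(2)(new Object)          // one monitor per partition, like the per-log lock

  def produce(partition: Int): Unit = {
    handlerPool.submit(new Runnable {
      def run(): Unit = partitionLock(partition).synchronized {
        Thread.sleep(200)  // stands in for validation / re-compression / segment write under the lock
        println(s"partition $partition appended on ${Thread.currentThread.getName}")
      }
    })
  }

  (1 to 32).foreach(_ => produce(0))   // hot partition: one handler holds the lock, the rest block or queue
  produce(1)                           // different lock, but it still waits for a free handler thread

  handlerPool.shutdown()
  handlerPool.awaitTermination(1, TimeUnit.MINUTES)
}
{noformat}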

> Kafka Broker - Request threads inefficiently blocking during produce
> --------------------------------------------------------------------
>
>                 Key: KAFKA-12838
>                 URL: https://issues.apache.org/jira/browse/KAFKA-12838
>             Project: Kafka
>          Issue Type: Improvement
>          Components: core
>    Affects Versions: 2.7.0, 2.8.0
>            Reporter: Ryan Cabral
>            Priority: Major
>
> Hello, I have been using Kafka brokers for a while and have run into a problem 
> with the way a Kafka broker handles produce requests. If there are multiple 
> producers to the same topic and partition, any request handler thread handling a 
> produce for that topic and partition becomes blocked until all requests ahead of 
> it are done. Request handler threads for the entire broker can become exhausted 
> waiting on the same partition lock, blocking requests for other partitions that 
> would not have needed that lock.
> Once that starts happening, requests back up, the request queue can reach its 
> maximum, and network threads begin to be paused, cascading the problem further. 
> Overall performance ends up degraded. I'm not so focused on the cascade at the 
> moment as I am on the initial contention. Intuitively, I would expect locking 
> contention on a single partition to ONLY affect throughput on that partition and 
> not the entire broker.
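> For reference, these are the broker settings involved in that back-up (256 is 
> what we run with, per the jstack below; the other two are the stock defaults). 
> Raising them only moves the point at which the cascade starts; it does not touch 
> the lock contention itself:
> {noformat}
> # request handler (io) threads -- we raised this from 25 to 256
> num.io.threads=256
> # network threads (default)
> num.network.threads=3
> # once this many requests are queued, network threads stop reading new requests (default)
> queued.max.requests=500
> {noformat}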
>  
> The append call within the request handler originates here:
> [https://github.com/apache/kafka/blob/2.8.0/core/src/main/scala/kafka/server/KafkaApis.scala#L638]
> Further down the stack, the lock held during append is created here: 
> [https://github.com/apache/kafka/blob/2.8.0/core/src/main/scala/kafka/log/Log.scala#L1165]
> At this point the first request holds the lock for the duration of its append, 
> and subsequent requests for the same partition block waiting for it, each tying 
> up an io thread (request handler).
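> Roughly paraphrased (this is just the shape of the code, not the actual source), 
> the append path looks like this, with the per-log monitor held across validation, 
> offset assignment and the segment write:
> {noformat}
> import org.apache.kafka.common.record.MemoryRecords
>
> // Paraphrased shape of the append path, not the real kafka.log.Log.
> class LogSketch {
>   private val lock = new Object   // the java.lang.Object the BLOCKED handlers below wait on
>
>   def appendAsLeader(records: MemoryRecords): Unit = append(records)
>
>   private def append(records: MemoryRecords): Unit = lock synchronized {
>     // validate records and assign offsets (LogValidator, possibly re-compressing),
>     // then write to the active segment -- all while holding `lock`
>   }
> }
> {noformat}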
> At first glance, it seems like it would make the most sense to be able (via 
> config?) to funnel produce requests for the same partition through their own 
> request queue of sorts and dispatch them such that at most one io thread is tied 
> up at a time for a given partition. The lock can be held for other reasons too, 
> but this should at least help mitigate the issue. I'm assuming this is easier 
> said than done and likely requires significant refactoring to achieve properly, 
> but I'm hoping it is something that could end up on some sort of long-term 
> roadmap.
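> A hypothetical sketch of what I mean (the names and shape are mine, nothing like 
> this exists in the broker today, and response/ack handling is ignored): appends 
> for one partition are serialized on their own single-threaded lane, so a hot 
> partition queues work instead of parking many request handler threads on the 
> same monitor.
> {noformat}
> import java.util.concurrent.{ConcurrentHashMap, Executors, ExecutorService}
>
> // Hypothetical per-partition funnel, not an existing Kafka API.
> class PerPartitionAppendDispatcher {
>   private val lanes = new ConcurrentHashMap[String, ExecutorService]()
>
>   // At most one append per partition runs at a time; the request handler thread
>   // that submitted it is immediately free to serve other partitions.
>   def submit(topicPartition: String)(append: () => Unit): Unit =
>     lanes
>       .computeIfAbsent(topicPartition, (_: String) => Executors.newSingleThreadExecutor())
>       .submit(new Runnable { def run(): Unit = append() })
>
>   def shutdown(): Unit = lanes.values().forEach((lane: ExecutorService) => lane.shutdown())
> }
> {noformat}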
>  
> Snippet from jstack. Almost all request handler threads (there are 256 of them, 
> up from 25 to mitigate the issue) in the jstack are blocked waiting on the same 
> lock because of the number of producers we have.
>  
> {noformat}
> "data-plane-kafka-request-handler-254" #335 daemon prio=5 os_prio=0 tid=0x00007fb1c9f13000 nid=0x53f1 runnable [0x00007fad35796000]
>    java.lang.Thread.State: RUNNABLE
>       at org.apache.kafka.common.record.KafkaLZ4BlockOutputStream.<init>(KafkaLZ4BlockOutputStream.java:82)
>       at org.apache.kafka.common.record.KafkaLZ4BlockOutputStream.<init>(KafkaLZ4BlockOutputStream.java:125)
>       at org.apache.kafka.common.record.CompressionType$4.wrapForOutput(CompressionType.java:101)
>       at org.apache.kafka.common.record.MemoryRecordsBuilder.<init>(MemoryRecordsBuilder.java:134)
>       at org.apache.kafka.common.record.MemoryRecordsBuilder.<init>(MemoryRecordsBuilder.java:170)
>       at org.apache.kafka.common.record.MemoryRecords.builder(MemoryRecords.java:508)
>       at kafka.log.LogValidator$.buildRecordsAndAssignOffsets(LogValidator.scala:500)
>       at kafka.log.LogValidator$.validateMessagesAndAssignOffsetsCompressed(LogValidator.scala:455)
>       at kafka.log.LogValidator$.validateMessagesAndAssignOffsets(LogValidator.scala:106)
>       at kafka.log.Log.$anonfun$append$2(Log.scala:1126)
>       - locked <0x00000004c9a6fd60> (a java.lang.Object)
>       at kafka.log.Log.append(Log.scala:2387)
>       at kafka.log.Log.appendAsLeader(Log.scala:1050)
>       at kafka.cluster.Partition.$anonfun$appendRecordsToLeader$1(Partition.scala:1079)
>       at kafka.cluster.Partition.appendRecordsToLeader(Partition.scala:1067)
>       at kafka.server.ReplicaManager.$anonfun$appendToLocalLog$4(ReplicaManager.scala:953)
>       at kafka.server.ReplicaManager$$Lambda$1078/1017241486.apply(Unknown Source)
>       at scala.collection.StrictOptimizedMapOps.map(StrictOptimizedMapOps.scala:28)
>       at scala.collection.StrictOptimizedMapOps.map$(StrictOptimizedMapOps.scala:27)
>       at scala.collection.mutable.HashMap.map(HashMap.scala:35)
>       at kafka.server.ReplicaManager.appendToLocalLog(ReplicaManager.scala:941)
>       at kafka.server.ReplicaManager.appendRecords(ReplicaManager.scala:621)
>       at kafka.server.KafkaApis.handleProduceRequest(KafkaApis.scala:625)
>       at kafka.server.KafkaApis.handle(KafkaApis.scala:137)
>       at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:74)
>       at java.lang.Thread.run(Thread.java:748)
> "data-plane-kafka-request-handler-253" #334 daemon prio=5 os_prio=0 tid=0x00007fb1c9f11000 nid=0x53f0 waiting for monitor entry [0x00007fad35897000]
>    java.lang.Thread.State: BLOCKED (on object monitor)
>       at kafka.log.Log.$anonfun$append$2(Log.scala:1104)
>       - waiting to lock <0x00000004c9a6fd60> (a java.lang.Object)
>       at kafka.log.Log.append(Log.scala:2387)
>       at kafka.log.Log.appendAsLeader(Log.scala:1050)
>       at kafka.cluster.Partition.$anonfun$appendRecordsToLeader$1(Partition.scala:1079)
>       at kafka.cluster.Partition.appendRecordsToLeader(Partition.scala:1067)
>       at kafka.server.ReplicaManager.$anonfun$appendToLocalLog$4(ReplicaManager.scala:953)
>       at kafka.server.ReplicaManager$$Lambda$1078/1017241486.apply(Unknown Source)
>       at scala.collection.StrictOptimizedMapOps.map(StrictOptimizedMapOps.scala:28)
>       at scala.collection.StrictOptimizedMapOps.map$(StrictOptimizedMapOps.scala:27)
>       at scala.collection.mutable.HashMap.map(HashMap.scala:35)
>       at kafka.server.ReplicaManager.appendToLocalLog(ReplicaManager.scala:941)
>       at kafka.server.ReplicaManager.appendRecords(ReplicaManager.scala:621)
>       at kafka.server.KafkaApis.handleProduceRequest(KafkaApis.scala:625)
>       at kafka.server.KafkaApis.handle(KafkaApis.scala:137)
>       at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:74)
>       at java.lang.Thread.run(Thread.java:748){noformat}
>  
>  



