[ https://issues.apache.org/jira/browse/KAFKA-12838?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17350654#comment-17350654 ]
Ryan Cabral commented on KAFKA-12838:
-------------------------------------

Yes, increasing the number of partitions can help reduce contention, but it won't eliminate it. It is similar to increasing the number of request/IO threads: it mitigates the issue a bit. The core of the problem, though, is that requests for the same partition are dispatched simultaneously, each tying up a request/IO thread and forcing requests for other partitions to wait behind them even though those depend on a different lock. That means just two producers writing to the same partition can degrade the entire broker's throughput, not just that partition's.
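As a thought experiment, a minimal sketch of the per-partition funneling proposed in the quoted description below could look like the following. This is hypothetical, not Kafka code: {{PartitionSerializedDispatcher}} and its {{submit}} method are invented names, the partition key is a plain string, and a real implementation would still have to wire produce callbacks, purgatory, and error handling through it. It only illustrates the core idea: competing appends for a hot partition wait in a data structure instead of in a blocked request handler thread.

{code:scala}
import java.util.concurrent.{ConcurrentHashMap, ConcurrentLinkedQueue, Executor, Executors}
import java.util.concurrent.atomic.AtomicBoolean

// Hypothetical sketch, not Kafka code: serialize append work per partition so
// that at most one IO thread is busy per partition at any moment. Competing
// appends for the same partition wait in a queue rather than blocking a
// thread on the log's monitor.
final class PartitionSerializedDispatcher(executor: Executor) {

  private final class PartitionQueue {
    val tasks = new ConcurrentLinkedQueue[Runnable]()
    val draining = new AtomicBoolean(false) // true while one thread owns this queue
  }

  private val queues = new ConcurrentHashMap[String, PartitionQueue]()

  /** Enqueue an append for `partition` and return immediately. */
  def submit(partition: String)(task: => Unit): Unit = {
    val q = queues.computeIfAbsent(partition, _ => new PartitionQueue)
    q.tasks.add(() => task)
    tryDrain(q)
  }

  // The compareAndSet guarantees a single drainer per partition, so appends to
  // one partition run strictly one at a time while other partitions proceed.
  private def tryDrain(q: PartitionQueue): Unit =
    if (q.draining.compareAndSet(false, true)) {
      executor.execute { () =>
        try {
          var t = q.tasks.poll()
          while (t != null) { t.run(); t = q.tasks.poll() }
        } finally {
          q.draining.set(false)
          // A task may have arrived after the last poll; re-drain if so.
          if (!q.tasks.isEmpty) tryDrain(q)
        }
      }
    }
}

object Demo {
  def main(args: Array[String]): Unit = {
    val pool = Executors.newFixedThreadPool(8)
    val dispatcher = new PartitionSerializedDispatcher(pool)
    // Both appends target the same partition: they run one after the other,
    // occupying at most one of the eight pool threads at any time.
    dispatcher.submit("topic-0") { println("append 1") }
    dispatcher.submit("topic-0") { println("append 2") }
    pool.shutdown() // already-submitted work still completes
  }
}
{code}

Under a scheme shaped like this, two producers hammering the same partition cost the broker at most one busy thread, and appends to other partitions never queue behind them.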
> Kafka Broker - Request threads inefficiently blocking during produce
> --------------------------------------------------------------------
>
>                 Key: KAFKA-12838
>                 URL: https://issues.apache.org/jira/browse/KAFKA-12838
>             Project: Kafka
>          Issue Type: Improvement
>          Components: core
>    Affects Versions: 2.7.0, 2.8.0
>            Reporter: Ryan Cabral
>            Priority: Major
>
> Hello, I have been using Kafka brokers for a while and have run into a problem with the way a Kafka broker handles produce requests. If there are multiple producers to the same topic and partition, any request handler thread handling a produce request for that topic and partition blocks until all requests ahead of it are done. Request handler threads for the entire broker can become exhausted waiting on the same partition lock, blocking requests for other partitions that do not need that lock.
> Once that starts happening, requests back up, the request queue can reach its maximum size, and network threads begin to be paused, cascading the problem further. Overall performance ends up degraded. I'm not so focused on the cascade at the moment as I am on the initial contention. Intuitively, I would expect lock contention on a single partition to ONLY affect throughput on that partition, not the entire broker.
>
> The append call within the request handler originates here:
> [https://github.com/apache/kafka/blob/2.8.0/core/src/main/scala/kafka/server/KafkaApis.scala#L638]
> Further down the stack, the lock held during append is taken here:
> [https://github.com/apache/kafka/blob/2.8.0/core/src/main/scala/kafka/log/Log.scala#L1165]
> At this point the first request holds the lock for the duration of its append, and subsequent requests for the same partition block waiting for it, each tying up an IO thread (request handler).
> At first glance, it seems like it would make the most sense to be able to (via config?) funnel produce requests for the same partition through their own request queue of sorts and dispatch them such that at most one IO thread is tied up at a time for a given partition. There are a number of reasons the lock could be held elsewhere too, but this should at least help mitigate the issue. I'm assuming this is easier said than done and likely requires significant refactoring to achieve properly, but I'm hoping it is something that could end up on some sort of long-term roadmap.
>
> Snippet from jstack. Almost all request handler threads (there are 256 of them, up from 25 to mitigate the issue) are blocked waiting on the same lock, due to the number of producers we have.
>
> {noformat}
> "data-plane-kafka-request-handler-254" #335 daemon prio=5 os_prio=0 tid=0x00007fb1c9f13000 nid=0x53f1 runnable [0x00007fad35796000]
>    java.lang.Thread.State: RUNNABLE
>         at org.apache.kafka.common.record.KafkaLZ4BlockOutputStream.<init>(KafkaLZ4BlockOutputStream.java:82)
>         at org.apache.kafka.common.record.KafkaLZ4BlockOutputStream.<init>(KafkaLZ4BlockOutputStream.java:125)
>         at org.apache.kafka.common.record.CompressionType$4.wrapForOutput(CompressionType.java:101)
>         at org.apache.kafka.common.record.MemoryRecordsBuilder.<init>(MemoryRecordsBuilder.java:134)
>         at org.apache.kafka.common.record.MemoryRecordsBuilder.<init>(MemoryRecordsBuilder.java:170)
>         at org.apache.kafka.common.record.MemoryRecords.builder(MemoryRecords.java:508)
>         at kafka.log.LogValidator$.buildRecordsAndAssignOffsets(LogValidator.scala:500)
>         at kafka.log.LogValidator$.validateMessagesAndAssignOffsetsCompressed(LogValidator.scala:455)
>         at kafka.log.LogValidator$.validateMessagesAndAssignOffsets(LogValidator.scala:106)
>         at kafka.log.Log.$anonfun$append$2(Log.scala:1126)
>         - locked <0x00000004c9a6fd60> (a java.lang.Object)
>         at kafka.log.Log.append(Log.scala:2387)
>         at kafka.log.Log.appendAsLeader(Log.scala:1050)
>         at kafka.cluster.Partition.$anonfun$appendRecordsToLeader$1(Partition.scala:1079)
>         at kafka.cluster.Partition.appendRecordsToLeader(Partition.scala:1067)
>         at kafka.server.ReplicaManager.$anonfun$appendToLocalLog$4(ReplicaManager.scala:953)
>         at kafka.server.ReplicaManager$$Lambda$1078/1017241486.apply(Unknown Source)
>         at scala.collection.StrictOptimizedMapOps.map(StrictOptimizedMapOps.scala:28)
>         at scala.collection.StrictOptimizedMapOps.map$(StrictOptimizedMapOps.scala:27)
>         at scala.collection.mutable.HashMap.map(HashMap.scala:35)
>         at kafka.server.ReplicaManager.appendToLocalLog(ReplicaManager.scala:941)
>         at kafka.server.ReplicaManager.appendRecords(ReplicaManager.scala:621)
>         at kafka.server.KafkaApis.handleProduceRequest(KafkaApis.scala:625)
>         at kafka.server.KafkaApis.handle(KafkaApis.scala:137)
>         at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:74)
>         at java.lang.Thread.run(Thread.java:748)
>
> "data-plane-kafka-request-handler-253" #334 daemon prio=5 os_prio=0 tid=0x00007fb1c9f11000 nid=0x53f0 waiting for monitor entry [0x00007fad35897000]
>    java.lang.Thread.State: BLOCKED (on object monitor)
>         at kafka.log.Log.$anonfun$append$2(Log.scala:1104)
>         - waiting to lock <0x00000004c9a6fd60> (a java.lang.Object)
>         at kafka.log.Log.append(Log.scala:2387)
>         at kafka.log.Log.appendAsLeader(Log.scala:1050)
>         at kafka.cluster.Partition.$anonfun$appendRecordsToLeader$1(Partition.scala:1079)
>         at kafka.cluster.Partition.appendRecordsToLeader(Partition.scala:1067)
>         at kafka.server.ReplicaManager.$anonfun$appendToLocalLog$4(ReplicaManager.scala:953)
>         at kafka.server.ReplicaManager$$Lambda$1078/1017241486.apply(Unknown Source)
>         at scala.collection.StrictOptimizedMapOps.map(StrictOptimizedMapOps.scala:28)
>         at scala.collection.StrictOptimizedMapOps.map$(StrictOptimizedMapOps.scala:27)
>         at scala.collection.mutable.HashMap.map(HashMap.scala:35)
>         at kafka.server.ReplicaManager.appendToLocalLog(ReplicaManager.scala:941)
>         at kafka.server.ReplicaManager.appendRecords(ReplicaManager.scala:621)
>         at kafka.server.KafkaApis.handleProduceRequest(KafkaApis.scala:625)
>         at kafka.server.KafkaApis.handle(KafkaApis.scala:137)
>         at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:74)
>         at java.lang.Thread.run(Thread.java:748)
> {noformat}

--
This message was sent by Atlassian Jira
(v8.3.4#803005)