[ 
https://issues.apache.org/jira/browse/KAFKA-1710?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14174403#comment-14174403
 ] 

Bhavesh Mistry commented on KAFKA-1710:
---------------------------------------

[~ewencp],

Thanks for looking into this issue.  We consume as fast as we can and
re-publish the messages to another aggregated topic based on some keys in the
message.  We see thread contention in the profiling tool, and I separated out
the code to amplify the problem.  We run with about 75 threads.  [~ewencp], can
you please discuss this issue with the Kafka community as well?  The deadlock
will occur sometimes, depending on thread scheduling and how long the threads
are blocked.  All I am asking is whether there is a better way to enqueue
incoming messages.  I just proposed the simple solution above, which does not
impact application threads: only the drain threads will be blocked, and with a
buffer, as you mentioned, we might get better throughput (of course at the
expense of buffered memory (an unbounded concurrent queue) and thread context
switching).  If you feel this is a known performance issue when sending to a
single partition, then please close this, and you may start a discussion with
the Kafka community about it.  Thanks for your help and suggestions!
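
To make the proposal concrete, here is a rough sketch of the hand-off idea, not
actual producer code.  The class HandOffBuffer, its methods, and the Consumer
"sink" (standing in for the blocking send path) are hypothetical names chosen
for illustration only.  Application threads enqueue into an unbounded
ConcurrentLinkedQueue and never wait on a monitor; a single drain thread is the
only one that can block in the sink.

```java
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.LockSupport;
import java.util.function.Consumer;

// Hypothetical hand-off buffer; none of these names exist in the Kafka client.
final class HandOffBuffer<T> {
    private final Queue<T> queue = new ConcurrentLinkedQueue<>(); // unbounded, lock-free
    private final Consumer<T> sink; // assumption: stands in for the blocking send path
    private final Thread drainer;
    private volatile boolean running = true;

    HandOffBuffer(Consumer<T> sink) {
        this.sink = sink;
        this.drainer = new Thread(this::drainLoop, "drain-thread");
        this.drainer.start();
    }

    // Called by application threads: never waits on a monitor.
    void enqueue(T message) {
        queue.offer(message);
    }

    // Only this thread calls the sink, so only it can block there.
    private void drainLoop() {
        while (running || !queue.isEmpty()) {
            T next = queue.poll();
            if (next == null) {
                LockSupport.parkNanos(100_000L); // brief back-off when idle
            } else {
                sink.accept(next);
            }
        }
    }

    // Call after all application threads have finished; drains the rest, then stops.
    void close() {
        running = false;
        try {
            drainer.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    // Small demo: several application threads enqueue, one drain thread delivers.
    static int demo(int threads, int perThread) {
        AtomicInteger delivered = new AtomicInteger();
        HandOffBuffer<String> buffer = new HandOffBuffer<>(m -> delivered.incrementAndGet());
        Thread[] workers = new Thread[threads];
        for (int t = 0; t < threads; t++) {
            final int id = t;
            workers[t] = new Thread(() -> {
                for (int i = 0; i < perThread; i++) {
                    buffer.enqueue("msg-" + id + "-" + i);
                }
            });
            workers[t].start();
        }
        try {
            for (Thread w : workers) {
                w.join();
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        buffer.close();
        return delivered.get();
    }
}
```

The trade-off is the one mentioned above: the queue can grow without bound if
the drain thread falls behind, and every message costs an extra hand-off
between threads.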

According to the thread dumps, the threads are blocked in the synchronized block.
{code}
"pool-1-thread-200" prio=5 tid=0x00007f92451c2000 nid=0x20103 waiting for monitor entry [0x000000012d228000]
   java.lang.Thread.State: BLOCKED (on object monitor)
        at org.apache.kafka.clients.producer.internals.RecordAccumulator.append(RecordAccumulator.java:139)
        - waiting to lock <0x0000000703ce39f0> (a java.util.ArrayDeque)
        at org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:238)
        at org.kafka.test.TestNetworkDownProducer$MyProducer.run(TestNetworkDownProducer.java:85)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
        at java.lang.Thread.run(Thread.java:744)

"pool-1-thread-199" prio=5 tid=0x00007f92451c1800 nid=0x1ff03 waiting for monitor entry [0x000000012d0e5000]
   java.lang.Thread.State: BLOCKED (on object monitor)
        at org.apache.kafka.clients.producer.internals.RecordAccumulator.append(RecordAccumulator.java:139)
        - waiting to lock <0x0000000703ce39f0> (a java.util.ArrayDeque)
        at org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:238)
        at org.kafka.test.TestNetworkDownProducer$MyProducer.run(TestNetworkDownProducer.java:85)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
        at java.lang.Thread.run(Thread.java:744)
{code}

> [New Java Producer Potential Deadlock] Producer Deadlock when all messages is 
> being sent to single partition
> ------------------------------------------------------------------------------------------------------------
>
>                 Key: KAFKA-1710
>                 URL: https://issues.apache.org/jira/browse/KAFKA-1710
>             Project: Kafka
>          Issue Type: Bug
>          Components: producer 
>         Environment: Development
>            Reporter: Bhavesh Mistry
>            Priority: Critical
>              Labels: performance
>         Attachments: Screen Shot 2014-10-13 at 10.19.04 AM.png, Screen Shot 
> 2014-10-15 at 9.09.06 PM.png, Screen Shot 2014-10-15 at 9.14.15 PM.png, 
> TestNetworkDownProducer.java, th1.dump, th10.dump, th11.dump, th12.dump, 
> th13.dump, th14.dump, th15.dump, th2.dump, th3.dump, th4.dump, th5.dump, 
> th6.dump, th7.dump, th8.dump, th9.dump
>
>
> Hi Kafka Dev Team,
> When I run the test to send messages to a single partition for 3 minutes or 
> so, I encounter a deadlock (please see the attached screenshots) and thread 
> contention in YourKit profiling.  
> Use Case:
> 1)  Aggregating messages into the same partition for metric counting. 
> 2)  Replicating the old producer's behavior of sticking to a partition for 3 minutes.
> Here is the output:
> Frozen threads found (potential deadlock)
>  
> It seems that the following threads have not changed their stack for more 
> than 10 seconds.
> These threads are possibly (but not necessarily!) in a deadlock or hung.
>  
> pool-1-thread-128 <--- Frozen for at least 2m
> org.apache.kafka.clients.producer.internals.RecordAccumulator.append(TopicPartition,
>  byte[], byte[], CompressionType, Callback) RecordAccumulator.java:139
> org.apache.kafka.clients.producer.KafkaProducer.send(ProducerRecord, 
> Callback) KafkaProducer.java:237
> org.kafka.test.TestNetworkDownProducer$MyProducer.run() 
> TestNetworkDownProducer.java:84
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor$Worker) 
> ThreadPoolExecutor.java:1145
> java.util.concurrent.ThreadPoolExecutor$Worker.run() 
> ThreadPoolExecutor.java:615
> java.lang.Thread.run() Thread.java:744
> pool-1-thread-159 <--- Frozen for at least 2m 1 sec
> org.apache.kafka.clients.producer.internals.RecordAccumulator.append(TopicPartition,
>  byte[], byte[], CompressionType, Callback) RecordAccumulator.java:139
> org.apache.kafka.clients.producer.KafkaProducer.send(ProducerRecord, 
> Callback) KafkaProducer.java:237
> org.kafka.test.TestNetworkDownProducer$MyProducer.run() 
> TestNetworkDownProducer.java:84
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor$Worker) 
> ThreadPoolExecutor.java:1145
> java.util.concurrent.ThreadPoolExecutor$Worker.run() 
> ThreadPoolExecutor.java:615
> java.lang.Thread.run() Thread.java:744
> pool-1-thread-55 <--- Frozen for at least 2m
> org.apache.kafka.clients.producer.internals.RecordAccumulator.append(TopicPartition,
>  byte[], byte[], CompressionType, Callback) RecordAccumulator.java:139
> org.apache.kafka.clients.producer.KafkaProducer.send(ProducerRecord, 
> Callback) KafkaProducer.java:237
> org.kafka.test.TestNetworkDownProducer$MyProducer.run() 
> TestNetworkDownProducer.java:84
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor$Worker) 
> ThreadPoolExecutor.java:1145
> java.util.concurrent.ThreadPoolExecutor$Worker.run() 
> ThreadPoolExecutor.java:615
> java.lang.Thread.run() Thread.java:744
> Thanks,
> Bhavesh 



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
