[ 
https://issues.apache.org/jira/browse/KAFKA-10790?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17244252#comment-17244252
 ] 

GeordieMai edited comment on KAFKA-10790 at 12/4/20, 7:23 PM:
--------------------------------------------------------------

[~chia7712] Can I take this issue?
 when flush method is called in  callback , 

throw a exception to notify user to prevent  it

or just make flush method not working 

or make flush method work fine

what do you think ?


was (Author: geordie):
[~chia7712] Can I take this issue?
when flush method is called in  callback , 

throw a exception to notify user to prevent  it

or just make flush method not working 

or make flush method work fun 

what do you think ?

> Detect/Prevent Deadlock on Producer Network Thread
> --------------------------------------------------
>
>                 Key: KAFKA-10790
>                 URL: https://issues.apache.org/jira/browse/KAFKA-10790
>             Project: Kafka
>          Issue Type: Improvement
>          Components: clients
>    Affects Versions: 2.6.0, 2.7.0
>            Reporter: Gary Russell
>            Priority: Major
>
> I realize this is contrived, but I stumbled across the problem while testing 
> some library code with 2.7.0 RC3 (although the issue is not limited to 2.7).
> For example, calling flush() on the producer callback deadlocks the network 
> thread (and any attempt to close the producer thereafter).
> {code:java}
> producer.send(new ProducerRecord("foo", "bar"), (rm, ex) -> {
>       producer.flush();
> });
> Thread.sleep(1000);
> producer.close();
> {code}
> It took some time to figure out why the close was blocked.
> There is existing logic in close() to avoid it blocking if called from the 
> callback; perhaps similar logic could be added to flush() (and any other 
> methods that might block), even if it means throwing an exception to make it 
> clear that you can't call flush() from the callback. 
> These stack traces are with the 2.6.0 client.
> {noformat}
> "main" #1 prio=5 os_prio=31 cpu=1333.10ms elapsed=13.05s 
> tid=0x00007ff259012800 nid=0x2803 in Object.wait()  [0x000070000fda5000]
>    java.lang.Thread.State: TIMED_WAITING (on object monitor)
>       at java.lang.Object.wait(java.base@14.0.2/Native Method)
>       - waiting on <0x0000000700d00000> (a 
> org.apache.kafka.common.utils.KafkaThread)
>       at java.lang.Thread.join(java.base@14.0.2/Thread.java:1297)
>       - locked <0x0000000700d00000> (a 
> org.apache.kafka.common.utils.KafkaThread)
>       at 
> org.apache.kafka.clients.producer.KafkaProducer.close(KafkaProducer.java:1205)
>       at 
> org.apache.kafka.clients.producer.KafkaProducer.close(KafkaProducer.java:1182)
>       at 
> org.apache.kafka.clients.producer.KafkaProducer.close(KafkaProducer.java:1158)
>       at com.example.demo.Rk1Application.lambda$2(Rk1Application.java:55)
> "kafka-producer-network-thread | producer-1" #24 daemon prio=5 os_prio=31 
> cpu=225.80ms elapsed=11.64s tid=0x00007ff256963000 nid=0x7103 waiting on 
> condition  [0x0000700011d04000]
>    java.lang.Thread.State: WAITING (parking)
>       at jdk.internal.misc.Unsafe.park(java.base@14.0.2/Native Method)
>       - parking to wait for  <0x00000007020b27e0> (a 
> java.util.concurrent.CountDownLatch$Sync)
>       at 
> java.util.concurrent.locks.LockSupport.park(java.base@14.0.2/LockSupport.java:211)
>       at 
> java.util.concurrent.locks.AbstractQueuedSynchronizer.acquire(java.base@14.0.2/AbstractQueuedSynchronizer.java:714)
>       at 
> java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireSharedInterruptibly(java.base@14.0.2/AbstractQueuedSynchronizer.java:1046)
>       at 
> java.util.concurrent.CountDownLatch.await(java.base@14.0.2/CountDownLatch.java:232)
>       at 
> org.apache.kafka.clients.producer.internals.ProduceRequestResult.await(ProduceRequestResult.java:76)
>       at 
> org.apache.kafka.clients.producer.internals.RecordAccumulator.awaitFlushCompletion(RecordAccumulator.java:712)
>       at 
> org.apache.kafka.clients.producer.KafkaProducer.flush(KafkaProducer.java:1111)
>       at com.example.demo.Rk1Application.lambda$3(Rk1Application.java:52)
>       at 
> com.example.demo.Rk1Application$$Lambda$528/0x0000000800e28840.onCompletion(Unknown
>  Source)
>       at 
> org.apache.kafka.clients.producer.KafkaProducer$InterceptorCallback.onCompletion(KafkaProducer.java:1363)
>       at 
> org.apache.kafka.clients.producer.internals.ProducerBatch.completeFutureAndFireCallbacks(ProducerBatch.java:228)
>       at 
> org.apache.kafka.clients.producer.internals.ProducerBatch.done(ProducerBatch.java:197)
>       at 
> org.apache.kafka.clients.producer.internals.Sender.completeBatch(Sender.java:653)
>       at 
> org.apache.kafka.clients.producer.internals.Sender.completeBatch(Sender.java:634)
>       at 
> org.apache.kafka.clients.producer.internals.Sender.handleProduceResponse(Sender.java:554)
>       at 
> org.apache.kafka.clients.producer.internals.Sender.lambda$sendProduceRequest$0(Sender.java:743)
>       at 
> org.apache.kafka.clients.producer.internals.Sender$$Lambda$642/0x0000000800ea2040.onComplete(Unknown
>  Source)
>       at 
> org.apache.kafka.clients.ClientResponse.onComplete(ClientResponse.java:109)
>       at 
> org.apache.kafka.clients.NetworkClient.completeResponses(NetworkClient.java:566)
>       at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:558)
>       at 
> org.apache.kafka.clients.producer.internals.Sender.runOnce(Sender.java:325)
>       at 
> org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:240)
>       at java.lang.Thread.run(java.base@14.0.2/Thread.java:832)
> {noformat}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

Reply via email to