[ https://issues.apache.org/jira/browse/KAFKA-10790?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17244252#comment-17244252 ]

GeordieMai edited comment on KAFKA-10790 at 12/4/20, 7:23 PM:
--------------------------------------------------------------

[~chia7712] Can I take this issue?

When flush() is called inside a callback, should we throw an exception to tell the user not to do that, make flush() a no-op in that case, or make flush() work correctly? What do you think?


was (Author: geordie):
[~chia7712] Can I take this issue?
when flush method is called in callback , throw a exception to notify user to prevent it or just make flush method not working or make flush method work fun
what do you think ?

> Detect/Prevent Deadlock on Producer Network Thread
> --------------------------------------------------
>
>                 Key: KAFKA-10790
>                 URL: https://issues.apache.org/jira/browse/KAFKA-10790
>             Project: Kafka
>          Issue Type: Improvement
>          Components: clients
>    Affects Versions: 2.6.0, 2.7.0
>            Reporter: Gary Russell
>            Priority: Major
>
> I realize this is contrived, but I stumbled across the problem while testing
> some library code with 2.7.0 RC3 (although the issue is not limited to 2.7).
> For example, calling flush() in the producer callback deadlocks the network
> thread (and blocks any subsequent attempt to close the producer).
> {code:java}
> // Calling flush() inside the send callback blocks the producer network thread.
> producer.send(new ProducerRecord<>("foo", "bar"), (rm, ex) -> {
>     producer.flush();
> });
> Thread.sleep(1000);
> producer.close(); // blocks, waiting on the stuck network thread
> {code}
> It took some time to figure out why the close was blocked.
> There is existing logic in close() to avoid it blocking if called from the
> callback; perhaps similar logic could be added to flush() (and any other
> methods that might block), even if it means throwing an exception to make it
> clear that you can't call flush() from the callback.
> These stack traces are with the 2.6.0 client.
> {noformat}
> "main" #1 prio=5 os_prio=31 cpu=1333.10ms elapsed=13.05s tid=0x00007ff259012800 nid=0x2803 in Object.wait() [0x000070000fda5000]
>    java.lang.Thread.State: TIMED_WAITING (on object monitor)
>       at java.lang.Object.wait(java.base@14.0.2/Native Method)
>       - waiting on <0x0000000700d00000> (a org.apache.kafka.common.utils.KafkaThread)
>       at java.lang.Thread.join(java.base@14.0.2/Thread.java:1297)
>       - locked <0x0000000700d00000> (a org.apache.kafka.common.utils.KafkaThread)
>       at org.apache.kafka.clients.producer.KafkaProducer.close(KafkaProducer.java:1205)
>       at org.apache.kafka.clients.producer.KafkaProducer.close(KafkaProducer.java:1182)
>       at org.apache.kafka.clients.producer.KafkaProducer.close(KafkaProducer.java:1158)
>       at com.example.demo.Rk1Application.lambda$2(Rk1Application.java:55)
>
> "kafka-producer-network-thread | producer-1" #24 daemon prio=5 os_prio=31 cpu=225.80ms elapsed=11.64s tid=0x00007ff256963000 nid=0x7103 waiting on condition [0x0000700011d04000]
>    java.lang.Thread.State: WAITING (parking)
>       at jdk.internal.misc.Unsafe.park(java.base@14.0.2/Native Method)
>       - parking to wait for <0x00000007020b27e0> (a java.util.concurrent.CountDownLatch$Sync)
>       at java.util.concurrent.locks.LockSupport.park(java.base@14.0.2/LockSupport.java:211)
>       at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquire(java.base@14.0.2/AbstractQueuedSynchronizer.java:714)
>       at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireSharedInterruptibly(java.base@14.0.2/AbstractQueuedSynchronizer.java:1046)
>       at java.util.concurrent.CountDownLatch.await(java.base@14.0.2/CountDownLatch.java:232)
>       at org.apache.kafka.clients.producer.internals.ProduceRequestResult.await(ProduceRequestResult.java:76)
>       at org.apache.kafka.clients.producer.internals.RecordAccumulator.awaitFlushCompletion(RecordAccumulator.java:712)
>       at org.apache.kafka.clients.producer.KafkaProducer.flush(KafkaProducer.java:1111)
>       at com.example.demo.Rk1Application.lambda$3(Rk1Application.java:52)
>       at com.example.demo.Rk1Application$$Lambda$528/0x0000000800e28840.onCompletion(Unknown Source)
>       at org.apache.kafka.clients.producer.KafkaProducer$InterceptorCallback.onCompletion(KafkaProducer.java:1363)
>       at org.apache.kafka.clients.producer.internals.ProducerBatch.completeFutureAndFireCallbacks(ProducerBatch.java:228)
>       at org.apache.kafka.clients.producer.internals.ProducerBatch.done(ProducerBatch.java:197)
>       at org.apache.kafka.clients.producer.internals.Sender.completeBatch(Sender.java:653)
>       at org.apache.kafka.clients.producer.internals.Sender.completeBatch(Sender.java:634)
>       at org.apache.kafka.clients.producer.internals.Sender.handleProduceResponse(Sender.java:554)
>       at org.apache.kafka.clients.producer.internals.Sender.lambda$sendProduceRequest$0(Sender.java:743)
>       at org.apache.kafka.clients.producer.internals.Sender$$Lambda$642/0x0000000800ea2040.onComplete(Unknown Source)
>       at org.apache.kafka.clients.ClientResponse.onComplete(ClientResponse.java:109)
>       at org.apache.kafka.clients.NetworkClient.completeResponses(NetworkClient.java:566)
>       at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:558)
>       at org.apache.kafka.clients.producer.internals.Sender.runOnce(Sender.java:325)
>       at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:240)
>       at java.lang.Thread.run(java.base@14.0.2/Thread.java:832)
> {noformat}
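
To make the "throw an exception" option concrete, here is a minimal sketch of a guard of the kind the issue suggests for flush(). It does not use the Kafka client at all: GuardedFlushSketch, its start() and runDemo() helpers, the thread name, and the exception message are all hypothetical stand-ins, and the single CountDownLatch only imitates what the stack trace shows (the network thread parked in CountDownLatch.await while still inside the callback). The only idea taken from the issue is to compare the calling thread with the I/O thread and fail fast instead of waiting.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicReference;

/** Hypothetical stand-in for a producer; this is NOT Kafka's implementation. */
final class GuardedFlushSketch {
    // Released only after the callback has returned, which is why an
    // unguarded flush() inside the callback would wait on itself forever.
    private final CountDownLatch batchDone = new CountDownLatch(1);
    private volatile Thread ioThread;

    /** Waits for the in-flight batch, but refuses to wait on the I/O thread. */
    void flush() throws InterruptedException {
        if (Thread.currentThread() == ioThread) {
            throw new IllegalStateException(
                "flush() must not be called from a producer callback");
        }
        batchDone.await();
    }

    /** Starts the stand-in I/O thread: run the callback, then complete the batch. */
    Thread start(Runnable callback) {
        Thread t = new Thread(() -> {
            try {
                callback.run();
            } finally {
                batchDone.countDown();
            }
        }, "sketch-io-thread");
        ioThread = t;      // assigned before start(), so the callback sees it
        t.setDaemon(true);
        t.start();
        return t;
    }

    /** Calls flush() from the callback and reports what happened. */
    static String runDemo() {
        GuardedFlushSketch producer = new GuardedFlushSketch();
        AtomicReference<String> outcome = new AtomicReference<>("no exception");
        Thread io = producer.start(() -> {
            try {
                producer.flush();  // on the I/O thread: the guard rejects this
            } catch (IllegalStateException e) {
                outcome.set("rejected: " + e.getMessage());
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        try {
            io.join(5000);
            producer.flush();      // on the caller's thread: allowed
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return "interrupted";
        }
        return outcome.get();
    }

    public static void main(String[] args) {
        System.out.println(runDemo());
    }
}
```

Without the thread check, the callback in runDemo() would park on batchDone exactly as the network thread does in the trace above, and the join would simply time out. Whether throwing is preferable to silently skipping the wait is the open question in the comment; the sketch only illustrates the first option.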