Gary Russell created KAFKA-10790:
------------------------------------

             Summary: Detect/Prevent Deadlock on Producer Network Thread
                 Key: KAFKA-10790
                 URL: https://issues.apache.org/jira/browse/KAFKA-10790
             Project: Kafka
          Issue Type: Improvement
          Components: clients
    Affects Versions: 2.6.0, 2.7.0
            Reporter: Gary Russell


I realize this is contrived, but I stumbled across the problem while testing 
some library code with 2.7.0 RC3 (although the issue is not limited to 2.7).

For example, calling flush() from within a send() callback deadlocks the producer's network 
thread (and any subsequent attempt to close the producer).
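{code:java}
producer.send(new ProducerRecord<>("foo", "bar"), (rm, ex) -> {
        // Runs on the producer's network thread; flush() blocks that thread forever
        producer.flush();
});
Thread.sleep(1000);
producer.close(); // hangs: waits for the network thread to exit
{code}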
It took some time to figure out why close() was blocked.

close() already contains logic to avoid blocking when it is called from the 
callback; perhaps similar logic could be added to flush() (and any other 
method that might block), even if that means throwing an exception to make it 
clear that flush() cannot be called from the callback.
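A minimal sketch of the suggested guard, written in plain Java without the Kafka client so it runs on its own. `GuardDemo`, `GuardedClient` and their methods are hypothetical stand-ins, not the KafkaProducer API: callbacks run on a single I/O thread, and `flush()` compares the calling thread with that I/O thread (the kind of current-thread check that lets close() avoid blocking from the callback) and throws `IllegalStateException` instead of parking.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicReference;

public class GuardDemo {

    /** Hypothetical client whose completion callbacks all run on one I/O thread. */
    static final class GuardedClient implements AutoCloseable {
        private final AtomicReference<Thread> ioThread = new AtomicReference<>();
        private final ExecutorService io = Executors.newSingleThreadExecutor(r -> {
            Thread t = new Thread(r, "io-thread");
            t.setDaemon(true);
            ioThread.set(t); // remember which thread runs callbacks
            return t;
        });

        /** Runs the callback on the I/O thread, as a producer does when a send completes. */
        Future<?> send(Runnable callback) {
            return io.submit(callback);
        }

        /** Blocks until queued work has run, but refuses to block the I/O thread itself. */
        void flush() {
            if (Thread.currentThread() == ioThread.get()) {
                throw new IllegalStateException(
                        "flush() cannot be called from a callback running on the I/O thread");
            }
            try {
                // Single-threaded FIFO executor: once this no-op runs, all earlier tasks have run.
                io.submit(() -> { }).get();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException("interrupted while flushing", e);
            } catch (ExecutionException e) {
                throw new IllegalStateException(e.getCause());
            }
        }

        @Override
        public void close() {
            io.shutdown();
        }
    }

    /** Mirrors the reproducer: calls flush() from inside a send callback. */
    static boolean flushRejectedInCallback() {
        AtomicReference<Throwable> seen = new AtomicReference<>();
        try (GuardedClient client = new GuardedClient()) {
            Future<?> done = client.send(() -> {
                try {
                    client.flush();
                } catch (IllegalStateException e) {
                    seen.set(e); // rejected instead of parking forever
                }
            });
            done.get();
            client.flush(); // still allowed from a non-I/O thread
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        }
        return seen.get() instanceof IllegalStateException;
    }

    public static void main(String[] args) {
        System.out.println("flush() rejected in callback: " + flushRejectedInCallback());
    }
}
```

Running `main` prints `flush() rejected in callback: true`. Throwing makes the misuse visible at the call site, rather than leaving the network thread (and any later close()) hung.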

These stack traces were captured with the 2.6.0 client.
{noformat}
"main" #1 prio=5 os_prio=31 cpu=1333.10ms elapsed=13.05s tid=0x00007ff259012800 nid=0x2803 in Object.wait()  [0x000070000fda5000]
   java.lang.Thread.State: TIMED_WAITING (on object monitor)
        at java.lang.Object.wait(Native Method)
        - waiting on <0x0000000700d00000> (a org.apache.kafka.common.utils.KafkaThread)
        at java.lang.Thread.join(Thread.java:1297)
        - locked <0x0000000700d00000> (a org.apache.kafka.common.utils.KafkaThread)
        at org.apache.kafka.clients.producer.KafkaProducer.close(KafkaProducer.java:1205)
        at org.apache.kafka.clients.producer.KafkaProducer.close(KafkaProducer.java:1182)
        at org.apache.kafka.clients.producer.KafkaProducer.close(KafkaProducer.java:1158)
        at com.example.demo.Rk1Application.lambda$2(Rk1Application.java:55)


"kafka-producer-network-thread | producer-1" #24 daemon prio=5 os_prio=31 cpu=225.80ms elapsed=11.64s tid=0x00007ff256963000 nid=0x7103 waiting on condition  [0x0000700011d04000]
   java.lang.Thread.State: WAITING (parking)
        at jdk.internal.misc.Unsafe.park(Native Method)
        - parking to wait for  <0x00000007020b27e0> (a java.util.concurrent.CountDownLatch$Sync)
        at java.util.concurrent.locks.LockSupport.park(LockSupport.java:211)
        at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquire(AbstractQueuedSynchronizer.java:714)
        at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireSharedInterruptibly(AbstractQueuedSynchronizer.java:1046)
        at java.util.concurrent.CountDownLatch.await(CountDownLatch.java:232)
        at org.apache.kafka.clients.producer.internals.ProduceRequestResult.await(ProduceRequestResult.java:76)
        at org.apache.kafka.clients.producer.internals.RecordAccumulator.awaitFlushCompletion(RecordAccumulator.java:712)
        at org.apache.kafka.clients.producer.KafkaProducer.flush(KafkaProducer.java:1111)
        at com.example.demo.Rk1Application.lambda$3(Rk1Application.java:52)
        at com.example.demo.Rk1Application$$Lambda$528/0x0000000800e28840.onCompletion(Unknown Source)
        at org.apache.kafka.clients.producer.KafkaProducer$InterceptorCallback.onCompletion(KafkaProducer.java:1363)
        at org.apache.kafka.clients.producer.internals.ProducerBatch.completeFutureAndFireCallbacks(ProducerBatch.java:228)
        at org.apache.kafka.clients.producer.internals.ProducerBatch.done(ProducerBatch.java:197)
        at org.apache.kafka.clients.producer.internals.Sender.completeBatch(Sender.java:653)
        at org.apache.kafka.clients.producer.internals.Sender.completeBatch(Sender.java:634)
        at org.apache.kafka.clients.producer.internals.Sender.handleProduceResponse(Sender.java:554)
        at org.apache.kafka.clients.producer.internals.Sender.lambda$sendProduceRequest$0(Sender.java:743)
        at org.apache.kafka.clients.producer.internals.Sender$$Lambda$642/0x0000000800ea2040.onComplete(Unknown Source)
        at org.apache.kafka.clients.ClientResponse.onComplete(ClientResponse.java:109)
        at org.apache.kafka.clients.NetworkClient.completeResponses(NetworkClient.java:566)
        at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:558)
        at org.apache.kafka.clients.producer.internals.Sender.runOnce(Sender.java:325)
        at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:240)
        at java.lang.Thread.run(Thread.java:832)
{noformat}
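Reading the second trace: the network thread is inside the user callback (`lambda$3`), which called `flush()`, which parks in an untimed `CountDownLatch.await()` (the `acquireSharedInterruptibly` frame) waiting for batches to complete. Completing batches is what that same thread does from `Sender.runOnce()`, so nothing can release the latch, and `close()` on the main thread in turn waits forever in `Thread.join()` for the network thread to exit. The sketch below models only the self-wait with plain `java.util.concurrent` types; `SelfWaitDemo`, `waitCompletes` and `runAndRecord` are hypothetical names, the latch merely stands in for the one behind `ProduceRequestResult.await()`, and a timed `await` turns the hang into a visible timeout. It also shows one possible mitigation (an assumption, not something proposed above): handing the blocking wait to another thread so the callback returns and the I/O thread can keep working.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class SelfWaitDemo {

    /**
     * Runs a "callback" on a single-threaded executor standing in for the network thread,
     * then queues the work that completes the batch on that same executor.
     * Returns whether the callback's wait on the batch latch finished before the timeout.
     */
    static boolean waitCompletes(boolean offload, long timeoutMs) {
        ExecutorService io = Executors.newSingleThreadExecutor();     // stand-in network thread
        ExecutorService helper = Executors.newSingleThreadExecutor(); // any other thread
        try {
            CountDownLatch batchDone = new CountDownLatch(1);
            Callable<Boolean> waitForBatch = () -> batchDone.await(timeoutMs, TimeUnit.MILLISECONDS);
            CompletableFuture<Boolean> outcome = new CompletableFuture<>();

            // The callback either blocks inline on io (like flush() in the reproducer)
            // or hands the blocking wait to helper and returns immediately.
            io.execute(() -> {
                if (offload) {
                    helper.execute(() -> runAndRecord(waitForBatch, outcome));
                } else {
                    runAndRecord(waitForBatch, outcome);
                }
            });
            // Completing the batch is io's own work, queued behind the callback.
            io.execute(batchDone::countDown);

            return outcome.join();
        } finally {
            io.shutdownNow();
            helper.shutdownNow();
        }
    }

    /** Runs the task and stores its result (or failure) in the future. */
    private static void runAndRecord(Callable<Boolean> task, CompletableFuture<Boolean> outcome) {
        try {
            outcome.complete(task.call());
        } catch (Exception e) {
            outcome.completeExceptionally(e);
        }
    }

    public static void main(String[] args) {
        System.out.println("inline wait completed:    " + waitCompletes(false, 500));
        System.out.println("offloaded wait completed: " + waitCompletes(true, 500));
    }
}
```

`main` prints `false` for the inline wait (after the full timeout, because the countdown is queued behind the blocked callback) and `true` for the offloaded one. With an untimed `await()`, as in the trace, the inline case would never return at all.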



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
