[ https://issues.apache.org/jira/browse/KAFKA-8135?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16809993#comment-16809993 ]

Guozhang Wang commented on KAFKA-8135:
--------------------------------------

Hmm that's possible, as ConcurrentModificationException will only be caught at 
the higher-level `run` method:

{code}
            try {
                run(time.milliseconds());
            } catch (Exception e) {
                log.error("Uncaught error in kafka producer I/O thread: ", e);
            }
{code}

But before that exception is thrown, the expired batch has already been removed via 
{{iter.remove();}}, so it becomes a dangling object.
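
For illustration, here is a minimal, self-contained sketch of that failure mode (stand-in classes, not the actual Sender/RecordAccumulator code): the expiry pass removes a batch through the iterator, the ConcurrentModificationException then escapes to the outer catch shown above, so the batch's completion latch is never counted down and anyone awaiting it, like a {{flush()}} caller, blocks forever.

{code}
import java.util.ArrayList;
import java.util.ConcurrentModificationException;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.CountDownLatch;

public class DanglingBatchSketch {

    // Stand-in for ProducerBatch/ProduceRequestResult: awaiters block on the
    // latch until done() is called.
    static class Batch {
        final CountDownLatch completion = new CountDownLatch(1);
        void done() { completion.countDown(); }
        void await() throws InterruptedException { completion.await(); }
    }

    public static void main(String[] args) throws InterruptedException {
        List<Batch> inflight = new ArrayList<>();
        Batch expired = new Batch();
        inflight.add(expired);
        inflight.add(new Batch());

        // "flush" thread: waits on the expired batch, the way
        // RecordAccumulator.awaitFlushCompletion() waits on each batch's result.
        Thread flusher = new Thread(() -> {
            try {
                expired.await();
                System.out.println("flush completed");
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        flusher.start();

        // "sender" pass: the outer try/catch mirrors the run() loop above,
        // which only logs the error and keeps going.
        try {
            Iterator<Batch> iter = inflight.iterator();
            iter.next();
            iter.remove();              // expired batch dropped from the collection...
            inflight.add(new Batch());  // ...a structural modification happens...
            iter.next();                // ...and the next step throws ConcurrentModificationException
            expired.done();             // never reached: the latch is never counted down
        } catch (ConcurrentModificationException e) {
            System.err.println("Uncaught error in kafka producer I/O thread: " + e);
        }

        flusher.join(2000);
        // Prints true: the "flush" is stuck on a batch nobody will ever complete.
        System.out.println("flusher still blocked: " + flusher.isAlive());
    }
}
{code}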

> Kafka Producer deadlocked on flush call with intermittent broker 
> unavailability
> -------------------------------------------------------------------------------
>
>                 Key: KAFKA-8135
>                 URL: https://issues.apache.org/jira/browse/KAFKA-8135
>             Project: Kafka
>          Issue Type: Improvement
>          Components: clients
>    Affects Versions: 2.1.0
>            Reporter: Guozhang Wang
>            Assignee: Rajini Sivaram
>            Priority: Major
>
> In KIP-91 we added the config {{delivery.timeout.ms}} to replace {{retries}}, 
> and it defaults to 2 minutes. We've observed that when it is set to 
> MAX_VALUE (e.g. in Kafka Streams, when EOS is turned on), the 
> {{producer.flush}} call can sometimes block while its destination 
> brokers are undergoing a period of unavailability:
> {code}
> java.lang.Thread.State: WAITING (parking)
>     at jdk.internal.misc.Unsafe.park(java.base@10.0.2/Native Method)
>     - parking to wait for  <0x00000006aeb21a00> (a java.util.concurrent.CountDownLatch$Sync)
>     at java.util.concurrent.locks.LockSupport.park(java.base@10.0.2/Unknown Source)
>     at java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(java.base@10.0.2/Unknown Source)
>     at java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireSharedInterruptibly(java.base@10.0.2/Unknown Source)
>     at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireSharedInterruptibly(java.base@10.0.2/Unknown Source)
>     at java.util.concurrent.CountDownLatch.await(java.base@10.0.2/Unknown Source)
>     at org.apache.kafka.clients.producer.internals.ProduceRequestResult.await(ProduceRequestResult.java:76)
>     at org.apache.kafka.clients.producer.internals.RecordAccumulator.awaitFlushCompletion(RecordAccumulator.java:693)
>     at org.apache.kafka.clients.producer.KafkaProducer.flush(KafkaProducer.java:1066)
>     at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.flush(RecordCollectorImpl.java:259)
>     at org.apache.kafka.streams.processor.internals.StreamTask.flushState(StreamTask.java:520)
>     at org.apache.kafka.streams.processor.internals.StreamTask.commit(StreamTask.java:470)
>     at org.apache.kafka.streams.processor.internals.StreamTask.commit(StreamTask.java:458)
>     at org.apache.kafka.streams.processor.internals.AssignedTasks.commit(AssignedTasks.java:286)
>     at org.apache.kafka.streams.processor.internals.TaskManager.commitAll(TaskManager.java:412)
>     at org.apache.kafka.streams.processor.internals.StreamThread.maybeCommit(StreamThread.java:1057)
>     at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:911)
>     at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:805)
>     at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:774)
> {code}
> And even after the broker went back to normal, producers would still be 
> blocked. One suspicion is that when the broker is not able to handle the request 
> in time, the responses are somehow dropped inside the Sender, and hence 
> whoever is waiting on those responses would be blocked forever.
> We've observed such scenarios when 1) the broker transiently failed for a 
> while, 2) the network was transiently partitioned, and 3) a bad broker config 
> (e.g. ACL) prevented it from handling requests for a while.
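
For reference, a minimal sketch of the setup described above (the bootstrap server and topic name are placeholders, not from the report): the producer's {{delivery.timeout.ms}} is pushed to MAX_VALUE, so nothing bounds how long a {{flush()}} call may wait if an in-flight batch is never completed.

{code}
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

public class FlushBlockSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");  // placeholder
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        // Default is 2 minutes; Kafka Streams sets it to MAX_VALUE when EOS is turned on.
        props.put(ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG, Integer.MAX_VALUE);

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            producer.send(new ProducerRecord<>("some-topic", "key", "value"));  // placeholder topic
            // With the destination broker unavailable and a batch that is never
            // completed, this blocks on CountDownLatch.await as in the stack above.
            producer.flush();
        }
    }
}
{code}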



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
