[ https://issues.apache.org/jira/browse/KAFKA-10114?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17182635#comment-17182635 ]
Tim Fox commented on KAFKA-10114:
---------------------------------

Hi Itamar,

Can you confirm whether the issue still occurs with the latest version of Kafka? The code in question has been refactored since your report.

> Kafka producer stuck after broker crash
> ---------------------------------------
>
>                 Key: KAFKA-10114
>                 URL: https://issues.apache.org/jira/browse/KAFKA-10114
>             Project: Kafka
>          Issue Type: Bug
>          Components: producer
>    Affects Versions: 2.3.1, 2.4.1
>            Reporter: Itamar Benjamin
>            Priority: Critical
>
> Today two of our Kafka brokers crashed (in a cluster of 3 brokers), and producers were not able to send new messages. After the brokers started again, all producers resumed sending data except for a single one.
>
> At the beginning the producer rejected all new messages with a TimeoutException:
>
> {code:java}
> org.apache.kafka.common.errors.TimeoutException: Expiring 1 record(s) for incoming-mutable-RuntimeIIL-1:120000 ms has passed since batch creation
> {code}
>
> Then after some time the exception changed to:
>
> {code:java}
> org.apache.kafka.common.errors.TimeoutException: Failed to allocate memory within the configured max blocking time 60000 ms.
> {code}
>
> jstack shows the kafka-producer-network-thread is waiting to get a producer id:
>
> {code:java}
> "kafka-producer-network-thread | producer-1" #767 daemon prio=5 os_prio=0 cpu=63594017.16ms elapsed=1511219.38s tid=0x00007fffd8353000 nid=0x4fa4 sleeping [0x00007ff55c177000]
>    java.lang.Thread.State: TIMED_WAITING (sleeping)
>         at java.lang.Thread.sleep(java.base@11.0.1/Native Method)
>         at org.apache.kafka.common.utils.Utils.sleep(Utils.java:296)
>         at org.apache.kafka.common.utils.SystemTime.sleep(SystemTime.java:41)
>         at org.apache.kafka.clients.producer.internals.Sender.maybeWaitForProducerId(Sender.java:565)
>         at org.apache.kafka.clients.producer.internals.Sender.runOnce(Sender.java:306)
>         at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:244)
>         at java.lang.Thread.run(java.base@11.0.1/Thread.java:834)
>
>    Locked ownable synchronizers:
>         - None
> {code}
>
> Digging into maybeWaitForProducerId(), it waits until some broker is ready (the awaitNodeReady function), which in turn calls leastLoadedNode() on NetworkClient. That method iterates over all brokers and checks whether a request can be sent to each one using canSendRequest().
>
> This is the code for canSendRequest():
>
> {code:java}
> return connectionStates.isReady(node, now) && selector.isChannelReady(node) &&
>     inFlightRequests.canSendMore(node);
> {code}
>
> Using some debugging tools I saw this expression always evaluates to false, because the last part (canSendMore) is false.
>
> This is the code for canSendMore():
>
> {code:java}
> public boolean canSendMore(String node) {
>     Deque<NetworkClient.InFlightRequest> queue = requests.get(node);
>     return queue == null || queue.isEmpty() ||
>         (queue.peekFirst().send.completed() && queue.size() < this.maxInFlightRequestsPerConnection);
> }
> {code}
>
> I verified that
>
> {code:java}
> queue.peekFirst().send.completed()
> {code}
>
> is true, so the queue must be at the maxInFlightRequestsPerConnection limit. That leads to the livelock: since the in-flight request queues are full for all nodes, a new request to check broker availability and reconnect cannot be submitted.
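To make the reported livelock condition easier to follow, here is a minimal standalone sketch of the canSendMore() predicate quoted above. The classes and names below (InFlightRequest, CanSendMoreSketch, the queue of booleans) are hypothetical stand-ins, not Kafka's actual internals; the limit of 5 is assumed to match the default max.in.flight.requests.per.connection. It only demonstrates that once a node's in-flight queue is full, the predicate returns false even though the head request's send has completed.

{code:java}
import java.util.ArrayDeque;
import java.util.Deque;

// Standalone illustration (stand-in types, not Kafka's classes) of the
// canSendMore() check described in the report.
public class CanSendMoreSketch {

    // Stand-in for an in-flight request whose network send finished but whose
    // response never arrived (e.g. because the broker crashed).
    static final class InFlightRequest {
        final boolean sendCompleted;
        InFlightRequest(boolean sendCompleted) { this.sendCompleted = sendCompleted; }
    }

    // Assumed default of max.in.flight.requests.per.connection.
    static final int MAX_IN_FLIGHT_PER_CONNECTION = 5;

    // Same shape as the quoted InFlightRequests.canSendMore(String) predicate.
    static boolean canSendMore(Deque<InFlightRequest> queue) {
        return queue == null || queue.isEmpty()
                || (queue.peekFirst().sendCompleted
                    && queue.size() < MAX_IN_FLIGHT_PER_CONNECTION);
    }

    public static void main(String[] args) {
        Deque<InFlightRequest> queue = new ArrayDeque<>();
        // Fill the queue to the limit with requests whose sends completed but
        // whose responses are still outstanding.
        for (int i = 0; i < MAX_IN_FLIGHT_PER_CONNECTION; i++) {
            queue.addFirst(new InFlightRequest(true));
        }
        // Prints "canSendMore = false": the queue is full, so no further
        // request toward that node can be submitted, which matches the
        // behaviour the reporter observed via canSendRequest().
        System.out.println("canSendMore = " + canSendMore(queue));
    }
}
{code}

If this is indeed the state of every node's queue, leastLoadedNode() never finds a sendable node, which would explain why the Sender keeps sleeping in maybeWaitForProducerId().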