[jira] [Commented] (KAFKA-1788) producer record can stay in RecordAccumulator forever if leader is no available
[ https://issues.apache.org/jira/browse/KAFKA-1788?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14626707#comment-14626707 ]

Jiangjie Qin commented on KAFKA-1788:

[~parth.brahmbhatt], yes, we can resolve this ticket now. I will submit another patch for KAFKA-2142 after KIP-19 is done, if necessary. But hopefully we can solve everything in KIP-19.

> producer record can stay in RecordAccumulator forever if leader is no available
> -------------------------------------------------------------------------------
>
>                 Key: KAFKA-1788
>                 URL: https://issues.apache.org/jira/browse/KAFKA-1788
>             Project: Kafka
>          Issue Type: Bug
>          Components: core, producer
>    Affects Versions: 0.8.2.0
>            Reporter: Jun Rao
>            Assignee: Parth Brahmbhatt
>              Labels: newbie++
>             Fix For: 0.8.3
>
>         Attachments: KAFKA-1788.patch, KAFKA-1788_2015-01-06_13:42:37.patch, KAFKA-1788_2015-01-06_13:44:41.patch
>
> In the new producer, when a partition has no leader for a long time (e.g., all replicas are down), the records for that partition will stay in the RecordAccumulator until the leader is available. This may cause the bufferpool to be full and the callback for the produced message to block for a long time.
[jira] [Commented] (KAFKA-1788) producer record can stay in RecordAccumulator forever if leader is no available
[ https://issues.apache.org/jira/browse/KAFKA-1788?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14624933#comment-14624933 ]

Mayuresh Gharat commented on KAFKA-1788:

Hi [~parth.brahmbhatt], this has been handled as part of KIP-19. Jira: https://issues.apache.org/jira/browse/KAFKA-2120

Thanks,
Mayuresh
[jira] [Commented] (KAFKA-1788) producer record can stay in RecordAccumulator forever if leader is no available
[ https://issues.apache.org/jira/browse/KAFKA-1788?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14624873#comment-14624873 ]

Parth Brahmbhatt commented on KAFKA-1788:

[~becket_qin] So is this jira irrelevant at this point? If yes, can I resolve it? If not, can you describe what needs to be done? I know you had a KIP and multiple discussions, but I am not sure if you are taking care of it as part of KAFKA-2142. I will be happy to continue working on this jira if you can describe what needs to be done.
[jira] [Commented] (KAFKA-1788) producer record can stay in RecordAccumulator forever if leader is no available
[ https://issues.apache.org/jira/browse/KAFKA-1788?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14525421#comment-14525421 ]

Jiangjie Qin commented on KAFKA-1788:

I took a shot at incorporating the solution to this problem in KAFKA-2142. The approach I took there is to just use the metadata timeout instead of adding a new timeout, because this is essentially a case of metadata not being available, so we should treat it the same way as in send(). This also saves us another timeout configuration.

[~ewencp] My concern about having a cap on the buffer for each topic-partition is: what if the traffic across topic-partitions is not balanced? If so, we might end up waiting on a busy topic-partition's buffer allocation while we actually have plenty of memory to use. That could hurt performance a lot.
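A minimal sketch of the expiry idea described above, reusing the metadata timeout rather than adding a new config. The names here are hypothetical and do not match Kafka's internal RecordAccumulator types; the point is only that batches waiting longer than the metadata timeout get failed, so callbacks fire and buffer memory is returned:

{code}
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.TimeoutException;

// Illustration only; the real change lives in the KAFKA-2142 patch.
class BatchExpirySketch {
    static class Batch {
        final String topicPartition;
        final long createdMs;
        Batch(String tp, long createdMs) { this.topicPartition = tp; this.createdMs = createdMs; }
        void abort(Exception e) { /* fire per-record callbacks with the error */ }
    }

    // Reuses the existing metadata timeout instead of introducing a new setting.
    static void expireStaleBatches(List<Batch> queued, long nowMs, long metadataTimeoutMs) {
        for (Iterator<Batch> it = queued.iterator(); it.hasNext(); ) {
            Batch b = it.next();
            if (nowMs - b.createdMs > metadataTimeoutMs) {
                b.abort(new TimeoutException("no leader for " + b.topicPartition
                        + " within " + metadataTimeoutMs + " ms"));
                it.remove(); // returns accumulator space to other partitions
            }
        }
    }
}
{code}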
[jira] [Commented] (KAFKA-1788) producer record can stay in RecordAccumulator forever if leader is no available
[ https://issues.apache.org/jira/browse/KAFKA-1788?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14333041#comment-14333041 ]

Ewen Cheslack-Postava commented on KAFKA-1788:

Ok, I'll try to clear up a few issues. I think that just making sure NetworkClient.leastLoadedNode eventually returns all nodes isn't sufficient; I was just raising another case where this issue could occur. The reason it isn't sufficient for the original case is the type of situation [~Bmis13] raises. If you have a temporary network outage to a single broker (e.g. due to firewall misconfiguration or a network partition), it may still correctly be listed as leader. If holding the data in the RecordAccumulator only affected data sent to that one broker, then, as [~jkreps] points out, we could potentially get away with just holding on to the messages indefinitely, since errors should manifest in other ways. (I think it's *better* to have the timeouts, but not strictly necessary.) However, since the RecordAccumulator is a shared resource, holding onto these messages also means you're going to block sending data to other brokers once your buffer fills up with data for the unreachable broker. Adding timeouts at least ensures messages for these other brokers will eventually get a chance to be sent, even if there are periods where they are automatically rejected because the buffer is already full.

So [~parth.brahmbhatt], I think the approach you're trying to take in the patch is definitely the right thing to do, and I agree with [~Bmis13] that the error record metrics definitely should (eventually) be increasing.

More generally -- yes, pretty much everything that could potentially block things up for a long time or indefinitely *should* have a timeout. And in a lot of cases this is true even if the operation will eventually time out "naturally", e.g. due to a TCP timeout. It's better to have control over the timeout (even if we highly recommend using the default values) than to rely on settings from other systems, especially when they may be adjusted in unexpected ways outside of our control. This is a pervasive concern that we should keep an eye out for with new code, and we should file JIRAs as we find missing timeouts in existing code.

Given the above, I think the options for controlling memory usage may not be very good for some use cases -- we've been saying people should use a single producer where possible, since it's very fast and you actually benefit from sharing the network thread: you can collect all data for all topic-partitions destined for the same broker into a single request. But it turns out that sharing the underlying resource (the buffer) can lead to starvation for some topic-partitions when it shouldn't really be necessary. Would it make sense to allow a per-topic, or even per-partition, limit on memory usage? The effect would be similar to fetch.message.max.bytes for the consumer, where your actual memory usage cap is n times the value, where n is the number of topic-partitions you're working with (see the sketch below). It could also be per broker, but I think that leads to much less intuitive and harder to predict behavior. If people think that's a good idea we can file an additional issue for that.
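Purely as illustration of the per-partition budget idea, here is a minimal sketch with hypothetical names (Kafka's actual BufferPool works differently): each partition gets a byte budget in front of the shared pool, so one stuck partition cannot starve the rest.

{code}
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Illustration only: a per-partition byte budget in front of a shared buffer.
class PartitionBudgetSketch {
    private final long perPartitionLimit;
    private final Map<String, Long> usedByPartition = new ConcurrentHashMap<>();

    PartitionBudgetSketch(long perPartitionLimit) { this.perPartitionLimit = perPartitionLimit; }

    // Returns false instead of blocking when one partition exhausts its budget.
    synchronized boolean tryReserve(String topicPartition, long bytes) {
        long used = usedByPartition.getOrDefault(topicPartition, 0L);
        if (used + bytes > perPartitionLimit)
            return false; // caller can fail fast or drop rather than starve others
        usedByPartition.put(topicPartition, used + bytes);
        return true;
    }

    synchronized void release(String topicPartition, long bytes) {
        usedByPartition.merge(topicPartition, -bytes, Long::sum);
    }
}
{code}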
[jira] [Commented] (KAFKA-1788) producer record can stay in RecordAccumulator forever if leader is no available
[ https://issues.apache.org/jira/browse/KAFKA-1788?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14326657#comment-14326657 ]

Parth Brahmbhatt commented on KAFKA-1788:

Can someone from the Kafka team respond to the questions asked above?
[jira] [Commented] (KAFKA-1788) producer record can stay in RecordAccumulator forever if leader is no available
[ https://issues.apache.org/jira/browse/KAFKA-1788?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14266541#comment-14266541 ]

Parth Brahmbhatt commented on KAFKA-1788:

Updated reviewboard https://reviews.apache.org/r/29379/diff/ against branch origin/trunk
[jira] [Commented] (KAFKA-1788) producer record can stay in RecordAccumulator forever if leader is no available
[ https://issues.apache.org/jira/browse/KAFKA-1788?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14266538#comment-14266538 ]

Parth Brahmbhatt commented on KAFKA-1788:

Updated reviewboard https://reviews.apache.org/r/29379/diff/ against branch origin/trunk
[jira] [Commented] (KAFKA-1788) producer record can stay in RecordAccumulator forever if leader is no available
[ https://issues.apache.org/jira/browse/KAFKA-1788?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14261356#comment-14261356 ]

Bhavesh Mistry commented on KAFKA-1788:

[~parth.brahmbhatt], the close() issue (#2) will be addressed by the jira ticket https://issues.apache.org/jira/browse/KAFKA-1788, but the following will still not be addressed when the leader is available but no records can be sent.

[~junrao], [~jkreps] and [~nehanarkhede], please let me know the correct behavior for the above use case. Should I file another issue?

1) The record-error-rate metric remains zero despite the firewall rule. In my opinion, org.apache.kafka.clients.producer.Callback should have been called, but I did not see that happening when either one or two brokers are not reachable.

Thanks,
Bhavesh
[jira] [Commented] (KAFKA-1788) producer record can stay in RecordAccumulator forever if leader is no available
[ https://issues.apache.org/jira/browse/KAFKA-1788?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14259275#comment-14259275 ]

Bhavesh Mistry commented on KAFKA-1788:

[~junrao], please let me know your opinion on the behavior of the new producer in the above case, and on issues 1 and 2 as reported. Also, how should the expiration behave with respect to the ack setting and reporting errors back to the callback handler?

Thanks,
Bhavesh
[jira] [Commented] (KAFKA-1788) producer record can stay in RecordAccumulator forever if leader is no available
[ https://issues.apache.org/jira/browse/KAFKA-1788?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14259235#comment-14259235 ]

Bhavesh Mistry commented on KAFKA-1788:

The use case I have is a little different but similar. Here is my use case:

1) Let's suppose we have 3 brokers (b1, b2, b3) and a topic with 30 partitions and replication factor 1, so partitions 1 to 10 are on b1 (as leader), partitions 11 to 20 on b2, and 21 to 30 on b3. ZK has all the leadership info and everything is fine.

2) From the client everything is working fine, but broker b1 alone is not reachable (due to a network or firewall issue), and note that the leader is still reported as b1.

3) The patch you have provided will not address the above issue, because it purges batches only when it detects that no leader is available. This case is a little different: the leader is available, but the client cannot connect to it because a firewall rule is in place.

Based on the above use case, I see the following two problems, which I have reported before. Please refer to KAFKA-1642 for more details.

1) The record-error-rate metric remains zero despite the following firewall rules. In my opinion, org.apache.kafka.clients.producer.Callback should have been called, but I did not see that happening when either one or two brokers are not reachable.

{code}
sudo ipfw add reject tcp from me to b1.ip dst-port 9092
sudo ipfw add reject tcp from me to b2.ip dst-port 9092
00100 reject tcp from me to b1.ip dst-port 9092
00200 reject tcp from me to b2.ip dst-port 9092
{code}

{code}
class LoggingCallBackHandler implements Callback {

    /**
     * A callback method the user can implement to provide asynchronous
     * handling of request completion. This method will be called when the
     * record sent to the server has been acknowledged. Exactly one of the
     * arguments will be non-null.
     *
     * @param metadata  The metadata for the record that was sent (i.e. the
     *                  partition and offset). Null if an error occurred.
     * @param exception The exception thrown during processing of this record.
     *                  Null if no error occurred.
     */
    @Override
    public void onCompletion(RecordMetadata metadata, Exception exception) {
        if (exception != null) {
            exception.printStackTrace();
        }
    }
}
{code}

I do not see any exception at all on the console; not sure why. (A minimal usage sketch for such a callback follows the thread dump below.)

2) The application does NOT gracefully shut down when one or more brokers are not reachable.
{code}
"SIGTERM handler" daemon prio=5 tid=0x7f8bd79e4000 nid=0x17907 waiting for monitor entry [0x00011e906000]
   java.lang.Thread.State: BLOCKED (on object monitor)
        at java.lang.Shutdown.exit(Shutdown.java:212)
        - waiting to lock <0x00070008f7c0> (a java.lang.Class for java.lang.Shutdown)
        at java.lang.Terminator$1.handle(Terminator.java:52)
        at sun.misc.Signal$1.run(Signal.java:212)
        at java.lang.Thread.run(Thread.java:744)

"SIGTERM handler" daemon prio=5 tid=0x7f8bd5159000 nid=0x1cb0b waiting for monitor entry [0x00011e803000]
   java.lang.Thread.State: BLOCKED (on object monitor)
        at java.lang.Shutdown.exit(Shutdown.java:212)
        - waiting to lock <0x00070008f7c0> (a java.lang.Class for java.lang.Shutdown)
        at java.lang.Terminator$1.handle(Terminator.java:52)
        at sun.misc.Signal$1.run(Signal.java:212)
        at java.lang.Thread.run(Thread.java:744)

"SIGTERM handler" daemon prio=5 tid=0x7f8bdd147800 nid=0x15d0b waiting for monitor entry [0x00011e30a000]
   java.lang.Thread.State: BLOCKED (on object monitor)
        at java.lang.Shutdown.exit(Shutdown.java:212)
        - waiting to lock <0x00070008f7c0> (a java.lang.Class for java.lang.Shutdown)
        at java.lang.Terminator$1.handle(Terminator.java:52)
        at sun.misc.Signal$1.run(Signal.java:212)
        at java.lang.Thread.run(Thread.java:744)

"SIGTERM handler" daemon prio=5 tid=0x7f8bdf82 nid=0x770b waiting for monitor entry [0x00011e207000]
   java.lang.Thread.State: BLOCKED (on object monitor)
        at java.lang.Shutdown.exit(Shutdown.java:212)
        - waiting to lock <0x00070008f7c0> (a java.lang.Class for java.lang.Shutdown)
        at java.lang.Terminator$1.handle(Terminator.java:52)
        at sun.misc.Signal$1.run(Signal.java:212)
        at java.lang.Thread.run(Thread.java:744)

"SIGTERM handler" daemon prio=5 tid=0x7f8bdc393800 nid=0x1c30f waiting for monitor entry [0x00011e104000]
   java.lang.Thread.State: BLOCKED (on object monitor)
        at java.lang.Shutdown.exit(Shutdown.java:212)
        - waiting to lock
{code}
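For reference, a minimal sketch of how a callback like the LoggingCallBackHandler above would normally be wired into a producer. The broker address, topic, and serializer settings are placeholders, not taken from the report above:

{code}
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class CallbackUsageSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "b1.ip:9092"); // placeholder
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        Producer<String, String> producer = new KafkaProducer<String, String>(props);
        ProducerRecord<String, String> record =
                new ProducerRecord<String, String>("test-topic", "key", "value");
        // The callback should fire exactly once per record, with either the
        // metadata (success) or the exception (failure) non-null.
        producer.send(record, new LoggingCallBackHandler());
        producer.close();
    }
}
{code}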
[jira] [Commented] (KAFKA-1788) producer record can stay in RecordAccumulator forever if leader is no available
[ https://issues.apache.org/jira/browse/KAFKA-1788?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14257901#comment-14257901 ]

Parth Brahmbhatt commented on KAFKA-1788:

[~Bmis13] The patch I provided solves the issue listed on this jira, where the records occupy memory space forever if no leader is available. Do you want to open a separate jira for the other issues that you are describing? If you think the problem is related to the theme of this jira, can you elaborate how?
[jira] [Commented] (KAFKA-1788) producer record can stay in RecordAccumulator forever if leader is no available
[ https://issues.apache.org/jira/browse/KAFKA-1788?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14257691#comment-14257691 ]

Bhavesh Mistry commented on KAFKA-1788:

Hi all, I did NOT try this patch, but when one, two, or all brokers are down, I see that the application will not shut down because of the close() method: the application does not gracefully shut down when one or more brokers are down (the IO thread never exits; this is a known issue).

{code}
"SIGTERM handler" daemon prio=5 tid=0x7f8bd79e4000 nid=0x17907 waiting for monitor entry [0x00011e906000]
   java.lang.Thread.State: BLOCKED (on object monitor)
        at java.lang.Shutdown.exit(Shutdown.java:212)
        - waiting to lock <0x00070008f7c0> (a java.lang.Class for java.lang.Shutdown)
        at java.lang.Terminator$1.handle(Terminator.java:52)
        at sun.misc.Signal$1.run(Signal.java:212)
        at java.lang.Thread.run(Thread.java:744)

"SIGTERM handler" daemon prio=5 tid=0x7f8bd5159000 nid=0x1cb0b waiting for monitor entry [0x00011e803000]
   java.lang.Thread.State: BLOCKED (on object monitor)
        at java.lang.Shutdown.exit(Shutdown.java:212)
        - waiting to lock <0x00070008f7c0> (a java.lang.Class for java.lang.Shutdown)
        at java.lang.Terminator$1.handle(Terminator.java:52)
        at sun.misc.Signal$1.run(Signal.java:212)
        at java.lang.Thread.run(Thread.java:744)

"SIGTERM handler" daemon prio=5 tid=0x7f8bdd147800 nid=0x15d0b waiting for monitor entry [0x00011e30a000]
   java.lang.Thread.State: BLOCKED (on object monitor)
        at java.lang.Shutdown.exit(Shutdown.java:212)
        - waiting to lock <0x00070008f7c0> (a java.lang.Class for java.lang.Shutdown)
        at java.lang.Terminator$1.handle(Terminator.java:52)
        at sun.misc.Signal$1.run(Signal.java:212)
        at java.lang.Thread.run(Thread.java:744)

"SIGTERM handler" daemon prio=5 tid=0x7f8bdf82 nid=0x770b waiting for monitor entry [0x00011e207000]
   java.lang.Thread.State: BLOCKED (on object monitor)
        at java.lang.Shutdown.exit(Shutdown.java:212)
        - waiting to lock <0x00070008f7c0> (a java.lang.Class for java.lang.Shutdown)
        at java.lang.Terminator$1.handle(Terminator.java:52)
        at sun.misc.Signal$1.run(Signal.java:212)
        at java.lang.Thread.run(Thread.java:744)

"SIGTERM handler" daemon prio=5 tid=0x7f8bdc393800 nid=0x1c30f waiting for monitor entry [0x00011e104000]
   java.lang.Thread.State: BLOCKED (on object monitor)
        at java.lang.Shutdown.exit(Shutdown.java:212)
        - waiting to lock <0x00070008f7c0> (a java.lang.Class for java.lang.Shutdown)
        at java.lang.Terminator$1.handle(Terminator.java:52)
        at sun.misc.Signal$1.run(Signal.java:212)
        at java.lang.Thread.run(Thread.java:744)

"Thread-4" prio=5 tid=0x7f8bdb39f000 nid=0xa107 in Object.wait() [0x00011ea89000]
   java.lang.Thread.State: WAITING (on object monitor)
        at java.lang.Object.$$YJP$$wait(Native Method)
        at java.lang.Object.wait(Object.java)
        at java.lang.Thread.join(Thread.java:1280)
        - locked <0x000705c2f650> (a org.apache.kafka.common.utils.KafkaThread)
        at java.lang.Thread.join(Thread.java:1354)
        at org.apache.kafka.clients.producer.KafkaProducer.close(KafkaProducer.java:322)
        at

"kafka-producer-network-thread | error" daemon prio=5 tid=0x7f8bd814e000 nid=0x7403 runnable [0x00011e6c]
   java.lang.Thread.State: RUNNABLE
        at sun.nio.ch.KQueueArrayWrapper.$$YJP$$kevent0(Native Method)
        at sun.nio.ch.KQueueArrayWrapper.kevent0(KQueueArrayWrapper.java)
        at sun.nio.ch.KQueueArrayWrapper.poll(KQueueArrayWrapper.java:200)
        at sun.nio.ch.KQueueSelectorImpl.doSelect(KQueueSelectorImpl.java:103)
        at sun.nio.ch.SelectorImpl.lockAndDoSelect(SelectorImpl.java:87)
        - locked <0x000705c109f8> (a sun.nio.ch.Util$2)
        - locked <0x000705c109e8> (a java.util.Collections$UnmodifiableSet)
        - locked <0x000705c105c8> (a sun.nio.ch.KQueueSelectorImpl)
        at sun.nio.ch.SelectorImpl.select(SelectorImpl.java:98)
        at org.apache.kafka.common.network.Selector.select(Selector.java:322)
        at org.apache.kafka.common.network.Selector.poll(Selector.java:212)
        at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:192)
        at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:184)
        at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:128)
        at java.lang.Thread.run(Thread.java:744)
{code}
[jira] [Commented] (KAFKA-1788) producer record can stay in RecordAccumulator forever if leader is no available
[ https://issues.apache.org/jira/browse/KAFKA-1788?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14257493#comment-14257493 ]

Parth Brahmbhatt commented on KAFKA-1788:

Created reviewboard https://reviews.apache.org/r/29379/diff/ against branch trunk
[jira] [Commented] (KAFKA-1788) producer record can stay in RecordAccumulator forever if leader is no available
[ https://issues.apache.org/jira/browse/KAFKA-1788?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14256255#comment-14256255 ]

Parth Brahmbhatt commented on KAFKA-1788:

[~nehanarkhede] [~junrao] Can you provide input on what you think needs to be done here? There are 2 problems being discussed:

* No leader is actually available for a long time, which is the original issue in this jira. This is the case where all replicas are in a single DC/AZ and that DC/AZ faces an outage. In this case the record stays in the RecordAccumulator forever: no node is ever ready, so no retries are ever attempted, and because the max retries are never exhausted, the batch is never dropped. The only way I see to solve this is by adding an expiry on batches and performing a cleanup of expired batches.
* Stale metadata, because NetworkClient.leastLoadedNode() returns a bad node and keeps retrying against it. Unless I am missing something here, I think this just indicates bad configuration; we could reduce the default TCP connection-socket/read timeout so we can fail fast, but I am not entirely sure we need to do anything in code to handle this case. The method already goes through all the nodes in the bootstrap list, since leastLoadedNode() starts off with this.metadata.fetch().nodes() and tries to find a good node with the fewest outstanding requests.
[jira] [Commented] (KAFKA-1788) producer record can stay in RecordAccumulator forever if leader is no available
[ https://issues.apache.org/jira/browse/KAFKA-1788?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14249266#comment-14249266 ]

Bhavesh Mistry commented on KAFKA-1788:

[~jkreps], can we just take a quick look at the NodeConnectionState? If all registered nodes are down, then exit quickly, or attempt to connect? This would give an accurate status for all registered nodes (maybe we could do a TCP ping to all nodes). I am not sure that, if the producer key is fixed to only one broker, it still has the status of all nodes. Here is the reference code:

https://github.com/apache/kafka/blob/0.8.2/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java
https://github.com/apache/kafka/blob/0.8.2/clients/src/main/java/org/apache/kafka/clients/NodeConnectionState.java

I did this in an experimental patch for KAFKA-1642 (but used a hard-coded timeout for the join method).

Thanks,
Bhavesh
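A toy sketch of the "all registered nodes are down" check being suggested. Kafka's real connection-state tracking (NodeConnectionState in the links above) has a different shape; the names below are illustrative only:

{code}
import java.util.Map;

// Illustration: fail fast when every known broker is marked disconnected.
class AllNodesDownSketch {
    enum ConnState { CONNECTED, CONNECTING, DISCONNECTED }

    static boolean allNodesDown(Map<Integer, ConnState> stateByNodeId) {
        if (stateByNodeId.isEmpty())
            return false; // nothing known yet; keep trying
        return stateByNodeId.values().stream()
                .allMatch(s -> s == ConnState.DISCONNECTED);
    }
}
{code}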
[jira] [Commented] (KAFKA-1788) producer record can stay in RecordAccumulator forever if leader is no available
[ https://issues.apache.org/jira/browse/KAFKA-1788?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14249255#comment-14249255 ]

Jay Kreps commented on KAFKA-1788:

Currently the producer supports either blocking or dropping when it cannot send to the cluster as fast as data is arriving. This could occur because the cluster is down, or just because it isn't fast enough to keep up. Kafka provides high availability for partitions, so the case where a partition is permanently unavailable should be rare.

Timing out requests might be nice, but it's not 100% clear that it is better than the current strategy. The current strategy is just to buffer as long as possible and then either block or drop data when the buffer is exhausted. Arguably, dropping when you are out of space is better than dropping after a fixed time (since in any case you have to drop when you are out of space).

As Ewen says, we can't reset the metadata because the bootstrap servers may no longer exist, and if they do, they are by definition a subset of the current cluster metadata. I think Ewen's solution of just making sure leastLoadedNode eventually tries all nodes is the right way to go. We'll have to be careful, though, as that method is pretty constrained.
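The block-or-drop choice described above is exposed in the 0.8.2 producer through the block.on.buffer.full setting; a minimal configuration sketch (broker addresses are placeholders):

{code}
import java.util.Properties;

public class DropInsteadOfBlock {
    public static Properties producerProps() {
        Properties props = new Properties();
        props.put("bootstrap.servers", "broker1:9092,broker2:9092"); // placeholders
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        // true (the default): send() blocks when the accumulator's buffer
        // (buffer.memory) is exhausted.
        // false: send() throws BufferExhaustedException instead, letting the
        // caller decide to drop the record.
        props.put("block.on.buffer.full", "false");
        return props;
    }
}
{code}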
[jira] [Commented] (KAFKA-1788) producer record can stay in RecordAccumulator forever if leader is no available
[ https://issues.apache.org/jira/browse/KAFKA-1788?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14249131#comment-14249131 ]

Ewen Cheslack-Postava commented on KAFKA-1788:

[~bpot] that sounds right; I'm pretty sure metadata never gets cleared if all brokers become unavailable -- it's only updated when the producer starts and when it gets a metadata response message. You can actually get into the state you're talking about for a long time without losing all the brokers. Metadata update requests use NetworkClient.leastLoadedNode to select which node to send the request to, which means requests may repeatedly go to the same node even if its connection isn't getting any data through but the TCP connection hasn't timed out yet. That can result in waiting for many minutes even though the metadata might be retrievable from a different node. But I'm not sure it's really a distinct problem, just another variant -- the batch stays in the RecordAccumulator eating up bufferpool space until there's a network error or a response to the request that included the batch. This means any failure to make progress sending data would trigger the same issue.

I think a proper fix for this bug would add a timeout for messages as soon as send() is called, and would need to be able to remove them from any point in the pipeline after that timeout, cleaning up any resources they use.

The metadata issue is another interesting problem. If you reset the metadata, the current implementation will block on any subsequent send() calls, since the first thing send() does is waitOnMetadata(). Arguably, given the interface of send(), I'm not sure that blocking that way should ever be allowed, although at least now it's restricted to the initial send() call and probably simplifies a bunch of code. Resetting the metadata could also be counterproductive, since the set of bootstrap nodes could be smaller than the subset of the cluster you had metadata for. One alternative idea: change the use of leastLoadedNode and, after a certain amount of time, allow it to start considering the bootstrap nodes as well as the set currently in the metadata.
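A minimal sketch of that alternative idea, with hypothetical names (the real NetworkClient.leastLoadedNode also weighs connection state and backoff): once metadata has been stale for too long, widen the candidate set to include the bootstrap nodes before picking the node with the fewest in-flight requests.

{code}
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.function.ToIntFunction;

// Illustration only; not the actual NetworkClient code.
class LeastLoadedSketch {
    static String pickNode(List<String> metadataNodes,
                           List<String> bootstrapNodes,
                           ToIntFunction<String> inFlightRequests,
                           long msSinceMetadataUpdate,
                           long staleThresholdMs) {
        List<String> candidates = new ArrayList<>(metadataNodes);
        if (msSinceMetadataUpdate > staleThresholdMs)
            candidates.addAll(bootstrapNodes); // fall back to bootstrap list
        return candidates.stream()
                .min(Comparator.comparingInt(inFlightRequests::applyAsInt))
                .orElse(null);
    }
}
{code}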
[jira] [Commented] (KAFKA-1788) producer record can stay in RecordAccumulator forever if leader is no available
[ https://issues.apache.org/jira/browse/KAFKA-1788?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14247766#comment-14247766 ]

Bob Potter commented on KAFKA-1788:

I've been digging into this a little bit, and in addition to an individual partition being unavailable, there is also a case where all brokers become unavailable and we are unable to refresh metadata. This is a distinct case because the producer still thinks it has a leader for the partition (AFAICT, the metadata is never updated). The behavior I have seen is that the producer will continue to accept records for any partition which previously had a leader, but the batches will never exit the accumulator.

It seems like we could track how long it has been since we've been able to connect to any known brokers and, after a certain threshold, complete all outstanding record batches with an error and reset the metadata so that new production attempts don't end up in the accumulator. On the other hand, we could just start failing record batches if they have been in the accumulator for too long. That would solve both failure scenarios, although it seems like we should be resetting the metadata for an unavailable cluster at some point.
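The first option amounts to a cluster-wide reachability clock; a tiny sketch of what tracking it could look like (hypothetical names, purely illustrative):

{code}
// Illustration of tracking time since the last successful broker connection.
class ClusterReachabilitySketch {
    private volatile long lastSuccessfulConnectMs = System.currentTimeMillis();

    void onConnectionEstablished() {
        lastSuccessfulConnectMs = System.currentTimeMillis();
    }

    // If true, the producer could fail all queued batches and reset metadata.
    boolean clusterLooksUnreachable(long nowMs, long thresholdMs) {
        return nowMs - lastSuccessfulConnectMs > thresholdMs;
    }
}
{code}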
[jira] [Commented] (KAFKA-1788) producer record can stay in RecordAccumulator forever if leader is no available
[ https://issues.apache.org/jira/browse/KAFKA-1788?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14234595#comment-14234595 ]

Bhavesh Mistry commented on KAFKA-1788:

We also need to fix Producer.close(), which hangs the JVM because the join() on the IO thread does not exit. Please refer to KAFKA-1642 for more details. The Kafka core devs need to give guidance on how to solve this problem. Please see the comments below from that linked issue.

1) The Producer.close() issue is not addressed by the patch. If the network connection is lost or other events happen, the IO thread will not be killed and the close method hangs. In the patch I provided, I had a timeout for the join method and interrupted the IO thread. I think we need a similar solution.

[~ewencp]:

1. I'm specifically trying to address the CPU usage here. I realize from your perspective they are closely related, since they can both be triggered by a loss of network connectivity, but internally they're really separate issues -- the CPU usage has to do with incorrect timeouts, and the join() issue is due to the lack of timeouts on produce operations. That's why I pointed you toward KAFKA-1788. If a timeout is added for data in the producer, that would resolve the close issue as well, since any data waiting in the producer would eventually time out and the IO thread could exit. I think that's the cleanest solution since it solves both problems with a single setting (the amount of time you're willing to wait before discarding data). If you think a separate timeout specifically for Producer.close() is worthwhile, I'd suggest filing a separate JIRA for that.
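Until a timeout exists inside the producer, a workaround sketch along the lines Bhavesh describes is to bound the wait from the application side. KafkaProducer.close() in 0.8.2 takes no timeout and application code cannot reach the internal IO thread, so this runs close() itself on a helper thread and abandons it on timeout; assumptions noted in the comments:

{code}
import org.apache.kafka.clients.producer.Producer;

// Workaround sketch only: this does not fix the underlying hang, and any data
// still in the accumulator is lost if the timeout fires.
public class BoundedClose {
    public static void closeWithTimeout(final Producer<?, ?> producer, long timeoutMs)
            throws InterruptedException {
        Thread closer = new Thread(new Runnable() {
            public void run() { producer.close(); }
        }, "producer-closer");
        closer.setDaemon(true); // don't let the helper itself block JVM exit
        closer.start();
        closer.join(timeoutMs);  // wait at most timeoutMs for a clean close
        if (closer.isAlive()) {
            closer.interrupt();  // best effort; the IO thread is stuck
        }
    }
}
{code}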
[jira] [Commented] (KAFKA-1788) producer record can stay in RecordAccumulator forever if leader is no available
[ https://issues.apache.org/jira/browse/KAFKA-1788?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14232073#comment-14232073 ]

Neha Narkhede commented on KAFKA-1788:

[~parth.brahmbhatt] Please go ahead.
[jira] [Commented] (KAFKA-1788) producer record can stay in RecordAccumulator forever if leader is no available
[ https://issues.apache.org/jira/browse/KAFKA-1788?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14232054#comment-14232054 ]

Parth Brahmbhatt commented on KAFKA-1788:

If no one else has a plan to work on this, I would like to pick this one up to get familiar with the code base.