[jira] [Comment Edited] (KAFKA-1642) [Java New Producer Kafka Trunk] CPU Usage Spike to 100% when network connection is lost
[ https://issues.apache.org/jira/browse/KAFKA-1642?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14257805#comment-14257805 ] Bhavesh Mistry edited comment on KAFKA-1642 at 12/24/14 7:07 PM:

[~ewencp], thanks for the patch. You may close this issue. The only case I have not tested is the rare one where a single broker runs out of file descriptors and a producer under heavy load requests more connections to that broker. According to the code, it will mark the node state as disconnected, and I am not sure whether data will still be sent over the already-live connection.

One other comment: no WARN or ERROR message is logged when a connection fails. Can we please change the log level of the following code to WARN? In production environments people set the log level to WARN or ERROR, so there is currently no visibility into connection issues.

{code}
/**
 * Initiate a connection to the given node
 */
private void initiateConnect(Node node, long now) {
    try {
        log.debug("Initiating connection to node {} at {}:{}.", node.id(), node.host(), node.port());
        this.connectionStates.connecting(node.id(), now);
        selector.connect(node.id(), new InetSocketAddress(node.host(), node.port()), this.socketSendBuffer, this.socketReceiveBuffer);
    } catch (IOException e) {
        /* attempt failed, we'll try again after the backoff */
        connectionStates.disconnected(node.id());
        /* maybe the problem is our metadata, update it */
        metadata.requestUpdate();
        log.debug("Error connecting to node {} at {}:{}:", node.id(), node.host(), node.port(), e);
    }
}
{code}

Thanks for all your help!

Thanks, Bhavesh

[Java New Producer Kafka Trunk] CPU Usage Spike to 100% when network connection is lost
---
Key: KAFKA-1642
URL: https://issues.apache.org/jira/browse/KAFKA-1642
Project: Kafka
Issue Type: Bug
Components: producer
Affects Versions: 0.8.2
Reporter: Bhavesh Mistry
Assignee: Ewen Cheslack-Postava
Priority: Blocker
Fix For: 0.8.2
Attachments: 0001-Initial-CPU-Hish-Usage-by-Kafka-FIX-and-Also-fix-CLO.patch, KAFKA-1642.patch, KAFKA-1642.patch, KAFKA-1642_2014-10-20_17:33:57.patch, KAFKA-1642_2014-10-23_16:19:41.patch

I see my CPU spike to 100% when the network connection is lost for a while. It seems the network I/O threads are very busy logging the following error message. Is this expected behavior?

{code}
2014-09-17 14:06:16.830 [kafka-producer-network-thread] ERROR org.apache.kafka.clients.producer.internals.Sender - Uncaught error in kafka producer I/O thread:
java.lang.IllegalStateException: No entry found for node -2
	at org.apache.kafka.clients.ClusterConnectionStates.nodeState(ClusterConnectionStates.java:110)
	at org.apache.kafka.clients.ClusterConnectionStates.disconnected(ClusterConnectionStates.java:99)
	at org.apache.kafka.clients.NetworkClient.initiateConnect(NetworkClient.java:394)
	at org.apache.kafka.clients.NetworkClient.maybeUpdateMetadata(NetworkClient.java:380)
	at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:174)
	at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:175)
	at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:115)
	at java.lang.Thread.run(Thread.java:744)
{code}

Thanks, Bhavesh

--
This message was sent by Atlassian JIRA (v6.3.4#6332)
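The log-level point above can be illustrated with a small, self-contained sketch. This is hypothetical demo code (plain java.util.logging rather than Kafka's internal logger; the class name and target address are made up): a connect failure logged at WARNING stays visible when the production log level is WARN, whereas a DEBUG message would be dropped.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.util.logging.Level;
import java.util.logging.Logger;

// Hypothetical demo class, not Kafka code: logs connect failures at WARNING
// so they remain visible when the production log level is set to WARN.
class ConnectLogDemo {
    private static final Logger log = Logger.getLogger(ConnectLogDemo.class.getName());

    static boolean tryConnect(String host, int port) {
        try (Socket s = new Socket()) {
            s.connect(new InetSocketAddress(host, port), 200); // 200 ms timeout
            return true;
        } catch (IOException e) {
            // WARNING instead of DEBUG: visible at production log levels
            log.log(Level.WARNING, "Error connecting to node at " + host + ":" + port, e);
            return false;
        }
    }

    public static void main(String[] args) {
        // Port 1 on localhost is almost certainly closed, so this path
        // exercises the WARNING branch.
        System.out.println(tryConnect("127.0.0.1", 1));
    }
}
```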
[jira] [Comment Edited] (KAFKA-1642) [Java New Producer Kafka Trunk] CPU Usage Spike to 100% when network connection is lost
[ https://issues.apache.org/jira/browse/KAFKA-1642?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14257686#comment-14257686 ] Bhavesh Mistry edited comment on KAFKA-1642 at 12/24/14 12:02 AM:

[~ewencp], the patch indeed solves the high-CPU problem reported by this bug. I have tested all brokers down, one broker down, and two brokers down (except for the last use case, where one of the brokers runs out of socket file descriptors, a rare case). I am sorry for the late response; I got busy with other stuff, so testing got delayed. Here are some interesting observations from YourKit:

0) Overall, the patch has also brought down CPU consumption in the normal healthy (happy) case where everything is up and running. With the old code (without the patch), I used to see about 10% of overall CPU used by the I/O threads (4 in my case); it has been reduced to 5% or less with the patch.

1) When two brokers are down, I occasionally see the I/O thread blocked. (I did not see this when only one broker is down.)

{code}
kafka-producer-network-thread | rawlog [BLOCKED] [DAEMON]
org.apache.kafka.clients.producer.internals.Metadata.fetch() Metadata.java:70
java.lang.Thread.run() Thread.java:744
{code}

2) The record-error-rate metric remains zero despite the following firewall rules. In my opinion, it should have invoked org.apache.kafka.clients.producer.Callback, but I did not see that happening with either one or two brokers down. Should I file another issue for this? Please confirm.

{code}
sudo ipfw add reject tcp from me to b1.ip dst-port 9092
sudo ipfw add reject tcp from me to b2.ip dst-port 9092

00100 reject tcp from me to b1.ip dst-port 9092
00200 reject tcp from me to b2.ip dst-port 9092
{code}

{code}
class LoggingCallbackHandler implements Callback {

    /**
     * A callback method the user can implement to provide asynchronous
     * handling of request completion. This method will be called when the
     * record sent to the server has been acknowledged. Exactly one of the
     * arguments will be non-null.
     *
     * @param metadata  The metadata for the record that was sent (i.e. the
     *                  partition and offset). Null if an error occurred.
     * @param exception The exception thrown during processing of this record.
     *                  Null if no error occurred.
     */
    @Override
    public void onCompletion(RecordMetadata metadata, Exception exception) {
        if (exception != null) {
            exception.printStackTrace();
        }
    }
}
{code}

I do not see any exception at all on the console; not sure why.

3) The application does NOT shut down gracefully when one or more brokers are down. (The I/O thread never exits; this is a known issue.)

{code}
"SIGTERM handler" daemon prio=5 tid=0x7f8bd79e4000 nid=0x17907 waiting for monitor entry [0x00011e906000]
   java.lang.Thread.State: BLOCKED (on object monitor)
	at java.lang.Shutdown.exit(Shutdown.java:212)
	- waiting to lock 0x00070008f7c0 (a java.lang.Class for java.lang.Shutdown)
	at java.lang.Terminator$1.handle(Terminator.java:52)
	at sun.misc.Signal$1.run(Signal.java:212)
	at java.lang.Thread.run(Thread.java:744)

"SIGTERM handler" daemon prio=5 tid=0x7f8bd5159000 nid=0x1cb0b waiting for monitor entry [0x00011e803000]
   java.lang.Thread.State: BLOCKED (on object monitor)
	at java.lang.Shutdown.exit(Shutdown.java:212)
	- waiting to lock 0x00070008f7c0 (a java.lang.Class for java.lang.Shutdown)
	at java.lang.Terminator$1.handle(Terminator.java:52)
	at sun.misc.Signal$1.run(Signal.java:212)
	at java.lang.Thread.run(Thread.java:744)

"SIGTERM handler" daemon prio=5 tid=0x7f8bdd147800 nid=0x15d0b waiting for monitor entry [0x00011e30a000]
   java.lang.Thread.State: BLOCKED (on object monitor)
	at java.lang.Shutdown.exit(Shutdown.java:212)
	- waiting to lock 0x00070008f7c0 (a java.lang.Class for java.lang.Shutdown)
	at java.lang.Terminator$1.handle(Terminator.java:52)
	at sun.misc.Signal$1.run(Signal.java:212)
	at java.lang.Thread.run(Thread.java:744)

"SIGTERM handler" daemon prio=5 tid=0x7f8bdf82 nid=0x770b waiting for monitor entry [0x00011e207000]
   java.lang.Thread.State: BLOCKED (on object monitor)
	at java.lang.Shutdown.exit(Shutdown.java:212)
	- waiting to lock 0x00070008f7c0 (a java.lang.Class for java.lang.Shutdown)
	at java.lang.Terminator$1.handle(Terminator.java:52)
	at sun.misc.Signal$1.run(Signal.java:212)
	at
{code}
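The callback behavior discussed in point 2 above can be sketched without any Kafka dependency. This is an illustrative model only (FakeSender and all its names are made up, not the producer API): it shows the contract the comment expects, namely that a failed send completes the callback with a non-null exception rather than staying silent.

```java
// Illustrative sketch only -- NOT the Kafka producer API. It models the
// contract discussed above: a failed send must invoke the callback with a
// non-null exception; a successful send passes non-null metadata.
class CallbackContractDemo {
    interface Callback {
        // Exactly one of the two arguments should be non-null.
        void onCompletion(String metadata, Exception exception);
    }

    static class FakeSender {
        private final boolean brokerReachable;

        FakeSender(boolean brokerReachable) {
            this.brokerReachable = brokerReachable;
        }

        void send(String record, Callback cb) {
            if (brokerReachable) {
                cb.onCompletion("partition-0/offset-0", null); // placeholder metadata
            } else {
                // Completing with the error is what lets record-error-rate
                // and user callbacks observe the failure.
                cb.onCompletion(null, new java.io.IOException("connection refused"));
            }
        }
    }

    public static void main(String[] args) {
        new FakeSender(false).send("msg", (md, ex) -> {
            if (ex != null) ex.printStackTrace();
        });
    }
}
```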
[jira] [Comment Edited] (KAFKA-1642) [Java New Producer Kafka Trunk] CPU Usage Spike to 100% when network connection is lost
[ https://issues.apache.org/jira/browse/KAFKA-1642?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14239063#comment-14239063 ] Bhavesh Mistry edited comment on KAFKA-1642 at 12/9/14 6:53 AM:

[~stevenz3wu], 0.8.2 is very well tested and has worked well under heavy load. This bug is rare and only happens when a broker or the network has issues. We have been producing about 7 to 10 TB per day using this new producer, so 0.8.2 is very safe to use in production. It has survived the peak traffic of the year on a large e-commerce site. So I am fairly confident that the new Java API indeed does true round-robin and is much faster than the Scala-based API.

[~ewencp], I will verify the patch by the end of this Friday, but do let me know your understanding based on my last comment. The goal is to put this issue to rest and cover all the use cases.

Thanks, Bhavesh
[jira] [Comment Edited] (KAFKA-1642) [Java New Producer Kafka Trunk] CPU Usage Spike to 100% when network connection is lost
[ https://issues.apache.org/jira/browse/KAFKA-1642?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14232061#comment-14232061 ] Bhavesh Mistry edited comment on KAFKA-1642 at 12/2/14 8:04 PM:

Hi [~ewencp], I will not have time to validate this patch until next week. Here are my comments:

1) The Producer.close() method issue is not addressed by this patch. If the network connection is lost (or other events happen), the I/O thread will not be killed and the close() method hangs. In the patch that I provided, I had a timeout for the join() method and interrupted the I/O thread. I think we need a similar solution.

2) Also, can we please add JMX monitoring for the I/O thread so we know how fast it is spinning? It would be great to add this, with the run() method reporting its duration to a metric in nanoseconds.

{code}
try {
    ThreadMXBean bean = ManagementFactory.getThreadMXBean();
    if (bean.isThreadCpuTimeSupported() && bean.isThreadCpuTimeEnabled()) {
        this.ioTheadCPUTime = metrics.sensor("iothread-cpu");
        this.ioTheadCPUTime.add("iothread-cpu-ms", "The rate of CPU cycles used by the I/O thread in NANOSECONDS",
                new Rate(TimeUnit.NANOSECONDS) {
                    public double measure(MetricConfig config, long now) {
                        return (now - metadata.lastUpdate()) / 1000.0;
                    }
                });
    }
} catch (Throwable th) {
    log.warn("Not able to set the CPU time...", th);
}
{code}

3) Please check the final timeout value in *pollTimeout*: if it is constantly zero, then we need to slow the I/O thread down.

4) A defensive check is needed to back off in the run() method when the I/O thread is aggressive.

{code}
while (running) {
    long start = time.milliseconds();
    try {
        run(time.milliseconds());
    } catch (Exception e) {
        log.error("Uncaught error in kafka producer I/O thread: ", e);
    } finally {
        long durationInMs = time.milliseconds() - start;
        // TODO FIX ME HERE: do an exponential back-off sleep etc. to avoid
        // burning CPU cycles here ?? How much... for the edge case...
        if (durationInMs < 200) {
            if (client.isAllRegistredNodesAreDown()) {
                countinuousRetry++; // TODO make this a constant/configuration.
                // When do we reset this interval, so we can try aggressively again?
                sleepInMs = ((long) Math.pow(2, countinuousRetry) * 500);
            } else {
                sleepInMs = 500;
                countinuousRetry = 0;
            }
            // Wait until the desired next time arrives using nanosecond
            // accuracy timer (wait(time) isn't accurate enough on most platforms)
            try {
                // TODO sleep is not a good solution...
                Thread.sleep(sleepInMs);
            } catch (InterruptedException e) {
                log.error("While sleeping, someone interrupted this thread, probably the close() method on the producer");
            }
        }
    }
}
{code}

5) When all nodes are disconnected, do you still want to spin the I/O thread?

6) Suppose a firewall rule says you can only have 2 concurrent TCP connections from the client to the brokers, and the client still has a live TCP connection to a node (broker), but new TCP connections are rejected. Will the node state be marked as disconnected in initiateConnect? Is this case handled gracefully?

By the way, thank you very much for the quick reply and the new patch. I appreciate your help.

Thanks, Bhavesh
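The exponential back-off sketched in point 4 could be factored into a small, testable helper along these lines. This is a sketch under the comment's own assumptions (500 ms base, doubling while all registered nodes are down, reset once any node is reachable); the 30-second cap and the class name are my additions, not from the original proposal.

```java
// Hypothetical helper illustrating the exponential back-off with reset
// proposed in point 4 above; names are illustrative, not Kafka API.
class BackoffDemo {
    private static final long BASE_MS = 500;
    private static final long MAX_MS = 30_000; // cap is an assumption, not in the proposal
    private int continuousRetry = 0;

    /** Next sleep while all nodes are down: 500 ms, 1 s, 2 s, ... capped at MAX_MS. */
    long nextDelayMs(boolean allNodesDown) {
        if (!allNodesDown) {
            continuousRetry = 0; // some node is reachable: try aggressively again
            return BASE_MS;
        }
        long delay = (long) Math.pow(2, continuousRetry) * BASE_MS;
        continuousRetry++;
        return Math.min(delay, MAX_MS);
    }

    public static void main(String[] args) {
        BackoffDemo b = new BackoffDemo();
        System.out.println(b.nextDelayMs(true));  // 500
        System.out.println(b.nextDelayMs(true));  // 1000
        System.out.println(b.nextDelayMs(false)); // 500 (reset)
    }
}
```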
[jira] [Comment Edited] (KAFKA-1642) [Java New Producer Kafka Trunk] CPU Usage Spike to 100% when network connection is lost
[ https://issues.apache.org/jira/browse/KAFKA-1642?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14226751#comment-14226751 ] Bhavesh Mistry edited comment on KAFKA-1642 at 11/26/14 8:08 PM: -

[~ewencp], Even with the following parameters set to long intervals, the system's state still gets impacted; it does not matter what reconnect.backoff.ms and retry.backoff.ms are set to. Once the node state is removed, the timeout is set to 0. Please see the following logs.

# 15 minutes
reconnect.backoff.ms=90
retry.backoff.ms=90

{code}
2014-11-26 11:01:27.898 Kafka Drop message topic=.rawlog org.apache.kafka.common.errors.TimeoutException: Failed to update metadata after 6 ms.
2014-11-26 11:02:27.903 Kafka Drop message topic=.rawlog org.apache.kafka.common.errors.TimeoutException: Failed to update metadata after 6 ms.
2014-11-26 11:03:27.903 Kafka Drop message topic=.rawlog org.apache.kafka.common.errors.TimeoutException: Failed to update metadata after 6 ms.
2014-11-26 11:04:27.903 Kafka Drop message topic=.rawlog org.apache.kafka.common.errors.TimeoutException: Failed to update metadata after 6 ms.
2014-11-26 11:05:27.904 Kafka Drop message topic=.rawlog org.apache.kafka.common.errors.TimeoutException: Failed to update metadata after 6 ms.
2014-11-26 11:06:27.905 Kafka Drop message topic=.rawlog org.apache.kafka.common.errors.TimeoutException: Failed to update metadata after 6 ms.
2014-11-26 11:07:27.906 Kafka Drop message topic=.rawlog org.apache.kafka.common.errors.TimeoutException: Failed to update metadata after 6 ms.
2014-11-26 11:08:27.908 Kafka Drop message topic=.rawlog org.apache.kafka.common.errors.TimeoutException: Failed to update metadata after 6 ms.
2014-11-26 11:09:27.908 Kafka Drop message topic=.rawlog org.apache.kafka.common.errors.TimeoutException: Failed to update metadata after 6 ms.
2014-11-26 11:10:27.909 Kafka Drop message topic=.rawlog org.apache.kafka.common.errors.TimeoutException: Failed to update metadata after 6 ms.
2014-11-26 11:11:27.909 Kafka Drop message topic=.rawlog org.apache.kafka.common.errors.TimeoutException: Failed to update metadata after 6 ms.
2014-11-26 11:12:27.910 Kafka Drop message topic=.rawlog org.apache.kafka.common.errors.TimeoutException: Failed to update metadata after 6 ms.
2014-11-26 11:13:27.911 Kafka Drop message topic=.rawlog org.apache.kafka.common.errors.TimeoutException: Failed to update metadata after 6 ms.
2014-11-26 11:14:27.912 Kafka Drop message topic=.rawlog org.apache.kafka.common.errors.TimeoutException: Failed to update metadata after 6 ms.
2014-11-26 11:15:27.914 Kafka Drop message topic=.rawlog org.apache.kafka.common.errors.TimeoutException: Failed to update metadata after 6 ms.
2014-11-26 11:00:27.613 [kafka-producer-network-thread | heartbeat] ERROR org.apache.kafka.clients.producer.internals.Sender - Uncaught error in kafka producer I/O thread:
2014-11-26 11:00:27.613 [kafka-producer-network-thread | rawlog] ERROR org.apache.kafka.clients.producer.internals.Sender - Uncaught error in kafka producer I/O thread:
java.lang.IllegalStateException: No entry found for node -1
    at org.apache.kafka.clients.ClusterConnectionStates.nodeState(ClusterConnectionStates.java:131)
    at org.apache.kafka.clients.ClusterConnectionStates.disconnected(ClusterConnectionStates.java:120)
    at org.apache.kafka.clients.NetworkClient.initiateConnect(NetworkClient.java:407)
    at org.apache.kafka.clients.NetworkClient.maybeUpdateMetadata(NetworkClient.java:393)
    at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:187)
    at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:184)
    at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:115)
    at java.lang.Thread.run(Thread.java:744)
java.lang.IllegalStateException: No entry found for node -3
    at org.apache.kafka.clients.ClusterConnectionStates.nodeState(ClusterConnectionStates.java:131)
    at org.apache.kafka.clients.ClusterConnectionStates.disconnected(ClusterConnectionStates.java:120)
    at org.apache.kafka.clients.NetworkClient.initiateConnect(NetworkClient.java:407)
    at org.apache.kafka.clients.NetworkClient.maybeUpdateMetadata(NetworkClient.java:393)
    at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:187)
    at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:184)
    at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:115)
    at java.lang.Thread.run(Thread.java:744)
2014-11-26 11:00:27.613 [kafka-producer-network-thread | heartbeat] ERROR org.apache.kafka.clients.producer.internals.Sender - Uncaught error in kafka producer I/O thread:
2014-11-26 11:00:27.613 [kafka-producer-network-thread | error] ERROR
{code}
[ https://issues.apache.org/jira/browse/KAFKA-1642?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14223161#comment-14223161 ] Bhavesh Mistry edited comment on KAFKA-1642 at 11/24/14 6:57 PM: -

[~ewencp], The way to reproduce this is to simulate network instability by turning the network service on and off (or unplugging and replugging the physical cable). Connect, let it recover, disconnect, connect again, and so on; you will see the behavior again and again. The issue is also with connection state management:

{code}
private void initiateConnect(Node node, long now) {
    try {
        log.debug("Initiating connection to node {} at {}:{}.", node.id(), node.host(), node.port());
        // TODO FIX java.lang.IllegalStateException: No entry found for node -3 (we need to put before we remove it).
        // The next line is the problem: it loses the previous last-attempt timestamp, triggers the
        // exception above, and then tries to connect to that node forever.
        this.connectionStates.connecting(node.id(), now);
        selector.connect(node.id(), new InetSocketAddress(node.host(), node.port()),
                this.socketSendBuffer, this.socketReceiveBuffer);
    } catch (IOException e) {
        /* attempt failed, we'll try again after the backoff */
        connectionStates.disconnectedWhenConnectting(node.id());
        /* maybe the problem is our metadata, update it */
        metadata.requestUpdate();
        log.debug("Error connecting to node {} at {}:{}:", node.id(), node.host(), node.port(), e);
    }
}
{code}

In my opinion, regardless of node status, the run() method needs to be safeguarded against stealing CPU cycles when there is no state for a node. (Hence I added an exponential sleep as a temporary solution to stop stealing CPU cycles; I think we must protect it somehow and must check the execution time.) Please let me know if you need more info; I will be more than happy to reproduce the bug, and we can have a conference call where I show you the problem. Based on a code diff I have done between the 0.8.1.1 tag and this, I think this issue also occurs in 0.8.1.1.

Thanks,
Bhavesh
[ https://issues.apache.org/jira/browse/KAFKA-1642?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14223532#comment-14223532 ] Bhavesh Mistry edited comment on KAFKA-1642 at 11/24/14 9:22 PM: -

[~ewencp], Also, the KafkaProducer.close() method hangs forever because of the following loop:

{code}
// Sender.java
// okay we stopped accepting requests but there may still be
// requests in the accumulator or waiting for acknowledgment,
// wait until these are completed.
while (this.accumulator.hasUnsent() || this.client.inFlightRequestCount() > 0) {
    try {
        run(time.milliseconds());
    } catch (Exception e) {
        log.error("Uncaught error in kafka producer I/O thread: ", e);
    }
}

// KafkaProducer.java
/**
 * Close this producer. This method blocks until all in-flight requests complete.
 */
@Override
public void close() {
    log.trace("Closing the Kafka producer.");
    this.sender.initiateClose();
    try {
        this.ioThread.join(); // THIS IS BLOCKED since ioThread does not give up, so it is all related in my opinion.
    } catch (InterruptedException e) {
        throw new KafkaException(e);
    }
    this.metrics.close();
    log.debug("The Kafka producer has closed.");
}
{code}

The issue described in KAFKA-1788 is a likely cause, but if you look at the close() call stack, the thread that initiated close() will hang until the I/O thread dies (and it never dies while there is still data and the network is down).

Thanks,
Bhavesh

[Java New Producer Kafka Trunk] CPU Usage Spike to 100% when network connection is lost --- Key: KAFKA-1642 URL: https://issues.apache.org/jira/browse/KAFKA-1642 Project: Kafka Issue Type: Bug Components: producer Affects Versions: 0.8.2 Reporter: Bhavesh Mistry Assignee: Ewen Cheslack-Postava Fix For: 0.8.2 Attachments: 0001-Initial-CPU-Hish-Usage-by-Kafka-FIX-and-Also-fix-CLO.patch, KAFKA-1642.patch, KAFKA-1642_2014-10-20_17:33:57.patch, KAFKA-1642_2014-10-23_16:19:41.patch I see my CPU spike to 100% when the network connection is lost for a while. It seems the network I/O thread is very busy logging the following error message. Is this expected behavior?
2014-09-17 14:06:16.830 [kafka-producer-network-thread] ERROR org.apache.kafka.clients.producer.internals.Sender - Uncaught error in kafka producer I/O thread:
java.lang.IllegalStateException: No entry found for node -2
    at org.apache.kafka.clients.ClusterConnectionStates.nodeState(ClusterConnectionStates.java:110)
    at org.apache.kafka.clients.ClusterConnectionStates.disconnected(ClusterConnectionStates.java:99)
    at org.apache.kafka.clients.NetworkClient.initiateConnect(NetworkClient.java:394)
    at org.apache.kafka.clients.NetworkClient.maybeUpdateMetadata(NetworkClient.java:380)
    at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:174)
    at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:175)
    at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:115)
    at java.lang.Thread.run(Thread.java:744)

Thanks, Bhavesh

-- This message was sent by Atlassian JIRA (v6.3.4#6332)
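The drain loop quoted in this comment spins forever when the network is down because hasUnsent()/inFlightRequestCount() never go to zero. A hedged sketch of one defensive alternative, bounding the drain with a deadline; the suppliers here are stand-ins for the real accumulator/client calls, not the Kafka API:

```java
import java.util.function.BooleanSupplier;

// Sketch: run the drain step while work is pending, but give up after a deadline
// instead of looping until every in-flight request completes.
public class BoundedDrain {

    /**
     * Run `step` while `pending` reports true, for at most timeoutMs.
     * Returns true if everything drained, false if the deadline was hit.
     */
    public static boolean drain(BooleanSupplier pending, Runnable step, long timeoutMs) {
        long deadline = System.currentTimeMillis() + timeoutMs;
        while (pending.getAsBoolean()) {
            if (System.currentTimeMillis() >= deadline)
                return false; // deadline reached with requests still pending
            step.run();
        }
        return true;
    }

    public static void main(String[] args) {
        // A queue that never drains models unsent data while the network is down.
        System.out.println(drain(() -> true, () -> { }, 100));  // prints false
        // An empty queue drains immediately.
        System.out.println(drain(() -> false, () -> { }, 100)); // prints true
    }
}
```

The trade-off is explicit: a forced exit may drop unacknowledged records, so callers would need to know the drain was incomplete (the boolean return here).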
[ https://issues.apache.org/jira/browse/KAFKA-1642?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14223626#comment-14223626 ] Bhavesh Mistry edited comment on KAFKA-1642 at 11/24/14 10:16 PM: --

Also, there is an issue in my experimental patch: I did not update lastConnectAttemptMs in the connecting-state method, which is needed to solve the IllegalStateException:

{code}
/**
 * Enter the connecting state for the given node.
 * @param node The id of the node we are connecting to
 * @param now The current time.
 */
public void connecting(int node, long now) {
    NodeConnectionState nodeConn = nodeState.get(node);
    if (nodeConn == null) {
        nodeState.put(node, new NodeConnectionState(ConnectionState.CONNECTING, now));
    } else {
        nodeConn.state = ConnectionState.CONNECTING;
        nodeConn.lastConnectAttemptMs = now; // capture and update the last connection attempt
    }
}
{code}
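The fix above can be modeled as a small standalone class to show the two properties it buys: connecting() no longer throws for an unknown node, and a repeat attempt refreshes the last-attempt timestamp used for back-off. Names mirror the snippet, but this is a self-contained sketch, not the actual Kafka classes.

```java
import java.util.HashMap;
import java.util.Map;

// Standalone model of the upsert-style connecting() fix.
public class ConnectionStatesSketch {
    enum ConnectionState { CONNECTING, CONNECTED, DISCONNECTED }

    static class NodeConnectionState {
        ConnectionState state;
        long lastConnectAttemptMs;
        NodeConnectionState(ConnectionState state, long now) {
            this.state = state;
            this.lastConnectAttemptMs = now;
        }
    }

    private final Map<Integer, NodeConnectionState> nodeState = new HashMap<>();

    /** Enter the connecting state, creating the entry if it is missing. */
    public void connecting(int node, long now) {
        NodeConnectionState nodeConn = nodeState.get(node);
        if (nodeConn == null) {
            nodeState.put(node, new NodeConnectionState(ConnectionState.CONNECTING, now));
        } else {
            nodeConn.state = ConnectionState.CONNECTING;
            nodeConn.lastConnectAttemptMs = now; // refresh so back-off uses the latest attempt
        }
    }

    public long lastConnectAttemptMs(int node) {
        return nodeState.get(node).lastConnectAttemptMs;
    }
}
```

With the upsert, a call for a bootstrap node like -3 simply creates the entry instead of hitting the "No entry found for node -3" path seen in the logs.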
[ https://issues.apache.org/jira/browse/KAFKA-1642?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14223779#comment-14223779 ] Bhavesh Mistry edited comment on KAFKA-1642 at 11/25/14 1:31 AM: -

[~ewencp], Thanks for looking into this; I really appreciate your response. Do you think the rapid connect and disconnect is also due to incorrect node state management, in the connecting() method and initiateConnect as well? Also, can we take a defensive-coding approach and protect this tight infinite loop by throttling CPU cycles whenever an iteration's start-to-end duration falls below some threshold in ms? That would actually prevent this issue. We had this issue in production, so I just wanted to highlight the impact: 325% CPU and excessive logging.

Thanks,
Bhavesh

[Java New Producer Kafka Trunk] CPU Usage Spike to 100% when network connection is lost --- Key: KAFKA-1642 URL: https://issues.apache.org/jira/browse/KAFKA-1642 Project: Kafka Issue Type: Bug Components: producer Affects Versions: 0.8.1.1, 0.8.2 Reporter: Bhavesh Mistry Assignee: Ewen Cheslack-Postava Priority: Blocker Fix For: 0.8.2 Attachments: 0001-Initial-CPU-Hish-Usage-by-Kafka-FIX-and-Also-fix-CLO.patch, KAFKA-1642.patch, KAFKA-1642_2014-10-20_17:33:57.patch, KAFKA-1642_2014-10-23_16:19:41.patch I see my CPU spike to 100% when the network connection is lost for a while. It seems the network I/O thread is very busy logging the following error message. Is this expected behavior?
[jira] [Comment Edited] (KAFKA-1642) [Java New Producer Kafka Trunk] CPU Usage Spike to 100% when network connection is lost
[ https://issues.apache.org/jira/browse/KAFKA-1642?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14224046#comment-14224046 ] Bhavesh Mistry edited comment on KAFKA-1642 at 11/25/14 4:43 AM: - Also, are you going to port the patch back to the 0.8.1.1 version as well? Please let me know. Thanks, Bhavesh
[jira] [Comment Edited] (KAFKA-1642) [Java New Producer Kafka Trunk] CPU Usage Spike to 100% when network connection is lost
[ https://issues.apache.org/jira/browse/KAFKA-1642?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14224041#comment-14224041 ] Bhavesh Mistry edited comment on KAFKA-1642 at 11/25/14 5:37 AM: - [~ewencp], I hope the above gives you comprehensive steps to reproduce the problems with the run() method. It would be really great if we could make the client more resilient and robust, so that network and broker instability does not cause CPU spikes and degrade application performance. Hence, I would strongly suggest at least measuring how long run(time) takes and keeping some stats; based on some configuration, we could then do CPU throttling (if needed) just to be more defensive, or at least detect that the I/O thread is burning CPU cycles. By the way, the experimental patch still works for the steps described above, due to the hard-coded back-off. Any time you have a patch or anything else, please let me know and I will test it out (you have my email id). Once again, thanks for your detailed analysis and for looking at this on short notice. Please look into ClusterConnectionStates and how it manages the state of a node when disconnecting immediately; in particular, look at connecting(int node, long now) and the following (I feel connecting needs to come before, not after):
{code}
selector.connect(node.id(), new InetSocketAddress(node.host(), node.port()), this.socketSendBuffer, this.socketReceiveBuffer);
this.connectionStates.connecting(node.id(), now);
{code}
Also, I still feel that producer.close() needs to be looked at (a join() with some configurable timeout, so the thread does not hang). Thanks, Bhavesh
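The producer.close() concern raised above (a join() with no timeout can hang forever if the I/O thread never exits) could be sketched along these lines. This is a hypothetical helper, not the actual producer API:

```java
// Hypothetical sketch of the close() suggestion above: join the I/O thread
// with a bounded timeout so a shutdown call cannot hang forever.
public class BoundedClose {

    // Returns true if the thread terminated within timeoutMs.
    public static boolean closeWithTimeout(Thread ioThread, long timeoutMs)
            throws InterruptedException {
        ioThread.interrupt();     // ask the I/O loop to stop
        ioThread.join(timeoutMs); // wait, but never forever
        return !ioThread.isAlive();
    }

    public static void main(String[] args) throws InterruptedException {
        // Simulated I/O thread that would otherwise block for a minute.
        Thread t = new Thread(() -> {
            try {
                Thread.sleep(60_000);
            } catch (InterruptedException e) {
                // interrupted: exit the loop
            }
        });
        t.start();
        boolean closed = closeWithTimeout(t, 1000);
        System.out.println("closed=" + closed);
    }
}
```

The design point is simply that the caller regains control after the timeout and can log or escalate, instead of blocking indefinitely inside close().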
[jira] [Comment Edited] (KAFKA-1642) [Java New Producer Kafka Trunk] CPU Usage Spike to 100% when network connection is lost
[ https://issues.apache.org/jira/browse/KAFKA-1642?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14222571#comment-14222571 ] Bhavesh Mistry edited comment on KAFKA-1642 at 11/24/14 1:31 AM: - The patch provided does not solve the problem. With one or more producer instances, the effect amplifies: org.apache.kafka.clients.producer.internals.Sender.run() takes 100% CPU in an infinite loop when there are no brokers (no work to be done to dump data). Thanks, Bhavesh
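The defensive throttle suggested in the comments (detect that a run() iteration completed almost instantly and back off instead of spinning) might look like the following sketch. The class, method names, and thresholds are hypothetical, not Kafka's actual code:

```java
// Hypothetical sketch of the defensive throttle suggested above: if an I/O
// loop iteration finishes in under MIN_ITERATION_MS, sleep briefly instead
// of spinning at 100% CPU. Returns the number of milliseconds slept.
public class SpinGuard {
    private static final long MIN_ITERATION_MS = 1;  // "some xx ms" threshold
    private static final long BACKOFF_MS = 10;       // pause when spinning

    public static long throttle(long iterationStartMs, long nowMs)
            throws InterruptedException {
        long elapsed = nowMs - iterationStartMs;
        if (elapsed < MIN_ITERATION_MS) {
            Thread.sleep(BACKOFF_MS); // iteration did no real work: back off
            return BACKOFF_MS;
        }
        return 0; // iteration took a sane amount of time: no throttling
    }

    public static void main(String[] args) throws InterruptedException {
        long slept = throttle(100, 100); // 0 ms iteration -> backs off
        System.out.println("slept=" + slept);
    }
}
```

A loop would call throttle(start, System.currentTimeMillis()) at the end of each iteration; alternatively the elapsed time could just be recorded as a metric, which is the "detect that the I/O thread is burning CPU" half of the suggestion.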
[jira] [Comment Edited] (KAFKA-1642) [Java New Producer Kafka Trunk] CPU Usage Spike to 100% when network connection is lost
[ https://issues.apache.org/jira/browse/KAFKA-1642?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14169415#comment-14169415 ] Bhavesh Mistry edited comment on KAFKA-1642 at 10/13/14 4:08 PM: -
{code:title=TestNetworkDownProducer.java}
import java.io.IOException;
import java.io.InputStream;
import java.util.Properties;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

public class TestNetworkDownProducer {

    static int numberTh = 200;
    static CountDownLatch latch = new CountDownLatch(200);

    public static void main(String[] args) throws IOException, InterruptedException {
        Properties prop = new Properties();
        InputStream propFile = Thread.currentThread().getContextClassLoader()
                .getResourceAsStream("kafkaproducer.properties");
        String topic = "logmon.test";
        prop.load(propFile);
        System.out.println("Property: " + prop.toString());

        StringBuilder builder = new StringBuilder(1024);
        int msgLenth = 256;
        for (int i = 0; i < msgLenth; i++)
            builder.append("a");

        int numberOfProducer = 4;
        Producer[] producer = new Producer[numberOfProducer];
        for (int i = 0; i < producer.length; i++) {
            producer[i] = new KafkaProducer(prop);
        }

        ExecutorService service = new ThreadPoolExecutor(numberTh, numberTh, 0L, TimeUnit.MILLISECONDS,
                new LinkedBlockingQueue<Runnable>(numberTh * 2));
        for (int i = 0; i < numberTh; i++) {
            service.execute(new MyProducer(producer, 10, builder.toString(), topic));
        }
        latch.await();
        System.out.println("All Producers done...!");
        for (int i = 0; i < producer.length; i++) {
            producer[i].close();
        }
        service.shutdownNow();
        System.out.println("All done...!");
    }

    static class MyProducer implements Runnable {
        Producer[] producer;
        long maxloops;
        String msg;
        String topic;

        MyProducer(Producer[] list, long maxloops, String msg, String topic) {
            this.producer = list;
            this.maxloops = maxloops;
            this.msg = msg;
            this.topic = topic;
        }

        public void run() {
            ProducerRecord record = new ProducerRecord(topic, msg.toString().getBytes());
            Callback callBack = new MyCallback();
            try {
                for (long j = 0; j < maxloops; j++) {
                    try {
                        for (int i = 0; i < producer.length; i++) {
                            producer[i].send(record, callBack);
                        }
                        Thread.sleep(10);
                    } catch (Throwable th) {
                        System.err.println("FATAL ");
                        th.printStackTrace();
                    }
                }
            } finally {
                latch.countDown();
            }
        }
    }

    static class MyCallback implements Callback {
        public void onCompletion(RecordMetadata metadata, Exception exception) {
            if (exception != null) {
                System.err.println("Msg dropped..!");
                exception.printStackTrace();
            }
        }
    }
}
{code}
{code:title=kafkaproducer.properties}
# THIS IS FOR NEW PRODUCERS API TRUNK. Please see the configuration at https://kafka.apache.org/documentation.html#newproducerconfigs
# Broker List
bootstrap.servers= BROKERS HERE...
# Data Acks
acks=1
# 64MB of
{code}
[jira] [Comment Edited] (KAFKA-1642) [Java New Producer Kafka Trunk] CPU Usage Spike to 100% when network connection is lost
[ https://issues.apache.org/jira/browse/KAFKA-1642?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14169415#comment-14169415 ] Bhavesh Mistry edited comment on KAFKA-1642 at 10/13/14 4:09 PM: - {code} import java.io.IOException; import java.io.InputStream; import java.util.Properties; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutorService; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; import org.apache.kafka.clients.producer.Callback; import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.Producer; import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.clients.producer.RecordMetadata; public class TestNetworkDownProducer { static int numberTh = 200; static CountDownLatch latch = new CountDownLatch(200); public static void main(String[] args) throws IOException, InterruptedException { Properties prop = new Properties(); InputStream propFile = Thread.currentThread().getContextClassLoader() .getResourceAsStream(kafkaproducer.properties); String topic = test; prop.load(propFile); System.out.println(Property: + prop.toString()); StringBuilder builder = new StringBuilder(1024); int msgLenth = 256; for (int i = 0; i msgLenth; i++) builder.append(a); int numberOfProducer = 4; Producer[] producer = new Producer[numberOfProducer]; for (int i = 0; i producer.length; i++) { producer[i] = new KafkaProducer(prop); } ExecutorService service = new ThreadPoolExecutor(numberTh, numberTh, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueueRunnable(numberTh *2)); for(int i = 0 ; i numberTh;i++){ service.execute(new MyProducer(producer,10,builder.toString(), topic)); } latch.await(); System.out.println(All Producers done...!); for (int i = 0; i producer.length; i++) { producer[i].close(); } service.shutdownNow(); System.out.println(All done...!); } static class 
MyProducer implements Runnable { Producer[] producer; long maxloops; String msg ; String topic; MyProducer(Producer[] list, long maxloops,String msg,String topic){ this.producer = list; this.maxloops = maxloops; this.msg = msg; this.topic = topic; } public void run() { ProducerRecord record = new ProducerRecord(topic, msg.toString().getBytes()); Callback callBack = new MyCallback(); try{ for(long j=0 ; j maxloops ; j++){ try { for (int i = 0; i producer.length; i++) { producer[i].send(record, callBack); } Thread.sleep(10); } catch (Throwable th) { System.err.println(FATAL ); th.printStackTrace(); } } }finally { latch.countDown(); } } } static class MyCallback implements Callback { public void onCompletion(RecordMetadata metadata, Exception exception) { if(exception != null){ System.err.println(Msg dropped..!); exception.printStackTrace(); } } } } {code} Property File {code } # THIS IS FOR NEW PRODUCERS API TRUNK Please see the configuration at https://kafka.apache.org/documentation.html#newproducerconfigs # Broker List bootstrap.servers= BROKERS HERE... #Data Acks acks=1 # 64MB of Buffer for log lines (including all messages).
[jira] [Comment Edited] (KAFKA-1642) [Java New Producer Kafka Trunk] CPU Usage Spike to 100% when network connection is lost
[ https://issues.apache.org/jira/browse/KAFKA-1642?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14169415#comment-14169415 ] Bhavesh Mistry edited comment on KAFKA-1642 at 10/13/14 4:09 PM: - {code} import java.io.IOException; import java.io.InputStream; import java.util.Properties; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutorService; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; import org.apache.kafka.clients.producer.Callback; import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.Producer; import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.clients.producer.RecordMetadata; public class TestNetworkDownProducer { static int numberTh = 200; static CountDownLatch latch = new CountDownLatch(200); public static void main(String[] args) throws IOException, InterruptedException { Properties prop = new Properties(); InputStream propFile = Thread.currentThread().getContextClassLoader() .getResourceAsStream(kafkaproducer.properties); String topic = test; prop.load(propFile); System.out.println(Property: + prop.toString()); StringBuilder builder = new StringBuilder(1024); int msgLenth = 256; for (int i = 0; i msgLenth; i++) builder.append(a); int numberOfProducer = 4; Producer[] producer = new Producer[numberOfProducer]; for (int i = 0; i producer.length; i++) { producer[i] = new KafkaProducer(prop); } ExecutorService service = new ThreadPoolExecutor(numberTh, numberTh, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueueRunnable(numberTh *2)); for(int i = 0 ; i numberTh;i++){ service.execute(new MyProducer(producer,10,builder.toString(), topic)); } latch.await(); System.out.println(All Producers done...!); for (int i = 0; i producer.length; i++) { producer[i].close(); } service.shutdownNow(); System.out.println(All done...!); } static class 
MyProducer implements Runnable { Producer[] producer; long maxloops; String msg ; String topic; MyProducer(Producer[] list, long maxloops,String msg,String topic){ this.producer = list; this.maxloops = maxloops; this.msg = msg; this.topic = topic; } public void run() { ProducerRecord record = new ProducerRecord(topic, msg.toString().getBytes()); Callback callBack = new MyCallback(); try{ for(long j=0 ; j maxloops ; j++){ try { for (int i = 0; i producer.length; i++) { producer[i].send(record, callBack); } Thread.sleep(10); } catch (Throwable th) { System.err.println(FATAL ); th.printStackTrace(); } } }finally { latch.countDown(); } } } static class MyCallback implements Callback { public void onCompletion(RecordMetadata metadata, Exception exception) { if(exception != null){ System.err.println(Msg dropped..!); exception.printStackTrace(); } } } } {code} {code } # THIS IS FOR NEW PRODUCERS API TRUNK Please see the configuration at https://kafka.apache.org/documentation.html#newproducerconfigs # Broker List bootstrap.servers= BROKERS HERE... #Data Acks acks=1 # 64MB of Buffer for log lines (including all messages).
[jira] [Comment Edited] (KAFKA-1642) [Java New Producer Kafka Trunk] CPU Usage Spike to 100% when network connection is lost
[ https://issues.apache.org/jira/browse/KAFKA-1642?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14169415#comment-14169415 ] Bhavesh Mistry edited comment on KAFKA-1642 at 10/13/14 4:10 PM: -

{code}
import java.io.IOException;
import java.io.InputStream;
import java.util.Properties;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

public class TestNetworkDownProducer {

    static int numberTh = 200;
    static CountDownLatch latch = new CountDownLatch(200);

    public static void main(String[] args) throws IOException, InterruptedException {
        Properties prop = new Properties();
        InputStream propFile = Thread.currentThread().getContextClassLoader()
                .getResourceAsStream("kafkaproducer.properties");
        String topic = "test";
        prop.load(propFile);
        System.out.println("Property: " + prop.toString());

        // Build a 256-byte message payload.
        StringBuilder builder = new StringBuilder(1024);
        int msgLength = 256;
        for (int i = 0; i < msgLength; i++)
            builder.append('a');

        // 4 producer instances shared by all worker threads.
        int numberOfProducer = 4;
        Producer[] producer = new Producer[numberOfProducer];
        for (int i = 0; i < producer.length; i++) {
            producer[i] = new KafkaProducer(prop);
        }

        ExecutorService service = new ThreadPoolExecutor(numberTh, numberTh,
                0L, TimeUnit.MILLISECONDS,
                new LinkedBlockingQueue<Runnable>(numberTh * 2));
        for (int i = 0; i < numberTh; i++) {
            service.execute(new MyProducer(producer, 10, builder.toString(), topic));
        }
        latch.await();
        System.out.println("All Producers done...!");
        for (int i = 0; i < producer.length; i++) {
            producer[i].close();
        }
        service.shutdownNow();
        System.out.println("All done...!");
    }

    static class MyProducer implements Runnable {
        Producer[] producer;
        long maxloops;
        String msg;
        String topic;

        MyProducer(Producer[] list, long maxloops, String msg, String topic) {
            this.producer = list;
            this.maxloops = maxloops;
            this.msg = msg;
            this.topic = topic;
        }

        public void run() {
            ProducerRecord record = new ProducerRecord(topic, msg.getBytes());
            Callback callBack = new MyCallback();
            try {
                for (long j = 0; j < maxloops; j++) {
                    try {
                        // Send the same record through every producer instance.
                        for (int i = 0; i < producer.length; i++) {
                            producer[i].send(record, callBack);
                        }
                        Thread.sleep(10);
                    } catch (Throwable th) {
                        System.err.println("FATAL ");
                        th.printStackTrace();
                    }
                }
            } finally {
                latch.countDown();
            }
        }
    }

    static class MyCallback implements Callback {
        public void onCompletion(RecordMetadata metadata, Exception exception) {
            if (exception != null) {
                System.err.println("Msg dropped..!");
                exception.printStackTrace();
            }
        }
    }
}
{code}

This is the property file used:

{code}
# THIS IS FOR NEW PRODUCERS API TRUNK. Please see the configuration at
# https://kafka.apache.org/documentation.html#newproducerconfigs

# Broker List
bootstrap.servers=BROKERS HERE...

# Data Acks
acks=1

# 64MB of Buffer for log lines (including all
{code}
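A later comment on this issue notes that reconnect.backoff.ms and retry.backoff.ms were left unset, so the defaults applied. To make the reproduction explicit, those settings could be pinned in the same properties file; the values below are illustrative assumptions (believed to match the documented defaults of the era), not tuning recommendations:

{code}
# Hypothetical addition for this test: pin the backoff settings explicitly
# instead of relying on defaults (values here are illustrative).
reconnect.backoff.ms=10
retry.backoff.ms=100
{code}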
[jira] [Comment Edited] (KAFKA-1642) [Java New Producer Kafka Trunk] CPU Usage Spike to 100% when network connection is lost
[ https://issues.apache.org/jira/browse/KAFKA-1642?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14148121#comment-14148121 ] Bhavesh Mistry edited comment on KAFKA-1642 at 9/25/14 6:42 PM:

Hi [~jkreps], I will work on the sample program. We are not setting the reconnect.backoff.ms and retry.backoff.ms configurations, so the defaults apply. The only thing I can tell you is that I have 4 Producer instances per JVM, so this might amplify the issue.

Thanks,
Bhavesh

was (Author: bmis13):
HI [~jkreps], I will work on the sample program. We are not setting reconnect.backoff.ms and retry.backoff.ms configuration so it would be default configuration. Only thing I can tell you is that I have 4 Producer instance per JVM. So this might amplify issue. Thanks, Bhavesh

[Java New Producer Kafka Trunk] CPU Usage Spike to 100% when network connection is lost
---

Key: KAFKA-1642
URL: https://issues.apache.org/jira/browse/KAFKA-1642
Project: Kafka
Issue Type: Bug
Components: producer
Affects Versions: 0.8.2
Reporter: Bhavesh Mistry
Assignee: Jun Rao

I see my CPU spike to 100% when the network connection is lost for a while. It seems the network I/O threads are very busy logging the following error message. Is this expected behavior?
{code}
2014-09-17 14:06:16.830 [kafka-producer-network-thread] ERROR org.apache.kafka.clients.producer.internals.Sender - Uncaught error in kafka producer I/O thread:
java.lang.IllegalStateException: No entry found for node -2
	at org.apache.kafka.clients.ClusterConnectionStates.nodeState(ClusterConnectionStates.java:110)
	at org.apache.kafka.clients.ClusterConnectionStates.disconnected(ClusterConnectionStates.java:99)
	at org.apache.kafka.clients.NetworkClient.initiateConnect(NetworkClient.java:394)
	at org.apache.kafka.clients.NetworkClient.maybeUpdateMetadata(NetworkClient.java:380)
	at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:174)
	at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:175)
	at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:115)
	at java.lang.Thread.run(Thread.java:744)
{code}

Thanks,
Bhavesh

--
This message was sent by Atlassian JIRA
(v6.3.4#6332)