Re: Authorization Engine For Kafka Related to KIP-11
+ Kafka Dev team, to see if the Kafka Dev team knows of or recommends any auth engine for producers/consumers. Thanks, Bhavesh Please pardon me, I accidentally sent the previous blank email. On Tue, Nov 3, 2015 at 9:52 PM, Bhavesh Mistry <mistry.p.bhav...@gmail.com> wrote: > On Sun, Nov 1, 2015 at 11:15 PM, Bhavesh Mistry > <mistry.p.bhav...@gmail.com> wrote: >> Hi All, >> >> Has anyone used Apache Ranger as an authorization engine for Kafka topic >> creation, consumption (read), and write operations on a topic? I am looking >> at having an audit log and regulating consumption/writes to particular topics >> (for example, having production environment access does not mean that anyone >> can run a console consumer etc. on a particular topic; basically, regulating who >> can read/write to a topic as the first use case). >> >> https://cwiki.apache.org/confluence/display/RANGER/Apache+Ranger+0.5+-+User+Guide#ApacheRanger0.5-UserGuide-KAFKA >> >> If you have used Apache Ranger in production, I have the following questions: >> 1) Is there any performance impact on brokers/producers/consumers while >> using Apache Ranger? >> 2) Is the audit log really useful out of the box? Or let me know what sort of >> reports you run on the audit logs (e.g., pumping the Apache Ranger audit log into >> another system for reporting purposes). >> >> Please share your experience using Kafka with any other authorization engine >> if you are not using Apache Ranger (this is based on >> https://cwiki.apache.org/confluence/display/KAFKA/KIP-11+-+Authorization+Interface). >> >> Thanks, and looking forward to hearing back from Apache Kafka community members. >> >> Thanks, >> >> Bhavesh
Re: Authorization Engine For Kafka Related to KIP-11
On Sun, Nov 1, 2015 at 11:15 PM, Bhavesh Mistry <mistry.p.bhav...@gmail.com> wrote: > Hi All, > > Has anyone used Apache Ranger as an authorization engine for Kafka topic > creation, consumption (read), and write operations on a topic? I am looking > at having an audit log and regulating consumption/writes to particular topics > (for example, having production environment access does not mean that anyone > can run a console consumer etc. on a particular topic; basically, regulating who > can read/write to a topic as the first use case). > > https://cwiki.apache.org/confluence/display/RANGER/Apache+Ranger+0.5+-+User+Guide#ApacheRanger0.5-UserGuide-KAFKA > > If you have used Apache Ranger in production, I have the following questions: > 1) Is there any performance impact on brokers/producers/consumers while > using Apache Ranger? > 2) Is the audit log really useful out of the box? Or let me know what sort of > reports you run on the audit logs (e.g., pumping the Apache Ranger audit log into > another system for reporting purposes). > > Please share your experience using Kafka with any other authorization engine > if you are not using Apache Ranger (this is based on > https://cwiki.apache.org/confluence/display/KAFKA/KIP-11+-+Authorization+Interface). > > Thanks, and looking forward to hearing back from Apache Kafka community members. > > Thanks, > > Bhavesh
Re: [Discussion] KIP-34 Add Partitioner Change Listener to Partitioner Interface
Hi Becket, Thanks for answering and providing feedback. I will withdraw the KIP and put it into the rejected section. Thanks, Bhavesh On Mon, Sep 21, 2015 at 9:53 AM, Jiangjie Qin <j...@linkedin.com.invalid> wrote: > Hey Bhavesh, > > I kind of think this metadata change capture logic should be implemented by > each user by themselves, for the following reasons: > > 1. Most users do not really care about partition changes. Adding the > logic/interface to the default partitioner means that users who don't care about > partition changes pay the price of making a cluster diff > for each metadata update. For a big cluster, this metadata diff could be > costly depending on how frequently the metadata is refreshed. > > 2. In some cases, users might only care about partition changes for some > specific topic; in that case, there is no need to do a cluster diff for all > the topics a producer is producing data to. If the cluster diff is > implemented in user code, it would be more efficient because users can > check only the topics they are interested in. Also, different users might care about > different changes in the metadata, e.g. topic create/delete/node change, > etc. So it seems better to leave the actual metadata change capture logic > to the user instead of doing it in the producer. > > 3. The cluster diff code itself is short and not complicated, so even if > users do it on their own it should be simple, e.g.: > > if (this.cachedCluster.hashCode() != cluster.hashCode()) { > for (String topic : cluster.topics()) { > if (this.cachedCluster.topics().contains(topic) && > this.cachedCluster.partitionsForTopic(topic).size() != > cluster.partitionsForTopic(topic).size()) > // handle partition change. > } > } > > Thanks, > > Jiangjie (Becket) Qin > > On Mon, Sep 21, 2015 at 9:13 AM, Bhavesh Mistry <mistry.p.bhav...@gmail.com> > wrote: > >> Hi Jiangjie, >> >> Thanks for the valuable feedback. >> >> 1) Thread coordination for partition changes could be an issue. >> >> I do agree with you that coordination between the application thread >> and the sender thread would be a tough one. The only concern I had was to >> share the same logic you described among all the partitioner >> interface implementations, and let the Kafka framework take care >> of doing the diff exactly as you describe. >> >> In a multithreaded environment, the change listener would be called from the >> same thread that has just finished the metadata update. >> >> Metadata listener: >> >> https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/Metadata.java#L163 >> >> producer.send() >> >> https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java#L370 >> >> But the behavior of onChange() would be no different from what exists today. >> >> For example: >> >> public Future<RecordMetadata> send(ProducerRecord<K, V> record, >> Callback callback) { >> >> // Determine partition for message >> >> int partition = partition(record, serializedKey, serializedValue, >> metadata.fetch()); >> >> /** A metadata update may occur after the application thread determines the >> partition for a given message but before the message is added to the record queue, >> i.e. the cluster change happens in between. So in my opinion the behavior is the same. >> ***/ RecordAccumulator.RecordAppendResult result = >> accumulator.append(tp, serializedKey, serializedValue, callback); >> >> What do you think of adding the diff logic you describe to the default >> partitioner implementation, or to another implementation class (call it a >> ChangePartitioner class) which, within its partition() method, calls an >> onChange() method so that whoever cares or needs to know can do what they >> like (log the event, or use it to change partitioning strategy, etc.)? >> >> This gives the ability to share the diff code so that not every implementation >> has to implement the diff logic, which is my main concern. >> >> >> Thanks, >> >> Bhavesh >> >> >> On Fri, Sep 18, 2015 at 3:47 PM, Jiangjie Qin <j...@linkedin.com.invalid> >> wrote: >> > Hey Bhavesh, >> > >> > I think it is useful to notify the user about the partition change. >> > >> > The problem of having a listener in the producer is that it is hard to >> > guarantee the synchronization. For example, consider the following >> sequence: >> > 1. the producer sender thread refreshes the metadata
Re: [Discussion] KIP-34 Add Partitioner Change Listener to Partitioner Interface
Hi Jiangjie, Thanks for the valuable feedback. 1) Thread coordination for partition changes could be an issue. I do agree with you that coordination between the application thread and the sender thread would be a tough one. The only concern I had was to share the same logic you described among all the partitioner interface implementations, and let the Kafka framework take care of doing the diff exactly as you describe. In a multithreaded environment, the change listener would be called from the same thread that has just finished the metadata update. Metadata listener: https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/Metadata.java#L163 producer.send() https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java#L370 But the behavior of onChange() would be no different from what exists today. For example:

public Future<RecordMetadata> send(ProducerRecord<K, V> record, Callback callback) {
  // Determine partition for message
  int partition = partition(record, serializedKey, serializedValue, metadata.fetch());
  /** A metadata update may occur after the application thread determines the partition
      for a given message but before the message is added to the record queue, i.e. the
      cluster change happens in between. So in my opinion the behavior is the same. ***/
  RecordAccumulator.RecordAppendResult result = accumulator.append(tp, serializedKey, serializedValue, callback);

What do you think of adding the diff logic you describe to the default partitioner implementation, or to another implementation class (call it a ChangePartitioner class) which, within its partition() method, calls an onChange() method so that whoever cares or needs to know can do what they like (log the event, or use it to change partitioning strategy, etc.)? This gives the ability to share the diff code so that not every implementation has to implement the diff logic, which is my main concern. Thanks, Bhavesh On Fri, Sep 18, 2015 at 3:47 PM, Jiangjie Qin <j...@linkedin.com.invalid> wrote: > Hey Bhavesh, > > I think it is useful to notify the user about the partition change. > > The problem of having a listener in the producer is that it is hard to > guarantee the synchronization. For example, consider the following sequence: > 1. the producer sender thread refreshes the metadata with a partition change. > 2. a user thread calls send() with a customized partitioner; the partitioner > decides the partition using the new metadata refreshed in step 1. > 3. the producer sender thread calls onPartitionChange(). > > At that point, the message sent in step 2 was already sent using the new metadata. > If we update the metadata after invoking onPartitionChange(), it is a > little strange because the metadata has not changed yet. > > Also, the metadata refresh can happen in the caller thread as well; I am not sure how > that would work with multiple caller threads. > > I am thinking the user can actually get an idea of whether the > cluster has changed or not, because the partition() method takes a > cluster parameter. So if users care about partition-count changes, they > can do the following: > 1. store a copy of the cluster as a cache in the partitioner. > 2. when partition() is called, check whether the hash of this cluster is the > same as the cached cluster's. > 3. If the hash of the passed-in cluster is different from the hash of the > cached cluster, that means a metadata refresh occurred; people can check whether > there is a partition change before doing the partitioning. > > Thanks, > > Jiangjie (Becket) Qin > > On Wed, Sep 16, 2015 at 12:08 AM, Bhavesh Mistry <mistry.p.bhav...@gmail.com >> wrote: > >> Hi Kafka Dev Team, >> >> I would like to get your feedback about adding yet another method or API >> call, onPartitionsChange( ), to the Partitioner interface to get notified about >> partition changes upon metadata refresh. >> >> This will allow custom logic (implementors of Partitioner) to be notified when >> partition ownership changes, partitions go online vs. offline, or partition >> increase/decrease events happen, and when the changes were propagated to an >> individual producer instance. >> >> Please note this is my first KIP, and if the process is not followed correctly, >> please do let me know. I will be more than happy to follow or correct >> something that I may have missed. >> >> Thanks in advance, and looking forward to your feedback. >> >> Thanks, >> >> Bhavesh >>
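For readers who want to follow Becket's suggestion, below is a minimal self-contained sketch of steps 1-3 above, assuming the KIP-22 Partitioner interface that was in trunk at the time; the class name, the fallback hashing, and the private onPartitionsChange() helper are illustrative, not part of any Kafka API.

    import java.util.Map;
    import org.apache.kafka.clients.producer.Partitioner;
    import org.apache.kafka.common.Cluster;

    // Hypothetical user-side partitioner that detects partition-count changes
    // by caching the last Cluster seen (Becket's steps 1-3).
    public class ChangeAwarePartitioner implements Partitioner {
        private Cluster cachedCluster;

        @Override
        public void configure(Map<String, ?> configs) {}

        @Override
        public int partition(String topic, Object key, byte[] keyBytes,
                             Object value, byte[] valueBytes, Cluster cluster) {
            // Step 2: a differing hash means a metadata refresh occurred.
            if (cachedCluster == null || cachedCluster.hashCode() != cluster.hashCode()) {
                if (cachedCluster != null
                        && cachedCluster.topics().contains(topic)
                        && cachedCluster.partitionsForTopic(topic).size()
                           != cluster.partitionsForTopic(topic).size()) {
                    onPartitionsChange(topic, cluster); // react before partitioning
                }
                cachedCluster = cluster; // step 1: refresh the cache
            }
            int numPartitions = cluster.partitionsForTopic(topic).size();
            // Stand-in partitioning logic; real implementations would differ.
            return (key == null ? 0 : key.hashCode() & 0x7fffffff) % numPartitions;
        }

        private void onPartitionsChange(String topic, Cluster cluster) {
            // e.g. log the event or reset partitioning state
        }

        @Override
        public void close() {}
    }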
[Discussion] KIP-34 Add Partitioner Change Listener to Partitioner Interface
Hi Kafka Dev Team, I would like to get your feedback about adding yet another method or API call, onPartitionsChange( ), to the Partitioner interface to get notified about partition changes upon metadata refresh. This will allow custom logic (implementors of Partitioner) to be notified when partition ownership changes, partitions go online vs. offline, or partition increase/decrease events happen, and when the changes were propagated to an individual producer instance. Please note this is my first KIP, and if the process is not followed correctly, please do let me know. I will be more than happy to follow or correct something that I may have missed. Thanks in advance, and looking forward to your feedback. Thanks, Bhavesh
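For context, one possible shape of the proposed addition, reconstructed from the description above; this is purely hypothetical (the KIP was later withdrawn, as the reply earlier in this thread notes):

    import org.apache.kafka.common.Cluster;
    import org.apache.kafka.common.Configurable;

    // Hypothetical sketch of the proposed KIP-34 change: the producer would
    // invoke onPartitionsChange() after any metadata refresh whose Cluster
    // differs from the previous one, before the new Cluster is used.
    public interface Partitioner extends Configurable {
        int partition(String topic, Object key, byte[] keyBytes,
                      Object value, byte[] valueBytes, Cluster cluster);

        // Proposed new callback: old vs. new cluster metadata.
        void onPartitionsChange(Cluster oldCluster, Cluster newCluster);

        void close();
    }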
Re: New Consumer API and Range Consumption with Fail-over
Hi Jason, Thanks for the info. I will implement (by the end of next week) what you have proposed. If I encounter any issues, I will let you know. Indeed, adding a new API would be an uphill battle. I did follow the email chain "Re: Kafka Consumer thoughts". Thanks, Bhavesh On Wed, Aug 5, 2015 at 10:03 AM, Jason Gustafson ja...@confluent.io wrote: Hey Bhavesh, I think your use case can be handled with the new consumer API in roughly the manner I suggested previously. It might be a little easier if we added the ability to set the end offset for consumption. Perhaps something like this: // stop consumption from the partition when offset is reached void limit(TopicPartition partition, long offset) My guess is that we'd have a bit of an uphill battle to get this into the first release, but it may be possible if the use case is common enough. In any case, I think consuming to the limit offset and manually pausing the partition is a viable alternative. As for your question about fail-over, the new consumer provides a similar capability to the old high-level consumer. Here is a link to the wiki which describes its design: https://cwiki.apache.org/confluence/display/KAFKA/Kafka+0.9+Consumer+Rewrite+Design -Jason On Tue, Aug 4, 2015 at 12:01 AM, Bhavesh Mistry mistry.p.bhav...@gmail.com wrote: Hi Jason and Kafka Dev Team, First of all, thanks for responding; I think you got the expected behavior correctly. The use case is offset range consumption. We store each minute's highest offset for each topic, per partition. So if we need to reload or re-consume data from yesterday, say 8 AM to noon, we would have the start offset mapping at 8 AM and the end offset mapping at noon in a time-series database. I was trying to cover this use case with the new consumer API. Do you or the Kafka Dev team agree with the request to have an API that takes in a topic and its start/end offsets for a high-level consumer group? (With the older consumer API we used the simple consumer, without fail-over.) Also, for each range consumption there will be a different group id, and group ids will not be reused. The main purpose is to reload or process past data again (occasionally, due to production bugs or downtime etc.), while letting the main consumer group continue to consume the latest records. void subscribe(TopicPartition[] startOffsetPartitions, TopicPartition[] endOffsetPartitions) or something similar which will allow the following: 1) When the consumer group already exists (meaning it has consumed data and committed offsets to a storage system, either Kafka or ZK), ignore the start offset positions and use the committed offsets; if nothing is committed, use the start offset partitions. 2) When consumption has reached the end offset for a given partition, pausing is fine, or the assigned thread becomes a fail-over and waits for reassignment. 3) When the consumer group is done consuming all partitions' offset ranges (start to end), gracefully shut down the entire consumer group. 4) While consuming records, if one of the nodes or consuming threads goes down, automatically fail over to the others (similar to the high-level consumer in the old consumer API; I am not sure whether high-level and/or simple consumer concepts exist for the new API). I hope the above explanation clarifies the use case and intended behavior. Thanks for the clarifications; you are correct that we need pause(TopicPartition tp), resume(TopicPartition tp), and/or an API to set the end offset for each partition. Please do let us know your preference for supporting the above simple use case.
Thanks, Bhavesh On Thu, Jul 30, 2015 at 1:23 PM, Jason Gustafson ja...@confluent.io wrote: Hi Bhavesh, I'm not totally sure I understand the expected behavior, but I think this can work. Instead of seeking to the start of the range before the poll loop, you should probably provide a ConsumerRebalanceCallback to get notifications when group assignment has changed (e.g. when one of your nodes dies). When a new partition is assigned, the callback will be invoked by the consumer and you can use it to check if there's a committed position in the range or if you need to seek to the beginning of the range. For example: void onPartitionsAssigned(consumer, partitions) { for (partition : partitions) { try { offset = consumer.committed(partition) consumer.seek(partition, offset) } catch (NoOffsetForPartition) { consumer.seek(partition, rangeStart) } } } If a failure occurs, then the partitions will be rebalanced across whichever consumers are still active. The case of the entire cluster being rebooted is not really different. When the consumers come back, they check the committed position and resume where they left off. Does that make sense? After you are finished consuming a partition's range, you can use
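For reference, a compilable version of Jason's callback sketch, written against the rebalance-listener API as it eventually shipped (the draft discussed here called it ConsumerRebalanceCallback and passed the consumer as an argument); rangeStarts is an assumed map from partition to the range's start offset:

    import java.util.Collection;
    import java.util.Map;
    import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.clients.consumer.OffsetAndMetadata;
    import org.apache.kafka.common.TopicPartition;

    public class RangeRebalanceListener implements ConsumerRebalanceListener {
        private final KafkaConsumer<byte[], byte[]> consumer;
        private final Map<TopicPartition, Long> rangeStarts;

        public RangeRebalanceListener(KafkaConsumer<byte[], byte[]> consumer,
                                      Map<TopicPartition, Long> rangeStarts) {
            this.consumer = consumer;
            this.rangeStarts = rangeStarts;
        }

        @Override
        public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
            for (TopicPartition tp : partitions) {
                OffsetAndMetadata committed = consumer.committed(tp);
                if (committed != null) {
                    consumer.seek(tp, committed.offset());  // resume where we left off
                } else {
                    consumer.seek(tp, rangeStarts.get(tp)); // start of the range
                }
            }
        }

        @Override
        public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
            consumer.commitSync(); // persist progress before losing ownership
        }
    }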
Re: New Consumer API and Range Consumption with Fail-over
Hi Jason and Kafka Dev Team, First of all thanks for responding and I think you got expected behavior correctly. The use-case is offset range consumption. We store each minute highest offset for each topic per partition. So if we need to reload or re-consume data from yesterday per say 8AM to noon, we would have offset start mapping at 8AM and end offset mapping at noon in Time Series Database. I was trying to load this use case with New Consumer API. Do you or Kafka Dev team agree with request to either have API that takes in topic and its start/end offset for High Level Consumer group (With older consumer API we used Simple consumer before without fail-over). Also, for each range-consumption, there will be different group id and group id will not be reused. The main purpose is to reload or process past data again (due to production bugs or downtime etc occasionally and let main consumer-group continue to consume latest records). void subscribe(TopicPartition[] startOffsetPartitions, TopicPartition[] endOffsetPartitions) or something similar which will allow following: 1) When consumer group already exists (meaning have consumed data and committed offset to storage system either Kafka or ZK) ignore start offset positions and use committed offset. If not committed use start Offset Partition. 2) When partition consumption has reached end Offset for given partition, pause is fine or this assigned thread become fail over or wait for reassignment. 3) When all are Consumer Group is done consuming all partitions offset ranges (start to end), gracefully shutdown entire consumer group. 4) While consuming records, if one of node or consuming thread goes down automatic fail-over to others (Similar to High Level Consumer for OLD Consumer API. I am not sure if there exists High level and/or Simple Consumer concept for New API ) I hope above explanation clarifies use-case and intended behavior. Thanks for clarifications, and you are correct we need pause(TopicPartition tp), resume(TopicPartition tp), and/or API to set to end offset for each partition. Please do let us know your preference to support above simple use-case. Thanks, Bhavesh On Thu, Jul 30, 2015 at 1:23 PM, Jason Gustafson ja...@confluent.io wrote: Hi Bhavesh, I'm not totally sure I understand the expected behavior, but I think this can work. Instead of seeking to the start of the range before the poll loop, you should probably provide a ConsumerRebalanceCallback to get notifications when group assignment has changed (e.g. when one of your nodes dies). When a new partition is assigned, the callback will be invoked by the consumer and you can use it to check if there's a committed position in the range or if you need to seek to the beginning of the range. For example: void onPartitionsAssigned(consumer, partitions) { for (partition : partitions) { try { offset = consumer.committed(partition) consumer.seek(partition, offset) } catch (NoOffsetForPartition) { consumer.seek(partition, rangeStart) } } } If a failure occurs, then the partitions will be rebalanced across whichever consumers are still active. The case of the entire cluster being rebooted is not really different. When the consumers come back, they check the committed position and resume where they left off. Does that make sense? After you are finished consuming a partition's range, you can use KafkaConsumer.pause(partition) to prevent further fetches from being initiated while still maintaining the current assignment. 
The patch to add pause() is not in trunk yet, but it probably will be before too long. One potential problem is that you wouldn't be able to reuse the same group to consume a different range, because of the way it depends on the committed offsets. Kafka's commit API actually allows some additional metadata to go along with a committed offset, and that could potentially be used to tie the commit to the range, but it's not yet exposed in KafkaConsumer. I assume it will be eventually, but I'm not sure whether that will be part of the initial release. Hope that helps! Jason On Thu, Jul 30, 2015 at 7:54 AM, Bhavesh Mistry mistry.p.bhav...@gmail.com wrote: Hello Kafka Dev Team, With the new consumer API redesign ( https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java ), is there a capability to consume given the topic and partition start/end positions? How would I achieve the following use case of range consumption with fail-over? Use case: the ability to reload data given a topic and its partitions' offset start/end with a high-level consumer, with fail-over. Basically, high-level range consumption that fails over if a consumer in the group dies, while the main consumer group continues. Suppose you have a topic called “test-topic”
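A sketch of the commit-metadata idea Jason mentions, using the OffsetAndMetadata API that later releases of KafkaConsumer exposed (it was not yet available at the time of this thread); the rangeTag marker is purely illustrative, not a Kafka concept:

    import java.util.Collections;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.clients.consumer.OffsetAndMetadata;
    import org.apache.kafka.common.TopicPartition;

    public class RangeCommit {
        // Tie each commit to a specific range via the metadata string.
        static void commitForRange(KafkaConsumer<byte[], byte[]> consumer,
                                   TopicPartition tp, long nextOffset, String rangeTag) {
            consumer.commitSync(Collections.singletonMap(
                    tp, new OffsetAndMetadata(nextOffset, rangeTag)));
        }

        // On restart: trust the committed offset only if it belongs to this range.
        static long resumeOffset(KafkaConsumer<byte[], byte[]> consumer,
                                 TopicPartition tp, long rangeStart, String rangeTag) {
            OffsetAndMetadata committed = consumer.committed(tp);
            if (committed != null && rangeTag.equals(committed.metadata())) {
                return committed.offset();
            }
            return rangeStart; // commit was absent or for a different range
        }
    }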
New Consumer API and Range Consumption with Fail-over
Hello Kafka Dev Team, With the new consumer API redesign ( https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java ), is there a capability to consume given the topic and partition start/end positions? How would I achieve the following use case of range consumption with fail-over? Use case: the ability to reload data given a topic and its partitions' offset start/end with a high-level consumer, with fail-over. Basically, high-level range consumption that fails over if a consumer in the group dies, while the main consumer group continues. Suppose you have a topic called “test-topic” and its partitions' begin and end offsets: { topic: test-topic, [ { partition id: 1, offset start: 100, offset end: 500,000 }, { partition id: 2, offset start: 200,000, offset end: 500,000 } ... for n partitions ] } Then you create consumer group “Range-Consumer” and use the seek method for each partition. Your feedback is greatly appreciated. In each JVM, for each consumption thread:

Consumer c = new KafkaConsumer({ group.id = "Range-consumer" } ...)
Map<Integer, Long> partitionToEndOffsetMapping = ...
for (TopicPartition tp : topicPartitionList) {
  consumer.seek(tp, startOffset)
}
while (true) {
  ConsumerRecords records = consumer.poll(1);
  // for each record check the offset
  record = records.iterator().next();
  if (partitionToEndOffsetMapping.get(record.partition()) >= record.offset()) {
    // consume record
    // commit offset
    consumer.commit(CommitType.SYNC);
  } else {
    // Should I unsubscribe now for this partition?
    consumer.unsubscribe(record.partition());
  }
}

Please let me know if the above approach is valid: 1) how will fail-over work? 2) how does rebooting the entire consumer group impact the offset seek, since offsets are stored by Kafka itself? Thanks, Bhavesh
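For comparison, a compilable restatement of the loop above against the consumer API as it shipped, using pause() (suggested later in this thread) instead of unsubscribing a finished partition, so the group assignment is kept. Names are illustrative; the pause() signature shown is the Collection form of later releases (0.9 took varargs):

    import java.util.Collections;
    import java.util.HashMap;
    import java.util.Map;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.common.TopicPartition;

    public class RangeConsumeLoop {
        public static void consume(KafkaConsumer<byte[], byte[]> consumer,
                                   Map<TopicPartition, Long> startOffsets,
                                   Map<TopicPartition, Long> endOffsets) {
            Map<TopicPartition, Boolean> finished = new HashMap<>();
            for (Map.Entry<TopicPartition, Long> e : startOffsets.entrySet()) {
                consumer.seek(e.getKey(), e.getValue());
                finished.put(e.getKey(), false);
            }
            while (finished.containsValue(false)) {
                ConsumerRecords<byte[], byte[]> records = consumer.poll(1000);
                for (ConsumerRecord<byte[], byte[]> record : records) {
                    TopicPartition tp =
                            new TopicPartition(record.topic(), record.partition());
                    long end = endOffsets.get(tp);
                    if (record.offset() <= end) {
                        // process(record) ... then persist progress
                        consumer.commitSync();
                    }
                    if (record.offset() >= end && !finished.get(tp)) {
                        consumer.pause(Collections.singleton(tp)); // range done
                        finished.put(tp, true);
                    }
                }
            }
        }
    }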
Re: [DISCUSSION] Partition Selection and Coordination By Brokers for Producers
Thanks for the info, Jun. Does each broker (that hosts a topic) have the latest (end) offset for each topic and partition? If yes, I was planning to use this rate of change vs. the producer's incoming injection rate (calculated only for the broker the producer is attached to) to decide which partition would be the next optimal partition to inject data into. I was not thinking about coordination among brokers (and I agree the consumer rate can be dropped). Thanks for the feedback; I will have to study how brokers handle ProduceRequests (purgatory stuff). Thanks, Bhavesh On Mon, Jun 1, 2015 at 5:50 PM, Jun Rao j...@confluent.io wrote: Bhavesh, I am not sure if load balancing based on the consumption rate (1.b) makes sense. Each consumer typically consumes all partitions from a topic. So, as long as the data in each partition is balanced, the consumption rate will be balanced too. Selecting a partition based on the size of each partition could be useful, but I am not sure it's going to be significantly better than just having the clients pick a random partition. Also, implementing this on the broker side has downsides. First, having the broker forward each produce request increases the network traffic on the broker. Second, this will likely make the broker code more complicated, since we would probably have to put every forwarded produce request in a purgatory. Third, we currently don't maintain the size of each partition on every broker. Given these, I think your best bet is probably to just fix those non-Java clients to send data in a round-robin way. Thanks, Jun On Fri, May 29, 2015 at 1:22 PM, Bhavesh Mistry mistry.p.bhav...@gmail.com wrote: Hi Kafka Dev Team, I would appreciate your feedback on moving producer partition selection from the producer to the broker. Also, please do let me know the correct process for collecting feedback from the Kafka Dev team and/or community. Thanks, Bhavesh On Tue, May 26, 2015 at 11:54 AM, Bhavesh Mistry mistry.p.bhav...@gmail.com wrote: Hi Kafka Dev Team, I am sorry, I am new to the process of discussion and/or KIPs, so I had commented on another email voting chain. Please do let me know the correct process for collecting feedback and starting a discussion with the Kafka Dev group. Here is the original message: I have had experience with both the producer and consumer side. I have a different use case on this partition selection strategy. Problem: We have a heterogeneous environment of producers (by that I mean we have Node.js, Python, new Java, and old Scala-based producers to the same topic). I have seen that not all producers employ round-robin strategies for non-keyed messages like the new producer does. Hence, this creates non-uniform data ingestion into partitions and delays overall message processing. How do we address a uniform distribution/message injection rate to all partitions? Proposed solution: Let the broker cluster decide the next partition for a topic to send data to, rather than the producer itself, with more intelligence. 1) When sending data to brokers (ProduceResponse), the Kafka wire protocol sends a hint to the client about which partition to send to, based on the following logic (which could be customizable): a. the overall data injection rate for the topic and the current producer injection rate; b. the ability to rank partitions based on consumer rate (an advanced use case, as there may be many consumers, so a weighted average etc.). Ultimately, brokers would coordinate among thousands of producers and divert the data injection rate (an out-of-the-box feature) and consumption rate (a pluggable interface implementation on the brokers' side).
The goal here is to attain uniformity and/or a lower delivery rate to consumers. This is similar to consumer coordination moving to the brokers; the producer-side partition selection would also move to the brokers. This will benefit both Java and non-Java clients. Please let me know your feedback on this subject matter. I am sure lots of you run Kafka in enterprise environments where you may have different types of producers for the same topic (e.g. logging clients in JavaScript, PHP, Java, Python etc. sending to a log topic). I would really appreciate your feedback on this. Thanks, Bhavesh
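For reference, the client-side fix Jun suggests amounts to something like the following sketch (shown in Java for illustration; the point is that the non-Java clients would implement the same rotation for non-keyed messages):

    import java.util.concurrent.atomic.AtomicInteger;

    // Rotate across partitions with a counter instead of picking randomly
    // or pinning one partition -- what the new Java producer already does.
    public class RoundRobinPartitionChooser {
        private final AtomicInteger counter = new AtomicInteger(0);

        // numPartitions would come from the client's topic metadata.
        public int nextPartition(int numPartitions) {
            // Mask the sign bit so the result stays non-negative on overflow.
            return (counter.getAndIncrement() & 0x7fffffff) % numPartitions;
        }
    }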
Re: [DISCUSSION] Partition Selection and Coordination By Brokers for Producers
Hi Kafka Dev Team, I would appreciate your feedback on moving producer partition selection from the producer to the broker. Also, please do let me know the correct process for collecting feedback from the Kafka Dev team and/or community. Thanks, Bhavesh On Tue, May 26, 2015 at 11:54 AM, Bhavesh Mistry mistry.p.bhav...@gmail.com wrote: Hi Kafka Dev Team, I am sorry, I am new to the process of discussion and/or KIPs, so I had commented on another email voting chain. Please do let me know the correct process for collecting feedback and starting a discussion with the Kafka Dev group. Here is the original message: I have had experience with both the producer and consumer side. I have a different use case on this partition selection strategy. Problem: We have a heterogeneous environment of producers (by that I mean we have Node.js, Python, new Java, and old Scala-based producers to the same topic). I have seen that not all producers employ round-robin strategies for non-keyed messages like the new producer does. Hence, this creates non-uniform data ingestion into partitions and delays overall message processing. How do we address a uniform distribution/message injection rate to all partitions? Proposed solution: Let the broker cluster decide the next partition for a topic to send data to, rather than the producer itself, with more intelligence. 1) When sending data to brokers (ProduceResponse), the Kafka wire protocol sends a hint to the client about which partition to send to, based on the following logic (which could be customizable): a. the overall data injection rate for the topic and the current producer injection rate; b. the ability to rank partitions based on consumer rate (an advanced use case, as there may be many consumers, so a weighted average etc.). Ultimately, brokers would coordinate among thousands of producers and divert the data injection rate (an out-of-the-box feature) and consumption rate (a pluggable interface implementation on the brokers' side). The goal here is to attain uniformity and/or a lower delivery rate to consumers. This is similar to consumer coordination moving to the brokers; the producer-side partition selection would also move to the brokers. This will benefit both Java and non-Java clients. Please let me know your feedback on this subject matter. I am sure lots of you run Kafka in enterprise environments where you may have different types of producers for the same topic (e.g. logging clients in JavaScript, PHP, Java, Python etc. sending to a log topic). I would really appreciate your feedback on this. Thanks, Bhavesh
[DISCUSSION] Partition Selection and Coordination By Brokers for Producers
Hi Kafka Dev Team, I am sorry, I am new to the process of discussion and/or KIPs, so I had commented on another email voting chain. Please do let me know the correct process for collecting feedback and starting a discussion with the Kafka Dev group. Here is the original message: I have had experience with both the producer and consumer side. I have a different use case on this partition selection strategy. Problem: We have a heterogeneous environment of producers (by that I mean we have Node.js, Python, new Java, and old Scala-based producers to the same topic). I have seen that not all producers employ round-robin strategies for non-keyed messages like the new producer does. Hence, this creates non-uniform data ingestion into partitions and delays overall message processing. How do we address a uniform distribution/message injection rate to all partitions? Proposed solution: Let the broker cluster decide the next partition for a topic to send data to, rather than the producer itself, with more intelligence. 1) When sending data to brokers (ProduceResponse), the Kafka wire protocol sends a hint to the client about which partition to send to, based on the following logic (which could be customizable): a. the overall data injection rate for the topic and the current producer injection rate; b. the ability to rank partitions based on consumer rate (an advanced use case, as there may be many consumers, so a weighted average etc.). Ultimately, brokers would coordinate among thousands of producers and divert the data injection rate (an out-of-the-box feature) and consumption rate (a pluggable interface implementation on the brokers' side). The goal here is to attain uniformity and/or a lower delivery rate to consumers. This is similar to consumer coordination moving to the brokers; the producer-side partition selection would also move to the brokers. This will benefit both Java and non-Java clients. Please let me know your feedback on this subject matter. I am sure lots of you run Kafka in enterprise environments where you may have different types of producers for the same topic (e.g. logging clients in JavaScript, PHP, Java, Python etc. sending to a log topic). I would really appreciate your feedback on this. Thanks, Bhavesh
Re: [KIP-DISCUSSION] KIP-22 Expose a Partitioner interface in the new producer
Hi All, This might be too late to cover partitioning strategy and use cases. I have had experience with both the producer and consumer side, and I have a different use case on this partition selection strategy. Problem: We have a heterogeneous environment of producers (by that I mean we have Node.js, Python, new Java, and old Scala-based producers to the same topic). I have seen that not all producers employ round-robin strategies for non-keyed messages like the new producer does. Hence, this creates non-uniform data ingestion into partitions and delays overall message processing. How do we address a uniform distribution/message injection rate to all partitions? Proposed solution: Let the broker cluster decide the next partition for a topic to send data to, rather than the producer itself, with more intelligence. 1) When sending data to brokers (ProduceResponse), the Kafka wire protocol sends a hint to the client about which partition to send to, based on the following logic (which could be customizable): a. the overall data injection rate for the topic and the current producer injection rate; b. the ability to rank partitions based on consumer rate (an advanced use case, as there may be many consumers, so a weighted average etc.). Ultimately, brokers would coordinate among thousands of producers and divert the data injection rate (an out-of-the-box feature) and consumption rate (a pluggable interface implementation on the brokers' side). The goal here is to attain uniformity and/or a lower delivery rate to consumers. This is similar to consumer coordination moving to the brokers; the producer-side partition selection would also move to the brokers. This will benefit both Java and non-Java clients. Please let me know your feedback on this subject. Thanks, Bhavesh On Mon, May 18, 2015 at 7:25 AM, Sriharsha Chintalapani harsh...@fastmail.fm wrote: Gianmarco, I’ll send the patch soon. Thanks, Harsha On May 18, 2015 at 1:34:50 AM, Gianmarco De Francisci Morales ( g...@apache.org) wrote: Hi, If everything is in order, can we proceed to implement it? Cheers, -- Gianmarco On 13 May 2015 at 03:06, Jiangjie Qin j...@linkedin.com.invalid wrote: Hi Harsha, If you open this link https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Improvement+Proposals all the KIPs are child pages of this page, which you can see from the left bar. Only KIP-22 is missing. It looks like you created it as a child page of https://cwiki.apache.org/confluence/display/KAFKA/Index Thanks. Jiangjie (Becket) Qin On 5/12/15, 3:12 PM, Sriharsha Chintalapani ka...@harsha.io wrote: Hi Jiangjie, It's under https://cwiki.apache.org/confluence/display/KAFKA/KIP-+22++Expose+a+Partitioner+interface+in+the+new+producer I checked other KIPs; they are under /KAFKA as well. Thanks, Harsha On May 12, 2015 at 2:12:30 PM, Jiangjie Qin (j...@linkedin.com.invalid) wrote: Hey Harsha, It looks like you created the KIP page in the wrong place. . . Can you move the page to a child page of https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Improvement+Proposals Thanks. Jiangjie (Becket) Qin On 5/6/15, 6:12 PM, Harsha ka...@harsha.io wrote: Thanks for the review, Joel. I agree we don't need an init method; we can use configure. I'll update the KIP. -Harsha On Wed, May 6, 2015, at 04:45 PM, Joel Koshy wrote: +1 with a minor comment: do we need an init method given it extends Configurable? Also, can you move this wiki out of drafts and add it to the table in https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Improvement+Proposals? Thanks, Joel On Wed, May 06, 2015 at 07:46:46AM -0700, Sriharsha Chintalapani wrote: Thanks Jay. I removed partitioner.metadata from the KIP.
I'll send an updated patch. -- Harsha Sent with Airmail On May 5, 2015 at 6:31:47 AM, Sriharsha Chintalapani (harsh...@fastmail.fm) wrote: Thanks for the comments, everyone. Hi Jay, I do have a question regarding the Configurable interface and how to pass in Map<String, ?> properties. I couldn't find any other classes using it. JMX reporter overrides it but doesn't implement it. So with a configurable partitioner, how can a user pass in partitioner configuration, since it's getting instantiated within the producer? Thanks, Harsha On May 4, 2015 at 10:36:45 AM, Jay Kreps (jay.kr...@gmail.com) wrote: Hey Harsha, That proposal sounds good. One minor thing--I don't think we need to have the partitioner.metadata property. Our reason for using string properties is exactly to make config extensible at runtime. So a given partitioner can add whatever properties make sense using the configure() api it defines. -Jay On Sun, May 3, 2015 at 5:57 PM, Harsha ka...@harsha.io wrote: Thanks Jay and Gianmarco for the comments. I picked option A; if user
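To illustrate Jay's point about string properties, here is a hedged sketch of a partitioner picking up its own setting through configure(); the class name and the partitioner.buckets key are made up, but as far as I know the producer passes its original config map to the partitioner's configure(), so custom string properties come through:

    import java.util.Map;
    import org.apache.kafka.clients.producer.Partitioner;
    import org.apache.kafka.common.Cluster;

    public class BucketPartitioner implements Partitioner {
        private int buckets = 1;

        @Override
        public void configure(Map<String, ?> configs) {
            Object value = configs.get("partitioner.buckets"); // hypothetical key
            if (value != null) {
                buckets = Integer.parseInt(value.toString());
            }
        }

        @Override
        public int partition(String topic, Object key, byte[] keyBytes,
                             Object value, byte[] valueBytes, Cluster cluster) {
            int numPartitions = cluster.partitionsForTopic(topic).size();
            // Illustrative: spread keys over a configured number of buckets.
            int bucket = (key == null ? 0 : (key.hashCode() & 0x7fffffff) % buckets);
            return bucket % numPartitions;
        }

        @Override
        public void close() {}
    }

A user would then set partitioner.class to this class and add partitioner.buckets=8 (or similar) to the producer Properties.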
Re: Need Access to Wiki Page To Create Page for Discussion
Hi Guozhang, Here is the link to the account: https://cwiki.apache.org/confluence/users/viewuserprofile.action?username=bmis13 I just created the wiki account. I was under the impression that the JIRA account is the wiki account. Thanks in advance for the help! Thanks, Bhavesh On Wed, May 6, 2015 at 8:33 PM, Guozhang Wang wangg...@gmail.com wrote: Bhavesh, I could not find Bmis13 when adding you to the wiki permissions. Could you double-check the account id? Guozhang On Wed, May 6, 2015 at 6:47 PM, Bhavesh Mistry mistry.p.bhav...@gmail.com wrote: Hi Jun, The account id is Bmis13. Thanks, Bhavesh On Wed, May 6, 2015 at 4:52 PM, Jun Rao j...@confluent.io wrote: What's your wiki user id? Thanks, Jun On Wed, May 6, 2015 at 11:09 AM, Bhavesh Mistry mistry.p.bhav...@gmail.com wrote: Hi All, I need access to create a Discussion or KIP document. Let me know what the process is for getting access. Thanks, Bhavesh -- -- Guozhang
Need Access to Wiki Page To Create Page for Discussion
Hi All, I need access to create a Discussion or KIP document. Let me know what the process is for getting access. Thanks, Bhavesh
Re: Need Access to Wiki Page To Create Page for Discussion
Hi Jun, The account id is Bmis13. Thanks, Bhavesh On Wed, May 6, 2015 at 4:52 PM, Jun Rao j...@confluent.io wrote: What's your wiki user id? Thanks, Jun On Wed, May 6, 2015 at 11:09 AM, Bhavesh Mistry mistry.p.bhav...@gmail.com wrote: Hi All, I need access to create a Discussion or KIP document. Let me know what the process is for getting access. Thanks, Bhavesh
Re: [DISCUSS] New consumer offset commit API
Hi Ewen, The only time I can think of where the application needs to know whether the offset was committed is during graceful shutdown and/or Runtime.addShutdownHook(), so the consumer application does not get duplicated records upon restart and does not have to deal with eliminating already-processed offsets. The only thing the consumer application would then have to handle is what to do after XX retries fail to commit an offset. Or would you prefer the application to manage this last offset commit itself when the offset cannot be committed due to failure, connection timeout, or any other failure case? Thanks, Bhavesh On Wed, Apr 22, 2015 at 11:20 AM, Jay Kreps jay.kr...@gmail.com wrote: I second Guozhang's proposal. I do think we need the callback. The current state is that for async commits you actually don't know if it succeeded. However there is a totally valid case where you do need to know if it succeeded but don't need to block, and without the callback you are stuck. I think the futures will likely cause problems since blocking on the future precludes polling, which would allow it to complete. -Jay On Wed, Apr 22, 2015 at 11:12 AM, Guozhang Wang wangg...@gmail.com wrote: Hi Ewen, I share the same concern you have about 2), that with the new API the sync commit implementation is a bit awkward since we have a single-threaded design in the new consumer. The reason we need to mute other nodes when doing coordinator sync operations like join-group / offset commits / offset fetches is to avoid long blocking due to possible starvation on the network selector, so I think they still need to be done. On the other hand, I think users of the commit API will usually fall into three categories: 1) I really care that the offsets are committed before moving on to fetch more data, so I will wait FOREVER for it to complete. 2) I do not really care about whether it succeeds or not, so just fire the commit and let's move on; if it fails, it fails (and it will be logged). 3) I care whether it succeeds or not, but I do not want to wait indefinitely; so let me know if it does not finish within some timeout or has failed (i.e. give me the exceptions / error codes) and I will handle it. The current APIs do not handle case 3) above, which sits between BLOCK FOREVER and DO NOT CARE AT ALL; but most of the time people would not be very explicit about the exact timeout, and just knowing it is definite and reasonably short is good enough. I think for this we probably do not need extra timeout / retry settings but can rely on the general request retry settings; similarly, we probably do not need cancel. So I wonder if we can make a slightly different modification to the API, like this: void commit(Map<TopicPartition, Long> offsets, CommitType type, ConsumerCommitCallback callback); For case 1), people call commit(offsets), which will block forever until it succeeds; for case 2), people call commit(offsets, async), which will return immediately, with no callback upon finishing; for case 3), people call commit(offsets, async, callback), and the callback will be executed when it finishes or the request retries are exhausted. This API requires much smaller changes to the current implementation as well. Of course, if we have a common scenario where users would really care about the exact timeout for async commits, then Future may be a good approach. Guozhang On Thu, Apr 16, 2015 at 1:00 PM, Jiangjie Qin j...@linkedin.com.invalid wrote: Hey Ewen, This makes sense. People usually do not want to stop consuming when committing offsets.
One corner case about async commit with retries that I am thinking of is that two offset commits could interleave with each other, and that might create a problem. Like you said, maybe we can cancel the previous one. Another question is whether the future mechanism will only be applied to auto commit or will also be used in manual commit, because in the new consumer we allow the user to provide an offset map for offset commit. Simply canceling a previous pending offset commit does not seem ideal in this case because the two commits could be for different partitions. Thanks. Jiangjie (Becket) Qin On 4/14/15, 4:31 PM, Ewen Cheslack-Postava e...@confluent.io wrote: I'd like to get some feedback on changing the offset commit API in the new consumer. Since this is user-facing API, I wanted to make sure this gets better visibility than the JIRA ( https://issues.apache.org/jira/browse/KAFKA-2123) might. The motivation is to make it possible to do async commits but be able to tell when the commit completes/fails. I'm suggesting changing the API from void commit(Map offsets, CommitType) to Future<Void> commit(Map<TopicPartition, Long> offsets, ConsumerCommitCallback callback); which matches the approach used for the producer. The
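For completeness, here is roughly what Guozhang's case 3 looks like from the caller's side against the API that eventually shipped (commitAsync plus OffsetCommitCallback, rather than the commit(offsets, type, callback) form proposed above):

    import java.util.Collections;
    import java.util.Map;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.clients.consumer.OffsetAndMetadata;
    import org.apache.kafka.clients.consumer.OffsetCommitCallback;
    import org.apache.kafka.common.TopicPartition;

    public class CommitCallbackExample {
        // Fire the commit, keep polling, and learn of success or terminal
        // failure through the callback.
        static void commitNonBlocking(KafkaConsumer<byte[], byte[]> consumer,
                                      TopicPartition tp, long nextOffset) {
            Map<TopicPartition, OffsetAndMetadata> offsets =
                    Collections.singletonMap(tp, new OffsetAndMetadata(nextOffset));
            consumer.commitAsync(offsets, new OffsetCommitCallback() {
                @Override
                public void onComplete(Map<TopicPartition, OffsetAndMetadata> committed,
                                       Exception exception) {
                    if (exception != null) {
                        // retries exhausted or fatal error: handle it (case 3)
                    }
                }
            });
        }
    }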
Producer Behavior When one or more Brokers' Disk is Full.
Hello Kafka Community, What is the expected behavior on the producer side when the disks of one or more brokers are full, but the retention period for topics (by size or by time limit) has not been reached? Does the producer keep sending data to those particular brokers, and/or does the producer queue fill up and always throw "queue full", or does it depend on configuration? (I have a producer with the non-blocking setting when the queue is full, acks of 0 or 1, and retries set to 3.) What is the expected behavior of the old (Scala-based) vs. the pure-Java producer? Here is a reference to a past discussion: http://grokbase.com/t/kafka/users/147h4958k8/how-to-recover-from-a-disk-full-situation-in-kafka-cluster Is there a wiki or cookbook of steps to recover from such a situation? Thanks, Bhavesh
Re: [KIP-DISCUSSION] KIP-13 Quotas
Hi Aditya, I just wanted to give you a use case of rate limiting that we have implemented with the producer as a work-around: Use case 1: 1) topic-based rate limiting per producer instance (not yet across multiple producer instances; we have a producer from which we send heartbeat as well as regular messages, and we do not want to rate-limit the heartbeat, which is very important data about the health of the application and is a periodic message that does not depend on site traffic); 2) the major goal was to prevent network saturation by the Kafka producer (so limit it at the producer, before messages are sent across the network to the brokers; having the broker reject them provides no protection to the network itself); 3) reset the quota limit per minute (not per second), changeable while the producer instance is running via configuration management, without impacting the producer life-cycle. The design/implementation issues are: 1) quota enforcement is per producer instance (if an application team really, really wants more quota, they just create multiple producer instances as a work-around, which defeats the purpose of having a quota, in my opinion); 2) before inserting the message into the Kafka memory queue, we count the number of bytes (uncompressed bytes), hence we do not have very accurate accounting of the quota. For the producer quota limit, I think we need to consider the use case where people just want to limit data at the producer and be able to control it regardless of the number of producer instances for a topic within the same JVM and the same class loader (like a Tomcat container). Use case 2: based on the message key. For example, if you are building a LinkedIn-style distributed tracing solution, you need sampling: if hash(key) % 100 falls within the 10% sample, send; else reject. (Although you could consider this an application-specific or pluggable quota, or sampling, or a selection of messages that the app can do prior to producer.send(), it is nonetheless another use case on the producer side.) Let me know your thoughts and suggestions. Thanks, Bhavesh On Wed, Mar 11, 2015 at 10:22 PM, Jay Kreps jay.kr...@gmail.com wrote: Hey Todd, Yeah it is kind of weird to do the quota check after taking a request, but since the penalty is applied during that request and it just delays you to the right rate, I think it isn't exactly wrong. I admit it is weird, though. What you say about closing the connection makes sense. The issue is that our current model for connections is totally transient. The clients are supposed to handle any kind of transient connection loss and just re-establish. So basically all existing clients would likely just retry all the same whether you closed the connection or not, so at the moment there would be no way to know a retried request is actually a retry. Your point about the REST proxy is a good one, I don't think we had considered that. Currently the Java producer just has a single client.id for all requests, so the REST proxy would be a single client. But actually what you want is the original sender to be the client. This is technically very hard to do because the client will actually be batching records from all senders together into one request, so the only way to get the client id right would be to make a new producer for each REST proxy client, and this would mean a lot of memory and connections. This needs thought; not sure what solution there is. I am not 100% convinced we need to obey the request timeout. The configuration issue actually isn't a problem because the request timeout is sent with the request, so the broker actually knows it now even without a handshake.
However the question is, if someone sets a pathologically low request timeout, do we need to obey it? And if so, won't that mean we can't quota them? I claim the answer is no! I think we should redefine request timeout to mean replication timeout, which is actually what it is today. Even today, if you interact with a slow server it may take longer than that timeout (say because the fs write queues up for a long-ass time). I think we need a separate client timeout which should be fairly long and unlikely to be hit (default to 30 secs or something). -Jay On Tue, Mar 10, 2015 at 10:12 AM, Todd Palino tpal...@gmail.com wrote: Thanks, Jay. On the interface, I agree with Aditya (and you, I believe) that we don't need to expose the public API contract at this time, but structuring the internal logic to allow for it later with low cost is a good idea. Glad you explained the thoughts on where to hold requests. While my gut reaction is to not like processing a produce request that is over quota, it makes sense to do it that way if you are going to have your quota action be a delay. On the delay, I see your point on the bootstrap cases. However, one of the places I differ, and part of the reason that I prefer the error, is that I would never allow a producer who is over quota to resend a produce request. A producer should identify itself at the start of its connection, and at that
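A minimal sketch of the producer-side work-around Bhavesh describes earlier in this thread (a per-minute uncompressed-byte budget checked before handing a record to the producer); all names and the accounting policy are illustrative, not a Kafka API, and the window reset is deliberately loose for brevity:

    import java.util.concurrent.atomic.AtomicLong;

    // One instance per rate-limited topic; the heartbeat topic simply
    // bypasses the check.
    public class PerTopicMinuteBudget {
        private final long bytesPerMinute;
        private final AtomicLong usedThisMinute = new AtomicLong(0);
        private volatile long windowStartMs = System.currentTimeMillis();

        public PerTopicMinuteBudget(long bytesPerMinute) {
            this.bytesPerMinute = bytesPerMinute;
        }

        // Returns true if the (uncompressed) payload fits this minute's budget.
        public boolean tryAcquire(int uncompressedBytes) {
            long now = System.currentTimeMillis();
            if (now - windowStartMs >= 60_000) { // reset the quota each minute
                windowStartMs = now;
                usedThisMinute.set(0);
            }
            return usedThisMinute.addAndGet(uncompressedBytes) <= bytesPerMinute;
        }
    }

The caller would invoke tryAcquire(serializedSize) before producer.send() and drop or queue the record when it returns false.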
Re: [KIP-DISCUSSION] Mirror Maker Enhancement
as I still feel we might need the data channel to provide more flexibility, especially after the message handler is introduced. I’ve put my thinking on the pros and cons of the two designs in the KIP as well. It’ll be great if you can give it a review and comment. Thanks. Jiangjie (Becket) Qin On 2/6/15, 7:30 PM, Neha Narkhede n...@confluent.io wrote: Hey Becket, What are the next steps on this KIP? As per your comment earlier on the thread - I do agree it makes more sense to avoid duplicate effort and plan based on the new consumer. I’ll modify the KIP. Did you get a chance to think about the simplified design that we proposed earlier? Do you plan to update the KIP with that proposal? Thanks, Neha On Wed, Feb 4, 2015 at 12:12 PM, Jiangjie Qin j...@linkedin.com.invalid wrote: In mirror maker we do not do deserialization of the messages. Mirror maker uses the source TopicPartition hash to choose a producer to send messages from the same source partition. The partition those messages end up in is decided by the Partitioner class in KafkaProducer (assuming you are using the new producer), which uses the hash code of the byte[]. If deserialization is needed, it has to be done in the message handler. Thanks. Jiangjie (Becket) Qin On 2/4/15, 11:33 AM, Bhavesh Mistry mistry.p.bhav...@gmail.com wrote: Hi Jiangjie, Thanks for entertaining my questions so far. The last question I have is about serialization of the message key. If the key deserialization (class) is not present at the MM instance, does it then use the raw bytes' hash code to determine the partition? How are you going to address the situation where the key needs to be deserialized and the actual hash code needs to be computed? Thanks, Bhavesh On Fri, Jan 30, 2015 at 1:41 PM, Jiangjie Qin j...@linkedin.com.invalid wrote: Hi Bhavesh, Please see inline comments. Jiangjie (Becket) Qin On 1/29/15, 7:00 PM, Bhavesh Mistry mistry.p.bhav...@gmail.com wrote: Hi Jiangjie, Thanks for the input. a) Will the MM producer ack be attached to the producer instance, or be per topic? The use case is that one instance of MM needs to handle both strong acks and also acks=0 for some topics; or would it be better to set up another instance of MM? The acks setting is a producer-level setting instead of a topic-level setting. In this case you probably need to set up another instance. b) Regarding TCP connections, why is the number of TCP connections tied to the number of producer instances? Is it possible to use a broker-connection TCP pool, where a producer just checks out a TCP connection to a broker, so the number of producer instances does not correlate to broker connections? Is this possible? In the new producer, each producer instance maintains a connection to each broker. Making producer instances share TCP connections is a very big change to the current design
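To make the byte-hash point concrete, a small illustration of the difference between partitioning on the raw key bytes and partitioning on a deserialized key. Both hash functions below are illustrative stand-ins (the actual default partitioner's hash differs), and this is not mirror maker's handler interface:

    import java.nio.charset.StandardCharsets;
    import java.util.Arrays;

    public class KeyHashExample {
        // What you get without the key's deserializer: two logically equal
        // keys co-partition only if their byte encodings are identical.
        static int bytesHashPartition(byte[] keyBytes, int numPartitions) {
            return (Arrays.hashCode(keyBytes) & 0x7fffffff) % numPartitions;
        }

        // What a message handler must do for "real" key semantics: it needs
        // to know the key's format (assumed UTF-8 string here).
        static int deserializedHashPartition(byte[] keyBytes, int numPartitions) {
            String key = new String(keyBytes, StandardCharsets.UTF_8);
            return (key.hashCode() & 0x7fffffff) % numPartitions;
        }
    }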
Re: [DISCUSS] KIP-8 Add a flush method to the new Java producer
Hi All, Thanks Jay and all for addressing the concern. I am fine with just having the flush() method as long as it covers failure modes and resiliency. E.g., we had a situation where all brokers in the Kafka cluster were reachable, but upon adding a new Kafka node the admin migrated leadership to the new broker, and that new broker was NOT reachable from the producer's standpoint due to a firewall, while metadata would continue to elect the new broker as leader for that partition. All I am asking is that you either give up sending to this broker or do something in this scenario. As for the current code in the 0.8.2 release, the caller thread of the flush() or close() method would be blocked forever, hence: https://issues.apache.org/jira/browse/KAFKA-1659 https://issues.apache.org/jira/browse/KAFKA-1660 Also, I recall that there is a timeout added to batches to indicate how long a message can be retained in memory before expiring. Given all this, should this API be consistent with other upcoming patches addressing similar problem(s)? Otherwise, what we have done is spawn a thread just for calling close() or flush(), with a timed join() on the caller's end. Anyway, I just wanted to give you the issues with the existing API, and if you guys think this is fine then I am OK with this approach. It is just that the caller will have to do a bit more work. Thanks, Bhavesh On Thursday, February 12, 2015, Joel Koshy jjkosh...@gmail.com wrote: Yes that is a counter-example. I'm okay either way on whether we should have just flush() or have a timeout. Bhavesh, does Jay's explanation a few replies prior address your concern? If so, shall we consider this closed? On Tue, Feb 10, 2015 at 01:36:23PM -0800, Jay Kreps wrote: Yeah we could do that, I guess I just feel like it adds confusion because then you have to think about which timeout you want, when likely you don't want a timeout at all. I guess the pattern I was thinking of was fflush or the Java equivalent, which don't have timeouts: http://docs.oracle.com/javase/7/docs/api/java/io/OutputStream.html#flush() -Jay On Tue, Feb 10, 2015 at 10:41 AM, Joel Koshy jjkosh...@gmail.com wrote: I think tryFlush with a timeout sounds good to me. This is really more for consistency than anything else. I cannot think of any standard blocking calls off the top of my head that don't have a timed variant. E.g., Thread.join, Object.wait, Future.get. Either that, or they provide an entirely non-blocking mode (e.g., socketChannel.connect followed by finishConnect). Thanks, Joel On Tue, Feb 10, 2015 at 11:30:47AM -0500, Joe Stein wrote: Jay, The .flush() call seems like it would be the best way if you wanted to do a clean shutdown of the new producer? So, you could in your code stop all incoming requests, producer.flush(), system.exit(value) and know pretty much you won't drop anything on the floor. This can be done with the callbacks and futures (sure), but .flush() seems to be the right time to block, and a few lines of code, no? ~ Joestein On Tue, Feb 10, 2015 at 11:25 AM, Jay Kreps jay.kr...@gmail.com wrote: Hey Bhavesh, If a broker is not available, a new one should be elected to take over, so although the flush might take longer it should still be quick. Even if not, this should result in an error, not a hang. The cases you enumerated are all covered already--if the user wants to retry, that is covered by the retry setting in the client, and for all the errors that is considered completion of the request. The post-condition of flush isn't that all sends complete successfully, just that they complete.
So if you try to send a message that is too big, when flush returns, calling .get() on the future should not block and should produce the error. Basically the argument I am making is that the only reason you want to call flush() is to guarantee all the sends complete, so if it doesn't guarantee that it will be somewhat confusing. This does mean blocking, but if you don't want to block on the send then you wouldn't call flush(). This has no impact on the block.on.buffer.full setting. That impacts what happens when send() can't append to the buffer because it is full. flush() means any message previously sent (i.e., for which the send() call has returned) needs to have its request completed. Hope that makes sense. -Jay On Mon, Feb 9, 2015 at 11:52 PM, Bhavesh Mistry mistry.p.bhav...@gmail.com wrote: Hi Jay, Imagine you have a flaky network connection to the brokers, and flush() will be blocked if one of the brokers is not available (basically, how would we address the failure mode where the I/O thread is not able to drain records or is busy due to pending requests?). Do you mean the flush() method is only
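For completeness, the caller-side work-around Bhavesh mentions (running flush() on a helper thread and bounding the wait with join()) looks roughly like this; it is a sketch, not a library API:

    import org.apache.kafka.clients.producer.KafkaProducer;

    public class BoundedFlush {
        // Returns true if the flush finished within timeoutMs.
        static boolean flushWithTimeout(KafkaProducer<?, ?> producer, long timeoutMs)
                throws InterruptedException {
            Thread flusher = new Thread(producer::flush, "bounded-flush");
            flusher.setDaemon(true); // don't keep the JVM alive if flush hangs
            flusher.start();
            flusher.join(timeoutMs);
            return !flusher.isAlive();
        }
    }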
Re: Kafka New(Java) Producer Connection reset by peer error and LB
HI Ewen, The root of the problem is the leak of a TCP connection that sits idle for a while. It is just a log message, as you mentioned, but suppose you have 50 or more producer instances created by an application and every one of them prints the above log; that becomes a little concerning. We configured the producer with the bootstrap list as LB:port1, which is set up to TCP port-forward to broker:port2. When the producer fetches the cluster metadata and discovers that the TCP connection to LB:port1 is not part of the broker cluster or topology, it should close the connection to LB:port1 (in my opinion, this would be the expected behavior). As you mentioned, the producer behavior is otherwise normal and this error is harmless. If you consider this a bug, please let me know and I will file a jira ticket for it. We are on a non-released 0.8.2 build from trunk. Thanks, Bhavesh

On Tue, Feb 10, 2015 at 12:29 AM, Ewen Cheslack-Postava e...@confluent.io wrote: Bhavesh, I'm unclear what the impact is here. The line numbers don't match up exactly with trunk or 0.8.2.0, but it looks like this exception is just caught and logged. As far as I can tell the producer would continue to function normally. Does this have any impact on the producer or is the concern just that the exception is being logged?

On Mon, Feb 9, 2015 at 11:21 PM, Bhavesh Mistry mistry.p.bhav...@gmail.com wrote: HI Kafka Team, Please confirm if you would like to open a Jira issue to track this? Thanks, Bhavesh

On Mon, Feb 9, 2015 at 12:39 PM, Bhavesh Mistry mistry.p.bhav...@gmail.com wrote: Hi Kafka Team, We are getting this "connection reset by peer" a couple of minutes after start-up of the producer, due to infrastructure deployment strategies we have adopted from LinkedIn. We have an LB hostname and port as the seed server, and all producers are getting the following exception because of the TCP idle-connection timeout set on the LB (which is 2 minutes, while the Kafka TCP idle-connection timeout is set to 10 minutes). This seems to be a minor bug: the TCP connection should be closed immediately after discovering that the seed server is not part of the broker list.

java.io.IOException: Connection reset by peer
    at sun.nio.ch.FileDispatcher.read0(Native Method)
    at sun.nio.ch.SocketDispatcher.read(SocketDispatcher.java:21)
    at sun.nio.ch.IOUtil.readIntoNativeBuffer(IOUtil.java:198)
    at sun.nio.ch.IOUtil.read(IOUtil.java:171)
    at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:245)
    at org.apache.kafka.common.network.NetworkReceive.readFrom(NetworkReceive.java:60)
    at org.apache.kafka.common.network.Selector.poll(Selector.java:242)
    at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:178)
    at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:175)
    at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:115)
    at java.lang.Thread.run(Thread.java:662)

(the identical stack trace was logged a second time)

Thanks, Bhavesh -- Thanks, Ewen
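For reference, the deployment described in this thread amounts to bootstrapping the producer through a load balancer. A minimal sketch of that configuration against the 0.8.2 java client, with placeholder hostnames (the LB forwards its port to a real broker's port):

{code}
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;

public class LbBootstrapExample {
    public static void main(String[] args) {
        Properties props = new Properties();
        // The LB address is only used for the initial metadata fetch; the
        // metadata response lists the real broker hostnames, and the now-idle
        // LB connection is what the LB's 2-minute timeout later resets.
        props.put("bootstrap.servers", "kafka-lb.example.com:9092"); // placeholder
        props.put("key.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
        KafkaProducer<byte[], byte[]> producer = new KafkaProducer<byte[], byte[]>(props);
        producer.close();
    }
}
{code}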
Re: [DISCUSS] KIP-8 Add a flush method to the new Java producer
HI Jay, Imagine you have a flaky network connection to the brokers: will flush() be blocked if one of the brokers is not available? How would you address the failure mode where the io thread is not able to drain records, or is busy due to pending requests? Does the flush() method only flush to the in-memory queue, or flush to the broker over the network? A timeout helps with this and pushes the caller to decide what to do, e.g. re-enqueue records, drop the entire batch, or handle the case where one of the messages is too big and crosses the max.message.size limit, etc. Also, according to the java doc for the API, "The method will block until all previously sent records have completed sending (either successfully or with an error)"; does this bypass the rules set by block.on.buffer.full or batch.size when under load? That was my intention, and I am sorry I mixed up the close() method here without knowing that this is only for bulk send. Thanks, Bhavesh

On Mon, Feb 9, 2015 at 8:17 PM, Jay Kreps jay.kr...@gmail.com wrote: Yeah I second the problem Guozhang flags with giving flush a timeout. In general failover in Kafka is a bounded thing unless you have brought your Kafka cluster down entirely, so I think depending on that bound implicitly is okay. It is possible to make flush() be instead boolean tryFlush(long timeout, TimeUnit unit); But I am somewhat skeptical that people will use this correctly. I.e. consider the mirror maker code snippet I gave above: how would one actually recover in this case other than retrying (which the client already does automatically)? After all, if you are okay losing data then you don't need to bother calling flush at all, you can just let the messages be sent asynchronously. I think close() is actually different because you may well want to shut down immediately and just throw away unsent events. -Jay

On Mon, Feb 9, 2015 at 2:44 PM, Guozhang Wang wangg...@gmail.com wrote: The proposal looks good to me, will need some time to review the implementation RB later. Bhavesh, I am wondering how you would use a flush() with a timeout, since such a call does not actually provide any flushing guarantees? As for close(), there is a separate JIRA for this: KAFKA-1660 https://issues.apache.org/jira/browse/KAFKA-1660 Guozhang

On Mon, Feb 9, 2015 at 2:29 PM, Bhavesh Mistry mistry.p.bhav...@gmail.com wrote: Hi Jay, How about adding a timeout to each of these method calls: flush(timeout, TimeUnit) and close(timeout, TimeUnit)? We had a runaway io thread issue, and the caller thread should not be blocked forever by these methods. Thanks, Bhavesh

On Sun, Feb 8, 2015 at 12:18 PM, Jay Kreps jay.kr...@gmail.com wrote: Well actually in the case of linger.ms = 0 the send is still asynchronous, so calling flush() blocks until all the previously sent records have completed. It doesn't speed anything up in that case, though, since they are already available to send. -Jay

On Sun, Feb 8, 2015 at 10:36 AM, Gwen Shapira gshap...@cloudera.com wrote: Looks good to me. I like the idea of not blocking additional sends but not guaranteeing that flush() will deliver them. I assume that with linger.ms = 0, flush will just be a noop (since the queue will be empty). Is that correct? Gwen

On Sun, Feb 8, 2015 at 10:25 AM, Jay Kreps jay.kr...@gmail.com wrote: Following up on our previous thread on making batch send a little easier, here is a concrete proposal to add a flush() method to the producer: https://cwiki.apache.org/confluence/display/KAFKA/KIP-8+-+Add+a+flush+method+to+the+producer+API A proposed implementation is here: https://issues.apache.org/jira/browse/KAFKA-1865 Thoughts?
-Jay -- -- Guozhang
Re: New consumer client
Hi Jay, 1) Sorry to get back to you so late. It is a CRC check error on any consumer thread, regardless of the server. What happens is that I have to catch this exception and skip the message for now; there is no option to re-fetch it. Is there any way to add behavior in the Java consumer to re-fetch the offset that failed the CRC check? 2) Secondly, can you please add default behavior to auto-set 'fetch.message.max.bytes' to the broker's message.max.bytes? This will ensure smooth configuration for both the simple and high-level consumer, and will take the burden of configuring this property away from the Kafka user. We had a lag issue due to this misconfiguration and dropped messages on the Camus side (Camus has a different setting for the simple consumer). It would be great to auto-configure this if the user did not supply it in the configuration. Let me know if you agree with #2. Thanks, Bhavesh

On Mon, Jan 12, 2015 at 9:25 AM, Jay Kreps jay.kr...@gmail.com wrote: Hey Bhavesh, This seems like a serious issue and not one anyone else has reported. I don't know what you mean by corrupt message, are you saying the CRC check fails? If so, that check is done both by the broker (prior to appending to the log) and the consumer, so that implies either a bug in the broker or else disk corruption on the server. I do have an option to disable the CRC check in the consumer, though depending on the nature of the corruption that can just lead to more serious errors (depending on what is corrupted). -jay

On Sun, Jan 11, 2015 at 11:00 PM, Bhavesh Mistry mistry.p.bhav...@gmail.com wrote: Hi Jay, One of the pain points of the existing consumer code is occasional CORRUPT_MESSAGE errors. Right now it is hard to pinpoint the cause of a CORRUPT_MESSAGE, especially when it happens on the Mirror Maker side. Is there any proposal to auto-skip corrupted messages and to have reporting visibility of CRC errors (metrics etc., or traceability to find the corruption) per topic, etc.? I am not sure if this is the correct email thread to address this; if not, please let me know. Will provide feedback about the new consumer api and changes. Thanks, Bhavesh

On Sun, Jan 11, 2015 at 7:57 PM, Jay Kreps j...@confluent.io wrote: I uploaded an updated version of the new consumer client ( https://issues.apache.org/jira/browse/KAFKA-1760). This is now almost feature complete, and has pretty reasonable testing and metrics. I think it is ready for review and could be checked in once 0.8.2 is out. For those who haven't been following, this is meant to be a new consumer client, like the new producer in 0.8.2, and intended to replace the existing high-level and simple scala consumers. This still needs the server-side implementation of the partition assignment and group management to be fully functional. I have just stubbed this out in the server to allow the implementation and testing of the server, but actual usage will require it. However the client that exists now is actually a fully functional replacement for the simple consumer that is vastly easier to use correctly, as it internally does all the discovery and failover. It would be great if people could take a look at this code, and particularly at the public apis, which have several small changes from the original proposal. Summary What's there: 1. Simple consumer functionality 2. Offset commit and fetch 3. Ability to change position with seek 4. Ability to commit all or just some offsets 5. Controller discovery, failure detection, heartbeat, and fail-over 6. Controller partition assignment 7. Logging 8. Metrics 9.
Integration tests, including tests that simulate random broker failures 10. Integration into the consumer performance test. Limitations: 1. There could be some lingering bugs in the group management support; it is hard to fully test with just the stub support on the server, so we'll need to get the server working to do better, I think. 2. I haven't implemented wild-card subscriptions yet. 3. No integration with console consumer yet. Performance: I did some performance comparison with the old consumer over localhost on my laptop. Usually localhost isn't good for testing, but in this case it is good because it has near infinite bandwidth, so it does a good job at catching inefficiencies that would be hidden with a slower network. These numbers probably aren't representative of what you would get over a real network, but help bring out the relative efficiencies. Here are the results: - Old high-level consumer: 213 MB/sec - New consumer: 225 MB/sec - Old simple consumer: 242 MB/sec It may be hard to get this client up to the same point as the simple consumer, as it is doing very little beyond allocating and wrapping byte buffers that it reads off the network. The big thing that shows up in profiling is the buffer allocation for reading data, so one speed-up would be to pool these.
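The fetch.message.max.bytes alignment Bhavesh asks to have automated (point 2 of his message above) currently has to be done by hand in the old consumer. A minimal sketch, with placeholder hosts and a value matching an assumed broker-side message.max.bytes of 2097152:

{code}
import java.util.Properties;
import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.javaapi.consumer.ConsumerConnector;

public class FetchSizeAlignedConsumer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("zookeeper.connect", "zk.example.com:2181"); // placeholder
        props.put("group.id", "example-group");                // placeholder
        // Must be >= the broker's message.max.bytes (assumed 2097152 here);
        // otherwise messages larger than the fetch size cannot be consumed
        // and the consumer falls behind, as described in the mail above.
        props.put("fetch.message.max.bytes", "2097152");
        ConsumerConnector consumer =
            Consumer.createJavaConsumerConnector(new ConsumerConfig(props));
        consumer.shutdown();
    }
}
{code}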
Re: Kafka New(Java) Producer Connection reset by peer error and LB
HI Kafka Team, Please confirm if you would like to open a Jira issue to track this? Thanks, Bhavesh

On Mon, Feb 9, 2015 at 12:39 PM, Bhavesh Mistry mistry.p.bhav...@gmail.com wrote: Hi Kafka Team, We are getting this "connection reset by peer" a couple of minutes after start-up of the producer, due to infrastructure deployment strategies we have adopted from LinkedIn. We have an LB hostname and port as the seed server, and all producers are getting the following exception because of the TCP idle-connection timeout set on the LB (which is 2 minutes, while the Kafka TCP idle-connection timeout is set to 10 minutes). This seems to be a minor bug: the TCP connection should be closed immediately after discovering that the seed server is not part of the broker list.

java.io.IOException: Connection reset by peer
    at sun.nio.ch.FileDispatcher.read0(Native Method)
    at sun.nio.ch.SocketDispatcher.read(SocketDispatcher.java:21)
    at sun.nio.ch.IOUtil.readIntoNativeBuffer(IOUtil.java:198)
    at sun.nio.ch.IOUtil.read(IOUtil.java:171)
    at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:245)
    at org.apache.kafka.common.network.NetworkReceive.readFrom(NetworkReceive.java:60)
    at org.apache.kafka.common.network.Selector.poll(Selector.java:242)
    at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:178)
    at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:175)
    at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:115)
    at java.lang.Thread.run(Thread.java:662)

(the identical stack trace was logged a second time)

Thanks, Bhavesh
Kafka New(Java) Producer Connection reset by peer error and LB
Hi Kafka Team, We are getting this "connection reset by peer" a couple of minutes after start-up of the producer, due to infrastructure deployment strategies we have adopted from LinkedIn. We have an LB hostname and port as the seed server, and all producers are getting the following exception because of the TCP idle-connection timeout set on the LB (which is 2 minutes, while the Kafka TCP idle-connection timeout is set to 10 minutes). This seems to be a minor bug: the TCP connection should be closed immediately after discovering that the seed server is not part of the broker list.

java.io.IOException: Connection reset by peer
    at sun.nio.ch.FileDispatcher.read0(Native Method)
    at sun.nio.ch.SocketDispatcher.read(SocketDispatcher.java:21)
    at sun.nio.ch.IOUtil.readIntoNativeBuffer(IOUtil.java:198)
    at sun.nio.ch.IOUtil.read(IOUtil.java:171)
    at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:245)
    at org.apache.kafka.common.network.NetworkReceive.readFrom(NetworkReceive.java:60)
    at org.apache.kafka.common.network.Selector.poll(Selector.java:242)
    at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:178)
    at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:175)
    at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:115)
    at java.lang.Thread.run(Thread.java:662)

(the identical stack trace was logged a second time)

Thanks, Bhavesh
Re: [DISCUSS] KIP-8 Add a flush method to the new Java producer
Hi Jay, How about adding a timeout to each of these method calls: flush(timeout, TimeUnit) and close(timeout, TimeUnit)? We had a runaway io thread issue, and the caller thread should not be blocked forever by these methods. Thanks, Bhavesh

On Sun, Feb 8, 2015 at 12:18 PM, Jay Kreps jay.kr...@gmail.com wrote: Well actually in the case of linger.ms = 0 the send is still asynchronous, so calling flush() blocks until all the previously sent records have completed. It doesn't speed anything up in that case, though, since they are already available to send. -Jay

On Sun, Feb 8, 2015 at 10:36 AM, Gwen Shapira gshap...@cloudera.com wrote: Looks good to me. I like the idea of not blocking additional sends but not guaranteeing that flush() will deliver them. I assume that with linger.ms = 0, flush will just be a noop (since the queue will be empty). Is that correct? Gwen

On Sun, Feb 8, 2015 at 10:25 AM, Jay Kreps jay.kr...@gmail.com wrote: Following up on our previous thread on making batch send a little easier, here is a concrete proposal to add a flush() method to the producer: https://cwiki.apache.org/confluence/display/KAFKA/KIP-8+-+Add+a+flush+method+to+the+producer+API A proposed implementation is here: https://issues.apache.org/jira/browse/KAFKA-1865 Thoughts? -Jay
[Discussion] Producer Instance and Decouple Kafka Cluster State/TCP Connection Management
Hi Kafka Dev team, I would like to discuss the relationship between Kafka cluster state management and the producer instance in 0.8.2. The current implementation of the producer is tied very closely to the Kafka cluster and topic metadata management. So imagine the following scenario: an application creates multiple producer instances for the same Kafka cluster, for the same topic or for more topics (on the same cluster), to get better throughput, to avoid blocking calls, or to avoid synchronization issues (by keying hashes to the same partition, etc.). It would be great if the design and implementation of cluster state/TCP connection management were decoupled from the producer instance. Suppose we are doing aggregation using Kafka and we want to avoid synchronized dequeue calls for performance reasons: I would have to create as many producers as there are partitions, which can be many, and each one will manage cluster and TCP state separately. All I am asking is: is it possible to have low-level TCP connection pooling / cluster state management and topic metadata management as separate pieces, rather than encapsulating everything within the producer instance? In my case, I would create an io thread per partition and dump data with a one-to-one mapping to avoid blocking/sync calls. Having a public API for cluster/TCP connection management and the Kafka java protocol would allow advanced ways to achieve higher throughput at the expense of CPU/memory, under the application's control. I am referring to my experience with https://issues.apache.org/jira/browse/KAFKA-1710 Let me know your thought process. Thanks, Bhavesh
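The io-thread-per-partition pattern Bhavesh describes can be sketched with the 0.8.2 java client as below; the class and method names are illustrative. Note that each KafkaProducer still carries its own metadata and TCP connections, which is exactly the duplicated state the mail asks to have factored out.

{code}
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class PerPartitionProducers {
    private final KafkaProducer<byte[], byte[]>[] producers;
    private final String topic;

    @SuppressWarnings("unchecked")
    public PerPartitionProducers(Properties config, String topic, int partitions) {
        this.topic = topic;
        this.producers = new KafkaProducer[partitions];
        // One producer instance (and hence one sender io thread) per partition.
        for (int p = 0; p < partitions; p++) {
            producers[p] = new KafkaProducer<byte[], byte[]>(config);
        }
    }

    public void send(int partition, byte[] key, byte[] value) {
        // Route to the producer dedicated to this partition; the explicit
        // partition in the record bypasses the default partitioner.
        producers[partition].send(
                new ProducerRecord<byte[], byte[]>(topic, partition, key, value));
    }
}
{code}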
Re: [KIP-DISCUSSION] Mirror Maker Enhancement
Hi Jiangjie, Thanks for entertaining my questions so far. The last question I have is about serialization of the message key. If the key deserializer (class) is not present at the MM instance, does it use the raw bytes' hashcode to determine the partition? How are you going to address the situation where the key needs to be deserialized so that the actual hashcode can be computed? Thanks, Bhavesh

On Fri, Jan 30, 2015 at 1:41 PM, Jiangjie Qin j...@linkedin.com.invalid wrote: Hi Bhavesh, Please see inline comments. Jiangjie (Becket) Qin

On 1/29/15, 7:00 PM, Bhavesh Mistry mistry.p.bhav...@gmail.com wrote: Hi Jiangjie, Thanks for the input. a) Will the MM producer ack be attached to the producer instance or be per topic? The use case is that one instance of MM needs to handle both strong acks and acks=0 for some topics. Or would it be better to set up another instance of MM? The acks setting is a producer-level setting instead of a topic-level setting. In this case you probably need to set up another instance. b) Regarding TCP connections: why is each producer instance tied to its own TCP connections? Is it possible to use a broker-connection TCP pool, where a producer just checks out a TCP connection to a broker, so that the number of producer instances does not correlate with the number of broker connections? Is this possible? In the new producer, each producer maintains a connection to each broker within the producer instance. Making producer instances share the TCP connections is a very big change to the current design, so I suppose we won't be able to do that. Thanks, Bhavesh

On Thu, Jan 29, 2015 at 11:50 AM, Jiangjie Qin j...@linkedin.com.invalid wrote: Hi Bhavesh, I think it is the right discussion to have when we are talking about the new design for MM. Please see the inline comments. Jiangjie (Becket) Qin

On 1/28/15, 10:48 PM, Bhavesh Mistry mistry.p.bhav...@gmail.com wrote: Hi Jiangjie, I just wanted to let you know about our use case and stress the point that the local data center broker cluster has fewer partitions than the destination offline broker cluster, because we do batch pulls from CAMUS and need to drain data faster than the injection rate (from four DCs for the same topic). Keeping the same partition number in source and target cluster will be an option but will not be enforced by default. We are facing the following issues (probably due to configuration): 1) We occasionally lose data because the message batch size is too large (2MB) on the target side (we are using the old producer, but I think the new producer will solve this problem to some extent). We do see this issue at LinkedIn as well. The new producer might also have this issue. There are some proposals for solutions, but no real work has started yet. For now, as a workaround, setting a more aggressive batch size on the producer side should work. 2) Since only one instance is set up to MM the data, we are not able to set up acks per topic; instead the ack is attached to the producer instance. I don't quite get the question here. 3) How are you going to address the two-phase-commit problem if ack is set to strongest but auto-commit is on for the consumer (meaning the producer does not get the ack, but the consumer auto-committed the offset for that message)? Is there a transaction-based ack and offset commit (a Kafka transaction is in progress)? Auto offset commit should be turned off in this case. The offset will only be committed once by the offset commit thread. So there is no two-phase commit. 4) How are you planning to avoid duplicate messages? (Is the broker going to have a moving window of collected messages and de-dupe?)
Possibly we get this from retry set to 5…? We are not trying to completely avoid duplicates. The duplicates will still be there if: 1. The producer retries on failure. 2. The mirror maker is hard-killed. Currently, dedup is expected to be done by the user if necessary. 5) Last, is there any warning or insight you can provide from the MM component when the data injection rate into destination partitions is NOT evenly distributed, regardless of keyed or non-keyed messages? (There is a ripple effect: data arrives late or out of order in terms of timestamp, or sometimes early, and CAMUS creates a huge file count on HDFS due to the uneven injection rate. The Camus job is configured to run every 3 minutes.) I think uneven data distribution is typically caused by server-side imbalance, rather than something the mirror maker could control. In the new mirror maker, however, there is a customizable message handler that might be able to help a little bit. In the message handler, you can explicitly set the partition that you want to produce the message to. So if you know the uneven data distribution in the target cluster, you may offset it here. But that probably only works for non-keyed messages.
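On the key-serialization question in this thread: the new producer's default partitioner hashes the serialized key bytes, so a mirror maker does not need the deserializer class just to pick a partition. A simplified sketch of partitioning on raw key bytes (the real client's default partitioner uses a murmur2 hash rather than Arrays.hashCode):

{code}
import java.util.Arrays;

public class RawKeyPartitioner {
    // Choose a partition from the serialized key bytes alone; no
    // deserializer is required at the MM instance. Note this will not, in
    // general, match a hash computed on the deserialized object, which is
    // the mismatch Bhavesh asks about above.
    public static int partition(byte[] keyBytes, int numPartitions) {
        return (Arrays.hashCode(keyBytes) & 0x7fffffff) % numPartitions;
    }
}
{code}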
Re: [KIP-DISCUSSION] Mirror Maker Enhancement
Hi Jiangjie, Thanks for the input. a) Will the MM producer ack be attached to the producer instance or be per topic? The use case is that one instance of MM needs to handle both strong acks and acks=0 for some topics. Or would it be better to set up another instance of MM? b) Regarding TCP connections: why is each producer instance tied to its own TCP connections? Is it possible to use a broker-connection TCP pool, where a producer just checks out a TCP connection to a broker, so that the number of producer instances does not correlate with the number of broker connections? Is this possible? Thanks, Bhavesh

On Thu, Jan 29, 2015 at 11:50 AM, Jiangjie Qin j...@linkedin.com.invalid wrote: Hi Bhavesh, I think it is the right discussion to have when we are talking about the new design for MM. Please see the inline comments. Jiangjie (Becket) Qin

On 1/28/15, 10:48 PM, Bhavesh Mistry mistry.p.bhav...@gmail.com wrote: Hi Jiangjie, I just wanted to let you know about our use case and stress the point that the local data center broker cluster has fewer partitions than the destination offline broker cluster, because we do batch pulls from CAMUS and need to drain data faster than the injection rate (from four DCs for the same topic). Keeping the same partition number in source and target cluster will be an option but will not be enforced by default. We are facing the following issues (probably due to configuration): 1) We occasionally lose data because the message batch size is too large (2MB) on the target side (we are using the old producer, but I think the new producer will solve this problem to some extent). We do see this issue at LinkedIn as well. The new producer might also have this issue. There are some proposals for solutions, but no real work has started yet. For now, as a workaround, setting a more aggressive batch size on the producer side should work. 2) Since only one instance is set up to MM the data, we are not able to set up acks per topic; instead the ack is attached to the producer instance. I don't quite get the question here. 3) How are you going to address the two-phase-commit problem if ack is set to strongest but auto-commit is on for the consumer (meaning the producer does not get the ack, but the consumer auto-committed the offset for that message)? Is there a transaction-based ack and offset commit (a Kafka transaction is in progress)? Auto offset commit should be turned off in this case. The offset will only be committed once by the offset commit thread. So there is no two-phase commit. 4) How are you planning to avoid duplicate messages? (Is the broker going to have a moving window of collected messages and de-dupe?) Possibly we get this from retry set to 5…? We are not trying to completely avoid duplicates. The duplicates will still be there if: 1. The producer retries on failure. 2. The mirror maker is hard-killed. Currently, dedup is expected to be done by the user if necessary. 5) Last, is there any warning or insight you can provide from the MM component when the data injection rate into destination partitions is NOT evenly distributed, regardless of keyed or non-keyed messages? (There is a ripple effect: data arrives late or out of order in terms of timestamp, or sometimes early, and CAMUS creates a huge file count on HDFS due to the uneven injection rate. The Camus job is configured to run every 3 minutes.) I think uneven data distribution is typically caused by server-side imbalance, rather than something the mirror maker could control. In the new mirror maker, however, there is a customizable message handler that might be able to help a little bit.
In the message handler, you can explicitly set the partition that you want to produce the message to. So if you know the uneven data distribution in the target cluster, you may offset it here. But that probably only works for non-keyed messages. I am not sure if this is the right discussion forum to bring these to your/the Kafka dev team's attention; this might be off track. Thanks, Bhavesh

On Wed, Jan 28, 2015 at 11:07 AM, Jiangjie Qin j...@linkedin.com.invalid wrote: I've updated the KIP page. Feedback is welcome. Regarding the simple mirror maker design, I thought it over and have some worries. There are two things that might be worth thinking about: 1. One of the enhancements to mirror maker is adding a message handler to do things like reformatting. I think we might potentially want to have more threads processing the messages than the number of consumers. If we follow the simple mirror maker solution, we lose this flexibility. 2. This might not matter too much, but creating more consumers means a bigger footprint of TCP connections / memory. Any thoughts on this? Thanks. Jiangjie (Becket) Qin

On 1/26/15, 10:35 AM, Jiangjie Qin j...@linkedin.com wrote: Hi Jay and Neha, Thanks a lot for the reply and explanation. I do agree it makes more sense to avoid duplicate effort and plan based on the new consumer. I'll modify the KIP. To Jay's question on message ordering: the data channel selection makes sure that messages from the same source partition are sent by the same producer, so the order of the messages is guaranteed with proper producer settings (MaxInFlightRequests=1, retries=Integer.MaxValue, etc.). For keyed messages, because they come from the same source partition and will end up in the same target partition, as long as they are sent by the same producer the order is guaranteed. For non-keyed messages, messages coming from the same source partition might go to different target partitions; the order is only guaranteed within each partition. Anyway, I'll modify the KIP and the data channel will go away. Thanks.
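A sketch of the customizable message handler idea Becket describes, which can offset known skew by choosing target partitions explicitly. The handler class below is hypothetical (the actual handler interface was still under discussion in this KIP); the partitioning logic is the point:

{code}
import org.apache.kafka.clients.producer.ProducerRecord;

public class SkewCorrectingHandler {
    private final String topic;
    private final int targetPartitions;
    private int next = 0;

    public SkewCorrectingHandler(String topic, int targetPartitions) {
        this.topic = topic;
        this.targetPartitions = targetPartitions;
    }

    // Called for each consumed record: round-robin non-keyed messages across
    // all target partitions instead of inheriting the (possibly skewed)
    // source distribution.
    public ProducerRecord<byte[], byte[]> handle(byte[] key, byte[] value) {
        int partition = next;
        next = (next + 1) % targetPartitions;
        return new ProducerRecord<byte[], byte[]>(topic, partition, key, value);
    }
}
{code}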
Re: [KIP-DISCUSSION] Mirror Maker Enhancement
Hi Jiangjie, I just wanted to let you know about our use case and stress the point that the local data center broker cluster has fewer partitions than the destination offline broker cluster, because we do batch pulls from CAMUS and need to drain data faster than the injection rate (from four DCs for the same topic). We are facing the following issues (probably due to configuration): 1) We occasionally lose data because the message batch size is too large (2MB) on the target side (we are using the old producer, but I think the new producer will solve this problem to some extent). 2) Since only one instance is set up to MM the data, we are not able to set up acks per topic; instead the ack is attached to the producer instance. 3) How are you going to address the two-phase-commit problem if ack is set to strongest but auto-commit is on for the consumer (meaning the producer does not get the ack, but the consumer auto-committed the offset for that message)? Is there a transaction-based ack and offset commit (a Kafka transaction is in progress)? 4) How are you planning to avoid duplicate messages? (Is the broker going to have a moving window of collected messages and de-dupe?) Possibly we get this from retry set to 5…? 5) Last, is there any warning or insight you can provide from the MM component when the data injection rate into destination partitions is NOT evenly distributed, regardless of keyed or non-keyed messages? (There is a ripple effect: data arrives late or out of order in terms of timestamp, or sometimes early, and CAMUS creates a huge file count on HDFS due to the uneven injection rate. The Camus job is configured to run every 3 minutes.) I am not sure if this is the right discussion forum to bring these to your/the Kafka dev team's attention; this might be off track. Thanks, Bhavesh

On Wed, Jan 28, 2015 at 11:07 AM, Jiangjie Qin j...@linkedin.com.invalid wrote: I've updated the KIP page. Feedback is welcome. Regarding the simple mirror maker design, I thought it over and have some worries. There are two things that might be worth thinking about: 1. One of the enhancements to mirror maker is adding a message handler to do things like reformatting. I think we might potentially want to have more threads processing the messages than the number of consumers. If we follow the simple mirror maker solution, we lose this flexibility. 2. This might not matter too much, but creating more consumers means a bigger footprint of TCP connections / memory. Any thoughts on this? Thanks. Jiangjie (Becket) Qin

On 1/26/15, 10:35 AM, Jiangjie Qin j...@linkedin.com wrote: Hi Jay and Neha, Thanks a lot for the reply and explanation. I do agree it makes more sense to avoid duplicate effort and plan based on the new consumer.
Jiangjie (Becket) Qin

On 1/25/15, 4:34 PM, Neha Narkhede n...@confluent.io wrote: I think there is some value in investigating whether we can go back to the simple mirror maker design, as Jay points out. Here you have N threads, each of which has a consumer and a producer. The reason we had to move away from that was a combination of the difference in throughput between the consumer and the old producer, and a deficiency of the consumer rebalancing that limits the total number of mirror maker threads. So the only option available was to increase the throughput of the limited number of mirror maker threads that could be deployed. Now, that queuing design may not make sense if the new producer's throughput is almost similar to the consumer's AND the new round-robin based consumer rebalancing can allow a very high number of mirror maker instances to exist. This is the end state that the mirror maker should be in once the new consumer is complete, so it wouldn't hurt to see if we can just move to that right now.

On Fri, Jan 23, 2015 at 8:40 PM, Jay Kreps jay.kr...@gmail.com wrote: QQ: If we ever use a different technique for the data channel selection than for the producer partitioning, won't that break ordering? How can we ensure these things stay in sync? With respect to the new consumer--I really do want to encourage people to think through how MM will work with the new consumer. I mean this isn't very far
Re: [kafka-clients] Re: [VOTE] 0.8.2.0 Candidate 2 (with the correct links)
Hi Kafka Team, I just wanted to bring to your attention a limitation of the new Java producer compared to the old producer. a) Increasing partitions is limited by the configured memory allocation: buffer.memory and batch.size. The maximum number of partitions you can have before impacting (new Java) producers is buffer.memory / batch.size. Developers should therefore plan for horizontal partition scaling from the beginning; otherwise running production code will be impacted according to the *block.on.buffer.full* configuration (block or BufferExhaustedException). This limitation does not exist with the old scala-based producer, which lets the user community buffer more and plan capacity beforehand. Maybe add this limitation to http://kafka.apache.org/documentation.html#newproducerconfigs. Thanks, Bhavesh

On Mon, Jan 26, 2015 at 10:28 AM, Joe Stein joe.st...@stealth.ly wrote: +1 (binding) artifacts and quick start look good. I ran in some client code, minor edits from 0-8.2-beta https://github.com/stealthly/scala-kafka/pull/26

On Mon, Jan 26, 2015 at 3:38 AM, Manikumar Reddy ku...@nmsworks.co.in wrote: +1 (Non-binding) Verified source package, unit tests, release build, topic deletion, compaction and random testing

On Mon, Jan 26, 2015 at 6:14 AM, Neha Narkhede n...@confluent.io wrote: +1 (binding) Verified keys, quick start, unit tests.

On Sat, Jan 24, 2015 at 4:26 PM, Joe Stein joe.st...@stealth.ly wrote: That makes sense, thanks!

On Sat, Jan 24, 2015 at 7:00 PM, Jay Kreps jay.kr...@gmail.com wrote: But I think the flaw in trying to guess what kind of serializer they will use is when we get it wrong. Basically let's say we guess String. Say 30% of the time we will be right and we will save the two configuration lines. 70% of the time we will be wrong and the user gets a super cryptic ClassCastException: "xyz cannot be cast to [B" (because [B is how java chooses to display the byte array class, just to up the pain), then they figure out how to subscribe to our mailing list and email us the cryptic exception, then we explain how we helpfully set these properties for them to save them time. :-) https://www.google.com/?gws_rd=ssl#q=kafka+classcastexception+%22%5BB%22 I think basically we did this experiment with the old clients, and the conclusion is that serialization is something you basically have to think about to use Kafka, and trying to guess just makes things worse. -Jay

On Sat, Jan 24, 2015 at 2:51 PM, Joe Stein joe.st...@stealth.ly wrote: Maybe. I think the StringSerializer could look more like a typical type of message. Instead of encoding being a property it would be more typically just written in the bytes.

On Sat, Jan 24, 2015 at 12:12 AM, Jay Kreps jay.kr...@gmail.com wrote: I don't think so--see if you buy my explanation. We previously defaulted to the byte array serializer and it was a source of unending frustration and confusion. Since it wasn't a required config, people just went along plugging in whatever objects they had, thinking that changing the parametric types would somehow help. Then they would get a class cast exception and assume our stuff was somehow busted, not realizing we had helpfully configured a type different from what they were passing in under the covers. So I think it is actually good for people to think: how am I serializing my data? Getting that exception will make them ask that question, right? -Jay

On Fri, Jan 23, 2015 at 9:06 PM, Joe Stein joe.st...@stealth.ly wrote: Should value.serializer in the new java producer be defaulted to Array[Byte]?
I was working on testing some upgrade paths and got this!

return exception in callback when buffer cannot accept message
ConfigException: Missing required configuration value.serializer which has no default value. (ConfigDef.java:124)
org.apache.kafka.common.config.ConfigDef.parse(ConfigDef.java:124)
org.apache.kafka.common.config.AbstractConfig.init(AbstractConfig.java:48)
org.apache.kafka.clients.producer.ProducerConfig.init(ProducerConfig.java:235)
org.apache.kafka.clients.producer.KafkaProducer.init(KafkaProducer.java:129)
ly.stealth.testing.BaseSpec$class.createNewKafkaProducer(BaseSpec.scala:42)
ly.stealth.testing.KafkaSpec.createNewKafkaProducer(KafkaSpec.scala:36)
ly.stealth.testing.KafkaSpec$$anonfun$3$$anonfun$apply$37.apply(KafkaSpec.scala:175)
ly.stealth.testing.KafkaSpec$$anonfun$3$$anonfun$apply$37.apply(KafkaSpec.scala:170)
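To make the buffer.memory / batch.size limit Bhavesh raises at the top of this thread concrete, here is the arithmetic with the 0.8.2 defaults (the values are the documented defaults at the time; treat them as illustrative):

{code}
public class PartitionBufferLimit {
    public static void main(String[] args) {
        long bufferMemory = 33554432L; // buffer.memory default: 32 MB
        int batchSize = 16384;         // batch.size default: 16 KB
        // Each partition with an open batch pins one batch.size allocation,
        // so roughly this many partitions can have a batch in flight before
        // the buffer pool is exhausted and send() blocks or throws
        // BufferExhaustedException, depending on block.on.buffer.full.
        System.out.println(bufferMemory / batchSize); // prints 2048
    }
}
{code}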
Re: Latency Tracking Across All Kafka Component
HI Kafka Team, I completely understand the use of the audit event and the reference material posted at https://issues.apache.org/jira/browse/KAFKA-260 and the slides. Since we are an enterprise customer of the end-to-end Kafka pipeline, it would be great if Kafka had built-in support for distributed tracing. Here is how I envision Kafka distributed tracing: 1) I would like to see the end-to-end journey across each major hop, including major components within the same JVM ( https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Internals e.g. network layer, API layer, replication, and log subsystem, etc.). Once an app team produces an audit log message, it will contain a GUID, with the ability to trace its journey through the producer (queue), over the network to the broker (request channel, API layer, disk commit), and on to the consumer read, etc. This gives both Kafka customers (operations) and developers the ability to trace an event's journey and zoom into the component that is the bottleneck. Of course, the use case can be extended to an aggregated call graph for the entire pipeline (a far-fetched vision). Here are a couple of references where other companies are doing tracing of distributed systems: http://static.googleusercontent.com/media/research.google.com/en/us/pubs/archive/36356.pdf https://blog.twitter.com/2012/distributed-systems-tracing-with-zipkin eBay Transactional Logger (Distributed Tree Logging) http://server.dzone.com/articles/monitoring-ebay-big-data https://devopsdotcom.files.wordpress.com/2012/11/screen-shot-2012-11-11-at-10-06-39-am.png UI for tracking the audit event: http://4.bp.blogspot.com/-b0r71ZbJdmA/T9DYhbE0uXI/ABs/bXwyM76Iddc/s1600/web-screenshot.png This is how I would implement it: each Kafka component logs its transactional log for the audit event to disk, an agent (Flume, Logstash, etc.) sends those pre-formatted (structured) logs to Elastic Search, so people can search by the GUID and produce a call graph similar to Zipkin, or a Chrome resource-timeline view of where the event spent its time. This would be a powerful tool both for the Kafka development team and for customers who have latency issues. It requires a lot of effort and code instrumentation, but it would be cool if the Kafka team at least got started on distributed tracing functionality. I am sorry I got back to you so late. Thanks, Bhavesh

On Thu, Jan 15, 2015 at 4:01 PM, Guozhang Wang wangg...@gmail.com wrote: Hi, At LinkedIn we used an audit module to track the latency / message counts at each tier of the pipeline (for your example it will have the producer / local / central / HDFS tiers). Some details can be found in our recent talk slides (slides 41/42): http://www.slideshare.net/GuozhangWang/apache-kafka-at-linkedin-43307044 This audit is specific to our usage of Avro as our serialization tool, though, and we are considering ways to get it generalized and hence open-sourced. Guozhang

On Mon, Jan 5, 2015 at 3:33 PM, Otis Gospodnetic otis.gospodne...@gmail.com wrote: Hi, That sounds a bit like needing full, cross-app, cross-network transaction/call tracing, and not something specific or limited to Kafka, doesn't it? Otis -- Monitoring * Alerting * Anomaly Detection * Centralized Log Management Solr Elasticsearch Support * http://sematext.com/

On Mon, Jan 5, 2015 at 2:43 PM, Bhavesh Mistry mistry.p.bhav...@gmail.com wrote: Hi Kafka Team/Users, We are using the LinkedIn Kafka data pipeline end-to-end.
Producer(s) -> Local DC Brokers -> MM -> Central Brokers -> Camus Job -> HDFS. This is working out very well for us, but we need visibility into the latency at each layer (Local DC Brokers -> MM -> Central Brokers -> Camus Job -> HDFS). Our events are time-based (carrying the time the event was produced). Is there any such feature, or any audit trail mentioned at ( https://github.com/linkedin/camus/)? I would like to know the in-between latency and the time an event spent in each hop; otherwise we do not know where the problem is and what to optimize. Is any of this covered in 0.9.0 or any other upcoming Kafka release? How might we achieve this latency tracking across all components? Thanks, Bhavesh -- -- Guozhang
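There is no built-in per-hop latency feature in the releases discussed here, but a common do-it-yourself approach for time-based events like these is to embed the produce timestamp in the payload and subtract it from the local clock at each tier. A minimal sketch; the envelope format is an assumption:

{code}
import java.nio.ByteBuffer;

public class LatencyEnvelope {
    // Prefix the payload with the produce wall-clock time (8 bytes).
    public static byte[] wrap(byte[] payload) {
        return ByteBuffer.allocate(8 + payload.length)
                .putLong(System.currentTimeMillis())
                .put(payload)
                .array();
    }

    // At any tier (MM, Camus, etc.), recover the elapsed time since produce.
    // This requires loosely synchronized clocks across hosts.
    public static long ageMillis(byte[] envelope) {
        return System.currentTimeMillis() - ByteBuffer.wrap(envelope).getLong();
    }
}
{code}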
Kafka Cluster Monitoring and Documentation of Internals (JMX Metrics) of Rejected Events
Hi Kafka Team, I am trying to understand Kafka internals and how a message can be corrupted or lost on the broker side. I have referred to the following documentation for monitoring: https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Internals http://kafka.apache.org/documentation.html#monitoring I am looking at the following beans: kafka.server:type=BrokerTopicMetrics,name=test-FailedProduceRequestsPerSec kafka.server:type=BrokerTopicMetrics,name=test-BytesRejectedPerSec I see the following exception on the broker side, rejecting a request because it is too large. This is great, but it does not show the source IP of the producer that caused the issue. Is there any way to log and capture this?

[2014-10-14 22:09:53,262] ERROR [KafkaApi-2] Error processing ProducerRequest with correlation id 28795280 from client X on partition [XXX,17] (kafka.server.KafkaApis)
kafka.common.MessageSizeTooLargeException: Message size is 2924038 bytes which exceeds the maximum configured message size of 2097152.

Can this be reported as a separate MessageSizeTooLargeException metric per topic? Also, what is the best way to find CRC check errors from the consumer side? How do you debug this? E.g. log line:

11 Dec 2014 07:22:33,387 ERROR [pool-15-thread-4]
kafka.message.InvalidMessageException: Message is corrupt (stored crc = 1834644195, computed crc = 2374999037)

Also, is there any jira open to update the documentation with a list of all the latest metrics, their format, and what they mean? http://kafka.apache.org/documentation.html#monitoring. Please see the attached image for the list of all metrics. The broker version is 0.8.1.1. Thanks, Bhavesh
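A minimal sketch of polling one of the MBeans listed above over remote JMX; the host, port, and topic name are placeholders, the bean naming follows the 0.8.1.1 style quoted in the mail, and meter MBeans expose attributes such as Count and OneMinuteRate:

{code}
import javax.management.MBeanServerConnection;
import javax.management.ObjectName;
import javax.management.remote.JMXConnector;
import javax.management.remote.JMXConnectorFactory;
import javax.management.remote.JMXServiceURL;

public class BrokerMetricsProbe {
    public static void main(String[] args) throws Exception {
        JMXServiceURL url = new JMXServiceURL(
            "service:jmx:rmi:///jndi/rmi://broker1.example.com:9999/jmxrmi"); // placeholder
        JMXConnector connector = JMXConnectorFactory.connect(url);
        try {
            MBeanServerConnection mbs = connector.getMBeanServerConnection();
            ObjectName rejected = new ObjectName(
                "kafka.server:type=BrokerTopicMetrics,name=test-BytesRejectedPerSec");
            // Meter MBeans expose a cumulative count plus moving-average rates.
            System.out.println("count: " + mbs.getAttribute(rejected, "Count"));
            System.out.println("1-min rate: " + mbs.getAttribute(rejected, "OneMinuteRate"));
        } finally {
            connector.close();
        }
    }
}
{code}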
Re: New consumer client
Hi Jay, One of the pain points of the existing consumer code is occasional CORRUPT_MESSAGE errors. Right now it is hard to pinpoint the cause of a CORRUPT_MESSAGE, especially when it happens on the Mirror Maker side. Is there any proposal to auto-skip corrupted messages and to have reporting visibility of CRC errors (metrics etc., or traceability to find the corruption) per topic, etc.? I am not sure if this is the correct email thread to address this; if not, please let me know. Will provide feedback about the new consumer api and changes. Thanks, Bhavesh

On Sun, Jan 11, 2015 at 7:57 PM, Jay Kreps j...@confluent.io wrote: I uploaded an updated version of the new consumer client ( https://issues.apache.org/jira/browse/KAFKA-1760). This is now almost feature complete, and has pretty reasonable testing and metrics. I think it is ready for review and could be checked in once 0.8.2 is out. For those who haven't been following, this is meant to be a new consumer client, like the new producer in 0.8.2, and intended to replace the existing high-level and simple scala consumers. This still needs the server-side implementation of the partition assignment and group management to be fully functional. I have just stubbed this out in the server to allow the implementation and testing of the server, but actual usage will require it. However the client that exists now is actually a fully functional replacement for the simple consumer that is vastly easier to use correctly, as it internally does all the discovery and failover. It would be great if people could take a look at this code, and particularly at the public apis, which have several small changes from the original proposal. Summary What's there: 1. Simple consumer functionality 2. Offset commit and fetch 3. Ability to change position with seek 4. Ability to commit all or just some offsets 5. Controller discovery, failure detection, heartbeat, and fail-over 6. Controller partition assignment 7. Logging 8. Metrics 9. Integration tests, including tests that simulate random broker failures 10. Integration into the consumer performance test. Limitations: 1. There could be some lingering bugs in the group management support; it is hard to fully test with just the stub support on the server, so we'll need to get the server working to do better, I think. 2. I haven't implemented wild-card subscriptions yet. 3. No integration with console consumer yet. Performance: I did some performance comparison with the old consumer over localhost on my laptop. Usually localhost isn't good for testing, but in this case it is good because it has near infinite bandwidth, so it does a good job at catching inefficiencies that would be hidden with a slower network. These numbers probably aren't representative of what you would get over a real network, but help bring out the relative efficiencies. Here are the results: - Old high-level consumer: 213 MB/sec - New consumer: 225 MB/sec - Old simple consumer: 242 MB/sec It may be hard to get this client up to the same point as the simple consumer, as it is doing very little beyond allocating and wrapping byte buffers that it reads off the network. Some things to discuss: 1. What should the behavior of consumer.position() and consumer.committed() be immediately after initialization (prior to calling poll)?
Currently these methods just fetch the current value from memory, but if the position isn't in memory it will try to fetch it from the server; if no position is found it will use the auto-offset reset policy to pick one. I think this is the right thing to do, because you can't otherwise guarantee how many calls to poll() would be required before full initialization is complete. But it is kind of weird. 2. Overall code structure improvement. These NIO network clients tend to be very imperative in nature. I'm not sure this is bad, but if anyone has any ideas on improving the code I'd love to hear them. -Jay
Re: Follow-up On Important Issues for 0.8.2
Adding the user community to see if anyone knows the producer behavior for issue #1) and the status of #2). Thanks, Bhavesh

On Fri, Jan 2, 2015 at 12:37 PM, Bhavesh Mistry mistry.p.bhav...@gmail.com wrote: Hi Kafka Dev Team, I am following up with you regarding the new (Java) producer behavior in the event of network issues or firewall rules. I just want to make the Java producer resilient to any network or firewall issues, so that it does not become a single point of failure in the application: 1) Jira issue https://issues.apache.org/jira/browse/KAFKA-1788 https://issues.apache.org/jira/browse/KAFKA-1788?focusedCommentId=14259235&page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-14259235 What should the behavior of the producer be when it cannot reach the leader broker, but metadata reports that broker as leader for the partition (via another broker)? Should record-error-rate be counted, and should the callback be called with an error or not? The *record-error-rate* metric remains zero despite the firewall rule; in my opinion, it should have called org.apache.kafka.clients.producer.Callback, but I did not see that happening when either one or two brokers are not reachable. 2) Will the jira ticket https://issues.apache.org/jira/browse/KAFKA-1788 be merged to 0.8.2? This would give the ability to close the producer in the event of lost connectivity to a broker, if the io thread misbehaves (does not end). Thanks for your help! Thanks, Bhavesh
Re: Possible Memory Leak in Kafka with Tomcat
Hi Marcel, Memory leaks happen when there are background threads started by the webapp that are not shut down (a library like Coda Hale's metrics registers a JVM shutdown hook, but on webapp undeploy the shutdown hook does not execute, so you get memory leaks or ClassNotFound errors). I have faced this, so you need to use a web context listener and close all threads there. With the old producer I have faced this exact issue: I had to explicitly call Metrics.defaultRegistry().shutdown() for shutdown, because producer.close() does not shut down the metrics threads. A similar issue exists on the consumer side; you have to call consumer.shutdown() as well. I hope this helps! Thanks, Bhavesh

On Tue, Jan 6, 2015 at 1:53 PM, Marcel Alburg m.alb...@weeaar.com wrote: Hello, I am trying to use spring-integration-kafka, and after stopping Tomcat I get a possible memory leak from the class loader. I talked with the people of spring-integration-kafka, and you can see this discussion, with a lot of debug messages and a reproducible project, under: https://github.com/spring-projects/spring-integration-kafka/issues/10 Thanks in advance
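A minimal sketch of the context-listener cleanup described above, assuming the old (scala) producer and the yammer (Coda Hale) metrics registry it uses; the producer wiring is application-specific and shown only as a field:

{code}
import javax.servlet.ServletContextEvent;
import javax.servlet.ServletContextListener;
import com.yammer.metrics.Metrics;
import kafka.javaapi.producer.Producer;

public class KafkaCleanupListener implements ServletContextListener {
    private Producer<byte[], byte[]> producer; // wired elsewhere (assumption)

    public void contextInitialized(ServletContextEvent sce) {
        // producer creation omitted; application-specific
    }

    public void contextDestroyed(ServletContextEvent sce) {
        if (producer != null) {
            producer.close(); // stops the producer's own threads
        }
        // producer.close() does not stop the metrics threads; shut the
        // registry down explicitly so the webapp classloader can unload.
        Metrics.defaultRegistry().shutdown();
    }
}
{code}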
Re: Follow-up On Important Issues for 0.8.2
Hi Kafka Dev Team, We have a similar change for issue #2; I would like to know if KAFKA-1788 https://issues.apache.org/jira/browse/KAFKA-1788 will be part of the release. Secondly, for issue #1, I would like to know the expected behavior of the error-count metric and the callback (in async mode) when there are no issues with ZK or the brokers, but only with the network layer (a firewall or DNS issue). If need be, I can file a jira ticket after understanding the behavior, and I can test the expected behavior as well. Thanks, Bhavesh

On Fri, Jan 2, 2015 at 12:37 PM, Bhavesh Mistry mistry.p.bhav...@gmail.com wrote: Hi Kafka Dev Team, I am following up with you regarding the new (Java) producer behavior in the event of network issues or firewall rules. I just want to make the Java producer resilient to any network or firewall issues, so that it does not become a single point of failure in the application: 1) Jira issue https://issues.apache.org/jira/browse/KAFKA-1788 https://issues.apache.org/jira/browse/KAFKA-1788?focusedCommentId=14259235&page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-14259235 What should the behavior of the producer be when it cannot reach the leader broker, but metadata reports that broker as leader for the partition (via another broker)? Should record-error-rate be counted, and should the callback be called with an error or not? The *record-error-rate* metric remains zero despite the firewall rule; in my opinion, it should have called org.apache.kafka.clients.producer.Callback, but I did not see that happening when either one or two brokers are not reachable. 2) Will the jira ticket https://issues.apache.org/jira/browse/KAFKA-1788 be merged to 0.8.2? This would give the ability to close the producer in the event of lost connectivity to a broker, if the io thread misbehaves (does not end). Thanks for your help! Thanks, Bhavesh
Latency Tracking Across All Kafka Component
Hi Kafka Team/Users, We are using the LinkedIn Kafka data pipeline end-to-end. Producer(s) -> Local DC Brokers -> MM -> Central Brokers -> Camus Job -> HDFS. This is working out very well for us, but we need visibility into the latency at each layer (Local DC Brokers -> MM -> Central Brokers -> Camus Job -> HDFS). Our events are time-based (carrying the time the event was produced). Is there any such feature, or any audit trail mentioned at ( https://github.com/linkedin/camus/)? I would like to know the in-between latency and the time an event spent in each hop; otherwise we do not know where the problem is and what to optimize. Is any of this covered in 0.9.0 or any other upcoming Kafka release? How might we achieve this latency tracking across all components? Thanks, Bhavesh
Follow-up On Important Issues for 0.8.2
Hi Kafka Dev Team, I am following up with you regarding the new (Java) producer behavior in the event of network issues or firewall rules. I just want to make the Java producer resilient to any network or firewall issues, so that it does not become a single point of failure in the application: 1) Jira issue https://issues.apache.org/jira/browse/KAFKA-1788 https://issues.apache.org/jira/browse/KAFKA-1788?focusedCommentId=14259235&page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-14259235 What should the behavior of the producer be when it cannot reach the leader broker, but metadata reports that broker as leader for the partition (via another broker)? Should record-error-rate be counted, and should the callback be called with an error or not? The *record-error-rate* metric remains zero despite the firewall rule; in my opinion, it should have called org.apache.kafka.clients.producer.Callback, but I did not see that happening when either one or two brokers are not reachable. 2) Will the jira ticket https://issues.apache.org/jira/browse/KAFKA-1788 be merged to 0.8.2? This would give the ability to close the producer in the event of lost connectivity to a broker, if the io thread misbehaves (does not end). Thanks for your help! Thanks, Bhavesh
[jira] [Commented] (KAFKA-1788) producer record can stay in RecordAccumulator forever if leader is no available
[ https://issues.apache.org/jira/browse/KAFKA-1788?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14261356#comment-14261356 ] Bhavesh Mistry commented on KAFKA-1788: --- [~parth.brahmbhatt], The close() (#2) issue will be addressed by the jira ticket https://issues.apache.org/jira/browse/KAFKA-1788, but the following will still not be addressed when the leader is available but no records can be sent. [~junrao], [~jkreps] and [~nehanarkhede], please let me know the correct behavior in the use case above; should I file another issue? 1) The record-error-rate metric remains zero despite the firewall rule. In my opinion, it should have called org.apache.kafka.clients.producer.Callback, but I did not see that happening when either one or two brokers are not reachable. Thanks, Bhavesh

producer record can stay in RecordAccumulator forever if leader is no available --- Key: KAFKA-1788 URL: https://issues.apache.org/jira/browse/KAFKA-1788 Project: Kafka Issue Type: Bug Components: core, producer Affects Versions: 0.8.2 Reporter: Jun Rao Assignee: Jun Rao Labels: newbie++ Fix For: 0.8.3 Attachments: KAFKA-1788.patch In the new producer, when a partition has no leader for a long time (e.g., all replicas are down), the records for that partition will stay in the RecordAccumulator until the leader is available. This may cause the bufferpool to be full and the callback for the produced message to block for a long time. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (KAFKA-1788) producer record can stay in RecordAccumulator forever if leader is no available
[ https://issues.apache.org/jira/browse/KAFKA-1788?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14259235#comment-14259235 ] Bhavesh Mistry commented on KAFKA-1788: --- The use case I have is a little different, but similar. Here is my use case: 1) Let's suppose we have 3 brokers (b1, b2, b3) and a topic with 30 partitions and replication 1, so partitions 1 to 10 are on b1 (as leader), partitions 11 to 20 on b2, and 21 to 30 on b3. ZK has all the leadership info and everything is fine. 2) From the client, everything is working fine, but broker b1 alone is not reachable (due to a network or firewall issue); note that the leader is still reported as b1. 3) The patch you have provided will not address this issue: it detects that the leader is not available and then purges the batch, but this case is a little different, as the leader is available but cannot be connected to, or a firewall rule is in place. Based on the above use case, I see the following two problems, which I have reported in KAFKA-1642 (please refer to it for more details). 1) The record-error-rate metric remains zero despite the following firewall rules. In my opinion, it should have called org.apache.kafka.clients.producer.Callback, but I did not see that happening when either one or two brokers are not reachable.

{code}
sudo ipfw add reject tcp from me to b1.ip dst-port 9092
sudo ipfw add reject tcp from me to b2.ip dst-port 9092
00100 reject tcp from me to b1.ip dst-port 9092
00200 reject tcp from me to b2.ip dst-port 9092
{code}

{code}
class LoggingCallBackHandler implements Callback {
    /**
     * A callback method the user can implement to provide asynchronous
     * handling of request completion. This method will be called when the
     * record sent to the server has been acknowledged. Exactly one of the
     * arguments will be non-null.
     *
     * @param metadata  The metadata for the record that was sent (i.e. the
     *                  partition and offset). Null if an error occurred.
     * @param exception The exception thrown during processing of this record.
     *                  Null if no error occurred.
     */
    @Override
    public void onCompletion(RecordMetadata metadata, Exception exception) {
        if (exception != null) {
            exception.printStackTrace();
        }
    }
}
{code}

I do not see any exception at all on the console... not sure why? 2) The application does NOT gracefully shut down when one or more brokers are not reachable.
{code}
"SIGTERM handler" daemon prio=5 tid=0x7f8bd79e4000 nid=0x17907 waiting for monitor entry [0x00011e906000]
   java.lang.Thread.State: BLOCKED (on object monitor)
    at java.lang.Shutdown.exit(Shutdown.java:212)
    - waiting to lock <0x00070008f7c0> (a java.lang.Class for java.lang.Shutdown)
    at java.lang.Terminator$1.handle(Terminator.java:52)
    at sun.misc.Signal$1.run(Signal.java:212)
    at java.lang.Thread.run(Thread.java:744)

"SIGTERM handler" daemon prio=5 tid=0x7f8bd5159000 nid=0x1cb0b waiting for monitor entry [0x00011e803000]
   java.lang.Thread.State: BLOCKED (on object monitor)
    at java.lang.Shutdown.exit(Shutdown.java:212)
    - waiting to lock <0x00070008f7c0> (a java.lang.Class for java.lang.Shutdown)
    at java.lang.Terminator$1.handle(Terminator.java:52)
    at sun.misc.Signal$1.run(Signal.java:212)
    at java.lang.Thread.run(Thread.java:744)

"SIGTERM handler" daemon prio=5 tid=0x7f8bdd147800 nid=0x15d0b waiting for monitor entry [0x00011e30a000]
   java.lang.Thread.State: BLOCKED (on object monitor)
    at java.lang.Shutdown.exit(Shutdown.java:212)
    - waiting to lock <0x00070008f7c0> (a java.lang.Class for java.lang.Shutdown)
    at java.lang.Terminator$1.handle(Terminator.java:52)
    at sun.misc.Signal$1.run(Signal.java:212)
    at java.lang.Thread.run(Thread.java:744)

"SIGTERM handler" daemon prio=5 tid=0x7f8bdf82 nid=0x770b waiting for monitor entry [0x00011e207000]
   java.lang.Thread.State: BLOCKED (on object monitor)
    at java.lang.Shutdown.exit(Shutdown.java:212)
    - waiting to lock <0x00070008f7c0> (a java.lang.Class for java.lang.Shutdown)
    at java.lang.Terminator$1.handle(Terminator.java:52)
    at sun.misc.Signal$1.run(Signal.java:212)
    at java.lang.Thread.run(Thread.java:744)

"SIGTERM handler" daemon prio=5 tid=0x7f8bdc393800 nid=0x1c30f waiting for monitor entry [0x00011e104000]
   java.lang.Thread.State: BLOCKED (on object monitor)
    at java.lang.Shutdown.exit(Shutdown.java:212)
    - waiting to lock
{code}
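As a side note on point 1, one way to verify whether failed sends are being counted is to read the producer's metric directly. A minimal sketch, assuming the 0.8.2-era client API (Metric.value()); the metric name record-error-rate is taken from the discussion above:

{code}
import java.util.Map;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.common.Metric;
import org.apache.kafka.common.MetricName;

public class ErrorRateProbe {
    // Scans the producer's registered metrics and prints record-error-rate.
    // Expected to become non-zero once sends start failing.
    public static void printErrorRate(KafkaProducer<byte[], byte[]> producer) {
        for (Map.Entry<MetricName, ? extends Metric> entry : producer.metrics().entrySet()) {
            if ("record-error-rate".equals(entry.getKey().name())) {
                System.out.println(entry.getKey().name() + " = " + entry.getValue().value());
            }
        }
    }
}
{code}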
[jira] [Commented] (KAFKA-1788) producer record can stay in RecordAccumulator forever if leader is not available
[ https://issues.apache.org/jira/browse/KAFKA-1788?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14259275#comment-14259275 ]

Bhavesh Mistry commented on KAFKA-1788:
---------------------------------------

[~junrao], Please let me know your opinion on the behavior of the new producer in the above case, and on issues 1 and 2 reported above. Also, how should the expiration behave with respect to the ack setting and reporting errors back to the Callback handler?

Thanks,
Bhavesh
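Since the question ties expiration behavior to the ack setting, a sketch of the relevant 0.8.2-era producer settings may help frame it. This is an assumption about typical configuration, not from the thread; the broker addresses are placeholders, and block.on.buffer.full was that era's knob for whether send() blocks once the accumulator is full:

{code}
import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;

public class ProducerConfigSketch {
    public static KafkaProducer<byte[], byte[]> create() {
        Properties props = new Properties();
        // Placeholder broker list.
        props.put("bootstrap.servers", "b1.example.com:9092,b2.example.com:9092");
        // "1" waits for the leader ack only; "all" waits for the full ISR.
        props.put("acks", "1");
        props.put("key.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
        // Fail fast instead of blocking when the RecordAccumulator fills up.
        props.put("block.on.buffer.full", "false");
        return new KafkaProducer<byte[], byte[]>(props);
    }
}
{code}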
[jira] [Comment Edited] (KAFKA-1642) [Java New Producer Kafka Trunk] CPU Usage Spike to 100% when network connection is lost
[ https://issues.apache.org/jira/browse/KAFKA-1642?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14257805#comment-14257805 ]

Bhavesh Mistry edited comment on KAFKA-1642 at 12/24/14 7:07 PM:
-----------------------------------------------------------------

[~ewencp], Thanks for the patch. You may close this issue. The only thing I have not tested is the rare case where a single broker runs out of file descriptors and a producer under heavy load requests more connections to the same broker. According to the code, it will mark the node state as disconnected, and I am not sure whether data will still be sent via the already-live connection.

Another comment: no WARN or ERROR message is logged when a connection fails. Can we please change the log level of the following code to WARN? In production environments people set the log level to WARN or ERROR, so there will be no visibility if there is a connection issue.

{code}
    /**
     * Initiate a connection to the given node
     */
    private void initiateConnect(Node node, long now) {
        try {
            log.debug("Initiating connection to node {} at {}:{}.", node.id(), node.host(), node.port());
            this.connectionStates.connecting(node.id(), now);
            selector.connect(node.id(), new InetSocketAddress(node.host(), node.port()), this.socketSendBuffer, this.socketReceiveBuffer);
        } catch (IOException e) {
            /* attempt failed, we'll try again after the backoff */
            connectionStates.disconnected(node.id());
            /* maybe the problem is our metadata, update it */
            metadata.requestUpdate();
            log.debug("Error connecting to node {} at {}:{}:", node.id(), node.host(), node.port(), e);
        }
    }
{code}

Thanks for all your help!

Thanks,
Bhavesh

> [Java New Producer Kafka Trunk] CPU Usage Spike to 100% when network connection is lost
> ---------------------------------------------------------------------------------------
>
> Key: KAFKA-1642
> URL: https://issues.apache.org/jira/browse/KAFKA-1642
> Project: Kafka
> Issue Type: Bug
> Components: producer
> Affects Versions: 0.8.2
> Reporter: Bhavesh Mistry
> Assignee: Ewen Cheslack-Postava
> Priority: Blocker
> Fix For: 0.8.2
> Attachments: 0001-Initial-CPU-Hish-Usage-by-Kafka-FIX-and-Also-fix-CLO.patch, KAFKA-1642.patch, KAFKA-1642.patch, KAFKA-1642_2014-10-20_17:33:57.patch, KAFKA-1642_2014-10-23_16:19:41.patch
>
> I see my CPU spike to 100% when the network connection is lost for a while. It seems the network I/O threads are very busy logging the following error message. Is this expected behavior?
> 2014-09-17 14:06:16.830 [kafka-producer-network-thread] ERROR org.apache.kafka.clients.producer.internals.Sender - Uncaught error in kafka producer I/O thread:
> java.lang.IllegalStateException: No entry found for node -2
>     at org.apache.kafka.clients.ClusterConnectionStates.nodeState(ClusterConnectionStates.java:110)
>     at org.apache.kafka.clients.ClusterConnectionStates.disconnected(ClusterConnectionStates.java:99)
>     at org.apache.kafka.clients.NetworkClient.initiateConnect(NetworkClient.java:394)
>     at org.apache.kafka.clients.NetworkClient.maybeUpdateMetadata(NetworkClient.java:380)
>     at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:174)
>     at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:175)
>     at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:115)
>     at java.lang.Thread.run(Thread.java:744)
>
> Thanks,
> Bhavesh
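A sketch of what the requested log-level change might look like, rendered standalone so it compiles on its own; the collaborators from the quoted snippet (connectionStates, metadata, selector) are reduced to comments, and this is a suggestion, not a committed patch:

{code}
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public final class ConnectLogging {
    private static final Logger log = LoggerFactory.getLogger(ConnectLogging.class);

    // Mirrors the catch block of the quoted initiateConnect(), with the
    // one suggested change: WARN instead of DEBUG so the failure is visible
    // under production logging configurations that suppress DEBUG.
    public static void reportConnectFailure(int nodeId, String host, int port, Exception e) {
        // connectionStates.disconnected(nodeId);  // as in the original snippet
        // metadata.requestUpdate();               // as in the original snippet
        log.warn("Error connecting to node {} at {}:{}:", nodeId, host, port, e);
    }
}
{code}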
[jira] [Commented] (KAFKA-1642) [Java New Producer Kafka Trunk] CPU Usage Spike to 100% when network connection is lost
[ https://issues.apache.org/jira/browse/KAFKA-1642?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14257686#comment-14257686 ]

Bhavesh Mistry commented on KAFKA-1642:
---------------------------------------

[~ewencp], The patch does indeed solve the high CPU problem reported by this bug. I have tested with all brokers down, one broker down, and two brokers down (except for the last use case, where one of the brokers runs out of socket file descriptors, a rare case). Here are some interesting observations from YourKit:

0) Overall, the patch has also brought down CPU consumption in the normal healthy (happy) case where everything is up and running. With the old code (without the patch), I used to see about 10% of the overall process CPU used by the I/O threads (4 in my case); it has been reduced to 5% or less now with the patch.

1) When two brokers are down, I occasionally see the I/O thread blocked. (I did not see this when one broker is down.)

{code}
kafka-producer-network-thread | rawlog [BLOCKED] [DAEMON]
org.apache.kafka.clients.producer.internals.Metadata.fetch() Metadata.java:70
java.lang.Thread.run() Thread.java:744
{code}

2) The record-error-rate metric remains zero despite the following firewall rule. In my opinion, org.apache.kafka.clients.producer.Callback should have been invoked, but I did not see that happening with either one or two brokers down. Should I file another issue for this? Please confirm.

{code}
sudo ipfw add reject tcp from me to b1.ip dst-port 9092
sudo ipfw add reject tcp from me to b2.ip dst-port 9092

00100 reject tcp from me to b1.ip dst-port 9092
00200 reject tcp from me to b2.ip dst-port 9092
{code}

{code}
class LoggingCallBackHandler implements Callback {

    /**
     * A callback method the user can implement to provide asynchronous
     * handling of request completion. This method will be called when the
     * record sent to the server has been acknowledged. Exactly one of the
     * arguments will be non-null.
     *
     * @param metadata
     *            The metadata for the record that was sent (i.e. the
     *            partition and offset). Null if an error occurred.
     * @param exception
     *            The exception thrown during processing of this record.
     *            Null if no error occurred.
     */
    @Override
    public void onCompletion(RecordMetadata metadata, Exception exception) {
        if (exception != null) {
            exception.printStackTrace();
        }
    }
}
{code}

I do not see any exception at all on the console... not sure why?

3) The application does NOT gracefully shut down when one or more brokers are down.
(The I/O thread never exits; this is a known issue.)

{code}
"SIGTERM handler" daemon prio=5 tid=0x7f8bd79e4000 nid=0x17907 waiting for monitor entry [0x00011e906000]
   java.lang.Thread.State: BLOCKED (on object monitor)
    at java.lang.Shutdown.exit(Shutdown.java:212)
    - waiting to lock <0x00070008f7c0> (a java.lang.Class for java.lang.Shutdown)
    at java.lang.Terminator$1.handle(Terminator.java:52)
    at sun.misc.Signal$1.run(Signal.java:212)
    at java.lang.Thread.run(Thread.java:744)

"SIGTERM handler" daemon prio=5 tid=0x7f8bd5159000 nid=0x1cb0b waiting for monitor entry [0x00011e803000]
   java.lang.Thread.State: BLOCKED (on object monitor)
    at java.lang.Shutdown.exit(Shutdown.java:212)
    - waiting to lock <0x00070008f7c0> (a java.lang.Class for java.lang.Shutdown)
    at java.lang.Terminator$1.handle(Terminator.java:52)
    at sun.misc.Signal$1.run(Signal.java:212)
    at java.lang.Thread.run(Thread.java:744)

"SIGTERM handler" daemon prio=5 tid=0x7f8bdd147800 nid=0x15d0b waiting for monitor entry [0x00011e30a000]
   java.lang.Thread.State: BLOCKED (on object monitor)
    at java.lang.Shutdown.exit(Shutdown.java:212)
    - waiting to lock <0x00070008f7c0> (a java.lang.Class for java.lang.Shutdown)
    at java.lang.Terminator$1.handle(Terminator.java:52)
    at sun.misc.Signal$1.run(Signal.java:212)
    at java.lang.Thread.run(Thread.java:744)

"SIGTERM handler" daemon prio=5 tid=0x7f8bdf82 nid=0x770b waiting for monitor entry [0x00011e207000]
   java.lang.Thread.State: BLOCKED (on object monitor)
    at java.lang.Shutdown.exit(Shutdown.java:212)
    - waiting to lock <0x00070008f7c0> (a java.lang.Class for java.lang.Shutdown)
    at java.lang.Terminator$1.handle(Terminator.java:52)
    at sun.misc.Signal$1.run(Signal.java:212)
    at java.lang.Thread.run(Thread.java:744)

"SIGTERM handler" daemon prio=5 tid=0x7f8bdc393800 nid=0x1c30f waiting for monitor entry [0x00011e104000]
   java.lang.Thread.State: BLOCKED (on object monitor)
    at java.lang.Shutdown.exit(Shutdown.java:212)
    - waiting to lock <0x00070008f7c0> (a java.lang.Class for java.lang.Shutdown
{code}
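Until the shutdown behavior is fixed, one workaround is to bound the time spent in close() from a helper thread, since the 0.8.2-era producer has no close() variant with a timeout. A minimal sketch, assuming only the plain close() API; the class and method names are my own:

{code}
import org.apache.kafka.clients.producer.KafkaProducer;

public final class BoundedClose {
    // Runs producer.close() on a daemon thread and waits at most timeoutMs,
    // so a sender thread stuck on an unreachable broker cannot block JVM
    // shutdown forever.
    public static void closeWithTimeout(final KafkaProducer<?, ?> producer, long timeoutMs)
            throws InterruptedException {
        Thread closer = new Thread(new Runnable() {
            @Override
            public void run() {
                producer.close(); // may block if brokers are unreachable
            }
        }, "producer-closer");
        closer.setDaemon(true); // do not keep the JVM alive on our account
        closer.start();
        closer.join(timeoutMs); // give close() a bounded window
        if (closer.isAlive()) {
            System.err.println("producer.close() did not finish within " + timeoutMs + " ms");
        }
    }
}
{code}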
[jira] [Commented] (KAFKA-1788) producer record can stay in RecordAccumulator forever if leader is not available
[ https://issues.apache.org/jira/browse/KAFKA-1788?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14257691#comment-14257691 ]

Bhavesh Mistry commented on KAFKA-1788:
---------------------------------------

HI All, I did NOT try this patch, but when one, two, or all brokers are down, I see that the application will not shut down due to the close() method. The application does not gracefully shut down when one or more brokers are down. (The I/O thread never exits; this is a known issue.)

{code}
"SIGTERM handler" daemon prio=5 tid=0x7f8bd79e4000 nid=0x17907 waiting for monitor entry [0x00011e906000]
   java.lang.Thread.State: BLOCKED (on object monitor)
    at java.lang.Shutdown.exit(Shutdown.java:212)
    - waiting to lock <0x00070008f7c0> (a java.lang.Class for java.lang.Shutdown)
    at java.lang.Terminator$1.handle(Terminator.java:52)
    at sun.misc.Signal$1.run(Signal.java:212)
    at java.lang.Thread.run(Thread.java:744)

"SIGTERM handler" daemon prio=5 tid=0x7f8bd5159000 nid=0x1cb0b waiting for monitor entry [0x00011e803000]
   java.lang.Thread.State: BLOCKED (on object monitor)
    at java.lang.Shutdown.exit(Shutdown.java:212)
    - waiting to lock <0x00070008f7c0> (a java.lang.Class for java.lang.Shutdown)
    at java.lang.Terminator$1.handle(Terminator.java:52)
    at sun.misc.Signal$1.run(Signal.java:212)
    at java.lang.Thread.run(Thread.java:744)

"SIGTERM handler" daemon prio=5 tid=0x7f8bdd147800 nid=0x15d0b waiting for monitor entry [0x00011e30a000]
   java.lang.Thread.State: BLOCKED (on object monitor)
    at java.lang.Shutdown.exit(Shutdown.java:212)
    - waiting to lock <0x00070008f7c0> (a java.lang.Class for java.lang.Shutdown)
    at java.lang.Terminator$1.handle(Terminator.java:52)
    at sun.misc.Signal$1.run(Signal.java:212)
    at java.lang.Thread.run(Thread.java:744)

"SIGTERM handler" daemon prio=5 tid=0x7f8bdf82 nid=0x770b waiting for monitor entry [0x00011e207000]
   java.lang.Thread.State: BLOCKED (on object monitor)
    at java.lang.Shutdown.exit(Shutdown.java:212)
    - waiting to lock <0x00070008f7c0> (a java.lang.Class for java.lang.Shutdown)
    at java.lang.Terminator$1.handle(Terminator.java:52)
    at sun.misc.Signal$1.run(Signal.java:212)
    at java.lang.Thread.run(Thread.java:744)

"SIGTERM handler" daemon prio=5 tid=0x7f8bdc393800 nid=0x1c30f waiting for monitor entry [0x00011e104000]
   java.lang.Thread.State: BLOCKED (on object monitor)
    at java.lang.Shutdown.exit(Shutdown.java:212)
    - waiting to lock <0x00070008f7c0> (a java.lang.Class for java.lang.Shutdown)
    at java.lang.Terminator$1.handle(Terminator.java:52)
    at sun.misc.Signal$1.run(Signal.java:212)
    at java.lang.Thread.run(Thread.java:744)

"Thread-4" prio=5 tid=0x7f8bdb39f000 nid=0xa107 in Object.wait() [0x00011ea89000]
   java.lang.Thread.State: WAITING (on object monitor)
    at java.lang.Object.$$YJP$$wait(Native Method)
    at java.lang.Object.wait(Object.java)
    at java.lang.Thread.join(Thread.java:1280)
    - locked <0x000705c2f650> (a org.apache.kafka.common.utils.KafkaThread)
    at java.lang.Thread.join(Thread.java:1354)
    at org.apache.kafka.clients.producer.KafkaProducer.close(KafkaProducer.java:322)
    at

"kafka-producer-network-thread | error" daemon prio=5 tid=0x7f8bd814e000 nid=0x7403 runnable [0x00011e6c]
   java.lang.Thread.State: RUNNABLE
    at sun.nio.ch.KQueueArrayWrapper.$$YJP$$kevent0(Native Method)
    at sun.nio.ch.KQueueArrayWrapper.kevent0(KQueueArrayWrapper.java)
    at sun.nio.ch.KQueueArrayWrapper.poll(KQueueArrayWrapper.java:200)
    at sun.nio.ch.KQueueSelectorImpl.doSelect(KQueueSelectorImpl.java:103)
    at sun.nio.ch.SelectorImpl.lockAndDoSelect(SelectorImpl.java:87)
    - locked <0x000705c109f8> (a sun.nio.ch.Util$2)
    - locked
    <0x000705c109e8> (a java.util.Collections$UnmodifiableSet)
    - locked <0x000705c105c8> (a sun.nio.ch.KQueueSelectorImpl)
    at sun.nio.ch.SelectorImpl.select(SelectorImpl.java:98)
    at org.apache.kafka.common.network.Selector.select(Selector.java:322)
    at org.apache.kafka.common.network.Selector.poll(Selector.java:212)
    at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:192)
    at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:184)
    at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:128)
    at java.lang.Thread.run(Thread.java:744)
{code}
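The dump above shows the shutdown path blocking in Thread.join() inside KafkaProducer.close(). Combining this with the BoundedClose sketch shown earlier, a JVM shutdown hook can keep SIGTERM from hanging there indefinitely; this is a usage sketch, and "producer" is a placeholder for the application's own instance:

{code}
// Usage sketch: invoke the bounded close (see BoundedClose above) from a
// shutdown hook so SIGTERM cannot hang in KafkaProducer.close() forever.
Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {
    @Override
    public void run() {
        try {
            BoundedClose.closeWithTimeout(producer, 5000L);
        } catch (InterruptedException ie) {
            Thread.currentThread().interrupt();
        }
    }
}));
{code}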
[jira] [Commented] (KAFKA-1788) producer record can stay in RecordAccumulator forever if leader is not available
[ https://issues.apache.org/jira/browse/KAFKA-1788?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14249266#comment-14249266 ]

Bhavesh Mistry commented on KAFKA-1788:
---------------------------------------

[~jkreps], Can we just take a quick look at the NodeConnectionState? If all registered nodes are down, then exit quickly; otherwise attempt to connect. This keeps an accurate status of all registered nodes (maybe we can do a TCP ping for all nodes). If the producer key is fixed to only one broker, does it still have the status of all nodes?

Here is the reference code:
https://github.com/apache/kafka/blob/0.8.2/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java
https://github.com/apache/kafka/blob/0.8.2/clients/src/main/java/org/apache/kafka/clients/NodeConnectionState.java

I did this in an experimental patch for KAFKA-1642 (but used a hard-coded timeout for the join method on the I/O thread, and interrupted it if it did not get killed).

Thanks,
Bhavesh
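As an illustration of the "TCP ping" idea, a minimal reachability probe using only the JDK; the host and port values in main() are placeholders:

{code}
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

public final class TcpPing {
    // Attempts a bounded TCP connect to host:port; returns false if the
    // connection is refused, firewalled, or times out.
    public static boolean isReachable(String host, int port, int timeoutMs) {
        Socket socket = new Socket();
        try {
            socket.connect(new InetSocketAddress(host, port), timeoutMs);
            return true;
        } catch (IOException e) {
            return false;
        } finally {
            try { socket.close(); } catch (IOException ignored) { }
        }
    }

    public static void main(String[] args) {
        System.out.println("b1 reachable: " + isReachable("b1.example.com", 9092, 1000));
    }
}
{code}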
[jira] [Commented] (KAFKA-1642) [Java New Producer Kafka Trunk] CPU Usage Spike to 100% when network connection is lost
[ https://issues.apache.org/jira/browse/KAFKA-1642?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14239063#comment-14239063 ]

Bhavesh Mistry commented on KAFKA-1642:
---------------------------------------

[~stevenz3wu], 0.8.2 is very well tested and has worked well under heavy load. This bug is rare and only happens when a broker or the network has an issue. We have been producing about 7 to 10 TB per day using this new producer, so 0.8.2 is very safe to use in production. It has survived the peak traffic of the year on a large e-commerce site. So I am fairly confident that the new Java API indeed does true round-robin and is much faster than the Scala-based API.

[~ewencp], I will verify the patch by the end of this Friday, but do let me know your understanding based on my last comment. The goal is to put this issue to rest and cover all the use cases.

Thanks,
Bhavesh
[jira] [Commented] (KAFKA-1642) [Java New Producer Kafka Trunk] CPU Usage Spike to 100% when network connection is lost
[ https://issues.apache.org/jira/browse/KAFKA-1642?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14234297#comment-14234297 ]

Bhavesh Mistry commented on KAFKA-1642:
---

[~ewencp],

1) I will post toward KAFKA-1788 and perhaps link the issues.

2) True, some sort of measurement would be great: 5, 10, 25, 50, 95 and 99 percentiles of execution time. The point is just to measure the duration and report the rate of execution.

3) I agree with what you are saying, and I have observed the same behavior. My only recommendation is to add some intelligence to the *timeouts*: if, for a long period, the consecutive timeouts are zero, then there is a problem. (A little more defensive.)

4) Again, I agree with your point, but in your previous comments you had mentioned that you may consider having back-off logic further up the chain. So I was just checking whether run() is the best place to do that check. Again, maybe add intelligence here: if you get consecutive exceptions, then the likelihood of high CPU is high.

5) OK. I agree: what you are saying is that data needs to be de-queued so more data can be en-queued, even in the event of a network loss. Is my understanding correct?

6) All I am saying is that a network firewall rule (such as only 2 TCP connections per source host), or brokers running out of file descriptors, can prevent a new connection to the broker from being established while the client still has a live and active TCP connection to the same broker. But based on what I see, the *initiateConnect* method will mark the entire broker (node) status as disconnected. Is this expected behavior? So the question is: will the client continue to send data?

Thank you very much for entertaining my questions so far; I will test out the patch next week.

Thanks,
Bhavesh
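To make points 2 through 4 above concrete, the kind of defensive check being suggested might look roughly like the sketch below: time each pass of the I/O loop and back off after a streak of suspiciously fast iterations. Every name and threshold here (SpinGuardSketch, the 200 ms cutoff, the streak of 5, the 30 s cap) is hypothetical; this is not the actual Sender code.

{code}
// Hypothetical guard around an I/O loop: if each iteration returns almost
// immediately (a sign of a 0 ms poll timeout spinning the CPU), back off
// exponentially. Thresholds are made up for illustration.
public class SpinGuardSketch {
    private volatile boolean running = true;
    private long consecutiveFastRuns = 0;

    // Stand-in for Sender.run(now): one pass of poll/send work.
    private void runOnce(long now) { /* ... */ }

    public void loop() {
        while (running) {
            long start = System.currentTimeMillis();
            try {
                runOnce(start);
            } catch (Exception e) {
                e.printStackTrace(); // the real Sender would log.error(...)
            }
            long durationMs = System.currentTimeMillis() - start;
            if (durationMs < 200) {
                consecutiveFastRuns++;
                if (consecutiveFastRuns > 5) {
                    // Exponential back-off, capped at 30 seconds.
                    long backoffMs = Math.min(30000L, (long) Math.pow(2, consecutiveFastRuns - 5) * 100);
                    try {
                        Thread.sleep(backoffMs);
                    } catch (InterruptedException ie) {
                        Thread.currentThread().interrupt(); // likely producer close()
                        return;
                    }
                }
            } else {
                consecutiveFastRuns = 0; // healthy iteration, reset the streak
            }
        }
    }
}
{code}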
[jira] [Commented] (KAFKA-1788) producer record can stay in RecordAccumulator forever if leader is no available
[ https://issues.apache.org/jira/browse/KAFKA-1788?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14234595#comment-14234595 ]

Bhavesh Mistry commented on KAFKA-1788:
---

We also need to fix Producer.close(), which hangs the JVM because the ioThread.join() call does not exit. Please refer to KAFKA-1642 for more details. The Kafka core devs need to give guidance on how to solve this problem. Please see the comments below from that linked issue.

1) The Producer.close() method issue is not addressed with the patch. In the event of a lost network connection or other such events, the I/O thread will not be killed and the close method hangs. In the patch that I provided, I had a timeout for the join method and interrupted the I/O thread. I think we need a similar solution.

[~ewencp]:
1. I'm specifically trying to address the CPU usage here. I realize from your perspective they are closely related, since they can both be triggered by a loss of network connectivity, but internally they're really separate issues: the CPU usage has to do with incorrect timeouts, and the join() issue is due to the lack of timeouts on produce operations. That's why I pointed you toward KAFKA-1788. If a timeout is added for data in the producer, that would resolve the close issue as well, since any data waiting in the producer would eventually time out and the I/O thread could exit. I think that's the cleanest solution since it solves both problems with a single setting (the amount of time you're willing to wait before discarding data). If you think a separate timeout specifically for Producer.close() is worthwhile, I'd suggest filing a separate JIRA for that.


producer record can stay in RecordAccumulator forever if leader is no available
---

                Key: KAFKA-1788
                URL: https://issues.apache.org/jira/browse/KAFKA-1788
            Project: Kafka
         Issue Type: Bug
         Components: core, producer
   Affects Versions: 0.8.2
           Reporter: Jun Rao
           Assignee: Jun Rao
             Labels: newbie++
            Fix For: 0.8.3

In the new producer, when a partition has no leader for a long time (e.g., all replicas are down), the records for that partition will stay in the RecordAccumulator until the leader is available. This may cause the buffer pool to be full and the callback for the produced message to block for a long time.

--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
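As an illustration of the timed join mentioned in point 1 above (a bounded close instead of an indefinite one), the pattern is roughly the following. The field names mirror KafkaProducer, but this is a sketch, not the attached patch.

{code}
// Hypothetical bounded close: wait a limited time for the I/O thread, then
// interrupt it instead of blocking forever.
public class BoundedCloseSketch {
    private final Thread ioThread;

    public BoundedCloseSketch(Runnable ioLoop) {
        this.ioThread = new Thread(ioLoop, "kafka-producer-network-thread");
        this.ioThread.start();
    }

    public void close(long timeoutMs) throws InterruptedException {
        ioThread.join(timeoutMs);      // bounded wait instead of a bare join()
        if (ioThread.isAlive()) {
            ioThread.interrupt();      // force the blocked I/O loop to exit
            ioThread.join(1000);       // short final wait for cleanup
        }
    }
}
{code}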
[jira] [Commented] (KAFKA-1642) [Java New Producer Kafka Trunk] CPU Usage Spike to 100% when network connection is lost
[ https://issues.apache.org/jira/browse/KAFKA-1642?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14232061#comment-14232061 ]

Bhavesh Mistry commented on KAFKA-1642:
---

Hi [~ewencp],

I will not have time to validate this patch till next week. Here are my comments:

1) You still have not addressed the Producer.close() method issue: in the event of a lost network connection or other such events, the I/O thread will not be killed and the close method hangs. In the patch that I provided, I had a timeout for the join method and interrupted the I/O thread. I think we need something similar for this.

2) Also, can we please add JMX monitoring for the I/O thread to know how fast it is running? It would be great to add this, and the run() method would report its duration to a metric.

{code}
try {
    ThreadMXBean bean = ManagementFactory.getThreadMXBean();
    if (bean.isThreadCpuTimeSupported() && bean.isThreadCpuTimeEnabled()) {
        this.ioTheadCPUTime = metrics.sensor("iothread-cpu");
        this.ioTheadCPUTime.add("iothread-cpu-ms", "The Rate Of CPU Cycle used by iothead in NANOSECONDS",
                new Rate(TimeUnit.NANOSECONDS) {
                    public double measure(MetricConfig config, long now) {
                        return (now - metadata.lastUpdate()) / 1000.0;
                    }
                });
    }
} catch (Throwable th) {
    log.warn("Not able to set the CPU time...", th);
}
{code}

3) Please check the final timeout value in *pollTimeout*: if it is constantly zero, then we need to slow the I/O thread down.

4) A defensive check is needed for back-off in the run() method when the I/O thread is aggressive.

5) When all nodes are disconnected, do you still want to spin the I/O thread?

6) Suppose you have a firewall rule that says you can only have 2 concurrent TCP connections from the client to the brokers, and the client still has a live TCP connection to the same node (broker), but a new TCP connection is rejected. The node state will be marked as disconnected in initiateConnect? Are you handling that gracefully?

By the way, thank you very much for the quick reply and the new patch. I appreciate your help.

Thanks,
Bhavesh
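Complementing point 2, the I/O thread's CPU time can also be sampled from outside the client using only standard JDK management APIs, without patching the Sender. A small sketch follows; matching by thread-name prefix is an assumption based on the "kafka-producer-network-thread" name visible in the logs above.

{code}
import java.lang.management.ManagementFactory;
import java.lang.management.ThreadInfo;
import java.lang.management.ThreadMXBean;

// Sample CPU time of the producer network thread from outside the client.
public class IoThreadCpuSampler {
    public static void printIoThreadCpuNanos() {
        ThreadMXBean bean = ManagementFactory.getThreadMXBean();
        if (!bean.isThreadCpuTimeSupported()) return;
        for (long id : bean.getAllThreadIds()) {
            ThreadInfo info = bean.getThreadInfo(id);
            if (info != null && info.getThreadName().startsWith("kafka-producer-network-thread")) {
                System.out.println(info.getThreadName() + " cpu-ns=" + bean.getThreadCpuTime(id));
            }
        }
    }
}
{code}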
[jira] [Comment Edited] (KAFKA-1642) [Java New Producer Kafka Trunk] CPU Usage Spike to 100% when network connection is lost
[ https://issues.apache.org/jira/browse/KAFKA-1642?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14232061#comment-14232061 ]

Bhavesh Mistry edited comment on KAFKA-1642 at 12/2/14 8:04 PM:

Hi [~ewencp],

I will not have time to validate this patch till next week. Here are my comments:

1) The Producer.close() method issue is not addressed with the patch. In the event of a lost network connection or other such events, the I/O thread will not be killed and the close method hangs. In the patch that I provided, I had a timeout for the join method and interrupted the I/O thread. I think we need a similar solution.

2) Also, can we please add JMX monitoring for the I/O thread to know how fast it is running? It would be great to add this, and the run() method would report its duration to a metric in nanoseconds.

{code}
try {
    ThreadMXBean bean = ManagementFactory.getThreadMXBean();
    if (bean.isThreadCpuTimeSupported() && bean.isThreadCpuTimeEnabled()) {
        this.ioTheadCPUTime = metrics.sensor("iothread-cpu");
        this.ioTheadCPUTime.add("iothread-cpu-ms", "The Rate Of CPU Cycle used by iothead in NANOSECONDS",
                new Rate(TimeUnit.NANOSECONDS) {
                    public double measure(MetricConfig config, long now) {
                        return (now - metadata.lastUpdate()) / 1000.0;
                    }
                });
    }
} catch (Throwable th) {
    log.warn("Not able to set the CPU time...", th);
}
{code}

3) Please check the final timeout value in *pollTimeout*: if it is constantly zero, then we need to slow the I/O thread down.

4) A defensive check is needed for back-off in the run() method when the I/O thread is aggressive.

{code}
while (running) {
    long start = time.milliseconds();
    try {
        run(time.milliseconds());
    } catch (Exception e) {
        log.error("Uncaught error in kafka producer I/O thread: ", e);
    } finally {
        long durationInMs = time.milliseconds() - start;
        // TODO: FIX ME HERE. Do an exponential back-off sleep etc. to prevent
        // stealing CPU cycles here. How much? ...for the edge case...
        if (durationInMs < 200) {
            if (client.isAllRegistredNodesAreDown()) {
                countinuousRetry++;
                // TODO: make this a constant/configuration. When do we reset this
                // interval so we can try aggressively again?
                sleepInMs = ((long) Math.pow(2, countinuousRetry) * 500);
            } else {
                sleepInMs = 500;
                countinuousRetry = 0;
            }
            // Wait until the desired next time arrives using nanosecond
            // accuracy timer (wait(time) isn't accurate enough on most platforms)
            try {
                // TODO: sleep is not a good solution..
                Thread.sleep(sleepInMs);
            } catch (InterruptedException e) {
                log.error("While sleeping, someone interrupted this thread, probably the close() method on producer close()");
            }
        }
    }
}
{code}

5) When all nodes are disconnected, do you still want to spin the I/O thread?

6) Suppose you have a firewall rule that says you can only have 2 concurrent TCP connections from the client to the brokers, and the client still has a live TCP connection to the same node (broker), but new TCP connections are rejected. The node state will be marked as disconnected in initiateConnect? Is this case handled gracefully?

By the way, thank you very much for the quick reply and the new patch. I appreciate your help.

Thanks,
Bhavesh
[jira] [Commented] (KAFKA-1642) [Java New Producer Kafka Trunk] CPU Usage Spike to 100% when network connection is lost
[ https://issues.apache.org/jira/browse/KAFKA-1642?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14229233#comment-14229233 ]

Bhavesh Mistry commented on KAFKA-1642:
---

I just discovered yesterday that the 0.8.1.1 release does not have an officially released jar for the new producer code base (kafka-clients), although the code is there in the 0.8.1.1 branch. That created the confusion about porting to 0.8.1.1.

Thanks,
Bhavesh
[jira] [Updated] (KAFKA-1642) [Java New Producer Kafka Trunk] CPU Usage Spike to 100% when network connection is lost
[ https://issues.apache.org/jira/browse/KAFKA-1642?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Bhavesh Mistry updated KAFKA-1642:
----------------------------------
    Affects Version/s:     (was: 0.8.1.1)
[jira] [Commented] (KAFKA-1642) [Java New Producer Kafka Trunk] CPU Usage Spike to 100% when network connection is lost
[ https://issues.apache.org/jira/browse/KAFKA-1642?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14228040#comment-14228040 ]

Bhavesh Mistry commented on KAFKA-1642:
---

[~soumen.sarkar],

A timeout is one thing, but the I/O thread also needs to be safeguarded against being too aggressive, based on the network and the data to be sent, so that it does not consume so many CPU cycles.

Thanks,
Bhavesh
[jira] [Comment Edited] (KAFKA-1642) [Java New Producer Kafka Trunk] CPU Usage Spike to 100% when network connection is lost
[ https://issues.apache.org/jira/browse/KAFKA-1642?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14226751#comment-14226751 ]

Bhavesh Mistry edited comment on KAFKA-1642 at 11/26/14 8:08 PM:

[~ewencp],

Even after setting the following parameters to long values, the state of the system still gets impacted; it does not matter what reconnect.backoff.ms and retry.backoff.ms are set to. Once the node state is removed, the timeout is set to 0. Please see the following logs.

# 15 minutes
reconnect.backoff.ms=900000
retry.backoff.ms=900000

{code}
2014-11-26 11:01:27.898 Kafka Drop message topic=.rawlog org.apache.kafka.common.errors.TimeoutException: Failed to update metadata after 60000 ms.
2014-11-26 11:02:27.903 Kafka Drop message topic=.rawlog org.apache.kafka.common.errors.TimeoutException: Failed to update metadata after 60000 ms.
2014-11-26 11:03:27.903 Kafka Drop message topic=.rawlog org.apache.kafka.common.errors.TimeoutException: Failed to update metadata after 60000 ms.
[... the same TimeoutException is logged once a minute through 2014-11-26 11:15:27.914 ...]

2014-11-26 11:00:27.613 [kafka-producer-network-thread | heartbeat] ERROR org.apache.kafka.clients.producer.internals.Sender - Uncaught error in kafka producer I/O thread:
2014-11-26 11:00:27.613 [kafka-producer-network-thread | rawlog] ERROR org.apache.kafka.clients.producer.internals.Sender - Uncaught error in kafka producer I/O thread:
java.lang.IllegalStateException: No entry found for node -1
        at org.apache.kafka.clients.ClusterConnectionStates.nodeState(ClusterConnectionStates.java:131)
        at org.apache.kafka.clients.ClusterConnectionStates.disconnected(ClusterConnectionStates.java:120)
        at org.apache.kafka.clients.NetworkClient.initiateConnect(NetworkClient.java:407)
        at org.apache.kafka.clients.NetworkClient.maybeUpdateMetadata(NetworkClient.java:393)
        at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:187)
        at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:184)
        at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:115)
        at java.lang.Thread.run(Thread.java:744)
java.lang.IllegalStateException: No entry found for node -3
        at org.apache.kafka.clients.ClusterConnectionStates.nodeState(ClusterConnectionStates.java:131)
        at org.apache.kafka.clients.ClusterConnectionStates.disconnected(ClusterConnectionStates.java:120)
        at org.apache.kafka.clients.NetworkClient.initiateConnect(NetworkClient.java:407)
        at org.apache.kafka.clients.NetworkClient.maybeUpdateMetadata(NetworkClient.java:393)
        at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:187)
        at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:184)
        at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:115)
        at java.lang.Thread.run(Thread.java:744)
2014-11-26 11:00:27.613 [kafka-producer-network-thread | heartbeat] ERROR org.apache.kafka.clients.producer.internals.Sender - Uncaught error in kafka producer I/O thread:
2014-11-26 11:00:27.613 [kafka-producer-network-thread | error] ERROR [log truncated]
{code}
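For reference, the backoff settings quoted above are ordinary producer properties. A minimal construction of that configuration is sketched below; the broker list is a placeholder, and 900000 ms restores the "15 minutes" noted in the comment.

{code}
import java.util.Properties;

// The backoff settings quoted above, as producer properties.
public class BackoffConfigSketch {
    public static Properties producerConfig() {
        Properties props = new Properties();
        props.put("bootstrap.servers", "broker1:9092,broker2:9092"); // placeholder
        props.put("reconnect.backoff.ms", "900000"); // 15 minutes
        props.put("retry.backoff.ms", "900000");     // 15 minutes
        return props; // pass to new KafkaProducer(props)
    }
}
{code}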
Re: [DISCUSSION] adding the serializer api back to the new java producer
How will a mixed bag work on the Consumer side? The entire site cannot be rolled at once, so the Consumer will have to deal with both new and old serialized bytes? This could be the app team's responsibility.

Are you targeting the 0.8.2 release? That may break customers who are already using the new producer API (beta version).

Thanks,
Bhavesh

On Tue, Nov 25, 2014 at 8:29 AM, Manikumar Reddy ku...@nmsworks.co.in wrote:

+1 for this change. What about the de-serializer class in 0.8.2? Say I am using the new producer with Avro and the old consumer combination; then I need to give a custom Decoder implementation for Avro, right?

On Tue, Nov 25, 2014 at 9:19 PM, Joe Stein joe.st...@stealth.ly wrote:

The serializer is an expected use of the producer/consumer now, and I think we should continue that support in the new client. As far as breaking the API, that is why we released the 0.8.2-beta: to help get through just these types of blocking issues in a way that the community at large could be involved in more easily, with a build/binaries to download and use from maven also. +1 on the change now, prior to the 0.8.2 release.

- Joe Stein

On Mon, Nov 24, 2014 at 11:43 PM, Sriram Subramanian srsubraman...@linkedin.com.invalid wrote:

Looked at the patch. +1 from me.

On 11/24/14 8:29 PM, Gwen Shapira gshap...@cloudera.com wrote:

As one of the people who spent too much time building Avro repositories, +1 on bringing the serializer API back. I think it will make the new producer easier to work with.

Gwen

On Mon, Nov 24, 2014 at 6:13 PM, Jay Kreps jay.kr...@gmail.com wrote:

This is admittedly late in the release cycle to make a change. To add to Jun's description, the motivation was that we felt it would be better to change that interface now rather than after the release if it needed to change.

The motivation for wanting to make a change was the ability to really be able to develop support for Avro and other serialization formats. The current status is pretty scattered: there is a schema repository on an Avro JIRA and another fork of that on github, and a bunch of people we have talked to have done similar things for other serialization systems. It would be nice if these things could be packaged in such a way that it was possible to just change a few configs in the producer and get rich metadata support for messages.

As we were thinking this through, we realized that the new API we were about to introduce was kind of incompatible with this, since it was just byte[] oriented. You can always do this by adding some kind of wrapper API that wraps the producer, but this puts us back in the position of trying to document and support multiple interfaces.

This also opens up the possibility of adding a MessageValidator or MessageInterceptor plug-in transparently, so that you can do other custom validation on the messages you are sending, which obviously requires access to the original object, not the byte array. This API doesn't prevent using byte[]: by configuring the ByteArraySerializer it works as it currently does.

-Jay

On Mon, Nov 24, 2014 at 5:58 PM, Jun Rao jun...@gmail.com wrote:

Hi, Everyone,

I'd like to start a discussion on whether it makes sense to add the serializer API back to the new Java producer. Currently, the new Java producer takes a byte array for both the key and the value. While this API is simple, it pushes the serialization logic into the application. This makes it hard to reason about what type of data is being sent to Kafka and also makes it hard to share an implementation of the serializer.
For example, to support Avro, the serialization logic could be quite involved, since it might need to register the Avro schema in some remote registry and maintain a schema cache locally, etc. Without a serialization API, it's impossible to share such an implementation so that people can easily reuse it. We sort of overlooked this implication during the initial discussion of the producer API.

So, I'd like to propose an API change to the new producer by adding back the serializer API similar to what we had in the old producer. Specifically, the proposed API changes are the following.

First, we change KafkaProducer to take generic types K and V for the key and the value, respectively.

public class KafkaProducer<K,V> implements Producer<K,V> {

    public Future<RecordMetadata> send(ProducerRecord<K,V> record, Callback callback);

    public Future<RecordMetadata> send(ProducerRecord<K,V> record);
}

Second, we add two new configs, one for the key serializer and another for the value serializer. Both
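To make the proposal concrete, usage would look roughly like the sketch below once the generic producer and the two serializer configs exist. The config keys and the StringSerializer class shown here match what 0.8.2 ultimately shipped, but at the time of this thread they were still proposals; the broker address and topic are placeholders.

{code}
import java.util.Properties;
import java.util.concurrent.Future;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

// Rough usage of the proposed generic producer with pluggable serializers.
public class SerializerApiSketch {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put("bootstrap.servers", "broker1:9092"); // placeholder
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        Producer<String, String> producer = new KafkaProducer<String, String>(props);
        Future<RecordMetadata> result =
                producer.send(new ProducerRecord<String, String>("my-topic", "key", "value"));
        result.get(); // block until the send is acknowledged
        producer.close();
    }
}
{code}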
[jira] [Comment Edited] (KAFKA-1642) [Java New Producer Kafka Trunk] CPU Usage Spike to 100% when network connection is lost
[ https://issues.apache.org/jira/browse/KAFKA-1642?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14223161#comment-14223161 ]

Bhavesh Mistry edited comment on KAFKA-1642 at 11/24/14 5:26 PM:

[~ewencp],

The way to reproduce this is to simulate network instability by turning the network service on and off (or connecting/disconnecting the physical cable). Connect and see if it recovers, then disconnect and connect again, etc. You will see the behavior again and again.

The issue is also with the connection state management:

{code}
private void initiateConnect(Node node, long now) {
    try {
        log.debug("Initiating connection to node {} at {}:{}.", node.id(), node.host(), node.port());
        // TODO FIX java.lang.IllegalStateException: No entry found for node -3 (we need put before remove it).
        // The next line has a problem: it will lose the previous last attempt made, get the above exception,
        // and then try to connect to that node forever and ever with the exception.
        this.connectionStates.connecting(node.id(), now);
        selector.connect(node.id(), new InetSocketAddress(node.host(), node.port()), this.socketSendBuffer, this.socketReceiveBuffer);
    } catch (IOException e) {
        /* attempt failed, we'll try again after the backoff */
        connectionStates.disconnectedWhenConnectting(node.id());
        /* maybe the problem is our metadata, update it */
        metadata.requestUpdate();
        log.debug("Error connecting to node {} at {}:{}:", node.id(), node.host(), node.port(), e);
    }
}
{code}

In my opinion, regardless of what the node status is, the run() method needs to be safeguarded from stealing CPU cycles when there is no state for the node. (Hence I have added an exponential sleep as a temporary solution so as not to steal CPU cycles; I think we must protect it somehow and check the execution time.)

Please let me know if you need more info, and I will be more than happy to reproduce the bug; we can have a conference call and I can show you the problem.

Based on a code diff I have done between the 0.8.1.1 tag and this, I think this issue also occurs in 0.8.1.1.

Thanks,
Bhavesh
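The "we need put before remove it" remark in the code above amounts to making the state map tolerant of a missing entry. A toy illustration of that guard follows; the class echoes ClusterConnectionStates in spirit only and is not the project's actual fix.

{code}
import java.util.HashMap;
import java.util.Map;

// Toy version of the suggested guard: always create a state entry when a
// connection attempt starts, and never throw if the entry is missing when
// marking a node disconnected.
public class ConnectionStatesSketch {
    enum State { CONNECTING, CONNECTED, DISCONNECTED }

    private final Map<Integer, State> states = new HashMap<Integer, State>();

    public void connecting(int nodeId) {
        states.put(nodeId, State.CONNECTING); // put before anything can remove it
    }

    public void disconnected(int nodeId) {
        // Tolerate a missing entry instead of IllegalStateException.
        states.put(nodeId, State.DISCONNECTED);
    }
}
{code}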
[jira] [Commented] (KAFKA-1642) [Java New Producer Kafka Trunk] CPU Usage Spike to 100% when network connection is lost
[ https://issues.apache.org/jira/browse/KAFKA-1642?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14223532#comment-14223532 ]

Bhavesh Mistry commented on KAFKA-1642:
---

Also, regarding the KafkaProducer.close() method: it hangs forever because of the following loop.

{code}
Sender.java

// okay we stopped accepting requests but there may still be
// requests in the accumulator or waiting for acknowledgment,
// wait until these are completed.
while (this.accumulator.hasUnsent() || this.client.inFlightRequestCount() > 0) {
    try {
        run(time.milliseconds());
    } catch (Exception e) {
        log.error("Uncaught error in kafka producer I/O thread: ", e);
    }
}

KafkaProducer.java

/**
 * Close this producer. This method blocks until all in-flight requests complete.
 */
@Override
public void close() {
    log.trace("Closing the Kafka producer.");
    this.sender.initiateClose();
    try {
        this.ioThread.join();  // THIS IS BLOCKED since ioThread does not give up.
    } catch (InterruptedException e) {
        throw new KafkaException(e);
    }
    this.metrics.close();
    log.debug("The Kafka producer has closed.");
}
{code}

The issue described in KAFKA-1788 is a likely cause, but if you look at the close call stack, the calling thread that initiated close() will hang until the I/O thread dies (and it never dies when there is queued data and the network is down).

Thanks,
Bhavesh
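Until the client itself offers a timeout-aware close, one application-side workaround is to bound close() with a helper thread, as sketched below. The timeout and the decision to abandon the close are application policy, and abandoning it leaks the producer's I/O thread, so this is a mitigation rather than a fix.

{code}
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Application-side workaround for a close() that can block forever: run it on
// a helper thread and give up after a deadline.
public class TimedCloseSketch {
    public static void closeWithTimeout(final Runnable closeCall, long timeoutMs) {
        ExecutorService es = Executors.newSingleThreadExecutor();
        Future<?> f = es.submit(closeCall);
        try {
            f.get(timeoutMs, TimeUnit.MILLISECONDS);
        } catch (TimeoutException te) {
            f.cancel(true); // interrupt the thread stuck in close()
        } catch (Exception e) {
            throw new RuntimeException(e);
        } finally {
            es.shutdownNow();
        }
    }
}
{code}

A caller would wrap the producer, e.g. closeWithTimeout(new Runnable() { public void run() { producer.close(); } }, 30000);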
[jira] [Commented] (KAFKA-1642) [Java New Producer Kafka Trunk] CPU Usage Spike to 100% when network connection is lost
[ https://issues.apache.org/jira/browse/KAFKA-1642?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14223571#comment-14223571 ] Bhavesh Mistry commented on KAFKA-1642: --- Here are the exact steps to reproduce the bug (you must have a daemon program continuously producing):
1) Start with the happy situation where all brokers are up and everything is running fine. Verify with top -pid JAVA_PID and YourKit that the Kafka network threads are taking less than 4% CPU.
2) Shut down the network (turn off the network or pull the eth0 cable) and wait a while: you will see CPU spike to 325% under top (if you have 4 producers), and YourKit shows 25% CPU consumption for each Kafka I/O thread.
3) Reconnect the network (the spike will still be there, but CPU comes down to 100% or so after a while) and remain connected for a while.
4) Simulate network failure again (to simulate network instability): repeat steps 1 to 4 but wait 10 or so minutes in between, and you will see the trend of CPU spikes along with the above exception: java.lang.IllegalStateException: No entry found for node -2
Also, I see that Kafka logs excessively when the network is down (YourKit shows it taking more CPU cycles compared to normal). Thanks, Bhavesh
[jira] [Commented] (KAFKA-1642) [Java New Producer Kafka Trunk] CPU Usage Spike to 100% when network connection is lost
[ https://issues.apache.org/jira/browse/KAFKA-1642?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14223626#comment-14223626 ] Bhavesh Mistry commented on KAFKA-1642: --- Also, there is an issue in my experimental patch: I did not update lastConnectAttemptMs in the connecting() state method, which is needed to solve the IllegalStateException issue:
{code}
/**
 * Enter the connecting state for the given node.
 * @param node The id of the node we are connecting to
 * @param now The current time.
 */
public void connecting(int node, long now) {
    NodeConnectionState nodeConn = nodeState.get(node);
    if (nodeConn == null) {
        nodeState.put(node, new NodeConnectionState(ConnectionState.CONNECTING, now));
    } else {
        nodeConn.state = ConnectionState.CONNECTING;
        nodeConn.lastConnectAttemptMs = now; // capture and update the last connection attempt
    }
}
{code}
[jira] [Updated] (KAFKA-1642) [Java New Producer Kafka Trunk] CPU Usage Spike to 100% when network connection is lost
[ https://issues.apache.org/jira/browse/KAFKA-1642?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Bhavesh Mistry updated KAFKA-1642: -- Affects Version/s: 0.8.1.1
[jira] [Commented] (KAFKA-1642) [Java New Producer Kafka Trunk] CPU Usage Spike to 100% when network connection is lost
[ https://issues.apache.org/jira/browse/KAFKA-1642?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14223779#comment-14223779 ] Bhavesh Mistry commented on KAFKA-1642: --- [~ewencp], Thanks for looking into this; I really appreciate your response. Also, do you think the rapid connect and disconnect is also due to incorrect node state management (the connecting() method and initiateConnect() as well)? Also, can we take a defensive-coding approach and add protection in this tight infinite loop to throttle CPU cycles if an iteration's start-to-end duration falls below some xx ms? That would actually prevent this issue. We had this issue in production, so I just wanted to highlight the impact of 325% CPU and excessive logging. Thanks, Bhavesh
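A minimal sketch of the throttle being proposed here (the wrapper class, the MIN_ITERATION_MS value, and the runOnce() stand-in are assumptions for illustration, not the actual Sender code):
{code}
// Back-off wrapper around a Sender-style event loop: if an iteration
// returns almost immediately (nothing to send, no broker reachable),
// sleep off the remainder so the thread cannot spin at 100% CPU.
public class ThrottledLoop {
    private static final long MIN_ITERATION_MS = 50; // assumed config value
    private volatile boolean running = true;

    public void loop() {
        while (running) {
            long start = System.currentTimeMillis();
            runOnce();
            long elapsed = System.currentTimeMillis() - start;
            if (elapsed < MIN_ITERATION_MS) {
                try {
                    Thread.sleep(MIN_ITERATION_MS - elapsed);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    return;
                }
            }
        }
    }

    private void runOnce() {
        // Stand-in for Sender.run(now): poll the network client once.
    }

    public void shutdown() {
        running = false;
    }
}
{code}
The trade-off is a small latency floor when work arrives in bursts, which is why a real fix would only back off when the iteration did no useful work.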
[jira] [Commented] (KAFKA-1642) [Java New Producer Kafka Trunk] CPU Usage Spike to 100% when network connection is lost
[ https://issues.apache.org/jira/browse/KAFKA-1642?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14224039#comment-14224039 ] Bhavesh Mistry commented on KAFKA-1642: --- Here are some more cases that reproduce this, simulating a network connection issue with only one of the brokers (the problem still persists):
Case 1: a broker's connection is down (note that according to ZK, the leader for the partition is still with b1). Have three brokers: b1, b2, b3.
1) Start your daemon program, keep sending data to all the brokers, and continue sending some data.
2) Observe that you have data via netstat -a | grep 'b1|b2|b3' (keep pumping data for 5 minutes and observe normal behavior using top -pid or top -p JAVA_PID).
3) Simulate a problem establishing new TCP connections via the following, while the Java program continues to pump data aggressively (please note the existing TCP connection to b1 is still active and connected):
a) sudo vi /etc/hosts and add the entry 127.0.0.1 b1
b) /etc/init.d/network restart; after a while (5 to 7 minutes) you will see the issue, but keep pumping data. Also repeat this for b2, and there will be even more CPU consumption.
4) Under heavy data pumping, the producer will now try to establish a new TCP connection to b1 and will get connection refused. (Note that CPU spikes up again and remains in that state, just because the connection could not be established.)
Case 2: Simulate a firewall rule such that only a limited number of TCP connections (e.g. 4) are allowed to each broker.
Do steps 1, 2 and 3 above.
4) Use an iptables rule to reject traffic. To start an enforcing firewall: iptables -A OUTPUT -p tcp -m tcp -d b1 --dport 9092 -j REJECT
5) Keep pumping data while iptables rejects (you will see CPU spike to 200% or more depending on the # of producers). To recover: iptables -D OUTPUT -p tcp -m tcp -d b1 --dport 9092 -j REJECT
[jira] [Commented] (KAFKA-1642) [Java New Producer Kafka Trunk] CPU Usage Spike to 100% when network connection is lost
[ https://issues.apache.org/jira/browse/KAFKA-1642?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14224041#comment-14224041 ] Bhavesh Mistry commented on KAFKA-1642: --- [~ewencp], I hope the above gives you comprehensive steps to reproduce the problems with the run() method. It would be really great if we could make the client more resilient and robust, so that network and broker instability does not cause CPU spikes and degrade application performance. Hence, I would strongly suggest we at least measure the time run(time) takes, gather some stats, and, based on some configuration, do CPU throttling (if needed) just to be more defensive, or at least detect that the I/O thread is consuming CPU cycles. By the way, the experimental patch still holds up for the steps described above as well, due to its hard-coded back-off. Any time you have a patch or anything, please let me know and I will test it out (you have my email id). Once again, thanks for your detailed analysis and for looking at this on short notice.
Please look into ClusterConnectionStates and how it manages the state of a node when disconnecting immediately. Please look into connecting(int node, long now) and this (I feel connecting() needs to come before, not after):
{code}
selector.connect(node.id(), new InetSocketAddress(node.host(), node.port()), this.socketSendBuffer, this.socketReceiveBuffer);
this.connectionStates.connecting(node.id(), now);
{code}
Also, I still feel that producer.close() needs to be looked at (give the join() a configurable timeout so the thread does not hang; a sketch follows below). Thanks, Bhavesh
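A rough sketch of that bounded close(): the field names mirror the KafkaProducer code quoted earlier, but CLOSE_TIMEOUT_MS and the interrupt fallback are assumptions, not the actual implementation:
{code}
@Override
public void close() {
    log.trace("Closing the Kafka producer.");
    this.sender.initiateClose();
    try {
        // Bound the join so a spinning I/O thread cannot hang the caller.
        this.ioThread.join(CLOSE_TIMEOUT_MS); // CLOSE_TIMEOUT_MS is an assumed config value
        if (this.ioThread.isAlive()) {
            // Timed out: interrupt as a last resort instead of waiting forever.
            this.ioThread.interrupt();
        }
    } catch (InterruptedException e) {
        throw new KafkaException(e);
    }
    this.metrics.close();
    log.debug("The Kafka producer has closed.");
}
{code}
Interrupting the I/O thread can drop queued records, so a real version would likely pair the timeout with a forced drain or at least a warning log.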
[jira] [Commented] (KAFKA-1642) [Java New Producer Kafka Trunk] CPU Usage Spike to 100% when network connection is lost
[ https://issues.apache.org/jira/browse/KAFKA-1642?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14224046#comment-14224046 ] Bhavesh Mistry commented on KAFKA-1642: --- Also, are you going to port the patch back to the 0.8.1.1 version as well? Please let me know. Thanks, Bhavesh
[jira] [Comment Edited] (KAFKA-1642) [Java New Producer Kafka Trunk] CPU Usage Spike to 100% when network connection is lost
[ https://issues.apache.org/jira/browse/KAFKA-1642?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14222571#comment-14222571 ] Bhavesh Mistry edited comment on KAFKA-1642 at 11/24/14 1:31 AM: --- The patch provided does not solve the problem. When you have one or more producer instances, the effect amplifies. org.apache.kafka.clients.producer.internals.Sender.run() takes 100% CPU due to an infinite loop when there are no brokers (no work to be done to dump data). Thanks, Bhavesh
[jira] [Reopened] (KAFKA-1642) [Java New Producer Kafka Trunk] CPU Usage Spike to 100% when network connection is lost
[ https://issues.apache.org/jira/browse/KAFKA-1642?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Bhavesh Mistry reopened KAFKA-1642: --- The patch provided does not solve the problem. When you have more than one producer instance, the effect amplifies. org.apache.kafka.clients.producer.internals.Sender.run() takes 100% CPU due to an infinite loop when there are no brokers. Thanks, Bhavesh
[jira] [Updated] (KAFKA-1642) [Java New Producer Kafka Trunk] CPU Usage Spike to 100% when network connection is lost
[ https://issues.apache.org/jira/browse/KAFKA-1642?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Bhavesh Mistry updated KAFKA-1642: --- Attachment: 0001-Initial-CPU-Hish-Usage-by-Kafka-FIX-and-Also-fix-CLO.patch
Please take a look at the experimental patch, which solves this problem by capturing the correct node state and also, not so elegantly, by exponentially backing off the run() method via sleeping (many of the values are hard-coded, but it is just experimental). Also, there is another problem: the close() method on the producer does not exit and the JVM does not shut down gracefully, because the I/O thread keeps spinning in the while loop during a network outage. This is another edge case. I hope this is helpful and solves the problem. Thanks, Bhavesh
Re: Welcome Kafka's newest committer
Congratulations, Guozhang!! Thanks for all your help and quick responses. Thanks, Bhavesh
On Thu, Nov 20, 2014 at 8:23 AM, Ashish Singh asi...@cloudera.com wrote:
Congratulations Guozhang!
On Thu, Nov 20, 2014 at 6:09 AM, Jarek Jarcec Cecho jar...@apache.org wrote:
Congratulations Guozhang, well deserved! Jarcec
On Nov 19, 2014, at 4:05 PM, Neha Narkhede neha.narkh...@gmail.com wrote:
Hi everyone, I'm very happy to announce that the Kafka PMC has invited Guozhang Wang to become a committer. Guozhang has made significant contributions to Kafka over the past year, along with being very active on code reviews and the mailing list. Please join me in welcoming him. Thanks, Neha (on behalf of the Kafka PMC)
-- Regards, Ashish
Re: How to recover from ConsumerRebalanceFailedException ?
Hi Kafka team, So we just monitor the ZkClient-EventThread threads via ThreadInfo[] threads = ManagementFactory.getThreadMXBean()..., and if this ZkClient-EventThread dies, we restart the sources (a sketch of this check follows below). Is there an alternative approach, or a life-cycle hook that an API consumer can attach to the consumer life cycle, so that we are notified it is dying and can take some action? Thanks, Bhavesh
On Mon, Nov 17, 2014 at 2:30 PM, Bhavesh Mistry mistry.p.bhav...@gmail.com wrote:
Hi Kafka Team, I get the following exception intermittently due to ZK/network issues. How do I recover *programmatically* from the consumer thread dying and restart the source? We have alerts that, due to this error, partition OWNERSHIP is *none*. Please let me know how to detect that the consumer thread died and restart the source.
kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener.syncedRebalance(ZookeeperConsumerConnector.scala:591) at kafka.consumer.ZookeeperConsumerConnector$ZKSessionExpireListener.handleNewSession(ZookeeperConsumerConnector.scala:481) at org.I0Itec.zkclient.ZkClient$4.run(ZkClient.java:472) at org.I0Itec.zkclient.ZkEventThread.run(ZkEventThread.java:71) Caused by: org.apache.zookeeper.KeeperException$NoNodeException: KeeperErrorCode = NoNode for /consumers/mupd_statsd_logmon_metric_events18/ids/mupd_statsd_logmon_metric_events18_sdc-stg-logsearch-app1-1416023311208-14501d68 at org.apache.zookeeper.KeeperException.create(KeeperException.java:102) at org.apache.zookeeper.KeeperException.create(KeeperException.java:42) at org.apache.zookeeper.ZooKeeper.getData(ZooKeeper.java:921) at org.apache.zookeeper.ZooKeeper.getData(ZooKeeper.java:950) at org.I0Itec.zkclient.ZkConnection.readData(ZkConnection.java:103) at org.I0Itec.zkclient.ZkClient$9.call(ZkClient.java:770) at org.I0Itec.zkclient.ZkClient$9.call(ZkClient.java:766) at org.I0Itec.zkclient.ZkClient.retryUntilConnected
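A minimal watchdog sketch of the monitoring approach described above. It assumes the "ZkClient-EventThread" name prefix seen in the logs; the class name and the restartConsumer callback are purely illustrative, not part of the Kafka API:

    import java.lang.management.ManagementFactory;
    import java.lang.management.ThreadInfo;
    import java.util.concurrent.Executors;
    import java.util.concurrent.ScheduledExecutorService;
    import java.util.concurrent.TimeUnit;

    public final class ConsumerWatchdog {

        // Prefix of the ZkClient event thread that should stay alive while the
        // high-level consumer is healthy (matches the thread name in the logs above).
        private static final String EVENT_THREAD_PREFIX = "ZkClient-EventThread";

        /** Polls the JVM's threads and invokes restartConsumer if the event thread is gone. */
        public static ScheduledExecutorService start(final Runnable restartConsumer) {
            ScheduledExecutorService exec = Executors.newSingleThreadScheduledExecutor();
            exec.scheduleAtFixedRate(new Runnable() {
                @Override
                public void run() {
                    ThreadInfo[] threads =
                            ManagementFactory.getThreadMXBean().dumpAllThreads(false, false);
                    for (ThreadInfo t : threads) {
                        if (t != null && t.getThreadName().startsWith(EVENT_THREAD_PREFIX)) {
                            return; // event thread still alive, nothing to do
                        }
                    }
                    // Event thread died (e.g. after ConsumerRebalanceFailedException):
                    // shut down and recreate the consumer connector.
                    restartConsumer.run();
                }
            }, 30, 30, TimeUnit.SECONDS);
            return exec;
        }
    }

Here restartConsumer would typically call shutdown() on the existing connector and build a new one; as the question above implies, the old high-level consumer offered no death notification of its own.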
Enforcing Network Bandwidth Quota with New Java Producer
Hi Kafka Team, We would like to enforce a network bandwidth quota (bytes per minute) on the producer side. How can I do this? I need some way to count compressed bytes on the producer, and as far as I know the send callback does not give this ability. Let me know the best way. Thanks, Bhavesh
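As far as this thread shows, the producer had no built-in quota at the time, but its metrics report post-compression byte rates, which is one way to approximate this. A minimal sketch, assuming the "outgoing-byte-rate" metric name (metric names and the Metric.value() accessor vary across client versions; inspect producer.metrics() on yours) and an illustrative quota value:

    import java.util.Map;
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.common.Metric;
    import org.apache.kafka.common.MetricName;

    public final class ProducerByteQuota {

        // Quota expressed as bytes/second so it compares directly with the rate
        // metric; e.g. 100 MB per minute ~= 1.75 MB per second (illustrative value).
        private static final double MAX_BYTES_PER_SECOND = 100.0 * 1024 * 1024 / 60;

        /** Returns true if the producer's outgoing (compressed) byte rate exceeds the quota. */
        public static boolean overQuota(KafkaProducer<?, ?> producer) {
            for (Map.Entry<MetricName, ? extends Metric> e : producer.metrics().entrySet()) {
                // "outgoing-byte-rate" counts bytes actually written to the sockets,
                // i.e. after compression.
                if ("outgoing-byte-rate".equals(e.getKey().name())) {
                    return e.getValue().value() > MAX_BYTES_PER_SECOND;
                }
            }
            return false; // metric not present on this client version
        }
    }

The caller would check overQuota() before send() and back off or drop when it returns true; strict per-minute accounting would need your own windowing on top of the rate metric.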
Re: Announcing Confluent
HI Guys, Thanks for your awesome support. I wish you good luck!! Thanks for open-sourcing Kafka!! Thanks, Bhavesh On Thu, Nov 6, 2014 at 10:52 AM, Rajasekar Elango rela...@salesforce.com wrote: Congrats. Wish you all the very best and success. Thanks, Raja. On Thu, Nov 6, 2014 at 1:36 PM, Niek Sanders niek.sand...@gmail.com wrote: Congrats! On Thu, Nov 6, 2014 at 10:28 AM, Jay Kreps jay.kr...@gmail.com wrote: Hey all, I’m happy to announce that Jun Rao, Neha Narkhede and I are creating a company around Kafka called Confluent. We are planning on productizing the kind of Kafka-based real-time data platform we built out at LinkedIn. We are doing this because we think this is a really powerful idea and we felt there was a lot to do to make this idea really take root. We wanted to make that our full-time mission and focus. There is a blog post that goes into a little more depth here: http://blog.confluent.io/ LinkedIn will remain a heavy Kafka user and contributor. Combined with our additional resources from the funding of the company, this should be a really good thing for the Kafka development effort, especially when combined with the increasing contributions from the rest of the development community. This is great news, as there is a lot of work to do. We'll need to really focus on scaling this distributed development in a healthy way. One thing I do want to emphasize is that the addition of a company in the Kafka ecosystem won’t mean meddling with open source. Kafka will remain 100% open source and community focused, as of course is true of any Apache project. I have been doing open source for a long time and strongly believe it is the right model for infrastructure software development. Confluent is just getting off the ground now. We left LinkedIn, raised some money, and we have an office (but no furniture yet!). Nonetheless, if you are interested in finding out more about the company and either getting help with your Kafka usage or joining us to help build all this, by all means reach out to us, we’d love to talk. Wish us luck! -Jay -- Thanks, Raja.
Re: [Java New Producer] Changing Partition number and its Impact
Hi Jay or Kafka Dev Team, Any suggestions on how I can deal with this situation of expanding partitions for the New Java Producer, for scalability on the consumer side? Thanks, Bhavesh

On Tue, Nov 4, 2014 at 7:08 PM, Bhavesh Mistry mistry.p.bhav...@gmail.com wrote: Also, to add to this: the old (Scala-based) producer is not impacted by partition changes. So an important scalability feature is taken away by the New Java Producer unless you plan for expansion from the beginning. Thanks, Bhavesh

On Tue, Nov 4, 2014 at 4:56 PM, Bhavesh Mistry mistry.p.bhav...@gmail.com wrote: HI Jay, The fundamental problem is that the batch size is already configured and producers are running in production with that configuration (the previous values were just a sample). How do we increase partitions for a topic when the batches would exceed the configured buffer limit? Yes, had we planned for a smaller batch size we could do this, but we cannot once producers are already running. Have you faced this problem at LinkedIn or anywhere else? Thanks, Bhavesh

On Tue, Nov 4, 2014 at 4:25 PM, Jay Kreps jay.kr...@gmail.com wrote: Hey Bhavesh, No, there isn't such a setting. But what I am saying is that I don't think you really need that feature. I think instead you can use a 32k batch size with your 64M memory limit. This should mean you can have up to 2048 batches in flight. Assuming one batch in flight and one being added to at any given time, this should work well for up to ~1000 partitions, rather than trying to do anything dynamic. So, assuming each producer sends to just one topic, you would be fine as long as that topic had fewer than 1000 partitions. If you wanted to add more you would need to add memory on the producers. -Jay

On Tue, Nov 4, 2014 at 4:04 PM, Bhavesh Mistry mistry.p.bhav...@gmail.com wrote: Hi Jay, I agree with and understood what you mentioned in the previous email. But when you have 5000+ producers running in the cloud (I am sure LinkedIn has many more, and needs to increase partitions for scalability), then all the running producers will stop sending data. So is there any feature or setting that makes sense to shrink the batch size to fit the increase? I am sure others will face the same issue. Had I configured block.on.buffer.full=true it would be even worse and would block application threads. Our use case is that the *logger.log(msg)* method cannot block, which is why we have that configuration set to false. So I am sure others will run into these same issues. Trying to find the optimal solution and recommendation from the Kafka Dev team for this particular use case (which may become common). Thanks, Bhavesh

On Tue, Nov 4, 2014 at 3:12 PM, Jay Kreps jay.kr...@gmail.com wrote: Hey Bhavesh, Here is what your configuration means:
buffer.memory=64MB # don't use more than 64MB of memory
batch.size=1MB # allocate a 1MB buffer for each partition with data
block.on.buffer.full=false # immediately throw an exception if there is not enough memory to create a new buffer
Not sure what linger time you have set. So what you see makes sense. If you have 1MB buffers and 32 partitions, you will have approximately 32MB of memory in use (actually a bit more than this, since one buffer will be filling while another is sending). If you have 128 partitions, you will try to use 128MB, and since you have configured the producer to fail when you reach 64MB (rather than waiting for memory to become available), that is what happens. I suspect you want a smaller batch size; more than 64k is usually not going to help throughput. -Jay

On Tue, Nov 4, 2014 at 11:39 AM, Bhavesh Mistry mistry.p.bhav...@gmail.com wrote: Hi Kafka Dev, With the new producer, when we change the number of partitions for a topic, we face a BufferExhaustedException. For example, we have set 64MiB buffer memory, 32 partitions, and a 1MiB batch size. When we increase the partitions to 128, it throws BufferExhaustedException right away (non-key-based messages), since the buffer is allocated based on batch.size. There is a real need to auto-calculate the batch size when partitions increase, because we have about ~5000 boxes and it is not practical to redeploy code to all machines just to expand partitions for scalability. What options are available while the new producer is running, when partitions need to increase and there is not enough buffer to allocate batches for the additional partitions?
buffer.memory=64MiB
batch.size=1MiB
block.on.buffer.full=false
Thanks, Bhavesh
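To make the sizing rule in Jay's replies concrete, here is a minimal configuration sketch using the numbers discussed in this thread (the broker address and serializers are placeholder choices, not part of the discussion):

    import java.util.Properties;
    import org.apache.kafka.clients.producer.KafkaProducer;

    public final class SizedProducer {
        public static KafkaProducer<byte[], byte[]> create() {
            Properties props = new Properties();
            props.put("bootstrap.servers", "broker1:9092"); // placeholder host
            props.put("key.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
            props.put("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
            props.put("buffer.memory", "67108864"); // 64MB total pool
            props.put("batch.size", "32768");       // 32k per-partition batch
            // 64MB / 32k = 2048 batches. With roughly two live batches per
            // partition (one filling, one in flight), this leaves headroom for
            // ~1000 partitions before BufferExhaustedException becomes possible,
            // versus only ~32 partitions with 1MB batches in the same pool.
            return new KafkaProducer<byte[], byte[]>(props);
        }
    }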
Re: [Java New Producer] Changing Partition number and its Impact
HI Jay, Thanks for the response. I feel this needs to be documented as a limitation of the New Java Producer: the batch size vs. buffer size impact when increasing partitions. I agree that you get fine-grained control (which is great), but we ultimately lose the ability to increase partitions for scalability, which I think is the greater feature, without impacting producers running live in production. I would argue from the customer's perspective for a flag called *auto.update.batch.size* that respects the buffer size and recalculates the batch size when a partition increase is detected, so the running code does not throw this exception or block application threads waiting for memory they will never get (this is just my suggestion). Do you agree? Should I file a Jira for this? I am sure others will run into this problem sooner or later. Thanks, Bhavesh

On Wed, Nov 5, 2014 at 4:44 PM, Jay Kreps jay.kr...@gmail.com wrote: Bhavesh, Wouldn't using the default batch size of 16k have avoided this problem entirely? I think the best solution now is just to change the configuration. What I am saying is that it is unlikely you will need to do this again; the problem is just that 1MB partition batches are quite large, so you run out of memory very quickly with that configuration. I agree that the Scala producer doesn't have this problem, but it also doesn't really let you control the memory use or the request size very effectively, which I would argue is a much bigger problem. Once you introduce those controls you have to configure how to make use of them, which is what this is about. -Jay
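For reference, the recalculation the proposed *auto.update.batch.size* flag implies is straightforward; a sketch of the idea only (this helper is illustrative and was never part of the producer):

    public final class BatchSizeCalculator {

        /**
         * Derive a per-partition batch size that keeps roughly two live batches
         * per partition inside the total buffer, with a floor so batching still
         * helps throughput. Illustrative only.
         */
        public static int batchSizeFor(long bufferMemoryBytes, int partitionCount) {
            final long minBatch = 16 * 1024; // the producer's default batch.size
            long perPartition = bufferMemoryBytes / (2L * partitionCount);
            return (int) Math.max(minBatch, Math.min(perPartition, Integer.MAX_VALUE));
        }
    }

With the numbers from this thread, batchSizeFor(64 * 1024 * 1024, 128) yields 256k, so a partition increase from 32 to 128 would shrink batches instead of exhausting the buffer.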
queued.max.message.chunks impact and consumer tuning
Hi Kafka Dev Team, It seems that the maximum buffer size defaults to 2. What is the impact of changing this to 2000 or so? Will this improve consumer thread performance, since more events will be buffered in memory? Or is there any other recommendation for tuning high-level consumers? Here is the code from the Kafka trunk branch:

val MaxQueuedChunks = 2
/** max number of message chunks buffered for consumption, each chunk can be up to fetch.message.max.bytes */
val queuedMaxMessages = props.getInt("queued.max.message.chunks", MaxQueuedChunks)

Thanks, Bhavesh
Re: queued.max.message.chunks impact and consumer tuning
Thanks for the info. I will have to tune the memory. What else do you recommend for the high-level consumer for optimal performance, to drain as quickly as possible with auto-commit on? Thanks, Bhavesh

On Tue, Nov 4, 2014 at 9:59 AM, Joel Koshy jjkosh...@gmail.com wrote: We used to default to 10, but two should be sufficient; there is little reason to buffer more than that. If you increase it to 2000 you will most likely run into memory issues. E.g., if your fetch size is 1MB you would enqueue 1MB * 2000 chunks in each queue.
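A back-of-the-envelope helper for Joel's arithmetic: worst-case consumer-side buffering is roughly chunks x fetch size x number of queues (one queue per consumer stream). The class and method names are made up for illustration:

    public final class ConsumerBufferEstimate {

        /** Worst-case bytes buffered by the high-level consumer's chunk queues. */
        public static long maxBufferedBytes(int queuedMaxChunks, long fetchMaxBytes, int numStreams) {
            return (long) queuedMaxChunks * fetchMaxBytes * numStreams;
        }
    }

    // With the values discussed above:
    // maxBufferedBytes(2, 1024 * 1024, 1)    ->    2MB per stream (the default)
    // maxBufferedBytes(2000, 1024 * 1024, 1) -> ~2GB per stream, hence the memory risk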