Re: Different partitioning between new producer and old producer
I didn't know there's a method in the producer to get the metadata from the broker. I will fix my producer container.

On Wed, Sep 17, 2014 at 6:52 PM, Neha Narkhede <neha.narkh...@gmail.com> wrote:

> > Could you make them use the same logic? Otherwise, I have to change the
> > implementation of my Kafka producer container.
>
> The new producer is much more flexible and allows the user to use custom
> partitioning logic and provide the partition number in the ProducerRecord.
> That way it is broadly applicable to a variety of applications that
> require different partitioning logic.
>
> Thanks,
> Neha
>
> On Wed, Sep 17, 2014 at 11:00 AM, Bae, Jae Hyeon <metac...@gmail.com> wrote:
>
> > The major motivation for adopting the new producer before it's released
> > is that the old producer is showing terrible throughput for
> > cross-regional Kafka mirroring in EC2. Let me share numbers.
> >
> > Using iperf, network bandwidth between us-west-2 AWS EC2 and us-east-1
> > AWS EC2 is more than 40 MB/sec, but the old producer's throughput is
> > less than 3 MB/sec:
> >
> > start.time               end.time                 compression  message.size  batch.size  total.data.sent.in.MB  MB.sec   total.data.sent.in.nMsg  nMsg.sec
> > 2014-09-16 20:22:25:537  2014-09-16 20:24:13:138  2            3000          200         286.10                 2.6589   100000                   929.3594
> >
> > Even though we increased the socket send buffer on the producer side and
> > the receive buffer on the broker side (send.buffer.bytes: 8388608), it
> > didn't help much:
> >
> > start.time               end.time                 compression  message.size  batch.size  total.data.sent.in.MB  MB.sec   total.data.sent.in.nMsg  nMsg.sec
> > 2014-09-16 20:48:49:588  2014-09-16 20:50:03:006  2            3000          200         286.10                 3.8969   100000                   1362.0638
> >
> > But the new producer, which is not released yet, is showing a
> > significant performance improvement. Its throughput is more than
> > 30 MB/sec:
> >
> > start.time               end.time                 compression  message.size  batch.size  total.data.sent.in.MB  MB.sec   total.data.sent.in.nMsg  nMsg.sec
> > 2014-09-16 20:50:31:720  2014-09-16 20:50:41:241  2            3000          200         286.10                 30.0496  100000                   10503.098
> >
> > I was excited about the new producer's performance, but its partitioning
> > logic is different. Without a partition number in the ProducerRecord,
> > its partitioning is based on a murmur2 hash of the key, while the old
> > partitioner is based on key.hashCode. Could you make them use the same
> > logic? Otherwise, I have to change the implementation of my Kafka
> > producer container.
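To make Neha's suggestion concrete for readers hitting the same migration issue: you can keep the old producer's placement by computing the partition yourself and passing it to ProducerRecord. A minimal sketch, assuming the 0.8.2-era new-producer API (the generic ProducerRecord and the partitionsFor method); the helper class and method names are illustrative only, and the masking trick mirrors what the old Scala DefaultPartitioner's Utils.abs did:

import java.util.List;

import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.PartitionInfo;

// Illustrative helper: reproduce the old producer's key.hashCode-based
// placement by computing the partition up front and handing it to
// ProducerRecord, so the new producer's murmur2 default never runs.
public class OldStylePartitioning {
    public static <K, V> ProducerRecord<K, V> record(
            Producer<K, V> producer, String topic, K key, V value) {
        List<PartitionInfo> partitions = producer.partitionsFor(topic);
        // Non-negative hashCode modulo the partition count, as in the old
        // DefaultPartitioner.
        int partition = (key.hashCode() & 0x7fffffff) % partitions.size();
        return new ProducerRecord<K, V>(topic, partition, key, value);
    }
}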
Re: [Java New Producer] CPU Usage Spike to 100% when network connection is lost
Also, do you know what version you are running? We did fix several bugs
similar to this against trunk.

-Jay

On Wed, Sep 17, 2014 at 2:14 PM, Bhavesh Mistry
<mistry.p.bhav...@gmail.com> wrote:

> Hi Kafka Dev team,
>
> I see my CPU spike to 100% when the network connection is lost for a
> while. It seems the network I/O thread is very busy logging the following
> error message. Is this expected behavior?
>
> 2014-09-17 14:06:16.830 [kafka-producer-network-thread] ERROR
> org.apache.kafka.clients.producer.internals.Sender - Uncaught error in
> kafka producer I/O thread:
> java.lang.IllegalStateException: No entry found for node -2
>     at org.apache.kafka.clients.ClusterConnectionStates.nodeState(ClusterConnectionStates.java:110)
>     at org.apache.kafka.clients.ClusterConnectionStates.disconnected(ClusterConnectionStates.java:99)
>     at org.apache.kafka.clients.NetworkClient.initiateConnect(NetworkClient.java:394)
>     at org.apache.kafka.clients.NetworkClient.maybeUpdateMetadata(NetworkClient.java:380)
>     at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:174)
>     at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:175)
>     at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:115)
>     at java.lang.Thread.run(Thread.java:744)
>
> Thanks,
> Bhavesh
Re: [Java New Producer] CPU Usage Spike to 100% when network connection is lost
Hi Jay,

I am running a trunk producer based on the following last commit, with
timestamp Mon Sep 15 20:34:14 2014 -0700. Please let me know if this
contains the fix. Otherwise, I will file a bug with logs as Neha suggested.

commit cf0f5750b39e675cf9a9c6d6394a665366db0f58
Author: Alexis Midon <mi...@apache.org>
Date:   Mon Sep 15 20:34:14 2014 -0700

    KAFKA-1597 New metrics: ResponseQueueSize and BeingSentResponses;
    reviewed by Neha Narkhede and Jun Rao

Thanks,
Bhavesh

On Wed, Sep 17, 2014 at 11:22 PM, Jay Kreps <jay.kr...@gmail.com> wrote:

> Also, do you know what version you are running? We did fix several bugs
> similar to this against trunk.
>
> -Jay
Re: zookeeper upgrade or remove zookeeper dependency
Good to know. Does it mean the release will go out after those bugs are
fixed or moved to a newer release? :)

Best Regards,
Mingtao

On Wed, Sep 17, 2014 at 9:34 PM, Neha Narkhede <neha.narkh...@gmail.com> wrote:

> You can track the list of open bugs here:
> https://issues.apache.org/jira/browse/KAFKA-1558?jql=project%20%3D%20Kafka%20and%20fixVersion%20%3D%200.8.2%20and%20status%20!%3D%20Resolved%20and%20status%20!%3D%20Closed
>
> On Wed, Sep 17, 2014 at 10:00 AM, Mingtao Zhang <mail2ming...@gmail.com> wrote:
>
> > Could you also share a rough time point for the 0.8.2 release?
> >
> > Best Regards,
> > Mingtao
> >
> > On Wed, Sep 17, 2014 at 12:10 PM, Neha Narkhede <neha.narkh...@gmail.com> wrote:
> >
> > > Kafka trunk is on a later zookeeper version (3.4.6), so the next
> > > release (0.8.2) will depend on zookeeper 3.4.6.
> > >
> > > On Wed, Sep 17, 2014 at 8:55 AM, Mingtao Zhang <mail2ming...@gmail.com> wrote:
> > >
> > > > Hi,
> > > >
> > > > I see that Kafka is using zookeeper 3.3.4. For my integration
> > > > purposes, I want to use Curator, which requires a higher version
> > > > than 3.3.4 even in its lowest version. Is there any plan to bump up
> > > > the zookeeper dependency? Or is there any plan to remove the
> > > > zookeeper dependency?
> > > >
> > > > Best Regards,
> > > > Mingtao
Re: zookeeper upgrade or remove zookeeper dependency
Hi Mingtao,

We are shooting to cut the 0.8.2 branch this month.

Guozhang

On Thu, Sep 18, 2014 at 10:36 AM, Mingtao Zhang <mail2ming...@gmail.com> wrote:

> Good to know. Does it mean the release will go out after those bugs are
> fixed or moved to a newer release? :)
>
> Best Regards,
> Mingtao
Re: zookeeper upgrade or remove zookeeper dependency
Great :)

Best Regards,
Mingtao

On Thu, Sep 18, 2014 at 2:04 PM, Guozhang Wang <wangg...@gmail.com> wrote:

> Hi Mingtao,
>
> We are shooting to cut the 0.8.2 branch this month.
>
> Guozhang
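For anyone blocked on the ZooKeeper version before 0.8.2 ships: a common workaround is to exclude Kafka's transitive zookeeper dependency and declare the version Curator needs yourself. A hedged sketch in Maven terms; the coordinates shown are the usual ones for the 0.8.1.1 release and should be checked against your build, and whether the 3.4.x client behaves correctly against your particular ZooKeeper ensemble also needs verifying:

<dependency>
  <groupId>org.apache.kafka</groupId>
  <artifactId>kafka_2.10</artifactId>
  <version>0.8.1.1</version>
  <exclusions>
    <!-- Drop the 3.3.4 zookeeper that Kafka pulls in transitively. -->
    <exclusion>
      <groupId>org.apache.zookeeper</groupId>
      <artifactId>zookeeper</artifactId>
    </exclusion>
  </exclusions>
</dependency>
<!-- Declare the version Curator requires explicitly. -->
<dependency>
  <groupId>org.apache.zookeeper</groupId>
  <artifactId>zookeeper</artifactId>
  <version>3.4.6</version>
</dependency>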
Re: MBeans, dashes, underscores, and KAFKA-1481
Otis,

In KAFKA-1481, we will have to change the MBean names (at least the ones
with clientId and topic) anyway. Using the name/value pairs in the MBean
name allows us to do this in a cleaner way. Yes, ',' is not allowed in a
clientId or topic.

Bhavesh,

Yes, I was thinking of making changes in the new metrics package,
something like allowing the sensor names to have name/value pairs. The JMX
names will just follow accordingly. This is probably cleaner than doing
the escaping. Also, the metric names are more intuitive (otherwise, you
have to know which part is the clientId and which part is the topic).

Thanks,
Jun

On Wed, Sep 17, 2014 at 2:32 PM, Otis Gospodnetic
<otis.gospodne...@gmail.com> wrote:

> Hi Jun,
>
> On Wed, Sep 17, 2014 at 12:35 PM, Jun Rao <jun...@gmail.com> wrote:
>
> > Bhavesh,
> >
> > Yes, allowing a dot in clientId and topic makes it a bit harder to
> > define the JMX bean names. I see a couple of solutions here.
> >
> > 1. Disable dot in clientId and topic names. The issue is that dot may
> > already be used in existing deployments.
> >
> > 2. We can represent the JMX bean name differently in the new producer.
> > Instead of
> >     kafka.producer.myclientid:type=mytopic
> > we could change it to
> >     kafka.producer:clientId=myclientid,topic=mytopic
> >
> > I felt that option 2 is probably better since it doesn't affect
> > existing users.
>
> If it doesn't affect existing users, great. If you are saying that each
> piece of the MBean name could be expressed as a name=value pair, with
> something like ',' (forbidden in host names, topic names, client IDs,
> etc., I assume?) as the separator, then yes, I think this would be easier
> to parse and it would be easier for people to understand what is what.
>
> Otis
> --
> Monitoring * Alerting * Anomaly Detection * Centralized Log Management
> Solr & Elasticsearch Support * http://sematext.com/
>
> > Otis,
> >
> > We probably can also use option 2 to address KAFKA-1481. For
> > topic/clientId-specific metrics, we could explicitly specify the metric
> > name so that it contains topic=mytopic,clientid=myclientid. That seems
> > to be a much cleaner way than having all parts included in a single
> > string separated by '|'.
> >
> > Thanks,
> > Jun
> >
> > On Tue, Sep 16, 2014 at 5:15 PM, Bhavesh Mistry
> > <mistry.p.bhav...@gmail.com> wrote:
> >
> > > Hi Otis,
> > >
> > > What is the migration path? If topics with special characters
> > > ('.', '-', '|', etc.) already exist in a previous version of the
> > > Kafka producer/consumer, what happens after the upgrade to the new
> > > producer or consumer (Kafka version)? Also, in the new producer API
> > > (Kafka trunk), does this enforce the rule for the client id as well?
> > >
> > > Thanks,
> > > Bhavesh
> > >
> > > On Tue, Sep 16, 2014 at 2:09 PM, Otis Gospodnetic
> > > <otis.gospodne...@gmail.com> wrote:
> > >
> > > > Hi,
> > > >
> > > > So maybe I should have asked the question explicitly: could we
> > > > commit the patch from
> > > > https://issues.apache.org/jira/browse/KAFKA-1481 now that, I hope,
> > > > it's clear what problems the current MBean names can cause?
> > > >
> > > > Thanks,
> > > > Otis
> > > >
> > > > On Mon, Sep 15, 2014 at 10:40 PM, Otis Gospodnetic
> > > > <otis.gospodne...@gmail.com> wrote:
> > > >
> > > > > Hi,
> > > > >
> > > > > Problem: Some Kafka 0.8.x MBeans have names composed of things
> > > > > like "consumer group"-"topic"-"metric name". Note how dashes are
> > > > > used as delimiters. When the consumer group and topic don't
> > > > > contain the delimiter character, all is good if you want to
> > > > > extract parts of this MBean name by simply splitting on the
> > > > > delimiter character. The problem is that dashes are allowed in
> > > > > topic and group names, so this splitting doesn't work. Moreover,
> > > > > underscores are also used as delimiters, and they can also be
> > > > > used in things like topic names.
> > > > >
> > > > > Example: This MBean's name is composed of consumer
> > > > > group-topic-BytesPerSec:
> > > > >
> > > > >   kafka.consumer:type=ConsumerTopicMetrics,name=myGroup-myTopic-BytesPerSec
> > > > >
> > > > > Here we can actually split on '-' and extract all 3 parts from
> > > > > the MBean name:
> > > > >
> > > > > * consumer group ('myGroup')
> > > > > * topic ('myTopic')
> > > > > * metric ('BytesPerSec')
> > > > >
> > > > > All good! But imagine if I named the group my-Group and the
> > > > > topic my-Topic. Then we'd have:
> > > > >
> > > > >   kafka.consumer:type=ConsumerTopicMetrics,name=my-Group-my-Topic-BytesPerSec
> > > > >
> > > > > Now splitting on '-' would no longer work! To extract the
> > > > > my-Group, my-Topic, and BytesPerSec parts I would have to know
> > > > > the specific group name and topic name to look for, and could
> > > > > not use the generic approach of just splitting the MBean name on
> > > > > the delimiter.
> > > > >
> > > > > Solution: The patch in
> > > > > https://issues.apache.org/jira/browse/KAFKA-1481 replaces all '_'
> > > > > and '-' characters where they are used as delimiters in MBean
> > > > > names with a '|' character. Because the '|' character is not
> > > > > allowed in
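To make concrete why option 2's name=value style is friendlier to tools: with standard JMX, each part comes back as a named key property instead of a substring you must split out of a dash-delimited name. A small sketch using only the standard javax.management API; the MBean name follows Jun's proposed form above, not any shipped Kafka naming:

import javax.management.ObjectName;

public class MBeanNameDemo {
    public static void main(String[] args) throws Exception {
        // Option 2 style: every part is a named key property.
        ObjectName name =
            new ObjectName("kafka.producer:clientId=my-client,topic=my-Topic");
        // No delimiter guessing: dashes inside the values are harmless.
        System.out.println(name.getKeyProperty("clientId")); // my-client
        System.out.println(name.getKeyProperty("topic"));    // my-Topic
    }
}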
Interaction of retention settings for broker and topic plus partitions
Hello all!

I'm curious about the interaction of server- and topic-level retention
settings. It's not clear to me what the precedence of the following is:

- the broker's default log.retention.bytes
- the topic's retention.bytes (which defaults to the broker's
  log.retention.bytes)
- the broker's log.retention.hours and log.retention.minutes (if both are
  specified, then it seems to be the lower of the two, since deletion
  happens when either is exceeded)

It seems that the rule is that when any of these is violated, the log
segment is deleted. Is this right?

Also, just to be clear: the log sizes in question are for a single
partition's log?

I have a situation where my per-topic retention.bytes is very high, but my
default log.retention.hours is lower (the default @ 168 hours). It seems
that it's truncating at log.retention.hours instead of the topic's
retention.bytes. Am I understanding this correctly? :)

--
Cory Watson
Principal Infrastructure Engineer // Keen IO
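For readers with the same question: in 0.8.x the retention checks are independent and applied per partition, and a log segment becomes eligible for deletion once either the time limit or the size limit is exceeded, whichever comes first. So a generous topic-level retention.bytes cannot extend retention past the broker's time limit; the time limit has to be raised too. A sketch of the relevant settings, with illustrative values; the topic-level override syntax follows the 0.8.1 tools and should be checked against the version in use:

# server.properties (broker defaults)
log.retention.hours=168          # time limit: segments older than 7 days go
log.retention.bytes=1073741824   # size limit, applied per partition

# Per-topic override: raise BOTH limits, or the lower one still wins.
bin/kafka-topics.sh --zookeeper localhost:2181 --alter --topic mytopic \
  --config retention.bytes=107374182400 \
  --config retention.ms=1209600000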
Re: [Java New Producer] CPU Usage Spike to 100% when network connection is lost
Hmm, yes, the fix was prior to that; that sounds like a bug.

-Jay

On Thu, Sep 18, 2014 at 9:39 AM, Bhavesh Mistry
<mistry.p.bhav...@gmail.com> wrote:

> Hi Jay,
>
> I am running a trunk producer based on the following last commit, with
> timestamp Mon Sep 15 20:34:14 2014 -0700. Please let me know if this
> contains the fix. Otherwise, I will file a bug with logs as Neha
> suggested.
>
> commit cf0f5750b39e675cf9a9c6d6394a665366db0f58
> Author: Alexis Midon <mi...@apache.org>
> Date:   Mon Sep 15 20:34:14 2014 -0700
>
>     KAFKA-1597 New metrics: ResponseQueueSize and BeingSentResponses;
>     reviewed by Neha Narkhede and Jun Rao
>
> Thanks,
> Bhavesh
Re: Different partitioning between new producer and old producer
Hey Jae,

The rationale for switching was to use a hash code that is cross-language
and not dependent on the particular object. There are all kinds of gotchas
with Java's hashCode() as a partition assignment strategy (e.g. two byte
arrays with the same bytes will have different hash codes).

-Jay

On Wed, Sep 17, 2014 at 11:00 AM, Bae, Jae Hyeon <metac...@gmail.com> wrote:

> I was excited about the new producer's performance, but its partitioning
> logic is different. Without a partition number in the ProducerRecord, its
> partitioning is based on a murmur2 hash of the key, while the old
> partitioner is based on key.hashCode. Could you make them use the same
> logic? Otherwise, I have to change the implementation of my Kafka
> producer container.
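A concrete illustration of the gotcha Jay mentions: Java arrays use identity hash codes, so two byte[] keys with identical contents would land on different partitions under a hashCode-based strategy. A minimal, self-contained demonstration in plain Java, with no Kafka dependency:

import java.util.Arrays;

public class ArrayHashDemo {
    public static void main(String[] args) {
        byte[] a = "same-key".getBytes();
        byte[] b = "same-key".getBytes();
        // Identity hash: almost certainly different, despite equal contents.
        System.out.println(a.hashCode() == b.hashCode()); // false in practice
        // A content-based hash is stable across equal arrays.
        System.out.println(Arrays.hashCode(a) == Arrays.hashCode(b)); // true
    }
}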
Handling errors in the new (0.8.2) Java Client's Producer
I am trying to understand the best practices for working with the new
(0.8.2) Producer interface.

We have a process in a large server that writes a lot of data to Kafka.
However, this data is not mission critical. When a problem arises writing
to Kafka, most specifically network issues, but also full producer
buffers, we want the server to continue working, but to stop sending data
to Kafka, allowing other tasks to continue. The issue I have is handling
messages that have been sent to the producer but are waiting to go to
Kafka. These messages remain long after my processing is over, timing out,
writing to the logs, and preventing me from moving forward. I am looking
for some way to tell the client to stop forwarding messages to Kafka. This
is what I have so far:

class ErrorCallback implements Callback {

    @Override
    public void onCompletion(RecordMetadata metadata, Exception exception) {
        if (exception == null) {
            // The message was sent successfully.
            return;
        }
        stopProducerSendAndClose();
        String threadName = Thread.currentThread().getName();
        if (!threadName.equals("kafka-producer-network-thread")) {
            // Some of the callbacks happen on my thread.
        } else {
            // We are in the KafkaProducer's ioThread == commit suicide.
            Thread.currentThread().interrupt();
            // Cannot throw an Exception, as it will just be caught and logged.
            throw new ThreadDeath();
        }
    }
}

My question is: is this the correct approach, or is there some other way
to stop sending messages (short of going synchronous)?

Andrew Stein
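One alternative worth considering, sketched below: rather than killing the producer's I/O thread from inside the callback, trip a flag on the first failure and stop handing records to the producer at the call site. The wrapper class is hypothetical, the generic Producer/ProducerRecord/Callback signatures follow the released 0.8.2 API, and the assumption that close() should not be called from the I/O thread (it blocks on buffered records) should be verified against the version in use:

import java.util.concurrent.atomic.AtomicBoolean;

import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

// Hypothetical wrapper: after the first send failure, silently drop
// further records instead of queueing them into an unhealthy producer.
public class BestEffortSender<K, V> {
    private final Producer<K, V> producer;
    private final AtomicBoolean failed = new AtomicBoolean(false);

    public BestEffortSender(Producer<K, V> producer) {
        this.producer = producer;
    }

    public void send(ProducerRecord<K, V> record) {
        if (failed.get())
            return; // Kafka is unreachable; the data is not mission critical.
        producer.send(record, new Callback() {
            @Override
            public void onCompletion(RecordMetadata metadata, Exception e) {
                if (e != null && failed.compareAndSet(false, true)) {
                    // Close from a separate thread: close() waits for
                    // buffered records, so calling it from the producer's
                    // own I/O thread could deadlock.
                    new Thread(new Runnable() {
                        @Override
                        public void run() {
                            producer.close();
                        }
                    }).start();
                }
            }
        });
    }
}

This keeps the shutdown decision out of the I/O thread entirely, so nothing depends on inspecting thread names or throwing ThreadDeath.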