Re: Different partitioning between new producer and old producer

2014-09-18 Thread Bae, Jae Hyeon
I didn't know there's a method in the producer to get the metadata from the
broker.

I will fix my producer container.
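
A minimal sketch of that fix (hedged: OldStylePartitioning and its methods
are made-up names; it assumes the trunk producer's partitionsFor() metadata
call, mirrors the old DefaultPartitioner's Utils.abs(key.hashCode) %
numPartitions, and is written against the 0.8.2-era generic API):

    import java.util.List;

    import org.apache.kafka.clients.producer.Producer;
    import org.apache.kafka.clients.producer.ProducerRecord;
    import org.apache.kafka.common.PartitionInfo;

    final class OldStylePartitioning {
        // Old producer's default: Utils.abs(key.hashCode) % numPartitions,
        // where abs(n) is n & 0x7fffffff (overflow-safe, unlike Math.abs).
        static int oldStylePartition(Object key, int numPartitions) {
            return (key.hashCode() & 0x7fffffff) % numPartitions;
        }

        // Pin the partition explicitly so the new producer's murmur2
        // default partitioner never runs.
        static void send(Producer<byte[], byte[]> producer, String topic,
                         String key, byte[] value) {
            List<PartitionInfo> partitions = producer.partitionsFor(topic);
            int partition = oldStylePartition(key, partitions.size());
            producer.send(new ProducerRecord<byte[], byte[]>(
                    topic, partition, key.getBytes(), value));
        }
    }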

On Wed, Sep 17, 2014 at 6:52 PM, Neha Narkhede neha.narkh...@gmail.com
wrote:

 Could you make them use the same logic? Otherwise, I have to change the
 implementation of my Kafka producer container.

 The new producer is much more flexible and allows the user to use custom
 partitioning logic and provide the partition number in the ProducerRecord.
 That way it is broadly applicable to a variety of applications that require
 different partitioning logic.

 Thanks,
 Neha

 On Wed, Sep 17, 2014 at 11:00 AM, Bae, Jae Hyeon metac...@gmail.com
 wrote:

  The major motivation for adopting the new producer before it's released is
  that the old producer shows terrible throughput for cross-regional Kafka
  mirroring in EC2.
 
  Let me share numbers.
 
  Using iperf, the network bandwidth between us-west-2 AWS EC2 and us-east-1
  AWS EC2 is more than 40 MB/sec. But the old producer's throughput is less
  than 3 MB/sec.
 
  start.time=2014-09-16 20:22:25:537  end.time=2014-09-16 20:24:13:138
  compression=2  message.size=3000  batch.size=200  total.data.sent.in.MB=286.10
  MB.sec=2.6589  total.data.sent.in.nMsg=100000  nMsg.sec=929.3594
 
  Even though we increased the socket send buffer on the producer side and
  the receive buffer on the broker side, it didn't help.
  send.buffer.bytes: 8388608
  start.time=2014-09-16 20:48:49:588  end.time=2014-09-16 20:50:03:006
  compression=2  message.size=3000  batch.size=200  total.data.sent.in.MB=286.10
  MB.sec=3.8969  total.data.sent.in.nMsg=100000  nMsg.sec=1362.0638
 
  But the new producer, which is not released yet, shows a significant
  performance improvement: more than 30 MB/sec.
  start.time=2014-09-16 20:50:31:720  end.time=2014-09-16 20:50:41:241
  compression=2  message.size=3000  batch.size=200  total.data.sent.in.MB=286.10
  MB.sec=30.0496  total.data.sent.in.nMsg=100000  nMsg.sec=10503.098
  I was excited about the new producer's performance, but its partitioning
  logic is different.
 
  Without a partition number in the ProducerRecord, its partitioning logic is
  based on a murmur2 hash of the key. But in the old partitioner, partitioning
  logic is based on key.hashCode.
 
  Could you make them use the same logic? Otherwise, I have to change the
  implementation of my Kafka producer container.
 



Re: [Java New Producer] CPU Usage Spike to 100% when network connection is lost

2014-09-18 Thread Jay Kreps
Also, do you know what version you are running? We did fix several bugs
similar to this against trunk.

-Jay

On Wed, Sep 17, 2014 at 2:14 PM, Bhavesh Mistry
mistry.p.bhav...@gmail.com wrote:
 Hi Kafka Dev team,

 I see my CPU spike to 100% when the network connection is lost for a while.
 It seems the network I/O thread is very busy logging the following error
 message. Is this expected behavior?

 2014-09-17 14:06:16.830 [kafka-producer-network-thread] ERROR
 org.apache.kafka.clients.producer.internals.Sender - Uncaught error in
 kafka producer I/O thread:

 java.lang.IllegalStateException: No entry found for node -2

 at org.apache.kafka.clients.ClusterConnectionStates.nodeState(
 ClusterConnectionStates.java:110)

 at org.apache.kafka.clients.ClusterConnectionStates.disconnected(
 ClusterConnectionStates.java:99)

 at org.apache.kafka.clients.NetworkClient.initiateConnect(
 NetworkClient.java:394)

 at org.apache.kafka.clients.NetworkClient.maybeUpdateMetadata(
 NetworkClient.java:380)

 at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:174)

 at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:175)

 at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:115)

 at java.lang.Thread.run(Thread.java:744)

 Thanks,

 Bhavesh


Re: [Java New Producer] CPU Usage Spike to 100% when network connection is lost

2014-09-18 Thread Bhavesh Mistry
Hi Jay,

I am running a trunk producer built from the following last commit, with
timestamp Mon Sep 15 20:34:14 2014 -0700. Please let me know if this commit
contains the fix. Otherwise, I will file a bug with logs, as Neha suggested.

commit cf0f5750b39e675cf9a9c6d6394a665366db0f58
Author: Alexis Midon mi...@apache.org
Date:   Mon Sep 15 20:34:14 2014 -0700
KAFKA-1597 New metrics: ResponseQueueSize and BeingSentResponses;
reviewed by Neha Narkhede and Jun Rao


Thanks,

Bhavesh

On Wed, Sep 17, 2014 at 11:22 PM, Jay Kreps jay.kr...@gmail.com wrote:

 Also, do you know what version you are running? We did fix several bugs
 similar to this against trunk.

 -Jay

 On Wed, Sep 17, 2014 at 2:14 PM, Bhavesh Mistry
 mistry.p.bhav...@gmail.com wrote:
  Hi Kafka Dev team,
 
  I see my CPU spike to 100% when the network connection is lost for a while.
  It seems the network I/O thread is very busy logging the following error
  message. Is this expected behavior?
 
  2014-09-17 14:06:16.830 [kafka-producer-network-thread] ERROR
  org.apache.kafka.clients.producer.internals.Sender - Uncaught error in
  kafka producer I/O thread:
 
  java.lang.IllegalStateException: No entry found for node -2
 
  at org.apache.kafka.clients.ClusterConnectionStates.nodeState(
  ClusterConnectionStates.java:110)
 
  at org.apache.kafka.clients.ClusterConnectionStates.disconnected(
  ClusterConnectionStates.java:99)
 
  at org.apache.kafka.clients.NetworkClient.initiateConnect(
  NetworkClient.java:394)
 
  at org.apache.kafka.clients.NetworkClient.maybeUpdateMetadata(
  NetworkClient.java:380)
 
  at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:174)
 
  at
 org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:175)
 
  at
 org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:115)
 
  at java.lang.Thread.run(Thread.java:744)
 
  Thanks,
 
  Bhavesh



Re: zookeeper upgrade or remove zookeeper dependency

2014-09-18 Thread Mingtao Zhang
Good to know. Does it mean the release will go out after those bugs are fixed
or moved to a newer release? :)

Best Regards,
Mingtao

On Wed, Sep 17, 2014 at 9:34 PM, Neha Narkhede neha.narkh...@gmail.com
wrote:

 You can track the list of open bugs here
 
 https://issues.apache.org/jira/browse/KAFKA-1558?jql=project%20%3D%20Kafka%20and%20fixVersion%20%3D%200.8.2%20and%20status%20!%3D%20Resolved%20and%20status%20!%3D%20Closed
 
 .


 On Wed, Sep 17, 2014 at 10:00 AM, Mingtao Zhang mail2ming...@gmail.com
 wrote:

  Could you also share a rough time point for the 0.8.2 release?
 
  Best Regards,
  Mingtao
 
  On Wed, Sep 17, 2014 at 12:10 PM, Neha Narkhede neha.narkh...@gmail.com
  wrote:
 
   Kafka trunk is on a later ZooKeeper version (3.4.6), so the next release
   (0.8.2) will depend on ZooKeeper 3.4.6.
  
   On Wed, Sep 17, 2014 at 8:55 AM, Mingtao Zhang mail2ming...@gmail.com
 
   wrote:
  
Hi,
   
 I can see Kafka is using ZooKeeper 3.3.4. For my integration purposes, I
 want to use Curator, which requires a version higher than 3.3.4 even in
 its lowest version.
   
 Is there any plan to bump up the ZooKeeper dependency? Or is there any
 plan to remove the ZooKeeper dependency?
   
Best Regards,
Mingtao
   
  
 



Re: zookeeper upgrade or remove zookeeper dependency

2014-09-18 Thread Guozhang Wang
Hi Mingtao,

We are shooting to cut the 0.8.2 branch this month.

Guozhang

On Thu, Sep 18, 2014 at 10:36 AM, Mingtao Zhang mail2ming...@gmail.com
wrote:

 Good to know. Does it mean the release will go out after those bugs are
 fixed or moved to a newer release? :)

 Best Regards,
 Mingtao

 On Wed, Sep 17, 2014 at 9:34 PM, Neha Narkhede neha.narkh...@gmail.com
 wrote:

  You can track the list of open bugs here
  
 
 https://issues.apache.org/jira/browse/KAFKA-1558?jql=project%20%3D%20Kafka%20and%20fixVersion%20%3D%200.8.2%20and%20status%20!%3D%20Resolved%20and%20status%20!%3D%20Closed
  
  .
 
 
  On Wed, Sep 17, 2014 at 10:00 AM, Mingtao Zhang mail2ming...@gmail.com
  wrote:
 
   Could you also share a rough time point for the 0.8.2 release?
  
   Best Regards,
   Mingtao
  
    On Wed, Sep 17, 2014 at 12:10 PM, Neha Narkhede neha.narkh...@gmail.com
    wrote:
  
     Kafka trunk is on a later ZooKeeper version (3.4.6), so the next release
     (0.8.2) will depend on ZooKeeper 3.4.6.
   
     On Wed, Sep 17, 2014 at 8:55 AM, Mingtao Zhang mail2ming...@gmail.com
     wrote:
   
 Hi,

      I can see Kafka is using ZooKeeper 3.3.4. For my integration purposes,
      I want to use Curator, which requires a version higher than 3.3.4 even
      in its lowest version.

      Is there any plan to bump up the ZooKeeper dependency? Or is there any
      plan to remove the ZooKeeper dependency?

 Best Regards,
 Mingtao

   
  
 




-- 
-- Guozhang


Re: zookeeper upgrade or remove zookeeper dependency

2014-09-18 Thread Mingtao Zhang
Great :)

Best Regards,
Mingtao

On Thu, Sep 18, 2014 at 2:04 PM, Guozhang Wang wangg...@gmail.com wrote:

 Hi Mingtao,

 We are shooting to cut the 0.8.2 branch this month.

 Guozhang

 On Thu, Sep 18, 2014 at 10:36 AM, Mingtao Zhang mail2ming...@gmail.com
 wrote:

  Good to know. Does it mean the release will go out after those bugs are
  fixed or moved to a newer release? :)
 
  Best Regards,
  Mingtao
 
  On Wed, Sep 17, 2014 at 9:34 PM, Neha Narkhede neha.narkh...@gmail.com
  wrote:
 
   You can track the list of open bugs here
   
  
 
 https://issues.apache.org/jira/browse/KAFKA-1558?jql=project%20%3D%20Kafka%20and%20fixVersion%20%3D%200.8.2%20and%20status%20!%3D%20Resolved%20and%20status%20!%3D%20Closed
   
   .
  
  
   On Wed, Sep 17, 2014 at 10:00 AM, Mingtao Zhang mail2ming...@gmail.com
   wrote:
  
    Could you also share a rough time point for the 0.8.2 release?
   
Best Regards,
Mingtao
   
    On Wed, Sep 17, 2014 at 12:10 PM, Neha Narkhede neha.narkh...@gmail.com
    wrote:
   
      Kafka trunk is on a later ZooKeeper version (3.4.6), so the next release
      (0.8.2) will depend on ZooKeeper 3.4.6.

      On Wed, Sep 17, 2014 at 8:55 AM, Mingtao Zhang mail2ming...@gmail.com
      wrote:

  Hi,
 
       I can see Kafka is using ZooKeeper 3.3.4. For my integration purposes,
       I want to use Curator, which requires a version higher than 3.3.4 even
       in its lowest version.
 
       Is there any plan to bump up the ZooKeeper dependency? Or is there any
       plan to remove the ZooKeeper dependency?
 
  Best Regards,
  Mingtao
 

   
  
 



 --
 -- Guozhang



Re: MBeans, dashes, underscores, and KAFKA-1481

2014-09-18 Thread Jun Rao
Otis,

In KAFKA-1481, we will have to change the MBean names (at least the ones
with clientid and topic) anyway. Using name/value pairs in the MBean
name allows us to do this in a cleaner way. Yes, ',' is not allowed in
clientid or topic.

Bhavesh,

Yes, I was thinking of making changes in the new metrics package, something
like allowing the sensor names to have name/value pairs. The JMX names will
just follow accordingly. This is probably cleaner than doing the escaping.
Also, the metric names are more intuitive (otherwise, you have to know
which part is the clientid and which part is the topic).
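
A minimal sketch of what the name/value form buys, using only the standard
javax.management API (the ObjectName below is the example from this thread,
not a shipped Kafka name):

    import javax.management.ObjectName;

    public class MBeanNameDemo {
        public static void main(String[] args) throws Exception {
            // Key/value properties make each part self-describing, so no
            // delimiter splitting of the name is ever needed.
            ObjectName name = new ObjectName(
                    "kafka.producer:clientId=myclientid,topic=mytopic");
            System.out.println(name.getDomain());                // kafka.producer
            System.out.println(name.getKeyProperty("clientId")); // myclientid
            System.out.println(name.getKeyProperty("topic"));    // mytopic
        }
    }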

Thanks,

Jun

On Wed, Sep 17, 2014 at 2:32 PM, Otis Gospodnetic 
otis.gospodne...@gmail.com wrote:

 Hi Jun,

 On Wed, Sep 17, 2014 at 12:35 PM, Jun Rao jun...@gmail.com wrote:

  Bhavesh,
 
  Yes, allowing dots in clientId and topic names makes it a bit harder to
  define the JMX bean names. I see a couple of solutions here.

  1. Disable dots in clientId and topic names. The issue is that dots may
  already be used in existing deployments.
 
  2. We can represent the JMX bean name differently in the new producer.
  Instead of
kafka.producer.myclientid:type=mytopic
  we could change it to
kafka.producer:clientId=myclientid,topic=mytopic
 
  I felt that option 2 is probably better since it doesn't affect existing
  users.
 

 If it doesn't affect existing users, great.

 If you are saying that each piece of the MBean name could be expressed as a
 name=value pair, with something like ',' as the separator (forbidden in host
 names, topic names, client IDs, etc., I assume?), then yes, I think this
 would be easier to parse and easier for people to understand what is what.

 Otis
 --
 Monitoring * Alerting * Anomaly Detection * Centralized Log Management
 Solr & Elasticsearch Support * http://sematext.com/



 
  Otis,
 
  We probably can also use option 2 to address KAFKA-1481. For topic/clientid
  specific metrics, we could explicitly specify the metric name so that it
  contains topic=mytopic,clientid=myclientid. That seems to be a much cleaner
  way than having all parts included in a single string separated by '|'.
 
  Thanks,
 
  Jun
 
 
 
 
  On Tue, Sep 16, 2014 at 5:15 PM, Bhavesh Mistry mistry.p.bhav...@gmail.com
  wrote:
 
    Hi Otis,
  
    What is the migration path? If topics with special chars ('.', '-', '|',
    etc.) already exist in a previous version of the Kafka producer/consumer,
    what happens after upgrading to the new producer or consumer (Kafka
    version)? Also, in the new producer API (Kafka trunk), does this enforce
    the rule about client id as well?
  
   Thanks,
  
   Bhavesh
  
   On Tue, Sep 16, 2014 at 2:09 PM, Otis Gospodnetic 
   otis.gospodne...@gmail.com wrote:
  
Hi,
   
     So maybe I should have asked the question explicitly:
     Could we commit the patch from
     https://issues.apache.org/jira/browse/KAFKA-1481 now that, I hope, it's
     clear what problems the current MBean names can cause?
   
Thanks,
Otis
--
     Monitoring * Alerting * Anomaly Detection * Centralized Log Management
     Solr & Elasticsearch Support * http://sematext.com/
   
   
   
On Mon, Sep 15, 2014 at 10:40 PM, Otis Gospodnetic 
otis.gospodne...@gmail.com wrote:
   
 Hi,

 *Problem:*
 Some Kafka 0.8.x MBeans have names composed of things like consumer
 group-topic-metric name. Note how dashes are used as delimiters. When
 the consumer group and topic don't contain the delimiter character, all is
 good if you want to extract parts of this MBean name by simply splitting
 on the delimiter character. The problem is that dashes are allowed in
 topic and group names, so this splitting doesn't work. Moreover,
 underscores are also used as delimiters, and they can also be used in
 things like topic names.

 *Example*:
 This MBean's name is composed of consumer group-topic-BytesPerSec:

 kafka.consumer:type=ConsumerTopicMetrics,name=*myGroup*-*myTopic*-BytesPerSec

 Here we can actually split on - and extract all 3 parts from the MBean
 name:
 * consumer group ('myGroup')
 * topic ('myTopic')
 * metric ('BytesPerSec')

 All good!

 But imagine if I named the group: *my-Group*
 And if I named the topic: *my-Topic*

 Then we'd have:

 kafka.consumer:type=ConsumerTopicMetrics,name=*my-Group*-*my-Topic*-BytesPerSec

 Now splitting on - would no longer work! To extract the my-Group, my-Topic,
 and BytesPerSec parts I would have to know the specific group name and
 topic name to look for, and could not use the generic approach of just
 splitting the MBean name on the delimiter.

 *Solution*:
 The patch in https://issues.apache.org/jira/browse/KAFKA-1481 replaces
 all _ and - characters where they are used as delimiters in MBean names
 with a | character. Because the | character is not allowed in
   

Interaction of retention settings for broker and topic plus partitions

2014-09-18 Thread Cory Watson
Hello all!

I'm curious about the interaction of server- and topic-level retention
settings. It's not clear to me what the precedence of the following is:

   - broker's default log.retention.bytes
   - topic's retention.bytes (which defaults to broker's
   log.retention.bytes)
   - broker's log.retention.hours and log.retention.minutes (if both are
   specified then it seems to be the lower of the two, since it's when
   either is exceeded)

It seems that the rule is that when any of these is violated, the log
segment is deleted. Is this right?
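
A hedged sketch of the configs in play (the values are illustrative, not
recommendations, and retention.ms as the per-topic time override is my
assumption for your broker version):

    # Broker-wide defaults (server.properties)
    log.retention.hours=168          # time limit: segments older than 7 days go
    log.retention.bytes=1073741824   # size limit per partition; -1 = unlimited

    # Per-topic overrides (e.g. kafka-topics.sh --alter --config ...)
    retention.bytes=107374182400     # overrides log.retention.bytes only
    #retention.ms=...                # without a per-topic time override, the
                                     # broker's 168-hour default still applies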

Also, just to be clear: the log sizes in question are for a single
partition's log?

I have a situation where my per-topic retention.bytes is very high, but my
default log.retention.hours is lower (the default of 168 hours). It seems
that it's truncating at log.retention.hours instead of the topic's
retention.bytes.

Am I understanding this correctly? :)

-- 
Cory Watson
Principal Infrastructure Engineer // Keen IO


Re: [Java New Producer] CPU Usage Spike to 100% when network connection is lost

2014-09-18 Thread Jay Kreps
Hmm, yes, the fix was prior to that; that sounds like a bug.

-Jay

On Thu, Sep 18, 2014 at 9:39 AM, Bhavesh Mistry
mistry.p.bhav...@gmail.com wrote:
 Hi Jay,

 I am running a trunk producer built from the following last commit, with
 timestamp Mon Sep 15 20:34:14 2014 -0700. Please let me know if this commit
 contains the fix. Otherwise, I will file a bug with logs, as Neha suggested.

 commit cf0f5750b39e675cf9a9c6d6394a665366db0f58
 Author: Alexis Midon mi...@apache.org
 Date:   Mon Sep 15 20:34:14 2014 -0700
 KAFKA-1597 New metrics: ResponseQueueSize and BeingSentResponses;
 reviewed by Neha Narkhede and Jun Rao


 Thanks,

 Bhavesh

 On Wed, Sep 17, 2014 at 11:22 PM, Jay Kreps jay.kr...@gmail.com wrote:

 Also, do you know what version you are running? We did fix several bugs
 similar to this against trunk.

 -Jay

 On Wed, Sep 17, 2014 at 2:14 PM, Bhavesh Mistry
 mistry.p.bhav...@gmail.com wrote:
  Hi Kafka Dev team,
 
  I see my CPU spike to 100% when the network connection is lost for a while.
  It seems the network I/O thread is very busy logging the following error
  message. Is this expected behavior?
 
  2014-09-17 14:06:16.830 [kafka-producer-network-thread] ERROR
  org.apache.kafka.clients.producer.internals.Sender - Uncaught error in
  kafka producer I/O thread:
 
  java.lang.IllegalStateException: No entry found for node -2
 
  at org.apache.kafka.clients.ClusterConnectionStates.nodeState(
  ClusterConnectionStates.java:110)
 
  at org.apache.kafka.clients.ClusterConnectionStates.disconnected(
  ClusterConnectionStates.java:99)
 
  at org.apache.kafka.clients.NetworkClient.initiateConnect(
  NetworkClient.java:394)
 
  at org.apache.kafka.clients.NetworkClient.maybeUpdateMetadata(
  NetworkClient.java:380)
 
  at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:174)
 
  at
 org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:175)
 
  at
 org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:115)
 
  at java.lang.Thread.run(Thread.java:744)
 
  Thanks,
 
  Bhavesh



Re: Different partitioning between new producer and old producer

2014-09-18 Thread Jay Kreps
Hey Jae,

The rationale for switching was to use a hash code that is
cross-language and not dependent on the particular object. There are all
kinds of gotchas with Java's hashCode() as a partition assignment
strategy (e.g. two byte arrays with the same bytes will have different
hash codes).
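
A quick self-contained illustration of that gotcha (plain JDK; the murmur2
comment refers to the content-based hash described above, not to any
particular class location):

    import java.util.Arrays;

    public class HashCodeGotcha {
        public static void main(String[] args) {
            byte[] a = "user-42".getBytes();
            byte[] b = "user-42".getBytes();
            // Arrays inherit identity hashCode(), so equal contents do not
            // imply equal hash codes -- hashCode()-based partitioning would
            // scatter identical keys across partitions.
            System.out.println(a.hashCode() == b.hashCode()); // almost surely false
            System.out.println(Arrays.equals(a, b));          // true
            // A content-based, cross-language hash like murmur2 maps equal
            // byte sequences to the same partition every time.
        }
    }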

-Jay

On Wed, Sep 17, 2014 at 11:00 AM, Bae, Jae Hyeon metac...@gmail.com wrote:
 The major motivation for adopting the new producer before it's released is
 that the old producer shows terrible throughput for cross-regional Kafka
 mirroring in EC2.

 Let me share numbers.

 Using iperf, the network bandwidth between us-west-2 AWS EC2 and us-east-1
 AWS EC2 is more than 40 MB/sec. But the old producer's throughput is less
 than 3 MB/sec.

 start.time=2014-09-16 20:22:25:537  end.time=2014-09-16 20:24:13:138
 compression=2  message.size=3000  batch.size=200  total.data.sent.in.MB=286.10
 MB.sec=2.6589  total.data.sent.in.nMsg=100000  nMsg.sec=929.3594

 Even though we increased the socket send buffer on the producer side and
 the receive buffer on the broker side, it didn't help.
 send.buffer.bytes: 8388608
 start.time=2014-09-16 20:48:49:588  end.time=2014-09-16 20:50:03:006
 compression=2  message.size=3000  batch.size=200  total.data.sent.in.MB=286.10
 MB.sec=3.8969  total.data.sent.in.nMsg=100000  nMsg.sec=1362.0638

 But the new producer, which is not released yet, shows a significant
 performance improvement: more than 30 MB/sec.
 start.time=2014-09-16 20:50:31:720  end.time=2014-09-16 20:50:41:241
 compression=2  message.size=3000  batch.size=200  total.data.sent.in.MB=286.10
 MB.sec=30.0496  total.data.sent.in.nMsg=100000  nMsg.sec=10503.098
 I was excited about the new producer's performance, but its partitioning
 logic is different.

 Without a partition number in the ProducerRecord, its partitioning logic is
 based on a murmur2 hash of the key. But in the old partitioner, partitioning
 logic is based on key.hashCode.

 Could you make them use the same logic? Otherwise, I have to change the
 implementation of my Kafka producer container.


Handling errors in the new (0.8.2) Java Client's Producer

2014-09-18 Thread Andrew Stein
I am trying to understand the best practices for working with the new
(0.8.2) Producer interface.

We have a process in a large server that writes a lot of data to Kafka.
However, this data is not mission critical. When a problem arises writing
to Kafka, most specifically network issues but also full producer buffers,
we want the server to continue working but to stop sending data to Kafka,
allowing other tasks to continue. The issue I have is handling messages
that have been sent to the producer but are waiting to go to Kafka. These
messages remain long after my processing is over, timing out, writing to
the logs, and preventing me from moving forward. I am looking for some way
to tell the client to stop forwarding messages to Kafka.

This is what I have so far:

import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.RecordMetadata;

class ErrorCallback implements Callback {
    @Override
    public void onCompletion(RecordMetadata metadata, Exception exception) {
        if (exception == null) { // The message was sent successfully.
            return;
        }

        stopProducerSendAndClose();
        String threadName = Thread.currentThread().getName();
        if (!threadName.equals("kafka-producer-network-thread")) {
            // Some of the callbacks happen on my own thread; nothing to do.
        } else {
            // We are in KafkaProducer's ioThread == commit suicide.
            Thread.currentThread().interrupt();
            throw new ThreadDeath(); // Cannot throw an Exception, as it
                                     // will just be caught and logged.
        }
    }
}

My question is: is this the correct approach, or is there some other way to
stop sending messages (short of going synchronous)?
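
For comparison, a minimal alternative sketch that avoids killing the I/O
thread (hedged: GatedProducer is a made-up wrapper, and close() still blocks
until buffered records are sent or time out, so it only stops new sends):

    import java.util.concurrent.atomic.AtomicBoolean;

    import org.apache.kafka.clients.producer.Callback;
    import org.apache.kafka.clients.producer.Producer;
    import org.apache.kafka.clients.producer.ProducerRecord;
    import org.apache.kafka.clients.producer.RecordMetadata;

    class GatedProducer implements Callback {
        private final AtomicBoolean healthy = new AtomicBoolean(true);
        private final Producer<byte[], byte[]> producer;

        GatedProducer(Producer<byte[], byte[]> producer) {
            this.producer = producer;
        }

        void trySend(ProducerRecord<byte[], byte[]> record) {
            if (healthy.get()) { // drop non-critical data once Kafka is down
                producer.send(record, this);
            }
        }

        @Override
        public void onCompletion(RecordMetadata metadata, Exception exception) {
            if (exception != null && healthy.compareAndSet(true, false)) {
                // close() must not run on the I/O thread that invokes this
                // callback, so hand it off to a separate thread.
                new Thread(new Runnable() {
                    public void run() { producer.close(); }
                }, "kafka-producer-closer").start();
            }
        }
    }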

Andrew Stein