Re: Poor performance running performance test

2015-01-27 Thread Dillian Murphey
I was running the performance command from a VirtualBox server, so that
seems like it was part of the problem. I'm getting better results running
this on a server on AWS, but that's kind of expected. Can you look at
these results and comment on the occasional warning I see? I appreciate
it!

1220375 records sent, 243928.6 records/sec (23.26 MB/sec), 2111.5 ms avg
latency, 4435.0 max latency.
1195090 records sent, 239018.0 records/sec (22.79 MB/sec), 2203.1 ms avg
latency, 4595.0 max latency.
1257165 records sent, 251433.0 records/sec (23.98 MB/sec), 2172.6 ms avg
latency, 4525.0 max latency.
1230981 records sent, 246196.2 records/sec (23.48 MB/sec), 2173.5 ms avg
latency, 4465.0 max latency.
[2015-01-28 07:19:07,274] WARN Error in I/O with
(org.apache.kafka.common.network.Selector)
java.io.EOFException
at
org.apache.kafka.common.network.NetworkReceive.readFrom(NetworkReceive.java:62)
at org.apache.kafka.common.network.Selector.poll(Selector.java:248)
at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:192)
at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:191)
at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:122)
at java.lang.Thread.run(Thread.java:745)
1090689 records sent, 218137.8 records/sec (20.80 MB/sec), 2413.6 ms avg
latency, 4829.0 max latency.

On Tue, Jan 27, 2015 at 7:37 PM, Ewen Cheslack-Postava 
wrote:

> Where are you running ProducerPerformance in relation to ZK and the Kafka
> brokers? You should definitely see much higher performance than this.
>
> A couple of other things I can think of that might be going wrong: Are all
> your VMs in the same AZ? Are you storing Kafka data in EBS or local
> ephemeral storage? If EBS, have you provisioned enough IOPS?
>
>
> On Tue, Jan 27, 2015 at 4:29 PM, Dillian Murphey 
> wrote:
>
> > I'm a new user/admin of Kafka. I'm running a 3-node ZK ensemble and 6 brokers
> > on AWS.
> >
> > The performance I'm seeing is shockingly bad. I need some advice!
> >
> > bin/kafka-run-class.sh org.apache.kafka.clients.tools.ProducerPerformance
> > test2 5000 100 -1 acks=1 bootstrap.servers=5:9092
> > buffer.memory=67108864 batch.size=8196
> >
> >
> >
> >
> > 6097 records sent, 13198.3 records/sec (1.26 MB/sec), 2098.0 ms avg
> > latency, 4306.0 max latency.
> > 71695 records sent, 14339.0 records/sec (1.37 MB/sec), 6658.1 ms avg
> > latency, 9053.0 max latency.
> > 65195 records sent, 13028.6 records/sec (1.24 MB/sec), 11504.0 ms avg
> > latency, 13809.0 max latency.
> > 71955 records sent, 14391.0 records/sec (1.37 MB/sec), 16137.4 ms avg
> > latency, 18541.0 max latency.
> >
> > Thanks for any help!
> >
>
>
>
> --
> Thanks,
> Ewen
>


Re: Can't create a topic; can't delete it either

2015-01-27 Thread Joel Koshy
Do you still have the controller and state change logs from the time
you originally tried to delete the topic?

On Tue, Jan 27, 2015 at 03:11:48PM -0800, Sumit Rangwala wrote:
> I am using 0.8.2-beta on the brokers and 0.8.1.1 for the clients (producer and
> consumers). delete.topic.enable=true is set on all brokers, and the replication
> factor is < the number of brokers. I see this issue with just one topic; all
> other topics are fine (creation and deletion). Even after a day it is still in
> the "marked for deletion" stage. Let me know what other information from the
> brokers or ZooKeeper can help me debug this issue.
> 
> On Tue, Jan 27, 2015 at 9:47 AM, Gwen Shapira  wrote:
> 
> > Also, do you have delete.topic.enable=true on all brokers?
> >
> > The automatic topic creation can fail if the default number of
> > replicas is greater than number of available brokers. Check the
> > default.replication.factor parameter.
> >
> > Gwen
> >
> > On Tue, Jan 27, 2015 at 12:29 AM, Joel Koshy  wrote:
> > > Which version of the broker are you using?
> > >
> > > On Mon, Jan 26, 2015 at 10:27:14PM -0800, Sumit Rangwala wrote:
> > >> While running kafka in production I found an issue where a topic wasn't
> > >> getting created even with auto topic enabled. I then went ahead and
> > created
> > >> the topic manually (from the command line). I then delete the topic,
> > again
> > >> manually. Now my broker won't allow me to either create *the* topic or
> > >> delete *the* topic. (other topic creation and deletion is working fine).
> > >>
> > >> The topic is in "marked for deletion" stage for more than 3 hours.
> > >>
> > >> $ bin/kafka-topics.sh --zookeeper zookeeper1:2181/replication/kafka
> > --list
> > >> --topic GRIFFIN-TldAdFormat.csv-1422321736886
> > >> GRIFFIN-TldAdFormat.csv-1422321736886 - marked for deletion
> > >>
> > >> If this is a known issue, is there a workaround?
> > >>
> > >> Sumit
> > >
> >



Re: Ques regarding topic partition offset

2015-01-27 Thread Joel Koshy
This can happen as a result of unclean leader elections - there are
mbeans on the controller that give the unclean leader election rate -
or you can check the controller logs to determine if this happened.
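[Editor's note: a minimal sketch of checking that rate over JMX, assuming remote
JMX is enabled on the controller broker. "broker1:9999" is a placeholder, and the
exact ObjectName format differs between 0.8.1 (quoted names) and 0.8.2 (tag style),
so verify it with jconsole first.]

import javax.management.MBeanServerConnection;
import javax.management.ObjectName;
import javax.management.remote.JMXConnector;
import javax.management.remote.JMXConnectorFactory;
import javax.management.remote.JMXServiceURL;

public class UncleanElectionRateCheck {
    public static void main(String[] args) throws Exception {
        // Placeholder host/port: point this at the controller broker's JMX port.
        JMXServiceURL url = new JMXServiceURL(
                "service:jmx:rmi:///jndi/rmi://broker1:9999/jmxrmi");
        JMXConnector connector = JMXConnectorFactory.connect(url);
        try {
            MBeanServerConnection mbs = connector.getMBeanServerConnection();
            // Assumed 0.8.2-style name; adjust to the quoted 0.8.1 form if needed.
            ObjectName mbean = new ObjectName(
                    "kafka.controller:type=ControllerStats,name=UncleanLeaderElectionsPerSec");
            System.out.println("unclean elections (count):      "
                    + mbs.getAttribute(mbean, "Count"));
            System.out.println("unclean elections (1-min rate): "
                    + mbs.getAttribute(mbean, "OneMinuteRate"));
        } finally {
            connector.close();
        }
    }
}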

On Tue, Jan 27, 2015 at 09:54:38PM -0800, Liju John wrote:
> Hi,
> 
> I have a query regarding partition offsets.
> 
> While running the Kafka cluster for some time, I noticed that the partition
> offset keeps increasing, and at some point the offset decreased by some
> number.
> In what scenarios does the offset of a topic partition decrease?
> 
> The problem I am facing is that my consumers are pulling messages from the
> topic, but at some point they throw an exception that the current offset
> is greater than the latest offset of the partition.
> Is it because of retention that the latest offset gets reset? How can I
> handle this scenario?
> 
> Regards,
> Liju John
> S/W developer



Re: [DISCUSSION] Boot dependency in the new producer

2015-01-27 Thread Guozhang Wang
Steven,

You are right; I was wrong in the previous email: it will not set the
flag, and the Sender thread will trigger Client.poll() but with an Int.Max select
time, hence this should not be an issue. I am closing this discussion now.

Guozhang
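[Editor's note: a small sketch of the conclusion above, assuming the 0.8.2-era
new-producer API: construction only records the bootstrap addresses, and the
background Sender connects lazily, so creating the producer should not block even
if no broker is reachable. The address below is a placeholder, and the behavior
should be verified against the exact version in use.]

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class LazyBootstrapSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        // Placeholder address where no broker is listening.
        props.put("bootstrap.servers", "10.255.255.1:9092");
        props.put("key.serializer",
                "org.apache.kafka.common.serialization.ByteArraySerializer");
        props.put("value.serializer",
                "org.apache.kafka.common.serialization.ByteArraySerializer");

        // Should return promptly: only address parsing (and possibly DNS) happens here.
        KafkaProducer<byte[], byte[]> producer = new KafkaProducer<byte[], byte[]>(props);

        // It is the first send() that triggers a metadata fetch for the topic; with no
        // reachable broker that send would eventually fail, not the constructor.
        // producer.send(new ProducerRecord<byte[], byte[]>("test-topic", new byte[0]));

        producer.close();
    }
}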

On Mon, Jan 26, 2015 at 4:42 PM, Steven Wu  wrote:

> Jay, I don't think this line will bootstrap the full metadata (for all topics).
> It will just construct the cluster object with the bootstrap hosts. You need to
> call "metadata.add(topic)" to register interest in a topic's partition metadata.
>
> Guozhang, I personally think this is OK. It just does a few DNS lookups or TCP
> connections before the first send.
>
> On Mon, Jan 26, 2015 at 2:07 PM, Jay Kreps  wrote:
>
> > Oh, yes, I guess I thought you meant that construction of the client
> would
> > block on the metadata request.
> >
> > I don't personally think that is a problem because if it fails it will
> > retry in the background, right?
> >
> > But actually I think this is probably violating another desirable
> criteria
> > we had talked about which was keeping the producer from bootstrapping the
> > full metadata for all partitions. If it is doing that during construction
> > time presumably the resulting metadata request is for all partitions, no?
> > That isn't a huge problem, but I think isn't what was intended.
> >
> > -Jay
> >
> > On Mon, Jan 26, 2015 at 1:34 PM, Guozhang Wang 
> wrote:
> >
> > > It will set the needUpdate flag to true and hence the background Sender
> > > will try to talk to the bootstrap servers.
> > >
> > > Guozhang
> > >
> > > On Mon, Jan 26, 2015 at 1:12 PM, Jay Kreps 
> wrote:
> > >
> > > > Hey Guozhang,
> > > >
> > > > That line shouldn't cause any connections to Kafka to be established,
> > > does
> > > > it? All that is doing is creating the Cluster pojo using the supplied
> > > > addresses. The use of InetSocketAddress may cause some dns stuff to
> > > happen,
> > > > though...
> > > >
> > > > -Jay
> > > >
> > > > On Mon, Jan 26, 2015 at 10:50 AM, Guozhang Wang 
> > > > wrote:
> > > >
> > > > > Hi all,
> > > > >
> > > > > I am not sure if we have discussed about this before, but recently
> I
> > > > > realized that we have introduced boot dependency of the
> kafka-server
> > > > > specified by the "bootstrap.servers" config in the new producer.
> More
> > > > > specifically, although in the old producer we also have a similar
> > > config
> > > > > for specifying the broker list, the producer will not try to
> connect
> > to
> > > > > those brokers until the first message send call is triggered;
> whereas
> > > in
> > > > > the new producer, it will try to talk to them in construction time
> > via:
> > > > >
> > > > > update(Cluster.bootstrap(addresses), time.milliseconds());
> > > > >
> > > > >
> > > > > I personally am neutral to this change, as in most cases the
> > > > corresponding
> > > > > kafka server should be up and running before the producer clients
> are
> > > > > deployed, but there are still some corner cases when it is not
> true,
> > > for
> > > > > example some standalone deployment tests of the app embedded with
> > some
> > > > > clients, etc. So I would like to bring this up to people's
> attention
> > if
> > > > we
> > > > > have not discussed about it before: do we think this is OK to
> > introduce
> > > > > this boot dependency in the new producer?
> > > > >
> > > > > -- Guozhang
> > > > >
> > > >
> > >
> > >
> > >
> > > --
> > > -- Guozhang
> > >
> >
>



-- 
-- Guozhang


Re: SimpleConsumer leaks sockets on an UnresolvedAddressException

2015-01-27 Thread Guozhang Wang
Ewen, you are right: the patch was committed on Feb 20th last year. I will
leave a comment and close that ticket.

On Tue, Jan 27, 2015 at 7:24 PM, Ewen Cheslack-Postava 
wrote:

> This was fixed in commit 6ab9b1ecd8 for KAFKA-1235 and it looks like that
> will only be included in 0.8.2.
>
> Guozhang, it looks like you wrote the patch, Jun reviewed it, but the bug
> is still open and there's a comment that moved it to 0.9 after the commit
> was already made. Was the commit a mistake or did we just forget to close
> it?
>
> On Tue, Jan 27, 2015 at 10:20 AM, Rajiv Kurian 
> wrote:
>
> > Here is the relevant stack trace:
> >
> > java.nio.channels.UnresolvedAddressException: null
> >
> > at sun.nio.ch.Net.checkAddress(Net.java:127) ~[na:1.7.0_55]
> >
> > at
> sun.nio.ch.SocketChannelImpl.connect(SocketChannelImpl.java:644)
> > ~[na:1.7.0_55]
> >
> > at kafka.network.BlockingChannel.connect(Unknown Source)
> > ~[kafka_2.10-0.8.0.jar:0.8.0]
> >
> > at kafka.consumer.SimpleConsumer.connect(Unknown Source)
> > ~[kafka_2.10-0.8.0.jar:0.8.0]
> >
> > at kafka.consumer.SimpleConsumer.getOrMakeConnection(Unknown
> > Source) ~[kafka_2.10-0.8.0.jar:0.8.0]
> >
> > at
> >
> >
> kafka.consumer.SimpleConsumer.kafka$consumer$SimpleConsumer$$sendRequest(Unknown
> > Source) ~[kafka_2.10-0.8.0.jar:0.8.0]
> >
> > at
> >
> >
> kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(Unknown
> > Source) ~[kafka_2.10-0.8.0.jar:0.8.0]
> >
> > at
> >
> >
> kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(Unknown
> > Source) ~[kafka_2.10-0.8.0.jar:0.8.0]
> >
> > at
> >
> >
> kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(Unknown
> > Source) ~[kafka_2.10-0.8.0.jar:0.8.0]
> >
> > at kafka.metrics.KafkaTimer.time(Unknown Source)
> > ~[kafka_2.10-0.8.0.jar:0.8.0]
> >
> > at
> > kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply$mcV$sp(Unknown
> Source)
> > ~[kafka_2.10-0.8.0.jar:0.8.0]
> >
> > at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(Unknown
> > Source) ~[kafka_2.10-0.8.0.jar:0.8.0]
> >
> > at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(Unknown
> > Source) ~[kafka_2.10-0.8.0.jar:0.8.0]
> >
> > at kafka.metrics.KafkaTimer.time(Unknown Source)
> > ~[kafka_2.10-0.8.0.jar:0.8.0]
> >
> > at kafka.consumer.SimpleConsumer.fetch(Unknown Source)
> > ~[kafka_2.10-0.8.0.jar:0.8.0]
> >
> > at kafka.javaapi.consumer.SimpleConsumer.fetch(Unknown Source)
> > ~[kafka_2.10-0.8.0.jar:0.8.0]
> >
> > On Tue, Jan 27, 2015 at 10:19 AM, Rajiv Kurian 
> > wrote:
> >
> > > I am using 0.8.1. The source is here:
> > >
> >
> https://github.com/apache/kafka/blob/0.8.1/core/src/main/scala/kafka/consumer/SimpleConsumer.scala
> > >
> > > Here is the definition of disconnect():
> > > private def disconnect() = {
> > > if(blockingChannel.isConnected) {
> > >   debug("Disconnecting from " + host + ":" + port)
> > >   blockingChannel.disconnect()
> > > }
> > >   }
> > > It checks if blockingChannel.isConnected before calling
> > > blockingChannel.disconnect(). I think if there is an
> > > UnresolvedAddressException, the isConnected is never set and the
> > > blockingChannel.disconnect() is never called. But by this point we have
> > > already created a socket and will leak it.
> > >
> > > The same problem might be present in the connect method of the
> > > BlockingChannel at
> > >
> >
> https://github.com/apache/kafka/blob/0.8.1/core/src/main/scala/kafka/network/BlockingChannel.scala
> > .
> > > Though its own disconnect method seems to check for both the connected:
> > >
> > > def disconnect() = lock synchronized {
> > > // My comment: connected will not be set if we get an
> > > UnresolvedAddressException but channel should NOT  be null, so we will
> > > probably still do the right thing.
> > > if(connected || channel != null) {
> > >   // closing the main socket channel *should* close the read
> channel
> > >   // but let's do it to be sure.
> > >   swallow(channel.close())
> > >   swallow(channel.socket.close())
> > >   if(readChannel != null) swallow(readChannel.close())
> > >   channel = null; readChannel = null; writeChannel = null
> > >   connected = false
> > > }
> > >   }
> > >
> > >
> > >
> > > On Tue, Jan 27, 2015 at 9:03 AM, Guozhang Wang 
> > wrote:
> > >
> > >> Rajiv,
> > >>
> > >> Which version of Kafka are you using? I just checked SimpleConsumer's
> > >> code,
> > >> and in its close() function, disconnect() is called, which will close
> > the
> > >> socket.
> > >>
> > >> Guozhang
> > >>
> > >>
> > >> On Mon, Jan 26, 2015 at 2:36 PM, Rajiv Kurian 
> > >> wrote:
> > >>
> > >> > Meant to write a run loop.
> > >> >
> > >> > void run() {
> > >> >   while (running) {
> > >> > if (simpleConsumer == null) {
> > >> >   simpleConsumer = new SimpleConsumer(host, port,
> 

Ques regarding topic partition offset

2015-01-27 Thread Liju John
Hi,

I have a query regarding partition offsets.

While running the Kafka cluster for some time, I noticed that the partition
offset keeps increasing, and at some point the offset decreased by some
number.
In what scenarios does the offset of a topic partition decrease?

The problem I am facing is that my consumers are pulling messages from the
topic, but at some point they throw an exception that the current offset
is greater than the latest offset of the partition.
Is it because of retention that the latest offset gets reset? How can I
handle this scenario?

Regards,
Liju John
S/W developer


Re: Poor performance running performance test

2015-01-27 Thread Ewen Cheslack-Postava
Where are you running ProducerPerformance in relation to ZK and the Kafka
brokers? You should definitely see much higher performance than this.

A couple of other things I can think of that might be going wrong: Are all
your VMs in the same AZ? Are you storing Kafka data in EBS or local
ephemeral storage? If EBS, have you provisioned enough IOPS?


On Tue, Jan 27, 2015 at 4:29 PM, Dillian Murphey 
wrote:

> I'm a new user/admin of Kafka. I'm running a 3-node ZK ensemble and 6 brokers
> on AWS.
>
> The performance I'm seeing is shockingly bad. I need some advice!
>
> bin/kafka-run-class.sh org.apache.kafka.clients.tools.ProducerPerformance
> test2 5000 100 -1 acks=1 bootstrap.servers=5:9092
> buffer.memory=67108864 batch.size=8196
>
>
>
>
> 6097 records sent, 13198.3 records/sec (1.26 MB/sec), 2098.0 ms avg
> latency, 4306.0 max latency.
> 71695 records sent, 14339.0 records/sec (1.37 MB/sec), 6658.1 ms avg
> latency, 9053.0 max latency.
> 65195 records sent, 13028.6 records/sec (1.24 MB/sec), 11504.0 ms avg
> latency, 13809.0 max latency.
> 71955 records sent, 14391.0 records/sec (1.37 MB/sec), 16137.4 ms avg
> latency, 18541.0 max latency.
>
> Thanks for any help!
>



-- 
Thanks,
Ewen


Re: SimpleConsumer leaks sockets on an UnresolvedAddressException

2015-01-27 Thread Ewen Cheslack-Postava
This was fixed in commit 6ab9b1ecd8 for KAFKA-1235 and it looks like that
will only be included in 0.8.2.

Guozhang, it looks like you wrote the patch, Jun reviewed it, but the bug
is still open and there's a comment that moved it to 0.9 after the commit
was already made. Was the commit a mistake or did we just forget to close
it?

On Tue, Jan 27, 2015 at 10:20 AM, Rajiv Kurian  wrote:

> Here is the relevant stack trace:
>
> java.nio.channels.UnresolvedAddressException: null
>
> at sun.nio.ch.Net.checkAddress(Net.java:127) ~[na:1.7.0_55]
>
> at sun.nio.ch.SocketChannelImpl.connect(SocketChannelImpl.java:644)
> ~[na:1.7.0_55]
>
> at kafka.network.BlockingChannel.connect(Unknown Source)
> ~[kafka_2.10-0.8.0.jar:0.8.0]
>
> at kafka.consumer.SimpleConsumer.connect(Unknown Source)
> ~[kafka_2.10-0.8.0.jar:0.8.0]
>
> at kafka.consumer.SimpleConsumer.getOrMakeConnection(Unknown
> Source) ~[kafka_2.10-0.8.0.jar:0.8.0]
>
> at
>
> kafka.consumer.SimpleConsumer.kafka$consumer$SimpleConsumer$$sendRequest(Unknown
> Source) ~[kafka_2.10-0.8.0.jar:0.8.0]
>
> at
>
> kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(Unknown
> Source) ~[kafka_2.10-0.8.0.jar:0.8.0]
>
> at
>
> kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(Unknown
> Source) ~[kafka_2.10-0.8.0.jar:0.8.0]
>
> at
>
> kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(Unknown
> Source) ~[kafka_2.10-0.8.0.jar:0.8.0]
>
> at kafka.metrics.KafkaTimer.time(Unknown Source)
> ~[kafka_2.10-0.8.0.jar:0.8.0]
>
> at
> kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply$mcV$sp(Unknown Source)
> ~[kafka_2.10-0.8.0.jar:0.8.0]
>
> at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(Unknown
> Source) ~[kafka_2.10-0.8.0.jar:0.8.0]
>
> at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(Unknown
> Source) ~[kafka_2.10-0.8.0.jar:0.8.0]
>
> at kafka.metrics.KafkaTimer.time(Unknown Source)
> ~[kafka_2.10-0.8.0.jar:0.8.0]
>
> at kafka.consumer.SimpleConsumer.fetch(Unknown Source)
> ~[kafka_2.10-0.8.0.jar:0.8.0]
>
> at kafka.javaapi.consumer.SimpleConsumer.fetch(Unknown Source)
> ~[kafka_2.10-0.8.0.jar:0.8.0]
>
> On Tue, Jan 27, 2015 at 10:19 AM, Rajiv Kurian 
> wrote:
>
> > I am using 0.8.1. The source is here:
> >
> https://github.com/apache/kafka/blob/0.8.1/core/src/main/scala/kafka/consumer/SimpleConsumer.scala
> >
> > Here is the definition of disconnect():
> > private def disconnect() = {
> > if(blockingChannel.isConnected) {
> >   debug("Disconnecting from " + host + ":" + port)
> >   blockingChannel.disconnect()
> > }
> >   }
> > It checks if blockingChannel.isConnected before calling
> > blockingChannel.disconnect(). I think if there is an
> > UnresolvedAddressException, the isConnected is never set and the
> > blockingChannel.disconnect() is never called. But by this point we have
> > already created a socket and will leak it.
> >
> > The same problem might be present in the connect method of the
> > BlockingChannel at
> >
> https://github.com/apache/kafka/blob/0.8.1/core/src/main/scala/kafka/network/BlockingChannel.scala
> .
> > Though its own disconnect method seems to check for both the connected:
> >
> > def disconnect() = lock synchronized {
> > // My comment: connected will not be set if we get an
> > UnresolvedAddressException but channel should NOT  be null, so we will
> > probably still do the right thing.
> > if(connected || channel != null) {
> >   // closing the main socket channel *should* close the read channel
> >   // but let's do it to be sure.
> >   swallow(channel.close())
> >   swallow(channel.socket.close())
> >   if(readChannel != null) swallow(readChannel.close())
> >   channel = null; readChannel = null; writeChannel = null
> >   connected = false
> > }
> >   }
> >
> >
> >
> > On Tue, Jan 27, 2015 at 9:03 AM, Guozhang Wang 
> wrote:
> >
> >> Rajiv,
> >>
> >> Which version of Kafka are you using? I just checked SimpleConsumer's
> >> code,
> >> and in its close() function, disconnect() is called, which will close
> the
> >> socket.
> >>
> >> Guozhang
> >>
> >>
> >> On Mon, Jan 26, 2015 at 2:36 PM, Rajiv Kurian 
> >> wrote:
> >>
> >> > Meant to write a run loop.
> >> >
> >> > void run() {
> >> >   while (running) {
> >> > if (simpleConsumer == null) {
> >> >   simpleConsumer = new SimpleConsumer(host, port,
> >> > (int) kafkaSocketTimeout, kafkaRExeiveBufferSize, clientName);
> >> > }
> >> > try {
> >> >   // Do stuff with simpleConsumer.
> >> > } catch (Exception e) {
> >> >   logger.error(e);  // Assume UnresolvedAddressException.
> >> >   if (consumer != null) {
> >> > simpleConsumer.close();
> >> > simpleConsumer = null;
> >> >   }
> >> > }
> >> >   }
> >> > }
> >> >
> >> > On Mon, Jan 26, 2015 at 2:27 PM, Rajiv Kurian

Poor performance running performance test

2015-01-27 Thread Dillian Murphey
I'm a new user/admin of Kafka. I'm running a 3-node ZK ensemble and 6 brokers
on AWS.

The performance I'm seeing is shockingly bad. I need some advice!

bin/kafka-run-class.sh org.apache.kafka.clients.tools.ProducerPerformance
test2 5000 100 -1 acks=1 bootstrap.servers=5:9092
buffer.memory=67108864 batch.size=8196




6097 records sent, 13198.3 records/sec (1.26 MB/sec), 2098.0 ms avg
latency, 4306.0 max latency.
71695 records sent, 14339.0 records/sec (1.37 MB/sec), 6658.1 ms avg
latency, 9053.0 max latency.
65195 records sent, 13028.6 records/sec (1.24 MB/sec), 11504.0 ms avg
latency, 13809.0 max latency.
71955 records sent, 14391.0 records/sec (1.37 MB/sec), 16137.4 ms avg
latency, 18541.0 max latency.

Thanks for any help!
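[Editor's note: for readers less familiar with the tool, a rough sketch of what
the command above configures, expressed as plain new-producer code. It assumes
the positional arguments are topic, record count, record size, and throughput
throttle, and that "broker1:9092" stands in for the redacted bootstrap address;
it is meant to show which settings are in play, not to recommend values.]

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class ProducerPerfSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "broker1:9092"); // placeholder for the real broker list
        props.put("acks", "1");                          // as in the command line
        props.put("buffer.memory", "67108864");          // 64 MB accumulator, as in the command line
        props.put("batch.size", "8196");                 // ~8 KB batches, as in the command line
        props.put("key.serializer",
                "org.apache.kafka.common.serialization.ByteArraySerializer");
        props.put("value.serializer",
                "org.apache.kafka.common.serialization.ByteArraySerializer");

        KafkaProducer<byte[], byte[]> producer = new KafkaProducer<byte[], byte[]>(props);
        byte[] payload = new byte[100];                  // 100-byte records, as in the command line
        for (int i = 0; i < 100000; i++) {
            producer.send(new ProducerRecord<byte[], byte[]>("test2", payload));
        }
        producer.close();
    }
}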


Re: Can't create a topic; can't delete it either

2015-01-27 Thread Sumit Rangwala
I am using 0.8.2-beta on the brokers and 0.8.1.1 for the clients (producer and
consumers). delete.topic.enable=true is set on all brokers, and the replication
factor is < the number of brokers. I see this issue with just one topic; all
other topics are fine (creation and deletion). Even after a day it is still in
the "marked for deletion" stage. Let me know what other information from the
brokers or ZooKeeper can help me debug this issue.

On Tue, Jan 27, 2015 at 9:47 AM, Gwen Shapira  wrote:

> Also, do you have delete.topic.enable=true on all brokers?
>
> The automatic topic creation can fail if the default number of
> replicas is greater than number of available brokers. Check the
> default.replication.factor parameter.
>
> Gwen
>
> On Tue, Jan 27, 2015 at 12:29 AM, Joel Koshy  wrote:
> > Which version of the broker are you using?
> >
> > On Mon, Jan 26, 2015 at 10:27:14PM -0800, Sumit Rangwala wrote:
> >> While running kafka in production I found an issue where a topic wasn't
> >> getting created even with auto topic enabled. I then went ahead and
> created
> >> the topic manually (from the command line). I then delete the topic,
> again
> >> manually. Now my broker won't allow me to either create *the* topic or
> >> delete *the* topic. (other topic creation and deletion is working fine).
> >>
> >> The topic is in "marked for deletion" stage for more than 3 hours.
> >>
> >> $ bin/kafka-topics.sh --zookeeper zookeeper1:2181/replication/kafka
> --list
> >> --topic GRIFFIN-TldAdFormat.csv-1422321736886
> >> GRIFFIN-TldAdFormat.csv-1422321736886 - marked for deletion
> >>
> >> If this is a known issue, is there a workaround?
> >>
> >> Sumit
> >
>


Re: [VOTE] 0.8.2.0 Candidate 2 (with the correct links)

2015-01-27 Thread Jun Rao
Hi, Everyone,

We identified a blocker issue related to Yammer jmx reporter (
https://issues.apache.org/jira/browse/KAFKA-1902). We are addressing this
issue right now. Once it's resolved, we will roll out a new RC for voting.

Thanks,

Jun

On Mon, Jan 26, 2015 at 5:14 PM, Otis Gospodnetic <
otis.gospodne...@gmail.com> wrote:

> Hi,
>
> Don't use Graphite, so I don't know.  Kyle, maybe you can share more info?
> What do you mean by "reported to Yammer" for example?  And when you say
> Yammer/Graphite, are you trying to say that you are using the Graphite
> Reporter?  If so, can you try other Yammer Reporters and see if there is a
> more general issue or something limited to either Graphite or Graphite
> Reporter?
>
> I am pretty sure we are able to see all Kafka 0.8.2 metrics nicely in SPM
> (in non-public version of the Kafka monitoring agent).
>
> Otis
> --
> Monitoring * Alerting * Anomaly Detection * Centralized Log Management
> Solr & Elasticsearch Support * http://sematext.com/
>
>
> On Mon, Jan 26, 2015 at 7:37 PM, Jun Rao  wrote:
>
> > Hmm, that's not the intention. The per-topic mbeans are definitely
> > registered by Yammer. So, not sure why it's not reported to Graphite.
> >
> > Otis, Vladimir,
> >
> > Do you guys know?
> >
> > Thanks,
> >
> > Jun
> >
> >
> >
> > On Mon, Jan 26, 2015 at 4:08 PM, Kyle Banker 
> wrote:
> >
> > > This is still preliminary, but it looks as if the change to metric
> names
> > > for per-topic metrics (bytes/messages in/out) is preventing these
> metrics
> > > from being reported to Yammer/Graphite. If this isn't intentional, it
> > > should probably be addressed before release.
> > >
> > > On Wed, Jan 21, 2015 at 9:28 AM, Jun Rao  wrote:
> > >
> > > > This is the second candidate for release of Apache Kafka 0.8.2.0.
> There
> > > has
> > > > been some changes since the 0.8.2 beta release, especially in the new
> > > java
> > > > producer api and jmx mbean names. It would be great if people can
> test
> > > this
> > > > out thoroughly.
> > > >
> > > > Release Notes for the 0.8.2.0 release
> > > >
> > > >
> > >
> >
> https://people.apache.org/~junrao/kafka-0.8.2.0-candidate2/RELEASE_NOTES.html
> > > >
> > > > *** Please download, test and vote by Monday, Jan 26th, 7pm PT
> > > >
> > > > Kafka's KEYS file containing PGP keys we use to sign the release:
> > > > http://kafka.apache.org/KEYS in addition to the md5, sha1 and sha2
> > > > (SHA256)
> > > > checksum.
> > > >
> > > > * Release artifacts to be voted upon (source and binary):
> > > > https://people.apache.org/~junrao/kafka-0.8.2.0-candidate2/
> > > >
> > > > * Maven artifacts to be voted upon prior to release:
> > > > https://repository.apache.org/content/groups/staging/
> > > >
> > > > * scala-doc
> > > > https://people.apache.org/~junrao/kafka-0.8.2.0-candidate2/scaladoc/
> > > >
> > > > * java-doc
> > > > https://people.apache.org/~junrao/kafka-0.8.2.0-candidate2/javadoc/
> > > >
> > > > * The tag to be voted upon (off the 0.8.2 branch) is the 0.8.2.0 tag
> > > >
> > > >
> > >
> >
> https://git-wip-us.apache.org/repos/asf?p=kafka.git;a=tag;h=058d58adef2ab2787e49d8efeefd61bb3d32f99c
> > > > (commit 0b312a6b9f0833d38eec434bfff4c647c1814564)
> > > >
> > > > /***
> > > >
> > > > Thanks,
> > > >
> > > > Jun
> > > >
> > >
> >
>


Replication stop working

2015-01-27 Thread Dong, John
Hi,

I am new to this forum and I am not sure this is the correct mailing list for 
sending questions. If not, please let me know and I will stop.

I am looking for help to resolve a replication issue. Replication stopped working 
a while back.

Kafka environment: Kafka 0.8.1.1, CentOS 6.5, 7-node cluster, default 
replication factor 2, 10 partitions per topic.

Initially each partition resided on two different nodes. It had been this way 
for several months and was working. Starting two weeks ago, two things happened:

  1.  One node's disk usage got to 100% and crashed the Kafka process, so we had 
to delete some *.log and *.index files and restart the Kafka process.
  2.  In another case, another node's disk usage reached 90%. Someone deleted 
some *.log and *.index files without shutting down the Kafka process. This 
caused issues and Kafka was unable to restart. I had to delete all *.log and 
*.index files on this node to bring Kafka back online.

Now replication is broken. Most partitions have only one leader and one replica 
in the ISR, even though replication is set up with two broker ids. Whenever I 
shut down the Kafka process on a node, whatever leaders are running on that node 
get moved to the other node in the replica assignment. After I restart Kafka on 
that node, it never becomes a follower and its data directory never gets 
updated.

I tried the following:


  1.  I turned on TRACE/DEBUG logging for Kafka and ZooKeeper. I did not find 
anything that can help.
  2.  I also tried to manipulate the replica configuration in ZooKeeper using 
zkCli.sh, like adding a follower to the ISR list. That did not initiate a 
fetcher process to make it become a follower.
  3.  I also created a new topic, with replication working initially. But as soon 
as I shut down Kafka on one of its two nodes, that partition loses one replica 
in the ISR and it never comes back. This confirms that it is reproducible.
  4.  I ran the Kafka preferred replica election tool to force re-election of 
leaders. That did not do anything; it is as if nothing happened to the cluster.
  5.  I added num.replica.fetchers=10 to server.properties and restarted Kafka. 
That did not do anything.

Does anyone have any experience with this? Or any advice on where to look and 
what the next steps are for troubleshooting? There are only two things that I 
may have to do:


  1.  Shut down all Kafka and ZooKeeper processes and restart them. I really do 
not want to go this route unless I have to; I would like to identify the root 
cause rather than randomly restart the whole cluster.
  2.  Move all topics to another Kafka cluster and rebuild this one. This will 
be very time consuming and require a lot of changes in the application.

Thanks.

John Dong


Re: Missing Per-Topic BrokerTopicMetrics in v0.8.2.0

2015-01-27 Thread Jun Rao
Jason, Kyle,

Added comments to the jira. Let me know if they make sense. The dot
convention is a bit tricky to follow since we allow dots in topic and
clientId, etc. Also, we probably don't want to use a convention too
specific for Graphite since other systems may have other conventions.

Thanks,

Jun
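[Editor's note: a minimal sketch of the two naming schemes being discussed in
this thread, assuming the Yammer metrics-core 2.x MetricName API. The
group/type/name/scope values are taken from the messages below; whether a given
reporter actually uses the scope field is reporter-specific.]

import com.yammer.metrics.core.MetricName;

public class MetricNameSketch {
    public static void main(String[] args) {
        // 0.8.1-style: the topic is embedded in the metric name itself.
        MetricName perTopicOld = new MetricName(
                "kafka.server", "BrokerTopicMetrics", "mytopic-BytesInPerSec");

        // Proposal discussed in KAFKA-1902: keep the plain name and carry the
        // topic tag in the scope field, so reporters that key on
        // group/type/name/scope can still tell per-topic metrics apart.
        MetricName perTopicScoped = new MetricName(
                "kafka.server", "BrokerTopicMetrics", "BytesInPerSec", "topic=test");

        System.out.println(perTopicOld.getMBeanName());
        System.out.println(perTopicScoped.getMBeanName());
    }
}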

On Tue, Jan 27, 2015 at 9:32 AM, Jason Rosenberg  wrote:

> I added a comment to the ticket.  I think it will work getting data
> disambiguated (didn't actually test end to end to graphite).
> However, the naming scheme is not ideal for how metric ui's typically would
> present the metric tree (e.g. jmx tag syntax doesn't really translate).
>
> Jason
>
> On Tue, Jan 27, 2015 at 11:19 AM, Jun Rao  wrote:
>
> > Jason, Kyle,
> >
> > I created an 0.8.2 blocker
> > https://issues.apache.org/jira/browse/KAFKA-1902
> > and attached a patch there. Could you test it out and see if it fixes the
> > issue with the reporter? The patch adds tags as scope in MetricName.
> >
> > Thanks,
> >
> > Jun
> >
> > On Tue, Jan 27, 2015 at 7:39 AM, Jun Rao  wrote:
> >
> > > Jason,
> > >
> > > So, this sounds like a real issue. Perhaps we can fix it just by
> setting
> > > the tag name as the scope. For example, for mbean kafka.server:type=
> > > BrokerTopicMetrics,name=BytesInPerSec,topic=test, we can have
> > >
> > > group: "kafka.server"
> > > type: "BrokerTopicMetrics"
> > > name: "BytesInPerSec"
> > > scope: "topic=test"
> > >
> > > Do you know if scope can have characters like "=" and "," (e.g., for
> > scope
> > > like "topic=test,partition=1")?
> > >
> > > The issue with using mytopic-BytesInPerSec as the name is what we are
> > > trying to fix in kafka-1481. Topic name (and clientId, etc) can have
> dash
> > > in it and it's hard to parse.
> > >
> > > Thanks,
> > >
> > > Jun
> > >
> > >
> > >
> > > On Tue, Jan 27, 2015 at 6:30 AM, Jason Rosenberg 
> > wrote:
> > >
> > >> Remember multiple people have reported this issue. Per topic metrics
> no
> > >> longer appear in graphite (or in any system modeled after the yammer
> > >> GraphiteReporter). They are not being seen as unique.
> > >>
> > >> While these metrics are registered in the registry as separate
> > >> ‘MetricName’
> > >> instances (varying only by mbeanName), the GraphiteReporter sends the
> > >> metrics to graphite using only the 4 fields I describe above.
> > >> Consequently,
> > >> multiple metrics in the registry get sent to graphite under the same
> > name.
> > >> Thus these metrics all end up in the same bucket in graphite,
> trampling
> > >> over each other making them useless. They aren’t ‘double counted’ so
> > much
> > >> as flapping between multiple independent values.
> > >>
> > >> We actually have our own Reporter class (based off the yammer
> > >> GraphiteReporter). Our version sends metrics through kafka which is
> then
> > >> consumed downstream by multiple metric consumers.
> > >>
> > >> The ConsoleReporter isn’t useful for actually persisting metrics
> > anywhere.
> > >> It’s just iterating through all the (identically named metrics in the
> > >> registry (save for the different mbeanNames))….
> > >>
> > >> The mbeanName, as constructed, is not useful as a human readable
> metric
> > >> name, to be presented in a browsable tree of metrics, etc. The
> > >> ‘group’:’type’:’name’:’scope’ are the pieces that matter.
> > >>
> > >> The fix here is to produce MetricName instances similar to 0.8.1.1,
> etc.
> > >> In
> > >> this case, it should probably be something like:
> > >>
> > >> group: kafka.server
> > >> type: BrokerTopicMetrics
> > >> name: mytopic-BytesInPerSec
> > >> group: 
> > >>
> > >> Jason
> > >> ​
> > >>
> > >> On Tue, Jan 27, 2015 at 7:26 AM, Manikumar Reddy <
> ku...@nmsworks.co.in>
> > >> wrote:
> > >>
> > >> > I have enabled yammer's ConsoleReporter and I am getting all the
> > metrics
> > >> > (including per-topic metrics).
> > >> >
> > >> > Yammer's MetricName object implements equals/hashcode methods using
> > >> > mBeanName . We are constructing a unique mBeanName for each metric,
> So
> > >> we
> > >> > are not missing/overwriting any metrics.
> > >> >
> > >> > Current confusion is due to  MetricName.name(). This will be same
> > >> > (BytesInPerSec) for both broker level and topic level metrics. We
> need
> > >> to
> > >> > use MetricName.getMBeanName() to differentiate between broker level
> > and
> > >> > topic level metrics.
> > >> >
> > >> > 0.8.1  MBeanName:
> > >> >
> "kafka.server":type="BrokerTopicMetrics",name="AllTopicsBytesInPerSec"
> > >> >
> "kafka.server":type="BrokerTopicMetrics",name="MYTOPIC-BytesInPerSec"
> > >> >
> > >> > 0.8.2  MBeanName:
> > >> > kafka.server:type=BrokerTopicMetrics,name=BytesInPerSec
> > >> >
> kafka.server:type=BrokerTopicMetrics,name=BytesInPerSec,topic=MYTOPIC
> > >> >
> > >> >
> > >> > ConsoleReporter's O/P:
> > >> >
> > >> >   BytesInPerSec:  <- This is broker level
> > >> >  count = 1521
> > >> >  mean rate = 3.63 bytes/s
> > >> >  1-minute rate = 0.35 bytes/s

Re: SimpleConsumer leaks sockets on an UnresolvedAddressException

2015-01-27 Thread Rajiv Kurian
Here is the relevant stack trace:

java.nio.channels.UnresolvedAddressException: null

at sun.nio.ch.Net.checkAddress(Net.java:127) ~[na:1.7.0_55]

at sun.nio.ch.SocketChannelImpl.connect(SocketChannelImpl.java:644)
~[na:1.7.0_55]

at kafka.network.BlockingChannel.connect(Unknown Source)
~[kafka_2.10-0.8.0.jar:0.8.0]

at kafka.consumer.SimpleConsumer.connect(Unknown Source)
~[kafka_2.10-0.8.0.jar:0.8.0]

at kafka.consumer.SimpleConsumer.getOrMakeConnection(Unknown
Source) ~[kafka_2.10-0.8.0.jar:0.8.0]

at
kafka.consumer.SimpleConsumer.kafka$consumer$SimpleConsumer$$sendRequest(Unknown
Source) ~[kafka_2.10-0.8.0.jar:0.8.0]

at
kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(Unknown
Source) ~[kafka_2.10-0.8.0.jar:0.8.0]

at
kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(Unknown
Source) ~[kafka_2.10-0.8.0.jar:0.8.0]

at
kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(Unknown
Source) ~[kafka_2.10-0.8.0.jar:0.8.0]

at kafka.metrics.KafkaTimer.time(Unknown Source)
~[kafka_2.10-0.8.0.jar:0.8.0]

at
kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply$mcV$sp(Unknown Source)
~[kafka_2.10-0.8.0.jar:0.8.0]

at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(Unknown
Source) ~[kafka_2.10-0.8.0.jar:0.8.0]

at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(Unknown
Source) ~[kafka_2.10-0.8.0.jar:0.8.0]

at kafka.metrics.KafkaTimer.time(Unknown Source)
~[kafka_2.10-0.8.0.jar:0.8.0]

at kafka.consumer.SimpleConsumer.fetch(Unknown Source)
~[kafka_2.10-0.8.0.jar:0.8.0]

at kafka.javaapi.consumer.SimpleConsumer.fetch(Unknown Source)
~[kafka_2.10-0.8.0.jar:0.8.0]

On Tue, Jan 27, 2015 at 10:19 AM, Rajiv Kurian  wrote:

> I am using 0.8.1. The source is here:
> https://github.com/apache/kafka/blob/0.8.1/core/src/main/scala/kafka/consumer/SimpleConsumer.scala
>
> Here is the definition of disconnect():
> private def disconnect() = {
> if(blockingChannel.isConnected) {
>   debug("Disconnecting from " + host + ":" + port)
>   blockingChannel.disconnect()
> }
>   }
> It checks if blockingChannel.isConnected before calling
> blockingChannel.disconnect(). I think if there is an
> UnresolvedAddressException, the isConnected is never set and the
> blockingChannel.disconnect() is never called. But by this point we have
> already created a socket and will leak it.
>
> The same problem might be present in the connect method of the
> BlockingChannel at
> https://github.com/apache/kafka/blob/0.8.1/core/src/main/scala/kafka/network/BlockingChannel.scala.
> Though its own disconnect method seems to check for both the connected:
>
> def disconnect() = lock synchronized {
> // My comment: connected will not be set if we get an
> UnresolvedAddressException but channel should NOT  be null, so we will
> probably still do the right thing.
> if(connected || channel != null) {
>   // closing the main socket channel *should* close the read channel
>   // but let's do it to be sure.
>   swallow(channel.close())
>   swallow(channel.socket.close())
>   if(readChannel != null) swallow(readChannel.close())
>   channel = null; readChannel = null; writeChannel = null
>   connected = false
> }
>   }
>
>
>
> On Tue, Jan 27, 2015 at 9:03 AM, Guozhang Wang  wrote:
>
>> Rajiv,
>>
>> Which version of Kafka are you using? I just checked SimpleConsumer's
>> code,
>> and in its close() function, disconnect() is called, which will close the
>> socket.
>>
>> Guozhang
>>
>>
>> On Mon, Jan 26, 2015 at 2:36 PM, Rajiv Kurian 
>> wrote:
>>
>> > Meant to write a run loop.
>> >
>> > void run() {
>> >   while (running) {
>> > if (simpleConsumer == null) {
>> >   simpleConsumer = new SimpleConsumer(host, port,
>> > (int) kafkaSocketTimeout, kafkaRExeiveBufferSize, clientName);
>> > }
>> > try {
>> >   // Do stuff with simpleConsumer.
>> > } catch (Exception e) {
>> >   logger.error(e);  // Assume UnresolvedAddressException.
>> >   if (consumer != null) {
>> > simpleConsumer.close();
>> > simpleConsumer = null;
>> >   }
>> > }
>> >   }
>> > }
>> >
>> > On Mon, Jan 26, 2015 at 2:27 PM, Rajiv Kurian 
>> > wrote:
>> >
>> > > Here is my typical flow:
>> > > void run() {
>> > >   if (simpleConsumer == null) {
>> > > simpleConsumer = new SimpleConsumer(host, port, (int)
>> > kafkaSocketTimeout,
>> > > kafkaRExeiveBufferSize, clientName);
>> > >   }
>> > >   try {
>> > > // Do stuff with simpleConsumer.
>> > >} catch (Exception e) {
>> > >  if (consumer != null) {
>> > >simpleConsumer.close();
>> > >simpleConsumer = null;
>> > >  }
>> > >   }
>> > > }
>> > >
>> > > If there is a problem with the host name, or some DNS issues, we get
>> an
>> > > UnresolvedAddressException as expected and attempt to close the
>> 

Re: SimpleConsumer leaks sockets on an UnresolvedAddressException

2015-01-27 Thread Rajiv Kurian
I am using 0.8.1. The source is here:
https://github.com/apache/kafka/blob/0.8.1/core/src/main/scala/kafka/consumer/SimpleConsumer.scala

Here is the definition of disconnect():
private def disconnect() = {
if(blockingChannel.isConnected) {
  debug("Disconnecting from " + host + ":" + port)
  blockingChannel.disconnect()
}
  }
It checks if blockingChannel.isConnected before calling
blockingChannel.disconnect(). I think if there is an
UnresolvedAddressException, the isConnected is never set and the
blockingChannel.disconnect() is never called. But by this point we have
already created a socket and will leak it.

The same problem might be present in the connect method of the
BlockingChannel at
https://github.com/apache/kafka/blob/0.8.1/core/src/main/scala/kafka/network/BlockingChannel.scala.
Though its own disconnect method seems to check both the connected flag and the channel:

def disconnect() = lock synchronized {
// My comment: connected will not be set if we get an
UnresolvedAddressException but channel should NOT  be null, so we will
probably still do the right thing.
if(connected || channel != null) {
  // closing the main socket channel *should* close the read channel
  // but let's do it to be sure.
  swallow(channel.close())
  swallow(channel.socket.close())
  if(readChannel != null) swallow(readChannel.close())
  channel = null; readChannel = null; writeChannel = null
  connected = false
}
  }
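[Editor's note: a minimal, self-contained JDK NIO sketch of the failure mode
described above: SocketChannel.open() allocates the file descriptor, and if
connect() then throws UnresolvedAddressException the channel stays open until it
is closed explicitly, which is exactly the case the isConnected-guarded
disconnect() skips. The hostname below is deliberately bogus.]

import java.net.InetSocketAddress;
import java.nio.channels.SocketChannel;
import java.nio.channels.UnresolvedAddressException;

public class UnresolvedAddressLeakSketch {
    public static void main(String[] args) throws Exception {
        for (int i = 0; i < 5; i++) {
            SocketChannel channel = SocketChannel.open(); // FD allocated here
            try {
                // Unresolvable host: connect() throws UnresolvedAddressException,
                // but the channel (and its FD) is still open afterwards.
                channel.connect(new InetSocketAddress("no-such-host.invalid", 9092));
            } catch (UnresolvedAddressException e) {
                // Without this close() each iteration leaks one file descriptor,
                // which is what shows up in lsof when SimpleConsumer.close() only
                // disconnects channels that report isConnected.
                channel.close();
            }
        }
    }
}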



On Tue, Jan 27, 2015 at 9:03 AM, Guozhang Wang  wrote:

> Rajiv,
>
> Which version of Kafka are you using? I just checked SimpleConsumer's code,
> and in its close() function, disconnect() is called, which will close the
> socket.
>
> Guozhang
>
>
> On Mon, Jan 26, 2015 at 2:36 PM, Rajiv Kurian 
> wrote:
>
> > Meant to write a run loop.
> >
> > void run() {
> >   while (running) {
> > if (simpleConsumer == null) {
> >   simpleConsumer = new SimpleConsumer(host, port,
> > (int) kafkaSocketTimeout, kafkaRExeiveBufferSize, clientName);
> > }
> > try {
> >   // Do stuff with simpleConsumer.
> > } catch (Exception e) {
> >   logger.error(e);  // Assume UnresolvedAddressException.
> >   if (consumer != null) {
> > simpleConsumer.close();
> > simpleConsumer = null;
> >   }
> > }
> >   }
> > }
> >
> > On Mon, Jan 26, 2015 at 2:27 PM, Rajiv Kurian 
> > wrote:
> >
> > > Here is my typical flow:
> > > void run() {
> > >   if (simpleConsumer == null) {
> > > simpleConsumer = new SimpleConsumer(host, port, (int)
> > kafkaSocketTimeout,
> > > kafkaRExeiveBufferSize, clientName);
> > >   }
> > >   try {
> > > // Do stuff with simpleConsumer.
> > >} catch (Exception e) {
> > >  if (consumer != null) {
> > >simpleConsumer.close();
> > >simpleConsumer = null;
> > >  }
> > >   }
> > > }
> > >
> > > If there is a problem with the host name, or some DNS issues, we get an
> > > UnresolvedAddressException as expected and attempt to close the
> > > simpleConsumer. However this does not really get rid of the underlying
> > > socket. So we end up leaking a FD every time this happens. Though this
> is
> > > not a common case I think there needs to be a way on the SimpleConsumer
> > to
> > > get rid of all OS resources that it is holding onto. Right now if this
> > > keeps happening the number of FDs being consumed by the process keeps
> > > increasing till we hit the OS limits. As a user I cannot do anything
> else
> > > but call simpleConsumer.close(). We need to be able to close the
> > underlying
> > > socketChannel/socket when this kind of an error happens.
> > >
> > > To reproduce, one can just run this code but just put in any garbage
> host
> > > name, running lsof -p while running this will show that the open FDs
> > > increases without limit.
> > >
> > > Thanks,
> > > Rajiv
> > >
> > >
> >
>
>
>
> --
> -- Guozhang
>


Consumers closing sockets abruptly?

2015-01-27 Thread Scott Reynolds
On my brokers I am seeing this error log message:

Closing socket for /X because of error (X is the IP address of a consumer)
> 2015-01-27_17:32:58.29890 java.io.IOException: Connection reset by peer
> 2015-01-27_17:32:58.29890   at
> sun.nio.ch.FileChannelImpl.transferTo0(Native Method)
> 2015-01-27_17:32:58.29891   at
> sun.nio.ch.FileChannelImpl.transferToDirectly(FileChannelImpl.java:433)
> 2015-01-27_17:32:58.29892   at
> sun.nio.ch.FileChannelImpl.transferTo(FileChannelImpl.java:565)
> 2015-01-27_17:32:58.29892   at
> kafka.log.FileMessageSet.writeTo(FileMessageSet.scala:147)
> 2015-01-27_17:32:58.29893   at
> kafka.api.PartitionDataSend.writeTo(FetchResponse.scala:69)
> 2015-01-27_17:32:58.29893   at
> kafka.network.MultiSend.writeTo(Transmission.scala:102)
> 2015-01-27_17:32:58.29894   at
> kafka.api.TopicDataSend.writeTo(FetchResponse.scala:124)
> 2015-01-27_17:32:58.29895   at
> kafka.network.MultiSend.writeTo(Transmission.scala:102)
> 2015-01-27_17:32:58.29895   at
> kafka.api.FetchResponseSend.writeTo(FetchResponse.scala:219)
> 2015-01-27_17:32:58.29896   at
> kafka.network.Processor.write(SocketServer.scala:375)
> 2015-01-27_17:32:58.29896   at
> kafka.network.Processor.run(SocketServer.scala:247)
> 2015-01-27_17:32:58.29897   at java.lang.Thread.run(Thread.java:745)
>

This is because the Processor doesn't handle java.io.IOException and it
falls through to the catch-all.

My consumers actually seem really happy, so I don't think there is a real
issue here, but I could use some help figuring out if there is.

We are using the Java consumer like so:

> final ConsumerConnector consumer =
>     kafka.consumer.Consumer.createJavaConsumerConnector(config);
> Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
> topicCountMap.put(topicName, new Integer(1));
> final Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap =
>     consumer.createMessageStreams(topicCountMap);
> final KafkaStream<byte[], byte[]> stream =
>     consumerMap.get(topicName).get(0);
>

and we just iterate over the stream

Questions:
1. Which class makes the network connection to the consumer?
2. Do legitimate cases exist where the consumer would close its socket
connection? ZooKeeper issues? Consumer too far behind?


Re: Can't create a topic; can't delete it either

2015-01-27 Thread Gwen Shapira
Also, do you have delete.topic.enable=true on all brokers?

The automatic topic creation can fail if the default number of
replicas is greater than the number of available brokers. Check the
default.replication.factor parameter.

Gwen

On Tue, Jan 27, 2015 at 12:29 AM, Joel Koshy  wrote:
> Which version of the broker are you using?
>
> On Mon, Jan 26, 2015 at 10:27:14PM -0800, Sumit Rangwala wrote:
>> While running kafka in production I found an issue where a topic wasn't
>> getting created even with auto topic enabled. I then went ahead and created
>> the topic manually (from the command line). I then deleted the topic, again
>> manually. Now my broker won't allow me to either create *the* topic or
>> delete *the* topic. (other topic creation and deletion is working fine).
>>
>> The topic is in "marked for deletion" stage for more than 3 hours.
>>
>> $ bin/kafka-topics.sh --zookeeper zookeeper1:2181/replication/kafka  --list
>> --topic GRIFFIN-TldAdFormat.csv-1422321736886
>> GRIFFIN-TldAdFormat.csv-1422321736886 - marked for deletion
>>
>> If this is a known issue, is there a workaround?
>>
>> Sumit
>


Re: Errors from ReassignPartitionsCommand

2015-01-27 Thread Guozhang Wang
Allen, which version of Kafka are you using? And if you have multiple
brokers, has a controller migration happened before?

Guozhang

On Fri, Jan 23, 2015 at 3:56 PM, Allen Wang 
wrote:

> Hello,
>
> We tried the ReassignPartitionsCommand to move partitions to new brokers.
> The execution initially showed message "Successfully started reassignment
> of partitions ...". But when I tried to verify using --verify option, it
> reported some reassignments have failed:
>
> ERROR: Assigned replicas (0,5,2) don't match the list of replicas for
> reassignment (0,5) for partition [vhs_playback_event,1]
> ERROR: Assigned replicas (4,5,0,2) don't match the list of replicas for
> reassignment (4,5) for partition [vhs_playback_event,11]
> ERROR: Assigned replicas (3,5,0,2) don't match the list of replicas for
> reassignment (3,5) for partition [vhs_playback_event,16]
>
> I noticed that the assigned replicas in the error messages include both old
> assignment and new assignment. Is this a real error, or does it just mean
> partitions are being copied and the current state does not match the final
> expected state?
>
> Since I was confused by the errors, I ran the same
> ReassignPartitionsCommand with the same assignment again but got some
> additional failure messages complaining that three partitions do not exist:
>
> [2015-01-23 18:15:41,333] ERROR Skipping reassignment of partition
> [vhs_playback_event,16] since it doesn't exist
> (kafka.admin.ReassignPartitionsCommand)
> [2015-01-23 18:15:41,455] ERROR Skipping reassignment of partition
> [vhs_playback_event,15] since it doesn't exist
> (kafka.admin.ReassignPartitionsCommand)
> [2015-01-23 18:15:41,499] ERROR Skipping reassignment of partition
> [vhs_playback_event,17] since it doesn't exist
> (kafka.admin.ReassignPartitionsCommand)
>
> These partitions later reappeared from the output of --verify.
>
> The other thing is that at one point the BytesOut from one broker exceeds
> 100Mbytes, which is quite alarming.
>
> In the end, the reassignment was done according to the input file to
> ReassignPartitionsCommand. But the UnderReplicatedPartitions for the
> brokers keeps showing a positive number, even though the output of describe
> topic command and ZooKeeper data show the ISRs are all in sync, and
> Replica-MaxLag is 0.
>
> To sum up, the overall execution was successful, but the error messages are
> quite noisy and the metric is not consistent with what actually happened.
>
> Does anyone have similar experience, and is there anything we can do to get
> it done more smoothly? What can we do to reset the inconsistent
> UnderReplicatedPartitions metric?
>
> Thanks,
> Allen
>



-- 
-- Guozhang


Re: Missing Per-Topic BrokerTopicMetrics in v0.8.2.0

2015-01-27 Thread Jason Rosenberg
I added a comment to the ticket. I think it will work for getting the data
disambiguated (I didn't actually test end to end to Graphite).
However, the naming scheme is not ideal for how metric UIs typically
present the metric tree (e.g. the JMX tag syntax doesn't really translate).

Jason

On Tue, Jan 27, 2015 at 11:19 AM, Jun Rao  wrote:

> Jason, Kyle,
>
> I created an 0.8.2 blocker
> https://issues.apache.org/jira/browse/KAFKA-1902
> and attached a patch there. Could you test it out and see if it fixes the
> issue with the reporter? The patch adds tags as scope in MetricName.
>
> Thanks,
>
> Jun
>
> On Tue, Jan 27, 2015 at 7:39 AM, Jun Rao  wrote:
>
> > Jason,
> >
> > So, this sounds like a real issue. Perhaps we can fix it just by setting
> > the tag name as the scope. For example, for mbean kafka.server:type=
> > BrokerTopicMetrics,name=BytesInPerSec,topic=test, we can have
> >
> > group: "kafka.server"
> > type: "BrokerTopicMetrics"
> > name: "BytesInPerSec"
> > scope: "topic=test"
> >
> > Do you know if scope can have characters like "=" and "," (e.g., for
> scope
> > like "topic=test,partition=1")?
> >
> > The issue with using mytopic-BytesInPerSec as the name is what we are
> > trying to fix in kafka-1481. Topic name (and clientId, etc) can have dash
> > in it and it's hard to parse.
> >
> > Thanks,
> >
> > Jun
> >
> >
> >
> > On Tue, Jan 27, 2015 at 6:30 AM, Jason Rosenberg 
> wrote:
> >
> >> Remember multiple people have reported this issue. Per topic metrics no
> >> longer appear in graphite (or in any system modeled after the yammer
> >> GraphiteReporter). They are not being seen as unique.
> >>
> >> While these metrics are registered in the registry as separate
> >> ‘MetricName’
> >> instances (varying only by mbeanName), the GraphiteReporter sends the
> >> metrics to graphite using only the 4 fields I describe above.
> >> Consequently,
> >> multiple metrics in the registry get sent to graphite under the same
> name.
> >> Thus these metrics all end up in the same bucket in graphite, trampling
> >> over each other making them useless. They aren’t ‘double counted’ so
> much
> >> as flapping between multiple independent values.
> >>
> >> We actually have our own Reporter class (based off the yammer
> >> GraphiteReporter). Our version sends metrics through kafka which is then
> >> consumed downstream by multiple metric consumers.
> >>
> >> The ConsoleReporter isn’t useful for actually persisting metrics
> anywhere.
> >> It’s just iterating through all the (identically named metrics in the
> >> registry (save for the different mbeanNames))….
> >>
> >> The mbeanName, as constructed, is not useful as a human readable metric
> >> name, to be presented in a browsable tree of metrics, etc. The
> >> ‘group’:’type’:’name’:’scope’ are the pieces that matter.
> >>
> >> The fix here is to produce MetricName instances similar to 0.8.1.1, etc.
> >> In
> >> this case, it should probably be something like:
> >>
> >> group: kafka.server
> >> type: BrokerTopicMetrics
> >> name: mytopic-BytesInPerSec
> >> group: 
> >>
> >> Jason
> >> ​
> >>
> >> On Tue, Jan 27, 2015 at 7:26 AM, Manikumar Reddy 
> >> wrote:
> >>
> >> > I have enabled yammer's ConsoleReporter and I am getting all the
> metrics
> >> > (including per-topic metrics).
> >> >
> >> > Yammer's MetricName object implements equals/hashcode methods using
> >> > mBeanName . We are constructing a unique mBeanName for each metric, So
> >> we
> >> > are not missing/overwriting any metrics.
> >> >
> >> > Current confusion is due to  MetricName.name(). This will be same
> >> > (BytesInPerSec) for both broker level and topic level metrics. We need
> >> to
> >> > use MetricName.getMBeanName() to differentiate between broker level
> and
> >> > topic level metrics.
> >> >
> >> > 0.8.1  MBeanName:
> >> > "kafka.server":type="BrokerTopicMetrics",name="AllTopicsBytesInPerSec"
> >> > "kafka.server":type="BrokerTopicMetrics",name="MYTOPIC-BytesInPerSec"
> >> >
> >> > 0.8.2  MBeanName:
> >> > kafka.server:type=BrokerTopicMetrics,name=BytesInPerSec
> >> > kafka.server:type=BrokerTopicMetrics,name=BytesInPerSec,topic=MYTOPIC
> >> >
> >> >
> >> > ConsoleReporter's O/P:
> >> >
> >> >   BytesInPerSec:  <- This is broker level
> >> >  count = 1521
> >> >  mean rate = 3.63 bytes/s
> >> >  1-minute rate = 0.35 bytes/s
> >> >  5-minute rate = 2.07 bytes/s
> >> > 15-minute rate = 1.25 bytes/s
> >> >
> >> >   BytesInPerSec:  <- This is for topic1
> >> >  count = 626
> >> >  mean rate = 1.89 bytes/s
> >> >  1-minute rate = 0.42 bytes/s
> >> >  5-minute rate = 31.53 bytes/s
> >> > 15-minute rate = 64.66 bytes/s
> >> >
> >> >   BytesInPerSec:  <- This is for topic2
> >> >  count = 895
> >> >  mean rate = 3.62 bytes/s
> >> >  1-minute rate = 1.39 bytes/s
> >> >  5-minute rate = 30.08 bytes/s
> >> > 15-minute rate = 50.27 bytes/s
> >> >
> >> > Manikumar
> >> >
> >> > On Tue, Jan 27, 2015 at 1:59 PM, Ja

Re: Missing Per-Topic BrokerTopicMetrics in v0.8.2.0

2015-01-27 Thread Kyle Banker
Thanks for the quick response, Jun (and many thanks to Jason for confirming
and further investigating the issue). I've tested the patch, and it does
fix the fundamental issue, but I do have a few comments that I'll leave on
the ticket.

On Tue, Jan 27, 2015 at 9:19 AM, Jun Rao  wrote:

> Jason, Kyle,
>
> I created an 0.8.2 blocker
> https://issues.apache.org/jira/browse/KAFKA-1902
> and attached a patch there. Could you test it out and see if it fixes the
> issue with the reporter? The patch adds tags as scope in MetricName.
>
> Thanks,
>
> Jun
>
> On Tue, Jan 27, 2015 at 7:39 AM, Jun Rao  wrote:
>
> > Jason,
> >
> > So, this sounds like a real issue. Perhaps we can fix it just by setting
> > the tag name as the scope. For example, for mbean kafka.server:type=
> > BrokerTopicMetrics,name=BytesInPerSec,topic=test, we can have
> >
> > group: "kafka.server"
> > type: "BrokerTopicMetrics"
> > name: "BytesInPerSec"
> > scope: "topic=test"
> >
> > Do you know if scope can have characters like "=" and "," (e.g., for
> scope
> > like "topic=test,partition=1")?
> >
> > The issue with using mytopic-BytesInPerSec as the name is what we are
> > trying to fix in kafka-1481. Topic name (and clientId, etc) can have dash
> > in it and it's hard to parse.
> >
> > Thanks,
> >
> > Jun
> >
> >
> >
> > On Tue, Jan 27, 2015 at 6:30 AM, Jason Rosenberg 
> wrote:
> >
> >> Remember multiple people have reported this issue. Per topic metrics no
> >> longer appear in graphite (or in any system modeled after the yammer
> >> GraphiteReporter). They are not being seen as unique.
> >>
> >> While these metrics are registered in the registry as separate
> >> ‘MetricName’
> >> instances (varying only by mbeanName), the GraphiteReporter sends the
> >> metrics to graphite using only the 4 fields I describe above.
> >> Consequently,
> >> multiple metrics in the registry get sent to graphite under the same
> name.
> >> Thus these metrics all end up in the same bucket in graphite, trampling
> >> over each other making them useless. They aren’t ‘double counted’ so
> much
> >> as flapping between multiple independent values.
> >>
> >> We actually have our own Reporter class (based off the yammer
> >> GraphiteReporter). Our version sends metrics through kafka which is then
> >> consumed downstream by multiple metric consumers.
> >>
> >> The ConsoleReporter isn’t useful for actually persisting metrics
> anywhere.
> >> It’s just iterating through all the (identically named metrics in the
> >> registry (save for the different mbeanNames))….
> >>
> >> The mbeanName, as constructed, is not useful as a human readable metric
> >> name, to be presented in a browsable tree of metrics, etc. The
> >> ‘group’:’type’:’name’:’scope’ are the pieces that matter.
> >>
> >> The fix here is to produce MetricName instances similar to 0.8.1.1, etc.
> >> In
> >> this case, it should probably be something like:
> >>
> >> group: kafka.server
> >> type: BrokerTopicMetrics
> >> name: mytopic-BytesInPerSec
> >> scope: 
> >>
> >> Jason
> >> ​
> >>
> >> On Tue, Jan 27, 2015 at 7:26 AM, Manikumar Reddy 
> >> wrote:
> >>
> >> > I have enabled yammer's ConsoleReporter and I am getting all the
> metrics
> >> > (including per-topic metrics).
> >> >
> >> > Yammer's MetricName object implements equals/hashcode methods using
> >> > mBeanName . We are constructing a unique mBeanName for each metric, So
> >> we
> >> > are not missing/overwriting any metrics.
> >> >
> >> > Current confusion is due to  MetricName.name(). This will be same
> >> > (BytesInPerSec) for both broker level and topic level metrics. We need
> >> to
> >> > use MetricName.getMBeanName() to differentiate between broker level
> and
> >> > topic level metrics.
> >> >
> >> > 0.8.1  MBeanName:
> >> > "kafka.server":type="BrokerTopicMetrics",name="AllTopicsBytesInPerSec"
> >> > "kafka.server":type="BrokerTopicMetrics",name="MYTOPIC-BytesInPerSec"
> >> >
> >> > 0.8.2  MBeanName:
> >> > kafka.server:type=BrokerTopicMetrics,name=BytesInPerSec
> >> > kafka.server:type=BrokerTopicMetrics,name=BytesInPerSec,topic=MYTOPIC
> >> >
> >> >
> >> > ConsoleReporter's O/P:
> >> >
> >> >   BytesInPerSec:  <- This is broker level
> >> >  count = 1521
> >> >  mean rate = 3.63 bytes/s
> >> >  1-minute rate = 0.35 bytes/s
> >> >  5-minute rate = 2.07 bytes/s
> >> > 15-minute rate = 1.25 bytes/s
> >> >
> >> >   BytesInPerSec:  <- This is for topic1
> >> >  count = 626
> >> >  mean rate = 1.89 bytes/s
> >> >  1-minute rate = 0.42 bytes/s
> >> >  5-minute rate = 31.53 bytes/s
> >> > 15-minute rate = 64.66 bytes/s
> >> >
> >> >   BytesInPerSec:  <- This is for topic2
> >> >  count = 895
> >> >  mean rate = 3.62 bytes/s
> >> >  1-minute rate = 1.39 bytes/s
> >> >  5-minute rate = 30.08 bytes/s
> >> > 15-minute rate = 50.27 bytes/s
> >> >
> >> > Manikumar
> >> >
> >> > On Tue, Jan 27, 2015 at 1:59 PM, Jason Rosenberg 
> >> wrote:
> >> >
> >> > > Ok,
> 

Re: Kafka under replicated partitions..

2015-01-27 Thread Guozhang Wang
Hi Nitin,

a. The follower replica will be kicked out of the ISR (i.e. causing the
partition to be under-replicated) when 1) it has lagged much behind the
leader replica in terms of number of messages (controlled by config
replica.lag.max.messages), or 2) it has not fetched from leader for some
long time (controlled by config replica.lag.time.max.ms). Case 1) can
usually be triggered when the producer's throughput to the leader replica
is too high for the follower to keep up, or if one batch of producer's
messages contains more messages than the lag threshold. Case 2) can be
triggered when the follower is faulty (long GC, etc).

You can find the definitions of these configs here:

http://kafka.apache.org/documentation.html#brokerconfigs

b. For case 2) you usually need to restart the faulty follower; for case 1),
once the producer throughput drops back to normal, the follower should be able
to catch up later. Also, to prevent it from happening you can tune these two
configs a bit.
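
For reference, both settings live in the broker's server.properties; the values
below are just the 0.8.x defaults, not a recommendation:

replica.lag.max.messages=4000
replica.lag.time.max.ms=10000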

c. You can use the topic command to list all under-replicated partitions in
real time:

http://kafka.apache.org/documentation.html#basic_ops_add_topic (use
bin/kafka-topics.sh --describe --under-replicated-partitions)
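
For example (the ZooKeeper connect string is a placeholder for your own cluster):

bin/kafka-topics.sh --describe --under-replicated-partitions --zookeeper zk1:2181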


Guozhang


On Mon, Jan 26, 2015 at 12:40 PM, nitin sharma 
wrote:

> Hi All,
>
> I would like to know the factors that can cause followers to fall behind
> and under-replicated replicas to be created.
>
> In my production system, one of the partitions fell behind last Friday, and
> before I could check, my operations team had restarted the follower broker...
> The restart did fix the issue, but I would like to know:
>
> a. the factors that can cause under replicated partitions
> b. how to fix that issue? is restart an option?
> c. apart from MBeans, is there any other way to get to know about
> under-replicated partitions?
>
>
> Regards,
> Nitin Kumar Sharma.
>



-- 
-- Guozhang


Re: Kafka sending messages with zero copy

2015-01-27 Thread Guozhang Wang
Thanks Rajiv, looking forward to your prototype.

Guozhang

On Mon, Jan 26, 2015 at 2:30 PM, Rajiv Kurian  wrote:

> Hi Guozhang,
>
> I am a bit busy at work. When I get the chance I'll definitely try to get a
> proof of concept going. Not the kafka protocol, but just the buffering and
> threading structures, maybe just write to another socket. I think it would
> be useful just to get the queueing and buffer management going and prove
> that it can be done in a zero copy way in a multi producer single consumer
> environment. If that is working, then the single consumer can be the kafka
> network sync thread.
>
> Thanks,
> Rajiv
>
> On Fri, Jan 16, 2015 at 11:58 AM, Guozhang Wang 
> wrote:
>
> > Hi Rajiv,
> >
> > Thanks for this proposal, it would be great if you can upload some
> > implementation patch for the CAS idea and show some memory usage / perf
> > differences.
> >
> > Guozhang
> >
> > On Sun, Dec 14, 2014 at 9:27 PM, Rajiv Kurian 
> > wrote:
> >
> > > Resuscitating this thread. I've done some more experiments and
> profiling.
> > > My messages are very tiny (currently 25 bytes) per message and creating
> > > multiple objects per message leads to a lot of churn. The memory churn
> > > through creation of convenience objects is more than the memory being
> > used
> > > by my objects right now. I could probably batch my messages further, to
> > > make this effect less pronounced.​ I did some rather unscientific
> > > experiments with a flyweight approach on top of the ByteBuffer for a
> > simple
> > > messaging API (peer to peer NIO based so not a real comparison) and the
> > > numbers were very satisfactory and there is no garbage created in
> steady
> > > state at all. Though I don't expect such good numbers from actually
> going
> > > through the broker + all the other extra stuff that a real producer
> would
> > > do, I think there is great potential here.
> > >
> > > The general mechanism for me is this:
> > > i) A buffer (I used Unsafe but I imagine ByteBuffer having similar
> > > performance) is created per partition.
> > > ii) A CAS loop (in Java 7 and less) or even better
> unsafe.getAndAddInt()
> > in
> > > Java 8 can be used to claim a chunk of bytes on the per topic buffer.
> > This
> > > code can be invoked from multiple threads in a wait free manner
> > (wait-free
> > > in Java 8, since getAndAddInt() is wait-free).  Once a region in the
> > buffer
> > > is claimed, it can be operated on using the flyweight method that we
> > talked
> > > about. If the buffer doesn't have enough space then we can drop the
> > message
> > > or move onto a new buffer. Further this creates absolutely zero objects
> > in
> > > steady state (only a few objects created in the beginning). Even if the
> > > flyweight method is not desired, the API can just take byte arrays or
> > > objects that need to be serialized and copy them onto the per topic
> > buffers
> > > in a similar way. This API has been validated in Aeron too, so I am
> > pretty
> > > confident that it will work well. For the zero copy technique here is a
> > > link to Aeron API with zero copy -
> > > https://github.com/real-logic/Aeron/issues/18. The regular one copies
> > byte
> > > arrays but without any object creation.
> > > iii) The producer send thread can then just go in FIFO order through
> the
> > > buffer sending messages that have been committed using NIO to rotate
> > > between brokers. We might need a background thread to zero out used
> > buffers
> > > too.
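> > >
> > > (A minimal sketch of the claim step described in (ii), assuming plain
> > > AtomicInteger rather than Unsafe; the names here are made up for
> > > illustration and this is not a working producer.)
> > >
> > > import java.nio.ByteBuffer;
> > > import java.util.concurrent.atomic.AtomicInteger;
> > >
> > > class PartitionBuffer {
> > >   private final ByteBuffer buffer;          // one pre-allocated buffer per partition
> > >   private final AtomicInteger claimed = new AtomicInteger(0);
> > >
> > >   PartitionBuffer(int capacity) { buffer = ByteBuffer.allocateDirect(capacity); }
> > >
> > >   // Claim 'length' bytes with a single wait-free RMW (getAndAddInt on Java 8).
> > >   // -1 means the buffer is full: drop the message or roll to a fresh buffer.
> > >   // (The counter is never reset in this sketch.)
> > >   int claim(int length) {
> > >     int start = claimed.getAndAdd(length);
> > >     return (start + length > buffer.capacity()) ? -1 : start;
> > >   }
> > >
> > >   // Flyweight-style write into the claimed region; no intermediate objects.
> > >   void write(int offset, byte[] payload) {
> > >     ByteBuffer view = buffer.duplicate();
> > >     view.position(offset);
> > >     view.put(payload);
> > >   }
> > > }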
> > >
> > > I've left out some details, but again none of this very revolutionary -
> > > it's mostly the same techniques used in Aeron. I really think that we
> can
> > > keep the API garbage free and wait-free (even in the multi producer
> > case)
> > > without compromising how pretty it looks - the total zero copy API will be
> > low
> > > level, but it should only be used by advanced users. Moreover the usual
> > > producer.send(msg, topic, partition) can use the efficient ByteBuffer
> > > offset API internally without it itself creating any garbage. With the
> > > technique I talked about there is no need for an intermediate queue of
> > any
> > > kind since the underlying ByteBuffer per partition acts as the queue.
> > >
> > > I can do more experiments with some real producer code instead of my
> toy
> > > code to further validate the idea, but I am pretty sure that both
> > > throughput and jitter characteristics will improve thanks to lower
> > > contention (wait-free in java 8 with a single getAndAddInt() operation
> > for
> > > sync ) and better cache locality (C like buffers and a few constant
> > number
> > > of objects per partition). If you guys are interested, I'd love to talk
> > > more. Again just to reiterate, I don't think the API will suffer at
> all -
> > > most of this can be done under the covers. Additionally it will open up
> > > things so that a low level zero copy API is possible.
> > >
> > > Thanks,
> > > Rajiv
> > >
> >
> >
> >
> > --
> > -- Guozhang
>

Re: SimpleConsumer leaks sockets on an UnresolvedAddressException

2015-01-27 Thread Guozhang Wang
Rajiv,

Which version of Kafka are you using? I just checked SimpleConsumer's code,
and in its close() function, disconnect() is called, which will close the
socket.

Guozhang


On Mon, Jan 26, 2015 at 2:36 PM, Rajiv Kurian  wrote:

> Meant to write a run loop.
>
> void run() {
>   while (running) {
> if (simpleConsumer == null) {
>   simpleConsumer = new SimpleConsumer(host, port,
> (int) kafkaSocketTimeout, kafkaRExeiveBufferSize, clientName);
> }
> try {
>   // Do stuff with simpleConsumer.
> } catch (Exception e) {
>   logger.error(e);  // Assume UnresolvedAddressException.
>   if (simpleConsumer != null) {
> simpleConsumer.close();
> simpleConsumer = null;
>   }
> }
>   }
> }
>
> On Mon, Jan 26, 2015 at 2:27 PM, Rajiv Kurian 
> wrote:
>
> > Here is my typical flow:
> > void run() {
> >   if (simpleConsumer == null) {
> > simpleConsumer = new SimpleConsumer(host, port, (int)
> kafkaSocketTimeout,
> > kafkaRExeiveBufferSize, clientName);
> >   }
> >   try {
> > // Do stuff with simpleConsumer.
> >} catch (Exception e) {
> >  if (simpleConsumer != null) {
> >simpleConsumer.close();
> >simpleConsumer = null;
> >  }
> >   }
> > }
> >
> > If there is a problem with the host name, or some DNS issues, we get an
> > UnresolvedAddressException as expected and attempt to close the
> > simpleConsumer. However this does not really get rid of the underlying
> > socket. So we end up leaking a FD every time this happens. Though this is
> > not a common case I think there needs to be a way on the SimpleConsumer
> to
> > get rid of all OS resources that it is holding onto. Right now if this
> > keeps happening the number of FDs being consumed by the process keeps
> > increasing till we hit the OS limits. As a user I cannot do anything else
> > but call simpleConsumer.close(). We need to be able to close the
> underlying
> > socketChannel/socket when this kind of an error happens.
> >
> > To reproduce, one can just run this code with any garbage host
> > name; running lsof -p while it runs will show that the number of open FDs
> > increases without limit (see the command below).
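> >
> > For example (the pid is a placeholder for the consumer JVM's process id):
> >
> > watch -n 1 'lsof -p <pid> | wc -l'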
> >
> > Thanks,
> > Rajiv
> >
> >
>



-- 
-- Guozhang


Re: Kafka consumer connection error

2015-01-27 Thread Guozhang Wang
Mahesh,

Could you reformat your attached logs?

Guozhang

On Mon, Jan 26, 2015 at 8:08 PM, Mahesh Kumar 
wrote:

> Hi all, I am currently working on logstash to mongodb. logstash's
> input (producer) is kafka and its output (consumer) is mongodb. It worked
> fine for the past two days. Today it is not working and I got the following
> error. I googled this error and still didn't find any correct solution. Kindly
> help me to resolve it.
>
> [2015-01-24 16:16:13,196] INFO [ReplicaFetcherManager on broker 0] Removed fetcher for partitions [kafka3,0],[kafka3,1] (kafka.server.ReplicaFetcherManager)
> [2015-01-24 16:16:17,443] ERROR Closing socket for /10.10.10.25 because of error (kafka.network.Processor)
> java.io.IOException: Connection reset by peer
> at sun.nio.ch.FileDispatcherImpl.read0(Native Method)
> at sun.nio.ch.SocketDispatcher.read(SocketDispatcher.java:39)
> at sun.nio.ch.IOUtil.readIntoNativeBuffer(IOUtil.java:223)
> at sun.nio.ch.IOUtil.read(IOUtil.java:197)
> at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:379)
> at kafka.utils.Utils$.read(Utils.scala:375)
> at kafka.network.BoundedByteBufferReceive.readFrom(BoundedByteBufferReceive.scala:54)
> at kafka.network.Processor.read(SocketServer.scala:347)
> at kafka.network.Processor.run(SocketServer.scala:245)
> at java.lang.Thread.run(Thread.java:744)
> [2015-01-24 16:16:55,956] INFO Closing socket connection to /10.10.10.25. (kafka.network.Processor)
> [2015-01-24 16:19:49,285] ERROR Closing socket for /10.10.10.25 because of error (kafka.network.Processor)
> java.io.IOException: Connection reset by peer
> at sun.nio.ch.FileDispatcherImpl.read0(Native Method)
> at sun.nio.ch.SocketDispatcher.read(SocketDispatcher.java:39)
> at sun.nio.ch.IOUtil.readIntoNativeBuffer(IOUtil.java:223)
> at sun.nio.ch.IOUtil.read(IOUtil.java:197)
> at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:379)
> at kafka.utils.Utils$.read(Utils.scala:375)
> at kafka.network.BoundedByteBufferReceive.readFrom(BoundedByteBufferReceive.scala:54)
> at kafka.network.Processor.read(SocketServer.scala:347)
> at kafka.network.Processor.run(SocketServer.scala:245)
> at java.lang.Thread.run(Thread.java:744)
> [2015-01-24 16:20:18,100] INFO Closing socket connection to /10.10.10.25. (kafka.network.Processor)
> [2015-01-24 16:21:30,156] ERROR Closing socket for /10.10.10.25 because of error (kafka.network.Processor)
> java.io.IOException: Connection reset by peer
> at sun.nio.ch.FileDispatcherImpl.read0(Native Method)
> at sun.nio.ch.SocketDispatcher.read(SocketDispatcher.java:39)
> at sun.nio.ch.IOUtil.readIntoNativeBuffer(IOUtil.java:223)
> at sun.nio.ch.IOUtil.read(IOUtil.java:197)
> at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:379)
> at kafka.utils.Utils$.read(Utils.scala:375)
> at kafka.network.BoundedByteBufferReceive.readFrom(BoundedByteBufferReceive.scala:54)
> at kafka.network.Processor.read(SocketServer.scala:347)
> at kafka.network.Processor.run(SocketServer.scala:245)
> at java.lang.Thread.run(Thread.java:744)
> [2015-01-24 17:49:56,224] INFO Closing socket connection to /10.10.10.25. (kafka.network.Processor)
>
> Thanks,
> Mahesh.S




-- 
-- Guozhang


Re: CommitOffsets for Simple consumer is returning EOF Exception

2015-01-27 Thread Guozhang Wang
Hi Ajeet,

Which version of Kafka are you using? I remember the OffsetCommitRequest's
requestInfo should be a map of topicPartition -> OffsetAndMetadata, not
OffsetMetadataAndError.
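
(For reference, against 0.8.2 the request map would be declared roughly as
below; this is only a sketch, and the exact OffsetAndMetadata constructor
arguments should be checked against the 0.8.2 javaapi.)

// Sketch for 0.8.2: the commit request map is keyed by TopicAndPartition
// with OffsetAndMetadata values (not OffsetMetadataAndError).
Map<TopicAndPartition, OffsetAndMetadata> requestInfo =
    new HashMap<TopicAndPartition, OffsetAndMetadata>();
// populate with requestInfo.put(topicAndPartition, new OffsetAndMetadata(...));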

Guozhang

On Tue, Jan 27, 2015 at 3:31 AM, ajeet singh 
wrote:

> Hi ,
>
> I am new to Kafka. I have a use case in which my consumer can't use the auto
> commit offset feature, so I have to go with the option of manual commit. With
> the high-level consumer I have the constraint that the consumer can commit only
> the current offset, but in my case I will be committing some previous offset value.
>
> So the only possible solution seems to be to use SimpleConsumer. This is how I
> am using SimpleConsumer to commit offsets:
>
> TopicAndPartition topicAndPartition = new
> TopicAndPartition(topic,partition);
> Map<TopicAndPartition, OffsetMetadataAndError> requestInfo = new
> HashMap<TopicAndPartition, OffsetMetadataAndError>();
> requestInfo.put(topicAndPartition, new
> OffsetMetadataAndError(0L,"no_metadata", (short) 0));
> kafka.javaapi.OffsetCommitRequest request = new
> kafka.javaapi.OffsetCommitRequest("test",
> requestInfo, kafka.api.OffsetRequest.CurrentVersion(), 0, clientName);
> kafka.javaapi.OffsetCommitResponse response =
> consumer.commitOffsets(request);
>
>
> I am getting EOFException
>
> Oops:java.io.EOFException: Received -1 when reading from channel, socket
> has likely been closed.
> java.io.EOFException: Received -1 when reading from channel, socket has
> likely been closed.
> at kafka.utils.Utils$.read(Utils.scala:376)
> at
>
> kafka.network.BoundedByteBufferReceive.readFrom(BoundedByteBufferReceive.scala:54)
> at kafka.network.Receive$class.readCompletely(Transmission.scala:56)
> at
>
> kafka.network.BoundedByteBufferReceive.readCompletely(BoundedByteBufferReceive.scala:29)
> at kafka.network.BlockingChannel.receive(BlockingChannel.scala:100)
> at kafka.consumer.SimpleConsumer.liftedTree1$1(SimpleConsumer.scala:81)
> at
>
> kafka.consumer.SimpleConsumer.kafka$consumer$SimpleConsumer$$sendRequest(SimpleConsumer.scala:71)
> at kafka.consumer.SimpleConsumer.fetchOffsets(SimpleConsumer.scala:138)
> at
> kafka.javaapi.consumer.SimpleConsumer.fetchOffsets(SimpleConsumer.scala:99)
> at
>
> com.vizury.rtb.realtimelogging.OfflineLogConsumer.commitOffsetTest(OfflineLogConsumer.java:205)
> at
>
> com.vizury.rtb.realtimelogging.OfflineLogConsumer.run(OfflineLogConsumer.java:147)
> at
>
> com.vizury.rtb.realtimelogging.OfflineLogConsumer.main(OfflineLogConsumer.java:31)
>
>
> Any help? I am getting the same error with the fetchOffsets() method, whereas
> getOffsetsBefore() is working fine.
>



-- 
-- Guozhang


Re: Missing Per-Topic BrokerTopicMetrics in v0.8.2.0

2015-01-27 Thread Jun Rao
Jason, Kyle,

I created an 0.8.2 blocker https://issues.apache.org/jira/browse/KAFKA-1902
and attached a patch there. Could you test it out and see if it fixes the
issue with the reporter? The patch adds tags as scope in MetricName.

Thanks,

Jun

On Tue, Jan 27, 2015 at 7:39 AM, Jun Rao  wrote:

> Jason,
>
> So, this sounds like a real issue. Perhaps we can fix it just by setting
> the tag name as the scope. For example, for mbean kafka.server:type=
> BrokerTopicMetrics,name=BytesInPerSec,topic=test, we can have
>
> group: "kafka.server"
> type: "BrokerTopicMetrics"
> name: "BytesInPerSec"
> scope: "topic=test"
>
> Do you know if scope can have characters like "=" and "," (e.g., for scope
> like "topic=test,partition=1")?
>
> The issue with using mytopic-BytesInPerSec as the name is what we are
> trying to fix in kafka-1481. Topic name (and clientId, etc) can have dash
> in it and it's hard to parse.
>
> Thanks,
>
> Jun
>
>
>
> On Tue, Jan 27, 2015 at 6:30 AM, Jason Rosenberg  wrote:
>
>> Remember multiple people have reported this issue. Per topic metrics no
>> longer appear in graphite (or in any system modeled after the yammer
>> GraphiteReporter). They are not being seen as unique.
>>
>> While these metrics are registered in the registry as separate
>> ‘MetricName’
>> instances (varying only by mbeanName), the GraphiteReporter sends the
>> metrics to graphite using only the 4 fields I describe above.
>> Consequently,
>> multiple metrics in the registry get sent to graphite under the same name.
>> Thus these metrics all end up in the same bucket in graphite, trampling
>> over each other making them useless. They aren’t ‘double counted’ so much
>> as flapping between multiple independent values.
>>
>> We actually have our own Reporter class (based off the yammer
>> GraphiteReporter). Our version sends metrics through kafka which is then
>> consumed downstream by multiple metric consumers.
>>
>> The ConsoleReporter isn’t useful for actually persisting metrics anywhere.
>> It’s just iterating through all the (identically named metrics in the
>> registry (save for the different mbeanNames))….
>>
>> The mbeanName, as constructed, is not useful as a human readable metric
>> name, to be presented in a browsable tree of metrics, etc. The
>> ‘group’:’type’:’name’:’scope’ are the pieces that matter.
>>
>> The fix here is to produce MetricName instances similar to 0.8.1.1, etc.
>> In
>> this case, it should probably be something like:
>>
>> group: kafka.server
>> type: BrokerTopicMetrics
>> name: mytopic-BytesInPerSec
>> scope: 
>>
>> Jason
>> ​
>>
>> On Tue, Jan 27, 2015 at 7:26 AM, Manikumar Reddy 
>> wrote:
>>
>> > I have enabled yammer's ConsoleReporter and I am getting all the metrics
>> > (including per-topic metrics).
>> >
>> > Yammer's MetricName object implements equals/hashcode methods using
>> > mBeanName . We are constructing a unique mBeanName for each metric, So
>> we
>> > are not missing/overwriting any metrics.
>> >
>> > Current confusion is due to  MetricName.name(). This will be same
>> > (BytesInPerSec) for both broker level and topic level metrics. We need
>> to
>> > use MetricName.getMBeanName() to differentiate between broker level and
>> > topic level metrics.
>> >
>> > 0.8.1  MBeanName:
>> > "kafka.server":type="BrokerTopicMetrics",name="AllTopicsBytesInPerSec"
>> > "kafka.server":type="BrokerTopicMetrics",name="MYTOPIC-BytesInPerSec"
>> >
>> > 0.8.2  MBeanName:
>> > kafka.server:type=BrokerTopicMetrics,name=BytesInPerSec
>> > kafka.server:type=BrokerTopicMetrics,name=BytesInPerSec,topic=MYTOPIC
>> >
>> >
>> > ConsoleReporter's O/P:
>> >
>> >   BytesInPerSec:  <- This is broker level
>> >  count = 1521
>> >  mean rate = 3.63 bytes/s
>> >  1-minute rate = 0.35 bytes/s
>> >  5-minute rate = 2.07 bytes/s
>> > 15-minute rate = 1.25 bytes/s
>> >
>> >   BytesInPerSec:  <- This is for topic1
>> >  count = 626
>> >  mean rate = 1.89 bytes/s
>> >  1-minute rate = 0.42 bytes/s
>> >  5-minute rate = 31.53 bytes/s
>> > 15-minute rate = 64.66 bytes/s
>> >
>> >   BytesInPerSec:  <- This is for topic2
>> >  count = 895
>> >  mean rate = 3.62 bytes/s
>> >  1-minute rate = 1.39 bytes/s
>> >  5-minute rate = 30.08 bytes/s
>> > 15-minute rate = 50.27 bytes/s
>> >
>> > Manikumar
>> >
>> > On Tue, Jan 27, 2015 at 1:59 PM, Jason Rosenberg 
>> wrote:
>> >
>> > > Ok,
>> > >
>> > > It looks like the yammer MetricName is not being created correctly for
>> > the
>> > > sub metrics that include a topic. E.g. a metric with an mbeanName
>> like:
>> > >
>> > >
>> >
>> kafka.server:type=BrokerTopicMetrics,name=BytesRejectedPerSec,topic=mytopic
>> > >
>> > > appears to be malformed. A yammer MetricName has 4 fields that are
>> used
>> > in
>> > > creating a graphite metric, that are included in the constructor:
>> > > group, type, name, scope.
>> > >
>> > > In this case, the metric with the above mbeanName has these fields
>> s

Re: Missing Per-Topic BrokerTopicMetrics in v0.8.2.0

2015-01-27 Thread Jun Rao
Jason,

So, this sounds like a real issue. Perhaps we can fix it just by setting
the tag name as the scope. For example, for mbean kafka.server:type=
BrokerTopicMetrics,name=BytesInPerSec,topic=test, we can have

group: "kafka.server"
type: "BrokerTopicMetrics"
name: "BytesInPerSec"
scope: "topic=test"

Do you know if scope can have characters like "=" and "," (e.g., for scope
like "topic=test,partition=1")?

The issue with using mytopic-BytesInPerSec as the name is what we are
trying to fix in kafka-1481. Topic name (and clientId, etc) can have dash
in it and it's hard to parse.

Thanks,

Jun



On Tue, Jan 27, 2015 at 6:30 AM, Jason Rosenberg  wrote:

> Remember multiple people have reported this issue. Per topic metrics no
> longer appear in graphite (or in any system modeled after the yammer
> GraphiteReporter). They are not being seen as unique.
>
> While these metrics are registered in the registry as separate ‘MetricName’
> instances (varying only by mbeanName), the GraphiteReporter sends the
> metrics to graphite using only the 4 fields I describe above. Consequently,
> multiple metrics in the registry get sent to graphite under the same name.
> Thus these metrics all end up in the same bucket in graphite, trampling
> over each other making them useless. They aren’t ‘double counted’ so much
> as flapping between multiple independent values.
>
> We actually have our own Reporter class (based off the yammer
> GraphiteReporter). Our version sends metrics through kafka which is then
> consumed downstream by multiple metric consumers.
>
> The ConsoleReporter isn’t useful for actually persisting metrics anywhere.
> It’s just iterating through all the (identically named metrics in the
> registry (save for the different mbeanNames))….
>
> The mbeanName, as constructed, is not useful as a human readable metric
> name, to be presented in a browsable tree of metrics, etc. The
> ‘group’:’type’:’name’:’scope’ are the pieces that matter.
>
> The fix here is to produce MetricName instances similar to 0.8.1.1, etc. In
> this case, it should probably be something like:
>
> group: kafka.server
> type: BrokerTopicMetrics
> name: mytopic-BytesInPerSec
> scope: 
>
> Jason
> ​
>
> On Tue, Jan 27, 2015 at 7:26 AM, Manikumar Reddy 
> wrote:
>
> > I have enabled yammer's ConsoleReporter and I am getting all the metrics
> > (including per-topic metrics).
> >
> > Yammer's MetricName object implements equals/hashcode methods using
> > mBeanName . We are constructing a unique mBeanName for each metric, So we
> > are not missing/overwriting any metrics.
> >
> > Current confusion is due to  MetricName.name(). This will be same
> > (BytesInPerSec) for both broker level and topic level metrics. We need to
> > use MetricName.getMBeanName() to differentiate between broker level and
> > topic level metrics.
> >
> > 0.8.1  MBeanName:
> > "kafka.server":type="BrokerTopicMetrics",name="AllTopicsBytesInPerSec"
> > "kafka.server":type="BrokerTopicMetrics",name="MYTOPIC-BytesInPerSec"
> >
> > 0.8.2  MBeanName:
> > kafka.server:type=BrokerTopicMetrics,name=BytesInPerSec
> > kafka.server:type=BrokerTopicMetrics,name=BytesInPerSec,topic=MYTOPIC
> >
> >
> > ConsoleReporter's O/P:
> >
> >   BytesInPerSec:  <- This is broker level
> >  count = 1521
> >  mean rate = 3.63 bytes/s
> >  1-minute rate = 0.35 bytes/s
> >  5-minute rate = 2.07 bytes/s
> > 15-minute rate = 1.25 bytes/s
> >
> >   BytesInPerSec:  <- This is for topic1
> >  count = 626
> >  mean rate = 1.89 bytes/s
> >  1-minute rate = 0.42 bytes/s
> >  5-minute rate = 31.53 bytes/s
> > 15-minute rate = 64.66 bytes/s
> >
> >   BytesInPerSec:  <- This is for topic2
> >  count = 895
> >  mean rate = 3.62 bytes/s
> >  1-minute rate = 1.39 bytes/s
> >  5-minute rate = 30.08 bytes/s
> > 15-minute rate = 50.27 bytes/s
> >
> > Manikumar
> >
> > On Tue, Jan 27, 2015 at 1:59 PM, Jason Rosenberg 
> wrote:
> >
> > > Ok,
> > >
> > > It looks like the yammer MetricName is not being created correctly for
> > the
> > > sub metrics that include a topic. E.g. a metric with an mbeanName like:
> > >
> > >
> >
> kafka.server:type=BrokerTopicMetrics,name=BytesRejectedPerSec,topic=mytopic
> > >
> > > appears to be malformed. A yammer MetricName has 4 fields that are used
> > in
> > > creating a graphite metric, that are included in the constructor:
> > > group, type, name, scope.
> > >
> > > In this case, the metric with the above mbeanName has these fields set
> in
> > > the MetricName:
> > >
> > > group: "kafka.server"
> > > type: "BrokerTopicMetrics"
> > > name: "BytesInPerSec"
> > > scope: null
> > >
> > > Thus, the topic metrics all look the same, and get lumped into the
> > > top-level BrokerTopicMetrics (and thus that will now be double
> counted).
> > It
> > > looks like the fix for kafka-1481 was where things got broken. It seems
> > to
> > > have introduced ‘tags’ in the building of metric names, and then t

CommitOffsets for Simple consumer is returning EOF Exception

2015-01-27 Thread ajeet singh
Hi ,

I am new to Kafka. I have a use case in which my consumer can't use the auto
commit offset feature, so I have to go with the option of manual commit. With
the high-level consumer I have the constraint that the consumer can commit only
the current offset, but in my case I will be committing some previous offset value.

So the only possible solution seems to be to use SimpleConsumer. This is how I
am using SimpleConsumer to commit offsets:

TopicAndPartition topicAndPartition = new
TopicAndPartition(topic,partition);
Map<TopicAndPartition, OffsetMetadataAndError> requestInfo = new
HashMap<TopicAndPartition, OffsetMetadataAndError>();
requestInfo.put(topicAndPartition, new
OffsetMetadataAndError(0L,"no_metadata", (short) 0));
kafka.javaapi.OffsetCommitRequest request = new
kafka.javaapi.OffsetCommitRequest("test",
requestInfo, kafka.api.OffsetRequest.CurrentVersion(), 0, clientName);
kafka.javaapi.OffsetCommitResponse response =
consumer.commitOffsets(request);


I am getting EOFException

Oops:java.io.EOFException: Received -1 when reading from channel, socket
has likely been closed.
java.io.EOFException: Received -1 when reading from channel, socket has
likely been closed.
at kafka.utils.Utils$.read(Utils.scala:376)
at
kafka.network.BoundedByteBufferReceive.readFrom(BoundedByteBufferReceive.scala:54)
at kafka.network.Receive$class.readCompletely(Transmission.scala:56)
at
kafka.network.BoundedByteBufferReceive.readCompletely(BoundedByteBufferReceive.scala:29)
at kafka.network.BlockingChannel.receive(BlockingChannel.scala:100)
at kafka.consumer.SimpleConsumer.liftedTree1$1(SimpleConsumer.scala:81)
at
kafka.consumer.SimpleConsumer.kafka$consumer$SimpleConsumer$$sendRequest(SimpleConsumer.scala:71)
at kafka.consumer.SimpleConsumer.fetchOffsets(SimpleConsumer.scala:138)
at
kafka.javaapi.consumer.SimpleConsumer.fetchOffsets(SimpleConsumer.scala:99)
at
com.vizury.rtb.realtimelogging.OfflineLogConsumer.commitOffsetTest(OfflineLogConsumer.java:205)
at
com.vizury.rtb.realtimelogging.OfflineLogConsumer.run(OfflineLogConsumer.java:147)
at
com.vizury.rtb.realtimelogging.OfflineLogConsumer.main(OfflineLogConsumer.java:31)


Any help? I am getting the same error with the fetchOffsets() method, whereas
getOffsetsBefore() is working fine.


subscribe kafka user subject email

2015-01-27 Thread 刘见康



Re: Missing Per-Topic BrokerTopicMetrics in v0.8.2.0

2015-01-27 Thread Jason Rosenberg
Remember multiple people have reported this issue. Per topic metrics no
longer appear in graphite (or in any system modeled after the yammer
GraphiteReporter). They are not being seen as unique.

While these metrics are registered in the registry as separate ‘MetricName’
instances (varying only by mbeanName), the GraphiteReporter sends the
metrics to graphite using only the 4 fields I describe above. Consequently,
multiple metrics in the registry get sent to graphite under the same name.
Thus these metrics all end up in the same bucket in graphite, trampling
over each other making them useless. They aren’t ‘double counted’ so much
as flapping between multiple independent values.

We actually have our own Reporter class (based off the yammer
GraphiteReporter). Our version sends metrics through kafka which is then
consumed downstream by multiple metric consumers.

The ConsoleReporter isn’t useful for actually persisting metrics anywhere.
It’s just iterating through all the (identically named metrics in the
registry (save for the different mbeanNames))….

The mbeanName, as constructed, is not useful as a human readable metric
name, to be presented in a browsable tree of metrics, etc. The
‘group’:’type’:’name’:’scope’ are the pieces that matter.
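
For context, a Graphite-style reporter typically derives the reported path from
just those four fields, roughly like this (paraphrased from memory of the yammer
2.x reporters, so treat it as a sketch rather than the exact source):

// Sketch of how a Graphite-style reporter builds a metric path from a MetricName.
// With scope == null and identical group/type/name, every per-topic metric
// collapses onto the same path.
static String graphitePath(com.yammer.metrics.core.MetricName name) {
  StringBuilder sb = new StringBuilder()
      .append(name.getGroup()).append('.')
      .append(name.getType()).append('.');
  if (name.hasScope()) {
    sb.append(name.getScope()).append('.');
  }
  return sb.append(name.getName()).toString();
}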

The fix here is to produce MetricName instances similar to 0.8.1.1, etc. In
this case, it should probably be something like:

group: kafka.server
type: BrokerTopicMetrics
name: mytopic-BytesInPerSec
scope: 

Jason
​

On Tue, Jan 27, 2015 at 7:26 AM, Manikumar Reddy 
wrote:

> I have enabled yammer's ConsoleReporter and I am getting all the metrics
> (including per-topic metrics).
>
> Yammer's MetricName object implements equals/hashcode methods using
> mBeanName . We are constructing a unique mBeanName for each metric, So we
> are not missing/overwriting any metrics.
>
> Current confusion is due to  MetricName.name(). This will be same
> (BytesInPerSec) for both broker level and topic level metrics. We need to
> use MetricName.getMBeanName() to differentiate between broker level and
> topic level metrics.
>
> 0.8.1  MBeanName:
> "kafka.server":type="BrokerTopicMetrics",name="AllTopicsBytesInPerSec"
> "kafka.server":type="BrokerTopicMetrics",name="MYTOPIC-BytesInPerSec"
>
> 0.8.2  MBeanName:
> kafka.server:type=BrokerTopicMetrics,name=BytesInPerSec
> kafka.server:type=BrokerTopicMetrics,name=BytesInPerSec,topic=MYTOPIC
>
>
> ConsoleReporter's O/P:
>
>   BytesInPerSec:  <- This is broker level
>  count = 1521
>  mean rate = 3.63 bytes/s
>  1-minute rate = 0.35 bytes/s
>  5-minute rate = 2.07 bytes/s
> 15-minute rate = 1.25 bytes/s
>
>   BytesInPerSec:  <- This is for topic1
>  count = 626
>  mean rate = 1.89 bytes/s
>  1-minute rate = 0.42 bytes/s
>  5-minute rate = 31.53 bytes/s
> 15-minute rate = 64.66 bytes/s
>
>   BytesInPerSec:  <- This is for topic2
>  count = 895
>  mean rate = 3.62 bytes/s
>  1-minute rate = 1.39 bytes/s
>  5-minute rate = 30.08 bytes/s
> 15-minute rate = 50.27 bytes/s
>
> Manikumar
>
> On Tue, Jan 27, 2015 at 1:59 PM, Jason Rosenberg  wrote:
>
> > Ok,
> >
> > It looks like the yammer MetricName is not being created correctly for
> the
> > sub metrics that include a topic. E.g. a metric with an mbeanName like:
> >
> >
> kafka.server:type=BrokerTopicMetrics,name=BytesRejectedPerSec,topic=mytopic
> >
> > appears to be malformed. A yammer MetricName has 4 fields that are used
> in
> > creating a graphite metric, that are included in the constructor:
> > group, type, name, scope.
> >
> > In this case, the metric with the above mbeanName has these fields set in
> > the MetricName:
> >
> > group: "kafka.server"
> > type: "BrokerTopicMetrics"
> > name: "BytesInPerSec"
> > scope: null
> >
> > Thus, the topic metrics all look the same, and get lumped into the
> > top-level BrokerTopicMetrics (and thus that will now be double counted).
> It
> > looks like the fix for kafka-1481 was where things got broken. It seems
> to
> > have introduced ‘tags’ in the building of metric names, and then those
> tags
> > only get applied to the mbeanName, but get excluded from the metric name:
> >
> >
> https://github.com/apache/kafka/commit/457744a820d806e546edebbd8ffd33f6772e519f
> >
> > This is a pretty severe issue, since the yammer metrics for these stats
> > will be double counted in aggregate, and the per-topic stats will be
> > removed.
> >
> > I should note too, in my previous email, I thought that only the
> per-topic
> > BrokerTopicMetrics were missing, but also several other per-topic metrics
> > are missing too, e.g. under kafka.log, etc.
> >
> > Jason
> > ​
> >
> > On Tue, Jan 27, 2015 at 2:20 AM, Jason Rosenberg 
> wrote:
> >
> > > I can confirm that the per topic metrics are not coming through to the
> > > yammer metrics registry.  I do see them in jmx (via jconsole), but the
> > > MetricsRegistry does not have them.
> > > All the other metrics are coming throug

Re: Missing Per-Topic BrokerTopicMetrics in v0.8.2.0

2015-01-27 Thread Manikumar Reddy
I have enabled yammer's ConsoleReporter and I am getting all the metrics
(including per-topic metrics).
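
(For anyone who wants to reproduce this, a minimal way to turn it on inside the
broker JVM might look like the following; the 10-second period is arbitrary.)

import java.util.concurrent.TimeUnit;
import com.yammer.metrics.Metrics;
import com.yammer.metrics.reporting.ConsoleReporter;

// Dump every metric in the default registry to stdout every 10 seconds.
ConsoleReporter.enable(Metrics.defaultRegistry(), 10, TimeUnit.SECONDS);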

Yammer's MetricName object implements its equals/hashCode methods using
mBeanName. We are constructing a unique mBeanName for each metric, so we
are not missing/overwriting any metrics.

The current confusion is due to MetricName.name(). This will be the same
(BytesInPerSec) for both broker-level and topic-level metrics. We need to
use MetricName.getMBeanName() to differentiate between broker-level and
topic-level metrics.

0.8.1  MBeanName:
"kafka.server":type="BrokerTopicMetrics",name="AllTopicsBytesInPerSec"
"kafka.server":type="BrokerTopicMetrics",name="MYTOPIC-BytesInPerSec"

0.8.2  MBeanName:
kafka.server:type=BrokerTopicMetrics,name=BytesInPerSec
kafka.server:type=BrokerTopicMetrics,name=BytesInPerSec,topic=MYTOPIC


ConsoleReporter's O/P:

  BytesInPerSec:  <- This is broker level
 count = 1521
 mean rate = 3.63 bytes/s
 1-minute rate = 0.35 bytes/s
 5-minute rate = 2.07 bytes/s
15-minute rate = 1.25 bytes/s

  BytesInPerSec:  <- This is for topic1
 count = 626
 mean rate = 1.89 bytes/s
 1-minute rate = 0.42 bytes/s
 5-minute rate = 31.53 bytes/s
15-minute rate = 64.66 bytes/s

  BytesInPerSec:  <- This is for topic2
 count = 895
 mean rate = 3.62 bytes/s
 1-minute rate = 1.39 bytes/s
 5-minute rate = 30.08 bytes/s
15-minute rate = 50.27 bytes/s

Manikumar

On Tue, Jan 27, 2015 at 1:59 PM, Jason Rosenberg  wrote:

> Ok,
>
> It looks like the yammer MetricName is not being created correctly for the
> sub metrics that include a topic. E.g. a metric with an mbeanName like:
>
> kafka.server:type=BrokerTopicMetrics,name=BytesRejectedPerSec,topic=mytopic
>
> appears to be malformed. A yammer MetricName has 4 fields that are used in
> creating a graphite metric, that are included in the constructor:
> group, type, name, scope.
>
> In this case, the metric with the above mbeanName has these fields set in
> the MetricName:
>
> group: "kafka.server"
> type: "BrokerTopicMetrics"
> name: "BytesInPerSec"
> scope: null
>
> Thus, the topic metrics all look the same, and get lumped into the
> top-level BrokerTopicMetrics (and thus that will now be double counted). It
> looks like the fix for kafka-1481 was where things got broken. It seems to
> have introduced ‘tags’ in the building of metric names, and then those tags
> only get applied to the mbeanName, but get excluded from the metric name:
>
> https://github.com/apache/kafka/commit/457744a820d806e546edebbd8ffd33f6772e519f
>
> This is a pretty severe issue, since the yammer metrics for these stats
> will be double counted in aggregate, and the per-topic stats will be
> removed.
>
> I should note too, in my previous email, I thought that only the per-topic
> BrokerTopicMetrics were missing, but also several other per-topic metrics
> are missing too, e.g. under kafka.log, etc.
>
> Jason
> ​
>
> On Tue, Jan 27, 2015 at 2:20 AM, Jason Rosenberg  wrote:
>
> > I can confirm that the per topic metrics are not coming through to the
> > yammer metrics registry.  I do see them in jmx (via jconsole), but the
> > MetricsRegistry does not have them.
> > All the other metrics are coming through that appear in jmx.
> >
> > This is with single node instance running locally.
> >
> > Jason
> >
> >
> >
> > On Mon, Jan 26, 2015 at 8:30 PM, Manikumar Reddy 
> > wrote:
> >
> >> If you are using multi-node cluster, then metrics may be reported from
> >> other servers.
> >> pl check all the servers in the cluster.
> >>
> >> On Tue, Jan 27, 2015 at 4:12 AM, Kyle Banker 
> >> wrote:
> >>
> >> > I've been using a custom KafkaMetricsReporter to report Kafka broker
> >> > metrics to Graphite. In v0.8.1.1, Kafka was reporting bytes and
> >> messages in
> >> > and out for all topics together and for each individual topic.
> >> >
> >> > After upgrading to v0.8.2.0, these metrics are no longer being
> reported.
> >> >
> >> > I'm only seeing the following:
> >> > BrokerTopicMetrics
> >> > - BytesInPerSec
> >> > - BytesOutPerSec
> >> > - BytesRejectedPerSec
> >> > - MessagesInPerSec
> >> >
> >> > What's more, despite lots of successful writes to the cluster, the
> >> values
> >> > for these remaining metrics are all zero.
> >> >
> >> > I saw that there was some refactoring of metric naming code. Was the
> >> > behavior supposed to have changed?
> >> >
> >> > Many thanks in advance.
> >> >
> >>
> >
> >
>


Re: Missing Per-Topic BrokerTopicMetrics in v0.8.2.0

2015-01-27 Thread Joel Koshy
Is it actually getting double-counted? I tried reproducing this
locally but the BrokerTopicMetrics.Count lines up with the sum of the
PerTopic.Counts for various metrics.

On Tue, Jan 27, 2015 at 03:29:37AM -0500, Jason Rosenberg wrote:
> Ok,
> 
> It looks like the yammer MetricName is not being created correctly for the
> sub metrics that include a topic. E.g. a metric with an mbeanName like:
> 
> kafka.server:type=BrokerTopicMetrics,name=BytesRejectedPerSec,topic=mytopic
> 
> appears to be malformed. A yammer MetricName has 4 fields that are used in
> creating a graphite metric, that are included in the constructor:
> group, type, name, scope.
> 
> In this case, the metric with the above mbeanName has these fields set in
> the MetricName:
> 
> group: "kafka.server"
> type: "BrokerTopicMetrics"
> name: "BytesInPerSec"
> scope: null
> 
> Thus, the topic metrics all look the same, and get lumped into the
> top-level BrokerTopicMetrics (and thus that will now be double counted). It
> looks like the fix for kafka-1481 was where things got broken. It seems to
> have introduced ‘tags’ in the building of metric names, and then those tags
> only get applied to the mbeanName, but get excluded from the metric name:
> https://github.com/apache/kafka/commit/457744a820d806e546edebbd8ffd33f6772e519f
> 
> This is a pretty severe issue, since the yammer metrics for these stats
> will be double counted in aggregate, and the per-topic stats will be
> removed.
> 
> I should note too, in my previous email, I thought that only the per-topic
> BrokerTopicMetrics were missing, but also several other per-topic metrics
> are missing too, e.g. under kafka.log, etc.
> 
> Jason
> ​
> 
> On Tue, Jan 27, 2015 at 2:20 AM, Jason Rosenberg  wrote:
> 
> > I can confirm that the per topic metrics are not coming through to the
> > yammer metrics registry.  I do see them in jmx (via jconsole), but the
> > MetricsRegistry does not have them.
> > All the other metrics are coming through that appear in jmx.
> >
> > This is with single node instance running locally.
> >
> > Jason
> >
> >
> >
> > On Mon, Jan 26, 2015 at 8:30 PM, Manikumar Reddy 
> > wrote:
> >
> >> If you are using multi-node cluster, then metrics may be reported from
> >> other servers.
> >> pl check all the servers in the cluster.
> >>
> >> On Tue, Jan 27, 2015 at 4:12 AM, Kyle Banker 
> >> wrote:
> >>
> >> > I've been using a custom KafkaMetricsReporter to report Kafka broker
> >> > metrics to Graphite. In v0.8.1.1, Kafka was reporting bytes and
> >> messages in
> >> > and out for all topics together and for each individual topic.
> >> >
> >> > After upgrading to v0.8.2.0, these metrics are no longer being reported.
> >> >
> >> > I'm only seeing the following:
> >> > BrokerTopicMetrics
> >> > - BytesInPerSec
> >> > - BytesOutPerSec
> >> > - BytesRejectedPerSec
> >> > - MessagesInPerSec
> >> >
> >> > What's more, despite lots of successful writes to the cluster, the
> >> values
> >> > for these remaining metrics are all zero.
> >> >
> >> > I saw that there was some refactoring of metric naming code. Was the
> >> > behavior supposed to have changed?
> >> >
> >> > Many thanks in advance.
> >> >
> >>
> >
> >



Re: Missing Per-Topic BrokerTopicMetrics in v0.8.2.0

2015-01-27 Thread Jason Rosenberg
Ok,

It looks like the yammer MetricName is not being created correctly for the
sub metrics that include a topic. E.g. a metric with an mbeanName like:

kafka.server:type=BrokerTopicMetrics,name=BytesRejectedPerSec,topic=mytopic

appears to be malformed. A yammer MetricName has 4 fields that are used in
creating a graphite metric, that are included in the constructor:
group, type, name, scope.

In this case, the metric with the above mbeanName has these fields set in
the MetricName:

group: "kafka.server"
type: "BrokerTopicMetrics"
name: "BytesInPerSec"
scope: null

Thus, the topic metrics all look the same, and get lumped into the
top-level BrokerTopicMetrics (and thus that will now be double counted). It
looks like the fix for kafka-1481 was where things got broken. It seems to
have introduced ‘tags’ in the building of metric names, and then those tags
only get applied to the mbeanName, but get excluded from the metric name:
https://github.com/apache/kafka/commit/457744a820d806e546edebbd8ffd33f6772e519f

This is a pretty severe issue, since the yammer metrics for these stats
will be double counted in aggregate, and the per-topic stats will be
removed.

I should note too, in my previous email, I thought that only the per-topic
BrokerTopicMetrics were missing, but also several other per-topic metrics
are missing too, e.g. under kafka.log, etc.

Jason
​

On Tue, Jan 27, 2015 at 2:20 AM, Jason Rosenberg  wrote:

> I can confirm that the per topic metrics are not coming through to the
> yammer metrics registry.  I do see them in jmx (via jconsole), but the
> MetricsRegistry does not have them.
> All the other metrics are coming through that appear in jmx.
>
> This is with single node instance running locally.
>
> Jason
>
>
>
> On Mon, Jan 26, 2015 at 8:30 PM, Manikumar Reddy 
> wrote:
>
>> If you are using multi-node cluster, then metrics may be reported from
>> other servers.
>> pl check all the servers in the cluster.
>>
>> On Tue, Jan 27, 2015 at 4:12 AM, Kyle Banker 
>> wrote:
>>
>> > I've been using a custom KafkaMetricsReporter to report Kafka broker
>> > metrics to Graphite. In v0.8.1.1, Kafka was reporting bytes and
>> messages in
>> > and out for all topics together and for each individual topic.
>> >
>> > After upgrading to v0.8.2.0, these metrics are no longer being reported.
>> >
>> > I'm only seeing the following:
>> > BrokerTopicMetrics
>> > - BytesInPerSec
>> > - BytesOutPerSec
>> > - BytesRejectedPerSec
>> > - MessagesInPerSec
>> >
>> > What's more, despite lots of successful writes to the cluster, the
>> values
>> > for these remaining metrics are all zero.
>> >
>> > I saw that there was some refactoring of metric naming code. Was the
>> > behavior supposed to have changed?
>> >
>> > Many thanks in advance.
>> >
>>
>
>


Re: Can't create a topic; can't delete it either

2015-01-27 Thread Joel Koshy
Which version of the broker are you using?

On Mon, Jan 26, 2015 at 10:27:14PM -0800, Sumit Rangwala wrote:
> While running kafka in production I found an issue where a topic wasn't
> getting created even with auto topic enabled. I then went ahead and created
> the topic manually (from the command line). I then delete the topic, again
> manually. Now my broker won't allow me to either create *the* topic or
> delete *the* topic. (other topic creation and deletion is working fine).
> 
> The topic is in "marked for deletion" stage for more than 3 hours.
> 
> $ bin/kafka-topics.sh --zookeeper zookeeper1:2181/replication/kafka  --list
> --topic GRIFFIN-TldAdFormat.csv-1422321736886
> GRIFFIN-TldAdFormat.csv-1422321736886 - marked for deletion
> 
> If this is a known issue, is there a workaround?
> 
> Sumit



Re: Missing Per-Topic BrokerTopicMetrics in v0.8.2.0

2015-01-27 Thread Joel Koshy
Hi Jason - can you describe how you verify that the metrics are not
coming through to the metrics registry?  Looking at the metrics code
it seems that the mbeans are registered by the yammer jmx
reporter only after being added to the metrics registry.

Thanks,

Joel

On Tue, Jan 27, 2015 at 02:20:38AM -0500, Jason Rosenberg wrote:
> I can confirm that the per topic metrics are not coming through to the
> yammer metrics registry.  I do see them in jmx (via jconsole), but the
> MetricsRegistry does not have them.
> All the other metrics are coming through that appear in jmx.
> 
> This is with single node instance running locally.
> 
> Jason
> 
> 
> 
> On Mon, Jan 26, 2015 at 8:30 PM, Manikumar Reddy 
> wrote:
> 
> > If you are using multi-node cluster, then metrics may be reported from
> > other servers.
> > pl check all the servers in the cluster.
> >
> > On Tue, Jan 27, 2015 at 4:12 AM, Kyle Banker  wrote:
> >
> > > I've been using a custom KafkaMetricsReporter to report Kafka broker
> > > metrics to Graphite. In v0.8.1.1, Kafka was reporting bytes and messages
> > in
> > > and out for all topics together and for each individual topic.
> > >
> > > After upgrading to v0.8.2.0, these metrics are no longer being reported.
> > >
> > > I'm only seeing the following:
> > > BrokerTopicMetrics
> > > - BytesInPerSec
> > > - BytesOutPerSec
> > > - BytesRejectedPerSec
> > > - MessagesInPerSec
> > >
> > > What's more, despite lots of successful writes to the cluster, the values
> > > for these remaining metrics are all zero.
> > >
> > > I saw that there was some refactoring of metric naming code. Was the
> > > behavior supposed to have changed?
> > >
> > > Many thanks in advance.
> > >
> >