Re: Poor performance running performance test
I was running the performance command from a VirtualBox server, so that seems like it was part of the problem. I'm getting better results running this on a server on AWS, but that's kind of expected. Can you look at these results and comment on the occasional warning I see? I appreciate it!

1220375 records sent, 243928.6 records/sec (23.26 MB/sec), 2111.5 ms avg latency, 4435.0 max latency.
1195090 records sent, 239018.0 records/sec (22.79 MB/sec), 2203.1 ms avg latency, 4595.0 max latency.
1257165 records sent, 251433.0 records/sec (23.98 MB/sec), 2172.6 ms avg latency, 4525.0 max latency.
1230981 records sent, 246196.2 records/sec (23.48 MB/sec), 2173.5 ms avg latency, 4465.0 max latency.
[2015-01-28 07:19:07,274] WARN Error in I/O with (org.apache.kafka.common.network.Selector)
java.io.EOFException
        at org.apache.kafka.common.network.NetworkReceive.readFrom(NetworkReceive.java:62)
        at org.apache.kafka.common.network.Selector.poll(Selector.java:248)
        at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:192)
        at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:191)
        at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:122)
        at java.lang.Thread.run(Thread.java:745)
1090689 records sent, 218137.8 records/sec (20.80 MB/sec), 2413.6 ms avg latency, 4829.0 max latency.

On Tue, Jan 27, 2015 at 7:37 PM, Ewen Cheslack-Postava wrote:
> Where are you running ProducerPerformance in relation to ZK and the Kafka
> brokers? You should definitely see much higher performance than this.
>
> A couple of other things I can think of that might be going wrong: Are all
> your VMs in the same AZ? Are you storing Kafka data in EBS or local
> ephemeral storage? If EBS, have you provisioned enough IOPS?
>
> On Tue, Jan 27, 2015 at 4:29 PM, Dillian Murphey wrote:
> > I'm a new user/admin to Kafka. I'm running a 3-node ZK ensemble and 6 brokers on AWS.
> >
> > The performance I'm seeing is shockingly bad. I need some advice!
> >
> > bin/kafka-run-class.sh org.apache.kafka.clients.tools.ProducerPerformance
> > test2 5000 100 -1 acks=1 bootstrap.servers=5:9092
> > buffer.memory=67108864 batch.size=8196
> >
> > 6097 records sent, 13198.3 records/sec (1.26 MB/sec), 2098.0 ms avg latency, 4306.0 max latency.
> > 71695 records sent, 14339.0 records/sec (1.37 MB/sec), 6658.1 ms avg latency, 9053.0 max latency.
> > 65195 records sent, 13028.6 records/sec (1.24 MB/sec), 11504.0 ms avg latency, 13809.0 max latency.
> > 71955 records sent, 14391.0 records/sec (1.37 MB/sec), 16137.4 ms avg latency, 18541.0 max latency.
> >
> > Thanks for any help!
>
> --
> Thanks,
> Ewen
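The quoted run uses acks=1 with batch.size=8196 (about 8 KB) for 100-byte records, so batching and lingering are the usual knobs for small-record throughput. Below is a minimal sketch of those settings, assuming the 0.8.2 new Java producer; the broker list, topic name, and exact values are placeholders rather than recommendations tested against this cluster.

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.ByteArraySerializer;

public class ThroughputProducerSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "broker1:9092,broker2:9092"); // placeholder brokers
        props.put("acks", "1");
        // Larger batches amortize per-request overhead for small records.
        props.put("batch.size", Integer.toString(64 * 1024));
        // Give the producer a few milliseconds to fill batches before sending.
        props.put("linger.ms", "5");
        props.put("buffer.memory", Long.toString(64L * 1024 * 1024));
        props.put("key.serializer", ByteArraySerializer.class.getName());
        props.put("value.serializer", ByteArraySerializer.class.getName());

        KafkaProducer<byte[], byte[]> producer = new KafkaProducer<byte[], byte[]>(props);
        byte[] payload = new byte[100];
        for (int i = 0; i < 1000000; i++) {
            producer.send(new ProducerRecord<byte[], byte[]>("test2", payload));
        }
        producer.close();
    }
}

The same batch.size and linger.ms settings can also be passed as key=value overrides on the ProducerPerformance command line shown in the quoted message.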
Re: Can't create a topic; can't delete it either
Do you still have the controller and state change logs from the time you originally tried to delete the topic? On Tue, Jan 27, 2015 at 03:11:48PM -0800, Sumit Rangwala wrote: > I am using 0.8.2-beta on brokers 0.8.1.1 for client (producer and > consumers). delete.topic.enable=true on all brokers. replication factor is > < number of brokers. I see this issue with just one single topic, all other > topics are fine (creation and deletion). Even after a day it is still in > marked for deletion stage. Let me know what other information from the > brokers or the zookeepers can help me debug this issue. > > On Tue, Jan 27, 2015 at 9:47 AM, Gwen Shapira wrote: > > > Also, do you have delete.topic.enable=true on all brokers? > > > > The automatic topic creation can fail if the default number of > > replicas is greater than number of available brokers. Check the > > default.replication.factor parameter. > > > > Gwen > > > > On Tue, Jan 27, 2015 at 12:29 AM, Joel Koshy wrote: > > > Which version of the broker are you using? > > > > > > On Mon, Jan 26, 2015 at 10:27:14PM -0800, Sumit Rangwala wrote: > > >> While running kafka in production I found an issue where a topic wasn't > > >> getting created even with auto topic enabled. I then went ahead and > > created > > >> the topic manually (from the command line). I then delete the topic, > > again > > >> manually. Now my broker won't allow me to either create *the* topic or > > >> delete *the* topic. (other topic creation and deletion is working fine). > > >> > > >> The topic is in "marked for deletion" stage for more than 3 hours. > > >> > > >> $ bin/kafka-topics.sh --zookeeper zookeeper1:2181/replication/kafka > > --list > > >> --topic GRIFFIN-TldAdFormat.csv-1422321736886 > > >> GRIFFIN-TldAdFormat.csv-1422321736886 - marked for deletion > > >> > > >> If this is a known issue, is there a workaround? > > >> > > >> Sumit > > > > >
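If the controller and state-change logs are no longer available, the pending-deletion state can at least be inspected directly in ZooKeeper. A small sketch, assuming the ZkClient library that Kafka 0.8 already depends on; the connect string reuses the chroot from the quoted kafka-topics.sh command, and the znode paths (/admin/delete_topics, /brokers/topics) are the standard 0.8 layout.

import java.util.List;
import org.I0Itec.zkclient.ZkClient;

public class PendingDeletionCheck {
    public static void main(String[] args) {
        // Same ZooKeeper chroot as in the kafka-topics.sh command in this thread.
        ZkClient zk = new ZkClient("zookeeper1:2181/replication/kafka", 10000, 10000);

        // Topics the controller has been asked to delete but has not finished yet.
        List<String> pending = zk.getChildren("/admin/delete_topics");
        System.out.println("Pending deletions: " + pending);

        // Whether the topic's metadata znode still exists at all.
        String topic = "GRIFFIN-TldAdFormat.csv-1422321736886";
        System.out.println("Topic znode exists: " + zk.exists("/brokers/topics/" + topic));

        zk.close();
    }
}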
Re: Ques regarding topic partition offset
This can happen as a result of unclean leader elections - there are mbeans on the controller that give the unclean leader election rate - or you can check the controller logs to determine if this happened. On Tue, Jan 27, 2015 at 09:54:38PM -0800, Liju John wrote: > Hi , > > I have query regarding partition offset . > > While running kafka cluster for some time ,I noticed that the partition > offset keeps on increasing and at some point the offset decreased by some > number . > In what scenarios does the offset of a topic partition reduces ? > > The problem I am facing is that my consumers are pulling the msgs from the > topic but at somepoint in time it throws exception that the current offset > is greater that the latest offset of the partition . > Is it because of retension the latest offset gets reset ? How can I handle > this scenarios > > Regards, > Liju John > S/W developer
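If the stored consumer offset ends up ahead of the partition's log end offset (for example after an unclean leader election truncates the log), the 0.8 high-level consumer decides how to recover via auto.offset.reset. A minimal config sketch; the ZooKeeper connect string and group id are placeholders.

import java.util.Properties;
import kafka.consumer.ConsumerConfig;

public class OffsetResetConfigSketch {
    public static ConsumerConfig build() {
        Properties props = new Properties();
        props.put("zookeeper.connect", "zk1:2181"); // placeholder
        props.put("group.id", "my-group");          // placeholder
        // "smallest" restarts from the earliest retained offset,
        // "largest" jumps to the newest data when the stored offset is out of range.
        props.put("auto.offset.reset", "largest");
        return new ConsumerConfig(props);
    }
}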
Re: [DISCUSSION] Boot dependency in the new producer
Steven, You are right, I was wrong about the previous email: it will not set the flag, the Sender thread will trigger Client.poll() but with Int.Max select time, hence this should not be an issue. I am closing this discussion now. Guozhang On Mon, Jan 26, 2015 at 4:42 PM, Steven Wu wrote: > Jay, I don't think this line will bootstrap full metadata (for all topics). > it will just construct the cluster object with bootstrap host. you need to > do "metadata.add(topic)" to set interest of a topic's partition metadata. > > Guozhang, I personally think this is ok. it just do a few DNS lookup or TCP > connection before first send. > > On Mon, Jan 26, 2015 at 2:07 PM, Jay Kreps wrote: > > > Oh, yes, I guess I thought you meant that construction of the client > would > > block on the metadata request. > > > > I don't personally think that is a problem because if it fails it will > > retry in the background, right? > > > > But actually I think this is probably violating another desirable > criteria > > we had talked about which was keeping the producer from bootstrapping the > > full metadata for all partitions. If it is doing that during construction > > time presumably the resulting metadata request is for all partitions, no? > > That isn't a huge problem, but I think isn't what was intended. > > > > -Jay > > > > On Mon, Jan 26, 2015 at 1:34 PM, Guozhang Wang > wrote: > > > > > It will set the needUpdate flag to true and hence the background Sender > > > will try to talk to the bootstrap servers. > > > > > > Guozhang > > > > > > On Mon, Jan 26, 2015 at 1:12 PM, Jay Kreps > wrote: > > > > > > > Hey Guozhang, > > > > > > > > That line shouldn't cause any connections to Kafka to be established, > > > does > > > > it? All that is doing is creating the Cluster pojo using the supplied > > > > addresses. The use of InetSocketAddress may cause some dns stuff to > > > happen, > > > > though... > > > > > > > > -Jay > > > > > > > > On Mon, Jan 26, 2015 at 10:50 AM, Guozhang Wang > > > > wrote: > > > > > > > > > Hi all, > > > > > > > > > > I am not sure if we have discussed about this before, but recently > I > > > > > realized that we have introduced boot dependency of the > kafka-server > > > > > specified by the "bootstrap.servers" config in the new producer. > More > > > > > specifically, although in the old producer we also have a similar > > > config > > > > > for specifying the broker list, the producer will not try to > connect > > to > > > > > those brokers until the first message send call is triggered; > whereas > > > in > > > > > the new producer, it will try to talk to them in construction time > > via: > > > > > > > > > > update(Cluster.bootstrap(addresses), time.milliseconds()); > > > > > > > > > > > > > > > I personally am neutral to this change, as in most cases the > > > > corresponding > > > > > kafka server should be up and running before the producer clients > are > > > > > deployed, but there are still some corner cases when it is not > true, > > > for > > > > > example some standalone deployment tests of the app embedded with > > some > > > > > clients, etc. So I would like to bring this up to people's > attention > > if > > > > we > > > > > have not discussed about it before: do we think this is OK to > > introduce > > > > > this boot dependency in the new producer? > > > > > > > > > > -- Guozhang > > > > > > > > > > > > > > > > > > > > > -- > > > -- Guozhang > > > > > > -- -- Guozhang
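From the client side, the construction-time behavior discussed above looks roughly like this. A sketch assuming the 0.8.2 new producer; the broker address and topic are placeholders, and the comments summarize the conclusions of this thread rather than the exact internals.

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class BootstrapDependencySketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        // Cluster.bootstrap() only builds the Cluster pojo from these addresses at
        // construction time; resolving them may trigger DNS lookups, but no topic
        // metadata is requested until a topic is actually used.
        props.put("bootstrap.servers", "broker1:9092"); // placeholder
        props.put("key.serializer",
                  "org.apache.kafka.common.serialization.ByteArraySerializer");
        props.put("value.serializer",
                  "org.apache.kafka.common.serialization.ByteArraySerializer");

        // Construction succeeds even if broker1 is not reachable yet; the Sender
        // thread keeps retrying in the background.
        KafkaProducer<byte[], byte[]> producer = new KafkaProducer<byte[], byte[]>(props);

        // The first send() registers interest in the topic (metadata.add(topic))
        // and waits for that topic's metadata before the record can be routed.
        producer.send(new ProducerRecord<byte[], byte[]>("my-topic", new byte[]{1}));
        producer.close();
    }
}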
Re: SimpleConsumer leaks sockets on an UnresolvedAddressException
Ewen, you are right, the patch is committed on Feb.20th last year, I will leave a comment and close that ticket. On Tue, Jan 27, 2015 at 7:24 PM, Ewen Cheslack-Postava wrote: > This was fixed in commit 6ab9b1ecd8 for KAFKA-1235 and it looks like that > will only be included in 0.8.2. > > Guozhang, it looks like you wrote the patch, Jun reviewed it, but the bug > is still open and there's a comment that moved it to 0.9 after the commit > was already made. Was the commit a mistake or did we just forget to close > it? > > On Tue, Jan 27, 2015 at 10:20 AM, Rajiv Kurian > wrote: > > > Here is the relevant stack trace: > > > > java.nio.channels.UnresolvedAddressException: null > > > > at sun.nio.ch.Net.checkAddress(Net.java:127) ~[na:1.7.0_55] > > > > at > sun.nio.ch.SocketChannelImpl.connect(SocketChannelImpl.java:644) > > ~[na:1.7.0_55] > > > > at kafka.network.BlockingChannel.connect(Unknown Source) > > ~[kafka_2.10-0.8.0.jar:0.8.0] > > > > at kafka.consumer.SimpleConsumer.connect(Unknown Source) > > ~[kafka_2.10-0.8.0.jar:0.8.0] > > > > at kafka.consumer.SimpleConsumer.getOrMakeConnection(Unknown > > Source) ~[kafka_2.10-0.8.0.jar:0.8.0] > > > > at > > > > > kafka.consumer.SimpleConsumer.kafka$consumer$SimpleConsumer$$sendRequest(Unknown > > Source) ~[kafka_2.10-0.8.0.jar:0.8.0] > > > > at > > > > > kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(Unknown > > Source) ~[kafka_2.10-0.8.0.jar:0.8.0] > > > > at > > > > > kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(Unknown > > Source) ~[kafka_2.10-0.8.0.jar:0.8.0] > > > > at > > > > > kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(Unknown > > Source) ~[kafka_2.10-0.8.0.jar:0.8.0] > > > > at kafka.metrics.KafkaTimer.time(Unknown Source) > > ~[kafka_2.10-0.8.0.jar:0.8.0] > > > > at > > kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply$mcV$sp(Unknown > Source) > > ~[kafka_2.10-0.8.0.jar:0.8.0] > > > > at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(Unknown > > Source) ~[kafka_2.10-0.8.0.jar:0.8.0] > > > > at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(Unknown > > Source) ~[kafka_2.10-0.8.0.jar:0.8.0] > > > > at kafka.metrics.KafkaTimer.time(Unknown Source) > > ~[kafka_2.10-0.8.0.jar:0.8.0] > > > > at kafka.consumer.SimpleConsumer.fetch(Unknown Source) > > ~[kafka_2.10-0.8.0.jar:0.8.0] > > > > at kafka.javaapi.consumer.SimpleConsumer.fetch(Unknown Source) > > ~[kafka_2.10-0.8.0.jar:0.8.0] > > > > On Tue, Jan 27, 2015 at 10:19 AM, Rajiv Kurian > > wrote: > > > > > I am using 0.8.1. The source is here: > > > > > > https://github.com/apache/kafka/blob/0.8.1/core/src/main/scala/kafka/consumer/SimpleConsumer.scala > > > > > > Here is the definition of disconnect(): > > > private def disconnect() = { > > > if(blockingChannel.isConnected) { > > > debug("Disconnecting from " + host + ":" + port) > > > blockingChannel.disconnect() > > > } > > > } > > > It checks if blockingChannel.isConnected before calling > > > blockingChannel.disconnect(). I think if there is an > > > UnresolvedAddressException, the isConnected is never set and the > > > blockingChannel.disconnect() is never called. But by this point we have > > > already created a socket and will leak it. > > > > > > The same problem might be present in the connect method of the > > > BlockingChannel at > > > > > > https://github.com/apache/kafka/blob/0.8.1/core/src/main/scala/kafka/network/BlockingChannel.scala > > . 
> > > Though its own disconnect method seems to check for both the connected: > > > > > > def disconnect() = lock synchronized { > > > // My comment: connected will not be set if we get an > > > UnresolvedAddressException but channel should NOT be null, so we will > > > probably still do the right thing. > > > if(connected || channel != null) { > > > // closing the main socket channel *should* close the read > channel > > > // but let's do it to be sure. > > > swallow(channel.close()) > > > swallow(channel.socket.close()) > > > if(readChannel != null) swallow(readChannel.close()) > > > channel = null; readChannel = null; writeChannel = null > > > connected = false > > > } > > > } > > > > > > > > > > > > On Tue, Jan 27, 2015 at 9:03 AM, Guozhang Wang > > wrote: > > > > > >> Rajiv, > > >> > > >> Which version of Kafka are you using? I just checked SimpleConsumer's > > >> code, > > >> and in its close() function, disconnect() is called, which will close > > the > > >> socket. > > >> > > >> Guozhang > > >> > > >> > > >> On Mon, Jan 26, 2015 at 2:36 PM, Rajiv Kurian > > >> wrote: > > >> > > >> > Meant to write a run loop. > > >> > > > >> > void run() { > > >> > while (running) { > > >> > if (simpleConsumer == null) { > > >> > simpleConsumer = new SimpleConsumer(host, port, >
Ques regarding topic partition offset
Hi, I have a query regarding partition offsets.

While running a Kafka cluster for some time, I noticed that the partition offset keeps increasing, and at some point the offset decreased by some number. In what scenarios does the offset of a topic partition decrease?

The problem I am facing is that my consumers are pulling messages from the topic, but at some point an exception is thrown saying that the current offset is greater than the latest offset of the partition. Is it because of retention that the latest offset gets reset? How can I handle this scenario?

Regards,
Liju John
S/W developer
Re: Poor performance running performance test
Where are you running ProducerPerformance in relation to ZK and the Kafka brokers? You should definitely see much higher performance than this.

A couple of other things I can think of that might be going wrong: Are all your VMs in the same AZ? Are you storing Kafka data in EBS or local ephemeral storage? If EBS, have you provisioned enough IOPS?

On Tue, Jan 27, 2015 at 4:29 PM, Dillian Murphey wrote:
> I'm a new user/admin to Kafka. I'm running a 3-node ZK ensemble and 6 brokers on AWS.
>
> The performance I'm seeing is shockingly bad. I need some advice!
>
> bin/kafka-run-class.sh org.apache.kafka.clients.tools.ProducerPerformance
> test2 5000 100 -1 acks=1 bootstrap.servers=5:9092
> buffer.memory=67108864 batch.size=8196
>
> 6097 records sent, 13198.3 records/sec (1.26 MB/sec), 2098.0 ms avg latency, 4306.0 max latency.
> 71695 records sent, 14339.0 records/sec (1.37 MB/sec), 6658.1 ms avg latency, 9053.0 max latency.
> 65195 records sent, 13028.6 records/sec (1.24 MB/sec), 11504.0 ms avg latency, 13809.0 max latency.
> 71955 records sent, 14391.0 records/sec (1.37 MB/sec), 16137.4 ms avg latency, 18541.0 max latency.
>
> Thanks for any help!

--
Thanks,
Ewen
Re: SimpleConsumer leaks sockets on an UnresolvedAddressException
This was fixed in commit 6ab9b1ecd8 for KAFKA-1235 and it looks like that will only be included in 0.8.2. Guozhang, it looks like you wrote the patch, Jun reviewed it, but the bug is still open and there's a comment that moved it to 0.9 after the commit was already made. Was the commit a mistake or did we just forget to close it? On Tue, Jan 27, 2015 at 10:20 AM, Rajiv Kurian wrote: > Here is the relevant stack trace: > > java.nio.channels.UnresolvedAddressException: null > > at sun.nio.ch.Net.checkAddress(Net.java:127) ~[na:1.7.0_55] > > at sun.nio.ch.SocketChannelImpl.connect(SocketChannelImpl.java:644) > ~[na:1.7.0_55] > > at kafka.network.BlockingChannel.connect(Unknown Source) > ~[kafka_2.10-0.8.0.jar:0.8.0] > > at kafka.consumer.SimpleConsumer.connect(Unknown Source) > ~[kafka_2.10-0.8.0.jar:0.8.0] > > at kafka.consumer.SimpleConsumer.getOrMakeConnection(Unknown > Source) ~[kafka_2.10-0.8.0.jar:0.8.0] > > at > > kafka.consumer.SimpleConsumer.kafka$consumer$SimpleConsumer$$sendRequest(Unknown > Source) ~[kafka_2.10-0.8.0.jar:0.8.0] > > at > > kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(Unknown > Source) ~[kafka_2.10-0.8.0.jar:0.8.0] > > at > > kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(Unknown > Source) ~[kafka_2.10-0.8.0.jar:0.8.0] > > at > > kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(Unknown > Source) ~[kafka_2.10-0.8.0.jar:0.8.0] > > at kafka.metrics.KafkaTimer.time(Unknown Source) > ~[kafka_2.10-0.8.0.jar:0.8.0] > > at > kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply$mcV$sp(Unknown Source) > ~[kafka_2.10-0.8.0.jar:0.8.0] > > at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(Unknown > Source) ~[kafka_2.10-0.8.0.jar:0.8.0] > > at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(Unknown > Source) ~[kafka_2.10-0.8.0.jar:0.8.0] > > at kafka.metrics.KafkaTimer.time(Unknown Source) > ~[kafka_2.10-0.8.0.jar:0.8.0] > > at kafka.consumer.SimpleConsumer.fetch(Unknown Source) > ~[kafka_2.10-0.8.0.jar:0.8.0] > > at kafka.javaapi.consumer.SimpleConsumer.fetch(Unknown Source) > ~[kafka_2.10-0.8.0.jar:0.8.0] > > On Tue, Jan 27, 2015 at 10:19 AM, Rajiv Kurian > wrote: > > > I am using 0.8.1. The source is here: > > > https://github.com/apache/kafka/blob/0.8.1/core/src/main/scala/kafka/consumer/SimpleConsumer.scala > > > > Here is the definition of disconnect(): > > private def disconnect() = { > > if(blockingChannel.isConnected) { > > debug("Disconnecting from " + host + ":" + port) > > blockingChannel.disconnect() > > } > > } > > It checks if blockingChannel.isConnected before calling > > blockingChannel.disconnect(). I think if there is an > > UnresolvedAddressException, the isConnected is never set and the > > blockingChannel.disconnect() is never called. But by this point we have > > already created a socket and will leak it. > > > > The same problem might be present in the connect method of the > > BlockingChannel at > > > https://github.com/apache/kafka/blob/0.8.1/core/src/main/scala/kafka/network/BlockingChannel.scala > . > > Though its own disconnect method seems to check for both the connected: > > > > def disconnect() = lock synchronized { > > // My comment: connected will not be set if we get an > > UnresolvedAddressException but channel should NOT be null, so we will > > probably still do the right thing. > > if(connected || channel != null) { > > // closing the main socket channel *should* close the read channel > > // but let's do it to be sure. 
> > swallow(channel.close()) > > swallow(channel.socket.close()) > > if(readChannel != null) swallow(readChannel.close()) > > channel = null; readChannel = null; writeChannel = null > > connected = false > > } > > } > > > > > > > > On Tue, Jan 27, 2015 at 9:03 AM, Guozhang Wang > wrote: > > > >> Rajiv, > >> > >> Which version of Kafka are you using? I just checked SimpleConsumer's > >> code, > >> and in its close() function, disconnect() is called, which will close > the > >> socket. > >> > >> Guozhang > >> > >> > >> On Mon, Jan 26, 2015 at 2:36 PM, Rajiv Kurian > >> wrote: > >> > >> > Meant to write a run loop. > >> > > >> > void run() { > >> > while (running) { > >> > if (simpleConsumer == null) { > >> > simpleConsumer = new SimpleConsumer(host, port, > >> > (int) kafkaSocketTimeout, kafkaRExeiveBufferSize, clientName); > >> > } > >> > try { > >> > // Do stuff with simpleConsumer. > >> > } catch (Exception e) { > >> > logger.error(e); // Assume UnresolvedAddressException. > >> > if (consumer != null) { > >> > simpleConsumer.close(); > >> > simpleConsumer = null; > >> > } > >> > } > >> > } > >> > } > >> > > >> > On Mon, Jan 26, 2015 at 2:27 PM, Rajiv Kurian
Poor performance running performance test
I'm a new user/admin to Kafka. I'm running a 3-node ZK ensemble and 6 brokers on AWS.

The performance I'm seeing is shockingly bad. I need some advice!

bin/kafka-run-class.sh org.apache.kafka.clients.tools.ProducerPerformance test2 5000 100 -1 acks=1 bootstrap.servers=5:9092 buffer.memory=67108864 batch.size=8196

6097 records sent, 13198.3 records/sec (1.26 MB/sec), 2098.0 ms avg latency, 4306.0 max latency.
71695 records sent, 14339.0 records/sec (1.37 MB/sec), 6658.1 ms avg latency, 9053.0 max latency.
65195 records sent, 13028.6 records/sec (1.24 MB/sec), 11504.0 ms avg latency, 13809.0 max latency.
71955 records sent, 14391.0 records/sec (1.37 MB/sec), 16137.4 ms avg latency, 18541.0 max latency.

Thanks for any help!
Re: Can't create a topic; can't delete it either
I am using 0.8.2-beta on the brokers and 0.8.1.1 for the clients (producers and consumers). delete.topic.enable=true is set on all brokers, and the replication factor is less than the number of brokers. I see this issue with just one single topic; all other topics are fine (creation and deletion). Even after a day it is still in the "marked for deletion" stage. Let me know what other information from the brokers or the ZooKeepers can help me debug this issue.

On Tue, Jan 27, 2015 at 9:47 AM, Gwen Shapira wrote:
> Also, do you have delete.topic.enable=true on all brokers?
>
> The automatic topic creation can fail if the default number of
> replicas is greater than number of available brokers. Check the
> default.replication.factor parameter.
>
> Gwen
>
> On Tue, Jan 27, 2015 at 12:29 AM, Joel Koshy wrote:
> > Which version of the broker are you using?
> >
> > On Mon, Jan 26, 2015 at 10:27:14PM -0800, Sumit Rangwala wrote:
> >> While running kafka in production I found an issue where a topic wasn't
> >> getting created even with auto topic enabled. I then went ahead and created
> >> the topic manually (from the command line). I then delete the topic, again
> >> manually. Now my broker won't allow me to either create *the* topic or
> >> delete *the* topic. (other topic creation and deletion is working fine).
> >>
> >> The topic is in "marked for deletion" stage for more than 3 hours.
> >>
> >> $ bin/kafka-topics.sh --zookeeper zookeeper1:2181/replication/kafka --list
> >> --topic GRIFFIN-TldAdFormat.csv-1422321736886
> >> GRIFFIN-TldAdFormat.csv-1422321736886 - marked for deletion
> >>
> >> If this is a known issue, is there a workaround?
> >>
> >> Sumit
> >
>
Re: [VOTE] 0.8.2.0 Candidate 2 (with the correct links)
Hi, Everyone, We identified a blocker issue related to Yammer jmx reporter ( https://issues.apache.org/jira/browse/KAFKA-1902). We are addressing this issue right now. Once it's resolved, we will roll out a new RC for voting. Thanks, Jun On Mon, Jan 26, 2015 at 5:14 PM, Otis Gospodnetic < otis.gospodne...@gmail.com> wrote: > Hi, > > Don't use Graphite, so I don't know. Kyle, maybe you can share more info? > What do you mean by "reported to Yammer" for example? And when you say > Yammer/Graphite, are you trying to say that you are using the Graphite > Reporter? If so, can you try other Yammer Reporters and see if there is a > more general issue or something limited to either Graphite or Graphite > Reporter? > > I am pretty sure we are able to see all Kafka 0.8.2 metrics nicely in SPM > (in non-public version of the Kafka monitoring agent). > > Otis > -- > Monitoring * Alerting * Anomaly Detection * Centralized Log Management > Solr & Elasticsearch Support * http://sematext.com/ > > > On Mon, Jan 26, 2015 at 7:37 PM, Jun Rao wrote: > > > Hmm, that's not the intention. The per-topic mbeans are definitely > > registered by Yammer. So, not sure why it's not reported to Graphite. > > > > Otis, Vladimir, > > > > Do you guys know? > > > > Thanks, > > > > Jun > > > > > > > > On Mon, Jan 26, 2015 at 4:08 PM, Kyle Banker > wrote: > > > > > This is still preliminary, but it looks as if the change to metric > names > > > for per-topic metrics (bytes/messages in/out) is preventing these > metrics > > > from being reported to Yammer/Graphite. If this isn't intentional, it > > > should probably be addressed before release. > > > > > > On Wed, Jan 21, 2015 at 9:28 AM, Jun Rao wrote: > > > > > > > This is the second candidate for release of Apache Kafka 0.8.2.0. > There > > > has > > > > been some changes since the 0.8.2 beta release, especially in the new > > > java > > > > producer api and jmx mbean names. It would be great if people can > test > > > this > > > > out thoroughly. > > > > > > > > Release Notes for the 0.8.2.0 release > > > > > > > > > > > > > > https://people.apache.org/~junrao/kafka-0.8.2.0-candidate2/RELEASE_NOTES.html > > > > > > > > *** Please download, test and vote by Monday, Jan 26h, 7pm PT > > > > > > > > Kafka's KEYS file containing PGP keys we use to sign the release: > > > > http://kafka.apache.org/KEYS in addition to the md5, sha1 and sha2 > > > > (SHA256) > > > > checksum. > > > > > > > > * Release artifacts to be voted upon (source and binary): > > > > https://people.apache.org/~junrao/kafka-0.8.2.0-candidate2/ > > > > > > > > * Maven artifacts to be voted upon prior to release: > > > > https://repository.apache.org/content/groups/staging/ > > > > > > > > * scala-doc > > > > https://people.apache.org/~junrao/kafka-0.8.2.0-candidate2/scaladoc/ > > > > > > > > * java-doc > > > > https://people.apache.org/~junrao/kafka-0.8.2.0-candidate2/javadoc/ > > > > > > > > * The tag to be voted upon (off the 0.8.2 branch) is the 0.8.2.0 tag > > > > > > > > > > > > > > https://git-wip-us.apache.org/repos/asf?p=kafka.git;a=tag;h=058d58adef2ab2787e49d8efeefd61bb3d32f99c > > > > (commit 0b312a6b9f0833d38eec434bfff4c647c1814564) > > > > > > > > /*** > > > > > > > > Thanks, > > > > > > > > Jun > > > > > > > > > >
Replication stop working
Hi,

I am new to this forum and I am not sure this is the correct mailing list for sending questions. If not, please let me know and I will stop. I am looking for help resolving a replication issue. Replication stopped working a while back.

Kafka environment: Kafka 0.8.1.1, CentOS 6.5, 7-node cluster, default replication factor 2, 10 partitions per topic.

Initially each partition resided on two different nodes. It had been this way for several months and working. Starting two weeks ago, two things happened.
1. One node's disk usage reached 100% and crashed the Kafka process, so we had to delete some *.log and *.index files and restart the Kafka process.
2. In another case, some other node's disk usage reached 90%. Someone deleted some *.log and *.index files without shutting down the Kafka process. This caused issues and Kafka was unable to restart. I had to delete all *.log and *.index files on this node to bring Kafka back online.

Now replication is all broken. Most of the partitions have only one leader and one replica in the ISR, even though replication is set up with two broker ids. Whenever I shut down the Kafka process on a node, whatever leaders are running on this node get moved to another node defined in the replica assignment. After I restart Kafka on this node, it never becomes a follower and its data directory never gets updated.

I tried the following:
1. I turned on TRACE/DEBUG level logging in Kafka and ZooKeeper. I did not find anything that could help.
2. I also tried to manipulate the replication configuration in ZooKeeper using zkCli.sh, like adding a follower to the ISR list. That did not initiate a fetcher process to make it become a follower.
3. I also created a new topic, with replication working initially. But as soon as I shut down Kafka on one of its two nodes, that partition lost one replica from the ISR and it never came back. This confirms that the problem is reproducible.
4. I ran the Kafka preferred replica election tool to force re-election of leaders. That did not do anything. It is as if nothing happened to the cluster.
5. I added num.replica.fetchers=10 to server.properties and restarted Kafka. That did not do anything.

Does anyone have any experience with this? Or any advice on where to look and what the next steps are for troubleshooting?

There are only two things left that I may have to do.
1. Shut down all Kafka and ZooKeeper processes and restart them. I really do not want to go this route unless I have to. I would like to identify the root cause and not randomly restart the whole cluster.
2. Move all topics to another Kafka cluster and rebuild this one. This will be very time consuming and require a lot of changes in the application.

Thanks.
John Dong
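Before restarting the whole cluster, it can help to confirm what leader and ISR the controller has actually recorded for one of the broken partitions, straight from ZooKeeper. A sketch assuming Kafka 0.8.1's znode layout and the bundled ZkClient; the connect string, topic, and partition number are placeholders.

import org.I0Itec.zkclient.ZkClient;
import org.I0Itec.zkclient.exception.ZkMarshallingError;
import org.I0Itec.zkclient.serialize.ZkSerializer;

public class PartitionStateCheck {
    // Kafka stores znode payloads as UTF-8 JSON, so a plain string serializer is enough.
    static class StringSerializer implements ZkSerializer {
        public byte[] serialize(Object data) throws ZkMarshallingError {
            return data.toString().getBytes();
        }
        public Object deserialize(byte[] bytes) throws ZkMarshallingError {
            return bytes == null ? null : new String(bytes);
        }
    }

    public static void main(String[] args) {
        ZkClient zk = new ZkClient("zk1:2181", 10000, 10000, new StringSerializer()); // placeholder
        String topic = "my-topic"; // placeholder
        int partition = 0;         // placeholder
        // JSON containing "leader" and "isr" as the controller last wrote them.
        String state = zk.readData("/brokers/topics/" + topic + "/partitions/" + partition + "/state");
        System.out.println(state);
        zk.close();
    }
}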
Re: Missing Per-Topic BrokerTopicMetrics in v0.8.2.0
Jason, Kyle, Added comments to the jira. Let me know if they make sense. The dot convention is a bit tricky to follow since we allow dots in topic and clientId, etc. Also, we probably don't want to use a convention too specific for Graphite since other systems may have other conventions. Thanks, Jun On Tue, Jan 27, 2015 at 9:32 AM, Jason Rosenberg wrote: > I added a comment to the ticket. I think it will work getting data > disambiguated (didn't actually test end to end to graphite). > However, the naming scheme is not ideal for how metric ui's typically would > present the metric tree (e.g. jmx tag syntax doesn't really translate). > > Jason > > On Tue, Jan 27, 2015 at 11:19 AM, Jun Rao wrote: > > > Jason, Kyle, > > > > I created an 0.8.2 blocker > > https://issues.apache.org/jira/browse/KAFKA-1902 > > and attached a patch there. Could you test it out and see if it fixes the > > issue with the reporter? The patch adds tags as scope in MetricName. > > > > Thanks, > > > > Jun > > > > On Tue, Jan 27, 2015 at 7:39 AM, Jun Rao wrote: > > > > > Jason, > > > > > > So, this sounds like a real issue. Perhaps we can fix it just by > setting > > > the tag name as the scope. For example, for mbean kafka.server:type= > > > BrokerTopicMetrics,name=BytesInPerSec,topic=test, we can have > > > > > > group: "kafka.server" > > > type: "BrokerTopicMetrics" > > > name: "BytesInPerSec" > > > scope: "topic=test" > > > > > > Do you know if scope can have characters like "=" and "," (e.g., for > > scope > > > like "topic=test,partition=1")? > > > > > > The issue with using mytopic-BytesInPerSec as the name is what we are > > > trying to fix in kafka-1481. Topic name (and clientId, etc) can have > dash > > > in it and it's hard to parse. > > > > > > Thanks, > > > > > > Jun > > > > > > > > > > > > On Tue, Jan 27, 2015 at 6:30 AM, Jason Rosenberg > > wrote: > > > > > >> Remember multiple people have reported this issue. Per topic metrics > no > > >> longer appear in graphite (or in any system modeled after the yammer > > >> GraphiteReporter). They are not being seen as unique. > > >> > > >> While these metrics are registered in the registry as separate > > >> ‘MetricName’ > > >> instances (varying only by mbeanName), the GraphiteReporter sends the > > >> metrics to graphite using only the 4 fields I describe above. > > >> Consequently, > > >> multiple metrics in the registry get sent to graphite under the same > > name. > > >> Thus these metrics all end up in the same bucket in graphite, > trampling > > >> over each other making them useless. They aren’t ‘double counted’ so > > much > > >> as flapping between multiple independent values. > > >> > > >> We actually have our own Reporter class (based off the yammer > > >> GraphiteReporter). Our version sends metrics through kafka which is > then > > >> consumed downstream by multiple metric consumers. > > >> > > >> The ConsoleReporter isn’t useful for actually persisting metrics > > anywhere. > > >> It’s just iterating through all the (identically named metrics in the > > >> registry (save for the different mbeanNames))…. > > >> > > >> The mbeanName, as constructed, is not useful as a human readable > metric > > >> name, to be presented in a browsable tree of metrics, etc. The > > >> ‘group’:’type’:’name’:’scope’ are the pieces that matter. > > >> > > >> The fix here is to produce MetricName instances similar to 0.8.1.1, > etc. 
> > >> In > > >> this case, it should probably be something like: > > >> > > >> group: kafka.server > > >> type: BrokerTopicMetrics > > >> name: mytopic-BytesInPerSec > > >> group: > > >> > > >> Jason > > >> > > >> > > >> On Tue, Jan 27, 2015 at 7:26 AM, Manikumar Reddy < > ku...@nmsworks.co.in> > > >> wrote: > > >> > > >> > I have enabled yammer's ConsoleReporter and I am getting all the > > metrics > > >> > (including per-topic metrics). > > >> > > > >> > Yammer's MetricName object implements equals/hashcode methods using > > >> > mBeanName . We are constructing a unique mBeanName for each metric, > So > > >> we > > >> > are not missing/overwriting any metrics. > > >> > > > >> > Current confusion is due to MetricName.name(). This will be same > > >> > (BytesInPerSec) for both broker level and topic level metrics. We > need > > >> to > > >> > use MetricName.getMBeanName() to differentiate between broker level > > and > > >> > topic level metrics. > > >> > > > >> > 0.8.1 MBeanName: > > >> > > "kafka.server":type="BrokerTopicMetrics",name="AllTopicsBytesInPerSec" > > >> > > "kafka.server":type="BrokerTopicMetrics",name="MYTOPIC-BytesInPerSec" > > >> > > > >> > 0.8.2 MBeanName: > > >> > kafka.server:type=BrokerTopicMetrics,name=BytesInPerSec > > >> > > kafka.server:type=BrokerTopicMetrics,name=BytesInPerSec,topic=MYTOPIC > > >> > > > >> > > > >> > ConsoleReporter's O/P: > > >> > > > >> > BytesInPerSec: <- This is broker level > > >> > count = 1521 > > >> > mean rate = 3.63 bytes/s > > >> > 1-minute rate = 0.35 bytes/s
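The collision being described is easier to see when the fields a Graphite-style reporter actually uses are written out. The sketch below uses the Yammer metrics-core MetricName API; the path-building method is illustrative of what GraphiteReporter-like code does rather than a copy of it, and the mBeanName strings are the 0.8.2 forms quoted earlier in the thread.

import com.yammer.metrics.core.MetricName;

public class MetricPathSketch {
    // Roughly what a Graphite-style reporter does: only group/type/scope/name
    // contribute to the path, so two MetricNames that differ only in mBeanName
    // end up in the same bucket.
    static String graphitePath(MetricName n) {
        StringBuilder sb = new StringBuilder();
        sb.append(n.getGroup()).append('.').append(n.getType()).append('.');
        if (n.hasScope()) {
            sb.append(n.getScope()).append('.');
        }
        sb.append(n.getName());
        return sb.toString();
    }

    public static void main(String[] args) {
        // The 0.8.2 per-topic metric has the same group/type/name as the broker-level
        // one; only the mBeanName carries the topic tag.
        MetricName allTopics = new MetricName("kafka.server", "BrokerTopicMetrics",
                "BytesInPerSec", null,
                "kafka.server:type=BrokerTopicMetrics,name=BytesInPerSec");
        MetricName perTopic = new MetricName("kafka.server", "BrokerTopicMetrics",
                "BytesInPerSec", null,
                "kafka.server:type=BrokerTopicMetrics,name=BytesInPerSec,topic=MYTOPIC");
        System.out.println(graphitePath(allTopics)); // kafka.server.BrokerTopicMetrics.BytesInPerSec
        System.out.println(graphitePath(perTopic));  // identical path, so the two metrics trample each other
    }
}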
Re: SimpleConsumer leaks sockets on an UnresolvedAddressException
Here is the relevant stack trace: java.nio.channels.UnresolvedAddressException: null at sun.nio.ch.Net.checkAddress(Net.java:127) ~[na:1.7.0_55] at sun.nio.ch.SocketChannelImpl.connect(SocketChannelImpl.java:644) ~[na:1.7.0_55] at kafka.network.BlockingChannel.connect(Unknown Source) ~[kafka_2.10-0.8.0.jar:0.8.0] at kafka.consumer.SimpleConsumer.connect(Unknown Source) ~[kafka_2.10-0.8.0.jar:0.8.0] at kafka.consumer.SimpleConsumer.getOrMakeConnection(Unknown Source) ~[kafka_2.10-0.8.0.jar:0.8.0] at kafka.consumer.SimpleConsumer.kafka$consumer$SimpleConsumer$$sendRequest(Unknown Source) ~[kafka_2.10-0.8.0.jar:0.8.0] at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(Unknown Source) ~[kafka_2.10-0.8.0.jar:0.8.0] at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(Unknown Source) ~[kafka_2.10-0.8.0.jar:0.8.0] at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(Unknown Source) ~[kafka_2.10-0.8.0.jar:0.8.0] at kafka.metrics.KafkaTimer.time(Unknown Source) ~[kafka_2.10-0.8.0.jar:0.8.0] at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply$mcV$sp(Unknown Source) ~[kafka_2.10-0.8.0.jar:0.8.0] at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(Unknown Source) ~[kafka_2.10-0.8.0.jar:0.8.0] at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(Unknown Source) ~[kafka_2.10-0.8.0.jar:0.8.0] at kafka.metrics.KafkaTimer.time(Unknown Source) ~[kafka_2.10-0.8.0.jar:0.8.0] at kafka.consumer.SimpleConsumer.fetch(Unknown Source) ~[kafka_2.10-0.8.0.jar:0.8.0] at kafka.javaapi.consumer.SimpleConsumer.fetch(Unknown Source) ~[kafka_2.10-0.8.0.jar:0.8.0] On Tue, Jan 27, 2015 at 10:19 AM, Rajiv Kurian wrote: > I am using 0.8.1. The source is here: > https://github.com/apache/kafka/blob/0.8.1/core/src/main/scala/kafka/consumer/SimpleConsumer.scala > > Here is the definition of disconnect(): > private def disconnect() = { > if(blockingChannel.isConnected) { > debug("Disconnecting from " + host + ":" + port) > blockingChannel.disconnect() > } > } > It checks if blockingChannel.isConnected before calling > blockingChannel.disconnect(). I think if there is an > UnresolvedAddressException, the isConnected is never set and the > blockingChannel.disconnect() is never called. But by this point we have > already created a socket and will leak it. > > The same problem might be present in the connect method of the > BlockingChannel at > https://github.com/apache/kafka/blob/0.8.1/core/src/main/scala/kafka/network/BlockingChannel.scala. > Though its own disconnect method seems to check for both the connected: > > def disconnect() = lock synchronized { > // My comment: connected will not be set if we get an > UnresolvedAddressException but channel should NOT be null, so we will > probably still do the right thing. > if(connected || channel != null) { > // closing the main socket channel *should* close the read channel > // but let's do it to be sure. > swallow(channel.close()) > swallow(channel.socket.close()) > if(readChannel != null) swallow(readChannel.close()) > channel = null; readChannel = null; writeChannel = null > connected = false > } > } > > > > On Tue, Jan 27, 2015 at 9:03 AM, Guozhang Wang wrote: > >> Rajiv, >> >> Which version of Kafka are you using? I just checked SimpleConsumer's >> code, >> and in its close() function, disconnect() is called, which will close the >> socket. >> >> Guozhang >> >> >> On Mon, Jan 26, 2015 at 2:36 PM, Rajiv Kurian >> wrote: >> >> > Meant to write a run loop. 
>> > >> > void run() { >> > while (running) { >> > if (simpleConsumer == null) { >> > simpleConsumer = new SimpleConsumer(host, port, >> > (int) kafkaSocketTimeout, kafkaRExeiveBufferSize, clientName); >> > } >> > try { >> > // Do stuff with simpleConsumer. >> > } catch (Exception e) { >> > logger.error(e); // Assume UnresolvedAddressException. >> > if (consumer != null) { >> > simpleConsumer.close(); >> > simpleConsumer = null; >> > } >> > } >> > } >> > } >> > >> > On Mon, Jan 26, 2015 at 2:27 PM, Rajiv Kurian >> > wrote: >> > >> > > Here is my typical flow: >> > > void run() { >> > > if (simpleConsumer == null) { >> > > simpleConsumer = new SimpleConsumer(host, port, (int) >> > kafkaSocketTimeout, >> > > kafkaRExeiveBufferSize, clientName); >> > > } >> > > try { >> > > // Do stuff with simpleConsumer. >> > >} catch (Exception e) { >> > > if (consumer != null) { >> > >simpleConsumer.close(); >> > >simpleConsumer = null; >> > > } >> > > } >> > > } >> > > >> > > If there is a problem with the host name, or some DNS issues, we get >> an >> > > UnresolvedAddressException as expected and attempt to close the >>
Re: SimpleConsumer leaks sockets on an UnresolvedAddressException
I am using 0.8.1. The source is here: https://github.com/apache/kafka/blob/0.8.1/core/src/main/scala/kafka/consumer/SimpleConsumer.scala Here is the definition of disconnect(): private def disconnect() = { if(blockingChannel.isConnected) { debug("Disconnecting from " + host + ":" + port) blockingChannel.disconnect() } } It checks if blockingChannel.isConnected before calling blockingChannel.disconnect(). I think if there is an UnresolvedAddressException, the isConnected is never set and the blockingChannel.disconnect() is never called. But by this point we have already created a socket and will leak it. The same problem might be present in the connect method of the BlockingChannel at https://github.com/apache/kafka/blob/0.8.1/core/src/main/scala/kafka/network/BlockingChannel.scala. Though its own disconnect method seems to check for both the connected: def disconnect() = lock synchronized { // My comment: connected will not be set if we get an UnresolvedAddressException but channel should NOT be null, so we will probably still do the right thing. if(connected || channel != null) { // closing the main socket channel *should* close the read channel // but let's do it to be sure. swallow(channel.close()) swallow(channel.socket.close()) if(readChannel != null) swallow(readChannel.close()) channel = null; readChannel = null; writeChannel = null connected = false } } On Tue, Jan 27, 2015 at 9:03 AM, Guozhang Wang wrote: > Rajiv, > > Which version of Kafka are you using? I just checked SimpleConsumer's code, > and in its close() function, disconnect() is called, which will close the > socket. > > Guozhang > > > On Mon, Jan 26, 2015 at 2:36 PM, Rajiv Kurian > wrote: > > > Meant to write a run loop. > > > > void run() { > > while (running) { > > if (simpleConsumer == null) { > > simpleConsumer = new SimpleConsumer(host, port, > > (int) kafkaSocketTimeout, kafkaRExeiveBufferSize, clientName); > > } > > try { > > // Do stuff with simpleConsumer. > > } catch (Exception e) { > > logger.error(e); // Assume UnresolvedAddressException. > > if (consumer != null) { > > simpleConsumer.close(); > > simpleConsumer = null; > > } > > } > > } > > } > > > > On Mon, Jan 26, 2015 at 2:27 PM, Rajiv Kurian > > wrote: > > > > > Here is my typical flow: > > > void run() { > > > if (simpleConsumer == null) { > > > simpleConsumer = new SimpleConsumer(host, port, (int) > > kafkaSocketTimeout, > > > kafkaRExeiveBufferSize, clientName); > > > } > > > try { > > > // Do stuff with simpleConsumer. > > >} catch (Exception e) { > > > if (consumer != null) { > > >simpleConsumer.close(); > > >simpleConsumer = null; > > > } > > > } > > > } > > > > > > If there is a problem with the host name, or some DNS issues, we get an > > > UnresolvedAddressException as expected and attempt to close the > > > simpleConsumer. However this does not really get rid of the underlying > > > socket. So we end up leaking a FD every time this happens. Though this > is > > > not a common case I think there needs to be a way on the SimpleConsumer > > to > > > get rid of all OS resources that it is holding onto. Right now if this > > > keeps happening the number of FDs being consumed by the process keeps > > > increasing till we hit the OS limits. As a user I cannot do anything > else > > > but call simpleConsumer.close(). We need to be able to close the > > underlying > > > socketChannel/socket when this kind of an error happens. 
> > > > > > To reproduce, one can just run this code but just put in any garbage > host > > > name, running lsof -p while running this will show that the open FDs > > > increases without limit. > > > > > > Thanks, > > > Rajiv > > > > > > > > > > > > -- > -- Guozhang >
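Until a build containing the KAFKA-1235 fix is available, one workaround on 0.8.1 for the garbage-hostname case is to resolve the address before constructing the SimpleConsumer, so DNS failures are caught before any socket is created. A sketch assuming the 0.8.1 kafka.javaapi.consumer.SimpleConsumer constructor; host, port, and clientName are whatever the surrounding run loop already uses.

import java.net.InetAddress;
import java.net.UnknownHostException;
import kafka.javaapi.consumer.SimpleConsumer;

public class ResolvedSimpleConsumerFactory {
    // Returns null if the host cannot be resolved, so the caller can back off
    // without ever opening (and leaking) a socket.
    public static SimpleConsumer createIfResolvable(String host, int port,
                                                    int soTimeoutMs, int bufferSize,
                                                    String clientName) {
        try {
            InetAddress.getByName(host); // fails fast on DNS problems
        } catch (UnknownHostException e) {
            return null;
        }
        return new SimpleConsumer(host, port, soTimeoutMs, bufferSize, clientName);
    }
}

This does not help if resolution succeeds and the connection fails later, but it covers the UnresolvedAddressException path described in this thread.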
Consumers closing sockets abruptly?
On my brokers I am seeing this error log message: Closing socket for /X because of error (X is the IP address of a consumer)

> 2015-01-27_17:32:58.29890 java.io.IOException: Connection reset by peer
> 2015-01-27_17:32:58.29890     at sun.nio.ch.FileChannelImpl.transferTo0(Native Method)
> 2015-01-27_17:32:58.29891     at sun.nio.ch.FileChannelImpl.transferToDirectly(FileChannelImpl.java:433)
> 2015-01-27_17:32:58.29892     at sun.nio.ch.FileChannelImpl.transferTo(FileChannelImpl.java:565)
> 2015-01-27_17:32:58.29892     at kafka.log.FileMessageSet.writeTo(FileMessageSet.scala:147)
> 2015-01-27_17:32:58.29893     at kafka.api.PartitionDataSend.writeTo(FetchResponse.scala:69)
> 2015-01-27_17:32:58.29893     at kafka.network.MultiSend.writeTo(Transmission.scala:102)
> 2015-01-27_17:32:58.29894     at kafka.api.TopicDataSend.writeTo(FetchResponse.scala:124)
> 2015-01-27_17:32:58.29895     at kafka.network.MultiSend.writeTo(Transmission.scala:102)
> 2015-01-27_17:32:58.29895     at kafka.api.FetchResponseSend.writeTo(FetchResponse.scala:219)
> 2015-01-27_17:32:58.29896     at kafka.network.Processor.write(SocketServer.scala:375)
> 2015-01-27_17:32:58.29896     at kafka.network.Processor.run(SocketServer.scala:247)
> 2015-01-27_17:32:58.29897     at java.lang.Thread.run(Thread.java:745)

This is because the Processor doesn't handle java.io.IOException and it falls through to the catch-all. My consumers actually seem really happy, so I don't think there is a real issue here, but I could use some help figuring out whether there is. We are using the Java consumer like so:

> final ConsumerConnector consumer =
>     kafka.consumer.Consumer.createJavaConsumerConnector(config);
> Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
> topicCountMap.put(topicName, new Integer(1));
> final Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap =
>     consumer.createMessageStreams(topicCountMap);
> final KafkaStream<byte[], byte[]> stream =
>     consumerMap.get(topicName).get(0);

and we just iterate over the stream.

Questions:
1. What class is the one that makes the network connection to the consumer?
2. Do legitimate cases exist where the consumer would close its socket connection? ZooKeeper issues? Consumer too far behind?
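On question 1, the fetch connections come from the high-level consumer's internal fetcher threads, which use SimpleConsumer underneath. On question 2, yes: the consumer closes its sockets during normal operation, for example on a rebalance or when shutdown() is called, and the broker logs the closed connection the way shown above. A sketch of a clean shutdown, assuming the same 0.8 high-level consumer API as the quoted snippet; the topic and connection settings are placeholders.

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;

public class GracefulConsumerShutdown {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("zookeeper.connect", "zk1:2181"); // placeholder
        props.put("group.id", "my-group");          // placeholder
        final ConsumerConnector consumer =
                kafka.consumer.Consumer.createJavaConsumerConnector(new ConsumerConfig(props));

        Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
        topicCountMap.put("my-topic", 1);
        Map<String, List<KafkaStream<byte[], byte[]>>> streams =
                consumer.createMessageStreams(topicCountMap);
        KafkaStream<byte[], byte[]> stream = streams.get("my-topic").get(0);

        // Releases the fetcher sockets and the ZooKeeper session on JVM exit; the
        // broker will still log the closed connection, but that is expected.
        Runtime.getRuntime().addShutdownHook(new Thread() {
            @Override
            public void run() {
                consumer.shutdown();
            }
        });

        for (ConsumerIterator<byte[], byte[]> it = stream.iterator(); it.hasNext(); ) {
            byte[] message = it.next().message();
            // process message
        }
    }
}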
Re: Can't create a topic; can't delete it either
Also, do you have delete.topic.enable=true on all brokers? The automatic topic creation can fail if the default number of replicas is greater than number of available brokers. Check the default.replication.factor parameter. Gwen On Tue, Jan 27, 2015 at 12:29 AM, Joel Koshy wrote: > Which version of the broker are you using? > > On Mon, Jan 26, 2015 at 10:27:14PM -0800, Sumit Rangwala wrote: >> While running kafka in production I found an issue where a topic wasn't >> getting created even with auto topic enabled. I then went ahead and created >> the topic manually (from the command line). I then delete the topic, again >> manually. Now my broker won't allow me to either create *the* topic or >> delete *the* topic. (other topic creation and deletion is working fine). >> >> The topic is in "marked for deletion" stage for more than 3 hours. >> >> $ bin/kafka-topics.sh --zookeeper zookeeper1:2181/replication/kafka --list >> --topic GRIFFIN-TldAdFormat.csv-1422321736886 >> GRIFFIN-TldAdFormat.csv-1422321736886 - marked for deletion >> >> If this is a known issue, is there a workaround? >> >> Sumit >
Re: Errors from ReassignPartitionsCommand
Allen, which version of Kafka are you using? And if you have multiple brokers, is there a controller migration happened before? Guozhang On Fri, Jan 23, 2015 at 3:56 PM, Allen Wang wrote: > Hello, > > We tried the ReassignPartitionsCommand to move partitions to new brokers. > The execution initially showed message "Successfully started reassignment > of partitions ...". But when I tried to verify using --verify option, it > reported some reassignments have failed: > > ERROR: Assigned replicas (0,5,2) don't match the list of replicas for > reassignment (0,5) for partition [vhs_playback_event,1] > ERROR: Assigned replicas (4,5,0,2) don't match the list of replicas for > reassignment (4,5) for partition [vhs_playback_event,11] > ERROR: Assigned replicas (3,5,0,2) don't match the list of replicas for > reassignment (3,5) for partition [vhs_playback_event,16] > > I noticed that the assigned replicas in the error messages include both old > assignment and new assignment. Is this a real error or just means > partitions are being copied and current state does not match the final > expected state? > > Since I was confused by the errors, I ran the same > ReassignPartitionsCommand with the same assignment again but got some > additional failure messages complaining that three partitions do not exist: > > [2015-01-23 18:15:41,333] ERROR Skipping reassignment of partition > [vhs_playback_event,16] since it doesn't exist > (kafka.admin.ReassignPartitionsCommand) > [2015-01-23 18:15:41,455] ERROR Skipping reassignment of partition > [vhs_playback_event,15] since it doesn't exist > (kafka.admin.ReassignPartitionsCommand) > [2015-01-23 18:15:41,499] ERROR Skipping reassignment of partition > [vhs_playback_event,17] since it doesn't exist > (kafka.admin.ReassignPartitionsCommand) > > These partitions later reappeared from the output of --verify. > > The other thing is that at one point the BytesOut from one broker exceeds > 100Mbytes, which is quite alarming. > > In the end, the reassignment was done according to the input file to > ReassignPartitionsCommand. But the UnderReplicatedPartitions for the > brokers keeps showing a positive number, even though the output of describe > topic command and ZooKeeper data show the ISRs are all in sync, and > Replica-MaxLag is 0. > > To sum up, the overall execution is successful but the error messages are > quite noisy and the metric is not consistent with what appears to be. > > Does anyone have the similar experience and is there anything we can do get > it done smoother? What can we do to reset the inconsistent > UnderReplicatedPartitions metric? > > Thanks, > Allen > -- -- Guozhang
Re: Missing Per-Topic BrokerTopicMetrics in v0.8.2.0
I added a comment to the ticket. I think it will work getting data disambiguated (didn't actually test end to end to graphite). However, the naming scheme is not ideal for how metric ui's typically would present the metric tree (e.g. jmx tag syntax doesn't really translate). Jason On Tue, Jan 27, 2015 at 11:19 AM, Jun Rao wrote: > Jason, Kyle, > > I created an 0.8.2 blocker > https://issues.apache.org/jira/browse/KAFKA-1902 > and attached a patch there. Could you test it out and see if it fixes the > issue with the reporter? The patch adds tags as scope in MetricName. > > Thanks, > > Jun > > On Tue, Jan 27, 2015 at 7:39 AM, Jun Rao wrote: > > > Jason, > > > > So, this sounds like a real issue. Perhaps we can fix it just by setting > > the tag name as the scope. For example, for mbean kafka.server:type= > > BrokerTopicMetrics,name=BytesInPerSec,topic=test, we can have > > > > group: "kafka.server" > > type: "BrokerTopicMetrics" > > name: "BytesInPerSec" > > scope: "topic=test" > > > > Do you know if scope can have characters like "=" and "," (e.g., for > scope > > like "topic=test,partition=1")? > > > > The issue with using mytopic-BytesInPerSec as the name is what we are > > trying to fix in kafka-1481. Topic name (and clientId, etc) can have dash > > in it and it's hard to parse. > > > > Thanks, > > > > Jun > > > > > > > > On Tue, Jan 27, 2015 at 6:30 AM, Jason Rosenberg > wrote: > > > >> Remember multiple people have reported this issue. Per topic metrics no > >> longer appear in graphite (or in any system modeled after the yammer > >> GraphiteReporter). They are not being seen as unique. > >> > >> While these metrics are registered in the registry as separate > >> ‘MetricName’ > >> instances (varying only by mbeanName), the GraphiteReporter sends the > >> metrics to graphite using only the 4 fields I describe above. > >> Consequently, > >> multiple metrics in the registry get sent to graphite under the same > name. > >> Thus these metrics all end up in the same bucket in graphite, trampling > >> over each other making them useless. They aren’t ‘double counted’ so > much > >> as flapping between multiple independent values. > >> > >> We actually have our own Reporter class (based off the yammer > >> GraphiteReporter). Our version sends metrics through kafka which is then > >> consumed downstream by multiple metric consumers. > >> > >> The ConsoleReporter isn’t useful for actually persisting metrics > anywhere. > >> It’s just iterating through all the (identically named metrics in the > >> registry (save for the different mbeanNames))…. > >> > >> The mbeanName, as constructed, is not useful as a human readable metric > >> name, to be presented in a browsable tree of metrics, etc. The > >> ‘group’:’type’:’name’:’scope’ are the pieces that matter. > >> > >> The fix here is to produce MetricName instances similar to 0.8.1.1, etc. > >> In > >> this case, it should probably be something like: > >> > >> group: kafka.server > >> type: BrokerTopicMetrics > >> name: mytopic-BytesInPerSec > >> group: > >> > >> Jason > >> > >> > >> On Tue, Jan 27, 2015 at 7:26 AM, Manikumar Reddy > >> wrote: > >> > >> > I have enabled yammer's ConsoleReporter and I am getting all the > metrics > >> > (including per-topic metrics). > >> > > >> > Yammer's MetricName object implements equals/hashcode methods using > >> > mBeanName . We are constructing a unique mBeanName for each metric, So > >> we > >> > are not missing/overwriting any metrics. > >> > > >> > Current confusion is due to MetricName.name(). 
This will be same > >> > (BytesInPerSec) for both broker level and topic level metrics. We need > >> to > >> > use MetricName.getMBeanName() to differentiate between broker level > and > >> > topic level metrics. > >> > > >> > 0.8.1 MBeanName: > >> > "kafka.server":type="BrokerTopicMetrics",name="AllTopicsBytesInPerSec" > >> > "kafka.server":type="BrokerTopicMetrics",name="MYTOPIC-BytesInPerSec" > >> > > >> > 0.8.2 MBeanName: > >> > kafka.server:type=BrokerTopicMetrics,name=BytesInPerSec > >> > kafka.server:type=BrokerTopicMetrics,name=BytesInPerSec,topic=MYTOPIC > >> > > >> > > >> > ConsoleReporter's O/P: > >> > > >> > BytesInPerSec: <- This is broker level > >> > count = 1521 > >> > mean rate = 3.63 bytes/s > >> > 1-minute rate = 0.35 bytes/s > >> > 5-minute rate = 2.07 bytes/s > >> > 15-minute rate = 1.25 bytes/s > >> > > >> > BytesInPerSec: <- This is for topic1 > >> > count = 626 > >> > mean rate = 1.89 bytes/s > >> > 1-minute rate = 0.42 bytes/s > >> > 5-minute rate = 31.53 bytes/s > >> > 15-minute rate = 64.66 bytes/s > >> > > >> > BytesInPerSec: <- This is for topic2 > >> > count = 895 > >> > mean rate = 3.62 bytes/s > >> > 1-minute rate = 1.39 bytes/s > >> > 5-minute rate = 30.08 bytes/s > >> > 15-minute rate = 50.27 bytes/s > >> > > >> > Manikumar > >> > > >> > On Tue, Jan 27, 2015 at 1:59 PM, Ja
Re: Missing Per-Topic BrokerTopicMetrics in v0.8.2.0
Thanks for the quick response, Jun (and many thanks to Jason for confirming and further investigating the issue). I've tested the patch, and it does fix the fundamental issue, but I do have a few comments that I'll leave on the ticket. On Tue, Jan 27, 2015 at 9:19 AM, Jun Rao wrote: > Jason, Kyle, > > I created an 0.8.2 blocker > https://issues.apache.org/jira/browse/KAFKA-1902 > and attached a patch there. Could you test it out and see if it fixes the > issue with the reporter? The patch adds tags as scope in MetricName. > > Thanks, > > Jun > > On Tue, Jan 27, 2015 at 7:39 AM, Jun Rao wrote: > > > Jason, > > > > So, this sounds like a real issue. Perhaps we can fix it just by setting > > the tag name as the scope. For example, for mbean kafka.server:type= > > BrokerTopicMetrics,name=BytesInPerSec,topic=test, we can have > > > > group: "kafka.server" > > type: "BrokerTopicMetrics" > > name: "BytesInPerSec" > > scope: "topic=test" > > > > Do you know if scope can have characters like "=" and "," (e.g., for > scope > > like "topic=test,partition=1")? > > > > The issue with using mytopic-BytesInPerSec as the name is what we are > > trying to fix in kafka-1481. Topic name (and clientId, etc) can have dash > > in it and it's hard to parse. > > > > Thanks, > > > > Jun > > > > > > > > On Tue, Jan 27, 2015 at 6:30 AM, Jason Rosenberg > wrote: > > > >> Remember multiple people have reported this issue. Per topic metrics no > >> longer appear in graphite (or in any system modeled after the yammer > >> GraphiteReporter). They are not being seen as unique. > >> > >> While these metrics are registered in the registry as separate > >> ‘MetricName’ > >> instances (varying only by mbeanName), the GraphiteReporter sends the > >> metrics to graphite using only the 4 fields I describe above. > >> Consequently, > >> multiple metrics in the registry get sent to graphite under the same > name. > >> Thus these metrics all end up in the same bucket in graphite, trampling > >> over each other making them useless. They aren’t ‘double counted’ so > much > >> as flapping between multiple independent values. > >> > >> We actually have our own Reporter class (based off the yammer > >> GraphiteReporter). Our version sends metrics through kafka which is then > >> consumed downstream by multiple metric consumers. > >> > >> The ConsoleReporter isn’t useful for actually persisting metrics > anywhere. > >> It’s just iterating through all the (identically named metrics in the > >> registry (save for the different mbeanNames))…. > >> > >> The mbeanName, as constructed, is not useful as a human readable metric > >> name, to be presented in a browsable tree of metrics, etc. The > >> ‘group’:’type’:’name’:’scope’ are the pieces that matter. > >> > >> The fix here is to produce MetricName instances similar to 0.8.1.1, etc. > >> In > >> this case, it should probably be something like: > >> > >> group: kafka.server > >> type: BrokerTopicMetrics > >> name: mytopic-BytesInPerSec > >> group: > >> > >> Jason > >> > >> > >> On Tue, Jan 27, 2015 at 7:26 AM, Manikumar Reddy > >> wrote: > >> > >> > I have enabled yammer's ConsoleReporter and I am getting all the > metrics > >> > (including per-topic metrics). > >> > > >> > Yammer's MetricName object implements equals/hashcode methods using > >> > mBeanName . We are constructing a unique mBeanName for each metric, So > >> we > >> > are not missing/overwriting any metrics. > >> > > >> > Current confusion is due to MetricName.name(). 
This will be same > >> > (BytesInPerSec) for both broker level and topic level metrics. We need > >> to > >> > use MetricName.getMBeanName() to differentiate between broker level > and > >> > topic level metrics. > >> > > >> > 0.8.1 MBeanName: > >> > "kafka.server":type="BrokerTopicMetrics",name="AllTopicsBytesInPerSec" > >> > "kafka.server":type="BrokerTopicMetrics",name="MYTOPIC-BytesInPerSec" > >> > > >> > 0.8.2 MBeanName: > >> > kafka.server:type=BrokerTopicMetrics,name=BytesInPerSec > >> > kafka.server:type=BrokerTopicMetrics,name=BytesInPerSec,topic=MYTOPIC > >> > > >> > > >> > ConsoleReporter's O/P: > >> > > >> > BytesInPerSec: <- This is broker level > >> > count = 1521 > >> > mean rate = 3.63 bytes/s > >> > 1-minute rate = 0.35 bytes/s > >> > 5-minute rate = 2.07 bytes/s > >> > 15-minute rate = 1.25 bytes/s > >> > > >> > BytesInPerSec: <- This is for topic1 > >> > count = 626 > >> > mean rate = 1.89 bytes/s > >> > 1-minute rate = 0.42 bytes/s > >> > 5-minute rate = 31.53 bytes/s > >> > 15-minute rate = 64.66 bytes/s > >> > > >> > BytesInPerSec: <- This is for topic2 > >> > count = 895 > >> > mean rate = 3.62 bytes/s > >> > 1-minute rate = 1.39 bytes/s > >> > 5-minute rate = 30.08 bytes/s > >> > 15-minute rate = 50.27 bytes/s > >> > > >> > Manikumar > >> > > >> > On Tue, Jan 27, 2015 at 1:59 PM, Jason Rosenberg > >> wrote: > >> > > >> > > Ok, >
Re: Kafka under replicated partitions..
Hi Nitin, a. The follower replica will be kicked out of the ISR (i.e. causing the partition to be under-replicated) when 1) it has fallen too far behind the leader replica in terms of number of messages (controlled by config replica.lag.max.messages), or 2) it has not fetched from the leader for a long time (controlled by config replica.lag.time.max.ms). Case 1) is usually triggered when the producer's throughput to the leader replica is too high for the follower to keep up, or when one batch of producer messages contains more messages than the lag threshold. Case 2) can be triggered when the follower is faulty (long GC, etc.). You can find the definitions of these configs here: http://kafka.apache.org/documentation.html#brokerconfigs b. For case 2) you usually need to restart the faulty follower; for case 1), once the producer throughput drops back to normal the follower should be able to catch up on its own. To prevent it from happening you can also tune these two configs a bit. c. You can use the topic command to list all under-replicated partitions in real time: http://kafka.apache.org/documentation.html#basic_ops_add_topic (use bin/kafka-topics.sh --describe --under-replicated-partitions) Guozhang On Mon, Jan 26, 2015 at 12:40 PM, nitin sharma wrote: > Hi All, > > I would like to know the factors that can cause followers to fall behind > and under-replicated replica getting created? > > In my production system , last Friday one of the partitions fell behind and > before i could check my operation team had restarted the follower broker... > Restart did fixed the issue but i would like to know > > a. the factors that can cause under replicated partitions > b. how to fix that issue? is restart an option? > c. apart from M-Bean any other way to get to know under-replicated > partition.? > > > Regards, > Nitin Kumar Sharma. > -- -- Guozhang
Re: Kafka sending messages with zero copy
Thanks Rajiv, looking forward to your prototype. Guozhang On Mon, Jan 26, 2015 at 2:30 PM, Rajiv Kurian wrote: > Hi Guozhang, > > I am a bit busy at work. When I get the change I'll definitely try to get a > proof of concept going. Not the kafka protocol, but just the buffering and > threading structures, maybe just write to another socket. I think it would > be useful just to get the queueing and buffer management going and prove > that it can be done in a zero copy way in a multi producer single consumer > environment. If that is working, then the single consumer can be the kafka > network sync thread. > > Thanks, > Rajiv > > On Fri, Jan 16, 2015 at 11:58 AM, Guozhang Wang > wrote: > > > Hi Rajiv, > > > > Thanks for this proposal, it would be great if you can upload some > > implementation patch for the CAS idea and show some memory usage / perf > > differences. > > > > Guozhang > > > > On Sun, Dec 14, 2014 at 9:27 PM, Rajiv Kurian > > wrote: > > > > > Resuscitating this thread. I've done some more experiments and > profiling. > > > My messages are very tiny (currently 25 bytes) per message and creating > > > multiple objects per message leads to a lot of churn. The memory churn > > > through creation of convenience objects is more than the memory being > > used > > > by my objects right now. I could probably batch my messages further, to > > > make this effect less pronounced. I did some rather unscientific > > > experiments with a flyweight approach on top of the ByteBuffer for a > > simple > > > messaging API (peer to peer NIO based so not a real comparison) and the > > > numbers were very satisfactory and there is no garbage created in > steady > > > state at all. Though I don't expect such good numbers from actually > going > > > through the broker + all the other extra stuff that a real producer > would > > > do, I think there is great potential here. > > > > > > The general mechanism for me is this: > > > i) A buffer (I used Unsafe but I imagine ByteBuffer having similar > > > performance) is created per partition. > > > ii) A CAS loop (in Java 7 and less) or even better > unsafe.getAndAddInt() > > in > > > Java 8 can be used to claim a chunk of bytes on the per topic buffer. > > This > > > code can be invoked from multiple threads in a wait free manner > > (wait-free > > > in Java 8, since getAndAddInt() is wait-free). Once a region in the > > buffer > > > is claimed, it can be operated on using the flyweight method that we > > talked > > > about. If the buffer doesn't have enough space then we can drop the > > message > > > or move onto a new buffer. Further this creates absolutely zero objects > > in > > > steady state (only a few objects created in the beginning). Even if the > > > flyweight method is not desired, the API can just take byte arrays or > > > objects that need to be serialized and copy them onto the per topic > > buffers > > > in a similar way. This API has been validated in Aeron too, so I am > > pretty > > > confident that it will work well. For the zero copy technique here is a > > > link to Aeron API with zero copy - > > > https://github.com/real-logic/Aeron/issues/18. The regular one copies > > byte > > > arrays but without any object creation. > > > iii) The producer send thread can then just go in FIFO order through > the > > > buffer sending messages that have been committed using NIO to rotate > > > between brokers. We might need a background thread to zero out used > > buffers > > > too. 
> > > > > > I've left out some details, but again none of this is very revolutionary - > > > it's mostly the same techniques used in Aeron. I really think that we can > > > keep the API garbage free and wait-free (even in the multi producer > > case) > > > without compromising how pretty it looks - the total zero copy API will be > > low > > > level, but it should only be used by advanced users. Moreover the usual > > > producer.send(msg, topic, partition) can use the efficient ByteBuffer > > > offset API internally without it itself creating any garbage. With the > > > technique I talked about there is no need for an intermediate queue of > > any > > > kind since the underlying ByteBuffer per partition acts as the queue. > > > > > > I can do more experiments with some real producer code instead of my toy > > > code to further validate the idea, but I am pretty sure that both > > > throughput and jitter characteristics will improve thanks to lower > > > contention (wait-free in java 8 with a single getAndAddInt() operation > > for > > > sync) and better cache locality (C like buffers and a few constant > > number > > > of objects per partition). If you guys are interested, I'd love to talk > > > more. Again just to reiterate, I don't think the API will suffer at > all - > > > most of this can be done under the covers. Additionally it will open up > > > things so that a low level zero copy API is possible. > > > > > > Thanks, > > > Rajiv > > > > > > > > > > > -- > > -- Guozhang >
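To make the claim-and-write idea above concrete, here is a minimal Java sketch of one way a per-partition buffer could be shared by multiple producer threads: each thread reserves a region with a single atomic add and then copies its payload into that region, allocating no per-message objects on the hot path. The class and method names are illustrative only and are not Kafka (or Aeron) APIs; a real implementation would also need buffer rotation and a way to mark regions as committed for the sender thread.

import java.nio.ByteBuffer;
import java.util.concurrent.atomic.AtomicInteger;

// Illustrative sketch only -- not a Kafka API. Producer threads claim a region
// of a shared per-partition buffer with one atomic add, then copy their payload
// into the claimed region.
public class PartitionBufferSketch {
    private final ByteBuffer buffer;
    private final AtomicInteger writeIndex = new AtomicInteger(0);

    public PartitionBufferSketch(int capacityBytes) {
        this.buffer = ByteBuffer.allocateDirect(capacityBytes);
    }

    // Returns the start offset of the claimed region, or -1 when the buffer is
    // full (a real implementation would drop the message or roll to a new buffer).
    public int claimAndWrite(byte[] payload) {
        int start = writeIndex.getAndAdd(payload.length); // wait-free claim
        if (start + payload.length > buffer.capacity()) {
            return -1;
        }
        ByteBuffer slice = buffer.duplicate(); // private position/limit, shared storage
        slice.position(start);
        slice.put(payload);
        return start;
    }

    public static void main(String[] args) throws InterruptedException {
        final PartitionBufferSketch sketch = new PartitionBufferSketch(1 << 20);
        Runnable producer = new Runnable() {
            public void run() {
                for (int i = 0; i < 10000; i++) {
                    sketch.claimAndWrite(new byte[25]); // 25-byte messages, as in the thread
                }
            }
        };
        Thread t1 = new Thread(producer);
        Thread t2 = new Thread(producer);
        t1.start(); t2.start();
        t1.join(); t2.join();
        System.out.println("bytes claimed: " + sketch.writeIndex.get());
    }
}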
Re: SimpleConsumer leaks sockets on an UnresolvedAddressException
Rajiv, Which version of Kafka are you using? I just checked SimpleConsumer's code, and in its close() function, disconnect() is called, which will close the socket. Guozhang On Mon, Jan 26, 2015 at 2:36 PM, Rajiv Kurian wrote: > Meant to write a run loop. > > void run() { > while (running) { > if (simpleConsumer == null) { > simpleConsumer = new SimpleConsumer(host, port, > (int) kafkaSocketTimeout, kafkaReceiveBufferSize, clientName); > } > try { > // Do stuff with simpleConsumer. > } catch (Exception e) { > logger.error(e); // Assume UnresolvedAddressException. > if (simpleConsumer != null) { > simpleConsumer.close(); > simpleConsumer = null; > } > } > } > } > > On Mon, Jan 26, 2015 at 2:27 PM, Rajiv Kurian > wrote: > > > Here is my typical flow: > > void run() { > > if (simpleConsumer == null) { > > simpleConsumer = new SimpleConsumer(host, port, (int) > kafkaSocketTimeout, > > kafkaReceiveBufferSize, clientName); > > } > > try { > > // Do stuff with simpleConsumer. > > } catch (Exception e) { > > if (simpleConsumer != null) { > > simpleConsumer.close(); > > simpleConsumer = null; > > } > > } > > } > > > > If there is a problem with the host name, or some DNS issues, we get an > > UnresolvedAddressException as expected and attempt to close the > > simpleConsumer. However this does not really get rid of the underlying > > socket. So we end up leaking an FD every time this happens. Though this is > > not a common case I think there needs to be a way on the SimpleConsumer > to > > get rid of all OS resources that it is holding onto. Right now if this > > keeps happening the number of FDs being consumed by the process keeps > > increasing till we hit the OS limits. As a user I cannot do anything else > > but call simpleConsumer.close(). We need to be able to close the > underlying > > socketChannel/socket when this kind of an error happens. > > > > To reproduce, one can just run this code with any garbage host > > name; running lsof -p while running this will show that the number of open FDs > > increases without limit. > > > > Thanks, > > Rajiv > > > > > -- -- Guozhang
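As a side note on reproducing this, the open descriptor count can also be watched from inside the JVM instead of with lsof. The sketch below uses the com.sun.management UnixOperatingSystemMXBean, which is available on HotSpot on Unix-like systems; it is a generic diagnostic, not part of Kafka, and the class name is just illustrative.

import java.lang.management.ManagementFactory;
import java.lang.management.OperatingSystemMXBean;
import com.sun.management.UnixOperatingSystemMXBean;

// Generic diagnostic: print the process's open file descriptor count so the
// leak described above can be observed without running lsof -p externally.
public class FdWatcher {
    public static long openFds() {
        OperatingSystemMXBean os = ManagementFactory.getOperatingSystemMXBean();
        if (os instanceof UnixOperatingSystemMXBean) {
            return ((UnixOperatingSystemMXBean) os).getOpenFileDescriptorCount();
        }
        return -1; // not available on this JVM/OS
    }

    public static void main(String[] args) throws InterruptedException {
        while (true) {
            System.out.println("open fds: " + openFds());
            Thread.sleep(5000);
        }
    }
}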
Re: Kafka consumer connection error
Mahesh, Could you reformat your attached logs? Guozhang On Mon, Jan 26, 2015 at 8:08 PM, Mahesh Kumar wrote:
> Hi all, I am currently working on logstash to mongodb. logstash's input (producer) is kafka and the output (consumer) is mongodb. It worked fine for the past two days. Today it is not working and I got the following error. I googled this error and still didn't find a correct solution. Kindly help me resolve it.
>
> [2015-01-24 16:16:13,196] INFO [ReplicaFetcherManager on broker 0] Removed fetcher for partitions [kafka3,0],[kafka3,1] (kafka.server.ReplicaFetcherManager)
> [2015-01-24 16:16:17,443] ERROR Closing socket for /10.10.10.25 because of error (kafka.network.Processor)
> java.io.IOException: Connection reset by peer
>     at sun.nio.ch.FileDispatcherImpl.read0(Native Method)
>     at sun.nio.ch.SocketDispatcher.read(SocketDispatcher.java:39)
>     at sun.nio.ch.IOUtil.readIntoNativeBuffer(IOUtil.java:223)
>     at sun.nio.ch.IOUtil.read(IOUtil.java:197)
>     at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:379)
>     at kafka.utils.Utils$.read(Utils.scala:375)
>     at kafka.network.BoundedByteBufferReceive.readFrom(BoundedByteBufferReceive.scala:54)
>     at kafka.network.Processor.read(SocketServer.scala:347)
>     at kafka.network.Processor.run(SocketServer.scala:245)
>     at java.lang.Thread.run(Thread.java:744)
> [2015-01-24 16:16:55,956] INFO Closing socket connection to /10.10.10.25. (kafka.network.Processor)
> [2015-01-24 16:19:49,285] ERROR Closing socket for /10.10.10.25 because of error (kafka.network.Processor)
> java.io.IOException: Connection reset by peer
>     at sun.nio.ch.FileDispatcherImpl.read0(Native Method)
>     at sun.nio.ch.SocketDispatcher.read(SocketDispatcher.java:39)
>     at sun.nio.ch.IOUtil.readIntoNativeBuffer(IOUtil.java:223)
>     at sun.nio.ch.IOUtil.read(IOUtil.java:197)
>     at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:379)
>     at kafka.utils.Utils$.read(Utils.scala:375)
>     at kafka.network.BoundedByteBufferReceive.readFrom(BoundedByteBufferReceive.scala:54)
>     at kafka.network.Processor.read(SocketServer.scala:347)
>     at kafka.network.Processor.run(SocketServer.scala:245)
>     at java.lang.Thread.run(Thread.java:744)
> [2015-01-24 16:20:18,100] INFO Closing socket connection to /10.10.10.25. (kafka.network.Processor)
> [2015-01-24 16:21:30,156] ERROR Closing socket for /10.10.10.25 because of error (kafka.network.Processor)
> java.io.IOException: Connection reset by peer
>     at sun.nio.ch.FileDispatcherImpl.read0(Native Method)
>     at sun.nio.ch.SocketDispatcher.read(SocketDispatcher.java:39)
>     at sun.nio.ch.IOUtil.readIntoNativeBuffer(IOUtil.java:223)
>     at sun.nio.ch.IOUtil.read(IOUtil.java:197)
>     at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:379)
>     at kafka.utils.Utils$.read(Utils.scala:375)
>     at kafka.network.BoundedByteBufferReceive.readFrom(BoundedByteBufferReceive.scala:54)
>     at kafka.network.Processor.read(SocketServer.scala:347)
>     at kafka.network.Processor.run(SocketServer.scala:245)
>     at java.lang.Thread.run(Thread.java:744)
> [2015-01-24 17:49:56,224] INFO Closing socket connection to /10.10.10.25. (kafka.network.Processor)
>
> Thanks,
> Mahesh.S
-- -- Guozhang
Re: CommitOffsets for Simple consumer is returning EOF Exception
Hi Ajeet, Which version of Kafka are you using? I remember the OffsetCommitRequest's requestInfo should be a map of topicPartition -> OffsetAndMetadata, not OffsetMetadataAndError. Guozhang On Tue, Jan 27, 2015 at 3:31 AM, ajeet singh wrote: > Hi , > > I am new to Kafka, I have a use case in which My Consumer can't use auto > commit offset feature, I have to go with option of manual Commit. With High > level Consumer I have have constrain that consumer can commit only current > offset, but in my case I will be committing some previous off-set value. > > So only possible solution seems like to use Simple Consumer. This is how I > am using Simple Consumer for Commit offset : > > TopicAndPartition topicAndPartition = new > TopicAndPartition(topic,partition); > Map requestInfo = new > HashMap(); > requestInfo.put(topicAndPartition, new > OffsetMetadataAndError(0L,"no_metadata", (short) 0)); > kafka.javaapi.OffsetCommitRequest request = new > kafka.javaapi.OffsetCommitRequest("test", > requestInfo1,kafka.api.OffsetRequest.CurrentVersion(), 0, clientName); > kafka.javaapi.OffsetCommitResponse response = > consumer.commitOffsets(request); > > > I am getting EOFException > > Oops:java.io.EOFException: Received -1 when reading from channel, socket > has likely been closed. > java.io.EOFException: Received -1 when reading from channel, socket has > likely been closed. > at kafka.utils.Utils$.read(Utils.scala:376) > at > > kafka.network.BoundedByteBufferReceive.readFrom(BoundedByteBufferReceive.scala:54) > at kafka.network.Receive$class.readCompletely(Transmission.scala:56) > at > > kafka.network.BoundedByteBufferReceive.readCompletely(BoundedByteBufferReceive.scala:29) > at kafka.network.BlockingChannel.receive(BlockingChannel.scala:100) > at kafka.consumer.SimpleConsumer.liftedTree1$1(SimpleConsumer.scala:81) > at > > kafka.consumer.SimpleConsumer.kafka$consumer$SimpleConsumer$$sendRequest(SimpleConsumer.scala:71) > at kafka.consumer.SimpleConsumer.fetchOffsets(SimpleConsumer.scala:138) > at > kafka.javaapi.consumer.SimpleConsumer.fetchOffsets(SimpleConsumer.scala:99) > at > > com.vizury.rtb.realtimelogging.OfflineLogConsumer.commitOffsetTest(OfflineLogConsumer.java:205) > at > > com.vizury.rtb.realtimelogging.OfflineLogConsumer.run(OfflineLogConsumer.java:147) > at > > com.vizury.rtb.realtimelogging.OfflineLogConsumer.main(OfflineLogConsumer.java:31) > > > Any help ?? same error I am getting with fetchOffsets() method, where as > getOffsetsBefore() is working fine. > -- -- Guozhang
Re: Missing Per-Topic BrokerTopicMetrics in v0.8.2.0
Jason, Kyle, I created an 0.8.2 blocker https://issues.apache.org/jira/browse/KAFKA-1902 and attached a patch there. Could you test it out and see if it fixes the issue with the reporter? The patch adds tags as scope in MetricName. Thanks, Jun On Tue, Jan 27, 2015 at 7:39 AM, Jun Rao wrote: > Jason, > > So, this sounds like a real issue. Perhaps we can fix it just by setting > the tag name as the scope. For example, for mbean kafka.server:type= > BrokerTopicMetrics,name=BytesInPerSec,topic=test, we can have > > group: "kafka.server" > type: "BrokerTopicMetrics" > name: "BytesInPerSec" > scope: "topic=test" > > Do you know if scope can have characters like "=" and "," (e.g., for scope > like "topic=test,partition=1")? > > The issue with using mytopic-BytesInPerSec as the name is what we are > trying to fix in kafka-1481. Topic name (and clientId, etc) can have dash > in it and it's hard to parse. > > Thanks, > > Jun > > > > On Tue, Jan 27, 2015 at 6:30 AM, Jason Rosenberg wrote: > >> Remember multiple people have reported this issue. Per topic metrics no >> longer appear in graphite (or in any system modeled after the yammer >> GraphiteReporter). They are not being seen as unique. >> >> While these metrics are registered in the registry as separate >> ‘MetricName’ >> instances (varying only by mbeanName), the GraphiteReporter sends the >> metrics to graphite using only the 4 fields I describe above. >> Consequently, >> multiple metrics in the registry get sent to graphite under the same name. >> Thus these metrics all end up in the same bucket in graphite, trampling >> over each other making them useless. They aren’t ‘double counted’ so much >> as flapping between multiple independent values. >> >> We actually have our own Reporter class (based off the yammer >> GraphiteReporter). Our version sends metrics through kafka which is then >> consumed downstream by multiple metric consumers. >> >> The ConsoleReporter isn’t useful for actually persisting metrics anywhere. >> It’s just iterating through all the (identically named metrics in the >> registry (save for the different mbeanNames))…. >> >> The mbeanName, as constructed, is not useful as a human readable metric >> name, to be presented in a browsable tree of metrics, etc. The >> ‘group’:’type’:’name’:’scope’ are the pieces that matter. >> >> The fix here is to produce MetricName instances similar to 0.8.1.1, etc. >> In >> this case, it should probably be something like: >> >> group: kafka.server >> type: BrokerTopicMetrics >> name: mytopic-BytesInPerSec >> group: >> >> Jason >> >> >> On Tue, Jan 27, 2015 at 7:26 AM, Manikumar Reddy >> wrote: >> >> > I have enabled yammer's ConsoleReporter and I am getting all the metrics >> > (including per-topic metrics). >> > >> > Yammer's MetricName object implements equals/hashcode methods using >> > mBeanName . We are constructing a unique mBeanName for each metric, So >> we >> > are not missing/overwriting any metrics. >> > >> > Current confusion is due to MetricName.name(). This will be same >> > (BytesInPerSec) for both broker level and topic level metrics. We need >> to >> > use MetricName.getMBeanName() to differentiate between broker level and >> > topic level metrics. 
>> > >> > 0.8.1 MBeanName: >> > "kafka.server":type="BrokerTopicMetrics",name="AllTopicsBytesInPerSec" >> > "kafka.server":type="BrokerTopicMetrics",name="MYTOPIC-BytesInPerSec" >> > >> > 0.8.2 MBeanName: >> > kafka.server:type=BrokerTopicMetrics,name=BytesInPerSec >> > kafka.server:type=BrokerTopicMetrics,name=BytesInPerSec,topic=MYTOPIC >> > >> > >> > ConsoleReporter's O/P: >> > >> > BytesInPerSec: <- This is broker level >> > count = 1521 >> > mean rate = 3.63 bytes/s >> > 1-minute rate = 0.35 bytes/s >> > 5-minute rate = 2.07 bytes/s >> > 15-minute rate = 1.25 bytes/s >> > >> > BytesInPerSec: <- This is for topic1 >> > count = 626 >> > mean rate = 1.89 bytes/s >> > 1-minute rate = 0.42 bytes/s >> > 5-minute rate = 31.53 bytes/s >> > 15-minute rate = 64.66 bytes/s >> > >> > BytesInPerSec: <- This is for topic2 >> > count = 895 >> > mean rate = 3.62 bytes/s >> > 1-minute rate = 1.39 bytes/s >> > 5-minute rate = 30.08 bytes/s >> > 15-minute rate = 50.27 bytes/s >> > >> > Manikumar >> > >> > On Tue, Jan 27, 2015 at 1:59 PM, Jason Rosenberg >> wrote: >> > >> > > Ok, >> > > >> > > It looks like the yammer MetricName is not being created correctly for >> > the >> > > sub metrics that include a topic. E.g. a metric with an mbeanName >> like: >> > > >> > > >> > >> kafka.server:type=BrokerTopicMetrics,name=BytesRejectedPerSec,topic=mytopic >> > > >> > > appears to be malformed. A yammer MetricName has 4 fields that are >> used >> > in >> > > creating a graphite metric, that are included in the constructor: >> > > group, type, name, scope. >> > > >> > > In this case, the metric with the above mbeanName has these fields >> s
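For readers following along, the sketch below illustrates, outside of Kafka itself, what "adds tags as scope in MetricName" means in yammer terms: the per-topic tag is carried in the MetricName scope field, so reporters that key on group/type/name/scope see distinct metrics again, while the explicit mBeanName keeps the JMX object name intact. The values are taken from the example in this thread; this is not the actual KAFKA-1902 patch, just an assumption-labeled illustration of the approach.

import java.util.concurrent.TimeUnit;
import com.yammer.metrics.Metrics;
import com.yammer.metrics.core.Meter;
import com.yammer.metrics.core.MetricName;

// Standalone illustration of carrying the per-topic tag in the scope field.
public class ScopedMetricNameExample {
    public static void main(String[] args) {
        String mBeanName = "kafka.server:type=BrokerTopicMetrics,name=BytesInPerSec,topic=test";
        MetricName perTopic = new MetricName(
                "kafka.server",       // group
                "BrokerTopicMetrics", // type
                "BytesInPerSec",      // name
                "topic=test",         // scope carrying the tag
                mBeanName);           // explicit JMX object name
        Meter meter = Metrics.newMeter(perTopic, "bytes", TimeUnit.SECONDS);
        meter.mark(1024);
        System.out.println(perTopic.getScope() + " -> " + meter.count());
    }
}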
Re: Missing Per-Topic BrokerTopicMetrics in v0.8.2.0
Jason, So, this sounds like a real issue. Perhaps we can fix it just by setting the tag name as the scope. For example, for mbean kafka.server:type= BrokerTopicMetrics,name=BytesInPerSec,topic=test, we can have group: "kafka.server" type: "BrokerTopicMetrics" name: "BytesInPerSec" scope: "topic=test" Do you know if scope can have characters like "=" and "," (e.g., for scope like "topic=test,partition=1")? The issue with using mytopic-BytesInPerSec as the name is what we are trying to fix in kafka-1481. Topic name (and clientId, etc) can have dash in it and it's hard to parse. Thanks, Jun On Tue, Jan 27, 2015 at 6:30 AM, Jason Rosenberg wrote: > Remember multiple people have reported this issue. Per topic metrics no > longer appear in graphite (or in any system modeled after the yammer > GraphiteReporter). They are not being seen as unique. > > While these metrics are registered in the registry as separate ‘MetricName’ > instances (varying only by mbeanName), the GraphiteReporter sends the > metrics to graphite using only the 4 fields I describe above. Consequently, > multiple metrics in the registry get sent to graphite under the same name. > Thus these metrics all end up in the same bucket in graphite, trampling > over each other making them useless. They aren’t ‘double counted’ so much > as flapping between multiple independent values. > > We actually have our own Reporter class (based off the yammer > GraphiteReporter). Our version sends metrics through kafka which is then > consumed downstream by multiple metric consumers. > > The ConsoleReporter isn’t useful for actually persisting metrics anywhere. > It’s just iterating through all the (identically named metrics in the > registry (save for the different mbeanNames))…. > > The mbeanName, as constructed, is not useful as a human readable metric > name, to be presented in a browsable tree of metrics, etc. The > ‘group’:’type’:’name’:’scope’ are the pieces that matter. > > The fix here is to produce MetricName instances similar to 0.8.1.1, etc. In > this case, it should probably be something like: > > group: kafka.server > type: BrokerTopicMetrics > name: mytopic-BytesInPerSec > group: > > Jason > > > On Tue, Jan 27, 2015 at 7:26 AM, Manikumar Reddy > wrote: > > > I have enabled yammer's ConsoleReporter and I am getting all the metrics > > (including per-topic metrics). > > > > Yammer's MetricName object implements equals/hashcode methods using > > mBeanName . We are constructing a unique mBeanName for each metric, So we > > are not missing/overwriting any metrics. > > > > Current confusion is due to MetricName.name(). This will be same > > (BytesInPerSec) for both broker level and topic level metrics. We need to > > use MetricName.getMBeanName() to differentiate between broker level and > > topic level metrics. 
> > > > 0.8.1 MBeanName: > > "kafka.server":type="BrokerTopicMetrics",name="AllTopicsBytesInPerSec" > > "kafka.server":type="BrokerTopicMetrics",name="MYTOPIC-BytesInPerSec" > > > > 0.8.2 MBeanName: > > kafka.server:type=BrokerTopicMetrics,name=BytesInPerSec > > kafka.server:type=BrokerTopicMetrics,name=BytesInPerSec,topic=MYTOPIC > > > > > > ConsoleReporter's O/P: > > > > BytesInPerSec: <- This is broker level > > count = 1521 > > mean rate = 3.63 bytes/s > > 1-minute rate = 0.35 bytes/s > > 5-minute rate = 2.07 bytes/s > > 15-minute rate = 1.25 bytes/s > > > > BytesInPerSec: <- This is for topic1 > > count = 626 > > mean rate = 1.89 bytes/s > > 1-minute rate = 0.42 bytes/s > > 5-minute rate = 31.53 bytes/s > > 15-minute rate = 64.66 bytes/s > > > > BytesInPerSec: <- This is for topic2 > > count = 895 > > mean rate = 3.62 bytes/s > > 1-minute rate = 1.39 bytes/s > > 5-minute rate = 30.08 bytes/s > > 15-minute rate = 50.27 bytes/s > > > > Manikumar > > > > On Tue, Jan 27, 2015 at 1:59 PM, Jason Rosenberg > wrote: > > > > > Ok, > > > > > > It looks like the yammer MetricName is not being created correctly for > > the > > > sub metrics that include a topic. E.g. a metric with an mbeanName like: > > > > > > > > > kafka.server:type=BrokerTopicMetrics,name=BytesRejectedPerSec,topic=mytopic > > > > > > appears to be malformed. A yammer MetricName has 4 fields that are used > > in > > > creating a graphite metric, that are included in the constructor: > > > group, type, name, scope. > > > > > > In this case, the metric with the above mbeanName has these fields set > in > > > the MetricName: > > > > > > group: "kafka.server" > > > type: "BrokerTopicMetrics" > > > name: "BytesInPerSec" > > > scope: null > > > > > > Thus, the topic metrics all look the same, and get lumped into the > > > top-level BrokerTopicMetrics (and thus that will now be double > counted). > > It > > > looks like the fix for kafka-1481 was where things got broken. It seems > > to > > > have introduced ‘tags’ in the building of metric names, and then t
CommitOffsets for Simple consumer is returning EOF Exception
Hi, I am new to Kafka. I have a use case in which my consumer can't use the auto offset commit feature; I have to go with manual commits. With the high-level consumer I have the constraint that the consumer can only commit its current offset, but in my case I will be committing some previous offset value. So the only possible solution seems to be the SimpleConsumer. This is how I am using the SimpleConsumer to commit an offset:

TopicAndPartition topicAndPartition = new TopicAndPartition(topic, partition);
Map<TopicAndPartition, OffsetMetadataAndError> requestInfo = new HashMap<TopicAndPartition, OffsetMetadataAndError>();
requestInfo.put(topicAndPartition, new OffsetMetadataAndError(0L, "no_metadata", (short) 0));
kafka.javaapi.OffsetCommitRequest request = new kafka.javaapi.OffsetCommitRequest("test", requestInfo, kafka.api.OffsetRequest.CurrentVersion(), 0, clientName);
kafka.javaapi.OffsetCommitResponse response = consumer.commitOffsets(request);

I am getting an EOFException:

Oops:java.io.EOFException: Received -1 when reading from channel, socket has likely been closed.
java.io.EOFException: Received -1 when reading from channel, socket has likely been closed.
    at kafka.utils.Utils$.read(Utils.scala:376)
    at kafka.network.BoundedByteBufferReceive.readFrom(BoundedByteBufferReceive.scala:54)
    at kafka.network.Receive$class.readCompletely(Transmission.scala:56)
    at kafka.network.BoundedByteBufferReceive.readCompletely(BoundedByteBufferReceive.scala:29)
    at kafka.network.BlockingChannel.receive(BlockingChannel.scala:100)
    at kafka.consumer.SimpleConsumer.liftedTree1$1(SimpleConsumer.scala:81)
    at kafka.consumer.SimpleConsumer.kafka$consumer$SimpleConsumer$$sendRequest(SimpleConsumer.scala:71)
    at kafka.consumer.SimpleConsumer.fetchOffsets(SimpleConsumer.scala:138)
    at kafka.javaapi.consumer.SimpleConsumer.fetchOffsets(SimpleConsumer.scala:99)
    at com.vizury.rtb.realtimelogging.OfflineLogConsumer.commitOffsetTest(OfflineLogConsumer.java:205)
    at com.vizury.rtb.realtimelogging.OfflineLogConsumer.run(OfflineLogConsumer.java:147)
    at com.vizury.rtb.realtimelogging.OfflineLogConsumer.main(OfflineLogConsumer.java:31)

Any help? I am getting the same error with the fetchOffsets() method, whereas getOffsetsBefore() is working fine.
Re: Missing Per-Topic BrokerTopicMetrics in v0.8.2.0
Remember multiple people have reported this issue. Per topic metrics no longer appear in graphite (or in any system modeled after the yammer GraphiteReporter). They are not being seen as unique. While these metrics are registered in the registry as separate ‘MetricName’ instances (varying only by mbeanName), the GraphiteReporter sends the metrics to graphite using only the 4 fields I describe above. Consequently, multiple metrics in the registry get sent to graphite under the same name. Thus these metrics all end up in the same bucket in graphite, trampling over each other making them useless. They aren’t ‘double counted’ so much as flapping between multiple independent values. We actually have our own Reporter class (based off the yammer GraphiteReporter). Our version sends metrics through kafka which is then consumed downstream by multiple metric consumers. The ConsoleReporter isn’t useful for actually persisting metrics anywhere. It’s just iterating through all the (identically named metrics in the registry (save for the different mbeanNames))…. The mbeanName, as constructed, is not useful as a human readable metric name, to be presented in a browsable tree of metrics, etc. The ‘group’:’type’:’name’:’scope’ are the pieces that matter. The fix here is to produce MetricName instances similar to 0.8.1.1, etc. In this case, it should probably be something like: group: kafka.server type: BrokerTopicMetrics name: mytopic-BytesInPerSec scope: Jason On Tue, Jan 27, 2015 at 7:26 AM, Manikumar Reddy wrote: > I have enabled yammer's ConsoleReporter and I am getting all the metrics > (including per-topic metrics). > > Yammer's MetricName object implements equals/hashcode methods using > mBeanName . We are constructing a unique mBeanName for each metric, So we > are not missing/overwriting any metrics. > > Current confusion is due to MetricName.name(). This will be same > (BytesInPerSec) for both broker level and topic level metrics. We need to > use MetricName.getMBeanName() to differentiate between broker level and > topic level metrics. > > 0.8.1 MBeanName: > "kafka.server":type="BrokerTopicMetrics",name="AllTopicsBytesInPerSec" > "kafka.server":type="BrokerTopicMetrics",name="MYTOPIC-BytesInPerSec" > > 0.8.2 MBeanName: > kafka.server:type=BrokerTopicMetrics,name=BytesInPerSec > kafka.server:type=BrokerTopicMetrics,name=BytesInPerSec,topic=MYTOPIC > > > ConsoleReporter's O/P: > > BytesInPerSec: <- This is broker level > count = 1521 > mean rate = 3.63 bytes/s > 1-minute rate = 0.35 bytes/s > 5-minute rate = 2.07 bytes/s > 15-minute rate = 1.25 bytes/s > > BytesInPerSec: <- This is for topic1 > count = 626 > mean rate = 1.89 bytes/s > 1-minute rate = 0.42 bytes/s > 5-minute rate = 31.53 bytes/s > 15-minute rate = 64.66 bytes/s > > BytesInPerSec: <- This is for topic2 > count = 895 > mean rate = 3.62 bytes/s > 1-minute rate = 1.39 bytes/s > 5-minute rate = 30.08 bytes/s > 15-minute rate = 50.27 bytes/s > > Manikumar > > On Tue, Jan 27, 2015 at 1:59 PM, Jason Rosenberg wrote: > > > Ok, > > > > It looks like the yammer MetricName is not being created correctly for > the > > sub metrics that include a topic. E.g. a metric with an mbeanName like: > > > > > kafka.server:type=BrokerTopicMetrics,name=BytesRejectedPerSec,topic=mytopic > > > > appears to be malformed. A yammer MetricName has 4 fields that are used > in > > creating a graphite metric, that are included in the constructor: > > group, type, name, scope. 
> > > > In this case, the metric with the above mbeanName has these fields set in > > the MetricName: > > > > group: "kafka.server" > > type: "BrokerTopicMetrics" > > name: "BytesInPerSec" > > scope: null > > > > Thus, the topic metrics all look the same, and get lumped into the > > top-level BrokerTopicMetrics (and thus that will now be double counted). > It > > looks like the fix for kafka-1481 was where things got broken. It seems > to > > have introduced ‘tags’ in the building of metric names, and then those > tags > > only get applied to the mbeanName, but get excluded from the metric name: > > > > > https://github.com/apache/kafka/commit/457744a820d806e546edebbd8ffd33f6772e519f > > > > This is a pretty severe issue, since the yammer metrics for these stats > > will be double counted in aggregate, and the per-topic stats will be > > removed. > > > > I should note too, in my previous email, I thought that only the > per-topic > > BrokerTopicMetrics were missing, but also several other per-topic metrics > > are missing too, e.g. under kafka.log, etc. > > > > Jason > > > > > > On Tue, Jan 27, 2015 at 2:20 AM, Jason Rosenberg > wrote: > > > > > I can confirm that the per topic metrics are not coming through to the > > > yammer metrics registry. I do see them in jmx (via jconsole), but the > > > MetricsRegistry does not have them. > > > All the other metrics are coming throug
Re: Missing Per-Topic BrokerTopicMetrics in v0.8.2.0
I have enabled yammer's ConsoleReporter and I am getting all the metrics (including per-topic metrics). Yammer's MetricName object implements equals/hashcode methods using mBeanName . We are constructing a unique mBeanName for each metric, So we are not missing/overwriting any metrics. Current confusion is due to MetricName.name(). This will be same (BytesInPerSec) for both broker level and topic level metrics. We need to use MetricName.getMBeanName() to differentiate between broker level and topic level metrics. 0.8.1 MBeanName: "kafka.server":type="BrokerTopicMetrics",name="AllTopicsBytesInPerSec" "kafka.server":type="BrokerTopicMetrics",name="MYTOPIC-BytesInPerSec" 0.8.2 MBeanName: kafka.server:type=BrokerTopicMetrics,name=BytesInPerSec kafka.server:type=BrokerTopicMetrics,name=BytesInPerSec,topic=MYTOPIC ConsoleReporter's O/P: BytesInPerSec: <- This is broker level count = 1521 mean rate = 3.63 bytes/s 1-minute rate = 0.35 bytes/s 5-minute rate = 2.07 bytes/s 15-minute rate = 1.25 bytes/s BytesInPerSec: <- This is for topic1 count = 626 mean rate = 1.89 bytes/s 1-minute rate = 0.42 bytes/s 5-minute rate = 31.53 bytes/s 15-minute rate = 64.66 bytes/s BytesInPerSec: <- This is for topic2 count = 895 mean rate = 3.62 bytes/s 1-minute rate = 1.39 bytes/s 5-minute rate = 30.08 bytes/s 15-minute rate = 50.27 bytes/s Manikumar On Tue, Jan 27, 2015 at 1:59 PM, Jason Rosenberg wrote: > Ok, > > It looks like the yammer MetricName is not being created correctly for the > sub metrics that include a topic. E.g. a metric with an mbeanName like: > > kafka.server:type=BrokerTopicMetrics,name=BytesRejectedPerSec,topic=mytopic > > appears to be malformed. A yammer MetricName has 4 fields that are used in > creating a graphite metric, that are included in the constructor: > group, type, name, scope. > > In this case, the metric with the above mbeanName has these fields set in > the MetricName: > > group: "kafka.server" > type: "BrokerTopicMetrics" > name: "BytesInPerSec" > scope: null > > Thus, the topic metrics all look the same, and get lumped into the > top-level BrokerTopicMetrics (and thus that will now be double counted). It > looks like the fix for kafka-1481 was where things got broken. It seems to > have introduced ‘tags’ in the building of metric names, and then those tags > only get applied to the mbeanName, but get excluded from the metric name: > > https://github.com/apache/kafka/commit/457744a820d806e546edebbd8ffd33f6772e519f > > This is a pretty severe issue, since the yammer metrics for these stats > will be double counted in aggregate, and the per-topic stats will be > removed. > > I should note too, in my previous email, I thought that only the per-topic > BrokerTopicMetrics were missing, but also several other per-topic metrics > are missing too, e.g. under kafka.log, etc. > > Jason > > > On Tue, Jan 27, 2015 at 2:20 AM, Jason Rosenberg wrote: > > > I can confirm that the per topic metrics are not coming through to the > > yammer metrics registry. I do see them in jmx (via jconsole), but the > > MetricsRegistry does not have them. > > All the other metrics are coming through that appear in jmx. > > > > This is with single node instance running locally. > > > > Jason > > > > > > > > On Mon, Jan 26, 2015 at 8:30 PM, Manikumar Reddy > > wrote: > > > >> If you are using multi-node cluster, then metrics may be reported from > >> other servers. > >> pl check all the servers in the cluster. 
> >> > >> On Tue, Jan 27, 2015 at 4:12 AM, Kyle Banker > >> wrote: > >> > >> > I've been using a custom KafkaMetricsReporter to report Kafka broker > >> > metrics to Graphite. In v0.8.1.1, Kafka was reporting bytes and > >> messages in > >> > and out for all topics together and for each individual topic. > >> > > >> > After upgrading to v0.8.2.0, these metrics are no longer being > reported. > >> > > >> > I'm only seeing the following: > >> > BrokerTopicMetrics > >> > - BytesInPerSec > >> > - BytesOutPerSec > >> > - BytesRejectedPerSec > >> > - MessagesInPerSec > >> > > >> > What's more, despite lots of successful writes to the cluster, the > >> values > >> > for these remaining metrics are all zero. > >> > > >> > I saw that there was some refactoring of metric naming code. Was the > >> > behavior supposed to have changed? > >> > > >> > Many thanks in advance. > >> > > >> > > > > >
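For anyone who wants to repeat Manikumar's check, a sketch along these lines can be dropped into a process that shares the broker's default yammer registry (for example, from a custom KafkaMetricsReporter's init): it enables ConsoleReporter and prints each metric's mBeanName, which is the field that stays unique in 0.8.2 even when name() is ambiguous. The class name is illustrative only, and run standalone it will of course print nothing because the registry is empty.

import java.util.Map;
import java.util.concurrent.TimeUnit;
import com.yammer.metrics.Metrics;
import com.yammer.metrics.core.Metric;
import com.yammer.metrics.core.MetricName;
import com.yammer.metrics.reporting.ConsoleReporter;

// Diagnostic sketch: dump the default registry to the console and show each
// metric's mBeanName so broker-level and per-topic meters can be told apart.
public class MetricNameDump {
    public static void dump() {
        ConsoleReporter.enable(Metrics.defaultRegistry(), 10, TimeUnit.SECONDS);
        for (Map.Entry<MetricName, Metric> entry :
                Metrics.defaultRegistry().allMetrics().entrySet()) {
            MetricName name = entry.getKey();
            System.out.println(name.getName() + "  ->  " + name.getMBeanName());
        }
    }
}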
Re: Missing Per-Topic BrokerTopicMetrics in v0.8.2.0
Is it actually getting double-counted? I tried reproducing this locally but the BrokerTopicMetrics.Count lines up with the sum of the PerTopic.Counts for various metrics. On Tue, Jan 27, 2015 at 03:29:37AM -0500, Jason Rosenberg wrote: > Ok, > > It looks like the yammer MetricName is not being created correctly for the > sub metrics that include a topic. E.g. a metric with an mbeanName like: > > kafka.server:type=BrokerTopicMetrics,name=BytesRejectedPerSec,topic=mytopic > > appears to be malformed. A yammer MetricName has 4 fields that are used in > creating a graphite metric, that are included in the constructor: > group, type, name, scope. > > In this case, the metric with the above mbeanName has these fields set in > the MetricName: > > group: "kafka.server" > type: "BrokerTopicMetrics" > name: "BytesInPerSec" > scope: null > > Thus, the topic metrics all look the same, and get lumped into the > top-level BrokerTopicMetrics (and thus that will now be double counted). It > looks like the fix for kafka-1481 was where things got broken. It seems to > have introduced ‘tags’ in the building of metric names, and then those tags > only get applied to the mbeanName, but get excluded from the metric name: > https://github.com/apache/kafka/commit/457744a820d806e546edebbd8ffd33f6772e519f > > This is a pretty severe issue, since the yammer metrics for these stats > will be double counted in aggregate, and the per-topic stats will be > removed. > > I should note too, in my previous email, I thought that only the per-topic > BrokerTopicMetrics were missing, but also several other per-topic metrics > are missing too, e.g. under kafka.log, etc. > > Jason > > > On Tue, Jan 27, 2015 at 2:20 AM, Jason Rosenberg wrote: > > > I can confirm that the per topic metrics are not coming through to the > > yammer metrics registry. I do see them in jmx (via jconsole), but the > > MetricsRegistry does not have them. > > All the other metrics are coming through that appear in jmx. > > > > This is with single node instance running locally. > > > > Jason > > > > > > > > On Mon, Jan 26, 2015 at 8:30 PM, Manikumar Reddy > > wrote: > > > >> If you are using multi-node cluster, then metrics may be reported from > >> other servers. > >> pl check all the servers in the cluster. > >> > >> On Tue, Jan 27, 2015 at 4:12 AM, Kyle Banker > >> wrote: > >> > >> > I've been using a custom KafkaMetricsReporter to report Kafka broker > >> > metrics to Graphite. In v0.8.1.1, Kafka was reporting bytes and > >> messages in > >> > and out for all topics together and for each individual topic. > >> > > >> > After upgrading to v0.8.2.0, these metrics are no longer being reported. > >> > > >> > I'm only seeing the following: > >> > BrokerTopicMetrics > >> > - BytesInPerSec > >> > - BytesOutPerSec > >> > - BytesRejectedPerSec > >> > - MessagesInPerSec > >> > > >> > What's more, despite lots of successful writes to the cluster, the > >> values > >> > for these remaining metrics are all zero. > >> > > >> > I saw that there was some refactoring of metric naming code. Was the > >> > behavior supposed to have changed? > >> > > >> > Many thanks in advance. > >> > > >> > > > >
Re: Missing Per-Topic BrokerTopicMetrics in v0.8.2.0
Ok, It looks like the yammer MetricName is not being created correctly for the sub metrics that include a topic. E.g. a metric with an mbeanName like: kafka.server:type=BrokerTopicMetrics,name=BytesRejectedPerSec,topic=mytopic appears to be malformed. A yammer MetricName has 4 fields that are used in creating a graphite metric, that are included in the constructor: group, type, name, scope. In this case, the metric with the above mbeanName has these fields set in the MetricName: group: "kafka.server" type: "BrokerTopicMetrics" name: "BytesInPerSec" scope: null Thus, the topic metrics all look the same, and get lumped into the top-level BrokerTopicMetrics (and thus that will now be double counted). It looks like the fix for kafka-1481 was where things got broken. It seems to have introduced ‘tags’ in the building of metric names, and then those tags only get applied to the mbeanName, but get excluded from the metric name: https://github.com/apache/kafka/commit/457744a820d806e546edebbd8ffd33f6772e519f This is a pretty severe issue, since the yammer metrics for these stats will be double counted in aggregate, and the per-topic stats will be removed. I should note too, in my previous email, I thought that only the per-topic BrokerTopicMetrics were missing, but also several other per-topic metrics are missing too, e.g. under kafka.log, etc. Jason On Tue, Jan 27, 2015 at 2:20 AM, Jason Rosenberg wrote: > I can confirm that the per topic metrics are not coming through to the > yammer metrics registry. I do see them in jmx (via jconsole), but the > MetricsRegistry does not have them. > All the other metrics are coming through that appear in jmx. > > This is with single node instance running locally. > > Jason > > > > On Mon, Jan 26, 2015 at 8:30 PM, Manikumar Reddy > wrote: > >> If you are using multi-node cluster, then metrics may be reported from >> other servers. >> pl check all the servers in the cluster. >> >> On Tue, Jan 27, 2015 at 4:12 AM, Kyle Banker >> wrote: >> >> > I've been using a custom KafkaMetricsReporter to report Kafka broker >> > metrics to Graphite. In v0.8.1.1, Kafka was reporting bytes and >> messages in >> > and out for all topics together and for each individual topic. >> > >> > After upgrading to v0.8.2.0, these metrics are no longer being reported. >> > >> > I'm only seeing the following: >> > BrokerTopicMetrics >> > - BytesInPerSec >> > - BytesOutPerSec >> > - BytesRejectedPerSec >> > - MessagesInPerSec >> > >> > What's more, despite lots of successful writes to the cluster, the >> values >> > for these remaining metrics are all zero. >> > >> > I saw that there was some refactoring of metric naming code. Was the >> > behavior supposed to have changed? >> > >> > Many thanks in advance. >> > >> > >
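To see the collision Jason describes in isolation, the short sketch below builds two 0.8.2-style MetricNames that differ only in mBeanName. They are distinct to the registry (equals() is based on mBeanName, as noted earlier in the thread), but a reporter that derives its output key from group/type/name/scope, which is how the thread describes GraphiteReporter behaving, produces the same key for both. The class name and the key format are illustrative assumptions, not Kafka or reporter internals.

import com.yammer.metrics.core.MetricName;

// Two MetricNames that differ only in mBeanName: unique in the registry, but
// identical from the point of view of a group/type/name/scope-based reporter.
public class MetricNameCollision {
    public static void main(String[] args) {
        MetricName broker = new MetricName("kafka.server", "BrokerTopicMetrics",
                "BytesInPerSec", null,
                "kafka.server:type=BrokerTopicMetrics,name=BytesInPerSec");
        MetricName perTopic = new MetricName("kafka.server", "BrokerTopicMetrics",
                "BytesInPerSec", null,
                "kafka.server:type=BrokerTopicMetrics,name=BytesInPerSec,topic=mytopic");

        String brokerKey = broker.getGroup() + "." + broker.getType() + "." + broker.getName();
        String topicKey = perTopic.getGroup() + "." + perTopic.getType() + "." + perTopic.getName();

        System.out.println("equal MetricNames? " + broker.equals(perTopic));   // false: mBeanName differs
        System.out.println("same reporter key? " + brokerKey.equals(topicKey)); // true: the collision
    }
}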
Re: Can't create a topic; can't delete it either
Which version of the broker are you using? On Mon, Jan 26, 2015 at 10:27:14PM -0800, Sumit Rangwala wrote: > While running kafka in production I found an issue where a topic wasn't > getting created even with auto topic enabled. I then went ahead and created > the topic manually (from the command line). I then delete the topic, again > manually. Now my broker won't allow me to either create *the* topic or > delete *the* topic. (other topic creation and deletion is working fine). > > The topic is in "marked for deletion" stage for more than 3 hours. > > $ bin/kafka-topics.sh --zookeeper zookeeper1:2181/replication/kafka --list > --topic GRIFFIN-TldAdFormat.csv-1422321736886 > GRIFFIN-TldAdFormat.csv-1422321736886 - marked for deletion > > If this is a known issue, is there a workaround? > > Sumit
Re: Missing Per-Topic BrokerTopicMetrics in v0.8.2.0
Hi Jason - can you describe how you verify that the metrics are not coming through to the metrics registry? Looking at the metrics code it seems that the mbeans are only registered by the yammer jmx reporter only after being added to the metrics registry. Thanks, Joel On Tue, Jan 27, 2015 at 02:20:38AM -0500, Jason Rosenberg wrote: > I can confirm that the per topic metrics are not coming through to the > yammer metrics registry. I do see them in jmx (via jconsole), but the > MetricsRegistry does not have them. > All the other metrics are coming through that appear in jmx. > > This is with single node instance running locally. > > Jason > > > > On Mon, Jan 26, 2015 at 8:30 PM, Manikumar Reddy > wrote: > > > If you are using multi-node cluster, then metrics may be reported from > > other servers. > > pl check all the servers in the cluster. > > > > On Tue, Jan 27, 2015 at 4:12 AM, Kyle Banker wrote: > > > > > I've been using a custom KafkaMetricsReporter to report Kafka broker > > > metrics to Graphite. In v0.8.1.1, Kafka was reporting bytes and messages > > in > > > and out for all topics together and for each individual topic. > > > > > > After upgrading to v0.8.2.0, these metrics are no longer being reported. > > > > > > I'm only seeing the following: > > > BrokerTopicMetrics > > > - BytesInPerSec > > > - BytesOutPerSec > > > - BytesRejectedPerSec > > > - MessagesInPerSec > > > > > > What's more, despite lots of successful writes to the cluster, the values > > > for these remaining metrics are all zero. > > > > > > I saw that there was some refactoring of metric naming code. Was the > > > behavior supposed to have changed? > > > > > > Many thanks in advance. > > > > >