taking broker down and returning it does not restore cluster state (nor rebalance)
Hi, Running some tests on 0.8.1.1, I wanted to see what happens when a broker is taken down with 'kill'. I ran into the situation described in the subject: bringing the broker back up left it somewhat out of the game, as far as I could tell from Stackdriver metrics. Trying to check the rebalance with the VerifyConsumerRebalance tool returned a "no owner for partition" error for all partitions of that topic (128 partitions). Moreover, aside from the issue at hand, changing the group name to a non-existent group returned success. Taking both the consumers and producers down allowed the rebalance check to return success... So the question is: how do you restore 100% of the cluster state after taking down a broker? What is the best practice? What needs to be checked, and what needs to be done? Shlomi
Rebalance not happening even after increasing max retries causing conflict in ZK
Dear Experts, We recently updated to Kafka v0.8.1.1 with ZooKeeper v3.4.5. We have a topic with 30 partitions and 2 replicas, and we are using the high-level consumer API. Each consumer process, which is a Storm topology, has 5 streams which connect to 1 or more partitions. We are not using Storm's built-in Kafka spout. Everything runs fine until the 5th consumer process (25 streams) is added for this topic. As soon as the sixth consumer process is added, the newly added process does not get ownership of the partitions that it requests, because the existing owners have not yet given up their ownership. We changed certain properties on the consumer: 1. Max rebalance attempts - 20 (so that rebalance.backoff.ms * rebalance.max.retries exceeds the ZK connection timeout) 2. Backoff ms between rebalances - 10,000 (10 seconds) 3. ZK connection timeout - 100,000 (100 seconds). Looking at the ZooKeeper shell while the rebalance is happening, the consumer is registered fine in ZooKeeper; the rebalance just does not happen.
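For reference, the settings described above would look roughly like this in the consumer's properties file (a sketch using the 0.8.1 high-level consumer config key names; the values are the ones from this report):

```properties
# Retry the rebalance up to 20 times before giving up
rebalance.max.retries=20
# Wait 10 seconds between rebalance attempts; the product
# rebalance.backoff.ms * rebalance.max.retries should exceed the ZK timeout
rebalance.backoff.ms=10000
# Allow up to 100 seconds to (re)connect to ZooKeeper
zookeeper.connection.timeout.ms=100000
```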
After the 20th rebalance attempt completes, we get:

2014-10-11 11:10:08 k.c.ZookeeperConsumerConnector [INFO] [rule-engine-feed_ip-10-0-2-170-1413025767369-4679959b], Committing all offsets after clearing the fetcher queues
2014-10-11 11:10:10 c.s.m.k.i.c.KafkaFeedStreamer [WARN] Ignoring exception while trying to start streamer threads: rule-engine-feed_ip-10-0-2-170-1413025767369-4679959b can't rebalance after 20 retries
kafka.common.ConsumerRebalanceFailedException: rule-engine-feed_ip-10-0-2-170-1413025767369-4679959b can't rebalance after 20 retries
	at kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener.syncedRebalance(ZookeeperConsumerConnector.scala:432) ~[stormjar.jar:na]
	at kafka.consumer.ZookeeperConsumerConnector.kafka$consumer$ZookeeperConsumerConnector$$reinitializeConsumer(ZookeeperConsumerConnector.scala:722) ~[stormjar.jar:na]
	at kafka.consumer.ZookeeperConsumerConnector.consume(ZookeeperConsumerConnector.scala:212) ~[stormjar.jar:na]
	at kafka.javaapi.consumer.ZookeeperConsumerConnector.createMessageStreams(ZookeeperConsumerConnector.scala:80) ~[stormjar.jar:na]
	at com.spr.messaging.kafka.impl.consumer.KafkaFeedStreamer.createAndStartThreads(KafkaFeedStreamer.java:79) ~[stormjar.jar:na]
	at com.spr.messaging.kafka.impl.consumer.KafkaFeedStreamer.startKafkaStreamThreadsIfNecessary(KafkaFeedStreamer.java:64) ~[stormjar.jar:na]
	at com.spr.messaging.kafka.impl.consumer.KafkaFeedConsumerFactoryImpl.startStreamerIfNotRunning(KafkaFeedConsumerFactoryImpl.java:71) [stormjar.jar:na]
	at com.spr.messaging.kafka.impl.consumer.KafkaFeedPullConsumerImpl.startStreamerIfNotRunning(KafkaFeedPullConsumerImpl.java:48) [stormjar.jar:na]
	at com.spr.messaging.kafka.impl.KafkaFeedServiceImpl.getKafkaFeedPullConsumer(KafkaFeedServiceImpl.java:63) [stormjar.jar:na]
	at com.spr.storm.topology.spout.AbstractSprKafkaSpout.nextTuple(AbstractSprKafkaSpout.java:121) [stormjar.jar:na]
	at backtype.storm.daemon.executor$eval3848$fn__3849$fn__3864$fn__3893.invoke(executor.clj:562) [na:0.9.1-incubating]
	at backtype.storm.util$async_loop$fn__384.invoke(util.clj:433) [na:0.9.1-incubating]
	at clojure.lang.AFn.run(AFn.java:24) [clojure-1.4.0.jar:na]
	at java.lang.Thread.run(Thread.java:745) [na:1.7.0_55]
2014-10-11 11:10:10 k.c.ZookeeperConsumerConnector [INFO] [rule-engine-feed_ip-10-0-2-170-1413025767369-4679959b], begin registering consumer rule-engine-feed_ip-10-0-2-170-1413025767369-4679959b in ZK
2014-10-11 11:10:10 k.u.ZkUtils$ [INFO] conflict in /consumers/rule-engine-feed/ids/rule-engine-feed_ip-10-0-2-170-1413025767369-4679959b data: {version:1,subscription:{rule-engine-feed:5},pattern:static,timestamp:1413025810635} stored data: {version:1,subscription:{rule-engine-feed:5},pattern:static,timestamp:1413025767483}
2014-10-11 11:10:10 k.u.ZkUtils$ [INFO] I wrote this conflicted ephemeral node [{version:1,subscription:{rule-engine-feed:5},pattern:static,timestamp:1413025810635}] at /consumers/rule-engine-feed/ids/rule-engine-feed_ip-10-0-2-170-1413025767369-4679959b a while back in a different session, hence I will backoff for this node to be deleted by Zookeeper and retry

Due to this error, none of the consumers consume from the partitions in contention, which creates a sort of skewed lag on the Kafka side. When the 6th consumer was added, the existing owner processes of the partitions in question did not get rebalanced. Any help would be highly appreciated. -Thanks, Mohit
Re: refactoring ZK so it is pluggable, would this make sense?
You can run with a single-node ZooKeeper cluster too. See http://zookeeper.apache.org/doc/r3.3.4/zookeeperStarted.html#sc_InstallingSingleMode Cheers, Erik.

Op 9 okt. 2014, om 22:52 heeft S Ahmed sahmed1...@gmail.com het volgende geschreven: I want Kafka's features (without the redundancy) but don't want to have to run 3 ZooKeeper instances, to save $$.

On Thu, Oct 9, 2014 at 2:59 PM, Jun Rao jun...@gmail.com wrote: This may not be easy since you have to implement things like watcher callbacks. What's your main concern with the ZK dependency? Thanks, Jun

On Thu, Oct 9, 2014 at 8:20 AM, S Ahmed sahmed1...@gmail.com wrote: Hi, I was wondering: if the zookeeper library (ZkUtils.scala etc.) were designed in a more modular way, would it make it possible to run a leaner version of Kafka? The idea is that I want to run Kafka with less emphasis on durability and failover, and more on it being a replacement for a standard queue like Kestrel, while still taking advantage of the other aspects of Kafka (the permanent log, etc.). I was just thinking, what if the zookeeper access were wrapped in something like:

class DiscoverService
  def electLeader ...
  def getFollower ...

(I'm just making those methods up, but you get the point: they are simply the same calls ZkUtils etc. will be making to connect to zookeeper.) Now the idea is, if you don't want to dedicate 3 servers to run zookeeper, you could create your own implementation that e.g. returns data based on a static configuration file rather than a discovery service like zookeeper. Would wrapping the zookeeper calls in a pluggable/swappable service make sense and still allow you to use Kafka at a smaller scale, or would this not work for other reasons that I am overlooking?
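To make the idea concrete, here is a minimal sketch of what such a pluggable coordination layer could look like. All names here are made up for illustration (this is not an actual Kafka interface): one implementation would delegate to ZooKeeper, while the "small scale" one below answers from a static assignment map loaded from a config file.

```java
import java.util.List;
import java.util.Map;

// Hypothetical coordination interface; the method names mirror the kinds of
// calls ZkUtils makes, but none of this exists in Kafka itself.
interface DiscoverService {
    String electLeader(String topic, int partition);        // which broker leads this partition
    List<String> getFollowers(String topic, int partition); // which brokers replicate it
}

// A "no ZooKeeper" implementation that answers from a static map,
// e.g. loaded from a configuration file at startup.
class StaticDiscoverService implements DiscoverService {
    private final Map<String, List<String>> assignments; // "topic-partition" -> replica list

    StaticDiscoverService(Map<String, List<String>> assignments) {
        this.assignments = assignments;
    }

    private List<String> replicas(String topic, int partition) {
        return assignments.get(topic + "-" + partition);
    }

    @Override
    public String electLeader(String topic, int partition) {
        // With a static layout there is no real election: the first replica leads.
        return replicas(topic, partition).get(0);
    }

    @Override
    public List<String> getFollowers(String topic, int partition) {
        List<String> r = replicas(topic, partition);
        return r.subList(1, r.size());
    }
}
```

As Jun notes above, the hard part this sketch glosses over is watcher callbacks: a static implementation never notifies anyone of membership changes, which is precisely the feature ZooKeeper provides.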
How to produce and consume events in 2 DCs?
Hi, We have 2 data centers that produce events. Each DC has to process events from both DCs. I had the following in mind:

  DC 1: events --> [Receiver topic] --+--> [Consumer topic] --> consumers
                                       \
                                        (each Receiver topic is mirrored into
                                         the Consumer topic of both DCs)
                                       /
  DC 2: events --> [Receiver topic] --+--> [Consumer topic] --> consumers

As each DC has a single Kafka cluster, on each DC the receiver topic and the consumer topic need to be on the same cluster. Unfortunately, mirror maker does not seem to support mirroring to a topic with another name. Is there another tool we could use? Or is there another approach for producing and consuming from 2 DCs? Kind regards, Erik. — Erik van Oosten http://www.day-to-day-stuff.blogspot.nl/
Re: Rebalance not happening even after increasing max retries causing conflict in ZK
Mohit, I wonder if it is related to https://issues.apache.org/jira/browse/KAFKA-1585. When ZooKeeper expires a session, it doesn't delete the ephemeral nodes immediately. So if you end up trying to recreate ephemeral nodes quickly, a node could either belong to the valid latest session or be left over from the previously expired session. If you hit this problem, then waiting would resolve it. But if not, then this may be a legitimate bug in ZK 3.4.6. Can you try shutting down all your consumers, waiting until the session timeout has passed, and restarting them? Thanks, Neha On Mon, Oct 20, 2014 at 6:15 AM, Mohit Kathuria mkathu...@sprinklr.com wrote: ...
Re: taking broker down and returning it does not restore cluster state (nor rebalance)
Did you ensure that your replication factor was set higher than 1? If so, things should recover automatically after adding the killed broker back into the cluster. On Mon, Oct 20, 2014 at 1:32 AM, Shlomi Hazan shl...@viber.com wrote: ...
Re: How to produce and consume events in 2 DCs?
Another way to set up this kind of mirroring is by deploying 2 clusters in each DC - a local Kafka cluster and an aggregate Kafka cluster. The mirror maker copies data from both DCs' local clusters into the aggregate clusters. So if you want access to a topic with data from both DCs, you subscribe to the aggregate cluster. Thanks, Neha On Mon, Oct 20, 2014 at 7:07 AM, Erik van oosten e.vanoos...@grons.nl.invalid wrote: ...
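In the local/aggregate layout, each DC would run mirror maker along these lines (a sketch using the 0.8-era MirrorMaker options; the config file names, and the broker/ZK addresses inside them, are placeholders):

```shell
# In DC 1: copy the remote DC's local cluster into DC 1's aggregate cluster.
# (A second instance copies DC 1's own local cluster into the same aggregate.)
bin/kafka-run-class.sh kafka.tools.MirrorMaker \
  --consumer.config dc2-local-consumer.properties \
  --producer.config dc1-aggregate-producer.properties \
  --whitelist 'events.*' \
  --num.streams 4
```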
log.cleanup.interval.mins still valid for 0.8.1?
Hi all, This config property does not appear in the table of broker config properties. But it appears in the example on the Web page. So I wonder if this is still a valid config property for 0.8.1. Thanks. Libo
Re: log.cleanup.interval.mins still valid for 0.8.1?
Which example are you referring to? On Mon, Oct 20, 2014 at 7:47 AM, Libo Yu yu_l...@hotmail.com wrote: Hi all, This config property does not appear in the table of broker config properties. But it appears in the example on the Web page. So I wonder if this is still a valid config property for 0.8.1. Thanks. Libo
RE: log.cleanup.interval.mins still valid for 0.8.1?
http://kafka.apache.org/documentation.html#brokerconfigs In section 6.3, there is an example of a production server configuration. Date: Mon, 20 Oct 2014 07:49:56 -0700 Subject: Re: log.cleanup.interval.mins still valid for 0.8.1? From: neha.narkh...@gmail.com To: users@kafka.apache.org Which example are you referring to? On Mon, Oct 20, 2014 at 7:47 AM, Libo Yu yu_l...@hotmail.com wrote: ...
Kafka producer iOS and android
Hi, Is it possible for iOS and Android to run the code needed for Kafka producers? I want to have mobile clients connect to a Kafka broker. Thanks, Josh
Re: Kafka producer iOS and android
Hi Josh, Why not have a REST API service running where you post messages from your mobile clients? The REST API can run Kafka producers that accept these messages and push them into the Kafka brokers. Here is an example where we did a similar service for Kafka: https://github.com/mozilla-metrics/bagheera . This project used Kafka 0.7, but you can see how it's implemented. Hope that helps. -Harsha On Mon, Oct 20, 2014, at 08:45 AM, Josh J wrote: ...
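As a sketch of the mobile-client side of such a setup (the endpoint URL and JSON shape here are made up for illustration; the service behind it would hold the actual Kafka producer), the client only needs plain HTTP, not the Kafka jars:

```java
import java.io.OutputStream;
import java.net.HttpURLConnection;
import java.net.URL;
import java.nio.charset.StandardCharsets;

public class RestProducerClient {
    /** POST one event to a hypothetical REST front-end for Kafka; returns the HTTP status. */
    public static int send(String endpoint, String json) throws Exception {
        HttpURLConnection conn = (HttpURLConnection) new URL(endpoint).openConnection();
        conn.setRequestMethod("POST");
        conn.setRequestProperty("Content-Type", "application/json");
        conn.setDoOutput(true);
        try (OutputStream out = conn.getOutputStream()) {
            out.write(json.getBytes(StandardCharsets.UTF_8));
        }
        int status = conn.getResponseCode(); // the REST layer, not Kafka, acks the request
        conn.disconnect();
        return status;
    }
}
```

The REST layer terminates HTTPS and authentication, and only it needs the Kafka client jars, so the iOS and Android apps stay free of Kafka dependencies entirely.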
Re: Kafka producer iOS and android
Thanks for the tip. I would like to avoid hand-rolling any code if possible. For example, on Android, I would like to ask whether people are able to include and use the Kafka jars without problems. And on iOS, whether there is a way to include any C or other relevant code. On Mon, Oct 20, 2014 at 8:49 AM, Harsha ka...@harsha.io wrote: ...
Re: Kafka producer iOS and android
What is the use case requiring that? If you try to integrate Kafka into the two different mobile platforms, you will get many separate development cycles, and none will work in many mobile network environments. You can HTTP/HTTPS POST the same Avro objects (or Thrift or Protobuf) from each platform. You then have one integration cycle that is easily maintained and supported cohesively overall: the Avro/Thrift/Protobuf objects and an HTTP/HTTPS layer, that's it. Wrapping the producer is pretty straightforward in any language nowadays, I think, and it depends on what language you use and how you operate production services to pick how to write that producer. That producer and interface shouldn't change much, and you have to think a little about configurations, but almost all of that is server-side, depending on the message.

/*** Joe Stein Founder, Principal Consultant Big Data Open Source Security LLC http://www.stealth.ly Twitter: @allthingshadoop http://www.twitter.com/allthingshadoop /

On Mon, Oct 20, 2014 at 1:09 PM, Josh J joshjd...@gmail.com wrote: ...
Re: Kafka producer iOS and android
What is the use case requiring that? I'm looking for an open source library that I can use in Android and iOS, instead of hand rolling my own. On Mon, Oct 20, 2014 at 10:21 AM, Joe Stein joe.st...@stealth.ly wrote: ...
Re: Kafka producer iOS and android
I may be out of date, but I believe security measures are only in the proposal stage. Your use case most likely involves sending data from the internet at large to the Kafka instance. This will result in all data sent to the Kafka instance being consumable by the internet at large. This is unlikely to be what you desire and would likely be a disservice to your users. Using a REST wrapper (I believe there is at least one available) will limit the internet at large to producing data, and you can build additional authentication mechanisms into that if desired. I hope someone else will correct me if I am mistaken. Christian On Mon, Oct 20, 2014 at 10:28 AM, Josh J joshjd...@gmail.com wrote: ...
Re: taking broker down and returning it does not restore cluster state (nor rebalance)
Yes I did. It is set to 2. On Oct 20, 2014 5:38 PM, Neha Narkhede neha.narkh...@gmail.com wrote: ...
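For reference, a common recovery check on 0.8.x after bringing a broker back is along these lines (a sketch, not an authoritative runbook; the ZK address is a placeholder): first verify that the restarted broker has rejoined the ISR everywhere, then move partition leadership back to its preferred replicas.

```shell
# List partitions whose ISR is smaller than the replica set;
# an empty result means the restarted broker has caught up.
bin/kafka-topics.sh --zookeeper zk1:2181 --describe --under-replicated-partitions

# Move leadership back to the preferred (first-listed) replicas,
# rebalancing leaders onto the restarted broker.
bin/kafka-preferred-replica-election.sh --zookeeper zk1:2181
```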
Re: how to do disaster recovery for kafka 0.8 cluster with consumers that uses high-level consumer api?
If the whole cluster is down and you allow unclean leader election on the broker, some exposed messages on the broker could be lost when restarting the brokers. When that happens, the consumers may need to reset their offsets, since the current offsets may no longer be valid. By default, the offset will be reset to the smallest valid offset. You can set auto.offset.reset to largest to avoid re-reading all old messages. Thanks, Jun

On Sun, Oct 19, 2014 at 9:58 PM, Yu Yang yuyan...@gmail.com wrote: Thanks, Jun! Yes, I set the topic replication factor to 3.

On Sun, Oct 19, 2014 at 8:09 PM, Jun Rao jun...@gmail.com wrote: Did you set the replication factor to be more than 1? Thanks, Jun

On Sat, Oct 18, 2014 at 2:32 AM, Yu Yang yuyan...@gmail.com wrote: Hi all, We have a Kafka 0.8.1 cluster. We implemented consumers for the topics on the cluster using the high-level consumer API. We observed that if the Kafka cluster goes down and gets rebooted while a consumer is running, the consumer will fail to read a few topic partitions due to a negative lag value. How shall we handle disaster recovery without re-reading the processed messages? Thanks! -Yu
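In 0.8.1 high-level consumer terms, the reset behaviour Jun describes is set through consumer properties, roughly as below (a minimal sketch; the ZK address and group id are placeholders):

```java
import java.util.Properties;

public class OffsetResetConfig {
    /** Consumer properties that jump to the newest message when the stored offset is invalid. */
    public static Properties consumerProps() {
        Properties props = new Properties();
        props.put("zookeeper.connect", "zk1:2181");   // placeholder ZK quorum
        props.put("group.id", "my-consumer-group");   // placeholder group
        // Per the advice in the thread: "largest" skips to the newest message
        // instead of re-reading all old messages after an offset reset.
        props.put("auto.offset.reset", "largest");
        return props;
    }
}
```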
Strange behavior during un-clean leader election
Hi everyone, We run a 3-broker Kafka cluster on 0.8.1.1 with all topics having a replication factor of 3, meaning every broker has a replica of every partition. We recently ran into this issue (https://issues.apache.org/jira/browse/KAFKA-1028) and saw data loss within Kafka. We understand why it happened and have plans to try to ensure it doesn't happen again. The strange part was that the broker chosen in the unclean leader election seemed to drop all of its own data for the partition in the process, as our monitoring shows the broker offset was reset to 0 for a number of partitions. Following the broker's server logs in chronological order for a particular partition that saw data loss, I see this:

2014-10-16 10:18:11,104 INFO kafka.log.Log: Completed load of log TOPIC-6 with log end offset 528026
2014-10-16 10:20:18,144 WARN kafka.controller.OfflinePartitionLeaderSelector: [OfflinePartitionLeaderSelector]: No broker in ISR is alive for [TOPIC,6]. Elect leader 1 from live brokers 1,2. There's potential data loss.
2014-10-16 10:20:18,277 WARN kafka.cluster.Partition: Partition [TOPIC,6] on broker 1: No checkpointed highwatermark is found for partition [TOPIC,6]
2014-10-16 10:20:18,698 INFO kafka.log.Log: Truncating log TOPIC-6 to offset 0.
2014-10-16 10:21:18,788 INFO kafka.log.OffsetIndex: Deleting index /storage/kafka/00/kafka_data/TOPIC-6/00528024.index.deleted
2014-10-16 10:21:18,781 INFO kafka.log.Log: Deleting segment 528024 from log TOPIC-6.

I'm not too worried about this since I'm hoping to move to Kafka 0.8.2 ASAP, but I was curious if anyone could explain this behavior. -Bryan
Re: Achieving Consistency and Durability
Hi Kyle, I added new documentation, which will hopefully help. Please take a look here: https://issues.apache.org/jira/browse/KAFKA-1555 I've heard rumors that you are very very good at documenting, so I'm looking forward to your comments. Note that I'm completely ignoring the acks1 case since we are about to remove it. Gwen On Wed, Oct 15, 2014 at 1:21 PM, Kyle Banker kyleban...@gmail.com wrote: Thanks very much for these clarifications, Gwen. I'd recommend modifying the following phrase describing acks=-1: This option provides the best durability, we guarantee that no messages will be lost as long as at least one in sync replica remains. The as long as at least one in sync replica remains is such a huge caveat. It should be noted that acks=-1 provides no actual durability guarantees unless min.isr is also used to specify a majority of replicas. In addition, I was curious if you might comment on my other recent posting Consistency and Availability on Node Failures and possibly add this scenario to the docs. With acks=-1 and min.isr=2 and a 3-replica topic in a 12-node Kafka cluster, there's a relatively high probability that losing 2 nodes from this cluster will result in an inability to write to the cluster. On Tue, Oct 14, 2014 at 4:50 PM, Gwen Shapira gshap...@cloudera.com wrote: ack = 2 *will* throw an exception when there's only one node in ISR. The problem with ack=2 is that if you have 3 replicas and you got acks from 2 of them, the one replica which did not get the message can still be in ISR and get elected as leader, leading for a loss of the message. If you specify ack=3, you can't tolerate the failure of a single replica. Not amazing either. To makes things even worse, when specifying the number of acks you want, you don't always know how many replicas the topic should have, so its difficult to pick the correct number. 
acks = -1 solves that problem (since all messages need to get acked by all replicas), but introduces the new problem of not getting an exception if ISR shrank to 1 replica. Thats why the min.isr configuration was added. I hope this clarifies things :) I'm planning to add this to the docs in a day or two, so let me know if there are any additional explanations or scenarios you think we need to include. Gwen On Tue, Oct 14, 2014 at 12:27 PM, Scott Reynolds sreyno...@twilio.com wrote: A question about 0.8.1.1 and acks. I was under the impression that setting acks to 2 will not throw an exception when there is only one node in ISR. Am I incorrect ? Thus the need for min_isr. On Tue, Oct 14, 2014 at 11:50 AM, Kyle Banker kyleban...@gmail.com wrote: It's quite difficult to infer from the docs the exact techniques required to ensure consistency and durability in Kafka. I propose that we add a doc section detailing these techniques. I would be happy to help with this. The basic question is this: assuming that I can afford to temporarily halt production to Kafka, how do I ensure that no message written to Kafka is ever lost under typical failure scenarios (i.e., the loss of a single broker)? Here's my understanding of this for Kafka v0.8.1.1: 1. Create a topic with a replication factor of 3. 2. Use a sync producer and set acks to 2. (Setting acks to -1 may successfully write even in a case where the data is written only to a single node). Even with these two precautions, there's always the possibility of an unclean leader election. Can data loss still occur in this scenario? Is it possible to achieve this level of durability on v0.8.1.1? In Kafka v0.8.2, in addition to the above: 3. Ensure that the triple-replicated topic also disallows unclean leader election (https://issues.apache.org/jira/browse/KAFKA-1028). 4. Set the min.isr value of the producer to 2 and acks to -1 ( https://issues.apache.org/jira/browse/KAFKA-1555). 
The producer will then throw an exception if data can't be written to 2 out of 3 nodes. In addition to producer configuration and usage, there are also monitoring and operations considerations for achieving durability and consistency. As those are rather nuanced, it'd probably be easiest to start iterating on a document to flesh them out. If anyone has any advice on how to better specify this, or on how to get started on improving the docs, I'm happy to help out.
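Pulling the settings from this thread together, the v0.8.2-era "no lost messages" recipe might look like the fragment below. The property names are the broker/producer configs referenced above (KAFKA-1028, KAFKA-1555); the concrete values are just the ones discussed in the thread, not universal recommendations:

```properties
# Broker / topic-level settings (illustrative values from the thread):
default.replication.factor=3
# KAFKA-1028: never elect an out-of-sync replica as leader
unclean.leader.election.enable=false
# KAFKA-1555: fail produce requests when fewer than 2 replicas are in sync
min.insync.replicas=2

# Producer-level setting:
# -1 waits for all in-sync replicas (spelled "all" in newer clients)
acks=-1
```

With this combination, losing one broker leaves writes available (ISR shrinks to 2), while a shrink below min.insync.replicas makes the producer fail loudly instead of silently accepting under-replicated writes.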
Re: How to produce and consume events in 2 DCs?
This is another potential use-case for message metadata. I.e., if we had a DC/environment field in the header you could easily set up a two-way mirroring pipeline: the mirror maker can just filter out messages that did not originate in its source cluster (i.e., messages that were themselves mirrored in), which avoids loops. For this to be efficient the mirror maker should run in the source cluster. (We traditionally run it in the target cluster, but there is no hard and fast requirement that it be set up that way.) Thanks, Joel

On Mon, Oct 20, 2014 at 07:43:26 AM -0700, Neha Narkhede wrote: Another way to set up this kind of mirroring is by deploying 2 clusters in each DC: a local Kafka cluster and an aggregate Kafka cluster. The mirror maker copies data from both DCs' local clusters into the aggregate clusters. So if you want access to a topic with data from both DCs, you subscribe to the aggregate cluster. Thanks, Neha

On Mon, Oct 20, 2014 at 7:07 AM, Erik van oosten e.vanoos...@grons.nl.invalid wrote: Hi, We have 2 data centers that produce events. Each DC has to process events from both DCs. I had the following in mind:

[ASCII diagram: in each DC, local events flow into a "Receiver topic"; mirroring copies both DCs' Receiver topics into each DC's "Consumer topic"; local consumers read from their DC's Consumer topic.]

As each DC has a single Kafka cluster, in each DC the receiver topic and consumer topic need to be on the same cluster. Unfortunately, mirror maker does not seem to support mirroring to a topic with another name. Is there another tool we could use? Or is there another approach for producing and consuming from 2 DCs? Kind regards, Erik. — Erik van Oosten http://www.day-to-day-stuff.blogspot.nl/
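For reference, the local-plus-aggregate layout Neha describes is driven by the stock MirrorMaker tool. A sketch of one of the four copy jobs (local cluster in each DC into each aggregate cluster) is below; the hostnames, file names, and topic pattern are placeholders, and the flags are those of the 0.8-era `kafka.tools.MirrorMaker`:

```shell
# consumer.properties points at a local cluster's ZooKeeper,
# producer.properties at the aggregate cluster's brokers (placeholders):
bin/kafka-run-class.sh kafka.tools.MirrorMaker \
  --consumer.config consumer.properties \
  --producer.config producer.properties \
  --whitelist 'events.*' \
  --num.streams 2
```

One such invocation runs per (local cluster, aggregate cluster) pair; since topic names are preserved, the aggregate cluster's topics naturally contain the union of both DCs' events.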
Re: taking broker down and returning it does not restore cluster state (nor rebalance)
As Neha mentioned, with a replication factor of 2 this shouldn't normally cause an issue. Taking the broker down will cause the leader to move to another replica; consumers and producers will rediscover the new leader; no rebalances should be triggered. When you bring the broker back up, unless you run a preferred replica leader re-election the broker will remain a follower. Again, there will be no effect on the producers or consumers (i.e., no rebalances). If you can reproduce this easily, can you please send exact steps to reproduce along with your consumer logs? Thanks, Joel

On Mon, Oct 20, 2014 at 09:13:27 PM +0300, Shlomi Hazan wrote: Yes I did. It is set to 2.

On Oct 20, 2014 5:38 PM, Neha Narkhede neha.narkh...@gmail.com wrote: Did you ensure that your replication factor was set higher than 1? If so, things should recover automatically after adding the killed broker back into the cluster.

On Mon, Oct 20, 2014 at 1:32 AM, Shlomi Hazan shl...@viber.com wrote: Hi, Running some tests on 0.8.1.1, I wanted to see what happens when a broker is taken down with 'kill'. I bumped into the situation in the subject, where bringing the broker back up left it a bit out of the game, as far as I could see from Stackdriver metrics. Trying to rebalance with verify consumer rebalance returned a "no owner for partition" error for all partitions of that topic (128 partitions). Moreover, aside from the issue at hand, changing the group name to a non-existent group returned success. Taking both the consumers and producers down allowed the rebalance to return success... So the question is: how do you restore 100% state after taking down a broker? What is the best practice? What needs to be checked and what needs to be done? Shlomi
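The preferred replica leader re-election Joel mentions is a stock tool in the 0.8.x distribution. A minimal invocation (the ZooKeeper host is a placeholder) would look like:

```shell
# After the restarted broker has caught back up, move leadership back to
# the preferred replicas for all partitions:
bin/kafka-preferred-replica-election.sh --zookeeper zk1:2181
```

This only moves leadership back to balance load; it is not required for correctness, since the restarted broker serves as a follower either way.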
Re: log.cleanup.interval.mins still valid for 0.8.1?
It has been replaced with log.retention.check.interval.ms. We will update the docs. Thanks for reporting this. Joel

On Mon, Oct 20, 2014 at 11:08:38 AM -0400, Libo Yu wrote: http://kafka.apache.org/documentation.html#brokerconfigs In section 6.3, there is an example of a production server configuration.

On Mon, Oct 20, 2014 at 7:49 AM, neha.narkh...@gmail.com wrote: Which example are you referring to?

On Mon, Oct 20, 2014 at 7:47 AM, Libo Yu yu_l...@hotmail.com wrote: Hi all, this config property does not appear in the table of broker config properties, but it appears in the example on the web page. So I wonder if this is still a valid config property for 0.8.1. Thanks. Libo -- Joel
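For anyone migrating a server.properties, the rename also changes the unit from minutes to milliseconds. A sketch of the before/after (the 10-minute interval is just an example value):

```properties
# Pre-0.8.1 name (minutes):
# log.cleanup.interval.mins=10

# 0.8.1+ name (milliseconds):
log.retention.check.interval.ms=600000
```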
Performance issues
I am running a performance test, and what I am seeing is that messages take about 100 ms to pop from the queue itself, which is making the test slow. I am looking for pointers on how to troubleshoot this issue. There seems to be plenty of CPU and I/O available. I am running 22 producers and 22 consumers in the same group.
Re: Sending Same Message to Two Topics on Same Broker Cluster
Hi Neha, Yes, I understand that, but when transmitting a single message I cannot set a list of topics, only a single one. So I will have to add the same message to the buffer once per topic. If the Kafka protocol allowed a message to be addressed to multiple topics, the message would not have to be re-transmitted over the wire for each topic. The ProducerRecord only allows one topic: http://people.apache.org/~nehanarkhede/kafka-0.9-producer-javadoc/doc/org/apache/kafka/clients/producer/ProducerRecord.html Thanks for your quick response, and I appreciate your help. Thanks, Bhavesh

On Mon, Oct 20, 2014 at 9:10 PM, Neha Narkhede neha.narkh...@gmail.com wrote: Not really. You need producers to send data to Kafka.

On Mon, Oct 20, 2014 at 9:05 PM, Bhavesh Mistry mistry.p.bhav...@gmail.com wrote: Hi Kafka Team, I would like to send a single message to multiple topics (two for now) without re-transmitting the message from the producer to the brokers. Is this possible? Neither the Scala nor the Java producer allows this. I do not have to do this all the time, only based on an application condition. Thanks in advance for your help!! Thanks, Bhavesh
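For what it's worth, the client-side fan-out can at least avoid copying the payload in application memory: the same byte array can back one record per topic, so only the network transmission is duplicated. The sketch below illustrates the idea using a hypothetical stand-in class rather than the real ProducerRecord, so it is self-contained; with the actual client, each record would still go through its own producer.send(...) call.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

public class FanOutSketch {
    // Hypothetical stand-in for org.apache.kafka.clients.producer.ProducerRecord:
    // one topic per record, but records may share the same value array.
    static final class Record {
        final String topic;
        final byte[] value;
        Record(String topic, byte[] value) { this.topic = topic; this.value = value; }
    }

    // Build one record per topic, all referencing the same payload buffer (no copy).
    static List<Record> fanOut(byte[] payload, String... topics) {
        List<Record> records = new ArrayList<>();
        for (String topic : topics) {
            records.add(new Record(topic, payload));
        }
        return records;
    }

    public static void main(String[] args) {
        byte[] payload = "event-42".getBytes(StandardCharsets.UTF_8);
        List<Record> records = fanOut(payload, "topicA", "topicB");
        // Each record still results in a separate send over the wire;
        // only the in-memory buffer is shared between the two records.
        for (Record r : records) {
            System.out.println(r.topic + " -> " + new String(r.value, StandardCharsets.UTF_8));
        }
    }
}
```

This keeps the producer-side cost to one buffer plus N sends, which is the best the current one-topic-per-record API allows.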