Re: Kafka broker - IP address instead of host name
Thanks Gwen. One more question: is there a way we can dynamically reload the contents of the Kafka server.properties file without restarting the broker? An example use case is when a zk node goes down and a new one is brought up: we need to update the server.properties file to reflect this. Currently there is no way to do this other than a broker restart, and a broker restart requires a lot of preparation before triggering it. A JIRA for this is already filed but unresolved: https://issues.apache.org/jira/browse/KAFKA-1229. We don't need every config to be reloadable; it would be enough if changes to external, variable configs were picked up.

On Sun, May 24, 2015 at 1:14 PM, Gwen Shapira gshap...@cloudera.com wrote:

If you set advertised.hostname in server.properties to the IP address, the IP will be registered in ZooKeeper.

--
Regards
Vamsi Subhash
Re: Kafka broker - IP address instead of host name
Ok. I was talking about a scenario where there are no DNS hostnames for the zk nodes. If the connection string is given with all the IP addresses of the zk hosts, and a new host with a different IP address is brought up in the zk cluster to replace an old node, we still need to reload the zk connection string. If hostnames were used instead, the DNS mapping could simply point to the new IP; but with no DNS we have to hard-code the new IP address in the zk connection string and restart the broker again. One workaround is to map the zk hostname to its IP locally in /etc/hosts and change that mapping to the new IP when the node changes. But would reloading the Kafka config with the new zk nodes be a better option? And, as you said, if we cannot reload the server.properties file, what is the best approach when there is no service discovery?

On Sun, May 24, 2015 at 6:52 PM, Gwen Shapira gshap...@cloudera.com wrote:

You can't dynamically re-load server properties. However, the norm in ZooKeeper is to configure the connection string with all the nodes in the zk cluster, so there will be no need to modify properties when you replace zk nodes.

--
Regards
Vamsi Subhash
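A minimal sketch of the /etc/hosts workaround described above, assuming made-up names (zk1.internal, zk2.internal, zk3.internal) and placeholder addresses; the only real Kafka setting involved is zookeeper.connect in server.properties:

    # /etc/hosts on the broker: map stable local names to the current zk node IPs.
    # When a zk node is replaced, only the address on its line changes.
    10.0.0.21   zk1.internal
    10.0.0.22   zk2.internal
    10.0.0.23   zk3.internal

    # server.properties: point the broker at the stable names rather than raw IPs.
    zookeeper.connect=zk1.internal:2181,zk2.internal:2181,zk3.internal:2181

Whether a running broker picks up the new address without a restart still depends on JVM name-resolution caching and on when the ZooKeeper client reconnects, so this is a mitigation rather than a full substitute for reloadable config.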
Re: [Announcement] Hermes - pub / sub broker built on top of Kafka
Hi Daniel, First of all, sorry for the late response, I enjoyed a short vacation :) I guess the documentation might be a bit misleading here, and so we should improve it: we do not aim to (and can't) provide higher guarantees than Kafka. We want to be as bulletproof as possible on the REST interface side. In our SLA we concentrate a lot on REST availability and response time. We can also withstand some short-term Kafka outages. Still, the only goal of the Hermes frontend is to push events to Kafka, as only this way can we assure our customers that the event will be delivered (and is stored reliably). Thus, we do not plan on building any distributed storage here - Kafka is our storage.

Adam

2015-05-20 11:49 GMT+02:00 Daniel Compton daniel.compton.li...@gmail.com:

Hi Adam, Firstly, thanks for open sourcing this - it looks like a great tool and I can imagine a lot of people will find it very useful. I had a few thoughts reading the docs. I may have misunderstood things, but it seems that your goal of meeting a strict SLA conflicts with your goal of bulletproof delivery. Even if you have a durable queue on disk, when you send a 202 Accepted you could still lose messages if you lost the disk storing the data. I realise this has a small chance of happening, but I don't think you can guarantee bulletproof delivery if only a single server stores the message while it's in transit, before being accepted by Kafka. Could you expand on the reliability guarantees you're looking to offer and how they can be stronger than the ones provided by Kafka? Thanks, Daniel.

On Tue, 19 May 2015 at 2:57 am Adam Dubiel dubiel.a...@gmail.com wrote:

Hello everyone, I'm the technical team lead of the Hermes project. I will try to answer the questions already posted, but feel free to ask me anything.

1) Can you comment on how this compares to Confluent's REST proxy?

We do not perceive Hermes as a mere proxy. While Confluent's product wants to help services written in non-JVM languages connect to Kafka, Hermes is more than that. First of all, we wanted to make connecting to pub/sub as easy as possible, hence the REST API for publishing (this is the same as the REST proxy), but also converting from pull to push on the consumer side. This means that our consumers don't need to include Kafka drivers, handle retries, or worry about the linear commit offset. Instead, our hermes-consumer module implements retry strategies and an adaptive output-rate algorithm. A consumer only needs to register an HTTP (or JMS) endpoint that can accept a POST message, and that is all. Secondly, on the publisher side, we wanted to create a somewhat bullet-proof solution capable of accepting very sensitive data. This means we use the Kafka producer buffer to store unsent messages (our internal solution also persists this buffer to disk; this will be ported to open source soon) and we can guarantee a maximum response time (SLA). We are also able to use different levels of Kafka ACKs per topic (-1 or 1 currently). Last but not least, we mitigate some administrative issues: we added tons of metrics that can be reported to Graphite, message state tracking for debugging purposes, and an easy-to-use REST API for previewing messages stored in Kafka or retransmitting events starting from a given timestamp (not offset!).

2) Performance

We plan on making the tests public, but they are not there yet. The numbers in the docs come from our production environment, but they should be taken with a grain of salt. Hermes performance depends highly on the performance of the underlying Kafka cluster. Our cluster is deployed in the cloud (on SSDs); bare-metal deployments would probably achieve much better performance. Still, the most important metric here is not total response time, but Hermes' overhead over pure Kafka. This looks negligible in our cloud deployment (p99 0.2ms), but we will be crunching those numbers and publishing them in the docs.

3) Topics/subscriptions limit

We are limited by Kafka as well, though we have never encountered any problems with this (still, we have only 100-150 topics). We want to scale out by making Hermes multi-Kafka aware (in an effort to become multi-DC). Currently the management node can manage multiple Kafka clusters, but as soon as we deploy it to production we will add some more documentation on architecture and deployment. We should create an FAQ that answers the most popular questions.

2015-05-18 13:14 GMT+02:00 Marcin Kuthan marcin.kut...@gmail.com:

Hi Warren, With Hermes, the publisher is never blocked, even if a message has not been sent to the Kafka cluster, or if a message has been sent but not acknowledged. This is especially useful if your system needs to have strict SLA guarantees. From the consumer's perspective there is a retry policy if the consumer is not able to handle a published message. In addition, Hermes is able to adjust the speed of sending messages to the actual situation (i.e.
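As a hedged illustration of the producer-side settings Adam mentions above (the producer's in-memory buffer for unsent records and per-topic ACK levels of -1 or 1), a plain Kafka Java producer configured that way might look like the sketch below. This is not Hermes code; the broker address, topic name, and buffer size are placeholders.

    import java.util.Properties;

    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.Producer;
    import org.apache.kafka.clients.producer.ProducerRecord;

    public class AckedProducerSketch {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put("bootstrap.servers", "broker1:9092");   // placeholder broker address
            props.put("key.serializer",
                    "org.apache.kafka.common.serialization.StringSerializer");
            props.put("value.serializer",
                    "org.apache.kafka.common.serialization.StringSerializer");
            // acks=-1 waits for all in-sync replicas; acks=1 waits for the leader only.
            props.put("acks", "-1");
            // Unsent records sit in this client-side buffer until the broker accepts them.
            props.put("buffer.memory", "33554432"); // 32 MB, placeholder size
            props.put("retries", "3");

            Producer<String, String> producer = new KafkaProducer<>(props);
            producer.send(new ProducerRecord<>("sensitive-events", "key", "value"),
                    (metadata, exception) -> {
                        // Fires once the broker has accepted (or rejected) the record.
                        if (exception != null) {
                            exception.printStackTrace();
                        } else {
                            System.out.println("stored at offset " + metadata.offset());
                        }
                    });
            producer.close();
        }
    }

The buffer lets the producer absorb short broker hiccups without blocking the caller, which matches the trade-off described in the thread: the HTTP 202 can be returned quickly, while durability is only guaranteed once the acknowledgment callback fires.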
Re: Kafka broker - IP address instead of host name
You can't dynamically re-load server properties. However, the norm in ZooKeeper is to configure the connection string with all the nodes in the zk cluster, so there will be no need to modify properties when you replace zk nodes.

On Sun, May 24, 2015 at 4:13 PM, Achanta Vamsi Subhash achanta.va...@flipkart.com wrote:

Thanks Gwen. One more question: is there a way we can dynamically reload the contents of the Kafka server.properties file without restarting the broker? An example use case is when a zk node goes down and a new one is brought up: we need to update the server.properties file to reflect this. Currently there is no way to do this other than a broker restart, and a broker restart requires a lot of preparation before triggering it. A JIRA for this is already filed but unresolved: https://issues.apache.org/jira/browse/KAFKA-1229

--
Regards
Vamsi Subhash
Architecture for multiple consumers of a given message
I'm working on a simple web application where I want the same message on a given topic to be consumed by multiple separate consumers (one that writes to a database, another that writes to a search index like Solr/Elasticsearch). Right now, as this is just a small personal project, I only have one partition. What's the standard way of doing this? A single consumer process with one thread for each kind of consumer (two threads in this case)? Multiple consumers for a single partition? I Googled and tried to look at the wiki but didn't get a clear answer. Apologies if I missed something obvious.
Re: Architecture for multiple consumers of a given message
Hi Warren, If you're using the high level consumer, then you can just have multiple consumer groups (one for each purpose), and run one consumer thread per consumer group.

On Mon, 25 May 2015 at 8:43 am Warren Henning warren.henn...@gmail.com wrote:

I'm working on a simple web application where I want the same message on a given topic to be consumed by multiple separate consumers (one that writes to a database, another that writes to a search index like Solr/Elasticsearch). Right now, as this is just a small personal project, I only have one partition. What's the standard way of doing this? A single consumer process with one thread for each kind of consumer (two threads in this case)? Multiple consumers for a single partition? I Googled and tried to look at the wiki but didn't get a clear answer. Apologies if I missed something obvious.
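To make the consumer-group suggestion concrete, here is a minimal sketch using the 0.8-era high-level consumer: two ConsumerConnector instances with different group.id values, so each group independently receives every message on the topic even though there is only one partition. The topic name, group names, and ZooKeeper address are assumptions made up for the example.

    import java.util.Collections;
    import java.util.List;
    import java.util.Map;
    import java.util.Properties;

    import kafka.consumer.Consumer;
    import kafka.consumer.ConsumerConfig;
    import kafka.consumer.ConsumerIterator;
    import kafka.consumer.KafkaStream;
    import kafka.javaapi.consumer.ConsumerConnector;

    public class TopicFanOut {

        // Each logical consumer (DB writer, search indexer) gets its own group.id,
        // so Kafka tracks offsets per group and each group sees the full stream.
        static ConsumerConnector connectorFor(String groupId) {
            Properties props = new Properties();
            props.put("zookeeper.connect", "localhost:2181"); // assumed local ZooKeeper
            props.put("group.id", groupId);
            props.put("auto.offset.reset", "smallest");
            return Consumer.createJavaConsumerConnector(new ConsumerConfig(props));
        }

        // Starts one thread that reads the single stream for the topic;
        // the label stands in for the real DB-write or index-write logic.
        static void consume(ConsumerConnector connector, String topic, String label) {
            Map<String, List<KafkaStream<byte[], byte[]>>> streams =
                    connector.createMessageStreams(Collections.singletonMap(topic, 1));
            KafkaStream<byte[], byte[]> stream = streams.get(topic).get(0);
            new Thread(() -> {
                ConsumerIterator<byte[], byte[]> it = stream.iterator();
                while (it.hasNext()) {
                    byte[] message = it.next().message();
                    System.out.println(label + " handling " + message.length + " bytes");
                }
            }).start();
        }

        public static void main(String[] args) {
            String topic = "events"; // hypothetical topic name
            consume(connectorFor("db-writer-group"), topic, "db-writer");
            consume(connectorFor("search-indexer-group"), topic, "search-indexer");
        }
    }

Because the two connectors use different group IDs, their offsets are committed separately, so the database writer and the search indexer each receive every message. Within a single group, each partition is consumed by at most one thread, which is why two threads inside one group would not achieve this fan-out.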
Replication tools to move topics/partitions gradually
We have a Kafka cluster with 10 brokers and we use the partition reassignment tool (kafka-reassign-partitions.sh) when we need to add more brokers to the cluster. But this tool tends to move too many topic-partitions around at the same time, which causes instability. Is there an option to do it more gradually (e.g. move one topic-partition at a time), or has someone built a tool on top of kafka-reassign-partitions.sh? Another use case is when a broker node goes down: do we have a tool to move the topic-partitions serviced by that node to the remaining nodes (again in a fashion that doesn't cause too much instability)?
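One low-tech way to approximate "one partition at a time" with the stock tool is to feed kafka-reassign-partitions.sh a hand-written reassignment JSON containing a single partition, wait for --verify to report completion, and only then submit the next one. The topic name, broker IDs, and ZooKeeper address below are placeholders:

    # reassign-one.json: move just one partition to its new replica set.
    {"version":1,
     "partitions":[
       {"topic":"my-topic","partition":0,"replicas":[1,4]}
     ]}

    # Start the move, then poll until it finishes before touching the next partition.
    bin/kafka-reassign-partitions.sh --zookeeper zkhost:2181 \
        --reassignment-json-file reassign-one.json --execute
    bin/kafka-reassign-partitions.sh --zookeeper zkhost:2181 \
        --reassignment-json-file reassign-one.json --verify

This keeps the replication traffic of each step bounded, at the cost of babysitting the process - which is the gap that tooling built on top of the script, like the one described in the reply below, aims to close.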
Re: Kafka broker - IP address instead of host name
Oh ok. Got it. Thanks Gwen and Daniel.

On Mon, May 25, 2015 at 5:15 AM, Daniel Compton daniel.compton.li...@gmail.com wrote:

Correct me if I'm wrong, but I thought that the zk connect config was only needed explicitly at startup, and the zk cluster would update the active ZooKeepers as they joined and exited. You only need to specify one zk node to join the cluster, and it will bootstrap the rest. So zk changes won't require a restart, but you'll want to make sure your config is up to date when you do eventually come to do a restart.

--
Regards
Vamsi Subhash
Re: Replication tools to move topics/partitions gradually
We've built tools on top of it that both build the reassignment list from less input (like "clone this broker to that one") and break it down into a configurable number of discrete moves so it doesn't tank the cluster. And yes, I've finally started the process of separating them from the LinkedIn-specific tooling so we can release them to everyone else :)

-Todd

On May 24, 2015, at 7:45 PM, Henry Cai h...@pinterest.com.INVALID wrote:

We have a Kafka cluster with 10 brokers and we use the partition reassignment tool (kafka-reassign-partitions.sh) when we need to add more brokers to the cluster. But this tool tends to move too many topic-partitions around at the same time, which causes instability. Is there an option to do it more gradually (e.g. move one topic-partition at a time), or has someone built a tool on top of kafka-reassign-partitions.sh? Another use case is when a broker node goes down: do we have a tool to move the topic-partitions serviced by that node to the remaining nodes (again in a fashion that doesn't cause too much instability)?
Re: Log file of server startup error
Hi Sanjay, Did you check that no other Kafka process is using the /tmp/kafka-logs folder? What command(s) did you use to verify that? -Jaikiran

On Saturday 23 May 2015 12:19 PM, Sanjay Mistry wrote:

[2015-05-23 12:16:41,624] INFO Initiating client connection, connectString=localhost:2181 sessionTimeout=6000 watcher=org.I0Itec.zkclient.ZkClient@70808f4e (org.apache.zookeeper.ZooKeeper)
[2015-05-23 12:16:41,659] INFO Opening socket connection to server localhost/0:0:0:0:0:0:0:1:2181 (org.apache.zookeeper.ClientCnxn)
[2015-05-23 12:16:41,673] INFO Socket connection established to localhost/0:0:0:0:0:0:0:1:2181, initiating session (org.apache.zookeeper.ClientCnxn)
[2015-05-23 12:16:41,740] INFO Session establishment complete on server localhost/0:0:0:0:0:0:0:1:2181, sessionid = 0x14d7f85f7a3, negotiated timeout = 6000 (org.apache.zookeeper.ClientCnxn)
[2015-05-23 12:16:41,743] INFO zookeeper state changed (SyncConnected) (org.I0Itec.zkclient.ZkClient)
[2015-05-23 12:16:42,015] FATAL Fatal error during KafkaServerStable startup. Prepare to shutdown (kafka.server.KafkaServerStartable)
kafka.common.KafkaException: Failed to acquire lock on file .lock in /tmp/kafka-logs. A Kafka instance in another process or thread is using this directory.
    at kafka.log.LogManager$$anonfun$lockLogDirs$1.apply(LogManager.scala:95)
    at kafka.log.LogManager$$anonfun$lockLogDirs$1.apply(LogManager.scala:92)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:233)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:233)
    at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:34)
    at scala.collection.mutable.WrappedArray.foreach(WrappedArray.scala:33)
    at scala.collection.TraversableLike$class.map(TraversableLike.scala:233)
    at scala.collection.mutable.WrappedArray.map(WrappedArray.scala:33)
    at kafka.log.LogManager.lockLogDirs(LogManager.scala:92)
    at kafka.log.LogManager.init(LogManager.scala:55)
    at kafka.server.KafkaServer.createLogManager(KafkaServer.scala:275)
    at kafka.server.KafkaServer.startup(KafkaServer.scala:72)
    at kafka.server.KafkaServerStartable.startup(KafkaServerStartable.scala:34)
    at kafka.Kafka$.main(Kafka.scala:46)
    at kafka.Kafka.main(Kafka.scala)
[2015-05-23 12:16:42,023] INFO [Kafka Server 0], shutting down (kafka.server.KafkaServer)
[2015-05-23 12:16:42,030] INFO Terminate ZkClient event thread. (org.I0Itec.zkclient.ZkEventThread)
[2015-05-23 12:16:42,036] INFO EventThread shut down (org.apache.zookeeper.ClientCnxn)
[2015-05-23 12:16:42,037] INFO Session: 0x14d7f85f7a3 closed (org.apache.zookeeper.ZooKeeper)
[2015-05-23 12:16:42,038] INFO [Kafka Server 0], shut down completed (kafka.server.KafkaServer)
[2015-05-23 12:16:42,040] INFO [Kafka Server 0], shutting down (kafka.server.KafkaServer)
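To answer the "what command(s)" question concretely, a couple of standard checks on a Linux box (the path is the one from the error message) would be:

    # Show any process that currently holds the lock file or the log directory open.
    lsof /tmp/kafka-logs/.lock
    fuser -v /tmp/kafka-logs

    # Look for another broker JVM that is still running.
    ps aux | grep -i '[k]afka.Kafka'

If either of the first two commands prints a PID, another Kafka instance (or a half-dead one) still owns /tmp/kafka-logs; stopping that process, or pointing log.dirs at a different directory, should clear the startup failure.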
Re: Replication tools to move topics/partitions gradually
Todd, this is very promising. Do you know when we will be able to see your tools released to the public?

On Sun, May 24, 2015 at 7:54 PM, Todd Palino tpal...@gmail.com wrote:

We've built tools on top of it that both build the reassignment list from less input (like "clone this broker to that one") and break it down into a configurable number of discrete moves so it doesn't tank the cluster. And yes, I've finally started the process of separating them from the LinkedIn-specific tooling so we can release them to everyone else :)

-Todd
Re: Kafka broker - IP address instead of host name
If you set advertised.hostname in server.properties to the IP address, the IP will be registered in ZooKeeper.

On Fri, May 22, 2015 at 2:20 PM, Achanta Vamsi Subhash achanta.va...@flipkart.com wrote:

Hi, Currently Kafka brokers register the hostname in ZooKeeper:

[zk: localhost:2181(CONNECTED) 5] get /varadhi/kafka/brokers/ids/0
{jmx_port:,timestamp:1427704934158,host:currHostName,version:1,port:9092}

Is there any config to make it use the IP address instead, so that we don't need a DNS lookup for the hostname?

--
Regards
Vamsi Subhash
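For illustration, the change Gwen describes is a one-line edit to the broker's server.properties. The address below is a placeholder; in the 0.8.x property files the key is spelled advertised.host.name, with advertised.port alongside it:

    # server.properties: advertise a fixed IP to ZooKeeper and clients
    # instead of the machine's resolved hostname.
    advertised.host.name=192.0.2.10
    advertised.port=9092

After a broker restart, the registration under /brokers/ids/<id> then carries the IP rather than the hostname, so clients no longer depend on a DNS lookup to reach the broker.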