Re: Kafka broker - IP address instead of host name

2015-05-24 Thread Achanta Vamsi Subhash
Thanks Gwen.

One more question:
Is there a way we can dynamically reload the contents of the Kafka
server.properties file without restarting the broker? An example use-case
is when a zk node goes down and a new one is brought up: we need to update
the server.properties file to reflect this. Currently there is no way to do
this other than a broker restart, and a broker restart requires a lot of
preparation before it can be triggered.

This JIRA is already filed but unresolved. We don't require all the configs
to be reloadable; only the externally variable ones need to be.

https://issues.apache.org/jira/browse/KAFKA-1229

On Sun, May 24, 2015 at 1:14 PM, Gwen Shapira gshap...@cloudera.com wrote:

 If you set advertised.host.name in server.properties to the IP address, the
 IP will be registered in ZooKeeper.


 On Fri, May 22, 2015 at 2:20 PM, Achanta Vamsi Subhash 
 achanta.va...@flipkart.com wrote:

  Hi,
 
  Currently Kafka brokers register the hostname in zookeeper.
 
  [zk: localhost:2181(CONNECTED) 5] get /varadhi/kafka/brokers/ids/0
 
 
 {jmx_port:,timestamp:1427704934158,host:currHostName,version:1,port:9092}
 
  Is there any config to make it use the IP address instead, so that we don't
  do a DNS lookup for the hostname?
 
  --
  Regards
  Vamsi Subhash
 




-- 
Regards
Vamsi Subhash


Re: Kafka broker - IP address instead of host name

2015-05-24 Thread Achanta Vamsi Subhash
OK, I was talking about a scenario where there are no DNS hostnames for the
zk nodes.

If the connection string is given with all the IP addresses of the zk hosts,
and a new host with a different IP address is brought up in the zk cluster to
replace an old node, we still need to reload the zk connection string.

If hostnames are used instead, the DNS mapping can point to the new IP, but
in a scenario where there is no DNS we need to again hard-code the new IP
address in the zk connection string and restart the broker.

One way is to put a local mapping of the zk hostname to its IP in the
/etc/hosts file and change it to the new IP when the node changes. But would
a reload of the Kafka config with the new zk nodes be a better option? And,
as you said, if we cannot reload the server.properties file, what is the
best way when there is no service discovery?
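For reference, the /etc/hosts workaround sketched above might look like this (names and IPs are invented for illustration):

```
# /etc/hosts - stable local names for the zk nodes; only this file changes
# when a node is replaced by one with a different IP
10.0.0.21  zk1.local
10.0.0.22  zk2.local
10.0.0.23  zk3.local
```

server.properties would then refer to zk1.local:2181,zk2.local:2181,zk3.local:2181 and never need to change when an IP does.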


On Sun, May 24, 2015 at 6:52 PM, Gwen Shapira gshap...@cloudera.com wrote:

 You can't dynamically re-load server properties.

 However, the norm in zookeeper is to configure the connection string with
 all the nodes in the zk cluster, so there will be no need to modify
 properties when you replace zk nodes.





-- 
Regards
Vamsi Subhash


Re: [Announcement] Hermes - pub / sub broker built on top of Kafka

2015-05-24 Thread Adam Dubiel
Hi Daniel,

First of all, sorry for the late response, I enjoyed a short vacation :)

I guess the documentation might be a bit misleading here, and we should
improve it: we do not aim to (and can't) provide higher guarantees than Kafka.

We want to be as bulletproof as possible in the REST interface segment. In
our SLA we concentrate a lot on REST availability and response time. We can
also withstand some short-term Kafka outages. Still, the only goal of the
Hermes frontend is to push events to Kafka, as only this way can we assure
our customers that an event will be delivered (and stored reliably). Thus,
we do not plan on building any distributed storage here - Kafka is our
storage.

Adam



2015-05-20 11:49 GMT+02:00 Daniel Compton daniel.compton.li...@gmail.com:

 Hi Adam

 Firstly, thanks for open sourcing this, it looks like a great tool and I
 can imagine a lot of people will find it very useful.

 I had a few thoughts reading the docs. I may have misunderstood things but
 it seems that your goal of meeting a strict SLA conflicts with your goal of
 bulletproof delivery. Even if you have a durable queue on disk, when you
 send a 202 Accepted you could still lose messages if you lost the disk
 storing the data.

 I realise this has a small chance of happening, but I don't think you can
 guarantee bulletproof delivery if only a single server stores the message
 while it's in transit, before being accepted by Kafka.

 Could you expand on the reliability guarantees you're looking to offer and
 how they can be stronger than the ones provided by Kafka?

 Thanks, Daniel.
 On Tue, 19 May 2015 at 2:57 am Adam Dubiel dubiel.a...@gmail.com wrote:

  Hello everyone,
 
  I'm technical team lead of Hermes project. I will try to answer already
  posted questions, but feel free to ask me anything.
 
  1) Can you comment on how this compares to Confluent's REST proxy?
 
  We do not perceive Hermes as a mere proxy. While Confluent's product wants
  to help services written in non-JVM languages connect to Kafka, Hermes is
  more than that. First of all, we wanted to make connecting to PubSub as
  easy as possible, hence the REST API for publishing (this is the same as
  the REST proxy), but we also convert from pull to push on the consumer
  side. This means that our consumers don't need to include Kafka drivers,
  handle retries, or worry about the linear commit offset. Instead, our
  hermes-consumer module implements retry strategies and an adaptive
  output-rate algorithm. A consumer only needs to register an HTTP (or JMS)
  endpoint that can accept POST messages, and that is all.
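Hermes doesn't publish the exact retry algorithm here, so as a hedged illustration only, a generic exponential-backoff sketch of the kind of policy a push consumer could apply (function name and all parameters are invented):

```python
# Hypothetical retry policy sketch - not Hermes's actual algorithm.
def backoff_delays(attempts, base=1.0, factor=2.0, cap=60.0):
    """Seconds to wait before each retry: exponential growth, capped."""
    return [min(cap, base * factor ** i) for i in range(attempts)]

# First retries come quickly, later ones back off up to the cap.
assert backoff_delays(5) == [1.0, 2.0, 4.0, 8.0, 16.0]
assert backoff_delays(10)[-1] == 60.0  # capped at 60 seconds
```

An adaptive output-rate algorithm would additionally shrink or grow the send rate based on the endpoint's observed error rate, but that part is not documented in the thread.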
 
  Secondly, on the publisher side, we wanted to create a somewhat bulletproof
  solution capable of accepting very sensitive data. This means we use the
  Kafka producer buffer to store unsent messages (our internal solution also
  persists this buffer to disk; this will be ported to open source soon), and
  we can guarantee a maximum response time (SLA). We are also able to use
  different levels of Kafka ACKs per topic (-1 or 1 currently).
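The two ACK levels mentioned correspond to the Kafka producer's acknowledgement setting; a hedged config sketch of the trade-off:

```properties
# Producer durability knob (sketch). -1/all waits for all in-sync replicas
# before acknowledging (for sensitive topics); 1 waits for the leader only,
# giving lower latency but possible loss on leader failover.
acks=all
# acks=1
```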
 
  Last but not least, we mitigate some administrative issues: we added tons
  of metrics that can be reported to Graphite, message state tracking for
  debugging purposes, and an easy-to-use REST API for previewing messages
  stored in Kafka or retransmitting events starting from a given timestamp
  (not an offset!).
 
 
  2) Performance
 
  We plan on making the tests public, but they are not there yet. The numbers
  in the docs come from our production environment, and they should be taken
  with a grain of salt. Hermes performance depends highly on the underlying
  Kafka cluster's performance. Our cluster is deployed in the cloud (on
  SSDs); bare-metal deployments would probably achieve much better
  performance. Still, the most important metric here is not total response
  time but Hermes's overhead over pure Kafka. This looks negligible in our
  cloud deployment (p99 < 0.2ms), but we will be crunching those numbers and
  will publish them in the docs.
 
  3) Topics/subscriptions limit
 
  We are limited by Kafka as well, though we have never encountered any
  problems with this (then again, we have only 100-150 topics). We want to
  scale out by making Hermes multi-Kafka aware (in an effort to become
  multi-DC). Currently the management node can manage multiple Kafka
  clusters, but as soon as we deploy it to production we will add some more
  documentation on architecture and deployment.
 
 
  We should create an FAQ that answers the most popular questions.
 
 
  2015-05-18 13:14 GMT+02:00 Marcin Kuthan marcin.kut...@gmail.com:
 
   Hi Warren

   With Hermes, the publisher is never blocked, even if a message has not
   been sent to the Kafka cluster, or has been sent but not yet acknowledged.
   This is especially useful if your system needs strict SLA guarantees.

   From the consumer's perspective, there is a retry policy for when the
   consumer is not able to handle a published message. In addition, Hermes
   is able to adjust the speed of sending messages to the actual situation (i.e.
   

Re: Kafka broker - IP address instead of host name

2015-05-24 Thread Gwen Shapira
You can't dynamically re-load server properties.

However, the norm in zookeeper is to configure the connection string with
all the nodes in the zk cluster, so there will be no need to modify
properties when you replace zk nodes.
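For instance, a server.properties sketch (the host addresses are invented; the /varadhi/kafka chroot matches the zk path quoted earlier in the thread):

```properties
# List every ZooKeeper node; replacing any single node then needs no
# config change, since the client fails over to the remaining nodes
zookeeper.connect=10.0.0.1:2181,10.0.0.2:2181,10.0.0.3:2181/varadhi/kafka
```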



Architecture for multiple consumers of a given message

2015-05-24 Thread Warren Henning
I'm working on a simple web application where I want the same message of a
given topic to be consumed by multiple separate consumers (one that writes
to a database, another that writes to a search index like
Solr/Elasticsearch).

Right now as this is just a small personal project I only have one
partition.

What's the standard way of doing this? A single consumer process with one
thread per kind of consumer (two threads in this case)? Multiple consumers
on a single partition?

I Googled and tried to look at the wiki but didn't get a clear answer.
Apologies if I missed something obvious.


Re: Architecture for multiple consumers of a given message

2015-05-24 Thread Daniel Compton
Hi Warren

If you're using the high level consumer, then you can just have multiple
consumer groups (one for each purpose), and run 1 consumer thread per
consumer group.
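A toy model of Daniel's suggestion (not real Kafka client code): each consumer group keeps its own offset into the same partition log, which is why one group per purpose lets both the database writer and the indexer see every message. Group names are hypothetical:

```python
# One partition's log, and one independently tracked offset per group.
log = ["m1", "m2", "m3"]
offsets = {"db-writer": 0, "indexer": 0}  # hypothetical group names

def poll(group):
    """Return all messages the group has not seen yet and advance its offset."""
    start = offsets[group]
    offsets[group] = len(log)
    return log[start:]

# Each group consumes the full log, independently of the other.
assert poll("db-writer") == ["m1", "m2", "m3"]
assert poll("indexer") == ["m1", "m2", "m3"]
```

With the real high-level consumer, the group-scoped offsets are what the `group.id` setting controls: two consumers with different group IDs each receive every message; two with the same group ID split the partitions between them.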



Replication tools to move topics/partitions gradually

2015-05-24 Thread Henry Cai
We have a Kafka cluster with 10 brokers, and we use the replication tool
(kafka-reassign-partitions.sh) when we need to add more brokers to the
cluster. But this tool tends to move too many topics/partitions around at
the same time, which causes instability. Is there an option to do it more
slowly (e.g., move one topic/partition at a time), or has someone built a
tool on top of kafka-reassign-partitions.sh?

Another use case: when a broker node goes down, do we have a tool to move
the topics/partitions served by that node to the remaining nodes (again in
a fashion that doesn't cause too much instability)?


Re: Kafka broker - IP address instead of host name

2015-05-24 Thread Achanta Vamsi Subhash
Oh ok. Got it. Thanks Gwen and Daniel.

On Mon, May 25, 2015 at 5:15 AM, Daniel Compton 
daniel.compton.li...@gmail.com wrote:

 Correct me if I'm wrong, but I thought that the zk connect config was only
 needed explicitly at startup, and the zk cluster would update the active
 zookeepers as they joined and exited. You only need to specify one zk to
 join the cluster, and it will bootstrap the rest.

 So zk changes won't require a restart, but you'll want to make sure your
 config is up to date when you do eventually come to do a restart.



-- 
Regards
Vamsi Subhash


Re: Replication tools to move topics/partitions gradually

2015-05-24 Thread Todd Palino
We've built tools on top of it that both build the reassignment list from
less information (like clone this broker to that one) and break it down
into a configurable number of discrete moves so it doesn't tank the cluster.

And yes, I've finally started the process of decoupling them from the
LinkedIn-specific tooling so we can release them to everyone else :)

-Todd
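Todd's tools aren't public yet; as a hedged sketch of the "discrete moves" idea, one can split a single kafka-reassign-partitions.sh JSON plan into single-partition steps (topic names and replica lists below are made up):

```python
import json

# A made-up reassignment plan in the JSON format kafka-reassign-partitions.sh
# consumes; topics, partitions and replica assignments are illustrative.
plan = {
    "version": 1,
    "partitions": [
        {"topic": "events", "partition": 0, "replicas": [1, 2]},
        {"topic": "events", "partition": 1, "replicas": [2, 3]},
        {"topic": "clicks", "partition": 0, "replicas": [3, 1]},
    ],
}

def split_plan(plan):
    """Split one plan into single-partition plans, to be executed one by one."""
    return [{"version": plan["version"], "partitions": [p]}
            for p in plan["partitions"]]

chunks = [json.dumps(c) for c in split_plan(plan)]
assert len(chunks) == 3
```

Each chunk could then be written to its own file and passed to kafka-reassign-partitions.sh --execute, waiting for --verify to report completion before starting the next move.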



Re: Log file of server start up error

2015-05-24 Thread Jaikiran Pai

Hi Sanjay,

Did you check that no other Kafka process is using the /tmp/kafka-logs 
folder? What command(s) did you use to verify that?
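The broker takes an exclusive file lock on the `.lock` file in each log directory at startup, so the error below means another process (or a second broker instance) already holds it. A rough Python analogue of that check, using flock on Linux rather than the JVM's file lock, just to illustrate the mechanism:

```python
# Demonstrates why a second "broker" fails to acquire the log-dir lock.
# Uses flock as a stand-in for the JVM file lock Kafka actually takes.
import fcntl
import os
import tempfile

logdir = tempfile.mkdtemp()
lock_path = os.path.join(logdir, ".lock")

f1 = open(lock_path, "w")
fcntl.flock(f1, fcntl.LOCK_EX | fcntl.LOCK_NB)  # first "broker" gets the lock

f2 = open(lock_path, "w")  # independent open -> independent lock attempt
try:
    fcntl.flock(f2, fcntl.LOCK_EX | fcntl.LOCK_NB)
    still_held = False
except OSError:
    still_held = True  # second "broker": Failed to acquire lock

assert still_held
```

In practice, listing which process has files open under /tmp/kafka-logs (for example with lsof or fuser, where available) is the quickest way to find the conflicting Kafka instance.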


-Jaikiran
On Saturday 23 May 2015 12:19 PM, Sanjay Mistry wrote:

[2015-05-23 12:16:41,624] INFO Initiating client connection,
connectString=localhost:2181 sessionTimeout=6000
watcher=org.I0Itec.zkclient.ZkClient@70808f4e
(org.apache.zookeeper.ZooKeeper)
[2015-05-23 12:16:41,659] INFO Opening socket connection to server
localhost/0:0:0:0:0:0:0:1:2181 (org.apache.zookeeper.ClientCnxn)
[2015-05-23 12:16:41,673] INFO Socket connection established to
localhost/0:0:0:0:0:0:0:1:2181, initiating session
(org.apache.zookeeper.ClientCnxn)
[2015-05-23 12:16:41,740] INFO Session establishment complete on server
localhost/0:0:0:0:0:0:0:1:2181, sessionid = 0x14d7f85f7a3, negotiated
timeout = 6000 (org.apache.zookeeper.ClientCnxn)
[2015-05-23 12:16:41,743] INFO zookeeper state changed (SyncConnected)
(org.I0Itec.zkclient.ZkClient)
[2015-05-23 12:16:42,015] FATAL Fatal error during KafkaServerStable
startup. Prepare to shutdown (kafka.server.KafkaServerStartable)
kafka.common.KafkaException: Failed to acquire lock on file .lock in
/tmp/kafka-logs. A Kafka instance in another process or thread is using
this directory.
 at
kafka.log.LogManager$$anonfun$lockLogDirs$1.apply(LogManager.scala:95)
 at
kafka.log.LogManager$$anonfun$lockLogDirs$1.apply(LogManager.scala:92)
 at
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:233)
 at
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:233)
 at
scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:34)
 at
scala.collection.mutable.WrappedArray.foreach(WrappedArray.scala:33)
 at
scala.collection.TraversableLike$class.map(TraversableLike.scala:233)
 at scala.collection.mutable.WrappedArray.map(WrappedArray.scala:33)
 at kafka.log.LogManager.lockLogDirs(LogManager.scala:92)
 at kafka.log.LogManager.<init>(LogManager.scala:55)
 at kafka.server.KafkaServer.createLogManager(KafkaServer.scala:275)
 at kafka.server.KafkaServer.startup(KafkaServer.scala:72)
 at
kafka.server.KafkaServerStartable.startup(KafkaServerStartable.scala:34)
 at kafka.Kafka$.main(Kafka.scala:46)
 at kafka.Kafka.main(Kafka.scala)
[2015-05-23 12:16:42,023] INFO [Kafka Server 0], shutting down
(kafka.server.KafkaServer)
[2015-05-23 12:16:42,030] INFO Terminate ZkClient event thread.
(org.I0Itec.zkclient.ZkEventThread)
[2015-05-23 12:16:42,036] INFO EventThread shut down
(org.apache.zookeeper.ClientCnxn)
[2015-05-23 12:16:42,037] INFO Session: 0x14d7f85f7a3 closed
(org.apache.zookeeper.ZooKeeper)
[2015-05-23 12:16:42,038] INFO [Kafka Server 0], shut down completed
(kafka.server.KafkaServer)
[2015-05-23 12:16:42,040] INFO [Kafka Server 0], shutting down
(kafka.server.KafkaServer)





Re: Replication tools to move topics/partitions gradually

2015-05-24 Thread Henry Cai
Todd,

This is very promising.  Do you know when we will be able to see your tools
released to the public?




Re: Kafka broker - IP address instead of host name

2015-05-24 Thread Gwen Shapira
If you set advertised.host.name in server.properties to the IP address, the
IP will be registered in ZooKeeper.
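For instance, a hedged server.properties sketch (the address is made up; on the 0.8.x brokers of this era the key is advertised.host.name):

```properties
# Advertise this broker to clients and ZooKeeper by IP, avoiding DNS lookups
advertised.host.name=10.1.2.3
port=9092
```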
