Re: Kafka on yarn

2014-07-23 Thread Gwen Shapira
Hi,

Can we discuss for a moment the use-case of Kafka-on-YARN?

I (as a Cloudera field engineer) typically advise my customers to
install Kafka on its own dedicated nodes, to allow Kafka uninterrupted
access to disks. Hadoop processes tend to be a bit IO heavy. Also, I
can't see any benefit from co-locating Kafka and HDFS.

Since YARN does not manage IO yet, running Kafka on a Hadoop cluster
under YARN won't solve this problem in the near future.

The other problem is that we typically want brokers to be long
running, and YARN is poorly designed for that (see our hacks for Llama
as an example).

And yet another problem: for resource management to work, we need to
be able to add resources to a process and take them away. AFAIK, the
only way YARN can reclaim memory from a Java process is to kill it
(since there's no good way to force Java to give memory back to the
OS). I doubt we want to do that for Kafka.

I'd love to hear from those interested in Kafka+YARN what they
expect to gain from the combination.

Gwen


On Wed, Jul 23, 2014 at 2:37 PM, hsy...@gmail.com  wrote:
> Hi guys,
>
> Kafka is getting more and more popular and in most cases people run kafka
> as long-term service in the cluster. Is there a discussion of running kafka
> on yarn cluster which we can utilize the convenient configuration/resource
> management and HA.  I think there is a big potential and requirement for
> that.
> I found a project https://github.com/kkasravi/kafka-yarn. But is there a
> official roadmap/plan for this?
>
> Thank you very much!
>
> Best,
> Siyuan


Re: Kafka on yarn

2014-07-23 Thread hsy...@gmail.com
Thanks, guys, for sharing your knowledge. Are there any other concerns on
the producer/consumer side? My understanding is that the high-level consumer
and the producer refresh the cluster metadata and detect leadership changes
or node failures. I guess there shouldn't be anything to worry about if I
delete one broker and add it back on another node at run time?
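
For reference, here is a minimal sketch of the 0.8-era client settings that
govern how quickly that recovery happens. The property names are taken from
the 0.8 docs as I understand them; the broker/ZooKeeper addresses and values
are placeholders, not recommendations:

import java.util.Properties;

// Sketch: client settings related to metadata refresh and send retries.
Properties producerProps = new Properties();
producerProps.put("metadata.broker.list", "broker1:9092,broker2:9092"); // placeholder
// Refresh topic metadata periodically so leadership changes are picked up.
producerProps.put("topic.metadata.refresh.interval.ms", "60000");
// Retry sends that fail while a new leader is being elected.
producerProps.put("message.send.max.retries", "3");
producerProps.put("retry.backoff.ms", "100");

Properties consumerProps = new Properties();
consumerProps.put("zookeeper.connect", "zk1:2181"); // placeholder
consumerProps.put("group.id", "example-group");
// The high-level consumer rebalances when brokers or partitions change.
consumerProps.put("rebalance.max.retries", "4");
consumerProps.put("rebalance.backoff.ms", "2000");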


On Wed, Jul 23, 2014 at 4:44 PM, Kam Kasravi 
wrote:

> Thanks Joe for the input related to Mesos as well as acknowledging the
> need for YARN to support this type of cluster allocation - long running
> services with node locality priority.
>
> Thanks Jay - That's an interesting fact that I wasn't aware of - though I
> imagine there could possibly be a long latency for the replica data to be
> transferred to the new broker (depending on #/size of partitions). It does
> open up some possibilities to restart brokers on app master restart using
> different containers  (as well as some complications if an old container
> with old data were reallocated on restart). I had used zookeeper to store
> broker locations so the app master on restart would look for this
> information and attempt to reallocate containers on these nodes.  All this
> said, would this be part of kafka or some other framework? I can see kafka
> benefitting from this at the same time kafka's appeal IMO is it's
> simplicity. Spark has chosen to include YARN within its distribution, not
> sure what the kafka team thinks.
>
>
>
> On Wednesday, July 23, 2014 4:19 PM, Jay Kreps 
> wrote:
>
>
>
> Hey Kam,
>
> It would be nice to have a way to get a failed node back with it's
> original data, but this isn't strictly necessary, it is just a good
> optimization. As long as you run with replication you can restart a
> broker elsewhere with no data, and it will restore it's state off the
> other replicas.
>
> -Jay
>
>
> On Wed, Jul 23, 2014 at 3:47 PM, Kam Kasravi
>  wrote:
> > Hi
> >
> > Kafka-on-yarn requires YARN to consistently allocate a kafka broker at a
> particular resource since the broker needs to always use its local data.
> YARN doesn't do this well, unless you provide (override) the default
> scheduler (CapacityScheduler or FairScheduler). SequenceIO did something
> along these lines for a different use case. Unfortunately replacing the
> scheduler is a global operation which would affect all App masters.
> Additionally one could argue that the broker should be run as an OS service
> and auto restarted on failure if necessary. Slider (incubating) did some of
> this groundwork but YARN still has lots of limitations in providing
> guarantees to consistently allocate a container on a particular node
> especially on appmaster restart (eg ResourceManager dies). That said, it
> might be worthwhile to enumerate all of this here with some possible
> solutions. If there is interest I could certainly list the relevant JIRA's
> along with some additional JIRA's
> >  required IMO.
> >
> > Thanks
> > Kam
> >
> >
> > On Wednesday, July 23, 2014 2:37 PM, "hsy...@gmail.com" <
> hsy...@gmail.com> wrote:
> >
> >
> >
> > Hi guys,
> >
> > Kafka is getting more and more popular and in most cases people run kafka
> > as long-term service in the cluster. Is there a discussion of running
> kafka
> > on yarn cluster which we can utilize the convenient
> configuration/resource
> > management and HA.  I think there is a big potential and requirement for
> > that.
> > I found a project https://github.com/kkasravi/kafka-yarn. But is there a
> > official roadmap/plan for this?
> >
> > Thank you very much!
> >
> > Best,
> > Siyuan
>


Re: num.partitions vs CreateTopicCommand.main(args)

2014-07-23 Thread Mingtao Zhang
Thank you for the clarification!

In fact, the config instance is our own file ...

Mingtao


On Wed, Jul 23, 2014 at 7:57 PM, Guozhang Wang  wrote:

> num.partitions is only used as a default value when the createTopic command
> does not specify the num.partitions or it is automatically created. In your
> case since you always use its value in the createTopic you will always can
> one partition. Try change your code to sth. like:
>
> String[] args = new String[]{
> "--zookeeper", config.getString("zookeeper"),
> "--topic", config.getString("topic"),
> "--replica", config.getString("replicas"),
> "--partition", "8"
> };
>
> CreateTopicCommand.main(args);
>
>
>
> On Wed, Jul 23, 2014 at 4:38 PM, Mingtao Zhang 
> wrote:
>
> > Hi All,
> >
> > In kafka.properties, I put (forgot to change):
> >
> > num.partitions=1
> >
> > While I create topics programatically:
> >
> > String[] args = new String[]{
> > "--zookeeper", config.getString("zookeeper"),
> > "--topic", config.getString("topic"),
> > "--replica", config.getString("replicas"),
> > "--partition", config.getString("partitions")
> > };
> >
> > CreateTopicCommand.main(args);
> >
> > The performance engineer told me only one consumer thread is actively
> > working even I have 4 consumer threads started (could see when debugging
> or
> > in thread dump); and 4 partitions configured from the args.
> >
> > It seems that num.partitions is still controlling the parallelism. Do I
> > need to change this num.partitions accordingly? Could I remove it? What
> is
> > I have different parallel requirement for different topic?
> >
> > Thank you in advance!
> >
> > Best Regards,
> > Mingtao
> >
>
>
>
> --
> -- Guozhang
>


Re: Kafka on yarn

2014-07-23 Thread Jay Kreps
Yeah, restoring data is definitely expensive. If you have 5TB/machine
then you will need to restore 5TB of data. Running this way, there
is no particular functionality you need out of the app master other
than setting the right node id.

Obviously you do need an HA ResourceManager to make this work. I think
you also need a way to push new broker code one node at a time, to
upgrade Kafka itself or to change a config.

The feature YARN needs in order to run this kind of stateful
service more happily is a backoff before reassigning a container and
cleaning up its data on disk when the process fails. Kafka itself actually
tracks the last known location of a given node even if it is down, so
a Kafka app master could request the same machine it was previously on
and reuse its data if it is still there.
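
For illustration only, a rough sketch of what such a locality request could
look like with the YARN AMRMClient API. The host name, memory, and priority
below are placeholders, and this is not taken from any existing Kafka-on-YARN
code; whether the scheduler honors the hard-locality hint depends on the YARN
version and scheduler:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.client.api.AMRMClient;
import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest;

// Sketch: ask YARN for a container on the node that previously hosted a broker,
// so the broker can reuse its on-disk log data if it is still there.
AMRMClient<ContainerRequest> rmClient = AMRMClient.createAMRMClient();
rmClient.init(new Configuration());
rmClient.start();
// ... registerApplicationMaster(...) omitted ...

String lastKnownHost = "worker-07.example.com";       // placeholder, e.g. read from ZooKeeper
Resource capability = Resource.newInstance(4096, 2);  // 4 GB, 2 vcores (illustrative)
Priority priority = Priority.newInstance(0);

// relaxLocality=false asks the scheduler not to fall back to other nodes.
ContainerRequest request = new ContainerRequest(
    capability, new String[]{lastKnownHost}, null, priority, false);
rmClient.addContainerRequest(request);
// ... allocate() loop and container launch omitted ...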

-Jay

On Wed, Jul 23, 2014 at 4:44 PM, Kam Kasravi
 wrote:
> Thanks Joe for the input related to Mesos as well as acknowledging the need 
> for YARN to support this type of cluster allocation - long running services 
> with node locality priority.
>
> Thanks Jay - That's an interesting fact that I wasn't aware of - though I 
> imagine there could possibly be a long latency for the replica data to be 
> transferred to the new broker (depending on #/size of partitions). It does 
> open up some possibilities to restart brokers on app master restart using 
> different containers  (as well as some complications if an old container with 
> old data were reallocated on restart). I had used zookeeper to store broker 
> locations so the app master on restart would look for this information and 
> attempt to reallocate containers on these nodes.  All this said, would this 
> be part of kafka or some other framework? I can see kafka benefitting from 
> this at the same time kafka's appeal IMO is it's simplicity. Spark has chosen 
> to include YARN within its distribution, not sure what the kafka team thinks.
>
>
>
> On Wednesday, July 23, 2014 4:19 PM, Jay Kreps  wrote:
>
>
>
> Hey Kam,
>
> It would be nice to have a way to get a failed node back with it's
> original data, but this isn't strictly necessary, it is just a good
> optimization. As long as you run with replication you can restart a
> broker elsewhere with no data, and it will restore it's state off the
> other replicas.
>
> -Jay
>
>
> On Wed, Jul 23, 2014 at 3:47 PM, Kam Kasravi
>  wrote:
>> Hi
>>
>> Kafka-on-yarn requires YARN to consistently allocate a kafka broker at a 
>> particular resource since the broker needs to always use its local data. 
>> YARN doesn't do this well, unless you provide (override) the default 
>> scheduler (CapacityScheduler or FairScheduler). SequenceIO did something 
>> along these lines for a different use case. Unfortunately replacing the 
>> scheduler is a global operation which would affect all App masters. 
>> Additionally one could argue that the broker should be run as an OS service 
>> and auto restarted on failure if necessary. Slider (incubating) did some of 
>> this groundwork but YARN still has lots of limitations in providing 
>> guarantees to consistently allocate a container on a particular node 
>> especially on appmaster restart (eg ResourceManager dies). That said, it 
>> might be worthwhile to enumerate all of this here with some possible 
>> solutions. If there is interest I could certainly list the relevant JIRA's 
>> along with some additional JIRA's
>>  required IMO.
>>
>> Thanks
>> Kam
>>
>>
>> On Wednesday, July 23, 2014 2:37 PM, "hsy...@gmail.com"  
>> wrote:
>>
>>
>>
>> Hi guys,
>>
>> Kafka is getting more and more popular and in most cases people run kafka
>> as long-term service in the cluster. Is there a discussion of running kafka
>> on yarn cluster which we can utilize the convenient configuration/resource
>> management and HA.  I think there is a big potential and requirement for
>> that.
>> I found a project https://github.com/kkasravi/kafka-yarn. But is there a
>> official roadmap/plan for this?
>>
>> Thank you very much!
>>
>> Best,
>> Siyuan


Re: Kafka on yarn

2014-07-23 Thread Steve Morin
Kam,
  Give it some time; I think Kafka on YARN is becoming a real possibility.
There are new capabilities coming out in YARN/HDFS to allow for node
groups/labels that can work with locality, and secondarily new
functionality in HDFS that, depending on the use case, can be very
interesting with in-memory files.
-Steve


On Wed, Jul 23, 2014 at 4:44 PM, Kam Kasravi 
wrote:

> Thanks Joe for the input related to Mesos as well as acknowledging the
> need for YARN to support this type of cluster allocation - long running
> services with node locality priority.
>
> Thanks Jay - That's an interesting fact that I wasn't aware of - though I
> imagine there could possibly be a long latency for the replica data to be
> transferred to the new broker (depending on #/size of partitions). It does
> open up some possibilities to restart brokers on app master restart using
> different containers  (as well as some complications if an old container
> with old data were reallocated on restart). I had used zookeeper to store
> broker locations so the app master on restart would look for this
> information and attempt to reallocate containers on these nodes.  All this
> said, would this be part of kafka or some other framework? I can see kafka
> benefitting from this at the same time kafka's appeal IMO is it's
> simplicity. Spark has chosen to include YARN within its distribution, not
> sure what the kafka team thinks.
>
>
>
> On Wednesday, July 23, 2014 4:19 PM, Jay Kreps 
> wrote:
>
>
>
> Hey Kam,
>
> It would be nice to have a way to get a failed node back with it's
> original data, but this isn't strictly necessary, it is just a good
> optimization. As long as you run with replication you can restart a
> broker elsewhere with no data, and it will restore it's state off the
> other replicas.
>
> -Jay
>
>
> On Wed, Jul 23, 2014 at 3:47 PM, Kam Kasravi
>  wrote:
> > Hi
> >
> > Kafka-on-yarn requires YARN to consistently allocate a kafka broker at a
> particular resource since the broker needs to always use its local data.
> YARN doesn't do this well, unless you provide (override) the default
> scheduler (CapacityScheduler or FairScheduler). SequenceIO did something
> along these lines for a different use case. Unfortunately replacing the
> scheduler is a global operation which would affect all App masters.
> Additionally one could argue that the broker should be run as an OS service
> and auto restarted on failure if necessary. Slider (incubating) did some of
> this groundwork but YARN still has lots of limitations in providing
> guarantees to consistently allocate a container on a particular node
> especially on appmaster restart (eg ResourceManager dies). That said, it
> might be worthwhile to enumerate all of this here with some possible
> solutions. If there is interest I could certainly list the relevant JIRA's
> along with some additional JIRA's
> >  required IMO.
> >
> > Thanks
> > Kam
> >
> >
> > On Wednesday, July 23, 2014 2:37 PM, "hsy...@gmail.com" <
> hsy...@gmail.com> wrote:
> >
> >
> >
> > Hi guys,
> >
> > Kafka is getting more and more popular and in most cases people run kafka
> > as long-term service in the cluster. Is there a discussion of running
> kafka
> > on yarn cluster which we can utilize the convenient
> configuration/resource
> > management and HA.  I think there is a big potential and requirement for
> > that.
> > I found a project https://github.com/kkasravi/kafka-yarn. But is there a
> > official roadmap/plan for this?
> >
> > Thank you very much!
> >
> > Best,
> > Siyuan
>


Re: num.partitions vs CreateTopicCommand.main(args)

2014-07-23 Thread Guozhang Wang
num.partitions is only used as a default when the create-topic command
does not specify a partition count, or when a topic is auto-created. In your
case, since you always use its value in the create-topic call, you will
always get one partition. Try changing your code to something like:

String[] args = new String[]{
"--zookeeper", config.getString("zookeeper"),
"--topic", config.getString("topic"),
"--replica", config.getString("replicas"),
"--partition", "8"
};

CreateTopicCommand.main(args);
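
A small variation on the same idea, in case different topics need different
parallelism: read a per-topic partition count from your own config instead of
hardcoding it. The "topics.<name>.partitions" key below is a made-up example
for illustration, not a Kafka setting:

// Sketch: per-topic partition counts taken from application config (hypothetical keys).
String topic = config.getString("topic");
String partitions = config.getString("topics." + topic + ".partitions"); // e.g. "8"

String[] args = new String[]{
"--zookeeper", config.getString("zookeeper"),
"--topic", topic,
"--replica", config.getString("replicas"),
"--partition", partitions
};

CreateTopicCommand.main(args);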



On Wed, Jul 23, 2014 at 4:38 PM, Mingtao Zhang 
wrote:

> Hi All,
>
> In kafka.properties, I put (forgot to change):
>
> num.partitions=1
>
> While I create topics programatically:
>
> String[] args = new String[]{
> "--zookeeper", config.getString("zookeeper"),
> "--topic", config.getString("topic"),
> "--replica", config.getString("replicas"),
> "--partition", config.getString("partitions")
> };
>
> CreateTopicCommand.main(args);
>
> The performance engineer told me only one consumer thread is actively
> working even I have 4 consumer threads started (could see when debugging or
> in thread dump); and 4 partitions configured from the args.
>
> It seems that num.partitions is still controlling the parallelism. Do I
> need to change this num.partitions accordingly? Could I remove it? What is
> I have different parallel requirement for different topic?
>
> Thank you in advance!
>
> Best Regards,
> Mingtao
>



-- 
-- Guozhang


Re: Kafka on yarn

2014-07-23 Thread Kam Kasravi
Thanks Joe for the input related to Mesos, as well as for acknowledging the
need for YARN to support this type of cluster allocation - long-running
services with node-locality priority.

Thanks Jay - that's an interesting fact I wasn't aware of, though I imagine
there could be a long latency for the replica data to be transferred to the
new broker (depending on the number/size of partitions). It does open up some
possibilities to restart brokers on app master restart using different
containers (as well as some complications if an old container with old data
were reallocated on restart). I had used ZooKeeper to store broker locations,
so the app master on restart would look for this information and attempt to
reallocate containers on those nodes. All this said, would this be part of
Kafka or some other framework? I can see Kafka benefiting from this; at the
same time, Kafka's appeal IMO is its simplicity. Spark has chosen to include
YARN within its distribution; not sure what the Kafka team thinks.
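
For what it's worth, here is a minimal sketch of that ZooKeeper bookkeeping.
The znode path and data format are assumptions for illustration only, not
what kafka-yarn or any other project actually uses:

import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.ZooKeeper;

// Sketch: record which host a broker container was placed on, so a restarted
// app master can try to re-request containers on the same nodes.
// Assumes the parent znodes (/kafka-yarn/brokers) already exist.
public class BrokerLocationRegistry {
    private final ZooKeeper zk;

    public BrokerLocationRegistry(ZooKeeper zk) {
        this.zk = zk;
    }

    public void recordLocation(int brokerId, String host) throws Exception {
        String path = "/kafka-yarn/brokers/" + brokerId;   // hypothetical path
        byte[] data = host.getBytes("UTF-8");
        if (zk.exists(path, false) == null) {
            zk.create(path, data, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
        } else {
            zk.setData(path, data, -1);
        }
    }

    public String lastKnownHost(int brokerId) throws Exception {
        byte[] data = zk.getData("/kafka-yarn/brokers/" + brokerId, false, null);
        return new String(data, "UTF-8");
    }
}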



On Wednesday, July 23, 2014 4:19 PM, Jay Kreps  wrote:
 


Hey Kam,

It would be nice to have a way to get a failed node back with it's
original data, but this isn't strictly necessary, it is just a good
optimization. As long as you run with replication you can restart a
broker elsewhere with no data, and it will restore it's state off the
other replicas.

-Jay


On Wed, Jul 23, 2014 at 3:47 PM, Kam Kasravi
 wrote:
> Hi
>
> Kafka-on-yarn requires YARN to consistently allocate a kafka broker at a 
> particular resource since the broker needs to always use its local data. YARN 
> doesn't do this well, unless you provide (override) the default scheduler 
> (CapacityScheduler or FairScheduler). SequenceIO did something along these 
> lines for a different use case. Unfortunately replacing the scheduler is a 
> global operation which would affect all App masters. Additionally one could 
> argue that the broker should be run as an OS service and auto restarted on 
> failure if necessary. Slider (incubating) did some of this groundwork but 
> YARN still has lots of limitations in providing guarantees to consistently 
> allocate a container on a particular node especially on appmaster restart (eg 
> ResourceManager dies). That said, it might be worthwhile to enumerate all of 
> this here with some possible solutions. If there is interest I could 
> certainly list the relevant JIRA's along with some additional JIRA's
>  required IMO.
>
> Thanks
> Kam
>
>
> On Wednesday, July 23, 2014 2:37 PM, "hsy...@gmail.com"  
> wrote:
>
>
>
> Hi guys,
>
> Kafka is getting more and more popular and in most cases people run kafka
> as long-term service in the cluster. Is there a discussion of running kafka
> on yarn cluster which we can utilize the convenient configuration/resource
> management and HA.  I think there is a big potential and requirement for
> that.
> I found a project https://github.com/kkasravi/kafka-yarn. But is there a
> official roadmap/plan for this?
>
> Thank you very much!
>
> Best,
> Siyuan

num.partitions vs CreateTopicCommand.main(args)

2014-07-23 Thread Mingtao Zhang
Hi All,

In kafka.properties, I put (forgot to change):

num.partitions=1

While I create topics programmatically:

String[] args = new String[]{
"--zookeeper", config.getString("zookeeper"),
"--topic", config.getString("topic"),
"--replica", config.getString("replicas"),
"--partition", config.getString("partitions")
};

CreateTopicCommand.main(args);

The performance engineer told me only one consumer thread is actively
working, even though I have 4 consumer threads started (visible when debugging
or in a thread dump) and 4 partitions configured via the args.

It seems that num.partitions is still controlling the parallelism. Do I
need to change num.partitions accordingly? Could I remove it? What if
I have different parallelism requirements for different topics?

Thank you in advance!

Best Regards,
Mingtao


Re: Kafka on yarn

2014-07-23 Thread Jay Kreps
Hey Kam,

It would be nice to have a way to get a failed node back with its
original data, but this isn't strictly necessary; it is just a good
optimization. As long as you run with replication, you can restart a
broker elsewhere with no data, and it will restore its state off the
other replicas.

-Jay

On Wed, Jul 23, 2014 at 3:47 PM, Kam Kasravi
 wrote:
> Hi
>
> Kafka-on-yarn requires YARN to consistently allocate a kafka broker at a 
> particular resource since the broker needs to always use its local data. YARN 
> doesn't do this well, unless you provide (override) the default scheduler 
> (CapacityScheduler or FairScheduler). SequenceIO did something along these 
> lines for a different use case. Unfortunately replacing the scheduler is a 
> global operation which would affect all App masters. Additionally one could 
> argue that the broker should be run as an OS service and auto restarted on 
> failure if necessary. Slider (incubating) did some of this groundwork but 
> YARN still has lots of limitations in providing guarantees to consistently 
> allocate a container on a particular node especially on appmaster restart (eg 
> ResourceManager dies). That said, it might be worthwhile to enumerate all of 
> this here with some possible solutions. If there is interest I could 
> certainly list the relevant JIRA's along with some additional JIRA's
>  required IMO.
>
> Thanks
> Kam
>
>
> On Wednesday, July 23, 2014 2:37 PM, "hsy...@gmail.com"  
> wrote:
>
>
>
> Hi guys,
>
> Kafka is getting more and more popular and in most cases people run kafka
> as long-term service in the cluster. Is there a discussion of running kafka
> on yarn cluster which we can utilize the convenient configuration/resource
> management and HA.  I think there is a big potential and requirement for
> that.
> I found a project https://github.com/kkasravi/kafka-yarn. But is there a
> official roadmap/plan for this?
>
> Thank you very much!
>
> Best,
> Siyuan


Re: Kafka on yarn

2014-07-23 Thread Joe Stein
There are folks that run Kafka brokers on Apache Mesos. I don't know of
anyone running Kafka brokers on YARN, but if there were, I would hope they
would chime in.

Without getting into a long debate about Mesos vs YARN, I do agree that
cluster resource allocation is an important direction for the industry
as a whole. There was a discussion about this on the dev list some time
back (maybe last November), and there isn't really anything Kafka needs
to do to make this work. I had started that discussion and a wiki page,
and at the end of it was able to get Kafka running without any changes to
it. Each of these resource negotiator systems provides ways for standalone
systems to run on them, to avoid custom schedulers having to be
developed. Their (Mesos/YARN/etc.) success is somewhat dictated by not
forcing applications to do much of anything to gain the
benefits they provide =8^)

If you are interested in running Kafka brokers on Mesos you can do so using
the Apache Aurora scheduler. Some scripts that will launch that for you on
a Mesos cluster can be found here:
https://github.com/pegasussolutions/borealis (along with the ZooKeeper
ensemble if you want). I know folks also use Marathon (another Mesos
scheduler) for this too; I don't know whether those scripts were open
sourced, but it is possible.

Running consumers on Mesos and/or YARN has become more prevalent (almost
typical) with systems like Spark, Samza and Storm taking up the data
processing from brokers and those systems being launched on Mesos and/or
YARN clusters.

The same goes for producers, but that falls a bit more into just running
whatever application is producing the data on Mesos and/or YARN and being
able to produce to Kafka brokers from within that application.

/***
 Joe Stein
 Founder, Principal Consultant
 Big Data Open Source Security LLC
 http://www.stealth.ly
 Twitter: @allthingshadoop 
/


On Wed, Jul 23, 2014 at 5:37 PM, hsy...@gmail.com  wrote:

> Hi guys,
>
> Kafka is getting more and more popular and in most cases people run kafka
> as long-term service in the cluster. Is there a discussion of running kafka
> on yarn cluster which we can utilize the convenient configuration/resource
> management and HA.  I think there is a big potential and requirement for
> that.
> I found a project https://github.com/kkasravi/kafka-yarn. But is there a
> official roadmap/plan for this?
>
> Thank you very much!
>
> Best,
> Siyuan
>


Re: Kafka on yarn

2014-07-23 Thread Kam Kasravi
Hi 

Kafka-on-YARN requires YARN to consistently allocate a Kafka broker on a
particular node, since the broker needs to always use its local data. YARN
doesn't do this well unless you provide (override) the default scheduler
(CapacityScheduler or FairScheduler). SequenceIO did something along these
lines for a different use case. Unfortunately, replacing the scheduler is a
global operation which would affect all app masters. Additionally, one could
argue that the broker should be run as an OS service and auto-restarted on
failure if necessary. Slider (incubating) did some of this groundwork, but YARN
still has lots of limitations in providing guarantees to consistently allocate
a container on a particular node, especially on app master restart (e.g. the
ResourceManager dies). That said, it might be worthwhile to enumerate all of
this here with some possible solutions. If there is interest I could certainly
list the relevant JIRAs, along with some additional JIRAs required IMO.

Thanks
Kam


On Wednesday, July 23, 2014 2:37 PM, "hsy...@gmail.com"  
wrote:
 


Hi guys,

Kafka is getting more and more popular and in most cases people run kafka
as long-term service in the cluster. Is there a discussion of running kafka
on yarn cluster which we can utilize the convenient configuration/resource
management and HA.  I think there is a big potential and requirement for
that.
I found a project https://github.com/kkasravi/kafka-yarn. But is there a
official roadmap/plan for this?

Thank you very much!

Best,
Siyuan

Kafka on yarn

2014-07-23 Thread hsy...@gmail.com
Hi guys,

Kafka is getting more and more popular, and in most cases people run Kafka
as a long-running service in the cluster. Has there been any discussion of
running Kafka on a YARN cluster, so that we can take advantage of YARN's
convenient configuration/resource management and HA? I think there is big
potential and demand for that.
I found a project, https://github.com/kkasravi/kafka-yarn, but is there an
official roadmap/plan for this?

Thank you very much!

Best,
Siyuan


Re: Partitions per Machine for a topic

2014-07-23 Thread Philip O'Toole
Brokers can host multiple partitions for the same topic without any problems.
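
For example, using the CreateTopicCommand pattern from the num.partitions
thread on this list, a 5-partition topic on a 3-broker cluster would look
roughly like this (the ZooKeeper address and topic name are placeholders):

// Sketch: 5 partitions spread over a 3-broker cluster; each broker will end up
// hosting more than one partition of this topic.
String[] args = new String[]{
"--zookeeper", "zk1:2181",
"--topic", "my-topic",
"--replica", "3",
"--partition", "5"
};
CreateTopicCommand.main(args);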

Philip

 
-
http://www.philipotoole.com



On Wednesday, July 23, 2014 2:15 PM, Kashyap Mhaisekar  
wrote:
HI,
Is the maximum no. of partitions for a topic dependent on the no. of
machines in a kafka cluster?
For e.g., if I have 3 machines in a cluster, can I have 5 partitions with a
caveat that one machine can host multiple partitions for a given topic?

Regards,
Kashyap



Partitions per Machine for a topic

2014-07-23 Thread Kashyap Mhaisekar
Hi,
Is the maximum number of partitions for a topic dependent on the number of
machines in a Kafka cluster?
For example, if I have 3 machines in a cluster, can I have 5 partitions, with
the caveat that one machine can host multiple partitions for a given topic?

Regards,
Kashyap


Re: [DISCUSS] Kafka Security Specific Features

2014-07-23 Thread Chris Neal
Pramod,

I got that same error when following the configuration from Raja's
presentation earlier in this thread. If you look at the usage for
kafka-console-producer.sh, it is slightly different, and it is also slightly
different from the Scala code for ConsoleProducer. :)

When I changed this:

bin/kafka-console-producer.sh --broker-list n5:9092:true --topic test

to this:

bin/kafka-console-producer.sh --broker-list n5:9092 --secure
--client.security.file config/client.security.properties --topic test

I was able to push messages to the topic, although I got a WARN about the
property "topic" not being valid, even though it is required.

Also, the Producer reported this warning to me:

[2014-07-23 20:45:24,509] WARN Attempt to reinitialize auth context
(kafka.network.security.SecureAuth$)

and the broker gave me this:
[2014-07-23 20:45:24,114] INFO begin ssl handshake for
n5.example.com/192.168.1.144:48817//192.168.1.144:9092
(kafka.network.security.SSLSocketChannel)
[2014-07-23 20:45:24,374] INFO finished ssl handshake for
n5.example.com/192.168.1.144:48817//192.168.1.144:9092
(kafka.network.security.SSLSocketChannel)
[2014-07-23 20:45:24,493] INFO Closing socket connection to
n5.example.com/192.168.1.144. (kafka.network.Processor)
[2014-07-23 20:45:24,555] INFO begin ssl handshake for
n5.example.com/192.168.1.144:48818//192.168.1.144:9092
(kafka.network.security.SSLSocketChannel)
[2014-07-23 20:45:24,566] INFO finished ssl handshake for
n5.example.com/192.168.1.144:48818//192.168.1.144:9092
(kafka.network.security.SSLSocketChannel)

It's like it did the SSL piece twice :)

Subsequent puts to the topic did not exhibit this behavior though:

root@n5[937]:~/kafka_2.10-0-8-2-0.1.0.0> bin/kafka-console-producer.sh
--broker-list n5:9092 --secure --client.security.file
config/client.security.properties --topic test
[2014-07-23 20:45:17,530] WARN Property topic is not valid
(kafka.utils.VerifiableProperties)
1
[2014-07-23 20:45:24,509] WARN Attempt to reinitialize auth context
(kafka.network.security.SecureAuth$)
2
3
4

Consuming worked with these options:

root@n5[918]:~/kafka_2.10-0-8-2-0.1.0.0> bin/kafka-console-consumer.sh
--topic test --zookeeper n5:2181 --from-beginning --security.config.file
config/client.security.properties
1
2
3
4
^CConsumed 5 messages

I hope that helps!
Chris


On Tue, Jul 22, 2014 at 2:10 PM, Pramod Deshmukh  wrote:

> Anyone getting this issue. Is it something related to environment or it is
> the code. Producer works fine when run with secure=false (no security)
> mode.
>
>
> pdeshmukh$ bin/kafka-console-producer.sh --broker-list localhost:9092:true
> --topic secureTopic
>
> [2014-07-18 13:12:29,817] WARN Property topic is not valid
> (kafka.utils.VerifiableProperties)
>
> Hare Krishna
>
> [2014-07-18 13:12:45,256] WARN Fetching topic metadata with correlation id
> 0 for topics [Set(secureTopic)] from broker
> [id:0,host:localhost,port:9092,secure:true] failed
> (kafka.client.ClientUtils$)
>
> java.io.EOFException: Received -1 when reading from channel, socket has
> likely been closed.
>
> at kafka.utils.Utils$.read(Utils.scala:381)
>
> at
>
> kafka.network.BoundedByteBufferReceive.readFrom(BoundedByteBufferReceive.scala:67)
>
> at kafka.network.Receive$class.readCompletely(Transmission.scala:56)
>
> at
>
> kafka.network.BoundedByteBufferReceive.readCompletely(BoundedByteBufferReceive.scala:29)
>
> at kafka.network.BlockingChannel.receive(BlockingChannel.scala:102)
>
> at kafka.producer.SyncProducer.liftedTree1$1(SyncProducer.scala:79)
>
> at
>
> kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:76)
>
> at kafka.producer.SyncProducer.send(SyncProducer.scala:117)
>
> at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:58)
>
> at
> kafka.producer.BrokerPartitionInfo.updateInfo(BrokerPartitionInfo.scala:82)
>
> at
>
> kafka.producer.async.DefaultEventHandler$$anonfun$handle$1.apply$mcV$sp(DefaultEventHandler.scala:67)
>
> at kafka.utils.Utils$.swallow(Utils.scala:172)
>
> at kafka.utils.Logging$class.swallowError(Logging.scala:106)
>
> at kafka.utils.Utils$.swallowError(Utils.scala:45)
>
> at
>
> kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:67)
>
> at
>
> kafka.producer.async.ProducerSendThread.tryToHandle(ProducerSendThread.scala:104)
>
> at
>
> kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:87)
>
> at
>
> kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:67)
>
> at scala.collection.immutable.Stream.foreach(Stream.scala:526)
>
> at
>
> kafka.producer.async.ProducerSendThread.processEvents(ProducerSendThread.scala:66)
>
> at kafka.producer.async.ProducerSendThread.run(ProducerSendThread.scala:44)
>
>
> On Fri, Jul 18, 2014 at 1:20 PM, Pramod Deshmukh 
> wrote:
>
> > Thanks Joe, I don't see any Out of memory error. Now I get exception when
> > Producer fetches metadata for a topic
> >
> > Here is how I created the

Re: Kafka consumer per topic

2014-07-23 Thread Philip O'Toole
How many partitions are in your topic? Are you talking about producing or
consuming? All those factors will determine the number of TCP connections to
your Kafka cluster.


In any event, Kafka can support lots, and lots, and lots of connections (I've
run systems with hundreds of connections to a 3-broker Kafka cluster). Almost
certainly you'll hit bottlenecks elsewhere in your system before you cause
Kafka problems. If in doubt, be sure to monitor your Kafka system (CPU, IO,
RAM, etc.). Be sure to keep an eye on ZooKeeper too, as it also has more work
to do as the number of connections to Kafka increases.
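
To make the consuming side concrete, here is a sketch of the 0.8 high-level
consumer reading several topics through a single connector. The topic names,
ZooKeeper address, and stream counts are placeholders:

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;

import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;

// Sketch: one high-level consumer connector handling several topics,
// rather than one connection per topic.
Properties props = new Properties();
props.put("zookeeper.connect", "zk1:2181");   // placeholder
props.put("group.id", "example-group");

ConsumerConnector connector =
    Consumer.createJavaConsumerConnector(new ConsumerConfig(props));

// Ask for 4 streams for topicA (e.g. one per partition) and 1 for topicB.
Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
topicCountMap.put("topicA", 4);
topicCountMap.put("topicB", 1);

Map<String, List<KafkaStream<byte[], byte[]>>> streams =
    connector.createMessageStreams(topicCountMap);
// Hand each KafkaStream to its own thread and iterate over it.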


Philip

 
-
http://www.philipotoole.com 


On Wednesday, July 23, 2014 10:43 AM, Nickolas Simi  wrote:
 


Hello All,

I hope that this is the right place for this question, I am trying to determine 
if I have a separate connection per kafka topic that I want to consume if that 
would cause any performance, or usage problems for my kafka servers or the 
clients?

Thank you,

Nick


Kafka consumer per topic

2014-07-23 Thread Nickolas Simi
Hello All,

I hope this is the right place for this question. I am trying to determine
whether having a separate connection for each Kafka topic I want to consume
would cause any performance or usage problems for my Kafka servers or the
clients.

Thank you,

Nick


Re: much reduced io utilization after upgrade to 0.8.0 -> 0.8.1.1

2014-07-23 Thread Jason Rosenberg
Thanks for the improvement!
(I'm not explicitly configuring fsync policy)

Jason


On Wed, Jul 23, 2014 at 12:33 PM, Jay Kreps  wrote:

> Yes, it could definitely be related to KAFKA-615. The default in 0.8.1
> is to let the OS handle disk writes. This is much more efficient as it
> will schedule them in an order friendly to the layout on disk and do a
> good job of merging adjacent writes. However if you are explicitly
> configuring an fsync policy (either by time or number of messages)
> then this is likely not the cause.
>
> -Jay
>
> On Tue, Jul 22, 2014 at 9:37 PM, Jason Rosenberg  wrote:
> > I recently upgraded some of our kafka clusters to use 0.8.1.1 (from
> 0.8.0).
> >  It's all looking good so far.  One thing I notice though (seems like a
> > good thing) is that the iostat utilization has gone way down after the
> > upgrade.
> >
> > I'm not sure if I know exactly what could could be responsible for this,
> is
> > this an expected result.
> >
> > Is it possibly related to:
> https://issues.apache.org/jira/browse/KAFKA-615
> >
> > Thanks,
> >
> > Jason
>


Re: much reduced io utilization after upgrade to 0.8.0 -> 0.8.1.1

2014-07-23 Thread Jay Kreps
Yes, it could definitely be related to KAFKA-615. The default in 0.8.1
is to let the OS handle disk writes. This is much more efficient as it
will schedule them in an order friendly to the layout on disk and do a
good job of merging adjacent writes. However if you are explicitly
configuring an fsync policy (either by time or number of messages)
then this is likely not the cause.
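
For reference, the explicit fsync policy mentioned above corresponds to
broker settings along these lines (a sketch only; the values are illustrative
and the property names are as I understand them from the 0.8 docs):

import java.util.Properties;

// Sketch: the broker-side flush settings that make up an explicit fsync policy.
// If neither is set, 0.8.1 leaves flushing to the OS page cache (per KAFKA-615).
Properties brokerProps = new Properties();
brokerProps.put("log.flush.interval.messages", "10000"); // flush after this many messages
brokerProps.put("log.flush.interval.ms", "1000");        // or after this many milliseconds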

-Jay

On Tue, Jul 22, 2014 at 9:37 PM, Jason Rosenberg  wrote:
> I recently upgraded some of our kafka clusters to use 0.8.1.1 (from 0.8.0).
>  It's all looking good so far.  One thing I notice though (seems like a
> good thing) is that the iostat utilization has gone way down after the
> upgrade.
>
> I'm not sure if I know exactly what could could be responsible for this, is
> this an expected result.
>
> Is it possibly related to:  https://issues.apache.org/jira/browse/KAFKA-615
>
> Thanks,
>
> Jason


Re: much reduced io utilization after upgrade to 0.8.0 -> 0.8.1.1

2014-07-23 Thread Neha Narkhede
Yes, that is most likely the improvement responsible for the drop in
IO utilization you are seeing, though there were several other improvements
since 0.8.0 that could've helped as well.

Thanks,
Neha


On Tue, Jul 22, 2014 at 9:37 PM, Jason Rosenberg  wrote:

> I recently upgraded some of our kafka clusters to use 0.8.1.1 (from 0.8.0).
>  It's all looking good so far.  One thing I notice though (seems like a
> good thing) is that the iostat utilization has gone way down after the
> upgrade.
>
> I'm not sure if I know exactly what could could be responsible for this, is
> this an expected result.
>
> Is it possibly related to:
> https://issues.apache.org/jira/browse/KAFKA-615
>
> Thanks,
>
> Jason
>


Re: how to ensure strong consistency with reasonable availabilit

2014-07-23 Thread Jiang Wu (Pricehistory) (BLOOMBERG/ 731 LEX -)
Jun,
There are still other concerns regarding ack=-1. A single disk failure may cause
data loss even with ack=-1. When 2 out of 3 brokers fall out of the ISR, acknowledged
messages may be stored on the leader only. If the leader's disk then fails,
those messages are lost. In a less severe situation where the leader is
temporarily unavailable, message loss or service blocking may happen, depending
on whether unclean leader election is enabled.
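
For reference, the producer setting in question looks roughly like this with
the 0.8 producer (broker addresses and the serializer are placeholders):

import java.util.Properties;
import kafka.producer.ProducerConfig;

// Sketch: a 0.8 producer configured to wait for the full in-sync replica set
// (request.required.acks=-1) before a send is considered successful.
Properties props = new Properties();
props.put("metadata.broker.list", "broker1:9092,broker2:9092,broker3:9092");
props.put("serializer.class", "kafka.serializer.StringEncoder");
props.put("request.required.acks", "-1");   // wait for the full ISR
ProducerConfig producerConfig = new ProducerConfig(props);
// new kafka.javaapi.producer.Producer<String, String>(producerConfig) ...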

I didn't see message loss in my tests in case 3.1, where 2 brokers are restarted
in sequence. I had thought it would happen because a lagged broker could rejoin the
ISR, but as you explained, that's not the case. Thanks for your clarification.

I filed a jira to record the discussion on this topic 
https://issues.apache.org/jira/i#browse/KAFKA-1555.

Regards,
Jiang

- Original Message -
From: jun...@gmail.com
To: JIANG WU (PRICEHISTORY) (BLOOMBERG/ 731 LEX -), users@kafka.apache.org
At: Jul 22 2014 17:59:55

Jiang,

In case 3.1, when C restarts, the protocol is that C can only join ISR if
it has received all messages up to the current high watermark.

For example, let's assume that M is 10. Let's say A, B, C all have messages
at offset 100 and all those messages are committed (therefore high
watermark is at 100). Then C dies. After that, we commit 5 more messages
with both A and B (high watermark is at 105). Now, C is restarted. C is
actually not allowed to rejoin ISR until its log end offset has passed 105.
This means that C must first fetch the 5 newly committed messages before
being added to ISR.

Are you observing data loss in this case?

Thanks,

Jun


On Tue, Jul 22, 2014 at 6:21 AM, Jiang Wu (Pricehistory) (BLOOMBERG/ 731
LEX -)  wrote:

> kafka-1028 addressed another unclean leader election problem. It prevents
> a broker not in ISR from becoming a leader. The problem we are facing is
> that a broker in ISR but without complete messages may become a leader.
> It's also a kind of unclean leader election, but not the one that
> kafka-1028 addressed.
>
> Here I'm trying to give a proof that current kafka doesn't achieve the
> requirement (no message loss, no blocking when 1 broker down) due to its
> two behaviors:
> 1. when choosing a new leader from 2 followers in ISR, the one with less
> messages may be chosen as the leader
> 2. even when replica.lag.max.messages=0, a follower can stay in ISR when
> it has less messages than the leader.
>
> We consider a cluster with 3 brokers and a topic with 3 replicas. We
> analyze different cases according to the value of request.required.acks
> (acks for short). For each case and it subcases, we find situations that
> either message loss or service blocking happens. We assume that at the
> beginning, all 3 replicas, leader A, followers B and C, are in sync, i.e.,
> they have the same messages and are all in ISR.
>
> 1. acks=0, 1, 3. Obviously these settings do not satisfy the requirement.
> 2. acks=2. Producer sends a message m. It's acknowledged by A and B. At
> this time, although C hasn't received m, it's still in ISR. If A is killed,
> C can be elected as the new leader, and consumers will miss m.
> 3. acks=-1. Suppose replica.lag.max.messages=M. There are two sub-cases:
> 3.1 M>0. Suppose C be killed. C will be out of ISR after
> replica.lag.time.max.ms. Then the producer publishes M messages to A and
> B. C restarts. C will join in ISR since it is M messages behind A and B.
> Before C replicates all messages, A is killed, and C becomes leader, then
> message loss happens.
> 3.2 M=0. In this case, when the producer publishes at a high speed, B and
> C will fail out of ISR, only A keeps receiving messages. Then A is killed.
> Either message loss or service blocking will happen, depending on whether
> unclean leader election is enabled.
>
>
> From: users@kafka.apache.org At: Jul 21 2014 22:28:18
> To: JIANG WU (PRICEHISTORY) (BLOOMBERG/ 731 LEX -), users@kafka.apache.org
> Subject: Re: how to ensure strong consistency with reasonable availability
>
> You will probably need 0.8.2  which gives
> https://issues.apache.org/jira/browse/KAFKA-1028
>
>
> On Mon, Jul 21, 2014 at 6:37 PM, Jiang Wu (Pricehistory) (BLOOMBERG/ 731
> LEX -)  wrote:
>
> > Hi everyone,
> >
> > With a cluster of 3 brokers and a topic of 3 replicas, we want to achieve
> > the following two properties:
> > 1. when only one broker is down, there's no message loss, and
> > procuders/consumers are not blocked.
> > 2. in other more serious problems, for example, one broker is restarted
> > twice in a short period or two brokers are down at the same time,
> > producers/consumers can be blocked, but no message loss is allowed.
> >
> > We haven't found any producer/broker paramter combinations that achieve
> > this. If you know or think some configurations will work, please post
> > details. We have a test bed to verify any given configurations.
> >
> > In addition, I'm wondering if it's necessary to open a jira to require
> the
> > above feature?
> >
> > Than