Max. storage for Kafka and impact

2014-12-19 Thread Achanta Vamsi Subhash
Hi,

We are using Kafka for our messaging system and we estimate about 200
TB/week in the coming months. Will this impact Kafka's performance in any way?

PS: We will have greater than 2 lakh (200,000) partitions.

-- 
Regards
Vamsi Subhash


Re: Max. storage for Kafka and impact

2014-12-19 Thread Achanta Vamsi Subhash
We definitely need a retention policy of a week. Hence.

On Fri, Dec 19, 2014 at 7:40 PM, Achanta Vamsi Subhash 
achanta.va...@flipkart.com wrote:

 Hi,

 We are using Kafka for our messaging system and we estimate about
 200 TB/week in the coming months. Will this impact Kafka's performance in
 any way?

 PS: We will have greater than 2 lakh (200,000) partitions.

 --
 Regards
 Vamsi Subhash



-- 
Regards
Vamsi Subhash


Re: Max. storage for Kafka and impact

2014-12-19 Thread nitin sharma
hi,

Few things you have to plan for:
a. Ensure that, from a resilience point of view, you have sufficient
follower replicas (brokers) for your partitions; see the sketch below.
b. In my testing of Kafka (50 TB/week) so far, I haven't seen much issue with
CPU utilization or memory. I had 24 CPUs and 32 GB RAM.
c. 200,000 partitions for 200 TB/week means around 1 GB/week/partition. Are
you sure you need so many partitions?
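
In practice, point (a) means creating topics with a replication factor greater
than one, so every partition has follower replicas on other brokers. A rough
sketch (the topic name and ZooKeeper path are only illustrative):

bin/kafka-topics.sh --create \
  --zookeeper node1:2181,node2:2181,node3:2181/kafka \
  --topic orders --partitions 50 --replication-factor 3

With replication-factor 3, each partition has one leader and two followers, so
losing a broker does not lose data as long as the other replicas are in sync.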

Regards,
Nitin Kumar Sharma.


On Fri, Dec 19, 2014 at 9:12 AM, Achanta Vamsi Subhash 
achanta.va...@flipkart.com wrote:

 We definitely need a retention policy of a week. Hence.



 --
 Regards
 Vamsi Subhash



Re: Max. storage for Kafka and impact

2014-12-19 Thread Achanta Vamsi Subhash
Yes. We do need that many partitions at the maximum, as we have a central
messaging service and thousands of topics.

On Friday, December 19, 2014, nitin sharma kumarsharma.ni...@gmail.com
wrote:

 hi,

 Few things you have to plan for:
 a. Ensure that, from a resilience point of view, you have sufficient
 follower replicas (brokers) for your partitions.
 b. In my testing of Kafka (50 TB/week) so far, I haven't seen much issue with
 CPU utilization or memory. I had 24 CPUs and 32 GB RAM.
 c. 200,000 partitions for 200 TB/week means around 1 GB/week/partition. Are
 you sure you need so many partitions?

 Regards,
 Nitin Kumar Sharma.





-- 
Regards
Vamsi Subhash


Re: Max. storage for Kafka and impact

2014-12-19 Thread Joe Stein
see some comments inline

On Fri, Dec 19, 2014 at 11:30 AM, Achanta Vamsi Subhash 
achanta.va...@flipkart.com wrote:

 We require:
 - many topics
 - ordering of messages for every topic


Ordering is only on a per partition basis so you might have to pick a
partition key that makes sense for what you are doing.
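
For example, with the new Java producer shipped in 0.8.2, records that carry
the same key hash to the same partition, so per-key ordering is preserved. A
rough sketch (broker addresses, topic and key are only illustrative):

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class KeyedSendSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "broker1:9092,broker2:9092");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        KafkaProducer<String, String> producer = new KafkaProducer<String, String>(props);
        String orderId = "order-42";  // illustrative business key that needs per-key ordering
        // Same key => same partition => the broker preserves the order of these two sends.
        producer.send(new ProducerRecord<String, String>("events", orderId, "created"));
        producer.send(new ProducerRecord<String, String>("events", orderId, "shipped"));
        producer.close();
    }
}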


 - Consumers hit different HTTP endpoints which may be slow (in a push
 model). In case of a pull model, consumers may pull at the rate at which
 they can process.
 - We need parallelism to hit the endpoints with as many consumers as
 possible. Hence, we currently have around 50 consumers/topic = 50 partitions.


I think you might be mixing up the fetch with the processing. You can have
1 partition and still have 50 messages being processed in parallel (so a
batch of messages).
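
For example, a single stream (one partition is enough) can feed a pool of 50
worker threads. A rough sketch with the 0.8 high-level consumer (topic, group
id and the process() call are only illustrative):

import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;
import kafka.message.MessageAndMetadata;

public class ParallelProcessingSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("zookeeper.connect", "node1:2181");
        props.put("group.id", "endpoint-pushers");   // illustrative group id

        ConsumerConnector connector =
                Consumer.createJavaConsumerConnector(new ConsumerConfig(props));
        Map<String, List<KafkaStream<byte[], byte[]>>> streams =
                connector.createMessageStreams(Collections.singletonMap("events", 1));
        KafkaStream<byte[], byte[]> stream = streams.get("events").get(0);

        // One fetch thread, many processing threads: parallelism does not require
        // one partition per consumer thread.
        ExecutorService workers = Executors.newFixedThreadPool(50);
        for (MessageAndMetadata<byte[], byte[]> msg : stream) {
            final byte[] payload = msg.message();
            workers.submit(new Runnable() {
                public void run() {
                    process(payload);   // e.g. POST to a slow HTTP endpoint
                }
            });
        }
    }

    private static void process(byte[] payload) { /* illustrative slow work */ }
}

Note that with auto-commit enabled, offsets can be committed before the workers
finish; committing only after processing needs more care.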

What language are you working in? How are you doing this processing
exactly?



 Currently we have:
 2000 topics x 50 = 1,00,000 partitions.


If this is really the case then you are going to need at least 250 brokers
(~ 4,000 partitions per broker).

If you do that then you are in the 200 TB per day world, which doesn't seem
to be the case.

I really think you need to strategize on your processing model some more.



 The incoming rate of ingestion at max is 100 MB/sec. We are planning for a
 big cluster with many brokers.


It is possible to handle this on just 3 brokers depending on message size;
ability to batch and durability are also factors you really need to be
thinking about.



 We have exactly the same use cases as mentioned in this video (usage at
 LinkedIn):
 https://www.youtube.com/watch?v=19DvtEC0EbQ

 To handle the zookeeper scenario, as mentioned in the above video, we are
 planning to use SSDs and would upgrade to the new consumer (0.9+) once it's
 available, as per the below video:
 https://www.youtube.com/watch?v=7TZiN521FQA

 On Fri, Dec 19, 2014 at 9:06 PM, Jayesh Thakrar
 j_thak...@yahoo.com.invalid
  wrote:

  Technically/conceptually it is possible to have 200,000 topics, but do you
  really need it like that? What do you intend to do with those messages -
  i.e. how do you foresee them being processed downstream? And are those
  topics really there to segregate different kinds of processing or different
  ids? E.g. if you were LinkedIn, Facebook or Google, would you have one
  topic per user or one topic per kind of event (e.g. login, pageview,
  adview, etc.)? Remember there is significant book-keeping done within
  Zookeeper - and these many topics will make that book-keeping significant.
  As for storage, I don't think it should be an issue with sufficient
  spindles, servers and higher than default memory configuration.
  Jayesh



 --
 Regards
 Vamsi Subhash



Re: Max. storage for Kafka and impact

2014-12-19 Thread Joe Stein
Wait, how do you get 2,000 topics each with 50 partitions == 1,000,000
partitions? I think you can take what I said below and change my 250 to 25
as I went with your result (1,000,000) and not your arguments (2,000 x 50).

And you should think of the processing as a separate step from the fetch, and
commit your offsets in batch post-processing. Then you only need more
partitions to fetch batches to process in parallel.

Regards, Joestein





Re: Kafka 0.8.2 new producer blocking on metadata

2014-12-19 Thread Paul Pearcy
Hi Jay,
  Many thanks for the info. All that makes sense, but from an API
standpoint, when something is labelled async and returns a Future, this will
be misconstrued and developers will place async sends in critical
client-facing request/response pathways of code that should never block. If
the app comes up with a bad config, it will hang all incoming connections.

Obviously, there is a spectrum of use cases with regard to message loss and
the defaults cannot cater to all use cases. I like that the defaults tend
towards best effort guarantees, but I am not sure it justifies the
inconsistency in the API.

1) It sounds like the client is already structured to handle changes in
partitions on the fly. I am sure I am oversimplifying, but in the case where
no meta is available my naive approach would be to assume some number of
partitions and then, when there is metadata, treat it as a partition change
event. If there are more unknowns than just partition count, that probably
won't work.
2) Pretty much makes sense, especially now that I see people on this
discussion list wanting a million topics (good luck).
3) I agree client creation shouldn't fail, but any sends should probably
fast-fail, or the choice you are making should be explicit on the call.

I'm still thinking about how I am going to make the client behave as I'd
like. I think I need a background process kicked off on startup to prime
the topics I am interested in. Until that process completes, any sends
through the producer will need to fast fail instead of hang. This would
still leave the window for blocking if you send to a topic your app wasn't
aware it would send to, but now we're getting into corner corner cases.

Would something like that be accepted into the mainline Kafka clients as a
baked-in option?

A quick win might be to clarify the documentation so that it is clear that
this API will block in cases XYZ (maybe this is mentioned somewhere and I
missed it).

Thanks,
Paul


On Thu, Dec 18, 2014 at 1:17 PM, Jay Kreps j...@confluent.io wrote:

 Hey Paul,

 Here are the constraints:
 1. We wanted the storage of messages to be in their compact binary form so
 we could bound memory usage. This implies partitioning prior to enqueue.
 And as you note partitioning requires having metadata (even stale metadata)
 about topics.
 2. We wanted to avoid prefetching metadata for all topics since there may
 be quite a lot of topics.
 3. We wanted to make metadata fetching lazy so that it would be possible to
 create a client without having an active network connection. This tends to
 be important when services are brought up in development or test
 environments where it is annoying to have to control the dependency graph
 when starting things.

 This blocking isn't too bad as it only occurs on the first request for each
 topic. Our feeling was that many things tend to get setup on a first
 request (DB connections are established, caches populated, etc) so this was
 not unreasonable.

 If you want to pre-initialize the metadata to avoid blocking on the first
 request you can do so by fetching the metadata using the
 producer.partitionsFor(topic) api at start-up.

 -Jay

 On Thu, Dec 18, 2014 at 9:07 AM, Paul Pearcy ppea...@gmail.com wrote:
 
  Hello,
 
Playing around with the 0.8.2-beta producer client. One of my test
 cases
  is to ensure producers can deal with Kafka being down when the producer
 is
  created. My tests failed miserably because of the default blocking in the
  producer with regard to metadata.fetch.timeout.ms. The first line of new
  producer is waitOnMetadata which is blocking.
 
   I can handle this case by loading topic metadata on init, setting
   metadata.fetch.timeout.ms to a very low value, and either throwing away
   messages or creating my own internal queue to buffer.
 
  I’m surprised the metasync isn’t done async. If it fails, return that in
  the future/callback. This way the API could actually be considered safely
  async and the producer buffer could try to hold on to things until
  block.on.buffer.full kicks in to either drop messages or block. You’d
  probably need a partition callback since numPartitions wouldn’t be
  available.
 
   The implication is that people's apps will work fine if first messages are
   sent while the Kafka server is up; however, if Kafka is down and they
   restart their app, the new producer will block all sends and blow things up
   if you haven't written your app to be aware of this edge case.
 
 
  Thanks,
 
  Paul
 



Re: Max. storage for Kafka and impact

2014-12-19 Thread Achanta Vamsi Subhash
Joe,

- Correction, it's 1,00,000 partitions.
- We can have at most only 1 consumer per partition, not 50 consumers per
partition. Yes, we have a hashing mechanism to support future partition
increases as well; we override the DefaultPartitioner (see the sketch below).
- We use both Simple and HighLevel consumers depending on the consumption
use-case.
- I clearly mentioned 200 TB/week, not per day.
- We have separate producers and consumers, each operating as different
processes on different machines.

I was explaining why we may end up with so many partitions. I think the
question got sidetracked into being about 200 TB/day.

Any suggestions regarding the performance impact of the 200 TB/week?
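
For reference, the partitioner override mentioned above is plugged in through
the old (0.8) producer's kafka.producer.Partitioner interface; the sketch below
shows only the basic plumbing, not our actual hashing scheme:

import kafka.producer.Partitioner;
import kafka.utils.VerifiableProperties;

public class CustomPartitioner implements Partitioner {

    // The 0.8 producer creates the partitioner reflectively and, like the
    // built-in DefaultPartitioner, passes a VerifiableProperties argument.
    public CustomPartitioner(VerifiableProperties props) { }

    @Override
    public int partition(Object key, int numPartitions) {
        // Mask the sign bit so the result is never negative, then map onto
        // the current partition count.
        return (key.hashCode() & 0x7fffffff) % numPartitions;
    }
}

It is enabled by setting partitioner.class to the implementing class in the
(old) producer's properties.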


The purpose of key in kafka

2014-12-19 Thread Rajiv Kurian
Hi all,

I was wondering why every ProducerRecord sent requires a serialized
key. I am using Kafka to send opaque bytes and I am ending up creating
garbage keys because I don't really have a good one.

Thanks,
Rajiv


Re: The purpose of key in kafka

2014-12-19 Thread Jiangjie Qin
Hi Rajiv,

You can send messages without keys. Just provide null for key.
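
For example (with the 0.8.2 new producer; the topic name and payload are only
illustrative), a record with a null key is perfectly valid and the producer
picks a partition itself:

byte[] payload = "opaque bytes".getBytes();
ProducerRecord<byte[], byte[]> record =
        new ProducerRecord<byte[], byte[]>("opaque-bytes", null, payload);
producer.send(record);   // producer created as usual; no key object is allocated

The two-argument ProducerRecord(topic, value) constructor achieves the same
thing.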

Jiangjie (Becket) Qin


On 12/19/14, 10:14 AM, Rajiv Kurian ra...@signalfuse.com wrote:

Hi all,

I was wondering why every ProducerRecord sent requires a serialized
key. I am using Kafka to send opaque bytes and I am ending up creating
garbage keys because I don't really have a good one.

Thanks,
Rajiv



Re: Kafka 0.8.2 new producer blocking on metadata

2014-12-19 Thread Jay Kreps
Hey Paul,

I agree we should document this better.

We allow and encourage using partitions to semantically distribute data. So
unfortunately we can't just arbitrarily assign a partition (say 0) as that
would actually give incorrect answers for any consumer that made use of the
partitioning. It is true that the user can change the partitioning, but we
can't ignore the partitioning they have set.

I get the use case you have--you basically want a hard guarantee that
send() will never block (so presumably you have set to also drop data if
the buffer fills up). As I said the blocking only occurs on the first
request for a given topic and you can avoid it by pre-initializing the
topic metadata.

I think the option you describe is actually possible now. Basically you can
initialize the metadata for topics you care about using that
partitionsFor() call. If you set the property metadata.fetch.timeout.ms=0
then any send calls prior to the completion of metadata initialization will
fail immediately rather than block.
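
For example (topic names and the broker address are only illustrative; exact
config behaviour differed a bit between the 0.8.2 beta and the final release,
as discussed later in this thread):

import java.util.Arrays;
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;

public class WarmupSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "broker1:9092");
        props.put("key.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
        // Optionally also set metadata.fetch.timeout.ms very low so that sends for
        // topics whose metadata is not yet cached fail fast instead of blocking.
        KafkaProducer<byte[], byte[]> producer = new KafkaProducer<byte[], byte[]>(props);

        // Warm the metadata cache at start-up for the topics we know we will write to,
        // so later send() calls never hit the first-request metadata fetch.
        for (String topic : Arrays.asList("orders", "payments")) {  // illustrative topics
            producer.partitionsFor(topic);   // blocks briefly once per topic, at start-up only
        }
    }
}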

-Jay



Fwd: Help: KafkaSpout not getting data from Kafka

2014-12-19 Thread Banias H
Hi folks,

I am new to both Kafka and Storm and I am having a problem getting KafkaSpout
to read data from Kafka in our three-node environment with Kafka 0.8.1.1 and
Storm 0.9.3.

What is working:
- I have a Kafka producer (a Java application) that generates random strings
to a topic, and I was able to run the following command on one of the nodes to
read the random strings on the console while the Kafka producer is running:

<kafka folder>/bin/kafka-console-consumer.sh --zookeeper
node1:2181,node2:2181,node3:2181/kafka --topic test_topic

- I was also able to run WordCountTopology in Storm.

What is not working:
- I tried running the following code based on KafkaTopology.java:

String zkNodes = "node1:2181,node2:2181,node3:2181";
String brokerZkPath = "/kafka/brokers";          // where the brokers register in ZK
String topicName = "test_topic";
String zkRoot = "/kafka";                        // root the spout uses for its own offsets
String topoName = "test_topology";

ZkHosts zkhost = new ZkHosts(zkNodes, brokerZkPath);
SpoutConfig kafkaConf = new SpoutConfig(zkhost, topicName, zkRoot, "discovery");
kafkaConf.scheme = new SchemeAsMultiScheme(new StringScheme());
kafkaConf.forceStartOffsetTime(-2);              // -2 = start from the earliest offset
KafkaSpout kafkaSpout = new KafkaSpout(kafkaConf);

TopologyBuilder builder = new TopologyBuilder();
builder.setSpout("spout", kafkaSpout, 1);
builder.setBolt("printer", new PrinterBolt()).shuffleGrouping("spout2");

Config config = new Config();
config.setDebug(true);
config.setNumWorkers(3);
StormSubmitter.submitTopology(topoName, config, builder.createTopology());

- Result:
In the Storm UI, the numbers emitted and transferred are always 0,
regardless of whether the Kafka producer is running. See attached image.

- Command to run:
<storm folder>/bin/storm jar storm-starter-0.9.3-jar-with-dependencies.jar
storm.starter.KafkaTopology

- Zookeeper Path:
[zk: node1:2181(CONNECTED) 73] ls /kafka
[consumers, config, controller, admin, brokers, controller_epoch]
[zk: node1:2181(CONNECTED) 74] ls /kafka/brokers
[topics, ids]

I have run out of ideas trying different options and figuring out where
to look. If anyone could shed some light on this topic, I would greatly
appreciate it. Many thanks!

-BH


Re: Max. storage for Kafka and impact

2014-12-19 Thread Pradeep Gollakota
@Joe, Achanta is using Indian English numerals which is why it's a little
confusing. http://en.wikipedia.org/wiki/Indian_English#Numbering_system
1,00,000 [1 lakh] (Indian English) == 100,000 [1 hundred thousand] (The
rest of the world :P)


Kafka consumer session timeouts

2014-12-19 Thread Terry Cumaranatunge
Hi
I would like to get some feedback on design choices with Kafka consumers.
We have an application in which a consumer reads a message and the thread does
a number of things, including database accesses, before a message is
produced to another topic. The time between consuming and producing the
message on the thread can take several minutes. Once the message is produced
to the new topic, a commit is done to indicate we are done with work on the
consumer queue message. Auto commit is disabled for this reason.
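
Roughly, the flow looks like the sketch below (topic, group id and the helper
methods are only illustrative), using the high-level consumer with auto-commit
disabled:

import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;
import kafka.message.MessageAndMetadata;

public class SlowWorkerSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("zookeeper.connect", "node1:2181");
        props.put("group.id", "slow-workers");
        props.put("auto.commit.enable", "false");   // commit only after the work is done

        ConsumerConnector connector =
                Consumer.createJavaConsumerConnector(new ConsumerConfig(props));
        Map<String, List<KafkaStream<byte[], byte[]>>> streams =
                connector.createMessageStreams(Collections.singletonMap("work-items", 1));

        for (MessageAndMetadata<byte[], byte[]> msg : streams.get("work-items").get(0)) {
            process(msg.message());          // DB accesses etc., may take minutes
            produceToNextTopic(msg.message());
            connector.commitOffsets();       // mark the message done only now
        }
    }

    private static void process(byte[] payload) { /* illustrative */ }
    private static void produceToNextTopic(byte[] payload) { /* illustrative */ }
}

The problem is that nothing in this loop talks to ZooKeeper or the brokers
while process() runs, so the session can expire in the meantime.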

I'm using the high-level consumer, and what I'm noticing is that the ZooKeeper
and Kafka sessions time out because it is taking too long before we do
anything on the consumer queue. Kafka then ends up rebalancing every time the
thread goes back to read more from the consumer queue, and after a while it
starts to take a long time before a consumer reads a new message.

I can set the ZooKeeper session timeout very high to avoid that problem,
but then I have to adjust the rebalance parameters accordingly, and Kafka
won't pick up a new consumer for a while, among other side effects.

What are my options to solve this problem? Is there a way to heartbeat to
Kafka and ZooKeeper to keep both happy? Do I still have these same issues
if I were to use the simple consumer?

Thanks


Re: The purpose of key in kafka

2014-12-19 Thread Rajiv Kurian
Thanks, didn't know that.

On Fri, Dec 19, 2014 at 10:39 AM, Jiangjie Qin j...@linkedin.com.invalid
wrote:

 Hi Rajiv,

 You can send messages without keys. Just provide null for key.

 Jiangjie (Becket) Qin


 On 12/19/14, 10:14 AM, Rajiv Kurian ra...@signalfuse.com wrote:

 Hi all,
 
 I was wondering why every ProducerRecord sent requires a serialized
 key. I am using Kafka to send opaque bytes and I am ending up creating
 garbage keys because I don't really have a good one.
 
 Thanks,
 Rajiv




Re: Kafka 0.8.2 new producer blocking on metadata

2014-12-19 Thread Paul Pearcy
Hi Jay,
  I have implemented a wrapper around the producer to behave like I want it
to. Where it diverges from the current 0.8.2 producer is that it accepts three
new inputs:
- A list of expected topics
- A timeout value to init metadata for those topics during producer creation
- An option to blow up if we fail to init topic metadata within some amount of
time

I also needed to set metadata.fetch.timeout.ms=1 (as 0 means it will block
forever) and kick off a thread to do the topic metadata init in the
background.
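
Roughly, the wrapper looks like the sketch below (class and method names are
only illustrative, not the actual code; the 0.8.2 producer API is assumed):

import java.util.List;
import java.util.Properties;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

public class EagerMetadataProducer {
    private final KafkaProducer<byte[], byte[]> delegate;
    private final AtomicBoolean ready = new AtomicBoolean(false);

    public EagerMetadataProducer(Properties props, final List<String> expectedTopics,
                                 long initTimeoutMs, boolean blowUpIfNotReady) {
        delegate = new KafkaProducer<byte[], byte[]>(props);
        Thread warmup = new Thread(new Runnable() {
            public void run() {
                for (String topic : expectedTopics) {
                    while (true) {
                        try {
                            delegate.partitionsFor(topic);   // fetch and cache topic metadata
                            break;
                        } catch (Exception e) {
                            try { Thread.sleep(100); } catch (InterruptedException ie) { return; }
                        }
                    }
                }
                ready.set(true);
            }
        });
        warmup.setDaemon(true);
        warmup.start();
        try { warmup.join(initTimeoutMs); } catch (InterruptedException ignored) { }
        if (!ready.get() && blowUpIfNotReady) {
            throw new IllegalStateException("topic metadata not available in time");
        }
    }

    // Fail fast instead of blocking while metadata is still being fetched.
    public Future<RecordMetadata> send(ProducerRecord<byte[], byte[]> record) {
        if (!ready.get()) {
            throw new IllegalStateException("metadata not ready yet");
        }
        return delegate.send(record);
    }
}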

On the send side, things do fail fast now. The only current hiccup (I am not
completely done re-working my tests, though) is that messages accepted by the
producer after the server has been stopped never return a status if the
producer is stopped; I think this is a bug.

Are you sure you wouldn't want any of this behavior in the client by default,
which would give out-of-the-box choices on blocking behavior? Happy to share
code or send a PR.

Thanks,
Paul


Re: The purpose of key in kafka

2014-12-19 Thread Steve Miller
   Also, if log.cleaner.enable is true in your broker config, that enables the 
log-compaction retention strategy.

   Then, for topics with the per-topic cleanup.policy=compact config 
parameter set, Kafka will scan the topic periodically, nuking old versions of 
the data with the same key.
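
   For reference, a minimal illustrative sketch of the two pieces (the topic
name, partition counts and paths are made up):

# broker side (server.properties): enable the log cleaner thread
log.cleaner.enable=true

# topic side: ask for compaction instead of time/size-based deletion
bin/kafka-topics.sh --create --zookeeper node1:2181 \
    --topic user-profiles --partitions 8 --replication-factor 2 \
    --config cleanup.policy=compact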

   I seem to remember that there's some trickiness here: it's not that you're 
absolutely guaranteed to have just one message with a given key, it's just 
that you'll always have at least one with that key.  I think that depends a 
bit on how big the segments are and how often you're configured to purge old 
log data and that sort of thing.  The idea is that you could have long-term 
persistent data stored within a topic without it getting out of control.

   But in any case, that's another thing that the keys can be useful for.

   It's been six months or so since I tried that so the details are a bit 
fuzzy, but it's something like that, at least.

-Steve
