Max. storage for Kafka and impact
Hi,

We are using Kafka for our messaging system, and we expect around 200 TB/week in the coming months. Will that volume have any performance impact on Kafka?

PS: We will be having more than 2 lakh partitions.

-- Regards
Vamsi Subhash
Re: Max. storage for Kafka and impact
We definitely need a retention policy of a week; hence the volume.

-- Regards
Vamsi Subhash
Re: Max. storage for Kafka and impact
Hi,

A few things you have to plan for:

a. From a resilience point of view, ensure that you have sufficient follower brokers (replicas) for your partitions.
b. In my testing of Kafka (50 TB/week) so far, I haven't seen much issue with CPU utilization or memory. I had 24 CPUs and 32 GB RAM.
c. 200 TB/week spread over 200,000 partitions works out to roughly 1 GB/week/partition. Are you sure you need so many partitions?

Regards,
Nitin Kumar Sharma.
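[For point a, replication is fixed at topic-creation time. A minimal sketch with the stock CLI of that era; the ZooKeeper address, topic name, and counts are made-up values, not anything from this thread:]

    bin/kafka-topics.sh --create --zookeeper zk1:2181 \
        --topic events --partitions 50 --replication-factor 3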
Re: Max. storage for Kafka and impact
Yes. We need that many partitions at the maximum, as we run a central messaging service with thousands of topics.

-- Regards
Vamsi Subhash
Re: Max. storage for Kafka and impact
See some comments inline.

On Fri, Dec 19, 2014 at 11:30 AM, Achanta Vamsi Subhash achanta.va...@flipkart.com wrote:

> We require:
> - many topics
> - ordering of messages for every topic

Ordering is only on a per-partition basis, so you might have to pick a partition key that makes sense for what you are doing.

> - Consumers hit different HTTP endpoints, which may be slow (in a push
>   model). In a pull model, consumers may pull at the rate at which they
>   can process.
> - We need parallelism to hit them with as many consumers as we can.
>   Hence, we currently have around 50 consumers/topic = 50 partitions.

I think you might be mixing up the fetch with the processing. You can have 1 partition and still have 50 messages being processed in parallel (as a batch of messages). What language are you working in? How are you doing this processing exactly?

> Currently we have: 2000 topics x 50 = 1,00,000 partitions.

If this is really the case, then you are going to need at least 250 brokers (~4,000 partitions per broker). If you do that, then you are in the 200 TB/day world, which doesn't sound to be the case. I really think you need to strategize on your processing model some more.

> The incoming rate of ingestion at max is 100 MB/sec. We are planning for
> a big cluster with many brokers.

It is possible to handle this on just 3 brokers, depending on message size; the ability to batch and durability are also factors you really need to be thinking about.

> We have exactly the same use cases as mentioned in this video (usage at
> LinkedIn): https://www.youtube.com/watch?v=19DvtEC0EbQ
> To handle the ZooKeeper scenario mentioned in the above video, we are
> planning to use SSDs, and we would upgrade to the new consumer (0.9+)
> once it is available, as per the video below:
> https://www.youtube.com/watch?v=7TZiN521FQA
>
> On Fri, Dec 19, 2014 at 9:06 PM, Jayesh Thakrar j_thak...@yahoo.com.invalid wrote:
>
>> Technically/conceptually it is possible to have 200,000 topics, but do
>> you really need it like that? What do you intend to do with those
>> messages - i.e., how do you foresee them being processed downstream?
>> And are those topics really there to segregate different kinds of
>> processing or different ids? E.g., if you were LinkedIn, Facebook or
>> Google, would you have one topic per user or one topic per kind of
>> event (e.g. login, pageview, adview, etc.)? Remember there is
>> significant book-keeping done within ZooKeeper - and these many topics
>> will make that book-keeping significant. As for storage, I don't think
>> it should be an issue with sufficient spindles, servers and
>> higher-than-default memory configuration.
>>
>> Jayesh
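[To make the per-partition ordering point concrete, here is a minimal sketch against the 0.8 producer API; the broker address, topic, and key are made up. Records that share a key hash to the same partition, so they stay ordered relative to each other, while records with different keys may interleave:]

    import java.util.Properties;
    import kafka.javaapi.producer.Producer;
    import kafka.producer.KeyedMessage;
    import kafka.producer.ProducerConfig;

    public class KeyedSendExample {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put("metadata.broker.list", "broker1:9092"); // made-up broker
            props.put("serializer.class", "kafka.serializer.StringEncoder");
            Producer<String, String> producer =
                new Producer<String, String>(new ProducerConfig(props));
            // Both messages carry the key "order-42", so they land in the
            // same partition and are consumed in the order produced.
            producer.send(new KeyedMessage<String, String>("orders", "order-42", "created"));
            producer.send(new KeyedMessage<String, String>("orders", "order-42", "shipped"));
            producer.close();
        }
    }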
Re: Max. storage for Kafka and impact
Wait, how do you get 2,000 topics each with 50 partitions == 1,000,000 partitions? I think you can take what I said below and change my 250 to 25, as I went with your result (1,000,000) and not your arguments (2,000 x 50).

And you should think of the processing as a separate step from the fetch, and commit your offset in batch post-processing. Then you only need more partitions to fetch batches to process in parallel.

Regards,
Joe Stein
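[A sketch of the decoupling Joe describes, against the 0.8 high-level consumer. The ZooKeeper address, group name, topic, batch size, and the processInParallel helper are all made up for illustration: fetch from one stream, process a batch in parallel, and commit only once the batch is done:]

    import java.util.Collections;
    import java.util.Properties;
    import kafka.consumer.Consumer;
    import kafka.consumer.ConsumerConfig;
    import kafka.consumer.ConsumerIterator;
    import kafka.consumer.KafkaStream;
    import kafka.javaapi.consumer.ConsumerConnector;

    public class BatchCommitExample {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put("zookeeper.connect", "zk1:2181");  // made-up address
            props.put("group.id", "http-pushers");       // made-up group
            props.put("auto.commit.enable", "false");    // commit manually, post-processing
            ConsumerConnector connector =
                Consumer.createJavaConsumerConnector(new ConsumerConfig(props));
            KafkaStream<byte[], byte[]> stream = connector
                .createMessageStreams(Collections.singletonMap("mytopic", 1))
                .get("mytopic").get(0);
            ConsumerIterator<byte[], byte[]> it = stream.iterator();
            java.util.List<byte[]> batch = new java.util.ArrayList<byte[]>();
            while (it.hasNext()) {
                batch.add(it.next().message());
                if (batch.size() == 50) {
                    processInParallel(batch);   // e.g. hand the 50 messages to a thread pool
                    connector.commitOffsets();  // commit only after the whole batch is done
                    batch.clear();
                }
            }
        }

        private static void processInParallel(java.util.List<byte[]> batch) {
            // Hypothetical: dispatch each message to a worker thread, join on completion.
        }
    }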
Re: Kafka 0.8.2 new producer blocking on metadata
Hi Jay,

Many thanks for the info. All that makes sense, but from an API standpoint, when something is labelled async and returns a Future, this will be misconstrued, and developers will place async sends in critical client-facing request/response pathways of code that should never block. If the app comes up with a bad config, it will hang all incoming connections. Obviously, there is a spectrum of use cases with regard to message loss, and the defaults cannot cater to all of them. I like that the defaults tend towards best-effort guarantees, but I am not sure that justifies the inconsistency in the API.

1) It sounds like the client is already structured to handle changes in partitions on the fly. I am sure I am oversimplifying, but in the case where no metadata is available, my naive approach would be to assume some number of partitions and then, when metadata arrives, treat it as a partition-change event. If there are more unknowns than just the partition count, that probably won't work.
2) Pretty much makes sense, especially now that I see people on this discussion list wanting a million topics (good luck).
3) I agree client creation shouldn't fail, but any sends should probably fail fast, or the choice being made should be explicit on the call.

I'm still thinking about how I am going to make the client behave as I'd like. I think I need a background process kicked off on startup to prime the topics I am interested in. Until that process completes, any sends through the producer will need to fail fast instead of hang. This would still leave a window for blocking if you send to a topic your app wasn't aware it would send to, but now we're getting into corner cases of corner cases. Would having something like that as a baked-in option be accepted into the Kafka clients mainline?

A quick win might be to clarify the documentation so that it is clear that this API will block in cases XYZ (maybe this is mentioned somewhere and I missed it).

Thanks,
Paul

On Thu, Dec 18, 2014 at 1:17 PM, Jay Kreps j...@confluent.io wrote:

> Hey Paul,
>
> Here are the constraints:
>
> 1. We wanted the storage of messages to be in their compact binary form
> so we could bound memory usage. This implies partitioning prior to
> enqueue. And as you note, partitioning requires having metadata (even
> stale metadata) about topics.
> 2. We wanted to avoid prefetching metadata for all topics since there may
> be quite a lot of topics.
> 3. We wanted to make metadata fetching lazy so that it would be possible
> to create a client without having an active network connection. This
> tends to be important when services are brought up in development or
> test environments, where it is annoying to have to control the dependency
> graph when starting things.
>
> This blocking isn't too bad, as it only occurs on the first request for
> each topic. Our feeling was that many things tend to get set up on a
> first request (DB connections are established, caches populated, etc.),
> so this was not unreasonable.
>
> If you want to pre-initialize the metadata to avoid blocking on the first
> request, you can do so by fetching the metadata using the
> producer.partitionsFor(topic) API at start-up.
>
> -Jay
>
> On Thu, Dec 18, 2014 at 9:07 AM, Paul Pearcy ppea...@gmail.com wrote:
>
>> Hello,
>>
>> Playing around with the 0.8.2-beta producer client. One of my test
>> cases is to ensure producers can deal with Kafka being down when the
>> producer is created. My tests failed miserably because of the default
>> blocking in the producer with regard to metadata.fetch.timeout.ms. The
>> first line of the new producer's send is waitOnMetadata, which is
>> blocking.
>>
>> I can handle this case by loading topic metadata on init, setting
>> metadata.fetch.timeout.ms very low, and either throwing away messages
>> or creating my own internal queue to buffer.
>>
>> I'm surprised the metadata sync isn't done async. If it fails, return
>> that in the future/callback. This way the API could actually be
>> considered safely async, and the producer buffer could try to hold on
>> to things until block.on.buffer.full kicks in to either drop messages
>> or block. You'd probably need a partition callback since numPartitions
>> wouldn't be available.
>>
>> The implication is that people's apps will work fine if the first
>> messages are sent while the Kafka server is up; however, if Kafka is
>> down and they restart their app, the new producer will block all sends
>> and blow things up if the app wasn't written to be aware of this edge
>> case.
>>
>> Thanks,
>> Paul
Re: Max. storage for Kafka and impact
Joe,

- Correction: it's 1,00,000 partitions.
- We can have at most only 1 consumer per partition, not 50 per partition. Yes, we have a hashing mechanism to support future partition increases as well; we override the default partitioner.
- We use both Simple and High-Level consumers, depending on the consumption use case.
- I clearly mentioned 200 TB/week, not per day.
- We have separate producers and consumers, each operating as different processes on different machines. I was explaining why we may end up with so many partitions.

I think the question got sidetracked into 200 TB/day. Any suggestions regarding the performance impact of 200 TB/week?
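[For context on overriding the default partitioner, a minimal sketch against the 0.8 producer SPI. The class name and the modulo scheme are illustrative only, not the actual hashing mechanism described above:]

    import kafka.producer.Partitioner;
    import kafka.utils.VerifiableProperties;

    public class ModuloPartitioner implements Partitioner {
        // The 0.8 producer instantiates partitioners reflectively
        // through this constructor signature.
        public ModuloPartitioner(VerifiableProperties props) { }

        public int partition(Object key, int numPartitions) {
            // Mask off the sign bit so the result is always a valid index.
            return (key.hashCode() & 0x7fffffff) % numPartitions;
        }
    }

[Enabled in the producer config with partitioner.class=com.example.ModuloPartitioner, assuming that package name.]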
The purpose of key in kafka
Hi all,

I was wondering why every ProducerRecord sent requires a serialized key. I am using Kafka to send opaque bytes, and I am ending up creating garbage keys because I don't really have a good one.

Thanks,
Rajiv
Re: The purpose of key in kafka
Hi Rajiv,

You can send messages without keys. Just provide null for the key.

Jiangjie (Becket) Qin
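[A minimal sketch against the 0.8.2-beta producer, where keys and values are byte arrays; the broker address and topic name are made up. With a null key, the producer distributes records over the topic's partitions itself:]

    import java.util.Properties;
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerRecord;

    public class NullKeyExample {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put("bootstrap.servers", "broker1:9092"); // made-up broker
            KafkaProducer producer = new KafkaProducer(props);
            byte[] payload = "opaque bytes".getBytes();
            // A null key implies no particular partition; the producer
            // spreads such records across the topic's partitions.
            producer.send(new ProducerRecord("mytopic", null, payload));
            producer.close();
        }
    }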
Re: Kafka 0.8.2 new producer blocking on metadata
Hey Paul,

I agree we should document this better. We allow and encourage using partitions to semantically distribute data, so unfortunately we can't just arbitrarily assign a partition (say, 0), as that would actually give incorrect answers for any consumer that made use of the partitioning. It is true that the user can change the partitioning, but we can't ignore the partitioning they have set.

I get the use case you have--you basically want a hard guarantee that send() will never block (so presumably you have also set it to drop data if the buffer fills up). As I said, the blocking only occurs on the first request for a given topic, and you can avoid it by pre-initializing the topic metadata.

I think the option you describe is actually possible now. Basically, you can initialize the metadata for topics you care about using that partitionsFor() call. If you set the property metadata.fetch.timeout.ms=0, then any send calls prior to the completion of metadata initialization will fail immediately rather than block.

-Jay
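[Putting Jay's suggestion together, a minimal sketch; the broker address and topic names are made up, and the timeout value follows the discussion in this thread (Jay suggests 0; Paul reports needing 1 on 0.8.2-beta):]

    import java.util.Arrays;
    import java.util.Properties;
    import org.apache.kafka.clients.producer.KafkaProducer;

    public class PrewarmExample {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put("bootstrap.servers", "broker1:9092"); // made-up broker
            // A tiny metadata fetch timeout makes later sends to topics with
            // no cached metadata fail fast instead of blocking callers.
            props.put("metadata.fetch.timeout.ms", "1");
            KafkaProducer producer = new KafkaProducer(props);
            // Pre-initialize metadata at startup for the topics we know
            // about, so the one-time blocking fetch happens here and not
            // inside a request/response path later.
            for (String topic : Arrays.asList("topicA", "topicB")) { // made-up topics
                try {
                    producer.partitionsFor(topic);
                } catch (Exception e) {
                    // Cluster unreachable: retry or defer rather than hang.
                }
            }
        }
    }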
Fwd: Help: KafkaSpout not getting data from Kafka
Hi folks,

I am new to both Kafka and Storm, and I am having trouble getting KafkaSpout to read data from Kafka in our three-node environment with Kafka 0.8.1.1 and Storm 0.9.3.

What is working:

- I have a Kafka producer (a Java application) that generates random strings to a topic, and I was able to run the following command on one of the nodes to read the random strings on the console while the producer is running:

    <kafka folder>/bin/kafka-console-consumer.sh --zookeeper node1:2181,node2:2181,node3:2181/kafka --topic test_topic

- I was also able to run WordCountTopology in Storm.

What is not working:

- I tried running the following code, based on KafkaTopology.java:

    String zkNodes = "node1:2181,node2:2181,node3:2181";
    String brokerZkPath = "/kafka/brokers";
    String topicName = "test_topic";
    String zkRoot = "/kafka";
    String topoName = "test_topology";

    ZkHosts zkhost = new ZkHosts(zkNodes, brokerZkPath);
    SpoutConfig kafkaConf = new SpoutConfig(zkhost, topicName, zkRoot, "discovery");
    kafkaConf.scheme = new SchemeAsMultiScheme(new StringScheme());
    kafkaConf.forceStartOffsetTime(-2);
    KafkaSpout kafkaSpout = new KafkaSpout(kafkaConf);

    TopologyBuilder builder = new TopologyBuilder();
    builder.setSpout("spout", kafkaSpout, 1);
    builder.setBolt("printer", new PrinterBolt()).shuffleGrouping("spout2");

    Config config = new Config();
    config.setDebug(true);
    config.setNumWorkers(3);
    StormSubmitter.submitTopology(topoName, config, builder.createTopology());

- Result: in the Storm UI, the numbers of emitted and transferred tuples are always 0, regardless of whether the Kafka producer is running. See the attached image.

- Command to run:

    <storm folder>/bin/storm jar storm-starter-0.9.3-jar-with-dependencies.jar storm.starter.KafkaTopology

- ZooKeeper paths:

    [zk: node1:2181(CONNECTED) 73] ls /kafka
    [consumers, config, controller, admin, brokers, controller_epoch]
    [zk: node1:2181(CONNECTED) 74] ls /kafka/brokers
    [topics, ids]

I have run out of ideas for trying different options and figuring out where to look. If anyone could shed some light on this, I would greatly appreciate it. Many thanks!

-BH
Re: Max. storage for Kafka and impact
@Joe, Achanta is using Indian English numerals, which is why it's a little confusing:
http://en.wikipedia.org/wiki/Indian_English#Numbering_system

1,00,000 [1 lakh] (Indian English) == 100,000 [1 hundred thousand] (the rest of the world :P)
Kafka consumer session timeouts
Hi,

I would like to get some feedback on design choices with Kafka consumers. We have an application in which a consumer reads a message, and the thread does a number of things, including database accesses, before a message is produced to another topic. The time between consuming a message and producing the result can be several minutes. Once the message is produced to the new topic, a commit is done to indicate we are done with the consumed message. Auto-commit is disabled for this reason.

I'm using the high-level consumer, and what I'm noticing is that the ZooKeeper and Kafka sessions time out because it takes too long before we do anything on the consumer queue. Kafka ends up rebalancing every time the thread goes back to read more from the queue, and after a while it starts to take a long time before a consumer reads a new message. I can set the ZooKeeper session timeout very high to avoid that problem, but then I have to adjust the rebalance parameters accordingly, and Kafka won't pick up a new consumer for a while, among other side effects.

What are my options to solve this problem? Is there a way to heartbeat to Kafka and ZooKeeper to keep both happy? Would I still have these same issues if I were to use a SimpleConsumer?

Thanks
Re: The purpose of key in kafka
Thanks, didn't know that.
Re: Kafka 0.8.2 new producer blocking on metadata
Hi Jay,

I have implemented a wrapper around the producer to behave the way I want it to. Where it diverges from the current 0.8.2 producer is that it accepts three new inputs:
- a list of expected topics
- a timeout value for initializing metadata for those topics during producer creation
- an option to blow up if we fail to initialize topic metadata within some amount of time

I also needed to set metadata.fetch.timeout.ms=1, as 0 means it will block forever, and to kick off a thread to do the topic metadata initialization in the background. On the send side, things do fail fast now.

The only current hiccup (I'm not completely done reworking my tests, though) is that messages accepted by the producer after the server has been stopped never return a status if the producer is then stopped; I think this is a bug.

Are you sure you wouldn't want any of this behavior in the client by default? It would give out-of-the-box choices on blocking behavior. Happy to share code or send a PR.

Thanks,
Paul
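[A rough sketch of the kind of wrapper Paul describes, not his actual code; all names are hypothetical, and the timeout of 1 ms follows his report above. Metadata is primed on a background thread, and sends fail fast until priming completes:]

    import java.util.List;
    import java.util.Properties;
    import java.util.concurrent.Future;
    import java.util.concurrent.atomic.AtomicBoolean;
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerRecord;
    import org.apache.kafka.clients.producer.RecordMetadata;

    public class EagerMetadataProducer {
        private final KafkaProducer producer;
        private final AtomicBoolean ready = new AtomicBoolean(false);

        public EagerMetadataProducer(Properties props, final List<String> expectedTopics) {
            props.put("metadata.fetch.timeout.ms", "1"); // fail fast; 0 blocked forever in the beta
            this.producer = new KafkaProducer(props);
            // Prime metadata for the expected topics off the caller's
            // thread, retrying until the cluster becomes reachable.
            new Thread(new Runnable() {
                public void run() {
                    for (String topic : expectedTopics) {
                        while (true) {
                            try {
                                producer.partitionsFor(topic);
                                break;
                            } catch (Exception e) {
                                try { Thread.sleep(100); } catch (InterruptedException ie) { return; }
                            }
                        }
                    }
                    ready.set(true);
                }
            }, "metadata-warmup").start();
        }

        public Future<RecordMetadata> send(ProducerRecord record) {
            if (!ready.get()) {
                throw new IllegalStateException("Topic metadata not initialized yet");
            }
            return producer.send(record);
        }
    }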
Re: The purpose of key in kafka
Also, if log.cleaner.enable is true in your broker config, that enables the log-compaction retention strategy. Then, for topics with the per-topic cleanup.policy=compact config parameter set, Kafka will scan the topic periodically, nuking old versions of the data with the same key.

I seem to remember there's some trickiness here: it's not that you're absolutely guaranteed to have just one message with a given key; it's that you'll always have at least one with that key. I think that depends a bit on how big the segments are, how often you're configured to purge old log data, and that sort of thing. The idea is that you could have long-term persistent data stored within a topic without it growing out of control. In any case, that's another thing keys can be useful for.

It's been six months or so since I tried this, so the details are a bit fuzzy, but it's something like that, at least.

-Steve
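[For reference, the two knobs Steve mentions look roughly like this on 0.8.1+; the ZooKeeper address and topic name are made up:]

    # server.properties (broker): turn the log cleaner thread on
    log.cleaner.enable=true

    # per-topic: switch retention from delete to compact
    bin/kafka-topics.sh --zookeeper zk1:2181 --alter \
        --topic mytopic --config cleanup.policy=compact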