Message Timeout
Hi,

I am new to Kafka and I have a question about how Kafka handles scenarios where no consumer is available. Can I configure Kafka in such a way that messages will be dropped after x seconds? Otherwise I would be afraid that the queues would overflow...

Cheers,
Klaus

--
Klaus Schaefers
Senior Optimization Manager

Ligatus GmbH
Hohenstaufenring 30-32
D-50674 Köln

Tel.: +49 (0) 221 / 56939-784
Fax: +49 (0) 221 / 56939-599
E-Mail: klaus.schaef...@ligatus.com
Web: www.ligatus.de

HRB Köln 56003
Geschäftsführung: Dipl.-Kaufmann Lars Hasselbach, Dipl.-Kaufmann Klaus Ludemann, Dipl.-Wirtschaftsingenieur Arne Wolter
Re: Message Timeout
Message retention in Kafka is disconnected from message consumption. Messages are all persisted to disk, and unlike some other systems the queues do not need to fit in RAM. There are configuration values that control the maximum log size in MB and the duration of retention, which is typically measured in days, weeks or months, though perhaps hours at very high volumes. While you certainly could configure Kafka so that it runs out of disk space, this can be avoided by a combination of configuration changes, bigger or additional cheap spinning disks, or distributing the data across more machines. I hope this helps, though others likely have the configuration values at their fingertips.

Christian

On Jun 27, 2014 1:09 AM, Klaus Schaefers klaus.schaef...@ligatus.com wrote:

Hi, I am new to Kafka and I have a question about how Kafka handles scenarios where no consumer is available. Can I configure Kafka in such a way that messages will be dropped after x seconds? Otherwise I would be afraid that the queues would overflow... [...]
Re: Question on message content, compression, multiple messages per kafka message?
As a data point on one system: while Snappy compression is significantly better than gzip, for our system it wasn't enough to offset the decompress/re-compress on the broker. No matter how fast the compression is, doing it on the broker will always be slower than not doing it. We went the route the original poster is discussing and compressed in our app on the producer, allowing the broker to do its in-place offset management without decompressing. This has the trade-off of having to do the batching on our own and of our consuming applications having to manage the decompression, but the result was about a 3x increase in throughput from that change alone. Our application-level messages are around 3KB each. I think anyone wanting to get the most throughput may have good luck going this route. The decompress/re-compress on the broker has a significant effect, regardless of the compression scheme used.

-Chris

On Thu, Jun 26, 2014 at 3:23 PM, Neha Narkhede neha.narkh...@gmail.com wrote:

Using a single Kafka message to contain an application snapshot has the upside of getting atomicity for free: either the snapshot is written as a whole to Kafka or not at all. This is poor man's transactionality. Care needs to be taken to ensure that the message is not too large, since that might cause memory consumption problems on the server or the consumers. As far as compression overhead is concerned, have you tried running Snappy? Snappy's performance is good enough to offset the decompression-compression overhead on the server.

Thanks,
Neha

On Thu, Jun 26, 2014 at 12:42 PM, Bert Corderman bertc...@gmail.com wrote:

We are in the process of engineering a system that will be using Kafka. The legacy system is using the local file system and a database as the queue. In terms of scale we process about 35 billion events per day contained in 15 million files. I am looking for feedback on a design decision we are discussing.

In our current system we depend heavily on compression as a performance optimization. In Kafka the use of compression has some overhead, as the broker needs to decompress the data to assign offsets and re-compress (explained in detail here: http://geekmantra.wordpress.com/2013/03/28/compression-in-kafka-gzip-or-snappy/). We are thinking about NOT using Kafka compression but rather compressing multiple rows in our code. For example, let's say we wanted to send data in batches of 5,000 rows. Using Kafka compression, we would use a batch size of 5,000 rows and use compression. The other option is using a batch size of 1 in Kafka BUT in our code taking 5,000 messages, compressing them, and then sending them to Kafka with the Kafka compression setting of none. Is this a pattern others have used? Regardless of compression, I am curious if others are using a single message in Kafka to contain multiple messages from an application standpoint.

Bert
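[Editor's note] For illustration, a minimal sketch (against the 0.8 producer API) of the pattern Chris describes: gzip a batch of application messages in the producer application and publish the compressed blob with Kafka's own compression left off. The length-prefixed framing, broker list, topic, and batch contents are assumptions for the example, not details from this thread; consumers must undo the same framing after decompressing.

    import java.io.ByteArrayOutputStream;
    import java.io.DataOutputStream;
    import java.io.IOException;
    import java.util.Arrays;
    import java.util.List;
    import java.util.Properties;
    import java.util.zip.GZIPOutputStream;

    import kafka.javaapi.producer.Producer;
    import kafka.producer.KeyedMessage;
    import kafka.producer.ProducerConfig;

    public class AppSideCompression {
        // Pack the batch into one gzipped payload; each message is length-prefixed
        // so the consumer can split the batch back apart after decompressing.
        static byte[] packAndCompress(List<byte[]> messages) throws IOException {
            ByteArrayOutputStream buf = new ByteArrayOutputStream();
            DataOutputStream out = new DataOutputStream(new GZIPOutputStream(buf));
            out.writeInt(messages.size());
            for (byte[] m : messages) {
                out.writeInt(m.length);
                out.write(m);
            }
            out.close(); // finishes the gzip stream
            return buf.toByteArray();
        }

        public static void main(String[] args) throws IOException {
            Properties props = new Properties();
            props.put("metadata.broker.list", "broker1:9092");
            props.put("serializer.class", "kafka.serializer.DefaultEncoder"); // raw byte[] payloads
            props.put("compression.codec", "none"); // broker stores the blob as-is
            Producer<byte[], byte[]> producer =
                new Producer<byte[], byte[]>(new ProducerConfig(props));

            List<byte[]> batch = Arrays.asList("event-1".getBytes("UTF-8"),
                                               "event-2".getBytes("UTF-8")); // ~5,000 in practice
            producer.send(new KeyedMessage<byte[], byte[]>("mytopic", packAndCompress(batch)));
            producer.close();
        }
    }

With compression.codec=none the broker never inflates the payload to assign offsets, which is exactly the cost this thread is trying to avoid.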
Re: Experiences with larger message sizes
I am using gzip compression. Too big is really difficult to define because it always depends (for example, on what your hardware can handle), but I would say no more than a few megabytes. Having said that, we are still successfully using 50MB messages in production for some things, but it comes at a cost. It requires us to tune each consumer individually and keep these consumers separated (not within the same JVM) for SLA reasons.

-Luke

On 6/26/14, 6:47 PM, Bert Corderman bertc...@gmail.com wrote:

Thanks for the details Luke. At what point would you consider a message too big? Are you using compression?

Bert

On Thursday, June 26, 2014, Luke Forehand luke.foreh...@networkedinsights.com wrote:

I have used 50MB message sizes and it is not a great idea. First of all you need to make sure you have these settings in sync:

message.max.bytes
replica.fetch.max.bytes
fetch.message.max.bytes

I had not set the replica fetch setting and didn't realize one of my partitions was not replicating after a large message was produced. I also ran into heap issues with having to fetch such a large message, and lots of unnecessary garbage collection. I suggest breaking down your message into smaller chunks. In my case, I decided to break an XML input stream (which had a root element wrapping a ridiculously large number of children) into smaller messages, having to parse the large XML root document and re-wrap each child element with a shallow clone of its parents as I iterated the stream.

-Luke

From: Denny Lee denny.g@gmail.com
Sent: Tuesday, June 24, 2014 10:35 AM
To: users@kafka.apache.org
Subject: Experiences with larger message sizes

By any chance has anyone worked with using Kafka with message sizes that are approximately 50MB in size? Based on some of the previous threads, there are probably some concerns on memory pressure due to the compression on the broker and decompression on the consumer, and a best practice of ensuring batch size (to ultimately not have the compressed message exceed the message size limit). Any other best practices or thoughts concerning this scenario? Thanks!

Denny
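[Editor's note] To make "in sync" concrete, a hedged sketch for a 50MB ceiling (the values are made up for illustration, not Luke's actual settings):

    # broker, server.properties: largest message the broker will accept
    message.max.bytes=52428800
    # broker, server.properties: keep >= message.max.bytes, otherwise a partition
    # can silently stop replicating after a large message is produced (as described above)
    replica.fetch.max.bytes=52428800
    # consumer config: keep >= message.max.bytes or the consumer cannot fetch past a large message
    fetch.message.max.bytes=52428800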
Re: Apache Kafka NYC Users Group!
This is great!

-Jay

On Thu, Jun 26, 2014 at 5:47 PM, Joe Stein joe.st...@stealth.ly wrote:

Hi folks, I just started a new Meetup specifically for Apache Kafka in NYC (everyone is welcome of course): http://www.meetup.com/Apache-Kafka-NYC/

For the last couple of years we have been piggybacking talks and the community with other NYC meetup groups (Storm, Cassandra, Hadoop, etc.) and figured it was about time to do this. I am starting to look for more folks to talk about Kafka (running Kafka, the ecosystem, clients, et al.) so we can get some meetups scheduled and keep things moving along (also venues). Thanks!

/*******************************************
 Joe Stein
 Founder, Principal Consultant
 Big Data Open Source Security LLC
 http://www.stealth.ly
 Twitter: @allthingshadoop
*******************************************/
Re: Message Timeout
You can control retention using log.retention.hours, log.retention.minutes or log.retention.bytes.

On Fri, Jun 27, 2014 at 2:06 AM, cac...@gmail.com wrote:

Message retention in Kafka is disconnected from message consumption. Messages are all persisted to disk, and unlike some other systems the queues do not need to fit in RAM. [...]
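[Editor's note] For example, a minimal server.properties sketch (values are illustrative, not from this thread); note that log.retention.bytes applies per partition:

    # delete log segments older than 7 days...
    log.retention.hours=168
    # ...or once a partition's log exceeds ~1 GB, whichever limit is reached first
    log.retention.bytes=1073741824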
RE: Failed to send messages after 3 tries
Neha,

Apologies for the slow response. I was out yesterday. To answer your questions:

-- Is the LeaderNotAvailableException repeatable? Yes. It happens whenever I send a message to that topic.
-- Are you running Kafka in the cloud? No.

Does this problem indicate that the topic is corrupt? If so, what would I need to do to clean it up?

Thanks,
Mike

-----Original Message-----
From: Neha Narkhede [mailto:neha.narkh...@gmail.com]
Sent: Wednesday, June 25, 2014 11:24 PM
To: users@kafka.apache.org
Subject: Re: Failed to send messages after 3 tries

The output from the list topic tool suggests that a leader is available for all partitions. Is the LeaderNotAvailableException repeatable? Are you running Kafka in the cloud?

On Wed, Jun 25, 2014 at 4:03 PM, England, Michael mengl...@homeadvisor.com wrote:

By the way, this is what I get when I describe the topic:

Topic:lead.indexer  PartitionCount:53  ReplicationFactor:1  Configs:
Topic: lead.indexer  Partition: 0   Leader: 2  Replicas: 2  Isr: 2
Topic: lead.indexer  Partition: 1   Leader: 1  Replicas: 1  Isr: 1
Topic: lead.indexer  Partition: 2   Leader: 2  Replicas: 2  Isr: 2
Topic: lead.indexer  Partition: 3   Leader: 1  Replicas: 1  Isr: 1
Topic: lead.indexer  Partition: 4   Leader: 2  Replicas: 2  Isr: 2
Topic: lead.indexer  Partition: 5   Leader: 1  Replicas: 1  Isr: 1
Topic: lead.indexer  Partition: 6   Leader: 2  Replicas: 2  Isr: 2
Topic: lead.indexer  Partition: 7   Leader: 1  Replicas: 1  Isr: 1
Topic: lead.indexer  Partition: 8   Leader: 2  Replicas: 2  Isr: 2
Topic: lead.indexer  Partition: 9   Leader: 1  Replicas: 1  Isr: 1
Topic: lead.indexer  Partition: 10  Leader: 2  Replicas: 2  Isr: 2
Topic: lead.indexer  Partition: 11  Leader: 1  Replicas: 1  Isr: 1
Topic: lead.indexer  Partition: 12  Leader: 2  Replicas: 2  Isr: 2
Topic: lead.indexer  Partition: 13  Leader: 1  Replicas: 1  Isr: 1
Topic: lead.indexer  Partition: 14  Leader: 2  Replicas: 2  Isr: 2
Topic: lead.indexer  Partition: 15  Leader: 1  Replicas: 1  Isr: 1
Topic: lead.indexer  Partition: 16  Leader: 2  Replicas: 2  Isr: 2
Topic: lead.indexer  Partition: 17  Leader: 1  Replicas: 1  Isr: 1
Topic: lead.indexer  Partition: 18  Leader: 2  Replicas: 2  Isr: 2
Topic: lead.indexer  Partition: 19  Leader: 1  Replicas: 1  Isr: 1
Topic: lead.indexer  Partition: 20  Leader: 2  Replicas: 2  Isr: 2
Topic: lead.indexer  Partition: 21  Leader: 1  Replicas: 1  Isr: 1
Topic: lead.indexer  Partition: 22  Leader: 2  Replicas: 2  Isr: 2
Topic: lead.indexer  Partition: 23  Leader: 1  Replicas: 1  Isr: 1
Topic: lead.indexer  Partition: 24  Leader: 2  Replicas: 2  Isr: 2
Topic: lead.indexer  Partition: 25  Leader: 1  Replicas: 1  Isr: 1
Topic: lead.indexer  Partition: 26  Leader: 2  Replicas: 2  Isr: 2
Topic: lead.indexer  Partition: 27  Leader: 1  Replicas: 1  Isr: 1
Topic: lead.indexer  Partition: 28  Leader: 2  Replicas: 2  Isr: 2
Topic: lead.indexer  Partition: 29  Leader: 1  Replicas: 1  Isr: 1
Topic: lead.indexer  Partition: 30  Leader: 2  Replicas: 2  Isr: 2
Topic: lead.indexer  Partition: 31  Leader: 1  Replicas: 1  Isr: 1
Topic: lead.indexer  Partition: 32  Leader: 2  Replicas: 2  Isr: 2
Topic: lead.indexer  Partition: 33  Leader: 1  Replicas: 1  Isr: 1
Topic: lead.indexer  Partition: 34  Leader: 2  Replicas: 2  Isr: 2
Topic: lead.indexer  Partition: 35  Leader: 1  Replicas: 1  Isr: 1
Topic: lead.indexer  Partition: 36  Leader: 2  Replicas: 2  Isr: 2
Topic: lead.indexer  Partition: 37  Leader: 1  Replicas: 1  Isr: 1
Topic: lead.indexer  Partition: 38  Leader: 2  Replicas: 2  Isr: 2
Topic: lead.indexer  Partition: 39  Leader: 1  Replicas: 1  Isr: 1
Topic: lead.indexer  Partition: 40  Leader: 2  Replicas: 2  Isr: 2
Topic: lead.indexer  Partition: 41  Leader: 1  Replicas: 1  Isr: 1
Topic: lead.indexer  Partition: 42  Leader: 2  Replicas: 2  Isr: 2
Topic: lead.indexer  Partition: 43  Leader: 1  Replicas: 1  Isr: 1
Topic: lead.indexer  Partition: 44  Leader: 2  Replicas: 2  Isr: 2
Topic: lead.indexer  Partition: 45  Leader: 1  Replicas: 1  Isr: 1
Topic: lead.indexer  Partition: 46  Leader: 2  Replicas: 2  Isr: 2
Topic: lead.indexer  Partition: 47  Leader: 1  Replicas: 1  Isr: 1
Topic: lead.indexer  Partition: 48  Leader: 2  Replicas: 2  Isr: 2
Topic: lead.indexer  Partition: 49  Leader: 1
Partition reassign Kafka 0.8.1.1
Hi,

I was testing out Kafka 0.8.1.1 and found that I get the following exception during partition re-assignment:

./kafka-reassign-partitions.sh --path-to-json-file ritest.json --zookeeper localhost:2181
Partitions reassignment failed due to Partition reassignment currently in progress for Map(). Aborting operation
kafka.common.AdminCommandFailedException: Partition reassignment currently in progress for Map(). Aborting operation
        at kafka.admin.ReassignPartitionsCommand.reassignPartitions(ReassignPartitionsCommand.scala:91)
        at kafka.admin.ReassignPartitionsCommand$.main(ReassignPartitionsCommand.scala:65)
        at kafka.admin.ReassignPartitionsCommand.main(ReassignPartitionsCommand.scala)

My JSON file is as follows:

{"partitions": [{"topic": "ritest1", "partition": 3, "replicas": [1,2,3]}]}

What am I doing wrong?

Regards,
Kashyap
Re: Partition reassign Kafka 0.8.1.1
Hi Kashyap,

If a previous reassignment is still ongoing, the current one cannot proceed. Did you trigger another reassignment before this one?

Guozhang

On Fri, Jun 27, 2014 at 10:05 AM, Kashyap Mhaisekar kashya...@gmail.com wrote:

Hi, I was testing out Kafka 0.8.1.1 and found that I get the following exception during partition re-assignment: "Partitions reassignment failed due to Partition reassignment currently in progress for Map(). Aborting operation" [...]
Re: Partition reassign Kafka 0.8.1.1
"Partitions reassignment failed due to Partition reassignment currently in progress for Map()."

The map is empty, so this seems like a bug. Can you please file a JIRA and attach your server's log4j logs to it?

On Fri, Jun 27, 2014 at 10:05 AM, Kashyap Mhaisekar kashya...@gmail.com wrote:

Hi, I was testing out Kafka 0.8.1.1 and found that I get the following exception during partition re-assignment: [...]
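[Editor's note] For context on where that "in progress" state lives: the reassignment tool records the in-flight reassignment in the /admin/reassign_partitions znode in ZooKeeper, so inspecting that node directly shows what the controller still considers pending, which would be useful detail to attach to the JIRA. A command sketch (the host/port is illustrative):

    bin/zookeeper-shell.sh localhost:2181
    get /admin/reassign_partitions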
Re: Failed to send messages after 3 tries
I'm not so sure what is causing those exceptions. When you send data, do you see any errors in the server logs? Could you send them around?

On Fri, Jun 27, 2014 at 10:00 AM, England, Michael mengl...@homeadvisor.com wrote:

Neha,

Apologies for the slow response. I was out yesterday. To answer your questions:

-- Is the LeaderNotAvailableException repeatable? Yes. It happens whenever I send a message to that topic.
-- Are you running Kafka in the cloud? No.

Does this problem indicate that the topic is corrupt? If so, what would I need to do to clean it up?

Thanks,
Mike

[...]
Re: message stuck, possible problem setting fetch.message.max.bytes
"but I found one message (5.1MB in size) which is clogging my pipeline up"

Have you ensured that fetch.message.max.bytes in the consumer config is set to at least 5.1 MB?

On Thu, Jun 26, 2014 at 6:14 PM, Louis Clark sfgypsy...@gmail.com wrote:

In the consumer.properties file, I've got (default?):

zookeeper.connect=127.0.0.1:2181
zookeeper.connection.timeout.ms=100
group.id=test-consumer-group

thanks,
-Louis

On Thu, Jun 26, 2014 at 6:04 PM, Guozhang Wang wangg...@gmail.com wrote:

Hi Louis,

What are your consumer's config properties?

Guozhang

On Thu, Jun 26, 2014 at 5:54 PM, Louis Clark sfgypsy...@gmail.com wrote:

Hi, I'm trying to stream large messages with Kafka into Spark. Generally this has been working nicely, but I found one message (5.1MB in size) which is clogging my pipeline up. I have these settings in server.properties:

fetch.message.max.bytes=10485760
replica.fetch.max.bytes=10485760
message.max.bytes=10485760
fetch.size=10485760

I'm not getting any obvious errors in the logs, and I can retrieve the large message with this command:

kafka-console-consumer.sh --zookeeper localhost:2181 --from-beginning --topic mytopic --fetch-size=10485760

I noticed recently, after digging into this problem, that the kafkaServer.out log is complaining that the fetch.message.max.bytes parameter is not valid:

[2014-06-25 11:33:36,547] WARN Property fetch.message.max.bytes is not valid (kafka.utils.VerifiableProperties)
[2014-06-25 11:33:36,547] WARN Property fetch.size is not valid (kafka.utils.VerifiableProperties)

That seems like the most critical parameter for my needs. It is apparently not being recognized as a parameter despite being listed on the configuration website (https://kafka.apache.org/08/configuration.html). I'm using 0.8.1.1. Any ideas? Many thanks for reading this!

--
Guozhang
Re: message stuck, possible problem setting fetch.message.max.bytes
I believe so. I have set fetch.message.max.bytes=10485760 in both the consumer.properties and the server.properties config files, then restarted Kafka - same problem. I'm following up on some of Guozhang's other suggestions now. One thing I'm confused about (I should read the docs again) is what aspect of Kafka reads consumer.properties. If I'm using a different program (Spark Streaming) as the consumer, do any Kafka programs/services even read consumer.properties?

thanks

On Fri, Jun 27, 2014 at 10:31 AM, Neha Narkhede neha.narkh...@gmail.com wrote:

"but I found one message (5.1MB in size) which is clogging my pipeline up"

Have you ensured that fetch.message.max.bytes in the consumer config is set to at least 5.1 MB? [...]
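[Editor's note] On Louis's question: the consumer.properties file shipped under config/ is just a sample used by tools you explicitly point at it; an external consumer application such as Spark only sees the properties it is handed programmatically, as the Spark snippet later in this thread shows. A minimal sketch of doing the same against the 0.8 high-level consumer API (topic, group, and size are illustrative):

    import java.util.Collections;
    import java.util.List;
    import java.util.Map;
    import java.util.Properties;

    import kafka.consumer.Consumer;
    import kafka.consumer.ConsumerConfig;
    import kafka.consumer.KafkaStream;
    import kafka.javaapi.consumer.ConsumerConnector;
    import kafka.message.MessageAndMetadata;

    public class BigMessageConsumer {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put("zookeeper.connect", "localhost:2181");
            props.put("group.id", "test-consumer-group");
            // must be at least as large as the biggest message in the topic
            props.put("fetch.message.max.bytes", "10485760");

            ConsumerConnector connector =
                Consumer.createJavaConsumerConnector(new ConsumerConfig(props));
            Map<String, List<KafkaStream<byte[], byte[]>>> streams =
                connector.createMessageStreams(Collections.singletonMap("mytopic", 1));

            // iterate the single stream; blocks waiting for new messages
            for (MessageAndMetadata<byte[], byte[]> msg : streams.get("mytopic").get(0)) {
                System.out.println("received " + msg.message().length + " bytes");
            }
        }
    }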
Re: Intercept broker operation in Kafka
Hey Ravi,

I think what you want is available via log4j and JMX. Log4j is pluggable: you can plug in any Java code at runtime to handle the log events. JMX can be called in any way you like too.

-Jay

On Mon, Jun 23, 2014 at 11:51 PM, ravi singh rrs120...@gmail.com wrote:

Primarily we want to log the data below (although this is not the exhaustive list):

+ any error/exception during Kafka start/stop
+ any error/exception while the broker is running
+ broker state changes, like leader re-election or a broker going down
+ current live brokers
+ new topic creation
+ when messages are deleted by the broker after the specified limit
+ broker health: memory usage

Regards,
Ravi

On Tue, Jun 24, 2014 at 11:11 AM, Neha Narkhede neha.narkh...@gmail.com wrote:

What kind of broker metrics are you trying to push to this centralized logging framework?

Thanks,
Neha

On Jun 23, 2014 8:51 PM, ravi singh rrs120...@gmail.com wrote:

Thanks Guozhang/Neha for the replies. Here's my use case: We use proprietary application logging in our apps. We are planning to use Kafka brokers in production, but apart from the logs which are already logged using log4j in Kafka, we want to log the broker stats using our centralized application logging framework. Simply put, I want to write an application which could start when the Kafka broker starts, read the broker state and metrics, and push them to the centralized logging servers. In ActiveMQ we have a plugin for our proprietary logging: we intercept broker operations and install the plugin into the interceptor chain of the broker.

Regards,
Ravi

On Mon, Jun 23, 2014 at 9:29 PM, Neha Narkhede neha.narkh...@gmail.com wrote:

Ravi,

Our goal is to provide the best implementation of a set of useful abstractions and features in Kafka. The motivation behind this philosophy is performance and simplicity at the cost of flexibility. In most cases, we can argue that the loss in flexibility is minimal, since you can always get that functionality by modeling your application differently, especially if the system supports high performance. ActiveMQ has to support the JMS protocol and hence provides all sorts of hooks and plugins on the brokers, at the cost of performance. Could you elaborate more on your use case? There is probably another way to model your application using Kafka.

Thanks,
Neha

On Sat, Jun 21, 2014 at 9:24 AM, ravi singh rrs120...@gmail.com wrote:

How do I intercept Kafka broker operations so that features such as security, logging, etc. can be implemented as a pluggable filter? For example, we have the BrokerFilter class in ActiveMQ. Is there anything similar in Kafka?

--
Regards,
Ravi
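[Editor's note] A sketch of Jay's log4j suggestion: Kafka 0.8 logs through log4j 1.2, so a custom appender wired into config/log4j.properties receives every broker log event at runtime. The package, class name, and CentralLogClient below are hypothetical stand-ins for a proprietary logging framework, not anything that ships with Kafka:

    # config/log4j.properties: route broker logging through the custom appender as well
    log4j.rootLogger=INFO, stdout, central
    log4j.appender.central=com.example.logging.CentralAppender

    package com.example.logging;

    import org.apache.log4j.AppenderSkeleton;
    import org.apache.log4j.spi.LoggingEvent;

    public class CentralAppender extends AppenderSkeleton {
        @Override
        protected void append(LoggingEvent event) {
            // forward each broker log event to the centralized framework
            CentralLogClient.send(event.getLoggerName(),
                                  event.getLevel().toString(),
                                  event.getRenderedMessage());
        }

        @Override
        public void close() { /* release client resources */ }

        @Override
        public boolean requiresLayout() { return false; }

        // hypothetical client for the proprietary logging service
        static final class CentralLogClient {
            static void send(String logger, String level, String message) {
                // e.g. POST to the logging service's ingest endpoint
            }
        }
    }

For the metrics side of Ravi's list (live brokers, memory usage, deletions), a separate process polling the broker's MBeans over JMX covers what the log stream does not.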
Re: Apache Kafka NYC Users Group!
Excellent decision.

Sent from my iPhone

On Jun 27, 2014, at 12:06 PM, Jay Kreps jay.kr...@gmail.com wrote:

This is great!

-Jay

On Thu, Jun 26, 2014 at 5:47 PM, Joe Stein joe.st...@stealth.ly wrote:

Hi folks, I just started a new Meetup specifically for Apache Kafka in NYC (everyone is welcome of course): http://www.meetup.com/Apache-Kafka-NYC/ [...]
RE: Failed to send messages after 3 tries
Neha,

In state-change.log I see lots of logging from when I last started up Kafka, and nothing after that. I do see a bunch of errors of the form:

[2014-06-25 13:21:37,124] ERROR Controller 1 epoch 11 initiated state change for partition [lead.indexer,37] from OfflinePartition to OnlinePartition failed (state.change.logger)
kafka.common.NoReplicaOnlineException: No replica for partition [lead.indexer,37] is alive. Live brokers are: [Set()], Assigned replicas are: [List(1)]
        at kafka.controller.OfflinePartitionLeaderSelector.selectLeader(PartitionLeaderSelector.scala:61)
        at kafka.controller.PartitionStateMachine.electLeaderForPartition(PartitionStateMachine.scala:336)
        at kafka.controller.PartitionStateMachine.kafka$controller$PartitionStateMachine$$handleStateChange(PartitionStateMachine.scala:185)
        at kafka.controller.PartitionStateMachine$$anonfun$triggerOnlinePartitionStateChange$3.apply(PartitionStateMachine.scala:99)
        at kafka.controller.PartitionStateMachine$$anonfun$triggerOnlinePartitionStateChange$3.apply(PartitionStateMachine.scala:96)
        at scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:743)
        at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:95)
        at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:95)
        at scala.collection.Iterator$class.foreach(Iterator.scala:772)
        at scala.collection.mutable.HashTable$$anon$1.foreach(HashTable.scala:157)
        at scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:190)
        at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:45)
        at scala.collection.mutable.HashMap.foreach(HashMap.scala:95)
        at scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:742)
        at kafka.controller.PartitionStateMachine.triggerOnlinePartitionStateChange(PartitionStateMachine.scala:96)
        at kafka.controller.PartitionStateMachine.startup(PartitionStateMachine.scala:68)
        at kafka.controller.KafkaController.onControllerFailover(KafkaController.scala:312)
        at kafka.controller.KafkaController$$anonfun$1.apply$mcV$sp(KafkaController.scala:162)
        at kafka.server.ZookeeperLeaderElector.elect(ZookeeperLeaderElector.scala:63)
        at kafka.server.ZookeeperLeaderElector$$anonfun$startup$1.apply$mcZ$sp(ZookeeperLeaderElector.scala:49)
        at kafka.server.ZookeeperLeaderElector$$anonfun$startup$1.apply(ZookeeperLeaderElector.scala:47)
        at kafka.server.ZookeeperLeaderElector$$anonfun$startup$1.apply(ZookeeperLeaderElector.scala:47)
        at kafka.utils.Utils$.inLock(Utils.scala:538)
        at kafka.server.ZookeeperLeaderElector.startup(ZookeeperLeaderElector.scala:47)
        at kafka.controller.KafkaController$$anonfun$startup$1.apply$mcV$sp(KafkaController.scala:637)
        at kafka.controller.KafkaController$$anonfun$startup$1.apply(KafkaController.scala:633)
        at kafka.controller.KafkaController$$anonfun$startup$1.apply(KafkaController.scala:633)
        at kafka.utils.Utils$.inLock(Utils.scala:538)
        at kafka.controller.KafkaController.startup(KafkaController.scala:633)
        at kafka.server.KafkaServer.startup(KafkaServer.scala:96)
        at kafka.server.KafkaServerStartable.startup(KafkaServerStartable.scala:34)
        at kafka.Kafka$.main(Kafka.scala:46)
        at kafka.Kafka.main(Kafka.scala)

And also errors of the form:

[2014-06-25 13:21:42,502] ERROR Broker 1 aborted the become-follower state change with correlation id 4 from controller 1 epoch 10 for partition [lead.indexer,11] new leader -1 (state.change.logger)

Are either of these of concern?

In controller.log I also see logging from start-up, and then nothing. There are no errors, but I do see some warnings. They seem rather benign. Here's a sample:

[2014-06-25 13:21:47,678] WARN [OfflinePartitionLeaderSelector]: No broker in ISR is alive for [lead.indexer,45]. Elect leader 1 from live brokers 1. There's potential data loss. (kafka.controller.OfflinePartitionLeaderSelector)
[2014-06-25 13:21:47,678] INFO [OfflinePartitionLeaderSelector]: Selected new leader and ISR {"leader":1,"leader_epoch":3,"isr":[1]} for offline partition [lead.indexer,45] (kafka.controller.OfflinePartitionLeaderSelector)

In kafka.out I see this error message:

[2014-06-27 11:50:01,366] ERROR Closing socket for /10.1.162.67 because of error (kafka.network.Processor)
java.io.IOException: Connection reset by peer
        at sun.nio.ch.FileDispatcherImpl.write0(Native Method)
        at sun.nio.ch.SocketDispatcher.write(SocketDispatcher.java:47)
        at sun.nio.ch.IOUtil.writeFromNativeBuffer(IOUtil.java:89)
        at sun.nio.ch.IOUtil.write(IOUtil.java:60)
        at sun.nio.ch.SocketChannelImpl.write(SocketChannelImpl.java:450)
        at kafka.api.FetchResponseSend.writeTo(FetchResponse.scala:217)
        at
kafka 0.8.1.1 log.retention.minutes NOT being honored
Running a mixed 2-broker cluster. Mixed as in broker1 is running 0.8.0 and broker2 0.8.1.1 (from the Apache release link; directly using the tarball, no local build).

I have set log.retention.minutes=10. However, the broker is not honoring the setting: I see it's not cleaning the log.dir at all. When I set log.retention.hours=1 instead, it starts cleaning the log. When I have log.retention.minutes set in server.properties, I see this logged in server.log:

[2014-06-27 19:21:06,633] WARN Property log.retention.minutes is not valid (kafka.utils.VerifiableProperties)
[2014-06-27 19:21:06,633] WARN Property log.retention.minutes is not valid (kafka.utils.VerifiableProperties)

I have set these properties too:

log.cleaner.enable=true
log.cleanup.policy=delete

But I see similar warnings logged for these properties too.

Regards,
Virendra
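[Editor's note] For reference, the retention setting that, per the report above, this 0.8.1.1 build does honor (the value is illustrative):

    # hour-based granularity works; log.retention.minutes triggers the warning above
    log.retention.hours=1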
Re: message stuck, possible problem setting fetch.message.max.bytes
Thanks for the help. For others who happen upon this thread, the problem was indeed on the consumer side. Spark (0.9.1) needs a bit of help setting the Kafka properties for big messages:

// setup Kafka with manual parameters to allow big messaging
// see spark/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaUtils.scala
val kafkaParams = Map[String, String](
  "zookeeper.connect" -> zkQuorum,
  "group.id" -> group,
  "zookeeper.connection.timeout.ms" -> "1",
  "fetch.message.max.bytes" -> "10485760", // 10MB
  "fetch.size" -> "10485760")              // not needed?

val lines = kafka.KafkaUtils.createStream[String, String, StringDecoder, StringDecoder](
  ssc, kafkaParams, topicpMap, StorageLevel.MEMORY_AND_DISK_SER_2).map(_._2)

Sorry about all the messages on this topic, for those of you who aren't getting digests.

On Fri, Jun 27, 2014 at 10:43 AM, Louis Clark sfgypsy...@gmail.com wrote:

I believe so. I have set fetch.message.max.bytes=10485760 in both the consumer.properties and the server.properties config files, then restarted Kafka - same problem. [...]
kafka producer pulling from custom restAPI
Hi,

I have a quick question. Say I have a custom REST API with data in JSON format. Can the Kafka producer read directly from the REST API and write to a topic? If so, can you point me to the code base?

Thanks,
Sonali

Sonali Parthasarathy
R&D Developer, Data Insights
Accenture Technology Labs
703-341-7432
Re: kafka producer pulling from custom restAPI
The answer is no, it doesn't work that way. You would have to write a process that consumes from the REST API back end and writes to Kafka.

On Jun 27, 2014, at 19:35, sonali.parthasara...@accenture.com wrote:

Hi, I have a quick question. Say I have a custom REST API with data in JSON format. Can the Kafka producer read directly from the REST API and write to a topic? If so, can you point me to the code base? [...]
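[Editor's note] A minimal sketch of such a bridge process against the 0.8 producer API; the endpoint URL, topic name, and poll interval are made-up placeholders:

    import java.io.BufferedReader;
    import java.io.InputStreamReader;
    import java.net.HttpURLConnection;
    import java.net.URL;
    import java.util.Properties;

    import kafka.javaapi.producer.Producer;
    import kafka.producer.KeyedMessage;
    import kafka.producer.ProducerConfig;

    public class RestToKafka {
        public static void main(String[] args) throws Exception {
            Properties props = new Properties();
            props.put("metadata.broker.list", "localhost:9092");
            props.put("serializer.class", "kafka.serializer.StringEncoder");
            Producer<String, String> producer =
                new Producer<String, String>(new ProducerConfig(props));

            while (true) {
                // poll the (hypothetical) REST endpoint for a JSON payload
                HttpURLConnection conn = (HttpURLConnection)
                    new URL("http://api.example.com/events").openConnection();
                BufferedReader in = new BufferedReader(
                    new InputStreamReader(conn.getInputStream(), "UTF-8"));
                StringBuilder json = new StringBuilder();
                String line;
                while ((line = in.readLine()) != null) json.append(line);
                in.close();
                conn.disconnect();

                // publish the JSON document as a Kafka message
                producer.send(new KeyedMessage<String, String>("mytopic", json.toString()));
                Thread.sleep(5000L); // poll interval
            }
        }
    }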