Consumer's Group.id and autooffset.reset
Would someone explain the interplay between the group.id and autooffset.reset config parameters? I wrote a simple Consumer that had group.id = XYZZY and autooffset.reset = smallest. My intention was to write something similar to Kafka's ConsoleConsumer: each time my Consumer ran, it would start at the beginning and read all of the messages available. I discovered that it only read the messages the first time, and that to read them again I need to change the group.id and reset to smallest. A general question is why? A specific question is: What are use cases where I would change the group.id, and what are use cases where I want it to remain the same? Thanks, C. Helck ** This communication and all information contained in or attached to it (including, but not limited to market prices/levels and market commentary) (the “Information”) is for informational purposes only, is confidential, may be legally privileged and is the intellectual property of one of the companies of ICAP plc group (“ICAP”) or third parties. The Information is not, and should not be construed as, an offer, bid, recommendation or solicitation in relation to any financial instrument or investment or to participate in any particular trading strategy. The Information is not to be relied upon and is not warranted, including, but not limited, as to completeness, timeliness or accuracy and is subject to change without notice. All representations and warranties are expressly disclaimed. Access to the Information by anyone other than the intended recipient is unauthorised and any disclosure, copying or redistribution is prohibited. If you receive this message in error, please immediately delete all copies of it and notify the sender. For further information, please see ebs.com. ** --- This email has been scanned for email related threats and delivered safely by Mimecast. For more information please visit http://www.mimecast.com ---
Re: Consumer's Group.id and autooffset.reset
Hi Chris, The reason that it does not read from the beginning after the first time is probably due to offset commit. Could you check if you set the config auto.commit.enable to true? Setting it to false would solve your problem, so that you do not need to change to a new group.id every time. Guozhang
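To illustrate the "why" in this thread: the reset policy is only consulted when the group has no committed offset, so a group that has already committed resumes from its commit instead of the beginning. The following is a toy model of that rule in Python, not Kafka code; the function name and the in-memory `committed` dict are made up for illustration (the real high-level consumer of that era keeps committed offsets in ZooKeeper, per group, topic and partition).

```python
# Toy model of how an old-style high-level consumer picks its start position.
# All names here are illustrative assumptions, not Kafka APIs.

def starting_offset(committed, group_id, reset, earliest, latest):
    """Return the offset a consumer in `group_id` would start from.

    committed: dict mapping group id -> last committed offset
    reset:     "smallest" or "largest" (the autooffset.reset setting)
    """
    if group_id in committed:
        # A committed offset exists, so the reset policy is not consulted.
        return committed[group_id]
    if reset == "smallest":
        return earliest
    if reset == "largest":
        return latest
    raise ValueError("unknown reset policy: %r" % reset)


committed = {}
# First run: nothing committed yet, so "smallest" applies.
first_run = starting_offset(committed, "XYZZY", "smallest", 0, 100)
# The consumer reads everything and auto-commit stores offset 100.
committed["XYZZY"] = 100
# Second run with the same group.id resumes from the commit.
second_run = starting_offset(committed, "XYZZY", "smallest", 0, 100)
# A new group.id has no commit, so it starts from the beginning again.
fresh_group = starting_offset(committed, "XYZZY-2", "smallest", 0, 100)
print(first_run, second_run, fresh_group)
```

In this model, disabling auto-commit corresponds to never writing into `committed`, which is why every run then starts from the smallest offset. Keeping the same group.id is what you want when a consumer should resume where it left off after a restart.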
Re: log.retention.size
Yes, your understanding is correct. A global knob that controls aggregate log size may make sense. What would be the expected behavior when that limit is reached? Would you reduce the retention uniformly across all topics? Then, it just means that some of the logs may not be retained as long as you want. Also, we need to think through what happens when every log has only 1 segment left and yet the total size still exceeds the limit. Do we roll log segments early? Thanks, Jun

On Sun, May 4, 2014 at 4:31 AM, vinh v...@loggly.com wrote:
Thanks Jun. So if I understand this correctly, there really is no master property to control the total aggregate size of all Kafka data files on a broker. log.retention.size and log.file.size are great for managing data at the application level. In our case, application needs change frequently, and performance itself is an ever evolving feature. This means various configs are constantly changing, like topics, # of partitions, etc. What rarely changes though is provisioned hardware resources. So a setting to control the total aggregate size of Kafka logs (or persisted data, for better clarity) would definitely simplify things at an operational level, regardless of what happens at the application level.

On May 2, 2014, at 7:49 AM, Jun Rao jun...@gmail.com wrote:
log.retention.size controls the total size in a log dir (per partition). log.file.size controls the size of each log segment in the log dir. Thanks, Jun

On Thu, May 1, 2014 at 9:31 PM, vinh v...@loggly.com wrote:
In the 0.7 docs, the descriptions for log.retention.size and log.file.size sound very much the same. In particular, that they apply to a single log file (or log segment file). http://kafka.apache.org/07/configuration.html I'm beginning to think there is no setting to control the max aggregate size of all logs. If this is correct, what would be a good approach to enforce this requirement? In my particular scenario, I have a lot of data being written to Kafka at a very high rate. So a 1TB disk can easily be filled up in 24hrs or so. One option is to add more Kafka brokers to add more disk space to the pool, but I'd like to avoid that and see if I can simply configure Kafka to not write more than 1TB aggregate. Else, Kafka will OOM and kill itself, and possibly crash the node itself because the disk is full.

On May 1, 2014, at 9:21 PM, vinh v...@loggly.com wrote:
Using Kafka 0.7.2, I have the following in server.properties:
log.retention.hours=48
log.retention.size=107374182400
log.file.size=536870912
My interpretation of this is:
a) a single log segment file over 48hrs old will be deleted
b) the total combined size of *all* logs is 100GB
c) a single log segment file is limited to 500MB in size before a new segment file is spawned
d) a log file can be composed of many log segment files
But, even after setting the above, I find that the total combined size of all Kafka logs on disk is 200GB right now. Isn't log.retention.size supposed to limit it to 100GB? Am I missing something? The docs are not really clear, especially when it comes to distinguishing between a log file and a log segment file. I have disk monitoring. But like anything else in software, even monitoring can fail. Via configuration, I'd like to make sure that Kafka does not write more than the available disk space. Or something like log4j, where I can set a max number of log files and the max size per file, which essentially allows me to set a max aggregate size limit across all logs. Thanks, -Vinh
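Since log.retention.size applies per partition (per Jun's answer in this thread), one workaround for a fixed disk is to derive the per-partition value from the disk budget. The arithmetic can be sketched in Python as follows. This is a rough sizing aid, not Kafka behavior: the function names are made up, and it assumes each partition can hold up to the retention size plus one segment before cleanup trims it. The real overshoot also depends on how often the cleaner runs and on the write rate.

```python
MIB = 1024 ** 2
GIB = 1024 ** 3


def worst_case_total_bytes(partitions, retention_bytes, segment_bytes):
    """Approximate upper bound on disk use across all partitions on a broker.

    Assumption: each partition may hold retention_bytes plus one extra segment.
    """
    return partitions * (retention_bytes + segment_bytes)


def retention_bytes_for_budget(disk_budget_bytes, partitions, segment_bytes):
    """Per-partition retention size that keeps the total under the budget."""
    per_partition = disk_budget_bytes // partitions - segment_bytes
    if per_partition <= 0:
        raise ValueError("disk budget too small for this many partitions")
    return per_partition


# Settings from the thread: 100 GiB retention, 512 MiB segments.
# With only 2 partitions on the broker the bound is already about 201 GiB.
print(worst_case_total_bytes(2, 100 * GIB, 512 * MIB) / GIB)

# Per-partition retention for a 1 TiB disk shared by 8 partitions.
print(retention_bytes_for_budget(1024 * GIB, 8, 512 * MIB))
```

The per-partition value has to be recomputed whenever the number of partitions on the broker changes, which is exactly the operational burden a global setting would remove.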
Re: Log Retention in Kafka
See http://kafka.apache.org/documentation.html#basic_ops_modify_topic Thanks, Jun

On Sun, May 4, 2014 at 10:11 PM, Kashyap Mhaisekar kashya...@gmail.com wrote:
Is there a way to do this at runtime using some available scripts in kafka/bin? If so, any pointers on which script? Regards, Kashyap

On Tue, Apr 22, 2014 at 11:11 PM, Kashyap Mhaisekar kashya...@gmail.com wrote:
Thanks Joel. Am using version 2.8.0. Thanks, Kashyap

On Tue, Apr 22, 2014 at 5:53 PM, Joel Koshy jjkosh...@gmail.com wrote:
Which version of Kafka are you using? You can read up on the configuration options here: http://kafka.apache.org/documentation.html#configuration You can specify time-based retention using log.retention.minutes, which will apply to all topics. You can override that on a per-topic basis; see further down in the above page under topic-level configuration.

On Tue, Apr 22, 2014 at 02:34:24PM -0500, Kashyap Mhaisekar wrote:
Hi, I wanted to set the message expiry for a message on a kafka topic. Is there anything like this in kafka? I came across the properties *log.retention.hours* and *topic.log.retention.hours*, and it was mentioned that topic.log.retention.hours is a per-topic configuration. I had some queries around it:
1. Does it mean that I need to specify the topicname.log.retention.hours in the kafka config?
2. Can this property be overridden anywhere?
3. Is it possible for the producer to set a message expiry so that the message expires after a configurable period of time?
Regards, Kashyap
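Joel's description of a broker-wide retention setting with per-topic overrides amounts to a simple lookup. Here is a minimal Python sketch of it; the function names and the `topic_overrides` dict are hypothetical and only model the idea. The second function assumes that retention is enforced on whole segment files by age, so individual messages do not carry their own expiry.

```python
def effective_retention_minutes(topic, broker_default, topic_overrides):
    """Per-topic override wins; otherwise the broker-wide default applies."""
    return topic_overrides.get(topic, broker_default)


def segment_expired(segment_age_minutes, retention_minutes):
    """Assumption: a segment becomes eligible for deletion once it is older
    than the retention period of its topic."""
    return segment_age_minutes > retention_minutes


# Hypothetical values: 7 days broker-wide, 1 hour for a "heartbeat" topic.
overrides = {"heartbeat": 60}
default = 7 * 24 * 60

print(effective_retention_minutes("heartbeat", default, overrides))
print(effective_retention_minutes("orders", default, overrides))
print(segment_expired(90, effective_retention_minutes("heartbeat", default, overrides)))
```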
0.7.1: Will anything be logged when packets are lost on a poor network?
Our network packet loss between the producer and the Kafka broker can reach 2~5% when the traffic load is high, but in the producer's log file we see no related error logs. Is this expected, or should there be some warnings or errors in the log file? PS. We do see some "Event queue is full of unsent messages" errors in the log; not sure if it has anything to do with this. -- Best Regards -- 刘明敏 | mmLiu
using Maven to build Java clients for Kafka, looking for POM
All, Does anyone have a POM to build Java consumers and producers for Kafka? Is there an archetype for Maven? David Novogrodsky david.novogrod...@gmail.com http://www.linkedin.com/in/davidnovogrodsky
One Way Kafka (or separate ports for producers and consumers)
Hey list, I would like to use Kafka as an ingest point for sensitive data in a large production network. We would like to set it up in such a way that anyone that wants can publish data through the brokers, but we want to restrict access for consumers to just a small subset of machines. I'm sure there are other solutions which might be better, so please let me know if there is a way to do this, but one way I was considering is allowing consumers to connect to one port, and producers to connect to another, and doing the IP based access control with firewalls. Thanks, Rick
Re: Log Retention in Kafka
Thanks Jun. I am using kafka_2.8.0-0.8.0-beta1.jar. I don't see this script kafka-topic.sh in the bin folder. Is there a way to do this in the version mentioned? --Kashyap
Re: Review for the new consumer APIs
Hi Neha, How will the new Consumer help us implement the following use case? We have heartbeat as one of our topics, and all application servers publish metrics to this topic. We have to meet a near real-time consumption SLA (less than 30 seconds).
1) We would like to find out what the latest message is for each partition that the current consumer is connected to.
2) If the consumer lags behind by a certain offset or by time, the consumer can seek to a particular offset (we can use the seek method for this).
3) How can we start a temp consumer for the same partition to read messages based on an offset range (from the last consumed offset in part 2 to the current offset that we jumped to in part 2)?
Basically, is there a QOS concept per partition where the consumer always needs to consume the latest message, detect when it lags behind, and start a TEMP consumer for back-fill? How does LinkedIn handle the near real-time consumption of operation metrics? Thanks, Bhavesh

On Sat, Apr 12, 2014 at 6:58 PM, Neha Narkhede neha.narkh...@gmail.com wrote:
"Why can't we pass a callback in subscribe itself?" Mainly because it will make the processing kind of awkward since you need to access the other consumer APIs while processing the messages. Your suggestion does point out a problem with the poll() API though. Here is the initial proposal of the poll() API:
List<ConsumerRecord> poll(long timeout, TimeUnit unit);
The application subscribes to topics or partitions and expects to process messages per topic or per partition respectively. By just returning a list of ConsumerRecord objects, we make it difficult for the application to process messages naturally grouped by topic or partition. After some thought, I changed it to:
Map<String, ConsumerRecordMetadata> poll(long timeout, TimeUnit unit);
ConsumerRecordMetadata allows you to get records for a particular partition or get records for all partitions. The second change I made is to the commit APIs. To remain consistent with the Producer, I changed commit() to return a Future and got rid of commitAsync(). This will easily support the sync and async commit use cases.
Map<TopicPartition, OffsetMetadata> commit(Map<TopicPartition, Long> offsets);
(OffsetMetadata: http://people.apache.org/~nehanarkhede/kafka-0.9-consumer-javadoc/doc/org/apache/kafka/clients/consumer/OffsetMetadata.html)
I'm looking for feedback on these changes. I've published the new javadoc to the same location: http://people.apache.org/~nehanarkhede/kafka-0.9-consumer-javadoc/doc. I would appreciate it if someone could take a look. Thanks, Neha

On Tue, Apr 8, 2014 at 9:50 PM, pushkar priyadarshi priyadarshi.push...@gmail.com wrote:
I was trying to understand why, when we have subscribe, poll is a separate API. Why can't we pass a callback in subscribe itself?

On Mon, Apr 7, 2014 at 9:51 PM, Neha Narkhede neha.narkh...@gmail.com wrote:
Hi, I'm looking for people to review the new consumer APIs. Patch is posted at https://issues.apache.org/jira/browse/KAFKA-1328 Thanks, Neha
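The lag handling Bhavesh describes in points 2 and 3 of this thread (jump to the latest offset when too far behind, then back-fill the skipped range with a second consumer) comes down to a small decision per partition. The Python below is a sketch of that decision only. It is not part of the proposed consumer API; the function name, the `max_lag` threshold and the half-open range convention are all assumptions for illustration.

```python
def plan_catch_up(consumed_offset, latest_offset, max_lag):
    """Decide where the live consumer should read next for one partition.

    consumed_offset: next offset the live consumer would read
    latest_offset:   offset one past the newest message in the partition
    max_lag:         largest lag (in messages) the live consumer may carry

    Returns (next_offset, backfill_range). backfill_range is None while the
    consumer is within max_lag; otherwise it is the half-open range
    (start, end) that a separate back-fill consumer should read.
    """
    lag = latest_offset - consumed_offset
    if lag <= max_lag:
        return consumed_offset, None
    # Too far behind: the live consumer seeks to the head of the log and the
    # skipped messages are handed to a temporary back-fill consumer.
    return latest_offset, (consumed_offset, latest_offset)


print(plan_catch_up(90, 100, 20))
print(plan_catch_up(10, 100, 20))
```

A time-based SLA such as the 30 seconds mentioned above would need message timestamps or a measured consume rate to translate into `max_lag`; this sketch only covers the offset-based variant.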
QOS on Producer Side
We are using Kafka for operation metrics, and we do not want to lose any data at all if there is an issue with the network, if all brokers need to be rebooted, or if there is downtime for operational reasons while all producers are still producing data on the front-end side. We use async to publish messages and we are using Kafka version 0.8.0. Has anyone implemented buffering on local disk (on the producer side) and transmitted the messages when the network connection is restored? How do I get a handle to the list of messages that the async thread could not transfer after x retries? I know the new producer API has a callback interface, but it is per message, not per producer instance. Is this the final new Producer API? http://empathybox.com/kafka-javadoc/index.html?kafka/clients/producer/KafkaProducer.html Is there a plan to add a method like onFailure(List<Messages> messages, Exception exception)? Basically, I have to address QOS on the producer side, and be able to buffer on disk and retransmit all messages to partitions that are reserved for messages that happened in the past. How does LinkedIn handle QOS on the producer side? Is there any plan to add this QOS feature on the producer side, with strategies to store and retransmit the messages? If we do get the list of messages in the callback, will it be compressed data? I would appreciate feedback from Kafka developers and others on how to implement QOS. Thanks, Bhavesh
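For the "buffer on local disk and retransmit" idea in this message, a minimal spool can be sketched in Python as follows. This is an illustration under stated assumptions, not a Kafka feature. `DiskSpool`, `send_or_spool` and the `send` callable are hypothetical names. The sketch assumes `send(topic, payload)` raises an exception when delivery fails, which matches a synchronous send rather than the 0.8.0 async producer discussed above. Payloads are assumed to be JSON-serializable strings.

```python
import json
import os
import tempfile


class DiskSpool:
    """Append-only spool file for messages that could not be sent."""

    def __init__(self, path):
        self.path = path

    def append(self, topic, payload):
        # One JSON object per line; fsync so a producer crash does not lose it.
        with open(self.path, "a", encoding="utf-8") as f:
            f.write(json.dumps({"topic": topic, "payload": payload}) + "\n")
            f.flush()
            os.fsync(f.fileno())

    def replay(self, send):
        """Resend spooled messages in order; keep the ones that fail again."""
        if not os.path.exists(self.path):
            return 0
        with open(self.path, encoding="utf-8") as f:
            records = [json.loads(line) for line in f if line.strip()]
        remaining, sent = [], 0
        for rec in records:
            try:
                send(rec["topic"], rec["payload"])
                sent += 1
            except Exception:
                remaining.append(rec)
        # Atomically rewrite the spool with only the undelivered messages.
        tmp = self.path + ".tmp"
        with open(tmp, "w", encoding="utf-8") as f:
            for rec in remaining:
                f.write(json.dumps(rec) + "\n")
        os.replace(tmp, self.path)
        return sent


def send_or_spool(send, spool, topic, payload):
    """Try to send; on failure, write the message to the disk spool."""
    try:
        send(topic, payload)
        return True
    except Exception:
        spool.append(topic, payload)
        return False
```

A typical use would call `send_or_spool` on the hot path and `spool.replay(send)` from a timer once the brokers are reachable again. Replayed messages arrive later than newer ones, so ordering is not preserved, which is presumably why the message above talks about dedicated partitions for past data.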
Re: JAVA HEAP settings for KAFKA in production
I apologize for taking a couple days to jump in on this. We're currently running JDK 1.7 u51, and we've switched over to the G1 collector. If you do this (and I highly recommend it), make sure you're on u51. We tried out u21 in testing, but we had a number of problems with the GC implementation in that version. Our tuning looks like this:
-Xms4g -Xmx4g -XX:PermSize=48m -XX:MaxPermSize=48m -XX:+UseG1GC -XX:MaxGCPauseMillis=20 -XX:InitiatingHeapOccupancyPercent=35
For reference, I'm looking at the stats on one of our busiest clusters (at peak):
- 15 brokers
- 15.5k partitions (replication factor 2)
- 400k messages/sec in
- 70 MB/sec inbound, 400 MB/sec+ outbound
The tuning looks fairly aggressive, but all of our brokers in that cluster have a 90% GC pause time of about 21ms, and they're doing less than 1 young GC per second. We haven't seen a single full GC on those brokers in the last month, and previous to that I think we only saw them when I was messing around with the cluster in a very painful way, not under anything approaching normal traffic. -Todd

On 5/1/14, 9:21 PM, Neha Narkhede neha.narkh...@gmail.com wrote:
The GC settings at http://kafka.apache.org/documentation.html#java are old. We meant to update the documentation with the new GC settings using the G1 collector, but we haven't gotten around to doing that. Let me reach out to our engineer, Todd Palino, who worked on tuning GC for Kafka at LinkedIn to see if we can update our docs. Thanks, Neha

On Thu, May 1, 2014 at 9:02 PM, Jun Rao jun...@gmail.com wrote:
http://kafka.apache.org/documentation.html#java Thanks, Jun

On Thu, May 1, 2014 at 12:19 PM, Cassa L lcas...@gmail.com wrote:
Hi, I want to know what the usual recommended JAVA_HEAP settings are for kafka servers in production. Thanks LCassa