Re: Consumer questions
AFAIK, we cannot replay messages with the high-level consumer. We need to use the SimpleConsumer.

On Sun, Jan 18, 2015 at 12:15 AM, Christopher Piggott cpigg...@gmail.com wrote: snip
Re: Consumer questions
Thanks. That helped clear a lot up in my mind. I'm trying the high-level consumer now.

Occasionally I need to do a replay of the stream. The example is:

KafkaStream.iterator();

which starts at wherever ZooKeeper recorded that you left off. With the high-level interface, can you request an iterator that starts at the very beginning?

On Fri, Jan 16, 2015 at 8:55 PM, Manikumar Reddy ku...@nmsworks.co.in wrote: snip
Re: Consumer questions
You can replay the messages with the high-level consumer; you can even start at whatever position you want.

Prior to your consumers starting, call ZkUtils.maybeDeletePath(zkClientConnection, "/consumers/" + groupId), and make sure you have auto.offset.reset=smallest in your consumer properties. This way you start at the beginning of the stream once the offsets are gone.

If you have many consumer processes launching within your group, you might want to have a barrier (http://zookeeper.apache.org/doc/r3.4.6/recipes.html#sc_recipes_eventHandles) so that only one of the launching consumer processes does this... if you have only one process, or have the ability to do the operation administratively, then there is no need.

You can even trigger this to happen while they are all running... more code to write, but 100% doable (and it works well if you do it right): have them watch a node, get the notification, stop what they are doing, barrier, delete the path (or change the value of the offset so you can start wherever you want), and start again.

You can also just change the groupId to something brand new when you start up, with auto.offset.reset=smallest in your properties; either way works. The delete-path approach leaves less lint in ZooKeeper long term.

It is all just 1s and 0s, and just a matter of how many you put together yourself vs. take out of the box that are given to you =8^)

/***
Joe Stein
Founder, Principal Consultant
Big Data Open Source Security LLC
http://www.stealth.ly
Twitter: @allthingshadoop http://www.twitter.com/allthingshadoop
***/

On Sat, Jan 17, 2015 at 2:11 PM, Manikumar Reddy ku...@nmsworks.co.in wrote: snip
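A minimal sketch of the properties side of this recipe. The config keys (group.id, auto.offset.reset) are the standard 0.8 high-level consumer keys; the helper and the group name are illustrative:

```java
import java.util.Properties;

public class ReplayConfig {
    /**
     * Builds consumer properties for a replay, per the recipe above:
     * with no committed offset in ZooKeeper for this group (a brand-new
     * group.id, or after deleting /consumers/&lt;groupId&gt;),
     * auto.offset.reset=smallest makes consumption start from the
     * earliest message still retained.
     */
    static Properties replayProps(String groupId) {
        Properties props = new Properties();
        props.put("group.id", groupId);
        props.put("auto.offset.reset", "smallest");
        return props;
    }

    public static void main(String[] args) {
        // Using a fresh group.id per replay avoids touching ZooKeeper
        // at all, at the cost of leftover /consumers entries ("lint").
        Properties p = replayProps("replay-" + System.currentTimeMillis());
        System.out.println(p.getProperty("auto.offset.reset"));
    }
}
```

These properties would then be passed to the high-level consumer's connector; the ZkUtils.maybeDeletePath call, if used instead of a fresh group.id, has to happen before any consumer in the group starts.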
Re: Consumer questions
Hi,

1. With the SimpleConsumer, you must keep track of the offsets in your application. In the example code, the readOffset variable can be saved in Redis/ZooKeeper; you should plug this logic into your code. The high-level consumer stores the last read offset in ZooKeeper.

2. You will get OffsetOutOfRange for any invalid offset. On error, you can decide what to do, i.e., read from the latest, earliest, or some other offset.

3. https://issues.apache.org/jira/browse/KAFKA-1779

4. Yes.

Manikumar

On Sat, Jan 17, 2015 at 2:25 AM, Christopher Piggott cpigg...@gmail.com wrote:

Hi,

I am following this link: https://cwiki.apache.org/confluence/display/KAFKA/0.8.0+SimpleConsumer+Example for a consumer design using kafka_2.9.2 version 0.8.1.1 (what I can find in Maven Central). I have a couple of questions about the consumer. I checked the archives and didn't see these exact questions asked already, but I may have missed them -- I apologize if that is the case.

When I create a consumer I give it a consumer ID. I assumed that it would store my consumer's name as well as the last readOffset in ZooKeeper, but looking in ZooKeeper that doesn't seem to be the case. So it seems to me that when my consumers come up they need to either get the entire history from the start of time (which could take a long time, as I have 14-day durability), or else somehow keep track of the read offset themselves. I have Redis in my system already, so I have the choice of keeping track of this in either Redis or ZooKeeper; ZooKeeper seems like the better idea. Am I right, though, that the SimpleConsumer and the example I linked above don't keep track of this, so if I want to do that I would have to do it myself?

Second question: in the example consumer, there is an error handler that checks whether you received an OffsetOutOfRange response from Kafka. If so, it gets a new read offset from LatestTime(). My interpretation of this is that you have asked for an offset which doesn't make sense, so it just moves you to the end of the stream. That's guaranteed data loss. A simple alternative would be to take the beginning of the stream, which, if you have idempotent processing, would be fine -- it would be a replay -- but it could take a long time. I don't know for sure what would cause an OffsetOutOfRange -- the only thing I can really think of is that someone has changed the underlying stream on you (like they deleted and recreated it and didn't tell all the consumers). I guess it's possible that if I have a 1-day stream durability and I stop my consumer for 3 days, it could ask for a readOffset that no longer exists; it's not clear to me whether or not that would result in an OffsetOutOfRange error.

Third question: I set .maxWait(1000), interpreting that to mean that when I make my fetch request the consumer will time out if there are no new messages in 1 second. It doesn't seem to work -- my call to consumer.fetch() seems to return immediately. Is that expected?

Final question: just to confirm, in

new FetchRequestBuilder().addFetch(topic, shardNum, readOffset, FETCH_SIZE)

FETCH_SIZE is in bytes, not number of messages, so presumably it fetches as many messages as will fit into a buffer of that many bytes? Is that right?

Thanks.

Christopher Piggott
Sr. Staff Engineer
Golisano Institute for Sustainability
Rochester Institute of Technology
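To make answer 1 concrete, here is a hedged sketch of the bookkeeping a SimpleConsumer application has to own: persist readOffset after each successfully processed batch (in the wiki example that would be the value from messageAndOffset.nextOffset()), and read it back on startup. OffsetStore and its in-memory map are hypothetical stand-ins for Redis or a ZooKeeper znode:

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/**
 * Hypothetical offset checkpoint for a SimpleConsumer application.
 * A real implementation would back this with Redis or ZooKeeper;
 * a map stands in here to show the contract.
 */
public class OffsetStore {
    private final Map<String, Long> store = new ConcurrentHashMap<>();

    private static String key(String topic, int partition) {
        return topic + "/" + partition;
    }

    /** Called after a batch is durably processed. */
    public void save(String topic, int partition, long readOffset) {
        store.put(key(topic, partition), readOffset);
    }

    /**
     * Returns the saved offset, or -1 if none, in which case the
     * caller should fall back to an earliest/latest OffsetRequest.
     */
    public long load(String topic, int partition) {
        return store.getOrDefault(key(topic, partition), -1L);
    }
}
```

On restart the consumer loads the saved offset and resumes fetching from there; if the load misses (first run, or the checkpoint was wiped), it decides between EarliestTime() and LatestTime() exactly as in answer 2.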
Re: Consumer questions: 0.8.0 vs. 0.7.2
Hi,

I was able to implement my own lookup code, but have a few concerns about this long term:

- the Broker class is marked as 'private' in the Scala code. IntelliJ gives me an error about using it, but the runtime lets me use it and get the host/port out.
- I have to know a lot about the structure of some internal classes for this to work, so changes in the implementation would cause my logic to break.

I did a quick JIRA search and didn't see a request for a Java API for finding the primary; is that already on the roadmap, or should I submit an enhancement request?

Thanks,

Chris

On Wed, Nov 28, 2012 at 2:08 PM, Neha Narkhede neha.narkh...@gmail.com wrote:

snip

Also, there are 2 ways to send the topic metadata request. One way is how SimpleConsumerShell.scala uses ClientUtils.fetchTopicMetadata(). Another way is by using the send() API on SyncProducer.

Thanks,
Neha
Re: Consumer questions: 0.8.0 vs. 0.7.2
There is no reason ClientUtils.fetchTopicMetadata should take broker instances; that is totally unusable. I have a patch for this included in KAFKA-642.

-Jay

On Mon, Dec 3, 2012 at 7:53 AM, Chris Curtin curtin.ch...@gmail.com wrote: snip