AFAIK, we cannot replay the messages with the high-level consumer. We need to use the SimpleConsumer.
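To illustrate: with the 0.8 SimpleConsumer you can ask the broker for the earliest offset still available on a partition and start fetching from there, which is effectively a full replay. A sketch along the lines of the wiki example (the consumer, topic, partition, and client name are placeholders you would supply; this needs a live broker and the kafka jar to actually run):

```java
import java.util.Collections;
import java.util.Map;

import kafka.api.PartitionOffsetRequestInfo;
import kafka.common.TopicAndPartition;
import kafka.javaapi.OffsetResponse;
import kafka.javaapi.consumer.SimpleConsumer;

class ReplayFromStart {
    // Returns the earliest offset still held by the broker for one
    // partition, i.e. the offset to start from when replaying the stream.
    // Using kafka.api.OffsetRequest.LatestTime() instead would give the
    // offset of the next message to be produced.
    static long earliestOffset(SimpleConsumer consumer,
                               String topic, int partition, String clientName) {
        TopicAndPartition tp = new TopicAndPartition(topic, partition);
        Map<TopicAndPartition, PartitionOffsetRequestInfo> info =
            Collections.singletonMap(tp,
                new PartitionOffsetRequestInfo(kafka.api.OffsetRequest.EarliestTime(), 1));
        kafka.javaapi.OffsetRequest request = new kafka.javaapi.OffsetRequest(
            info, kafka.api.OffsetRequest.CurrentVersion(), clientName);
        OffsetResponse response = consumer.getOffsetsBefore(request);
        if (response.hasError()) {
            throw new RuntimeException("Offset request failed, error code: "
                + response.errorCode(topic, partition));
        }
        return response.offsets(topic, partition)[0];
    }
}
```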
On Sun, Jan 18, 2015 at 12:15 AM, Christopher Piggott <cpigg...@gmail.com> wrote:

> Thanks. That helped clear a lot up in my mind.
>
> I'm trying the high-level consumer now. Occasionally I need to do a replay
> of the stream. The example is:
>
>     KafkaStream.iterator();
>
> which starts at wherever ZooKeeper recorded that you left off.
>
> With the high-level interface, can you request an iterator that starts at
> the very beginning?
>
> On Fri, Jan 16, 2015 at 8:55 PM, Manikumar Reddy <ku...@nmsworks.co.in> wrote:
>
> > Hi,
> >
> > 1. With SimpleConsumer, you must keep track of the offsets in your
> > application. In the example code, the "readOffset" variable can be saved
> > in Redis or ZooKeeper; you should plug this logic into your code. The
> > High Level Consumer stores the last read offset in ZooKeeper.
> >
> > 2. You will get OffsetOutOfRange for any invalid offset. On error, you
> > can decide what to do, i.e. read from the latest, the earliest, or some
> > other offset.
> >
> > 3. https://issues.apache.org/jira/browse/KAFKA-1779
> >
> > 4. Yes.
> >
> > Manikumar
> >
> > On Sat, Jan 17, 2015 at 2:25 AM, Christopher Piggott <cpigg...@gmail.com> wrote:
> >
> > > Hi,
> > >
> > > I am following this link:
> > >
> > > https://cwiki.apache.org/confluence/display/KAFKA/0.8.0+SimpleConsumer+Example
> > >
> > > for a consumer design using kafka_2.9.2 version 0.8.1.1 (what I can
> > > find in Maven Central). I have a couple of questions about the
> > > consumer. I checked the archives and didn't see these exact questions
> > > asked already, but I may have missed them -- I apologize if that is
> > > the case.
> > >
> > > When I create a consumer I give it a consumer ID. I assumed that it
> > > would store my consumer's name as well as the last readOffset in
> > > ZooKeeper, but looking in ZooKeeper, that doesn't seem to be the case.
> > > So it seems to me that when my consumers come up, they need to either
> > > get the entire history from the start of time (which could take a long
> > > time, as I have 14-day durability), or else they need to somehow keep
> > > track of the read offset themselves.
> > >
> > > I have Redis in my system already, so I have the choice of keeping
> > > track of this in either Redis or ZooKeeper. It seems like ZooKeeper
> > > would be a better idea. Am I right, though, that the SimpleConsumer
> > > and the example I linked above don't keep track of this, so if I want
> > > to do that I would have to do it myself?
> > >
> > > Second question: in the example consumer, there is an error handler
> > > that checks whether you received an OffsetOutOfRange response from
> > > Kafka. If so, it gets a new read offset with .LatestTime(). My
> > > interpretation of this is that you have asked for an offset which
> > > doesn't make sense, so it just scans you to the end of the stream.
> > > That's guaranteed data loss. A simple alternative would be to take the
> > > beginning of the stream, which would be fine if you have idempotent
> > > processing -- it would be a replay -- but it could take a long time.
> > >
> > > I don't know for sure what would cause an OffsetOutOfRange -- the only
> > > thing I can really think of is that someone has changed the underlying
> > > stream on you (like they deleted and recreated it and didn't tell all
> > > the consumers). I guess it's possible that if I have a 1-day stream
> > > durability and I stop my consumer for 3 days, it could ask for a
> > > readOffset that no longer exists; it's not clear to me whether or not
> > > that would result in an OffsetOutOfRange error.
> > >
> > > Does that all make sense?
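On the OffsetOutOfRange handling: rather than jumping to .LatestTime() and silently losing data, you can clamp the saved offset into the broker's valid range and fall back to the earliest available offset, which gives you a replay instead of a loss. A minimal, broker-free sketch of just that decision (the names here are made up; in real code the earliest/latest bounds would come from an OffsetRequest with EarliestTime() and LatestTime()):

```java
class OffsetClamp {
    // Given the broker's valid offset range [earliest, latest] for a
    // partition, return a safe offset to resume from. A saved offset
    // below the range means the data has aged out of retention: restart
    // from earliest (a replay, but no silent loss). A saved offset above
    // the range should not normally happen; clamping to latest at least
    // keeps the consumer alive.
    static long resumeOffset(long saved, long earliest, long latest) {
        if (saved < earliest) return earliest;
        if (saved > latest)   return latest;
        return saved;
    }

    public static void main(String[] args) {
        // 1-day retention, consumer stopped for 3 days: offset aged out.
        System.out.println(resumeOffset(100, 5_000, 9_000));   // 5000
        // Saved offset still within retention: resume where we left off.
        System.out.println(resumeOffset(6_200, 5_000, 9_000)); // 6200
    }
}
```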
> > > Third question: I set .maxWait(1000), interpreting that to mean that
> > > when I make my fetch request, the consumer will time out if there are
> > > no new messages within 1 second. It doesn't seem to work -- my call to
> > > consumer.fetch() seems to return immediately. Is that expected?
> > >
> > > Final question, just to confirm: in
> > >
> > >     new FetchRequestBuilder().addFetch( topic, shardNum, readOffset,
> > >         FETCH_SIZE )
> > >
> > > FETCH_SIZE is in bytes, not number of messages, so presumably it
> > > fetches as many messages as will fit into a buffer of that many bytes?
> > > Is that right?
> > >
> > > Thanks.
> > >
> > > Christopher Piggott
> > > Sr. Staff Engineer
> > > Golisano Institute for Sustainability
> > > Rochester Institute of Technology
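On the last two questions: FETCH_SIZE is indeed a byte budget, not a message count -- the broker returns as many whole messages as fit, and in 0.8 if the first message at the requested offset is larger than the fetch size you get nothing back, so size it above your largest expected message. As for maxWait, I believe it only takes effect together with a non-zero minBytes on the FetchRequest; with minBytes unset the broker responds immediately, which would explain what you are seeing. A toy, broker-free sketch of the byte-budget behavior (message sizes are hypothetical):

```java
class FetchBudget {
    // Count how many whole messages fit into a fetch of byteBudget bytes,
    // taking messages in log order and stopping at the first one that no
    // longer fits. Sizes are on-the-wire message sizes in bytes.
    static int messagesThatFit(int[] messageSizes, int byteBudget) {
        int used = 0, count = 0;
        for (int size : messageSizes) {
            if (used + size > byteBudget) break;
            used += size;
            count++;
        }
        return count;
    }

    public static void main(String[] args) {
        int[] sizes = {400, 300, 500, 200};  // four messages, 1400 bytes total
        // 400 + 300 fit in 1000 bytes; the 500-byte message does not.
        System.out.println(messagesThatFit(sizes, 1000)); // 2
    }
}
```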