AFAIK, we cannot replay messages with the high-level consumer. We need to
use the SimpleConsumer.
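For what it's worth, the closest the 0.8 high-level consumer gets to a full replay is starting over from the earliest retained offset under a fresh group.id that has no committed offset in ZooKeeper. A hedged sketch of that configuration (property names are from the 0.8 consumer config; the group name and ZooKeeper address are made up):

```java
import java.util.Properties;

// Sketch: replay-from-the-beginning with the 0.8 high-level consumer by
// using a *fresh* group.id (so no offset is stored in ZooKeeper yet)
// together with auto.offset.reset=smallest.
public class ReplayConfig {
    public static Properties replayProps(String freshGroupId) {
        Properties props = new Properties();
        props.put("zookeeper.connect", "localhost:2181"); // assumption: local ZK
        props.put("group.id", freshGroupId);              // new group => no saved offset
        // With no committed offset for this group, consumption starts at
        // the earliest available offset instead of the latest.
        props.put("auto.offset.reset", "smallest");
        return props;
    }
}
```

This only helps when replaying everything from the start is acceptable; seeking to an arbitrary offset still requires the SimpleConsumer, as noted above.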

On Sun, Jan 18, 2015 at 12:15 AM, Christopher Piggott <cpigg...@gmail.com>
wrote:

> Thanks.  That helped clear a lot up in my mind.
>
> I'm trying the high-level consumer now.  Occasionally I need to do a replay
> of the stream.  The example is:
>
>    KafkaStream.iterator();
>
> which starts wherever ZooKeeper recorded that you left off.
>
> With the high level interface, can you request an iterator that starts at
> the very beginning?
>
>
>
> On Fri, Jan 16, 2015 at 8:55 PM, Manikumar Reddy <ku...@nmsworks.co.in>
> wrote:
>
> > Hi,
> >
> > 1. In SimpleConsumer, you must keep track of the offsets in your
> >    application. In the example code, the "readOffset" variable can be
> >    saved in redis/zookeeper. You should plug this logic into your code.
> >    The high-level consumer stores the last read offset in ZooKeeper.
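Point 1 above amounts to the following bookkeeping: persist readOffset after each successfully processed batch, and load it again on startup. A hedged sketch, with an in-memory map standing in for the redis/ZooKeeper store (all names here are made up):

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Sketch: with SimpleConsumer the application owns the offsets, so they
// must be saved externally. The map below simulates a durable store such
// as redis or ZooKeeper.
public class OffsetStore {
    private final Map<String, Long> store = new ConcurrentHashMap<>();

    private static String key(String topic, int partition) {
        return topic + "-" + partition;
    }

    // Call after successfully processing a batch, before the next fetch.
    public void save(String topic, int partition, long nextOffset) {
        store.put(key(topic, partition), nextOffset);
    }

    // On startup: resume where we left off, or fall back to a
    // caller-chosen default (e.g. the earliest available offset).
    public long load(String topic, int partition, long defaultOffset) {
        return store.getOrDefault(key(topic, partition), defaultOffset);
    }
}
```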
> >
> > 2. You will get OffsetOutOfRange for any invalid offset. On error, you
> >    can decide what to do, i.e. read from the latest, earliest, or some
> >    other offset.
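The choice in point 2 is the application's reset policy; resetting to the earliest offset replays retained data (no loss, possible duplicates), while resetting to the latest skips ahead (a gap in the data). A hedged plain-Java sketch of that decision, mirroring in spirit the EarliestTime()/LatestTime() constants of kafka.api.OffsetRequest (the enum and method names below are made up):

```java
// Sketch: on OffsetOutOfRange, clamp the requested offset to the valid
// range reported by the broker, according to a chosen reset policy.
public class ResetPolicy {
    public enum Reset { EARLIEST, LATEST }

    public static long reset(long badOffset, long earliest, long latest, Reset policy) {
        if (badOffset >= earliest && badOffset <= latest) {
            return badOffset; // actually in range: keep it
        }
        // EARLIEST replays everything still retained (duplicates possible);
        // LATEST jumps to the head of the log (data in between is skipped).
        return policy == Reset.EARLIEST ? earliest : latest;
    }
}
```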
> >
> > 3. https://issues.apache.org/jira/browse/KAFKA-1779
> >
> > 4. Yes
> >
> >
> > Manikumar
> >
> > On Sat, Jan 17, 2015 at 2:25 AM, Christopher Piggott <cpigg...@gmail.com>
> > wrote:
> >
> > > Hi,
> > >
> > > I am following this link:
> > >
> > > https://cwiki.apache.org/confluence/display/KAFKA/0.8.0+SimpleConsumer+Example
> > >
> > > for a consumer design using kafka_2.9.2 version 0.8.1.1 (what I can
> > > find in Maven Central).  I have a couple of questions about the
> > > consumer.  I checked the archives and didn't see these exact questions
> > > asked already, but I may have missed them -- I apologize if that is
> > > the case.
> > >
> > >
> > > When I create a consumer I give it a consumer ID.  I assumed that it
> > > would store my consumer's name as well as the last readOffset in
> > > ZooKeeper, but looking in ZooKeeper that doesn't seem to be the case.
> > > So it seems to me that when my consumers come up they need to either
> > > get the entire history from the start of time (which could take a long
> > > time, as I have 14-day durability), or else they need to somehow keep
> > > track of the read offset themselves.
> > >
> > > I have redis in my system already, so I have the choice of keeping
> > > track of this in either redis or ZooKeeper.  It seems like ZooKeeper
> > > would be a better idea.  Am I right, though, that the SimpleConsumer
> > > and the example I linked above don't keep track of this, so if I want
> > > to do that I would have to do it myself?
> > >
> > > Second question: in the example consumer, there is an error handler
> > > that checks if you received an OffsetOutOfRange response from Kafka.
> > > If so, it gets a new read offset via .LatestTime().  My interpretation
> > > is that you asked for an offset which doesn't make sense, so it just
> > > scans you to the end of the stream.  That's guaranteed data loss.  A
> > > simple alternative would be to take the beginning of the stream,
> > > which would be fine if you have idempotent processing -- it would be
> > > a replay -- but it could take a long time.
> > >
> > > I don't know for sure what would cause an OffsetOutOfRange -- the
> > > only thing I can really think of is that someone has changed the
> > > underlying stream on you (like they deleted and recreated it and
> > > didn't tell all the consumers).  I guess it's possible that if I have
> > > a 1-day stream durability and I stop my consumer for 3 days, it could
> > > ask for a readOffset that no longer exists; it's not clear to me
> > > whether or not that would result in an OffsetOutOfRange error.
> > >
> > > Does that all make sense?
> > >
> > > Third question: I set .maxWait(1000), interpreting that to mean that
> > > when I make my fetch request the consumer will time out if there are
> > > no new messages within 1 second.  It doesn't seem to work -- my call
> > > to consumer.fetch() seems to return immediately.  Is that expected?
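Editor's note on the third question: in the 0.8 fetch API, maxWait is only honored in combination with minBytes -- the broker answers as soon as at least minBytes of data are available, and with minBytes left at its default of 0 it answers immediately even when there is nothing new. Setting .minBytes(1) next to .maxWait(1000) on the FetchRequestBuilder should give the expected one-second block. A hedged plain-Java simulation of that rule (no broker involved; the function name is made up):

```java
// Sketch: the broker holds a fetch request only while fewer than
// minBytes of data are available, and never longer than maxWait.
// With minBytes == 0 the condition is always satisfied, so the
// request returns immediately -- which matches the observed behavior.
public class FetchWait {
    // Returns roughly how long (ms) the broker would hold the request.
    public static int waitMs(int availableBytes, int minBytes, int maxWaitMs) {
        if (availableBytes >= minBytes) {
            return 0;         // enough data (or minBytes == 0): reply now
        }
        return maxWaitMs;     // otherwise hold for up to maxWait
    }
}
```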
> > >
> > > Final question: just to confirm:
> > >
> > >     new FetchRequestBuilder().addFetch( topic, shardNum, readOffset,
> > >         FETCH_SIZE )
> > >
> > > FETCH_SIZE is in bytes, not number of messages, so presumably it
> > > fetches as many messages as will fit into a buffer of that many
> > > bytes?  Is that right?
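Editor's note: yes, the fetch size is a byte budget, which also means a single message larger than the fetch size cannot be returned at all -- the fetch size must be at least the largest expected message. A hedged sketch of the packing arithmetic (plain Java, no Kafka involved; the method name is made up):

```java
// Sketch: FETCH_SIZE caps the fetch response in *bytes*, not messages,
// so one fetch returns roughly as many whole messages as fit in the
// budget. A message that doesn't fit is not returned.
public class FetchBudget {
    // How many of the given messages fit in one fetch of fetchSize bytes.
    public static int messagesReturned(int[] messageSizes, int fetchSize) {
        int used = 0, count = 0;
        for (int size : messageSizes) {
            if (used + size > fetchSize) break; // next message doesn't fit
            used += size;
            count++;
        }
        return count;
    }
}
```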
> > >
> > > Thanks.
> > >
> > >
> > > Christopher Piggott
> > > Sr. Staff Engineer
> > > Golisano Institute for Sustainability
> > > Rochester Institute of Technology
> > >
> >
>
