Did you turn on automatic offset committing? If so, this issue should not
happen, as later runs will just consume data from the last committed
offset.
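
For reference, something like the following is roughly what I mean by
automatic offset committing in the 0.8.1 high-level consumer config (group
id, host, and interval here are just example values):

import java.util.Properties

val props = new Properties()
props.put("group.id", "some-unique-id")
props.put("zookeeper.connect", "zkhost:2181")
props.put("auto.commit.enable", "true")        // turn on automatic offset committing
props.put("auto.commit.interval.ms", "60000")  // how often offsets are committed (example value)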

Guozhang

On Mon, Apr 6, 2015 at 5:16 PM, Jack <jac...@gmail.com> wrote:

> Hi Guozhang,
>
> When I switched auto.offset.reset to smallest, it worked. However, it
> generates a lot of data and slows down the verification.
>
> Thanks,
>
> -Jack
>
> On Mon, Apr 6, 2015 at 5:07 PM, Guozhang Wang <wangg...@gmail.com> wrote:
>
> > Jack,
> >
> > Could you just change "auto.offset.reset" to smallest and see if this issue
> > goes away? It is not related to the producer end.
> >
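> > In the consumer properties that would be something like:
> >
> > props.put("auto.offset.reset", "smallest")  // instead of largest
> >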
> > Guozhang
> >
> > On Mon, Apr 6, 2015 at 4:14 PM, Jack <jac...@gmail.com> wrote:
> >
> > > Hi Guozhang,
> > >
> > > Thanks so much for replying, first of all.
> > >
> > > Here is the config we have:
> > >
> > > group.id -> 'some unique id'
> > > zookeeper.connect -> 'zookeeper host'
> > > auto.commit.enabled -> false
> > > auto.offset.reset -> largest
> > > consumer.timeout.ms -> -1
> > > fetch.message.max.bytes -> 10M
> > >
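> > > Roughly, these get wired up on our side like this (a sketch; the hostname,
> > > group id, and exact values are placeholders):
> > >
> > > import java.util.Properties
> > > import kafka.consumer.{Consumer, ConsumerConfig}
> > >
> > > val props = new Properties()
> > > props.put("group.id", "some-unique-id")
> > > props.put("zookeeper.connect", "zkhost:2181")
> > > props.put("auto.commit.enable", "false")   // (property name per ConsumerConfig)
> > > props.put("auto.offset.reset", "largest")
> > > props.put("consumer.timeout.ms", "-1")
> > > props.put("fetch.message.max.bytes", "10485760")  // ~10MB
> > >
> > > // the connector is what we later call createMessageStreams on
> > > val connector = Consumer.create(new ConsumerConfig(props))
> > >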
> > > So it seems like we need to make sure the submitted futures return before
> > > performing any actions which eventually generate the messages we expect.
> > >
> > > Cheers,
> > >
> > > -Jack
> > >
> > >
> > >
> > > On Mon, Apr 6, 2015 at 4:04 PM, Guozhang Wang <wangg...@gmail.com> wrote:
> > >
> > > > Jack,
> > > >
> > > > Your theory is correct if your consumer config sets auto.offset.reset to
> > > > largest and you do not have any committed offsets before. Could you list
> > > > your consumer configs and see if that is the case?
> > > >
> > > > Guozhang
> > > >
> > > > On Mon, Apr 6, 2015 at 3:15 PM, Jack <jac...@gmail.com> wrote:
> > > >
> > > > > Hi folks,
> > > > >
> > > > > I have a quick question.
> > > > >
> > > > > We are using 0.8.1 and running into this weird problem. We are using the
> > > > > HighLevelConsumer for this topic. We created 64 partitions for this
> > > > > topic.
> > > > >
> > > > > In our service, we first create a Consumer object as usual, and then we
> > > > > call 'createMessageStreams' with Map('topic_name' -> 64). It returns us a
> > > > > Seq[KafkaStream]. For each stream object in the sequence, we submit a task
> > > > > like the following to the pool.
> > > > >
> > > > > threadpool.submit(new Runnable {
> > > > >   override def run(): Unit = {
> > > > >     // block on the stream's iterator and handle each incoming message
> > > > >     stream.iterator().foreach { msg => ... }
> > > > >   }
> > > > > })
> > > > >
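> > > > > (For context, the streams above come from something like the following,
> > > > > where the topic name and stream count are placeholders; each stream is
> > > > > then handed to a task like the one above.)
> > > > >
> > > > > val streams = connector.createMessageStreams(Map("topic_name" -> 64))("topic_name")
> > > > >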
> > > > > The problem we ran into is that once all of the above is established, we
> > > > > should be able to get any message showing up in Kafka from the consumer
> > > > > side. But in reality, for some reason, we occasionally don't see these
> > > > > messages (we do see these messages in the log though).
> > > > >
> > > > > Some team members believe that the stream might get a later offset, and
> > > > > thus not see the earlier messages.
> > > > >
> > > > > I really doubt that statement and want to see if anyone could shed any
> > > > > light on this.
> > > > >
> > > > > One possible theory from me is that the offset won't be assigned until
> > > > > stream.iterator().next is called, but since the task submission is
> > > > > asynchronous (we don't wait for each submission before producing messages
> > > > > to kafka), that could get us a later offset, which might not contain the
> > > > > messages we want. One possible solution is to perform any action which
> > > > > produces messages to kafka only after all these submitted tasks return.
> > > > >
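> > > > > A rough sketch of that idea (since the tasks loop forever, having each one
> > > > > signal once it is up, rather than waiting for it to return, seems closer
> > > > > to what we want):
> > > > >
> > > > > import java.util.concurrent.CountDownLatch
> > > > >
> > > > > val ready = new CountDownLatch(streams.size)
> > > > > streams.foreach { stream =>
> > > > >   threadpool.submit(new Runnable {
> > > > >     override def run(): Unit = {
> > > > >       ready.countDown()  // signal that this consumer thread is up
> > > > >       stream.iterator().foreach { msg => () }  // message handling elided
> > > > >     }
> > > > >   })
> > > > > }
> > > > > ready.await()  // only start producing test messages after every thread is running
> > > > >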
> > > > > Any thoughts?
> > > > >
> > > > > Thanks,
> > > > >
> > > > > -Jack
> > > > >
> > > >
> > > >
> > > >
> > > > --
> > > > -- Guozhang
> > > >
> > >
> >
> >
> >
> > --
> > -- Guozhang
> >
>



-- 
-- Guozhang
