Jack,

Could you just change "auto.offset.reset" to smallest and see if this issue
goes away? It is not related to the producer end.
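
A minimal sketch of that change, assuming the 0.8.x high-level consumer
properties from the config below (host names and group id are placeholders):

```scala
import java.util.Properties

// Same settings as in the thread, with auto.offset.reset flipped to
// "smallest" so a consumer with no committed offset rewinds to the
// earliest available message instead of jumping to the latest one.
val props = new Properties()
props.put("group.id", "some-unique-id")        // placeholder
props.put("zookeeper.connect", "zk-host:2181") // placeholder
props.put("auto.commit.enable", "false")
props.put("auto.offset.reset", "smallest")     // was "largest"
props.put("consumer.timeout.ms", "-1")
```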

Guozhang

On Mon, Apr 6, 2015 at 4:14 PM, Jack <jac...@gmail.com> wrote:

> Hi Guozhang,
>
> Thanks so much for replying, first of all.
>
> Here is the config we have:
>
> group.id -> 'some unique id'
> zookeeper.connect -> 'zookeeper host'
> auto.commit.enable -> false
> auto.offset.reset -> largest
> consumer.timeout.ms -> -1
> fetch.message.max.bytes -> 10M
>
> So it seems we need to make sure the submitted futures return before
> performing the actions that eventually generate the messages we expect.
>
> Cheers,
>
> -Jack
>
>
>
> On Mon, Apr 6, 2015 at 4:04 PM, Guozhang Wang <wangg...@gmail.com> wrote:
>
> > Jack,
> >
> > Your theory is correct if your consumer config sets auto.offset.reset
> > to largest and you do not have any committed offsets yet. Could you
> > list your consumer configs and see if that is the case?
> >
> > Guozhang
> >
> > On Mon, Apr 6, 2015 at 3:15 PM, Jack <jac...@gmail.com> wrote:
> >
> > > Hi folks,
> > >
> > > I have a quick question.
> > >
> > > We are using 0.8.1 and running into a weird problem. We are using the
> > > HighLevelConsumer for this topic, which we created with 64 partitions.
> > >
> > > In our service, we first create a Consumer object as usual, and then
> > > call 'createMessageStreams' with Map('topic_name' -> 64). It returns
> > > a Seq[KafkaStream]. For each stream object in the sequence, we submit
> > > a task like the following to the pool.
> > >
> > > threadpool.submit(new Runnable {
> > >   override def run(): Unit = {
> > >     stream.iterator().foreach { msg => ... }
> > >   }
> > > })
> > >
> > > The problem we ran into is that after all of the above is set up, any
> > > message showing up in Kafka should be readable from the consumer side.
> > > But in reality, occasionally, we don't see these messages (we do see
> > > them in the broker log, though).
> > >
> > > Some team members believe that the stream might get a later offset
> > > and thus miss the earlier messages.
> > >
> > > I really doubt that statement and want to see if anyone could shed
> > > some light on this.
> > >
> > > One possible theory from me is that the offset won't be assigned
> > > until stream.iterator().next is called, but since the task submission
> > > is asynchronous (we don't wait for each submission before producing
> > > messages to Kafka), we could get a later offset that might not contain
> > > the messages we want. One possible solution is to perform any action
> > > that produces messages to Kafka only after all the submitted tasks
> > > have started.
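> > >
> > > That idea could be sketched with a CountDownLatch: each task counts
> > > down as soon as it enters run(), and the producing side waits on the
> > > latch before sending anything. (A hedged sketch with stand-in strings
> > > rather than real KafkaStream objects.)

```scala
import java.util.concurrent.{CountDownLatch, Executors, TimeUnit}

// Stand-ins for the 64 KafkaStream objects; a real task would call
// stream.iterator().foreach { msg => ... } after counting down.
val streams = (1 to 4).map(i => s"stream-$i")
val pool    = Executors.newFixedThreadPool(streams.size)
val started = new CountDownLatch(streams.size)

streams.foreach { stream =>
  pool.submit(new Runnable {
    override def run(): Unit = {
      started.countDown() // this consumer task is now running
      // real consume loop would go here and block forever
    }
  })
}

// Don't produce to Kafka until every consumer task has entered run().
started.await(10, TimeUnit.SECONDS)
pool.shutdown()
```

> > > Note that entering run() still isn't quite the same as the iterator
> > > having fetched its first offset, so this only narrows the window.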
> > >
> > > Any thoughts?
> > >
> > > Thanks,
> > >
> > > -Jack
> > >
> >
> >
> >
> > --
> > -- Guozhang
> >
>



-- 
-- Guozhang
