Hi Guozhang,

Thanks so much for replying, first of all.

Here is the config we have:

group.id -> 'some unique id'
zookeeper.connect -> 'zookeeper host'
auto.commit.enabled -> false
auto.offset.reset -> largest
consumer.timeout.ms -> -1
fetch.message.max.bytes -> 10M
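For completeness, here is roughly how I'd expect those settings to be assembled for the old high-level consumer (a sketch only: the group id, zookeeper host, and exact byte count are placeholders). One thing I'd double-check: I believe the 0.8 key is spelled `auto.commit.enable`, not `auto.commit.enabled`, and a misspelled key would simply be ignored.

```scala
import java.util.Properties

object ConsumerProps {
  // Assemble the settings listed above into the Properties object that
  // kafka.consumer.ConsumerConfig expects. Values here are placeholders.
  def build(): Properties = {
    val props = new Properties()
    props.put("group.id", "some-unique-id")        // placeholder
    props.put("zookeeper.connect", "zk-host:2181") // placeholder
    props.put("auto.commit.enable", "false")       // note: no trailing "d" in 0.8
    props.put("auto.offset.reset", "largest")
    props.put("consumer.timeout.ms", "-1")
    props.put("fetch.message.max.bytes", (10 * 1024 * 1024).toString)
    // val config = new kafka.consumer.ConsumerConfig(props)
    props
  }
}
```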

So it seems like we need to make sure the submitted tasks are running before
performing the actions which eventually generate the message we expect.
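One wrinkle with waiting on the submitted futures themselves: each task's stream.iterator().foreach loop blocks indefinitely, so those futures never complete. A CountDownLatch that each task trips right after it has touched its stream iterator is one way to get the "consumers attached before we produce" ordering. A minimal sketch with a stand-in for the consume loop (the object name and latch approach are my own, not from this thread):

```scala
import java.util.concurrent.{CountDownLatch, Executors, TimeUnit}

object ConsumeBeforeProduce {
  // Returns true once every submitted consumer task has signalled that it
  // has started, i.e. has obtained its stream iterator.
  def awaitConsumersStarted(numStreams: Int): Boolean = {
    val pool  = Executors.newFixedThreadPool(numStreams)
    val ready = new CountDownLatch(numStreams)

    (1 to numStreams).foreach { _ =>
      pool.submit(new Runnable {
        override def run(): Unit = {
          // Real service: obtain stream.iterator() here, then count down
          // before entering the blocking foreach loop (which never returns).
          ready.countDown()
          // stream.iterator().foreach { msg => ... }
        }
      })
    }

    // Only once this returns true is it safe to produce the messages the
    // consumers are expected to see.
    val started = ready.await(10, TimeUnit.SECONDS)
    pool.shutdown()
    started
  }
}
```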

Cheers,

-Jack



On Mon, Apr 6, 2015 at 4:04 PM, Guozhang Wang <wangg...@gmail.com> wrote:

> Jack,
>
> Your theory is correct if your consumer config set auto.offset.reset to
> latest and you do not have any committed offsets before. Could you list
> your consumer configs and see if that is the case?
>
> Guozhang
>
> On Mon, Apr 6, 2015 at 3:15 PM, Jack <jac...@gmail.com> wrote:
>
> > Hi folks,
> >
> > I have a quick question.
> >
> > We are using 0.8.1 and running into this weird problem. We are using
> > HighLevelConsumer for this topic. We created 64 partitions for this
> > message.
> >
> > In our service, we first create a Consumer object as usual, and then we
> > call 'createMessageStreams' with Map("topic_name" -> 64). It returns a
> > Seq[KafkaStream]. For each stream object in the sequence, we submit a
> > task like the following to the pool.
> >
> > threadpool.submit(new Runnable {
> >   override def run() = {
> >     stream.iterator().foreach { msg => ... }
> >   }
> > })
> >
> > The problem we ran into is that once all of the above is established, we
> > should be able to consume any message that shows up in kafka. But in
> > reality, occasionally, we don't see some of these messages on the
> > consumer side (we do see them in the broker log, though).
> >
> > Some team members believe that the stream might start at a later offset,
> > and thus never see the earlier messages.
> >
> > I really doubt that explanation, and I want to see if anyone could shed
> > some light on this.
> >
> > One possible theory from me is that no offset is assigned until
> > stream.iterator().next is called, but since the task submission is
> > asynchronous (we don't wait for each submission before producing messages
> > to kafka), the stream could pick up a later offset, which might not
> > contain the message we want. One possible solution is to perform any
> > action which produces messages to kafka only after all these submitted
> > tasks have started.
> >
> > Any thoughts?
> >
> > Thanks,
> >
> > -Jack
> >
>
>
>
> --
> -- Guozhang
>
