Jack,

Your theory is correct if your consumer config sets auto.offset.reset to
largest (the 0.8.1 default) and you do not have any committed offsets yet.
Could you list your consumer configs and see if that is the case?
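To illustrate, here is a minimal sketch of the properties worth checking, assuming the 0.8.1 high-level consumer API; the zookeeper.connect and group.id values are placeholders:

```scala
import java.util.Properties
import kafka.consumer.ConsumerConfig

val props = new Properties()
props.put("zookeeper.connect", "localhost:2181") // assumption: your ZK quorum
props.put("group.id", "my-consumer-group")       // assumption: your group id
// With no committed offsets, "largest" (the 0.8.1 default) starts at the log
// end and skips anything produced earlier; "smallest" starts at the beginning.
props.put("auto.offset.reset", "smallest")
val config = new ConsumerConfig(props)
```

If auto.offset.reset is at its default and the group has never committed, the consumer would begin at the tail of each partition, which matches the symptom described below.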

Guozhang

On Mon, Apr 6, 2015 at 3:15 PM, Jack <jac...@gmail.com> wrote:

> Hi folks,
>
> I have a quick question.
>
> We are using 0.8.1 and running into a weird problem. We are using the
> HighLevelConsumer for this topic, which we created with 64 partitions.
>
> In our service, we first create a Consumer object as usual, and then we
> call 'createMessageStreams' with Map('topic_name' -> 64). It returns a
> Seq[KafkaStream]. For each stream object in the sequence, we submit a
> task like the following to the pool.
>
> threadpool.submit(new Runnable {
>   override def run() = {
>     stream.iterator().foreach { msg => ... }
>   }
> })
>
> The problem we ran into is that once all of the above is established, any
> message showing up in Kafka should be visible on the consumer side. But in
> reality, for some reason, we occasionally don't see some of these messages
> (we do see them in the broker log, though).
>
> Some team members believe that the stream might get a later offset, and
> thus never see the earlier messages.
>
> I really doubt that statement and want to see if anyone could shed some
> light on this.
>
> One possible theory from me is that the offset won't be fetched until
> stream.iterator().next is called, but since the task submission is
> asynchronous (we don't wait for each submission before producing messages
> to Kafka), that could get us a later offset, which might not contain the
> messages we want. One possible solution to that is to perform any action
> which produces messages to Kafka only after all these submitted tasks
> have started.
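> That idea can be sketched with a CountDownLatch. Plain in-memory iterators
> stand in for the KafkaStreams here, and the names (streams, seen, the pool
> size) are assumptions for illustration only:

```scala
import java.util.concurrent.{CountDownLatch, Executors, TimeUnit}
import scala.collection.mutable

// Stand-ins for the Seq[KafkaStream] from createMessageStreams (assumption:
// the real code would iterate those streams instead).
val streams: Seq[Iterator[String]] = Seq(Iterator("a"), Iterator("b"))
val pool = Executors.newFixedThreadPool(streams.size)
val ready = new CountDownLatch(streams.size)
val seen = mutable.Buffer[String]()

streams.foreach { stream =>
  pool.submit(new Runnable {
    override def run(): Unit = {
      ready.countDown() // signal: this consumer thread is up and iterating
      stream.foreach { msg => seen.synchronized { seen += msg } }
    }
  })
}

ready.await() // only start producing to Kafka after this point
pool.shutdown()
pool.awaitTermination(5, TimeUnit.SECONDS)
```

> Each worker counts down before it starts draining its iterator, so
> ready.await() only unblocks once every stream has a consumer attached.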
>
> Any thoughts?
>
> Thanks,
>
> -Jack
>



-- 
-- Guozhang
