Hi folks,

I have a quick question.

We are using Kafka 0.8.1 and running into a weird problem. We are using
HighLevelConsumer for this topic, which we created with 64 partitions.

In our service, we first create a Consumer object as usual, and then we
call 'createMessageStreams' with Map('topic_name'->64). It returns a
Seq[KafkaStream]. For each stream object in the sequence, we submit a task
like the following to the pool.

threadpool.submit(new Runnable {
  override def run(): Unit = {
    // Blocks on the stream and handles each message as it arrives.
    stream.iterator().foreach { msg => ... }
  }
})

The problem is this: once all of the above is established, we should be
able to get any message that shows up in Kafka from the consumer side. In
reality, for some reason, we occasionally don't see these messages (we do
see them in the log, though).

Some team members believe that the stream might start from a later offset
and thus never see the earlier messages.

I really doubt that statement and want to see if anyone could shed any
light upon this?
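
For context on the "later offset" idea: the 0.8 high-level consumer has an
auto.offset.reset setting ("smallest" or "largest", with "largest" as the
default) that applies when the group has no committed offset yet. The sketch
below only builds the properties, so it needs no Kafka dependency. The
ZooKeeper address and group name are hypothetical placeholders, and whether
this setting is actually behind the missing messages is an assumption, not
something confirmed here.

```scala
import java.util.Properties

object ConsumerPropsSketch {
  // Builds consumer properties for the 0.8 high-level consumer.
  // zk and group are placeholder values supplied by the caller.
  def consumerProps(zk: String, group: String): Properties = {
    val props = new Properties()
    props.put("zookeeper.connect", zk)
    props.put("group.id", group)
    // With no committed offset, "largest" (the default) starts at the end of
    // the log, while "smallest" starts from the earliest available message.
    props.put("auto.offset.reset", "smallest")
    props
  }

  def main(args: Array[String]): Unit = {
    // Hypothetical values; in the real service these would be passed to
    // new ConsumerConfig(props) and then Consumer.create(...).
    val props = consumerProps("localhost:2181", "example-group")
    println(props.getProperty("auto.offset.reset"))
  }
}
```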

One possible theory of mine is that the offset is not assigned until
stream.iterator().next is called. Since the task submission is
asynchronous (we don't wait for each submission before producing messages
to Kafka), the stream could get a later offset, which might not contain the
messages we want. One possible solution is to perform any action that
produces messages to Kafka only after all of these submitted tasks have
started.
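
A minimal sketch of that ordering idea, using a CountDownLatch so the
producing side waits until every consumer task has begun running. It uses
only java.util.concurrent; the object name, awaitAllStarted, and the timeout
are made up for illustration, and the Kafka stream itself is left out. Note
that the latch only shows that each task has started, not that the consumer
has already been given its offset, so this narrows the window rather than
proving the theory.

```scala
import java.util.concurrent.{CountDownLatch, Executors, TimeUnit}

object StartupLatchSketch {
  // Submits taskCount tasks and waits until each one has started running.
  // Returns true if all tasks started within the timeout.
  def awaitAllStarted(taskCount: Int, timeoutSeconds: Long = 30L): Boolean = {
    val pool = Executors.newFixedThreadPool(taskCount)
    val started = new CountDownLatch(taskCount)
    try {
      for (_ <- 0 until taskCount) {
        pool.submit(new Runnable {
          override def run(): Unit = {
            // Signal that this task is running. In the real service this
            // would sit just before stream.iterator().foreach { msg => ... }.
            started.countDown()
          }
        })
      }
      // Block the caller until every task has signalled, or the timeout hits.
      started.await(timeoutSeconds, TimeUnit.SECONDS)
    } finally {
      pool.shutdown()
    }
  }

  def main(args: Array[String]): Unit = {
    // 64 matches the number of streams requested in createMessageStreams.
    val ready = awaitAllStarted(64)
    println("all consumer tasks started: " + ready)
    // Only after this point would we trigger the action that produces
    // messages to Kafka.
  }
}
```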

Any thoughts?

Thanks,

-Jack
