Hi folks, I have a quick question.
We are using Kafka 0.8.1 and running into a weird problem. We use the high-level consumer for this topic, which we created with 64 partitions. In our service, we first create a consumer as usual and then call createMessageStreams with Map("topic_name" -> 64). That gives us a Seq[KafkaStream] for the topic, and for each stream in the sequence we submit a task like the following to a thread pool:

    threadpool.submit(new Runnable {
      override def run(): Unit = {
        stream.iterator().foreach { msg => /* ... */ }
      }
    })

Once all of this is set up, we expect every message that shows up in Kafka to reach the consumer side. In reality, occasionally, some messages never reach our consumer, even though we do see them in the log.

Some team members believe that a stream might start from a later offset and therefore never see the earlier messages. I really doubt that and would like to know whether anyone can shed some light on it.

One theory of mine is that the offset isn't assigned until stream.iterator().next is called. Because the task submission is asynchronous (we don't wait for the submitted tasks before producing messages to Kafka), a stream could end up with a later offset that doesn't include the message we want. One possible fix would be to produce messages to Kafka only after all of these submitted tasks return.

Any thoughts?

Thanks,
-Jack
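To make the suspected race concrete, here is a toy Scala model of the theory above, with no Kafka involved. ToyLog and ToyRace are hypothetical names. The model assumes, as the theory does, that a task fixes its starting offset at the current end of the log only when it actually starts running. With waitForStart = true, the producer waits on a CountDownLatch until every task has picked its offset, which is one way to read "produce only after the submitted tasks are up"; with waitForStart = false, a task that starts late can miss the message. This is a sketch under those assumptions, not a description of how the 0.8.1 consumer works internally.

```scala
import java.util.concurrent.{Callable, CountDownLatch, Executors, TimeUnit}
import scala.collection.mutable.ArrayBuffer

// Hypothetical in-memory stand-in for one partition's log (not a Kafka API).
class ToyLog {
  private val entries = ArrayBuffer.empty[String]
  def append(msg: String): Unit = synchronized { entries += msg; () }
  def endOffset: Int = synchronized { entries.size }
  def readFrom(offset: Int): List[String] = synchronized { entries.drop(offset).toList }
}

// Hypothetical driver: submits one task per consumer, then produces a single message.
object ToyRace {
  def run(numConsumers: Int, waitForStart: Boolean): Seq[List[String]] = {
    val log = new ToyLog
    // One thread per task, because every task blocks until the message is produced.
    val pool = Executors.newFixedThreadPool(numConsumers)
    val started = new CountDownLatch(numConsumers) // one count per task, after it picks an offset
    val produced = new CountDownLatch(1)           // released after the producer appends
    try {
      val futures = (0 until numConsumers).map { _ =>
        pool.submit(new Callable[List[String]] {
          override def call(): List[String] = {
            // Assumption from the theory: the start offset is fixed only when the
            // task runs, at whatever the end of the log is at that moment.
            val start = log.endOffset
            started.countDown()
            produced.await()
            log.readFrom(start) // everything appended at or after `start`
          }
        })
      }
      // Proposed fix: do not produce until every task has picked its starting offset.
      if (waitForStart) started.await()
      log.append("hello")
      produced.countDown()
      futures.map(_.get(5, TimeUnit.SECONDS))
    } finally {
      pool.shutdown()
    }
  }
}
```

Only the waitForStart = true run is deterministic: every consumer returns List("hello"). With waitForStart = false the result depends on thread scheduling, which is exactly the window the theory describes. The pool is sized to the number of tasks because each task blocks; the same consideration applies to the real consumer loops, which never return, so a fixed pool smaller than the number of streams would leave some streams unconsumed.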