Hi Guozhang,

Thanks so much for replying, first of all.
Here is the config we have:

  group.id -> 'some unique id'
  zookeeper.connect -> 'zookeeper host'
  auto.commit.enabled -> false
  auto.offset.reset -> largest
  consumer.timeout.ms -> -1
  fetch.message.max.bytes -> 10M

So it seems like we need to make sure the submitted future returns before
performing the actions which eventually generate the messages we expect.

Cheers,

-Jack

On Mon, Apr 6, 2015 at 4:04 PM, Guozhang Wang <wangg...@gmail.com> wrote:

> Jack,
>
> Your theory is correct if your consumer config sets auto.offset.reset to
> latest and you do not have any committed offsets before. Could you list
> your consumer configs and see if that is the case?
>
> Guozhang
>
> On Mon, Apr 6, 2015 at 3:15 PM, Jack <jac...@gmail.com> wrote:
>
> > Hi folks,
> >
> > I have a quick question.
> >
> > We are using 0.8.1 and running into this weird problem. We are using
> > the HighLevelConsumer for this topic. We created 64 partitions for
> > this topic.
> >
> > In our service, we first create a Consumer object as usual, and then
> > we call 'createMessageStreams' with Map('topic_name' -> 64). It
> > returns us a Seq[KafkaStream]. For each stream object in the
> > sequence, we submit a task like the following to the pool:
> >
> > threadpool.submit(new Runnable {
> >   override def run() = {
> >     stream.iterator().foreach { msg => ... }
> >   }
> > })
> >
> > The problem we ran into is that after all of the above is established,
> > any message showing up in Kafka should be visible from the consumer
> > side. But in reality, for some reason, we occasionally don't see these
> > messages (we do see these messages in the broker log, though).
> >
> > Some team members believe that the stream might get a later offset,
> > thus not being able to see the earlier messages.
> >
> > I really doubt that statement and want to see if anyone could shed any
> > light upon this?
> >
> > One possible theory from me is that the offset won't be assigned until
> > stream.iterator().next is called, but since the task submission is
> > asynchronous (we don't wait for each submission before producing
> > messages to Kafka), that could get us a later offset, which might not
> > contain the messages we want. One possible solution is to perform any
> > action which produces messages to Kafka only after all these submitted
> > tasks have returned.
> >
> > Any thoughts?
> >
> > Thanks,
> >
> > -Jack
> >
>
> --
> -- Guozhang
>
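[For reference, the "wait for the submitted tasks" idea discussed above can
be sketched roughly as below. Since a consuming task normally never returns
(it blocks on the stream iterator), one common variant is to wait until each
task has *started* rather than returned, using a CountDownLatch. This is a
minimal stand-alone sketch, not the thread's actual code: `startConsumers`
is a hypothetical helper, and the KafkaStreams are stubbed with plain
iterators.]

```scala
import java.util.concurrent.{CountDownLatch, Executors, TimeUnit}

object ConsumerStartup {
  // Submit one consuming task per stream; return true once every task has
  // actually started running (in the real code, that is the point at which
  // each task is about to pull from stream.iterator() and pin its offset).
  def startConsumers[A](streams: Seq[Iterator[A]])(handle: A => Unit): Boolean = {
    val pool  = Executors.newFixedThreadPool(streams.size)
    val ready = new CountDownLatch(streams.size)
    streams.foreach { stream =>
      pool.submit(new Runnable {
        override def run(): Unit = {
          ready.countDown()      // mark this consumer task as running...
          stream.foreach(handle) // ...then block draining the stream
        }
      })
    }
    // Only proceed (e.g. start producing) after every task is running.
    val started = ready.await(10, TimeUnit.SECONDS)
    pool.shutdown()
    started
  }
}
```

A caller would only begin producing test messages after `startConsumers(...)`
returns true, which avoids the race where messages are produced before the
consumer threads have attached to their streams.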