How about the first run, then? If we use "largest" as the "auto.offset.reset" value, what value will these consumers get? I assume it will point to the latest position in the log. Is that true? Just so you know, we can't have a warm-up run so that later runs can use the offset committed by that run.
To give you a little bit more context: for every run, we create a unique group.id, so essentially we want the offset to point to a safe position, so that the consumer won't miss any messages appended after that point. So is there a way other than setting "auto.offset.reset" to "smallest"? We know that works, but it takes forever to get through the data (since the log is long).

Thanks again.

-Jack

On Mon, Apr 6, 2015 at 5:34 PM, Guozhang Wang <wangg...@gmail.com> wrote:

> Did you turn on automatic offset committing? If yes, then this issue should
> not happen, as later runs will just consume data from the last committed
> offset.
>
> Guozhang
>
> On Mon, Apr 6, 2015 at 5:16 PM, Jack <jac...@gmail.com> wrote:
>
> > Hi Guozhang,
> >
> > When I switch "auto.offset.reset" to smallest, it works. However, it
> > generates a lot of data and slows down the verification.
> >
> > Thanks,
> >
> > -Jack
> >
> > On Mon, Apr 6, 2015 at 5:07 PM, Guozhang Wang <wangg...@gmail.com> wrote:
> >
> > > Jack,
> > >
> > > Could you just change "auto.offset.reset" to smallest and see if this
> > > issue goes away? It is not related to the producer end.
> > >
> > > Guozhang
> > >
> > > On Mon, Apr 6, 2015 at 4:14 PM, Jack <jac...@gmail.com> wrote:
> > >
> > > > Hi Guozhang,
> > > >
> > > > Thanks so much for replying, first of all.
> > > >
> > > > Here is the config we have:
> > > >
> > > > group.id -> 'some unique id'
> > > > zookeeper.connect -> 'zookeeper host'
> > > > auto.commit.enabled -> false
> > > > auto.offset.reset -> largest
> > > > consumer.timeout.ms -> -1
> > > > fetch.message.max.bytes -> 10M
> > > >
> > > > So it seems we need to make sure the submitted futures return before
> > > > performing any actions that eventually generate the messages we
> > > > expect.
> > > >
> > > > Cheers,
> > > >
> > > > -Jack
> > > >
> > > > On Mon, Apr 6, 2015 at 4:04 PM, Guozhang Wang <wangg...@gmail.com> wrote:
> > > >
> > > > > Jack,
> > > > >
> > > > > Your theory is correct if your consumer config sets auto.offset.reset
> > > > > to latest and you do not have any committed offsets before. Could you
> > > > > list your consumer configs and see if that is the case?
> > > > >
> > > > > Guozhang
> > > > >
> > > > > On Mon, Apr 6, 2015 at 3:15 PM, Jack <jac...@gmail.com> wrote:
> > > > >
> > > > > > Hi folks,
> > > > > >
> > > > > > I have a quick question.
> > > > > >
> > > > > > We are using 0.8.1 and running into a weird problem. We are using
> > > > > > the HighLevelConsumer for this topic, and we created 64 partitions
> > > > > > for it.
> > > > > >
> > > > > > In our service, we first create a Consumer object as usual, and
> > > > > > then call 'createMessageStreams' with Map('topic_name' -> 64). It
> > > > > > returns a Seq[KafkaStream]. For each stream object in the
> > > > > > sequence, we submit a task like the following to the pool:
> > > > > >
> > > > > > threadpool.submit(new Runnable {
> > > > > >   override def run() = {
> > > > > >     stream.iterator().foreach { msg => ... }
> > > > > >   }
> > > > > > })
> > > > > >
> > > > > > The problem we ran into is that once all of the above is
> > > > > > established, any message showing up in Kafka should be readable
> > > > > > from the consumer side. But in reality, for some reason, we
> > > > > > occasionally don't see these messages (we do see them in the
> > > > > > broker log, though).
> > > > > >
> > > > > > Some team members believe that the stream might get a later
> > > > > > offset, and thus not see the earlier messages.
> > > > > >
> > > > > > I really doubt that statement and want to see if anyone could
> > > > > > shed any light upon this?
> > > > > >
> > > > > > One possible theory from me is that the offset won't be assigned
> > > > > > until stream.iterator().next is called; since the task submission
> > > > > > is asynchronous (we don't wait for each submission before
> > > > > > producing messages to Kafka), that could get us a later offset,
> > > > > > which might not contain the messages we want. One possible
> > > > > > solution is to perform any action that produces messages to
> > > > > > Kafka only after all these submitted tasks return.
> > > > > >
> > > > > > Any thoughts?
> > > > > >
> > > > > > Thanks,
> > > > > >
> > > > > > -Jack
> > > > >
> > > > > --
> > > > > -- Guozhang
> > >
> > > --
> > > -- Guozhang
>
> --
> -- Guozhang
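[Editor's note] The fix Jack proposes above (produce only after every submitted consumer task is known to be iterating its stream) can be sketched with plain java.util.concurrent primitives. This is an illustrative sketch, not Kafka API code: the BlockingQueue stands in for a KafkaStream, "EOF" is a hypothetical shutdown sentinel, and the CountDownLatch plays the role of the "all consumer tasks have started" signal that the real 0.8.1 high-level consumer does not expose directly.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.*;

public class ConsumeBeforeProduce {

    // Run numStreams consumer tasks, then produce only after every task has
    // signalled that it is attached to its stream and about to block on it.
    static List<String> run(int numStreams, List<String> messages) throws Exception {
        // Stand-ins for the Seq[KafkaStream] returned by createMessageStreams.
        List<BlockingQueue<String>> streams = new ArrayList<>();
        for (int i = 0; i < numStreams; i++) streams.add(new LinkedBlockingQueue<>());

        CountDownLatch consumersReady = new CountDownLatch(numStreams);
        List<String> seen = new CopyOnWriteArrayList<>();
        ExecutorService pool = Executors.newFixedThreadPool(numStreams);

        for (BlockingQueue<String> stream : streams) {
            pool.submit(() -> {
                // Signal readiness BEFORE blocking on the stream -- the
                // analogue of "the submitted task has started iterating".
                consumersReady.countDown();
                try {
                    String msg;
                    while (!(msg = stream.take()).equals("EOF")) seen.add(msg);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
        }

        // The producer side waits until every consumer task is in place,
        // so no message can arrive before a consumer is listening.
        consumersReady.await();
        for (int i = 0; i < messages.size(); i++) {
            streams.get(i % numStreams).put(messages.get(i));
        }
        for (BlockingQueue<String> stream : streams) stream.put("EOF");

        pool.shutdown();
        pool.awaitTermination(5, TimeUnit.SECONDS);
        return seen;
    }

    public static void main(String[] args) throws Exception {
        List<String> got = run(4, List.of("m1", "m2", "m3", "m4", "m5"));
        System.out.println(got.size()); // prints 5: every message was observed
    }
}
```

Note the caveat: with the real high-level consumer there is no callback for "iterator attached", so counting the latch down just before the first iterator().next() call is only an approximation of the ordering guarantee Jack is after; whether the fetch offset is actually pinned at createMessageStreams time or at first iteration is exactly the question the thread is debating.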