That would be really useful. Thanks for writing, Guozhang. I will give it a shot and let you know.
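The checkpoint-outside-Kafka approach Guozhang describes below can be sketched as follows. This is a minimal illustration using only the Java standard library; the file path and the "topic_name-N" partition keys are made up for the example, and the spots where the real consumer's position() / seek() calls would go are marked in comments rather than invoking the Kafka API.

```java
import java.io.IOException;
import java.nio.file.*;
import java.util.*;

// Minimal file-based offset checkpoint store. Each line is "topic-partition=offset".
// At the end of a run, write out the offsets reported by the consumer (position());
// at the start of the next run, read them back and seek() to them before polling,
// so a fresh group id can resume exactly where the previous run stopped.
public class OffsetCheckpoint {
    private final Path file;

    public OffsetCheckpoint(Path file) {
        this.file = file;
    }

    public void save(Map<String, Long> offsets) throws IOException {
        List<String> lines = new ArrayList<>();
        for (Map.Entry<String, Long> e : offsets.entrySet()) {
            lines.add(e.getKey() + "=" + e.getValue());
        }
        Files.write(file, lines);
    }

    public Map<String, Long> load() throws IOException {
        Map<String, Long> offsets = new HashMap<>();
        if (!Files.exists(file)) {
            return offsets; // first run: no checkpoint yet
        }
        for (String line : Files.readAllLines(file)) {
            int sep = line.lastIndexOf('=');
            offsets.put(line.substring(0, sep), Long.parseLong(line.substring(sep + 1)));
        }
        return offsets;
    }

    public static void main(String[] args) throws IOException {
        Path path = Files.createTempFile("offsets", ".txt");
        OffsetCheckpoint checkpoint = new OffsetCheckpoint(path);

        // End of run N: record where each partition's consumption stopped
        // (with the real client, these values come from consumer.position(tp)).
        Map<String, Long> offsets = new HashMap<>();
        offsets.put("topic_name-0", 42L);
        offsets.put("topic_name-1", 7L);
        checkpoint.save(offsets);

        // Start of run N+1: restore and (with the real client) seek() to each offset.
        Map<String, Long> restored = checkpoint.load();
        System.out.println(restored.get("topic_name-0"));
        System.out.println(restored.get("topic_name-1"));
    }
}
```

Because the checkpoint lives outside Kafka, it works even though each run uses a brand-new group.id with no committed offsets of its own.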
On Tue, Apr 7, 2015 at 10:06 AM, Guozhang Wang <wangg...@gmail.com> wrote:

> Jack,
>
> Okay, I see your point now. I was originally thinking that in each run you
> 1) first create the topic, 2) start producing to the topic, 3) start
> consuming from the topic, and then 4) delete the topic and stop the
> producers/consumers before completing, but it sounds like you actually
> only create the topic once.
>
> If that is the case and you always use a different group id, then yes,
> with the current consumer you have to make sure that at the boundary of
> each run, when you stop the consumers, you also halt the producers from
> continuing to produce until the start of the next run. The new consumer
> that we are currently developing allows you to specify the starting offset
> for your consumption; you could then do some offset checkpointing outside
> Kafka on the consumer side and use the checkpointed offsets when you
> resume in each run.
>
> You can find the new consumer's API here (check position() / seek()
> specifically) and let me know if you think that will work for your case.
>
> http://kafka.apache.org/083/javadoc/index.html?org/apache/kafka/clients/consumer/KafkaConsumer.html
>
> Guozhang
>
> On Mon, Apr 6, 2015 at 8:39 PM, Jack <jac...@gmail.com> wrote:
>
> > How about the first run then? If we use "largest" as the
> > "auto.offset.reset" value, what value will these consumers get? I assume
> > it will point to the latest position in the log. Is that true? Just so
> > you know, we can't have a warm-up run so that later runs can use the
> > offset committed by that run.
> >
> > To give you a little bit more context: for every run we create a unique
> > group.id, so essentially we want the offset to point to a safe position
> > so that the consumer won't miss any messages appended after that point.
> > So is there a way other than setting "auto.offset.reset" to "smallest",
> > which we know works but takes forever to get the data (since the log is
> > long)?
> >
> > Thanks again.
> >
> > -Jack
> >
> > On Mon, Apr 6, 2015 at 5:34 PM, Guozhang Wang <wangg...@gmail.com> wrote:
> >
> > > Did you turn on automatic offset committing? If yes, then this issue
> > > should not happen, as later runs will just consume data from the last
> > > committed offset.
> > >
> > > Guozhang
> > >
> > > On Mon, Apr 6, 2015 at 5:16 PM, Jack <jac...@gmail.com> wrote:
> > >
> > > > Hi Guozhang,
> > > >
> > > > When I switched auto.offset.reset to smallest, it works. However, it
> > > > generates a lot of data and slows down the verification.
> > > >
> > > > Thanks,
> > > >
> > > > -Jack
> > > >
> > > > On Mon, Apr 6, 2015 at 5:07 PM, Guozhang Wang <wangg...@gmail.com> wrote:
> > > >
> > > > > Jack,
> > > > >
> > > > > Could you just change "auto.offset.reset" to smallest and see if
> > > > > this issue goes away? It is not related to the producer end.
> > > > >
> > > > > Guozhang
> > > > >
> > > > > On Mon, Apr 6, 2015 at 4:14 PM, Jack <jac...@gmail.com> wrote:
> > > > >
> > > > > > Hi Guozhang,
> > > > > >
> > > > > > Thanks so much for replying, first of all.
> > > > > >
> > > > > > Here is the config we have:
> > > > > >
> > > > > > group.id -> 'some unique id'
> > > > > > zookeeper.connect -> 'zookeeper host'
> > > > > > auto.commit.enabled -> false
> > > > > > auto.offset.reset -> largest
> > > > > > consumer.timeout.ms -> -1
> > > > > > fetch.message.max.bytes -> 10M
> > > > > >
> > > > > > So it seems like we need to make sure the submitted futures
> > > > > > return before performing the actions which eventually generate
> > > > > > the messages we expect.
> > > > > >
> > > > > > Cheers,
> > > > > >
> > > > > > -Jack
> > > > > >
> > > > > > On Mon, Apr 6, 2015 at 4:04 PM, Guozhang Wang <wangg...@gmail.com> wrote:
> > > > > >
> > > > > > > Jack,
> > > > > > >
> > > > > > > Your theory is correct if your consumer config sets
> > > > > > > auto.offset.reset to latest and you do not have any committed
> > > > > > > offsets before. Could you list your consumer configs and see if
> > > > > > > that is the case?
> > > > > > >
> > > > > > > Guozhang
> > > > > > >
> > > > > > > On Mon, Apr 6, 2015 at 3:15 PM, Jack <jac...@gmail.com> wrote:
> > > > > > >
> > > > > > > > Hi folks,
> > > > > > > >
> > > > > > > > I have a quick question.
> > > > > > > >
> > > > > > > > We are using 0.8.1 and running into this weird problem. We are
> > > > > > > > using the HighLevelConsumer for this topic, and we created 64
> > > > > > > > partitions for it.
> > > > > > > >
> > > > > > > > In our service, we first create a Consumer object as usual,
> > > > > > > > then call 'createMessageStreams' with Map('topic_name' -> 64).
> > > > > > > > It returns us a Seq[KafkaStream]. For each stream object in
> > > > > > > > the sequence, we submit a task like the following to the pool:
> > > > > > > >
> > > > > > > > threadpool.submit(new Runnable {
> > > > > > > >   override def run() = {
> > > > > > > >     stream.iterator().foreach { msg => ... }
> > > > > > > >   }
> > > > > > > > })
> > > > > > > >
> > > > > > > > The problem we ran into is that after all of the above is
> > > > > > > > established, any message showing up in Kafka should be
> > > > > > > > retrievable from the consumer side. But in reality, for some
> > > > > > > > reason, we occasionally don't see these messages (we do see
> > > > > > > > them in the log, though).
> > > > > > > >
> > > > > > > > Some team members believe that the stream might get a later
> > > > > > > > offset and thus not be able to see the earlier messages.
> > > > > > > >
> > > > > > > > I really doubt that statement and want to see if anyone could
> > > > > > > > shed any light upon this.
> > > > > > > >
> > > > > > > > One possible theory from me is that the offset won't be
> > > > > > > > assigned until stream.iterator().next is called, but since the
> > > > > > > > task submission is asynchronous (we don't wait for each
> > > > > > > > submission before producing messages to Kafka), that could get
> > > > > > > > us a later offset, which might not contain the messages we
> > > > > > > > want. One possible solution to that is to perform any action
> > > > > > > > which produces messages to Kafka only after all these
> > > > > > > > submitted tasks have started.
> > > > > > > >
> > > > > > > > Any thoughts?
> > > > > > > >
> > > > > > > > Thanks,
> > > > > > > >
> > > > > > > > -Jack
> > > > > > >
> > > > > > > --
> > > > > > > -- Guozhang
> > > > >
> > > > > --
> > > > > -- Guozhang
> > >
> > > --
> > > -- Guozhang
>
> --
> -- Guozhang
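The fix Jack proposes above (only start producing once every submitted consumer task is actually consuming) can be sketched with a CountDownLatch. This is a hedged illustration using plain java.util.concurrent, with each KafkaStream stubbed out as a BlockingQueue so the sketch is self-contained; in the real service each Runnable would count down right after obtaining stream.iterator(), and the producer would await the latch before sending anything.

```java
import java.util.concurrent.*;

public class GatedProducerDemo {
    public static void main(String[] args) throws Exception {
        int streams = 4;                       // stands in for the 64 KafkaStreams
        CountDownLatch ready = new CountDownLatch(streams);
        CountDownLatch consumed = new CountDownLatch(streams);
        ExecutorService pool = Executors.newFixedThreadPool(streams);

        // One queue per "stream"; in the real code this is stream.iterator().
        java.util.List<BlockingQueue<String>> queues = new java.util.ArrayList<>();
        for (int i = 0; i < streams; i++) {
            queues.add(new LinkedBlockingQueue<>());
        }

        for (BlockingQueue<String> q : queues) {
            pool.submit(() -> {
                // Signal that this consumer is attached *before* blocking on
                // messages, mirroring "count down right after stream.iterator()".
                ready.countDown();
                try {
                    String msg = q.take();     // blocks until a message arrives
                    System.out.println("got " + msg);
                    consumed.countDown();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
        }

        // Producer side: do not send anything until every consumer is attached,
        // so no message can land "before" a consumer's starting position.
        ready.await();
        for (int i = 0; i < streams; i++) {
            queues.get(i).put("message-" + i);
        }

        consumed.await();                      // every message was seen
        pool.shutdown();
        System.out.println("all consumers saw their message");
    }
}
```

The same gating works whether the producers live in the same process (await the latch directly) or elsewhere (flip an external flag once the latch is released); the essential point is that production is deferred until every consumer thread has its iterator.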