That would be really useful. Thanks for the write-up, Guozhang. I will give
it a shot and let you know.

On Tue, Apr 7, 2015 at 10:06 AM, Guozhang Wang <wangg...@gmail.com> wrote:

> Jack,
>
> Okay, I see your point now. I was originally thinking that in each run you
> 1) create the topic, 2) start producing to the topic, 3) start consuming
> from the topic, and then 4) delete the topic and stop the producers /
> consumers before the run completes, but it sounds like you actually only
> create the topic once.
>
> If that is the case and you always use a different group id, then yes, with
> the current consumer you have to make sure that at the boundary of each
> run, when you stop the consumers, you also halt the producers so that they
> do not keep producing until the next run starts. The new consumer that we
> are currently developing allows you to specify the starting offset for
> your consumption, so you could checkpoint offsets outside Kafka on the
> consumer side and use the checkpointed offsets when you resume in each
> run.
>
> You can find the new consumer's API here (check position() / seek()
> specifically) and let me know if you think that will work for your case.
>
>
> http://kafka.apache.org/083/javadoc/index.html?org/apache/kafka/clients/consumer/KafkaConsumer.html
>
> Guozhang
>
> On Mon, Apr 6, 2015 at 8:39 PM, Jack <jac...@gmail.com> wrote:
>
> > How about the first run then? If we use "largest" as the
> > "auto.offset.reset" value, what offset will these consumers get? I assume
> > it will point to the latest position in the log. Is that true? Just so you
> > know, we can't have a warm-up run so that the later runs can use the
> > offset committed by that run.
> >
> > To give you a little bit more context, for every run, we create a unique
> > group.id, so essentially we want the offset to point to a safe position so
> > that the consumer won't miss any messages appended after that point. Is
> > there a way other than setting "auto.offset.reset" to "smallest"? We know
> > that works, but it takes forever to get the data (since the log is long).
> >
> > Thanks again.
> >
> > -Jack
> >
> > On Mon, Apr 6, 2015 at 5:34 PM, Guozhang Wang <wangg...@gmail.com> wrote:
> >
> > > Did you turn on automatic offset committing? If yes, then this issue
> > > should not happen, as later runs will just consume data from the last
> > > committed offset.
> > >
> > > Guozhang
> > >
> > > On Mon, Apr 6, 2015 at 5:16 PM, Jack <jac...@gmail.com> wrote:
> > >
> > > > Hi Guozhang,
> > > >
> > > > When I switched auto.offset.reset to smallest, it worked. However, it
> > > > generates a lot of data and slows down the verification.
> > > >
> > > > Thanks,
> > > >
> > > > -Jack
> > > >
> > > > On Mon, Apr 6, 2015 at 5:07 PM, Guozhang Wang <wangg...@gmail.com> wrote:
> > > >
> > > > > Jack,
> > > > >
> > > > > Could you just change "auto.offset.reset" to smallest and see if this
> > > > > issue goes away? It is not related to the producer end.
> > > > >
> > > > > Guozhang
> > > > >
> > > > > On Mon, Apr 6, 2015 at 4:14 PM, Jack <jac...@gmail.com> wrote:
> > > > >
> > > > > > Hi Guozhang,
> > > > > >
> > > > > > Thanks so much for replying, first of all.
> > > > > >
> > > > > > Here is the config we have:
> > > > > >
> > > > > > group.id -> 'some unique id'
> > > > > > zookeeper.connect -> 'zookeeper host'
> > > > > > auto.commit.enabled -> false
> > > > > > 'auto.offset.reset' -> largest
> > > > > > consumer.timeout.ms -> -1
> > > > > > fetch.message.max.bytes -> 10M
> > > > > >
> > > > > > So it seems like we need to make sure the submitted future returns
> > > > > > before performing any actions which eventually generate the messages
> > > > > > we expect.
> > > > > >
> > > > > > Cheers,
> > > > > >
> > > > > > -Jack
> > > > > >
> > > > > >
> > > > > >
> > > > > > On Mon, Apr 6, 2015 at 4:04 PM, Guozhang Wang <wangg...@gmail.com> wrote:
> > > > > >
> > > > > > > Jack,
> > > > > > >
> > > > > > > Your theory is correct if your consumer config sets auto.offset.reset
> > > > > > > to largest and you do not have any committed offsets yet. Could you
> > > > > > > list your consumer configs and see if that is the case?
> > > > > > >
> > > > > > > Guozhang
> > > > > > >
> > > > > > > On Mon, Apr 6, 2015 at 3:15 PM, Jack <jac...@gmail.com> wrote:
> > > > > > >
> > > > > > > > Hi folks,
> > > > > > > >
> > > > > > > > I have a quick question.
> > > > > > > >
> > > > > > > > We are using 0.8.1 and running into this weird problem. We are using
> > > > > > > > the HighLevelConsumer for this topic. We created 64 partitions for
> > > > > > > > it.
> > > > > > > >
> > > > > > > > In our service, we first create a Consumer object as usual, and then
> > > > > > > > call 'createMessageStreams' with Map('topic_name'->64). It returns a
> > > > > > > > Seq[KafkaStream]. For each stream object in the sequence, we submit a
> > > > > > > > task like the following to the pool.
> > > > > > > >
> > > > > > > > threadpool.submit(new Runnable {
> > > > > > > >   override def run(): Unit = {
> > > > > > > >     // Blocks on the stream and handles each message as it arrives.
> > > > > > > >     stream.iterator().foreach { msg => ... }
> > > > > > > >   }
> > > > > > > > })
> > > > > > > >
> > > > > > > > The problem we ran into is that once all of the above is established,
> > > > > > > > we should be able to get any message that shows up in Kafka from the
> > > > > > > > consumer side. But in reality, for some reason, we occasionally don't
> > > > > > > > see these messages (we do see them in the log, though).
> > > > > > > >
> > > > > > > > Some team members believe that the stream might get a later offset,
> > > > > > > > and thus not be able to see the earlier messages.
> > > > > > > >
> > > > > > > > I really doubt that statement and want to see if anyone could shed
> > > > > > > > any light upon this?
> > > > > > > >
> > > > > > > > One possible theory from me is that the offset won't be given until
> > > > > > > > stream.iterator().next is called, but since the task submission is
> > > > > > > > asynchronous (we don't wait for each submission before producing
> > > > > > > > messages to Kafka), that could get us a later offset, which might not
> > > > > > > > contain the message we want. One possible solution is to perform any
> > > > > > > > action which produces messages to Kafka only after all these submitted
> > > > > > > > tasks return.
> > > > > > > >
> > > > > > > > Any thoughts?
> > > > > > > >
> > > > > > > > Thanks,
> > > > > > > >
> > > > > > > > -Jack
> > > > > > > >
> > > > > > >
> > > > > > >
> > > > > > >
> > > > > > > --
> > > > > > > -- Guozhang
> > > > > > >
> > > > > >
> > > > >
> > > > >
> > > > >
> > > > > --
> > > > > -- Guozhang
> > > > >
> > > >
> > >
> > >
> > >
> > > --
> > > -- Guozhang
> > >
> >
>
>
>
> --
> -- Guozhang
>
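
A minimal sketch of the "checkpoint offsets outside Kafka" idea from
Guozhang's reply above, assuming a plain local file as the store. The
OffsetCheckpoint object, its "partition=offset" file format, and the
Map[Int, Long] shape (partition to next offset to read) are hypothetical and
not part of Kafka. The consumer calls appear only in comments, because the
new consumer API was still under development at the time of this thread;
check the javadoc linked above for the exact position() / seek() signatures.

```scala
import java.nio.charset.StandardCharsets.UTF_8
import java.nio.file.{Files, Path}

// Hypothetical helper, not part of Kafka: keeps one "partition=offset"
// line per partition in a local file.
object OffsetCheckpoint {

  // Overwrite the checkpoint file with the given partition -> offset map.
  def save(file: Path, offsets: Map[Int, Long]): Unit = {
    val text = offsets.toSeq
      .sortBy(_._1)
      .map { case (partition, offset) => s"$partition=$offset" }
      .mkString("\n")
    Files.write(file, text.getBytes(UTF_8))
  }

  // Read the checkpoint back; a missing file means "no checkpoint yet",
  // which is the first-run case discussed in the thread.
  def load(file: Path): Map[Int, Long] =
    if (!Files.exists(file)) Map.empty
    else new String(Files.readAllBytes(file), UTF_8)
      .split("\n").toSeq
      .filter(_.nonEmpty)
      .map { line =>
        val Array(partition, offset) = line.split("=", 2)
        partition.toInt -> offset.toLong
      }.toMap
}

// Assumed wiring with the new consumer (not compiled here):
//   at the start of a run: for each (partition, offset) in load(file),
//                          seek() the consumer to that offset
//   at the end of a run:   collect position() for each partition,
//                          then call save(file, offsets)
```

On the very first run load() returns an empty map, so the starting position
still has to come from somewhere else (for example "auto.offset.reset").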
