Guozhang,

Sorry for joining the party a little late. I have been thinking about this
whole awkward behavior of having to call poll(0) to actually make the
underlying subscription take effect. Is the core reason for this design
the fact that poll is also the actual heartbeat, and you want consumer
group assignments to happen through poll, so that timeouts and
reassignments can all go through poll as well? So I think coupling liveness
with poll (which in effect couples consumer group assignments, and hence
the metadata fetch, with poll) is the real cause of this design. Were there
issues where you were seeing active consumers not calling poll that led to
this design choice? I tried to look for a relevant JIRA but could not find
one - can you please point me to something if you have it handy?
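
For reference, here is a minimal sketch of the behavior I am describing
(the broker address, group id, and topic name below are just placeholders,
not anything from this thread): subscribe() by itself assigns nothing, and
only the first poll() performs the group coordination that makes the
subscription take effect.

    import java.util.Collections;
    import java.util.Properties;

    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.common.serialization.StringDeserializer;

    public class LazySubscribeSketch {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put("bootstrap.servers", "localhost:9092");    // placeholder broker
            props.put("group.id", "lazy-subscribe-example");      // placeholder group id
            props.put("key.deserializer", StringDeserializer.class.getName());
            props.put("value.deserializer", StringDeserializer.class.getName());

            KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
            consumer.subscribe(Collections.singletonList("testtwo"));  // placeholder topic

            // Nothing is assigned yet: subscribe() only records the subscription.
            System.out.println("before poll: " + consumer.assignment());

            // poll() joins the group, heartbeats, and receives the partition assignment.
            consumer.poll(0);
            System.out.println("after poll:  " + consumer.assignment());

            consumer.close();
        }
    }

And until that first poll() there are no heartbeats either, which is exactly
why the liveness and assignment concerns end up tied together.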

Btw, this would also mean that your proposal to do the actual assignments
through seek might not be ideal, since there can still be an indefinite
amount of time between seek and poll (just like between subscribe and
poll), and the consumer could time out even before the first poll is
called?


@Cody, in your case, if you really have only one consumer and it is going
to get all the partitions of the topic anyway, then you might as well use
the "assign" call instead of the "subscribe" call. That will at least make
your code cleaner, and I do not think you are gaining anything from the
consumer group functionality anyway?
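
To make that concrete, here is a rough sketch of the assign-based route
(again with placeholder broker/topic names, not Cody's actual config): look
up the partitions with partitionsFor, assign them manually, and then
seekToBeginning can be called right away because no group coordination is
involved.

    import java.util.ArrayList;
    import java.util.List;
    import java.util.Properties;

    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.common.PartitionInfo;
    import org.apache.kafka.common.TopicPartition;
    import org.apache.kafka.common.serialization.StringDeserializer;

    public class AssignFromBeginningSketch {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put("bootstrap.servers", "localhost:9092");    // placeholder broker
            props.put("group.id", "example");                     // placeholder group id
            props.put("enable.auto.commit", "false");             // manage offsets ourselves
            props.put("key.deserializer", StringDeserializer.class.getName());
            props.put("value.deserializer", StringDeserializer.class.getName());
            // No auto.offset.reset needed: the starting position is set explicitly below.

            KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);

            // Look up all partitions of the topic and assign them to this one consumer.
            List<TopicPartition> partitions = new ArrayList<>();
            for (PartitionInfo info : consumer.partitionsFor("testtwo")) {  // placeholder topic
                partitions.add(new TopicPartition(info.topic(), info.partition()));
            }
            consumer.assign(partitions);

            // With manual assignment there is no rebalance to wait for, so seeking
            // works immediately (the 0.9 API takes varargs; later releases take a Collection).
            consumer.seekToBeginning(partitions.toArray(new TopicPartition[0]));

            ConsumerRecords<String, String> records = consumer.poll(1000);
            System.out.println("fetched " + records.count() + " records from the beginning");

            consumer.close();
        }
    }

The obvious trade-off is that a second instance would read the same
partitions, since nothing coordinates the assignment across consumers.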

- Mansi.



On Wed, Mar 9, 2016 at 8:35 PM, Guozhang Wang <wangg...@gmail.com> wrote:

> In order to do anything meaningful with the consumer itself in the rebalance
> callback (e.g. commit offsets), you would need to hold on to the consumer
> reference; admittedly it sounds a bit awkward, but by design we chose not
> to enforce it in the interface itself.
>
> Guozhang
>
> On Wed, Mar 9, 2016 at 3:39 PM, Cody Koeninger <c...@koeninger.org> wrote:
>
> > So what about my comments regarding the consumer rebalance listener
> > interface not providing access to a consumer?  I can probably work around
> > it, but it seems odd.
> > On Mar 9, 2016 5:04 PM, "Guozhang Wang" <wangg...@gmail.com> wrote:
> >
> > > One thing proposed by Jason:
> > >
> > > If you want to only reset offset upon initialization, and by
> > > initialization you mean "no committed offset", you can do sth. like the
> > > following in rebalance callback.
> > >
> > >     @Override
> > >     public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
> > >         for (TopicPartition partition : partitions)
> > >             if (consumer.committed(partition) == null)
> > >                 consumer.seekToBeginning(partition);
> > >     }
> > >
> > >
> > > Guozhang
> > >
> > > On Wed, Mar 9, 2016 at 2:11 PM, Guozhang Wang <wangg...@gmail.com> wrote:
> > >
> > > > Filed https://issues.apache.org/jira/browse/KAFKA-3370.
> > > >
> > > > On Wed, Mar 9, 2016 at 1:11 PM, Cody Koeninger <c...@koeninger.org> wrote:
> > > >
> > > >> That sounds like an interesting way of addressing the problem; we can
> > > >> continue further discussion on the JIRA.
> > > >>
> > > >>
> > > >>
> > > >> On Wed, Mar 9, 2016 at 2:59 PM, Guozhang Wang <wangg...@gmail.com> wrote:
> > > >> > Cody:
> > > >> >
> > > >> > More specifically, you do not need the "listTopics" function if you
> > > >> > already know your subscribed topics; just using "partitionsFor" is
> > > >> > sufficient.
> > > >> >
> > > >> > About the fix, I'm thinking of adding two more options to
> > > >> > auto.offset.reset, namely "earliest-on-start" and "latest-on-start",
> > > >> > which set the reset position ONLY at startup. The reason is that
> > > >> > seekToXX was actually not designed to do such initialization but for
> > > >> > calling during the lifetime of the consumer, and we'd better provide
> > > >> > the right solution to do so.
> > > >> >
> > > >> > I can file the JIRA right away and start further discussions there.
> > > >> > But let me know if you have any other ideas.
> > > >> >
> > > >> > Guozhang
> > > >> >
> > > >> > On Wed, Mar 9, 2016 at 12:25 PM, Cody Koeninger <c...@koeninger.org> wrote:
> > > >> >
> > > >> >> Yeah, I think I understood what you were saying.  What I'm saying is
> > > >> >> that if there were a way to just fetch metadata without doing the rest
> > > >> >> of the work poll() does, it wouldn't be necessary.  I guess I can do
> > > >> >> listTopics to get all metadata for all topics and then parse it.
> > > >> >>
> > > >> >> Regarding running a single instance, that is the case for what I'm
> > > >> >> talking about.
> > > >> >>
> > > >> >> On Wed, Mar 9, 2016 at 2:02 PM, Guozhang Wang <wangg...@gmail.com> wrote:
> > > >> >> > Cody,
> > > >> >> >
> > > >> >> > What I meant for a special case of `seekToXX` is that, today, when
> > > >> >> > the function is called with no partition parameters, it will try to
> > > >> >> > execute the logic on all "assigned" partitions for the consumer. And
> > > >> >> > once that is done, the subsequent poll() will not throw the
> > > >> >> > exception, since it knows those partitions need to reset offsets.
> > > >> >> >
> > > >> >> > However, in your case there are no assigned partitions yet, and
> > > >> >> > hence `seekToXX` will not take effect on any partitions. The
> > > >> >> > assignment is wrapped in the poll() call, as you mentioned. One way
> > > >> >> > to solve it is to let `seekToXX()` with no parameters do the
> > > >> >> > coordination and get the assigned partitions if there are any
> > > >> >> > subscribed topics, so that the subsequent poll() will know those
> > > >> >> > partitions need resetting offsets. Does that make sense?
> > > >> >> >
> > > >> >> > As for now, another way I can think of is to get the partition info
> > > >> >> > beforehand and call `seekToBeginning` on all partitions. But that
> > > >> >> > only works if the consumer knows it is going to get all the
> > > >> >> > partitions assigned to itself (i.e. you are only running a single
> > > >> >> > instance).
> > > >> >> >
> > > >> >> > Guozhang
> > > >> >> >
> > > >> >> >
> > > >> >> > On Wed, Mar 9, 2016 at 6:22 AM, Cody Koeninger <c...@koeninger.org> wrote:
> > > >> >> >
> > > >> >> >> Another unfortunate thing about ConsumerRebalanceListener is that
> > > >> >> >> in order to do meaningful work in the callback, you need a
> > > >> >> >> reference to the consumer that called it.  But that reference isn't
> > > >> >> >> provided to the callback, which means the listener implementation
> > > >> >> >> needs to hold a reference to the consumer.  Seems like this makes
> > > >> >> >> it unnecessarily awkward to serialize or provide a 0 arg
> > > >> >> >> constructor for the listener.
> > > >> >> >>
> > > >> >> >> On Wed, Mar 9, 2016 at 7:28 AM, Cody Koeninger <c...@koeninger.org> wrote:
> > > >> >> >> > I thought about ConsumerRebalanceListener, but seeking to the
> > > >> >> >> > beginning any time there's a rebalance for whatever reason is not
> > > >> >> >> > necessarily the same thing as seeking to the beginning before
> > > >> >> >> > first starting the consumer.
> > > >> >> >> >
> > > >> >> >> > On Wed, Mar 9, 2016 at 2:24 AM, Kamal C <kamaltar...@gmail.com> wrote:
> > > >> >> >> >> Cody,
> > > >> >> >> >>
> > > >> >> >> >> Use ConsumerRebalanceListener to achieve that,
> > > >> >> >> >>
> > > >> >> >> >> ConsumerRebalanceListener listener = new ConsumerRebalanceListener() {
> > > >> >> >> >>
> > > >> >> >> >>     @Override
> > > >> >> >> >>     public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
> > > >> >> >> >>     }
> > > >> >> >> >>
> > > >> >> >> >>     @Override
> > > >> >> >> >>     public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
> > > >> >> >> >>         consumer.seekToBeginning(partitions.toArray(new TopicPartition[0]));
> > > >> >> >> >>     }
> > > >> >> >> >> };
> > > >> >> >> >>
> > > >> >> >> >> consumer.subscribe(topics, listener);
> > > >> >> >> >>
> > > >> >> >> >> On Wed, Mar 9, 2016 at 12:05 PM, Cody Koeninger <c...@koeninger.org> wrote:
> > > >> >> >> >>
> > > >> >> >> >>> That suggestion doesn't work, for pretty much the same reason -
> > > >> >> >> >>> at the time poll is first called, there is no reset policy and
> > > >> >> >> >>> no committed offset, so NoOffsetForPartitionException is thrown.
> > > >> >> >> >>>
> > > >> >> >> >>> I feel like the underlying problem isn't so much that seekToEnd
> > > >> >> >> >>> needs special case behavior.  It's more that topic metadata
> > > >> >> >> >>> fetches, consumer position fetches, and message fetches are all
> > > >> >> >> >>> lumped together under a single poll() call, with no way to do
> > > >> >> >> >>> them individually if necessary.
> > > >> >> >> >>>
> > > >> >> >> >>> What does "work" in this situation is to just catch the
> > > >> >> >> >>> exception (which leaves the consumer in a state where topics are
> > > >> >> >> >>> assigned) and then seek.  But that is not exactly an elegant
> > > >> >> >> >>> interface.
> > > >> >> >> >>>
> > > >> >> >> >>>     consumer.subscribe(topics)
> > > >> >> >> >>>     try {
> > > >> >> >> >>>       consumer.poll(0)
> > > >> >> >> >>>     } catch {
> > > >> >> >> >>>       case x: Throwable =>
> > > >> >> >> >>>     }
> > > >> >> >> >>>     consumer.seekToBeginning()
> > > >> >> >> >>>     consumer.poll(0)
> > > >> >> >> >>>
> > > >> >> >> >>>
> > > >> >> >> >>>
> > > >> >> >> >>>
> > > >> >> >> >>> On Tue, Mar 8, 2016 at 11:22 PM, Guozhang Wang <wangg...@gmail.com> wrote:
> > > >> >> >> >>> > Hi Cody,
> > > >> >> >> >>> >
> > > >> >> >> >>> > The problem with that code is in `seekToBeginning()` followed
> > > >> >> >> >>> > by `subscribe(topic)`.
> > > >> >> >> >>> >
> > > >> >> >> >>> > Since the `subscribe` call is lazily evaluated, by the time
> > > >> >> >> >>> > `seekToBeginning()` is called no partition is assigned yet,
> > > >> >> >> >>> > and hence it is effectively a no-op.
> > > >> >> >> >>> >
> > > >> >> >> >>> > Try
> > > >> >> >> >>> >
> > > >> >> >> >>> >     consumer.subscribe(topics)
> > > >> >> >> >>> >     consumer.poll(0);  // get assigned partitions
> > > >> >> >> >>> >     consumer.seekToBeginning()
> > > >> >> >> >>> >     consumer.poll(0)
> > > >> >> >> >>> >
> > > >> >> >> >>> > to see if that works.
> > > >> >> >> >>> >
> > > >> >> >> >>> > I think it is a valid issue that can be fixed in the new
> > > >> >> >> >>> > consumer: upon calling seekToEnd/Beginning with no parameters,
> > > >> >> >> >>> > while no assignment has been done yet, do the coordination
> > > >> >> >> >>> > behind the scenes. It will, though, change the behavior of
> > > >> >> >> >>> > these functions, as they are no longer always lazily
> > > >> >> >> >>> > evaluated.
> > > >> >> >> >>> >
> > > >> >> >> >>> >
> > > >> >> >> >>> > Guozhang
> > > >> >> >> >>> >
> > > >> >> >> >>> >
> > > >> >> >> >>> > On Tue, Mar 8, 2016 at 2:08 PM, Cody Koeninger <c...@koeninger.org> wrote:
> > > >> >> >> >>> >
> > > >> >> >> >>> >> Using the 0.9 consumer, I would like to start consuming at
> > > >> >> >> >>> >> the beginning or end, without specifying auto.offset.reset.
> > > >> >> >> >>> >>
> > > >> >> >> >>> >> This does not seem to be possible:
> > > >> >> >> >>> >>
> > > >> >> >> >>> >>     val kafkaParams = Map[String, Object](
> > > >> >> >> >>> >>       "bootstrap.servers" -> conf.getString("kafka.brokers"),
> > > >> >> >> >>> >>       "key.deserializer" -> classOf[StringDeserializer],
> > > >> >> >> >>> >>       "value.deserializer" -> classOf[StringDeserializer],
> > > >> >> >> >>> >>       "group.id" -> "example",
> > > >> >> >> >>> >>       "auto.offset.reset" -> "none"
> > > >> >> >> >>> >>     ).asJava
> > > >> >> >> >>> >>     val topics = conf.getString("kafka.topics").split(",").toList.asJava
> > > >> >> >> >>> >>     val consumer = new KafkaConsumer[String, String](kafkaParams)
> > > >> >> >> >>> >>     consumer.subscribe(topics)
> > > >> >> >> >>> >>     consumer.seekToBeginning()
> > > >> >> >> >>> >>     consumer.poll(0)
> > > >> >> >> >>> >>
> > > >> >> >> >>> >>
> > > >> >> >> >>> >> Results in:
> > > >> >> >> >>> >>
> > > >> >> >> >>> >> Exception in thread "main"
> > > >> >> >> >>> >> org.apache.kafka.clients.consumer.NoOffsetForPartitionException:
> > > >> >> >> >>> >> Undefined offset with no reset policy for partition: testtwo-4
> > > >> >> >> >>> >>         at org.apache.kafka.clients.consumer.internals.Fetcher.resetOffset(Fetcher.java:288)
> > > >> >> >> >>> >>         at org.apache.kafka.clients.consumer.internals.Fetcher.updateFetchPositions(Fetcher.java:167)
> > > >> >> >> >>> >>         at org.apache.kafka.clients.consumer.KafkaConsumer.updateFetchPositions(KafkaConsumer.java:1302)
> > > >> >> >> >>> >>         at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:895)
> > > >> >> >> >>> >>         at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:853)
> > > >> >> >> >>> >>         at example.BasicKafkaConsumer$.main(BasicKafkaConsumer.scala:25)
> > > >> >> >> >>> >>
> > > >> >> >> >>> >>
> > > >> >> >> >>> >> I'm assuming this is because, at the time seekToBeginning()
> > > >> >> >> >>> >> is called, subscriptions.assignedPartitions isn't populated.
> > > >> >> >> >>> >> But polling in order to assign topicpartitions results in an
> > > >> >> >> >>> >> error, which creates a chicken-or-the-egg situation.
> > > >> >> >> >>> >>
> > > >> >> >> >>> >> I don't want to set auto.offset.reset, because I want a hard
> > > >> >> >> >>> >> error if the offsets are out of range at any other time
> > > >> >> >> >>> >> during consumption.
> > > >> >> >> >>> >>
>
> --
> -- Guozhang
>
