I second the need for having a consumer context passed to the rebalance
callback. I have run into issues several times because of that.

About subscribe vs. assign: I have not read through your Spark code yet
(will do by EOD), so I am not sure what you mean (other than that I do agree
new partitions should be consumed automatically). I guess we can
continue this discussion on the Spark list then :-)

Thanks
Mansi.

On Thu, Mar 10, 2016 at 7:43 AM, Cody Koeninger <c...@koeninger.org> wrote:

> Mansi, I'd agree that the fact that everything is tied up in poll
> seems like the source of the awkward behavior.
>
> Regarding assign vs subscribe, most people using the spark integration
> are just going to want to provide a topic name, not go figure out a
> bunch of partitions.  They're also going to be surprised if things
> suddenly blow up once a partition is added, or if that partition doesn't
> start being consumed (we already have that second issue today).
>
> That's why separating the startup behavior of auto offset reset from its
> out-of-range behavior seems like the best idea I've heard so far.
>
> Consumer rebalance listeners are still probably going to be necessary
> for people who are storing offsets externally.
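A minimal, broker-free sketch of the external-offset-storage case, written in Java to match the listener snippets elsewhere in this thread. `PartitionPositions`, `FakePositions` and `ExternalOffsetListener` are hypothetical names, partitions are plain strings, and the map stands in for whatever external store is used. With the real 0.9 client the same two methods would live in a `ConsumerRebalanceListener` calling `consumer.position(...)` and `consumer.seek(...)`.

```java
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for the two consumer calls the listener needs.
interface PartitionPositions {
    long position(String partition);
    void seek(String partition, long offset);
}

// Same shape as a rebalance listener: save positions when partitions are
// revoked, restore them from the external store when they are assigned.
class ExternalOffsetListener {
    private final PartitionPositions consumer; // held by the listener itself
    private final Map<String, Long> store;     // stand-in for an external store

    ExternalOffsetListener(PartitionPositions consumer, Map<String, Long> store) {
        this.consumer = consumer;
        this.store = store;
    }

    void onPartitionsRevoked(Collection<String> partitions) {
        for (String p : partitions) {
            store.put(p, consumer.position(p));
        }
    }

    void onPartitionsAssigned(Collection<String> partitions) {
        for (String p : partitions) {
            Long saved = store.get(p);
            if (saved != null) {
                consumer.seek(p, saved); // resume from the externally stored offset
            }
            // No stored offset: leave the position to the consumer's own logic.
        }
    }
}

// In-memory fake so the sketch runs without a broker.
class FakePositions implements PartitionPositions {
    final Map<String, Long> positions = new HashMap<>();

    public long position(String partition) {
        return positions.getOrDefault(partition, 0L);
    }

    public void seek(String partition, long offset) {
        positions.put(partition, offset);
    }
}
```

Note that the listener has to be handed the consumer when it is constructed, since the callbacks receive only the partitions; that is the awkwardness discussed further down in the thread.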
>
> On Thu, Mar 10, 2016 at 9:27 AM, Mansi Shah <mansis...@maprtech.com> wrote:
> > Guozhang
> >
> > Sorry for joining the party a little late. I have been thinking about this
> > whole awkward behavior of having to call poll(0) to actually make the
> > underlying subscriptions take effect. Is the core reason for this design
> > the fact that poll is also the actual heartbeat, and you want to make the
> > consumer group assignments through poll so that timeouts and
> > reassignments can all go through poll? If so, I think tying liveness to
> > poll (which in effect ties consumer group assignments, and hence metadata
> > fetches, to poll) is the real cause of this design. Were there issues where
> > you were seeing active consumers not calling poll that led to this design
> > choice? I tried to look for a relevant JIRA but could not find one. Can
> > you please point me to something if you have it handy?
> >
> > Btw, this would also mean that your proposal to do the actual assignments
> > through seek might not be ideal, since there can still be indefinite time
> > between seek and poll (just like between subscribe and poll), and the
> > consumer could time out even before the first poll is called?
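To make the liveness concern concrete, here is a toy Java model (not the real client) that follows Mansi's description: only `poll()` counts as a heartbeat, so a `seek()` in between does nothing for liveness. `PollLivenessModel` is a hypothetical name, `sessionTimeoutMs` only mirrors the idea of `session.timeout.ms`, and time is passed in explicitly to keep the model deterministic.

```java
// Toy model of "liveness is tied to poll": only poll() refreshes the
// timestamp the coordinator would judge the consumer by.
class PollLivenessModel {
    private final long sessionTimeoutMs;
    private long lastPollMs;

    PollLivenessModel(long sessionTimeoutMs, long lastPollMs) {
        this.sessionTimeoutMs = sessionTimeoutMs;
        this.lastPollMs = lastPollMs;
    }

    void poll(long nowMs) {
        lastPollMs = nowMs; // the heartbeat piggybacks on poll
    }

    void seek(long nowMs) {
        // Deliberately no heartbeat: seeking does not prove liveness here.
    }

    boolean consideredAlive(long nowMs) {
        return nowMs - lastPollMs <= sessionTimeoutMs;
    }
}
```

For example, with a 30000 ms timeout and a last poll at time 0, a seek at 10000 leaves the consumer alive at 25000 but not at 40000, unless another poll happens first.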
> >
> >
> > @Cody, in your case, if you really have only one consumer and it is going
> > to get all the partitions of the topic anyway, then you might as well
> > use the "assign" call instead of the "subscribe" call. That will at least
> > make your code cleaner, and I do not think you are gaining anything from
> > the consumer group functionality anyway?
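A small sketch of the bookkeeping `assign` implies, assuming the per-topic partition counts have already been obtained (with the real client, from `consumer.partitionsFor(topic)`). `ManualAssignment.allPartitions` is a hypothetical helper that produces `topic-partition` names in a stable order.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical helper: expand per-topic partition counts into an explicit
// list of "topic-partition" names, the way an assign()-based consumer must.
final class ManualAssignment {
    private ManualAssignment() {}

    static List<String> allPartitions(Map<String, Integer> partitionCounts) {
        List<String> result = new ArrayList<>();
        // TreeMap gives a stable topic order whatever map the caller passes in.
        for (Map.Entry<String, Integer> e : new TreeMap<String, Integer>(partitionCounts).entrySet()) {
            for (int p = 0; p < e.getValue(); p++) {
                result.add(e.getKey() + "-" + p);
            }
        }
        return result;
    }
}
```

With the real client the resulting list would be turned into `TopicPartition` objects, passed to `consumer.assign(...)`, and followed by `seekToBeginning`, which then has partitions to act on. The list is a snapshot, though: a partition added to the topic later is not picked up unless the counts are fetched again and `assign` is called again, which is the concern Cody raises about new partitions.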
> >
> > - Mansi.
> >
> >
> >
> > On Wed, Mar 9, 2016 at 8:35 PM, Guozhang Wang <wangg...@gmail.com> wrote:
> >
> >> In order to do anything meaningful with the consumer itself in the rebalance
> >> callback (e.g. commit offsets), you would need to hold on to the consumer
> >> reference; admittedly it sounds a bit awkward, but by design we chose
> >> not to enforce it in the interface itself.
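A minimal sketch of "holding on to the consumer reference", assuming a hypothetical `Committer` interface as a stand-in for the consumer's `commitSync()`, so it runs without a broker. `CommitOnRevokeListener` and `RecordingCommitter` are hypothetical names; the constructor injection is the point being illustrated.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;

// Hypothetical stand-in for the one consumer call this listener makes.
interface Committer {
    void commitSync();
}

// The callbacks only receive partitions, so the consumer has to be handed
// to the listener up front, here through the constructor.
class CommitOnRevokeListener {
    private final Committer consumer;

    CommitOnRevokeListener(Committer consumer) {
        this.consumer = consumer;
    }

    void onPartitionsRevoked(Collection<String> partitions) {
        consumer.commitSync(); // commit before the partitions move elsewhere
    }

    void onPartitionsAssigned(Collection<String> partitions) {
        // Nothing to do in this sketch.
    }
}

// Records calls so the sketch can be exercised without a broker.
class RecordingCommitter implements Committer {
    final List<String> calls = new ArrayList<>();

    public void commitSync() {
        calls.add("commitSync");
    }
}
```

Because the reference comes in through the constructor, the listener cannot be created before the consumer exists, which is the serialization and zero-argument-constructor awkwardness Cody describes further down.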
> >>
> >> Guozhang
> >>
> >> On Wed, Mar 9, 2016 at 3:39 PM, Cody Koeninger <c...@koeninger.org> wrote:
> >>
> >> > So what about my comments regarding the consumer rebalance listener
> >> > interface not providing access to a consumer? I can probably work around
> >> > it, but it seems odd.
> >> > On Mar 9, 2016 5:04 PM, "Guozhang Wang" <wangg...@gmail.com> wrote:
> >> >
> >> > > One thing proposed by Jason:
> >> > >
> >> > > If you want to reset offsets only upon initialization, and by
> >> > > initialization you mean "no committed offset", you can do something
> >> > > like the following in the rebalance callback:
> >> > >
> >> > >     @Override
> >> > >     public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
> >> > >         for (TopicPartition partition : partitions) {
> >> > >             // no committed offset for this group yet, so start from the beginning
> >> > >             if (consumer.committed(partition) == null)
> >> > >                 consumer.seekToBeginning(partition);
> >> > >         }
> >> > >     }
> >> > >
> >> > >
> >> > > Guozhang
> >> > >
> >> > > On Wed, Mar 9, 2016 at 2:11 PM, Guozhang Wang <wangg...@gmail.com> wrote:
> >> > >
> >> > > > Filed https://issues.apache.org/jira/browse/KAFKA-3370.
> >> > > >
> >> > > > On Wed, Mar 9, 2016 at 1:11 PM, Cody Koeninger <c...@koeninger.org> wrote:
> >> > > >
> >> > > >> That sounds like an interesting way of addressing the problem; we can
> >> > > >> continue further discussions on the JIRA.
> >> > > >>
> >> > > >>
> >> > > >>
> >> > > >> On Wed, Mar 9, 2016 at 2:59 PM, Guozhang Wang <wangg...@gmail.com> wrote:
> >> > > >> > Cody:
> >> > > >> >
> >> > > >> > More specifically, you do not need the "listTopics" function if you
> >> > > >> > already know your subscribed topics; just using "partitionsFor" is
> >> > > >> > sufficient.
> >> > > >> >
> >> > > >> > About the fix, I'm thinking of adding two more options to
> >> > > >> > auto.offset.reset, namely "earliest-on-start" and "latest-on-start",
> >> > > >> > which set the reset position ONLY at startup. The reason is that
> >> > > >> > seekToXX was actually not designed to do such initialization, but
> >> > > >> > for calling during the lifetime of the consumer, and we'd better
> >> > > >> > provide the right solution to do so.
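A sketch of the decision the proposed options would encode, in Java. `earliest-on-start` and `latest-on-start` were only proposed here, so `ResetPolicy`, `ResetAction` and `ResetDecision.decide` are hypothetical. `atStartup` is assumed to mean the partition has no committed offset and nothing has been consumed from it yet, while `atStartup == false` stands for an out-of-range offset found mid-stream.

```java
// Hypothetical values mirroring the options proposed in this thread; the
// *_ON_START ones are not settings of the 0.9 client.
enum ResetPolicy { NONE, EARLIEST, LATEST, EARLIEST_ON_START, LATEST_ON_START }

enum ResetAction { SEEK_EARLIEST, SEEK_LATEST, FAIL }

final class ResetDecision {
    private ResetDecision() {}

    // atStartup: no committed offset and nothing consumed yet for the partition.
    // Otherwise the reset was triggered by an out-of-range offset mid-stream.
    static ResetAction decide(ResetPolicy policy, boolean atStartup) {
        switch (policy) {
            case EARLIEST:
                return ResetAction.SEEK_EARLIEST;
            case LATEST:
                return ResetAction.SEEK_LATEST;
            case EARLIEST_ON_START:
                return atStartup ? ResetAction.SEEK_EARLIEST : ResetAction.FAIL;
            case LATEST_ON_START:
                return atStartup ? ResetAction.SEEK_LATEST : ResetAction.FAIL;
            default: // NONE: always a hard error
                return ResetAction.FAIL;
        }
    }
}
```

Under this reading, `EARLIEST_ON_START` gives the behavior Cody asks for at the bottom of the thread: start from the beginning once, but fail hard on any later out-of-range condition, as `NONE` would.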
> >> > > >> >
> >> > > >> > I can file the JIRA right away and start further discussions there.
> >> > > >> > But let me know if you have any other ideas.
> >> > > >> >
> >> > > >> > Guozhang
> >> > > >> >
> >> > > >> > On Wed, Mar 9, 2016 at 12:25 PM, Cody Koeninger <c...@koeninger.org> wrote:
> >> > > >> >
> >> > > >> >> Yeah, I think I understood what you were saying. What I'm saying is
> >> > > >> >> that if there were a way to just fetch metadata without doing the
> >> > > >> >> rest of the work poll() does, it wouldn't be necessary. I guess I
> >> > > >> >> can do listTopics to get all metadata for all topics and then parse it.
> >> > > >> >>
> >> > > >> >> Regarding running a single instance, that is the case for what I'm
> >> > > >> >> talking about.
> >> > > >> >>
> >> > > >> >> On Wed, Mar 9, 2016 at 2:02 PM, Guozhang Wang <wangg...@gmail.com> wrote:
> >> > > >> >> > Cody,
> >> > > >> >> >
> >> > > >> >> > What I meant by a special case of `seekToXX` is this: today, when
> >> > > >> >> > the function is called with no partition parameters, it will try to
> >> > > >> >> > execute the logic on all "assigned" partitions for the consumer. And
> >> > > >> >> > once that is done, the subsequent poll() will not throw the exception,
> >> > > >> >> > since it knows those partitions need to reset offsets.
> >> > > >> >> >
> >> > > >> >> > However, in your case there are no assigned partitions yet, and hence
> >> > > >> >> > `seekToXX` will not take effect on any partitions. The assignment is
> >> > > >> >> > wrapped in the poll() call, as you mentioned. One way to solve it is
> >> > > >> >> > to let `seekToXX()` with no parameters do the coordination and get
> >> > > >> >> > the assigned partitions if there are any subscribed topics, so that
> >> > > >> >> > the subsequent poll() will know those partitions need their offsets
> >> > > >> >> > reset. Does that make sense?
> >> > > >> >> >
> >> > > >> >> > For now, another way I can think of is to get the partition info
> >> > > >> >> > beforehand and call `seekToBeginning` on all partitions. But that
> >> > > >> >> > only works if the consumer knows it is going to get all the
> >> > > >> >> > partitions assigned to itself (i.e. you are only running a single
> >> > > >> >> > instance).
> >> > > >> >> >
> >> > > >> >> > Guozhang
> >> > > >> >> >
> >> > > >> >> >
> >> > > >> >> > On Wed, Mar 9, 2016 at 6:22 AM, Cody Koeninger <c...@koeninger.org> wrote:
> >> > > >> >> >
> >> > > >> >> >> Another unfortunate thing about ConsumerRebalanceListener is that in
> >> > > >> >> >> order to do meaningful work in the callback, you need a reference to
> >> > > >> >> >> the consumer that called it. But that reference isn't provided to the
> >> > > >> >> >> callback, which means the listener implementation needs to hold a
> >> > > >> >> >> reference to the consumer. Seems like this makes it unnecessarily
> >> > > >> >> >> awkward to serialize or provide a 0 arg constructor for the listener.
> >> > > >> >> >>
> >> > > >> >> >> On Wed, Mar 9, 2016 at 7:28 AM, Cody Koeninger <c...@koeninger.org> wrote:
> >> > > >> >> >> > I thought about ConsumerRebalanceListener, but seeking to the
> >> > > >> >> >> > beginning any time there's a rebalance for whatever reason is not
> >> > > >> >> >> > necessarily the same thing as seeking to the beginning before first
> >> > > >> >> >> > starting the consumer.
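One way to express "seek only before first starting", sketched as a broker-free Java model: remember which partitions this instance has already been given and rewind only the new ones. `BeginningSeeker`, `SeekOnFirstAssignmentListener` and `RecordingSeeker` are hypothetical; with the real 0.9 client the seek would be `consumer.seekToBeginning(...)`, which takes `TopicPartition` varargs.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical stand-in for the consumer's seekToBeginning call.
interface BeginningSeeker {
    void seekToBeginning(Collection<String> partitions);
}

// Rewinds only partitions this instance has never been assigned before,
// so a later rebalance does not rewind them again.
class SeekOnFirstAssignmentListener {
    private final BeginningSeeker consumer;
    private final Set<String> seen = new HashSet<>();

    SeekOnFirstAssignmentListener(BeginningSeeker consumer) {
        this.consumer = consumer;
    }

    void onPartitionsAssigned(Collection<String> partitions) {
        List<String> fresh = new ArrayList<>();
        for (String p : partitions) {
            if (seen.add(p)) { // add() returns true only the first time
                fresh.add(p);
            }
        }
        if (!fresh.isEmpty()) {
            consumer.seekToBeginning(fresh);
        }
    }

    void onPartitionsRevoked(Collection<String> partitions) {
        // Keep 'seen' as is: a partition that comes back is not rewound.
    }
}

// Records calls so the sketch can be exercised without a broker.
class RecordingSeeker implements BeginningSeeker {
    final List<List<String>> calls = new ArrayList<>();

    public void seekToBeginning(Collection<String> partitions) {
        calls.add(new ArrayList<>(partitions));
    }
}
```

The trade-off: `seen` is per-process memory, so a restarted process rewinds everything again, and a partition that migrates here from another group member during a rebalance would also be rewound. Assuming offsets are being committed, the `committed(partition) == null` check proposed by Jason avoids both, at the cost of a committed-offset lookup per assigned partition.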
> >> > > >> >> >> >
> >> > > >> >> >> > On Wed, Mar 9, 2016 at 2:24 AM, Kamal C <kamaltar...@gmail.com> wrote:
> >> > > >> >> >> >> Cody,
> >> > > >> >> >> >>
> >> > > >> >> >> >> Use ConsumerRebalanceListener to achieve that:
> >> > > >> >> >> >>
> >> > > >> >> >> >> ConsumerRebalanceListener listener = new ConsumerRebalanceListener() {
> >> > > >> >> >> >>
> >> > > >> >> >> >>     @Override
> >> > > >> >> >> >>     public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
> >> > > >> >> >> >>     }
> >> > > >> >> >> >>
> >> > > >> >> >> >>     @Override
> >> > > >> >> >> >>     public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
> >> > > >> >> >> >>         consumer.seekToBeginning(partitions.toArray(new TopicPartition[0]));
> >> > > >> >> >> >>     }
> >> > > >> >> >> >> };
> >> > > >> >> >> >>
> >> > > >> >> >> >> consumer.subscribe(topics, listener);
> >> > > >> >> >> >>
> >> > > >> >> >> >> On Wed, Mar 9, 2016 at 12:05 PM, Cody Koeninger <c...@koeninger.org> wrote:
> >> > > >> >> >> >>
> >> > > >> >> >> >>> That suggestion doesn't work, for pretty much the same reason: at the
> >> > > >> >> >> >>> time poll is first called, there is no reset policy and no committed
> >> > > >> >> >> >>> offset, so NoOffsetForPartitionException is thrown.
> >> > > >> >> >> >>>
> >> > > >> >> >> >>> I feel like the underlying problem isn't so much that seekToEnd needs
> >> > > >> >> >> >>> special-case behavior. It's more that topic metadata fetches,
> >> > > >> >> >> >>> consumer position fetches, and message fetches are all lumped together
> >> > > >> >> >> >>> under a single poll() call, with no way to do them individually if
> >> > > >> >> >> >>> necessary.
> >> > > >> >> >> >>>
> >> > > >> >> >> >>> What does "work" in this situation is to just catch the exception
> >> > > >> >> >> >>> (which leaves the consumer in a state where topics are assigned) and
> >> > > >> >> >> >>> then seek. But that is not exactly an elegant interface.
> >> > > >> >> >> >>>
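> >> > > >> >> >> >>>     import org.apache.kafka.clients.consumer.NoOffsetForPartitionException
> >> > > >> >> >> >>>
> >> > > >> >> >> >>>     consumer.subscribe(topics)
> >> > > >> >> >> >>>     try {
> >> > > >> >> >> >>>       consumer.poll(0)  // joins the group and gets partitions assigned, then throws
> >> > > >> >> >> >>>     } catch {
> >> > > >> >> >> >>>       // expected on the first poll: no committed offset and no reset policy
> >> > > >> >> >> >>>       case _: NoOffsetForPartitionException =>
> >> > > >> >> >> >>>     }
> >> > > >> >> >> >>>     consumer.seekToBeginning()  // now applies to the already-assigned partitions
> >> > > >> >> >> >>>     consumer.poll(0)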
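A toy Java model (not the real client) of the behavior described in this thread, useful for seeing why the catch-then-seek sequence works: `subscribe()` is lazy, the no-argument `seekToBeginning()` touches only already-assigned partitions, and `poll()` first assigns partitions and then fails for any partition with no position. `LazyConsumerModel` is hypothetical, partitions are plain strings, the partition counts stand in for topic metadata, and the model assumes a brand-new group with no committed offsets and no reset policy.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Toy model of the behavior described in this thread, not the real client.
// Assumes a brand-new group (no committed offsets) and no reset policy.
class LazyConsumerModel {
    private final Map<String, Integer> partitionCounts; // stand-in for topic metadata
    private final Set<String> subscribed = new LinkedHashSet<>();
    private final Map<String, Long> positions = new LinkedHashMap<>(); // null = undefined
    private final Set<String> pendingSeekToBeginning = new HashSet<>();

    LazyConsumerModel(Map<String, Integer> partitionCounts) {
        this.partitionCounts = partitionCounts;
    }

    // Lazy: records the topics, assigns nothing yet.
    void subscribe(Collection<String> topics) {
        subscribed.addAll(topics);
    }

    // No-arg form: only marks partitions that are already assigned.
    void seekToBeginning() {
        pendingSeekToBeginning.addAll(positions.keySet());
    }

    List<String> poll() {
        if (positions.isEmpty()) {
            // First poll: "join the group" and take every subscribed partition.
            for (String topic : subscribed) {
                int count = partitionCounts.getOrDefault(topic, 0);
                for (int p = 0; p < count; p++) {
                    positions.put(topic + "-" + p, null);
                }
            }
        }
        for (String tp : new ArrayList<>(positions.keySet())) {
            if (pendingSeekToBeginning.remove(tp)) {
                positions.put(tp, 0L); // offset 0 is "the beginning" in this model
            } else if (positions.get(tp) == null) {
                throw new IllegalStateException(
                        "Undefined offset with no reset policy for partition: " + tp);
            }
        }
        return new ArrayList<>(positions.keySet());
    }
}
```

Calling `subscribe`, `seekToBeginning()`, `poll()` on the model throws, as in the original report; `subscribe`, `poll()` (caught), `seekToBeginning()`, `poll()` returns all five partitions of a five-partition topic.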
> >> > > >> >> >> >>>
> >> > > >> >> >> >>>
> >> > > >> >> >> >>>
> >> > > >> >> >> >>>
> >> > > >> >> >> >>> On Tue, Mar 8, 2016 at 11:22 PM, Guozhang Wang <wangg...@gmail.com> wrote:
> >> > > >> >> >> >>> > Hi Cody,
> >> > > >> >> >> >>> >
> >> > > >> >> >> >>> > The problem with that code is `seekToBeginning()` being called right
> >> > > >> >> >> >>> > after `subscribe(topic)`.
> >> > > >> >> >> >>> >
> >> > > >> >> >> >>> > Since the `subscribe` call is lazily evaluated, by the time
> >> > > >> >> >> >>> > `seekToBeginning()` is called no partition is assigned yet, and hence
> >> > > >> >> >> >>> > it is effectively a no-op.
> >> > > >> >> >> >>> >
> >> > > >> >> >> >>> > Try
> >> > > >> >> >> >>> >
> >> > > >> >> >> >>> >     consumer.subscribe(topics)
> >> > > >> >> >> >>> >     consumer.poll(0);  // get assigned partitions
> >> > > >> >> >> >>> >     consumer.seekToBeginning()
> >> > > >> >> >> >>> >     consumer.poll(0)
> >> > > >> >> >> >>> >
> >> > > >> >> >> >>> > to see if that works.
> >> > > >> >> >> >>> >
> >> > > >> >> >> >>> > I think it is a valid issue that can be fixed in the new consumer:
> >> > > >> >> >> >>> > upon calling seekToEnd/Beginning with no parameter while no assignment
> >> > > >> >> >> >>> > is done yet, do the coordination behind the scenes. It will, though,
> >> > > >> >> >> >>> > change the behavior of the functions, as they are no longer always
> >> > > >> >> >> >>> > lazily evaluated.
> >> > > >> >> >> >>> >
> >> > > >> >> >> >>> >
> >> > > >> >> >> >>> > Guozhang
> >> > > >> >> >> >>> >
> >> > > >> >> >> >>> >
> >> > > >> >> >> >>> > On Tue, Mar 8, 2016 at 2:08 PM, Cody Koeninger <c...@koeninger.org> wrote:
> >> > > >> >> >> >>> >
> >> > > >> >> >> >>> >> Using the 0.9 consumer, I would like to start consuming at the
> >> > > >> >> >> >>> >> beginning or end, without specifying auto.offset.reset.
> >> > > >> >> >> >>> >>
> >> > > >> >> >> >>> >> This does not seem to be possible:
> >> > > >> >> >> >>> >>
> >> > > >> >> >> >>> >>     import scala.collection.JavaConverters._
> >> > > >> >> >> >>> >>     import org.apache.kafka.clients.consumer.KafkaConsumer
> >> > > >> >> >> >>> >>     import org.apache.kafka.common.serialization.StringDeserializer
> >> > > >> >> >> >>> >>
> >> > > >> >> >> >>> >>     val kafkaParams = Map[String, Object](
> >> > > >> >> >> >>> >>       "bootstrap.servers" -> conf.getString("kafka.brokers"),
> >> > > >> >> >> >>> >>       "key.deserializer" -> classOf[StringDeserializer],
> >> > > >> >> >> >>> >>       "value.deserializer" -> classOf[StringDeserializer],
> >> > > >> >> >> >>> >>       "group.id" -> "example",
> >> > > >> >> >> >>> >>       "auto.offset.reset" -> "none"
> >> > > >> >> >> >>> >>     ).asJava
> >> > > >> >> >> >>> >>     val topics = conf.getString("kafka.topics").split(",").toList.asJava
> >> > > >> >> >> >>> >>     val consumer = new KafkaConsumer[String, String](kafkaParams)
> >> > > >> >> >> >>> >>     consumer.subscribe(topics)
> >> > > >> >> >> >>> >>     consumer.seekToBeginning()
> >> > > >> >> >> >>> >>     consumer.poll(0)
> >> > > >> >> >> >>> >>
> >> > > >> >> >> >>> >>
> >> > > >> >> >> >>> >> Results in:
> >> > > >> >> >> >>> >>
> >> > > >> >> >> >>> >> Exception in thread "main" org.apache.kafka.clients.consumer.NoOffsetForPartitionException:
> >> > > >> >> >> >>> >> Undefined offset with no reset policy for partition: testtwo-4
> >> > > >> >> >> >>> >>         at org.apache.kafka.clients.consumer.internals.Fetcher.resetOffset(Fetcher.java:288)
> >> > > >> >> >> >>> >>         at org.apache.kafka.clients.consumer.internals.Fetcher.updateFetchPositions(Fetcher.java:167)
> >> > > >> >> >> >>> >>         at org.apache.kafka.clients.consumer.KafkaConsumer.updateFetchPositions(KafkaConsumer.java:1302)
> >> > > >> >> >> >>> >>         at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:895)
> >> > > >> >> >> >>> >>         at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:853)
> >> > > >> >> >> >>> >>         at example.BasicKafkaConsumer$.main(BasicKafkaConsumer.scala:25)
> >> > > >> >> >> >>> >>
> >> > > >> >> >> >>> >>
> >> > > >> >> >> >>> >> I'm assuming this is because, at the time seekToBeginning() is called,
> >> > > >> >> >> >>> >> subscriptions.assignedPartitions isn't populated. But polling in
> >> > > >> >> >> >>> >> order to assign topicpartitions results in an error, which creates a
> >> > > >> >> >> >>> >> chicken-or-the-egg situation.
> >> > > >> >> >> >>> >>
> >> > > >> >> >> >>> >> I don't want to set auto.offset.reset, because I want a hard error if
> >> > > >> >> >> >>> >> the offsets are out of range at any other time during consumption.
> >> > > >> >> >> >>> >>
> >> > > >> >> >> >>> >
> >> > > >> >> >> >>> >
> >> > > >> >> >> >>> >
> >> > > >> >> >> >>> > --
> >> > > >> >> >> >>> > -- Guozhang
> >> > > >> >> >> >>>
> >> > > >> >> >>
> >> > > >> >> >
> >> > > >> >> >
> >> > > >> >> >
> >> > > >> >> > --
> >> > > >> >> > -- Guozhang
> >> > > >> >>
> >> > > >> >
> >> > > >> >
> >> > > >> >
> >> > > >> > --
> >> > > >> > -- Guozhang
> >> > > >>
> >> > > >
> >> > > >
> >> > > >
> >> > > > --
> >> > > > -- Guozhang
> >> > > >
> >> > >
> >> > >
> >> > >
> >> > > --
> >> > > -- Guozhang
> >> > >
> >> >
> >>
> >>
> >>
> >> --
> >> -- Guozhang
> >>
>
