Regarding the rebalance listener: in the case of the Spark
integration, a job can fail and be restarted from a checkpoint in a
new JVM.  That means you need to be able to reconstruct objects.  Any
reasonable rebalance listener can't have a 0-arg constructor, because
it needs a reference to the consumer in order to do anything useful.
I can get around this by making the user provide a 0-arg function
that returns a fully configured and subscribed Kafka consumer, so
that they get to do the dance of constructing a consumer before
constructing the listener.  I don't think the current rebalance
listener is a dealbreaker; I just think it is an unnecessarily
awkward API.
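Below is a minimal Java sketch of that workaround. It assumes plain Java serialization stands in for the checkpoint mechanism. The listener keeps only a serializable 0-arg factory and rebuilds its consumer lazily after a restore. `StubConsumer`, `SerializableSupplier` and `Listener` are hypothetical stand-ins, not Kafka or Spark classes. A real factory would return a configured and subscribed `KafkaConsumer` instead.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.Arrays;
import java.util.List;
import java.util.function.Supplier;

public class CheckpointableListenerSketch {

    // Hypothetical stand-in for a configured, subscribed KafkaConsumer.
    // Deliberately NOT Serializable, like a real consumer.
    static final class StubConsumer {
        final List<String> topics;

        StubConsumer(List<String> topics) {
            this.topics = topics;
        }
    }

    // A 0-arg factory that can be written into a checkpoint.
    interface SerializableSupplier<T> extends Supplier<T>, Serializable {}

    // Hypothetical listener: only the factory is checkpointed; the live
    // consumer is transient and rebuilt on first use after a restore.
    static final class Listener implements Serializable {
        private static final long serialVersionUID = 1L;

        private final SerializableSupplier<StubConsumer> factory;
        private transient StubConsumer consumer;

        Listener(SerializableSupplier<StubConsumer> factory) {
            this.factory = factory;
        }

        StubConsumer consumer() {
            if (consumer == null) {
                consumer = factory.get();
            }
            return consumer;
        }
    }

    // Writes the listener out and reads it back, standing in for a
    // checkpoint followed by a restart.
    static Listener checkpointAndRestore(Listener listener) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(listener);
            }
            try (ObjectInputStream in =
                     new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
                return (Listener) in.readObject();
            }
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException("checkpoint round trip failed", e);
        }
    }

    static Listener exampleListener() {
        return new Listener(() -> new StubConsumer(Arrays.asList("testtwo")));
    }

    public static void main(String[] args) {
        Listener original = exampleListener();
        original.consumer(); // a live consumer exists, but the field is transient
        Listener restored = checkpointAndRestore(original);
        System.out.println(restored.consumer().topics); // rebuilt from the factory
    }
}
```

The design choice is that nothing non-serializable is ever written. After a restore the `transient` field is simply null, and the first call to `consumer()` runs the factory again.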

Regarding poll(), I honestly don't see the problem with keeping
poll() behavior as is while also providing public methods to, e.g.,
fetch metadata, for people who need that functionality without
actually consuming messages.

On Mon, Mar 14, 2016 at 1:01 PM, Jason Gustafson <ja...@confluent.io> wrote:
> Late arrival to this discussion. I'm not really sure I see the problem with
> accessing the consumer in the rebalance listener. Previously, we passed the
> consumer instance as a separate argument, but that was only because the
> rebalance listener had to be passed by class name before a reference to the
> consumer was available. After we changed the API to pass it in subscribe()
> instead, getting a reference to the consumer shouldn't be a problem. Maybe
> I'm missing something?
>
> As for poll() doing everything... It's definitely caused some confusion,
> but I'm a little doubtful that trying to split out the functionality is
> going to help as much as people think. A heartbeat() API has come up
> several times, for example, but you have to figure out how it's going to
> affect rebalancing. And rebalancing affects which partitions need to be
> fetched or can be committed. They're all so intertwined that I'm not sure
> you can divide them up without creating a bigger problem than the one
> you're trying to solve. At this point, with 0.10 shortly on the way, it
> seems unlikely that incompatible changes to the API will be accepted.
> However, if someone can propose a compatible solution which addresses some
> of the concerns mentioned, we'd love to hear about it!
>
> -Jason
>
> On Mon, Mar 14, 2016 at 9:21 AM, Cody Koeninger <c...@koeninger.org> wrote:
>
>> Honestly the fact that everything is hidden inside poll() has been
>> confusing people since last year, e.g.
>>
>> https://issues.apache.org/jira/browse/KAFKA-2359
>>
>> I can try to formulate a KIP for this, but it seems clear that I'm not
>> the only one giving this feedback, and I may not understand all the
>> other use cases that have been brought up.
>>
>> On Sun, Mar 13, 2016 at 3:09 PM, Guozhang Wang <wangg...@gmail.com> wrote:
>> > Cody,
>> >
>> > We do not have an umbrella JIRA for this, but rather a case-by-case JIRA
>> > ticket / KIP for API changes in consumer.
>> >
>> > If you feel strongly about some specific change to the consumer API, please
>> > feel free to create a new KIP with the detailed motivation and proposed
>> > modifications.
>> >
>> > Guozhang
>> >
>> > On Fri, Mar 11, 2016 at 12:28 PM, Cody Koeninger <c...@koeninger.org> wrote:
>> >
>> >> Is there a KIP or Jira related to "working on improving these cases
>> >> with improved APIs"?
>> >>
>> >> I saw that there was some discussion of it in KIP-41, but that seemed
>> >> to have been resolved in favor of keeping everything inside of poll().
>> >>
>> >> On Fri, Mar 11, 2016 at 11:17 AM, Guozhang Wang <wangg...@gmail.com> wrote:
>> >> > Cody, Mansi:
>> >> >
>> >> > All good points! Let me try to answer them one by one.
>> >> >
>> >> > About this specific issue, as I suggested in the JIRA, we can separate,
>> >> > within the auto.offset.reset config, the case of resetting the offset
>> >> > when a partition is first initialized for fetching from the case of a
>> >> > fetch offset being out of range. These two scenarios are indeed quite
>> >> > different, and it's reasonable to treat them differently.
>> >> >
>> >> > About passing a consumer context to the rebalance callback's
>> >> > constructor, we left that to the user for flexibility: if you want to
>> >> > use Kafka to commit offsets, for example, then you pass the consumer
>> >> > reference to the callback; if you use an external service to store
>> >> > offsets, you can pass a JDBC connection, for example, to the callback;
>> >> > for some data mirroring you can even pass another producer client into
>> >> > it. Always enforcing the consumer context could be convenient for some
>> >> > use cases (i.e. you would not need to pass the argument to the
>> >> > constructor yourself), but not necessarily for all.
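The point above is that the listener's constructor, not the interface, decides what context the callbacks get. The following self-contained Java sketch illustrates the external-storage case. `OffsetStore`, `Positions` and the in-memory implementations are hypothetical. The two callback methods only mirror the names of `ConsumerRebalanceListener`'s methods. They take plain string partition names, so the example runs without the Kafka client.

```java
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
import java.util.OptionalLong;

public class ContextPassingListenerSketch {

    // Hypothetical external offset store (e.g. a table reached through JDBC).
    interface OffsetStore {
        void save(String partition, long offset);
        OptionalLong load(String partition);
    }

    // Hypothetical view of the only consumer calls the callbacks need.
    interface Positions {
        long position(String partition);
        void seek(String partition, long offset);
    }

    // The constructor decides which context the callbacks can use.
    static final class ExternalOffsetListener {
        private final Positions consumer;
        private final OffsetStore store;

        ExternalOffsetListener(Positions consumer, OffsetStore store) {
            this.consumer = consumer;
            this.store = store;
        }

        // Before partitions move elsewhere, persist where consumption stopped.
        void onPartitionsRevoked(Collection<String> partitions) {
            for (String partition : partitions) {
                store.save(partition, consumer.position(partition));
            }
        }

        // After an assignment, resume from the stored offset when there is one.
        void onPartitionsAssigned(Collection<String> partitions) {
            for (String partition : partitions) {
                store.load(partition).ifPresent(offset -> consumer.seek(partition, offset));
            }
        }
    }

    // In-memory implementations, only for exercising the listener.
    static final class MapStore implements OffsetStore {
        private final Map<String, Long> saved = new HashMap<>();
        public void save(String partition, long offset) { saved.put(partition, offset); }
        public OptionalLong load(String partition) {
            Long offset = saved.get(partition);
            return offset == null ? OptionalLong.empty() : OptionalLong.of(offset);
        }
    }

    static final class MapPositions implements Positions {
        private final Map<String, Long> positions = new HashMap<>();
        public long position(String partition) { return positions.getOrDefault(partition, 0L); }
        public void seek(String partition, long offset) { positions.put(partition, offset); }
    }

    // One consumer reaches offset 17 on "testtwo-0" and loses the partition;
    // a second consumer picks it up and resumes from the store.
    static long resumedPosition() {
        MapStore store = new MapStore();
        MapPositions first = new MapPositions();
        first.seek("testtwo-0", 17L);
        new ExternalOffsetListener(first, store).onPartitionsRevoked(Arrays.asList("testtwo-0"));

        MapPositions second = new MapPositions();
        new ExternalOffsetListener(second, store).onPartitionsAssigned(Arrays.asList("testtwo-0"));
        return second.position("testtwo-0");
    }

    public static void main(String[] args) {
        System.out.println("resumed at offset " + resumedPosition());
    }
}
```

Swapping `MapStore` for a JDBC-backed store, or `Positions` for a real consumer, changes only what is passed to the constructor. The callbacks themselves stay the same.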
>> >> >
>> >> > About wrapping the coordination protocols (partition assignment,
>> >> > heartbeat) inside "poll()" behind the scenes: we implemented the APIs
>> >> > this way in order to hide the underlying details from users, and also
>> >> > to provide a simple "single-thread poll loop" design pattern in the new
>> >> > consumer. We realize that it does make some use cases more awkward, and
>> >> > we are working on improving these cases with better APIs as well. Let
>> >> > us know if you have any suggestions about this.
>> >> >
>> >> > Guozhang
>> >> >
>> >> >
>> >> >
>> >> >
>> >> >
>> >> >
>> >> > On Thu, Mar 10, 2016 at 7:53 AM, Mansi Shah <mansis...@maprtech.com> wrote:
>> >> >
>> >> >> I second the need for having a consumer context passed to the rebalance
>> >> >> callback. I have run into issues several times because of that.
>> >> >>
>> >> >> About subscribe vs. assign: I have not read through your Spark code yet
>> >> >> (will do by EOD), so I am not sure what you mean (other than that I do
>> >> >> agree new partitions should be consumed automatically). I guess we can
>> >> >> continue this discussion on the Spark list then :-)
>> >> >>
>> >> >> Thanks
>> >> >> Mansi.
>> >> >>
>> >> >> On Thu, Mar 10, 2016 at 7:43 AM, Cody Koeninger <c...@koeninger.org> wrote:
>> >> >>
>> >> >> > Mansi, I'd agree that the fact that everything is tied up in poll
>> >> >> > seems like the source of the awkward behavior.
>> >> >> >
>> >> >> > Regarding assign vs. subscribe, most people using the Spark integration
>> >> >> > are just going to want to provide a topic name, not go figure out a
>> >> >> > bunch of partitions.  They're also going to be surprised if things
>> >> >> > suddenly blow up once a partition is added, or if that partition doesn't
>> >> >> > start being consumed (we already have that second issue today).
>> >> >> >
>> >> >> > That's why separating the behavior of auto offset reset seems like the
>> >> >> > best idea I've heard so far.
>> >> >> >
>> >> >> > Consumer rebalance listeners are still probably going to be necessary
>> >> >> > for people who are storing offsets externally.
>> >> >> >
>> >> >> > On Thu, Mar 10, 2016 at 9:27 AM, Mansi Shah <mansis...@maprtech.com> wrote:
>> >> >> > > Guozhang,
>> >> >> > >
>> >> >> > > Sorry for joining the party a little late. I have been thinking about
>> >> >> > > this whole awkward behavior of having to call poll(0) to actually make
>> >> >> > > the underlying subscriptions take effect. Is the core reason for this
>> >> >> > > design the fact that poll is also the actual heartbeat, and you want to
>> >> >> > > make the consumer group assignments through poll so that timeouts and
>> >> >> > > reassignments can all go through poll? So I think coupling liveness
>> >> >> > > with poll (which in effect couples consumer group assignments, and
>> >> >> > > hence metadata fetches, with poll) is the real cause of this design.
>> >> >> > > Were there issues where you were seeing active consumers not calling
>> >> >> > > poll that led to this design choice? I tried to look for a relevant
>> >> >> > > JIRA but could not find one; can you please point me to something if
>> >> >> > > you have it handy?
>> >> >> > >
>> >> >> > > Btw, this would also mean that your proposal to do the actual
>> >> >> > > assignments through seek might not be ideal, since there can still be
>> >> >> > > an indefinite time between seek and poll (just like between subscribe
>> >> >> > > and poll), and the consumer could time out even before the first poll
>> >> >> > > is called?
>> >> >> > >
>> >> >> > > @Cody, in your case, if you really have only one consumer and it is
>> >> >> > > going to get all the partitions of the topic anyway, then you might as
>> >> >> > > well use the "assign" call instead of the "subscribe" call. That will
>> >> >> > > at least make your code cleaner, and I do not think you are gaining
>> >> >> > > anything from the consumer group functionality anyway?
>> >> >> > >
>> >> >> > > - Mansi.
>> >> >> > >
>> >> >> > >
>> >> >> > >
>> >> >> > > On Wed, Mar 9, 2016 at 8:35 PM, Guozhang Wang <wangg...@gmail.com> wrote:
>> >> >> > >
>> >> >> > >> In order to do anything meaningful with the consumer itself in the
>> >> >> > >> rebalance callback (e.g. commit offsets), you would need to hold on to
>> >> >> > >> the consumer reference; admittedly it sounds a bit awkward, but by
>> >> >> > >> design we chose not to enforce it in the interface itself.
>> >> >> > >>
>> >> >> > >> Guozhang
>> >> >> > >>
>> >> >> > >> On Wed, Mar 9, 2016 at 3:39 PM, Cody Koeninger <c...@koeninger.org> wrote:
>> >> >> > >>
>> >> >> > >> > So what about my comments regarding the consumer rebalance listener
>> >> >> > >> > interface not providing access to a consumer?  I can probably work
>> >> >> > >> > around it, but it seems odd.
>> >> >> > >> > On Mar 9, 2016 5:04 PM, "Guozhang Wang" <wangg...@gmail.com> wrote:
>> >> >> > >> >
>> >> >> > >> > > One thing proposed by Jason:
>> >> >> > >> > >
>> >> >> > >> > > If you want to reset the offset only upon initialization, and by
>> >> >> > >> > > initialization you mean "no committed offset", you can do something
>> >> >> > >> > > like the following in the rebalance callback.
>> >> >> > >> > >
>> >> >> > >> > >     @Override
>> >> >> > >> > >     public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
>> >> >> > >> > >         // `consumer` is a reference the listener must hold itself.
>> >> >> > >> > >         for (TopicPartition partition : partitions) {
>> >> >> > >> > >             // No committed offset: treat the partition as new and rewind it.
>> >> >> > >> > >             if (consumer.committed(partition) == null) {
>> >> >> > >> > >                 consumer.seekToBeginning(partition);
>> >> >> > >> > >             }
>> >> >> > >> > >         }
>> >> >> > >> > >     }
>> >> >> > >> > >
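The decision made by the callback above can be isolated as a pure function. This also shows why it differs from rewinding on every rebalance: once offsets have been committed, a later reassignment resumes from them. The sketch makes simplifying assumptions. Partitions are plain strings rather than `TopicPartition`. The `beginning` map, a hypothetical input, stands in for the log-start offsets that `seekToBeginning` would resolve.

```java
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
import java.util.TreeMap;

public class SeekIfUncommittedSketch {

    // Mirrors the quoted callback: a partition with no committed offset is
    // treated as "initializing" and starts at its beginning offset; any other
    // partition resumes from its committed offset.
    static Map<String, Long> startPositions(Collection<String> assigned,
                                            Map<String, Long> committed,
                                            Map<String, Long> beginning) {
        Map<String, Long> start = new TreeMap<>();
        for (String partition : assigned) {
            Long offset = committed.get(partition);
            start.put(partition, offset != null ? offset : beginning.get(partition));
        }
        return start;
    }

    public static void main(String[] args) {
        Map<String, Long> beginning = new HashMap<>();
        beginning.put("testtwo-0", 0L);
        beginning.put("testtwo-1", 0L);

        // First assignment: nothing committed yet, so both partitions rewind.
        System.out.println(startPositions(Arrays.asList("testtwo-0", "testtwo-1"),
                new HashMap<>(), beginning));

        // A later rebalance after commits: no rewind, consumption resumes.
        Map<String, Long> committed = new HashMap<>();
        committed.put("testtwo-0", 42L);
        committed.put("testtwo-1", 7L);
        System.out.println(startPositions(Arrays.asList("testtwo-0", "testtwo-1"),
                committed, beginning));
    }
}
```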
>> >> >> > >> > >
>> >> >> > >> > > Guozhang
>> >> >> > >> > >
>> >> >> > >> > > On Wed, Mar 9, 2016 at 2:11 PM, Guozhang Wang <wangg...@gmail.com> wrote:
>> >> >> > >> > >
>> >> >> > >> > > > Filed https://issues.apache.org/jira/browse/KAFKA-3370.
>> >> >> > >> > > >
>> >> >> > >> > > > On Wed, Mar 9, 2016 at 1:11 PM, Cody Koeninger <c...@koeninger.org> wrote:
>> >> >> > >> > > >
>> >> >> > >> > > >> That sounds like an interesting way of addressing the problem; we can
>> >> >> > >> > > >> continue further discussion on the JIRA.
>> >> >> > >> > > >>
>> >> >> > >> > > >> On Wed, Mar 9, 2016 at 2:59 PM, Guozhang Wang <wangg...@gmail.com> wrote:
>> >> >> > >> > > >> > Cody:
>> >> >> > >> > > >> >
>> >> >> > >> > > >> > More specifically, you do not need the "listTopics" function if you
>> >> >> > >> > > >> > already know your subscribed topics; just using "partitionsFor" is
>> >> >> > >> > > >> > sufficient.
>> >> >> > >> > > >> >
>> >> >> > >> > > >> > About the fix, I'm thinking of adding two more options to
>> >> >> > >> > > >> > auto.offset.reset, namely "earliest-on-start" and "latest-on-start",
>> >> >> > >> > > >> > which set the reset position ONLY at startup. The reason is that
>> >> >> > >> > > >> > seekToXX was actually not designed for such initialization but for
>> >> >> > >> > > >> > calling during the lifetime of the consumer, and we'd better provide
>> >> >> > >> > > >> > the right solution for that.
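The semantics of the proposed options can be written as a small decision table in Java. This only models the proposal in the message above. `EARLIEST_ON_START` and `LATEST_ON_START` correspond to the suggested "earliest-on-start" and "latest-on-start" values and are not assumed to exist in any Kafka release. The `startingUp` flag, also hypothetical, stands for "this partition has no committed offset yet".

```java
public class OnStartResetSketch {

    // The existing auto.offset.reset values plus the two proposed ones.
    // EARLIEST_ON_START and LATEST_ON_START are hypothetical.
    enum ResetPolicy { EARLIEST, LATEST, NONE, EARLIEST_ON_START, LATEST_ON_START }

    enum Action { SEEK_EARLIEST, SEEK_LATEST, THROW }

    // startingUp == true: the partition has no committed offset (first start).
    // startingUp == false: an existing position turned out to be out of range.
    static Action resolve(ResetPolicy policy, boolean startingUp) {
        switch (policy) {
            case EARLIEST:
                return Action.SEEK_EARLIEST;
            case LATEST:
                return Action.SEEK_LATEST;
            case EARLIEST_ON_START:
                return startingUp ? Action.SEEK_EARLIEST : Action.THROW;
            case LATEST_ON_START:
                return startingUp ? Action.SEEK_LATEST : Action.THROW;
            case NONE:
            default:
                return Action.THROW;
        }
    }

    public static void main(String[] args) {
        for (ResetPolicy policy : ResetPolicy.values()) {
            System.out.println(policy + ": start=" + resolve(policy, true)
                    + ", out-of-range=" + resolve(policy, false));
        }
    }
}
```

Under this model, `EARLIEST_ON_START` gives the behavior requested in the original report: a rewind when the consumer first starts, and a hard error if an offset goes out of range later.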
>> >> >> > >> > > >> >
>> >> >> > >> > > >> > I can file the JIRA right away and start further discussion there. But
>> >> >> > >> > > >> > let me know if you have any other ideas.
>> >> >> > >> > > >> >
>> >> >> > >> > > >> > Guozhang
>> >> >> > >> > > >> >
>> >> >> > >> > > >> > On Wed, Mar 9, 2016 at 12:25 PM, Cody Koeninger <c...@koeninger.org> wrote:
>> >> >> > >> > > >> >
>> >> >> > >> > > >> >> Yeah, I think I understood what you were saying.  What I'm saying is
>> >> >> > >> > > >> >> that if there were a way to just fetch metadata without doing the rest
>> >> >> > >> > > >> >> of the work poll() does, it wouldn't be necessary.  I guess I can do
>> >> >> > >> > > >> >> listTopics to get all metadata for all topics and then parse it.
>> >> >> > >> > > >> >>
>> >> >> > >> > > >> >> Regarding running a single instance, that is the case for what I'm
>> >> >> > >> > > >> >> talking about.
>> >> >> > >> > > >> >>
>> >> >> > >> > > >> >> On Wed, Mar 9, 2016 at 2:02 PM, Guozhang Wang <wangg...@gmail.com> wrote:
>> >> >> > >> > > >> >> > Cody,
>> >> >> > >> > > >> >> >
>> >> >> > >> > > >> >> > What I meant by a special case of `seekToXX` is that today, when the
>> >> >> > >> > > >> >> > function is called with no partition parameters, it will try to
>> >> >> > >> > > >> >> > execute the logic on all "assigned" partitions for the consumer. And
>> >> >> > >> > > >> >> > once that is done, the subsequent poll() will not throw the exception,
>> >> >> > >> > > >> >> > since it knows those partitions need to reset offsets.
>> >> >> > >> > > >> >> >
>> >> >> > >> > > >> >> > However, in your case there are no assigned partitions yet, and hence
>> >> >> > >> > > >> >> > `seekToXX` will not take effect on any partitions. The assignment is
>> >> >> > >> > > >> >> > wrapped in the poll() call, as you mentioned. One way to solve it is
>> >> >> > >> > > >> >> > to let `seekToXX()` with no parameters do the coordination and get the
>> >> >> > >> > > >> >> > assigned partitions if there are any subscribed topics, so that the
>> >> >> > >> > > >> >> > subsequent poll() will know those partitions need their offsets
>> >> >> > >> > > >> >> > reset. Does that make sense?
>> >> >> > >> > > >> >> >
>> >> >> > >> > > >> >> > For now, another way I can think of is to get the partition info
>> >> >> > >> > > >> >> > beforehand and call `seekToBeginning` on all partitions. But that
>> >> >> > >> > > >> >> > only works if the consumer knows it is going to get all the
>> >> >> > >> > > >> >> > partitions assigned to itself (i.e. you are only running a single
>> >> >> > >> > > >> >> > instance).
>> >> >> > >> > > >> >> >
>> >> >> > >> > > >> >> > Guozhang
>> >> >> > >> > > >> >> >
>> >> >> > >> > > >> >> >
>> >> >> > >> > > >> >> > On Wed, Mar 9, 2016 at 6:22 AM, Cody Koeninger <c...@koeninger.org> wrote:
>> >> >> > >> > > >> >> >
>> >> >> > >> > > >> >> >> Another unfortunate thing about ConsumerRebalanceListener is that in
>> >> >> > >> > > >> >> >> order to do meaningful work in the callback, you need a reference to
>> >> >> > >> > > >> >> >> the consumer that called it.  But that reference isn't provided to the
>> >> >> > >> > > >> >> >> callback, which means the listener implementation needs to hold a
>> >> >> > >> > > >> >> >> reference to the consumer.  That seems to make it unnecessarily
>> >> >> > >> > > >> >> >> awkward to serialize the listener or to provide a 0-arg constructor
>> >> >> > >> > > >> >> >> for it.
>> >> >> > >> > > >> >> >>
>> >> >> > >> > > >> >> >> On Wed, Mar 9, 2016 at 7:28 AM, Cody Koeninger <c...@koeninger.org> wrote:
>> >> >> > >> > > >> >> >> > I thought about ConsumerRebalanceListener, but seeking to the
>> >> >> > >> > > >> >> >> > beginning any time there's a rebalance, for whatever reason, is not
>> >> >> > >> > > >> >> >> > necessarily the same thing as seeking to the beginning before first
>> >> >> > >> > > >> >> >> > starting the consumer.
>> >> >> > >> > > >> >> >> >
>> >> >> > >> > > >> >> >> > On Wed, Mar 9, 2016 at 2:24 AM, Kamal C <kamaltar...@gmail.com> wrote:
>> >> >> > >> > > >> >> >> >> Cody,
>> >> >> > >> > > >> >> >> >>
>> >> >> > >> > > >> >> >> >> Use ConsumerRebalanceListener to achieve that:
>> >> >> > >> > > >> >> >> >>
>> >> >> > >> > > >> >> >> >> ConsumerRebalanceListener listener = new ConsumerRebalanceListener() {
>> >> >> > >> > > >> >> >> >>
>> >> >> > >> > > >> >> >> >>     @Override
>> >> >> > >> > > >> >> >> >>     public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
>> >> >> > >> > > >> >> >> >>         // Nothing to do when partitions are taken away.
>> >> >> > >> > > >> >> >> >>     }
>> >> >> > >> > > >> >> >> >>
>> >> >> > >> > > >> >> >> >>     @Override
>> >> >> > >> > > >> >> >> >>     public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
>> >> >> > >> > > >> >> >> >>         // Rewind every partition in the new assignment (on every rebalance);
>> >> >> > >> > > >> >> >> >>         // in 0.9, seekToBeginning takes TopicPartition varargs.
>> >> >> > >> > > >> >> >> >>         consumer.seekToBeginning(partitions.toArray(new TopicPartition[0]));
>> >> >> > >> > > >> >> >> >>     }
>> >> >> > >> > > >> >> >> >> };
>> >> >> > >> > > >> >> >> >>
>> >> >> > >> > > >> >> >> >> consumer.subscribe(topics, listener);
>> >> >> > >> > > >> >> >> >>
>> >> >> > >> > > >> >> >> >> On Wed, Mar 9, 2016 at 12:05 PM, Cody Koeninger <c...@koeninger.org> wrote:
>> >> >> > >> > > >> >> >> >>
>> >> >> > >> > > >> >> >> >>> That suggestion doesn't work, for pretty much the same reason: at the
>> >> >> > >> > > >> >> >> >>> time poll is first called, there is no reset policy and no committed
>> >> >> > >> > > >> >> >> >>> offset, so NoOffsetForPartitionException is thrown.
>> >> >> > >> > > >> >> >> >>>
>> >> >> > >> > > >> >> >> >>> I feel like the underlying problem isn't so much that seekToEnd needs
>> >> >> > >> > > >> >> >> >>> special-case behavior.  It's more that topic metadata fetches,
>> >> >> > >> > > >> >> >> >>> consumer position fetches, and message fetches are all lumped together
>> >> >> > >> > > >> >> >> >>> under a single poll() call, with no way to do them individually if
>> >> >> > >> > > >> >> >> >>> necessary.
>> >> >> > >> > > >> >> >> >>>
>> >> >> > >> > > >> >> >> >>> What does "work" in this situation is to just catch the exception
>> >> >> > >> > > >> >> >> >>> (which leaves the consumer in a state where partitions are assigned)
>> >> >> > >> > > >> >> >> >>> and then seek.  But that is not exactly an elegant interface.
>> >> >> > >> > > >> >> >> >>>
>> >> >> > >> > > >> >> >> >>>     import org.apache.kafka.clients.consumer.NoOffsetForPartitionException
>> >> >> > >> > > >> >> >> >>>
>> >> >> > >> > > >> >> >> >>>     consumer.subscribe(topics)
>> >> >> > >> > > >> >> >> >>>     try {
>> >> >> > >> > > >> >> >> >>>       consumer.poll(0) // assigns partitions, then fails: no offset, no reset policy
>> >> >> > >> > > >> >> >> >>>     } catch {
>> >> >> > >> > > >> >> >> >>>       case _: NoOffsetForPartitionException => // expected; partitions are now assigned
>> >> >> > >> > > >> >> >> >>>     }
>> >> >> > >> > > >> >> >> >>>     consumer.seekToBeginning() // no-arg form: applies to the assigned partitions
>> >> >> > >> > > >> >> >> >>>     consumer.poll(0)
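The ordering problem discussed in this exchange can be reproduced with a toy model in Java. `ToyConsumer` is hypothetical and is not `KafkaConsumer`. The model rests on four assumptions:

- `auto.offset.reset=none`
- no committed offsets
- a beginning offset of 0
- assignment happens entirely inside the first poll

Under those assumptions, seeking before the first poll has no effect, while catching the first failure and then seeking succeeds.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.TreeSet;

public class LazySubscribeSketch {

    // Loosely modeled on NoOffsetForPartitionException; not the Kafka class.
    static final class NoOffsetException extends RuntimeException {
        NoOffsetException(String partition) {
            super("Undefined offset with no reset policy for partition: " + partition);
        }
    }

    // Toy consumer, NOT KafkaConsumer: reset policy "none", no committed offsets.
    static final class ToyConsumer {
        private final List<String> partitionsOfTopic;
        private final Set<String> assigned = new TreeSet<>();
        private final Set<String> pendingReset = new HashSet<>();
        private final Map<String, Long> positions = new TreeMap<>();
        private boolean subscribed;

        ToyConsumer(List<String> partitionsOfTopic) {
            this.partitionsOfTopic = partitionsOfTopic;
        }

        // Lazy: records the subscription but assigns nothing.
        void subscribe() {
            subscribed = true;
        }

        // Like the no-arg seekToBeginning(): only touches partitions assigned
        // right now, so before the first poll it does nothing.
        void seekToBeginning() {
            pendingReset.addAll(assigned);
        }

        // Assignment happens inside poll; afterwards every assigned partition
        // needs a position, which can only come from a pending reset here.
        void poll() {
            if (subscribed && assigned.isEmpty()) {
                assigned.addAll(partitionsOfTopic);
            }
            for (String partition : assigned) {
                if (positions.containsKey(partition)) {
                    continue;
                }
                if (!pendingReset.remove(partition)) {
                    throw new NoOffsetException(partition);
                }
                positions.put(partition, 0L); // toy "beginning" offset
            }
        }

        Map<String, Long> positions() {
            return new TreeMap<>(positions);
        }
    }

    static final List<String> PARTITIONS = Arrays.asList("testtwo-0", "testtwo-1");

    // The order from the original report: subscribe, seek, poll.
    static boolean seekBeforeFirstPollFails() {
        ToyConsumer consumer = new ToyConsumer(PARTITIONS);
        consumer.subscribe();
        consumer.seekToBeginning();
        try {
            consumer.poll();
            return false;
        } catch (NoOffsetException expected) {
            return true;
        }
    }

    // The workaround: poll, catch the expected failure, seek, poll again.
    static Map<String, Long> catchThenSeek() {
        ToyConsumer consumer = new ToyConsumer(PARTITIONS);
        consumer.subscribe();
        try {
            consumer.poll();
        } catch (NoOffsetException expected) {
            // Partitions stay assigned even though poll failed.
        }
        consumer.seekToBeginning();
        consumer.poll();
        return consumer.positions();
    }

    public static void main(String[] args) {
        System.out.println("seek before first poll fails: " + seekBeforeFirstPollFails());
        System.out.println("positions after workaround: " + catchThenSeek());
    }
}
```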
>> >> >> > >> > > >> >> >> >>>
>> >> >> > >> > > >> >> >> >>>
>> >> >> > >> > > >> >> >> >>>
>> >> >> > >> > > >> >> >> >>>
>> >> >> > >> > > >> >> >> >>> On Tue, Mar 8, 2016 at 11:22 PM, Guozhang Wang <wangg...@gmail.com> wrote:
>> >> >> > >> > > >> >> >> >>> > Hi Cody,
>> >> >> > >> > > >> >> >> >>> >
>> >> >> > >> > > >> >> >> >>> > The problem with that code is calling `seekToBeginning()` right after
>> >> >> > >> > > >> >> >> >>> > `subscribe(topics)`.
>> >> >> > >> > > >> >> >> >>> >
>> >> >> > >> > > >> >> >> >>> > Since the `subscribe` call is lazily evaluated, by the time
>> >> >> > >> > > >> >> >> >>> > `seekToBeginning()` is called no partition is assigned yet, and hence
>> >> >> > >> > > >> >> >> >>> > it is effectively a no-op.
>> >> >> > >> > > >> >> >> >>> >
>> >> >> > >> > > >> >> >> >>> > Try
>> >> >> > >> > > >> >> >> >>> >
>> >> >> > >> > > >> >> >> >>> >     consumer.subscribe(topics)
>> >> >> > >> > > >> >> >> >>> >     consumer.poll(0) // get assigned partitions
>> >> >> > >> > > >> >> >> >>> >     consumer.seekToBeginning()
>> >> >> > >> > > >> >> >> >>> >     consumer.poll(0)
>> >> >> > >> > > >> >> >> >>> >
>> >> >> > >> > > >> >> >> >>> > to see if that works.
>> >> >> > >> > > >> >> >> >>> >
>> >> >> > >> > > >> >> >> >>> > I think it is a valid issue that can be fixed in the new consumer:
>> >> >> > >> > > >> >> >> >>> > upon calling seekToEnd/Beginning with no parameter while no
>> >> >> > >> > > >> >> >> >>> > assignment has been done yet, it could do the coordination behind the
>> >> >> > >> > > >> >> >> >>> > scenes; this would, though, change the behavior of those functions,
>> >> >> > >> > > >> >> >> >>> > as they would no longer always be lazily evaluated.
>> >> >> > >> > > >> >> >> >>> >
>> >> >> > >> > > >> >> >> >>> >
>> >> >> > >> > > >> >> >> >>> > Guozhang
>> >> >> > >> > > >> >> >> >>> >
>> >> >> > >> > > >> >> >> >>> >
>> >> >> > >> > > >> >> >> >>> > On Tue, Mar 8, 2016 at 2:08 PM, Cody Koeninger <c...@koeninger.org> wrote:
>> >> >> > >> > > >> >> >> >>> >
>> >> >> > >> > > >> >> >> >>> >> Using the 0.9 consumer, I would like to start consuming at the
>> >> >> > >> > > >> >> >> >>> >> beginning or end, without specifying auto.offset.reset.
>> >> >> > >> > > >> >> >> >>> >>
>> >> >> > >> > > >> >> >> >>> >> This does not seem to be possible:
>> >> >> > >> > > >> >> >> >>> >>
>> >> >> > >> > > >> >> >> >>> >>     import scala.collection.JavaConverters._
>> >> >> > >> > > >> >> >> >>> >>     import org.apache.kafka.clients.consumer.KafkaConsumer
>> >> >> > >> > > >> >> >> >>> >>     import org.apache.kafka.common.serialization.StringDeserializer
>> >> >> > >> > > >> >> >> >>> >>
>> >> >> > >> > > >> >> >> >>> >>     // `conf` is the application's own configuration object (not shown).
>> >> >> > >> > > >> >> >> >>> >>     val kafkaParams = Map[String, Object](
>> >> >> > >> > > >> >> >> >>> >>       "bootstrap.servers" -> conf.getString("kafka.brokers"),
>> >> >> > >> > > >> >> >> >>> >>       "key.deserializer" -> classOf[StringDeserializer],
>> >> >> > >> > > >> >> >> >>> >>       "value.deserializer" -> classOf[StringDeserializer],
>> >> >> > >> > > >> >> >> >>> >>       "group.id" -> "example",
>> >> >> > >> > > >> >> >> >>> >>       "auto.offset.reset" -> "none" // a missing offset should be an error
>> >> >> > >> > > >> >> >> >>> >>     ).asJava
>> >> >> > >> > > >> >> >> >>> >>     val topics = conf.getString("kafka.topics").split(",").toList.asJava
>> >> >> > >> > > >> >> >> >>> >>     val consumer = new KafkaConsumer[String, String](kafkaParams)
>> >> >> > >> > > >> >> >> >>> >>     consumer.subscribe(topics)
>> >> >> > >> > > >> >> >> >>> >>     consumer.seekToBeginning()
>> >> >> > >> > > >> >> >> >>> >>     consumer.poll(0)
>> >> >> > >> > > >> >> >> >>> >>
>> >> >> > >> > > >> >> >> >>> >>
>> >> >> > >> > > >> >> >> >>> >> Results in:
>> >> >> > >> > > >> >> >> >>> >>
>> >> >> > >> > > >> >> >> >>> >> Exception in thread "main" org.apache.kafka.clients.consumer.NoOffsetForPartitionException: Undefined offset with no reset policy for partition: testtwo-4
>> >> >> > >> > > >> >> >> >>> >>         at org.apache.kafka.clients.consumer.internals.Fetcher.resetOffset(Fetcher.java:288)
>> >> >> > >> > > >> >> >> >>> >>         at org.apache.kafka.clients.consumer.internals.Fetcher.updateFetchPositions(Fetcher.java:167)
>> >> >> > >> > > >> >> >> >>> >>         at org.apache.kafka.clients.consumer.KafkaConsumer.updateFetchPositions(KafkaConsumer.java:1302)
>> >> >> > >> > > >> >> >> >>> >>         at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:895)
>> >> >> > >> > > >> >> >> >>> >>         at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:853)
>> >> >> > >> > > >> >> >> >>> >>         at example.BasicKafkaConsumer$.main(BasicKafkaConsumer.scala:25)
>> >> >> > >> > > >> >> >> >>> >>
>> >> >> > >> > > >> >> >> >>> >>
>> >> >> > >> > > >> >> >> >>> >> I'm assuming this is because, at the time seekToBeginning() is called,
>> >> >> > >> > > >> >> >> >>> >> subscriptions.assignedPartitions isn't populated.  But polling in
>> >> >> > >> > > >> >> >> >>> >> order to assign topic partitions results in an error, which creates a
>> >> >> > >> > > >> >> >> >>> >> chicken-or-the-egg situation.
>> >> >> > >> > > >> >> >> >>> >>
>> >> >> > >> > > >> >> >> >>> >> I don't want to set auto.offset.reset, because I want a hard error if
>> >> >> > >> > > >> >> >> >>> >> the offsets are out of range at any other time during consumption.
>> >> >> > >> > > >> >> >> >>> >>
>> >> >> > >> > > >> >> >> >>> >
>> >> >> > >> > > >> >> >> >>> >
>> >> >> > >> > > >> >> >> >>> >
>> >> >> > >> > > >> >> >> >>> > --
>> >> >> > >> > > >> >> >> >>> > -- Guozhang
>> >> >> > >> > > >> >> >> >>>
>> >> >> > >> > > >> >> >>
>> >> >> > >> > > >> >> >
>> >> >> > >> > > >> >> >
>> >> >> > >> > > >> >> >
>> >> >> > >> > > >> >> > --
>> >> >> > >> > > >> >> > -- Guozhang
>> >> >> > >> > > >> >>
>> >> >> > >> > > >> >
>> >> >> > >> > > >> >
>> >> >> > >> > > >> >
>> >> >> > >> > > >> > --
>> >> >> > >> > > >> > -- Guozhang
>> >> >> > >> > > >>
>> >> >> > >> > > >
>> >> >> > >> > > >
>> >> >> > >> > > >
>> >> >> > >> > > > --
>> >> >> > >> > > > -- Guozhang
>> >> >> > >> > > >
>> >> >> > >> > >
>> >> >> > >> > >
>> >> >> > >> > >
>> >> >> > >> > > --
>> >> >> > >> > > -- Guozhang
>> >> >> > >> > >
>> >> >> > >> >
>> >> >> > >>
>> >> >> > >>
>> >> >> > >>
>> >> >> > >> --
>> >> >> > >> -- Guozhang
>> >> >> > >>
>> >> >> >
>> >> >>
>> >> >
>> >> >
>> >> >
>> >> > --
>> >> > -- Guozhang
>> >>
>> >
>> >
>> >
>> > --
>> > -- Guozhang
>>

Reply via email to