The offset API is definitely a gap at the moment. I think there were some
problems with the old consumer's API and we wanted to make sure we didn't
make the same mistakes. Unfortunately, I'm not sure anyone has had the time
to give this the attention it needs. Here are a couple of JIRAs if you want to
have a look:

https://issues.apache.org/jira/browse/KAFKA-2076
https://issues.apache.org/jira/browse/KAFKA-2500

For now, the workaround above should work. However, you should be able to
skip the call to poll() between the seek and the call to position(), since
position() will block until it has fetched the new offset.
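
Here is a minimal Java sketch of that suggestion. It assumes the partitions are already assigned, either via assign() or by a completed poll() after subscribe(). It also uses the Collection-based seekToEnd() signature from 0.10.0 on, rather than the 0.9 varargs form used elsewhere in this thread. The class and method names are hypothetical, not part of Kafka. Note that the sketch moves the consumer's position, so code that wants to keep consuming from where it was has to seek back afterwards.

```java
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.common.TopicPartition;

// Hypothetical helper, not a Kafka API.
final class LogEndOffsets {
    private LogEndOffsets() {}

    // Seeks each (already assigned) partition to its end and reads the position back.
    // position() blocks until the reset offset has been looked up, so no poll() is
    // needed between the seek and the read. This mutates the consumer's position.
    static Map<TopicPartition, Long> logEndOffsets(Consumer<?, ?> consumer,
                                                   Collection<TopicPartition> partitions) {
        consumer.seekToEnd(partitions);
        Map<TopicPartition, Long> ends = new HashMap<>();
        for (TopicPartition tp : partitions) {
            ends.put(tp, consumer.position(tp));
        }
        return ends;
    }
}
```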

-Jason

On Mon, Mar 14, 2016 at 2:37 PM, Cody Koeninger <c...@koeninger.org> wrote:

> Sorry, by metadata I also meant the equivalent of the old
> OffsetRequest api, which partitionsFor doesn't give you.  I understand
> why you didn't want to expose the broken "offsets before a certain
> time" api, but I don't understand why equivalent functionality for
> first or last offset isn't available in the new consumer.
>
> For instance, I need to know the log end offset, preferably without
> consuming messages.
>
> So it sounds like you're suggesting something like this:
>
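> import scala.collection.JavaConverters._
>
> val kc = new KafkaConsumer(...)
> kc.subscribe(somePattern)
>
> while (...) {
>    // pause all assigned partitions so poll() doesn't return messages
>    kc.assignment.asScala.foreach(tp => kc.pause(tp))
>    kc.seekToEnd()
>    kc.poll(0)
>    val logEnd = kc.assignment.asScala.map(tp => tp -> kc.position(tp)).toMap
>    ... do something with logEnd
> }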
>
> Seems like that will work.
>
> Once timestamp indexing is implemented and an actual getOffsetsBefore
> api could work, are people still going to have to mutate consumer
> position just to get information?  Is this a case of just wanting to
> hide as much as possible, even if that means making some things harder
> than they were with the old simple consumer?
>
> On Mon, Mar 14, 2016 at 2:35 PM, Jason Gustafson <ja...@confluent.io>
> wrote:
> > Ah, that makes more sense. I have no idea about the limitations of your
> > use case, but maybe you could expose a different interface to users.
> >
> > interface RebalanceListener<K, V> {
> >   void onPartitionsAssigned(Consumer<K, V> consumer,
> >                             Collection<TopicPartition> partitions);
> >   void onPartitionsRevoked(Consumer<K, V> consumer,
> >                            Collection<TopicPartition> partitions);
> > }
> >
> > Then you can adapt it internally in the call to subscribe(). Would that
> > work?
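
Here is a minimal sketch of that adapter idea. It assumes the consumer-aware RebalanceListener interface above, which is a hypothetical interface and not part of Kafka, and it implements Kafka's real ConsumerRebalanceListener. The adapter captures the consumer and passes it to every callback. As a result, user listener implementations never have to hold a consumer reference themselves and can keep a 0-arg constructor.

```java
import java.util.Collection;

import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.common.TopicPartition;

// Hypothetical consumer-aware listener from the email above; not a Kafka interface.
interface RebalanceListener<K, V> {
    void onPartitionsAssigned(Consumer<K, V> consumer, Collection<TopicPartition> partitions);
    void onPartitionsRevoked(Consumer<K, V> consumer, Collection<TopicPartition> partitions);
}

// Adapts a RebalanceListener to Kafka's ConsumerRebalanceListener by capturing
// the consumer it will be registered with and passing it to every callback.
final class RebalanceListenerAdapter<K, V> implements ConsumerRebalanceListener {
    private final Consumer<K, V> consumer;
    private final RebalanceListener<K, V> delegate;

    RebalanceListenerAdapter(Consumer<K, V> consumer, RebalanceListener<K, V> delegate) {
        this.consumer = consumer;
        this.delegate = delegate;
    }

    @Override
    public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
        delegate.onPartitionsAssigned(consumer, partitions);
    }

    @Override
    public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
        delegate.onPartitionsRevoked(consumer, partitions);
    }
}
```

The wrapping library's own subscribe() would then call something like `consumer.subscribe(topics, new RebalanceListenerAdapter<>(consumer, userListener))`.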
> >
> > Also, we already have partitionsFor() to fetch topic metadata. And you can
> > look at the pause/resume API, which lets you call poll() without consuming
> > messages. We'd probably want to understand why those are insufficient
> > before considering new APIs.
> >
> > -Jason
> >
> > On Mon, Mar 14, 2016 at 12:17 PM, Cody Koeninger <c...@koeninger.org>
> wrote:
> >
> >> Regarding the rebalance listener, in the case of the spark
> >> integration, it is possible a job can fail and be restarted from
> >> checkpoint in a new jvm.  That means that you need to be able to
> >> reconstruct objects.  Any reasonable rebalance listener can't have a
> >> 0-arg constructor, because it needs a reference to the consumer in
> >> order to do anything useful.  I can get around this by making the user
> >> provide a 0-arg function to return a fully configured + subscribed
> >> Kafka consumer, so that they get to do the dance of constructing a
> >> consumer before constructing the listener.  I don't think the current
> >> rebalance listener is a dealbreaker, I just think it is an
> >> unnecessarily awkward API.
> >>
> >> Regarding poll(), I honestly don't see what the problem is with
> >> keeping poll() behavior as is, but also having public methods to e.g.
> >> fetch metadata, for people that need that functionality without
> >> actually consuming messages.
> >>
> >> On Mon, Mar 14, 2016 at 1:01 PM, Jason Gustafson <ja...@confluent.io>
> >> wrote:
> >> > Late arrival to this discussion. I'm not really sure I see the problem
> >> > with accessing the consumer in the rebalance listener. Before, we passed
> >> > the consumer instance as a separate argument, but that was only because
> >> > the rebalance listener had to be passed by classname before a reference
> >> > to the consumer was available. After we changed the API to pass it in
> >> > subscribe() instead, getting a reference to the consumer shouldn't be a
> >> > problem. Maybe I'm missing something?
> >> >
> >> > As for poll() doing everything... It's definitely caused some confusion,
> >> > but I'm a little doubtful that trying to split out the functionality is
> >> > going to help as much as people think. A heartbeat() API has come up
> >> > several times, for example, but you have to figure out how it's going to
> >> > affect rebalancing. And rebalancing affects which partitions need to be
> >> > fetched or can be committed. They're all so intertwined that I'm not sure
> >> > you can divide them up without creating a bigger problem than the one
> >> > you're trying to solve. At this point, with 0.10 shortly on the way, it
> >> > seems unlikely that incompatible changes to the API will be accepted.
> >> > However, if someone can propose a compatible solution that addresses some
> >> > of the concerns mentioned, we'd love to hear about it!
> >> >
> >> > -Jason
> >> >
> >> > On Mon, Mar 14, 2016 at 9:21 AM, Cody Koeninger <c...@koeninger.org>
> >> > wrote:
> >> >
> >> >> Honestly the fact that everything is hidden inside poll() has been
> >> >> confusing people since last year, e.g.
> >> >>
> >> >> https://issues.apache.org/jira/browse/KAFKA-2359
> >> >>
> >> >> I can try to formulate a KIP for this, but it seems clear that I'm not
> >> >> the only one giving this feedback, and I may not understand all the
> >> >> other use cases that have been brought up.
> >> >>
> >> >> On Sun, Mar 13, 2016 at 3:09 PM, Guozhang Wang <wangg...@gmail.com>
> >> >> wrote:
> >> >> > Cody,
> >> >> >
> >> >> > We do not have an umbrella JIRA for this, but rather a case-by-case
> >> >> > JIRA ticket / KIP for API changes in the consumer.
> >> >> >
> >> >> > If you feel strongly about some specific change to the consumer API,
> >> >> > please feel free to create a new KIP with the detailed motivation and
> >> >> > proposed modifications.
> >> >> >
> >> >> > Guozhang
> >> >> >
> >> >> > On Fri, Mar 11, 2016 at 12:28 PM, Cody Koeninger <c...@koeninger.org>
> >> >> > wrote:
> >> >> >
> >> >> >> Is there a KIP or JIRA related to "working on improving these cases
> >> >> >> with improved APIs"?
> >> >> >>
> >> >> >> I saw that there was some discussion of it in KIP-41, but that seemed
> >> >> >> to have been resolved in favor of keeping everything inside of poll().
> >> >> >>
> >> >> >> On Fri, Mar 11, 2016 at 11:17 AM, Guozhang Wang <wangg...@gmail.com>
> >> >> >> wrote:
> >> >> >> > Cody, Mansi:
> >> >> >> >
> >> >> >> > All good points! Let me try to answer them one by one.
> >> >> >> >
> >> >> >> > About this specific issue, as I suggested in the JIRA, we can
> >> >> >> > separate the case of resetting the offset upon initializing a
> >> >> >> > partition to fetch from the case of an out-of-range fetch offset,
> >> >> >> > both of which the auto.offset.reset config currently covers. These
> >> >> >> > two scenarios are indeed quite different, and it's reasonable to
> >> >> >> > treat them differently.
> >> >> >> >
> >> >> >> > About passing a consumer context to the rebalance callback's
> >> >> >> > constructor, we left it for users' flexibility: if you want to use
> >> >> >> > Kafka to commit offsets, for example, then you pass the consumer
> >> >> >> > reference to the callback; if you use an external service to store
> >> >> >> > offsets, you can pass a JDBC connection, for example, to the
> >> >> >> > callback; for some data mirroring you can even pass another producer
> >> >> >> > client into it. Always enforcing the consumer context could be
> >> >> >> > convenient for some use cases (i.e. you would not need to pass the
> >> >> >> > argument to the constructor yourself), but not necessarily for all.
> >> >> >> >
> >> >> >> > About wrapping coordination protocols (partition assignment,
> >> >> >> > heartbeat) inside poll() behind the scenes, we implemented the APIs
> >> >> >> > this way in order to abstract the underlying details from users, and
> >> >> >> > also to provide a simple "single-thread poll loop" design pattern in
> >> >> >> > the new consumer. We realized that it does make some of the use
> >> >> >> > cases more awkward, and are working on improving these cases with
> >> >> >> > improved APIs as well. Let us know if you have any suggestions about
> >> >> >> > this.
> >> >> >> >
> >> >> >> > Guozhang
> >> >> >> >
> >> >> >> > On Thu, Mar 10, 2016 at 7:53 AM, Mansi Shah <mansis...@maprtech.com>
> >> >> >> > wrote:
> >> >> >> >
> >> >> >> >> I second the need for having a consumer context passed to the
> >> >> >> >> rebalance callback. I have run into issues several times because of
> >> >> >> >> that.
> >> >> >> >>
> >> >> >> >> About subscribe vs. assign: I have not read through your Spark code
> >> >> >> >> yet (will do by EOD), so I am not sure what you mean (other than
> >> >> >> >> that I do agree new partitions should be consumed automatically). I
> >> >> >> >> guess we can continue this discussion on the Spark list then :-)
> >> >> >> >>
> >> >> >> >> Thanks
> >> >> >> >> Mansi.
> >> >> >> >>
> >> >> >> >> On Thu, Mar 10, 2016 at 7:43 AM, Cody Koeninger <c...@koeninger.org>
> >> >> >> >> wrote:
> >> >> >> >>
> >> >> >> >> > Mansi, I'd agree that the fact that everything is tied up in poll
> >> >> >> >> > seems like the source of the awkward behavior.
> >> >> >> >> >
> >> >> >> >> > Regarding assign vs subscribe, most people using the Spark
> >> >> >> >> > integration are just going to want to provide a topic name, not go
> >> >> >> >> > figure out a bunch of partitions. They're also going to be
> >> >> >> >> > surprised if things suddenly blow up once a partition is added, or
> >> >> >> >> > if that partition doesn't start being consumed (we already have
> >> >> >> >> > that second issue today).
> >> >> >> >> >
> >> >> >> >> > That's why separating the behavior of auto offset reset seems like
> >> >> >> >> > the best idea I've heard so far.
> >> >> >> >> >
> >> >> >> >> > Consumer rebalance listeners are still probably going to be
> >> >> >> >> > necessary for people who are storing offsets externally.
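
For the externally stored offsets case, here is a minimal sketch of such a listener. It assumes a hypothetical OffsetStore interface standing in for whatever external system holds the offsets, such as a JDBC-backed table. The listener has to be constructed with the consumer, which is the extra reference discussed in this thread. Saving position() on revocation is a simplification: it assumes that every record before that position has already been processed and its results stored. Starting unseen partitions from the beginning is also an assumption.

```java
import java.util.Collection;
import java.util.Collections;

import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.common.TopicPartition;

// Hypothetical external offset store (e.g. backed by a database table).
interface OffsetStore {
    Long load(TopicPartition partition);              // null if nothing stored yet
    void save(TopicPartition partition, long offset);
}

final class ExternalOffsetListener implements ConsumerRebalanceListener {
    private final Consumer<?, ?> consumer;
    private final OffsetStore store;

    ExternalOffsetListener(Consumer<?, ?> consumer, OffsetStore store) {
        this.consumer = consumer;
        this.store = store;
    }

    @Override
    public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
        // Record where this consumer stopped so the next owner can resume there.
        for (TopicPartition tp : partitions) {
            store.save(tp, consumer.position(tp));
        }
    }

    @Override
    public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
        for (TopicPartition tp : partitions) {
            Long offset = store.load(tp);
            if (offset != null) {
                consumer.seek(tp, offset); // resume from the stored offset
            } else {
                // Nothing stored yet: start from the beginning (an assumption).
                consumer.seekToBeginning(Collections.singletonList(tp));
            }
        }
    }
}
```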
> >> >> >> >> >
> >> >> >> >> > On Thu, Mar 10, 2016 at 9:27 AM, Mansi Shah <mansis...@maprtech.com>
> >> >> >> >> > wrote:
> >> >> >> >> > > Guozhang
> >> >> >> >> > >
> >> >> >> >> > > Sorry for joining the party a little late. I have been thinking
> >> >> >> >> > > about this whole awkward behavior of having to call poll(0) to
> >> >> >> >> > > actually make the underlying subscriptions take effect. Is the core
> >> >> >> >> > > reason for this design the fact that poll is also the actual
> >> >> >> >> > > heartbeat, and you want to make the consumer group assignments
> >> >> >> >> > > through poll so that timeouts and reassignments can all go through
> >> >> >> >> > > poll? So I think clubbing liveness with poll (which in effect clubs
> >> >> >> >> > > consumer group assignments, and hence the metadata fetch, with poll)
> >> >> >> >> > > is the real cause of this design. Were there issues where you were
> >> >> >> >> > > seeing active consumers not calling poll that led to this design
> >> >> >> >> > > choice? I tried to look for a relevant JIRA but could not find one;
> >> >> >> >> > > can you please point me to something if you have it handy?
> >> >> >> >> > >
> >> >> >> >> > > Btw, this would also mean that your proposal to do the actual
> >> >> >> >> > > assignments through seek might not be ideal, since there can still
> >> >> >> >> > > be an indefinite time between seek and poll (just like between
> >> >> >> >> > > subscribe and poll), and the consumer could time out even before
> >> >> >> >> > > the first poll is called?
> >> >> >> >> > >
> >> >> >> >> > > @Cody, in your case, if you really have only one consumer and it is
> >> >> >> >> > > going to get all the partitions of the topic anyway, then you might
> >> >> >> >> > > as well subscribe using the "assign" call instead of the "subscribe"
> >> >> >> >> > > call. That will at least make your code cleaner, and I do not think
> >> >> >> >> > > you are gaining anything from the consumer group functionality
> >> >> >> >> > > anyway?
> >> >> >> >> > >
> >> >> >> >> > > - Mansi.
> >> >> >> >> > >
> >> >> >> >> > >
> >> >> >> >> > >
> >> >> >> >> > > On Wed, Mar 9, 2016 at 8:35 PM, Guozhang Wang <wangg...@gmail.com>
> >> >> >> >> > > wrote:
> >> >> >> >> > >
> >> >> >> >> > >> In order to do anything meaningful with the consumer itself in the
> >> >> >> >> > >> rebalance callback (e.g. commit offsets), you would need to hold on
> >> >> >> >> > >> to the consumer reference; admittedly it sounds a bit awkward, but
> >> >> >> >> > >> by design we chose not to enforce it in the interface itself.
> >> >> >> >> > >>
> >> >> >> >> > >> Guozhang
> >> >> >> >> > >>
> >> >> >> >> > >> On Wed, Mar 9, 2016 at 3:39 PM, Cody Koeninger <c...@koeninger.org>
> >> >> >> >> > >> wrote:
> >> >> >> >> > >>
> >> >> >> >> > >> > So what about my comments regarding the consumer rebalance
> >> >> >> >> > >> > listener interface not providing access to a consumer? I can
> >> >> >> >> > >> > probably work around it, but it seems odd.
> >> >> >> >> > >> > On Mar 9, 2016 5:04 PM, "Guozhang Wang" <wangg...@gmail.com>
> >> >> >> >> > >> > wrote:
> >> >> >> >> > >> >
> >> >> >> >> > >> > > One thing proposed by Jason:
> >> >> >> >> > >> > >
> >> >> >> >> > >> > > If you want to reset the offset only upon initialization, and by
> >> >> >> >> > >> > > initialization you mean "no committed offset", you can do
> >> >> >> >> > >> > > something like the following in the rebalance callback.
> >> >> >> >> > >> > >
> >> >> >> >> > >> > >     @Override
> >> >> >> >> > >> > >     public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
> >> >> >> >> > >> > >         for (TopicPartition partition : partitions) {
> >> >> >> >> > >> > >             // committed() returns null if there was no prior commit
> >> >> >> >> > >> > >             if (consumer.committed(partition) == null)
> >> >> >> >> > >> > >                 consumer.seekToBeginning(partition);
> >> >> >> >> > >> > >         }
> >> >> >> >> > >> > >     }
> >> >> >> >> > >> > >
> >> >> >> >> > >> > > Guozhang
> >> >> >> >> > >> > >
> >> >> >> >> > >> > > On Wed, Mar 9, 2016 at 2:11 PM, Guozhang Wang <wangg...@gmail.com>
> >> >> >> >> > >> > > wrote:
> >> >> >> >> > >> > >
> >> >> >> >> > >> > > > Filed https://issues.apache.org/jira/browse/KAFKA-3370
> >> >> >> >> > >> > > >
> >> >> >> >> > >> > > > On Wed, Mar 9, 2016 at 1:11 PM, Cody Koeninger <c...@koeninger.org>
> >> >> >> >> > >> > > > wrote:
> >> >> >> >> > >> > > >
> >> >> >> >> > >> > > >> That sounds like an interesting way of addressing the problem; we
> >> >> >> >> > >> > > >> can continue further discussion on the JIRA.
> >> >> >> >> > >> > > >>
> >> >> >> >> > >> > > >>
> >> >> >> >> > >> > > >>
> >> >> >> >> > >> > > >> On Wed, Mar 9, 2016 at 2:59 PM, Guozhang Wang <wangg...@gmail.com>
> >> >> >> >> > >> > > >> wrote:
> >> >> >> >> > >> > > >> > Cody:
> >> >> >> >> > >> > > >> >
> >> >> >> >> > >> > > >> > More specifically, you do not need the "listTopics" function if
> >> >> >> >> > >> > > >> > you already know your subscribed topics; just using
> >> >> >> >> > >> > > >> > "partitionsFor" is sufficient.
> >> >> >> >> > >> > > >> >
> >> >> >> >> > >> > > >> > About the fix, I'm thinking of adding two more options to
> >> >> >> >> > >> > > >> > auto.offset.reset, namely "earliest-on-start" and
> >> >> >> >> > >> > > >> > "latest-on-start", which set the reset position ONLY at start-up.
> >> >> >> >> > >> > > >> > The reason is that seekToXX was actually not designed to do such
> >> >> >> >> > >> > > >> > initialization but to be called during the lifetime of the
> >> >> >> >> > >> > > >> > consumer, and we'd better provide the right solution for this.
> >> >> >> >> > >> > > >> >
> >> >> >> >> > >> > > >> > I can file the JIRA right away and start further discussions
> >> >> >> >> > >> > > >> > there. But let me know if you have any other ideas.
> >> >> >> >> > >> > > >> >
> >> >> >> >> > >> > > >> > Guozhang
> >> >> >> >> > >> > > >> >
> >> >> >> >> > >> > > >> > On Wed, Mar 9, 2016 at 12:25 PM, Cody Koeninger <c...@koeninger.org>
> >> >> >> >> > >> > > >> > wrote:
> >> >> >> >> > >> > > >> >
> >> >> >> >> > >> > > >> >> Yeah, I think I understood what you were saying. What I'm saying
> >> >> >> >> > >> > > >> >> is that if there were a way to just fetch metadata without doing
> >> >> >> >> > >> > > >> >> the rest of the work poll() does, it wouldn't be necessary. I
> >> >> >> >> > >> > > >> >> guess I can do listTopics to get all metadata for all topics and
> >> >> >> >> > >> > > >> >> then parse it.
> >> >> >> >> > >> > > >> >>
> >> >> >> >> > >> > > >> >> Regarding running a single instance, that is the case for what
> >> >> >> >> > >> > > >> >> I'm talking about.
> >> >> >> >> > >> > > >> >>
> >> >> >> >> > >> > > >> >> On Wed, Mar 9, 2016 at 2:02 PM, Guozhang Wang <wangg...@gmail.com>
> >> >> >> >> > >> > > >> >> wrote:
> >> >> >> >> > >> > > >> >> > Cody,
> >> >> >> >> > >> > > >> >> >
> >> >> >> >> > >> > > >> >> > What I meant by a special case of `seekToXX` is this: today, when
> >> >> >> >> > >> > > >> >> > the function is called with no partition parameters, it will try
> >> >> >> >> > >> > > >> >> > to execute the logic on all "assigned" partitions of the consumer.
> >> >> >> >> > >> > > >> >> > And once that is done, the subsequent poll() will not throw the
> >> >> >> >> > >> > > >> >> > exception, since it knows those partitions need to reset offsets.
> >> >> >> >> > >> > > >> >> >
> >> >> >> >> > >> > > >> >> > However, in your case there are no assigned partitions yet, and
> >> >> >> >> > >> > > >> >> > hence `seekToXX` will not take effect on any partitions. The
> >> >> >> >> > >> > > >> >> > assignment is wrapped in the poll() call, as you mentioned. One
> >> >> >> >> > >> > > >> >> > way to solve it is to let `seekToXX()` with no parameters do the
> >> >> >> >> > >> > > >> >> > coordination and get the assigned partitions if there are any
> >> >> >> >> > >> > > >> >> > subscribed topics, so that the subsequent poll() will know those
> >> >> >> >> > >> > > >> >> > partitions need their offsets reset. Does that make sense?
> >> >> >> >> > >> > > >> >> >
> >> >> >> >> > >> > > >> >> > For now, another way I can think of is to get the partition info
> >> >> >> >> > >> > > >> >> > beforehand and call `seekToBeginning` on all partitions. But that
> >> >> >> >> > >> > > >> >> > only works if the consumer knows it is going to get all the
> >> >> >> >> > >> > > >> >> > partitions assigned to itself (i.e. you are only running a single
> >> >> >> >> > >> > > >> >> > instance).
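
Here is a minimal sketch of that workaround for the single-instance case. It assumes one consumer should own every partition of the given topics. It swaps subscribe() for assign(), as Mansi suggests in this thread, because seeking on partitions that are not yet assigned is rejected with an IllegalStateException. It also uses the Collection-based signatures from 0.10.0 on. The helper name is hypothetical.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;

import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;

// Hypothetical helper, not a Kafka API.
final class StartFromBeginning {
    private StartFromBeginning() {}

    // Looks up every partition of the given topics, assigns them all to this
    // consumer (no consumer group rebalancing), and requests a reset to the
    // beginning. The reset is applied lazily by the next poll() or position().
    static List<TopicPartition> assignAllFromBeginning(Consumer<?, ?> consumer,
                                                       Collection<String> topics) {
        List<TopicPartition> partitions = new ArrayList<>();
        for (String topic : topics) {
            List<PartitionInfo> infos = consumer.partitionsFor(topic);
            if (infos == null) {
                continue; // defensive: older clients may return null for an unknown topic
            }
            for (PartitionInfo info : infos) {
                partitions.add(new TopicPartition(info.topic(), info.partition()));
            }
        }
        consumer.assign(partitions);
        consumer.seekToBeginning(partitions);
        return partitions;
    }
}
```

This works even with `auto.offset.reset` set to "none", because the explicit seek supplies the reset strategy for each partition.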
> >> >> >> >> > >> > > >> >> >
> >> >> >> >> > >> > > >> >> > Guozhang
> >> >> >> >> > >> > > >> >> >
> >> >> >> >> > >> > > >> >> >
> >> >> >> >> > >> > > >> >> > On Wed, Mar 9, 2016 at 6:22 AM, Cody Koeninger <c...@koeninger.org>
> >> >> >> >> > >> > > >> >> > wrote:
> >> >> >> >> > >> > > >> >> >
> >> >> >> >> > >> > > >> >> >> Another unfortunate thing about ConsumerRebalanceListener is that
> >> >> >> >> > >> > > >> >> >> in order to do meaningful work in the callback, you need a
> >> >> >> >> > >> > > >> >> >> reference to the consumer that called it. But that reference
> >> >> >> >> > >> > > >> >> >> isn't provided to the callback, which means the listener
> >> >> >> >> > >> > > >> >> >> implementation needs to hold a reference to the consumer. Seems
> >> >> >> >> > >> > > >> >> >> like this makes it unnecessarily awkward to serialize or provide a
> >> >> >> >> > >> > > >> >> >> 0-arg constructor for the listener.
> >> >> >> >> > >> > > >> >> >>
> >> >> >> >> > >> > > >> >> >> On Wed, Mar 9, 2016 at 7:28 AM, Cody Koeninger <c...@koeninger.org>
> >> >> >> >> > >> > > >> >> >> wrote:
> >> >> >> >> > >> > > >> >> >> > I thought about ConsumerRebalanceListener, but seeking to the
> >> >> >> >> > >> > > >> >> >> > beginning any time there's a rebalance for whatever reason is not
> >> >> >> >> > >> > > >> >> >> > necessarily the same thing as seeking to the beginning before
> >> >> >> >> > >> > > >> >> >> > first starting the consumer.
> >> >> >> >> > >> > > >> >> >> >
> >> >> >> >> > >> > > >> >> >> > On Wed, Mar 9, 2016 at 2:24 AM, Kamal C <kamaltar...@gmail.com>
> >> >> >> >> > >> > > >> >> >> > wrote:
> >> >> >> >> > >> > > >> >> >> >> Cody,
> >> >> >> >> > >> > > >> >> >> >>
> >> >> >> >> > >> > > >> >> >> >> Use ConsumerRebalanceListener to achieve that:
> >> >> >> >> > >> > > >> >> >> >>
> >> >> >> >> > >> > > >> >> >> >> ConsumerRebalanceListener listener = new ConsumerRebalanceListener() {
> >> >> >> >> > >> > > >> >> >> >>
> >> >> >> >> > >> > > >> >> >> >>     @Override
> >> >> >> >> > >> > > >> >> >> >>     public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
> >> >> >> >> > >> > > >> >> >> >>     }
> >> >> >> >> > >> > > >> >> >> >>
> >> >> >> >> > >> > > >> >> >> >>     @Override
> >> >> >> >> > >> > > >> >> >> >>     public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
> >> >> >> >> > >> > > >> >> >> >>         // 0.9 API: seekToBeginning takes TopicPartition varargs
> >> >> >> >> > >> > > >> >> >> >>         consumer.seekToBeginning(partitions.toArray(new TopicPartition[0]));
> >> >> >> >> > >> > > >> >> >> >>     }
> >> >> >> >> > >> > > >> >> >> >> };
> >> >> >> >> > >> > > >> >> >> >>
> >> >> >> >> > >> > > >> >> >> >> consumer.subscribe(topics, listener);
> >> >> >> >> > >> > > >> >> >> >>
> >> >> >> >> > >> > > >> >> >> >> On Wed, Mar 9, 2016 at 12:05 PM, Cody Koeninger <c...@koeninger.org>
> >> >> >> >> > >> > > >> >> >> >> wrote:
> >> >> >> >> > >> > > >> >> >> >>
> >> >> >> >> > >> > > >> >> >> >>> That suggestion doesn't work, for pretty much the same reason: at
> >> >> >> >> > >> > > >> >> >> >>> the time poll is first called, there is no reset policy and no
> >> >> >> >> > >> > > >> >> >> >>> committed offset, so NoOffsetForPartitionException is thrown.
> >> >> >> >> > >> > > >> >> >> >>>
> >> >> >> >> > >> > > >> >> >> >>> I feel like the underlying problem isn't so much that seekToEnd
> >> >> >> >> > >> > > >> >> >> >>> needs special-case behavior. It's more that topic metadata
> >> >> >> >> > >> > > >> >> >> >>> fetches, consumer position fetches, and message fetches are all
> >> >> >> >> > >> > > >> >> >> >>> lumped together under a single poll() call, with no way to do
> >> >> >> >> > >> > > >> >> >> >>> them individually if necessary.
> >> >> >> >> > >> > > >> >> >> >>>
> >> >> >> >> > >> > > >> >> >> >>> What does "work" in this situation is to just catch the exception
> >> >> >> >> > >> > > >> >> >> >>> (which leaves the consumer in a state where topics are assigned)
> >> >> >> >> > >> > > >> >> >> >>> and then seek. But that is not exactly an elegant interface.
> >> >> >> >> > >> > > >> >> >> >>>
> >> >> >> >> > >> > > >> >> >> >>>     import org.apache.kafka.clients.consumer.NoOffsetForPartitionException
> >> >> >> >> > >> > > >> >> >> >>>
> >> >> >> >> > >> > > >> >> >> >>>     consumer.subscribe(topics)
> >> >> >> >> > >> > > >> >> >> >>>     try {
> >> >> >> >> > >> > > >> >> >> >>>       consumer.poll(0)  // completes the group assignment, then throws
> >> >> >> >> > >> > > >> >> >> >>>     } catch {
> >> >> >> >> > >> > > >> >> >> >>>       // expected: no committed offset and no reset policy
> >> >> >> >> > >> > > >> >> >> >>>       case _: NoOffsetForPartitionException =>
> >> >> >> >> > >> > > >> >> >> >>>     }
> >> >> >> >> > >> > > >> >> >> >>>     consumer.seekToBeginning()
> >> >> >> >> > >> > > >> >> >> >>>     consumer.poll(0)
> >> >> >> >> > >> > > >> >> >> >>>
> >> >> >> >> > >> > > >> >> >> >>>
> >> >> >> >> > >> > > >> >> >> >>>
> >> >> >> >> > >> > > >> >> >> >>>
> >> >> >> >> > >> > > >> >> >> >>> On Tue, Mar 8, 2016 at 11:22 PM, Guozhang Wang <wangg...@gmail.com>
> >> >> >> >> > >> > > >> >> >> >>> wrote:
> >> >> >> >> > >> > > >> >> >> >>> > Hi Cody,
> >> >> >> >> > >> > > >> >> >> >>> >
> >> >> >> >> > >> > > >> >> >> >>> > The problem with that code is in `subscribe(topic)` followed by
> >> >> >> >> > >> > > >> >> >> >>> > `seekToBeginning()`.
> >> >> >> >> > >> > > >> >> >> >>> >
> >> >> >> >> > >> > > >> >> >> >>> > Since the `subscribe` call is lazily evaluated, by the time
> >> >> >> >> > >> > > >> >> >> >>> > `seekToBeginning()` is called no partition is assigned yet, and
> >> >> >> >> > >> > > >> >> >> >>> > hence it is effectively a no-op.
> >> >> >> >> > >> > > >> >> >> >>> >
> >> >> >> >> > >> > > >> >> >> >>> > Try
> >> >> >> >> > >> > > >> >> >> >>> >
> >> >> >> >> > >> > > >> >> >> >>> >     consumer.subscribe(topics)
> >> >> >> >> > >> > > >> >> >> >>> >     consumer.poll(0)  // get assigned partitions
> >> >> >> >> > >> > > >> >> >> >>> >     consumer.seekToBeginning()
> >> >> >> >> > >> > > >> >> >> >>> >     consumer.poll(0)
> >> >> >> >> > >> > > >> >> >> >>> >
> >> >> >> >> > >> > > >> >> >> >>> > to see if that works.
> >> >> >> >> > >> > > >> >> >> >>> >
> >> >> >> >> > >> > > >> >> >> >>> > I think it is a valid issue that can be fixed in the new consumer:
> >> >> >> >> > >> > > >> >> >> >>> > upon calling seekToEnd/Beginning with no parameter while no
> >> >> >> >> > >> > > >> >> >> >>> > assignment has been done yet, do the coordination behind the
> >> >> >> >> > >> > > >> >> >> >>> > scenes. This will, though, change the behavior of the functions,
> >> >> >> >> > >> > > >> >> >> >>> > as they are no longer always lazily evaluated.
> >> >> >> >> > >> > > >> >> >> >>> >
> >> >> >> >> > >> > > >> >> >> >>> >
> >> >> >> >> > >> > > >> >> >> >>> > Guozhang
> >> >> >> >> > >> > > >> >> >> >>> >
> >> >> >> >> > >> > > >> >> >> >>> >
> >> >> >> >> > >> > > >> >> >> >>> > On Tue, Mar 8, 2016 at 2:08 PM, Cody Koeninger <c...@koeninger.org>
> >> >> >> >> > >> > > >> >> >> >>> > wrote:
> >> >> >> >> > >> > > >> >> >> >>> >
> >> >> >> >> > >> > > >> >> >> >>> >> Using the 0.9 consumer, I would like to start consuming at the
> >> >> >> >> > >> > > >> >> >> >>> >> beginning or end, without specifying auto.offset.reset.
> >> >> >> >> > >> > > >> >> >> >>> >>
> >> >> >> >> > >> > > >> >> >> >>> >> This does not seem to be possible:
> >> >> >> >> > >> > > >> >> >> >>> >>
> >> >> >> >> > >> > > >> >> >> >>> >>     import scala.collection.JavaConverters._
> >> >> >> >> > >> > > >> >> >> >>> >>     import org.apache.kafka.clients.consumer.KafkaConsumer
> >> >> >> >> > >> > > >> >> >> >>> >>     import org.apache.kafka.common.serialization.StringDeserializer
> >> >> >> >> > >> > > >> >> >> >>> >>
> >> >> >> >> > >> > > >> >> >> >>> >>     val kafkaParams = Map[String, Object](
> >> >> >> >> > >> > > >> >> >> >>> >>       "bootstrap.servers" -> conf.getString("kafka.brokers"),
> >> >> >> >> > >> > > >> >> >> >>> >>       "key.deserializer" -> classOf[StringDeserializer],
> >> >> >> >> > >> > > >> >> >> >>> >>       "value.deserializer" -> classOf[StringDeserializer],
> >> >> >> >> > >> > > >> >> >> >>> >>       "group.id" -> "example",
> >> >> >> >> > >> > > >> >> >> >>> >>       "auto.offset.reset" -> "none"
> >> >> >> >> > >> > > >> >> >> >>> >>     ).asJava
> >> >> >> >> > >> > > >> >> >> >>> >>     val topics = conf.getString("kafka.topics").split(",").toList.asJava
> >> >> >> >> > >> > > >> >> >> >>> >>     val consumer = new KafkaConsumer[String, String](kafkaParams)
> >> >> >> >> > >> > > >> >> >> >>> >>     consumer.subscribe(topics)
> >> >> >> >> > >> > > >> >> >> >>> >>     consumer.seekToBeginning()
> >> >> >> >> > >> > > >> >> >> >>> >>     consumer.poll(0)
> >> >> >> >> > >> > > >> >> >> >>> >>
> >> >> >> >> > >> > > >> >> >> >>> >>
> >> >> >> >> > >> > > >> >> >> >>> >> Results in:
> >> >> >> >> > >> > > >> >> >> >>> >>
> >> >> >> >> > >> > > >> >> >> >>> >> Exception in thread "main" org.apache.kafka.clients.consumer.NoOffsetForPartitionException: Undefined offset with no reset policy for partition: testtwo-4
> >> >> >> >> > >> > > >> >> >> >>> >>         at org.apache.kafka.clients.consumer.internals.Fetcher.resetOffset(Fetcher.java:288)
> >> >> >> >> > >> > > >> >> >> >>> >>         at org.apache.kafka.clients.consumer.internals.Fetcher.updateFetchPositions(Fetcher.java:167)
> >> >> >> >> > >> > > >> >> >> >>> >>         at org.apache.kafka.clients.consumer.KafkaConsumer.updateFetchPositions(KafkaConsumer.java:1302)
> >> >> >> >> > >> > > >> >> >> >>> >>         at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:895)
> >> >> >> >> > >> > > >> >> >> >>> >>         at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:853)
> >> >> >> >> > >> > > >> >> >> >>> >>         at example.BasicKafkaConsumer$.main(BasicKafkaConsumer.scala:25)
> >> >> >> >> > >> > > >> >> >> >>> >>
> >> >> >> >> > >> > > >> >> >> >>> >>
> >> >> >> >> > >> > > >> >> >> >>> >> I'm assuming this is because subscriptions.assignedPartitions isn't yet
> >> >> >> >> > >> > > >> >> >> >>> >> populated at the time seekToBeginning() is called. But polling in order
> >> >> >> >> > >> > > >> >> >> >>> >> to get topic partitions assigned results in an error, which creates a
> >> >> >> >> > >> > > >> >> >> >>> >> chicken-and-egg situation.
> >> >> >> >> > >> > > >> >> >> >>> >>
> >> >> >> >> > >> > > >> >> >> >>> >> I don't want to set auto.offset.reset, because I want a hard error if
> >> >> >> >> > >> > > >> >> >> >>> >> the offsets are out of range at any other time during consumption.
> >> >> >> >> > >> > > >> >> >> >>> >>
> >> >> >> >> > >> > > >> >> >> >>> >
> >> >> >> >> > >> > > >> >> >> >>> >
> >> >> >> >> > >> > > >> >> >> >>> >
> >> >> >> >> > >> > > >> >> >> >>> > --
> >> >> >> >> > >> > > >> >> >> >>> > -- Guozhang
> >> >> >> >> > >> > > >> >> >> >>>
> >> >> >> >> > >> > > >> >> >>
> >> >> >> >> > >> > > >> >> >
> >> >> >> >> > >> > > >> >> >
> >> >> >> >> > >> > > >> >> >
> >> >> >> >> > >> > > >> >> > --
> >> >> >> >> > >> > > >> >> > -- Guozhang
> >> >> >> >> > >> > > >> >>
> >> >> >> >> > >> > > >> >
> >> >> >> >> > >> > > >> >
> >> >> >> >> > >> > > >> >
> >> >> >> >> > >> > > >> > --
> >> >> >> >> > >> > > >> > -- Guozhang
> >> >> >> >> > >> > > >>
> >> >> >> >> > >> > > >
> >> >> >> >> > >> > > >
> >> >> >> >> > >> > > >
> >> >> >> >> > >> > > > --
> >> >> >> >> > >> > > > -- Guozhang
> >> >> >> >> > >> > > >
> >> >> >> >> > >> > >
> >> >> >> >> > >> > >
> >> >> >> >> > >> > >
> >> >> >> >> > >> > > --
> >> >> >> >> > >> > > -- Guozhang
> >> >> >> >> > >> > >
> >> >> >> >> > >> >
> >> >> >> >> > >>
> >> >> >> >> > >>
> >> >> >> >> > >>
> >> >> >> >> > >> --
> >> >> >> >> > >> -- Guozhang
> >> >> >> >> > >>
> >> >> >> >> >
> >> >> >> >>
> >> >> >> >
> >> >> >> >
> >> >> >> >
> >> >> >> > --
> >> >> >> > -- Guozhang
> >> >> >>
> >> >> >
> >> >> >
> >> >> >
> >> >> > --
> >> >> > -- Guozhang
> >> >>
> >>
>
>

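A minimal sketch of one common workaround for the original question, which was how to start at the beginning without setting auto.offset.reset. The idea is to pass a ConsumerRebalanceListener to subscribe() and seek inside onPartitionsAssigned(). In current clients, poll() invokes that callback after partitions are assigned but before fetch positions are resolved, so the seek fixes the position before the "no reset policy" check can fail. The sketch makes three assumptions. First, it assumes kafka-clients 2.0 or later, which provides the Collection-based seekToBeginning and poll(Duration); in 0.9, seekToBeginning takes varargs. Second, it assumes Scala 2.13. Third, the broker address and the topic name "testtwo" are placeholders. Calling seekToEnd(partitions) in the same place should work the same way for starting at the end.

```scala
import java.time.Duration
import java.util.{Collection => JCollection, Properties}

import scala.jdk.CollectionConverters._

import org.apache.kafka.clients.consumer.{ConsumerRebalanceListener, KafkaConsumer}
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.serialization.StringDeserializer

object SeekOnAssignExample {
  // Seeks every newly assigned partition to its earliest offset. This runs
  // inside poll(), after assignment and before positions are looked up, so
  // these partitions never fall back to auto.offset.reset.
  final class SeekToBeginningOnAssign(consumer: KafkaConsumer[String, String])
      extends ConsumerRebalanceListener {
    override def onPartitionsRevoked(partitions: JCollection[TopicPartition]): Unit = ()

    override def onPartitionsAssigned(partitions: JCollection[TopicPartition]): Unit =
      consumer.seekToBeginning(partitions)
  }

  def main(args: Array[String]): Unit = {
    val props = new Properties()
    props.put("bootstrap.servers", "localhost:9092") // placeholder broker address
    props.put("key.deserializer", classOf[StringDeserializer].getName)
    props.put("value.deserializer", classOf[StringDeserializer].getName)
    props.put("group.id", "example")
    // Keep "none" so out-of-range offsets later in consumption still fail hard.
    props.put("auto.offset.reset", "none")

    val consumer = new KafkaConsumer[String, String](props)
    try {
      // "testtwo" is a placeholder topic name.
      consumer.subscribe(List("testtwo").asJava, new SeekToBeginningOnAssign(consumer))
      val records = consumer.poll(Duration.ofMillis(1000))
      records.asScala.foreach { r =>
        println(s"${r.topic()}-${r.partition()}@${r.offset()}: ${r.value()}")
      }
    } finally consumer.close()
  }
}
```

One design caveat: the listener also fires on later rebalances. As written, it therefore rewinds any partition that is reassigned mid-stream. A real application would probably seek only for partitions that have no committed offset, or only on the first assignment.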