Hello Sophie, thanks for bringing up this KIP, and for the great write-up
summarizing the motivations of the proposal. Here are some comments:

Minor:

1. If we want to make it a blocking call (I have some thoughts about this
below :)), to be consistent we need to consider having two overloaded
functions, one without the timeout, which then relies on
`DEFAULT_API_TIMEOUT_MS_CONFIG`.
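Here is a minimal sketch of point 1. It assumes the blocking `boolean enforceRebalance(Duration timeout)` signature proposed at this stage of the thread. `BlockingRebalanceSketch`, its `defaultApiTimeout` field and the simulated rebalance duration are all hypothetical, and this is not the `KafkaConsumer` implementation. The `defaultApiTimeout` field stands in for the value configured via `default.api.timeout.ms`, that is, `ConsumerConfig.DEFAULT_API_TIMEOUT_MS_CONFIG`. The no-argument overload simply delegates with the configured default. Existing timeout-taking methods such as `position` and `committed` already follow the same pattern.

```java
import java.time.Duration;

// Hypothetical stand-in for a consumer exposing a blocking enforceRebalance; not the real client.
class BlockingRebalanceSketch {
    // Stands in for the value configured via default.api.timeout.ms.
    private final Duration defaultApiTimeout;
    // How long the simulated rebalance takes (a real rebalance duration is not known up front).
    private final Duration simulatedRebalanceTime;

    BlockingRebalanceSketch(Duration defaultApiTimeout, Duration simulatedRebalanceTime) {
        this.defaultApiTimeout = defaultApiTimeout;
        this.simulatedRebalanceTime = simulatedRebalanceTime;
    }

    /** Overload without a timeout: falls back to the configured default API timeout. */
    boolean enforceRebalance() {
        return enforceRebalance(defaultApiTimeout);
    }

    /** Returns true if the simulated rebalance completes within the given timeout. */
    boolean enforceRebalance(Duration timeout) {
        return simulatedRebalanceTime.compareTo(timeout) <= 0;
    }
}
```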

2. Also, again for API consistency, I'd suggest that we a) throw
TimeoutException if the operation cannot be completed within the timeout
value, and b) return false immediately if we cannot trigger a rebalance at
all, for example because the coordinator is unknown.
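The two outcomes suggested in point 2 can be sketched as follows. `RebalanceErrorSemanticsSketch` and `RebalanceTimeoutException` are hypothetical. The exception stands in for Kafka's unchecked `org.apache.kafka.common.errors.TimeoutException`. The coordinator state and the rebalance duration are simulated rather than discovered from a broker.

```java
import java.time.Duration;

// Stand-in for Kafka's unchecked org.apache.kafka.common.errors.TimeoutException.
class RebalanceTimeoutException extends RuntimeException {
    RebalanceTimeoutException(String message) {
        super(message);
    }
}

// Hypothetical sketch of the error semantics from point 2; broker interaction is simulated.
class RebalanceErrorSemanticsSketch {
    private final boolean coordinatorKnown;
    private final Duration simulatedRebalanceTime;

    RebalanceErrorSemanticsSketch(boolean coordinatorKnown, Duration simulatedRebalanceTime) {
        this.coordinatorKnown = coordinatorKnown;
        this.simulatedRebalanceTime = simulatedRebalanceTime;
    }

    /**
     * Returns false immediately if no rebalance can be triggered (unknown coordinator),
     * returns true once the rebalance completes, and throws if it cannot complete in time.
     */
    boolean enforceRebalance(Duration timeout) {
        if (!coordinatorKnown) {
            return false; // nowhere to send the JoinGroup request, so do not wait at all
        }
        if (simulatedRebalanceTime.compareTo(timeout) > 0) {
            throw new RebalanceTimeoutException("Rebalance did not complete within " + timeout);
        }
        return true;
    }
}
```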

Meta:

3. I'm not sure if we have a concrete scenario in KIP-441 / KIP-268 where we
want to wait until the rebalance is completed, rather than calling
"consumer.enforceRebalance(); consumer.poll()" consecutively and letting the
poll call execute the rebalance. If there is no valid motivation, I'm still a
bit inclined to make it non-blocking (i.e. just setting a bit and then
executing the process in the later poll call), similar to our `seek`
functions. By doing this we can also make this function simpler, as it would
never throw RebalanceInProgress or Timeout or even KafkaExceptions.
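Here is a minimal sketch of the non-blocking variant from point 3. It assumes `enforceRebalance()` only records a request and the next `poll()` acts on it. This is in the same spirit as `seek()`, which only overrides the fetch position that the next `poll()` uses. `LazyRebalanceConsumerSketch` is hypothetical, and the rebalance itself is reduced to a counter.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of the non-blocking variant: enforceRebalance() only sets a flag and the
// next poll() acts on it. The rebalance itself is reduced to a counter.
class LazyRebalanceConsumerSketch {
    private boolean rebalanceRequested = false;
    private int completedRebalances = 0;

    /** Never blocks and has nothing to throw: it only records the request. */
    void enforceRebalance() {
        rebalanceRequested = true;
    }

    /** Runs any pending rebalance before returning (in this sketch, no) records. */
    List<String> poll() {
        if (rebalanceRequested) {
            rebalanceRequested = false;
            completedRebalances++; // stands in for rejoining the group
        }
        return new ArrayList<>();
    }

    int completedRebalances() {
        return completedRebalances;
    }
}
```

Because the request is only a flag, several calls before the next `poll()` collapse into a single rebalance.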

4. Re: the case "when a rebalance is already in progress", this may be
related to 3) above. I think we can simplify this case as well by just not
triggering a new rebalance and letting the caller handle it. For example, in
KIP-441, in each iteration of the stream thread we can check if a standby
task is ready, and if so call `enforceRebalance`. If there's already a
rebalance in progress (either with the new subscription metadata or not),
this call would be a no-op, and in the next iteration we would just call
that function again. Eventually we would trigger the rebalance with the new
subscription metadata, and the previous calls, being no-ops, would cost
nothing anyway. I feel this would be simpler than letting the caller
capture RebalanceInProgressException:


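import java.util.Collection;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.common.TopicPartition;

// `running`, `pollTimeout`, and rebalanceGoalAchieved() are placeholders
void mainProcessingLoop() {
    while (running) {
        if (needsRebalance) {
            consumer.enforceRebalance(); // a no-op if a rebalance is already in progress
        }

        ConsumerRecords<K, V> records = consumer.poll(pollTimeout);
        // ... do some processing
    }
}

class RebalanceListener implements ConsumerRebalanceListener {

    @Override
    public void onPartitionsRevoked(Collection<TopicPartition> partitions) { }

    @Override
    public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
        if (rebalanceGoalAchieved()) {
            needsRebalance = false;
        }
    }
}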
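The following self-contained simulation checks that the loop above converges. It assumes the point 4 semantics: `enforceRebalance()` is a no-op while a rebalance is in progress, and a started rebalance completes on the following `poll()`. `NoOpWhileInProgressConsumer`, `MainProcessingLoop` and the `rebalanceGoalAchieved` predicate are hypothetical. The `Runnable` stands in for `ConsumerRebalanceListener#onPartitionsAssigned`.

```java
import java.util.function.BooleanSupplier;

// Hypothetical consumer stand-in: a started rebalance stays in progress until the next poll().
class NoOpWhileInProgressConsumer {
    private final Runnable onPartitionsAssigned; // stands in for the rebalance listener callback
    private boolean requested = false;
    private boolean inProgress = false;
    private int startedRebalances = 0;

    NoOpWhileInProgressConsumer(Runnable onPartitionsAssigned) {
        this.onPartitionsAssigned = onPartitionsAssigned;
    }

    void enforceRebalance() {
        if (inProgress) {
            return; // no-op: the caller simply asks again on a later iteration
        }
        requested = true;
    }

    void poll() {
        if (inProgress) {
            inProgress = false; // the ongoing rebalance completes on this poll
            onPartitionsAssigned.run();
        } else if (requested) {
            requested = false;
            inProgress = true; // start a rebalance that completes on the next poll
            startedRebalances++;
        }
    }

    int startedRebalances() {
        return startedRebalances;
    }
}

// Hypothetical version of mainProcessingLoop() together with its rebalance listener.
class MainProcessingLoop {
    private boolean needsRebalance = true;
    private final NoOpWhileInProgressConsumer consumer;

    MainProcessingLoop(BooleanSupplier rebalanceGoalAchieved) {
        this.consumer = new NoOpWhileInProgressConsumer(() -> {
            if (rebalanceGoalAchieved.getAsBoolean()) {
                needsRebalance = false;
            }
        });
    }

    /** Runs a fixed number of loop iterations and returns how many rebalances were started. */
    int run(int iterations) {
        for (int i = 0; i < iterations; i++) {
            if (needsRebalance) {
                consumer.enforceRebalance();
            }
            consumer.poll();
            // ... process records here
        }
        return consumer.startedRebalances();
    }
}
```

Suppose the goal needs two assignments. The simulation then starts exactly two rebalances. Calls made while a rebalance is still in progress do nothing, and the loop stops asking once the listener clears `needsRebalance`.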


WDYT?




On Tue, Feb 11, 2020 at 3:59 PM Sophie Blee-Goldman <sop...@confluent.io>
wrote:

> Hey Boyang,
>
> Originally I had it as a nonblocking call, but decided to change it to
> blocking with a timeout parameter. I'm not sure a future makes sense to
> return here, because the rebalance either does or does not complete within
> the timeout: if it does not, you will have to call poll again to complete
> it (as is the case with any other rebalance). I'll call this out in the
> javadocs as well.
>
> I also added an example demonstrating how/when to use this new API.
>
> Thanks!
>
> On Tue, Feb 11, 2020 at 1:49 PM Boyang Chen <reluctanthero...@gmail.com>
> wrote:
>
> > Hey Sophie,
> >
> > Is `enforceRebalance` a blocking call? Could we add a code sample to
> > the KIP on how this API should be used?
> >
> > Returning a future instead of a boolean might be easier, as we are
> > allowing the consumer to make progress during a rebalance after KIP-429,
> > IMHO.
> >
> > Boyang
> >
> >
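For comparison with the boolean return value, here is a minimal sketch of the future-returning alternative Boyang suggests. It assumes the future is completed by whichever later `poll()` finishes the rebalance. `FutureRebalanceSketch` is hypothetical and the rebalance is simulated. The sketch uses only `java.util.concurrent.CompletableFuture`.

```java
import java.util.concurrent.CompletableFuture;

// Hypothetical sketch of a future-returning enforceRebalance(); the rebalance itself is simulated.
class FutureRebalanceSketch {
    private CompletableFuture<Void> pending; // null when no rebalance has been requested

    /** Returns a future that completes once a later poll() finishes the rebalance. */
    CompletableFuture<Void> enforceRebalance() {
        if (pending == null) {
            pending = new CompletableFuture<>();
        }
        return pending; // repeated calls before the next poll() share one future
    }

    /** Stands in for poll() completing the JoinGroup/SyncGroup round trip of a pending rebalance. */
    void poll() {
        if (pending != null) {
            CompletableFuture<Void> finished = pending;
            pending = null;
            finished.complete(null);
        }
    }
}
```

As in Sophie's reply, the future only completes if the caller keeps calling `poll()`.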
> > On Tue, Feb 11, 2020 at 1:17 PM Konstantine Karantasis <
> > konstant...@confluent.io> wrote:
> >
> > > Thanks for the quick turnaround Sophie. My points have been addressed.
> > > I think the intended use is quite clear now.
> > >
> > > Best,
> > > Konstantine
> > >
> > >
> > > On Tue, Feb 11, 2020 at 12:57 PM Sophie Blee-Goldman <
> > > sop...@confluent.io> wrote:
> > >
> > > > Konstantine,
> > > > Thanks for the feedback! I've updated the sections with your
> > > > suggestions. I agree in particular that it's really important to
> > > > make sure users don't call this unnecessarily, or for the wrong
> > > > reasons: to that end I also extended the javadocs to specify that
> > > > this API is for when changes to the subscription userdata occur.
> > > > Hopefully that should make its intended usage quite clear.
> > > >
> > > > Bill,
> > > > The rebalance triggered by this new API will be a "normal" rebalance,
> > > > and will therefore follow the existing listener semantics. For
> > > > example, a cooperative rebalance will always call
> > > > onPartitionsAssigned, even if no partitions are actually moved. An
> > > > eager rebalance will still revoke all partitions first anyway.
> > > >
> > > > Thanks for the feedback!
> > > > Sophie
> > > >
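Below is a simplified model of the callback behaviour Sophie describes in reply to Bill's question. It assumes one consumer whose owned partitions change from `owned` to `newAssignment` in a single rebalance. It also assumes that cooperative revocation is only reported when some partition actually moves away. `ListenerSemanticsSketch` is hypothetical, and partitions are plain strings instead of `TopicPartition`. Follow-up rebalances and `onPartitionsLost` are ignored.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.TreeSet;

// Simplified, hypothetical model of which listener callbacks one consumer sees after a rebalance.
// Partitions are plain strings; follow-up rebalances and onPartitionsLost are not modelled.
class ListenerSemanticsSketch {
    static List<String> callbacks(boolean cooperative, Set<String> owned, Set<String> newAssignment) {
        List<String> calls = new ArrayList<>();
        if (!cooperative) {
            // Eager: everything owned is revoked first, then the full new assignment is handed out.
            calls.add("onPartitionsRevoked" + new TreeSet<>(owned));
            calls.add("onPartitionsAssigned" + new TreeSet<>(newAssignment));
            return calls;
        }
        // Cooperative: only the difference between the old and new assignment is reported.
        Set<String> revoked = new TreeSet<>(owned);
        revoked.removeAll(newAssignment);
        Set<String> added = new TreeSet<>(newAssignment);
        added.removeAll(owned);
        if (!revoked.isEmpty()) {
            calls.add("onPartitionsRevoked" + revoked);
        }
        calls.add("onPartitionsAssigned" + added); // invoked even when nothing new was assigned
        return calls;
    }
}
```

If the assignment is unchanged, the cooperative case still yields one (empty) `onPartitionsAssigned` call.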
> > > > On Tue, Feb 11, 2020 at 9:52 AM Bill Bejeck <bbej...@gmail.com>
> > > > wrote:
> > > >
> > > > > Hi Sophie,
> > > > >
> > > > > Thanks for the KIP, makes sense to me.
> > > > >
> > > > > One quick question; I'm not sure if it's relevant or not.
> > > > >
> > > > > If a user provides a `ConsumerRebalanceListener` and a rebalance is
> > > > > triggered from an `enforceRebalance` call, it seems possible the
> > > > > listener won't get called, since partition assignments might not
> > > > > change. If that is the case, do we want to consider adding a method
> > > > > to the `ConsumerRebalanceListener` for callbacks on
> > > > > `enforceRebalance` actions?
> > > > >
> > > > > Thanks,
> > > > > Bill
> > > > >
> > > > >
> > > > >
> > > > > On Tue, Feb 11, 2020 at 12:11 PM Konstantine Karantasis <
> > > > > konstant...@confluent.io> wrote:
> > > > >
> > > > > > Hi Sophie.
> > > > > >
> > > > > > Thanks for the KIP. I liked how focused the proposal is. Also,
> > > > > > its motivation is clear after carefully reading the KIP and its
> > > > > > references.
> > > > > >
> > > > > > Yet, I think it'd be a good idea to call out explicitly in the
> > > > > > Rejected Alternatives section that an automatic and periodic
> > > > > > triggering of rebalances, which would not require exposing this
> > > > > > capability through the Consumer interface, does not cover your
> > > > > > specific use cases and therefore was not chosen. Maybe even
> > > > > > consider mentioning again here that this method is expected to be
> > > > > > used to respond to system changes external to the consumer and its
> > > > > > membership logic, and is not proposed as a way to resolve temporary
> > > > > > imbalances due to membership changes, which should inherently be
> > > > > > resolved by the assignor logic itself with one or more consecutive
> > > > > > rebalances.
> > > > > >
> > > > > > Also, in your javadoc I'd add some context similar to what someone
> > > > > > can read in the KIP. Specifically, where you say "for example if
> > > > > > some condition has changed that has implications for the partition
> > > > > > assignment", I'd rather add something like "for example, if some
> > > > > > condition external and invisible to the Consumer and its group
> > > > > > membership has changed in ways that would justify a new partition
> > > > > > assignment". That's just an example, feel free to reword, but I
> > > > > > believe that saying explicitly that this condition is not visible
> > > > > > to the consumer is useful to understand that this is not necessary
> > > > > > under normal circumstances.
> > > > > >
> > > > > > In the Compatibility, Deprecation, and Migration Plan section I
> > > > > > think it's worth mentioning that this is a new feature that affects
> > > > > > new implementations of the Consumer interface, and any such new
> > > > > > implementation should override the new method. Implementations
> > > > > > that wish to upgrade to a newer version should be extended and
> > > > > > recompiled, since no default implementation will be provided.
> > > > > >
> > > > > > Naming is hard here, if someone wants to emphasize the ad hoc and
> > > > > > irregular nature of this call. After some thought I'm fine with
> > > > > > 'enforceRebalance', even if it could potentially be confused with
> > > > > > a method that is supposed to be called to remediate one or more
> > > > > > previously unsuccessful rebalances (which is partly what
> > > > > > StreamThread#enforceRebalance is used for). The best I could think
> > > > > > of was 'onRequestRebalance', but that's not perfect either.
> > > > > >
> > > > > > Best,
> > > > > > Konstantine
> > > > > >
> > > > > >
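In Java terms, Konstantine's compatibility note means the new method is abstract on the interface, with no `default` body. The sketch below uses a hypothetical, trimmed-down `MiniConsumer` interface, not the real `Consumer`. It shows what an existing implementation has to add before it compiles again. Such an implementation might be a delegating wrapper, or a test double such as kafka-clients' `MockConsumer`.

```java
// Hypothetical, trimmed-down stand-in for the Consumer interface (not the real one).
interface MiniConsumer {
    void poll();

    // Newly added abstract method with no default body: every implementation must provide it.
    void enforceRebalance();
}

// An existing wrapper implementation after being extended and recompiled for the new interface.
class DelegatingConsumer implements MiniConsumer {
    private final MiniConsumer delegate;

    DelegatingConsumer(MiniConsumer delegate) {
        this.delegate = delegate;
    }

    @Override
    public void poll() {
        delegate.poll();
    }

    // Removing this override makes the class fail to compile against the new interface.
    @Override
    public void enforceRebalance() {
        delegate.enforceRebalance();
    }
}

// Minimal in-memory implementation so the example can run without a broker.
class CountingConsumer implements MiniConsumer {
    int polls = 0;
    int rebalanceRequests = 0;

    @Override
    public void poll() {
        polls++;
    }

    @Override
    public void enforceRebalance() {
        rebalanceRequests++;
    }
}
```

Declaring the method as a `default` would have let old implementations compile unchanged. They would then silently inherit whatever that default did.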
> > > > > > On Mon, Feb 10, 2020 at 5:18 PM Sophie Blee-Goldman <
> > > > > > sop...@confluent.io> wrote:
> > > > > >
> > > > > > > Thanks John. I took out the KafkaConsumer method and moved the
> > > > > > > javadocs to Consumer#enforceRebalance in the KIP -- hope you're
> > > > > > > happy :P
> > > > > > >
> > > > > > > Also, I wanted to point out one minor change to the current
> > > > > > > proposal: make this a blocking call, which accepts a timeout and
> > > > > > > returns whether the rebalance completed within the timeout. It
> > > > > > > will still reduce to a nonblocking call if a "zero" timeout is
> > > > > > > supplied. I've updated the KIP accordingly.
> > > > > > >
> > > > > > > Let me know if there are any further concerns, else I'll call for
> > > > > > > a vote.
> > > > > > >
> > > > > > > Cheers!
> > > > > > > Sophie
> > > > > > >
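Here is a minimal sketch of the timeout behaviour Sophie describes. It assumes a blocking `boolean enforceRebalance(Duration timeout)` and uses a simulated clock instead of real waiting. `TimedRebalanceSketch` and its fixed `rebalanceCostMs` are hypothetical. In this sketch `poll` only advances the pending rebalance and fetches nothing.

```java
import java.time.Duration;

// Hypothetical model of a blocking enforceRebalance(Duration) driven by a simulated clock.
class TimedRebalanceSketch {
    private final long rebalanceCostMs; // how long one rebalance takes in this model
    private long remainingMs = 0;       // work left on the pending rebalance, 0 if none
    private long elapsedMs = 0;         // simulated time spent waiting so far

    TimedRebalanceSketch(long rebalanceCostMs) {
        this.rebalanceCostMs = rebalanceCostMs;
    }

    /** Starts a rebalance if none is pending, then waits at most {@code timeout} for it. */
    boolean enforceRebalance(Duration timeout) {
        if (remainingMs == 0) {
            remainingMs = rebalanceCostMs;
        }
        return advance(timeout.toMillis());
    }

    /** A rebalance that did not finish inside enforceRebalance is completed by later polls. */
    void poll(Duration timeout) {
        if (remainingMs > 0) {
            advance(timeout.toMillis());
        }
    }

    boolean rebalanceInProgress() {
        return remainingMs > 0;
    }

    long elapsedMs() {
        return elapsedMs;
    }

    private boolean advance(long budgetMs) {
        long spent = Math.min(budgetMs, remainingMs);
        elapsedMs += spent;
        remainingMs -= spent;
        return remainingMs == 0;
    }
}
```

With `Duration.ZERO`, the call only starts the rebalance and returns `false` immediately, leaving later `poll()` calls to finish it. That is the non-blocking behaviour. A large enough timeout returns `true`.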
> > > > > > > On Mon, Feb 10, 2020 at 12:47 PM John Roesler <
> > > > > > > vvcep...@apache.org> wrote:
> > > > > > >
> > > > > > > > Thanks Sophie,
> > > > > > > >
> > > > > > > > Sorry I didn't respond. I think your new method name sounds
> > > > > > > > good.
> > > > > > > >
> > > > > > > > Regarding the interface vs implementation, I agree it's
> > > > > > > > confusing. It's always bothered me that the interface redirects
> > > > > > > > you to the implementation's JavaDocs, but never enough for me to
> > > > > > > > stop what I'm doing to fix it. It's not a big deal either way, I
> > > > > > > > just thought it was strange to propose a "public interface"
> > > > > > > > change, but not in terms of the actual interface class.
> > > > > > > >
> > > > > > > > It _is_ true that KafkaConsumer is also part of the public API,
> > > > > > > > but only really for the constructor. Any proposal to define a
> > > > > > > > new "consumer client" API should be on the Consumer interface
> > > > > > > > (which you said you plan to do anyway). I guess I brought it up
> > > > > > > > because proposing an addition to Consumer implies it would be
> > > > > > > > added to KafkaConsumer, but proposing an addition to
> > > > > > > > KafkaConsumer does not necessarily imply it would also be added
> > > > > > > > to Consumer. Does that make sense?
> > > > > > > >
> > > > > > > > Anyway, thanks for updating the KIP.
> > > > > > > >
> > > > > > > > Thanks,
> > > > > > > > -John
> > > > > > > >
> > > > > > > >
> > > > > > > > On Mon, Feb 10, 2020, at 14:38, Sophie Blee-Goldman wrote:
> > > > > > > > > Since this doesn't seem too controversial, I'll probably call
> > > > > > > > > for a vote by end of day.
> > > > > > > > > If there are any further comments/questions/concerns, please
> > > > > > > > > let me know!
> > > > > > > > >
> > > > > > > > > Thanks,
> > > > > > > > > Sophie
> > > > > > > > >
> > > > > > > > > On Sat, Feb 8, 2020 at 12:19 AM Sophie Blee-Goldman <
> > > > > > > > > sop...@confluent.io> wrote:
> > > > > > > > >
> > > > > > > > > > Thanks for the feedback! That's a good point about trying to
> > > > > > > > > > prevent users from thinking they should use this API during
> > > > > > > > > > normal processing and clarifying when/why you might need it
> > > > > > > > > > -- regardless of the method name, we should explicitly call
> > > > > > > > > > this out in the javadocs.
> > > > > > > > > >
> > > > > > > > > > As for the method name, on reflection I agree that
> > > > > > > > > > "rejoinGroup" does not seem to be appropriate. Of course
> > > > > > > > > > that's what the consumer will actually be doing, but that's
> > > > > > > > > > just an implementation detail -- the name should reflect what
> > > > > > > > > > the API is doing, not how it does it (which can always
> > > > > > > > > > change).
> > > > > > > > > >
> > > > > > > > > > How about "enforceRebalance"? This is stolen from the
> > > > > > > > > > StreamThread method that does exactly this (by
> > > > > > > > > > unsubscribing), so it seems to fit. I'll update the KIP with
> > > > > > > > > > this unless anyone has another suggestion.
> > > > > > > > > >
> > > > > > > > > > Regarding the Consumer vs KafkaConsumer matter, I included
> > > > > > > > > > the KafkaConsumer method because that's where all the
> > > > > > > > > > javadocs redirect to in the Consumer interface. Also, FWIW
> > > > > > > > > > I'm pretty sure KafkaConsumer is also part of the public API
> > > > > > > > > > -- we would be adding a new method to both.
> > > > > > > > > >
> > > > > > > > > > On Fri, Feb 7, 2020 at 7:42 PM John Roesler <
> > > > > > > > > > vvcep...@apache.org> wrote:
> > > > > > > > > >
> > > > > > > > > >> Hi all,
> > > > > > > > > >>
> > > > > > > > > > >> Thanks for the well motivated KIP, Sophie. I had some
> > > > > > > > > > >> alternatives in mind, which I won't even bother to relate
> > > > > > > > > > >> because I feel like the motivation made a compelling
> > > > > > > > > > >> argument for the API as proposed.
> > > > > > > > > > >>
> > > > > > > > > > >> One very minor point you might as well fix is that the API
> > > > > > > > > > >> change is targeted at KafkaConsumer (the implementation),
> > > > > > > > > > >> but should be targeted at Consumer (the interface).
> > > > > > > > > > >>
> > > > > > > > > > >> I agree with your discomfort about the name. Adding a
> > > > > > > > > > >> "rejoin" method seems strange since there's no "join"
> > > > > > > > > > >> method. Instead, the way you join the group the first time
> > > > > > > > > > >> is just by calling "subscribe". But "resubscribe" seems too
> > > > > > > > > > >> indirect from what we're really trying to do, which is to
> > > > > > > > > > >> trigger a rebalance by sending a new JoinGroup request.
> > > > > > > > > > >>
> > > > > > > > > > >> Another angle is that we don't want the method to sound
> > > > > > > > > > >> like something you should be calling in normal
> > > > > > > > > > >> circumstances, or people will be "tricked" into calling it
> > > > > > > > > > >> unnecessarily.
> > > > > > > > > > >>
> > > > > > > > > > >> So, I think "rejoinGroup" is fine, although a person
> > > > > > > > > > >> _might_ be forgiven for thinking they need to call it
> > > > > > > > > > >> periodically or something. Did you consider
> > > > > > > > > > >> "triggerRebalance", which sounds pretty advanced-ish, and
> > > > > > > > > > >> accurately describes what happens when you call it?
> > > > > > > > > >>
> > > > > > > > > >> All in all, the KIP sounds good to me, and I'm in favor.
> > > > > > > > > >>
> > > > > > > > > >> Thanks,
> > > > > > > > > >> -John
> > > > > > > > > >>
> > > > > > > > > >> On Fri, Feb 7, 2020, at 21:22, Anna McDonald wrote:
> > > > > > > > > > >> > This situation was discussed at length after a recent
> > > > > > > > > > >> > talk I gave. This KIP would be a great step towards
> > > > > > > > > > >> > increased availability and towards facilitating
> > > > > > > > > > >> > lightweight rebalances.
> > > > > > > > > >> >
> > > > > > > > > >> > anna
> > > > > > > > > >> >
> > > > > > > > > >> >
> > > > > > > > > >> >
> > > > > > > > > >> > On Fri, Feb 7, 2020, 9:38 PM Sophie Blee-Goldman <
> > > > > > > > sop...@confluent.io>
> > > > > > > > > >> > wrote:
> > > > > > > > > >> >
> > > > > > > > > >> > > Hi all,
> > > > > > > > > >> > >
> > > > > > > > > > >> > > In light of some recent and upcoming rebalancing and
> > > > > > > > > > >> > > availability improvements, it seems we have a need for
> > > > > > > > > > >> > > explicitly triggering a consumer group rebalance.
> > > > > > > > > > >> > > Therefore I'd like to propose adding a new rejoinGroup()
> > > > > > > > > > >> > > method to the Consumer client (better method name
> > > > > > > > > > >> > > suggestions are very welcome).
> > > > > > > > > > >> > >
> > > > > > > > > > >> > > Please take a look at the KIP and let me know what you
> > > > > > > > > > >> > > think!
> > > > > > > > > > >> > >
> > > > > > > > > > >> > > KIP document:
> > > > > > > > > > >> > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-568%3A+Explicit+rebalance+triggering+on+the+Consumer
> > > > > > > > > > >> > >
> > > > > > > > > > >> > > JIRA: https://issues.apache.org/jira/browse/KAFKA-9525
> > > > > > > > > >> > >
> > > > > > > > > >> > > Cheers,
> > > > > > > > > >> > > Sophie
> > > > > > > > > >> > >
> > > > > > > > > >> >
> > > > > > > > > >>
> > > > > > > > > >
> > > > > > > > >
> > > > > > > >
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
>


-- 
-- Guozhang
