1) Yes, all of the above will be part of KAFKA-12477 (not KIP-726)

2) No, KAFKA-12638 would be nice to have, but I don't think it's appropriate
to remove the default implementation of #onPartitionsLost in 3.0, since we
have not yet given any indication that we intend to remove it.

3) Yes, this would be similar to when a Consumer drops out of the group. It
has always been possible for a member to miss a rebalance and have its
partition reassigned to another member, during which time both members
would claim to own that partition. But this is safe because the member who
dropped out is blocked from committing offsets on that partition.
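
To make the short-circuit a bit more concrete, here is a rough sketch of the
consumer-side procedure from the proposal quoted below. It is only a toy
model under my own assumptions, not the actual consumer code: ProtocolTracker,
onAssignorSelected, and canCommit are made-up names for illustration, and the
real client would throw a CommitFailedException from the commit path rather
than expose a boolean check.

```java
import java.util.HashSet;
import java.util.Set;

// Hypothetical model of the per-member protocol tracking described in the
// proposal. None of these names exist in the Kafka client.
class ProtocolTracker {
    enum Protocol { EAGER, COOPERATIVE }
    enum Callback { ON_PARTITIONS_REVOKED, ON_PARTITIONS_LOST }

    // With ["cooperative-sticky", "range"] the member starts out on EAGER.
    private Protocol current = Protocol.EAGER;

    // Partitions on which commits are blocked after a short-circuit.
    private final Set<String> commitBlocked = new HashSet<>();

    Protocol current() {
        return current;
    }

    // Called after a rebalance with the assignor the coordinator selected.
    // Returns which listener callback should handle the revoked partitions.
    Callback onAssignorSelected(boolean assignorSupportsCooperative,
                                Set<String> revokedPartitions) {
        if (assignorSupportsCooperative) {
            // Every member supports COOPERATIVE, so it is safe to upgrade.
            current = Protocol.COOPERATIVE;
            return Callback.ON_PARTITIONS_REVOKED;
        }
        if (current == Protocol.COOPERATIVE) {
            // Short-circuit: the leader may not understand COOPERATIVE.
            // Block commits on the revoked partitions, treat them as lost,
            // and downgrade to EAGER for the next rebalance.
            commitBlocked.addAll(revokedPartitions);
            current = Protocol.EAGER;
            return Callback.ON_PARTITIONS_LOST;
        }
        // Already on EAGER with an EAGER assignor: nothing special to do.
        return Callback.ON_PARTITIONS_REVOKED;
    }

    // Stand-in for the commit path; false corresponds to the case where the
    // real client would raise CommitFailedException.
    boolean canCommit(String partition) {
        return !commitBlocked.contains(partition);
    }
}
```

In this model a member that was on COOPERATIVE and then sees "range" selected
gets ON_PARTITIONS_LOST and can no longer commit the revoked partitions, which
is the same guarantee that makes the dropped-out-member case in 3) safe.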

On Fri, Apr 9, 2021 at 2:46 AM Luke Chen <show...@gmail.com> wrote:

> Hi Sophie,
> That sounds great, and it takes care of every case I can think of.
> Questions:
> 1. Do you mean the short-circuit will also be implemented in KAFKA-12477?
> 2. I don't think KAFKA-12638 is a blocker for KIP-726. Am I right?
> 3. So, does that mean it is still possible for multiple consumers to own
> the same topic partition? And in that situation, we prevent them from
> committing and wait for the next rebalance (which should be soon). Is my
> understanding correct?
>
> Thank you very much for finding this great solution.
>
> Luke
>
> On Fri, Apr 9, 2021 at 11:37 AM Sophie Blee-Goldman
> <sop...@confluent.io.invalid> wrote:
>
> > Alright, here's the detailed proposal for KAFKA-12477. This assumes we
> > will change the default assignor to ["cooperative-sticky", "range"] in
> > KIP-726. It also acknowledges that users may attempt any kind of upgrade
> > without reading the docs, and so we need to put in safeguards against
> > data corruption rather than assume everyone will follow the safe upgrade
> > path.
> >
> > With this proposal,
> > 1) New applications on 3.0 will enable cooperative rebalancing by default
> > 2) Existing applications which don’t set an assignor can safely upgrade
> > to 3.0 using a single rolling bounce with no extra steps, and will
> > automatically transition to cooperative rebalancing
> > 3) Existing applications which do set an assignor that uses EAGER can
> > likewise upgrade their applications to COOPERATIVE with a single rolling
> > bounce
> > 4) Once on 3.0, applications can safely go back and forth between EAGER
> > and COOPERATIVE
> > 5) Applications can safely downgrade away from 3.0
> >
> > The high-level idea for dynamic protocol upgrades is that the group will
> > leverage the assignor selected by the group coordinator to determine when
> > it’s safe to upgrade to COOPERATIVE, and trigger a fail-safe to protect
> > the group in case of rare events or user misconfiguration. The group
> > coordinator selects the most preferred assignor that’s supported by all
> > members of the group, so we know that all members will support
> > COOPERATIVE once we receive the “cooperative-sticky” assignor after a
> > rebalance. At this point, each member can upgrade their own protocol to
> > COOPERATIVE. However, there may be situations in which an EAGER member
> > may join the group even after upgrading to COOPERATIVE. For example,
> > during a rolling upgrade if the last remaining member on the old bytecode
> > misses a rebalance, the other members will be allowed to upgrade to
> > COOPERATIVE. If the old member rejoins and is chosen to be the group
> > leader before it’s upgraded to 3.0, it won’t be aware that the other
> > members of the group have not yet revoked their partitions when computing
> > the assignment.
> >
> > Short Circuit:
> > The risk of mixing the cooperative and eager rebalancing protocols is
> > that a partition may be assigned to one member while it has yet to be
> > revoked from its previous owner. The danger is that the new owner may
> > begin processing and committing offsets for this partition while the
> > previous owner is also committing offsets in its #onPartitionsRevoked
> > callback, which is invoked at the end of the rebalance in the cooperative
> > protocol. This can result in these consumers overwriting each other’s
> > offsets and getting a corrupted view of the partition. Note that it’s not
> > possible to commit during a rebalance, so we can protect against offset
> > corruption by blocking further commits after we detect that the group
> > leader may not understand COOPERATIVE, but before we invoke
> > #onPartitionsRevoked. This is the “short-circuit”: if we detect that the
> > group is in an unsafe state, we invoke #onPartitionsLost instead of
> > #onPartitionsRevoked and explicitly prevent offsets from being committed
> > on those revoked partitions.
> >
> > Consumer procedure:
> > Upon startup, the consumer will initially select the highest
> > commonly-supported protocol across its configured assignors. With
> > ["cooperative-sticky", "range"], the initial protocol will be EAGER when
> > the member first joins the group. Following a rebalance, each member will
> > check the selected assignor. If the chosen assignor supports COOPERATIVE,
> > the member can upgrade their used protocol to COOPERATIVE and no further
> > action is required. If the member is already on COOPERATIVE but the
> > selected assignor does NOT support it, then we need to trigger the
> > short-circuit. In this case we will invoke #onPartitionsLost instead of
> > #onPartitionsRevoked, and set a flag to block any attempts at committing
> > those partitions which have been revoked. If a commit is attempted, as
> > may be the case if the user does not implement #onPartitionsLost (see
> > KAFKA-12638), we will throw a CommitFailedException which will be bubbled
> > up through poll() after completing the rebalance. The member will then
> > downgrade its protocol to EAGER for the next rebalance.
> >
> > Let me know what you think,
> > Sophie
> >
> > On Fri, Apr 2, 2021 at 7:08 PM Luke Chen <show...@gmail.com> wrote:
> >
> > > Hi Sophie,
> > > Making the default "cooperative-sticky, range" is a smart idea, to
> > > ensure we can at least fall back to the RangeAssignor if consumers are
> > > not following our recommended upgrade path. I updated the KIP
> > > accordingly.
> > >
> > > Hi Chris,
> > > No problem, I updated the KIP to include the change in Connect.
> > >
> > > Thank you very much.
> > >
> > > Luke
> > >
> > > On Thu, Apr 1, 2021 at 3:24 AM Chris Egerton
> > > <chr...@confluent.io.invalid> wrote:
> > >
> > > > Hi all,
> > > >
> > > > @Sophie - I like the sound of the dual-protocol default. The smooth
> > > > upgrade path it permits sounds fantastic!
> > > >
> > > > @Luke - Do you think we can also include Connect in this KIP? Right
> > > > now we don't set any custom partition assignment strategies for the
> > > > consumer groups we bring up for sink tasks, and if we continue to just
> > > > use the default, the assignment strategy for those consumer groups
> > > > would change on Connect clusters once people upgrade to 3.0. I think
> > > > this is fine (assuming we can take care of
> > > > https://issues.apache.org/jira/browse/KAFKA-12487 before then, which
> > > > I'm fairly optimistic about), but it might be worth a sentence or two
> > > > in the KIP explaining that the change in default will intentionally
> > > > propagate to Connect. And, if we think Connect should be left out of
> > > > this change and stay on the range assignor instead, we should probably
> > > > call that fact out in the KIP as well and state that Connect will now
> > > > override the default partition assignment strategy to be the range
> > > > assignor (assuming the user hasn't specified a value for
> > > > consumer.partition.assignment.strategy in their worker config or for
> > > > consumer.override.partition.assignment.strategy in their connector
> > > > config).
> > > >
> > > > Cheers,
> > > >
> > > > Chris
> > > >
> > > > On Wed, Mar 31, 2021 at 12:18 AM Sophie Blee-Goldman
> > > > <sop...@confluent.io.invalid> wrote:
> > > >
> > > > > Ok I'm still fleshing out all the details of KAFKA-12477 but I think
> > > > > we can simplify some things a bit, and avoid any kind of "fail-fast"
> > > > > which will require user intervention. In fact I think we can avoid
> > > > > requiring the user to make any changes at all for KIP-726, so we
> > > > > don't have to worry about whether they actually read our
> > > > > documentation:
> > > > >
> > > > > Instead of making ["cooperative-sticky"] the default, we change the
> > > > > default to ["cooperative-sticky", "range"]. Since "range" is the old
> > > > > default, this is equivalent to the first rolling bounce of the safe
> > > > > upgrade path in KIP-429.
> > > > >
> > > > > Of course this also means that under the current protocol selection
> > > > > mechanism we won't actually upgrade to cooperative rebalancing with
> > > > > the default assignor. But that's where KAFKA-12477 will come in.
> > > > >
> > > > > @Guozhang Wang <guozh...@confluent.io>  I'll get back to you with a
> > > > > concrete proposal and answer your questions, I just want to point
> > > > > out that it's possible to side-step the risk of users shooting
> > > > > themselves in the foot (well, at least in this one specific case,
> > > > > obviously they always find a way)
> > > > >
> > > > > On Tue, Mar 30, 2021 at 10:37 AM Guozhang Wang <wangg...@gmail.com>
> > > > > wrote:
> > > > >
> > > > > > Hi Sophie,
> > > > > >
> > > > > > My question is more related to KAFKA-12477, but since your latest
> > > > > > replies are on this thread I figured we can follow up in the same
> > > > > > venue. Just so I understand your latest comments above about the
> > > > > > approach:
> > > > > >
> > > > > > * I think we would need to persist this decision so that the group
> > > > > > would never go back to the eager protocol; this bit would be
> > > > > > written to the internal topic's assignment message. Is that
> > > > > > correct?
> > > > > > * Maybe you can describe the steps, after the group has decided to
> > > > > > move forward with cooperative protocols, when:
> > > > > > 1) a new member joined the group with the old version, and hence
> > > > > > only recognized the eager protocol and executed the eager protocol
> > > > > > with its first rebalance, what would happen.
> > > > > > 2) in addition to 1), the new member joined the group with the old
> > > > > > version and only recognized the old subscription format, and was
> > > > > > selected as the leader, what would happen.
> > > > > >
> > > > > > Guozhang
> > > > > >
> > > > > >
> > > > > >
> > > > > >
> > > > > > On Mon, Mar 29, 2021 at 10:30 PM Luke Chen <show...@gmail.com>
> > > > > > wrote:
> > > > > >
> > > > > > > Hi Sophie & Ismael,
> > > > > > > Thank you for your feedback.
> > > > > > > No problem, let's pause this KIP and wait for this improvement:
> > > > > > > KAFKA-12477 <https://issues.apache.org/jira/browse/KAFKA-12477>.
> > > > > > >
> > > > > > > Stay tuned :)
> > > > > > >
> > > > > > > Thank you.
> > > > > > > Luke
> > > > > > >
> > > > > > > On Tue, Mar 30, 2021 at 3:14 AM Ismael Juma <ism...@juma.me.uk>
> > > > > > > wrote:
> > > > > > >
> > > > > > > > Hi Sophie,
> > > > > > > >
> > > > > > > > I didn't analyze the KIP in detail, but the two suggestions you
> > > > > > > > mentioned sound like great improvements.
> > > > > > > >
> > > > > > > > A bit more context: breaking changes for a widely used product
> > > > > > > > like Kafka are costly, which is why we try as hard as we can to
> > > > > > > > avoid them. When it comes to the brokers, they are often managed
> > > > > > > > by a central group (or they're in the Cloud), so they're a bit
> > > > > > > > easier to manage. Even so, it's still possible to upgrade from
> > > > > > > > 0.8.x directly to 2.7 since all protocol versions are still
> > > > > > > > supported. When it comes to the basic clients (producer,
> > > > > > > > consumer, admin client), they're often embedded in applications
> > > > > > > > so we have to be even more conservative.
> > > > > > > >
> > > > > > > > Ismael
> > > > > > > >
> > > > > > > > On Mon, Mar 29, 2021 at 10:50 AM Sophie Blee-Goldman
> > > > > > > > <sop...@confluent.io.invalid> wrote:
> > > > > > > >
> > > > > > > > > Ismael,
> > > > > > > > >
> > > > > > > > > It seems like given 3.0 is a breaking release, we have to rely
> > > > > > > > > on users being aware of this and responsible enough to read
> > > > > > > > > the upgrade guide. Otherwise we could never ever make any
> > > > > > > > > breaking changes beyond just removing deprecated APIs or other
> > > > > > > > > compilation-breaking errors that would be immediately visible,
> > > > > > > > > no?
> > > > > > > > >
> > > > > > > > > That said, obviously it's better to have a circuit-breaker
> > > > > > > > > that will fail fast in case of a user misconfiguration rather
> > > > > > > > > than silently corrupting the consumer group state -- eg for
> > > > > > > > > two consumers to overlap in their ownership of the same
> > > > > > > > > partition(s). We could definitely implement this, and now that
> > > > > > > > > I think about it this might solve a related problem in
> > > > > > > > > KAFKA-12477
> > > > > > > > > <https://issues.apache.org/jira/browse/KAFKA-12477>. We just
> > > > > > > > > add a new field to the Assignment in which the group leader
> > > > > > > > > indicates whether it's on a recent enough version to
> > > > > > > > > understand cooperative rebalancing. If an upgraded member
> > > > > > > > > joins the group, it'll only be allowed to start following the
> > > > > > > > > new rebalancing protocol after receiving the go-ahead from the
> > > > > > > > > group leader.
> > > > > > > > >
> > > > > > > > > If we do go ahead and add this new field in the Assignment
> > > > > > > > > then I'm pretty confident we can reduce the number of required
> > > > > > > > > rolling bounces to just one with KAFKA-12477
> > > > > > > > > <https://issues.apache.org/jira/browse/KAFKA-12477>. In that
> > > > > > > > > case we should be in much better shape to feel good about
> > > > > > > > > changing the default to the CooperativeStickyAssignor. How
> > > > > > > > > does that sound?
> > > > > > > > >
> > > > > > > > > To be clear, I'm not proposing we do this as part of KIP-726.
> > > > > > > > > Here's my take:
> > > > > > > > >
> > > > > > > > > Let's pause this KIP while I work on making these two
> > > > > > > > > improvements in KAFKA-12477
> > > > > > > > > <https://issues.apache.org/jira/browse/KAFKA-12477>. Once I
> > > > > > > > > can confirm the short-circuit and single rolling bounce will
> > > > > > > > > be available for 3.0, I'll report back on this thread. Then we
> > > > > > > > > can move forward with this KIP again.
> > > > > > > > >
> > > > > > > > > Thoughts?
> > > > > > > > > Sophie
> > > > > > > > >
> > > > > > > > > On Mon, Mar 29, 2021 at 12:01 AM Luke Chen <show...@gmail.com>
> > > > > > > > > wrote:
> > > > > > > > >
> > > > > > > > > > Hi Ismael,
> > > > > > > > > > Thanks for your good questions. Answers below:
> > > > > > > > > > *1. Are we saying that every consumer upgraded would have to
> > > > > > > > > > follow the complex path described in the KIP?*
> > > > > > > > > > --> We suggest that every consumer perform these 2 steps of
> > > > > > > > > > rolling upgrade. And after KAFKA-12477
> > > > > > > > > > <https://issues.apache.org/jira/browse/KAFKA-12477> is
> > > > > > > > > > completed, it can be reduced to 1 rolling upgrade.
> > > > > > > > > >
> > > > > > > > > > *2. What happens if they don't read the instructions and
> > > > > > > > > > upgrade as they have in the past?*
> > > > > > > > > > --> The reason we want 2 steps of rolling upgrade is that we
> > > > > > > > > > want to avoid the situation where the leader is on old
> > > > > > > > > > byte-code and only recognizes "eager", but due to
> > > > > > > > > > compatibility would still be able to deserialize the new
> > > > > > > > > > protocol data from newer versioned members, and hence just
> > > > > > > > > > goes ahead and does the assignment while new versioned
> > > > > > > > > > members did not revoke their partitions before joining the
> > > > > > > > > > group.
> > > > > > > > > >
> > > > > > > > > > But I'd say, the new default assignor
> > > > > > > > > > "CooperativeStickyAssignor" was already introduced in
> > > > > > > > > > V2.4.0, and that should be long enough for users to have
> > > > > > > > > > upgraded to byte-code that recognizes the "cooperative"
> > > > > > > > > > protocol.
> > > > > > > > > >
> > > > > > > > > > What do you think?
> > > > > > > > > >
> > > > > > > > > > Thank you.
> > > > > > > > > > Luke
> > > > > > > > > >
> > > > > > > > > > On Mon, Mar 29, 2021 at 12:14 PM Ismael Juma
> > > > > > > > > > <ism...@juma.me.uk> wrote:
> > > > > > > > > >
> > > > > > > > > > > Thanks for the KIP. Are we saying that every consumer
> > > > > > > > > > > upgraded would have to follow the complex path described
> > > > > > > > > > > in the KIP? Also, what happens if they don't read the
> > > > > > > > > > > instructions and upgrade as they have in the past?
> > > > > > > > > > >
> > > > > > > > > > > Ismael
> > > > > > > > > > >
> > > > > > > > > > > On Fri, Mar 26, 2021, 1:53 AM Luke Chen <show...@gmail.com>
> > > > > > > > > > > wrote:
> > > > > > > > > > >
> > > > > > > > > > > > Hi everyone,
> > > > > > > > > > > > <Update the subject>
> > > > > > > > > > > >
> > > > > > > > > > > > I'd like to discuss the following proposal to make the
> > > > > > > > > > > > CooperativeStickyAssignor the default assignor.
> > > > > > > > > > > >
> > > > > > > > > > > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-726%3A+Make+the+CooperativeStickyAssignor+as+the+default+assignor
> > > > > > > > > > > >
> > > > > > > > > > > > Any comments are welcomed.
> > > > > > > > > > > >
> > > > > > > > > > > > Thank you.
> > > > > > > > > > > > Luke
> > > > > > > > > > > >
> > > > > > > > > > >
> > > > > > > > > >
> > > > > > > > >
> > > > > > > >
> > > > > > >
> > > > > >
> > > > > >
> > > > > > --
> > > > > > -- Guozhang
> > > > > >
> > > > >
> > > >
> > >
> >
>
