Alright, here's the detailed proposal for KAFKA-12477. This assumes we will change the default assignor to ["cooperative-sticky", "range"] in KIP-726. It also acknowledges that users may attempt any kind of upgrade without reading the docs, and so we need to put in safeguards against data corruption rather than assume everyone will follow the safe upgrade path.
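For reference, a minimal sketch of how an application could request the same assignor ordering explicitly. "cooperative-sticky" and "range" are the assignors' protocol names; the consumer's partition.assignment.strategy config takes the corresponding class names. The class AssignorConfigSketch and its method are hypothetical, and the string key is used only so that the snippet compiles without the Kafka client on the classpath.

```java
import java.util.Properties;

/** Hypothetical helper for illustration; not part of the Kafka client. */
final class AssignorConfigSketch {

    /** Builds consumer properties that list cooperative-sticky first and range second. */
    static Properties consumerProps() {
        Properties props = new Properties();
        // Order matters: the list is in preference order, most preferred first.
        props.setProperty("partition.assignment.strategy",
                "org.apache.kafka.clients.consumer.CooperativeStickyAssignor,"
              + "org.apache.kafka.clients.consumer.RangeAssignor");
        return props;
    }
}
```

A real application would add the usual settings (bootstrap servers, group id, deserializers) before handing these properties to a KafkaConsumer.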
With this proposal:
1) New applications on 3.0 will enable cooperative rebalancing by default
2) Existing applications which don't set an assignor can safely upgrade to 3.0 using a single rolling bounce with no extra steps, and will automatically transition to cooperative rebalancing
3) Existing applications which do set an assignor that uses EAGER can likewise upgrade to COOPERATIVE with a single rolling bounce
4) Once on 3.0, applications can safely go back and forth between EAGER and COOPERATIVE
5) Applications can safely downgrade away from 3.0

The high-level idea for dynamic protocol upgrades is that the group will leverage the assignor selected by the group coordinator to determine when it's safe to upgrade to COOPERATIVE, and trigger a fail-safe to protect the group in case of rare events or user misconfiguration. The group coordinator selects the most preferred assignor that's supported by all members of the group, so once we receive the "cooperative-sticky" assignor after a rebalance, we know that all members support COOPERATIVE. At this point, each member can upgrade its own protocol to COOPERATIVE.

However, there may be situations in which an EAGER member joins the group even after the other members have upgraded to COOPERATIVE. For example, during a rolling upgrade, if the last remaining member on the old bytecode misses a rebalance, the other members will be allowed to upgrade to COOPERATIVE. If the old member then rejoins and is chosen to be the group leader before it's upgraded to 3.0, it won't be aware when computing the assignment that the other members of the group have not yet revoked their partitions.

Short Circuit:
The risk of mixing the cooperative and eager rebalancing protocols is that a partition may be assigned to one member while it has yet to be revoked from its previous owner.
The danger is that the new owner may begin processing and committing offsets for this partition while the previous owner is also committing offsets in its #onPartitionsRevoked callback, which in the cooperative protocol is invoked at the end of the rebalance. This can result in these consumers overwriting each other's offsets and getting a corrupted view of the partition. Note that it's not possible to commit during a rebalance, so we can protect against offset corruption by blocking further commits after we detect that the group leader may not understand COOPERATIVE, but before we invoke #onPartitionsRevoked. This is the "short-circuit": if we detect that the group is in an unsafe state, we invoke #onPartitionsLost instead of #onPartitionsRevoked and explicitly prevent offsets from being committed on those revoked partitions.

Consumer procedure:
Upon startup, the consumer will initially select the highest commonly-supported protocol across its configured assignors. With ["cooperative-sticky", "range"], the initial protocol will be EAGER when the member first joins the group. Following a rebalance, each member will check the selected assignor.

If the chosen assignor supports COOPERATIVE, the member can upgrade its protocol to COOPERATIVE and no further action is required.

If the member is already on COOPERATIVE but the selected assignor does NOT support it, then we need to trigger the short-circuit. In this case we will invoke #onPartitionsLost instead of #onPartitionsRevoked, and set a flag to block any attempts at committing those partitions which have been revoked. If a commit is attempted, as may be the case if the user does not implement #onPartitionsLost (see KAFKA-12638), we will throw a CommitFailedException which will be bubbled up through poll() after completing the rebalance. The member will then downgrade its protocol to EAGER for the next rebalance.
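To make the selection rule and the consumer procedure concrete, here is a rough sketch of the per-member logic. It is a toy model and not the actual consumer code: every name in it (DynamicUpgradeSketch, selectAssignor, onRebalanceComplete, Outcome) is hypothetical, the coordinator is reduced to "first assignor in preference order that every member supports", IllegalStateException stands in for CommitFailedException, and the proposal does not say when the commit-blocking flag is cleared, so this sketch never clears it.

```java
import java.util.HashSet;
import java.util.List;
import java.util.Optional;
import java.util.Set;

/** Illustrative model only; none of these names exist in the Kafka client. */
final class DynamicUpgradeSketch {

    enum Protocol { EAGER, COOPERATIVE }
    enum Outcome { STAY_EAGER, USE_COOPERATIVE, SHORT_CIRCUIT }

    /** Simplified coordinator: first assignor in preference order that every member supports. */
    static Optional<String> selectAssignor(List<String> preferenceOrder,
                                           List<List<String>> supportedByMember) {
        return preferenceOrder.stream()
                .filter(name -> supportedByMember.stream().allMatch(m -> m.contains(name)))
                .findFirst();
    }

    /** Assumption: "cooperative-sticky" is the only COOPERATIVE assignor in play. */
    static boolean supportsCooperative(String assignor) {
        return "cooperative-sticky".equals(assignor);
    }

    // With ["cooperative-sticky", "range"], a member starts out on EAGER.
    private Protocol protocol = Protocol.EAGER;
    // Partitions whose offsets must not be committed after a short-circuit.
    private final Set<String> commitBlocked = new HashSet<>();

    /** Called after a rebalance with the selected assignor and the partitions this member gave up. */
    Outcome onRebalanceComplete(String selectedAssignor, Set<String> revokedPartitions) {
        if (supportsCooperative(selectedAssignor)) {
            protocol = Protocol.COOPERATIVE;
            return Outcome.USE_COOPERATIVE;
        }
        if (protocol == Protocol.COOPERATIVE) {
            // Short-circuit: the real consumer would invoke #onPartitionsLost here
            // instead of #onPartitionsRevoked.
            commitBlocked.addAll(revokedPartitions);
            protocol = Protocol.EAGER; // downgrade for the next rebalance
            return Outcome.SHORT_CIRCUIT;
        }
        return Outcome.STAY_EAGER;
    }

    /** Rejects commits for partitions revoked during a short-circuit. */
    void commit(String partition) {
        if (commitBlocked.contains(partition)) {
            throw new IllegalStateException("commit blocked for " + partition);
        }
    }

    Protocol protocol() {
        return protocol;
    }

    public static void main(String[] args) {
        List<String> upgraded = List.of("cooperative-sticky", "range");
        List<String> old = List.of("range");
        DynamicUpgradeSketch member = new DynamicUpgradeSketch();

        // All members upgraded: the member moves to COOPERATIVE.
        String first = selectAssignor(upgraded, List.of(upgraded, upgraded)).orElseThrow();
        System.out.println(first + " -> " + member.onRebalanceComplete(first, Set.of()));

        // An old member rejoins: "range" is selected and the short-circuit fires.
        String second = selectAssignor(upgraded, List.of(upgraded, old)).orElseThrow();
        System.out.println(second + " -> " + member.onRebalanceComplete(second, Set.of("topic-0")));
    }
}
```

The point of the model is that the decision is purely local: each member only needs the name of the selected assignor and its own current protocol, which is why no extra field in the Assignment is required for this part.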
Let me know what you think,
Sophie

On Fri, Apr 2, 2021 at 7:08 PM Luke Chen <show...@gmail.com> wrote:

> Hi Sophie,
> Making the default to "cooperative-sticky, range" is a smart idea, to
> ensure we can at least fall back to rangeAssignor if consumers are not
> following our recommended upgrade path. I updated the KIP accordingly.
>
> Hi Chris,
> No problem, I updated the KIP to include the change in Connect.
>
> Thank you very much.
>
> Luke
>
> On Thu, Apr 1, 2021 at 3:24 AM Chris Egerton <chr...@confluent.io.invalid>
> wrote:
>
> > Hi all,
> >
> > @Sophie - I like the sound of the dual-protocol default. The smooth
> > upgrade path it permits sounds fantastic!
> >
> > @Luke - Do you think we can also include Connect in this KIP? Right now
> > we don't set any custom partition assignment strategies for the consumer
> > groups we bring up for sink tasks, and if we continue to just use the
> > default, the assignment strategy for those consumer groups would change
> > on Connect clusters once people upgrade to 3.0. I think this is fine
> > (assuming we can take care of
> > https://issues.apache.org/jira/browse/KAFKA-12487
> > before then, which I'm fairly optimistic about), but it might be worth a
> > sentence or two in the KIP explaining that the change in default will
> > intentionally propagate to Connect. And, if we think Connect should be
> > left out of this change and stay on the range assignor instead, we should
> > probably call that fact out in the KIP as well and state that Connect
> > will now override the default partition assignment strategy to be the
> > range assignor (assuming the user hasn't specified a value for
> > consumer.partition.assignment.strategy in their worker config or for
> > consumer.override.partition.assignment.strategy in their connector
> > config).
> >
> > Cheers,
> >
> > Chris
> >
> > On Wed, Mar 31, 2021 at 12:18 AM Sophie Blee-Goldman
> > <sop...@confluent.io.invalid> wrote:
> >
> > > Ok I'm still fleshing out all the details of KAFKA-12477 but I think we
> > > can simplify some things a bit, and avoid any kind of "fail-fast" which
> > > will require user intervention. In fact I think we can avoid requiring
> > > the user to make any changes at all for KIP-726, so we don't have to
> > > worry about whether they actually read our documentation:
> > >
> > > Instead of making ["cooperative-sticky"] the default, we change the
> > > default to ["cooperative-sticky", "range"].
> > > Since "range" is the old default, this is equivalent to the first
> > > rolling bounce of the safe upgrade path in KIP-429.
> > >
> > > Of course this also means that under the current protocol selection
> > > mechanism we won't actually upgrade to cooperative rebalancing with the
> > > default assignor. But that's where KAFKA-12477 will come in.
> > >
> > > @Guozhang Wang <guozh...@confluent.io> I'll get back to you with a
> > > concrete proposal and answer your questions, I just want to point out
> > > that it's possible to side-step the risk of users shooting themselves
> > > in the foot (well, at least in this one specific case,
> > > obviously they always find a way)
> > >
> > > On Tue, Mar 30, 2021 at 10:37 AM Guozhang Wang <wangg...@gmail.com>
> > > wrote:
> > >
> > > > Hi Sophie,
> > > >
> > > > My question is more related to KAFKA-12477, but since your latest
> > > > replies are on this thread I figured we can follow-up on the same
> > > > venue. Just so I understand your latest comments above about the
> > > > approach:
> > > >
> > > > * I think, we would need to persist this decision so that the group
> > > > would never go back to the eager protocol, this bit would be written
> > > > to the internal topic's assignment message. Is that correct?
> > > > * Maybe you can describe the steps, after the group has decided to
> > > > move forward with cooperative protocols, when:
> > > > 1) a new member joined the group with the old version, and hence only
> > > > recognized eager protocol and executing the eager protocol with its
> > > > first rebalance, what would happen.
> > > > 2) in addition to 1), the new member joined the group with the old
> > > > version and only recognized the old subscription format, and was
> > > > selected as the leader, what would happen.
> > > >
> > > > Guozhang
> > > >
> > > > On Mon, Mar 29, 2021 at 10:30 PM Luke Chen <show...@gmail.com> wrote:
> > > >
> > > > > Hi Sophie & Ismael,
> > > > > Thank you for your feedback.
> > > > > No problem, let's pause this KIP and wait for this improvement:
> > > > > KAFKA-12477 <https://issues.apache.org/jira/browse/KAFKA-12477>.
> > > > >
> > > > > Stay tuned :)
> > > > >
> > > > > Thank you.
> > > > > Luke
> > > > >
> > > > > On Tue, Mar 30, 2021 at 3:14 AM Ismael Juma <ism...@juma.me.uk>
> > > > > wrote:
> > > > >
> > > > > > Hi Sophie,
> > > > > >
> > > > > > I didn't analyze the KIP in detail, but the two suggestions you
> > > > > > mentioned sound like great improvements.
> > > > > >
> > > > > > A bit more context: breaking changes for a widely used product
> > > > > > like Kafka are costly and hence why we try as hard as we can to
> > > > > > avoid them. When it comes to the brokers, they are often managed
> > > > > > by a central group (or they're in the Cloud), so they're a bit
> > > > > > easier to manage. Even so, it's still possible to upgrade from
> > > > > > 0.8.x directly to 2.7 since all protocol versions are still
> > > > > > supported. When it comes to the basic clients (producer,
> > > > > > consumer, admin client), they're often embedded in applications
> > > > > > so we have to be even more conservative.
> > > > > >
> > > > > > Ismael
> > > > > >
> > > > > > On Mon, Mar 29, 2021 at 10:50 AM Sophie Blee-Goldman
> > > > > > <sop...@confluent.io.invalid> wrote:
> > > > > >
> > > > > > > Ismael,
> > > > > > >
> > > > > > > It seems like given 3.0 is a breaking release, we have to rely
> > > > > > > on users being aware of this and responsible enough to read the
> > > > > > > upgrade guide. Otherwise we could never ever make any breaking
> > > > > > > changes beyond just removing deprecated APIs or other
> > > > > > > compilation-breaking errors that would be immediately visible,
> > > > > > > no?
> > > > > > >
> > > > > > > That said, obviously it's better to have a circuit-breaker that
> > > > > > > will fail fast in case of a user misconfiguration rather than
> > > > > > > silently corrupting the consumer group state -- eg for two
> > > > > > > consumers to overlap in their ownership of the same
> > > > > > > partition(s). We could definitely implement this, and now that
> > > > > > > I think about it this might solve a related problem in
> > > > > > > KAFKA-12477
> > > > > > > <https://issues.apache.org/jira/browse/KAFKA-12477>. We just
> > > > > > > add a new field to the Assignment in which the group leader
> > > > > > > indicates whether it's on a recent enough version to understand
> > > > > > > cooperative rebalancing. If an upgraded member joins the group,
> > > > > > > it'll only be allowed to start following the new rebalancing
> > > > > > > protocol after receiving the go-ahead from the group leader.
> > > > > > >
> > > > > > > If we do go ahead and add this new field in the Assignment then
> > > > > > > I'm pretty confident we can reduce the number of required
> > > > > > > rolling bounces to just one with KAFKA-12477
> > > > > > > <https://issues.apache.org/jira/browse/KAFKA-12477>.
> > > > > > > In that case we should be in much better shape to feel good
> > > > > > > about changing the default to the CooperativeStickyAssignor.
> > > > > > > How does that sound?
> > > > > > >
> > > > > > > To be clear, I'm not proposing we do this as part of KIP-726.
> > > > > > > Here's my take:
> > > > > > >
> > > > > > > Let's pause this KIP while I work on making these two
> > > > > > > improvements in KAFKA-12477
> > > > > > > <https://issues.apache.org/jira/browse/KAFKA-12477>. Once I can
> > > > > > > confirm the short-circuit and single rolling bounce will be
> > > > > > > available for 3.0, I'll report back on this thread. Then we can
> > > > > > > move forward with this KIP again.
> > > > > > >
> > > > > > > Thoughts?
> > > > > > > Sophie
> > > > > > >
> > > > > > > On Mon, Mar 29, 2021 at 12:01 AM Luke Chen <show...@gmail.com>
> > > > > > > wrote:
> > > > > > >
> > > > > > > > Hi Ismael,
> > > > > > > > Thanks for your good question. Answer them below:
> > > > > > > > *1. Are we saying that every consumer upgraded would have to
> > > > > > > > follow the complex path described in the KIP? *
> > > > > > > > --> We suggest that every consumer did these 2 steps of
> > > > > > > > rolling upgrade. And after KAFKA-12477
> > > > > > > > <https://issues.apache.org/jira/browse/KAFKA-12477>
> > > > > > > > is completed, it can be reduced to 1 rolling upgrade.
> > > > > > > >
> > > > > > > > *2. what happens if they don't read the instructions and
> > > > > > > > upgrade as they have in the past?*
> > > > > > > > --> The reason we want 2 steps of rolling upgrade is that we
> > > > > > > > want to avoid the situation where leader is on old byte-code
> > > > > > > > and only recognize "eager", but due to compatibility would
> > > > > > > > still be able to deserialize the new protocol data from newer
> > > > > > > > versioned members, and hence just go ahead and do the
> > > > > > > > assignment while new versioned members did not revoke their
> > > > > > > > partitions before joining the group.
> > > > > > > >
> > > > > > > > But I'd say, the new default assignor
> > > > > > > > "CooperativeStickyAssignor" was already introduced in V2.4.0,
> > > > > > > > and it should be long enough for user to upgrade to the new
> > > > > > > > byte-code to recognize the "cooperative" protocol.
> > > > > > > >
> > > > > > > > What do you think?
> > > > > > > >
> > > > > > > > Thank you.
> > > > > > > > Luke
> > > > > > > >
> > > > > > > > On Mon, Mar 29, 2021 at 12:14 PM Ismael Juma
> > > > > > > > <ism...@juma.me.uk> wrote:
> > > > > > > >
> > > > > > > > > Thanks for the KIP. Are we saying that every consumer
> > > > > > > > > upgraded would have to follow the complex path described in
> > > > > > > > > the KIP? Also, what happens if they don't read the
> > > > > > > > > instructions and upgrade as they have in the past?
> > > > > > > > >
> > > > > > > > > Ismael
> > > > > > > > >
> > > > > > > > > On Fri, Mar 26, 2021, 1:53 AM Luke Chen <show...@gmail.com>
> > > > > > > > > wrote:
> > > > > > > > >
> > > > > > > > > > Hi everyone,
> > > > > > > > > > <Update the subject>
> > > > > > > > > >
> > > > > > > > > > I'd like to discuss the following proposal to make the
> > > > > > > > > > CooperativeStickyAssignor as the default assignor.
> > > > > > > > > >
> > > > > > > > > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-726%3A+Make+the+CooperativeStickyAssignor+as+the+default+assignor
> > > > > > > > > >
> > > > > > > > > > Any comments are welcomed.
> > > > > > > > > >
> > > > > > > > > > Thank you.
> > > > > > > > > > Luke
> > > >
> > > > --
> > > > -- Guozhang