Thanks for another detailed response Lucas! Especially w.r.t how the epochs
are defined. I also went back and re-read KIP-848 to refresh myself, I
guess it wasn't clear to me how much of the next-gen consumer protocol we
are reusing vs what's being built from the ground up.

S16.

>  It will become interesting when we want to include extra data after the
> initial protocol definition.

I guess that's what I'm asking us to define -- how do we evolve the schema
when expanding the protocol? I think we need to set a few public contracts
here, such as whether/which versions get bumped when. For example if I
add/modify fields twice within a single Kafka release, do we still need to
update the version? I think the answer is "yes" since I know you Confluent
folks upgrade brokers more often than AK releases, but this is different
from how eg the SubscriptionInfo/AssignmentInfo of the
StreamsPartitionAssignor works today, hence I want to put all that in
writing (for those of us who aren't familiar with how all this works
already and may be modifying it in the future, say, to add client-side
assignment :P)

Maybe a quick example of what to do to evolve this schema is sufficient?

S17.
The KIP seems to throw the assignor, group coordinator, topic
initialization, and topology versioning all into a single black box on the
broker. I understand much of this is intentionally being glossed over as an
implementation detail since it's a KIP, but it would really help to tease
apart these distinct players and define their interactions. For example,
what are the inputs and outputs to the assignor? For example could we get a
table like the "Data Model
<https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=217387038#KIP848:TheNextGenerationoftheConsumerRebalanceProtocol-DataModel>"
in 848?

KIP-848 goes into detail on these things, but again, it's unclear what
parts of 848 are and aren't supposed to be carried through. For example 848
has a concept of assignor "reasons" and client-side assignors and assignor
metadata which can all trigger a rebalance/group epoch bump. However in the
current plan for 1071 none of these exist. Which leads me to my next
point...

S18.
Many people use a managed Kafka service like Confluent Cloud where brokers
are on the bleeding edge but clients are upgraded less frequently. However,
the opposite is also true for a great many users: client applications are
easy to upgrade and bumped often to get new features, whereas the brokers
(often run by a different team altogether) are considered riskier to
upgrade and stay on one version for a long time. We can argue about which
case is more common -- in my personal experience it's actually the latter,
although I only realized this after my time at Confluent which was
obviously skewed towards meeting Confluent Cloud users -- but regardless,
we obviously need to support both cases as first-class citizens. And I'm a
bit concerned that there's not enough in the KIP to ensure that client-side
assignment won't end up getting hacked into the protocol as an afterthought.

Don't get me wrong, I'm still not demanding you guys implement client-side
assignors as part of this KIP. I'm happy to leave the implementation of
that to a followup, but I feel we need to flesh out the basic direction for
this right now, as part of the v0 Streams protocol.

I'm also not asking you to do this alone, and am happy to work with/for you
on this. I have one possible proposal sketched out already, although before
I share that I just wanted to make sure we're aligned on the fundamentals
and have mapped out the protocol in full first (ie S17). Just want to have
a rough plan for how clients-side assignment will fit into the picture, and
right now it's difficult to define because it's unclear where the
boundaries between the group coordinator and broker-side assignor are (eg
what are assignor inputs and outputs).

I'm committed to ensuring this doesn't add extra work or delay to this KIP
for you guys.

S19.
I think it could make sense to include a concept like the "assignor
reason"  from 848  as a top-level field and rebalance trigger (ie group
epoch bump). This would mainly be for the client-side assignor to trigger a
rebalance, but for another example, it seems like we want the brokers to be
constantly monitoring warmup tasks and processing lag updates to know when
a warmup task is to be switched, which is heavy work. Why not just let the
client who owns the warmup and knows the lag decide when it has warmed up
and signal the broker? It's a small thing for a client to check when its
task gets within the acceptable recovery lag and notify via the heartbeat
if so, but it seems like a rather big deal for the broker to be constantly
computing these checks for every task on every hb. Just food for thought,
I  don't really have a strong preference over where the warmup decision
happens -- simply pointing it out that it fits very neatly into the
"assignor reason"/"rebalance requested" framework. Making this a
client-side decision also makes it easier to evolve and customize since
Kafka Streams users tend to have more control over upgrading/implementing
stuff in their client application vs the brokers (which is often a
different team altogether)

S20.
To what extent do we plan to reuse implementations of 848 in order to build
the new Streams group protocol? I'm still having some trouble understanding
where we are just adding stuff on top and where we plan to rebuild from the
ground up. For example we seem to share most of the epoch reconciliation
stuff but will have different triggers for the group epoch bump. Is it
possible to just "plug in" the group epoch and let the consumer's group
coordinator figure stuff out from there? Will we have a completely distinct
Streams coordinator? I know this is an implementation detail but imo it's
ok to get in the weeds a bit for a large and complex KIP such as this.

Setting aside the topology initialization stuff, is the main difference
really just in how the group coordinator will be aware of tasks for Streams
groups vs partitions for consumer groups?

Which brings me to a high level question I feel I have to ask:

S21.
Can you go into more detail on why we have to provide Kafka Streams its own
RPCs as opposed to just utilizing the protocol from KIP-848? The Rejected
Alternatives does address this but the main argument there seems to be in
implementing a broker-side assignor. But can't you do this with 848? And
given there are users who rarely upgrade their brokers just as there are
users who rarely upgrade their clients, it seems silly to do all this work
and change the fundamental direction of Kafka Streams just to give more
control over the assignment to the brokers.

For a specific example, the KIP says "Client-side assignment makes it hard
to debug and tune the assignment logic" -- personally, I would find it
infinitely more difficult to tune and debug assignment logic if it were
happening on the broker. But that's just because I work with the app devs,
not the cluster operators. The point is we shouldn't design an entire
protocol to give preference to one vendor or another -- nobody wants the
ASF to start yelling at us  lol D:

Another example, which imo is a good one (IIUC) is that "a simple parameter
change requires redeploying of all streams clients". That's definitely a
huge bummer, but again, why would broker-side assignors not be able to
trigger a reassignment based on a custom config in the 848 protocol? We
could still have the ha and sticky assignors implemented on the brokers,
and could still define all the Streams configs as broker configs to be
passed into the streams custom broker assignors. That feels like a lot less
work than redoing our own group protocol from scratch?

I don't want you guys to think I'm trying to block the entire direction of
this KIP. I'd just like to better understand your thoughts on this since I
assume you've discussed this at length internally. Just help me understand
why this huge proposal is justified (for my own sake and to convince any
"others" who might be skeptical)


On Wed, Sep 4, 2024 at 6:00 PM Matthias J. Sax <mj...@apache.org> wrote:

> I still need to catch up on the discussion in general. But as promised,
> I just started KIP-1088 about the `KafkaClientSupplier` question.
>
> Looking forward to your feedback on the new KIP. I hope we can get
> KIP-1088 done soon, to not block this KIP.
>
>
> -Matthias
>
> On 9/4/24 09:52, Lucas Brutschy wrote:
> > Hi Sophie,
> >
> > thanks for the questions and comments!
> >
> > S1. KIP-1071 does not need to add public interfaces to streams, but it
> > does not (yet) make the streams protocol client pluggable. Matthias
> > promised to write a KIP that will propose a solution for the
> > KafkaClientSupplier soon. He suggested keeping it separate from this
> > KIP, but we can treat it as a blocker. You are right that we could try
> > fitting an explicit topic initialization in this interface, if we
> > decide we absolutely need it now, although I'm not sure if that would
> > actually define a useful workflow for the users to explicitly update a
> > topology. Explicit initialization cannot be part of admin tools,
> > because we do not have the topology there. It could probably be a
> > config that defaults to off, or a method on the KafkaStreams object?
> > To me, explicit initialization opens a broad discussion on how to do
> > it, so I would like to avoid including it in this KIP. I guess it also
> > depends on the outcome of the discussion in S8-S10.
> >
> > S2. It's hard to define the perfect default, and we should probably
> > aim for safe operations first - since people can always up the limit.
> > I changed it to 20.
> >
> > S11. I was going to write we can add it, but it's actually quite
> > independent of KIP-1071 and also a nice newbie ticket, so I would
> > propose we treat it as out-of-scope and let somebody define it in a
> > separate KIP - any discussion about it would anyway be lost in this
> > long discussion thread.
> >
> > S12. Member epoch behaves the same as in KIP-848. See the KIP-848
> > explanation for basics:
> >
> >
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-848%3A+The+Next+Generation+of+the+Consumer+Rebalance+Protocol#KIP848:TheNextGenerationoftheConsumerRebalanceProtocol-MemberEpoch-Reconciliationofthegroup
> >
> > Together with the section "Reconciliation of the group", this should
> > make clear how member epochs, group epochs and assignment epochs work
> > in KIP-1071, but I'll try to summarize the reconciliation a bit more
> > explicitly below:
> >
> > Essentially, the broker drives a reconciliation for each member
> > separately. The goal is to reconcile the assignment generated by the
> > assignor and reach the assignment epoch, but it's an asynchronous
> > process, so each member may still be on an older epoch, which is the
> > member epoch. We do not reinvent any of the internal machinery for
> > streams here, we are just adding standby tasks and warm-up tasks to
> > the reconciliation of KIP-848.
> >
> > During reconciliation, the broker first asks the member to revoke all
> > tasks that are not in its target assignment, by removing it from the
> > current assignment for the member (which is provided in the heartbeat
> > response). Until revocation is completed, the member remains on the
> > old member epoch and in `UNREVOKED` state. Only once the member
> > confirms revocation of these tasks by removing them from the set of
> > owned tasks in the heartbeat request, the member can transition to the
> > current assignment epoch and get new tasks assigned. Once the member
> > has revoked all tasks that are not in its target assignment, it
> > transitions to the new epoch and gets new tasks assigned. There are
> > some tasks that may be in the (declarative) target assignment of the
> > member generated by the assignor, but that may not be immediately
> > added to the current assignment of the member, because they are still
> > "unreleased". Unreleased tasks are:
> >
> > - A. For any task, another instance of the same process may own the
> > task in any role (active/standby/warm-up), and we are still awaiting
> > revocation.
> >
> > - B. For an active task, an instance of any process may own the same
> > active task, and we are still awaiting revocation.
> >
> > As long as there is at least one unreleased task, the member remains
> > in state UNRELEASED. Once all tasks of the target assignment are added
> > to the current assignment of the member (because they were released),
> > the member transitions to STABLE.
> >
> > Technically, the first constraint (we cannot assign a task in two
> > different roles to the same process) is only relevant for stateful
> > tasks that use the local state directory. I wonder if we should relax
> > this restriction slightly, by marking sub-topologies that access the
> > local state directory. This information could also be used by the
> > assignor. But we can also introduce as a follow-up improvement.
> >
> > S13i. You are right, these bumps of the group epoch are "on top" of
> > the KIP-848 reasons for bumping the group epoch. I added the KIP-848
> > reasons as well, because this was a bit confusing in the KIP.
> >
> > S13ii. The group epoch is bumped immediately once a warm-up task has
> > caught up, since this is what triggers running the assignor. However,
> > the assignment epoch is only bumped once the task assignor has run. If
> > the task assignment for a member hasn't changed, it will transition to
> > the new assignment epoch immediately.
> >
> > S13iii. In the case of a rolling bounce, the group epoch will be
> > bumped for every member that leaves and joins the group. For updating
> > rack ID, client tags, topology ID etc. -- this will mostly play a role
> > for static membership, as you mentioned, but would cause the same
> > thing - in a rolling bounce with static membership, we will attempt to
> > recompute the assignment after every bounce, if the topology ID is
> > changed in the members. This can cause broker load, as you mentioned.
> > Note, however, that we don't have to compute an assignment for every
> > group epoch bump. While KIP-848 computes its assignment as part of the
> > main loop of the group coordinator, it uses asynchronous assignment in
> > the case of client-side assignment, where the assignment is slower. So
> > it will kick off the assignment on the client side, enter state
> > `ASSIGNING`, but may have more group epoch bumps in the meantime while
> > the assignment is computed on the client side. We leave it open to use
> > the same mechanism on the broker to compute assignment asynchronously
> > on a separate thread, in case streams assignment is too slow for the
> > main loop of the group coordinator. We will also consider
> > rate-limiting the assignor to reduce broker load. However, it's too
> > early to define this in detail, as we don't know much about the
> > performance impact yet.
> >
> > S13iv. The heartbeat just sends "null" instead of the actual value in
> > case the value didn't change since the last heartbeat, so no
> > comparison is required. This is defined in the heartbeat RPC and the
> > same as in KIP-848.
> >
> > S13v. I think I disagree. The group epoch represents a version of the
> > group state, including the current topology and the metadata of all
> > members -- the whole input to the task assignor -- and defines exactly
> > the condition under which we attempt to recompute the assignment. So
> > it's not overloaded, and has a clear definition. It's also exactly the
> > same as in KIP-848, and we do not want to move away too far from that
> > protocol, unless it's something that is specific to streams that need
> > to be changed. Also, the protocol already uses three types of epochs -
> > member epoch, assignment epoch, group epoch. I don't think adding more
> > epochs will make things clearer. Unless we have a strong reason why we
> > need to add extra epochs, I'm not in favor.
> >
> > S14. Assignment epoch is the same as in KIP-848:
> >
> >
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-848%3A+The+Next+Generation+of+the+Consumer+Rebalance+Protocol#KIP848:TheNextGenerationoftheConsumerRebalanceProtocol-AssignmentEpoch-Computethegroupassignment
> >
> > Its definition does not change at all, so we didn't describe it in
> > detail in KIP-1071.
> >
> > S15. I agree that the name is a bit generic, but the time to change
> > this was when `group.instance.id` was introduced. Also, if we start
> > diverging from regular consumers / consumer groups in terms of naming,
> > it will not make things clearer. "application.id" and "group.id" are
> > bad enough, in my eyes. I hope it's okay if I just improve the field
> > documentation.
> >
> > S16. This goes back to KIP-482. Essentially, in flexible request
> > schema versions you can define optional fields that will be encoded
> > with a field tag on the wire, but can also be completely omitted.
> > This is similar to how fields are encoded in, e.g., protobuf. It
> > improves schema evolution, because we can add new fields to the schema
> > without bumping the version. There is also a minor difference in
> > encoding size: when encoded, tagged fields take up more bytes on the
> > wire because of the field tag, but they can be omitted completely. It
> > will become interesting when we want to include extra data after the
> > initial protocol definition.
> >
> > Cheers,
> > Lucas
> >
> > On Tue, Sep 3, 2024 at 10:36 PM Sophie Blee-Goldman
> > <sop...@responsive.dev> wrote:
> >>
> >> Ah, my bad -- I thought I refreshed the page to get the latest version
> >> which is why I was a bit confused when I couldn't find anything about
> the
> >> new tools which I had previously seen in the KIP. Sorry for the
> confusion
> >> and unnecessary questions
> >>
> >> S1.
> >>
> >>> You could imagine calling the initialize RPC
> >>> explicitly (to implement explicit initialization), but this would
> >>> still mean sending an event to the background thread, and the
> >>> background thread in turn invokes the RPC. However, explicit
> >>> initialization would require some additional public interfaces that we
> >>> are not including in this KIP.
> >>
> >> I'm confused -- if we do all the group management stuff in a background
> >> thread to avoid needing public APIs, how do we pass all the
> >> Streams-specific and app-specific info to the broker for the
> >> StreamsGroupInitialize? I am guessing this is where we need to invoke
> >> internal APIs and is related to our discussion about removing the
> >> KafkaClientSupplier. However, it seems to me that no matter how you
> look at
> >> it, Kafka Streams will need to pass information to and from the
> background
> >> thread if the background thread is to be aware of things like tasks,
> offset
> >> sums, repartition topics, etc
> >>
> >> We don't really need to rehash everything here since it seems like the
> >> proposal to add a StreamsConsumer/StreamsAssignment interface will
> address
> >> this, which I believe we're in agreement on. I just wanted to point out
> >> that this work to replace the KafkaClientSupplier is clearly intimately
> >> tied up with KIP-1071, and should imho be part of this KIP and not its
> own
> >> standalone thing.
> >>
> >> By the way I'm happy to hop on a call to help move things forward on
> that
> >> front (or anything really). Although I guess we're just waiting on a
> design
> >> right now?
> >>
> >> S2. I was suggesting something lower for the default upper limit on all
> >> groups. I don't feel too strongly about it, just wanted to point out
> that
> >> the tradeoff is not about the assignor runtime but rather about resource
> >> consumption, and that too many warmups could put undue load on the
> cluster.
> >> In the end if we want to trust application operators then I'd say it's
> fine
> >> to use a higher cluster-wide max like 100.
> >>
> >> S8-10 I'll get back to you on this in another followup since I'm still
> >> thinking some things through and want to keep the discussion rolling for
> >> now. In the meantime I have some additional questions:
> >>
> >> S11. One of the main issues with unstable assignments today is the fact
> >> that assignors rely on deterministic assignment and use the process Id,
> >> which is not configurable and only persisted via local disk in a
> >> best-effort attempt. It would be a very small change to include this in
> >> KIP-1071 (like 5 LOC), WDYT? (Would even be willing to do the PR for
> this
> >> myself so as not to add to your load). There's a ticket for it here:
> >> https://issues.apache.org/jira/browse/KAFKA-15190
> >>
> >> S12. What exactly is the member epoch and how is it defined? When is it
> >> bumped? I see in the HB request field that certain values signal a
> member
> >> joining/leaving, but I take it there are valid positive values besides
> the
> >> 0/-1/-2 codes? More on this in the next question:
> >>
> >> S13. The KIP says the group epoch is updated in these three cases:
> >> a. Every time the topology is updated through the
> StreamsGroupInitialize API
> >> b. When a member with an assigned warm-up task reports a task changelog
> >> offset and task changelog end offset whose difference is less that
> >> acceptable.recovery.lag.
> >> c. When a member updates its topology ID, rack ID, client tags or
> process
> >> ID. Note: Typically, these do not change within the lifetime of a
> Streams
> >> client, so this only happens when a member with static membership
> rejoins
> >> with an updated configuration.
> >>
> >> S13.i First, just to clarify, these are not the *only* times the group
> >> epoch is bumped but rather the additional cases on top of the regular
> >> consumer group protocol -- so it's still bumped when a new member joins
> or
> >> leaves, etc, right?
> >> S13.ii Second, does (b) imply the epoch is bumped just whenever the
> broker
> >> notices a task has finished warming up, or does it first reassign the
> task
> >> and then bump the group epoch as a result of this task reassignment?
> >> S13.iii Third, (a) mentions the group epoch is bumped on each
> >> StreamsGroupInitialize API but (c) says it gets bumped whenever a member
> >> updates its topology ID. Does this mean that in the event of a rolling
> >> upgrade that changes the topology, the group epoch is bumped just once
> or
> >> for each member that updates its topology ID? Or does the "updating its
> >> topology ID" somehow have something to do with static membership?
> >> S13.iv How does the broker know when a member has changed its rack ID,
> >> client tags, etc -- does it compare these values on every hb?? That
> seems
> >> like a lot of broker load. If this is all strictly for static membership
> >> then have we considered leaving static membership support for a followup
> >> KIP? Not necessarily advocating for that, just wondering if this was
> >> thought about already. Imo it would be acceptable to not support static
> >> membership in the first version (especially if we fixed other issues
> like
> >> making the process Id configurable to get actual stable assignments!)
> >> S13.v I'm finding the concept of group epoch here to be somewhat
> >> overloaded. It sounds like it's trying to keep track of multiple things
> >> within a single version: the group membership, the topology version, the
> >> assignment version, and various member statuses (eg rack ID/client
> tags).
> >> Personally, I think it would be extremely valuable to unroll all of this
> >> into separate epochs with clear definitions and boundaries and triggers.
> >> For example, I would suggest something like this:
> >>
> >> group epoch: describes the group at a point in time, bumped when set of
> >> member ids changes (ie static or dynamic member leaves or joins) --
> >> triggered by group coordinator noticing a change in group membership
> >> topology epoch: describes the topology version, bumped for each
> successful
> >> StreamsGroupInitialize that changes the broker's topology ID --
> triggered
> >> by group coordinator bumping the topology ID (ie StreamsGroupInitialize)
> >> (optional) member epoch: describes the member version, bumped when a
> >> memberId changes its rack ID, client tags, or process ID (maybe topology
> >> ID, not sure) -- is this needed only for static membership? Going back
> to
> >> question S12, when is the member epoch is bumped in the current
> proposal?
> >> assignment epoch: describes the assignment version, bumped when the
> >> assignor runs successfully (even if the actual assignment of tasks does
> not
> >> change) -- can be triggered by tasks completing warmup, a manual
> >> client-side trigger (future KIP only), etc (by definition is bumped any
> >> time any of the other epochs are bumped since they all trigger a
> >> reassignment)
> >>
> >> S14. The KIP also mentions a new concept of an  "assignment epoch" but
> >> doesn't seem to ever elaborate on how this is defined and what it's used
> >> for. I assume it's bumped each time the assignment changes and
> represents a
> >> sort of "assignment version", is that right? Can you describe in more
> >> detail the difference between the group epoch and the assignment epoch?
> >>
> >> S15. I assume the "InstanceId" field in the HB request corresponds to
> the
> >> group.instance.id config of static membership. I always felt this field
> >> name was too generic and easy to get mixed up with other identifiers,
> WDYT
> >> about "StaticId" or "StaticInstanceId" to make the static membership
> >> relation more explicit?
> >>
> >> S16. what is the "flexibleVersions" field in the RPC protocols? I assume
> >> this is related to versioning compatibility but it's not quite clear
> from
> >> the KIP how this is to be used. For example if we modify the protocol by
> >> adding new fields at the end, we'd bump the version but should the
> >> flexibleVersions field change as well?
> >>
> >> On Wed, Aug 28, 2024 at 5:06 AM Lucas Brutschy
> >> <lbruts...@confluent.io.invalid> wrote:
> >>
> >>> Hi Sophie,
> >>>
> >>> Thanks for your detailed comments - much appreciated! I think you read
> >>> a version of the KIP that did not yet include the admin command-line
> >>> tool and the Admin API extensions, so some of the comments are already
> >>> addressed in the KIP.
> >>>
> >>> S1. StreamsGroupHeartbeat and StreamsGroupInitialize are called in the
> >>> consumer background thread. Note that in the new consumer threading
> >>> model, all RPCs are run by the background thread. Check out this:
> >>>
> >>>
> https://cwiki.apache.org/confluence/display/KAFKA/Consumer+threading+refactor+design
> >>> for more information. Both our RPCs are just part of the group
> >>> membership management and do not need to be invoked explicitly by the
> >>> application thread. You could imagine calling the initialize RPC
> >>> explicitly (to implement explicit initialization), but this would
> >>> still mean sending an event to the background thread, and the
> >>> background thread in turn invokes the RPC. However, explicit
> >>> initialization would require some additional public interfaces that we
> >>> are not including in this KIP. StreamsGroupDescribe is called by the
> >>> AdminClient, and used by the command-line tool
> >>> kafka-streams-groups.sh.
> >>>
> >>> S2. I think the max.warmup.replicas=100 suggested by Nick was intended
> >>> as the upper limit for setting the group configuration on the broker.
> >>> Just to make sure that this was not a misunderstanding. By default,
> >>> values above 100 should be rejected when setting a specific value for
> >>> group. Are you suggesting 20 or 30 for the default value for groups,
> >>> or the default upper limit for the group configuration?
> >>>
> >>> S3. Yes, it's supposed to be used like SHUTDOWN_APPLICATION. The
> >>> MemberEpoch=-1 is a left-over from an earlier discussion. It means
> >>> that the member is leaving the group, so the intention was that the
> >>> member must leave the group when it asks the other members to shut
> >>> down. We later reconsidered this and decided that all applications
> >>> should just react to the shutdown application signal that is returned
> >>> by the broker, so the client first sets the ShutdownApplication and
> >>> later leaves the group. Thanks for spotting this, I removed it.
> >>>
> >>> S4. Not sure if this refers to the latest version of the KIP. We added
> >>> an extension of the admin API and a kafka-streams-groups.sh
> >>> command-line tool to the KIP already.
> >>>
> >>> S5. All RPCs for dealing with offsets will keep working with streams
> >>> groups. The extension of the admin API is rather cosmetic, since the
> >>> method names use "consumer group". The RPCs, however, are generic and
> >>> do not need to be changed.
> >>>
> >>> S6. Yes, you can use the DeleteGroup RPC with any group ID, whether
> >>> streams group or not.
> >>>
> >>> S7. See the admin API section.
> >>>
> >>> S8. I guess for both A and B, I am not sure what you are suggesting.
> >>> Do you want to make the broker-side topology immutable and not include
> >>> any information about the topology, like the topology ID in the RPC?
> >>> It would seem that this would be a massive food-gun for people, if
> >>> they start changing their topology and don't notice that the broker is
> >>> looking at a completely different version of the topology. Or did you
> >>> mean that there is some kind of topology ID, so that at least we can
> >>> detect inconsistencies between broker and client-side topologies, and
> >>> we fence out any member with an incorrect topology ID? Then we seem to
> >>> end up with mostly the same RPCs and the same questions (how is the
> >>> topology ID generated?). I agree that the latter could be an option.
> >>> See summary at the end of this message.
> >>>
> >>> S9. If we flat out refuse topology updates, agree - we cannot let the
> >>> application crash on minor changes to the topology. However, if we
> >>> allow topology updates as described in the KIP, there are only upsides
> >>> to making the topology ID more sensitive. All it will cause is that a
> >>> client will have to resend a `StreamsGroupInitialize`, and during the
> >>> rolling bounce, older clients will not get tasks assigned.
> >>>
> >>> S10. The intention in the KIP is really just this - old clients can
> >>> only retain tasks, but not get new ones. If the topology has
> >>> sub-topologies, these will initially be assigned to the new clients,
> >>> which also have the new topology. You are right that rolling an
> >>> application where the old structure and the new structure are
> >>> incompatible (e.g. different subtopologies access the same partition)
> >>> will cause problems. But this will also cause problems in the current
> >>> protocol, so I'm not sure, if it's strictly a regression, it's just
> >>> unsupported (which we can only make the best effort to detect).
> >>>
> >>> In summary, for the topology ID discussion, I mainly see two options:
> >>> 1) stick with the current KIP proposal
> >>> 2) define the topology ID as the hash of the StreamsGroupInitialize
> >>> topology metadata only, as you suggested, and fence out members with
> >>> an incompatible topology ID.
> >>> I think doing 2 is also a good option, but in the end it comes down to
> >>> how important we believe topology updates (where the set of
> >>> sub-topologies or set of internal topics are affected) to be. The main
> >>> goal is to make the new protocol a drop-in replacement for the old
> >>> protocol, and at least define the RPCs in a way that we won't need
> >>> another KIP which bumps the RPC version to make the protocol
> >>> production-ready. Adding more command-line tools, or public interfaces
> >>> seems less critical as it doesn't change the protocol and can be done
> >>> easily later on. The main question becomes - is the protocol
> >>> production-ready and sufficient to replace the classic protocol if we
> >>> go with 2?
> >>>
> >>> Cheers,
> >>> Lucas
> >>>
>

Reply via email to