Ah, my bad -- I thought I had refreshed the page to get the latest version, which is why I was a bit confused when I couldn't find anything about the new tools I had previously seen in the KIP. Sorry for the confusion and unnecessary questions.
S1.
> You could imagine calling the initialize RPC explicitly (to implement explicit initialization), but this would still mean sending an event to the background thread, and the background thread in turn invokes the RPC. However, explicit initialization would require some additional public interfaces that we are not including in this KIP.

I'm confused -- if we do all the group management stuff in a background thread to avoid needing public APIs, how do we pass all the Streams-specific and app-specific info to the broker for the StreamsGroupInitialize? I am guessing this is where we need to invoke internal APIs, and it's related to our discussion about removing the KafkaClientSupplier. However, it seems to me that no matter how you look at it, Kafka Streams will need to pass information to and from the background thread if the background thread is to be aware of things like tasks, offset sums, repartition topics, etc.

We don't really need to rehash everything here since it seems like the proposal to add a StreamsConsumer/StreamsAssignment interface will address this, which I believe we're in agreement on. I just wanted to point out that the work to replace the KafkaClientSupplier is clearly intimately tied up with KIP-1071, and should imho be part of this KIP and not its own standalone thing. By the way, I'm happy to hop on a call to help move things forward on that front (or anything really). Although I guess we're just waiting on a design right now?

S2. I was suggesting something lower for the default upper limit on all groups. I don't feel too strongly about it; I just wanted to point out that the tradeoff is not about the assignor runtime but rather about resource consumption, and that too many warmups could put undue load on the cluster. In the end, if we want to trust application operators then I'd say it's fine to use a higher cluster-wide max like 100.

S8-10. I'll get back to you on these in another followup since I'm still thinking some things through and want to keep the discussion rolling for now. In the meantime I have some additional questions:

S11. One of the main issues with unstable assignments today is the fact that assignors rely on deterministic assignment and use the process ID, which is not configurable and only persisted via local disk in a best-effort attempt. It would be a very small change to include this in KIP-1071 (like 5 LOC), WDYT? (I would even be willing to do the PR for this myself so as not to add to your load.) There's a ticket for it here: https://issues.apache.org/jira/browse/KAFKA-15190

S12. What exactly is the member epoch and how is it defined? When is it bumped? I see in the HB request field that certain values signal a member joining/leaving, but I take it there are valid positive values besides the 0/-1/-2 codes? More on this in the next question:

S13. The KIP says the group epoch is updated in these three cases:
a. Every time the topology is updated through the StreamsGroupInitialize API
b. When a member with an assigned warm-up task reports a task changelog offset and task changelog end offset whose difference is less than acceptable.recovery.lag.
c. When a member updates its topology ID, rack ID, client tags or process ID. Note: Typically, these do not change within the lifetime of a Streams client, so this only happens when a member with static membership rejoins with an updated configuration.
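Just so we're debating the same thing, here is roughly how I picture the coordinator-side check for these three cases -- a minimal sketch with made-up names, not actual code from the KIP or the broker:

    // Purely illustrative -- hypothetical names, not code from KIP-1071 or the group coordinator.
    final class GroupEpochBumpSketch {

        static boolean shouldBumpGroupEpoch(
                boolean topologyChangedByInitialize, // (a) StreamsGroupInitialize updated the topology
                long warmupTaskChangelogLag,         // (b) changelog end offset minus reported task offset
                long acceptableRecoveryLag,
                boolean memberMetadataChanged) {     // (c) topology ID / rack ID / client tags / process ID changed
            final boolean warmupCaughtUp = warmupTaskChangelogLag < acceptableRecoveryLag; // (b)
            return topologyChangedByInitialize || warmupCaughtUp || memberMetadataChanged;
        }

        public static void main(String[] args) {
            // e.g. a warm-up task reports a changelog lag of 500 with acceptable.recovery.lag=10000
            System.out.println(shouldBumpGroupEpoch(false, 500L, 10_000L, false)); // prints: true
        }
    }

My follow-up questions below are about the edges of exactly this logic: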
S13.i First, just to clarify, these are not the *only* times the group epoch is bumped, but rather the additional cases on top of the regular consumer group protocol -- so it's still bumped when a new member joins or leaves, etc., right?

S13.ii Second, does (b) imply the epoch is bumped just whenever the broker notices a task has finished warming up, or does it first reassign the task and then bump the group epoch as a result of this task reassignment?

S13.iii Third, (a) mentions the group epoch is bumped on each StreamsGroupInitialize API call, but (c) says it gets bumped whenever a member updates its topology ID. Does this mean that in the event of a rolling upgrade that changes the topology, the group epoch is bumped just once, or once for each member that updates its topology ID? Or does "updating its topology ID" somehow have something to do with static membership?

S13.iv How does the broker know when a member has changed its rack ID, client tags, etc. -- does it compare these values on every heartbeat? That seems like a lot of broker load. If this is all strictly for static membership, then have we considered leaving static membership support for a followup KIP? Not necessarily advocating for that, just wondering if this was thought about already. Imo it would be acceptable to not support static membership in the first version (especially if we fixed other issues like making the process ID configurable to get actual stable assignments!)

S13.v I'm finding the concept of group epoch here to be somewhat overloaded. It sounds like it's trying to keep track of multiple things within a single version: the group membership, the topology version, the assignment version, and various member statuses (e.g. rack ID/client tags). Personally, I think it would be extremely valuable to unroll all of this into separate epochs with clear definitions, boundaries, and triggers. For example, I would suggest something like this (also sketched in code after S14 below):

- group epoch: describes the group at a point in time, bumped when the set of member IDs changes (i.e. a static or dynamic member leaves or joins) -- triggered by the group coordinator noticing a change in group membership
- topology epoch: describes the topology version, bumped for each successful StreamsGroupInitialize that changes the broker's topology ID -- triggered by the group coordinator bumping the topology ID (i.e. StreamsGroupInitialize)
- (optional) member epoch: describes the member version, bumped when a memberId changes its rack ID, client tags, or process ID (maybe topology ID, not sure) -- is this needed only for static membership? Going back to question S12, when is the member epoch bumped in the current proposal?
- assignment epoch: describes the assignment version, bumped when the assignor runs successfully (even if the actual assignment of tasks does not change) -- can be triggered by tasks completing warmup, a manual client-side trigger (future KIP only), etc. (by definition it is bumped any time any of the other epochs are bumped, since they all trigger a reassignment)

S14. The KIP also mentions a new concept of an "assignment epoch" but doesn't seem to ever elaborate on how this is defined and what it's used for. I assume it's bumped each time the assignment changes and represents a sort of "assignment version", is that right? Can you describe in more detail the difference between the group epoch and the assignment epoch?
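To make the S13.v proposal a bit more concrete, here's a tiny sketch of how I picture the separate epochs living in the group state -- all names are made up for illustration, nothing here is from the KIP:

    // Illustrative only: hypothetical names for the S13.v proposal, not part of KIP-1071.
    record StreamsGroupEpochs(
            int groupEpoch,      // bumped when the set of member IDs changes (a member joins or leaves)
            int topologyEpoch,   // bumped by each StreamsGroupInitialize that changes the broker's topology ID
            int assignmentEpoch  // bumped whenever the assignor runs successfully
    ) {
        // A per-member epoch for rack ID / client tags / process ID changes would live in the
        // member metadata rather than here, if it's needed at all (maybe only for static membership).

        StreamsGroupEpochs onMembershipChange() {
            return new StreamsGroupEpochs(groupEpoch + 1, topologyEpoch, assignmentEpoch);
        }

        StreamsGroupEpochs onTopologyInitialize() {
            return new StreamsGroupEpochs(groupEpoch, topologyEpoch + 1, assignmentEpoch);
        }

        // Runs after any of the above (or after a warmup completes), so by definition the
        // assignment epoch moves forward whenever any other epoch does.
        StreamsGroupEpochs onAssignmentComputed() {
            return new StreamsGroupEpochs(groupEpoch, topologyEpoch, assignmentEpoch + 1);
        }

        public static void main(String[] args) {
            StreamsGroupEpochs epochs = new StreamsGroupEpochs(0, 0, 0);
            epochs = epochs.onTopologyInitialize().onAssignmentComputed();
            System.out.println(epochs); // StreamsGroupEpochs[groupEpoch=0, topologyEpoch=1, assignmentEpoch=1]
        }
    }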
S15. I assume the "InstanceId" field in the HB request corresponds to the group.instance.id config of static membership. I always felt this field name was too generic and easy to get mixed up with other identifiers -- WDYT about "StaticId" or "StaticInstanceId" to make the static membership relation more explicit?

S16. What is the "flexibleVersions" field in the RPC protocols? I assume this is related to versioning compatibility, but it's not quite clear from the KIP how this is to be used. For example, if we modify the protocol by adding new fields at the end, we'd bump the version -- but should the flexibleVersions field change as well?

On Wed, Aug 28, 2024 at 5:06 AM Lucas Brutschy <lbruts...@confluent.io.invalid> wrote:
> Hi Sophie,
>
> Thanks for your detailed comments - much appreciated! I think you read a version of the KIP that did not yet include the admin command-line tool and the Admin API extensions, so some of the comments are already addressed in the KIP.
>
> S1. StreamsGroupHeartbeat and StreamsGroupInitialize are called in the consumer background thread. Note that in the new consumer threading model, all RPCs are run by the background thread. Check out this: https://cwiki.apache.org/confluence/display/KAFKA/Consumer+threading+refactor+design for more information. Both our RPCs are just part of the group membership management and do not need to be invoked explicitly by the application thread. You could imagine calling the initialize RPC explicitly (to implement explicit initialization), but this would still mean sending an event to the background thread, and the background thread in turn invokes the RPC. However, explicit initialization would require some additional public interfaces that we are not including in this KIP. StreamsGroupDescribe is called by the AdminClient, and used by the command-line tool kafka-streams-groups.sh.
>
> S2. I think the max.warmup.replicas=100 suggested by Nick was intended as the upper limit for setting the group configuration on the broker. Just to make sure that this was not a misunderstanding. By default, values above 100 should be rejected when setting a specific value for a group. Are you suggesting 20 or 30 for the default value for groups, or the default upper limit for the group configuration?
>
> S3. Yes, it's supposed to be used like SHUTDOWN_APPLICATION. The MemberEpoch=-1 is a left-over from an earlier discussion. It means that the member is leaving the group, so the intention was that the member must leave the group when it asks the other members to shut down. We later reconsidered this and decided that all applications should just react to the shutdown application signal that is returned by the broker, so the client first sets the ShutdownApplication and later leaves the group. Thanks for spotting this, I removed it.
>
> S4. Not sure if this refers to the latest version of the KIP. We added an extension of the admin API and a kafka-streams-groups.sh command-line tool to the KIP already.
>
> S5. All RPCs for dealing with offsets will keep working with streams groups. The extension of the admin API is rather cosmetic, since the method names use "consumer group". The RPCs, however, are generic and do not need to be changed.
>
> S6. Yes, you can use the DeleteGroup RPC with any group ID, whether streams group or not.
>
> S7. See the admin API section.
>
> S8. I guess for both A and B, I am not sure what you are suggesting. Do you want to make the broker-side topology immutable and not include any information about the topology, like the topology ID, in the RPC?
> It would seem that this would be a massive foot-gun for people, if they start changing their topology and don't notice that the broker is looking at a completely different version of the topology. Or did you mean that there is some kind of topology ID, so that at least we can detect inconsistencies between broker and client-side topologies, and we fence out any member with an incorrect topology ID? Then we seem to end up with mostly the same RPCs and the same questions (how is the topology ID generated?). I agree that the latter could be an option. See the summary at the end of this message.
>
> S9. If we flat out refuse topology updates, agreed - we cannot let the application crash on minor changes to the topology. However, if we allow topology updates as described in the KIP, there are only upsides to making the topology ID more sensitive. All it will cause is that a client will have to resend a `StreamsGroupInitialize`, and during the rolling bounce, older clients will not get tasks assigned.
>
> S10. The intention in the KIP is really just this - old clients can only retain tasks, but not get new ones. If the topology has new sub-topologies, these will initially be assigned to the new clients, which also have the new topology. You are right that rolling an application where the old structure and the new structure are incompatible (e.g. different subtopologies access the same partition) will cause problems. But this will also cause problems in the current protocol, so I'm not sure it's strictly a regression; it's just unsupported (which we can only make a best effort to detect).
>
> In summary, for the topology ID discussion, I mainly see two options:
> 1) stick with the current KIP proposal
> 2) define the topology ID as the hash of the StreamsGroupInitialize topology metadata only, as you suggested, and fence out members with an incompatible topology ID.
> I think doing 2 is also a good option, but in the end it comes down to how important we believe topology updates (where the set of sub-topologies or set of internal topics are affected) to be. The main goal is to make the new protocol a drop-in replacement for the old protocol, and at least define the RPCs in a way that we won't need another KIP which bumps the RPC version to make the protocol production-ready. Adding more command-line tools or public interfaces seems less critical, as it doesn't change the protocol and can be done easily later on. The main question becomes - is the protocol production-ready and sufficient to replace the classic protocol if we go with 2?
>
> Cheers,
> Lucas
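P.S. Just to make option (2) from the summary above concrete, here is roughly what I imagine deriving the topology ID from only the StreamsGroupInitialize metadata could look like -- purely illustrative; the canonical string format and the choice of SHA-256 are my own assumptions, not from the KIP:

    import java.nio.charset.StandardCharsets;
    import java.security.MessageDigest;
    import java.security.NoSuchAlgorithmException;
    import java.util.HexFormat;
    import java.util.List;

    // Sketch of "option 2": hash only the topology metadata that is sent in the
    // StreamsGroupInitialize request (sub-topologies with their source, repartition,
    // and changelog topics), so brokers and clients can detect mismatched topologies.
    final class TopologyIdSketch {

        static String topologyId(List<String> canonicalSubtopologies) throws NoSuchAlgorithmException {
            MessageDigest digest = MessageDigest.getInstance("SHA-256");
            // Sort so that every client computes the same ID regardless of iteration order.
            canonicalSubtopologies.stream()
                    .sorted()
                    .forEach(s -> digest.update(s.getBytes(StandardCharsets.UTF_8)));
            return HexFormat.of().formatHex(digest.digest());
        }

        public static void main(String[] args) throws NoSuchAlgorithmException {
            System.out.println(topologyId(List.of(
                    "subtopology-0 sources=[input] repartitionSinks=[app-repartition] changelogs=[store-0-changelog]",
                    "subtopology-1 sources=[app-repartition] changelogs=[]")));
        }
    }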