Hi Lianet,

I have a comment on your comment. I think that’s allowed.

LM3. I quite like the idea of having an INVALID_GROUP_TYPE error code for RPCs which were applied to a group which turned out to be the wrong type. We might even be able to use INCONSISTENT_GROUP_PROTOCOL for this purpose.
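
To make the LM3 distinction concrete, here is a minimal sketch of how a group-type check could choose between the two errors. It is illustrative only and not broker code: INVALID_GROUP_TYPE is just the name proposed in this thread, and `check_group`, the `groups` map and the group ids are hypothetical.

```python
from enum import Enum


class GroupType(Enum):
    CONSUMER = "consumer"
    SHARE = "share"
    STREAMS = "streams"


class Error(Enum):
    NONE = "NONE"
    GROUP_ID_NOT_FOUND = "GROUP_ID_NOT_FOUND"
    # Hypothetical: proposed in this thread, not an existing error code.
    INVALID_GROUP_TYPE = "INVALID_GROUP_TYPE"


def check_group(groups, group_id, expected_type):
    """Return the error an RPC expecting `expected_type` would report."""
    actual = groups.get(group_id)
    if actual is None:
        # The group does not exist at all.
        return Error.GROUP_ID_NOT_FOUND
    if actual is not expected_type:
        # The group exists, but with a type this RPC cannot handle.
        return Error.INVALID_GROUP_TYPE
    return Error.NONE


# Hypothetical broker-side view: group id -> group type.
groups = {"orders-app": GroupType.STREAMS, "billing": GroupType.CONSUMER}

print(check_group(groups, "billing", GroupType.STREAMS).name)
print(check_group(groups, "missing", GroupType.STREAMS).name)
```

With a separate code, a client can tell "wrong kind of group" apart from "no such group" without parsing an error message.
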
However, there would be quite a lot of RPC changes to achieve this. For example, OffsetDelete is expecting a consumer group so I guess there would be a new version with the new error code, and so on for the other RPCs. If we are going to do this, perhaps it would be best included in KIP-1043. Feel free to comment in that discussion thread if you agree.

Thanks,
Andrew

> On 19 Jul 2024, at 20:53, Lianet M. <liane...@gmail.com> wrote:
>
> Hi Lucas/Bruno, thanks for the great KIP! First comments:
>
> LM1. Related to where the KIP says: *“Group ID, member ID, member epoch
> are sent with each heartbeat request. Any other information that has not
> changed since the last heartbeat can be omitted.”* I expect all the other
> info also needs to be sent whenever a full heartbeat is required (even if
> it didn’t change from the last heartbeat), e.g. in fencing scenarios,
> correct?
>
> LM2. For consumer groups we always send the groupInstanceId (if any) as
> part of every heartbeat, along with memberId, epoch and groupId. Should we
> consider that too here?
>
> LM3. We’re proposing returning a GROUP_ID_NOT_FOUND error in response to
> the stream-specific RPCs if the groupId is associated with a group type
> that is not streams (i.e. consumer group or share group). I wonder if at
> this point, where we're getting several new group types added, each with
> RPCs that are supposed to include groupId of a certain type, we should be
> more explicit about this situation. Maybe a kind of INVALID_GROUP_TYPE
> (group exists but not with a valid type for this RPC) vs a
> GROUP_ID_NOT_FOUND (group does not exist). Those errors would be
> consistently used across consumer, share, and streams RPCs whenever the
> group id is not of the expected type.
> This is truly not specific to this KIP, and should be addressed with all
> group types and their RPCs in mind. I just wanted to bring out my concern
> and get thoughts around it.
>
> LM4. On a related note, StreamsGroupDescribe returns INVALID_REQUEST if
> groupId is empty. There is already an INVALID_GROUP_ID error, which seems
> more specific to this situation. Error handling of specific errors would
> definitely be easier than having to deal with a generic INVALID_REQUEST
> (and probably its custom message). I know that for KIP-848 we have
> INVALID_REQUEST for similar situations, so if we ever go down this path
> we should review it there too for consistency. Thoughts?
>
> LM5. The dependency between the StreamsGroupHeartbeat RPC and the
> StreamsGroupInitialize RPC is one-way only, right? HB requires a previous
> StreamsGroupInitialize request, but StreamsGroupInitialize processing is
> totally independent of heartbeats (and could perfectly well be processed
> without a previous HB, even though the client implementation we’re
> proposing won’t go down that path). Is my understanding correct? Just to
> double check, seems sensible like that at the protocol level.
>
> LM6. With KIP-848, there is an important improvement that brings a
> difference in behaviour around static membership: with the classic
> protocol, if a static member joins with a group instance already in use, it
> makes the initial member fail with a FENCED_INSTANCE_ID exception, vs.
> with the new consumer group protocol, the second member trying to join
> fails with an UNRELEASED_INSTANCE_ID. Does this change need to be
> considered in any way for the streams app? (I'm not familiar with KS yet,
> but thought it was worth asking. If it doesn't affect it in any way, it is
> still maybe helpful to call it out in a section for static membership.)
>
> LM7. Regarding the admin tool to manage streams groups. We can discuss
> whether to have it here or separately, but I think we should aim for some
> basic admin capabilities from the start, mainly because I believe it will
> be very helpful/needed in practice during the impl of the KIP. From
> experience with KIP-848, we felt a bit blindfolded in the initial phase
> where we still didn't have kafka-consumer-groups dealing with the new
> groups (and then it was very helpful and used when we were able to easily
> inspect them from the console).
>
> LM8. nit: the links to KIP-848 are not quite right (pointing to an
> unrelated “Future work section” at the end of KIP-848)
>
> Thanks!
>
> Lianet
>
>
> On Fri, Jul 19, 2024 at 11:13 AM Lucas Brutschy
> <lbruts...@confluent.io.invalid> wrote:
>
>> Hi Andrew,
>>
>> AS2: I added a note for now. If others feel strongly about it, we can
>> still add more administrative tools to the KIP - it should not change
>> the overall story significantly.
>>
>> AS8: "streams.group.assignor.name" sounds good to me to distinguish
>> the config from class names. Not sure if I like the "default". To be
>> consistent, we'd then have to call it
>> `group.streams.default.session.timeout.ms` as well. I only added the
>> `.name` on both broker and group level for now.
>>
>> AS10: Ah, I misread your comment, now I know what you meant. Good
>> point, fixed (by Bruno).
>>
>> Cheers,
>> Lucas
>>
>> On Fri, Jul 19, 2024 at 4:44 PM Andrew Schofield
>> <andrew_schofi...@live.com> wrote:
>>>
>>> Hi Lucas,
>>> I see that I hit send too quickly. One more comment:
>>>
>>> AS2: I think stating that there will be a `kafka-streams-groups.sh` in a
>>> future KIP is fine to keep this KIP focused. Personally, I would probably
>>> put all of the gory details in this KIP, but then it’s not my KIP. A
>>> future pointer is fine too.
>>>
>>> Thanks,
>>> Andrew
>>>
>>>
>>>> On 19 Jul 2024, at 13:46, Lucas Brutschy <lbruts...@confluent.io.INVALID> wrote:
>>>>
>>>> Hi Andrew,
>>>>
>>>> thanks for getting the discussion going! Here are my responses.
>>>>
>>>> AS1: Good point, done.
>>>>
>>>> AS2: We were planning to add more administrative tools to the
>>>> interface in a follow-up KIP, to not make this KIP too large. If
>>>> people think that it would help to understand the overall picture if
>>>> we already add something like `kafka-streams-groups.sh`, we will do
>>>> that. I also agree that we should address how this relates to
>>>> KIP-1043, we'll add it.
>>>>
>>>> AS3: Good idea, that's more consistent with `assigning` and
>>>> `reconciling` etc.
>>>>
>>>> AS4: Thanks, Fixed.
>>>>
>>>> AS5: Good catch. This was supposed to mean that we require CREATE on
>>>> cluster or CREATE on all topics, not both. Fixed.
>>>>
>>>> AS6: Thanks, Fixed.
>>>>
>>>> AS7: Thanks, Fixed.
>>>>
>>>> AS8: I think this works a bit differently in this KIP than in consumer
>>>> groups. KIP-848 lets the members vote for a preferred assignor, and
>>>> the broker-side assignor is picked by majority vote. The
>>>> `group.consumer.assignors` specifies the list of assignors that are
>>>> supported on the broker, and is configurable because the interface is
>>>> pluggable. In this KIP, the task assignor is not voted on by members
>>>> but configured on the broker-side. `group.streams.assignor` is used
>>>> for this, and uses a specific name. If we make the task assignor
>>>> pluggable on the broker-side, we'd introduce a separate config
>>>> `group.streams.assignors`, which would indeed be a list of class
>>>> names. I think there is no conflict here, the two configurations serve
>>>> different purposes. The only gripe I'd have here is naming, as
>>>> `group.streams.assignor` and `group.streams.assignors` would be a bit
>>>> similar, but I cannot really think of a better name for
>>>> `group.streams.assignor`, so I'd probably rather introduce
>>>> `group.streams.assignors` as `group.streams.possible_assignors` or
>>>> something like that.
>>>>
>>>> AS9: I added explanations for the various record types. Apart from the
>>>> new topology record, and the partition metadata (which is based on the
>>>> topology and can only be created once we have a topology initialized)
>>>> the lifecycle for the records is basically the same as in KIP-848.
>>>>
>>>> AS10: In the consumer offset topic, the version in the key is used to
>>>> differentiate different schema types with the same content. So the
>>>> keys are not versioned, but the version field is "abused" as a type
>>>> tag. This is the same in KIP-848, we followed it for consistency.
>>>>
>>>> Cheers,
>>>> Lucas
>>>>
>>>>
>>>> On Thu, Jul 18, 2024 at 1:27 PM Andrew Schofield
>>>> <andrew_schofi...@live.com> wrote:
>>>>>
>>>>> Hi Lucas and Bruno,
>>>>>
>>>>> Thanks for the great KIP.
>>>>>
>>>>> I've read through the document and have some initial comments.
>>>>>
>>>>> AS1: I suppose that there is a new o.a.k.common.GroupType.STREAMS
>>>>> enumeration constant. This is a change to the public interface and
>>>>> should be called out.
>>>>>
>>>>> AS2: Since streams groups are no longer consumer groups, how does the
>>>>> user manipulate them, observe lag and so on? Will you add
>>>>> `kafka-streams-groups.sh` or extend `kafka-streams-application-reset.sh`?
>>>>> Of course, KIP-1043 can easily be extended to support streams groups,
>>>>> but that only lets the user see the groups, not manipulate them.
>>>>>
>>>>> AS3: I wonder whether the streams group state of UNINITIALIZED would
>>>>> be better expressed as INITIALIZING.
>>>>>
>>>>> AS4: In StreamsGroupInitializeRequest, Topology[].SourceTopicRegex
>>>>> should be nullable.
>>>>>
>>>>> AS5: Why does StreamsGroupInitialize require CREATE permission on the
>>>>> cluster resource? I imagine that this is one of the ways that the
>>>>> request might be granted permission to create the StateChangelogTopics
>>>>> and RepartitionSourceTopics, but if it is granted permission to create
>>>>> those topics with specific ACLs, would CREATE on the cluster resource
>>>>> still be required?
>>>>>
>>>>> AS6: StreamsGroupInitialize can also fail with
>>>>> TOPIC_AUTHORIZATION_FAILED and (subject to AS5)
>>>>> CLUSTER_AUTHORIZATION_FAILED.
>>>>>
>>>>> AS7: A tiny nit. You've used TopologyID (capitals) in
>>>>> StreamsGroupHeartbeatRequest and a few others, but in all other cases
>>>>> the fields which are ids are spelled Id. I suggest TopologyId.
>>>>>
>>>>> Also, "interal" is probably meant to be "interval".
>>>>>
>>>>> AS8: For consumer groups, the `group.consumer.assignors`
>>>>> configuration is a list of class names. The assignors do have names
>>>>> too, but the configuration which enables them is in terms of class
>>>>> names. I wonder whether the broker’s group.streams.assignor could
>>>>> actually be `group.streams.assignors` and specified as a list of the
>>>>> class names of the supplied assignors. I know you're not supporting
>>>>> other assignors yet, but when you do, I expect you would prefer to
>>>>> have used class names from the start.
>>>>>
>>>>> The use of assignor names in the other places looks good to me.
>>>>>
>>>>> AS9: I'd find it really helpful to have a bit of a description about
>>>>> the purpose and lifecycle of the 9 record types you've introduced on
>>>>> the __consumer_offsets topic. I did a cursory review but without
>>>>> really understanding what's written when, I can't do a thorough job
>>>>> of reviewing.
>>>>>
>>>>> AS10: In the definitions of the record keys, such as
>>>>> StreamsGroupCurrentMemberAssignmentKey, the versions of the fields
>>>>> must match the versions of the types.
>>>>>
>>>>> Thanks,
>>>>> Andrew
>>>>>
>>>>>> On 12 Jul 2024, at 09:04, Lucas Brutschy <lbruts...@confluent.io.INVALID> wrote:
>>>>>>
>>>>>> Hi all,
>>>>>>
>>>>>> I would like to start a discussion thread on KIP-1071: Streams
>>>>>> Rebalance Protocol. With this KIP, we aim to bring the principles
>>>>>> laid down by KIP-848 to Kafka Streams, to make rebalances more
>>>>>> reliable and scalable, and make Kafka Streams overall easier to
>>>>>> deploy and operate.
>>>>>> The KIP proposes moving the assignment logic to the broker, and
>>>>>> introducing a dedicated group type and dedicated RPCs for Kafka
>>>>>> Streams.
>>>>>>
>>>>>> The KIP is here:
>>>>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-1071%3A+Streams+Rebalance+Protocol
>>>>>>
>>>>>> This is joint work with Bruno Cadonna.
>>>>>>
>>>>>> Please take a look and let us know what you think.
>>>>>>
>>>>>> Best,
>>>>>> Lucas
>>>>>
>>>
>>