Hi Andrew, thanks for getting the discussion going! Here are my responses.

AS1: Good point, done.

AS2: We were planning to add more administrative tools to the interface in a follow-up KIP, so as not to make this KIP too large. If people think that adding something like `kafka-streams-groups.sh` now would help in understanding the overall picture, we will do that. I also agree that we should address how this relates to KIP-1043; we'll add it.

AS3: Good idea, that's more consistent with `assigning` and `reconciling` etc.

AS4: Thanks, fixed.

AS5: Good catch. This was supposed to mean that we require either CREATE on the cluster or CREATE on all topics, not both. Fixed.

AS6: Thanks, fixed.

AS7: Thanks, fixed.

AS8: I think this works a bit differently in this KIP than in consumer groups. KIP-848 lets the members vote for a preferred assignor, and the broker-side assignor is picked by majority vote. `group.consumer.assignors` specifies the list of assignors that are supported on the broker, and is configurable because the interface is pluggable. In this KIP, the task assignor is not voted on by members but configured on the broker side. `group.streams.assignor` is used for this, and uses a specific name. If we make the task assignor pluggable on the broker side, we'd introduce a separate config `group.streams.assignors`, which would indeed be a list of class names. I think there is no conflict here; the two configurations serve different purposes. The only gripe I'd have here is naming, as `group.streams.assignor` and `group.streams.assignors` would be a bit similar, but I cannot really think of a better name for `group.streams.assignor`, so I'd probably rather introduce `group.streams.assignors` as `group.streams.possible_assignors` or something like that.

AS9: I added explanations for the various record types. Apart from the new topology record and the partition metadata (which is based on the topology and can only be created once we have a topology initialized), the lifecycle of the records is basically identical to that in KIP-848.

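Coming back to AS5 for a moment, the intended rule could be sketched roughly as below. This is only an illustration, not broker code: `Principal` and `may_create_internal_topics` are hypothetical names, and I am assuming that "all topics" means every internal topic the request would need to create.

```python
from dataclasses import dataclass, field


@dataclass
class Principal:
    """Hypothetical stand-in for a client's granted ACLs; not a Kafka class."""
    cluster_create: bool = False                      # CREATE on the cluster resource
    topic_create: set = field(default_factory=set)    # topics with CREATE granted


def may_create_internal_topics(principal, topics):
    """Allow creation if CREATE is granted on the cluster OR on every topic.

    Either grant is sufficient on its own; both are not required.
    """
    if principal.cluster_create:
        return True
    # Assumption: without the cluster grant, every topic must be covered.
    return all(topic in principal.topic_create for topic in topics)


# Hypothetical internal topic names, for illustration only.
topics = ["app-store-changelog", "app-repartition"]
cluster_admin = Principal(cluster_create=True)
topic_scoped = Principal(topic_create=set(topics))
partial = Principal(topic_create={"app-store-changelog"})

print(may_create_internal_topics(cluster_admin, topics))
print(may_create_internal_topics(topic_scoped, topics))
print(may_create_internal_topics(partial, topics))
```

With only some of the topics covered and no cluster-level grant, the check fails, which is the case where I would expect TOPIC_AUTHORIZATION_FAILED.
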
AS10: In the consumer offset topic, the version in the key is used to differentiate different schema types with the same content. So the keys are not versioned, but the version field is "abused" as a type tag. This is the same in KIP-848; we followed it for consistency.

Cheers,
Lucas

On Thu, Jul 18, 2024 at 1:27 PM Andrew Schofield <andrew_schofi...@live.com> wrote:
>
> Hi Lucas and Bruno,
>
> Thanks for the great KIP.
>
> I've read through the document and have some initial comments.
>
> AS1: I suppose that there is a new o.a.k.common.GroupType.STREAMS
> enumeration constant. This is a change to the public interface and
> should be called out.
>
> AS2: Since streams groups are no longer consumer groups, how does the
> user manipulate them, observe lag and so on? Will you add
> `kafka-streams-groups.sh` or extend `kafka-streams-application-reset.sh`?
> Of course, KIP-1043 can easily be extended to support streams groups,
> but that only lets the user see the groups, not manipulate them.
>
> AS3: I wonder whether the streams group state of UNINITIALIZED would be
> better expressed as INITIALIZING.
>
> AS4: In StreamsGroupInitializeRequest, Topology[].SourceTopicRegex
> should be nullable.
>
> AS5: Why does StreamsGroupInitialize require CREATE permission on the
> cluster resource? I imagine that this is one of the ways that the
> request might be granted permission to create the StateChangelogTopics
> and RepartitionSourceTopics, but if it is granted permission to create
> those topics with specific ACLs, would CREATE on the cluster resource
> still be required?
>
> AS6: StreamsGroupInitialize can also fail with
> TOPIC_AUTHORIZATION_FAILED and (subject to AS5)
> CLUSTER_AUTHORIZATION_FAILED.
>
> AS7: A tiny nit. You've used TopologyID (capitals) in
> StreamsGroupHeartbeatRequest and a few others, but in all other cases
> the fields which are ids are spelled Id. I suggest TopologyId.
>
> Also, "interal" is probably meant to be "interval".
>
> AS8: For consumer groups, the `group.consumer.assignors` configuration
> is a list of class names. The assignors do have names too, but the
> configuration which enables them is in terms of class names. I wonder
> whether the broker’s group.streams.assignor could actually be
> `group.streams.assignors` and specified as a list of the class names of
> the supplied assignors. I know you're not supporting other assignors
> yet, but when you do, I expect you would prefer to have used class
> names from the start.
>
> The use of assignor names in the other places looks good to me.
>
> AS9: I'd find it really helpful to have a bit of a description about
> the purpose and lifecycle of the 9 record types you've introduced on
> the __consumer_offsets topic. I did a cursory review but without really
> understanding what's written when, I can't do a thorough job of
> reviewing.
>
> AS10: In the definitions of the record keys, such as
> StreamsGroupCurrentMemberAssignmentKey, the versions of the fields must
> match the versions of the types.
>
> Thanks,
> Andrew
>
> > On 12 Jul 2024, at 09:04, Lucas Brutschy <lbruts...@confluent.io.INVALID> wrote:
> >
> > Hi all,
> >
> > I would like to start a discussion thread on KIP-1071: Streams
> > Rebalance Protocol. With this KIP, we aim to bring the principles laid
> > down by KIP-848 to Kafka Streams, to make rebalances more reliable and
> > scalable, and make Kafka Streams overall easier to deploy and operate.
> > The KIP proposed moving the assignment logic to the broker, and
> > introducing a dedicated group type and dedicated RPCs for Kafka
> > Streams.
> >
> > The KIP is here:
> > https://cwiki.apache.org/confluence/display/KAFKA/KIP-1071%3A+Streams+Rebalance+Protocol
> >
> > This is joint work with Bruno Cadonna.
> >
> > Please take a look and let us know what you think.
> >
> > Best,
> > Lucas