Hi Lucas, Thanks for the KIP update. I think that `kafka-streams-groups.sh` looks like a good equivalent to the tools for the other types of groups.
AS12: In kafka-streams-groups.sh, the description for the --input-topics option seems insufficient. Why is an input topic specified with this option different from a topic specified with --topic? Why is it --input-topics rather than --input-topic? Which action of this tool does this option apply to? AS13: Similarly, for --internal-topics, which action of the tool does it apply to? I suppose it’s --delete, but it’s not clear to me. Thanks, Andrew > On 11 Aug 2024, at 12:10, Lucas Brutschy <lbruts...@confluent.io.INVALID> > wrote: > > Hi Andrew/Lianet, > > I have added an administrative command-line tool (replacing > `kafka-streams-application-reset`) and extensions of the Admin API for > listing, deleting and describing groups and listing, altering and > deleting offsets for streams groups. No new RPCs have to be added; > however, we duplicate some of the API in the admin client that exists > for consumer groups. It seems cleaner to me to duplicate some > code/interface here, instead of using "consumer group" APIs for > streams groups, or renaming existing APIs that use "consumerGroup" in > the name to something more generic (which wouldn't cover share > groups). > > I think for now, all comments are addressed. > > Cheers, > Lucas > > On Tue, Aug 6, 2024 at 3:19 PM Lucas Brutschy <lbruts...@confluent.io> wrote: >> >> Hi Lianet and Andrew, >> >> LM1/LM2: You are right. The idea is to omit fields exactly in the same >> situations as in KIP-848. In the KIP, I stuck with how the behavior >> was defined in KIP-848 (e.g. KIP-848 defined that the instance ID >> will be omitted if it did not change since the last heartbeat). But >> you are correct that the implementation handles these details slightly >> differently. I updated the KIP to more closely match the behavior of >> the KIP-848 implementation. >> >> LM9: Yes, there are several options to do this. The idea is to have >> only one client initialize the topology, not all clients.
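To illustrate the LM9 idea of having only one client initialize the topology, here is a minimal sketch of how a coordinator could track it. Everything in it is a hypothetical assumption for illustration: the class, the `should_initialize` result, the placeholder READY state and the timeout-based hand-over to another member. The KIP defines the actual RPC fields and group states. In the sketch every member joins and stays INITIALIZING without tasks, one member is asked to send the topology, and a different member is asked if the first one does not finish in time.

```python
from typing import Optional, Tuple


class StreamsGroupSketch:
    """Hypothetical coordinator-side model of one-client topology setup."""

    def __init__(self, init_timeout_ms: int) -> None:
        self.init_timeout_ms = init_timeout_ms
        self.initialized = False
        self.initializer: Optional[str] = None  # member asked to initialize
        self.asked_at_ms = 0

    def heartbeat(self, member_id: str, now_ms: int) -> Tuple[str, bool]:
        """Return (group_state, should_initialize) for this member."""
        if self.initialized:
            # "READY" is a placeholder; task assignment is out of scope here.
            return "READY", False
        # Pick an initializer if there is none yet, or a new one if the
        # current one did not finish in time (e.g. it is malfunctioning).
        timed_out = now_ms - self.asked_at_ms >= self.init_timeout_ms
        if self.initializer is None or timed_out:
            self.initializer = member_id
            self.asked_at_ms = now_ms
        # Every member joins successfully and stays INITIALIZING with no
        # tasks; only one member is asked to send the topology.
        return "INITIALIZING", member_id == self.initializer

    def initialize(self) -> None:
        """Stand-in for a successful StreamsGroupInitialize request.

        The sketch does not check who sends it, so an admin tool could
        initialize the group as well.
        """
        self.initialized = True
```

Because the other members get a normal INITIALIZING response rather than an error, the sketch models the "everything is going according to plan" case that Lucas prefers over a retriable error.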
It seems >> easier to understand on the protocol level (otherwise we'd have N >> topology initializations racing with a hard-to-determine winner). We >> also expect the payload of the request to grow in the future and want >> to avoid the overhead of having all clients send the topology at >> the same time. But initializing the group could take some time - we >> have to create internal topics, and maybe a client is malfunctioning >> and the initialization has to be retried. It seemed a bit confusing to >> return errors to all other clients that are trying to join the group >> during that time - as if there was a problem with joining the group / >> the contents of the heartbeat. It seems cleaner to me to let all >> clients successfully join the group and heartbeat, but remain in an >> INITIALIZING state which does not yet assign any tasks. Does that make >> sense to you? You are right that returning a retriable error and >> having all clients retry until the group is initialized would also >> work; it just doesn't model well that "everything is going according >> to plan". >> As for the order of the calls - yes, I think it is fine to allow an >> Initialize RPC before the first heartbeat to support future admin >> tools. I made this change throughout the KIP, thanks! >> >> AS11: Yes, your understanding is correct. The number of tasks for one >> subtopology is the maximum number of partitions in any of the matched >> topics. What will happen in Kafka Streams is that the partitions of >> the matched topics will effectively be merged during stream >> processing, so in your example, subtopology:0 would consume from AB:0 >> and AC:0. >> >> Cheers, >> Lucas >> >> On Fri, Aug 2, 2024 at 9:47 PM Lianet M. <liane...@gmail.com> wrote: >>> >>> Hi Bruno, answering your questions: >>> >>> About the full heartbeat (LM1): I just wanted to confirm that you'll be >>> sending full HBs in case of errors in general.
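Here is a minimal sketch of the heartbeat contents discussed under LM1/LM2. It assumes hypothetical snake_case field names rather than the KIP's request schema. Group ID, member ID, member epoch and (if set) the instance ID go into every heartbeat. Other fields go in only when they changed, and all fields go in whenever a full heartbeat is required.

```python
from typing import Any, Dict, Optional

# Fields that accompany every heartbeat (LM1/LM2); instance_id only if set.
ALWAYS_SENT = ("group_id", "member_id", "member_epoch", "instance_id")


def build_heartbeat(current: Dict[str, Any],
                    last_sent: Optional[Dict[str, Any]],
                    full: bool) -> Dict[str, Any]:
    """Return the fields to include in the next heartbeat.

    current   -- the member's current value for every heartbeat field
                 (None means "not set")
    last_sent -- the fields as of the last successful heartbeat, or None
    full      -- True when a full heartbeat is needed, e.g. when rejoining
                 after fencing or retrying after an error
    """
    present = {k: v for k, v in current.items() if v is not None}
    if full or last_sent is None:
        return present
    request = {k: v for k, v in present.items() if k in ALWAYS_SENT}
    for key, value in present.items():
        # Everything else is sent only if it changed since the last heartbeat.
        if key not in ALWAYS_SENT and last_sent.get(key) != value:
            request[key] = value
    return request
```

With `full=True` the coordinator receives everything again. Lianet points out that this is what makes a fresh rejoin after fencing carry all relevant information.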
It's not clear from the KIP, >>> since it referred to sending Id/epoch and whatever had changed since the >>> last HB only. Sending a full HB on error is key to ensuring fresh rejoins after >>> fencing, for instance, and retries with all relevant info. >>> >>> About the instanceId (LM2): The instanceId is needed on every HB to be able >>> to identify a member using one that is already taken. On every HB, the >>> broker uses the instance id (if any) to retrieve the member ID associated >>> with it, and checks it against the memberId received in the HB >>> (throwing an UnreleasedInstanceIdException if needed). So, similar to my >>> previous point, I just wanted to confirm that we are considering that here >>> too. >>> >>> Now some other thoughts: >>> >>> LM9: Definitely interesting imo if we can avoid the dependency between the >>> StreamsGroupInitialize and the StreamsGroupHeartbeat. I totally get that >>> the initial client implementation will do a HB first, and that's fine, but >>> not having the flow enforced at the protocol level would allow for further >>> improvement in the future (that initialize via admin idea you mentioned, >>> for instance). Actually, I may be missing something about the HB, but if we >>> are at the point where HB requires that the topology has been initialized, >>> and the topology init requires the group, why is the heartbeat RPC the >>> one responsible for the group creation? (vs. StreamsGroupInitialize creates >>> group if needed + HB just fails if topology not initialized) >>> >>> Thanks! >>> >>> Lianet >>> (I didn't miss your answer on my INVALID_GROUP_TYPE proposal, just still >>> thinking about it in sync with the same discussion we're having on the >>> KIP-1043 thread...I'll come back to that) >>> >>> On Thu, Aug 1, 2024 at 10:55 AM Andrew Schofield <andrew_schofi...@live.com> >>> wrote: >>> >>>> Hi Bruno, >>>> Thanks for adding the detail on the schemas of records written to >>>> __consumer_offsets.
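A small sketch can make Andrew's AS11 question, which follows, and Lucas's answer earlier in the thread concrete. The helper is hypothetical and models only the stated rule. A regex subtopology gets as many tasks as the largest partition count among the matched topics, and task i reads partition i of every matched topic that has one.

```python
import re
from typing import Dict, List, Tuple


def tasks_for_subtopology(source_regex: str,
                          partitions: Dict[str, int]
                          ) -> List[List[Tuple[str, int]]]:
    """Return, for each task index, the (topic, partition) pairs it reads.

    partitions maps each topic name to its partition count.
    """
    pattern = re.compile(source_regex)
    matched = sorted(t for t in partitions if pattern.fullmatch(t))
    # Number of tasks = maximum partition count over all matched topics.
    num_tasks = max((partitions[t] for t in matched), default=0)
    # Task i consumes partition i of each matched topic that has it, so the
    # matched topics are effectively merged partition index by index.
    return [[(t, i) for t in matched if i < partitions[t]]
            for i in range(num_tasks)]


tasks = tasks_for_subtopology("A.*", {"AB": 2, "AC": 3, "B": 4})
```

Andrew's `A*` is written `A.*` here because the sketch treats the source pattern as a full-match regular expression. That matching mode is an assumption of the sketch. With AB having 2 partitions and AC having 3, the sketch yields 3 tasks: task 0 reads AB:0 and AC:0, and task 2 reads only AC:2. So AB:0 and AC:0 cannot go to different tasks.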
>>>> I’ve reviewed them in detail and they look good to me. I have one naive >>>> question. >>>> >>>> AS11: I notice that an assignment is essentially a set of partition >>>> indices for >>>> subtopologies. Since a subtopology can be defined by a source topic regex, >>>> does >>>> this mean that an assignment gives the same set of partition indices for >>>> all topics >>>> which happen to match the regex? So, a subtopology reading from A* that >>>> matches >>>> AB and AC would give the same set of partitions to each task for both >>>> topics, and >>>> is not able to give AB:0 to one task and AC:0 to a different task. Is this >>>> correct? >>>> >>>> Thanks, >>>> Andrew >>>> >>>>> On 23 Jul 2024, at 16:16, Bruno Cadonna <cado...@apache.org> wrote: >>>>> >>>>> Hi Lianet, >>>>> >>>>> Thanks for the review! >>>>> >>>>> Here are my answers: >>>>> >>>>> LM1. Is your question whether we need to send a full heartbeat each time >>>> the member re-joins the group even if the information in the RPC did not >>>> change since the last heartbeat? >>>>> >>>>> LM2. Is the reason for sending the instance ID each time that a member >>>> could shut down, change the instance ID and then start and heartbeat again, >>>> but the group coordinator would never notice that the instance ID changed? >>>>> >>>>> LM3. I see your point. I am wondering whether this additional >>>> information is worth the dependency between the group types. To return >>>> INVALID_GROUP_TYPE, the group coordinator needs to know that a group ID >>>> exists with a different group type. With a group coordinator as we have it >>>> now in Apache Kafka that manages all group types, that is not a big deal, >>>> but imagine if we (or some implementation of the Apache Kafka protocol) >>>> decide to have a separate group coordinator for each group type. >>>>> >>>>> LM4. Using INVALID_GROUP_ID if the group ID is empty makes sense to me. >>>> I am going to change that. >>>>> >>>>> LM5.
I think there is a dependency from the StreamsGroupInitialize RPC >>>> to the heartbeat. The group must exist when the initialize RPC is received >>>> by the group coordinator. The group is created by the heartbeat RPC. I >>>> would be in favor of making the initialize RPC independent from the >>>> heartbeat RPC. That would allow initializing a streams group explicitly >>>> with an admin tool. >>>>> >>>>> LM6. I think it affects streams, and streams should behave like the >>>> consumer group. >>>>> >>>>> LM7. Good point that we will consider. >>>>> >>>>> LM8. Fixed! Thanks! >>>>> >>>>> >>>>> Best, >>>>> Bruno >>>>> >>>>> >>>>> >>>>> >>>>> On 7/19/24 9:53 PM, Lianet M. wrote: >>>>>> Hi Lucas/Bruno, thanks for the great KIP! First comments: >>>>>> LM1. Related to where the KIP says: “Group ID, member ID, member epoch >>>>>> are sent with each heartbeat request. Any other information that has not >>>>>> changed since the last heartbeat can be omitted.” I expect all the >>>> other >>>>>> info also needs to be sent whenever a full heartbeat is required (even >>>> if >>>>>> it didn’t change from the last heartbeat), e.g. in fencing scenarios, >>>>>> correct? >>>>>> LM2. For consumer groups we always send the groupInstanceId (if any) as >>>>>> part of every heartbeat, along with memberId, epoch and groupId. Should >>>> we >>>>>> consider that too here? >>>>>> LM3. We’re proposing returning a GROUP_ID_NOT_FOUND error in response to >>>>>> the streams-specific RPCs if the groupId is associated with a group type >>>>>> that is not streams (i.e. consumer group or share group). I wonder if at >>>>>> this point, where we're getting several new group types added, each with >>>>>> RPCs that are supposed to include groupId of a certain type, we should >>>> be >>>>>> more explicit about this situation. Maybe a kind of INVALID_GROUP_TYPE >>>>>> (group exists but not with a valid type for this RPC) vs a >>>>>> GROUP_ID_NOT_FOUND (group does not exist).
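The distinction proposed in LM3 can be sketched as a lookup that separates "no such group" from "group exists with another type". This is a hypothetical model, not coordinator code. The local `GroupType` enum only mirrors the idea of a STREAMS constant from AS1, and INVALID_GROUP_TYPE is the error name proposed in this thread.

```python
from enum import Enum
from typing import Dict, Optional


class GroupType(Enum):
    """Local stand-in for a group-type enum that includes STREAMS."""
    CONSUMER = "consumer"
    SHARE = "share"
    STREAMS = "streams"


def check_group(groups: Dict[str, GroupType], group_id: str,
                expected: GroupType) -> Optional[str]:
    """Return an error name for a group-specific RPC, or None if valid."""
    if group_id == "":
        return "INVALID_GROUP_ID"      # as agreed for LM4
    actual = groups.get(group_id)
    if actual is None:
        return "GROUP_ID_NOT_FOUND"    # group does not exist at all
    if actual is not expected:
        return "INVALID_GROUP_TYPE"    # proposed: exists, but wrong type
    return None
```

In his LM3 reply, Bruno points out that returning INVALID_GROUP_TYPE requires the coordinator to see groups of every type. This single dictionary takes that for granted.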
Those errors would be >>>>>> consistently used across consumer, share, and streams RPCs whenever the >>>>>> group id is not of the expected type. >>>>>> This is truly not specific to this KIP, and should be addressed with all >>>>>> group types and their RPCs in mind. I just wanted to bring up my >>>> concern >>>>>> and get thoughts around it. >>>>>> LM4. On a related note, StreamsGroupDescribe returns INVALID_REQUEST if >>>>>> groupId is empty. There is already an INVALID_GROUP_ID error that seems >>>>>> more specific to this situation. Error handling of specific errors would >>>>>> definitely be easier than having to deal with a generic INVALID_REQUEST >>>>>> (and probably its custom message). I know that for KIP-848 we have >>>>>> INVALID_REQUEST for similar situations, so if we ever go down this >>>> path >>>>>> we should review it there too for consistency. Thoughts? >>>>>> LM5. The dependency between the StreamsGroupHeartbeat RPC and the >>>>>> StreamsGroupInitialize RPC is one-way only, right? HB requires a previous >>>>>> StreamsGroupInitialize request, but StreamsGroupInitialize processing is >>>>>> totally independent of heartbeats (and could perfectly well be processed >>>> without >>>>>> a previous HB, even though the client implementation we’re proposing >>>> won’t >>>>>> go down that path). Is my understanding correct? Just to double-check; it >>>>>> seems sensible like that at the protocol level. >>>>>> LM6. With KIP-848, there is an important improvement that brings a >>>>>> difference in behaviour around static membership: with the classic >>>>>> protocol, if a static member joins with a group instance ID already in >>>> use, it >>>>>> makes the initial member fail with a FENCED_INSTANCE_ID exception, vs. >>>>>> with the new consumer group protocol, the second member trying to join >>>>>> fails with an UNRELEASED_INSTANCE_ID. Does this change need to be >>>>>> considered in any way for the streams app?
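The LM6 behaviour change can be modelled as follows. This simplified sketch is hypothetical. `static_members` maps an instance ID to the member ID that currently holds it. The function reports which member is rejected, and with which error, under the classic protocol and under the KIP-848 consumer protocol.

```python
from typing import Dict, Optional, Tuple


def join_with_instance_id(static_members: Dict[str, str], instance_id: str,
                          member_id: str, protocol: str
                          ) -> Tuple[Optional[str], Optional[str]]:
    """Return (rejected_member_id, error_name) for a static-member join.

    static_members maps group instance ID -> member ID currently holding it;
    protocol is "classic" or "consumer" (the KIP-848 protocol).
    """
    holder = static_members.get(instance_id)
    if holder is None or holder == member_id:
        # Instance ID is free, or the same member is heartbeating again.
        static_members[instance_id] = member_id
        return None, None
    if protocol == "classic":
        # Classic protocol: the newcomer takes over the instance ID and the
        # member that held it is fenced.
        static_members[instance_id] = member_id
        return holder, "FENCED_INSTANCE_ID"
    # KIP-848 protocol: the current holder keeps the instance ID and the
    # member trying to join is rejected.
    return member_id, "UNRELEASED_INSTANCE_ID"
```

Under the new protocol, this check only works if the instance ID arrives with every heartbeat. That is the point Lianet makes under LM2.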
(I'm not familiar with KS >>>> yet, >>>>>> but thought it was worth asking. If it doesn't affect it in any way, it may still be >>>>>> helpful to call it out in a section on static membership) >>>>>> LM7. Regarding the admin tool to manage streams groups: we can discuss >>>>>> whether to have it here or separately, but I think we should aim for >>>> some >>>>>> basic admin capabilities from the start, mainly because I believe it >>>> will >>>>>> be very helpful/needed in practice during the impl of the KIP. From >>>>>> experience with KIP-848, we felt a bit blindfolded in the initial phase >>>>>> where we still didn't have kafka-consumer-groups dealing with the new >>>>>> groups (and then it was very helpful and used when we were able to >>>> easily >>>>>> inspect them from the console) >>>>>> LM8. nit: the links to KIP-848 are not quite right (pointing to an >>>>>> unrelated “Future work” section at the end of KIP-848) >>>>>> Thanks! >>>>>> Lianet >>>>>> On Fri, Jul 19, 2024 at 11:13 AM Lucas Brutschy >>>>>> <lbruts...@confluent.io.invalid> wrote: >>>>>>> Hi Andrew, >>>>>>> >>>>>>> AS2: I added a note for now. If others feel strongly about it, we can >>>>>>> still add more administrative tools to the KIP - it should not change >>>>>>> the overall story significantly. >>>>>>> >>>>>>> AS8: "streams.group.assignor.name" sounds good to me to distinguish >>>>>>> the config from class names. Not sure if I like the "default". To be >>>>>>> consistent, we'd then have to call it >>>>>>> `group.streams.default.session.timeout.ms` as well. I only added the >>>>>>> `.name` on both broker and group level for now. >>>>>>> >>>>>>> AS10: Ah, I misread your comment; now I know what you meant. Good >>>>>>> point, fixed (by Bruno). >>>>>>> >>>>>>> Cheers, >>>>>>> Lucas >>>>>>> >>>>>>> On Fri, Jul 19, 2024 at 4:44 PM Andrew Schofield >>>>>>> <andrew_schofi...@live.com> wrote: >>>>>>>> >>>>>>>> Hi Lucas, >>>>>>>> I see that I hit send too quickly.
One more comment: >>>>>>>> >>>>>>>> AS2: I think stating that there will be a `kafka-streams-groups.sh` in >>>> a >>>>>>>> future KIP is fine to keep this KIP focused. Personally, I would >>>> probably >>>>>>>> put all of the gory details in this KIP, but then it’s not my KIP. A >>>>>>> future >>>>>>>> pointer is fine too. >>>>>>>> >>>>>>>> Thanks, >>>>>>>> Andrew >>>>>>>> >>>>>>>> >>>>>>>>> On 19 Jul 2024, at 13:46, Lucas Brutschy <lbruts...@confluent.io.INVALID> >>>>>>> wrote: >>>>>>>>> >>>>>>>>> Hi Andrew, >>>>>>>>> >>>>>>>>> thanks for getting the discussion going! Here are my responses. >>>>>>>>> >>>>>>>>> AS1: Good point, done. >>>>>>>>> >>>>>>>>> AS2: We were planning to add more administrative tools to the >>>>>>>>> interface in a follow-up KIP, so as not to make this KIP too large. If >>>>>>>>> people think that it would help to understand the overall picture if >>>>>>>>> we already add something like `kafka-streams-groups.sh`, we will do >>>>>>>>> that. I also agree that we should address how this relates to >>>>>>>>> KIP-1043; we'll add it. >>>>>>>>> >>>>>>>>> AS3: Good idea, that's more consistent with `assigning` and >>>>>>> `reconciling` etc. >>>>>>>>> >>>>>>>>> AS4: Thanks, fixed. >>>>>>>>> >>>>>>>>> AS5: Good catch. This was supposed to mean that we require CREATE on >>>>>>>>> cluster or CREATE on all topics, not both. Fixed. >>>>>>>>> >>>>>>>>> AS6: Thanks, fixed. >>>>>>>>> >>>>>>>>> AS7: Thanks, fixed. >>>>>>>>> >>>>>>>>> AS8: I think this works a bit differently in this KIP than in consumer >>>>>>>>> groups. KIP-848 lets the members vote for a preferred assignor, and >>>>>>>>> the broker-side assignor is picked by majority vote. The >>>>>>>>> `group.consumer.assignors` config specifies the list of assignors that are >>>>>>>>> supported on the broker, and is configurable because the interface is >>>>>>>>> pluggable. In this KIP, the task assignor is not voted on by members >>>>>>>>> but configured on the broker side.
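The AS8 contrast can be sketched with two hypothetical helpers. Under KIP-848, the coordinator picks the server-side assignor that most members prefer among those in `group.consumer.assignors`. Here, the streams task assignor comes only from broker configuration, named `group.streams.assignor.name` after the `.name` rename Lucas mentions in his later reply to AS8. The sketch makes two further assumptions: it falls back to the first configured assignor when nobody votes, and it breaks ties by list order.

```python
from collections import Counter
from typing import Dict, List, Optional


def pick_consumer_assignor(supported: List[str],
                           preferences: List[Optional[str]]) -> str:
    """KIP-848 style: majority vote over the members' preferred assignors."""
    votes = Counter(p for p in preferences if p in supported)
    if not votes:
        return supported[0]  # assumption: default to the first configured
    best = max(votes.values())
    # Assumption: ties go to the assignor listed first in the config.
    return next(name for name in supported if votes[name] == best)


def pick_streams_assignor(broker_config: Dict[str, str]) -> str:
    """KIP-1071 style: no vote; the broker config names the task assignor."""
    return broker_config["group.streams.assignor.name"]
```

Because the streams side takes a single name rather than a list of class names, the sketch also shows why Lucas sees no conflict with a possible future list-valued config for pluggable assignors.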
`group.streams.assignor` is used >>>>>>>>> for this, and uses a specific name. If we make the task assignor >>>>>>>>> pluggable on the broker side, we'd introduce a separate config >>>>>>>>> `group.streams.assignors`, which would indeed be a list of class >>>>>>>>> names. I think there is no conflict here; the two configurations >>>> serve >>>>>>>>> different purposes. The only gripe I'd have here is naming, as >>>>>>>>> `group.streams.assignor` and `group.streams.assignors` would be a bit too >>>>>>>>> similar, but I cannot really think of a better name for >>>>>>>>> `group.streams.assignor`, so I'd probably rather introduce >>>>>>>>> `group.streams.assignors` as `group.streams.possible_assignors` or >>>>>>>>> something like that. >>>>>>>>> >>>>>>>>> AS9: I added explanations for the various record types. Apart from >>>> the >>>>>>>>> new topology record and the partition metadata (which is based on >>>> the >>>>>>>>> topology and can only be created once we have a topology initialized), >>>>>>>>> the lifecycle for the records is basically identical to KIP-848. >>>>>>>>> >>>>>>>>> AS10: In the consumer offsets topic, the version in the key is used to >>>>>>>>> distinguish between record types whose keys have the same content. So the >>>>>>>>> keys are not versioned, but the version field is "abused" as a type >>>>>>>>> tag. This is the same in KIP-848; we followed it for consistency. >>>>>>>>> >>>>>>>>> Cheers, >>>>>>>>> Lucas >>>>>>>>> >>>>>>>>> >>>>>>>>> On Thu, Jul 18, 2024 at 1:27 PM Andrew Schofield >>>>>>>>> <andrew_schofi...@live.com> wrote: >>>>>>>>>> >>>>>>>>>> Hi Lucas and Bruno, >>>>>>>>>> >>>>>>>>>> Thanks for the great KIP. >>>>>>>>>> >>>>>>>>>> I've read through the document and have some initial comments. >>>>>>>>>> >>>>>>>>>> AS1: I suppose that there is a new o.a.k.common.GroupType.STREAMS >>>>>>> enumeration >>>>>>>>>> constant. This is a change to the public interface and should be >>>>>>> called out.
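A decoding sketch can illustrate Lucas's AS10 answer above: the version in a __consumer_offsets record key acts as a type tag rather than a schema version. The sketch assumes the key starts with a big-endian int16 version, as Kafka's group coordinator record keys do. The version numbers and every type name except StreamsGroupCurrentMemberAssignmentKey are made up for illustration and do not match the real assignments.

```python
import struct
from typing import Tuple

# Made-up tag table: these numbers do NOT match Kafka's real key versions,
# and the first two type names are placeholders.
KEY_TYPES = {
    100: "StreamsGroupMetadataKey",
    101: "StreamsGroupTopologyKey",
    102: "StreamsGroupCurrentMemberAssignmentKey",
}


def decode_key_type(key: bytes) -> Tuple[str, bytes]:
    """Use the key's leading int16 'version' as a record-type tag."""
    (version,) = struct.unpack_from(">h", key, 0)
    record_type = KEY_TYPES.get(version)
    if record_type is None:
        raise ValueError(f"unknown key version {version}")
    # The remaining bytes would be parsed with that record type's key
    # schema; the tag says which schema, not which revision of it.
    return record_type, key[2:]
```

Because the tag selects the schema, two key types with identical fields still decode as different records. This is also why, as Andrew notes in AS10, the field versions in each key definition have to line up with the version used as that key's tag.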
>>>>>>>>>> >>>>>>>>>> AS2: Since streams groups are no longer consumer groups, how does >>>> the >>>>>>> user >>>>>>>>>> manipulate them, observe lag and so on? Will you add >>>>>>> `kafka-streams-groups.sh` >>>>>>>>>> or extend `kafka-streams-application-reset.sh`? Of course, KIP-1043 >>>>>>> can easily >>>>>>>>>> be extended to support streams groups, but that only lets the user >>>>>>> see the >>>>>>>>>> groups, not manipulate them. >>>>>>>>>> >>>>>>>>>> AS3: I wonder whether the streams group state of UNINITIALIZED would >>>>>>> be >>>>>>>>>> better expressed as INITIALIZING. >>>>>>>>>> >>>>>>>>>> AS4: In StreamsGroupInitializeRequest, Topology[].SourceTopicRegex >>>>>>> should >>>>>>>>>> be nullable. >>>>>>>>>> >>>>>>>>>> AS5: Why does StreamsGroupInitialize require CREATE permission on >>>> the >>>>>>>>>> cluster resource? I imagine that this is one of the ways that the >>>>>>> request might >>>>>>>>>> be granted permission to create the StateChangelogTopics and >>>>>>>>>> RepartitionSourceTopics, but if it is granted permission to create >>>>>>> those topics >>>>>>>>>> with specific ACLs, would CREATE on the cluster resource still be >>>>>>> required? >>>>>>>>>> >>>>>>>>>> AS6: StreamsGroupInitialize can also fail with >>>>>>> TOPIC_AUTHORIZATION_FAILED >>>>>>>>>> and (subject to AS5) CLUSTER_AUTHORIZATION_FAILED. >>>>>>>>>> >>>>>>>>>> AS7: A tiny nit. You've used TopologyID (capitals) in >>>>>>> StreamsGroupHeartbeatRequest >>>>>>>>>> and a few others, but in all other cases the fields which are ids >>>> are >>>>>>> spelled Id. >>>>>>>>>> I suggest TopologyId. >>>>>>>>>> >>>>>>>>>> Also, "interal" is probably meant to be "interval”. >>>>>>>>>> >>>>>>>>>> AS8: For consumer groups, the `group.consumer.assignors` >>>>>>> configuration is a >>>>>>>>>> list of class names. The assignors do have names too, but the >>>>>>> configuration which >>>>>>>>>> enables them is in terms of class names. 
I wonder whether the >>>> broker’s >>>>>>>>>> group.streams.assignor could actually be `group.streams.assignors` >>>>>>> and specified >>>>>>>>>> as a list of the class names of the supplied assignors. I know >>>> you're >>>>>>> not supporting >>>>>>>>>> other assignors yet, but when you do, I expect you would prefer to >>>>>>> have used class >>>>>>>>>> names from the start. >>>>>>>>>> >>>>>>>>>> The use of assignor names in the other places looks good to me. >>>>>>>>>> >>>>>>>>>> AS9: I'd find it really helpful to have a bit of a description of >>>>>>> the purpose and >>>>>>>>>> lifecycle of the 9 record types you've introduced on the >>>>>>> __consumer_offsets topic. >>>>>>>>>> I did a cursory review, but without really understanding what's >>>>>>> written when, >>>>>>>>>> I can't do a thorough job of reviewing. >>>>>>>>>> >>>>>>>>>> AS10: In the definitions of the record keys, such as >>>>>>>>>> StreamsGroupCurrentMemberAssignmentKey, the versions of the fields >>>>>>> must >>>>>>>>>> match the versions of the types. >>>>>>>>>> >>>>>>>>>> Thanks, >>>>>>>>>> Andrew >>>>>>>>>> >>>>>>>>>>> On 12 Jul 2024, at 09:04, Lucas Brutschy <lbruts...@confluent.io.INVALID> >>>>>>> wrote: >>>>>>>>>>> >>>>>>>>>>> Hi all, >>>>>>>>>>> >>>>>>>>>>> I would like to start a discussion thread on KIP-1071: Streams >>>>>>>>>>> Rebalance Protocol. With this KIP, we aim to bring the principles >>>>>>> laid >>>>>>>>>>> down by KIP-848 to Kafka Streams, to make rebalances more reliable >>>>>>> and >>>>>>>>>>> scalable, and make Kafka Streams overall easier to deploy and >>>>>>> operate. >>>>>>>>>>> The KIP proposes moving the assignment logic to the broker, and >>>>>>>>>>> introducing a dedicated group type and dedicated RPCs for Kafka >>>>>>>>>>> Streams. 
>>>>>>>>>>> >>>>>>>>>>> The KIP is here: >>>>>>>>>>> >>>>>>> >>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-1071%3A+Streams+Rebalance+Protocol >>>>>>>>>>> >>>>>>>>>>> This is joint work with Bruno Cadonna.
>>>>>>>>>>> >>>>>>>>>>> Please take a look and let us know what you think. >>>>>>>>>>> >>>>>>>>>>> Best, >>>>>>>>>>> Lucas