Hi Andrew, thanks for the comment.

AS12: I clarified the command-line interface. --input-topics is supposed to be used with --reset-offsets and --delete-offsets. I removed --topic.

AS13: Yes, it's --delete. I clarified the command-line interface.

Cheers,
Lucas

On Tue, Aug 13, 2024 at 4:14 PM Andrew Schofield <andrew_schofi...@live.com> wrote:
>
> Hi Lucas,
> Thanks for the KIP update.
>
> I think that `kafka-streams-groups.sh` looks like a good equivalent to the tools for the other types of groups.
>
> AS12: In kafka-streams-groups.sh, the description for the --input-topics option seems insufficient. Why is an input topic specified with this option different from a topic specified with --topic? Why is it --input-topics rather than --input-topic? Which action of this tool does this option apply to?
>
> AS13: Similarly, for --internal-topics, which action of the tool does it apply to? I suppose it’s --delete, but it’s not clear to me.
>
> Thanks,
> Andrew
>
> > On 11 Aug 2024, at 12:10, Lucas Brutschy <lbruts...@confluent.io.INVALID> wrote:
> >
> > Hi Andrew/Lianet,
> >
> > I have added an administrative command-line tool (replacing `kafka-streams-application-reset`) and extensions of the Admin API for listing, deleting, and describing groups, and for listing, altering, and deleting offsets for streams groups. No new RPCs have to be added; however, we duplicate some of the APIs in the admin client that exist for consumer groups. It seems cleaner to me to duplicate some code/interface here, instead of using "consumer group" APIs for streams groups, or renaming existing APIs that use "consumerGroup" in the name to something more generic (which wouldn't cover share groups).
> >
> > I think for now, all comments are addressed.
> >
> > Cheers,
> > Lucas
> >
> > On Tue, Aug 6, 2024 at 3:19 PM Lucas Brutschy <lbruts...@confluent.io> wrote:
> >>
> >> Hi Lianet and Andrew,
> >>
> >> LM1/LM2: You are right. The idea is to omit fields in exactly the same situations as in KIP-848. In the KIP, I stuck with how the behavior was defined in KIP-848 (e.g. KIP-848 defined that the instance ID will be omitted if it did not change since the last heartbeat). But you are correct that the implementation handles these details slightly differently. I updated the KIP to match the behavior of the KIP-848 implementation more closely.
> >>
> >> LM9: Yes, there are several options to do this. The idea is to have only one client initialize the topology, not all clients. It seems easier to understand at the protocol level (otherwise we'd have N topology initializations racing with a hard-to-determine winner). We also expect the payload of the request to grow in the future and want to avoid the overhead of having all clients send the topology at the same time. But initializing the group could take some time: we have to create internal topics, and maybe a client is malfunctioning and the initialization has to be retried. It seemed a bit confusing to return errors to all other clients that are trying to join the group during that time, as if there was a problem with joining the group or with the contents of the heartbeat. It seems cleaner to me to let all clients successfully join the group and heartbeat, but remain in an INITIALIZING state which does not yet assign any tasks. Does that make sense to you? You are right that returning a retriable error and having all clients retry until the group is initialized would also work; it just doesn't model well that "everything is going according to plan".
> >> As for the order of the calls: yes, I think it is fine to allow an Initialize RPC before the first heartbeat, to support future admin tools. I made this change throughout the KIP, thanks!
> >>
> >> AS11: Yes, your understanding is correct. The number of tasks for one subtopology is the maximum number of partitions in any of the matched topics. What will happen in Kafka Streams is that the partitions of the matched topics will effectively be merged during stream processing, so in your example, subtopology:0 would consume from AB:0 and AC:0.
> >>
> >> Cheers,
> >> Lucas
> >>
> >> On Fri, Aug 2, 2024 at 9:47 PM Lianet M. <liane...@gmail.com> wrote:
> >>>
> >>> Hi Bruno, answering your questions:
> >>>
> >>> About the full heartbeat (LM1): I just wanted to confirm that you'll be sending full HBs in case of errors in general. It's not clear from the KIP, since it referred to sending only the ID/epoch and whatever had changed since the last HB. Sending a full HB on error is key to ensuring fresh rejoins after fencing, for instance, and retries with all relevant info.
> >>>
> >>> About the instanceId (LM2): The instanceId is needed on every HB to be able to identify a member using one that is already taken. On every HB, the broker uses the instance ID (if any) to retrieve the member ID associated with it, and checks it against the memberId received in the HB (throwing an UnreleasedInstance exception if needed). So, similar to my previous point, I just wanted to confirm that we are considering that here too.
> >>>
> >>> Now some other thoughts:
> >>>
> >>> LM9: Definitely interesting imo if we can avoid the dependency between StreamsGroupInitialize and StreamsGroupHeartbeat. I totally get that the initial client implementation will do an HB first, and that's fine, but not having the flow enforced at the protocol level would allow for further improvement in the future (that initialize-via-admin idea you mentioned, for instance). Actually, I may be missing something about the HB, but if we are at the point where the HB requires that the topology has been initialized, and the topology init requires the group, why is the heartbeat RPC the one responsible for the group creation? (vs. StreamsGroupInitialize creates the group if needed + HB just fails if the topology is not initialized)
> >>>
> >>> Thanks!
> >>>
> >>> Lianet
> >>> (I didn't miss your answer on my INVALID_GROUP_TYPE proposal; I'm just still thinking about it in sync with the same discussion we're having on the KIP-1043 thread... I'll come back to that.)
> >>>
> >>> On Thu, Aug 1, 2024 at 10:55 AM Andrew Schofield <andrew_schofi...@live.com> wrote:
> >>>
> >>>> Hi Bruno,
> >>>> Thanks for adding the detail on the schemas of records written to __consumer_offsets.
> >>>> I’ve reviewed them in detail and they look good to me. I have one naive question.
> >>>>
> >>>> AS11: I notice that an assignment is essentially a set of partition indices for subtopologies. Since a subtopology can be defined by a source topic regex, does this mean that an assignment gives the same set of partition indices for all topics which happen to match the regex? So, a subtopology reading from A* that matches AB and AC would give the same set of partitions to each task for both topics, and is not able to give AB:0 to one task and AC:0 to a different task. Is this correct?
> >>>>
> >>>> Thanks,
> >>>> Andrew
> >>>>
> >>>>> On 23 Jul 2024, at 16:16, Bruno Cadonna <cado...@apache.org> wrote:
> >>>>>
> >>>>> Hi Lianet,
> >>>>>
> >>>>> Thanks for the review!
> >>>>>
> >>>>> Here are my answers:
> >>>>>
> >>>>> LM1. Is your question whether we need to send a full heartbeat each time the member re-joins the group even if the information in the RPC did not change since the last heartbeat?
> >>>>>
> >>>>> LM2. Is the reason for sending the instance ID each time that a member could shut down, change the instance ID and then start and heartbeat again, but the group coordinator would never notice that the instance ID changed?
> >>>>>
> >>>>> LM3. I see your point. I am wondering whether this additional information is worth the dependency between the group types. To return INVALID_GROUP_TYPE, the group coordinator needs to know that a group ID exists with a different group type. With a group coordinator as we have it now in Apache Kafka that manages all group types, that is not a big deal, but imagine if we (or some implementation of the Apache Kafka protocol) decide to have a separate group coordinator for each group type.
> >>>>>
> >>>>> LM4. Using INVALID_GROUP_ID if the group ID is empty makes sense to me. I am going to change that.
> >>>>>
> >>>>> LM5. I think there is a dependency from the StreamsGroupInitialize RPC to the heartbeat. The group must exist when the initialize RPC is received by the group coordinator. The group is created by the heartbeat RPC. I would be in favor of making the initialize RPC independent of the heartbeat RPC. That would allow initializing a streams group explicitly with an admin tool.
> >>>>>
> >>>>> LM6. I think it affects streams, and streams should behave as consumer groups do.
> >>>>>
> >>>>> LM7. Good point that we will consider.
> >>>>>
> >>>>> LM8. Fixed! Thanks!
> >>>>>
> >>>>> Best,
> >>>>> Bruno
> >>>>>
> >>>>> On 7/19/24 9:53 PM, Lianet M. wrote:
> >>>>>> Hi Lucas/Bruno, thanks for the great KIP! First comments:
> >>>>>> LM1. Related to where the KIP says: “Group ID, member ID, member epoch are sent with each heartbeat request. Any other information that has not changed since the last heartbeat can be omitted.” I expect all the other info also needs to be sent whenever a full heartbeat is required (even if it didn’t change since the last heartbeat), e.g. in fencing scenarios, correct?
> >>>>>> LM2. For consumer groups we always send the groupInstanceId (if any) as part of every heartbeat, along with memberId, epoch and groupId. Should we consider that too here?
> >>>>>> LM3. We’re proposing returning a GROUP_ID_NOT_FOUND error in response to the stream-specific RPCs if the groupId is associated with a group type that is not streams (i.e. consumer group or share group). I wonder if at this point, where we're getting several new group types added, each with RPCs that are supposed to include a groupId of a certain type, we should be more explicit about this situation. Maybe a kind of INVALID_GROUP_TYPE (group exists but not with a valid type for this RPC) vs a GROUP_ID_NOT_FOUND (group does not exist). Those errors would be consistently used across consumer, share, and streams RPCs whenever the group ID is not of the expected type.
> >>>>>> This is truly not specific to this KIP, and should be addressed with all group types and their RPCs in mind. I just wanted to bring up my concern and get thoughts around it.
> >>>>>> LM4. On a related note, StreamsGroupDescribe returns INVALID_REQUEST if groupId is empty. There is already an INVALID_GROUP_ID error, which seems more specific to this situation. Handling specific errors would definitely be easier than having to deal with a generic INVALID_REQUEST (and probably its custom message). I know that for KIP-848 we have INVALID_REQUEST for similar situations, so if we ever go down this path we should review it there too for consistency. Thoughts?
> >>>>>> LM5. The dependency between the StreamsGroupHeartbeat RPC and the StreamsGroupInitialize RPC is one-way only, right? HB requires a previous StreamsGroupInitialize request, but StreamsGroupInitialize processing is totally independent of heartbeats (and could perfectly well be processed without a previous HB, even though the client implementation we’re proposing won’t go down that path). Is my understanding correct? Just double-checking; it seems sensible like that at the protocol level.
> >>>>>> LM6. With KIP-848, there is an important improvement that brings a difference in behaviour around static membership: with the classic protocol, if a static member joins with a group instance already in use, it makes the initial member fail with a FENCED_INSTANCE_ID exception, vs. with the new consumer group protocol, the second member trying to join fails with an UNRELEASED_INSTANCE_ID. Does this change need to be considered in any way for the streams app? (I'm not familiar with KS yet, but thought it was worth asking. If it doesn't affect it in any way, it may still be helpful to call it out in a section on static membership.)
> >>>>>> LM7. Regarding the admin tool to manage streams groups: we can discuss whether to have it here or separately, but I think we should aim for some basic admin capabilities from the start, mainly because I believe it will be very helpful/needed in practice during the implementation of the KIP. From experience with KIP-848, we felt a bit blindfolded in the initial phase, when we still didn't have kafka-consumer-groups dealing with the new groups (and it was very helpful and used once we were able to easily inspect them from the console).
> >>>>>> LM8. nit: the links to KIP-848 are not quite right (pointing to an unrelated “Future work section” at the end of KIP-848)
> >>>>>> Thanks!
> >>>>>> Lianet
> >>>>>> On Fri, Jul 19, 2024 at 11:13 AM Lucas Brutschy <lbruts...@confluent.io.invalid> wrote:
> >>>>>>> Hi Andrew,
> >>>>>>>
> >>>>>>> AS2: I added a note for now. If others feel strongly about it, we can still add more administrative tools to the KIP; it should not change the overall story significantly.
> >>>>>>>
> >>>>>>> AS8: "streams.group.assignor.name" sounds good to me to distinguish the config from class names. Not sure if I like the "default". To be consistent, we'd then have to call it `group.streams.default.session.timeout.ms` as well. I only added the `.name` on both broker and group level for now.
> >>>>>>>
> >>>>>>> AS10: Ah, I misread your comment; now I know what you meant. Good point, fixed (by Bruno).
> >>>>>>>
> >>>>>>> Cheers,
> >>>>>>> Lucas
> >>>>>>>
> >>>>>>> On Fri, Jul 19, 2024 at 4:44 PM Andrew Schofield <andrew_schofi...@live.com> wrote:
> >>>>>>>>
> >>>>>>>> Hi Lucas,
> >>>>>>>> I see that I hit send too quickly. One more comment:
> >>>>>>>>
> >>>>>>>> AS2: I think stating that there will be a `kafka-streams-group.sh` in a future KIP is fine to keep this KIP focused. Personally, I would probably put all of the gory details in this KIP, but then it’s not my KIP. A future pointer is fine too.
> >>>>>>>>
> >>>>>>>> Thanks,
> >>>>>>>> Andrew
> >>>>>>>>
> >>>>>>>>> On 19 Jul 2024, at 13:46, Lucas Brutschy <lbruts...@confluent.io.INVALID> wrote:
> >>>>>>>>>
> >>>>>>>>> Hi Andrew,
> >>>>>>>>>
> >>>>>>>>> thanks for getting the discussion going! Here are my responses.
> >>>>>>>>>
> >>>>>>>>> AS1: Good point, done.
> >>>>>>>>>
> >>>>>>>>> AS2: We were planning to add more administrative tools to the interface in a follow-up KIP, so as not to make this KIP too large. If people think that it would help them understand the overall picture if we already add something like `kafka-streams-groups.sh`, we will do that. I also agree that we should address how this relates to KIP-1043; we'll add it.
> >>>>>>>>>
> >>>>>>>>> AS3: Good idea, that's more consistent with `assigning` and `reconciling` etc.
> >>>>>>>>>
> >>>>>>>>> AS4: Thanks, fixed.
> >>>>>>>>>
> >>>>>>>>> AS5: Good catch. This was supposed to mean that we require CREATE on the cluster or CREATE on all topics, not both. Fixed.
> >>>>>>>>>
> >>>>>>>>> AS6: Thanks, fixed.
> >>>>>>>>>
> >>>>>>>>> AS7: Thanks, fixed.
> >>>>>>>>>
> >>>>>>>>> AS8: I think this works a bit differently in this KIP than in consumer groups. KIP-848 lets the members vote for a preferred assignor, and the broker-side assignor is picked by majority vote. The `group.consumer.assignors` config specifies the list of assignors that are supported on the broker, and is configurable because the interface is pluggable. In this KIP, the task assignor is not voted on by members but configured on the broker side. `group.streams.assignor` is used for this, and uses a specific name. If we make the task assignor pluggable on the broker side, we'd introduce a separate config `group.streams.assignors`, which would indeed be a list of class names. I think there is no conflict here; the two configurations serve different purposes. The only gripe I'd have here is naming, as `group.streams.assignor` and `group.streams.assignors` would be a bit similar, but I cannot really think of a better name for `group.streams.assignor`, so I'd probably rather introduce `group.streams.assignors` as `group.streams.possible_assignors` or something like that.
> >>>>>>>>>
> >>>>>>>>> AS9: I added explanations for the various record types. Apart from the new topology record and the partition metadata (which is based on the topology and can only be created once we have a topology initialized), the lifecycle of the records is basically identical to KIP-848.
> >>>>>>>>>
> >>>>>>>>> AS10: In the consumer offsets topic, the version in the key is used to differentiate different schema types with the same content. So the keys are not versioned, but the version field is "abused" as a type tag. This is the same in KIP-848; we followed it for consistency.
> >>>>>>>>>
> >>>>>>>>> Cheers,
> >>>>>>>>> Lucas
> >>>>>>>>>
> >>>>>>>>> On Thu, Jul 18, 2024 at 1:27 PM Andrew Schofield <andrew_schofi...@live.com> wrote:
> >>>>>>>>>>
> >>>>>>>>>> Hi Lucas and Bruno,
> >>>>>>>>>>
> >>>>>>>>>> Thanks for the great KIP.
> >>>>>>>>>>
> >>>>>>>>>> I've read through the document and have some initial comments.
> >>>>>>>>>>
> >>>>>>>>>> AS1: I suppose that there is a new o.a.k.common.GroupType.STREAMS enumeration constant. This is a change to the public interface and should be called out.
> >>>>>>>>>>
> >>>>>>>>>> AS2: Since streams groups are no longer consumer groups, how does the user manipulate them, observe lag and so on? Will you add `kafka-streams-groups.sh` or extend `kafka-streams-application-reset.sh`? Of course, KIP-1043 can easily be extended to support streams groups, but that only lets the user see the groups, not manipulate them.
> >>>>>>>>>>
> >>>>>>>>>> AS3: I wonder whether the streams group state of UNINITIALIZED would be better expressed as INITIALIZING.
> >>>>>>>>>>
> >>>>>>>>>> AS4: In StreamsGroupInitializeRequest, Topology[].SourceTopicRegex should be nullable.
> >>>>>>>>>>
> >>>>>>>>>> AS5: Why does StreamsGroupInitialize require CREATE permission on the cluster resource? I imagine that this is one of the ways that the request might be granted permission to create the StateChangelogTopics and RepartitionSourceTopics, but if it is granted permission to create those topics with specific ACLs, would CREATE on the cluster resource still be required?
> >>>>>>>>>>
> >>>>>>>>>> AS6: StreamsGroupInitialize can also fail with TOPIC_AUTHORIZATION_FAILED and (subject to AS5) CLUSTER_AUTHORIZATION_FAILED.
> >>>>>>>>>>
> >>>>>>>>>> AS7: A tiny nit. You've used TopologyID (capitals) in StreamsGroupHeartbeatRequest and a few others, but in all other cases the fields which are ids are spelled Id. I suggest TopologyId.
> >>>>>>>>>>
> >>>>>>>>>> Also, "interal" is probably meant to be "interval".
> >>>>>>>>>>
> >>>>>>>>>> AS8: For consumer groups, the `group.consumer.assignors` configuration is a list of class names. The assignors do have names too, but the configuration which enables them is in terms of class names. I wonder whether the broker’s group.streams.assignor could actually be `group.streams.assignors` and specified as a list of the class names of the supplied assignors. I know you're not supporting other assignors yet, but when you do, I expect you would prefer to have used class names from the start.
> >>>>>>>>>>
> >>>>>>>>>> The use of assignor names in the other places looks good to me.
> >>>>>>>>>>
> >>>>>>>>>> AS9: I'd find it really helpful to have a bit of a description of the purpose and lifecycle of the 9 record types you've introduced on the __consumer_offsets topic. I did a cursory review, but without really understanding what's written when, I can't do a thorough job of reviewing.
> >>>>>>>>>>
> >>>>>>>>>> AS10: In the definitions of the record keys, such as StreamsGroupCurrentMemberAssignmentKey, the versions of the fields must match the versions of the types.
> >>>>>>>>>>
> >>>>>>>>>> Thanks,
> >>>>>>>>>> Andrew
> >>>>>>>>>>
> >>>>>>>>>>> On 12 Jul 2024, at 09:04, Lucas Brutschy <lbruts...@confluent.io.INVALID> wrote:
> >>>>>>>>>>>
> >>>>>>>>>>> Hi all,
> >>>>>>>>>>>
> >>>>>>>>>>> I would like to start a discussion thread on KIP-1071: Streams Rebalance Protocol. With this KIP, we aim to bring the principles laid down by KIP-848 to Kafka Streams, to make rebalances more reliable and scalable, and make Kafka Streams overall easier to deploy and operate. The KIP proposes moving the assignment logic to the broker, and introducing a dedicated group type and dedicated RPCs for Kafka Streams.
> >>>>>>>>>>>
> >>>>>>>>>>> The KIP is here:
> >>>>>>>>>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-1071%3A+Streams+Rebalance+Protocol
> >>>>>>>>>>>
> >>>>>>>>>>> This is joint work with Bruno Cadonna.
> >>>>>>>>>>>
> >>>>>>>>>>> Please take a look and let us know what you think.
> >>>>>>>>>>>
> >>>>>>>>>>> Best,
> >>>>>>>>>>> Lucas
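Lucas's answer to AS11 in this thread states two things. First, the number of tasks for a subtopology is the maximum partition count among the topics its source regex matches. Second, the matched topics are effectively merged, so task i reads partition i of each matched topic. The minimal Python sketch below encodes that rule. The function name is hypothetical. Skipping a topic that has no partition i is an inference for topics with fewer partitions. Python's `re.fullmatch` stands in for the regex matching Kafka Streams performs, assuming whole-name matching, so Andrew's informal `A*` is written `A.*`.

```python
import re
from typing import Dict, List, Tuple


def tasks_for_subtopology(
    source_regex: str, partition_counts: Dict[str, int]
) -> Dict[int, List[Tuple[str, int]]]:
    """Map each task's partition index to the (topic, partition) pairs it reads."""
    pattern = re.compile(source_regex)
    # Assumption: a topic matches only if the whole name matches the regex.
    matched = {t: n for t, n in partition_counts.items() if pattern.fullmatch(t)}
    if not matched:
        return {}
    # Number of tasks = maximum partition count over all matched topics.
    num_tasks = max(matched.values())
    # Task i reads partition i of every matched topic that has such a partition,
    # so the matched topics are effectively merged per partition index.
    return {
        i: [(topic, i) for topic in sorted(matched) if i < matched[topic]]
        for i in range(num_tasks)
    }
```

For example, `tasks_for_subtopology("A.*", {"AB": 2, "AC": 3, "B": 4})` yields three tasks. Task 0 reads `[("AB", 0), ("AC", 0)]`, task 1 reads `[("AB", 1), ("AC", 1)]`, task 2 reads `[("AC", 2)]`, and topic B is not matched. This is the behavior Andrew describes: AB:0 and AC:0 always land in the same task.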