Hi Lucas,
Thanks for the KIP update.

I think that `kafka-streams-groups.sh` looks like a good equivalent to
the tools for the other types of groups.

AS12: In kafka-streams-groups.sh, the description for the
--input-topics option seems insufficient. Why is an input topic specified
with this option different than a topic specified with --topic? Why is
it --input-topics rather than --input-topic? Which action of this tool
does this option apply to?

AS13: Similarly, for --internal-topics, which action of the tool does it
apply to? I suppose it’s --delete, but it’s not clear to me.

Thanks,
Andrew

> On 11 Aug 2024, at 12:10, Lucas Brutschy <lbruts...@confluent.io.INVALID> 
> wrote:
>
> Hi Andrew/Lianet,
>
> I have added an administrative command-line tool (replacing
> `kafka-streams-application-reset`) and extensions of the Admin API for
> listing, deleting, describing groups and listing, altering and
> deleting offsets for streams groups. No new RPCs have to be added,
> however, we duplicate some of the APIs in the admin client that exist
> for consumer groups. It seems to me cleaner to duplicate some
> code/interface here, instead of using "consumer group" APIs for
> streams groups, or renaming existing APIs that use "consumerGroup" in
> the name to something more generic (which wouldn't cover share
> groups).
>
> I think for now, all comments are addressed.
>
> Cheers,
> Lucas
>
> On Tue, Aug 6, 2024 at 3:19 PM Lucas Brutschy <lbruts...@confluent.io> wrote:
>>
>> Hi Lianet and Andrew,
>>
>> LM1/LM2: You are right. The idea is to omit fields exactly in the same
>> situations as in KIP-848. In the KIP, I stuck with how the behavior
>> was defined in KIP-848 (e.g. KIP-848 defined that the instance ID
>> will be omitted if it did not change since the last heartbeat). But
>> you are correct that the implementation handles these details slightly
>> differently. I updated the KIP to match more closely the behavior of
>> the KIP-848 implementation.
>>
>> LM9: Yes, there are several options to do this. The idea is to have
>> only one client initialize the topology, not all clients. It seems
>> easier to understand on the protocol level (otherwise we'd have N
>> topology initializations racing with a hard-to-determine winner). We
>> also expect the payload of the request to grow in the future and want
>> to avoid the overhead of having all clients sending the topology at
>> the same time. But initializing the group could take some time - we
>> have to create internal topics, and maybe a client is malfunctioning
>> and the initialization has to be retried. It seemed a bit confusing to
>> return errors to all other clients that are trying to join the group
>> during that time - as if there was a problem with joining the group /
>> the contents of the heartbeat. It seems cleaner to me to let all
>> clients successfully join the group and heartbeat, but remain in an
>> INITIALIZING state which does not yet assign any tasks. Does that make
>> sense to you? You are right that returning a retriable error and
>> having all clients retry until the group is initialized would also
>> work, it just doesn't model well that "everything is going according
>> to plan".
>> As for the order of the calls - yes, I think it is fine to allow an
>> Initialize RPC before the first heartbeat for supporting future admin
>> tools. I made this change throughout the KIP, thanks!
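
A rough Python sketch of the LM9 behavior described above: members can join and heartbeat while the group is INITIALIZING, but receive no tasks until the topology has been initialized. This is not the KIP's implementation. The `StreamsGroup` class, the READY state, the response fields, and the round-robin task split are all made up for illustration.

```python
from enum import Enum


class GroupState(Enum):
    INITIALIZING = 1
    READY = 2  # hypothetical stand-in for the later group states


class StreamsGroup:
    """Toy model of a streams group on the coordinator (illustrative only)."""

    def __init__(self):
        self.state = GroupState.INITIALIZING
        self.members = set()
        self.tasks = []

    def initialize(self, tasks):
        # Allowed before the first heartbeat, e.g. from an admin tool.
        self.tasks = list(tasks)
        self.state = GroupState.READY

    def heartbeat(self, member_id):
        # Joining always succeeds; no error is returned while initializing.
        self.members.add(member_id)
        if self.state is GroupState.INITIALIZING:
            return {"error": None, "state": self.state.name, "tasks": []}
        # Hypothetical assignment: split tasks round-robin over sorted members.
        members = sorted(self.members)
        index = members.index(member_id)
        owned = [t for i, t in enumerate(self.tasks) if i % len(members) == index]
        return {"error": None, "state": self.state.name, "tasks": owned}


group = StreamsGroup()
before = group.heartbeat("member-1")   # joins, but gets no tasks yet
group.initialize(["0_0", "0_1"])
after = group.heartbeat("member-1")    # now receives tasks
```

The alternative discussed above, a retriable error until the group is initialized, would replace the empty task list with an error code in the first branch.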
>>
>> AS11: Yes, your understanding is correct. The number of tasks for one
>> subtopology is the maximum number of partitions in any of the matched
>> topics. What will happen in Kafka Streams is that the partitions of
>> the matched topics will effectively be merged during stream
>> processing, so in your example, subtopology:0 would consume from AB:0
>> and AC:0.
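
To make the AS11 rule concrete, here is a minimal Python sketch (not Kafka Streams code). The function name and the topic map are hypothetical, the source pattern is written as the regex `A.*`, and the handling of topics with fewer partitions than the maximum (they simply contribute nothing to the higher-numbered tasks) is an assumption.

```python
import re


def tasks_for_subtopology(source_regex, topic_partitions):
    """Map each task index to the (topic, partition) pairs it would read.

    topic_partitions: {topic_name: partition_count}
    """
    pattern = re.compile(source_regex)
    matched = {t: n for t, n in topic_partitions.items() if pattern.fullmatch(t)}
    if not matched:
        return {}
    # Number of tasks = maximum partition count over all matched topics.
    num_tasks = max(matched.values())
    return {
        task: [(topic, task) for topic, n in sorted(matched.items()) if task < n]
        for task in range(num_tasks)
    }


assignment = tasks_for_subtopology("A.*", {"AB": 2, "AC": 3, "B": 4})
```

With these inputs, task 0 reads both AB:0 and AC:0, so the two partitions cannot be handed to different tasks.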
>>
>> Cheers,
>> Lucas
>>
>> On Fri, Aug 2, 2024 at 9:47 PM Lianet M. <liane...@gmail.com> wrote:
>>>
>>> Hi Bruno, answering your questions:
>>>
>>> About the full heartbeat (LM1): I just wanted to confirm that you'll be
>>> sending full HBs in case of errors in general. It's not clear from the KIP,
>>> since it referred to sending Id/epoch and whatever had changed since the
>>> last HB only. Sending full HB on error is key to ensure fresh rejoins after
>>> fencing for instance, and retries with all relevant info.
>>>
>>> About the instanceId (LM2): The instanceId is needed on every HB to be able
>>> to identify a member using one that is already taken. On every HB, the
>>> broker uses the instance id (if any) to retrieve the member ID associated
>>> with it, and checks it against the memberId received in the HB
>>> (throwing UnreleasedInstance exception if needed). So similar to my
>>> previous point, just wanted to confirm that we are considering that here
>>> too.
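
The LM2 check can be sketched in a few lines of Python. This is only an illustration of the lookup described above, not the broker code. The `owners` map, the function name, and the exception class are hypothetical.

```python
class UnreleasedInstanceIdError(Exception):
    """Raised when an instance ID is still owned by a different member."""


def check_instance_id(owners, instance_id, member_id):
    """Validate a heartbeat's instance ID against the member that owns it.

    owners: {instance_id: member_id} as tracked by the coordinator.
    """
    if instance_id is None:
        return  # dynamic member, nothing to check
    # First use registers the owner; later uses must come from the same member.
    owner = owners.setdefault(instance_id, member_id)
    if owner != member_id:
        raise UnreleasedInstanceIdError(
            f"instance {instance_id} is held by member {owner}"
        )


owners = {}
check_instance_id(owners, "instance-a", "member-1")  # registers the owner
check_instance_id(owners, "instance-a", "member-1")  # same member, accepted
try:
    check_instance_id(owners, "instance-a", "member-2")
    rejected = False
except UnreleasedInstanceIdError:
    rejected = True
```

The check only works if the instance ID is present on every heartbeat, which is the point of the question.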
>>>
>>> Now some other thoughts:
>>>
>>> LM9: Definitely interesting imo if we can avoid the dependency between the
>>> StreamsGroupInitialize and the StreamsGroupHeartbeat. I totally get that
>>> the initial client implementation will do a HB first, and that's fine, but
>>> not having the flow enforced at the protocol level would allow for further
>>> improvement in the future (that initialize via admin idea you mentioned,
>>> for instance). Actually, I may be missing something about the HB, but if we
>>> are at the point where HB requires that the topology has been initialized,
>>> and the topology init requires the group, why is the heartbeat RPC the
>>> one responsible for the group creation? (vs. StreamsGroupInitialize creates
>>> group if needed + HB just fails if topology not initialized)
>>>
>>> Thanks!
>>>
>>> Lianet
>>> (I didn't miss your answer on my INVALID_GROUP_TYPE proposal, just still
>>> thinking about it in sync with the same discussion we're having on the
>>> KIP-1043 thread...I'll come back on that)
>>>
>>> On Thu, Aug 1, 2024 at 10:55 AM Andrew Schofield <andrew_schofi...@live.com>
>>> wrote:
>>>
>>>> Hi Bruno,
>>>> Thanks for adding the detail on the schemas on records written to
>>>> __consumer_offsets.
>>>> I’ve reviewed them in detail and they look good to me. I have one naive
>>>> question.
>>>>
>>>> AS11: I notice that an assignment is essentially a set of partition
>>>> indices for
>>>> subtopologies. Since a subtopology can be defined by a source topic regex,
>>>> does
>>>> this mean that an assignment gives the same set of partition indices for
>>>> all topics
>>>> which happen to match the regex? So, a subtopology reading from A* that
>>>> matches
>>>> AB and AC would give the same set of partitions to each task for both
>>>> topics, and
>>>> is not able to give AB:0 to one task and AC:0 to a different task. Is this
>>>> correct?
>>>>
>>>> Thanks,
>>>> Andrew
>>>>
>>>>> On 23 Jul 2024, at 16:16, Bruno Cadonna <cado...@apache.org> wrote:
>>>>>
>>>>> Hi Lianet,
>>>>>
>>>>> Thanks for the review!
>>>>>
>>>>> Here are my answers:
>>>>>
>>>>> LM1. Is your question whether we need to send a full heartbeat each time
>>>> the member re-joins the group even if the information in the RPC did not
>>>> change since the last heartbeat?
>>>>>
>>>>> LM2. Is the reason for sending the instance ID each time that a member
>>>> could shutdown, change the instance ID and then start and heartbeat again,
>>>> but the group coordinator would never notice that the instance ID changed?
>>>>>
>>>>> LM3. I see your point. I am wondering whether this additional
>>>> information is worth the dependency between the group types. To return
>>>> INVALID_GROUP_TYPE, the group coordinator needs to know that a group ID
>>>> exists with a different group type. With a group coordinator as we have it
>>>> now in Apache Kafka that manages all group types, that is not a big deal,
>>>> but imagine if we (or some implementation of the Apache Kafka protocol)
>>>> decide to have a separate group coordinator for each group type.
>>>>>
>>>>> LM4. Using INVALID_GROUP_ID if the group ID is empty makes sense to me.
>>>> I am going to change that.
>>>>>
>>>>> LM5. I think there is a dependency from the StreamsGroupInitialize RPC
>>>> to the heartbeat. The group must exist when the initialize RPC is received
>>>> by the group coordinator. The group is created by the heartbeat RPC. I
>>>> would be in favor of making the initialize RPC independent from the
>>>> heartbeat RPC. That would allow a streams group to be initialized
>>>> explicitly with an admin tool.
>>>>>
>>>>> LM6. I think it affects streams and streams should behave as the
>>>> consumer group.
>>>>>
>>>>> LM7. Good point that we will consider.
>>>>>
>>>>> LM8. Fixed! Thanks!
>>>>>
>>>>>
>>>>> Best,
>>>>> Bruno
>>>>>
>>>>>
>>>>>
>>>>>
>>>>> On 7/19/24 9:53 PM, Lianet M. wrote:
>>>>>> Hi Lucas/Bruno, thanks for the great KIP! First comments:
>>>>>> LM1. Related to where the KIP says: "Group ID, member ID, member epoch
>>>>>> are sent with each heartbeat request. Any other information that has not
>>>>>> changed since the last heartbeat can be omitted." I expect all the
>>>> other
>>>>>> info also needs to be sent whenever a full heartbeat is required (even
>>>> if
>>>>>> it didn’t change from the last heartbeat), ex. on fencing scenarios,
>>>>>> correct?
>>>>>> LM2. For consumer groups we always send the groupInstanceId (if any) as
>>>>>> part of every heartbeat, along with memberId, epoch and groupId. Should
>>>> we
>>>>>> consider that too here?
>>>>>> LM3. We’re proposing returning a GROUP_ID_NOT_FOUND error in response to
>>>>>> the stream-specific RPCs if the groupId is associated with a group type
>>>>>> that is not streams (ie. consumer group or share group). I wonder if at
>>>>>> this point, where we're getting several new group types added, each with
>>>>>> RPCs that are supposed to include groupId of a certain type, we should
>>>> be
>>>>>> more explicit about this situation. Maybe a kind of INVALID_GROUP_TYPE
>>>>>> (group exists but not with a valid type for this RPC) vs a
>>>>>> GROUP_ID_NOT_FOUND (group does not exist).  Those errors would be
>>>>>> consistently used across consumer, share, and streams RPCs whenever the
>>>>>> group id is not of the expected type.
>>>>>> This is truly not specific to this KIP, and should be addressed with all
>>>>>> group types and their RPCs in mind. I just wanted to bring out my
>>>> concern
>>>>>> and get thoughts around it.
>>>>>> LM4. On a related note, StreamsGroupDescribe returns INVALID_REQUEST if
>>>>>> groupId is empty. There is already an INVALID_GROUP_ID error, that seems
>>>>>> more specific to this situation. Error handling of specific errors would
>>>>>> definitely be easier than having to deal with a generic INVALID_REQUEST
>>>>>> (and probably its custom message). I know that for KIP-848 we have
>>>>>> INVALID_REQUEST for similar situations, so if we ever go down this
>>>> path
>>>>>> we should review it there too for consistency. Thoughts?
>>>>>> LM5. The dependency between the StreamsGroupHeartbeat RPC and the
>>>>>> StreamsGroupInitialize RPC is one-way only right? HB requires a previous
>>>>>> StreamsGroupInitialize request, but StreamsGroupInitialize processing is
>>>>>> totally independent of heartbeats (and could perfectly be processed
>>>> without
>>>>>> a previous HB, even though the client implementation we’re proposing
>>>> won’t
>>>>>> go down that path). Is my understanding correct? Just to double check,
>>>>>> seems sensible like that at the protocol level.
>>>>>> LM6. With KIP-848, there is an important improvement that brings a
>>>>>> difference in behaviour around the static membership: with the classic
>>>>>> protocol, if a static member joins with a group instance already in
>>>> use, it
>>>>>> makes the initial member fail with a FENCED_INSTANCE_ID exception, vs.
>>>>>> with the new consumer group protocol, the second member trying to join
>>>>>> fails with an UNRELEASED_INSTANCE_ID. Does this change need to be
>>>>>> considered in any way for the streams app? (I'm not familiar with KS
>>>> yet,
>>>>>> but thought it was worth asking. If it doesn't affect it in any way, still
>>>>>> maybe helpful to call it out on a section for static membership)
>>>>>> LM7. Regarding the admin tool to manage streams groups. We can discuss
>>>>>> whether to have it here or separately, but I think we should aim for
>>>> some
>>>>>> basic admin capabilities from the start, mainly because I believe it
>>>> will
>>>>>> be very helpful/needed in practice during the impl of the KIP. From
>>>>>> experience with KIP-848, we felt a bit blindfolded in the initial phase
>>>>>> where we still didn't have kafka-consumer-groups dealing with the new
>>>>>> groups (and then it was very helpful and used when we were able to
>>>> easily
>>>>>> inspect them from the console)
>>>>>> LM8. nit: the links to KIP-848 are not quite right (pointing to an
>>>>>> unrelated “Future work section” at the end of KIP-848)
>>>>>> Thanks!
>>>>>> Lianet
>>>>>> On Fri, Jul 19, 2024 at 11:13 AM Lucas Brutschy
>>>>>> <lbruts...@confluent.io.invalid> wrote:
>>>>>>> Hi Andrew,
>>>>>>>
>>>>>>> AS2: I added a note for now. If others feel strongly about it, we can
>>>>>>> still add more administrative tools to the KIP - it should not change
>>>>>>> the overall story significantly.
>>>>>>>
>>>>>>> AS8: "streams.group.assignor.name" sounds good to me to distinguish
>>>>>>> the config from class names. Not sure if I like the "default". To be
>>>>>>> consistent, we'd then have to call it
>>>>>>> `group.streams.default.session.timeout.ms` as well. I only added the
>>>>>>> `.name` on both broker and group level for now.
>>>>>>>
>>>>>>> AS10: Ah, I misread your comment, now I know what you meant. Good
>>>>>>> point, fixed (by Bruno).
>>>>>>>
>>>>>>> Cheers,
>>>>>>> Lucas
>>>>>>>
>>>>>>> On Fri, Jul 19, 2024 at 4:44 PM Andrew Schofield
>>>>>>> <andrew_schofi...@live.com> wrote:
>>>>>>>>
>>>>>>>> Hi Lucas,
>>>>>>>> I see that I hit send too quickly. One more comment:
>>>>>>>>
>>>>>>>> AS2: I think stating that there will be a `kafka-streams-groups.sh` in
>>>> a
>>>>>>>> future KIP is fine to keep this KIP focused. Personally, I would
>>>> probably
>>>>>>>> put all of the gory details in this KIP, but then it’s not my KIP. A
>>>>>>> future
>>>>>>>> pointer is fine too.
>>>>>>>>
>>>>>>>> Thanks,
>>>>>>>> Andrew
>>>>>>>>
>>>>>>>>
>>>>>>>>> On 19 Jul 2024, at 13:46, Lucas Brutschy <lbruts...@confluent.io
>>>> .INVALID>
>>>>>>> wrote:
>>>>>>>>>
>>>>>>>>> Hi Andrew,
>>>>>>>>>
>>>>>>>>> thanks for getting the discussion going! Here are my responses.
>>>>>>>>>
>>>>>>>>> AS1: Good point, done.
>>>>>>>>>
>>>>>>>>> AS2: We were planning to add more administrative tools to the
>>>>>>>>> interface in a follow-up KIP, to not make this KIP too large. If
>>>>>>>>> people think that it would help to understand the overall picture if
>>>>>>>>> we already add something like `kafka-streams-groups.sh`, we will do
>>>>>>>>> that. I also agree that we should address how this relates to
>>>>>>>>> KIP-1043, we'll add it.
>>>>>>>>>
>>>>>>>>> AS3: Good idea, that's more consistent with `assigning` and
>>>>>>> `reconciling` etc.
>>>>>>>>>
>>>>>>>>> AS4: Thanks, Fixed.
>>>>>>>>>
>>>>>>>>> AS5: Good catch. This was supposed to mean that we require CREATE on
>>>>>>>>> cluster or CREATE on all topics, not both. Fixed.
>>>>>>>>>
>>>>>>>>> AS6: Thanks, Fixed.
>>>>>>>>>
>>>>>>>>> AS7. Thanks, Fixed.
>>>>>>>>>
>>>>>>>>> AS8: I think this works a bit differently in this KIP than in consumer
>>>>>>>>> groups. KIP-848 lets the members vote for a preferred assignor, and
>>>>>>>>> the broker-side assignor is picked by majority vote. The
>>>>>>>>> `group.consumer.assignors` specifies the list of assignors that are
>>>>>>>>> supported on the broker, and is configurable because the interface is
>>>>>>>>> pluggable. In this KIP, the task assignor is not voted on by members
>>>>>>>>> but configured on the broker-side. `group.streams.assignor` is used
>>>>>>>>> for this, and uses a specific name. If we make the task assignor
>>>>>>>>> pluggable on the broker-side, we'd introduce a separate config
>>>>>>>>> `group.streams.assignors`, which would indeed be a list of class
>>>>>>>>> names. I think there is no conflict here, the two configurations
>>>> serve
>>>>>>>>> different purposes.  The only gripe I'd have here is naming as
>>>>>>>>> `group.streams.assignor` and `group.streams.assignors` would be a bit
>>>>>>>>> similar, but I cannot really think of a better name for
>>>>>>>>> `group.streams.assignor`, so I'd probably rather introduce
>>>>>>>>> `group.streams.assignors`  as `group.streams.possible_assignors`  or
>>>>>>>>> something like that.
>>>>>>>>>
>>>>>>>>> AS9: I added explanations for the various record types. Apart from
>>>> the
>>>>>>>>> new topology record, and the partition metadata (which is based on
>>>> the
>>>>>>>>> topology and can only be created once we have a topology initialized)
>>>>>>>>> the lifecycle for the records is basically the same as in KIP-848.
>>>>>>>>>
>>>>>>>>> AS10: In the consumer offset topic, the version in the key is used to
>>>>>>>>> differentiate different schema types with the same content. So the
>>>>>>>>> keys are not versioned, but the version field is "abused" as a type
>>>>>>>>> tag. This is the same in KIP-848, we followed it for consistency.
>>>>>>>>>
>>>>>>>>> Cheers,
>>>>>>>>> Lucas
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> On Thu, Jul 18, 2024 at 1:27 PM Andrew Schofield
>>>>>>>>> <andrew_schofi...@live.com> wrote:
>>>>>>>>>>
>>>>>>>>>> Hi Lucas and Bruno,
>>>>>>>>>>
>>>>>>>>>> Thanks for the great KIP.
>>>>>>>>>>
>>>>>>>>>> I've read through the document and have some initial comments.
>>>>>>>>>>
>>>>>>>>>> AS1: I suppose that there is a new o.a.k.common.GroupType.STREAMS
>>>>>>> enumeration
>>>>>>>>>> constant. This is a change to the public interface and should be
>>>>>>> called out.
>>>>>>>>>>
>>>>>>>>>> AS2: Since streams groups are no longer consumer groups, how does
>>>> the
>>>>>>> user
>>>>>>>>>> manipulate them, observe lag and so on? Will you add
>>>>>>> `kafka-streams-groups.sh`
>>>>>>>>>> or extend `kafka-streams-application-reset.sh`? Of course, KIP-1043
>>>>>>> can easily
>>>>>>>>>> be extended to support streams groups, but that only lets the user
>>>>>>> see the
>>>>>>>>>> groups, not manipulate them.
>>>>>>>>>>
>>>>>>>>>> AS3: I wonder whether the streams group state of UNINITIALIZED would
>>>>>>> be
>>>>>>>>>> better expressed as INITIALIZING.
>>>>>>>>>>
>>>>>>>>>> AS4: In StreamsGroupInitializeRequest, Topology[].SourceTopicRegex
>>>>>>> should
>>>>>>>>>> be nullable.
>>>>>>>>>>
>>>>>>>>>> AS5: Why does StreamsGroupInitialize require CREATE permission on
>>>> the
>>>>>>>>>> cluster resource? I imagine that this is one of the ways that the
>>>>>>> request might
>>>>>>>>>> be granted permission to create the StateChangelogTopics and
>>>>>>>>>> RepartitionSourceTopics, but if it is granted permission to create
>>>>>>> those topics
>>>>>>>>>> with specific ACLs, would CREATE on the cluster resource still be
>>>>>>> required?
>>>>>>>>>>
>>>>>>>>>> AS6: StreamsGroupInitialize can also fail with
>>>>>>> TOPIC_AUTHORIZATION_FAILED
>>>>>>>>>> and (subject to AS5) CLUSTER_AUTHORIZATION_FAILED.
>>>>>>>>>>
>>>>>>>>>> AS7: A tiny nit. You've used TopologyID (capitals) in
>>>>>>> StreamsGroupHeartbeatRequest
>>>>>>>>>> and a few others, but in all other cases the fields which are ids
>>>> are
>>>>>>> spelled Id.
>>>>>>>>>> I suggest TopologyId.
>>>>>>>>>>
>>>>>>>>>> Also, "interal" is probably meant to be "interval".
>>>>>>>>>>
>>>>>>>>>> AS8: For consumer groups, the `group.consumer.assignors`
>>>>>>> configuration is a
>>>>>>>>>> list of class names. The assignors do have names too, but the
>>>>>>> configuration which
>>>>>>>>>> enables them is in terms of class names. I wonder whether the
>>>> broker’s
>>>>>>>>>> group.streams.assignor could actually be `group.streams.assignors`
>>>>>>> and specified
>>>>>>>>>> as a list of the class names of the supplied assignors. I know
>>>> you're
>>>>>>> not supporting
>>>>>>>>>> other assignors yet, but when you do, I expect you would prefer to
>>>>>>> have used class
>>>>>>>>>> names from the start.
>>>>>>>>>>
>>>>>>>>>> The use of assignor names in the other places looks good to me.
>>>>>>>>>>
>>>>>>>>>> AS9: I'd find it really helpful to have a bit of a description about
>>>>>>> the purpose and
>>>>>>>>>> lifecycle of the 9 record types you've introduced on the
>>>>>>> __consumer_offsets topic.
>>>>>>>>>> I did a cursory review but without really understanding what's
>>>>>>> written when,
>>>>>>>>>> I can't do a thorough job of reviewing.
>>>>>>>>>>
>>>>>>>>>> AS10: In the definitions of the record keys, such as
>>>>>>>>>> StreamsGroupCurrentMemberAssignmentKey, the versions of the fields
>>>>>>> must
>>>>>>>>>> match the versions of the types.
>>>>>>>>>>
>>>>>>>>>> Thanks,
>>>>>>>>>> Andrew
>>>>>>>>>>
>>>>>>>>>>> On 12 Jul 2024, at 09:04, Lucas Brutschy <lbruts...@confluent.io
>>>> .INVALID>
>>>>>>> wrote:
>>>>>>>>>>>
>>>>>>>>>>> Hi all,
>>>>>>>>>>>
>>>>>>>>>>> I would like to start a discussion thread on KIP-1071: Streams
>>>>>>>>>>> Rebalance Protocol. With this KIP, we aim to bring the principles
>>>>>>> laid
>>>>>>>>>>> down by KIP-848 to Kafka Streams, to make rebalances more reliable
>>>>>>> and
>>>>>>>>>>> scalable, and make Kafka Streams overall easier to deploy and
>>>>>>> operate.
>>>>>>>>>>> The KIP proposes moving the assignment logic to the broker, and
>>>>>>>>>>> introducing a dedicated group type and dedicated RPCs for Kafka
>>>>>>>>>>> Streams.
>>>>>>>>>>>
>>>>>>>>>>> The KIP is here:
>>>>>>>>>>>
>>>>>>>
>>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-1071%3A+Streams+Rebalance+Protocol
>>>>>>>>>>>
>>>>>>>>>>> This is joint work with Bruno Cadonna.
>>>>>>>>>>>
>>>>>>>>>>> Please take a look and let us know what you think.
>>>>>>>>>>>
>>>>>>>>>>> Best,
>>>>>>>>>>> Lucas

