Hi Lianet,
I have a comment on your comment. I think that’s allowed.

LM3. I quite like the idea of having an INVALID_GROUP_TYPE
error code for RPCs which were applied to a group which turned
out to be the wrong type. We might even be able to use
INCONSISTENT_GROUP_PROTOCOL for this purpose.

However, there would be quite a lot of RPC changes to achieve this.
For example, OffsetDelete is expecting a consumer group so I guess
there would be a new version with the new error code, and so on for
the other RPCs.
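
To make that concrete, here is a rough Python sketch of how a broker-side check might choose between the two errors depending on the request version. It is purely illustrative: INVALID_GROUP_TYPE is only the error code proposed in this thread, and the function name `check_group_type`, the `groups` dictionary and the version cut-off are assumptions made up for the example, not existing Kafka code. Falling back to GROUP_ID_NOT_FOUND for older request versions is also just one possible choice.

```python
from enum import Enum


class GroupType(Enum):
    CONSUMER = "consumer"
    SHARE = "share"
    STREAMS = "streams"


class Error(Enum):
    NONE = "NONE"
    GROUP_ID_NOT_FOUND = "GROUP_ID_NOT_FOUND"
    # Hypothetical: the error code proposed in this thread.
    INVALID_GROUP_TYPE = "INVALID_GROUP_TYPE"


def check_group_type(groups, group_id, expected_type, request_version,
                     first_version_with_new_error):
    """Pick the error for an RPC that expects a group of expected_type.

    groups maps group id -> GroupType and stands in for the coordinator's
    group metadata. first_version_with_new_error is the (assumed) first
    request version whose clients understand INVALID_GROUP_TYPE.
    """
    actual_type = groups.get(group_id)
    if actual_type is None:
        # The group really does not exist.
        return Error.GROUP_ID_NOT_FOUND
    if actual_type is not expected_type:
        if request_version >= first_version_with_new_error:
            # New request version: report the mismatch explicitly.
            return Error.INVALID_GROUP_TYPE
        # Older clients do not know the new code, so keep the old behaviour.
        return Error.GROUP_ID_NOT_FOUND
    return Error.NONE
```

With a streams group "app-1" registered, an OffsetDelete-style call expecting a consumer group would get INVALID_GROUP_TYPE on the new request version and GROUP_ID_NOT_FOUND on older ones, which is why each affected RPC would need a version bump.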

If we are going to do this, perhaps it would be best included in KIP-1043.
Feel free to comment in that discussion thread if you agree.


Thanks,
Andrew


> On 19 Jul 2024, at 20:53, Lianet M. <liane...@gmail.com> wrote:
>
> Hi Lucas/Bruno, thanks for the great KIP! First comments:
>
> LM1. Related to where the KIP says: “Group ID, member ID, member epoch
> are sent with each heartbeat request. Any other information that has not
> changed since the last heartbeat can be omitted.” I expect all the other
> info also needs to be sent whenever a full heartbeat is required (even if
> it didn’t change from the last heartbeat), e.g. in fencing scenarios,
> correct?
>
> LM2. For consumer groups we always send the groupInstanceId (if any) as
> part of every heartbeat, along with memberId, epoch and groupId. Should we
> consider that too here?
>
> LM3. We’re proposing returning a GROUP_ID_NOT_FOUND error in response to
> the stream-specific RPCs if the groupId is associated with a group type
> that is not streams (i.e. consumer group or share group). I wonder if at
> this point, where we're getting several new group types added, each with
> RPCs that are supposed to include groupId of a certain type, we should be
> more explicit about this situation. Maybe a kind of INVALID_GROUP_TYPE
> (group exists but not with a valid type for this RPC) vs a
> GROUP_ID_NOT_FOUND (group does not exist).  Those errors would be
> consistently used across consumer, share, and streams RPCs whenever the
> group id is not of the expected type.
> This is truly not specific to this KIP, and should be addressed with all
> group types and their RPCs in mind. I just wanted to bring out my concern
> and get thoughts around it.
>
> LM4. On a related note, StreamsGroupDescribe returns INVALID_REQUEST if
> groupId is empty. There is already an INVALID_GROUP_ID error, that seems
> more specific to this situation. Error handling of specific errors would
> definitely be easier than having to deal with a generic INVALID_REQUEST
> (and probably its custom message). I know that for KIP-848 we have
> INVALID_REQUEST for similar situations, so if we ever go down this path
> we should review it there too for consistency. Thoughts?
>
> LM5. The dependency between the StreamsGroupHeartbeat RPC and the
> StreamsGroupInitialize RPC is one-way only, right? HB requires a previous
> StreamsGroupInitialize request, but StreamsGroupInitialize processing is
> totally independent of heartbeats (and could perfectly well be processed
> without a previous HB, even though the client implementation we’re
> proposing won’t go down that path). Is my understanding correct? Just
> double-checking; that seems sensible at the protocol level.
>
> LM6. With KIP-848, there is an important improvement that changes
> behaviour around static membership: with the classic protocol, if a
> static member joins with a group instance already in use, it makes the
> initial member fail with a FENCED_INSTANCE_ID exception, whereas with the
> new consumer group protocol, the second member trying to join fails with
> an UNRELEASED_INSTANCE_ID. Does this change need to be considered in any
> way for the streams app? (I'm not familiar with KS yet, but thought it
> was worth asking. If it has no effect, it may still be helpful to call
> that out in a section on static membership.)
>
> LM7. Regarding the admin tool to manage streams groups: we can discuss
> whether to have it here or separately, but I think we should aim for some
> basic admin capabilities from the start, mainly because I believe it will
> be very helpful/needed in practice during the implementation of the KIP.
> From experience with KIP-848, we felt a bit blindfolded in the initial
> phase where we still didn't have kafka-consumer-groups dealing with the
> new groups (and then it was very helpful and used once we were able to
> easily inspect them from the console).
>
> LM8. nit: the links to KIP-848 are not quite right (pointing to an
> unrelated “Future work section” at the end of KIP-848)
>
> Thanks!
>
> Lianet
>
>
> On Fri, Jul 19, 2024 at 11:13 AM Lucas Brutschy
> <lbruts...@confluent.io.invalid> wrote:
>
>> Hi Andrew,
>>
>> AS2: I added a note for now. If others feel strongly about it, we can
>> still add more administrative tools to the KIP - it should not change
>> the overall story significantly.
>>
>> AS8: "streams.group.assignor.name" sounds good to me to distinguish
>> the config from class names. Not sure if I like the "default". To be
>> consistent, we'd then have to call it
>> `group.streams.default.session.timeout.ms` as well. I only added the
>> `.name` on both broker and group level for now.
>>
>> AS10: Ah, I misread your comment, now I know what you meant. Good
>> point, fixed (by Bruno).
>>
>> Cheers,
>> Lucas
>>
>> On Fri, Jul 19, 2024 at 4:44 PM Andrew Schofield
>> <andrew_schofi...@live.com> wrote:
>>>
>>> Hi Lucas,
>>> I see that I hit send too quickly. One more comment:
>>>
>>> AS2: I think stating that there will be a `kafka-streams-groups.sh` in a
>>> future KIP is fine to keep this KIP focused. Personally, I would probably
>>> put all of the gory details in this KIP, but then it’s not my KIP. A
>>> future pointer is fine too.
>>>
>>> Thanks,
>>> Andrew
>>>
>>>
>>>> On 19 Jul 2024, at 13:46, Lucas Brutschy <lbruts...@confluent.io.INVALID> wrote:
>>>>
>>>> Hi Andrew,
>>>>
>>>> thanks for getting the discussion going! Here are my responses.
>>>>
>>>> AS1: Good point, done.
>>>>
>>>> AS2: We were planning to add more administrative tools to the
>>>> interface in a follow-up KIP, to not make this KIP too large. If
>>>> people think that it would help to understand the overall picture if
>>>> we already add something like `kafka-streams-groups.sh`, we will do
>>>> that. I also agree that we should address how this relates to
>>>> KIP-1043, we'll add it.
>>>>
>>>> AS3: Good idea, that's more consistent with `assigning` and
>>>> `reconciling` etc.
>>>>
>>>> AS4: Thanks, Fixed.
>>>>
>>>> AS5: Good catch. This was supposed to mean that we require CREATE on
>>>> cluster or CREATE on all topics, not both. Fixed.
>>>>
>>>> AS6: Thanks, Fixed.
>>>>
>>>> AS7. Thanks, Fixed.
>>>>
>>>> AS8: I think this works a bit differently in this KIP than in consumer
>>>> groups. KIP-848 lets the members vote for a preferred assignor, and
>>>> the broker-side assignor is picked by majority vote. The
>>>> `group.consumer.assignors` specifies the list of assignors that are
>>>> supported on the broker, and is configurable because the interface is
>>>> pluggable. In this KIP, the task assignor is not voted on by members
>>>> but configured on the broker-side. `group.streams.assignor` is used
>>>> for this, and uses a specific name. If we make the task assignor
>>>> pluggable on the broker-side, we'd introduce a separate config
>>>> `group.streams.assignors`, which would indeed be a list of class
>>>> names. I think there is no conflict here, the two configurations serve
>>>> different purposes. The only gripe I'd have here is naming, as
>>>> `group.streams.assignor` and `group.streams.assignors` would be a bit
>>>> similar, but I cannot really think of a better name for
>>>> `group.streams.assignor`, so I'd probably rather introduce
>>>> `group.streams.assignors` as `group.streams.possible_assignors` or
>>>> something like that.
>>>>
>>>> AS9: I added explanations for the various record types. Apart from the
>>>> new topology record, and the partition metadata (which is based on the
>>>> topology and can only be created once we have a topology initialized)
>>>> the lifecycle for the records is basically identical to that in KIP-848.
>>>>
>>>> AS10: In the consumer offset topic, the version in the key is used to
>>>> differentiate different schema types with the same content. So the
>>>> keys are not versioned, but the version field is "abused" as a type
>>>> tag. This is the same in KIP-848, we followed it for consistency.
>>>>
>>>> Cheers,
>>>> Lucas
>>>>
>>>>
>>>> On Thu, Jul 18, 2024 at 1:27 PM Andrew Schofield
>>>> <andrew_schofi...@live.com> wrote:
>>>>>
>>>>> Hi Lucas and Bruno,
>>>>>
>>>>> Thanks for the great KIP.
>>>>>
>>>>> I've read through the document and have some initial comments.
>>>>>
>>>>> AS1: I suppose that there is a new o.a.k.common.GroupType.STREAMS
>>>>> enumeration constant. This is a change to the public interface and
>>>>> should be called out.
>>>>>
>>>>> AS2: Since streams groups are no longer consumer groups, how does the
>>>>> user manipulate them, observe lag and so on? Will you add
>>>>> `kafka-streams-groups.sh` or extend `kafka-streams-application-reset.sh`?
>>>>> Of course, KIP-1043 can easily be extended to support streams groups,
>>>>> but that only lets the user see the groups, not manipulate them.
>>>>>
>>>>> AS3: I wonder whether the streams group state of UNINITIALIZED would
>>>>> be better expressed as INITIALIZING.
>>>>>
>>>>> AS4: In StreamsGroupInitializeRequest, Topology[].SourceTopicRegex
>>>>> should be nullable.
>>>>>
>>>>> AS5: Why does StreamsGroupInitialize require CREATE permission on the
>>>>> cluster resource? I imagine that this is one of the ways that the
>>>>> request might be granted permission to create the StateChangelogTopics
>>>>> and RepartitionSourceTopics, but if it is granted permission to create
>>>>> those topics with specific ACLs, would CREATE on the cluster resource
>>>>> still be required?
>>>>>
>>>>> AS6: StreamsGroupInitialize can also fail with
>>>>> TOPIC_AUTHORIZATION_FAILED and (subject to AS5)
>>>>> CLUSTER_AUTHORIZATION_FAILED.
>>>>>
>>>>> AS7: A tiny nit. You've used TopologyID (capitals) in
>>>>> StreamsGroupHeartbeatRequest and a few others, but in all other cases
>>>>> the fields which are ids are spelled Id. I suggest TopologyId.
>>>>>
>>>>> Also, "interal" is probably meant to be "interval".
>>>>>
>>>>> AS8: For consumer groups, the `group.consumer.assignors` configuration
>>>>> is a list of class names. The assignors do have names too, but the
>>>>> configuration which enables them is in terms of class names. I wonder
>>>>> whether the broker’s `group.streams.assignor` could actually be
>>>>> `group.streams.assignors` and specified as a list of the class names of
>>>>> the supplied assignors. I know you're not supporting other assignors
>>>>> yet, but when you do, I expect you would prefer to have used class
>>>>> names from the start.
>>>>>
>>>>> The use of assignor names in the other places looks good to me.
>>>>>
>>>>> AS9: I'd find it really helpful to have a bit of a description about
>>>>> the purpose and lifecycle of the 9 record types you've introduced on
>>>>> the __consumer_offsets topic. I did a cursory review but without really
>>>>> understanding what's written when, I can't do a thorough job of
>>>>> reviewing.
>>>>>
>>>>> AS10: In the definitions of the record keys, such as
>>>>> StreamsGroupCurrentMemberAssignmentKey, the versions of the fields
>>>>> must match the versions of the types.
>>>>>
>>>>> Thanks,
>>>>> Andrew
>>>>>
>>>>>> On 12 Jul 2024, at 09:04, Lucas Brutschy <lbruts...@confluent.io.INVALID> wrote:
>>>>>>
>>>>>> Hi all,
>>>>>>
>>>>>> I would like to start a discussion thread on KIP-1071: Streams
>>>>>> Rebalance Protocol. With this KIP, we aim to bring the principles
>>>>>> laid down by KIP-848 to Kafka Streams, to make rebalances more
>>>>>> reliable and scalable, and make Kafka Streams overall easier to
>>>>>> deploy and operate. The KIP proposes moving the assignment logic to
>>>>>> the broker, and introducing a dedicated group type and dedicated
>>>>>> RPCs for Kafka Streams.
>>>>>>
>>>>>> The KIP is here:
>>>>>>
>>>>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-1071%3A+Streams+Rebalance+Protocol
>>>>>>
>>>>>> This is joint work with Bruno Cadonna.
>>>>>>
>>>>>> Please take a look and let us know what you think.
>>>>>>
>>>>>> Best,
>>>>>> Lucas
>>>>>
>>>
>>
