Hi Lucas,
Thanks for your response. All makes sense for me, with just a couple
of follow-up comments.

AS8: So, really the broker config is the name of the default assignor
used unless it’s overridden by a group config. I have one suggestion,
which you can of course ignore, that you use
`group.streams.default.assignor.name` on the broker,
`group.streams.assignor.name` on the group,
`group.remote.assignor.name` on the application, and when you permit
pluggable assignors, you use `group.streams.assignors` for the list of
class names.

I’d prefer if we’d used `group.*.assignor.class.names` for all group types
but that ship has sailed.

AS10: Yes, exactly. In practice, you’ll assign the numbers as you implement
the KIP. Looks good to me.


I’ll take another look now the records are more fully described.

Thanks,
Andrew

> On 19 Jul 2024, at 13:46, Lucas Brutschy <lbruts...@confluent.io.INVALID> 
> wrote:
>
> Hi Andrew,
>
> thanks for getting the discussion going! Here are my responses.
>
> AS1: Good point, done.
>
> AS2: We were planning to add more administrative tools to the
> interface in a follow-up KIP, to not make this KIP too large. If
> people think that it would help to understand the overall picture if
> we already add something like `kafka-streams-groups.sh`, we will do
> that. I also agree that we should address how this relates to
> KIP-1043, we'll add it.
>
> AS3: Good idea, that's more consistent with `assigning` and `reconciling` etc.
>
> AS4: Thanks, Fixed.
>
> AS5: Good catch. This was supposed to mean that we require CREATE on
> cluster or CREATE on all topics, not both. Fixed.
>
> AS6: Thanks, Fixed.
>
> AS7. Thanks, Fixed.
>
> AS8: I think this works a bit different in this KIP than in consumer
> groups. KIP-848 lets the members vote for a preferred assignor, and
> the broker-side assignor is picked by majority vote. The
> `group.consumer.assignors` specifies the list of assignors that are
> supported on the broker, and is configurable because the interface is
> pluggable. In this KIP, the task assignor is not voted on by members
> but configured on the broker-side. `group.streams.assignor` is used
> for this, and uses a specific name. If we'll make the task assignor
> pluggable on the broker-side, we'd introduce a separate config
> `group.streams.assignors`, which would indeed be a list of class
> names. I think there is no conflict here, the two configurations serve
> different purposes.  The only gripe I'd have here is naming as
> `group.streams.assignor` and `group.streams.assignors` would be a bit
> similar, but I cannot really think of a better name for
> `group.streams.assignor`, so I'd probably rather introduce
> `group.streams.assignors`  as `group.streams.possible_assignors`  or
> something like that.
>
> AS9: I added explanations for the various record types. Apart from the
> new topology record, and the partition metadata (which is based on the
> topology and can only be created once we have a topology initialized)
> the lifecycle for the records is basically identical as in KIP-848.
>
> AS10: In the consumer offset topic, the version in the key is used to
> differentiate different schema types with the same content. So the
> keys are not versioned, but the version field is "abused" as a type
> tag. This is the same in KIP-848, we followed it for consistency.
>
> Cheers,
> Lucas
>
>
> On Thu, Jul 18, 2024 at 1:27 PM Andrew Schofield
> <andrew_schofi...@live.com> wrote:
>>
>> Hi Lucas and Bruno,
>>
>> Thanks for the great KIP.
>>
>> I've read through the document and have some initial comments.
>>
>> AS1: I suppose that there is a new o.a.k.common.GroupType.STREAMS enumeration
>> constant. This is a change to the public interface and should be called out.
>>
>> AS2: Since streams groups are no longer consumer groups, how does the user
>> manipulate them, observe lag and so on? Will you add 
>> `kafka-streams-groups.sh`
>> or extend `kafka-streams-application-reset.sh`? Of course, KIP-1043 can 
>> easily
>> be extended to support streams groups, but that only lets the user see the
>> groups, not manipulate them.
>>
>> AS3: I wonder whether the streams group state of UNINITIALIZED would be
>> better expressed as INITIALIZING.
>>
>> AS4: In StreamsGroupInitializeRequest, Topology[].SourceTopicRegex should
>> be nullable.
>>
>> AS5: Why does StreamsGroupInitialize require CREATE permission on the
>> cluster resource? I imagine that this is one of the ways that the request 
>> might
>> be granted permission to create the StateChangelogTopics and
>> RepartitionSourceTopics, but if it is granted permission to create those 
>> topics
>> with specific ACLs, would CREATE on the cluster resource still be required?
>>
>> AS6: StreamsGroupInitialize can also fail with TOPIC_AUTHORIZATION_FAILED
>> and (subject to AS5) CLUSTER_AUTHORIZATION_FAILED.
>>
>> AS7: A tiny nit. You've used TopologyID (capitals) in 
>> StreamsGroupHeartbeatRequest
>> and a few others, but in all other cases the fields which are ids are 
>> spelled Id.
>> I suggest TopologyId.
>>
>> Also, "interal" is probably meant to be "interval”.
>>
>> AS8: For consumer groups, the `group.consumer.assignors` configuration is a
>> list of class names. The assignors do have names too, but the configuration 
>> which
>> enables them is in terms of class names. I wonder whether the broker’s
>> group.streams.assignor could actually be `group.streams.assignors` and 
>> specified
>> as a list of the class names of the supplied assignors. I know you're not 
>> supporting
>> other assignors yet, but when you do, I expect you would prefer to have used 
>> class
>> names from the start.
>>
>> The use of assignor names in the other places looks good to me.
>>
>> AS9: I'd find it really helpful to have a bit of a description about the 
>> purpose and
>> lifecycle of the 9 record types you've introduced on the __consumer_offsets 
>> topic.
>> I did a cursory review but without really understanding what's written when,
>> I can't do a thorough job of reviewing.
>>
>> AS10: In the definitions of the record keys, such as
>> StreamsGroupCurrentMemberAssignmentKey, the versions of the fields must
>> match the versions of the types.
>>
>> Thanks,
>> Andrew
>>
>>> On 12 Jul 2024, at 09:04, Lucas Brutschy <lbruts...@confluent.io.INVALID> 
>>> wrote:
>>>
>>> Hi all,
>>>
>>> I would like to start a discussion thread on KIP-1071: Streams
>>> Rebalance Protocol. With this KIP, we aim to bring the principles laid
>>> down by KIP-848 to Kafka Streams, to make rebalances more reliable and
>>> scalable, and make Kafka Streams overall easier to deploy and operate.
>>> The KIP proposed moving the assignment logic to the broker, and
>>> introducing a dedicated group type and dedicated RPCs for Kafka
>>> Streams.
>>>
>>> The KIP is here:
>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-1071%3A+Streams+Rebalance+Protocol
>>>
>>> This is joint work with Bruno Cadonna.
>>>
>>> Please take a look and let us know what you think.
>>>
>>> Best,
>>> Lucas
>>

Reply via email to