Hi Andrew,

Thanks for getting the discussion going! Here are my responses.

AS1: Good point, done.

AS2: We were planning to add more administrative tools to the
interface in a follow-up KIP, to keep this KIP from growing too large.
If people think that adding something like `kafka-streams-groups.sh`
now would help in understanding the overall picture, we will do that.
I also agree that we should address how this relates to KIP-1043.
We'll add it.

AS3: Good idea, that's more consistent with `assigning` and `reconciling` etc.

AS4: Thanks, fixed.

AS5: Good catch. This was supposed to mean that we require CREATE on
cluster or CREATE on all topics, not both. Fixed.
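
To spell out the intended "either/or" semantics, here is a rough
sketch. It is not the broker's authorizer code: the function name and
its parameters are made up for illustration, and it ignores prefixed
and wildcard ACLs.

```python
def may_create_internal_topics(has_cluster_create, topics_to_create, topics_with_create):
    """Return True if CREATE is granted on the cluster OR on every topic to create.

    All names here are hypothetical; this only models the either/or rule.
    """
    if has_cluster_create:
        # CREATE on the cluster resource is sufficient on its own.
        return True
    # Otherwise CREATE must be granted on each individual topic.
    return all(topic in topics_with_create for topic in topics_to_create)
```

So a principal without CREATE on the cluster still passes as long as
every internal topic that needs to be created is covered by a
topic-level CREATE.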

AS6: Thanks, fixed.

AS7: Thanks, fixed.

AS8: I think this works a bit differently in this KIP than it does for
consumer groups. KIP-848 lets the members vote for a preferred assignor,
and the broker-side assignor is picked by majority vote. The
`group.consumer.assignors` config specifies the list of assignors that
are supported on the broker, and it is configurable because the
interface is pluggable. In this KIP, the task assignor is not voted on
by members but configured on the broker side. `group.streams.assignor`
is used for this, and it takes a specific assignor name. If we make the
task assignor pluggable on the broker side, we'd introduce a separate
config `group.streams.assignors`, which would indeed be a list of class
names. I think there is no conflict here, since the two configurations
serve different purposes. The only gripe I'd have is the naming, as
`group.streams.assignor` and `group.streams.assignors` would be quite
similar. I cannot really think of a better name for
`group.streams.assignor`, though, so I'd probably rather introduce
`group.streams.assignors` as `group.streams.possible_assignors` or
something like that.
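
To illustrate the difference between the two selection models, here is
a minimal sketch. It is a simplification, not coordinator code: the
function names are hypothetical, and the fallback to the first
supported assignor when no member states a preference, as well as
silently ignoring unsupported preferences, are assumptions of this
sketch.

```python
from collections import Counter

def pick_consumer_assignor(member_preferences, supported_assignors):
    """KIP-848 style: members vote, the most common supported preference wins."""
    votes = Counter(p for p in member_preferences if p in supported_assignors)
    if not votes:
        # Assumption of this sketch: fall back to the first supported assignor.
        return supported_assignors[0]
    return votes.most_common(1)[0][0]

def pick_streams_assignor(broker_config):
    """This KIP: no vote, the broker-side config names the assignor directly."""
    return broker_config["group.streams.assignor"]
```

In the first function the member input drives the outcome. In the
second one the members have no say, which is why a single name is
enough for `group.streams.assignor`.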

AS9: I added explanations for the various record types. Apart from the
new topology record and the partition metadata (which is based on the
topology and can only be created once we have a topology initialized),
the lifecycle of the records is basically identical to that in KIP-848.

AS10: In the `__consumer_offsets` topic, the version in the key is used
to differentiate between schema types with the same content. So the
keys are not versioned, but the version field is "abused" as a type
tag. This is the same in KIP-848, and we followed it for consistency.
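
As a rough illustration of the "version as type tag" idea, consider
the sketch below. The tag values and the second record name are made
up, and the layout (a 2-byte big-endian prefix followed by the key
payload) is an assumption of this sketch rather than a statement of the
exact wire format.

```python
import struct

# Hypothetical tag values, for illustration only.
RECORD_TYPES = {
    100: "SomeOtherStreamsGroupKey",  # made-up name
    101: "StreamsGroupCurrentMemberAssignmentKey",
}

def encode_key(type_tag, payload):
    """Prefix the key payload with the 'version' field used as a type tag."""
    return struct.pack(">h", type_tag) + payload

def decode_key(raw):
    """Read the tag first, then use it to decide which key schema applies."""
    (type_tag,) = struct.unpack_from(">h", raw, 0)
    return RECORD_TYPES[type_tag], raw[2:]
```

Two keys with identical payload bytes still map to different record
types, because the reader dispatches on the leading tag before looking
at the rest of the key.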

Cheers,
Lucas


On Thu, Jul 18, 2024 at 1:27 PM Andrew Schofield
<andrew_schofi...@live.com> wrote:
>
> Hi Lucas and Bruno,
>
> Thanks for the great KIP.
>
> I've read through the document and have some initial comments.
>
> AS1: I suppose that there is a new o.a.k.common.GroupType.STREAMS enumeration
> constant. This is a change to the public interface and should be called out.
>
> AS2: Since streams groups are no longer consumer groups, how does the user
> manipulate them, observe lag and so on? Will you add `kafka-streams-groups.sh`
> or extend `kafka-streams-application-reset.sh`? Of course, KIP-1043 can easily
> be extended to support streams groups, but that only lets the user see the
> groups, not manipulate them.
>
> AS3: I wonder whether the streams group state of UNINITIALIZED would be
> better expressed as INITIALIZING.
>
> AS4: In StreamsGroupInitializeRequest, Topology[].SourceTopicRegex should
> be nullable.
>
> AS5: Why does StreamsGroupInitialize require CREATE permission on the
> cluster resource? I imagine that this is one of the ways that the request
> might be granted permission to create the StateChangelogTopics and
> RepartitionSourceTopics, but if it is granted permission to create those
> topics with specific ACLs, would CREATE on the cluster resource still be
> required?
>
> AS6: StreamsGroupInitialize can also fail with TOPIC_AUTHORIZATION_FAILED
> and (subject to AS5) CLUSTER_AUTHORIZATION_FAILED.
>
> AS7: A tiny nit. You've used TopologyID (capitals) in
> StreamsGroupHeartbeatRequest and a few others, but in all other cases the
> fields which are ids are spelled Id. I suggest TopologyId.
>
> Also, "interal" is probably meant to be "interval".
>
> AS8: For consumer groups, the `group.consumer.assignors` configuration is a
> list of class names. The assignors do have names too, but the configuration
> which enables them is in terms of class names. I wonder whether the broker’s
> group.streams.assignor could actually be `group.streams.assignors` and
> specified as a list of the class names of the supplied assignors. I know
> you're not supporting other assignors yet, but when you do, I expect you
> would prefer to have used class names from the start.
>
> The use of assignor names in the other places looks good to me.
>
> AS9: I'd find it really helpful to have a bit of a description about the
> purpose and lifecycle of the 9 record types you've introduced on the
> __consumer_offsets topic. I did a cursory review but without really
> understanding what's written when, I can't do a thorough job of reviewing.
>
> AS10: In the definitions of the record keys, such as
> StreamsGroupCurrentMemberAssignmentKey, the versions of the fields must
> match the versions of the types.
>
> Thanks,
> Andrew
>
> > On 12 Jul 2024, at 09:04, Lucas Brutschy <lbruts...@confluent.io.INVALID> 
> > wrote:
> >
> > Hi all,
> >
> > I would like to start a discussion thread on KIP-1071: Streams
> > Rebalance Protocol. With this KIP, we aim to bring the principles laid
> > down by KIP-848 to Kafka Streams, to make rebalances more reliable and
> > scalable, and make Kafka Streams overall easier to deploy and operate.
> > The KIP proposed moving the assignment logic to the broker, and
> > introducing a dedicated group type and dedicated RPCs for Kafka
> > Streams.
> >
> > The KIP is here:
> > https://cwiki.apache.org/confluence/display/KAFKA/KIP-1071%3A+Streams+Rebalance+Protocol
> >
> > This is joint work with Bruno Cadonna.
> >
> > Please take a look and let us know what you think.
> >
> > Best,
> > Lucas
>
