Hi Nick,

NT4. I think it will be hard anyway to ensure that the assignor always
gets disjoint sets (there is no synchronized rebalance point anymore,
so locks wouldn't prevent two clients reporting the same dormant
task). So I think we'll have to lift this restriction. I was thinking
more that locking is required to prevent concurrent access. In
particular, I was expecting that the lock will avoid two threads
opening the same RocksDB in KIP-1035. Wouldn't this cause problems?

Cheers,
Lucas

On Fri, Aug 16, 2024 at 11:34 AM Nick Telford <nick.telf...@gmail.com> wrote:
>
> Hi Lucas,
>
> NT4.
> The reason I mentioned this was because, while implementing 1035, I
> stumbled across a problem: initially I had changed it so that threads
> always reported the lag for *all* dormant Tasks on-disk, even if it meant
> multiple threads reporting lag for the same Tasks. I found that this didn't
> work, apparently because the assignor assumes that multiple threads on the
> same instance always report disjoint sets.
>
> From reading through 1071, it sounded like this assumption is no longer
> being made by the assignor, and that the processId field would allow the
> assignor to understand when multiple clients reporting lag for the same
> Tasks are on the same instance. This would enable us to do away with the
> locking when reporting lag, and just have threads report the lag for every
> Task on-disk, even if other threads are reporting lag for the same Tasks.
>
> But it sounds like this is not correct, and that the new assignor will make
> the same assumptions as the old one?
>
> Regards,
> Nick
>
> On Fri, 16 Aug 2024 at 10:17, Lucas Brutschy <lbruts...@confluent.io.invalid>
> wrote:
>
> > Hi Nick!
> >
> > Thanks for getting involved in the discussion.
> >
> > NT1. We are always referring to offsets in the changelog topics here.
> > I tried to make it more consistent. But in the schemas and API, I find
> > "task changelog end offset" a bit lengthy, so we use "task offset" and
> > "task end offset" for short. We could change it, if people think this
> > is confusing.
> >
> > NT2. You are right. The confusing part is that the current streams
> > config is called `max.warmup.replicas`, but in the new protocol, we
> > are bounding the group-level parameter using
> > `group.streams.max.warmup.replicas`. If we wanted to keep
> > `group.streams.max.warmup.replicas` for the config name on the
> > group-level, we'd have to bound it using
> > `group.streams.max.max.warmup.replicas`. I prefer not doing this, but
> > open to suggestions.
> >
> > NT3. You are right, we do not need to make it this restrictive. I
> > think the main problem with having 10,000 warm-up replicas would be
> > that it slows down the assignment inside the broker - once we are
> > closer to production-ready implementation, we may have better numbers
> > of this and may revisit these defaults. I'll set the max to 100 for
> > now, but it would be good to hear what values people typically use in
> > their production workloads.
> >
> > NT4. We will actually only report the offsets if we manage to acquire
> > the lock. I tried to make this more precise. I suppose also with
> > KIP-1035, we'd require the lock to read the offset?
> >
> > Cheers,
> > Lucas
> >
> > On Thu, Aug 15, 2024 at 8:40 PM Nick Telford <nick.telf...@gmail.com>
> > wrote:
> > >
> > > Hi everyone,
> > >
> > > Looks really promising, and I can see this resolving several issues I've
> > > noticed. I particularly like the choice to use a String for Subtopology
> > ID,
> > > as it will (eventually) lead to a better solution to KIP-816.
> > >
> > > I noticed a few typos in the KIP that I thought I'd mention:
> > >
> > > NT1.
> > > In several places you refer to "task changelog end offsets", while in
> > > others, you call it "task end offsets". Which is it?
> > >
> > > NT2.
> > > Under "Group Configurations", you included
> > > "group.streams.max.warmup.replicas", but I think you meant
> > > "group.streams.num.warmup.replicas"?
> > >
> > > NT3.
> > > Not a typo, but a suggestion: it makes sense to set the default for
> > > "group.streams.num.warmup.replicas" to 2, for compatibility with the
> > > existing defaults, but why set the default for
> > > "group.streams.max.warmup.replicas" to only 4? That seems extremely
> > > restrictive. These "max" configs are typically used to prevent a subset
> > of
> > > users causing problems on the shared broker cluster - what's the reason
> > to
> > > set such a restrictive value for max warmup replicas? If I had 10,000
> > > warmup replicas, would it cause a noticeable problem on the brokers?
> > >
> > > NT4.
> > > It's implied that clients send the changelog offsets for *all* dormant
> > > stateful Tasks, but the current behaviour is that clients will only send
> > > the changelog offsets for the stateful Tasks that they are able to lock
> > > on-disk. Since this is a change in behaviour, perhaps this should be
> > called
> > > out explicitly?
> > >
> > > Regards,
> > > Nick
> > >
> > > On Thu, 15 Aug 2024 at 10:55, Lucas Brutschy <lbruts...@confluent.io
> > .invalid>
> > > wrote:
> > >
> > > > Hi Andrew,
> > > >
> > > > thanks for the comment.
> > > >
> > > > AS12: I clarified the command-line interface. It's supposed to be used
> > > > with --reset-offsets and --delete-offsets. I removed --topic.
> > > >
> > > > AS13: Yes, it's --delete. I clarified the command-line interface.
> > > >
> > > > Cheers,
> > > > Lucas
> > > >
> > > > On Tue, Aug 13, 2024 at 4:14 PM Andrew Schofield
> > > > <andrew_schofi...@live.com> wrote:
> > > > >
> > > > > Hi Lucas,
> > > > > Thanks for the KIP update.
> > > > >
> > > > > I think that `kafka-streams-groups.sh` looks like a good equivalent
> > to
> > > > > the tools for the other types of groups.
> > > > >
> > > > > AS12: In kafka-streams-groups.sh, the description for the
> > > > > --input-topics option seems insufficient. Why is an input topic
> > specified
> > > > > with this option different than a topic specified with --topic? Why
> > is
> > > > > It --input-topics rather than --input-topic? Which action of this
> > tool
> > > > > does this option apply to?
> > > > >
> > > > > AS13: Similarly, for --internal-topics, which action of the tool
> > does it
> > > > > apply to? I suppose it’s --delete, but it’s not clear to me.
> > > > >
> > > > > Thanks,
> > > > > Andrew
> > > > >
> > > > > > On 11 Aug 2024, at 12:10, Lucas Brutschy <lbruts...@confluent.io
> > .INVALID>
> > > > wrote:
> > > > > >
> > > > > > Hi Andrew/Lianet,
> > > > > >
> > > > > > I have added an administrative command-line tool (replacing
> > > > > > `kafka-streams-application-reset`) and extensions of the Admin API
> > for
> > > > > > listing, deleting, describing groups and listing, altering and
> > > > > > deleting offsets for streams groups. No new RPCs have to be added,
> > > > > > however, we duplicate some of the API in the admin client that
> > exist
> > > > > > for consumer groups. It seems to me cleaner to duplicate some
> > > > > > code/interface here, instead of using "consumer group" APIs for
> > > > > > streams groups, or renaming existing APIs that use "consumerGroup"
> > in
> > > > > > the name to something more generic (which wouldn't cover share
> > > > > > groups).
> > > > > >
> > > > > > I think for now, all comments are addressed.
> > > > > >
> > > > > > Cheers,
> > > > > > Lucas
> > > > > >
> > > > > > On Tue, Aug 6, 2024 at 3:19 PM Lucas Brutschy <
> > lbruts...@confluent.io>
> > > > wrote:
> > > > > >>
> > > > > >> Hi Lianet and Andrew,
> > > > > >>
> > > > > >> LM1/LM2: You are right. The idea is to omit fields exactly in the
> > same
> > > > > >> situations as in KIP-848. In the KIP, I stuck with how the
> > behavior
> > > > > >> was defined in KIP-848 (e.g. KIP-848 defined that that instance ID
> > > > > >> will be omitted if it did not change since the last heartbeat).
> > But
> > > > > >> you are correct that the implementation handles these details
> > slightly
> > > > > >> differently. I updated the KIP to match more closely the behavior
> > of
> > > > > >> the KIP-848 implementation.
> > > > > >>
> > > > > >> LM9: Yes, there are several options to do this. The idea is to
> > have
> > > > > >> only one client initialize the topology, not all clients. It seems
> > > > > >> easier to understand on the protocol level (otherwise we'd have N
> > > > > >> topology initializations racing with a hard-to-determine winner).
> > We
> > > > > >> also expect the payload of the request to grow in the future and
> > want
> > > > > >> to avoid the overhead of having all clients sending the topology
> > at
> > > > > >> the same time. But initializing the group could take some time -
> > we
> > > > > >> have to create internal topics, and maybe a client is
> > malfunctioning
> > > > > >> and the initialization has to be retried. It seemed a bit
> > confusing to
> > > > > >> return errors to all other clients that are trying to join the
> > group
> > > > > >> during that time - as if there was a problem with joining the
> > group /
> > > > > >> the contents of the heartbeat. It seems cleaner to me to let all
> > > > > >> clients successfully join the group and heartbeat, but remain in
> > an
> > > > > >> INITIALIZING state which does not yet assign any tasks. Does that
> > make
> > > > > >> sense to you? You are right that returning a retriable error and
> > > > > >> having all clients retry until the group is initialized would also
> > > > > >> work, it just doesn't model well that "everything is going
> > according
> > > > > >> to plan".
> > > > > >> As for the order of the calls - yes, I think it is fine to allow
> > an
> > > > > >> Initialize RPC before the first heartbeat for supporting future
> > admin
> > > > > >> tools. I made this change throughout the KIP, thanks!
> > > > > >>
> > > > > >> AS11: Yes, your understanding is correct. The number of tasks for
> > one
> > > > > >> subtopology is the maximum number of partitions in any of the
> > matched
> > > > > >> topics. What will happen in Kafka Streams is that the partitions
> > of
> > > > > >> the matched topics will effectively be merged during stream
> > > > > >> processing, so in your example, subtopology:0 would consume from
> > AB:0
> > > > > >> and AC:0.
> > > > > >>
> > > > > >> Cheers,
> > > > > >> Lucas
> > > > > >>
> > > > > >> On Fri, Aug 2, 2024 at 9:47 PM Lianet M. <liane...@gmail.com>
> > wrote:
> > > > > >>>
> > > > > >>> Hi Bruno, answering your questions:
> > > > > >>>
> > > > > >>> About the full heartbeat (LM1): I just wanted to confirm that
> > you'll
> > > > be
> > > > > >>> sending full HBs in case of errors in general. It's not clear
> > from
> > > > the KIP,
> > > > > >>> since it referred to sending Id/epoch and whatever had changed
> > since
> > > > the
> > > > > >>> last HB only. Sending full HB on error is key to ensure fresh
> > > > rejoins after
> > > > > >>> fencing for instance, and retries with all relevant info.
> > > > > >>>
> > > > > >>> About the instanceId (LM2): The instanceId is needed on every HB
> > to
> > > > be able
> > > > > >>> to identify a member using one that is already taken. On every
> > HB,
> > > > the
> > > > > >>> broker uses the instance id (if any) to retrieve the member ID
> > > > associated
> > > > > >>> with it, and checks it against the memberId received in the HB
> > > > > >>> (throwing UnreleasedInstance exception if needed). So similar to
> > my
> > > > > >>> previous point, just wanted to confirm that we are considering
> > that
> > > > here
> > > > > >>> too.
> > > > > >>>
> > > > > >>> Now some other thoughts:
> > > > > >>>
> > > > > >>> LM9: Definitely interesting imo if we can avoid the dependency
> > > > between the
> > > > > >>> StreamsGroupInitialize and the StreamsGroupHeartbeat. I totally
> > get
> > > > that
> > > > > >>> the initial client implementation will do a HB first, and that's
> > > > fine, but
> > > > > >>> not having the flow enforced at the protocol level would allow
> > for
> > > > further
> > > > > >>> improvement in the future (that initialize via admin idea you
> > > > mentioned,
> > > > > >>> for instance). Actually, I may be missing something about the HB,
> > > > but if we
> > > > > >>> are at the point where HB requires that the topology has been
> > > > initialized,
> > > > > >>> and the topology init requires the group, why is it the heartbeat
> > > > RPC the
> > > > > >>> one responsible for the group creation? (vs.
> > StreamsGroupInitialize
> > > > creates
> > > > > >>> group if needed + HB just fails if topology not initialized)
> > > > > >>>
> > > > > >>> Thanks!
> > > > > >>>
> > > > > >>> Lianet
> > > > > >>> (I didn't miss your answer on my INVALID_GROUP_TYPE proposal,
> > just
> > > > still
> > > > > >>> thinking about it in sync with the same discussion we're having
> > on
> > > > the
> > > > > >>> KIP-1043 thread...I'll come back on that)
> > > > > >>>
> > > > > >>> On Thu, Aug 1, 2024 at 10:55 AM Andrew Schofield <
> > > > andrew_schofi...@live.com>
> > > > > >>> wrote:
> > > > > >>>
> > > > > >>>> Hi Bruno,
> > > > > >>>> Thanks for adding the detail on the schemas on records written
> > to
> > > > > >>>> __consumer_offsets.
> > > > > >>>> I’ve reviewed them in detail and they look good to me. I have
> > one
> > > > naive
> > > > > >>>> question.
> > > > > >>>>
> > > > > >>>> AS11: I notice that an assignment is essentially a set of
> > partition
> > > > > >>>> indices for
> > > > > >>>> subtopologies. Since a subtopology can be defined by a source
> > topic
> > > > regex,
> > > > > >>>> does
> > > > > >>>> this mean that an assignment gives the same set of partition
> > > > indices for
> > > > > >>>> all topics
> > > > > >>>> which happen to match the regex? So, a subtopology reading from
> > A*
> > > > that
> > > > > >>>> matches
> > > > > >>>> AB and AC would give the same set of partitions to each task for
> > > > both
> > > > > >>>> topics, and
> > > > > >>>> is not able to give AB:0 to one task and AC:0 to a different
> > task.
> > > > Is this
> > > > > >>>> correct?
> > > > > >>>>
> > > > > >>>> Thanks,
> > > > > >>>> Andrew
> > > > > >>>>
> > > > > >>>>> On 23 Jul 2024, at 16:16, Bruno Cadonna <cado...@apache.org>
> > > > wrote:
> > > > > >>>>>
> > > > > >>>>> Hi Lianet,
> > > > > >>>>>
> > > > > >>>>> Thanks for the review!
> > > > > >>>>>
> > > > > >>>>> Here my answers:
> > > > > >>>>>
> > > > > >>>>> LM1. Is your question whether we need to send a full heartbeat
> > > > each time
> > > > > >>>> the member re-joins the group even if the information in the RPC
> > > > did not
> > > > > >>>> change since the last heartbeat?
> > > > > >>>>>
> > > > > >>>>> LM2. Is the reason for sending the instance ID each time that a
> > > > member
> > > > > >>>> could shutdown, change the instance ID and then start and
> > heartbeat
> > > > again,
> > > > > >>>> but the group coordinator would never notice that the instance
> > ID
> > > > changed?
> > > > > >>>>>
> > > > > >>>>> LM3. I see your point. I am wondering whether this additional
> > > > > >>>> information is worth the dependency between the group types. To
> > > > return
> > > > > >>>> INVALID_GROUP_TYPE, the group coordinator needs to know that a
> > > > group ID
> > > > > >>>> exists with a different group type. With a group coordinator as
> > we
> > > > have it
> > > > > >>>> now in Apache Kafka that manages all group types, that is not a
> > big
> > > > deal,
> > > > > >>>> but imagine if we (or some implementation of the Apache Kafka
> > > > protocol)
> > > > > >>>> decide to have a separate group coordinator for each group type.
> > > > > >>>>>
> > > > > >>>>> LM4. Using INVALID_GROUP_ID if the group ID is empty makes
> > sense
> > > > to me.
> > > > > >>>> I going to change that.
> > > > > >>>>>
> > > > > >>>>> LM5. I think there is a dependency from the
> > StreamsGroupInitialize
> > > > RPC
> > > > > >>>> to the heartbeat. The group must exist when the initialize RPC
> > is
> > > > received
> > > > > >>>> by the group coordinator. The group is created by the heartbeat
> > > > RPC. I
> > > > > >>>> would be in favor of making the initialize RPC independent from
> > the
> > > > > >>>> heartbeat RPC. That would allow to initialize a streams group
> > > > explicitly
> > > > > >>>> with an admin tool.
> > > > > >>>>>
> > > > > >>>>> LM6. I think it affects streams and streams should behave as
> > the
> > > > > >>>> consumer group.
> > > > > >>>>>
> > > > > >>>>> LM7. Good point that we will consider.
> > > > > >>>>>
> > > > > >>>>> LM8. Fixed! Thanks!
> > > > > >>>>>
> > > > > >>>>>
> > > > > >>>>> Best,
> > > > > >>>>> Bruno
> > > > > >>>>>
> > > > > >>>>>
> > > > > >>>>>
> > > > > >>>>>
> > > > > >>>>> On 7/19/24 9:53 PM, Lianet M. wrote:
> > > > > >>>>>> Hi Lucas/Bruno, thanks for the great KIP! First comments:
> > > > > >>>>>> LM1. Related to where the KIP says:  *“Group ID, member ID,
> > > > member epoch
> > > > > >>>>>> are sent with each heartbeat request. Any other information
> > that
> > > > has not
> > > > > >>>>>> changed since the last heartbeat can be omitted.”. *I expect
> > all
> > > > the
> > > > > >>>> other
> > > > > >>>>>> info also needs to be sent whenever a full heartbeat is
> > required
> > > > (even
> > > > > >>>> if
> > > > > >>>>>> it didn’t change from the last heartbeat), ex. on fencing
> > > > scenarios,
> > > > > >>>>>> correct?
> > > > > >>>>>> LM2. For consumer groups we always send the groupInstanceId
> > (if
> > > > any) as
> > > > > >>>>>> part of every heartbeat, along with memberId, epoch and
> > groupId.
> > > > Should
> > > > > >>>> we
> > > > > >>>>>> consider that too here?
> > > > > >>>>>> LM3. We’re proposing returning a GROUP_ID_NOT_FOUND error in
> > > > response to
> > > > > >>>>>> the stream-specific RPCs if the groupId is associated with a
> > > > group type
> > > > > >>>>>> that is not streams (ie. consumer group or share group). I
> > wonder
> > > > if at
> > > > > >>>>>> this point, where we're getting several new group types added,
> > > > each with
> > > > > >>>>>> RPCs that are supposed to include groupId of a certain type,
> > we
> > > > should
> > > > > >>>> be
> > > > > >>>>>> more explicit about this situation. Maybe a kind of
> > > > INVALID_GROUP_TYPE
> > > > > >>>>>> (group exists but not with a valid type for this RPC) vs a
> > > > > >>>>>> GROUP_ID_NOT_FOUND (group does not exist).  Those errors
> > would be
> > > > > >>>>>> consistently used across consumer, share, and streams RPCs
> > > > whenever the
> > > > > >>>>>> group id is not of the expected type.
> > > > > >>>>>> This is truly not specific to this KIP, and should be
> > addressed
> > > > with all
> > > > > >>>>>> group types and their RPCs in mind. I just wanted to bring
> > out my
> > > > > >>>> concern
> > > > > >>>>>> and get thoughts around it.
> > > > > >>>>>> LM4. On a related note, StreamsGroupDescribe returns
> > > > INVALID_REQUEST if
> > > > > >>>>>> groupId is empty. There is already an INVALID_GROUP_ID error,
> > > > that seems
> > > > > >>>>>> more specific to this situation. Error handling of specific
> > > > errors would
> > > > > >>>>>> definitely be easier than having to deal with a generic
> > > > INVALID_REQUEST
> > > > > >>>>>> (and probably its custom message). I know that for KIP-848 we
> > have
> > > > > >>>>>> INVALID_REQUEST for similar situations, so if ever we take
> > down
> > > > this
> > > > > >>>> path
> > > > > >>>>>> we should review it there too for consistency. Thoughts?
> > > > > >>>>>> LM5. The dependency between the StreamsGroupHeartbeat RPC and
> > the
> > > > > >>>>>> StreamsGroupInitialize RPC is one-way only right? HB requires
> > a
> > > > previous
> > > > > >>>>>> StreamsGroupInitialize request, but StreamsGroupInitialize
> > > > processing is
> > > > > >>>>>> totally independent of heartbeats (and could perfectly be
> > > > processed
> > > > > >>>> without
> > > > > >>>>>> a previous HB, even though the client implementation we’re
> > > > proposing
> > > > > >>>> won’t
> > > > > >>>>>> go down that path). Is my understanding correct? Just to
> > double
> > > > check,
> > > > > >>>>>> seems sensible like that at the protocol level.
> > > > > >>>>>> LM6. With KIP-848, there is an important improvement that
> > brings a
> > > > > >>>>>> difference in behaviour around the static membership: with the
> > > > classic
> > > > > >>>>>> protocol, if a static member joins with a group instance
> > already
> > > > in
> > > > > >>>> use, it
> > > > > >>>>>> makes the initial member fail with a FENCED_INSTANCED_ID
> > > > exception, vs.
> > > > > >>>>>> with the new consumer group protocol, the second member
> > trying to
> > > > join
> > > > > >>>>>> fails with an UNRELEASED_INSTANCE_ID. Does this change need
> > to be
> > > > > >>>>>> considered in any way for the streams app? (I'm not familiar
> > with
> > > > KS
> > > > > >>>> yet,
> > > > > >>>>>> but thought it was worth asking. If it doesn't affect in any
> > way,
> > > > still
> > > > > >>>>>> maybe helpful to call it out on a section for static
> > membership)
> > > > > >>>>>> LM7. Regarding the admin tool to manage streams groups. We can
> > > > discuss
> > > > > >>>>>> whether to have it here or separately, but I think we should
> > aim
> > > > for
> > > > > >>>> some
> > > > > >>>>>> basic admin capabilities from the start, mainly because I
> > believe
> > > > it
> > > > > >>>> will
> > > > > >>>>>> be very helpful/needed in practice during the impl of the KIP.
> > > > From
> > > > > >>>>>> experience with KIP-848, we felt a bit blindfolded in the
> > initial
> > > > phase
> > > > > >>>>>> where we still didn't have kafka-consumer-groups dealing with
> > the
> > > > new
> > > > > >>>>>> groups (and then it was very helpful and used when we were
> > able to
> > > > > >>>> easily
> > > > > >>>>>> inspect them from the console)
> > > > > >>>>>> LM8. nit: the links the KIP-848 are not quite right (pointing
> > to
> > > > an
> > > > > >>>>>> unrelated “Future work section” at the end of KIP-848)
> > > > > >>>>>> Thanks!
> > > > > >>>>>> Lianet
> > > > > >>>>>> On Fri, Jul 19, 2024 at 11:13 AM Lucas Brutschy
> > > > > >>>>>> <lbruts...@confluent.io.invalid> wrote:
> > > > > >>>>>>> Hi Andrew,
> > > > > >>>>>>>
> > > > > >>>>>>> AS2: I added a note for now. If others feel strongly about
> > it,
> > > > we can
> > > > > >>>>>>> still add more administrative tools to the KIP - it should
> > not
> > > > change
> > > > > >>>>>>> the overall story significantly.
> > > > > >>>>>>>
> > > > > >>>>>>> AS8: "streams.group.assignor.name" sounds good to me to
> > > > distinguish
> > > > > >>>>>>> the config from class names. Not sure if I like the
> > "default".
> > > > To be
> > > > > >>>>>>> consistent, we'd then have to call it
> > > > > >>>>>>> `group.streams.default.session.timeout.ms` as well. I only
> > > > added the
> > > > > >>>>>>> `.name` on both broker and group level for now.
> > > > > >>>>>>>
> > > > > >>>>>>> AS10: Ah, I misread your comment, now I know what you meant.
> > Good
> > > > > >>>>>>> point, fixed (by Bruno).
> > > > > >>>>>>>
> > > > > >>>>>>> Cheers,
> > > > > >>>>>>> Lucas
> > > > > >>>>>>>
> > > > > >>>>>>> On Fri, Jul 19, 2024 at 4:44 PM Andrew Schofield
> > > > > >>>>>>> <andrew_schofi...@live.com> wrote:
> > > > > >>>>>>>>
> > > > > >>>>>>>> Hi Lucas,
> > > > > >>>>>>>> I see that I hit send too quickly. One more comment:
> > > > > >>>>>>>>
> > > > > >>>>>>>> AS2: I think stating that there will be a
> > > > `kafka-streams-group.sh` in
> > > > > >>>> a
> > > > > >>>>>>>> future KIP is fine to keep this KIP focused. Personally, I
> > would
> > > > > >>>> probably
> > > > > >>>>>>>> put all of the gory details in this KIP, but then it’s not
> > my
> > > > KIP. A
> > > > > >>>>>>> future
> > > > > >>>>>>>> pointer is fine too.
> > > > > >>>>>>>>
> > > > > >>>>>>>> Thanks,
> > > > > >>>>>>>> Andrew
> > > > > >>>>>>>>
> > > > > >>>>>>>>
> > > > > >>>>>>>>> On 19 Jul 2024, at 13:46, Lucas Brutschy <
> > > > lbruts...@confluent.io
> > > > > >>>> .INVALID>
> > > > > >>>>>>> wrote:
> > > > > >>>>>>>>>
> > > > > >>>>>>>>> Hi Andrew,
> > > > > >>>>>>>>>
> > > > > >>>>>>>>> thanks for getting the discussion going! Here are my
> > responses.
> > > > > >>>>>>>>>
> > > > > >>>>>>>>> AS1: Good point, done.
> > > > > >>>>>>>>>
> > > > > >>>>>>>>> AS2: We were planning to add more administrative tools to
> > the
> > > > > >>>>>>>>> interface in a follow-up KIP, to not make this KIP too
> > large.
> > > > If
> > > > > >>>>>>>>> people think that it would help to understand the overall
> > > > picture if
> > > > > >>>>>>>>> we already add something like `kafka-streams-groups.sh`, we
> > > > will do
> > > > > >>>>>>>>> that. I also agree that we should address how this relates
> > to
> > > > > >>>>>>>>> KIP-1043, we'll add it.
> > > > > >>>>>>>>>
> > > > > >>>>>>>>> AS3: Good idea, that's more consistent with `assigning` and
> > > > > >>>>>>> `reconciling` etc.
> > > > > >>>>>>>>>
> > > > > >>>>>>>>> AS4: Thanks, Fixed.
> > > > > >>>>>>>>>
> > > > > >>>>>>>>> AS5: Good catch. This was supposed to mean that we require
> > > > CREATE on
> > > > > >>>>>>>>> cluster or CREATE on all topics, not both. Fixed.
> > > > > >>>>>>>>>
> > > > > >>>>>>>>> AS6: Thanks, Fixed.
> > > > > >>>>>>>>>
> > > > > >>>>>>>>> AS7. Thanks, Fixed.
> > > > > >>>>>>>>>
> > > > > >>>>>>>>> AS8: I think this works a bit different in this KIP than in
> > > > consumer
> > > > > >>>>>>>>> groups. KIP-848 lets the members vote for a preferred
> > > > assignor, and
> > > > > >>>>>>>>> the broker-side assignor is picked by majority vote. The
> > > > > >>>>>>>>> `group.consumer.assignors` specifies the list of assignors
> > > > that are
> > > > > >>>>>>>>> supported on the broker, and is configurable because the
> > > > interface is
> > > > > >>>>>>>>> pluggable. In this KIP, the task assignor is not voted on
> > by
> > > > members
> > > > > >>>>>>>>> but configured on the broker-side.
> > `group.streams.assignor` is
> > > > used
> > > > > >>>>>>>>> for this, and uses a specific name. If we'll make the task
> > > > assignor
> > > > > >>>>>>>>> pluggable on the broker-side, we'd introduce a separate
> > config
> > > > > >>>>>>>>> `group.streams.assignors`, which would indeed be a list of
> > > > class
> > > > > >>>>>>>>> names. I think there is no conflict here, the two
> > > > configurations
> > > > > >>>> serve
> > > > > >>>>>>>>> different purposes.  The only gripe I'd have here is
> > naming as
> > > > > >>>>>>>>> `group.streams.assignor` and `group.streams.assignors`
> > would
> > > > be a bit
> > > > > >>>>>>>>> similar, but I cannot really think of a better name for
> > > > > >>>>>>>>> `group.streams.assignor`, so I'd probably rather introduce
> > > > > >>>>>>>>> `group.streams.assignors`  as
> > > > `group.streams.possible_assignors`  or
> > > > > >>>>>>>>> something like that.
> > > > > >>>>>>>>>
> > > > > >>>>>>>>> AS9: I added explanations for the various record types.
> > Apart
> > > > from
> > > > > >>>> the
> > > > > >>>>>>>>> new topology record, and the partition metadata (which is
> > > > based on
> > > > > >>>> the
> > > > > >>>>>>>>> topology and can only be created once we have a topology
> > > > initialized)
> > > > > >>>>>>>>> the lifecycle for the records is basically identical as in
> > > > KIP-848.
> > > > > >>>>>>>>>
> > > > > >>>>>>>>> AS10: In the consumer offset topic, the version in the key
> > is
> > > > used to
> > > > > >>>>>>>>> differentiate different schema types with the same
> > content. So
> > > > the
> > > > > >>>>>>>>> keys are not versioned, but the version field is "abused"
> > as a
> > > > type
> > > > > >>>>>>>>> tag. This is the same in KIP-848, we followed it for
> > > > consistency.
> > > > > >>>>>>>>>
> > > > > >>>>>>>>> Cheers,
> > > > > >>>>>>>>> Lucas
> > > > > >>>>>>>>>
> > > > > >>>>>>>>>
> > > > > >>>>>>>>> On Thu, Jul 18, 2024 at 1:27 PM Andrew Schofield
> > > > > >>>>>>>>> <andrew_schofi...@live.com> wrote:
> > > > > >>>>>>>>>>
> > > > > >>>>>>>>>> Hi Lucas and Bruno,
> > > > > >>>>>>>>>>
> > > > > >>>>>>>>>> Thanks for the great KIP.
> > > > > >>>>>>>>>>
> > > > > >>>>>>>>>> I've read through the document and have some initial
> > comments.
> > > > > >>>>>>>>>>
> > > > > >>>>>>>>>> AS1: I suppose that there is a new
> > > > o.a.k.common.GroupType.STREAMS
> > > > > >>>>>>> enumeration
> > > > > >>>>>>>>>> constant. This is a change to the public interface and
> > should
> > > > be
> > > > > >>>>>>> called out.
> > > > > >>>>>>>>>>
> > > > > >>>>>>>>>> AS2: Since streams groups are no longer consumer groups,
> > how
> > > > does
> > > > > >>>> the
> > > > > >>>>>>> user
> > > > > >>>>>>>>>> manipulate them, observe lag and so on? Will you add
> > > > > >>>>>>> `kafka-streams-groups.sh`
> > > > > >>>>>>>>>> or extend `kafka-streams-application-reset.sh`? Of course,
> > > > KIP-1043
> > > > > >>>>>>> can easily
> > > > > >>>>>>>>>> be extended to support streams groups, but that only lets
> > the
> > > > user
> > > > > >>>>>>> see the
> > > > > >>>>>>>>>> groups, not manipulate them.
> > > > > >>>>>>>>>>
> > > > > >>>>>>>>>> AS3: I wonder whether the streams group state of
> > > > UNINITIALIZED would
> > > > > >>>>>>> be
> > > > > >>>>>>>>>> better expressed as INITIALIZING.
> > > > > >>>>>>>>>>
> > > > > >>>>>>>>>> AS4: In StreamsGroupInitializeRequest,
> > > > Topology[].SourceTopicRegex
> > > > > >>>>>>> should
> > > > > >>>>>>>>>> be nullable.
> > > > > >>>>>>>>>>
> > > > > >>>>>>>>>> AS5: Why does StreamsGroupInitialize require CREATE
> > > > permission on
> > > > > >>>> the
> > > > > >>>>>>>>>> cluster resource? I imagine that this is one of the ways
> > that
> > > > the
> > > > > >>>>>>> request might
> > > > > >>>>>>>>>> be granted permission to create the StateChangelogTopics
> > and
> > > > > >>>>>>>>>> RepartitionSourceTopics, but if it is granted permission
> > to
> > > > create
> > > > > >>>>>>> those topics
> > > > > >>>>>>>>>> with specific ACLs, would CREATE on the cluster resource
> > > > still be
> > > > > >>>>>>> required?
> > > > > >>>>>>>>>>
> > > > > >>>>>>>>>> AS6: StreamsGroupInitialize can also fail with
> > > > > >>>>>>> TOPIC_AUTHORIZATION_FAILED
> > > > > >>>>>>>>>> and (subject to AS5) CLUSTER_AUTHORIZATION_FAILED.
> > > > > >>>>>>>>>>
> > > > > >>>>>>>>>> AS7: A tiny nit. You've used TopologyID (capitals) in
> > > > > >>>>>>> StreamsGroupHeartbeatRequest
> > > > > >>>>>>>>>> and a few others, but in all other cases the fields which
> > are
> > > > ids
> > > > > >>>> are
> > > > > >>>>>>> spelled Id.
> > > > > >>>>>>>>>> I suggest TopologyId.
> > > > > >>>>>>>>>>
> > > > > >>>>>>>>>> Also, "interal" is probably meant to be "interval”.
> > > > > >>>>>>>>>>
> > > > > >>>>>>>>>> AS8: For consumer groups, the `group.consumer.assignors`
> > > > > >>>>>>> configuration is a
> > > > > >>>>>>>>>> list of class names. The assignors do have names too, but
> > the
> > > > > >>>>>>> configuration which
> > > > > >>>>>>>>>> enables them is in terms of class names. I wonder whether
> > the
> > > > > >>>> broker’s
> > > > > >>>>>>>>>> group.streams.assignor could actually be
> > > > `group.streams.assignors`
> > > > > >>>>>>> and specified
> > > > > >>>>>>>>>> as a list of the class names of the supplied assignors. I
> > know
> > > > > >>>> you're
> > > > > >>>>>>> not supporting
> > > > > >>>>>>>>>> other assignors yet, but when you do, I expect you would
> > > > prefer to
> > > > > >>>>>>> have used class
> > > > > >>>>>>>>>> names from the start.
> > > > > >>>>>>>>>>
> > > > > >>>>>>>>>> The use of assignor names in the other places looks good
> > to
> > > > me.
> > > > > >>>>>>>>>>
> > > > > >>>>>>>>>> AS9: I'd find it really helpful to have a bit of a
> > > > description about
> > > > > >>>>>>> the purpose and
> > > > > >>>>>>>>>> lifecycle of the 9 record types you've introduced on the
> > > > > >>>>>>> __consumer_offsets topic.
> > > > > >>>>>>>>>> I did a cursory review but without really understanding
> > what's
> > > > > >>>>>>> written when,
> > > > > >>>>>>>>>> I can't do a thorough job of reviewing.
> > > > > >>>>>>>>>>
> > > > > >>>>>>>>>> AS10: In the definitions of the record keys, such as
> > > > > >>>>>>>>>> StreamsGroupCurrentMemberAssignmentKey, the versions of
> > the
> > > > fields
> > > > > >>>>>>> must
> > > > > >>>>>>>>>> match the versions of the types.
> > > > > >>>>>>>>>>
> > > > > >>>>>>>>>> Thanks,
> > > > > >>>>>>>>>> Andrew
> > > > > >>>>>>>>>>
> > > > > >>>>>>>>>>> On 12 Jul 2024, at 09:04, Lucas Brutschy <
> > > > lbruts...@confluent.io
> > > > > >>>> .INVALID>
> > > > > >>>>>>> wrote:
> > > > > >>>>>>>>>>>
> > > > > >>>>>>>>>>> Hi all,
> > > > > >>>>>>>>>>>
> > > > > >>>>>>>>>>> I would like to start a discussion thread on KIP-1071:
> > > > Streams
> > > > > >>>>>>>>>>> Rebalance Protocol. With this KIP, we aim to bring the
> > > > principles
> > > > > >>>>>>> laid
> > > > > >>>>>>>>>>> down by KIP-848 to Kafka Streams, to make rebalances more
> > > > reliable
> > > > > >>>>>>> and
> > > > > >>>>>>>>>>> scalable, and make Kafka Streams overall easier to
> > deploy and
> > > > > >>>>>>> operate.
> > > > > >>>>>>>>>>> The KIP proposed moving the assignment logic to the
> > broker,
> > > > and
> > > > > >>>>>>>>>>> introducing a dedicated group type and dedicated RPCs for
> > > > Kafka
> > > > > >>>>>>>>>>> Streams.
> > > > > >>>>>>>>>>>
> > > > > >>>>>>>>>>> The KIP is here:
> > > > > >>>>>>>>>>>
> > > > > >>>>>>>
> > > > > >>>>
> > > >
> > https://cwiki.apache.org/confluence/display/KAFKA/KIP-1071%3A+Streams+Rebalance+Protocol
> > > > > >>>>>>>>>>>
> > > > > >>>>>>>>>>> This is joint work with Bruno Cadonna.
> > > > > >>>>>>>>>>>
> > > > > >>>>>>>>>>> Please take a look and let us know what you think.
> > > > > >>>>>>>>>>>
> > > > > >>>>>>>>>>> Best,
> > > > > >>>>>>>>>>> Lucas
> > > > >
> > > > >
> > > >
> >

Reply via email to