Hi Lucas,

NT4.
Sounds good, although should it take the maximum offsets? Wouldn't it be
more correct to take the *most recent* offsets? (i.e. the offsets from the
most recently received heartbeat)
My thinking is that it might be possible (albeit exceptionally rare) for
the on-disk offsets to revert to a previous number, and taking the max
would incorrectly assume the older offsets are correct.
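
To make the difference concrete, here is a minimal sketch of the two
resolution strategies. Everything in it is hypothetical and only for
illustration (the `OffsetReport` class, the method names, and the idea that
the coordinator keeps a receive timestamp per report); it is not how the KIP
or the assignor actually models this.

```java
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;

public class OffsetConflictSketch {

    /** Hypothetical report: the offset one client sent for a dormant task. */
    static final class OffsetReport {
        final String taskId;
        final long offset;
        final long receivedAtMs; // assumed: when the heartbeat was received

        OffsetReport(String taskId, long offset, long receivedAtMs) {
            this.taskId = taskId;
            this.offset = offset;
            this.receivedAtMs = receivedAtMs;
        }
    }

    /** Resolve a conflict by taking the largest reported offset (non-empty list assumed). */
    static long resolveByMax(List<OffsetReport> reports) {
        return reports.stream().mapToLong(r -> r.offset).max().getAsLong();
    }

    /** Resolve a conflict by trusting the most recently received report (non-empty list assumed). */
    static long resolveByMostRecent(List<OffsetReport> reports) {
        return reports.stream()
                .max(Comparator.comparingLong((OffsetReport r) -> r.receivedAtMs))
                .get()
                .offset;
    }

    public static void main(String[] args) {
        // Same task reported twice: an older heartbeat said 120, a newer one
        // says 100 because the on-disk offset reverted in between.
        List<OffsetReport> reports = Arrays.asList(
                new OffsetReport("0_1", 120L, 1_000L),
                new OffsetReport("0_1", 100L, 2_000L));

        System.out.println("max: " + resolveByMax(reports));
        System.out.println("most recent: " + resolveByMostRecent(reports));
    }
}
```

In the revert case above, taking the max keeps the stale 120, while taking
the most recent report yields 100.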

Regards,
Nick

On Mon, 19 Aug 2024 at 15:00, Lucas Brutschy <lbruts...@confluent.io.invalid>
wrote:

> Hi Nick,
>
> NT4: As discussed, we will still require locking in the new protocol
> to avoid concurrent read/write access on the checkpoint file, at least
> as long as KIP-1035 hasn't landed. However, as you correctly pointed
> out, the assignor will have to accept offsets for overlapping sets of
> dormant tasks. I updated the KIP to make this explicit. If the
> corresponding offset information for one task conflicts between
> clients (which can happen), the conflict is resolved by taking the
> maximum of the offsets.
>
> Cheers,
> Lucas
>
> On Fri, Aug 16, 2024 at 7:14 PM Guozhang Wang
> <guozhang.wang...@gmail.com> wrote:
> >
> > Hello Lucas,
> >
> > Thanks for the great KIP. I've read it through and it looks good to
> > me. As we've discussed, many of my thoughts are outside the scope
> > of this very well scoped and defined KIP, so I will omit them for now.
> >
> > The only one I had related to this KIP is about topology updating. As I
> > understand it, the motivation of the proposal is this: because of the
> > timing of the RPCs, a group forming a (new) generation may not accept
> > all of the members that are joining, so the group's topology ID may not
> > reflect the "actual" most recent topology if some zombie members holding
> > an old topology form a group generation quickly enough. That would
> > effectively mean that zombie members block other, real members from
> > getting tasks assigned. On the other hand, as you've already mentioned
> > in the doc, requiring some sort of ID ordering and pushing that burden
> > onto the user would be too much, increasing the risk of human error in
> > operations.
> >
> > I'm wondering if, instead of trying to be smart programmatically, we just
> > let the protocol act dumbly (details below). The main reasons I had
> > in mind are:
> >
> > 1) Upon topology changes, some tasks may no longer exist in the new
> > topology, so still letting them execute on the clients which do not
> > yet have the new topology would waste resources.
> >
> > 2) As we discussed, trying to act smart introduces more complexities
> > in the coordinator that tries to balance different assignment goals
> > between stickiness, balance, and now topology mis-matches between
> > clients.
> >
> > 3) Scenarios in which mismatching topologies may be observed within a
> > group generation:
> >    a. Zombie / old clients that do not have the new topology, and will
> > never have.
> >    b. During a rolling bounce upgrade, where not-yet-bounced clients
> > would not yet have the new topology.
> >    c. Let's assume we would never have scenarios where users want
> > to intentionally have a subset of clients within a group running a
> > partial / subset of the full sub-topologies, since such cases can well
> > be covered by a custom assignor that takes those considerations into
> > account by never assigning some tasks to some clients etc. That means the
> > only scenarios we would need to consider are a) and b).
> >
> > For b), I think it's actually okay to temporarily block the progress
> > of the group until everyone is bounced with the updated topology. As
> > for a), originally I thought having one or a few clients blocking the
> > whole group would be a big problem, but now that I think about it more,
> > I feel that from an operations point of view, just letting the app be
> > blocked with an informational log entry to quickly pin down the zombie
> > clients may actually be acceptable. All in all, it makes the code simpler
> > by not trying to abstract issue scenario a) away from the users (or
> > operators) but letting them know asap.
> >
> > ----------
> >
> > Other than that, everything else looks good to me.
> >
> >
> > Guozhang
> >
> >
> > On Fri, Aug 16, 2024 at 7:38 AM Nick Telford <nick.telf...@gmail.com>
> > wrote:
> > >
> > > Hi Lucas,
> > >
> > > NT4.
> > > Given that the new assignment procedure guarantees that a Task has been
> > > closed before it is assigned to a different client, I don't think there
> > > should be a problem with concurrent access? I don't think we should
> > > worry too much about 1035 here, as it's orthogonal to 1071. I don't
> > > think that 1035 *requires* the locking, and indeed once 1071 is the only
> > > assignment mechanism, we should be able to do away with the locking
> > > completely (I think).
> > >
> > > Anyway, given your point about it not being possible to guarantee
> > > disjoint sets, does it make sense to require clients to continue to
> > > supply the lags for only a subset of the dormant Tasks on-disk? Wouldn't
> > > it be simpler to just have them supply everything, since the assignor
> > > has to handle overlapping sets anyway?
> > >
> > > Cheers,
> > > Nick
> > >
> > > On Fri, 16 Aug 2024 at 13:51, Lucas Brutschy <lbruts...@confluent.io.invalid>
> > > wrote:
> > >
> > > > Hi Nick,
> > > >
> > > > NT4. I think it will be hard anyway to ensure that the assignor
> > > > always gets disjoint sets (there is no synchronized rebalance point
> > > > anymore, so locks wouldn't prevent two clients reporting the same
> > > > dormant task). So I think we'll have to lift this restriction. I was
> > > > thinking more that locking is required to prevent concurrent access.
> > > > In particular, I was expecting that the lock will avoid two threads
> > > > opening the same RocksDB in KIP-1035. Wouldn't this cause problems?
> > > >
> > > > Cheers,
> > > > Lucas
> > > >
> > > > On Fri, Aug 16, 2024 at 11:34 AM Nick Telford <nick.telf...@gmail.com>
> > > > wrote:
> > > > >
> > > > > Hi Lucas,
> > > > >
> > > > > NT4.
> > > > > The reason I mentioned this was because, while implementing 1035, I
> > > > > stumbled across a problem: initially I had changed it so that
> > > > > threads always reported the lag for *all* dormant Tasks on-disk, even
> > > > > if it meant multiple threads reporting lag for the same Tasks. I found
> > > > > that this didn't work, apparently because the assignor assumes that
> > > > > multiple threads on the same instance always report disjoint sets.
> > > > >
> > > > > From reading through 1071, it sounded like this assumption is no
> > > > > longer being made by the assignor, and that the processId field would
> > > > > allow the assignor to understand when multiple clients reporting lag
> > > > > for the same Tasks are on the same instance. This would enable us to
> > > > > do away with the locking when reporting lag, and just have threads
> > > > > report the lag for every Task on-disk, even if other threads are
> > > > > reporting lag for the same Tasks.
> > > > >
> > > > > But it sounds like this is not correct, and that the new assignor
> > > > > will make the same assumptions as the old one?
> > > > >
> > > > > Regards,
> > > > > Nick
> > > > >
> > > > > On Fri, 16 Aug 2024 at 10:17, Lucas Brutschy <lbruts...@confluent.io.invalid>
> > > > > wrote:
> > > > >
> > > > > > Hi Nick!
> > > > > >
> > > > > > Thanks for getting involved in the discussion.
> > > > > >
> > > > > > NT1. We are always referring to offsets in the changelog topics
> > > > > > here. I tried to make it more consistent. But in the schemas and
> > > > > > API, I find "task changelog end offset" a bit lengthy, so we use
> > > > > > "task offset" and "task end offset" for short. We could change it,
> > > > > > if people think this is confusing.
> > > > > >
> > > > > > NT2. You are right. The confusing part is that the current
> > > > > > streams config is called `max.warmup.replicas`, but in the new
> > > > > > protocol, we are bounding the group-level parameter using
> > > > > > `group.streams.max.warmup.replicas`. If we wanted to keep
> > > > > > `group.streams.max.warmup.replicas` for the config name on the
> > > > > > group-level, we'd have to bound it using
> > > > > > `group.streams.max.max.warmup.replicas`. I prefer not doing
> > > > > > this, but open to suggestions.
> > > > > >
> > > > > > NT3. You are right, we do not need to make it this restrictive. I
> > > > > > think the main problem with having 10,000 warm-up replicas would
> > > > > > be that it slows down the assignment inside the broker - once we are
> > > > > > closer to a production-ready implementation, we may have better
> > > > > > numbers for this and may revisit these defaults. I'll set the max to
> > > > > > 100 for now, but it would be good to hear what values people
> > > > > > typically use in their production workloads.
> > > > > >
> > > > > > NT4. We will actually only report the offsets if we manage to
> > > > > > acquire the lock. I tried to make this more precise. I suppose also
> > > > > > with KIP-1035, we'd require the lock to read the offset?
> > > > > >
> > > > > > Cheers,
> > > > > > Lucas
> > > > > >
> > > > > > On Thu, Aug 15, 2024 at 8:40 PM Nick Telford <nick.telf...@gmail.com>
> > > > > > wrote:
> > > > > > >
> > > > > > > Hi everyone,
> > > > > > >
> > > > > > > Looks really promising, and I can see this resolving several
> > > > > > > issues I've noticed. I particularly like the choice to use a String
> > > > > > > for Subtopology ID, as it will (eventually) lead to a better
> > > > > > > solution to KIP-816.
> > > > > > >
> > > > > > > I noticed a few typos in the KIP that I thought I'd mention:
> > > > > > >
> > > > > > > NT1.
> > > > > > > In several places you refer to "task changelog end offsets",
> > > > > > > while in others, you call it "task end offsets". Which is it?
> > > > > > >
> > > > > > > NT2.
> > > > > > > Under "Group Configurations", you included
> > > > > > > "group.streams.max.warmup.replicas", but I think you meant
> > > > > > > "group.streams.num.warmup.replicas"?
> > > > > > >
> > > > > > > NT3.
> > > > > > > Not a typo, but a suggestion: it makes sense to set the default
> > > > > > > for "group.streams.num.warmup.replicas" to 2, for compatibility
> > > > > > > with the existing defaults, but why set the default for
> > > > > > > "group.streams.max.warmup.replicas" to only 4? That seems
> > > > > > > extremely restrictive. These "max" configs are typically used to
> > > > > > > prevent a subset of users causing problems on the shared broker
> > > > > > > cluster - what's the reason to set such a restrictive value for
> > > > > > > max warmup replicas? If I had 10,000 warmup replicas, would it
> > > > > > > cause a noticeable problem on the brokers?
> > > > > > >
> > > > > > > NT4.
> > > > > > > It's implied that clients send the changelog offsets for *all*
> > > > > > > dormant stateful Tasks, but the current behaviour is that clients
> > > > > > > will only send the changelog offsets for the stateful Tasks that
> > > > > > > they are able to lock on-disk. Since this is a change in behaviour,
> > > > > > > perhaps this should be called out explicitly?
> > > > > > >
> > > > > > > Regards,
> > > > > > > Nick
> > > > > > >
> > > > > > > On Thu, 15 Aug 2024 at 10:55, Lucas Brutschy <lbruts...@confluent.io.invalid>
> > > > > > > wrote:
> > > > > > >
> > > > > > > > Hi Andrew,
> > > > > > > >
> > > > > > > > thanks for the comment.
> > > > > > > >
> > > > > > > > AS12: I clarified the command-line interface. It's supposed to be
> > > > > > > > used with --reset-offsets and --delete-offsets. I removed --topic.
> > > > > > > >
> > > > > > > > AS13: Yes, it's --delete. I clarified the command-line interface.
> > > > > > > >
> > > > > > > > Cheers,
> > > > > > > > Lucas
> > > > > > > >
> > > > > > > > On Tue, Aug 13, 2024 at 4:14 PM Andrew Schofield
> > > > > > > > <andrew_schofi...@live.com> wrote:
> > > > > > > > >
> > > > > > > > > Hi Lucas,
> > > > > > > > > Thanks for the KIP update.
> > > > > > > > >
> > > > > > > > > I think that `kafka-streams-groups.sh` looks like a good
> > > > > > > > > equivalent to the tools for the other types of groups.
> > > > > > > > >
> > > > > > > > > AS12: In kafka-streams-groups.sh, the description for the
> > > > > > > > > --input-topics option seems insufficient. Why is an input topic
> > > > > > > > > specified with this option different from a topic specified with
> > > > > > > > > --topic? Why is it --input-topics rather than --input-topic? Which
> > > > > > > > > action of this tool does this option apply to?
> > > > > > > > >
> > > > > > > > > AS13: Similarly, for --internal-topics, which action of the tool
> > > > > > > > > does it apply to? I suppose it’s --delete, but it’s not clear to me.
> > > > > > > > >
> > > > > > > > > Thanks,
> > > > > > > > > Andrew
> > > > > > > > >
> > > > > > > > > > On 11 Aug 2024, at 12:10, Lucas Brutschy <lbruts...@confluent.io.INVALID>
> > > > > > > > > > wrote:
> > > > > > > > > >
> > > > > > > > > > Hi Andrew/Lianet,
> > > > > > > > > >
> > > > > > > > > > > I have added an administrative command-line tool (replacing
> > > > > > > > > > > `kafka-streams-application-reset`) and extensions of the Admin
> > > > > > > > > > > API for listing, deleting, describing groups and listing,
> > > > > > > > > > > altering and deleting offsets for streams groups. No new RPCs
> > > > > > > > > > > have to be added; however, we duplicate some of the API in the
> > > > > > > > > > > admin client that exists for consumer groups. It seems cleaner
> > > > > > > > > > > to me to duplicate some code/interface here, instead of using
> > > > > > > > > > > "consumer group" APIs for streams groups, or renaming existing
> > > > > > > > > > > APIs that use "consumerGroup" in the name to something more
> > > > > > > > > > > generic (which wouldn't cover share groups).
> > > > > > > > > >
> > > > > > > > > > I think for now, all comments are addressed.
> > > > > > > > > >
> > > > > > > > > > Cheers,
> > > > > > > > > > Lucas
> > > > > > > > > >
> > > > > > > > > > > On Tue, Aug 6, 2024 at 3:19 PM Lucas Brutschy <lbruts...@confluent.io>
> > > > > > > > > > > wrote:
> > > > > > > > > >>
> > > > > > > > > >> Hi Lianet and Andrew,
> > > > > > > > > >>
> > > > > > > > > > >> LM1/LM2: You are right. The idea is to omit fields exactly in
> > > > > > > > > > >> the same situations as in KIP-848. In the KIP, I stuck with how
> > > > > > > > > > >> the behavior was defined in KIP-848 (e.g. KIP-848 defined that
> > > > > > > > > > >> the instance ID will be omitted if it did not change since the
> > > > > > > > > > >> last heartbeat). But you are correct that the implementation
> > > > > > > > > > >> handles these details slightly differently. I updated the KIP to
> > > > > > > > > > >> match more closely the behavior of the KIP-848 implementation.
> > > > > > > > > >>
> > > > > > > > > > >> LM9: Yes, there are several options to do this. The idea is to
> > > > > > > > > > >> have only one client initialize the topology, not all clients.
> > > > > > > > > > >> It seems easier to understand on the protocol level (otherwise
> > > > > > > > > > >> we'd have N topology initializations racing with a
> > > > > > > > > > >> hard-to-determine winner). We also expect the payload of the
> > > > > > > > > > >> request to grow in the future and want to avoid the overhead of
> > > > > > > > > > >> having all clients sending the topology at the same time. But
> > > > > > > > > > >> initializing the group could take some time - we have to create
> > > > > > > > > > >> internal topics, and maybe a client is malfunctioning and the
> > > > > > > > > > >> initialization has to be retried. It seemed a bit confusing to
> > > > > > > > > > >> return errors to all other clients that are trying to join the
> > > > > > > > > > >> group during that time - as if there was a problem with joining
> > > > > > > > > > >> the group / the contents of the heartbeat. It seems cleaner to
> > > > > > > > > > >> me to let all clients successfully join the group and heartbeat,
> > > > > > > > > > >> but remain in an INITIALIZING state which does not yet assign
> > > > > > > > > > >> any tasks. Does that make sense to you? You are right that
> > > > > > > > > > >> returning a retriable error and having all clients retry until
> > > > > > > > > > >> the group is initialized would also work, it just doesn't model
> > > > > > > > > > >> well that "everything is going according to plan".
> > > > > > > > > > >> As for the order of the calls - yes, I think it is fine to
> > > > > > > > > > >> allow an Initialize RPC before the first heartbeat for supporting
> > > > > > > > > > >> future admin tools. I made this change throughout the KIP, thanks!
> > > > > > > > > >>
> > > > > > > > > > >> AS11: Yes, your understanding is correct. The number of tasks
> > > > > > > > > > >> for one subtopology is the maximum number of partitions in any
> > > > > > > > > > >> of the matched topics. What will happen in Kafka Streams is that
> > > > > > > > > > >> the partitions of the matched topics will effectively be merged
> > > > > > > > > > >> during stream processing, so in your example, subtopology:0
> > > > > > > > > > >> would consume from AB:0 and AC:0.
> > > > > > > > > >>
> > > > > > > > > >> Cheers,
> > > > > > > > > >> Lucas
> > > > > > > > > >>
> > > > > > > > > > >> On Fri, Aug 2, 2024 at 9:47 PM Lianet M. <liane...@gmail.com>
> > > > > > > > > > >> wrote:
> > > > > > > > > >>>
> > > > > > > > > >>> Hi Bruno, answering your questions:
> > > > > > > > > >>>
> > > > > > > > > > >>> About the full heartbeat (LM1): I just wanted to confirm that
> > > > > > > > > > >>> you'll be sending full HBs in case of errors in general. It's
> > > > > > > > > > >>> not clear from the KIP, since it referred to sending Id/epoch
> > > > > > > > > > >>> and whatever had changed since the last HB only. Sending full
> > > > > > > > > > >>> HB on error is key to ensure fresh rejoins after fencing for
> > > > > > > > > > >>> instance, and retries with all relevant info.
> > > > > > > > > >>>
> > > > > > > > > > >>> About the instanceId (LM2): The instanceId is needed on every
> > > > > > > > > > >>> HB to be able to identify a member using one that is already
> > > > > > > > > > >>> taken. On every HB, the broker uses the instance id (if any) to
> > > > > > > > > > >>> retrieve the member ID associated with it, and checks it against
> > > > > > > > > > >>> the memberId received in the HB (throwing UnreleasedInstance
> > > > > > > > > > >>> exception if needed). So similar to my previous point, just
> > > > > > > > > > >>> wanted to confirm that we are considering that here too.
> > > > > > > > > >>>
> > > > > > > > > >>> Now some other thoughts:
> > > > > > > > > >>>
> > > > > > > > > > >>> LM9: Definitely interesting imo if we can avoid the dependency
> > > > > > > > > > >>> between the StreamsGroupInitialize and the StreamsGroupHeartbeat.
> > > > > > > > > > >>> I totally get that the initial client implementation will do a
> > > > > > > > > > >>> HB first, and that's fine, but not having the flow enforced at
> > > > > > > > > > >>> the protocol level would allow for further improvement in the
> > > > > > > > > > >>> future (that initialize via admin idea you mentioned, for
> > > > > > > > > > >>> instance). Actually, I may be missing something about the HB,
> > > > > > > > > > >>> but if we are at the point where HB requires that the topology
> > > > > > > > > > >>> has been initialized, and the topology init requires the group,
> > > > > > > > > > >>> why is the heartbeat RPC the one responsible for the group
> > > > > > > > > > >>> creation? (vs. StreamsGroupInitialize creates group if needed +
> > > > > > > > > > >>> HB just fails if topology not initialized)
> > > > > > > > > >>>
> > > > > > > > > >>> Thanks!
> > > > > > > > > >>>
> > > > > > > > > >>> Lianet
> > > > > > > > > > >>> (I didn't miss your answer on my INVALID_GROUP_TYPE proposal,
> > > > > > > > > > >>> just still thinking about it in sync with the same discussion
> > > > > > > > > > >>> we're having on the KIP-1043 thread... I'll come back on that)
> > > > > > > > > >>>
> > > > > > > > > > >>> On Thu, Aug 1, 2024 at 10:55 AM Andrew Schofield <andrew_schofi...@live.com>
> > > > > > > > > > >>> wrote:
> > > > > > > > > >>>
> > > > > > > > > >>>> Hi Bruno,
> > > > > > > > > > >>>> Thanks for adding the detail on the schemas on records
> > > > > > > > > > >>>> written to __consumer_offsets.
> > > > > > > > > > >>>> I’ve reviewed them in detail and they look good to me. I
> > > > > > > > > > >>>> have one naive question.
> > > > > > > > > >>>>
> > > > > > > > > > >>>> AS11: I notice that an assignment is essentially a set of
> > > > > > > > > > >>>> partition indices for subtopologies. Since a subtopology can
> > > > > > > > > > >>>> be defined by a source topic regex, does this mean that an
> > > > > > > > > > >>>> assignment gives the same set of partition indices for all
> > > > > > > > > > >>>> topics which happen to match the regex? So, a subtopology
> > > > > > > > > > >>>> reading from A* that matches AB and AC would give the same
> > > > > > > > > > >>>> set of partitions to each task for both topics, and is not
> > > > > > > > > > >>>> able to give AB:0 to one task and AC:0 to a different task.
> > > > > > > > > > >>>> Is this correct?
> > > > > > > > > >>>>
> > > > > > > > > >>>> Thanks,
> > > > > > > > > >>>> Andrew
> > > > > > > > > >>>>
> > > > > > > > > > >>>>> On 23 Jul 2024, at 16:16, Bruno Cadonna <cado...@apache.org>
> > > > > > > > > > >>>>> wrote:
> > > > > > > > > >>>>>
> > > > > > > > > >>>>> Hi Lianet,
> > > > > > > > > >>>>>
> > > > > > > > > >>>>> Thanks for the review!
> > > > > > > > > >>>>>
> > > > > > > > > >>>>> Here my answers:
> > > > > > > > > >>>>>
> > > > > > > > > > >>>>> LM1. Is your question whether we need to send a full
> > > > > > > > > > >>>>> heartbeat each time the member re-joins the group even if the
> > > > > > > > > > >>>>> information in the RPC did not change since the last
> > > > > > > > > > >>>>> heartbeat?
> > > > > > > > > >>>>>
> > > > > > > > > > >>>>> LM2. Is the reason for sending the instance ID each time
> > > > > > > > > > >>>>> that a member could shut down, change the instance ID and
> > > > > > > > > > >>>>> then start and heartbeat again, but the group coordinator
> > > > > > > > > > >>>>> would never notice that the instance ID changed?
> > > > > > > > > >>>>>
> > > > > > > > > > >>>>> LM3. I see your point. I am wondering whether this
> > > > > > > > > > >>>>> additional information is worth the dependency between the
> > > > > > > > > > >>>>> group types. To return INVALID_GROUP_TYPE, the group
> > > > > > > > > > >>>>> coordinator needs to know that a group ID exists with a
> > > > > > > > > > >>>>> different group type. With a group coordinator as we have it
> > > > > > > > > > >>>>> now in Apache Kafka that manages all group types, that is not
> > > > > > > > > > >>>>> a big deal, but imagine if we (or some implementation of the
> > > > > > > > > > >>>>> Apache Kafka protocol) decide to have a separate group
> > > > > > > > > > >>>>> coordinator for each group type.
> > > > > > > > > >>>>>
> > > > > > > > > > >>>>> LM4. Using INVALID_GROUP_ID if the group ID is empty makes
> > > > > > > > > > >>>>> sense to me. I am going to change that.
> > > > > > > > > >>>>>
> > > > > > > > > > >>>>> LM5. I think there is a dependency from the
> > > > > > > > > > >>>>> StreamsGroupInitialize RPC to the heartbeat. The group must
> > > > > > > > > > >>>>> exist when the initialize RPC is received by the group
> > > > > > > > > > >>>>> coordinator. The group is created by the heartbeat RPC. I
> > > > > > > > > > >>>>> would be in favor of making the initialize RPC independent
> > > > > > > > > > >>>>> from the heartbeat RPC. That would allow initializing a
> > > > > > > > > > >>>>> streams group explicitly with an admin tool.
> > > > > > > > > >>>>>
> > > > > > > > > > >>>>> LM6. I think it affects streams and streams should behave
> > > > > > > > > > >>>>> as the consumer group.
> > > > > > > > > >>>>>
> > > > > > > > > >>>>> LM7. Good point that we will consider.
> > > > > > > > > >>>>>
> > > > > > > > > >>>>> LM8. Fixed! Thanks!
> > > > > > > > > >>>>>
> > > > > > > > > >>>>>
> > > > > > > > > >>>>> Best,
> > > > > > > > > >>>>> Bruno
> > > > > > > > > >>>>>
> > > > > > > > > >>>>>
> > > > > > > > > >>>>>
> > > > > > > > > >>>>>
> > > > > > > > > >>>>> On 7/19/24 9:53 PM, Lianet M. wrote:
> > > > > > > > > > >>>>>> Hi Lucas/Bruno, thanks for the great KIP! First comments:
> > > > > > > > > > >>>>>> LM1. Related to where the KIP says: *“Group ID, member ID,
> > > > > > > > > > >>>>>> member epoch are sent with each heartbeat request. Any other
> > > > > > > > > > >>>>>> information that has not changed since the last heartbeat
> > > > > > > > > > >>>>>> can be omitted.”* I expect all the other info also needs to
> > > > > > > > > > >>>>>> be sent whenever a full heartbeat is required (even if it
> > > > > > > > > > >>>>>> didn’t change from the last heartbeat), ex. on fencing
> > > > > > > > > > >>>>>> scenarios, correct?
> > > > > > > > > > >>>>>> LM2. For consumer groups we always send the groupInstanceId
> > > > > > > > > > >>>>>> (if any) as part of every heartbeat, along with memberId,
> > > > > > > > > > >>>>>> epoch and groupId. Should we consider that too here?
> > > > > > > > > > >>>>>> LM3. We’re proposing returning a GROUP_ID_NOT_FOUND error
> > > > > > > > > > >>>>>> in response to the stream-specific RPCs if the groupId is
> > > > > > > > > > >>>>>> associated with a group type that is not streams (i.e.
> > > > > > > > > > >>>>>> consumer group or share group). I wonder if at this point,
> > > > > > > > > > >>>>>> where we're getting several new group types added, each with
> > > > > > > > > > >>>>>> RPCs that are supposed to include groupId of a certain type,
> > > > > > > > > > >>>>>> we should be more explicit about this situation. Maybe a
> > > > > > > > > > >>>>>> kind of INVALID_GROUP_TYPE (group exists but not with a
> > > > > > > > > > >>>>>> valid type for this RPC) vs a GROUP_ID_NOT_FOUND (group does
> > > > > > > > > > >>>>>> not exist). Those errors would be consistently used across
> > > > > > > > > > >>>>>> consumer, share, and streams RPCs whenever the group id is
> > > > > > > > > > >>>>>> not of the expected type.
> > > > > > > > > > >>>>>> This is truly not specific to this KIP, and should be
> > > > > > > > > > >>>>>> addressed with all group types and their RPCs in mind. I
> > > > > > > > > > >>>>>> just wanted to bring out my concern and get thoughts around
> > > > > > > > > > >>>>>> it.
> > > > > > > > > > >>>>>> LM4. On a related note, StreamsGroupDescribe returns
> > > > > > > > > > >>>>>> INVALID_REQUEST if groupId is empty. There is already an
> > > > > > > > > > >>>>>> INVALID_GROUP_ID error, which seems more specific to this
> > > > > > > > > > >>>>>> situation. Error handling of specific errors would
> > > > > > > > > > >>>>>> definitely be easier than having to deal with a generic
> > > > > > > > > > >>>>>> INVALID_REQUEST (and probably its custom message). I know
> > > > > > > > > > >>>>>> that for KIP-848 we have INVALID_REQUEST for similar
> > > > > > > > > > >>>>>> situations, so if we ever go down this path we should
> > > > > > > > > > >>>>>> review it there too for consistency. Thoughts?
> > > > > > > > > > >>>>>> LM5. The dependency between the StreamsGroupHeartbeat RPC
> > > > > > > > > > >>>>>> and the StreamsGroupInitialize RPC is one-way only, right?
> > > > > > > > > > >>>>>> HB requires a previous StreamsGroupInitialize request, but
> > > > > > > > > > >>>>>> StreamsGroupInitialize processing is totally independent of
> > > > > > > > > > >>>>>> heartbeats (and could perfectly be processed without a
> > > > > > > > > > >>>>>> previous HB, even though the client implementation we’re
> > > > > > > > > > >>>>>> proposing won’t go down that path). Is my understanding
> > > > > > > > > > >>>>>> correct? Just to double check, seems sensible like that at
> > > > > > > > > > >>>>>> the protocol level.
> > > > > > > > > > >>>>>> LM6. With KIP-848, there is an important improvement that
> > > > > > > > > > >>>>>> brings a difference in behaviour around the static
> > > > > > > > > > >>>>>> membership: with the classic protocol, if a static member
> > > > > > > > > > >>>>>> joins with a group instance already in use, it makes the
> > > > > > > > > > >>>>>> initial member fail with a FENCED_INSTANCE_ID exception, vs.
> > > > > > > > > > >>>>>> with the new consumer group protocol, the second member
> > > > > > > > > > >>>>>> trying to join fails with an UNRELEASED_INSTANCE_ID. Does
> > > > > > > > > > >>>>>> this change need to be considered in any way for the
> > > > > > > > > > >>>>>> streams app? (I'm not familiar with KS yet, but thought it
> > > > > > > > > > >>>>>> was worth asking. If it doesn't affect it in any way, it is
> > > > > > > > > > >>>>>> still maybe helpful to call it out in a section for static
> > > > > > > > > > >>>>>> membership)
> > > > > > > > > > >>>>>> LM7. Regarding the admin tool to manage streams groups. We
> > > > > > > > > > >>>>>> can discuss whether to have it here or separately, but I
> > > > > > > > > > >>>>>> think we should aim for some basic admin capabilities from
> > > > > > > > > > >>>>>> the start, mainly because I believe it will be very
> > > > > > > > > > >>>>>> helpful/needed in practice during the impl of the KIP. From
> > > > > > > > > > >>>>>> experience with KIP-848, we felt a bit blindfolded in the
> > > > > > > > > > >>>>>> initial phase where we still didn't have
> > > > > > > > > > >>>>>> kafka-consumer-groups dealing with the new groups (and then
> > > > > > > > > > >>>>>> it was very helpful and used when we were able to easily
> > > > > > > > > > >>>>>> inspect them from the console)
> > > > > > > > > > >>>>>> LM8. nit: the links to KIP-848 are not quite right
> > > > (pointing
> > > > > > to
> > > > > > > > an
> > > > > > > > > >>>>>> unrelated “Future work section” at the end of
> KIP-848)
> > > > > > > > > >>>>>> Thanks!
> > > > > > > > > >>>>>> Lianet
> > > > > > > > > >>>>>> On Fri, Jul 19, 2024 at 11:13 AM Lucas Brutschy
> > > > > > > > > >>>>>> <lbruts...@confluent.io.invalid> wrote:
> > > > > > > > > >>>>>>> Hi Andrew,
> > > > > > > > > >>>>>>>
> > > > > > > > > >>>>>>> AS2: I added a note for now. If others feel
> strongly
> > > > about
> > > > > > it,
> > > > > > > > we can
> > > > > > > > > >>>>>>> still add more administrative tools to the KIP - it
> > > > should
> > > > > > not
> > > > > > > > change
> > > > > > > > > >>>>>>> the overall story significantly.
> > > > > > > > > >>>>>>>
> > > > > > > > > >>>>>>> AS8: "streams.group.assignor.name" sounds good to
> me to
> > > > > > > > distinguish
> > > > > > > > > >>>>>>> the config from class names. Not sure if I like the
> > > > > > "default".
> > > > > > > > To be
> > > > > > > > > >>>>>>> consistent, we'd then have to call it
> > > > > > > > > >>>>>>> `group.streams.default.session.timeout.ms` as
> well. I
> > > > only
> > > > > > > > added the
> > > > > > > > > >>>>>>> `.name` on both broker and group level for now.
> > > > > > > > > >>>>>>>
> > > > > > > > > >>>>>>> AS10: Ah, I misread your comment, now I know what
> you
> > > > meant.
> > > > > > Good
> > > > > > > > > >>>>>>> point, fixed (by Bruno).
> > > > > > > > > >>>>>>>
> > > > > > > > > >>>>>>> Cheers,
> > > > > > > > > >>>>>>> Lucas
> > > > > > > > > >>>>>>>
> > > > > > > > > >>>>>>> On Fri, Jul 19, 2024 at 4:44 PM Andrew Schofield
> > > > > > > > > >>>>>>> <andrew_schofi...@live.com> wrote:
> > > > > > > > > >>>>>>>>
> > > > > > > > > >>>>>>>> Hi Lucas,
> > > > > > > > > >>>>>>>> I see that I hit send too quickly. One more
> comment:
> > > > > > > > > >>>>>>>>
> > > > > > > > > >>>>>>>> AS2: I think stating that there will be a
> > > > > > > > > `kafka-streams-groups.sh` in
> > > > > > > > > >>>> a
> > > > > > > > > >>>>>>>> future KIP is fine to keep this KIP focused.
> > > > Personally, I
> > > > > > would
> > > > > > > > > >>>> probably
> > > > > > > > > >>>>>>>> put all of the gory details in this KIP, but then
> it’s
> > > > not
> > > > > > my
> > > > > > > > KIP. A
> > > > > > > > > >>>>>>> future
> > > > > > > > > >>>>>>>> pointer is fine too.
> > > > > > > > > >>>>>>>>
> > > > > > > > > >>>>>>>> Thanks,
> > > > > > > > > >>>>>>>> Andrew
> > > > > > > > > >>>>>>>>
> > > > > > > > > >>>>>>>>
> > > > > > > > > >>>>>>>>> On 19 Jul 2024, at 13:46, Lucas Brutschy <
> > > > > > > > lbruts...@confluent.io
> > > > > > > > > >>>> .INVALID>
> > > > > > > > > >>>>>>> wrote:
> > > > > > > > > >>>>>>>>>
> > > > > > > > > >>>>>>>>> Hi Andrew,
> > > > > > > > > >>>>>>>>>
> > > > > > > > > >>>>>>>>> thanks for getting the discussion going! Here
> are my
> > > > > > responses.
> > > > > > > > > >>>>>>>>>
> > > > > > > > > >>>>>>>>> AS1: Good point, done.
> > > > > > > > > >>>>>>>>>
> > > > > > > > > >>>>>>>>> AS2: We were planning to add more administrative
> tools
> > > > to
> > > > > > the
> > > > > > > > > >>>>>>>>> interface in a follow-up KIP, to not make this
> KIP too
> > > > > > large.
> > > > > > > > If
> > > > > > > > > >>>>>>>>> people think that it would help to understand the
> > > > overall
> > > > > > > > picture if
> > > > > > > > > >>>>>>>>> we already add something like
> > > > `kafka-streams-groups.sh`, we
> > > > > > > > will do
> > > > > > > > > >>>>>>>>> that. I also agree that we should address how
> this
> > > > relates
> > > > > > to
> > > > > > > > > >>>>>>>>> KIP-1043, we'll add it.
> > > > > > > > > >>>>>>>>>
> > > > > > > > > >>>>>>>>> AS3: Good idea, that's more consistent with
> > > > `assigning` and
> > > > > > > > > >>>>>>> `reconciling` etc.
> > > > > > > > > >>>>>>>>>
> > > > > > > > > >>>>>>>>> AS4: Thanks, Fixed.
> > > > > > > > > >>>>>>>>>
> > > > > > > > > >>>>>>>>> AS5: Good catch. This was supposed to mean that
> we
> > > > require
> > > > > > > > CREATE on
> > > > > > > > > >>>>>>>>> cluster or CREATE on all topics, not both. Fixed.
> > > > > > > > > >>>>>>>>>
> > > > > > > > > >>>>>>>>> AS6: Thanks, Fixed.
> > > > > > > > > >>>>>>>>>
> > > > > > > > > >>>>>>>>> AS7. Thanks, Fixed.
> > > > > > > > > >>>>>>>>>
> > > > > > > > > > >>>>>>>>> AS8: I think this works a bit differently in this
> KIP
> > > > than in
> > > > > > > > consumer
> > > > > > > > > >>>>>>>>> groups. KIP-848 lets the members vote for a
> preferred
> > > > > > > > assignor, and
> > > > > > > > > >>>>>>>>> the broker-side assignor is picked by majority
> vote.
> > > > The
> > > > > > > > > >>>>>>>>> `group.consumer.assignors` specifies the list of
> > > > assignors
> > > > > > > > that are
> > > > > > > > > >>>>>>>>> supported on the broker, and is configurable
> because
> > > > the
> > > > > > > > interface is
> > > > > > > > > >>>>>>>>> pluggable. In this KIP, the task assignor is not
> voted
> > > > on
> > > > > > by
> > > > > > > > members
> > > > > > > > > >>>>>>>>> but configured on the broker-side.
> > > > > > `group.streams.assignor` is
> > > > > > > > used
> > > > > > > > > > >>>>>>>>> for this, and uses a specific name. If we
> make the
> > > > task
> > > > > > > > assignor
> > > > > > > > > >>>>>>>>> pluggable on the broker-side, we'd introduce a
> separate
> > > > > > config
> > > > > > > > > >>>>>>>>> `group.streams.assignors`, which would indeed be
> a
> > > > list of
> > > > > > > > class
> > > > > > > > > >>>>>>>>> names. I think there is no conflict here, the two
> > > > > > > > configurations
> > > > > > > > > >>>> serve
> > > > > > > > > >>>>>>>>> different purposes.  The only gripe I'd have
> here is
> > > > > > naming as
> > > > > > > > > >>>>>>>>> `group.streams.assignor` and
> `group.streams.assignors`
> > > > > > would
> > > > > > > > be a bit
> > > > > > > > > >>>>>>>>> similar, but I cannot really think of a better
> name for
> > > > > > > > > >>>>>>>>> `group.streams.assignor`, so I'd probably rather
> > > > introduce
> > > > > > > > > >>>>>>>>> `group.streams.assignors`  as
> > > > > > > > `group.streams.possible_assignors`  or
> > > > > > > > > >>>>>>>>> something like that.
> > > > > > > > > >>>>>>>>>
> > > > > > > > > >>>>>>>>> AS9: I added explanations for the various record
> types.
> > > > > > Apart
> > > > > > > > from
> > > > > > > > > >>>> the
> > > > > > > > > >>>>>>>>> new topology record, and the partition metadata
> (which
> > > > is
> > > > > > > > based on
> > > > > > > > > >>>> the
> > > > > > > > > >>>>>>>>> topology and can only be created once we have a
> > > > topology
> > > > > > > > initialized)
> > > > > > > > > >>>>>>>>> the lifecycle for the records is basically
> identical
> > > > > to that in
> > > > > > > > KIP-848.
> > > > > > > > > >>>>>>>>>
> > > > > > > > > >>>>>>>>> AS10: In the consumer offset topic, the version
> in the
> > > > key
> > > > > > is
> > > > > > > > used to
> > > > > > > > > >>>>>>>>> differentiate different schema types with the
> same
> > > > > > content. So
> > > > > > > > the
> > > > > > > > > >>>>>>>>> keys are not versioned, but the version field is
> > > > "abused"
> > > > > > as a
> > > > > > > > type
> > > > > > > > > >>>>>>>>> tag. This is the same in KIP-848, we followed it
> for
> > > > > > > > consistency.
> > > > > > > > > >>>>>>>>>
> > > > > > > > > >>>>>>>>> Cheers,
> > > > > > > > > >>>>>>>>> Lucas
> > > > > > > > > >>>>>>>>>
> > > > > > > > > >>>>>>>>>
> > > > > > > > > >>>>>>>>> On Thu, Jul 18, 2024 at 1:27 PM Andrew Schofield
> > > > > > > > > >>>>>>>>> <andrew_schofi...@live.com> wrote:
> > > > > > > > > >>>>>>>>>>
> > > > > > > > > >>>>>>>>>> Hi Lucas and Bruno,
> > > > > > > > > >>>>>>>>>>
> > > > > > > > > >>>>>>>>>> Thanks for the great KIP.
> > > > > > > > > >>>>>>>>>>
> > > > > > > > > >>>>>>>>>> I've read through the document and have some
> initial
> > > > > > comments.
> > > > > > > > > >>>>>>>>>>
> > > > > > > > > >>>>>>>>>> AS1: I suppose that there is a new
> > > > > > > > o.a.k.common.GroupType.STREAMS
> > > > > > > > > >>>>>>> enumeration
> > > > > > > > > >>>>>>>>>> constant. This is a change to the public
> interface and
> > > > > > should
> > > > > > > > be
> > > > > > > > > >>>>>>> called out.
> > > > > > > > > >>>>>>>>>>
> > > > > > > > > >>>>>>>>>> AS2: Since streams groups are no longer consumer
> > > > groups,
> > > > > > how
> > > > > > > > does
> > > > > > > > > >>>> the
> > > > > > > > > >>>>>>> user
> > > > > > > > > >>>>>>>>>> manipulate them, observe lag and so on? Will
> you add
> > > > > > > > > >>>>>>> `kafka-streams-groups.sh`
> > > > > > > > > >>>>>>>>>> or extend `kafka-streams-application-reset.sh`?
> Of
> > > > course,
> > > > > > > > KIP-1043
> > > > > > > > > >>>>>>> can easily
> > > > > > > > > >>>>>>>>>> be extended to support streams groups, but that
> only
> > > > lets
> > > > > > the
> > > > > > > > user
> > > > > > > > > >>>>>>> see the
> > > > > > > > > >>>>>>>>>> groups, not manipulate them.
> > > > > > > > > >>>>>>>>>>
> > > > > > > > > >>>>>>>>>> AS3: I wonder whether the streams group state of
> > > > > > > > UNINITIALIZED would
> > > > > > > > > >>>>>>> be
> > > > > > > > > >>>>>>>>>> better expressed as INITIALIZING.
> > > > > > > > > >>>>>>>>>>
> > > > > > > > > >>>>>>>>>> AS4: In StreamsGroupInitializeRequest,
> > > > > > > > Topology[].SourceTopicRegex
> > > > > > > > > >>>>>>> should
> > > > > > > > > >>>>>>>>>> be nullable.
> > > > > > > > > >>>>>>>>>>
> > > > > > > > > >>>>>>>>>> AS5: Why does StreamsGroupInitialize require
> CREATE
> > > > > > > > permission on
> > > > > > > > > >>>> the
> > > > > > > > > >>>>>>>>>> cluster resource? I imagine that this is one of
> the
> > > > ways
> > > > > > that
> > > > > > > > the
> > > > > > > > > >>>>>>> request might
> > > > > > > > > >>>>>>>>>> be granted permission to create the
> > > > StateChangelogTopics
> > > > > > and
> > > > > > > > > >>>>>>>>>> RepartitionSourceTopics, but if it is granted
> > > > permission
> > > > > > to
> > > > > > > > create
> > > > > > > > > >>>>>>> those topics
> > > > > > > > > >>>>>>>>>> with specific ACLs, would CREATE on the cluster
> > > > resource
> > > > > > > > still be
> > > > > > > > > >>>>>>> required?
> > > > > > > > > >>>>>>>>>>
> > > > > > > > > >>>>>>>>>> AS6: StreamsGroupInitialize can also fail with
> > > > > > > > > >>>>>>> TOPIC_AUTHORIZATION_FAILED
> > > > > > > > > >>>>>>>>>> and (subject to AS5)
> CLUSTER_AUTHORIZATION_FAILED.
> > > > > > > > > >>>>>>>>>>
> > > > > > > > > >>>>>>>>>> AS7: A tiny nit. You've used TopologyID
> (capitals) in
> > > > > > > > > >>>>>>> StreamsGroupHeartbeatRequest
> > > > > > > > > >>>>>>>>>> and a few others, but in all other cases the
> fields
> > > > which
> > > > > > are
> > > > > > > > ids
> > > > > > > > > >>>> are
> > > > > > > > > >>>>>>> spelled Id.
> > > > > > > > > >>>>>>>>>> I suggest TopologyId.
> > > > > > > > > >>>>>>>>>>
> > > > > > > > > >>>>>>>>>> Also, "interal" is probably meant to be
> "interval”.
> > > > > > > > > >>>>>>>>>>
> > > > > > > > > >>>>>>>>>> AS8: For consumer groups, the
> > > > `group.consumer.assignors`
> > > > > > > > > >>>>>>> configuration is a
> > > > > > > > > >>>>>>>>>> list of class names. The assignors do have
> names too,
> > > > but
> > > > > > the
> > > > > > > > > >>>>>>> configuration which
> > > > > > > > > >>>>>>>>>> enables them is in terms of class names. I
> wonder
> > > > whether
> > > > > > the
> > > > > > > > > >>>> broker’s
> > > > > > > > > >>>>>>>>>> group.streams.assignor could actually be
> > > > > > > > `group.streams.assignors`
> > > > > > > > > >>>>>>> and specified
> > > > > > > > > >>>>>>>>>> as a list of the class names of the supplied
> > > > assignors. I
> > > > > > know
> > > > > > > > > >>>> you're
> > > > > > > > > >>>>>>> not supporting
> > > > > > > > > >>>>>>>>>> other assignors yet, but when you do, I expect
> you
> > > > would
> > > > > > > > prefer to
> > > > > > > > > >>>>>>> have used class
> > > > > > > > > >>>>>>>>>> names from the start.
> > > > > > > > > >>>>>>>>>>
> > > > > > > > > >>>>>>>>>> The use of assignor names in the other places
> looks
> > > > good
> > > > > > to
> > > > > > > > me.
> > > > > > > > > >>>>>>>>>>
> > > > > > > > > >>>>>>>>>> AS9: I'd find it really helpful to have a bit
> of a
> > > > > > > > description about
> > > > > > > > > >>>>>>> the purpose and
> > > > > > > > > >>>>>>>>>> lifecycle of the 9 record types you've
> introduced on
> > > > the
> > > > > > > > > >>>>>>> __consumer_offsets topic.
> > > > > > > > > >>>>>>>>>> I did a cursory review but without really
> > > > understanding
> > > > > > what's
> > > > > > > > > >>>>>>> written when,
> > > > > > > > > >>>>>>>>>> I can't do a thorough job of reviewing.
> > > > > > > > > >>>>>>>>>>
> > > > > > > > > >>>>>>>>>> AS10: In the definitions of the record keys,
> such as
> > > > > > > > > >>>>>>>>>> StreamsGroupCurrentMemberAssignmentKey, the
> versions
> > > > of
> > > > > > the
> > > > > > > > fields
> > > > > > > > > >>>>>>> must
> > > > > > > > > >>>>>>>>>> match the versions of the types.
> > > > > > > > > >>>>>>>>>>
> > > > > > > > > >>>>>>>>>> Thanks,
> > > > > > > > > >>>>>>>>>> Andrew
> > > > > > > > > >>>>>>>>>>
> > > > > > > > > >>>>>>>>>>> On 12 Jul 2024, at 09:04, Lucas Brutschy <
> > > > > > > > lbruts...@confluent.io
> > > > > > > > > >>>> .INVALID>
> > > > > > > > > >>>>>>> wrote:
> > > > > > > > > >>>>>>>>>>>
> > > > > > > > > >>>>>>>>>>> Hi all,
> > > > > > > > > >>>>>>>>>>>
> > > > > > > > > >>>>>>>>>>> I would like to start a discussion thread on
> > > > KIP-1071:
> > > > > > > > Streams
> > > > > > > > > >>>>>>>>>>> Rebalance Protocol. With this KIP, we aim to
> bring
> > > > the
> > > > > > > > principles
> > > > > > > > > >>>>>>> laid
> > > > > > > > > >>>>>>>>>>> down by KIP-848 to Kafka Streams, to make
> rebalances
> > > > more
> > > > > > > > reliable
> > > > > > > > > >>>>>>> and
> > > > > > > > > >>>>>>>>>>> scalable, and make Kafka Streams overall
> easier to
> > > > > > deploy and
> > > > > > > > > >>>>>>> operate.
> > > > > > > > > > >>>>>>>>>>> The KIP proposes moving the assignment logic
> to the
> > > > > > broker,
> > > > > > > > and
> > > > > > > > > >>>>>>>>>>> introducing a dedicated group type and
> dedicated
> > > > RPCs for
> > > > > > > > Kafka
> > > > > > > > > >>>>>>>>>>> Streams.
> > > > > > > > > >>>>>>>>>>>
> > > > > > > > > >>>>>>>>>>> The KIP is here:
> > > > > > > > > >>>>>>>>>>>
> > > > > > > > > >>>>>>>
> > > > > > > > > >>>>
> > > > > > > >
> > > > > >
> > > >
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-1071%3A+Streams+Rebalance+Protocol
> > > > > > > > > >>>>>>>>>>>
> > > > > > > > > >>>>>>>>>>> This is joint work with Bruno Cadonna.
> > > > > > > > > >>>>>>>>>>>
> > > > > > > > > >>>>>>>>>>> Please take a look and let us know what you
> think.
> > > > > > > > > >>>>>>>>>>>
> > > > > > > > > >>>>>>>>>>> Best,
> > > > > > > > > >>>>>>>>>>> Lucas
> > > > > > > > >
> > > > > > > > >
> > > > > > > >
> > > > > >
> > > >
> > > >
> >
>
>
