Hi Guozhang,

thanks for clarifying. I think I understand better what you meant now.
However, my question remains - wouldn't that effectively make a
"rolling bounce" like an offline upgrade, if the application
effectively halts processing during the roll? I agree that could be
simpler, but it would also mean we cannot remain available even during
minor modifications to the topology - something that is possible
today.
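
To make the trade-off concrete, here is a minimal sketch of the rule in
point c) of the quoted proposal below, as I read it. All names in it
(`Member`, `next_generation`, the `TOPOLOGY_MISMATCH` status and the toy
round-robin `assign`) are hypothetical and not part of the KIP: if the
members do not all report the same topology ID, no new assignment is
computed, every member keeps what it had, and a warning status is returned.

```python
from dataclasses import dataclass, field

OK = "OK"
TOPOLOGY_MISMATCH = "TOPOLOGY_MISMATCH"  # hypothetical warning code, not from the KIP


@dataclass
class Member:
    member_id: str
    topology_id: str
    tasks: list = field(default_factory=list)  # tasks owned in the previous generation


def assign(members, all_tasks):
    """Toy round-robin assignor, only here to make the sketch runnable."""
    ordered = sorted(members, key=lambda m: m.member_id)
    result = {m.member_id: [] for m in ordered}
    if not ordered:
        return result
    for i, task in enumerate(sorted(all_tasks)):
        result[ordered[i % len(ordered)].member_id].append(task)
    return result


def next_generation(members, all_tasks):
    """Return (status, assignment) for the next group generation."""
    topology_ids = {m.topology_id for m in members}
    if len(topology_ids) > 1:
        # Not everyone agrees: skip the new assignment, members keep their old tasks.
        return TOPOLOGY_MISMATCH, {m.member_id: list(m.tasks) for m in members}
    return OK, assign(members, all_tasks)
```

In this sketch, a member that was just bounced rejoins with an empty task
list and the new topology ID, so it receives nothing until the last member
has been bounced. Whether the tasks it gave up keep being processed in the
meantime is exactly the availability question above.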

w.r.t. a) - we'd still need to have an initialization step at some
point to tell the broker about the required internal topics etc.
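
As a rough illustration of what such an initialization step implies, here
is a toy model based on the behavior described in the quoted Aug 6 message
further below: heartbeats succeed while the group is in the INITIALIZING
state, but no tasks are handed out until one client has initialized the
topology. The class, its method names and the "READY" state name are
assumptions made for this sketch, not the KIP's API.

```python
class StreamsGroup:
    """Toy coordinator-side model of a streams group (hypothetical names)."""

    def __init__(self):
        self.state = "INITIALIZING"  # state name taken from the quoted message
        self.topology_id = None
        self.internal_topics = set()
        self.members = set()

    def initialize_topology(self, topology_id, internal_topics):
        # One client tells the broker which internal topics the topology needs.
        self.topology_id = topology_id
        self.internal_topics = set(internal_topics)
        self.state = "READY"  # hypothetical name for "initialized"

    def heartbeat(self, member_id):
        # Joining always succeeds; tasks are only assignable once initialized.
        self.members.add(member_id)
        return {"joined": True, "tasks_assignable": self.state != "INITIALIZING"}
```

Dropping the explicit step, as in a), would mean the broker has to learn
the required internal topics some other way.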

Cheers,
Lucas

On Mon, Aug 19, 2024 at 5:19 PM Guozhang Wang
<guozhang.wang...@gmail.com> wrote:
>
> Hi Lucas,
>
> From the current description in section "Topology updates", my
> understanding is that a) coordinator will remember a topology ID as
> the group topology ID, which has to be initialized and agreed by
> everyone in the current generation; b) when forming a new generation,
> if some members have a topology ID which is different from the group
> topology ID that the previous generation's members all agreed on, we
> will try to act smart by not assigning any new tasks to these members,
> but will still give them the old tasks (if any) that they owned in the
> previous generation; c) we allow clients to initialize a new topology ID.
>
> My feeling is simply that the above complex logic may not be worth it
> (plus, what if some tasks no longer exist under the new topology ID
> etc.; in all, we need to consider quite a few different corner cases).
> What if we just:
>
> a) do not have the "initialize topology" logic at all, and
> b) do not try to be smart about assignment, including trying to give
> the ones with inconsistent IDs their previous tasks, etc.; but simply
> c) in any generation, if not every member agrees on the same topology
> ID, simply do not perform a new assignment, and return a warning code
> telling every client there are other peers whose topologies are
> different (of course it could be because of a rolling bounce, so no
> need to shout out as an ERROR but just WARN or even INFO); every
> client will just act as if there's no new assignment received. This is
> what I meant by "blocking the progress": since we did not perform a
> new assignment, the new topology ID would not be accepted and hence in
> a rolling bounce upgrade case the new application's topology would
> not be executed. And if it keeps happening, operators would use
> DescribeStreamsGroup to pin down who the culprits are.
>
>
>
> On Mon, Aug 19, 2024 at 7:06 AM Lucas Brutschy
> <lbruts...@confluent.io.invalid> wrote:
> >
> > Hi Guozhang,
> >
> > Thanks for reviewing the KIP, your feedback is extremely valuable.
> >
> > I think your analysis is quite right - we care about cases a) and b)
> > and I generally agree - we want the protocol to be simple and
> > debuggable. Situation a) should be relatively rare since in the common
> > case all streams applications run from the same jar/build, and we
> > shouldn't have zombies that don't update to a new topology. In this
> > case, it should just be easy to debug. In situation b), things should
> > "just work". And I think both are enabled by the KIP. In particular,
> > these situations should be relatively easy to debug:
> >
> >  - Using DescribeStreamsGroup, you can find out the topology ID of the
> > group and the topology ID of each member, to understand
> > inconsistencies.
> >  - Inconsistent clients and even the broker could log messages to
> > indicate the inconsistencies.
> >  - One could also consider exposing the number of clients by topology
> > IDs as a metric, to enhance observability (this is not in the KIP
> > yet).
> >
> > What I'm not sure about is, what you mean precisely by temporarily
> > blocking progress of the group? Do you propose to stop processing
> > altogether if topology IDs don't match - wouldn't that defy the aim of
> > doing a rolling bounce of the application (in case b)?
> >
> > Cheers,
> > Lucas
> >
> > On Mon, Aug 19, 2024 at 3:59 PM Lucas Brutschy <lbruts...@confluent.io> 
> > wrote:
> > >
> > > Hi Nick,
> > >
> > > NT4: As discussed, we will still require locking in the new protocol
> > > to avoid concurrent read/write access on the checkpoint file, at least
> > > as long as KIP-1035 hasn't landed. However, as you correctly pointed
> > > out, the assignor will have to accept offsets for overlapping sets of
> > > dormant tasks. I updated the KIP to make this explicit. If the
> > > corresponding offset information for one task conflicts between
> > > clients (which can happen), the conflict is resolved by taking the
> > > maximum of the offsets.
> > >
> > > Cheers,
> > > Lucas
> > >
> > > On Fri, Aug 16, 2024 at 7:14 PM Guozhang Wang
> > > <guozhang.wang...@gmail.com> wrote:
> > > >
> > > > Hello Lucas,
> > > >
> > > > Thanks for the great KIP. I've read it through and it looks good to
> > > > me. As we've discussed, many of my thoughts would be outside the scope
> > > > of this very well scoped and defined KIP, so I will omit them for now.
> > > >
> > > > The only one I had related to this KIP is about topology updating. I
> > > > understand the motivation of the proposal is basically that, since the
> > > > group may not accept all of the joining members each time it forms a
> > > > (new) generation because of the timing of the RPCs, the group's
> > > > topology ID may not reflect the "actual" most recent topology if some
> > > > zombie members holding an old topology form a group generation
> > > > quickly enough, which would effectively mean that zombie members are
> > > > blocking other real members from getting tasks assigned. On the other
> > > > hand, like you've mentioned already in the doc, requiring some sort of
> > > > ID ordering by pushing the burden to the user's side would also be too
> > > > much for users, increasing the risk of human errors in operations.
> > > >
> > > > I'm wondering if, instead of trying to be smart programmatically, we
> > > > just let the protocol act dumbly (details below). The main reasons I
> > > > had in mind are:
> > > >
> > > > 1) Upon topology changes, some tasks may no longer exist in the new
> > > > topology, so still letting them execute on the clients which do not
> > > > yet have the new topology would waste resources.
> > > >
> > > > 2) As we discussed, trying to act smart introduces more complexity
> > > > in the coordinator, which tries to balance different assignment goals
> > > > between stickiness, balance, and now topology mismatches between
> > > > clients.
> > > >
> > > > 3) Scenarios in which mismatching topologies would be observed within
> > > > a group generation:
> > > >    a. Zombie / old clients that do not have the new topology, and
> > > > never will.
> > > >    b. During a rolling bounce upgrade, where not-yet-bounced clients
> > > > would not yet have the new topology.
> > > >    c. Let's assume we would not ever have scenarios where users want
> > > > to intentionally have a subset of clients within a group running a
> > > > partial / subset of the full sub-topologies, since such cases can well
> > > > be covered by a custom assignor that takes those considerations into
> > > > account by never assigning some tasks to some clients etc. That means
> > > > the only scenarios we would need to consider are a) and b).
> > > >
> > > > For b), I think it's actually okay to temporarily block the progress
> > > > of the group until everyone is bounced with the updated topology; as
> > > > for a), originally I thought having one or a few clients blocking the
> > > > whole group would be a big problem, but now that I think more, I feel
> > > > that, from the operations point of view, just letting the app be
> > > > blocked with an informational log entry to quickly pin down the zombie
> > > > clients may actually be acceptable. All in all, it makes the code
> > > > simpler programmatically by not trying to abstract away scenario a)
> > > > from the users (or operators) but letting them know asap.
> > > >
> > > > ----------
> > > >
> > > > Other than that, everything else looks good to me.
> > > >
> > > >
> > > > Guozhang
> > > >
> > > >
> > > > On Fri, Aug 16, 2024 at 7:38 AM Nick Telford <nick.telf...@gmail.com> 
> > > > wrote:
> > > > >
> > > > > Hi Lucas,
> > > > >
> > > > > NT4.
> > > > > Given that the new assignment procedure guarantees that a Task has
> > > > > been closed before it is assigned to a different client, I don't
> > > > > think there should be a problem with concurrent access? I don't
> > > > > think we should worry too much about 1035 here, as it's orthogonal
> > > > > to 1071. I don't think that 1035 *requires* the locking, and indeed
> > > > > once 1071 is the only assignment mechanism, we should be able to do
> > > > > away with the locking completely (I think).
> > > > >
> > > > > Anyway, given your point about it not being possible to guarantee
> > > > > disjoint sets, does it make sense to require clients to continue to
> > > > > supply the lags for only a subset of the dormant Tasks on-disk?
> > > > > Wouldn't it be simpler to just have them supply everything, since
> > > > > the assignor has to handle overlapping sets anyway?
> > > > >
> > > > > Cheers,
> > > > > Nick
> > > > >
> > > > > On Fri, 16 Aug 2024 at 13:51, Lucas Brutschy 
> > > > > <lbruts...@confluent.io.invalid>
> > > > > wrote:
> > > > >
> > > > > > Hi Nick,
> > > > > >
> > > > > > NT4. I think it will be hard anyway to ensure that the assignor
> > > > > > always gets disjoint sets (there is no synchronized rebalance
> > > > > > point anymore, so locks wouldn't prevent two clients reporting the
> > > > > > same dormant task). So I think we'll have to lift this
> > > > > > restriction. I was thinking more that locking is required to
> > > > > > prevent concurrent access. In particular, I was expecting that the
> > > > > > lock will avoid two threads opening the same RocksDB in KIP-1035.
> > > > > > Wouldn't this cause problems?
> > > > > >
> > > > > > Cheers,
> > > > > > Lucas
> > > > > >
> > > > > > On Fri, Aug 16, 2024 at 11:34 AM Nick Telford 
> > > > > > <nick.telf...@gmail.com>
> > > > > > wrote:
> > > > > > >
> > > > > > > Hi Lucas,
> > > > > > >
> > > > > > > NT4.
> > > > > > > The reason I mentioned this was because, while implementing
> > > > > > > 1035, I stumbled across a problem: initially I had changed it so
> > > > > > > that threads always reported the lag for *all* dormant Tasks
> > > > > > > on-disk, even if it meant multiple threads reporting lag for the
> > > > > > > same Tasks. I found that this didn't work, apparently because
> > > > > > > the assignor assumes that multiple threads on the same instance
> > > > > > > always report disjoint sets.
> > > > > > >
> > > > > > > From reading through 1071, it sounded like this assumption is no
> > > > > > > longer being made by the assignor, and that the processId field
> > > > > > > would allow the assignor to understand when multiple clients
> > > > > > > reporting lag for the same Tasks are on the same instance. This
> > > > > > > would enable us to do away with the locking when reporting lag,
> > > > > > > and just have threads report the lag for every Task on-disk,
> > > > > > > even if other threads are reporting lag for the same Tasks.
> > > > > > >
> > > > > > > But it sounds like this is not correct, and that the new
> > > > > > > assignor will make the same assumptions as the old one?
> > > > > > >
> > > > > > > Regards,
> > > > > > > Nick
> > > > > > >
> > > > > > > On Fri, 16 Aug 2024 at 10:17, Lucas Brutschy
> > > > > > > <lbruts...@confluent.io.invalid> wrote:
> > > > > > >
> > > > > > > > Hi Nick!
> > > > > > > >
> > > > > > > > Thanks for getting involved in the discussion.
> > > > > > > >
> > > > > > > > NT1. We are always referring to offsets in the changelog topics
> > > > > > > > here. I tried to make it more consistent. But in the schemas and
> > > > > > > > API, I find "task changelog end offset" a bit lengthy, so we use
> > > > > > > > "task offset" and "task end offset" for short. We could change
> > > > > > > > it, if people think this is confusing.
> > > > > > > >
> > > > > > > > NT2. You are right. The confusing part is that the current
> > > > > > > > streams config is called `max.warmup.replicas`, but in the new
> > > > > > > > protocol, we are bounding the group-level parameter using
> > > > > > > > `group.streams.max.warmup.replicas`. If we wanted to keep
> > > > > > > > `group.streams.max.warmup.replicas` for the config name on the
> > > > > > > > group-level, we'd have to bound it using
> > > > > > > > `group.streams.max.max.warmup.replicas`. I prefer not doing
> > > > > > > > this, but open to suggestions.
> > > > > > > >
> > > > > > > > NT3. You are right, we do not need to make it this restrictive.
> > > > > > > > I think the main problem with having 10,000 warm-up replicas
> > > > > > > > would be that it slows down the assignment inside the broker -
> > > > > > > > once we are closer to production-ready implementation, we may
> > > > > > > > have better numbers of this and may revisit these defaults. I'll
> > > > > > > > set the max to 100 for now, but it would be good to hear what
> > > > > > > > values people typically use in their production workloads.
> > > > > > > >
> > > > > > > > NT4. We will actually only report the offsets if we manage to
> > > > > > > > acquire the lock. I tried to make this more precise. I suppose
> > > > > > > > also with KIP-1035, we'd require the lock to read the offset?
> > > > > > > >
> > > > > > > > Cheers,
> > > > > > > > Lucas
> > > > > > > >
> > > > > > > > On Thu, Aug 15, 2024 at 8:40 PM Nick Telford 
> > > > > > > > <nick.telf...@gmail.com>
> > > > > > > > wrote:
> > > > > > > > >
> > > > > > > > > Hi everyone,
> > > > > > > > >
> > > > > > > > > Looks really promising, and I can see this resolving several
> > > > > > > > > issues I've noticed. I particularly like the choice to use a
> > > > > > > > > String for Subtopology ID, as it will (eventually) lead to a
> > > > > > > > > better solution to KIP-816.
> > > > > > > > >
> > > > > > > > > I noticed a few typos in the KIP that I thought I'd mention:
> > > > > > > > >
> > > > > > > > > NT1.
> > > > > > > > > In several places you refer to "task changelog end offsets",
> > > > > > > > > while in others, you call it "task end offsets". Which is it?
> > > > > > > > >
> > > > > > > > > NT2.
> > > > > > > > > Under "Group Configurations", you included
> > > > > > > > > "group.streams.max.warmup.replicas", but I think you meant
> > > > > > > > > "group.streams.num.warmup.replicas"?
> > > > > > > > >
> > > > > > > > > NT3.
> > > > > > > > > Not a typo, but a suggestion: it makes sense to set the default
> > > > > > > > > for "group.streams.num.warmup.replicas" to 2, for compatibility
> > > > > > > > > with the existing defaults, but why set the default for
> > > > > > > > > "group.streams.max.warmup.replicas" to only 4? That seems
> > > > > > > > > extremely restrictive. These "max" configs are typically used
> > > > > > > > > to prevent a subset of users causing problems on the shared
> > > > > > > > > broker cluster - what's the reason to set such a restrictive
> > > > > > > > > value for max warmup replicas? If I had 10,000 warmup replicas,
> > > > > > > > > would it cause a noticeable problem on the brokers?
> > > > > > > > >
> > > > > > > > > NT4.
> > > > > > > > > It's implied that clients send the changelog offsets for *all*
> > > > > > > > > dormant stateful Tasks, but the current behaviour is that
> > > > > > > > > clients will only send the changelog offsets for the stateful
> > > > > > > > > Tasks that they are able to lock on-disk. Since this is a
> > > > > > > > > change in behaviour, perhaps this should be called out
> > > > > > > > > explicitly?
> > > > > > > > >
> > > > > > > > > Regards,
> > > > > > > > > Nick
> > > > > > > > >
> > > > > > > > > On Thu, 15 Aug 2024 at 10:55, Lucas Brutschy
> > > > > > > > > <lbruts...@confluent.io.invalid> wrote:
> > > > > > > > >
> > > > > > > > > > Hi Andrew,
> > > > > > > > > >
> > > > > > > > > > thanks for the comment.
> > > > > > > > > >
> > > > > > > > > > AS12: I clarified the command-line interface. It's supposed
> > > > > > > > > > to be used with --reset-offsets and --delete-offsets. I
> > > > > > > > > > removed --topic.
> > > > > > > > > >
> > > > > > > > > > AS13: Yes, it's --delete. I clarified the command-line
> > > > > > > > > > interface.
> > > > > > > > > >
> > > > > > > > > > Cheers,
> > > > > > > > > > Lucas
> > > > > > > > > >
> > > > > > > > > > On Tue, Aug 13, 2024 at 4:14 PM Andrew Schofield
> > > > > > > > > > <andrew_schofi...@live.com> wrote:
> > > > > > > > > > >
> > > > > > > > > > > Hi Lucas,
> > > > > > > > > > > Thanks for the KIP update.
> > > > > > > > > > >
> > > > > > > > > > > I think that `kafka-streams-groups.sh` looks like a good
> > > > > > > > > > > equivalent to the tools for the other types of groups.
> > > > > > > > > > >
> > > > > > > > > > > AS12: In kafka-streams-groups.sh, the description for the
> > > > > > > > > > > --input-topics option seems insufficient. Why is an input
> > > > > > > > > > > topic specified with this option different than a topic
> > > > > > > > > > > specified with --topic? Why is it --input-topics rather
> > > > > > > > > > > than --input-topic? Which action of this tool does this
> > > > > > > > > > > option apply to?
> > > > > > > > > > >
> > > > > > > > > > > AS13: Similarly, for --internal-topics, which action of the
> > > > > > > > > > > tool does it apply to? I suppose it’s --delete, but it’s
> > > > > > > > > > > not clear to me.
> > > > > > > > > > >
> > > > > > > > > > > Thanks,
> > > > > > > > > > > Andrew
> > > > > > > > > > >
> > > > > > > > > > > > On 11 Aug 2024, at 12:10, Lucas Brutschy
> > > > > > > > > > > > <lbruts...@confluent.io.INVALID> wrote:
> > > > > > > > > > > >
> > > > > > > > > > > > Hi Andrew/Lianet,
> > > > > > > > > > > >
> > > > > > > > > > > > I have added an administrative command-line tool (replacing
> > > > > > > > > > > > `kafka-streams-application-reset`) and extensions of the
> > > > > > > > > > > > Admin API for listing, deleting, describing groups and
> > > > > > > > > > > > listing, altering and deleting offsets for streams groups.
> > > > > > > > > > > > No new RPCs have to be added, however, we duplicate some of
> > > > > > > > > > > > the API in the admin client that exist for consumer groups.
> > > > > > > > > > > > It seems to me cleaner to duplicate some code/interface
> > > > > > > > > > > > here, instead of using "consumer group" APIs for streams
> > > > > > > > > > > > groups, or renaming existing APIs that use "consumerGroup"
> > > > > > > > > > > > in the name to something more generic (which wouldn't cover
> > > > > > > > > > > > share groups).
> > > > > > > > > > > >
> > > > > > > > > > > > I think for now, all comments are addressed.
> > > > > > > > > > > >
> > > > > > > > > > > > Cheers,
> > > > > > > > > > > > Lucas
> > > > > > > > > > > >
> > > > > > > > > > > > On Tue, Aug 6, 2024 at 3:19 PM Lucas Brutschy
> > > > > > > > > > > > <lbruts...@confluent.io> wrote:
> > > > > > > > > > > >>
> > > > > > > > > > > >> Hi Lianet and Andrew,
> > > > > > > > > > > >>
> > > > > > > > > > > >> LM1/LM2: You are right. The idea is to omit fields exactly
> > > > > > > > > > > >> in the same situations as in KIP-848. In the KIP, I stuck
> > > > > > > > > > > >> with how the behavior was defined in KIP-848 (e.g. KIP-848
> > > > > > > > > > > >> defined that the instance ID will be omitted if it did not
> > > > > > > > > > > >> change since the last heartbeat). But you are correct that
> > > > > > > > > > > >> the implementation handles these details slightly
> > > > > > > > > > > >> differently. I updated the KIP to match more closely the
> > > > > > > > > > > >> behavior of the KIP-848 implementation.
> > > > > > > > > > > >>
> > > > > > > > > > > >> LM9: Yes, there are several options to do this. The idea is
> > > > > > > > > > > >> to have only one client initialize the topology, not all
> > > > > > > > > > > >> clients. It seems easier to understand on the protocol
> > > > > > > > > > > >> level (otherwise we'd have N topology initializations
> > > > > > > > > > > >> racing with a hard-to-determine winner). We also expect the
> > > > > > > > > > > >> payload of the request to grow in the future and want to
> > > > > > > > > > > >> avoid the overhead of having all clients sending the
> > > > > > > > > > > >> topology at the same time. But initializing the group could
> > > > > > > > > > > >> take some time - we have to create internal topics, and
> > > > > > > > > > > >> maybe a client is malfunctioning and the initialization has
> > > > > > > > > > > >> to be retried. It seemed a bit confusing to return errors
> > > > > > > > > > > >> to all other clients that are trying to join the group
> > > > > > > > > > > >> during that time - as if there was a problem with joining
> > > > > > > > > > > >> the group / the contents of the heartbeat. It seems cleaner
> > > > > > > > > > > >> to me to let all clients successfully join the group and
> > > > > > > > > > > >> heartbeat, but remain in an INITIALIZING state which does
> > > > > > > > > > > >> not yet assign any tasks. Does that make sense to you? You
> > > > > > > > > > > >> are right that returning a retriable error and having all
> > > > > > > > > > > >> clients retry until the group is initialized would also
> > > > > > > > > > > >> work, it just doesn't model well that "everything is going
> > > > > > > > > > > >> according to plan".
> > > > > > > > > > > >> As for the order of the calls - yes, I think it is fine to
> > > > > > > > > > > >> allow an Initialize RPC before the first heartbeat for
> > > > > > > > > > > >> supporting future admin tools. I made this change
> > > > > > > > > > > >> throughout the KIP, thanks!
> > > > > > > > > > > >>
> > > > > > > > > > > >> AS11: Yes, your understanding is correct. The number of
> > > > > > > > > > > >> tasks for one subtopology is the maximum number of
> > > > > > > > > > > >> partitions in any of the matched topics. What will happen
> > > > > > > > > > > >> in Kafka Streams is that the partitions of the matched
> > > > > > > > > > > >> topics will effectively be merged during stream processing,
> > > > > > > > > > > >> so in your example, subtopology:0 would consume from AB:0
> > > > > > > > > > > >> and AC:0.
> > > > > > > > > > > >>
> > > > > > > > > > > >> Cheers,
> > > > > > > > > > > >> Lucas
> > > > > > > > > > > >>
> > > > > > > > > > > >> On Fri, Aug 2, 2024 at 9:47 PM Lianet M.
> > > > > > > > > > > >> <liane...@gmail.com> wrote:
> > > > > > > > > > > >>>
> > > > > > > > > > > >>> Hi Bruno, answering your questions:
> > > > > > > > > > > >>>
> > > > > > > > > > > >>> About the full heartbeat (LM1): I just wanted to confirm
> > > > > > > > > > > >>> that you'll be sending full HBs in case of errors in
> > > > > > > > > > > >>> general. It's not clear from the KIP, since it referred to
> > > > > > > > > > > >>> sending Id/epoch and whatever had changed since the last
> > > > > > > > > > > >>> HB only. Sending full HB on error is key to ensure fresh
> > > > > > > > > > > >>> rejoins after fencing for instance, and retries with all
> > > > > > > > > > > >>> relevant info.
> > > > > > > > > > > >>>
> > > > > > > > > > > >>> About the instanceId (LM2): The instanceId is needed on
> > > > > > > > > > > >>> every HB to be able to identify a member using one that is
> > > > > > > > > > > >>> already taken. On every HB, the broker uses the instance
> > > > > > > > > > > >>> id (if any) to retrieve the member ID associated with it,
> > > > > > > > > > > >>> and checks it against the memberId received in the HB
> > > > > > > > > > > >>> (throwing UnreleasedInstance exception if needed). So
> > > > > > > > > > > >>> similar to my previous point, just wanted to confirm that
> > > > > > > > > > > >>> we are considering that here too.
> > > > > > > > > > > >>>
> > > > > > > > > > > >>> Now some other thoughts:
> > > > > > > > > > > >>>
> > > > > > > > > > > >>> LM9: Definitely interesting imo if we can avoid the
> > > > > > > > > > > >>> dependency between the StreamsGroupInitialize and the
> > > > > > > > > > > >>> StreamsGroupHeartbeat. I totally get that the initial
> > > > > > > > > > > >>> client implementation will do a HB first, and that's fine,
> > > > > > > > > > > >>> but not having the flow enforced at the protocol level
> > > > > > > > > > > >>> would allow for further improvement in the future (that
> > > > > > > > > > > >>> initialize via admin idea you mentioned, for instance).
> > > > > > > > > > > >>> Actually, I may be missing something about the HB, but if
> > > > > > > > > > > >>> we are at the point where HB requires that the topology
> > > > > > > > > > > >>> has been initialized, and the topology init requires the
> > > > > > > > > > > >>> group, why is it the heartbeat RPC the one responsible for
> > > > > > > > > > > >>> the group creation? (vs. StreamsGroupInitialize creates
> > > > > > > > > > > >>> group if needed + HB just fails if topology not
> > > > > > > > > > > >>> initialized)
> > > > > > > > > > > >>>
> > > > > > > > > > > >>> Thanks!
> > > > > > > > > > > >>>
> > > > > > > > > > > >>> Lianet
> > > > > > > > > > > >>> (I didn't miss your answer on my INVALID_GROUP_TYPE
> > > > > > > > > > > >>> proposal, just still thinking about it in sync with the
> > > > > > > > > > > >>> same discussion we're having on the KIP-1043 thread...I'll
> > > > > > > > > > > >>> come back on that)
> > > > > > > > > > > >>>
> > > > > > > > > > > >>> On Thu, Aug 1, 2024 at 10:55 AM Andrew Schofield
> > > > > > > > > > > >>> <andrew_schofi...@live.com> wrote:
> > > > > > > > > > > >>>
> > > > > > > > > > > >>>> Hi Bruno,
> > > > > > > > > > > >>>> Thanks for adding the detail on the schemas on records
> > > > > > > > > > > >>>> written to __consumer_offsets.
> > > > > > > > > > > >>>> I’ve reviewed them in detail and they look good to me. I
> > > > > > > > > > > >>>> have one naive question.
> > > > > > > > > > > >>>>
> > > > > > > > > > > >>>> AS11: I notice that an assignment is essentially a set of
> > > > > > > > > > > >>>> partition indices for subtopologies. Since a subtopology
> > > > > > > > > > > >>>> can be defined by a source topic regex, does this mean
> > > > > > > > > > > >>>> that an assignment gives the same set of partition
> > > > > > > > > > > >>>> indices for all topics which happen to match the regex?
> > > > > > > > > > > >>>> So, a subtopology reading from A* that matches AB and AC
> > > > > > > > > > > >>>> would give the same set of partitions to each task for
> > > > > > > > > > > >>>> both topics, and is not able to give AB:0 to one task and
> > > > > > > > > > > >>>> AC:0 to a different task. Is this correct?
> > > > > > > > > > > >>>>
> > > > > > > > > > > >>>> Thanks,
> > > > > > > > > > > >>>> Andrew
> > > > > > > > > > > >>>>
> > > > > > > > > > > >>>>> On 23 Jul 2024, at 16:16, Bruno Cadonna
> > > > > > > > > > > >>>>> <cado...@apache.org> wrote:
> > > > > > > > > > > >>>>>
> > > > > > > > > > > >>>>> Hi Lianet,
> > > > > > > > > > > >>>>>
> > > > > > > > > > > >>>>> Thanks for the review!
> > > > > > > > > > > >>>>>
> > > > > > > > > > > >>>>> Here are my answers:
> > > > > > > > > > > >>>>>
> > > > > > > > > > > >>>>> LM1. Is your question whether we need to send a full
> > > > > > > > > > > >>>>> heartbeat each time the member re-joins the group even
> > > > > > > > > > > >>>>> if the information in the RPC did not change since the
> > > > > > > > > > > >>>>> last heartbeat?
> > > > > > > > > > > >>>>>
> > > > > > > > > > > >>>>> LM2. Is the reason for sending the instance ID each time
> > > > > > > > > > > >>>>> that a member could shut down, change the instance ID
> > > > > > > > > > > >>>>> and then start and heartbeat again, but the group
> > > > > > > > > > > >>>>> coordinator would never notice that the instance ID
> > > > > > > > > > > >>>>> changed?
> > > > > > > > > > > >>>>>
> > > > > > > > > > > >>>>> LM3. I see your point. I am wondering whether this
> > > > > > additional
> > > > > > > > > > > >>>> information is worth the dependency between the group
> > > > > > types. To
> > > > > > > > > > return
> > > > > > > > > > > >>>> INVALID_GROUP_TYPE, the group coordinator needs to 
> > > > > > > > > > > >>>> know
> > > > > > that a
> > > > > > > > > > group ID
> > > > > > > > > > > >>>> exists with a different group type. With a group
> > > > > > coordinator as
> > > > > > > > we
> > > > > > > > > > have it
> > > > > > > > > > > >>>> now in Apache Kafka that manages all group types, 
> > > > > > > > > > > >>>> that is
> > > > > > not a
> > > > > > > > big
> > > > > > > > > > deal,
> > > > > > > > > > > >>>> but imagine if we (or some implementation of the 
> > > > > > > > > > > >>>> Apache
> > > > > > Kafka
> > > > > > > > > > protocol)
> > > > > > > > > > > >>>> decide to have a separate group coordinator for each 
> > > > > > > > > > > >>>> group
> > > > > > type.
> > > > > > > > > > > >>>>>
> > > > > > > > > > > >>>>> LM4. Using INVALID_GROUP_ID if the group ID is 
> > > > > > > > > > > >>>>> empty makes
> > > > > > > > sense
> > > > > > > > > > to me.
> > > > > > > > > > > > >>>> I am going to change that.
> > > > > > > > > > > >>>>>
> > > > > > > > > > > >>>>> LM5. I think there is a dependency from the
> > > > > > > > StreamsGroupInitialize
> > > > > > > > > > RPC
> > > > > > > > > > > >>>> to the heartbeat. The group must exist when the 
> > > > > > > > > > > >>>> initialize
> > > > > > RPC
> > > > > > > > is
> > > > > > > > > > received
> > > > > > > > > > > >>>> by the group coordinator. The group is created by the
> > > > > > heartbeat
> > > > > > > > > > RPC. I
> > > > > > > > > > > >>>> would be in favor of making the initialize RPC 
> > > > > > > > > > > >>>> independent
> > > > > > from
> > > > > > > > the
> > > > > > > > > > > > >>>> heartbeat RPC. That would allow initializing a 
> > > > > > > > > > > >>>> streams
> > > > > > group
> > > > > > > > > > explicitly
> > > > > > > > > > > >>>> with an admin tool.
> > > > > > > > > > > >>>>>
> > > > > > > > > > > >>>>> LM6. I think it affects streams and streams should 
> > > > > > > > > > > >>>>> behave
> > > > > > > like
> > > > > > > > the
> > > > > > > > > > > >>>> consumer group.
> > > > > > > > > > > >>>>>
> > > > > > > > > > > >>>>> LM7. Good point that we will consider.
> > > > > > > > > > > >>>>>
> > > > > > > > > > > >>>>> LM8. Fixed! Thanks!
> > > > > > > > > > > >>>>>
> > > > > > > > > > > >>>>>
> > > > > > > > > > > >>>>> Best,
> > > > > > > > > > > >>>>> Bruno
> > > > > > > > > > > >>>>>
> > > > > > > > > > > >>>>>
> > > > > > > > > > > >>>>>
> > > > > > > > > > > >>>>>
> > > > > > > > > > > >>>>> On 7/19/24 9:53 PM, Lianet M. wrote:
> > > > > > > > > > > >>>>>> Hi Lucas/Bruno, thanks for the great KIP! First 
> > > > > > > > > > > >>>>>> comments:
> > > > > > > > > > > >>>>>> LM1. Related to where the KIP says:  *“Group ID, 
> > > > > > > > > > > >>>>>> member
> > > > > > ID,
> > > > > > > > > > member epoch
> > > > > > > > > > > >>>>>> are sent with each heartbeat request. Any other
> > > > > > information
> > > > > > > > that
> > > > > > > > > > has not
> > > > > > > > > > > >>>>>> changed since the last heartbeat can be omitted.”. 
> > > > > > > > > > > >>>>>> *I
> > > > > > expect
> > > > > > > > all
> > > > > > > > > > the
> > > > > > > > > > > >>>> other
> > > > > > > > > > > >>>>>> info also needs to be sent whenever a full 
> > > > > > > > > > > >>>>>> heartbeat is
> > > > > > > > required
> > > > > > > > > > (even
> > > > > > > > > > > >>>> if
> > > > > > > > > > > > >>>>>> it didn’t change from the last heartbeat), e.g. in 
> > > > > > > > > > > >>>>>> fencing
> > > > > > > > > > scenarios,
> > > > > > > > > > > >>>>>> correct?
> > > > > > > > > > > >>>>>> LM2. For consumer groups we always send the
> > > > > > groupInstanceId
> > > > > > > > (if
> > > > > > > > > > any) as
> > > > > > > > > > > >>>>>> part of every heartbeat, along with memberId, 
> > > > > > > > > > > >>>>>> epoch and
> > > > > > > > groupId.
> > > > > > > > > > Should
> > > > > > > > > > > >>>> we
> > > > > > > > > > > >>>>>> consider that too here?
> > > > > > > > > > > >>>>>> LM3. We’re proposing returning a 
> > > > > > > > > > > >>>>>> GROUP_ID_NOT_FOUND error
> > > > > > in
> > > > > > > > > > response to
> > > > > > > > > > > >>>>>> the stream-specific RPCs if the groupId is 
> > > > > > > > > > > >>>>>> associated
> > > > > > with a
> > > > > > > > > > group type
> > > > > > > > > > > > >>>>>> that is not streams (i.e. consumer group or share 
> > > > > > > > > > > >>>>>> group). I
> > > > > > > > wonder
> > > > > > > > > > if at
> > > > > > > > > > > >>>>>> this point, where we're getting several new group 
> > > > > > > > > > > >>>>>> types
> > > > > > added,
> > > > > > > > > > each with
> > > > > > > > > > > >>>>>> RPCs that are supposed to include groupId of a 
> > > > > > > > > > > >>>>>> certain
> > > > > > type,
> > > > > > > > we
> > > > > > > > > > should
> > > > > > > > > > > >>>> be
> > > > > > > > > > > >>>>>> more explicit about this situation. Maybe a kind of
> > > > > > > > > > INVALID_GROUP_TYPE
> > > > > > > > > > > >>>>>> (group exists but not with a valid type for this 
> > > > > > > > > > > >>>>>> RPC) vs a
> > > > > > > > > > > >>>>>> GROUP_ID_NOT_FOUND (group does not exist).  Those 
> > > > > > > > > > > >>>>>> errors
> > > > > > > > would be
> > > > > > > > > > > >>>>>> consistently used across consumer, share, and 
> > > > > > > > > > > >>>>>> streams RPCs
> > > > > > > > > > whenever the
> > > > > > > > > > > >>>>>> group id is not of the expected type.
> > > > > > > > > > > >>>>>> This is truly not specific to this KIP, and should 
> > > > > > > > > > > >>>>>> be
> > > > > > > > addressed
> > > > > > > > > > with all
> > > > > > > > > > > >>>>>> group types and their RPCs in mind. I just wanted 
> > > > > > > > > > > >>>>>> to bring
> > > > > > > > out my
> > > > > > > > > > > >>>> concern
> > > > > > > > > > > >>>>>> and get thoughts around it.
> > > > > > > > > > > >>>>>> LM4. On a related note, StreamsGroupDescribe 
> > > > > > > > > > > >>>>>> returns
> > > > > > > > > > INVALID_REQUEST if
> > > > > > > > > > > >>>>>> groupId is empty. There is already an 
> > > > > > > > > > > >>>>>> INVALID_GROUP_ID
> > > > > > error,
> > > > > > > > > > that seems
> > > > > > > > > > > >>>>>> more specific to this situation. Error handling of
> > > > > > specific
> > > > > > > > > > errors would
> > > > > > > > > > > >>>>>> definitely be easier than having to deal with a 
> > > > > > > > > > > >>>>>> generic
> > > > > > > > > > INVALID_REQUEST
> > > > > > > > > > > >>>>>> (and probably its custom message). I know that for
> > > > > > KIP-848 we
> > > > > > > > have
> > > > > > > > > > > >>>>>> INVALID_REQUEST for similar situations, so if ever 
> > > > > > > > > > > > >>>>>> we go
> > > > > > > > down
> > > > > > > > > > this
> > > > > > > > > > > >>>> path
> > > > > > > > > > > >>>>>> we should review it there too for consistency. 
> > > > > > > > > > > >>>>>> Thoughts?
> > > > > > > > > > > >>>>>> LM5. The dependency between the 
> > > > > > > > > > > >>>>>> StreamsGroupHeartbeat RPC
> > > > > > and
> > > > > > > > the
> > > > > > > > > > > > >>>>>> StreamsGroupInitialize RPC is one-way only, right? 
> > > > > > > > > > > >>>>>> HB
> > > > > > requires
> > > > > > > > a
> > > > > > > > > > previous
> > > > > > > > > > > >>>>>> StreamsGroupInitialize request, but 
> > > > > > > > > > > >>>>>> StreamsGroupInitialize
> > > > > > > > > > processing is
> > > > > > > > > > > >>>>>> totally independent of heartbeats (and could 
> > > > > > > > > > > >>>>>> perfectly be
> > > > > > > > > > processed
> > > > > > > > > > > >>>> without
> > > > > > > > > > > >>>>>> a previous HB, even though the client 
> > > > > > > > > > > >>>>>> implementation we’re
> > > > > > > > > > proposing
> > > > > > > > > > > >>>> won’t
> > > > > > > > > > > >>>>>> go down that path). Is my understanding correct? 
> > > > > > > > > > > >>>>>> Just to
> > > > > > > > double
> > > > > > > > > > check,
> > > > > > > > > > > >>>>>> seems sensible like that at the protocol level.
> > > > > > > > > > > >>>>>> LM6. With KIP-848, there is an important 
> > > > > > > > > > > >>>>>> improvement that
> > > > > > > > brings a
> > > > > > > > > > > >>>>>> difference in behaviour around the static 
> > > > > > > > > > > >>>>>> membership:
> > > > > > with the
> > > > > > > > > > classic
> > > > > > > > > > > >>>>>> protocol, if a static member joins with a group 
> > > > > > > > > > > >>>>>> instance
> > > > > > > > already
> > > > > > > > > > in
> > > > > > > > > > > >>>> use, it
> > > > > > > > > > > >>>>>> makes the initial member fail with a 
> > > > > > > > > > > > >>>>>> FENCED_INSTANCE_ID
> > > > > > > > > > exception, vs.
> > > > > > > > > > > >>>>>> with the new consumer group protocol, the second 
> > > > > > > > > > > >>>>>> member
> > > > > > > > trying to
> > > > > > > > > > join
> > > > > > > > > > > >>>>>> fails with an UNRELEASED_INSTANCE_ID. Does this 
> > > > > > > > > > > >>>>>> change
> > > > > > need
> > > > > > > > to be
> > > > > > > > > > > >>>>>> considered in any way for the streams app? (I'm not
> > > > > > familiar
> > > > > > > > with
> > > > > > > > > > KS
> > > > > > > > > > > >>>> yet,
> > > > > > > > > > > >>>>>> but thought it was worth asking. If it doesn't 
> > > > > > > > > > > > >>>>>> affect it in
> > > > > > any
> > > > > > > > way,
> > > > > > > > > > still
> > > > > > > > > > > > >>>>>> maybe helpful to call it out in a section for 
> > > > > > > > > > > >>>>>> static
> > > > > > > > membership)
> > > > > > > > > > > >>>>>> LM7. Regarding the admin tool to manage streams 
> > > > > > > > > > > >>>>>> groups.
> > > > > > We can
> > > > > > > > > > discuss
> > > > > > > > > > > >>>>>> whether to have it here or separately, but I think 
> > > > > > > > > > > >>>>>> we
> > > > > > should
> > > > > > > > aim
> > > > > > > > > > for
> > > > > > > > > > > >>>> some
> > > > > > > > > > > >>>>>> basic admin capabilities from the start, mainly 
> > > > > > > > > > > >>>>>> because I
> > > > > > > > believe
> > > > > > > > > > it
> > > > > > > > > > > >>>> will
> > > > > > > > > > > >>>>>> be very helpful/needed in practice during the impl 
> > > > > > > > > > > >>>>>> of the
> > > > > > KIP.
> > > > > > > > > > From
> > > > > > > > > > > >>>>>> experience with KIP-848, we felt a bit blindfolded 
> > > > > > > > > > > >>>>>> in the
> > > > > > > > initial
> > > > > > > > > > phase
> > > > > > > > > > > >>>>>> where we still didn't have kafka-consumer-groups 
> > > > > > > > > > > >>>>>> dealing
> > > > > > with
> > > > > > > > the
> > > > > > > > > > new
> > > > > > > > > > > >>>>>> groups (and then it was very helpful and used when 
> > > > > > > > > > > >>>>>> we were
> > > > > > > > able to
> > > > > > > > > > > >>>> easily
> > > > > > > > > > > >>>>>> inspect them from the console)
> > > > > > > > > > > > >>>>>> LM8. nit: the links to KIP-848 are not quite right
> > > > > > (pointing
> > > > > > > > to
> > > > > > > > > > an
> > > > > > > > > > > >>>>>> unrelated “Future work section” at the end of 
> > > > > > > > > > > >>>>>> KIP-848)
> > > > > > > > > > > >>>>>> Thanks!
> > > > > > > > > > > >>>>>> Lianet
> > > > > > > > > > > >>>>>> On Fri, Jul 19, 2024 at 11:13 AM Lucas Brutschy
> > > > > > > > > > > >>>>>> <lbruts...@confluent.io.invalid> wrote:
> > > > > > > > > > > >>>>>>> Hi Andrew,
> > > > > > > > > > > >>>>>>>
> > > > > > > > > > > >>>>>>> AS2: I added a note for now. If others feel 
> > > > > > > > > > > >>>>>>> strongly
> > > > > > about
> > > > > > > > it,
> > > > > > > > > > we can
> > > > > > > > > > > >>>>>>> still add more administrative tools to the KIP - 
> > > > > > > > > > > >>>>>>> it
> > > > > > should
> > > > > > > > not
> > > > > > > > > > change
> > > > > > > > > > > >>>>>>> the overall story significantly.
> > > > > > > > > > > >>>>>>>
> > > > > > > > > > > >>>>>>> AS8: "streams.group.assignor.name" sounds good to 
> > > > > > > > > > > >>>>>>> me to
> > > > > > > > > > distinguish
> > > > > > > > > > > >>>>>>> the config from class names. Not sure if I like 
> > > > > > > > > > > >>>>>>> the
> > > > > > > > "default".
> > > > > > > > > > To be
> > > > > > > > > > > >>>>>>> consistent, we'd then have to call it
> > > > > > > > > > > >>>>>>> `group.streams.default.session.timeout.ms` as 
> > > > > > > > > > > >>>>>>> well. I
> > > > > > only
> > > > > > > > > > added the
> > > > > > > > > > > >>>>>>> `.name` on both broker and group level for now.
> > > > > > > > > > > >>>>>>>
> > > > > > > > > > > >>>>>>> AS10: Ah, I misread your comment, now I know what 
> > > > > > > > > > > >>>>>>> you
> > > > > > meant.
> > > > > > > > Good
> > > > > > > > > > > >>>>>>> point, fixed (by Bruno).
> > > > > > > > > > > >>>>>>>
> > > > > > > > > > > >>>>>>> Cheers,
> > > > > > > > > > > >>>>>>> Lucas
> > > > > > > > > > > >>>>>>>
> > > > > > > > > > > >>>>>>> On Fri, Jul 19, 2024 at 4:44 PM Andrew Schofield
> > > > > > > > > > > >>>>>>> <andrew_schofi...@live.com> wrote:
> > > > > > > > > > > >>>>>>>>
> > > > > > > > > > > >>>>>>>> Hi Lucas,
> > > > > > > > > > > >>>>>>>> I see that I hit send too quickly. One more 
> > > > > > > > > > > >>>>>>>> comment:
> > > > > > > > > > > >>>>>>>>
> > > > > > > > > > > >>>>>>>> AS2: I think stating that there will be a
> > > > > > > > > > > `kafka-streams-groups.sh` in
> > > > > > > > > > > >>>> a
> > > > > > > > > > > >>>>>>>> future KIP is fine to keep this KIP focused.
> > > > > > Personally, I
> > > > > > > > would
> > > > > > > > > > > >>>> probably
> > > > > > > > > > > >>>>>>>> put all of the gory details in this KIP, but 
> > > > > > > > > > > >>>>>>>> then it’s
> > > > > > not
> > > > > > > > my
> > > > > > > > > > KIP. A
> > > > > > > > > > > >>>>>>> future
> > > > > > > > > > > >>>>>>>> pointer is fine too.
> > > > > > > > > > > >>>>>>>>
> > > > > > > > > > > >>>>>>>> Thanks,
> > > > > > > > > > > >>>>>>>> Andrew
> > > > > > > > > > > >>>>>>>>
> > > > > > > > > > > >>>>>>>>
> > > > > > > > > > > >>>>>>>>> On 19 Jul 2024, at 13:46, Lucas Brutschy <
> > > > > > > > > > lbruts...@confluent.io
> > > > > > > > > > > >>>> .INVALID>
> > > > > > > > > > > >>>>>>> wrote:
> > > > > > > > > > > >>>>>>>>>
> > > > > > > > > > > >>>>>>>>> Hi Andrew,
> > > > > > > > > > > >>>>>>>>>
> > > > > > > > > > > >>>>>>>>> thanks for getting the discussion going! Here 
> > > > > > > > > > > >>>>>>>>> are my
> > > > > > > > responses.
> > > > > > > > > > > >>>>>>>>>
> > > > > > > > > > > >>>>>>>>> AS1: Good point, done.
> > > > > > > > > > > >>>>>>>>>
> > > > > > > > > > > >>>>>>>>> AS2: We were planning to add more 
> > > > > > > > > > > >>>>>>>>> administrative tools
> > > > > > to
> > > > > > > > the
> > > > > > > > > > > >>>>>>>>> interface in a follow-up KIP, to not make this 
> > > > > > > > > > > >>>>>>>>> KIP too
> > > > > > > > large.
> > > > > > > > > > If
> > > > > > > > > > > >>>>>>>>> people think that it would help to understand 
> > > > > > > > > > > >>>>>>>>> the
> > > > > > overall
> > > > > > > > > > picture if
> > > > > > > > > > > >>>>>>>>> we already add something like
> > > > > > `kafka-streams-groups.sh`, we
> > > > > > > > > > will do
> > > > > > > > > > > >>>>>>>>> that. I also agree that we should address how 
> > > > > > > > > > > >>>>>>>>> this
> > > > > > relates
> > > > > > > > to
> > > > > > > > > > > >>>>>>>>> KIP-1043, we'll add it.
> > > > > > > > > > > >>>>>>>>>
> > > > > > > > > > > >>>>>>>>> AS3: Good idea, that's more consistent with
> > > > > > `assigning` and
> > > > > > > > > > > >>>>>>> `reconciling` etc.
> > > > > > > > > > > >>>>>>>>>
> > > > > > > > > > > >>>>>>>>> AS4: Thanks, Fixed.
> > > > > > > > > > > >>>>>>>>>
> > > > > > > > > > > >>>>>>>>> AS5: Good catch. This was supposed to mean that 
> > > > > > > > > > > >>>>>>>>> we
> > > > > > require
> > > > > > > > > > CREATE on
> > > > > > > > > > > >>>>>>>>> cluster or CREATE on all topics, not both. 
> > > > > > > > > > > >>>>>>>>> Fixed.
> > > > > > > > > > > >>>>>>>>>
> > > > > > > > > > > >>>>>>>>> AS6: Thanks, Fixed.
> > > > > > > > > > > >>>>>>>>>
> > > > > > > > > > > >>>>>>>>> AS7. Thanks, Fixed.
> > > > > > > > > > > >>>>>>>>>
> > > > > > > > > > > > >>>>>>>>> AS8: I think this works a bit differently in this 
> > > > > > > > > > > >>>>>>>>> KIP
> > > > > > than in
> > > > > > > > > > consumer
> > > > > > > > > > > >>>>>>>>> groups. KIP-848 lets the members vote for a 
> > > > > > > > > > > >>>>>>>>> preferred
> > > > > > > > > > assignor, and
> > > > > > > > > > > >>>>>>>>> the broker-side assignor is picked by majority 
> > > > > > > > > > > >>>>>>>>> vote.
> > > > > > The
> > > > > > > > > > > >>>>>>>>> `group.consumer.assignors` specifies the list of
> > > > > > assignors
> > > > > > > > > > that are
> > > > > > > > > > > >>>>>>>>> supported on the broker, and is configurable 
> > > > > > > > > > > >>>>>>>>> because
> > > > > > the
> > > > > > > > > > interface is
> > > > > > > > > > > >>>>>>>>> pluggable. In this KIP, the task assignor is 
> > > > > > > > > > > >>>>>>>>> not voted
> > > > > > on
> > > > > > > > by
> > > > > > > > > > members
> > > > > > > > > > > >>>>>>>>> but configured on the broker-side.
> > > > > > > > `group.streams.assignor` is
> > > > > > > > > > used
> > > > > > > > > > > > >>>>>>>>> for this, and uses a specific name. If we 
> > > > > > > > > > > >>>>>>>>> make the
> > > > > > task
> > > > > > > > > > assignor
> > > > > > > > > > > >>>>>>>>> pluggable on the broker-side, we'd introduce a 
> > > > > > > > > > > >>>>>>>>> separate
> > > > > > > > config
> > > > > > > > > > > >>>>>>>>> `group.streams.assignors`, which would indeed 
> > > > > > > > > > > >>>>>>>>> be a
> > > > > > list of
> > > > > > > > > > class
> > > > > > > > > > > >>>>>>>>> names. I think there is no conflict here, the 
> > > > > > > > > > > >>>>>>>>> two
> > > > > > > > > > configurations
> > > > > > > > > > > >>>> serve
> > > > > > > > > > > >>>>>>>>> different purposes.  The only gripe I'd have 
> > > > > > > > > > > >>>>>>>>> here is
> > > > > > > > naming as
> > > > > > > > > > > >>>>>>>>> `group.streams.assignor` and 
> > > > > > > > > > > >>>>>>>>> `group.streams.assignors`
> > > > > > > > would
> > > > > > > > > > be a bit
> > > > > > > > > > > >>>>>>>>> similar, but I cannot really think of a better 
> > > > > > > > > > > >>>>>>>>> name for
> > > > > > > > > > > >>>>>>>>> `group.streams.assignor`, so I'd probably rather
> > > > > > introduce
> > > > > > > > > > > >>>>>>>>> `group.streams.assignors`  as
> > > > > > > > > > `group.streams.possible_assignors`  or
> > > > > > > > > > > >>>>>>>>> something like that.
> > > > > > > > > > > >>>>>>>>>
> > > > > > > > > > > >>>>>>>>> AS9: I added explanations for the various 
> > > > > > > > > > > >>>>>>>>> record types.
> > > > > > > > Apart
> > > > > > > > > > from
> > > > > > > > > > > >>>> the
> > > > > > > > > > > >>>>>>>>> new topology record, and the partition metadata 
> > > > > > > > > > > >>>>>>>>> (which
> > > > > > is
> > > > > > > > > > based on
> > > > > > > > > > > >>>> the
> > > > > > > > > > > >>>>>>>>> topology and can only be created once we have a
> > > > > > topology
> > > > > > > > > > initialized)
> > > > > > > > > > > >>>>>>>>> the lifecycle for the records is basically 
> > > > > > > > > > > >>>>>>>>> identical
> > > > > > > to that in
> > > > > > > > > > KIP-848.
> > > > > > > > > > > >>>>>>>>>
> > > > > > > > > > > >>>>>>>>> AS10: In the consumer offset topic, the version 
> > > > > > > > > > > >>>>>>>>> in the
> > > > > > key
> > > > > > > > is
> > > > > > > > > > used to
> > > > > > > > > > > >>>>>>>>> differentiate different schema types with the 
> > > > > > > > > > > >>>>>>>>> same
> > > > > > > > content. So
> > > > > > > > > > the
> > > > > > > > > > > >>>>>>>>> keys are not versioned, but the version field is
> > > > > > "abused"
> > > > > > > > as a
> > > > > > > > > > type
> > > > > > > > > > > >>>>>>>>> tag. This is the same in KIP-848, we followed 
> > > > > > > > > > > >>>>>>>>> it for
> > > > > > > > > > consistency.
> > > > > > > > > > > >>>>>>>>>
> > > > > > > > > > > >>>>>>>>> Cheers,
> > > > > > > > > > > >>>>>>>>> Lucas
> > > > > > > > > > > >>>>>>>>>
> > > > > > > > > > > >>>>>>>>>
> > > > > > > > > > > >>>>>>>>> On Thu, Jul 18, 2024 at 1:27 PM Andrew Schofield
> > > > > > > > > > > >>>>>>>>> <andrew_schofi...@live.com> wrote:
> > > > > > > > > > > >>>>>>>>>>
> > > > > > > > > > > >>>>>>>>>> Hi Lucas and Bruno,
> > > > > > > > > > > >>>>>>>>>>
> > > > > > > > > > > >>>>>>>>>> Thanks for the great KIP.
> > > > > > > > > > > >>>>>>>>>>
> > > > > > > > > > > >>>>>>>>>> I've read through the document and have some 
> > > > > > > > > > > >>>>>>>>>> initial
> > > > > > > > comments.
> > > > > > > > > > > >>>>>>>>>>
> > > > > > > > > > > >>>>>>>>>> AS1: I suppose that there is a new
> > > > > > > > > > o.a.k.common.GroupType.STREAMS
> > > > > > > > > > > >>>>>>> enumeration
> > > > > > > > > > > >>>>>>>>>> constant. This is a change to the public 
> > > > > > > > > > > >>>>>>>>>> interface and
> > > > > > > > should
> > > > > > > > > > be
> > > > > > > > > > > >>>>>>> called out.
> > > > > > > > > > > >>>>>>>>>>
> > > > > > > > > > > >>>>>>>>>> AS2: Since streams groups are no longer 
> > > > > > > > > > > >>>>>>>>>> consumer
> > > > > > groups,
> > > > > > > > how
> > > > > > > > > > does
> > > > > > > > > > > >>>> the
> > > > > > > > > > > >>>>>>> user
> > > > > > > > > > > >>>>>>>>>> manipulate them, observe lag and so on? Will 
> > > > > > > > > > > >>>>>>>>>> you add
> > > > > > > > > > > >>>>>>> `kafka-streams-groups.sh`
> > > > > > > > > > > >>>>>>>>>> or extend 
> > > > > > > > > > > >>>>>>>>>> `kafka-streams-application-reset.sh`? Of
> > > > > > course,
> > > > > > > > > > KIP-1043
> > > > > > > > > > > >>>>>>> can easily
> > > > > > > > > > > >>>>>>>>>> be extended to support streams groups, but 
> > > > > > > > > > > >>>>>>>>>> that only
> > > > > > lets
> > > > > > > > the
> > > > > > > > > > user
> > > > > > > > > > > >>>>>>> see the
> > > > > > > > > > > >>>>>>>>>> groups, not manipulate them.
> > > > > > > > > > > >>>>>>>>>>
> > > > > > > > > > > >>>>>>>>>> AS3: I wonder whether the streams group state 
> > > > > > > > > > > >>>>>>>>>> of
> > > > > > > > > > UNINITIALIZED would
> > > > > > > > > > > >>>>>>> be
> > > > > > > > > > > >>>>>>>>>> better expressed as INITIALIZING.
> > > > > > > > > > > >>>>>>>>>>
> > > > > > > > > > > >>>>>>>>>> AS4: In StreamsGroupInitializeRequest,
> > > > > > > > > > Topology[].SourceTopicRegex
> > > > > > > > > > > >>>>>>> should
> > > > > > > > > > > >>>>>>>>>> be nullable.
> > > > > > > > > > > >>>>>>>>>>
> > > > > > > > > > > >>>>>>>>>> AS5: Why does StreamsGroupInitialize require 
> > > > > > > > > > > >>>>>>>>>> CREATE
> > > > > > > > > > permission on
> > > > > > > > > > > >>>> the
> > > > > > > > > > > >>>>>>>>>> cluster resource? I imagine that this is one 
> > > > > > > > > > > >>>>>>>>>> of the
> > > > > > ways
> > > > > > > > that
> > > > > > > > > > the
> > > > > > > > > > > >>>>>>> request might
> > > > > > > > > > > >>>>>>>>>> be granted permission to create the
> > > > > > StateChangelogTopics
> > > > > > > > and
> > > > > > > > > > > >>>>>>>>>> RepartitionSourceTopics, but if it is granted
> > > > > > permission
> > > > > > > > to
> > > > > > > > > > create
> > > > > > > > > > > >>>>>>> those topics
> > > > > > > > > > > >>>>>>>>>> with specific ACLs, would CREATE on the cluster
> > > > > > resource
> > > > > > > > > > still be
> > > > > > > > > > > >>>>>>> required?
> > > > > > > > > > > >>>>>>>>>>
> > > > > > > > > > > >>>>>>>>>> AS6: StreamsGroupInitialize can also fail with
> > > > > > > > > > > >>>>>>> TOPIC_AUTHORIZATION_FAILED
> > > > > > > > > > > >>>>>>>>>> and (subject to AS5) 
> > > > > > > > > > > >>>>>>>>>> CLUSTER_AUTHORIZATION_FAILED.
> > > > > > > > > > > >>>>>>>>>>
> > > > > > > > > > > >>>>>>>>>> AS7: A tiny nit. You've used TopologyID 
> > > > > > > > > > > >>>>>>>>>> (capitals) in
> > > > > > > > > > > >>>>>>> StreamsGroupHeartbeatRequest
> > > > > > > > > > > >>>>>>>>>> and a few others, but in all other cases the 
> > > > > > > > > > > >>>>>>>>>> fields
> > > > > > which
> > > > > > > > are
> > > > > > > > > > ids
> > > > > > > > > > > >>>> are
> > > > > > > > > > > >>>>>>> spelled Id.
> > > > > > > > > > > >>>>>>>>>> I suggest TopologyId.
> > > > > > > > > > > >>>>>>>>>>
> > > > > > > > > > > >>>>>>>>>> Also, "interal" is probably meant to be 
> > > > > > > > > > > > >>>>>>>>>> "interval".
> > > > > > > > > > > >>>>>>>>>>
> > > > > > > > > > > >>>>>>>>>> AS8: For consumer groups, the
> > > > > > `group.consumer.assignors`
> > > > > > > > > > > >>>>>>> configuration is a
> > > > > > > > > > > >>>>>>>>>> list of class names. The assignors do have 
> > > > > > > > > > > >>>>>>>>>> names too,
> > > > > > but
> > > > > > > > the
> > > > > > > > > > > >>>>>>> configuration which
> > > > > > > > > > > >>>>>>>>>> enables them is in terms of class names. I 
> > > > > > > > > > > >>>>>>>>>> wonder
> > > > > > whether
> > > > > > > > the
> > > > > > > > > > > >>>> broker’s
> > > > > > > > > > > >>>>>>>>>> group.streams.assignor could actually be
> > > > > > > > > > `group.streams.assignors`
> > > > > > > > > > > >>>>>>> and specified
> > > > > > > > > > > >>>>>>>>>> as a list of the class names of the supplied
> > > > > > assignors. I
> > > > > > > > know
> > > > > > > > > > > >>>> you're
> > > > > > > > > > > >>>>>>> not supporting
> > > > > > > > > > > >>>>>>>>>> other assignors yet, but when you do, I expect 
> > > > > > > > > > > >>>>>>>>>> you
> > > > > > would
> > > > > > > > > > prefer to
> > > > > > > > > > > >>>>>>> have used class
> > > > > > > > > > > >>>>>>>>>> names from the start.
> > > > > > > > > > > >>>>>>>>>>
> > > > > > > > > > > >>>>>>>>>> The use of assignor names in the other places 
> > > > > > > > > > > >>>>>>>>>> looks
> > > > > > good
> > > > > > > > to
> > > > > > > > > > me.
> > > > > > > > > > > >>>>>>>>>>
> > > > > > > > > > > >>>>>>>>>> AS9: I'd find it really helpful to have a bit 
> > > > > > > > > > > >>>>>>>>>> of a
> > > > > > > > > > description about
> > > > > > > > > > > >>>>>>> the purpose and
> > > > > > > > > > > >>>>>>>>>> lifecycle of the 9 record types you've 
> > > > > > > > > > > >>>>>>>>>> introduced on
> > > > > > the
> > > > > > > > > > > >>>>>>> __consumer_offsets topic.
> > > > > > > > > > > >>>>>>>>>> I did a cursory review but without really
> > > > > > understanding
> > > > > > > > what's
> > > > > > > > > > > >>>>>>> written when,
> > > > > > > > > > > >>>>>>>>>> I can't do a thorough job of reviewing.
> > > > > > > > > > > >>>>>>>>>>
> > > > > > > > > > > >>>>>>>>>> AS10: In the definitions of the record keys, 
> > > > > > > > > > > >>>>>>>>>> such as
> > > > > > > > > > > >>>>>>>>>> StreamsGroupCurrentMemberAssignmentKey, the 
> > > > > > > > > > > >>>>>>>>>> versions
> > > > > > of
> > > > > > > > the
> > > > > > > > > > fields
> > > > > > > > > > > >>>>>>> must
> > > > > > > > > > > >>>>>>>>>> match the versions of the types.
> > > > > > > > > > > >>>>>>>>>>
> > > > > > > > > > > >>>>>>>>>> Thanks,
> > > > > > > > > > > >>>>>>>>>> Andrew
> > > > > > > > > > > >>>>>>>>>>
> > > > > > > > > > > >>>>>>>>>>> On 12 Jul 2024, at 09:04, Lucas Brutschy <
> > > > > > > > > > lbruts...@confluent.io
> > > > > > > > > > > >>>> .INVALID>
> > > > > > > > > > > >>>>>>> wrote:
> > > > > > > > > > > >>>>>>>>>>>
> > > > > > > > > > > >>>>>>>>>>> Hi all,
> > > > > > > > > > > >>>>>>>>>>>
> > > > > > > > > > > >>>>>>>>>>> I would like to start a discussion thread on
> > > > > > KIP-1071:
> > > > > > > > > > Streams
> > > > > > > > > > > >>>>>>>>>>> Rebalance Protocol. With this KIP, we aim to 
> > > > > > > > > > > >>>>>>>>>>> bring
> > > > > > the
> > > > > > > > > > principles
> > > > > > > > > > > >>>>>>> laid
> > > > > > > > > > > >>>>>>>>>>> down by KIP-848 to Kafka Streams, to make 
> > > > > > > > > > > >>>>>>>>>>> rebalances
> > > > > > more
> > > > > > > > > > reliable
> > > > > > > > > > > >>>>>>> and
> > > > > > > > > > > >>>>>>>>>>> scalable, and make Kafka Streams overall 
> > > > > > > > > > > >>>>>>>>>>> easier to
> > > > > > > > deploy and
> > > > > > > > > > > >>>>>>> operate.
> > > > > > > > > > > > >>>>>>>>>>> The KIP proposes moving the assignment logic 
> > > > > > > > > > > >>>>>>>>>>> to the
> > > > > > > > broker,
> > > > > > > > > > and
> > > > > > > > > > > >>>>>>>>>>> introducing a dedicated group type and 
> > > > > > > > > > > >>>>>>>>>>> dedicated
> > > > > > RPCs for
> > > > > > > > > > Kafka
> > > > > > > > > > > >>>>>>>>>>> Streams.
> > > > > > > > > > > >>>>>>>>>>>
> > > > > > > > > > > >>>>>>>>>>> The KIP is here:
> > > > > > > > > > > >>>>>>>>>>>
> > > > > > > > > > > >>>>>>>
> > > > > > > > > > > >>>>
> > > > > > > > > >
> > > > > > > >
> > > > > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-1071%3A+Streams+Rebalance+Protocol
> > > > > > > > > > > >>>>>>>>>>>
> > > > > > > > > > > >>>>>>>>>>> This is joint work with Bruno Cadonna.
> > > > > > > > > > > >>>>>>>>>>>
> > > > > > > > > > > >>>>>>>>>>> Please take a look and let us know what you 
> > > > > > > > > > > >>>>>>>>>>> think.
> > > > > > > > > > > >>>>>>>>>>>
> > > > > > > > > > > >>>>>>>>>>> Best,
> > > > > > > > > > > >>>>>>>>>>> Lucas
> > > > > > > > > > >
> > > > > > > > > > >
> > > > > > > > > >
> > > > > > > >
> > > > > >
> > > > > >
> > > >
> >
