Hi Guozhang,

thanks for clarifying. I think I now understand better what you meant. However, my question remains: wouldn't that effectively turn a "rolling bounce" into an offline upgrade, if the application halts processing during the roll? I agree that it could be simpler, but it would also mean we cannot remain available even during minor modifications to the topology - something that is possible today.
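To make sure we are talking about the same thing, here is a minimal sketch of how I read your proposal c) - purely illustrative Java, none of these class or field names exist in the KIP or in Kafka: if the members of a generation do not all report the group's topology ID, the coordinator computes no new assignment and only returns a warning-level status.

// Illustrative only - none of these types exist in the KIP or in Kafka; this just
// sketches the coordinator behavior described in proposal c).
import java.util.List;
import java.util.Map;

public final class TopologyConsistencySketch {

    record Member(String memberId, String topologyId) {}
    record GroupState(String groupTopologyId, Map<String, List<Integer>> previousAssignment) {}

    enum Status { OK, TOPOLOGY_MISMATCH }   // TOPOLOGY_MISMATCH would be WARN/INFO, not an error

    record Result(Status status, Map<String, List<Integer>> assignment) {}

    static Result onNewGeneration(GroupState group, List<Member> members) {
        boolean allAgree = members.stream()
                .allMatch(m -> m.topologyId().equals(group.groupTopologyId()));
        if (!allAgree) {
            // Proposal c): do not compute any new assignment; every member keeps what it had
            // and is told that some peers report a different topology.
            return new Result(Status.TOPOLOGY_MISMATCH, group.previousAssignment());
        }
        return new Result(Status.OK, computeAssignment(members));
    }

    private static Map<String, List<Integer>> computeAssignment(List<Member> members) {
        return Map.of(); // placeholder for the real sticky/balanced task assignor
    }

    public static void main(String[] args) {
        GroupState group = new GroupState("topology-v1", Map.of("member-1", List.of(0, 1)));
        List<Member> members = List.of(
                new Member("member-1", "topology-v1"),    // not yet bounced
                new Member("member-2", "topology-v2"));   // bounced, carries the new topology
        Result result = onNewGeneration(group, members);
        System.out.println(result.status() + " -> " + result.assignment());
    }
}

If I read that correctly, during a rolling bounce every generation would take the mismatch branch until the last instance is upgraded - which is where my availability concern comes from.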
w.r.t. a) - we'd still need to have an initialization step at some point to tell the broker about the required internal topics etc. Cheers, Lucas On Mon, Aug 19, 2024 at 5:19 PM Guozhang Wang <guozhang.wang...@gmail.com> wrote: > > Hi Lucas, > > From the current description in section "Topology updates", my > understanding is that a) coordinator will remember a topology ID as > the group topology ID, which has to be initialized and agreed by > everyone in the current generation; b) when forming a new generation, > if some members has a topology ID which is different from the group > topology ID that previous generation's members all agree, we will try > to act smart by not assigning any new tasks to these members, will > still give them old tasks (if any) that they own in previous > generation, c) we allow clients to initialize a new topology Id. > > I'm feeling simply that the above complex logic may not be worth it > (plus, what if some tasks no longer exist under the new topology ID > etc, in all we need to consider quite some different corner cases). > What if we just : > > a) do not have the "initialize topology" logic at all, and > b) do not try to try to do assignment, including trying to give the > ones with inconsistent IDs their previous tasks, etc; but simply > c) in any generations, if not every member agrees on the same topology > ID, simply do not perform new assignment, and return an warning code > telling every client there are other peer's whose topology are > different (of course it could be because of a rolling bounce, so no > need to shout out as an ERROR but just WARN or even INFO), every > client will just act as if there's no new assignment received. This is > what I meant by "blocking the progress" since as we did not perform a > new assignment, the new topology ID would not be accepted and hence in > an rolling bounce upgrade case the new application's topology would > not be executed. And if it keeps happening, operators would use > DescribeStreamsGroup to ping down who are the culprits. > > > > On Mon, Aug 19, 2024 at 7:06 AM Lucas Brutschy > <lbruts...@confluent.io.invalid> wrote: > > > > Hi Guozhang, > > > > Thanks for reviewing the KIP, your feedback is extremely valuable. > > > > I think your analysis is quite right - we care about cases a) and b) > > and I generally agree - we want the protocol to be simple and > > debuggable. Situation a) should be relatively rare since in the common > > case all streams applications run from the same jar/build, and we > > shouldn't have zombies that don't update to a new topology. In this > > case, it should just be easy to debug. In situation b), things should > > "just work". And I think both are enabled by the KIP. In particular, > > these situations should be relatively easy to debug: > > > > - Using DescribeStreamsGroup, you can find out the topology ID of the > > group and the topology ID of each member, to understand > > inconsistencies. > > - Inconsistent clients and even the broker could log messages to > > indicate the inconsistencies. > > - One could also consider exposing the number of clients by topology > > IDs as a metric, to enhance observability (this is not in the KIP > > yet). > > > > What I'm not sure about is, what you mean precisely by temporarily > > blocking progress of the group? Do you propose to stop processing > > altogether if topology IDs don't match - wouldn't that defy the aim of > > doing a rolling bounce of the application (in case b)? 
> > > > Cheers, > > Lucas > > > > On Mon, Aug 19, 2024 at 3:59 PM Lucas Brutschy <lbruts...@confluent.io> > > wrote: > > > > > > Hi Nick, > > > > > > NT4: As discussed, we will still require locking in the new protocol > > > to avoid concurrent read/write access on the checkpoint file, at least > > > as long as KIP-1035 hasn't landed. However, as you correctly pointed > > > out, the assignor will have to accept offsets for overlapping sets of > > > dormant tasks. I updated the KIP to make this explicit. If the > > > corresponding offset information for one task conflicts between > > > clients (which can happen), the conflict is resolved by taking the > > > maximum of the offsets. > > > > > > Cheers, > > > Lucas > > > > > > On Fri, Aug 16, 2024 at 7:14 PM Guozhang Wang > > > <guozhang.wang...@gmail.com> wrote: > > > > > > > > Hello Lucas, > > > > > > > > Thanks for the great KIP. I've read it through and it looks good to > > > > me. As we've discussed, much of my thoughts would be outside the scope > > > > of this very well scoped and defined KIP, so I will omit them for now. > > > > > > > > The only one I had related to this KIP is about topology updating. I > > > > understand the motivation of the proposal is that basically since each > > > > time group forming a (new) generation may potentially accept not all > > > > of the members joining because of the timing of the RPCs, the group's > > > > topology ID may be not reflecting the "actual" most recent topologies > > > > if some zombie members holding an old topology form a group generation > > > > quickly enough, which would effectively mean that zombie members > > > > actually blocking other real members from getting tasks assigned. On > > > > the other hand, like you've mentioned already in the doc, requesting > > > > some sort of ID ordering by pushing the burden on the user's side > > > > would also be too much for users, increasing the risk of human errors > > > > in operations. > > > > > > > > I'm wondering if instead of trying to be smart programmingly, we just > > > > let the protocol to act dumbly (details below). The main reasons I had > > > > in mind are: > > > > > > > > 1) Upon topology changes, some tasks may no longer exist in the new > > > > topology, so still letting them execute on the clients which do not > > > > yet have the new topology would waste resources. > > > > > > > > 2) As we discussed, trying to act smart introduces more complexities > > > > in the coordinator that tries to balance different assignment goals > > > > between stickiness, balance, and now topology mis-matches between > > > > clients. > > > > > > > > 3) Scenarios that mismatching topologies be observed within a group > > > > generation: > > > > a. Zombie / old clients that do not have the new topology, and will > > > > never have. > > > > b. During a rolling bounce upgrade, where not-yet-bounced clients > > > > would not yet have the new topology. > > > > c. Let's assume we would not ever have scenarios where users want > > > > to intentionally have a subset of clients within a group running a > > > > partial / subset of the full sub-topologies, since such cases can well > > > > be covered by a custom assignor that takes into those considerations > > > > by never assigning some tasks to some clients etc. That means, the > > > > only scenarios we would need to consider are a) and b). 
> > > > > > > > For b), I think it's actually okay to temporarily block the progress > > > > of the group until everyone is bounced with the updated topology; as > > > > for a), originally I thought having one or a few clients blocking the > > > > whole group would be a big problem, but now that I think more, I felt > > > > from the operations point of view, just letting the app being blocked > > > > with a informational log entry to quickly ping-down the zombie clients > > > > may actually be acceptable. All in all, it makes the code simpler > > > > programmingly by not trying to abstract away issue scenario a) from > > > > the users (or operators) but letting them know asap. > > > > > > > > ---------- > > > > > > > > Other than that, everything else looks good to me. > > > > > > > > > > > > Guozhang > > > > > > > > > > > > On Fri, Aug 16, 2024 at 7:38 AM Nick Telford <nick.telf...@gmail.com> > > > > wrote: > > > > > > > > > > Hi Lucas, > > > > > > > > > > NT4. > > > > > Given that the new assignment procedure guarantees that a Task has > > > > > been > > > > > closed before it is assigned to a different client, I don't think > > > > > there > > > > > should be a problem with concurrent access? I don't think we should > > > > > worry > > > > > too much about 1035 here, as it's orthogonal to 1071. I don't think > > > > > that > > > > > 1035 *requires* the locking, and indeed once 1071 is the only > > > > > assignment > > > > > mechanism, we should be able to do away with the locking completely (I > > > > > think). > > > > > > > > > > Anyway, given your point about it not being possible to guarantee > > > > > disjoint > > > > > sets, does it make sense to require clients to continue to supply the > > > > > lags > > > > > for only a subset of the dormant Tasks on-disk? Wouldn't it be > > > > > simpler to > > > > > just have them supply everything, since the assignor has to handle > > > > > overlapping sets anyway? > > > > > > > > > > Cheers, > > > > > Nick > > > > > > > > > > On Fri, 16 Aug 2024 at 13:51, Lucas Brutschy > > > > > <lbruts...@confluent.io.invalid> > > > > > wrote: > > > > > > > > > > > Hi Nick, > > > > > > > > > > > > NT4. I think it will be hard anyway to ensure that the assignor > > > > > > always > > > > > > gets disjoint sets (there is no synchronized rebalance point > > > > > > anymore, > > > > > > so locks wouldn't prevent two clients reporting the same dormant > > > > > > task). So I think we'll have to lift this restriction. I was > > > > > > thinking > > > > > > more that locking is required to prevent concurrent access. In > > > > > > particular, I was expecting that the lock will avoid two threads > > > > > > opening the same RocksDB in KIP-1035. Wouldn't this cause problems? > > > > > > > > > > > > Cheers, > > > > > > Lucas > > > > > > > > > > > > On Fri, Aug 16, 2024 at 11:34 AM Nick Telford > > > > > > <nick.telf...@gmail.com> > > > > > > wrote: > > > > > > > > > > > > > > Hi Lucas, > > > > > > > > > > > > > > NT4. > > > > > > > The reason I mentioned this was because, while implementing 1035, > > > > > > > I > > > > > > > stumbled across a problem: initially I had changed it so that > > > > > > > threads > > > > > > > always reported the lag for *all* dormant Tasks on-disk, even if > > > > > > > it meant > > > > > > > multiple threads reporting lag for the same Tasks. 
I found that > > > > > > > this > > > > > > didn't > > > > > > > work, apparently because the assignor assumes that multiple > > > > > > > threads on > > > > > > the > > > > > > > same instance always report disjoint sets. > > > > > > > > > > > > > > From reading through 1071, it sounded like this assumption is no > > > > > > > longer > > > > > > > being made by the assignor, and that the processId field would > > > > > > > allow the > > > > > > > assignor to understand when multiple clients reporting lag for > > > > > > > the same > > > > > > > Tasks are on the same instance. This would enable us to do away > > > > > > > with the > > > > > > > locking when reporting lag, and just have threads report the lag > > > > > > > for > > > > > > every > > > > > > > Task on-disk, even if other threads are reporting lag for the > > > > > > > same Tasks. > > > > > > > > > > > > > > But it sounds like this is not correct, and that the new assignor > > > > > > > will > > > > > > make > > > > > > > the same assumptions as the old one? > > > > > > > > > > > > > > Regards, > > > > > > > Nick > > > > > > > > > > > > > > On Fri, 16 Aug 2024 at 10:17, Lucas Brutschy > > > > > > > <lbruts...@confluent.io > > > > > > .invalid> > > > > > > > wrote: > > > > > > > > > > > > > > > Hi Nick! > > > > > > > > > > > > > > > > Thanks for getting involved in the discussion. > > > > > > > > > > > > > > > > NT1. We are always referring to offsets in the changelog topics > > > > > > > > here. > > > > > > > > I tried to make it more consistent. But in the schemas and API, > > > > > > > > I find > > > > > > > > "task changelog end offset" a bit lengthy, so we use "task > > > > > > > > offset" and > > > > > > > > "task end offset" for short. We could change it, if people > > > > > > > > think this > > > > > > > > is confusing. > > > > > > > > > > > > > > > > NT2. You are right. The confusing part is that the current > > > > > > > > streams > > > > > > > > config is called `max.warmup.replicas`, but in the new > > > > > > > > protocol, we > > > > > > > > are bounding the group-level parameter using > > > > > > > > `group.streams.max.warmup.replicas`. If we wanted to keep > > > > > > > > `group.streams.max.warmup.replicas` for the config name on the > > > > > > > > group-level, we'd have to bound it using > > > > > > > > `group.streams.max.max.warmup.replicas`. I prefer not doing > > > > > > > > this, but > > > > > > > > open to suggestions. > > > > > > > > > > > > > > > > NT3. You are right, we do not need to make it this restrictive. > > > > > > > > I > > > > > > > > think the main problem with having 10,000 warm-up replicas > > > > > > > > would be > > > > > > > > that it slows down the assignment inside the broker - once we > > > > > > > > are > > > > > > > > closer to production-ready implementation, we may have better > > > > > > > > numbers > > > > > > > > of this and may revisit these defaults. I'll set the max to 100 > > > > > > > > for > > > > > > > > now, but it would be good to hear what values people typically > > > > > > > > use in > > > > > > > > their production workloads. > > > > > > > > > > > > > > > > NT4. We will actually only report the offsets if we manage to > > > > > > > > acquire > > > > > > > > the lock. I tried to make this more precise. I suppose also with > > > > > > > > KIP-1035, we'd require the lock to read the offset? 
> > > > > > > > > > > > > > > > Cheers, > > > > > > > > Lucas > > > > > > > > > > > > > > > > On Thu, Aug 15, 2024 at 8:40 PM Nick Telford > > > > > > > > <nick.telf...@gmail.com> > > > > > > > > wrote: > > > > > > > > > > > > > > > > > > Hi everyone, > > > > > > > > > > > > > > > > > > Looks really promising, and I can see this resolving several > > > > > > > > > issues > > > > > > I've > > > > > > > > > noticed. I particularly like the choice to use a String for > > > > > > Subtopology > > > > > > > > ID, > > > > > > > > > as it will (eventually) lead to a better solution to KIP-816. > > > > > > > > > > > > > > > > > > I noticed a few typos in the KIP that I thought I'd mention: > > > > > > > > > > > > > > > > > > NT1. > > > > > > > > > In several places you refer to "task changelog end offsets", > > > > > > > > > while in > > > > > > > > > others, you call it "task end offsets". Which is it? > > > > > > > > > > > > > > > > > > NT2. > > > > > > > > > Under "Group Configurations", you included > > > > > > > > > "group.streams.max.warmup.replicas", but I think you meant > > > > > > > > > "group.streams.num.warmup.replicas"? > > > > > > > > > > > > > > > > > > NT3. > > > > > > > > > Not a typo, but a suggestion: it makes sense to set the > > > > > > > > > default for > > > > > > > > > "group.streams.num.warmup.replicas" to 2, for compatibility > > > > > > > > > with the > > > > > > > > > existing defaults, but why set the default for > > > > > > > > > "group.streams.max.warmup.replicas" to only 4? That seems > > > > > > > > > extremely > > > > > > > > > restrictive. These "max" configs are typically used to > > > > > > > > > prevent a > > > > > > subset > > > > > > > > of > > > > > > > > > users causing problems on the shared broker cluster - what's > > > > > > > > > the > > > > > > reason > > > > > > > > to > > > > > > > > > set such a restrictive value for max warmup replicas? If I > > > > > > > > > had 10,000 > > > > > > > > > warmup replicas, would it cause a noticeable problem on the > > > > > > > > > brokers? > > > > > > > > > > > > > > > > > > NT4. > > > > > > > > > It's implied that clients send the changelog offsets for *all* > > > > > > dormant > > > > > > > > > stateful Tasks, but the current behaviour is that clients > > > > > > > > > will only > > > > > > send > > > > > > > > > the changelog offsets for the stateful Tasks that they are > > > > > > > > > able to > > > > > > lock > > > > > > > > > on-disk. Since this is a change in behaviour, perhaps this > > > > > > > > > should be > > > > > > > > called > > > > > > > > > out explicitly? > > > > > > > > > > > > > > > > > > Regards, > > > > > > > > > Nick > > > > > > > > > > > > > > > > > > On Thu, 15 Aug 2024 at 10:55, Lucas Brutschy > > > > > > > > > <lbruts...@confluent.io > > > > > > > > .invalid> > > > > > > > > > wrote: > > > > > > > > > > > > > > > > > > > Hi Andrew, > > > > > > > > > > > > > > > > > > > > thanks for the comment. > > > > > > > > > > > > > > > > > > > > AS12: I clarified the command-line interface. It's supposed > > > > > > > > > > to be > > > > > > used > > > > > > > > > > with --reset-offsets and --delete-offsets. I removed > > > > > > > > > > --topic. > > > > > > > > > > > > > > > > > > > > AS13: Yes, it's --delete. I clarified the command-line > > > > > > > > > > interface. 
> > > > > > > > > > > > > > > > > > > > Cheers, > > > > > > > > > > Lucas > > > > > > > > > > > > > > > > > > > > On Tue, Aug 13, 2024 at 4:14 PM Andrew Schofield > > > > > > > > > > <andrew_schofi...@live.com> wrote: > > > > > > > > > > > > > > > > > > > > > > Hi Lucas, > > > > > > > > > > > Thanks for the KIP update. > > > > > > > > > > > > > > > > > > > > > > I think that `kafka-streams-groups.sh` looks like a good > > > > > > equivalent > > > > > > > > to > > > > > > > > > > > the tools for the other types of groups. > > > > > > > > > > > > > > > > > > > > > > AS12: In kafka-streams-groups.sh, the description for the > > > > > > > > > > > --input-topics option seems insufficient. Why is an input > > > > > > > > > > > topic > > > > > > > > specified > > > > > > > > > > > with this option different than a topic specified with > > > > > > > > > > > --topic? > > > > > > Why > > > > > > > > is > > > > > > > > > > > It --input-topics rather than --input-topic? Which action > > > > > > > > > > > of this > > > > > > > > tool > > > > > > > > > > > does this option apply to? > > > > > > > > > > > > > > > > > > > > > > AS13: Similarly, for --internal-topics, which action of > > > > > > > > > > > the tool > > > > > > > > does it > > > > > > > > > > > apply to? I suppose it’s --delete, but it’s not clear to > > > > > > > > > > > me. > > > > > > > > > > > > > > > > > > > > > > Thanks, > > > > > > > > > > > Andrew > > > > > > > > > > > > > > > > > > > > > > > On 11 Aug 2024, at 12:10, Lucas Brutschy < > > > > > > lbruts...@confluent.io > > > > > > > > .INVALID> > > > > > > > > > > wrote: > > > > > > > > > > > > > > > > > > > > > > > > Hi Andrew/Lianet, > > > > > > > > > > > > > > > > > > > > > > > > I have added an administrative command-line tool > > > > > > > > > > > > (replacing > > > > > > > > > > > > `kafka-streams-application-reset`) and extensions of > > > > > > > > > > > > the Admin > > > > > > API > > > > > > > > for > > > > > > > > > > > > listing, deleting, describing groups and listing, > > > > > > > > > > > > altering and > > > > > > > > > > > > deleting offsets for streams groups. No new RPCs have > > > > > > > > > > > > to be > > > > > > added, > > > > > > > > > > > > however, we duplicate some of the API in the admin > > > > > > > > > > > > client that > > > > > > > > exist > > > > > > > > > > > > for consumer groups. It seems to me cleaner to > > > > > > > > > > > > duplicate some > > > > > > > > > > > > code/interface here, instead of using "consumer group" > > > > > > > > > > > > APIs for > > > > > > > > > > > > streams groups, or renaming existing APIs that use > > > > > > "consumerGroup" > > > > > > > > in > > > > > > > > > > > > the name to something more generic (which wouldn't > > > > > > > > > > > > cover share > > > > > > > > > > > > groups). > > > > > > > > > > > > > > > > > > > > > > > > I think for now, all comments are addressed. > > > > > > > > > > > > > > > > > > > > > > > > Cheers, > > > > > > > > > > > > Lucas > > > > > > > > > > > > > > > > > > > > > > > > On Tue, Aug 6, 2024 at 3:19 PM Lucas Brutschy < > > > > > > > > lbruts...@confluent.io> > > > > > > > > > > wrote: > > > > > > > > > > > >> > > > > > > > > > > > >> Hi Lianet and Andrew, > > > > > > > > > > > >> > > > > > > > > > > > >> LM1/LM2: You are right. The idea is to omit fields > > > > > > > > > > > >> exactly in > > > > > > the > > > > > > > > same > > > > > > > > > > > >> situations as in KIP-848. 
In the KIP, I stuck with how > > > > > > > > > > > >> the > > > > > > > > behavior > > > > > > > > > > > >> was defined in KIP-848 (e.g. KIP-848 defined that that > > > > > > instance ID > > > > > > > > > > > >> will be omitted if it did not change since the last > > > > > > heartbeat). > > > > > > > > But > > > > > > > > > > > >> you are correct that the implementation handles these > > > > > > > > > > > >> details > > > > > > > > slightly > > > > > > > > > > > >> differently. I updated the KIP to match more closely > > > > > > > > > > > >> the > > > > > > behavior > > > > > > > > of > > > > > > > > > > > >> the KIP-848 implementation. > > > > > > > > > > > >> > > > > > > > > > > > >> LM9: Yes, there are several options to do this. The > > > > > > > > > > > >> idea is to > > > > > > > > have > > > > > > > > > > > >> only one client initialize the topology, not all > > > > > > > > > > > >> clients. It > > > > > > seems > > > > > > > > > > > >> easier to understand on the protocol level (otherwise > > > > > > > > > > > >> we'd > > > > > > have N > > > > > > > > > > > >> topology initializations racing with a > > > > > > > > > > > >> hard-to-determine > > > > > > winner). > > > > > > > > We > > > > > > > > > > > >> also expect the payload of the request to grow in the > > > > > > > > > > > >> future > > > > > > and > > > > > > > > want > > > > > > > > > > > >> to avoid the overhead of having all clients sending the > > > > > > topology > > > > > > > > at > > > > > > > > > > > >> the same time. But initializing the group could take > > > > > > > > > > > >> some > > > > > > time - > > > > > > > > we > > > > > > > > > > > >> have to create internal topics, and maybe a client is > > > > > > > > malfunctioning > > > > > > > > > > > >> and the initialization has to be retried. It seemed a > > > > > > > > > > > >> bit > > > > > > > > confusing to > > > > > > > > > > > >> return errors to all other clients that are trying to > > > > > > > > > > > >> join the > > > > > > > > group > > > > > > > > > > > >> during that time - as if there was a problem with > > > > > > > > > > > >> joining the > > > > > > > > group / > > > > > > > > > > > >> the contents of the heartbeat. It seems cleaner to me > > > > > > > > > > > >> to let > > > > > > all > > > > > > > > > > > >> clients successfully join the group and heartbeat, but > > > > > > > > > > > >> remain > > > > > > in > > > > > > > > an > > > > > > > > > > > >> INITIALIZING state which does not yet assign any > > > > > > > > > > > >> tasks. Does > > > > > > that > > > > > > > > make > > > > > > > > > > > >> sense to you? You are right that returning a retriable > > > > > > > > > > > >> error > > > > > > and > > > > > > > > > > > >> having all clients retry until the group is > > > > > > > > > > > >> initialized would > > > > > > also > > > > > > > > > > > >> work, it just doesn't model well that "everything is > > > > > > > > > > > >> going > > > > > > > > according > > > > > > > > > > > >> to plan". > > > > > > > > > > > >> As for the order of the calls - yes, I think it is > > > > > > > > > > > >> fine to > > > > > > allow > > > > > > > > an > > > > > > > > > > > >> Initialize RPC before the first heartbeat for > > > > > > > > > > > >> supporting > > > > > > future > > > > > > > > admin > > > > > > > > > > > >> tools. I made this change throughout the KIP, thanks! > > > > > > > > > > > >> > > > > > > > > > > > >> AS11: Yes, your understanding is correct. 
The number > > > > > > > > > > > >> of tasks > > > > > > for > > > > > > > > one > > > > > > > > > > > >> subtopology is the maximum number of partitions in any > > > > > > > > > > > >> of the > > > > > > > > matched > > > > > > > > > > > >> topics. What will happen in Kafka Streams is that the > > > > > > partitions > > > > > > > > of > > > > > > > > > > > >> the matched topics will effectively be merged during > > > > > > > > > > > >> stream > > > > > > > > > > > >> processing, so in your example, subtopology:0 would > > > > > > > > > > > >> consume > > > > > > from > > > > > > > > AB:0 > > > > > > > > > > > >> and AC:0. > > > > > > > > > > > >> > > > > > > > > > > > >> Cheers, > > > > > > > > > > > >> Lucas > > > > > > > > > > > >> > > > > > > > > > > > >> On Fri, Aug 2, 2024 at 9:47 PM Lianet M. > > > > > > > > > > > >> <liane...@gmail.com> > > > > > > > > wrote: > > > > > > > > > > > >>> > > > > > > > > > > > >>> Hi Bruno, answering your questions: > > > > > > > > > > > >>> > > > > > > > > > > > >>> About the full heartbeat (LM1): I just wanted to > > > > > > > > > > > >>> confirm that > > > > > > > > you'll > > > > > > > > > > be > > > > > > > > > > > >>> sending full HBs in case of errors in general. It's > > > > > > > > > > > >>> not clear > > > > > > > > from > > > > > > > > > > the KIP, > > > > > > > > > > > >>> since it referred to sending Id/epoch and whatever had > > > > > > changed > > > > > > > > since > > > > > > > > > > the > > > > > > > > > > > >>> last HB only. Sending full HB on error is key to > > > > > > > > > > > >>> ensure fresh > > > > > > > > > > rejoins after > > > > > > > > > > > >>> fencing for instance, and retries with all relevant > > > > > > > > > > > >>> info. > > > > > > > > > > > >>> > > > > > > > > > > > >>> About the instanceId (LM2): The instanceId is needed > > > > > > > > > > > >>> on > > > > > > every HB > > > > > > > > to > > > > > > > > > > be able > > > > > > > > > > > >>> to identify a member using one that is already taken. > > > > > > > > > > > >>> On > > > > > > every > > > > > > > > HB, > > > > > > > > > > the > > > > > > > > > > > >>> broker uses the instance id (if any) to retrieve the > > > > > > > > > > > >>> member > > > > > > ID > > > > > > > > > > associated > > > > > > > > > > > >>> with it, and checks it against the memberId received > > > > > > > > > > > >>> in the > > > > > > HB > > > > > > > > > > > >>> (throwing UnreleasedInstance exception if needed). So > > > > > > similar to > > > > > > > > my > > > > > > > > > > > >>> previous point, just wanted to confirm that we are > > > > > > considering > > > > > > > > that > > > > > > > > > > here > > > > > > > > > > > >>> too. > > > > > > > > > > > >>> > > > > > > > > > > > >>> Now some other thoughts: > > > > > > > > > > > >>> > > > > > > > > > > > >>> LM9: Definitely interesting imo if we can avoid the > > > > > > dependency > > > > > > > > > > between the > > > > > > > > > > > >>> StreamsGroupInitialize and the StreamsGroupHeartbeat. 
> > > > > > > > > > > >>> I > > > > > > totally > > > > > > > > get > > > > > > > > > > that > > > > > > > > > > > >>> the initial client implementation will do a HB first, > > > > > > > > > > > >>> and > > > > > > that's > > > > > > > > > > fine, but > > > > > > > > > > > >>> not having the flow enforced at the protocol level > > > > > > > > > > > >>> would > > > > > > allow > > > > > > > > for > > > > > > > > > > further > > > > > > > > > > > >>> improvement in the future (that initialize via admin > > > > > > > > > > > >>> idea you > > > > > > > > > > mentioned, > > > > > > > > > > > >>> for instance). Actually, I may be missing something > > > > > > > > > > > >>> about > > > > > > the HB, > > > > > > > > > > but if we > > > > > > > > > > > >>> are at the point where HB requires that the topology > > > > > > > > > > > >>> has been > > > > > > > > > > initialized, > > > > > > > > > > > >>> and the topology init requires the group, why is it > > > > > > > > > > > >>> the > > > > > > heartbeat > > > > > > > > > > RPC the > > > > > > > > > > > >>> one responsible for the group creation? (vs. > > > > > > > > StreamsGroupInitialize > > > > > > > > > > creates > > > > > > > > > > > >>> group if needed + HB just fails if topology not > > > > > > > > > > > >>> initialized) > > > > > > > > > > > >>> > > > > > > > > > > > >>> Thanks! > > > > > > > > > > > >>> > > > > > > > > > > > >>> Lianet > > > > > > > > > > > >>> (I didn't miss your answer on my INVALID_GROUP_TYPE > > > > > > > > > > > >>> proposal, > > > > > > > > just > > > > > > > > > > still > > > > > > > > > > > >>> thinking about it in sync with the same discussion > > > > > > > > > > > >>> we're > > > > > > having > > > > > > > > on > > > > > > > > > > the > > > > > > > > > > > >>> KIP-1043 thread...I'll come back on that) > > > > > > > > > > > >>> > > > > > > > > > > > >>> On Thu, Aug 1, 2024 at 10:55 AM Andrew Schofield < > > > > > > > > > > andrew_schofi...@live.com> > > > > > > > > > > > >>> wrote: > > > > > > > > > > > >>> > > > > > > > > > > > >>>> Hi Bruno, > > > > > > > > > > > >>>> Thanks for adding the detail on the schemas on > > > > > > > > > > > >>>> records > > > > > > written > > > > > > > > to > > > > > > > > > > > >>>> __consumer_offsets. > > > > > > > > > > > >>>> I’ve reviewed them in detail and they look good to > > > > > > > > > > > >>>> me. I > > > > > > have > > > > > > > > one > > > > > > > > > > naive > > > > > > > > > > > >>>> question. > > > > > > > > > > > >>>> > > > > > > > > > > > >>>> AS11: I notice that an assignment is essentially a > > > > > > > > > > > >>>> set of > > > > > > > > partition > > > > > > > > > > > >>>> indices for > > > > > > > > > > > >>>> subtopologies. Since a subtopology can be defined by > > > > > > > > > > > >>>> a > > > > > > source > > > > > > > > topic > > > > > > > > > > regex, > > > > > > > > > > > >>>> does > > > > > > > > > > > >>>> this mean that an assignment gives the same set of > > > > > > > > > > > >>>> partition > > > > > > > > > > indices for > > > > > > > > > > > >>>> all topics > > > > > > > > > > > >>>> which happen to match the regex? 
So, a subtopology > > > > > > > > > > > >>>> reading > > > > > > from > > > > > > > > A* > > > > > > > > > > that > > > > > > > > > > > >>>> matches > > > > > > > > > > > >>>> AB and AC would give the same set of partitions to > > > > > > > > > > > >>>> each > > > > > > task for > > > > > > > > > > both > > > > > > > > > > > >>>> topics, and > > > > > > > > > > > >>>> is not able to give AB:0 to one task and AC:0 to a > > > > > > > > > > > >>>> different > > > > > > > > task. > > > > > > > > > > Is this > > > > > > > > > > > >>>> correct? > > > > > > > > > > > >>>> > > > > > > > > > > > >>>> Thanks, > > > > > > > > > > > >>>> Andrew > > > > > > > > > > > >>>> > > > > > > > > > > > >>>>> On 23 Jul 2024, at 16:16, Bruno Cadonna < > > > > > > cado...@apache.org> > > > > > > > > > > wrote: > > > > > > > > > > > >>>>> > > > > > > > > > > > >>>>> Hi Lianet, > > > > > > > > > > > >>>>> > > > > > > > > > > > >>>>> Thanks for the review! > > > > > > > > > > > >>>>> > > > > > > > > > > > >>>>> Here my answers: > > > > > > > > > > > >>>>> > > > > > > > > > > > >>>>> LM1. Is your question whether we need to send a full > > > > > > heartbeat > > > > > > > > > > each time > > > > > > > > > > > >>>> the member re-joins the group even if the > > > > > > > > > > > >>>> information in > > > > > > the RPC > > > > > > > > > > did not > > > > > > > > > > > >>>> change since the last heartbeat? > > > > > > > > > > > >>>>> > > > > > > > > > > > >>>>> LM2. Is the reason for sending the instance ID each > > > > > > > > > > > >>>>> time > > > > > > that a > > > > > > > > > > member > > > > > > > > > > > >>>> could shutdown, change the instance ID and then > > > > > > > > > > > >>>> start and > > > > > > > > heartbeat > > > > > > > > > > again, > > > > > > > > > > > >>>> but the group coordinator would never notice that the > > > > > > instance > > > > > > > > ID > > > > > > > > > > changed? > > > > > > > > > > > >>>>> > > > > > > > > > > > >>>>> LM3. I see your point. I am wondering whether this > > > > > > additional > > > > > > > > > > > >>>> information is worth the dependency between the group > > > > > > types. To > > > > > > > > > > return > > > > > > > > > > > >>>> INVALID_GROUP_TYPE, the group coordinator needs to > > > > > > > > > > > >>>> know > > > > > > that a > > > > > > > > > > group ID > > > > > > > > > > > >>>> exists with a different group type. With a group > > > > > > coordinator as > > > > > > > > we > > > > > > > > > > have it > > > > > > > > > > > >>>> now in Apache Kafka that manages all group types, > > > > > > > > > > > >>>> that is > > > > > > not a > > > > > > > > big > > > > > > > > > > deal, > > > > > > > > > > > >>>> but imagine if we (or some implementation of the > > > > > > > > > > > >>>> Apache > > > > > > Kafka > > > > > > > > > > protocol) > > > > > > > > > > > >>>> decide to have a separate group coordinator for each > > > > > > > > > > > >>>> group > > > > > > type. > > > > > > > > > > > >>>>> > > > > > > > > > > > >>>>> LM4. Using INVALID_GROUP_ID if the group ID is > > > > > > > > > > > >>>>> empty makes > > > > > > > > sense > > > > > > > > > > to me. > > > > > > > > > > > >>>> I going to change that. > > > > > > > > > > > >>>>> > > > > > > > > > > > >>>>> LM5. I think there is a dependency from the > > > > > > > > StreamsGroupInitialize > > > > > > > > > > RPC > > > > > > > > > > > >>>> to the heartbeat. 
The group must exist when the > > > > > > > > > > > >>>> initialize > > > > > > RPC > > > > > > > > is > > > > > > > > > > received > > > > > > > > > > > >>>> by the group coordinator. The group is created by the > > > > > > heartbeat > > > > > > > > > > RPC. I > > > > > > > > > > > >>>> would be in favor of making the initialize RPC > > > > > > > > > > > >>>> independent > > > > > > from > > > > > > > > the > > > > > > > > > > > >>>> heartbeat RPC. That would allow to initialize a > > > > > > > > > > > >>>> streams > > > > > > group > > > > > > > > > > explicitly > > > > > > > > > > > >>>> with an admin tool. > > > > > > > > > > > >>>>> > > > > > > > > > > > >>>>> LM6. I think it affects streams and streams should > > > > > > > > > > > >>>>> behave > > > > > > as > > > > > > > > the > > > > > > > > > > > >>>> consumer group. > > > > > > > > > > > >>>>> > > > > > > > > > > > >>>>> LM7. Good point that we will consider. > > > > > > > > > > > >>>>> > > > > > > > > > > > >>>>> LM8. Fixed! Thanks! > > > > > > > > > > > >>>>> > > > > > > > > > > > >>>>> > > > > > > > > > > > >>>>> Best, > > > > > > > > > > > >>>>> Bruno > > > > > > > > > > > >>>>> > > > > > > > > > > > >>>>> > > > > > > > > > > > >>>>> > > > > > > > > > > > >>>>> > > > > > > > > > > > >>>>> On 7/19/24 9:53 PM, Lianet M. wrote: > > > > > > > > > > > >>>>>> Hi Lucas/Bruno, thanks for the great KIP! First > > > > > > > > > > > >>>>>> comments: > > > > > > > > > > > >>>>>> LM1. Related to where the KIP says: *“Group ID, > > > > > > > > > > > >>>>>> member > > > > > > ID, > > > > > > > > > > member epoch > > > > > > > > > > > >>>>>> are sent with each heartbeat request. Any other > > > > > > information > > > > > > > > that > > > > > > > > > > has not > > > > > > > > > > > >>>>>> changed since the last heartbeat can be omitted.”. > > > > > > > > > > > >>>>>> *I > > > > > > expect > > > > > > > > all > > > > > > > > > > the > > > > > > > > > > > >>>> other > > > > > > > > > > > >>>>>> info also needs to be sent whenever a full > > > > > > > > > > > >>>>>> heartbeat is > > > > > > > > required > > > > > > > > > > (even > > > > > > > > > > > >>>> if > > > > > > > > > > > >>>>>> it didn’t change from the last heartbeat), ex. on > > > > > > > > > > > >>>>>> fencing > > > > > > > > > > scenarios, > > > > > > > > > > > >>>>>> correct? > > > > > > > > > > > >>>>>> LM2. For consumer groups we always send the > > > > > > groupInstanceId > > > > > > > > (if > > > > > > > > > > any) as > > > > > > > > > > > >>>>>> part of every heartbeat, along with memberId, > > > > > > > > > > > >>>>>> epoch and > > > > > > > > groupId. > > > > > > > > > > Should > > > > > > > > > > > >>>> we > > > > > > > > > > > >>>>>> consider that too here? > > > > > > > > > > > >>>>>> LM3. We’re proposing returning a > > > > > > > > > > > >>>>>> GROUP_ID_NOT_FOUND error > > > > > > in > > > > > > > > > > response to > > > > > > > > > > > >>>>>> the stream-specific RPCs if the groupId is > > > > > > > > > > > >>>>>> associated > > > > > > with a > > > > > > > > > > group type > > > > > > > > > > > >>>>>> that is not streams (ie. consumer group or share > > > > > > > > > > > >>>>>> group). 
I > > > > > > > > wonder > > > > > > > > > > if at > > > > > > > > > > > >>>>>> this point, where we're getting several new group > > > > > > > > > > > >>>>>> types > > > > > > added, > > > > > > > > > > each with > > > > > > > > > > > >>>>>> RPCs that are supposed to include groupId of a > > > > > > > > > > > >>>>>> certain > > > > > > type, > > > > > > > > we > > > > > > > > > > should > > > > > > > > > > > >>>> be > > > > > > > > > > > >>>>>> more explicit about this situation. Maybe a kind of > > > > > > > > > > INVALID_GROUP_TYPE > > > > > > > > > > > >>>>>> (group exists but not with a valid type for this > > > > > > > > > > > >>>>>> RPC) vs a > > > > > > > > > > > >>>>>> GROUP_ID_NOT_FOUND (group does not exist). Those > > > > > > > > > > > >>>>>> errors > > > > > > > > would be > > > > > > > > > > > >>>>>> consistently used across consumer, share, and > > > > > > > > > > > >>>>>> streams RPCs > > > > > > > > > > whenever the > > > > > > > > > > > >>>>>> group id is not of the expected type. > > > > > > > > > > > >>>>>> This is truly not specific to this KIP, and should > > > > > > > > > > > >>>>>> be > > > > > > > > addressed > > > > > > > > > > with all > > > > > > > > > > > >>>>>> group types and their RPCs in mind. I just wanted > > > > > > > > > > > >>>>>> to bring > > > > > > > > out my > > > > > > > > > > > >>>> concern > > > > > > > > > > > >>>>>> and get thoughts around it. > > > > > > > > > > > >>>>>> LM4. On a related note, StreamsGroupDescribe > > > > > > > > > > > >>>>>> returns > > > > > > > > > > INVALID_REQUEST if > > > > > > > > > > > >>>>>> groupId is empty. There is already an > > > > > > > > > > > >>>>>> INVALID_GROUP_ID > > > > > > error, > > > > > > > > > > that seems > > > > > > > > > > > >>>>>> more specific to this situation. Error handling of > > > > > > specific > > > > > > > > > > errors would > > > > > > > > > > > >>>>>> definitely be easier than having to deal with a > > > > > > > > > > > >>>>>> generic > > > > > > > > > > INVALID_REQUEST > > > > > > > > > > > >>>>>> (and probably its custom message). I know that for > > > > > > KIP-848 we > > > > > > > > have > > > > > > > > > > > >>>>>> INVALID_REQUEST for similar situations, so if ever > > > > > > > > > > > >>>>>> we take > > > > > > > > down > > > > > > > > > > this > > > > > > > > > > > >>>> path > > > > > > > > > > > >>>>>> we should review it there too for consistency. > > > > > > > > > > > >>>>>> Thoughts? > > > > > > > > > > > >>>>>> LM5. The dependency between the > > > > > > > > > > > >>>>>> StreamsGroupHeartbeat RPC > > > > > > and > > > > > > > > the > > > > > > > > > > > >>>>>> StreamsGroupInitialize RPC is one-way only right? > > > > > > > > > > > >>>>>> HB > > > > > > requires > > > > > > > > a > > > > > > > > > > previous > > > > > > > > > > > >>>>>> StreamsGroupInitialize request, but > > > > > > > > > > > >>>>>> StreamsGroupInitialize > > > > > > > > > > processing is > > > > > > > > > > > >>>>>> totally independent of heartbeats (and could > > > > > > > > > > > >>>>>> perfectly be > > > > > > > > > > processed > > > > > > > > > > > >>>> without > > > > > > > > > > > >>>>>> a previous HB, even though the client > > > > > > > > > > > >>>>>> implementation we’re > > > > > > > > > > proposing > > > > > > > > > > > >>>> won’t > > > > > > > > > > > >>>>>> go down that path). Is my understanding correct? > > > > > > > > > > > >>>>>> Just to > > > > > > > > double > > > > > > > > > > check, > > > > > > > > > > > >>>>>> seems sensible like that at the protocol level. 
> > > > > > > > > > > >>>>>> LM6. With KIP-848, there is an important > > > > > > > > > > > >>>>>> improvement that > > > > > > > > brings a > > > > > > > > > > > >>>>>> difference in behaviour around the static > > > > > > > > > > > >>>>>> membership: > > > > > > with the > > > > > > > > > > classic > > > > > > > > > > > >>>>>> protocol, if a static member joins with a group > > > > > > > > > > > >>>>>> instance > > > > > > > > already > > > > > > > > > > in > > > > > > > > > > > >>>> use, it > > > > > > > > > > > >>>>>> makes the initial member fail with a > > > > > > > > > > > >>>>>> FENCED_INSTANCED_ID > > > > > > > > > > exception, vs. > > > > > > > > > > > >>>>>> with the new consumer group protocol, the second > > > > > > > > > > > >>>>>> member > > > > > > > > trying to > > > > > > > > > > join > > > > > > > > > > > >>>>>> fails with an UNRELEASED_INSTANCE_ID. Does this > > > > > > > > > > > >>>>>> change > > > > > > need > > > > > > > > to be > > > > > > > > > > > >>>>>> considered in any way for the streams app? (I'm not > > > > > > familiar > > > > > > > > with > > > > > > > > > > KS > > > > > > > > > > > >>>> yet, > > > > > > > > > > > >>>>>> but thought it was worth asking. If it doesn't > > > > > > > > > > > >>>>>> affect in > > > > > > any > > > > > > > > way, > > > > > > > > > > still > > > > > > > > > > > >>>>>> maybe helpful to call it out on a section for > > > > > > > > > > > >>>>>> static > > > > > > > > membership) > > > > > > > > > > > >>>>>> LM7. Regarding the admin tool to manage streams > > > > > > > > > > > >>>>>> groups. > > > > > > We can > > > > > > > > > > discuss > > > > > > > > > > > >>>>>> whether to have it here or separately, but I think > > > > > > > > > > > >>>>>> we > > > > > > should > > > > > > > > aim > > > > > > > > > > for > > > > > > > > > > > >>>> some > > > > > > > > > > > >>>>>> basic admin capabilities from the start, mainly > > > > > > > > > > > >>>>>> because I > > > > > > > > believe > > > > > > > > > > it > > > > > > > > > > > >>>> will > > > > > > > > > > > >>>>>> be very helpful/needed in practice during the impl > > > > > > > > > > > >>>>>> of the > > > > > > KIP. > > > > > > > > > > From > > > > > > > > > > > >>>>>> experience with KIP-848, we felt a bit blindfolded > > > > > > > > > > > >>>>>> in the > > > > > > > > initial > > > > > > > > > > phase > > > > > > > > > > > >>>>>> where we still didn't have kafka-consumer-groups > > > > > > > > > > > >>>>>> dealing > > > > > > with > > > > > > > > the > > > > > > > > > > new > > > > > > > > > > > >>>>>> groups (and then it was very helpful and used when > > > > > > > > > > > >>>>>> we were > > > > > > > > able to > > > > > > > > > > > >>>> easily > > > > > > > > > > > >>>>>> inspect them from the console) > > > > > > > > > > > >>>>>> LM8. nit: the links the KIP-848 are not quite right > > > > > > (pointing > > > > > > > > to > > > > > > > > > > an > > > > > > > > > > > >>>>>> unrelated “Future work section” at the end of > > > > > > > > > > > >>>>>> KIP-848) > > > > > > > > > > > >>>>>> Thanks! > > > > > > > > > > > >>>>>> Lianet > > > > > > > > > > > >>>>>> On Fri, Jul 19, 2024 at 11:13 AM Lucas Brutschy > > > > > > > > > > > >>>>>> <lbruts...@confluent.io.invalid> wrote: > > > > > > > > > > > >>>>>>> Hi Andrew, > > > > > > > > > > > >>>>>>> > > > > > > > > > > > >>>>>>> AS2: I added a note for now. 
If others feel > > > > > > > > > > > >>>>>>> strongly > > > > > > about > > > > > > > > it, > > > > > > > > > > we can > > > > > > > > > > > >>>>>>> still add more administrative tools to the KIP - > > > > > > > > > > > >>>>>>> it > > > > > > should > > > > > > > > not > > > > > > > > > > change > > > > > > > > > > > >>>>>>> the overall story significantly. > > > > > > > > > > > >>>>>>> > > > > > > > > > > > >>>>>>> AS8: "streams.group.assignor.name" sounds good to > > > > > > > > > > > >>>>>>> me to > > > > > > > > > > distinguish > > > > > > > > > > > >>>>>>> the config from class names. Not sure if I like > > > > > > > > > > > >>>>>>> the > > > > > > > > "default". > > > > > > > > > > To be > > > > > > > > > > > >>>>>>> consistent, we'd then have to call it > > > > > > > > > > > >>>>>>> `group.streams.default.session.timeout.ms` as > > > > > > > > > > > >>>>>>> well. I > > > > > > only > > > > > > > > > > added the > > > > > > > > > > > >>>>>>> `.name` on both broker and group level for now. > > > > > > > > > > > >>>>>>> > > > > > > > > > > > >>>>>>> AS10: Ah, I misread your comment, now I know what > > > > > > > > > > > >>>>>>> you > > > > > > meant. > > > > > > > > Good > > > > > > > > > > > >>>>>>> point, fixed (by Bruno). > > > > > > > > > > > >>>>>>> > > > > > > > > > > > >>>>>>> Cheers, > > > > > > > > > > > >>>>>>> Lucas > > > > > > > > > > > >>>>>>> > > > > > > > > > > > >>>>>>> On Fri, Jul 19, 2024 at 4:44 PM Andrew Schofield > > > > > > > > > > > >>>>>>> <andrew_schofi...@live.com> wrote: > > > > > > > > > > > >>>>>>>> > > > > > > > > > > > >>>>>>>> Hi Lucas, > > > > > > > > > > > >>>>>>>> I see that I hit send too quickly. One more > > > > > > > > > > > >>>>>>>> comment: > > > > > > > > > > > >>>>>>>> > > > > > > > > > > > >>>>>>>> AS2: I think stating that there will be a > > > > > > > > > > `kafka-streams-group.sh` in > > > > > > > > > > > >>>> a > > > > > > > > > > > >>>>>>>> future KIP is fine to keep this KIP focused. > > > > > > Personally, I > > > > > > > > would > > > > > > > > > > > >>>> probably > > > > > > > > > > > >>>>>>>> put all of the gory details in this KIP, but > > > > > > > > > > > >>>>>>>> then it’s > > > > > > not > > > > > > > > my > > > > > > > > > > KIP. A > > > > > > > > > > > >>>>>>> future > > > > > > > > > > > >>>>>>>> pointer is fine too. > > > > > > > > > > > >>>>>>>> > > > > > > > > > > > >>>>>>>> Thanks, > > > > > > > > > > > >>>>>>>> Andrew > > > > > > > > > > > >>>>>>>> > > > > > > > > > > > >>>>>>>> > > > > > > > > > > > >>>>>>>>> On 19 Jul 2024, at 13:46, Lucas Brutschy < > > > > > > > > > > lbruts...@confluent.io > > > > > > > > > > > >>>> .INVALID> > > > > > > > > > > > >>>>>>> wrote: > > > > > > > > > > > >>>>>>>>> > > > > > > > > > > > >>>>>>>>> Hi Andrew, > > > > > > > > > > > >>>>>>>>> > > > > > > > > > > > >>>>>>>>> thanks for getting the discussion going! Here > > > > > > > > > > > >>>>>>>>> are my > > > > > > > > responses. > > > > > > > > > > > >>>>>>>>> > > > > > > > > > > > >>>>>>>>> AS1: Good point, done. > > > > > > > > > > > >>>>>>>>> > > > > > > > > > > > >>>>>>>>> AS2: We were planning to add more > > > > > > > > > > > >>>>>>>>> administrative tools > > > > > > to > > > > > > > > the > > > > > > > > > > > >>>>>>>>> interface in a follow-up KIP, to not make this > > > > > > > > > > > >>>>>>>>> KIP too > > > > > > > > large. 
> > > > > > > > > > If > > > > > > > > > > > >>>>>>>>> people think that it would help to understand > > > > > > > > > > > >>>>>>>>> the > > > > > > overall > > > > > > > > > > picture if > > > > > > > > > > > >>>>>>>>> we already add something like > > > > > > `kafka-streams-groups.sh`, we > > > > > > > > > > will do > > > > > > > > > > > >>>>>>>>> that. I also agree that we should address how > > > > > > > > > > > >>>>>>>>> this > > > > > > relates > > > > > > > > to > > > > > > > > > > > >>>>>>>>> KIP-1043, we'll add it. > > > > > > > > > > > >>>>>>>>> > > > > > > > > > > > >>>>>>>>> AS3: Good idea, that's more consistent with > > > > > > `assigning` and > > > > > > > > > > > >>>>>>> `reconciling` etc. > > > > > > > > > > > >>>>>>>>> > > > > > > > > > > > >>>>>>>>> AS4: Thanks, Fixed. > > > > > > > > > > > >>>>>>>>> > > > > > > > > > > > >>>>>>>>> AS5: Good catch. This was supposed to mean that > > > > > > > > > > > >>>>>>>>> we > > > > > > require > > > > > > > > > > CREATE on > > > > > > > > > > > >>>>>>>>> cluster or CREATE on all topics, not both. > > > > > > > > > > > >>>>>>>>> Fixed. > > > > > > > > > > > >>>>>>>>> > > > > > > > > > > > >>>>>>>>> AS6: Thanks, Fixed. > > > > > > > > > > > >>>>>>>>> > > > > > > > > > > > >>>>>>>>> AS7. Thanks, Fixed. > > > > > > > > > > > >>>>>>>>> > > > > > > > > > > > >>>>>>>>> AS8: I think this works a bit different in this > > > > > > > > > > > >>>>>>>>> KIP > > > > > > than in > > > > > > > > > > consumer > > > > > > > > > > > >>>>>>>>> groups. KIP-848 lets the members vote for a > > > > > > > > > > > >>>>>>>>> preferred > > > > > > > > > > assignor, and > > > > > > > > > > > >>>>>>>>> the broker-side assignor is picked by majority > > > > > > > > > > > >>>>>>>>> vote. > > > > > > The > > > > > > > > > > > >>>>>>>>> `group.consumer.assignors` specifies the list of > > > > > > assignors > > > > > > > > > > that are > > > > > > > > > > > >>>>>>>>> supported on the broker, and is configurable > > > > > > > > > > > >>>>>>>>> because > > > > > > the > > > > > > > > > > interface is > > > > > > > > > > > >>>>>>>>> pluggable. In this KIP, the task assignor is > > > > > > > > > > > >>>>>>>>> not voted > > > > > > on > > > > > > > > by > > > > > > > > > > members > > > > > > > > > > > >>>>>>>>> but configured on the broker-side. > > > > > > > > `group.streams.assignor` is > > > > > > > > > > used > > > > > > > > > > > >>>>>>>>> for this, and uses a specific name. If we'll > > > > > > > > > > > >>>>>>>>> make the > > > > > > task > > > > > > > > > > assignor > > > > > > > > > > > >>>>>>>>> pluggable on the broker-side, we'd introduce a > > > > > > > > > > > >>>>>>>>> separate > > > > > > > > config > > > > > > > > > > > >>>>>>>>> `group.streams.assignors`, which would indeed > > > > > > > > > > > >>>>>>>>> be a > > > > > > list of > > > > > > > > > > class > > > > > > > > > > > >>>>>>>>> names. I think there is no conflict here, the > > > > > > > > > > > >>>>>>>>> two > > > > > > > > > > configurations > > > > > > > > > > > >>>> serve > > > > > > > > > > > >>>>>>>>> different purposes. 
The only gripe I'd have > > > > > > > > > > > >>>>>>>>> here is > > > > > > > > naming as > > > > > > > > > > > >>>>>>>>> `group.streams.assignor` and > > > > > > > > > > > >>>>>>>>> `group.streams.assignors` > > > > > > > > would > > > > > > > > > > be a bit > > > > > > > > > > > >>>>>>>>> similar, but I cannot really think of a better > > > > > > > > > > > >>>>>>>>> name for > > > > > > > > > > > >>>>>>>>> `group.streams.assignor`, so I'd probably rather > > > > > > introduce > > > > > > > > > > > >>>>>>>>> `group.streams.assignors` as > > > > > > > > > > `group.streams.possible_assignors` or > > > > > > > > > > > >>>>>>>>> something like that. > > > > > > > > > > > >>>>>>>>> > > > > > > > > > > > >>>>>>>>> AS9: I added explanations for the various > > > > > > > > > > > >>>>>>>>> record types. > > > > > > > > Apart > > > > > > > > > > from > > > > > > > > > > > >>>> the > > > > > > > > > > > >>>>>>>>> new topology record, and the partition metadata > > > > > > > > > > > >>>>>>>>> (which > > > > > > is > > > > > > > > > > based on > > > > > > > > > > > >>>> the > > > > > > > > > > > >>>>>>>>> topology and can only be created once we have a > > > > > > topology > > > > > > > > > > initialized) > > > > > > > > > > > >>>>>>>>> the lifecycle for the records is basically > > > > > > > > > > > >>>>>>>>> identical > > > > > > as in > > > > > > > > > > KIP-848. > > > > > > > > > > > >>>>>>>>> > > > > > > > > > > > >>>>>>>>> AS10: In the consumer offset topic, the version > > > > > > > > > > > >>>>>>>>> in the > > > > > > key > > > > > > > > is > > > > > > > > > > used to > > > > > > > > > > > >>>>>>>>> differentiate different schema types with the > > > > > > > > > > > >>>>>>>>> same > > > > > > > > content. So > > > > > > > > > > the > > > > > > > > > > > >>>>>>>>> keys are not versioned, but the version field is > > > > > > "abused" > > > > > > > > as a > > > > > > > > > > type > > > > > > > > > > > >>>>>>>>> tag. This is the same in KIP-848, we followed > > > > > > > > > > > >>>>>>>>> it for > > > > > > > > > > consistency. > > > > > > > > > > > >>>>>>>>> > > > > > > > > > > > >>>>>>>>> Cheers, > > > > > > > > > > > >>>>>>>>> Lucas > > > > > > > > > > > >>>>>>>>> > > > > > > > > > > > >>>>>>>>> > > > > > > > > > > > >>>>>>>>> On Thu, Jul 18, 2024 at 1:27 PM Andrew Schofield > > > > > > > > > > > >>>>>>>>> <andrew_schofi...@live.com> wrote: > > > > > > > > > > > >>>>>>>>>> > > > > > > > > > > > >>>>>>>>>> Hi Lucas and Bruno, > > > > > > > > > > > >>>>>>>>>> > > > > > > > > > > > >>>>>>>>>> Thanks for the great KIP. > > > > > > > > > > > >>>>>>>>>> > > > > > > > > > > > >>>>>>>>>> I've read through the document and have some > > > > > > > > > > > >>>>>>>>>> initial > > > > > > > > comments. > > > > > > > > > > > >>>>>>>>>> > > > > > > > > > > > >>>>>>>>>> AS1: I suppose that there is a new > > > > > > > > > > o.a.k.common.GroupType.STREAMS > > > > > > > > > > > >>>>>>> enumeration > > > > > > > > > > > >>>>>>>>>> constant. This is a change to the public > > > > > > > > > > > >>>>>>>>>> interface and > > > > > > > > should > > > > > > > > > > be > > > > > > > > > > > >>>>>>> called out. > > > > > > > > > > > >>>>>>>>>> > > > > > > > > > > > >>>>>>>>>> AS2: Since streams groups are no longer > > > > > > > > > > > >>>>>>>>>> consumer > > > > > > groups, > > > > > > > > how > > > > > > > > > > does > > > > > > > > > > > >>>> the > > > > > > > > > > > >>>>>>> user > > > > > > > > > > > >>>>>>>>>> manipulate them, observe lag and so on? 
Will > > > > > > > > > > > >>>>>>>>>> you add > > > > > > > > > > > >>>>>>> `kafka-streams-groups.sh` > > > > > > > > > > > >>>>>>>>>> or extend > > > > > > > > > > > >>>>>>>>>> `kafka-streams-application-reset.sh`? Of > > > > > > course, > > > > > > > > > > KIP-1043 > > > > > > > > > > > >>>>>>> can easily > > > > > > > > > > > >>>>>>>>>> be extended to support streams groups, but > > > > > > > > > > > >>>>>>>>>> that only > > > > > > lets > > > > > > > > the > > > > > > > > > > user > > > > > > > > > > > >>>>>>> see the > > > > > > > > > > > >>>>>>>>>> groups, not manipulate them. > > > > > > > > > > > >>>>>>>>>> > > > > > > > > > > > >>>>>>>>>> AS3: I wonder whether the streams group state > > > > > > > > > > > >>>>>>>>>> of > > > > > > > > > > UNINITIALIZED would > > > > > > > > > > > >>>>>>> be > > > > > > > > > > > >>>>>>>>>> better expressed as INITIALIZING. > > > > > > > > > > > >>>>>>>>>> > > > > > > > > > > > >>>>>>>>>> AS4: In StreamsGroupInitializeRequest, > > > > > > > > > > Topology[].SourceTopicRegex > > > > > > > > > > > >>>>>>> should > > > > > > > > > > > >>>>>>>>>> be nullable. > > > > > > > > > > > >>>>>>>>>> > > > > > > > > > > > >>>>>>>>>> AS5: Why does StreamsGroupInitialize require > > > > > > > > > > > >>>>>>>>>> CREATE > > > > > > > > > > permission on > > > > > > > > > > > >>>> the > > > > > > > > > > > >>>>>>>>>> cluster resource? I imagine that this is one > > > > > > > > > > > >>>>>>>>>> of the > > > > > > ways > > > > > > > > that > > > > > > > > > > the > > > > > > > > > > > >>>>>>> request might > > > > > > > > > > > >>>>>>>>>> be granted permission to create the > > > > > > StateChangelogTopics > > > > > > > > and > > > > > > > > > > > >>>>>>>>>> RepartitionSourceTopics, but if it is granted > > > > > > permission > > > > > > > > to > > > > > > > > > > create > > > > > > > > > > > >>>>>>> those topics > > > > > > > > > > > >>>>>>>>>> with specific ACLs, would CREATE on the cluster > > > > > > resource > > > > > > > > > > still be > > > > > > > > > > > >>>>>>> required? > > > > > > > > > > > >>>>>>>>>> > > > > > > > > > > > >>>>>>>>>> AS6: StreamsGroupInitialize can also fail with > > > > > > > > > > > >>>>>>> TOPIC_AUTHORIZATION_FAILED > > > > > > > > > > > >>>>>>>>>> and (subject to AS5) > > > > > > > > > > > >>>>>>>>>> CLUSTER_AUTHORIZATION_FAILED. > > > > > > > > > > > >>>>>>>>>> > > > > > > > > > > > >>>>>>>>>> AS7: A tiny nit. You've used TopologyID > > > > > > > > > > > >>>>>>>>>> (capitals) in > > > > > > > > > > > >>>>>>> StreamsGroupHeartbeatRequest > > > > > > > > > > > >>>>>>>>>> and a few others, but in all other cases the > > > > > > > > > > > >>>>>>>>>> fields > > > > > > which > > > > > > > > are > > > > > > > > > > ids > > > > > > > > > > > >>>> are > > > > > > > > > > > >>>>>>> spelled Id. > > > > > > > > > > > >>>>>>>>>> I suggest TopologyId. > > > > > > > > > > > >>>>>>>>>> > > > > > > > > > > > >>>>>>>>>> Also, "interal" is probably meant to be > > > > > > > > > > > >>>>>>>>>> "interval”. > > > > > > > > > > > >>>>>>>>>> > > > > > > > > > > > >>>>>>>>>> AS8: For consumer groups, the > > > > > > `group.consumer.assignors` > > > > > > > > > > > >>>>>>> configuration is a > > > > > > > > > > > >>>>>>>>>> list of class names. The assignors do have > > > > > > > > > > > >>>>>>>>>> names too, > > > > > > but > > > > > > > > the > > > > > > > > > > > >>>>>>> configuration which > > > > > > > > > > > >>>>>>>>>> enables them is in terms of class names. 
I > > > > > > > > > > > >>>>>>>>>> wonder > > > > > > whether > > > > > > > > the > > > > > > > > > > > >>>> broker’s > > > > > > > > > > > >>>>>>>>>> group.streams.assignor could actually be > > > > > > > > > > `group.streams.assignors` > > > > > > > > > > > >>>>>>> and specified > > > > > > > > > > > >>>>>>>>>> as a list of the class names of the supplied > > > > > > assignors. I > > > > > > > > know > > > > > > > > > > > >>>> you're > > > > > > > > > > > >>>>>>> not supporting > > > > > > > > > > > >>>>>>>>>> other assignors yet, but when you do, I expect > > > > > > > > > > > >>>>>>>>>> you > > > > > > would > > > > > > > > > > prefer to > > > > > > > > > > > >>>>>>> have used class > > > > > > > > > > > >>>>>>>>>> names from the start. > > > > > > > > > > > >>>>>>>>>> > > > > > > > > > > > >>>>>>>>>> The use of assignor names in the other places > > > > > > > > > > > >>>>>>>>>> looks > > > > > > good > > > > > > > > to > > > > > > > > > > me. > > > > > > > > > > > >>>>>>>>>> > > > > > > > > > > > >>>>>>>>>> AS9: I'd find it really helpful to have a bit > > > > > > > > > > > >>>>>>>>>> of a > > > > > > > > > > description about > > > > > > > > > > > >>>>>>> the purpose and > > > > > > > > > > > >>>>>>>>>> lifecycle of the 9 record types you've > > > > > > > > > > > >>>>>>>>>> introduced on > > > > > > the > > > > > > > > > > > >>>>>>> __consumer_offsets topic. > > > > > > > > > > > >>>>>>>>>> I did a cursory review but without really > > > > > > understanding > > > > > > > > what's > > > > > > > > > > > >>>>>>> written when, > > > > > > > > > > > >>>>>>>>>> I can't do a thorough job of reviewing. > > > > > > > > > > > >>>>>>>>>> > > > > > > > > > > > >>>>>>>>>> AS10: In the definitions of the record keys, > > > > > > > > > > > >>>>>>>>>> such as > > > > > > > > > > > >>>>>>>>>> StreamsGroupCurrentMemberAssignmentKey, the > > > > > > > > > > > >>>>>>>>>> versions > > > > > > of > > > > > > > > the > > > > > > > > > > fields > > > > > > > > > > > >>>>>>> must > > > > > > > > > > > >>>>>>>>>> match the versions of the types. > > > > > > > > > > > >>>>>>>>>> > > > > > > > > > > > >>>>>>>>>> Thanks, > > > > > > > > > > > >>>>>>>>>> Andrew > > > > > > > > > > > >>>>>>>>>> > > > > > > > > > > > >>>>>>>>>>> On 12 Jul 2024, at 09:04, Lucas Brutschy < > > > > > > > > > > lbruts...@confluent.io > > > > > > > > > > > >>>> .INVALID> > > > > > > > > > > > >>>>>>> wrote: > > > > > > > > > > > >>>>>>>>>>> > > > > > > > > > > > >>>>>>>>>>> Hi all, > > > > > > > > > > > >>>>>>>>>>> > > > > > > > > > > > >>>>>>>>>>> I would like to start a discussion thread on > > > > > > KIP-1071: > > > > > > > > > > Streams > > > > > > > > > > > >>>>>>>>>>> Rebalance Protocol. With this KIP, we aim to > > > > > > > > > > > >>>>>>>>>>> bring > > > > > > the > > > > > > > > > > principles > > > > > > > > > > > >>>>>>> laid > > > > > > > > > > > >>>>>>>>>>> down by KIP-848 to Kafka Streams, to make > > > > > > > > > > > >>>>>>>>>>> rebalances > > > > > > more > > > > > > > > > > reliable > > > > > > > > > > > >>>>>>> and > > > > > > > > > > > >>>>>>>>>>> scalable, and make Kafka Streams overall > > > > > > > > > > > >>>>>>>>>>> easier to > > > > > > > > deploy and > > > > > > > > > > > >>>>>>> operate. 
> > > > > > > > > > > >>>>>>>>>>> The KIP proposed moving the assignment logic > > > > > > > > > > > >>>>>>>>>>> to the > > > > > > > > broker, > > > > > > > > > > and > > > > > > > > > > > >>>>>>>>>>> introducing a dedicated group type and > > > > > > > > > > > >>>>>>>>>>> dedicated > > > > > > RPCs for > > > > > > > > > > Kafka > > > > > > > > > > > >>>>>>>>>>> Streams. > > > > > > > > > > > >>>>>>>>>>> > > > > > > > > > > > >>>>>>>>>>> The KIP is here: > > > > > > > > > > > >>>>>>>>>>> > > > > > > > > > > > >>>>>>> > > > > > > > > > > > >>>> > > > > > > > > > > > > > > > > > > > > > > > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-1071%3A+Streams+Rebalance+Protocol > > > > > > > > > > > >>>>>>>>>>> > > > > > > > > > > > >>>>>>>>>>> This is joint work with Bruno Cadonna. > > > > > > > > > > > >>>>>>>>>>> > > > > > > > > > > > >>>>>>>>>>> Please take a look and let us know what you > > > > > > > > > > > >>>>>>>>>>> think. > > > > > > > > > > > >>>>>>>>>>> > > > > > > > > > > > >>>>>>>>>>> Best, > > > > > > > > > > > >>>>>>>>>>> Lucas > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > >