Hi Lucas,

NT4.
Sounds good, although should it take the maximum offsets? Wouldn't it be more correct to take the *most recent* offsets (i.e. the offsets from the more recently received heartbeat)? My thinking is that it might be possible (albeit exceptionally rare) for the on-disk offsets to revert to a previous number, and taking the max would incorrectly assume the older offsets are correct.
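To make the difference concrete, here is a minimal sketch of the two resolution strategies. It is only an illustration under stated assumptions: `OffsetReport`, `received_at`, and both function names are hypothetical and are not taken from the KIP or from the Kafka code base.

```python
from dataclasses import dataclass


@dataclass
class OffsetReport:
    """One heartbeat's view of dormant task offsets (hypothetical shape)."""
    client_id: str
    received_at: int    # arrival order of the heartbeat (assumed to be available)
    task_offsets: dict  # task id -> changelog offset


def resolve_by_max(reports):
    """Resolve conflicts by keeping the largest offset seen for each task."""
    resolved = {}
    for report in reports:
        for task, offset in report.task_offsets.items():
            resolved[task] = max(offset, resolved.get(task, offset))
    return resolved


def resolve_by_most_recent(reports):
    """Resolve conflicts by letting the latest heartbeat overwrite earlier ones."""
    resolved = {}
    for report in sorted(reports, key=lambda r: r.received_at):
        resolved.update(report.task_offsets)
    return resolved


# Task 0_0 is reported at offset 120 first, then at offset 100 by a later heartbeat
# (the rare case where the on-disk offset has reverted).
reports = [
    OffsetReport("client-a", received_at=1, task_offsets={"0_0": 120, "0_1": 40}),
    OffsetReport("client-b", received_at=2, task_offsets={"0_0": 100}),
]

print(resolve_by_max(reports))
print(resolve_by_most_recent(reports))
```

With these inputs the two strategies disagree only on task 0_0: taking the max keeps the older value 120, while taking the most recent report yields 100.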
Regards,
Nick

On Mon, 19 Aug 2024 at 15:00, Lucas Brutschy <lbruts...@confluent.io.invalid> wrote:

> Hi Nick,
>
> NT4: As discussed, we will still require locking in the new protocol to avoid concurrent read/write access on the checkpoint file, at least as long as KIP-1035 hasn't landed. However, as you correctly pointed out, the assignor will have to accept offsets for overlapping sets of dormant tasks. I updated the KIP to make this explicit. If the corresponding offset information for one task conflicts between clients (which can happen), the conflict is resolved by taking the maximum of the offsets.
>
> Cheers,
> Lucas
>
> On Fri, Aug 16, 2024 at 7:14 PM Guozhang Wang <guozhang.wang...@gmail.com> wrote:
> >
> > Hello Lucas,
> >
> > Thanks for the great KIP. I've read it through and it looks good to me. As we've discussed, many of my thoughts would be outside the scope of this very well scoped and defined KIP, so I will omit them for now.
> >
> > The only one I had related to this KIP is about topology updating. I understand the motivation of the proposal: since a group forming a (new) generation may not accept all of the joining members because of the timing of the RPCs, the group's topology ID may not reflect the "actual" most recent topology if some zombie members holding an old topology form a group generation quickly enough, which would effectively mean that zombie members block other real members from getting tasks assigned. On the other hand, like you've mentioned already in the doc, requesting some sort of ID ordering by pushing the burden on the user's side would also be too much for users, increasing the risk of human errors in operations.
> >
> > I'm wondering if instead of trying to be smart programmatically, we just let the protocol act dumbly (details below).
> > The main reasons I had in mind are:
> >
> > 1) Upon topology changes, some tasks may no longer exist in the new topology, so still letting them execute on the clients which do not yet have the new topology would waste resources.
> >
> > 2) As we discussed, trying to act smart introduces more complexities in the coordinator that tries to balance different assignment goals between stickiness, balance, and now topology mismatches between clients.
> >
> > 3) Scenarios in which mismatching topologies would be observed within a group generation:
> >    a. Zombie / old clients that do not have the new topology, and never will.
> >    b. During a rolling bounce upgrade, where not-yet-bounced clients would not yet have the new topology.
> >    c. Let's assume we would never have scenarios where users intentionally want a subset of clients within a group running a partial / subset of the full sub-topologies, since such cases can well be covered by a custom assignor that takes those considerations into account by never assigning some tasks to some clients etc. That means the only scenarios we would need to consider are a) and b).
> >
> > For b), I think it's actually okay to temporarily block the progress of the group until everyone is bounced with the updated topology; as for a), originally I thought having one or a few clients blocking the whole group would be a big problem, but now that I think more, I feel that from the operations point of view, just letting the app be blocked with an informational log entry to quickly pin down the zombie clients may actually be acceptable. All in all, it makes the code simpler programmatically by not trying to abstract away issue scenario a) from the users (or operators) but letting them know asap.
> >
> > ----------
> >
> > Other than that, everything else looks good to me.
> >
> >
> > Guozhang
> >
> >
> > On Fri, Aug 16, 2024 at 7:38 AM Nick Telford <nick.telf...@gmail.com> wrote:
> > >
> > > Hi Lucas,
> > >
> > > NT4.
> > > Given that the new assignment procedure guarantees that a Task has been closed before it is assigned to a different client, I don't think there should be a problem with concurrent access? I don't think we should worry too much about 1035 here, as it's orthogonal to 1071. I don't think that 1035 *requires* the locking, and indeed once 1071 is the only assignment mechanism, we should be able to do away with the locking completely (I think).
> > >
> > > Anyway, given your point about it not being possible to guarantee disjoint sets, does it make sense to require clients to continue to supply the lags for only a subset of the dormant Tasks on-disk? Wouldn't it be simpler to just have them supply everything, since the assignor has to handle overlapping sets anyway?
> > >
> > > Cheers,
> > > Nick
> > >
> > > On Fri, 16 Aug 2024 at 13:51, Lucas Brutschy <lbruts...@confluent.io.invalid> wrote:
> > >
> > > > Hi Nick,
> > > >
> > > > NT4. I think it will be hard anyway to ensure that the assignor always gets disjoint sets (there is no synchronized rebalance point anymore, so locks wouldn't prevent two clients reporting the same dormant task). So I think we'll have to lift this restriction. I was thinking more that locking is required to prevent concurrent access. In particular, I was expecting that the lock will avoid two threads opening the same RocksDB in KIP-1035. Wouldn't this cause problems?
> > > >
> > > > Cheers,
> > > > Lucas
> > > >
> > > > On Fri, Aug 16, 2024 at 11:34 AM Nick Telford <nick.telf...@gmail.com> wrote:
> > > >
> > > > > Hi Lucas,
> > > > >
> > > > > NT4.
> > > > > The reason I mentioned this was because, while implementing 1035, I stumbled across a problem: initially I had changed it so that threads always reported the lag for *all* dormant Tasks on-disk, even if it meant multiple threads reporting lag for the same Tasks. I found that this didn't work, apparently because the assignor assumes that multiple threads on the same instance always report disjoint sets.
> > > > >
> > > > > From reading through 1071, it sounded like this assumption is no longer being made by the assignor, and that the processId field would allow the assignor to understand when multiple clients reporting lag for the same Tasks are on the same instance. This would enable us to do away with the locking when reporting lag, and just have threads report the lag for every Task on-disk, even if other threads are reporting lag for the same Tasks.
> > > > >
> > > > > But it sounds like this is not correct, and that the new assignor will make the same assumptions as the old one?
> > > > >
> > > > > Regards,
> > > > > Nick
> > > > >
> > > > > On Fri, 16 Aug 2024 at 10:17, Lucas Brutschy <lbruts...@confluent.io.invalid> wrote:
> > > > >
> > > > > > Hi Nick!
> > > > > >
> > > > > > Thanks for getting involved in the discussion.
> > > > > >
> > > > > > NT1. We are always referring to offsets in the changelog topics here. I tried to make it more consistent. But in the schemas and API, I find "task changelog end offset" a bit lengthy, so we use "task offset" and "task end offset" for short. We could change it, if people think this is confusing.
> > > > > >
> > > > > > NT2. You are right.
> > > > > > The confusing part is that the current streams config is called `max.warmup.replicas`, but in the new protocol, we are bounding the group-level parameter using `group.streams.max.warmup.replicas`. If we wanted to keep `group.streams.max.warmup.replicas` for the config name on the group-level, we'd have to bound it using `group.streams.max.max.warmup.replicas`. I prefer not doing this, but open to suggestions.
> > > > > >
> > > > > > NT3. You are right, we do not need to make it this restrictive. I think the main problem with having 10,000 warm-up replicas would be that it slows down the assignment inside the broker - once we are closer to production-ready implementation, we may have better numbers of this and may revisit these defaults. I'll set the max to 100 for now, but it would be good to hear what values people typically use in their production workloads.
> > > > > >
> > > > > > NT4. We will actually only report the offsets if we manage to acquire the lock. I tried to make this more precise. I suppose also with KIP-1035, we'd require the lock to read the offset?
> > > > > >
> > > > > > Cheers,
> > > > > > Lucas
> > > > > >
> > > > > > On Thu, Aug 15, 2024 at 8:40 PM Nick Telford <nick.telf...@gmail.com> wrote:
> > > > > > >
> > > > > > > Hi everyone,
> > > > > > >
> > > > > > > Looks really promising, and I can see this resolving several issues I've noticed. I particularly like the choice to use a String for Subtopology ID, as it will (eventually) lead to a better solution to KIP-816.
> > > > > > >
> > > > > > > I noticed a few typos in the KIP that I thought I'd mention:
> > > > > > >
> > > > > > > NT1.
> > > > > > > In several places you refer to "task changelog end offsets", while in others, you call it "task end offsets". Which is it?
> > > > > > >
> > > > > > > NT2.
> > > > > > > Under "Group Configurations", you included "group.streams.max.warmup.replicas", but I think you meant "group.streams.num.warmup.replicas"?
> > > > > > >
> > > > > > > NT3.
> > > > > > > Not a typo, but a suggestion: it makes sense to set the default for "group.streams.num.warmup.replicas" to 2, for compatibility with the existing defaults, but why set the default for "group.streams.max.warmup.replicas" to only 4? That seems extremely restrictive. These "max" configs are typically used to prevent a subset of users causing problems on the shared broker cluster - what's the reason to set such a restrictive value for max warmup replicas? If I had 10,000 warmup replicas, would it cause a noticeable problem on the brokers?
> > > > > > >
> > > > > > > NT4.
> > > > > > > It's implied that clients send the changelog offsets for *all* dormant stateful Tasks, but the current behaviour is that clients will only send the changelog offsets for the stateful Tasks that they are able to lock on-disk. Since this is a change in behaviour, perhaps this should be called out explicitly?
> > > > > > >
> > > > > > > Regards,
> > > > > > > Nick
> > > > > > >
> > > > > > > On Thu, 15 Aug 2024 at 10:55, Lucas Brutschy <lbruts...@confluent.io.invalid> wrote:
> > > > > > >
> > > > > > > > Hi Andrew,
> > > > > > > >
> > > > > > > > thanks for the comment.
> > > > > > > >
> > > > > > > > AS12: I clarified the command-line interface.
> > > > > > > > It's supposed to be used with --reset-offsets and --delete-offsets. I removed --topic.
> > > > > > > >
> > > > > > > > AS13: Yes, it's --delete. I clarified the command-line interface.
> > > > > > > >
> > > > > > > > Cheers,
> > > > > > > > Lucas
> > > > > > > >
> > > > > > > > On Tue, Aug 13, 2024 at 4:14 PM Andrew Schofield <andrew_schofi...@live.com> wrote:
> > > > > > > > >
> > > > > > > > > Hi Lucas,
> > > > > > > > > Thanks for the KIP update.
> > > > > > > > >
> > > > > > > > > I think that `kafka-streams-groups.sh` looks like a good equivalent to the tools for the other types of groups.
> > > > > > > > >
> > > > > > > > > AS12: In kafka-streams-groups.sh, the description for the --input-topics option seems insufficient. Why is an input topic specified with this option different than a topic specified with --topic? Why is it --input-topics rather than --input-topic? Which action of this tool does this option apply to?
> > > > > > > > >
> > > > > > > > > AS13: Similarly, for --internal-topics, which action of the tool does it apply to? I suppose it’s --delete, but it’s not clear to me.
> > > > > > > > >
> > > > > > > > > Thanks,
> > > > > > > > > Andrew
> > > > > > > > >
> > > > > > > > > > On 11 Aug 2024, at 12:10, Lucas Brutschy <lbruts...@confluent.io.INVALID> wrote:
> > > > > > > > > >
> > > > > > > > > > Hi Andrew/Lianet,
> > > > > > > > > >
> > > > > > > > > > I have added an administrative command-line tool (replacing `kafka-streams-application-reset`) and extensions of the Admin API for listing, deleting, describing groups and listing, altering and deleting offsets for streams groups.
> > > > > > > > > > No new RPCs have to be added, however, we duplicate some of the API in the admin client that exist for consumer groups. It seems to me cleaner to duplicate some code/interface here, instead of using "consumer group" APIs for streams groups, or renaming existing APIs that use "consumerGroup" in the name to something more generic (which wouldn't cover share groups).
> > > > > > > > > >
> > > > > > > > > > I think for now, all comments are addressed.
> > > > > > > > > >
> > > > > > > > > > Cheers,
> > > > > > > > > > Lucas
> > > > > > > > > >
> > > > > > > > > > On Tue, Aug 6, 2024 at 3:19 PM Lucas Brutschy <lbruts...@confluent.io> wrote:
> > > > > > > > > >>
> > > > > > > > > >> Hi Lianet and Andrew,
> > > > > > > > > >>
> > > > > > > > > >> LM1/LM2: You are right. The idea is to omit fields exactly in the same situations as in KIP-848. In the KIP, I stuck with how the behavior was defined in KIP-848 (e.g. KIP-848 defined that the instance ID will be omitted if it did not change since the last heartbeat). But you are correct that the implementation handles these details slightly differently. I updated the KIP to match more closely the behavior of the KIP-848 implementation.
> > > > > > > > > >>
> > > > > > > > > >> LM9: Yes, there are several options to do this. The idea is to have only one client initialize the topology, not all clients.
> > > > > > > > > >> It seems easier to understand on the protocol level (otherwise we'd have N topology initializations racing with a hard-to-determine winner). We also expect the payload of the request to grow in the future and want to avoid the overhead of having all clients sending the topology at the same time. But initializing the group could take some time - we have to create internal topics, and maybe a client is malfunctioning and the initialization has to be retried. It seemed a bit confusing to return errors to all other clients that are trying to join the group during that time - as if there was a problem with joining the group / the contents of the heartbeat. It seems cleaner to me to let all clients successfully join the group and heartbeat, but remain in an INITIALIZING state which does not yet assign any tasks. Does that make sense to you? You are right that returning a retriable error and having all clients retry until the group is initialized would also work, it just doesn't model well that "everything is going according to plan".
> > > > > > > > > >> As for the order of the calls - yes, I think it is fine to allow an Initialize RPC before the first heartbeat for supporting future admin tools. I made this change throughout the KIP, thanks!
> > > > > > > > > >>
> > > > > > > > > >> AS11: Yes, your understanding is correct. The number of tasks for one subtopology is the maximum number of partitions in any of the matched topics. What will happen in Kafka Streams is that the partitions of the matched topics will effectively be merged during stream processing, so in your example, subtopology:0 would consume from AB:0 and AC:0.
> > > > > > > > > >>
> > > > > > > > > >> Cheers,
> > > > > > > > > >> Lucas
> > > > > > > > > >>
> > > > > > > > > >> On Fri, Aug 2, 2024 at 9:47 PM Lianet M. <liane...@gmail.com> wrote:
> > > > > > > > > >>>
> > > > > > > > > >>> Hi Bruno, answering your questions:
> > > > > > > > > >>>
> > > > > > > > > >>> About the full heartbeat (LM1): I just wanted to confirm that you'll be sending full HBs in case of errors in general. It's not clear from the KIP, since it referred to sending Id/epoch and whatever had changed since the last HB only. Sending full HB on error is key to ensure fresh rejoins after fencing for instance, and retries with all relevant info.
> > > > > > > > > >>>
> > > > > > > > > >>> About the instanceId (LM2): The instanceId is needed on every HB to be able to identify a member using one that is already taken.
> > > > > > > > > >>> On every HB, the broker uses the instance id (if any) to retrieve the member ID associated with it, and checks it against the memberId received in the HB (throwing UnreleasedInstance exception if needed). So similar to my previous point, just wanted to confirm that we are considering that here too.
> > > > > > > > > >>>
> > > > > > > > > >>> Now some other thoughts:
> > > > > > > > > >>>
> > > > > > > > > >>> LM9: Definitely interesting imo if we can avoid the dependency between the StreamsGroupInitialize and the StreamsGroupHeartbeat. I totally get that the initial client implementation will do a HB first, and that's fine, but not having the flow enforced at the protocol level would allow for further improvement in the future (that initialize via admin idea you mentioned, for instance). Actually, I may be missing something about the HB, but if we are at the point where HB requires that the topology has been initialized, and the topology init requires the group, why is it the heartbeat RPC the one responsible for the group creation? (vs. StreamsGroupInitialize creates group if needed + HB just fails if topology not initialized)
> > > > > > > > > >>>
> > > > > > > > > >>> Thanks!
> > > > > > > > > >>>
> > > > > > > > > >>> Lianet
> > > > > > > > > >>> (I didn't miss your answer on my INVALID_GROUP_TYPE proposal, just still thinking about it in sync with the same discussion we're having on the KIP-1043 thread...I'll come back on that)
> > > > > > > > > >>>
> > > > > > > > > >>> On Thu, Aug 1, 2024 at 10:55 AM Andrew Schofield <andrew_schofi...@live.com> wrote:
> > > > > > > > > >>>
> > > > > > > > > >>>> Hi Bruno,
> > > > > > > > > >>>> Thanks for adding the detail on the schemas on records written to __consumer_offsets.
> > > > > > > > > >>>> I’ve reviewed them in detail and they look good to me. I have one naive question.
> > > > > > > > > >>>>
> > > > > > > > > >>>> AS11: I notice that an assignment is essentially a set of partition indices for subtopologies. Since a subtopology can be defined by a source topic regex, does this mean that an assignment gives the same set of partition indices for all topics which happen to match the regex? So, a subtopology reading from A* that matches AB and AC would give the same set of partitions to each task for both topics, and is not able to give AB:0 to one task and AC:0 to a different task. Is this correct?
> > > > > > > > > >>>>
> > > > > > > > > >>>> Thanks,
> > > > > > > > > >>>> Andrew
> > > > > > > > > >>>>
> > > > > > > > > >>>>> On 23 Jul 2024, at 16:16, Bruno Cadonna <cado...@apache.org> wrote:
> > > > > > > > > >>>>>
> > > > > > > > > >>>>> Hi Lianet,
> > > > > > > > > >>>>>
> > > > > > > > > >>>>> Thanks for the review!
> > > > > > > > > >>>>>
> > > > > > > > > >>>>> Here my answers:
> > > > > > > > > >>>>>
> > > > > > > > > >>>>> LM1. Is your question whether we need to send a full heartbeat each time the member re-joins the group even if the information in the RPC did not change since the last heartbeat?
> > > > > > > > > >>>>>
> > > > > > > > > >>>>> LM2. Is the reason for sending the instance ID each time that a member could shutdown, change the instance ID and then start and heartbeat again, but the group coordinator would never notice that the instance ID changed?
> > > > > > > > > >>>>>
> > > > > > > > > >>>>> LM3. I see your point. I am wondering whether this additional information is worth the dependency between the group types. To return INVALID_GROUP_TYPE, the group coordinator needs to know that a group ID exists with a different group type. With a group coordinator as we have it now in Apache Kafka that manages all group types, that is not a big deal, but imagine if we (or some implementation of the Apache Kafka protocol) decide to have a separate group coordinator for each group type.
> > > > > > > > > >>>>>
> > > > > > > > > >>>>> LM4. Using INVALID_GROUP_ID if the group ID is empty makes sense to me. I am going to change that.
> > > > > > > > > >>>>>
> > > > > > > > > >>>>> LM5. I think there is a dependency from the StreamsGroupInitialize RPC to the heartbeat. The group must exist when the initialize RPC is received by the group coordinator. The group is created by the heartbeat RPC. I would be in favor of making the initialize RPC independent from the heartbeat RPC. That would allow to initialize a streams group explicitly with an admin tool.
> > > > > > > > > >>>>>
> > > > > > > > > >>>>> LM6. I think it affects streams and streams should behave as the consumer group.
> > > > > > > > > >>>>>
> > > > > > > > > >>>>> LM7. Good point that we will consider.
> > > > > > > > > >>>>>
> > > > > > > > > >>>>> LM8. Fixed! Thanks!
> > > > > > > > > >>>>>
> > > > > > > > > >>>>>
> > > > > > > > > >>>>> Best,
> > > > > > > > > >>>>> Bruno
> > > > > > > > > >>>>>
> > > > > > > > > >>>>>
> > > > > > > > > >>>>>
> > > > > > > > > >>>>>
> > > > > > > > > >>>>> On 7/19/24 9:53 PM, Lianet M. wrote:
> > > > > > > > > >>>>>> Hi Lucas/Bruno, thanks for the great KIP! First comments:
> > > > > > > > > >>>>>> LM1. Related to where the KIP says: *“Group ID, member ID, member epoch are sent with each heartbeat request. Any other information that has not changed since the last heartbeat can be omitted.”.
> > > > > > > > > >>>>>> *I expect all the other info also needs to be sent whenever a full heartbeat is required (even if it didn’t change from the last heartbeat), ex. on fencing scenarios, correct?
> > > > > > > > > >>>>>> LM2. For consumer groups we always send the groupInstanceId (if any) as part of every heartbeat, along with memberId, epoch and groupId. Should we consider that too here?
> > > > > > > > > >>>>>> LM3. We’re proposing returning a GROUP_ID_NOT_FOUND error in response to the stream-specific RPCs if the groupId is associated with a group type that is not streams (ie. consumer group or share group). I wonder if at this point, where we're getting several new group types added, each with RPCs that are supposed to include groupId of a certain type, we should be more explicit about this situation. Maybe a kind of INVALID_GROUP_TYPE (group exists but not with a valid type for this RPC) vs a GROUP_ID_NOT_FOUND (group does not exist). Those errors would be consistently used across consumer, share, and streams RPCs whenever the group id is not of the expected type.
> > > > > > > > > >>>>>> This is truly not specific to this KIP, and should be addressed with all group types and their RPCs in mind. I just wanted to bring out my concern and get thoughts around it.
> > > > > > > > > >>>>>> LM4. On a related note, StreamsGroupDescribe returns INVALID_REQUEST if groupId is empty. There is already an INVALID_GROUP_ID error, that seems more specific to this situation. Error handling of specific errors would definitely be easier than having to deal with a generic INVALID_REQUEST (and probably its custom message). I know that for KIP-848 we have INVALID_REQUEST for similar situations, so if ever we take down this path we should review it there too for consistency. Thoughts?
> > > > > > > > > >>>>>> LM5. The dependency between the StreamsGroupHeartbeat RPC and the StreamsGroupInitialize RPC is one-way only right? HB requires a previous StreamsGroupInitialize request, but StreamsGroupInitialize processing is totally independent of heartbeats (and could perfectly be processed without a previous HB, even though the client implementation we’re proposing won’t go down that path). Is my understanding correct? Just to double check, seems sensible like that at the protocol level.
> > > > > > > > > >>>>>> LM6. With KIP-848, there is an important improvement that brings a difference in behaviour around the static membership: with the classic protocol, if a static member joins with a group instance already in use, it makes the initial member fail with a FENCED_INSTANCE_ID exception, vs. with the new consumer group protocol, the second member trying to join fails with an UNRELEASED_INSTANCE_ID. Does this change need to be considered in any way for the streams app? (I'm not familiar with KS yet, but thought it was worth asking. If it doesn't affect in any way, still maybe helpful to call it out on a section for static membership)
> > > > > > > > > >>>>>> LM7. Regarding the admin tool to manage streams groups. We can discuss whether to have it here or separately, but I think we should aim for some basic admin capabilities from the start, mainly because I believe it will be very helpful/needed in practice during the impl of the KIP.
> > > > > > > > > > >>>>>> From experience with KIP-848, we felt a bit blindfolded in the initial phase where we still didn't have kafka-consumer-groups dealing with the new groups (and then it was very helpful and used when we were able to easily inspect them from the console).
> > > > > > > > > > >>>>>> LM8. nit: the links to KIP-848 are not quite right (pointing to an unrelated “Future work section” at the end of KIP-848).
> > > > > > > > > > >>>>>> Thanks!
> > > > > > > > > > >>>>>> Lianet
> > > > > > > > > > >>>>>> On Fri, Jul 19, 2024 at 11:13 AM Lucas Brutschy <lbruts...@confluent.io.invalid> wrote:
> > > > > > > > > > >>>>>>> Hi Andrew,
> > > > > > > > > > >>>>>>>
> > > > > > > > > > >>>>>>> AS2: I added a note for now. If others feel strongly about it, we can still add more administrative tools to the KIP - it should not change the overall story significantly.
> > > > > > > > > > >>>>>>>
> > > > > > > > > > >>>>>>> AS8: "streams.group.assignor.name" sounds good to me to distinguish the config from class names. Not sure if I like the "default". To be consistent, we'd then have to call it `group.streams.default.session.timeout.ms` as well. I only added the `.name` on both broker and group level for now.
> > > > > > > > > > >>>>>>>
> > > > > > > > > > >>>>>>> AS10: Ah, I misread your comment, now I know what you meant. Good point, fixed (by Bruno).
> > > > > > > > > > >>>>>>>
> > > > > > > > > > >>>>>>> Cheers,
> > > > > > > > > > >>>>>>> Lucas
> > > > > > > > > > >>>>>>>
> > > > > > > > > > >>>>>>> On Fri, Jul 19, 2024 at 4:44 PM Andrew Schofield <andrew_schofi...@live.com> wrote:
> > > > > > > > > > >>>>>>>> Hi Lucas,
> > > > > > > > > > >>>>>>>> I see that I hit send too quickly. One more comment:
> > > > > > > > > > >>>>>>>>
> > > > > > > > > > >>>>>>>> AS2: I think stating that there will be a `kafka-streams-groups.sh` in a future KIP is fine to keep this KIP focused. Personally, I would probably put all of the gory details in this KIP, but then it’s not my KIP. A future pointer is fine too.
> > > > > > > > > > >>>>>>>>
> > > > > > > > > > >>>>>>>> Thanks,
> > > > > > > > > > >>>>>>>> Andrew
> > > > > > > > > > >>>>>>>>
> > > > > > > > > > >>>>>>>>> On 19 Jul 2024, at 13:46, Lucas Brutschy <lbruts...@confluent.io.INVALID> wrote:
> > > > > > > > > > >>>>>>>>>
> > > > > > > > > > >>>>>>>>> Hi Andrew,
> > > > > > > > > > >>>>>>>>>
> > > > > > > > > > >>>>>>>>> thanks for getting the discussion going! Here are my responses.
> > > > > > > > > > >>>>>>>>>
> > > > > > > > > > >>>>>>>>> AS1: Good point, done.
> > > > > > > > > > >>>>>>>>>
> > > > > > > > > > >>>>>>>>> AS2: We were planning to add more administrative tools to the interface in a follow-up KIP, to not make this KIP too large.
> > > > > > > > > > >>>>>>>>> If people think that it would help to understand the overall picture if we already add something like `kafka-streams-groups.sh`, we will do that. I also agree that we should address how this relates to KIP-1043, we'll add it.
> > > > > > > > > > >>>>>>>>>
> > > > > > > > > > >>>>>>>>> AS3: Good idea, that's more consistent with `assigning` and `reconciling` etc.
> > > > > > > > > > >>>>>>>>>
> > > > > > > > > > >>>>>>>>> AS4: Thanks, Fixed.
> > > > > > > > > > >>>>>>>>>
> > > > > > > > > > >>>>>>>>> AS5: Good catch. This was supposed to mean that we require CREATE on cluster or CREATE on all topics, not both. Fixed.
> > > > > > > > > > >>>>>>>>>
> > > > > > > > > > >>>>>>>>> AS6: Thanks, Fixed.
> > > > > > > > > > >>>>>>>>>
> > > > > > > > > > >>>>>>>>> AS7: Thanks, Fixed.
> > > > > > > > > > >>>>>>>>>
> > > > > > > > > > >>>>>>>>> AS8: I think this works a bit differently in this KIP than in consumer groups. KIP-848 lets the members vote for a preferred assignor, and the broker-side assignor is picked by majority vote. The `group.consumer.assignors` specifies the list of assignors that are supported on the broker, and is configurable because the interface is pluggable. In this KIP, the task assignor is not voted on by members but configured on the broker-side. `group.streams.assignor` is used for this, and uses a specific name.
> > > > > > > > > > >>>>>>>>> If we make the task assignor pluggable on the broker-side, we'd introduce a separate config `group.streams.assignors`, which would indeed be a list of class names. I think there is no conflict here, the two configurations serve different purposes. The only gripe I'd have here is naming, as `group.streams.assignor` and `group.streams.assignors` would be a bit similar, but I cannot really think of a better name for `group.streams.assignor`, so I'd probably rather introduce `group.streams.assignors` as `group.streams.possible_assignors` or something like that.
> > > > > > > > > > >>>>>>>>>
> > > > > > > > > > >>>>>>>>> AS9: I added explanations for the various record types. Apart from the new topology record, and the partition metadata (which is based on the topology and can only be created once we have a topology initialized), the lifecycle for the records is basically identical to that in KIP-848.
> > > > > > > > > > >>>>>>>>>
> > > > > > > > > > >>>>>>>>> AS10: In the consumer offset topic, the version in the key is used to differentiate different schema types with the same content.
> > > > > > > > > > >>>>>>>>> So the keys are not versioned, but the version field is "abused" as a type tag. This is the same in KIP-848, we followed it for consistency.
> > > > > > > > > > >>>>>>>>>
> > > > > > > > > > >>>>>>>>> Cheers,
> > > > > > > > > > >>>>>>>>> Lucas
> > > > > > > > > > >>>>>>>>>
> > > > > > > > > > >>>>>>>>> On Thu, Jul 18, 2024 at 1:27 PM Andrew Schofield <andrew_schofi...@live.com> wrote:
> > > > > > > > > > >>>>>>>>>>
> > > > > > > > > > >>>>>>>>>> Hi Lucas and Bruno,
> > > > > > > > > > >>>>>>>>>>
> > > > > > > > > > >>>>>>>>>> Thanks for the great KIP.
> > > > > > > > > > >>>>>>>>>>
> > > > > > > > > > >>>>>>>>>> I've read through the document and have some initial comments.
> > > > > > > > > > >>>>>>>>>>
> > > > > > > > > > >>>>>>>>>> AS1: I suppose that there is a new o.a.k.common.GroupType.STREAMS enumeration constant. This is a change to the public interface and should be called out.
> > > > > > > > > > >>>>>>>>>>
> > > > > > > > > > >>>>>>>>>> AS2: Since streams groups are no longer consumer groups, how does the user manipulate them, observe lag and so on? Will you add `kafka-streams-groups.sh` or extend `kafka-streams-application-reset.sh`? Of course, KIP-1043 can easily be extended to support streams groups, but that only lets the user see the groups, not manipulate them.
> > > > > > > > > > >>>>>>>>>>
> > > > > > > > > > >>>>>>>>>> AS3: I wonder whether the streams group state of UNINITIALIZED would be better expressed as INITIALIZING.
> > > > > > > > > > >>>>>>>>>>
> > > > > > > > > > >>>>>>>>>> AS4: In StreamsGroupInitializeRequest, Topology[].SourceTopicRegex should be nullable.
> > > > > > > > > > >>>>>>>>>>
> > > > > > > > > > >>>>>>>>>> AS5: Why does StreamsGroupInitialize require CREATE permission on the cluster resource? I imagine that this is one of the ways that the request might be granted permission to create the StateChangelogTopics and RepartitionSourceTopics, but if it is granted permission to create those topics with specific ACLs, would CREATE on the cluster resource still be required?
> > > > > > > > > > >>>>>>>>>>
> > > > > > > > > > >>>>>>>>>> AS6: StreamsGroupInitialize can also fail with TOPIC_AUTHORIZATION_FAILED and (subject to AS5) CLUSTER_AUTHORIZATION_FAILED.
> > > > > > > > > > >>>>>>>>>>
> > > > > > > > > > >>>>>>>>>> AS7: A tiny nit. You've used TopologyID (capitals) in StreamsGroupHeartbeatRequest and a few others, but in all other cases the fields which are ids are spelled Id. I suggest TopologyId.
> > > > > > > > > > >>>>>>>>>>
> > > > > > > > > > >>>>>>>>>> Also, "interal" is probably meant to be "interval".
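Lucas's reply to AS5, quoted earlier in this thread, states that the intended requirement is CREATE on the cluster or CREATE on all topics, not both. A minimal sketch of that rule follows. The function name `may_create_internal_topics` and its arguments are hypothetical and only model the either/or logic; this is not the broker's actual authorizer code.

```python
from typing import Iterable, Set

def may_create_internal_topics(
    has_cluster_create: bool,
    topics_with_create: Set[str],
    topics_to_create: Iterable[str],
) -> bool:
    """Return True if the caller may create every topic in topics_to_create.

    Either CREATE on the cluster resource is sufficient on its own, or the
    caller must hold CREATE on each individual topic that would be created.
    """
    if has_cluster_create:
        return True
    return all(topic in topics_with_create for topic in topics_to_create)
```

For example, a caller without cluster-level CREATE but with topic-level CREATE on both a changelog topic and a repartition topic would pass the check, while a caller missing CREATE on either topic would not.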
> > > > > > > > > > >>>>>>>>>>
> > > > > > > > > > >>>>>>>>>> AS8: For consumer groups, the `group.consumer.assignors` configuration is a list of class names. The assignors do have names too, but the configuration which enables them is in terms of class names. I wonder whether the broker’s `group.streams.assignor` could actually be `group.streams.assignors` and specified as a list of the class names of the supplied assignors. I know you're not supporting other assignors yet, but when you do, I expect you would prefer to have used class names from the start.
> > > > > > > > > > >>>>>>>>>>
> > > > > > > > > > >>>>>>>>>> The use of assignor names in the other places looks good to me.
> > > > > > > > > > >>>>>>>>>>
> > > > > > > > > > >>>>>>>>>> AS9: I'd find it really helpful to have a bit of a description about the purpose and lifecycle of the 9 record types you've introduced on the __consumer_offsets topic. I did a cursory review but without really understanding what's written when, I can't do a thorough job of reviewing.
> > > > > > > > > > >>>>>>>>>>
> > > > > > > > > > >>>>>>>>>> AS10: In the definitions of the record keys, such as StreamsGroupCurrentMemberAssignmentKey, the versions of the fields must match the versions of the types.
> > > > > > > > > > >>>>>>>>>>
> > > > > > > > > > >>>>>>>>>> Thanks,
> > > > > > > > > > >>>>>>>>>> Andrew
> > > > > > > > > > >>>>>>>>>>
> > > > > > > > > > >>>>>>>>>>> On 12 Jul 2024, at 09:04, Lucas Brutschy <lbruts...@confluent.io.INVALID> wrote:
> > > > > > > > > > >>>>>>>>>>>
> > > > > > > > > > >>>>>>>>>>> Hi all,
> > > > > > > > > > >>>>>>>>>>>
> > > > > > > > > > >>>>>>>>>>> I would like to start a discussion thread on KIP-1071: Streams Rebalance Protocol. With this KIP, we aim to bring the principles laid down by KIP-848 to Kafka Streams, to make rebalances more reliable and scalable, and make Kafka Streams overall easier to deploy and operate. The KIP proposes moving the assignment logic to the broker, and introducing a dedicated group type and dedicated RPCs for Kafka Streams.
> > > > > > > > > > >>>>>>>>>>>
> > > > > > > > > > >>>>>>>>>>> The KIP is here:
> > > > > > > > > > >>>>>>>>>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-1071%3A+Streams+Rebalance+Protocol
> > > > > > > > > > >>>>>>>>>>>
> > > > > > > > > > >>>>>>>>>>> This is joint work with Bruno Cadonna.
> > > > > > > > > > >>>>>>>>>>>
> > > > > > > > > > >>>>>>>>>>> Please take a look and let us know what you think.
> > > > > > > > > > >>>>>>>>>>>
> > > > > > > > > > >>>>>>>>>>> Best,
> > > > > > > > > > >>>>>>>>>>> Lucas