Thanks for your answers, Guozhang! I'm slowly understanding more and more of the details. A couple of follow-up questions:
10) Consumer Coordinator Algorithm--1a:

> If subscription has changed: revoke all partitions by calling
> onPartitionsRevoked, send join-group request with empty owned partitions in
> Subscription.

Could we call `onPartitionsRevoked()` not on all partitions, but only on the assigned ones for topics that got removed from the subscription? And send the corresponding, potentially non-empty, owned partitions in the Subscription? In your reply you mentioned avoiding "split brain" -- what scenario do you have in mind? Releasing partitions seems safe, and we enter a rebalance afterwards anyway.

20) Consumer Coordinator Algorithm--1b:

> If topic metadata has changed: call onPartitionsLost on those
> owned-but-no-longer-exist partitions; and if the consumer is the leader, send
> join-group request.

Why should only the leader send a join-group request? If any client detects a metadata change, it seems that any client could trigger a new rebalance?

30) Consumer Coordinator Algorithm--1c:

> If received REBALANCE_IN_PROGRESS from heartbeat response or commit response:
> same as a) above.

In this case, we missed a rebalance. Should we rather call `onPartitionsLost()` instead of `onPartitionsRevoked()`?

40) Consumer Coordinator Algorithm--1d:

> If received MEMBER_ID_REQUIRED from join-group request: same as a) above.

This can only happen when a new consumer starts and thus no partitions are assigned. Why do we need to call `onPartitionsRevoked()` before we send the second join-group request?

50) Consumer Coordinator Algorithm--2c:

> Note the this set otherwise the we would fall into the case 3.b) forevercould
> be different from owned-partitions.
> Compare the owned-partitions with assigned-partitions and generate three
> exclusive sub-sets:

Incomplete sentence?

60) Consumer Coordinator Algorithm--3:

> For every consumer: after received the sync-group request, do the following:

Do you mean sync-group _response_?

70) nit: typo, double `since`:

> It is safe to just follow the above algorithm since for V0 members, since
> they've revoked everything

80) Downgrading and Old-Versioned New Member:

> We will rely on the above consumer-side metric so that users would be
> notified in time.

What exactly does this mean, i.e., how is the user notified? What metric are you referring to?

90) About the upgrade path discussion: To use the already existing mechanism as proposed by Jason, we could sub-class `PartitionAssignor` as `IncrementalPartitionAssignor extends PartitionAssignor` (or introduce a marker interface). This would allow the coordinator to distinguish between both cases and decide whether to revoke eagerly or not.
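Just to illustrate what I have in mind -- a rough sketch only; the names (`IncrementalPartitionAssignor`, `canSkipEagerRevocation`) are made up and not existing API:

    import java.util.List;
    import org.apache.kafka.clients.consumer.internals.PartitionAssignor;

    // Hypothetical marker interface: an assignor implementing it declares that
    // it can work with the incremental (cooperative) rebalance protocol.
    interface IncrementalPartitionAssignor extends PartitionAssignor { }

    class RevocationPolicy {
        // The coordinator would skip the eager "revoke everything" step only if
        // *all* configured assignors are cooperative-capable, because otherwise
        // an eager assignor might still be selected for the group.
        static boolean canSkipEagerRevocation(List<PartitionAssignor> assignors) {
            for (PartitionAssignor assignor : assignors) {
                if (!(assignor instanceof IncrementalPartitionAssignor)) {
                    return false;
                }
            }
            return true;
        }
    }

That would keep the decision at the consumer-coordinator layer, while each assignor only declares what it supports.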
-Matthias

On 4/12/19 6:08 PM, Jason Gustafson wrote: > Hi Guozhang, > > Responses below: > > 2. The interface's default implementation will just be >> `onPartitionRevoked`, so for user's instantiation if they do not make any >> code changes they should be able to recompile the code and continue. > > > Ack, makes sense. > > 4. Hmm.. not sure if it will work. The main issue is that the >> consumer-coordinator behavior (whether to revoke all or none at >> onRebalancePrepare) is independent of the selected protocol's assignor >> (eager or cooperative), so even if the assignor is selected to be the >> old-versioned one, we will still not revoke at the consumer-coordinator >> layer and hence has the same risk of migrating still-owned partitions, >> right?
> > > Yeah, basically we would have to push the eager/cooperative logic into the > PartitionAssignor itself and make the consumer aware of the rebalance > protocol it is compatible with. As long as an eager protocol _could_ be > selected, the consumer would have to be pessimistic and do eager > revocation. But if all the assignors configured in the consumer support > cooperative reassignment, then either 1) a cooperative protocol will be > selected and cooperative revocation can be safely used, or 2) if the rest > of the group does not support it, then the consumer will simply fail. > > Another point which you raised offline and I will repeat here is that this > proposal's benefit is mostly limited to sticky assignment logic. Arguably > the range assignor may have some incidental stickiness, particularly if the > group is rebalancing for a newly created or deleted topic. For other cases, > the proposal is mostly additional overhead since it takes an additional > rebalance and many of the partitions will move. Perhaps it doesn't make as > much sense to use the cooperative protocol for strategies like range and > round-robin. That kind of argues in favor of pushing some of the control > into the assignor itself. Maybe we would not bother creating > CooperativeRange as I suggested above, but it would make sense to create a > cooperative version of the sticky assignment strategy. I thought we might > have to create a new sticky assignor anyway because I can't see how we > would get compatible behavior mixing with the old version anyway. > > Thanks, > Jason > > > On Thu, Apr 11, 2019 at 5:53 PM Guozhang Wang <wangg...@gmail.com> wrote: > >> Hello Matthias: >> >> Thanks for your review. >> >> The background section uses streams assignor as well as the consumer's own >> stick assignor as examples illustrating the situation, but this KIP is for >> consumer coordinator itself, and the rest of the paragraph did not talk >> about Streams any more. If you feel it's a bit distracted I can remove >> those examples. >> >> 10). While working on the PR I realized that the revoked partitions on >> assignment is not needed (this is being discussed on the PR itself: >> https://github.com/apache/kafka/pull/6528#issuecomment-480009890 >> >> 20). 1.a. Good question, I've updated the wiki to let the consumer's >> cleanup assignment and re-join, and not letting assignor making any >> proactive changes. The idea is to keep logic simpler and not doing any >> "split brain" stuff. >> >> 20). 2.b. No we do not need, since the owned-partitions will be part of the >> Subscription passed in to assign() already. >> >> 30). As Boyang mentioned, there are some drawbacks that can not be >> addressed by rebalance delay still, hence still voted KIP-345 (some more >> details can be found on the discussion thread of KIP-345 itself). One >> example is that as the instance resumes, its member id will be empty so we >> are still relying on assignor to give it the assignment from the old >> member-id while keeping all other member's assignment unchanged. >> >> 40). Incomplete sentence, I've updated it. >> >> 50). Here's my idea: suppose we augment the join group schema with >> `protocol version` in 2.3, and then with both brokers and clients being in >> version 2.3+, on the first rolling bounce where subscription and assignment >> schema and / or user metadata has changed, this protocol version will be >> bumped.
On the broker side, when receiving all member's join-group request, >> it will choose the one that has the highest protocol version (also it >> assumes higher versioned protocol is always backward compatible, i.e. the >> coordinator can recognize lower versioned protocol as well) and select it >> as the leader. Then the leader can decide, based on its received and >> deserialized subscription information, how to assign partitions and how to >> encode the assignment accordingly so that everyone can understand it. With >> this, in Streams for example, no version probing would be needed since we >> are guaranteed the leader knows everyone's version -- again it is assuming >> that higher versioned protocol is always backward compatible -- and hence >> can successfully do the assignment at that round. >> >> 60). My bad, this section was not updated while the design was evolved, >> I've updated it. >> >> >> On Tue, Apr 9, 2019 at 7:22 PM Boyang Chen <bche...@outlook.com> wrote: >> >>> >>> Thanks for the review Matthias! My 2-cent on the rebalance delay is that >>> it is a rather fixed trade-off between >>> >>> task availability and resource shuffling. If we eventually trigger >>> rebalance after rolling bounce, certain consumer >>> >>> setup is still faced with global shuffles, for example member.id ranking >>> based round robin strategy, as rejoining dynamic >>> >>> members will be assigned with new member.id which reorders the >>> assignment. So I think the primary goal of incremental >>> >>> rebalancing is still improving the cluster availability during rebalance, >>> because it didn't revoke any partition during this >>> >>> process. Also, the perk is minimum configuration requirement :) >>> >>> >>> Best, >>> >>> Boyang >>> >>> ________________________________ >>> From: Matthias J. Sax <matth...@confluent.io> >>> Sent: Tuesday, April 9, 2019 7:47 AM >>> To: dev >>> Subject: Re: [DISCUSS] KIP-429 : Smooth Auto-Scaling for Kafka Streams >>> >>> Thank for the KIP, Boyang and Guozhang! >>> >>> >>> I made an initial pass and have some questions/comments. One high level >>> comment: it seems that the KIP "mixes" plain consumer and Kafka Streams >>> use case a little bit (at least in the presentation). It might be >>> helpful to separate both cases clearly, or maybe limit the scope to >>> plain consumer only. >>> >>> >>> >>> 10) For `PartitionAssignor.Assignment`: It seems we need a new method >>> `List<TopicPartitions> revokedPartitions()` ? >>> >>> >>> >>> 20) In Section "Consumer Coordinator Algorithm" >>> >>> Bullet point "1a)": If the subscription changes and a topic is >>> removed from the subscription, why do we not revoke the partitions? >>> >>> Bullet point "1a)": What happens is a topic is deleted (or a >>> partition is removed/deleted from a topic)? Should we call the new >>> `onPartitionsEmigrated()` callback for this case? >>> >>> Bullet point "2b)" Should we update the `PartitionAssignor` >>> interface to pass in the "old assignment" as third parameter into >>> `assign()`? >>> >>> >>> >>> 30) Rebalance delay (as used in KIP-415): Could a rebalance delay >>> subsume KIP-345? Configuring static members is rather complicated, and I >>> am wondering if a rebalance delay would be sufficient? >>> >>> >>> >>> 40) Quote: "otherwise the we would fall into the case 3.b) forever." >>> >>> What is "case 3.b" ? 
>>> >>> >>> >>> 50) Section "Looking into the Future" >>> >>> Nit: the new "ProtocolVersion" field is missing in the first line >>> describing "JoinGroupRequest" >>> >>>> This can also help saving "version probing" cost on Streams as well. >>> >>> How does this relate to Kafka Streams "version probing" implementation? >>> How can we exploit the new `ProtocolVersion` in Streams to improve >>> "version probing" ? I have a rough idea, but would like to hear more >>> details. >>> >>> >>> >>> 60) Section "Recommended Upgrade Procedure" >>> >>>> Set the `stream.rebalancing.mode` to `upgrading`, which will force the >>> stream application to stay with protocol type "consumer". >>> >>> This config is not discussed in the KIP and appears in this section >>> without context. Can you elaborate about it? >>> >>> >>> >>> -Matthias >>> >>> >>> >>> >>> On 3/29/19 6:20 PM, Guozhang Wang wrote: >>>> Bump up on this discussion thread. I've added a few new drawings for >>> better >>>> illustration, would really appreciate your feedbacks. >>>> >>>> >>>> Guozhang >>>> >>>> On Wed, Mar 20, 2019 at 6:17 PM Guozhang Wang <wangg...@gmail.com> >>> wrote: >>>> >>>>> Hello Boyang, >>>>> >>>>> I've made another thorough pass over this KIP and I'd like to spilt it >>>>> into two parts: the first part, covered in KIP-429 would be touching >> on >>>>> Consumer Coordinator only to have incremental rebalance protocol in >>> place. >>>>> The second part (for now I've reserved KIP number 444 for it) would >>> contain >>>>> all the changes on StreamsPartitionAssginor to allow warming up new >>>>> members. >>>>> >>>>> I think the first part, a.k.a. the current updated KIP-429 is ready >> for >>>>> review and discussions again. Would love to hear people's feedbacks >> and >>>>> ideas. >>>>> >>>>> Guozhang >>>>> >>>>> >>>>> >>>>> On Mon, Mar 4, 2019 at 10:09 AM Boyang Chen <bche...@outlook.com> >>> wrote: >>>>> >>>>>> Thanks Guozhang for the great questions. Answers are inlined: >>>>>> >>>>>> 1. I'm still not sure if it's worthwhile to add a new type of >> "learner >>>>>> task" in addition to "standby task": if the only difference is that >> for >>>>>> the >>>>>> latter, we would consider workload balance while for the former we >>> would >>>>>> not, I think we can just adjust the logic of StickyTaskAssignor a bit >>> to >>>>>> break that difference. Adding a new type of task would be adding a >> lot >>> of >>>>>> code complexity, so if we can still piggy-back the logic on a >>> standby-task >>>>>> I would prefer to do so. >>>>>> In the proposal we stated that we are not adding a new type of task >>>>>> implementation. The >>>>>> learner task shall share the same implementation with normal standby >>>>>> task, only that we >>>>>> shall tag the standby task with learner and prioritize the learner >>> tasks >>>>>> replay effort. >>>>>> 2. One thing that's still not clear from the KIP wiki itself is which >>>>>> layer >>>>>> would the logic be implemented at. Although for most KIPs we would >> not >>>>>> require internal implementation details but only public facing API >>>>>> updates, >>>>>> for a KIP like this I think it still requires to flesh out details on >>> the >>>>>> implementation design. 
More specifically: today Streams embed a full >>>>>> fledged Consumer client, which hard-code a ConsumerCoordinator >> inside, >>>>>> Streams then injects a StreamsPartitionAssignor to its pluggable >>>>>> PartitionAssignor interface and inside the StreamsPartitionAssignor >> we >>>>>> also >>>>>> have a TaskAssignor interface whose default implementation is >>>>>> StickyPartitionAssignor. Streams partition assignor logic today sites >>> in >>>>>> the latter two classes. Hence the hierarchy today is: >>>>>> >>>>>> KafkaConsumer -> ConsumerCoordinator -> StreamsPartitionAssignor -> >>>>>> StickyTaskAssignor. >>>>>> >>>>>> We need to think about where the proposed implementation would take >>> place >>>>>> at, and personally I think it is not the best option to inject all of >>> them >>>>>> into the StreamsPartitionAssignor / StickyTaskAssignor since the >> logic >>> of >>>>>> "triggering another rebalance" etc would require some coordinator >> logic >>>>>> which is hard to mimic at PartitionAssignor level. On the other hand, >>>>>> since >>>>>> we are embedding a KafkaConsumer client as a whole we cannot just >>> replace >>>>>> ConsumerCoordinator with a specialized StreamsCoordinator like >> Connect >>>>>> does >>>>>> in KIP-415. So I'd like to maybe split the current proposal in both >>>>>> consumer layer and streams-assignor layer like we did in >>> KIP-98/KIP-129. >>>>>> And then the key thing to consider is how to cut off the boundary so >>> that >>>>>> the modifications we push to ConsumerCoordinator would be beneficial >>>>>> universally for any consumers, while keep the Streams-specific logic >> at >>>>>> the >>>>>> assignor level. >>>>>> Yes, that's also my ideal plan. The details for the implementation >> are >>>>>> depicted >>>>>> in this doc< >>>>>> >>> >> https://docs.google.com/document/d/1me2a5wvxAZT1QE6HkwyDl7C2TiBQlKN3Dpw_I1ro91U/edit#heading=h.qix74qdmekae >>>> , >>>>>> and I have explained the reasoning on why we want to push a >>>>>> global change of replacing ConsumerCoordinator with >> StreamCoordinator. >>>>>> The motivation >>>>>> is that KIP space is usually used for public & algorithm level >> change, >>>>>> not for internal >>>>>> implementation details. >>>>>> >>>>>> 3. Depending on which design direction we choose, our migration plan >>> would >>>>>> also be quite different. For example, if we stay with >>> ConsumerCoordinator >>>>>> whose protocol type is "consumer" still, and we can manage to make >> all >>>>>> changes agnostic to brokers as well as to old versioned consumers, >> then >>>>>> our >>>>>> migration plan could be much easier.
>>>>>> Yes, the upgrade plan was designed to take the new StreamCoordinator >>>>>> approach >>>>>> which means we shall define a new protocol type. For existing >>> application >>>>>> we could only >>>>>> maintain the same `consumer` protocol type is because current broker >>> only >>>>>> allows >>>>>> change of protocol type when the consumer group is empty. It is of >>> course >>>>>> user-unfriendly to force >>>>>> a wipe-out for the entire application, and I don't think maintaining >>> old >>>>>> protocol type would greatly >>>>>> impact ongoing services using new stream coordinator. WDYT? >>>>>> >>>>>> 4. I think one major issue related to this KIP is that today, in the >>>>>> StickyPartitionAssignor, we always try to honor stickiness over >>> workload >>>>>> balance, and hence "learner task" is needed to break this priority, >> but >>>>>> I'm >>>>>> wondering if we can have a better solution within sticky task >> assignor >>>>>> that >>>>>> accommodate this? >>>>>> Great question! That's what I explained in the proposal, which is >> that >>> we >>>>>> should breakdown our >>>>>> delivery into different stages. At very beginning, our goal is to >>> trigger >>>>>> learner task assignment only on >>>>>> `new` hosts, where we shall leverage leader's knowledge of previous >>> round >>>>>> of rebalance to figure out. After >>>>>> stage one, our goal is to have a smooth scaling up experience, but >> the >>>>>> task balance problem is kind of orthogonal. >>>>>> The load balance problem is a much broader topic than auto scaling, >>> which >>>>>> I figure worth discussing within >>>>>> this KIP's context since it's a naturally next-step, but wouldn't be >>> the >>>>>> main topic. >>>>>> Learner task or auto scaling support should be treated as `a helpful >>>>>> mechanism to reach load balance`, but not `an algorithm defining load >>>>>> balance`. It would be great if you could share some insights of the >>> stream >>>>>> task balance, which eventually helps us to break out of the KIP-429's >>> scope >>>>>> and even define a separate KIP to focus on task weight & assignment >>> logic >>>>>> improvement. >>>>>> >>>>>> Also thank you for making improvement on the KIP context and >>> organization! >>>>>> >>>>>> Best, >>>>>> Boyang >>>>>> ________________________________ >>>>>> From: Guozhang Wang <wangg...@gmail.com> >>>>>> Sent: Saturday, March 2, 2019 6:00 AM >>>>>> To: dev >>>>>> Subject: Re: [DISCUSS] KIP-429 : Smooth Auto-Scaling for Kafka >> Streams >>>>>> >>>>>> Hello Boyang, >>>>>> >>>>>> I've just made a quick pass on the KIP and here are some thoughts. >>>>>> >>>>>> Meta: >>>>>> >>>>>> 1. I'm still not sure if it's worthwhile to add a new type of >> "learner >>>>>> task" in addition to "standby task": if the only difference is that >> for >>>>>> the >>>>>> latter, we would consider workload balance while for the former we >>> would >>>>>> not, I think we can just adjust the logic of StickyTaskAssignor a bit >>> to >>>>>> break that difference. Adding a new type of task would be adding a >> lot >>> of >>>>>> code complexity, so if we can still piggy-back the logic on a >>> standby-task >>>>>> I would prefer to do so. >>>>>> >>>>>> 2. One thing that's still not clear from the KIP wiki itself is which >>>>>> layer >>>>>> would the logic be implemented at. 
Although for most KIPs we would >> not >>>>>> require internal implementation details but only public facing API >>>>>> updates, >>>>>> for a KIP like this I think it still requires to flesh out details on >>> the >>>>>> implementation design. More specifically: today Streams embed a full >>>>>> fledged Consumer client, which hard-code a ConsumerCoordinator >> inside, >>>>>> Streams then injects a StreamsPartitionAssignor to its plugable >>>>>> PartitionAssignor interface and inside the StreamsPartitionAssignor >> we >>>>>> also >>>>>> have a TaskAssignor interface whose default implementation is >>>>>> StickyPartitionAssignor. Streams partition assignor logic today sites >>> in >>>>>> the latter two classes. Hence the hierarchy today is: >>>>>> >>>>>> KafkaConsumer -> ConsumerCoordinator -> StreamsPartitionAssignor -> >>>>>> StickyTaskAssignor. >>>>>> >>>>>> We need to think about where the proposed implementation would take >>> place >>>>>> at, and personally I think it is not the best option to inject all of >>> them >>>>>> into the StreamsPartitionAssignor / StickyTaskAssignor since the >> logic >>> of >>>>>> "triggering another rebalance" etc would require some coordinator >> logic >>>>>> which is hard to mimic at PartitionAssignor level. On the other hand, >>>>>> since >>>>>> we are embedding a KafkaConsumer client as a whole we cannot just >>> replace >>>>>> ConsumerCoordinator with a specialized StreamsCoordinator like >> Connect >>>>>> does >>>>>> in KIP-415. So I'd like to maybe split the current proposal in both >>>>>> consumer layer and streams-assignor layer like we did in >>> KIP-98/KIP-129. >>>>>> And then the key thing to consider is how to cut off the boundary so >>> that >>>>>> the modifications we push to ConsumerCoordinator would be beneficial >>>>>> universally for any consumers, while keep the Streams-specific logic >> at >>>>>> the >>>>>> assignor level. >>>>>> >>>>>> 3. Depending on which design direction we choose, our migration plan >>> would >>>>>> also be quite different. For example, if we stay with >>> ConsumerCoordinator >>>>>> whose protocol type is "consumer" still, and we can manage to make >> all >>>>>> changes agnostic to brokers as well as to old versioned consumers, >> then >>>>>> our >>>>>> migration plan could be much easier. >>>>>> >>>>>> 4. I think one major issue related to this KIP is that today, in the >>>>>> StickyPartitionAssignor, we always try to honor stickiness over >>> workload >>>>>> balance, and hence "learner task" is needed to break this priority, >> but >>>>>> I'm >>>>>> wondering if we can have a better solution within sticky task >> assignor >>>>>> that >>>>>> accommodate this? >>>>>> >>>>>> Minor: >>>>>> >>>>>> 1. The idea of two rebalances have also been discussed in >>>>>> https://issues.apache.org/jira/browse/KAFKA-6145. So we should add >> the >>>>>> reference on the wiki page as well. >>>>>> 2. Could you also add a section describing how the subscription / >>>>>> assignment metadata will be re-formatted? Without this information it >>> is >>>>>> hard to get to the bottom of your idea. For example in the "Leader >>>>>> Transfer >>>>>> Before Scaling" section, I'm not sure why "S2 doesn't know S4 is new >>>>>> member" >>>>>> and hence would blindly obey stickiness over workload balance >>> requirement. 
>>>>>> >>>>>> Guozhang >>>>>> >>>>>> >>>>>> On Thu, Feb 28, 2019 at 11:05 AM Boyang Chen <bche...@outlook.com> >>> wrote: >>>>>> >>>>>>> Hey community friends, >>>>>>> >>>>>>> I'm gladly inviting you to have a look at the proposal to add >>>>>> incremental >>>>>>> rebalancing to Kafka Streams, A.K.A auto-scaling support. >>>>>>> >>>>>>> >>>>>>> >>>>>> >>> >> https://cwiki.apache.org/confluence/display/KAFKA/KIP-429%3A+Smooth+Auto-Scaling+for+Kafka+Streams >>>>>>> >>>>>>> Special thanks to Guozhang for giving great guidances and important >>>>>>> feedbacks while making this KIP! >>>>>>> >>>>>>> Best, >>>>>>> Boyang >>>>>>> >>>>>> >>>>>> >>>>>> -- >>>>>> -- Guozhang >>>>>> >>>>> >>>>> >>>>> -- >>>>> -- Guozhang >>>>> >>>> >>>> >>> >>> >> >> -- >> -- Guozhang >> >