Hello Matthias,

I've thought about that before, and the reason I did not include it in the KIP-429 scope is that the fetcher / coordinator logic may get quite complicated if it has to return non-empty data when `updateAssignmentMetadataIfNeeded` returns false in KafkaConsumer. In addition, when a rebalance is in progress, letting consumers process data, which potentially takes a long time (in Streams, for example, this is bounded by the `max.poll.interval.ms` config), could lead to a higher chance of "partitions lost" and wasted processing work.
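To make the application-side consequence concrete: under this design a poll loop simply treats an empty return during a rebalance as a no-op and polls again. A minimal sketch with a plain Java consumer (the topic name and group id are made up for illustration):

    import java.time.Duration;
    import java.util.Collections;
    import java.util.Properties;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;

    public class PollLoop {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put("bootstrap.servers", "localhost:9092");
            props.put("group.id", "demo-group");
            props.put("key.deserializer",
                    "org.apache.kafka.common.serialization.StringDeserializer");
            props.put("value.deserializer",
                    "org.apache.kafka.common.serialization.StringDeserializer");

            try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
                consumer.subscribe(Collections.singletonList("demo-topic"));
                while (true) {
                    // While a rebalance is in progress, poll() may return no records;
                    // the loop just polls again instead of treating that as an error.
                    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(500));
                    for (ConsumerRecord<String, String> record : records) {
                        System.out.printf("%s-%d@%d: %s%n", record.topic(),
                                record.partition(), record.offset(), record.value());
                    }
                }
            }
        }
    }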
So I've decided to keep it as simple as it is today, and admittedly, from a user perspective, they may see consecutive `poll()` calls returning no data. I will create a JIRA ticket capturing this idea for future discussion on whether we should adopt it as a general optimization in the consumer. Does that sound good to you?

Guozhang

On Wed, May 22, 2019 at 4:31 AM Matthias J. Sax <matth...@confluent.io> wrote:

> Thanks for the KIP and for starting the VOTE, Guozhang. I am overall +1.
>
> One follow-up thought: the KIP does not discuss in detail how `poll()` will behave after the change. It might actually be important to ensure that `poll()` becomes non-blocking, to allow an application to process data from non-revoked partitions while a rebalance is happening in the background.
>
> Thoughts?
>
> -Matthias
>
> On 5/10/19 1:10 AM, Guozhang Wang wrote:
> > Hello Matthias,
> >
> > I'm proposing to change this behavior holistically inside ConsumerCoordinator, actually. In other words, I'm trying to piggy-back the behavioral fix of KAFKA-4600 onto this KIP, and my motivation for this piggy-backing is that, with incremental rebalancing, only part of the partitions would be affected since we are not revoking everybody any more.
> >
> > Guozhang
> >
> > On Thu, May 9, 2019 at 6:21 AM Matthias J. Sax <matth...@confluent.io> wrote:
> >
> >> Thanks Guozhang!
> >>
> >> The simplified upgrade path is great!
> >>
> >> Just a clarification question about the "Rebalance Callback Error Handling" -- does this change affect the `ConsumerCoordinator` only if incremental rebalancing is used? Or does the behavior also change for the eager rebalancing case?
> >>
> >> -Matthias
> >>
> >> On 5/9/19 3:37 AM, Guozhang Wang wrote:
> >>> Hello all,
> >>>
> >>> Thanks to everyone who has shared their feedback on this KIP! If there are no further comments I'll start the voting thread by the end of tomorrow.
> >>>
> >>> Guozhang.
> >>>
> >>> On Wed, May 8, 2019 at 6:36 PM Guozhang Wang <wangg...@gmail.com> wrote:
> >>>
> >>>> Hello Boyang,
> >>>>
> >>>> On Wed, May 1, 2019 at 4:51 PM Boyang Chen <bche...@outlook.com> wrote:
> >>>>
> >>>>> Hey Guozhang,
> >>>>>
> >>>>> Thank you for the great write-up. Overall the motivation and changes LGTM, just some minor comments:
> >>>>>
> >>>>> 1. In "Consumer Coordinator Algorithm", we could reorder the alphabetical points 3d~3f from ["ready-to-migrate-partitions", "unknown-but-owned-partitions", "maybe-revoking-partitions"] to ["maybe-revoking-partitions", "ready-to-migrate-partitions", "unknown-but-owned-partitions"] in order to be consistent with 3c1~3.
> >>>>
> >>>> Ack. Updated.
> >>>>
> >>>>> 2. In "Consumer Coordinator Algorithm", 1c suggests revoking all partitions upon heartbeat/commit failure. What's the gain here? Do we want to keep all partitions running at this moment, to be optimistic for the case when no partitions get reassigned?
> >>>>
> >>>> That's a good catch. When REBALANCE_IN_PROGRESS is received, we can just re-join the group with all the currently owned partitions encoded. Updated.
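For reference, "re-join with the owned partitions encoded" is visible in the consumer API that KIP-429 eventually shipped: the Subscription carries the member's currently owned partitions so the assignor can keep them sticky. A sketch, using the final `ConsumerPartitionAssignor.Subscription` names from Kafka 2.4 rather than this draft, and assuming the three-argument constructor:

    import java.nio.ByteBuffer;
    import java.util.Arrays;
    import java.util.List;
    import org.apache.kafka.clients.consumer.ConsumerPartitionAssignor.Subscription;
    import org.apache.kafka.common.TopicPartition;

    public class OwnedPartitionsExample {
        public static void main(String[] args) {
            // Partitions this member still owns from the previous generation.
            List<TopicPartition> owned = Arrays.asList(
                    new TopicPartition("demo-topic", 0),
                    new TopicPartition("demo-topic", 1));

            // On REBALANCE_IN_PROGRESS the member re-joins with these encoded,
            // so the assignor can avoid reshuffling still-owned partitions.
            Subscription subscription = new Subscription(
                    Arrays.asList("demo-topic"), // subscribed topics
                    ByteBuffer.allocate(0),      // optional assignor user data
                    owned);                      // ownedPartitions

            System.out.println(subscription.ownedPartitions());
        }
    }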
Put two "looking into the future" into a separate category from > >>>>> migration session. It seems inconsistent for readers to see this > >> before we > >>>>> finished discussion for everything. > >>>>> > >>>> > >>>> Ack. > >>>> > >>>> > >>>>> 5. Have we discussed the concern on the serialization? Could the > new > >>>>> metadata we are adding grow larger than the message size cap? > >>>>> > >>>> > >>>> We're completing https://issues.apache.org/jira/browse/KAFKA-7149 > which > >>>> should largely reduce the message size (will update the wiki > >> accordingly as > >>>> well). > >>>> > >>>> > >>>>> > >>>>> Boyang > >>>>> > >>>>> ________________________________ > >>>>> From: Guozhang Wang <wangg...@gmail.com> > >>>>> Sent: Monday, April 15, 2019 9:20 AM > >>>>> To: dev > >>>>> Subject: Re: [DISCUSS] KIP-429 : Smooth Auto-Scaling for Kafka > Streams > >>>>> > >>>>> Hello Jason, > >>>>> > >>>>> I agree with you that for range / round-robin it makes less sense to > be > >>>>> compatible with cooperative rebalance protocol. > >>>>> > >>>>> As for StickyAssignor, however, I think it would still be possible to > >> make > >>>>> the current implementation to be compatible with cooperative > >> rebalance. So > >>>>> after pondering on different options at hand I'm now proposing this > >>>>> approach as listed in the upgrade section: > >>>>> > >>>>> > >>>>> > >> > https://cwiki.apache.org/confluence/display/KAFKA/KIP-429%3A+Kafka+Consumer+Incremental+Rebalance+Protocol#KIP-429:KafkaConsumerIncrementalRebalanceProtocol-CompatibilityandUpgradePath > >>>>> > >>>>> The idea is to let assignors specify which protocols it would work > >> with, > >>>>> associating with a different name; then the upgrade path would > involve > >> a > >>>>> "compatible" protocol which actually still use eager behavior while > >>>>> encoding two assignors if possible. In "Rejected Section" (just to > >> clarify > >>>>> I'm not finalizing it as rejected, just putting it there for now, and > >> if > >>>>> we > >>>>> like this one instead we can always switch them) I listed the other > >>>>> approach we once discussed about, and arguing its cons of duplicated > >> class > >>>>> seems overwhelm the pros of saving the "rebalance.protocol" config. > >>>>> > >>>>> Let me know WDYT. > >>>>> > >>>>> Guozhang > >>>>> > >>>>> On Fri, Apr 12, 2019 at 6:08 PM Jason Gustafson <ja...@confluent.io> > >>>>> wrote: > >>>>> > >>>>>> Hi Guozhang, > >>>>>> > >>>>>> Responses below: > >>>>>> > >>>>>> 2. The interface's default implementation will just be > >>>>>>> `onPartitionRevoked`, so for user's instantiation if they do not > make > >>>>> any > >>>>>>> code changes they should be able to recompile the code and > continue. > >>>>>> > >>>>>> > >>>>>> Ack, makes sense. > >>>>>> > >>>>>> 4. Hmm.. not sure if it will work. The main issue is that the > >>>>>>> consumer-coordinator behavior (whether to revoke all or none at > >>>>>>> onRebalancePrepare) is independent of the selected protocol's > >> assignor > >>>>>>> (eager or cooperative), so even if the assignor is selected to be > the > >>>>>>> old-versioned one, we will still not revoke at the > >>>>> consumer-coordinator > >>>>>>> layer and hence has the same risk of migrating still-owned > >> partitions, > >>>>>>> right? > >>>>>> > >>>>>> > >>>>>> Yeah, basically we would have to push the eager/cooperative logic > into > >>>>> the > >>>>>> PartitionAssignor itself and make the consumer aware of the > rebalance > >>>>>> protocol it is compatible with. 
> >>>>> On Fri, Apr 12, 2019 at 6:08 PM Jason Gustafson <ja...@confluent.io> wrote:
> >>>>>
> >>>>>> Hi Guozhang,
> >>>>>>
> >>>>>> Responses below:
> >>>>>>
> >>>>>>> 2. The interface's default implementation will just be `onPartitionRevoked`, so for users' instantiations, if they do not make any code changes they should be able to recompile the code and continue.
> >>>>>>
> >>>>>> Ack, makes sense.
> >>>>>>
> >>>>>>> 4. Hmm.. not sure if it will work. The main issue is that the consumer-coordinator behavior (whether to revoke all or none at onRebalancePrepare) is independent of the selected protocol's assignor (eager or cooperative), so even if the assignor is selected to be the old-versioned one, we will still not revoke at the consumer-coordinator layer and hence have the same risk of migrating still-owned partitions, right?
> >>>>>>
> >>>>>> Yeah, basically we would have to push the eager/cooperative logic into the PartitionAssignor itself and make the consumer aware of the rebalance protocol it is compatible with. As long as an eager protocol _could_ be selected, the consumer would have to be pessimistic and do eager revocation. But if all the assignors configured in the consumer support cooperative reassignment, then either 1) a cooperative protocol will be selected and cooperative revocation can be safely used, or 2) if the rest of the group does not support it, then the consumer will simply fail.
> >>>>>>
> >>>>>> Another point which you raised offline and I will repeat here is that this proposal's benefit is mostly limited to sticky assignment logic. Arguably the range assignor may have some incidental stickiness, particularly if the group is rebalancing for a newly created or deleted topic. For other cases, the proposal is mostly additional overhead since it takes an additional rebalance and many of the partitions will move. Perhaps it doesn't make as much sense to use the cooperative protocol for strategies like range and round-robin. That kind of argues in favor of pushing some of the control into the assignor itself. Maybe we would not bother creating CooperativeRange as I suggested above, but it would make sense to create a cooperative version of the sticky assignment strategy. I thought we might have to create a new sticky assignor anyway, because I can't see how we would get compatible behavior mixing with the old version.
> >>>>>>
> >>>>>> Thanks,
> >>>>>> Jason
> >>>>>>
> >>>>>> On Thu, Apr 11, 2019 at 5:53 PM Guozhang Wang <wangg...@gmail.com> wrote:
> >>>>>>
> >>>>>>> Hello Matthias:
> >>>>>>>
> >>>>>>> Thanks for your review.
> >>>>>>>
> >>>>>>> The background section uses the Streams assignor as well as the consumer's own sticky assignor as examples illustrating the situation, but this KIP is for the consumer coordinator itself, and the rest of the page does not talk about Streams any more. If you feel it's a bit distracting I can remove those examples.
> >>>>>>>
> >>>>>>> 10). While working on the PR I realized that the revoked partitions on the assignment are not needed (this is being discussed on the PR itself: https://github.com/apache/kafka/pull/6528#issuecomment-480009890).
> >>>>>>>
> >>>>>>> 20). 1.a. Good question. I've updated the wiki to let the consumer clean up its assignment and re-join, and not let the assignor make any proactive changes. The idea is to keep the logic simpler and not do any "split brain" stuff.
> >>>>>>>
> >>>>>>> 20). 2.b. No, we do not need that, since the owned partitions will already be part of the Subscription passed in to assign().
> >>>>>>>
> >>>>>>> 30). As Boyang mentioned, there are still some drawbacks that cannot be addressed by a rebalance delay, hence we still went with KIP-345 (some more details can be found on the discussion thread of KIP-345 itself). One example is that when an instance resumes, its member id will be empty, so we are still relying on the assignor to give it the assignment from the old member id while keeping all other members' assignments unchanged.
> >>>>>>>
> >>>>>>> 40). Incomplete sentence; I've updated it.
> >>>>>>> 50). Here's my idea: suppose we augment the join group schema with `protocol version` in 2.3, and then with both brokers and clients being on version 2.3+, on the first rolling bounce where the subscription and assignment schema and / or user metadata have changed, this protocol version will be bumped. On the broker side, when receiving all members' join-group requests, it will choose the member that has the highest protocol version (this also assumes a higher versioned protocol is always backward compatible, i.e. the coordinator can recognize lower versioned protocols as well) and select it as the leader. Then the leader can decide, based on the subscription information it receives and deserializes, how to assign partitions and how to encode the assignment accordingly so that everyone can understand it. With this, in Streams for example, no version probing would be needed since we are guaranteed the leader knows everyone's version -- again assuming that a higher versioned protocol is always backward compatible -- and hence can successfully do the assignment in that round.
> >>>>>>>
> >>>>>>> 60). My bad, this section was not updated as the design evolved; I've updated it.
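The leader-selection rule sketched in 50) is simple to state in code. The following is an illustration only, with made-up types, since the actual group coordinator logic is broker-internal:

    import java.util.Comparator;
    import java.util.List;

    public class LeaderSelection {
        // Hypothetical stand-in for a member's join-group request.
        record JoinRequest(String memberId, int protocolVersion) {}

        // Pick the member with the highest protocol version as group leader,
        // assuming higher versions are always backward compatible, so the
        // leader can deserialize every member's subscription.
        static String selectLeader(List<JoinRequest> requests) {
            return requests.stream()
                    .max(Comparator.comparingInt(JoinRequest::protocolVersion))
                    .map(JoinRequest::memberId)
                    .orElseThrow(() -> new IllegalArgumentException("empty group"));
        }

        public static void main(String[] args) {
            List<JoinRequest> group = List.of(
                    new JoinRequest("member-a", 2),
                    new JoinRequest("member-b", 3), // bounced first, highest version
                    new JoinRequest("member-c", 2));
            System.out.println(selectLeader(group)); // member-b becomes leader
        }
    }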
> >>>>>>> On Tue, Apr 9, 2019 at 7:22 PM Boyang Chen <bche...@outlook.com> wrote:
> >>>>>>>
> >>>>>>>> Thanks for the review Matthias! My 2 cents on the rebalance delay is that it is a rather fixed trade-off between task availability and resource shuffling. If we eventually trigger a rebalance after a rolling bounce, certain consumer setups still face global shuffles, for example a member.id-ranking-based round-robin strategy, as rejoining dynamic members will be assigned new member.ids, which reorders the assignment. So I think the primary goal of incremental rebalancing is still improving cluster availability during a rebalance, because it doesn't revoke any partitions during the process. Also, the perk is the minimal configuration requirement :)
> >>>>>>>>
> >>>>>>>> Best,
> >>>>>>>> Boyang
> >>>>>>>>
> >>>>>>>> ________________________________
> >>>>>>>> From: Matthias J. Sax <matth...@confluent.io>
> >>>>>>>> Sent: Tuesday, April 9, 2019 7:47 AM
> >>>>>>>> To: dev
> >>>>>>>> Subject: Re: [DISCUSS] KIP-429 : Smooth Auto-Scaling for Kafka Streams
> >>>>>>>>
> >>>>>>>> Thanks for the KIP, Boyang and Guozhang!
> >>>>>>>>
> >>>>>>>> I made an initial pass and have some questions/comments. One high-level comment: it seems that the KIP "mixes" the plain consumer and Kafka Streams use cases a little bit (at least in the presentation). It might be helpful to separate both cases clearly, or maybe limit the scope to the plain consumer only.
> >>>>>>>>
> >>>>>>>> 10) For `PartitionAssignor.Assignment`: It seems we need a new method `List<TopicPartitions> revokedPartitions()` ?
> >>>>>>>>
> >>>>>>>> 20) In Section "Consumer Coordinator Algorithm"
> >>>>>>>>
> >>>>>>>> Bullet point "1a)": If the subscription changes and a topic is removed from the subscription, why do we not revoke the partitions?
> >>>>>>>>
> >>>>>>>> Bullet point "1a)": What happens if a topic is deleted (or a partition is removed/deleted from a topic)? Should we call the new `onPartitionsEmigrated()` callback for this case?
> >>>>>>>>
> >>>>>>>> Bullet point "2b)": Should we update the `PartitionAssignor` interface to pass in the "old assignment" as a third parameter into `assign()`?
> >>>>>>>>
> >>>>>>>> 30) Rebalance delay (as used in KIP-415): Could a rebalance delay subsume KIP-345? Configuring static members is rather complicated, and I am wondering if a rebalance delay would be sufficient?
> >>>>>>>>
> >>>>>>>> 40) Quote: "otherwise the we would fall into the case 3.b) forever."
> >>>>>>>>
> >>>>>>>> What is "case 3.b" ?
> >>>>>>>>
> >>>>>>>> 50) Section "Looking into the Future"
> >>>>>>>>
> >>>>>>>> Nit: the new "ProtocolVersion" field is missing in the first line describing "JoinGroupRequest"
> >>>>>>>>
> >>>>>>>>> This can also help saving "version probing" cost on Streams as well.
> >>>>>>>>
> >>>>>>>> How does this relate to the Kafka Streams "version probing" implementation? How can we exploit the new `ProtocolVersion` in Streams to improve "version probing"? I have a rough idea, but would like to hear more details.
> >>>>>>>>
> >>>>>>>> 60) Section "Recommended Upgrade Procedure"
> >>>>>>>>
> >>>>>>>>> Set the `stream.rebalancing.mode` to `upgrading`, which will force the stream application to stay with protocol type "consumer".
> >>>>>>>>
> >>>>>>>> This config is not discussed in the KIP and appears in this section without context. Can you elaborate on it?
> >>>>>>>>
> >>>>>>>> -Matthias
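As a side note on the `onPartitionsEmigrated()` question in 20) above: the callback that eventually shipped with KIP-429 was named `onPartitionsLost`, added to `ConsumerRebalanceListener` with a default implementation that delegates to `onPartitionsRevoked` for compatibility. A sketch of a listener under the cooperative protocol, using the final Kafka 2.4 API:

    import java.util.Collection;
    import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
    import org.apache.kafka.common.TopicPartition;

    public class CooperativeListener implements ConsumerRebalanceListener {
        @Override
        public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
            // Under the cooperative protocol this is called with only the subset
            // of partitions actually being migrated away, not the full assignment.
            System.out.println("Revoked: " + partitions);
        }

        @Override
        public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
            // Likewise, only the newly added partitions are passed here.
            System.out.println("Newly assigned: " + partitions);
        }

        @Override
        public void onPartitionsLost(Collection<TopicPartition> partitions) {
            // Covers partitions lost without a clean revocation (e.g. after
            // missing a rebalance); the interface default delegates to
            // onPartitionsRevoked so existing code keeps compiling.
            System.out.println("Lost: " + partitions);
        }
    }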
> >>>>>>>>
> >>>>>>>> On 3/29/19 6:20 PM, Guozhang Wang wrote:
> >>>>>>>>> Bumping this discussion thread. I've added a few new drawings for better illustration; I would really appreciate your feedback.
> >>>>>>>>>
> >>>>>>>>> Guozhang
> >>>>>>>>>
> >>>>>>>>> On Wed, Mar 20, 2019 at 6:17 PM Guozhang Wang <wangg...@gmail.com> wrote:
> >>>>>>>>>
> >>>>>>>>>> Hello Boyang,
> >>>>>>>>>>
> >>>>>>>>>> I've made another thorough pass over this KIP and I'd like to split it into two parts: the first part, covered in KIP-429, would touch the Consumer Coordinator only, to put the incremental rebalance protocol in place. The second part (for now I've reserved KIP number 444 for it) would contain all the changes on StreamsPartitionAssignor to allow warming up new members.
> >>>>>>>>>>
> >>>>>>>>>> I think the first part, a.k.a. the currently updated KIP-429, is ready for review and discussion again. Would love to hear people's feedback and ideas.
> >>>>>>>>>>
> >>>>>>>>>> Guozhang
> >>>>>>>>>>
> >>>>>>>>>> On Mon, Mar 4, 2019 at 10:09 AM Boyang Chen <bche...@outlook.com> wrote:
> >>>>>>>>>>
> >>>>>>>>>>> Thanks Guozhang for the great questions. Answers are inlined:
> >>>>>>>>>>>
> >>>>>>>>>>> 1. I'm still not sure if it's worthwhile to add a new type of "learner task" in addition to "standby task": if the only difference is that for the latter, we would consider workload balance while for the former we would not, I think we can just adjust the logic of StickyTaskAssignor a bit to break that difference. Adding a new type of task would be adding a lot of code complexity, so if we can still piggy-back the logic on a standby task I would prefer to do so.
> >>>>>>>>>>>
> >>>>>>>>>>> In the proposal we stated that we are not adding a new type of task implementation. The learner task shall share the same implementation with the normal standby task, only that we shall tag the standby task as a learner and prioritize the learner tasks' replay effort.
> >>>>>>>>>>>
> >>>>>>>>>>> 2. One thing that's still not clear from the KIP wiki itself is which layer the logic would be implemented at. Although for most KIPs we would not require internal implementation details but only public-facing API updates, for a KIP like this I think it still requires fleshing out details of the implementation design. More specifically: today Streams embeds a full-fledged Consumer client, which hard-codes a ConsumerCoordinator inside; Streams then injects a StreamsPartitionAssignor into its pluggable PartitionAssignor interface, and inside the StreamsPartitionAssignor we also have a TaskAssignor interface whose default implementation is StickyPartitionAssignor. The Streams partition assignor logic today sits in the latter two classes. Hence the hierarchy today is:
> >>>>>>>>>>>
> >>>>>>>>>>> KafkaConsumer -> ConsumerCoordinator -> StreamsPartitionAssignor -> StickyTaskAssignor.
> >>>>>>>>>>>
> >>>>>>>>>>> We need to think about where the proposed implementation would take place, and personally I think it is not the best option to inject all of it into the StreamsPartitionAssignor / StickyTaskAssignor, since the logic of "triggering another rebalance" etc. would require some coordinator logic which is hard to mimic at the PartitionAssignor level. On the other hand, since we are embedding a KafkaConsumer client as a whole, we cannot just replace ConsumerCoordinator with a specialized StreamsCoordinator like Connect does
So I'd like to maybe split the current proposal in > >>>>> both > >>>>>>>>>>> consumer layer and streams-assignor layer like we did in > >>>>>>>> KIP-98/KIP-129. > >>>>>>>>>>> And then the key thing to consider is how to cut off the > >>>>> boundary > >>>>>> so > >>>>>>>> that > >>>>>>>>>>> the modifications we push to ConsumerCoordinator would be > >>>>>> beneficial > >>>>>>>>>>> universally for any consumers, while keep the Streams-specific > >>>>>> logic > >>>>>>> at > >>>>>>>>>>> the > >>>>>>>>>>> assignor level. > >>>>>>>>>>> Yes, that's also my ideal plan. The details for the > >>>>> implementation > >>>>>>> are > >>>>>>>>>>> depicted > >>>>>>>>>>> in this doc< > >>>>>>>>>>> > >>>>>>>> > >>>>>>> > >>>>>> > >>>>> > >> > https://docs.google.com/document/d/1me2a5wvxAZT1QE6HkwyDl7C2TiBQlKN3Dpw_I1ro91U/edit#heading=h.qix74qdmekae > >>>>>>>>> , > >>>>>>>> [ > >>>>>>>> > >>>>>>> > >>>>>> > >>>>> > >> > https://lh5.googleusercontent.com/DXWMyKNE9rFFIv7TNX56Q41QwqYp8ynivwWSJHHORqSRkoQxtraW2bqiB-NRUGAMYKkt8A=w1200-h630-p > >>>>>>>> ]< > >>>>>>>> > >>>>>>> > >>>>>> > >>>>> > >> > https://docs.google.com/document/d/1me2a5wvxAZT1QE6HkwyDl7C2TiBQlKN3Dpw_I1ro91U/edit#heading=h.qix74qdmekae > >>>>>>>>> > >>>>>>>> > >>>>>>>> [External] KStream Smooth Auto-scaling Implementation Plan< > >>>>>>>> > >>>>>>> > >>>>>> > >>>>> > >> > https://docs.google.com/document/d/1me2a5wvxAZT1QE6HkwyDl7C2TiBQlKN3Dpw_I1ro91U/edit#heading=h.qix74qdmekae > >>>>>>>>> > >>>>>>>> docs.google.com > >>>>>>>> KStream Incremental Rebalancing Implementation Plan Authors: > Boyang > >>>>>> Chen, > >>>>>>>> Guozhang Wang KIP link Stage: [Draft | Review | Approved] > >>>>> Background We > >>>>>>>> initiated KIP-429 for the promotion of incremental rebalancing > work > >>>>> for > >>>>>>>> KStream. Behind the scene, there is non-trivial amount of effort > >>>>> that > >>>>>>> needs > >>>>>>>> to... > >>>>>>>> > >>>>>>>> > >>>>>>>> > >>>>>>>>>>> and I have explained the reasoning on why we want to push a > >>>>>>>>>>> global change of replacing ConsumerCoordinator with > >>>>>>> StreamCoordinator. > >>>>>>>>>>> The motivation > >>>>>>>>>>> is that KIP space is usually used for public & algorithm level > >>>>>>> change, > >>>>>>>>>>> not for internal > >>>>>>>>>>> implementation details. > >>>>>>>>>>> > >>>>>>>>>>> 3. Depending on which design direction we choose, our migration > >>>>>> plan > >>>>>>>> would > >>>>>>>>>>> also be quite different. For example, if we stay with > >>>>>>>> ConsumerCoordinator > >>>>>>>>>>> whose protocol type is "consumer" still, and we can manage to > >>>>> make > >>>>>>> all > >>>>>>>>>>> changes agnostic to brokers as well as to old versioned > >>>>> consumers, > >>>>>>> then > >>>>>>>>>>> our > >>>>>>>>>>> migration plan could be much easier. > >>>>>>>>>>> Yes, the upgrade plan was designed to take the new > >>>>>> StreamCoordinator > >>>>>>>>>>> approach > >>>>>>>>>>> which means we shall define a new protocol type. For existing > >>>>>>>> application > >>>>>>>>>>> we could only > >>>>>>>>>>> maintain the same `consumer` protocol type is because current > >>>>>> broker > >>>>>>>> only > >>>>>>>>>>> allows > >>>>>>>>>>> change of protocol type when the consumer group is empty. It is > >>>>> of > >>>>>>>> course > >>>>>>>>>>> user-unfriendly to force > >>>>>>>>>>> a wipe-out for the entire application, and I don't think > >>>>>> maintaining > >>>>>>>> old > >>>>>>>>>>> protocol type would greatly > >>>>>>>>>>> impact ongoing services using new stream coordinator. WDYT? > >>>>>>>>>>> > >>>>>>>>>>> 4. 
> >>>>>>>>>>> 4. I think one major issue related to this KIP is that today, in the StickyPartitionAssignor, we always try to honor stickiness over workload balance, and hence a "learner task" is needed to break this priority, but I'm wondering if we can have a better solution within the sticky task assignor that accommodates this?
> >>>>>>>>>>>
> >>>>>>>>>>> Great question! That's what I explained in the proposal, which is that we should break down our delivery into different stages. At the very beginning, our goal is to trigger learner task assignment only on `new` hosts, which we shall figure out by leveraging the leader's knowledge of the previous round of rebalance. After stage one, our goal is a smooth scaling-up experience, but the task balance problem is kind of orthogonal. The load balance problem is a much broader topic than auto-scaling, which I figure is worth discussing within this KIP's context since it's a natural next step, but it wouldn't be the main topic. Learner task or auto-scaling support should be treated as `a helpful mechanism to reach load balance`, but not `an algorithm defining load balance`. It would be great if you could share some insights on stream task balance, which would eventually help us break out of KIP-429's scope and even define a separate KIP to focus on task weight & assignment logic improvements.
> >>>>>>>>>>>
> >>>>>>>>>>> Also, thank you for the improvements to the KIP's content and organization!
> >>>>>>>>>>>
> >>>>>>>>>>> Best,
> >>>>>>>>>>> Boyang
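Since the learner-task discussion above piggy-backs on standby tasks: in today's Streams, warm copies of task state are controlled by a single config, which is the machinery a learner task would reuse. A sketch (the application id and bootstrap address are placeholders):

    import java.util.Properties;
    import org.apache.kafka.streams.StreamsConfig;

    public class StandbyConfig {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put(StreamsConfig.APPLICATION_ID_CONFIG, "demo-app");
            props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
            // Standby replicas keep warm copies of task state on other instances.
            // The "learner task" discussed above would reuse this machinery: a
            // standby tagged as a learner on a new host, replayed with priority
            // until it catches up and can take over the active task.
            props.put(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, 1);
        }
    }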
> >>>>>>>>>>> ________________________________
> >>>>>>>>>>> From: Guozhang Wang <wangg...@gmail.com>
> >>>>>>>>>>> Sent: Saturday, March 2, 2019 6:00 AM
> >>>>>>>>>>> To: dev
> >>>>>>>>>>> Subject: Re: [DISCUSS] KIP-429 : Smooth Auto-Scaling for Kafka Streams
> >>>>>>>>>>>
> >>>>>>>>>>> Hello Boyang,
> >>>>>>>>>>>
> >>>>>>>>>>> I've just made a quick pass on the KIP and here are some thoughts.
> >>>>>>>>>>>
> >>>>>>>>>>> Meta:
> >>>>>>>>>>>
> >>>>>>>>>>> 1. I'm still not sure if it's worthwhile to add a new type of "learner task" in addition to "standby task": if the only difference is that for the latter, we would consider workload balance while for the former we would not, I think we can just adjust the logic of StickyTaskAssignor a bit to break that difference. Adding a new type of task would be adding a lot of code complexity, so if we can still piggy-back the logic on a standby task I would prefer to do so.
> >>>>>>>>>>>
> >>>>>>>>>>> 2. One thing that's still not clear from the KIP wiki itself is which layer the logic would be implemented at. Although for most KIPs we would not require internal implementation details but only public-facing API updates, for a KIP like this I think it still requires fleshing out details of the implementation design. More specifically: today Streams embeds a full-fledged Consumer client, which hard-codes a ConsumerCoordinator inside; Streams then injects a StreamsPartitionAssignor into its pluggable PartitionAssignor interface, and inside the StreamsPartitionAssignor we also have a TaskAssignor interface whose default implementation is StickyPartitionAssignor. The Streams partition assignor logic today sits in the latter two classes. Hence the hierarchy today is:
> >>>>>>>>>>>
> >>>>>>>>>>> KafkaConsumer -> ConsumerCoordinator -> StreamsPartitionAssignor -> StickyTaskAssignor.
> >>>>>>>>>>>
> >>>>>>>>>>> We need to think about where the proposed implementation would take place, and personally I think it is not the best option to inject all of it into the StreamsPartitionAssignor / StickyTaskAssignor, since the logic of "triggering another rebalance" etc. would require some coordinator logic which is hard to mimic at the PartitionAssignor level. On the other hand, since we are embedding a KafkaConsumer client as a whole, we cannot just replace ConsumerCoordinator with a specialized StreamsCoordinator like Connect does in KIP-415. So I'd like to maybe split the current proposal into both a consumer layer and a streams-assignor layer like we did in KIP-98/KIP-129. And then the key thing to consider is how to cut the boundary so that the modifications we push to ConsumerCoordinator would be universally beneficial for any consumer, while keeping the Streams-specific logic at the assignor level.
> >>>>>>>>>>>
> >>>>>>>>>>> 3. Depending on which design direction we choose, our migration plan would also be quite different. For example, if we stay with ConsumerCoordinator, whose protocol type is still "consumer", and we can manage to make all changes agnostic to brokers as well as to old-versioned consumers, then our migration plan could be much easier.
> >>>>>>>>>>>
> >>>>>>>>>>> 4. I think one major issue related to this KIP is that today, in the StickyPartitionAssignor, we always try to honor stickiness over workload balance, and hence a "learner task" is needed to break this priority, but I'm wondering if we can have a better solution within the sticky task assignor that accommodates this?
> >>>>>>>>>>>
> >>>>>>>>>>> Minor:
> >>>>>>>>>>>
> >>>>>>>>>>> 1. The idea of two rebalances has also been discussed in https://issues.apache.org/jira/browse/KAFKA-6145.
> >>>>>>>>>>> So we should add the reference on the wiki page as well.
> >>>>>>>>>>>
> >>>>>>>>>>> 2. Could you also add a section describing how the subscription / assignment metadata will be re-formatted? Without this information it is hard to get to the bottom of your idea. For example, in the "Leader Transfer Before Scaling" section, I'm not sure why "S2 doesn't know S4 is new member" and hence would blindly obey stickiness over the workload balance requirement.
> >>>>>>>>>>>
> >>>>>>>>>>> Guozhang
> >>>>>>>>>>>
> >>>>>>>>>>> On Thu, Feb 28, 2019 at 11:05 AM Boyang Chen <bche...@outlook.com> wrote:
> >>>>>>>>>>>
> >>>>>>>>>>>> Hey community friends,
> >>>>>>>>>>>>
> >>>>>>>>>>>> I'm glad to invite you to have a look at the proposal to add incremental rebalancing to Kafka Streams, a.k.a. auto-scaling support.
> >>>>>>>>>>>>
> >>>>>>>>>>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-429%3A+Smooth+Auto-Scaling+for+Kafka+Streams
> >>>>>>>>>>>>
> >>>>>>>>>>>> Special thanks to Guozhang for giving great guidance and important feedback while making this KIP!
> >>>>>>>>>>>>
> >>>>>>>>>>>> Best,
> >>>>>>>>>>>> Boyang
> >>>>>>>>>>>
> >>>>>>>>>>> --
> >>>>>>>>>>> -- Guozhang
> >>>>>>>>>>
> >>>>>>>>>> --
> >>>>>>>>>> -- Guozhang
> >>>>>>>
> >>>>>>> --
> >>>>>>> -- Guozhang
> >>>>>
> >>>>> --
> >>>>> -- Guozhang
> >>>>
> >>>> --
> >>>> -- Guozhang

--
-- Guozhang