Hello Matthias, I'm proposing to change this behavior holistically inside ConsumerCoordinator, actually. In other words, I'm trying to piggy-back the behavioral fix of KAFKA-4600 along with this KIP, and the motivation for this piggy-backing is that, with incremental rebalancing, only a subset of partitions would be affected since we are not revoking everybody's partitions any more.
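To make the partial-revocation point concrete, here is a minimal sketch of a rebalance listener under the cooperative protocol (the class name and log lines are illustrative only; the listener interface and its callbacks are the existing consumer API). The key difference is that onPartitionsRevoked may now receive just the migrating subset of the owned partitions, possibly an empty set, instead of everything, which is why the callback error handling needs to be defined holistically rather than assuming all-or-nothing revocation:

import java.util.Collection;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.common.TopicPartition;

// Illustrative sketch: under cooperative rebalancing the revoked set can be
// a strict subset of the owned partitions (possibly empty), so callback
// error handling can no longer assume an all-or-nothing revocation.
public class PartialRevocationListener implements ConsumerRebalanceListener {
    @Override
    public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
        // Only the partitions that are migrating away are passed in here;
        // commit offsets and clean up state for just this subset.
        for (TopicPartition tp : partitions) {
            System.out.println("Revoked: " + tp);
        }
    }

    @Override
    public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
        // Likewise, only the newly added partitions show up here.
        for (TopicPartition tp : partitions) {
            System.out.println("Newly assigned: " + tp);
        }
    }
}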
Guozhang On Thu, May 9, 2019 at 6:21 AM Matthias J. Sax <matth...@confluent.io> wrote: > Thanks Guozhang! > > The simplified upgrade path is great! > > > Just a clarification question about the "Rebalance Callback Error > Handling" -- does this change affect the `ConsumerCoordinator` only if > incremental rebalancing is used? Or does the behavior also change for the > eager rebalancing case? > > > -Matthias > > > On 5/9/19 3:37 AM, Guozhang Wang wrote: > > Hello all, > > > > Thanks to everyone who has shared their feedback on this KIP! If > there are > > no further comments I'll start the voting thread by end of tomorrow. > > > > > > Guozhang. > > > > On Wed, May 8, 2019 at 6:36 PM Guozhang Wang <wangg...@gmail.com> wrote: > > > >> Hello Boyang, > >> > >> On Wed, May 1, 2019 at 4:51 PM Boyang Chen <bche...@outlook.com> wrote: > >> > >>> Hey Guozhang, > >>> > >>> thank you for the great write-up. Overall the motivation and changes > >>> LGTM, just some minor comments: > >>> > >>> > >>> 1. In "Consumer Coordinator Algorithm", we could reorder the alphabetical > >>> points 3d~3f from ["ready-to-migrate-partitions", > >>> "unknown-but-owned-partitions", "maybe-revoking-partitions"] to > >>> ["maybe-revoking-partitions", "ready-to-migrate-partitions", > >>> "unknown-but-owned-partitions"] in order to be consistent with 3c1~3. > >>> > >> > >> Ack. Updated. > >> > >> > >>> 2. In "Consumer Coordinator Algorithm", 1c suggests revoking all > >>> partitions upon heartbeat/commit failure. What's the gain here? Do we want > to > >>> keep all partitions running at this moment, to be optimistic for the > case > >>> when no partitions get reassigned? > >>> > >> > >> That's a good catch. When REBALANCE_IN_PROGRESS is received, we can just > >> re-join the group with all the currently owned partitions encoded. > Updated. > >> > >> > >>> 3. In "Recommended Upgrade Procedure", remove extra 'those': " The > >>> 'sticky' assignor works even those there are " > >>> > >> > >> Ack, should be `even when`. > >> > >> > >>> 4. Put the two "looking into the future" parts into a separate category from > >>> the migration section. It seems inconsistent for readers to see this > before we've > >>> finished discussing everything. > >>> > >> > >> Ack. > >> > >> > >>> 5. Have we discussed the concern on the serialization? Could the new > >>> metadata we are adding grow larger than the message size cap? > >>> > >> > >> We're completing https://issues.apache.org/jira/browse/KAFKA-7149 which > >> should largely reduce the message size (will update the wiki > accordingly as > >> well). > >> > >> > >>> > >>> Boyang > >>> > >>> ________________________________ > >>> From: Guozhang Wang <wangg...@gmail.com> > >>> Sent: Monday, April 15, 2019 9:20 AM > >>> To: dev > >>> Subject: Re: [DISCUSS] KIP-429 : Smooth Auto-Scaling for Kafka Streams > >>> > >>> Hello Jason, > >>> > >>> I agree with you that for range / round-robin it makes less sense to be > >>> compatible with the cooperative rebalance protocol. > >>> > >>> As for StickyAssignor, however, I think it would still be possible to > make > >>> the current implementation compatible with cooperative > rebalance. 
So > >>> after pondering on different options at hand I'm now proposing this > >>> approach as listed in the upgrade section: > >>> > >>> > >>> > https://cwiki.apache.org/confluence/display/KAFKA/KIP-429%3A+Kafka+Consumer+Incremental+Rebalance+Protocol#KIP-429:KafkaConsumerIncrementalRebalanceProtocol-CompatibilityandUpgradePath > >>> > >>> The idea is to let assignors specify which protocols they would work > with, > >>> associating with a different name; then the upgrade path would involve > a > >>> "compatible" protocol which actually still uses the eager behavior while > >>> encoding two assignors if possible. In the "Rejected" section (just to > clarify > >>> I'm not finalizing it as rejected, just putting it there for now, and > if > >>> we > >>> like this one instead we can always switch them) I listed the other > >>> approach we once discussed, arguing that its cons of a duplicated > class > >>> seem to overwhelm the pros of saving the "rebalance.protocol" config. > >>> > >>> Let me know WDYT. > >>> > >>> Guozhang > >>> > >>> On Fri, Apr 12, 2019 at 6:08 PM Jason Gustafson <ja...@confluent.io> > >>> wrote: > >>> > >>>> Hi Guozhang, > >>>> > >>>> Responses below: > >>>> > >>>> 2. The interface's default implementation will just be > >>>>> `onPartitionsRevoked`, so for users' implementations, if they do not make > >>> any > >>>>> code changes they should be able to recompile the code and continue. > >>>> > >>>> > >>>> Ack, makes sense. > >>>> > >>>> 4. Hmm.. not sure if it will work. The main issue is that the > >>>>> consumer-coordinator behavior (whether to revoke all or none at > >>>>> onRebalancePrepare) is independent of the selected protocol's > assignor > >>>>> (eager or cooperative), so even if the assignor is selected to be the > >>>>> old-versioned one, we will still not revoke at the > >>> consumer-coordinator > >>>>> layer and hence have the same risk of migrating still-owned > partitions, > >>>>> right? > >>>> > >>>> > >>>> Yeah, basically we would have to push the eager/cooperative logic into > >>> the > >>>> PartitionAssignor itself and make the consumer aware of the rebalance > >>>> protocol it is compatible with. As long as an eager protocol _could_ > be > >>>> selected, the consumer would have to be pessimistic and do eager > >>>> revocation. But if all the assignors configured in the consumer > support > >>>> cooperative reassignment, then either 1) a cooperative protocol will > be > >>>> selected and cooperative revocation can be safely used, or 2) if the > >>> rest > >>>> of the group does not support it, then the consumer will simply fail. > >>>> > >>>> Another point which you raised offline and I will repeat here is that > >>> this > >>>> proposal's benefit is mostly limited to sticky assignment logic. > >>> Arguably > >>>> the range assignor may have some incidental stickiness, particularly > if > >>> the > >>>> group is rebalancing for a newly created or deleted topic. For other > >>> cases, > >>>> the proposal is mostly additional overhead since it takes an > additional > >>>> rebalance and many of the partitions will move. Perhaps it doesn't > make > >>> as > >>>> much sense to use the cooperative protocol for strategies like range > and > >>>> round-robin. That kind of argues in favor of pushing some of the > control > >>>> into the assignor itself. Maybe we would not bother creating > >>>> CooperativeRange as I suggested above, but it would make sense to > >>> create a > >>>> cooperative version of the sticky assignment strategy. 
I thought we > >>> might > >>>> have to create a new sticky assignor anyway because I can't see how we > >>>> would get compatible behavior mixing with the old version. > >>>> > >>>> Thanks, > >>>> Jason > >>>> > >>>> > >>>> On Thu, Apr 11, 2019 at 5:53 PM Guozhang Wang <wangg...@gmail.com> > >>> wrote: > >>>> > >>>>> Hello Matthias: > >>>>> > >>>>> Thanks for your review. > >>>>> > >>>>> The background section uses the streams assignor as well as the > consumer's > >>>> own > >>>>> sticky assignor as examples illustrating the situation, but this KIP > is > >>>> for > >>>>> the consumer coordinator itself, and the rest of the paragraph did not > >>> talk > >>>>> about Streams any more. If you feel it's a bit distracting I can > remove > >>>>> those examples. > >>>>> > >>>>> 10). While working on the PR I realized that the revoked partitions > on > >>>>> assignment are not needed (this is being discussed on the PR itself: > >>>>> https://github.com/apache/kafka/pull/6528#issuecomment-480009890 > >>>>> > >>>>> 20). 1.a. Good question, I've updated the wiki to let the consumer > >>>>> clean up its assignment and re-join, and not let the assignor make any > >>>>> proactive changes. The idea is to keep the logic simpler and not do any > >>>>> "split brain" stuff. > >>>>> > >>>>> 20). 2.b. No, we do not need it, since the owned-partitions will be part > >>> of > >>>> the > >>>>> Subscription passed in to assign() already. > >>>>> > >>>>> 30). As Boyang mentioned, there are still some drawbacks that cannot be > >>>>> addressed by the rebalance delay, hence we still voted for KIP-345 (some > >>> more > >>>>> details can be found on the discussion thread of KIP-345 itself). One > >>>>> example is that as the instance resumes, its member id will be empty > >>> so > >>>> we > >>>>> are still relying on the assignor to give it the assignment from the old > >>>>> member-id while keeping all other members' assignments unchanged. > >>>>> > >>>>> 40). Incomplete sentence, I've updated it. > >>>>> > >>>>> 50). Here's my idea: suppose we augment the join group schema with > >>>>> `protocol version` in 2.3, and then with both brokers and clients > >>> being > >>>> in > >>>>> version 2.3+, on the first rolling bounce where subscription and > >>>> assignment > >>>>> schema and / or user metadata has changed, this protocol version will > >>> be > >>>>> bumped. On the broker side, when receiving all members' join-group > >>>> requests, > >>>>> it will choose the one that has the highest protocol version (it also > >>>>> assumes a higher-versioned protocol is always backward compatible, i.e. > >>> the > >>>>> coordinator can recognize lower-versioned protocols as well) and > >>> select it > >>>>> as the leader. Then the leader can decide, based on its received and > >>>>> deserialized subscription information, how to assign partitions and > >>> how > >>>> to > >>>>> encode the assignment accordingly so that everyone can understand it. > >>>> With > >>>>> this, in Streams for example, no version probing would be needed > >>> since we > >>>>> are guaranteed the leader knows everyone's version -- again it is > >>>> assuming > >>>>> that a higher-versioned protocol is always backward compatible -- and > >>> hence > >>>>> can successfully do the assignment at that round. > >>>>> > >>>>> 60). My bad, this section was not updated as the design > >>> evolved; > >>>>> I've updated it. > >>>>> > >>>>> > >>>>> On Tue, Apr 9, 2019 at 7:22 PM Boyang Chen <bche...@outlook.com> > >>> wrote: > >>>>> > >>>>>> > >>>>>> Thanks for the review, Matthias! 
My 2 cents on the rebalance delay is > >>>> that > >>>>>> it is a rather fixed trade-off between > >>>>>> > >>>>>> task availability and resource shuffling. If we eventually trigger > >>>>>> a rebalance after a rolling bounce, certain consumer > >>>>>> > >>>>>> setups are still faced with global shuffles, for example a member.id > >>>> ranking > >>>>>> based round-robin strategy, as rejoining dynamic > >>>>>> > >>>>>> members will be assigned new member.ids, which reorders the > >>>>>> assignment. So I think the primary goal of incremental > >>>>>> > >>>>>> rebalancing is still improving the cluster availability during > >>>> rebalance, > >>>>>> because it doesn't revoke any partitions during this > >>>>>> > >>>>>> process. Also, the perk is the minimal configuration requirement :) > >>>>>> > >>>>>> > >>>>>> Best, > >>>>>> > >>>>>> Boyang > >>>>>> > >>>>>> ________________________________ > >>>>>> From: Matthias J. Sax <matth...@confluent.io> > >>>>>> Sent: Tuesday, April 9, 2019 7:47 AM > >>>>>> To: dev > >>>>>> Subject: Re: [DISCUSS] KIP-429 : Smooth Auto-Scaling for Kafka > >>> Streams > >>>>>> > >>>>>> Thanks for the KIP, Boyang and Guozhang! > >>>>>> > >>>>>> > >>>>>> I made an initial pass and have some questions/comments. One high > >>> level > >>>>>> comment: it seems that the KIP "mixes" plain consumer and Kafka > >>> Streams > >>>>>> use cases a little bit (at least in the presentation). It might be > >>>>>> helpful to separate both cases clearly, or maybe limit the scope to > >>>>>> plain consumer only. > >>>>>> > >>>>>> > >>>>>> > >>>>>> 10) For `PartitionAssignor.Assignment`: It seems we need a new > >>> method > >>>>>> `List<TopicPartition> revokedPartitions()` ? > >>>>>> > >>>>>> > >>>>>> > >>>>>> 20) In Section "Consumer Coordinator Algorithm" > >>>>>> > >>>>>> Bullet point "1a)": If the subscription changes and a topic is > >>>>>> removed from the subscription, why do we not revoke the partitions? > >>>>>> > >>>>>> Bullet point "1a)": What happens if a topic is deleted (or a > >>>>>> partition is removed/deleted from a topic)? Should we call the new > >>>>>> `onPartitionsEmigrated()` callback for this case? > >>>>>> > >>>>>> Bullet point "2b)" Should we update the `PartitionAssignor` > >>>>>> interface to pass in the "old assignment" as third parameter into > >>>>>> `assign()`? > >>>>>> > >>>>>> > >>>>>> > >>>>>> 30) Rebalance delay (as used in KIP-415): Could a rebalance delay > >>>>>> subsume KIP-345? Configuring static members is rather complicated, > >>> and > >>>> I > >>>>>> am wondering if a rebalance delay would be sufficient? > >>>>>> > >>>>>> > >>>>>> > >>>>>> 40) Quote: "otherwise the we would fall into the case 3.b) forever." > >>>>>> > >>>>>> What is "case 3.b" ? > >>>>>> > >>>>>> > >>>>>> > >>>>>> 50) Section "Looking into the Future" > >>>>>> > >>>>>> Nit: the new "ProtocolVersion" field is missing in the first line > >>>>>> describing "JoinGroupRequest" > >>>>>> > >>>>>>> This can also help saving "version probing" cost on Streams as > >>> well. > >>>>>> > >>>>>> How does this relate to Kafka Streams "version probing" > >>> implementation? > >>>>>> How can we exploit the new `ProtocolVersion` in Streams to improve > >>>>>> "version probing" ? I have a rough idea, but would like to hear more > >>>>>> details. > >>>>>> > >>>>>> > >>>>>> > >>>>>> 60) Section "Recommended Upgrade Procedure" > >>>>>> > >>>>>>> Set the `stream.rebalancing.mode` to `upgrading`, which will force > >>>> the > >>>>>> stream application to stay with protocol type "consumer". 
> >>>>>> > >>>>>> This config is not discussed in the KIP and appears in this section > >>>>>> without context. Can you elaborate on it? > >>>>>> > >>>>>> > >>>>>> > >>>>>> -Matthias > >>>>>> > >>>>>> > >>>>>> > >>>>>> > >>>>>> On 3/29/19 6:20 PM, Guozhang Wang wrote: > >>>>>>> Bumping up this discussion thread. I've added a few new drawings > >>> for > >>>>>> better > >>>>>>> illustration, would really appreciate your feedback. > >>>>>>> > >>>>>>> > >>>>>>> Guozhang > >>>>>>> > >>>>>>> On Wed, Mar 20, 2019 at 6:17 PM Guozhang Wang <wangg...@gmail.com > >>>> > >>>>>> wrote: > >>>>>>> > >>>>>>>> Hello Boyang, > >>>>>>>> > >>>>>>>> I've made another thorough pass over this KIP and I'd like to > split > >>>> it > >>>>>>>> into two parts: the first part, covered in KIP-429, would touch > >>>>> on > >>>>>>>> the Consumer Coordinator only to put the incremental rebalance protocol > in > >>>>>> place. > >>>>>>>> The second part (for now I've reserved KIP number 444 for it) > would > >>>>>> contain > >>>>>>>> all the changes on StreamsPartitionAssignor to allow warming up > new > >>>>>>>> members. > >>>>>>>> > >>>>>>>> I think the first part, a.k.a. the current updated KIP-429, is > ready > >>>>> for > >>>>>>>> review and discussions again. Would love to hear people's > feedback > >>>>> and > >>>>>>>> ideas. > >>>>>>>> > >>>>>>>> Guozhang > >>>>>>>> > >>>>>>>> > >>>>>>>> > >>>>>>>> On Mon, Mar 4, 2019 at 10:09 AM Boyang Chen <bche...@outlook.com > >>>> > >>>>>> wrote: > >>>>>>>> > >>>>>>>>> Thanks Guozhang for the great questions. Answers are inlined: > >>>>>>>>> > >>>>>>>>> 1. I'm still not sure if it's worthwhile to add a new type of > >>>>> "learner > >>>>>>>>> task" in addition to "standby task": if the only difference is > >>> that > >>>>> for > >>>>>>>>> the > >>>>>>>>> latter, we would consider workload balance while for the former > >>> we > >>>>>> would > >>>>>>>>> not, I think we can just adjust the logic of StickyTaskAssignor > >>> a > >>>> bit > >>>>>> to > >>>>>>>>> break that difference. Adding a new type of task would be > >>> adding a > >>>>> lot > >>>>>> of > >>>>>>>>> code complexity, so if we can still piggy-back the logic on a > >>>>>> standby-task > >>>>>>>>> I would prefer to do so. > >>>>>>>>> In the proposal we stated that we are not adding a new type of > >>> task > >>>>>>>>> implementation. The > >>>>>>>>> learner task shall share the same implementation with the normal > >>>> standby > >>>>>>>>> task, only that we > >>>>>>>>> shall tag the standby task as a learner and prioritize the > >>> learner > >>>>>> tasks' > >>>>>>>>> replay effort. > >>>>>>>>> 2. One thing that's still not clear from the KIP wiki itself is > >>>> which > >>>>>>>>> layer > >>>>>>>>> would the logic be implemented at. Although for most KIPs we > >>> would > >>>>> not > >>>>>>>>> require internal implementation details but only public facing > >>> API > >>>>>>>>> updates, > >>>>>>>>> for a KIP like this I think it still requires to flesh out > >>> details > >>>> on > >>>>>> the > >>>>>>>>> implementation design. More specifically: today Streams embeds a > >>>> full- > >>>>>>>>> fledged Consumer client, which hard-codes a ConsumerCoordinator > >>>>> inside, > >>>>>>>>> Streams then injects a StreamsPartitionAssignor to its pluggable > >>>>>>>>> PartitionAssignor interface and inside the > >>> StreamsPartitionAssignor > >>>>> we > >>>>>>>>> also > >>>>>>>>> have a TaskAssignor interface whose default implementation is > >>>>>>>>> StickyPartitionAssignor. 
Streams partition assignor logic today > >>>> sits > >>>>>> in > >>>>>>>>> the latter two classes. Hence the hierarchy today is: > >>>>>>>>> > >>>>>>>>> KafkaConsumer -> ConsumerCoordinator -> > >>> StreamsPartitionAssignor -> > >>>>>>>>> StickyTaskAssignor. > >>>>>>>>> > >>>>>>>>> We need to think about where the proposed implementation would > >>> take > >>>>>> place, > >>>>>>>>> and personally I think it is not the best option to inject > >>> all > >>>> of > >>>>>> them > >>>>>>>>> into the StreamsPartitionAssignor / StickyTaskAssignor since the > >>>>> logic > >>>>>> of > >>>>>>>>> "triggering another rebalance" etc would require some > >>> coordinator > >>>>> logic > >>>>>>>>> which is hard to mimic at the PartitionAssignor level. On the other > >>>> hand, > >>>>>>>>> since > >>>>>>>>> we are embedding a KafkaConsumer client as a whole we cannot > >>> just > >>>>>> replace > >>>>>>>>> ConsumerCoordinator with a specialized StreamsCoordinator like > >>>>> Connect > >>>>>>>>> does > >>>>>>>>> in KIP-415. So I'd like to maybe split the current proposal into > >>> both > >>>>>>>>> the consumer layer and the streams-assignor layer like we did in > >>>>>> KIP-98/KIP-129. > >>>>>>>>> And then the key thing to consider is how to cut off the > >>> boundary > >>>> so > >>>>>> that > >>>>>>>>> the modifications we push to ConsumerCoordinator would be > >>>> beneficial > >>>>>>>>> universally for any consumers, while keeping the Streams-specific > >>>> logic > >>>>> at > >>>>>>>>> the > >>>>>>>>> assignor level. > >>>>>>>>> Yes, that's also my ideal plan. The details for the > >>> implementation > >>>>> are > >>>>>>>>> depicted > >>>>>>>>> in this doc: > >>>>>>>>> > https://docs.google.com/document/d/1me2a5wvxAZT1QE6HkwyDl7C2TiBQlKN3Dpw_I1ro91U/edit#heading=h.qix74qdmekae > >>>>>>>>> > >>>>>>>>> and I have explained the reasoning on why we want to push a > >>>>>>>>> global change of replacing ConsumerCoordinator with > >>>>> StreamCoordinator. > >>>>>>>>> The motivation > >>>>>>>>> is that KIP space is usually used for public & algorithm level > >>>>> changes, > >>>>>>>>> not for internal > >>>>>>>>> implementation details. > >>>>>>>>> > >>>>>>>>> 3. Depending on which design direction we choose, our migration > >>>> plan > >>>>>> would > >>>>>>>>> also be quite different. 
For example, if we stay with > >>>>>> ConsumerCoordinator > >>>>>>>>> whose protocol type is "consumer" still, and we can manage to > make > >>>>> all > >>>>>>>>> changes agnostic to brokers as well as to old-versioned > consumers, > >>>>> then > >>>>>>>>> our > >>>>>>>>> migration plan could be much easier. > >>>>>>>>> Yes, the upgrade plan was designed to take the new > >>>> StreamCoordinator > >>>>>>>>> approach > >>>>>>>>> which means we shall define a new protocol type. For existing > >>>>>> applications, > >>>>>>>>> the reason we could only > >>>>>>>>> maintain the same `consumer` protocol type is that the current > >>>> broker > >>>>>> only > >>>>>>>>> allows > >>>>>>>>> a change of protocol type when the consumer group is empty. It is > >>> of > >>>>>> course > >>>>>>>>> user-unfriendly to force > >>>>>>>>> a wipe-out for the entire application, and I don't think > >>>> maintaining the > >>>>>> old > >>>>>>>>> protocol type would greatly > >>>>>>>>> impact ongoing services using the new stream coordinator. WDYT? > >>>>>>>>> > >>>>>>>>> 4. I think one major issue related to this KIP is that today, in > >>>> the > >>>>>>>>> StickyPartitionAssignor, we always try to honor stickiness over > >>>>>> workload > >>>>>>>>> balance, and hence "learner task" is needed to break this > >>> priority, > >>>>> but > >>>>>>>>> I'm > >>>>>>>>> wondering if we can have a better solution within the sticky task > >>>>> assignor > >>>>>>>>> that > >>>>>>>>> accommodates this? > >>>>>>>>> Great question! That's what I explained in the proposal, which > >>> is > >>>>> that > >>>>>> we > >>>>>>>>> should break down our > >>>>>>>>> delivery into different stages. At the very beginning, our goal is > >>> to > >>>>>> trigger > >>>>>>>>> learner task assignment only on > >>>>>>>>> `new` hosts, where we shall leverage the leader's knowledge of the > >>> previous > >>>>>> round > >>>>>>>>> of rebalance to figure that out. After > >>>>>>>>> stage one, our goal is to have a smooth scaling-up experience, > >>> but > >>>>> the > >>>>>>>>> task balance problem is kind of orthogonal. > >>>>>>>>> The load balance problem is a much broader topic than auto > >>> scaling, > >>>>>> which > >>>>>>>>> I figure is worth discussing within > >>>>>>>>> this KIP's context since it's a natural next step, but it > >>> wouldn't > >>>> be > >>>>>> the > >>>>>>>>> main topic. > >>>>>>>>> Learner task or auto scaling support should be treated as `a > >>>> helpful > >>>>>>>>> mechanism to reach load balance`, but not `an algorithm defining > >>>> load > >>>>>>>>> balance`. It would be great if you could share some insights on > >>> the > >>>>>> stream > >>>>>>>>> task balance, which eventually helps us to break out of > >>>> KIP-429's > >>>>>> scope > >>>>>>>>> and even define a separate KIP to focus on task weight & > >>> assignment > >>>>>> logic > >>>>>>>>> improvement. > >>>>>>>>> > >>>>>>>>> Also thank you for making improvements to the KIP context and > >>>>>> organization! > >>>>>>>>> > >>>>>>>>> Best, > >>>>>>>>> Boyang > >>>>>>>>> ________________________________ > >>>>>>>>> From: Guozhang Wang <wangg...@gmail.com> > >>>>>>>>> Sent: Saturday, March 2, 2019 6:00 AM > >>>>>>>>> To: dev > >>>>>>>>> Subject: Re: [DISCUSS] KIP-429 : Smooth Auto-Scaling for Kafka > >>>>> Streams > >>>>>>>>> > >>>>>>>>> Hello Boyang, > >>>>>>>>> > >>>>>>>>> I've just made a quick pass on the KIP and here are some > >>> thoughts. > >>>>>>>>> > >>>>>>>>> Meta: > >>>>>>>>> > >>>>>>>>> 1. 
I'm still not sure if it's worthwhile to add a new type of > >>>>> "learner > >>>>>>>>> task" in addition to "standby task": if the only difference is > >>> that > >>>>> for > >>>>>>>>> the > >>>>>>>>> latter, we would consider workload balance while for the former > >>> we > >>>>>> would > >>>>>>>>> not, I think we can just adjust the logic of StickyTaskAssignor > >>> a > >>>> bit > >>>>>> to > >>>>>>>>> break that difference. Adding a new type of task would be > >>> adding a > >>>>> lot > >>>>>> of > >>>>>>>>> code complexity, so if we can still piggy-back the logic on a > >>>>>> standby-task > >>>>>>>>> I would prefer to do so. > >>>>>>>>> > >>>>>>>>> 2. One thing that's still not clear from the KIP wiki itself is > >>>> which > >>>>>>>>> layer > >>>>>>>>> would the logic be implemented at. Although for most KIPs we > >>> would > >>>>> not > >>>>>>>>> require internal implementation details but only public facing > >>> API > >>>>>>>>> updates, > >>>>>>>>> for a KIP like this I think it still requires to flesh out > >>> details > >>>> on > >>>>>> the > >>>>>>>>> implementation design. More specifically: today Streams embeds a > >>>> full- > >>>>>>>>> fledged Consumer client, which hard-codes a ConsumerCoordinator > >>>>> inside, > >>>>>>>>> Streams then injects a StreamsPartitionAssignor to its pluggable > >>>>>>>>> PartitionAssignor interface and inside the > >>> StreamsPartitionAssignor > >>>>> we > >>>>>>>>> also > >>>>>>>>> have a TaskAssignor interface whose default implementation is > >>>>>>>>> StickyPartitionAssignor. Streams partition assignor logic today > >>>> sits > >>>>>> in > >>>>>>>>> the latter two classes. Hence the hierarchy today is: > >>>>>>>>> > >>>>>>>>> KafkaConsumer -> ConsumerCoordinator -> > >>> StreamsPartitionAssignor -> > >>>>>>>>> StickyTaskAssignor. > >>>>>>>>> > >>>>>>>>> We need to think about where the proposed implementation would > >>> take > >>>>>> place, > >>>>>>>>> and personally I think it is not the best option to inject > >>> all > >>>> of > >>>>>> them > >>>>>>>>> into the StreamsPartitionAssignor / StickyTaskAssignor since the > >>>>> logic > >>>>>> of > >>>>>>>>> "triggering another rebalance" etc would require some > >>> coordinator > >>>>> logic > >>>>>>>>> which is hard to mimic at the PartitionAssignor level. On the other > >>>> hand, > >>>>>>>>> since > >>>>>>>>> we are embedding a KafkaConsumer client as a whole we cannot > >>> just > >>>>>> replace > >>>>>>>>> ConsumerCoordinator with a specialized StreamsCoordinator like > >>>>> Connect > >>>>>>>>> does > >>>>>>>>> in KIP-415. So I'd like to maybe split the current proposal into > >>> both > >>>>>>>>> the consumer layer and the streams-assignor layer like we did in > >>>>>> KIP-98/KIP-129. > >>>>>>>>> And then the key thing to consider is how to cut off the > >>> boundary > >>>> so > >>>>>> that > >>>>>>>>> the modifications we push to ConsumerCoordinator would be > >>>> beneficial > >>>>>>>>> universally for any consumers, while keeping the Streams-specific > >>>> logic > >>>>> at > >>>>>>>>> the > >>>>>>>>> assignor level. > >>>>>>>>> > >>>>>>>>> 3. Depending on which design direction we choose, our migration > >>>> plan > >>>>>> would > >>>>>>>>> also be quite different. For example, if we stay with > >>>>>> ConsumerCoordinator > >>>>>>>>> whose protocol type is "consumer" still, and we can manage to > >>> make > >>>>> all > >>>>>>>>> changes agnostic to brokers as well as to old-versioned > >>> consumers, > >>>>> then > >>>>>>>>> our > >>>>>>>>> migration plan could be much easier. 
> >>>>>>>>> > >>>>>>>>> 4. I think one major issue related to this KIP is that today, in > >>>> the > >>>>>>>>> StickyPartitionAssignor, we always try to honor stickiness over > >>>>>> workload > >>>>>>>>> balance, and hence "learner task" is needed to break this > >>> priority, > >>>>> but > >>>>>>>>> I'm > >>>>>>>>> wondering if we can have a better solution within the sticky task > >>>>> assignor > >>>>>>>>> that > >>>>>>>>> accommodates this? > >>>>>>>>> > >>>>>>>>> Minor: > >>>>>>>>> > >>>>>>>>> 1. The idea of two rebalances has also been discussed in > >>>>>>>>> https://issues.apache.org/jira/browse/KAFKA-6145. So we should > >>> add > >>>>> the > >>>>>>>>> reference on the wiki page as well. > >>>>>>>>> 2. Could you also add a section describing how the subscription > >>> / > >>>>>>>>> assignment metadata will be re-formatted? Without this > >>> information > >>>> it > >>>>>> is > >>>>>>>>> hard to get to the bottom of your idea. For example in the > >>> "Leader > >>>>>>>>> Transfer > >>>>>>>>> Before Scaling" section, I'm not sure why "S2 doesn't know S4 is > >>>> new > >>>>>>>>> member" > >>>>>>>>> and hence would blindly obey stickiness over the workload balance > >>>>>> requirement. > >>>>>>>>> > >>>>>>>>> Guozhang > >>>>>>>>> > >>>>>>>>> > >>>>>>>>> On Thu, Feb 28, 2019 at 11:05 AM Boyang Chen < > >>> bche...@outlook.com> > >>>>>> wrote: > >>>>>>>>> > >>>>>>>>>> Hey community friends, > >>>>>>>>>> > >>>>>>>>>> I'm gladly inviting you to have a look at the proposal to add > >>>>>>>>> incremental > >>>>>>>>>> rebalancing to Kafka Streams, a.k.a. auto-scaling support. > >>>>>>>>>> > >>>>>>>>>> > >>>>>>>>>> > >>>>>>>>> > >>>>>> > >>>>> > >>>> > >>> > https://cwiki.apache.org/confluence/display/KAFKA/KIP-429%3A+Smooth+Auto-Scaling+for+Kafka+Streams > >>>>>>>>>> > >>>>>>>>>> Special thanks to Guozhang for giving great guidance and > >>>> important > >>>>>>>>>> feedback while making this KIP! > >>>>>>>>>> > >>>>>>>>>> Best, > >>>>>>>>>> Boyang > >>>>>>>>>> > >>>>>>>>> > >>>>>>>>> > >>>>>>>>> -- > >>>>>>>>> -- Guozhang > >>>>>>>>> > >>>>>>>> > >>>>>>>> > >>>>>>>> -- > >>>>>>>> -- Guozhang > >>>>>>>> > >>>>>>> > >>>>>>> > >>>>>> > >>>>>> > >>>>> > >>>>> -- > >>>>> -- Guozhang > >>>>> > >>>> > >>> > >>> > >>> -- > >>> -- Guozhang > >>> > >> > >> > >> -- > >> -- Guozhang > >> > > > > > > -- -- Guozhang