SGTM. I think we should still document the behavior in the KIP and explain why it's implemented that way.
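To make that documentation concrete, a minimal sketch of the caller-visible side might help (illustrative only, not taken from the KIP; the group id and topic name below are made up). Assuming a cooperative-capable assignor is configured, the application keeps its usual poll loop, and while a rebalance is in flight consecutive poll() calls may simply come back empty before data for the retained partitions resumes:

    import java.time.Duration;
    import java.util.Collections;
    import java.util.Properties;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;

    public class PollDuringRebalanceSketch {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put("bootstrap.servers", "localhost:9092");
            props.put("group.id", "kip-429-demo");   // hypothetical group id
            props.put("key.deserializer",
                      "org.apache.kafka.common.serialization.StringDeserializer");
            props.put("value.deserializer",
                      "org.apache.kafka.common.serialization.StringDeserializer");

            try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
                consumer.subscribe(Collections.singletonList("input-topic")); // hypothetical topic
                while (true) {
                    // While a rebalance is in progress, consecutive polls may return an
                    // empty batch even for partitions this member still owns; the caller
                    // should just poll again rather than treat it as an error.
                    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
                    for (ConsumerRecord<String, String> record : records) {
                        System.out.printf("%s-%d@%d: %s%n",
                                record.topic(), record.partition(), record.offset(), record.value());
                    }
                }
            }
        }
    }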
-Matthias On 5/22/19 7:04 PM, Guozhang Wang wrote: > Hello Matthias, > > I've thought about that before, and the reason I did not include this as > part of the KIP-429 scope is that fetcher / coordinator may get quite > complicated to return non-empty data if "updateAssignmentMetadataIfNeeded" > returns false in KafkaConsumer. In addition, when there's a rebalance in > progress, letting consumers to process data which potentially may take > longer time (in Streams for example, it is related to `max.poll.interval` > config) could lead to higher chance of "partitions lost" and wasted > processing work. > > So I've decided to still keep it as simple as is today, and admittedly from > a user perspective, they may see consecutive "poll" call returning no data. > I will create a JIRA ticket capturing this idea for future discussion > whether we should consider this as a general optimization in consumer. Does > that sound good to you? > > > Guozhang > > > On Wed, May 22, 2019 at 4:31 AM Matthias J. Sax <matth...@confluent.io> > wrote: > >> Thanks to the KIP and starting the VOTE Guozhang. I am overall +1. >> >> One follow up thought: the KIP does not discuss in details, how `poll()` >> will behave after the change. It might actually be important to ensure >> that `poll()` behavior changes to be non-blocking to allow an >> application to process data from non-revoked partitions while a >> rebalance is happening in the background. >> >> Thoughts? >> >> >> -Matthias >> >> >> On 5/10/19 1:10 AM, Guozhang Wang wrote: >>> Hello Matthias, >>> >>> I'm proposing to change this behavior holistically inside >>> ConsumerCoordinator actually. In other words I'm trying to piggy-back >> this >>> behavioral fix of KAFKA-4600 along with this KIP, and the motivation for >> me >>> to do this piggy-backing is that, with incremental rebalancing, there >> would >>> be partial affected partitions as we are not revoking every body any >> more. >>> >>> >>> Guozhang >>> >>> >>> On Thu, May 9, 2019 at 6:21 AM Matthias J. Sax <matth...@confluent.io> >>> wrote: >>> >>>> Thanks Guozhang! >>>> >>>> The simplified upgrade path is great! >>>> >>>> >>>> Just a clarification question about the "Rebalance Callback Error >>>> Handling" -- does this change affect the `ConsumerCoordinator` only if >>>> incremental rebalancing is use? Or does the behavior also change for the >>>> eager rebalancing case? >>>> >>>> >>>> -Matthias >>>> >>>> >>>> On 5/9/19 3:37 AM, Guozhang Wang wrote: >>>>> Hello all, >>>>> >>>>> Thanks for everyone who've shared their feedbacks for this KIP! If >>>> there's >>>>> no further comments I'll start the voting thread by end of tomorrow. >>>>> >>>>> >>>>> Guozhang. >>>>> >>>>> On Wed, May 8, 2019 at 6:36 PM Guozhang Wang <wangg...@gmail.com> >> wrote: >>>>> >>>>>> Hello Boyang, >>>>>> >>>>>> On Wed, May 1, 2019 at 4:51 PM Boyang Chen <bche...@outlook.com> >> wrote: >>>>>> >>>>>>> Hey Guozhang, >>>>>>> >>>>>>> thank you for the great write up. Overall the motivation and changes >>>>>>> LGTM, just some minor comments: >>>>>>> >>>>>>> >>>>>>> 1. In "Consumer Coordinator Algorithm", we could reorder alphabet >>>>>>> points for 3d~3f from ["ready-to-migrate-partitions", >>>>>>> "unknown-but-owned-partitions", "maybe-revoking-partitions"] to >>>>>>> ["maybe-revoking-partitions", "ready-to-migrate-partitions", >>>>>>> "unknown-but-owned-partitions"] in order to be consistent with 3c1~3. >>>>>>> >>>>>> >>>>>> Ack. Updated. >>>>>> >>>>>> >>>>>>> 2. 
In "Consumer Coordinator Algorithm", 1c suggests to revoke all >>>>>>> partition upon heartbeat/commit fail. What's the gain here? Do we >> want >>>> to >>>>>>> keep all partitions running at this moment, to be optimistic for the >>>> case >>>>>>> when no partitions get reassigned? >>>>>>> >>>>>> >>>>>> That's a good catch. When REBALANCE_IN_PROGRESS is received, we can >> just >>>>>> re-join the group with all the currently owned partitions encoded. >>>> Updated. >>>>>> >>>>>> >>>>>>> 3. In "Recommended Upgrade Procedure", remove extra 'those': " The >>>>>>> 'sticky' assignor works even those there are " >>>>>>> >>>>>> >>>>>> Ack, should be `even when`. >>>>>> >>>>>> >>>>>>> 4. Put two "looking into the future" into a separate category from >>>>>>> migration session. It seems inconsistent for readers to see this >>>> before we >>>>>>> finished discussion for everything. >>>>>>> >>>>>> >>>>>> Ack. >>>>>> >>>>>> >>>>>>> 5. Have we discussed the concern on the serialization? Could the >> new >>>>>>> metadata we are adding grow larger than the message size cap? >>>>>>> >>>>>> >>>>>> We're completing https://issues.apache.org/jira/browse/KAFKA-7149 >> which >>>>>> should largely reduce the message size (will update the wiki >>>> accordingly as >>>>>> well). >>>>>> >>>>>> >>>>>>> >>>>>>> Boyang >>>>>>> >>>>>>> ________________________________ >>>>>>> From: Guozhang Wang <wangg...@gmail.com> >>>>>>> Sent: Monday, April 15, 2019 9:20 AM >>>>>>> To: dev >>>>>>> Subject: Re: [DISCUSS] KIP-429 : Smooth Auto-Scaling for Kafka >> Streams >>>>>>> >>>>>>> Hello Jason, >>>>>>> >>>>>>> I agree with you that for range / round-robin it makes less sense to >> be >>>>>>> compatible with cooperative rebalance protocol. >>>>>>> >>>>>>> As for StickyAssignor, however, I think it would still be possible to >>>> make >>>>>>> the current implementation to be compatible with cooperative >>>> rebalance. So >>>>>>> after pondering on different options at hand I'm now proposing this >>>>>>> approach as listed in the upgrade section: >>>>>>> >>>>>>> >>>>>>> >>>> >> https://cwiki.apache.org/confluence/display/KAFKA/KIP-429%3A+Kafka+Consumer+Incremental+Rebalance+Protocol#KIP-429:KafkaConsumerIncrementalRebalanceProtocol-CompatibilityandUpgradePath >>>>>>> >>>>>>> The idea is to let assignors specify which protocols it would work >>>> with, >>>>>>> associating with a different name; then the upgrade path would >> involve >>>> a >>>>>>> "compatible" protocol which actually still use eager behavior while >>>>>>> encoding two assignors if possible. In "Rejected Section" (just to >>>> clarify >>>>>>> I'm not finalizing it as rejected, just putting it there for now, and >>>> if >>>>>>> we >>>>>>> like this one instead we can always switch them) I listed the other >>>>>>> approach we once discussed about, and arguing its cons of duplicated >>>> class >>>>>>> seems overwhelm the pros of saving the "rebalance.protocol" config. >>>>>>> >>>>>>> Let me know WDYT. >>>>>>> >>>>>>> Guozhang >>>>>>> >>>>>>> On Fri, Apr 12, 2019 at 6:08 PM Jason Gustafson <ja...@confluent.io> >>>>>>> wrote: >>>>>>> >>>>>>>> Hi Guozhang, >>>>>>>> >>>>>>>> Responses below: >>>>>>>> >>>>>>>> 2. The interface's default implementation will just be >>>>>>>>> `onPartitionRevoked`, so for user's instantiation if they do not >> make >>>>>>> any >>>>>>>>> code changes they should be able to recompile the code and >> continue. >>>>>>>> >>>>>>>> >>>>>>>> Ack, makes sense. >>>>>>>> >>>>>>>> 4. Hmm.. not sure if it will work. 
The main issue is that the >>>>>>>>> consumer-coordinator behavior (whether to revoke all or none at >>>>>>>>> onRebalancePrepare) is independent of the selected protocol's >>>> assignor >>>>>>>>> (eager or cooperative), so even if the assignor is selected to be >> the >>>>>>>>> old-versioned one, we will still not revoke at the >>>>>>> consumer-coordinator >>>>>>>>> layer and hence has the same risk of migrating still-owned >>>> partitions, >>>>>>>>> right? >>>>>>>> >>>>>>>> >>>>>>>> Yeah, basically we would have to push the eager/cooperative logic >> into >>>>>>> the >>>>>>>> PartitionAssignor itself and make the consumer aware of the >> rebalance >>>>>>>> protocol it is compatible with. As long as an eager protocol _could_ >>>> be >>>>>>>> selected, the consumer would have to be pessimistic and do eager >>>>>>>> revocation. But if all the assignors configured in the consumer >>>> support >>>>>>>> cooperative reassignment, then either 1) a cooperative protocol will >>>> be >>>>>>>> selected and cooperative revocation can be safely used, or 2) if the >>>>>>> rest >>>>>>>> of the group does not support it, then the consumer will simply >> fail. >>>>>>>> >>>>>>>> Another point which you raised offline and I will repeat here is >> that >>>>>>> this >>>>>>>> proposal's benefit is mostly limited to sticky assignment logic. >>>>>>> Arguably >>>>>>>> the range assignor may have some incidental stickiness, particularly >>>> if >>>>>>> the >>>>>>>> group is rebalancing for a newly created or deleted topic. For other >>>>>>> cases, >>>>>>>> the proposal is mostly additional overhead since it takes an >>>> additional >>>>>>>> rebalance and many of the partitions will move. Perhaps it doesn't >>>> make >>>>>>> as >>>>>>>> much sense to use the cooperative protocol for strategies like range >>>> and >>>>>>>> round-robin. That kind of argues in favor of pushing some of the >>>> control >>>>>>>> into the assignor itself. Maybe we would not bother creating >>>>>>>> CooperativeRange as I suggested above, but it would make sense to >>>>>>> create a >>>>>>>> cooperative version of the sticky assignment strategy. I thought we >>>>>>> might >>>>>>>> have to create a new sticky assignor anyway because I can't see how >> we >>>>>>>> would get compatible behavior mixing with the old version anyway. >>>>>>>> >>>>>>>> Thanks, >>>>>>>> Jason >>>>>>>> >>>>>>>> >>>>>>>> On Thu, Apr 11, 2019 at 5:53 PM Guozhang Wang <wangg...@gmail.com> >>>>>>> wrote: >>>>>>>> >>>>>>>>> Hello Matthias: >>>>>>>>> >>>>>>>>> Thanks for your review. >>>>>>>>> >>>>>>>>> The background section uses streams assignor as well as the >>>> consumer's >>>>>>>> own >>>>>>>>> stick assignor as examples illustrating the situation, but this KIP >>>> is >>>>>>>> for >>>>>>>>> consumer coordinator itself, and the rest of the paragraph did not >>>>>>> talk >>>>>>>>> about Streams any more. If you feel it's a bit distracted I can >>>> remove >>>>>>>>> those examples. >>>>>>>>> >>>>>>>>> 10). While working on the PR I realized that the revoked partitions >>>> on >>>>>>>>> assignment is not needed (this is being discussed on the PR itself: >>>>>>>>> https://github.com/apache/kafka/pull/6528#issuecomment-480009890 >>>>>>>>> >>>>>>>>> 20). 1.a. Good question, I've updated the wiki to let the >> consumer's >>>>>>>>> cleanup assignment and re-join, and not letting assignor making any >>>>>>>>> proactive changes. The idea is to keep logic simpler and not doing >>>> any >>>>>>>>> "split brain" stuff. >>>>>>>>> >>>>>>>>> 20). 2.b. 
No we do not need, since the owned-partitions will be >> part >>>>>>> of >>>>>>>> the >>>>>>>>> Subscription passed in to assign() already. >>>>>>>>> >>>>>>>>> 30). As Boyang mentioned, there are some drawbacks that can not be >>>>>>>>> addressed by rebalance delay still, hence still voted KIP-345 (some >>>>>>> more >>>>>>>>> details can be found on the discussion thread of KIP-345 itself). >> One >>>>>>>>> example is that as the instance resumes, its member id will be >> empty >>>>>>> so >>>>>>>> we >>>>>>>>> are still relying on assignor to give it the assignment from the >> old >>>>>>>>> member-id while keeping all other member's assignment unchanged. >>>>>>>>> >>>>>>>>> 40). Incomplete sentence, I've updated it. >>>>>>>>> >>>>>>>>> 50). Here's my idea: suppose we augment the join group schema with >>>>>>>>> `protocol version` in 2.3, and then with both brokers and clients >>>>>>> being >>>>>>>> in >>>>>>>>> version 2.3+, on the first rolling bounce where subscription and >>>>>>>> assignment >>>>>>>>> schema and / or user metadata has changed, this protocol version >> will >>>>>>> be >>>>>>>>> bumped. On the broker side, when receiving all member's join-group >>>>>>>> request, >>>>>>>>> it will choose the one that has the highest protocol version (also >> it >>>>>>>>> assumes higher versioned protocol is always backward compatible, >> i.e. >>>>>>> the >>>>>>>>> coordinator can recognize lower versioned protocol as well) and >>>>>>> select it >>>>>>>>> as the leader. Then the leader can decide, based on its received >> and >>>>>>>>> deserialized subscription information, how to assign partitions and >>>>>>> how >>>>>>>> to >>>>>>>>> encode the assignment accordingly so that everyone can understand >> it. >>>>>>>> With >>>>>>>>> this, in Streams for example, no version probing would be needed >>>>>>> since we >>>>>>>>> are guaranteed the leader knows everyone's version -- again it is >>>>>>>> assuming >>>>>>>>> that higher versioned protocol is always backward compatible -- and >>>>>>> hence >>>>>>>>> can successfully do the assignment at that round. >>>>>>>>> >>>>>>>>> 60). My bad, this section was not updated while the design was >>>>>>> evolved, >>>>>>>>> I've updated it. >>>>>>>>> >>>>>>>>> >>>>>>>>> On Tue, Apr 9, 2019 at 7:22 PM Boyang Chen <bche...@outlook.com> >>>>>>> wrote: >>>>>>>>> >>>>>>>>>> >>>>>>>>>> Thanks for the review Matthias! My 2-cent on the rebalance delay >> is >>>>>>>> that >>>>>>>>>> it is a rather fixed trade-off between >>>>>>>>>> >>>>>>>>>> task availability and resource shuffling. If we eventually trigger >>>>>>>>>> rebalance after rolling bounce, certain consumer >>>>>>>>>> >>>>>>>>>> setup is still faced with global shuffles, for example member.id >>>>>>>> ranking >>>>>>>>>> based round robin strategy, as rejoining dynamic >>>>>>>>>> >>>>>>>>>> members will be assigned with new member.id which reorders the >>>>>>>>>> assignment. So I think the primary goal of incremental >>>>>>>>>> >>>>>>>>>> rebalancing is still improving the cluster availability during >>>>>>>> rebalance, >>>>>>>>>> because it didn't revoke any partition during this >>>>>>>>>> >>>>>>>>>> process. Also, the perk is minimum configuration requirement :) >>>>>>>>>> >>>>>>>>>> >>>>>>>>>> Best, >>>>>>>>>> >>>>>>>>>> Boyang >>>>>>>>>> >>>>>>>>>> ________________________________ >>>>>>>>>> From: Matthias J. 
Sax <matth...@confluent.io> >>>>>>>>>> Sent: Tuesday, April 9, 2019 7:47 AM >>>>>>>>>> To: dev >>>>>>>>>> Subject: Re: [DISCUSS] KIP-429 : Smooth Auto-Scaling for Kafka >>>>>>> Streams >>>>>>>>>> >>>>>>>>>> Thank for the KIP, Boyang and Guozhang! >>>>>>>>>> >>>>>>>>>> >>>>>>>>>> I made an initial pass and have some questions/comments. One high >>>>>>> level >>>>>>>>>> comment: it seems that the KIP "mixes" plain consumer and Kafka >>>>>>> Streams >>>>>>>>>> use case a little bit (at least in the presentation). It might be >>>>>>>>>> helpful to separate both cases clearly, or maybe limit the scope >> to >>>>>>>>>> plain consumer only. >>>>>>>>>> >>>>>>>>>> >>>>>>>>>> >>>>>>>>>> 10) For `PartitionAssignor.Assignment`: It seems we need a new >>>>>>> method >>>>>>>>>> `List<TopicPartitions> revokedPartitions()` ? >>>>>>>>>> >>>>>>>>>> >>>>>>>>>> >>>>>>>>>> 20) In Section "Consumer Coordinator Algorithm" >>>>>>>>>> >>>>>>>>>> Bullet point "1a)": If the subscription changes and a topic is >>>>>>>>>> removed from the subscription, why do we not revoke the >> partitions? >>>>>>>>>> >>>>>>>>>> Bullet point "1a)": What happens is a topic is deleted (or a >>>>>>>>>> partition is removed/deleted from a topic)? Should we call the new >>>>>>>>>> `onPartitionsEmigrated()` callback for this case? >>>>>>>>>> >>>>>>>>>> Bullet point "2b)" Should we update the `PartitionAssignor` >>>>>>>>>> interface to pass in the "old assignment" as third parameter into >>>>>>>>>> `assign()`? >>>>>>>>>> >>>>>>>>>> >>>>>>>>>> >>>>>>>>>> 30) Rebalance delay (as used in KIP-415): Could a rebalance delay >>>>>>>>>> subsume KIP-345? Configuring static members is rather complicated, >>>>>>> and >>>>>>>> I >>>>>>>>>> am wondering if a rebalance delay would be sufficient? >>>>>>>>>> >>>>>>>>>> >>>>>>>>>> >>>>>>>>>> 40) Quote: "otherwise the we would fall into the case 3.b) >> forever." >>>>>>>>>> >>>>>>>>>> What is "case 3.b" ? >>>>>>>>>> >>>>>>>>>> >>>>>>>>>> >>>>>>>>>> 50) Section "Looking into the Future" >>>>>>>>>> >>>>>>>>>> Nit: the new "ProtocolVersion" field is missing in the first line >>>>>>>>>> describing "JoinGroupRequest" >>>>>>>>>> >>>>>>>>>>> This can also help saving "version probing" cost on Streams as >>>>>>> well. >>>>>>>>>> >>>>>>>>>> How does this relate to Kafka Streams "version probing" >>>>>>> implementation? >>>>>>>>>> How can we exploit the new `ProtocolVersion` in Streams to improve >>>>>>>>>> "version probing" ? I have a rough idea, but would like to hear >> more >>>>>>>>>> details. >>>>>>>>>> >>>>>>>>>> >>>>>>>>>> >>>>>>>>>> 60) Section "Recommended Upgrade Procedure" >>>>>>>>>> >>>>>>>>>>> Set the `stream.rebalancing.mode` to `upgrading`, which will >> force >>>>>>>> the >>>>>>>>>> stream application to stay with protocol type "consumer". >>>>>>>>>> >>>>>>>>>> This config is not discussed in the KIP and appears in this >> section >>>>>>>>>> without context. Can you elaborate about it? >>>>>>>>>> >>>>>>>>>> >>>>>>>>>> >>>>>>>>>> -Matthias >>>>>>>>>> >>>>>>>>>> >>>>>>>>>> >>>>>>>>>> >>>>>>>>>> On 3/29/19 6:20 PM, Guozhang Wang wrote: >>>>>>>>>>> Bump up on this discussion thread. I've added a few new drawings >>>>>>> for >>>>>>>>>> better >>>>>>>>>>> illustration, would really appreciate your feedbacks. 
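(As an aside, a concrete illustration of the rebalance-callback questions above, sketch only and not from the KIP: the existing ConsumerRebalanceListener interface would stay the same, but under the cooperative protocol the collections passed in would carry only the partitions that actually move; flushStateFor and initStateFor below are hypothetical application hooks.)

    import java.util.Collection;
    import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
    import org.apache.kafka.common.TopicPartition;

    public class IncrementalAwareListener implements ConsumerRebalanceListener {
        @Override
        public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
            // Eager protocol: every owned partition shows up here on each rebalance.
            // Cooperative protocol (this KIP): only the subset that is migrating away,
            // so state for the partitions we keep can stay warm.
            for (TopicPartition tp : partitions) {
                flushStateFor(tp); // hypothetical application hook
            }
        }

        @Override
        public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
            // Under the cooperative protocol only newly added partitions are passed in;
            // previously owned partitions are not re-announced.
            for (TopicPartition tp : partitions) {
                initStateFor(tp); // hypothetical application hook
            }
        }

        private void flushStateFor(TopicPartition tp) { /* flush/commit per-partition state */ }
        private void initStateFor(TopicPartition tp) { /* restore/initialize per-partition state */ }
    }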
>>>>>>>>>>> >>>>>>>>>>> >>>>>>>>>>> Guozhang >>>>>>>>>>> >>>>>>>>>>> On Wed, Mar 20, 2019 at 6:17 PM Guozhang Wang < >> wangg...@gmail.com >>>>>>>> >>>>>>>>>> wrote: >>>>>>>>>>> >>>>>>>>>>>> Hello Boyang, >>>>>>>>>>>> >>>>>>>>>>>> I've made another thorough pass over this KIP and I'd like to >>>>>>> spilt >>>>>>>> it >>>>>>>>>>>> into two parts: the first part, covered in KIP-429 would be >>>>>>> touching >>>>>>>>> on >>>>>>>>>>>> Consumer Coordinator only to have incremental rebalance protocol >>>>>>> in >>>>>>>>>> place. >>>>>>>>>>>> The second part (for now I've reserved KIP number 444 for it) >>>>>>> would >>>>>>>>>> contain >>>>>>>>>>>> all the changes on StreamsPartitionAssginor to allow warming up >>>>>>> new >>>>>>>>>>>> members. >>>>>>>>>>>> >>>>>>>>>>>> I think the first part, a.k.a. the current updated KIP-429 is >>>>>>> ready >>>>>>>>> for >>>>>>>>>>>> review and discussions again. Would love to hear people's >>>>>>> feedbacks >>>>>>>>> and >>>>>>>>>>>> ideas. >>>>>>>>>>>> >>>>>>>>>>>> Guozhang >>>>>>>>>>>> >>>>>>>>>>>> >>>>>>>>>>>> >>>>>>>>>>>> On Mon, Mar 4, 2019 at 10:09 AM Boyang Chen < >> bche...@outlook.com >>>>>>>> >>>>>>>>>> wrote: >>>>>>>>>>>> >>>>>>>>>>>>> Thanks Guozhang for the great questions. Answers are inlined: >>>>>>>>>>>>> >>>>>>>>>>>>> 1. I'm still not sure if it's worthwhile to add a new type of >>>>>>>>> "learner >>>>>>>>>>>>> task" in addition to "standby task": if the only difference is >>>>>>> that >>>>>>>>> for >>>>>>>>>>>>> the >>>>>>>>>>>>> latter, we would consider workload balance while for the former >>>>>>> we >>>>>>>>>> would >>>>>>>>>>>>> not, I think we can just adjust the logic of StickyTaskAssignor >>>>>>> a >>>>>>>> bit >>>>>>>>>> to >>>>>>>>>>>>> break that difference. Adding a new type of task would be >>>>>>> adding a >>>>>>>>> lot >>>>>>>>>> of >>>>>>>>>>>>> code complexity, so if we can still piggy-back the logic on a >>>>>>>>>> standby-task >>>>>>>>>>>>> I would prefer to do so. >>>>>>>>>>>>> In the proposal we stated that we are not adding a new type of >>>>>>> task >>>>>>>>>>>>> implementation. The >>>>>>>>>>>>> learner task shall share the same implementation with normal >>>>>>>> standby >>>>>>>>>>>>> task, only that we >>>>>>>>>>>>> shall tag the standby task with learner and prioritize the >>>>>>> learner >>>>>>>>>> tasks >>>>>>>>>>>>> replay effort. >>>>>>>>>>>>> 2. One thing that's still not clear from the KIP wiki itself is >>>>>>>> which >>>>>>>>>>>>> layer >>>>>>>>>>>>> would the logic be implemented at. Although for most KIPs we >>>>>>> would >>>>>>>>> not >>>>>>>>>>>>> require internal implementation details but only public facing >>>>>>> API >>>>>>>>>>>>> updates, >>>>>>>>>>>>> for a KIP like this I think it still requires to flesh out >>>>>>> details >>>>>>>> on >>>>>>>>>> the >>>>>>>>>>>>> implementation design. More specifically: today Streams embed a >>>>>>>> full >>>>>>>>>>>>> fledged Consumer client, which hard-code a ConsumerCoordinator >>>>>>>>> inside, >>>>>>>>>>>>> Streams then injects a StreamsPartitionAssignor to its >> pluggable >>>>>>>>>>>>> PartitionAssignor interface and inside the >>>>>>> StreamsPartitionAssignor >>>>>>>>> we >>>>>>>>>>>>> also >>>>>>>>>>>>> have a TaskAssignor interface whose default implementation is >>>>>>>>>>>>> StickyPartitionAssignor. Streams partition assignor logic today >>>>>>>> sites >>>>>>>>>> in >>>>>>>>>>>>> the latter two classes. 
Hence the hierarchy today is: >>>>>>>>>>>>> >>>>>>>>>>>>> KafkaConsumer -> ConsumerCoordinator -> >>>>>>> StreamsPartitionAssignor -> >>>>>>>>>>>>> StickyTaskAssignor. >>>>>>>>>>>>> >>>>>>>>>>>>> We need to think about where the proposed implementation would >>>>>>> take >>>>>>>>>> place >>>>>>>>>>>>> at, and personally I think it is not the best option to inject >>>>>>> all >>>>>>>> of >>>>>>>>>> them >>>>>>>>>>>>> into the StreamsPartitionAssignor / StickyTaskAssignor since >> the >>>>>>>>> logic >>>>>>>>>> of >>>>>>>>>>>>> "triggering another rebalance" etc would require some >>>>>>> coordinator >>>>>>>>> logic >>>>>>>>>>>>> which is hard to mimic at PartitionAssignor level. On the other >>>>>>>> hand, >>>>>>>>>>>>> since >>>>>>>>>>>>> we are embedding a KafkaConsumer client as a whole we cannot >>>>>>> just >>>>>>>>>> replace >>>>>>>>>>>>> ConsumerCoordinator with a specialized StreamsCoordinator like >>>>>>>>> Connect >>>>>>>>>>>>> does >>>>>>>>>>>>> in KIP-415. So I'd like to maybe split the current proposal in >>>>>>> both >>>>>>>>>>>>> consumer layer and streams-assignor layer like we did in >>>>>>>>>> KIP-98/KIP-129. >>>>>>>>>>>>> And then the key thing to consider is how to cut off the >>>>>>> boundary >>>>>>>> so >>>>>>>>>> that >>>>>>>>>>>>> the modifications we push to ConsumerCoordinator would be >>>>>>>> beneficial >>>>>>>>>>>>> universally for any consumers, while keep the Streams-specific >>>>>>>> logic >>>>>>>>> at >>>>>>>>>>>>> the >>>>>>>>>>>>> assignor level. >>>>>>>>>>>>> Yes, that's also my ideal plan. The details for the >>>>>>> implementation >>>>>>>>> are >>>>>>>>>>>>> depicted >>>>>>>>>>>>> in this doc< >>>>>>>>>>>>> >>>>>>>>>> >>>>>>>>> >>>>>>>> >>>>>>> >>>> >> https://docs.google.com/document/d/1me2a5wvxAZT1QE6HkwyDl7C2TiBQlKN3Dpw_I1ro91U/edit#heading=h.qix74qdmekae >>>>>>>>>>> , >>>>>>>>>> [ >>>>>>>>>> >>>>>>>>> >>>>>>>> >>>>>>> >>>> >> https://lh5.googleusercontent.com/DXWMyKNE9rFFIv7TNX56Q41QwqYp8ynivwWSJHHORqSRkoQxtraW2bqiB-NRUGAMYKkt8A=w1200-h630-p >>>>>>>>>> ]< >>>>>>>>>> >>>>>>>>> >>>>>>>> >>>>>>> >>>> >> https://docs.google.com/document/d/1me2a5wvxAZT1QE6HkwyDl7C2TiBQlKN3Dpw_I1ro91U/edit#heading=h.qix74qdmekae >>>>>>>>>>> >>>>>>>>>> >>>>>>>>>> [External] KStream Smooth Auto-scaling Implementation Plan< >>>>>>>>>> >>>>>>>>> >>>>>>>> >>>>>>> >>>> >> https://docs.google.com/document/d/1me2a5wvxAZT1QE6HkwyDl7C2TiBQlKN3Dpw_I1ro91U/edit#heading=h.qix74qdmekae >>>>>>>>>>> >>>>>>>>>> docs.google.com >>>>>>>>>> KStream Incremental Rebalancing Implementation Plan Authors: >> Boyang >>>>>>>> Chen, >>>>>>>>>> Guozhang Wang KIP link Stage: [Draft | Review | Approved] >>>>>>> Background We >>>>>>>>>> initiated KIP-429 for the promotion of incremental rebalancing >> work >>>>>>> for >>>>>>>>>> KStream. Behind the scene, there is non-trivial amount of effort >>>>>>> that >>>>>>>>> needs >>>>>>>>>> to... >>>>>>>>>> >>>>>>>>>> >>>>>>>>>> >>>>>>>>>>>>> and I have explained the reasoning on why we want to push a >>>>>>>>>>>>> global change of replacing ConsumerCoordinator with >>>>>>>>> StreamCoordinator. >>>>>>>>>>>>> The motivation >>>>>>>>>>>>> is that KIP space is usually used for public & algorithm level >>>>>>>>> change, >>>>>>>>>>>>> not for internal >>>>>>>>>>>>> implementation details. >>>>>>>>>>>>> >>>>>>>>>>>>> 3. Depending on which design direction we choose, our migration >>>>>>>> plan >>>>>>>>>> would >>>>>>>>>>>>> also be quite different. 
For example, if we stay with >>>>>>>>>> ConsumerCoordinator >>>>>>>>>>>>> whose protocol type is "consumer" still, and we can manage to >>>>>>> make >>>>>>>>> all >>>>>>>>>>>>> changes agnostic to brokers as well as to old versioned >>>>>>> consumers, >>>>>>>>> then >>>>>>>>>>>>> our >>>>>>>>>>>>> migration plan could be much easier. >>>>>>>>>>>>> Yes, the upgrade plan was designed to take the new >>>>>>>> StreamCoordinator >>>>>>>>>>>>> approach >>>>>>>>>>>>> which means we shall define a new protocol type. For existing >>>>>>>>>> application >>>>>>>>>>>>> we could only >>>>>>>>>>>>> maintain the same `consumer` protocol type is because current >>>>>>>> broker >>>>>>>>>> only >>>>>>>>>>>>> allows >>>>>>>>>>>>> change of protocol type when the consumer group is empty. It is >>>>>>> of >>>>>>>>>> course >>>>>>>>>>>>> user-unfriendly to force >>>>>>>>>>>>> a wipe-out for the entire application, and I don't think >>>>>>>> maintaining >>>>>>>>>> old >>>>>>>>>>>>> protocol type would greatly >>>>>>>>>>>>> impact ongoing services using new stream coordinator. WDYT? >>>>>>>>>>>>> >>>>>>>>>>>>> 4. I think one major issue related to this KIP is that today, >> in >>>>>>>> the >>>>>>>>>>>>> StickyPartitionAssignor, we always try to honor stickiness over >>>>>>>>>> workload >>>>>>>>>>>>> balance, and hence "learner task" is needed to break this >>>>>>> priority, >>>>>>>>> but >>>>>>>>>>>>> I'm >>>>>>>>>>>>> wondering if we can have a better solution within sticky task >>>>>>>>> assignor >>>>>>>>>>>>> that >>>>>>>>>>>>> accommodate this? >>>>>>>>>>>>> Great question! That's what I explained in the proposal, which >>>>>>> is >>>>>>>>> that >>>>>>>>>> we >>>>>>>>>>>>> should breakdown our >>>>>>>>>>>>> delivery into different stages. At very beginning, our goal is >>>>>>> to >>>>>>>>>> trigger >>>>>>>>>>>>> learner task assignment only on >>>>>>>>>>>>> `new` hosts, where we shall leverage leader's knowledge of >>>>>>> previous >>>>>>>>>> round >>>>>>>>>>>>> of rebalance to figure out. After >>>>>>>>>>>>> stage one, our goal is to have a smooth scaling up experience, >>>>>>> but >>>>>>>>> the >>>>>>>>>>>>> task balance problem is kind of orthogonal. >>>>>>>>>>>>> The load balance problem is a much broader topic than auto >>>>>>> scaling, >>>>>>>>>> which >>>>>>>>>>>>> I figure worth discussing within >>>>>>>>>>>>> this KIP's context since it's a naturally next-step, but >>>>>>> wouldn't >>>>>>>> be >>>>>>>>>> the >>>>>>>>>>>>> main topic. >>>>>>>>>>>>> Learner task or auto scaling support should be treated as `a >>>>>>>> helpful >>>>>>>>>>>>> mechanism to reach load balance`, but not `an algorithm >> defining >>>>>>>> load >>>>>>>>>>>>> balance`. It would be great if you could share some insights of >>>>>>> the >>>>>>>>>> stream >>>>>>>>>>>>> task balance, which eventually helps us to break out of the >>>>>>>> KIP-429's >>>>>>>>>> scope >>>>>>>>>>>>> and even define a separate KIP to focus on task weight & >>>>>>> assignment >>>>>>>>>> logic >>>>>>>>>>>>> improvement. >>>>>>>>>>>>> >>>>>>>>>>>>> Also thank you for making improvement on the KIP context and >>>>>>>>>> organization! 
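(To make the upgrade discussion a bit more tangible on the plain-consumer side, here is roughly what the two-rolling-bounce idea could look like in configuration. This assumes the compatibility path ends up shipping a cooperative flavor of the sticky assignor; the class name CooperativeStickyAssignor below is illustrative of that proposal, and the group id is made up.)

    import java.util.Properties;
    import org.apache.kafka.clients.consumer.ConsumerConfig;

    public class RollingUpgradeConfigSketch {
        // First rolling bounce: list the cooperative-capable assignor first and keep
        // the old eager assignor as a fallback, so members on either version can
        // still agree on a common protocol during the upgrade.
        public static Properties firstBounce() {
            Properties props = new Properties();
            props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
            props.put(ConsumerConfig.GROUP_ID_CONFIG, "kip-429-demo"); // hypothetical group id
            props.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG,
                      "org.apache.kafka.clients.consumer.CooperativeStickyAssignor,"
                    + "org.apache.kafka.clients.consumer.StickyAssignor");
            return props;
        }

        // Second rolling bounce, once every member runs the new version: drop the
        // eager assignor so the group can switch to cooperative rebalancing.
        public static Properties secondBounce() {
            Properties props = firstBounce();
            props.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG,
                      "org.apache.kafka.clients.consumer.CooperativeStickyAssignor");
            return props;
        }
    }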
>>>>>>>>>>>>> >>>>>>>>>>>>> Best, >>>>>>>>>>>>> Boyang >>>>>>>>>>>>> ________________________________ >>>>>>>>>>>>> From: Guozhang Wang <wangg...@gmail.com> >>>>>>>>>>>>> Sent: Saturday, March 2, 2019 6:00 AM >>>>>>>>>>>>> To: dev >>>>>>>>>>>>> Subject: Re: [DISCUSS] KIP-429 : Smooth Auto-Scaling for Kafka >>>>>>>>> Streams >>>>>>>>>>>>> >>>>>>>>>>>>> Hello Boyang, >>>>>>>>>>>>> >>>>>>>>>>>>> I've just made a quick pass on the KIP and here are some >>>>>>> thoughts. >>>>>>>>>>>>> >>>>>>>>>>>>> Meta: >>>>>>>>>>>>> >>>>>>>>>>>>> 1. I'm still not sure if it's worthwhile to add a new type of >>>>>>>>> "learner >>>>>>>>>>>>> task" in addition to "standby task": if the only difference is >>>>>>> that >>>>>>>>> for >>>>>>>>>>>>> the >>>>>>>>>>>>> latter, we would consider workload balance while for the former >>>>>>> we >>>>>>>>>> would >>>>>>>>>>>>> not, I think we can just adjust the logic of StickyTaskAssignor >>>>>>> a >>>>>>>> bit >>>>>>>>>> to >>>>>>>>>>>>> break that difference. Adding a new type of task would be >>>>>>> adding a >>>>>>>>> lot >>>>>>>>>> of >>>>>>>>>>>>> code complexity, so if we can still piggy-back the logic on a >>>>>>>>>> standby-task >>>>>>>>>>>>> I would prefer to do so. >>>>>>>>>>>>> >>>>>>>>>>>>> 2. One thing that's still not clear from the KIP wiki itself is >>>>>>>> which >>>>>>>>>>>>> layer >>>>>>>>>>>>> would the logic be implemented at. Although for most KIPs we >>>>>>> would >>>>>>>>> not >>>>>>>>>>>>> require internal implementation details but only public facing >>>>>>> API >>>>>>>>>>>>> updates, >>>>>>>>>>>>> for a KIP like this I think it still requires to flesh out >>>>>>> details >>>>>>>> on >>>>>>>>>> the >>>>>>>>>>>>> implementation design. More specifically: today Streams embed a >>>>>>>> full >>>>>>>>>>>>> fledged Consumer client, which hard-code a ConsumerCoordinator >>>>>>>>> inside, >>>>>>>>>>>>> Streams then injects a StreamsPartitionAssignor to its plugable >>>>>>>>>>>>> PartitionAssignor interface and inside the >>>>>>> StreamsPartitionAssignor >>>>>>>>> we >>>>>>>>>>>>> also >>>>>>>>>>>>> have a TaskAssignor interface whose default implementation is >>>>>>>>>>>>> StickyPartitionAssignor. Streams partition assignor logic today >>>>>>>> sites >>>>>>>>>> in >>>>>>>>>>>>> the latter two classes. Hence the hierarchy today is: >>>>>>>>>>>>> >>>>>>>>>>>>> KafkaConsumer -> ConsumerCoordinator -> >>>>>>> StreamsPartitionAssignor -> >>>>>>>>>>>>> StickyTaskAssignor. >>>>>>>>>>>>> >>>>>>>>>>>>> We need to think about where the proposed implementation would >>>>>>> take >>>>>>>>>> place >>>>>>>>>>>>> at, and personally I think it is not the best option to inject >>>>>>> all >>>>>>>> of >>>>>>>>>> them >>>>>>>>>>>>> into the StreamsPartitionAssignor / StickyTaskAssignor since >> the >>>>>>>>> logic >>>>>>>>>> of >>>>>>>>>>>>> "triggering another rebalance" etc would require some >>>>>>> coordinator >>>>>>>>> logic >>>>>>>>>>>>> which is hard to mimic at PartitionAssignor level. On the other >>>>>>>> hand, >>>>>>>>>>>>> since >>>>>>>>>>>>> we are embedding a KafkaConsumer client as a whole we cannot >>>>>>> just >>>>>>>>>> replace >>>>>>>>>>>>> ConsumerCoordinator with a specialized StreamsCoordinator like >>>>>>>>> Connect >>>>>>>>>>>>> does >>>>>>>>>>>>> in KIP-415. So I'd like to maybe split the current proposal in >>>>>>> both >>>>>>>>>>>>> consumer layer and streams-assignor layer like we did in >>>>>>>>>> KIP-98/KIP-129. 
>>>>>>>>>>>>> And then the key thing to consider is how to cut off the >>>>>>> boundary >>>>>>>> so >>>>>>>>>> that >>>>>>>>>>>>> the modifications we push to ConsumerCoordinator would be >>>>>>>> beneficial >>>>>>>>>>>>> universally for any consumers, while keep the Streams-specific >>>>>>>> logic >>>>>>>>> at >>>>>>>>>>>>> the >>>>>>>>>>>>> assignor level. >>>>>>>>>>>>> >>>>>>>>>>>>> 3. Depending on which design direction we choose, our migration >>>>>>>> plan >>>>>>>>>> would >>>>>>>>>>>>> also be quite different. For example, if we stay with >>>>>>>>>> ConsumerCoordinator >>>>>>>>>>>>> whose protocol type is "consumer" still, and we can manage to >>>>>>> make >>>>>>>>> all >>>>>>>>>>>>> changes agnostic to brokers as well as to old versioned >>>>>>> consumers, >>>>>>>>> then >>>>>>>>>>>>> our >>>>>>>>>>>>> migration plan could be much easier. >>>>>>>>>>>>> >>>>>>>>>>>>> 4. I think one major issue related to this KIP is that today, >> in >>>>>>>> the >>>>>>>>>>>>> StickyPartitionAssignor, we always try to honor stickiness over >>>>>>>>>> workload >>>>>>>>>>>>> balance, and hence "learner task" is needed to break this >>>>>>> priority, >>>>>>>>> but >>>>>>>>>>>>> I'm >>>>>>>>>>>>> wondering if we can have a better solution within sticky task >>>>>>>>> assignor >>>>>>>>>>>>> that >>>>>>>>>>>>> accommodate this? >>>>>>>>>>>>> >>>>>>>>>>>>> Minor: >>>>>>>>>>>>> >>>>>>>>>>>>> 1. The idea of two rebalances have also been discussed in >>>>>>>>>>>>> https://issues.apache.org/jira/browse/KAFKA-6145. So we should >>>>>>> add >>>>>>>>> the >>>>>>>>>>>>> reference on the wiki page as well. >>>>>>>>>>>>> 2. Could you also add a section describing how the subscription >>>>>>> / >>>>>>>>>>>>> assignment metadata will be re-formatted? Without this >>>>>>> information >>>>>>>> it >>>>>>>>>> is >>>>>>>>>>>>> hard to get to the bottom of your idea. For example in the >>>>>>> "Leader >>>>>>>>>>>>> Transfer >>>>>>>>>>>>> Before Scaling" section, I'm not sure why "S2 doesn't know S4 >> is >>>>>>>> new >>>>>>>>>>>>> member" >>>>>>>>>>>>> and hence would blindly obey stickiness over workload balance >>>>>>>>>> requirement. >>>>>>>>>>>>> >>>>>>>>>>>>> Guozhang >>>>>>>>>>>>> >>>>>>>>>>>>> >>>>>>>>>>>>> On Thu, Feb 28, 2019 at 11:05 AM Boyang Chen < >>>>>>> bche...@outlook.com> >>>>>>>>>> wrote: >>>>>>>>>>>>> >>>>>>>>>>>>>> Hey community friends, >>>>>>>>>>>>>> >>>>>>>>>>>>>> I'm gladly inviting you to have a look at the proposal to add >>>>>>>>>>>>> incremental >>>>>>>>>>>>>> rebalancing to Kafka Streams, A.K.A auto-scaling support. >>>>>>>>>>>>>> >>>>>>>>>>>>>> >>>>>>>>>>>>>> >>>>>>>>>>>>> >>>>>>>>>> >>>>>>>>> >>>>>>>> >>>>>>> >>>> >> https://cwiki.apache.org/confluence/display/KAFKA/KIP-429%3A+Smooth+Auto-Scaling+for+Kafka+Streams >>>>>>>>>>>>>> >>>>>>>>>>>>>> Special thanks to Guozhang for giving great guidances and >>>>>>>> important >>>>>>>>>>>>>> feedbacks while making this KIP! >>>>>>>>>>>>>> >>>>>>>>>>>>>> Best, >>>>>>>>>>>>>> Boyang >>>>>>>>>>>>>> >>>>>>>>>>>>> >>>>>>>>>>>>> >>>>>>>>>>>>> -- >>>>>>>>>>>>> -- Guozhang >>>>>>>>>>>>> >>>>>>>>>>>> >>>>>>>>>>>> >>>>>>>>>>>> -- >>>>>>>>>>>> -- Guozhang >>>>>>>>>>>> >>>>>>>>>>> >>>>>>>>>>> >>>>>>>>>> >>>>>>>>>> >>>>>>>>> >>>>>>>>> -- >>>>>>>>> -- Guozhang >>>>>>>>> >>>>>>>> >>>>>>> >>>>>>> >>>>>>> -- >>>>>>> -- Guozhang >>>>>>> >>>>>> >>>>>> >>>>>> -- >>>>>> -- Guozhang >>>>>> >>>>> >>>>> >>>> >>>> >>> >> >> >
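To close the loop on the mechanism discussed throughout this thread, here is a small self-contained sketch (names and structure purely illustrative, independent of the actual PartitionAssignor plumbing) of the core rule the incremental protocol relies on: a partition may only be handed to a member if it is currently unowned or already owned by that member, and anything that would change hands is withheld until its current owner has revoked it, then granted in the follow-up rebalance.

    import java.util.ArrayList;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;

    public class CooperativeAssignmentSketch {

        // memberId -> partitions. Removes, from the target assignment, every partition
        // that would migrate between members; those are revoked by their current owners
        // in this round and handed out in the follow-up rebalance.
        public static Map<String, List<String>> withholdMigratingPartitions(
                Map<String, List<String>> currentOwned,
                Map<String, List<String>> targetAssignment) {

            // Reverse index of current ownership: partition -> current owner.
            Map<String, String> ownerOf = new HashMap<>();
            currentOwned.forEach((member, parts) -> parts.forEach(p -> ownerOf.put(p, member)));

            Map<String, List<String>> firstRound = new HashMap<>();
            for (Map.Entry<String, List<String>> entry : targetAssignment.entrySet()) {
                String member = entry.getKey();
                List<String> granted = new ArrayList<>();
                for (String partition : entry.getValue()) {
                    String owner = ownerOf.get(partition);
                    // Grant only partitions that are free or already owned by this member.
                    if (owner == null || owner.equals(member)) {
                        granted.add(partition);
                    }
                }
                firstRound.put(member, granted);
            }
            return firstRound;
        }
    }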