Hello Matthias,

I'm actually proposing to change this behavior holistically inside
ConsumerCoordinator, i.e. for both the eager and the incremental case. In
other words, I'm piggy-backing the behavioral fix of KAFKA-4600 on this KIP.
My motivation is that, with incremental rebalancing, only some partitions
are affected by a rebalance, since we no longer revoke everything.


Guozhang


On Thu, May 9, 2019 at 6:21 AM Matthias J. Sax <matth...@confluent.io>
wrote:

> Thanks Guozhang!
>
> The simplified upgrade path is great!
>
>
> Just a clarification question about the "Rebalance Callback Error
> Handling" -- does this change affect the `ConsumerCoordinator` only if
> incremental rebalancing is used? Or does the behavior also change for the
> eager rebalancing case?
>
>
> -Matthias
>
>
> On 5/9/19 3:37 AM, Guozhang Wang wrote:
> > Hello all,
> >
> > Thanks to everyone who has shared feedback on this KIP! If there are
> > no further comments I'll start the voting thread by end of tomorrow.
> >
> >
> > Guozhang.
> >
> > On Wed, May 8, 2019 at 6:36 PM Guozhang Wang <wangg...@gmail.com> wrote:
> >
> >> Hello Boyang,
> >>
> >> On Wed, May 1, 2019 at 4:51 PM Boyang Chen <bche...@outlook.com> wrote:
> >>
> >>> Hey Guozhang,
> >>>
> >>> thank you for the great write up. Overall the motivation and changes
> >>> LGTM, just some minor comments:
> >>>
> >>>
> >>>   1.  In "Consumer Coordinator Algorithm", we could reorder alphabet
> >>> points for 3d~3f from ["ready-to-migrate-partitions",
> >>> "unknown-but-owned-partitions",  "maybe-revoking-partitions"] to
> >>> ["maybe-revoking-partitions", "ready-to-migrate-partitions",
> >>> "unknown-but-owned-partitions"] in order to be consistent with 3c1~3.
> >>>
> >>
> >> Ack. Updated.
> >>
> >>
> >>>   2.  In "Consumer Coordinator Algorithm", 1c suggests revoking all
> >>> partitions upon heartbeat/commit failure. What's the gain here? Do we
> >>> want to keep all partitions running at this moment, to be optimistic
> >>> for the case when no partitions get reassigned?
> >>>
> >>
> >> That's a good catch. When REBALANCE_IN_PROGRESS is received, we can just
> >> re-join the group with all the currently owned partitions encoded.
> Updated.
> >>
> >>
> >>>   3.  In "Recommended Upgrade Procedure", remove extra 'those': " The
> >>> 'sticky' assignor works even those there are "
> >>>
> >>
> >> Ack, should be `even when`.
> >>
> >>
> >>>   4.  Put the two "looking into the future" items into a separate
> >>> category from the migration section. It seems inconsistent for readers
> >>> to see this before we have finished discussing everything else.
> >>>
> >>
> >> Ack.
> >>
> >>
> >>>   5.  Have we discussed the concern on the serialization? Could the new
> >>> metadata we are adding grow larger than the message size cap?
> >>>
> >>
> >> We're completing https://issues.apache.org/jira/browse/KAFKA-7149 which
> >> should largely reduce the message size (will update the wiki
> accordingly as
> >> well).
> >>
> >>
> >>>
> >>> Boyang
> >>>
> >>> ________________________________
> >>> From: Guozhang Wang <wangg...@gmail.com>
> >>> Sent: Monday, April 15, 2019 9:20 AM
> >>> To: dev
> >>> Subject: Re: [DISCUSS] KIP-429 : Smooth Auto-Scaling for Kafka Streams
> >>>
> >>> Hello Jason,
> >>>
> >>> I agree with you that for range / round-robin it makes less sense to be
> >>> compatible with cooperative rebalance protocol.
> >>>
> >>> As for StickyAssignor, however, I think it would still be possible to
> >>> make the current implementation compatible with cooperative rebalance.
> >>> So after pondering the different options at hand I'm now proposing the
> >>> approach listed in the upgrade section:
> >>>
> >>>
> >>>
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-429%3A+Kafka+Consumer+Incremental+Rebalance+Protocol#KIP-429:KafkaConsumerIncrementalRebalanceProtocol-CompatibilityandUpgradePath
> >>>
> >>> The idea is to let assignors specify which protocols they would work
> >>> with, associating each with a different name; then the upgrade path
> >>> would involve a "compatible" protocol which actually still uses eager
> >>> behavior while encoding two assignors if possible. In the "Rejected
> >>> Section" (just to clarify, I'm not finalizing it as rejected, just
> >>> putting it there for now, and if we like this one instead we can always
> >>> switch them) I listed the other approach we once discussed, and argued
> >>> that its con of duplicated classes seems to outweigh the pro of saving
> >>> the "rebalance.protocol" config.
> >>>
> >>> Let me know WDYT.
> >>>
> >>> Guozhang
> >>>
> >>> On Fri, Apr 12, 2019 at 6:08 PM Jason Gustafson <ja...@confluent.io>
> >>> wrote:
> >>>
> >>>> Hi Guozhang,
> >>>>
> >>>> Responses below:
> >>>>
> >>>> 2. The interface's default implementation will just be
> >>>>> `onPartitionRevoked`, so for user's instantiation if they do not make
> >>> any
> >>>>> code changes they should be able to recompile the code and continue.
> >>>>
> >>>>
> >>>> Ack, makes sense.
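
A minimal sketch of the default-method idea in point 2 above, assuming the new
callback is the `onPartitionsEmigrated()` mentioned elsewhere in this thread.
The interface and class names are hypothetical stand-ins rather than the
actual consumer API, and partitions are modeled as plain strings.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;

// Hypothetical stand-in for the consumer's rebalance listener interface.
interface SketchRebalanceListener {
    void onPartitionsAssigned(Collection<String> partitions);

    void onPartitionsRevoked(Collection<String> partitions);

    // Assumed new callback: by default it simply delegates to the existing one.
    default void onPartitionsEmigrated(Collection<String> partitions) {
        onPartitionsRevoked(partitions);
    }
}

// A listener written before the new callback existed; it needs no code changes.
class LegacyListener implements SketchRebalanceListener {
    final List<String> assigned = new ArrayList<>();
    final List<String> revoked = new ArrayList<>();

    @Override
    public void onPartitionsAssigned(Collection<String> partitions) {
        assigned.addAll(partitions);
    }

    @Override
    public void onPartitionsRevoked(Collection<String> partitions) {
        revoked.addAll(partitions);
    }
}
```

With this shape, an existing listener only has to be recompiled: a call to the
new callback lands in its existing revocation logic.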
> >>>>
> >>>> 4. Hmm.. not sure if it will work. The main issue is that the
> >>>>> consumer-coordinator behavior (whether to revoke all or none at
> >>>>> onRebalancePrepare) is independent of the selected protocol's
> assignor
> >>>>> (eager or cooperative), so even if the assignor is selected to be the
> >>>>> old-versioned one, we will still not revoke at the
> >>> consumer-coordinator
> >>>>> layer and hence has the same risk of migrating still-owned
> partitions,
> >>>>> right?
> >>>>
> >>>>
> >>>> Yeah, basically we would have to push the eager/cooperative logic into
> >>> the
> >>>> PartitionAssignor itself and make the consumer aware of the rebalance
> >>>> protocol it is compatible with. As long as an eager protocol _could_
> be
> >>>> selected, the consumer would have to be pessimistic and do eager
> >>>> revocation. But if all the assignors configured in the consumer
> support
> >>>> cooperative reassignment, then either 1) a cooperative protocol will
> be
> >>>> selected and cooperative revocation can be safely used, or 2) if the
> >>> rest
> >>>> of the group does not support it, then the consumer will simply fail.
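
The pessimistic rule described above could be sketched roughly as follows.
This is only an illustration: `RebalanceMode`, `RevocationPolicy`, and the
simplification that each configured assignor supports exactly one mode are
assumptions, not part of the KIP or the consumer code.

```java
import java.util.List;

// Hypothetical: the rebalance behavior a configured assignor is compatible with.
enum RebalanceMode { EAGER, COOPERATIVE }

final class RevocationPolicy {
    private RevocationPolicy() {}

    // Decide how the consumer should revoke before re-joining the group.
    // If any configured assignor is eager, an eager protocol could be
    // selected, so the consumer has to revoke eagerly.
    static RebalanceMode revocationMode(List<RebalanceMode> configuredAssignors) {
        if (configuredAssignors.isEmpty()) {
            // Assumption: with nothing configured, stay on the safe (eager) side.
            return RebalanceMode.EAGER;
        }
        for (RebalanceMode mode : configuredAssignors) {
            if (mode == RebalanceMode.EAGER) {
                return RebalanceMode.EAGER;
            }
        }
        // Every configured assignor is cooperative.
        return RebalanceMode.COOPERATIVE;
    }
}
```

The sketch does not cover case 2) above, where the rest of the group does not
support a cooperative protocol and the consumer fails to join.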
> >>>>
> >>>> Another point which you raised offline and I will repeat here is that
> >>> this
> >>>> proposal's benefit is mostly limited to sticky assignment logic.
> >>> Arguably
> >>>> the range assignor may have some incidental stickiness, particularly
> if
> >>> the
> >>>> group is rebalancing for a newly created or deleted topic. For other
> >>> cases,
> >>>> the proposal is mostly additional overhead since it takes an
> additional
> >>>> rebalance and many of the partitions will move. Perhaps it doesn't
> make
> >>> as
> >>>> much sense to use the cooperative protocol for strategies like range
> and
> >>>> round-robin. That kind of argues in favor of pushing some of the
> control
> >>>> into the assignor itself. Maybe we would not bother creating
> >>>> CooperativeRange as I suggested above, but it would make sense to
> >>> create a
> >>>> cooperative version of the sticky assignment strategy. I thought we
> >>> might
> >>>> have to create a new sticky assignor anyway because I can't see how we
> >>>> would get compatible behavior mixing with the old version anyway.
> >>>>
> >>>> Thanks,
> >>>> Jason
> >>>>
> >>>>
> >>>> On Thu, Apr 11, 2019 at 5:53 PM Guozhang Wang <wangg...@gmail.com>
> >>> wrote:
> >>>>
> >>>>> Hello Matthias:
> >>>>>
> >>>>> Thanks for your review.
> >>>>>
> >>>>> The background section uses the Streams assignor as well as the
> >>>>> consumer's own sticky assignor as examples illustrating the situation,
> >>>>> but this KIP is for the consumer coordinator itself, and the rest of
> >>>>> the paragraph does not talk about Streams any more. If you feel it's a
> >>>>> bit distracting I can remove those examples.
> >>>>>
> >>>>> 10). While working on the PR I realized that the revoked partitions
> >>>>> on the assignment are not needed (this is being discussed on the PR
> >>>>> itself:
> >>>>> https://github.com/apache/kafka/pull/6528#issuecomment-480009890).
> >>>>>
> >>>>> 20). 1.a. Good question, I've updated the wiki to let the consumer
> >>>>> clean up its assignment and re-join, rather than letting the assignor
> >>>>> make any proactive changes. The idea is to keep the logic simpler and
> >>>>> not do any "split brain" stuff.
> >>>>>
> >>>>> 20). 2.b. No, we do not need it, since the owned-partitions will
> >>>>> already be part of the Subscription passed in to assign().
> >>>>>
> >>>>> 30). As Boyang mentioned, there are some drawbacks that still cannot
> >>>>> be addressed by a rebalance delay, hence we still voted for KIP-345
> >>>>> (more details can be found on the discussion thread of KIP-345 itself).
> >>>>> One example is that as the instance resumes, its member id will be
> >>>>> empty, so we are still relying on the assignor to give it the
> >>>>> assignment from the old member-id while keeping all other members'
> >>>>> assignments unchanged.
> >>>>>
> >>>>> 40). Incomplete sentence, I've updated it.
> >>>>>
> >>>>> 50). Here's my idea: suppose we augment the join group schema with
> >>>>> `protocol version` in 2.3. Then, with both brokers and clients being
> >>>>> on version 2.3+, on the first rolling bounce where the subscription
> >>>>> and assignment schema and / or user metadata has changed, this
> >>>>> protocol version will be bumped. On the broker side, when receiving
> >>>>> all members' join-group requests, it will choose the one that has the
> >>>>> highest protocol version (it also assumes a higher versioned protocol
> >>>>> is always backward compatible, i.e. the coordinator can recognize
> >>>>> lower versioned protocols as well) and select it as the leader. Then
> >>>>> the leader can decide, based on its received and deserialized
> >>>>> subscription information, how to assign partitions and how to encode
> >>>>> the assignment so that everyone can understand it. With this, in
> >>>>> Streams for example, no version probing would be needed since we are
> >>>>> guaranteed the leader knows everyone's version (again assuming that a
> >>>>> higher versioned protocol is always backward compatible) and hence
> >>>>> can successfully do the assignment in that round.
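
The leader-selection rule in 50) can be sketched as below. This is an
illustration under stated assumptions: `JoinRequest`, `LeaderSelection`, and
the first-wins tie-breaking are hypothetical and are not the actual group
coordinator code or JoinGroup schema.

```java
import java.util.List;
import java.util.Optional;

// Hypothetical view of one member's join-group request.
final class JoinRequest {
    final String memberId;
    final int protocolVersion;

    JoinRequest(String memberId, int protocolVersion) {
        this.memberId = memberId;
        this.protocolVersion = protocolVersion;
    }
}

final class LeaderSelection {
    private LeaderSelection() {}

    // Pick the member with the highest protocol version as leader.
    // Assumption: on ties, the first request seen keeps the leadership.
    static Optional<JoinRequest> chooseLeader(List<JoinRequest> requests) {
        JoinRequest best = null;
        for (JoinRequest request : requests) {
            if (best == null || request.protocolVersion > best.protocolVersion) {
                best = request;
            }
        }
        return Optional.ofNullable(best);
    }
}
```

Because the chosen leader has the highest version, it can decode every other
member's subscription as long as higher versions stay backward compatible.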
> >>>>>
> >>>>> 60). My bad, this section was not updated as the design evolved;
> >>>>> I've updated it now.
> >>>>>
> >>>>>
> >>>>> On Tue, Apr 9, 2019 at 7:22 PM Boyang Chen <bche...@outlook.com>
> >>> wrote:
> >>>>>
> >>>>>>
> >>>>>> Thanks for the review Matthias! My 2 cents on the rebalance delay is
> >>>>>> that it is a rather fixed trade-off between task availability and
> >>>>>> resource shuffling. If we eventually trigger a rebalance after a
> >>>>>> rolling bounce, certain consumer setups still face global shuffles,
> >>>>>> for example a member.id ranking based round-robin strategy, as
> >>>>>> rejoining dynamic members will be assigned new member.ids, which
> >>>>>> reorders the assignment. So I think the primary goal of incremental
> >>>>>> rebalancing is still improving cluster availability during a
> >>>>>> rebalance, because it does not revoke any partition during this
> >>>>>> process. Also, the perk is the minimal configuration requirement :)
> >>>>>>
> >>>>>>
> >>>>>> Best,
> >>>>>>
> >>>>>> Boyang
> >>>>>>
> >>>>>> ________________________________
> >>>>>> From: Matthias J. Sax <matth...@confluent.io>
> >>>>>> Sent: Tuesday, April 9, 2019 7:47 AM
> >>>>>> To: dev
> >>>>>> Subject: Re: [DISCUSS] KIP-429 : Smooth Auto-Scaling for Kafka
> >>> Streams
> >>>>>>
> >>>>>> Thanks for the KIP, Boyang and Guozhang!
> >>>>>>
> >>>>>>
> >>>>>> I made an initial pass and have some questions/comments. One high
> >>> level
> >>>>>> comment: it seems that the KIP "mixes" plain consumer and Kafka
> >>> Streams
> >>>>>> use case a little bit (at least in the presentation). It might be
> >>>>>> helpful to separate both cases clearly, or maybe limit the scope to
> >>>>>> plain consumer only.
> >>>>>>
> >>>>>>
> >>>>>>
> >>>>>> 10) For `PartitionAssignor.Assignment`: It seems we need a new
> >>> method
> >>>>>> `List<TopicPartitions> revokedPartitions()` ?
> >>>>>>
> >>>>>>
> >>>>>>
> >>>>>> 20) In Section "Consumer Coordinator Algorithm"
> >>>>>>
> >>>>>>     Bullet point "1a)": If the subscription changes and a topic is
> >>>>>> removed from the subscription, why do we not revoke the partitions?
> >>>>>>
> >>>>>>     Bullet point "1a)": What happens if a topic is deleted (or a
> >>>>>> partition is removed/deleted from a topic)? Should we call the new
> >>>>>> `onPartitionsEmigrated()` callback for this case?
> >>>>>>
> >>>>>>     Bullet point "2b)" Should we update the `PartitionAssignor`
> >>>>>> interface to pass in the "old assignment" as third parameter into
> >>>>>> `assign()`?
> >>>>>>
> >>>>>>
> >>>>>>
> >>>>>> 30) Rebalance delay (as used in KIP-415): Could a rebalance delay
> >>>>>> subsume KIP-345? Configuring static members is rather complicated,
> >>> and
> >>>> I
> >>>>>> am wondering if a rebalance delay would be sufficient?
> >>>>>>
> >>>>>>
> >>>>>>
> >>>>>> 40) Quote: "otherwise the we would fall into the case 3.b) forever."
> >>>>>>
> >>>>>> What is "case 3.b" ?
> >>>>>>
> >>>>>>
> >>>>>>
> >>>>>> 50) Section "Looking into the Future"
> >>>>>>
> >>>>>> Nit: the new "ProtocolVersion" field is missing in the first line
> >>>>>> describing "JoinGroupRequest"
> >>>>>>
> >>>>>>> This can also help saving "version probing" cost on Streams as
> >>> well.
> >>>>>>
> >>>>>> How does this relate to Kafka Streams "version probing"
> >>> implementation?
> >>>>>> How can we exploit the new `ProtocolVersion` in Streams to improve
> >>>>>> "version probing" ? I have a rough idea, but would like to hear more
> >>>>>> details.
> >>>>>>
> >>>>>>
> >>>>>>
> >>>>>> 60) Section "Recommended Upgrade Procedure"
> >>>>>>
> >>>>>>> Set the `stream.rebalancing.mode` to `upgrading`, which will force
> >>>> the
> >>>>>> stream application to stay with protocol type "consumer".
> >>>>>>
> >>>>>> This config is not discussed in the KIP and appears in this section
> >>>>>> without context. Can you elaborate about it?
> >>>>>>
> >>>>>>
> >>>>>>
> >>>>>> -Matthias
> >>>>>>
> >>>>>>
> >>>>>>
> >>>>>>
> >>>>>> On 3/29/19 6:20 PM, Guozhang Wang wrote:
> >>>>>>> Bump up on this discussion thread. I've added a few new drawings for
> >>>>>>> better illustration, and would really appreciate your feedback.
> >>>>>>>
> >>>>>>>
> >>>>>>> Guozhang
> >>>>>>>
> >>>>>>> On Wed, Mar 20, 2019 at 6:17 PM Guozhang Wang <wangg...@gmail.com
> >>>>
> >>>>>> wrote:
> >>>>>>>
> >>>>>>>> Hello Boyang,
> >>>>>>>>
> >>>>>>>> I've made another thorough pass over this KIP and I'd like to split
> >>>>>>>> it into two parts: the first part, covered in KIP-429, would touch
> >>>>>>>> on the Consumer Coordinator only, to have the incremental rebalance
> >>>>>>>> protocol in place. The second part (for now I've reserved KIP number
> >>>>>>>> 444 for it) would contain all the changes on
> >>>>>>>> StreamsPartitionAssignor to allow warming up new members.
> >>>>>>>>
> >>>>>>>> I think the first part, a.k.a. the current updated KIP-429, is ready
> >>>>>>>> for review and discussion again. Would love to hear people's
> >>>>>>>> feedback and ideas.
> >>>>>>>>
> >>>>>>>> Guozhang
> >>>>>>>>
> >>>>>>>>
> >>>>>>>>
> >>>>>>>> On Mon, Mar 4, 2019 at 10:09 AM Boyang Chen <bche...@outlook.com
> >>>>
> >>>>>> wrote:
> >>>>>>>>
> >>>>>>>>> Thanks Guozhang for the great questions. Answers are inlined:
> >>>>>>>>>
> >>>>>>>>> 1. I'm still not sure if it's worthwhile to add a new type of
> >>>>> "learner
> >>>>>>>>> task" in addition to "standby task": if the only difference is
> >>> that
> >>>>> for
> >>>>>>>>> the
> >>>>>>>>> latter, we would consider workload balance while for the former
> >>> we
> >>>>>> would
> >>>>>>>>> not, I think we can just adjust the logic of StickyTaskAssignor
> >>> a
> >>>> bit
> >>>>>> to
> >>>>>>>>> break that difference. Adding a new type of task would be
> >>> adding a
> >>>>> lot
> >>>>>> of
> >>>>>>>>> code complexity, so if we can still piggy-back the logic on a
> >>>>>> standby-task
> >>>>>>>>> I would prefer to do so.
> >>>>>>>>> In the proposal we stated that we are not adding a new type of task
> >>>>>>>>> implementation. The learner task shall share the same implementation
> >>>>>>>>> as the normal standby task, except that we shall tag the standby
> >>>>>>>>> task as a learner and prioritize the learner tasks' replay effort.
> >>>>>>>>> 2. One thing that's still not clear from the KIP wiki itself is
> >>>> which
> >>>>>>>>> layer
> >>>>>>>>> would the logic be implemented at. Although for most KIPs we
> >>> would
> >>>>> not
> >>>>>>>>> require internal implementation details but only public facing
> >>> API
> >>>>>>>>> updates,
> >>>>>>>>> for a KIP like this I think it still requires to flesh out
> >>> details
> >>>> on
> >>>>>> the
> >>>>>>>>> implementation design. More specifically: today Streams embeds a
> >>>>>>>>> full-fledged Consumer client, which hard-codes a ConsumerCoordinator
> >>>>>>>>> inside. Streams then injects a StreamsPartitionAssignor into its
> >>>>>>>>> pluggable PartitionAssignor interface, and inside the
> >>>>>>>>> StreamsPartitionAssignor we also have a TaskAssignor interface whose
> >>>>>>>>> default implementation is StickyTaskAssignor. Streams partition
> >>>>>>>>> assignor logic today sits in the latter two classes. Hence the
> >>>>>>>>> hierarchy today is:
> >>>>>>>>>
> >>>>>>>>> KafkaConsumer -> ConsumerCoordinator ->
> >>> StreamsPartitionAssignor ->
> >>>>>>>>> StickyTaskAssignor.
> >>>>>>>>>
> >>>>>>>>> We need to think about where the proposed implementation would
> >>> take
> >>>>>> place
> >>>>>>>>> at, and personally I think it is not the best option to inject
> >>> all
> >>>> of
> >>>>>> them
> >>>>>>>>> into the StreamsPartitionAssignor / StickyTaskAssignor since the
> >>>>> logic
> >>>>>> of
> >>>>>>>>> "triggering another rebalance" etc would require some
> >>> coordinator
> >>>>> logic
> >>>>>>>>> which is hard to mimic at PartitionAssignor level. On the other
> >>>> hand,
> >>>>>>>>> since
> >>>>>>>>> we are embedding a KafkaConsumer client as a whole we cannot
> >>> just
> >>>>>> replace
> >>>>>>>>> ConsumerCoordinator with a specialized StreamsCoordinator like
> >>>>> Connect
> >>>>>>>>> does
> >>>>>>>>> in KIP-415. So I'd like to maybe split the current proposal in
> >>> both
> >>>>>>>>> consumer layer and streams-assignor layer like we did in
> >>>>>> KIP-98/KIP-129.
> >>>>>>>>> And then the key thing to consider is how to cut off the
> >>> boundary
> >>>> so
> >>>>>> that
> >>>>>>>>> the modifications we push to ConsumerCoordinator would be
> >>>> beneficial
> >>>>>>>>> universally for any consumers, while keep the Streams-specific
> >>>> logic
> >>>>> at
> >>>>>>>>> the
> >>>>>>>>> assignor level.
> >>>>>>>>> Yes, that's also my ideal plan. The details for the
> >>> implementation
> >>>>> are
> >>>>>>>>> depicted
> >>>>>>>>> in this doc
> >>>>>>>>> <https://docs.google.com/document/d/1me2a5wvxAZT1QE6HkwyDl7C2TiBQlKN3Dpw_I1ro91U/edit#heading=h.qix74qdmekae>
> >>>>>>>>> ("KStream Smooth Auto-scaling Implementation Plan"),
> >>>>>>>>> and I have explained the reasoning on why we want to push a
> >>>>>>>>> global change of replacing ConsumerCoordinator with
> >>>>> StreamCoordinator.
> >>>>>>>>> The motivation
> >>>>>>>>> is that KIP space is usually used for public & algorithm level
> >>>>> change,
> >>>>>>>>> not for internal
> >>>>>>>>> implementation details.
> >>>>>>>>>
> >>>>>>>>> 3. Depending on which design direction we choose, our migration
> >>>> plan
> >>>>>> would
> >>>>>>>>> also be quite different. For example, if we stay with
> >>>>>> ConsumerCoordinator
> >>>>>>>>> whose protocol type is "consumer" still, and we can manage to
> >>> make
> >>>>> all
> >>>>>>>>> changes agnostic to brokers as well as to old versioned
> >>> consumers,
> >>>>> then
> >>>>>>>>> our
> >>>>>>>>> migration plan could be much easier.
> >>>>>>>>> Yes, the upgrade plan was designed around the new StreamCoordinator
> >>>>>>>>> approach, which means we shall define a new protocol type. The
> >>>>>>>>> reason existing applications can only keep the same `consumer`
> >>>>>>>>> protocol type is that the current broker only allows changing the
> >>>>>>>>> protocol type when the consumer group is empty. It is of course
> >>>>>>>>> user-unfriendly to force a wipe-out of the entire application, and
> >>>>>>>>> I don't think maintaining the old protocol type would greatly
> >>>>>>>>> impact ongoing services using the new stream coordinator. WDYT?
> >>>>>>>>>
> >>>>>>>>> 4. I think one major issue related to this KIP is that today, in
> >>>>>>>>> the StickyTaskAssignor, we always try to honor stickiness over
> >>>>>>>>> workload balance, and hence a "learner task" is needed to break this
> >>>>>>>>> priority, but I'm wondering if we can have a better solution within
> >>>>>>>>> the sticky task assignor that accommodates this?
> >>>>>>>>> Great question! That's what I explained in the proposal, which is
> >>>>>>>>> that we should break down our delivery into different stages. At the
> >>>>>>>>> very beginning, our goal is to trigger learner task assignment only
> >>>>>>>>> on `new` hosts, which we shall figure out by leveraging the leader's
> >>>>>>>>> knowledge of the previous round of rebalance. After stage one, our
> >>>>>>>>> goal is to have a smooth scaling-up experience, but the task balance
> >>>>>>>>> problem is kind of orthogonal.
> >>>>>>>>> The load balance problem is a much broader topic than auto scaling,
> >>>>>>>>> which I figure is worth discussing within this KIP's context since
> >>>>>>>>> it's a natural next step, but it wouldn't be the main topic.
> >>>>>>>>> Learner task or auto scaling support should be treated as `a
> >>>>>>>>> helpful mechanism to reach load balance`, but not `an algorithm
> >>>>>>>>> defining load balance`. It would be great if you could share some
> >>>>>>>>> insights on stream task balance, which would eventually help us
> >>>>>>>>> break out of KIP-429's scope and even define a separate KIP focused
> >>>>>>>>> on task weight & assignment logic improvement.
> >>>>>>>>>
> >>>>>>>>> Also thank you for making improvements to the KIP context and
> >>>>>>>>> organization!
> >>>>>>>>>
> >>>>>>>>> Best,
> >>>>>>>>> Boyang
> >>>>>>>>> ________________________________
> >>>>>>>>> From: Guozhang Wang <wangg...@gmail.com>
> >>>>>>>>> Sent: Saturday, March 2, 2019 6:00 AM
> >>>>>>>>> To: dev
> >>>>>>>>> Subject: Re: [DISCUSS] KIP-429 : Smooth Auto-Scaling for Kafka
> >>>>> Streams
> >>>>>>>>>
> >>>>>>>>> Hello Boyang,
> >>>>>>>>>
> >>>>>>>>> I've just made a quick pass on the KIP and here are some
> >>> thoughts.
> >>>>>>>>>
> >>>>>>>>> Meta:
> >>>>>>>>>
> >>>>>>>>> 1. I'm still not sure if it's worthwhile to add a new type of
> >>>>> "learner
> >>>>>>>>> task" in addition to "standby task": if the only difference is
> >>> that
> >>>>> for
> >>>>>>>>> the
> >>>>>>>>> latter, we would consider workload balance while for the former
> >>> we
> >>>>>> would
> >>>>>>>>> not, I think we can just adjust the logic of StickyTaskAssignor
> >>> a
> >>>> bit
> >>>>>> to
> >>>>>>>>> break that difference. Adding a new type of task would be
> >>> adding a
> >>>>> lot
> >>>>>> of
> >>>>>>>>> code complexity, so if we can still piggy-back the logic on a
> >>>>>> standby-task
> >>>>>>>>> I would prefer to do so.
> >>>>>>>>>
> >>>>>>>>> 2. One thing that's still not clear from the KIP wiki itself is
> >>>> which
> >>>>>>>>> layer
> >>>>>>>>> would the logic be implemented at. Although for most KIPs we
> >>> would
> >>>>> not
> >>>>>>>>> require internal implementation details but only public facing
> >>> API
> >>>>>>>>> updates,
> >>>>>>>>> for a KIP like this I think it still requires to flesh out
> >>> details
> >>>> on
> >>>>>> the
> >>>>>>>>> implementation design. More specifically: today Streams embeds a
> >>>>>>>>> full-fledged Consumer client, which hard-codes a ConsumerCoordinator
> >>>>>>>>> inside. Streams then injects a StreamsPartitionAssignor into its
> >>>>>>>>> pluggable PartitionAssignor interface, and inside the
> >>>>>>>>> StreamsPartitionAssignor we also have a TaskAssignor interface whose
> >>>>>>>>> default implementation is StickyTaskAssignor. Streams partition
> >>>>>>>>> assignor logic today sits in the latter two classes. Hence the
> >>>>>>>>> hierarchy today is:
> >>>>>>>>>
> >>>>>>>>> KafkaConsumer -> ConsumerCoordinator ->
> >>> StreamsPartitionAssignor ->
> >>>>>>>>> StickyTaskAssignor.
> >>>>>>>>>
> >>>>>>>>> We need to think about where the proposed implementation would
> >>> take
> >>>>>> place
> >>>>>>>>> at, and personally I think it is not the best option to inject
> >>> all
> >>>> of
> >>>>>> them
> >>>>>>>>> into the StreamsPartitionAssignor / StickyTaskAssignor since the
> >>>>> logic
> >>>>>> of
> >>>>>>>>> "triggering another rebalance" etc would require some
> >>> coordinator
> >>>>> logic
> >>>>>>>>> which is hard to mimic at PartitionAssignor level. On the other
> >>>> hand,
> >>>>>>>>> since
> >>>>>>>>> we are embedding a KafkaConsumer client as a whole we cannot
> >>> just
> >>>>>> replace
> >>>>>>>>> ConsumerCoordinator with a specialized StreamsCoordinator like
> >>>>> Connect
> >>>>>>>>> does
> >>>>>>>>> in KIP-415. So I'd like to maybe split the current proposal in
> >>> both
> >>>>>>>>> consumer layer and streams-assignor layer like we did in
> >>>>>> KIP-98/KIP-129.
> >>>>>>>>> And then the key thing to consider is how to cut off the
> >>> boundary
> >>>> so
> >>>>>> that
> >>>>>>>>> the modifications we push to ConsumerCoordinator would be
> >>>> beneficial
> >>>>>>>>> universally for any consumers, while keep the Streams-specific
> >>>> logic
> >>>>> at
> >>>>>>>>> the
> >>>>>>>>> assignor level.
> >>>>>>>>>
> >>>>>>>>> 3. Depending on which design direction we choose, our migration
> >>>> plan
> >>>>>> would
> >>>>>>>>> also be quite different. For example, if we stay with
> >>>>>> ConsumerCoordinator
> >>>>>>>>> whose protocol type is "consumer" still, and we can manage to
> >>> make
> >>>>> all
> >>>>>>>>> changes agnostic to brokers as well as to old versioned
> >>> consumers,
> >>>>> then
> >>>>>>>>> our
> >>>>>>>>> migration plan could be much easier.
> >>>>>>>>>
> >>>>>>>>> 4. I think one major issue related to this KIP is that today, in
> >>>>>>>>> the StickyTaskAssignor, we always try to honor stickiness over
> >>>>>>>>> workload balance, and hence a "learner task" is needed to break this
> >>>>>>>>> priority, but I'm wondering if we can have a better solution within
> >>>>>>>>> the sticky task assignor that accommodates this?
> >>>>>>>>>
> >>>>>>>>> Minor:
> >>>>>>>>>
> >>>>>>>>> 1. The idea of two rebalances has also been discussed in
> >>>>>>>>> https://issues.apache.org/jira/browse/KAFKA-6145, so we should add
> >>>>>>>>> the reference to the wiki page as well.
> >>>>>>>>> 2. Could you also add a section describing how the subscription
> >>> /
> >>>>>>>>> assignment metadata will be re-formatted? Without this
> >>> information
> >>>> it
> >>>>>> is
> >>>>>>>>> hard to get to the bottom of your idea. For example in the
> >>> "Leader
> >>>>>>>>> Transfer
> >>>>>>>>> Before Scaling" section, I'm not sure why "S2 doesn't know S4 is
> >>>> new
> >>>>>>>>> member"
> >>>>>>>>> and hence would blindly obey stickiness over workload balance
> >>>>>> requirement.
> >>>>>>>>>
> >>>>>>>>> Guozhang
> >>>>>>>>>
> >>>>>>>>>
> >>>>>>>>> On Thu, Feb 28, 2019 at 11:05 AM Boyang Chen <
> >>> bche...@outlook.com>
> >>>>>> wrote:
> >>>>>>>>>
> >>>>>>>>>> Hey community friends,
> >>>>>>>>>>
> >>>>>>>>>> I'm gladly inviting you to have a look at the proposal to add
> >>>>>>>>> incremental
> >>>>>>>>>> rebalancing to Kafka Streams, A.K.A auto-scaling support.
> >>>>>>>>>>
> >>>>>>>>>>
> >>>>>>>>>>
> >>>>>>>>>
> >>>>>>
> >>>>>
> >>>>
> >>>
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-429%3A+Smooth+Auto-Scaling+for+Kafka+Streams
> >>>>>>>>>>
> >>>>>>>>>> Special thanks to Guozhang for giving great guidance and important
> >>>>>>>>>> feedback while making this KIP!
> >>>>>>>>>>
> >>>>>>>>>> Best,
> >>>>>>>>>> Boyang
> >>>>>>>>>>
> >>>>>>>>>
> >>>>>>>>>
> >>>>>>>>> --
> >>>>>>>>> -- Guozhang
> >>>>>>>>>
> >>>>>>>>
> >>>>>>>>
> >>>>>>>> --
> >>>>>>>> -- Guozhang
> >>>>>>>>
> >>>>>>>
> >>>>>>>
> >>>>>>
> >>>>>>
> >>>>>
> >>>>> --
> >>>>> -- Guozhang
> >>>>>
> >>>>
> >>>
> >>>
> >>> --
> >>> -- Guozhang
> >>>
> >>
> >>
> >> --
> >> -- Guozhang
> >>
> >
> >
>
>

-- 
-- Guozhang
