Hello Matthias:

Thanks for your review.

The background section uses the Streams assignor as well as the consumer's
own sticky assignor as examples to illustrate the situation, but this KIP is
for the consumer coordinator itself, and the rest of the paragraph does not
talk about Streams any more. If you feel it is a bit distracting, I can
remove those examples.

10). While working on the PR I realized that the revoked partitions on the
assignment are not needed (this is being discussed on the PR itself:
https://github.com/apache/kafka/pull/6528#issuecomment-480009890).
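
One way to see why a separate revoked list can be redundant: assuming each
consumer remembers the partitions it owned before the rebalance, it can
derive the revoked set locally as a set difference against its new
assignment. A minimal sketch; the class and method names are hypothetical,
and partitions are plain "topic-partition" strings rather than Kafka's
TopicPartition class:

```java
import java.util.HashSet;
import java.util.Set;

// Hypothetical helper, not part of Kafka. Partitions are "topic-partition" strings.
final class RevokedPartitions {

    // Owned before the rebalance but absent from the new assignment: revoke these.
    static Set<String> revoked(Set<String> ownedBefore, Set<String> newlyAssigned) {
        Set<String> result = new HashSet<>(ownedBefore);
        result.removeAll(newlyAssigned);
        return result;
    }

    // Present in the new assignment but not owned before: newly added partitions.
    static Set<String> added(Set<String> ownedBefore, Set<String> newlyAssigned) {
        Set<String> result = new HashSet<>(newlyAssigned);
        result.removeAll(ownedBefore);
        return result;
    }

    public static void main(String[] args) {
        Set<String> owned = Set.of("orders-0", "orders-1", "payments-0");
        Set<String> assigned = Set.of("orders-0", "payments-0", "payments-1");
        System.out.println("revoked: " + revoked(owned, assigned));
        System.out.println("added:   " + added(owned, assigned));
    }
}
```

Partitions in both sets are simply kept, which is what lets an incremental
protocol avoid revoking everything up front.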

20). 1.a. Good question. I've updated the wiki to let the consumer clean up
its assignment and re-join, without letting the assignor make any
proactive changes. The idea is to keep the logic simple and avoid any
"split brain" situations.
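
A minimal sketch of that consumer-side cleanup, assuming the consumer
tracks its owned partitions as "topic-partition" strings. The class, its
fields and onSubscriptionChange() are hypothetical illustrations, not the
actual ConsumerCoordinator code:

```java
import java.util.HashSet;
import java.util.Set;

// Hypothetical consumer-side bookkeeping, not Kafka's ConsumerCoordinator.
final class SubscriptionChange {

    final Set<String> owned = new HashSet<>();
    boolean rejoinNeeded = false;

    // "orders-3" -> "orders"; use the last '-' since topic names may contain '-'.
    static String topicOf(String partition) {
        return partition.substring(0, partition.lastIndexOf('-'));
    }

    // The consumer itself revokes the partitions of topics that left its
    // subscription and then asks to re-join. The assignor makes no proactive
    // change; it just sees the smaller owned set at the next rebalance.
    Set<String> onSubscriptionChange(Set<String> newTopics) {
        Set<String> revoked = new HashSet<>();
        for (String tp : owned) {
            if (!newTopics.contains(topicOf(tp))) {
                revoked.add(tp);
            }
        }
        owned.removeAll(revoked);
        rejoinNeeded = true; // any subscription change triggers a re-join
        return revoked;
    }

    public static void main(String[] args) {
        SubscriptionChange consumer = new SubscriptionChange();
        consumer.owned.addAll(Set.of("orders-0", "payments-0", "payments-1"));
        Set<String> revoked = consumer.onSubscriptionChange(Set.of("orders"));
        System.out.println("revoked " + revoked + ", still owned " + consumer.owned);
    }
}
```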

20). 2.b. No, we do not need to, since the owned partitions will already be
part of the Subscription passed in to assign().
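
To illustrate why no third "old assignment" parameter is needed, here is a
simplified, hypothetical assign() that reads each member's owned partitions
(standing in for the owned partitions carried in its Subscription) and
keeps them sticky. It is not Kafka's StickyAssignor; the class name, the
map-based signature and the string partitions are assumptions for the
sketch:

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;

// Hypothetical, simplified assign(): the previous assignment comes from each
// member's owned partitions, not from an extra parameter.
final class OwnedAwareAssignor {

    static Map<String, List<String>> assign(List<String> allPartitions,
                                            Map<String, List<String>> ownedByMember) {
        // TreeMap keeps members in sorted order so the result is deterministic.
        Map<String, List<String>> result = new TreeMap<>();
        for (String member : ownedByMember.keySet()) {
            result.put(member, new ArrayList<>());
        }
        if (result.isEmpty()) {
            return result;
        }
        Set<String> existing = new HashSet<>(allPartitions);
        Set<String> taken = new HashSet<>();

        // 1) Stickiness: keep each owned partition that still exists and that
        //    no other member has already claimed.
        for (Map.Entry<String, List<String>> e : new TreeMap<>(ownedByMember).entrySet()) {
            for (String tp : e.getValue()) {
                if (existing.contains(tp) && taken.add(tp)) {
                    result.get(e.getKey()).add(tp);
                }
            }
        }

        // 2) Every partition nobody owns goes to the currently least-loaded member.
        for (String tp : allPartitions) {
            if (!taken.add(tp)) {
                continue; // already kept in step 1
            }
            String target = null;
            for (Map.Entry<String, List<String>> e : result.entrySet()) {
                if (target == null || e.getValue().size() < result.get(target).size()) {
                    target = e.getKey();
                }
            }
            result.get(target).add(tp);
        }
        return result;
    }

    public static void main(String[] args) {
        Map<String, List<String>> owned = new TreeMap<>();
        owned.put("consumer-1", List.of("t-0", "t-1"));
        owned.put("consumer-2", List.of());
        System.out.println(assign(List.of("t-0", "t-1", "t-2", "t-3"), owned));
        // prints {consumer-1=[t-0, t-1], consumer-2=[t-2, t-3]}
    }
}
```

A real assignor would also need to move partitions off overloaded members;
the sketch only shows where the owned partitions come from.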

30). As Boyang mentioned, there are some drawbacks that still cannot be
addressed by a rebalance delay, hence we still voted for KIP-345 (more
details can be found on the discussion thread of KIP-345 itself). One
example is that when the instance resumes, its member id will be empty, so
we are still relying on the assignor to give it the assignment of the old
member id while keeping all other members' assignments unchanged.
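
A small illustration of the drawback, assuming a simplified assignor that
does round-robin over member ids in sorted order (a stand-in for the
"member.id ranking based round robin strategy" Boyang mentions, not Kafka's
own RoundRobinAssignor; the class and ids are hypothetical). When a
restarted member comes back with a fresh id, the members that never
restarted lose their partitions too:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Illustrative only: round-robin over member ids sorted lexicographically.
final class RoundRobinReshuffle {

    static Map<String, List<Integer>> assign(List<String> memberIds, int numPartitions) {
        List<String> sorted = new ArrayList<>(memberIds);
        sorted.sort(null); // natural (lexicographic) order of member ids
        Map<String, List<Integer>> result = new TreeMap<>();
        for (String m : sorted) {
            result.put(m, new ArrayList<>());
        }
        if (sorted.isEmpty()) {
            return result;
        }
        for (int p = 0; p < numPartitions; p++) {
            result.get(sorted.get(p % sorted.size())).add(p);
        }
        return result;
    }

    public static void main(String[] args) {
        // Before the bounce: three dynamic members.
        System.out.println(assign(List.of("m-a", "m-b", "m-c"), 6));
        // prints {m-a=[0, 3], m-b=[1, 4], m-c=[2, 5]}

        // "m-a" restarts, re-joins with an empty member id and is handed a fresh
        // one ("m-d") that sorts last, so m-b and m-c are reshuffled as well.
        System.out.println(assign(List.of("m-b", "m-c", "m-d"), 6));
        // prints {m-b=[0, 3], m-c=[1, 4], m-d=[2, 5]}
    }
}
```

A rebalance delay only postpones this reshuffle. If the restarted instance
could keep its old identity, as KIP-345's static membership aims to allow,
the same call would reproduce the original assignment.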

40). Incomplete sentence, I've updated it.

50). Here's my idea: suppose we augment the join group schema with a
`protocol version` field in 2.3. Then, with both brokers and clients on
version 2.3+, this protocol version will be bumped on the first rolling
bounce where the subscription and assignment schema and/or user metadata
has changed. On the broker side, after receiving all members' join-group
requests, it will choose the member with the highest protocol version as
the leader (this assumes that a higher versioned protocol is always
backward compatible, i.e. the coordinator can recognize lower versioned
protocols as well). The leader can then decide, based on the subscription
information it received and deserialized, how to assign partitions and how
to encode the assignment so that everyone can understand it. With this, in
Streams for example, no version probing would be needed since the leader is
guaranteed to know everyone's version (again assuming that a higher
versioned protocol is always backward compatible), and hence it can
successfully do the assignment in that round.
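
A rough sketch of the broker-side leader choice described above, under the
same backward-compatibility assumption. The JoinRequest class, its fields
and selectLeader() are hypothetical; the tie-break by smallest member id is
an arbitrary choice added only to make the result deterministic:

```java
import java.util.Comparator;
import java.util.List;
import java.util.Optional;

// Hypothetical leader choice using the proposed JoinGroup `ProtocolVersion`
// field. Names are illustrative only.
final class LeaderByProtocolVersion {

    static final class JoinRequest {
        final String memberId;
        final int protocolVersion;

        JoinRequest(String memberId, int protocolVersion) {
            this.memberId = memberId;
            this.protocolVersion = protocolVersion;
        }
    }

    // Pick the member advertising the highest protocol version. If a higher
    // version can decode every lower one, this leader can read all
    // subscriptions and encode an assignment every member understands.
    // Ties go to the lexicographically smallest member id.
    static Optional<String> selectLeader(List<JoinRequest> requests) {
        Comparator<JoinRequest> byVersion = Comparator.comparingInt(r -> r.protocolVersion);
        Comparator<JoinRequest> bySmallerId =
                Comparator.comparing((JoinRequest r) -> r.memberId).reversed();
        return requests.stream()
                .max(byVersion.thenComparing(bySmallerId))
                .map(r -> r.memberId);
    }

    public static void main(String[] args) {
        // Mid rolling bounce: one member still on version 2, two already on 3.
        List<JoinRequest> requests = List.of(
                new JoinRequest("member-c", 2),
                new JoinRequest("member-a", 3),
                new JoinRequest("member-b", 3));
        System.out.println(selectLeader(requests).orElse("<no members>"));
        // prints member-a
    }
}
```

Because the leader is always on the newest version present, it never has
to guess what the other members can decode, which is the situation version
probing in Streams works around today.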

60). My bad, this section was not updated as the design evolved. I've
updated it now.


On Tue, Apr 9, 2019 at 7:22 PM Boyang Chen <bche...@outlook.com> wrote:

>
> Thanks for the review Matthias! My 2 cents on the rebalance delay is that
> it is a rather fixed trade-off between task availability and resource
> shuffling. If we eventually trigger a rebalance after a rolling bounce,
> certain consumer setups still face global shuffles, for example a
> member.id-ranking-based round-robin strategy, as rejoining dynamic members
> will be assigned new member.ids, which reorders the assignment. So I think
> the primary goal of incremental rebalancing is still improving cluster
> availability during rebalance, because it doesn't revoke any partitions
> during this process. Also, the perk is the minimal configuration
> requirement :)
>
>
> Best,
> Boyang
>
> ________________________________
> From: Matthias J. Sax <matth...@confluent.io>
> Sent: Tuesday, April 9, 2019 7:47 AM
> To: dev
> Subject: Re: [DISCUSS] KIP-429 : Smooth Auto-Scaling for Kafka Streams
>
> Thanks for the KIP, Boyang and Guozhang!
>
>
> I made an initial pass and have some questions/comments. One high level
> comment: it seems that the KIP "mixes" plain consumer and Kafka Streams
> use case a little bit (at least in the presentation). It might be
> helpful to separate both cases clearly, or maybe limit the scope to
> plain consumer only.
>
>
>
> 10) For `PartitionAssignor.Assignment`: It seems we need a new method
> `List<TopicPartition> revokedPartitions()`?
>
>
>
> 20) In Section "Consumer Coordinator Algorithm"
>
>     Bullet point "1a)": If the subscription changes and a topic is
> removed from the subscription, why do we not revoke the partitions?
>
>     Bullet point "1a)": What happens if a topic is deleted (or a
> partition is removed/deleted from a topic)? Should we call the new
> `onPartitionsEmigrated()` callback for this case?
>
>     Bullet point "2b)" Should we update the `PartitionAssignor`
> interface to pass in the "old assignment" as third parameter into
> `assign()`?
>
>
>
> 30) Rebalance delay (as used in KIP-415): Could a rebalance delay
> subsume KIP-345? Configuring static members is rather complicated, and I
> am wondering if a rebalance delay would be sufficient?
>
>
>
> 40) Quote: "otherwise the we would fall into the case 3.b) forever."
>
> What is "case 3.b" ?
>
>
>
> 50) Section "Looking into the Future"
>
> Nit: the new "ProtocolVersion" field is missing in the first line
> describing "JoinGroupRequest"
>
> > This can also help saving "version probing" cost on Streams as well.
>
> How does this relate to the Kafka Streams "version probing"
> implementation? How can we exploit the new `ProtocolVersion` in Streams
> to improve "version probing"? I have a rough idea, but would like to hear
> more details.
>
>
>
> 60) Section "Recommended Upgrade Procedure"
>
> > Set the `stream.rebalancing.mode` to `upgrading`, which will force the
> > stream application to stay with protocol type "consumer".
>
> This config is not discussed in the KIP and appears in this section
> without context. Can you elaborate on it?
>
>
>
> -Matthias
>
>
>
>
> On 3/29/19 6:20 PM, Guozhang Wang wrote:
> > Bump up on this discussion thread. I've added a few new drawings for
> > better illustration and would really appreciate your feedback.
> >
> >
> > Guozhang
> >
> > On Wed, Mar 20, 2019 at 6:17 PM Guozhang Wang <wangg...@gmail.com> wrote:
> >
> >> Hello Boyang,
> >>
> >> I've made another thorough pass over this KIP and I'd like to split it
> >> into two parts: the first part, covered in KIP-429, would touch only the
> >> Consumer Coordinator, to put the incremental rebalance protocol in
> >> place. The second part (for now I've reserved KIP number 444 for it)
> >> would contain all the changes to StreamsPartitionAssignor to allow
> >> warming up new members.
> >>
> >> I think the first part, a.k.a. the current updated KIP-429, is ready for
> >> review and discussion again. Would love to hear people's feedback and
> >> ideas.
> >>
> >> Guozhang
> >>
> >>
> >>
> >> On Mon, Mar 4, 2019 at 10:09 AM Boyang Chen <bche...@outlook.com> wrote:
> >>
> >>> Thanks Guozhang for the great questions. Answers are inlined:
> >>>
> >>> 1. I'm still not sure if it's worthwhile to add a new type of "learner
> >>> task" in addition to "standby task": if the only difference is that for
> >>> the latter we would consider workload balance while for the former we
> >>> would not, I think we can just adjust the logic of StickyTaskAssignor a
> >>> bit to break that difference. Adding a new type of task would add a lot
> >>> of code complexity, so if we can still piggy-back the logic on a
> >>> standby task I would prefer to do so.
> >>> In the proposal we stated that we are not adding a new type of task
> >>> implementation. The learner task shall share the same implementation
> >>> with the normal standby task, only that we shall tag the standby task
> >>> as a learner and prioritize the learner tasks' replay effort.
> >>> 2. One thing that's still not clear from the KIP wiki itself is which
> >>> layer the logic would be implemented at. Although for most KIPs we
> >>> would not require internal implementation details but only public-facing
> >>> API updates, for a KIP like this I think it still requires fleshing out
> >>> details of the implementation design. More specifically: today Streams
> >>> embeds a full-fledged Consumer client, which hard-codes a
> >>> ConsumerCoordinator inside; Streams then injects a
> >>> StreamsPartitionAssignor into its pluggable PartitionAssignor interface,
> >>> and inside the StreamsPartitionAssignor we also have a TaskAssignor
> >>> interface whose default implementation is StickyTaskAssignor. Streams
> >>> partition assignor logic today sits in the latter two classes. Hence the
> >>> hierarchy today is:
> >>>
> >>> KafkaConsumer -> ConsumerCoordinator -> StreamsPartitionAssignor ->
> >>> StickyTaskAssignor.
> >>>
> >>> We need to think about where the proposed implementation would take
> >>> place, and personally I think it is not the best option to inject all of
> >>> it into the StreamsPartitionAssignor / StickyTaskAssignor, since the
> >>> logic of "triggering another rebalance" etc. would require some
> >>> coordinator logic which is hard to mimic at the PartitionAssignor level.
> >>> On the other hand, since we are embedding a KafkaConsumer client as a
> >>> whole, we cannot just replace ConsumerCoordinator with a specialized
> >>> StreamsCoordinator like Connect does in KIP-415. So I'd like to maybe
> >>> split the current proposal into a consumer layer and a streams-assignor
> >>> layer, like we did in KIP-98/KIP-129. And then the key thing to consider
> >>> is how to draw the boundary so that the modifications we push to
> >>> ConsumerCoordinator would be beneficial universally for any consumers,
> >>> while keeping the Streams-specific logic at the assignor level.
> >>> Yes, that's also my ideal plan. The details of the implementation are
> >>> depicted in this doc:
> >>> https://docs.google.com/document/d/1me2a5wvxAZT1QE6HkwyDl7C2TiBQlKN3Dpw_I1ro91U/edit#heading=h.qix74qdmekae
> >>> In that doc I have explained the reasoning for why we want to push a
> >>> global change replacing ConsumerCoordinator with StreamCoordinator. The
> >>> motivation is that KIP space is usually used for public and
> >>> algorithm-level changes, not for internal implementation details.
> >>>
> >>> 3. Depending on which design direction we choose, our migration plan
> >>> would also be quite different. For example, if we stay with
> >>> ConsumerCoordinator, whose protocol type is still "consumer", and we can
> >>> manage to make all changes agnostic to brokers as well as to older
> >>> consumer versions, then our migration plan could be much easier.
> >>> Yes, the upgrade plan was designed around the new StreamCoordinator
> >>> approach, which means we shall define a new protocol type. For existing
> >>> applications we can only keep the same `consumer` protocol type,
> >>> because the current broker only allows changing the protocol type when
> >>> the consumer group is empty. It is of course user-unfriendly to force a
> >>> wipe-out of the entire application, and I don't think maintaining the
> >>> old protocol type would greatly impact ongoing services using the new
> >>> stream coordinator. WDYT?
> >>>
> >>> 4. I think one major issue related to this KIP is that today, in the
> >>> StickyPartitionAssignor, we always try to honor stickiness over workload
> >>> balance, and hence a "learner task" is needed to break this priority,
> >>> but I'm wondering if we can have a better solution within the sticky
> >>> task assignor that accommodates this?
> >>> Great question! That's what I explained in the proposal: we should
> >>> break down our delivery into different stages. At the very beginning,
> >>> our goal is to trigger learner task assignment only on `new` hosts,
> >>> which we shall identify by leveraging the leader's knowledge of the
> >>> previous rebalance round. After stage one, our goal is to have a smooth
> >>> scaling-up experience, but the task balance problem is somewhat
> >>> orthogonal. The load balance problem is a much broader topic than auto
> >>> scaling; I figure it is worth discussing within this KIP's context since
> >>> it's a natural next step, but it wouldn't be the main topic.
> >>> Learner task or auto scaling support should be treated as `a helpful
> >>> mechanism to reach load balance`, but not `an algorithm defining load
> >>> balance`. It would be great if you could share some insights on stream
> >>> task balance, which would eventually help us break out of KIP-429's
> >>> scope and even define a separate KIP focusing on task weight and
> >>> assignment logic improvements.
> >>>
> >>> Also, thank you for making improvements to the KIP content and
> >>> organization!
> >>>
> >>> Best,
> >>> Boyang
> >>> ________________________________
> >>> From: Guozhang Wang <wangg...@gmail.com>
> >>> Sent: Saturday, March 2, 2019 6:00 AM
> >>> To: dev
> >>> Subject: Re: [DISCUSS] KIP-429 : Smooth Auto-Scaling for Kafka Streams
> >>>
> >>> Hello Boyang,
> >>>
> >>> I've just made a quick pass on the KIP and here are some thoughts.
> >>>
> >>> Meta:
> >>>
> >>> 1. I'm still not sure if it's worthwhile to add a new type of "learner
> >>> task" in addition to "standby task": if the only difference is that for
> >>> the latter we would consider workload balance while for the former we
> >>> would not, I think we can just adjust the logic of StickyTaskAssignor a
> >>> bit to break that difference. Adding a new type of task would add a lot
> >>> of code complexity, so if we can still piggy-back the logic on a
> >>> standby task I would prefer to do so.
> >>>
> >>> 2. One thing that's still not clear from the KIP wiki itself is which
> >>> layer the logic would be implemented at. Although for most KIPs we
> >>> would not require internal implementation details but only public-facing
> >>> API updates, for a KIP like this I think it still requires fleshing out
> >>> details of the implementation design. More specifically: today Streams
> >>> embeds a full-fledged Consumer client, which hard-codes a
> >>> ConsumerCoordinator inside; Streams then injects a
> >>> StreamsPartitionAssignor into its pluggable PartitionAssignor interface,
> >>> and inside the StreamsPartitionAssignor we also have a TaskAssignor
> >>> interface whose default implementation is StickyTaskAssignor. Streams
> >>> partition assignor logic today sits in the latter two classes. Hence the
> >>> hierarchy today is:
> >>>
> >>> KafkaConsumer -> ConsumerCoordinator -> StreamsPartitionAssignor ->
> >>> StickyTaskAssignor.
> >>>
> >>> We need to think about where the proposed implementation would take
> >>> place, and personally I think it is not the best option to inject all of
> >>> it into the StreamsPartitionAssignor / StickyTaskAssignor, since the
> >>> logic of "triggering another rebalance" etc. would require some
> >>> coordinator logic which is hard to mimic at the PartitionAssignor level.
> >>> On the other hand, since we are embedding a KafkaConsumer client as a
> >>> whole, we cannot just replace ConsumerCoordinator with a specialized
> >>> StreamsCoordinator like Connect does in KIP-415. So I'd like to maybe
> >>> split the current proposal into a consumer layer and a streams-assignor
> >>> layer, like we did in KIP-98/KIP-129. And then the key thing to consider
> >>> is how to draw the boundary so that the modifications we push to
> >>> ConsumerCoordinator would be beneficial universally for any consumers,
> >>> while keeping the Streams-specific logic at the assignor level.
> >>>
> >>> 3. Depending on which design direction we choose, our migration plan
> >>> would also be quite different. For example, if we stay with
> >>> ConsumerCoordinator, whose protocol type is still "consumer", and we can
> >>> manage to make all changes agnostic to brokers as well as to older
> >>> consumer versions, then our migration plan could be much easier.
> >>>
> >>> 4. I think one major issue related to this KIP is that today, in the
> >>> StickyPartitionAssignor, we always try to honor stickiness over workload
> >>> balance, and hence a "learner task" is needed to break this priority,
> >>> but I'm wondering if we can have a better solution within the sticky
> >>> task assignor that accommodates this?
> >>>
> >>> Minor:
> >>>
> >>> 1. The idea of two rebalances has also been discussed in
> >>> https://issues.apache.org/jira/browse/KAFKA-6145, so we should add the
> >>> reference on the wiki page as well.
> >>> 2. Could you also add a section describing how the subscription /
> >>> assignment metadata will be re-formatted? Without this information it is
> >>> hard to get to the bottom of your idea. For example, in the "Leader
> >>> Transfer Before Scaling" section, I'm not sure why "S2 doesn't know S4 is
> >>> new member" and hence would blindly obey stickiness over the workload
> >>> balance requirement.
> >>>
> >>> Guozhang
> >>>
> >>>
> >>> On Thu, Feb 28, 2019 at 11:05 AM Boyang Chen <bche...@outlook.com> wrote:
> >>>
> >>>> Hey community friends,
> >>>>
> >>>> I'd like to invite you to have a look at the proposal to add
> >>>> incremental rebalancing to Kafka Streams, a.k.a. auto-scaling support:
> >>>>
> >>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-429%3A+Smooth+Auto-Scaling+for+Kafka+Streams
> >>>>
> >>>> Special thanks to Guozhang for giving great guidance and important
> >>>> feedback while making this KIP!
> >>>>
> >>>> Best,
> >>>> Boyang
> >>>>
> >>>
> >>>
> >>> --
> >>> -- Guozhang
> >>>
> >>
> >>
> >> --
> >> -- Guozhang
> >>
> >
> >
>
>

-- 
-- Guozhang
