Thanks for the KIP and for starting the VOTE, Guozhang. I am overall +1.

One follow-up thought: the KIP does not discuss in detail how `poll()`
will behave after the change. It might actually be important to ensure
that `poll()` becomes non-blocking, so that an application can keep
processing data from non-revoked partitions while a rebalance is
happening in the background.
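
To make the concern concrete, here is a minimal sketch of the behavior in
question. It is a toy model in Python, not the Kafka client: `ToyConsumer`,
its fields, and the partition names are all made up for illustration. The
only point is that `poll()` returns right away with records from partitions
that are not being revoked, even while a rebalance is in flight.

```python
# Toy model only: none of these names exist in the Kafka client.
from dataclasses import dataclass, field


@dataclass
class ToyConsumer:
    owned: set                      # partitions currently owned
    buffered: dict                  # partition -> list of already fetched records
    rebalancing: bool = False       # True while a rebalance is in flight
    revoked: set = field(default_factory=set)  # partitions given up in that rebalance

    def poll(self):
        """Return buffered records without waiting for the rebalance to finish."""
        # During a rebalance only the non-revoked partitions are safe to process.
        readable = self.owned - self.revoked if self.rebalancing else set(self.owned)
        out = {}
        for partition in sorted(readable):
            records = self.buffered.get(partition, [])
            if records:
                out[partition] = list(records)  # copy before draining the buffer
                records.clear()
        return out


consumer = ToyConsumer(
    owned={"t-0", "t-1", "t-2"},
    buffered={"t-0": ["a"], "t-1": ["b", "c"], "t-2": ["d"]},
)
# A rebalance starts and only t-2 has to be given up.
consumer.rebalancing = True
consumer.revoked = {"t-2"}
result = consumer.poll()
```

In this model the records of `t-2` stay in the buffer untouched, while `t-0`
and `t-1` keep flowing to the application.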

Thoughts?


-Matthias


On 5/10/19 1:10 AM, Guozhang Wang wrote:
> Hello Matthias,
> 
> I'm actually proposing to change this behavior holistically inside
> ConsumerCoordinator. In other words, I'm trying to piggy-back the
> behavioral fix of KAFKA-4600 on this KIP. My motivation for the
> piggy-backing is that, with incremental rebalancing, only some partitions
> would be affected, since we no longer revoke every partition.
> 
> 
> Guozhang
> 
> 
> On Thu, May 9, 2019 at 6:21 AM Matthias J. Sax <matth...@confluent.io>
> wrote:
> 
>> Thanks Guozhang!
>>
>> The simplified upgrade path is great!
>>
>>
>> Just a clarification question about the "Rebalance Callback Error
>> Handling" -- does this change affect the `ConsumerCoordinator` only if
>> incremental rebalancing is used? Or does the behavior also change for the
>> eager rebalancing case?
>>
>>
>> -Matthias
>>
>>
>> On 5/9/19 3:37 AM, Guozhang Wang wrote:
>>> Hello all,
>>>
>>> Thanks to everyone who has shared feedback on this KIP! If there are
>>> no further comments I'll start the voting thread by end of tomorrow.
>>>
>>>
>>> Guozhang.
>>>
>>> On Wed, May 8, 2019 at 6:36 PM Guozhang Wang <wangg...@gmail.com> wrote:
>>>
>>>> Hello Boyang,
>>>>
>>>> On Wed, May 1, 2019 at 4:51 PM Boyang Chen <bche...@outlook.com> wrote:
>>>>
>>>>> Hey Guozhang,
>>>>>
>>>>> thank you for the great write up. Overall the motivation and changes
>>>>> LGTM, just some minor comments:
>>>>>
>>>>>
>>>>>   1.  In "Consumer Coordinator Algorithm", we could reorder alphabet
>>>>> points for 3d~3f from ["ready-to-migrate-partitions",
>>>>> "unknown-but-owned-partitions",  "maybe-revoking-partitions"] to
>>>>> ["maybe-revoking-partitions", "ready-to-migrate-partitions",
>>>>> "unknown-but-owned-partitions"] in order to be consistent with 3c1~3.
>>>>>
>>>>
>>>> Ack. Updated.
>>>>
>>>>
>>>>>   2.  In "Consumer Coordinator Algorithm", 1c suggests revoking all
>>>>> partitions upon heartbeat/commit failure. What's the gain here? Do we
>>>>> want to keep all partitions running at this moment, to be optimistic
>>>>> for the case when no partitions get reassigned?
>>>>>
>>>>
>>>> That's a good catch. When REBALANCE_IN_PROGRESS is received, we can just
>>>> re-join the group with all the currently owned partitions encoded.
>>>> Updated.
>>>>
>>>>
>>>>>   3.  In "Recommended Upgrade Procedure", remove extra 'those': " The
>>>>> 'sticky' assignor works even those there are "
>>>>>
>>>>
>>>> Ack, should be `even when`.
>>>>
>>>>
>>>>>   4.  Put the two "looking into the future" items into a separate
>>>>> category from the migration section. It seems inconsistent for readers
>>>>> to see this before we have finished the discussion of everything else.
>>>>>
>>>>
>>>> Ack.
>>>>
>>>>
>>>>>   5.  Have we discussed the concern on the serialization? Could the new
>>>>> metadata we are adding grow larger than the message size cap?
>>>>>
>>>>
>>>> We're completing https://issues.apache.org/jira/browse/KAFKA-7149 which
>>>> should largely reduce the message size (will update the wiki accordingly
>>>> as well).
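
On point 2 above (re-joining on REBALANCE_IN_PROGRESS instead of revoking
everything), a minimal sketch of the member-side decision could look like
the following. This is a toy Python model, not Kafka code: the function
name, the dictionary layout, and the `orders` topic are hypothetical.

```python
# Toy model: the function and field names are hypothetical, not Kafka APIs.
def handle_heartbeat_error(error_code, subscribed_topics, owned_partitions):
    """Decide what a member does when a heartbeat or commit fails."""
    if error_code == "REBALANCE_IN_PROGRESS":
        # Revoke nothing up front; just advertise what we own when re-joining.
        return {
            "rejoin": True,
            "revoke_now": [],
            "subscription": {
                "topics": sorted(subscribed_topics),
                "owned_partitions": sorted(owned_partitions),
            },
        }
    # Other error codes are out of scope for this sketch.
    return {"rejoin": False, "revoke_now": [], "subscription": None}


decision = handle_heartbeat_error(
    "REBALANCE_IN_PROGRESS", {"orders"}, {("orders", 1), ("orders", 0)}
)
```

The member keeps processing its partitions in the meantime; whether any of
them must be given up is only known once the new assignment arrives.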
>>>>
>>>>
>>>>>
>>>>> Boyang
>>>>>
>>>>> ________________________________
>>>>> From: Guozhang Wang <wangg...@gmail.com>
>>>>> Sent: Monday, April 15, 2019 9:20 AM
>>>>> To: dev
>>>>> Subject: Re: [DISCUSS] KIP-429 : Smooth Auto-Scaling for Kafka Streams
>>>>>
>>>>> Hello Jason,
>>>>>
>>>>> I agree with you that for range / round-robin it makes less sense to be
>>>>> compatible with cooperative rebalance protocol.
>>>>>
>>>>> As for StickyAssignor, however, I think it would still be possible to
>>>>> make the current implementation compatible with cooperative rebalance.
>>>>> So after pondering the different options at hand I'm now proposing the
>>>>> approach listed in the upgrade section:
>>>>>
>>>>>
>>>>>
>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-429%3A+Kafka+Consumer+Incremental+Rebalance+Protocol#KIP-429:KafkaConsumerIncrementalRebalanceProtocol-CompatibilityandUpgradePath
>>>>>
>>>>> The idea is to let assignors specify which protocols they work with,
>>>>> each associated with a different name; the upgrade path would then
>>>>> involve a "compatible" protocol which actually still uses eager behavior
>>>>> while encoding two assignors if possible. In the "Rejected" section (just
>>>>> to clarify: I'm not finalizing it as rejected, just putting it there for
>>>>> now, and if we like this one instead we can always switch them) I listed
>>>>> the other approach we once discussed, and argue that its con of
>>>>> duplicated classes seems to outweigh the pro of saving the
>>>>> "rebalance.protocol" config.
>>>>>
>>>>> Let me know WDYT.
>>>>>
>>>>> Guozhang
>>>>>
>>>>> On Fri, Apr 12, 2019 at 6:08 PM Jason Gustafson <ja...@confluent.io>
>>>>> wrote:
>>>>>
>>>>>> Hi Guozhang,
>>>>>>
>>>>>> Responses below:
>>>>>>
>>>>>> 2. The interface's default implementation will just be
>>>>>>> `onPartitionRevoked`, so for user's instantiation if they do not make
>>>>> any
>>>>>>> code changes they should be able to recompile the code and continue.
>>>>>>
>>>>>>
>>>>>> Ack, makes sense.
>>>>>>
>>>>>> 4. Hmm.. not sure if it will work. The main issue is that the
>>>>>>> consumer-coordinator behavior (whether to revoke all or none at
>>>>>>> onRebalancePrepare) is independent of the selected protocol's
>> assignor
>>>>>>> (eager or cooperative), so even if the assignor is selected to be the
>>>>>>> old-versioned one, we will still not revoke at the
>>>>> consumer-coordinator
>>>>>>> layer and hence has the same risk of migrating still-owned
>> partitions,
>>>>>>> right?
>>>>>>
>>>>>>
>>>>>> Yeah, basically we would have to push the eager/cooperative logic into
>>>>>> the PartitionAssignor itself and make the consumer aware of the rebalance
>>>>>> protocol it is compatible with. As long as an eager protocol _could_ be
>>>>>> selected, the consumer would have to be pessimistic and do eager
>>>>>> revocation. But if all the assignors configured in the consumer support
>>>>>> cooperative reassignment, then either 1) a cooperative protocol will be
>>>>>> selected and cooperative revocation can be safely used, or 2) if the rest
>>>>>> of the group does not support it, then the consumer will simply fail.
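
The rule described in the paragraph above can be sketched as follows. This
is a toy Python model under stated assumptions: each configured assignor is
tagged as either eager or cooperative, and the assignor names (`range`,
`coop-sticky`) and both function names are hypothetical, not part of the KIP
or of the consumer API.

```python
EAGER, COOPERATIVE = "eager", "cooperative"


def revocation_mode(configured):
    """configured: dict mapping assignor name -> protocol it implements."""
    if not configured:
        raise ValueError("at least one assignor must be configured")
    # Cooperative revocation is only safe if no eager assignor could be chosen.
    if all(protocol == COOPERATIVE for protocol in configured.values()):
        return COOPERATIVE
    return EAGER


def select_assignor(configured, group_supported):
    """Pick the first configured assignor that the whole group supports."""
    for name in configured:
        if name in group_supported:
            return name
    # Case 2) above: nothing in common with the rest of the group.
    raise RuntimeError("no configured assignor is supported by the whole group")


mixed = {"range": EAGER, "coop-sticky": COOPERATIVE}
coop_only = {"coop-sticky": COOPERATIVE}
```

With `mixed`, the consumer has to stay pessimistic and revoke eagerly; with
`coop_only`, it either gets a cooperative protocol or fails to join.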
>>>>>>
>>>>>> Another point which you raised offline and I will repeat here is that
>>>>>> this proposal's benefit is mostly limited to sticky assignment logic.
>>>>>> Arguably the range assignor may have some incidental stickiness,
>>>>>> particularly if the group is rebalancing for a newly created or deleted
>>>>>> topic. For other cases, the proposal is mostly additional overhead since
>>>>>> it takes an additional rebalance and many of the partitions will move.
>>>>>> Perhaps it doesn't make as much sense to use the cooperative protocol for
>>>>>> strategies like range and round-robin. That kind of argues in favor of
>>>>>> pushing some of the control into the assignor itself. Maybe we would not
>>>>>> bother creating CooperativeRange as I suggested above, but it would make
>>>>>> sense to create a cooperative version of the sticky assignment strategy.
>>>>>> I thought we might have to create a new sticky assignor anyway because I
>>>>>> can't see how we would get compatible behavior mixing with the old
>>>>>> version.
>>>>>>
>>>>>> Thanks,
>>>>>> Jason
>>>>>>
>>>>>>
>>>>>> On Thu, Apr 11, 2019 at 5:53 PM Guozhang Wang <wangg...@gmail.com>
>>>>> wrote:
>>>>>>
>>>>>>> Hello Matthias:
>>>>>>>
>>>>>>> Thanks for your review.
>>>>>>>
>>>>>>> The background section uses the Streams assignor as well as the
>>>>>>> consumer's own sticky assignor as examples illustrating the situation,
>>>>>>> but this KIP is for the consumer coordinator itself, and the rest of the
>>>>>>> text does not talk about Streams any more. If you feel it's a bit
>>>>>>> distracting I can remove those examples.
>>>>>>>
>>>>>>> 10). While working on the PR I realized that the revoked partitions
>>>>>>> on the assignment are not needed (this is being discussed on the PR
>>>>>>> itself:
>>>>>>> https://github.com/apache/kafka/pull/6528#issuecomment-480009890).
>>>>>>>
>>>>>>> 20). 1.a. Good question, I've updated the wiki to let the consumer
>>>>>>> clean up its assignment and re-join, without letting the assignor make
>>>>>>> any proactive changes. The idea is to keep the logic simpler and avoid
>>>>>>> any "split brain" stuff.
>>>>>>>
>>>>>>> 20). 2.b. No, we do not need that, since the owned-partitions will
>>>>>>> already be part of the Subscription passed in to assign().
>>>>>>>
>>>>>>> 30). As Boyang mentioned, there are still some drawbacks that cannot be
>>>>>>> addressed by a rebalance delay, hence we still voted for KIP-345 (more
>>>>>>> details can be found on the discussion thread of KIP-345 itself). One
>>>>>>> example is that as the instance resumes, its member id will be empty, so
>>>>>>> we are still relying on the assignor to give it the assignment from the
>>>>>>> old member-id while keeping all other members' assignments unchanged.
>>>>>>>
>>>>>>> 40). Incomplete sentence, I've updated it.
>>>>>>>
>>>>>>> 50). Here's my idea: suppose we augment the join group schema with
>>>>>>> `protocol version` in 2.3; then, with both brokers and clients being on
>>>>>>> version 2.3+, on the first rolling bounce where the subscription and
>>>>>>> assignment schema and / or user metadata has changed, this protocol
>>>>>>> version will be bumped. On the broker side, when receiving all members'
>>>>>>> join-group requests, it will choose the one that has the highest protocol
>>>>>>> version (it also assumes a higher versioned protocol is always backward
>>>>>>> compatible, i.e. the coordinator can recognize lower versioned protocols
>>>>>>> as well) and select it as the leader. Then the leader can decide, based on
>>>>>>> its received and deserialized subscription information, how to assign
>>>>>>> partitions and how to encode the assignment accordingly so that everyone
>>>>>>> can understand it. With this, in Streams for example, no version probing
>>>>>>> would be needed since we are guaranteed the leader knows everyone's
>>>>>>> version -- again assuming that a higher versioned protocol is always
>>>>>>> backward compatible -- and hence can successfully do the assignment in
>>>>>>> that round.
>>>>>>>
>>>>>>> 60). My bad, this section was not updated as the design evolved;
>>>>>>> I've updated it.
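
The leader-selection idea in point 50) above can be sketched like this. It
is a toy Python model, not broker code: the function names and member ids
are hypothetical. Encoding the assignment at the lowest version in the
group is an assumption made here for illustration; the email only says the
leader encodes it so that everyone can understand it.

```python
# Toy model: member ids and function names are hypothetical.
def pick_leader(member_versions):
    """member_versions: dict mapping member id -> protocol version it sent."""
    # max() keeps the first maximum, so iterating in sorted order breaks
    # version ties by the smallest member id and keeps the result deterministic.
    return max(sorted(member_versions), key=lambda m: member_versions[m])


def encoding_version(member_versions):
    """Assumed rule: encode at the lowest version so every member can decode."""
    return min(member_versions.values())


group = {"member-a": 3, "member-b": 5, "member-c": 4}
leader = pick_leader(group)
version = encoding_version(group)
```

Because the leader has the highest version and higher versions are assumed
backward compatible, it can decode every subscription in the group in a
single round.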
>>>>>>>
>>>>>>>
>>>>>>> On Tue, Apr 9, 2019 at 7:22 PM Boyang Chen <bche...@outlook.com>
>>>>> wrote:
>>>>>>>
>>>>>>>>
>>>>>>>> Thanks for the review Matthias! My 2 cents on the rebalance delay is
>>>>>>>> that it is a rather fixed trade-off between task availability and
>>>>>>>> resource shuffling. If we eventually trigger a rebalance after a rolling
>>>>>>>> bounce, certain consumer setups are still faced with global shuffles,
>>>>>>>> for example a member.id-ranking based round robin strategy, as rejoining
>>>>>>>> dynamic members will be assigned new member.ids, which reorders the
>>>>>>>> assignment. So I think the primary goal of incremental rebalancing is
>>>>>>>> still improving cluster availability during a rebalance, because it
>>>>>>>> does not revoke any partition during this process. Also, the perk is a
>>>>>>>> minimal configuration requirement :)
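
The global-shuffle effect described above is easy to reproduce in a toy
model. The sketch below is plain Python, not a Kafka assignor: the member
ids `a`, `b`, `c`, `z` and the six partitions are made up. It ranks members
by id, deals partitions round robin, and then lets one instance rejoin
under a new id.

```python
# Toy model of a member.id-ranked round robin strategy; all names are made up.
def round_robin(member_ids, partitions):
    ranked = sorted(member_ids)  # rank members by their id
    assignment = {m: [] for m in ranked}
    for i, p in enumerate(partitions):
        assignment[ranked[i % len(ranked)]].append(p)
    return assignment


def owner_of(assignment):
    """Invert member -> partitions into partition -> member."""
    return {p: m for m, ps in assignment.items() for p in ps}


partitions = list(range(6))
before = owner_of(round_robin(["a", "b", "c"], partitions))
# The instance behind "a" bounces and rejoins with the new member id "z".
after = owner_of(round_robin(["z", "b", "c"], partitions))
same_instance = {"z": "a"}  # map the new id back to the physical instance
moved = [
    p for p in partitions
    if same_instance.get(after[p], after[p]) != before[p]
]
```

In this example every one of the six partitions ends up on a different
instance, even though the set of instances did not change.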
>>>>>>>>
>>>>>>>>
>>>>>>>> Best,
>>>>>>>>
>>>>>>>> Boyang
>>>>>>>>
>>>>>>>> ________________________________
>>>>>>>> From: Matthias J. Sax <matth...@confluent.io>
>>>>>>>> Sent: Tuesday, April 9, 2019 7:47 AM
>>>>>>>> To: dev
>>>>>>>> Subject: Re: [DISCUSS] KIP-429 : Smooth Auto-Scaling for Kafka
>>>>> Streams
>>>>>>>>
>>>>>>>> Thanks for the KIP, Boyang and Guozhang!
>>>>>>>>
>>>>>>>>
>>>>>>>> I made an initial pass and have some questions/comments. One high-level
>>>>>>>> comment: it seems that the KIP "mixes" the plain consumer and Kafka
>>>>>>>> Streams use cases a little bit (at least in the presentation). It might
>>>>>>>> be helpful to separate both cases clearly, or maybe limit the scope to
>>>>>>>> the plain consumer only.
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>> 10) For `PartitionAssignor.Assignment`: It seems we need a new
>>>>> method
>>>>>>>> `List<TopicPartitions> revokedPartitions()` ?
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>> 20) In Section "Consumer Coordinator Algorithm"
>>>>>>>>
>>>>>>>>     Bullet point "1a)": If the subscription changes and a topic is
>>>>>>>> removed from the subscription, why do we not revoke the partitions?
>>>>>>>>
>>>>>>>>     Bullet point "1a)": What happens if a topic is deleted (or a
>>>>>>>> partition is removed/deleted from a topic)? Should we call the new
>>>>>>>> `onPartitionsEmigrated()` callback for this case?
>>>>>>>>
>>>>>>>>     Bullet point "2b)" Should we update the `PartitionAssignor`
>>>>>>>> interface to pass in the "old assignment" as third parameter into
>>>>>>>> `assign()`?
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>> 30) Rebalance delay (as used in KIP-415): Could a rebalance delay
>>>>>>>> subsume KIP-345? Configuring static members is rather complicated,
>>>>> and
>>>>>> I
>>>>>>>> am wondering if a rebalance delay would be sufficient?
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>> 40) Quote: "otherwise the we would fall into the case 3.b) forever."
>>>>>>>>
>>>>>>>> What is "case 3.b" ?
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>> 50) Section "Looking into the Future"
>>>>>>>>
>>>>>>>> Nit: the new "ProtocolVersion" field is missing in the first line
>>>>>>>> describing "JoinGroupRequest"
>>>>>>>>
>>>>>>>>> This can also help saving "version probing" cost on Streams as
>>>>> well.
>>>>>>>>
>>>>>>>> How does this relate to Kafka Streams "version probing"
>>>>> implementation?
>>>>>>>> How can we exploit the new `ProtocolVersion` in Streams to improve
>>>>>>>> "version probing" ? I have a rough idea, but would like to hear more
>>>>>>>> details.
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>> 60) Section "Recommended Upgrade Procedure"
>>>>>>>>
>>>>>>>>> Set the `stream.rebalancing.mode` to `upgrading`, which will force
>>>>>> the
>>>>>>>> stream application to stay with protocol type "consumer".
>>>>>>>>
>>>>>>>> This config is not discussed in the KIP and appears in this section
>>>>>>>> without context. Can you elaborate about it?
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>> -Matthias
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>> On 3/29/19 6:20 PM, Guozhang Wang wrote:
>>>>>>>>> Bumping this discussion thread. I've added a few new drawings for
>>>>>>>>> better illustration, and would really appreciate your feedback.
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> Guozhang
>>>>>>>>>
>>>>>>>>> On Wed, Mar 20, 2019 at 6:17 PM Guozhang Wang <wangg...@gmail.com
>>>>>>
>>>>>>>> wrote:
>>>>>>>>>
>>>>>>>>>> Hello Boyang,
>>>>>>>>>>
>>>>>>>>>> I've made another thorough pass over this KIP and I'd like to split it
>>>>>>>>>> into two parts: the first part, covered in KIP-429, would touch on the
>>>>>>>>>> Consumer Coordinator only, to have the incremental rebalance protocol in
>>>>>>>>>> place. The second part (for now I've reserved KIP number 444 for it)
>>>>>>>>>> would contain all the changes on StreamsPartitionAssignor to allow
>>>>>>>>>> warming up new members.
>>>>>>>>>>
>>>>>>>>>> I think the first part, a.k.a. the current updated KIP-429, is ready
>>>>>>>>>> for review and discussion again. Would love to hear people's feedback
>>>>>>>>>> and ideas.
>>>>>>>>>>
>>>>>>>>>> Guozhang
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> On Mon, Mar 4, 2019 at 10:09 AM Boyang Chen <bche...@outlook.com
>>>>>>
>>>>>>>> wrote:
>>>>>>>>>>
>>>>>>>>>>> Thanks Guozhang for the great questions. Answers are inlined:
>>>>>>>>>>>
>>>>>>>>>>> 1. I'm still not sure if it's worthwhile to add a new type of
>>>>>>> "learner
>>>>>>>>>>> task" in addition to "standby task": if the only difference is
>>>>> that
>>>>>>> for
>>>>>>>>>>> the
>>>>>>>>>>> latter, we would consider workload balance while for the former
>>>>> we
>>>>>>>> would
>>>>>>>>>>> not, I think we can just adjust the logic of StickyTaskAssignor
>>>>> a
>>>>>> bit
>>>>>>>> to
>>>>>>>>>>> break that difference. Adding a new type of task would be
>>>>> adding a
>>>>>>> lot
>>>>>>>> of
>>>>>>>>>>> code complexity, so if we can still piggy-back the logic on a
>>>>>>>> standby-task
>>>>>>>>>>> I would prefer to do so.
>>>>>>>>>>> In the proposal we stated that we are not adding a new type of task
>>>>>>>>>>> implementation. The learner task shall share the same implementation
>>>>>>>>>>> as the normal standby task, only that we shall tag the standby task as
>>>>>>>>>>> a learner and prioritize the learner tasks' replay effort.
>>>>>>>>>>> 2. One thing that's still not clear from the KIP wiki itself is
>>>>>> which
>>>>>>>>>>> layer
>>>>>>>>>>> would the logic be implemented at. Although for most KIPs we
>>>>> would
>>>>>>> not
>>>>>>>>>>> require internal implementation details but only public facing
>>>>> API
>>>>>>>>>>> updates,
>>>>>>>>>>> for a KIP like this I think it still requires to flesh out
>>>>> details
>>>>>> on
>>>>>>>> the
>>>>>>>>>>> implementation design. More specifically: today Streams embeds a
>>>>>>>>>>> full-fledged Consumer client, which hard-codes a ConsumerCoordinator
>>>>>>>>>>> inside; Streams then injects a StreamsPartitionAssignor into its
>>>>>>>>>>> pluggable PartitionAssignor interface, and inside the
>>>>>>>>>>> StreamsPartitionAssignor we also have a TaskAssignor interface whose
>>>>>>>>>>> default implementation is StickyTaskAssignor. Streams partition
>>>>>>>>>>> assignor logic today sits in the latter two classes. Hence the
>>>>>>>>>>> hierarchy today is:
>>>>>>>>>>>
>>>>>>>>>>> KafkaConsumer -> ConsumerCoordinator -> StreamsPartitionAssignor ->
>>>>>>>>>>> StickyTaskAssignor.
>>>>>>>>>>>
>>>>>>>>>>> We need to think about where the proposed implementation would
>>>>> take
>>>>>>>> place
>>>>>>>>>>> at, and personally I think it is not the best option to inject
>>>>> all
>>>>>> of
>>>>>>>> them
>>>>>>>>>>> into the StreamsPartitionAssignor / StickyTaskAssignor since the
>>>>>>> logic
>>>>>>>> of
>>>>>>>>>>> "triggering another rebalance" etc would require some
>>>>> coordinator
>>>>>>> logic
>>>>>>>>>>> which is hard to mimic at PartitionAssignor level. On the other
>>>>>> hand,
>>>>>>>>>>> since
>>>>>>>>>>> we are embedding a KafkaConsumer client as a whole we cannot
>>>>> just
>>>>>>>> replace
>>>>>>>>>>> ConsumerCoordinator with a specialized StreamsCoordinator like
>>>>>>> Connect
>>>>>>>>>>> does
>>>>>>>>>>> in KIP-415. So I'd like to maybe split the current proposal in
>>>>> both
>>>>>>>>>>> consumer layer and streams-assignor layer like we did in
>>>>>>>> KIP-98/KIP-129.
>>>>>>>>>>> And then the key thing to consider is how to cut off the
>>>>> boundary
>>>>>> so
>>>>>>>> that
>>>>>>>>>>> the modifications we push to ConsumerCoordinator would be
>>>>>> beneficial
>>>>>>>>>>> universally for any consumers, while keep the Streams-specific
>>>>>> logic
>>>>>>> at
>>>>>>>>>>> the
>>>>>>>>>>> assignor level.
>>>>>>>>>>> Yes, that's also my ideal plan. The details of the implementation are
>>>>>>>>>>> depicted in this doc ("KStream Smooth Auto-scaling Implementation
>>>>>>>>>>> Plan"):
>>>>>>>>>>> https://docs.google.com/document/d/1me2a5wvxAZT1QE6HkwyDl7C2TiBQlKN3Dpw_I1ro91U/edit#heading=h.qix74qdmekae
>>>>>>>>>>> where I have explained the reasoning on why we want to push a global
>>>>>>>>>>> change of replacing ConsumerCoordinator with StreamCoordinator. The
>>>>>>>>>>> motivation is that KIP space is usually used for public & algorithm
>>>>>>>>>>> level changes, not for internal implementation details.
>>>>>>>>>>>
>>>>>>>>>>> 3. Depending on which design direction we choose, our migration
>>>>>> plan
>>>>>>>> would
>>>>>>>>>>> also be quite different. For example, if we stay with
>>>>>>>> ConsumerCoordinator
>>>>>>>>>>> whose protocol type is "consumer" still, and we can manage to
>>>>> make
>>>>>>> all
>>>>>>>>>>> changes agnostic to brokers as well as to old versioned
>>>>> consumers,
>>>>>>> then
>>>>>>>>>>> our
>>>>>>>>>>> migration plan could be much easier.
>>>>>>>>>>> Yes, the upgrade plan was designed for the new StreamCoordinator
>>>>>>>>>>> approach, which means we shall define a new protocol type. For existing
>>>>>>>>>>> applications we could only keep the same `consumer` protocol type,
>>>>>>>>>>> because the current broker only allows a change of protocol type when
>>>>>>>>>>> the consumer group is empty. It is of course user-unfriendly to force a
>>>>>>>>>>> wipe-out of the entire application, and I don't think maintaining the
>>>>>>>>>>> old protocol type would greatly impact ongoing services using the new
>>>>>>>>>>> stream coordinator. WDYT?
>>>>>>>>>>>
>>>>>>>>>>> 4. I think one major issue related to this KIP is that today, in
>>>>>> the
>>>>>>>>>>> StickyPartitionAssignor, we always try to honor stickiness over
>>>>>>>> workload
>>>>>>>>>>> balance, and hence "learner task" is needed to break this
>>>>> priority,
>>>>>>> but
>>>>>>>>>>> I'm
>>>>>>>>>>> wondering if we can have a better solution within sticky task
>>>>>>> assignor
>>>>>>>>>>> that
>>>>>>>>>>> accommodate this?
>>>>>>>>>>> Great question! That's what I explained in the proposal, which is that
>>>>>>>>>>> we should break down our delivery into different stages. At the very
>>>>>>>>>>> beginning, our goal is to trigger learner task assignment only on `new`
>>>>>>>>>>> hosts, which we shall figure out by leveraging the leader's knowledge of
>>>>>>>>>>> the previous round of rebalance. After stage one, our goal is to have a
>>>>>>>>>>> smooth scaling up experience, but the task balance problem is kind of
>>>>>>>>>>> orthogonal. The load balance problem is a much broader topic than auto
>>>>>>>>>>> scaling, which I figure is worth discussing within this KIP's context
>>>>>>>>>>> since it's a natural next step, but it wouldn't be the main topic.
>>>>>>>>>>> Learner task or auto scaling support should be treated as `a helpful
>>>>>>>>>>> mechanism to reach load balance`, but not `an algorithm defining load
>>>>>>>>>>> balance`. It would be great if you could share some insights on stream
>>>>>>>>>>> task balance, which would eventually help us break out of KIP-429's
>>>>>>>>>>> scope and even define a separate KIP focused on task weight &
>>>>>>>>>>> assignment logic improvement.
>>>>>>>>>>>
>>>>>>>>>>> Also thank you for making improvement on the KIP context and
>>>>>>>> organization!
>>>>>>>>>>>
>>>>>>>>>>> Best,
>>>>>>>>>>> Boyang
>>>>>>>>>>> ________________________________
>>>>>>>>>>> From: Guozhang Wang <wangg...@gmail.com>
>>>>>>>>>>> Sent: Saturday, March 2, 2019 6:00 AM
>>>>>>>>>>> To: dev
>>>>>>>>>>> Subject: Re: [DISCUSS] KIP-429 : Smooth Auto-Scaling for Kafka
>>>>>>> Streams
>>>>>>>>>>>
>>>>>>>>>>> Hello Boyang,
>>>>>>>>>>>
>>>>>>>>>>> I've just made a quick pass on the KIP and here are some
>>>>> thoughts.
>>>>>>>>>>>
>>>>>>>>>>> Meta:
>>>>>>>>>>>
>>>>>>>>>>> 1. I'm still not sure if it's worthwhile to add a new type of
>>>>>>> "learner
>>>>>>>>>>> task" in addition to "standby task": if the only difference is
>>>>> that
>>>>>>> for
>>>>>>>>>>> the
>>>>>>>>>>> latter, we would consider workload balance while for the former
>>>>> we
>>>>>>>> would
>>>>>>>>>>> not, I think we can just adjust the logic of StickyTaskAssignor
>>>>> a
>>>>>> bit
>>>>>>>> to
>>>>>>>>>>> break that difference. Adding a new type of task would be
>>>>> adding a
>>>>>>> lot
>>>>>>>> of
>>>>>>>>>>> code complexity, so if we can still piggy-back the logic on a
>>>>>>>> standby-task
>>>>>>>>>>> I would prefer to do so.
>>>>>>>>>>>
>>>>>>>>>>> 2. One thing that's still not clear from the KIP wiki itself is
>>>>>> which
>>>>>>>>>>> layer
>>>>>>>>>>> would the logic be implemented at. Although for most KIPs we
>>>>> would
>>>>>>> not
>>>>>>>>>>> require internal implementation details but only public facing
>>>>> API
>>>>>>>>>>> updates,
>>>>>>>>>>> for a KIP like this I think it still requires to flesh out
>>>>> details
>>>>>> on
>>>>>>>> the
>>>>>>>>>>> implementation design. More specifically: today Streams embeds a
>>>>>>>>>>> full-fledged Consumer client, which hard-codes a ConsumerCoordinator
>>>>>>>>>>> inside; Streams then injects a StreamsPartitionAssignor into its
>>>>>>>>>>> pluggable PartitionAssignor interface, and inside the
>>>>>>>>>>> StreamsPartitionAssignor we also have a TaskAssignor interface whose
>>>>>>>>>>> default implementation is StickyTaskAssignor. Streams partition
>>>>>>>>>>> assignor logic today sits in the latter two classes. Hence the
>>>>>>>>>>> hierarchy today is:
>>>>>>>>>>>
>>>>>>>>>>> KafkaConsumer -> ConsumerCoordinator -> StreamsPartitionAssignor ->
>>>>>>>>>>> StickyTaskAssignor.
>>>>>>>>>>>
>>>>>>>>>>> We need to think about where the proposed implementation would
>>>>> take
>>>>>>>> place
>>>>>>>>>>> at, and personally I think it is not the best option to inject
>>>>> all
>>>>>> of
>>>>>>>> them
>>>>>>>>>>> into the StreamsPartitionAssignor / StickyTaskAssignor since the
>>>>>>> logic
>>>>>>>> of
>>>>>>>>>>> "triggering another rebalance" etc would require some
>>>>> coordinator
>>>>>>> logic
>>>>>>>>>>> which is hard to mimic at PartitionAssignor level. On the other
>>>>>> hand,
>>>>>>>>>>> since
>>>>>>>>>>> we are embedding a KafkaConsumer client as a whole we cannot
>>>>> just
>>>>>>>> replace
>>>>>>>>>>> ConsumerCoordinator with a specialized StreamsCoordinator like
>>>>>>> Connect
>>>>>>>>>>> does
>>>>>>>>>>> in KIP-415. So I'd like to maybe split the current proposal in
>>>>> both
>>>>>>>>>>> consumer layer and streams-assignor layer like we did in
>>>>>>>> KIP-98/KIP-129.
>>>>>>>>>>> And then the key thing to consider is how to cut off the
>>>>> boundary
>>>>>> so
>>>>>>>> that
>>>>>>>>>>> the modifications we push to ConsumerCoordinator would be
>>>>>> beneficial
>>>>>>>>>>> universally for any consumers, while keep the Streams-specific
>>>>>> logic
>>>>>>> at
>>>>>>>>>>> the
>>>>>>>>>>> assignor level.
>>>>>>>>>>>
>>>>>>>>>>> 3. Depending on which design direction we choose, our migration
>>>>>> plan
>>>>>>>> would
>>>>>>>>>>> also be quite different. For example, if we stay with
>>>>>>>> ConsumerCoordinator
>>>>>>>>>>> whose protocol type is "consumer" still, and we can manage to
>>>>> make
>>>>>>> all
>>>>>>>>>>> changes agnostic to brokers as well as to old versioned
>>>>> consumers,
>>>>>>> then
>>>>>>>>>>> our
>>>>>>>>>>> migration plan could be much easier.
>>>>>>>>>>>
>>>>>>>>>>> 4. I think one major issue related to this KIP is that today, in
>>>>>> the
>>>>>>>>>>> StickyPartitionAssignor, we always try to honor stickiness over
>>>>>>>> workload
>>>>>>>>>>> balance, and hence "learner task" is needed to break this
>>>>> priority,
>>>>>>> but
>>>>>>>>>>> I'm
>>>>>>>>>>> wondering if we can have a better solution within sticky task
>>>>>>> assignor
>>>>>>>>>>> that
>>>>>>>>>>> accommodate this?
>>>>>>>>>>>
>>>>>>>>>>> Minor:
>>>>>>>>>>>
>>>>>>>>>>> 1. The idea of two rebalances has also been discussed in
>>>>>>>>>>> https://issues.apache.org/jira/browse/KAFKA-6145. So we should add the
>>>>>>>>>>> reference on the wiki page as well.
>>>>>>>>>>> 2. Could you also add a section describing how the subscription /
>>>>>>>>>>> assignment metadata will be re-formatted? Without this information it
>>>>>>>>>>> is hard to get to the bottom of your idea. For example in the "Leader
>>>>>>>>>>> Transfer Before Scaling" section, I'm not sure why "S2 doesn't know S4
>>>>>>>>>>> is new member" and hence would blindly obey stickiness over the
>>>>>>>>>>> workload balance requirement.
>>>>>>>>>>>
>>>>>>>>>>> Guozhang
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> On Thu, Feb 28, 2019 at 11:05 AM Boyang Chen <
>>>>> bche...@outlook.com>
>>>>>>>> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> Hey community friends,
>>>>>>>>>>>>
>>>>>>>>>>>> I'm glad to invite you to have a look at the proposal to add
>>>>>>>>>>>> incremental rebalancing to Kafka Streams, a.k.a. auto-scaling support.
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>
>>>>>>>
>>>>>>
>>>>>
>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-429%3A+Smooth+Auto-Scaling+for+Kafka+Streams
>>>>>>>>>>>>
>>>>>>>>>>>> Special thanks to Guozhang for giving great guidance and important
>>>>>>>>>>>> feedback while making this KIP!
>>>>>>>>>>>>
>>>>>>>>>>>> Best,
>>>>>>>>>>>> Boyang
>>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> --
>>>>>>>>>>> -- Guozhang
>>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> --
>>>>>>>>>> -- Guozhang
>>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>
>>>>>>> --
>>>>>>> -- Guozhang
>>>>>>>
>>>>>>
>>>>>
>>>>>
>>>>> --
>>>>> -- Guozhang
>>>>>
>>>>
>>>>
>>>> --
>>>> -- Guozhang
>>>>
>>>
>>>
>>
>>
> 
