SGTM.

I think we should still document the behavior on the KIP and explain why
it's implemented that way.


-Matthias

On 5/22/19 7:04 PM, Guozhang Wang wrote:
> Hello Matthias,
> 
> I've thought about that before, and the reason I did not include this as
> part of the KIP-429 scope is that fetcher / coordinator may get quite
> complicated to return non-empty data if "updateAssignmentMetadataIfNeeded"
> returns false in KafkaConsumer. In addition, when there's a rebalance in
> progress, letting consumers to process data which potentially may take
> longer time (in Streams for example, it is related to `max.poll.interval`
> config) could lead to higher chance of "partitions lost" and wasted
> processing work.
> 
> So I've decided to still keep it as simple as is today, and admittedly from
> a user perspective, they may see consecutive "poll" call returning no data.
> I will create a JIRA ticket capturing this idea for future discussion
> whether we should consider this as a general optimization in consumer. Does
> that sound good to you?
> 
> 
> Guozhang
> 
> 
> On Wed, May 22, 2019 at 4:31 AM Matthias J. Sax <matth...@confluent.io>
> wrote:
> 
>> Thanks to the KIP and starting the VOTE Guozhang. I am overall +1.
>>
>> One follow up thought: the KIP does not discuss in details, how `poll()`
>> will behave after the change. It might actually be important to ensure
>> that `poll()` behavior changes to be non-blocking to allow an
>> application to process data from non-revoked partitions while a
>> rebalance is happening in the background.
>>
>> Thoughts?
>>
>>
>> -Matthias
>>
>>
>> On 5/10/19 1:10 AM, Guozhang Wang wrote:
>>> Hello Matthias,
>>>
>>> I'm proposing to change this behavior holistically inside
>>> ConsumerCoordinator actually. In other words I'm trying to piggy-back
>> this
>>> behavioral fix of KAFKA-4600 along with this KIP, and the motivation for
>> me
>>> to do this piggy-backing is that, with incremental rebalancing, there
>> would
>>> be partial affected partitions as we are not revoking every body any
>> more.
>>>
>>>
>>> Guozhang
>>>
>>>
>>> On Thu, May 9, 2019 at 6:21 AM Matthias J. Sax <matth...@confluent.io>
>>> wrote:
>>>
>>>> Thanks Guozhang!
>>>>
>>>> The simplified upgrade path is great!
>>>>
>>>>
>>>> Just a clarification question about the "Rebalance Callback Error
>>>> Handling" -- does this change affect the `ConsumerCoordinator` only if
>>>> incremental rebalancing is use? Or does the behavior also change for the
>>>> eager rebalancing case?
>>>>
>>>>
>>>> -Matthias
>>>>
>>>>
>>>> On 5/9/19 3:37 AM, Guozhang Wang wrote:
>>>>> Hello all,
>>>>>
>>>>> Thanks for everyone who've shared their feedbacks for this KIP! If
>>>> there's
>>>>> no further comments I'll start the voting thread by end of tomorrow.
>>>>>
>>>>>
>>>>> Guozhang.
>>>>>
>>>>> On Wed, May 8, 2019 at 6:36 PM Guozhang Wang <wangg...@gmail.com>
>> wrote:
>>>>>
>>>>>> Hello Boyang,
>>>>>>
>>>>>> On Wed, May 1, 2019 at 4:51 PM Boyang Chen <bche...@outlook.com>
>> wrote:
>>>>>>
>>>>>>> Hey Guozhang,
>>>>>>>
>>>>>>> thank you for the great write up. Overall the motivation and changes
>>>>>>> LGTM, just some minor comments:
>>>>>>>
>>>>>>>
>>>>>>>   1.  In "Consumer Coordinator Algorithm", we could reorder alphabet
>>>>>>> points for 3d~3f from ["ready-to-migrate-partitions",
>>>>>>> "unknown-but-owned-partitions",  "maybe-revoking-partitions"] to
>>>>>>> ["maybe-revoking-partitions", "ready-to-migrate-partitions",
>>>>>>> "unknown-but-owned-partitions"] in order to be consistent with 3c1~3.
>>>>>>>
>>>>>>
>>>>>> Ack. Updated.
>>>>>>
>>>>>>
>>>>>>>   2.  In "Consumer Coordinator Algorithm", 1c suggests to revoke all
>>>>>>> partition upon heartbeat/commit fail. What's the gain here? Do we
>> want
>>>> to
>>>>>>> keep all partitions running at this moment, to be optimistic for the
>>>> case
>>>>>>> when no partitions get reassigned?
>>>>>>>
>>>>>>
>>>>>> That's a good catch. When REBALANCE_IN_PROGRESS is received, we can
>> just
>>>>>> re-join the group with all the currently owned partitions encoded.
>>>> Updated.
>>>>>>
>>>>>>
>>>>>>>   3.  In "Recommended Upgrade Procedure", remove extra 'those': " The
>>>>>>> 'sticky' assignor works even those there are "
>>>>>>>
>>>>>>
>>>>>> Ack, should be `even when`.
>>>>>>
>>>>>>
>>>>>>>   4.  Put two "looking into the future" into a separate category from
>>>>>>> migration session. It seems inconsistent for readers to see this
>>>> before we
>>>>>>> finished discussion for everything.
>>>>>>>
>>>>>>
>>>>>> Ack.
>>>>>>
>>>>>>
>>>>>>>   5.  Have we discussed the concern on the serialization? Could the
>> new
>>>>>>> metadata we are adding grow larger than the message size cap?
>>>>>>>
>>>>>>
>>>>>> We're completing https://issues.apache.org/jira/browse/KAFKA-7149
>> which
>>>>>> should largely reduce the message size (will update the wiki
>>>> accordingly as
>>>>>> well).
>>>>>>
>>>>>>
>>>>>>>
>>>>>>> Boyang
>>>>>>>
>>>>>>> ________________________________
>>>>>>> From: Guozhang Wang <wangg...@gmail.com>
>>>>>>> Sent: Monday, April 15, 2019 9:20 AM
>>>>>>> To: dev
>>>>>>> Subject: Re: [DISCUSS] KIP-429 : Smooth Auto-Scaling for Kafka
>> Streams
>>>>>>>
>>>>>>> Hello Jason,
>>>>>>>
>>>>>>> I agree with you that for range / round-robin it makes less sense to
>> be
>>>>>>> compatible with cooperative rebalance protocol.
>>>>>>>
>>>>>>> As for StickyAssignor, however, I think it would still be possible to
>>>> make
>>>>>>> the current implementation to be compatible with cooperative
>>>> rebalance. So
>>>>>>> after pondering on different options at hand I'm now proposing this
>>>>>>> approach as listed in the upgrade section:
>>>>>>>
>>>>>>>
>>>>>>>
>>>>
>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-429%3A+Kafka+Consumer+Incremental+Rebalance+Protocol#KIP-429:KafkaConsumerIncrementalRebalanceProtocol-CompatibilityandUpgradePath
>>>>>>>
>>>>>>> The idea is to let assignors specify which protocols it would work
>>>> with,
>>>>>>> associating with a different name; then the upgrade path would
>> involve
>>>> a
>>>>>>> "compatible" protocol which actually still use eager behavior while
>>>>>>> encoding two assignors if possible. In "Rejected Section" (just to
>>>> clarify
>>>>>>> I'm not finalizing it as rejected, just putting it there for now, and
>>>> if
>>>>>>> we
>>>>>>> like this one instead we can always switch them) I listed the other
>>>>>>> approach we once discussed about, and arguing its cons of duplicated
>>>> class
>>>>>>> seems overwhelm the pros of saving the  "rebalance.protocol" config.
>>>>>>>
>>>>>>> Let me know WDYT.
>>>>>>>
>>>>>>> Guozhang
>>>>>>>
>>>>>>> On Fri, Apr 12, 2019 at 6:08 PM Jason Gustafson <ja...@confluent.io>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> Hi Guozhang,
>>>>>>>>
>>>>>>>> Responses below:
>>>>>>>>
>>>>>>>> 2. The interface's default implementation will just be
>>>>>>>>> `onPartitionRevoked`, so for user's instantiation if they do not
>> make
>>>>>>> any
>>>>>>>>> code changes they should be able to recompile the code and
>> continue.
>>>>>>>>
>>>>>>>>
>>>>>>>> Ack, makes sense.
>>>>>>>>
>>>>>>>> 4. Hmm.. not sure if it will work. The main issue is that the
>>>>>>>>> consumer-coordinator behavior (whether to revoke all or none at
>>>>>>>>> onRebalancePrepare) is independent of the selected protocol's
>>>> assignor
>>>>>>>>> (eager or cooperative), so even if the assignor is selected to be
>> the
>>>>>>>>> old-versioned one, we will still not revoke at the
>>>>>>> consumer-coordinator
>>>>>>>>> layer and hence has the same risk of migrating still-owned
>>>> partitions,
>>>>>>>>> right?
>>>>>>>>
>>>>>>>>
>>>>>>>> Yeah, basically we would have to push the eager/cooperative logic
>> into
>>>>>>> the
>>>>>>>> PartitionAssignor itself and make the consumer aware of the
>> rebalance
>>>>>>>> protocol it is compatible with. As long as an eager protocol _could_
>>>> be
>>>>>>>> selected, the consumer would have to be pessimistic and do eager
>>>>>>>> revocation. But if all the assignors configured in the consumer
>>>> support
>>>>>>>> cooperative reassignment, then either 1) a cooperative protocol will
>>>> be
>>>>>>>> selected and cooperative revocation can be safely used, or 2) if the
>>>>>>> rest
>>>>>>>> of the group does not support it, then the consumer will simply
>> fail.
>>>>>>>>
>>>>>>>> Another point which you raised offline and I will repeat here is
>> that
>>>>>>> this
>>>>>>>> proposal's benefit is mostly limited to sticky assignment logic.
>>>>>>> Arguably
>>>>>>>> the range assignor may have some incidental stickiness, particularly
>>>> if
>>>>>>> the
>>>>>>>> group is rebalancing for a newly created or deleted topic. For other
>>>>>>> cases,
>>>>>>>> the proposal is mostly additional overhead since it takes an
>>>> additional
>>>>>>>> rebalance and many of the partitions will move. Perhaps it doesn't
>>>> make
>>>>>>> as
>>>>>>>> much sense to use the cooperative protocol for strategies like range
>>>> and
>>>>>>>> round-robin. That kind of argues in favor of pushing some of the
>>>> control
>>>>>>>> into the assignor itself. Maybe we would not bother creating
>>>>>>>> CooperativeRange as I suggested above, but it would make sense to
>>>>>>> create a
>>>>>>>> cooperative version of the sticky assignment strategy. I thought we
>>>>>>> might
>>>>>>>> have to create a new sticky assignor anyway because I can't see how
>> we
>>>>>>>> would get compatible behavior mixing with the old version anyway.
>>>>>>>>
>>>>>>>> Thanks,
>>>>>>>> Jason
>>>>>>>>
>>>>>>>>
>>>>>>>> On Thu, Apr 11, 2019 at 5:53 PM Guozhang Wang <wangg...@gmail.com>
>>>>>>> wrote:
>>>>>>>>
>>>>>>>>> Hello Matthias:
>>>>>>>>>
>>>>>>>>> Thanks for your review.
>>>>>>>>>
>>>>>>>>> The background section uses streams assignor as well as the
>>>> consumer's
>>>>>>>> own
>>>>>>>>> stick assignor as examples illustrating the situation, but this KIP
>>>> is
>>>>>>>> for
>>>>>>>>> consumer coordinator itself, and the rest of the paragraph did not
>>>>>>> talk
>>>>>>>>> about Streams any more. If you feel it's a bit distracted I can
>>>> remove
>>>>>>>>> those examples.
>>>>>>>>>
>>>>>>>>> 10). While working on the PR I realized that the revoked partitions
>>>> on
>>>>>>>>> assignment is not needed (this is being discussed on the PR itself:
>>>>>>>>> https://github.com/apache/kafka/pull/6528#issuecomment-480009890
>>>>>>>>>
>>>>>>>>> 20). 1.a. Good question, I've updated the wiki to let the
>> consumer's
>>>>>>>>> cleanup assignment and re-join, and not letting assignor making any
>>>>>>>>> proactive changes. The idea is to keep logic simpler and not doing
>>>> any
>>>>>>>>> "split brain" stuff.
>>>>>>>>>
>>>>>>>>> 20). 2.b. No we do not need, since the owned-partitions will be
>> part
>>>>>>> of
>>>>>>>> the
>>>>>>>>> Subscription passed in to assign() already.
>>>>>>>>>
>>>>>>>>> 30). As Boyang mentioned, there are some drawbacks that can not be
>>>>>>>>> addressed by rebalance delay still, hence still voted KIP-345 (some
>>>>>>> more
>>>>>>>>> details can be found on the discussion thread of KIP-345 itself).
>> One
>>>>>>>>> example is that as the instance resumes, its member id will be
>> empty
>>>>>>> so
>>>>>>>> we
>>>>>>>>> are still relying on assignor to give it the assignment from the
>> old
>>>>>>>>> member-id while keeping all other member's assignment unchanged.
>>>>>>>>>
>>>>>>>>> 40). Incomplete sentence, I've updated it.
>>>>>>>>>
>>>>>>>>> 50). Here's my idea: suppose we augment the join group schema with
>>>>>>>>> `protocol version` in 2.3, and then with both brokers and clients
>>>>>>> being
>>>>>>>> in
>>>>>>>>> version 2.3+, on the first rolling bounce where subscription and
>>>>>>>> assignment
>>>>>>>>> schema and / or user metadata has changed, this protocol version
>> will
>>>>>>> be
>>>>>>>>> bumped. On the broker side, when receiving all member's join-group
>>>>>>>> request,
>>>>>>>>> it will choose the one that has the highest protocol version (also
>> it
>>>>>>>>> assumes higher versioned protocol is always backward compatible,
>> i.e.
>>>>>>> the
>>>>>>>>> coordinator can recognize lower versioned protocol as well) and
>>>>>>> select it
>>>>>>>>> as the leader. Then the leader can decide, based on its received
>> and
>>>>>>>>> deserialized subscription information, how to assign partitions and
>>>>>>> how
>>>>>>>> to
>>>>>>>>> encode the assignment accordingly so that everyone can understand
>> it.
>>>>>>>> With
>>>>>>>>> this, in Streams for example, no version probing would be needed
>>>>>>> since we
>>>>>>>>> are guaranteed the leader knows everyone's version -- again it is
>>>>>>>> assuming
>>>>>>>>> that higher versioned protocol is always backward compatible -- and
>>>>>>> hence
>>>>>>>>> can successfully do the assignment at that round.
>>>>>>>>>
>>>>>>>>> 60). My bad, this section was not updated while the design was
>>>>>>> evolved,
>>>>>>>>> I've updated it.
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> On Tue, Apr 9, 2019 at 7:22 PM Boyang Chen <bche...@outlook.com>
>>>>>>> wrote:
>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> Thanks for the review Matthias! My 2-cent on the rebalance delay
>> is
>>>>>>>> that
>>>>>>>>>> it is a rather fixed trade-off between
>>>>>>>>>>
>>>>>>>>>> task availability and resource shuffling. If we eventually trigger
>>>>>>>>>> rebalance after rolling bounce, certain consumer
>>>>>>>>>>
>>>>>>>>>> setup is still faced with global shuffles, for example member.id
>>>>>>>> ranking
>>>>>>>>>> based round robin strategy, as rejoining dynamic
>>>>>>>>>>
>>>>>>>>>> members will be assigned with new member.id which reorders the
>>>>>>>>>> assignment. So I think the primary goal of incremental
>>>>>>>>>>
>>>>>>>>>> rebalancing is still improving the cluster availability during
>>>>>>>> rebalance,
>>>>>>>>>> because it didn't revoke any partition during this
>>>>>>>>>>
>>>>>>>>>> process. Also, the perk is minimum configuration requirement :)
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> Best,
>>>>>>>>>>
>>>>>>>>>> Boyang
>>>>>>>>>>
>>>>>>>>>> ________________________________
>>>>>>>>>> From: Matthias J. Sax <matth...@confluent.io>
>>>>>>>>>> Sent: Tuesday, April 9, 2019 7:47 AM
>>>>>>>>>> To: dev
>>>>>>>>>> Subject: Re: [DISCUSS] KIP-429 : Smooth Auto-Scaling for Kafka
>>>>>>> Streams
>>>>>>>>>>
>>>>>>>>>> Thank for the KIP, Boyang and Guozhang!
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> I made an initial pass and have some questions/comments. One high
>>>>>>> level
>>>>>>>>>> comment: it seems that the KIP "mixes" plain consumer and Kafka
>>>>>>> Streams
>>>>>>>>>> use case a little bit (at least in the presentation). It might be
>>>>>>>>>> helpful to separate both cases clearly, or maybe limit the scope
>> to
>>>>>>>>>> plain consumer only.
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> 10) For `PartitionAssignor.Assignment`: It seems we need a new
>>>>>>> method
>>>>>>>>>> `List<TopicPartitions> revokedPartitions()` ?
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> 20) In Section "Consumer Coordinator Algorithm"
>>>>>>>>>>
>>>>>>>>>>     Bullet point "1a)": If the subscription changes and a topic is
>>>>>>>>>> removed from the subscription, why do we not revoke the
>> partitions?
>>>>>>>>>>
>>>>>>>>>>     Bullet point "1a)": What happens is a topic is deleted (or a
>>>>>>>>>> partition is removed/deleted from a topic)? Should we call the new
>>>>>>>>>> `onPartitionsEmigrated()` callback for this case?
>>>>>>>>>>
>>>>>>>>>>     Bullet point "2b)" Should we update the `PartitionAssignor`
>>>>>>>>>> interface to pass in the "old assignment" as third parameter into
>>>>>>>>>> `assign()`?
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> 30) Rebalance delay (as used in KIP-415): Could a rebalance delay
>>>>>>>>>> subsume KIP-345? Configuring static members is rather complicated,
>>>>>>> and
>>>>>>>> I
>>>>>>>>>> am wondering if a rebalance delay would be sufficient?
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> 40) Quote: "otherwise the we would fall into the case 3.b)
>> forever."
>>>>>>>>>>
>>>>>>>>>> What is "case 3.b" ?
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> 50) Section "Looking into the Future"
>>>>>>>>>>
>>>>>>>>>> Nit: the new "ProtocolVersion" field is missing in the first line
>>>>>>>>>> describing "JoinGroupRequest"
>>>>>>>>>>
>>>>>>>>>>> This can also help saving "version probing" cost on Streams as
>>>>>>> well.
>>>>>>>>>>
>>>>>>>>>> How does this relate to Kafka Streams "version probing"
>>>>>>> implementation?
>>>>>>>>>> How can we exploit the new `ProtocolVersion` in Streams to improve
>>>>>>>>>> "version probing" ? I have a rough idea, but would like to hear
>> more
>>>>>>>>>> details.
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> 60) Section "Recommended Upgrade Procedure"
>>>>>>>>>>
>>>>>>>>>>> Set the `stream.rebalancing.mode` to `upgrading`, which will
>> force
>>>>>>>> the
>>>>>>>>>> stream application to stay with protocol type "consumer".
>>>>>>>>>>
>>>>>>>>>> This config is not discussed in the KIP and appears in this
>> section
>>>>>>>>>> without context. Can you elaborate about it?
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> -Matthias
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> On 3/29/19 6:20 PM, Guozhang Wang wrote:
>>>>>>>>>>> Bump up on this discussion thread. I've added a few new drawings
>>>>>>> for
>>>>>>>>>> better
>>>>>>>>>>> illustration, would really appreciate your feedbacks.
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> Guozhang
>>>>>>>>>>>
>>>>>>>>>>> On Wed, Mar 20, 2019 at 6:17 PM Guozhang Wang <
>> wangg...@gmail.com
>>>>>>>>
>>>>>>>>>> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> Hello Boyang,
>>>>>>>>>>>>
>>>>>>>>>>>> I've made another thorough pass over this KIP and I'd like to
>>>>>>> spilt
>>>>>>>> it
>>>>>>>>>>>> into two parts: the first part, covered in KIP-429 would be
>>>>>>> touching
>>>>>>>>> on
>>>>>>>>>>>> Consumer Coordinator only to have incremental rebalance protocol
>>>>>>> in
>>>>>>>>>> place.
>>>>>>>>>>>> The second part (for now I've reserved KIP number 444 for it)
>>>>>>> would
>>>>>>>>>> contain
>>>>>>>>>>>> all the changes on StreamsPartitionAssginor to allow warming up
>>>>>>> new
>>>>>>>>>>>> members.
>>>>>>>>>>>>
>>>>>>>>>>>> I think the first part, a.k.a. the current updated KIP-429 is
>>>>>>> ready
>>>>>>>>> for
>>>>>>>>>>>> review and discussions again. Would love to hear people's
>>>>>>> feedbacks
>>>>>>>>> and
>>>>>>>>>>>> ideas.
>>>>>>>>>>>>
>>>>>>>>>>>> Guozhang
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>> On Mon, Mar 4, 2019 at 10:09 AM Boyang Chen <
>> bche...@outlook.com
>>>>>>>>
>>>>>>>>>> wrote:
>>>>>>>>>>>>
>>>>>>>>>>>>> Thanks Guozhang for the great questions. Answers are inlined:
>>>>>>>>>>>>>
>>>>>>>>>>>>> 1. I'm still not sure if it's worthwhile to add a new type of
>>>>>>>>> "learner
>>>>>>>>>>>>> task" in addition to "standby task": if the only difference is
>>>>>>> that
>>>>>>>>> for
>>>>>>>>>>>>> the
>>>>>>>>>>>>> latter, we would consider workload balance while for the former
>>>>>>> we
>>>>>>>>>> would
>>>>>>>>>>>>> not, I think we can just adjust the logic of StickyTaskAssignor
>>>>>>> a
>>>>>>>> bit
>>>>>>>>>> to
>>>>>>>>>>>>> break that difference. Adding a new type of task would be
>>>>>>> adding a
>>>>>>>>> lot
>>>>>>>>>> of
>>>>>>>>>>>>> code complexity, so if we can still piggy-back the logic on a
>>>>>>>>>> standby-task
>>>>>>>>>>>>> I would prefer to do so.
>>>>>>>>>>>>> In the proposal we stated that we are not adding a new type of
>>>>>>> task
>>>>>>>>>>>>> implementation. The
>>>>>>>>>>>>> learner task shall share the same implementation with normal
>>>>>>>> standby
>>>>>>>>>>>>> task, only that we
>>>>>>>>>>>>> shall tag the standby task with learner and prioritize the
>>>>>>> learner
>>>>>>>>>> tasks
>>>>>>>>>>>>> replay effort.
>>>>>>>>>>>>> 2. One thing that's still not clear from the KIP wiki itself is
>>>>>>>> which
>>>>>>>>>>>>> layer
>>>>>>>>>>>>> would the logic be implemented at. Although for most KIPs we
>>>>>>> would
>>>>>>>>> not
>>>>>>>>>>>>> require internal implementation details but only public facing
>>>>>>> API
>>>>>>>>>>>>> updates,
>>>>>>>>>>>>> for a KIP like this I think it still requires to flesh out
>>>>>>> details
>>>>>>>> on
>>>>>>>>>> the
>>>>>>>>>>>>> implementation design. More specifically: today Streams embed a
>>>>>>>> full
>>>>>>>>>>>>> fledged Consumer client, which hard-code a ConsumerCoordinator
>>>>>>>>> inside,
>>>>>>>>>>>>> Streams then injects a StreamsPartitionAssignor to its
>> pluggable
>>>>>>>>>>>>> PartitionAssignor interface and inside the
>>>>>>> StreamsPartitionAssignor
>>>>>>>>> we
>>>>>>>>>>>>> also
>>>>>>>>>>>>> have a TaskAssignor interface whose default implementation is
>>>>>>>>>>>>> StickyPartitionAssignor. Streams partition assignor logic today
>>>>>>>> sites
>>>>>>>>>> in
>>>>>>>>>>>>> the latter two classes. Hence the hierarchy today is:
>>>>>>>>>>>>>
>>>>>>>>>>>>> KafkaConsumer -> ConsumerCoordinator ->
>>>>>>> StreamsPartitionAssignor ->
>>>>>>>>>>>>> StickyTaskAssignor.
>>>>>>>>>>>>>
>>>>>>>>>>>>> We need to think about where the proposed implementation would
>>>>>>> take
>>>>>>>>>> place
>>>>>>>>>>>>> at, and personally I think it is not the best option to inject
>>>>>>> all
>>>>>>>> of
>>>>>>>>>> them
>>>>>>>>>>>>> into the StreamsPartitionAssignor / StickyTaskAssignor since
>> the
>>>>>>>>> logic
>>>>>>>>>> of
>>>>>>>>>>>>> "triggering another rebalance" etc would require some
>>>>>>> coordinator
>>>>>>>>> logic
>>>>>>>>>>>>> which is hard to mimic at PartitionAssignor level. On the other
>>>>>>>> hand,
>>>>>>>>>>>>> since
>>>>>>>>>>>>> we are embedding a KafkaConsumer client as a whole we cannot
>>>>>>> just
>>>>>>>>>> replace
>>>>>>>>>>>>> ConsumerCoordinator with a specialized StreamsCoordinator like
>>>>>>>>> Connect
>>>>>>>>>>>>> does
>>>>>>>>>>>>> in KIP-415. So I'd like to maybe split the current proposal in
>>>>>>> both
>>>>>>>>>>>>> consumer layer and streams-assignor layer like we did in
>>>>>>>>>> KIP-98/KIP-129.
>>>>>>>>>>>>> And then the key thing to consider is how to cut off the
>>>>>>> boundary
>>>>>>>> so
>>>>>>>>>> that
>>>>>>>>>>>>> the modifications we push to ConsumerCoordinator would be
>>>>>>>> beneficial
>>>>>>>>>>>>> universally for any consumers, while keep the Streams-specific
>>>>>>>> logic
>>>>>>>>> at
>>>>>>>>>>>>> the
>>>>>>>>>>>>> assignor level.
>>>>>>>>>>>>> Yes, that's also my ideal plan. The details for the
>>>>>>> implementation
>>>>>>>>> are
>>>>>>>>>>>>> depicted
>>>>>>>>>>>>> in this doc<
>>>>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>
>>>>>>>>
>>>>>>>
>>>>
>> https://docs.google.com/document/d/1me2a5wvxAZT1QE6HkwyDl7C2TiBQlKN3Dpw_I1ro91U/edit#heading=h.qix74qdmekae
>>>>>>>>>>> ,
>>>>>>>>>> [
>>>>>>>>>>
>>>>>>>>>
>>>>>>>>
>>>>>>>
>>>>
>> https://lh5.googleusercontent.com/DXWMyKNE9rFFIv7TNX56Q41QwqYp8ynivwWSJHHORqSRkoQxtraW2bqiB-NRUGAMYKkt8A=w1200-h630-p
>>>>>>>>>> ]<
>>>>>>>>>>
>>>>>>>>>
>>>>>>>>
>>>>>>>
>>>>
>> https://docs.google.com/document/d/1me2a5wvxAZT1QE6HkwyDl7C2TiBQlKN3Dpw_I1ro91U/edit#heading=h.qix74qdmekae
>>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> [External] KStream Smooth Auto-scaling Implementation Plan<
>>>>>>>>>>
>>>>>>>>>
>>>>>>>>
>>>>>>>
>>>>
>> https://docs.google.com/document/d/1me2a5wvxAZT1QE6HkwyDl7C2TiBQlKN3Dpw_I1ro91U/edit#heading=h.qix74qdmekae
>>>>>>>>>>>
>>>>>>>>>> docs.google.com
>>>>>>>>>> KStream Incremental Rebalancing Implementation Plan Authors:
>> Boyang
>>>>>>>> Chen,
>>>>>>>>>> Guozhang Wang KIP link Stage: [Draft | Review | Approved]
>>>>>>> Background We
>>>>>>>>>> initiated KIP-429 for the promotion of incremental rebalancing
>> work
>>>>>>> for
>>>>>>>>>> KStream. Behind the scene, there is non-trivial amount of effort
>>>>>>> that
>>>>>>>>> needs
>>>>>>>>>> to...
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>>>> and I have explained the reasoning on why we want to push a
>>>>>>>>>>>>> global change of replacing ConsumerCoordinator with
>>>>>>>>> StreamCoordinator.
>>>>>>>>>>>>> The motivation
>>>>>>>>>>>>> is that KIP space is usually used for public & algorithm level
>>>>>>>>> change,
>>>>>>>>>>>>> not for internal
>>>>>>>>>>>>> implementation details.
>>>>>>>>>>>>>
>>>>>>>>>>>>> 3. Depending on which design direction we choose, our migration
>>>>>>>> plan
>>>>>>>>>> would
>>>>>>>>>>>>> also be quite different. For example, if we stay with
>>>>>>>>>> ConsumerCoordinator
>>>>>>>>>>>>> whose protocol type is "consumer" still, and we can manage to
>>>>>>> make
>>>>>>>>> all
>>>>>>>>>>>>> changes agnostic to brokers as well as to old versioned
>>>>>>> consumers,
>>>>>>>>> then
>>>>>>>>>>>>> our
>>>>>>>>>>>>> migration plan could be much easier.
>>>>>>>>>>>>> Yes, the upgrade plan was designed to take the new
>>>>>>>> StreamCoordinator
>>>>>>>>>>>>> approach
>>>>>>>>>>>>> which means we shall define a new protocol type. For existing
>>>>>>>>>> application
>>>>>>>>>>>>> we could only
>>>>>>>>>>>>> maintain the same `consumer` protocol type is because current
>>>>>>>> broker
>>>>>>>>>> only
>>>>>>>>>>>>> allows
>>>>>>>>>>>>> change of protocol type when the consumer group is empty. It is
>>>>>>> of
>>>>>>>>>> course
>>>>>>>>>>>>> user-unfriendly to force
>>>>>>>>>>>>> a wipe-out for the entire application, and I don't think
>>>>>>>> maintaining
>>>>>>>>>> old
>>>>>>>>>>>>> protocol type would greatly
>>>>>>>>>>>>> impact ongoing services using new stream coordinator. WDYT?
>>>>>>>>>>>>>
>>>>>>>>>>>>> 4. I think one major issue related to this KIP is that today,
>> in
>>>>>>>> the
>>>>>>>>>>>>> StickyPartitionAssignor, we always try to honor stickiness over
>>>>>>>>>> workload
>>>>>>>>>>>>> balance, and hence "learner task" is needed to break this
>>>>>>> priority,
>>>>>>>>> but
>>>>>>>>>>>>> I'm
>>>>>>>>>>>>> wondering if we can have a better solution within sticky task
>>>>>>>>> assignor
>>>>>>>>>>>>> that
>>>>>>>>>>>>> accommodate this?
>>>>>>>>>>>>> Great question! That's what I explained in the proposal, which
>>>>>>> is
>>>>>>>>> that
>>>>>>>>>> we
>>>>>>>>>>>>> should breakdown our
>>>>>>>>>>>>> delivery into different stages. At very beginning, our goal is
>>>>>>> to
>>>>>>>>>> trigger
>>>>>>>>>>>>> learner task assignment only on
>>>>>>>>>>>>> `new` hosts, where we shall leverage leader's knowledge of
>>>>>>> previous
>>>>>>>>>> round
>>>>>>>>>>>>> of rebalance to figure out. After
>>>>>>>>>>>>> stage one, our goal is to have a smooth scaling up experience,
>>>>>>> but
>>>>>>>>> the
>>>>>>>>>>>>> task balance problem is kind of orthogonal.
>>>>>>>>>>>>> The load balance problem is a much broader topic than auto
>>>>>>> scaling,
>>>>>>>>>> which
>>>>>>>>>>>>> I figure worth discussing within
>>>>>>>>>>>>> this KIP's context since it's a naturally next-step, but
>>>>>>> wouldn't
>>>>>>>> be
>>>>>>>>>> the
>>>>>>>>>>>>> main topic.
>>>>>>>>>>>>> Learner task or auto scaling support should be treated as `a
>>>>>>>> helpful
>>>>>>>>>>>>> mechanism to reach load balance`, but not `an algorithm
>> defining
>>>>>>>> load
>>>>>>>>>>>>> balance`. It would be great if you could share some insights of
>>>>>>> the
>>>>>>>>>> stream
>>>>>>>>>>>>> task balance, which eventually helps us to break out of the
>>>>>>>> KIP-429's
>>>>>>>>>> scope
>>>>>>>>>>>>> and even define a separate KIP to focus on task weight &
>>>>>>> assignment
>>>>>>>>>> logic
>>>>>>>>>>>>> improvement.
>>>>>>>>>>>>>
>>>>>>>>>>>>> Also thank you for making improvement on the KIP context and
>>>>>>>>>> organization!
>>>>>>>>>>>>>
>>>>>>>>>>>>> Best,
>>>>>>>>>>>>> Boyang
>>>>>>>>>>>>> ________________________________
>>>>>>>>>>>>> From: Guozhang Wang <wangg...@gmail.com>
>>>>>>>>>>>>> Sent: Saturday, March 2, 2019 6:00 AM
>>>>>>>>>>>>> To: dev
>>>>>>>>>>>>> Subject: Re: [DISCUSS] KIP-429 : Smooth Auto-Scaling for Kafka
>>>>>>>>> Streams
>>>>>>>>>>>>>
>>>>>>>>>>>>> Hello Boyang,
>>>>>>>>>>>>>
>>>>>>>>>>>>> I've just made a quick pass on the KIP and here are some
>>>>>>> thoughts.
>>>>>>>>>>>>>
>>>>>>>>>>>>> Meta:
>>>>>>>>>>>>>
>>>>>>>>>>>>> 1. I'm still not sure if it's worthwhile to add a new type of
>>>>>>>>> "learner
>>>>>>>>>>>>> task" in addition to "standby task": if the only difference is
>>>>>>> that
>>>>>>>>> for
>>>>>>>>>>>>> the
>>>>>>>>>>>>> latter, we would consider workload balance while for the former
>>>>>>> we
>>>>>>>>>> would
>>>>>>>>>>>>> not, I think we can just adjust the logic of StickyTaskAssignor
>>>>>>> a
>>>>>>>> bit
>>>>>>>>>> to
>>>>>>>>>>>>> break that difference. Adding a new type of task would be
>>>>>>> adding a
>>>>>>>>> lot
>>>>>>>>>> of
>>>>>>>>>>>>> code complexity, so if we can still piggy-back the logic on a
>>>>>>>>>> standby-task
>>>>>>>>>>>>> I would prefer to do so.
>>>>>>>>>>>>>
>>>>>>>>>>>>> 2. One thing that's still not clear from the KIP wiki itself is
>>>>>>>> which
>>>>>>>>>>>>> layer
>>>>>>>>>>>>> would the logic be implemented at. Although for most KIPs we
>>>>>>> would
>>>>>>>>> not
>>>>>>>>>>>>> require internal implementation details but only public facing
>>>>>>> API
>>>>>>>>>>>>> updates,
>>>>>>>>>>>>> for a KIP like this I think it still requires to flesh out
>>>>>>> details
>>>>>>>> on
>>>>>>>>>> the
>>>>>>>>>>>>> implementation design. More specifically: today Streams embed a
>>>>>>>> full
>>>>>>>>>>>>> fledged Consumer client, which hard-code a ConsumerCoordinator
>>>>>>>>> inside,
>>>>>>>>>>>>> Streams then injects a StreamsPartitionAssignor to its plugable
>>>>>>>>>>>>> PartitionAssignor interface and inside the
>>>>>>> StreamsPartitionAssignor
>>>>>>>>> we
>>>>>>>>>>>>> also
>>>>>>>>>>>>> have a TaskAssignor interface whose default implementation is
>>>>>>>>>>>>> StickyPartitionAssignor. Streams partition assignor logic today
>>>>>>>> sites
>>>>>>>>>> in
>>>>>>>>>>>>> the latter two classes. Hence the hierarchy today is:
>>>>>>>>>>>>>
>>>>>>>>>>>>> KafkaConsumer -> ConsumerCoordinator ->
>>>>>>> StreamsPartitionAssignor ->
>>>>>>>>>>>>> StickyTaskAssignor.
>>>>>>>>>>>>>
>>>>>>>>>>>>> We need to think about where the proposed implementation would
>>>>>>> take
>>>>>>>>>> place
>>>>>>>>>>>>> at, and personally I think it is not the best option to inject
>>>>>>> all
>>>>>>>> of
>>>>>>>>>> them
>>>>>>>>>>>>> into the StreamsPartitionAssignor / StickyTaskAssignor since
>> the
>>>>>>>>> logic
>>>>>>>>>> of
>>>>>>>>>>>>> "triggering another rebalance" etc would require some
>>>>>>> coordinator
>>>>>>>>> logic
>>>>>>>>>>>>> which is hard to mimic at PartitionAssignor level. On the other
>>>>>>>> hand,
>>>>>>>>>>>>> since
>>>>>>>>>>>>> we are embedding a KafkaConsumer client as a whole we cannot
>>>>>>> just
>>>>>>>>>> replace
>>>>>>>>>>>>> ConsumerCoordinator with a specialized StreamsCoordinator like
>>>>>>>>> Connect
>>>>>>>>>>>>> does
>>>>>>>>>>>>> in KIP-415. So I'd like to maybe split the current proposal in
>>>>>>> both
>>>>>>>>>>>>> consumer layer and streams-assignor layer like we did in
>>>>>>>>>> KIP-98/KIP-129.
>>>>>>>>>>>>> And then the key thing to consider is how to cut off the
>>>>>>> boundary
>>>>>>>> so
>>>>>>>>>> that
>>>>>>>>>>>>> the modifications we push to ConsumerCoordinator would be
>>>>>>>> beneficial
>>>>>>>>>>>>> universally for any consumers, while keep the Streams-specific
>>>>>>>> logic
>>>>>>>>> at
>>>>>>>>>>>>> the
>>>>>>>>>>>>> assignor level.
>>>>>>>>>>>>>
>>>>>>>>>>>>> 3. Depending on which design direction we choose, our migration
>>>>>>>> plan
>>>>>>>>>> would
>>>>>>>>>>>>> also be quite different. For example, if we stay with
>>>>>>>>>> ConsumerCoordinator
>>>>>>>>>>>>> whose protocol type is "consumer" still, and we can manage to
>>>>>>> make
>>>>>>>>> all
>>>>>>>>>>>>> changes agnostic to brokers as well as to old versioned
>>>>>>> consumers,
>>>>>>>>> then
>>>>>>>>>>>>> our
>>>>>>>>>>>>> migration plan could be much easier.
>>>>>>>>>>>>>
>>>>>>>>>>>>> 4. I think one major issue related to this KIP is that today,
>> in
>>>>>>>> the
>>>>>>>>>>>>> StickyPartitionAssignor, we always try to honor stickiness over
>>>>>>>>>> workload
>>>>>>>>>>>>> balance, and hence "learner task" is needed to break this
>>>>>>> priority,
>>>>>>>>> but
>>>>>>>>>>>>> I'm
>>>>>>>>>>>>> wondering if we can have a better solution within sticky task
>>>>>>>>> assignor
>>>>>>>>>>>>> that
>>>>>>>>>>>>> accommodate this?
>>>>>>>>>>>>>
>>>>>>>>>>>>> Minor:
>>>>>>>>>>>>>
>>>>>>>>>>>>> 1. The idea of two rebalances have also been discussed in
>>>>>>>>>>>>> https://issues.apache.org/jira/browse/KAFKA-6145. So we should
>>>>>>> add
>>>>>>>>> the
>>>>>>>>>>>>> reference on the wiki page as well.
>>>>>>>>>>>>> 2. Could you also add a section describing how the subscription
>>>>>>> /
>>>>>>>>>>>>> assignment metadata will be re-formatted? Without this
>>>>>>> information
>>>>>>>> it
>>>>>>>>>> is
>>>>>>>>>>>>> hard to get to the bottom of your idea. For example in the
>>>>>>> "Leader
>>>>>>>>>>>>> Transfer
>>>>>>>>>>>>> Before Scaling" section, I'm not sure why "S2 doesn't know S4
>> is
>>>>>>>> new
>>>>>>>>>>>>> member"
>>>>>>>>>>>>> and hence would blindly obey stickiness over workload balance
>>>>>>>>>> requirement.
>>>>>>>>>>>>>
>>>>>>>>>>>>> Guozhang
>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>> On Thu, Feb 28, 2019 at 11:05 AM Boyang Chen <
>>>>>>> bche...@outlook.com>
>>>>>>>>>> wrote:
>>>>>>>>>>>>>
>>>>>>>>>>>>>> Hey community friends,
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> I'm gladly inviting you to have a look at the proposal to add
>>>>>>>>>>>>> incremental
>>>>>>>>>>>>>> rebalancing to Kafka Streams, A.K.A auto-scaling support.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>
>>>>>>>>
>>>>>>>
>>>>
>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-429%3A+Smooth+Auto-Scaling+for+Kafka+Streams
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Special thanks to Guozhang for giving great guidances and
>>>>>>>> important
>>>>>>>>>>>>>> feedbacks while making this KIP!
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Best,
>>>>>>>>>>>>>> Boyang
>>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>> --
>>>>>>>>>>>>> -- Guozhang
>>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>> --
>>>>>>>>>>>> -- Guozhang
>>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>
>>>>>>>>> --
>>>>>>>>> -- Guozhang
>>>>>>>>>
>>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> --
>>>>>>> -- Guozhang
>>>>>>>
>>>>>>
>>>>>>
>>>>>> --
>>>>>> -- Guozhang
>>>>>>
>>>>>
>>>>>
>>>>
>>>>
>>>
>>
>>
> 

Attachment: signature.asc
Description: OpenPGP digital signature

Reply via email to