Hello Matthias,

I've thought about that before. The reason I did not include it in the
KIP-429 scope is that the fetcher / coordinator may get quite complicated
if they must return non-empty data when "updateAssignmentMetadataIfNeeded"
returns false in KafkaConsumer. In addition, when a rebalance is in
progress, letting consumers process data, which may take a long time (in
Streams, for example, it is bounded by the `max.poll.interval.ms` config),
could lead to a higher chance of "partitions lost" and wasted processing
work.

So I've decided to keep it as simple as it is today; admittedly, from a
user's perspective, consecutive "poll" calls may return no data. I will
create a JIRA ticket capturing this idea for future discussion of whether
we should consider it as a general optimization in the consumer. Does
that sound good to you?
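
To make the user-facing effect concrete, here is a minimal sketch in plain
Java of a poll loop that treats empty results as normal rather than as an
error. It rests on assumptions: the `Supplier<List<String>>` is only a
hypothetical stand-in for `KafkaConsumer.poll(Duration)`, and the names
`EmptyPollSketch` and `drain` are made up for this illustration; it does not
reflect the consumer's actual internals.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.function.Supplier;

class EmptyPollSketch {
    // "poll" is a hypothetical stand-in for KafkaConsumer.poll(Duration):
    // each call returns the next batch of records, which may be empty
    // (for example while a rebalance is still in progress).
    static List<String> drain(Supplier<List<String>> poll, int maxPolls) {
        List<String> processed = new ArrayList<>();
        for (int i = 0; i < maxPolls; i++) {
            List<String> batch = poll.get();
            if (batch.isEmpty()) {
                // Several empty polls in a row are expected; just poll again.
                continue;
            }
            processed.addAll(batch);
        }
        return processed;
    }

    public static void main(String[] args) {
        // Scripted batches: two consecutive empty polls in the middle.
        Iterator<List<String>> scripted = List.of(
                List.of("a", "b"),
                List.<String>of(),
                List.<String>of(),
                List.of("c")).iterator();
        System.out.println(drain(scripted::next, 4)); // prints [a, b, c]
    }
}
```

The point of the sketch is only that the application loop should keep
calling poll and not infer a failure from a run of empty batches.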


Guozhang


On Wed, May 22, 2019 at 4:31 AM Matthias J. Sax <matth...@confluent.io>
wrote:

> Thanks for the KIP and for starting the VOTE, Guozhang. I am overall +1.
>
> One follow-up thought: the KIP does not discuss in detail how `poll()`
> will behave after the change. It might actually be important to ensure
> that `poll()` behavior changes to be non-blocking to allow an
> application to process data from non-revoked partitions while a
> rebalance is happening in the background.
>
> Thoughts?
>
>
> -Matthias
>
>
> On 5/10/19 1:10 AM, Guozhang Wang wrote:
> > Hello Matthias,
> >
> > I'm proposing to change this behavior holistically inside
> > ConsumerCoordinator actually. In other words, I'm trying to piggy-back
> > this behavioral fix of KAFKA-4600 along with this KIP. The motivation
> > for doing so is that, with incremental rebalancing, only some of the
> > partitions would be affected, as we are not revoking everybody any
> > more.
> >
> >
> > Guozhang
> >
> >
> > On Thu, May 9, 2019 at 6:21 AM Matthias J. Sax <matth...@confluent.io>
> > wrote:
> >
> >> Thanks Guozhang!
> >>
> >> The simplified upgrade path is great!
> >>
> >>
> >> Just a clarification question about the "Rebalance Callback Error
> >> Handling" -- does this change affect the `ConsumerCoordinator` only if
> >> incremental rebalancing is used? Or does the behavior also change for the
> >> eager rebalancing case?
> >>
> >>
> >> -Matthias
> >>
> >>
> >> On 5/9/19 3:37 AM, Guozhang Wang wrote:
> >>> Hello all,
> >>>
> >>> Thanks to everyone who has shared feedback on this KIP! If there are
> >>> no further comments, I'll start the voting thread by end of tomorrow.
> >>>
> >>>
> >>> Guozhang.
> >>>
> >>> On Wed, May 8, 2019 at 6:36 PM Guozhang Wang <wangg...@gmail.com>
> wrote:
> >>>
> >>>> Hello Boyang,
> >>>>
> >>>> On Wed, May 1, 2019 at 4:51 PM Boyang Chen <bche...@outlook.com>
> wrote:
> >>>>
> >>>>> Hey Guozhang,
> >>>>>
> >>>>> thank you for the great write up. Overall the motivation and changes
> >>>>> LGTM, just some minor comments:
> >>>>>
> >>>>>
> >>>>>   1.  In "Consumer Coordinator Algorithm", we could reorder alphabet
> >>>>> points for 3d~3f from ["ready-to-migrate-partitions",
> >>>>> "unknown-but-owned-partitions",  "maybe-revoking-partitions"] to
> >>>>> ["maybe-revoking-partitions", "ready-to-migrate-partitions",
> >>>>> "unknown-but-owned-partitions"] in order to be consistent with 3c1~3.
> >>>>>
> >>>>
> >>>> Ack. Updated.
> >>>>
> >>>>
> >>>>>   2.  In "Consumer Coordinator Algorithm", 1c suggests revoking all
> >>>>> partitions upon heartbeat/commit failure. What's the gain here? Do we
> >>>>> want to keep all partitions running at this moment, to be optimistic
> >>>>> for the case when no partitions get reassigned?
> >>>>>
> >>>>
> >>>> That's a good catch. When REBALANCE_IN_PROGRESS is received, we can
> >>>> just re-join the group with all the currently owned partitions
> >>>> encoded. Updated.
> >>>>
> >>>>
> >>>>>   3.  In "Recommended Upgrade Procedure", remove extra 'those': " The
> >>>>> 'sticky' assignor works even those there are "
> >>>>>
> >>>>
> >>>> Ack, should be `even when`.
> >>>>
> >>>>
> >>>>>   4.  Put two "looking into the future" into a separate category from
> >>>>> migration session. It seems inconsistent for readers to see this
> >> before we
> >>>>> finished discussion for everything.
> >>>>>
> >>>>
> >>>> Ack.
> >>>>
> >>>>
> >>>>>   5.  Have we discussed the concern on the serialization? Could the
> new
> >>>>> metadata we are adding grow larger than the message size cap?
> >>>>>
> >>>>
> >>>> We're completing https://issues.apache.org/jira/browse/KAFKA-7149,
> >>>> which should largely reduce the message size (will update the wiki
> >>>> accordingly as well).
> >>>>
> >>>>
> >>>>>
> >>>>> Boyang
> >>>>>
> >>>>> ________________________________
> >>>>> From: Guozhang Wang <wangg...@gmail.com>
> >>>>> Sent: Monday, April 15, 2019 9:20 AM
> >>>>> To: dev
> >>>>> Subject: Re: [DISCUSS] KIP-429 : Smooth Auto-Scaling for Kafka
> Streams
> >>>>>
> >>>>> Hello Jason,
> >>>>>
> >>>>> I agree with you that for range / round-robin it makes less sense to
> >>>>> be compatible with the cooperative rebalance protocol.
> >>>>>
> >>>>> As for StickyAssignor, however, I think it would still be possible to
> >>>>> make the current implementation compatible with cooperative
> >>>>> rebalance. So after pondering the different options at hand, I'm now
> >>>>> proposing the approach listed in the upgrade section:
> >>>>>
> >>>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-429%3A+Kafka+Consumer+Incremental+Rebalance+Protocol#KIP-429:KafkaConsumerIncrementalRebalanceProtocol-CompatibilityandUpgradePath
> >>>>>
> >>>>> The idea is to let assignors specify which protocols they would work
> >>>>> with, associating each with a different name; then the upgrade path
> >>>>> would involve a "compatible" protocol which actually still uses eager
> >>>>> behavior while encoding two assignors if possible. In the "Rejected
> >>>>> Section" (just to clarify, I'm not finalizing it as rejected, just
> >>>>> putting it there for now, and if we like this one instead we can
> >>>>> always switch them) I listed the other approach we once discussed,
> >>>>> arguing that its con of duplicated classes seems to outweigh the pro
> >>>>> of saving the "rebalance.protocol" config.
> >>>>>
> >>>>> Let me know WDYT.
> >>>>>
> >>>>> Guozhang
> >>>>>
> >>>>> On Fri, Apr 12, 2019 at 6:08 PM Jason Gustafson <ja...@confluent.io>
> >>>>> wrote:
> >>>>>
> >>>>>> Hi Guozhang,
> >>>>>>
> >>>>>> Responses below:
> >>>>>>
> >>>>>> 2. The interface's default implementation will just be
> >>>>>>> `onPartitionRevoked`, so for user's instantiation if they do not
> make
> >>>>> any
> >>>>>>> code changes they should be able to recompile the code and
> continue.
> >>>>>>
> >>>>>>
> >>>>>> Ack, makes sense.
> >>>>>>
> >>>>>> 4. Hmm.. not sure if it will work. The main issue is that the
> >>>>>>> consumer-coordinator behavior (whether to revoke all or none at
> >>>>>>> onRebalancePrepare) is independent of the selected protocol's
> >> assignor
> >>>>>>> (eager or cooperative), so even if the assignor is selected to be
> the
> >>>>>>> old-versioned one, we will still not revoke at the
> >>>>> consumer-coordinator
> >>>>>>> layer and hence has the same risk of migrating still-owned
> >> partitions,
> >>>>>>> right?
> >>>>>>
> >>>>>>
> >>>>>> Yeah, basically we would have to push the eager/cooperative logic
> >>>>>> into the PartitionAssignor itself and make the consumer aware of the
> >>>>>> rebalance protocol it is compatible with. As long as an eager
> >>>>>> protocol _could_ be selected, the consumer would have to be
> >>>>>> pessimistic and do eager revocation. But if all the assignors
> >>>>>> configured in the consumer support cooperative reassignment, then
> >>>>>> either 1) a cooperative protocol will be selected and cooperative
> >>>>>> revocation can be safely used, or 2) if the rest of the group does
> >>>>>> not support it, then the consumer will simply fail.
> >>>>>>
> >>>>>> Another point which you raised offline and I will repeat here is
> >>>>>> that this proposal's benefit is mostly limited to sticky assignment
> >>>>>> logic. Arguably the range assignor may have some incidental
> >>>>>> stickiness, particularly if the group is rebalancing for a newly
> >>>>>> created or deleted topic. For other cases, the proposal is mostly
> >>>>>> additional overhead since it takes an additional rebalance and many
> >>>>>> of the partitions will move. Perhaps it doesn't make as much sense
> >>>>>> to use the cooperative protocol for strategies like range and
> >>>>>> round-robin. That kind of argues in favor of pushing some of the
> >>>>>> control into the assignor itself. Maybe we would not bother creating
> >>>>>> CooperativeRange as I suggested above, but it would make sense to
> >>>>>> create a cooperative version of the sticky assignment strategy. I
> >>>>>> thought we might have to create a new sticky assignor anyway because
> >>>>>> I can't see how we would get compatible behavior mixing with the old
> >>>>>> version anyway.
> >>>>>>
> >>>>>> Thanks,
> >>>>>> Jason
> >>>>>>
> >>>>>>
> >>>>>> On Thu, Apr 11, 2019 at 5:53 PM Guozhang Wang <wangg...@gmail.com>
> >>>>> wrote:
> >>>>>>
> >>>>>>> Hello Matthias:
> >>>>>>>
> >>>>>>> Thanks for your review.
> >>>>>>>
> >>>>>>> The background section uses the Streams assignor as well as the
> >>>>>>> consumer's own sticky assignor as examples illustrating the
> >>>>>>> situation, but this KIP is for the consumer coordinator itself, and
> >>>>>>> the rest of the paragraph does not talk about Streams any more. If
> >>>>>>> you feel it's a bit distracting I can remove those examples.
> >>>>>>>
> >>>>>>> 10). While working on the PR I realized that the revoked partitions
> >>>>>>> on assignment are not needed (this is being discussed on the PR
> >>>>>>> itself:
> >>>>>>> https://github.com/apache/kafka/pull/6528#issuecomment-480009890).
> >>>>>>>
> >>>>>>> 20). 1.a. Good question, I've updated the wiki to let the consumer
> >>>>>>> clean up its assignment and re-join, rather than letting the
> >>>>>>> assignor make any proactive changes. The idea is to keep the logic
> >>>>>>> simpler and avoid any "split brain" stuff.
> >>>>>>>
> >>>>>>> 20). 2.b. No, we do not need that, since the owned partitions will
> >>>>>>> already be part of the Subscription passed in to assign().
> >>>>>>>
> >>>>>>> 30). As Boyang mentioned, there are some drawbacks that still cannot
> >>>>>>> be addressed by a rebalance delay, hence we still voted for KIP-345
> >>>>>>> (more details can be found on the discussion thread of KIP-345
> >>>>>>> itself). One example is that as the instance resumes, its member id
> >>>>>>> will be empty, so we are still relying on the assignor to give it
> >>>>>>> the assignment of the old member id while keeping all other
> >>>>>>> members' assignments unchanged.
> >>>>>>>
> >>>>>>> 40). Incomplete sentence, I've updated it.
> >>>>>>>
> >>>>>>> 50). Here's my idea: suppose we augment the join group schema with
> >>>>>>> `protocol version` in 2.3, and then with both brokers and clients
> >>>>>>> being on version 2.3+, on the first rolling bounce where the
> >>>>>>> subscription and assignment schema and / or user metadata has
> >>>>>>> changed, this protocol version will be bumped. On the broker side,
> >>>>>>> when receiving all members' join-group requests, it will choose the
> >>>>>>> one that has the highest protocol version (it also assumes a higher
> >>>>>>> versioned protocol is always backward compatible, i.e. the
> >>>>>>> coordinator can recognize lower versioned protocols as well) and
> >>>>>>> select it as the leader. Then the leader can decide, based on its
> >>>>>>> received and deserialized subscription information, how to assign
> >>>>>>> partitions and how to encode the assignment accordingly so that
> >>>>>>> everyone can understand it. With this, in Streams for example, no
> >>>>>>> version probing would be needed since we are guaranteed the leader
> >>>>>>> knows everyone's version -- again, assuming that a higher versioned
> >>>>>>> protocol is always backward compatible -- and hence can
> >>>>>>> successfully do the assignment in that round.
> >>>>>>>
> >>>>>>> 60). My bad, this section was not updated as the design evolved;
> >>>>>>> I've updated it.
> >>>>>>>
> >>>>>>>
> >>>>>>> On Tue, Apr 9, 2019 at 7:22 PM Boyang Chen <bche...@outlook.com>
> >>>>> wrote:
> >>>>>>>
> >>>>>>>>
> >>>>>>>> Thanks for the review Matthias! My 2 cents on the rebalance delay is
> >>>>>>>> that it is a rather fixed trade-off between task availability and
> >>>>>>>> resource shuffling. If we eventually trigger a rebalance after a
> >>>>>>>> rolling bounce, certain consumer setups are still faced with global
> >>>>>>>> shuffles, for example a member.id ranking based round-robin
> >>>>>>>> strategy, as rejoining dynamic members will be assigned a new
> >>>>>>>> member.id, which reorders the assignment. So I think the primary
> >>>>>>>> goal of incremental rebalancing is still improving the cluster
> >>>>>>>> availability during rebalance, because it doesn't revoke any
> >>>>>>>> partition during this process. Also, the perk is the minimal
> >>>>>>>> configuration requirement :)
> >>>>>>>>
> >>>>>>>>
> >>>>>>>> Best,
> >>>>>>>>
> >>>>>>>> Boyang
> >>>>>>>>
> >>>>>>>> ________________________________
> >>>>>>>> From: Matthias J. Sax <matth...@confluent.io>
> >>>>>>>> Sent: Tuesday, April 9, 2019 7:47 AM
> >>>>>>>> To: dev
> >>>>>>>> Subject: Re: [DISCUSS] KIP-429 : Smooth Auto-Scaling for Kafka
> >>>>> Streams
> >>>>>>>>
> >>>>>>>> Thanks for the KIP, Boyang and Guozhang!
> >>>>>>>>
> >>>>>>>>
> >>>>>>>> I made an initial pass and have some questions/comments. One high
> >>>>> level
> >>>>>>>> comment: it seems that the KIP "mixes" plain consumer and Kafka
> >>>>> Streams
> >>>>>>>> use case a little bit (at least in the presentation). It might be
> >>>>>>>> helpful to separate both cases clearly, or maybe limit the scope
> to
> >>>>>>>> plain consumer only.
> >>>>>>>>
> >>>>>>>>
> >>>>>>>>
> >>>>>>>> 10) For `PartitionAssignor.Assignment`: It seems we need a new
> >>>>> method
> >>>>>>>> `List<TopicPartition> revokedPartitions()` ?
> >>>>>>>>
> >>>>>>>>
> >>>>>>>>
> >>>>>>>> 20) In Section "Consumer Coordinator Algorithm"
> >>>>>>>>
> >>>>>>>>     Bullet point "1a)": If the subscription changes and a topic is
> >>>>>>>> removed from the subscription, why do we not revoke the
> partitions?
> >>>>>>>>
> >>>>>>>>     Bullet point "1a)": What happens if a topic is deleted (or a
> >>>>>>>> partition is removed/deleted from a topic)? Should we call the new
> >>>>>>>> `onPartitionsEmigrated()` callback for this case?
> >>>>>>>>
> >>>>>>>>     Bullet point "2b)" Should we update the `PartitionAssignor`
> >>>>>>>> interface to pass in the "old assignment" as third parameter into
> >>>>>>>> `assign()`?
> >>>>>>>>
> >>>>>>>>
> >>>>>>>>
> >>>>>>>> 30) Rebalance delay (as used in KIP-415): Could a rebalance delay
> >>>>>>>> subsume KIP-345? Configuring static members is rather complicated,
> >>>>> and
> >>>>>> I
> >>>>>>>> am wondering if a rebalance delay would be sufficient?
> >>>>>>>>
> >>>>>>>>
> >>>>>>>>
> >>>>>>>> 40) Quote: "otherwise the we would fall into the case 3.b)
> forever."
> >>>>>>>>
> >>>>>>>> What is "case 3.b" ?
> >>>>>>>>
> >>>>>>>>
> >>>>>>>>
> >>>>>>>> 50) Section "Looking into the Future"
> >>>>>>>>
> >>>>>>>> Nit: the new "ProtocolVersion" field is missing in the first line
> >>>>>>>> describing "JoinGroupRequest"
> >>>>>>>>
> >>>>>>>>> This can also help saving "version probing" cost on Streams as
> >>>>> well.
> >>>>>>>>
> >>>>>>>> How does this relate to Kafka Streams "version probing"
> >>>>> implementation?
> >>>>>>>> How can we exploit the new `ProtocolVersion` in Streams to improve
> >>>>>>>> "version probing" ? I have a rough idea, but would like to hear
> more
> >>>>>>>> details.
> >>>>>>>>
> >>>>>>>>
> >>>>>>>>
> >>>>>>>> 60) Section "Recommended Upgrade Procedure"
> >>>>>>>>
> >>>>>>>>> Set the `stream.rebalancing.mode` to `upgrading`, which will
> force
> >>>>>> the
> >>>>>>>> stream application to stay with protocol type "consumer".
> >>>>>>>>
> >>>>>>>> This config is not discussed in the KIP and appears in this
> section
> >>>>>>>> without context. Can you elaborate about it?
> >>>>>>>>
> >>>>>>>>
> >>>>>>>>
> >>>>>>>> -Matthias
> >>>>>>>>
> >>>>>>>>
> >>>>>>>>
> >>>>>>>>
> >>>>>>>> On 3/29/19 6:20 PM, Guozhang Wang wrote:
> >>>>>>>>> Bump up on this discussion thread. I've added a few new drawings
> >>>>>>>>> for better illustration, and would really appreciate your feedback.
> >>>>>>>>>
> >>>>>>>>>
> >>>>>>>>> Guozhang
> >>>>>>>>>
> >>>>>>>>> On Wed, Mar 20, 2019 at 6:17 PM Guozhang Wang <wangg...@gmail.com>
> >>>>>>>>> wrote:
> >>>>>>>>>
> >>>>>>>>>> Hello Boyang,
> >>>>>>>>>>
> >>>>>>>>>> I've made another thorough pass over this KIP and I'd like to
> >>>>>>>>>> split it into two parts: the first part, covered in KIP-429,
> >>>>>>>>>> would touch on the Consumer Coordinator only, to have the
> >>>>>>>>>> incremental rebalance protocol in place. The second part (for
> >>>>>>>>>> now I've reserved KIP number 444 for it) would contain all the
> >>>>>>>>>> changes to StreamsPartitionAssignor to allow warming up new
> >>>>>>>>>> members.
> >>>>>>>>>>
> >>>>>>>>>> I think the first part, a.k.a. the current updated KIP-429, is
> >>>>>>>>>> ready for review and discussion again. Would love to hear
> >>>>>>>>>> people's feedback and ideas.
> >>>>>>>>>>
> >>>>>>>>>> Guozhang
> >>>>>>>>>>
> >>>>>>>>>>
> >>>>>>>>>>
> >>>>>>>>>> On Mon, Mar 4, 2019 at 10:09 AM Boyang Chen <bche...@outlook.com>
> >>>>>>>>>> wrote:
> >>>>>>>>>>
> >>>>>>>>>>> Thanks Guozhang for the great questions. Answers are inlined:
> >>>>>>>>>>>
> >>>>>>>>>>> 1. I'm still not sure if it's worthwhile to add a new type of
> >>>>>>> "learner
> >>>>>>>>>>> task" in addition to "standby task": if the only difference is
> >>>>> that
> >>>>>>> for
> >>>>>>>>>>> the
> >>>>>>>>>>> latter, we would consider workload balance while for the former
> >>>>> we
> >>>>>>>> would
> >>>>>>>>>>> not, I think we can just adjust the logic of StickyTaskAssignor
> >>>>> a
> >>>>>> bit
> >>>>>>>> to
> >>>>>>>>>>> break that difference. Adding a new type of task would be
> >>>>> adding a
> >>>>>>> lot
> >>>>>>>> of
> >>>>>>>>>>> code complexity, so if we can still piggy-back the logic on a
> >>>>>>>> standby-task
> >>>>>>>>>>> I would prefer to do so.
> >>>>>>>>>>> In the proposal we stated that we are not adding a new type of
> >>>>>>>>>>> task implementation. The learner task shall share the same
> >>>>>>>>>>> implementation as a normal standby task, only that we shall tag
> >>>>>>>>>>> the standby task as a learner and prioritize the learner tasks'
> >>>>>>>>>>> replay effort.
> >>>>>>>>>>> 2. One thing that's still not clear from the KIP wiki itself is
> >>>>>> which
> >>>>>>>>>>> layer
> >>>>>>>>>>> would the logic be implemented at. Although for most KIPs we
> >>>>> would
> >>>>>>> not
> >>>>>>>>>>> require internal implementation details but only public facing
> >>>>> API
> >>>>>>>>>>> updates,
> >>>>>>>>>>> for a KIP like this I think it still requires to flesh out
> >>>>> details
> >>>>>> on
> >>>>>>>> the
> >>>>>>>>>>> implementation design. More specifically: today Streams embeds
> >>>>>>>>>>> a full-fledged Consumer client, which hard-codes a
> >>>>>>>>>>> ConsumerCoordinator inside; Streams then injects a
> >>>>>>>>>>> StreamsPartitionAssignor into its pluggable PartitionAssignor
> >>>>>>>>>>> interface, and inside the StreamsPartitionAssignor we also have
> >>>>>>>>>>> a TaskAssignor interface whose default implementation is
> >>>>>>>>>>> StickyTaskAssignor. Streams partition assignor logic today sits
> >>>>>>>>>>> in the latter two classes. Hence the hierarchy today is:
> >>>>>>>>>>>
> >>>>>>>>>>> KafkaConsumer -> ConsumerCoordinator ->
> >>>>> StreamsPartitionAssignor ->
> >>>>>>>>>>> StickyTaskAssignor.
> >>>>>>>>>>>
> >>>>>>>>>>> We need to think about where the proposed implementation would
> >>>>> take
> >>>>>>>> place
> >>>>>>>>>>> at, and personally I think it is not the best option to inject
> >>>>> all
> >>>>>> of
> >>>>>>>> them
> >>>>>>>>>>> into the StreamsPartitionAssignor / StickyTaskAssignor since
> the
> >>>>>>> logic
> >>>>>>>> of
> >>>>>>>>>>> "triggering another rebalance" etc would require some
> >>>>> coordinator
> >>>>>>> logic
> >>>>>>>>>>> which is hard to mimic at PartitionAssignor level. On the other
> >>>>>> hand,
> >>>>>>>>>>> since
> >>>>>>>>>>> we are embedding a KafkaConsumer client as a whole we cannot
> >>>>> just
> >>>>>>>> replace
> >>>>>>>>>>> ConsumerCoordinator with a specialized StreamsCoordinator like
> >>>>>>> Connect
> >>>>>>>>>>> does
> >>>>>>>>>>> in KIP-415. So I'd like to maybe split the current proposal in
> >>>>> both
> >>>>>>>>>>> consumer layer and streams-assignor layer like we did in
> >>>>>>>> KIP-98/KIP-129.
> >>>>>>>>>>> And then the key thing to consider is how to cut off the
> >>>>> boundary
> >>>>>> so
> >>>>>>>> that
> >>>>>>>>>>> the modifications we push to ConsumerCoordinator would be
> >>>>>> beneficial
> >>>>>>>>>>> universally for any consumers, while keep the Streams-specific
> >>>>>> logic
> >>>>>>> at
> >>>>>>>>>>> the
> >>>>>>>>>>> assignor level.
> >>>>>>>>>>> Yes, that's also my ideal plan. The details of the
> >>>>>>>>>>> implementation are depicted in this doc ("KStream Smooth
> >>>>>>>>>>> Auto-scaling Implementation Plan"):
> >>>>>>>>>>> https://docs.google.com/document/d/1me2a5wvxAZT1QE6HkwyDl7C2TiBQlKN3Dpw_I1ro91U/edit#heading=h.qix74qdmekae
> >>>>>>>>>>> and I have explained the reasoning on why we want to push a
> >>>>>>>>>>> global change of replacing ConsumerCoordinator with
> >>>>>>>>>>> StreamCoordinator. The motivation is that KIP space is usually
> >>>>>>>>>>> used for public & algorithm level changes, not for internal
> >>>>>>>>>>> implementation details.
> >>>>>>>>>>>
> >>>>>>>>>>> 3. Depending on which design direction we choose, our migration
> >>>>>> plan
> >>>>>>>> would
> >>>>>>>>>>> also be quite different. For example, if we stay with
> >>>>>>>> ConsumerCoordinator
> >>>>>>>>>>> whose protocol type is "consumer" still, and we can manage to
> >>>>> make
> >>>>>>> all
> >>>>>>>>>>> changes agnostic to brokers as well as to old versioned
> >>>>> consumers,
> >>>>>>> then
> >>>>>>>>>>> our
> >>>>>>>>>>> migration plan could be much easier.
> >>>>>>>>>>> Yes, the upgrade plan was designed around the new
> >>>>>>>>>>> StreamCoordinator approach, which means we shall define a new
> >>>>>>>>>>> protocol type. The reason existing applications could only
> >>>>>>>>>>> maintain the same `consumer` protocol type is that the current
> >>>>>>>>>>> broker only allows a change of protocol type when the consumer
> >>>>>>>>>>> group is empty. It is of course user-unfriendly to force a
> >>>>>>>>>>> wipe-out of the entire application, and I don't think
> >>>>>>>>>>> maintaining the old protocol type would greatly impact ongoing
> >>>>>>>>>>> services using the new stream coordinator. WDYT?
> >>>>>>>>>>>
> >>>>>>>>>>> 4. I think one major issue related to this KIP is that today,
> in
> >>>>>> the
> >>>>>>>>>>> StickyPartitionAssignor, we always try to honor stickiness over
> >>>>>>>> workload
> >>>>>>>>>>> balance, and hence "learner task" is needed to break this
> >>>>> priority,
> >>>>>>> but
> >>>>>>>>>>> I'm
> >>>>>>>>>>> wondering if we can have a better solution within sticky task
> >>>>>>> assignor
> >>>>>>>>>>> that
> >>>>>>>>>>> accommodate this?
> >>>>>>>>>>> Great question! That's what I explained in the proposal: we
> >>>>>>>>>>> should break down our delivery into different stages. At the
> >>>>>>>>>>> very beginning, our goal is to trigger learner task assignment
> >>>>>>>>>>> only on `new` hosts, which we shall figure out by leveraging
> >>>>>>>>>>> the leader's knowledge of the previous round of rebalance.
> >>>>>>>>>>> After stage one, our goal is to have a smooth scaling-up
> >>>>>>>>>>> experience, but the task balance problem is kind of orthogonal.
> >>>>>>>>>>> The load balance problem is a much broader topic than auto
> >>>>>>>>>>> scaling, which I figure is worth discussing within this KIP's
> >>>>>>>>>>> context since it's a natural next step, but it wouldn't be the
> >>>>>>>>>>> main topic. Learner tasks or auto scaling support should be
> >>>>>>>>>>> treated as `a helpful mechanism to reach load balance`, but not
> >>>>>>>>>>> `an algorithm defining load balance`. It would be great if you
> >>>>>>>>>>> could share some insights on stream task balance, which would
> >>>>>>>>>>> eventually help us to break out of KIP-429's scope and even
> >>>>>>>>>>> define a separate KIP to focus on task weight & assignment
> >>>>>>>>>>> logic improvement.
> >>>>>>>>>>>
> >>>>>>>>>>> Also, thank you for improving the KIP's context and
> >>>>>>>>>>> organization!
> >>>>>>>>>>>
> >>>>>>>>>>> Best,
> >>>>>>>>>>> Boyang
> >>>>>>>>>>> ________________________________
> >>>>>>>>>>> From: Guozhang Wang <wangg...@gmail.com>
> >>>>>>>>>>> Sent: Saturday, March 2, 2019 6:00 AM
> >>>>>>>>>>> To: dev
> >>>>>>>>>>> Subject: Re: [DISCUSS] KIP-429 : Smooth Auto-Scaling for Kafka
> >>>>>>> Streams
> >>>>>>>>>>>
> >>>>>>>>>>> Hello Boyang,
> >>>>>>>>>>>
> >>>>>>>>>>> I've just made a quick pass on the KIP and here are some
> >>>>> thoughts.
> >>>>>>>>>>>
> >>>>>>>>>>> Meta:
> >>>>>>>>>>>
> >>>>>>>>>>> 1. I'm still not sure if it's worthwhile to add a new type of
> >>>>>>> "learner
> >>>>>>>>>>> task" in addition to "standby task": if the only difference is
> >>>>> that
> >>>>>>> for
> >>>>>>>>>>> the
> >>>>>>>>>>> latter, we would consider workload balance while for the former
> >>>>> we
> >>>>>>>> would
> >>>>>>>>>>> not, I think we can just adjust the logic of StickyTaskAssignor
> >>>>> a
> >>>>>> bit
> >>>>>>>> to
> >>>>>>>>>>> break that difference. Adding a new type of task would be
> >>>>> adding a
> >>>>>>> lot
> >>>>>>>> of
> >>>>>>>>>>> code complexity, so if we can still piggy-back the logic on a
> >>>>>>>> standby-task
> >>>>>>>>>>> I would prefer to do so.
> >>>>>>>>>>>
> >>>>>>>>>>> 2. One thing that's still not clear from the KIP wiki itself is
> >>>>>> which
> >>>>>>>>>>> layer
> >>>>>>>>>>> would the logic be implemented at. Although for most KIPs we
> >>>>> would
> >>>>>>> not
> >>>>>>>>>>> require internal implementation details but only public facing
> >>>>> API
> >>>>>>>>>>> updates,
> >>>>>>>>>>> for a KIP like this I think it still requires to flesh out
> >>>>> details
> >>>>>> on
> >>>>>>>> the
> >>>>>>>>>>> implementation design. More specifically: today Streams embeds
> >>>>>>>>>>> a full-fledged Consumer client, which hard-codes a
> >>>>>>>>>>> ConsumerCoordinator inside; Streams then injects a
> >>>>>>>>>>> StreamsPartitionAssignor into its pluggable PartitionAssignor
> >>>>>>>>>>> interface, and inside the StreamsPartitionAssignor we also have
> >>>>>>>>>>> a TaskAssignor interface whose default implementation is
> >>>>>>>>>>> StickyTaskAssignor. Streams partition assignor logic today sits
> >>>>>>>>>>> in the latter two classes. Hence the hierarchy today is:
> >>>>>>>>>>>
> >>>>>>>>>>> KafkaConsumer -> ConsumerCoordinator ->
> >>>>> StreamsPartitionAssignor ->
> >>>>>>>>>>> StickyTaskAssignor.
> >>>>>>>>>>>
> >>>>>>>>>>> We need to think about where the proposed implementation would
> >>>>> take
> >>>>>>>> place
> >>>>>>>>>>> at, and personally I think it is not the best option to inject
> >>>>> all
> >>>>>> of
> >>>>>>>> them
> >>>>>>>>>>> into the StreamsPartitionAssignor / StickyTaskAssignor since
> the
> >>>>>>> logic
> >>>>>>>> of
> >>>>>>>>>>> "triggering another rebalance" etc would require some
> >>>>> coordinator
> >>>>>>> logic
> >>>>>>>>>>> which is hard to mimic at PartitionAssignor level. On the other
> >>>>>> hand,
> >>>>>>>>>>> since
> >>>>>>>>>>> we are embedding a KafkaConsumer client as a whole we cannot
> >>>>> just
> >>>>>>>> replace
> >>>>>>>>>>> ConsumerCoordinator with a specialized StreamsCoordinator like
> >>>>>>> Connect
> >>>>>>>>>>> does
> >>>>>>>>>>> in KIP-415. So I'd like to maybe split the current proposal in
> >>>>> both
> >>>>>>>>>>> consumer layer and streams-assignor layer like we did in
> >>>>>>>> KIP-98/KIP-129.
> >>>>>>>>>>> And then the key thing to consider is how to cut off the
> >>>>> boundary
> >>>>>> so
> >>>>>>>> that
> >>>>>>>>>>> the modifications we push to ConsumerCoordinator would be
> >>>>>> beneficial
> >>>>>>>>>>> universally for any consumers, while keep the Streams-specific
> >>>>>> logic
> >>>>>>> at
> >>>>>>>>>>> the
> >>>>>>>>>>> assignor level.
> >>>>>>>>>>>
> >>>>>>>>>>> 3. Depending on which design direction we choose, our migration
> >>>>>> plan
> >>>>>>>> would
> >>>>>>>>>>> also be quite different. For example, if we stay with
> >>>>>>>> ConsumerCoordinator
> >>>>>>>>>>> whose protocol type is "consumer" still, and we can manage to
> >>>>> make
> >>>>>>> all
> >>>>>>>>>>> changes agnostic to brokers as well as to old versioned
> >>>>> consumers,
> >>>>>>> then
> >>>>>>>>>>> our
> >>>>>>>>>>> migration plan could be much easier.
> >>>>>>>>>>>
> >>>>>>>>>>> 4. I think one major issue related to this KIP is that today,
> in
> >>>>>> the
> >>>>>>>>>>> StickyPartitionAssignor, we always try to honor stickiness over
> >>>>>>>> workload
> >>>>>>>>>>> balance, and hence "learner task" is needed to break this
> >>>>> priority,
> >>>>>>> but
> >>>>>>>>>>> I'm
> >>>>>>>>>>> wondering if we can have a better solution within sticky task
> >>>>>>> assignor
> >>>>>>>>>>> that
> >>>>>>>>>>> accommodate this?
> >>>>>>>>>>>
> >>>>>>>>>>> Minor:
> >>>>>>>>>>>
> >>>>>>>>>>> 1. The idea of two rebalances have also been discussed in
> >>>>>>>>>>> https://issues.apache.org/jira/browse/KAFKA-6145. So we should
> >>>>> add
> >>>>>>> the
> >>>>>>>>>>> reference on the wiki page as well.
> >>>>>>>>>>> 2. Could you also add a section describing how the subscription
> >>>>> /
> >>>>>>>>>>> assignment metadata will be re-formatted? Without this
> >>>>> information
> >>>>>> it
> >>>>>>>> is
> >>>>>>>>>>> hard to get to the bottom of your idea. For example in the
> >>>>> "Leader
> >>>>>>>>>>> Transfer
> >>>>>>>>>>> Before Scaling" section, I'm not sure why "S2 doesn't know S4
> is
> >>>>>> new
> >>>>>>>>>>> member"
> >>>>>>>>>>> and hence would blindly obey stickiness over workload balance
> >>>>>>>> requirement.
> >>>>>>>>>>>
> >>>>>>>>>>> Guozhang
> >>>>>>>>>>>
> >>>>>>>>>>>
> >>>>>>>>>>> On Thu, Feb 28, 2019 at 11:05 AM Boyang Chen <
> >>>>> bche...@outlook.com>
> >>>>>>>> wrote:
> >>>>>>>>>>>
> >>>>>>>>>>>> Hey community friends,
> >>>>>>>>>>>>
> >>>>>>>>>>>> I'm gladly inviting you to have a look at the proposal to add
> >>>>>>>>>>>> incremental rebalancing to Kafka Streams, a.k.a. auto-scaling
> >>>>>>>>>>>> support.
> >>>>>>>>>>>>
> >>>>>>>>>>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-429%3A+Smooth+Auto-Scaling+for+Kafka+Streams
> >>>>>>>>>>>>
> >>>>>>>>>>>> Special thanks to Guozhang for giving great guidance and
> >>>>>>>>>>>> important feedback while making this KIP!
> >>>>>>>>>>>>
> >>>>>>>>>>>> Best,
> >>>>>>>>>>>> Boyang
> >>>>>>>>>>>>
> >>>>>>>>>>>
> >>>>>>>>>>>
> >>>>>>>>>>> --
> >>>>>>>>>>> -- Guozhang
> >>>>>>>>>>>
> >>>>>>>>>>
> >>>>>>>>>>
> >>>>>>>>>> --
> >>>>>>>>>> -- Guozhang
> >>>>>>>>>>
> >>>>>>>>>
> >>>>>>>>>
> >>>>>>>>
> >>>>>>>>
> >>>>>>>
> >>>>>>> --
> >>>>>>> -- Guozhang
> >>>>>>>
> >>>>>>
> >>>>>
> >>>>>
> >>>>> --
> >>>>> -- Guozhang
> >>>>>
> >>>>
> >>>>
> >>>> --
> >>>> -- Guozhang
> >>>>
> >>>
> >>>
> >>
> >>
> >
>
>

-- 
-- Guozhang
