Hello John,

That sounds reasonable. Just double checked the code that with logging
disabled the corresponding checkpoint file would not contain any values,
just like a stateless task. So I think treating them logically the same is
fine.

Guozhang


On Wed, Aug 21, 2019 at 11:41 AM John Roesler <j...@confluent.io> wrote:

> Hi again, Guozhang,
>
> While writing up the section on stateless tasks (
>
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-441%3A+Smooth+Scaling+Out+for+Kafka+Streams#KIP-441:SmoothScalingOutforKafkaStreams-Statelesstasks
> ),
> I reconsidered whether stateful, but non-logged, tasks should actually
> report a lag of zero, versus not reporting any lag. By the definition of
> the "StatefulTasksToRankedCandidates" function, the leader would compute a
> lag of zero for these tasks anyway.
>
> Therefore, I think the same reasoning that I supplied you for stateless
> tasks applies, since the member and leader will agree on a lag of zero
> anyway, we can avoid adding them to the "Task Lags" map, and save some
> bytes in the JoinGroup request. This would be especially beneficial in an
> application that uses remote stores for _all_ its state stores, it would
> have an extremely lightweight JoinGroup request, with no task lags at all.
>
> WDYT?
> -John
>
> On Wed, Aug 21, 2019 at 1:17 PM John Roesler <j...@confluent.io> wrote:
>
> > Thanks, Guozhang.
> >
> > (Side note: I noticed on another pass over the discussion that I'd missed
> > addressing your comment about the potential race condition between state
> > cleanup and lag-based assignment. I've added a solution to the proposal:
> >
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-441%3A+Smooth+Scaling+Out+for+Kafka+Streams#KIP-441:SmoothScalingOutforKafkaStreams-Racebetweenassignmentandstatecleanup
> > )
> >
> > In the JoinGroup (SubscriptionInfo) metadata, stateless tasks are not
> > represented at all. This should save us some bytes in the request
> metadata.
> > If we treated them like non-logged stateful tasks and reported a lag of
> 0,
> > the only difference is that the assignor would be able to tell which
> > members previously hosted that stateless task.
> >
> > I'd like to make a simplifying assumption that stateless tasks can just
> be
> > freely reassigned with no regard to stickiness at all, without impacting
> > performance. This is almost true. In fact, while assigned a stateless
> task,
> > a member fetches batches of records from the broker, so if we move the
> > stateless task assignment, this buffered input is wasted and just gets
> > dropped.
> >
> > However, we won't be moving the stateless tasks around all the time (just
> > during rebalances), and we have the requirement that the assigment
> > algorithm must stabilize to guard against perpetually shuffling a
> stateless
> > task from one node to another. So, my hope is that this small amount of
> > inefficiency would not be a performance-dominating factor. In exchange,
> we
> > gain the opportunity for the assignment algorithm to use the stateless
> > tasks as "filler" during unbalanced assignments. For example, if there
> is a
> > node that is just warming up with several standby tasks, maybe the
> > assignment can give more stateless tasks to that node to balance the
> > computational load across the cluster.
> >
> > It's worth noting that such an assignment would still not be considered
> > "balanced", so the ultimately balanced final state of the assignment
> (after
> > task movements) would still have the desired property that each stateful
> > and stateless task is evenly spread across the cluster.
> >
> > Does that seem reasonable?
> >
> > Thanks,
> > -John
> >
> > On Wed, Aug 21, 2019 at 11:22 AM Guozhang Wang <wangg...@gmail.com>
> wrote:
> >
> >> Hello John,
> >>
> >> I've made another pass on the wiki page again, overall LGTM. One meta
> >> comment about the "stateless" tasks: how do we represent them in the
> >> metadata? Are they just treated as stateful tasks with logging disabled,
> >> or
> >> are specially handled? It is not very clear in the description.
> >>
> >>
> >> Guozhang
> >>
> >> On Wed, Aug 21, 2019 at 8:43 AM John Roesler <j...@confluent.io> wrote:
> >>
> >> > I have also specifically called out that the assignment must achieve
> >> both
> >> > "instance" and "task" balance:
> >> >
> >> >
> >>
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-441%3A+Smooth+Scaling+Out+for+Kafka+Streams#KIP-441:SmoothScalingOutforKafkaStreams-Defining%22balance%22
> >> >
> >> > I've also addressed the problem of state stores with logging disabled:
> >> >
> >> >
> >>
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-441%3A+Smooth+Scaling+Out+for+Kafka+Streams#KIP-441:SmoothScalingOutforKafkaStreams-Statewithloggingdisabled
> >> >
> >> > I believe this addresses all the concerns that have been raised to
> date.
> >> > Apologies if I've overlooked one of your concerns.
> >> >
> >> > Please give the KIP another read and let me know of any further
> >> thoughts!
> >> > Hopefully, we can start the voting on this KIP by the end of the week.
> >> >
> >> > Thanks,
> >> > -John
> >> >
> >> > On Tue, Aug 20, 2019 at 5:16 PM John Roesler <j...@confluent.io>
> wrote:
> >> >
> >> > > In response to Bruno's concern #2, I've also added that section to
> the
> >> > > "Rejected Alternatives" section.
> >> > >
> >> > > Additionally, after reviewing some other assignment papers, I've
> >> > developed
> >> > > the concern that specifying which "phases" the assignment algorithm
> >> > should
> >> > > have, or indeed the logic of it at all, might be a mistake that
> >> > > over-constrains our ability to write an optimal algorithm.
> Therefore,
> >> > I've
> >> > > also refactored the KIP to just describe the protocol, and specify
> the
> >> > > requirements for the assignment algorithm, but not its exact
> behavior
> >> at
> >> > > all.
> >> > >
> >> > > Thanks,
> >> > > -John
> >> > >
> >> > > On Tue, Aug 20, 2019 at 5:13 PM John Roesler <j...@confluent.io>
> >> wrote:
> >> > >
> >> > >> Hi All,
> >> > >>
> >> > >> Thanks for the discussion. I've been considering the idea of giving
> >> the
> >> > >> "catching up" tasks a different name/role. I was in favor
> initially,
> >> but
> >> > >> after working though some details, I think it causes some problems,
> >> > which
> >> > >> I've written up in the "rejected alternatives" part of the KIP:
> >> > >>
> >> >
> >>
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-441%3A+Smooth+Scaling+Out+for+Kafka+Streams#KIP-441:SmoothScalingOutforKafkaStreams-Addinganewkindoftask(%22moving%22,%22recovering%22,%22learner%22)fortaskmovements
> >> > >>
> >> > >> Please give it a read and let me know what you think.
> >> > >>
> >> > >> Thanks,
> >> > >> -John
> >> > >>
> >> > >> On Thu, Aug 8, 2019 at 7:57 PM Guozhang Wang <wangg...@gmail.com>
> >> > wrote:
> >> > >>
> >> > >>> I think I agree with you Sophie. My gut feeling is that 1) it
> should
> >> > not
> >> > >>> be
> >> > >>> the major concern in assignor's algorithm for standby tasks not
> >> > catching
> >> > >>> up, but rather be tackled in different modules, and 2) a lot of
> >> > >>> optimization can be down at the stream thread itself, like
> dedicated
> >> > >>> threading and larger batching, or even complicated scheduling
> >> > mechanisms
> >> > >>> between running, restoring and standby tasks. In anyways, I think
> we
> >> > can
> >> > >>> take this out of the scope of KIP-441 for now.
> >> > >>>
> >> > >>>
> >> > >>> Guozhang
> >> > >>>
> >> > >>>
> >> > >>> On Thu, Aug 8, 2019 at 5:48 PM Sophie Blee-Goldman <
> >> > sop...@confluent.io>
> >> > >>> wrote:
> >> > >>>
> >> > >>> > > we may have other ways to not starving the standby tasks, for
> >> > >>> example, by
> >> > >>> > > using dedicate threads for standby tasks or even consider
> having
> >> > >>> > *higher> priority for standby than active* so that we always try
> >> to
> >> > >>> caught
> >> > >>> > up standby
> >> > >>> > > first, then process active
> >> > >>> >
> >> > >>> > This is an interesting idea, but seems likely to get in the way
> of
> >> > the
> >> > >>> > original idea of this KIP
> >> > >>> > -- if we always process standby tasks first, then if we are
> >> assigned
> >> > a
> >> > >>> new
> >> > >>> > standby task we
> >> > >>> > will have to wait for it to catch up completely before
> processing
> >> any
> >> > >>> > active tasks! That's
> >> > >>> > even worse than the situation this KIP is trying to help with,
> >> since
> >> > a
> >> > >>> new
> >> > >>> > standby task has to
> >> > >>> > restore from 0 (whereas an active task at least can take over
> from
> >> > >>> wherever
> >> > >>> > the standby was).
> >> > >>> >
> >> > >>> > During restoration -- while there exist any restoring tasks -- I
> >> > think
> >> > >>> it's
> >> > >>> > reasonable to de-prioritize the
> >> > >>> > standby tasks and just process restoring and active tasks so
> both
> >> can
> >> > >>> make
> >> > >>> > progress. But we should
> >> > >>> > let them catch up afterwards somehow -- maybe we can apply some
> >> kind
> >> > of
> >> > >>> > heuristic, like "if we haven't
> >> > >>> > processed standbys for X iterations, or Y milliseconds, do so
> >> now."
> >> > >>> >
> >> > >>> > Actually, it might even be beneficial to avoid processing
> >> standbys a
> >> > >>> record
> >> > >>> > or two at a time and instead
> >> > >>> > wait for a large enough batch to build up for the RocksDB
> >> > bulk-loading
> >> > >>> > benefits.
> >> > >>> >
> >> > >>> > I think the "use dedicated threads for standby" is the more
> >> promising
> >> > >>> end
> >> > >>> > goal, especially since
> >> > >>> > if we split restoration into "restoring tasks" then active and
> >> > standbys
> >> > >>> > share almost nothing. But
> >> > >>> > that seems like follow-up work to the current KIP :)
> >> > >>> >
> >> > >>> > On Thu, Aug 8, 2019 at 5:31 PM Sophie Blee-Goldman <
> >> > >>> sop...@confluent.io>
> >> > >>> > wrote:
> >> > >>> >
> >> > >>> > > Stateful tasks with logging disabled seem to be an interesting
> >> edge
> >> > >>> case.
> >> > >>> > > On the one hand,
> >> > >>> > > for balancing purposes they should be considered stateful
> since
> >> as
> >> > >>> > > Guozhang pointed out
> >> > >>> > > they are still "heavy" in IO costs. But for "catching up"
> >> purposes,
> >> > >>> ie
> >> > >>> > > when allocating standby
> >> > >>> > > tasks that will become active tasks, they should be considered
> >> > >>> stateless
> >> > >>> > > as there is so
> >> > >>> > > meaningful sense of their lag. We should never allocate
> standby
> >> > >>> tasks for
> >> > >>> > > them during the
> >> > >>> > > first rebalance, but should ensure they are evenly distributed
> >> > across
> >> > >>> > > instances. Maybe we
> >> > >>> > > should split these into a third category -- after we assign
> all
> >> > >>> stateful
> >> > >>> > > tasks with logging, we
> >> > >>> > > then distribute the set of logging-disabled stateful tasks to
> >> > improve
> >> > >>> > > balance, before lastly
> >> > >>> > > distributing stateless tasks?
> >> > >>> > >
> >> > >>> > > This actually leads into what I was just thinking, which is
> >> that we
> >> > >>> > really
> >> > >>> > > should distinguish the
> >> > >>> > > "catch-up" standbys from normal standbys as well as
> >> distinguishing
> >> > >>> > > actively processing tasks
> >> > >>> > > from active tasks that are still in the restore phase. It's
> >> > somewhat
> >> > >>> > > awkward that today, some
> >> > >>> > > active tasks just start processing immediately while others
> >> behave
> >> > >>> more
> >> > >>> > > like standby than active
> >> > >>> > > tasks for some time, before switching to real active. They
> first
> >> > use
> >> > >>> the
> >> > >>> > > restoreConsumer, then
> >> > >>> > > later only the "normal" consumer.
> >> > >>> > >
> >> > >>> > > However, this restore period is still distinct from normal
> >> standbys
> >> > >>> in a
> >> > >>> > > lot of ways -- the code path
> >> > >>> > > for restoring is different than for updating standbys, for
> >> example
> >> > >>> in how
> >> > >>> > > long we block on #poll.
> >> > >>> > > So in addition to giving them their own name -- let's go with
> >> > >>> restoring
> >> > >>> > > task for now -- they really
> >> > >>> > > do seem to deserve being their own distinct task. We can
> >> optimize
> >> > >>> them
> >> > >>> > for
> >> > >>> > > efficient conversion
> >> > >>> > > to active tasks since we know that's what they will be.
> >> > >>> > >
> >> > >>> > > This resolves some of the awkwardness of dealing with the
> >> special
> >> > >>> case
> >> > >>> > > mentioned above: we
> >> > >>> > > find a balanced assignment of stateful and stateless tasks,
> and
> >> > >>> create
> >> > >>> > > restoring tasks as needed.
> >> > >>> > > If logging is disabled, no restoring task is created.
> >> > >>> > >
> >> > >>> > >
> >> > >>> > > On Thu, Aug 8, 2019 at 3:44 PM Guozhang Wang <
> >> wangg...@gmail.com>
> >> > >>> wrote:
> >> > >>> > >
> >> > >>> > >> Regarding 3) above: I think for active task they should still
> >> be
> >> > >>> > >> considered
> >> > >>> > >> stateful since the processor would still pay IO cost
> accessing
> >> the
> >> > >>> > store,
> >> > >>> > >> but they would not have standby tasks?
> >> > >>> > >>
> >> > >>> > >> On Thu, Aug 8, 2019 at 7:49 AM Bruno Cadonna <
> >> br...@confluent.io>
> >> > >>> > wrote:
> >> > >>> > >>
> >> > >>> > >> > Hi,
> >> > >>> > >> >
> >> > >>> > >> > Thank you for the KIP!
> >> > >>> > >> >
> >> > >>> > >> > Some questions/comments:
> >> > >>> > >> >
> >> > >>> > >> > 1. I am wondering if the "stand-by" tasks that catch up
> state
> >> > >>> before
> >> > >>> > >> > the active task is switched deserve its own name in this
> KIP
> >> and
> >> > >>> maybe
> >> > >>> > >> > in the code. We have already stated that they are not true
> >> > >>> stand-by
> >> > >>> > >> > tasks, they are not configured through
> >> `num.standby.replicas`,
> >> > and
> >> > >>> > >> > maybe they have also other properties that distinguish them
> >> from
> >> > >>> true
> >> > >>> > >> > stand-by tasks of which we are not aware yet. For example,
> >> they
> >> > >>> may be
> >> > >>> > >> > prioritized differently than other tasks. Furthermore, the
> >> name
> >> > >>> > >> > "stand-by" does not really fit with the planned
> >> functionality of
> >> > >>> those
> >> > >>> > >> > tasks. In the following, I will call them false stand-by
> >> tasks.
> >> > >>> > >> >
> >> > >>> > >> > 2. Did you consider to trigger the probing rebalances not
> at
> >> > >>> regular
> >> > >>> > >> > time intervals but when the false stand-by tasks reach an
> >> > >>> acceptable
> >> > >>> > >> > lag? If you did consider, could you add a paragraph why you
> >> > >>> rejected
> >> > >>> > >> > this idea to the "Rejected Alternatives" section.
> >> > >>> > >> >
> >> > >>> > >> > 3. Are tasks that solely contain stores with disabled
> logging
> >> > >>> > >> > classified as stateful or stateless in the algorithm? I
> would
> >> > >>> guess
> >> > >>> > >> > stateless, although if possible they should be assigned to
> >> the
> >> > >>> same
> >> > >>> > >> > instance they had run before the rebalance. As far as I can
> >> see
> >> > >>> this
> >> > >>> > >> > special case is not handled in the algorithm.
> >> > >>> > >> >
> >> > >>> > >> > Best,
> >> > >>> > >> > Bruno
> >> > >>> > >> >
> >> > >>> > >> >
> >> > >>> > >> >
> >> > >>> > >> > On Thu, Aug 8, 2019 at 8:24 AM Guozhang Wang <
> >> > wangg...@gmail.com>
> >> > >>> > >> wrote:
> >> > >>> > >> > >
> >> > >>> > >> > > 1. Sounds good, just wanted to clarify; and it may worth
> >> > >>> documenting
> >> > >>> > >> it
> >> > >>> > >> > so
> >> > >>> > >> > > that users would not be surprised when monitoring their
> >> > >>> footprint.
> >> > >>> > >> > >
> >> > >>> > >> > > 2. Hmm I see... I think the trade-off can be described as
> >> "how
> >> > >>> much
> >> > >>> > >> > > imbalance would bother you to be willing to pay another
> >> > >>> rebalance,
> >> > >>> > >> along
> >> > >>> > >> > > with potentially more restoration lag", and the current
> >> > >>> definition
> >> > >>> > of
> >> > >>> > >> > > rebalance_factor can be considered as a rough measurement
> >> of
> >> > >>> that
> >> > >>> > >> > > imbalance. Of course one can argue that a finer grained
> >> > >>> measurement
> >> > >>> > >> could
> >> > >>> > >> > > be "resource footprint" like CPU / storage of each
> instance
> >> > >>> like we
> >> > >>> > >> have
> >> > >>> > >> > in
> >> > >>> > >> > > Kafka broker auto balancing tools, but I'd prefer not
> doing
> >> > >>> that as
> >> > >>> > >> part
> >> > >>> > >> > of
> >> > >>> > >> > > the library but more as an operational tool in the
> future.
> >> On
> >> > >>> the
> >> > >>> > >> other
> >> > >>> > >> > > hand, I've seen stateful and stateless tasks having very
> >> > >>> different
> >> > >>> > >> load,
> >> > >>> > >> > > and sometimes the only bottleneck of a Streams app is
> just
> >> one
> >> > >>> > >> stateful
> >> > >>> > >> > > sub-topology and whoever gets tasks of that sub-topology
> >> > become
> >> > >>> > >> hotspot
> >> > >>> > >> > > (and that's why our algorithm tries to balance per
> >> > sub-topology
> >> > >>> as
> >> > >>> > >> well),
> >> > >>> > >> > > so maybe we can just consider stateful tasks when
> >> calculating
> >> > >>> this
> >> > >>> > >> factor
> >> > >>> > >> > > as a very brute force heuristic?
> >> > >>> > >> > >
> >> > >>> > >> > > 3.a. Thinking about this a bit more, maybe it's better
> not
> >> try
> >> > >>> to
> >> > >>> > >> tackle
> >> > >>> > >> > an
> >> > >>> > >> > > unseen enemy just yet, and observe if it really emerges
> >> later,
> >> > >>> and
> >> > >>> > by
> >> > >>> > >> > then
> >> > >>> > >> > > we may have other ways to not starving the standby tasks,
> >> for
> >> > >>> > >> example, by
> >> > >>> > >> > > using dedicate threads for standby tasks or even consider
> >> > having
> >> > >>> > >> higher
> >> > >>> > >> > > priority for standby than active so that we always try to
> >> > >>> caught up
> >> > >>> > >> > standby
> >> > >>> > >> > > first, then process active; and if active's lagging
> >> compared
> >> > to
> >> > >>> > >> > > log-end-offset is increasing then we should increase
> >> capacity,
> >> > >>> etc
> >> > >>> > >> etc.
> >> > >>> > >> > >
> >> > >>> > >> > > 4. Actually with KIP-429 this may not be the case: we may
> >> not
> >> > >>> call
> >> > >>> > >> > > onPartitionsRevoked prior to rebalance any more so would
> >> not
> >> > >>> transit
> >> > >>> > >> > state
> >> > >>> > >> > > to PARTITIONS_REVOKED, and hence not cause the state of
> the
> >> > >>> instance
> >> > >>> > >> to
> >> > >>> > >> > be
> >> > >>> > >> > > REBALANCING. In other words, even if a instance is
> >> undergoing
> >> > a
> >> > >>> > >> rebalance
> >> > >>> > >> > > it's state may still be RUNNING and it may still be
> >> processing
> >> > >>> > >> records at
> >> > >>> > >> > > the same time.
> >> > >>> > >> > >
> >> > >>> > >> > >
> >> > >>> > >> > > On Wed, Aug 7, 2019 at 12:14 PM John Roesler <
> >> > j...@confluent.io
> >> > >>> >
> >> > >>> > >> wrote:
> >> > >>> > >> > >
> >> > >>> > >> > > > Hey Guozhang,
> >> > >>> > >> > > >
> >> > >>> > >> > > > Thanks for the review!
> >> > >>> > >> > > >
> >> > >>> > >> > > > 1. Yes, even with `num.standby.replicas := 0`, we will
> >> still
> >> > >>> > >> > temporarily
> >> > >>> > >> > > > allocate standby tasks to accomplish a no-downtime task
> >> > >>> migration.
> >> > >>> > >> > > > Although, I'd argue that this doesn't really violate
> the
> >> > >>> config,
> >> > >>> > as
> >> > >>> > >> the
> >> > >>> > >> > > > task isn't a true hot standby. As soon as it catches
> up,
> >> > we'll
> >> > >>> > >> > rebalance
> >> > >>> > >> > > > again, that task will become active, and the original
> >> > instance
> >> > >>> > that
> >> > >>> > >> > hosted
> >> > >>> > >> > > > the active task will no longer have the task assigned
> at
> >> > all.
> >> > >>> Once
> >> > >>> > >> the
> >> > >>> > >> > > > stateDirCleaner kicks in, we'll free the disk space
> from
> >> it,
> >> > >>> and
> >> > >>> > >> > return to
> >> > >>> > >> > > > the steady-state of having just one copy of the task in
> >> the
> >> > >>> > cluster.
> >> > >>> > >> > > >
> >> > >>> > >> > > > We can of course do without this, but I feel the
> current
> >> > >>> proposal
> >> > >>> > is
> >> > >>> > >> > > > operationally preferable, since it doesn't make
> >> configuring
> >> > >>> > >> > hot-standbys a
> >> > >>> > >> > > > pre-requisite for fast rebalances.
> >> > >>> > >> > > >
> >> > >>> > >> > > > 2. Yes, I think your interpretation is what we
> intended.
> >> The
> >> > >>> > default
> >> > >>> > >> > > > balance_factor would be 1, as it is implicitly today.
> >> What
> >> > >>> this
> >> > >>> > >> does is
> >> > >>> > >> > > > allows operators to trade off less balanced assignments
> >> > >>> against
> >> > >>> > >> fewer
> >> > >>> > >> > > > rebalances. If you have lots of space capacity in your
> >> > >>> instances,
> >> > >>> > >> this
> >> > >>> > >> > may
> >> > >>> > >> > > > be a perfectly fine tradeoff, and you may prefer for
> >> Streams
> >> > >>> not
> >> > >>> > to
> >> > >>> > >> > bother
> >> > >>> > >> > > > streaming GBs of data from the broker in pursuit of
> >> perfect
> >> > >>> > balance.
> >> > >>> > >> > Not
> >> > >>> > >> > > > married to this configuration, though. It was inspired
> by
> >> > the
> >> > >>> > >> related
> >> > >>> > >> > work
> >> > >>> > >> > > > research we did.
> >> > >>> > >> > > >
> >> > >>> > >> > > > 3. I'll take a look
> >> > >>> > >> > > >
> >> > >>> > >> > > > 3a. I think this is a good idea. I'd classify it as a
> >> type
> >> > of
> >> > >>> grey
> >> > >>> > >> > failure
> >> > >>> > >> > > > detection. It may make more sense to tackle grey
> >> failures as
> >> > >>> part
> >> > >>> > of
> >> > >>> > >> > the
> >> > >>> > >> > > > heartbeat protocol (as I POCed here:
> >> > >>> > >> > > > https://github.com/apache/kafka/pull/7096/files).
> WDYT?
> >> > >>> > >> > > >
> >> > >>> > >> > > > 4. Good catch! I didn't think about that before.
> Looking
> >> at
> >> > it
> >> > >>> > now,
> >> > >>> > >> > though,
> >> > >>> > >> > > > I wonder if we're actually protected already. The
> >> > >>> stateDirCleaner
> >> > >>> > >> > thread
> >> > >>> > >> > > > only executes if the instance is in RUNNING state, and
> >> > KIP-441
> >> > >>> > >> > proposes to
> >> > >>> > >> > > > use "probing rebalances" to report task lag. Hence,
> >> during
> >> > the
> >> > >>> > >> window
> >> > >>> > >> > > > between when the instance reports a lag and the
> assignor
> >> > >>> makes a
> >> > >>> > >> > decision
> >> > >>> > >> > > > about it, the instance should remain in REBALANCING
> >> state,
> >> > >>> right?
> >> > >>> > If
> >> > >>> > >> > so,
> >> > >>> > >> > > > then this should prevent the race condition. If not,
> >> then we
> >> > >>> do
> >> > >>> > >> indeed
> >> > >>> > >> > need
> >> > >>> > >> > > > to do something about it.
> >> > >>> > >> > > >
> >> > >>> > >> > > > 5. Good idea. I think that today, you can only see the
> >> > >>> consumer
> >> > >>> > lag,
> >> > >>> > >> > which
> >> > >>> > >> > > > is a poor substitute. I'll add some metrics to the
> >> proposal.
> >> > >>> > >> > > >
> >> > >>> > >> > > > Thanks again for the comments!
> >> > >>> > >> > > > -John
> >> > >>> > >> > > >
> >> > >>> > >> > > > On Tue, Aug 6, 2019 at 4:27 PM Guozhang Wang <
> >> > >>> wangg...@gmail.com>
> >> > >>> > >> > wrote:
> >> > >>> > >> > > >
> >> > >>> > >> > > > > Hello Sophie,
> >> > >>> > >> > > > >
> >> > >>> > >> > > > > Thanks for the proposed KIP. I left some comments on
> >> the
> >> > >>> wiki
> >> > >>> > >> itself,
> >> > >>> > >> > > > and I
> >> > >>> > >> > > > > think I'm still not very clear on a couple or those:
> >> > >>> > >> > > > >
> >> > >>> > >> > > > > 1. With this proposal, does that mean with
> >> > >>> num.standby.replicas
> >> > >>> > ==
> >> > >>> > >> > 0, we
> >> > >>> > >> > > > > may sometimes still have some standby tasks which may
> >> > >>> violate
> >> > >>> > the
> >> > >>> > >> > config?
> >> > >>> > >> > > > >
> >> > >>> > >> > > > > 2. I think I understand the rationale to consider
> lags
> >> > that
> >> > >>> is
> >> > >>> > >> below
> >> > >>> > >> > the
> >> > >>> > >> > > > > specified threshold to be equal, rather than still
> >> > >>> considering
> >> > >>> > >> 5000
> >> > >>> > >> > is
> >> > >>> > >> > > > > better than 5001 -- we do not want to "over-optimize"
> >> and
> >> > >>> > >> potentially
> >> > >>> > >> > > > falls
> >> > >>> > >> > > > > into endless rebalances back and forth.
> >> > >>> > >> > > > >
> >> > >>> > >> > > > > But I'm not clear about the rationale of the second
> >> > >>> parameter of
> >> > >>> > >> > > > >
> >> > >>> constrainedBalancedAssignment(StatefulTasksToRankedCandidates,
> >> > >>> > >> > > > > balance_factor):
> >> > >>> > >> > > > >
> >> > >>> > >> > > > > Does that mean, e.g. with balance_factor of 3, we'd
> >> > >>> consider two
> >> > >>> > >> > > > > assignments one resulting balance_factor 0 and one
> >> > resulting
> >> > >>> > >> > > > balance_factor
> >> > >>> > >> > > > > 3 to be equally optimized assignment and therefore
> may
> >> > "stop
> >> > >>> > >> early"?
> >> > >>> > >> > This
> >> > >>> > >> > > > > was not very convincing to me :P
> >> > >>> > >> > > > >
> >> > >>> > >> > > > > 3. There are a couple of minor comments about the
> >> > algorithm
> >> > >>> > >> itself,
> >> > >>> > >> > left
> >> > >>> > >> > > > on
> >> > >>> > >> > > > > the wiki page since it needs to refer to the exact
> line
> >> > and
> >> > >>> > better
> >> > >>> > >> > > > > displayed there.
> >> > >>> > >> > > > >
> >> > >>> > >> > > > > 3.a Another wild thought about the threshold itself:
> >> today
> >> > >>> the
> >> > >>> > >> > assignment
> >> > >>> > >> > > > > itself is memoryless, so we would not know if the
> >> reported
> >> > >>> > >> `TaskLag`
> >> > >>> > >> > > > itself
> >> > >>> > >> > > > > is increasing or decreasing even if the current value
> >> is
> >> > >>> under
> >> > >>> > the
> >> > >>> > >> > > > > threshold. I wonder if it worthy to make it a bit
> more
> >> > >>> > >> complicated to
> >> > >>> > >> > > > track
> >> > >>> > >> > > > > task lag trend at the assignor? Practically it may
> not
> >> be
> >> > >>> very
> >> > >>> > >> > uncommon
> >> > >>> > >> > > > > that stand-by tasks are not keeping up due to the
> fact
> >> > that
> >> > >>> > other
> >> > >>> > >> > active
> >> > >>> > >> > > > > tasks hosted on the same thread is starving the
> standby
> >> > >>> tasks.
> >> > >>> > >> > > > >
> >> > >>> > >> > > > > 4. There's a potential race condition risk when
> >> reporting
> >> > >>> > >> `TaskLags`
> >> > >>> > >> > in
> >> > >>> > >> > > > the
> >> > >>> > >> > > > > subscription: right after reporting it to the leader,
> >> the
> >> > >>> > cleanup
> >> > >>> > >> > thread
> >> > >>> > >> > > > > kicks in and deletes the state directory. If the task
> >> was
> >> > >>> > assigned
> >> > >>> > >> > to the
> >> > >>> > >> > > > > host it would cause it to restore from beginning and
> >> > >>> effectively
> >> > >>> > >> > make the
> >> > >>> > >> > > > > seemingly optimized assignment very sub-optimal.
> >> > >>> > >> > > > >
> >> > >>> > >> > > > > To be on the safer side we should consider either
> prune
> >> > out
> >> > >>> > those
> >> > >>> > >> > tasks
> >> > >>> > >> > > > > that are "close to be cleaned up" in the
> subscription,
> >> or
> >> > we
> >> > >>> > >> should
> >> > >>> > >> > delay
> >> > >>> > >> > > > > the cleanup right after we've included them in the
> >> > >>> subscription
> >> > >>> > in
> >> > >>> > >> > case
> >> > >>> > >> > > > > they are been selected as assigned tasks by the
> >> assignor.
> >> > >>> > >> > > > >
> >> > >>> > >> > > > > 5. This is a meta comment: I think it would be
> helpful
> >> to
> >> > >>> add
> >> > >>> > some
> >> > >>> > >> > user
> >> > >>> > >> > > > > visibility on the standby tasks lagging as well, via
> >> > >>> metrics for
> >> > >>> > >> > example.
> >> > >>> > >> > > > > Today it is hard for us to observe how far are our
> >> current
> >> > >>> > standby
> >> > >>> > >> > tasks
> >> > >>> > >> > > > > compared to the active tasks and whether that lag is
> >> being
> >> > >>> > >> > increasing or
> >> > >>> > >> > > > > decreasing. As a follow-up task, for example, the
> >> > rebalance
> >> > >>> > should
> >> > >>> > >> > also
> >> > >>> > >> > > > be
> >> > >>> > >> > > > > triggered if we realize that some standby task's lag
> is
> >> > >>> > increasing
> >> > >>> > >> > > > > indefinitely means that it cannot keep up (which is
> >> > another
> >> > >>> > >> indicator
> >> > >>> > >> > > > > either you need to add more resources with the
> >> > num.standbys
> >> > >>> or
> >> > >>> > >> your
> >> > >>> > >> > are
> >> > >>> > >> > > > > still not balanced enough).
> >> > >>> > >> > > > >
> >> > >>> > >> > > > >
> >> > >>> > >> > > > > On Tue, Aug 6, 2019 at 1:32 PM Sophie Blee-Goldman <
> >> > >>> > >> > sop...@confluent.io>
> >> > >>> > >> > > > > wrote:
> >> > >>> > >> > > > >
> >> > >>> > >> > > > > > Hey all,
> >> > >>> > >> > > > > >
> >> > >>> > >> > > > > > I'd like to kick off discussion on KIP-441, aimed
> at
> >> the
> >> > >>> long
> >> > >>> > >> > restore
> >> > >>> > >> > > > > times
> >> > >>> > >> > > > > > in Streams during which further active processing
> >> and IQ
> >> > >>> are
> >> > >>> > >> > blocked.
> >> > >>> > >> > > > > > Please give it a read and let us know your thoughts
> >> > >>> > >> > > > > >
> >> > >>> > >> > > > > >
> >> > >>> > >> > > > > >
> >> > >>> > >> > > > >
> >> > >>> > >> > > >
> >> > >>> > >> >
> >> > >>> > >>
> >> > >>> >
> >> > >>>
> >> >
> >>
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-441:+Smooth+Scaling+Out+for+Kafka+Streams
> >> > >>> > >> > > > > >
> >> > >>> > >> > > > > > Cheers,
> >> > >>> > >> > > > > > Sophie
> >> > >>> > >> > > > > >
> >> > >>> > >> > > > >
> >> > >>> > >> > > > >
> >> > >>> > >> > > > > --
> >> > >>> > >> > > > > -- Guozhang
> >> > >>> > >> > > > >
> >> > >>> > >> > > >
> >> > >>> > >> > >
> >> > >>> > >> > >
> >> > >>> > >> > > --
> >> > >>> > >> > > -- Guozhang
> >> > >>> > >> >
> >> > >>> > >>
> >> > >>> > >>
> >> > >>> > >> --
> >> > >>> > >> -- Guozhang
> >> > >>> > >>
> >> > >>> > >
> >> > >>> >
> >> > >>>
> >> > >>>
> >> > >>> --
> >> > >>> -- Guozhang
> >> > >>>
> >> > >>
> >> >
> >>
> >>
> >> --
> >> -- Guozhang
> >>
> >
>


-- 
-- Guozhang

Reply via email to