Aha! I did miss that detail in your email before. Sorry for my density.

It does seem like, if it turns out to be a problem, it would be pretty
straightforward to add your proposal in. It wouldn't even require a version
bump, because the wire protocol and the assignment algorithm would be
mutually intelligible with and without this tweak.

Thanks for the thorough review! I'll start the vote shortly.

Thanks,
-John

On Thu, Sep 5, 2019 at 11:27 AM Guozhang Wang <wangg...@gmail.com> wrote:

> Hello John,
>
> What I was thinking is that for other hosts of non-logged store only tasks,
> they will be ranked with a lag of "1" (as I tried to illustrate with
> "StatefulTasksToRankedCandidates[task][1]
> := instance" in the previous email), so that the current host would be
> ranked first, while others would be ranked equally second. So the scenario
> would be:
>
> 1. For non-logged store only tasks, the current owner is ranked with lag
> "0", all others ranked with lag "1".
> 2. For mixed non-logged and logged store tasks, the current owned is ranked
> with lag "0", all others ranked by the summed lag of logged stores. And of
> course, for those whose summed lag is below the threshold, they will be
> ranked with lag "0" as well.
>
> But I agree that the case 1) requires special handling logic indeed: you
> need to first tell which tasks are non-logged store only tasks, and then
> add a lag "1" for all other instances. Since non-logged store only tasks
> are not very common so far I think I agree that it may not worth the
> special handling effort anyways, and if people complain about the
> non-stickiness we can always come back and revisit this policy anyways, so
> I'm fine with the current proposal as-is as well.
>
> Just made another pass on the wiki page and it LGTM to me now, I'm +1 on
> the KIP.
>
>
> Guozhang
>
>
>
>
> On Wed, Sep 4, 2019 at 8:19 PM John Roesler <j...@confluent.io> wrote:
>
> > I see; thanks for the clarification, Guozhang.
> >
> > If the prior owner of a non-logged, stateful task always reports 0, and
> all
> > other instances report nothing, then it seems like we need a special case
> > in the assignor to handle assigning these. I.e., the strategy of creating
> > "movement" standbys and waiting for them to be equally caught up to the
> > leader before moving the task with zero downtime doesn't work. Since no
> > standby tasks are possible for non-logged tasks, the other instances
> would
> > never be able to "catch up", and the algorithm would be doomed to just
> > continue pinning the task to its current instance, and would potentially
> > never be able to balance the cluster.
> >
> > Unless, of course, we put in a special case that says to keep it assigned
> > for a while and then move it.
> >
> > But in this case, how do we decide when to move it; it seems like any
> time
> > is as good as any other. And if this is the case, then we might as well
> > just move it right away, which is what the KIP currently proposes.
> >
> > Also, it seems like this creates an asymmetry with the handling of
> > non-logged stores in mixed logged/non-logged tasks (case (1) above). In
> > that case, we agree that the algorithm will make assignments without
> regard
> > to the non-logged stores at all, and would freely move the task between
> two
> > instances that are "equally" caught up on the logged stores. But if a
> task
> > contains only non-logged stores, it sounds like we would not have this
> > freedom of movement, but would just keep giving it back to the instance
> > that previously owned it?
> >
> > Regarding the latter comment, thanks for pointing out my oversight. I've
> > added the section "Iterative Balancing Assignments" to define this
> > behavior:
> >
> >
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-441%3A+Smooth+Scaling+Out+for+Kafka+Streams#KIP-441:SmoothScalingOutforKafkaStreams-IterativeBalancingAssignments
> >  .
> >
> > Thanks!
> > -John
> >
> > On Wed, Sep 4, 2019 at 5:58 PM Guozhang Wang <wangg...@gmail.com> wrote:
> >
> > > Hi John,
> > >
> > > I think I may mis-communicate my ideas a bit, what I meant is simply
> for
> > > case 1), let the active tasks reporting a lag of 0 instead of not
> > reporting
> > > any lags at all; in addition, for other hosts not hosting this tasks,
> and
> > > hence we do not know the offset lag at all we set the
> > > "StatefulTasksToRankedCandidates[task][1]
> > > := instance" just to make sure the current host is ranked the first
> while
> > > all others are ranked equally after it. In this case we would still
> favor
> > > stickiness for not moving such tasks out of its current host unless it
> > > violates balance, in which case we are free to move it to any other
> > hosts.
> > > Does that make sense?
> > >
> > > Also I just realized that in the update wiki page the algorithm section
> > of
> > > iteratively assigning / moving tasks based on
> > > StatefulTasksToRankedCandidates until convergence was missing (it was
> > there
> > > in the previous version), could you add it back?
> > >
> > > Guozhang
> > >
> > >
> > > On Wed, Sep 4, 2019 at 2:59 PM John Roesler <j...@confluent.io> wrote:
> > >
> > > > Thanks for the reply, Guozhang,
> > > >
> > > > I'll start by re-stating your cases, just to make sure we're on the
> > same
> > > > page...
> > > >
> > > > 1) The task is stateful, and all its stores are non-logged. For this
> > case
> > > > under the proposal, we would not have any standbys and the active
> > version
> > > > would actually not report any lag at all (not a lag of 0). Thus, all
> > > > instances would be considered equal when it comes to assignment,
> > although
> > > > the assignment logic would know that the task is "heavy" because it
> has
> > > > those non-logged stores and factor that in to the overall cluster
> > > balance.
> > > >
> > > > 2) The task is stateful and maybe has one logged and one non-logged
> > > store.
> > > > As you say, in this case, the non-logged store would not contribute
> at
> > > all
> > > > to anyone's reported lag. The active task would report a lag of 0,
> and
> > > the
> > > > standbys would report their lag on the logged store. The assignment
> > logic
> > > > would take these reported lags into account.
> > > >
> > > > It sounds like there might be some dissonance on point (1). It sounds
> > > like
> > > > you're saying we should try to assign non-logged stateful tasks back
> to
> > > an
> > > > instance that has previously hosted it, or maybe we should assign it
> > back
> > > > to the most recent instance that hosted it, and then only reassign it
> > if
> > > > the movement would affect balance. I guess I'm not opposed to this,
> > but I
> > > > also don't really see the advantage. The fact is that we'd still wind
> > up
> > > > migrating it much of the time, so no one could depend on it not
> getting
> > > > migrated, and at the same time we're paying a significant complexity
> > cost
> > > > in the assignor to support this case. Plus, we need to encode the
> > > previous
> > > > owner of the task somehow in the wire protocol, which means we pay a
> > > > payload-size penalty to support it as well.
> > > >
> > > > On the other hand, we should make our assignment as stable as
> possible
> > in
> > > > the implementation to avoid pointless "swapping" of logged and
> > non-logged
> > > > stateful tasks, as well as stateless tasks when it doesn't change the
> > > > balance at all to do so. It seems like this should accomplish the
> same
> > > > spiritual goal with no special cases.
> > > >
> > > > WDYT?
> > > > -John
> > > >
> > > > On Wed, Sep 4, 2019 at 2:26 PM Guozhang Wang <wangg...@gmail.com>
> > wrote:
> > > >
> > > > > Hello John,
> > > > >
> > > > > Your reasoning about non-logged state store looks good to me. But I
> > > think
> > > > > it worth considering two cases differently: 1) a task's all state
> > > stores
> > > > > are non-logged, 2) a task has non-logged state store but also have
> > > other
> > > > > logged state store.
> > > > >
> > > > > For 1), we should not have any standbys for such tasks at all, but
> it
> > > is
> > > > > not the same as a stateless task, because the only active task
> could
> > > > report
> > > > > a lag of "0" while all others should logically report the lag as
> > > infinity
> > > > > (of course, they would not encode such value).
> > > > >
> > > > > For 2), it is sort of the same to a normal case: active task
> > reporting
> > > a
> > > > > lag of 0 while standby task reporting a lag summing all other
> logged
> > > > state
> > > > > stores, meaning that the non-logged state store does not matter in
> > our
> > > > > assignment semantics.
> > > > >
> > > > > The rationale I had is to effectively favor stickiness for those
> > tasks
> > > in
> > > > > case 1) than arbitrarily reassign them even though it is true that
> > for
> > > a
> > > > > non-logged store there's no durability and hence restoration
> > guarantees
> > > > > anyways from Streams. Then algorithmically we may consider
> assigning
> > > > tasks
> > > > > with descending order of the number of instances reporting lags for
> > it,
> > > > > then for such tasks there's always one candidate reporting a lag of
> > 0,
> > > > and
> > > > > assigning them to any instance does not actually change the
> cumulated
> > > > total
> > > > > lag either. At that time we can decide "if it is still within load
> > > > balance
> > > > > factor, then let's respect stickiness, otherwise it's free to
> move".
> > > > WDYT?
> > > > >
> > > > >
> > > > > Guozhang
> > > > >
> > > > > On Wed, Sep 4, 2019 at 8:30 AM John Roesler <j...@confluent.io>
> > wrote:
> > > > >
> > > > > > Hey Bruno,
> > > > > >
> > > > > > Thanks for taking another look. Some quick responses:
> > > > > >
> > > > > > 1) It just means the number of offsets in the topic. E.g., the
> LSO
> > is
> > > > > 100,
> > > > > > but the first offset is 40 due to retention, so there are 60
> > offsets
> > > in
> > > > > the
> > > > > > topic. Further, the lag on that topic would be considered to be
> 60
> > > for
> > > > > any
> > > > > > task that hadn't previously done any work on it.
> > > > > >
> > > > > > 2) This is undecidable in general. I.e., there's no way we can
> know
> > > > > whether
> > > > > > the store is remote or not, and hence whether we can freely
> assign
> > it
> > > > to
> > > > > > another instance, or whether we have to keep it on the same
> > instance.
> > > > > > However, there are a couple of reasons to go ahead and assume we
> > have
> > > > the
> > > > > > freedom to move such tasks.
> > > > > > * We know that nothing can prevent the loss of an instance in a
> > > cluster
> > > > > > (I.e., this is true of all cloud environments, as well as any
> > managed
> > > > > > virtualized cluster like mesos or kubernetes), so any Streams
> > program
> > > > > that
> > > > > > makes use of non-remote, non-logged state is doomed to lose its
> > state
> > > > > when
> > > > > > it loses an instance.
> > > > > > * If we take on a restriction that we cannot move such state
> > between
> > > > > > instances, we'd become overconstrained very quickly. Effectively,
> > if
> > > > you
> > > > > > made use of non-logged stores, and we didn't assume freedom of
> > > > movement,
> > > > > > then we couldn't make use of any new instances in your cluster.
> > > > > > * On the other hand, if we optimistically assume we can't move
> > state,
> > > > but
> > > > > > only reassign it when we lose an instance, then we're supporting
> > > > > > non-deterministic logic, because the program would produce
> > different
> > > > > > results, depending on whether you lost a node during the
> execution
> > or
> > > > > not.
> > > > > > 2b) That last point goes along with your side note. I'm not sure
> if
> > > we
> > > > > > should bother dropping such state on every reassignment, though.
> It
> > > > seems
> > > > > > to be undefined territory enough that we can just do the simplest
> > > thing
> > > > > and
> > > > > > assume people have made their own (external) provisions for
> > > durability.
> > > > > > I.e., when we say "non-logged", we mean that it doesn't make use
> of
> > > > _our_
> > > > > > durability mechanism. I'm arguing that the only sane assumption
> is
> > > that
> > > > > > such folks have opted to use their own durability measures, and
> we
> > > > should
> > > > > > just assume it works with no special considerations in the
> > assignment
> > > > > > algorithm.
> > > > > >
> > > > > > 3) Good catch! I've fixed it.
> > > > > >
> > > > > > Thanks again!
> > > > > > -John
> > > > > >
> > > > > > On Wed, Sep 4, 2019 at 6:09 AM Bruno Cadonna <br...@confluent.io
> >
> > > > wrote:
> > > > > >
> > > > > > > Hi,
> > > > > > >
> > > > > > > 1) What do you mean with "full set of offsets in the topic"? Is
> > > this
> > > > > > > the sum of all offsets of the changelog partitions of the task?
> > > > > > >
> > > > > > > 2) I am not sure whether non-logged stateful tasks should be
> > > > > > > effectively treated as stateless tasks during assignment. First
> > we
> > > > > > > need to decide whether a non-logged stateful task should
> > preferably
> > > > be
> > > > > > > assigned to the same instance on which it just run in order to
> > > > > > > continue to use its state or not.
> > > > > > >
> > > > > > > 3) In the example, you define stand-by tasks {S1, S2, ...} but
> > > never
> > > > > > > use them, because below you use a dedicated row for stand-by
> > tasks.
> > > > > > >
> > > > > > > As a side note to 2) since it is not directly related to this
> > KIP:
> > > We
> > > > > > > should decide if we want to avoid the possible non-determinism
> > > > > > > introduced by non-logged stores or not. That is, if an instance
> > > hosts
> > > > > > > a task with non-logged stores then we can have two cases after
> > the
> > > > > > > next rebalance: a) the task stays on the same instance and
> > > continues
> > > > > > > to use the same state store as used so far or b) the task is
> > > assigned
> > > > > > > to another instance and it starts an empty state store. The
> > > produced
> > > > > > > results for these two cases might differ. To avoid the
> > > > nondeterminism,
> > > > > > > non-logged state stores would need to be wiped out before
> > > assignment.
> > > > > > > Then the question arises, how the removal of non-logged state
> > > stores
> > > > > > > before assignment would affect backward-compatibility.
> > > > > > >
> > > > > > > Best,
> > > > > > > Bruno
> > > > > > >
> > > > > > > On Wed, Aug 21, 2019 at 11:40 PM John Roesler <
> j...@confluent.io
> > >
> > > > > wrote:
> > > > > > > >
> > > > > > > > Hi Guozhang,
> > > > > > > >
> > > > > > > > > My impression from your previous email is that inside the
> > > > algorithm
> > > > > > > when
> > > > > > > > we
> > > > > > > > are "filling" them to instances some deterministic logic
> would
> > be
> > > > > used
> > > > > > to
> > > > > > > > avoid the above case, is that correct?
> > > > > > > >
> > > > > > > > Yes, that was my plan, but I didn't formalize it. There was a
> > > > > > requirement
> > > > > > > > that the assignment algorithm must not produce a new
> assignment
> > > if
> > > > > the
> > > > > > > > current assignment is already balanced, so at the least, any
> > > > > thrashing
> > > > > > > > would be restricted to the "balancing" phase while tasks are
> > > moving
> > > > > > > around
> > > > > > > > the cluster.
> > > > > > > >
> > > > > > > > Anyway, I think it would be good to say that we'll "try to"
> > > produce
> > > > > > > stable
> > > > > > > > assignments, so I've added a "should" clause to the
> assignment
> > > > spec:
> > > > > > > >
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-441%3A+Smooth+Scaling+Out+for+Kafka+Streams#KIP-441:SmoothScalingOutforKafkaStreams-AssignmentAlgorithm
> > > > > > > >
> > > > > > > > For example, we would sort the stateless tasks and available
> > > > > instances
> > > > > > > > before assigning them, so that the stateless task assignment
> > > would
> > > > > > mostly
> > > > > > > > stay stable between assignments, modulo the compute capacity
> of
> > > the
> > > > > > > > instances changing a little as active stateful tasks get
> > assigned
> > > > in
> > > > > > more
> > > > > > > > balanced ways.
> > > > > > > >
> > > > > > > > Thanks,
> > > > > > > > -John
> > > > > > > >
> > > > > > > >
> > > > > > > > On Wed, Aug 21, 2019 at 1:55 PM Guozhang Wang <
> > > wangg...@gmail.com>
> > > > > > > wrote:
> > > > > > > >
> > > > > > > > > Hello John,
> > > > > > > > >
> > > > > > > > > That sounds reasonable. Just double checked the code that
> > with
> > > > > > logging
> > > > > > > > > disabled the corresponding checkpoint file would not
> contain
> > > any
> > > > > > > values,
> > > > > > > > > just like a stateless task. So I think treating them
> > logically
> > > > the
> > > > > > > same is
> > > > > > > > > fine.
> > > > > > > > >
> > > > > > > > > Guozhang
> > > > > > > > >
> > > > > > > > >
> > > > > > > > > On Wed, Aug 21, 2019 at 11:41 AM John Roesler <
> > > j...@confluent.io
> > > > >
> > > > > > > wrote:
> > > > > > > > >
> > > > > > > > > > Hi again, Guozhang,
> > > > > > > > > >
> > > > > > > > > > While writing up the section on stateless tasks (
> > > > > > > > > >
> > > > > > > > > >
> > > > > > > > >
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-441%3A+Smooth+Scaling+Out+for+Kafka+Streams#KIP-441:SmoothScalingOutforKafkaStreams-Statelesstasks
> > > > > > > > > > ),
> > > > > > > > > > I reconsidered whether stateful, but non-logged, tasks
> > should
> > > > > > > actually
> > > > > > > > > > report a lag of zero, versus not reporting any lag. By
> the
> > > > > > > definition of
> > > > > > > > > > the "StatefulTasksToRankedCandidates" function, the
> leader
> > > > would
> > > > > > > compute
> > > > > > > > > a
> > > > > > > > > > lag of zero for these tasks anyway.
> > > > > > > > > >
> > > > > > > > > > Therefore, I think the same reasoning that I supplied you
> > for
> > > > > > > stateless
> > > > > > > > > > tasks applies, since the member and leader will agree on
> a
> > > lag
> > > > of
> > > > > > > zero
> > > > > > > > > > anyway, we can avoid adding them to the "Task Lags" map,
> > and
> > > > save
> > > > > > > some
> > > > > > > > > > bytes in the JoinGroup request. This would be especially
> > > > > beneficial
> > > > > > > in an
> > > > > > > > > > application that uses remote stores for _all_ its state
> > > stores,
> > > > > it
> > > > > > > would
> > > > > > > > > > have an extremely lightweight JoinGroup request, with no
> > task
> > > > > lags
> > > > > > at
> > > > > > > > > all.
> > > > > > > > > >
> > > > > > > > > > WDYT?
> > > > > > > > > > -John
> > > > > > > > > >
> > > > > > > > > > On Wed, Aug 21, 2019 at 1:17 PM John Roesler <
> > > > j...@confluent.io>
> > > > > > > wrote:
> > > > > > > > > >
> > > > > > > > > > > Thanks, Guozhang.
> > > > > > > > > > >
> > > > > > > > > > > (Side note: I noticed on another pass over the
> discussion
> > > > that
> > > > > > I'd
> > > > > > > > > missed
> > > > > > > > > > > addressing your comment about the potential race
> > condition
> > > > > > between
> > > > > > > > > state
> > > > > > > > > > > cleanup and lag-based assignment. I've added a solution
> > to
> > > > the
> > > > > > > > > proposal:
> > > > > > > > > > >
> > > > > > > > > >
> > > > > > > > >
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-441%3A+Smooth+Scaling+Out+for+Kafka+Streams#KIP-441:SmoothScalingOutforKafkaStreams-Racebetweenassignmentandstatecleanup
> > > > > > > > > > > )
> > > > > > > > > > >
> > > > > > > > > > > In the JoinGroup (SubscriptionInfo) metadata, stateless
> > > tasks
> > > > > are
> > > > > > > not
> > > > > > > > > > > represented at all. This should save us some bytes in
> the
> > > > > request
> > > > > > > > > > metadata.
> > > > > > > > > > > If we treated them like non-logged stateful tasks and
> > > > reported
> > > > > a
> > > > > > > lag of
> > > > > > > > > > 0,
> > > > > > > > > > > the only difference is that the assignor would be able
> to
> > > > tell
> > > > > > > which
> > > > > > > > > > > members previously hosted that stateless task.
> > > > > > > > > > >
> > > > > > > > > > > I'd like to make a simplifying assumption that
> stateless
> > > > tasks
> > > > > > can
> > > > > > > just
> > > > > > > > > > be
> > > > > > > > > > > freely reassigned with no regard to stickiness at all,
> > > > without
> > > > > > > > > impacting
> > > > > > > > > > > performance. This is almost true. In fact, while
> > assigned a
> > > > > > > stateless
> > > > > > > > > > task,
> > > > > > > > > > > a member fetches batches of records from the broker, so
> > if
> > > we
> > > > > > move
> > > > > > > the
> > > > > > > > > > > stateless task assignment, this buffered input is
> wasted
> > > and
> > > > > just
> > > > > > > gets
> > > > > > > > > > > dropped.
> > > > > > > > > > >
> > > > > > > > > > > However, we won't be moving the stateless tasks around
> > all
> > > > the
> > > > > > time
> > > > > > > > > (just
> > > > > > > > > > > during rebalances), and we have the requirement that
> the
> > > > > > assigment
> > > > > > > > > > > algorithm must stabilize to guard against perpetually
> > > > > shuffling a
> > > > > > > > > > stateless
> > > > > > > > > > > task from one node to another. So, my hope is that this
> > > small
> > > > > > > amount of
> > > > > > > > > > > inefficiency would not be a performance-dominating
> > factor.
> > > In
> > > > > > > exchange,
> > > > > > > > > > we
> > > > > > > > > > > gain the opportunity for the assignment algorithm to
> use
> > > the
> > > > > > > stateless
> > > > > > > > > > > tasks as "filler" during unbalanced assignments. For
> > > example,
> > > > > if
> > > > > > > there
> > > > > > > > > > is a
> > > > > > > > > > > node that is just warming up with several standby
> tasks,
> > > > maybe
> > > > > > the
> > > > > > > > > > > assignment can give more stateless tasks to that node
> to
> > > > > balance
> > > > > > > the
> > > > > > > > > > > computational load across the cluster.
> > > > > > > > > > >
> > > > > > > > > > > It's worth noting that such an assignment would still
> not
> > > be
> > > > > > > considered
> > > > > > > > > > > "balanced", so the ultimately balanced final state of
> the
> > > > > > > assignment
> > > > > > > > > > (after
> > > > > > > > > > > task movements) would still have the desired property
> > that
> > > > each
> > > > > > > > > stateful
> > > > > > > > > > > and stateless task is evenly spread across the cluster.
> > > > > > > > > > >
> > > > > > > > > > > Does that seem reasonable?
> > > > > > > > > > >
> > > > > > > > > > > Thanks,
> > > > > > > > > > > -John
> > > > > > > > > > >
> > > > > > > > > > > On Wed, Aug 21, 2019 at 11:22 AM Guozhang Wang <
> > > > > > wangg...@gmail.com
> > > > > > > >
> > > > > > > > > > wrote:
> > > > > > > > > > >
> > > > > > > > > > >> Hello John,
> > > > > > > > > > >>
> > > > > > > > > > >> I've made another pass on the wiki page again, overall
> > > LGTM.
> > > > > One
> > > > > > > meta
> > > > > > > > > > >> comment about the "stateless" tasks: how do we
> represent
> > > > them
> > > > > in
> > > > > > > the
> > > > > > > > > > >> metadata? Are they just treated as stateful tasks with
> > > > logging
> > > > > > > > > disabled,
> > > > > > > > > > >> or
> > > > > > > > > > >> are specially handled? It is not very clear in the
> > > > > description.
> > > > > > > > > > >>
> > > > > > > > > > >>
> > > > > > > > > > >> Guozhang
> > > > > > > > > > >>
> > > > > > > > > > >> On Wed, Aug 21, 2019 at 8:43 AM John Roesler <
> > > > > j...@confluent.io
> > > > > > >
> > > > > > > > > wrote:
> > > > > > > > > > >>
> > > > > > > > > > >> > I have also specifically called out that the
> > assignment
> > > > must
> > > > > > > achieve
> > > > > > > > > > >> both
> > > > > > > > > > >> > "instance" and "task" balance:
> > > > > > > > > > >> >
> > > > > > > > > > >> >
> > > > > > > > > > >>
> > > > > > > > > >
> > > > > > > > >
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-441%3A+Smooth+Scaling+Out+for+Kafka+Streams#KIP-441:SmoothScalingOutforKafkaStreams-Defining%22balance%22
> > > > > > > > > > >> >
> > > > > > > > > > >> > I've also addressed the problem of state stores with
> > > > logging
> > > > > > > > > disabled:
> > > > > > > > > > >> >
> > > > > > > > > > >> >
> > > > > > > > > > >>
> > > > > > > > > >
> > > > > > > > >
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-441%3A+Smooth+Scaling+Out+for+Kafka+Streams#KIP-441:SmoothScalingOutforKafkaStreams-Statewithloggingdisabled
> > > > > > > > > > >> >
> > > > > > > > > > >> > I believe this addresses all the concerns that have
> > been
> > > > > > raised
> > > > > > > to
> > > > > > > > > > date.
> > > > > > > > > > >> > Apologies if I've overlooked one of your concerns.
> > > > > > > > > > >> >
> > > > > > > > > > >> > Please give the KIP another read and let me know of
> > any
> > > > > > further
> > > > > > > > > > >> thoughts!
> > > > > > > > > > >> > Hopefully, we can start the voting on this KIP by
> the
> > > end
> > > > of
> > > > > > the
> > > > > > > > > week.
> > > > > > > > > > >> >
> > > > > > > > > > >> > Thanks,
> > > > > > > > > > >> > -John
> > > > > > > > > > >> >
> > > > > > > > > > >> > On Tue, Aug 20, 2019 at 5:16 PM John Roesler <
> > > > > > j...@confluent.io
> > > > > > > >
> > > > > > > > > > wrote:
> > > > > > > > > > >> >
> > > > > > > > > > >> > > In response to Bruno's concern #2, I've also added
> > > that
> > > > > > > section to
> > > > > > > > > > the
> > > > > > > > > > >> > > "Rejected Alternatives" section.
> > > > > > > > > > >> > >
> > > > > > > > > > >> > > Additionally, after reviewing some other
> assignment
> > > > > papers,
> > > > > > > I've
> > > > > > > > > > >> > developed
> > > > > > > > > > >> > > the concern that specifying which "phases" the
> > > > assignment
> > > > > > > > > algorithm
> > > > > > > > > > >> > should
> > > > > > > > > > >> > > have, or indeed the logic of it at all, might be a
> > > > mistake
> > > > > > > that
> > > > > > > > > > >> > > over-constrains our ability to write an optimal
> > > > algorithm.
> > > > > > > > > > Therefore,
> > > > > > > > > > >> > I've
> > > > > > > > > > >> > > also refactored the KIP to just describe the
> > protocol,
> > > > and
> > > > > > > specify
> > > > > > > > > > the
> > > > > > > > > > >> > > requirements for the assignment algorithm, but not
> > its
> > > > > exact
> > > > > > > > > > behavior
> > > > > > > > > > >> at
> > > > > > > > > > >> > > all.
> > > > > > > > > > >> > >
> > > > > > > > > > >> > > Thanks,
> > > > > > > > > > >> > > -John
> > > > > > > > > > >> > >
> > > > > > > > > > >> > > On Tue, Aug 20, 2019 at 5:13 PM John Roesler <
> > > > > > > j...@confluent.io>
> > > > > > > > > > >> wrote:
> > > > > > > > > > >> > >
> > > > > > > > > > >> > >> Hi All,
> > > > > > > > > > >> > >>
> > > > > > > > > > >> > >> Thanks for the discussion. I've been considering
> > the
> > > > idea
> > > > > > of
> > > > > > > > > giving
> > > > > > > > > > >> the
> > > > > > > > > > >> > >> "catching up" tasks a different name/role. I was
> in
> > > > favor
> > > > > > > > > > initially,
> > > > > > > > > > >> but
> > > > > > > > > > >> > >> after working though some details, I think it
> > causes
> > > > some
> > > > > > > > > problems,
> > > > > > > > > > >> > which
> > > > > > > > > > >> > >> I've written up in the "rejected alternatives"
> part
> > > of
> > > > > the
> > > > > > > KIP:
> > > > > > > > > > >> > >>
> > > > > > > > > > >> >
> > > > > > > > > > >>
> > > > > > > > > >
> > > > > > > > >
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-441%3A+Smooth+Scaling+Out+for+Kafka+Streams#KIP-441:SmoothScalingOutforKafkaStreams-Addinganewkindoftask(%22moving%22,%22recovering%22,%22learner%22)fortaskmovements
> > > > > > > > > > >> > >>
> > > > > > > > > > >> > >> Please give it a read and let me know what you
> > think.
> > > > > > > > > > >> > >>
> > > > > > > > > > >> > >> Thanks,
> > > > > > > > > > >> > >> -John
> > > > > > > > > > >> > >>
> > > > > > > > > > >> > >> On Thu, Aug 8, 2019 at 7:57 PM Guozhang Wang <
> > > > > > > wangg...@gmail.com
> > > > > > > > > >
> > > > > > > > > > >> > wrote:
> > > > > > > > > > >> > >>
> > > > > > > > > > >> > >>> I think I agree with you Sophie. My gut feeling
> is
> > > > that
> > > > > 1)
> > > > > > > it
> > > > > > > > > > should
> > > > > > > > > > >> > not
> > > > > > > > > > >> > >>> be
> > > > > > > > > > >> > >>> the major concern in assignor's algorithm for
> > > standby
> > > > > > tasks
> > > > > > > not
> > > > > > > > > > >> > catching
> > > > > > > > > > >> > >>> up, but rather be tackled in different modules,
> > and
> > > > 2) a
> > > > > > > lot of
> > > > > > > > > > >> > >>> optimization can be down at the stream thread
> > > itself,
> > > > > like
> > > > > > > > > > dedicated
> > > > > > > > > > >> > >>> threading and larger batching, or even
> complicated
> > > > > > > scheduling
> > > > > > > > > > >> > mechanisms
> > > > > > > > > > >> > >>> between running, restoring and standby tasks. In
> > > > > anyways,
> > > > > > I
> > > > > > > > > think
> > > > > > > > > > we
> > > > > > > > > > >> > can
> > > > > > > > > > >> > >>> take this out of the scope of KIP-441 for now.
> > > > > > > > > > >> > >>>
> > > > > > > > > > >> > >>>
> > > > > > > > > > >> > >>> Guozhang
> > > > > > > > > > >> > >>>
> > > > > > > > > > >> > >>>
> > > > > > > > > > >> > >>> On Thu, Aug 8, 2019 at 5:48 PM Sophie
> > Blee-Goldman <
> > > > > > > > > > >> > sop...@confluent.io>
> > > > > > > > > > >> > >>> wrote:
> > > > > > > > > > >> > >>>
> > > > > > > > > > >> > >>> > > we may have other ways to not starving the
> > > standby
> > > > > > > tasks,
> > > > > > > > > for
> > > > > > > > > > >> > >>> example, by
> > > > > > > > > > >> > >>> > > using dedicate threads for standby tasks or
> > even
> > > > > > > consider
> > > > > > > > > > having
> > > > > > > > > > >> > >>> > *higher> priority for standby than active* so
> > that
> > > > we
> > > > > > > always
> > > > > > > > > try
> > > > > > > > > > >> to
> > > > > > > > > > >> > >>> caught
> > > > > > > > > > >> > >>> > up standby
> > > > > > > > > > >> > >>> > > first, then process active
> > > > > > > > > > >> > >>> >
> > > > > > > > > > >> > >>> > This is an interesting idea, but seems likely
> to
> > > get
> > > > > in
> > > > > > > the
> > > > > > > > > way
> > > > > > > > > > of
> > > > > > > > > > >> > the
> > > > > > > > > > >> > >>> > original idea of this KIP
> > > > > > > > > > >> > >>> > -- if we always process standby tasks first,
> > then
> > > if
> > > > > we
> > > > > > > are
> > > > > > > > > > >> assigned
> > > > > > > > > > >> > a
> > > > > > > > > > >> > >>> new
> > > > > > > > > > >> > >>> > standby task we
> > > > > > > > > > >> > >>> > will have to wait for it to catch up
> completely
> > > > before
> > > > > > > > > > processing
> > > > > > > > > > >> any
> > > > > > > > > > >> > >>> > active tasks! That's
> > > > > > > > > > >> > >>> > even worse than the situation this KIP is
> trying
> > > to
> > > > > help
> > > > > > > with,
> > > > > > > > > > >> since
> > > > > > > > > > >> > a
> > > > > > > > > > >> > >>> new
> > > > > > > > > > >> > >>> > standby task has to
> > > > > > > > > > >> > >>> > restore from 0 (whereas an active task at
> least
> > > can
> > > > > take
> > > > > > > over
> > > > > > > > > > from
> > > > > > > > > > >> > >>> wherever
> > > > > > > > > > >> > >>> > the standby was).
> > > > > > > > > > >> > >>> >
> > > > > > > > > > >> > >>> > During restoration -- while there exist any
> > > > restoring
> > > > > > > tasks
> > > > > > > > > -- I
> > > > > > > > > > >> > think
> > > > > > > > > > >> > >>> it's
> > > > > > > > > > >> > >>> > reasonable to de-prioritize the
> > > > > > > > > > >> > >>> > standby tasks and just process restoring and
> > > active
> > > > > > tasks
> > > > > > > so
> > > > > > > > > > both
> > > > > > > > > > >> can
> > > > > > > > > > >> > >>> make
> > > > > > > > > > >> > >>> > progress. But we should
> > > > > > > > > > >> > >>> > let them catch up afterwards somehow -- maybe
> we
> > > can
> > > > > > apply
> > > > > > > > > some
> > > > > > > > > > >> kind
> > > > > > > > > > >> > of
> > > > > > > > > > >> > >>> > heuristic, like "if we haven't
> > > > > > > > > > >> > >>> > processed standbys for X iterations, or Y
> > > > > milliseconds,
> > > > > > > do so
> > > > > > > > > > >> now."
> > > > > > > > > > >> > >>> >
> > > > > > > > > > >> > >>> > Actually, it might even be beneficial to avoid
> > > > > > processing
> > > > > > > > > > >> standbys a
> > > > > > > > > > >> > >>> record
> > > > > > > > > > >> > >>> > or two at a time and instead
> > > > > > > > > > >> > >>> > wait for a large enough batch to build up for
> > the
> > > > > > RocksDB
> > > > > > > > > > >> > bulk-loading
> > > > > > > > > > >> > >>> > benefits.
> > > > > > > > > > >> > >>> >
> > > > > > > > > > >> > >>> > I think the "use dedicated threads for
> standby"
> > is
> > > > the
> > > > > > > more
> > > > > > > > > > >> promising
> > > > > > > > > > >> > >>> end
> > > > > > > > > > >> > >>> > goal, especially since
> > > > > > > > > > >> > >>> > if we split restoration into "restoring tasks"
> > > then
> > > > > > > active and
> > > > > > > > > > >> > standbys
> > > > > > > > > > >> > >>> > share almost nothing. But
> > > > > > > > > > >> > >>> > that seems like follow-up work to the current
> > KIP
> > > :)
> > > > > > > > > > >> > >>> >
> > > > > > > > > > >> > >>> > On Thu, Aug 8, 2019 at 5:31 PM Sophie
> > > Blee-Goldman <
> > > > > > > > > > >> > >>> sop...@confluent.io>
> > > > > > > > > > >> > >>> > wrote:
> > > > > > > > > > >> > >>> >
> > > > > > > > > > >> > >>> > > Stateful tasks with logging disabled seem to
> > be
> > > an
> > > > > > > > > interesting
> > > > > > > > > > >> edge
> > > > > > > > > > >> > >>> case.
> > > > > > > > > > >> > >>> > > On the one hand,
> > > > > > > > > > >> > >>> > > for balancing purposes they should be
> > considered
> > > > > > > stateful
> > > > > > > > > > since
> > > > > > > > > > >> as
> > > > > > > > > > >> > >>> > > Guozhang pointed out
> > > > > > > > > > >> > >>> > > they are still "heavy" in IO costs. But for
> > > > > "catching
> > > > > > > up"
> > > > > > > > > > >> purposes,
> > > > > > > > > > >> > >>> ie
> > > > > > > > > > >> > >>> > > when allocating standby
> > > > > > > > > > >> > >>> > > tasks that will become active tasks, they
> > should
> > > > be
> > > > > > > > > considered
> > > > > > > > > > >> > >>> stateless
> > > > > > > > > > >> > >>> > > as there is so
> > > > > > > > > > >> > >>> > > meaningful sense of their lag. We should
> never
> > > > > > allocate
> > > > > > > > > > standby
> > > > > > > > > > >> > >>> tasks for
> > > > > > > > > > >> > >>> > > them during the
> > > > > > > > > > >> > >>> > > first rebalance, but should ensure they are
> > > evenly
> > > > > > > > > distributed
> > > > > > > > > > >> > across
> > > > > > > > > > >> > >>> > > instances. Maybe we
> > > > > > > > > > >> > >>> > > should split these into a third category --
> > > after
> > > > we
> > > > > > > assign
> > > > > > > > > > all
> > > > > > > > > > >> > >>> stateful
> > > > > > > > > > >> > >>> > > tasks with logging, we
> > > > > > > > > > >> > >>> > > then distribute the set of logging-disabled
> > > > stateful
> > > > > > > tasks
> > > > > > > > > to
> > > > > > > > > > >> > improve
> > > > > > > > > > >> > >>> > > balance, before lastly
> > > > > > > > > > >> > >>> > > distributing stateless tasks?
> > > > > > > > > > >> > >>> > >
> > > > > > > > > > >> > >>> > > This actually leads into what I was just
> > > thinking,
> > > > > > > which is
> > > > > > > > > > >> that we
> > > > > > > > > > >> > >>> > really
> > > > > > > > > > >> > >>> > > should distinguish the
> > > > > > > > > > >> > >>> > > "catch-up" standbys from normal standbys as
> > well
> > > > as
> > > > > > > > > > >> distinguishing
> > > > > > > > > > >> > >>> > > actively processing tasks
> > > > > > > > > > >> > >>> > > from active tasks that are still in the
> > restore
> > > > > phase.
> > > > > > > It's
> > > > > > > > > > >> > somewhat
> > > > > > > > > > >> > >>> > > awkward that today, some
> > > > > > > > > > >> > >>> > > active tasks just start processing
> immediately
> > > > while
> > > > > > > others
> > > > > > > > > > >> behave
> > > > > > > > > > >> > >>> more
> > > > > > > > > > >> > >>> > > like standby than active
> > > > > > > > > > >> > >>> > > tasks for some time, before switching to
> real
> > > > > active.
> > > > > > > They
> > > > > > > > > > first
> > > > > > > > > > >> > use
> > > > > > > > > > >> > >>> the
> > > > > > > > > > >> > >>> > > restoreConsumer, then
> > > > > > > > > > >> > >>> > > later only the "normal" consumer.
> > > > > > > > > > >> > >>> > >
> > > > > > > > > > >> > >>> > > However, this restore period is still
> distinct
> > > > from
> > > > > > > normal
> > > > > > > > > > >> standbys
> > > > > > > > > > >> > >>> in a
> > > > > > > > > > >> > >>> > > lot of ways -- the code path
> > > > > > > > > > >> > >>> > > for restoring is different than for updating
> > > > > standbys,
> > > > > > > for
> > > > > > > > > > >> example
> > > > > > > > > > >> > >>> in how
> > > > > > > > > > >> > >>> > > long we block on #poll.
> > > > > > > > > > >> > >>> > > So in addition to giving them their own name
> > --
> > > > > let's
> > > > > > go
> > > > > > > > > with
> > > > > > > > > > >> > >>> restoring
> > > > > > > > > > >> > >>> > > task for now -- they really
> > > > > > > > > > >> > >>> > > do seem to deserve being their own distinct
> > > task.
> > > > We
> > > > > > can
> > > > > > > > > > >> optimize
> > > > > > > > > > >> > >>> them
> > > > > > > > > > >> > >>> > for
> > > > > > > > > > >> > >>> > > efficient conversion
> > > > > > > > > > >> > >>> > > to active tasks since we know that's what
> they
> > > > will
> > > > > > be.
> > > > > > > > > > >> > >>> > >
> > > > > > > > > > >> > >>> > > This resolves some of the awkwardness of
> > dealing
> > > > > with
> > > > > > > the
> > > > > > > > > > >> special
> > > > > > > > > > >> > >>> case
> > > > > > > > > > >> > >>> > > mentioned above: we
> > > > > > > > > > >> > >>> > > find a balanced assignment of stateful and
> > > > stateless
> > > > > > > tasks,
> > > > > > > > > > and
> > > > > > > > > > >> > >>> create
> > > > > > > > > > >> > >>> > > restoring tasks as needed.
> > > > > > > > > > >> > >>> > > If logging is disabled, no restoring task is
> > > > > created.
> > > > > > > > > > >> > >>> > >
> > > > > > > > > > >> > >>> > >
> > > > > > > > > > >> > >>> > > On Thu, Aug 8, 2019 at 3:44 PM Guozhang
> Wang <
> > > > > > > > > > >> wangg...@gmail.com>
> > > > > > > > > > >> > >>> wrote:
> > > > > > > > > > >> > >>> > >
> > > > > > > > > > >> > >>> > >> Regarding 3) above: I think for active task
> > > they
> > > > > > should
> > > > > > > > > still
> > > > > > > > > > >> be
> > > > > > > > > > >> > >>> > >> considered
> > > > > > > > > > >> > >>> > >> stateful since the processor would still
> pay
> > IO
> > > > > cost
> > > > > > > > > > accessing
> > > > > > > > > > >> the
> > > > > > > > > > >> > >>> > store,
> > > > > > > > > > >> > >>> > >> but they would not have standby tasks?
> > > > > > > > > > >> > >>> > >>
> > > > > > > > > > >> > >>> > >> On Thu, Aug 8, 2019 at 7:49 AM Bruno
> Cadonna
> > <
> > > > > > > > > > >> br...@confluent.io>
> > > > > > > > > > >> > >>> > wrote:
> > > > > > > > > > >> > >>> > >>
> > > > > > > > > > >> > >>> > >> > Hi,
> > > > > > > > > > >> > >>> > >> >
> > > > > > > > > > >> > >>> > >> > Thank you for the KIP!
> > > > > > > > > > >> > >>> > >> >
> > > > > > > > > > >> > >>> > >> > Some questions/comments:
> > > > > > > > > > >> > >>> > >> >
> > > > > > > > > > >> > >>> > >> > 1. I am wondering if the "stand-by" tasks
> > > that
> > > > > > catch
> > > > > > > up
> > > > > > > > > > state
> > > > > > > > > > >> > >>> before
> > > > > > > > > > >> > >>> > >> > the active task is switched deserve its
> own
> > > > name
> > > > > in
> > > > > > > this
> > > > > > > > > > KIP
> > > > > > > > > > >> and
> > > > > > > > > > >> > >>> maybe
> > > > > > > > > > >> > >>> > >> > in the code. We have already stated that
> > they
> > > > are
> > > > > > not
> > > > > > > > > true
> > > > > > > > > > >> > >>> stand-by
> > > > > > > > > > >> > >>> > >> > tasks, they are not configured through
> > > > > > > > > > >> `num.standby.replicas`,
> > > > > > > > > > >> > and
> > > > > > > > > > >> > >>> > >> > maybe they have also other properties
> that
> > > > > > > distinguish
> > > > > > > > > them
> > > > > > > > > > >> from
> > > > > > > > > > >> > >>> true
> > > > > > > > > > >> > >>> > >> > stand-by tasks of which we are not aware
> > yet.
> > > > For
> > > > > > > > > example,
> > > > > > > > > > >> they
> > > > > > > > > > >> > >>> may be
> > > > > > > > > > >> > >>> > >> > prioritized differently than other tasks.
> > > > > > > Furthermore,
> > > > > > > > > the
> > > > > > > > > > >> name
> > > > > > > > > > >> > >>> > >> > "stand-by" does not really fit with the
> > > planned
> > > > > > > > > > >> functionality of
> > > > > > > > > > >> > >>> those
> > > > > > > > > > >> > >>> > >> > tasks. In the following, I will call them
> > > false
> > > > > > > stand-by
> > > > > > > > > > >> tasks.
> > > > > > > > > > >> > >>> > >> >
> > > > > > > > > > >> > >>> > >> > 2. Did you consider to trigger the
> probing
> > > > > > > rebalances not
> > > > > > > > > > at
> > > > > > > > > > >> > >>> regular
> > > > > > > > > > >> > >>> > >> > time intervals but when the false
> stand-by
> > > > tasks
> > > > > > > reach an
> > > > > > > > > > >> > >>> acceptable
> > > > > > > > > > >> > >>> > >> > lag? If you did consider, could you add a
> > > > > paragraph
> > > > > > > why
> > > > > > > > > you
> > > > > > > > > > >> > >>> rejected
> > > > > > > > > > >> > >>> > >> > this idea to the "Rejected Alternatives"
> > > > section.
> > > > > > > > > > >> > >>> > >> >
> > > > > > > > > > >> > >>> > >> > 3. Are tasks that solely contain stores
> > with
> > > > > > disabled
> > > > > > > > > > logging
> > > > > > > > > > >> > >>> > >> > classified as stateful or stateless in
> the
> > > > > > > algorithm? I
> > > > > > > > > > would
> > > > > > > > > > >> > >>> guess
> > > > > > > > > > >> > >>> > >> > stateless, although if possible they
> should
> > > be
> > > > > > > assigned
> > > > > > > > > to
> > > > > > > > > > >> the
> > > > > > > > > > >> > >>> same
> > > > > > > > > > >> > >>> > >> > instance they had run before the
> rebalance.
> > > As
> > > > > far
> > > > > > > as I
> > > > > > > > > can
> > > > > > > > > > >> see
> > > > > > > > > > >> > >>> this
> > > > > > > > > > >> > >>> > >> > special case is not handled in the
> > algorithm.
> > > > > > > > > > >> > >>> > >> >
> > > > > > > > > > >> > >>> > >> > Best,
> > > > > > > > > > >> > >>> > >> > Bruno
> > > > > > > > > > >> > >>> > >> >
> > > > > > > > > > >> > >>> > >> >
> > > > > > > > > > >> > >>> > >> >
> > > > > > > > > > >> > >>> > >> > On Thu, Aug 8, 2019 at 8:24 AM Guozhang
> > Wang
> > > <
> > > > > > > > > > >> > wangg...@gmail.com>
> > > > > > > > > > >> > >>> > >> wrote:
> > > > > > > > > > >> > >>> > >> > >
> > > > > > > > > > >> > >>> > >> > > 1. Sounds good, just wanted to clarify;
> > and
> > > > it
> > > > > > may
> > > > > > > > > worth
> > > > > > > > > > >> > >>> documenting
> > > > > > > > > > >> > >>> > >> it
> > > > > > > > > > >> > >>> > >> > so
> > > > > > > > > > >> > >>> > >> > > that users would not be surprised when
> > > > > monitoring
> > > > > > > their
> > > > > > > > > > >> > >>> footprint.
> > > > > > > > > > >> > >>> > >> > >
> > > > > > > > > > >> > >>> > >> > > 2. Hmm I see... I think the trade-off
> can
> > > be
> > > > > > > described
> > > > > > > > > as
> > > > > > > > > > >> "how
> > > > > > > > > > >> > >>> much
> > > > > > > > > > >> > >>> > >> > > imbalance would bother you to be
> willing
> > to
> > > > pay
> > > > > > > another
> > > > > > > > > > >> > >>> rebalance,
> > > > > > > > > > >> > >>> > >> along
> > > > > > > > > > >> > >>> > >> > > with potentially more restoration lag",
> > and
> > > > the
> > > > > > > current
> > > > > > > > > > >> > >>> definition
> > > > > > > > > > >> > >>> > of
> > > > > > > > > > >> > >>> > >> > > rebalance_factor can be considered as a
> > > rough
> > > > > > > > > measurement
> > > > > > > > > > >> of
> > > > > > > > > > >> > >>> that
> > > > > > > > > > >> > >>> > >> > > imbalance. Of course one can argue
> that a
> > > > finer
> > > > > > > grained
> > > > > > > > > > >> > >>> measurement
> > > > > > > > > > >> > >>> > >> could
> > > > > > > > > > >> > >>> > >> > > be "resource footprint" like CPU /
> > storage
> > > of
> > > > > > each
> > > > > > > > > > instance
> > > > > > > > > > >> > >>> like we
> > > > > > > > > > >> > >>> > >> have
> > > > > > > > > > >> > >>> > >> > in
> > > > > > > > > > >> > >>> > >> > > Kafka broker auto balancing tools, but
> > I'd
> > > > > prefer
> > > > > > > not
> > > > > > > > > > doing
> > > > > > > > > > >> > >>> that as
> > > > > > > > > > >> > >>> > >> part
> > > > > > > > > > >> > >>> > >> > of
> > > > > > > > > > >> > >>> > >> > > the library but more as an operational
> > tool
> > > > in
> > > > > > the
> > > > > > > > > > future.
> > > > > > > > > > >> On
> > > > > > > > > > >> > >>> the
> > > > > > > > > > >> > >>> > >> other
> > > > > > > > > > >> > >>> > >> > > hand, I've seen stateful and stateless
> > > tasks
> > > > > > having
> > > > > > > > > very
> > > > > > > > > > >> > >>> different
> > > > > > > > > > >> > >>> > >> load,
> > > > > > > > > > >> > >>> > >> > > and sometimes the only bottleneck of a
> > > > Streams
> > > > > > app
> > > > > > > is
> > > > > > > > > > just
> > > > > > > > > > >> one
> > > > > > > > > > >> > >>> > >> stateful
> > > > > > > > > > >> > >>> > >> > > sub-topology and whoever gets tasks of
> > that
> > > > > > > > > sub-topology
> > > > > > > > > > >> > become
> > > > > > > > > > >> > >>> > >> hotspot
> > > > > > > > > > >> > >>> > >> > > (and that's why our algorithm tries to
> > > > balance
> > > > > > per
> > > > > > > > > > >> > sub-topology
> > > > > > > > > > >> > >>> as
> > > > > > > > > > >> > >>> > >> well),
> > > > > > > > > > >> > >>> > >> > > so maybe we can just consider stateful
> > > tasks
> > > > > when
> > > > > > > > > > >> calculating
> > > > > > > > > > >> > >>> this
> > > > > > > > > > >> > >>> > >> factor
> > > > > > > > > > >> > >>> > >> > > as a very brute force heuristic?
> > > > > > > > > > >> > >>> > >> > >
> > > > > > > > > > >> > >>> > >> > > 3.a. Thinking about this a bit more,
> > maybe
> > > > it's
> > > > > > > better
> > > > > > > > > > not
> > > > > > > > > > >> try
> > > > > > > > > > >> > >>> to
> > > > > > > > > > >> > >>> > >> tackle
> > > > > > > > > > >> > >>> > >> > an
> > > > > > > > > > >> > >>> > >> > > unseen enemy just yet, and observe if
> it
> > > > really
> > > > > > > emerges
> > > > > > > > > > >> later,
> > > > > > > > > > >> > >>> and
> > > > > > > > > > >> > >>> > by
> > > > > > > > > > >> > >>> > >> > then
> > > > > > > > > > >> > >>> > >> > > we may have other ways to not starving
> > the
> > > > > > standby
> > > > > > > > > tasks,
> > > > > > > > > > >> for
> > > > > > > > > > >> > >>> > >> example, by
> > > > > > > > > > >> > >>> > >> > > using dedicate threads for standby
> tasks
> > or
> > > > > even
> > > > > > > > > consider
> > > > > > > > > > >> > having
> > > > > > > > > > >> > >>> > >> higher
> > > > > > > > > > >> > >>> > >> > > priority for standby than active so
> that
> > we
> > > > > > always
> > > > > > > try
> > > > > > > > > to
> > > > > > > > > > >> > >>> caught up
> > > > > > > > > > >> > >>> > >> > standby
> > > > > > > > > > >> > >>> > >> > > first, then process active; and if
> > active's
> > > > > > lagging
> > > > > > > > > > >> compared
> > > > > > > > > > >> > to
> > > > > > > > > > >> > >>> > >> > > log-end-offset is increasing then we
> > should
> > > > > > > increase
> > > > > > > > > > >> capacity,
> > > > > > > > > > >> > >>> etc
> > > > > > > > > > >> > >>> > >> etc.
> > > > > > > > > > >> > >>> > >> > >
> > > > > > > > > > >> > >>> > >> > > 4. Actually with KIP-429 this may not
> be
> > > the
> > > > > > case:
> > > > > > > we
> > > > > > > > > may
> > > > > > > > > > >> not
> > > > > > > > > > >> > >>> call
> > > > > > > > > > >> > >>> > >> > > onPartitionsRevoked prior to rebalance
> > any
> > > > more
> > > > > > so
> > > > > > > > > would
> > > > > > > > > > >> not
> > > > > > > > > > >> > >>> transit
> > > > > > > > > > >> > >>> > >> > state
> > > > > > > > > > >> > >>> > >> > > to PARTITIONS_REVOKED, and hence not
> > cause
> > > > the
> > > > > > > state of
> > > > > > > > > > the
> > > > > > > > > > >> > >>> instance
> > > > > > > > > > >> > >>> > >> to
> > > > > > > > > > >> > >>> > >> > be
> > > > > > > > > > >> > >>> > >> > > REBALANCING. In other words, even if a
> > > > instance
> > > > > > is
> > > > > > > > > > >> undergoing
> > > > > > > > > > >> > a
> > > > > > > > > > >> > >>> > >> rebalance
> > > > > > > > > > >> > >>> > >> > > it's state may still be RUNNING and it
> > may
> > > > > still
> > > > > > be
> > > > > > > > > > >> processing
> > > > > > > > > > >> > >>> > >> records at
> > > > > > > > > > >> > >>> > >> > > the same time.
> > > > > > > > > > >> > >>> > >> > >
> > > > > > > > > > >> > >>> > >> > >
> > > > > > > > > > >> > >>> > >> > > On Wed, Aug 7, 2019 at 12:14 PM John
> > > Roesler
> > > > <
> > > > > > > > > > >> > j...@confluent.io
> > > > > > > > > > >> > >>> >
> > > > > > > > > > >> > >>> > >> wrote:
> > > > > > > > > > >> > >>> > >> > >
> > > > > > > > > > >> > >>> > >> > > > Hey Guozhang,
> > > > > > > > > > >> > >>> > >> > > >
> > > > > > > > > > >> > >>> > >> > > > Thanks for the review!
> > > > > > > > > > >> > >>> > >> > > >
> > > > > > > > > > >> > >>> > >> > > > 1. Yes, even with
> `num.standby.replicas
> > > :=
> > > > > 0`,
> > > > > > we
> > > > > > > > > will
> > > > > > > > > > >> still
> > > > > > > > > > >> > >>> > >> > temporarily
> > > > > > > > > > >> > >>> > >> > > > allocate standby tasks to accomplish
> a
> > > > > > > no-downtime
> > > > > > > > > task
> > > > > > > > > > >> > >>> migration.
> > > > > > > > > > >> > >>> > >> > > > Although, I'd argue that this doesn't
> > > > really
> > > > > > > violate
> > > > > > > > > > the
> > > > > > > > > > >> > >>> config,
> > > > > > > > > > >> > >>> > as
> > > > > > > > > > >> > >>> > >> the
> > > > > > > > > > >> > >>> > >> > > > task isn't a true hot standby. As
> soon
> > as
> > > > it
> > > > > > > catches
> > > > > > > > > > up,
> > > > > > > > > > >> > we'll
> > > > > > > > > > >> > >>> > >> > rebalance
> > > > > > > > > > >> > >>> > >> > > > again, that task will become active,
> > and
> > > > the
> > > > > > > original
> > > > > > > > > > >> > instance
> > > > > > > > > > >> > >>> > that
> > > > > > > > > > >> > >>> > >> > hosted
> > > > > > > > > > >> > >>> > >> > > > the active task will no longer have
> the
> > > > task
> > > > > > > assigned
> > > > > > > > > > at
> > > > > > > > > > >> > all.
> > > > > > > > > > >> > >>> Once
> > > > > > > > > > >> > >>> > >> the
> > > > > > > > > > >> > >>> > >> > > > stateDirCleaner kicks in, we'll free
> > the
> > > > disk
> > > > > > > space
> > > > > > > > > > from
> > > > > > > > > > >> it,
> > > > > > > > > > >> > >>> and
> > > > > > > > > > >> > >>> > >> > return to
> > > > > > > > > > >> > >>> > >> > > > the steady-state of having just one
> > copy
> > > of
> > > > > the
> > > > > > > task
> > > > > > > > > in
> > > > > > > > > > >> the
> > > > > > > > > > >> > >>> > cluster.
> > > > > > > > > > >> > >>> > >> > > >
> > > > > > > > > > >> > >>> > >> > > > We can of course do without this,
> but I
> > > > feel
> > > > > > the
> > > > > > > > > > current
> > > > > > > > > > >> > >>> proposal
> > > > > > > > > > >> > >>> > is
> > > > > > > > > > >> > >>> > >> > > > operationally preferable, since it
> > > doesn't
> > > > > make
> > > > > > > > > > >> configuring
> > > > > > > > > > >> > >>> > >> > hot-standbys a
> > > > > > > > > > >> > >>> > >> > > > pre-requisite for fast rebalances.
> > > > > > > > > > >> > >>> > >> > > >
> > > > > > > > > > >> > >>> > >> > > > 2. Yes, I think your interpretation
> is
> > > what
> > > > > we
> > > > > > > > > > intended.
> > > > > > > > > > >> The
> > > > > > > > > > >> > >>> > default
> > > > > > > > > > >> > >>> > >> > > > balance_factor would be 1, as it is
> > > > > implicitly
> > > > > > > today.
> > > > > > > > > > >> What
> > > > > > > > > > >> > >>> this
> > > > > > > > > > >> > >>> > >> does is
> > > > > > > > > > >> > >>> > >> > > > allows operators to trade off less
> > > balanced
> > > > > > > > > assignments
> > > > > > > > > > >> > >>> against
> > > > > > > > > > >> > >>> > >> fewer
> > > > > > > > > > >> > >>> > >> > > > rebalances. If you have lots of space
> > > > > capacity
> > > > > > in
> > > > > > > > > your
> > > > > > > > > > >> > >>> instances,
> > > > > > > > > > >> > >>> > >> this
> > > > > > > > > > >> > >>> > >> > may
> > > > > > > > > > >> > >>> > >> > > > be a perfectly fine tradeoff, and you
> > may
> > > > > > prefer
> > > > > > > for
> > > > > > > > > > >> Streams
> > > > > > > > > > >> > >>> not
> > > > > > > > > > >> > >>> > to
> > > > > > > > > > >> > >>> > >> > bother
> > > > > > > > > > >> > >>> > >> > > > streaming GBs of data from the broker
> > in
> > > > > > pursuit
> > > > > > > of
> > > > > > > > > > >> perfect
> > > > > > > > > > >> > >>> > balance.
> > > > > > > > > > >> > >>> > >> > Not
> > > > > > > > > > >> > >>> > >> > > > married to this configuration,
> though.
> > It
> > > > was
> > > > > > > > > inspired
> > > > > > > > > > by
> > > > > > > > > > >> > the
> > > > > > > > > > >> > >>> > >> related
> > > > > > > > > > >> > >>> > >> > work
> > > > > > > > > > >> > >>> > >> > > > research we did.
> > > > > > > > > > >> > >>> > >> > > >
> > > > > > > > > > >> > >>> > >> > > > 3. I'll take a look
> > > > > > > > > > >> > >>> > >> > > >
> > > > > > > > > > >> > >>> > >> > > > 3a. I think this is a good idea. I'd
> > > > classify
> > > > > > it
> > > > > > > as a
> > > > > > > > > > >> type
> > > > > > > > > > >> > of
> > > > > > > > > > >> > >>> grey
> > > > > > > > > > >> > >>> > >> > failure
> > > > > > > > > > >> > >>> > >> > > > detection. It may make more sense to
> > > tackle
> > > > > > grey
> > > > > > > > > > >> failures as
> > > > > > > > > > >> > >>> part
> > > > > > > > > > >> > >>> > of
> > > > > > > > > > >> > >>> > >> > the
> > > > > > > > > > >> > >>> > >> > > > heartbeat protocol (as I POCed here:
> > > > > > > > > > >> > >>> > >> > > >
> > > > > > https://github.com/apache/kafka/pull/7096/files
> > > > > > > ).
> > > > > > > > > > WDYT?
> > > > > > > > > > >> > >>> > >> > > >
> > > > > > > > > > >> > >>> > >> > > > 4. Good catch! I didn't think about
> > that
> > > > > > before.
> > > > > > > > > > Looking
> > > > > > > > > > >> at
> > > > > > > > > > >> > it
> > > > > > > > > > >> > >>> > now,
> > > > > > > > > > >> > >>> > >> > though,
> > > > > > > > > > >> > >>> > >> > > > I wonder if we're actually protected
> > > > already.
> > > > > > The
> > > > > > > > > > >> > >>> stateDirCleaner
> > > > > > > > > > >> > >>> > >> > thread
> > > > > > > > > > >> > >>> > >> > > > only executes if the instance is in
> > > RUNNING
> > > > > > > state,
> > > > > > > > > and
> > > > > > > > > > >> > KIP-441
> > > > > > > > > > >> > >>> > >> > proposes to
> > > > > > > > > > >> > >>> > >> > > > use "probing rebalances" to report
> task
> > > > lag.
> > > > > > > Hence,
> > > > > > > > > > >> during
> > > > > > > > > > >> > the
> > > > > > > > > > >> > >>> > >> window
> > > > > > > > > > >> > >>> > >> > > > between when the instance reports a
> lag
> > > and
> > > > > the
> > > > > > > > > > assignor
> > > > > > > > > > >> > >>> makes a
> > > > > > > > > > >> > >>> > >> > decision
> > > > > > > > > > >> > >>> > >> > > > about it, the instance should remain
> in
> > > > > > > REBALANCING
> > > > > > > > > > >> state,
> > > > > > > > > > >> > >>> right?
> > > > > > > > > > >> > >>> > If
> > > > > > > > > > >> > >>> > >> > so,
> > > > > > > > > > >> > >>> > >> > > > then this should prevent the race
> > > > condition.
> > > > > If
> > > > > > > not,
> > > > > > > > > > >> then we
> > > > > > > > > > >> > >>> do
> > > > > > > > > > >> > >>> > >> indeed
> > > > > > > > > > >> > >>> > >> > need
> > > > > > > > > > >> > >>> > >> > > > to do something about it.
> > > > > > > > > > >> > >>> > >> > > >
> > > > > > > > > > >> > >>> > >> > > > 5. Good idea. I think that today, you
> > can
> > > > > only
> > > > > > > see
> > > > > > > > > the
> > > > > > > > > > >> > >>> consumer
> > > > > > > > > > >> > >>> > lag,
> > > > > > > > > > >> > >>> > >> > which
> > > > > > > > > > >> > >>> > >> > > > is a poor substitute. I'll add some
> > > metrics
> > > > > to
> > > > > > > the
> > > > > > > > > > >> proposal.
> > > > > > > > > > >> > >>> > >> > > >
> > > > > > > > > > >> > >>> > >> > > > Thanks again for the comments!
> > > > > > > > > > >> > >>> > >> > > > -John
> > > > > > > > > > >> > >>> > >> > > >
> > > > > > > > > > >> > >>> > >> > > > On Tue, Aug 6, 2019 at 4:27 PM
> Guozhang
> > > > Wang
> > > > > <
> > > > > > > > > > >> > >>> wangg...@gmail.com>
> > > > > > > > > > >> > >>> > >> > wrote:
> > > > > > > > > > >> > >>> > >> > > >
> > > > > > > > > > >> > >>> > >> > > > > Hello Sophie,
> > > > > > > > > > >> > >>> > >> > > > >
> > > > > > > > > > >> > >>> > >> > > > > Thanks for the proposed KIP. I left
> > > some
> > > > > > > comments
> > > > > > > > > on
> > > > > > > > > > >> the
> > > > > > > > > > >> > >>> wiki
> > > > > > > > > > >> > >>> > >> itself,
> > > > > > > > > > >> > >>> > >> > > > and I
> > > > > > > > > > >> > >>> > >> > > > > think I'm still not very clear on a
> > > > couple
> > > > > or
> > > > > > > > > those:
> > > > > > > > > > >> > >>> > >> > > > >
> > > > > > > > > > >> > >>> > >> > > > > 1. With this proposal, does that
> mean
> > > > with
> > > > > > > > > > >> > >>> num.standby.replicas
> > > > > > > > > > >> > >>> > ==
> > > > > > > > > > >> > >>> > >> > 0, we
> > > > > > > > > > >> > >>> > >> > > > > may sometimes still have some
> standby
> > > > tasks
> > > > > > > which
> > > > > > > > > may
> > > > > > > > > > >> > >>> violate
> > > > > > > > > > >> > >>> > the
> > > > > > > > > > >> > >>> > >> > config?
> > > > > > > > > > >> > >>> > >> > > > >
> > > > > > > > > > >> > >>> > >> > > > > 2. I think I understand the
> rationale
> > > to
> > > > > > > consider
> > > > > > > > > > lags
> > > > > > > > > > >> > that
> > > > > > > > > > >> > >>> is
> > > > > > > > > > >> > >>> > >> below
> > > > > > > > > > >> > >>> > >> > the
> > > > > > > > > > >> > >>> > >> > > > > specified threshold to be equal,
> > rather
> > > > > than
> > > > > > > still
> > > > > > > > > > >> > >>> considering
> > > > > > > > > > >> > >>> > >> 5000
> > > > > > > > > > >> > >>> > >> > is
> > > > > > > > > > >> > >>> > >> > > > > better than 5001 -- we do not want
> to
> > > > > > > > > "over-optimize"
> > > > > > > > > > >> and
> > > > > > > > > > >> > >>> > >> potentially
> > > > > > > > > > >> > >>> > >> > > > falls
> > > > > > > > > > >> > >>> > >> > > > > into endless rebalances back and
> > forth.
> > > > > > > > > > >> > >>> > >> > > > >
> > > > > > > > > > >> > >>> > >> > > > > But I'm not clear about the
> rationale
> > > of
> > > > > the
> > > > > > > second
> > > > > > > > > > >> > >>> parameter of
> > > > > > > > > > >> > >>> > >> > > > >
> > > > > > > > > > >> > >>>
> > > > > > > constrainedBalancedAssignment(StatefulTasksToRankedCandidates,
> > > > > > > > > > >> > >>> > >> > > > > balance_factor):
> > > > > > > > > > >> > >>> > >> > > > >
> > > > > > > > > > >> > >>> > >> > > > > Does that mean, e.g. with
> > > balance_factor
> > > > of
> > > > > > 3,
> > > > > > > we'd
> > > > > > > > > > >> > >>> consider two
> > > > > > > > > > >> > >>> > >> > > > > assignments one resulting
> > > balance_factor
> > > > 0
> > > > > > and
> > > > > > > one
> > > > > > > > > > >> > resulting
> > > > > > > > > > >> > >>> > >> > > > balance_factor
> > > > > > > > > > >> > >>> > >> > > > > 3 to be equally optimized
> assignment
> > > and
> > > > > > > therefore
> > > > > > > > > > may
> > > > > > > > > > >> > "stop
> > > > > > > > > > >> > >>> > >> early"?
> > > > > > > > > > >> > >>> > >> > This
> > > > > > > > > > >> > >>> > >> > > > > was not very convincing to me :P
> > > > > > > > > > >> > >>> > >> > > > >
> > > > > > > > > > >> > >>> > >> > > > > 3. There are a couple of minor
> > comments
> > > > > about
> > > > > > > the
> > > > > > > > > > >> > algorithm
> > > > > > > > > > >> > >>> > >> itself,
> > > > > > > > > > >> > >>> > >> > left
> > > > > > > > > > >> > >>> > >> > > > on
> > > > > > > > > > >> > >>> > >> > > > > the wiki page since it needs to
> refer
> > > to
> > > > > the
> > > > > > > exact
> > > > > > > > > > line
> > > > > > > > > > >> > and
> > > > > > > > > > >> > >>> > better
> > > > > > > > > > >> > >>> > >> > > > > displayed there.
> > > > > > > > > > >> > >>> > >> > > > >
> > > > > > > > > > >> > >>> > >> > > > > 3.a Another wild thought about the
> > > > > threshold
> > > > > > > > > itself:
> > > > > > > > > > >> today
> > > > > > > > > > >> > >>> the
> > > > > > > > > > >> > >>> > >> > assignment
> > > > > > > > > > >> > >>> > >> > > > > itself is memoryless, so we would
> not
> > > > know
> > > > > if
> > > > > > > the
> > > > > > > > > > >> reported
> > > > > > > > > > >> > >>> > >> `TaskLag`
> > > > > > > > > > >> > >>> > >> > > > itself
> > > > > > > > > > >> > >>> > >> > > > > is increasing or decreasing even if
> > the
> > > > > > current
> > > > > > > > > value
> > > > > > > > > > >> is
> > > > > > > > > > >> > >>> under
> > > > > > > > > > >> > >>> > the
> > > > > > > > > > >> > >>> > >> > > > > threshold. I wonder if it worthy to
> > > make
> > > > > it a
> > > > > > > bit
> > > > > > > > > > more
> > > > > > > > > > >> > >>> > >> complicated to
> > > > > > > > > > >> > >>> > >> > > > track
> > > > > > > > > > >> > >>> > >> > > > > task lag trend at the assignor?
> > > > Practically
> > > > > > it
> > > > > > > may
> > > > > > > > > > not
> > > > > > > > > > >> be
> > > > > > > > > > >> > >>> very
> > > > > > > > > > >> > >>> > >> > uncommon
> > > > > > > > > > >> > >>> > >> > > > > that stand-by tasks are not keeping
> > up
> > > > due
> > > > > to
> > > > > > > the
> > > > > > > > > > fact
> > > > > > > > > > >> > that
> > > > > > > > > > >> > >>> > other
> > > > > > > > > > >> > >>> > >> > active
> > > > > > > > > > >> > >>> > >> > > > > tasks hosted on the same thread is
> > > > starving
> > > > > > the
> > > > > > > > > > standby
> > > > > > > > > > >> > >>> tasks.
> > > > > > > > > > >> > >>> > >> > > > >
> > > > > > > > > > >> > >>> > >> > > > > 4. There's a potential race
> condition
> > > > risk
> > > > > > when
> > > > > > > > > > >> reporting
> > > > > > > > > > >> > >>> > >> `TaskLags`
> > > > > > > > > > >> > >>> > >> > in
> > > > > > > > > > >> > >>> > >> > > > the
> > > > > > > > > > >> > >>> > >> > > > > subscription: right after reporting
> > it
> > > to
> > > > > the
> > > > > > > > > leader,
> > > > > > > > > > >> the
> > > > > > > > > > >> > >>> > cleanup
> > > > > > > > > > >> > >>> > >> > thread
> > > > > > > > > > >> > >>> > >> > > > > kicks in and deletes the state
> > > directory.
> > > > > If
> > > > > > > the
> > > > > > > > > task
> > > > > > > > > > >> was
> > > > > > > > > > >> > >>> > assigned
> > > > > > > > > > >> > >>> > >> > to the
> > > > > > > > > > >> > >>> > >> > > > > host it would cause it to restore
> > from
> > > > > > > beginning
> > > > > > > > > and
> > > > > > > > > > >> > >>> effectively
> > > > > > > > > > >> > >>> > >> > make the
> > > > > > > > > > >> > >>> > >> > > > > seemingly optimized assignment very
> > > > > > > sub-optimal.
> > > > > > > > > > >> > >>> > >> > > > >
> > > > > > > > > > >> > >>> > >> > > > > To be on the safer side we should
> > > > consider
> > > > > > > either
> > > > > > > > > > prune
> > > > > > > > > > >> > out
> > > > > > > > > > >> > >>> > those
> > > > > > > > > > >> > >>> > >> > tasks
> > > > > > > > > > >> > >>> > >> > > > > that are "close to be cleaned up"
> in
> > > the
> > > > > > > > > > subscription,
> > > > > > > > > > >> or
> > > > > > > > > > >> > we
> > > > > > > > > > >> > >>> > >> should
> > > > > > > > > > >> > >>> > >> > delay
> > > > > > > > > > >> > >>> > >> > > > > the cleanup right after we've
> > included
> > > > them
> > > > > > in
> > > > > > > the
> > > > > > > > > > >> > >>> subscription
> > > > > > > > > > >> > >>> > in
> > > > > > > > > > >> > >>> > >> > case
> > > > > > > > > > >> > >>> > >> > > > > they are been selected as assigned
> > > tasks
> > > > by
> > > > > > the
> > > > > > > > > > >> assignor.
> > > > > > > > > > >> > >>> > >> > > > >
> > > > > > > > > > >> > >>> > >> > > > > 5. This is a meta comment: I think
> it
> > > > would
> > > > > > be
> > > > > > > > > > helpful
> > > > > > > > > > >> to
> > > > > > > > > > >> > >>> add
> > > > > > > > > > >> > >>> > some
> > > > > > > > > > >> > >>> > >> > user
> > > > > > > > > > >> > >>> > >> > > > > visibility on the standby tasks
> > lagging
> > > > as
> > > > > > > well,
> > > > > > > > > via
> > > > > > > > > > >> > >>> metrics for
> > > > > > > > > > >> > >>> > >> > example.
> > > > > > > > > > >> > >>> > >> > > > > Today it is hard for us to observe
> > how
> > > > far
> > > > > > are
> > > > > > > our
> > > > > > > > > > >> current
> > > > > > > > > > >> > >>> > standby
> > > > > > > > > > >> > >>> > >> > tasks
> > > > > > > > > > >> > >>> > >> > > > > compared to the active tasks and
> > > whether
> > > > > that
> > > > > > > lag
> > > > > > > > > is
> > > > > > > > > > >> being
> > > > > > > > > > >> > >>> > >> > increasing or
> > > > > > > > > > >> > >>> > >> > > > > decreasing. As a follow-up task,
> for
> > > > > example,
> > > > > > > the
> > > > > > > > > > >> > rebalance
> > > > > > > > > > >> > >>> > should
> > > > > > > > > > >> > >>> > >> > also
> > > > > > > > > > >> > >>> > >> > > > be
> > > > > > > > > > >> > >>> > >> > > > > triggered if we realize that some
> > > standby
> > > > > > > task's
> > > > > > > > > lag
> > > > > > > > > > is
> > > > > > > > > > >> > >>> > increasing
> > > > > > > > > > >> > >>> > >> > > > > indefinitely means that it cannot
> > keep
> > > up
> > > > > > > (which is
> > > > > > > > > > >> > another
> > > > > > > > > > >> > >>> > >> indicator
> > > > > > > > > > >> > >>> > >> > > > > either you need to add more
> resources
> > > > with
> > > > > > the
> > > > > > > > > > >> > num.standbys
> > > > > > > > > > >> > >>> or
> > > > > > > > > > >> > >>> > >> your
> > > > > > > > > > >> > >>> > >> > are
> > > > > > > > > > >> > >>> > >> > > > > still not balanced enough).
> > > > > > > > > > >> > >>> > >> > > > >
> > > > > > > > > > >> > >>> > >> > > > >
> > > > > > > > > > >> > >>> > >> > > > > On Tue, Aug 6, 2019 at 1:32 PM
> Sophie
> > > > > > > Blee-Goldman
> > > > > > > > > <
> > > > > > > > > > >> > >>> > >> > sop...@confluent.io>
> > > > > > > > > > >> > >>> > >> > > > > wrote:
> > > > > > > > > > >> > >>> > >> > > > >
> > > > > > > > > > >> > >>> > >> > > > > > Hey all,
> > > > > > > > > > >> > >>> > >> > > > > >
> > > > > > > > > > >> > >>> > >> > > > > > I'd like to kick off discussion
> on
> > > > > KIP-441,
> > > > > > > aimed
> > > > > > > > > > at
> > > > > > > > > > >> the
> > > > > > > > > > >> > >>> long
> > > > > > > > > > >> > >>> > >> > restore
> > > > > > > > > > >> > >>> > >> > > > > times
> > > > > > > > > > >> > >>> > >> > > > > > in Streams during which further
> > > active
> > > > > > > processing
> > > > > > > > > > >> and IQ
> > > > > > > > > > >> > >>> are
> > > > > > > > > > >> > >>> > >> > blocked.
> > > > > > > > > > >> > >>> > >> > > > > > Please give it a read and let us
> > know
> > > > > your
> > > > > > > > > thoughts
> > > > > > > > > > >> > >>> > >> > > > > >
> > > > > > > > > > >> > >>> > >> > > > > >
> > > > > > > > > > >> > >>> > >> > > > > >
> > > > > > > > > > >> > >>> > >> > > > >
> > > > > > > > > > >> > >>> > >> > > >
> > > > > > > > > > >> > >>> > >> >
> > > > > > > > > > >> > >>> > >>
> > > > > > > > > > >> > >>> >
> > > > > > > > > > >> > >>>
> > > > > > > > > > >> >
> > > > > > > > > > >>
> > > > > > > > > >
> > > > > > > > >
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-441:+Smooth+Scaling+Out+for+Kafka+Streams
> > > > > > > > > > >> > >>> > >> > > > > >
> > > > > > > > > > >> > >>> > >> > > > > > Cheers,
> > > > > > > > > > >> > >>> > >> > > > > > Sophie
> > > > > > > > > > >> > >>> > >> > > > > >
> > > > > > > > > > >> > >>> > >> > > > >
> > > > > > > > > > >> > >>> > >> > > > >
> > > > > > > > > > >> > >>> > >> > > > > --
> > > > > > > > > > >> > >>> > >> > > > > -- Guozhang
> > > > > > > > > > >> > >>> > >> > > > >
> > > > > > > > > > >> > >>> > >> > > >
> > > > > > > > > > >> > >>> > >> > >
> > > > > > > > > > >> > >>> > >> > >
> > > > > > > > > > >> > >>> > >> > > --
> > > > > > > > > > >> > >>> > >> > > -- Guozhang
> > > > > > > > > > >> > >>> > >> >
> > > > > > > > > > >> > >>> > >>
> > > > > > > > > > >> > >>> > >>
> > > > > > > > > > >> > >>> > >> --
> > > > > > > > > > >> > >>> > >> -- Guozhang
> > > > > > > > > > >> > >>> > >>
> > > > > > > > > > >> > >>> > >
> > > > > > > > > > >> > >>> >
> > > > > > > > > > >> > >>>
> > > > > > > > > > >> > >>>
> > > > > > > > > > >> > >>> --
> > > > > > > > > > >> > >>> -- Guozhang
> > > > > > > > > > >> > >>>
> > > > > > > > > > >> > >>
> > > > > > > > > > >> >
> > > > > > > > > > >>
> > > > > > > > > > >>
> > > > > > > > > > >> --
> > > > > > > > > > >> -- Guozhang
> > > > > > > > > > >>
> > > > > > > > > > >
> > > > > > > > > >
> > > > > > > > >
> > > > > > > > >
> > > > > > > > > --
> > > > > > > > > -- Guozhang
> > > > > > > > >
> > > > > > >
> > > > > > >
> > > > > >
> > > > >
> > > > >
> > > > > --
> > > > > -- Guozhang
> > > > >
> > > >
> > >
> > >
> > > --
> > > -- Guozhang
> > >
> >
>
>
> --
> -- Guozhang
>

Reply via email to