I see; thanks for the clarification, Guozhang. If the prior owner of a non-logged, stateful task always reports 0, and all other instances report nothing, then it seems like we need a special case in the assignor to handle assigning these. I.e., the strategy of creating "movement" standbys and waiting for them to be equally caught up to the leader before moving the task with zero downtime doesn't work. Since no standby tasks are possible for non-logged tasks, the other instances would never be able to "catch up", and the algorithm would be doomed to just continue pinning the task to its current instance, and would potentially never be able to balance the cluster.
Unless, of course, we put in a special case that says to keep it assigned for a while and then move it. But in this case, how do we decide when to move it; it seems like any time is as good as any other. And if this is the case, then we might as well just move it right away, which is what the KIP currently proposes. Also, it seems like this creates an asymmetry with the handling of non-logged stores in mixed logged/non-logged tasks (case (1) above). In that case, we agree that the algorithm will make assignments without regard to the non-logged stores at all, and would freely move the task between two instances that are "equally" caught up on the logged stores. But if a task contains only non-logged stores, it sounds like we would not have this freedom of movement, but would just keep giving it back to the instance that previously owned it? Regarding the latter comment, thanks for pointing out my oversight. I've added the section "Iterative Balancing Assignments" to define this behavior: https://cwiki.apache.org/confluence/display/KAFKA/KIP-441%3A+Smooth+Scaling+Out+for+Kafka+Streams#KIP-441:SmoothScalingOutforKafkaStreams-IterativeBalancingAssignments . Thanks! -John On Wed, Sep 4, 2019 at 5:58 PM Guozhang Wang <wangg...@gmail.com> wrote: > Hi John, > > I think I may mis-communicate my ideas a bit, what I meant is simply for > case 1), let the active tasks reporting a lag of 0 instead of not reporting > any lags at all; in addition, for other hosts not hosting this tasks, and > hence we do not know the offset lag at all we set the > "StatefulTasksToRankedCandidates[task][1] > := instance" just to make sure the current host is ranked the first while > all others are ranked equally after it. In this case we would still favor > stickiness for not moving such tasks out of its current host unless it > violates balance, in which case we are free to move it to any other hosts. > Does that make sense? > > Also I just realized that in the update wiki page the algorithm section of > iteratively assigning / moving tasks based on > StatefulTasksToRankedCandidates until convergence was missing (it was there > in the previous version), could you add it back? > > Guozhang > > > On Wed, Sep 4, 2019 at 2:59 PM John Roesler <j...@confluent.io> wrote: > > > Thanks for the reply, Guozhang, > > > > I'll start by re-stating your cases, just to make sure we're on the same > > page... > > > > 1) The task is stateful, and all its stores are non-logged. For this case > > under the proposal, we would not have any standbys and the active version > > would actually not report any lag at all (not a lag of 0). Thus, all > > instances would be considered equal when it comes to assignment, although > > the assignment logic would know that the task is "heavy" because it has > > those non-logged stores and factor that in to the overall cluster > balance. > > > > 2) The task is stateful and maybe has one logged and one non-logged > store. > > As you say, in this case, the non-logged store would not contribute at > all > > to anyone's reported lag. The active task would report a lag of 0, and > the > > standbys would report their lag on the logged store. The assignment logic > > would take these reported lags into account. > > > > It sounds like there might be some dissonance on point (1). It sounds > like > > you're saying we should try to assign non-logged stateful tasks back to > an > > instance that has previously hosted it, or maybe we should assign it back > > to the most recent instance that hosted it, and then only reassign it if > > the movement would affect balance. I guess I'm not opposed to this, but I > > also don't really see the advantage. The fact is that we'd still wind up > > migrating it much of the time, so no one could depend on it not getting > > migrated, and at the same time we're paying a significant complexity cost > > in the assignor to support this case. Plus, we need to encode the > previous > > owner of the task somehow in the wire protocol, which means we pay a > > payload-size penalty to support it as well. > > > > On the other hand, we should make our assignment as stable as possible in > > the implementation to avoid pointless "swapping" of logged and non-logged > > stateful tasks, as well as stateless tasks when it doesn't change the > > balance at all to do so. It seems like this should accomplish the same > > spiritual goal with no special cases. > > > > WDYT? > > -John > > > > On Wed, Sep 4, 2019 at 2:26 PM Guozhang Wang <wangg...@gmail.com> wrote: > > > > > Hello John, > > > > > > Your reasoning about non-logged state store looks good to me. But I > think > > > it worth considering two cases differently: 1) a task's all state > stores > > > are non-logged, 2) a task has non-logged state store but also have > other > > > logged state store. > > > > > > For 1), we should not have any standbys for such tasks at all, but it > is > > > not the same as a stateless task, because the only active task could > > report > > > a lag of "0" while all others should logically report the lag as > infinity > > > (of course, they would not encode such value). > > > > > > For 2), it is sort of the same to a normal case: active task reporting > a > > > lag of 0 while standby task reporting a lag summing all other logged > > state > > > stores, meaning that the non-logged state store does not matter in our > > > assignment semantics. > > > > > > The rationale I had is to effectively favor stickiness for those tasks > in > > > case 1) than arbitrarily reassign them even though it is true that for > a > > > non-logged store there's no durability and hence restoration guarantees > > > anyways from Streams. Then algorithmically we may consider assigning > > tasks > > > with descending order of the number of instances reporting lags for it, > > > then for such tasks there's always one candidate reporting a lag of 0, > > and > > > assigning them to any instance does not actually change the cumulated > > total > > > lag either. At that time we can decide "if it is still within load > > balance > > > factor, then let's respect stickiness, otherwise it's free to move". > > WDYT? > > > > > > > > > Guozhang > > > > > > On Wed, Sep 4, 2019 at 8:30 AM John Roesler <j...@confluent.io> wrote: > > > > > > > Hey Bruno, > > > > > > > > Thanks for taking another look. Some quick responses: > > > > > > > > 1) It just means the number of offsets in the topic. E.g., the LSO is > > > 100, > > > > but the first offset is 40 due to retention, so there are 60 offsets > in > > > the > > > > topic. Further, the lag on that topic would be considered to be 60 > for > > > any > > > > task that hadn't previously done any work on it. > > > > > > > > 2) This is undecidable in general. I.e., there's no way we can know > > > whether > > > > the store is remote or not, and hence whether we can freely assign it > > to > > > > another instance, or whether we have to keep it on the same instance. > > > > However, there are a couple of reasons to go ahead and assume we have > > the > > > > freedom to move such tasks. > > > > * We know that nothing can prevent the loss of an instance in a > cluster > > > > (I.e., this is true of all cloud environments, as well as any managed > > > > virtualized cluster like mesos or kubernetes), so any Streams program > > > that > > > > makes use of non-remote, non-logged state is doomed to lose its state > > > when > > > > it loses an instance. > > > > * If we take on a restriction that we cannot move such state between > > > > instances, we'd become overconstrained very quickly. Effectively, if > > you > > > > made use of non-logged stores, and we didn't assume freedom of > > movement, > > > > then we couldn't make use of any new instances in your cluster. > > > > * On the other hand, if we optimistically assume we can't move state, > > but > > > > only reassign it when we lose an instance, then we're supporting > > > > non-deterministic logic, because the program would produce different > > > > results, depending on whether you lost a node during the execution or > > > not. > > > > 2b) That last point goes along with your side note. I'm not sure if > we > > > > should bother dropping such state on every reassignment, though. It > > seems > > > > to be undefined territory enough that we can just do the simplest > thing > > > and > > > > assume people have made their own (external) provisions for > durability. > > > > I.e., when we say "non-logged", we mean that it doesn't make use of > > _our_ > > > > durability mechanism. I'm arguing that the only sane assumption is > that > > > > such folks have opted to use their own durability measures, and we > > should > > > > just assume it works with no special considerations in the assignment > > > > algorithm. > > > > > > > > 3) Good catch! I've fixed it. > > > > > > > > Thanks again! > > > > -John > > > > > > > > On Wed, Sep 4, 2019 at 6:09 AM Bruno Cadonna <br...@confluent.io> > > wrote: > > > > > > > > > Hi, > > > > > > > > > > 1) What do you mean with "full set of offsets in the topic"? Is > this > > > > > the sum of all offsets of the changelog partitions of the task? > > > > > > > > > > 2) I am not sure whether non-logged stateful tasks should be > > > > > effectively treated as stateless tasks during assignment. First we > > > > > need to decide whether a non-logged stateful task should preferably > > be > > > > > assigned to the same instance on which it just run in order to > > > > > continue to use its state or not. > > > > > > > > > > 3) In the example, you define stand-by tasks {S1, S2, ...} but > never > > > > > use them, because below you use a dedicated row for stand-by tasks. > > > > > > > > > > As a side note to 2) since it is not directly related to this KIP: > We > > > > > should decide if we want to avoid the possible non-determinism > > > > > introduced by non-logged stores or not. That is, if an instance > hosts > > > > > a task with non-logged stores then we can have two cases after the > > > > > next rebalance: a) the task stays on the same instance and > continues > > > > > to use the same state store as used so far or b) the task is > assigned > > > > > to another instance and it starts an empty state store. The > produced > > > > > results for these two cases might differ. To avoid the > > nondeterminism, > > > > > non-logged state stores would need to be wiped out before > assignment. > > > > > Then the question arises, how the removal of non-logged state > stores > > > > > before assignment would affect backward-compatibility. > > > > > > > > > > Best, > > > > > Bruno > > > > > > > > > > On Wed, Aug 21, 2019 at 11:40 PM John Roesler <j...@confluent.io> > > > wrote: > > > > > > > > > > > > Hi Guozhang, > > > > > > > > > > > > > My impression from your previous email is that inside the > > algorithm > > > > > when > > > > > > we > > > > > > are "filling" them to instances some deterministic logic would be > > > used > > > > to > > > > > > avoid the above case, is that correct? > > > > > > > > > > > > Yes, that was my plan, but I didn't formalize it. There was a > > > > requirement > > > > > > that the assignment algorithm must not produce a new assignment > if > > > the > > > > > > current assignment is already balanced, so at the least, any > > > thrashing > > > > > > would be restricted to the "balancing" phase while tasks are > moving > > > > > around > > > > > > the cluster. > > > > > > > > > > > > Anyway, I think it would be good to say that we'll "try to" > produce > > > > > stable > > > > > > assignments, so I've added a "should" clause to the assignment > > spec: > > > > > > > > > > > > > > > > > > > > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-441%3A+Smooth+Scaling+Out+for+Kafka+Streams#KIP-441:SmoothScalingOutforKafkaStreams-AssignmentAlgorithm > > > > > > > > > > > > For example, we would sort the stateless tasks and available > > > instances > > > > > > before assigning them, so that the stateless task assignment > would > > > > mostly > > > > > > stay stable between assignments, modulo the compute capacity of > the > > > > > > instances changing a little as active stateful tasks get assigned > > in > > > > more > > > > > > balanced ways. > > > > > > > > > > > > Thanks, > > > > > > -John > > > > > > > > > > > > > > > > > > On Wed, Aug 21, 2019 at 1:55 PM Guozhang Wang < > wangg...@gmail.com> > > > > > wrote: > > > > > > > > > > > > > Hello John, > > > > > > > > > > > > > > That sounds reasonable. Just double checked the code that with > > > > logging > > > > > > > disabled the corresponding checkpoint file would not contain > any > > > > > values, > > > > > > > just like a stateless task. So I think treating them logically > > the > > > > > same is > > > > > > > fine. > > > > > > > > > > > > > > Guozhang > > > > > > > > > > > > > > > > > > > > > On Wed, Aug 21, 2019 at 11:41 AM John Roesler < > j...@confluent.io > > > > > > > > wrote: > > > > > > > > > > > > > > > Hi again, Guozhang, > > > > > > > > > > > > > > > > While writing up the section on stateless tasks ( > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-441%3A+Smooth+Scaling+Out+for+Kafka+Streams#KIP-441:SmoothScalingOutforKafkaStreams-Statelesstasks > > > > > > > > ), > > > > > > > > I reconsidered whether stateful, but non-logged, tasks should > > > > > actually > > > > > > > > report a lag of zero, versus not reporting any lag. By the > > > > > definition of > > > > > > > > the "StatefulTasksToRankedCandidates" function, the leader > > would > > > > > compute > > > > > > > a > > > > > > > > lag of zero for these tasks anyway. > > > > > > > > > > > > > > > > Therefore, I think the same reasoning that I supplied you for > > > > > stateless > > > > > > > > tasks applies, since the member and leader will agree on a > lag > > of > > > > > zero > > > > > > > > anyway, we can avoid adding them to the "Task Lags" map, and > > save > > > > > some > > > > > > > > bytes in the JoinGroup request. This would be especially > > > beneficial > > > > > in an > > > > > > > > application that uses remote stores for _all_ its state > stores, > > > it > > > > > would > > > > > > > > have an extremely lightweight JoinGroup request, with no task > > > lags > > > > at > > > > > > > all. > > > > > > > > > > > > > > > > WDYT? > > > > > > > > -John > > > > > > > > > > > > > > > > On Wed, Aug 21, 2019 at 1:17 PM John Roesler < > > j...@confluent.io> > > > > > wrote: > > > > > > > > > > > > > > > > > Thanks, Guozhang. > > > > > > > > > > > > > > > > > > (Side note: I noticed on another pass over the discussion > > that > > > > I'd > > > > > > > missed > > > > > > > > > addressing your comment about the potential race condition > > > > between > > > > > > > state > > > > > > > > > cleanup and lag-based assignment. I've added a solution to > > the > > > > > > > proposal: > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-441%3A+Smooth+Scaling+Out+for+Kafka+Streams#KIP-441:SmoothScalingOutforKafkaStreams-Racebetweenassignmentandstatecleanup > > > > > > > > > ) > > > > > > > > > > > > > > > > > > In the JoinGroup (SubscriptionInfo) metadata, stateless > tasks > > > are > > > > > not > > > > > > > > > represented at all. This should save us some bytes in the > > > request > > > > > > > > metadata. > > > > > > > > > If we treated them like non-logged stateful tasks and > > reported > > > a > > > > > lag of > > > > > > > > 0, > > > > > > > > > the only difference is that the assignor would be able to > > tell > > > > > which > > > > > > > > > members previously hosted that stateless task. > > > > > > > > > > > > > > > > > > I'd like to make a simplifying assumption that stateless > > tasks > > > > can > > > > > just > > > > > > > > be > > > > > > > > > freely reassigned with no regard to stickiness at all, > > without > > > > > > > impacting > > > > > > > > > performance. This is almost true. In fact, while assigned a > > > > > stateless > > > > > > > > task, > > > > > > > > > a member fetches batches of records from the broker, so if > we > > > > move > > > > > the > > > > > > > > > stateless task assignment, this buffered input is wasted > and > > > just > > > > > gets > > > > > > > > > dropped. > > > > > > > > > > > > > > > > > > However, we won't be moving the stateless tasks around all > > the > > > > time > > > > > > > (just > > > > > > > > > during rebalances), and we have the requirement that the > > > > assigment > > > > > > > > > algorithm must stabilize to guard against perpetually > > > shuffling a > > > > > > > > stateless > > > > > > > > > task from one node to another. So, my hope is that this > small > > > > > amount of > > > > > > > > > inefficiency would not be a performance-dominating factor. > In > > > > > exchange, > > > > > > > > we > > > > > > > > > gain the opportunity for the assignment algorithm to use > the > > > > > stateless > > > > > > > > > tasks as "filler" during unbalanced assignments. For > example, > > > if > > > > > there > > > > > > > > is a > > > > > > > > > node that is just warming up with several standby tasks, > > maybe > > > > the > > > > > > > > > assignment can give more stateless tasks to that node to > > > balance > > > > > the > > > > > > > > > computational load across the cluster. > > > > > > > > > > > > > > > > > > It's worth noting that such an assignment would still not > be > > > > > considered > > > > > > > > > "balanced", so the ultimately balanced final state of the > > > > > assignment > > > > > > > > (after > > > > > > > > > task movements) would still have the desired property that > > each > > > > > > > stateful > > > > > > > > > and stateless task is evenly spread across the cluster. > > > > > > > > > > > > > > > > > > Does that seem reasonable? > > > > > > > > > > > > > > > > > > Thanks, > > > > > > > > > -John > > > > > > > > > > > > > > > > > > On Wed, Aug 21, 2019 at 11:22 AM Guozhang Wang < > > > > wangg...@gmail.com > > > > > > > > > > > > > > wrote: > > > > > > > > > > > > > > > > > >> Hello John, > > > > > > > > >> > > > > > > > > >> I've made another pass on the wiki page again, overall > LGTM. > > > One > > > > > meta > > > > > > > > >> comment about the "stateless" tasks: how do we represent > > them > > > in > > > > > the > > > > > > > > >> metadata? Are they just treated as stateful tasks with > > logging > > > > > > > disabled, > > > > > > > > >> or > > > > > > > > >> are specially handled? It is not very clear in the > > > description. > > > > > > > > >> > > > > > > > > >> > > > > > > > > >> Guozhang > > > > > > > > >> > > > > > > > > >> On Wed, Aug 21, 2019 at 8:43 AM John Roesler < > > > j...@confluent.io > > > > > > > > > > > > wrote: > > > > > > > > >> > > > > > > > > >> > I have also specifically called out that the assignment > > must > > > > > achieve > > > > > > > > >> both > > > > > > > > >> > "instance" and "task" balance: > > > > > > > > >> > > > > > > > > > >> > > > > > > > > > >> > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-441%3A+Smooth+Scaling+Out+for+Kafka+Streams#KIP-441:SmoothScalingOutforKafkaStreams-Defining%22balance%22 > > > > > > > > >> > > > > > > > > > >> > I've also addressed the problem of state stores with > > logging > > > > > > > disabled: > > > > > > > > >> > > > > > > > > > >> > > > > > > > > > >> > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-441%3A+Smooth+Scaling+Out+for+Kafka+Streams#KIP-441:SmoothScalingOutforKafkaStreams-Statewithloggingdisabled > > > > > > > > >> > > > > > > > > > >> > I believe this addresses all the concerns that have been > > > > raised > > > > > to > > > > > > > > date. > > > > > > > > >> > Apologies if I've overlooked one of your concerns. > > > > > > > > >> > > > > > > > > > >> > Please give the KIP another read and let me know of any > > > > further > > > > > > > > >> thoughts! > > > > > > > > >> > Hopefully, we can start the voting on this KIP by the > end > > of > > > > the > > > > > > > week. > > > > > > > > >> > > > > > > > > > >> > Thanks, > > > > > > > > >> > -John > > > > > > > > >> > > > > > > > > > >> > On Tue, Aug 20, 2019 at 5:16 PM John Roesler < > > > > j...@confluent.io > > > > > > > > > > > > > > wrote: > > > > > > > > >> > > > > > > > > > >> > > In response to Bruno's concern #2, I've also added > that > > > > > section to > > > > > > > > the > > > > > > > > >> > > "Rejected Alternatives" section. > > > > > > > > >> > > > > > > > > > > >> > > Additionally, after reviewing some other assignment > > > papers, > > > > > I've > > > > > > > > >> > developed > > > > > > > > >> > > the concern that specifying which "phases" the > > assignment > > > > > > > algorithm > > > > > > > > >> > should > > > > > > > > >> > > have, or indeed the logic of it at all, might be a > > mistake > > > > > that > > > > > > > > >> > > over-constrains our ability to write an optimal > > algorithm. > > > > > > > > Therefore, > > > > > > > > >> > I've > > > > > > > > >> > > also refactored the KIP to just describe the protocol, > > and > > > > > specify > > > > > > > > the > > > > > > > > >> > > requirements for the assignment algorithm, but not its > > > exact > > > > > > > > behavior > > > > > > > > >> at > > > > > > > > >> > > all. > > > > > > > > >> > > > > > > > > > > >> > > Thanks, > > > > > > > > >> > > -John > > > > > > > > >> > > > > > > > > > > >> > > On Tue, Aug 20, 2019 at 5:13 PM John Roesler < > > > > > j...@confluent.io> > > > > > > > > >> wrote: > > > > > > > > >> > > > > > > > > > > >> > >> Hi All, > > > > > > > > >> > >> > > > > > > > > >> > >> Thanks for the discussion. I've been considering the > > idea > > > > of > > > > > > > giving > > > > > > > > >> the > > > > > > > > >> > >> "catching up" tasks a different name/role. I was in > > favor > > > > > > > > initially, > > > > > > > > >> but > > > > > > > > >> > >> after working though some details, I think it causes > > some > > > > > > > problems, > > > > > > > > >> > which > > > > > > > > >> > >> I've written up in the "rejected alternatives" part > of > > > the > > > > > KIP: > > > > > > > > >> > >> > > > > > > > > >> > > > > > > > > > >> > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-441%3A+Smooth+Scaling+Out+for+Kafka+Streams#KIP-441:SmoothScalingOutforKafkaStreams-Addinganewkindoftask(%22moving%22,%22recovering%22,%22learner%22)fortaskmovements > > > > > > > > >> > >> > > > > > > > > >> > >> Please give it a read and let me know what you think. > > > > > > > > >> > >> > > > > > > > > >> > >> Thanks, > > > > > > > > >> > >> -John > > > > > > > > >> > >> > > > > > > > > >> > >> On Thu, Aug 8, 2019 at 7:57 PM Guozhang Wang < > > > > > wangg...@gmail.com > > > > > > > > > > > > > > > > >> > wrote: > > > > > > > > >> > >> > > > > > > > > >> > >>> I think I agree with you Sophie. My gut feeling is > > that > > > 1) > > > > > it > > > > > > > > should > > > > > > > > >> > not > > > > > > > > >> > >>> be > > > > > > > > >> > >>> the major concern in assignor's algorithm for > standby > > > > tasks > > > > > not > > > > > > > > >> > catching > > > > > > > > >> > >>> up, but rather be tackled in different modules, and > > 2) a > > > > > lot of > > > > > > > > >> > >>> optimization can be down at the stream thread > itself, > > > like > > > > > > > > dedicated > > > > > > > > >> > >>> threading and larger batching, or even complicated > > > > > scheduling > > > > > > > > >> > mechanisms > > > > > > > > >> > >>> between running, restoring and standby tasks. In > > > anyways, > > > > I > > > > > > > think > > > > > > > > we > > > > > > > > >> > can > > > > > > > > >> > >>> take this out of the scope of KIP-441 for now. > > > > > > > > >> > >>> > > > > > > > > >> > >>> > > > > > > > > >> > >>> Guozhang > > > > > > > > >> > >>> > > > > > > > > >> > >>> > > > > > > > > >> > >>> On Thu, Aug 8, 2019 at 5:48 PM Sophie Blee-Goldman < > > > > > > > > >> > sop...@confluent.io> > > > > > > > > >> > >>> wrote: > > > > > > > > >> > >>> > > > > > > > > >> > >>> > > we may have other ways to not starving the > standby > > > > > tasks, > > > > > > > for > > > > > > > > >> > >>> example, by > > > > > > > > >> > >>> > > using dedicate threads for standby tasks or even > > > > > consider > > > > > > > > having > > > > > > > > >> > >>> > *higher> priority for standby than active* so that > > we > > > > > always > > > > > > > try > > > > > > > > >> to > > > > > > > > >> > >>> caught > > > > > > > > >> > >>> > up standby > > > > > > > > >> > >>> > > first, then process active > > > > > > > > >> > >>> > > > > > > > > > >> > >>> > This is an interesting idea, but seems likely to > get > > > in > > > > > the > > > > > > > way > > > > > > > > of > > > > > > > > >> > the > > > > > > > > >> > >>> > original idea of this KIP > > > > > > > > >> > >>> > -- if we always process standby tasks first, then > if > > > we > > > > > are > > > > > > > > >> assigned > > > > > > > > >> > a > > > > > > > > >> > >>> new > > > > > > > > >> > >>> > standby task we > > > > > > > > >> > >>> > will have to wait for it to catch up completely > > before > > > > > > > > processing > > > > > > > > >> any > > > > > > > > >> > >>> > active tasks! That's > > > > > > > > >> > >>> > even worse than the situation this KIP is trying > to > > > help > > > > > with, > > > > > > > > >> since > > > > > > > > >> > a > > > > > > > > >> > >>> new > > > > > > > > >> > >>> > standby task has to > > > > > > > > >> > >>> > restore from 0 (whereas an active task at least > can > > > take > > > > > over > > > > > > > > from > > > > > > > > >> > >>> wherever > > > > > > > > >> > >>> > the standby was). > > > > > > > > >> > >>> > > > > > > > > > >> > >>> > During restoration -- while there exist any > > restoring > > > > > tasks > > > > > > > -- I > > > > > > > > >> > think > > > > > > > > >> > >>> it's > > > > > > > > >> > >>> > reasonable to de-prioritize the > > > > > > > > >> > >>> > standby tasks and just process restoring and > active > > > > tasks > > > > > so > > > > > > > > both > > > > > > > > >> can > > > > > > > > >> > >>> make > > > > > > > > >> > >>> > progress. But we should > > > > > > > > >> > >>> > let them catch up afterwards somehow -- maybe we > can > > > > apply > > > > > > > some > > > > > > > > >> kind > > > > > > > > >> > of > > > > > > > > >> > >>> > heuristic, like "if we haven't > > > > > > > > >> > >>> > processed standbys for X iterations, or Y > > > milliseconds, > > > > > do so > > > > > > > > >> now." > > > > > > > > >> > >>> > > > > > > > > > >> > >>> > Actually, it might even be beneficial to avoid > > > > processing > > > > > > > > >> standbys a > > > > > > > > >> > >>> record > > > > > > > > >> > >>> > or two at a time and instead > > > > > > > > >> > >>> > wait for a large enough batch to build up for the > > > > RocksDB > > > > > > > > >> > bulk-loading > > > > > > > > >> > >>> > benefits. > > > > > > > > >> > >>> > > > > > > > > > >> > >>> > I think the "use dedicated threads for standby" is > > the > > > > > more > > > > > > > > >> promising > > > > > > > > >> > >>> end > > > > > > > > >> > >>> > goal, especially since > > > > > > > > >> > >>> > if we split restoration into "restoring tasks" > then > > > > > active and > > > > > > > > >> > standbys > > > > > > > > >> > >>> > share almost nothing. But > > > > > > > > >> > >>> > that seems like follow-up work to the current KIP > :) > > > > > > > > >> > >>> > > > > > > > > > >> > >>> > On Thu, Aug 8, 2019 at 5:31 PM Sophie > Blee-Goldman < > > > > > > > > >> > >>> sop...@confluent.io> > > > > > > > > >> > >>> > wrote: > > > > > > > > >> > >>> > > > > > > > > > >> > >>> > > Stateful tasks with logging disabled seem to be > an > > > > > > > interesting > > > > > > > > >> edge > > > > > > > > >> > >>> case. > > > > > > > > >> > >>> > > On the one hand, > > > > > > > > >> > >>> > > for balancing purposes they should be considered > > > > > stateful > > > > > > > > since > > > > > > > > >> as > > > > > > > > >> > >>> > > Guozhang pointed out > > > > > > > > >> > >>> > > they are still "heavy" in IO costs. But for > > > "catching > > > > > up" > > > > > > > > >> purposes, > > > > > > > > >> > >>> ie > > > > > > > > >> > >>> > > when allocating standby > > > > > > > > >> > >>> > > tasks that will become active tasks, they should > > be > > > > > > > considered > > > > > > > > >> > >>> stateless > > > > > > > > >> > >>> > > as there is so > > > > > > > > >> > >>> > > meaningful sense of their lag. We should never > > > > allocate > > > > > > > > standby > > > > > > > > >> > >>> tasks for > > > > > > > > >> > >>> > > them during the > > > > > > > > >> > >>> > > first rebalance, but should ensure they are > evenly > > > > > > > distributed > > > > > > > > >> > across > > > > > > > > >> > >>> > > instances. Maybe we > > > > > > > > >> > >>> > > should split these into a third category -- > after > > we > > > > > assign > > > > > > > > all > > > > > > > > >> > >>> stateful > > > > > > > > >> > >>> > > tasks with logging, we > > > > > > > > >> > >>> > > then distribute the set of logging-disabled > > stateful > > > > > tasks > > > > > > > to > > > > > > > > >> > improve > > > > > > > > >> > >>> > > balance, before lastly > > > > > > > > >> > >>> > > distributing stateless tasks? > > > > > > > > >> > >>> > > > > > > > > > > >> > >>> > > This actually leads into what I was just > thinking, > > > > > which is > > > > > > > > >> that we > > > > > > > > >> > >>> > really > > > > > > > > >> > >>> > > should distinguish the > > > > > > > > >> > >>> > > "catch-up" standbys from normal standbys as well > > as > > > > > > > > >> distinguishing > > > > > > > > >> > >>> > > actively processing tasks > > > > > > > > >> > >>> > > from active tasks that are still in the restore > > > phase. > > > > > It's > > > > > > > > >> > somewhat > > > > > > > > >> > >>> > > awkward that today, some > > > > > > > > >> > >>> > > active tasks just start processing immediately > > while > > > > > others > > > > > > > > >> behave > > > > > > > > >> > >>> more > > > > > > > > >> > >>> > > like standby than active > > > > > > > > >> > >>> > > tasks for some time, before switching to real > > > active. > > > > > They > > > > > > > > first > > > > > > > > >> > use > > > > > > > > >> > >>> the > > > > > > > > >> > >>> > > restoreConsumer, then > > > > > > > > >> > >>> > > later only the "normal" consumer. > > > > > > > > >> > >>> > > > > > > > > > > >> > >>> > > However, this restore period is still distinct > > from > > > > > normal > > > > > > > > >> standbys > > > > > > > > >> > >>> in a > > > > > > > > >> > >>> > > lot of ways -- the code path > > > > > > > > >> > >>> > > for restoring is different than for updating > > > standbys, > > > > > for > > > > > > > > >> example > > > > > > > > >> > >>> in how > > > > > > > > >> > >>> > > long we block on #poll. > > > > > > > > >> > >>> > > So in addition to giving them their own name -- > > > let's > > > > go > > > > > > > with > > > > > > > > >> > >>> restoring > > > > > > > > >> > >>> > > task for now -- they really > > > > > > > > >> > >>> > > do seem to deserve being their own distinct > task. > > We > > > > can > > > > > > > > >> optimize > > > > > > > > >> > >>> them > > > > > > > > >> > >>> > for > > > > > > > > >> > >>> > > efficient conversion > > > > > > > > >> > >>> > > to active tasks since we know that's what they > > will > > > > be. > > > > > > > > >> > >>> > > > > > > > > > > >> > >>> > > This resolves some of the awkwardness of dealing > > > with > > > > > the > > > > > > > > >> special > > > > > > > > >> > >>> case > > > > > > > > >> > >>> > > mentioned above: we > > > > > > > > >> > >>> > > find a balanced assignment of stateful and > > stateless > > > > > tasks, > > > > > > > > and > > > > > > > > >> > >>> create > > > > > > > > >> > >>> > > restoring tasks as needed. > > > > > > > > >> > >>> > > If logging is disabled, no restoring task is > > > created. > > > > > > > > >> > >>> > > > > > > > > > > >> > >>> > > > > > > > > > > >> > >>> > > On Thu, Aug 8, 2019 at 3:44 PM Guozhang Wang < > > > > > > > > >> wangg...@gmail.com> > > > > > > > > >> > >>> wrote: > > > > > > > > >> > >>> > > > > > > > > > > >> > >>> > >> Regarding 3) above: I think for active task > they > > > > should > > > > > > > still > > > > > > > > >> be > > > > > > > > >> > >>> > >> considered > > > > > > > > >> > >>> > >> stateful since the processor would still pay IO > > > cost > > > > > > > > accessing > > > > > > > > >> the > > > > > > > > >> > >>> > store, > > > > > > > > >> > >>> > >> but they would not have standby tasks? > > > > > > > > >> > >>> > >> > > > > > > > > >> > >>> > >> On Thu, Aug 8, 2019 at 7:49 AM Bruno Cadonna < > > > > > > > > >> br...@confluent.io> > > > > > > > > >> > >>> > wrote: > > > > > > > > >> > >>> > >> > > > > > > > > >> > >>> > >> > Hi, > > > > > > > > >> > >>> > >> > > > > > > > > > >> > >>> > >> > Thank you for the KIP! > > > > > > > > >> > >>> > >> > > > > > > > > > >> > >>> > >> > Some questions/comments: > > > > > > > > >> > >>> > >> > > > > > > > > > >> > >>> > >> > 1. I am wondering if the "stand-by" tasks > that > > > > catch > > > > > up > > > > > > > > state > > > > > > > > >> > >>> before > > > > > > > > >> > >>> > >> > the active task is switched deserve its own > > name > > > in > > > > > this > > > > > > > > KIP > > > > > > > > >> and > > > > > > > > >> > >>> maybe > > > > > > > > >> > >>> > >> > in the code. We have already stated that they > > are > > > > not > > > > > > > true > > > > > > > > >> > >>> stand-by > > > > > > > > >> > >>> > >> > tasks, they are not configured through > > > > > > > > >> `num.standby.replicas`, > > > > > > > > >> > and > > > > > > > > >> > >>> > >> > maybe they have also other properties that > > > > > distinguish > > > > > > > them > > > > > > > > >> from > > > > > > > > >> > >>> true > > > > > > > > >> > >>> > >> > stand-by tasks of which we are not aware yet. > > For > > > > > > > example, > > > > > > > > >> they > > > > > > > > >> > >>> may be > > > > > > > > >> > >>> > >> > prioritized differently than other tasks. > > > > > Furthermore, > > > > > > > the > > > > > > > > >> name > > > > > > > > >> > >>> > >> > "stand-by" does not really fit with the > planned > > > > > > > > >> functionality of > > > > > > > > >> > >>> those > > > > > > > > >> > >>> > >> > tasks. In the following, I will call them > false > > > > > stand-by > > > > > > > > >> tasks. > > > > > > > > >> > >>> > >> > > > > > > > > > >> > >>> > >> > 2. Did you consider to trigger the probing > > > > > rebalances not > > > > > > > > at > > > > > > > > >> > >>> regular > > > > > > > > >> > >>> > >> > time intervals but when the false stand-by > > tasks > > > > > reach an > > > > > > > > >> > >>> acceptable > > > > > > > > >> > >>> > >> > lag? If you did consider, could you add a > > > paragraph > > > > > why > > > > > > > you > > > > > > > > >> > >>> rejected > > > > > > > > >> > >>> > >> > this idea to the "Rejected Alternatives" > > section. > > > > > > > > >> > >>> > >> > > > > > > > > > >> > >>> > >> > 3. Are tasks that solely contain stores with > > > > disabled > > > > > > > > logging > > > > > > > > >> > >>> > >> > classified as stateful or stateless in the > > > > > algorithm? I > > > > > > > > would > > > > > > > > >> > >>> guess > > > > > > > > >> > >>> > >> > stateless, although if possible they should > be > > > > > assigned > > > > > > > to > > > > > > > > >> the > > > > > > > > >> > >>> same > > > > > > > > >> > >>> > >> > instance they had run before the rebalance. > As > > > far > > > > > as I > > > > > > > can > > > > > > > > >> see > > > > > > > > >> > >>> this > > > > > > > > >> > >>> > >> > special case is not handled in the algorithm. > > > > > > > > >> > >>> > >> > > > > > > > > > >> > >>> > >> > Best, > > > > > > > > >> > >>> > >> > Bruno > > > > > > > > >> > >>> > >> > > > > > > > > > >> > >>> > >> > > > > > > > > > >> > >>> > >> > > > > > > > > > >> > >>> > >> > On Thu, Aug 8, 2019 at 8:24 AM Guozhang Wang > < > > > > > > > > >> > wangg...@gmail.com> > > > > > > > > >> > >>> > >> wrote: > > > > > > > > >> > >>> > >> > > > > > > > > > > >> > >>> > >> > > 1. Sounds good, just wanted to clarify; and > > it > > > > may > > > > > > > worth > > > > > > > > >> > >>> documenting > > > > > > > > >> > >>> > >> it > > > > > > > > >> > >>> > >> > so > > > > > > > > >> > >>> > >> > > that users would not be surprised when > > > monitoring > > > > > their > > > > > > > > >> > >>> footprint. > > > > > > > > >> > >>> > >> > > > > > > > > > > >> > >>> > >> > > 2. Hmm I see... I think the trade-off can > be > > > > > described > > > > > > > as > > > > > > > > >> "how > > > > > > > > >> > >>> much > > > > > > > > >> > >>> > >> > > imbalance would bother you to be willing to > > pay > > > > > another > > > > > > > > >> > >>> rebalance, > > > > > > > > >> > >>> > >> along > > > > > > > > >> > >>> > >> > > with potentially more restoration lag", and > > the > > > > > current > > > > > > > > >> > >>> definition > > > > > > > > >> > >>> > of > > > > > > > > >> > >>> > >> > > rebalance_factor can be considered as a > rough > > > > > > > measurement > > > > > > > > >> of > > > > > > > > >> > >>> that > > > > > > > > >> > >>> > >> > > imbalance. Of course one can argue that a > > finer > > > > > grained > > > > > > > > >> > >>> measurement > > > > > > > > >> > >>> > >> could > > > > > > > > >> > >>> > >> > > be "resource footprint" like CPU / storage > of > > > > each > > > > > > > > instance > > > > > > > > >> > >>> like we > > > > > > > > >> > >>> > >> have > > > > > > > > >> > >>> > >> > in > > > > > > > > >> > >>> > >> > > Kafka broker auto balancing tools, but I'd > > > prefer > > > > > not > > > > > > > > doing > > > > > > > > >> > >>> that as > > > > > > > > >> > >>> > >> part > > > > > > > > >> > >>> > >> > of > > > > > > > > >> > >>> > >> > > the library but more as an operational tool > > in > > > > the > > > > > > > > future. > > > > > > > > >> On > > > > > > > > >> > >>> the > > > > > > > > >> > >>> > >> other > > > > > > > > >> > >>> > >> > > hand, I've seen stateful and stateless > tasks > > > > having > > > > > > > very > > > > > > > > >> > >>> different > > > > > > > > >> > >>> > >> load, > > > > > > > > >> > >>> > >> > > and sometimes the only bottleneck of a > > Streams > > > > app > > > > > is > > > > > > > > just > > > > > > > > >> one > > > > > > > > >> > >>> > >> stateful > > > > > > > > >> > >>> > >> > > sub-topology and whoever gets tasks of that > > > > > > > sub-topology > > > > > > > > >> > become > > > > > > > > >> > >>> > >> hotspot > > > > > > > > >> > >>> > >> > > (and that's why our algorithm tries to > > balance > > > > per > > > > > > > > >> > sub-topology > > > > > > > > >> > >>> as > > > > > > > > >> > >>> > >> well), > > > > > > > > >> > >>> > >> > > so maybe we can just consider stateful > tasks > > > when > > > > > > > > >> calculating > > > > > > > > >> > >>> this > > > > > > > > >> > >>> > >> factor > > > > > > > > >> > >>> > >> > > as a very brute force heuristic? > > > > > > > > >> > >>> > >> > > > > > > > > > > >> > >>> > >> > > 3.a. Thinking about this a bit more, maybe > > it's > > > > > better > > > > > > > > not > > > > > > > > >> try > > > > > > > > >> > >>> to > > > > > > > > >> > >>> > >> tackle > > > > > > > > >> > >>> > >> > an > > > > > > > > >> > >>> > >> > > unseen enemy just yet, and observe if it > > really > > > > > emerges > > > > > > > > >> later, > > > > > > > > >> > >>> and > > > > > > > > >> > >>> > by > > > > > > > > >> > >>> > >> > then > > > > > > > > >> > >>> > >> > > we may have other ways to not starving the > > > > standby > > > > > > > tasks, > > > > > > > > >> for > > > > > > > > >> > >>> > >> example, by > > > > > > > > >> > >>> > >> > > using dedicate threads for standby tasks or > > > even > > > > > > > consider > > > > > > > > >> > having > > > > > > > > >> > >>> > >> higher > > > > > > > > >> > >>> > >> > > priority for standby than active so that we > > > > always > > > > > try > > > > > > > to > > > > > > > > >> > >>> caught up > > > > > > > > >> > >>> > >> > standby > > > > > > > > >> > >>> > >> > > first, then process active; and if active's > > > > lagging > > > > > > > > >> compared > > > > > > > > >> > to > > > > > > > > >> > >>> > >> > > log-end-offset is increasing then we should > > > > > increase > > > > > > > > >> capacity, > > > > > > > > >> > >>> etc > > > > > > > > >> > >>> > >> etc. > > > > > > > > >> > >>> > >> > > > > > > > > > > >> > >>> > >> > > 4. Actually with KIP-429 this may not be > the > > > > case: > > > > > we > > > > > > > may > > > > > > > > >> not > > > > > > > > >> > >>> call > > > > > > > > >> > >>> > >> > > onPartitionsRevoked prior to rebalance any > > more > > > > so > > > > > > > would > > > > > > > > >> not > > > > > > > > >> > >>> transit > > > > > > > > >> > >>> > >> > state > > > > > > > > >> > >>> > >> > > to PARTITIONS_REVOKED, and hence not cause > > the > > > > > state of > > > > > > > > the > > > > > > > > >> > >>> instance > > > > > > > > >> > >>> > >> to > > > > > > > > >> > >>> > >> > be > > > > > > > > >> > >>> > >> > > REBALANCING. In other words, even if a > > instance > > > > is > > > > > > > > >> undergoing > > > > > > > > >> > a > > > > > > > > >> > >>> > >> rebalance > > > > > > > > >> > >>> > >> > > it's state may still be RUNNING and it may > > > still > > > > be > > > > > > > > >> processing > > > > > > > > >> > >>> > >> records at > > > > > > > > >> > >>> > >> > > the same time. > > > > > > > > >> > >>> > >> > > > > > > > > > > >> > >>> > >> > > > > > > > > > > >> > >>> > >> > > On Wed, Aug 7, 2019 at 12:14 PM John > Roesler > > < > > > > > > > > >> > j...@confluent.io > > > > > > > > >> > >>> > > > > > > > > > >> > >>> > >> wrote: > > > > > > > > >> > >>> > >> > > > > > > > > > > >> > >>> > >> > > > Hey Guozhang, > > > > > > > > >> > >>> > >> > > > > > > > > > > > >> > >>> > >> > > > Thanks for the review! > > > > > > > > >> > >>> > >> > > > > > > > > > > > >> > >>> > >> > > > 1. Yes, even with `num.standby.replicas > := > > > 0`, > > > > we > > > > > > > will > > > > > > > > >> still > > > > > > > > >> > >>> > >> > temporarily > > > > > > > > >> > >>> > >> > > > allocate standby tasks to accomplish a > > > > > no-downtime > > > > > > > task > > > > > > > > >> > >>> migration. > > > > > > > > >> > >>> > >> > > > Although, I'd argue that this doesn't > > really > > > > > violate > > > > > > > > the > > > > > > > > >> > >>> config, > > > > > > > > >> > >>> > as > > > > > > > > >> > >>> > >> the > > > > > > > > >> > >>> > >> > > > task isn't a true hot standby. As soon as > > it > > > > > catches > > > > > > > > up, > > > > > > > > >> > we'll > > > > > > > > >> > >>> > >> > rebalance > > > > > > > > >> > >>> > >> > > > again, that task will become active, and > > the > > > > > original > > > > > > > > >> > instance > > > > > > > > >> > >>> > that > > > > > > > > >> > >>> > >> > hosted > > > > > > > > >> > >>> > >> > > > the active task will no longer have the > > task > > > > > assigned > > > > > > > > at > > > > > > > > >> > all. > > > > > > > > >> > >>> Once > > > > > > > > >> > >>> > >> the > > > > > > > > >> > >>> > >> > > > stateDirCleaner kicks in, we'll free the > > disk > > > > > space > > > > > > > > from > > > > > > > > >> it, > > > > > > > > >> > >>> and > > > > > > > > >> > >>> > >> > return to > > > > > > > > >> > >>> > >> > > > the steady-state of having just one copy > of > > > the > > > > > task > > > > > > > in > > > > > > > > >> the > > > > > > > > >> > >>> > cluster. > > > > > > > > >> > >>> > >> > > > > > > > > > > > >> > >>> > >> > > > We can of course do without this, but I > > feel > > > > the > > > > > > > > current > > > > > > > > >> > >>> proposal > > > > > > > > >> > >>> > is > > > > > > > > >> > >>> > >> > > > operationally preferable, since it > doesn't > > > make > > > > > > > > >> configuring > > > > > > > > >> > >>> > >> > hot-standbys a > > > > > > > > >> > >>> > >> > > > pre-requisite for fast rebalances. > > > > > > > > >> > >>> > >> > > > > > > > > > > > >> > >>> > >> > > > 2. Yes, I think your interpretation is > what > > > we > > > > > > > > intended. > > > > > > > > >> The > > > > > > > > >> > >>> > default > > > > > > > > >> > >>> > >> > > > balance_factor would be 1, as it is > > > implicitly > > > > > today. > > > > > > > > >> What > > > > > > > > >> > >>> this > > > > > > > > >> > >>> > >> does is > > > > > > > > >> > >>> > >> > > > allows operators to trade off less > balanced > > > > > > > assignments > > > > > > > > >> > >>> against > > > > > > > > >> > >>> > >> fewer > > > > > > > > >> > >>> > >> > > > rebalances. If you have lots of space > > > capacity > > > > in > > > > > > > your > > > > > > > > >> > >>> instances, > > > > > > > > >> > >>> > >> this > > > > > > > > >> > >>> > >> > may > > > > > > > > >> > >>> > >> > > > be a perfectly fine tradeoff, and you may > > > > prefer > > > > > for > > > > > > > > >> Streams > > > > > > > > >> > >>> not > > > > > > > > >> > >>> > to > > > > > > > > >> > >>> > >> > bother > > > > > > > > >> > >>> > >> > > > streaming GBs of data from the broker in > > > > pursuit > > > > > of > > > > > > > > >> perfect > > > > > > > > >> > >>> > balance. > > > > > > > > >> > >>> > >> > Not > > > > > > > > >> > >>> > >> > > > married to this configuration, though. It > > was > > > > > > > inspired > > > > > > > > by > > > > > > > > >> > the > > > > > > > > >> > >>> > >> related > > > > > > > > >> > >>> > >> > work > > > > > > > > >> > >>> > >> > > > research we did. > > > > > > > > >> > >>> > >> > > > > > > > > > > > >> > >>> > >> > > > 3. I'll take a look > > > > > > > > >> > >>> > >> > > > > > > > > > > > >> > >>> > >> > > > 3a. I think this is a good idea. I'd > > classify > > > > it > > > > > as a > > > > > > > > >> type > > > > > > > > >> > of > > > > > > > > >> > >>> grey > > > > > > > > >> > >>> > >> > failure > > > > > > > > >> > >>> > >> > > > detection. It may make more sense to > tackle > > > > grey > > > > > > > > >> failures as > > > > > > > > >> > >>> part > > > > > > > > >> > >>> > of > > > > > > > > >> > >>> > >> > the > > > > > > > > >> > >>> > >> > > > heartbeat protocol (as I POCed here: > > > > > > > > >> > >>> > >> > > > > > > > https://github.com/apache/kafka/pull/7096/files > > > > > ). > > > > > > > > WDYT? > > > > > > > > >> > >>> > >> > > > > > > > > > > > >> > >>> > >> > > > 4. Good catch! I didn't think about that > > > > before. > > > > > > > > Looking > > > > > > > > >> at > > > > > > > > >> > it > > > > > > > > >> > >>> > now, > > > > > > > > >> > >>> > >> > though, > > > > > > > > >> > >>> > >> > > > I wonder if we're actually protected > > already. > > > > The > > > > > > > > >> > >>> stateDirCleaner > > > > > > > > >> > >>> > >> > thread > > > > > > > > >> > >>> > >> > > > only executes if the instance is in > RUNNING > > > > > state, > > > > > > > and > > > > > > > > >> > KIP-441 > > > > > > > > >> > >>> > >> > proposes to > > > > > > > > >> > >>> > >> > > > use "probing rebalances" to report task > > lag. > > > > > Hence, > > > > > > > > >> during > > > > > > > > >> > the > > > > > > > > >> > >>> > >> window > > > > > > > > >> > >>> > >> > > > between when the instance reports a lag > and > > > the > > > > > > > > assignor > > > > > > > > >> > >>> makes a > > > > > > > > >> > >>> > >> > decision > > > > > > > > >> > >>> > >> > > > about it, the instance should remain in > > > > > REBALANCING > > > > > > > > >> state, > > > > > > > > >> > >>> right? > > > > > > > > >> > >>> > If > > > > > > > > >> > >>> > >> > so, > > > > > > > > >> > >>> > >> > > > then this should prevent the race > > condition. > > > If > > > > > not, > > > > > > > > >> then we > > > > > > > > >> > >>> do > > > > > > > > >> > >>> > >> indeed > > > > > > > > >> > >>> > >> > need > > > > > > > > >> > >>> > >> > > > to do something about it. > > > > > > > > >> > >>> > >> > > > > > > > > > > > >> > >>> > >> > > > 5. Good idea. I think that today, you can > > > only > > > > > see > > > > > > > the > > > > > > > > >> > >>> consumer > > > > > > > > >> > >>> > lag, > > > > > > > > >> > >>> > >> > which > > > > > > > > >> > >>> > >> > > > is a poor substitute. I'll add some > metrics > > > to > > > > > the > > > > > > > > >> proposal. > > > > > > > > >> > >>> > >> > > > > > > > > > > > >> > >>> > >> > > > Thanks again for the comments! > > > > > > > > >> > >>> > >> > > > -John > > > > > > > > >> > >>> > >> > > > > > > > > > > > >> > >>> > >> > > > On Tue, Aug 6, 2019 at 4:27 PM Guozhang > > Wang > > > < > > > > > > > > >> > >>> wangg...@gmail.com> > > > > > > > > >> > >>> > >> > wrote: > > > > > > > > >> > >>> > >> > > > > > > > > > > > >> > >>> > >> > > > > Hello Sophie, > > > > > > > > >> > >>> > >> > > > > > > > > > > > > >> > >>> > >> > > > > Thanks for the proposed KIP. I left > some > > > > > comments > > > > > > > on > > > > > > > > >> the > > > > > > > > >> > >>> wiki > > > > > > > > >> > >>> > >> itself, > > > > > > > > >> > >>> > >> > > > and I > > > > > > > > >> > >>> > >> > > > > think I'm still not very clear on a > > couple > > > or > > > > > > > those: > > > > > > > > >> > >>> > >> > > > > > > > > > > > > >> > >>> > >> > > > > 1. With this proposal, does that mean > > with > > > > > > > > >> > >>> num.standby.replicas > > > > > > > > >> > >>> > == > > > > > > > > >> > >>> > >> > 0, we > > > > > > > > >> > >>> > >> > > > > may sometimes still have some standby > > tasks > > > > > which > > > > > > > may > > > > > > > > >> > >>> violate > > > > > > > > >> > >>> > the > > > > > > > > >> > >>> > >> > config? > > > > > > > > >> > >>> > >> > > > > > > > > > > > > >> > >>> > >> > > > > 2. I think I understand the rationale > to > > > > > consider > > > > > > > > lags > > > > > > > > >> > that > > > > > > > > >> > >>> is > > > > > > > > >> > >>> > >> below > > > > > > > > >> > >>> > >> > the > > > > > > > > >> > >>> > >> > > > > specified threshold to be equal, rather > > > than > > > > > still > > > > > > > > >> > >>> considering > > > > > > > > >> > >>> > >> 5000 > > > > > > > > >> > >>> > >> > is > > > > > > > > >> > >>> > >> > > > > better than 5001 -- we do not want to > > > > > > > "over-optimize" > > > > > > > > >> and > > > > > > > > >> > >>> > >> potentially > > > > > > > > >> > >>> > >> > > > falls > > > > > > > > >> > >>> > >> > > > > into endless rebalances back and forth. > > > > > > > > >> > >>> > >> > > > > > > > > > > > > >> > >>> > >> > > > > But I'm not clear about the rationale > of > > > the > > > > > second > > > > > > > > >> > >>> parameter of > > > > > > > > >> > >>> > >> > > > > > > > > > > > > >> > >>> > > > > > constrainedBalancedAssignment(StatefulTasksToRankedCandidates, > > > > > > > > >> > >>> > >> > > > > balance_factor): > > > > > > > > >> > >>> > >> > > > > > > > > > > > > >> > >>> > >> > > > > Does that mean, e.g. with > balance_factor > > of > > > > 3, > > > > > we'd > > > > > > > > >> > >>> consider two > > > > > > > > >> > >>> > >> > > > > assignments one resulting > balance_factor > > 0 > > > > and > > > > > one > > > > > > > > >> > resulting > > > > > > > > >> > >>> > >> > > > balance_factor > > > > > > > > >> > >>> > >> > > > > 3 to be equally optimized assignment > and > > > > > therefore > > > > > > > > may > > > > > > > > >> > "stop > > > > > > > > >> > >>> > >> early"? > > > > > > > > >> > >>> > >> > This > > > > > > > > >> > >>> > >> > > > > was not very convincing to me :P > > > > > > > > >> > >>> > >> > > > > > > > > > > > > >> > >>> > >> > > > > 3. There are a couple of minor comments > > > about > > > > > the > > > > > > > > >> > algorithm > > > > > > > > >> > >>> > >> itself, > > > > > > > > >> > >>> > >> > left > > > > > > > > >> > >>> > >> > > > on > > > > > > > > >> > >>> > >> > > > > the wiki page since it needs to refer > to > > > the > > > > > exact > > > > > > > > line > > > > > > > > >> > and > > > > > > > > >> > >>> > better > > > > > > > > >> > >>> > >> > > > > displayed there. > > > > > > > > >> > >>> > >> > > > > > > > > > > > > >> > >>> > >> > > > > 3.a Another wild thought about the > > > threshold > > > > > > > itself: > > > > > > > > >> today > > > > > > > > >> > >>> the > > > > > > > > >> > >>> > >> > assignment > > > > > > > > >> > >>> > >> > > > > itself is memoryless, so we would not > > know > > > if > > > > > the > > > > > > > > >> reported > > > > > > > > >> > >>> > >> `TaskLag` > > > > > > > > >> > >>> > >> > > > itself > > > > > > > > >> > >>> > >> > > > > is increasing or decreasing even if the > > > > current > > > > > > > value > > > > > > > > >> is > > > > > > > > >> > >>> under > > > > > > > > >> > >>> > the > > > > > > > > >> > >>> > >> > > > > threshold. I wonder if it worthy to > make > > > it a > > > > > bit > > > > > > > > more > > > > > > > > >> > >>> > >> complicated to > > > > > > > > >> > >>> > >> > > > track > > > > > > > > >> > >>> > >> > > > > task lag trend at the assignor? > > Practically > > > > it > > > > > may > > > > > > > > not > > > > > > > > >> be > > > > > > > > >> > >>> very > > > > > > > > >> > >>> > >> > uncommon > > > > > > > > >> > >>> > >> > > > > that stand-by tasks are not keeping up > > due > > > to > > > > > the > > > > > > > > fact > > > > > > > > >> > that > > > > > > > > >> > >>> > other > > > > > > > > >> > >>> > >> > active > > > > > > > > >> > >>> > >> > > > > tasks hosted on the same thread is > > starving > > > > the > > > > > > > > standby > > > > > > > > >> > >>> tasks. > > > > > > > > >> > >>> > >> > > > > > > > > > > > > >> > >>> > >> > > > > 4. There's a potential race condition > > risk > > > > when > > > > > > > > >> reporting > > > > > > > > >> > >>> > >> `TaskLags` > > > > > > > > >> > >>> > >> > in > > > > > > > > >> > >>> > >> > > > the > > > > > > > > >> > >>> > >> > > > > subscription: right after reporting it > to > > > the > > > > > > > leader, > > > > > > > > >> the > > > > > > > > >> > >>> > cleanup > > > > > > > > >> > >>> > >> > thread > > > > > > > > >> > >>> > >> > > > > kicks in and deletes the state > directory. > > > If > > > > > the > > > > > > > task > > > > > > > > >> was > > > > > > > > >> > >>> > assigned > > > > > > > > >> > >>> > >> > to the > > > > > > > > >> > >>> > >> > > > > host it would cause it to restore from > > > > > beginning > > > > > > > and > > > > > > > > >> > >>> effectively > > > > > > > > >> > >>> > >> > make the > > > > > > > > >> > >>> > >> > > > > seemingly optimized assignment very > > > > > sub-optimal. > > > > > > > > >> > >>> > >> > > > > > > > > > > > > >> > >>> > >> > > > > To be on the safer side we should > > consider > > > > > either > > > > > > > > prune > > > > > > > > >> > out > > > > > > > > >> > >>> > those > > > > > > > > >> > >>> > >> > tasks > > > > > > > > >> > >>> > >> > > > > that are "close to be cleaned up" in > the > > > > > > > > subscription, > > > > > > > > >> or > > > > > > > > >> > we > > > > > > > > >> > >>> > >> should > > > > > > > > >> > >>> > >> > delay > > > > > > > > >> > >>> > >> > > > > the cleanup right after we've included > > them > > > > in > > > > > the > > > > > > > > >> > >>> subscription > > > > > > > > >> > >>> > in > > > > > > > > >> > >>> > >> > case > > > > > > > > >> > >>> > >> > > > > they are been selected as assigned > tasks > > by > > > > the > > > > > > > > >> assignor. > > > > > > > > >> > >>> > >> > > > > > > > > > > > > >> > >>> > >> > > > > 5. This is a meta comment: I think it > > would > > > > be > > > > > > > > helpful > > > > > > > > >> to > > > > > > > > >> > >>> add > > > > > > > > >> > >>> > some > > > > > > > > >> > >>> > >> > user > > > > > > > > >> > >>> > >> > > > > visibility on the standby tasks lagging > > as > > > > > well, > > > > > > > via > > > > > > > > >> > >>> metrics for > > > > > > > > >> > >>> > >> > example. > > > > > > > > >> > >>> > >> > > > > Today it is hard for us to observe how > > far > > > > are > > > > > our > > > > > > > > >> current > > > > > > > > >> > >>> > standby > > > > > > > > >> > >>> > >> > tasks > > > > > > > > >> > >>> > >> > > > > compared to the active tasks and > whether > > > that > > > > > lag > > > > > > > is > > > > > > > > >> being > > > > > > > > >> > >>> > >> > increasing or > > > > > > > > >> > >>> > >> > > > > decreasing. As a follow-up task, for > > > example, > > > > > the > > > > > > > > >> > rebalance > > > > > > > > >> > >>> > should > > > > > > > > >> > >>> > >> > also > > > > > > > > >> > >>> > >> > > > be > > > > > > > > >> > >>> > >> > > > > triggered if we realize that some > standby > > > > > task's > > > > > > > lag > > > > > > > > is > > > > > > > > >> > >>> > increasing > > > > > > > > >> > >>> > >> > > > > indefinitely means that it cannot keep > up > > > > > (which is > > > > > > > > >> > another > > > > > > > > >> > >>> > >> indicator > > > > > > > > >> > >>> > >> > > > > either you need to add more resources > > with > > > > the > > > > > > > > >> > num.standbys > > > > > > > > >> > >>> or > > > > > > > > >> > >>> > >> your > > > > > > > > >> > >>> > >> > are > > > > > > > > >> > >>> > >> > > > > still not balanced enough). > > > > > > > > >> > >>> > >> > > > > > > > > > > > > >> > >>> > >> > > > > > > > > > > > > >> > >>> > >> > > > > On Tue, Aug 6, 2019 at 1:32 PM Sophie > > > > > Blee-Goldman > > > > > > > < > > > > > > > > >> > >>> > >> > sop...@confluent.io> > > > > > > > > >> > >>> > >> > > > > wrote: > > > > > > > > >> > >>> > >> > > > > > > > > > > > > >> > >>> > >> > > > > > Hey all, > > > > > > > > >> > >>> > >> > > > > > > > > > > > > > >> > >>> > >> > > > > > I'd like to kick off discussion on > > > KIP-441, > > > > > aimed > > > > > > > > at > > > > > > > > >> the > > > > > > > > >> > >>> long > > > > > > > > >> > >>> > >> > restore > > > > > > > > >> > >>> > >> > > > > times > > > > > > > > >> > >>> > >> > > > > > in Streams during which further > active > > > > > processing > > > > > > > > >> and IQ > > > > > > > > >> > >>> are > > > > > > > > >> > >>> > >> > blocked. > > > > > > > > >> > >>> > >> > > > > > Please give it a read and let us know > > > your > > > > > > > thoughts > > > > > > > > >> > >>> > >> > > > > > > > > > > > > > >> > >>> > >> > > > > > > > > > > > > > >> > >>> > >> > > > > > > > > > > > > > >> > >>> > >> > > > > > > > > > > > > >> > >>> > >> > > > > > > > > > > > >> > >>> > >> > > > > > > > > > >> > >>> > >> > > > > > > > > >> > >>> > > > > > > > > > >> > >>> > > > > > > > > >> > > > > > > > > > >> > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-441:+Smooth+Scaling+Out+for+Kafka+Streams > > > > > > > > >> > >>> > >> > > > > > > > > > > > > > >> > >>> > >> > > > > > Cheers, > > > > > > > > >> > >>> > >> > > > > > Sophie > > > > > > > > >> > >>> > >> > > > > > > > > > > > > > >> > >>> > >> > > > > > > > > > > > > >> > >>> > >> > > > > > > > > > > > > >> > >>> > >> > > > > -- > > > > > > > > >> > >>> > >> > > > > -- Guozhang > > > > > > > > >> > >>> > >> > > > > > > > > > > > > >> > >>> > >> > > > > > > > > > > > >> > >>> > >> > > > > > > > > > > >> > >>> > >> > > > > > > > > > > >> > >>> > >> > > -- > > > > > > > > >> > >>> > >> > > -- Guozhang > > > > > > > > >> > >>> > >> > > > > > > > > > >> > >>> > >> > > > > > > > > >> > >>> > >> > > > > > > > > >> > >>> > >> -- > > > > > > > > >> > >>> > >> -- Guozhang > > > > > > > > >> > >>> > >> > > > > > > > > >> > >>> > > > > > > > > > > >> > >>> > > > > > > > > > >> > >>> > > > > > > > > >> > >>> > > > > > > > > >> > >>> -- > > > > > > > > >> > >>> -- Guozhang > > > > > > > > >> > >>> > > > > > > > > >> > >> > > > > > > > > >> > > > > > > > > > >> > > > > > > > > >> > > > > > > > > >> -- > > > > > > > > >> -- Guozhang > > > > > > > > >> > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > -- > > > > > > > -- Guozhang > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > -- > > > -- Guozhang > > > > > > > > -- > -- Guozhang >