Hello John, That sounds reasonable. I just double-checked the code: with logging disabled, the corresponding checkpoint file would not contain any values, just like for a stateless task. So I think treating them logically the same is fine.
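For illustration only, a minimal sketch of that behavior (hypothetical names, not the actual Streams code): when every store of a task has logging disabled, the checkpoint file holds no offsets, so the member has nothing to report and the leader will assume lag 0, exactly as for a stateless task.

import java.util.Map;
import java.util.OptionalLong;

// Sketch only: compute a task's lag from checkpointed offsets; an empty
// checkpoint (all stores non-logged) means there is simply no lag to encode.
public class CheckpointLagSketch {

    // changelogEndOffsets: end offset per changelog partition, as seen by the member.
    // checkpointedOffsets: offsets read from the local checkpoint file (empty when
    //                      logging is disabled for every store of the task).
    static OptionalLong taskLag(final Map<String, Long> checkpointedOffsets,
                                final Map<String, Long> changelogEndOffsets) {
        if (checkpointedOffsets.isEmpty()) {
            return OptionalLong.empty(); // nothing to report; the leader assumes lag 0
        }
        long lag = 0L;
        for (final Map.Entry<String, Long> entry : changelogEndOffsets.entrySet()) {
            final long checkpointed = checkpointedOffsets.getOrDefault(entry.getKey(), 0L);
            lag += Math.max(0L, entry.getValue() - checkpointed);
        }
        return OptionalLong.of(lag);
    }
}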
Guozhang On Wed, Aug 21, 2019 at 11:41 AM John Roesler <j...@confluent.io> wrote: > Hi again, Guozhang, > > While writing up the section on stateless tasks ( > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-441%3A+Smooth+Scaling+Out+for+Kafka+Streams#KIP-441:SmoothScalingOutforKafkaStreams-Statelesstasks > ), > I reconsidered whether stateful, but non-logged, tasks should actually > report a lag of zero, versus not reporting any lag. By the definition of > the "StatefulTasksToRankedCandidates" function, the leader would compute a > lag of zero for these tasks anyway. > > Therefore, I think the same reasoning that I supplied you for stateless > tasks applies, since the member and leader will agree on a lag of zero > anyway, we can avoid adding them to the "Task Lags" map, and save some > bytes in the JoinGroup request. This would be especially beneficial in an > application that uses remote stores for _all_ its state stores, it would > have an extremely lightweight JoinGroup request, with no task lags at all. > > WDYT? > -John > > On Wed, Aug 21, 2019 at 1:17 PM John Roesler <j...@confluent.io> wrote: > > > Thanks, Guozhang. > > > > (Side note: I noticed on another pass over the discussion that I'd missed > > addressing your comment about the potential race condition between state > > cleanup and lag-based assignment. I've added a solution to the proposal: > > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-441%3A+Smooth+Scaling+Out+for+Kafka+Streams#KIP-441:SmoothScalingOutforKafkaStreams-Racebetweenassignmentandstatecleanup > > ) > > > > In the JoinGroup (SubscriptionInfo) metadata, stateless tasks are not > > represented at all. This should save us some bytes in the request > metadata. > > If we treated them like non-logged stateful tasks and reported a lag of > 0, > > the only difference is that the assignor would be able to tell which > > members previously hosted that stateless task. > > > > I'd like to make a simplifying assumption that stateless tasks can just > be > > freely reassigned with no regard to stickiness at all, without impacting > > performance. This is almost true. In fact, while assigned a stateless > task, > > a member fetches batches of records from the broker, so if we move the > > stateless task assignment, this buffered input is wasted and just gets > > dropped. > > > > However, we won't be moving the stateless tasks around all the time (just > > during rebalances), and we have the requirement that the assigment > > algorithm must stabilize to guard against perpetually shuffling a > stateless > > task from one node to another. So, my hope is that this small amount of > > inefficiency would not be a performance-dominating factor. In exchange, > we > > gain the opportunity for the assignment algorithm to use the stateless > > tasks as "filler" during unbalanced assignments. For example, if there > is a > > node that is just warming up with several standby tasks, maybe the > > assignment can give more stateless tasks to that node to balance the > > computational load across the cluster. > > > > It's worth noting that such an assignment would still not be considered > > "balanced", so the ultimately balanced final state of the assignment > (after > > task movements) would still have the desired property that each stateful > > and stateless task is evenly spread across the cluster. > > > > Does that seem reasonable? 
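For concreteness, a minimal sketch of the byte-saving idea above (names are hypothetical, not the actual SubscriptionInfo code): stateless tasks and non-logged stateful tasks are simply omitted from the encoded "Task Lags" map, and the leader interprets a missing entry as lag 0, which is what StatefulTasksToRankedCandidates would compute for them anyway.

import java.util.HashMap;
import java.util.Map;
import java.util.Set;

// Sketch only: the member encodes lags just for logged stateful tasks; the
// leader treats any task absent from the map as having lag 0.
public class TaskLagEncodingSketch {

    static Map<String, Long> encodeTaskLags(final Map<String, Long> localTaskLags,
                                            final Set<String> loggedStatefulTasks) {
        final Map<String, Long> encoded = new HashMap<>();
        for (final Map.Entry<String, Long> entry : localTaskLags.entrySet()) {
            if (loggedStatefulTasks.contains(entry.getKey())) {
                encoded.put(entry.getKey(), entry.getValue());
            }
        }
        return encoded; // stateless and non-logged tasks cost zero bytes here
    }

    static long lagUsedByLeader(final Map<String, Long> encoded, final String taskId) {
        return encoded.getOrDefault(taskId, 0L); // absent == lag 0, by agreement
    }
}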
> > > > Thanks, > > -John > > > > On Wed, Aug 21, 2019 at 11:22 AM Guozhang Wang <wangg...@gmail.com> > wrote: > > > >> Hello John, > >> > >> I've made another pass on the wiki page again, overall LGTM. One meta > >> comment about the "stateless" tasks: how do we represent them in the > >> metadata? Are they just treated as stateful tasks with logging disabled, > >> or > >> are specially handled? It is not very clear in the description. > >> > >> > >> Guozhang > >> > >> On Wed, Aug 21, 2019 at 8:43 AM John Roesler <j...@confluent.io> wrote: > >> > >> > I have also specifically called out that the assignment must achieve > >> both > >> > "instance" and "task" balance: > >> > > >> > > >> > https://cwiki.apache.org/confluence/display/KAFKA/KIP-441%3A+Smooth+Scaling+Out+for+Kafka+Streams#KIP-441:SmoothScalingOutforKafkaStreams-Defining%22balance%22 > >> > > >> > I've also addressed the problem of state stores with logging disabled: > >> > > >> > > >> > https://cwiki.apache.org/confluence/display/KAFKA/KIP-441%3A+Smooth+Scaling+Out+for+Kafka+Streams#KIP-441:SmoothScalingOutforKafkaStreams-Statewithloggingdisabled > >> > > >> > I believe this addresses all the concerns that have been raised to > date. > >> > Apologies if I've overlooked one of your concerns. > >> > > >> > Please give the KIP another read and let me know of any further > >> thoughts! > >> > Hopefully, we can start the voting on this KIP by the end of the week. > >> > > >> > Thanks, > >> > -John > >> > > >> > On Tue, Aug 20, 2019 at 5:16 PM John Roesler <j...@confluent.io> > wrote: > >> > > >> > > In response to Bruno's concern #2, I've also added that section to > the > >> > > "Rejected Alternatives" section. > >> > > > >> > > Additionally, after reviewing some other assignment papers, I've > >> > developed > >> > > the concern that specifying which "phases" the assignment algorithm > >> > should > >> > > have, or indeed the logic of it at all, might be a mistake that > >> > > over-constrains our ability to write an optimal algorithm. > Therefore, > >> > I've > >> > > also refactored the KIP to just describe the protocol, and specify > the > >> > > requirements for the assignment algorithm, but not its exact > behavior > >> at > >> > > all. > >> > > > >> > > Thanks, > >> > > -John > >> > > > >> > > On Tue, Aug 20, 2019 at 5:13 PM John Roesler <j...@confluent.io> > >> wrote: > >> > > > >> > >> Hi All, > >> > >> > >> > >> Thanks for the discussion. I've been considering the idea of giving > >> the > >> > >> "catching up" tasks a different name/role. I was in favor > initially, > >> but > >> > >> after working though some details, I think it causes some problems, > >> > which > >> > >> I've written up in the "rejected alternatives" part of the KIP: > >> > >> > >> > > >> > https://cwiki.apache.org/confluence/display/KAFKA/KIP-441%3A+Smooth+Scaling+Out+for+Kafka+Streams#KIP-441:SmoothScalingOutforKafkaStreams-Addinganewkindoftask(%22moving%22,%22recovering%22,%22learner%22)fortaskmovements > >> > >> > >> > >> Please give it a read and let me know what you think. > >> > >> > >> > >> Thanks, > >> > >> -John > >> > >> > >> > >> On Thu, Aug 8, 2019 at 7:57 PM Guozhang Wang <wangg...@gmail.com> > >> > wrote: > >> > >> > >> > >>> I think I agree with you Sophie. 
My gut feeling is that 1) it > should > >> > not > >> > >>> be > >> > >>> the major concern in assignor's algorithm for standby tasks not > >> > catching > >> > >>> up, but rather be tackled in different modules, and 2) a lot of > >> > >>> optimization can be down at the stream thread itself, like > dedicated > >> > >>> threading and larger batching, or even complicated scheduling > >> > mechanisms > >> > >>> between running, restoring and standby tasks. In anyways, I think > we > >> > can > >> > >>> take this out of the scope of KIP-441 for now. > >> > >>> > >> > >>> > >> > >>> Guozhang > >> > >>> > >> > >>> > >> > >>> On Thu, Aug 8, 2019 at 5:48 PM Sophie Blee-Goldman < > >> > sop...@confluent.io> > >> > >>> wrote: > >> > >>> > >> > >>> > > we may have other ways to not starving the standby tasks, for > >> > >>> example, by > >> > >>> > > using dedicate threads for standby tasks or even consider > having > >> > >>> > *higher> priority for standby than active* so that we always try > >> to > >> > >>> caught > >> > >>> > up standby > >> > >>> > > first, then process active > >> > >>> > > >> > >>> > This is an interesting idea, but seems likely to get in the way > of > >> > the > >> > >>> > original idea of this KIP > >> > >>> > -- if we always process standby tasks first, then if we are > >> assigned > >> > a > >> > >>> new > >> > >>> > standby task we > >> > >>> > will have to wait for it to catch up completely before > processing > >> any > >> > >>> > active tasks! That's > >> > >>> > even worse than the situation this KIP is trying to help with, > >> since > >> > a > >> > >>> new > >> > >>> > standby task has to > >> > >>> > restore from 0 (whereas an active task at least can take over > from > >> > >>> wherever > >> > >>> > the standby was). > >> > >>> > > >> > >>> > During restoration -- while there exist any restoring tasks -- I > >> > think > >> > >>> it's > >> > >>> > reasonable to de-prioritize the > >> > >>> > standby tasks and just process restoring and active tasks so > both > >> can > >> > >>> make > >> > >>> > progress. But we should > >> > >>> > let them catch up afterwards somehow -- maybe we can apply some > >> kind > >> > of > >> > >>> > heuristic, like "if we haven't > >> > >>> > processed standbys for X iterations, or Y milliseconds, do so > >> now." > >> > >>> > > >> > >>> > Actually, it might even be beneficial to avoid processing > >> standbys a > >> > >>> record > >> > >>> > or two at a time and instead > >> > >>> > wait for a large enough batch to build up for the RocksDB > >> > bulk-loading > >> > >>> > benefits. > >> > >>> > > >> > >>> > I think the "use dedicated threads for standby" is the more > >> promising > >> > >>> end > >> > >>> > goal, especially since > >> > >>> > if we split restoration into "restoring tasks" then active and > >> > standbys > >> > >>> > share almost nothing. But > >> > >>> > that seems like follow-up work to the current KIP :) > >> > >>> > > >> > >>> > On Thu, Aug 8, 2019 at 5:31 PM Sophie Blee-Goldman < > >> > >>> sop...@confluent.io> > >> > >>> > wrote: > >> > >>> > > >> > >>> > > Stateful tasks with logging disabled seem to be an interesting > >> edge > >> > >>> case. > >> > >>> > > On the one hand, > >> > >>> > > for balancing purposes they should be considered stateful > since > >> as > >> > >>> > > Guozhang pointed out > >> > >>> > > they are still "heavy" in IO costs. 
But for "catching up" > >> purposes, > >> > >>> ie > >> > >>> > > when allocating standby > >> > >>> > > tasks that will become active tasks, they should be considered > >> > >>> stateless > >> > >>> > > as there is so > >> > >>> > > meaningful sense of their lag. We should never allocate > standby > >> > >>> tasks for > >> > >>> > > them during the > >> > >>> > > first rebalance, but should ensure they are evenly distributed > >> > across > >> > >>> > > instances. Maybe we > >> > >>> > > should split these into a third category -- after we assign > all > >> > >>> stateful > >> > >>> > > tasks with logging, we > >> > >>> > > then distribute the set of logging-disabled stateful tasks to > >> > improve > >> > >>> > > balance, before lastly > >> > >>> > > distributing stateless tasks? > >> > >>> > > > >> > >>> > > This actually leads into what I was just thinking, which is > >> that we > >> > >>> > really > >> > >>> > > should distinguish the > >> > >>> > > "catch-up" standbys from normal standbys as well as > >> distinguishing > >> > >>> > > actively processing tasks > >> > >>> > > from active tasks that are still in the restore phase. It's > >> > somewhat > >> > >>> > > awkward that today, some > >> > >>> > > active tasks just start processing immediately while others > >> behave > >> > >>> more > >> > >>> > > like standby than active > >> > >>> > > tasks for some time, before switching to real active. They > first > >> > use > >> > >>> the > >> > >>> > > restoreConsumer, then > >> > >>> > > later only the "normal" consumer. > >> > >>> > > > >> > >>> > > However, this restore period is still distinct from normal > >> standbys > >> > >>> in a > >> > >>> > > lot of ways -- the code path > >> > >>> > > for restoring is different than for updating standbys, for > >> example > >> > >>> in how > >> > >>> > > long we block on #poll. > >> > >>> > > So in addition to giving them their own name -- let's go with > >> > >>> restoring > >> > >>> > > task for now -- they really > >> > >>> > > do seem to deserve being their own distinct task. We can > >> optimize > >> > >>> them > >> > >>> > for > >> > >>> > > efficient conversion > >> > >>> > > to active tasks since we know that's what they will be. > >> > >>> > > > >> > >>> > > This resolves some of the awkwardness of dealing with the > >> special > >> > >>> case > >> > >>> > > mentioned above: we > >> > >>> > > find a balanced assignment of stateful and stateless tasks, > and > >> > >>> create > >> > >>> > > restoring tasks as needed. > >> > >>> > > If logging is disabled, no restoring task is created. > >> > >>> > > > >> > >>> > > > >> > >>> > > On Thu, Aug 8, 2019 at 3:44 PM Guozhang Wang < > >> wangg...@gmail.com> > >> > >>> wrote: > >> > >>> > > > >> > >>> > >> Regarding 3) above: I think for active task they should still > >> be > >> > >>> > >> considered > >> > >>> > >> stateful since the processor would still pay IO cost > accessing > >> the > >> > >>> > store, > >> > >>> > >> but they would not have standby tasks? > >> > >>> > >> > >> > >>> > >> On Thu, Aug 8, 2019 at 7:49 AM Bruno Cadonna < > >> br...@confluent.io> > >> > >>> > wrote: > >> > >>> > >> > >> > >>> > >> > Hi, > >> > >>> > >> > > >> > >>> > >> > Thank you for the KIP! > >> > >>> > >> > > >> > >>> > >> > Some questions/comments: > >> > >>> > >> > > >> > >>> > >> > 1. I am wondering if the "stand-by" tasks that catch up > state > >> > >>> before > >> > >>> > >> > the active task is switched deserve its own name in this > KIP > >> and > >> > >>> maybe > >> > >>> > >> > in the code. 
We have already stated that they are not true > >> > >>> stand-by > >> > >>> > >> > tasks, they are not configured through > >> `num.standby.replicas`, > >> > and > >> > >>> > >> > maybe they have also other properties that distinguish them > >> from > >> > >>> true > >> > >>> > >> > stand-by tasks of which we are not aware yet. For example, > >> they > >> > >>> may be > >> > >>> > >> > prioritized differently than other tasks. Furthermore, the > >> name > >> > >>> > >> > "stand-by" does not really fit with the planned > >> functionality of > >> > >>> those > >> > >>> > >> > tasks. In the following, I will call them false stand-by > >> tasks. > >> > >>> > >> > > >> > >>> > >> > 2. Did you consider to trigger the probing rebalances not > at > >> > >>> regular > >> > >>> > >> > time intervals but when the false stand-by tasks reach an > >> > >>> acceptable > >> > >>> > >> > lag? If you did consider, could you add a paragraph why you > >> > >>> rejected > >> > >>> > >> > this idea to the "Rejected Alternatives" section. > >> > >>> > >> > > >> > >>> > >> > 3. Are tasks that solely contain stores with disabled > logging > >> > >>> > >> > classified as stateful or stateless in the algorithm? I > would > >> > >>> guess > >> > >>> > >> > stateless, although if possible they should be assigned to > >> the > >> > >>> same > >> > >>> > >> > instance they had run before the rebalance. As far as I can > >> see > >> > >>> this > >> > >>> > >> > special case is not handled in the algorithm. > >> > >>> > >> > > >> > >>> > >> > Best, > >> > >>> > >> > Bruno > >> > >>> > >> > > >> > >>> > >> > > >> > >>> > >> > > >> > >>> > >> > On Thu, Aug 8, 2019 at 8:24 AM Guozhang Wang < > >> > wangg...@gmail.com> > >> > >>> > >> wrote: > >> > >>> > >> > > > >> > >>> > >> > > 1. Sounds good, just wanted to clarify; and it may worth > >> > >>> documenting > >> > >>> > >> it > >> > >>> > >> > so > >> > >>> > >> > > that users would not be surprised when monitoring their > >> > >>> footprint. > >> > >>> > >> > > > >> > >>> > >> > > 2. Hmm I see... I think the trade-off can be described as > >> "how > >> > >>> much > >> > >>> > >> > > imbalance would bother you to be willing to pay another > >> > >>> rebalance, > >> > >>> > >> along > >> > >>> > >> > > with potentially more restoration lag", and the current > >> > >>> definition > >> > >>> > of > >> > >>> > >> > > rebalance_factor can be considered as a rough measurement > >> of > >> > >>> that > >> > >>> > >> > > imbalance. Of course one can argue that a finer grained > >> > >>> measurement > >> > >>> > >> could > >> > >>> > >> > > be "resource footprint" like CPU / storage of each > instance > >> > >>> like we > >> > >>> > >> have > >> > >>> > >> > in > >> > >>> > >> > > Kafka broker auto balancing tools, but I'd prefer not > doing > >> > >>> that as > >> > >>> > >> part > >> > >>> > >> > of > >> > >>> > >> > > the library but more as an operational tool in the > future. 
> >> On > >> > >>> the > >> > >>> > >> other > >> > >>> > >> > > hand, I've seen stateful and stateless tasks having very > >> > >>> different > >> > >>> > >> load, > >> > >>> > >> > > and sometimes the only bottleneck of a Streams app is > just > >> one > >> > >>> > >> stateful > >> > >>> > >> > > sub-topology and whoever gets tasks of that sub-topology > >> > become > >> > >>> > >> hotspot > >> > >>> > >> > > (and that's why our algorithm tries to balance per > >> > sub-topology > >> > >>> as > >> > >>> > >> well), > >> > >>> > >> > > so maybe we can just consider stateful tasks when > >> calculating > >> > >>> this > >> > >>> > >> factor > >> > >>> > >> > > as a very brute force heuristic? > >> > >>> > >> > > > >> > >>> > >> > > 3.a. Thinking about this a bit more, maybe it's better > not > >> try > >> > >>> to > >> > >>> > >> tackle > >> > >>> > >> > an > >> > >>> > >> > > unseen enemy just yet, and observe if it really emerges > >> later, > >> > >>> and > >> > >>> > by > >> > >>> > >> > then > >> > >>> > >> > > we may have other ways to not starving the standby tasks, > >> for > >> > >>> > >> example, by > >> > >>> > >> > > using dedicate threads for standby tasks or even consider > >> > having > >> > >>> > >> higher > >> > >>> > >> > > priority for standby than active so that we always try to > >> > >>> caught up > >> > >>> > >> > standby > >> > >>> > >> > > first, then process active; and if active's lagging > >> compared > >> > to > >> > >>> > >> > > log-end-offset is increasing then we should increase > >> capacity, > >> > >>> etc > >> > >>> > >> etc. > >> > >>> > >> > > > >> > >>> > >> > > 4. Actually with KIP-429 this may not be the case: we may > >> not > >> > >>> call > >> > >>> > >> > > onPartitionsRevoked prior to rebalance any more so would > >> not > >> > >>> transit > >> > >>> > >> > state > >> > >>> > >> > > to PARTITIONS_REVOKED, and hence not cause the state of > the > >> > >>> instance > >> > >>> > >> to > >> > >>> > >> > be > >> > >>> > >> > > REBALANCING. In other words, even if a instance is > >> undergoing > >> > a > >> > >>> > >> rebalance > >> > >>> > >> > > it's state may still be RUNNING and it may still be > >> processing > >> > >>> > >> records at > >> > >>> > >> > > the same time. > >> > >>> > >> > > > >> > >>> > >> > > > >> > >>> > >> > > On Wed, Aug 7, 2019 at 12:14 PM John Roesler < > >> > j...@confluent.io > >> > >>> > > >> > >>> > >> wrote: > >> > >>> > >> > > > >> > >>> > >> > > > Hey Guozhang, > >> > >>> > >> > > > > >> > >>> > >> > > > Thanks for the review! > >> > >>> > >> > > > > >> > >>> > >> > > > 1. Yes, even with `num.standby.replicas := 0`, we will > >> still > >> > >>> > >> > temporarily > >> > >>> > >> > > > allocate standby tasks to accomplish a no-downtime task > >> > >>> migration. > >> > >>> > >> > > > Although, I'd argue that this doesn't really violate > the > >> > >>> config, > >> > >>> > as > >> > >>> > >> the > >> > >>> > >> > > > task isn't a true hot standby. As soon as it catches > up, > >> > we'll > >> > >>> > >> > rebalance > >> > >>> > >> > > > again, that task will become active, and the original > >> > instance > >> > >>> > that > >> > >>> > >> > hosted > >> > >>> > >> > > > the active task will no longer have the task assigned > at > >> > all. > >> > >>> Once > >> > >>> > >> the > >> > >>> > >> > > > stateDirCleaner kicks in, we'll free the disk space > from > >> it, > >> > >>> and > >> > >>> > >> > return to > >> > >>> > >> > > > the steady-state of having just one copy of the task in > >> the > >> > >>> > cluster. 
> >> > >>> > >> > > > > >> > >>> > >> > > > We can of course do without this, but I feel the > current > >> > >>> proposal > >> > >>> > is > >> > >>> > >> > > > operationally preferable, since it doesn't make > >> configuring > >> > >>> > >> > hot-standbys a > >> > >>> > >> > > > pre-requisite for fast rebalances. > >> > >>> > >> > > > > >> > >>> > >> > > > 2. Yes, I think your interpretation is what we > intended. > >> The > >> > >>> > default > >> > >>> > >> > > > balance_factor would be 1, as it is implicitly today. > >> What > >> > >>> this > >> > >>> > >> does is > >> > >>> > >> > > > allows operators to trade off less balanced assignments > >> > >>> against > >> > >>> > >> fewer > >> > >>> > >> > > > rebalances. If you have lots of space capacity in your > >> > >>> instances, > >> > >>> > >> this > >> > >>> > >> > may > >> > >>> > >> > > > be a perfectly fine tradeoff, and you may prefer for > >> Streams > >> > >>> not > >> > >>> > to > >> > >>> > >> > bother > >> > >>> > >> > > > streaming GBs of data from the broker in pursuit of > >> perfect > >> > >>> > balance. > >> > >>> > >> > Not > >> > >>> > >> > > > married to this configuration, though. It was inspired > by > >> > the > >> > >>> > >> related > >> > >>> > >> > work > >> > >>> > >> > > > research we did. > >> > >>> > >> > > > > >> > >>> > >> > > > 3. I'll take a look > >> > >>> > >> > > > > >> > >>> > >> > > > 3a. I think this is a good idea. I'd classify it as a > >> type > >> > of > >> > >>> grey > >> > >>> > >> > failure > >> > >>> > >> > > > detection. It may make more sense to tackle grey > >> failures as > >> > >>> part > >> > >>> > of > >> > >>> > >> > the > >> > >>> > >> > > > heartbeat protocol (as I POCed here: > >> > >>> > >> > > > https://github.com/apache/kafka/pull/7096/files). > WDYT? > >> > >>> > >> > > > > >> > >>> > >> > > > 4. Good catch! I didn't think about that before. > Looking > >> at > >> > it > >> > >>> > now, > >> > >>> > >> > though, > >> > >>> > >> > > > I wonder if we're actually protected already. The > >> > >>> stateDirCleaner > >> > >>> > >> > thread > >> > >>> > >> > > > only executes if the instance is in RUNNING state, and > >> > KIP-441 > >> > >>> > >> > proposes to > >> > >>> > >> > > > use "probing rebalances" to report task lag. Hence, > >> during > >> > the > >> > >>> > >> window > >> > >>> > >> > > > between when the instance reports a lag and the > assignor > >> > >>> makes a > >> > >>> > >> > decision > >> > >>> > >> > > > about it, the instance should remain in REBALANCING > >> state, > >> > >>> right? > >> > >>> > If > >> > >>> > >> > so, > >> > >>> > >> > > > then this should prevent the race condition. If not, > >> then we > >> > >>> do > >> > >>> > >> indeed > >> > >>> > >> > need > >> > >>> > >> > > > to do something about it. > >> > >>> > >> > > > > >> > >>> > >> > > > 5. Good idea. I think that today, you can only see the > >> > >>> consumer > >> > >>> > lag, > >> > >>> > >> > which > >> > >>> > >> > > > is a poor substitute. I'll add some metrics to the > >> proposal. > >> > >>> > >> > > > > >> > >>> > >> > > > Thanks again for the comments! > >> > >>> > >> > > > -John > >> > >>> > >> > > > > >> > >>> > >> > > > On Tue, Aug 6, 2019 at 4:27 PM Guozhang Wang < > >> > >>> wangg...@gmail.com> > >> > >>> > >> > wrote: > >> > >>> > >> > > > > >> > >>> > >> > > > > Hello Sophie, > >> > >>> > >> > > > > > >> > >>> > >> > > > > Thanks for the proposed KIP. 
I left some comments on > >> the > >> > >>> wiki > >> > >>> > >> itself, > >> > >>> > >> > > > and I > >> > >>> > >> > > > > think I'm still not very clear on a couple or those: > >> > >>> > >> > > > > > >> > >>> > >> > > > > 1. With this proposal, does that mean with > >> > >>> num.standby.replicas > >> > >>> > == > >> > >>> > >> > 0, we > >> > >>> > >> > > > > may sometimes still have some standby tasks which may > >> > >>> violate > >> > >>> > the > >> > >>> > >> > config? > >> > >>> > >> > > > > > >> > >>> > >> > > > > 2. I think I understand the rationale to consider > lags > >> > that > >> > >>> is > >> > >>> > >> below > >> > >>> > >> > the > >> > >>> > >> > > > > specified threshold to be equal, rather than still > >> > >>> considering > >> > >>> > >> 5000 > >> > >>> > >> > is > >> > >>> > >> > > > > better than 5001 -- we do not want to "over-optimize" > >> and > >> > >>> > >> potentially > >> > >>> > >> > > > falls > >> > >>> > >> > > > > into endless rebalances back and forth. > >> > >>> > >> > > > > > >> > >>> > >> > > > > But I'm not clear about the rationale of the second > >> > >>> parameter of > >> > >>> > >> > > > > > >> > >>> constrainedBalancedAssignment(StatefulTasksToRankedCandidates, > >> > >>> > >> > > > > balance_factor): > >> > >>> > >> > > > > > >> > >>> > >> > > > > Does that mean, e.g. with balance_factor of 3, we'd > >> > >>> consider two > >> > >>> > >> > > > > assignments one resulting balance_factor 0 and one > >> > resulting > >> > >>> > >> > > > balance_factor > >> > >>> > >> > > > > 3 to be equally optimized assignment and therefore > may > >> > "stop > >> > >>> > >> early"? > >> > >>> > >> > This > >> > >>> > >> > > > > was not very convincing to me :P > >> > >>> > >> > > > > > >> > >>> > >> > > > > 3. There are a couple of minor comments about the > >> > algorithm > >> > >>> > >> itself, > >> > >>> > >> > left > >> > >>> > >> > > > on > >> > >>> > >> > > > > the wiki page since it needs to refer to the exact > line > >> > and > >> > >>> > better > >> > >>> > >> > > > > displayed there. > >> > >>> > >> > > > > > >> > >>> > >> > > > > 3.a Another wild thought about the threshold itself: > >> today > >> > >>> the > >> > >>> > >> > assignment > >> > >>> > >> > > > > itself is memoryless, so we would not know if the > >> reported > >> > >>> > >> `TaskLag` > >> > >>> > >> > > > itself > >> > >>> > >> > > > > is increasing or decreasing even if the current value > >> is > >> > >>> under > >> > >>> > the > >> > >>> > >> > > > > threshold. I wonder if it worthy to make it a bit > more > >> > >>> > >> complicated to > >> > >>> > >> > > > track > >> > >>> > >> > > > > task lag trend at the assignor? Practically it may > not > >> be > >> > >>> very > >> > >>> > >> > uncommon > >> > >>> > >> > > > > that stand-by tasks are not keeping up due to the > fact > >> > that > >> > >>> > other > >> > >>> > >> > active > >> > >>> > >> > > > > tasks hosted on the same thread is starving the > standby > >> > >>> tasks. > >> > >>> > >> > > > > > >> > >>> > >> > > > > 4. There's a potential race condition risk when > >> reporting > >> > >>> > >> `TaskLags` > >> > >>> > >> > in > >> > >>> > >> > > > the > >> > >>> > >> > > > > subscription: right after reporting it to the leader, > >> the > >> > >>> > cleanup > >> > >>> > >> > thread > >> > >>> > >> > > > > kicks in and deletes the state directory. 
If the task > >> was > >> > >>> > assigned > >> > >>> > >> > to the > >> > >>> > >> > > > > host it would cause it to restore from beginning and > >> > >>> effectively > >> > >>> > >> > make the > >> > >>> > >> > > > > seemingly optimized assignment very sub-optimal. > >> > >>> > >> > > > > > >> > >>> > >> > > > > To be on the safer side we should consider either > prune > >> > out > >> > >>> > those > >> > >>> > >> > tasks > >> > >>> > >> > > > > that are "close to be cleaned up" in the > subscription, > >> or > >> > we > >> > >>> > >> should > >> > >>> > >> > delay > >> > >>> > >> > > > > the cleanup right after we've included them in the > >> > >>> subscription > >> > >>> > in > >> > >>> > >> > case > >> > >>> > >> > > > > they are been selected as assigned tasks by the > >> assignor. > >> > >>> > >> > > > > > >> > >>> > >> > > > > 5. This is a meta comment: I think it would be > helpful > >> to > >> > >>> add > >> > >>> > some > >> > >>> > >> > user > >> > >>> > >> > > > > visibility on the standby tasks lagging as well, via > >> > >>> metrics for > >> > >>> > >> > example. > >> > >>> > >> > > > > Today it is hard for us to observe how far are our > >> current > >> > >>> > standby > >> > >>> > >> > tasks > >> > >>> > >> > > > > compared to the active tasks and whether that lag is > >> being > >> > >>> > >> > increasing or > >> > >>> > >> > > > > decreasing. As a follow-up task, for example, the > >> > rebalance > >> > >>> > should > >> > >>> > >> > also > >> > >>> > >> > > > be > >> > >>> > >> > > > > triggered if we realize that some standby task's lag > is > >> > >>> > increasing > >> > >>> > >> > > > > indefinitely means that it cannot keep up (which is > >> > another > >> > >>> > >> indicator > >> > >>> > >> > > > > either you need to add more resources with the > >> > num.standbys > >> > >>> or > >> > >>> > >> your > >> > >>> > >> > are > >> > >>> > >> > > > > still not balanced enough). > >> > >>> > >> > > > > > >> > >>> > >> > > > > > >> > >>> > >> > > > > On Tue, Aug 6, 2019 at 1:32 PM Sophie Blee-Goldman < > >> > >>> > >> > sop...@confluent.io> > >> > >>> > >> > > > > wrote: > >> > >>> > >> > > > > > >> > >>> > >> > > > > > Hey all, > >> > >>> > >> > > > > > > >> > >>> > >> > > > > > I'd like to kick off discussion on KIP-441, aimed > at > >> the > >> > >>> long > >> > >>> > >> > restore > >> > >>> > >> > > > > times > >> > >>> > >> > > > > > in Streams during which further active processing > >> and IQ > >> > >>> are > >> > >>> > >> > blocked. > >> > >>> > >> > > > > > Please give it a read and let us know your thoughts > >> > >>> > >> > > > > > > >> > >>> > >> > > > > > > >> > >>> > >> > > > > > > >> > >>> > >> > > > > > >> > >>> > >> > > > > >> > >>> > >> > > >> > >>> > >> > >> > >>> > > >> > >>> > >> > > >> > https://cwiki.apache.org/confluence/display/KAFKA/KIP-441:+Smooth+Scaling+Out+for+Kafka+Streams > >> > >>> > >> > > > > > > >> > >>> > >> > > > > > Cheers, > >> > >>> > >> > > > > > Sophie > >> > >>> > >> > > > > > > >> > >>> > >> > > > > > >> > >>> > >> > > > > > >> > >>> > >> > > > > -- > >> > >>> > >> > > > > -- Guozhang > >> > >>> > >> > > > > > >> > >>> > >> > > > > >> > >>> > >> > > > >> > >>> > >> > > > >> > >>> > >> > > -- > >> > >>> > >> > > -- Guozhang > >> > >>> > >> > > >> > >>> > >> > >> > >>> > >> > >> > >>> > >> -- > >> > >>> > >> -- Guozhang > >> > >>> > >> > >> > >>> > > > >> > >>> > > >> > >>> > >> > >>> > >> > >>> -- > >> > >>> -- Guozhang > >> > >>> > >> > >> > >> > > >> > >> > >> -- > >> -- Guozhang > >> > > > -- -- Guozhang