Aha! I did miss that detail in your email before. Sorry for my density. It does seem like, if it turns out to be a problem, it would be pretty straightforward to add your proposal in. It wouldn't even require a version bump, because the wire protocol and the assignment algorithm would be mutually intelligible with and without this tweak.
Thanks for the thorough review! I'll start the vote shortly. Thanks, -John On Thu, Sep 5, 2019 at 11:27 AM Guozhang Wang <wangg...@gmail.com> wrote: > Hello John, > > What I was thinking is that for other hosts of non-logged store only tasks, > they will be ranked with a lag of "1" (as I tried to illustrate with > "StatefulTasksToRankedCandidates[task][1] > := instance" in the previous email), so that the current host would be > ranked first, while others would be ranked equally second. So the scenario > would be: > > 1. For non-logged store only tasks, the current owner is ranked with lag > "0", all others ranked with lag "1". > 2. For mixed non-logged and logged store tasks, the current owned is ranked > with lag "0", all others ranked by the summed lag of logged stores. And of > course, for those whose summed lag is below the threshold, they will be > ranked with lag "0" as well. > > But I agree that the case 1) requires special handling logic indeed: you > need to first tell which tasks are non-logged store only tasks, and then > add a lag "1" for all other instances. Since non-logged store only tasks > are not very common so far I think I agree that it may not worth the > special handling effort anyways, and if people complain about the > non-stickiness we can always come back and revisit this policy anyways, so > I'm fine with the current proposal as-is as well. > > Just made another pass on the wiki page and it LGTM to me now, I'm +1 on > the KIP. > > > Guozhang > > > > > On Wed, Sep 4, 2019 at 8:19 PM John Roesler <j...@confluent.io> wrote: > > > I see; thanks for the clarification, Guozhang. > > > > If the prior owner of a non-logged, stateful task always reports 0, and > all > > other instances report nothing, then it seems like we need a special case > > in the assignor to handle assigning these. I.e., the strategy of creating > > "movement" standbys and waiting for them to be equally caught up to the > > leader before moving the task with zero downtime doesn't work. Since no > > standby tasks are possible for non-logged tasks, the other instances > would > > never be able to "catch up", and the algorithm would be doomed to just > > continue pinning the task to its current instance, and would potentially > > never be able to balance the cluster. > > > > Unless, of course, we put in a special case that says to keep it assigned > > for a while and then move it. > > > > But in this case, how do we decide when to move it; it seems like any > time > > is as good as any other. And if this is the case, then we might as well > > just move it right away, which is what the KIP currently proposes. > > > > Also, it seems like this creates an asymmetry with the handling of > > non-logged stores in mixed logged/non-logged tasks (case (1) above). In > > that case, we agree that the algorithm will make assignments without > regard > > to the non-logged stores at all, and would freely move the task between > two > > instances that are "equally" caught up on the logged stores. But if a > task > > contains only non-logged stores, it sounds like we would not have this > > freedom of movement, but would just keep giving it back to the instance > > that previously owned it? > > > > Regarding the latter comment, thanks for pointing out my oversight. I've > > added the section "Iterative Balancing Assignments" to define this > > behavior: > > > > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-441%3A+Smooth+Scaling+Out+for+Kafka+Streams#KIP-441:SmoothScalingOutforKafkaStreams-IterativeBalancingAssignments > > . > > > > Thanks! > > -John > > > > On Wed, Sep 4, 2019 at 5:58 PM Guozhang Wang <wangg...@gmail.com> wrote: > > > > > Hi John, > > > > > > I think I may mis-communicate my ideas a bit, what I meant is simply > for > > > case 1), let the active tasks reporting a lag of 0 instead of not > > reporting > > > any lags at all; in addition, for other hosts not hosting this tasks, > and > > > hence we do not know the offset lag at all we set the > > > "StatefulTasksToRankedCandidates[task][1] > > > := instance" just to make sure the current host is ranked the first > while > > > all others are ranked equally after it. In this case we would still > favor > > > stickiness for not moving such tasks out of its current host unless it > > > violates balance, in which case we are free to move it to any other > > hosts. > > > Does that make sense? > > > > > > Also I just realized that in the update wiki page the algorithm section > > of > > > iteratively assigning / moving tasks based on > > > StatefulTasksToRankedCandidates until convergence was missing (it was > > there > > > in the previous version), could you add it back? > > > > > > Guozhang > > > > > > > > > On Wed, Sep 4, 2019 at 2:59 PM John Roesler <j...@confluent.io> wrote: > > > > > > > Thanks for the reply, Guozhang, > > > > > > > > I'll start by re-stating your cases, just to make sure we're on the > > same > > > > page... > > > > > > > > 1) The task is stateful, and all its stores are non-logged. For this > > case > > > > under the proposal, we would not have any standbys and the active > > version > > > > would actually not report any lag at all (not a lag of 0). Thus, all > > > > instances would be considered equal when it comes to assignment, > > although > > > > the assignment logic would know that the task is "heavy" because it > has > > > > those non-logged stores and factor that in to the overall cluster > > > balance. > > > > > > > > 2) The task is stateful and maybe has one logged and one non-logged > > > store. > > > > As you say, in this case, the non-logged store would not contribute > at > > > all > > > > to anyone's reported lag. The active task would report a lag of 0, > and > > > the > > > > standbys would report their lag on the logged store. The assignment > > logic > > > > would take these reported lags into account. > > > > > > > > It sounds like there might be some dissonance on point (1). It sounds > > > like > > > > you're saying we should try to assign non-logged stateful tasks back > to > > > an > > > > instance that has previously hosted it, or maybe we should assign it > > back > > > > to the most recent instance that hosted it, and then only reassign it > > if > > > > the movement would affect balance. I guess I'm not opposed to this, > > but I > > > > also don't really see the advantage. The fact is that we'd still wind > > up > > > > migrating it much of the time, so no one could depend on it not > getting > > > > migrated, and at the same time we're paying a significant complexity > > cost > > > > in the assignor to support this case. Plus, we need to encode the > > > previous > > > > owner of the task somehow in the wire protocol, which means we pay a > > > > payload-size penalty to support it as well. > > > > > > > > On the other hand, we should make our assignment as stable as > possible > > in > > > > the implementation to avoid pointless "swapping" of logged and > > non-logged > > > > stateful tasks, as well as stateless tasks when it doesn't change the > > > > balance at all to do so. It seems like this should accomplish the > same > > > > spiritual goal with no special cases. > > > > > > > > WDYT? > > > > -John > > > > > > > > On Wed, Sep 4, 2019 at 2:26 PM Guozhang Wang <wangg...@gmail.com> > > wrote: > > > > > > > > > Hello John, > > > > > > > > > > Your reasoning about non-logged state store looks good to me. But I > > > think > > > > > it worth considering two cases differently: 1) a task's all state > > > stores > > > > > are non-logged, 2) a task has non-logged state store but also have > > > other > > > > > logged state store. > > > > > > > > > > For 1), we should not have any standbys for such tasks at all, but > it > > > is > > > > > not the same as a stateless task, because the only active task > could > > > > report > > > > > a lag of "0" while all others should logically report the lag as > > > infinity > > > > > (of course, they would not encode such value). > > > > > > > > > > For 2), it is sort of the same to a normal case: active task > > reporting > > > a > > > > > lag of 0 while standby task reporting a lag summing all other > logged > > > > state > > > > > stores, meaning that the non-logged state store does not matter in > > our > > > > > assignment semantics. > > > > > > > > > > The rationale I had is to effectively favor stickiness for those > > tasks > > > in > > > > > case 1) than arbitrarily reassign them even though it is true that > > for > > > a > > > > > non-logged store there's no durability and hence restoration > > guarantees > > > > > anyways from Streams. Then algorithmically we may consider > assigning > > > > tasks > > > > > with descending order of the number of instances reporting lags for > > it, > > > > > then for such tasks there's always one candidate reporting a lag of > > 0, > > > > and > > > > > assigning them to any instance does not actually change the > cumulated > > > > total > > > > > lag either. At that time we can decide "if it is still within load > > > > balance > > > > > factor, then let's respect stickiness, otherwise it's free to > move". > > > > WDYT? > > > > > > > > > > > > > > > Guozhang > > > > > > > > > > On Wed, Sep 4, 2019 at 8:30 AM John Roesler <j...@confluent.io> > > wrote: > > > > > > > > > > > Hey Bruno, > > > > > > > > > > > > Thanks for taking another look. Some quick responses: > > > > > > > > > > > > 1) It just means the number of offsets in the topic. E.g., the > LSO > > is > > > > > 100, > > > > > > but the first offset is 40 due to retention, so there are 60 > > offsets > > > in > > > > > the > > > > > > topic. Further, the lag on that topic would be considered to be > 60 > > > for > > > > > any > > > > > > task that hadn't previously done any work on it. > > > > > > > > > > > > 2) This is undecidable in general. I.e., there's no way we can > know > > > > > whether > > > > > > the store is remote or not, and hence whether we can freely > assign > > it > > > > to > > > > > > another instance, or whether we have to keep it on the same > > instance. > > > > > > However, there are a couple of reasons to go ahead and assume we > > have > > > > the > > > > > > freedom to move such tasks. > > > > > > * We know that nothing can prevent the loss of an instance in a > > > cluster > > > > > > (I.e., this is true of all cloud environments, as well as any > > managed > > > > > > virtualized cluster like mesos or kubernetes), so any Streams > > program > > > > > that > > > > > > makes use of non-remote, non-logged state is doomed to lose its > > state > > > > > when > > > > > > it loses an instance. > > > > > > * If we take on a restriction that we cannot move such state > > between > > > > > > instances, we'd become overconstrained very quickly. Effectively, > > if > > > > you > > > > > > made use of non-logged stores, and we didn't assume freedom of > > > > movement, > > > > > > then we couldn't make use of any new instances in your cluster. > > > > > > * On the other hand, if we optimistically assume we can't move > > state, > > > > but > > > > > > only reassign it when we lose an instance, then we're supporting > > > > > > non-deterministic logic, because the program would produce > > different > > > > > > results, depending on whether you lost a node during the > execution > > or > > > > > not. > > > > > > 2b) That last point goes along with your side note. I'm not sure > if > > > we > > > > > > should bother dropping such state on every reassignment, though. > It > > > > seems > > > > > > to be undefined territory enough that we can just do the simplest > > > thing > > > > > and > > > > > > assume people have made their own (external) provisions for > > > durability. > > > > > > I.e., when we say "non-logged", we mean that it doesn't make use > of > > > > _our_ > > > > > > durability mechanism. I'm arguing that the only sane assumption > is > > > that > > > > > > such folks have opted to use their own durability measures, and > we > > > > should > > > > > > just assume it works with no special considerations in the > > assignment > > > > > > algorithm. > > > > > > > > > > > > 3) Good catch! I've fixed it. > > > > > > > > > > > > Thanks again! > > > > > > -John > > > > > > > > > > > > On Wed, Sep 4, 2019 at 6:09 AM Bruno Cadonna <br...@confluent.io > > > > > > wrote: > > > > > > > > > > > > > Hi, > > > > > > > > > > > > > > 1) What do you mean with "full set of offsets in the topic"? Is > > > this > > > > > > > the sum of all offsets of the changelog partitions of the task? > > > > > > > > > > > > > > 2) I am not sure whether non-logged stateful tasks should be > > > > > > > effectively treated as stateless tasks during assignment. First > > we > > > > > > > need to decide whether a non-logged stateful task should > > preferably > > > > be > > > > > > > assigned to the same instance on which it just run in order to > > > > > > > continue to use its state or not. > > > > > > > > > > > > > > 3) In the example, you define stand-by tasks {S1, S2, ...} but > > > never > > > > > > > use them, because below you use a dedicated row for stand-by > > tasks. > > > > > > > > > > > > > > As a side note to 2) since it is not directly related to this > > KIP: > > > We > > > > > > > should decide if we want to avoid the possible non-determinism > > > > > > > introduced by non-logged stores or not. That is, if an instance > > > hosts > > > > > > > a task with non-logged stores then we can have two cases after > > the > > > > > > > next rebalance: a) the task stays on the same instance and > > > continues > > > > > > > to use the same state store as used so far or b) the task is > > > assigned > > > > > > > to another instance and it starts an empty state store. The > > > produced > > > > > > > results for these two cases might differ. To avoid the > > > > nondeterminism, > > > > > > > non-logged state stores would need to be wiped out before > > > assignment. > > > > > > > Then the question arises, how the removal of non-logged state > > > stores > > > > > > > before assignment would affect backward-compatibility. > > > > > > > > > > > > > > Best, > > > > > > > Bruno > > > > > > > > > > > > > > On Wed, Aug 21, 2019 at 11:40 PM John Roesler < > j...@confluent.io > > > > > > > > wrote: > > > > > > > > > > > > > > > > Hi Guozhang, > > > > > > > > > > > > > > > > > My impression from your previous email is that inside the > > > > algorithm > > > > > > > when > > > > > > > > we > > > > > > > > are "filling" them to instances some deterministic logic > would > > be > > > > > used > > > > > > to > > > > > > > > avoid the above case, is that correct? > > > > > > > > > > > > > > > > Yes, that was my plan, but I didn't formalize it. There was a > > > > > > requirement > > > > > > > > that the assignment algorithm must not produce a new > assignment > > > if > > > > > the > > > > > > > > current assignment is already balanced, so at the least, any > > > > > thrashing > > > > > > > > would be restricted to the "balancing" phase while tasks are > > > moving > > > > > > > around > > > > > > > > the cluster. > > > > > > > > > > > > > > > > Anyway, I think it would be good to say that we'll "try to" > > > produce > > > > > > > stable > > > > > > > > assignments, so I've added a "should" clause to the > assignment > > > > spec: > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-441%3A+Smooth+Scaling+Out+for+Kafka+Streams#KIP-441:SmoothScalingOutforKafkaStreams-AssignmentAlgorithm > > > > > > > > > > > > > > > > For example, we would sort the stateless tasks and available > > > > > instances > > > > > > > > before assigning them, so that the stateless task assignment > > > would > > > > > > mostly > > > > > > > > stay stable between assignments, modulo the compute capacity > of > > > the > > > > > > > > instances changing a little as active stateful tasks get > > assigned > > > > in > > > > > > more > > > > > > > > balanced ways. > > > > > > > > > > > > > > > > Thanks, > > > > > > > > -John > > > > > > > > > > > > > > > > > > > > > > > > On Wed, Aug 21, 2019 at 1:55 PM Guozhang Wang < > > > wangg...@gmail.com> > > > > > > > wrote: > > > > > > > > > > > > > > > > > Hello John, > > > > > > > > > > > > > > > > > > That sounds reasonable. Just double checked the code that > > with > > > > > > logging > > > > > > > > > disabled the corresponding checkpoint file would not > contain > > > any > > > > > > > values, > > > > > > > > > just like a stateless task. So I think treating them > > logically > > > > the > > > > > > > same is > > > > > > > > > fine. > > > > > > > > > > > > > > > > > > Guozhang > > > > > > > > > > > > > > > > > > > > > > > > > > > On Wed, Aug 21, 2019 at 11:41 AM John Roesler < > > > j...@confluent.io > > > > > > > > > > > > wrote: > > > > > > > > > > > > > > > > > > > Hi again, Guozhang, > > > > > > > > > > > > > > > > > > > > While writing up the section on stateless tasks ( > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-441%3A+Smooth+Scaling+Out+for+Kafka+Streams#KIP-441:SmoothScalingOutforKafkaStreams-Statelesstasks > > > > > > > > > > ), > > > > > > > > > > I reconsidered whether stateful, but non-logged, tasks > > should > > > > > > > actually > > > > > > > > > > report a lag of zero, versus not reporting any lag. By > the > > > > > > > definition of > > > > > > > > > > the "StatefulTasksToRankedCandidates" function, the > leader > > > > would > > > > > > > compute > > > > > > > > > a > > > > > > > > > > lag of zero for these tasks anyway. > > > > > > > > > > > > > > > > > > > > Therefore, I think the same reasoning that I supplied you > > for > > > > > > > stateless > > > > > > > > > > tasks applies, since the member and leader will agree on > a > > > lag > > > > of > > > > > > > zero > > > > > > > > > > anyway, we can avoid adding them to the "Task Lags" map, > > and > > > > save > > > > > > > some > > > > > > > > > > bytes in the JoinGroup request. This would be especially > > > > > beneficial > > > > > > > in an > > > > > > > > > > application that uses remote stores for _all_ its state > > > stores, > > > > > it > > > > > > > would > > > > > > > > > > have an extremely lightweight JoinGroup request, with no > > task > > > > > lags > > > > > > at > > > > > > > > > all. > > > > > > > > > > > > > > > > > > > > WDYT? > > > > > > > > > > -John > > > > > > > > > > > > > > > > > > > > On Wed, Aug 21, 2019 at 1:17 PM John Roesler < > > > > j...@confluent.io> > > > > > > > wrote: > > > > > > > > > > > > > > > > > > > > > Thanks, Guozhang. > > > > > > > > > > > > > > > > > > > > > > (Side note: I noticed on another pass over the > discussion > > > > that > > > > > > I'd > > > > > > > > > missed > > > > > > > > > > > addressing your comment about the potential race > > condition > > > > > > between > > > > > > > > > state > > > > > > > > > > > cleanup and lag-based assignment. I've added a solution > > to > > > > the > > > > > > > > > proposal: > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-441%3A+Smooth+Scaling+Out+for+Kafka+Streams#KIP-441:SmoothScalingOutforKafkaStreams-Racebetweenassignmentandstatecleanup > > > > > > > > > > > ) > > > > > > > > > > > > > > > > > > > > > > In the JoinGroup (SubscriptionInfo) metadata, stateless > > > tasks > > > > > are > > > > > > > not > > > > > > > > > > > represented at all. This should save us some bytes in > the > > > > > request > > > > > > > > > > metadata. > > > > > > > > > > > If we treated them like non-logged stateful tasks and > > > > reported > > > > > a > > > > > > > lag of > > > > > > > > > > 0, > > > > > > > > > > > the only difference is that the assignor would be able > to > > > > tell > > > > > > > which > > > > > > > > > > > members previously hosted that stateless task. > > > > > > > > > > > > > > > > > > > > > > I'd like to make a simplifying assumption that > stateless > > > > tasks > > > > > > can > > > > > > > just > > > > > > > > > > be > > > > > > > > > > > freely reassigned with no regard to stickiness at all, > > > > without > > > > > > > > > impacting > > > > > > > > > > > performance. This is almost true. In fact, while > > assigned a > > > > > > > stateless > > > > > > > > > > task, > > > > > > > > > > > a member fetches batches of records from the broker, so > > if > > > we > > > > > > move > > > > > > > the > > > > > > > > > > > stateless task assignment, this buffered input is > wasted > > > and > > > > > just > > > > > > > gets > > > > > > > > > > > dropped. > > > > > > > > > > > > > > > > > > > > > > However, we won't be moving the stateless tasks around > > all > > > > the > > > > > > time > > > > > > > > > (just > > > > > > > > > > > during rebalances), and we have the requirement that > the > > > > > > assigment > > > > > > > > > > > algorithm must stabilize to guard against perpetually > > > > > shuffling a > > > > > > > > > > stateless > > > > > > > > > > > task from one node to another. So, my hope is that this > > > small > > > > > > > amount of > > > > > > > > > > > inefficiency would not be a performance-dominating > > factor. > > > In > > > > > > > exchange, > > > > > > > > > > we > > > > > > > > > > > gain the opportunity for the assignment algorithm to > use > > > the > > > > > > > stateless > > > > > > > > > > > tasks as "filler" during unbalanced assignments. For > > > example, > > > > > if > > > > > > > there > > > > > > > > > > is a > > > > > > > > > > > node that is just warming up with several standby > tasks, > > > > maybe > > > > > > the > > > > > > > > > > > assignment can give more stateless tasks to that node > to > > > > > balance > > > > > > > the > > > > > > > > > > > computational load across the cluster. > > > > > > > > > > > > > > > > > > > > > > It's worth noting that such an assignment would still > not > > > be > > > > > > > considered > > > > > > > > > > > "balanced", so the ultimately balanced final state of > the > > > > > > > assignment > > > > > > > > > > (after > > > > > > > > > > > task movements) would still have the desired property > > that > > > > each > > > > > > > > > stateful > > > > > > > > > > > and stateless task is evenly spread across the cluster. > > > > > > > > > > > > > > > > > > > > > > Does that seem reasonable? > > > > > > > > > > > > > > > > > > > > > > Thanks, > > > > > > > > > > > -John > > > > > > > > > > > > > > > > > > > > > > On Wed, Aug 21, 2019 at 11:22 AM Guozhang Wang < > > > > > > wangg...@gmail.com > > > > > > > > > > > > > > > > > > wrote: > > > > > > > > > > > > > > > > > > > > > >> Hello John, > > > > > > > > > > >> > > > > > > > > > > >> I've made another pass on the wiki page again, overall > > > LGTM. > > > > > One > > > > > > > meta > > > > > > > > > > >> comment about the "stateless" tasks: how do we > represent > > > > them > > > > > in > > > > > > > the > > > > > > > > > > >> metadata? Are they just treated as stateful tasks with > > > > logging > > > > > > > > > disabled, > > > > > > > > > > >> or > > > > > > > > > > >> are specially handled? It is not very clear in the > > > > > description. > > > > > > > > > > >> > > > > > > > > > > >> > > > > > > > > > > >> Guozhang > > > > > > > > > > >> > > > > > > > > > > >> On Wed, Aug 21, 2019 at 8:43 AM John Roesler < > > > > > j...@confluent.io > > > > > > > > > > > > > > > > wrote: > > > > > > > > > > >> > > > > > > > > > > >> > I have also specifically called out that the > > assignment > > > > must > > > > > > > achieve > > > > > > > > > > >> both > > > > > > > > > > >> > "instance" and "task" balance: > > > > > > > > > > >> > > > > > > > > > > > >> > > > > > > > > > > > >> > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-441%3A+Smooth+Scaling+Out+for+Kafka+Streams#KIP-441:SmoothScalingOutforKafkaStreams-Defining%22balance%22 > > > > > > > > > > >> > > > > > > > > > > > >> > I've also addressed the problem of state stores with > > > > logging > > > > > > > > > disabled: > > > > > > > > > > >> > > > > > > > > > > > >> > > > > > > > > > > > >> > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-441%3A+Smooth+Scaling+Out+for+Kafka+Streams#KIP-441:SmoothScalingOutforKafkaStreams-Statewithloggingdisabled > > > > > > > > > > >> > > > > > > > > > > > >> > I believe this addresses all the concerns that have > > been > > > > > > raised > > > > > > > to > > > > > > > > > > date. > > > > > > > > > > >> > Apologies if I've overlooked one of your concerns. > > > > > > > > > > >> > > > > > > > > > > > >> > Please give the KIP another read and let me know of > > any > > > > > > further > > > > > > > > > > >> thoughts! > > > > > > > > > > >> > Hopefully, we can start the voting on this KIP by > the > > > end > > > > of > > > > > > the > > > > > > > > > week. > > > > > > > > > > >> > > > > > > > > > > > >> > Thanks, > > > > > > > > > > >> > -John > > > > > > > > > > >> > > > > > > > > > > > >> > On Tue, Aug 20, 2019 at 5:16 PM John Roesler < > > > > > > j...@confluent.io > > > > > > > > > > > > > > > > > > wrote: > > > > > > > > > > >> > > > > > > > > > > > >> > > In response to Bruno's concern #2, I've also added > > > that > > > > > > > section to > > > > > > > > > > the > > > > > > > > > > >> > > "Rejected Alternatives" section. > > > > > > > > > > >> > > > > > > > > > > > > >> > > Additionally, after reviewing some other > assignment > > > > > papers, > > > > > > > I've > > > > > > > > > > >> > developed > > > > > > > > > > >> > > the concern that specifying which "phases" the > > > > assignment > > > > > > > > > algorithm > > > > > > > > > > >> > should > > > > > > > > > > >> > > have, or indeed the logic of it at all, might be a > > > > mistake > > > > > > > that > > > > > > > > > > >> > > over-constrains our ability to write an optimal > > > > algorithm. > > > > > > > > > > Therefore, > > > > > > > > > > >> > I've > > > > > > > > > > >> > > also refactored the KIP to just describe the > > protocol, > > > > and > > > > > > > specify > > > > > > > > > > the > > > > > > > > > > >> > > requirements for the assignment algorithm, but not > > its > > > > > exact > > > > > > > > > > behavior > > > > > > > > > > >> at > > > > > > > > > > >> > > all. > > > > > > > > > > >> > > > > > > > > > > > > >> > > Thanks, > > > > > > > > > > >> > > -John > > > > > > > > > > >> > > > > > > > > > > > > >> > > On Tue, Aug 20, 2019 at 5:13 PM John Roesler < > > > > > > > j...@confluent.io> > > > > > > > > > > >> wrote: > > > > > > > > > > >> > > > > > > > > > > > > >> > >> Hi All, > > > > > > > > > > >> > >> > > > > > > > > > > >> > >> Thanks for the discussion. I've been considering > > the > > > > idea > > > > > > of > > > > > > > > > giving > > > > > > > > > > >> the > > > > > > > > > > >> > >> "catching up" tasks a different name/role. I was > in > > > > favor > > > > > > > > > > initially, > > > > > > > > > > >> but > > > > > > > > > > >> > >> after working though some details, I think it > > causes > > > > some > > > > > > > > > problems, > > > > > > > > > > >> > which > > > > > > > > > > >> > >> I've written up in the "rejected alternatives" > part > > > of > > > > > the > > > > > > > KIP: > > > > > > > > > > >> > >> > > > > > > > > > > >> > > > > > > > > > > > >> > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-441%3A+Smooth+Scaling+Out+for+Kafka+Streams#KIP-441:SmoothScalingOutforKafkaStreams-Addinganewkindoftask(%22moving%22,%22recovering%22,%22learner%22)fortaskmovements > > > > > > > > > > >> > >> > > > > > > > > > > >> > >> Please give it a read and let me know what you > > think. > > > > > > > > > > >> > >> > > > > > > > > > > >> > >> Thanks, > > > > > > > > > > >> > >> -John > > > > > > > > > > >> > >> > > > > > > > > > > >> > >> On Thu, Aug 8, 2019 at 7:57 PM Guozhang Wang < > > > > > > > wangg...@gmail.com > > > > > > > > > > > > > > > > > > > > >> > wrote: > > > > > > > > > > >> > >> > > > > > > > > > > >> > >>> I think I agree with you Sophie. My gut feeling > is > > > > that > > > > > 1) > > > > > > > it > > > > > > > > > > should > > > > > > > > > > >> > not > > > > > > > > > > >> > >>> be > > > > > > > > > > >> > >>> the major concern in assignor's algorithm for > > > standby > > > > > > tasks > > > > > > > not > > > > > > > > > > >> > catching > > > > > > > > > > >> > >>> up, but rather be tackled in different modules, > > and > > > > 2) a > > > > > > > lot of > > > > > > > > > > >> > >>> optimization can be down at the stream thread > > > itself, > > > > > like > > > > > > > > > > dedicated > > > > > > > > > > >> > >>> threading and larger batching, or even > complicated > > > > > > > scheduling > > > > > > > > > > >> > mechanisms > > > > > > > > > > >> > >>> between running, restoring and standby tasks. In > > > > > anyways, > > > > > > I > > > > > > > > > think > > > > > > > > > > we > > > > > > > > > > >> > can > > > > > > > > > > >> > >>> take this out of the scope of KIP-441 for now. > > > > > > > > > > >> > >>> > > > > > > > > > > >> > >>> > > > > > > > > > > >> > >>> Guozhang > > > > > > > > > > >> > >>> > > > > > > > > > > >> > >>> > > > > > > > > > > >> > >>> On Thu, Aug 8, 2019 at 5:48 PM Sophie > > Blee-Goldman < > > > > > > > > > > >> > sop...@confluent.io> > > > > > > > > > > >> > >>> wrote: > > > > > > > > > > >> > >>> > > > > > > > > > > >> > >>> > > we may have other ways to not starving the > > > standby > > > > > > > tasks, > > > > > > > > > for > > > > > > > > > > >> > >>> example, by > > > > > > > > > > >> > >>> > > using dedicate threads for standby tasks or > > even > > > > > > > consider > > > > > > > > > > having > > > > > > > > > > >> > >>> > *higher> priority for standby than active* so > > that > > > > we > > > > > > > always > > > > > > > > > try > > > > > > > > > > >> to > > > > > > > > > > >> > >>> caught > > > > > > > > > > >> > >>> > up standby > > > > > > > > > > >> > >>> > > first, then process active > > > > > > > > > > >> > >>> > > > > > > > > > > > >> > >>> > This is an interesting idea, but seems likely > to > > > get > > > > > in > > > > > > > the > > > > > > > > > way > > > > > > > > > > of > > > > > > > > > > >> > the > > > > > > > > > > >> > >>> > original idea of this KIP > > > > > > > > > > >> > >>> > -- if we always process standby tasks first, > > then > > > if > > > > > we > > > > > > > are > > > > > > > > > > >> assigned > > > > > > > > > > >> > a > > > > > > > > > > >> > >>> new > > > > > > > > > > >> > >>> > standby task we > > > > > > > > > > >> > >>> > will have to wait for it to catch up > completely > > > > before > > > > > > > > > > processing > > > > > > > > > > >> any > > > > > > > > > > >> > >>> > active tasks! That's > > > > > > > > > > >> > >>> > even worse than the situation this KIP is > trying > > > to > > > > > help > > > > > > > with, > > > > > > > > > > >> since > > > > > > > > > > >> > a > > > > > > > > > > >> > >>> new > > > > > > > > > > >> > >>> > standby task has to > > > > > > > > > > >> > >>> > restore from 0 (whereas an active task at > least > > > can > > > > > take > > > > > > > over > > > > > > > > > > from > > > > > > > > > > >> > >>> wherever > > > > > > > > > > >> > >>> > the standby was). > > > > > > > > > > >> > >>> > > > > > > > > > > > >> > >>> > During restoration -- while there exist any > > > > restoring > > > > > > > tasks > > > > > > > > > -- I > > > > > > > > > > >> > think > > > > > > > > > > >> > >>> it's > > > > > > > > > > >> > >>> > reasonable to de-prioritize the > > > > > > > > > > >> > >>> > standby tasks and just process restoring and > > > active > > > > > > tasks > > > > > > > so > > > > > > > > > > both > > > > > > > > > > >> can > > > > > > > > > > >> > >>> make > > > > > > > > > > >> > >>> > progress. But we should > > > > > > > > > > >> > >>> > let them catch up afterwards somehow -- maybe > we > > > can > > > > > > apply > > > > > > > > > some > > > > > > > > > > >> kind > > > > > > > > > > >> > of > > > > > > > > > > >> > >>> > heuristic, like "if we haven't > > > > > > > > > > >> > >>> > processed standbys for X iterations, or Y > > > > > milliseconds, > > > > > > > do so > > > > > > > > > > >> now." > > > > > > > > > > >> > >>> > > > > > > > > > > > >> > >>> > Actually, it might even be beneficial to avoid > > > > > > processing > > > > > > > > > > >> standbys a > > > > > > > > > > >> > >>> record > > > > > > > > > > >> > >>> > or two at a time and instead > > > > > > > > > > >> > >>> > wait for a large enough batch to build up for > > the > > > > > > RocksDB > > > > > > > > > > >> > bulk-loading > > > > > > > > > > >> > >>> > benefits. > > > > > > > > > > >> > >>> > > > > > > > > > > > >> > >>> > I think the "use dedicated threads for > standby" > > is > > > > the > > > > > > > more > > > > > > > > > > >> promising > > > > > > > > > > >> > >>> end > > > > > > > > > > >> > >>> > goal, especially since > > > > > > > > > > >> > >>> > if we split restoration into "restoring tasks" > > > then > > > > > > > active and > > > > > > > > > > >> > standbys > > > > > > > > > > >> > >>> > share almost nothing. But > > > > > > > > > > >> > >>> > that seems like follow-up work to the current > > KIP > > > :) > > > > > > > > > > >> > >>> > > > > > > > > > > > >> > >>> > On Thu, Aug 8, 2019 at 5:31 PM Sophie > > > Blee-Goldman < > > > > > > > > > > >> > >>> sop...@confluent.io> > > > > > > > > > > >> > >>> > wrote: > > > > > > > > > > >> > >>> > > > > > > > > > > > >> > >>> > > Stateful tasks with logging disabled seem to > > be > > > an > > > > > > > > > interesting > > > > > > > > > > >> edge > > > > > > > > > > >> > >>> case. > > > > > > > > > > >> > >>> > > On the one hand, > > > > > > > > > > >> > >>> > > for balancing purposes they should be > > considered > > > > > > > stateful > > > > > > > > > > since > > > > > > > > > > >> as > > > > > > > > > > >> > >>> > > Guozhang pointed out > > > > > > > > > > >> > >>> > > they are still "heavy" in IO costs. But for > > > > > "catching > > > > > > > up" > > > > > > > > > > >> purposes, > > > > > > > > > > >> > >>> ie > > > > > > > > > > >> > >>> > > when allocating standby > > > > > > > > > > >> > >>> > > tasks that will become active tasks, they > > should > > > > be > > > > > > > > > considered > > > > > > > > > > >> > >>> stateless > > > > > > > > > > >> > >>> > > as there is so > > > > > > > > > > >> > >>> > > meaningful sense of their lag. We should > never > > > > > > allocate > > > > > > > > > > standby > > > > > > > > > > >> > >>> tasks for > > > > > > > > > > >> > >>> > > them during the > > > > > > > > > > >> > >>> > > first rebalance, but should ensure they are > > > evenly > > > > > > > > > distributed > > > > > > > > > > >> > across > > > > > > > > > > >> > >>> > > instances. Maybe we > > > > > > > > > > >> > >>> > > should split these into a third category -- > > > after > > > > we > > > > > > > assign > > > > > > > > > > all > > > > > > > > > > >> > >>> stateful > > > > > > > > > > >> > >>> > > tasks with logging, we > > > > > > > > > > >> > >>> > > then distribute the set of logging-disabled > > > > stateful > > > > > > > tasks > > > > > > > > > to > > > > > > > > > > >> > improve > > > > > > > > > > >> > >>> > > balance, before lastly > > > > > > > > > > >> > >>> > > distributing stateless tasks? > > > > > > > > > > >> > >>> > > > > > > > > > > > > >> > >>> > > This actually leads into what I was just > > > thinking, > > > > > > > which is > > > > > > > > > > >> that we > > > > > > > > > > >> > >>> > really > > > > > > > > > > >> > >>> > > should distinguish the > > > > > > > > > > >> > >>> > > "catch-up" standbys from normal standbys as > > well > > > > as > > > > > > > > > > >> distinguishing > > > > > > > > > > >> > >>> > > actively processing tasks > > > > > > > > > > >> > >>> > > from active tasks that are still in the > > restore > > > > > phase. > > > > > > > It's > > > > > > > > > > >> > somewhat > > > > > > > > > > >> > >>> > > awkward that today, some > > > > > > > > > > >> > >>> > > active tasks just start processing > immediately > > > > while > > > > > > > others > > > > > > > > > > >> behave > > > > > > > > > > >> > >>> more > > > > > > > > > > >> > >>> > > like standby than active > > > > > > > > > > >> > >>> > > tasks for some time, before switching to > real > > > > > active. > > > > > > > They > > > > > > > > > > first > > > > > > > > > > >> > use > > > > > > > > > > >> > >>> the > > > > > > > > > > >> > >>> > > restoreConsumer, then > > > > > > > > > > >> > >>> > > later only the "normal" consumer. > > > > > > > > > > >> > >>> > > > > > > > > > > > > >> > >>> > > However, this restore period is still > distinct > > > > from > > > > > > > normal > > > > > > > > > > >> standbys > > > > > > > > > > >> > >>> in a > > > > > > > > > > >> > >>> > > lot of ways -- the code path > > > > > > > > > > >> > >>> > > for restoring is different than for updating > > > > > standbys, > > > > > > > for > > > > > > > > > > >> example > > > > > > > > > > >> > >>> in how > > > > > > > > > > >> > >>> > > long we block on #poll. > > > > > > > > > > >> > >>> > > So in addition to giving them their own name > > -- > > > > > let's > > > > > > go > > > > > > > > > with > > > > > > > > > > >> > >>> restoring > > > > > > > > > > >> > >>> > > task for now -- they really > > > > > > > > > > >> > >>> > > do seem to deserve being their own distinct > > > task. > > > > We > > > > > > can > > > > > > > > > > >> optimize > > > > > > > > > > >> > >>> them > > > > > > > > > > >> > >>> > for > > > > > > > > > > >> > >>> > > efficient conversion > > > > > > > > > > >> > >>> > > to active tasks since we know that's what > they > > > > will > > > > > > be. > > > > > > > > > > >> > >>> > > > > > > > > > > > > >> > >>> > > This resolves some of the awkwardness of > > dealing > > > > > with > > > > > > > the > > > > > > > > > > >> special > > > > > > > > > > >> > >>> case > > > > > > > > > > >> > >>> > > mentioned above: we > > > > > > > > > > >> > >>> > > find a balanced assignment of stateful and > > > > stateless > > > > > > > tasks, > > > > > > > > > > and > > > > > > > > > > >> > >>> create > > > > > > > > > > >> > >>> > > restoring tasks as needed. > > > > > > > > > > >> > >>> > > If logging is disabled, no restoring task is > > > > > created. > > > > > > > > > > >> > >>> > > > > > > > > > > > > >> > >>> > > > > > > > > > > > > >> > >>> > > On Thu, Aug 8, 2019 at 3:44 PM Guozhang > Wang < > > > > > > > > > > >> wangg...@gmail.com> > > > > > > > > > > >> > >>> wrote: > > > > > > > > > > >> > >>> > > > > > > > > > > > > >> > >>> > >> Regarding 3) above: I think for active task > > > they > > > > > > should > > > > > > > > > still > > > > > > > > > > >> be > > > > > > > > > > >> > >>> > >> considered > > > > > > > > > > >> > >>> > >> stateful since the processor would still > pay > > IO > > > > > cost > > > > > > > > > > accessing > > > > > > > > > > >> the > > > > > > > > > > >> > >>> > store, > > > > > > > > > > >> > >>> > >> but they would not have standby tasks? > > > > > > > > > > >> > >>> > >> > > > > > > > > > > >> > >>> > >> On Thu, Aug 8, 2019 at 7:49 AM Bruno > Cadonna > > < > > > > > > > > > > >> br...@confluent.io> > > > > > > > > > > >> > >>> > wrote: > > > > > > > > > > >> > >>> > >> > > > > > > > > > > >> > >>> > >> > Hi, > > > > > > > > > > >> > >>> > >> > > > > > > > > > > > >> > >>> > >> > Thank you for the KIP! > > > > > > > > > > >> > >>> > >> > > > > > > > > > > > >> > >>> > >> > Some questions/comments: > > > > > > > > > > >> > >>> > >> > > > > > > > > > > > >> > >>> > >> > 1. I am wondering if the "stand-by" tasks > > > that > > > > > > catch > > > > > > > up > > > > > > > > > > state > > > > > > > > > > >> > >>> before > > > > > > > > > > >> > >>> > >> > the active task is switched deserve its > own > > > > name > > > > > in > > > > > > > this > > > > > > > > > > KIP > > > > > > > > > > >> and > > > > > > > > > > >> > >>> maybe > > > > > > > > > > >> > >>> > >> > in the code. We have already stated that > > they > > > > are > > > > > > not > > > > > > > > > true > > > > > > > > > > >> > >>> stand-by > > > > > > > > > > >> > >>> > >> > tasks, they are not configured through > > > > > > > > > > >> `num.standby.replicas`, > > > > > > > > > > >> > and > > > > > > > > > > >> > >>> > >> > maybe they have also other properties > that > > > > > > > distinguish > > > > > > > > > them > > > > > > > > > > >> from > > > > > > > > > > >> > >>> true > > > > > > > > > > >> > >>> > >> > stand-by tasks of which we are not aware > > yet. > > > > For > > > > > > > > > example, > > > > > > > > > > >> they > > > > > > > > > > >> > >>> may be > > > > > > > > > > >> > >>> > >> > prioritized differently than other tasks. > > > > > > > Furthermore, > > > > > > > > > the > > > > > > > > > > >> name > > > > > > > > > > >> > >>> > >> > "stand-by" does not really fit with the > > > planned > > > > > > > > > > >> functionality of > > > > > > > > > > >> > >>> those > > > > > > > > > > >> > >>> > >> > tasks. In the following, I will call them > > > false > > > > > > > stand-by > > > > > > > > > > >> tasks. > > > > > > > > > > >> > >>> > >> > > > > > > > > > > > >> > >>> > >> > 2. Did you consider to trigger the > probing > > > > > > > rebalances not > > > > > > > > > > at > > > > > > > > > > >> > >>> regular > > > > > > > > > > >> > >>> > >> > time intervals but when the false > stand-by > > > > tasks > > > > > > > reach an > > > > > > > > > > >> > >>> acceptable > > > > > > > > > > >> > >>> > >> > lag? If you did consider, could you add a > > > > > paragraph > > > > > > > why > > > > > > > > > you > > > > > > > > > > >> > >>> rejected > > > > > > > > > > >> > >>> > >> > this idea to the "Rejected Alternatives" > > > > section. > > > > > > > > > > >> > >>> > >> > > > > > > > > > > > >> > >>> > >> > 3. Are tasks that solely contain stores > > with > > > > > > disabled > > > > > > > > > > logging > > > > > > > > > > >> > >>> > >> > classified as stateful or stateless in > the > > > > > > > algorithm? I > > > > > > > > > > would > > > > > > > > > > >> > >>> guess > > > > > > > > > > >> > >>> > >> > stateless, although if possible they > should > > > be > > > > > > > assigned > > > > > > > > > to > > > > > > > > > > >> the > > > > > > > > > > >> > >>> same > > > > > > > > > > >> > >>> > >> > instance they had run before the > rebalance. > > > As > > > > > far > > > > > > > as I > > > > > > > > > can > > > > > > > > > > >> see > > > > > > > > > > >> > >>> this > > > > > > > > > > >> > >>> > >> > special case is not handled in the > > algorithm. > > > > > > > > > > >> > >>> > >> > > > > > > > > > > > >> > >>> > >> > Best, > > > > > > > > > > >> > >>> > >> > Bruno > > > > > > > > > > >> > >>> > >> > > > > > > > > > > > >> > >>> > >> > > > > > > > > > > > >> > >>> > >> > > > > > > > > > > > >> > >>> > >> > On Thu, Aug 8, 2019 at 8:24 AM Guozhang > > Wang > > > < > > > > > > > > > > >> > wangg...@gmail.com> > > > > > > > > > > >> > >>> > >> wrote: > > > > > > > > > > >> > >>> > >> > > > > > > > > > > > > >> > >>> > >> > > 1. Sounds good, just wanted to clarify; > > and > > > > it > > > > > > may > > > > > > > > > worth > > > > > > > > > > >> > >>> documenting > > > > > > > > > > >> > >>> > >> it > > > > > > > > > > >> > >>> > >> > so > > > > > > > > > > >> > >>> > >> > > that users would not be surprised when > > > > > monitoring > > > > > > > their > > > > > > > > > > >> > >>> footprint. > > > > > > > > > > >> > >>> > >> > > > > > > > > > > > > >> > >>> > >> > > 2. Hmm I see... I think the trade-off > can > > > be > > > > > > > described > > > > > > > > > as > > > > > > > > > > >> "how > > > > > > > > > > >> > >>> much > > > > > > > > > > >> > >>> > >> > > imbalance would bother you to be > willing > > to > > > > pay > > > > > > > another > > > > > > > > > > >> > >>> rebalance, > > > > > > > > > > >> > >>> > >> along > > > > > > > > > > >> > >>> > >> > > with potentially more restoration lag", > > and > > > > the > > > > > > > current > > > > > > > > > > >> > >>> definition > > > > > > > > > > >> > >>> > of > > > > > > > > > > >> > >>> > >> > > rebalance_factor can be considered as a > > > rough > > > > > > > > > measurement > > > > > > > > > > >> of > > > > > > > > > > >> > >>> that > > > > > > > > > > >> > >>> > >> > > imbalance. Of course one can argue > that a > > > > finer > > > > > > > grained > > > > > > > > > > >> > >>> measurement > > > > > > > > > > >> > >>> > >> could > > > > > > > > > > >> > >>> > >> > > be "resource footprint" like CPU / > > storage > > > of > > > > > > each > > > > > > > > > > instance > > > > > > > > > > >> > >>> like we > > > > > > > > > > >> > >>> > >> have > > > > > > > > > > >> > >>> > >> > in > > > > > > > > > > >> > >>> > >> > > Kafka broker auto balancing tools, but > > I'd > > > > > prefer > > > > > > > not > > > > > > > > > > doing > > > > > > > > > > >> > >>> that as > > > > > > > > > > >> > >>> > >> part > > > > > > > > > > >> > >>> > >> > of > > > > > > > > > > >> > >>> > >> > > the library but more as an operational > > tool > > > > in > > > > > > the > > > > > > > > > > future. > > > > > > > > > > >> On > > > > > > > > > > >> > >>> the > > > > > > > > > > >> > >>> > >> other > > > > > > > > > > >> > >>> > >> > > hand, I've seen stateful and stateless > > > tasks > > > > > > having > > > > > > > > > very > > > > > > > > > > >> > >>> different > > > > > > > > > > >> > >>> > >> load, > > > > > > > > > > >> > >>> > >> > > and sometimes the only bottleneck of a > > > > Streams > > > > > > app > > > > > > > is > > > > > > > > > > just > > > > > > > > > > >> one > > > > > > > > > > >> > >>> > >> stateful > > > > > > > > > > >> > >>> > >> > > sub-topology and whoever gets tasks of > > that > > > > > > > > > sub-topology > > > > > > > > > > >> > become > > > > > > > > > > >> > >>> > >> hotspot > > > > > > > > > > >> > >>> > >> > > (and that's why our algorithm tries to > > > > balance > > > > > > per > > > > > > > > > > >> > sub-topology > > > > > > > > > > >> > >>> as > > > > > > > > > > >> > >>> > >> well), > > > > > > > > > > >> > >>> > >> > > so maybe we can just consider stateful > > > tasks > > > > > when > > > > > > > > > > >> calculating > > > > > > > > > > >> > >>> this > > > > > > > > > > >> > >>> > >> factor > > > > > > > > > > >> > >>> > >> > > as a very brute force heuristic? > > > > > > > > > > >> > >>> > >> > > > > > > > > > > > > >> > >>> > >> > > 3.a. Thinking about this a bit more, > > maybe > > > > it's > > > > > > > better > > > > > > > > > > not > > > > > > > > > > >> try > > > > > > > > > > >> > >>> to > > > > > > > > > > >> > >>> > >> tackle > > > > > > > > > > >> > >>> > >> > an > > > > > > > > > > >> > >>> > >> > > unseen enemy just yet, and observe if > it > > > > really > > > > > > > emerges > > > > > > > > > > >> later, > > > > > > > > > > >> > >>> and > > > > > > > > > > >> > >>> > by > > > > > > > > > > >> > >>> > >> > then > > > > > > > > > > >> > >>> > >> > > we may have other ways to not starving > > the > > > > > > standby > > > > > > > > > tasks, > > > > > > > > > > >> for > > > > > > > > > > >> > >>> > >> example, by > > > > > > > > > > >> > >>> > >> > > using dedicate threads for standby > tasks > > or > > > > > even > > > > > > > > > consider > > > > > > > > > > >> > having > > > > > > > > > > >> > >>> > >> higher > > > > > > > > > > >> > >>> > >> > > priority for standby than active so > that > > we > > > > > > always > > > > > > > try > > > > > > > > > to > > > > > > > > > > >> > >>> caught up > > > > > > > > > > >> > >>> > >> > standby > > > > > > > > > > >> > >>> > >> > > first, then process active; and if > > active's > > > > > > lagging > > > > > > > > > > >> compared > > > > > > > > > > >> > to > > > > > > > > > > >> > >>> > >> > > log-end-offset is increasing then we > > should > > > > > > > increase > > > > > > > > > > >> capacity, > > > > > > > > > > >> > >>> etc > > > > > > > > > > >> > >>> > >> etc. > > > > > > > > > > >> > >>> > >> > > > > > > > > > > > > >> > >>> > >> > > 4. Actually with KIP-429 this may not > be > > > the > > > > > > case: > > > > > > > we > > > > > > > > > may > > > > > > > > > > >> not > > > > > > > > > > >> > >>> call > > > > > > > > > > >> > >>> > >> > > onPartitionsRevoked prior to rebalance > > any > > > > more > > > > > > so > > > > > > > > > would > > > > > > > > > > >> not > > > > > > > > > > >> > >>> transit > > > > > > > > > > >> > >>> > >> > state > > > > > > > > > > >> > >>> > >> > > to PARTITIONS_REVOKED, and hence not > > cause > > > > the > > > > > > > state of > > > > > > > > > > the > > > > > > > > > > >> > >>> instance > > > > > > > > > > >> > >>> > >> to > > > > > > > > > > >> > >>> > >> > be > > > > > > > > > > >> > >>> > >> > > REBALANCING. In other words, even if a > > > > instance > > > > > > is > > > > > > > > > > >> undergoing > > > > > > > > > > >> > a > > > > > > > > > > >> > >>> > >> rebalance > > > > > > > > > > >> > >>> > >> > > it's state may still be RUNNING and it > > may > > > > > still > > > > > > be > > > > > > > > > > >> processing > > > > > > > > > > >> > >>> > >> records at > > > > > > > > > > >> > >>> > >> > > the same time. > > > > > > > > > > >> > >>> > >> > > > > > > > > > > > > >> > >>> > >> > > > > > > > > > > > > >> > >>> > >> > > On Wed, Aug 7, 2019 at 12:14 PM John > > > Roesler > > > > < > > > > > > > > > > >> > j...@confluent.io > > > > > > > > > > >> > >>> > > > > > > > > > > > >> > >>> > >> wrote: > > > > > > > > > > >> > >>> > >> > > > > > > > > > > > > >> > >>> > >> > > > Hey Guozhang, > > > > > > > > > > >> > >>> > >> > > > > > > > > > > > > > >> > >>> > >> > > > Thanks for the review! > > > > > > > > > > >> > >>> > >> > > > > > > > > > > > > > >> > >>> > >> > > > 1. Yes, even with > `num.standby.replicas > > > := > > > > > 0`, > > > > > > we > > > > > > > > > will > > > > > > > > > > >> still > > > > > > > > > > >> > >>> > >> > temporarily > > > > > > > > > > >> > >>> > >> > > > allocate standby tasks to accomplish > a > > > > > > > no-downtime > > > > > > > > > task > > > > > > > > > > >> > >>> migration. > > > > > > > > > > >> > >>> > >> > > > Although, I'd argue that this doesn't > > > > really > > > > > > > violate > > > > > > > > > > the > > > > > > > > > > >> > >>> config, > > > > > > > > > > >> > >>> > as > > > > > > > > > > >> > >>> > >> the > > > > > > > > > > >> > >>> > >> > > > task isn't a true hot standby. As > soon > > as > > > > it > > > > > > > catches > > > > > > > > > > up, > > > > > > > > > > >> > we'll > > > > > > > > > > >> > >>> > >> > rebalance > > > > > > > > > > >> > >>> > >> > > > again, that task will become active, > > and > > > > the > > > > > > > original > > > > > > > > > > >> > instance > > > > > > > > > > >> > >>> > that > > > > > > > > > > >> > >>> > >> > hosted > > > > > > > > > > >> > >>> > >> > > > the active task will no longer have > the > > > > task > > > > > > > assigned > > > > > > > > > > at > > > > > > > > > > >> > all. > > > > > > > > > > >> > >>> Once > > > > > > > > > > >> > >>> > >> the > > > > > > > > > > >> > >>> > >> > > > stateDirCleaner kicks in, we'll free > > the > > > > disk > > > > > > > space > > > > > > > > > > from > > > > > > > > > > >> it, > > > > > > > > > > >> > >>> and > > > > > > > > > > >> > >>> > >> > return to > > > > > > > > > > >> > >>> > >> > > > the steady-state of having just one > > copy > > > of > > > > > the > > > > > > > task > > > > > > > > > in > > > > > > > > > > >> the > > > > > > > > > > >> > >>> > cluster. > > > > > > > > > > >> > >>> > >> > > > > > > > > > > > > > >> > >>> > >> > > > We can of course do without this, > but I > > > > feel > > > > > > the > > > > > > > > > > current > > > > > > > > > > >> > >>> proposal > > > > > > > > > > >> > >>> > is > > > > > > > > > > >> > >>> > >> > > > operationally preferable, since it > > > doesn't > > > > > make > > > > > > > > > > >> configuring > > > > > > > > > > >> > >>> > >> > hot-standbys a > > > > > > > > > > >> > >>> > >> > > > pre-requisite for fast rebalances. > > > > > > > > > > >> > >>> > >> > > > > > > > > > > > > > >> > >>> > >> > > > 2. Yes, I think your interpretation > is > > > what > > > > > we > > > > > > > > > > intended. > > > > > > > > > > >> The > > > > > > > > > > >> > >>> > default > > > > > > > > > > >> > >>> > >> > > > balance_factor would be 1, as it is > > > > > implicitly > > > > > > > today. > > > > > > > > > > >> What > > > > > > > > > > >> > >>> this > > > > > > > > > > >> > >>> > >> does is > > > > > > > > > > >> > >>> > >> > > > allows operators to trade off less > > > balanced > > > > > > > > > assignments > > > > > > > > > > >> > >>> against > > > > > > > > > > >> > >>> > >> fewer > > > > > > > > > > >> > >>> > >> > > > rebalances. If you have lots of space > > > > > capacity > > > > > > in > > > > > > > > > your > > > > > > > > > > >> > >>> instances, > > > > > > > > > > >> > >>> > >> this > > > > > > > > > > >> > >>> > >> > may > > > > > > > > > > >> > >>> > >> > > > be a perfectly fine tradeoff, and you > > may > > > > > > prefer > > > > > > > for > > > > > > > > > > >> Streams > > > > > > > > > > >> > >>> not > > > > > > > > > > >> > >>> > to > > > > > > > > > > >> > >>> > >> > bother > > > > > > > > > > >> > >>> > >> > > > streaming GBs of data from the broker > > in > > > > > > pursuit > > > > > > > of > > > > > > > > > > >> perfect > > > > > > > > > > >> > >>> > balance. > > > > > > > > > > >> > >>> > >> > Not > > > > > > > > > > >> > >>> > >> > > > married to this configuration, > though. > > It > > > > was > > > > > > > > > inspired > > > > > > > > > > by > > > > > > > > > > >> > the > > > > > > > > > > >> > >>> > >> related > > > > > > > > > > >> > >>> > >> > work > > > > > > > > > > >> > >>> > >> > > > research we did. > > > > > > > > > > >> > >>> > >> > > > > > > > > > > > > > >> > >>> > >> > > > 3. I'll take a look > > > > > > > > > > >> > >>> > >> > > > > > > > > > > > > > >> > >>> > >> > > > 3a. I think this is a good idea. I'd > > > > classify > > > > > > it > > > > > > > as a > > > > > > > > > > >> type > > > > > > > > > > >> > of > > > > > > > > > > >> > >>> grey > > > > > > > > > > >> > >>> > >> > failure > > > > > > > > > > >> > >>> > >> > > > detection. It may make more sense to > > > tackle > > > > > > grey > > > > > > > > > > >> failures as > > > > > > > > > > >> > >>> part > > > > > > > > > > >> > >>> > of > > > > > > > > > > >> > >>> > >> > the > > > > > > > > > > >> > >>> > >> > > > heartbeat protocol (as I POCed here: > > > > > > > > > > >> > >>> > >> > > > > > > > > > https://github.com/apache/kafka/pull/7096/files > > > > > > > ). > > > > > > > > > > WDYT? > > > > > > > > > > >> > >>> > >> > > > > > > > > > > > > > >> > >>> > >> > > > 4. Good catch! I didn't think about > > that > > > > > > before. > > > > > > > > > > Looking > > > > > > > > > > >> at > > > > > > > > > > >> > it > > > > > > > > > > >> > >>> > now, > > > > > > > > > > >> > >>> > >> > though, > > > > > > > > > > >> > >>> > >> > > > I wonder if we're actually protected > > > > already. > > > > > > The > > > > > > > > > > >> > >>> stateDirCleaner > > > > > > > > > > >> > >>> > >> > thread > > > > > > > > > > >> > >>> > >> > > > only executes if the instance is in > > > RUNNING > > > > > > > state, > > > > > > > > > and > > > > > > > > > > >> > KIP-441 > > > > > > > > > > >> > >>> > >> > proposes to > > > > > > > > > > >> > >>> > >> > > > use "probing rebalances" to report > task > > > > lag. > > > > > > > Hence, > > > > > > > > > > >> during > > > > > > > > > > >> > the > > > > > > > > > > >> > >>> > >> window > > > > > > > > > > >> > >>> > >> > > > between when the instance reports a > lag > > > and > > > > > the > > > > > > > > > > assignor > > > > > > > > > > >> > >>> makes a > > > > > > > > > > >> > >>> > >> > decision > > > > > > > > > > >> > >>> > >> > > > about it, the instance should remain > in > > > > > > > REBALANCING > > > > > > > > > > >> state, > > > > > > > > > > >> > >>> right? > > > > > > > > > > >> > >>> > If > > > > > > > > > > >> > >>> > >> > so, > > > > > > > > > > >> > >>> > >> > > > then this should prevent the race > > > > condition. > > > > > If > > > > > > > not, > > > > > > > > > > >> then we > > > > > > > > > > >> > >>> do > > > > > > > > > > >> > >>> > >> indeed > > > > > > > > > > >> > >>> > >> > need > > > > > > > > > > >> > >>> > >> > > > to do something about it. > > > > > > > > > > >> > >>> > >> > > > > > > > > > > > > > >> > >>> > >> > > > 5. Good idea. I think that today, you > > can > > > > > only > > > > > > > see > > > > > > > > > the > > > > > > > > > > >> > >>> consumer > > > > > > > > > > >> > >>> > lag, > > > > > > > > > > >> > >>> > >> > which > > > > > > > > > > >> > >>> > >> > > > is a poor substitute. I'll add some > > > metrics > > > > > to > > > > > > > the > > > > > > > > > > >> proposal. > > > > > > > > > > >> > >>> > >> > > > > > > > > > > > > > >> > >>> > >> > > > Thanks again for the comments! > > > > > > > > > > >> > >>> > >> > > > -John > > > > > > > > > > >> > >>> > >> > > > > > > > > > > > > > >> > >>> > >> > > > On Tue, Aug 6, 2019 at 4:27 PM > Guozhang > > > > Wang > > > > > < > > > > > > > > > > >> > >>> wangg...@gmail.com> > > > > > > > > > > >> > >>> > >> > wrote: > > > > > > > > > > >> > >>> > >> > > > > > > > > > > > > > >> > >>> > >> > > > > Hello Sophie, > > > > > > > > > > >> > >>> > >> > > > > > > > > > > > > > > >> > >>> > >> > > > > Thanks for the proposed KIP. I left > > > some > > > > > > > comments > > > > > > > > > on > > > > > > > > > > >> the > > > > > > > > > > >> > >>> wiki > > > > > > > > > > >> > >>> > >> itself, > > > > > > > > > > >> > >>> > >> > > > and I > > > > > > > > > > >> > >>> > >> > > > > think I'm still not very clear on a > > > > couple > > > > > or > > > > > > > > > those: > > > > > > > > > > >> > >>> > >> > > > > > > > > > > > > > > >> > >>> > >> > > > > 1. With this proposal, does that > mean > > > > with > > > > > > > > > > >> > >>> num.standby.replicas > > > > > > > > > > >> > >>> > == > > > > > > > > > > >> > >>> > >> > 0, we > > > > > > > > > > >> > >>> > >> > > > > may sometimes still have some > standby > > > > tasks > > > > > > > which > > > > > > > > > may > > > > > > > > > > >> > >>> violate > > > > > > > > > > >> > >>> > the > > > > > > > > > > >> > >>> > >> > config? > > > > > > > > > > >> > >>> > >> > > > > > > > > > > > > > > >> > >>> > >> > > > > 2. I think I understand the > rationale > > > to > > > > > > > consider > > > > > > > > > > lags > > > > > > > > > > >> > that > > > > > > > > > > >> > >>> is > > > > > > > > > > >> > >>> > >> below > > > > > > > > > > >> > >>> > >> > the > > > > > > > > > > >> > >>> > >> > > > > specified threshold to be equal, > > rather > > > > > than > > > > > > > still > > > > > > > > > > >> > >>> considering > > > > > > > > > > >> > >>> > >> 5000 > > > > > > > > > > >> > >>> > >> > is > > > > > > > > > > >> > >>> > >> > > > > better than 5001 -- we do not want > to > > > > > > > > > "over-optimize" > > > > > > > > > > >> and > > > > > > > > > > >> > >>> > >> potentially > > > > > > > > > > >> > >>> > >> > > > falls > > > > > > > > > > >> > >>> > >> > > > > into endless rebalances back and > > forth. > > > > > > > > > > >> > >>> > >> > > > > > > > > > > > > > > >> > >>> > >> > > > > But I'm not clear about the > rationale > > > of > > > > > the > > > > > > > second > > > > > > > > > > >> > >>> parameter of > > > > > > > > > > >> > >>> > >> > > > > > > > > > > > > > > >> > >>> > > > > > > > constrainedBalancedAssignment(StatefulTasksToRankedCandidates, > > > > > > > > > > >> > >>> > >> > > > > balance_factor): > > > > > > > > > > >> > >>> > >> > > > > > > > > > > > > > > >> > >>> > >> > > > > Does that mean, e.g. with > > > balance_factor > > > > of > > > > > > 3, > > > > > > > we'd > > > > > > > > > > >> > >>> consider two > > > > > > > > > > >> > >>> > >> > > > > assignments one resulting > > > balance_factor > > > > 0 > > > > > > and > > > > > > > one > > > > > > > > > > >> > resulting > > > > > > > > > > >> > >>> > >> > > > balance_factor > > > > > > > > > > >> > >>> > >> > > > > 3 to be equally optimized > assignment > > > and > > > > > > > therefore > > > > > > > > > > may > > > > > > > > > > >> > "stop > > > > > > > > > > >> > >>> > >> early"? > > > > > > > > > > >> > >>> > >> > This > > > > > > > > > > >> > >>> > >> > > > > was not very convincing to me :P > > > > > > > > > > >> > >>> > >> > > > > > > > > > > > > > > >> > >>> > >> > > > > 3. There are a couple of minor > > comments > > > > > about > > > > > > > the > > > > > > > > > > >> > algorithm > > > > > > > > > > >> > >>> > >> itself, > > > > > > > > > > >> > >>> > >> > left > > > > > > > > > > >> > >>> > >> > > > on > > > > > > > > > > >> > >>> > >> > > > > the wiki page since it needs to > refer > > > to > > > > > the > > > > > > > exact > > > > > > > > > > line > > > > > > > > > > >> > and > > > > > > > > > > >> > >>> > better > > > > > > > > > > >> > >>> > >> > > > > displayed there. > > > > > > > > > > >> > >>> > >> > > > > > > > > > > > > > > >> > >>> > >> > > > > 3.a Another wild thought about the > > > > > threshold > > > > > > > > > itself: > > > > > > > > > > >> today > > > > > > > > > > >> > >>> the > > > > > > > > > > >> > >>> > >> > assignment > > > > > > > > > > >> > >>> > >> > > > > itself is memoryless, so we would > not > > > > know > > > > > if > > > > > > > the > > > > > > > > > > >> reported > > > > > > > > > > >> > >>> > >> `TaskLag` > > > > > > > > > > >> > >>> > >> > > > itself > > > > > > > > > > >> > >>> > >> > > > > is increasing or decreasing even if > > the > > > > > > current > > > > > > > > > value > > > > > > > > > > >> is > > > > > > > > > > >> > >>> under > > > > > > > > > > >> > >>> > the > > > > > > > > > > >> > >>> > >> > > > > threshold. I wonder if it worthy to > > > make > > > > > it a > > > > > > > bit > > > > > > > > > > more > > > > > > > > > > >> > >>> > >> complicated to > > > > > > > > > > >> > >>> > >> > > > track > > > > > > > > > > >> > >>> > >> > > > > task lag trend at the assignor? > > > > Practically > > > > > > it > > > > > > > may > > > > > > > > > > not > > > > > > > > > > >> be > > > > > > > > > > >> > >>> very > > > > > > > > > > >> > >>> > >> > uncommon > > > > > > > > > > >> > >>> > >> > > > > that stand-by tasks are not keeping > > up > > > > due > > > > > to > > > > > > > the > > > > > > > > > > fact > > > > > > > > > > >> > that > > > > > > > > > > >> > >>> > other > > > > > > > > > > >> > >>> > >> > active > > > > > > > > > > >> > >>> > >> > > > > tasks hosted on the same thread is > > > > starving > > > > > > the > > > > > > > > > > standby > > > > > > > > > > >> > >>> tasks. > > > > > > > > > > >> > >>> > >> > > > > > > > > > > > > > > >> > >>> > >> > > > > 4. There's a potential race > condition > > > > risk > > > > > > when > > > > > > > > > > >> reporting > > > > > > > > > > >> > >>> > >> `TaskLags` > > > > > > > > > > >> > >>> > >> > in > > > > > > > > > > >> > >>> > >> > > > the > > > > > > > > > > >> > >>> > >> > > > > subscription: right after reporting > > it > > > to > > > > > the > > > > > > > > > leader, > > > > > > > > > > >> the > > > > > > > > > > >> > >>> > cleanup > > > > > > > > > > >> > >>> > >> > thread > > > > > > > > > > >> > >>> > >> > > > > kicks in and deletes the state > > > directory. > > > > > If > > > > > > > the > > > > > > > > > task > > > > > > > > > > >> was > > > > > > > > > > >> > >>> > assigned > > > > > > > > > > >> > >>> > >> > to the > > > > > > > > > > >> > >>> > >> > > > > host it would cause it to restore > > from > > > > > > > beginning > > > > > > > > > and > > > > > > > > > > >> > >>> effectively > > > > > > > > > > >> > >>> > >> > make the > > > > > > > > > > >> > >>> > >> > > > > seemingly optimized assignment very > > > > > > > sub-optimal. > > > > > > > > > > >> > >>> > >> > > > > > > > > > > > > > > >> > >>> > >> > > > > To be on the safer side we should > > > > consider > > > > > > > either > > > > > > > > > > prune > > > > > > > > > > >> > out > > > > > > > > > > >> > >>> > those > > > > > > > > > > >> > >>> > >> > tasks > > > > > > > > > > >> > >>> > >> > > > > that are "close to be cleaned up" > in > > > the > > > > > > > > > > subscription, > > > > > > > > > > >> or > > > > > > > > > > >> > we > > > > > > > > > > >> > >>> > >> should > > > > > > > > > > >> > >>> > >> > delay > > > > > > > > > > >> > >>> > >> > > > > the cleanup right after we've > > included > > > > them > > > > > > in > > > > > > > the > > > > > > > > > > >> > >>> subscription > > > > > > > > > > >> > >>> > in > > > > > > > > > > >> > >>> > >> > case > > > > > > > > > > >> > >>> > >> > > > > they are been selected as assigned > > > tasks > > > > by > > > > > > the > > > > > > > > > > >> assignor. > > > > > > > > > > >> > >>> > >> > > > > > > > > > > > > > > >> > >>> > >> > > > > 5. This is a meta comment: I think > it > > > > would > > > > > > be > > > > > > > > > > helpful > > > > > > > > > > >> to > > > > > > > > > > >> > >>> add > > > > > > > > > > >> > >>> > some > > > > > > > > > > >> > >>> > >> > user > > > > > > > > > > >> > >>> > >> > > > > visibility on the standby tasks > > lagging > > > > as > > > > > > > well, > > > > > > > > > via > > > > > > > > > > >> > >>> metrics for > > > > > > > > > > >> > >>> > >> > example. > > > > > > > > > > >> > >>> > >> > > > > Today it is hard for us to observe > > how > > > > far > > > > > > are > > > > > > > our > > > > > > > > > > >> current > > > > > > > > > > >> > >>> > standby > > > > > > > > > > >> > >>> > >> > tasks > > > > > > > > > > >> > >>> > >> > > > > compared to the active tasks and > > > whether > > > > > that > > > > > > > lag > > > > > > > > > is > > > > > > > > > > >> being > > > > > > > > > > >> > >>> > >> > increasing or > > > > > > > > > > >> > >>> > >> > > > > decreasing. As a follow-up task, > for > > > > > example, > > > > > > > the > > > > > > > > > > >> > rebalance > > > > > > > > > > >> > >>> > should > > > > > > > > > > >> > >>> > >> > also > > > > > > > > > > >> > >>> > >> > > > be > > > > > > > > > > >> > >>> > >> > > > > triggered if we realize that some > > > standby > > > > > > > task's > > > > > > > > > lag > > > > > > > > > > is > > > > > > > > > > >> > >>> > increasing > > > > > > > > > > >> > >>> > >> > > > > indefinitely means that it cannot > > keep > > > up > > > > > > > (which is > > > > > > > > > > >> > another > > > > > > > > > > >> > >>> > >> indicator > > > > > > > > > > >> > >>> > >> > > > > either you need to add more > resources > > > > with > > > > > > the > > > > > > > > > > >> > num.standbys > > > > > > > > > > >> > >>> or > > > > > > > > > > >> > >>> > >> your > > > > > > > > > > >> > >>> > >> > are > > > > > > > > > > >> > >>> > >> > > > > still not balanced enough). > > > > > > > > > > >> > >>> > >> > > > > > > > > > > > > > > >> > >>> > >> > > > > > > > > > > > > > > >> > >>> > >> > > > > On Tue, Aug 6, 2019 at 1:32 PM > Sophie > > > > > > > Blee-Goldman > > > > > > > > > < > > > > > > > > > > >> > >>> > >> > sop...@confluent.io> > > > > > > > > > > >> > >>> > >> > > > > wrote: > > > > > > > > > > >> > >>> > >> > > > > > > > > > > > > > > >> > >>> > >> > > > > > Hey all, > > > > > > > > > > >> > >>> > >> > > > > > > > > > > > > > > > >> > >>> > >> > > > > > I'd like to kick off discussion > on > > > > > KIP-441, > > > > > > > aimed > > > > > > > > > > at > > > > > > > > > > >> the > > > > > > > > > > >> > >>> long > > > > > > > > > > >> > >>> > >> > restore > > > > > > > > > > >> > >>> > >> > > > > times > > > > > > > > > > >> > >>> > >> > > > > > in Streams during which further > > > active > > > > > > > processing > > > > > > > > > > >> and IQ > > > > > > > > > > >> > >>> are > > > > > > > > > > >> > >>> > >> > blocked. > > > > > > > > > > >> > >>> > >> > > > > > Please give it a read and let us > > know > > > > > your > > > > > > > > > thoughts > > > > > > > > > > >> > >>> > >> > > > > > > > > > > > > > > > >> > >>> > >> > > > > > > > > > > > > > > > >> > >>> > >> > > > > > > > > > > > > > > > >> > >>> > >> > > > > > > > > > > > > > > >> > >>> > >> > > > > > > > > > > > > > >> > >>> > >> > > > > > > > > > > > >> > >>> > >> > > > > > > > > > > >> > >>> > > > > > > > > > > > >> > >>> > > > > > > > > > > >> > > > > > > > > > > > >> > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-441:+Smooth+Scaling+Out+for+Kafka+Streams > > > > > > > > > > >> > >>> > >> > > > > > > > > > > > > > > > >> > >>> > >> > > > > > Cheers, > > > > > > > > > > >> > >>> > >> > > > > > Sophie > > > > > > > > > > >> > >>> > >> > > > > > > > > > > > > > > > >> > >>> > >> > > > > > > > > > > > > > > >> > >>> > >> > > > > > > > > > > > > > > >> > >>> > >> > > > > -- > > > > > > > > > > >> > >>> > >> > > > > -- Guozhang > > > > > > > > > > >> > >>> > >> > > > > > > > > > > > > > > >> > >>> > >> > > > > > > > > > > > > > >> > >>> > >> > > > > > > > > > > > > >> > >>> > >> > > > > > > > > > > > > >> > >>> > >> > > -- > > > > > > > > > > >> > >>> > >> > > -- Guozhang > > > > > > > > > > >> > >>> > >> > > > > > > > > > > > >> > >>> > >> > > > > > > > > > > >> > >>> > >> > > > > > > > > > > >> > >>> > >> -- > > > > > > > > > > >> > >>> > >> -- Guozhang > > > > > > > > > > >> > >>> > >> > > > > > > > > > > >> > >>> > > > > > > > > > > > > >> > >>> > > > > > > > > > > > >> > >>> > > > > > > > > > > >> > >>> > > > > > > > > > > >> > >>> -- > > > > > > > > > > >> > >>> -- Guozhang > > > > > > > > > > >> > >>> > > > > > > > > > > >> > >> > > > > > > > > > > >> > > > > > > > > > > > >> > > > > > > > > > > >> > > > > > > > > > > >> -- > > > > > > > > > > >> -- Guozhang > > > > > > > > > > >> > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > -- > > > > > > > > > -- Guozhang > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > -- > > > > > -- Guozhang > > > > > > > > > > > > > > > > > > -- > > > -- Guozhang > > > > > > > > -- > -- Guozhang >