[ https://issues.apache.org/jira/browse/KAFKA-13600?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17486714#comment-17486714 ]
John Roesler commented on KAFKA-13600:
--------------------------------------

Hi [~tim.patterson], thanks for the report and the patch! It sounds like you're reporting two things here:
# a bug around the acceptable recovery lag
# an improvement to assignment balance

If we can discuss those things independently, then we can definitely merge the bugfix immediately. Depending on the impact of the improvement, it might also fall into the category of a simple ticket, or it might be more appropriate to have a KIP, as [~cadonna] suggested.

Regarding the bug, I find it completely plausible that we have one, but I have to confess that I'm not 100% sure I understand the report. Is the situation that there's an active task that happens to be processing quite a bit ahead of the replicas, such that when the active goes offline, there's no "caught-up" node, and instead of failing the task over to the least-lagging node, we just assign it to a fresh node? If that's it, then it is certainly not the desired behavior.

The notion of acceptableRecoveryLag was introduced because follower replicas will always lag the active task, by definition. We want task ownership to be able to swap over from the active to a warm-up when it's caught up, but it will never be 100% caught up (because it is a follower until it takes over). acceptableRecoveryLag is a way to define a small amount of lag that is "acceptable", so that when a warm-up is only lagging by that amount, we can consider it effectively caught up and move the active to the warm-up node. As you can see, this has nothing at all to do with which nodes are eligible to take over when an active exits the cluster. In that case, it was always the intent that the most-caught-up node should take over active processing, regardless of its lag.

I've been squinting at our existing code, and also at your patch ([https://github.com/apache/kafka/commit/a4b622685423fbfd68b1291dad85cc1f44b086f1]). It looks to me like the flaw in the existing implementation is essentially just here:
[https://github.com/apache/kafka/commit/a4b622685423fbfd68b1291dad85cc1f44b086f1#diff-83a301514ee18b410df40a91595f6f1afd51f6152ff813b5789516cf5c3605baL92-L96]
{code:java}
// if the desired client is not caught up, and there is another client that _is_ caught up, then
// we schedule a movement, so we can move the active task to the caught-up client. We'll try to
// assign a warm-up to the desired client so that we can move it later on.
{code}
which should indeed read just like what you described:
{code:java}
// if the desired client is not caught up, and there is another client that _is_ more caught up,
// then we schedule a movement [to] move the active task to the [most] caught-up client.
// We'll try to assign a warm-up to the desired client so that we can move it later on.
{code}
On the other hand, we should not lose this important predicate that determines whether a task is considered "caught up":
[https://github.com/apache/kafka/commit/a4b622685423fbfd68b1291dad85cc1f44b086f1#diff-e50a755ba2a4d2f7306d1016d079018cba22f9f32993ef5dd64408d1a94d79acL245]
{code:java}
activeRunning(taskLag) || unbounded(acceptableRecoveryLag) || acceptable(acceptableRecoveryLag, taskLag)
{code}
This captures a couple of subtleties in addition to the obvious "a task is caught up if it's under the acceptable recovery lag":
# A running, active task doesn't have a real lag at all; instead, its "lag" is the sentinel value `-2`.
# You can disable the "warm-up" phase completely by setting acceptableRecoveryLag to `Long.MAX_VALUE`, in which case we ignore lags completely and consider all nodes to be caught up, even if they didn't report a lag at all.
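To make those two subtleties concrete, here is a minimal standalone sketch of that predicate. The helper names mirror the snippet above, but the class name, the constant, and the non-negative-lag assumption are hypothetical; this is not the actual assignor code:
{code:java}
public final class CaughtUpPredicateSketch {

    // A running active task has no real lag; it reports the sentinel value -2
    // (as noted in the discussion above).
    private static final long RUNNING_ACTIVE_SENTINEL = -2L;

    static boolean activeRunning(final long taskLag) {
        return taskLag == RUNNING_ACTIVE_SENTINEL;
    }

    // Setting acceptableRecoveryLag to Long.MAX_VALUE disables the warm-up phase:
    // lags are ignored entirely and every node is considered caught up.
    static boolean unbounded(final long acceptableRecoveryLag) {
        return acceptableRecoveryLag == Long.MAX_VALUE;
    }

    // The obvious case: the reported lag is within the acceptable recovery lag.
    // This sketch treats negative values as sentinels rather than real lags.
    static boolean acceptable(final long acceptableRecoveryLag, final long taskLag) {
        return taskLag >= 0 && taskLag <= acceptableRecoveryLag;
    }

    static boolean caughtUp(final long taskLag, final long acceptableRecoveryLag) {
        return activeRunning(taskLag)
            || unbounded(acceptableRecoveryLag)
            || acceptable(acceptableRecoveryLag, taskLag);
    }
}
{code}
The point of spelling it out is that both the `-2` sentinel and the `Long.MAX_VALUE` escape hatch need to survive any rework of the caught-up check.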
One extra thing I like about your patch is this:
[https://github.com/apache/kafka/commit/a4b622685423fbfd68b1291dad85cc1f44b086f1#diff-83a301514ee18b410df40a91595f6f1afd51f6152ff813b5789516cf5c3605baR54-R56]
{code:java}
// Even if there is a more caught up client, as long as we're within allowable lag then
// its best just to stick with what we've got
{code}
I agree that, if two nodes are within the acceptableRecoveryLag of each other, we should consider their lags to be effectively the same. That's something I wanted to do when we wrote this code, but couldn't figure out a good way to do it.

One thing I'd need more time on is the TaskMovementTest. At first glance, it looks like those changes are just about the slightly different method signature, but I'd want to be very sure that we're still testing the same invariants that we wanted to test.

Would you be willing to submit this bugfix as a PR so that we can formally review and merge it?
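Putting the fallback rule and the "stick with what we've got" tolerance together, here is a rough sketch of the selection logic under discussion. The class and helper names are hypothetical (this is not the actual TaskMovement code), and it assumes reported lags are non-negative:
{code:java}
import java.util.Map;
import java.util.UUID;

public final class MostCaughtUpFallbackSketch {

    /**
     * Decide which client should host the active instance of one stateful task.
     * Keeps the desired ("target") client if it is caught up, or if no other client
     * is more caught up by more than acceptableRecoveryLag; otherwise falls back to
     * the most caught-up client, regardless of its absolute lag.
     */
    static UUID chooseActiveClient(final UUID desiredClient,
                                   final Map<UUID, Long> taskLagPerClient,
                                   final long acceptableRecoveryLag) {
        final long desiredLag =
            taskLagPerClient.getOrDefault(desiredClient, Long.MAX_VALUE);
        if (desiredLag <= acceptableRecoveryLag) {
            // The desired client is effectively caught up: no movement needed.
            return desiredClient;
        }

        // Find the most caught-up client for this task.
        UUID mostCaughtUp = desiredClient;
        long smallestLag = desiredLag;
        for (final Map.Entry<UUID, Long> entry : taskLagPerClient.entrySet()) {
            if (entry.getValue() < smallestLag) {
                mostCaughtUp = entry.getKey();
                smallestLag = entry.getValue();
            }
        }

        // Only move if the other client is ahead by more than acceptableRecoveryLag;
        // lags within that band of each other count as effectively the same.
        return desiredLag - smallestLag > acceptableRecoveryLag ? mostCaughtUp : desiredClient;
    }
}
{code}
The assignor would still schedule a warm-up on the desired client so that the active task can migrate back once it catches up, as described above.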

> Rebalances while streams is in degraded state can cause stores to be
> reassigned and restore from scratch
> --------------------------------------------------------------------------------------------------------
>
>                 Key: KAFKA-13600
>                 URL: https://issues.apache.org/jira/browse/KAFKA-13600
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>    Affects Versions: 3.1.0, 2.8.1, 3.0.0
>            Reporter: Tim Patterson
>            Priority: Major
>
> Consider this scenario:
> # A node is lost from the cluster.
> # A rebalance is kicked off with a new "target assignment" (i.e. the rebalance is attempting to move a lot of tasks - see https://issues.apache.org/jira/browse/KAFKA-10121).
> # The Kafka cluster is now a bit more sluggish from the increased load.
> # A rolling deploy happens, triggering rebalances; during the rebalances processing continues but offsets can't be committed (or nodes are restarted but fail to commit offsets).
> # The most caught-up nodes now aren't within `acceptableRecoveryLag`, and so the task is started in its "target assignment" location, restoring all state from scratch and delaying further processing instead of using the "almost caught up" node.
> We've hit this a few times, and since we have lots of state (~25 TB worth) and are heavy users of IQ, this is not ideal for us.
> While we can increase `acceptableRecoveryLag` to larger values to try to get around this, that causes other issues (i.e. a warm-up becoming active when it's still quite far behind).
> The solution seems to be to balance "balanced assignment" with "most caught up nodes".
> We've got a fork where we do just this, and it's made a huge difference to the reliability of our cluster.
> Our change is to simply use the most caught-up node if the "target node" is more than `acceptableRecoveryLag` behind.
> This gives up some of the load-balancing behaviour of the existing code, but in practice that doesn't seem to matter too much.
> I guess maybe an algorithm that identified candidate nodes as those being within `acceptableRecoveryLag` of the most caught-up node might allow the best of both worlds.
>
> Our fork is [https://github.com/apache/kafka/compare/trunk...tim-patterson:fix_balance_uncaughtup?expand=1]
> (We also moved the capacity constraint code to happen after all the stateful assignment, to prioritise standby tasks over warmup tasks.)
> Ideally we don't want to maintain a fork of Kafka Streams going forward, so we are hoping to get a bit of discussion / agreement on the best way to handle this.
> More than happy to contribute code, test different algorithms in our production system, or anything else to help with this issue.



--
This message was sent by Atlassian Jira
(v8.20.1#820001)