tim-patterson commented on a change in pull request #11760: URL: https://github.com/apache/kafka/pull/11760#discussion_r811393001
##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/TaskMovement.java
##########
@@ -102,17 +109,26 @@ static int assignActiveTaskMovements(final Map<TaskId, SortedSet<UUID>> tasksToC
         final int movementsNeeded = taskMovements.size();
 
         for (final TaskMovement movement : taskMovements) {
-            final UUID standbySourceClient = caughtUpClientsByTaskLoad.poll(
+            // Attempt to find a caught up standby, otherwise find any caught up client, failing that use the most
+            // caught up client.
+            UUID sourceClient = caughtUpClientsByTaskLoad.poll(
                 movement.task,
                 c -> clientStates.get(c).hasStandbyTask(movement.task)
             );
-            if (standbySourceClient == null) {
-                // there's not a caught-up standby available to take over the task, so we'll schedule a warmup instead
-                final UUID sourceClient = requireNonNull(
-                    caughtUpClientsByTaskLoad.poll(movement.task),
-                    "Tried to move task to caught-up client but none exist"
+
+            if (sourceClient == null) {
+                sourceClient = caughtUpClientsByTaskLoad.poll(movement.task);
+            }
+
+            if (sourceClient == null) {
+                sourceClient = requireNonNull(
+                    mostCaughtUpEligibleClient(tasksToClientByLag, movement.task, movement.destination),
+                    "Tried to move task to more caught-up client but none exist"
                 );
+            }
+            if (!clientStates.get(sourceClient).hasStandbyTask(movement.task)) {
+                // there's not a standby available to take over the task, so we'll schedule a warmup instead

Review comment:
Sure let me have a bit of a play around.

All these methods being static means that there's a lot of state/method arguments to pass to each method call. So I'm unsure about how much of a win we're going to get extracting smaller bits of code out into separate methods.

Maybe some closures/local functions might help....

I'll have a bit of a play and get back to you :)
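To make the closures idea concrete, here is a rough sketch of how the three-step fallback (caught-up standby, then any caught-up client, then the most caught-up client) might look with local lambdas that capture the shared state once. Everything in it is hypothetical and simplified: `SourceClientFallbackSketch`, `pickSourceClient` and the plain `List`/`Map` parameters are stand-ins made up for illustration, not the real `TaskMovement` code or any Kafka Streams API. It also skips details of the actual change, such as `poll` removing the client from the queue and the destination client being excluded.

```java
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.UUID;
import java.util.function.Supplier;

// Hypothetical, simplified stand-in for the fallback chain in the diff.
// Tasks are plain strings; none of these names are Kafka Streams APIs.
final class SourceClientFallbackSketch {

    static UUID pickSourceClient(final String task,
                                 final List<UUID> caughtUpClients,       // assumed ordered by task load
                                 final Map<UUID, Set<String>> standbys,  // client -> its standby tasks
                                 final List<UUID> clientsByLag) {        // assumed ordered, most caught up first
        // Local closures capture the arguments above, so each step needs no parameters of its own.
        final Supplier<Optional<UUID>> caughtUpStandby = () -> caughtUpClients.stream()
            .filter(c -> standbys.getOrDefault(c, Set.of()).contains(task))
            .findFirst();
        final Supplier<Optional<UUID>> anyCaughtUp = () -> caughtUpClients.stream().findFirst();
        final Supplier<Optional<UUID>> mostCaughtUp = () -> clientsByLag.stream().findFirst();

        // Each later step is only evaluated if the earlier ones found nothing.
        return caughtUpStandby.get()
            .or(anyCaughtUp)
            .or(mostCaughtUp)
            .orElseThrow(() -> new IllegalStateException("No eligible source client for task " + task));
    }

    public static void main(final String[] args) {
        final UUID a = new UUID(0L, 1L);
        final UUID b = new UUID(0L, 2L);
        // b holds a standby for the task, so it is preferred over a.
        System.out.println(pickSourceClient("0_1", List.of(a, b), Map.of(b, Set.of("0_1")), List.of(a)));
    }
}
```

Whether this reads better than the inline `if (sourceClient == null)` chain is a judgment call; the main gain is that the shared state is passed once rather than to every extracted static helper. `Optional.or` needs Java 9 or later.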