ableegoldman commented on code in PR #16052: URL: https://github.com/apache/kafka/pull/16052#discussion_r1614388204
########## streams/src/main/java/org/apache/kafka/streams/processor/assignment/assignors/StickyTaskAssignor.java: ########## @@ -0,0 +1,471 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.streams.processor.assignment.assignors; + +import static java.util.Collections.unmodifiableMap; + +import java.time.Instant; +import java.time.temporal.ChronoUnit; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.Set; +import java.util.TreeSet; +import java.util.stream.Collectors; +import org.apache.kafka.streams.processor.TaskId; +import org.apache.kafka.streams.processor.assignment.ApplicationState; +import org.apache.kafka.streams.processor.assignment.KafkaStreamsAssignment; +import org.apache.kafka.streams.processor.assignment.KafkaStreamsAssignment.AssignedTask; +import org.apache.kafka.streams.processor.assignment.KafkaStreamsState; +import org.apache.kafka.streams.processor.assignment.ProcessId; +import org.apache.kafka.streams.processor.assignment.TaskAssignmentUtils; +import 
org.apache.kafka.streams.processor.assignment.TaskAssignor; +import org.apache.kafka.streams.processor.assignment.TaskInfo; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + + +public class StickyTaskAssignor implements TaskAssignor { + private static final Logger LOG = LoggerFactory.getLogger(StickyTaskAssignor.class); + + private final boolean mustPreserveActiveTaskAssignment; + + public StickyTaskAssignor() { + this(false); + } + + public StickyTaskAssignor(final boolean mustPreserveActiveTaskAssignment) { + this.mustPreserveActiveTaskAssignment = mustPreserveActiveTaskAssignment; + } + + @Override + public void configure(final Map<String, ?> configs) {} + + @Override + public TaskAssignment assign(final ApplicationState applicationState) { + final Map<ProcessId, KafkaStreamsState> clients = applicationState.kafkaStreamsStates(false); + final Map<TaskId, ProcessId> previousActiveAssignment = mapPreviousActiveTasks(clients); + final Map<TaskId, Set<ProcessId>> previousStandbyAssignment = mapPreviousStandbyTasks(clients); + final AssignmentState assignmentState = new AssignmentState(applicationState, clients, + previousActiveAssignment, previousStandbyAssignment); + + assignActive(applicationState, clients.values(), assignmentState, this.mustPreserveActiveTaskAssignment); + optimizeActive(applicationState, assignmentState); + assignStandby(applicationState, assignmentState); + optimizeStandby(applicationState, assignmentState); + + final Map<ProcessId, KafkaStreamsAssignment> finalAssignments = assignmentState.buildKafkaStreamsAssignments(); + if (mustPreserveActiveTaskAssignment && !finalAssignments.isEmpty()) { + // We set the followup deadline for only one of the clients. 
+ final ProcessId clientId = finalAssignments.keySet().iterator().next(); + final KafkaStreamsAssignment previousAssignment = finalAssignments.get(clientId); + finalAssignments.put(clientId, previousAssignment.withFollowupRebalance(Instant.ofEpochMilli(0))); + } + return new TaskAssignment(finalAssignments.values()); + } + + private void optimizeActive(final ApplicationState applicationState, + final AssignmentState assignmentState) { + if (mustPreserveActiveTaskAssignment) { + return; + } + + final Map<ProcessId, KafkaStreamsAssignment> currentAssignments = assignmentState.buildKafkaStreamsAssignments(); + + final Set<TaskId> statefulTasks = applicationState.allTasks().stream() + .filter(TaskInfo::isStateful) + .map(TaskInfo::id) + .collect(Collectors.toSet()); + final Map<ProcessId, KafkaStreamsAssignment> optimizedAssignmentsForStatefulTasks = TaskAssignmentUtils.optimizeRackAwareActiveTasks( + applicationState, currentAssignments, new TreeSet<>(statefulTasks)); + + final Set<TaskId> statelessTasks = applicationState.allTasks().stream() + .filter(task -> !task.isStateful()) + .map(TaskInfo::id) + .collect(Collectors.toSet()); + final Map<ProcessId, KafkaStreamsAssignment> optimizedAssignmentsForAllTasks = TaskAssignmentUtils.optimizeRackAwareActiveTasks( + applicationState, optimizedAssignmentsForStatefulTasks, new TreeSet<>(statelessTasks)); + + assignmentState.processOptimizedAssignments(optimizedAssignmentsForAllTasks); + } + + private void optimizeStandby(final ApplicationState applicationState, final AssignmentState assignmentState) { + if (applicationState.assignmentConfigs().numStandbyReplicas() <= 0) { + return; + } + + if (mustPreserveActiveTaskAssignment) { + return; + } + + final Map<ProcessId, KafkaStreamsAssignment> currentAssignments = assignmentState.buildKafkaStreamsAssignments(); + final Map<ProcessId, KafkaStreamsAssignment> optimizedAssignments = TaskAssignmentUtils.optimizeRackAwareStandbyTasks( + applicationState, currentAssignments); + 
+ assignmentState.processOptimizedAssignments(optimizedAssignments); + } + + private static void assignActive(final ApplicationState applicationState, + final Collection<KafkaStreamsState> clients, + final AssignmentState assignmentState, + final boolean mustPreserveActiveTaskAssignment) { + final int totalCapacity = computeStreamThreadCount(clients); + if (totalCapacity == 0) { + throw new IllegalStateException("`totalCapacity` should never be zero."); + } Review Comment: This is another superfluous check now; we should be able to trust the assignor inputs. ########## streams/src/main/java/org/apache/kafka/streams/processor/assignment/assignors/StickyTaskAssignor.java: ########## @@ -0,0 +1,471 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.streams.processor.assignment.assignors; + +import static java.util.Collections.unmodifiableMap; + +import java.time.Instant; +import java.time.temporal.ChronoUnit; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.Set; +import java.util.TreeSet; +import java.util.stream.Collectors; +import org.apache.kafka.streams.processor.TaskId; +import org.apache.kafka.streams.processor.assignment.ApplicationState; +import org.apache.kafka.streams.processor.assignment.KafkaStreamsAssignment; +import org.apache.kafka.streams.processor.assignment.KafkaStreamsAssignment.AssignedTask; +import org.apache.kafka.streams.processor.assignment.KafkaStreamsState; +import org.apache.kafka.streams.processor.assignment.ProcessId; +import org.apache.kafka.streams.processor.assignment.TaskAssignmentUtils; +import org.apache.kafka.streams.processor.assignment.TaskAssignor; +import org.apache.kafka.streams.processor.assignment.TaskInfo; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + + +public class StickyTaskAssignor implements TaskAssignor { + private static final Logger LOG = LoggerFactory.getLogger(StickyTaskAssignor.class); + + private final boolean mustPreserveActiveTaskAssignment; + + public StickyTaskAssignor() { + this(false); + } + + public StickyTaskAssignor(final boolean mustPreserveActiveTaskAssignment) { + this.mustPreserveActiveTaskAssignment = mustPreserveActiveTaskAssignment; + } + + @Override + public void configure(final Map<String, ?> configs) {} Review Comment: Can we add a default implementation to the `TaskAssignor` interface so users don't have to implement an empty method like this (and then remove this override)? ########## 
streams/src/main/java/org/apache/kafka/streams/processor/assignment/assignors/StickyTaskAssignor.java: ########## @@ -0,0 +1,471 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.streams.processor.assignment.assignors; + +import static java.util.Collections.unmodifiableMap; + +import java.time.Instant; +import java.time.temporal.ChronoUnit; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.Set; +import java.util.TreeSet; +import java.util.stream.Collectors; +import org.apache.kafka.streams.processor.TaskId; +import org.apache.kafka.streams.processor.assignment.ApplicationState; +import org.apache.kafka.streams.processor.assignment.KafkaStreamsAssignment; +import org.apache.kafka.streams.processor.assignment.KafkaStreamsAssignment.AssignedTask; +import org.apache.kafka.streams.processor.assignment.KafkaStreamsState; +import org.apache.kafka.streams.processor.assignment.ProcessId; +import org.apache.kafka.streams.processor.assignment.TaskAssignmentUtils; +import 
org.apache.kafka.streams.processor.assignment.TaskAssignor; +import org.apache.kafka.streams.processor.assignment.TaskInfo; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + + +public class StickyTaskAssignor implements TaskAssignor { + private static final Logger LOG = LoggerFactory.getLogger(StickyTaskAssignor.class); + + private final boolean mustPreserveActiveTaskAssignment; + + public StickyTaskAssignor() { + this(false); + } + + public StickyTaskAssignor(final boolean mustPreserveActiveTaskAssignment) { + this.mustPreserveActiveTaskAssignment = mustPreserveActiveTaskAssignment; + } + + @Override + public void configure(final Map<String, ?> configs) {} + + @Override + public TaskAssignment assign(final ApplicationState applicationState) { + final Map<ProcessId, KafkaStreamsState> clients = applicationState.kafkaStreamsStates(false); + final Map<TaskId, ProcessId> previousActiveAssignment = mapPreviousActiveTasks(clients); + final Map<TaskId, Set<ProcessId>> previousStandbyAssignment = mapPreviousStandbyTasks(clients); + final AssignmentState assignmentState = new AssignmentState(applicationState, clients, + previousActiveAssignment, previousStandbyAssignment); + + assignActive(applicationState, clients.values(), assignmentState, this.mustPreserveActiveTaskAssignment); + optimizeActive(applicationState, assignmentState); + assignStandby(applicationState, assignmentState); + optimizeStandby(applicationState, assignmentState); + + final Map<ProcessId, KafkaStreamsAssignment> finalAssignments = assignmentState.buildKafkaStreamsAssignments(); + if (mustPreserveActiveTaskAssignment && !finalAssignments.isEmpty()) { + // We set the followup deadline for only one of the clients. 
+ final ProcessId clientId = finalAssignments.keySet().iterator().next(); + final KafkaStreamsAssignment previousAssignment = finalAssignments.get(clientId); + finalAssignments.put(clientId, previousAssignment.withFollowupRebalance(Instant.ofEpochMilli(0))); + } + return new TaskAssignment(finalAssignments.values()); + } + + private void optimizeActive(final ApplicationState applicationState, + final AssignmentState assignmentState) { + if (mustPreserveActiveTaskAssignment) { + return; + } + + final Map<ProcessId, KafkaStreamsAssignment> currentAssignments = assignmentState.buildKafkaStreamsAssignments(); + + final Set<TaskId> statefulTasks = applicationState.allTasks().stream() + .filter(TaskInfo::isStateful) + .map(TaskInfo::id) + .collect(Collectors.toSet()); + final Map<ProcessId, KafkaStreamsAssignment> optimizedAssignmentsForStatefulTasks = TaskAssignmentUtils.optimizeRackAwareActiveTasks( + applicationState, currentAssignments, new TreeSet<>(statefulTasks)); + + final Set<TaskId> statelessTasks = applicationState.allTasks().stream() + .filter(task -> !task.isStateful()) + .map(TaskInfo::id) + .collect(Collectors.toSet()); + final Map<ProcessId, KafkaStreamsAssignment> optimizedAssignmentsForAllTasks = TaskAssignmentUtils.optimizeRackAwareActiveTasks( + applicationState, optimizedAssignmentsForStatefulTasks, new TreeSet<>(statelessTasks)); + + assignmentState.processOptimizedAssignments(optimizedAssignmentsForAllTasks); + } + + private void optimizeStandby(final ApplicationState applicationState, final AssignmentState assignmentState) { + if (applicationState.assignmentConfigs().numStandbyReplicas() <= 0) { + return; + } + + if (mustPreserveActiveTaskAssignment) { + return; + } + + final Map<ProcessId, KafkaStreamsAssignment> currentAssignments = assignmentState.buildKafkaStreamsAssignments(); + final Map<ProcessId, KafkaStreamsAssignment> optimizedAssignments = TaskAssignmentUtils.optimizeRackAwareStandbyTasks( + applicationState, currentAssignments); + 
assignmentState.processOptimizedAssignments(optimizedAssignments); + } + + private static void assignActive(final ApplicationState applicationState, + final Collection<KafkaStreamsState> clients, + final AssignmentState assignmentState, + final boolean mustPreserveActiveTaskAssignment) { + final int totalCapacity = computeStreamThreadCount(clients); + if (totalCapacity == 0) { + throw new IllegalStateException("`totalCapacity` should never be zero."); + } + + final Set<TaskId> allTaskIds = applicationState.allTasks().stream() + .map(TaskInfo::id).collect(Collectors.toSet()); + final int taskCount = allTaskIds.size(); + final int activeTasksPerThread = taskCount / totalCapacity; + final Set<TaskId> unassigned = new HashSet<>(allTaskIds); + + // first try and re-assign existing active tasks to clients that previously had + // the same active task + for (final TaskId taskId : assignmentState.previousActiveAssignment.keySet()) { + final ProcessId previousClientForTask = assignmentState.previousActiveAssignment.get(taskId); + if (allTaskIds.contains(taskId)) { + if (mustPreserveActiveTaskAssignment || assignmentState.hasRoomForActiveTask(previousClientForTask, activeTasksPerThread)) { + assignmentState.finalizeAssignment(taskId, previousClientForTask, AssignedTask.Type.ACTIVE); + unassigned.remove(taskId); + } + } + } + + // try and assign any remaining unassigned tasks to clients that previously + // have seen the task. 
+ for (final Iterator<TaskId> iterator = unassigned.iterator(); iterator.hasNext(); ) { + final TaskId taskId = iterator.next(); + final Set<ProcessId> previousClientsForStandbyTask = assignmentState.previousStandbyAssignment.getOrDefault(taskId, new HashSet<>()); + for (final ProcessId client: previousClientsForStandbyTask) { + if (assignmentState.hasRoomForActiveTask(client, activeTasksPerThread)) { + assignmentState.finalizeAssignment(taskId, client, AssignedTask.Type.ACTIVE); + iterator.remove(); + break; + } + } + } + + // assign any remaining unassigned tasks + final List<TaskId> sortedTasks = new ArrayList<>(unassigned); + Collections.sort(sortedTasks); + for (final TaskId taskId : sortedTasks) { + final Set<ProcessId> candidateClients = clients.stream() + .map(KafkaStreamsState::processId) + .collect(Collectors.toSet()); + final ProcessId bestClient = assignmentState.findBestClientForTask(taskId, candidateClients); + assignmentState.finalizeAssignment(taskId, bestClient, AssignedTask.Type.ACTIVE); + } + } + + private static void assignStandby(final ApplicationState applicationState, + final AssignmentState assignmentState) { + final Set<TaskInfo> statefulTasks = applicationState.allTasks().stream() + .filter(TaskInfo::isStateful) + .collect(Collectors.toSet()); + final int numStandbyReplicas = applicationState.assignmentConfigs().numStandbyReplicas(); + for (final TaskInfo task : statefulTasks) { + for (int i = 0; i < numStandbyReplicas; i++) { + final Set<ProcessId> candidateClients = assignmentState.findClientsWithoutAssignedTask(task.id()); + if (candidateClients.isEmpty()) { + LOG.warn("Unable to assign {} of {} standby tasks for task [{}]. " + + "There is not enough available capacity. 
You should " + + "increase the number of threads and/or application instances " + + "to maintain the requested number of standby replicas.", + numStandbyReplicas - i, + numStandbyReplicas, task.id()); + break; + } + + final ProcessId bestClient = assignmentState.findBestClientForTask(task.id(), candidateClients); + assignmentState.finalizeAssignment(task.id(), bestClient, AssignedTask.Type.STANDBY); + } + } + } + + private static Map<TaskId, ProcessId> mapPreviousActiveTasks(final Map<ProcessId, KafkaStreamsState> clients) { + final Map<TaskId, ProcessId> previousActiveTasks = new HashMap<>(); + for (final KafkaStreamsState client : clients.values()) { + for (final TaskId taskId : client.previousActiveTasks()) { + previousActiveTasks.put(taskId, client.processId()); + } + } + return previousActiveTasks; + } + + private static Map<TaskId, Set<ProcessId>> mapPreviousStandbyTasks(final Map<ProcessId, KafkaStreamsState> clients) { + final Map<TaskId, Set<ProcessId>> previousStandbyTasks = new HashMap<>(); + for (final KafkaStreamsState client : clients.values()) { + for (final TaskId taskId : client.previousActiveTasks()) { + previousStandbyTasks.computeIfAbsent(taskId, k -> new HashSet<>()); + previousStandbyTasks.get(taskId).add(client.processId()); + } + } + return previousStandbyTasks; + } + + private static int computeStreamThreadCount(final Collection<KafkaStreamsState> clients) { Review Comment: nit: rename to `computeTotalProcessingThreads`. (In case you're wondering why I'm being pedantic: technically "StreamThread" is correct, but right now it can mean either a processing thread or a consumer. We have upcoming plans to decouple the two, so we want to start saying "processing thread" vs "consumer" to be precise about which one we mean.) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org