ableegoldman commented on code in PR #16052:
URL: https://github.com/apache/kafka/pull/16052#discussion_r1614388204


##########
streams/src/main/java/org/apache/kafka/streams/processor/assignment/assignors/StickyTaskAssignor.java:
##########
@@ -0,0 +1,471 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.processor.assignment.assignors;
+
+import static java.util.Collections.unmodifiableMap;
+
+import java.time.Instant;
+import java.time.temporal.ChronoUnit;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+import java.util.TreeSet;
+import java.util.stream.Collectors;
+import org.apache.kafka.streams.processor.TaskId;
+import org.apache.kafka.streams.processor.assignment.ApplicationState;
+import org.apache.kafka.streams.processor.assignment.KafkaStreamsAssignment;
+import 
org.apache.kafka.streams.processor.assignment.KafkaStreamsAssignment.AssignedTask;
+import org.apache.kafka.streams.processor.assignment.KafkaStreamsState;
+import org.apache.kafka.streams.processor.assignment.ProcessId;
+import org.apache.kafka.streams.processor.assignment.TaskAssignmentUtils;
+import org.apache.kafka.streams.processor.assignment.TaskAssignor;
+import org.apache.kafka.streams.processor.assignment.TaskInfo;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+public class StickyTaskAssignor implements TaskAssignor {
+    private static final Logger LOG = 
LoggerFactory.getLogger(StickyTaskAssignor.class);
+
+    private final boolean mustPreserveActiveTaskAssignment;
+
+    public StickyTaskAssignor() {
+        this(false);
+    }
+
+    public StickyTaskAssignor(final boolean mustPreserveActiveTaskAssignment) {
+        this.mustPreserveActiveTaskAssignment = 
mustPreserveActiveTaskAssignment;
+    }
+
+    @Override
+    public void configure(final Map<String, ?> configs) {}
+
+    @Override
+    public TaskAssignment assign(final ApplicationState applicationState) {
+        final Map<ProcessId, KafkaStreamsState> clients = 
applicationState.kafkaStreamsStates(false);
+        final Map<TaskId, ProcessId> previousActiveAssignment = 
mapPreviousActiveTasks(clients);
+        final Map<TaskId, Set<ProcessId>> previousStandbyAssignment = 
mapPreviousStandbyTasks(clients);
+        final AssignmentState assignmentState = new 
AssignmentState(applicationState, clients,
+            previousActiveAssignment, previousStandbyAssignment);
+
+        assignActive(applicationState, clients.values(), assignmentState, 
this.mustPreserveActiveTaskAssignment);
+        optimizeActive(applicationState, assignmentState);
+        assignStandby(applicationState, assignmentState);
+        optimizeStandby(applicationState, assignmentState);
+
+        final Map<ProcessId, KafkaStreamsAssignment> finalAssignments = 
assignmentState.buildKafkaStreamsAssignments();
+        if (mustPreserveActiveTaskAssignment && !finalAssignments.isEmpty()) {
+            // We set the followup deadline for only one of the clients.
+            final ProcessId clientId = 
finalAssignments.keySet().iterator().next();
+            final KafkaStreamsAssignment previousAssignment = 
finalAssignments.get(clientId);
+            finalAssignments.put(clientId, 
previousAssignment.withFollowupRebalance(Instant.ofEpochMilli(0)));
+        }
+        return new TaskAssignment(finalAssignments.values());
+    }
+
+    private void optimizeActive(final ApplicationState applicationState,
+                                final AssignmentState assignmentState) {
+        if (mustPreserveActiveTaskAssignment) {
+            return;
+        }
+
+        final Map<ProcessId, KafkaStreamsAssignment> currentAssignments = 
assignmentState.buildKafkaStreamsAssignments();
+
+        final Set<TaskId> statefulTasks = applicationState.allTasks().stream()
+            .filter(TaskInfo::isStateful)
+            .map(TaskInfo::id)
+            .collect(Collectors.toSet());
+        final Map<ProcessId, KafkaStreamsAssignment> 
optimizedAssignmentsForStatefulTasks = 
TaskAssignmentUtils.optimizeRackAwareActiveTasks(
+            applicationState, currentAssignments, new 
TreeSet<>(statefulTasks));
+
+        final Set<TaskId> statelessTasks = applicationState.allTasks().stream()
+            .filter(task -> !task.isStateful())
+            .map(TaskInfo::id)
+            .collect(Collectors.toSet());
+        final Map<ProcessId, KafkaStreamsAssignment> 
optimizedAssignmentsForAllTasks = 
TaskAssignmentUtils.optimizeRackAwareActiveTasks(
+            applicationState, optimizedAssignmentsForStatefulTasks, new 
TreeSet<>(statelessTasks));
+
+        
assignmentState.processOptimizedAssignments(optimizedAssignmentsForAllTasks);
+    }
+
+    private void optimizeStandby(final ApplicationState applicationState, 
final AssignmentState assignmentState) {
+        if (applicationState.assignmentConfigs().numStandbyReplicas() <= 0) {
+            return;
+        }
+
+        if (mustPreserveActiveTaskAssignment) {
+            return;
+        }
+
+        final Map<ProcessId, KafkaStreamsAssignment> currentAssignments = 
assignmentState.buildKafkaStreamsAssignments();
+        final Map<ProcessId, KafkaStreamsAssignment> optimizedAssignments = 
TaskAssignmentUtils.optimizeRackAwareStandbyTasks(
+            applicationState, currentAssignments);
+        assignmentState.processOptimizedAssignments(optimizedAssignments);
+    }
+
+    private static void assignActive(final ApplicationState applicationState,
+                                     final Collection<KafkaStreamsState> 
clients,
+                                     final AssignmentState assignmentState,
+                                     final boolean 
mustPreserveActiveTaskAssignment) {
+        final int totalCapacity = computeStreamThreadCount(clients);
+        if (totalCapacity == 0) {
+            throw new IllegalStateException("`totalCapacity` should never be 
zero.");
+        }

Review Comment:
   This is another superfluous check now; we should be able to trust the assignor's inputs.



##########
streams/src/main/java/org/apache/kafka/streams/processor/assignment/assignors/StickyTaskAssignor.java:
##########
@@ -0,0 +1,471 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.processor.assignment.assignors;
+
+import static java.util.Collections.unmodifiableMap;
+
+import java.time.Instant;
+import java.time.temporal.ChronoUnit;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+import java.util.TreeSet;
+import java.util.stream.Collectors;
+import org.apache.kafka.streams.processor.TaskId;
+import org.apache.kafka.streams.processor.assignment.ApplicationState;
+import org.apache.kafka.streams.processor.assignment.KafkaStreamsAssignment;
+import 
org.apache.kafka.streams.processor.assignment.KafkaStreamsAssignment.AssignedTask;
+import org.apache.kafka.streams.processor.assignment.KafkaStreamsState;
+import org.apache.kafka.streams.processor.assignment.ProcessId;
+import org.apache.kafka.streams.processor.assignment.TaskAssignmentUtils;
+import org.apache.kafka.streams.processor.assignment.TaskAssignor;
+import org.apache.kafka.streams.processor.assignment.TaskInfo;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+public class StickyTaskAssignor implements TaskAssignor {
+    private static final Logger LOG = 
LoggerFactory.getLogger(StickyTaskAssignor.class);
+
+    private final boolean mustPreserveActiveTaskAssignment;
+
+    public StickyTaskAssignor() {
+        this(false);
+    }
+
+    public StickyTaskAssignor(final boolean mustPreserveActiveTaskAssignment) {
+        this.mustPreserveActiveTaskAssignment = 
mustPreserveActiveTaskAssignment;
+    }
+
+    @Override
+    public void configure(final Map<String, ?> configs) {}

Review Comment:
   Can we add a default (no-op) implementation of `configure` to the `TaskAssignor` interface so users don't have to implement an empty method like this, and then remove this override?



##########
streams/src/main/java/org/apache/kafka/streams/processor/assignment/assignors/StickyTaskAssignor.java:
##########
@@ -0,0 +1,471 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.processor.assignment.assignors;
+
+import static java.util.Collections.unmodifiableMap;
+
+import java.time.Instant;
+import java.time.temporal.ChronoUnit;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+import java.util.TreeSet;
+import java.util.stream.Collectors;
+import org.apache.kafka.streams.processor.TaskId;
+import org.apache.kafka.streams.processor.assignment.ApplicationState;
+import org.apache.kafka.streams.processor.assignment.KafkaStreamsAssignment;
+import 
org.apache.kafka.streams.processor.assignment.KafkaStreamsAssignment.AssignedTask;
+import org.apache.kafka.streams.processor.assignment.KafkaStreamsState;
+import org.apache.kafka.streams.processor.assignment.ProcessId;
+import org.apache.kafka.streams.processor.assignment.TaskAssignmentUtils;
+import org.apache.kafka.streams.processor.assignment.TaskAssignor;
+import org.apache.kafka.streams.processor.assignment.TaskInfo;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+public class StickyTaskAssignor implements TaskAssignor {
+    private static final Logger LOG = 
LoggerFactory.getLogger(StickyTaskAssignor.class);
+
+    private final boolean mustPreserveActiveTaskAssignment;
+
+    public StickyTaskAssignor() {
+        this(false);
+    }
+
+    public StickyTaskAssignor(final boolean mustPreserveActiveTaskAssignment) {
+        this.mustPreserveActiveTaskAssignment = 
mustPreserveActiveTaskAssignment;
+    }
+
+    @Override
+    public void configure(final Map<String, ?> configs) {}
+
+    @Override
+    public TaskAssignment assign(final ApplicationState applicationState) {
+        final Map<ProcessId, KafkaStreamsState> clients = 
applicationState.kafkaStreamsStates(false);
+        final Map<TaskId, ProcessId> previousActiveAssignment = 
mapPreviousActiveTasks(clients);
+        final Map<TaskId, Set<ProcessId>> previousStandbyAssignment = 
mapPreviousStandbyTasks(clients);
+        final AssignmentState assignmentState = new 
AssignmentState(applicationState, clients,
+            previousActiveAssignment, previousStandbyAssignment);
+
+        assignActive(applicationState, clients.values(), assignmentState, 
this.mustPreserveActiveTaskAssignment);
+        optimizeActive(applicationState, assignmentState);
+        assignStandby(applicationState, assignmentState);
+        optimizeStandby(applicationState, assignmentState);
+
+        final Map<ProcessId, KafkaStreamsAssignment> finalAssignments = 
assignmentState.buildKafkaStreamsAssignments();
+        if (mustPreserveActiveTaskAssignment && !finalAssignments.isEmpty()) {
+            // We set the followup deadline for only one of the clients.
+            final ProcessId clientId = 
finalAssignments.keySet().iterator().next();
+            final KafkaStreamsAssignment previousAssignment = 
finalAssignments.get(clientId);
+            finalAssignments.put(clientId, 
previousAssignment.withFollowupRebalance(Instant.ofEpochMilli(0)));
+        }
+        return new TaskAssignment(finalAssignments.values());
+    }
+
+    private void optimizeActive(final ApplicationState applicationState,
+                                final AssignmentState assignmentState) {
+        if (mustPreserveActiveTaskAssignment) {
+            return;
+        }
+
+        final Map<ProcessId, KafkaStreamsAssignment> currentAssignments = 
assignmentState.buildKafkaStreamsAssignments();
+
+        final Set<TaskId> statefulTasks = applicationState.allTasks().stream()
+            .filter(TaskInfo::isStateful)
+            .map(TaskInfo::id)
+            .collect(Collectors.toSet());
+        final Map<ProcessId, KafkaStreamsAssignment> 
optimizedAssignmentsForStatefulTasks = 
TaskAssignmentUtils.optimizeRackAwareActiveTasks(
+            applicationState, currentAssignments, new 
TreeSet<>(statefulTasks));
+
+        final Set<TaskId> statelessTasks = applicationState.allTasks().stream()
+            .filter(task -> !task.isStateful())
+            .map(TaskInfo::id)
+            .collect(Collectors.toSet());
+        final Map<ProcessId, KafkaStreamsAssignment> 
optimizedAssignmentsForAllTasks = 
TaskAssignmentUtils.optimizeRackAwareActiveTasks(
+            applicationState, optimizedAssignmentsForStatefulTasks, new 
TreeSet<>(statelessTasks));
+
+        
assignmentState.processOptimizedAssignments(optimizedAssignmentsForAllTasks);
+    }
+
+    private void optimizeStandby(final ApplicationState applicationState, 
final AssignmentState assignmentState) {
+        if (applicationState.assignmentConfigs().numStandbyReplicas() <= 0) {
+            return;
+        }
+
+        if (mustPreserveActiveTaskAssignment) {
+            return;
+        }
+
+        final Map<ProcessId, KafkaStreamsAssignment> currentAssignments = 
assignmentState.buildKafkaStreamsAssignments();
+        final Map<ProcessId, KafkaStreamsAssignment> optimizedAssignments = 
TaskAssignmentUtils.optimizeRackAwareStandbyTasks(
+            applicationState, currentAssignments);
+        assignmentState.processOptimizedAssignments(optimizedAssignments);
+    }
+
+    private static void assignActive(final ApplicationState applicationState,
+                                     final Collection<KafkaStreamsState> 
clients,
+                                     final AssignmentState assignmentState,
+                                     final boolean 
mustPreserveActiveTaskAssignment) {
+        final int totalCapacity = computeStreamThreadCount(clients);
+        if (totalCapacity == 0) {
+            throw new IllegalStateException("`totalCapacity` should never be 
zero.");
+        }
+
+        final Set<TaskId> allTaskIds = applicationState.allTasks().stream()
+            .map(TaskInfo::id).collect(Collectors.toSet());
+        final int taskCount = allTaskIds.size();
+        final int activeTasksPerThread = taskCount / totalCapacity;
+        final Set<TaskId> unassigned = new HashSet<>(allTaskIds);
+
+        // first try and re-assign existing active tasks to clients that 
previously had
+        // the same active task
+        for (final TaskId taskId : 
assignmentState.previousActiveAssignment.keySet()) {
+            final ProcessId previousClientForTask = 
assignmentState.previousActiveAssignment.get(taskId);
+            if (allTaskIds.contains(taskId)) {
+                if (mustPreserveActiveTaskAssignment || 
assignmentState.hasRoomForActiveTask(previousClientForTask, 
activeTasksPerThread)) {
+                    assignmentState.finalizeAssignment(taskId, 
previousClientForTask, AssignedTask.Type.ACTIVE);
+                    unassigned.remove(taskId);
+                }
+            }
+        }
+
+        // try and assign any remaining unassigned tasks to clients that 
previously
+        // have seen the task.
+        for (final Iterator<TaskId> iterator = unassigned.iterator(); 
iterator.hasNext(); ) {
+            final TaskId taskId = iterator.next();
+            final Set<ProcessId> previousClientsForStandbyTask = 
assignmentState.previousStandbyAssignment.getOrDefault(taskId, new HashSet<>());
+            for (final ProcessId client: previousClientsForStandbyTask) {
+                if (assignmentState.hasRoomForActiveTask(client, 
activeTasksPerThread)) {
+                    assignmentState.finalizeAssignment(taskId, client, 
AssignedTask.Type.ACTIVE);
+                    iterator.remove();
+                    break;
+                }
+            }
+        }
+
+        // assign any remaining unassigned tasks
+        final List<TaskId> sortedTasks = new ArrayList<>(unassigned);
+        Collections.sort(sortedTasks);
+        for (final TaskId taskId : sortedTasks) {
+            final Set<ProcessId> candidateClients = clients.stream()
+                .map(KafkaStreamsState::processId)
+                .collect(Collectors.toSet());
+            final ProcessId bestClient = 
assignmentState.findBestClientForTask(taskId, candidateClients);
+            assignmentState.finalizeAssignment(taskId, bestClient, 
AssignedTask.Type.ACTIVE);
+        }
+    }
+
+    private static void assignStandby(final ApplicationState applicationState,
+                                      final AssignmentState assignmentState) {
+        final Set<TaskInfo> statefulTasks = 
applicationState.allTasks().stream()
+            .filter(TaskInfo::isStateful)
+            .collect(Collectors.toSet());
+        final int numStandbyReplicas = 
applicationState.assignmentConfigs().numStandbyReplicas();
+        for (final TaskInfo task : statefulTasks) {
+            for (int i = 0; i < numStandbyReplicas; i++) {
+                final Set<ProcessId> candidateClients = 
assignmentState.findClientsWithoutAssignedTask(task.id());
+                if (candidateClients.isEmpty()) {
+                    LOG.warn("Unable to assign {} of {} standby tasks for task 
[{}]. " +
+                             "There is not enough available capacity. You 
should " +
+                             "increase the number of threads and/or 
application instances " +
+                             "to maintain the requested number of standby 
replicas.",
+                        numStandbyReplicas - i,
+                        numStandbyReplicas, task.id());
+                    break;
+                }
+
+                final ProcessId bestClient = 
assignmentState.findBestClientForTask(task.id(), candidateClients);
+                assignmentState.finalizeAssignment(task.id(), bestClient, 
AssignedTask.Type.STANDBY);
+            }
+        }
+    }
+
+    private static Map<TaskId, ProcessId> mapPreviousActiveTasks(final 
Map<ProcessId, KafkaStreamsState> clients) {
+        final Map<TaskId, ProcessId> previousActiveTasks = new HashMap<>();
+        for (final KafkaStreamsState client : clients.values()) {
+            for (final TaskId taskId : client.previousActiveTasks()) {
+                previousActiveTasks.put(taskId, client.processId());
+            }
+        }
+        return previousActiveTasks;
+    }
+
+    private static Map<TaskId, Set<ProcessId>> mapPreviousStandbyTasks(final 
Map<ProcessId, KafkaStreamsState> clients) {
+        final Map<TaskId, Set<ProcessId>> previousStandbyTasks = new 
HashMap<>();
+        for (final KafkaStreamsState client : clients.values()) {
+            for (final TaskId taskId : client.previousActiveTasks()) {
+                previousStandbyTasks.computeIfAbsent(taskId, k -> new 
HashSet<>());
+                previousStandbyTasks.get(taskId).add(client.processId());
+            }
+        }
+        return previousStandbyTasks;
+    }
+
+    private static int computeStreamThreadCount(final 
Collection<KafkaStreamsState> clients) {

Review Comment:
   nit: rename to `computeTotalProcessingThreads`
   
   (In case you're wondering why I'm being pedantic: technically "StreamThread" is correct, but right now "StreamThread" can mean either a processing thread or a consumer. We want to start using "processing thread" vs. "consumer" so we're precise about which one we mean, since we have upcoming plans to decouple the two.)



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to