ableegoldman commented on code in PR #16129:
URL: https://github.com/apache/kafka/pull/16129#discussion_r1619920514


##########
streams/src/main/java/org/apache/kafka/streams/processor/assignment/TaskAssignmentUtils.java:
##########
@@ -407,4 +543,345 @@ private static boolean hasValidRackInformation(final 
TaskInfo task,
         }
         return true;
     }
+
+    private static Map<ProcessId, KafkaStreamsAssignment> 
tagBasedStandbyTaskAssignment(final ApplicationState applicationState,
+                                                                               
         final Map<ProcessId, KafkaStreamsAssignment> kafkaStreamsAssignments) {
+        final int numStandbyReplicas = 
applicationState.assignmentConfigs().numStandbyReplicas();
+        final Map<TaskId, Integer> tasksToRemainingStandbys = 
applicationState.allTasks().values().stream()
+            .collect(Collectors.toMap(TaskInfo::id, taskInfo -> 
numStandbyReplicas));
+        final Map<ProcessId, KafkaStreamsState> streamStates = 
applicationState.kafkaStreamsStates(false);
+
+        final Set<String> rackAwareAssignmentTags = new 
HashSet<>(getRackAwareAssignmentTags(applicationState));
+        final TagStatistics tagStatistics = new 
TagStatistics(applicationState);
+
+        final ConstrainedPrioritySet standbyTaskClientsByTaskLoad = 
standbyTaskPriorityListByLoad(streamStates, kafkaStreamsAssignments);
+
+        final Set<TaskId> statefulTaskIds = 
applicationState.allTasks().values().stream()
+            .filter(TaskInfo::isStateful)
+            .map(TaskInfo::id)
+            .collect(Collectors.toSet());
+        final Map<UUID, KafkaStreamsAssignment> clientsByUuid = 
kafkaStreamsAssignments.entrySet().stream().collect(Collectors.toMap(
+            entry -> entry.getKey().id(),
+            Map.Entry::getValue
+        ));
+        final Function<KafkaStreamsState, Map<String, String>> clientTagGetter 
= createClientTagGetter(applicationState);
+
+        final Map<TaskId, ProcessId> pendingStandbyTasksToClientId = new 
HashMap<>();
+        for (final TaskId statefulTaskId : statefulTaskIds) {
+            for (final KafkaStreamsAssignment assignment : 
clientsByUuid.values()) {
+                if (assignment.tasks().containsKey(statefulTaskId)) {
+                    assignStandbyTasksToClientsWithDifferentTags(
+                        numStandbyReplicas,
+                        standbyTaskClientsByTaskLoad,
+                        statefulTaskId,
+                        assignment.processId(),
+                        rackAwareAssignmentTags,
+                        streamStates,
+                        kafkaStreamsAssignments,
+                        tasksToRemainingStandbys,
+                        tagStatistics.tagKeyToValues,
+                        tagStatistics.tagEntryToClients,
+                        pendingStandbyTasksToClientId,
+                        clientTagGetter
+                    );
+                }
+            }
+        }
+
+        if (!tasksToRemainingStandbys.isEmpty()) {
+            assignPendingStandbyTasksToLeastLoadedClients(clientsByUuid,
+                numStandbyReplicas,
+                standbyTaskClientsByTaskLoad,
+                tasksToRemainingStandbys);
+        }
+
+        return kafkaStreamsAssignments;
+    }
+
+    private static Map<ProcessId, KafkaStreamsAssignment> 
loadBasedStandbyTaskAssignment(final ApplicationState applicationState,
+                                                                               
          final Map<ProcessId, KafkaStreamsAssignment> kafkaStreamsAssignments) 
{
+        final int numStandbyReplicas = 
applicationState.assignmentConfigs().numStandbyReplicas();
+        final Map<TaskId, Integer> tasksToRemainingStandbys = 
applicationState.allTasks().values().stream()
+            .collect(Collectors.toMap(TaskInfo::id, taskInfo -> 
numStandbyReplicas));
+        final Map<ProcessId, KafkaStreamsState> streamStates = 
applicationState.kafkaStreamsStates(false);
+
+        final Set<TaskId> statefulTaskIds = 
applicationState.allTasks().values().stream()
+            .filter(TaskInfo::isStateful)
+            .map(TaskInfo::id)
+            .collect(Collectors.toSet());
+        final Map<UUID, KafkaStreamsAssignment> clients = 
kafkaStreamsAssignments.entrySet().stream().collect(Collectors.toMap(
+            entry -> entry.getKey().id(),
+            Map.Entry::getValue
+        ));
+
+        final ConstrainedPrioritySet standbyTaskClientsByTaskLoad = 
standbyTaskPriorityListByLoad(streamStates, kafkaStreamsAssignments);
+        
standbyTaskClientsByTaskLoad.offerAll(streamStates.keySet().stream().map(ProcessId::id).collect(Collectors.toSet()));
+        for (final TaskId task : statefulTaskIds) {
+            assignStandbyTasksForActiveTask(numStandbyReplicas, clients,
+                tasksToRemainingStandbys, standbyTaskClientsByTaskLoad, task);
+        }
+        return kafkaStreamsAssignments;
+    }
+
+    private static void assignStandbyTasksForActiveTask(final int 
numStandbyReplicas,
+                                                        final Map<UUID, 
KafkaStreamsAssignment> clients,
+                                                        final Map<TaskId, 
Integer> tasksToRemainingStandbys,
+                                                        final 
ConstrainedPrioritySet standbyTaskClientsByTaskLoad,
+                                                        final TaskId 
activeTaskId) {
+        int numRemainingStandbys = tasksToRemainingStandbys.get(activeTaskId);
+        while (numRemainingStandbys > 0) {
+            final UUID client = 
standbyTaskClientsByTaskLoad.poll(activeTaskId);
+            if (client == null) {
+                break;
+            }
+            clients.get(client).assignTask(new AssignedTask(activeTaskId, 
AssignedTask.Type.STANDBY));
+            numRemainingStandbys--;
+            standbyTaskClientsByTaskLoad.offer(client);
+            tasksToRemainingStandbys.put(activeTaskId, numRemainingStandbys);
+        }
+
+        if (numRemainingStandbys > 0) {
+            LOG.warn("Unable to assign {} of {} standby tasks for task [{}]. " 
+
+                     "There is not enough available capacity. You should " +
+                     "increase the number of application instances " +
+                     "to maintain the requested number of standby replicas.",
+                numRemainingStandbys, numStandbyReplicas, activeTaskId);
+        }
+    }
+
+    private static void assignStandbyTasksToClientsWithDifferentTags(final int 
numberOfStandbyClients,
+                                                                     final 
ConstrainedPrioritySet standbyTaskClientsByTaskLoad,
+                                                                     final 
TaskId activeTaskId,
+                                                                     final 
ProcessId activeTaskClient,
+                                                                     final 
Set<String> rackAwareAssignmentTags,
+                                                                     final 
Map<ProcessId, KafkaStreamsState> clientStates,
+                                                                     final 
Map<ProcessId, KafkaStreamsAssignment> kafkaStreamsAssignments,
+                                                                     final 
Map<TaskId, Integer> tasksToRemainingStandbys,
+                                                                     final 
Map<String, Set<String>> tagKeyToValues,
+                                                                     final 
Map<KeyValue<String, String>, Set<ProcessId>> tagEntryToClients,
+                                                                     final 
Map<TaskId, ProcessId> pendingStandbyTasksToClientId,
+                                                                     final 
Function<KafkaStreamsState, Map<String, String>> clientTagGetter) {
+        standbyTaskClientsByTaskLoad.offerAll(clientStates.keySet().stream()
+            .map(ProcessId::id).collect(Collectors.toSet()));
+
+        // We set countOfUsedClients as 1 because client where active task is 
located has to be considered as used.
+        int countOfUsedClients = 1;
+        int numRemainingStandbys = tasksToRemainingStandbys.get(activeTaskId);
+
+        final Map<KeyValue<String, String>, Set<ProcessId>> 
tagEntryToUsedClients = new HashMap<>();
+
+        ProcessId lastUsedClient = activeTaskClient;
+        do {
+            updateClientsOnAlreadyUsedTagEntries(
+                clientStates.get(lastUsedClient),
+                countOfUsedClients,
+                rackAwareAssignmentTags,
+                tagEntryToClients,
+                tagKeyToValues,
+                tagEntryToUsedClients,
+                clientTagGetter
+            );
+
+            final UUID clientOnUnusedTagDimensions = 
standbyTaskClientsByTaskLoad.poll(
+                activeTaskId, uuid -> !isClientUsedOnAnyOfTheTagEntries(new 
ProcessId(uuid), tagEntryToUsedClients)
+            );
+
+            if (clientOnUnusedTagDimensions == null) {
+                break;
+            }
+
+            final KafkaStreamsState clientStateOnUsedTagDimensions = 
clientStates.get(new ProcessId(clientOnUnusedTagDimensions));
+            countOfUsedClients++;
+            numRemainingStandbys--;
+
+            LOG.debug("Assigning {} out of {} standby tasks for an active task 
[{}] with client tags {}. " +
+                      "Standby task client tags are {}.",
+                numberOfStandbyClients - numRemainingStandbys, 
numberOfStandbyClients, activeTaskId,
+                clientTagGetter.apply(clientStates.get(activeTaskClient)),
+                clientTagGetter.apply(clientStateOnUsedTagDimensions));
+
+            
kafkaStreamsAssignments.get(clientStateOnUsedTagDimensions.processId()).assignTask(
+                new AssignedTask(activeTaskId, AssignedTask.Type.STANDBY)
+            );
+            lastUsedClient = new ProcessId(clientOnUnusedTagDimensions);
+        } while (numRemainingStandbys > 0);
+
+        if (numRemainingStandbys > 0) {
+            pendingStandbyTasksToClientId.put(activeTaskId, activeTaskClient);
+            tasksToRemainingStandbys.put(activeTaskId, numRemainingStandbys);
+            LOG.warn("Rack aware standby task assignment was not able to 
assign {} of {} standby tasks for the " +
+                     "active task [{}] with the rack aware assignment tags {}. 
" +
+                     "This may happen when there aren't enough application 
instances on different tag " +
+                     "dimensions compared to an active and corresponding 
standby task. " +
+                     "Consider launching application instances on different 
tag dimensions than [{}]. " +
+                     "Standby task assignment will fall back to assigning 
standby tasks to the least loaded clients.",
+                numRemainingStandbys, numberOfStandbyClients,
+                activeTaskId, rackAwareAssignmentTags,
+                clientTagGetter.apply(clientStates.get(activeTaskClient)));
+
+        } else {
+            tasksToRemainingStandbys.remove(activeTaskId);
+        }
+    }
+
+    private static boolean isClientUsedOnAnyOfTheTagEntries(final ProcessId 
client,
+                                                            final 
Map<KeyValue<String, String>, Set<ProcessId>> tagEntryToUsedClients) {
+        return tagEntryToUsedClients.values().stream().anyMatch(usedClients -> 
usedClients.contains(client));
+    }
+
+    private static void updateClientsOnAlreadyUsedTagEntries(final 
KafkaStreamsState usedClient,
+                                                             final int 
countOfUsedClients,
+                                                             final Set<String> 
rackAwareAssignmentTags,
+                                                             final 
Map<KeyValue<String, String>, Set<ProcessId>> tagEntryToClients,
+                                                             final Map<String, 
Set<String>> tagKeyToValues,
+                                                             final 
Map<KeyValue<String, String>, Set<ProcessId>> tagEntryToUsedClients,
+                                                             final 
Function<KafkaStreamsState, Map<String, String>> clientTagGetter) {
+        final Map<String, String> usedClientTags = 
clientTagGetter.apply(usedClient);
+
+        for (final Map.Entry<String, String> usedClientTagEntry : 
usedClientTags.entrySet()) {
+            final String tagKey = usedClientTagEntry.getKey();
+
+            if (!rackAwareAssignmentTags.contains(tagKey)) {
+                LOG.warn("Client tag with key [{}] will be ignored when 
computing rack aware standby " +
+                         "task assignment because it is not part of the 
configured rack awareness [{}].",
+                    tagKey, rackAwareAssignmentTags);
+                continue;
+            }
+
+            final Set<String> allTagValues = tagKeyToValues.get(tagKey);
+
+            if (allTagValues.size() <= countOfUsedClients) {
+                allTagValues.forEach(tagValue -> 
tagEntryToUsedClients.remove(new KeyValue<>(tagKey, tagValue)));
+            } else {
+                final String tagValue = usedClientTagEntry.getValue();
+                final KeyValue<String, String> tagEntry = new 
KeyValue<>(tagKey, tagValue);
+                final Set<ProcessId> clientsOnUsedTagValue = 
tagEntryToClients.get(tagEntry);
+                tagEntryToUsedClients.put(tagEntry, clientsOnUsedTagValue);
+            }
+        }
+    }
+
+    private static Function<KafkaStreamsState, Map<String, String>> 
createClientTagGetter(final ApplicationState applicationState) {
+        final boolean hasRackAwareAssignmentTags = 
!applicationState.assignmentConfigs().rackAwareAssignmentTags().isEmpty();
+        final boolean canPerformRackAwareOptimization = 
canPerformRackAwareOptimization(applicationState, AssignedTask.Type.STANDBY);
+
+        if (hasRackAwareAssignmentTags || !canPerformRackAwareOptimization) {
+            return KafkaStreamsState::clientTags;
+        } else {
+            return state -> mkMap(mkEntry("rack", state.rackId().get()));

Review Comment:
   IIUC this also relates back to [this whole 
thing](https://github.com/apache/kafka/pull/16129/files#r1619754391). So if we 
get rid of that, we should be able to simplify (or even outright remove) this 
method as well



##########
streams/src/main/java/org/apache/kafka/streams/processor/assignment/TaskAssignmentUtils.java:
##########
@@ -72,6 +83,27 @@ public static Map<ProcessId, KafkaStreamsAssignment> 
identityAssignment(final Ap
         return assignments;
     }
 
+    /**
+     * Assign standby tasks to KafkaStreams clients according to the default 
logic.
+     * <p>
+     * If rack-aware client tags are configured, the rack-aware standby task 
assignor will be used
+     *
+     * @param applicationState        the metadata and other info describing 
the current application state
+     * @param kafkaStreamsAssignments the current assignment of tasks to 
KafkaStreams clients
+     *
+     * @return a new map containing the mappings from KafkaStreamsAssignments 
updated with the default standby assignment
+     */
+    public static Map<ProcessId, KafkaStreamsAssignment> 
defaultStandbyTaskAssignment(final ApplicationState applicationState,
+                                                                               
       final Map<ProcessId, KafkaStreamsAssignment> kafkaStreamsAssignments) {
+        if 
(!applicationState.assignmentConfigs().rackAwareAssignmentTags().isEmpty()) {
+            return tagBasedStandbyTaskAssignment(applicationState, 
kafkaStreamsAssignments);
+        } else if (canPerformRackAwareOptimization(applicationState, 
AssignedTask.Type.STANDBY)) {
+            return tagBasedStandbyTaskAssignment(applicationState, 
kafkaStreamsAssignments);

Review Comment:
   This is driving me crazy, why add so much complexity with these fake "rack" 
tags if we never use them in non-testing code? I looked up the PR that added this 
whole thing in the first place 
([#14097](https://github.com/apache/kafka/pull/14097)) and the description 
makes it sound like the "rack" tags are supposed to be a real feature, 
essentially piggybacking on the older tag-based rack-aware assignment logic. 
Yet even in that PR, the only thing that actually uses it is from test code. So 
now I can't tell if it's just an oversight/bug, an unfinished feature that was 
forgotten about, or really truly just meant to make it easier to write tests. 
   
   Anyways I just need to ask Hao (guy who did the rack-aware optimization KIP) 
at this point. So in the meantime, until I hear back from him or one of the 
Confluent folks can answer for him, feel free to ignore all of my comments that 
relate to this (there are many since this resulted in a lot of branching logic 
and complexity) 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org

Reply via email to