ableegoldman commented on code in PR #16129:
URL: https://github.com/apache/kafka/pull/16129#discussion_r1619732219
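For context on the API change discussed throughout this thread (replacing the Set-returning `assignment()` / `assignedTaskIds()` pair with a single Map-returning `tasks()`), here is a self-contained sketch of that shape. All types here (`AssignedTask`, `KafkaStreamsAssignmentSketch`) are simplified, hypothetical stand-ins for the real Kafka Streams classes, with a `String` in place of `TaskId` and `ProcessId`; it only illustrates the `Collectors.toMap` inlining and the read-only `tasks()` view the comments below ask for.

```java
import java.util.Collections;
import java.util.Map;
import java.util.Set;
import java.util.function.Function;
import java.util.stream.Collectors;

// Hypothetical stand-in for the real AssignedTask; a String id replaces TaskId.
class AssignedTask {
    enum Type { ACTIVE, STANDBY }
    private final String id;
    private final Type type;
    AssignedTask(final String id, final Type type) { this.id = id; this.type = type; }
    String id() { return id; }
    Type type() { return type; }
}

// Hypothetical stand-in for KafkaStreamsAssignment after the suggested changes.
class KafkaStreamsAssignmentSketch {
    private final String processId;
    // Field named `tasks` (not `assignment`), keyed by task id, so callers can
    // look tasks up directly instead of copying the values into Sets.
    private final Map<String, AssignedTask> tasks;

    KafkaStreamsAssignmentSketch(final String processId, final Set<AssignedTask> assignment) {
        this.processId = processId;
        // The inlined conversion the reviewer proposes instead of a second constructor:
        this.tasks = assignment.stream()
            .collect(Collectors.toMap(AssignedTask::id, Function.identity()));
    }

    String processId() { return processId; }

    // Read-only view; replaces the old assignment() that copied into a new HashSet.
    Map<String, AssignedTask> tasks() { return Collections.unmodifiableMap(tasks); }
}

public class Main {
    public static void main(final String[] args) {
        final KafkaStreamsAssignmentSketch a = new KafkaStreamsAssignmentSketch(
            "process-1",
            Set.of(new AssignedTask("0_0", AssignedTask.Type.ACTIVE),
                   new AssignedTask("0_1", AssignedTask.Type.STANDBY)));
        // Callers filter the map's values instead of a defensively-copied Set:
        final long standbys = a.tasks().values().stream()
            .filter(t -> t.type() == AssignedTask.Type.STANDBY)
            .count();
        System.out.println(standbys); // prints 1
    }
}
```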
########## streams/src/main/java/org/apache/kafka/streams/processor/assignment/assignors/StickyTaskAssignor.java ##########

```diff
@@ -278,7 +278,7 @@ public void processOptimizedAssignments(final Map<ProcessId, KafkaStreamsAssignm
         for (final Map.Entry<ProcessId, KafkaStreamsAssignment> entry : optimizedAssignments.entrySet()) {
             final ProcessId processId = entry.getKey();
-            final Set<AssignedTask> assignedTasks = optimizedAssignments.get(processId).assignment();
+            final Set<AssignedTask> assignedTasks = new HashSet<>(optimizedAssignments.get(processId).tasks().values());
```

Review Comment:
   This can be a followup PR, but one of the nice things about making KafkaStreamsAssignment mutable is that we should be able to get rid of the `newAssignments` field altogether and stop converting back and forth between the KafkaStreamsAssignment and raw Sets so that we can add/remove tasks. In addition to the obvious code simplification that should result from this change, it should save us a lot of copying things into various different data structures and reduce a lot of overhead.

########## streams/src/main/java/org/apache/kafka/streams/processor/assignment/KafkaStreamsAssignment.java ##########

```diff
@@ -62,14 +64,24 @@ public static KafkaStreamsAssignment of(final ProcessId processId, final Set<Ass
      * @return a new KafkaStreamsAssignment object with the same processId and assignment but with the given rebalanceDeadline
      */
     public KafkaStreamsAssignment withFollowupRebalance(final Instant rebalanceDeadline) {
-        return new KafkaStreamsAssignment(this.processId(), this.assignment(), Optional.of(rebalanceDeadline));
+        return new KafkaStreamsAssignment(this.processId(), this.tasks(), Optional.of(rebalanceDeadline));
     }

     private KafkaStreamsAssignment(final ProcessId processId,
                                    final Set<AssignedTask> assignment,
                                    final Optional<Instant> followupRebalanceDeadline) {
+        this(
+            processId,
+            assignment.stream().collect(Collectors.toMap(AssignedTask::id, Function.identity())),
+            followupRebalanceDeadline
+        );
+    }
+
+    private KafkaStreamsAssignment(final ProcessId processId,
```

Review Comment:
   nit: seems kind of unnecessary to introduce yet another constructor for this, we can just inline things. ie
   ```
   this.assignment = assignment.stream().collect(Collectors.toMap(AssignedTask::id, Function.identity()));
   ```

########## streams/src/main/java/org/apache/kafka/streams/processor/assignment/KafkaStreamsAssignment.java ##########

```diff
@@ -83,16 +95,10 @@ public ProcessId processId() {

     /**
      *
-     * @return a set of assigned tasks that are part of this {@code KafkaStreamsAssignment}
+     * @return a read-only set of assigned tasks that are part of this {@code KafkaStreamsAssignment}
      */
-    public Set<AssignedTask> assignment() {
-        // TODO change assignment to return a map so we aren't forced to copy this into a Set
-        return new HashSet<>(assignment.values());
-    }
-
-    // TODO: merge this with #assignment by having it return a Map<TaskId, AssignedTask>
-    public Set<TaskId> assignedTaskIds() {
-        return assignment.keySet();
+    public Map<TaskId, AssignedTask> tasks() {
+        return unmodifiableMap(assignment);
```

Review Comment:
   nit: should probably change the field name too (ie `assignment` --> `tasks`)

########## streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/ClientState.java ##########

```diff
@@ -489,10 +489,10 @@ public SortedMap<String, Set<TaskId>> taskIdsByPreviousConsumer() {
     }

     public void setAssignedTasks(final KafkaStreamsAssignment assignment) {
-        final Set<TaskId> activeTasks = assignment.assignment().stream()
+        final Set<TaskId> activeTasks = assignment.tasks().values().stream()
             .filter(task -> task.type() == ACTIVE).map(KafkaStreamsAssignment.AssignedTask::id)
             .collect(Collectors.toSet());
-        final Set<TaskId> standbyTasks = assignment.assignment().stream()
+        final Set<TaskId> standbyTasks = assignment.tasks().values().stream()
             .filter(task -> task.type() == STANDBY).map(KafkaStreamsAssignment.AssignedTask::id)
             .collect(Collectors.toSet());

         assignedActiveTasks.taskIds(activeTasks);
```

Review Comment:
   nit: I keep getting confused by this and then realizing it's just a poorly-named method, can you rename it to `#setTaskIds` or something like that?

########## streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java ##########

```diff
@@ -785,12 +789,13 @@ private UserTaskAssignmentListener assignTasksToClients(final Cluster fullMetada
         );
         final org.apache.kafka.streams.processor.assignment.TaskAssignor assignor = userTaskAssignor.get();
         final TaskAssignment taskAssignment = assignor.assign(applicationState);
-        processStreamsPartitionAssignment(clientMetadataMap, taskAssignment);
         final AssignmentError assignmentError = validateTaskAssignment(applicationState, taskAssignment);
-        userTaskAssignmentListener = (GroupAssignment assignment, GroupSubscription subscription) -> {
+        processStreamsPartitionAssignment(clientMetadataMap, taskAssignment);
+        userTaskAssignmentListener = (assignment, subscription) -> {
             assignor.onAssignmentComputed(assignment, subscription, assignmentError);
         };
```

Review Comment:
   Really doesn't matter, this is just an fyi, but you don't need the brackets for one-line lambdas and we typically use the inline/bracket-less style when possible in Streams code
   ```suggestion
           userTaskAssignmentListener = (assignment, subscription) -> assignor.onAssignmentComputed(assignment, subscription, assignmentError)
   ```

########## streams/src/main/java/org/apache/kafka/streams/processor/assignment/TaskAssignmentUtils.java ##########

```diff
@@ -244,18 +274,117 @@ public static Map<ProcessId, KafkaStreamsAssignment> optimizeRackAwareStandbyTas
         );
         LOG.info("Assignment before standby task optimization has cost {}", initialCost);

-        throw new UnsupportedOperationException("Not yet Implemented.");
+        final MoveStandbyTaskPredicate moveablePredicate = getStandbyTaskMovePredicate(applicationState);
+        final BiFunction<KafkaStreamsAssignment, KafkaStreamsAssignment, List<TaskId>> getMovableTasks = (source, destination) -> {
+            return source.tasks().values().stream().filter(task -> task.type() == AssignedTask.Type.STANDBY)
```

Review Comment:
   nit: put `.filter(task -> task.type() == AssignedTask.Type.STANDBY)` on a separate line

########## streams/src/main/java/org/apache/kafka/streams/processor/assignment/TaskAssignmentUtils.java ##########

```diff
@@ -244,18 +274,117 @@ public static Map<ProcessId, KafkaStreamsAssignment> optimizeRackAwareStandbyTas
         );
         LOG.info("Assignment before standby task optimization has cost {}", initialCost);

-        throw new UnsupportedOperationException("Not yet Implemented.");
+        final MoveStandbyTaskPredicate moveablePredicate = getStandbyTaskMovePredicate(applicationState);
+        final BiFunction<KafkaStreamsAssignment, KafkaStreamsAssignment, List<TaskId>> getMovableTasks = (source, destination) -> {
+            return source.tasks().values().stream().filter(task -> task.type() == AssignedTask.Type.STANDBY)
+                .filter(task -> {
+                    return !destination.tasks().containsKey(task.id());
+                })
```

Review Comment:
   ```suggestion
                   .filter(task -> !destination.tasks().containsKey(task.id()))
   ```
   Just makes it a bit easier to read when we keep lambdas on one line by getting rid of the brackets, especially when we have a complex structure containing multiple steps with lambdas, nested lambdas, etc.

########## streams/src/main/java/org/apache/kafka/streams/processor/assignment/TaskAssignmentUtils.java ##########

```diff
@@ -72,6 +83,27 @@
         return assignments;
     }

+    /**
+     * Assign standby tasks to KafkaStreams clients according to the default logic.
+     * <p>
+     * If rack-aware client tags are configured, the rack-aware standby task assignor will be used.
+     *
+     * @param applicationState        the metadata and other info describing the current application state
+     * @param kafkaStreamsAssignments the current assignment of tasks to KafkaStreams clients
+     *
+     * @return a new map containing the mappings from KafkaStreamsAssignments updated with the default standby assignment
+     */
+    public static Map<ProcessId, KafkaStreamsAssignment> defaultStandbyTaskAssignment(final ApplicationState applicationState,
+                                                                                      final Map<ProcessId, KafkaStreamsAssignment> kafkaStreamsAssignments) {
+        if (!applicationState.assignmentConfigs().rackAwareAssignmentTags().isEmpty()) {
+            return tagBasedStandbyTaskAssignment(applicationState, kafkaStreamsAssignments);
+        } else if (canPerformRackAwareOptimization(applicationState, AssignedTask.Type.STANDBY)) {
+            return tagBasedStandbyTaskAssignment(applicationState, kafkaStreamsAssignments);
```

Review Comment:
   I'm guessing this represents the middle conditional in the `StandbyTaskAssignorFactory#create` method? I was struggling to understand the point of that, because how can you do tag-based rack-aware assignment with no rack-aware assignment tags (ie the first condition)?

   Then I realized it's actually only ever hit when called from testing code. In the real code, the `rackAwareTaskAssignor` parameter is always null and therefore the middle condition (ie `if (rackAwareTaskAssignor != null && rackAwareTaskAssignor.validClientRack())`) is always false.

   I have to assume this just exists to make it easier to hook into and/or set up from unit tests, but it's definitely bad practice to have conditions in production code that just exist for test cases. That, or it's a bug/regression.

   Anyways, the tl;dr is I'm pretty sure we should remove these two lines and only have the if/else branches.

--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org
For queries about this service, please contact Infrastructure at: us...@infra.apache.org
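As an addendum, the two style nits on `getMovableTasks` above (each `.filter` on its own line, single-expression lambdas without braces) can be combined into one runnable sketch. `Task` and `Assignment` are hypothetical records standing in for the real `AssignedTask`/`KafkaStreamsAssignment`, and the `moveablePredicate` check from the actual diff is omitted for brevity.

```java
import java.util.List;
import java.util.Map;
import java.util.function.BiFunction;
import java.util.stream.Collectors;

// Hypothetical simplified types; String ids stand in for TaskId.
record Task(String id, String type) {}
record Assignment(Map<String, Task> tasks) {}

public class MovableTasks {
    // The getMovableTasks BiFunction from the diff, restructured per the review nits:
    // one stream stage per line, and a bracket-less lambda inside the second filter.
    static final BiFunction<Assignment, Assignment, List<String>> GET_MOVABLE_TASKS =
        (source, destination) -> source.tasks().values().stream()
            .filter(task -> task.type().equals("STANDBY"))
            .filter(task -> !destination.tasks().containsKey(task.id()))
            .map(Task::id)
            .collect(Collectors.toList());

    public static void main(final String[] args) {
        final Assignment src = new Assignment(Map.of(
            "0_0", new Task("0_0", "STANDBY"),
            "0_1", new Task("0_1", "ACTIVE"),
            "0_2", new Task("0_2", "STANDBY")));
        // "0_2" already lives on the destination, so only "0_0" is movable.
        final Assignment dst = new Assignment(Map.of("0_2", new Task("0_2", "STANDBY")));
        System.out.println(GET_MOVABLE_TASKS.apply(src, dst)); // prints [0_0]
    }
}
```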