ableegoldman commented on code in PR #16129:
URL: https://github.com/apache/kafka/pull/16129#discussion_r1619732219


##########
streams/src/main/java/org/apache/kafka/streams/processor/assignment/assignors/StickyTaskAssignor.java:
##########
@@ -278,7 +278,7 @@ public void processOptimizedAssignments(final Map<ProcessId, KafkaStreamsAssignm
 
             for (final Map.Entry<ProcessId, KafkaStreamsAssignment> entry : optimizedAssignments.entrySet()) {
                 final ProcessId processId = entry.getKey();
-                final Set<AssignedTask> assignedTasks = optimizedAssignments.get(processId).assignment();
+                final Set<AssignedTask> assignedTasks = new HashSet<>(optimizedAssignments.get(processId).tasks().values());

Review Comment:
   This can be a followup PR, but one of the nice things about making KafkaStreamsAssignment mutable is that we should be able to get rid of the `newAssignments` field altogether and stop converting back and forth between the KafkaStreamsAssignment and raw Sets in order to add/remove tasks.
   
   In addition to the obvious code simplification that should result from this change, it should save us a lot of copying into various intermediate data structures and reduce a lot of overhead.
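   
   A minimal sketch of the kind of in-place mutation that would enable, assuming hypothetical `assignTask`/`removeTask` mutators on KafkaStreamsAssignment (the mutator names are placeholders, not the current API):
   
   ```
   // rather than copying tasks() into a raw Set, mutating the Set, and rebuilding
   // the KafkaStreamsAssignment, the assignor could adjust the assignment directly
   final KafkaStreamsAssignment target = assignments.get(processId);
   final AssignedTask movedTask = new AssignedTask(taskId, AssignedTask.Type.STANDBY);
   if (!target.tasks().containsKey(taskId)) {
       target.assignTask(movedTask); // hypothetical mutator
   }
   ```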



##########
streams/src/main/java/org/apache/kafka/streams/processor/assignment/KafkaStreamsAssignment.java:
##########
@@ -62,14 +64,24 @@ public static KafkaStreamsAssignment of(final ProcessId processId, final Set<Ass
      * @return a new KafkaStreamsAssignment object with the same processId and assignment but with the given rebalanceDeadline
      */
     public KafkaStreamsAssignment withFollowupRebalance(final Instant rebalanceDeadline) {
-        return new KafkaStreamsAssignment(this.processId(), this.assignment(), Optional.of(rebalanceDeadline));
+        return new KafkaStreamsAssignment(this.processId(), this.tasks(), Optional.of(rebalanceDeadline));
     }
 
     private KafkaStreamsAssignment(final ProcessId processId,
                                    final Set<AssignedTask> assignment,
                                    final Optional<Instant> followupRebalanceDeadline) {
+        this(
+            processId,
+            assignment.stream().collect(Collectors.toMap(AssignedTask::id, Function.identity())),
+            followupRebalanceDeadline
+        );
+    }
+
+    private KafkaStreamsAssignment(final ProcessId processId,

Review Comment:
   nit: seems kind of unnecessary to introduce yet another constructor for this, we can just inline things, i.e.
   
   ```
   this.assignment = assignment.stream().collect(Collectors.toMap(AssignedTask::id, Function.identity()));
   ```
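   
   For concreteness, a rough sketch of what the single remaining constructor could look like with the conversion inlined (assuming the field names match the existing ones):
   
   ```
   private KafkaStreamsAssignment(final ProcessId processId,
                                  final Set<AssignedTask> assignment,
                                  final Optional<Instant> followupRebalanceDeadline) {
       this.processId = processId;
       // inline the Set -> Map conversion rather than delegating to a second constructor
       this.assignment = assignment.stream()
           .collect(Collectors.toMap(AssignedTask::id, Function.identity()));
       this.followupRebalanceDeadline = followupRebalanceDeadline;
   }
   ```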



##########
streams/src/main/java/org/apache/kafka/streams/processor/assignment/KafkaStreamsAssignment.java:
##########
@@ -83,16 +95,10 @@ public ProcessId processId() {
 
     /**
      *
-     * @return a set of assigned tasks that are part of this {@code KafkaStreamsAssignment}
+     * @return a read-only set of assigned tasks that are part of this {@code KafkaStreamsAssignment}
      */
-    public Set<AssignedTask> assignment() {
-        // TODO change assignment to return a map so we aren't forced to copy this into a Set
-        return new HashSet<>(assignment.values());
-    }
-
-    // TODO: merge this with #assignment by having it return a Map<TaskId, AssignedTask>
-    public Set<TaskId> assignedTaskIds() {
-        return assignment.keySet();
+    public Map<TaskId, AssignedTask> tasks() {
+        return unmodifiableMap(assignment);

Review Comment:
   nit: should probably change the field name too (i.e. `assignment` --> `tasks`)
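   
   A minimal sketch of the rename, assuming the accessor keeps returning an unmodifiable view:
   
   ```
   // field renamed to line up with the public accessor
   private final Map<TaskId, AssignedTask> tasks;
   
   public Map<TaskId, AssignedTask> tasks() {
       return unmodifiableMap(tasks);
   }
   ```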



##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/ClientState.java:
##########
@@ -489,10 +489,10 @@ public SortedMap<String, Set<TaskId>> taskIdsByPreviousConsumer() {
     }
 
     public void setAssignedTasks(final KafkaStreamsAssignment assignment) {
-        final Set<TaskId> activeTasks = assignment.assignment().stream()
+        final Set<TaskId> activeTasks = assignment.tasks().values().stream()
             .filter(task -> task.type() == ACTIVE).map(KafkaStreamsAssignment.AssignedTask::id)
             .collect(Collectors.toSet());
-        final Set<TaskId> standbyTasks = assignment.assignment().stream()
+        final Set<TaskId> standbyTasks = assignment.tasks().values().stream()
            .filter(task -> task.type() == STANDBY).map(KafkaStreamsAssignment.AssignedTask::id)
             .collect(Collectors.toSet());
         assignedActiveTasks.taskIds(activeTasks);

Review Comment:
   nit: I keep getting confused by this and then realizing it's just a poorly-named method. Can you rename it to `#setTaskIds` or something like that?
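   
   Roughly like this, i.e. only the name changes (a sketch based on the diff above, assuming the standby ids are applied through an `assignedStandbyTasks` counterpart to the active one):
   
   ```
   public void setTaskIds(final KafkaStreamsAssignment assignment) {
       final Set<TaskId> activeTasks = assignment.tasks().values().stream()
           .filter(task -> task.type() == ACTIVE)
           .map(KafkaStreamsAssignment.AssignedTask::id)
           .collect(Collectors.toSet());
       final Set<TaskId> standbyTasks = assignment.tasks().values().stream()
           .filter(task -> task.type() == STANDBY)
           .map(KafkaStreamsAssignment.AssignedTask::id)
           .collect(Collectors.toSet());
       assignedActiveTasks.taskIds(activeTasks);
       assignedStandbyTasks.taskIds(standbyTasks);
   }
   ```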



##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java:
##########
@@ -785,12 +789,13 @@ private UserTaskAssignmentListener assignTasksToClients(final Cluster fullMetada
             );
             final org.apache.kafka.streams.processor.assignment.TaskAssignor assignor = userTaskAssignor.get();
             final TaskAssignment taskAssignment = assignor.assign(applicationState);
-            processStreamsPartitionAssignment(clientMetadataMap, taskAssignment);
             final AssignmentError assignmentError = validateTaskAssignment(applicationState, taskAssignment);
-            userTaskAssignmentListener = (GroupAssignment assignment, GroupSubscription subscription) -> {
+            processStreamsPartitionAssignment(clientMetadataMap, taskAssignment);
+            userTaskAssignmentListener = (assignment, subscription) -> {
                 assignor.onAssignmentComputed(assignment, subscription, assignmentError);
             };

Review Comment:
   Really doesn't matter, this is just an FYI, but you don't need the brackets for one-line lambdas, and we typically use the inline/bracket-less style when possible in Streams code.
   
   ```suggestion
               userTaskAssignmentListener = (assignment, subscription) -> assignor.onAssignmentComputed(assignment, subscription, assignmentError);
   ```



##########
streams/src/main/java/org/apache/kafka/streams/processor/assignment/TaskAssignmentUtils.java:
##########
@@ -244,18 +274,117 @@ public static Map<ProcessId, KafkaStreamsAssignment> optimizeRackAwareStandbyTas
         );
         LOG.info("Assignment before standby task optimization has cost {}", initialCost);
 
-        throw new UnsupportedOperationException("Not yet Implemented.");
+        final MoveStandbyTaskPredicate moveablePredicate = getStandbyTaskMovePredicate(applicationState);
+        final BiFunction<KafkaStreamsAssignment, KafkaStreamsAssignment, List<TaskId>> getMovableTasks = (source, destination) -> {
+            return source.tasks().values().stream().filter(task -> task.type() == AssignedTask.Type.STANDBY)

Review Comment:
   nit: put `.filter(task -> task.type() == AssignedTask.Type.STANDBY)` on a separate line.



##########
streams/src/main/java/org/apache/kafka/streams/processor/assignment/TaskAssignmentUtils.java:
##########
@@ -244,18 +274,117 @@ public static Map<ProcessId, KafkaStreamsAssignment> optimizeRackAwareStandbyTas
         );
         LOG.info("Assignment before standby task optimization has cost {}", initialCost);
 
-        throw new UnsupportedOperationException("Not yet Implemented.");
+        final MoveStandbyTaskPredicate moveablePredicate = getStandbyTaskMovePredicate(applicationState);
+        final BiFunction<KafkaStreamsAssignment, KafkaStreamsAssignment, List<TaskId>> getMovableTasks = (source, destination) -> {
+            return source.tasks().values().stream().filter(task -> task.type() == AssignedTask.Type.STANDBY)
+                .filter(task -> {
+                    return !destination.tasks().containsKey(task.id());
+                })

Review Comment:
   ```suggestion
                   .filter(task -> !destination.tasks().containsKey(task.id()))
   ```
   Just makes it a bit easier to read when we keep lambdas on one line by getting rid of the brackets, especially when we have a complex structure containing multiple steps with lambdas, nested lambdas, etc.
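   
   With the brackets dropped and each `.filter` on its own line (per the nit above), the whole chain might read something like this; the tail of the chain is a guess based on the declared `List<TaskId>` type:
   
   ```
   final BiFunction<KafkaStreamsAssignment, KafkaStreamsAssignment, List<TaskId>> getMovableTasks =
       (source, destination) -> source.tasks().values().stream()
           .filter(task -> task.type() == AssignedTask.Type.STANDBY)
           .filter(task -> !destination.tasks().containsKey(task.id()))
           .map(AssignedTask::id)
           .collect(Collectors.toList());
   ```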



##########
streams/src/main/java/org/apache/kafka/streams/processor/assignment/TaskAssignmentUtils.java:
##########
@@ -72,6 +83,27 @@ public static Map<ProcessId, KafkaStreamsAssignment> identityAssignment(final Ap
         return assignments;
     }
 
+    /**
+     * Assign standby tasks to KafkaStreams clients according to the default logic.
+     * <p>
+     * If rack-aware client tags are configured, the rack-aware standby task assignor will be used
+     *
+     * @param applicationState        the metadata and other info describing the current application state
+     * @param kafkaStreamsAssignments the current assignment of tasks to KafkaStreams clients
+     *
+     * @return a new map containing the mappings from KafkaStreamsAssignments updated with the default standby assignment
+     */
+    public static Map<ProcessId, KafkaStreamsAssignment> defaultStandbyTaskAssignment(final ApplicationState applicationState,
+                                                                                      final Map<ProcessId, KafkaStreamsAssignment> kafkaStreamsAssignments) {
+        if (!applicationState.assignmentConfigs().rackAwareAssignmentTags().isEmpty()) {
+            return tagBasedStandbyTaskAssignment(applicationState, kafkaStreamsAssignments);
+        } else if (canPerformRackAwareOptimization(applicationState, AssignedTask.Type.STANDBY)) {
+            return tagBasedStandbyTaskAssignment(applicationState, kafkaStreamsAssignments);

Review Comment:
   I'm guessing this represents the middle conditional in the `StandbyTaskAssignorFactory#create` method? I was struggling to understand the point of that, because how can you do tag-based rack-aware assignment with no rack-aware assignment tags (i.e. the first condition)?
   
   Then I realized it's actually only ever hit when called from testing code. In the real code, the `rackAwareTaskAssignor` parameter is always null, and therefore the middle condition (i.e. `if (rackAwareTaskAssignor != null && rackAwareTaskAssignor.validClientRack())`) is always false.
   
   I have to assume this just exists to make it easier to hook into and/or set up from unit tests, but it's bad practice to have conditions in production code that exist only for test cases. That, or it's a bug/regression. Anyways, the tl;dr is I'm pretty sure we should remove these two lines and keep only the if/else branches.
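   
   In other words, something along these lines; `nonTagBasedStandbyTaskAssignment` is just a placeholder for whatever the existing fallback branch calls:
   
   ```
   if (!applicationState.assignmentConfigs().rackAwareAssignmentTags().isEmpty()) {
       return tagBasedStandbyTaskAssignment(applicationState, kafkaStreamsAssignments);
   } else {
       // placeholder for the existing non-tag-based fallback
       return nonTagBasedStandbyTaskAssignment(applicationState, kafkaStreamsAssignments);
   }
   ```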



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
