cadonna commented on a change in pull request #8541:
URL: https://github.com/apache/kafka/pull/8541#discussion_r415797071



##########
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java
##########
@@ -713,23 +713,18 @@ private boolean assignTasksToClients(final Set<String> 
allSourceTopics,
             allTasks, clientStates, numStandbyReplicas());
 
         final TaskAssignor taskAssignor;
-        if (highAvailabilityEnabled) {
-            if (lagComputationSuccessful) {
-                taskAssignor = new HighAvailabilityTaskAssignor(
-                    clientStates,
-                    allTasks,
-                    statefulTasks,
-                    assignmentConfigs);
-            } else {
-                log.info("Failed to fetch end offsets for changelogs, will 
return previous assignment to clients and "
-                             + "trigger another rebalance to retry.");
-                setAssignmentErrorCode(AssignorError.REBALANCE_NEEDED.code());
-                taskAssignor = new StickyTaskAssignor(clientStates, allTasks, 
statefulTasks, assignmentConfigs, true);
-            }
+        if (!lagComputationSuccessful) {
+            log.info("Failed to fetch end offsets for changelogs, will return 
previous assignment to clients and "
+                         + "trigger another rebalance to retry.");
+            setAssignmentErrorCode(AssignorError.REBALANCE_NEEDED.code());
+            taskAssignor = new PriorTaskAssignor();
         } else {
-            taskAssignor = new StickyTaskAssignor(clientStates, allTasks, 
statefulTasks, assignmentConfigs, false);
+            taskAssignor = this.taskAssignor.get();
         }

Review comment:
       prop:
   Could we package this logic into a factory method to make the code more 
readable?
   
   ```
   final TaskAssignor taskAssignor = createTaskAssignor(lagComputationSuccessful);
   ```

##########
File path: clients/src/main/java/org/apache/kafka/common/utils/Utils.java
##########
@@ -1146,4 +1146,13 @@ private static byte checkRange(final byte i) {
             }
         };
     }
+
+    @SafeVarargs
+    public static <E> Set<E> union(final Supplier<Set<E>> constructor, final 
Set<E>... set) {

Review comment:
       req: Please add unit tests for this method

##########
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/HighAvailabilityTaskAssignor.java
##########
@@ -16,49 +16,50 @@
  */
 package org.apache.kafka.streams.processor.internals.assignment;
 
-import static 
org.apache.kafka.streams.processor.internals.assignment.AssignmentUtils.taskIsCaughtUpOnClientOrNoCaughtUpClientsExist;
-import static 
org.apache.kafka.streams.processor.internals.assignment.RankedClient.buildClientRankingsByTask;
-import static 
org.apache.kafka.streams.processor.internals.assignment.RankedClient.tasksToCaughtUpClients;
-import static 
org.apache.kafka.streams.processor.internals.assignment.TaskMovement.assignTaskMovements;
+import org.apache.kafka.streams.processor.TaskId;
+import 
org.apache.kafka.streams.processor.internals.assignment.AssignorConfiguration.AssignmentConfigs;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
+import java.util.Map;
+import java.util.Set;
 import java.util.SortedMap;
 import java.util.SortedSet;
 import java.util.TreeSet;
 import java.util.UUID;
 import java.util.stream.Collectors;
-import org.apache.kafka.streams.processor.TaskId;
-import 
org.apache.kafka.streams.processor.internals.assignment.AssignorConfiguration.AssignmentConfigs;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
-import java.util.Map;
-import java.util.Set;
+import static 
org.apache.kafka.streams.processor.internals.assignment.AssignmentUtils.taskIsCaughtUpOnClientOrNoCaughtUpClientsExist;
+import static 
org.apache.kafka.streams.processor.internals.assignment.RankedClient.buildClientRankingsByTask;
+import static 
org.apache.kafka.streams.processor.internals.assignment.RankedClient.tasksToCaughtUpClients;
+import static 
org.apache.kafka.streams.processor.internals.assignment.TaskMovement.assignTaskMovements;

Review comment:
       You are an exemplary boy scout!

##########
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/ClientState.java
##########
@@ -86,6 +90,22 @@ private ClientState(final Set<TaskId> activeTasks,
         this.capacity = capacity;
     }
 
+    public ClientState(final Set<TaskId> previousActiveTasks,

Review comment:
       req: Please add a unit test.

##########
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java
##########
@@ -713,23 +713,18 @@ private boolean assignTasksToClients(final Set<String> 
allSourceTopics,
             allTasks, clientStates, numStandbyReplicas());
 
         final TaskAssignor taskAssignor;
-        if (highAvailabilityEnabled) {
-            if (lagComputationSuccessful) {
-                taskAssignor = new HighAvailabilityTaskAssignor(
-                    clientStates,
-                    allTasks,
-                    statefulTasks,
-                    assignmentConfigs);
-            } else {
-                log.info("Failed to fetch end offsets for changelogs, will 
return previous assignment to clients and "
-                             + "trigger another rebalance to retry.");
-                setAssignmentErrorCode(AssignorError.REBALANCE_NEEDED.code());
-                taskAssignor = new StickyTaskAssignor(clientStates, allTasks, 
statefulTasks, assignmentConfigs, true);
-            }
+        if (!lagComputationSuccessful) {

Review comment:
       I see your point. What I do not like so much is that it is not very
intuitive to require a successful lag computation for the sticky assignor. I
understand that if the lag computation is not successful, other parts of Streams
will fail, but it is not the responsibility of this class to prevent that. What I
am trying to say is that verifications should be done where they are required, so
that the code is easy to comprehend. I am imagining myself coming back to this code
and trying to understand why the lag computation must be successful for the sticky
assignor.

##########
File path: 
streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/StickyTaskAssignorTest.java
##########
@@ -350,20 +350,20 @@ public void 
shouldAssignMoreTasksToClientWithMoreCapacity() {
         createClient(UUID_2, 2);
         createClient(UUID_1, 1);
 
-        final StickyTaskAssignor taskAssignor = createTaskAssignor(TASK_0_0,
-                                                                            
TASK_0_1,
-                                                                            
TASK_0_2,
-                                                                            
new TaskId(1, 0),
-                                                                            
new TaskId(1, 1),
-                                                                            
new TaskId(1, 2),
-                                                                            
new TaskId(2, 0),
-                                                                            
new TaskId(2, 1),
-                                                                            
new TaskId(2, 2),
-                                                                            
new TaskId(3, 0),
-                                                                            
new TaskId(3, 1),
-                                                                            
new TaskId(3, 2));
-
-        taskAssignor.assign();
+        final boolean followupRebalanceNeeded = assign(TASK_0_0,
+                                                       TASK_0_1,
+                                                       TASK_0_2,
+                                                       new TaskId(1, 0),
+                                                       new TaskId(1, 1),
+                                                       new TaskId(1, 2),
+                                                       new TaskId(2, 0),
+                                                       new TaskId(2, 1),
+                                                       new TaskId(2, 2),
+                                                       new TaskId(3, 0),
+                                                       new TaskId(3, 1),
+                                                       new TaskId(3, 2));

Review comment:
       prop:
   ```suggestion
           final boolean followupRebalanceNeeded = assign(
               TASK_0_0,
               TASK_0_1,
               TASK_0_2,
               new TaskId(1, 0),
               new TaskId(1, 1),
               new TaskId(1, 2),
               new TaskId(2, 0),
               new TaskId(2, 1),
               new TaskId(2, 2),
               new TaskId(3, 0),
               new TaskId(3, 1),
               new TaskId(3, 2)
           );
   ```

##########
File path: 
streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/TaskAssignorConvergenceTest.java
##########
@@ -416,11 +416,10 @@ private static void testForConvergence(final Harness 
harness,
             iteration++;
             harness.prepareForNextRebalance();
             harness.recordBefore(iteration);
-            rebalancePending = new HighAvailabilityTaskAssignor(
-                harness.clientStates, allTasks,
-                harness.statefulTaskEndOffsetSums.keySet(),
-                configs
-            ).assign();
+            rebalancePending = new 
HighAvailabilityTaskAssignor().assign(harness.clientStates,
+                                                                         
allTasks,
+                                                                         
harness.statefulTaskEndOffsetSums.keySet(),
+                                                                         
configs);

Review comment:
       prop:
   ```suggestion
               rebalancePending = new HighAvailabilityTaskAssignor().assign(
                   harness.clientStates,
                   allTasks,
                   harness.statefulTaskEndOffsetSums.keySet(),
                   configs
               );
   ```




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to