vvcephei commented on a change in pull request #8588:
URL: https://github.com/apache/kafka/pull/8588#discussion_r418390934



##########
File path: streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/TaskAssignorConvergenceTest.java
##########
@@ -358,6 +369,57 @@ private static void runRandomizedScenario(final long seed) {
         }
     }
 
+    private static void verifyBalancedAssignment(final Harness harness, final int balanceFactor) {
+        final int activeDiff;
+        final int activeStatefulDiff;
+        final double activeStatelessPerThreadDiff;
+        final int assignedDiff;
+        final int standbyDiff;
+
+        {
+            final int maxActive = harness.clientStates.values().stream().map(ClientState::activeTaskCount).max(comparingInt(i -> i)).orElse(0);
+            final int minActive = harness.clientStates.values().stream().map(ClientState::activeTaskCount).min(comparingInt(i -> i)).orElse(0);
+            activeDiff = maxActive - minActive;
+        }
+        {
+            final int maxActiveStateful = harness.clientStates.values().stream().map(s -> diff(TreeSet::new, s.activeTasks(), harness.statelessTasks).size()).max(comparingInt(i -> i)).orElse(0);
+            final int minActiveStateful = harness.clientStates.values().stream().map(s -> diff(TreeSet::new, s.activeTasks(), harness.statelessTasks).size()).min(comparingInt(i -> i)).orElse(0);
+            activeStatefulDiff = maxActiveStateful - minActiveStateful;
+        }
+        {
+            final double maxActiveStatefulPerThread = harness.clientStates.values().stream().map(s1 -> 1.0 * intersection(TreeSet::new, s1.activeTasks(), harness.statelessTasks).size() / s1.capacity()).max(comparingDouble(i -> i)).orElse(0.0);
+            final double minActiveStatefulPerThread = harness.clientStates.values().stream().map(s -> 1.0 * intersection(TreeSet::new, s.activeTasks(), harness.statelessTasks).size() / s.capacity()).min(comparingDouble(i -> i)).orElse(0.0);
+            activeStatelessPerThreadDiff = maxActiveStatefulPerThread - minActiveStatefulPerThread;
+        }
+        {
+            final int maxAssigned = harness.clientStates.values().stream().map(ClientState::assignedTaskCount).max(comparingInt(i -> i)).orElse(0);
+            final int minAssigned = harness.clientStates.values().stream().map(ClientState::assignedTaskCount).min(comparingInt(i -> i)).orElse(0);
+            assignedDiff = maxAssigned - minAssigned;
+        }
+        {
+            final int maxStandby = harness.clientStates.values().stream().map(ClientState::standbyTaskCount).max(comparingInt(i -> i)).orElse(0);
+            final int minStandby = harness.clientStates.values().stream().map(ClientState::standbyTaskCount).min(comparingInt(i -> i)).orElse(0);
+            standbyDiff = maxStandby - minStandby;
+        }
+
+        final Map<String, ? extends Number> results = new TreeMap<>(mkMap(
+            mkEntry("activeDiff", activeDiff),
+            mkEntry("activeStatefulDiff", activeStatefulDiff),
+            mkEntry("activeStatelessPerThreadDiff", activeStatelessPerThreadDiff),

Review comment:
       Nailed it! This is what I was working up toward. It seems dangerous to consider only the number of threads when assigning stateful tasks. We don't actually know how much disk is available, but it seems more reasonable to assume that each computer has about the same amount of disk than to assume that each thread does.
   
   So, now (and this may be a departure from past conversations), I'm thinking 
perhaps we should do the following:
   1. round-robin assign active stateful tasks over the _clients_ that have 
spare capacity (aka threads). Once no client has spare capacity, then we just 
round-robin over all clients
   2. do the same with each standby replica. If the colocation constraint is 
violated, we just skip the client and keep going. If my intuition serves, this 
should still result in a diff of no more than one between clients.
   3. do the same with the stateless tasks. The one trick is that for the stateless tasks, we start the round-robin on the client right after the one where we left off with the active stateful tasks (rough sketch below). Assuming the round-robin algorithm above, this should ultimately produce an assignment that is just as balanced as picking the least-loaded client for each task.
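   
   To make that a bit more concrete, here's a rough, self-contained sketch of steps 1 and 3 (the names below are invented just for illustration; this is not the real ClientState/assignor API, and it skips the standby/colocation handling from step 2):
   
```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Toy stand-ins for the real client/task types, just to keep the sketch self-contained.
public class RoundRobinSketch {

    static class Client {
        final String id;
        final int capacity;                           // number of stream threads
        final List<String> activeTasks = new ArrayList<>();

        Client(final String id, final int capacity) {
            this.id = id;
            this.capacity = capacity;
        }

        boolean hasSpareCapacity() {
            return activeTasks.size() < capacity;
        }
    }

    // Round-robin the (already sorted) tasks over the clients: prefer clients that
    // still have spare capacity; once nobody has spare capacity, just round-robin
    // over everyone. Returns the cursor position, so the next pass (e.g. the
    // stateless tasks) can pick up where this one left off.
    static int assignRoundRobin(final List<String> sortedTasks,
                                final List<Client> clients,
                                final int startIndex) {
        int cursor = startIndex;
        for (final String task : sortedTasks) {
            final boolean anySpare = clients.stream().anyMatch(Client::hasSpareCapacity);
            for (int probes = 0; probes < clients.size(); probes++) {
                final Client candidate = clients.get(cursor % clients.size());
                cursor++;
                if (!anySpare || candidate.hasSpareCapacity()) {
                    candidate.activeTasks.add(task);
                    break;
                }
            }
        }
        return cursor % clients.size();
    }

    public static void main(final String[] args) {
        final List<Client> clients =
            List.of(new Client("c1", 2), new Client("c2", 1), new Client("c3", 2));

        // Tasks pre-sorted by subtopology, then partition.
        final List<String> statefulTasks = List.of("0_0", "0_1", "1_0", "1_1", "2_0");
        final List<String> statelessTasks = List.of("3_0", "3_1");

        // Step 1: active stateful tasks.
        final int next = assignRoundRobin(statefulTasks, clients, 0);
        // Step 3: stateless tasks start on the client right after where step 1 left off.
        assignRoundRobin(statelessTasks, clients, next);

        final Map<String, List<String>> assignment = new TreeMap<>();
        clients.forEach(c -> assignment.put(c.id, c.activeTasks));
        System.out.println(assignment);
    }
}
```
   
   With the toy inputs above, each pass keeps the per-client counts within one of each other, and because the stateless pass continues from the stateful cursor, the combined counts stay balanced, too.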
   
   And we would always sort the tasks by subtopology first, then by partition, 
so that we still get workload parallelism. Actually, I should add this to the 
validation.
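   
   For the ordering piece, something along these lines (again, a made-up Task type just for illustration, standing in for the task id's subtopology/partition coordinates):
   
```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

public class TaskOrderSketch {

    // Toy stand-in for a task id, which carries a subtopology id and a partition.
    record Task(int subtopology, int partition) { }

    // Order tasks by subtopology first, then by partition, so that consecutive
    // round-robin picks spread each subtopology's partitions across clients
    // and we keep workload parallelism.
    static final Comparator<Task> SUBTOPOLOGY_THEN_PARTITION =
        Comparator.comparingInt(Task::subtopology).thenComparingInt(Task::partition);

    public static void main(final String[] args) {
        final List<Task> tasks = new ArrayList<>(List.of(
            new Task(1, 1), new Task(0, 2), new Task(1, 0), new Task(0, 0)));
        tasks.sort(SUBTOPOLOGY_THEN_PARTITION);
        // Prints the tasks grouped by subtopology, partitions in order within each group.
        System.out.println(tasks);
    }
}
```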
   
   I'll update the PR tomorrow with these ideas, so you can see them in code 
form, since words kind of suck for this kind of thing.



