ableegoldman commented on a change in pull request #8588:
URL: https://github.com/apache/kafka/pull/8588#discussion_r418346499



##########
File path: streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/TaskAssignorConvergenceTest.java
##########
@@ -358,6 +369,57 @@ private static void runRandomizedScenario(final long seed) {
         }
     }
 
+    private static void verifyBalancedAssignment(final Harness harness, final int balanceFactor) {
+        final int activeDiff;
+        final int activeStatefulDiff;
+        final double activeStatelessPerThreadDiff;
+        final int assignedDiff;
+        final int standbyDiff;
+
+        {
+            final int maxActive = harness.clientStates.values().stream().map(ClientState::activeTaskCount).max(comparingInt(i -> i)).orElse(0);
+            final int minActive = harness.clientStates.values().stream().map(ClientState::activeTaskCount).min(comparingInt(i -> i)).orElse(0);
+            activeDiff = maxActive - minActive;
+        }
+        {
+            final int maxActiveStateful = harness.clientStates.values().stream().map(s -> diff(TreeSet::new, s.activeTasks(), harness.statelessTasks).size()).max(comparingInt(i -> i)).orElse(0);
+            final int minActiveStateful = harness.clientStates.values().stream().map(s -> diff(TreeSet::new, s.activeTasks(), harness.statelessTasks).size()).min(comparingInt(i -> i)).orElse(0);
+            activeStatefulDiff = maxActiveStateful - minActiveStateful;
+        }
+        {
+            final double maxActiveStatelessPerThread = harness.clientStates.values().stream().map(s -> 1.0 * intersection(TreeSet::new, s.activeTasks(), harness.statelessTasks).size() / s.capacity()).max(comparingDouble(i -> i)).orElse(0.0);
+            final double minActiveStatelessPerThread = harness.clientStates.values().stream().map(s -> 1.0 * intersection(TreeSet::new, s.activeTasks(), harness.statelessTasks).size() / s.capacity()).min(comparingDouble(i -> i)).orElse(0.0);
+            activeStatelessPerThreadDiff = maxActiveStatelessPerThread - minActiveStatelessPerThread;
+        }
+        {
+            final int maxAssigned = harness.clientStates.values().stream().map(ClientState::assignedTaskCount).max(comparingInt(i -> i)).orElse(0);
+            final int minAssigned = harness.clientStates.values().stream().map(ClientState::assignedTaskCount).min(comparingInt(i -> i)).orElse(0);
+            assignedDiff = maxAssigned - minAssigned;
+        }
+        {
+            final int maxStandby = harness.clientStates.values().stream().map(ClientState::standbyTaskCount).max(comparingInt(i -> i)).orElse(0);
+            final int minStandby = harness.clientStates.values().stream().map(ClientState::standbyTaskCount).min(comparingInt(i -> i)).orElse(0);
+            standbyDiff = maxStandby - minStandby;
+        }
+
+        final Map<String, ? extends Number> results = new TreeMap<>(mkMap(
+            mkEntry("activeDiff", activeDiff),
+            mkEntry("activeStatefulDiff", activeStatefulDiff),
+            mkEntry("activeStatelessPerThreadDiff", activeStatelessPerThreadDiff),

Review comment:
       Ok, this gives me an idea of where you're coming from w.r.t. client-level balance. I was thinking that we should scale the entire task load by thread capacity, but that only makes sense for some resources, mainly (or only?) CPU, which I suppose is unlikely to be the bottleneck or resource constraint in a stateful application. Of course, it would still be the constraint for stateless tasks. So I do see that we might want to balance stateless tasks at the thread level, and anything stateful at the client level, where IO is more likely to be the constraint.
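   
   To make the distinction concrete, here is a minimal sketch of the two balance metrics under discussion. It is not part of the PR; the `Client` class and both helper methods are hypothetical stand-ins for `ClientState` and the test's spread computations:
   
       import java.util.Arrays;
       import java.util.List;
   
       public class BalanceSketch {
   
           // Hypothetical stand-in for ClientState: task counts plus thread capacity.
           static final class Client {
               final int statefulTasks;
               final int statelessTasks;
               final int capacity; // number of stream threads on this client
   
               Client(final int statefulTasks, final int statelessTasks, final int capacity) {
                   this.statefulTasks = statefulTasks;
                   this.statelessTasks = statelessTasks;
                   this.capacity = capacity;
               }
           }
   
           // Stateful tasks are presumed IO-bound, so balance the raw count
           // across clients, ignoring thread capacity.
           static int statefulSpread(final List<Client> clients) {
               final int max = clients.stream().mapToInt(c -> c.statefulTasks).max().orElse(0);
               final int min = clients.stream().mapToInt(c -> c.statefulTasks).min().orElse(0);
               return max - min;
           }
   
           // Stateless tasks are presumed CPU-bound, so scale the count by
           // thread capacity before comparing clients.
           static double statelessPerThreadSpread(final List<Client> clients) {
               final double max = clients.stream().mapToDouble(c -> (double) c.statelessTasks / c.capacity).max().orElse(0.0);
               final double min = clients.stream().mapToDouble(c -> (double) c.statelessTasks / c.capacity).min().orElse(0.0);
               return max - min;
           }
   
           public static void main(final String[] args) {
               // A 4-thread client and a 1-thread client: equal stateful counts
               // are balanced at the client level, while the 4-to-1 stateless
               // split is balanced at the thread level.
               final List<Client> clients = Arrays.asList(new Client(3, 4, 4), new Client(3, 1, 1));
               System.out.println("stateful spread: " + statefulSpread(clients));                       // 0
               System.out.println("stateless-per-thread spread: " + statelessPerThreadSpread(clients)); // 0.0
           }
       }
   
   In this example both spreads come out to zero, whereas comparing the stateless count per client instead of per thread would report the same assignment as imbalanced.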
   




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

