ableegoldman commented on a change in pull request #8588: URL: https://github.com/apache/kafka/pull/8588#discussion_r418267732
########## File path: streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/TaskAssignorConvergenceTest.java ########## @@ -358,6 +369,57 @@ private static void runRandomizedScenario(final long seed) { } } + private static void verifyBalancedAssignment(final Harness harness, final int balanceFactor) { + final int activeDiff; + final int activeStatefulDiff; + final double activeStatelessPerThreadDiff; + final int assignedDiff; + final int standbyDiff; + + { + final int maxActive = harness.clientStates.values().stream().map(ClientState::activeTaskCount).max(comparingInt(i -> i)).orElse(0); + final int minActive = harness.clientStates.values().stream().map(ClientState::activeTaskCount).min(comparingInt(i -> i)).orElse(0); + activeDiff = maxActive - minActive; + } + { + final int maxActiveStateful = harness.clientStates.values().stream().map(s -> diff(TreeSet::new, s.activeTasks(), harness.statelessTasks).size()).max(comparingInt(i -> i)).orElse(0); + final int minActiveStateful = harness.clientStates.values().stream().map(s -> diff(TreeSet::new, s.activeTasks(), harness.statelessTasks).size()).min(comparingInt(i -> i)).orElse(0); + activeStatefulDiff = maxActiveStateful - minActiveStateful; + } + { + final double maxActiveStatefulPerThread = harness.clientStates.values().stream().map(s1 -> 1.0 * intersection(TreeSet::new, s1.activeTasks(), harness.statelessTasks).size() / s1.capacity()).max(comparingDouble(i -> i)).orElse(0.0); Review comment: The HATA originally tried to balance each type of task individually (ie stateful active, standby, stateless active) and IIRC you made a convincing argument against doing that during the review and for balancing only the total task load. What's the rationale for enforcing this now? Or did I misremember and/or misinterpret your earlier point ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org