vvcephei commented on a change in pull request #8696:
URL: https://github.com/apache/kafka/pull/8696#discussion_r427615915



##########
File path: streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/TaskAssignorConvergenceTest.java
##########
@@ -338,7 +338,7 @@ private static void runRandomizedScenario(final long seed) {
                         throw new IllegalStateException("Unexpected event: " + event);
                 }
                 if (!harness.clientStates.isEmpty()) {
-                    testForConvergence(harness, configs, numStatefulTasks * 2);
+                    testForConvergence(harness, configs, 2 * (numStatefulTasks + numStatefulTasks * numStandbyReplicas));

Review comment:
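       Now that we're also warming up standbys, we need to relax the convergence limit: it is now twice the total number of stateful replicas (actives plus standbys) instead of twice the number of stateful tasks.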
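A small sketch of the arithmetic behind the new bound. The helper names (oldLimit, newLimit) are hypothetical and not part of the PR. The limit passed to testForConvergence goes from twice the number of stateful tasks to twice the number of stateful replicas, where each stateful task contributes one active plus numStandbyReplicas standbys.

```java
final class ConvergenceLimit {
    // Old bound: twice the number of stateful (active) tasks.
    static int oldLimit(final int numStatefulTasks) {
        return numStatefulTasks * 2;
    }

    // New bound: twice the number of stateful replicas, i.e. one active plus
    // numStandbyReplicas standbys per stateful task, since standbys are now warmed up too.
    static int newLimit(final int numStatefulTasks, final int numStandbyReplicas) {
        return 2 * (numStatefulTasks + numStatefulTasks * numStandbyReplicas);
    }

    public static void main(final String[] args) {
        // e.g. 9 stateful tasks with 1 standby replica each
        System.out.println(oldLimit(9));    // 18
        System.out.println(newLimit(9, 1)); // 36
    }
}
```

With numStandbyReplicas = 0 the new formula reduces to the old one, so scenarios without standbys keep the same limit.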

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/HighAvailabilityTaskAssignor.java
##########
@@ -57,14 +59,42 @@ public boolean assign(final Map<UUID, ClientState> clients,
             configs.numStandbyReplicas
         );
 
-        final boolean probingRebalanceNeeded = assignTaskMovements(
-            tasksToCaughtUpClients(statefulTasks, clientStates, configs.acceptableRecoveryLag),
+        final AtomicInteger remainingWarmupReplicas = new AtomicInteger(configs.maxWarmupReplicas);

Review comment:
       I moved the counter out here because we need to decrement it while assigning both active and standby warmups, so both passes draw from the same maxWarmupReplicas budget.
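To illustrate why a single counter is shared, here is a minimal sketch in which two passes draw from one AtomicInteger budget, so warmups handed out by the active pass are no longer available to the standby pass. The method assignWarmups is a hypothetical stand-in, not a method from the PR, and tasks are plain Strings rather than TaskIds.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

final class SharedWarmupBudget {
    // Hypothetical stand-in for one movement pass (active or standby): it places a
    // warmup for each task that wants one, but only while the shared budget lasts.
    static List<String> assignWarmups(final List<String> tasksNeedingWarmup,
                                      final AtomicInteger remainingWarmupReplicas) {
        final List<String> assigned = new ArrayList<>();
        for (final String task : tasksNeedingWarmup) {
            if (remainingWarmupReplicas.get() <= 0) {
                break; // budget used up, whichever pass consumed it
            }
            remainingWarmupReplicas.decrementAndGet();
            assigned.add(task);
        }
        return assigned;
    }

    public static void main(final String[] args) {
        // One counter, created once before both passes (a budget of 3 here)
        final AtomicInteger remaining = new AtomicInteger(3);
        final List<String> activeWarmups = assignWarmups(Arrays.asList("0_0", "0_1"), remaining);
        final List<String> standbyWarmups = assignWarmups(Arrays.asList("1_0", "1_1"), remaining);
        System.out.println(activeWarmups);   // [0_0, 0_1]
        System.out.println(standbyWarmups);  // [1_0]
        System.out.println(remaining.get()); // 0
    }
}
```

In this sketch the AtomicInteger only serves as a mutable int that can be handed to both passes; nothing here is concurrent.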

##########
File path: streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/HighAvailabilityTaskAssignorTest.java
##########
@@ -78,6 +80,64 @@
         /*probingRebalanceIntervalMs*/ 60 * 1000L
     );
 
+    @Test
+    public void shouldBeStickyForActiveAndStandbyTasksEvenIfNoWarmups() {

Review comment:
       First test for stickiness: we should be 100% sticky and also not schedule a probing rebalance when we are configured for no warmups.

##########
File path: streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/HighAvailabilityTaskAssignorTest.java
##########
@@ -78,6 +80,64 @@
         /*probingRebalanceIntervalMs*/ 60 * 1000L
     );
 
+    @Test
+    public void shouldBeStickyForActiveAndStandbyTasksEvenIfNoWarmups() {
+        final Set<TaskId> allTaskIds = mkSet(TASK_0_0, TASK_0_1, TASK_0_2, TASK_1_0, TASK_1_1, TASK_1_2, TASK_2_0, TASK_2_1, TASK_2_2);
+        final ClientState clientState1 = new ClientState(allTaskIds, emptySet(), allTaskIds.stream().collect(Collectors.toMap(k -> k, k -> 0L)), 1);
+        final ClientState clientState2 = new ClientState(emptySet(), allTaskIds, allTaskIds.stream().collect(Collectors.toMap(k -> k, k -> 10L)), 1);
+        final ClientState clientState3 = new ClientState(emptySet(), emptySet(), allTaskIds.stream().collect(Collectors.toMap(k -> k, k -> Long.MAX_VALUE)), 1);
+
+        final Map<UUID, ClientState> clientStates = mkMap(
+            mkEntry(UUID_1, clientState1),
+            mkEntry(UUID_2, clientState2),
+            mkEntry(UUID_3, clientState3)
+        );
+
+        final boolean unstable = new HighAvailabilityTaskAssignor().assign(
+            clientStates,
+            allTaskIds,
+            allTaskIds,
+            new AssignmentConfigs(11L, 0, 1, 0L)
+        );
+
+        assertThat(clientState1, hasAssignedTasks(allTaskIds.size()));
+
+        assertThat(clientState2, hasAssignedTasks(allTaskIds.size()));
+
+        assertThat(clientState3, hasAssignedTasks(0));
+
+        assertThat(unstable, is(false));
+    }
+
+    @Test
+    public void shouldBeStickyForActiveAndStandbyTasksWhileWarmingUp() {

Review comment:
       Main test case for stickiness: we should be sticky for both active and standby tasks, and also schedule warmups.

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/TaskMovement.java
##########
@@ -64,12 +66,10 @@ private static boolean taskIsCaughtUpOnClientOrNoCaughtUpClientsExist(final Task
         return caughtUpClients == null || caughtUpClients.contains(client);
     }
 
-    /**
-     * @return whether any warmup replicas were assigned
-     */
-    static boolean assignTaskMovements(final Map<TaskId, SortedSet<UUID>> tasksToCaughtUpClients,
-                                       final Map<UUID, ClientState> clientStates,
-                                       final int maxWarmupReplicas) {
+    static int assignActiveTaskMovements(final Map<TaskId, SortedSet<UUID>> tasksToCaughtUpClients,

Review comment:
       I changed this to return an int (the number of movements needed) just because it made stepping through the assignment in the debugger a bit easier to understand. It serves no algorithmic purpose.

##########
File path: streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/AssignmentTestUtils.java
##########
@@ -326,6 +326,10 @@ static TaskSkewReport analyzeTaskAssignmentBalance(final Map<UUID, ClientState>
         return new TaskSkewReport(maxTaskSkew, skewedSubtopologies, subtopologyToClientsWithPartition);
     }
 
+    static Matcher<ClientState> hasAssignedTasks(final int taskCount) {
+        return hasProperty("assignedTasks", ClientState::assignedTaskCount, taskCount);
+    }

Review comment:
       Similar to the other matchers, it just gives us mildly nicer test output.

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/HighAvailabilityTaskAssignor.java
##########
@@ -57,14 +59,42 @@ public boolean assign(final Map<UUID, ClientState> clients,
             configs.numStandbyReplicas
         );
 
-        final boolean probingRebalanceNeeded = assignTaskMovements(
-            tasksToCaughtUpClients(statefulTasks, clientStates, configs.acceptableRecoveryLag),
+        final AtomicInteger remainingWarmupReplicas = new AtomicInteger(configs.maxWarmupReplicas);
+
+        final Map<TaskId, SortedSet<UUID>> tasksToCaughtUpClients = tasksToCaughtUpClients(
+            statefulTasks,
+            clientStates,
+            configs.acceptableRecoveryLag
+        );
+
+        // We temporarily need to know which standby tasks were intended as warmups
+        // for active tasks, so that we don't move them (again) when we plan standby
+        // task movements. We can then immediately treat warmups exactly the same as
+        // hot-standby replicas, so we just track it right here as metadata, rather
+        // than add "warmup" assignments to ClientState, for example.
+        final Map<UUID, Set<TaskId>> warmups = new TreeMap<>();
+
+        final int neededActiveTaskMovements = assignActiveTaskMovements(
+            tasksToCaughtUpClients,
             clientStates,
-            configs.maxWarmupReplicas
+            warmups,
+            remainingWarmupReplicas
+        );
+
+        final int neededStandbyTaskMovements = assignStandbyTaskMovements(
+            tasksToCaughtUpClients,
+            clientStates,
+            remainingWarmupReplicas,
+            warmups

Review comment:
       We enforce "stickiness" by assigning movements after computing the ideal assignment, so if we want standbys, as well as actives, to be sticky, we need to assign movements for them too.
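A simplified sketch of that post-pass idea for one replica per task, assuming plain String task and client IDs instead of TaskId/UUID/ClientState. The class and method names are hypothetical, and the choice of which caught-up client keeps the task (the first one in sorted order) is deliberately naive; it is not the PR's TaskMovement code. Per this comment, the PR runs the same kind of pass a second time for standbys.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.SortedSet;
import java.util.TreeMap;
import java.util.TreeSet;
import java.util.concurrent.atomic.AtomicInteger;

final class StickyMovementSketch {
    // Like the check in TaskMovement: a task may stay where the ideal assignment put it if
    // that client is caught up, or if no client is caught up at all (empty set guarded too).
    static boolean caughtUpOrNoCaughtUpClients(final String task,
                                               final String client,
                                               final Map<String, SortedSet<String>> caughtUp) {
        final SortedSet<String> clients = caughtUp.get(task);
        return clients == null || clients.isEmpty() || clients.contains(client);
    }

    // Post-pass over an already-computed ideal assignment (task -> client). If the ideal
    // destination is not caught up, the task stays on a caught-up client and, while budget
    // remains, the destination gets a warmup instead. Returns the movements needed.
    static int assignMovements(final Map<String, String> assignment,
                               final Map<String, SortedSet<String>> caughtUp,
                               final Map<String, Set<String>> warmups,
                               final AtomicInteger remainingWarmups) {
        int movementsNeeded = 0;
        for (final Map.Entry<String, String> entry : assignment.entrySet()) {
            final String task = entry.getKey();
            final String destination = entry.getValue();
            if (caughtUpOrNoCaughtUpClients(task, destination, caughtUp)) {
                continue; // ideal placement is fine, nothing to move
            }
            movementsNeeded++;
            entry.setValue(caughtUp.get(task).first()); // sticky: keep it on a caught-up client
            if (remainingWarmups.get() > 0) {
                remainingWarmups.decrementAndGet();
                warmups.computeIfAbsent(destination, k -> new TreeSet<>()).add(task);
            }
        }
        return movementsNeeded;
    }

    public static void main(final String[] args) {
        final Map<String, String> assignment = new TreeMap<>();
        assignment.put("0_0", "B"); // ideal spot B is lagging on 0_0
        assignment.put("0_1", "A"); // A is caught up on 0_1
        final Map<String, SortedSet<String>> caughtUp = new HashMap<>();
        caughtUp.put("0_0", new TreeSet<>(Arrays.asList("A")));
        caughtUp.put("0_1", new TreeSet<>(Arrays.asList("A")));
        final Map<String, Set<String>> warmups = new TreeMap<>();

        final int needed = assignMovements(assignment, caughtUp, warmups, new AtomicInteger(1));
        System.out.println(needed);     // 1
        System.out.println(assignment); // {0_0=A, 0_1=A}
        System.out.println(warmups);    // {B=[0_0]}
    }
}
```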

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/HighAvailabilityTaskAssignor.java
##########
@@ -57,14 +59,42 @@ public boolean assign(final Map<UUID, ClientState> clients,
             configs.numStandbyReplicas
         );
 
-        final boolean probingRebalanceNeeded = assignTaskMovements(
-            tasksToCaughtUpClients(statefulTasks, clientStates, configs.acceptableRecoveryLag),
+        final AtomicInteger remainingWarmupReplicas = new AtomicInteger(configs.maxWarmupReplicas);
+
+        final Map<TaskId, SortedSet<UUID>> tasksToCaughtUpClients = tasksToCaughtUpClients(
+            statefulTasks,
+            clientStates,
+            configs.acceptableRecoveryLag
+        );
+
+        // We temporarily need to know which standby tasks were intended as warmups
+        // for active tasks, so that we don't move them (again) when we plan standby
+        // task movements. We can then immediately treat warmups exactly the same as
+        // hot-standby replicas, so we just track it right here as metadata, rather
+        // than add "warmup" assignments to ClientState, for example.
+        final Map<UUID, Set<TaskId>> warmups = new TreeMap<>();
+
+        final int neededActiveTaskMovements = assignActiveTaskMovements(
+            tasksToCaughtUpClients,
             clientStates,
-            configs.maxWarmupReplicas
+            warmups,
+            remainingWarmupReplicas
+        );
+
+        final int neededStandbyTaskMovements = assignStandbyTaskMovements(
+            tasksToCaughtUpClients,
+            clientStates,
+            remainingWarmupReplicas,
+            warmups
         );
 
         assignStatelessActiveTasks(clientStates, diff(TreeSet::new, allTaskIds, statefulTasks));

+        // We shouldn't plan a probing rebalance if we _needed_ task movements, but couldn't do any
+        // due to being configured for no warmups.

Review comment:
       One might wonder whether we should even allow "max_warmups := 0". I think this is actually OK: someone might want to disable this state-shuffling mechanism completely and instead just be 100% sticky. Also factoring into my thinking is that it's pretty obvious what will happen if you configure "no warmups", so I don't think it will hurt someone who didn't actually intend to disable the warmup mechanism completely.
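One way to express the rule from the code comment above (no probing rebalance when movements were needed but no warmups are allowed), as a sketch with a hypothetical helper; the exact expression in the PR may differ.

```java
final class ProbingRebalanceDecision {
    // Assumed shape of the decision at the end of assign(): schedule a probing
    // rebalance only if some movement was needed AND warmups are enabled at all.
    static boolean probingRebalanceNeeded(final int maxWarmupReplicas,
                                          final int neededActiveTaskMovements,
                                          final int neededStandbyTaskMovements) {
        final boolean movementsNeeded = neededActiveTaskMovements + neededStandbyTaskMovements > 0;
        // With maxWarmupReplicas == 0 no warmup was placed, so nothing is catching up in the
        // background and a probing rebalance would presumably just reproduce this assignment.
        return movementsNeeded && maxWarmupReplicas > 0;
    }

    public static void main(final String[] args) {
        System.out.println(probingRebalanceNeeded(0, 9, 9)); // false: 100% sticky, no probing
        System.out.println(probingRebalanceNeeded(2, 1, 0)); // true
        System.out.println(probingRebalanceNeeded(2, 0, 0)); // false: nothing left to move
    }
}
```

This lines up with shouldBeStickyForActiveAndStandbyTasksEvenIfNoWarmups, which is configured for no warmups and asserts that the assignor's return value is false.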

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/TaskMovement.java
##########
@@ -132,22 +132,97 @@ static boolean assignTaskMovements(final Map<TaskId, 
SortedSet<UUID>> tasksToCau
         return movementsNeeded;
     }
 
+    static int assignStandbyTaskMovements(final Map<TaskId, SortedSet<UUID>> tasksToCaughtUpClients,

Review comment:
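       This algorithm is similar to the active one, but there are also important differences (for one, the standby pass must not move the warmups that the active pass already planned), so I kept them as separate methods rather than merging them.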
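One difference visible in this diff is the warmups map: per the comment in assign(), warmups planned by the active pass must not be moved again when standby movements are planned. A hypothetical helper sketching just that filtering step (String IDs instead of TaskId/UUID) might look like this; it is not the PR's assignStandbyTaskMovements.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.TreeSet;

final class StandbyPassSketch {
    // Returns the standby tasks on `client` that a standby pass may reconsider.
    // Anything recorded in `warmups` for this client was placed by the active pass
    // as a warmup, so it is skipped rather than moved (again).
    static List<String> standbysEligibleForMovement(final String client,
                                                    final Set<String> standbyTasks,
                                                    final Map<String, Set<String>> warmups) {
        final Set<String> activeWarmups = warmups.getOrDefault(client, Collections.emptySet());
        final List<String> eligible = new ArrayList<>();
        for (final String task : standbyTasks) {
            if (!activeWarmups.contains(task)) {
                eligible.add(task);
            }
        }
        return eligible;
    }

    public static void main(final String[] args) {
        final Map<String, Set<String>> warmups = new TreeMap<>();
        warmups.put("B", new TreeSet<>(Arrays.asList("0_0"))); // warmup planned for an active
        final Set<String> standbysOnB = new TreeSet<>(Arrays.asList("0_0", "1_0"));
        System.out.println(standbysEligibleForMovement("B", standbysOnB, warmups)); // [1_0]
    }
}
```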

##########
File path: streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/TaskMovementTest.java
##########
@@ -65,11 +67,13 @@ public void shouldAssignTasksToClientsAndReturnFalseWhenAllClientsCaughtUp() {
         final ClientState client3 = getClientStateWithActiveAssignment(asList(TASK_0_2, TASK_1_2));
 
         assertThat(
-            assignTaskMovements(
+            assignActiveTaskMovements(
                 tasksToCaughtUpClients,
                 getClientStatesMap(client1, client2, client3),
-                maxWarmupReplicas),
-            is(false)
+                new TreeMap<>(),
+                new AtomicInteger(maxWarmupReplicas)
+            ),
+            is(0)

Review comment:
       Just accommodating the new method signature, no semantic changes.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

