This is an automated email from the ASF dual-hosted git repository.

lucasbru pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 620a01b74bd KAFKA-19661 [4/N]: Prefer range-style assignment (#20486)
620a01b74bd is described below

commit 620a01b74bd8bbaf42eff002dd0a0692123ec2d5
Author: Lucas Brutschy <[email protected]>
AuthorDate: Tue Sep 9 10:44:37 2025 +0200

    KAFKA-19661 [4/N]: Prefer range-style assignment (#20486)
    
    This is actually fixing a difference between the old and the new
    assignor. Given the assignment ordering, the legacy assignor has a
    preference for range-style assignments built in, that is, assigning
    
    C1: 0_0, 1_0  C2: 0_1, 1_1
    
    instead of
    
    C1: 0_0, 0_1  C2: 1_0, 1_1
    
    We add tests to both assignors to check for this behavior, and improve
    the new assingor by enforcing corresponding orderings.
    
    Reviewers: Bill Bejeck <[email protected]>
---
 .../group/streams/assignor/StickyTaskAssignor.java |  30 +++--
 .../streams/assignor/StickyTaskAssignorTest.java   | 142 +++++++++++++++++++++
 .../assignment/LegacyStickyTaskAssignorTest.java   | 113 ++++++++++++++++
 .../tools/streams/DescribeStreamsGroupTest.java    |  12 +-
 4 files changed, 283 insertions(+), 14 deletions(-)

diff --git 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/assignor/StickyTaskAssignor.java
 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/assignor/StickyTaskAssignor.java
index 7ef5a382584..fc29f93b883 100644
--- 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/assignor/StickyTaskAssignor.java
+++ 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/assignor/StickyTaskAssignor.java
@@ -25,6 +25,7 @@ import java.util.Comparator;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Iterator;
+import java.util.LinkedList;
 import java.util.Map;
 import java.util.Optional;
 import java.util.PriorityQueue;
@@ -53,8 +54,7 @@ public class StickyTaskAssignor implements TaskAssignor {
     }
 
     private GroupAssignment doAssign(final GroupSpec groupSpec, final 
TopologyDescriber topologyDescriber) {
-        //active
-        final Set<TaskId> activeTasks = taskIds(topologyDescriber, true);
+        final LinkedList<TaskId> activeTasks = taskIds(topologyDescriber, 
true);
         assignActive(activeTasks);
 
         //standby
@@ -62,15 +62,15 @@ public class StickyTaskAssignor implements TaskAssignor {
             groupSpec.assignmentConfigs().isEmpty() ? 0
                 : 
Integer.parseInt(groupSpec.assignmentConfigs().get("num.standby.replicas"));
         if (numStandbyReplicas > 0) {
-            final Set<TaskId> statefulTasks = taskIds(topologyDescriber, 
false);
+            final LinkedList<TaskId> statefulTasks = 
taskIds(topologyDescriber, false);
             assignStandby(statefulTasks, numStandbyReplicas);
         }
 
         return buildGroupAssignment(groupSpec.members().keySet());
     }
 
-    private Set<TaskId> taskIds(final TopologyDescriber topologyDescriber, 
final boolean isActive) {
-        final Set<TaskId> ret = new HashSet<>();
+    private LinkedList<TaskId> taskIds(final TopologyDescriber 
topologyDescriber, final boolean isActive) {
+        final LinkedList<TaskId> ret = new LinkedList<>();
         for (final String subtopology : topologyDescriber.subtopologies()) {
             if (isActive || topologyDescriber.isStateful(subtopology)) {
                 final int numberOfPartitions = 
topologyDescriber.maxNumInputPartitions(subtopology);
@@ -166,7 +166,10 @@ public class StickyTaskAssignor implements TaskAssignor {
         return ret;
     }
 
-    private void assignActive(final Set<TaskId> activeTasks) {
+    private void assignActive(final LinkedList<TaskId> activeTasks) {
+
+        // Assuming our current assignment pairs same partitions 
(range-based), we want to sort by partition first
+        
activeTasks.sort(Comparator.comparing(TaskId::partition).thenComparing(TaskId::subtopologyId));
 
         // 1. re-assigning existing active tasks to clients that previously 
had the same active tasks
         for (final Iterator<TaskId> it = activeTasks.iterator(); 
it.hasNext();) {
@@ -193,6 +196,9 @@ public class StickyTaskAssignor implements TaskAssignor {
             }
         }
 
+        // To achieve an initially range-based assignment, sort by subtopology
+        
activeTasks.sort(Comparator.comparing(TaskId::subtopologyId).thenComparing(TaskId::partition));
+
         // 3. assign any remaining unassigned tasks
         final PriorityQueue<ProcessState> processByLoad = new 
PriorityQueue<>(Comparator.comparingDouble(ProcessState::load));
         processByLoad.addAll(localState.processIdToState.values());
@@ -296,9 +302,13 @@ public class StickyTaskAssignor implements TaskAssignor {
         return 
localState.processIdToState.get(member.processId).memberToTaskCounts().get(member.memberId)
 < localState.tasksPerMember;
     }
 
-    private void assignStandby(final Set<TaskId> standbyTasks, final int 
numStandbyReplicas) {
+    private void assignStandby(final LinkedList<TaskId> standbyTasks, int 
numStandbyReplicas) {
         final ArrayList<StandbyToAssign> toLeastLoaded = new 
ArrayList<>(standbyTasks.size() * numStandbyReplicas);
-        for (final TaskId task : standbyTasks) {
+
+        // Assuming our current assignment is range-based, we want to sort by 
partition first.
+        
standbyTasks.sort(Comparator.comparing(TaskId::partition).thenComparing(TaskId::subtopologyId).reversed());
+
+        for (TaskId task : standbyTasks) {
             for (int i = 0; i < numStandbyReplicas; i++) {
 
                 // prev active task
@@ -329,6 +339,10 @@ public class StickyTaskAssignor implements TaskAssignor {
             }
         }
 
+        // To achieve a range-based assignment, sort by subtopology
+        toLeastLoaded.sort(Comparator.<StandbyToAssign, String>comparing(x -> 
x.taskId.subtopologyId())
+            .thenComparing(x -> x.taskId.partition()).reversed());
+
         final PriorityQueue<ProcessState> processByLoad = new 
PriorityQueue<>(Comparator.comparingDouble(ProcessState::load));
         processByLoad.addAll(localState.processIdToState.values());
         for (final StandbyToAssign toAssign : toLeastLoaded) {
diff --git 
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/assignor/StickyTaskAssignorTest.java
 
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/assignor/StickyTaskAssignorTest.java
index b4fa9c4db99..1e9d4115cb2 100644
--- 
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/assignor/StickyTaskAssignorTest.java
+++ 
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/assignor/StickyTaskAssignorTest.java
@@ -1091,6 +1091,148 @@ public class StickyTaskAssignorTest {
         assertEquals(numTasks, allStandbyTasks.size());
     }
 
+    @Test
+    public void 
shouldReassignTasksWhenNewNodeJoinsWithExistingActiveAndStandbyAssignments() {
+        // Initial setup: Node 1 has active tasks 0,1 and standby tasks 2,3
+        // Node 2 has active tasks 2,3 and standby tasks 0,1
+        final AssignmentMemberSpec memberSpec1 = 
createAssignmentMemberSpec("process1",
+            mkMap(mkEntry("test-subtopology", Sets.newSet(0, 1))),
+            mkMap(mkEntry("test-subtopology", Sets.newSet(2, 3))));
+
+        final AssignmentMemberSpec memberSpec2 = 
createAssignmentMemberSpec("process2",
+            mkMap(mkEntry("test-subtopology", Sets.newSet(2, 3))),
+            mkMap(mkEntry("test-subtopology", Sets.newSet(0, 1))));
+
+        // Node 3 joins as new client
+        final AssignmentMemberSpec memberSpec3 = 
createAssignmentMemberSpec("process3");
+
+        final Map<String, AssignmentMemberSpec> members = mkMap(
+            mkEntry("member1", memberSpec1), mkEntry("member2", memberSpec2), 
mkEntry("member3", memberSpec3));
+
+        final GroupAssignment result = assignor.assign(
+            new GroupSpecImpl(members, 
mkMap(mkEntry(NUM_STANDBY_REPLICAS_CONFIG, "1"))),
+            new TopologyDescriberImpl(4, true, List.of("test-subtopology"))
+        );
+
+        // Verify all active tasks are assigned
+        final Set<Integer> allAssignedActiveTasks = new HashSet<>();
+        allAssignedActiveTasks.addAll(getAllActiveTaskIds(result, "member1"));
+        allAssignedActiveTasks.addAll(getAllActiveTaskIds(result, "member2"));
+        allAssignedActiveTasks.addAll(getAllActiveTaskIds(result, "member3"));
+        assertEquals(Sets.newSet(0, 1, 2, 3), allAssignedActiveTasks);
+
+        // Verify all standby tasks are assigned
+        final Set<Integer> allAssignedStandbyTasks = new HashSet<>();
+        allAssignedStandbyTasks.addAll(getAllStandbyTaskIds(result, 
"member1"));
+        allAssignedStandbyTasks.addAll(getAllStandbyTaskIds(result, 
"member2"));
+        allAssignedStandbyTasks.addAll(getAllStandbyTaskIds(result, 
"member3"));
+        assertEquals(Sets.newSet(0, 1, 2, 3), allAssignedStandbyTasks);
+
+        // Verify each member has 1-2 active tasks and at most 3 tasks total
+        assertTrue(getAllActiveTaskIds(result, "member1").size() >= 1 && 
getAllActiveTaskIds(result, "member1").size() <= 2);
+        assertTrue(getAllActiveTaskIds(result, "member1").size() + 
getAllStandbyTaskIds(result, "member1").size() <= 3);
+
+        assertTrue(getAllActiveTaskIds(result, "member2").size() >= 1 && 
getAllActiveTaskIds(result, "member2").size() <= 2);
+        assertTrue(getAllActiveTaskIds(result, "member2").size() + 
getAllStandbyTaskIds(result, "member2").size() <= 3);
+
+        assertTrue(getAllActiveTaskIds(result, "member3").size() >= 1 && 
getAllActiveTaskIds(result, "member3").size() <= 2);
+        assertTrue(getAllActiveTaskIds(result, "member3").size() + 
getAllStandbyTaskIds(result, "member3").size() <= 3);
+    }
+
+    @Test
+    public void shouldRangeAssignTasksWhenScalingUp() {
+        // Two clients, the second one is new
+        final AssignmentMemberSpec memberSpec1 = 
createAssignmentMemberSpec("process1",
+            Map.of("test-subtopology1", Set.of(0, 1), "test-subtopology2", 
Set.of(0, 1)),
+            Map.of());
+        final AssignmentMemberSpec memberSpec2 = 
createAssignmentMemberSpec("process2");
+        final Map<String, AssignmentMemberSpec> members = mkMap(
+            mkEntry("member1", memberSpec1), mkEntry("member2", memberSpec2));
+
+        // Two subtopologies with 2 tasks each (4 tasks total) with standby 
replicas enabled
+        final GroupAssignment result = assignor.assign(
+            new GroupSpecImpl(members, 
mkMap(mkEntry(NUM_STANDBY_REPLICAS_CONFIG, String.valueOf(1)))),
+            new TopologyDescriberImpl(2, true, 
Arrays.asList("test-subtopology1", "test-subtopology2"))
+        );
+
+        // Each client should get one task from each subtopology
+        final MemberAssignment testMember1 = result.members().get("member1");
+        assertNotNull(testMember1);
+        assertEquals(1, 
testMember1.activeTasks().get("test-subtopology1").size());
+        assertEquals(1, 
testMember1.activeTasks().get("test-subtopology2").size());
+        
+        final MemberAssignment testMember2 = result.members().get("member2");
+        assertNotNull(testMember2);
+        assertEquals(1, 
testMember2.activeTasks().get("test-subtopology1").size());
+        assertEquals(1, 
testMember2.activeTasks().get("test-subtopology2").size());
+        
+        // Verify all tasks are assigned exactly once
+        final Set<Integer> allSubtopology1Tasks = new HashSet<>();
+        
allSubtopology1Tasks.addAll(testMember1.activeTasks().get("test-subtopology1"));
+        
allSubtopology1Tasks.addAll(testMember2.activeTasks().get("test-subtopology1"));
+        assertEquals(Sets.newSet(0, 1), allSubtopology1Tasks);
+        
+        final Set<Integer> allSubtopology2Tasks = new HashSet<>();
+        
allSubtopology2Tasks.addAll(testMember1.activeTasks().get("test-subtopology2"));
+        
allSubtopology2Tasks.addAll(testMember2.activeTasks().get("test-subtopology2"));
+        assertEquals(Sets.newSet(0, 1), allSubtopology2Tasks);
+
+        // Each client should get one task from each subtopology
+        assertNotNull(testMember1);
+        assertEquals(1, 
testMember1.standbyTasks().get("test-subtopology1").size());
+        assertEquals(1, 
testMember1.standbyTasks().get("test-subtopology2").size());
+
+        assertNotNull(testMember2);
+        assertEquals(1, 
testMember2.standbyTasks().get("test-subtopology1").size());
+        assertEquals(1, 
testMember2.standbyTasks().get("test-subtopology2").size());
+    }
+
+    @Test
+    public void shouldRangeAssignTasksWhenStartingEmpty() {
+        // Two clients starting empty (no previous tasks)
+        final AssignmentMemberSpec memberSpec1 = 
createAssignmentMemberSpec("process1");
+        final AssignmentMemberSpec memberSpec2 = 
createAssignmentMemberSpec("process2");
+        final Map<String, AssignmentMemberSpec> members = mkMap(
+            mkEntry("member1", memberSpec1), mkEntry("member2", memberSpec2));
+
+        // Two subtopologies with 2 tasks each (4 tasks total) with standby 
replicas enabled
+        final GroupAssignment result = assignor.assign(
+            new GroupSpecImpl(members, 
mkMap(mkEntry(NUM_STANDBY_REPLICAS_CONFIG, String.valueOf(1)))),
+            new TopologyDescriberImpl(2, true, 
Arrays.asList("test-subtopology1", "test-subtopology2"))
+        );
+
+        // Each client should get one task from each subtopology
+        final MemberAssignment testMember1 = result.members().get("member1");
+        assertNotNull(testMember1);
+        assertEquals(1, 
testMember1.activeTasks().get("test-subtopology1").size());
+        assertEquals(1, 
testMember1.activeTasks().get("test-subtopology2").size());
+
+        final MemberAssignment testMember2 = result.members().get("member2");
+        assertNotNull(testMember2);
+        assertEquals(1, 
testMember2.activeTasks().get("test-subtopology1").size());
+        assertEquals(1, 
testMember2.activeTasks().get("test-subtopology2").size());
+
+        // Verify all tasks are assigned exactly once
+        final Set<Integer> allSubtopology1Tasks = new HashSet<>();
+        
allSubtopology1Tasks.addAll(testMember1.activeTasks().get("test-subtopology1"));
+        
allSubtopology1Tasks.addAll(testMember2.activeTasks().get("test-subtopology1"));
+        assertEquals(Sets.newSet(0, 1), allSubtopology1Tasks);
+
+        final Set<Integer> allSubtopology2Tasks = new HashSet<>();
+        
allSubtopology2Tasks.addAll(testMember1.activeTasks().get("test-subtopology2"));
+        
allSubtopology2Tasks.addAll(testMember2.activeTasks().get("test-subtopology2"));
+        assertEquals(Sets.newSet(0, 1), allSubtopology2Tasks);
+
+        // Each client should get one task from each subtopology
+        assertNotNull(testMember1);
+        assertEquals(1, 
testMember1.standbyTasks().get("test-subtopology1").size());
+        assertEquals(1, 
testMember1.standbyTasks().get("test-subtopology2").size());
+
+        assertNotNull(testMember2);
+        assertEquals(1, 
testMember2.standbyTasks().get("test-subtopology1").size());
+        assertEquals(1, 
testMember2.standbyTasks().get("test-subtopology2").size());
+    }
+
 
     private int getAllActiveTaskCount(GroupAssignment result, String... 
memberIds) {
         int size = 0;
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/LegacyStickyTaskAssignorTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/LegacyStickyTaskAssignorTest.java
index 3103e72cd52..64830546f0f 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/LegacyStickyTaskAssignorTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/LegacyStickyTaskAssignorTest.java
@@ -1263,6 +1263,119 @@ public class LegacyStickyTaskAssignorTest {
         }
     }
 
+    @ParameterizedTest
+    @ValueSource(strings = {
+        StreamsConfig.RACK_AWARE_ASSIGNMENT_STRATEGY_NONE,
+        StreamsConfig.RACK_AWARE_ASSIGNMENT_STRATEGY_MIN_TRAFFIC,
+        StreamsConfig.RACK_AWARE_ASSIGNMENT_STRATEGY_BALANCE_SUBTOPOLOGY
+    })
+    public void 
shouldReassignTasksWhenNewNodeJoinsWithExistingActiveAndStandbyAssignments(final
 String rackAwareStrategy) {
+        setUp(rackAwareStrategy);
+
+        // Initial setup: Node 1 has active tasks 0,1 and standby tasks 2,3
+        // Node 2 has active tasks 2,3 and standby tasks 0,1
+        final ClientState node1 = createClientWithPreviousActiveTasks(PID_1, 
1, TASK_0_0, TASK_0_1);
+        node1.addPreviousStandbyTasks(Set.of(TASK_0_2, TASK_0_3));
+
+        final ClientState node2 = createClientWithPreviousActiveTasks(PID_2, 
1, TASK_0_2, TASK_0_3);
+        node2.addPreviousStandbyTasks(Set.of(TASK_0_0, TASK_0_1));
+
+        // Node 3 joins as new client
+        final ClientState node3 = createClient(PID_3, 1);
+
+        final boolean probingRebalanceNeeded = assign(1, rackAwareStrategy, 
TASK_0_0, TASK_0_1, TASK_0_2, TASK_0_3);
+        assertThat(probingRebalanceNeeded, is(false));
+
+        // Verify all active tasks are assigned
+        final Set<TaskId> allAssignedActiveTasks = new HashSet<>();
+        allAssignedActiveTasks.addAll(node1.activeTasks());
+        allAssignedActiveTasks.addAll(node2.activeTasks());
+        allAssignedActiveTasks.addAll(node3.activeTasks());
+        assertThat(allAssignedActiveTasks, equalTo(Set.of(TASK_0_0, TASK_0_1, 
TASK_0_2, TASK_0_3)));
+
+        // Verify all standby tasks are assigned
+        final Set<TaskId> allAssignedStandbyTasks = new HashSet<>();
+        allAssignedStandbyTasks.addAll(node1.standbyTasks());
+        allAssignedStandbyTasks.addAll(node2.standbyTasks());
+        allAssignedStandbyTasks.addAll(node3.standbyTasks());
+        assertThat(allAssignedStandbyTasks, equalTo(Set.of(TASK_0_0, TASK_0_1, 
TASK_0_2, TASK_0_3)));
+
+        // Verify each client has 1-2 active tasks and at most 3 tasks total
+        assertThat(node1.activeTasks().size(), greaterThanOrEqualTo(1));
+        assertThat(node1.activeTasks().size(), lessThanOrEqualTo(2));
+        assertThat(node1.activeTasks().size() + node1.standbyTasks().size(), 
lessThanOrEqualTo(3));
+
+        assertThat(node2.activeTasks().size(), greaterThanOrEqualTo(1));
+        assertThat(node2.activeTasks().size(), lessThanOrEqualTo(2));
+        assertThat(node2.activeTasks().size() + node2.standbyTasks().size(), 
lessThanOrEqualTo(3));
+
+        assertThat(node3.activeTasks().size(), greaterThanOrEqualTo(1));
+        assertThat(node3.activeTasks().size(), lessThanOrEqualTo(2));
+        assertThat(node3.activeTasks().size() + node3.standbyTasks().size(), 
lessThanOrEqualTo(3));
+    }
+
+    @ParameterizedTest
+    @ValueSource(strings = {
+        StreamsConfig.RACK_AWARE_ASSIGNMENT_STRATEGY_NONE,
+        StreamsConfig.RACK_AWARE_ASSIGNMENT_STRATEGY_MIN_TRAFFIC,
+        StreamsConfig.RACK_AWARE_ASSIGNMENT_STRATEGY_BALANCE_SUBTOPOLOGY
+    })
+    public void shouldRangeAssignTasksWhenStartingEmpty(final String 
rackAwareStrategy) {
+        setUp(rackAwareStrategy);
+        
+        // Two clients with capacity 1 each, starting empty (no previous tasks)
+        createClient(PID_1, 1);
+        createClient(PID_2, 1);
+        
+        // Two subtopologies with 2 tasks each (4 tasks total)
+        final boolean probingRebalanceNeeded = assign(1, rackAwareStrategy, 
TASK_0_0, TASK_0_1, TASK_1_0, TASK_1_1);
+        assertThat(probingRebalanceNeeded, is(false));
+        
+        // Each client should get one active task from each subtopology
+        final ClientState client1 = clients.get(PID_1);
+        final ClientState client2 = clients.get(PID_2);
+        
+        // Check that each client has one active task from subtopology 0
+        final long client1Subtopology0ActiveCount = 
client1.activeTasks().stream()
+            .filter(task -> task.subtopology() == 0)
+            .count();
+        final long client2Subtopology0ActiveCount = 
client2.activeTasks().stream()
+            .filter(task -> task.subtopology() == 0)
+            .count();
+        assertThat(client1Subtopology0ActiveCount, equalTo(1L));
+        assertThat(client2Subtopology0ActiveCount, equalTo(1L));
+        
+        // Check that each client has one active task from subtopology 1
+        final long client1Subtopology1ActiveCount = 
client1.activeTasks().stream()
+            .filter(task -> task.subtopology() == 1)
+            .count();
+        final long client2Subtopology1ActiveCount = 
client2.activeTasks().stream()
+            .filter(task -> task.subtopology() == 1)
+            .count();
+        assertThat(client1Subtopology1ActiveCount, equalTo(1L));
+        assertThat(client2Subtopology1ActiveCount, equalTo(1L));
+        
+        // Check that each client has one standby task from subtopology 0
+        final long client1Subtopology0StandbyCount = 
client1.standbyTasks().stream()
+            .filter(task -> task.subtopology() == 0)
+            .count();
+        final long client2Subtopology0StandbyCount = 
client2.standbyTasks().stream()
+            .filter(task -> task.subtopology() == 0)
+            .count();
+        assertThat(client1Subtopology0StandbyCount, equalTo(1L));
+        assertThat(client2Subtopology0StandbyCount, equalTo(1L));
+        
+        // Check that each client has one standby task from subtopology 1
+        final long client1Subtopology1StandbyCount = 
client1.standbyTasks().stream()
+            .filter(task -> task.subtopology() == 1)
+            .count();
+        final long client2Subtopology1StandbyCount = 
client2.standbyTasks().stream()
+            .filter(task -> task.subtopology() == 1)
+            .count();
+        assertThat(client1Subtopology1StandbyCount, equalTo(1L));
+        assertThat(client2Subtopology1StandbyCount, equalTo(1L));
+    }
+
     private boolean assign(final String rackAwareStrategy, final TaskId... 
tasks) {
         return assign(0, rackAwareStrategy, tasks);
     }
diff --git 
a/tools/src/test/java/org/apache/kafka/tools/streams/DescribeStreamsGroupTest.java
 
b/tools/src/test/java/org/apache/kafka/tools/streams/DescribeStreamsGroupTest.java
index 9c4d4016748..6d7ea57b8ac 100644
--- 
a/tools/src/test/java/org/apache/kafka/tools/streams/DescribeStreamsGroupTest.java
+++ 
b/tools/src/test/java/org/apache/kafka/tools/streams/DescribeStreamsGroupTest.java
@@ -180,8 +180,8 @@ public class DescribeStreamsGroupTest {
     public void testDescribeStreamsGroupWithMembersOption() throws Exception {
         final List<String> expectedHeader = List.of("GROUP", "MEMBER", 
"PROCESS", "CLIENT-ID", "ASSIGNMENTS");
         final Set<List<String>> expectedRows = Set.of(
-            List.of(APP_ID, "", "", "", "ACTIVE:", "0:[0,1];"),
-            List.of(APP_ID, "", "", "", "ACTIVE:", "1:[0,1];"));
+            List.of(APP_ID, "", "", "", "ACTIVE:", "0:[1];", "1:[1];"),
+            List.of(APP_ID, "", "", "", "ACTIVE:", "0:[0];", "1:[0];"));
         // The member and process names as well as client-id are not 
deterministic, so we don't care about them.
         final List<Integer> dontCares = List.of(1, 2, 3);
 
@@ -193,8 +193,8 @@ public class DescribeStreamsGroupTest {
     public void testDescribeStreamsGroupWithMembersAndVerboseOptions() throws 
Exception {
         final List<String> expectedHeader = List.of("GROUP", 
"TARGET-ASSIGNMENT-EPOCH", "TOPOLOGY-EPOCH", "MEMBER", "MEMBER-PROTOCOL", 
"MEMBER-EPOCH", "PROCESS", "CLIENT-ID", "ASSIGNMENTS");
         final Set<List<String>> expectedRows = Set.of(
-            List.of(APP_ID, "3", "0", "", "streams", "3", "", "", "ACTIVE:", 
"0:[0,1];", "TARGET-ACTIVE:", "0:[0,1];"),
-            List.of(APP_ID, "3", "0", "", "streams", "3", "", "", "ACTIVE:", 
"1:[0,1];", "TARGET-ACTIVE:", "1:[0,1];"));
+            List.of(APP_ID, "3", "0", "", "streams", "3", "", "", "ACTIVE:", 
"0:[1];", "1:[1];", "TARGET-ACTIVE:", "0:[1];", "1:[1];"),
+            List.of(APP_ID, "3", "0", "", "streams", "3", "", "", "ACTIVE:", 
"0:[0];", "1:[0];", "TARGET-ACTIVE:", "0:[0];", "1:[0];"));
         // The member and process names as well as client-id are not 
deterministic, so we don't care about them.
         final List<Integer> dontCares = List.of(3, 6, 7);
 
@@ -212,8 +212,8 @@ public class DescribeStreamsGroupTest {
 
         final List<String> expectedHeader = List.of("GROUP", 
"TARGET-ASSIGNMENT-EPOCH", "TOPOLOGY-EPOCH", "MEMBER", "MEMBER-PROTOCOL", 
"MEMBER-EPOCH", "PROCESS", "CLIENT-ID", "ASSIGNMENTS");
         final Set<List<String>> expectedRows1 = Set.of(
-            List.of(APP_ID, "3", "0", "", "streams", "3", "", "", "ACTIVE:", 
"0:[0,1];", "TARGET-ACTIVE:", "0:[0,1];"),
-            List.of(APP_ID, "3", "0", "", "streams", "3", "", "", "ACTIVE:", 
"1:[0,1];", "TARGET-ACTIVE:", "1:[0,1];"));
+            List.of(APP_ID, "3", "0", "", "streams", "3", "", "", "ACTIVE:", 
"0:[1];", "1:[1];", "TARGET-ACTIVE:", "0:[1];", "1:[1];"),
+            List.of(APP_ID, "3", "0", "", "streams", "3", "", "", "ACTIVE:", 
"0:[0];", "1:[0];", "TARGET-ACTIVE:", "0:[0];", "1:[0];"));
         final Set<List<String>> expectedRows2 = Set.of(
             List.of(APP_ID_2, "3", "0", "", "streams", "3", "", "", "ACTIVE:", 
"1:[0];", "TARGET-ACTIVE:", "1:[0];"),
             List.of(APP_ID_2, "3", "0", "", "streams", "3", "", "", "ACTIVE:", 
"0:[0];", "TARGET-ACTIVE:", "0:[0];"));

Reply via email to