This is an automated email from the ASF dual-hosted git repository.
lucasbru pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 620a01b74bd KAFKA-19661 [4/N]: Prefer range-style assignment (#20486)
620a01b74bd is described below
commit 620a01b74bd8bbaf42eff002dd0a0692123ec2d5
Author: Lucas Brutschy <[email protected]>
AuthorDate: Tue Sep 9 10:44:37 2025 +0200
KAFKA-19661 [4/N]: Prefer range-style assignment (#20486)
This is actually fixing a difference between the old and the new
assignor. Given the assignment ordering, the legacy assignor has a
preference for range-style assignments built in, that is, assigning
C1: 0_0, 1_0 C2: 0_1, 1_1
instead of
C1: 0_0, 0_1 C2: 1_0, 1_1
We add tests to both assignors to check for this behavior, and improve
the new assignor by enforcing corresponding orderings.
Reviewers: Bill Bejeck <[email protected]>
---
.../group/streams/assignor/StickyTaskAssignor.java | 30 +++--
.../streams/assignor/StickyTaskAssignorTest.java | 142 +++++++++++++++++++++
.../assignment/LegacyStickyTaskAssignorTest.java | 113 ++++++++++++++++
.../tools/streams/DescribeStreamsGroupTest.java | 12 +-
4 files changed, 283 insertions(+), 14 deletions(-)
diff --git
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/assignor/StickyTaskAssignor.java
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/assignor/StickyTaskAssignor.java
index 7ef5a382584..fc29f93b883 100644
---
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/assignor/StickyTaskAssignor.java
+++
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/assignor/StickyTaskAssignor.java
@@ -25,6 +25,7 @@ import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
+import java.util.LinkedList;
import java.util.Map;
import java.util.Optional;
import java.util.PriorityQueue;
@@ -53,8 +54,7 @@ public class StickyTaskAssignor implements TaskAssignor {
}
private GroupAssignment doAssign(final GroupSpec groupSpec, final
TopologyDescriber topologyDescriber) {
- //active
- final Set<TaskId> activeTasks = taskIds(topologyDescriber, true);
+ final LinkedList<TaskId> activeTasks = taskIds(topologyDescriber,
true);
assignActive(activeTasks);
//standby
@@ -62,15 +62,15 @@ public class StickyTaskAssignor implements TaskAssignor {
groupSpec.assignmentConfigs().isEmpty() ? 0
:
Integer.parseInt(groupSpec.assignmentConfigs().get("num.standby.replicas"));
if (numStandbyReplicas > 0) {
- final Set<TaskId> statefulTasks = taskIds(topologyDescriber,
false);
+ final LinkedList<TaskId> statefulTasks =
taskIds(topologyDescriber, false);
assignStandby(statefulTasks, numStandbyReplicas);
}
return buildGroupAssignment(groupSpec.members().keySet());
}
- private Set<TaskId> taskIds(final TopologyDescriber topologyDescriber,
final boolean isActive) {
- final Set<TaskId> ret = new HashSet<>();
+ private LinkedList<TaskId> taskIds(final TopologyDescriber
topologyDescriber, final boolean isActive) {
+ final LinkedList<TaskId> ret = new LinkedList<>();
for (final String subtopology : topologyDescriber.subtopologies()) {
if (isActive || topologyDescriber.isStateful(subtopology)) {
final int numberOfPartitions =
topologyDescriber.maxNumInputPartitions(subtopology);
@@ -166,7 +166,10 @@ public class StickyTaskAssignor implements TaskAssignor {
return ret;
}
- private void assignActive(final Set<TaskId> activeTasks) {
+ private void assignActive(final LinkedList<TaskId> activeTasks) {
+
+ // Assuming our current assignment pairs same partitions
(range-based), we want to sort by partition first
+
activeTasks.sort(Comparator.comparing(TaskId::partition).thenComparing(TaskId::subtopologyId));
// 1. re-assigning existing active tasks to clients that previously
had the same active tasks
for (final Iterator<TaskId> it = activeTasks.iterator();
it.hasNext();) {
@@ -193,6 +196,9 @@ public class StickyTaskAssignor implements TaskAssignor {
}
}
+ // To achieve an initially range-based assignment, sort by subtopology
+
activeTasks.sort(Comparator.comparing(TaskId::subtopologyId).thenComparing(TaskId::partition));
+
// 3. assign any remaining unassigned tasks
final PriorityQueue<ProcessState> processByLoad = new
PriorityQueue<>(Comparator.comparingDouble(ProcessState::load));
processByLoad.addAll(localState.processIdToState.values());
@@ -296,9 +302,13 @@ public class StickyTaskAssignor implements TaskAssignor {
return
localState.processIdToState.get(member.processId).memberToTaskCounts().get(member.memberId)
< localState.tasksPerMember;
}
- private void assignStandby(final Set<TaskId> standbyTasks, final int
numStandbyReplicas) {
+ private void assignStandby(final LinkedList<TaskId> standbyTasks, int
numStandbyReplicas) {
final ArrayList<StandbyToAssign> toLeastLoaded = new
ArrayList<>(standbyTasks.size() * numStandbyReplicas);
- for (final TaskId task : standbyTasks) {
+
+ // Assuming our current assignment is range-based, we want to sort by
partition first.
+
standbyTasks.sort(Comparator.comparing(TaskId::partition).thenComparing(TaskId::subtopologyId).reversed());
+
+ for (TaskId task : standbyTasks) {
for (int i = 0; i < numStandbyReplicas; i++) {
// prev active task
@@ -329,6 +339,10 @@ public class StickyTaskAssignor implements TaskAssignor {
}
}
+ // To achieve a range-based assignment, sort by subtopology
+ toLeastLoaded.sort(Comparator.<StandbyToAssign, String>comparing(x ->
x.taskId.subtopologyId())
+ .thenComparing(x -> x.taskId.partition()).reversed());
+
final PriorityQueue<ProcessState> processByLoad = new
PriorityQueue<>(Comparator.comparingDouble(ProcessState::load));
processByLoad.addAll(localState.processIdToState.values());
for (final StandbyToAssign toAssign : toLeastLoaded) {
diff --git
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/assignor/StickyTaskAssignorTest.java
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/assignor/StickyTaskAssignorTest.java
index b4fa9c4db99..1e9d4115cb2 100644
---
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/assignor/StickyTaskAssignorTest.java
+++
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/assignor/StickyTaskAssignorTest.java
@@ -1091,6 +1091,148 @@ public class StickyTaskAssignorTest {
assertEquals(numTasks, allStandbyTasks.size());
}
+ @Test
+ public void
shouldReassignTasksWhenNewNodeJoinsWithExistingActiveAndStandbyAssignments() {
+ // Initial setup: Node 1 has active tasks 0,1 and standby tasks 2,3
+ // Node 2 has active tasks 2,3 and standby tasks 0,1
+ final AssignmentMemberSpec memberSpec1 =
createAssignmentMemberSpec("process1",
+ mkMap(mkEntry("test-subtopology", Sets.newSet(0, 1))),
+ mkMap(mkEntry("test-subtopology", Sets.newSet(2, 3))));
+
+ final AssignmentMemberSpec memberSpec2 =
createAssignmentMemberSpec("process2",
+ mkMap(mkEntry("test-subtopology", Sets.newSet(2, 3))),
+ mkMap(mkEntry("test-subtopology", Sets.newSet(0, 1))));
+
+ // Node 3 joins as new client
+ final AssignmentMemberSpec memberSpec3 =
createAssignmentMemberSpec("process3");
+
+ final Map<String, AssignmentMemberSpec> members = mkMap(
+ mkEntry("member1", memberSpec1), mkEntry("member2", memberSpec2),
mkEntry("member3", memberSpec3));
+
+ final GroupAssignment result = assignor.assign(
+ new GroupSpecImpl(members,
mkMap(mkEntry(NUM_STANDBY_REPLICAS_CONFIG, "1"))),
+ new TopologyDescriberImpl(4, true, List.of("test-subtopology"))
+ );
+
+ // Verify all active tasks are assigned
+ final Set<Integer> allAssignedActiveTasks = new HashSet<>();
+ allAssignedActiveTasks.addAll(getAllActiveTaskIds(result, "member1"));
+ allAssignedActiveTasks.addAll(getAllActiveTaskIds(result, "member2"));
+ allAssignedActiveTasks.addAll(getAllActiveTaskIds(result, "member3"));
+ assertEquals(Sets.newSet(0, 1, 2, 3), allAssignedActiveTasks);
+
+ // Verify all standby tasks are assigned
+ final Set<Integer> allAssignedStandbyTasks = new HashSet<>();
+ allAssignedStandbyTasks.addAll(getAllStandbyTaskIds(result,
"member1"));
+ allAssignedStandbyTasks.addAll(getAllStandbyTaskIds(result,
"member2"));
+ allAssignedStandbyTasks.addAll(getAllStandbyTaskIds(result,
"member3"));
+ assertEquals(Sets.newSet(0, 1, 2, 3), allAssignedStandbyTasks);
+
+ // Verify each member has 1-2 active tasks and at most 3 tasks total
+ assertTrue(getAllActiveTaskIds(result, "member1").size() >= 1 &&
getAllActiveTaskIds(result, "member1").size() <= 2);
+ assertTrue(getAllActiveTaskIds(result, "member1").size() +
getAllStandbyTaskIds(result, "member1").size() <= 3);
+
+ assertTrue(getAllActiveTaskIds(result, "member2").size() >= 1 &&
getAllActiveTaskIds(result, "member2").size() <= 2);
+ assertTrue(getAllActiveTaskIds(result, "member2").size() +
getAllStandbyTaskIds(result, "member2").size() <= 3);
+
+ assertTrue(getAllActiveTaskIds(result, "member3").size() >= 1 &&
getAllActiveTaskIds(result, "member3").size() <= 2);
+ assertTrue(getAllActiveTaskIds(result, "member3").size() +
getAllStandbyTaskIds(result, "member3").size() <= 3);
+ }
+
+ @Test
+ public void shouldRangeAssignTasksWhenScalingUp() {
+ // Two clients, the second one is new
+ final AssignmentMemberSpec memberSpec1 =
createAssignmentMemberSpec("process1",
+ Map.of("test-subtopology1", Set.of(0, 1), "test-subtopology2",
Set.of(0, 1)),
+ Map.of());
+ final AssignmentMemberSpec memberSpec2 =
createAssignmentMemberSpec("process2");
+ final Map<String, AssignmentMemberSpec> members = mkMap(
+ mkEntry("member1", memberSpec1), mkEntry("member2", memberSpec2));
+
+ // Two subtopologies with 2 tasks each (4 tasks total) with standby
replicas enabled
+ final GroupAssignment result = assignor.assign(
+ new GroupSpecImpl(members,
mkMap(mkEntry(NUM_STANDBY_REPLICAS_CONFIG, String.valueOf(1)))),
+ new TopologyDescriberImpl(2, true,
Arrays.asList("test-subtopology1", "test-subtopology2"))
+ );
+
+ // Each client should get one task from each subtopology
+ final MemberAssignment testMember1 = result.members().get("member1");
+ assertNotNull(testMember1);
+ assertEquals(1,
testMember1.activeTasks().get("test-subtopology1").size());
+ assertEquals(1,
testMember1.activeTasks().get("test-subtopology2").size());
+
+ final MemberAssignment testMember2 = result.members().get("member2");
+ assertNotNull(testMember2);
+ assertEquals(1,
testMember2.activeTasks().get("test-subtopology1").size());
+ assertEquals(1,
testMember2.activeTasks().get("test-subtopology2").size());
+
+ // Verify all tasks are assigned exactly once
+ final Set<Integer> allSubtopology1Tasks = new HashSet<>();
+
allSubtopology1Tasks.addAll(testMember1.activeTasks().get("test-subtopology1"));
+
allSubtopology1Tasks.addAll(testMember2.activeTasks().get("test-subtopology1"));
+ assertEquals(Sets.newSet(0, 1), allSubtopology1Tasks);
+
+ final Set<Integer> allSubtopology2Tasks = new HashSet<>();
+
allSubtopology2Tasks.addAll(testMember1.activeTasks().get("test-subtopology2"));
+
allSubtopology2Tasks.addAll(testMember2.activeTasks().get("test-subtopology2"));
+ assertEquals(Sets.newSet(0, 1), allSubtopology2Tasks);
+
+ // Each client should get one task from each subtopology
+ assertNotNull(testMember1);
+ assertEquals(1,
testMember1.standbyTasks().get("test-subtopology1").size());
+ assertEquals(1,
testMember1.standbyTasks().get("test-subtopology2").size());
+
+ assertNotNull(testMember2);
+ assertEquals(1,
testMember2.standbyTasks().get("test-subtopology1").size());
+ assertEquals(1,
testMember2.standbyTasks().get("test-subtopology2").size());
+ }
+
+ @Test
+ public void shouldRangeAssignTasksWhenStartingEmpty() {
+ // Two clients starting empty (no previous tasks)
+ final AssignmentMemberSpec memberSpec1 =
createAssignmentMemberSpec("process1");
+ final AssignmentMemberSpec memberSpec2 =
createAssignmentMemberSpec("process2");
+ final Map<String, AssignmentMemberSpec> members = mkMap(
+ mkEntry("member1", memberSpec1), mkEntry("member2", memberSpec2));
+
+ // Two subtopologies with 2 tasks each (4 tasks total) with standby
replicas enabled
+ final GroupAssignment result = assignor.assign(
+ new GroupSpecImpl(members,
mkMap(mkEntry(NUM_STANDBY_REPLICAS_CONFIG, String.valueOf(1)))),
+ new TopologyDescriberImpl(2, true,
Arrays.asList("test-subtopology1", "test-subtopology2"))
+ );
+
+ // Each client should get one task from each subtopology
+ final MemberAssignment testMember1 = result.members().get("member1");
+ assertNotNull(testMember1);
+ assertEquals(1,
testMember1.activeTasks().get("test-subtopology1").size());
+ assertEquals(1,
testMember1.activeTasks().get("test-subtopology2").size());
+
+ final MemberAssignment testMember2 = result.members().get("member2");
+ assertNotNull(testMember2);
+ assertEquals(1,
testMember2.activeTasks().get("test-subtopology1").size());
+ assertEquals(1,
testMember2.activeTasks().get("test-subtopology2").size());
+
+ // Verify all tasks are assigned exactly once
+ final Set<Integer> allSubtopology1Tasks = new HashSet<>();
+
allSubtopology1Tasks.addAll(testMember1.activeTasks().get("test-subtopology1"));
+
allSubtopology1Tasks.addAll(testMember2.activeTasks().get("test-subtopology1"));
+ assertEquals(Sets.newSet(0, 1), allSubtopology1Tasks);
+
+ final Set<Integer> allSubtopology2Tasks = new HashSet<>();
+
allSubtopology2Tasks.addAll(testMember1.activeTasks().get("test-subtopology2"));
+
allSubtopology2Tasks.addAll(testMember2.activeTasks().get("test-subtopology2"));
+ assertEquals(Sets.newSet(0, 1), allSubtopology2Tasks);
+
+ // Each client should get one task from each subtopology
+ assertNotNull(testMember1);
+ assertEquals(1,
testMember1.standbyTasks().get("test-subtopology1").size());
+ assertEquals(1,
testMember1.standbyTasks().get("test-subtopology2").size());
+
+ assertNotNull(testMember2);
+ assertEquals(1,
testMember2.standbyTasks().get("test-subtopology1").size());
+ assertEquals(1,
testMember2.standbyTasks().get("test-subtopology2").size());
+ }
+
private int getAllActiveTaskCount(GroupAssignment result, String...
memberIds) {
int size = 0;
diff --git
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/LegacyStickyTaskAssignorTest.java
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/LegacyStickyTaskAssignorTest.java
index 3103e72cd52..64830546f0f 100644
---
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/LegacyStickyTaskAssignorTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/LegacyStickyTaskAssignorTest.java
@@ -1263,6 +1263,119 @@ public class LegacyStickyTaskAssignorTest {
}
}
+ @ParameterizedTest
+ @ValueSource(strings = {
+ StreamsConfig.RACK_AWARE_ASSIGNMENT_STRATEGY_NONE,
+ StreamsConfig.RACK_AWARE_ASSIGNMENT_STRATEGY_MIN_TRAFFIC,
+ StreamsConfig.RACK_AWARE_ASSIGNMENT_STRATEGY_BALANCE_SUBTOPOLOGY
+ })
+ public void
shouldReassignTasksWhenNewNodeJoinsWithExistingActiveAndStandbyAssignments(final
String rackAwareStrategy) {
+ setUp(rackAwareStrategy);
+
+ // Initial setup: Node 1 has active tasks 0,1 and standby tasks 2,3
+ // Node 2 has active tasks 2,3 and standby tasks 0,1
+ final ClientState node1 = createClientWithPreviousActiveTasks(PID_1,
1, TASK_0_0, TASK_0_1);
+ node1.addPreviousStandbyTasks(Set.of(TASK_0_2, TASK_0_3));
+
+ final ClientState node2 = createClientWithPreviousActiveTasks(PID_2,
1, TASK_0_2, TASK_0_3);
+ node2.addPreviousStandbyTasks(Set.of(TASK_0_0, TASK_0_1));
+
+ // Node 3 joins as new client
+ final ClientState node3 = createClient(PID_3, 1);
+
+ final boolean probingRebalanceNeeded = assign(1, rackAwareStrategy,
TASK_0_0, TASK_0_1, TASK_0_2, TASK_0_3);
+ assertThat(probingRebalanceNeeded, is(false));
+
+ // Verify all active tasks are assigned
+ final Set<TaskId> allAssignedActiveTasks = new HashSet<>();
+ allAssignedActiveTasks.addAll(node1.activeTasks());
+ allAssignedActiveTasks.addAll(node2.activeTasks());
+ allAssignedActiveTasks.addAll(node3.activeTasks());
+ assertThat(allAssignedActiveTasks, equalTo(Set.of(TASK_0_0, TASK_0_1,
TASK_0_2, TASK_0_3)));
+
+ // Verify all standby tasks are assigned
+ final Set<TaskId> allAssignedStandbyTasks = new HashSet<>();
+ allAssignedStandbyTasks.addAll(node1.standbyTasks());
+ allAssignedStandbyTasks.addAll(node2.standbyTasks());
+ allAssignedStandbyTasks.addAll(node3.standbyTasks());
+ assertThat(allAssignedStandbyTasks, equalTo(Set.of(TASK_0_0, TASK_0_1,
TASK_0_2, TASK_0_3)));
+
+ // Verify each client has 1-2 active tasks and at most 3 tasks total
+ assertThat(node1.activeTasks().size(), greaterThanOrEqualTo(1));
+ assertThat(node1.activeTasks().size(), lessThanOrEqualTo(2));
+ assertThat(node1.activeTasks().size() + node1.standbyTasks().size(),
lessThanOrEqualTo(3));
+
+ assertThat(node2.activeTasks().size(), greaterThanOrEqualTo(1));
+ assertThat(node2.activeTasks().size(), lessThanOrEqualTo(2));
+ assertThat(node2.activeTasks().size() + node2.standbyTasks().size(),
lessThanOrEqualTo(3));
+
+ assertThat(node3.activeTasks().size(), greaterThanOrEqualTo(1));
+ assertThat(node3.activeTasks().size(), lessThanOrEqualTo(2));
+ assertThat(node3.activeTasks().size() + node3.standbyTasks().size(),
lessThanOrEqualTo(3));
+ }
+
+ @ParameterizedTest
+ @ValueSource(strings = {
+ StreamsConfig.RACK_AWARE_ASSIGNMENT_STRATEGY_NONE,
+ StreamsConfig.RACK_AWARE_ASSIGNMENT_STRATEGY_MIN_TRAFFIC,
+ StreamsConfig.RACK_AWARE_ASSIGNMENT_STRATEGY_BALANCE_SUBTOPOLOGY
+ })
+ public void shouldRangeAssignTasksWhenStartingEmpty(final String
rackAwareStrategy) {
+ setUp(rackAwareStrategy);
+
+ // Two clients with capacity 1 each, starting empty (no previous tasks)
+ createClient(PID_1, 1);
+ createClient(PID_2, 1);
+
+ // Two subtopologies with 2 tasks each (4 tasks total)
+ final boolean probingRebalanceNeeded = assign(1, rackAwareStrategy,
TASK_0_0, TASK_0_1, TASK_1_0, TASK_1_1);
+ assertThat(probingRebalanceNeeded, is(false));
+
+ // Each client should get one active task from each subtopology
+ final ClientState client1 = clients.get(PID_1);
+ final ClientState client2 = clients.get(PID_2);
+
+ // Check that each client has one active task from subtopology 0
+ final long client1Subtopology0ActiveCount =
client1.activeTasks().stream()
+ .filter(task -> task.subtopology() == 0)
+ .count();
+ final long client2Subtopology0ActiveCount =
client2.activeTasks().stream()
+ .filter(task -> task.subtopology() == 0)
+ .count();
+ assertThat(client1Subtopology0ActiveCount, equalTo(1L));
+ assertThat(client2Subtopology0ActiveCount, equalTo(1L));
+
+ // Check that each client has one active task from subtopology 1
+ final long client1Subtopology1ActiveCount =
client1.activeTasks().stream()
+ .filter(task -> task.subtopology() == 1)
+ .count();
+ final long client2Subtopology1ActiveCount =
client2.activeTasks().stream()
+ .filter(task -> task.subtopology() == 1)
+ .count();
+ assertThat(client1Subtopology1ActiveCount, equalTo(1L));
+ assertThat(client2Subtopology1ActiveCount, equalTo(1L));
+
+ // Check that each client has one standby task from subtopology 0
+ final long client1Subtopology0StandbyCount =
client1.standbyTasks().stream()
+ .filter(task -> task.subtopology() == 0)
+ .count();
+ final long client2Subtopology0StandbyCount =
client2.standbyTasks().stream()
+ .filter(task -> task.subtopology() == 0)
+ .count();
+ assertThat(client1Subtopology0StandbyCount, equalTo(1L));
+ assertThat(client2Subtopology0StandbyCount, equalTo(1L));
+
+ // Check that each client has one standby task from subtopology 1
+ final long client1Subtopology1StandbyCount =
client1.standbyTasks().stream()
+ .filter(task -> task.subtopology() == 1)
+ .count();
+ final long client2Subtopology1StandbyCount =
client2.standbyTasks().stream()
+ .filter(task -> task.subtopology() == 1)
+ .count();
+ assertThat(client1Subtopology1StandbyCount, equalTo(1L));
+ assertThat(client2Subtopology1StandbyCount, equalTo(1L));
+ }
+
private boolean assign(final String rackAwareStrategy, final TaskId...
tasks) {
return assign(0, rackAwareStrategy, tasks);
}
diff --git
a/tools/src/test/java/org/apache/kafka/tools/streams/DescribeStreamsGroupTest.java
b/tools/src/test/java/org/apache/kafka/tools/streams/DescribeStreamsGroupTest.java
index 9c4d4016748..6d7ea57b8ac 100644
---
a/tools/src/test/java/org/apache/kafka/tools/streams/DescribeStreamsGroupTest.java
+++
b/tools/src/test/java/org/apache/kafka/tools/streams/DescribeStreamsGroupTest.java
@@ -180,8 +180,8 @@ public class DescribeStreamsGroupTest {
public void testDescribeStreamsGroupWithMembersOption() throws Exception {
final List<String> expectedHeader = List.of("GROUP", "MEMBER",
"PROCESS", "CLIENT-ID", "ASSIGNMENTS");
final Set<List<String>> expectedRows = Set.of(
- List.of(APP_ID, "", "", "", "ACTIVE:", "0:[0,1];"),
- List.of(APP_ID, "", "", "", "ACTIVE:", "1:[0,1];"));
+ List.of(APP_ID, "", "", "", "ACTIVE:", "0:[1];", "1:[1];"),
+ List.of(APP_ID, "", "", "", "ACTIVE:", "0:[0];", "1:[0];"));
// The member and process names as well as client-id are not
deterministic, so we don't care about them.
final List<Integer> dontCares = List.of(1, 2, 3);
@@ -193,8 +193,8 @@ public class DescribeStreamsGroupTest {
public void testDescribeStreamsGroupWithMembersAndVerboseOptions() throws
Exception {
final List<String> expectedHeader = List.of("GROUP",
"TARGET-ASSIGNMENT-EPOCH", "TOPOLOGY-EPOCH", "MEMBER", "MEMBER-PROTOCOL",
"MEMBER-EPOCH", "PROCESS", "CLIENT-ID", "ASSIGNMENTS");
final Set<List<String>> expectedRows = Set.of(
- List.of(APP_ID, "3", "0", "", "streams", "3", "", "", "ACTIVE:",
"0:[0,1];", "TARGET-ACTIVE:", "0:[0,1];"),
- List.of(APP_ID, "3", "0", "", "streams", "3", "", "", "ACTIVE:",
"1:[0,1];", "TARGET-ACTIVE:", "1:[0,1];"));
+ List.of(APP_ID, "3", "0", "", "streams", "3", "", "", "ACTIVE:",
"0:[1];", "1:[1];", "TARGET-ACTIVE:", "0:[1];", "1:[1];"),
+ List.of(APP_ID, "3", "0", "", "streams", "3", "", "", "ACTIVE:",
"0:[0];", "1:[0];", "TARGET-ACTIVE:", "0:[0];", "1:[0];"));
// The member and process names as well as client-id are not
deterministic, so we don't care about them.
final List<Integer> dontCares = List.of(3, 6, 7);
@@ -212,8 +212,8 @@ public class DescribeStreamsGroupTest {
final List<String> expectedHeader = List.of("GROUP",
"TARGET-ASSIGNMENT-EPOCH", "TOPOLOGY-EPOCH", "MEMBER", "MEMBER-PROTOCOL",
"MEMBER-EPOCH", "PROCESS", "CLIENT-ID", "ASSIGNMENTS");
final Set<List<String>> expectedRows1 = Set.of(
- List.of(APP_ID, "3", "0", "", "streams", "3", "", "", "ACTIVE:",
"0:[0,1];", "TARGET-ACTIVE:", "0:[0,1];"),
- List.of(APP_ID, "3", "0", "", "streams", "3", "", "", "ACTIVE:",
"1:[0,1];", "TARGET-ACTIVE:", "1:[0,1];"));
+ List.of(APP_ID, "3", "0", "", "streams", "3", "", "", "ACTIVE:",
"0:[1];", "1:[1];", "TARGET-ACTIVE:", "0:[1];", "1:[1];"),
+ List.of(APP_ID, "3", "0", "", "streams", "3", "", "", "ACTIVE:",
"0:[0];", "1:[0];", "TARGET-ACTIVE:", "0:[0];", "1:[0];"));
final Set<List<String>> expectedRows2 = Set.of(
List.of(APP_ID_2, "3", "0", "", "streams", "3", "", "", "ACTIVE:",
"1:[0];", "TARGET-ACTIVE:", "1:[0];"),
List.of(APP_ID_2, "3", "0", "", "streams", "3", "", "", "ACTIVE:",
"0:[0];", "TARGET-ACTIVE:", "0:[0];"));