This is an automated email from the ASF dual-hosted git repository.
chia7712 pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 24385a89cf9 MINOR: Replace assertUnorderedListEquals by
assertUnorderedRecordsEquals in group-coordinator module (#18076)
24385a89cf9 is described below
commit 24385a89cf991ddd79e1dcb000cb680c0a2e137e
Author: David Jacot <[email protected]>
AuthorDate: Sat Dec 7 04:43:25 2024 +0100
MINOR: Replace assertUnorderedListEquals by assertUnorderedRecordsEquals in
group-coordinator module (#18076)
Reviewers: Lianet Magrans <[email protected]>, Chia-Ping Tsai
<[email protected]>
---
.../apache/kafka/coordinator/group/Assertions.java | 18 +-
.../group/GroupMetadataManagerTest.java | 493 +++++++++++----------
.../group/modern/TargetAssignmentBuilderTest.java | 281 ++++++------
3 files changed, 412 insertions(+), 380 deletions(-)
diff --git
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/Assertions.java
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/Assertions.java
index 15d3cd03f05..85c7705c740 100644
---
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/Assertions.java
+++
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/Assertions.java
@@ -37,7 +37,6 @@ import org.opentest4j.AssertionFailedError;
import java.nio.ByteBuffer;
import java.util.Comparator;
-import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.function.BiConsumer;
@@ -65,13 +64,6 @@ public class Assertions {
ShareGroupPartitionMetadataValue.class,
Assertions::assertShareGroupPartitionMetadataValue
);
- public static <T> void assertUnorderedListEquals(
- List<T> expected,
- List<T> actual
- ) {
- assertEquals(new HashSet<>(expected), new HashSet<>(actual));
- }
-
public static void assertResponseEquals(
ApiMessage expected,
ApiMessage actual
@@ -101,6 +93,16 @@ public class Assertions {
}
}
+ /**
+ * Assert that the expected records are equal to the provided records.
+ *
+ * @param expectedRecords An ordered list of groupings. Each grouping
+ * defines a list of records that must be present,
+ * but they could be in any order.
+ * @param actualRecords An ordered list of records.
+ * @throws AssertionFailedError if the expected and the actual records do
+ * not match.
+ */
public static void assertUnorderedRecordsEquals(
List<List<CoordinatorRecord>> expectedRecords,
List<CoordinatorRecord> actualRecords
diff --git
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java
index 97ba6a5873a..b92ce049c3e 100644
---
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java
+++
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java
@@ -97,7 +97,6 @@ import org.apache.kafka.server.common.MetadataVersion;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;
-import org.opentest4j.AssertionFailedError;
import java.util.ArrayList;
import java.util.Collections;
@@ -122,7 +121,7 @@ import static
org.apache.kafka.common.requests.JoinGroupRequest.UNKNOWN_MEMBER_I
import static org.apache.kafka.coordinator.group.Assertions.assertRecordEquals;
import static
org.apache.kafka.coordinator.group.Assertions.assertRecordsEquals;
import static
org.apache.kafka.coordinator.group.Assertions.assertResponseEquals;
-import static
org.apache.kafka.coordinator.group.Assertions.assertUnorderedListEquals;
+import static
org.apache.kafka.coordinator.group.Assertions.assertUnorderedRecordsEquals;
import static
org.apache.kafka.coordinator.group.AssignmentTestUtil.mkAssignment;
import static
org.apache.kafka.coordinator.group.AssignmentTestUtil.mkTopicAssignment;
import static
org.apache.kafka.coordinator.group.GroupConfig.CONSUMER_HEARTBEAT_INTERVAL_MS_CONFIG;
@@ -783,28 +782,29 @@ public class GroupMetadataManagerTest {
.setServerAssignorName("range")
.build();
- List<CoordinatorRecord> expectedRecords = List.of(
-
GroupCoordinatorRecordHelpers.newConsumerGroupMemberSubscriptionRecord(groupId,
expectedMember3),
- GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord(groupId,
11),
-
GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentRecord(groupId,
memberId1, mkAssignment(
- mkTopicAssignment(fooTopicId, 0, 1),
- mkTopicAssignment(barTopicId, 0)
- )),
-
GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentRecord(groupId,
memberId2, mkAssignment(
- mkTopicAssignment(fooTopicId, 2, 3),
- mkTopicAssignment(barTopicId, 1)
- )),
-
GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentRecord(groupId,
memberId3, mkAssignment(
- mkTopicAssignment(fooTopicId, 4, 5),
- mkTopicAssignment(barTopicId, 2)
- )),
-
GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentEpochRecord(groupId,
11),
-
GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentRecord(groupId,
expectedMember3)
+ assertUnorderedRecordsEquals(
+ List.of(
+
List.of(GroupCoordinatorRecordHelpers.newConsumerGroupMemberSubscriptionRecord(groupId,
expectedMember3)),
+
List.of(GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord(groupId, 11)),
+ List.of(
+
GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentRecord(groupId,
memberId1, mkAssignment(
+ mkTopicAssignment(fooTopicId, 0, 1),
+ mkTopicAssignment(barTopicId, 0)
+ )),
+
GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentRecord(groupId,
memberId2, mkAssignment(
+ mkTopicAssignment(fooTopicId, 2, 3),
+ mkTopicAssignment(barTopicId, 1)
+ )),
+
GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentRecord(groupId,
memberId3, mkAssignment(
+ mkTopicAssignment(fooTopicId, 4, 5),
+ mkTopicAssignment(barTopicId, 2)
+ ))
+ ),
+
List.of(GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentEpochRecord(groupId,
11)),
+
List.of(GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentRecord(groupId,
expectedMember3))
+ ),
+ result.records()
);
-
- assertRecordsEquals(expectedRecords.subList(0, 2),
result.records().subList(0, 2));
- assertUnorderedListEquals(expectedRecords.subList(2, 5),
result.records().subList(2, 5));
- assertRecordsEquals(expectedRecords.subList(5, 7),
result.records().subList(5, 7));
}
@Test
@@ -1007,28 +1007,29 @@ public class GroupMetadataManagerTest {
.setServerAssignorName("range")
.build();
- List<CoordinatorRecord> expectedRecords = List.of(
-
GroupCoordinatorRecordHelpers.newConsumerGroupMemberSubscriptionRecord(groupId,
expectedMember3),
- GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord(groupId,
11),
-
GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentRecord(groupId,
memberId1, mkAssignment(
- mkTopicAssignment(fooTopicId, 0, 1),
- mkTopicAssignment(barTopicId, 0)
- )),
-
GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentRecord(groupId,
memberId2, mkAssignment(
- mkTopicAssignment(fooTopicId, 2, 3),
- mkTopicAssignment(barTopicId, 1)
- )),
-
GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentRecord(groupId,
memberId3, mkAssignment(
- mkTopicAssignment(fooTopicId, 4, 5),
- mkTopicAssignment(barTopicId, 2)
- )),
-
GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentEpochRecord(groupId,
11),
-
GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentRecord(groupId,
expectedMember3)
+ assertUnorderedRecordsEquals(
+ List.of(
+
List.of(GroupCoordinatorRecordHelpers.newConsumerGroupMemberSubscriptionRecord(groupId,
expectedMember3)),
+
List.of(GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord(groupId, 11)),
+ List.of(
+
GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentRecord(groupId,
memberId1, mkAssignment(
+ mkTopicAssignment(fooTopicId, 0, 1),
+ mkTopicAssignment(barTopicId, 0)
+ )),
+
GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentRecord(groupId,
memberId2, mkAssignment(
+ mkTopicAssignment(fooTopicId, 2, 3),
+ mkTopicAssignment(barTopicId, 1)
+ )),
+
GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentRecord(groupId,
memberId3, mkAssignment(
+ mkTopicAssignment(fooTopicId, 4, 5),
+ mkTopicAssignment(barTopicId, 2)
+ ))
+ ),
+
List.of(GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentEpochRecord(groupId,
11)),
+
List.of(GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentRecord(groupId,
expectedMember3))
+ ),
+ result.records()
);
-
- assertRecordsEquals(expectedRecords.subList(0, 2),
result.records().subList(0, 2));
- assertUnorderedListEquals(expectedRecords.subList(2, 5),
result.records().subList(2, 5));
- assertRecordsEquals(expectedRecords.subList(5, 7),
result.records().subList(5, 7));
}
@Test
@@ -11025,28 +11026,28 @@ public class GroupMetadataManagerTest {
)
);
- List<CoordinatorRecord> expectedRecords = List.of(
-
GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentTombstoneRecord(groupId,
memberId1),
-
GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentTombstoneRecord(groupId,
memberId2),
-
-
GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentTombstoneRecord(groupId,
memberId1),
-
GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentTombstoneRecord(groupId,
memberId2),
-
GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentEpochTombstoneRecord(groupId),
-
-
GroupCoordinatorRecordHelpers.newConsumerGroupMemberSubscriptionTombstoneRecord(groupId,
memberId1),
-
GroupCoordinatorRecordHelpers.newConsumerGroupMemberSubscriptionTombstoneRecord(groupId,
memberId2),
-
GroupCoordinatorRecordHelpers.newConsumerGroupSubscriptionMetadataTombstoneRecord(groupId),
-
GroupCoordinatorRecordHelpers.newConsumerGroupEpochTombstoneRecord(groupId),
-
-
GroupCoordinatorRecordHelpers.newGroupMetadataRecord(expectedClassicGroup,
assignments, MetadataVersion.latestTesting())
+ assertUnorderedRecordsEquals(
+ List.of(
+ List.of(
+
GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentTombstoneRecord(groupId,
memberId1),
+
GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentTombstoneRecord(groupId,
memberId2)
+ ),
+ List.of(
+
GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentTombstoneRecord(groupId,
memberId1),
+
GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentTombstoneRecord(groupId,
memberId2)
+ ),
+
List.of(GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentEpochTombstoneRecord(groupId)),
+ List.of(
+
GroupCoordinatorRecordHelpers.newConsumerGroupMemberSubscriptionTombstoneRecord(groupId,
memberId1),
+
GroupCoordinatorRecordHelpers.newConsumerGroupMemberSubscriptionTombstoneRecord(groupId,
memberId2)
+ ),
+
List.of(GroupCoordinatorRecordHelpers.newConsumerGroupSubscriptionMetadataTombstoneRecord(groupId)),
+
List.of(GroupCoordinatorRecordHelpers.newConsumerGroupEpochTombstoneRecord(groupId)),
+
List.of(GroupCoordinatorRecordHelpers.newGroupMetadataRecord(expectedClassicGroup,
assignments, MetadataVersion.latestTesting()))
+ ),
+ result.records()
);
- assertUnorderedListEquals(expectedRecords.subList(0, 2),
result.records().subList(0, 2));
- assertUnorderedListEquals(expectedRecords.subList(2, 4),
result.records().subList(2, 4));
- assertRecordEquals(expectedRecords.get(4), result.records().get(4));
- assertUnorderedListEquals(expectedRecords.subList(5, 7),
result.records().subList(5, 7));
- assertRecordsEquals(expectedRecords.subList(7, 10),
result.records().subList(7, 10));
-
verify(context.metrics,
times(1)).onConsumerGroupStateTransition(ConsumerGroup.ConsumerGroupState.STABLE,
null);
// The new classic member 1 has a heartbeat timeout.
@@ -11211,28 +11212,29 @@ public class GroupMetadataManagerTest {
assignment
)
);
- List<CoordinatorRecord> expectedRecords = List.of(
-
GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentTombstoneRecord(groupId,
memberId1),
-
GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentTombstoneRecord(groupId,
memberId2),
-
-
GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentTombstoneRecord(groupId,
memberId1),
-
GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentTombstoneRecord(groupId,
memberId2),
-
GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentEpochTombstoneRecord(groupId),
-
GroupCoordinatorRecordHelpers.newConsumerGroupMemberSubscriptionTombstoneRecord(groupId,
memberId1),
-
GroupCoordinatorRecordHelpers.newConsumerGroupMemberSubscriptionTombstoneRecord(groupId,
memberId2),
-
GroupCoordinatorRecordHelpers.newConsumerGroupSubscriptionMetadataTombstoneRecord(groupId),
-
GroupCoordinatorRecordHelpers.newConsumerGroupEpochTombstoneRecord(groupId),
-
-
GroupCoordinatorRecordHelpers.newGroupMetadataRecord(expectedClassicGroup,
assignments, MetadataVersion.latestTesting())
+ assertUnorderedRecordsEquals(
+ List.of(
+ List.of(
+
GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentTombstoneRecord(groupId,
memberId1),
+
GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentTombstoneRecord(groupId,
memberId2)
+ ),
+ List.of(
+
GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentTombstoneRecord(groupId,
memberId1),
+
GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentTombstoneRecord(groupId,
memberId2)
+ ),
+
List.of(GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentEpochTombstoneRecord(groupId)),
+ List.of(
+
GroupCoordinatorRecordHelpers.newConsumerGroupMemberSubscriptionTombstoneRecord(groupId,
memberId1),
+
GroupCoordinatorRecordHelpers.newConsumerGroupMemberSubscriptionTombstoneRecord(groupId,
memberId2)
+ ),
+
List.of(GroupCoordinatorRecordHelpers.newConsumerGroupSubscriptionMetadataTombstoneRecord(groupId)),
+
List.of(GroupCoordinatorRecordHelpers.newConsumerGroupEpochTombstoneRecord(groupId)),
+
List.of(GroupCoordinatorRecordHelpers.newGroupMetadataRecord(expectedClassicGroup,
assignments, MetadataVersion.latestTesting()))
+ ),
+ timeout.result.records()
);
- assertUnorderedListEquals(expectedRecords.subList(0, 2),
timeout.result.records().subList(0, 2));
- assertUnorderedListEquals(expectedRecords.subList(2, 4),
timeout.result.records().subList(2, 4));
- assertRecordEquals(expectedRecords.get(4),
timeout.result.records().get(4));
- assertUnorderedListEquals(expectedRecords.subList(5, 7),
timeout.result.records().subList(5, 7));
- assertRecordsEquals(expectedRecords.subList(7, 10),
timeout.result.records().subList(7, 10));
-
verify(context.metrics,
times(1)).onConsumerGroupStateTransition(ConsumerGroup.ConsumerGroupState.STABLE,
null);
// The new classic member 1 has a heartbeat timeout.
@@ -11416,28 +11418,28 @@ public class GroupMetadataManagerTest {
)
);
- List<CoordinatorRecord> expectedRecords = List.of(
-
GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentTombstoneRecord(groupId,
memberId1),
-
GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentTombstoneRecord(groupId,
memberId2),
-
-
GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentTombstoneRecord(groupId,
memberId1),
-
GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentTombstoneRecord(groupId,
memberId2),
-
GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentEpochTombstoneRecord(groupId),
-
-
GroupCoordinatorRecordHelpers.newConsumerGroupMemberSubscriptionTombstoneRecord(groupId,
memberId1),
-
GroupCoordinatorRecordHelpers.newConsumerGroupMemberSubscriptionTombstoneRecord(groupId,
memberId2),
-
GroupCoordinatorRecordHelpers.newConsumerGroupSubscriptionMetadataTombstoneRecord(groupId),
-
GroupCoordinatorRecordHelpers.newConsumerGroupEpochTombstoneRecord(groupId),
-
-
GroupCoordinatorRecordHelpers.newGroupMetadataRecord(expectedClassicGroup,
assignments, MetadataVersion.latestTesting())
+ assertUnorderedRecordsEquals(
+ List.of(
+ List.of(
+
GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentTombstoneRecord(groupId,
memberId1),
+
GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentTombstoneRecord(groupId,
memberId2)
+ ),
+ List.of(
+
GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentTombstoneRecord(groupId,
memberId1),
+
GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentTombstoneRecord(groupId,
memberId2)
+ ),
+
List.of(GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentEpochTombstoneRecord(groupId)),
+ List.of(
+
GroupCoordinatorRecordHelpers.newConsumerGroupMemberSubscriptionTombstoneRecord(groupId,
memberId1),
+
GroupCoordinatorRecordHelpers.newConsumerGroupMemberSubscriptionTombstoneRecord(groupId,
memberId2)
+ ),
+
List.of(GroupCoordinatorRecordHelpers.newConsumerGroupSubscriptionMetadataTombstoneRecord(groupId)),
+
List.of(GroupCoordinatorRecordHelpers.newConsumerGroupEpochTombstoneRecord(groupId)),
+
List.of(GroupCoordinatorRecordHelpers.newGroupMetadataRecord(expectedClassicGroup,
assignments, MetadataVersion.latestTesting()))
+ ),
+ timeout.result.records()
);
- assertUnorderedListEquals(expectedRecords.subList(0, 2),
timeout.result.records().subList(0, 2));
- assertUnorderedListEquals(expectedRecords.subList(2, 4),
timeout.result.records().subList(2, 4));
- assertRecordEquals(expectedRecords.get(4),
timeout.result.records().get(4));
- assertUnorderedListEquals(expectedRecords.subList(5, 7),
timeout.result.records().subList(5, 7));
- assertRecordsEquals(expectedRecords.subList(7, 10),
timeout.result.records().subList(7, 10));
-
verify(context.metrics,
times(1)).onConsumerGroupStateTransition(ConsumerGroup.ConsumerGroupState.RECONCILING,
null);
// The new classic member 1 has a heartbeat timeout.
@@ -11629,54 +11631,49 @@ public class GroupMetadataManagerTest {
)
);
- List<CoordinatorRecord> expectedRecords = List.of(
- // Remove the existing member 2 that uses the consumer protocol.
-
GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentTombstoneRecord(groupId,
oldMemberId2),
-
GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentTombstoneRecord(groupId,
oldMemberId2),
-
GroupCoordinatorRecordHelpers.newConsumerGroupMemberSubscriptionTombstoneRecord(groupId,
oldMemberId2),
+ // The leader of the classic group is not deterministic.
+ String leader =
context.groupMetadataManager.getOrMaybeCreateClassicGroup(groupId,
false).leaderOrNull();
+ assertTrue(Set.of(memberId1, newMemberId2).contains(leader));
+ expectedClassicGroup.setLeaderId(Optional.of(leader));
- // Create the new member 2 that uses the consumer protocol.
-
GroupCoordinatorRecordHelpers.newConsumerGroupMemberSubscriptionRecord(groupId,
expectedNewConsumerMember2),
-
GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentRecord(groupId,
newMemberId2, expectedNewConsumerMember2.assignedPartitions()),
-
GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentRecord(groupId,
expectedNewConsumerMember2),
+ assertUnorderedRecordsEquals(
+ List.of(
+ // Remove the existing member 2 that uses the consumer
protocol.
+
List.of(GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentTombstoneRecord(groupId,
oldMemberId2)),
+
List.of(GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentTombstoneRecord(groupId,
oldMemberId2)),
+
List.of(GroupCoordinatorRecordHelpers.newConsumerGroupMemberSubscriptionTombstoneRecord(groupId,
oldMemberId2)),
- // Update the new member 2 to the member that uses classic
protocol.
-
GroupCoordinatorRecordHelpers.newConsumerGroupMemberSubscriptionRecord(groupId,
expectedNewClassicMember2),
-
GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentRecord(groupId,
expectedNewClassicMember2),
+ // Create the new member 2 that uses the consumer protocol.
+
List.of(GroupCoordinatorRecordHelpers.newConsumerGroupMemberSubscriptionRecord(groupId,
expectedNewConsumerMember2)),
+
List.of(GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentRecord(groupId,
newMemberId2, expectedNewConsumerMember2.assignedPartitions())),
+
List.of(GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentRecord(groupId,
expectedNewConsumerMember2)),
- // Remove member 1, member 2 and the consumer group.
-
GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentTombstoneRecord(groupId,
memberId1),
-
GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentTombstoneRecord(groupId,
newMemberId2),
-
GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentTombstoneRecord(groupId,
memberId1),
-
GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentTombstoneRecord(groupId,
newMemberId2),
-
GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentEpochTombstoneRecord(groupId),
-
GroupCoordinatorRecordHelpers.newConsumerGroupMemberSubscriptionTombstoneRecord(groupId,
memberId1),
-
GroupCoordinatorRecordHelpers.newConsumerGroupMemberSubscriptionTombstoneRecord(groupId,
newMemberId2),
-
GroupCoordinatorRecordHelpers.newConsumerGroupSubscriptionMetadataTombstoneRecord(groupId),
-
GroupCoordinatorRecordHelpers.newConsumerGroupEpochTombstoneRecord(groupId),
-
- // Create the classic group.
-
GroupCoordinatorRecordHelpers.newGroupMetadataRecord(expectedClassicGroup,
assignments, MetadataVersion.latestTesting())
- );
-
- assertEquals(expectedRecords.size(), result.records.size());
- assertRecordsEquals(expectedRecords.subList(0, 8),
result.records.subList(0, 8));
- assertUnorderedListEquals(expectedRecords.subList(8, 10),
result.records.subList(8, 10));
- assertUnorderedListEquals(expectedRecords.subList(10, 12),
result.records.subList(10, 12));
- assertRecordEquals(expectedRecords.get(12), result.records.get(12));
- assertUnorderedListEquals(expectedRecords.subList(13, 15),
result.records.subList(13, 15));
- assertRecordsEquals(expectedRecords.subList(15, 17),
result.records.subList(15, 17));
-
- // Leader can be either member 1 or member 2.
- try {
- assertRecordEquals(expectedRecords.get(17),
result.records.get(17));
- } catch (AssertionFailedError e) {
- expectedClassicGroup.setLeaderId(Optional.of(newMemberId2));
- assertRecordEquals(
-
GroupCoordinatorRecordHelpers.newGroupMetadataRecord(expectedClassicGroup,
assignments, MetadataVersion.latestTesting()),
- result.records.get(9)
- );
- }
+ // Update the new member 2 to the member that uses classic
protocol.
+
List.of(GroupCoordinatorRecordHelpers.newConsumerGroupMemberSubscriptionRecord(groupId,
expectedNewClassicMember2)),
+
List.of(GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentRecord(groupId,
expectedNewClassicMember2)),
+
+ // Remove member 1, member 2 and the consumer group.
+ List.of(
+
GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentTombstoneRecord(groupId,
memberId1),
+
GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentTombstoneRecord(groupId,
newMemberId2)
+ ),
+ List.of(
+
GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentTombstoneRecord(groupId,
memberId1),
+
GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentTombstoneRecord(groupId,
newMemberId2)
+ ),
+
List.of(GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentEpochTombstoneRecord(groupId)),
+ List.of(
+
GroupCoordinatorRecordHelpers.newConsumerGroupMemberSubscriptionTombstoneRecord(groupId,
memberId1),
+
GroupCoordinatorRecordHelpers.newConsumerGroupMemberSubscriptionTombstoneRecord(groupId,
newMemberId2)
+ ),
+
List.of(GroupCoordinatorRecordHelpers.newConsumerGroupSubscriptionMetadataTombstoneRecord(groupId)),
+
List.of(GroupCoordinatorRecordHelpers.newConsumerGroupEpochTombstoneRecord(groupId)),
+
+ // Create the classic group.
+
List.of(GroupCoordinatorRecordHelpers.newGroupMetadataRecord(expectedClassicGroup,
assignments, MetadataVersion.latestTesting()))
+ ),
+ result.records
+ );
verify(context.metrics,
times(1)).onConsumerGroupStateTransition(ConsumerGroup.ConsumerGroupState.STABLE,
null);
@@ -11849,23 +11846,25 @@ public class GroupMetadataManagerTest {
)
.build();
- List<CoordinatorRecord> expectedRecords = List.of(
-
GroupCoordinatorRecordHelpers.newConsumerGroupMemberSubscriptionRecord(groupId,
expectedMember),
-
GroupCoordinatorRecordHelpers.newConsumerGroupSubscriptionMetadataRecord(groupId,
Map.of(
- fooTopicName, new TopicMetadata(fooTopicId, fooTopicName,
2),
- barTopicName, new TopicMetadata(barTopicId, barTopicName,
1)
- )),
-
GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord(groupId, 11),
+ assertUnorderedRecordsEquals(
+ List.of(
+
List.of(GroupCoordinatorRecordHelpers.newConsumerGroupMemberSubscriptionRecord(groupId,
expectedMember)),
+
List.of(GroupCoordinatorRecordHelpers.newConsumerGroupSubscriptionMetadataRecord(groupId,
Map.of(
+ fooTopicName, new TopicMetadata(fooTopicId,
fooTopicName, 2),
+ barTopicName, new TopicMetadata(barTopicId,
barTopicName, 1)
+ ))),
+
List.of(GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord(groupId, 11)),
-
GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentRecord(groupId,
memberId, assignor.targetPartitions(memberId)),
-
GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentRecord(groupId,
newMemberId, assignor.targetPartitions(newMemberId)),
-
GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentEpochRecord(groupId,
11),
+ List.of(
+
GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentRecord(groupId,
memberId, assignor.targetPartitions(memberId)),
+
GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentRecord(groupId,
newMemberId, assignor.targetPartitions(newMemberId))
+ ),
+
List.of(GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentEpochRecord(groupId,
11)),
-
GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentRecord(groupId,
expectedMember)
+
List.of(GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentRecord(groupId,
expectedMember))
+ ),
+ secondJoinResult.records
);
- assertRecordsEquals(expectedRecords.subList(0, 3),
secondJoinResult.records.subList(0, 3));
- assertUnorderedListEquals(expectedRecords.subList(3, 5),
secondJoinResult.records.subList(3, 5));
- assertRecordsEquals(expectedRecords.subList(5, 7),
secondJoinResult.records.subList(5, 7));
secondJoinResult.appendFuture.complete(null);
assertTrue(secondJoinResult.joinFuture.isDone());
@@ -12268,21 +12267,23 @@ public class GroupMetadataManagerTest {
)
.build();
- List<CoordinatorRecord> expectedRecords = List.of(
-
GroupCoordinatorRecordHelpers.newConsumerGroupMemberSubscriptionRecord(groupId,
expectedMember),
- GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord(groupId,
11),
-
GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentRecord(groupId,
memberId1, mkAssignment(
- mkTopicAssignment(fooTopicId, 0),
- mkTopicAssignment(zarTopicId, 0))),
-
GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentRecord(groupId,
memberId2, mkAssignment(
- mkTopicAssignment(barTopicId, 0),
- mkTopicAssignment(fooTopicId, 1))),
-
GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentEpochRecord(groupId,
11),
-
GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentRecord(groupId,
expectedMember)
+ assertUnorderedRecordsEquals(
+ List.of(
+
List.of(GroupCoordinatorRecordHelpers.newConsumerGroupMemberSubscriptionRecord(groupId,
expectedMember)),
+
List.of(GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord(groupId, 11)),
+ List.of(
+
GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentRecord(groupId,
memberId1, mkAssignment(
+ mkTopicAssignment(fooTopicId, 0),
+ mkTopicAssignment(zarTopicId, 0))),
+
GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentRecord(groupId,
memberId2, mkAssignment(
+ mkTopicAssignment(barTopicId, 0),
+ mkTopicAssignment(fooTopicId, 1)))
+ ),
+
List.of(GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentEpochRecord(groupId,
11)),
+
List.of(GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentRecord(groupId,
expectedMember))
+ ),
+ joinResult.records
);
- assertRecordsEquals(expectedRecords.subList(0, 2),
joinResult.records.subList(0, 2));
- assertUnorderedListEquals(expectedRecords.subList(2, 4),
joinResult.records.subList(2, 4));
- assertRecordsEquals(expectedRecords.subList(4, 6),
joinResult.records.subList(4, 6));
joinResult.appendFuture.complete(null);
assertEquals(
@@ -12492,28 +12493,29 @@ public class GroupMetadataManagerTest {
)
.build();
- List<CoordinatorRecord> expectedRecords1 = List.of(
-
GroupCoordinatorRecordHelpers.newConsumerGroupMemberSubscriptionRecord(groupId,
expectedMember1),
-
GroupCoordinatorRecordHelpers.newConsumerGroupSubscriptionMetadataRecord(groupId,
Map.of(
- fooTopicName, new TopicMetadata(fooTopicId, fooTopicName, 2),
- barTopicName, new TopicMetadata(barTopicId, barTopicName, 1),
- zarTopicName, new TopicMetadata(zarTopicId, zarTopicName, 1)
- )),
- GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord(groupId,
11),
+ assertUnorderedRecordsEquals(
+ List.of(
+
List.of(GroupCoordinatorRecordHelpers.newConsumerGroupMemberSubscriptionRecord(groupId,
expectedMember1)),
+
List.of(GroupCoordinatorRecordHelpers.newConsumerGroupSubscriptionMetadataRecord(groupId,
Map.of(
+ fooTopicName, new TopicMetadata(fooTopicId, fooTopicName,
2),
+ barTopicName, new TopicMetadata(barTopicId, barTopicName,
1),
+ zarTopicName, new TopicMetadata(zarTopicId, zarTopicName,
1)
+ ))),
+
List.of(GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord(groupId, 11)),
-
GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentRecord(groupId,
memberId1, mkAssignment(
- mkTopicAssignment(fooTopicId, 0, 1),
- mkTopicAssignment(zarTopicId, 0))),
-
GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentRecord(groupId,
memberId2, mkAssignment(
- mkTopicAssignment(barTopicId, 0))),
-
GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentEpochRecord(groupId,
11),
+ List.of(
+
GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentRecord(groupId,
memberId1, mkAssignment(
+ mkTopicAssignment(fooTopicId, 0, 1),
+ mkTopicAssignment(zarTopicId, 0))),
+
GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentRecord(groupId,
memberId2, mkAssignment(
+ mkTopicAssignment(barTopicId, 0)))
+ ),
+
List.of(GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentEpochRecord(groupId,
11)),
-
GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentRecord(groupId,
expectedMember1)
+
List.of(GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentRecord(groupId,
expectedMember1))
+ ),
+ joinResult1.records
);
- assertEquals(expectedRecords1.size(), joinResult1.records.size());
- assertRecordsEquals(expectedRecords1.subList(0, 3),
joinResult1.records.subList(0, 3));
- assertUnorderedListEquals(expectedRecords1.subList(3, 5),
joinResult1.records.subList(3, 5));
- assertRecordsEquals(expectedRecords1.subList(5, 7),
joinResult1.records.subList(5, 7));
assertEquals(expectedMember1.state(),
group.getOrMaybeCreateMember(memberId1, false).state());
@@ -12726,28 +12728,29 @@ public class GroupMetadataManagerTest {
)
.build();
- List<CoordinatorRecord> expectedRecords1 = List.of(
-
GroupCoordinatorRecordHelpers.newConsumerGroupMemberSubscriptionRecord(groupId,
expectedMember1),
-
GroupCoordinatorRecordHelpers.newConsumerGroupSubscriptionMetadataRecord(groupId,
Map.of(
- fooTopicName, new TopicMetadata(fooTopicId, fooTopicName, 2),
- barTopicName, new TopicMetadata(barTopicId, barTopicName, 1),
- zarTopicName, new TopicMetadata(zarTopicId, zarTopicName, 1)
- )),
- GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord(groupId,
11),
+ assertUnorderedRecordsEquals(
+ List.of(
+
List.of(GroupCoordinatorRecordHelpers.newConsumerGroupMemberSubscriptionRecord(groupId,
expectedMember1)),
+
List.of(GroupCoordinatorRecordHelpers.newConsumerGroupSubscriptionMetadataRecord(groupId,
Map.of(
+ fooTopicName, new TopicMetadata(fooTopicId, fooTopicName,
2),
+ barTopicName, new TopicMetadata(barTopicId, barTopicName,
1),
+ zarTopicName, new TopicMetadata(zarTopicId, zarTopicName,
1)
+ ))),
+
List.of(GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord(groupId, 11)),
-
GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentRecord(groupId,
memberId1, mkAssignment(
- mkTopicAssignment(fooTopicId, 0, 1),
- mkTopicAssignment(zarTopicId, 0))),
-
GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentRecord(groupId,
memberId2, mkAssignment(
- mkTopicAssignment(barTopicId, 0))),
-
GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentEpochRecord(groupId,
11),
+ List.of(
+
GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentRecord(groupId,
memberId1, mkAssignment(
+ mkTopicAssignment(fooTopicId, 0, 1),
+ mkTopicAssignment(zarTopicId, 0))),
+
GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentRecord(groupId,
memberId2, mkAssignment(
+ mkTopicAssignment(barTopicId, 0)))
+ ),
+
List.of(GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentEpochRecord(groupId,
11)),
-
GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentRecord(groupId,
expectedMember1)
+
List.of(GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentRecord(groupId,
expectedMember1))
+ ),
+ joinResult1.records
);
- assertEquals(expectedRecords1.size(), joinResult1.records.size());
- assertRecordsEquals(expectedRecords1.subList(0, 3),
joinResult1.records.subList(0, 3));
- assertUnorderedListEquals(expectedRecords1.subList(3, 5),
joinResult1.records.subList(3, 5));
- assertRecordsEquals(expectedRecords1.subList(5, 7),
joinResult1.records.subList(5, 7));
assertEquals(expectedMember1.state(),
group.getOrMaybeCreateMember(memberId1, false).state());
@@ -15686,38 +15689,40 @@ public class GroupMetadataManagerTest {
MockCoordinatorExecutor.ExecutorResult<CoordinatorRecord> task =
tasks.get(0);
assertEquals(groupId + "-regex", task.key);
- List<CoordinatorRecord> expectedRecords = List.of(
-
GroupCoordinatorRecordHelpers.newConsumerGroupRegularExpressionRecord(
- groupId,
- "foo*",
- new ResolvedRegularExpression(
- Set.of(fooTopicName, foooTopicName),
- 2L,
- context.time.milliseconds()
- )
- ),
-
GroupCoordinatorRecordHelpers.newConsumerGroupRegularExpressionRecord(
- groupId,
- "bar*",
- new ResolvedRegularExpression(
- Set.of(barTopicName),
- 2L,
- context.time.milliseconds()
- )
- ),
-
GroupCoordinatorRecordHelpers.newConsumerGroupSubscriptionMetadataRecord(
- groupId,
- Map.of(
- fooTopicName, new TopicMetadata(fooTopicId, fooTopicName,
6),
- barTopicName, new TopicMetadata(barTopicId, barTopicName,
3),
- foooTopicName, new TopicMetadata(foooTopicId,
foooTopicName, 1)
- )
+ assertUnorderedRecordsEquals(
+ List.of(
+ List.of(
+
GroupCoordinatorRecordHelpers.newConsumerGroupRegularExpressionRecord(
+ groupId,
+ "foo*",
+ new ResolvedRegularExpression(
+ Set.of(fooTopicName, foooTopicName),
+ 2L,
+ context.time.milliseconds()
+ )
+ ),
+
GroupCoordinatorRecordHelpers.newConsumerGroupRegularExpressionRecord(
+ groupId,
+ "bar*",
+ new ResolvedRegularExpression(
+ Set.of(barTopicName),
+ 2L,
+ context.time.milliseconds()
+ )
+ )
+ ),
+
List.of(GroupCoordinatorRecordHelpers.newConsumerGroupSubscriptionMetadataRecord(
+ groupId,
+ Map.of(
+ fooTopicName, new TopicMetadata(fooTopicId,
fooTopicName, 6),
+ barTopicName, new TopicMetadata(barTopicId,
barTopicName, 3),
+ foooTopicName, new TopicMetadata(foooTopicId,
foooTopicName, 1)
+ )
+ )),
+
List.of(GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord(groupId, 11))
),
- GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord(groupId,
11)
+ task.result.records()
);
-
- assertUnorderedListEquals(expectedRecords.subList(0, 2),
task.result.records().subList(0, 2));
- assertRecordsEquals(expectedRecords.subList(2, 4),
task.result.records().subList(2, 4));
}
@Test
diff --git
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/modern/TargetAssignmentBuilderTest.java
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/modern/TargetAssignmentBuilderTest.java
index 03863ea9dca..c267195eeae 100644
---
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/modern/TargetAssignmentBuilderTest.java
+++
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/modern/TargetAssignmentBuilderTest.java
@@ -37,7 +37,8 @@ import java.util.Map;
import java.util.Optional;
import java.util.Set;
-import static
org.apache.kafka.coordinator.group.Assertions.assertUnorderedListEquals;
+import static
org.apache.kafka.coordinator.group.Assertions.assertRecordsEquals;
+import static
org.apache.kafka.coordinator.group.Assertions.assertUnorderedRecordsEquals;
import static
org.apache.kafka.coordinator.group.AssignmentTestUtil.mkAssignment;
import static
org.apache.kafka.coordinator.group.AssignmentTestUtil.mkTopicAssignment;
import static
org.apache.kafka.coordinator.group.GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentEpochRecord;
@@ -411,23 +412,27 @@ public class TargetAssignmentBuilderTest {
TargetAssignmentBuilder.TargetAssignmentResult result =
context.build();
- assertEquals(3, result.records().size());
-
- assertUnorderedListEquals(Arrays.asList(
- newConsumerGroupTargetAssignmentRecord("my-group", "member-1",
mkAssignment(
- mkTopicAssignment(fooTopicId, 4, 5, 6),
- mkTopicAssignment(barTopicId, 4, 5, 6)
- )),
- newConsumerGroupTargetAssignmentRecord("my-group", "member-2",
mkAssignment(
- mkTopicAssignment(fooTopicId, 1, 2, 3),
- mkTopicAssignment(barTopicId, 1, 2, 3)
- ))
- ), result.records().subList(0, 2));
-
- assertEquals(newConsumerGroupTargetAssignmentEpochRecord(
- "my-group",
- 20
- ), result.records().get(2));
+ assertUnorderedRecordsEquals(
+ List.of(
+ List.of(
+ newConsumerGroupTargetAssignmentRecord("my-group",
"member-1", mkAssignment(
+ mkTopicAssignment(fooTopicId, 4, 5, 6),
+ mkTopicAssignment(barTopicId, 4, 5, 6)
+ )),
+ newConsumerGroupTargetAssignmentRecord("my-group",
"member-2", mkAssignment(
+ mkTopicAssignment(fooTopicId, 1, 2, 3),
+ mkTopicAssignment(barTopicId, 1, 2, 3)
+ ))
+ ),
+ List.of(
+ newConsumerGroupTargetAssignmentEpochRecord(
+ "my-group",
+ 20
+ )
+ )
+ ),
+ result.records()
+ );
Map<String, MemberAssignment> expectedAssignment = new HashMap<>();
expectedAssignment.put("member-2", new
MemberAssignmentImpl(mkAssignment(
@@ -481,27 +486,31 @@ public class TargetAssignmentBuilderTest {
TargetAssignmentBuilder.TargetAssignmentResult result =
context.build();
- assertEquals(4, result.records().size());
-
- assertUnorderedListEquals(Arrays.asList(
- newConsumerGroupTargetAssignmentRecord("my-group", "member-1",
mkAssignment(
- mkTopicAssignment(fooTopicId, 1, 2),
- mkTopicAssignment(barTopicId, 1, 2)
- )),
- newConsumerGroupTargetAssignmentRecord("my-group", "member-2",
mkAssignment(
- mkTopicAssignment(fooTopicId, 3, 4),
- mkTopicAssignment(barTopicId, 3, 4)
- )),
- newConsumerGroupTargetAssignmentRecord("my-group", "member-3",
mkAssignment(
- mkTopicAssignment(fooTopicId, 5, 6),
- mkTopicAssignment(barTopicId, 5, 6)
- ))
- ), result.records().subList(0, 3));
-
- assertEquals(newConsumerGroupTargetAssignmentEpochRecord(
- "my-group",
- 20
- ), result.records().get(3));
+ assertUnorderedRecordsEquals(
+ List.of(
+ List.of(
+ newConsumerGroupTargetAssignmentRecord("my-group",
"member-1", mkAssignment(
+ mkTopicAssignment(fooTopicId, 1, 2),
+ mkTopicAssignment(barTopicId, 1, 2)
+ )),
+ newConsumerGroupTargetAssignmentRecord("my-group",
"member-2", mkAssignment(
+ mkTopicAssignment(fooTopicId, 3, 4),
+ mkTopicAssignment(barTopicId, 3, 4)
+ )),
+ newConsumerGroupTargetAssignmentRecord("my-group",
"member-3", mkAssignment(
+ mkTopicAssignment(fooTopicId, 5, 6),
+ mkTopicAssignment(barTopicId, 5, 6)
+ ))
+ ),
+ List.of(
+ newConsumerGroupTargetAssignmentEpochRecord(
+ "my-group",
+ 20
+ )
+ )
+ ),
+ result.records()
+ );
Map<String, MemberAssignment> expectedAssignment = new HashMap<>();
expectedAssignment.put("member-1", new
MemberAssignmentImpl(mkAssignment(
@@ -568,27 +577,31 @@ public class TargetAssignmentBuilderTest {
TargetAssignmentBuilder.TargetAssignmentResult result =
context.build();
- assertEquals(4, result.records().size());
-
- assertUnorderedListEquals(Arrays.asList(
- newConsumerGroupTargetAssignmentRecord("my-group", "member-1",
mkAssignment(
- mkTopicAssignment(fooTopicId, 1, 2),
- mkTopicAssignment(barTopicId, 1, 2)
- )),
- newConsumerGroupTargetAssignmentRecord("my-group", "member-2",
mkAssignment(
- mkTopicAssignment(fooTopicId, 3, 4),
- mkTopicAssignment(barTopicId, 3, 4)
- )),
- newConsumerGroupTargetAssignmentRecord("my-group", "member-3",
mkAssignment(
- mkTopicAssignment(fooTopicId, 5, 6),
- mkTopicAssignment(barTopicId, 5, 6)
- ))
- ), result.records().subList(0, 3));
-
- assertEquals(newConsumerGroupTargetAssignmentEpochRecord(
- "my-group",
- 20
- ), result.records().get(3));
+ assertUnorderedRecordsEquals(
+ List.of(
+ List.of(
+ newConsumerGroupTargetAssignmentRecord("my-group",
"member-1", mkAssignment(
+ mkTopicAssignment(fooTopicId, 1, 2),
+ mkTopicAssignment(barTopicId, 1, 2)
+ )),
+ newConsumerGroupTargetAssignmentRecord("my-group",
"member-2", mkAssignment(
+ mkTopicAssignment(fooTopicId, 3, 4),
+ mkTopicAssignment(barTopicId, 3, 4)
+ )),
+ newConsumerGroupTargetAssignmentRecord("my-group",
"member-3", mkAssignment(
+ mkTopicAssignment(fooTopicId, 5, 6),
+ mkTopicAssignment(barTopicId, 5, 6)
+ ))
+ ),
+ List.of(
+ newConsumerGroupTargetAssignmentEpochRecord(
+ "my-group",
+ 20
+ )
+ )
+ ),
+ result.records()
+ );
Map<String, MemberAssignment> expectedAssignment = new HashMap<>();
expectedAssignment.put("member-1", new
MemberAssignmentImpl(mkAssignment(
@@ -649,24 +662,28 @@ public class TargetAssignmentBuilderTest {
TargetAssignmentBuilder.TargetAssignmentResult result =
context.build();
- assertEquals(3, result.records().size());
-
- // Member 1 has no record because its assignment did not change.
- assertUnorderedListEquals(Arrays.asList(
- newConsumerGroupTargetAssignmentRecord("my-group", "member-2",
mkAssignment(
- mkTopicAssignment(fooTopicId, 3, 4, 5),
- mkTopicAssignment(barTopicId, 3, 4, 5)
- )),
- newConsumerGroupTargetAssignmentRecord("my-group", "member-3",
mkAssignment(
- mkTopicAssignment(fooTopicId, 6),
- mkTopicAssignment(barTopicId, 6)
- ))
- ), result.records().subList(0, 2));
-
- assertEquals(newConsumerGroupTargetAssignmentEpochRecord(
- "my-group",
- 20
- ), result.records().get(2));
+ assertUnorderedRecordsEquals(
+ List.of(
+ List.of(
+ // Member 1 has no record because its assignment did not
change.
+ newConsumerGroupTargetAssignmentRecord("my-group",
"member-2", mkAssignment(
+ mkTopicAssignment(fooTopicId, 3, 4, 5),
+ mkTopicAssignment(barTopicId, 3, 4, 5)
+ )),
+ newConsumerGroupTargetAssignmentRecord("my-group",
"member-3", mkAssignment(
+ mkTopicAssignment(fooTopicId, 6),
+ mkTopicAssignment(barTopicId, 6)
+ ))
+ ),
+ List.of(
+ newConsumerGroupTargetAssignmentEpochRecord(
+ "my-group",
+ 20
+ )
+ )
+ ),
+ result.records()
+ );
Map<String, MemberAssignment> expectedAssignment = new HashMap<>();
expectedAssignment.put("member-1", new
MemberAssignmentImpl(mkAssignment(
@@ -724,23 +741,27 @@ public class TargetAssignmentBuilderTest {
TargetAssignmentBuilder.TargetAssignmentResult result =
context.build();
- assertEquals(3, result.records().size());
-
- assertUnorderedListEquals(Arrays.asList(
- newConsumerGroupTargetAssignmentRecord("my-group", "member-1",
mkAssignment(
- mkTopicAssignment(fooTopicId, 1, 2, 3),
- mkTopicAssignment(barTopicId, 1, 2, 3)
- )),
- newConsumerGroupTargetAssignmentRecord("my-group", "member-2",
mkAssignment(
- mkTopicAssignment(fooTopicId, 4, 5, 6),
- mkTopicAssignment(barTopicId, 4, 5, 6)
- ))
- ), result.records().subList(0, 2));
-
- assertEquals(newConsumerGroupTargetAssignmentEpochRecord(
- "my-group",
- 20
- ), result.records().get(2));
+ assertUnorderedRecordsEquals(
+ List.of(
+ List.of(
+ newConsumerGroupTargetAssignmentRecord("my-group",
"member-1", mkAssignment(
+ mkTopicAssignment(fooTopicId, 1, 2, 3),
+ mkTopicAssignment(barTopicId, 1, 2, 3)
+ )),
+ newConsumerGroupTargetAssignmentRecord("my-group",
"member-2", mkAssignment(
+ mkTopicAssignment(fooTopicId, 4, 5, 6),
+ mkTopicAssignment(barTopicId, 4, 5, 6)
+ ))
+ ),
+ List.of(
+ newConsumerGroupTargetAssignmentEpochRecord(
+ "my-group",
+ 20
+ )
+ )
+ ),
+ result.records()
+ );
Map<String, MemberAssignment> expectedAssignment = new HashMap<>();
expectedAssignment.put("member-1", new
MemberAssignmentImpl(mkAssignment(
@@ -803,19 +824,19 @@ public class TargetAssignmentBuilderTest {
TargetAssignmentBuilder.TargetAssignmentResult result =
context.build();
- assertEquals(2, result.records().size());
-
- assertUnorderedListEquals(Collections.singletonList(
- newConsumerGroupTargetAssignmentRecord("my-group", "member-3-a",
mkAssignment(
- mkTopicAssignment(fooTopicId, 5, 6),
- mkTopicAssignment(barTopicId, 5, 6)
- ))
- ), result.records().subList(0, 1));
-
- assertEquals(newConsumerGroupTargetAssignmentEpochRecord(
- "my-group",
- 20
- ), result.records().get(1));
+ assertRecordsEquals(
+ List.of(
+ newConsumerGroupTargetAssignmentRecord("my-group",
"member-3-a", mkAssignment(
+ mkTopicAssignment(fooTopicId, 5, 6),
+ mkTopicAssignment(barTopicId, 5, 6)
+ )),
+ newConsumerGroupTargetAssignmentEpochRecord(
+ "my-group",
+ 20
+ )
+ ),
+ result.records()
+ );
Map<String, MemberAssignment> expectedAssignment = new HashMap<>();
expectedAssignment.put("member-1", new
MemberAssignmentImpl(mkAssignment(
@@ -873,26 +894,30 @@ public class TargetAssignmentBuilderTest {
TargetAssignmentBuilder.TargetAssignmentResult result =
context.build();
- assertEquals(4, result.records().size());
-
- assertUnorderedListEquals(Arrays.asList(
- newConsumerGroupTargetAssignmentRecord("my-group", "member-1",
mkAssignment(
- mkTopicAssignment(fooTopicId, 1, 2),
- mkTopicAssignment(barTopicId, 1, 2, 3)
- )),
- newConsumerGroupTargetAssignmentRecord("my-group", "member-2",
mkAssignment(
- mkTopicAssignment(fooTopicId, 3, 4),
- mkTopicAssignment(barTopicId, 4, 5, 6)
- )),
- newConsumerGroupTargetAssignmentRecord("my-group", "member-3",
mkAssignment(
- mkTopicAssignment(fooTopicId, 5, 6)
- ))
- ), result.records().subList(0, 3));
-
- assertEquals(newConsumerGroupTargetAssignmentEpochRecord(
- "my-group",
- 20
- ), result.records().get(3));
+ assertUnorderedRecordsEquals(
+ List.of(
+ List.of(
+ newConsumerGroupTargetAssignmentRecord("my-group",
"member-1", mkAssignment(
+ mkTopicAssignment(fooTopicId, 1, 2),
+ mkTopicAssignment(barTopicId, 1, 2, 3)
+ )),
+ newConsumerGroupTargetAssignmentRecord("my-group",
"member-2", mkAssignment(
+ mkTopicAssignment(fooTopicId, 3, 4),
+ mkTopicAssignment(barTopicId, 4, 5, 6)
+ )),
+ newConsumerGroupTargetAssignmentRecord("my-group",
"member-3", mkAssignment(
+ mkTopicAssignment(fooTopicId, 5, 6)
+ ))
+ ),
+ List.of(
+ newConsumerGroupTargetAssignmentEpochRecord(
+ "my-group",
+ 20
+ )
+ )
+ ),
+ result.records()
+ );
Map<String, MemberAssignment> expectedAssignment = new HashMap<>();
expectedAssignment.put("member-1", new
MemberAssignmentImpl(mkAssignment(