lucasbru commented on code in PR #18809:
URL: https://github.com/apache/kafka/pull/18809#discussion_r1952919339
##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java:
##########
@@ -15135,6 +15284,229 @@ public void testReplayConsumerGroupCurrentMemberAssignmentTombstone() {
assertThrows(GroupIdNotFoundException.class, () ->
context.groupMetadataManager.consumerGroup("bar"));
}
+ @Test
+ public void testReplayStreamsGroupMemberMetadata() {
+ GroupMetadataManagerTestContext context = new
GroupMetadataManagerTestContext.Builder()
+ .build();
+
+ StreamsGroupMember member = new StreamsGroupMember.Builder("member")
+ .setClientId("clientid")
+ .setClientHost("clienthost")
+ .setRackId("rackid")
+ .setInstanceId("instanceid")
+ .setRebalanceTimeoutMs(1000)
+ .setTopologyEpoch(10)
+ .setProcessId("processid")
+ .setUserEndpoint(new Endpoint().setHost("localhost").setPort(9999))
+ .setClientTags(Collections.singletonMap("key", "value"))
+ .build();
+
+ // The group and the member are created if they do not exist.
+
context.replay(StreamsCoordinatorRecordHelpers.newStreamsGroupMemberRecord("foo",
member));
+ assertEquals(member,
context.groupMetadataManager.streamsGroup("foo").getOrMaybeCreateMember("member",
false));
+ }
+
+ @Test
+ public void testReplayStreamsGroupMemberMetadataTombstone() {
+ GroupMetadataManagerTestContext context = new
GroupMetadataManagerTestContext.Builder()
+ .build();
+
+ // The group still exists but the member is already gone. Replaying the
+ // StreamsGroupMemberMetadata tombstone should be a no-op.
+
context.replay(StreamsCoordinatorRecordHelpers.newStreamsGroupEpochRecord("foo",
10));
+
context.replay(StreamsCoordinatorRecordHelpers.newStreamsGroupMemberTombstoneRecord("foo",
"m1"));
+ assertThrows(UnknownMemberIdException.class, () ->
context.groupMetadataManager.streamsGroup("foo").getOrMaybeCreateMember("m1",
false));
+
+ // The group may not exist at all. Replaying the StreamsGroupMemberMetadata tombstone
+ // should be a no-op.
+
context.replay(StreamsCoordinatorRecordHelpers.newStreamsGroupMemberTombstoneRecord("bar",
"m1"));
+ assertThrows(GroupIdNotFoundException.class, () ->
context.groupMetadataManager.streamsGroup("bar"));
+ }
+
+ @Test
+ public void testReplayStreamsGroupMetadata() {
+ GroupMetadataManagerTestContext context = new
GroupMetadataManagerTestContext.Builder()
+ .build();
+
+ // The group is created if it does not exist.
+
context.replay(StreamsCoordinatorRecordHelpers.newStreamsGroupEpochRecord("foo",
10));
+ assertEquals(10,
context.groupMetadataManager.streamsGroup("foo").groupEpoch());
+ }
+
+ @Test
+ public void testReplayStreamsGroupMetadataTombstone() {
+ GroupMetadataManagerTestContext context = new
GroupMetadataManagerTestContext.Builder()
+ .build();
+
+ // The group may not exist at all. Replaying the StreamsGroupMetadata tombstone
+ // should be a no-op.
+
context.replay(StreamsCoordinatorRecordHelpers.newStreamsGroupEpochTombstoneRecord("foo"));
+ assertThrows(GroupIdNotFoundException.class, () ->
context.groupMetadataManager.streamsGroup("foo"));
+ }
+
+ @Test
+ public void testReplayStreamsGroupPartitionMetadata() {
+ GroupMetadataManagerTestContext context = new
GroupMetadataManagerTestContext.Builder()
+ .build();
+
+ Map<String, org.apache.kafka.coordinator.group.streams.TopicMetadata>
metadata = Map.of(
+ "bar",
+ new
org.apache.kafka.coordinator.group.streams.TopicMetadata(Uuid.randomUuid(),
"bar", 10)
+ );
+
+ // The group is created if it does not exist.
+
context.replay(StreamsCoordinatorRecordHelpers.newStreamsGroupPartitionMetadataRecord("foo",
metadata));
+ assertEquals(metadata,
context.groupMetadataManager.streamsGroup("foo").partitionMetadata());
+ }
+
+ @Test
+ public void testReplayStreamsGroupPartitionMetadataTombstone() {
Review Comment:
Done
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]