lucasbru commented on code in PR #18809:
URL: https://github.com/apache/kafka/pull/18809#discussion_r1952918550
##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java:
##########
@@ -15135,6 +15284,229 @@ public void
testReplayConsumerGroupCurrentMemberAssignmentTombstone() {
assertThrows(GroupIdNotFoundException.class, () ->
context.groupMetadataManager.consumerGroup("bar"));
}
+ @Test
+ public void testReplayStreamsGroupMemberMetadata() {
+ GroupMetadataManagerTestContext context = new
GroupMetadataManagerTestContext.Builder()
+ .build();
+
+ StreamsGroupMember member = new StreamsGroupMember.Builder("member")
+ .setClientId("clientid")
+ .setClientHost("clienthost")
+ .setRackId("rackid")
+ .setInstanceId("instanceid")
+ .setRebalanceTimeoutMs(1000)
+ .setTopologyEpoch(10)
+ .setProcessId("processid")
+ .setUserEndpoint(new Endpoint().setHost("localhost").setPort(9999))
+ .setClientTags(Collections.singletonMap("key", "value"))
+ .build();
+
+ // The group and the member are created if they do not exist.
+
context.replay(StreamsCoordinatorRecordHelpers.newStreamsGroupMemberRecord("foo",
member));
+ assertEquals(member,
context.groupMetadataManager.streamsGroup("foo").getOrMaybeCreateMember("member",
false));
+ }
+
+ @Test
+ public void testReplayStreamsGroupMemberMetadataTombstone() {
+ GroupMetadataManagerTestContext context = new
GroupMetadataManagerTestContext.Builder()
+ .build();
+
+ // The group still exists but the member is already gone. Replaying the
+ // StreamsGroupMemberMetadata tombstone should be a no-op.
+
context.replay(StreamsCoordinatorRecordHelpers.newStreamsGroupEpochRecord("foo",
10));
+
context.replay(StreamsCoordinatorRecordHelpers.newStreamsGroupMemberTombstoneRecord("foo",
"m1"));
+ assertThrows(UnknownMemberIdException.class, () ->
context.groupMetadataManager.streamsGroup("foo").getOrMaybeCreateMember("m1",
false));
+
+ // The group may not exist at all. Replaying the StreamsGroupMemberMetadata tombstone
+ // should be a no-op.
+
context.replay(StreamsCoordinatorRecordHelpers.newStreamsGroupMemberTombstoneRecord("bar",
"m1"));
+ assertThrows(GroupIdNotFoundException.class, () ->
context.groupMetadataManager.streamsGroup("bar"));
+ }
+
+ @Test
+ public void testReplayStreamsGroupMetadata() {
+ GroupMetadataManagerTestContext context = new
GroupMetadataManagerTestContext.Builder()
+ .build();
+
+ // The group is created if it does not exist.
+
context.replay(StreamsCoordinatorRecordHelpers.newStreamsGroupEpochRecord("foo",
10));
+ assertEquals(10,
context.groupMetadataManager.streamsGroup("foo").groupEpoch());
+ }
+
+ @Test
+ public void testReplayStreamsGroupMetadataTombstone() {
+ GroupMetadataManagerTestContext context = new
GroupMetadataManagerTestContext.Builder()
+ .build();
+
+ // The group may not exist at all. Replaying the StreamsGroupMetadata tombstone
+ // should be a no-op.
+
context.replay(StreamsCoordinatorRecordHelpers.newStreamsGroupEpochTombstoneRecord("foo"));
+ assertThrows(GroupIdNotFoundException.class, () ->
context.groupMetadataManager.streamsGroup("foo"));
+ }
+
+ @Test
+ public void testReplayStreamsGroupPartitionMetadata() {
+ GroupMetadataManagerTestContext context = new
GroupMetadataManagerTestContext.Builder()
+ .build();
+
+ Map<String, org.apache.kafka.coordinator.group.streams.TopicMetadata>
metadata = Map.of(
+ "bar",
+ new
org.apache.kafka.coordinator.group.streams.TopicMetadata(Uuid.randomUuid(),
"bar", 10)
+ );
+
+ // The group is created if it does not exist.
+
context.replay(StreamsCoordinatorRecordHelpers.newStreamsGroupPartitionMetadataRecord("foo",
metadata));
+ assertEquals(metadata,
context.groupMetadataManager.streamsGroup("foo").partitionMetadata());
+ }
+
+ @Test
+ public void testReplayStreamsGroupPartitionMetadataTombstone() {
+ GroupMetadataManagerTestContext context = new
GroupMetadataManagerTestContext.Builder()
+ .build();
+
+ // The group may not exist at all. Replaying the StreamsGroupPartitionMetadata tombstone
+ // should be a no-op.
+
context.replay(StreamsCoordinatorRecordHelpers.newStreamsGroupPartitionMetadataTombstoneRecord("foo"));
+ assertThrows(GroupIdNotFoundException.class, () ->
context.groupMetadataManager.streamsGroup("foo"));
+ }
+
+ @Test
+ public void testReplayStreamsGroupTargetAssignmentMember() {
+ GroupMetadataManagerTestContext context = new
GroupMetadataManagerTestContext.Builder()
+ .build();
+
+ // The group is created if it does not exist.
+ final TasksTuple tasks =
+ new TasksTuple(
+ TaskAssignmentTestUtil.mkTasksPerSubtopology(
+ TaskAssignmentTestUtil.mkTasks("subtopology-1", 0, 1, 2)),
+ TaskAssignmentTestUtil.mkTasksPerSubtopology(
+ TaskAssignmentTestUtil.mkTasks("subtopology-1", 3, 4, 5)),
+ TaskAssignmentTestUtil.mkTasksPerSubtopology(
+ TaskAssignmentTestUtil.mkTasks("subtopology-1", 6, 7, 8))
+ );
+
context.replay(StreamsCoordinatorRecordHelpers.newStreamsGroupTargetAssignmentRecord("foo",
"m1", tasks));
+ assertEquals(tasks.activeTasks(),
context.groupMetadataManager.streamsGroup("foo").targetAssignment("m1").activeTasks());
+ assertEquals(tasks.standbyTasks(),
context.groupMetadataManager.streamsGroup("foo").targetAssignment("m1").standbyTasks());
+ assertEquals(tasks.warmupTasks(),
context.groupMetadataManager.streamsGroup("foo").targetAssignment("m1").warmupTasks());
+ }
+
+ @Test
+ public void testReplayStreamsGroupTargetAssignmentMemberTombstone() {
+ GroupMetadataManagerTestContext context = new
GroupMetadataManagerTestContext.Builder()
+ .build();
+
+ // The group may not exist at all. Replaying the StreamsGroupTargetAssignmentMember tombstone
+ // should be a no-op.
+
context.replay(StreamsCoordinatorRecordHelpers.newStreamsGroupTargetAssignmentTombstoneRecord("foo",
"m1"));
+ assertThrows(GroupIdNotFoundException.class, () ->
context.groupMetadataManager.streamsGroup("foo"));
+ }
+
+ @Test
+ public void testReplayStreamsGroupTargetAssignmentMetadata() {
+ GroupMetadataManagerTestContext context = new
GroupMetadataManagerTestContext.Builder()
+ .build();
+
+ // The group is created if it does not exist.
+
context.replay(StreamsCoordinatorRecordHelpers.newStreamsGroupTargetAssignmentEpochRecord("foo",
10));
+ assertEquals(10,
context.groupMetadataManager.streamsGroup("foo").assignmentEpoch());
+ }
+
+ @Test
+ public void testReplayStreamsGroupTargetAssignmentMetadataTombstone() {
+ GroupMetadataManagerTestContext context = new
GroupMetadataManagerTestContext.Builder()
+ .build();
+
+ // The group may not exist at all. Replaying the StreamsGroupTargetAssignmentMetadata tombstone
+ // should be a no-op.
+
context.replay(StreamsCoordinatorRecordHelpers.newStreamsGroupTargetAssignmentEpochTombstoneRecord("foo"));
+ assertThrows(GroupIdNotFoundException.class, () ->
context.groupMetadataManager.streamsGroup("foo"));
+ }
+
+ @Test
+ public void testReplayStreamsGroupCurrentMemberAssignment() {
+ GroupMetadataManagerTestContext context = new
GroupMetadataManagerTestContext.Builder()
+ .build();
+
+ StreamsGroupMember member = new StreamsGroupMember.Builder("member")
+ .setMemberEpoch(10)
+ .setPreviousMemberEpoch(9)
+
.setState(org.apache.kafka.coordinator.group.streams.MemberState.UNRELEASED_TASKS)
+ .setAssignedTasks(new TasksTuple(
+
TaskAssignmentTestUtil.mkTasksPerSubtopology(TaskAssignmentTestUtil.mkTasks("subtopology-1",
0, 1, 2)),
+
TaskAssignmentTestUtil.mkTasksPerSubtopology(TaskAssignmentTestUtil.mkTasks("subtopology-1",
3, 4, 5)),
+
TaskAssignmentTestUtil.mkTasksPerSubtopology(TaskAssignmentTestUtil.mkTasks("subtopology-1",
6, 7, 8))
+ ))
+ .setTasksPendingRevocation(TasksTuple.EMPTY)
+ .build();
+
+ // The group and the member are created if they do not exist.
+
context.replay(StreamsCoordinatorRecordHelpers.newStreamsGroupCurrentAssignmentRecord("bar",
member));
+ assertEquals(member,
context.groupMetadataManager.streamsGroup("bar").getOrMaybeCreateMember("member",
false));
+ }
+
+ @Test
+ public void testReplayStreamsGroupCurrentMemberAssignmentTombstone() {
+ GroupMetadataManagerTestContext context = new
GroupMetadataManagerTestContext.Builder()
+ .build();
+
+ // The group still exists, but the member is already gone. Replaying the
+ // StreamsGroupCurrentMemberAssignment tombstone should be a no-op.
+
context.replay(StreamsCoordinatorRecordHelpers.newStreamsGroupEpochRecord("foo",
10));
+
context.replay(StreamsCoordinatorRecordHelpers.newStreamsGroupCurrentAssignmentTombstoneRecord("foo",
"m1"));
+ assertThrows(UnknownMemberIdException.class, () ->
context.groupMetadataManager.streamsGroup("foo").getOrMaybeCreateMember("m1",
false));
+
+ // The group may not exist at all. Replaying the StreamsGroupCurrentMemberAssignment tombstone
+ // should be a no-op.
+
context.replay(StreamsCoordinatorRecordHelpers.newStreamsGroupCurrentAssignmentTombstoneRecord("bar",
"m1"));
+ assertThrows(GroupIdNotFoundException.class, () ->
context.groupMetadataManager.streamsGroup("bar"));
+ }
+
+ @Test
+ public void testReplayStreamsGroupTopology() {
+ GroupMetadataManagerTestContext context = new
GroupMetadataManagerTestContext.Builder()
+ .build();
+
+ StreamsGroupTopologyValue topology = new StreamsGroupTopologyValue()
+ .setEpoch(12)
+ .setSubtopologies(
+ List.of(
+ new StreamsGroupTopologyValue.Subtopology()
+ .setSubtopologyId("subtopology-1")
+ .setSourceTopics(List.of("source-topic"))
+ .setRepartitionSinkTopics(List.of("sink-topic"))
+ )
+ );
+
+ // The group and the topology are created if they do not exist.
+
context.replay(StreamsCoordinatorRecordHelpers.newStreamsGroupTopologyRecord("bar",
topology));
+ final Optional<StreamsTopology> actualTopology =
context.groupMetadataManager.streamsGroup("bar").topology();
+ assertTrue(actualTopology.isPresent(), "topology should be set");
+ assertEquals(topology.epoch(), actualTopology.get().topologyEpoch());
+ assertEquals(topology.subtopologies().size(),
actualTopology.get().subtopologies().size());
+ assertEquals(
+ topology.subtopologies().iterator().next(),
+ actualTopology.get().subtopologies().values().iterator().next()
+ );
+ }
+
+ @Test
+ public void testReplayStreamsGroupTopologyTombstone() {
+ GroupMetadataManagerTestContext context = new
GroupMetadataManagerTestContext.Builder()
Review Comment:
Done
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]