cadonna commented on code in PR #18809:
URL: https://github.com/apache/kafka/pull/18809#discussion_r1952406373
##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##########
@@ -3138,6 +3274,45 @@ private void replaceMember(
));
}
+ /**
+ * Fences a member from a streams group and maybe downgrade the streams
group to a classic group.
+ *
+ * @param group The group.
+ * @param member The member.
+ * @param response The response of the CoordinatorResult.
+ *
+ * @return The CoordinatorResult to be applied.
+ */
+ private <T> CoordinatorResult<T, CoordinatorRecord>
streamsGroupFenceMember(
+ StreamsGroup group,
+ StreamsGroupMember member,
+ T response
+ ) {
+ List<CoordinatorRecord> records = new ArrayList<>();
+ removeStreamsMember(records, group.groupId(), member.memberId());
+
+ // We bump the group epoch.
+ int groupEpoch = group.groupEpoch() + 1;
+ records.add(newStreamsGroupEpochRecord(group.groupId(), groupEpoch));
+
+ cancelTimers(group.groupId(), member.memberId());
+
+ return new CoordinatorResult<>(records, response);
+ }
+
+ /**
+ * Write tombstones for the member. The order matters here.
+ *
+ * @param records The list of records to append the member
assignment tombstone records.
+ * @param groupId The group id.
+ * @param memberId The member id.
+ */
+ private void removeStreamsMember(List<CoordinatorRecord> records, String
groupId, String memberId) {
Review Comment:
nit: Why not return a list of records?
##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##########
@@ -3329,6 +3550,27 @@ private void scheduleShareGroupSessionTimeout(
);
}
+ /**
+ * Schedules (or reschedules) the session timeout for the member.
+ *
+ * @param groupId The group id.
+ * @param memberId The member id.
+ * @param sessionTimeoutMs The session timeout.
+ */
+ private void scheduleStreamsGroupSessionTimeout(
Review Comment:
This is only used in `scheduleStreamsGroupSessionTimeout(groupId,
memberId)`. Will this be used somewhere else in future? Why not calling
`timer.schedule()` directly in `scheduleStreamsGroupSessionTimeout(groupId,
memberId)`?
##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorShardTest.java:
##########
@@ -830,6 +844,385 @@ public void
testReplayConsumerGroupCurrentMemberAssignmentWithNullValue() {
verify(groupMetadataManager, times(1)).replay(key, null);
}
+ @Test
+ public void testReplayStreamsGroupMetadata() {
+ GroupMetadataManager groupMetadataManager =
mock(GroupMetadataManager.class);
+ OffsetMetadataManager offsetMetadataManager =
mock(OffsetMetadataManager.class);
+ CoordinatorMetrics coordinatorMetrics = mock(CoordinatorMetrics.class);
+ CoordinatorMetricsShard metricsShard =
mock(CoordinatorMetricsShard.class);
+ GroupCoordinatorShard coordinator = new GroupCoordinatorShard(
+ new LogContext(),
+ groupMetadataManager,
+ offsetMetadataManager,
+ Time.SYSTEM,
+ new MockCoordinatorTimer<>(Time.SYSTEM),
+ mock(GroupCoordinatorConfig.class),
+ coordinatorMetrics,
+ metricsShard
+ );
+
+ StreamsGroupMetadataKey key = new StreamsGroupMetadataKey();
+ StreamsGroupMetadataValue value = new StreamsGroupMetadataValue();
+
+ coordinator.replay(0L, RecordBatch.NO_PRODUCER_ID,
RecordBatch.NO_PRODUCER_EPOCH, CoordinatorRecord.record(
+ key,
+ new ApiMessageAndVersion(value, (short) 0)
+ ));
+
+ verify(groupMetadataManager, times(1)).replay(key, value);
Review Comment:
You could drop `times(1)` since it is the default. I am fine either way.
##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##########
@@ -3385,6 +3627,49 @@ private void scheduleConsumerGroupRebalanceTimeout(
});
}
+ /**
+ * Schedules a rebalance timeout for the member.
+ *
+ * @param groupId The group id.
+ * @param memberId The member id.
+ * @param memberEpoch The member epoch.
+ * @param rebalanceTimeoutMs The rebalance timeout.
+ */
+ private void scheduleStreamsGroupRebalanceTimeout(
+ String groupId,
+ String memberId,
+ int memberEpoch,
+ int rebalanceTimeoutMs
+ ) {
+ String key = streamsGroupRebalanceTimeoutKey(groupId, memberId);
+ timer.schedule(key, rebalanceTimeoutMs, TimeUnit.MILLISECONDS, true,
() -> {
+ try {
+ StreamsGroup group = streamsGroup(groupId);
+ StreamsGroupMember member =
group.getOrMaybeCreateMember(memberId, false);
+
+ if (member.memberEpoch() == memberEpoch) {
+ log.info("[GroupId {}] Member {} fenced from the group
because " +
+ "it failed to transition from epoch {} within
{}ms.",
+ groupId, memberId, memberEpoch, rebalanceTimeoutMs);
+
+ return streamsGroupFenceMember(group, member, null);
+ } else {
+ log.debug("[GroupId {}] Ignoring rebalance timeout for {}
because the member " +
+ "left the epoch {}.", groupId, memberId, memberEpoch);
Review Comment:
What does "leave the epoch" mean?
##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java:
##########
@@ -15135,6 +15284,229 @@ public void
testReplayConsumerGroupCurrentMemberAssignmentTombstone() {
assertThrows(GroupIdNotFoundException.class, () ->
context.groupMetadataManager.consumerGroup("bar"));
}
+ @Test
+ public void testReplayStreamsGroupMemberMetadata() {
+ GroupMetadataManagerTestContext context = new
GroupMetadataManagerTestContext.Builder()
+ .build();
+
+ StreamsGroupMember member = new StreamsGroupMember.Builder("member")
+ .setClientId("clientid")
+ .setClientHost("clienthost")
+ .setRackId("rackid")
+ .setInstanceId("instanceid")
+ .setRebalanceTimeoutMs(1000)
+ .setTopologyEpoch(10)
+ .setProcessId("processid")
+ .setUserEndpoint(new Endpoint().setHost("localhost").setPort(9999))
+ .setClientTags(Collections.singletonMap("key", "value"))
+ .build();
+
+ // The group and the member are created if they do not exist.
+
context.replay(StreamsCoordinatorRecordHelpers.newStreamsGroupMemberRecord("foo",
member));
+ assertEquals(member,
context.groupMetadataManager.streamsGroup("foo").getOrMaybeCreateMember("member",
false));
+ }
+
+ @Test
+ public void testReplayStreamsGroupMemberMetadataTombstone() {
+ GroupMetadataManagerTestContext context = new
GroupMetadataManagerTestContext.Builder()
+ .build();
+
+ // The group still exists but the member is already gone. Replaying the
+ // StreamsGroupMemberMetadata tombstone should be a no-op.
+
context.replay(StreamsCoordinatorRecordHelpers.newStreamsGroupEpochRecord("foo",
10));
+
context.replay(StreamsCoordinatorRecordHelpers.newStreamsGroupMemberTombstoneRecord("foo",
"m1"));
+ assertThrows(UnknownMemberIdException.class, () ->
context.groupMetadataManager.streamsGroup("foo").getOrMaybeCreateMember("m1",
false));
+
+ // The group may not exist at all. Replaying the
StreamsGroupMemberMetadata tombstone
+ // should be a no-op.
+
context.replay(StreamsCoordinatorRecordHelpers.newStreamsGroupMemberTombstoneRecord("bar",
"m1"));
+ assertThrows(GroupIdNotFoundException.class, () ->
context.groupMetadataManager.streamsGroup("bar"));
+ }
+
+ @Test
+ public void testReplayStreamsGroupMetadata() {
+ GroupMetadataManagerTestContext context = new
GroupMetadataManagerTestContext.Builder()
+ .build();
+
+ // The group is created if it does not exist.
+
context.replay(StreamsCoordinatorRecordHelpers.newStreamsGroupEpochRecord("foo",
10));
+ assertEquals(10,
context.groupMetadataManager.streamsGroup("foo").groupEpoch());
+ }
+
+ @Test
+ public void testReplayStreamsGroupMetadataTombstone() {
+ GroupMetadataManagerTestContext context = new
GroupMetadataManagerTestContext.Builder()
+ .build();
+
+ // The group may not exist at all. Replaying the StreamsGroupMetadata
tombstone
+ // should be a no-op.
+
context.replay(StreamsCoordinatorRecordHelpers.newStreamsGroupEpochTombstoneRecord("foo"));
+ assertThrows(GroupIdNotFoundException.class, () ->
context.groupMetadataManager.streamsGroup("foo"));
+ }
+
+ @Test
+ public void testReplayStreamsGroupPartitionMetadata() {
+ GroupMetadataManagerTestContext context = new
GroupMetadataManagerTestContext.Builder()
+ .build();
+
+ Map<String, org.apache.kafka.coordinator.group.streams.TopicMetadata>
metadata = Map.of(
+ "bar",
+ new
org.apache.kafka.coordinator.group.streams.TopicMetadata(Uuid.randomUuid(),
"bar", 10)
+ );
+
+ // The group is created if it does not exist.
+
context.replay(StreamsCoordinatorRecordHelpers.newStreamsGroupPartitionMetadataRecord("foo",
metadata));
+ assertEquals(metadata,
context.groupMetadataManager.streamsGroup("foo").partitionMetadata());
+ }
+
+ @Test
+ public void testReplayStreamsGroupPartitionMetadataTombstone() {
Review Comment:
Could you please also add a test that removes an existing group?
What about all the `IllegalStateException` that are thrown in the method
under test?
##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##########
@@ -3138,6 +3274,45 @@ private void replaceMember(
));
}
+ /**
+ * Fences a member from a streams group and maybe downgrade the streams
group to a classic group.
Review Comment:
Is the classic part true for Streams?
##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java:
##########
@@ -15135,6 +15284,229 @@ public void
testReplayConsumerGroupCurrentMemberAssignmentTombstone() {
assertThrows(GroupIdNotFoundException.class, () ->
context.groupMetadataManager.consumerGroup("bar"));
}
+ @Test
+ public void testReplayStreamsGroupMemberMetadata() {
+ GroupMetadataManagerTestContext context = new
GroupMetadataManagerTestContext.Builder()
+ .build();
+
+ StreamsGroupMember member = new StreamsGroupMember.Builder("member")
+ .setClientId("clientid")
+ .setClientHost("clienthost")
+ .setRackId("rackid")
+ .setInstanceId("instanceid")
+ .setRebalanceTimeoutMs(1000)
+ .setTopologyEpoch(10)
+ .setProcessId("processid")
+ .setUserEndpoint(new Endpoint().setHost("localhost").setPort(9999))
+ .setClientTags(Collections.singletonMap("key", "value"))
+ .build();
+
+ // The group and the member are created if they do not exist.
+
context.replay(StreamsCoordinatorRecordHelpers.newStreamsGroupMemberRecord("foo",
member));
+ assertEquals(member,
context.groupMetadataManager.streamsGroup("foo").getOrMaybeCreateMember("member",
false));
+ }
+
+ @Test
+ public void testReplayStreamsGroupMemberMetadataTombstone() {
+ GroupMetadataManagerTestContext context = new
GroupMetadataManagerTestContext.Builder()
+ .build();
+
+ // The group still exists but the member is already gone. Replaying the
+ // StreamsGroupMemberMetadata tombstone should be a no-op.
+
context.replay(StreamsCoordinatorRecordHelpers.newStreamsGroupEpochRecord("foo",
10));
+
context.replay(StreamsCoordinatorRecordHelpers.newStreamsGroupMemberTombstoneRecord("foo",
"m1"));
+ assertThrows(UnknownMemberIdException.class, () ->
context.groupMetadataManager.streamsGroup("foo").getOrMaybeCreateMember("m1",
false));
+
+ // The group may not exist at all. Replaying the
StreamsGroupMemberMetadata tombstone
+ // should be a no-op.
+
context.replay(StreamsCoordinatorRecordHelpers.newStreamsGroupMemberTombstoneRecord("bar",
"m1"));
+ assertThrows(GroupIdNotFoundException.class, () ->
context.groupMetadataManager.streamsGroup("bar"));
+ }
+
+ @Test
+ public void testReplayStreamsGroupMetadata() {
+ GroupMetadataManagerTestContext context = new
GroupMetadataManagerTestContext.Builder()
+ .build();
+
+ // The group is created if it does not exist.
+
context.replay(StreamsCoordinatorRecordHelpers.newStreamsGroupEpochRecord("foo",
10));
+ assertEquals(10,
context.groupMetadataManager.streamsGroup("foo").groupEpoch());
+ }
+
+ @Test
+ public void testReplayStreamsGroupMetadataTombstone() {
+ GroupMetadataManagerTestContext context = new
GroupMetadataManagerTestContext.Builder()
+ .build();
+
+ // The group may not exist at all. Replaying the StreamsGroupMetadata
tombstone
+ // should be a no-op.
+
context.replay(StreamsCoordinatorRecordHelpers.newStreamsGroupEpochTombstoneRecord("foo"));
+ assertThrows(GroupIdNotFoundException.class, () ->
context.groupMetadataManager.streamsGroup("foo"));
+ }
+
+ @Test
+ public void testReplayStreamsGroupPartitionMetadata() {
+ GroupMetadataManagerTestContext context = new
GroupMetadataManagerTestContext.Builder()
+ .build();
+
+ Map<String, org.apache.kafka.coordinator.group.streams.TopicMetadata>
metadata = Map.of(
+ "bar",
+ new
org.apache.kafka.coordinator.group.streams.TopicMetadata(Uuid.randomUuid(),
"bar", 10)
+ );
+
+ // The group is created if it does not exist.
+
context.replay(StreamsCoordinatorRecordHelpers.newStreamsGroupPartitionMetadataRecord("foo",
metadata));
+ assertEquals(metadata,
context.groupMetadataManager.streamsGroup("foo").partitionMetadata());
+ }
+
+ @Test
+ public void testReplayStreamsGroupPartitionMetadataTombstone() {
+ GroupMetadataManagerTestContext context = new
GroupMetadataManagerTestContext.Builder()
+ .build();
+
+ // The group may not exist at all. Replaying the
StreamsGroupPartitionMetadata tombstone
+ // should be a no-op.
+
context.replay(StreamsCoordinatorRecordHelpers.newStreamsGroupPartitionMetadataTombstoneRecord("foo"));
+ assertThrows(GroupIdNotFoundException.class, () ->
context.groupMetadataManager.streamsGroup("foo"));
+ }
+
+ @Test
+ public void testReplayStreamsGroupTargetAssignmentMember() {
+ GroupMetadataManagerTestContext context = new
GroupMetadataManagerTestContext.Builder()
+ .build();
+
+ // The group is created if it does not exist.
+ final TasksTuple tasks =
+ new TasksTuple(
+ TaskAssignmentTestUtil.mkTasksPerSubtopology(
+ TaskAssignmentTestUtil.mkTasks("subtopology-1", 0, 1, 2)),
+ TaskAssignmentTestUtil.mkTasksPerSubtopology(
+ TaskAssignmentTestUtil.mkTasks("subtopology-1", 3, 4, 5)),
+ TaskAssignmentTestUtil.mkTasksPerSubtopology(
+ TaskAssignmentTestUtil.mkTasks("subtopology-1", 6, 7, 8))
+ );
+
context.replay(StreamsCoordinatorRecordHelpers.newStreamsGroupTargetAssignmentRecord("foo",
"m1", tasks));
+ assertEquals(tasks.activeTasks(),
context.groupMetadataManager.streamsGroup("foo").targetAssignment("m1").activeTasks());
+ assertEquals(tasks.standbyTasks(),
context.groupMetadataManager.streamsGroup("foo").targetAssignment("m1").standbyTasks());
+ assertEquals(tasks.warmupTasks(),
context.groupMetadataManager.streamsGroup("foo").targetAssignment("m1").warmupTasks());
+ }
+
+ @Test
+ public void testReplayStreamsGroupTargetAssignmentMemberTombstone() {
+ GroupMetadataManagerTestContext context = new
GroupMetadataManagerTestContext.Builder()
+ .build();
+
+ // The group may not exist at all. Replaying the
StreamsGroupTargetAssignmentMember tombstone
+ // should be a no-op.
+
context.replay(StreamsCoordinatorRecordHelpers.newStreamsGroupTargetAssignmentTombstoneRecord("foo",
"m1"));
+ assertThrows(GroupIdNotFoundException.class, () ->
context.groupMetadataManager.streamsGroup("foo"));
+ }
+
+ @Test
+ public void testReplayStreamsGroupTargetAssignmentMetadata() {
+ GroupMetadataManagerTestContext context = new
GroupMetadataManagerTestContext.Builder()
+ .build();
+
+ // The group is created if it does not exist.
+
context.replay(StreamsCoordinatorRecordHelpers.newStreamsGroupTargetAssignmentEpochRecord("foo",
10));
+ assertEquals(10,
context.groupMetadataManager.streamsGroup("foo").assignmentEpoch());
+ }
+
+ @Test
+ public void testReplayStreamsGroupTargetAssignmentMetadataTombstone() {
+ GroupMetadataManagerTestContext context = new
GroupMetadataManagerTestContext.Builder()
+ .build();
+
+ // The group may not exist at all. Replaying the
StreamsGroupTargetAssignmentMetadata tombstone
+ // should be a no-op.
+
context.replay(StreamsCoordinatorRecordHelpers.newStreamsGroupTargetAssignmentEpochTombstoneRecord("foo"));
+ assertThrows(GroupIdNotFoundException.class, () ->
context.groupMetadataManager.streamsGroup("foo"));
+ }
+
+ @Test
+ public void testReplayStreamsGroupCurrentMemberAssignment() {
+ GroupMetadataManagerTestContext context = new
GroupMetadataManagerTestContext.Builder()
+ .build();
+
+ StreamsGroupMember member = new StreamsGroupMember.Builder("member")
+ .setMemberEpoch(10)
+ .setPreviousMemberEpoch(9)
+
.setState(org.apache.kafka.coordinator.group.streams.MemberState.UNRELEASED_TASKS)
+ .setAssignedTasks(new TasksTuple(
+
TaskAssignmentTestUtil.mkTasksPerSubtopology(TaskAssignmentTestUtil.mkTasks("subtopology-1",
0, 1, 2)),
+
TaskAssignmentTestUtil.mkTasksPerSubtopology(TaskAssignmentTestUtil.mkTasks("subtopology-1",
3, 4, 5)),
+
TaskAssignmentTestUtil.mkTasksPerSubtopology(TaskAssignmentTestUtil.mkTasks("subtopology-1",
6, 7, 8))
+ ))
+ .setTasksPendingRevocation(TasksTuple.EMPTY)
+ .build();
+
+ // The group and the member are created if they do not exist.
+
context.replay(StreamsCoordinatorRecordHelpers.newStreamsGroupCurrentAssignmentRecord("bar",
member));
+ assertEquals(member,
context.groupMetadataManager.streamsGroup("bar").getOrMaybeCreateMember("member",
false));
+ }
+
+ @Test
+ public void testReplayStreamsGroupCurrentMemberAssignmentTombstone() {
+ GroupMetadataManagerTestContext context = new
GroupMetadataManagerTestContext.Builder()
+ .build();
+
+ // The group still exists, but the member is already gone. Replaying
the
+ // StreamsGroupCurrentMemberAssignment tombstone should be a no-op.
+
context.replay(StreamsCoordinatorRecordHelpers.newStreamsGroupEpochRecord("foo",
10));
+
context.replay(StreamsCoordinatorRecordHelpers.newStreamsGroupCurrentAssignmentTombstoneRecord("foo",
"m1"));
+ assertThrows(UnknownMemberIdException.class, () ->
context.groupMetadataManager.streamsGroup("foo").getOrMaybeCreateMember("m1",
false));
+
+ // The group may not exist at all. Replaying the
StreamsGroupCurrentMemberAssignment tombstone
+ // should be a no-op.
+
context.replay(StreamsCoordinatorRecordHelpers.newStreamsGroupCurrentAssignmentTombstoneRecord("bar",
"m1"));
+ assertThrows(GroupIdNotFoundException.class, () ->
context.groupMetadataManager.streamsGroup("bar"));
+ }
+
+ @Test
+ public void testReplayStreamsGroupTopology() {
+ GroupMetadataManagerTestContext context = new
GroupMetadataManagerTestContext.Builder()
+ .build();
+
+ StreamsGroupTopologyValue topology = new StreamsGroupTopologyValue()
+ .setEpoch(12)
+ .setSubtopologies(
+ List.of(
+ new StreamsGroupTopologyValue.Subtopology()
+ .setSubtopologyId("subtopology-1")
+ .setSourceTopics(List.of("source-topic"))
+ .setRepartitionSinkTopics(List.of("sink-topic"))
+ )
+ );
+
+ // The group and the topology are created if they do not exist.
+
context.replay(StreamsCoordinatorRecordHelpers.newStreamsGroupTopologyRecord("bar",
topology));
+ final Optional<StreamsTopology> actualTopology =
context.groupMetadataManager.streamsGroup("bar").topology();
+ assertTrue(actualTopology.isPresent(), "topology should be set");
+ assertEquals(topology.epoch(), actualTopology.get().topologyEpoch());
+ assertEquals(topology.subtopologies().size(),
actualTopology.get().subtopologies().size());
+ assertEquals(
+ topology.subtopologies().iterator().next(),
+ actualTopology.get().subtopologies().values().iterator().next()
+ );
+ }
+
+ @Test
+ public void testReplayStreamsGroupTopologyTombstone() {
+ GroupMetadataManagerTestContext context = new
GroupMetadataManagerTestContext.Builder()
Review Comment:
Could you please add a test for when a group exists?
##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##########
@@ -760,6 +859,43 @@ ConsumerGroup getOrMaybeCreatePersistedConsumerGroup(
}
}
+ /**
+ * The method should be called on the replay path.
+ * Gets or maybe creates a streams group and updates the groups map if a
new group is created.
+ *
+ * @param groupId The group id.
+ * @param createIfNotExists A boolean indicating whether the group should
be
+ * created if it does not exist.
+ *
+ * @return A StreamsGroup.
+ * @throws GroupIdNotFoundException if the group does not exist and
createIfNotExists is false or
+ * if the group is not a streams group.
+ * @throws IllegalStateException if the group does not have the
expected type.
+ * Package private for testing.
+ */
+ StreamsGroup getOrMaybeCreatePersistedStreamsGroup(
Review Comment:
why is this method package-private? It is not used outside of this class.
##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java:
##########
@@ -15135,6 +15284,229 @@ public void
testReplayConsumerGroupCurrentMemberAssignmentTombstone() {
assertThrows(GroupIdNotFoundException.class, () ->
context.groupMetadataManager.consumerGroup("bar"));
}
+ @Test
+ public void testReplayStreamsGroupMemberMetadata() {
+ GroupMetadataManagerTestContext context = new
GroupMetadataManagerTestContext.Builder()
+ .build();
+
+ StreamsGroupMember member = new StreamsGroupMember.Builder("member")
+ .setClientId("clientid")
+ .setClientHost("clienthost")
+ .setRackId("rackid")
+ .setInstanceId("instanceid")
+ .setRebalanceTimeoutMs(1000)
+ .setTopologyEpoch(10)
+ .setProcessId("processid")
+ .setUserEndpoint(new Endpoint().setHost("localhost").setPort(9999))
+ .setClientTags(Collections.singletonMap("key", "value"))
+ .build();
+
+ // The group and the member are created if they do not exist.
+
context.replay(StreamsCoordinatorRecordHelpers.newStreamsGroupMemberRecord("foo",
member));
+ assertEquals(member,
context.groupMetadataManager.streamsGroup("foo").getOrMaybeCreateMember("member",
false));
+ }
+
+ @Test
+ public void testReplayStreamsGroupMemberMetadataTombstone() {
+ GroupMetadataManagerTestContext context = new
GroupMetadataManagerTestContext.Builder()
+ .build();
+
+ // The group still exists but the member is already gone. Replaying the
+ // StreamsGroupMemberMetadata tombstone should be a no-op.
+
context.replay(StreamsCoordinatorRecordHelpers.newStreamsGroupEpochRecord("foo",
10));
+
context.replay(StreamsCoordinatorRecordHelpers.newStreamsGroupMemberTombstoneRecord("foo",
"m1"));
+ assertThrows(UnknownMemberIdException.class, () ->
context.groupMetadataManager.streamsGroup("foo").getOrMaybeCreateMember("m1",
false));
+
+ // The group may not exist at all. Replaying the
StreamsGroupMemberMetadata tombstone
+ // should be a no-op.
+
context.replay(StreamsCoordinatorRecordHelpers.newStreamsGroupMemberTombstoneRecord("bar",
"m1"));
+ assertThrows(GroupIdNotFoundException.class, () ->
context.groupMetadataManager.streamsGroup("bar"));
+ }
+
+ @Test
+ public void testReplayStreamsGroupMetadata() {
+ GroupMetadataManagerTestContext context = new
GroupMetadataManagerTestContext.Builder()
+ .build();
+
+ // The group is created if it does not exist.
+
context.replay(StreamsCoordinatorRecordHelpers.newStreamsGroupEpochRecord("foo",
10));
+ assertEquals(10,
context.groupMetadataManager.streamsGroup("foo").groupEpoch());
+ }
+
+ @Test
+ public void testReplayStreamsGroupMetadataTombstone() {
+ GroupMetadataManagerTestContext context = new
GroupMetadataManagerTestContext.Builder()
+ .build();
+
+ // The group may not exist at all. Replaying the StreamsGroupMetadata
tombstone
+ // should be a no-op.
+
context.replay(StreamsCoordinatorRecordHelpers.newStreamsGroupEpochTombstoneRecord("foo"));
+ assertThrows(GroupIdNotFoundException.class, () ->
context.groupMetadataManager.streamsGroup("foo"));
+ }
+
+ @Test
+ public void testReplayStreamsGroupPartitionMetadata() {
+ GroupMetadataManagerTestContext context = new
GroupMetadataManagerTestContext.Builder()
+ .build();
+
+ Map<String, org.apache.kafka.coordinator.group.streams.TopicMetadata>
metadata = Map.of(
+ "bar",
+ new
org.apache.kafka.coordinator.group.streams.TopicMetadata(Uuid.randomUuid(),
"bar", 10)
+ );
+
+ // The group is created if it does not exist.
+
context.replay(StreamsCoordinatorRecordHelpers.newStreamsGroupPartitionMetadataRecord("foo",
metadata));
+ assertEquals(metadata,
context.groupMetadataManager.streamsGroup("foo").partitionMetadata());
+ }
+
+ @Test
+ public void testReplayStreamsGroupPartitionMetadataTombstone() {
+ GroupMetadataManagerTestContext context = new
GroupMetadataManagerTestContext.Builder()
+ .build();
+
+ // The group may not exist at all. Replaying the
StreamsGroupPartitionMetadata tombstone
+ // should be a no-op.
+
context.replay(StreamsCoordinatorRecordHelpers.newStreamsGroupPartitionMetadataTombstoneRecord("foo"));
+ assertThrows(GroupIdNotFoundException.class, () ->
context.groupMetadataManager.streamsGroup("foo"));
+ }
+
+ @Test
+ public void testReplayStreamsGroupTargetAssignmentMember() {
+ GroupMetadataManagerTestContext context = new
GroupMetadataManagerTestContext.Builder()
+ .build();
+
+ // The group is created if it does not exist.
+ final TasksTuple tasks =
+ new TasksTuple(
+ TaskAssignmentTestUtil.mkTasksPerSubtopology(
+ TaskAssignmentTestUtil.mkTasks("subtopology-1", 0, 1, 2)),
+ TaskAssignmentTestUtil.mkTasksPerSubtopology(
+ TaskAssignmentTestUtil.mkTasks("subtopology-1", 3, 4, 5)),
+ TaskAssignmentTestUtil.mkTasksPerSubtopology(
+ TaskAssignmentTestUtil.mkTasks("subtopology-1", 6, 7, 8))
+ );
+
context.replay(StreamsCoordinatorRecordHelpers.newStreamsGroupTargetAssignmentRecord("foo",
"m1", tasks));
+ assertEquals(tasks.activeTasks(),
context.groupMetadataManager.streamsGroup("foo").targetAssignment("m1").activeTasks());
+ assertEquals(tasks.standbyTasks(),
context.groupMetadataManager.streamsGroup("foo").targetAssignment("m1").standbyTasks());
+ assertEquals(tasks.warmupTasks(),
context.groupMetadataManager.streamsGroup("foo").targetAssignment("m1").warmupTasks());
+ }
+
+ @Test
+ public void testReplayStreamsGroupTargetAssignmentMemberTombstone() {
Review Comment:
Could you please add a test for when a group exists?
##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorShardTest.java:
##########
@@ -830,6 +844,385 @@ public void
testReplayConsumerGroupCurrentMemberAssignmentWithNullValue() {
verify(groupMetadataManager, times(1)).replay(key, null);
}
+ @Test
+ public void testReplayStreamsGroupMetadata() {
+ GroupMetadataManager groupMetadataManager =
mock(GroupMetadataManager.class);
+ OffsetMetadataManager offsetMetadataManager =
mock(OffsetMetadataManager.class);
+ CoordinatorMetrics coordinatorMetrics = mock(CoordinatorMetrics.class);
+ CoordinatorMetricsShard metricsShard =
mock(CoordinatorMetricsShard.class);
+ GroupCoordinatorShard coordinator = new GroupCoordinatorShard(
+ new LogContext(),
+ groupMetadataManager,
+ offsetMetadataManager,
+ Time.SYSTEM,
+ new MockCoordinatorTimer<>(Time.SYSTEM),
+ mock(GroupCoordinatorConfig.class),
+ coordinatorMetrics,
+ metricsShard
+ );
+
Review Comment:
nit: I would delete this line so that the call under test is more easily
identifiable (also in the other tests). Just a proposal. Feel free to ignore if
you want.
```suggestion
```
##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##########
@@ -3962,7 +4359,167 @@ public void replay(
}
removeGroup(groupId);
}
+ }
+
+ /**
+ * Replays StreamsGroupMemberMetadataKey/Value to update the hard state of
+ * the streams group.
+ * It updates the subscription part of the member or deletes the member.
+ *
+ * @param key A StreamsGroupMemberMetadataKey key.
+ * @param value A StreamsGroupMemberMetadataValue record.
+ */
+ public void replay(
+ StreamsGroupMemberMetadataKey key,
+ StreamsGroupMemberMetadataValue value
+ ) {
+ String groupId = key.groupId();
+ String memberId = key.memberId();
+
+ StreamsGroup streamsGroup;
+ try {
+ streamsGroup = getOrMaybeCreatePersistedStreamsGroup(groupId,
value != null);
+ } catch (GroupIdNotFoundException ex) {
+ // If the group does not exist and a tombstone is replayed, we can
ignore it.
+ return;
+ }
+
+ if (value != null) {
+ StreamsGroupMember oldMember =
streamsGroup.getOrMaybeCreateMember(memberId, true);
+ streamsGroup.updateMember(new StreamsGroupMember.Builder(oldMember)
+ .updateWith(value)
+ .build());
Review Comment:
I find it confusing that the Streams group object creates a member without
adding it to its members and the member needs to be added with
`updateMember()`. I know that it is done this way for the other groups.
Creating a member outside of the group should not be a concern of the group.
You do not need to change anything. Just a thought that I had for discussion.
##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java:
##########
@@ -15135,6 +15284,229 @@ public void
testReplayConsumerGroupCurrentMemberAssignmentTombstone() {
assertThrows(GroupIdNotFoundException.class, () ->
context.groupMetadataManager.consumerGroup("bar"));
}
+ @Test
+ public void testReplayStreamsGroupMemberMetadata() {
+ GroupMetadataManagerTestContext context = new
GroupMetadataManagerTestContext.Builder()
+ .build();
+
+ StreamsGroupMember member = new StreamsGroupMember.Builder("member")
+ .setClientId("clientid")
+ .setClientHost("clienthost")
+ .setRackId("rackid")
+ .setInstanceId("instanceid")
+ .setRebalanceTimeoutMs(1000)
+ .setTopologyEpoch(10)
+ .setProcessId("processid")
+ .setUserEndpoint(new Endpoint().setHost("localhost").setPort(9999))
+ .setClientTags(Collections.singletonMap("key", "value"))
+ .build();
+
+ // The group and the member are created if they do not exist.
+
context.replay(StreamsCoordinatorRecordHelpers.newStreamsGroupMemberRecord("foo",
member));
+ assertEquals(member,
context.groupMetadataManager.streamsGroup("foo").getOrMaybeCreateMember("member",
false));
+ }
+
+ @Test
+ public void testReplayStreamsGroupMemberMetadataTombstone() {
+ GroupMetadataManagerTestContext context = new
GroupMetadataManagerTestContext.Builder()
+ .build();
+
+ // The group still exists but the member is already gone. Replaying the
+ // StreamsGroupMemberMetadata tombstone should be a no-op.
+
context.replay(StreamsCoordinatorRecordHelpers.newStreamsGroupEpochRecord("foo",
10));
+
context.replay(StreamsCoordinatorRecordHelpers.newStreamsGroupMemberTombstoneRecord("foo",
"m1"));
+ assertThrows(UnknownMemberIdException.class, () ->
context.groupMetadataManager.streamsGroup("foo").getOrMaybeCreateMember("m1",
false));
+
+ // The group may not exist at all. Replaying the
StreamsGroupMemberMetadata tombstone
+ // should be a no-op.
+
context.replay(StreamsCoordinatorRecordHelpers.newStreamsGroupMemberTombstoneRecord("bar",
"m1"));
+ assertThrows(GroupIdNotFoundException.class, () ->
context.groupMetadataManager.streamsGroup("bar"));
+ }
+
+ @Test
+ public void testReplayStreamsGroupMetadata() {
+ GroupMetadataManagerTestContext context = new
GroupMetadataManagerTestContext.Builder()
+ .build();
+
+ // The group is created if it does not exist.
+
context.replay(StreamsCoordinatorRecordHelpers.newStreamsGroupEpochRecord("foo",
10));
+ assertEquals(10,
context.groupMetadataManager.streamsGroup("foo").groupEpoch());
+ }
+
+ @Test
+ public void testReplayStreamsGroupMetadataTombstone() {
+ GroupMetadataManagerTestContext context = new
GroupMetadataManagerTestContext.Builder()
+ .build();
+
+ // The group may not exist at all. Replaying the StreamsGroupMetadata
tombstone
+ // should be a no-op.
+
context.replay(StreamsCoordinatorRecordHelpers.newStreamsGroupEpochTombstoneRecord("foo"));
+ assertThrows(GroupIdNotFoundException.class, () ->
context.groupMetadataManager.streamsGroup("foo"));
+ }
+
+ @Test
+ public void testReplayStreamsGroupPartitionMetadata() {
+ GroupMetadataManagerTestContext context = new
GroupMetadataManagerTestContext.Builder()
+ .build();
+
+ Map<String, org.apache.kafka.coordinator.group.streams.TopicMetadata>
metadata = Map.of(
+ "bar",
+ new
org.apache.kafka.coordinator.group.streams.TopicMetadata(Uuid.randomUuid(),
"bar", 10)
+ );
+
+ // The group is created if it does not exist.
+
context.replay(StreamsCoordinatorRecordHelpers.newStreamsGroupPartitionMetadataRecord("foo",
metadata));
+ assertEquals(metadata,
context.groupMetadataManager.streamsGroup("foo").partitionMetadata());
+ }
+
+ @Test
+ public void testReplayStreamsGroupPartitionMetadataTombstone() {
+ GroupMetadataManagerTestContext context = new
GroupMetadataManagerTestContext.Builder()
+ .build();
+
+ // The group may not exist at all. Replaying the
StreamsGroupPartitionMetadata tombstone
+ // should be a no-op.
+
context.replay(StreamsCoordinatorRecordHelpers.newStreamsGroupPartitionMetadataTombstoneRecord("foo"));
+ assertThrows(GroupIdNotFoundException.class, () ->
context.groupMetadataManager.streamsGroup("foo"));
+ }
+
+ @Test
+ public void testReplayStreamsGroupTargetAssignmentMember() {
+ GroupMetadataManagerTestContext context = new
GroupMetadataManagerTestContext.Builder()
+ .build();
+
+ // The group is created if it does not exist.
+ final TasksTuple tasks =
+ new TasksTuple(
+ TaskAssignmentTestUtil.mkTasksPerSubtopology(
+ TaskAssignmentTestUtil.mkTasks("subtopology-1", 0, 1, 2)),
+ TaskAssignmentTestUtil.mkTasksPerSubtopology(
+ TaskAssignmentTestUtil.mkTasks("subtopology-1", 3, 4, 5)),
+ TaskAssignmentTestUtil.mkTasksPerSubtopology(
+ TaskAssignmentTestUtil.mkTasks("subtopology-1", 6, 7, 8))
+ );
+
context.replay(StreamsCoordinatorRecordHelpers.newStreamsGroupTargetAssignmentRecord("foo",
"m1", tasks));
+ assertEquals(tasks.activeTasks(),
context.groupMetadataManager.streamsGroup("foo").targetAssignment("m1").activeTasks());
+ assertEquals(tasks.standbyTasks(),
context.groupMetadataManager.streamsGroup("foo").targetAssignment("m1").standbyTasks());
+ assertEquals(tasks.warmupTasks(),
context.groupMetadataManager.streamsGroup("foo").targetAssignment("m1").warmupTasks());
+ }
+
+ @Test
+ public void testReplayStreamsGroupTargetAssignmentMemberTombstone() {
+ GroupMetadataManagerTestContext context = new
GroupMetadataManagerTestContext.Builder()
+ .build();
+
+ // The group may not exist at all. Replaying the
StreamsGroupTargetAssignmentMember tombstone
+ // should be a no-op.
+
context.replay(StreamsCoordinatorRecordHelpers.newStreamsGroupTargetAssignmentTombstoneRecord("foo",
"m1"));
+ assertThrows(GroupIdNotFoundException.class, () ->
context.groupMetadataManager.streamsGroup("foo"));
+ }
+
+ @Test
+ public void testReplayStreamsGroupTargetAssignmentMetadata() {
+ GroupMetadataManagerTestContext context = new
GroupMetadataManagerTestContext.Builder()
+ .build();
+
+ // The group is created if it does not exist.
+
context.replay(StreamsCoordinatorRecordHelpers.newStreamsGroupTargetAssignmentEpochRecord("foo",
10));
+ assertEquals(10,
context.groupMetadataManager.streamsGroup("foo").assignmentEpoch());
+ }
+
+ @Test
+ public void testReplayStreamsGroupTargetAssignmentMetadataTombstone() {
+ GroupMetadataManagerTestContext context = new
GroupMetadataManagerTestContext.Builder()
+ .build();
+
+ // The group may not exist at all. Replaying the
StreamsGroupTargetAssignmentMetadata tombstone
+ // should be a no-op.
+
context.replay(StreamsCoordinatorRecordHelpers.newStreamsGroupTargetAssignmentEpochTombstoneRecord("foo"));
+ assertThrows(GroupIdNotFoundException.class, () ->
context.groupMetadataManager.streamsGroup("foo"));
+ }
+
+ @Test
+ public void testReplayStreamsGroupCurrentMemberAssignment() {
+ GroupMetadataManagerTestContext context = new
GroupMetadataManagerTestContext.Builder()
+ .build();
+
+ StreamsGroupMember member = new StreamsGroupMember.Builder("member")
+ .setMemberEpoch(10)
+ .setPreviousMemberEpoch(9)
+
.setState(org.apache.kafka.coordinator.group.streams.MemberState.UNRELEASED_TASKS)
+ .setAssignedTasks(new TasksTuple(
+
TaskAssignmentTestUtil.mkTasksPerSubtopology(TaskAssignmentTestUtil.mkTasks("subtopology-1",
0, 1, 2)),
+
TaskAssignmentTestUtil.mkTasksPerSubtopology(TaskAssignmentTestUtil.mkTasks("subtopology-1",
3, 4, 5)),
+
TaskAssignmentTestUtil.mkTasksPerSubtopology(TaskAssignmentTestUtil.mkTasks("subtopology-1",
6, 7, 8))
+ ))
+ .setTasksPendingRevocation(TasksTuple.EMPTY)
+ .build();
+
+ // The group and the member are created if they do not exist.
+
context.replay(StreamsCoordinatorRecordHelpers.newStreamsGroupCurrentAssignmentRecord("bar",
member));
+ assertEquals(member,
context.groupMetadataManager.streamsGroup("bar").getOrMaybeCreateMember("member",
false));
+ }
+
+ @Test
+ public void testReplayStreamsGroupCurrentMemberAssignmentTombstone() {
+ GroupMetadataManagerTestContext context = new
GroupMetadataManagerTestContext.Builder()
Review Comment:
Could you please add a test for when a group exists?
##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##########
@@ -687,6 +717,75 @@ ConsumerGroup getOrMaybeCreateConsumerGroup(
}
}
+ /**
+ * Gets or maybe creates a streams group without updating the groups map.
+ * The group will be materialized during the replay.
+ *
+ * @param groupId The group id.
+ * @param createIfNotExists A boolean indicating whether the group should
be
+ * created if it does not exist or is an empty
classic group.
+ *
+ * @return A StreamsGroup.
+ * @throws GroupIdNotFoundException if the group does not exist and
createIfNotExists is false or
+ * if the group is not a streams group.
+ *
+ * Package private for testing.
+ */
+ StreamsGroup getOrMaybeCreateStreamsGroup(
+ String groupId,
+ boolean createIfNotExists
+ ) throws GroupIdNotFoundException {
+ Group group = groups.get(groupId);
+
+ if (group == null && !createIfNotExists) {
+ throw new GroupIdNotFoundException(String.format("Streams group %s
not found.", groupId));
+ }
+
+ if (group == null) {
+ return new StreamsGroup(logContext, snapshotRegistry, groupId,
metrics);
+ } else {
+ if (group.type() == STREAMS) {
+ return (StreamsGroup) group;
+ } else {
+ throw new GroupIdNotFoundException(String.format("Group %s is
not a streams group.", groupId));
+ }
+ }
+ }
+
+ /**
+ * Gets a streams group by committed offset.
+ *
+ * @param groupId The group id.
+ * @param committedOffset A specified committed offset corresponding to
this shard.
+ *
+ * @return A StreamsGroup.
+ * @throws GroupIdNotFoundException if the group does not exist or is not
a streams group.
+ */
+ public StreamsGroup streamsGroup(
Review Comment:
Why is this method `public`? It is not used outside of this class.
##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java:
##########
@@ -15135,6 +15284,229 @@ public void
testReplayConsumerGroupCurrentMemberAssignmentTombstone() {
assertThrows(GroupIdNotFoundException.class, () ->
context.groupMetadataManager.consumerGroup("bar"));
}
+ @Test
+ public void testReplayStreamsGroupMemberMetadata() {
+ GroupMetadataManagerTestContext context = new
GroupMetadataManagerTestContext.Builder()
+ .build();
+
+ StreamsGroupMember member = new StreamsGroupMember.Builder("member")
+ .setClientId("clientid")
+ .setClientHost("clienthost")
+ .setRackId("rackid")
+ .setInstanceId("instanceid")
+ .setRebalanceTimeoutMs(1000)
+ .setTopologyEpoch(10)
+ .setProcessId("processid")
+ .setUserEndpoint(new Endpoint().setHost("localhost").setPort(9999))
+ .setClientTags(Collections.singletonMap("key", "value"))
+ .build();
+
+ // The group and the member are created if they do not exist.
+
context.replay(StreamsCoordinatorRecordHelpers.newStreamsGroupMemberRecord("foo",
member));
+ assertEquals(member,
context.groupMetadataManager.streamsGroup("foo").getOrMaybeCreateMember("member",
false));
+ }
+
+ @Test
+ public void testReplayStreamsGroupMemberMetadataTombstone() {
+ GroupMetadataManagerTestContext context = new
GroupMetadataManagerTestContext.Builder()
+ .build();
+
+ // The group still exists but the member is already gone. Replaying the
+ // StreamsGroupMemberMetadata tombstone should be a no-op.
+
context.replay(StreamsCoordinatorRecordHelpers.newStreamsGroupEpochRecord("foo",
10));
+
context.replay(StreamsCoordinatorRecordHelpers.newStreamsGroupMemberTombstoneRecord("foo",
"m1"));
+ assertThrows(UnknownMemberIdException.class, () ->
context.groupMetadataManager.streamsGroup("foo").getOrMaybeCreateMember("m1",
false));
+
+ // The group may not exist at all. Replaying the
StreamsGroupMemberMetadata tombstone
+ // should be a no-op.
+
context.replay(StreamsCoordinatorRecordHelpers.newStreamsGroupMemberTombstoneRecord("bar",
"m1"));
+ assertThrows(GroupIdNotFoundException.class, () ->
context.groupMetadataManager.streamsGroup("bar"));
+ }
+
+ @Test
+ public void testReplayStreamsGroupMetadata() {
+ GroupMetadataManagerTestContext context = new
GroupMetadataManagerTestContext.Builder()
+ .build();
+
+ // The group is created if it does not exist.
+
context.replay(StreamsCoordinatorRecordHelpers.newStreamsGroupEpochRecord("foo",
10));
+ assertEquals(10,
context.groupMetadataManager.streamsGroup("foo").groupEpoch());
+ }
+
+ @Test
+ public void testReplayStreamsGroupMetadataTombstone() {
+ GroupMetadataManagerTestContext context = new
GroupMetadataManagerTestContext.Builder()
+ .build();
+
+ // The group may not exist at all. Replaying the StreamsGroupMetadata
tombstone
+ // should be a no-op.
+
context.replay(StreamsCoordinatorRecordHelpers.newStreamsGroupEpochTombstoneRecord("foo"));
+ assertThrows(GroupIdNotFoundException.class, () ->
context.groupMetadataManager.streamsGroup("foo"));
+ }
+
+ @Test
+ public void testReplayStreamsGroupPartitionMetadata() {
+ GroupMetadataManagerTestContext context = new
GroupMetadataManagerTestContext.Builder()
+ .build();
+
+ Map<String, org.apache.kafka.coordinator.group.streams.TopicMetadata>
metadata = Map.of(
+ "bar",
+ new
org.apache.kafka.coordinator.group.streams.TopicMetadata(Uuid.randomUuid(),
"bar", 10)
+ );
+
+ // The group is created if it does not exist.
+
context.replay(StreamsCoordinatorRecordHelpers.newStreamsGroupPartitionMetadataRecord("foo",
metadata));
+ assertEquals(metadata,
context.groupMetadataManager.streamsGroup("foo").partitionMetadata());
+ }
+
+ @Test
+ public void testReplayStreamsGroupPartitionMetadataTombstone() {
+ GroupMetadataManagerTestContext context = new
GroupMetadataManagerTestContext.Builder()
+ .build();
+
+ // The group may not exist at all. Replaying the
StreamsGroupPartitionMetadata tombstone
+ // should be a no-op.
+
context.replay(StreamsCoordinatorRecordHelpers.newStreamsGroupPartitionMetadataTombstoneRecord("foo"));
+ assertThrows(GroupIdNotFoundException.class, () ->
context.groupMetadataManager.streamsGroup("foo"));
+ }
+
+ @Test
+ public void testReplayStreamsGroupTargetAssignmentMember() {
+ GroupMetadataManagerTestContext context = new
GroupMetadataManagerTestContext.Builder()
+ .build();
+
+ // The group is created if it does not exist.
+ final TasksTuple tasks =
+ new TasksTuple(
+ TaskAssignmentTestUtil.mkTasksPerSubtopology(
+ TaskAssignmentTestUtil.mkTasks("subtopology-1", 0, 1, 2)),
+ TaskAssignmentTestUtil.mkTasksPerSubtopology(
+ TaskAssignmentTestUtil.mkTasks("subtopology-1", 3, 4, 5)),
+ TaskAssignmentTestUtil.mkTasksPerSubtopology(
+ TaskAssignmentTestUtil.mkTasks("subtopology-1", 6, 7, 8))
+ );
+
context.replay(StreamsCoordinatorRecordHelpers.newStreamsGroupTargetAssignmentRecord("foo",
"m1", tasks));
+ assertEquals(tasks.activeTasks(),
context.groupMetadataManager.streamsGroup("foo").targetAssignment("m1").activeTasks());
+ assertEquals(tasks.standbyTasks(),
context.groupMetadataManager.streamsGroup("foo").targetAssignment("m1").standbyTasks());
+ assertEquals(tasks.warmupTasks(),
context.groupMetadataManager.streamsGroup("foo").targetAssignment("m1").warmupTasks());
+ }
+
+ @Test
+ public void testReplayStreamsGroupTargetAssignmentMemberTombstone() {
+ GroupMetadataManagerTestContext context = new
GroupMetadataManagerTestContext.Builder()
+ .build();
+
+ // The group may not exist at all. Replaying the
StreamsGroupTargetAssignmentMember tombstone
+ // should be a no-op.
+
context.replay(StreamsCoordinatorRecordHelpers.newStreamsGroupTargetAssignmentTombstoneRecord("foo",
"m1"));
+ assertThrows(GroupIdNotFoundException.class, () ->
context.groupMetadataManager.streamsGroup("foo"));
+ }
+
+ @Test
+ public void testReplayStreamsGroupTargetAssignmentMetadata() {
+ GroupMetadataManagerTestContext context = new
GroupMetadataManagerTestContext.Builder()
+ .build();
+
+ // The group is created if it does not exist.
+
context.replay(StreamsCoordinatorRecordHelpers.newStreamsGroupTargetAssignmentEpochRecord("foo",
10));
+ assertEquals(10,
context.groupMetadataManager.streamsGroup("foo").assignmentEpoch());
+ }
+
+ @Test
+ public void testReplayStreamsGroupTargetAssignmentMetadataTombstone() {
Review Comment:
Could you please add a test for when a group exists?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]