This is an automated email from the ASF dual-hosted git repository.
lucasbru pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new f621a635c15 KAFKA-19570: Implement offline migration for streams
groups (#20288)
f621a635c15 is described below
commit f621a635c1546246b80ae5d4624241aa4b58fd39
Author: Lucas Brutschy <[email protected]>
AuthorDate: Tue Aug 26 10:05:30 2025 +0200
KAFKA-19570: Implement offline migration for streams groups (#20288)
Offline migration essentially preserves offsets and nothing else. So
effectively write tombstones for classic group type when a streams
heartbeat is sent with the group ID of an empty classic group, and
write tombstones for the streams group type when a classic consumer
attempts to join with a group ID of an empty streams group.
Reviewers: Bill Bejeck <[email protected]>, Sean Quah
<[email protected]>, Dongnuo Lyu <[email protected]>
---
.../coordinator/group/GroupMetadataManager.java | 48 ++++++-
.../group/GroupMetadataManagerTest.java | 150 +++++++++++++++++++++
2 files changed, 195 insertions(+), 3 deletions(-)
diff --git
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
index 44380427284..f87af4897a7 100644
---
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
+++
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
@@ -833,19 +833,28 @@ public class GroupMetadataManager {
* Gets or creates a streams group without updating the groups map.
* The group will be materialized during the replay.
*
+ * If there is an empty classic consumer group of the same name, it will
be deleted and a new streams
+ * group will be created.
+ *
* @param groupId The group ID.
+ * @param records The record list to which the group tombstones
are written
+ * if the group is empty and is a classic group.
*
* @return A StreamsGroup.
*
* Package private for testing.
*/
StreamsGroup getOrCreateStreamsGroup(
- String groupId
+ String groupId,
+ List<CoordinatorRecord> records
) {
Group group = groups.get(groupId);
if (group == null) {
return new StreamsGroup(logContext, snapshotRegistry, groupId,
metrics);
+ } else if (maybeDeleteEmptyClassicGroup(group, records)) {
+ log.info("[GroupId {}] Converted the empty classic group to a
streams group.", groupId);
+ return new StreamsGroup(logContext, snapshotRegistry, groupId,
metrics);
} else {
return castToStreamsGroup(group);
}
@@ -1871,7 +1880,7 @@ public class GroupMetadataManager {
boolean isJoining = memberEpoch == 0;
StreamsGroup group;
if (isJoining) {
- group = getOrCreateStreamsGroup(groupId);
+ group = getOrCreateStreamsGroup(groupId, records);
throwIfStreamsGroupIsFull(group);
} else {
group = getStreamsGroupOrThrow(groupId);
@@ -6066,7 +6075,11 @@ public class GroupMetadataManager {
// classicGroupJoinToConsumerGroup takes the join requests to
non-empty consumer groups.
// The empty consumer groups should be converted to classic
groups in classicGroupJoinToClassicGroup.
return classicGroupJoinToConsumerGroup((ConsumerGroup) group,
context, request, responseFuture);
- } else if (group.type() == CONSUMER || group.type() == CLASSIC) {
+ } else if (group.type() == CONSUMER || group.type() == CLASSIC ||
group.type() == STREAMS && group.isEmpty()) {
+ // classicGroupJoinToClassicGroup accepts:
+ // - classic groups
+ // - empty streams groups
+ // - empty consumer groups
return classicGroupJoinToClassicGroup(context, request,
responseFuture);
} else {
// Group exists but it's not a consumer group
@@ -6107,6 +6120,8 @@ public class GroupMetadataManager {
ClassicGroup group;
if (maybeDeleteEmptyConsumerGroup(groupId, records)) {
log.info("[GroupId {}] Converted the empty consumer group to a
classic group.", groupId);
+ } else if (maybeDeleteEmptyStreamsGroup(groupId, records)) {
+ log.info("[GroupId {}] Converted the empty streams group to a
classic group.", groupId);
}
boolean isNewGroup = !groups.containsKey(groupId);
try {
@@ -8398,6 +8413,13 @@ public class GroupMetadataManager {
return group != null && group.type() == CONSUMER && group.isEmpty();
}
+ /**
+ * @return true if the group is an empty streams group.
+ */
+ private static boolean isEmptyStreamsGroup(Group group) {
+ return group != null && group.type() == STREAMS && group.isEmpty();
+ }
+
/**
* Write tombstones for the group if it's empty and is a classic group.
*
@@ -8435,6 +8457,26 @@ public class GroupMetadataManager {
}
return false;
}
+
+ /**
+ * Delete and write tombstones for the group if it's empty and is a
streams group.
+ *
+ * @param groupId The group id to be deleted.
+ * @param records The list of records to delete the group.
+ *
+ * @return true if the group is an empty streams group.
+ */
+ private boolean maybeDeleteEmptyStreamsGroup(String groupId,
List<CoordinatorRecord> records) {
+ Group group = groups.get(groupId, Long.MAX_VALUE);
+ if (isEmptyStreamsGroup(group)) {
+ // Add tombstones for the previous streams group. The tombstones
won't actually be
+ // replayed because its coordinator result has a non-null
appendFuture.
+ createGroupTombstoneRecords(group, records);
+ removeGroup(groupId);
+ return true;
+ }
+ return false;
+ }
/**
* Checks whether the given protocol type or name in the request is
inconsistent with the group's.
diff --git
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java
index 3f1e49b955d..efe2ad96435 100644
---
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java
+++
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java
@@ -18633,6 +18633,156 @@ public class GroupMetadataManagerTest {
assertNull(result.response().data().partitionsByUserEndpoint());
}
+ @Test
+ public void testStreamsGroupHeartbeatWithNonEmptyClassicGroup() {
+ String classicGroupId = "classic-group-id";
+ String memberId = Uuid.randomUuid().toString();
+
+ String subtopology1 = "subtopology1";
+ String fooTopicName = "foo";
+ Topology topology = new Topology().setSubtopologies(List.of(
+ new
Subtopology().setSubtopologyId(subtopology1).setSourceTopics(List.of(fooTopicName))
+ ));
+
+ GroupMetadataManagerTestContext context = new
GroupMetadataManagerTestContext.Builder().build();
+ ClassicGroup classicGroup = new ClassicGroup(
+ new LogContext(),
+ classicGroupId,
+ EMPTY,
+ context.time
+ );
+
context.replay(GroupCoordinatorRecordHelpers.newGroupMetadataRecord(classicGroup,
classicGroup.groupAssignment()));
+
+
context.groupMetadataManager.getOrMaybeCreateClassicGroup(classicGroupId,
false).transitionTo(PREPARING_REBALANCE);
+ assertThrows(GroupIdNotFoundException.class, () ->
+ context.streamsGroupHeartbeat(
+ new StreamsGroupHeartbeatRequestData()
+ .setGroupId(classicGroupId)
+ .setMemberId(memberId)
+ .setMemberEpoch(0)
+ .setRebalanceTimeoutMs(12000)
+ .setTopology(topology)
+ .setActiveTasks(List.of())
+ .setStandbyTasks(List.of())
+ .setWarmupTasks(List.of())));
+ }
+
+ @Test
+ public void testStreamsGroupHeartbeatWithEmptyClassicGroup() {
+ String classicGroupId = "classic-group-id";
+ String memberId = Uuid.randomUuid().toString();
+ String fooTopicName = "foo";
+ String subtopology1 = "subtopology1";
+ Topology topology = new Topology().setSubtopologies(List.of(
+ new
Subtopology().setSubtopologyId(subtopology1).setSourceTopics(List.of(fooTopicName))
+ ));
+
+ MockTaskAssignor assignor = new MockTaskAssignor("sticky");
+ GroupMetadataManagerTestContext context = new
GroupMetadataManagerTestContext.Builder()
+ .withStreamsGroupTaskAssignors(List.of(assignor))
+ .build();
+ ClassicGroup classicGroup = new ClassicGroup(
+ new LogContext(),
+ classicGroupId,
+ EMPTY,
+ context.time
+ );
+
context.replay(GroupCoordinatorRecordHelpers.newGroupMetadataRecord(classicGroup,
classicGroup.groupAssignment()));
+
+ CoordinatorResult<StreamsGroupHeartbeatResult, CoordinatorRecord>
result = context.streamsGroupHeartbeat(
+ new StreamsGroupHeartbeatRequestData()
+ .setGroupId(classicGroupId)
+ .setMemberId(memberId)
+ .setMemberEpoch(0)
+ .setRebalanceTimeoutMs(12000)
+ .setTopology(topology)
+ .setActiveTasks(List.of())
+ .setStandbyTasks(List.of())
+ .setWarmupTasks(List.of()));
+
+ StreamsGroupMember expectedMember =
StreamsGroupMember.Builder.withDefaults(memberId)
+
.setState(org.apache.kafka.coordinator.group.streams.MemberState.STABLE)
+ .setMemberEpoch(1)
+ .setPreviousMemberEpoch(0)
+ .setRebalanceTimeoutMs(5000)
+ .setClientId(DEFAULT_CLIENT_ID)
+ .setClientHost(DEFAULT_CLIENT_ADDRESS.toString())
+ .setAssignedTasks(TasksTuple.EMPTY)
+ .setTasksPendingRevocation(TasksTuple.EMPTY)
+ .setRebalanceTimeoutMs(12000)
+ .setTopologyEpoch(0)
+ .build();
+
+ assertEquals(Errors.NONE.code(), result.response().data().errorCode());
+ assertEquals(
+ List.of(
+
GroupCoordinatorRecordHelpers.newGroupMetadataTombstoneRecord(classicGroupId),
+
StreamsCoordinatorRecordHelpers.newStreamsGroupMemberRecord(classicGroupId,
expectedMember),
+
StreamsCoordinatorRecordHelpers.newStreamsGroupTopologyRecord(classicGroupId,
topology),
+
StreamsCoordinatorRecordHelpers.newStreamsGroupEpochRecord(classicGroupId, 1,
0),
+
StreamsCoordinatorRecordHelpers.newStreamsGroupTargetAssignmentRecord(classicGroupId,
memberId, TasksTuple.EMPTY),
+
StreamsCoordinatorRecordHelpers.newStreamsGroupTargetAssignmentEpochRecord(classicGroupId,
1),
+
StreamsCoordinatorRecordHelpers.newStreamsGroupCurrentAssignmentRecord(classicGroupId,
expectedMember)
+ ),
+ result.records()
+ );
+ assertEquals(
+ Group.GroupType.STREAMS,
+ context.groupMetadataManager.streamsGroup(classicGroupId).type()
+ );
+ }
+
+ @Test
+ public void testClassicGroupJoinWithEmptyStreamsGroup() throws Exception {
+ String streamsGroupId = "streams-group-id";
+ GroupMetadataManagerTestContext context = new
GroupMetadataManagerTestContext.Builder()
+ .withStreamsGroup(new StreamsGroupBuilder(streamsGroupId, 10))
+ .build();
+
+ JoinGroupRequestData request = new
GroupMetadataManagerTestContext.JoinGroupRequestBuilder()
+ .withGroupId(streamsGroupId)
+ .withMemberId(UNKNOWN_MEMBER_ID)
+ .withDefaultProtocolTypeAndProtocols()
+ .build();
+ GroupMetadataManagerTestContext.JoinResult joinResult =
context.sendClassicGroupJoin(request, true);
+
+ List<CoordinatorRecord> expectedRecords = List.of(
+
StreamsCoordinatorRecordHelpers.newStreamsGroupTargetAssignmentEpochTombstoneRecord(streamsGroupId),
+
StreamsCoordinatorRecordHelpers.newStreamsGroupEpochTombstoneRecord(streamsGroupId),
+
StreamsCoordinatorRecordHelpers.newStreamsGroupTopologyRecordTombstone(streamsGroupId)
+ );
+
+ assertEquals(Errors.MEMBER_ID_REQUIRED.code(),
joinResult.joinFuture.get().errorCode());
+ assertEquals(expectedRecords, joinResult.records.subList(0,
expectedRecords.size()));
+ assertEquals(
+ Group.GroupType.CLASSIC,
+
context.groupMetadataManager.getOrMaybeCreateClassicGroup(streamsGroupId,
false).type()
+ );
+ }
+
+ @Test
+ public void testClassicGroupJoinWithNonEmptyStreamsGroup() throws
Exception {
+ String streamsGroupId = "streams-group-id";
+ String memberId = Uuid.randomUuid().toString();
+ GroupMetadataManagerTestContext context = new
GroupMetadataManagerTestContext.Builder()
+ .withStreamsGroup(new StreamsGroupBuilder(streamsGroupId, 10)
+ .withMember(StreamsGroupMember.Builder.withDefaults(memberId)
+
.setState(org.apache.kafka.coordinator.group.streams.MemberState.STABLE)
+ .setMemberEpoch(10)
+ .setPreviousMemberEpoch(10)
+ .build()))
+ .build();
+
+ JoinGroupRequestData request = new
GroupMetadataManagerTestContext.JoinGroupRequestBuilder()
+ .withGroupId(streamsGroupId)
+ .withMemberId(UNKNOWN_MEMBER_ID)
+ .withDefaultProtocolTypeAndProtocols()
+ .build();
+
+ GroupMetadataManagerTestContext.JoinResult joinResult =
context.sendClassicGroupJoin(request);
+ assertEquals(Errors.INCONSISTENT_GROUP_PROTOCOL.code(),
joinResult.joinFuture.get().errorCode());
+ }
+
@Test
public void testConsumerGroupDynamicConfigs() {
String groupId = "fooup";