This is an automated email from the ASF dual-hosted git repository.
lucasbru pushed a commit to branch 4.2
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/4.2 by this push:
new 08a1660fba4 KAFKA-20254: Fix streams group creation failing when
simple classic group exists (#21641)
08a1660fba4 is described below
commit 08a1660fba4697740183156e63cd8cfa4deca441
Author: Lucas Brutschy <[email protected]>
AuthorDate: Fri Mar 6 10:36:21 2026 +0100
KAFKA-20254: Fix streams group creation failing when simple classic group
exists (#21641)
During replay of the __consumer_offsets topic, offset commit records can
appear before streams group records, for example after log compaction.
When OffsetMetadataManager replays an offset commit for a group that
doesn't exist yet, it automatically creates a simple ClassicGroup to
hold the offsets. When the streams group records are subsequently
replayed, getOrMaybeCreatePersistedStreamsGroup fails with "Group X is
not a streams group" because it does not handle simple classic groups.
This adds handling for simple classic groups in
getOrMaybeCreatePersistedStreamsGroup, matching the existing pattern in
getOrMaybeCreatePersistedConsumerGroup. Simple classic groups have no
backing records in __consumer_offsets and can safely be replaced.
Reviewers: Matthias J. Sax <[email protected]>, Chia-Ping Tsai
<[email protected]>
---
.../coordinator/group/GroupMetadataManager.java | 9 ++++++++
.../group/GroupMetadataManagerTest.java | 27 ++++++++++++++++++++++
2 files changed, 36 insertions(+)
diff --git
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
index 23a8f02b3ea..37996fc1da7 100644
---
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
+++
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
@@ -1029,6 +1029,15 @@ public class GroupMetadataManager {
return streamsGroup;
} else if (group.type() == STREAMS) {
return (StreamsGroup) group;
+ } else if (group.type() == CLASSIC && ((ClassicGroup)
group).isSimpleGroup()) {
+ // If the group is a simple classic group, it was automatically
created to hold committed
+ // offsets when no group-metadata-backed group existed. Simple
classic groups do not have
+ // any GroupMetadataKey/Value records in the __consumer_offsets
topic, only offset commit
+ // records, so the in-memory group can be safely replaced here.
Without this, replaying
+ // streams group records after offset commit records would not
work.
+ StreamsGroup streamsGroup = new StreamsGroup(logContext,
snapshotRegistry, groupId);
+ groups.put(groupId, streamsGroup);
+ return streamsGroup;
} else {
// We don't support upgrading/downgrading between protocols at the
moment, so
// we throw an exception if a group exists with the wrong type.
diff --git
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java
index 03a109817ec..f8fb2099ae0 100644
---
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java
+++
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java
@@ -20705,6 +20705,33 @@ public class GroupMetadataManagerTest {
assertEquals(member,
context.groupMetadataManager.streamsGroup("foo").getMemberOrThrow("member"));
}
+ @Test
+ public void testReplayStreamsGroupMemberMetadataWithSimpleClassicGroup() {
+ GroupMetadataManagerTestContext context = new
GroupMetadataManagerTestContext.Builder()
+ .build();
+
+ // A simple classic group is created when replaying offset commits
without a group.
+ // This simulates the scenario where offset commit records are
replayed before streams
+ // group records after log compaction has cleaned up the group
metadata tombstone.
+ context.groupMetadataManager.getOrMaybeCreateClassicGroup("foo", true);
+
+ StreamsGroupMember member = new StreamsGroupMember.Builder("member")
+ .setClientId("clientid")
+ .setClientHost("clienthost")
+ .setRackId("rackid")
+ .setInstanceId("instanceid")
+ .setRebalanceTimeoutMs(1000)
+ .setTopologyEpoch(10)
+ .setProcessId("processid")
+ .setUserEndpoint(new Endpoint().setHost("localhost").setPort(9999))
+ .setClientTags(Map.of("key", "value"))
+ .build();
+
+ // The simple classic group should be replaced by a streams group.
+
context.replay(StreamsCoordinatorRecordHelpers.newStreamsGroupMemberRecord("foo",
member));
+ assertEquals(member,
context.groupMetadataManager.streamsGroup("foo").getMemberOrThrow("member"));
+ }
+
@Test
public void testReplayStreamsGroupMemberMetadataTombstoneNotExisting() {
GroupMetadataManagerTestContext context = new
GroupMetadataManagerTestContext.Builder()