This is an automated email from the ASF dual-hosted git repository.

schofielaj pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 28c53ba09a5 KAFKA-19453: Ignore group not found in share group record replay. (#20076)
28c53ba09a5 is described below

commit 28c53ba09a546f8ac7c02e07fcb6b31f9a708414
Author: Sushant Mahajan <[email protected]>
AuthorDate: Wed Jul 2 14:40:14 2025 +0530

    KAFKA-19453: Ignore group not found in share group record replay. (#20076)
    
    * When a `ShareGroup*` record is replayed in the group
    metadata manager, there is a call to check whether the group exists.
    If the group does not exist, an exception is thrown, which is
    unnecessary.
    * In this PR, we have added a check to ignore this exception.
    * A new test to validate the logic has been added.
    
    Reviewers: Andrew Schofield <[email protected]>, Dongnuo Lyu
    <[email protected]>
---
 .../coordinator/group/GroupMetadataManager.java    |  83 ++++++++++---
 .../group/GroupMetadataManagerTest.java            | 128 +++++++++++++++++++++
 2 files changed, 192 insertions(+), 19 deletions(-)

diff --git 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
index 0879283931a..ab186575e79 100644
--- 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
+++ 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
@@ -5356,16 +5356,26 @@ public class GroupMetadataManager {
         String groupId = key.groupId();
         String memberId = key.memberId();
 
-        ShareGroup shareGroup = getOrMaybeCreatePersistedShareGroup(groupId, 
value != null);
+        ShareGroup shareGroup;
+        ShareGroupMember oldMember;
+        try {
+            shareGroup = getOrMaybeCreatePersistedShareGroup(groupId, value != 
null);
+            oldMember = shareGroup.getOrMaybeCreateMember(memberId, value != 
null);
+        } catch (GroupIdNotFoundException ex) {
+            log.debug("ShareGroupMemberMetadata tombstone without group - {}", 
groupId, ex);
+            return;
+        } catch (UnknownMemberIdException ex) {
+            log.debug("ShareGroupMemberMetadata tombstone for groupId - {} 
without member - {}", groupId, memberId, ex);
+            return;
+        }
+
         Set<String> oldSubscribedTopicNames = new 
HashSet<>(shareGroup.subscribedTopicNames().keySet());
 
         if (value != null) {
-            ShareGroupMember oldMember = 
shareGroup.getOrMaybeCreateMember(memberId, true);
             shareGroup.updateMember(new ShareGroupMember.Builder(oldMember)
                 .updateWith(value)
                 .build());
         } else {
-            ShareGroupMember oldMember = 
shareGroup.getOrMaybeCreateMember(memberId, false);
             if (oldMember.memberEpoch() != LEAVE_GROUP_MEMBER_EPOCH) {
                 throw new IllegalStateException("Received a tombstone record 
to delete member " + memberId
                     + " with invalid leave group epoch.");
@@ -5394,12 +5404,18 @@ public class GroupMetadataManager {
     ) {
         String groupId = key.groupId();
 
+        ShareGroup shareGroup;
+        try {
+            shareGroup = getOrMaybeCreatePersistedShareGroup(groupId, value != 
null);
+        } catch (GroupIdNotFoundException ex) {
+            log.debug("ShareGroupMetadata tombstone without group - {}", 
groupId, ex);
+            return;
+        }
+
         if (value != null) {
-            ShareGroup shareGroup = 
getOrMaybeCreatePersistedShareGroup(groupId, true);
             shareGroup.setGroupEpoch(value.epoch());
             shareGroup.setMetadataHash(value.metadataHash());
         } else {
-            ShareGroup shareGroup = 
getOrMaybeCreatePersistedShareGroup(groupId, false);
             if (!shareGroup.members().isEmpty()) {
                 throw new IllegalStateException("Received a tombstone record 
to delete group " + groupId
                     + " but the group still has " + 
shareGroup.members().size() + " members.");
@@ -5591,7 +5607,14 @@ public class GroupMetadataManager {
     ) {
         String groupId = key.groupId();
         String memberId = key.memberId();
-        ShareGroup group = getOrMaybeCreatePersistedShareGroup(groupId, false);
+
+        ShareGroup group;
+        try {
+            group = getOrMaybeCreatePersistedShareGroup(groupId, value != 
null);
+        } catch (GroupIdNotFoundException ex) {
+            log.debug("ShareGroupTargetAssignmentMember tombstone without 
group - {}", groupId, ex);
+            return;
+        }
 
         if (value != null) {
             group.updateTargetAssignment(memberId, 
Assignment.fromRecord(value));
@@ -5613,7 +5636,14 @@ public class GroupMetadataManager {
         ShareGroupTargetAssignmentMetadataValue value
     ) {
         String groupId = key.groupId();
-        ShareGroup group = getOrMaybeCreatePersistedShareGroup(groupId, false);
+
+        ShareGroup group;
+        try {
+            group = getOrMaybeCreatePersistedShareGroup(groupId, value != 
null);
+        } catch (GroupIdNotFoundException ex) {
+            log.debug("ShareGroupTargetAssignmentMetadata tombstone without 
group - {}", groupId, ex);
+            return;
+        }
 
         if (value != null) {
             group.setTargetAssignmentEpoch(value.assignmentEpoch());
@@ -5640,20 +5670,31 @@ public class GroupMetadataManager {
         String groupId = key.groupId();
         String memberId = key.memberId();
 
-        ShareGroup group = getOrMaybeCreatePersistedShareGroup(groupId, false);
-        ShareGroupMember oldMember = group.getOrMaybeCreateMember(memberId, 
false);
+        ShareGroup group;
+        ShareGroupMember oldMember;
+
+        try {
+            group = getOrMaybeCreatePersistedShareGroup(groupId, value != 
null);
+            oldMember = group.getOrMaybeCreateMember(memberId, value != null);
+        } catch (GroupIdNotFoundException ex) {
+            log.debug("ShareGroupCurrentMemberAssignment tombstone without 
group - {}", groupId, ex);
+            return;
+        } catch (UnknownMemberIdException ex) {
+            log.debug("ShareGroupCurrentMemberAssignment tombstone for groupId 
- {} without member - {}", groupId, memberId, ex);
+            return;
+        }
 
         if (value != null) {
             ShareGroupMember newMember = new 
ShareGroupMember.Builder(oldMember)
-                    .updateWith(value)
-                    .build();
+                .updateWith(value)
+                .build();
             group.updateMember(newMember);
         } else {
             ShareGroupMember newMember = new 
ShareGroupMember.Builder(oldMember)
-                    .setMemberEpoch(LEAVE_GROUP_MEMBER_EPOCH)
-                    .setPreviousMemberEpoch(LEAVE_GROUP_MEMBER_EPOCH)
-                    .setAssignedPartitions(Map.of())
-                    .build();
+                .setMemberEpoch(LEAVE_GROUP_MEMBER_EPOCH)
+                .setPreviousMemberEpoch(LEAVE_GROUP_MEMBER_EPOCH)
+                .setAssignedPartitions(Map.of())
+                .build();
             group.updateMember(newMember);
         }
     }
@@ -5671,12 +5712,16 @@ public class GroupMetadataManager {
     ) {
         String groupId = key.groupId();
 
-        getOrMaybeCreatePersistedShareGroup(groupId, false);
-
         // Update timeline structures with info about initialized/deleted 
topics.
+        try {
+            getOrMaybeCreatePersistedShareGroup(groupId, value != null);
+        } catch (GroupIdNotFoundException ex) {
+            // Ignore tombstone if group not found.
+            log.debug("ShareGroupStatePartitionMetadata tombstone for 
non-existent share group {}", groupId, ex);
+        }
+
         if (value == null) {
-            // Tombstone!
-            shareGroupStatePartitionMetadata.remove(groupId);
+            shareGroupStatePartitionMetadata.remove(groupId);   // Should not 
throw any exceptions.
         } else {
             long timestamp = time.milliseconds();
             ShareGroupStatePartitionMetadataInfo info = new 
ShareGroupStatePartitionMetadataInfo(
diff --git 
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java
 
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java
index 1cf13c70490..bc5afd7704f 100644
--- 
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java
+++ 
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java
@@ -95,10 +95,18 @@ import 
org.apache.kafka.coordinator.group.generated.ConsumerGroupMemberMetadataV
 import 
org.apache.kafka.coordinator.group.generated.ConsumerGroupPartitionMetadataKey;
 import 
org.apache.kafka.coordinator.group.generated.ConsumerGroupPartitionMetadataValue;
 import org.apache.kafka.coordinator.group.generated.GroupMetadataValue;
+import 
org.apache.kafka.coordinator.group.generated.ShareGroupCurrentMemberAssignmentKey;
+import 
org.apache.kafka.coordinator.group.generated.ShareGroupCurrentMemberAssignmentValue;
+import 
org.apache.kafka.coordinator.group.generated.ShareGroupMemberMetadataKey;
+import 
org.apache.kafka.coordinator.group.generated.ShareGroupMemberMetadataValue;
 import org.apache.kafka.coordinator.group.generated.ShareGroupMetadataKey;
 import org.apache.kafka.coordinator.group.generated.ShareGroupMetadataValue;
 import 
org.apache.kafka.coordinator.group.generated.ShareGroupStatePartitionMetadataKey;
 import 
org.apache.kafka.coordinator.group.generated.ShareGroupStatePartitionMetadataValue;
+import 
org.apache.kafka.coordinator.group.generated.ShareGroupTargetAssignmentMemberKey;
+import 
org.apache.kafka.coordinator.group.generated.ShareGroupTargetAssignmentMemberValue;
+import 
org.apache.kafka.coordinator.group.generated.ShareGroupTargetAssignmentMetadataKey;
+import 
org.apache.kafka.coordinator.group.generated.ShareGroupTargetAssignmentMetadataValue;
 import 
org.apache.kafka.coordinator.group.generated.StreamsGroupMemberMetadataValue.Endpoint;
 import org.apache.kafka.coordinator.group.generated.StreamsGroupTopologyValue;
 import org.apache.kafka.coordinator.group.modern.Assignment;
@@ -204,6 +212,7 @@ import static 
org.apache.kafka.coordinator.group.classic.ClassicGroupState.STABL
 import static 
org.apache.kafka.coordinator.group.metrics.GroupCoordinatorMetrics.CLASSIC_GROUP_COMPLETED_REBALANCES_SENSOR_NAME;
 import static 
org.apache.kafka.coordinator.group.metrics.GroupCoordinatorMetrics.CONSUMER_GROUP_REBALANCES_SENSOR_NAME;
 import static 
org.apache.kafka.coordinator.group.metrics.GroupCoordinatorMetrics.SHARE_GROUP_REBALANCES_SENSOR_NAME;
+import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertFalse;
 import static org.junit.jupiter.api.Assertions.assertNotEquals;
@@ -215,6 +224,7 @@ import static org.junit.jupiter.api.Assertions.fail;
 import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.ArgumentMatchers.eq;
 import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
 import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
@@ -22895,6 +22905,124 @@ public class GroupMetadataManagerTest {
         assertEquals(Map.of(t1Id, new InitMapValue(t1Name, Set.of(0), 1)), 
GroupMetadataManager.combineInitMaps(m1, m2));
     }
 
+    private static Stream<CoordinatorRecord> shareGroupRecords() {
+        String groupId = "groupId";
+        String memberId = Uuid.randomUuid().toString();
+
+        return Stream.of(
+            // Tombstones
+            CoordinatorRecord.tombstone(
+                new ShareGroupMemberMetadataKey()
+                    .setGroupId(groupId)
+                    .setMemberId(memberId)
+            ),
+            CoordinatorRecord.tombstone(
+                new ShareGroupMetadataKey()
+                    .setGroupId(groupId)
+            ),
+            CoordinatorRecord.tombstone(
+                new ShareGroupTargetAssignmentMemberKey()
+                    .setGroupId(groupId)
+                    .setMemberId(memberId)
+            ),
+            CoordinatorRecord.tombstone(
+                new ShareGroupTargetAssignmentMetadataKey()
+                    .setGroupId(groupId)
+            ),
+            CoordinatorRecord.tombstone(
+                new ShareGroupCurrentMemberAssignmentKey()
+                    .setGroupId(groupId)
+                    .setMemberId(memberId)
+            ),
+            CoordinatorRecord.tombstone(
+                new ShareGroupStatePartitionMetadataKey()
+                    .setGroupId(groupId)
+            ),
+            // Data
+            CoordinatorRecord.record(
+                new ShareGroupMemberMetadataKey()
+                    .setGroupId(groupId)
+                    .setMemberId(memberId),
+                new ApiMessageAndVersion(
+                    new ShareGroupMemberMetadataValue()
+                        .setSubscribedTopicNames(List.of("tp1")),
+                    (short) 10
+                )
+            ),
+            CoordinatorRecord.record(
+                new ShareGroupMetadataKey()
+                    .setGroupId(groupId),
+                new ApiMessageAndVersion(
+                    new ShareGroupMetadataValue()
+                        .setEpoch(1)
+                        .setMetadataHash(2L),
+                    (short) 11
+                )
+            ),
+            CoordinatorRecord.record(
+                new ShareGroupTargetAssignmentMetadataKey()
+                    .setGroupId(groupId),
+                new ApiMessageAndVersion(
+                    new ShareGroupTargetAssignmentMetadataValue()
+                        .setAssignmentEpoch(5),
+                    (short) 12
+                )
+            ),
+            CoordinatorRecord.record(
+                new ShareGroupTargetAssignmentMemberKey()
+                    .setGroupId(groupId)
+                    .setMemberId(memberId),
+                new ApiMessageAndVersion(new 
ShareGroupTargetAssignmentMemberValue()
+                    .setTopicPartitions(List.of(
+                        new 
ShareGroupTargetAssignmentMemberValue.TopicPartition()
+                            .setTopicId(Uuid.randomUuid())
+                            .setPartitions(List.of(0, 1, 2))
+                    )),
+                    (short) 13
+                )
+            ),
+            CoordinatorRecord.record(
+                new ShareGroupCurrentMemberAssignmentKey()
+                    .setGroupId(groupId)
+                    .setMemberId(memberId),
+                new ApiMessageAndVersion(new 
ShareGroupCurrentMemberAssignmentValue()
+                    .setAssignedPartitions(List.of(
+                            new 
ShareGroupCurrentMemberAssignmentValue.TopicPartitions()
+                                .setTopicId(Uuid.randomUuid())
+                                .setPartitions(List.of(0, 1, 2))
+                        )
+                    )
+                    .setMemberEpoch(5)
+                    .setPreviousMemberEpoch(4)
+                    .setState((byte) 0),
+                    (short) 14
+                )
+            ),
+            CoordinatorRecord.record(
+                new ShareGroupStatePartitionMetadataKey()
+                    .setGroupId(groupId),
+                new ApiMessageAndVersion(new 
ShareGroupStatePartitionMetadataValue()
+                    .setInitializingTopics(List.of())
+                    .setInitializedTopics(List.of())
+                    .setDeletingTopics(List.of()),
+                    (short) 15
+                )
+            )
+        );
+    }
+
+    @ParameterizedTest
+    @MethodSource("shareGroupRecords")
+    public void testShareGroupRecordsNoExceptionOnReplay(CoordinatorRecord 
record) {
+        MockPartitionAssignor assignor = new MockPartitionAssignor("simple");
+        assignor.prepareGroupAssignment(new GroupAssignment(Map.of()));
+        GroupMetadataManagerTestContext context = spy(new 
GroupMetadataManagerTestContext.Builder()
+            .withShareGroupAssignor(assignor)
+            .build());
+
+        assertDoesNotThrow(() -> context.replay(record));
+    }
+
     private static void checkJoinGroupResponse(
         JoinGroupResponseData expectedResponse,
         JoinGroupResponseData actualResponse,

Reply via email to