This is an automated email from the ASF dual-hosted git repository.
mittal pushed a commit to branch 4.1
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/4.1 by this push:
new 71f56002832 KAFKA-19453: Ignore group not found in share group record
replay (#20100)
71f56002832 is described below
commit 71f56002832895be06b9a378744e947ce405c460
Author: Sushant Mahajan <[email protected]>
AuthorDate: Fri Jul 4 00:16:59 2025 +0530
KAFKA-19453: Ignore group not found in share group record replay (#20100)
* When a `ShareGroup*` record is replayed in the group metadata manager,
there is a call to check whether the group exists. If the group does not
exist, an exception is thrown, which is unnecessary.
* In this PR, we have added a check to ignore this exception.
* A new test to validate the logic has been added.
Reviewers: Apoorv Mittal <[email protected]>
Note: cherry-picked from PR https://github.com/apache/kafka/pull/20076 in
trunk.
---
.../coordinator/group/GroupMetadataManager.java | 83 ++++++++++---
.../group/GroupMetadataManagerTest.java | 128 +++++++++++++++++++++
2 files changed, 192 insertions(+), 19 deletions(-)
diff --git
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
index ca47af5d4a3..080fa265221 100644
---
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
+++
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
@@ -5358,16 +5358,26 @@ public class GroupMetadataManager {
String groupId = key.groupId();
String memberId = key.memberId();
- ShareGroup shareGroup = getOrMaybeCreatePersistedShareGroup(groupId,
value != null);
+ ShareGroup shareGroup;
+ ShareGroupMember oldMember;
+ try {
+ shareGroup = getOrMaybeCreatePersistedShareGroup(groupId, value !=
null);
+ oldMember = shareGroup.getOrMaybeCreateMember(memberId, value !=
null);
+ } catch (GroupIdNotFoundException ex) {
+ log.debug("ShareGroupMemberMetadata tombstone without group - {}",
groupId, ex);
+ return;
+ } catch (UnknownMemberIdException ex) {
+ log.debug("ShareGroupMemberMetadata tombstone for groupId - {}
without member - {}", groupId, memberId, ex);
+ return;
+ }
+
Set<String> oldSubscribedTopicNames = new
HashSet<>(shareGroup.subscribedTopicNames().keySet());
if (value != null) {
- ShareGroupMember oldMember =
shareGroup.getOrMaybeCreateMember(memberId, true);
shareGroup.updateMember(new ShareGroupMember.Builder(oldMember)
.updateWith(value)
.build());
} else {
- ShareGroupMember oldMember =
shareGroup.getOrMaybeCreateMember(memberId, false);
if (oldMember.memberEpoch() != LEAVE_GROUP_MEMBER_EPOCH) {
throw new IllegalStateException("Received a tombstone record
to delete member " + memberId
+ " with invalid leave group epoch.");
@@ -5396,12 +5406,18 @@ public class GroupMetadataManager {
) {
String groupId = key.groupId();
+ ShareGroup shareGroup;
+ try {
+ shareGroup = getOrMaybeCreatePersistedShareGroup(groupId, value !=
null);
+ } catch (GroupIdNotFoundException ex) {
+ log.debug("ShareGroupMetadata tombstone without group - {}",
groupId, ex);
+ return;
+ }
+
if (value != null) {
- ShareGroup shareGroup =
getOrMaybeCreatePersistedShareGroup(groupId, true);
shareGroup.setGroupEpoch(value.epoch());
shareGroup.setMetadataHash(value.metadataHash());
} else {
- ShareGroup shareGroup =
getOrMaybeCreatePersistedShareGroup(groupId, false);
if (!shareGroup.members().isEmpty()) {
throw new IllegalStateException("Received a tombstone record
to delete group " + groupId
+ " but the group still has " +
shareGroup.members().size() + " members.");
@@ -5593,7 +5609,14 @@ public class GroupMetadataManager {
) {
String groupId = key.groupId();
String memberId = key.memberId();
- ShareGroup group = getOrMaybeCreatePersistedShareGroup(groupId, false);
+
+ ShareGroup group;
+ try {
+ group = getOrMaybeCreatePersistedShareGroup(groupId, value !=
null);
+ } catch (GroupIdNotFoundException ex) {
+ log.debug("ShareGroupTargetAssignmentMember tombstone without
group - {}", groupId, ex);
+ return;
+ }
if (value != null) {
group.updateTargetAssignment(memberId,
Assignment.fromRecord(value));
@@ -5615,7 +5638,14 @@ public class GroupMetadataManager {
ShareGroupTargetAssignmentMetadataValue value
) {
String groupId = key.groupId();
- ShareGroup group = getOrMaybeCreatePersistedShareGroup(groupId, false);
+
+ ShareGroup group;
+ try {
+ group = getOrMaybeCreatePersistedShareGroup(groupId, value !=
null);
+ } catch (GroupIdNotFoundException ex) {
+ log.debug("ShareGroupTargetAssignmentMetadata tombstone without
group - {}", groupId, ex);
+ return;
+ }
if (value != null) {
group.setTargetAssignmentEpoch(value.assignmentEpoch());
@@ -5642,20 +5672,31 @@ public class GroupMetadataManager {
String groupId = key.groupId();
String memberId = key.memberId();
- ShareGroup group = getOrMaybeCreatePersistedShareGroup(groupId, false);
- ShareGroupMember oldMember = group.getOrMaybeCreateMember(memberId,
false);
+ ShareGroup group;
+ ShareGroupMember oldMember;
+
+ try {
+ group = getOrMaybeCreatePersistedShareGroup(groupId, value !=
null);
+ oldMember = group.getOrMaybeCreateMember(memberId, value != null);
+ } catch (GroupIdNotFoundException ex) {
+ log.debug("ShareGroupCurrentMemberAssignment tombstone without
group - {}", groupId, ex);
+ return;
+ } catch (UnknownMemberIdException ex) {
+ log.debug("ShareGroupCurrentMemberAssignment tombstone for groupId
- {} without member - {}", groupId, memberId, ex);
+ return;
+ }
if (value != null) {
ShareGroupMember newMember = new
ShareGroupMember.Builder(oldMember)
- .updateWith(value)
- .build();
+ .updateWith(value)
+ .build();
group.updateMember(newMember);
} else {
ShareGroupMember newMember = new
ShareGroupMember.Builder(oldMember)
- .setMemberEpoch(LEAVE_GROUP_MEMBER_EPOCH)
- .setPreviousMemberEpoch(LEAVE_GROUP_MEMBER_EPOCH)
- .setAssignedPartitions(Map.of())
- .build();
+ .setMemberEpoch(LEAVE_GROUP_MEMBER_EPOCH)
+ .setPreviousMemberEpoch(LEAVE_GROUP_MEMBER_EPOCH)
+ .setAssignedPartitions(Map.of())
+ .build();
group.updateMember(newMember);
}
}
@@ -5673,12 +5714,16 @@ public class GroupMetadataManager {
) {
String groupId = key.groupId();
- getOrMaybeCreatePersistedShareGroup(groupId, false);
-
// Update timeline structures with info about initialized/deleted
topics.
+ try {
+ getOrMaybeCreatePersistedShareGroup(groupId, value != null);
+ } catch (GroupIdNotFoundException ex) {
+ // Ignore tombstone if group not found.
+ log.debug("ShareGroupStatePartitionMetadata tombstone for
non-existent share group {}", groupId, ex);
+ }
+
if (value == null) {
- // Tombstone!
- shareGroupStatePartitionMetadata.remove(groupId);
+ shareGroupStatePartitionMetadata.remove(groupId); // Should not
throw any exceptions.
} else {
long timestamp = time.milliseconds();
ShareGroupStatePartitionMetadataInfo info = new
ShareGroupStatePartitionMetadataInfo(
diff --git
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java
index 1cf13c70490..bc5afd7704f 100644
---
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java
+++
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java
@@ -95,10 +95,18 @@ import
org.apache.kafka.coordinator.group.generated.ConsumerGroupMemberMetadataV
import
org.apache.kafka.coordinator.group.generated.ConsumerGroupPartitionMetadataKey;
import
org.apache.kafka.coordinator.group.generated.ConsumerGroupPartitionMetadataValue;
import org.apache.kafka.coordinator.group.generated.GroupMetadataValue;
+import
org.apache.kafka.coordinator.group.generated.ShareGroupCurrentMemberAssignmentKey;
+import
org.apache.kafka.coordinator.group.generated.ShareGroupCurrentMemberAssignmentValue;
+import
org.apache.kafka.coordinator.group.generated.ShareGroupMemberMetadataKey;
+import
org.apache.kafka.coordinator.group.generated.ShareGroupMemberMetadataValue;
import org.apache.kafka.coordinator.group.generated.ShareGroupMetadataKey;
import org.apache.kafka.coordinator.group.generated.ShareGroupMetadataValue;
import
org.apache.kafka.coordinator.group.generated.ShareGroupStatePartitionMetadataKey;
import
org.apache.kafka.coordinator.group.generated.ShareGroupStatePartitionMetadataValue;
+import
org.apache.kafka.coordinator.group.generated.ShareGroupTargetAssignmentMemberKey;
+import
org.apache.kafka.coordinator.group.generated.ShareGroupTargetAssignmentMemberValue;
+import
org.apache.kafka.coordinator.group.generated.ShareGroupTargetAssignmentMetadataKey;
+import
org.apache.kafka.coordinator.group.generated.ShareGroupTargetAssignmentMetadataValue;
import
org.apache.kafka.coordinator.group.generated.StreamsGroupMemberMetadataValue.Endpoint;
import org.apache.kafka.coordinator.group.generated.StreamsGroupTopologyValue;
import org.apache.kafka.coordinator.group.modern.Assignment;
@@ -204,6 +212,7 @@ import static
org.apache.kafka.coordinator.group.classic.ClassicGroupState.STABL
import static
org.apache.kafka.coordinator.group.metrics.GroupCoordinatorMetrics.CLASSIC_GROUP_COMPLETED_REBALANCES_SENSOR_NAME;
import static
org.apache.kafka.coordinator.group.metrics.GroupCoordinatorMetrics.CONSUMER_GROUP_REBALANCES_SENSOR_NAME;
import static
org.apache.kafka.coordinator.group.metrics.GroupCoordinatorMetrics.SHARE_GROUP_REBALANCES_SENSOR_NAME;
+import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNotEquals;
@@ -215,6 +224,7 @@ import static org.junit.jupiter.api.Assertions.fail;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
@@ -22895,6 +22905,124 @@ public class GroupMetadataManagerTest {
assertEquals(Map.of(t1Id, new InitMapValue(t1Name, Set.of(0), 1)),
GroupMetadataManager.combineInitMaps(m1, m2));
}
+ private static Stream<CoordinatorRecord> shareGroupRecords() {
+ String groupId = "groupId";
+ String memberId = Uuid.randomUuid().toString();
+
+ return Stream.of(
+ // Tombstones
+ CoordinatorRecord.tombstone(
+ new ShareGroupMemberMetadataKey()
+ .setGroupId(groupId)
+ .setMemberId(memberId)
+ ),
+ CoordinatorRecord.tombstone(
+ new ShareGroupMetadataKey()
+ .setGroupId(groupId)
+ ),
+ CoordinatorRecord.tombstone(
+ new ShareGroupTargetAssignmentMemberKey()
+ .setGroupId(groupId)
+ .setMemberId(memberId)
+ ),
+ CoordinatorRecord.tombstone(
+ new ShareGroupTargetAssignmentMetadataKey()
+ .setGroupId(groupId)
+ ),
+ CoordinatorRecord.tombstone(
+ new ShareGroupCurrentMemberAssignmentKey()
+ .setGroupId(groupId)
+ .setMemberId(memberId)
+ ),
+ CoordinatorRecord.tombstone(
+ new ShareGroupStatePartitionMetadataKey()
+ .setGroupId(groupId)
+ ),
+ // Data
+ CoordinatorRecord.record(
+ new ShareGroupMemberMetadataKey()
+ .setGroupId(groupId)
+ .setMemberId(memberId),
+ new ApiMessageAndVersion(
+ new ShareGroupMemberMetadataValue()
+ .setSubscribedTopicNames(List.of("tp1")),
+ (short) 10
+ )
+ ),
+ CoordinatorRecord.record(
+ new ShareGroupMetadataKey()
+ .setGroupId(groupId),
+ new ApiMessageAndVersion(
+ new ShareGroupMetadataValue()
+ .setEpoch(1)
+ .setMetadataHash(2L),
+ (short) 11
+ )
+ ),
+ CoordinatorRecord.record(
+ new ShareGroupTargetAssignmentMetadataKey()
+ .setGroupId(groupId),
+ new ApiMessageAndVersion(
+ new ShareGroupTargetAssignmentMetadataValue()
+ .setAssignmentEpoch(5),
+ (short) 12
+ )
+ ),
+ CoordinatorRecord.record(
+ new ShareGroupTargetAssignmentMemberKey()
+ .setGroupId(groupId)
+ .setMemberId(memberId),
+ new ApiMessageAndVersion(new
ShareGroupTargetAssignmentMemberValue()
+ .setTopicPartitions(List.of(
+ new
ShareGroupTargetAssignmentMemberValue.TopicPartition()
+ .setTopicId(Uuid.randomUuid())
+ .setPartitions(List.of(0, 1, 2))
+ )),
+ (short) 13
+ )
+ ),
+ CoordinatorRecord.record(
+ new ShareGroupCurrentMemberAssignmentKey()
+ .setGroupId(groupId)
+ .setMemberId(memberId),
+ new ApiMessageAndVersion(new
ShareGroupCurrentMemberAssignmentValue()
+ .setAssignedPartitions(List.of(
+ new
ShareGroupCurrentMemberAssignmentValue.TopicPartitions()
+ .setTopicId(Uuid.randomUuid())
+ .setPartitions(List.of(0, 1, 2))
+ )
+ )
+ .setMemberEpoch(5)
+ .setPreviousMemberEpoch(4)
+ .setState((byte) 0),
+ (short) 14
+ )
+ ),
+ CoordinatorRecord.record(
+ new ShareGroupStatePartitionMetadataKey()
+ .setGroupId(groupId),
+ new ApiMessageAndVersion(new
ShareGroupStatePartitionMetadataValue()
+ .setInitializingTopics(List.of())
+ .setInitializedTopics(List.of())
+ .setDeletingTopics(List.of()),
+ (short) 15
+ )
+ )
+ );
+ }
+
+ @ParameterizedTest
+ @MethodSource("shareGroupRecords")
+ public void testShareGroupRecordsNoExceptionOnReplay(CoordinatorRecord
record) {
+ MockPartitionAssignor assignor = new MockPartitionAssignor("simple");
+ assignor.prepareGroupAssignment(new GroupAssignment(Map.of()));
+ GroupMetadataManagerTestContext context = spy(new
GroupMetadataManagerTestContext.Builder()
+ .withShareGroupAssignor(assignor)
+ .build());
+
+ assertDoesNotThrow(() -> context.replay(record));
+ }
+
private static void checkJoinGroupResponse(
JoinGroupResponseData expectedResponse,
JoinGroupResponseData actualResponse,