This is an automated email from the ASF dual-hosted git repository.
schofielaj pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new cdbb8fa4cec KAFKA-18629: Account for existing deleting topics in share
group delete. (#19463)
cdbb8fa4cec is described below
commit cdbb8fa4cec2734e653adf7f8ca36f7b9d943c11
Author: Sushant Mahajan <[email protected]>
AuthorDate: Tue Apr 15 20:24:09 2025 +0530
KAFKA-18629: Account for existing deleting topics in share group delete.
(#19463)
* When deleting share groups, only the initialized and initializing
information in the metadata record is considered. However, the
deleting topics may also contain information written by other RPCs
(share group offsets delete).
* We need to account for the existing deleting-topic information when
writing the metadata record in the delete flow.
* This PR adds the implementation for this, along with new tests that
verify the functionality.
Reviewers: Andrew Schofield <[email protected]>
---
.../coordinator/group/GroupMetadataManager.java | 37 +++++-
.../group/GroupMetadataManagerTest.java | 134 ++++++++++++++++++++-
2 files changed, 167 insertions(+), 4 deletions(-)
diff --git
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
index 2e2b02f9432..5ec1ab1d75e 100644
---
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
+++
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
@@ -2968,7 +2968,8 @@ public class GroupMetadataManager {
)).build();
}
- private void addInitializingTopicsRecords(String groupId,
List<CoordinatorRecord> records, Map<Uuid, Set<Integer>> topicPartitionMap) {
+ // Visibility for tests
+ void addInitializingTopicsRecords(String groupId, List<CoordinatorRecord>
records, Map<Uuid, Set<Integer>> topicPartitionMap) {
if (topicPartitionMap == null || topicPartitionMap.isEmpty()) {
return;
}
@@ -2982,12 +2983,23 @@ public class GroupMetadataManager {
// We must combine the existing information in the record with the
topicPartitionMap argument.
Map<Uuid, Set<Integer>> finalInitializingMap =
mergeShareGroupInitMaps(currentMap.initializingTopics(), topicPartitionMap);
+ // If any initializing topics are also present in the deleting state
+ // we should remove them from deleting.
+ Set<Uuid> currentDeleting = new HashSet<>(currentMap.deletingTopics());
+ if (!currentDeleting.isEmpty()) {
+ finalInitializingMap.keySet().forEach(key -> {
+ if (currentDeleting.remove(key)) {
+ log.warn("Initializing topic {} for share group {} found
in deleting state as well, removing from deleting.",
metadataImage.topics().getTopic(key).name(), groupId);
+ }
+ });
+ }
+
records.add(
newShareGroupStatePartitionMetadataRecord(
groupId,
attachTopicName(finalInitializingMap),
attachTopicName(currentMap.initializedTopics()),
- attachTopicName(currentMap.deletingTopics())
+ attachTopicName(currentDeleting)
)
);
}
@@ -8166,6 +8178,24 @@ public class GroupMetadataManager {
shareGroupPartitionMetadata.get(shareGroupId).initializingTopics()
);
+ // Ideally the deleting should be empty - if it is not then it implies
+ // that some previous share group delete or delete offsets command
+ // did not complete successfully. So, set up the delete request such
that
+ // a retry for the same is possible. Since this is part of an admin
operation
+ // retrying delete should not pose issues related to
+ // performance. Also, the share coordinator is idempotent on delete
partitions.
+ Map<Uuid, Set<Integer>> deletingTopics =
shareGroupPartitionMetadata.get(shareGroupId).deletingTopics().stream()
+ .map(tid -> Map.entry(tid,
metadataImage.topics().getTopic(tid).partitions().keySet()))
+ .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
+
+ if (!deletingTopics.isEmpty()) {
+ log.info("Existing deleting entries found in share group {} - {}",
shareGroupId, deletingTopics);
+ deleteCandidates = mergeShareGroupInitMaps(
+ deleteCandidates,
+ deletingTopics
+ );
+ }
+
if (deleteCandidates.isEmpty()) {
return Optional.empty();
}
@@ -8181,7 +8211,8 @@ public class GroupMetadataManager {
));
}
- // Remove all initializing and initialized topic info from record and
add deleting.
+ // Remove all initializing and initialized topic info from record and
add deleting. There
+ // could be previous deleting topics due to offsets delete, we need to
account for them as well.
records.add(GroupCoordinatorRecordHelpers.newShareGroupStatePartitionMetadataRecord(
shareGroupId,
Map.of(),
diff --git
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java
index 4665e81e9be..459a3f4cbc4 100644
---
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java
+++
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java
@@ -129,6 +129,7 @@ import org.apache.kafka.image.MetadataProvenance;
import org.apache.kafka.server.authorizer.Action;
import org.apache.kafka.server.authorizer.AuthorizationResult;
import org.apache.kafka.server.authorizer.Authorizer;
+import org.apache.kafka.server.common.ApiMessageAndVersion;
import org.apache.kafka.server.share.persister.DeleteShareGroupStateParameters;
import
org.apache.kafka.server.share.persister.InitializeShareGroupStateParameters;
import org.apache.kafka.server.share.persister.PartitionIdData;
@@ -20744,7 +20745,7 @@ public class GroupMetadataManagerTest {
}
@Test
- public void testShareGroupDeleteRequest() {
+ public void testShareGroupDeleteRequestNoDeletingTopics() {
MockPartitionAssignor assignor = new MockPartitionAssignor("range");
assignor.prepareGroupAssignment(new GroupAssignment(Map.of()));
GroupMetadataManagerTestContext context = new
GroupMetadataManagerTestContext.Builder()
@@ -20807,6 +20808,74 @@ public class GroupMetadataManagerTest {
assertRecordsEquals(expectedRecords, records);
}
+ @Test
+ public void testShareGroupDeleteRequestWithAlreadyDeletingTopics() {
+ MockPartitionAssignor assignor = new MockPartitionAssignor("range");
+ assignor.prepareGroupAssignment(new GroupAssignment(Map.of()));
+ GroupMetadataManagerTestContext context = new
GroupMetadataManagerTestContext.Builder()
+
.withConfig(GroupCoordinatorConfig.CONSUMER_GROUP_ASSIGNORS_CONFIG,
List.of(assignor))
+ .build();
+
+ Uuid t1Uuid = Uuid.randomUuid();
+ Uuid t2Uuid = Uuid.randomUuid();
+ Uuid t3Uuid = Uuid.randomUuid();
+ String t1Name = "t1";
+ String t2Name = "t2";
+ String t3Name = "t3";
+
+ String groupId = "share-group";
+ ShareGroup shareGroup = mock(ShareGroup.class);
+ when(shareGroup.groupId()).thenReturn(groupId);
+ when(shareGroup.isEmpty()).thenReturn(false);
+
+ MetadataImage image = new MetadataImageBuilder()
+ .addTopic(t1Uuid, t1Name, 2)
+ .addTopic(t2Uuid, t2Name, 2)
+ .addTopic(t3Uuid, t3Name, 2)
+ .build();
+
+ MetadataDelta delta = new MetadataDelta(image);
+ context.groupMetadataManager.onNewMetadataImage(image, delta);
+
+
context.replay(GroupCoordinatorRecordHelpers.newShareGroupEpochRecord(groupId,
0));
+
+ context.replay(
+
GroupCoordinatorRecordHelpers.newShareGroupStatePartitionMetadataRecord(
+ groupId,
+ Map.of(t1Uuid, Map.entry(t1Name, Set.of(0, 1))),
+ Map.of(t2Uuid, Map.entry(t2Name, Set.of(0, 1))),
+ Map.of(t3Uuid, t3Name)
+ )
+ );
+
+ context.commit();
+
+ Map<Uuid, Set<Integer>> expectedTopicPartitionMap = Map.of(
+ t1Uuid, Set.of(0, 1),
+ t2Uuid, Set.of(0, 1),
+ t3Uuid, Set.of(0, 1)
+ );
+
+ List<CoordinatorRecord> expectedRecords = List.of(
+ newShareGroupStatePartitionMetadataRecord(
+ groupId,
+ Map.of(),
+ Map.of(),
+ Map.of(t1Uuid, t1Name, t2Uuid, t2Name, t3Uuid, t3Name) //
Existing deleting topics should be included here.
+ )
+ );
+
+ List<CoordinatorRecord> records = new ArrayList<>();
+ Optional<DeleteShareGroupStateParameters> params =
context.groupMetadataManager.shareGroupBuildPartitionDeleteRequest(groupId,
records);
+ verifyShareGroupDeleteRequest(
+ params,
+ expectedTopicPartitionMap,
+ groupId,
+ true
+ );
+ assertRecordsEquals(expectedRecords, records);
+ }
+
@Test
public void testSharePartitionsEligibleForOffsetDeletionSuccess() {
MockPartitionAssignor assignor = new MockPartitionAssignor("range");
@@ -21351,6 +21420,69 @@ public class GroupMetadataManagerTest {
);
}
+ @Test
+ public void testShareGroupInitializingClearsCommonDeleting() {
+ MockPartitionAssignor assignor = new MockPartitionAssignor("range");
+ assignor.prepareGroupAssignment(new GroupAssignment(Map.of()));
+ GroupMetadataManagerTestContext context = new
GroupMetadataManagerTestContext.Builder()
+ .withShareGroupAssignor(assignor)
+ .build();
+
+ Uuid t1Uuid = Uuid.randomUuid();
+ String t1Name = "t1";
+ MetadataImage image = new MetadataImageBuilder()
+ .addTopic(t1Uuid, t1Name, 2)
+ .build();
+
+ String groupId = "share-group";
+
+ context.groupMetadataManager.onNewMetadataImage(image,
mock(MetadataDelta.class));
+ context.groupMetadataManager.replay(
+ new ShareGroupMetadataKey()
+ .setGroupId(groupId),
+ new ShareGroupMetadataValue()
+ .setEpoch(0)
+ );
+
+ // Replay a deleting record.
+ context.groupMetadataManager.replay(
+ new ShareGroupStatePartitionMetadataKey()
+ .setGroupId(groupId),
+ new ShareGroupStatePartitionMetadataValue()
+ .setInitializingTopics(List.of())
+ .setInitializedTopics(List.of())
+ .setDeletingTopics(List.of(
+ new ShareGroupStatePartitionMetadataValue.TopicInfo()
+ .setTopicId(t1Uuid)
+ .setTopicName(t1Name)
+ ))
+ );
+
+ List<CoordinatorRecord> records = new ArrayList<>();
+ context.groupMetadataManager.addInitializingTopicsRecords(groupId,
records, Map.of(t1Uuid, Set.of(0, 1)));
+
+ List<CoordinatorRecord> expectedRecords = List.of(
+ CoordinatorRecord.record(
+ new ShareGroupStatePartitionMetadataKey()
+ .setGroupId(groupId),
+ new ApiMessageAndVersion(
+ new ShareGroupStatePartitionMetadataValue()
+ .setInitializingTopics(List.of(
+ new
ShareGroupStatePartitionMetadataValue.TopicPartitionsInfo()
+ .setTopicId(t1Uuid)
+ .setTopicName(t1Name)
+ .setPartitions(List.of(0, 1))
+ ))
+ .setInitializedTopics(List.of())
+ .setDeletingTopics(List.of()),
+ (short) 0
+ )
+ )
+ );
+
+ assertEquals(expectedRecords, records);
+ }
+
@Test
public void testShareGroupInitializeSuccess() {
String groupId = "groupId";