This is an automated email from the ASF dual-hosted git repository.
schofielaj pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new a6dfde7ce64 KAFKA-18629: Utilize share group partition metadata for delete group. (#19363)
a6dfde7ce64 is described below
commit a6dfde7ce64ee00d257bb6ff20ce923dacdc0edb
Author: Sushant Mahajan <[email protected]>
AuthorDate: Fri Apr 11 00:45:13 2025 +0530
KAFKA-18629: Utilize share group partition metadata for delete group. (#19363)
* Currently, the delete share group code flow uses `group.subscribedTopicNames()` to fetch information about all the share partitions to which a share group is subscribed.
* However, this is incorrect: once the group is EMPTY, which is a precondition for delete, that method returns an empty list.
* In this PR we leverage the `ShareGroupStatePartitionMetadata` record to grab the `initialized` and `initializing` partitions and build the delete candidates, which remedies the situation.
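For illustration, a minimal sketch of the candidate-building idea, using plain java.util types in place of the coordinator's internal classes (the class and method names below are illustrative only, not part of this commit):

    import java.util.HashMap;
    import java.util.HashSet;
    import java.util.Map;
    import java.util.Set;
    import java.util.UUID;

    public class DeleteCandidateSketch {
        // Union of the initialized and initializing topic -> partitions maps,
        // mirroring what the ShareGroupStatePartitionMetadata record carries.
        static Map<UUID, Set<Integer>> mergeInitMaps(
            Map<UUID, Set<Integer>> initialized,
            Map<UUID, Set<Integer>> initializing
        ) {
            Map<UUID, Set<Integer>> merged = new HashMap<>();
            initialized.forEach((topicId, partitions) ->
                merged.computeIfAbsent(topicId, k -> new HashSet<>()).addAll(partitions));
            initializing.forEach((topicId, partitions) ->
                merged.computeIfAbsent(topicId, k -> new HashSet<>()).addAll(partitions));
            return merged;
        }

        public static void main(String[] args) {
            UUID t1 = UUID.randomUUID();
            UUID t2 = UUID.randomUUID();
            // Even though an EMPTY group reports no subscribed topic names,
            // the persisted metadata still yields the delete candidates.
            Map<UUID, Set<Integer>> candidates = mergeInitMaps(
                Map.of(t1, Set.of(0, 1)),   // initialized
                Map.of(t2, Set.of(0, 1))    // initializing
            );
            System.out.println(candidates);
        }
    }

The actual change also rewrites the group's ShareGroupStatePartitionMetadata record so that the candidates move to the deleting set before the persister delete is issued; see GroupMetadataManager#shareGroupBuildPartitionDeleteRequest in the diff below.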
Reviewers: Andrew Schofield <[email protected]>
---
.../group/GroupCoordinatorRecordHelpers.java | 15 +++
.../coordinator/group/GroupCoordinatorService.java | 34 ++++---
.../coordinator/group/GroupCoordinatorShard.java | 9 +-
.../coordinator/group/GroupMetadataManager.java | 75 ++++++++------
.../coordinator/group/modern/share/ShareGroup.java | 1 +
.../apache/kafka/coordinator/group/Assertions.java | 31 +++++-
.../group/GroupCoordinatorShardTest.java | 43 ++++----
.../group/GroupMetadataManagerTest.java | 108 ++++++++++++++-------
.../coordinator/share/ShareCoordinatorShard.java | 10 ++
.../share/ShareCoordinatorShardTest.java | 23 ++---
10 files changed, 232 insertions(+), 117 deletions(-)
diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorRecordHelpers.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorRecordHelpers.java
index d0f6901d444..42790515a2d 100644
--- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorRecordHelpers.java
+++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorRecordHelpers.java
@@ -814,6 +814,21 @@ public class GroupCoordinatorRecordHelpers {
);
}
+ /**
+ * Creates a ShareGroupStatePartitionMetadata tombstone.
+ *
+ * @param groupId The share group id.
+ * @return The record.
+ */
+ public static CoordinatorRecord newShareGroupStatePartitionMetadataTombstoneRecord(
+ String groupId
+ ) {
+ return CoordinatorRecord.tombstone(
+ new ShareGroupStatePartitionMetadataKey()
+ .setGroupId(groupId)
+ );
+ }
+
/**
* Creates a ShareGroupStatePartitionMetadata record.
*
diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java
index 97bc7e69622..c00502af48f 100644
--- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java
+++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java
@@ -1073,24 +1073,25 @@ public class GroupCoordinatorService implements GroupCoordinator {
groupsByTopicPartition.forEach((topicPartition, groupList) -> {
CompletableFuture<DeleteGroupsResponseData.DeletableGroupResultCollection> future = deleteShareGroups(topicPartition, groupList).thenCompose(groupErrMap -> {
- DeleteGroupsResponseData.DeletableGroupResultCollection collection = new DeleteGroupsResponseData.DeletableGroupResultCollection();
- List<String> retainedGroupIds = deleteCandidateGroupIds(groupErrMap, groupList, collection);
+ DeleteGroupsResponseData.DeletableGroupResultCollection deletableGroupResults = new DeleteGroupsResponseData.DeletableGroupResultCollection();
+ List<String> retainedGroupIds = updateResponseAndGetNonErrorGroupList(groupErrMap, groupList, deletableGroupResults);
if (retainedGroupIds.isEmpty()) {
- return CompletableFuture.completedFuture(collection);
+ return CompletableFuture.completedFuture(deletableGroupResults);
}
return handleDeleteGroups(context, topicPartition, retainedGroupIds)
- .whenComplete((resp, __) -> resp.forEach(result -> collection.add(result.duplicate())))
- .thenApply(__ -> collection);
+ .whenComplete((resp, __) -> resp.forEach(result -> deletableGroupResults.add(result.duplicate())))
+ .thenApply(__ -> deletableGroupResults);
});
// deleteShareGroups has its own exceptionally block, so we don't need one here.
// This future object has the following stages:
// - First it invokes the share group delete flow where the shard sharePartitionDeleteRequests
// method is invoked, and it returns request objects for each valid share group passed to it.
+ // All initialized and initializing share partitions are moved to deleting.
// - Then the requests are passed to the persister.deleteState method one at a time. The results
// are collated as a Map of groupId -> persister errors
- // - The above map is then used to decide whether to invoke the group coordinator delete groups logic
+ // - The above map can be used to decide whether to invoke the group coordinator delete groups logic
// - Share groups with failed persister delete are NOT CONSIDERED for group coordinator delete.
// TLDR: DeleteShareGroups -> filter erroneous persister deletes -> general delete groups logic
futures.add(future);
@@ -1102,17 +1103,26 @@ public class GroupCoordinatorService implements GroupCoordinator {
(accumulator, newResults) -> newResults.forEach(result -> accumulator.add(result.duplicate())));
}
- private List<String> deleteCandidateGroupIds(
- Map<String, Errors> groupErrMap,
+ /**
+ * Processes input shareGroupErrMap by retaining only those which do not contain an error.
+ * Also updates the result collection input arg with share groups containing errors.
+ *
+ * @param shareGroupErrMap Map keyed on share groupId and value as the error (NONE for no error).
+ * @param groupList Entire list of groups (all types)
+ * @param deletableGroupResults Collection of responses for delete groups request.
+ * @return A list of all non-error groupIds
+ */
+ private List<String> updateResponseAndGetNonErrorGroupList(
+ Map<String, Errors> shareGroupErrMap,
List<String> groupList,
- DeleteGroupsResponseData.DeletableGroupResultCollection collection
+ DeleteGroupsResponseData.DeletableGroupResultCollection deletableGroupResults
) {
List<String> errGroupIds = new ArrayList<>();
- groupErrMap.forEach((groupId, error) -> {
+ shareGroupErrMap.forEach((groupId, error) -> {
if (error.code() != Errors.NONE.code()) {
log.error("Error deleting share group {} due to error {}",
groupId, error);
errGroupIds.add(groupId);
- collection.add(
+ deletableGroupResults.add(
new DeleteGroupsResponseData.DeletableGroupResult()
.setGroupId(groupId)
.setErrorCode(error.code())
@@ -1153,7 +1163,7 @@ public class GroupCoordinatorService implements GroupCoordinator {
TopicPartition topicPartition,
List<String> groupList
) {
- // topicPartition refers to internal topic __consumer_offsets
+ // topicPartition refers to internal topic __consumer_offsets.
return runtime.scheduleWriteOperation(
"delete-share-groups",
topicPartition,
diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorShard.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorShard.java
index ded14600dab..812641c9459 100644
--- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorShard.java
+++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorShard.java
@@ -589,18 +589,19 @@ public class GroupCoordinatorShard implements CoordinatorShard<CoordinatorRecord
* Method returns a Map keyed on groupId and value as pair of {@link DeleteShareGroupStateParameters}
* and any ERRORS while building the request corresponding
* to the valid share groups passed as the input.
- * <p></p>
+ * <p>
* The groupIds are first filtered by type to restrict the list to share groups.
* @param groupIds - A list of groupIds as string
- * @return {@link CoordinatorResult} object always containing empty records and Map keyed on groupId and value pair (req, error)
+ * @return A result object containing a map keyed on groupId and value pair (req, error) and related coordinator records.
*/
public CoordinatorResult<Map<String, Map.Entry<DeleteShareGroupStateParameters, Errors>>, CoordinatorRecord> sharePartitionDeleteRequests(List<String> groupIds) {
Map<String, Map.Entry<DeleteShareGroupStateParameters, Errors>> responseMap = new HashMap<>();
+ List<CoordinatorRecord> records = new ArrayList<>();
for (String groupId : groupIds) {
try {
ShareGroup group = groupMetadataManager.shareGroup(groupId);
group.validateDeleteGroup();
- groupMetadataManager.shareGroupBuildPartitionDeleteRequest(group)
+ groupMetadataManager.shareGroupBuildPartitionDeleteRequest(groupId, records)
.ifPresent(req -> responseMap.put(groupId, Map.entry(req, Errors.NONE)));
} catch (GroupIdNotFoundException exception) {
log.debug("GroupId {} not found as a share group.", groupId);
@@ -609,7 +610,7 @@ public class GroupCoordinatorShard implements CoordinatorShard<CoordinatorRecord
responseMap.put(groupId, Map.entry(DeleteShareGroupStateParameters.EMPTY_PARAMS, Errors.forException(exception)));
}
}
- return new CoordinatorResult<>(List.of(), responseMap);
+ return new CoordinatorResult<>(records, responseMap);
}
/**
diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
index 08e9159b02d..f8a114d999f 100644
--- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
+++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
@@ -2984,7 +2984,7 @@ public class GroupMetadataManager {
groupId,
attachTopicName(finalInitializingMap),
attachTopicName(currentMap.initializedTopics()),
- Map.of()
+ attachTopicName(currentMap.deletingTopics())
)
);
}
@@ -4979,7 +4979,7 @@ public class GroupMetadataManager {
group.groupId(),
attachTopicName(finalInitializingMap),
attachTopicName(finalInitializedMap),
- Map.of()
+ attachTopicName(currentMap.deletingTopics())
)),
null
);
@@ -5025,7 +5025,7 @@ public class GroupMetadataManager {
groupId,
attachTopicName(finalInitializingTopics),
attachTopicName(info.initializedTopics()),
- Map.of()
+ attachTopicName(info.deletingTopics())
)
),
null
@@ -5057,6 +5057,13 @@ public class GroupMetadataManager {
return requests;
}
+ private Map<Uuid, String> attachTopicName(Set<Uuid> topicIds) {
+ TopicsImage topicsImage = metadataImage.topics();
+ return topicIds.stream()
+ .map(topicId -> Map.entry(topicId, topicsImage.getTopic(topicId).name()))
+ .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
+ }
+
private Map<Uuid, Map.Entry<String, Set<Integer>>> attachTopicName(Map<Uuid, Set<Integer>> initMap) {
TopicsImage topicsImage = metadataImage.topics();
Map<Uuid, Map.Entry<String, Set<Integer>>> finalMap = new HashMap<>();
@@ -8142,39 +8149,49 @@ public class GroupMetadataManager {
/**
* Returns an optional of delete share group request object to be used with the persister.
* Empty if no subscribed topics or if the share group is empty.
- * @param shareGroup - A share group
+ * @param shareGroupId Share group id
+ * @param records List of coordinator records to append to
* @return Optional of object representing the share group state delete request.
*/
- public Optional<DeleteShareGroupStateParameters> shareGroupBuildPartitionDeleteRequest(ShareGroup shareGroup) {
- TopicsImage topicsImage = metadataImage.topics();
- Set<String> subscribedTopics = shareGroup.subscribedTopicNames().keySet();
- List<TopicData<PartitionIdData>> topicDataList = new ArrayList<>(subscribedTopics.size());
-
- for (String topic : subscribedTopics) {
- TopicImage topicImage = topicsImage.getTopic(topic);
- topicDataList.add(
- new TopicData<>(
- topicImage.id(),
- topicImage.partitions().keySet().stream()
- .map(PartitionFactory::newPartitionIdData)
- .toList()
- )
- );
+ public Optional<DeleteShareGroupStateParameters> shareGroupBuildPartitionDeleteRequest(String shareGroupId, List<CoordinatorRecord> records) {
+ if (!shareGroupPartitionMetadata.containsKey(shareGroupId)) {
+ return Optional.empty();
}
- if (topicDataList.isEmpty()) {
+ Map<Uuid, Set<Integer>> deleteCandidates = mergeShareGroupInitMaps(
+ shareGroupPartitionMetadata.get(shareGroupId).initializedTopics(),
+ shareGroupPartitionMetadata.get(shareGroupId).initializingTopics()
+ );
+
+ if (deleteCandidates.isEmpty()) {
return Optional.empty();
}
- return Optional.of(
- new DeleteShareGroupStateParameters.Builder()
- .setGroupTopicPartitionData(
- new GroupTopicPartitionData.Builder<PartitionIdData>()
- .setGroupId(shareGroup.groupId())
- .setTopicsData(topicDataList)
- .build()
- )
- .build()
+ List<TopicData<PartitionIdData>> topicDataList = new ArrayList<>(deleteCandidates.size());
+
+ for (Map.Entry<Uuid, Set<Integer>> entry : deleteCandidates.entrySet()) {
+ topicDataList.add(new TopicData<>(
+ entry.getKey(),
+ entry.getValue().stream()
+ .map(PartitionFactory::newPartitionIdData)
+ .toList()
+ ));
+ }
+
+ // Remove all initializing and initialized topic info from record and add deleting.
+ records.add(GroupCoordinatorRecordHelpers.newShareGroupStatePartitionMetadataRecord(
+ shareGroupId,
+ Map.of(),
+ Map.of(),
+ attachTopicName(deleteCandidates.keySet())
+ ));
+
+ return Optional.of(new DeleteShareGroupStateParameters.Builder()
+ .setGroupTopicPartitionData(new GroupTopicPartitionData.Builder<PartitionIdData>()
+ .setGroupId(shareGroupId)
+ .setTopicsData(topicDataList)
+ .build())
+ .build()
);
}
diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/share/ShareGroup.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/share/ShareGroup.java
index 1e7d11f866a..b63100744c6 100644
--- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/share/ShareGroup.java
+++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/share/ShareGroup.java
@@ -236,6 +236,7 @@ public class ShareGroup extends ModernGroup<ShareGroupMember> {
);
records.add(GroupCoordinatorRecordHelpers.newShareGroupSubscriptionMetadataTombstoneRecord(groupId()));
+ records.add(GroupCoordinatorRecordHelpers.newShareGroupStatePartitionMetadataTombstoneRecord(groupId()));
records.add(GroupCoordinatorRecordHelpers.newShareGroupEpochTombstoneRecord(groupId()));
}
diff --git a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/Assertions.java b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/Assertions.java
index 227d07a8def..a593bb3d664 100644
--- a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/Assertions.java
+++ b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/Assertions.java
@@ -31,6 +31,7 @@ import org.apache.kafka.coordinator.group.generated.ConsumerGroupPartitionMetada
import org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmentMemberValue;
import org.apache.kafka.coordinator.group.generated.GroupMetadataValue;
import org.apache.kafka.coordinator.group.generated.ShareGroupPartitionMetadataValue;
+import org.apache.kafka.coordinator.group.generated.ShareGroupStatePartitionMetadataValue;
import org.apache.kafka.server.common.ApiMessageAndVersion;
import org.opentest4j.AssertionFailedError;
@@ -60,7 +61,8 @@ public class Assertions {
ConsumerGroupPartitionMetadataValue.class, Assertions::assertConsumerGroupPartitionMetadataValue,
GroupMetadataValue.class, Assertions::assertGroupMetadataValue,
ConsumerGroupTargetAssignmentMemberValue.class, Assertions::assertConsumerGroupTargetAssignmentMemberValue,
- ShareGroupPartitionMetadataValue.class, Assertions::assertShareGroupPartitionMetadataValue
+ ShareGroupPartitionMetadataValue.class, Assertions::assertShareGroupPartitionMetadataValue,
+ ShareGroupStatePartitionMetadataValue.class, Assertions::assertShareGroupStatePartitionMetadataValue
);
public static void assertResponseEquals(
@@ -285,6 +287,33 @@ public class Assertions {
assertEquals(expected, actual);
}
+ private static void assertShareGroupStatePartitionMetadataValue(
+ ApiMessage exp,
+ ApiMessage act
+ ) {
+ ShareGroupStatePartitionMetadataValue expected = (ShareGroupStatePartitionMetadataValue) exp.duplicate();
+ ShareGroupStatePartitionMetadataValue actual = (ShareGroupStatePartitionMetadataValue) act.duplicate();
+
+ Consumer<ShareGroupStatePartitionMetadataValue> normalize = message -> {
+ message.initializedTopics().sort(Comparator.comparing(ShareGroupStatePartitionMetadataValue.TopicPartitionsInfo::topicId));
+ message.initializedTopics().forEach(topic -> {
+ topic.partitions().sort(Comparator.naturalOrder());
+ });
+
+ message.initializingTopics().sort(Comparator.comparing(ShareGroupStatePartitionMetadataValue.TopicPartitionsInfo::topicId));
+ message.initializingTopics().forEach(topic -> {
+ topic.partitions().sort(Comparator.naturalOrder());
+ });
+
+ message.deletingTopics().sort(Comparator.comparing(ShareGroupStatePartitionMetadataValue.TopicInfo::topicId));
+ };
+
+ normalize.accept(expected);
+ normalize.accept(actual);
+
+ assertEquals(expected, actual);
+ }
+
private static void assertGroupMetadataValue(
ApiMessage exp,
ApiMessage act
diff --git a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorShardTest.java b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorShardTest.java
index f8e0d8a70f6..16665f98bf2 100644
--- a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorShardTest.java
+++ b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorShardTest.java
@@ -1860,9 +1860,10 @@ public class GroupCoordinatorShardTest {
metricsShard
);
- ShareGroup shareGroup = new ShareGroup(new
SnapshotRegistry(mock(LogContext.class)), "share-group");
+ String groupId = "share-group";
+ ShareGroup shareGroup = new ShareGroup(new
SnapshotRegistry(mock(LogContext.class)), groupId);
-
when(groupMetadataManager.shareGroup(eq("share-group"))).thenReturn(shareGroup);
+
when(groupMetadataManager.shareGroup(eq(groupId))).thenReturn(shareGroup);
when(groupMetadataManager.shareGroup(eq("non-share-group"))).thenThrow(GroupIdNotFoundException.class);
TopicData<PartitionIdData> topicData = new
TopicData<>(Uuid.randomUuid(),
@@ -1873,21 +1874,20 @@ public class GroupCoordinatorShardTest {
DeleteShareGroupStateParameters params = new
DeleteShareGroupStateParameters.Builder()
.setGroupTopicPartitionData(new
GroupTopicPartitionData.Builder<PartitionIdData>()
- .setGroupId("share-group")
+ .setGroupId(groupId)
.setTopicsData(List.of(topicData))
.build())
.build();
-
when(groupMetadataManager.shareGroupBuildPartitionDeleteRequest(eq(shareGroup))).thenReturn(Optional.of(params));
+
when(groupMetadataManager.shareGroupBuildPartitionDeleteRequest(eq(groupId),
anyList())).thenReturn(Optional.of(params));
CoordinatorResult<Map<String,
Map.Entry<DeleteShareGroupStateParameters, Errors>>, CoordinatorRecord>
expectedResult =
- new CoordinatorResult<>(List.of(), Map.of("share-group",
Map.entry(params, Errors.NONE)));
+ new CoordinatorResult<>(List.of(), Map.of(groupId,
Map.entry(params, Errors.NONE)));
-
- assertEquals(expectedResult,
coordinator.sharePartitionDeleteRequests(List.of("share-group",
"non-share-group")));
- verify(groupMetadataManager, times(1)).shareGroup(eq("share-group"));
+ assertEquals(expectedResult,
coordinator.sharePartitionDeleteRequests(List.of(groupId, "non-share-group")));
+ verify(groupMetadataManager, times(1)).shareGroup(eq(groupId));
verify(groupMetadataManager,
times(1)).shareGroup(eq("non-share-group"));
- verify(groupMetadataManager,
times(1)).shareGroupBuildPartitionDeleteRequest(eq(shareGroup));
+ verify(groupMetadataManager,
times(1)).shareGroupBuildPartitionDeleteRequest(eq(groupId), anyList());
// empty list
Mockito.reset(groupMetadataManager);
@@ -1897,9 +1897,9 @@ public class GroupCoordinatorShardTest {
coordinator.sharePartitionDeleteRequests(List.of())
);
- verify(groupMetadataManager, times(0)).group(eq("share-group"));
+ verify(groupMetadataManager, times(0)).group(eq(groupId));
verify(groupMetadataManager, times(0)).group(eq("non-share-group"));
- verify(groupMetadataManager,
times(0)).shareGroupBuildPartitionDeleteRequest(eq(shareGroup));
+ verify(groupMetadataManager,
times(0)).shareGroupBuildPartitionDeleteRequest(eq(groupId), anyList());
}
@Test
@@ -1919,19 +1919,24 @@ public class GroupCoordinatorShardTest {
metricsShard
);
+ String groupId = "share-group";
ShareGroup shareGroup = mock(ShareGroup.class);
doThrow(new GroupNotEmptyException("bad
stuff")).when(shareGroup).validateDeleteGroup();
-
when(groupMetadataManager.shareGroup(eq("share-group"))).thenReturn(shareGroup);
+
when(groupMetadataManager.shareGroup(eq(groupId))).thenReturn(shareGroup);
CoordinatorResult<Map<String,
Map.Entry<DeleteShareGroupStateParameters, Errors>>, CoordinatorRecord>
expectedResult =
- new CoordinatorResult<>(List.of(), Map.of("share-group",
Map.entry(DeleteShareGroupStateParameters.EMPTY_PARAMS,
- Errors.forException(new GroupNotEmptyException("bad stuff")))
- ));
- assertEquals(expectedResult,
coordinator.sharePartitionDeleteRequests(List.of("share-group")));
- verify(groupMetadataManager, times(1)).shareGroup(eq("share-group"));
+ new CoordinatorResult<>(
+ List.of(),
+ Map.of(
+ groupId,
+ Map.entry(DeleteShareGroupStateParameters.EMPTY_PARAMS,
Errors.forException(new GroupNotEmptyException("bad stuff")))
+ )
+ );
+ assertEquals(expectedResult,
coordinator.sharePartitionDeleteRequests(List.of(groupId)));
+ verify(groupMetadataManager, times(1)).shareGroup(eq(groupId));
// Not called because of NON-EMPTY group.
- verify(groupMetadataManager,
times(0)).shareGroupBuildPartitionDeleteRequest(eq(shareGroup));
+ verify(groupMetadataManager,
times(0)).shareGroupBuildPartitionDeleteRequest(eq(groupId), anyList());
// empty list
Mockito.reset(groupMetadataManager);
@@ -1942,6 +1947,6 @@ public class GroupCoordinatorShardTest {
);
verify(groupMetadataManager, times(0)).group(eq("share-group"));
- verify(groupMetadataManager,
times(0)).shareGroupBuildPartitionDeleteRequest(eq(shareGroup));
+ verify(groupMetadataManager,
times(0)).shareGroupBuildPartitionDeleteRequest(eq(groupId), anyList());
}
}
diff --git a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java
index 95e247beef7..35574d35ca3 100644
--- a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java
+++ b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java
@@ -99,7 +99,6 @@ import org.apache.kafka.coordinator.group.generated.StreamsGroupTopologyValue;
import org.apache.kafka.coordinator.group.modern.Assignment;
import org.apache.kafka.coordinator.group.modern.MemberAssignmentImpl;
import org.apache.kafka.coordinator.group.modern.MemberState;
-import org.apache.kafka.coordinator.group.modern.SubscriptionCount;
import org.apache.kafka.coordinator.group.modern.TopicMetadata;
import org.apache.kafka.coordinator.group.modern.consumer.ConsumerGroup;
import org.apache.kafka.coordinator.group.modern.consumer.ConsumerGroupBuilder;
@@ -128,9 +127,7 @@ import org.apache.kafka.server.authorizer.Action;
import org.apache.kafka.server.authorizer.AuthorizationResult;
import org.apache.kafka.server.authorizer.Authorizer;
import org.apache.kafka.server.share.persister.DeleteShareGroupStateParameters;
-import org.apache.kafka.server.share.persister.GroupTopicPartitionData;
import org.apache.kafka.server.share.persister.InitializeShareGroupStateParameters;
-import org.apache.kafka.server.share.persister.PartitionFactory;
import org.apache.kafka.server.share.persister.PartitionIdData;
import org.apache.kafka.server.share.persister.PartitionStateData;
import org.apache.kafka.server.share.persister.TopicData;
@@ -147,7 +144,6 @@ import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
-import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
@@ -209,7 +205,6 @@ import static org.junit.jupiter.api.Assertions.fail;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
@@ -15421,7 +15416,7 @@ public class GroupMetadataManagerTest {
}
@Test
- public void testShareGroupDelete() {
+ public void testShareGroupDeleteTombstones() {
String groupId = "share-group-id";
GroupMetadataManagerTestContext context = new
GroupMetadataManagerTestContext.Builder()
.withShareGroup(new ShareGroupBuilder(groupId, 10))
@@ -15430,6 +15425,7 @@ public class GroupMetadataManagerTest {
List<CoordinatorRecord> expectedRecords = List.of(
GroupCoordinatorRecordHelpers.newShareGroupTargetAssignmentEpochTombstoneRecord(groupId),
GroupCoordinatorRecordHelpers.newShareGroupSubscriptionMetadataTombstoneRecord(groupId),
+
GroupCoordinatorRecordHelpers.newShareGroupStatePartitionMetadataTombstoneRecord(groupId),
GroupCoordinatorRecordHelpers.newShareGroupEpochTombstoneRecord(groupId)
);
List<CoordinatorRecord> records = new ArrayList<>();
@@ -20745,7 +20741,7 @@ public class GroupMetadataManagerTest {
}
@Test
- public void testSharePartitionDeleteRequest() {
+ public void testShareGroupDeleteRequest() {
MockPartitionAssignor assignor = new MockPartitionAssignor("range");
assignor.prepareGroupAssignment(new GroupAssignment(Map.of()));
GroupMetadataManagerTestContext context = new
GroupMetadataManagerTestContext.Builder()
@@ -20754,38 +20750,58 @@ public class GroupMetadataManagerTest {
Uuid t1Uuid = Uuid.randomUuid();
Uuid t2Uuid = Uuid.randomUuid();
- MetadataImage image = spy(new MetadataImageBuilder()
- .addTopic(t1Uuid, "t1", 2)
- .addTopic(t2Uuid, "t2", 2)
- .build());
-
- context.groupMetadataManager.onNewMetadataImage(image,
mock(MetadataDelta.class));
+ String t1Name = "t1";
+ String t2Name = "t2";
+ String groupId = "share-group";
ShareGroup shareGroup = mock(ShareGroup.class);
- Map<String, SubscriptionCount> topicMap = new LinkedHashMap<>();
- topicMap.put("t1", mock(SubscriptionCount.class));
- topicMap.put("t2", mock(SubscriptionCount.class));
- when(shareGroup.subscribedTopicNames()).thenReturn(topicMap);
- when(shareGroup.groupId()).thenReturn("share-group");
+ when(shareGroup.groupId()).thenReturn(groupId);
when(shareGroup.isEmpty()).thenReturn(false);
- DeleteShareGroupStateParameters expectedParameters = new
DeleteShareGroupStateParameters.Builder()
- .setGroupTopicPartitionData(new
GroupTopicPartitionData.Builder<PartitionIdData>()
- .setGroupId("share-group")
- .setTopicsData(List.of(
- new TopicData<>(t1Uuid,
List.of(PartitionFactory.newPartitionIdData(0),
PartitionFactory.newPartitionIdData(1))),
- new TopicData<>(t2Uuid,
List.of(PartitionFactory.newPartitionIdData(0),
PartitionFactory.newPartitionIdData(1)))
- ))
- .build()
- )
+ MetadataImage image = new MetadataImageBuilder()
+ .addTopic(t1Uuid, t1Name, 2)
+ .addTopic(t2Uuid, t2Name, 2)
.build();
- Optional<DeleteShareGroupStateParameters> params =
context.groupMetadataManager.shareGroupBuildPartitionDeleteRequest(shareGroup);
- assertTrue(params.isPresent());
- assertEquals(expectedParameters.groupTopicPartitionData(),
params.get().groupTopicPartitionData());
- verify(image, times(1)).topics();
- verify(shareGroup, times(1)).subscribedTopicNames();
- verify(shareGroup, times(1)).groupId();
+ MetadataDelta delta = new MetadataDelta(image);
+ context.groupMetadataManager.onNewMetadataImage(image, delta);
+
+
context.replay(GroupCoordinatorRecordHelpers.newShareGroupEpochRecord(groupId,
0));
+
+ context.replay(
+
GroupCoordinatorRecordHelpers.newShareGroupStatePartitionMetadataRecord(
+ groupId,
+ Map.of(t1Uuid, Map.entry(t1Name, Set.of(0, 1))),
+ Map.of(t2Uuid, Map.entry(t2Name, Set.of(0, 1))),
+ Map.of()
+ )
+ );
+
+ context.commit();
+
+ Map<Uuid, Set<Integer>> expectedTopicPartitionMap = Map.of(
+ t1Uuid, Set.of(0, 1),
+ t2Uuid, Set.of(0, 1)
+ );
+
+ List<CoordinatorRecord> expectedRecords = List.of(
+ newShareGroupStatePartitionMetadataRecord(
+ groupId,
+ Map.of(),
+ Map.of(),
+ Map.of(t1Uuid, t1Name, t2Uuid, t2Name)
+ )
+ );
+
+ List<CoordinatorRecord> records = new ArrayList<>();
+ Optional<DeleteShareGroupStateParameters> params =
context.groupMetadataManager.shareGroupBuildPartitionDeleteRequest(groupId,
records);
+ verifyShareGroupDeleteRequest(
+ params,
+ expectedTopicPartitionMap,
+ groupId,
+ true
+ );
+ assertRecordsEquals(expectedRecords, records);
}
@Test
@@ -20844,14 +20860,15 @@ public class GroupMetadataManagerTest {
new ShareGroupStatePartitionMetadataKey()
.setGroupId(groupId),
new ShareGroupStatePartitionMetadataValue()
+ .setInitializingTopics(List.of())
.setInitializedTopics(List.of(
new
ShareGroupStatePartitionMetadataValue.TopicPartitionsInfo()
.setTopicId(t1Uuid)
- .setTopicName("t1")
+ .setTopicName(t1Name)
.setPartitions(List.of(0, 1)),
new
ShareGroupStatePartitionMetadataValue.TopicPartitionsInfo()
.setTopicId(t2Uuid)
- .setTopicName("t2")
+ .setTopicName(t2Name)
.setPartitions(List.of(0, 1))
))
.setDeletingTopics(List.of())
@@ -21292,4 +21309,25 @@ public class GroupMetadataManagerTest {
assertTrue(initRequest.isEmpty());
}
}
+
+ private void verifyShareGroupDeleteRequest(
+ Optional<DeleteShareGroupStateParameters> deleteRequest,
+ Map<Uuid, Set<Integer>> expectedTopicPartitionsMap,
+ String groupId,
+ boolean shouldExist
+ ) {
+ if (shouldExist) {
+ assertTrue(deleteRequest.isPresent());
+ DeleteShareGroupStateParameters request = deleteRequest.get();
+ assertEquals(groupId, request.groupTopicPartitionData().groupId());
+ Map<Uuid, Set<Integer>> actualTopicPartitionsMap = new HashMap<>();
+ for (TopicData<PartitionIdData> topicData :
request.groupTopicPartitionData().topicsData()) {
+ actualTopicPartitionsMap.computeIfAbsent(topicData.topicId(),
k -> new HashSet<>())
+
.addAll(topicData.partitions().stream().map(PartitionIdData::partition).toList());
+ }
+ assertEquals(expectedTopicPartitionsMap, actualTopicPartitionsMap);
+ } else {
+ assertTrue(deleteRequest.isEmpty());
+ }
+ }
}
diff --git a/share-coordinator/src/main/java/org/apache/kafka/coordinator/share/ShareCoordinatorShard.java b/share-coordinator/src/main/java/org/apache/kafka/coordinator/share/ShareCoordinatorShard.java
index e0118ee2499..35891bddf8d 100644
--- a/share-coordinator/src/main/java/org/apache/kafka/coordinator/share/ShareCoordinatorShard.java
+++ b/share-coordinator/src/main/java/org/apache/kafka/coordinator/share/ShareCoordinatorShard.java
@@ -515,6 +515,16 @@ public class ShareCoordinatorShard implements CoordinatorShard<CoordinatorRecord
DeleteShareGroupStateRequestData.PartitionData partitionData = topicData.partitions().get(0);
SharePartitionKey key = SharePartitionKey.getInstance(request.groupId(), topicData.topicId(), partitionData.partition());
+ if (!shareStateMap.containsKey(key)) {
+ log.warn("Attempted to delete non-existent share partition {}.", key);
+ return new CoordinatorResult<>(List.of(), new DeleteShareGroupStateResponseData().setResults(
+ List.of(DeleteShareGroupStateResponse.toResponseDeleteStateResult(key.topicId(),
+ List.of(DeleteShareGroupStateResponse.toResponsePartitionResult(
+ key.partition()))
+ ))
+ ));
+ }
+
CoordinatorRecord record = generateTombstoneRecord(key);
// build successful response if record is correctly created
DeleteShareGroupStateResponseData responseData = new DeleteShareGroupStateResponseData().setResults(
diff --git a/share-coordinator/src/test/java/org/apache/kafka/coordinator/share/ShareCoordinatorShardTest.java b/share-coordinator/src/test/java/org/apache/kafka/coordinator/share/ShareCoordinatorShardTest.java
index 0426c4e5005..20be20832d8 100644
--- a/share-coordinator/src/test/java/org/apache/kafka/coordinator/share/ShareCoordinatorShardTest.java
+++ b/share-coordinator/src/test/java/org/apache/kafka/coordinator/share/ShareCoordinatorShardTest.java
@@ -994,9 +994,7 @@ class ShareCoordinatorShardTest {
.setPartitions(List.of(new
DeleteShareGroupStateRequestData.PartitionData()
.setPartition(PARTITION)))));
- CoordinatorResult<DeleteShareGroupStateResponseData,
CoordinatorRecord> result = shard.deleteState(request);
-
- // apply a record in to verify delete
+ // Apply a record to the state machine so that delete can be verified.
CoordinatorRecord record =
ShareCoordinatorRecordHelpers.newShareSnapshotRecord(
GROUP_ID,
TOPIC_ID,
@@ -1021,7 +1019,9 @@ class ShareCoordinatorShardTest {
assertNotNull(shard.getLeaderMapValue(shareCoordinatorKey));
assertNotNull(shard.getStateEpochMapValue(shareCoordinatorKey));
- // apply tombstone
+ CoordinatorResult<DeleteShareGroupStateResponseData,
CoordinatorRecord> result = shard.deleteState(request);
+
+ // Apply tombstone.
shard.replay(0L, 0L, (short) 0, result.records().get(0));
DeleteShareGroupStateResponseData expectedData =
DeleteShareGroupStateResponse.toResponseData(TOPIC_ID, PARTITION);
@@ -1039,7 +1039,7 @@ class ShareCoordinatorShardTest {
}
@Test
- public void testDeleteStateFirstRecordDeleteSuccess() {
+ public void testDeleteStateUnintializedRecord() {
ShareCoordinatorShard shard = new
ShareCoordinatorShardBuilder().build();
SharePartitionKey shareCoordinatorKey =
SharePartitionKey.getInstance(GROUP_ID, TOPIC_ID, PARTITION);
@@ -1057,21 +1057,10 @@ class ShareCoordinatorShardTest {
assertNull(shard.getLeaderMapValue(shareCoordinatorKey));
assertNull(shard.getStateEpochMapValue(shareCoordinatorKey));
- // apply tombstone
- shard.replay(0L, 0L, (short) 0, result.records().get(0));
-
DeleteShareGroupStateResponseData expectedData =
DeleteShareGroupStateResponse.toResponseData(TOPIC_ID, PARTITION);
- List<CoordinatorRecord> expectedRecords = List.of(
- ShareCoordinatorRecordHelpers.newShareStateTombstoneRecord(
- GROUP_ID, TOPIC_ID, PARTITION)
- );
assertEquals(expectedData, result.response());
- assertEquals(expectedRecords, result.records());
-
- assertNull(shard.getShareStateMapValue(shareCoordinatorKey));
- assertNull(shard.getLeaderMapValue(shareCoordinatorKey));
- assertNull(shard.getStateEpochMapValue(shareCoordinatorKey));
+ assertEquals(List.of(), result.records());
}
@Test