This is an automated email from the ASF dual-hosted git repository.

schofielaj pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
     new c58de757128 KAFKA-19204: Add timestamp to share state metadata init maps [1/N] (#19781)
c58de757128 is described below

commit c58de7571289f05a734f9fe7e197b23c004803d3
Author: Sushant Mahajan <smaha...@confluent.io>
AuthorDate: Fri May 23 13:26:05 2025 +0530

    KAFKA-19204: Add timestamp to share state metadata init maps [1/N] (#19781)

    1. Currently, the code allows any initializing topics to be retried in subsequent
       heartbeats. This can result in duplicate persister calls if multiple share consumers
       join the same group concurrently, and only one of those calls succeeds because the
       others carry a lower state epoch and are fenced.
    2. That behaviour was introduced in https://github.com/apache/kafka/pull/19603 so that
       initialization of initializing topics could be retried if the original caller failed
       to persist the information (for example, because of a dead broker or a timeout).
    3. To prevent duplicate calls while still allowing retries, the TimelineHashMap holding
       the `ShareGroupStatePartitionMetadataInfo` has been supplemented to also hold the
       timestamp at which the record is replayed.
       a. When multiple consumers join the same group with the same topic, only one of them
          is allowed to issue the persister initialize request; the topic is added to the map
          when the record is replayed. This solves issue 1.
       b. To allow retries, an initializing topic whose timestamp is older than
          2 * offsets.commit.timeout.ms becomes eligible to be retried. Here too, only one
          consumer is able to retry, resolving issue 2 as well.
    4. Tests have been added wherever applicable and existing ones updated.
    5. No record schema changes are involved.
    6. The `ShareGroupStatePartitionMetadataInfo` and `InitMapValue` records have been moved
       to the `ShareGroup` class for better encapsulation.
    7. Some logs in `ShareCoordinatorShard` have been changed from error to info, and extra
       information is now logged.
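    To make the mechanism in point 3 concrete, the following is a minimal, self-contained
    Java sketch of the timestamp rule, written for this note rather than taken from the
    patch: InitMapValue mirrors the record added to ShareGroup, while eligibleForRetry is a
    hypothetical helper; the patch itself performs the equivalent check inline in
    GroupMetadataManager.subscribedTopicsChangeMap using config.offsetCommitTimeoutMs().

        import java.util.Set;

        // Minimal sketch (not part of the patch) of the retry-eligibility rule from point 3(b).
        public final class InitRetryCheckSketch {

            // Mirrors ShareGroup.InitMapValue: topic name, partitions, and the replay timestamp.
            public record InitMapValue(String name, Set<Integer> partitions, long timestamp) { }

            // Hypothetical helper: an initializing topic may be retried only once its replay
            // timestamp is older than 2 * offsets.commit.timeout.ms; fresher entries are treated
            // as still in flight, so concurrent joiners do not issue duplicate persister calls.
            public static boolean eligibleForRetry(InitMapValue value, long nowMs, long offsetCommitTimeoutMs) {
                long delta = offsetCommitTimeoutMs * 2L;
                return nowMs - value.timestamp() >= delta;
            }

            public static void main(String[] args) {
                InitMapValue entry = new InitMapValue("t1", Set.of(0, 1), 1_000L);
                // With a 10 ms timeout the entry is protected for 20 ms after replay.
                System.out.println(eligibleForRetry(entry, 1_015L, 10L)); // false -> other consumers are held off
                System.out.println(eligibleForRetry(entry, 1_021L, 10L)); // true  -> a single retry is allowed
            }
        }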
Reviewers: Andrew Schofield <aschofi...@confluent.io> --- .../group/GroupCoordinatorRecordHelpers.java | 13 +- .../coordinator/group/GroupMetadataManager.java | 239 ++++++++++++--------- .../coordinator/group/modern/share/ShareGroup.java | 31 +++ .../group/GroupCoordinatorRecordHelpersTest.java | 3 +- .../group/GroupMetadataManagerTest.java | 190 +++++++++++----- .../group/GroupMetadataManagerTestContext.java | 7 +- .../coordinator/share/ShareCoordinatorShard.java | 8 +- 7 files changed, 328 insertions(+), 163 deletions(-) diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorRecordHelpers.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorRecordHelpers.java index ad17bfe34f2..288e8d3abe1 100644 --- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorRecordHelpers.java +++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorRecordHelpers.java @@ -56,6 +56,7 @@ import org.apache.kafka.coordinator.group.generated.ShareGroupTargetAssignmentMe import org.apache.kafka.coordinator.group.modern.TopicMetadata; import org.apache.kafka.coordinator.group.modern.consumer.ConsumerGroupMember; import org.apache.kafka.coordinator.group.modern.consumer.ResolvedRegularExpression; +import org.apache.kafka.coordinator.group.modern.share.ShareGroup.InitMapValue; import org.apache.kafka.coordinator.group.modern.share.ShareGroupMember; import org.apache.kafka.server.common.ApiMessageAndVersion; @@ -844,22 +845,22 @@ public class GroupCoordinatorRecordHelpers { */ public static CoordinatorRecord newShareGroupStatePartitionMetadataRecord( String groupId, - Map<Uuid, Map.Entry<String, Set<Integer>>> initializingTopics, - Map<Uuid, Map.Entry<String, Set<Integer>>> initializedTopics, + Map<Uuid, InitMapValue> initializingTopics, + Map<Uuid, InitMapValue> initializedTopics, Map<Uuid, String> deletingTopics ) { List<ShareGroupStatePartitionMetadataValue.TopicPartitionsInfo> initializingTopicPartitionInfo = initializingTopics.entrySet().stream() .map(entry -> new ShareGroupStatePartitionMetadataValue.TopicPartitionsInfo() .setTopicId(entry.getKey()) - .setTopicName(entry.getValue().getKey()) - .setPartitions(entry.getValue().getValue().stream().toList())) + .setTopicName(entry.getValue().name()) + .setPartitions(entry.getValue().partitions().stream().toList())) .toList(); List<ShareGroupStatePartitionMetadataValue.TopicPartitionsInfo> initializedTopicPartitionInfo = initializedTopics.entrySet().stream() .map(entry -> new ShareGroupStatePartitionMetadataValue.TopicPartitionsInfo() .setTopicId(entry.getKey()) - .setTopicName(entry.getValue().getKey()) - .setPartitions(entry.getValue().getValue().stream().toList())) + .setTopicName(entry.getValue().name()) + .setPartitions(entry.getValue().partitions().stream().toList())) .toList(); List<ShareGroupStatePartitionMetadataValue.TopicInfo> deletingTopicsInfo = deletingTopics.entrySet().stream() diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java index 5412b2ab87e..4312cb47b09 100644 --- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java +++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java @@ -147,6 +147,8 @@ import org.apache.kafka.coordinator.group.modern.consumer.ConsumerGroupMember; import 
org.apache.kafka.coordinator.group.modern.consumer.CurrentAssignmentBuilder; import org.apache.kafka.coordinator.group.modern.consumer.ResolvedRegularExpression; import org.apache.kafka.coordinator.group.modern.share.ShareGroup; +import org.apache.kafka.coordinator.group.modern.share.ShareGroup.InitMapValue; +import org.apache.kafka.coordinator.group.modern.share.ShareGroup.ShareGroupStatePartitionMetadataInfo; import org.apache.kafka.coordinator.group.modern.share.ShareGroupAssignmentBuilder; import org.apache.kafka.coordinator.group.modern.share.ShareGroupMember; import org.apache.kafka.coordinator.group.streams.StreamsGroup; @@ -469,7 +471,7 @@ public class GroupMetadataManager { /** * The share group partition metadata info keyed by group id. */ - private final TimelineHashMap<String, ShareGroupStatePartitionMetadataInfo> shareGroupPartitionMetadata; + private final TimelineHashMap<String, ShareGroupStatePartitionMetadataInfo> shareGroupStatePartitionMetadata; /** * The group manager. @@ -506,20 +508,6 @@ public class GroupMetadataManager { */ private final ShareGroupPartitionAssignor shareGroupAssignor; - /** - * A record class to hold the value representing ShareGroupStatePartitionMetadata for the TimelineHashmap - * keyed on share group id. - * - * @param initializedTopics Map of set of partition ids keyed on the topic id. - * @param deletingTopics Set of topic ids. - */ - private record ShareGroupStatePartitionMetadataInfo( - Map<Uuid, Set<Integer>> initializingTopics, - Map<Uuid, Set<Integer>> initializedTopics, - Set<Uuid> deletingTopics - ) { - } - /** * The authorizer to validate the regex subscription topics. */ @@ -555,7 +543,7 @@ public class GroupMetadataManager { this.defaultConsumerGroupAssignor = config.consumerGroupAssignors().get(0); this.groups = new TimelineHashMap<>(snapshotRegistry, 0); this.groupsByTopics = new TimelineHashMap<>(snapshotRegistry, 0); - this.shareGroupPartitionMetadata = new TimelineHashMap<>(snapshotRegistry, 0); + this.shareGroupStatePartitionMetadata = new TimelineHashMap<>(snapshotRegistry, 0); this.groupConfigManager = groupConfigManager; this.shareGroupAssignor = shareGroupAssignor; this.authorizerPlugin = authorizerPlugin; @@ -2642,7 +2630,7 @@ public class GroupMetadataManager { } private boolean initializedAssignmentPending(ShareGroup group) { - if (!shareGroupPartitionMetadata.containsKey(group.groupId())) { + if (!shareGroupStatePartitionMetadata.containsKey(group.groupId())) { // No initialized share partitions for the group so nothing can be assigned. return false; } @@ -2653,7 +2641,7 @@ public class GroupMetadataManager { } // We need to check if all the group initialized share partitions are part of the group assignment. - Map<Uuid, Set<Integer>> initializedTps = shareGroupPartitionMetadata.get(group.groupId()).initializedTopics(); + Map<Uuid, Set<Integer>> initializedTps = stripInitValue(shareGroupStatePartitionMetadata.get(group.groupId()).initializedTopics()); Map<Uuid, Set<Integer>> currentAssigned = new HashMap<>(); for (Assignment assignment : group.targetAssignment().values()) { for (Map.Entry<Uuid, Set<Integer>> tps : assignment.partitions().entrySet()) { @@ -2674,27 +2662,38 @@ public class GroupMetadataManager { * @return A map of topic partitions which are subscribed by the share group but not initialized yet. 
*/ // Visibility for testing - Map<Uuid, Set<Integer>> subscribedTopicsChangeMap(String groupId, Map<String, TopicMetadata> subscriptionMetadata) { + Map<Uuid, InitMapValue> subscribedTopicsChangeMap(String groupId, Map<String, TopicMetadata> subscriptionMetadata) { if (subscriptionMetadata == null || subscriptionMetadata.isEmpty()) { return Map.of(); } - Map<Uuid, Set<Integer>> topicPartitionChangeMap = new HashMap<>(); - ShareGroupStatePartitionMetadataInfo info = shareGroupPartitionMetadata.get(groupId); - - // We are only considering initialized TPs here. This is because it could happen - // that some topics have been moved to initializing but the corresponding persister request - // could not be made/failed (invoked by the group coordinator). Then there would be no way to try - // the persister call. This way we get the opportunity to retry. - Map<Uuid, Set<Integer>> alreadyInitialized = info == null ? new HashMap<>() : info.initializedTopics(); + Map<Uuid, InitMapValue> topicPartitionChangeMap = new HashMap<>(); + ShareGroupStatePartitionMetadataInfo info = shareGroupStatePartitionMetadata.get(groupId); + + // We must only consider initializing topics whose timestamp is fresher than delta elapsed. + // Any initializing topics which are older than delta and are part of the subscribed topics + // must be returned so that they can be retried. + long curTimestamp = time.milliseconds(); + long delta = config.offsetCommitTimeoutMs() * 2L; + Map<Uuid, InitMapValue> alreadyInitialized = info == null ? new HashMap<>() : + combineInitMaps( + info.initializedTopics(), + info.initializingTopics().entrySet().stream() + .filter(entry -> curTimestamp - entry.getValue().timestamp() < delta) + .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)) + ); + // Here will add any topics which are subscribed but not initialized and initializing + // topics whose timestamp indicates that they are older than delta elapsed. subscriptionMetadata.forEach((topicName, topicMetadata) -> { - Set<Integer> alreadyInitializedPartSet = alreadyInitialized.getOrDefault(topicMetadata.id(), Set.of()); + Set<Integer> alreadyInitializedPartSet = alreadyInitialized.containsKey(topicMetadata.id()) ? alreadyInitialized.get(topicMetadata.id()).partitions() : Set.of(); if (alreadyInitializedPartSet.isEmpty() || alreadyInitializedPartSet.size() < topicMetadata.numPartitions()) { Set<Integer> partitionSet = IntStream.range(0, topicMetadata.numPartitions()).boxed().collect(Collectors.toSet()); partitionSet.removeAll(alreadyInitializedPartSet); - - topicPartitionChangeMap.computeIfAbsent(topicMetadata.id(), k -> partitionSet); + // alreadyInitialized contains all initialized topics and initializing topics which are less than delta old + // which means we are putting subscribed topics which are unseen or initializing for more than delta. But, we + // are also updating the timestamp here which means, old initializing will not be included repeatedly. + topicPartitionChangeMap.computeIfAbsent(topicMetadata.id(), k -> new InitMapValue(topicMetadata.name(), partitionSet, curTimestamp)); } }); return topicPartitionChangeMap; @@ -2719,7 +2718,7 @@ public class GroupMetadataManager { return Optional.empty(); } - Map<Uuid, Set<Integer>> topicPartitionChangeMap = subscribedTopicsChangeMap(groupId, subscriptionMetadata); + Map<Uuid, InitMapValue> topicPartitionChangeMap = subscribedTopicsChangeMap(groupId, subscriptionMetadata); // Nothing to initialize. 
if (topicPartitionChangeMap.isEmpty()) { @@ -2730,12 +2729,12 @@ public class GroupMetadataManager { return Optional.of(buildInitializeShareGroupStateRequest(groupId, groupEpoch, topicPartitionChangeMap)); } - private InitializeShareGroupStateParameters buildInitializeShareGroupStateRequest(String groupId, int groupEpoch, Map<Uuid, Set<Integer>> topicPartitions) { + private InitializeShareGroupStateParameters buildInitializeShareGroupStateRequest(String groupId, int groupEpoch, Map<Uuid, InitMapValue> topicPartitions) { return new InitializeShareGroupStateParameters.Builder().setGroupTopicPartitionData( new GroupTopicPartitionData<>(groupId, topicPartitions.entrySet().stream() .map(entry -> new TopicData<>( entry.getKey(), - entry.getValue().stream() + entry.getValue().partitions().stream() .map(partitionId -> PartitionFactory.newPartitionStateData(partitionId, groupEpoch, -1)) .toList()) ).toList() @@ -2743,19 +2742,19 @@ public class GroupMetadataManager { } // Visibility for tests - void addInitializingTopicsRecords(String groupId, List<CoordinatorRecord> records, Map<Uuid, Set<Integer>> topicPartitionMap) { + void addInitializingTopicsRecords(String groupId, List<CoordinatorRecord> records, Map<Uuid, InitMapValue> topicPartitionMap) { if (topicPartitionMap == null || topicPartitionMap.isEmpty()) { return; } - ShareGroupStatePartitionMetadataInfo currentMap = shareGroupPartitionMetadata.get(groupId); + ShareGroupStatePartitionMetadataInfo currentMap = shareGroupStatePartitionMetadata.get(groupId); if (currentMap == null) { - records.add(newShareGroupStatePartitionMetadataRecord(groupId, attachTopicName(topicPartitionMap), Map.of(), Map.of())); + records.add(newShareGroupStatePartitionMetadataRecord(groupId, topicPartitionMap, Map.of(), Map.of())); return; } // We must combine the existing information in the record with the topicPartitionMap argument. - Map<Uuid, Set<Integer>> finalInitializingMap = mergeShareGroupInitMaps(currentMap.initializingTopics(), topicPartitionMap); + Map<Uuid, InitMapValue> finalInitializingMap = combineInitMaps(currentMap.initializingTopics(), topicPartitionMap); // If any initializing topics are also present in the deleting state // we should remove them from deleting. 
@@ -2771,33 +2770,50 @@ public class GroupMetadataManager { records.add( newShareGroupStatePartitionMetadataRecord( groupId, - attachTopicName(finalInitializingMap), - attachTopicName(currentMap.initializedTopics()), + finalInitializingMap, + currentMap.initializedTopics(), attachTopicName(currentDeleting) ) ); } - // Visibility for tests - static Map<Uuid, Set<Integer>> mergeShareGroupInitMaps( - Map<Uuid, Set<Integer>> existingShareGroupInitMap, - Map<Uuid, Set<Integer>> newShareGroupInitMap + // Visibility for testing + static Map<Uuid, InitMapValue> combineInitMaps( + Map<Uuid, InitMapValue> initialized, + Map<Uuid, InitMapValue> initializing ) { - Map<Uuid, Set<Integer>> finalInitMap = new HashMap<>(); - Set<Uuid> combinedTopicIdSet = new HashSet<>(existingShareGroupInitMap.keySet()); - combinedTopicIdSet.addAll(newShareGroupInitMap.keySet()); + Map<Uuid, InitMapValue> finalInitMap = new HashMap<>(); + Set<Uuid> combinedTopicIdSet = new HashSet<>(initialized.keySet()); + + Set<Uuid> initializingSet = initializing.keySet(); + + combinedTopicIdSet.addAll(initializingSet); for (Uuid topicId : combinedTopicIdSet) { - Set<Integer> partitions = new HashSet<>(existingShareGroupInitMap.getOrDefault(topicId, new HashSet<>())); - if (newShareGroupInitMap.containsKey(topicId)) { - partitions.addAll(newShareGroupInitMap.get(topicId)); + Set<Integer> initializedPartitions = initialized.containsKey(topicId) ? initialized.get(topicId).partitions() : new HashSet<>(); + long timestamp = initialized.containsKey(topicId) ? initialized.get(topicId).timestamp() : -1; + String name = initialized.containsKey(topicId) ? initialized.get(topicId).name() : "UNKNOWN"; + + Set<Integer> finalPartitions = new HashSet<>(initializedPartitions); + if (initializingSet.contains(topicId)) { + finalPartitions.addAll(initializing.get(topicId).partitions()); + timestamp = initializing.get(topicId).timestamp(); + name = initializing.get(topicId).name(); } - finalInitMap.putIfAbsent(topicId, partitions); + finalInitMap.putIfAbsent(topicId, new InitMapValue(name, finalPartitions, timestamp)); } return finalInitMap; } + static Map<Uuid, Set<Integer>> stripInitValue( + Map<Uuid, InitMapValue> initMap + ) { + return initMap.entrySet().stream() + .map(entry -> Map.entry(entry.getKey(), entry.getValue().partitions())) + .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)); + } + /** * Gets or subscribes a new dynamic consumer group member. * @@ -3707,8 +3723,8 @@ public class GroupMetadataManager { List<CoordinatorRecord> records ) { try { - Map<Uuid, Set<Integer>> initializedTopicPartitions = shareGroupPartitionMetadata.containsKey(group.groupId()) ? - shareGroupPartitionMetadata.get(group.groupId()).initializedTopics() : + Map<Uuid, Set<Integer>> initializedTopicPartitions = shareGroupStatePartitionMetadata.containsKey(group.groupId()) ? 
+ stripInitValue(shareGroupStatePartitionMetadata.get(group.groupId()).initializedTopics()) : Map.of(); TargetAssignmentBuilder.ShareTargetAssignmentBuilder assignmentResultBuilder = @@ -4729,26 +4745,28 @@ public class GroupMetadataManager { } ShareGroup group = (ShareGroup) groups.get(groupId); - ShareGroupStatePartitionMetadataInfo currentMap = shareGroupPartitionMetadata.get(groupId); + ShareGroupStatePartitionMetadataInfo currentMap = shareGroupStatePartitionMetadata.get(groupId); + Map<Uuid, InitMapValue> enrichedTopicPartitionMap = attachInitValue(topicPartitionMap); if (currentMap == null) { return new CoordinatorResult<>( - List.of(newShareGroupStatePartitionMetadataRecord(group.groupId(), Map.of(), attachTopicName(topicPartitionMap), Map.of())), + List.of(newShareGroupStatePartitionMetadataRecord(group.groupId(), Map.of(), enrichedTopicPartitionMap, Map.of())), null ); } // We must combine the existing information in the record with the topicPartitionMap argument so that the final // record has up-to-date information. - Map<Uuid, Set<Integer>> finalInitializedMap = mergeShareGroupInitMaps(currentMap.initializedTopics(), topicPartitionMap); + Map<Uuid, InitMapValue> finalInitializedMap = combineInitMaps(currentMap.initializedTopics(), enrichedTopicPartitionMap); // Fetch initializing info from state metadata. - Map<Uuid, Set<Integer>> finalInitializingMap = new HashMap<>(currentMap.initializingTopics()); + Map<Uuid, InitMapValue> finalInitializingMap = new HashMap<>(); + currentMap.initializingTopics().forEach((k, v) -> finalInitializingMap.put(k, new InitMapValue(v.name(), new HashSet<>(v.partitions()), v.timestamp()))); // Remove any entries which are already initialized. for (Map.Entry<Uuid, Set<Integer>> entry : topicPartitionMap.entrySet()) { Uuid topicId = entry.getKey(); if (finalInitializingMap.containsKey(topicId)) { - Set<Integer> partitions = finalInitializingMap.get(topicId); + Set<Integer> partitions = finalInitializingMap.get(topicId).partitions(); partitions.removeAll(entry.getValue()); if (partitions.isEmpty()) { finalInitializingMap.remove(topicId); @@ -4759,8 +4777,8 @@ public class GroupMetadataManager { return new CoordinatorResult<>(List.of( newShareGroupStatePartitionMetadataRecord( group.groupId(), - attachTopicName(finalInitializingMap), - attachTopicName(finalInitializedMap), + finalInitializingMap, + finalInitializedMap, attachTopicName(currentMap.deletingTopics()) )), null @@ -4779,24 +4797,24 @@ public class GroupMetadataManager { String groupId, Map<Uuid, Set<Integer>> topicPartitionMap ) { - ShareGroupStatePartitionMetadataInfo info = shareGroupPartitionMetadata.get(groupId); + ShareGroupStatePartitionMetadataInfo info = shareGroupStatePartitionMetadata.get(groupId); if (info == null || info.initializingTopics().isEmpty() || topicPartitionMap.isEmpty()) { return new CoordinatorResult<>(List.of(), null); } - Map<Uuid, Set<Integer>> initializingTopics = info.initializingTopics(); - Map<Uuid, Set<Integer>> finalInitializingTopics = new HashMap<>(); + Map<Uuid, InitMapValue> initializingTopics = info.initializingTopics(); + Map<Uuid, InitMapValue> finalInitializingTopics = new HashMap<>(); - for (Map.Entry<Uuid, Set<Integer>> entry : initializingTopics.entrySet()) { + for (Map.Entry<Uuid, InitMapValue> entry : initializingTopics.entrySet()) { Uuid topicId = entry.getKey(); // If topicId to clean is not present in topicPartitionMap map, retain it. 
if (!topicPartitionMap.containsKey(topicId)) { finalInitializingTopics.put(entry.getKey(), entry.getValue()); } else { - Set<Integer> partitions = new HashSet<>(entry.getValue()); + Set<Integer> partitions = new HashSet<>(entry.getValue().partitions()); partitions.removeAll(topicPartitionMap.get(topicId)); if (!partitions.isEmpty()) { - finalInitializingTopics.put(entry.getKey(), partitions); + finalInitializingTopics.put(entry.getKey(), new InitMapValue(entry.getValue().name(), partitions, entry.getValue().timestamp())); } } } @@ -4805,8 +4823,8 @@ public class GroupMetadataManager { List.of( newShareGroupStatePartitionMetadataRecord( groupId, - attachTopicName(finalInitializingTopics), - attachTopicName(info.initializedTopics()), + finalInitializingTopics, + info.initializedTopics(), attachTopicName(info.deletingTopics()) ) ), @@ -4825,14 +4843,15 @@ public class GroupMetadataManager { return Collections.unmodifiableMap(finalMap); } - private Map<Uuid, Map.Entry<String, Set<Integer>>> attachTopicName(Map<Uuid, Set<Integer>> initMap) { + private Map<Uuid, InitMapValue> attachInitValue(Map<Uuid, Set<Integer>> initMap) { TopicsImage topicsImage = metadataImage.topics(); - Map<Uuid, Map.Entry<String, Set<Integer>>> finalMap = new HashMap<>(); + Map<Uuid, InitMapValue> finalMap = new HashMap<>(); + long timestamp = time.milliseconds(); for (Map.Entry<Uuid, Set<Integer>> entry : initMap.entrySet()) { Uuid topicId = entry.getKey(); TopicImage topicImage = topicsImage.getTopic(topicId); String topicName = (topicImage != null) ? topicImage.name() : "<UNKNOWN>"; - finalMap.put(topicId, Map.entry(topicName, entry.getValue())); + finalMap.put(topicId, new InitMapValue(topicName, entry.getValue(), timestamp)); } return Collections.unmodifiableMap(finalMap); } @@ -4849,10 +4868,10 @@ public class GroupMetadataManager { ) { Map<Uuid, Set<Integer>> resultMap = new HashMap<>(); - ShareGroupStatePartitionMetadataInfo currentMap = shareGroupPartitionMetadata.get(groupId); + ShareGroupStatePartitionMetadataInfo currentMap = shareGroupStatePartitionMetadata.get(groupId); if (currentMap != null) { - currentMap.initializedTopics().forEach((topicId, partitions) -> { - resultMap.put(topicId, new HashSet<>(partitions)); + currentMap.initializedTopics().forEach((topicId, initMapValue) -> { + resultMap.put(topicId, new HashSet<>(initMapValue.partitions())); }); } @@ -5643,22 +5662,29 @@ public class GroupMetadataManager { // Update timeline structures with info about initialized/deleted topics. if (value == null) { // Tombstone! 
- shareGroupPartitionMetadata.remove(groupId); + shareGroupStatePartitionMetadata.remove(groupId); } else { + long timestamp = time.milliseconds(); ShareGroupStatePartitionMetadataInfo info = new ShareGroupStatePartitionMetadataInfo( value.initializingTopics().stream() - .map(topicPartitionInfo -> Map.entry(topicPartitionInfo.topicId(), new HashSet<>(topicPartitionInfo.partitions()))) + .map(topicPartitionInfo -> Map.entry( + topicPartitionInfo.topicId(), + new InitMapValue(topicPartitionInfo.topicName(), new HashSet<>(topicPartitionInfo.partitions()), timestamp))) .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)), + value.initializedTopics().stream() - .map(topicPartitionInfo -> Map.entry(topicPartitionInfo.topicId(), new HashSet<>(topicPartitionInfo.partitions()))) + .map(topicPartitionInfo -> Map.entry( + topicPartitionInfo.topicId(), + new InitMapValue(topicPartitionInfo.topicName(), new HashSet<>(topicPartitionInfo.partitions()), timestamp))) .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)), + value.deletingTopics().stream() .map(ShareGroupStatePartitionMetadataValue.TopicInfo::topicId) .collect(Collectors.toSet()) ); // Init java record. - shareGroupPartitionMetadata.put(groupId, info); + shareGroupStatePartitionMetadata.put(groupId, info); } } @@ -7916,13 +7942,13 @@ public class GroupMetadataManager { * @return Optional of object representing the share group state delete request. */ public Optional<DeleteShareGroupStateParameters> shareGroupBuildPartitionDeleteRequest(String shareGroupId, List<CoordinatorRecord> records) { - if (!shareGroupPartitionMetadata.containsKey(shareGroupId)) { + if (!shareGroupStatePartitionMetadata.containsKey(shareGroupId)) { return Optional.empty(); } - Map<Uuid, Set<Integer>> deleteCandidates = mergeShareGroupInitMaps( - shareGroupPartitionMetadata.get(shareGroupId).initializedTopics(), - shareGroupPartitionMetadata.get(shareGroupId).initializingTopics() + Map<Uuid, InitMapValue> deleteCandidates = combineInitMaps( + shareGroupStatePartitionMetadata.get(shareGroupId).initializedTopics(), + shareGroupStatePartitionMetadata.get(shareGroupId).initializingTopics() ); // Ideally the deleting should be empty - if it is not then it implies @@ -7931,16 +7957,16 @@ public class GroupMetadataManager { // a retry for the same is possible. Since this is part of an admin operation // retrying delete should not pose issues related to // performance. Also, the share coordinator is idempotent on delete partitions. 
- Map<Uuid, Set<Integer>> deletingTopics = shareGroupPartitionMetadata.get(shareGroupId).deletingTopics().stream() - .map(tid -> Map.entry(tid, metadataImage.topics().getTopic(tid).partitions().keySet())) + Map<Uuid, InitMapValue> deletingTopics = shareGroupStatePartitionMetadata.get(shareGroupId).deletingTopics().stream() + .map(tid -> { + TopicImage image = metadataImage.topics().getTopic(tid); + return Map.entry(tid, new InitMapValue(image.name(), image.partitions().keySet(), -1)); + }) .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)); if (!deletingTopics.isEmpty()) { log.info("Existing deleting entries found in share group {} - {}", shareGroupId, deletingTopics); - deleteCandidates = mergeShareGroupInitMaps( - deleteCandidates, - deletingTopics - ); + deleteCandidates = combineInitMaps(deleteCandidates, deletingTopics); } if (deleteCandidates.isEmpty()) { @@ -7949,10 +7975,10 @@ public class GroupMetadataManager { List<TopicData<PartitionIdData>> topicDataList = new ArrayList<>(deleteCandidates.size()); - for (Map.Entry<Uuid, Set<Integer>> entry : deleteCandidates.entrySet()) { + for (Map.Entry<Uuid, InitMapValue> entry : deleteCandidates.entrySet()) { topicDataList.add(new TopicData<>( entry.getKey(), - entry.getValue().stream() + entry.getValue().partitions().stream() .map(PartitionFactory::newPartitionIdData) .toList() )); @@ -7992,15 +8018,17 @@ public class GroupMetadataManager { List<CoordinatorRecord> records ) { List<DeleteShareGroupStateRequestData.DeleteStateData> deleteShareGroupStateRequestTopicsData = new ArrayList<>(); - Map<Uuid, Set<Integer>> initializedTopics = new HashMap<>(); - ShareGroupStatePartitionMetadataInfo currentMap = shareGroupPartitionMetadata.get(groupId); + ShareGroupStatePartitionMetadataInfo currentMap = shareGroupStatePartitionMetadata.get(groupId); if (currentMap == null) { return deleteShareGroupStateRequestTopicsData; } - currentMap.initializedTopics().forEach((topicId, partitions) -> initializedTopics.put(topicId, new HashSet<>(partitions))); + Map<Uuid, InitMapValue> initializedTopics = new HashMap<>(); + currentMap.initializedTopics().forEach((topicId, initValue) -> { + initializedTopics.put(topicId, new InitMapValue(initValue.name(), new HashSet<>(initValue.partitions()), initValue.timestamp())); + }); Set<Uuid> deletingTopics = new HashSet<>(currentMap.deletingTopics()); requestData.topics().forEach(topic -> { @@ -8011,7 +8039,7 @@ public class GroupMetadataManager { // share partitions are initialized for the group. 
if (initializedTopics.containsKey(topicId)) { List<DeleteShareGroupStateRequestData.PartitionData> partitions = new ArrayList<>(); - initializedTopics.get(topicId).forEach(partition -> + initializedTopics.get(topicId).partitions().forEach(partition -> partitions.add(new DeleteShareGroupStateRequestData.PartitionData().setPartition(partition))); deleteShareGroupStateRequestTopicsData.add( new DeleteShareGroupStateRequestData.DeleteStateData() @@ -8052,8 +8080,8 @@ public class GroupMetadataManager { records.add( GroupCoordinatorRecordHelpers.newShareGroupStatePartitionMetadataRecord( groupId, - attachTopicName(currentMap.initializingTopics()), - attachTopicName(initializedTopics), + currentMap.initializingTopics(), + initializedTopics, attachTopicName(deletingTopics) ) ); @@ -8080,7 +8108,7 @@ public class GroupMetadataManager { return new CoordinatorResult<>(List.of()); } List<CoordinatorRecord> records = new ArrayList<>(); - shareGroupPartitionMetadata.forEach((groupId, metadata) -> { + shareGroupStatePartitionMetadata.forEach((groupId, metadata) -> { Set<Uuid> initializingDeletedCurrent = new HashSet<>(metadata.initializingTopics().keySet()); Set<Uuid> initializedDeletedCurrent = new HashSet<>(metadata.initializedTopics().keySet()); @@ -8098,10 +8126,10 @@ public class GroupMetadataManager { // because the call setup of this method is such that the // persister call is automatically done by the BrokerMetadataPublisher // increasing efficiency and removing need of chained futures. - Map<Uuid, Set<Integer>> finalInitializing = new HashMap<>(metadata.initializingTopics()); + Map<Uuid, InitMapValue> finalInitializing = new HashMap<>(metadata.initializingTopics()); initializingDeletedCurrent.forEach(finalInitializing::remove); - Map<Uuid, Set<Integer>> finalInitialized = new HashMap<>(metadata.initializedTopics()); + Map<Uuid, InitMapValue> finalInitialized = new HashMap<>(metadata.initializedTopics()); initializedDeletedCurrent.forEach(finalInitialized::remove); Set<Uuid> deletingTopics = new HashSet<>(metadata.deletingTopics()); @@ -8109,8 +8137,8 @@ public class GroupMetadataManager { records.add(GroupCoordinatorRecordHelpers.newShareGroupStatePartitionMetadataRecord( groupId, - attachTopicName(finalInitializing), - attachTopicName(finalInitialized), + finalInitializing, + finalInitialized, attachTopicName(deletingTopics) )); }); @@ -8132,7 +8160,7 @@ public class GroupMetadataManager { Map<Uuid, String> topics, List<CoordinatorRecord> records ) { - ShareGroupStatePartitionMetadataInfo currentMap = shareGroupPartitionMetadata.get(groupId); + ShareGroupStatePartitionMetadataInfo currentMap = shareGroupStatePartitionMetadata.get(groupId); if (currentMap == null) { return List.of(); @@ -8145,8 +8173,8 @@ public class GroupMetadataManager { records.add( GroupCoordinatorRecordHelpers.newShareGroupStatePartitionMetadataRecord( groupId, - attachTopicName(currentMap.initializingTopics()), - attachTopicName(currentMap.initializedTopics()), + currentMap.initializingTopics(), + currentMap.initializedTopics(), attachTopicName(updatedDeletingTopics) ) ); @@ -8398,4 +8426,9 @@ public class GroupMetadataManager { static String consumerGroupSyncKey(String groupId, String memberId) { return "sync-" + groupId + "-" + memberId; } + + // Visibility for testing + Map<String, ShareGroupStatePartitionMetadataInfo> shareGroupStatePartitionMetadata() { + return shareGroupStatePartitionMetadata; + } } diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/share/ShareGroup.java 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/share/ShareGroup.java index eacbab7dbff..be4711604e5 100644 --- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/share/ShareGroup.java +++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/share/ShareGroup.java @@ -16,6 +16,7 @@ */ package org.apache.kafka.coordinator.group.modern.share; +import org.apache.kafka.common.Uuid; import org.apache.kafka.common.errors.ApiException; import org.apache.kafka.common.errors.GroupIdNotFoundException; import org.apache.kafka.common.errors.UnknownMemberIdException; @@ -31,6 +32,7 @@ import org.apache.kafka.timeline.TimelineObject; import java.util.List; import java.util.Locale; +import java.util.Map; import java.util.Optional; import java.util.Set; @@ -66,6 +68,35 @@ public class ShareGroup extends ModernGroup<ShareGroupMember> { } } + /** + * A record class to hold the value representing ShareGroupStatePartitionMetadata for the TimelineHashmap + * keyed on share group id. + * + * @param initializedTopics Map of set of partition ids keyed on the topic id. + * @param deletingTopics Set of topic ids. + */ + public record ShareGroupStatePartitionMetadataInfo( + Map<Uuid, InitMapValue> initializingTopics, + Map<Uuid, InitMapValue> initializedTopics, + Set<Uuid> deletingTopics + ) { + } + + /** + * Represents the value part for the initializing and initialized topic partitions in + * ShareGroupStatePartitionMetadataValue + * + * @param name Topic name + * @param partitions Set of partitions in the topic + * @param timestamp Timestamp at which the record was replayed + */ + public record InitMapValue( + String name, + Set<Integer> partitions, + long timestamp + ) { + } + /** * The group state. 
*/ diff --git a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorRecordHelpersTest.java b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorRecordHelpersTest.java index be2e9df5fea..4c479d8da05 100644 --- a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorRecordHelpersTest.java +++ b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorRecordHelpersTest.java @@ -52,6 +52,7 @@ import org.apache.kafka.coordinator.group.modern.MemberState; import org.apache.kafka.coordinator.group.modern.TopicMetadata; import org.apache.kafka.coordinator.group.modern.consumer.ConsumerGroupMember; import org.apache.kafka.coordinator.group.modern.consumer.ResolvedRegularExpression; +import org.apache.kafka.coordinator.group.modern.share.ShareGroup.InitMapValue; import org.apache.kafka.server.common.ApiMessageAndVersion; import org.junit.jupiter.api.Test; @@ -333,7 +334,7 @@ public class GroupCoordinatorRecordHelpersTest { Map.of(), Map.of( topicId1, - Map.entry(topicName1, partitions) + new InitMapValue(topicName1, partitions, 1) ), Map.of( topicId2, diff --git a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java index a511c2529ea..9c644bb7e13 100644 --- a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java +++ b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java @@ -76,6 +76,7 @@ import org.apache.kafka.common.protocol.ApiKeys; import org.apache.kafka.common.protocol.Errors; import org.apache.kafka.common.requests.StreamsGroupHeartbeatResponse.Status; import org.apache.kafka.common.utils.LogContext; +import org.apache.kafka.common.utils.MockTime; import org.apache.kafka.common.utils.Utils; import org.apache.kafka.coordinator.common.runtime.CoordinatorRecord; import org.apache.kafka.coordinator.common.runtime.CoordinatorResult; @@ -106,6 +107,7 @@ import org.apache.kafka.coordinator.group.modern.consumer.ConsumerGroupBuilder; import org.apache.kafka.coordinator.group.modern.consumer.ConsumerGroupMember; import org.apache.kafka.coordinator.group.modern.consumer.ResolvedRegularExpression; import org.apache.kafka.coordinator.group.modern.share.ShareGroup; +import org.apache.kafka.coordinator.group.modern.share.ShareGroup.InitMapValue; import org.apache.kafka.coordinator.group.modern.share.ShareGroupBuilder; import org.apache.kafka.coordinator.group.modern.share.ShareGroupMember; import org.apache.kafka.coordinator.group.streams.MockTaskAssignor; @@ -15094,10 +15096,10 @@ public class GroupMetadataManagerTest { assertRecordsEquals(expectedRecords, result.records()); } - private Map<Uuid, Map.Entry<String, Set<Integer>>> mkShareGroupStateMap(List<Map.Entry<Uuid, Map.Entry<String, Set<Integer>>>> entries) { - Map<Uuid, Map.Entry<String, Set<Integer>>> map = new HashMap<>(); + private Map<Uuid, InitMapValue> mkShareGroupStateMap(List<Map.Entry<Uuid, Map.Entry<String, Set<Integer>>>> entries) { + Map<Uuid, InitMapValue> map = new HashMap<>(); for (Map.Entry<Uuid, Map.Entry<String, Set<Integer>>> entry : entries) { - map.put(entry.getKey(), entry.getValue()); + map.put(entry.getKey(), new InitMapValue(entry.getValue().getKey(), entry.getValue().getValue(), 1)); } return map; } @@ -20675,8 +20677,8 @@ public class GroupMetadataManagerTest { context.replay( 
GroupCoordinatorRecordHelpers.newShareGroupStatePartitionMetadataRecord( groupId, - Map.of(t1Uuid, Map.entry(t1Name, Set.of(0, 1))), - Map.of(t2Uuid, Map.entry(t2Name, Set.of(0, 1))), + Map.of(t1Uuid, new InitMapValue(t1Name, Set.of(0, 1), 1)), + Map.of(t2Uuid, new InitMapValue(t2Name, Set.of(0, 1), 1)), Map.of() ) ); @@ -20742,8 +20744,8 @@ public class GroupMetadataManagerTest { context.replay( GroupCoordinatorRecordHelpers.newShareGroupStatePartitionMetadataRecord( groupId, - Map.of(t1Uuid, Map.entry(t1Name, Set.of(0, 1))), - Map.of(t2Uuid, Map.entry(t2Name, Set.of(0, 1))), + Map.of(t1Uuid, new InitMapValue(t1Name, Set.of(0, 1), 1)), + Map.of(t2Uuid, new InitMapValue(t2Name, Set.of(0, 1), 1)), Map.of(t3Uuid, t3Name) ) ); @@ -20804,8 +20806,8 @@ public class GroupMetadataManagerTest { groupId, Map.of(), Map.of( - topicId1, Map.entry(topicName1, Set.of(0, 1, 2)), - topicId2, Map.entry(topicName2, Set.of(0, 1)) + topicId1, new InitMapValue(topicName1, Set.of(0, 1, 2), 1), + topicId2, new InitMapValue(topicName2, Set.of(0, 1), 1) ), Map.of() ) @@ -20897,8 +20899,8 @@ public class GroupMetadataManagerTest { groupId, Map.of(), Map.of( - topicId1, Map.entry(topicName1, Set.of(0, 1, 2)), - topicId2, Map.entry(topicName2, Set.of(0, 1)) + topicId1, new InitMapValue(topicName1, Set.of(0, 1, 2), 1), + topicId2, new InitMapValue(topicName2, Set.of(0, 1), 1) ), Map.of( topicId3, topicName3, @@ -21003,7 +21005,7 @@ public class GroupMetadataManagerTest { GroupCoordinatorRecordHelpers.newShareGroupStatePartitionMetadataRecord( groupId, Map.of(), - Map.of(topicId1, Map.entry(topicName1, Set.of(0, 1, 2))), + Map.of(topicId1, new InitMapValue(topicName1, Set.of(0, 1, 2), 1)), Map.of() ) ); @@ -21086,8 +21088,8 @@ public class GroupMetadataManagerTest { context.replay( GroupCoordinatorRecordHelpers.newShareGroupStatePartitionMetadataRecord( groupId, - Map.of(topicId2, Map.entry(topicName2, Set.of(0, 1))), - Map.of(topicId1, Map.entry(topicName1, Set.of(0, 1, 2))), + Map.of(topicId2, new InitMapValue(topicName2, Set.of(0, 1), 1)), + Map.of(topicId1, new InitMapValue(topicName1, Set.of(0, 1, 2), 1)), Map.of() ) ); @@ -21110,7 +21112,7 @@ public class GroupMetadataManagerTest { List<CoordinatorRecord> expectedRecords = List.of( newShareGroupStatePartitionMetadataRecord( groupId, - Map.of(topicId2, Map.entry(topicName2, Set.of(0, 1))), + Map.of(topicId2, new InitMapValue(topicName2, Set.of(0, 1), 1)), Map.of(), Map.of(topicId1, topicName1) ) @@ -21171,8 +21173,8 @@ public class GroupMetadataManagerTest { context.replay( GroupCoordinatorRecordHelpers.newShareGroupStatePartitionMetadataRecord( groupId, - Map.of(topicId2, Map.entry(topicName2, Set.of(0, 1))), - Map.of(topicId1, Map.entry(topicName1, Set.of(0, 1, 2))), + Map.of(topicId2, new InitMapValue(topicName2, Set.of(0, 1), 1)), + Map.of(topicId1, new InitMapValue(topicName1, Set.of(0, 1, 2), 1)), Map.of() ) ); @@ -21195,7 +21197,7 @@ public class GroupMetadataManagerTest { List<CoordinatorRecord> expectedRecords = List.of( newShareGroupStatePartitionMetadataRecord( groupId, - Map.of(topicId2, Map.entry(topicName2, Set.of(0, 1))), + Map.of(topicId2, new InitMapValue(topicName2, Set.of(0, 1), 1)), Map.of(), Map.of(topicId1, topicName1) ) @@ -21488,8 +21490,12 @@ public class GroupMetadataManagerTest { public void testShareGroupHeartbeatPersisterRequestWithInitializing() { MockPartitionAssignor assignor = new MockPartitionAssignor("range"); assignor.prepareGroupAssignment(new GroupAssignment(Map.of())); + MockTime time = new MockTime(); + int 
offsetWriteTimeout = 10; GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder() .withShareGroupAssignor(assignor) + .withTime(time) + .withConfig(GroupCoordinatorConfig.OFFSET_COMMIT_TIMEOUT_MS_CONFIG, offsetWriteTimeout) .build(); Uuid t1Uuid = Uuid.randomUuid(); @@ -21529,7 +21535,7 @@ public class GroupMetadataManagerTest { .setMemberEpoch(0) .setSubscribedTopicNames(List.of(t1Name))); - assertTrue(result.records().contains( + assertFalse(result.records().contains( newShareGroupStatePartitionMetadataRecord(groupId, mkShareGroupStateMap(List.of( mkShareGroupStateMetadataEntry(t1Uuid, t1Name, List.of(0, 1)) )), @@ -21543,6 +21549,48 @@ public class GroupMetadataManagerTest { Map.of(t1Uuid, Set.of(0, 1)), groupId, 1, + false + ); + + // Manipulate time so the initializing topic becomes eligible for retry + context.groupMetadataManager.replay( + new ShareGroupStatePartitionMetadataKey() + .setGroupId(groupId), + new ShareGroupStatePartitionMetadataValue() + .setInitializingTopics(List.of( + new ShareGroupStatePartitionMetadataValue.TopicPartitionsInfo() + .setTopicId(t1Uuid) + .setTopicName(t1Name) + .setPartitions(List.of(0, 1)) + )) + .setInitializedTopics(List.of()) + .setDeletingTopics(List.of()) + ); + + long timeNow = time.milliseconds() + offsetWriteTimeout * 2 + 1; + time.setCurrentTimeMs(timeNow); + memberId = Uuid.randomUuid(); + result = context.shareGroupHeartbeat( + new ShareGroupHeartbeatRequestData() + .setGroupId(groupId) + .setMemberId(memberId.toString()) + .setMemberEpoch(0) + .setSubscribedTopicNames(List.of(t1Name))); + + assertTrue(result.records().contains( + newShareGroupStatePartitionMetadataRecord(groupId, mkShareGroupStateMap(List.of( + mkShareGroupStateMetadataEntry(t1Uuid, t1Name, List.of(0, 1)) + )), + Map.of(), + Map.of() + )) + ); + + verifyShareGroupHeartbeatInitializeRequest( + result.response().getValue(), + Map.of(t1Uuid, Set.of(0, 1)), + groupId, + 2, true ); } @@ -21586,7 +21634,7 @@ public class GroupMetadataManagerTest { ); List<CoordinatorRecord> records = new ArrayList<>(); - context.groupMetadataManager.addInitializingTopicsRecords(groupId, records, Map.of(t1Uuid, Set.of(0, 1))); + context.groupMetadataManager.addInitializingTopicsRecords(groupId, records, Map.of(t1Uuid, new InitMapValue(t1Name, Set.of(0, 1), 1))); List<CoordinatorRecord> expectedRecords = List.of( CoordinatorRecord.record( @@ -21633,16 +21681,31 @@ public class GroupMetadataManagerTest { .setEpoch(0) ); + context.groupMetadataManager.replay( + new ShareGroupStatePartitionMetadataKey() + .setGroupId(groupId), + new ShareGroupStatePartitionMetadataValue() + .setInitializingTopics(List.of( + new ShareGroupStatePartitionMetadataValue.TopicPartitionsInfo() + .setTopicId(topicId) + .setTopicName(topicName) + .setPartitions(List.of(0, 1)) + )) + .setInitializedTopics(List.of()) + .setDeletingTopics(List.of()) + ); + Map<Uuid, Set<Integer>> snapshotMetadataInitializeMap = Map.of( topicId, Set.of(0, 1) ); - Map<Uuid, Map.Entry<String, Set<Integer>>> snapshotMetadataInitializeRecordMap = Map.of( + Map<Uuid, InitMapValue> snapshotMetadataInitializeRecordMap = Map.of( topicId, - Map.entry( + new InitMapValue( topicName, - Set.of(0, 1) + Set.of(0, 1), + 0 ) ); @@ -21652,6 +21715,8 @@ public class GroupMetadataManagerTest { assertNull(result.response()); assertEquals(List.of(record), result.records()); + // Make sure the timeline map is not modified yet. 
+ assertEquals(snapshotMetadataInitializeRecordMap, context.groupMetadataManager.shareGroupStatePartitionMetadata().get(groupId).initializingTopics()); } @Test @@ -21689,11 +21754,15 @@ public class GroupMetadataManagerTest { Uuid topicId = Uuid.randomUuid(); int partitions = 1; String groupId = "foogrp"; + MockTime time = new MockTime(); + int offsetWriteTimeout = 10; MockPartitionAssignor assignor = new MockPartitionAssignor("simple"); assignor.prepareGroupAssignment(new GroupAssignment(Map.of())); GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder() .withShareGroupAssignor(assignor) + .withTime(time) + .withConfig(GroupCoordinatorConfig.OFFSET_COMMIT_TIMEOUT_MS_CONFIG, offsetWriteTimeout) .build(); // Empty on empty subscription metadata @@ -21703,9 +21772,11 @@ public class GroupMetadataManagerTest { ); // No error on empty initialized metadata (no replay of initialized topics) + long timeNow = time.milliseconds() + 100; + time.setCurrentTimeMs(timeNow); assertEquals( Map.of( - topicId, Set.of(0) + topicId, new InitMapValue(topicName, Set.of(0), timeNow) ), context.groupMetadataManager.subscribedTopicsChangeMap(groupId, Map.of( topicName, new TopicMetadata(topicId, topicName, partitions) @@ -21726,6 +21797,7 @@ public class GroupMetadataManagerTest { new ShareGroupMetadataValue() .setEpoch(0) ); + context.groupMetadataManager.replay( new ShareGroupStatePartitionMetadataKey() .setGroupId(groupId), @@ -21746,10 +21818,12 @@ public class GroupMetadataManagerTest { ); // Since t1 is initializing and t2 is initialized due to replay above. + timeNow = timeNow + 2 * offsetWriteTimeout + 1; + time.setCurrentTimeMs(timeNow); assertEquals( Map.of( - t1Id, Set.of(0, 1), - t3Id, Set.of(0, 1, 2) + t1Id, new InitMapValue(t1Name, Set.of(0, 1), timeNow), // initializing + t3Id, new InitMapValue(t3Name, Set.of(0, 1, 2), timeNow) // initialized ), context.groupMetadataManager.subscribedTopicsChangeMap(groupId, Map.of( t1Name, new TopicMetadata(t1Id, t1Name, 2), @@ -21818,32 +21892,11 @@ public class GroupMetadataManagerTest { result = context.groupMetadataManager.uninitializeShareGroupState(groupId, Map.of(t1Id, Set.of(0, 1))); Set<Integer> partitions = new LinkedHashSet<>(List.of(0, 1, 2)); assertEquals( - List.of(newShareGroupStatePartitionMetadataRecord(groupId, Map.of(), Map.of(t2Id, Map.entry(t2Name, partitions)), Map.of())), + List.of(newShareGroupStatePartitionMetadataRecord(groupId, Map.of(), Map.of(t2Id, new InitMapValue(t2Name, partitions, 1)), Map.of())), result.records() ); } - @Test - public void testMergeShareGroupInitMaps() { - Map<Uuid, Set<Integer>> m1 = new HashMap<>(); - Map<Uuid, Set<Integer>> m2 = new HashMap<>(); - - Uuid t1 = Uuid.randomUuid(); - Uuid t2 = Uuid.randomUuid(); - Uuid t3 = Uuid.randomUuid(); - - m1.put(t1, new HashSet<>(List.of(1, 2))); - m1.put(t2, new HashSet<>(List.of(3, 4))); - m2.put(t1, new HashSet<>(List.of(3, 4))); - m2.put(t3, new HashSet<>(List.of(5, 6))); - - Map<Uuid, Set<Integer>> m3 = GroupMetadataManager.mergeShareGroupInitMaps(m1, m2); - // The arg maps should not be overridden. 
- assertEquals(Map.of(t1, Set.of(1, 2), t2, Set.of(3, 4)), m1); - assertEquals(Map.of(t1, Set.of(3, 4), t3, Set.of(5, 6)), m2); - assertEquals(Map.of(t1, Set.of(1, 2, 3, 4), t2, Set.of(3, 4), t3, Set.of(5, 6)), m3); - } - @Test public void testMaybeCleanupShareGroupStateEmptyTopicIds() { MockPartitionAssignor assignor = new MockPartitionAssignor("simple"); @@ -21963,6 +22016,47 @@ public class GroupMetadataManagerTest { assertEquals(expectedResult, context.groupMetadataManager.maybeCleanupShareGroupState(Set.of(t1Id, t2Id))); } + @Test + public void testCombineInitMaps() { + // Both empty. + Map<Uuid, InitMapValue> m1 = Map.of(); + Map<Uuid, InitMapValue> m2 = Map.of(); + + assertEquals(Map.of(), GroupMetadataManager.combineInitMaps(m1, m2)); + + Uuid t1Id = Uuid.randomUuid(); + String t1Name = "t1"; + Uuid t2Id = Uuid.randomUuid(); + String t2Name = "t2"; + + // m1 non-empty, m2 empty. + m1 = Map.of(t1Id, new InitMapValue(t1Name, Set.of(0), 1)); + assertEquals(m1, GroupMetadataManager.combineInitMaps(m1, m2)); + + // m1 empty, m2 non-empty. + m1 = Map.of(); + m2 = Map.of(t1Id, new InitMapValue(t1Name, Set.of(0), 1)); + assertEquals(m2, GroupMetadataManager.combineInitMaps(m1, m2)); + + // m1 non-empty, m2 non-empty. + m1 = Map.of(t1Id, new InitMapValue(t1Name, Set.of(0), 1)); + m2 = Map.of(t2Id, new InitMapValue(t2Name, Set.of(0), 1)); + assertEquals(Map.of( + t1Id, new InitMapValue(t1Name, Set.of(0), 1), + t2Id, new InitMapValue(t2Name, Set.of(0), 1) + ), GroupMetadataManager.combineInitMaps(m1, m2)); + + // m1 non-empty, m2 non-empty (differ by partition) + m1 = Map.of(t1Id, new InitMapValue(t1Name, Set.of(0), 1)); + m2 = Map.of(t1Id, new InitMapValue(t1Name, Set.of(1), 2)); + assertEquals(Map.of(t1Id, new InitMapValue(t1Name, Set.of(0, 1), 2)), GroupMetadataManager.combineInitMaps(m1, m2)); + + // m1 and m2 exactly same + m1 = Map.of(t1Id, new InitMapValue(t1Name, Set.of(0), 1)); + m2 = Map.of(t1Id, new InitMapValue(t1Name, Set.of(0), 1)); + assertEquals(Map.of(t1Id, new InitMapValue(t1Name, Set.of(0), 1)), GroupMetadataManager.combineInitMaps(m1, m2)); + } + private static void checkJoinGroupResponse( JoinGroupResponseData expectedResponse, JoinGroupResponseData actualResponse, diff --git a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTestContext.java b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTestContext.java index 112653dceea..dc0670656d0 100644 --- a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTestContext.java +++ b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTestContext.java @@ -460,7 +460,7 @@ public class GroupMetadataManagerTestContext { } public static class Builder { - private final MockTime time = new MockTime(0, 0, 0); + private MockTime time = new MockTime(0, 0, 0); private final MockCoordinatorTimer<Void, CoordinatorRecord> timer = new MockCoordinatorTimer<>(time); private final MockCoordinatorExecutor<CoordinatorRecord> executor = new MockCoordinatorExecutor<>(); private final LogContext logContext = new LogContext(); @@ -516,6 +516,11 @@ public class GroupMetadataManagerTestContext { return this; } + public Builder withTime(MockTime time) { + this.time = time; + return this; + } + public GroupMetadataManagerTestContext build() { if (metadataImage == null) metadataImage = MetadataImage.EMPTY; if (groupConfigManager == null) groupConfigManager = createConfigManager(); diff --git 
a/share-coordinator/src/main/java/org/apache/kafka/coordinator/share/ShareCoordinatorShard.java b/share-coordinator/src/main/java/org/apache/kafka/coordinator/share/ShareCoordinatorShard.java index 7f6c1c47ff5..b61671ed991 100644 --- a/share-coordinator/src/main/java/org/apache/kafka/coordinator/share/ShareCoordinatorShard.java +++ b/share-coordinator/src/main/java/org/apache/kafka/coordinator/share/ShareCoordinatorShard.java @@ -772,11 +772,11 @@ public class ShareCoordinatorShard implements CoordinatorShard<CoordinatorRecord SharePartitionKey mapKey = SharePartitionKey.getInstance(groupId, topicId, partitionId); if (partitionData.leaderEpoch() != -1 && leaderEpochMap.containsKey(mapKey) && leaderEpochMap.get(mapKey) > partitionData.leaderEpoch()) { - log.error("Request leader epoch smaller than last recorded."); + log.error("Write request leader epoch is smaller than last recorded current: {}, requested: {}.", leaderEpochMap.get(mapKey), partitionData.leaderEpoch()); return Optional.of(getWriteErrorCoordinatorResult(Errors.FENCED_LEADER_EPOCH, null, topicId, partitionId)); } if (partitionData.stateEpoch() != -1 && stateEpochMap.containsKey(mapKey) && stateEpochMap.get(mapKey) > partitionData.stateEpoch()) { - log.error("Request state epoch smaller than last recorded."); + log.info("Write request state epoch is smaller than last recorded current: {}, requested: {}.", stateEpochMap.get(mapKey), partitionData.stateEpoch()); return Optional.of(getWriteErrorCoordinatorResult(Errors.FENCED_STATE_EPOCH, null, topicId, partitionId)); } if (metadataImage == null) { @@ -814,7 +814,7 @@ public class ShareCoordinatorShard implements CoordinatorShard<CoordinatorRecord SharePartitionKey mapKey = SharePartitionKey.getInstance(groupId, topicId, partitionId); if (leaderEpochMap.containsKey(mapKey) && leaderEpochMap.get(mapKey) > partitionData.leaderEpoch()) { - log.error("Request leader epoch id is smaller than last recorded."); + log.error("Read request leader epoch is smaller than last recorded current: {}, requested: {}.", leaderEpochMap.get(mapKey), partitionData.leaderEpoch()); return Optional.of(ReadShareGroupStateResponse.toErrorResponseData(topicId, partitionId, Errors.FENCED_LEADER_EPOCH, Errors.FENCED_LEADER_EPOCH.message())); } @@ -915,7 +915,7 @@ public class ShareCoordinatorShard implements CoordinatorShard<CoordinatorRecord SharePartitionKey key = SharePartitionKey.getInstance(request.groupId(), topicId, partitionId); if (partitionData.stateEpoch() != -1 && stateEpochMap.containsKey(key) && stateEpochMap.get(key) > partitionData.stateEpoch()) { - log.error("Initialize request state epoch smaller than last recorded."); + log.info("Initialize request state epoch is smaller than last recorded current: {}, requested: {}.", stateEpochMap.get(key), partitionData.stateEpoch()); return Optional.of(getInitializeErrorCoordinatorResult(Errors.FENCED_STATE_EPOCH, Errors.FENCED_STATE_EPOCH.exception(), topicId, partitionId)); }
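As a companion to the new combineInitMaps helper in GroupMetadataManager above, here is a
small self-contained sketch of its merge semantics, using java.util.UUID and a local copy of
InitMapValue in place of the Kafka types; it is an illustration of the behaviour exercised by
testCombineInitMaps, not the production code.

    import java.util.HashMap;
    import java.util.HashSet;
    import java.util.Map;
    import java.util.Set;
    import java.util.UUID;

    // Sketch of combineInitMaps: union partitions per topic; when a topic appears in both
    // maps, the initializing entry's name and timestamp take precedence.
    public final class CombineInitMapsSketch {

        public record InitMapValue(String name, Set<Integer> partitions, long timestamp) { }

        static Map<UUID, InitMapValue> combine(Map<UUID, InitMapValue> initialized,
                                               Map<UUID, InitMapValue> initializing) {
            Map<UUID, InitMapValue> result = new HashMap<>();
            Set<UUID> topicIds = new HashSet<>(initialized.keySet());
            topicIds.addAll(initializing.keySet());
            for (UUID topicId : topicIds) {
                InitMapValue base = initialized.get(topicId);
                InitMapValue overlay = initializing.get(topicId);
                Set<Integer> partitions = new HashSet<>(base != null ? base.partitions() : Set.of());
                if (overlay != null) {
                    partitions.addAll(overlay.partitions());
                }
                InitMapValue winner = overlay != null ? overlay : base;
                result.put(topicId, new InitMapValue(winner.name(), partitions, winner.timestamp()));
            }
            return result;
        }

        public static void main(String[] args) {
            UUID t1 = UUID.randomUUID();
            Map<UUID, InitMapValue> initialized = Map.of(t1, new InitMapValue("t1", Set.of(0), 1L));
            Map<UUID, InitMapValue> initializing = Map.of(t1, new InitMapValue("t1", Set.of(1), 2L));
            // Expected: t1 -> InitMapValue[name=t1, partitions=[0, 1], timestamp=2]
            System.out.println(combine(initialized, initializing));
        }
    }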