This is an automated email from the ASF dual-hosted git repository.

schofielaj pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new c58de757128 KAFKA-19204: Add timestamp to share state metadata init maps [1/N] (#19781)
c58de757128 is described below

commit c58de7571289f05a734f9fe7e197b23c004803d3
Author: Sushant Mahajan <smaha...@confluent.io>
AuthorDate: Fri May 23 13:26:05 2025 +0530

    KAFKA-19204: Add timestamp to share state metadata init maps [1/N] (#19781)
    
    1. Currently, the code allows retrying any initializing topics in
    subsequent heartbeats. This can result in duplicate calls to the
    persister if multiple share consumers join the same group
    concurrently. Furthermore, only one of these calls will succeed, as
    the others will have a lower state epoch and will be fenced.
    2. An earlier change, https://github.com/apache/kafka/pull/19603,
    allowed retrying the initialization of initializing topics in case
    the original caller could not persist the information in the
    persister due to a dead broker or a timeout.
    3. To prevent multiple calls while still allowing retries, the
    TimelineHashMap holding the `ShareGroupStatePartitionMetadataInfo`
    now also records the timestamp at which the record is replayed.
      a. When multiple consumers arrive for the same group and topic,
    only one of them is allowed to make the persister initialize request,
    and this information is added to the map when the record is replayed.
    This solves issue 1.
      b. To allow retries, an initializing topic whose timestamp is older
    than 2 * offsetCommitTimeoutMs becomes eligible to be retried. Here
    too, only one consumer is able to retry, which resolves issue 2 (see
    the sketch after this list).
    4. Tests have been added wherever applicable and existing ones updated.
    5. No record schema changes are involved.
    6. The `ShareGroupStatePartitionMetadataInfo` and `InitMapValue` records
    have been moved to the `ShareGroup` class for better encapsulation.
    7. Some logs in `ShareCoordinatorShard` have been changed from error
    to info, and extra information is logged.
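    
    A minimal sketch of the retry gating described in 3(b), assuming a
    simplified stand-in for `InitMapValue` keyed by topic name and a
    hard-coded offsetCommitTimeoutMs; the class and method names below are
    illustrative only, not the actual GroupMetadataManager code:
    
        import java.util.Map;
        import java.util.Set;
        import java.util.stream.Collectors;
    
        public class InitRetryGateSketch {
            // Simplified stand-in for ShareGroup.InitMapValue.
            record InitMapValue(String name, Set<Integer> partitions, long timestamp) { }
    
            // Initializing topics replayed less than 2 * offsetCommitTimeoutMs ago are
            // still considered in-flight and are kept; older entries are dropped from
            // the result, which makes them eligible for retry.
            static Map<String, InitMapValue> freshInitializing(
                Map<String, InitMapValue> initializing,
                long nowMs,
                long offsetCommitTimeoutMs
            ) {
                long delta = 2L * offsetCommitTimeoutMs;
                return initializing.entrySet().stream()
                    .filter(e -> nowMs - e.getValue().timestamp() < delta)
                    .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
            }
    
            public static void main(String[] args) {
                long now = 100_000L;
                long offsetCommitTimeoutMs = 5_000L;
                Map<String, InitMapValue> initializing = Map.of(
                    "t1", new InitMapValue("t1", Set.of(0, 1), now - 1_000L),  // fresh, not retried
                    "t2", new InitMapValue("t2", Set.of(0), now - 20_000L));   // stale, retried
                // Prints [t1]: only t1 is treated as in-flight, t2 becomes retry-eligible.
                System.out.println(freshInitializing(initializing, now, offsetCommitTimeoutMs).keySet());
            }
        }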
    
    Reviewers: Andrew Schofield <aschofi...@confluent.io>
---
 .../group/GroupCoordinatorRecordHelpers.java       |  13 +-
 .../coordinator/group/GroupMetadataManager.java    | 239 ++++++++++++---------
 .../coordinator/group/modern/share/ShareGroup.java |  31 +++
 .../group/GroupCoordinatorRecordHelpersTest.java   |   3 +-
 .../group/GroupMetadataManagerTest.java            | 190 +++++++++++-----
 .../group/GroupMetadataManagerTestContext.java     |   7 +-
 .../coordinator/share/ShareCoordinatorShard.java   |   8 +-
 7 files changed, 328 insertions(+), 163 deletions(-)

diff --git 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorRecordHelpers.java
 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorRecordHelpers.java
index ad17bfe34f2..288e8d3abe1 100644
--- 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorRecordHelpers.java
+++ 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorRecordHelpers.java
@@ -56,6 +56,7 @@ import 
org.apache.kafka.coordinator.group.generated.ShareGroupTargetAssignmentMe
 import org.apache.kafka.coordinator.group.modern.TopicMetadata;
 import org.apache.kafka.coordinator.group.modern.consumer.ConsumerGroupMember;
 import 
org.apache.kafka.coordinator.group.modern.consumer.ResolvedRegularExpression;
+import org.apache.kafka.coordinator.group.modern.share.ShareGroup.InitMapValue;
 import org.apache.kafka.coordinator.group.modern.share.ShareGroupMember;
 import org.apache.kafka.server.common.ApiMessageAndVersion;
 
@@ -844,22 +845,22 @@ public class GroupCoordinatorRecordHelpers {
      */
     public static CoordinatorRecord newShareGroupStatePartitionMetadataRecord(
         String groupId,
-        Map<Uuid, Map.Entry<String, Set<Integer>>> initializingTopics,
-        Map<Uuid, Map.Entry<String, Set<Integer>>> initializedTopics,
+        Map<Uuid, InitMapValue> initializingTopics,
+        Map<Uuid, InitMapValue> initializedTopics,
         Map<Uuid, String> deletingTopics
     ) {
         List<ShareGroupStatePartitionMetadataValue.TopicPartitionsInfo> 
initializingTopicPartitionInfo = initializingTopics.entrySet().stream()
             .map(entry -> new 
ShareGroupStatePartitionMetadataValue.TopicPartitionsInfo()
                 .setTopicId(entry.getKey())
-                .setTopicName(entry.getValue().getKey())
-                .setPartitions(entry.getValue().getValue().stream().toList()))
+                .setTopicName(entry.getValue().name())
+                
.setPartitions(entry.getValue().partitions().stream().toList()))
             .toList();
 
         List<ShareGroupStatePartitionMetadataValue.TopicPartitionsInfo> 
initializedTopicPartitionInfo = initializedTopics.entrySet().stream()
             .map(entry -> new 
ShareGroupStatePartitionMetadataValue.TopicPartitionsInfo()
                 .setTopicId(entry.getKey())
-                .setTopicName(entry.getValue().getKey())
-                .setPartitions(entry.getValue().getValue().stream().toList()))
+                .setTopicName(entry.getValue().name())
+                
.setPartitions(entry.getValue().partitions().stream().toList()))
             .toList();
 
         List<ShareGroupStatePartitionMetadataValue.TopicInfo> 
deletingTopicsInfo = deletingTopics.entrySet().stream()
diff --git 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
index 5412b2ab87e..4312cb47b09 100644
--- 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
+++ 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
@@ -147,6 +147,8 @@ import 
org.apache.kafka.coordinator.group.modern.consumer.ConsumerGroupMember;
 import 
org.apache.kafka.coordinator.group.modern.consumer.CurrentAssignmentBuilder;
 import 
org.apache.kafka.coordinator.group.modern.consumer.ResolvedRegularExpression;
 import org.apache.kafka.coordinator.group.modern.share.ShareGroup;
+import org.apache.kafka.coordinator.group.modern.share.ShareGroup.InitMapValue;
+import 
org.apache.kafka.coordinator.group.modern.share.ShareGroup.ShareGroupStatePartitionMetadataInfo;
 import 
org.apache.kafka.coordinator.group.modern.share.ShareGroupAssignmentBuilder;
 import org.apache.kafka.coordinator.group.modern.share.ShareGroupMember;
 import org.apache.kafka.coordinator.group.streams.StreamsGroup;
@@ -469,7 +471,7 @@ public class GroupMetadataManager {
     /**
      * The share group partition metadata info keyed by group id.
      */
-    private final TimelineHashMap<String, 
ShareGroupStatePartitionMetadataInfo> shareGroupPartitionMetadata;
+    private final TimelineHashMap<String, 
ShareGroupStatePartitionMetadataInfo> shareGroupStatePartitionMetadata;
 
     /**
      * The group manager.
@@ -506,20 +508,6 @@ public class GroupMetadataManager {
      */
     private final ShareGroupPartitionAssignor shareGroupAssignor;
 
-    /**
-     * A record class to hold the value representing 
ShareGroupStatePartitionMetadata for the TimelineHashmap
-     * keyed on share group id.
-     *
-     * @param initializedTopics Map of set of partition ids keyed on the topic 
id.
-     * @param deletingTopics    Set of topic ids.
-     */
-    private record ShareGroupStatePartitionMetadataInfo(
-        Map<Uuid, Set<Integer>> initializingTopics,
-        Map<Uuid, Set<Integer>> initializedTopics,
-        Set<Uuid> deletingTopics
-    ) {
-    }
-
     /**
      * The authorizer to validate the regex subscription topics.
      */
@@ -555,7 +543,7 @@ public class GroupMetadataManager {
         this.defaultConsumerGroupAssignor = 
config.consumerGroupAssignors().get(0);
         this.groups = new TimelineHashMap<>(snapshotRegistry, 0);
         this.groupsByTopics = new TimelineHashMap<>(snapshotRegistry, 0);
-        this.shareGroupPartitionMetadata = new 
TimelineHashMap<>(snapshotRegistry, 0);
+        this.shareGroupStatePartitionMetadata = new 
TimelineHashMap<>(snapshotRegistry, 0);
         this.groupConfigManager = groupConfigManager;
         this.shareGroupAssignor = shareGroupAssignor;
         this.authorizerPlugin = authorizerPlugin;
@@ -2642,7 +2630,7 @@ public class GroupMetadataManager {
     }
 
     private boolean initializedAssignmentPending(ShareGroup group) {
-        if (!shareGroupPartitionMetadata.containsKey(group.groupId())) {
+        if (!shareGroupStatePartitionMetadata.containsKey(group.groupId())) {
             // No initialized share partitions for the group so nothing can be 
assigned.
             return false;
         }
@@ -2653,7 +2641,7 @@ public class GroupMetadataManager {
         }
 
         // We need to check if all the group initialized share partitions are 
part of the group assignment.
-        Map<Uuid, Set<Integer>> initializedTps = 
shareGroupPartitionMetadata.get(group.groupId()).initializedTopics();
+        Map<Uuid, Set<Integer>> initializedTps = 
stripInitValue(shareGroupStatePartitionMetadata.get(group.groupId()).initializedTopics());
         Map<Uuid, Set<Integer>> currentAssigned = new HashMap<>();
         for (Assignment assignment : group.targetAssignment().values()) {
             for (Map.Entry<Uuid, Set<Integer>> tps : 
assignment.partitions().entrySet()) {
@@ -2674,27 +2662,38 @@ public class GroupMetadataManager {
      * @return  A map of topic partitions which are subscribed by the share 
group but not initialized yet.
      */
     // Visibility for testing
-    Map<Uuid, Set<Integer>> subscribedTopicsChangeMap(String groupId, 
Map<String, TopicMetadata> subscriptionMetadata) {
+    Map<Uuid, InitMapValue> subscribedTopicsChangeMap(String groupId, 
Map<String, TopicMetadata> subscriptionMetadata) {
         if (subscriptionMetadata == null || subscriptionMetadata.isEmpty()) {
             return Map.of();
         }
 
-        Map<Uuid, Set<Integer>> topicPartitionChangeMap = new HashMap<>();
-        ShareGroupStatePartitionMetadataInfo info = 
shareGroupPartitionMetadata.get(groupId);
-
-        // We are only considering initialized TPs here. This is because it 
could happen
-        // that some topics have been moved to initializing but the 
corresponding persister request
-        // could not be made/failed (invoked by the group coordinator). Then 
there would be no way to try
-        // the persister call. This way we get the opportunity to retry.
-        Map<Uuid, Set<Integer>> alreadyInitialized = info == null ? new 
HashMap<>() : info.initializedTopics();
+        Map<Uuid, InitMapValue> topicPartitionChangeMap = new HashMap<>();
+        ShareGroupStatePartitionMetadataInfo info = 
shareGroupStatePartitionMetadata.get(groupId);
+
+        // We only consider initializing topics whose timestamp is within delta of the current time.
+        // Any initializing topics which are older than delta and are part of the subscribed topics
+        // must be returned so that they can be retried.
+        long curTimestamp = time.milliseconds();
+        long delta = config.offsetCommitTimeoutMs() * 2L;
+        Map<Uuid, InitMapValue> alreadyInitialized = info == null ? new 
HashMap<>() :
+            combineInitMaps(
+                info.initializedTopics(),
+                info.initializingTopics().entrySet().stream()
+                    .filter(entry -> curTimestamp - 
entry.getValue().timestamp() < delta)
+                    .collect(Collectors.toMap(Map.Entry::getKey, 
Map.Entry::getValue))
+            );
 
+        // Here we add any topics which are subscribed but not initialized, as well as initializing
+        // topics whose timestamp indicates that they are older than delta.
         subscriptionMetadata.forEach((topicName, topicMetadata) -> {
-            Set<Integer> alreadyInitializedPartSet = 
alreadyInitialized.getOrDefault(topicMetadata.id(), Set.of());
+            Set<Integer> alreadyInitializedPartSet = 
alreadyInitialized.containsKey(topicMetadata.id()) ? 
alreadyInitialized.get(topicMetadata.id()).partitions() : Set.of();
             if (alreadyInitializedPartSet.isEmpty() || 
alreadyInitializedPartSet.size() < topicMetadata.numPartitions()) {
                 Set<Integer> partitionSet = IntStream.range(0, 
topicMetadata.numPartitions()).boxed().collect(Collectors.toSet());
                 partitionSet.removeAll(alreadyInitializedPartSet);
-
-                topicPartitionChangeMap.computeIfAbsent(topicMetadata.id(), k 
-> partitionSet);
+                // alreadyInitialized contains all initialized topics plus initializing topics which are less than delta old,
+                // which means we add subscribed topics which are unseen or have been initializing for more than delta. We
+                // also refresh the timestamp here, so an old initializing topic will not be included repeatedly.
+                topicPartitionChangeMap.computeIfAbsent(topicMetadata.id(), k 
-> new InitMapValue(topicMetadata.name(), partitionSet, curTimestamp));
             }
         });
         return topicPartitionChangeMap;
@@ -2719,7 +2718,7 @@ public class GroupMetadataManager {
             return Optional.empty();
         }
 
-        Map<Uuid, Set<Integer>> topicPartitionChangeMap = 
subscribedTopicsChangeMap(groupId, subscriptionMetadata);
+        Map<Uuid, InitMapValue> topicPartitionChangeMap = 
subscribedTopicsChangeMap(groupId, subscriptionMetadata);
 
         // Nothing to initialize.
         if (topicPartitionChangeMap.isEmpty()) {
@@ -2730,12 +2729,12 @@ public class GroupMetadataManager {
         return Optional.of(buildInitializeShareGroupStateRequest(groupId, 
groupEpoch, topicPartitionChangeMap));
     }
 
-    private InitializeShareGroupStateParameters 
buildInitializeShareGroupStateRequest(String groupId, int groupEpoch, Map<Uuid, 
Set<Integer>> topicPartitions) {
+    private InitializeShareGroupStateParameters 
buildInitializeShareGroupStateRequest(String groupId, int groupEpoch, Map<Uuid, 
InitMapValue> topicPartitions) {
         return new 
InitializeShareGroupStateParameters.Builder().setGroupTopicPartitionData(
             new GroupTopicPartitionData<>(groupId, 
topicPartitions.entrySet().stream()
                 .map(entry -> new TopicData<>(
                     entry.getKey(),
-                    entry.getValue().stream()
+                    entry.getValue().partitions().stream()
                         .map(partitionId -> 
PartitionFactory.newPartitionStateData(partitionId, groupEpoch, -1))
                         .toList())
                 ).toList()
@@ -2743,19 +2742,19 @@ public class GroupMetadataManager {
     }
 
     // Visibility for tests
-    void addInitializingTopicsRecords(String groupId, List<CoordinatorRecord> 
records, Map<Uuid, Set<Integer>> topicPartitionMap) {
+    void addInitializingTopicsRecords(String groupId, List<CoordinatorRecord> 
records, Map<Uuid, InitMapValue> topicPartitionMap) {
         if (topicPartitionMap == null || topicPartitionMap.isEmpty()) {
             return;
         }
 
-        ShareGroupStatePartitionMetadataInfo currentMap = 
shareGroupPartitionMetadata.get(groupId);
+        ShareGroupStatePartitionMetadataInfo currentMap = 
shareGroupStatePartitionMetadata.get(groupId);
         if (currentMap == null) {
-            records.add(newShareGroupStatePartitionMetadataRecord(groupId, 
attachTopicName(topicPartitionMap), Map.of(), Map.of()));
+            records.add(newShareGroupStatePartitionMetadataRecord(groupId, 
topicPartitionMap, Map.of(), Map.of()));
             return;
         }
 
         // We must combine the existing information in the record with the 
topicPartitionMap argument.
-        Map<Uuid, Set<Integer>> finalInitializingMap = 
mergeShareGroupInitMaps(currentMap.initializingTopics(), topicPartitionMap);
+        Map<Uuid, InitMapValue> finalInitializingMap = 
combineInitMaps(currentMap.initializingTopics(), topicPartitionMap);
 
         // If any initializing topics are also present in the deleting state
         // we should remove them from deleting.
@@ -2771,33 +2770,50 @@ public class GroupMetadataManager {
         records.add(
             newShareGroupStatePartitionMetadataRecord(
                 groupId,
-                attachTopicName(finalInitializingMap),
-                attachTopicName(currentMap.initializedTopics()),
+                finalInitializingMap,
+                currentMap.initializedTopics(),
                 attachTopicName(currentDeleting)
             )
         );
     }
 
-    // Visibility for tests
-    static Map<Uuid, Set<Integer>> mergeShareGroupInitMaps(
-        Map<Uuid, Set<Integer>> existingShareGroupInitMap,
-        Map<Uuid, Set<Integer>> newShareGroupInitMap
+    // Visibility for testing
+    static Map<Uuid, InitMapValue> combineInitMaps(
+        Map<Uuid, InitMapValue> initialized,
+        Map<Uuid, InitMapValue> initializing
     ) {
-        Map<Uuid, Set<Integer>> finalInitMap = new HashMap<>();
-        Set<Uuid> combinedTopicIdSet = new 
HashSet<>(existingShareGroupInitMap.keySet());
-        combinedTopicIdSet.addAll(newShareGroupInitMap.keySet());
+        Map<Uuid, InitMapValue> finalInitMap = new HashMap<>();
+        Set<Uuid> combinedTopicIdSet = new HashSet<>(initialized.keySet());
+
+        Set<Uuid> initializingSet = initializing.keySet();
+
+        combinedTopicIdSet.addAll(initializingSet);
 
         for (Uuid topicId : combinedTopicIdSet) {
-            Set<Integer> partitions = new 
HashSet<>(existingShareGroupInitMap.getOrDefault(topicId, new HashSet<>()));
-            if (newShareGroupInitMap.containsKey(topicId)) {
-                partitions.addAll(newShareGroupInitMap.get(topicId));
+            Set<Integer> initializedPartitions = 
initialized.containsKey(topicId) ? initialized.get(topicId).partitions() : new 
HashSet<>();
+            long timestamp = initialized.containsKey(topicId) ? 
initialized.get(topicId).timestamp() : -1;
+            String name = initialized.containsKey(topicId) ? 
initialized.get(topicId).name() : "UNKNOWN";
+
+            Set<Integer> finalPartitions = new 
HashSet<>(initializedPartitions);
+            if (initializingSet.contains(topicId)) {
+                finalPartitions.addAll(initializing.get(topicId).partitions());
+                timestamp = initializing.get(topicId).timestamp();
+                name = initializing.get(topicId).name();
             }
-            finalInitMap.putIfAbsent(topicId, partitions);
+            finalInitMap.putIfAbsent(topicId, new InitMapValue(name, 
finalPartitions, timestamp));
         }
 
         return finalInitMap;
     }
 
+    static Map<Uuid, Set<Integer>> stripInitValue(
+        Map<Uuid, InitMapValue> initMap
+    ) {
+        return initMap.entrySet().stream()
+            .map(entry -> Map.entry(entry.getKey(), 
entry.getValue().partitions()))
+            .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
+    }
+
     /**
      * Gets or subscribes a new dynamic consumer group member.
      *
@@ -3707,8 +3723,8 @@ public class GroupMetadataManager {
         List<CoordinatorRecord> records
     ) {
         try {
-            Map<Uuid, Set<Integer>> initializedTopicPartitions = 
shareGroupPartitionMetadata.containsKey(group.groupId()) ?
-                
shareGroupPartitionMetadata.get(group.groupId()).initializedTopics() :
+            Map<Uuid, Set<Integer>> initializedTopicPartitions = 
shareGroupStatePartitionMetadata.containsKey(group.groupId()) ?
+                
stripInitValue(shareGroupStatePartitionMetadata.get(group.groupId()).initializedTopics())
 :
                 Map.of();
 
             TargetAssignmentBuilder.ShareTargetAssignmentBuilder 
assignmentResultBuilder =
@@ -4729,26 +4745,28 @@ public class GroupMetadataManager {
         }
         ShareGroup group = (ShareGroup) groups.get(groupId);
 
-        ShareGroupStatePartitionMetadataInfo currentMap = 
shareGroupPartitionMetadata.get(groupId);
+        ShareGroupStatePartitionMetadataInfo currentMap = 
shareGroupStatePartitionMetadata.get(groupId);
+        Map<Uuid, InitMapValue> enrichedTopicPartitionMap = 
attachInitValue(topicPartitionMap);
         if (currentMap == null) {
             return new CoordinatorResult<>(
-                
List.of(newShareGroupStatePartitionMetadataRecord(group.groupId(), Map.of(), 
attachTopicName(topicPartitionMap), Map.of())),
+                
List.of(newShareGroupStatePartitionMetadataRecord(group.groupId(), Map.of(), 
enrichedTopicPartitionMap, Map.of())),
                 null
             );
         }
 
         // We must combine the existing information in the record with the 
topicPartitionMap argument so that the final
         // record has up-to-date information.
-        Map<Uuid, Set<Integer>> finalInitializedMap = 
mergeShareGroupInitMaps(currentMap.initializedTopics(), topicPartitionMap);
+        Map<Uuid, InitMapValue> finalInitializedMap = 
combineInitMaps(currentMap.initializedTopics(), enrichedTopicPartitionMap);
 
         // Fetch initializing info from state metadata.
-        Map<Uuid, Set<Integer>> finalInitializingMap = new 
HashMap<>(currentMap.initializingTopics());
+        Map<Uuid, InitMapValue> finalInitializingMap = new HashMap<>();
+        currentMap.initializingTopics().forEach((k, v) -> 
finalInitializingMap.put(k, new InitMapValue(v.name(), new 
HashSet<>(v.partitions()), v.timestamp())));
 
         // Remove any entries which are already initialized.
         for (Map.Entry<Uuid, Set<Integer>> entry : 
topicPartitionMap.entrySet()) {
             Uuid topicId = entry.getKey();
             if (finalInitializingMap.containsKey(topicId)) {
-                Set<Integer> partitions = finalInitializingMap.get(topicId);
+                Set<Integer> partitions = 
finalInitializingMap.get(topicId).partitions();
                 partitions.removeAll(entry.getValue());
                 if (partitions.isEmpty()) {
                     finalInitializingMap.remove(topicId);
@@ -4759,8 +4777,8 @@ public class GroupMetadataManager {
         return new CoordinatorResult<>(List.of(
             newShareGroupStatePartitionMetadataRecord(
                 group.groupId(),
-                attachTopicName(finalInitializingMap),
-                attachTopicName(finalInitializedMap),
+                finalInitializingMap,
+                finalInitializedMap,
                 attachTopicName(currentMap.deletingTopics())
             )),
             null
@@ -4779,24 +4797,24 @@ public class GroupMetadataManager {
         String groupId,
         Map<Uuid, Set<Integer>> topicPartitionMap
     ) {
-        ShareGroupStatePartitionMetadataInfo info = 
shareGroupPartitionMetadata.get(groupId);
+        ShareGroupStatePartitionMetadataInfo info = 
shareGroupStatePartitionMetadata.get(groupId);
         if (info == null || info.initializingTopics().isEmpty() || 
topicPartitionMap.isEmpty()) {
             return new CoordinatorResult<>(List.of(), null);
         }
 
-        Map<Uuid, Set<Integer>> initializingTopics = info.initializingTopics();
-        Map<Uuid, Set<Integer>> finalInitializingTopics = new HashMap<>();
+        Map<Uuid, InitMapValue> initializingTopics = info.initializingTopics();
+        Map<Uuid, InitMapValue> finalInitializingTopics = new HashMap<>();
 
-        for (Map.Entry<Uuid, Set<Integer>> entry : 
initializingTopics.entrySet()) {
+        for (Map.Entry<Uuid, InitMapValue> entry : 
initializingTopics.entrySet()) {
             Uuid topicId = entry.getKey();
             // If topicId to clean is not present in topicPartitionMap map, 
retain it.
             if (!topicPartitionMap.containsKey(topicId)) {
                 finalInitializingTopics.put(entry.getKey(), entry.getValue());
             } else {
-                Set<Integer> partitions = new HashSet<>(entry.getValue());
+                Set<Integer> partitions = new 
HashSet<>(entry.getValue().partitions());
                 partitions.removeAll(topicPartitionMap.get(topicId));
                 if (!partitions.isEmpty()) {
-                    finalInitializingTopics.put(entry.getKey(), partitions);
+                    finalInitializingTopics.put(entry.getKey(), new 
InitMapValue(entry.getValue().name(), partitions, 
entry.getValue().timestamp()));
                 }
             }
         }
@@ -4805,8 +4823,8 @@ public class GroupMetadataManager {
             List.of(
                 newShareGroupStatePartitionMetadataRecord(
                     groupId,
-                    attachTopicName(finalInitializingTopics),
-                    attachTopicName(info.initializedTopics()),
+                    finalInitializingTopics,
+                    info.initializedTopics(),
                     attachTopicName(info.deletingTopics())
                 )
             ),
@@ -4825,14 +4843,15 @@ public class GroupMetadataManager {
         return Collections.unmodifiableMap(finalMap);
     }
 
-    private Map<Uuid, Map.Entry<String, Set<Integer>>> 
attachTopicName(Map<Uuid, Set<Integer>> initMap) {
+    private Map<Uuid, InitMapValue> attachInitValue(Map<Uuid, Set<Integer>> 
initMap) {
         TopicsImage topicsImage = metadataImage.topics();
-        Map<Uuid, Map.Entry<String, Set<Integer>>> finalMap = new HashMap<>();
+        Map<Uuid, InitMapValue> finalMap = new HashMap<>();
+        long timestamp = time.milliseconds();
         for (Map.Entry<Uuid, Set<Integer>> entry : initMap.entrySet()) {
             Uuid topicId = entry.getKey();
             TopicImage topicImage = topicsImage.getTopic(topicId);
             String topicName = (topicImage != null) ? topicImage.name() : 
"<UNKNOWN>";
-            finalMap.put(topicId, Map.entry(topicName, entry.getValue()));
+            finalMap.put(topicId, new InitMapValue(topicName, 
entry.getValue(), timestamp));
         }
         return Collections.unmodifiableMap(finalMap);
     }
@@ -4849,10 +4868,10 @@ public class GroupMetadataManager {
     ) {
         Map<Uuid, Set<Integer>> resultMap = new HashMap<>();
 
-        ShareGroupStatePartitionMetadataInfo currentMap = 
shareGroupPartitionMetadata.get(groupId);
+        ShareGroupStatePartitionMetadataInfo currentMap = 
shareGroupStatePartitionMetadata.get(groupId);
         if (currentMap != null) {
-            currentMap.initializedTopics().forEach((topicId, partitions) -> {
-                resultMap.put(topicId, new HashSet<>(partitions));
+            currentMap.initializedTopics().forEach((topicId, initMapValue) -> {
+                resultMap.put(topicId, new 
HashSet<>(initMapValue.partitions()));
             });
         }
 
@@ -5643,22 +5662,29 @@ public class GroupMetadataManager {
         // Update timeline structures with info about initialized/deleted 
topics.
         if (value == null) {
             // Tombstone!
-            shareGroupPartitionMetadata.remove(groupId);
+            shareGroupStatePartitionMetadata.remove(groupId);
         } else {
+            long timestamp = time.milliseconds();
             ShareGroupStatePartitionMetadataInfo info = new 
ShareGroupStatePartitionMetadataInfo(
                 value.initializingTopics().stream()
-                    .map(topicPartitionInfo -> 
Map.entry(topicPartitionInfo.topicId(), new 
HashSet<>(topicPartitionInfo.partitions())))
+                    .map(topicPartitionInfo -> Map.entry(
+                        topicPartitionInfo.topicId(),
+                        new InitMapValue(topicPartitionInfo.topicName(), new 
HashSet<>(topicPartitionInfo.partitions()), timestamp)))
                     .collect(Collectors.toMap(Map.Entry::getKey, 
Map.Entry::getValue)),
+
                 value.initializedTopics().stream()
-                    .map(topicPartitionInfo -> 
Map.entry(topicPartitionInfo.topicId(), new 
HashSet<>(topicPartitionInfo.partitions())))
+                    .map(topicPartitionInfo -> Map.entry(
+                        topicPartitionInfo.topicId(),
+                        new InitMapValue(topicPartitionInfo.topicName(), new 
HashSet<>(topicPartitionInfo.partitions()), timestamp)))
                     .collect(Collectors.toMap(Map.Entry::getKey, 
Map.Entry::getValue)),
+
                 value.deletingTopics().stream()
                     
.map(ShareGroupStatePartitionMetadataValue.TopicInfo::topicId)
                     .collect(Collectors.toSet())
             );
 
             // Init java record.
-            shareGroupPartitionMetadata.put(groupId, info);
+            shareGroupStatePartitionMetadata.put(groupId, info);
         }
     }
 
@@ -7916,13 +7942,13 @@ public class GroupMetadataManager {
      * @return Optional of object representing the share group state delete 
request.
      */
     public Optional<DeleteShareGroupStateParameters> 
shareGroupBuildPartitionDeleteRequest(String shareGroupId, 
List<CoordinatorRecord> records) {
-        if (!shareGroupPartitionMetadata.containsKey(shareGroupId)) {
+        if (!shareGroupStatePartitionMetadata.containsKey(shareGroupId)) {
             return Optional.empty();
         }
 
-        Map<Uuid, Set<Integer>> deleteCandidates = mergeShareGroupInitMaps(
-            shareGroupPartitionMetadata.get(shareGroupId).initializedTopics(),
-            shareGroupPartitionMetadata.get(shareGroupId).initializingTopics()
+        Map<Uuid, InitMapValue> deleteCandidates = combineInitMaps(
+            
shareGroupStatePartitionMetadata.get(shareGroupId).initializedTopics(),
+            
shareGroupStatePartitionMetadata.get(shareGroupId).initializingTopics()
         );
 
         // Ideally the deleting should be empty - if it is not then it implies
@@ -7931,16 +7957,16 @@ public class GroupMetadataManager {
         // a retry for the same is possible. Since this is part of an admin 
operation
         // retrying delete should not pose issues related to
         // performance. Also, the share coordinator is idempotent on delete 
partitions.
-        Map<Uuid, Set<Integer>> deletingTopics = 
shareGroupPartitionMetadata.get(shareGroupId).deletingTopics().stream()
-            .map(tid -> Map.entry(tid, 
metadataImage.topics().getTopic(tid).partitions().keySet()))
+        Map<Uuid, InitMapValue> deletingTopics = 
shareGroupStatePartitionMetadata.get(shareGroupId).deletingTopics().stream()
+            .map(tid -> {
+                TopicImage image = metadataImage.topics().getTopic(tid);
+                return Map.entry(tid, new InitMapValue(image.name(), 
image.partitions().keySet(), -1));
+            })
             .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
 
         if (!deletingTopics.isEmpty()) {
             log.info("Existing deleting entries found in share group {} - {}", 
shareGroupId, deletingTopics);
-            deleteCandidates = mergeShareGroupInitMaps(
-                deleteCandidates,
-                deletingTopics
-            );
+            deleteCandidates = combineInitMaps(deleteCandidates, 
deletingTopics);
         }
 
         if (deleteCandidates.isEmpty()) {
@@ -7949,10 +7975,10 @@ public class GroupMetadataManager {
 
         List<TopicData<PartitionIdData>> topicDataList = new 
ArrayList<>(deleteCandidates.size());
 
-        for (Map.Entry<Uuid, Set<Integer>> entry : 
deleteCandidates.entrySet()) {
+        for (Map.Entry<Uuid, InitMapValue> entry : 
deleteCandidates.entrySet()) {
             topicDataList.add(new TopicData<>(
                 entry.getKey(),
-                entry.getValue().stream()
+                entry.getValue().partitions().stream()
                     .map(PartitionFactory::newPartitionIdData)
                     .toList()
             ));
@@ -7992,15 +8018,17 @@ public class GroupMetadataManager {
         List<CoordinatorRecord> records
     ) {
         List<DeleteShareGroupStateRequestData.DeleteStateData> 
deleteShareGroupStateRequestTopicsData = new ArrayList<>();
-        Map<Uuid, Set<Integer>> initializedTopics = new HashMap<>();
 
-        ShareGroupStatePartitionMetadataInfo currentMap = 
shareGroupPartitionMetadata.get(groupId);
+        ShareGroupStatePartitionMetadataInfo currentMap = 
shareGroupStatePartitionMetadata.get(groupId);
 
         if (currentMap == null) {
             return deleteShareGroupStateRequestTopicsData;
         }
 
-        currentMap.initializedTopics().forEach((topicId, partitions) -> 
initializedTopics.put(topicId, new HashSet<>(partitions)));
+        Map<Uuid, InitMapValue> initializedTopics = new HashMap<>();
+        currentMap.initializedTopics().forEach((topicId, initValue) -> {
+            initializedTopics.put(topicId, new InitMapValue(initValue.name(), 
new HashSet<>(initValue.partitions()), initValue.timestamp()));
+        });
         Set<Uuid> deletingTopics = new HashSet<>(currentMap.deletingTopics());
 
         requestData.topics().forEach(topic -> {
@@ -8011,7 +8039,7 @@ public class GroupMetadataManager {
                 // share partitions are initialized for the group.
                 if (initializedTopics.containsKey(topicId)) {
                     List<DeleteShareGroupStateRequestData.PartitionData> 
partitions = new ArrayList<>();
-                    initializedTopics.get(topicId).forEach(partition ->
+                    
initializedTopics.get(topicId).partitions().forEach(partition ->
                         partitions.add(new 
DeleteShareGroupStateRequestData.PartitionData().setPartition(partition)));
                     deleteShareGroupStateRequestTopicsData.add(
                         new DeleteShareGroupStateRequestData.DeleteStateData()
@@ -8052,8 +8080,8 @@ public class GroupMetadataManager {
         records.add(
             
GroupCoordinatorRecordHelpers.newShareGroupStatePartitionMetadataRecord(
                 groupId,
-                attachTopicName(currentMap.initializingTopics()),
-                attachTopicName(initializedTopics),
+                currentMap.initializingTopics(),
+                initializedTopics,
                 attachTopicName(deletingTopics)
             )
         );
@@ -8080,7 +8108,7 @@ public class GroupMetadataManager {
             return new CoordinatorResult<>(List.of());
         }
         List<CoordinatorRecord> records = new ArrayList<>();
-        shareGroupPartitionMetadata.forEach((groupId, metadata) -> {
+        shareGroupStatePartitionMetadata.forEach((groupId, metadata) -> {
             Set<Uuid> initializingDeletedCurrent = new 
HashSet<>(metadata.initializingTopics().keySet());
             Set<Uuid> initializedDeletedCurrent = new 
HashSet<>(metadata.initializedTopics().keySet());
 
@@ -8098,10 +8126,10 @@ public class GroupMetadataManager {
             // because the call setup of this method is such that the
             // persister call is automatically done by the 
BrokerMetadataPublisher
             // increasing efficiency and removing need of chained futures.
-            Map<Uuid, Set<Integer>> finalInitializing = new 
HashMap<>(metadata.initializingTopics());
+            Map<Uuid, InitMapValue> finalInitializing = new 
HashMap<>(metadata.initializingTopics());
             initializingDeletedCurrent.forEach(finalInitializing::remove);
 
-            Map<Uuid, Set<Integer>> finalInitialized = new 
HashMap<>(metadata.initializedTopics());
+            Map<Uuid, InitMapValue> finalInitialized = new 
HashMap<>(metadata.initializedTopics());
             initializedDeletedCurrent.forEach(finalInitialized::remove);
 
             Set<Uuid> deletingTopics = new 
HashSet<>(metadata.deletingTopics());
@@ -8109,8 +8137,8 @@ public class GroupMetadataManager {
 
             
records.add(GroupCoordinatorRecordHelpers.newShareGroupStatePartitionMetadataRecord(
                 groupId,
-                attachTopicName(finalInitializing),
-                attachTopicName(finalInitialized),
+                finalInitializing,
+                finalInitialized,
                 attachTopicName(deletingTopics)
             ));
         });
@@ -8132,7 +8160,7 @@ public class GroupMetadataManager {
         Map<Uuid, String> topics,
         List<CoordinatorRecord> records
     ) {
-        ShareGroupStatePartitionMetadataInfo currentMap = 
shareGroupPartitionMetadata.get(groupId);
+        ShareGroupStatePartitionMetadataInfo currentMap = 
shareGroupStatePartitionMetadata.get(groupId);
 
         if (currentMap == null) {
             return List.of();
@@ -8145,8 +8173,8 @@ public class GroupMetadataManager {
         records.add(
             
GroupCoordinatorRecordHelpers.newShareGroupStatePartitionMetadataRecord(
                 groupId,
-                attachTopicName(currentMap.initializingTopics()),
-                attachTopicName(currentMap.initializedTopics()),
+                currentMap.initializingTopics(),
+                currentMap.initializedTopics(),
                 attachTopicName(updatedDeletingTopics)
             )
         );
@@ -8398,4 +8426,9 @@ public class GroupMetadataManager {
     static String consumerGroupSyncKey(String groupId, String memberId) {
         return "sync-" + groupId + "-" + memberId;
     }
+
+    // Visibility for testing
+    Map<String, ShareGroupStatePartitionMetadataInfo> 
shareGroupStatePartitionMetadata() {
+        return shareGroupStatePartitionMetadata;
+    }
 }
diff --git 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/share/ShareGroup.java
 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/share/ShareGroup.java
index eacbab7dbff..be4711604e5 100644
--- 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/share/ShareGroup.java
+++ 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/share/ShareGroup.java
@@ -16,6 +16,7 @@
  */
 package org.apache.kafka.coordinator.group.modern.share;
 
+import org.apache.kafka.common.Uuid;
 import org.apache.kafka.common.errors.ApiException;
 import org.apache.kafka.common.errors.GroupIdNotFoundException;
 import org.apache.kafka.common.errors.UnknownMemberIdException;
@@ -31,6 +32,7 @@ import org.apache.kafka.timeline.TimelineObject;
 
 import java.util.List;
 import java.util.Locale;
+import java.util.Map;
 import java.util.Optional;
 import java.util.Set;
 
@@ -66,6 +68,35 @@ public class ShareGroup extends 
ModernGroup<ShareGroupMember> {
         }
     }
 
+    /**
+     * A record class to hold the value representing ShareGroupStatePartitionMetadata for the TimelineHashMap
+     * keyed on share group id.
+     *
+     * @param initializingTopics Map of InitMapValue keyed on topic id, for topics being initialized.
+     * @param initializedTopics  Map of InitMapValue keyed on topic id, for topics already initialized.
+     * @param deletingTopics     Set of topic ids.
+     */
+    public record ShareGroupStatePartitionMetadataInfo(
+        Map<Uuid, InitMapValue> initializingTopics,
+        Map<Uuid, InitMapValue> initializedTopics,
+        Set<Uuid> deletingTopics
+    ) {
+    }
+
+    /**
+     * Represents the value part for the initializing and initialized topic 
partitions in
+     * ShareGroupStatePartitionMetadataValue
+     *
+     * @param name          Topic name
+     * @param partitions    Set of partitions in the topic
+     * @param timestamp     Timestamp at which the record was replayed
+     */
+    public record InitMapValue(
+        String name,
+        Set<Integer> partitions,
+        long timestamp
+    ) {
+    }
+
     /**
      * The group state.
      */
diff --git 
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorRecordHelpersTest.java
 
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorRecordHelpersTest.java
index be2e9df5fea..4c479d8da05 100644
--- 
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorRecordHelpersTest.java
+++ 
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorRecordHelpersTest.java
@@ -52,6 +52,7 @@ import org.apache.kafka.coordinator.group.modern.MemberState;
 import org.apache.kafka.coordinator.group.modern.TopicMetadata;
 import org.apache.kafka.coordinator.group.modern.consumer.ConsumerGroupMember;
 import 
org.apache.kafka.coordinator.group.modern.consumer.ResolvedRegularExpression;
+import org.apache.kafka.coordinator.group.modern.share.ShareGroup.InitMapValue;
 import org.apache.kafka.server.common.ApiMessageAndVersion;
 
 import org.junit.jupiter.api.Test;
@@ -333,7 +334,7 @@ public class GroupCoordinatorRecordHelpersTest {
             Map.of(),
             Map.of(
                 topicId1,
-                Map.entry(topicName1, partitions)
+                new InitMapValue(topicName1, partitions, 1)
             ),
             Map.of(
                 topicId2,
diff --git 
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java
 
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java
index a511c2529ea..9c644bb7e13 100644
--- 
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java
+++ 
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java
@@ -76,6 +76,7 @@ import org.apache.kafka.common.protocol.ApiKeys;
 import org.apache.kafka.common.protocol.Errors;
 import org.apache.kafka.common.requests.StreamsGroupHeartbeatResponse.Status;
 import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.MockTime;
 import org.apache.kafka.common.utils.Utils;
 import org.apache.kafka.coordinator.common.runtime.CoordinatorRecord;
 import org.apache.kafka.coordinator.common.runtime.CoordinatorResult;
@@ -106,6 +107,7 @@ import 
org.apache.kafka.coordinator.group.modern.consumer.ConsumerGroupBuilder;
 import org.apache.kafka.coordinator.group.modern.consumer.ConsumerGroupMember;
 import 
org.apache.kafka.coordinator.group.modern.consumer.ResolvedRegularExpression;
 import org.apache.kafka.coordinator.group.modern.share.ShareGroup;
+import org.apache.kafka.coordinator.group.modern.share.ShareGroup.InitMapValue;
 import org.apache.kafka.coordinator.group.modern.share.ShareGroupBuilder;
 import org.apache.kafka.coordinator.group.modern.share.ShareGroupMember;
 import org.apache.kafka.coordinator.group.streams.MockTaskAssignor;
@@ -15094,10 +15096,10 @@ public class GroupMetadataManagerTest {
         assertRecordsEquals(expectedRecords, result.records());
     }
 
-    private Map<Uuid, Map.Entry<String, Set<Integer>>> 
mkShareGroupStateMap(List<Map.Entry<Uuid, Map.Entry<String, Set<Integer>>>> 
entries) {
-        Map<Uuid, Map.Entry<String, Set<Integer>>> map = new HashMap<>();
+    private Map<Uuid, InitMapValue> mkShareGroupStateMap(List<Map.Entry<Uuid, 
Map.Entry<String, Set<Integer>>>> entries) {
+        Map<Uuid, InitMapValue> map = new HashMap<>();
         for (Map.Entry<Uuid, Map.Entry<String, Set<Integer>>> entry : entries) 
{
-            map.put(entry.getKey(), entry.getValue());
+            map.put(entry.getKey(), new 
InitMapValue(entry.getValue().getKey(), entry.getValue().getValue(), 1));
         }
         return map;
     }
@@ -20675,8 +20677,8 @@ public class GroupMetadataManagerTest {
         context.replay(
             
GroupCoordinatorRecordHelpers.newShareGroupStatePartitionMetadataRecord(
                 groupId,
-                Map.of(t1Uuid, Map.entry(t1Name, Set.of(0, 1))),
-                Map.of(t2Uuid, Map.entry(t2Name, Set.of(0, 1))),
+                Map.of(t1Uuid, new InitMapValue(t1Name, Set.of(0, 1), 1)),
+                Map.of(t2Uuid, new InitMapValue(t2Name, Set.of(0, 1), 1)),
                 Map.of()
             )
         );
@@ -20742,8 +20744,8 @@ public class GroupMetadataManagerTest {
         context.replay(
             
GroupCoordinatorRecordHelpers.newShareGroupStatePartitionMetadataRecord(
                 groupId,
-                Map.of(t1Uuid, Map.entry(t1Name, Set.of(0, 1))),
-                Map.of(t2Uuid, Map.entry(t2Name, Set.of(0, 1))),
+                Map.of(t1Uuid, new InitMapValue(t1Name, Set.of(0, 1), 1)),
+                Map.of(t2Uuid, new InitMapValue(t2Name, Set.of(0, 1), 1)),
                 Map.of(t3Uuid, t3Name)
             )
         );
@@ -20804,8 +20806,8 @@ public class GroupMetadataManagerTest {
                 groupId,
                 Map.of(),
                 Map.of(
-                    topicId1, Map.entry(topicName1, Set.of(0, 1, 2)),
-                    topicId2, Map.entry(topicName2, Set.of(0, 1))
+                    topicId1, new InitMapValue(topicName1, Set.of(0, 1, 2), 1),
+                    topicId2, new InitMapValue(topicName2, Set.of(0, 1), 1)
                 ),
                 Map.of()
             )
@@ -20897,8 +20899,8 @@ public class GroupMetadataManagerTest {
                 groupId,
                 Map.of(),
                 Map.of(
-                    topicId1, Map.entry(topicName1, Set.of(0, 1, 2)),
-                    topicId2, Map.entry(topicName2, Set.of(0, 1))
+                    topicId1, new InitMapValue(topicName1, Set.of(0, 1, 2), 1),
+                    topicId2, new InitMapValue(topicName2, Set.of(0, 1), 1)
                 ),
                 Map.of(
                     topicId3, topicName3,
@@ -21003,7 +21005,7 @@ public class GroupMetadataManagerTest {
             
GroupCoordinatorRecordHelpers.newShareGroupStatePartitionMetadataRecord(
                 groupId,
                 Map.of(),
-                Map.of(topicId1, Map.entry(topicName1, Set.of(0, 1, 2))),
+                Map.of(topicId1, new InitMapValue(topicName1, Set.of(0, 1, 2), 
1)),
                 Map.of()
             )
         );
@@ -21086,8 +21088,8 @@ public class GroupMetadataManagerTest {
         context.replay(
             
GroupCoordinatorRecordHelpers.newShareGroupStatePartitionMetadataRecord(
                 groupId,
-                Map.of(topicId2, Map.entry(topicName2, Set.of(0, 1))),
-                Map.of(topicId1, Map.entry(topicName1, Set.of(0, 1, 2))),
+                Map.of(topicId2, new InitMapValue(topicName2, Set.of(0, 1), 
1)),
+                Map.of(topicId1, new InitMapValue(topicName1, Set.of(0, 1, 2), 
1)),
                 Map.of()
             )
         );
@@ -21110,7 +21112,7 @@ public class GroupMetadataManagerTest {
         List<CoordinatorRecord> expectedRecords = List.of(
             newShareGroupStatePartitionMetadataRecord(
                 groupId,
-                Map.of(topicId2, Map.entry(topicName2, Set.of(0, 1))),
+                Map.of(topicId2, new InitMapValue(topicName2, Set.of(0, 1), 
1)),
                 Map.of(),
                 Map.of(topicId1, topicName1)
             )
@@ -21171,8 +21173,8 @@ public class GroupMetadataManagerTest {
         context.replay(
             
GroupCoordinatorRecordHelpers.newShareGroupStatePartitionMetadataRecord(
                 groupId,
-                Map.of(topicId2, Map.entry(topicName2, Set.of(0, 1))),
-                Map.of(topicId1, Map.entry(topicName1, Set.of(0, 1, 2))),
+                Map.of(topicId2, new InitMapValue(topicName2, Set.of(0, 1), 
1)),
+                Map.of(topicId1, new InitMapValue(topicName1, Set.of(0, 1, 2), 
1)),
                 Map.of()
             )
         );
@@ -21195,7 +21197,7 @@ public class GroupMetadataManagerTest {
         List<CoordinatorRecord> expectedRecords = List.of(
             newShareGroupStatePartitionMetadataRecord(
                 groupId,
-                Map.of(topicId2, Map.entry(topicName2, Set.of(0, 1))),
+                Map.of(topicId2, new InitMapValue(topicName2, Set.of(0, 1), 
1)),
                 Map.of(),
                 Map.of(topicId1, topicName1)
             )
@@ -21488,8 +21490,12 @@ public class GroupMetadataManagerTest {
     public void testShareGroupHeartbeatPersisterRequestWithInitializing() {
         MockPartitionAssignor assignor = new MockPartitionAssignor("range");
         assignor.prepareGroupAssignment(new GroupAssignment(Map.of()));
+        MockTime time = new MockTime();
+        int offsetWriteTimeout = 10;
         GroupMetadataManagerTestContext context = new 
GroupMetadataManagerTestContext.Builder()
             .withShareGroupAssignor(assignor)
+            .withTime(time)
+            
.withConfig(GroupCoordinatorConfig.OFFSET_COMMIT_TIMEOUT_MS_CONFIG, 
offsetWriteTimeout)
             .build();
 
         Uuid t1Uuid = Uuid.randomUuid();
@@ -21529,7 +21535,7 @@ public class GroupMetadataManagerTest {
                 .setMemberEpoch(0)
                 .setSubscribedTopicNames(List.of(t1Name)));
 
-        assertTrue(result.records().contains(
+        assertFalse(result.records().contains(
             newShareGroupStatePartitionMetadataRecord(groupId, 
mkShareGroupStateMap(List.of(
                     mkShareGroupStateMetadataEntry(t1Uuid, t1Name, List.of(0, 
1))
                 )),
@@ -21543,6 +21549,48 @@ public class GroupMetadataManagerTest {
             Map.of(t1Uuid, Set.of(0, 1)),
             groupId,
             1,
+            false
+        );
+
+        // Manipulate time so the initializing topic becomes eligible for retry
+        context.groupMetadataManager.replay(
+            new ShareGroupStatePartitionMetadataKey()
+                .setGroupId(groupId),
+            new ShareGroupStatePartitionMetadataValue()
+                .setInitializingTopics(List.of(
+                    new 
ShareGroupStatePartitionMetadataValue.TopicPartitionsInfo()
+                        .setTopicId(t1Uuid)
+                        .setTopicName(t1Name)
+                        .setPartitions(List.of(0, 1))
+                ))
+                .setInitializedTopics(List.of())
+                .setDeletingTopics(List.of())
+        );
+
+        long timeNow = time.milliseconds() + offsetWriteTimeout * 2 + 1;
+        time.setCurrentTimeMs(timeNow);
+        memberId = Uuid.randomUuid();
+        result = context.shareGroupHeartbeat(
+            new ShareGroupHeartbeatRequestData()
+                .setGroupId(groupId)
+                .setMemberId(memberId.toString())
+                .setMemberEpoch(0)
+                .setSubscribedTopicNames(List.of(t1Name)));
+
+        assertTrue(result.records().contains(
+            newShareGroupStatePartitionMetadataRecord(groupId, 
mkShareGroupStateMap(List.of(
+                    mkShareGroupStateMetadataEntry(t1Uuid, t1Name, List.of(0, 
1))
+                )),
+                Map.of(),
+                Map.of()
+            ))
+        );
+
+        verifyShareGroupHeartbeatInitializeRequest(
+            result.response().getValue(),
+            Map.of(t1Uuid, Set.of(0, 1)),
+            groupId,
+            2,
             true
         );
     }
@@ -21586,7 +21634,7 @@ public class GroupMetadataManagerTest {
         );
 
         List<CoordinatorRecord> records = new ArrayList<>();
-        context.groupMetadataManager.addInitializingTopicsRecords(groupId, 
records, Map.of(t1Uuid, Set.of(0, 1)));
+        context.groupMetadataManager.addInitializingTopicsRecords(groupId, 
records, Map.of(t1Uuid, new InitMapValue(t1Name, Set.of(0, 1), 1)));
 
         List<CoordinatorRecord> expectedRecords = List.of(
             CoordinatorRecord.record(
@@ -21633,16 +21681,31 @@ public class GroupMetadataManagerTest {
                 .setEpoch(0)
         );
 
+        context.groupMetadataManager.replay(
+            new ShareGroupStatePartitionMetadataKey()
+                .setGroupId(groupId),
+            new ShareGroupStatePartitionMetadataValue()
+                .setInitializingTopics(List.of(
+                    new 
ShareGroupStatePartitionMetadataValue.TopicPartitionsInfo()
+                        .setTopicId(topicId)
+                        .setTopicName(topicName)
+                        .setPartitions(List.of(0, 1))
+                ))
+                .setInitializedTopics(List.of())
+                .setDeletingTopics(List.of())
+        );
+
         Map<Uuid, Set<Integer>> snapshotMetadataInitializeMap = Map.of(
             topicId,
             Set.of(0, 1)
         );
 
-        Map<Uuid, Map.Entry<String, Set<Integer>>> 
snapshotMetadataInitializeRecordMap = Map.of(
+        Map<Uuid, InitMapValue> snapshotMetadataInitializeRecordMap = Map.of(
             topicId,
-            Map.entry(
+            new InitMapValue(
                 topicName,
-                Set.of(0, 1)
+                Set.of(0, 1),
+                0
             )
         );
 
@@ -21652,6 +21715,8 @@ public class GroupMetadataManagerTest {
 
         assertNull(result.response());
         assertEquals(List.of(record), result.records());
+        // Make sure the timeline map is not modified yet.
+        assertEquals(snapshotMetadataInitializeRecordMap, 
context.groupMetadataManager.shareGroupStatePartitionMetadata().get(groupId).initializingTopics());
     }
 
     @Test
@@ -21689,11 +21754,15 @@ public class GroupMetadataManagerTest {
         Uuid topicId = Uuid.randomUuid();
         int partitions = 1;
         String groupId = "foogrp";
+        MockTime time = new MockTime();
+        int offsetWriteTimeout = 10;
 
         MockPartitionAssignor assignor = new MockPartitionAssignor("simple");
         assignor.prepareGroupAssignment(new GroupAssignment(Map.of()));
         GroupMetadataManagerTestContext context = new 
GroupMetadataManagerTestContext.Builder()
             .withShareGroupAssignor(assignor)
+            .withTime(time)
+            
.withConfig(GroupCoordinatorConfig.OFFSET_COMMIT_TIMEOUT_MS_CONFIG, 
offsetWriteTimeout)
             .build();
 
         // Empty on empty subscription metadata
@@ -21703,9 +21772,11 @@ public class GroupMetadataManagerTest {
         );
 
         // No error on empty initialized metadata (no replay of initialized 
topics)
+        long timeNow = time.milliseconds() + 100;
+        time.setCurrentTimeMs(timeNow);
         assertEquals(
             Map.of(
-                topicId, Set.of(0)
+                topicId, new InitMapValue(topicName, Set.of(0), timeNow)
             ),
             context.groupMetadataManager.subscribedTopicsChangeMap(groupId, 
Map.of(
                 topicName, new TopicMetadata(topicId, topicName, partitions)
@@ -21726,6 +21797,7 @@ public class GroupMetadataManagerTest {
             new ShareGroupMetadataValue()
                 .setEpoch(0)
         );
+
         context.groupMetadataManager.replay(
             new ShareGroupStatePartitionMetadataKey()
                 .setGroupId(groupId),
@@ -21746,10 +21818,12 @@ public class GroupMetadataManagerTest {
         );
 
         // Since t1 is initializing and t2 is initialized due to replay above.
+        timeNow = timeNow + 2 * offsetWriteTimeout + 1;
+        time.setCurrentTimeMs(timeNow);
         assertEquals(
             Map.of(
-                t1Id, Set.of(0, 1),
-                t3Id, Set.of(0, 1, 2)
+                t1Id, new InitMapValue(t1Name, Set.of(0, 1), timeNow),      // 
initializing
+                t3Id, new InitMapValue(t3Name, Set.of(0, 1, 2), timeNow)    // 
initialized
             ),
             context.groupMetadataManager.subscribedTopicsChangeMap(groupId, 
Map.of(
                 t1Name, new TopicMetadata(t1Id, t1Name, 2),
@@ -21818,32 +21892,11 @@ public class GroupMetadataManagerTest {
         result = 
context.groupMetadataManager.uninitializeShareGroupState(groupId, Map.of(t1Id, 
Set.of(0, 1)));
         Set<Integer> partitions = new LinkedHashSet<>(List.of(0, 1, 2));
         assertEquals(
-            List.of(newShareGroupStatePartitionMetadataRecord(groupId, 
Map.of(), Map.of(t2Id, Map.entry(t2Name, partitions)), Map.of())),
+            List.of(newShareGroupStatePartitionMetadataRecord(groupId, 
Map.of(), Map.of(t2Id, new InitMapValue(t2Name, partitions, 1)), Map.of())),
             result.records()
         );
     }
 
-    @Test
-    public void testMergeShareGroupInitMaps() {
-        Map<Uuid, Set<Integer>> m1 = new HashMap<>();
-        Map<Uuid, Set<Integer>> m2 = new HashMap<>();
-
-        Uuid t1 = Uuid.randomUuid();
-        Uuid t2 = Uuid.randomUuid();
-        Uuid t3 = Uuid.randomUuid();
-
-        m1.put(t1, new HashSet<>(List.of(1, 2)));
-        m1.put(t2, new HashSet<>(List.of(3, 4)));
-        m2.put(t1, new HashSet<>(List.of(3, 4)));
-        m2.put(t3, new HashSet<>(List.of(5, 6)));
-
-        Map<Uuid, Set<Integer>> m3 = 
GroupMetadataManager.mergeShareGroupInitMaps(m1, m2);
-        // The arg maps should not be overridden.
-        assertEquals(Map.of(t1, Set.of(1, 2), t2, Set.of(3, 4)), m1);
-        assertEquals(Map.of(t1, Set.of(3, 4), t3, Set.of(5, 6)), m2);
-        assertEquals(Map.of(t1, Set.of(1, 2, 3, 4), t2, Set.of(3, 4), t3, Set.of(5, 6)), m3);
-    }
-
     @Test
     public void testMaybeCleanupShareGroupStateEmptyTopicIds() {
         MockPartitionAssignor assignor = new MockPartitionAssignor("simple");
@@ -21963,6 +22016,47 @@ public class GroupMetadataManagerTest {
         assertEquals(expectedResult, context.groupMetadataManager.maybeCleanupShareGroupState(Set.of(t1Id, t2Id)));
     }
 
+    @Test
+    public void testCombineInitMaps() {
+        // Both empty.
+        Map<Uuid, InitMapValue> m1 = Map.of();
+        Map<Uuid, InitMapValue> m2 = Map.of();
+
+        assertEquals(Map.of(), GroupMetadataManager.combineInitMaps(m1, m2));
+
+        Uuid t1Id = Uuid.randomUuid();
+        String t1Name = "t1";
+        Uuid t2Id = Uuid.randomUuid();
+        String t2Name = "t2";
+
+        // m1 non-empty, m2 empty.
+        m1 = Map.of(t1Id, new InitMapValue(t1Name, Set.of(0), 1));
+        assertEquals(m1, GroupMetadataManager.combineInitMaps(m1, m2));
+
+        // m1 empty, m2 non-empty.
+        m1 = Map.of();
+        m2 = Map.of(t1Id, new InitMapValue(t1Name, Set.of(0), 1));
+        assertEquals(m2, GroupMetadataManager.combineInitMaps(m1, m2));
+
+        // m1 non-empty, m2 non-empty.
+        m1 = Map.of(t1Id, new InitMapValue(t1Name, Set.of(0), 1));
+        m2 = Map.of(t2Id, new InitMapValue(t2Name, Set.of(0), 1));
+        assertEquals(Map.of(
+            t1Id, new InitMapValue(t1Name, Set.of(0), 1),
+            t2Id, new InitMapValue(t2Name, Set.of(0), 1)
+        ), GroupMetadataManager.combineInitMaps(m1, m2));
+
+        // m1 non-empty, m2 non-empty (differ by partition)
+        m1 = Map.of(t1Id, new InitMapValue(t1Name, Set.of(0), 1));
+        m2 = Map.of(t1Id, new InitMapValue(t1Name, Set.of(1), 2));
+        assertEquals(Map.of(t1Id, new InitMapValue(t1Name, Set.of(0, 1), 2)), GroupMetadataManager.combineInitMaps(m1, m2));
+
+        // m1 and m2 exactly same
+        m1 = Map.of(t1Id, new InitMapValue(t1Name, Set.of(0), 1));
+        m2 = Map.of(t1Id, new InitMapValue(t1Name, Set.of(0), 1));
+        assertEquals(Map.of(t1Id, new InitMapValue(t1Name, Set.of(0), 1)), GroupMetadataManager.combineInitMaps(m1, m2));
+    }
+
     private static void checkJoinGroupResponse(
         JoinGroupResponseData expectedResponse,
         JoinGroupResponseData actualResponse,
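
The assertions in testCombineInitMaps above pin down the merge semantics for these maps: keys from both inputs are kept, partition sets for a shared topic id are unioned, and the result carries the later of the two timestamps (per the differ-by-partition case). The following is a sketch consistent with those assertions, not a copy of the real combineInitMaps or of the InitMapValue record that this commit moves into ShareGroup; the class name CombineInitMapsSketch and the record shape are inferred from the test:

    import java.util.HashMap;
    import java.util.HashSet;
    import java.util.Map;
    import java.util.Set;

    import org.apache.kafka.common.Uuid;

    // Sketch only: combines two topicId -> InitMapValue maps without mutating either argument.
    final class CombineInitMapsSketch {
        // Shape inferred from the test: topic name, partition set, replay timestamp.
        record InitMapValue(String name, Set<Integer> partitions, long timestamp) { }

        static Map<Uuid, InitMapValue> combineInitMaps(
            Map<Uuid, InitMapValue> m1,
            Map<Uuid, InitMapValue> m2
        ) {
            Map<Uuid, InitMapValue> combined = new HashMap<>(m1);
            m2.forEach((topicId, value) -> combined.merge(topicId, value, (v1, v2) -> {
                Set<Integer> partitions = new HashSet<>(v1.partitions());
                partitions.addAll(v2.partitions());                        // union of the partition sets
                long timestamp = Math.max(v1.timestamp(), v2.timestamp()); // later timestamp wins
                return new InitMapValue(v1.name(), partitions, timestamp);
            }));
            return combined;
        }
    }

Like the removed testMergeShareGroupInitMaps, which asserted that the argument maps are not overridden, this sketch builds a new map instead of mutating either input.
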
diff --git a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTestContext.java b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTestContext.java
index 112653dceea..dc0670656d0 100644
--- a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTestContext.java
+++ b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTestContext.java
@@ -460,7 +460,7 @@ public class GroupMetadataManagerTestContext {
     }
 
     public static class Builder {
-        private final MockTime time = new MockTime(0, 0, 0);
+        private MockTime time = new MockTime(0, 0, 0);
         private final MockCoordinatorTimer<Void, CoordinatorRecord> timer = new MockCoordinatorTimer<>(time);
         private final MockCoordinatorExecutor<CoordinatorRecord> executor = new MockCoordinatorExecutor<>();
         private final LogContext logContext = new LogContext();
@@ -516,6 +516,11 @@ public class GroupMetadataManagerTestContext {
             return this;
         }
 
+        public Builder withTime(MockTime time) {
+            this.time = time;
+            return this;
+        }
+
         public GroupMetadataManagerTestContext build() {
             if (metadataImage == null) metadataImage = MetadataImage.EMPTY;
             if (groupConfigManager == null) groupConfigManager = createConfigManager();
diff --git a/share-coordinator/src/main/java/org/apache/kafka/coordinator/share/ShareCoordinatorShard.java b/share-coordinator/src/main/java/org/apache/kafka/coordinator/share/ShareCoordinatorShard.java
index 7f6c1c47ff5..b61671ed991 100644
--- a/share-coordinator/src/main/java/org/apache/kafka/coordinator/share/ShareCoordinatorShard.java
+++ b/share-coordinator/src/main/java/org/apache/kafka/coordinator/share/ShareCoordinatorShard.java
@@ -772,11 +772,11 @@ public class ShareCoordinatorShard implements CoordinatorShard<CoordinatorRecord
 
         SharePartitionKey mapKey = SharePartitionKey.getInstance(groupId, topicId, partitionId);
         if (partitionData.leaderEpoch() != -1 && leaderEpochMap.containsKey(mapKey) && leaderEpochMap.get(mapKey) > partitionData.leaderEpoch()) {
-            log.error("Request leader epoch smaller than last recorded.");
+            log.error("Write request leader epoch is smaller than last recorded current: {}, requested: {}.", leaderEpochMap.get(mapKey), partitionData.leaderEpoch());
             return Optional.of(getWriteErrorCoordinatorResult(Errors.FENCED_LEADER_EPOCH, null, topicId, partitionId));
         }
         if (partitionData.stateEpoch() != -1 && stateEpochMap.containsKey(mapKey) && stateEpochMap.get(mapKey) > partitionData.stateEpoch()) {
-            log.error("Request state epoch smaller than last recorded.");
+            log.info("Write request state epoch is smaller than last recorded current: {}, requested: {}.", stateEpochMap.get(mapKey), partitionData.stateEpoch());
             return Optional.of(getWriteErrorCoordinatorResult(Errors.FENCED_STATE_EPOCH, null, topicId, partitionId));
         }
         if (metadataImage == null) {
@@ -814,7 +814,7 @@ public class ShareCoordinatorShard implements CoordinatorShard<CoordinatorRecord
 
         SharePartitionKey mapKey = SharePartitionKey.getInstance(groupId, topicId, partitionId);
         if (leaderEpochMap.containsKey(mapKey) && leaderEpochMap.get(mapKey) > partitionData.leaderEpoch()) {
-            log.error("Request leader epoch id is smaller than last recorded.");
+            log.error("Read request leader epoch is smaller than last recorded current: {}, requested: {}.", leaderEpochMap.get(mapKey), partitionData.leaderEpoch());
             return Optional.of(ReadShareGroupStateResponse.toErrorResponseData(topicId, partitionId, Errors.FENCED_LEADER_EPOCH, Errors.FENCED_LEADER_EPOCH.message()));
         }
 
@@ -915,7 +915,7 @@ public class ShareCoordinatorShard implements CoordinatorShard<CoordinatorRecord
 
         SharePartitionKey key = SharePartitionKey.getInstance(request.groupId(), topicId, partitionId);
         if (partitionData.stateEpoch() != -1 && stateEpochMap.containsKey(key) && stateEpochMap.get(key) > partitionData.stateEpoch()) {
-            log.error("Initialize request state epoch smaller than last recorded.");
+            log.info("Initialize request state epoch is smaller than last recorded current: {}, requested: {}.", stateEpochMap.get(key), partitionData.stateEpoch());
             return Optional.of(getInitializeErrorCoordinatorResult(Errors.FENCED_STATE_EPOCH, Errors.FENCED_STATE_EPOCH.exception(), topicId, partitionId));
         }
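
All three reworded log lines above guard the same fencing pattern: a write, read, or initialize request that carries a leader or state epoch lower than the last recorded value is rejected with the corresponding FENCED error. A condensed, hypothetical illustration of that guard follows; EpochFencingSketch, checkEpoch, lastRecordedEpoch, and requestEpoch are placeholder names, and the real methods build full error responses as shown in the hunks above:

    import java.util.Optional;

    import org.apache.kafka.common.protocol.Errors;

    // Condensed illustration of the epoch fencing checks shown in the diff above.
    final class EpochFencingSketch {
        private EpochFencingSketch() { }

        // Returns the fencing error to respond with, or empty if the request may proceed.
        // A requestEpoch of -1 means the request did not supply an epoch, so no comparison is made.
        static Optional<Errors> checkEpoch(Integer lastRecordedEpoch, int requestEpoch, Errors fencedError) {
            if (requestEpoch != -1 && lastRecordedEpoch != null && lastRecordedEpoch > requestEpoch) {
                return Optional.of(fencedError);   // e.g. Errors.FENCED_LEADER_EPOCH or Errors.FENCED_STATE_EPOCH
            }
            return Optional.empty();
        }
    }

Note that the read-path leader epoch check in the second hunk does not include the -1 guard; the sketch folds the two variants together for brevity.
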
 
