This is an automated email from the ASF dual-hosted git repository.

lucasbru pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 425f0285569 KAFKA-17747: [5/N] Replace subscription metadata with 
metadata hash in stream group (#19802)
425f0285569 is described below

commit 425f0285569c67e6553d33822cb42370a4948010
Author: PoAn Yang <[email protected]>
AuthorDate: Tue Jun 3 19:21:34 2025 +0800

    KAFKA-17747: [5/N] Replace subscription metadata with metadata hash in 
stream group (#19802)
    
    * Use metadata hash to replace subscription metadata.
    * Remove `StreamsGroupPartitionMetadataKey` and
    `StreamsGroupPartitionMetadataValue`.
    * Check whether `configuredTopology` is empty. If it is, call
    `InternalTopicManager.configureTopics` and set the result on the group.
    
    Reviewers: Lucas Brutschy <[email protected]>
    
    ---------
    
    Signed-off-by: PoAn Yang <[email protected]>
---
 .../coordinator/group/GroupCoordinatorShard.java   |   9 -
 .../coordinator/group/GroupMetadataManager.java    |  75 ++---
 .../streams/StreamsCoordinatorRecordHelpers.java   |  54 ----
 .../coordinator/group/streams/StreamsGroup.java    |  81 +++---
 .../group/streams/TargetAssignmentBuilder.java     |  17 +-
 .../coordinator/group/streams/TopicMetadata.java   |   8 -
 .../group/streams/TopologyMetadata.java            |  26 +-
 .../group/streams/topics/ConfiguredTopology.java   |   2 +
 .../topics/EndpointToPartitionsManager.java        |  22 +-
 .../group/streams/topics/InternalTopicManager.java |  57 ++--
 .../message/StreamsGroupPartitionMetadataKey.json  |  27 --
 .../StreamsGroupPartitionMetadataValue.json        |  34 ---
 .../group/GroupCoordinatorShardTest.java           |  56 ----
 .../group/GroupMetadataManagerTest.java            | 305 ++++++++++-----------
 .../group/GroupMetadataManagerTestContext.java     |  23 +-
 .../StreamsCoordinatorRecordHelpersTest.java       |  65 -----
 .../group/streams/StreamsGroupBuilder.java         |  18 +-
 .../group/streams/StreamsGroupTest.java            | 178 +++---------
 .../group/streams/TargetAssignmentBuilderTest.java |  14 +-
 .../group/streams/TopicMetadataTest.java           |  15 -
 .../group/streams/TopologyMetadataTest.java        |  24 +-
 .../streams/topics/ConfiguredTopologyTest.java     |  11 +-
 .../topics/EndpointToPartitionsManagerTest.java    |  23 +-
 .../streams/topics/InternalTopicManagerTest.java   |  23 +-
 24 files changed, 380 insertions(+), 787 deletions(-)

diff --git 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorShard.java
 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorShard.java
index 27d2183b5a0..8016378943c 100644
--- 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorShard.java
+++ 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorShard.java
@@ -109,8 +109,6 @@ import 
org.apache.kafka.coordinator.group.generated.StreamsGroupMemberMetadataKe
 import 
org.apache.kafka.coordinator.group.generated.StreamsGroupMemberMetadataValue;
 import org.apache.kafka.coordinator.group.generated.StreamsGroupMetadataKey;
 import org.apache.kafka.coordinator.group.generated.StreamsGroupMetadataValue;
-import 
org.apache.kafka.coordinator.group.generated.StreamsGroupPartitionMetadataKey;
-import 
org.apache.kafka.coordinator.group.generated.StreamsGroupPartitionMetadataValue;
 import 
org.apache.kafka.coordinator.group.generated.StreamsGroupTargetAssignmentMemberKey;
 import 
org.apache.kafka.coordinator.group.generated.StreamsGroupTargetAssignmentMemberValue;
 import 
org.apache.kafka.coordinator.group.generated.StreamsGroupTargetAssignmentMetadataKey;
@@ -1299,13 +1297,6 @@ public class GroupCoordinatorShard implements 
CoordinatorShard<CoordinatorRecord
                 );
                 break;
 
-            case STREAMS_GROUP_PARTITION_METADATA:
-                groupMetadataManager.replay(
-                    (StreamsGroupPartitionMetadataKey) key,
-                    (StreamsGroupPartitionMetadataValue) 
Utils.messageOrNull(value)
-                );
-                break;
-
             case STREAMS_GROUP_MEMBER_METADATA:
                 groupMetadataManager.replay(
                     (StreamsGroupMemberMetadataKey) key,
diff --git 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
index bb2106d6ce0..9a5fdf2bdab 100644
--- 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
+++ 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
@@ -129,8 +129,6 @@ import 
org.apache.kafka.coordinator.group.generated.StreamsGroupMemberMetadataKe
 import 
org.apache.kafka.coordinator.group.generated.StreamsGroupMemberMetadataValue;
 import org.apache.kafka.coordinator.group.generated.StreamsGroupMetadataKey;
 import org.apache.kafka.coordinator.group.generated.StreamsGroupMetadataValue;
-import 
org.apache.kafka.coordinator.group.generated.StreamsGroupPartitionMetadataKey;
-import 
org.apache.kafka.coordinator.group.generated.StreamsGroupPartitionMetadataValue;
 import 
org.apache.kafka.coordinator.group.generated.StreamsGroupTargetAssignmentMemberKey;
 import 
org.apache.kafka.coordinator.group.generated.StreamsGroupTargetAssignmentMemberValue;
 import 
org.apache.kafka.coordinator.group.generated.StreamsGroupTargetAssignmentMetadataKey;
@@ -260,7 +258,6 @@ import static 
org.apache.kafka.coordinator.group.streams.StreamsCoordinatorRecor
 import static 
org.apache.kafka.coordinator.group.streams.StreamsCoordinatorRecordHelpers.newStreamsGroupEpochRecord;
 import static 
org.apache.kafka.coordinator.group.streams.StreamsCoordinatorRecordHelpers.newStreamsGroupMemberRecord;
 import static 
org.apache.kafka.coordinator.group.streams.StreamsCoordinatorRecordHelpers.newStreamsGroupMemberTombstoneRecord;
-import static 
org.apache.kafka.coordinator.group.streams.StreamsCoordinatorRecordHelpers.newStreamsGroupPartitionMetadataRecord;
 import static 
org.apache.kafka.coordinator.group.streams.StreamsCoordinatorRecordHelpers.newStreamsGroupTargetAssignmentTombstoneRecord;
 import static 
org.apache.kafka.coordinator.group.streams.StreamsCoordinatorRecordHelpers.newStreamsGroupTopologyRecord;
 import static 
org.apache.kafka.coordinator.group.streams.StreamsGroupMember.hasAssignedTasksChanged;
@@ -1894,43 +1891,42 @@ public class GroupMetadataManager {
         StreamsTopology updatedTopology = maybeUpdateTopology(groupId, 
memberId, topology, group, records);
         maybeSetTopologyStaleStatus(group, updatedMember, returnedStatus);
 
-        // 3. Determine the partition metadata and any internal topics if 
needed.
+        // 3. Determine any internal topics if needed.
         ConfiguredTopology updatedConfiguredTopology;
-        Map<String, org.apache.kafka.coordinator.group.streams.TopicMetadata> 
updatedPartitionMetadata;
         boolean reconfigureTopology = group.topology().isEmpty();
-        if (reconfigureTopology || group.hasMetadataExpired(currentTimeMs)) {
+        long metadataHash = group.metadataHash();
+        if (reconfigureTopology || group.configuredTopology().isEmpty() || 
group.hasMetadataExpired(currentTimeMs)) {
 
-            updatedPartitionMetadata = group.computePartitionMetadata(
-                metadataImage.topics(),
+            metadataHash = group.computeMetadataHash(
+                metadataImage,
+                topicHashCache,
                 updatedTopology
             );
 
-            if (!updatedPartitionMetadata.equals(group.partitionMetadata())) {
-                log.info("[GroupId {}][MemberId {}] Computed new partition 
metadata: {}.",
-                    groupId, memberId, updatedPartitionMetadata);
+            if (metadataHash != group.metadataHash()) {
+                log.info("[GroupId {}][MemberId {}] Computed new metadata 
hash: {}.",
+                    groupId, memberId, metadataHash);
                 bumpGroupEpoch = true;
                 reconfigureTopology = true;
-                records.add(newStreamsGroupPartitionMetadataRecord(groupId, 
updatedPartitionMetadata));
-                group.setPartitionMetadata(updatedPartitionMetadata);
             }
 
             if (reconfigureTopology || group.configuredTopology().isEmpty()) {
                 log.info("[GroupId {}][MemberId {}] Configuring the topology 
{}", groupId, memberId, updatedTopology);
-                updatedConfiguredTopology = 
InternalTopicManager.configureTopics(logContext, updatedTopology, 
updatedPartitionMetadata);
+                updatedConfiguredTopology = 
InternalTopicManager.configureTopics(logContext, metadataHash, updatedTopology, 
metadataImage.topics());
+                group.setConfiguredTopology(updatedConfiguredTopology);
             } else {
                 updatedConfiguredTopology = group.configuredTopology().get();
             }
         } else {
             updatedConfiguredTopology = group.configuredTopology().get();
-            updatedPartitionMetadata = group.partitionMetadata();
         }
 
         // Actually bump the group epoch
         int groupEpoch = group.groupEpoch();
         if (bumpGroupEpoch) {
             groupEpoch += 1;
-            records.add(newStreamsGroupEpochRecord(groupId, groupEpoch, 0));
-            log.info("[GroupId {}][MemberId {}] Bumped streams group epoch to 
{}.", groupId, memberId, groupEpoch);
+            records.add(newStreamsGroupEpochRecord(groupId, groupEpoch, 
metadataHash));
+            log.info("[GroupId {}][MemberId {}] Bumped streams group epoch to 
{} with metadata hash {}.", groupId, memberId, groupEpoch, metadataHash);
             metrics.record(STREAMS_GROUP_REBALANCES_SENSOR_NAME);
             group.setMetadataRefreshDeadline(currentTimeMs + 
METADATA_REFRESH_INTERVAL_MS, groupEpoch);
         }
@@ -1946,7 +1942,7 @@ public class GroupMetadataManager {
                 groupEpoch,
                 updatedMember,
                 updatedConfiguredTopology,
-                updatedPartitionMetadata,
+                metadataImage,
                 records
             );
             targetAssignmentEpoch = groupEpoch;
@@ -2111,7 +2107,7 @@ public class GroupMetadataManager {
                 final StreamsGroupHeartbeatResponseData.Endpoint 
responseEndpoint = new StreamsGroupHeartbeatResponseData.Endpoint();
                 responseEndpoint.setHost(endpoint.host());
                 responseEndpoint.setPort(endpoint.port());
-                StreamsGroupHeartbeatResponseData.EndpointToPartitions 
endpointToPartitions = 
EndpointToPartitionsManager.endpointToPartitions(groupMember, responseEndpoint, 
group);
+                StreamsGroupHeartbeatResponseData.EndpointToPartitions 
endpointToPartitions = 
EndpointToPartitionsManager.endpointToPartitions(groupMember, responseEndpoint, 
group, metadataImage);
                 endpointToPartitionsList.add(endpointToPartitions);
             }
         }
@@ -3795,12 +3791,12 @@ public class GroupMetadataManager {
     }
 
     /**
-     * Updates the target assignment according to the updated member and 
subscription metadata.
+     * Updates the target assignment according to the updated member and 
metadata image.
      *
      * @param group                The StreamsGroup.
      * @param groupEpoch           The group epoch.
      * @param updatedMember        The updated member.
-     * @param subscriptionMetadata The subscription metadata.
+     * @param metadataImage        The metadata image.
      * @param records              The list to accumulate any new records.
      * @return The new target assignment.
      */
@@ -3809,7 +3805,7 @@ public class GroupMetadataManager {
         int groupEpoch,
         StreamsGroupMember updatedMember,
         ConfiguredTopology configuredTopology,
-        Map<String, org.apache.kafka.coordinator.group.streams.TopicMetadata> 
subscriptionMetadata,
+        MetadataImage metadataImage,
         List<CoordinatorRecord> records
     ) {
         TaskAssignor assignor = streamsGroupAssignor(group.groupId());
@@ -3825,7 +3821,7 @@ public class GroupMetadataManager {
                 .withMembers(group.members())
                 .withTopology(configuredTopology)
                 .withStaticMembers(group.staticMembers())
-                .withPartitionMetadata(subscriptionMetadata)
+                .withMetadataImage(metadataImage)
                 .withTargetAssignment(group.targetAssignment())
                 .addOrUpdateMember(updatedMember.memberId(), updatedMember);
 
@@ -5282,6 +5278,7 @@ public class GroupMetadataManager {
         if (value != null) {
             StreamsGroup streamsGroup = 
getOrMaybeCreatePersistedStreamsGroup(groupId, true);
             streamsGroup.setGroupEpoch(value.epoch());
+            streamsGroup.setMetadataHash(value.metadataHash());
         } else {
             StreamsGroup streamsGroup;
             try {
@@ -5304,38 +5301,6 @@ public class GroupMetadataManager {
 
     }
 
-    /**
-     * Replays StreamsGroupPartitionMetadataKey/Value to update the hard state 
of
-     * the streams group. It updates the subscription metadata of the streams
-     * group.
-     *
-     * @param key   A StreamsGroupPartitionMetadataKey key.
-     * @param value A StreamsGroupPartitionMetadataValue record.
-     */
-    public void replay(
-        StreamsGroupPartitionMetadataKey key,
-        StreamsGroupPartitionMetadataValue value
-    ) {
-        String groupId = key.groupId();
-        StreamsGroup streamsGroup;
-        try {
-            streamsGroup = getOrMaybeCreatePersistedStreamsGroup(groupId, 
value != null);
-        } catch (GroupIdNotFoundException ex) {
-            // If the group does not exist, we can ignore the tombstone.
-            return;
-        }
-
-        if (value != null) {
-            Map<String, 
org.apache.kafka.coordinator.group.streams.TopicMetadata> partitionMetadata = 
new HashMap<>();
-            value.topics().forEach(topicMetadata -> {
-                partitionMetadata.put(topicMetadata.topicName(), 
org.apache.kafka.coordinator.group.streams.TopicMetadata.fromRecord(topicMetadata));
-            });
-            streamsGroup.setPartitionMetadata(partitionMetadata);
-        } else {
-            streamsGroup.setPartitionMetadata(Map.of());
-        }
-    }
-
     /**
      * Replays ShareGroupMemberMetadataKey/Value to update the hard state of
      * the share group. It updates the subscription part of the member or
diff --git 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/StreamsCoordinatorRecordHelpers.java
 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/StreamsCoordinatorRecordHelpers.java
index ea002d2e130..d54f7273eb0 100644
--- 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/StreamsCoordinatorRecordHelpers.java
+++ 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/StreamsCoordinatorRecordHelpers.java
@@ -24,8 +24,6 @@ import 
org.apache.kafka.coordinator.group.generated.StreamsGroupMemberMetadataKe
 import 
org.apache.kafka.coordinator.group.generated.StreamsGroupMemberMetadataValue;
 import org.apache.kafka.coordinator.group.generated.StreamsGroupMetadataKey;
 import org.apache.kafka.coordinator.group.generated.StreamsGroupMetadataValue;
-import 
org.apache.kafka.coordinator.group.generated.StreamsGroupPartitionMetadataKey;
-import 
org.apache.kafka.coordinator.group.generated.StreamsGroupPartitionMetadataValue;
 import 
org.apache.kafka.coordinator.group.generated.StreamsGroupTargetAssignmentMemberKey;
 import 
org.apache.kafka.coordinator.group.generated.StreamsGroupTargetAssignmentMemberValue;
 import 
org.apache.kafka.coordinator.group.generated.StreamsGroupTargetAssignmentMetadataKey;
@@ -98,58 +96,6 @@ public class StreamsCoordinatorRecordHelpers {
         );
     }
 
-    /**
-     * Creates a StreamsGroupPartitionMetadata record.
-     *
-     * @param groupId              The streams group id.
-     * @param newPartitionMetadata The partition metadata.
-     * @return The record.
-     */
-    public static CoordinatorRecord newStreamsGroupPartitionMetadataRecord(
-        String groupId,
-        Map<String, org.apache.kafka.coordinator.group.streams.TopicMetadata> 
newPartitionMetadata
-    ) {
-        Objects.requireNonNull(groupId, "groupId should not be null here");
-        Objects.requireNonNull(newPartitionMetadata, "newPartitionMetadata 
should not be null here");
-
-        StreamsGroupPartitionMetadataValue value = new 
StreamsGroupPartitionMetadataValue();
-        newPartitionMetadata.forEach((topicName, topicMetadata) -> {
-            value.topics().add(new 
StreamsGroupPartitionMetadataValue.TopicMetadata()
-                .setTopicId(topicMetadata.id())
-                .setTopicName(topicMetadata.name())
-                .setNumPartitions(topicMetadata.numPartitions())
-            );
-        });
-
-        
value.topics().sort(Comparator.comparing(StreamsGroupPartitionMetadataValue.TopicMetadata::topicName));
-
-        return CoordinatorRecord.record(
-            new StreamsGroupPartitionMetadataKey()
-                .setGroupId(groupId),
-            new ApiMessageAndVersion(
-                value,
-                (short) 0
-            )
-        );
-    }
-
-    /**
-     * Creates a StreamsGroupPartitionMetadata tombstone.
-     *
-     * @param groupId The streams group id.
-     * @return The record.
-     */
-    public static CoordinatorRecord 
newStreamsGroupPartitionMetadataTombstoneRecord(
-        String groupId
-    ) {
-        Objects.requireNonNull(groupId, "groupId should not be null here");
-
-        return CoordinatorRecord.tombstone(
-            new StreamsGroupPartitionMetadataKey()
-                .setGroupId(groupId)
-        );
-    }
-
     public static CoordinatorRecord newStreamsGroupEpochRecord(
         String groupId,
         int newGroupEpoch,
diff --git 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/StreamsGroup.java
 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/StreamsGroup.java
index 3a38e1d0a1d..061f816296a 100644
--- 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/StreamsGroup.java
+++ 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/StreamsGroup.java
@@ -29,15 +29,16 @@ import 
org.apache.kafka.coordinator.common.runtime.CoordinatorRecord;
 import org.apache.kafka.coordinator.group.Group;
 import org.apache.kafka.coordinator.group.OffsetExpirationCondition;
 import org.apache.kafka.coordinator.group.OffsetExpirationConditionImpl;
+import org.apache.kafka.coordinator.group.Utils;
 import org.apache.kafka.coordinator.group.metrics.GroupCoordinatorMetricsShard;
 import org.apache.kafka.coordinator.group.streams.topics.ConfiguredSubtopology;
 import org.apache.kafka.coordinator.group.streams.topics.ConfiguredTopology;
-import org.apache.kafka.coordinator.group.streams.topics.InternalTopicManager;
+import org.apache.kafka.image.MetadataImage;
 import org.apache.kafka.image.TopicImage;
-import org.apache.kafka.image.TopicsImage;
 import org.apache.kafka.timeline.SnapshotRegistry;
 import org.apache.kafka.timeline.TimelineHashMap;
 import org.apache.kafka.timeline.TimelineInteger;
+import org.apache.kafka.timeline.TimelineLong;
 import org.apache.kafka.timeline.TimelineObject;
 
 import org.slf4j.Logger;
@@ -152,6 +153,11 @@ public class StreamsGroup implements Group {
      */
     private final TimelineHashMap<String, TopicMetadata> partitionMetadata;
 
+    /**
+     * The metadata hash which is computed based on the all subscribed topics.
+     */
+    protected final TimelineLong metadataHash;
+
     /**
      * The target assignment epoch. An assignment epoch smaller than the group 
epoch means that a new assignment is required. The assignment
      * epoch is updated when a new assignment is installed.
@@ -226,6 +232,7 @@ public class StreamsGroup implements Group {
         this.members = new TimelineHashMap<>(snapshotRegistry, 0);
         this.staticMembers = new TimelineHashMap<>(snapshotRegistry, 0);
         this.partitionMetadata = new TimelineHashMap<>(snapshotRegistry, 0);
+        this.metadataHash = new TimelineLong(snapshotRegistry);
         this.targetAssignmentEpoch = new TimelineInteger(snapshotRegistry);
         this.targetAssignment = new TimelineHashMap<>(snapshotRegistry, 0);
         this.currentActiveTaskToProcessId = new 
TimelineHashMap<>(snapshotRegistry, 0);
@@ -280,7 +287,11 @@ public class StreamsGroup implements Group {
 
     public void setTopology(StreamsTopology topology) {
         this.topology.set(Optional.ofNullable(topology));
-        maybeUpdateConfiguredTopology();
+        maybeUpdateGroupState();
+    }
+
+    public void setConfiguredTopology(ConfiguredTopology configuredTopology) {
+        this.configuredTopology.set(Optional.ofNullable(configuredTopology));
         maybeUpdateGroupState();
     }
 
@@ -582,54 +593,47 @@ public class StreamsGroup implements Group {
     }
 
     /**
-     * @return An immutable map of partition metadata for each topic that are 
inputs for this streams group.
+     * @return The metadata hash.
      */
-    public Map<String, TopicMetadata> partitionMetadata() {
-        return Collections.unmodifiableMap(partitionMetadata);
+    public long metadataHash() {
+        return metadataHash.get();
     }
 
     /**
-     * Updates the partition metadata. This replaces the previous one.
+     * Updates the metadata hash.
      *
-     * @param partitionMetadata The new partition metadata.
+     * @param metadataHash The new metadata hash.
      */
-    public void setPartitionMetadata(
-        Map<String, TopicMetadata> partitionMetadata
-    ) {
-        this.partitionMetadata.clear();
-        this.partitionMetadata.putAll(partitionMetadata);
-        maybeUpdateConfiguredTopology();
-        maybeUpdateGroupState();
+    public void setMetadataHash(long metadataHash) {
+        this.metadataHash.set(metadataHash);
     }
 
     /**
-     * Computes the partition metadata based on the current topology and the 
current topics image.
+     * Computes the metadata hash based on the current topology and the 
current metadata image.
      *
-     * @param topicsImage The current metadata for all available topics.
-     * @param topology    The current metadata for the Streams topology
-     * @return An immutable map of partition metadata for each topic that the 
Streams topology is using (besides non-repartition sink topics)
-     */
-    public Map<String, TopicMetadata> computePartitionMetadata(
-        TopicsImage topicsImage,
+     * @param metadataImage  The current metadata image.
+     * @param topicHashCache The cache for the topic hashes.
+     * @param topology       The current metadata for the Streams topology
+     * @return The metadata hash.
+     */
+    public long computeMetadataHash(
+        MetadataImage metadataImage,
+        Map<String, Long> topicHashCache,
         StreamsTopology topology
     ) {
         Set<String> requiredTopicNames = topology.requiredTopics();
 
-        // Create the topic metadata for each subscribed topic.
-        Map<String, TopicMetadata> newPartitionMetadata = new 
HashMap<>(requiredTopicNames.size());
-
+        Map<String, Long> topicHash = new HashMap<>(requiredTopicNames.size());
         requiredTopicNames.forEach(topicName -> {
-            TopicImage topicImage = topicsImage.getTopic(topicName);
+            TopicImage topicImage = metadataImage.topics().getTopic(topicName);
             if (topicImage != null) {
-                newPartitionMetadata.put(topicName, new TopicMetadata(
-                    topicImage.id(),
-                    topicImage.name(),
-                    topicImage.partitions().size())
+                topicHash.put(
+                    topicName,
+                    topicHashCache.computeIfAbsent(topicName, k -> 
Utils.computeTopicHash(topicName, metadataImage))
                 );
             }
         });
-
-        return Collections.unmodifiableMap(newPartitionMetadata);
+        return Utils.computeGroupHash(topicHash);
     }
 
     /**
@@ -793,7 +797,6 @@ public class StreamsGroup implements Group {
             
records.add(StreamsCoordinatorRecordHelpers.newStreamsGroupMemberTombstoneRecord(groupId(),
 memberId))
         );
 
-        
records.add(StreamsCoordinatorRecordHelpers.newStreamsGroupPartitionMetadataTombstoneRecord(groupId()));
         
records.add(StreamsCoordinatorRecordHelpers.newStreamsGroupEpochTombstoneRecord(groupId()));
         
records.add(StreamsCoordinatorRecordHelpers.newStreamsGroupTopologyRecordTombstone(groupId()));
     }
@@ -855,18 +858,6 @@ public class StreamsGroup implements Group {
         state.set(newState);
     }
 
-    private void maybeUpdateConfiguredTopology() {
-        if (topology.get().isPresent()) {
-            final StreamsTopology streamsTopology = topology.get().get();
-
-            log.info("[GroupId {}] Configuring the topology {}", groupId, 
streamsTopology);
-            
this.configuredTopology.set(Optional.of(InternalTopicManager.configureTopics(logContext,
 streamsTopology, partitionMetadata)));
-
-        } else {
-            configuredTopology.set(Optional.empty());
-        }
-    }
-
     /**
      * Updates the tasks process IDs based on the old and the new member.
      *
diff --git 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/TargetAssignmentBuilder.java
 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/TargetAssignmentBuilder.java
index 4c1adeec839..7f8b504bab9 100644
--- 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/TargetAssignmentBuilder.java
+++ 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/TargetAssignmentBuilder.java
@@ -24,6 +24,7 @@ import 
org.apache.kafka.coordinator.group.streams.assignor.MemberAssignment;
 import org.apache.kafka.coordinator.group.streams.assignor.TaskAssignor;
 import 
org.apache.kafka.coordinator.group.streams.assignor.TaskAssignorException;
 import org.apache.kafka.coordinator.group.streams.topics.ConfiguredTopology;
+import org.apache.kafka.image.MetadataImage;
 
 import java.util.ArrayList;
 import java.util.Collections;
@@ -75,9 +76,9 @@ public class TargetAssignmentBuilder {
     private Map<String, StreamsGroupMember> members = Map.of();
 
     /**
-     * The partition metadata.
+     * The metadata image.
      */
-    private Map<String, 
org.apache.kafka.coordinator.group.streams.TopicMetadata> partitionMetadata = 
Map.of();
+    private MetadataImage metadataImage = MetadataImage.EMPTY;
 
     /**
      * The existing target assignment.
@@ -157,15 +158,15 @@ public class TargetAssignmentBuilder {
     }
 
     /**
-     * Adds the partition metadata to use.
+     * Adds the metadata image to use.
      *
-     * @param partitionMetadata The partition metadata.
+     * @param metadataImage The metadata image.
      * @return This object.
      */
-    public TargetAssignmentBuilder withPartitionMetadata(
-        Map<String, org.apache.kafka.coordinator.group.streams.TopicMetadata> 
partitionMetadata
+    public TargetAssignmentBuilder withMetadataImage(
+        MetadataImage metadataImage
     ) {
-        this.partitionMetadata = partitionMetadata;
+        this.metadataImage = metadataImage;
         return this;
     }
 
@@ -273,7 +274,7 @@ public class TargetAssignmentBuilder {
                     Collections.unmodifiableMap(memberSpecs),
                     assignmentConfigs
                 ),
-                new TopologyMetadata(partitionMetadata, 
topology.subtopologies().get())
+                new TopologyMetadata(metadataImage, 
topology.subtopologies().get())
             );
         } else {
             newGroupAssignment = new GroupAssignment(
diff --git 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/TopicMetadata.java
 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/TopicMetadata.java
index f4fa3dc7aa7..bd07156041f 100644
--- 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/TopicMetadata.java
+++ 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/TopicMetadata.java
@@ -17,7 +17,6 @@
 package org.apache.kafka.coordinator.group.streams;
 
 import org.apache.kafka.common.Uuid;
-import 
org.apache.kafka.coordinator.group.generated.StreamsGroupPartitionMetadataValue;
 
 import java.util.Objects;
 
@@ -46,11 +45,4 @@ public record TopicMetadata(Uuid id, String name, int 
numPartitions) {
             throw new IllegalArgumentException("Number of partitions must be 
positive.");
         }
     }
-
-    public static TopicMetadata 
fromRecord(StreamsGroupPartitionMetadataValue.TopicMetadata record) {
-        return new TopicMetadata(
-            record.topicId(),
-            record.topicName(),
-            record.numPartitions());
-    }
 }
diff --git 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/TopologyMetadata.java
 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/TopologyMetadata.java
index d1119cfe011..0241083233b 100644
--- 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/TopologyMetadata.java
+++ 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/TopologyMetadata.java
@@ -18,10 +18,11 @@ package org.apache.kafka.coordinator.group.streams;
 
 import org.apache.kafka.coordinator.group.streams.assignor.TopologyDescriber;
 import org.apache.kafka.coordinator.group.streams.topics.ConfiguredSubtopology;
+import org.apache.kafka.image.MetadataImage;
+import org.apache.kafka.image.TopicImage;
 
 import java.util.Collections;
 import java.util.List;
-import java.util.Map;
 import java.util.NoSuchElementException;
 import java.util.Objects;
 import java.util.SortedMap;
@@ -31,25 +32,22 @@ import java.util.stream.Stream;
  * The topology metadata class is used by the {@link 
org.apache.kafka.coordinator.group.streams.assignor.TaskAssignor} to get topic 
and
  * partition metadata for the topology that the streams group using.
  *
- * @param topicMetadata  The topic Ids mapped to their corresponding {@link 
TopicMetadata} object, which contains topic and partition
- *                       metadata.
+ * @param metadataImage  The metadata image
  * @param subtopologyMap The configured subtopologies
  */
-public record TopologyMetadata(Map<String, TopicMetadata> topicMetadata, 
SortedMap<String, ConfiguredSubtopology> subtopologyMap) implements 
TopologyDescriber {
+public record TopologyMetadata(MetadataImage metadataImage, SortedMap<String, 
ConfiguredSubtopology> subtopologyMap) implements TopologyDescriber {
 
     public TopologyMetadata {
-        topicMetadata = 
Objects.requireNonNull(Collections.unmodifiableMap(topicMetadata));
+        metadataImage = Objects.requireNonNull(metadataImage);
         subtopologyMap = 
Objects.requireNonNull(Collections.unmodifiableSortedMap(subtopologyMap));
     }
 
     /**
-     * Map of topic names to topic metadata.
-     *
-     * @return The map of topic Ids to topic metadata.
+     * @return The metadata image in topology metadata.
      */
     @Override
-    public Map<String, TopicMetadata> topicMetadata() {
-        return this.topicMetadata;
+    public MetadataImage metadataImage() {
+        return this.metadataImage;
     }
 
     /**
@@ -90,7 +88,13 @@ public record TopologyMetadata(Map<String, TopicMetadata> 
topicMetadata, SortedM
         return Stream.concat(
             subtopology.sourceTopics().stream(),
             subtopology.repartitionSourceTopics().keySet().stream()
-        ).map(topic -> 
this.topicMetadata.get(topic).numPartitions()).max(Integer::compareTo).orElseThrow(
+        ).map(topic -> {
+            TopicImage topicImage = metadataImage.topics().getTopic(topic);
+            if (topicImage == null) {
+                throw new IllegalStateException("Topic " + topic + " not found 
in metadata image");
+            }
+            return topicImage.partitions().size();
+        }).max(Integer::compareTo).orElseThrow(
             () -> new IllegalStateException("Subtopology does not contain any 
source topics")
         );
     }
diff --git 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/topics/ConfiguredTopology.java
 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/topics/ConfiguredTopology.java
index 93e03050b4d..85e3ba53db4 100644
--- 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/topics/ConfiguredTopology.java
+++ 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/topics/ConfiguredTopology.java
@@ -31,6 +31,7 @@ import java.util.SortedMap;
  *
  * @param topologyEpoch               The epoch of the topology. Same as the 
topology epoch in the heartbeat request that last initialized
  *                                    the topology.
+ * @param metadataHash                The metadata hash of the group.
  * @param subtopologies               Contains the subtopologies that have 
been configured. This can be used by the task assignors, since it
  *                                    specifies the number of tasks available 
for every subtopology. Undefined if topology configuration
  *                                    failed.
@@ -41,6 +42,7 @@ import java.util.SortedMap;
  *                                    reported back to the client.
  */
 public record ConfiguredTopology(int topologyEpoch,
+                                 long metadataHash,
                                  Optional<SortedMap<String, 
ConfiguredSubtopology>> subtopologies,
                                  Map<String, CreatableTopic> 
internalTopicsToBeCreated,
                                  Optional<TopicConfigurationException> 
topicConfigurationException) {
diff --git 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/topics/EndpointToPartitionsManager.java
 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/topics/EndpointToPartitionsManager.java
index ea3eca20935..09876efd804 100644
--- 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/topics/EndpointToPartitionsManager.java
+++ 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/topics/EndpointToPartitionsManager.java
@@ -20,7 +20,8 @@ package org.apache.kafka.coordinator.group.streams.topics;
 import org.apache.kafka.common.message.StreamsGroupHeartbeatResponseData;
 import org.apache.kafka.coordinator.group.streams.StreamsGroup;
 import org.apache.kafka.coordinator.group.streams.StreamsGroupMember;
-import org.apache.kafka.coordinator.group.streams.TopicMetadata;
+import org.apache.kafka.image.MetadataImage;
+import org.apache.kafka.image.TopicImage;
 
 import java.util.ArrayList;
 import java.util.Collections;
@@ -36,14 +37,15 @@ public class EndpointToPartitionsManager {
 
     public static StreamsGroupHeartbeatResponseData.EndpointToPartitions 
endpointToPartitions(final StreamsGroupMember streamsGroupMember,
                                                                                
               final StreamsGroupHeartbeatResponseData.Endpoint 
responseEndpoint,
-                                                                               
               final StreamsGroup streamsGroup) {
+                                                                               
               final StreamsGroup streamsGroup,
+                                                                               
               final MetadataImage metadataImage) {
         StreamsGroupHeartbeatResponseData.EndpointToPartitions 
endpointToPartitions = new 
StreamsGroupHeartbeatResponseData.EndpointToPartitions();
         Map<String, Set<Integer>> activeTasks = 
streamsGroupMember.assignedTasks().activeTasks();
         Map<String, Set<Integer>> standbyTasks = 
streamsGroupMember.assignedTasks().standbyTasks();
         endpointToPartitions.setUserEndpoint(responseEndpoint);
         Map<String, ConfiguredSubtopology> configuredSubtopologies = 
streamsGroup.configuredTopology().flatMap(ConfiguredTopology::subtopologies).get();
-        List<StreamsGroupHeartbeatResponseData.TopicPartition> 
activeTopicPartitions = topicPartitions(activeTasks, configuredSubtopologies, 
streamsGroup.partitionMetadata());
-        List<StreamsGroupHeartbeatResponseData.TopicPartition> 
standbyTopicPartitions = topicPartitions(standbyTasks, configuredSubtopologies, 
streamsGroup.partitionMetadata());
+        List<StreamsGroupHeartbeatResponseData.TopicPartition> 
activeTopicPartitions = topicPartitions(activeTasks, configuredSubtopologies, 
metadataImage);
+        List<StreamsGroupHeartbeatResponseData.TopicPartition> 
standbyTopicPartitions = topicPartitions(standbyTasks, configuredSubtopologies, 
metadataImage);
         endpointToPartitions.setActivePartitions(activeTopicPartitions);
         endpointToPartitions.setStandbyPartitions(standbyTopicPartitions);
         return endpointToPartitions;
@@ -51,7 +53,7 @@ public class EndpointToPartitionsManager {
 
     private static List<StreamsGroupHeartbeatResponseData.TopicPartition> 
topicPartitions(final Map<String, Set<Integer>> tasks,
                                                                                
           final Map<String, ConfiguredSubtopology> configuredSubtopologies,
-                                                                               
           final Map<String, TopicMetadata> groupTopicMetadata) {
+                                                                               
           final MetadataImage metadataImage) {
         List<StreamsGroupHeartbeatResponseData.TopicPartition> 
topicPartitionsForTasks = new ArrayList<>();
         for (Map.Entry<String, Set<Integer>> taskEntry : tasks.entrySet()) {
             String subtopologyId = taskEntry.getKey();
@@ -60,7 +62,7 @@ public class EndpointToPartitionsManager {
             Set<String> repartitionSourceTopics = 
configuredSubtopology.repartitionSourceTopics().keySet();
             Set<String> allSourceTopic = new HashSet<>(sourceTopics);
             allSourceTopic.addAll(repartitionSourceTopics);
-            List<StreamsGroupHeartbeatResponseData.TopicPartition> 
topicPartitionList = topicPartitionListForTask(taskEntry.getValue(), 
allSourceTopic, groupTopicMetadata);
+            List<StreamsGroupHeartbeatResponseData.TopicPartition> 
topicPartitionList = topicPartitionListForTask(taskEntry.getValue(), 
allSourceTopic, metadataImage);
             topicPartitionsForTasks.addAll(topicPartitionList);
         }
         return topicPartitionsForTasks;
@@ -68,9 +70,13 @@ public class EndpointToPartitionsManager {
 
     private static List<StreamsGroupHeartbeatResponseData.TopicPartition> 
topicPartitionListForTask(final Set<Integer> taskSet,
                                                                                
                     final Set<String> topicNames,
-                                                                               
                     final Map<String, TopicMetadata> groupTopicMetadata) {
+                                                                               
                     final MetadataImage metadataImage) {
         return topicNames.stream().map(topic -> {
-            int numPartitionsForTopic = 
groupTopicMetadata.get(topic).numPartitions();
+            TopicImage topicImage = metadataImage.topics().getTopic(topic);
+            if (topicImage == null) {
+                throw new IllegalStateException("Topic " + topic + " not found 
in metadata image");
+            }
+            int numPartitionsForTopic = topicImage.partitions().size();
             StreamsGroupHeartbeatResponseData.TopicPartition tp = new 
StreamsGroupHeartbeatResponseData.TopicPartition();
             tp.setTopic(topic);
             List<Integer> tpPartitions = new ArrayList<>(taskSet);
diff --git 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/topics/InternalTopicManager.java
 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/topics/InternalTopicManager.java
index 1d14d9a8477..490289c2c85 100644
--- 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/topics/InternalTopicManager.java
+++ 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/topics/InternalTopicManager.java
@@ -22,7 +22,8 @@ import 
org.apache.kafka.common.message.CreateTopicsRequestData.CreatableTopicCon
 import org.apache.kafka.common.utils.LogContext;
 import org.apache.kafka.coordinator.group.generated.StreamsGroupTopologyValue;
 import org.apache.kafka.coordinator.group.streams.StreamsTopology;
-import org.apache.kafka.coordinator.group.streams.TopicMetadata;
+import org.apache.kafka.image.TopicImage;
+import org.apache.kafka.image.TopicsImage;
 
 import org.slf4j.Logger;
 
@@ -46,17 +47,19 @@ import java.util.stream.Stream;
 public class InternalTopicManager {
 
     /**
-     * Configures the internal topics for the given topology. Given a topology 
and the topic metadata, this method determines the number of
+     * Configures the internal topics for the given topology. Given a topology 
and the topics image, this method determines the number of
      * partitions for all internal topics and returns a {@link 
ConfiguredTopology} object.
      *
-     * @param logContext    The log context.
-     * @param topology      The topology.
-     * @param topicMetadata The topic metadata.
+     * @param logContext   The log context.
+     * @param metadataHash The metadata hash of the group.
+     * @param topology     The topology.
+     * @param topicsImage  The topics image.
      * @return The configured topology.
      */
     public static ConfiguredTopology configureTopics(LogContext logContext,
+                                                     long metadataHash,
                                                      StreamsTopology topology,
-                                                     Map<String, 
TopicMetadata> topicMetadata) {
+                                                     TopicsImage topicsImage) {
         final Logger log = logContext.logger(InternalTopicManager.class);
         final Collection<StreamsGroupTopologyValue.Subtopology> subtopologies 
= topology.subtopologies().values();
 
@@ -70,10 +73,10 @@ public class InternalTopicManager {
         try {
             Optional<TopicConfigurationException> topicConfigurationException 
= Optional.empty();
 
-            throwOnMissingSourceTopics(topology, topicMetadata);
+            throwOnMissingSourceTopics(topology, topicsImage);
 
             Map<String, Integer> decidedPartitionCountsForInternalTopics =
-                decidePartitionCounts(logContext, topology, topicMetadata, 
copartitionGroupsBySubtopology, log);
+                decidePartitionCounts(logContext, topology, topicsImage, 
copartitionGroupsBySubtopology, log);
 
             final SortedMap<String, ConfiguredSubtopology> 
configuredSubtopologies =
                 subtopologies.stream()
@@ -86,7 +89,7 @@ public class InternalTopicManager {
                         TreeMap::new
                     ));
 
-            Map<String, CreatableTopic> internalTopicsToCreate = 
missingInternalTopics(configuredSubtopologies, topicMetadata);
+            Map<String, CreatableTopic> internalTopicsToCreate = 
missingInternalTopics(configuredSubtopologies, topology, topicsImage);
             if (!internalTopicsToCreate.isEmpty()) {
                 topicConfigurationException = 
Optional.of(TopicConfigurationException.missingInternalTopics(
                     "Internal topics are missing: " + 
internalTopicsToCreate.keySet()
@@ -99,6 +102,7 @@ public class InternalTopicManager {
 
             return new ConfiguredTopology(
                 topology.topologyEpoch(),
+                metadataHash,
                 Optional.of(configuredSubtopologies),
                 internalTopicsToCreate,
                 topicConfigurationException
@@ -109,6 +113,7 @@ public class InternalTopicManager {
                 topology.topologyEpoch(), e.toString());
             return new ConfiguredTopology(
                 topology.topologyEpoch(),
+                metadataHash,
                 Optional.empty(),
                 Map.of(),
                 Optional.of(e)
@@ -117,11 +122,11 @@ public class InternalTopicManager {
     }
 
     private static void throwOnMissingSourceTopics(final StreamsTopology 
topology,
-                                                   final Map<String, 
TopicMetadata> topicMetadata) {
+                                                   final TopicsImage 
topicsImage) {
         TreeSet<String> sortedMissingTopics = new TreeSet<>();
         for (StreamsGroupTopologyValue.Subtopology subtopology : 
topology.subtopologies().values()) {
             for (String sourceTopic : subtopology.sourceTopics()) {
-                if (!topicMetadata.containsKey(sourceTopic)) {
+                if (topicsImage.getTopic(sourceTopic) == null) {
                     sortedMissingTopics.add(sourceTopic);
                 }
             }
@@ -134,12 +139,12 @@ public class InternalTopicManager {
 
     private static Map<String, Integer> decidePartitionCounts(final LogContext 
logContext,
                                                               final 
StreamsTopology topology,
-                                                              final 
Map<String, TopicMetadata> topicMetadata,
+                                                              final 
TopicsImage topicsImage,
                                                               final 
Map<String, Collection<Set<String>>> copartitionGroupsBySubtopology,
                                                               final Logger 
log) {
         final Map<String, Integer> decidedPartitionCountsForInternalTopics = 
new HashMap<>();
         final Function<String, OptionalInt> topicPartitionCountProvider =
-            topic -> getPartitionCount(topicMetadata, topic, 
decidedPartitionCountsForInternalTopics);
+            topic -> getPartitionCount(topicsImage, topic, 
decidedPartitionCountsForInternalTopics);
         final RepartitionTopics repartitionTopics = new RepartitionTopics(
             logContext,
             topology.subtopologies().values(),
@@ -190,7 +195,8 @@ public class InternalTopicManager {
     }
 
     private static Map<String, CreatableTopic> 
missingInternalTopics(Map<String, ConfiguredSubtopology> subtopologyMap,
-                                                                     
Map<String, TopicMetadata> topicMetadata) {
+                                                                     
StreamsTopology topology,
+                                                                     
TopicsImage topicsImage) {
 
         final Map<String, CreatableTopic> topicsToCreate = new HashMap<>();
         for (ConfiguredSubtopology subtopology : subtopologyMap.values()) {
@@ -199,31 +205,34 @@ public class InternalTopicManager {
             subtopology.stateChangelogTopics().values()
                 .forEach(x -> topicsToCreate.put(x.name(), 
toCreatableTopic(x)));
         }
-        for (Map.Entry<String, TopicMetadata> topic : 
topicMetadata.entrySet()) {
-            final TopicMetadata existingTopic = topic.getValue();
-            final CreatableTopic expectedTopic = 
topicsToCreate.remove(topic.getKey());
+        for (String topic : topology.requiredTopics()) {
+            TopicImage topicImage = topicsImage.getTopic(topic);
+            if (topicImage == null) {
+                continue;
+            }
+            final CreatableTopic expectedTopic = topicsToCreate.remove(topic);
             if (expectedTopic != null) {
-                if (existingTopic.numPartitions() != 
expectedTopic.numPartitions()) {
-                    throw 
TopicConfigurationException.incorrectlyPartitionedTopics("Existing topic " + 
topic.getKey() + " has different"
-                        + " number of partitions: expected " + 
expectedTopic.numPartitions() + ", found " + existingTopic.numPartitions());
+                if (topicImage.partitions().size() != 
expectedTopic.numPartitions()) {
+                    throw 
TopicConfigurationException.incorrectlyPartitionedTopics("Existing topic " + 
topic + " has different"
+                        + " number of partitions: expected " + 
expectedTopic.numPartitions() + ", found " + topicImage.partitions().size());
                 }
             }
         }
         return topicsToCreate;
     }
 
-    private static OptionalInt getPartitionCount(Map<String, TopicMetadata> 
topicMetadata,
+    private static OptionalInt getPartitionCount(TopicsImage topicsImage,
                                                  String topic,
                                                  Map<String, Integer> 
decidedPartitionCountsForInternalTopics) {
-        final TopicMetadata metadata = topicMetadata.get(topic);
-        if (metadata == null) {
+        final TopicImage topicImage = topicsImage.getTopic(topic);
+        if (topicImage == null) {
             if (decidedPartitionCountsForInternalTopics.containsKey(topic)) {
                 return 
OptionalInt.of(decidedPartitionCountsForInternalTopics.get(topic));
             } else {
                 return OptionalInt.empty();
             }
         } else {
-            return OptionalInt.of(metadata.numPartitions());
+            return OptionalInt.of(topicImage.partitions().size());
         }
     }
 
diff --git 
a/group-coordinator/src/main/resources/common/message/StreamsGroupPartitionMetadataKey.json
 
b/group-coordinator/src/main/resources/common/message/StreamsGroupPartitionMetadataKey.json
deleted file mode 100644
index cb82e930a09..00000000000
--- 
a/group-coordinator/src/main/resources/common/message/StreamsGroupPartitionMetadataKey.json
+++ /dev/null
@@ -1,27 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one or more
-// contributor license agreements.  See the NOTICE file distributed with
-// this work for additional information regarding copyright ownership.
-// The ASF licenses this file to You under the Apache License, Version 2.0
-// (the "License"); you may not use this file except in compliance with
-// the License.  You may obtain a copy of the License at
-//
-//    http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
-
-// The streams rebalance protocol is in development. This schema is subject to 
non-backwards-compatible changes.
-{
-  "apiKey": 18,
-  "type": "coordinator-key",
-  "name": "StreamsGroupPartitionMetadataKey",
-  "validVersions": "0",
-  "flexibleVersions": "none",
-  "fields": [
-    { "name": "GroupId", "type": "string", "versions": "0",
-      "about": "The group ID." }
-  ]
-}
diff --git 
a/group-coordinator/src/main/resources/common/message/StreamsGroupPartitionMetadataValue.json
 
b/group-coordinator/src/main/resources/common/message/StreamsGroupPartitionMetadataValue.json
deleted file mode 100644
index f9be55b9e42..00000000000
--- 
a/group-coordinator/src/main/resources/common/message/StreamsGroupPartitionMetadataValue.json
+++ /dev/null
@@ -1,34 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one or more
-// contributor license agreements.  See the NOTICE file distributed with
-// this work for additional information regarding copyright ownership.
-// The ASF licenses this file to You under the Apache License, Version 2.0
-// (the "License"); you may not use this file except in compliance with
-// the License.  You may obtain a copy of the License at
-//
-//    http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
-
-// The streams rebalance protocol is in development. This schema is subject to 
non-backwards-compatible changes.
-{
-  "apiKey": 18,
-  "type": "coordinator-value",
-  "name": "StreamsGroupPartitionMetadataValue",
-  "validVersions": "0",
-  "flexibleVersions": "0+",
-  "fields": [
-    { "name": "Topics", "versions": "0+", "type": "[]TopicMetadata",
-      "about": "The list of topic metadata.", "fields": [
-      { "name": "TopicId", "versions": "0+", "type": "uuid",
-        "about": "The topic ID." },
-      { "name": "TopicName", "versions": "0+", "type": "string",
-        "about": "The topic name." },
-      { "name": "NumPartitions", "versions": "0+", "type": "int32",
-        "about": "The number of partitions of the topic." }
-    ]}
-  ]
-}
diff --git 
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorShardTest.java
 
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorShardTest.java
index a90ffd73a04..d58c6b6b1ac 100644
--- 
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorShardTest.java
+++ 
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorShardTest.java
@@ -77,8 +77,6 @@ import 
org.apache.kafka.coordinator.group.generated.StreamsGroupMemberMetadataKe
 import 
org.apache.kafka.coordinator.group.generated.StreamsGroupMemberMetadataValue;
 import org.apache.kafka.coordinator.group.generated.StreamsGroupMetadataKey;
 import org.apache.kafka.coordinator.group.generated.StreamsGroupMetadataValue;
-import 
org.apache.kafka.coordinator.group.generated.StreamsGroupPartitionMetadataKey;
-import 
org.apache.kafka.coordinator.group.generated.StreamsGroupPartitionMetadataValue;
 import 
org.apache.kafka.coordinator.group.generated.StreamsGroupTargetAssignmentMemberKey;
 import 
org.apache.kafka.coordinator.group.generated.StreamsGroupTargetAssignmentMemberValue;
 import 
org.apache.kafka.coordinator.group.generated.StreamsGroupTargetAssignmentMetadataKey;
@@ -1006,60 +1004,6 @@ public class GroupCoordinatorShardTest {
         verify(groupMetadataManager).replay(key, null);
     }
 
-    @Test
-    public void testReplayStreamsGroupPartitionMetadata() {
-        GroupMetadataManager groupMetadataManager = 
mock(GroupMetadataManager.class);
-        OffsetMetadataManager offsetMetadataManager = 
mock(OffsetMetadataManager.class);
-        CoordinatorMetrics coordinatorMetrics = mock(CoordinatorMetrics.class);
-        CoordinatorMetricsShard metricsShard = 
mock(CoordinatorMetricsShard.class);
-        GroupCoordinatorShard coordinator = new GroupCoordinatorShard(
-            new LogContext(),
-            groupMetadataManager,
-            offsetMetadataManager,
-            Time.SYSTEM,
-            new MockCoordinatorTimer<>(Time.SYSTEM),
-            mock(GroupCoordinatorConfig.class),
-            coordinatorMetrics,
-            metricsShard
-        );
-
-        StreamsGroupPartitionMetadataKey key = new 
StreamsGroupPartitionMetadataKey();
-        StreamsGroupPartitionMetadataValue value = new 
StreamsGroupPartitionMetadataValue();
-
-        coordinator.replay(0L, RecordBatch.NO_PRODUCER_ID, 
RecordBatch.NO_PRODUCER_EPOCH, CoordinatorRecord.record(
-            key,
-            new ApiMessageAndVersion(value, (short) 0)
-        ));
-
-        verify(groupMetadataManager).replay(key, value);
-    }
-
-    @Test
-    public void testReplayStreamsGroupPartitionMetadataWithNullValue() {
-        GroupMetadataManager groupMetadataManager = 
mock(GroupMetadataManager.class);
-        OffsetMetadataManager offsetMetadataManager = 
mock(OffsetMetadataManager.class);
-        CoordinatorMetrics coordinatorMetrics = mock(CoordinatorMetrics.class);
-        CoordinatorMetricsShard metricsShard = 
mock(CoordinatorMetricsShard.class);
-        GroupCoordinatorShard coordinator = new GroupCoordinatorShard(
-            new LogContext(),
-            groupMetadataManager,
-            offsetMetadataManager,
-            Time.SYSTEM,
-            new MockCoordinatorTimer<>(Time.SYSTEM),
-            mock(GroupCoordinatorConfig.class),
-            coordinatorMetrics,
-            metricsShard
-        );
-
-        StreamsGroupPartitionMetadataKey key = new 
StreamsGroupPartitionMetadataKey();
-
-        coordinator.replay(0L, RecordBatch.NO_PRODUCER_ID, 
RecordBatch.NO_PRODUCER_EPOCH, CoordinatorRecord.tombstone(
-            key
-        ));
-
-        verify(groupMetadataManager).replay(key, null);
-    }
-
     @Test
     public void testReplayStreamsGroupMemberMetadata() {
         GroupMetadataManager groupMetadataManager = 
mock(GroupMetadataManager.class);
diff --git 
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java
 
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java
index fc95e78e9eb..55cc5f7e48a 100644
--- 
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java
+++ 
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java
@@ -125,6 +125,7 @@ import 
org.apache.kafka.coordinator.group.streams.TaskAssignmentTestUtil.TaskRol
 import org.apache.kafka.coordinator.group.streams.TasksTuple;
 import org.apache.kafka.coordinator.group.streams.assignor.TaskAssignor;
 import 
org.apache.kafka.coordinator.group.streams.assignor.TaskAssignorException;
+import org.apache.kafka.coordinator.group.streams.topics.InternalTopicManager;
 import org.apache.kafka.image.MetadataDelta;
 import org.apache.kafka.image.MetadataImage;
 import org.apache.kafka.image.MetadataProvenance;
@@ -16030,7 +16031,6 @@ public class GroupMetadataManagerTest {
                     .build())
                 .withTargetAssignmentEpoch(10)
                 .withTopology(StreamsTopology.fromHeartbeatRequest(topology))
-                .withPartitionMetadata(Map.of())
             )
             .build();
 
@@ -16066,12 +16066,17 @@ public class GroupMetadataManagerTest {
         ));
 
         MockTaskAssignor assignor = new MockTaskAssignor("sticky");
+        MetadataImage metadataImage = new MetadataImageBuilder()
+            .addTopic(fooTopicId, fooTopicName, 6)
+            .addTopic(barTopicId, barTopicName, 3)
+            .build();
+        long groupMetadataHash = computeGroupHash(Map.of(
+            fooTopicName, computeTopicHash(fooTopicName, metadataImage),
+            barTopicName, computeTopicHash(barTopicName, metadataImage)
+        ));
         GroupMetadataManagerTestContext context = new 
GroupMetadataManagerTestContext.Builder()
             .withStreamsGroupTaskAssignors(List.of(assignor))
-            .withMetadataImage(new MetadataImageBuilder()
-                .addTopic(fooTopicId, fooTopicName, 6)
-                .addTopic(barTopicId, barTopicName, 3)
-                .build())
+            .withMetadataImage(metadataImage)
             .build();
 
         assignor.prepareGroupAssignment(Map.of(memberId, 
TaskAssignmentTestUtil.mkTasksTuple(TaskRole.ACTIVE,
@@ -16127,11 +16132,7 @@ public class GroupMetadataManagerTest {
         List<CoordinatorRecord> expectedRecords = List.of(
             
StreamsCoordinatorRecordHelpers.newStreamsGroupMemberRecord(groupId, 
expectedMember),
             
StreamsCoordinatorRecordHelpers.newStreamsGroupTopologyRecord(groupId, 
topology),
-            
StreamsCoordinatorRecordHelpers.newStreamsGroupPartitionMetadataRecord(groupId, 
Map.of(
-                fooTopicName, new 
org.apache.kafka.coordinator.group.streams.TopicMetadata(fooTopicId, 
fooTopicName, 6),
-                barTopicName, new 
org.apache.kafka.coordinator.group.streams.TopicMetadata(barTopicId, 
barTopicName, 3)
-            )),
-            
StreamsCoordinatorRecordHelpers.newStreamsGroupEpochRecord(groupId, 1, 0),
+            
StreamsCoordinatorRecordHelpers.newStreamsGroupEpochRecord(groupId, 1, 
groupMetadataHash),
             
StreamsCoordinatorRecordHelpers.newStreamsGroupTargetAssignmentRecord(groupId, 
memberId,
                 TaskAssignmentTestUtil.mkTasksTuple(TaskRole.ACTIVE,
                     TaskAssignmentTestUtil.mkTasks(subtopology1, 0, 1, 2, 3, 
4, 5),
@@ -16158,12 +16159,14 @@ public class GroupMetadataManagerTest {
             new 
Subtopology().setSubtopologyId(subtopology2).setSourceTopics(List.of(barTopicName))
         ));
 
+        MetadataImage metadataImage = new MetadataImageBuilder()
+            .addTopic(fooTopicId, fooTopicName, 6)
+            .build();
+
         MockTaskAssignor assignor = new MockTaskAssignor("sticky");
         GroupMetadataManagerTestContext context = new 
GroupMetadataManagerTestContext.Builder()
             .withStreamsGroupTaskAssignors(List.of(assignor))
-            .withMetadataImage(new MetadataImageBuilder()
-                .addTopic(fooTopicId, fooTopicName, 6)
-                .build())
+            .withMetadataImage(metadataImage)
             .build();
 
         // Member joins the streams group.
@@ -16210,12 +16213,9 @@ public class GroupMetadataManagerTest {
         List<CoordinatorRecord> expectedRecords = List.of(
             
StreamsCoordinatorRecordHelpers.newStreamsGroupMemberRecord(groupId, 
expectedMember),
             
StreamsCoordinatorRecordHelpers.newStreamsGroupTopologyRecord(groupId, 
topology),
-            
StreamsCoordinatorRecordHelpers.newStreamsGroupPartitionMetadataRecord(groupId,
-                Map.of(
-                    fooTopicName, new 
org.apache.kafka.coordinator.group.streams.TopicMetadata(fooTopicId, 
fooTopicName, 6)
-                )
-            ),
-            
StreamsCoordinatorRecordHelpers.newStreamsGroupEpochRecord(groupId, 1, 0),
+            
StreamsCoordinatorRecordHelpers.newStreamsGroupEpochRecord(groupId, 1, 
computeGroupHash(Map.of(
+                fooTopicName, computeTopicHash(fooTopicName, metadataImage)
+            ))),
             
StreamsCoordinatorRecordHelpers.newStreamsGroupTargetAssignmentRecord(groupId, 
memberId, TasksTuple.EMPTY),
             
StreamsCoordinatorRecordHelpers.newStreamsGroupTargetAssignmentEpochRecord(groupId,
 1),
             
StreamsCoordinatorRecordHelpers.newStreamsGroupCurrentAssignmentRecord(groupId, 
expectedMember)
@@ -16240,12 +16240,14 @@ public class GroupMetadataManagerTest {
             )
         );
 
+        MetadataImage metadataImage = new MetadataImageBuilder()
+            .addTopic(fooTopicId, fooTopicName, 6)
+            .build();
+
         MockTaskAssignor assignor = new MockTaskAssignor("sticky");
         GroupMetadataManagerTestContext context = new 
GroupMetadataManagerTestContext.Builder()
             .withStreamsGroupTaskAssignors(List.of(assignor))
-            .withMetadataImage(new MetadataImageBuilder()
-                .addTopic(fooTopicId, fooTopicName, 6)
-                .build())
+            .withMetadataImage(metadataImage)
             .build();
 
         // Member joins the streams group.
@@ -16297,10 +16299,9 @@ public class GroupMetadataManagerTest {
         List<CoordinatorRecord> expectedRecords = List.of(
             
StreamsCoordinatorRecordHelpers.newStreamsGroupMemberRecord(groupId, 
expectedMember),
             
StreamsCoordinatorRecordHelpers.newStreamsGroupTopologyRecord(groupId, 
topology),
-            
StreamsCoordinatorRecordHelpers.newStreamsGroupPartitionMetadataRecord(groupId, 
Map.of(
-                fooTopicName, new 
org.apache.kafka.coordinator.group.streams.TopicMetadata(fooTopicId, 
fooTopicName, 6)
-            )),
-            
StreamsCoordinatorRecordHelpers.newStreamsGroupEpochRecord(groupId, 1, 0),
+            
StreamsCoordinatorRecordHelpers.newStreamsGroupEpochRecord(groupId, 1, 
computeGroupHash(Map.of(
+                fooTopicName, computeTopicHash(fooTopicName, metadataImage)
+            ))),
             
StreamsCoordinatorRecordHelpers.newStreamsGroupTargetAssignmentRecord(groupId, 
memberId, TasksTuple.EMPTY),
             
StreamsCoordinatorRecordHelpers.newStreamsGroupTargetAssignmentEpochRecord(groupId,
 1),
             
StreamsCoordinatorRecordHelpers.newStreamsGroupCurrentAssignmentRecord(groupId, 
expectedMember)
@@ -16326,13 +16327,14 @@ public class GroupMetadataManagerTest {
             )
         );
 
+        MetadataImage metadataImage = new MetadataImageBuilder()
+            .addTopic(fooTopicId, fooTopicName, 6)
+            .addTopic(barTopicId, barTopicName, 3)
+            .build();
         MockTaskAssignor assignor = new MockTaskAssignor("sticky");
         GroupMetadataManagerTestContext context = new 
GroupMetadataManagerTestContext.Builder()
             .withStreamsGroupTaskAssignors(List.of(assignor))
-            .withMetadataImage(new MetadataImageBuilder()
-                .addTopic(fooTopicId, fooTopicName, 6)
-                .addTopic(barTopicId, barTopicName, 3)
-                .build())
+            .withMetadataImage(metadataImage)
             .build();
 
         // Member joins the streams group.
@@ -16379,11 +16381,10 @@ public class GroupMetadataManagerTest {
         List<CoordinatorRecord> expectedRecords = List.of(
             
StreamsCoordinatorRecordHelpers.newStreamsGroupMemberRecord(groupId, 
expectedMember),
             
StreamsCoordinatorRecordHelpers.newStreamsGroupTopologyRecord(groupId, 
topology),
-            
StreamsCoordinatorRecordHelpers.newStreamsGroupPartitionMetadataRecord(groupId, 
Map.of(
-                fooTopicName, new 
org.apache.kafka.coordinator.group.streams.TopicMetadata(fooTopicId, 
fooTopicName, 6),
-                barTopicName, new 
org.apache.kafka.coordinator.group.streams.TopicMetadata(barTopicId, 
barTopicName, 3)
-            )),
-            
StreamsCoordinatorRecordHelpers.newStreamsGroupEpochRecord(groupId, 1, 0),
+            
StreamsCoordinatorRecordHelpers.newStreamsGroupEpochRecord(groupId, 1, 
computeGroupHash(Map.of(
+                fooTopicName, computeTopicHash(fooTopicName, metadataImage),
+                barTopicName, computeTopicHash(barTopicName, metadataImage)
+            ))),
             
StreamsCoordinatorRecordHelpers.newStreamsGroupTargetAssignmentRecord(groupId, 
memberId, TasksTuple.EMPTY),
             
StreamsCoordinatorRecordHelpers.newStreamsGroupTargetAssignmentEpochRecord(groupId,
 1),
             
StreamsCoordinatorRecordHelpers.newStreamsGroupCurrentAssignmentRecord(groupId, 
expectedMember)
@@ -16414,13 +16415,14 @@ public class GroupMetadataManagerTest {
             )
         );
 
+        MetadataImage metadataImage = new MetadataImageBuilder()
+            .addTopic(fooTopicId, fooTopicName, 6)
+            .addTopic(barTopicId, barTopicName, 3)
+            .build();
         MockTaskAssignor assignor = new MockTaskAssignor("sticky");
         GroupMetadataManagerTestContext context = new 
GroupMetadataManagerTestContext.Builder()
             .withStreamsGroupTaskAssignors(List.of(assignor))
-            .withMetadataImage(new MetadataImageBuilder()
-                .addTopic(fooTopicId, fooTopicName, 6)
-                .addTopic(barTopicId, barTopicName, 3)
-                .build())
+            .withMetadataImage(metadataImage)
             .withStreamsGroup(
                 new StreamsGroupBuilder(groupId, 10)
                     
.withTopology(StreamsTopology.fromHeartbeatRequest(topology1))
@@ -16474,11 +16476,10 @@ public class GroupMetadataManagerTest {
 
         List<CoordinatorRecord> expectedRecords = List.of(
             
StreamsCoordinatorRecordHelpers.newStreamsGroupMemberRecord(groupId, 
expectedMember),
-            
StreamsCoordinatorRecordHelpers.newStreamsGroupPartitionMetadataRecord(groupId, 
Map.of(
-                fooTopicName, new 
org.apache.kafka.coordinator.group.streams.TopicMetadata(fooTopicId, 
fooTopicName, 6),
-                barTopicName, new 
org.apache.kafka.coordinator.group.streams.TopicMetadata(barTopicId, 
barTopicName, 3)
-            )),
-            
StreamsCoordinatorRecordHelpers.newStreamsGroupEpochRecord(groupId, 11, 0),
+            
StreamsCoordinatorRecordHelpers.newStreamsGroupEpochRecord(groupId, 11, 
computeGroupHash(Map.of(
+                fooTopicName, computeTopicHash(fooTopicName, metadataImage),
+                barTopicName, computeTopicHash(barTopicName, metadataImage)
+            ))),
             
StreamsCoordinatorRecordHelpers.newStreamsGroupTargetAssignmentRecord(groupId, 
memberId, TasksTuple.EMPTY),
             
StreamsCoordinatorRecordHelpers.newStreamsGroupTargetAssignmentEpochRecord(groupId,
 11),
             
StreamsCoordinatorRecordHelpers.newStreamsGroupCurrentAssignmentRecord(groupId, 
expectedMember)
@@ -16499,12 +16500,15 @@ public class GroupMetadataManagerTest {
             new 
Subtopology().setSubtopologyId(subtopology1).setSourceTopics(List.of(fooTopicName))
         ));
 
+        MetadataImage metadataImage = new MetadataImageBuilder()
+            .addTopic(fooTopicId, fooTopicName, 6)
+            .build();
+        long groupMetadataHash = computeGroupHash(Map.of(fooTopicName, 
computeTopicHash(fooTopicName, metadataImage)));
+
         MockTaskAssignor assignor = new MockTaskAssignor("sticky");
         GroupMetadataManagerTestContext context = new 
GroupMetadataManagerTestContext.Builder()
             .withStreamsGroupTaskAssignors(List.of(assignor))
-            .withMetadataImage(new MetadataImageBuilder()
-                .addTopic(fooTopicId, fooTopicName, 6)
-                .build())
+            .withMetadataImage(metadataImage)
             .withStreamsGroup(new StreamsGroupBuilder(groupId, 10)
                 .withMember(streamsGroupMemberBuilderWithDefaults(memberId1)
                     
.setState(org.apache.kafka.coordinator.group.streams.MemberState.STABLE)
@@ -16526,9 +16530,7 @@ public class GroupMetadataManagerTest {
                     TaskAssignmentTestUtil.mkTasks(subtopology1, 3, 4, 5)))
                 .withTargetAssignmentEpoch(10)
                 .withTopology(StreamsTopology.fromHeartbeatRequest(topology))
-                .withPartitionMetadata(Map.of(
-                    fooTopicName, new 
org.apache.kafka.coordinator.group.streams.TopicMetadata(fooTopicId, 
fooTopicName, 6)
-                ))
+                .withMetadataHash(groupMetadataHash)
             )
             .build();
 
@@ -16593,12 +16595,14 @@ public class GroupMetadataManagerTest {
             new 
Subtopology().setSubtopologyId(subtopology1).setSourceTopics(List.of(fooTopicName))
         ));
 
+        MetadataImage metadataImage = new MetadataImageBuilder()
+            .addTopic(fooTopicId, fooTopicName, 2)
+            .build();
+
         MockTaskAssignor assignor = new MockTaskAssignor("sticky");
         GroupMetadataManagerTestContext context = new 
GroupMetadataManagerTestContext.Builder()
             .withStreamsGroupTaskAssignors(List.of(assignor))
-            .withMetadataImage(new MetadataImageBuilder()
-                .addTopic(fooTopicId, fooTopicName, 2)
-                .build())
+            .withMetadataImage(metadataImage)
             .withStreamsGroup(new StreamsGroupBuilder(groupId, 10)
                 .withMember(streamsGroupMemberBuilderWithDefaults(memberId1)
                     
.setState(org.apache.kafka.coordinator.group.streams.MemberState.STABLE)
@@ -16612,9 +16616,7 @@ public class GroupMetadataManagerTest {
                     .build())
                 .withTargetAssignmentEpoch(10)
                 .withTopology(StreamsTopology.fromHeartbeatRequest(topology))
-                .withPartitionMetadata(Map.of(
-                    fooTopicName, new 
org.apache.kafka.coordinator.group.streams.TopicMetadata(fooTopicId, 
fooTopicName, 6)
-                ))
+                .withMetadataHash(computeGroupHash(Map.of(fooTopicName, 
computeTopicHash(fooTopicName, metadataImage))))
             )
             .build();
 
@@ -16688,13 +16690,19 @@ public class GroupMetadataManagerTest {
             new 
Subtopology().setSubtopologyId(subtopology2).setSourceTopics(List.of(barTopicName))
         ));
 
+        MetadataImage metadataImage = new MetadataImageBuilder()
+            .addTopic(fooTopicId, fooTopicName, 6)
+            .addTopic(barTopicId, barTopicName, 3)
+            .build();
+        long groupMetadataHash = computeGroupHash(Map.of(
+            fooTopicName, computeTopicHash(fooTopicName, metadataImage),
+            barTopicName, computeTopicHash(barTopicName, metadataImage)
+        ));
+
         MockTaskAssignor assignor = new MockTaskAssignor("sticky");
         GroupMetadataManagerTestContext context = new 
GroupMetadataManagerTestContext.Builder()
             .withStreamsGroupTaskAssignors(List.of(assignor))
-            .withMetadataImage(new MetadataImageBuilder()
-                .addTopic(fooTopicId, fooTopicName, 6)
-                .addTopic(barTopicId, barTopicName, 3)
-                .build())
+            .withMetadataImage(metadataImage)
             .withStreamsGroup(new StreamsGroupBuilder(groupId, 10)
                 .withMember(streamsGroupMemberBuilderWithDefaults(memberId)
                     
.setState(org.apache.kafka.coordinator.group.streams.MemberState.STABLE)
@@ -16707,10 +16715,7 @@ public class GroupMetadataManagerTest {
                     TaskAssignmentTestUtil.mkTasks(subtopology1, 0, 1, 2, 3, 
4, 5)))
                 .withTargetAssignmentEpoch(10)
                 .withTopology(StreamsTopology.fromHeartbeatRequest(topology))
-                .withPartitionMetadata(Map.of(
-                    fooTopicName, new 
org.apache.kafka.coordinator.group.streams.TopicMetadata(fooTopicId, 
fooTopicName, 6),
-                    barTopicName, new 
org.apache.kafka.coordinator.group.streams.TopicMetadata(barTopicId, 
barTopicName, 3)
-                ))
+                .withMetadataHash(groupMetadataHash)
             )
             .build();
 
@@ -16759,7 +16764,7 @@ public class GroupMetadataManagerTest {
 
         List<CoordinatorRecord> expectedRecords = List.of(
             
StreamsCoordinatorRecordHelpers.newStreamsGroupMemberRecord(groupId, 
expectedMember),
-            
StreamsCoordinatorRecordHelpers.newStreamsGroupEpochRecord(groupId, 11, 0),
+            
StreamsCoordinatorRecordHelpers.newStreamsGroupEpochRecord(groupId, 11, 
groupMetadataHash),
             
StreamsCoordinatorRecordHelpers.newStreamsGroupTargetAssignmentRecord(groupId, 
memberId,
                 TaskAssignmentTestUtil.mkTasksTuple(TaskRole.ACTIVE,
                     TaskAssignmentTestUtil.mkTasks(subtopology1, 0, 1, 2, 3, 
4, 5),
@@ -16788,13 +16793,24 @@ public class GroupMetadataManagerTest {
             new 
Subtopology().setSubtopologyId(subtopology2).setSourceTopics(List.of(barTopicName))
         ));
 
+        MetadataImage newMetadataImage = new MetadataImageBuilder()
+            .addTopic(fooTopicId, fooTopicName, 6)
+            .addTopic(barTopicId, barTopicName, changedPartitionCount)
+            .build();
+
+        MetadataImage oldMetadataImage = new MetadataImageBuilder()
+            .addTopic(fooTopicId, fooTopicName, 6)
+            .addTopic(barTopicId, barTopicName, 3)
+            .build();
+        long oldGroupMetadataHash = computeGroupHash(Map.of(
+            fooTopicName, computeTopicHash(fooTopicName, oldMetadataImage),
+            barTopicName, computeTopicHash(barTopicName, oldMetadataImage)
+        ));
+
         MockTaskAssignor assignor = new MockTaskAssignor("sticky");
         GroupMetadataManagerTestContext context = new 
GroupMetadataManagerTestContext.Builder()
             .withStreamsGroupTaskAssignors(List.of(assignor))
-            .withMetadataImage(new MetadataImageBuilder()
-                .addTopic(fooTopicId, fooTopicName, 6)
-                .addTopic(barTopicId, barTopicName, changedPartitionCount)
-                .build())
+            .withMetadataImage(newMetadataImage)
             .withStreamsGroup(new StreamsGroupBuilder(groupId, 10)
                 .withMember(streamsGroupMemberBuilderWithDefaults(memberId)
                     
.setState(org.apache.kafka.coordinator.group.streams.MemberState.STABLE)
@@ -16807,10 +16823,7 @@ public class GroupMetadataManagerTest {
                     TaskAssignmentTestUtil.mkTasks(subtopology1, 0, 1, 2, 3, 
4, 5)))
                 .withTargetAssignmentEpoch(10)
                 .withTopology(StreamsTopology.fromHeartbeatRequest(topology))
-                .withPartitionMetadata(Map.of(
-                    fooTopicName, new 
org.apache.kafka.coordinator.group.streams.TopicMetadata(fooTopicId, 
fooTopicName, 6),
-                    barTopicName, new 
org.apache.kafka.coordinator.group.streams.TopicMetadata(barTopicId, 
barTopicName, 3)
-                ))
+                .withMetadataHash(oldGroupMetadataHash)
             )
             .build();
 
@@ -16857,11 +16870,10 @@ public class GroupMetadataManagerTest {
             .build();
 
         List<CoordinatorRecord> expectedRecords = List.of(
-            
StreamsCoordinatorRecordHelpers.newStreamsGroupPartitionMetadataRecord(groupId, 
Map.of(
-                fooTopicName, new 
org.apache.kafka.coordinator.group.streams.TopicMetadata(fooTopicId, 
fooTopicName, 6),
-                barTopicName, new 
org.apache.kafka.coordinator.group.streams.TopicMetadata(barTopicId, 
barTopicName, changedPartitionCount)
-            )),
-            
StreamsCoordinatorRecordHelpers.newStreamsGroupEpochRecord(groupId, 11, 0),
+            
StreamsCoordinatorRecordHelpers.newStreamsGroupEpochRecord(groupId, 11, 
computeGroupHash(Map.of(
+                fooTopicName, computeTopicHash(fooTopicName, newMetadataImage),
+                barTopicName, computeTopicHash(barTopicName, newMetadataImage)
+            ))),
             
StreamsCoordinatorRecordHelpers.newStreamsGroupTargetAssignmentRecord(groupId, 
memberId,
                 TaskAssignmentTestUtil.mkTasksTuple(TaskRole.ACTIVE,
                     TaskAssignmentTestUtil.mkTasks(subtopology1, 0, 1, 2, 3, 
4, 5),
@@ -16891,6 +16903,11 @@ public class GroupMetadataManagerTest {
             new 
Subtopology().setSubtopologyId(subtopology2).setSourceTopics(List.of(barTopicName))
         ));
 
+        MetadataImage metadataImage = new MetadataImageBuilder()
+            .addTopic(fooTopicId, fooTopicName, 6)
+            .addTopic(barTopicId, barTopicName, 3)
+            .build();
+
         MockTaskAssignor assignor = new MockTaskAssignor("sticky");
         GroupMetadataManagerTestContext context = new 
GroupMetadataManagerTestContext.Builder()
             .withStreamsGroupTaskAssignors(List.of(assignor))
@@ -16922,10 +16939,10 @@ public class GroupMetadataManagerTest {
                     TaskAssignmentTestUtil.mkTasks(subtopology2, 2)))
                 .withTargetAssignmentEpoch(10)
                 .withTopology(StreamsTopology.fromHeartbeatRequest(topology))
-                .withPartitionMetadata(Map.of(
-                    fooTopicName, new 
org.apache.kafka.coordinator.group.streams.TopicMetadata(fooTopicId, 
fooTopicName, 6),
-                    barTopicName, new 
org.apache.kafka.coordinator.group.streams.TopicMetadata(barTopicId, 
barTopicName, 3)
-                ))
+                .withMetadataHash(computeGroupHash(Map.of(
+                    fooTopicName, computeTopicHash(fooTopicName, 
metadataImage),
+                    barTopicName, computeTopicHash(barTopicName, metadataImage)
+                )))
             )
             .build();
 
@@ -17128,13 +17145,19 @@ public class GroupMetadataManagerTest {
             new 
Subtopology().setSubtopologyId(subtopology2).setSourceTopics(List.of(barTopicName))
         ));
 
+        MetadataImage metadataImage = new MetadataImageBuilder()
+            .addTopic(fooTopicId, fooTopicName, 6)
+            .addTopic(barTopicId, barTopicName, 3)
+            .build();
+        long groupMetadataHash = computeGroupHash(Map.of(
+            fooTopicName, computeTopicHash(fooTopicName, metadataImage),
+            barTopicName, computeTopicHash(barTopicName, metadataImage)
+        ));
+
         MockTaskAssignor assignor = new MockTaskAssignor("sticky");
         GroupMetadataManagerTestContext context = new 
GroupMetadataManagerTestContext.Builder()
             .withStreamsGroupTaskAssignors(List.of(assignor))
-            .withMetadataImage(new MetadataImageBuilder()
-                .addTopic(fooTopicId, fooTopicName, 6)
-                .addTopic(barTopicId, barTopicName, 3)
-                .build())
+            .withMetadataImage(metadataImage)
             .withStreamsGroup(new StreamsGroupBuilder(groupId, 10)
                 .withMember(streamsGroupMemberBuilderWithDefaults(memberId1)
                     .setMemberEpoch(10)
@@ -17158,10 +17181,7 @@ public class GroupMetadataManagerTest {
                     TaskAssignmentTestUtil.mkTasks(subtopology1, 3, 4, 5),
                     TaskAssignmentTestUtil.mkTasks(subtopology2, 2)))
                 .withTargetAssignmentEpoch(10)
-                .withPartitionMetadata(Map.of(
-                    fooTopicName, new 
org.apache.kafka.coordinator.group.streams.TopicMetadata(fooTopicId, 
fooTopicName, 6),
-                    barTopicName, new 
org.apache.kafka.coordinator.group.streams.TopicMetadata(barTopicId, 
barTopicName, 3)
-                ))
+                .withMetadataHash(groupMetadataHash)
             )
             .build();
 
@@ -17582,9 +17602,19 @@ public class GroupMetadataManagerTest {
             new 
Subtopology().setSubtopologyId(subtopology2).setSourceTopics(List.of(barTopicName))
         ));
 
+        MetadataImage metadataImage = new MetadataImageBuilder()
+            .addTopic(fooTopicId, fooTopicName, 6)
+            .addTopic(barTopicId, barTopicName, 3)
+            .build();
+        long groupMetadataHash = computeGroupHash(Map.of(
+            fooTopicName, computeTopicHash(fooTopicName, metadataImage),
+            barTopicName, computeTopicHash(barTopicName, metadataImage)
+        ));
+
         MockTaskAssignor assignor = new MockTaskAssignor("sticky");
         GroupMetadataManagerTestContext context = new 
GroupMetadataManagerTestContext.Builder()
             .withStreamsGroupTaskAssignors(List.of(assignor))
+            .withMetadataImage(metadataImage)
             .withStreamsGroup(new StreamsGroupBuilder(groupId, 10))
             .build();
 
@@ -17593,16 +17623,16 @@ public class GroupMetadataManagerTest {
         
context.replay(StreamsCoordinatorRecordHelpers.newStreamsGroupTopologyRecord(groupId,
 topology));
         
context.replay(StreamsCoordinatorRecordHelpers.newStreamsGroupMemberRecord(groupId,
 streamsGroupMemberBuilderWithDefaults(memberId1)
             .build()));
-        
context.replay(StreamsCoordinatorRecordHelpers.newStreamsGroupEpochRecord(groupId,
 11, 0));
+        
context.replay(StreamsCoordinatorRecordHelpers.newStreamsGroupEpochRecord(groupId,
 11, groupMetadataHash));
 
         assertEquals(StreamsGroupState.NOT_READY, 
context.streamsGroupState(groupId));
 
-        
context.replay(StreamsCoordinatorRecordHelpers.newStreamsGroupPartitionMetadataRecord(groupId,
-            Map.of(
-                fooTopicName, new 
org.apache.kafka.coordinator.group.streams.TopicMetadata(fooTopicId, 
fooTopicName, 6),
-                barTopicName, new 
org.apache.kafka.coordinator.group.streams.TopicMetadata(barTopicId, 
barTopicName, 3)
-            )
-        ));
+        context.groupMetadataManager.getStreamsGroupOrThrow(groupId)
+            .setConfiguredTopology(InternalTopicManager.configureTopics(
+                new LogContext(),
+                groupMetadataHash,
+                
StreamsTopology.fromRecord(StreamsCoordinatorRecordHelpers.convertToStreamsGroupTopologyRecord(topology)),
+                metadataImage.topics()));
 
         assertEquals(StreamsGroup.StreamsGroupState.ASSIGNING, 
context.streamsGroupState(groupId));
 
@@ -17688,12 +17718,14 @@ public class GroupMetadataManagerTest {
             new 
Subtopology().setSubtopologyId(subtopology1).setSourceTopics(List.of(fooTopicName))
         ));
 
+        MetadataImage metadataImage = new MetadataImageBuilder()
+            .addTopic(fooTopicId, fooTopicName, 6)
+            .build();
+
         MockTaskAssignor assignor = new MockTaskAssignor("sticky");
         GroupMetadataManagerTestContext context = new 
GroupMetadataManagerTestContext.Builder()
             .withStreamsGroupTaskAssignors(List.of(assignor))
-            .withMetadataImage(new MetadataImageBuilder()
-                .addTopic(fooTopicId, fooTopicName, 6)
-                .build())
+            .withMetadataImage(metadataImage)
             .withStreamsGroup(new StreamsGroupBuilder(groupId, 10)
                 .withMember(streamsGroupMemberBuilderWithDefaults(memberId)
                     .setMemberEpoch(10)
@@ -17705,11 +17737,11 @@ public class GroupMetadataManagerTest {
                 .withTargetAssignment(memberId, 
TaskAssignmentTestUtil.mkTasksTuple(TaskRole.ACTIVE,
                     TaskAssignmentTestUtil.mkTasks(subtopology1, 0, 1, 2)))
                 .withTargetAssignmentEpoch(10)
-                .withPartitionMetadata(
+                .withMetadataHash(computeGroupHash(Map.of(
                     // foo only has 3 tasks stored in the metadata but foo has
                     // 6 partitions the metadata image.
-                    Map.of(fooTopicName, new 
org.apache.kafka.coordinator.group.streams.TopicMetadata(fooTopicId, 
fooTopicName, 3))
-                ))
+                    fooTopicName, computeTopicHash(fooTopicName, new 
MetadataImageBuilder().addTopic(fooTopicId, fooTopicName, 3).build())
+                ))))
             .build();
 
         // The metadata refresh flag should be true.
@@ -17753,10 +17785,9 @@ public class GroupMetadataManagerTest {
             .build();
 
         List<CoordinatorRecord> expectedRecords = List.of(
-            
StreamsCoordinatorRecordHelpers.newStreamsGroupPartitionMetadataRecord(groupId,
-                Map.of(fooTopicName, new 
org.apache.kafka.coordinator.group.streams.TopicMetadata(fooTopicId, 
fooTopicName, 6))
-            ),
-            
StreamsCoordinatorRecordHelpers.newStreamsGroupEpochRecord(groupId, 11, 0),
+            
StreamsCoordinatorRecordHelpers.newStreamsGroupEpochRecord(groupId, 11, 
computeGroupHash(Map.of(
+                fooTopicName, computeTopicHash(fooTopicName, metadataImage)
+            ))),
             
StreamsCoordinatorRecordHelpers.newStreamsGroupTargetAssignmentRecord(groupId, 
memberId,
                 TaskAssignmentTestUtil.mkTasksTuple(TaskRole.ACTIVE,
                     TaskAssignmentTestUtil.mkTasks(subtopology1, 0, 1, 2, 3, 
4, 5)
@@ -17784,12 +17815,14 @@ public class GroupMetadataManagerTest {
             new 
Subtopology().setSubtopologyId(subtopology1).setSourceTopics(List.of(fooTopicName))
         ));
 
+        MetadataImage metadataImage = new MetadataImageBuilder()
+            .addTopic(fooTopicId, fooTopicName, 6)
+            .build();
+
         MockTaskAssignor assignor = new MockTaskAssignor("sticky");
         GroupMetadataManagerTestContext context = new 
GroupMetadataManagerTestContext.Builder()
             .withStreamsGroupTaskAssignors(List.of(assignor))
-            .withMetadataImage(new MetadataImageBuilder()
-                .addTopic(fooTopicId, fooTopicName, 6)
-                .build())
+            .withMetadataImage(metadataImage)
             .withStreamsGroup(new StreamsGroupBuilder(groupId, 10)
                 .withMember(streamsGroupMemberBuilderWithDefaults(memberId)
                     .setMemberEpoch(10)
@@ -17801,11 +17834,11 @@ public class GroupMetadataManagerTest {
                 .withTargetAssignment(memberId, 
TaskAssignmentTestUtil.mkTasksTuple(TaskRole.ACTIVE,
                     TaskAssignmentTestUtil.mkTasks(subtopology1, 0, 1, 2)))
                 .withTargetAssignmentEpoch(10)
-                .withPartitionMetadata(
+                .withMetadataHash(computeGroupHash(Map.of(
                     // foo only has 3 partitions stored in the metadata but 
foo has
                     // 6 partitions the metadata image.
-                    Map.of(fooTopicName, new 
org.apache.kafka.coordinator.group.streams.TopicMetadata(fooTopicId, 
fooTopicName, 3))
-                ))
+                    fooTopicName, computeTopicHash(fooTopicName, new 
MetadataImageBuilder().addTopic(fooTopicId, fooTopicName, 3).build())
+                ))))
             .build();
 
         // The metadata refresh flag should be true.
@@ -17870,10 +17903,9 @@ public class GroupMetadataManagerTest {
             .build();
 
         List<CoordinatorRecord> expectedRecords = List.of(
-            
StreamsCoordinatorRecordHelpers.newStreamsGroupPartitionMetadataRecord(groupId,
-                Map.of(fooTopicName, new 
org.apache.kafka.coordinator.group.streams.TopicMetadata(fooTopicId, 
fooTopicName, 6))
-            ),
-            
StreamsCoordinatorRecordHelpers.newStreamsGroupEpochRecord(groupId, 11, 0),
+            
StreamsCoordinatorRecordHelpers.newStreamsGroupEpochRecord(groupId, 11, 
computeGroupHash(Map.of(
+                fooTopicName, computeTopicHash(fooTopicName, metadataImage)
+            ))),
             
StreamsCoordinatorRecordHelpers.newStreamsGroupTargetAssignmentRecord(groupId, 
memberId,
                 TaskAssignmentTestUtil.mkTasksTuple(TaskRole.ACTIVE,
                     TaskAssignmentTestUtil.mkTasks(subtopology1, 0, 1, 2, 3, 
4, 5)
@@ -19156,45 +19188,6 @@ public class GroupMetadataManagerTest {
         assertThrows(GroupIdNotFoundException.class, () -> 
context.groupMetadataManager.streamsGroup("foo"));
     }
 
-    @Test
-    public void testReplayStreamsGroupPartitionMetadata() {
-        GroupMetadataManagerTestContext context = new 
GroupMetadataManagerTestContext.Builder()
-            .build();
-
-        Map<String, org.apache.kafka.coordinator.group.streams.TopicMetadata> 
metadata = Map.of(
-            "bar",
-            new 
org.apache.kafka.coordinator.group.streams.TopicMetadata(Uuid.randomUuid(), 
"bar", 10)
-        );
-
-        // The group is created if it does not exist.
-        
context.replay(StreamsCoordinatorRecordHelpers.newStreamsGroupPartitionMetadataRecord("foo",
 metadata));
-        assertEquals(metadata, 
context.groupMetadataManager.streamsGroup("foo").partitionMetadata());
-    }
-
-    @Test
-    public void testReplayStreamsGroupPartitionMetadataTombstoneNotExisting() {
-        GroupMetadataManagerTestContext context = new 
GroupMetadataManagerTestContext.Builder()
-            .build();
-
-        // The group may not exist at all. Replaying the 
StreamsGroupPartitionMetadata tombstone
-        // should be a no-op.
-        
context.replay(StreamsCoordinatorRecordHelpers.newStreamsGroupPartitionMetadataTombstoneRecord("foo"));
-        assertThrows(GroupIdNotFoundException.class, () -> 
context.groupMetadataManager.streamsGroup("foo"));
-    }
-
-    @Test
-    public void testReplayStreamsGroupPartitionMetadataTombstoneExisting() {
-        GroupMetadataManagerTestContext context = new 
GroupMetadataManagerTestContext.Builder()
-            .withStreamsGroup(new StreamsGroupBuilder("foo", 
10).withPartitionMetadata(
-                Map.of("topic1", new 
org.apache.kafka.coordinator.group.streams.TopicMetadata(Uuid.randomUuid(), 
"topic1", 10))
-            ))
-            .build();
-
-        
context.replay(StreamsCoordinatorRecordHelpers.newStreamsGroupPartitionMetadataTombstoneRecord("foo"));
-
-        
assertTrue(context.groupMetadataManager.streamsGroup("foo").partitionMetadata().isEmpty());
-    }
-
     @Test
     public void testReplayStreamsGroupTargetAssignmentMember() {
         GroupMetadataManagerTestContext context = new 
GroupMetadataManagerTestContext.Builder()
diff --git 
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTestContext.java
 
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTestContext.java
index d1bbc012717..c71225f86fd 100644
--- 
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTestContext.java
+++ 
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTestContext.java
@@ -97,8 +97,6 @@ import 
org.apache.kafka.coordinator.group.generated.StreamsGroupMemberMetadataKe
 import 
org.apache.kafka.coordinator.group.generated.StreamsGroupMemberMetadataValue;
 import org.apache.kafka.coordinator.group.generated.StreamsGroupMetadataKey;
 import org.apache.kafka.coordinator.group.generated.StreamsGroupMetadataValue;
-import 
org.apache.kafka.coordinator.group.generated.StreamsGroupPartitionMetadataKey;
-import 
org.apache.kafka.coordinator.group.generated.StreamsGroupPartitionMetadataValue;
 import 
org.apache.kafka.coordinator.group.generated.StreamsGroupTargetAssignmentMemberKey;
 import 
org.apache.kafka.coordinator.group.generated.StreamsGroupTargetAssignmentMemberValue;
 import 
org.apache.kafka.coordinator.group.generated.StreamsGroupTargetAssignmentMetadataKey;
@@ -118,6 +116,7 @@ import 
org.apache.kafka.coordinator.group.streams.StreamsGroupHeartbeatResult;
 import org.apache.kafka.coordinator.group.streams.StreamsGroupMember;
 import org.apache.kafka.coordinator.group.streams.TasksTuple;
 import org.apache.kafka.coordinator.group.streams.assignor.TaskAssignor;
+import org.apache.kafka.coordinator.group.streams.topics.InternalTopicManager;
 import org.apache.kafka.image.MetadataImage;
 import org.apache.kafka.server.authorizer.Authorizer;
 import org.apache.kafka.server.common.ApiMessageAndVersion;
@@ -558,7 +557,18 @@ public class GroupMetadataManagerTestContext {
 
             consumerGroupBuilders.forEach(builder -> 
builder.build().forEach(context::replay));
             shareGroupBuilders.forEach(builder -> 
builder.build(metadataImage.topics()).forEach(context::replay));
-            streamsGroupBuilders.forEach(builder -> 
builder.build().forEach(context::replay));
+            streamsGroupBuilders.forEach(builder -> {
+                builder.build().forEach(context::replay);
+                StreamsGroup group = 
context.groupMetadataManager.getStreamsGroupOrThrow(builder.groupId());
+                if (group.topology().isPresent()) {
+                    
group.setConfiguredTopology(InternalTopicManager.configureTopics(
+                        new LogContext(),
+                        0,
+                        group.topology().get(),
+                        metadataImage.topics())
+                    );
+                }
+            });
 
             context.commit();
 
@@ -1744,13 +1754,6 @@ public class GroupMetadataManagerTestContext {
                 );
                 break;
 
-            case STREAMS_GROUP_PARTITION_METADATA:
-                groupMetadataManager.replay(
-                    (StreamsGroupPartitionMetadataKey) key,
-                    (StreamsGroupPartitionMetadataValue) messageOrNull(value)
-                );
-                break;
-
             case STREAMS_GROUP_TARGET_ASSIGNMENT_MEMBER:
                 groupMetadataManager.replay(
                     (StreamsGroupTargetAssignmentMemberKey) key,
diff --git 
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/StreamsCoordinatorRecordHelpersTest.java
 
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/StreamsCoordinatorRecordHelpersTest.java
index 0509b74d330..2485cb65e6f 100644
--- 
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/StreamsCoordinatorRecordHelpersTest.java
+++ 
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/StreamsCoordinatorRecordHelpersTest.java
@@ -16,7 +16,6 @@
  */
 package org.apache.kafka.coordinator.group.streams;
 
-import org.apache.kafka.common.Uuid;
 import org.apache.kafka.common.message.StreamsGroupHeartbeatRequestData;
 import org.apache.kafka.coordinator.common.runtime.CoordinatorRecord;
 import 
org.apache.kafka.coordinator.group.generated.StreamsGroupCurrentMemberAssignmentKey;
@@ -26,8 +25,6 @@ import 
org.apache.kafka.coordinator.group.generated.StreamsGroupMemberMetadataVa
 import 
org.apache.kafka.coordinator.group.generated.StreamsGroupMemberMetadataValue.Endpoint;
 import org.apache.kafka.coordinator.group.generated.StreamsGroupMetadataKey;
 import org.apache.kafka.coordinator.group.generated.StreamsGroupMetadataValue;
-import 
org.apache.kafka.coordinator.group.generated.StreamsGroupPartitionMetadataKey;
-import 
org.apache.kafka.coordinator.group.generated.StreamsGroupPartitionMetadataValue;
 import 
org.apache.kafka.coordinator.group.generated.StreamsGroupTargetAssignmentMemberKey;
 import 
org.apache.kafka.coordinator.group.generated.StreamsGroupTargetAssignmentMemberValue;
 import 
org.apache.kafka.coordinator.group.generated.StreamsGroupTargetAssignmentMemberValue.TaskIds;
@@ -252,47 +249,6 @@ class StreamsCoordinatorRecordHelpersTest {
         assertEquals(expectedRecord, 
StreamsCoordinatorRecordHelpers.newStreamsGroupMemberTombstoneRecord(GROUP_ID, 
MEMBER_ID));
     }
 
-    @Test
-    public void testNewStreamsGroupPartitionMetadataRecord() {
-        Uuid uuid1 = Uuid.randomUuid();
-        Uuid uuid2 = Uuid.randomUuid();
-        Map<String, TopicMetadata> newPartitionMetadata = Map.of(
-            TOPIC_1, new TopicMetadata(uuid1, TOPIC_1, 1),
-            TOPIC_2, new TopicMetadata(uuid2, TOPIC_2, 2)
-        );
-
-        StreamsGroupPartitionMetadataValue value = new 
StreamsGroupPartitionMetadataValue();
-        value.topics().add(new 
StreamsGroupPartitionMetadataValue.TopicMetadata()
-            .setTopicId(uuid1)
-            .setTopicName(TOPIC_1)
-            .setNumPartitions(1)
-        );
-        value.topics().add(new 
StreamsGroupPartitionMetadataValue.TopicMetadata()
-            .setTopicId(uuid2)
-            .setTopicName(TOPIC_2)
-            .setNumPartitions(2)
-        );
-
-        CoordinatorRecord expectedRecord = CoordinatorRecord.record(
-            new StreamsGroupPartitionMetadataKey()
-                .setGroupId(GROUP_ID),
-            new ApiMessageAndVersion(value, (short) 0)
-        );
-
-        assertEquals(expectedRecord,
-            
StreamsCoordinatorRecordHelpers.newStreamsGroupPartitionMetadataRecord(GROUP_ID,
 newPartitionMetadata));
-    }
-
-    @Test
-    public void testNewStreamsGroupPartitionMetadataTombstoneRecord() {
-        CoordinatorRecord expectedRecord = CoordinatorRecord.tombstone(
-            new StreamsGroupPartitionMetadataKey()
-                .setGroupId(GROUP_ID)
-        );
-
-        assertEquals(expectedRecord, 
StreamsCoordinatorRecordHelpers.newStreamsGroupPartitionMetadataTombstoneRecord(GROUP_ID));
-    }
-
     @Test
     public void testNewStreamsGroupEpochRecord() {
         CoordinatorRecord expectedRecord = CoordinatorRecord.record(
@@ -717,27 +673,6 @@ class StreamsCoordinatorRecordHelpersTest {
         assertEquals("memberId should not be null here", 
exception.getMessage());
     }
 
-    @Test
-    public void testNewStreamsGroupPartitionMetadataRecordNullGroupId() {
-        NullPointerException exception = 
assertThrows(NullPointerException.class, () ->
-            
StreamsCoordinatorRecordHelpers.newStreamsGroupPartitionMetadataRecord(null, 
Map.of()));
-        assertEquals("groupId should not be null here", 
exception.getMessage());
-    }
-
-    @Test
-    public void 
testNewStreamsGroupPartitionMetadataRecordNullNewPartitionMetadata() {
-        NullPointerException exception = 
assertThrows(NullPointerException.class, () ->
-            
StreamsCoordinatorRecordHelpers.newStreamsGroupPartitionMetadataRecord("groupId",
 null));
-        assertEquals("newPartitionMetadata should not be null here", 
exception.getMessage());
-    }
-
-    @Test
-    public void 
testNewStreamsGroupPartitionMetadataTombstoneRecordNullGroupId() {
-        NullPointerException exception = 
assertThrows(NullPointerException.class, () ->
-            
StreamsCoordinatorRecordHelpers.newStreamsGroupPartitionMetadataTombstoneRecord(null));
-        assertEquals("groupId should not be null here", 
exception.getMessage());
-    }
-
     @Test
     public void testNewStreamsGroupEpochRecordNullGroupId() {
         NullPointerException exception = 
assertThrows(NullPointerException.class, () ->
diff --git 
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/StreamsGroupBuilder.java
 
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/StreamsGroupBuilder.java
index a3dc088badc..5d291d9884d 100644
--- 
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/StreamsGroupBuilder.java
+++ 
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/StreamsGroupBuilder.java
@@ -34,7 +34,7 @@ public class StreamsGroupBuilder {
     private StreamsTopology topology;
     private final Map<String, StreamsGroupMember> members = new HashMap<>();
     private final Map<String, TasksTuple> targetAssignments = new HashMap<>();
-    private Map<String, TopicMetadata> partitionMetadata = new HashMap<>();
+    private long metadataHash = 0L;
 
     public StreamsGroupBuilder(String groupId, int groupEpoch) {
         this.groupId = groupId;
@@ -48,8 +48,8 @@ public class StreamsGroupBuilder {
         return this;
     }
 
-    public StreamsGroupBuilder withPartitionMetadata(Map<String, 
TopicMetadata> partitionMetadata) {
-        this.partitionMetadata = partitionMetadata;
+    public StreamsGroupBuilder withMetadataHash(long metadataHash) {
+        this.metadataHash = metadataHash;
         return this;
     }
 
@@ -77,15 +77,9 @@ public class StreamsGroupBuilder {
                 
StreamsCoordinatorRecordHelpers.newStreamsGroupMemberRecord(groupId, member))
         );
 
-        if (!partitionMetadata.isEmpty()) {
-            records.add(
-                
StreamsCoordinatorRecordHelpers.newStreamsGroupPartitionMetadataRecord(groupId,
-                    partitionMetadata));
-        }
-
         // Add group epoch record.
         records.add(
-            
StreamsCoordinatorRecordHelpers.newStreamsGroupEpochRecord(groupId, groupEpoch, 
0));
+            
StreamsCoordinatorRecordHelpers.newStreamsGroupEpochRecord(groupId, groupEpoch, 
metadataHash));
 
         // Add target assignment records.
         targetAssignments.forEach((memberId, assignment) ->
@@ -115,4 +109,8 @@ public class StreamsGroupBuilder {
 
         return records;
     }
+
+    public String groupId() {
+        return groupId;
+    }
 }
diff --git 
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/StreamsGroupTest.java
 
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/StreamsGroupTest.java
index 99e13bbf155..0bd8caf3bbd 100644
--- 
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/StreamsGroupTest.java
+++ 
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/StreamsGroupTest.java
@@ -30,6 +30,7 @@ import org.apache.kafka.common.utils.LogContext;
 import org.apache.kafka.common.utils.MockTime;
 import org.apache.kafka.common.utils.annotation.ApiKeyVersionsSource;
 import org.apache.kafka.coordinator.common.runtime.CoordinatorRecord;
+import org.apache.kafka.coordinator.group.MetadataImageBuilder;
 import org.apache.kafka.coordinator.group.OffsetAndMetadata;
 import org.apache.kafka.coordinator.group.OffsetExpirationCondition;
 import org.apache.kafka.coordinator.group.OffsetExpirationConditionImpl;
@@ -37,7 +38,6 @@ import 
org.apache.kafka.coordinator.group.generated.StreamsGroupCurrentMemberAss
 import 
org.apache.kafka.coordinator.group.generated.StreamsGroupMemberMetadataKey;
 import 
org.apache.kafka.coordinator.group.generated.StreamsGroupMemberMetadataValue;
 import org.apache.kafka.coordinator.group.generated.StreamsGroupMetadataKey;
-import 
org.apache.kafka.coordinator.group.generated.StreamsGroupPartitionMetadataKey;
 import 
org.apache.kafka.coordinator.group.generated.StreamsGroupTargetAssignmentMemberKey;
 import 
org.apache.kafka.coordinator.group.generated.StreamsGroupTargetAssignmentMetadataKey;
 import org.apache.kafka.coordinator.group.generated.StreamsGroupTopologyKey;
@@ -47,18 +47,15 @@ import 
org.apache.kafka.coordinator.group.streams.StreamsGroup.StreamsGroupState
 import 
org.apache.kafka.coordinator.group.streams.TaskAssignmentTestUtil.TaskRole;
 import org.apache.kafka.coordinator.group.streams.topics.ConfiguredTopology;
 import org.apache.kafka.coordinator.group.streams.topics.InternalTopicManager;
-import org.apache.kafka.image.TopicImage;
-import org.apache.kafka.image.TopicsImage;
+import org.apache.kafka.image.MetadataImage;
 import org.apache.kafka.timeline.SnapshotRegistry;
 
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.params.ParameterizedTest;
 import org.junit.jupiter.params.provider.EnumSource;
-import org.mockito.MockedStatic;
 
 import java.util.ArrayList;
 import java.util.Arrays;
-import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -66,6 +63,7 @@ import java.util.Optional;
 import java.util.OptionalInt;
 import java.util.OptionalLong;
 import java.util.Set;
+import java.util.TreeMap;
 import java.util.stream.Collectors;
 
 import static 
org.apache.kafka.coordinator.group.streams.TaskAssignmentTestUtil.mkTasks;
@@ -79,11 +77,7 @@ import static org.junit.jupiter.api.Assertions.assertNotNull;
 import static org.junit.jupiter.api.Assertions.assertNull;
 import static org.junit.jupiter.api.Assertions.assertThrows;
 import static org.junit.jupiter.api.Assertions.assertTrue;
-import static org.mockito.ArgumentMatchers.any;
-import static org.mockito.ArgumentMatchers.eq;
 import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.mockStatic;
-import static org.mockito.Mockito.never;
 import static org.mockito.Mockito.when;
 
 public class StreamsGroupTest {
@@ -505,6 +499,7 @@ public class StreamsGroupTest {
         assertEquals(StreamsGroup.StreamsGroupState.NOT_READY, 
streamsGroup.state());
 
         streamsGroup.setTopology(new StreamsTopology(1, Map.of()));
+        streamsGroup.setConfiguredTopology(new ConfiguredTopology(1, 0, 
Optional.of(new TreeMap<>()), Map.of(), Optional.empty()));
 
         assertEquals(MemberState.STABLE, member1.state());
         assertEquals(StreamsGroup.StreamsGroupState.ASSIGNING, 
streamsGroup.state());
@@ -702,6 +697,7 @@ public class StreamsGroupTest {
         );
         group.setGroupEpoch(1);
         group.setTopology(new StreamsTopology(1, Map.of()));
+        group.setConfiguredTopology(new ConfiguredTopology(1, 0, 
Optional.of(new TreeMap<>()), Map.of(), Optional.empty()));
         group.setTargetAssignmentEpoch(1);
         group.updateMember(new StreamsGroupMember.Builder("member1")
             .setMemberEpoch(1)
@@ -767,6 +763,7 @@ public class StreamsGroupTest {
         assertThrows(GroupNotEmptyException.class, 
streamsGroup::validateDeleteGroup);
 
         streamsGroup.setTopology(new StreamsTopology(1, Map.of()));
+        streamsGroup.setConfiguredTopology(new ConfiguredTopology(1, 0, 
Optional.of(new TreeMap<>()), Map.of(), Optional.empty()));
 
         assertEquals(StreamsGroup.StreamsGroupState.RECONCILING, 
streamsGroup.state());
         assertThrows(GroupNotEmptyException.class, 
streamsGroup::validateDeleteGroup);
@@ -811,6 +808,7 @@ public class StreamsGroupTest {
 
         group.setGroupEpoch(1);
         group.setTopology(new StreamsTopology(1, Map.of()));
+        group.setConfiguredTopology(new ConfiguredTopology(1, 0, 
Optional.of(new TreeMap<>()), Map.of(), Optional.empty()));
         group.setTargetAssignmentEpoch(1);
         group.updateMember(new StreamsGroupMember.Builder("member1")
             .setMemberEpoch(1)
@@ -907,109 +905,7 @@ public class StreamsGroupTest {
     }
 
     @Test
-    public void testSetTopologyUpdatesStateAndConfiguredTopology() {
-        SnapshotRegistry snapshotRegistry = new SnapshotRegistry(LOG_CONTEXT);
-        GroupCoordinatorMetricsShard metricsShard = 
mock(GroupCoordinatorMetricsShard.class);
-        StreamsGroup streamsGroup = new StreamsGroup(LOG_CONTEXT, 
snapshotRegistry, "test-group", metricsShard);
-
-        StreamsTopology topology = new StreamsTopology(1, Map.of());
-
-        ConfiguredTopology topo = mock(ConfiguredTopology.class);
-        when(topo.isReady()).thenReturn(true);
-
-        try (MockedStatic<InternalTopicManager> mocked = 
mockStatic(InternalTopicManager.class)) {
-            mocked.when(() -> InternalTopicManager.configureTopics(any(), 
eq(topology), eq(Map.of()))).thenReturn(topo);
-            streamsGroup.setTopology(topology);
-            mocked.verify(() -> InternalTopicManager.configureTopics(any(), 
eq(topology), eq(Map.of())));
-        }
-
-        Optional<ConfiguredTopology> configuredTopology = 
streamsGroup.configuredTopology();
-        assertTrue(configuredTopology.isPresent(), "Configured topology should 
be present");
-        assertEquals(StreamsGroupState.EMPTY, streamsGroup.state());
-
-        streamsGroup.updateMember(new StreamsGroupMember.Builder("member1")
-            .setMemberEpoch(1)
-            .build());
-
-        assertEquals(StreamsGroupState.RECONCILING, streamsGroup.state());
-    }
-
-    @Test
-    public void 
testSetTopologyUpdatesStateAndConfiguredTopologyWithPreviousCallToSetMetadata() 
{
-        Uuid topicUuid = Uuid.randomUuid();
-        SnapshotRegistry snapshotRegistry = new SnapshotRegistry(LOG_CONTEXT);
-        GroupCoordinatorMetricsShard metricsShard = 
mock(GroupCoordinatorMetricsShard.class);
-        StreamsGroup streamsGroup = new StreamsGroup(LOG_CONTEXT, 
snapshotRegistry, "test-group", metricsShard);
-
-        assertEquals(StreamsGroup.StreamsGroupState.EMPTY, 
streamsGroup.state());
-
-        Map<String, TopicMetadata> partitionMetadata = new HashMap<>();
-        partitionMetadata.put("topic1", new TopicMetadata(topicUuid, "topic1", 
1));
-
-        try (MockedStatic<InternalTopicManager> mocked = 
mockStatic(InternalTopicManager.class)) {
-            streamsGroup.setPartitionMetadata(partitionMetadata);
-            mocked.verify(() -> InternalTopicManager.configureTopics(any(), 
any(), any()), never());
-        }
-
-        assertTrue(streamsGroup.configuredTopology().isEmpty(), "Configured 
topology should not be present");
-        assertEquals(partitionMetadata, streamsGroup.partitionMetadata());
-
-        StreamsTopology topology = new StreamsTopology(1, Map.of());
-        ConfiguredTopology topo = mock(ConfiguredTopology.class);
-        when(topo.isReady()).thenReturn(true);
-        try (MockedStatic<InternalTopicManager> mocked = 
mockStatic(InternalTopicManager.class)) {
-            mocked.when(() -> InternalTopicManager.configureTopics(any(), 
eq(topology), eq(partitionMetadata))).thenReturn(topo);
-            streamsGroup.setTopology(topology);
-            mocked.verify(() -> InternalTopicManager.configureTopics(any(), 
eq(topology), eq(partitionMetadata)));
-        }
-    }
-
-    @Test
-    public void testSetPartitionMetadataUpdatesStateAndConfiguredTopology() {
-        Uuid topicUuid = Uuid.randomUuid();
-        SnapshotRegistry snapshotRegistry = new SnapshotRegistry(LOG_CONTEXT);
-        GroupCoordinatorMetricsShard metricsShard = 
mock(GroupCoordinatorMetricsShard.class);
-        StreamsGroup streamsGroup = new StreamsGroup(LOG_CONTEXT, 
snapshotRegistry, "test-group", metricsShard);
-
-        assertEquals(StreamsGroup.StreamsGroupState.EMPTY, 
streamsGroup.state());
-
-        Map<String, TopicMetadata> partitionMetadata = new HashMap<>();
-        partitionMetadata.put("topic1", new TopicMetadata(topicUuid, "topic1", 
1));
-
-        try (MockedStatic<InternalTopicManager> mocked = 
mockStatic(InternalTopicManager.class)) {
-            streamsGroup.setPartitionMetadata(partitionMetadata);
-            mocked.verify(() -> InternalTopicManager.configureTopics(any(), 
any(), any()), never());
-        }
-
-        assertTrue(streamsGroup.configuredTopology().isEmpty(), "Configured 
topology should not be present");
-        assertEquals(partitionMetadata, streamsGroup.partitionMetadata());
-
-        StreamsTopology topology = new StreamsTopology(1, Map.of());
-        streamsGroup.setTopology(topology);
-        ConfiguredTopology topo = mock(ConfiguredTopology.class);
-        when(topo.isReady()).thenReturn(true);
-
-        try (MockedStatic<InternalTopicManager> mocked = 
mockStatic(InternalTopicManager.class)) {
-            mocked.when(() -> InternalTopicManager.configureTopics(any(), 
eq(topology), eq(partitionMetadata))).thenReturn(topo);
-            streamsGroup.setPartitionMetadata(partitionMetadata);
-            mocked.verify(() -> InternalTopicManager.configureTopics(any(), 
eq(topology), eq(partitionMetadata)));
-        }
-
-        Optional<ConfiguredTopology> configuredTopology = 
streamsGroup.configuredTopology();
-        assertTrue(configuredTopology.isPresent(), "Configured topology should 
be present");
-        assertEquals(topo, configuredTopology.get());
-        assertEquals(partitionMetadata, streamsGroup.partitionMetadata());
-        assertEquals(StreamsGroupState.EMPTY, streamsGroup.state());
-
-        streamsGroup.updateMember(new StreamsGroupMember.Builder("member1")
-            .setMemberEpoch(1)
-            .build());
-
-        assertEquals(StreamsGroupState.RECONCILING, streamsGroup.state());
-    }
-
-    @Test
-    public void testComputePartitionMetadata() {
+    public void testComputeMetadataHash() {
         SnapshotRegistry snapshotRegistry = new SnapshotRegistry(LOG_CONTEXT);
         StreamsGroup streamsGroup = new StreamsGroup(
             LOG_CONTEXT,
@@ -1017,24 +913,17 @@ public class StreamsGroupTest {
             "group-foo",
             mock(GroupCoordinatorMetricsShard.class)
         );
-        TopicsImage topicsImage = mock(TopicsImage.class);
-        TopicImage topicImage = mock(TopicImage.class);
-        when(topicImage.id()).thenReturn(Uuid.randomUuid());
-        when(topicImage.name()).thenReturn("topic1");
-        when(topicImage.partitions()).thenReturn(Collections.singletonMap(0, 
null));
-        when(topicsImage.getTopic("topic1")).thenReturn(topicImage);
+
+        MetadataImage metadataImage = new MetadataImageBuilder()
+            .addTopic(Uuid.randomUuid(), "topic1", 1)
+            .build();
+
         StreamsTopology topology = mock(StreamsTopology.class);
         when(topology.requiredTopics()).thenReturn(Set.of("topic1"));
 
-        Map<String, TopicMetadata> partitionMetadata = 
streamsGroup.computePartitionMetadata(topicsImage, topology);
-
-        assertEquals(1, partitionMetadata.size());
-        assertTrue(partitionMetadata.containsKey("topic1"));
-        TopicMetadata topicMetadata = partitionMetadata.get("topic1");
-        assertNotNull(topicMetadata);
-        assertEquals(topicImage.id(), topicMetadata.id());
-        assertEquals("topic1", topicMetadata.name());
-        assertEquals(1, topicMetadata.numPartitions());
+        long metadataHash = streamsGroup.computeMetadataHash(metadataImage, 
new HashMap<>(), topology);
+        // A hash of 0 means no topics; topic1 exists, so it must be non-zero.
+        assertNotEquals(0, metadataHash);
     }
 
     @Test
@@ -1053,7 +942,7 @@ public class StreamsGroupTest {
 
         streamsGroup.createGroupTombstoneRecords(records);
 
-        assertEquals(7, records.size());
+        assertEquals(6, records.size());
         for (CoordinatorRecord record : records) {
             assertNotNull(record.key());
             assertNull(record.value());
@@ -1061,7 +950,6 @@ public class StreamsGroupTest {
         final Set<ApiMessage> keys = 
records.stream().map(CoordinatorRecord::key).collect(Collectors.toSet());
         assertTrue(keys.contains(new 
StreamsGroupMetadataKey().setGroupId("test-group")));
         assertTrue(keys.contains(new 
StreamsGroupTargetAssignmentMetadataKey().setGroupId("test-group")));
-        assertTrue(keys.contains(new 
StreamsGroupPartitionMetadataKey().setGroupId("test-group")));
         assertTrue(keys.contains(new 
StreamsGroupTopologyKey().setGroupId("test-group")));
         assertTrue(keys.contains(new 
StreamsGroupMemberMetadataKey().setGroupId("test-group").setMemberId("member1")));
         assertTrue(keys.contains(new 
StreamsGroupTargetAssignmentMemberKey().setGroupId("test-group").setMemberId("member1")));
@@ -1079,28 +967,26 @@ public class StreamsGroupTest {
         assertFalse(streamsGroup.isSubscribedToTopic("test-topic2"));
         assertFalse(streamsGroup.isSubscribedToTopic("non-existent-topic"));
 
-        streamsGroup.setTopology(
-            new StreamsTopology(1,
-                Map.of("test-subtopology",
-                    new StreamsGroupTopologyValue.Subtopology()
-                        .setSubtopologyId("test-subtopology")
-                        .setSourceTopics(List.of("test-topic1"))
-                        .setRepartitionSourceTopics(List.of(new 
StreamsGroupTopologyValue.TopicInfo().setName("test-topic2")))
-                        .setRepartitionSinkTopics(List.of("test-topic2"))
-                )
-            )
-        );
+        StreamsTopology topology = new StreamsTopology(1,
+            Map.of("test-subtopology",
+                new StreamsGroupTopologyValue.Subtopology()
+                    .setSubtopologyId("test-subtopology")
+                    .setSourceTopics(List.of("test-topic1"))
+                    .setRepartitionSourceTopics(List.of(new 
StreamsGroupTopologyValue.TopicInfo().setName("test-topic2")))
+                    .setRepartitionSinkTopics(List.of("test-topic2"))
+            ));
+        streamsGroup.setTopology(topology);
 
         assertFalse(streamsGroup.isSubscribedToTopic("test-topic1"));
         assertFalse(streamsGroup.isSubscribedToTopic("test-topic2"));
         assertFalse(streamsGroup.isSubscribedToTopic("non-existent-topic"));
 
-        streamsGroup.setPartitionMetadata(
-            Map.of(
-                "test-topic1", new TopicMetadata(Uuid.randomUuid(), 
"test-topic1", 1),
-                "test-topic2", new TopicMetadata(Uuid.randomUuid(), 
"test-topic2", 1)
-            )
-        );
+        MetadataImage metadataImage = new MetadataImageBuilder()
+            .addTopic(Uuid.randomUuid(), "test-topic1", 1)
+            .addTopic(Uuid.randomUuid(), "test-topic2", 1)
+            .build();
+
+        
streamsGroup.setConfiguredTopology(InternalTopicManager.configureTopics(logContext,
 0, topology, metadataImage.topics()));
 
         assertTrue(streamsGroup.isSubscribedToTopic("test-topic1"));
         assertTrue(streamsGroup.isSubscribedToTopic("test-topic2"));
diff --git 
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/TargetAssignmentBuilderTest.java
 
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/TargetAssignmentBuilderTest.java
index 8b51b1b58b4..b55b05d30d9 100644
--- 
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/TargetAssignmentBuilderTest.java
+++ 
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/TargetAssignmentBuilderTest.java
@@ -28,6 +28,7 @@ import 
org.apache.kafka.coordinator.group.streams.assignor.MemberAssignment;
 import org.apache.kafka.coordinator.group.streams.assignor.TaskAssignor;
 import org.apache.kafka.coordinator.group.streams.topics.ConfiguredSubtopology;
 import org.apache.kafka.coordinator.group.streams.topics.ConfiguredTopology;
+import org.apache.kafka.image.MetadataImage;
 
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.params.ParameterizedTest;
@@ -659,7 +660,7 @@ public class TargetAssignmentBuilderTest {
         private final int groupEpoch;
         private final TaskAssignor assignor = mock(TaskAssignor.class);
         private final SortedMap<String, ConfiguredSubtopology> subtopologies = 
new TreeMap<>();
-        private final ConfiguredTopology topology = new ConfiguredTopology(0, 
Optional.of(subtopologies), new HashMap<>(),
+        private final ConfiguredTopology topology = new ConfiguredTopology(0, 
0, Optional.of(subtopologies), new HashMap<>(),
             Optional.empty());
         private final Map<String, StreamsGroupMember> members = new 
HashMap<>();
         private final Map<String, 
org.apache.kafka.coordinator.group.streams.TopicMetadata> subscriptionMetadata 
= new HashMap<>();
@@ -711,11 +712,6 @@ public class TargetAssignmentBuilderTest {
         ) {
             String subtopologyId = Uuid.randomUuid().toString();
             Uuid topicId = Uuid.randomUuid();
-            subscriptionMetadata.put(topicName, new 
org.apache.kafka.coordinator.group.streams.TopicMetadata(
-                topicId,
-                topicName,
-                numTasks
-            ));
             topicsImageBuilder = topicsImageBuilder.addTopic(topicId, 
topicName, numTasks);
             subtopologies.put(subtopologyId, new 
ConfiguredSubtopology(Set.of(topicId.toString()), Map.of(), Set.of(), 
Map.of()));
 
@@ -805,8 +801,10 @@ public class TargetAssignmentBuilderTest {
                 }
             });
 
+            MetadataImage metadataImage = topicsImageBuilder.build();
+
             // Prepare the expected topology metadata.
-            TopologyMetadata topologyMetadata = new 
TopologyMetadata(subscriptionMetadata, subtopologies);
+            TopologyMetadata topologyMetadata = new 
TopologyMetadata(metadataImage, subtopologies);
 
             // Prepare the expected assignment spec.
             GroupSpecImpl groupSpec = new GroupSpecImpl(memberSpecs, new 
HashMap<>());
@@ -822,7 +820,7 @@ public class TargetAssignmentBuilderTest {
                 .withMembers(members)
                 .withTopology(topology)
                 .withStaticMembers(staticMembers)
-                .withPartitionMetadata(subscriptionMetadata)
+                .withMetadataImage(metadataImage)
                 .withTargetAssignment(targetAssignment);
 
             // Add the updated members or delete the deleted members.
diff --git 
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/TopicMetadataTest.java
 
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/TopicMetadataTest.java
index 59712d5c954..6105a4dfe3b 100644
--- 
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/TopicMetadataTest.java
+++ 
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/TopicMetadataTest.java
@@ -17,7 +17,6 @@
 package org.apache.kafka.coordinator.group.streams;
 
 import org.apache.kafka.common.Uuid;
-import 
org.apache.kafka.coordinator.group.generated.StreamsGroupPartitionMetadataValue;
 
 import org.junit.jupiter.api.Test;
 
@@ -72,18 +71,4 @@ public class TopicMetadataTest {
             new TopicMetadata(Uuid.randomUuid(), "valid-topic", -1));
         assertEquals("Number of partitions must be positive.", 
exception.getMessage());
     }
-
-    @Test
-    public void testFromRecord() {
-        StreamsGroupPartitionMetadataValue.TopicMetadata record = new 
StreamsGroupPartitionMetadataValue.TopicMetadata()
-            .setTopicId(Uuid.randomUuid())
-            .setTopicName("test-topic")
-            .setNumPartitions(3);
-
-        TopicMetadata topicMetadata = TopicMetadata.fromRecord(record);
-
-        assertEquals(record.topicId(), topicMetadata.id());
-        assertEquals(record.topicName(), topicMetadata.name());
-        assertEquals(record.numPartitions(), topicMetadata.numPartitions());
-    }
 }
\ No newline at end of file
diff --git 
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/TopologyMetadataTest.java
 
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/TopologyMetadataTest.java
index a5c18a6f0f2..a39914db1bc 100644
--- 
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/TopologyMetadataTest.java
+++ 
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/TopologyMetadataTest.java
@@ -16,13 +16,15 @@
  */
 package org.apache.kafka.coordinator.group.streams;
 
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.coordinator.group.MetadataImageBuilder;
 import 
org.apache.kafka.coordinator.group.streams.topics.ConfiguredInternalTopic;
 import org.apache.kafka.coordinator.group.streams.topics.ConfiguredSubtopology;
+import org.apache.kafka.image.MetadataImage;
 
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 
-import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.NoSuchElementException;
@@ -40,20 +42,23 @@ import static org.mockito.Mockito.when;
 
 class TopologyMetadataTest {
 
-    private Map<String, TopicMetadata> topicMetadata;
+    private MetadataImage metadataImage;
     private SortedMap<String, ConfiguredSubtopology> subtopologyMap;
     private TopologyMetadata topologyMetadata;
 
     @BeforeEach
     void setUp() {
-        topicMetadata = new HashMap<>();
+        metadataImage = new MetadataImageBuilder()
+            .addTopic(Uuid.randomUuid(), "source_topic", 3)
+            .addTopic(Uuid.randomUuid(), "repartition_source_topic", 4)
+            .build();
         subtopologyMap = new TreeMap<>();
-        topologyMetadata = new TopologyMetadata(topicMetadata, subtopologyMap);
+        topologyMetadata = new TopologyMetadata(metadataImage, subtopologyMap);
     }
 
     @Test
-    void testTopicMetadata() {
-        assertEquals(topicMetadata, topologyMetadata.topicMetadata());
+    void testMetadataImage() {
+        assertEquals(metadataImage, topologyMetadata.metadataImage());
     }
 
     @Test
@@ -83,13 +88,6 @@ class TopologyMetadataTest {
         when(subtopology.sourceTopics()).thenReturn(Set.of("source_topic"));
         
when(subtopology.repartitionSourceTopics()).thenReturn(Map.of("repartition_source_topic",
 internalTopic));
 
-        TopicMetadata topicMeta1 = mock(TopicMetadata.class);
-        TopicMetadata topicMeta2 = mock(TopicMetadata.class);
-        topicMetadata.put("source_topic", topicMeta1);
-        topicMetadata.put("repartition_source_topic", topicMeta2);
-        when(topicMeta1.numPartitions()).thenReturn(3);
-        when(topicMeta2.numPartitions()).thenReturn(4);
-
         assertEquals(4, 
topologyMetadata.maxNumInputPartitions("subtopology1"));
     }
 
diff --git 
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/topics/ConfiguredTopologyTest.java
 
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/topics/ConfiguredTopologyTest.java
index b298be7300a..eadaaeb1338 100644
--- 
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/topics/ConfiguredTopologyTest.java
+++ 
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/topics/ConfiguredTopologyTest.java
@@ -41,6 +41,7 @@ public class ConfiguredTopologyTest {
     public void testConstructorWithNullSubtopologies() {
         assertThrows(NullPointerException.class,
             () -> new ConfiguredTopology(
+                0,
                 0,
                 null,
                 Map.of(),
@@ -53,6 +54,7 @@ public class ConfiguredTopologyTest {
     public void testConstructorWithNullInternalTopicsToBeCreated() {
         assertThrows(NullPointerException.class,
             () -> new ConfiguredTopology(
+                0,
                 0,
                 Optional.of(new TreeMap<>()),
                 null,
@@ -65,6 +67,7 @@ public class ConfiguredTopologyTest {
     public void testConstructorWithNullTopicConfigurationException() {
         assertThrows(NullPointerException.class,
             () -> new ConfiguredTopology(
+                0,
                 0,
                 Optional.empty(),
                 Map.of(),
@@ -78,6 +81,7 @@ public class ConfiguredTopologyTest {
         assertThrows(IllegalArgumentException.class,
             () -> new ConfiguredTopology(
                 -1,
+                0,
                 Optional.of(new TreeMap<>()),
                 Map.of(),
                 Optional.empty()
@@ -90,6 +94,7 @@ public class ConfiguredTopologyTest {
         final IllegalArgumentException ex = assertThrows(IllegalArgumentException.class,
             () -> new ConfiguredTopology(
                 1,
+                0,
                 Optional.empty(),
                 Map.of(),
                 Optional.empty()
@@ -101,11 +106,11 @@ public class ConfiguredTopologyTest {
     @Test
     public void testIsReady() {
         ConfiguredTopology readyTopology = new ConfiguredTopology(
-            1, Optional.of(new TreeMap<>()), new HashMap<>(), Optional.empty());
+            1, 0, Optional.of(new TreeMap<>()), new HashMap<>(), Optional.empty());
         assertTrue(readyTopology.isReady());
 
         ConfiguredTopology notReadyTopology = new ConfiguredTopology(
-            1, Optional.empty(), new HashMap<>(), Optional.of(TopicConfigurationException.missingSourceTopics("missing")));
+            1, 0, Optional.empty(), new HashMap<>(), Optional.of(TopicConfigurationException.missingSourceTopics("missing")));
         assertFalse(notReadyTopology.isReady());
     }
 
@@ -120,7 +125,7 @@ public class ConfiguredTopologyTest {
         Map<String, CreatableTopic> internalTopicsToBeCreated = new HashMap<>();
         Optional<TopicConfigurationException> topicConfigurationException = Optional.empty();
         ConfiguredTopology configuredTopology = new ConfiguredTopology(
-            topologyEpoch, Optional.of(subtopologies), internalTopicsToBeCreated, topicConfigurationException);
+            topologyEpoch, 0, Optional.of(subtopologies), internalTopicsToBeCreated, topicConfigurationException);
 
         StreamsGroupDescribeResponseData.Topology topology = configuredTopology.asStreamsGroupDescribeTopology();
 
diff --git a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/topics/EndpointToPartitionsManagerTest.java b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/topics/EndpointToPartitionsManagerTest.java
index 2002774b60b..cba28e0163d 100644
--- a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/topics/EndpointToPartitionsManagerTest.java
+++ b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/topics/EndpointToPartitionsManagerTest.java
@@ -19,10 +19,11 @@ package org.apache.kafka.coordinator.group.streams.topics;
 
 import org.apache.kafka.common.Uuid;
 import org.apache.kafka.common.message.StreamsGroupHeartbeatResponseData;
+import org.apache.kafka.coordinator.group.MetadataImageBuilder;
 import org.apache.kafka.coordinator.group.streams.StreamsGroup;
 import org.apache.kafka.coordinator.group.streams.StreamsGroupMember;
 import org.apache.kafka.coordinator.group.streams.TasksTuple;
-import org.apache.kafka.coordinator.group.streams.TopicMetadata;
+import org.apache.kafka.image.MetadataImage;
 
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
@@ -79,16 +80,16 @@ class EndpointToPartitionsManagerTest {
 
     @Test
     void testEndpointToPartitionsWithStandbyTaskAssignments() {
-        Map<String, TopicMetadata> topicMetadata = new HashMap<>();
-        topicMetadata.put("Topic-A", new TopicMetadata(Uuid.randomUuid(), "Topic-A", 3));
-        topicMetadata.put("Topic-B", new TopicMetadata(Uuid.randomUuid(), "Topic-B", 3));
+        MetadataImage metadataImage = new MetadataImageBuilder()
+            .addTopic(Uuid.randomUuid(), "Topic-A", 3)
+            .addTopic(Uuid.randomUuid(), "Topic-B", 3)
+            .build();
 
         activeTasks.put("0", Set.of(0, 1, 2));
         standbyTasks.put("1", Set.of(0, 1, 2));
         tasksTuple = new TasksTuple(activeTasks, standbyTasks, Collections.emptyMap());
         when(streamsGroupMember.assignedTasks()).thenReturn(tasksTuple);
         //when(streamsGroupMember.assignedTasks().standbyTasks()).thenReturn(tasksTuple.standbyTasks());
-        when((streamsGroup.partitionMetadata())).thenReturn(topicMetadata);
         when(streamsGroup.configuredTopology()).thenReturn(Optional.of(configuredTopology));
         SortedMap<String, ConfiguredSubtopology> configuredSubtopologyMap = new TreeMap<>();
         configuredSubtopologyMap.put("0", configuredSubtopologyOne);
@@ -96,7 +97,7 @@ class EndpointToPartitionsManagerTest {
         when(configuredTopology.subtopologies()).thenReturn(Optional.of(configuredSubtopologyMap));
 
         StreamsGroupHeartbeatResponseData.EndpointToPartitions result =
-                EndpointToPartitionsManager.endpointToPartitions(streamsGroupMember, responseEndpoint, streamsGroup);
+                EndpointToPartitionsManager.endpointToPartitions(streamsGroupMember, responseEndpoint, streamsGroup, metadataImage);
 
         assertEquals(responseEndpoint, result.userEndpoint());
         assertEquals(1, result.activePartitions().size());
@@ -123,20 +124,20 @@ class EndpointToPartitionsManagerTest {
                                                                      List<Integer> topicBExpectedPartitions,
                                                                      String testName
                                                                      ) {
-        Map<String, TopicMetadata> topicMetadata = new HashMap<>();
-        topicMetadata.put("Topic-A", new TopicMetadata(Uuid.randomUuid(), "Topic-A", topicAPartitions));
-        topicMetadata.put("Topic-B", new TopicMetadata(Uuid.randomUuid(), "Topic-B", topicBPartitions));
+        MetadataImage metadataImage = new MetadataImageBuilder()
+            .addTopic(Uuid.randomUuid(), "Topic-A", topicAPartitions)
+            .addTopic(Uuid.randomUuid(), "Topic-B", topicBPartitions)
+            .build();
         configuredSubtopologyOne = new ConfiguredSubtopology(Set.of("Topic-A", "Topic-B"), new HashMap<>(), new HashSet<>(), new HashMap<>());
 
         activeTasks.put("0", Set.of(0, 1, 2, 3, 4));
         when(streamsGroupMember.assignedTasks()).thenReturn(new TasksTuple(activeTasks, Collections.emptyMap(), Collections.emptyMap()));
-        when(streamsGroup.partitionMetadata()).thenReturn(topicMetadata);
         when(streamsGroup.configuredTopology()).thenReturn(Optional.of(configuredTopology));
         SortedMap<String, ConfiguredSubtopology> configuredSubtopologyOneMap = new TreeMap<>();
         configuredSubtopologyOneMap.put("0", configuredSubtopologyOne);
         when(configuredTopology.subtopologies()).thenReturn(Optional.of(configuredSubtopologyOneMap));
 
-        StreamsGroupHeartbeatResponseData.EndpointToPartitions result = EndpointToPartitionsManager.endpointToPartitions(streamsGroupMember, responseEndpoint, streamsGroup);
+        StreamsGroupHeartbeatResponseData.EndpointToPartitions result = EndpointToPartitionsManager.endpointToPartitions(streamsGroupMember, responseEndpoint, streamsGroup, metadataImage);
 
         assertEquals(responseEndpoint, result.userEndpoint());
         assertEquals(2, result.activePartitions().size());
diff --git a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/topics/InternalTopicManagerTest.java b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/topics/InternalTopicManagerTest.java
index bc013ed875d..f3b40dc282b 100644
--- a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/topics/InternalTopicManagerTest.java
+++ b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/topics/InternalTopicManagerTest.java
@@ -22,14 +22,14 @@ import org.apache.kafka.common.message.CreateTopicsRequestData.CreatableTopicCon
 import org.apache.kafka.common.message.CreateTopicsRequestData.CreatableTopicConfigCollection;
 import org.apache.kafka.common.requests.StreamsGroupHeartbeatResponse.Status;
 import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.coordinator.group.MetadataImageBuilder;
 import org.apache.kafka.coordinator.group.generated.StreamsGroupTopologyValue;
 import org.apache.kafka.coordinator.group.generated.StreamsGroupTopologyValue.Subtopology;
 import org.apache.kafka.coordinator.group.streams.StreamsTopology;
-import org.apache.kafka.coordinator.group.streams.TopicMetadata;
+import org.apache.kafka.image.MetadataImage;
 
 import org.junit.jupiter.api.Test;
 
-import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
@@ -54,12 +54,13 @@ class InternalTopicManagerTest {
 
     @Test
     void testConfigureTopicsSetsConfigurationExceptionWhenSourceTopicIsMissing() {
-        Map<String, TopicMetadata> topicMetadata = new HashMap<>();
-        topicMetadata.put(SOURCE_TOPIC_1, new TopicMetadata(Uuid.randomUuid(), SOURCE_TOPIC_1, 2));
+        MetadataImage metadataImage = new MetadataImageBuilder()
+            .addTopic(Uuid.randomUuid(), SOURCE_TOPIC_1, 2)
+            .build();
         // SOURCE_TOPIC_2 is missing from topicMetadata
         StreamsTopology topology = makeTestTopology();
 
-        final ConfiguredTopology configuredTopology = InternalTopicManager.configureTopics(new LogContext(), topology, topicMetadata);
+        final ConfiguredTopology configuredTopology = InternalTopicManager.configureTopics(new LogContext(), 0, topology, metadataImage.topics());
 
         assertEquals(Optional.empty(), configuredTopology.subtopologies());
         assertTrue(configuredTopology.topicConfigurationException().isPresent());
@@ -69,14 +70,14 @@ class InternalTopicManagerTest {
 
     @Test
     void testConfigureTopics() {
-        Map<String, TopicMetadata> topicMetadata = new HashMap<>();
-        topicMetadata.put(SOURCE_TOPIC_1, new TopicMetadata(Uuid.randomUuid(), SOURCE_TOPIC_1, 2));
-        topicMetadata.put(SOURCE_TOPIC_2, new TopicMetadata(Uuid.randomUuid(), SOURCE_TOPIC_2, 2));
-        topicMetadata.put(STATE_CHANGELOG_TOPIC_2,
-            new TopicMetadata(Uuid.randomUuid(), STATE_CHANGELOG_TOPIC_2, 2));
+        MetadataImage metadataImage = new MetadataImageBuilder()
+            .addTopic(Uuid.randomUuid(), SOURCE_TOPIC_1, 2)
+            .addTopic(Uuid.randomUuid(), SOURCE_TOPIC_2, 2)
+            .addTopic(Uuid.randomUuid(), STATE_CHANGELOG_TOPIC_2, 2)
+            .build();
         StreamsTopology topology = makeTestTopology();
 
-        ConfiguredTopology configuredTopology = InternalTopicManager.configureTopics(new LogContext(), topology, topicMetadata);
+        ConfiguredTopology configuredTopology = InternalTopicManager.configureTopics(new LogContext(), 0, topology, metadataImage.topics());
         final Map<String, CreatableTopic> internalTopicsToBeCreated = configuredTopology.internalTopicsToBeCreated();
 
         assertEquals(2, internalTopicsToBeCreated.size());


Reply via email to