This is an automated email from the ASF dual-hosted git repository.

dajac pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 03e6aa43988 KAFKA-20266: Add tracking of assignment timestamps (#21652)
03e6aa43988 is described below

commit 03e6aa4398869ae20dc58976fa6c4215f9a24bc7
Author: Sean Quah <[email protected]>
AuthorDate: Sat Mar 7 11:10:16 2026 +0000

    KAFKA-20266: Add tracking of assignment timestamps (#21652)
    
    Add Timestamp fields to TargetAssignmentMetadata records which default
    to 0 when absent. The new timestamp field records when the last target
    assignment calculation finished.
    
    When upgrading a classic group, the new timestamp field is initialized
    to 0.
    
    Reviewers: khilesh Chaganti <[email protected]>,
     Chia-Ping Tsai <[email protected]>, David Jacot <[email protected]>
---
 .../group/GroupCoordinatorRecordHelpers.java       |  26 +--
 .../coordinator/group/GroupMetadataManager.java    |  15 +-
 .../group/TargetAssignmentMetadata.java            |  42 +++++
 .../coordinator/group/modern/ModernGroup.java      |  25 ++-
 .../group/modern/TargetAssignmentBuilder.java      |  46 +++++-
 .../group/modern/consumer/ConsumerGroup.java       |  10 +-
 .../coordinator/group/modern/share/ShareGroup.java |   2 +-
 .../streams/StreamsCoordinatorRecordHelpers.java   |  17 +-
 .../coordinator/group/streams/StreamsGroup.java    |  30 ++--
 .../group/streams/TargetAssignmentBuilder.java     |  24 ++-
 ...ConsumerGroupTargetAssignmentMetadataValue.json |   5 +-
 .../ShareGroupTargetAssignmentMetadataValue.json   |   5 +-
 .../StreamsGroupTargetAssignmentMetadataValue.json |   5 +-
 .../group/GroupCoordinatorRecordHelpersTest.java   |  10 +-
 .../group/GroupMetadataManagerTest.java            | 184 ++++++++++++++-------
 .../group/GroupMetadataManagerTestContext.java     |   2 +-
 .../group/classic/ClassicGroupTest.java            |   4 +-
 .../group/modern/TargetAssignmentBuilderTest.java  |  81 +++++----
 .../modern/consumer/ConsumerGroupBuilder.java      |   9 +-
 .../group/modern/consumer/ConsumerGroupTest.java   |   8 +-
 .../group/modern/share/ShareGroupBuilder.java      |   9 +-
 .../group/modern/share/ShareGroupTest.java         |   2 +-
 .../StreamsCoordinatorRecordHelpersTest.java       |   7 +-
 .../group/streams/StreamsGroupBuilder.java         |  11 +-
 .../group/streams/StreamsGroupTest.java            |  14 +-
 .../group/streams/TargetAssignmentBuilderTest.java |  79 +++++----
 26 files changed, 467 insertions(+), 205 deletions(-)

diff --git 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorRecordHelpers.java
 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorRecordHelpers.java
index f77a092d2ab..0700b11858c 100644
--- 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorRecordHelpers.java
+++ 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorRecordHelpers.java
@@ -235,20 +235,23 @@ public class GroupCoordinatorRecordHelpers {
     /**
      * Creates a ConsumerGroupTargetAssignmentMetadata record.
      *
-     * @param groupId           The consumer group id.
-     * @param assignmentEpoch   The consumer group epoch.
+     * @param groupId             The consumer group id.
+     * @param assignmentEpoch     The consumer group epoch.
+     * @param assignmentTimestamp The time at which the target assignment 
calculation finished.
      * @return The record.
      */
-    public static CoordinatorRecord 
newConsumerGroupTargetAssignmentEpochRecord(
+    public static CoordinatorRecord 
newConsumerGroupTargetAssignmentMetadataRecord(
         String groupId,
-        int assignmentEpoch
+        int assignmentEpoch,
+        long assignmentTimestamp
     ) {
         return CoordinatorRecord.record(
             new ConsumerGroupTargetAssignmentMetadataKey()
                 .setGroupId(groupId),
             new ApiMessageAndVersion(
                 new ConsumerGroupTargetAssignmentMetadataValue()
-                    .setAssignmentEpoch(assignmentEpoch),
+                    .setAssignmentEpoch(assignmentEpoch)
+                    .setAssignmentTimestamp(assignmentTimestamp),
                 (short) 0
             )
         );
@@ -663,20 +666,23 @@ public class GroupCoordinatorRecordHelpers {
     /**
      * Creates a ShareGroupTargetAssignmentMetadata record.
      *
-     * @param groupId           The group id.
-     * @param assignmentEpoch   The group epoch.
+     * @param groupId             The group id.
+     * @param assignmentEpoch     The group epoch.
+     * @param assignmentTimestamp The time at which the target assignment 
calculation finished.
      * @return The record.
      */
-    public static CoordinatorRecord newShareGroupTargetAssignmentEpochRecord(
+    public static CoordinatorRecord 
newShareGroupTargetAssignmentMetadataRecord(
         String groupId,
-        int assignmentEpoch
+        int assignmentEpoch,
+        long assignmentTimestamp
     ) {
         return CoordinatorRecord.record(
             new ShareGroupTargetAssignmentMetadataKey()
                 .setGroupId(groupId),
             new ApiMessageAndVersion(
                 new ShareGroupTargetAssignmentMetadataValue()
-                    .setAssignmentEpoch(assignmentEpoch),
+                    .setAssignmentEpoch(assignmentEpoch)
+                    .setAssignmentTimestamp(assignmentTimestamp),
                 (short) 0
             )
         );
diff --git 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
index fa45d1c6e8a..2033e24286a 100644
--- 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
+++ 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
@@ -3821,6 +3821,7 @@ public class GroupMetadataManager {
         try {
             TargetAssignmentBuilder.ConsumerTargetAssignmentBuilder 
assignmentResultBuilder =
                 new 
TargetAssignmentBuilder.ConsumerTargetAssignmentBuilder(group.groupId(), 
groupEpoch, consumerGroupAssignors.get(preferredServerAssignor))
+                    .withTime(time)
                     .withMembers(group.members())
                     .withStaticMembers(group.staticMembers())
                     .withSubscriptionType(subscriptionType)
@@ -3890,6 +3891,7 @@ public class GroupMetadataManager {
 
             TargetAssignmentBuilder.ShareTargetAssignmentBuilder 
assignmentResultBuilder =
                 new 
TargetAssignmentBuilder.ShareTargetAssignmentBuilder(group.groupId(), 
groupEpoch, shareGroupAssignor)
+                    .withTime(time)
                     .withMembers(group.members())
                     .withSubscriptionType(subscriptionType)
                     .withTargetAssignment(group.targetAssignment())
@@ -3955,6 +3957,7 @@ public class GroupMetadataManager {
                     assignor,
                     assignmentConfigs
                 )
+                .withTime(time)
                 .withMembers(group.members())
                 .withTopology(configuredTopology)
                 .withStaticMembers(group.staticMembers())
@@ -5337,7 +5340,7 @@ public class GroupMetadataManager {
 
         if (value != null) {
             ConsumerGroup group = 
getOrMaybeCreatePersistedConsumerGroup(groupId, true);
-            group.setTargetAssignmentEpoch(value.assignmentEpoch());
+            group.setTargetAssignmentMetadata(value.assignmentEpoch(), 
value.assignmentTimestamp());
         } else {
             ConsumerGroup group;
             try {
@@ -5350,7 +5353,7 @@ public class GroupMetadataManager {
                 throw new IllegalStateException("Received a tombstone record 
to delete target assignment of " + groupId
                     + " but the assignment still has " + 
group.targetAssignment().size() + " members.");
             }
-            group.setTargetAssignmentEpoch(-1);
+            group.setTargetAssignmentMetadata(-1, 0L);
         }
     }
 
@@ -5653,7 +5656,7 @@ public class GroupMetadataManager {
 
         if (value != null) {
             StreamsGroup streamsGroup = 
getOrMaybeCreatePersistedStreamsGroup(groupId, true);
-            streamsGroup.setTargetAssignmentEpoch(value.assignmentEpoch());
+            streamsGroup.setTargetAssignmentMetadata(value.assignmentEpoch(), 
value.assignmentTimestamp());
         } else {
             StreamsGroup streamsGroup;
             try {
@@ -5666,7 +5669,7 @@ public class GroupMetadataManager {
                 throw new IllegalStateException("Received a tombstone record 
to delete target assignment of " + groupId
                     + " but the assignment still has " + 
streamsGroup.targetAssignment().size() + " members.");
             }
-            streamsGroup.setTargetAssignmentEpoch(-1);
+            streamsGroup.setTargetAssignmentMetadata(-1, 0L);
         }
     }
 
@@ -5801,13 +5804,13 @@ public class GroupMetadataManager {
         }
 
         if (value != null) {
-            group.setTargetAssignmentEpoch(value.assignmentEpoch());
+            group.setTargetAssignmentMetadata(value.assignmentEpoch(), 
value.assignmentTimestamp());
         } else {
             if (!group.targetAssignment().isEmpty()) {
                 throw new IllegalStateException("Received a tombstone record 
to delete target assignment of " + groupId
                         + " but the assignment still has " + 
group.targetAssignment().size() + " members.");
             }
-            group.setTargetAssignmentEpoch(-1);
+            group.setTargetAssignmentMetadata(-1, 0L);
         }
     }
 
diff --git 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/TargetAssignmentMetadata.java
 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/TargetAssignmentMetadata.java
new file mode 100644
index 00000000000..ce9d6b30843
--- /dev/null
+++ 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/TargetAssignmentMetadata.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group;
+
+/**
+ * The target assignment metadata.
+ *
+ * @param assignmentEpoch     The target assignment epoch. An assignment epoch 
smaller than the
+ *                            group epoch means that a new assignment is 
required. The
+ *                            assignment epoch is updated when a new 
assignment is installed.
+ * @param assignmentTimestamp The time at which the target assignment 
calculation finished.
+ */
+public record TargetAssignmentMetadata(int assignmentEpoch, long 
assignmentTimestamp) {
+    /**
+     * The initial target assignment metadata for groups.
+     * This is different to tombstoned assignment metadata which has an 
assignment epoch of -1.
+     */
+    public static final TargetAssignmentMetadata ZERO = new 
TargetAssignmentMetadata(0, 0L);
+
+    public TargetAssignmentMetadata {
+        if (assignmentEpoch < 0 && assignmentEpoch != -1) {
+            throw new IllegalArgumentException("The assignment epoch must be 
non-negative or -1.");
+        }
+        if (assignmentTimestamp < 0) {
+            throw new IllegalArgumentException("The assignment timestamp must 
be non-negative.");
+        }
+    }
+}
diff --git 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/ModernGroup.java
 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/ModernGroup.java
index 1fa7ef24e95..6e882b29d09 100644
--- 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/ModernGroup.java
+++ 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/ModernGroup.java
@@ -21,6 +21,7 @@ import 
org.apache.kafka.common.errors.UnknownMemberIdException;
 import org.apache.kafka.common.message.ListGroupsResponseData;
 import org.apache.kafka.coordinator.common.runtime.CoordinatorMetadataImage;
 import org.apache.kafka.coordinator.group.Group;
+import org.apache.kafka.coordinator.group.TargetAssignmentMetadata;
 import org.apache.kafka.coordinator.group.Utils;
 import org.apache.kafka.coordinator.group.api.assignor.SubscriptionType;
 import org.apache.kafka.timeline.SnapshotRegistry;
@@ -96,11 +97,9 @@ public abstract class ModernGroup<T extends 
ModernGroupMember> implements Group
     protected final TimelineObject<SubscriptionType> subscriptionType;
 
     /**
-     * The target assignment epoch. An assignment epoch smaller than the group 
epoch
-     * means that a new assignment is required. The assignment epoch is 
updated when
-     * a new assignment is installed.
+     * The target assignment metadata.
      */
-    protected final TimelineInteger targetAssignmentEpoch;
+    protected final TimelineObject<TargetAssignmentMetadata> 
targetAssignmentMetadata;
 
     /**
      * The target assignment per member id.
@@ -136,7 +135,7 @@ public abstract class ModernGroup<T extends 
ModernGroupMember> implements Group
         this.subscribedTopicNames = new TimelineHashMap<>(snapshotRegistry, 0);
         this.metadataHash = new TimelineLong(snapshotRegistry);
         this.subscriptionType = new TimelineObject<>(snapshotRegistry, 
HOMOGENEOUS);
-        this.targetAssignmentEpoch = new TimelineInteger(snapshotRegistry);
+        this.targetAssignmentMetadata = new TimelineObject<>(snapshotRegistry, 
TargetAssignmentMetadata.ZERO);
         this.targetAssignment = new TimelineHashMap<>(snapshotRegistry, 0);
         this.invertedTargetAssignment = new 
TimelineHashMap<>(snapshotRegistry, 0);
     }
@@ -181,16 +180,24 @@ public abstract class ModernGroup<T extends 
ModernGroupMember> implements Group
      * @return The target assignment epoch.
      */
     public int assignmentEpoch() {
-        return targetAssignmentEpoch.get();
+        return targetAssignmentMetadata.get().assignmentEpoch();
     }
 
     /**
-     * Sets the assignment epoch.
+     * @return The time at which the target assignment calculation finished.
+     */
+    public long assignmentTimestamp() {
+        return targetAssignmentMetadata.get().assignmentTimestamp();
+    }
+
+    /**
+     * Sets the assignment metadata.
      *
      * @param targetAssignmentEpoch The new assignment epoch.
+     * @param targetAssignmentTimestamp The time at which the assignment 
calculation finished.
      */
-    public void setTargetAssignmentEpoch(int targetAssignmentEpoch) {
-        this.targetAssignmentEpoch.set(targetAssignmentEpoch);
+    public void setTargetAssignmentMetadata(int targetAssignmentEpoch, long 
targetAssignmentTimestamp) {
+        this.targetAssignmentMetadata.set(new 
TargetAssignmentMetadata(targetAssignmentEpoch, targetAssignmentTimestamp));
         maybeUpdateGroupState();
     }
 
diff --git 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/TargetAssignmentBuilder.java
 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/TargetAssignmentBuilder.java
index d08c0342ce2..86c64e5c22d 100644
--- 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/TargetAssignmentBuilder.java
+++ 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/TargetAssignmentBuilder.java
@@ -17,6 +17,7 @@
 package org.apache.kafka.coordinator.group.modern;
 
 import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.coordinator.common.runtime.CoordinatorMetadataImage;
 import org.apache.kafka.coordinator.common.runtime.CoordinatorRecord;
 import org.apache.kafka.coordinator.group.GroupCoordinatorRecordHelpers;
@@ -139,10 +140,15 @@ public abstract class TargetAssignmentBuilder<T extends 
ModernGroupMember, U ext
         }
 
         @Override
-        protected CoordinatorRecord newTargetAssignmentEpochRecord(String 
groupId, int assignmentEpoch) {
-            return 
GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentEpochRecord(
+        protected CoordinatorRecord newTargetAssignmentMetadataRecord(
+            String groupId,
+            int assignmentEpoch,
+            long assignmentTimestamp
+        ) {
+            return 
GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentMetadataRecord(
                 groupId,
-                assignmentEpoch
+                assignmentEpoch,
+                assignmentTimestamp
             );
         }
 
@@ -208,10 +214,15 @@ public abstract class TargetAssignmentBuilder<T extends 
ModernGroupMember, U ext
         }
 
         @Override
-        protected CoordinatorRecord newTargetAssignmentEpochRecord(String 
groupId, int assignmentEpoch) {
-            return 
GroupCoordinatorRecordHelpers.newShareGroupTargetAssignmentEpochRecord(
+        protected CoordinatorRecord newTargetAssignmentMetadataRecord(
+            String groupId,
+            int assignmentEpoch,
+            long assignmentTimestamp
+        ) {
+            return 
GroupCoordinatorRecordHelpers.newShareGroupTargetAssignmentMetadataRecord(
                 groupId,
-                assignmentEpoch
+                assignmentEpoch,
+                assignmentTimestamp
             );
         }
 
@@ -230,6 +241,11 @@ public abstract class TargetAssignmentBuilder<T extends 
ModernGroupMember, U ext
         }
     }
 
+    /**
+     * The time.
+     */
+    private Time time;
+
     /**
      * The group id.
      */
@@ -304,6 +320,17 @@ public abstract class TargetAssignmentBuilder<T extends 
ModernGroupMember, U ext
         this.assignor = Objects.requireNonNull(assignor);
     }
 
+    /**
+     * Sets the time.
+     *
+     * @param time The time.
+     * @return This object.
+     */
+    public U withTime(Time time) {
+        this.time = time;
+        return self();
+    }
+
     /**
      * Adds all the existing members.
      *
@@ -491,7 +518,7 @@ public abstract class TargetAssignmentBuilder<T extends 
ModernGroupMember, U ext
         }
 
         // Bump the target assignment epoch.
-        records.add(newTargetAssignmentEpochRecord(groupId, groupEpoch));
+        records.add(newTargetAssignmentMetadataRecord(groupId, groupEpoch, 
time.milliseconds()));
 
         return new TargetAssignmentResult(records, 
newGroupAssignment.members());
     }
@@ -504,9 +531,10 @@ public abstract class TargetAssignmentBuilder<T extends 
ModernGroupMember, U ext
         Map<Uuid, Set<Integer>> partitions
     );
 
-    protected abstract CoordinatorRecord newTargetAssignmentEpochRecord(
+    protected abstract CoordinatorRecord newTargetAssignmentMetadataRecord(
         String groupId,
-        int assignmentEpoch
+        int assignmentEpoch,
+        long timestampMs
     );
 
     protected abstract MemberSubscriptionAndAssignmentImpl 
newMemberSubscriptionAndAssignment(
diff --git 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/consumer/ConsumerGroup.java
 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/consumer/ConsumerGroup.java
index 8aa858ced02..9e6de2ea24d 100644
--- 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/consumer/ConsumerGroup.java
+++ 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/consumer/ConsumerGroup.java
@@ -896,11 +896,11 @@ public class ConsumerGroup extends 
ModernGroup<ConsumerGroupMember> {
         ConsumerGroupState newState = STABLE;
         if (members.isEmpty()) {
             newState = EMPTY;
-        } else if (groupEpoch.get() > targetAssignmentEpoch.get()) {
+        } else if (groupEpoch.get() > assignmentEpoch()) {
             newState = ASSIGNING;
         } else {
             for (ModernGroupMember member : members.values()) {
-                if (!member.isReconciledTo(targetAssignmentEpoch.get())) {
+                if (!member.isReconciledTo(assignmentEpoch())) {
                     newState = RECONCILING;
                     break;
                 }
@@ -1121,7 +1121,7 @@ public class ConsumerGroup extends 
ModernGroup<ConsumerGroupMember> {
             
.setAssignorName(preferredServerAssignor(committedOffset).orElse(defaultAssignor))
             .setGroupEpoch(groupEpoch.get(committedOffset))
             .setGroupState(state.get(committedOffset).toString())
-            .setAssignmentEpoch(targetAssignmentEpoch.get(committedOffset));
+            
.setAssignmentEpoch(targetAssignmentMetadata.get(committedOffset).assignmentEpoch());
         members.entrySet(committedOffset).forEach(
             entry -> describedGroup.members().add(
                 entry.getValue().asConsumerGroupDescribeMember(
@@ -1156,7 +1156,7 @@ public class ConsumerGroup extends 
ModernGroup<ConsumerGroupMember> {
         String groupId = classicGroup.groupId();
         ConsumerGroup consumerGroup = new ConsumerGroup(logContext, 
snapshotRegistry, groupId);
         consumerGroup.setGroupEpoch(classicGroup.generationId());
-        consumerGroup.setTargetAssignmentEpoch(classicGroup.generationId());
+        consumerGroup.setTargetAssignmentMetadata(classicGroup.generationId(), 
0L);
 
         classicGroup.allMembers().forEach(classicGroupMember -> {
             // The assigned partition can be empty if the member just joined 
and has never synced.
@@ -1238,7 +1238,7 @@ public class ConsumerGroup extends 
ModernGroup<ConsumerGroupMember> {
             ))
         );
 
-        
records.add(GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentEpochRecord(groupId(),
 groupEpoch()));
+        
records.add(GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentMetadataRecord(groupId(),
 assignmentEpoch(), assignmentTimestamp()));
 
         members().forEach((__, consumerGroupMember) ->
             
records.add(GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentRecord(groupId(),
 consumerGroupMember))
diff --git 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/share/ShareGroup.java
 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/share/ShareGroup.java
index 57c9cf5cb65..4d8be291743 100644
--- 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/share/ShareGroup.java
+++ 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/share/ShareGroup.java
@@ -313,7 +313,7 @@ public class ShareGroup extends 
ModernGroup<ShareGroupMember> {
             .setAssignorName(defaultAssignor)
             .setGroupEpoch(groupEpoch.get(committedOffset))
             .setGroupState(state.get(committedOffset).toString())
-            .setAssignmentEpoch(targetAssignmentEpoch.get(committedOffset));
+            
.setAssignmentEpoch(targetAssignmentMetadata.get(committedOffset).assignmentEpoch());
         members.entrySet(committedOffset).forEach(
             entry -> describedGroup.members().add(
                 entry.getValue().asShareGroupDescribeMember(
diff --git 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/StreamsCoordinatorRecordHelpers.java
 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/StreamsCoordinatorRecordHelpers.java
index 516dc44b0d2..1cac2849d8e 100644
--- 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/StreamsCoordinatorRecordHelpers.java
+++ 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/StreamsCoordinatorRecordHelpers.java
@@ -215,10 +215,18 @@ public class StreamsCoordinatorRecordHelpers {
         );
     }
 
-
-    public static CoordinatorRecord newStreamsGroupTargetAssignmentEpochRecord(
+    /**
+     * Creates a StreamsGroupTargetAssignmentMetadata record.
+     *
+     * @param groupId             The streams group id.
+     * @param assignmentEpoch     The assignment epoch.
+     * @param assignmentTimestamp The time at which the target assignment 
calculation finished.
+     * @return The record.
+     */
+    public static CoordinatorRecord 
newStreamsGroupTargetAssignmentMetadataRecord(
         String groupId,
-        int assignmentEpoch
+        int assignmentEpoch,
+        long assignmentTimestamp
     ) {
         Objects.requireNonNull(groupId, "groupId should not be null here");
 
@@ -227,7 +235,8 @@ public class StreamsCoordinatorRecordHelpers {
                 .setGroupId(groupId),
             new ApiMessageAndVersion(
                 new StreamsGroupTargetAssignmentMetadataValue()
-                    .setAssignmentEpoch(assignmentEpoch),
+                    .setAssignmentEpoch(assignmentEpoch)
+                    .setAssignmentTimestamp(assignmentTimestamp),
                 (short) 0
             )
         );
diff --git 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/StreamsGroup.java
 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/StreamsGroup.java
index b59081f7ea3..2e62f306749 100644
--- 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/StreamsGroup.java
+++ 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/StreamsGroup.java
@@ -33,6 +33,7 @@ import 
org.apache.kafka.coordinator.group.CommitPartitionValidator;
 import org.apache.kafka.coordinator.group.Group;
 import org.apache.kafka.coordinator.group.OffsetExpirationCondition;
 import org.apache.kafka.coordinator.group.OffsetExpirationConditionImpl;
+import org.apache.kafka.coordinator.group.TargetAssignmentMetadata;
 import org.apache.kafka.coordinator.group.Utils;
 import org.apache.kafka.coordinator.group.generated.StreamsGroupTopologyValue;
 import org.apache.kafka.coordinator.group.streams.topics.ConfiguredTopology;
@@ -161,10 +162,9 @@ public class StreamsGroup implements Group {
     protected final TimelineLong metadataHash;
 
     /**
-     * The target assignment epoch. An assignment epoch smaller than the group 
epoch means that a new assignment is required. The assignment
-     * epoch is updated when a new assignment is installed.
+     * The target assignment metadata.
      */
-    private final TimelineInteger targetAssignmentEpoch;
+    private final TimelineObject<TargetAssignmentMetadata> 
targetAssignmentMetadata;
 
     /**
      * The target assignment per member ID.
@@ -241,7 +241,7 @@ public class StreamsGroup implements Group {
         this.staticMembers = new TimelineHashMap<>(snapshotRegistry, 0);
         this.validatedTopologyEpoch = new TimelineInteger(snapshotRegistry);
         this.metadataHash = new TimelineLong(snapshotRegistry);
-        this.targetAssignmentEpoch = new TimelineInteger(snapshotRegistry);
+        this.targetAssignmentMetadata = new TimelineObject<>(snapshotRegistry, 
TargetAssignmentMetadata.ZERO);
         this.targetAssignment = new TimelineHashMap<>(snapshotRegistry, 0);
         this.currentActiveTaskToProcessId = new 
TimelineHashMap<>(snapshotRegistry, 0);
         this.currentStandbyTaskToProcessIds = new 
TimelineHashMap<>(snapshotRegistry, 0);
@@ -340,16 +340,24 @@ public class StreamsGroup implements Group {
      * @return The target assignment epoch.
      */
     public int assignmentEpoch() {
-        return targetAssignmentEpoch.get();
+        return targetAssignmentMetadata.get().assignmentEpoch();
     }
 
     /**
-     * Sets the assignment epoch.
+     * @return The time at which the target assignment calculation finished.
+     */
+    public long assignmentTimestamp() {
+        return targetAssignmentMetadata.get().assignmentTimestamp();
+    }
+
+    /**
+     * Sets the assignment metadata.
      *
      * @param targetAssignmentEpoch The new assignment epoch.
+     * @param targetAssignmentTimestamp The time at which the assignment 
calculation finished.
      */
-    public void setTargetAssignmentEpoch(int targetAssignmentEpoch) {
-        this.targetAssignmentEpoch.set(targetAssignmentEpoch);
+    public void setTargetAssignmentMetadata(int targetAssignmentEpoch, long 
targetAssignmentTimestamp) {
+        this.targetAssignmentMetadata.set(new 
TargetAssignmentMetadata(targetAssignmentEpoch, targetAssignmentTimestamp));
         maybeUpdateGroupState();
     }
 
@@ -896,11 +904,11 @@ public class StreamsGroup implements Group {
             clearShutdownRequestMemberId();
         } else if (topology().filter(t -> t.topologyEpoch() == 
validatedTopologyEpoch.get()).isEmpty()) {
             newState = NOT_READY;
-        } else if (groupEpoch.get() > targetAssignmentEpoch.get()) {
+        } else if (groupEpoch.get() > assignmentEpoch()) {
             newState = ASSIGNING;
         } else {
             for (StreamsGroupMember member : members.values()) {
-                if (!member.isReconciledTo(targetAssignmentEpoch.get())) {
+                if (!member.isReconciledTo(assignmentEpoch())) {
                     newState = RECONCILING;
                     break;
                 }
@@ -1094,7 +1102,7 @@ public class StreamsGroup implements Group {
             .setGroupId(groupId)
             .setGroupEpoch(groupEpoch.get(committedOffset))
             .setGroupState(state.get(committedOffset).toString())
-            .setAssignmentEpoch(targetAssignmentEpoch.get(committedOffset))
+            
.setAssignmentEpoch(targetAssignmentMetadata.get(committedOffset).assignmentEpoch())
             .setTopology(
                 configuredTopology.get(committedOffset)
                     .filter(ConfiguredTopology::isReady)
diff --git 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/TargetAssignmentBuilder.java
 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/TargetAssignmentBuilder.java
index 809489907ba..f00f629dd08 100644
--- 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/TargetAssignmentBuilder.java
+++ 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/TargetAssignmentBuilder.java
@@ -16,6 +16,7 @@
  */
 package org.apache.kafka.coordinator.group.streams;
 
+import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.coordinator.common.runtime.CoordinatorMetadataImage;
 import org.apache.kafka.coordinator.common.runtime.CoordinatorRecord;
 import 
org.apache.kafka.coordinator.group.streams.assignor.AssignmentMemberSpec;
@@ -46,6 +47,11 @@ import java.util.stream.Collectors;
  */
 public class TargetAssignmentBuilder {
 
+    /**
+     * The time.
+     */
+    private Time time;
+
     /**
      * The group ID.
      */
@@ -131,6 +137,17 @@ public class TargetAssignmentBuilder {
         );
     }
 
+    /**
+     * Sets the time.
+     *
+     * @param time The time.
+     * @return This object.
+     */
+    public TargetAssignmentBuilder withTime(Time time) {
+        this.time = time;
+        return this;
+    }
+
     /**
      * Adds all the existing members.
      *
@@ -196,7 +213,6 @@ public class TargetAssignmentBuilder {
         return this;
     }
 
-
     /**
      * Adds or updates a member. This is useful when the updated member is not 
yet materialized in memory.
      *
@@ -313,7 +329,11 @@ public class TargetAssignmentBuilder {
         });
 
         // Bump the target assignment epoch.
-        
records.add(StreamsCoordinatorRecordHelpers.newStreamsGroupTargetAssignmentEpochRecord(groupId,
 groupEpoch));
+        
records.add(StreamsCoordinatorRecordHelpers.newStreamsGroupTargetAssignmentMetadataRecord(
+            groupId,
+            groupEpoch,
+            time.milliseconds()
+        ));
 
         return new TargetAssignmentResult(records, newTargetAssignment);
     }
diff --git 
a/group-coordinator/src/main/resources/common/message/ConsumerGroupTargetAssignmentMetadataValue.json
 
b/group-coordinator/src/main/resources/common/message/ConsumerGroupTargetAssignmentMetadataValue.json
index 939794e1e5c..d59d809112b 100644
--- 
a/group-coordinator/src/main/resources/common/message/ConsumerGroupTargetAssignmentMetadataValue.json
+++ 
b/group-coordinator/src/main/resources/common/message/ConsumerGroupTargetAssignmentMetadataValue.json
@@ -21,6 +21,9 @@
   "flexibleVersions": "0+",
   "fields": [
     { "name": "AssignmentEpoch", "versions": "0+", "type": "int32",
-      "about": "The assignment epoch." }
+      "about": "The assignment epoch." },
+    // Added in 4.3 (KIP-1263).
+    { "name": "AssignmentTimestamp", "versions": "0+", "taggedVersions": "0+", 
"tag": 0, "type": "int64", "default": 0, "ignorable": true,
+      "about": "The time at which the assignment calculation finished." }
   ]
 }
diff --git 
a/group-coordinator/src/main/resources/common/message/ShareGroupTargetAssignmentMetadataValue.json
 
b/group-coordinator/src/main/resources/common/message/ShareGroupTargetAssignmentMetadataValue.json
index 284c28faf5c..112df37fa75 100644
--- 
a/group-coordinator/src/main/resources/common/message/ShareGroupTargetAssignmentMetadataValue.json
+++ 
b/group-coordinator/src/main/resources/common/message/ShareGroupTargetAssignmentMetadataValue.json
@@ -21,6 +21,9 @@
   "flexibleVersions": "0+",
   "fields": [
     { "name": "AssignmentEpoch", "versions": "0+", "type": "int32",
-      "about": "The assignment epoch." }
+      "about": "The assignment epoch." },
+    // Added in 4.3 (KIP-1263).
+    { "name": "AssignmentTimestamp", "versions": "0+", "taggedVersions": "0+", 
"tag": 0, "type": "int64", "default": 0, "ignorable": true,
+      "about": "The time at which the assignment calculation finished." }
   ]
 }
diff --git 
a/group-coordinator/src/main/resources/common/message/StreamsGroupTargetAssignmentMetadataValue.json
 
b/group-coordinator/src/main/resources/common/message/StreamsGroupTargetAssignmentMetadataValue.json
index 86143bd5a83..491b70b3007 100644
--- 
a/group-coordinator/src/main/resources/common/message/StreamsGroupTargetAssignmentMetadataValue.json
+++ 
b/group-coordinator/src/main/resources/common/message/StreamsGroupTargetAssignmentMetadataValue.json
@@ -21,6 +21,9 @@
   "flexibleVersions": "0+",
   "fields": [
     { "name": "AssignmentEpoch", "versions": "0+", "type": "int32",
-      "about": "The assignment epoch." }
+      "about": "The assignment epoch." },
+    // Added in 4.3 (KIP-1263).
+    { "name": "AssignmentTimestamp", "versions": "0+", "taggedVersions": "0+", 
"tag": 0, "type": "int64", "default": 0, "ignorable": true,
+      "about": "The time at which the assignment calculation finished." }
   ]
 }
diff --git 
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorRecordHelpersTest.java
 
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorRecordHelpersTest.java
index 38ee8501ee2..2f52046a7a5 100644
--- 
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorRecordHelpersTest.java
+++ 
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorRecordHelpersTest.java
@@ -79,8 +79,8 @@ import static 
org.apache.kafka.coordinator.group.GroupCoordinatorRecordHelpers.n
 import static 
org.apache.kafka.coordinator.group.GroupCoordinatorRecordHelpers.newConsumerGroupMemberSubscriptionRecord;
 import static 
org.apache.kafka.coordinator.group.GroupCoordinatorRecordHelpers.newConsumerGroupMemberSubscriptionTombstoneRecord;
 import static 
org.apache.kafka.coordinator.group.GroupCoordinatorRecordHelpers.newConsumerGroupSubscriptionMetadataTombstoneRecord;
-import static 
org.apache.kafka.coordinator.group.GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentEpochRecord;
 import static 
org.apache.kafka.coordinator.group.GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentEpochTombstoneRecord;
+import static 
org.apache.kafka.coordinator.group.GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentMetadataRecord;
 import static 
org.apache.kafka.coordinator.group.GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentRecord;
 import static 
org.apache.kafka.coordinator.group.GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentTombstoneRecord;
 import static 
org.apache.kafka.coordinator.group.GroupCoordinatorRecordHelpers.newShareGroupEpochRecord;
@@ -312,14 +312,16 @@ public class GroupCoordinatorRecordHelpersTest {
                 .setGroupId("group-id"),
             new ApiMessageAndVersion(
                 new ConsumerGroupTargetAssignmentMetadataValue()
-                    .setAssignmentEpoch(10),
+                    .setAssignmentEpoch(10)
+                    .setAssignmentTimestamp(12345L),
                 (short) 0
             )
         );
 
-        assertEquals(expectedRecord, 
newConsumerGroupTargetAssignmentEpochRecord(
+        assertEquals(expectedRecord, 
newConsumerGroupTargetAssignmentMetadataRecord(
             "group-id",
-            10
+            10,
+            12345L
         ));
     }
 
diff --git 
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java
 
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java
index 91147aed051..a4e7b168c9a 100644
--- 
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java
+++ 
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java
@@ -429,7 +429,7 @@ public class GroupMetadataManagerTest {
             mkTopicAssignment(fooTopicId, 1, 2, 3)
         )));
 
-        
context.replay(GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentEpochRecord(groupId,
 100));
+        
context.replay(GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentMetadataRecord(groupId,
 100, 12345L));
 
         
context.replay(GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentRecord(groupId,
 member));
 
@@ -519,7 +519,7 @@ public class GroupMetadataManagerTest {
         
context.replay(GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentRecord(groupId,
 memberId, mkAssignment(
             mkTopicAssignment(fooTopicId, 0, 1, 2)
         )));
-        
context.replay(GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentEpochRecord(groupId,
 100));
+        
context.replay(GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentMetadataRecord(groupId,
 100, 12345L));
         
context.replay(GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentRecord(groupId,
 member));
 
         assertEquals(MemberState.STABLE, 
context.consumerGroupMemberState(groupId, memberId));
@@ -593,7 +593,7 @@ public class GroupMetadataManagerTest {
         
context.replay(GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentRecord(groupId,
 memberId, mkAssignment(
             mkTopicAssignment(fooTopicId, 0, 1)
         )));
-        
context.replay(GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentEpochRecord(groupId,
 101));
+        
context.replay(GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentMetadataRecord(groupId,
 101, 12345L));
         
context.replay(GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentRecord(groupId,
 member));
 
         assertEquals(MemberState.UNREVOKED_PARTITIONS, 
context.consumerGroupMemberState(groupId, memberId));
@@ -663,7 +663,7 @@ public class GroupMetadataManagerTest {
         
context.replay(GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentRecord(groupId,
 memberId, mkAssignment(
             mkTopicAssignment(fooTopicId, 0, 1, 2)
         )));
-        
context.replay(GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentEpochRecord(groupId,
 100));
+        
context.replay(GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentMetadataRecord(groupId,
 100, 12345L));
         
context.replay(GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentRecord(groupId,
 member));
 
         assertEquals(MemberState.UNRELEASED_PARTITIONS, 
context.consumerGroupMemberState(groupId, memberId));
@@ -734,7 +734,7 @@ public class GroupMetadataManagerTest {
         
context.replay(GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentRecord(groupId,
 memberId, mkAssignment(
             mkTopicAssignment(fooTopicId, 0, 1, 2)
         )));
-        
context.replay(GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentEpochRecord(groupId,
 100));
+        
context.replay(GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentMetadataRecord(groupId,
 100, 12345L));
         
context.replay(GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentRecord(groupId,
 member));
 
         assertEquals(MemberState.STABLE, 
context.consumerGroupMemberState(groupId, memberId));
@@ -821,7 +821,7 @@ public class GroupMetadataManagerTest {
         
context.replay(GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentRecord(groupId,
 memberId, mkAssignment(
             mkTopicAssignment(fooTopicId, 0, 1)
         )));
-        
context.replay(GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentEpochRecord(groupId,
 101));
+        
context.replay(GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentMetadataRecord(groupId,
 101, 12345L));
         
context.replay(GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentRecord(groupId,
 member));
 
         assertEquals(MemberState.UNREVOKED_PARTITIONS, 
context.consumerGroupMemberState(groupId, memberId));
@@ -926,7 +926,7 @@ public class GroupMetadataManagerTest {
             mkTopicAssignment(fooTopicId, 0, 1, 2)
         )));
         
context.replay(GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentRecord(groupId,
 memberId2, mkAssignment()));
-        
context.replay(GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentEpochRecord(groupId,
 100));
+        
context.replay(GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentMetadataRecord(groupId,
 100, 12345L));
         
context.replay(GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentRecord(groupId,
 member1));
         
context.replay(GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentRecord(groupId,
 member2));
 
@@ -1016,7 +1016,7 @@ public class GroupMetadataManagerTest {
         
context.replay(GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentRecord(groupId,
 memberId, mkAssignment(
             mkTopicAssignment(fooTopicId, 0, 1)
         )));
-        
context.replay(GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentEpochRecord(groupId,
 101));
+        
context.replay(GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentMetadataRecord(groupId,
 101, 12345L));
         
context.replay(GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentRecord(groupId,
 member));
 
         assertEquals(MemberState.UNREVOKED_PARTITIONS, 
context.consumerGroupMemberState(groupId, memberId));
@@ -1101,7 +1101,7 @@ public class GroupMetadataManagerTest {
         
context.replay(GroupCoordinatorRecordHelpers.newShareGroupTargetAssignmentRecord(groupId,
 memberId, mkAssignment(
             mkTopicAssignment(fooTopicId, 0, 1, 2)
         )));
-        
context.replay(GroupCoordinatorRecordHelpers.newShareGroupTargetAssignmentEpochRecord(groupId,
 100));
+        
context.replay(GroupCoordinatorRecordHelpers.newShareGroupTargetAssignmentMetadataRecord(groupId,
 100, 12345L));
         
context.replay(GroupCoordinatorRecordHelpers.newShareGroupCurrentAssignmentRecord(groupId,
 member));
 
         // Member rejoins with epoch=0 - should succeed.
@@ -1170,7 +1170,7 @@ public class GroupMetadataManagerTest {
             TaskAssignmentTestUtil.mkTasksTuple(TaskRole.ACTIVE,
                 TaskAssignmentTestUtil.mkTasks(subtopology1, 0, 1, 2)
             )));
-        
context.replay(StreamsCoordinatorRecordHelpers.newStreamsGroupTargetAssignmentEpochRecord(groupId,
 100));
+        
context.replay(StreamsCoordinatorRecordHelpers.newStreamsGroupTargetAssignmentMetadataRecord(groupId,
 100, 12345L));
         
context.replay(StreamsCoordinatorRecordHelpers.newStreamsGroupCurrentAssignmentRecord(groupId,
 member));
 
         // Member rejoins with epoch=0 - should succeed per KIP-848.
@@ -1286,7 +1286,7 @@ public class GroupMetadataManagerTest {
                 mkTopicAssignment(fooTopicId, 0, 1, 2, 3, 4, 5),
                 mkTopicAssignment(barTopicId, 0, 1, 2)
             )),
-            
GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentEpochRecord(groupId,
 1),
+            
GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentMetadataRecord(groupId,
 1, context.time.milliseconds()),
             
GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentRecord(groupId, 
expectedMember)
         );
 
@@ -1365,7 +1365,7 @@ public class GroupMetadataManagerTest {
             
GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentRecord(groupId, 
memberId, mkAssignment(
                 mkTopicAssignment(fooTopicId, 0, 1, 2, 3, 4, 5)
             )),
-            
GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentEpochRecord(groupId,
 1),
+            
GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentMetadataRecord(groupId,
 1, context.time.milliseconds()),
             
GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentRecord(groupId, 
expectedMember)
         );
 
@@ -1492,7 +1492,7 @@ public class GroupMetadataManagerTest {
                 mkTopicAssignment(fooTopicId, 0, 1, 2, 3, 4, 5),
                 mkTopicAssignment(barTopicId, 0, 1, 2)
             )),
-            
GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentEpochRecord(groupId,
 11),
+            
GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentMetadataRecord(groupId,
 11, context.time.milliseconds()),
             
GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentRecord(groupId, 
expectedMember)
         );
 
@@ -1584,7 +1584,7 @@ public class GroupMetadataManagerTest {
             GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord(groupId, 
11, computeGroupHash(Map.of(
                 fooTopicName, computeTopicHash(fooTopicName, newMetadataImage)
             ))),
-            
GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentEpochRecord(groupId,
 11),
+            
GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentMetadataRecord(groupId,
 11, context.time.milliseconds()),
             
GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentRecord(groupId, 
expectedMember)
         );
 
@@ -1674,7 +1674,7 @@ public class GroupMetadataManagerTest {
         List<CoordinatorRecord> expectedRecords = List.of(
             GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord(groupId, 
11, 0),
             
GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentRecord(groupId, 
memberId, Map.of()),
-            
GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentEpochRecord(groupId,
 11),
+            
GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentMetadataRecord(groupId,
 11, context.time.milliseconds()),
             
GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentRecord(groupId, 
expectedMember)
         );
 
@@ -1764,7 +1764,7 @@ public class GroupMetadataManagerTest {
                 fooTopicName, computeTopicHash(fooTopicName, new 
KRaftCoordinatorMetadataImage(metadataImage))
             ))),
             
GroupCoordinatorRecordHelpers.newConsumerGroupSubscriptionMetadataTombstoneRecord(groupId),
-            
GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentEpochRecord(groupId,
 11),
+            
GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentMetadataRecord(groupId,
 11, context.time.milliseconds()),
             
GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentRecord(groupId, 
expectedMember)
         );
 
@@ -1899,7 +1899,7 @@ public class GroupMetadataManagerTest {
                         mkTopicAssignment(barTopicId, 2)
                     ))
                 ),
-                
List.of(GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentEpochRecord(groupId,
 11)),
+                
List.of(GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentMetadataRecord(groupId,
 11, context.time.milliseconds())),
                 
List.of(GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentRecord(groupId,
 expectedMember3))
             ),
             result.records()
@@ -2139,7 +2139,7 @@ public class GroupMetadataManagerTest {
                         mkTopicAssignment(barTopicId, 2)
                     ))
                 ),
-                
List.of(GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentEpochRecord(groupId,
 11)),
+                
List.of(GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentMetadataRecord(groupId,
 11, context.time.milliseconds())),
                 
List.of(GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentRecord(groupId,
 expectedMember3))
             ),
             result.records()
@@ -2504,7 +2504,7 @@ public class GroupMetadataManagerTest {
                 mkTopicAssignment(fooTopicId, 3, 4, 5),
                 mkTopicAssignment(barTopicId, 0, 1, 2)
             )),
-            
GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentEpochRecord(groupId,
 11),
+            
GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentMetadataRecord(groupId,
 11, context.time.milliseconds()),
             
GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentRecord(groupId, 
expectedRejoinedMember)
         );
 
@@ -2909,7 +2909,7 @@ public class GroupMetadataManagerTest {
             mkTopicAssignment(fooTopicId, 1, 2, 3)
         )));
 
-        
context.replay(GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentEpochRecord(groupId,
 100));
+        
context.replay(GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentMetadataRecord(groupId,
 100, 12345L));
 
         
context.replay(GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentRecord(groupId,
 member));
 
@@ -3726,7 +3726,7 @@ public class GroupMetadataManagerTest {
 
         
context.replay(GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentRecord(groupId,
 memberId1, mkAssignment(
             mkTopicAssignment(fooTopicId, 1, 2, 3))));
-        
context.replay(GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentEpochRecord(groupId,
 11));
+        
context.replay(GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentMetadataRecord(groupId,
 11, 12345L));
 
         assertEquals(ConsumerGroup.ConsumerGroupState.RECONCILING, 
context.consumerGroupState(groupId));
 
@@ -3885,7 +3885,7 @@ public class GroupMetadataManagerTest {
             
GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentRecord(groupId, 
memberId, mkAssignment(
                 mkTopicAssignment(fooTopicId, 0, 1, 2, 3, 4, 5)
             )),
-            
GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentEpochRecord(groupId,
 11),
+            
GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentMetadataRecord(groupId,
 11, context.time.milliseconds()),
             
GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentRecord(groupId, 
expectedMember)
         );
 
@@ -4013,7 +4013,7 @@ public class GroupMetadataManagerTest {
             
GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentRecord(groupId, 
memberId, mkAssignment(
                 mkTopicAssignment(fooTopicId, 0, 1, 2, 3, 4, 5)
             )),
-            
GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentEpochRecord(groupId,
 11),
+            
GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentMetadataRecord(groupId,
 11, context.time.milliseconds()),
             
GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentRecord(groupId, 
expectedMember)
         );
 
@@ -11422,7 +11422,7 @@ public class GroupMetadataManagerTest {
                 
GroupCoordinatorRecordHelpers.newConsumerGroupMemberSubscriptionRecord(classicGroupId,
 expectedMember),
                 
GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord(classicGroupId, 1, 0),
                 
GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentRecord(classicGroupId,
 memberId, Map.of()),
-                
GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentEpochRecord(classicGroupId,
 1),
+                
GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentMetadataRecord(classicGroupId,
 1, context.time.milliseconds()),
                 
GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentRecord(classicGroupId,
 expectedMember)
             ),
             result.records()
@@ -11589,7 +11589,7 @@ public class GroupMetadataManagerTest {
                 barTopicName, computeTopicHash(barTopicName, metadataImage)
             ))),
             
GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentRecord(groupId, 
memberId1, expectedMember1.assignedPartitions()),
-            
GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentEpochRecord(groupId,
 0),
+            
GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentMetadataRecord(groupId,
 0, 0),
             
GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentRecord(groupId, 
expectedMember1),
 
             // Member 2 joins the new consumer group.
@@ -11602,7 +11602,7 @@ public class GroupMetadataManagerTest {
             ))),
             
GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentRecord(groupId, 
memberId2, assignor.targetPartitions(memberId2)),
             
GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentRecord(groupId, 
memberId1, assignor.targetPartitions(memberId1)),
-            
GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentEpochRecord(groupId,
 1),
+            
GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentMetadataRecord(groupId,
 1, context.time.milliseconds()),
 
             // Member 2 has no pending revoking partition. Bump its member 
epoch and transition to UNRELEASED_PARTITIONS.
             
GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentRecord(groupId, 
expectedMember2)
@@ -11806,7 +11806,7 @@ public class GroupMetadataManagerTest {
             
GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentRecord(groupId, 
memberId1, expectedMember1.assignedPartitions()),
             
GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentRecord(groupId, 
memberId2, expectedMember2.assignedPartitions()),
 
-            
GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentEpochRecord(groupId,
 0),
+            
GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentMetadataRecord(groupId,
 0, 0),
 
             
GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentRecord(groupId, 
expectedMember1),
             
GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentRecord(groupId, 
expectedMember2),
@@ -11821,7 +11821,7 @@ public class GroupMetadataManagerTest {
             ))),
             
GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentRecord(groupId, 
memberId1, assignor.targetPartitions(memberId1)),
             
GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentRecord(groupId, 
memberId3, assignor.targetPartitions(memberId3)),
-            
GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentEpochRecord(groupId,
 1),
+            
GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentMetadataRecord(groupId,
 1, context.time.milliseconds()),
 
             // Member 3 has no pending revoking partition. Bump its member 
epoch and transition to UNRELEASED_PARTITIONS.
             
GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentRecord(groupId, 
expectedMember3)
@@ -12079,7 +12079,7 @@ public class GroupMetadataManagerTest {
                 fooTopicName, computeTopicHash(fooTopicName, metadataImage)
             ))),
             
GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentRecord(groupId, 
memberId, expectedClassicMember.assignedPartitions()),
-            
GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentEpochRecord(groupId,
 group.generationId()),
+            
GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentMetadataRecord(groupId,
 group.generationId(), 0),
             
GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentRecord(groupId, 
expectedClassicMember),
 
             // Remove the static member because the rejoining member replaces 
it.
@@ -12205,7 +12205,7 @@ public class GroupMetadataManagerTest {
                 barTopicName, computeTopicHash(barTopicName, metadataImage)
             ))),
             
GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentRecord(groupId, 
memberId1, expectedMember1.assignedPartitions()),
-            
GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentEpochRecord(groupId,
 1),
+            
GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentMetadataRecord(groupId,
 1, 0),
             
GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentRecord(groupId, 
expectedMember1),
 
             // Member 2 joins the new consumer group.
@@ -12217,7 +12217,7 @@ public class GroupMetadataManagerTest {
                 barTopicName, computeTopicHash(barTopicName, metadataImage)
             ))),
             
GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentRecord(groupId, 
memberId2, Map.of()),
-            
GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentEpochRecord(groupId,
 2),
+            
GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentMetadataRecord(groupId,
 2, context.time.milliseconds()),
 
             // Member 2 has no pending revoking partition or pending release 
partition.
             // Bump its member epoch and transition to STABLE.
@@ -12591,7 +12591,7 @@ public class GroupMetadataManagerTest {
             
GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentRecord(groupId, 
memberId1, expectedMember1.assignedPartitions()),
             
GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentRecord(groupId, 
memberId2, expectedMember2.assignedPartitions()),
 
-            
GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentEpochRecord(groupId,
 1),
+            
GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentMetadataRecord(groupId,
 1, 0),
 
             
GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentRecord(groupId, 
expectedMember1),
             
GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentRecord(groupId, 
expectedMember2),
@@ -12606,7 +12606,7 @@ public class GroupMetadataManagerTest {
             ))),
             
GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentRecord(groupId, 
memberId1, assignor.targetPartitions(memberId1)),
             
GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentRecord(groupId, 
memberId3, assignor.targetPartitions(memberId3)),
-            
GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentEpochRecord(groupId,
 2),
+            
GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentMetadataRecord(groupId,
 2, context.time.milliseconds()),
 
             // Member 3 has no pending revoking partition. Bump its member 
epoch and transition to UNRELEASED_PARTITIONS.
             
GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentRecord(groupId, 
expectedMember3)
@@ -13723,7 +13723,7 @@ public class GroupMetadataManagerTest {
                         
GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentRecord(groupId, 
memberId, assignor.targetPartitions(memberId)),
                         
GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentRecord(groupId, 
newMemberId, assignor.targetPartitions(newMemberId))
                     ),
-                    
List.of(GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentEpochRecord(groupId,
 11)),
+                    
List.of(GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentMetadataRecord(groupId,
 11, context.time.milliseconds())),
 
                     
List.of(GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentRecord(groupId,
 expectedMember))
                 ),
@@ -13883,7 +13883,7 @@ public class GroupMetadataManagerTest {
             ))),
 
             
GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentRecord(groupId, 
newMemberId, Map.of()),
-            
GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentEpochRecord(groupId,
 11),
+            
GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentMetadataRecord(groupId,
 11, context.time.milliseconds()),
 
             
GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentRecord(groupId, 
expectedMember)
         );
@@ -14150,7 +14150,7 @@ public class GroupMetadataManagerTest {
                         mkTopicAssignment(barTopicId, 0),
                         mkTopicAssignment(fooTopicId, 1)))
                 ),
-                
List.of(GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentEpochRecord(groupId,
 11)),
+                
List.of(GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentMetadataRecord(groupId,
 11, context.time.milliseconds())),
                 
List.of(GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentRecord(groupId,
 expectedMember))
             ),
             joinResult.records
@@ -14385,7 +14385,7 @@ public class GroupMetadataManagerTest {
                     
GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentRecord(groupId, 
memberId2, mkAssignment(
                         mkTopicAssignment(barTopicId, 0)))
                 ),
-                
List.of(GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentEpochRecord(groupId,
 11)),
+                
List.of(GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentMetadataRecord(groupId,
 11, context.time.milliseconds())),
 
                 
List.of(GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentRecord(groupId,
 expectedMember1))
             ),
@@ -14624,7 +14624,7 @@ public class GroupMetadataManagerTest {
                     
GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentRecord(groupId, 
memberId2, mkAssignment(
                         mkTopicAssignment(barTopicId, 0)))
                 ),
-                
List.of(GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentEpochRecord(groupId,
 11)),
+                
List.of(GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentMetadataRecord(groupId,
 11, context.time.milliseconds())),
 
                 
List.of(GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentRecord(groupId,
 expectedMember1))
             ),
@@ -16467,7 +16467,7 @@ public class GroupMetadataManagerTest {
                 mkTopicAssignment(fooTopicId, 0, 1, 2, 3, 4, 5),
                 mkTopicAssignment(barTopicId, 0, 1, 2)
             )),
-            
GroupCoordinatorRecordHelpers.newShareGroupTargetAssignmentEpochRecord(groupId, 
1),
+            
GroupCoordinatorRecordHelpers.newShareGroupTargetAssignmentMetadataRecord(groupId,
 1, context.time.milliseconds()),
             
GroupCoordinatorRecordHelpers.newShareGroupCurrentAssignmentRecord(groupId, 
expectedMember),
             
GroupCoordinatorRecordHelpers.newShareGroupStatePartitionMetadataRecord(groupId,
 mkShareGroupStateMap(List.of(
                     mkShareGroupStateMetadataEntry(fooTopicId, fooTopicName, 
List.of(0, 1, 2, 3, 4, 5)),
@@ -16704,7 +16704,7 @@ public class GroupMetadataManagerTest {
 
         
context.replay(GroupCoordinatorRecordHelpers.newShareGroupTargetAssignmentRecord(groupId,
 memberId1, mkAssignment(
             mkTopicAssignment(fooTopicId, 1, 2, 3))));
-        
context.replay(GroupCoordinatorRecordHelpers.newShareGroupTargetAssignmentEpochRecord(groupId,
 11));
+        
context.replay(GroupCoordinatorRecordHelpers.newShareGroupTargetAssignmentMetadataRecord(groupId,
 11, 12345L));
 
         assertEquals(ShareGroup.ShareGroupState.STABLE, 
context.shareGroupState(groupId));
 
@@ -16826,7 +16826,7 @@ public class GroupMetadataManagerTest {
                 TaskAssignmentTestUtil.mkTasks(subtopology1, 1, 2, 3)
             )));
 
-        
context.replay(StreamsCoordinatorRecordHelpers.newStreamsGroupTargetAssignmentEpochRecord(groupId,
 100));
+        
context.replay(StreamsCoordinatorRecordHelpers.newStreamsGroupTargetAssignmentMetadataRecord(groupId,
 100, 12345L));
 
         
context.replay(StreamsCoordinatorRecordHelpers.newStreamsGroupCurrentAssignmentRecord(groupId,
 member));
 
@@ -17074,7 +17074,7 @@ public class GroupMetadataManagerTest {
                     TaskAssignmentTestUtil.mkTasks(subtopology1, 0, 1, 2, 3, 
4, 5),
                     TaskAssignmentTestUtil.mkTasks(subtopology2, 0, 1, 2)
                 )),
-            
StreamsCoordinatorRecordHelpers.newStreamsGroupTargetAssignmentEpochRecord(groupId,
 2),
+            
StreamsCoordinatorRecordHelpers.newStreamsGroupTargetAssignmentMetadataRecord(groupId,
 2, context.time.milliseconds()),
             
StreamsCoordinatorRecordHelpers.newStreamsGroupCurrentAssignmentRecord(groupId, 
expectedMember)
         );
 
@@ -17247,7 +17247,7 @@ public class GroupMetadataManagerTest {
                 fooTopicName, computeTopicHash(fooTopicName, metadataImage)
             )), -1, Map.of("num.standby.replicas", "0")),
             
StreamsCoordinatorRecordHelpers.newStreamsGroupTargetAssignmentRecord(groupId, 
memberId, TasksTuple.EMPTY),
-            
StreamsCoordinatorRecordHelpers.newStreamsGroupTargetAssignmentEpochRecord(groupId,
 2),
+            
StreamsCoordinatorRecordHelpers.newStreamsGroupTargetAssignmentMetadataRecord(groupId,
 2, context.time.milliseconds()),
             
StreamsCoordinatorRecordHelpers.newStreamsGroupCurrentAssignmentRecord(groupId, 
expectedMember)
         );
 
@@ -17333,7 +17333,7 @@ public class GroupMetadataManagerTest {
                 fooTopicName, computeTopicHash(fooTopicName, metadataImage)
             )), -1, Map.of("num.standby.replicas", "0")),
             
StreamsCoordinatorRecordHelpers.newStreamsGroupTargetAssignmentRecord(groupId, 
memberId, TasksTuple.EMPTY),
-            
StreamsCoordinatorRecordHelpers.newStreamsGroupTargetAssignmentEpochRecord(groupId,
 2),
+            
StreamsCoordinatorRecordHelpers.newStreamsGroupTargetAssignmentMetadataRecord(groupId,
 2, context.time.milliseconds()),
             
StreamsCoordinatorRecordHelpers.newStreamsGroupCurrentAssignmentRecord(groupId, 
expectedMember)
         );
 
@@ -17417,7 +17417,7 @@ public class GroupMetadataManagerTest {
                 barTopicName, computeTopicHash(barTopicName, metadataImage)
             )), -1, Map.of("num.standby.replicas", "0")),
             
StreamsCoordinatorRecordHelpers.newStreamsGroupTargetAssignmentRecord(groupId, 
memberId, TasksTuple.EMPTY),
-            
StreamsCoordinatorRecordHelpers.newStreamsGroupTargetAssignmentEpochRecord(groupId,
 2),
+            
StreamsCoordinatorRecordHelpers.newStreamsGroupTargetAssignmentMetadataRecord(groupId,
 2, context.time.milliseconds()),
             
StreamsCoordinatorRecordHelpers.newStreamsGroupCurrentAssignmentRecord(groupId, 
expectedMember)
         );
 
@@ -17514,7 +17514,7 @@ public class GroupMetadataManagerTest {
                 barTopicName, computeTopicHash(barTopicName, metadataImage)
             )), 1, Map.of("num.standby.replicas", "0")),
             
StreamsCoordinatorRecordHelpers.newStreamsGroupTargetAssignmentRecord(groupId, 
memberId, TasksTuple.EMPTY),
-            
StreamsCoordinatorRecordHelpers.newStreamsGroupTargetAssignmentEpochRecord(groupId,
 11),
+            
StreamsCoordinatorRecordHelpers.newStreamsGroupTargetAssignmentMetadataRecord(groupId,
 11, context.time.milliseconds()),
             
StreamsCoordinatorRecordHelpers.newStreamsGroupCurrentAssignmentRecord(groupId, 
expectedMember)
         );
 
@@ -17817,7 +17817,7 @@ public class GroupMetadataManagerTest {
                     TaskAssignmentTestUtil.mkTasks(subtopology1, 0, 1, 2, 3, 
4, 5),
                     TaskAssignmentTestUtil.mkTasks(subtopology2, 0, 1, 2)
                 )),
-            
StreamsCoordinatorRecordHelpers.newStreamsGroupTargetAssignmentEpochRecord(groupId,
 11),
+            
StreamsCoordinatorRecordHelpers.newStreamsGroupTargetAssignmentMetadataRecord(groupId,
 11, context.time.milliseconds()),
             
StreamsCoordinatorRecordHelpers.newStreamsGroupCurrentAssignmentRecord(groupId, 
expectedMember)
         );
 
@@ -17940,7 +17940,7 @@ public class GroupMetadataManagerTest {
                     TaskAssignmentTestUtil.mkTasks(subtopology1, 0, 1, 2, 3, 
4, 5),
                     TaskAssignmentTestUtil.mkTasks(subtopology2, 0, 1, 2)
                 )),
-            
StreamsCoordinatorRecordHelpers.newStreamsGroupTargetAssignmentEpochRecord(groupId,
 11),
+            
StreamsCoordinatorRecordHelpers.newStreamsGroupTargetAssignmentMetadataRecord(groupId,
 11, context.time.milliseconds()),
             
StreamsCoordinatorRecordHelpers.newStreamsGroupCurrentAssignmentRecord(groupId, 
expectedMember)
         );
 
@@ -18913,7 +18913,7 @@ public class GroupMetadataManagerTest {
         
context.replay(StreamsCoordinatorRecordHelpers.newStreamsGroupTargetAssignmentRecord(groupId,
 memberId1,
             TaskAssignmentTestUtil.mkTasksTuple(TaskRole.ACTIVE,
                 TaskAssignmentTestUtil.mkTasks(subtopology1, 1, 2, 3))));
-        
context.replay(StreamsCoordinatorRecordHelpers.newStreamsGroupTargetAssignmentEpochRecord(groupId,
 11));
+        
context.replay(StreamsCoordinatorRecordHelpers.newStreamsGroupTargetAssignmentMetadataRecord(groupId,
 11, 12345L));
 
         assertEquals(StreamsGroup.StreamsGroupState.RECONCILING, 
context.streamsGroupState(groupId));
 
@@ -19083,7 +19083,7 @@ public class GroupMetadataManagerTest {
                 TaskAssignmentTestUtil.mkTasksTuple(TaskRole.ACTIVE,
                     TaskAssignmentTestUtil.mkTasks(subtopology1, 0, 1, 2, 3, 
4, 5)
                 )),
-            
StreamsCoordinatorRecordHelpers.newStreamsGroupTargetAssignmentEpochRecord(groupId,
 11),
+            
StreamsCoordinatorRecordHelpers.newStreamsGroupTargetAssignmentMetadataRecord(groupId,
 11, context.time.milliseconds()),
             
StreamsCoordinatorRecordHelpers.newStreamsGroupCurrentAssignmentRecord(groupId, 
expectedMember)
         );
 
@@ -19205,7 +19205,7 @@ public class GroupMetadataManagerTest {
                 TaskAssignmentTestUtil.mkTasksTuple(TaskRole.ACTIVE,
                     TaskAssignmentTestUtil.mkTasks(subtopology1, 0, 1, 2, 3, 
4, 5)
                 )),
-            
StreamsCoordinatorRecordHelpers.newStreamsGroupTargetAssignmentEpochRecord(groupId,
 11),
+            
StreamsCoordinatorRecordHelpers.newStreamsGroupTargetAssignmentMetadataRecord(groupId,
 11, context.time.milliseconds()),
             
StreamsCoordinatorRecordHelpers.newStreamsGroupCurrentAssignmentRecord(groupId, 
expectedMember)
         );
 
@@ -20127,7 +20127,7 @@ public class GroupMetadataManagerTest {
                 
StreamsCoordinatorRecordHelpers.newStreamsGroupTopologyRecord(classicGroupId, 
topology),
                 
StreamsCoordinatorRecordHelpers.newStreamsGroupMetadataRecord(classicGroupId, 
2, 0, -1, Map.of("num.standby.replicas", "0")),
                 
StreamsCoordinatorRecordHelpers.newStreamsGroupTargetAssignmentRecord(classicGroupId,
 memberId, TasksTuple.EMPTY),
-                
StreamsCoordinatorRecordHelpers.newStreamsGroupTargetAssignmentEpochRecord(classicGroupId,
 2),
+                
StreamsCoordinatorRecordHelpers.newStreamsGroupTargetAssignmentMetadataRecord(classicGroupId,
 2, context.time.milliseconds()),
                 
StreamsCoordinatorRecordHelpers.newStreamsGroupCurrentAssignmentRecord(classicGroupId,
 expectedMember)
             ),
             result.records()
@@ -20631,8 +20631,9 @@ public class GroupMetadataManagerTest {
             .build();
 
         // The group is created if it does not exist.
-        
context.replay(GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentEpochRecord("foo",
 10));
+        
context.replay(GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentMetadataRecord("foo",
 10, 12345L));
         assertEquals(10, 
context.groupMetadataManager.consumerGroup("foo").assignmentEpoch());
+        assertEquals(12345L, 
context.groupMetadataManager.consumerGroup("foo").assignmentTimestamp());
     }
 
     @Test
@@ -20646,6 +20647,22 @@ public class GroupMetadataManagerTest {
         assertThrows(GroupIdNotFoundException.class, () -> 
context.groupMetadataManager.consumerGroup("foo"));
     }
 
+    @Test
+    public void 
testReplayConsumerGroupTargetAssignmentMetadataTombstoneExisting() {
+        GroupMetadataManagerTestContext context = new 
GroupMetadataManagerTestContext.Builder()
+            .build();
+
+        // Create the group by replaying a value record.
+        
context.replay(GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentMetadataRecord("foo",
 10, 12345L));
+        assertEquals(10, 
context.groupMetadataManager.consumerGroup("foo").assignmentEpoch());
+        assertEquals(12345L, 
context.groupMetadataManager.consumerGroup("foo").assignmentTimestamp());
+
+        // Replay the tombstone. It should reset both the epoch and the 
timestamp.
+        
context.replay(GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentEpochTombstoneRecord("foo"));
+        assertEquals(-1, 
context.groupMetadataManager.consumerGroup("foo").assignmentEpoch());
+        assertEquals(0L, 
context.groupMetadataManager.consumerGroup("foo").assignmentTimestamp());
+    }
+
     @Test
     public void testReplayConsumerGroupCurrentMemberAssignment() {
         GroupMetadataManagerTestContext context = new 
GroupMetadataManagerTestContext.Builder()
@@ -20681,6 +20698,44 @@ public class GroupMetadataManagerTest {
         assertThrows(GroupIdNotFoundException.class, () -> 
context.groupMetadataManager.consumerGroup("bar"));
     }
 
+    @Test
+    public void testReplayShareGroupTargetAssignmentMetadata() {
+        GroupMetadataManagerTestContext context = new 
GroupMetadataManagerTestContext.Builder()
+            .build();
+
+        // The group is created if it does not exist.
+        
context.replay(GroupCoordinatorRecordHelpers.newShareGroupTargetAssignmentMetadataRecord("foo",
 10, 12345L));
+        assertEquals(10, 
context.groupMetadataManager.shareGroup("foo").assignmentEpoch());
+        assertEquals(12345L, 
context.groupMetadataManager.shareGroup("foo").assignmentTimestamp());
+    }
+
+    @Test
+    public void testReplayShareGroupTargetAssignmentMetadataTombstone() {
+        GroupMetadataManagerTestContext context = new 
GroupMetadataManagerTestContext.Builder()
+            .build();
+
+        // The group may not exist at all. Replaying the 
ShareGroupTargetAssignmentMetadata tombstone
+        // should be a no-op.
+        
context.replay(GroupCoordinatorRecordHelpers.newShareGroupTargetAssignmentEpochTombstoneRecord("foo"));
+        assertThrows(GroupIdNotFoundException.class, () -> 
context.groupMetadataManager.shareGroup("foo"));
+    }
+
+    @Test
+    public void 
testReplayShareGroupTargetAssignmentMetadataTombstoneExisting() {
+        GroupMetadataManagerTestContext context = new 
GroupMetadataManagerTestContext.Builder()
+            .build();
+
+        // Create the group by replaying a value record.
+        
context.replay(GroupCoordinatorRecordHelpers.newShareGroupTargetAssignmentMetadataRecord("foo",
 10, 12345L));
+        assertEquals(10, 
context.groupMetadataManager.shareGroup("foo").assignmentEpoch());
+        assertEquals(12345L, 
context.groupMetadataManager.shareGroup("foo").assignmentTimestamp());
+
+        // Replay the tombstone. It should reset both the epoch and the 
timestamp.
+        
context.replay(GroupCoordinatorRecordHelpers.newShareGroupTargetAssignmentEpochTombstoneRecord("foo"));
+        assertEquals(-1, 
context.groupMetadataManager.shareGroup("foo").assignmentEpoch());
+        assertEquals(0L, 
context.groupMetadataManager.shareGroup("foo").assignmentTimestamp());
+    }
+
     @Test
     public void testReplayStreamsGroupMemberMetadata() {
         GroupMetadataManagerTestContext context = new 
GroupMetadataManagerTestContext.Builder()
@@ -20907,8 +20962,9 @@ public class GroupMetadataManagerTest {
             .build();
 
         // The group is created if it does not exist.
-        
context.replay(StreamsCoordinatorRecordHelpers.newStreamsGroupTargetAssignmentEpochRecord("foo",
 10));
+        
context.replay(StreamsCoordinatorRecordHelpers.newStreamsGroupTargetAssignmentMetadataRecord("foo",
 10, 12345L));
         assertEquals(10, 
context.groupMetadataManager.streamsGroup("foo").assignmentEpoch());
+        assertEquals(12345L, 
context.groupMetadataManager.streamsGroup("foo").assignmentTimestamp());
     }
 
     @Test
@@ -20938,6 +20994,7 @@ public class GroupMetadataManagerTest {
             .withStreamsGroup(
                 new StreamsGroupBuilder("foo", 10)
                     .withTargetAssignmentEpoch(10)
+                    .withTargetAssignmentTimestamp(12345L)
                     .withTargetAssignment("m1", tasks)
             )
             .build();
@@ -20954,6 +21011,7 @@ public class GroupMetadataManagerTest {
         
context.replay(StreamsCoordinatorRecordHelpers.newStreamsGroupTargetAssignmentEpochTombstoneRecord("foo"));
 
         assertEquals(-1, 
context.groupMetadataManager.streamsGroup("foo").assignmentEpoch());
+        assertEquals(0L, 
context.groupMetadataManager.streamsGroup("foo").assignmentTimestamp());
     }
 
     @Test
@@ -22083,7 +22141,7 @@ public class GroupMetadataManagerTest {
                 mkTopicAssignment(fooTopicId, 0, 1, 2, 3, 4, 5),
                 mkTopicAssignment(barTopicId, 0, 1, 2)
             )),
-            
GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentEpochRecord(groupId,
 11),
+            
GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentMetadataRecord(groupId,
 11, context.time.milliseconds()),
             
GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentRecord(groupId, 
expectedMember2)
         );
 
@@ -22151,7 +22209,7 @@ public class GroupMetadataManagerTest {
             GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord(groupId, 
1, 0),
             // The target assignment is created.
             
GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentRecord(groupId, 
memberId1, Map.of()),
-            
GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentEpochRecord(groupId,
 1),
+            
GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentMetadataRecord(groupId,
 1, context.time.milliseconds()),
             // The member current state is created.
             
GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentRecord(groupId, 
expectedMember1)
         );
@@ -22473,7 +22531,7 @@ public class GroupMetadataManagerTest {
             
GroupCoordinatorRecordHelpers.newConsumerGroupRegularExpressionTombstone(groupId,
 "foo*"),
             GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord(groupId, 
11, 0),
             
GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentRecord(groupId, 
memberId1, Map.of()),
-            
GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentEpochRecord(groupId,
 11),
+            
GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentMetadataRecord(groupId,
 11, context.time.milliseconds()),
             
GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentRecord(groupId, 
expectedMember1)
         );
 
@@ -22622,7 +22680,7 @@ public class GroupMetadataManagerTest {
                 
GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentRecord(groupId, 
expectedMember1.memberId(), Map.of()),
                 
GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentRecord(groupId, 
memberId2, Map.of())
             ),
-            
List.of(GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentEpochRecord(groupId,
 11)),
+            
List.of(GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentMetadataRecord(groupId,
 11, context.time.milliseconds())),
             // The member assignment is updated.
             
List.of(GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentRecord(groupId,
 expectedMember1))
         );
@@ -22766,7 +22824,7 @@ public class GroupMetadataManagerTest {
                 
GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentRecord(groupId, 
expectedMember1.memberId(), Map.of()),
                 
GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentRecord(groupId, 
memberId2, Map.of())
             ),
-            
List.of(GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentEpochRecord(groupId,
 11)),
+            
List.of(GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentMetadataRecord(groupId,
 11, context.time.milliseconds())),
             // The member assignment is updated.
             
List.of(GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentRecord(groupId,
 expectedMember1))
         );
@@ -22971,7 +23029,7 @@ public class GroupMetadataManagerTest {
             List.of(
                 
GroupCoordinatorRecordHelpers.newConsumerGroupMemberSubscriptionRecord(groupId, 
expectedMember2),
                 
GroupCoordinatorRecordHelpers.newConsumerGroupRegularExpressionTombstone(groupId,
 "foo*|bar*"),
-                
GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentEpochRecord(groupId,
 11),
+                
GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentMetadataRecord(groupId,
 11, context.time.milliseconds()),
                 
GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentRecord(groupId, 
expectedMember2)
             ),
             result2.records()
@@ -23196,7 +23254,7 @@ public class GroupMetadataManagerTest {
 
         assertRecordsEquals(
             List.of(
-                
GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentEpochRecord(groupId,
 11),
+                
GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentMetadataRecord(groupId,
 11, context.time.milliseconds()),
                 
GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentRecord(groupId, 
expectedMember2)
             ),
             result2.records()
@@ -23430,7 +23488,7 @@ public class GroupMetadataManagerTest {
             
GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentRecord(groupId, 
memberId2, mkAssignment(
                 mkTopicAssignment(fooTopicId, 0, 1, 2, 3, 4, 5)
             )),
-            
GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentEpochRecord(groupId,
 11),
+            
GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentMetadataRecord(groupId,
 11, context.time.milliseconds()),
             
GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentRecord(groupId, 
expectedMember2)
         );
 
@@ -24895,7 +24953,7 @@ public class GroupMetadataManagerTest {
             new InitMapValue(
                 topicName,
                 Set.of(0, 1),
-                0
+                context.time.milliseconds()
             )
         );
 
diff --git 
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTestContext.java
 
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTestContext.java
index cbaa5770541..3edeb452d05 100644
--- 
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTestContext.java
+++ 
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTestContext.java
@@ -457,7 +457,7 @@ public class GroupMetadataManagerTestContext {
     }
 
     public static class Builder {
-        private MockTime time = new MockTime(0, 0, 0);
+        private MockTime time = new MockTime(0, 1000, 1000);
         private final MockCoordinatorTimer<CoordinatorRecord> timer = new 
MockCoordinatorTimer<>(time);
         private final MockCoordinatorExecutor<CoordinatorRecord> executor = 
new MockCoordinatorExecutor<>();
         private final LogContext logContext = new LogContext();
diff --git 
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/classic/ClassicGroupTest.java
 
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/classic/ClassicGroupTest.java
index 445ba969648..3b1ae79bed6 100644
--- 
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/classic/ClassicGroupTest.java
+++ 
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/classic/ClassicGroupTest.java
@@ -1385,7 +1385,7 @@ public class ClassicGroupTest {
             groupId
         );
         consumerGroup.setGroupEpoch(10);
-        consumerGroup.setTargetAssignmentEpoch(10);
+        consumerGroup.setTargetAssignmentMetadata(10, time.milliseconds());
 
         consumerGroup.updateTargetAssignment(memberId1, new 
Assignment(mkAssignment(
             mkTopicAssignment(fooTopicId, 0)
@@ -1538,7 +1538,7 @@ public class ClassicGroupTest {
             groupId
         );
         consumerGroup.setGroupEpoch(10);
-        consumerGroup.setTargetAssignmentEpoch(10);
+        consumerGroup.setTargetAssignmentMetadata(10, time.milliseconds());
         consumerGroup.updateTargetAssignment(memberId1, new 
Assignment(mkAssignment(
             mkTopicAssignment(fooTopicId, 0)
         )));
diff --git 
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/modern/TargetAssignmentBuilderTest.java
 
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/modern/TargetAssignmentBuilderTest.java
index 7c9de06c014..902864652dc 100644
--- 
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/modern/TargetAssignmentBuilderTest.java
+++ 
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/modern/TargetAssignmentBuilderTest.java
@@ -17,6 +17,7 @@
 package org.apache.kafka.coordinator.group.modern;
 
 import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.utils.MockTime;
 import org.apache.kafka.coordinator.common.runtime.CoordinatorMetadataImage;
 import 
org.apache.kafka.coordinator.common.runtime.KRaftCoordinatorMetadataImage;
 import org.apache.kafka.coordinator.common.runtime.MetadataImageBuilder;
@@ -41,7 +42,7 @@ import static 
org.apache.kafka.coordinator.group.Assertions.assertRecordsEquals;
 import static 
org.apache.kafka.coordinator.group.Assertions.assertUnorderedRecordsEquals;
 import static 
org.apache.kafka.coordinator.group.AssignmentTestUtil.mkAssignment;
 import static 
org.apache.kafka.coordinator.group.AssignmentTestUtil.mkTopicAssignment;
-import static 
org.apache.kafka.coordinator.group.GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentEpochRecord;
+import static 
org.apache.kafka.coordinator.group.GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentMetadataRecord;
 import static 
org.apache.kafka.coordinator.group.GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentRecord;
 import static 
org.apache.kafka.coordinator.group.api.assignor.SubscriptionType.HOMOGENEOUS;
 import static org.junit.jupiter.api.Assertions.assertEquals;
@@ -56,6 +57,7 @@ public class TargetAssignmentBuilderTest {
     public static class TargetAssignmentBuilderTestContext {
         private final String groupId;
         private final int groupEpoch;
+        private final long assignmentTimestamp;
         private final PartitionAssignor assignor = 
mock(PartitionAssignor.class);
         private final Map<String, ConsumerGroupMember> members = new 
HashMap<>();
         private final Map<String, ConsumerGroupMember> updatedMembers = new 
HashMap<>();
@@ -67,10 +69,12 @@ public class TargetAssignmentBuilderTest {
 
         public TargetAssignmentBuilderTestContext(
             String groupId,
-            int groupEpoch
+            int groupEpoch,
+            long assignmentTimestamp
         ) {
             this.groupId = groupId;
             this.groupEpoch = groupEpoch;
+            this.assignmentTimestamp = assignmentTimestamp;
         }
 
         public void addGroupMember(
@@ -274,6 +278,7 @@ public class TargetAssignmentBuilderTest {
             // Create and populate the assignment builder.
             TargetAssignmentBuilder.ConsumerTargetAssignmentBuilder builder =
                 new 
TargetAssignmentBuilder.ConsumerTargetAssignmentBuilder(groupId, groupEpoch, 
assignor)
+                    .withTime(new MockTime(0, assignmentTimestamp, 
assignmentTimestamp))
                     .withMembers(members)
                     .withStaticMembers(staticMembers)
                     .withSubscriptionType(subscriptionType)
@@ -307,13 +312,15 @@ public class TargetAssignmentBuilderTest {
     public void testEmpty() {
         TargetAssignmentBuilderTestContext context = new 
TargetAssignmentBuilderTestContext(
             "my-group",
-            20
+            20,
+            12345L
         );
 
         TargetAssignmentBuilder.TargetAssignmentResult result = 
context.build();
-        assertEquals(List.of(newConsumerGroupTargetAssignmentEpochRecord(
+        assertEquals(List.of(newConsumerGroupTargetAssignmentMetadataRecord(
             "my-group",
-            20
+            20,
+            12345L
         )), result.records());
         assertEquals(Map.of(), result.targetAssignment());
     }
@@ -322,7 +329,8 @@ public class TargetAssignmentBuilderTest {
     public void testAssignmentHasNotChanged() {
         TargetAssignmentBuilderTestContext context = new 
TargetAssignmentBuilderTestContext(
             "my-group",
-            20
+            20,
+            12345L
         );
 
         Uuid fooTopicId = context.addTopicMetadata("foo", 6);
@@ -350,9 +358,10 @@ public class TargetAssignmentBuilderTest {
 
         TargetAssignmentBuilder.TargetAssignmentResult result = 
context.build();
 
-        assertEquals(List.of(newConsumerGroupTargetAssignmentEpochRecord(
+        assertEquals(List.of(newConsumerGroupTargetAssignmentMetadataRecord(
             "my-group",
-            20
+            20,
+            12345L
         )), result.records());
 
         Map<String, MemberAssignment> expectedAssignment = new HashMap<>();
@@ -372,7 +381,8 @@ public class TargetAssignmentBuilderTest {
     public void testAssignmentSwapped() {
         TargetAssignmentBuilderTestContext context = new 
TargetAssignmentBuilderTestContext(
             "my-group",
-            20
+            20,
+            12345L
         );
 
         Uuid fooTopicId = context.addTopicMetadata("foo", 6);
@@ -413,9 +423,10 @@ public class TargetAssignmentBuilderTest {
                     ))
                 ),
                 List.of(
-                    newConsumerGroupTargetAssignmentEpochRecord(
+                    newConsumerGroupTargetAssignmentMetadataRecord(
                         "my-group",
-                        20
+                        20,
+                        12345L
                     )
                 )
             ),
@@ -439,7 +450,8 @@ public class TargetAssignmentBuilderTest {
     public void testNewMember() {
         TargetAssignmentBuilderTestContext context = new 
TargetAssignmentBuilderTestContext(
             "my-group",
-            20
+            20,
+            12345L
         );
 
         Uuid fooTopicId = context.addTopicMetadata("foo", 6);
@@ -491,9 +503,10 @@ public class TargetAssignmentBuilderTest {
                     ))
                 ),
                 List.of(
-                    newConsumerGroupTargetAssignmentEpochRecord(
+                    newConsumerGroupTargetAssignmentMetadataRecord(
                         "my-group",
-                        20
+                        20,
+                        12345L
                     )
                 )
             ),
@@ -521,7 +534,8 @@ public class TargetAssignmentBuilderTest {
     public void testUpdateMember() {
         TargetAssignmentBuilderTestContext context = new 
TargetAssignmentBuilderTestContext(
             "my-group",
-            20
+            20,
+            12345L
         );
 
         Uuid fooTopicId = context.addTopicMetadata("foo", 6);
@@ -582,9 +596,10 @@ public class TargetAssignmentBuilderTest {
                     ))
                 ),
                 List.of(
-                    newConsumerGroupTargetAssignmentEpochRecord(
+                    newConsumerGroupTargetAssignmentMetadataRecord(
                         "my-group",
-                        20
+                        20,
+                        12345L
                     )
                 )
             ),
@@ -612,7 +627,8 @@ public class TargetAssignmentBuilderTest {
     public void testPartialAssignmentUpdate() {
         TargetAssignmentBuilderTestContext context = new 
TargetAssignmentBuilderTestContext(
             "my-group",
-            20
+            20,
+            12345L
         );
 
         Uuid fooTopicId = context.addTopicMetadata("foo", 6);
@@ -664,9 +680,10 @@ public class TargetAssignmentBuilderTest {
                     ))
                 ),
                 List.of(
-                    newConsumerGroupTargetAssignmentEpochRecord(
+                    newConsumerGroupTargetAssignmentMetadataRecord(
                         "my-group",
-                        20
+                        20,
+                        12345L
                     )
                 )
             ),
@@ -694,7 +711,8 @@ public class TargetAssignmentBuilderTest {
     public void testDeleteMember() {
         TargetAssignmentBuilderTestContext context = new 
TargetAssignmentBuilderTestContext(
             "my-group",
-            20
+            20,
+            12345L
         );
 
         Uuid fooTopicId = context.addTopicMetadata("foo", 6);
@@ -742,9 +760,10 @@ public class TargetAssignmentBuilderTest {
                     ))
                 ),
                 List.of(
-                    newConsumerGroupTargetAssignmentEpochRecord(
+                    newConsumerGroupTargetAssignmentMetadataRecord(
                         "my-group",
-                        20
+                        20,
+                        12345L
                     )
                 )
             ),
@@ -768,7 +787,8 @@ public class TargetAssignmentBuilderTest {
     public void testReplaceStaticMember() {
         TargetAssignmentBuilderTestContext context = new 
TargetAssignmentBuilderTestContext(
             "my-group",
-            20
+            20,
+            12345L
         );
 
         Uuid fooTopicId = context.addTopicMetadata("foo", 6);
@@ -818,9 +838,10 @@ public class TargetAssignmentBuilderTest {
                     mkTopicAssignment(fooTopicId, 5, 6),
                     mkTopicAssignment(barTopicId, 5, 6)
                 )),
-                newConsumerGroupTargetAssignmentEpochRecord(
+                newConsumerGroupTargetAssignmentMetadataRecord(
                     "my-group",
-                    20
+                    20,
+                    12345L
                 )
             ),
             result.records()
@@ -848,7 +869,8 @@ public class TargetAssignmentBuilderTest {
     public void testRegularExpressions() {
         TargetAssignmentBuilderTestContext context = new 
TargetAssignmentBuilderTestContext(
             "my-group",
-            20
+            20,
+            12345L
         );
 
         Uuid fooTopicId = context.addTopicMetadata("foo", 6);
@@ -898,9 +920,10 @@ public class TargetAssignmentBuilderTest {
                     ))
                 ),
                 List.of(
-                    newConsumerGroupTargetAssignmentEpochRecord(
+                    newConsumerGroupTargetAssignmentMetadataRecord(
                         "my-group",
-                        20
+                        20,
+                        12345L
                     )
                 )
             ),
diff --git 
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/modern/consumer/ConsumerGroupBuilder.java
 
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/modern/consumer/ConsumerGroupBuilder.java
index e086f283aac..40d3300b093 100644
--- 
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/modern/consumer/ConsumerGroupBuilder.java
+++ 
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/modern/consumer/ConsumerGroupBuilder.java
@@ -31,6 +31,7 @@ public class ConsumerGroupBuilder {
     private final String groupId;
     private final int groupEpoch;
     private int assignmentEpoch;
+    private long assignmentTimestamp;
     private final Map<String, ConsumerGroupMember> members = new HashMap<>();
     private final Map<String, Assignment> assignments = new HashMap<>();
     private long metadataHash = 0L;
@@ -40,6 +41,7 @@ public class ConsumerGroupBuilder {
         this.groupId = groupId;
         this.groupEpoch = groupEpoch;
         this.assignmentEpoch = 0;
+        this.assignmentTimestamp = 0L;
     }
 
     public ConsumerGroupBuilder withMember(ConsumerGroupMember member) {
@@ -70,6 +72,11 @@ public class ConsumerGroupBuilder {
         return this;
     }
 
+    public ConsumerGroupBuilder withAssignmentTimestamp(long 
assignmentTimestamp) {
+        this.assignmentTimestamp = assignmentTimestamp;
+        return this;
+    }
+
     public List<CoordinatorRecord> build() {
         List<CoordinatorRecord> records = new ArrayList<>();
 
@@ -92,7 +99,7 @@ public class ConsumerGroupBuilder {
         );
 
         // Add target assignment epoch.
-        
records.add(GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentEpochRecord(groupId,
 assignmentEpoch));
+        
records.add(GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentMetadataRecord(groupId,
 assignmentEpoch, assignmentTimestamp));
 
         // Add current assignment records for members.
         members.forEach((memberId, member) ->
diff --git 
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/modern/consumer/ConsumerGroupTest.java
 
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/modern/consumer/ConsumerGroupTest.java
index c8ada8fc5f4..a45eaa62991 100644
--- 
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/modern/consumer/ConsumerGroupTest.java
+++ 
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/modern/consumer/ConsumerGroupTest.java
@@ -475,7 +475,7 @@ public class ConsumerGroupTest {
         assertEquals(MemberState.STABLE, member2.state());
         assertEquals(ConsumerGroup.ConsumerGroupState.ASSIGNING, 
consumerGroup.state());
 
-        consumerGroup.setTargetAssignmentEpoch(2);
+        consumerGroup.setTargetAssignmentMetadata(2, 12345L);
 
         assertEquals(ConsumerGroup.ConsumerGroupState.RECONCILING, 
consumerGroup.state());
 
@@ -1000,7 +1000,7 @@ public class ConsumerGroupTest {
         assertEquals(ConsumerGroup.ConsumerGroupState.ASSIGNING, 
consumerGroup.state());
         assertThrows(GroupNotEmptyException.class, 
consumerGroup::validateDeleteGroup);
 
-        consumerGroup.setTargetAssignmentEpoch(1);
+        consumerGroup.setTargetAssignmentMetadata(1, 12345L);
 
         assertEquals(ConsumerGroup.ConsumerGroupState.STABLE, 
consumerGroup.state());
         assertThrows(GroupNotEmptyException.class, 
consumerGroup::validateDeleteGroup);
@@ -1329,7 +1329,7 @@ public class ConsumerGroupTest {
             groupId
         );
         expectedConsumerGroup.setGroupEpoch(10);
-        expectedConsumerGroup.setTargetAssignmentEpoch(10);
+        expectedConsumerGroup.setTargetAssignmentMetadata(10, 0L);
         expectedConsumerGroup.updateTargetAssignment(memberId, new 
Assignment(mkAssignment(
             mkTopicAssignment(fooTopicId, 0)
         )));
@@ -1366,6 +1366,8 @@ public class ConsumerGroupTest {
 
         assertEquals(expectedConsumerGroup.groupId(), consumerGroup.groupId());
         assertEquals(expectedConsumerGroup.groupEpoch(), 
consumerGroup.groupEpoch());
+        assertEquals(expectedConsumerGroup.assignmentEpoch(), 
consumerGroup.assignmentEpoch());
+        assertEquals(expectedConsumerGroup.assignmentTimestamp(), 
consumerGroup.assignmentTimestamp());
         assertEquals(expectedConsumerGroup.state(), consumerGroup.state());
         assertEquals(expectedConsumerGroup.preferredServerAssignor(), 
consumerGroup.preferredServerAssignor());
         assertEquals(expectedConsumerGroup.members(), consumerGroup.members());
diff --git 
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/modern/share/ShareGroupBuilder.java
 
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/modern/share/ShareGroupBuilder.java
index 67cfb756612..14d6442057c 100644
--- 
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/modern/share/ShareGroupBuilder.java
+++ 
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/modern/share/ShareGroupBuilder.java
@@ -31,6 +31,7 @@ public class ShareGroupBuilder {
     private final String groupId;
     private final int groupEpoch;
     private int assignmentEpoch;
+    private long assignmentTimestamp;
     private final Map<String, ShareGroupMember> members = new HashMap<>();
     private final Map<String, Assignment> assignments = new HashMap<>();
     private long metadataHash = 0L;
@@ -39,6 +40,7 @@ public class ShareGroupBuilder {
         this.groupId = groupId;
         this.groupEpoch = groupEpoch;
         this.assignmentEpoch = 0;
+        this.assignmentTimestamp = 0L;
     }
 
     public ShareGroupBuilder withMember(ShareGroupMember member) {
@@ -61,6 +63,11 @@ public class ShareGroupBuilder {
         return this;
     }
 
+    public ShareGroupBuilder withAssignmentTimestamp(long assignmentTimestamp) 
{
+        this.assignmentTimestamp = assignmentTimestamp;
+        return this;
+    }
+
     public List<CoordinatorRecord> build() {
         List<CoordinatorRecord> records = new ArrayList<>();
 
@@ -78,7 +85,7 @@ public class ShareGroupBuilder {
         );
 
         // Add target assignment epoch.
-        
records.add(GroupCoordinatorRecordHelpers.newShareGroupTargetAssignmentEpochRecord(groupId,
 assignmentEpoch));
+        
records.add(GroupCoordinatorRecordHelpers.newShareGroupTargetAssignmentMetadataRecord(groupId,
 assignmentEpoch, assignmentTimestamp));
 
         // Add current assignment records for members.
         members.forEach((memberId, member) ->
diff --git 
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/modern/share/ShareGroupTest.java
 
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/modern/share/ShareGroupTest.java
index 6a6f963ce70..60daaa5d1bc 100644
--- 
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/modern/share/ShareGroupTest.java
+++ 
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/modern/share/ShareGroupTest.java
@@ -406,7 +406,7 @@ public class ShareGroupTest {
         assertEquals(ShareGroupState.STABLE, shareGroup.state());
         assertThrows(GroupNotEmptyException.class, 
shareGroup::validateDeleteGroup);
 
-        shareGroup.setTargetAssignmentEpoch(1);
+        shareGroup.setTargetAssignmentMetadata(1, 12345L);
 
         assertEquals(ShareGroupState.STABLE, shareGroup.state());
         assertThrows(GroupNotEmptyException.class, 
shareGroup::validateDeleteGroup);
diff --git 
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/StreamsCoordinatorRecordHelpersTest.java
 
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/StreamsCoordinatorRecordHelpersTest.java
index 4a5bd69c25b..bf5b1403e87 100644
--- 
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/StreamsCoordinatorRecordHelpersTest.java
+++ 
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/StreamsCoordinatorRecordHelpersTest.java
@@ -381,12 +381,13 @@ class StreamsCoordinatorRecordHelpersTest {
                 .setGroupId(GROUP_ID),
             new ApiMessageAndVersion(
                 new StreamsGroupTargetAssignmentMetadataValue()
-                    .setAssignmentEpoch(42),
+                    .setAssignmentEpoch(42)
+                    .setAssignmentTimestamp(12345L),
                 (short) 0
             )
         );
 
-        assertEquals(expectedRecord, 
StreamsCoordinatorRecordHelpers.newStreamsGroupTargetAssignmentEpochRecord(GROUP_ID,
 42));
+        assertEquals(expectedRecord, 
StreamsCoordinatorRecordHelpers.newStreamsGroupTargetAssignmentMetadataRecord(GROUP_ID,
 42, 12345L));
     }
 
     @Test
@@ -737,7 +738,7 @@ class StreamsCoordinatorRecordHelpersTest {
     @Test
     public void testNewStreamsGroupTargetAssignmentEpochRecordNullGroupId() {
         NullPointerException exception = 
assertThrows(NullPointerException.class, () ->
-            
StreamsCoordinatorRecordHelpers.newStreamsGroupTargetAssignmentEpochRecord(null,
 1));
+            
StreamsCoordinatorRecordHelpers.newStreamsGroupTargetAssignmentMetadataRecord(null,
 1, 12345L));
         assertEquals("groupId should not be null here", 
exception.getMessage());
     }
 
diff --git 
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/StreamsGroupBuilder.java
 
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/StreamsGroupBuilder.java
index aa2f75ec846..5d3688693be 100644
--- 
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/StreamsGroupBuilder.java
+++ 
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/StreamsGroupBuilder.java
@@ -31,6 +31,7 @@ public class StreamsGroupBuilder {
     private final String groupId;
     private final int groupEpoch;
     private int targetAssignmentEpoch;
+    private long targetAssignmentTimestamp;
     private StreamsTopology topology;
     private final Map<String, StreamsGroupMember> members = new HashMap<>();
     private final Map<String, TasksTuple> targetAssignments = new HashMap<>();
@@ -42,6 +43,7 @@ public class StreamsGroupBuilder {
         this.groupId = groupId;
         this.groupEpoch = groupEpoch;
         this.targetAssignmentEpoch = 0;
+        this.targetAssignmentTimestamp = 0L;
         this.topology = null;
     }
 
@@ -75,6 +77,11 @@ public class StreamsGroupBuilder {
         return this;
     }
 
+    public StreamsGroupBuilder withTargetAssignmentTimestamp(long 
targetAssignmentTimestamp) {
+        this.targetAssignmentTimestamp = targetAssignmentTimestamp;
+        return this;
+    }
+
     public StreamsGroupBuilder withLastAssignmentConfigs(Map<String, String> 
lastAssignmentConfigs) {
         this.lastAssignmentConfigs.putAll(lastAssignmentConfigs);
         return this;
@@ -110,8 +117,8 @@ public class StreamsGroupBuilder {
         }
 
         // Add target assignment epoch.
-        
records.add(StreamsCoordinatorRecordHelpers.newStreamsGroupTargetAssignmentEpochRecord(groupId,
-            targetAssignmentEpoch));
+        
records.add(StreamsCoordinatorRecordHelpers.newStreamsGroupTargetAssignmentMetadataRecord(groupId,
+            targetAssignmentEpoch, targetAssignmentTimestamp));
 
         // Add current assignment records for members.
         members.forEach((memberId, member) ->
diff --git 
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/StreamsGroupTest.java
 
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/StreamsGroupTest.java
index 3a144d00dda..d52c5e631f1 100644
--- 
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/StreamsGroupTest.java
+++ 
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/StreamsGroupTest.java
@@ -518,7 +518,7 @@ public class StreamsGroupTest {
         assertEquals(MemberState.STABLE, member2.state());
         assertEquals(StreamsGroup.StreamsGroupState.ASSIGNING, 
streamsGroup.state());
 
-        streamsGroup.setTargetAssignmentEpoch(2);
+        streamsGroup.setTargetAssignmentMetadata(2, 12345L);
 
         assertEquals(StreamsGroup.StreamsGroupState.RECONCILING, 
streamsGroup.state());
 
@@ -802,7 +802,7 @@ public class StreamsGroupTest {
         group.setTopology(new StreamsTopology(1, Map.of()));
         group.setValidatedTopologyEpoch(1);
         group.setConfiguredTopology(new ConfiguredTopology(1, 0, 
Optional.of(new TreeMap<>()), Map.of(), Optional.empty()));
-        group.setTargetAssignmentEpoch(1);
+        group.setTargetAssignmentMetadata(1, 12345L);
         group.updateMember(new StreamsGroupMember.Builder("member1")
             .setMemberEpoch(1)
             .build());
@@ -877,7 +877,7 @@ public class StreamsGroupTest {
         assertEquals(StreamsGroup.StreamsGroupState.ASSIGNING, 
streamsGroup.state());
         assertThrows(GroupNotEmptyException.class, 
streamsGroup::validateDeleteGroup);
 
-        streamsGroup.setTargetAssignmentEpoch(1);
+        streamsGroup.setTargetAssignmentMetadata(1, 12345L);
 
         assertEquals(StreamsGroup.StreamsGroupState.STABLE, 
streamsGroup.state());
         assertThrows(GroupNotEmptyException.class, 
streamsGroup::validateDeleteGroup);
@@ -914,7 +914,7 @@ public class StreamsGroupTest {
         group.setTopology(new StreamsTopology(1, Map.of()));
         group.setConfiguredTopology(new ConfiguredTopology(1, 0, 
Optional.of(new TreeMap<>()), Map.of(), Optional.empty()));
         group.setValidatedTopologyEpoch(1);
-        group.setTargetAssignmentEpoch(1);
+        group.setTargetAssignmentMetadata(1, 12345L);
         group.updateMember(new StreamsGroupMember.Builder("member1")
             .setMemberEpoch(1)
             .setPreviousMemberEpoch(0)
@@ -1139,7 +1139,7 @@ public class StreamsGroupTest {
 
         group.setGroupEpoch(2);
         group.setTopology(new StreamsTopology(2, subtopologies));
-        group.setTargetAssignmentEpoch(2);
+        group.setTargetAssignmentMetadata(2, 12345L);
         group.updateMember(new StreamsGroupMember.Builder("member1")
             .setMemberEpoch(2)
             .setPreviousMemberEpoch(1)
@@ -1198,7 +1198,7 @@ public class StreamsGroupTest {
         group.setGroupEpoch(3);
         group.setTopology(new StreamsTopology(2, subtopologies));
         group.setConfiguredTopology(new ConfiguredTopology(3, 0, 
Optional.of(new TreeMap<>()), Map.of(), Optional.empty()));
-        group.setTargetAssignmentEpoch(3);
+        group.setTargetAssignmentMetadata(3, 12345L);
         snapshotRegistry.idempotentCreateSnapshot(1);
 
         StreamsGroupDescribeResponseData.DescribedGroup describedGroup = 
group.asDescribedGroup(1);
@@ -1225,7 +1225,7 @@ public class StreamsGroupTest {
         group.setGroupEpoch(4);
         group.setTopology(new StreamsTopology(4, subtopologies));
         // No ConfiguredTopology set, so should fallback to StreamsTopology
-        group.setTargetAssignmentEpoch(4);
+        group.setTargetAssignmentMetadata(4, 12345L);
         snapshotRegistry.idempotentCreateSnapshot(1);
 
         StreamsGroupDescribeResponseData.DescribedGroup describedGroup = 
group.asDescribedGroup(1);
diff --git 
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/TargetAssignmentBuilderTest.java
 
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/TargetAssignmentBuilderTest.java
index 7d8ba55e24e..dcce6b02461 100644
--- 
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/TargetAssignmentBuilderTest.java
+++ 
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/TargetAssignmentBuilderTest.java
@@ -17,6 +17,7 @@
 package org.apache.kafka.coordinator.group.streams;
 
 import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.utils.MockTime;
 import org.apache.kafka.coordinator.common.runtime.CoordinatorMetadataImage;
 import org.apache.kafka.coordinator.common.runtime.CoordinatorRecord;
 import 
org.apache.kafka.coordinator.common.runtime.KRaftCoordinatorMetadataImage;
@@ -46,7 +47,7 @@ import java.util.TreeMap;
 import static org.apache.kafka.common.utils.Utils.mkEntry;
 import static org.apache.kafka.common.utils.Utils.mkMap;
 import static 
org.apache.kafka.coordinator.group.Assertions.assertUnorderedRecordsEquals;
-import static 
org.apache.kafka.coordinator.group.streams.StreamsCoordinatorRecordHelpers.newStreamsGroupTargetAssignmentEpochRecord;
+import static 
org.apache.kafka.coordinator.group.streams.StreamsCoordinatorRecordHelpers.newStreamsGroupTargetAssignmentMetadataRecord;
 import static 
org.apache.kafka.coordinator.group.streams.StreamsCoordinatorRecordHelpers.newStreamsGroupTargetAssignmentRecord;
 import static 
org.apache.kafka.coordinator.group.streams.TargetAssignmentBuilder.createAssignmentMemberSpec;
 import static 
org.apache.kafka.coordinator.group.streams.TaskAssignmentTestUtil.mkTasks;
@@ -71,12 +72,13 @@ public class TargetAssignmentBuilderTest {
         when(topology.isReady()).thenReturn(false);
 
         TargetAssignmentBuilder builder = new TargetAssignmentBuilder(groupId, 
groupEpoch, assignor, assignmentConfigs)
+            .withTime(new MockTime(0, 12345L, 12345L))
             .withTopology(topology);
 
         TargetAssignmentBuilder.TargetAssignmentResult result = 
builder.build();
 
         List<CoordinatorRecord> expectedRecords = List.of(
-            
StreamsCoordinatorRecordHelpers.newStreamsGroupTargetAssignmentEpochRecord(groupId,
 groupEpoch)
+            
StreamsCoordinatorRecordHelpers.newStreamsGroupTargetAssignmentMetadataRecord(groupId,
 groupEpoch, 12345L)
         );
 
         assertEquals(expectedRecords, result.records());
@@ -124,13 +126,15 @@ public class TargetAssignmentBuilderTest {
     public void testEmpty() {
         TargetAssignmentBuilderTestContext context = new 
TargetAssignmentBuilderTestContext(
             "my-group",
-            20
+            20,
+            12345L
         );
 
         
org.apache.kafka.coordinator.group.streams.TargetAssignmentBuilder.TargetAssignmentResult
 result = context.build();
-        assertEquals(List.of(newStreamsGroupTargetAssignmentEpochRecord(
+        assertEquals(List.of(newStreamsGroupTargetAssignmentMetadataRecord(
             "my-group",
-            20
+            20,
+            12345L
         )), result.records());
         assertEquals(Map.of(), result.targetAssignment());
     }
@@ -141,7 +145,8 @@ public class TargetAssignmentBuilderTest {
     public void testAssignmentHasNotChanged(TaskRole taskRole) {
         TargetAssignmentBuilderTestContext context = new 
TargetAssignmentBuilderTestContext(
             "my-group",
-            20
+            20,
+            12345L
         );
 
         String fooSubtopologyId = 
context.addSubtopologyWithSingleSourceTopic("foo", 6);
@@ -169,9 +174,10 @@ public class TargetAssignmentBuilderTest {
 
         
org.apache.kafka.coordinator.group.streams.TargetAssignmentBuilder.TargetAssignmentResult
 result = context.build();
 
-        assertEquals(List.of(newStreamsGroupTargetAssignmentEpochRecord(
+        assertEquals(List.of(newStreamsGroupTargetAssignmentMetadataRecord(
             "my-group",
-            20
+            20,
+            12345L
         )), result.records());
 
         Map<String, TasksTuple> expectedAssignment = new HashMap<>();
@@ -193,7 +199,8 @@ public class TargetAssignmentBuilderTest {
     public void testAssignmentSwapped(TaskRole taskRole) {
         TargetAssignmentBuilderTestContext context = new 
TargetAssignmentBuilderTestContext(
             "my-group",
-            20
+            20,
+            12345L
         );
 
         String fooSubtopologyId = 
context.addSubtopologyWithSingleSourceTopic("foo", 6);
@@ -234,9 +241,10 @@ public class TargetAssignmentBuilderTest {
             ))
         )), result.records().subList(0, 2));
 
-        assertEquals(newStreamsGroupTargetAssignmentEpochRecord(
+        assertEquals(newStreamsGroupTargetAssignmentMetadataRecord(
             "my-group",
-            20
+            20,
+            12345L
         ), result.records().get(2));
 
         Map<String, TasksTuple> expectedAssignment = new HashMap<>();
@@ -258,7 +266,8 @@ public class TargetAssignmentBuilderTest {
     public void testNewMember(TaskRole taskRole) {
         TargetAssignmentBuilderTestContext context = new 
TargetAssignmentBuilderTestContext(
             "my-group",
-            20
+            20,
+            12345L
         );
 
         String fooSubtopologyId = 
context.addSubtopologyWithSingleSourceTopic("foo", 6);
@@ -310,9 +319,10 @@ public class TargetAssignmentBuilderTest {
             ))
         )), result.records().subList(0, 3));
 
-        assertEquals(newStreamsGroupTargetAssignmentEpochRecord(
+        assertEquals(newStreamsGroupTargetAssignmentMetadataRecord(
             "my-group",
-            20
+            20,
+            12345L
         ), result.records().get(3));
 
         Map<String, TasksTuple> expectedAssignment = new HashMap<>();
@@ -338,7 +348,8 @@ public class TargetAssignmentBuilderTest {
     public void testUpdateMember(TaskRole taskRole) {
         TargetAssignmentBuilderTestContext context = new 
TargetAssignmentBuilderTestContext(
             "my-group",
-            20
+            20,
+            12345L
         );
 
         String fooSubtopologyId = 
context.addSubtopologyWithSingleSourceTopic("foo", 6);
@@ -398,9 +409,10 @@ public class TargetAssignmentBuilderTest {
             ))
         )), result.records().subList(0, 3));
 
-        assertEquals(newStreamsGroupTargetAssignmentEpochRecord(
+        assertEquals(newStreamsGroupTargetAssignmentMetadataRecord(
             "my-group",
-            20
+            20,
+            12345L
         ), result.records().get(3));
 
         Map<String, TasksTuple> expectedAssignment = new HashMap<>();
@@ -426,7 +438,8 @@ public class TargetAssignmentBuilderTest {
     public void testPartialAssignmentUpdate(TaskRole taskRole) {
         TargetAssignmentBuilderTestContext context = new 
TargetAssignmentBuilderTestContext(
             "my-group",
-            20
+            20,
+            12345L
         );
 
         String fooSubtopologyId = 
context.addSubtopologyWithSingleSourceTopic("foo", 6);
@@ -478,9 +491,10 @@ public class TargetAssignmentBuilderTest {
             ))
         )), result.records().subList(0, 2));
 
-        assertEquals(newStreamsGroupTargetAssignmentEpochRecord(
+        assertEquals(newStreamsGroupTargetAssignmentMetadataRecord(
             "my-group",
-            20
+            20,
+            12345L
         ), result.records().get(2));
 
         Map<String, TasksTuple> expectedAssignment = new HashMap<>();
@@ -506,7 +520,8 @@ public class TargetAssignmentBuilderTest {
     public void testDeleteMember(TaskRole taskRole) {
         TargetAssignmentBuilderTestContext context = new 
TargetAssignmentBuilderTestContext(
             "my-group",
-            20
+            20,
+            12345L
         );
 
         String fooSubtopologyId = 
context.addSubtopologyWithSingleSourceTopic("foo", 6);
@@ -554,9 +569,10 @@ public class TargetAssignmentBuilderTest {
             ))
         )), result.records().subList(0, 2));
 
-        assertEquals(newStreamsGroupTargetAssignmentEpochRecord(
+        assertEquals(newStreamsGroupTargetAssignmentMetadataRecord(
             "my-group",
-            20
+            20,
+            12345L
         ), result.records().get(2));
 
         Map<String, TasksTuple> expectedAssignment = new HashMap<>();
@@ -578,7 +594,8 @@ public class TargetAssignmentBuilderTest {
     public void testReplaceStaticMember(TaskRole taskRole) {
         TargetAssignmentBuilderTestContext context = new 
TargetAssignmentBuilderTestContext(
             "my-group",
-            20
+            20,
+            12345L
         );
 
         String fooSubtopologyId = 
context.addSubtopologyWithSingleSourceTopic("foo", 6);
@@ -632,9 +649,10 @@ public class TargetAssignmentBuilderTest {
             ))
         )), result.records().subList(0, 1));
 
-        assertEquals(newStreamsGroupTargetAssignmentEpochRecord(
+        assertEquals(newStreamsGroupTargetAssignmentMetadataRecord(
             "my-group",
-            20
+            20,
+            12345L
         ), result.records().get(1));
 
         Map<String, TasksTuple> expectedAssignment = new HashMap<>();
@@ -659,6 +677,7 @@ public class TargetAssignmentBuilderTest {
 
         private final String groupId;
         private final int groupEpoch;
+        private final long assignmentTimestamp;
         private final TaskAssignor assignor = mock(TaskAssignor.class);
         private final SortedMap<String, ConfiguredSubtopology> subtopologies = 
new TreeMap<>();
         private final ConfiguredTopology topology = new ConfiguredTopology(0, 
0, Optional.of(subtopologies), new HashMap<>(),
@@ -673,10 +692,12 @@ public class TargetAssignmentBuilderTest {
 
         public TargetAssignmentBuilderTestContext(
             String groupId,
-            int groupEpoch
+            int groupEpoch,
+            long assignmentTimestamp
         ) {
             this.groupId = groupId;
             this.groupEpoch = groupEpoch;
+            this.assignmentTimestamp = assignmentTimestamp;
         }
 
         public void addGroupMember(
@@ -834,7 +855,9 @@ public class TargetAssignmentBuilderTest {
             });
 
             // Execute the builder.
-            
org.apache.kafka.coordinator.group.streams.TargetAssignmentBuilder.TargetAssignmentResult
 result = builder.build();
+            
org.apache.kafka.coordinator.group.streams.TargetAssignmentBuilder.TargetAssignmentResult
 result = builder
+                .withTime(new MockTime(0, assignmentTimestamp, 
assignmentTimestamp))
+                .build();
 
             // Verify that the assignor was called once with the expected
             // assignment spec.

Reply via email to