This is an automated email from the ASF dual-hosted git repository.

lianetm pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 199772adc5a KAFKA-19141; Persist topic id in OffsetCommit record 
(#19683)
199772adc5a is described below

commit 199772adc5a758ef74ae3adc6a5da471e50bfb27
Author: David Jacot <[email protected]>
AuthorDate: Fri May 16 16:26:36 2025 +0200

    KAFKA-19141; Persist topic id in OffsetCommit record (#19683)
    
    This patch adds the `TopicId` field to the `OffsetCommitValue` record as
    a tagged field. It will later be used on the offset fetch path to ensure
    that the topic id of the persisted offset matches the requested one.
    
    Reviewers: Dongnuo Lyu <[email protected]>, Sean Quah
     <[email protected]>, Lianet Magrans <[email protected]>
---
 .../group/GroupCoordinatorRecordHelpers.java       |   7 +-
 .../kafka/coordinator/group/OffsetAndMetadata.java |  29 +++-
 .../coordinator/group/OffsetMetadataManager.java   |   1 +
 .../common/message/OffsetCommitValue.json          |   4 +-
 .../group/GroupCoordinatorRecordHelpersTest.java   |  33 +++-
 .../coordinator/group/OffsetAndMetadataTest.java   |  53 ++++--
 .../group/OffsetExpirationConditionImplTest.java   |   8 +-
 .../group/OffsetMetadataManagerTest.java           | 191 +++++++++------------
 .../group/classic/ClassicGroupTest.java            |   2 +-
 .../group/modern/consumer/ConsumerGroupTest.java   |   2 +-
 .../group/streams/StreamsGroupTest.java            |   2 +-
 .../tools/consumer/OffsetMessageFormatterTest.java |   8 +-
 12 files changed, 195 insertions(+), 145 deletions(-)

diff --git 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorRecordHelpers.java
 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorRecordHelpers.java
index 457d090fc53..ad17bfe34f2 100644
--- 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorRecordHelpers.java
+++ 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorRecordHelpers.java
@@ -517,7 +517,8 @@ public class GroupCoordinatorRecordHelpers {
                     .setMetadata(offsetAndMetadata.metadata)
                     .setCommitTimestamp(offsetAndMetadata.commitTimestampMs)
                     // Version 1 has a non-empty expireTimestamp field
-                    
.setExpireTimestamp(offsetAndMetadata.expireTimestampMs.orElse(OffsetCommitRequest.DEFAULT_TIMESTAMP)),
+                    
.setExpireTimestamp(offsetAndMetadata.expireTimestampMs.orElse(OffsetCommitRequest.DEFAULT_TIMESTAMP))
+                    .setTopicId(offsetAndMetadata.topicId),
                 version
             )
         );
@@ -527,9 +528,7 @@ public class GroupCoordinatorRecordHelpers {
         if (expireTimestampMs) {
             return 1;
         } else {
-            // Serialize with the highest supported non-flexible version
-            // until a tagged field is introduced or the version is bumped.
-            return  3;
+            return  4;
         }
     }
 
diff --git 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/OffsetAndMetadata.java
 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/OffsetAndMetadata.java
index a5635f58ac5..b8c8ee28238 100644
--- 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/OffsetAndMetadata.java
+++ 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/OffsetAndMetadata.java
@@ -16,6 +16,7 @@
  */
 package org.apache.kafka.coordinator.group;
 
+import org.apache.kafka.common.Uuid;
 import org.apache.kafka.common.message.OffsetCommitRequestData;
 import org.apache.kafka.common.message.TxnOffsetCommitRequestData;
 import org.apache.kafka.coordinator.group.generated.OffsetCommitValue;
@@ -65,12 +66,18 @@ public class OffsetAndMetadata {
      */
     public final long recordOffset;
 
+    /**
+     * The topic id used to commit the offset.
+     */
+    public final Uuid topicId;
+
     public OffsetAndMetadata(
         long committedOffset,
         OptionalInt leaderEpoch,
         String metadata,
         long commitTimestampMs,
-        OptionalLong expireTimestampMs
+        OptionalLong expireTimestampMs,
+        Uuid topicId
     ) {
         this(
             -1L,
@@ -78,7 +85,8 @@ public class OffsetAndMetadata {
             leaderEpoch,
             metadata,
             commitTimestampMs,
-            expireTimestampMs
+            expireTimestampMs,
+            topicId
         );
     }
 
@@ -88,7 +96,8 @@ public class OffsetAndMetadata {
         OptionalInt leaderEpoch,
         String metadata,
         long commitTimestampMs,
-        OptionalLong expireTimestampMs
+        OptionalLong expireTimestampMs,
+        Uuid topicId
     ) {
         this.recordOffset = recordOffset;
         this.committedOffset = committedOffset;
@@ -96,6 +105,7 @@ public class OffsetAndMetadata {
         this.metadata = Objects.requireNonNull(metadata);
         this.commitTimestampMs = commitTimestampMs;
         this.expireTimestampMs = Objects.requireNonNull(expireTimestampMs);
+        this.topicId = topicId;
     }
 
     @Override
@@ -105,6 +115,7 @@ public class OffsetAndMetadata {
             ", metadata=" + metadata +
             ", commitTimestampMs=" + commitTimestampMs +
             ", expireTimestampMs=" + expireTimestampMs +
+            ", topicId=" + topicId +
             ", recordOffset=" + recordOffset +
             ')';
     }
@@ -121,6 +132,7 @@ public class OffsetAndMetadata {
         if (recordOffset != that.recordOffset) return false;
         if (!Objects.equals(leaderEpoch, that.leaderEpoch)) return false;
         if (!Objects.equals(metadata, that.metadata)) return false;
+        if (!Objects.equals(topicId, that.topicId)) return false;
         return Objects.equals(expireTimestampMs, that.expireTimestampMs);
     }
 
@@ -129,6 +141,7 @@ public class OffsetAndMetadata {
         int result = (int) (committedOffset ^ (committedOffset >>> 32));
         result = 31 * result + (leaderEpoch != null ? leaderEpoch.hashCode() : 
0);
         result = 31 * result + (metadata != null ? metadata.hashCode() : 0);
+        result = 31 * result + (topicId != null ? topicId.hashCode() : 0);
         result = 31 * result + (int) (commitTimestampMs ^ (commitTimestampMs 
>>> 32));
         result = 31 * result + (expireTimestampMs != null ? 
expireTimestampMs.hashCode() : 0);
         result = 31 * result + (int) (recordOffset ^ (recordOffset >>> 32));
@@ -148,7 +161,8 @@ public class OffsetAndMetadata {
             ofSentinel(record.leaderEpoch()),
             record.metadata(),
             record.commitTimestamp(),
-            ofSentinel(record.expireTimestamp())
+            ofSentinel(record.expireTimestamp()),
+            record.topicId()
         );
     }
 
@@ -156,6 +170,7 @@ public class OffsetAndMetadata {
      * @return An OffsetAndMetadata created from an 
OffsetCommitRequestPartition request.
      */
     public static OffsetAndMetadata fromRequest(
+        Uuid topicId,
         OffsetCommitRequestData.OffsetCommitRequestPartition partition,
         long currentTimeMs,
         OptionalLong expireTimestampMs
@@ -166,7 +181,8 @@ public class OffsetAndMetadata {
             partition.committedMetadata() == null ?
                 OffsetAndMetadata.NO_METADATA : partition.committedMetadata(),
             currentTimeMs,
-            expireTimestampMs
+            expireTimestampMs,
+            topicId
         );
     }
 
@@ -183,7 +199,8 @@ public class OffsetAndMetadata {
             partition.committedMetadata() == null ?
                 OffsetAndMetadata.NO_METADATA : partition.committedMetadata(),
             currentTimeMs,
-            OptionalLong.empty()
+            OptionalLong.empty(),
+            Uuid.ZERO_UUID
         );
     }
 }
diff --git 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/OffsetMetadataManager.java
 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/OffsetMetadataManager.java
index 4629df32217..1f67d23246c 100644
--- 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/OffsetMetadataManager.java
+++ 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/OffsetMetadataManager.java
@@ -641,6 +641,7 @@ public class OffsetMetadataManager {
                         .setErrorCode(Errors.NONE.code()));
 
                     final OffsetAndMetadata offsetAndMetadata = 
OffsetAndMetadata.fromRequest(
+                        topic.topicId(),
                         partition,
                         currentTimeMs,
                         expireTimestampMs
diff --git 
a/group-coordinator/src/main/resources/common/message/OffsetCommitValue.json 
b/group-coordinator/src/main/resources/common/message/OffsetCommitValue.json
index 8f7d32d5447..d9eca6cebe8 100644
--- a/group-coordinator/src/main/resources/common/message/OffsetCommitValue.json
+++ b/group-coordinator/src/main/resources/common/message/OffsetCommitValue.json
@@ -32,6 +32,8 @@
     { "name": "commitTimestamp", "type": "int64", "versions": "0+",
       "about": "The time at which the commit was added to the log."},
     { "name": "expireTimestamp", "type": "int64", "versions": "1", "default": 
-1, "ignorable": true,
-      "about": "The time at which the offset will expire."}
+      "about": "The time at which the offset will expire."},
+    { "name": "topicId", "type": "uuid", "versions": "4+", "taggedVersions": 
"4+", "tag": 0, "ignorable": true,
+      "about": "The topic id of the committed offset."}
   ]
 }
diff --git 
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorRecordHelpersTest.java
 
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorRecordHelpersTest.java
index 8bd0987ac46..be2e9df5fea 100644
--- 
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorRecordHelpersTest.java
+++ 
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorRecordHelpersTest.java
@@ -55,6 +55,8 @@ import 
org.apache.kafka.coordinator.group.modern.consumer.ResolvedRegularExpress
 import org.apache.kafka.server.common.ApiMessageAndVersion;
 
 import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.MethodSource;
 
 import java.util.ArrayList;
 import java.util.Arrays;
@@ -68,6 +70,7 @@ import java.util.Optional;
 import java.util.OptionalInt;
 import java.util.OptionalLong;
 import java.util.Set;
+import java.util.stream.Stream;
 
 import static org.apache.kafka.coordinator.group.Assertions.assertRecordEquals;
 import static 
org.apache.kafka.coordinator.group.AssignmentTestUtil.mkOrderedAssignment;
@@ -720,11 +723,19 @@ public class GroupCoordinatorRecordHelpersTest {
     @Test
     public void testOffsetCommitValueVersion() {
         assertEquals((short) 1, 
GroupCoordinatorRecordHelpers.offsetCommitValueVersion(true));
-        assertEquals((short) 3, 
GroupCoordinatorRecordHelpers.offsetCommitValueVersion(false));
+        assertEquals((short) 4, 
GroupCoordinatorRecordHelpers.offsetCommitValueVersion(false));
     }
 
-    @Test
-    public void testNewOffsetCommitRecord() {
+    private static Stream<Uuid> uuids() {
+        return Stream.of(
+            Uuid.ZERO_UUID,
+            Uuid.randomUuid()
+        );
+    }
+
+    @ParameterizedTest
+    @MethodSource("uuids")
+    public void testNewOffsetCommitRecord(Uuid topicId) {
         OffsetCommitKey key = new OffsetCommitKey()
             .setGroup("group-id")
             .setTopic("foo")
@@ -734,7 +745,8 @@ public class GroupCoordinatorRecordHelpersTest {
             .setLeaderEpoch(10)
             .setMetadata("metadata")
             .setCommitTimestamp(1234L)
-            .setExpireTimestamp(-1L);
+            .setExpireTimestamp(-1L)
+            .setTopicId(topicId);
 
         CoordinatorRecord expectedRecord = CoordinatorRecord.record(
             key,
@@ -749,11 +761,14 @@ public class GroupCoordinatorRecordHelpersTest {
             "foo",
             1,
             new OffsetAndMetadata(
+                -1L,
                 100L,
                 OptionalInt.of(10),
                 "metadata",
                 1234L,
-                OptionalLong.empty())
+                OptionalLong.empty(),
+                topicId
+            )
         ));
 
         value.setLeaderEpoch(-1);
@@ -767,7 +782,9 @@ public class GroupCoordinatorRecordHelpersTest {
                 OptionalInt.empty(),
                 "metadata",
                 1234L,
-                OptionalLong.empty())
+                OptionalLong.empty(),
+                topicId
+            )
         ));
     }
 
@@ -798,7 +815,9 @@ public class GroupCoordinatorRecordHelpersTest {
                 OptionalInt.of(10),
                 "metadata",
                 1234L,
-                OptionalLong.of(5678L))
+                OptionalLong.of(5678L),
+                Uuid.ZERO_UUID
+            )
         ));
     }
 
diff --git 
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/OffsetAndMetadataTest.java
 
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/OffsetAndMetadataTest.java
index e6be1f27883..2cc585b06af 100644
--- 
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/OffsetAndMetadataTest.java
+++ 
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/OffsetAndMetadataTest.java
@@ -16,27 +16,33 @@
  */
 package org.apache.kafka.coordinator.group;
 
+import org.apache.kafka.common.Uuid;
 import org.apache.kafka.common.message.OffsetCommitRequestData;
 import org.apache.kafka.common.message.TxnOffsetCommitRequestData;
 import org.apache.kafka.coordinator.group.generated.OffsetCommitValue;
 import org.apache.kafka.server.util.MockTime;
 
 import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.MethodSource;
 
 import java.util.OptionalInt;
 import java.util.OptionalLong;
+import java.util.stream.Stream;
 
 import static org.junit.jupiter.api.Assertions.assertEquals;
 
 public class OffsetAndMetadataTest {
     @Test
     public void testAttributes() {
+        Uuid topicId = Uuid.randomUuid();
         OffsetAndMetadata offsetAndMetadata = new OffsetAndMetadata(
             100L,
             OptionalInt.of(10),
             "metadata",
             1234L,
-            OptionalLong.of(5678L)
+            OptionalLong.of(5678L),
+            topicId
         );
 
         assertEquals(100L, offsetAndMetadata.committedOffset);
@@ -44,16 +50,26 @@ public class OffsetAndMetadataTest {
         assertEquals("metadata", offsetAndMetadata.metadata);
         assertEquals(1234L, offsetAndMetadata.commitTimestampMs);
         assertEquals(OptionalLong.of(5678L), 
offsetAndMetadata.expireTimestampMs);
+        assertEquals(topicId, offsetAndMetadata.topicId);
     }
 
-    @Test
-    public void testFromRecord() {
+    private static Stream<Uuid> uuids() {
+        return Stream.of(
+            Uuid.ZERO_UUID,
+            Uuid.randomUuid()
+        );
+    }
+
+    @ParameterizedTest
+    @MethodSource("uuids")
+    public void testFromRecord(Uuid uuid) {
         OffsetCommitValue record = new OffsetCommitValue()
             .setOffset(100L)
             .setLeaderEpoch(-1)
             .setMetadata("metadata")
             .setCommitTimestamp(1234L)
-            .setExpireTimestamp(-1L);
+            .setExpireTimestamp(-1L)
+            .setTopicId(uuid);
 
         assertEquals(new OffsetAndMetadata(
             10L,
@@ -61,7 +77,8 @@ public class OffsetAndMetadataTest {
             OptionalInt.empty(),
             "metadata",
             1234L,
-            OptionalLong.empty()
+            OptionalLong.empty(),
+            uuid
         ), OffsetAndMetadata.fromRecord(10L, record));
 
         record
@@ -74,12 +91,14 @@ public class OffsetAndMetadataTest {
             OptionalInt.of(12),
             "metadata",
             1234L,
-            OptionalLong.of(5678L)
+            OptionalLong.of(5678L),
+            uuid
         ), OffsetAndMetadata.fromRecord(11L, record));
     }
 
-    @Test
-    public void testFromRequest() {
+    @ParameterizedTest
+    @MethodSource("uuids")
+    public void testFromRequest(Uuid uuid) {
         MockTime time = new MockTime();
 
         OffsetCommitRequestData.OffsetCommitRequestPartition partition =
@@ -95,8 +114,10 @@ public class OffsetAndMetadataTest {
                 OptionalInt.empty(),
                 "",
                 time.milliseconds(),
-                OptionalLong.empty()
+                OptionalLong.empty(),
+                uuid
             ), OffsetAndMetadata.fromRequest(
+                uuid,
                 partition,
                 time.milliseconds(),
                 OptionalLong.empty()
@@ -113,8 +134,10 @@ public class OffsetAndMetadataTest {
                 OptionalInt.of(10),
                 "hello",
                 time.milliseconds(),
-                OptionalLong.empty()
+                OptionalLong.empty(),
+                uuid
             ), OffsetAndMetadata.fromRequest(
+                uuid,
                 partition,
                 time.milliseconds(),
                 OptionalLong.empty()
@@ -127,8 +150,10 @@ public class OffsetAndMetadataTest {
                 OptionalInt.of(10),
                 "hello",
                 time.milliseconds(),
-                OptionalLong.of(5678L)
+                OptionalLong.of(5678L),
+                uuid
             ), OffsetAndMetadata.fromRequest(
+                uuid,
                 partition,
                 time.milliseconds(),
                 OptionalLong.of(5678L)
@@ -153,7 +178,8 @@ public class OffsetAndMetadataTest {
                 OptionalInt.empty(),
                 "",
                 time.milliseconds(),
-                OptionalLong.empty()
+                OptionalLong.empty(),
+                Uuid.ZERO_UUID
             ), OffsetAndMetadata.fromRequest(
                 partition,
                 time.milliseconds()
@@ -170,7 +196,8 @@ public class OffsetAndMetadataTest {
                 OptionalInt.of(10),
                 "hello",
                 time.milliseconds(),
-                OptionalLong.empty()
+                OptionalLong.empty(),
+                Uuid.ZERO_UUID
             ), OffsetAndMetadata.fromRequest(
                 partition,
                 time.milliseconds()
diff --git 
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/OffsetExpirationConditionImplTest.java
 
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/OffsetExpirationConditionImplTest.java
index 2f1cb354a5a..63bbad94467 100644
--- 
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/OffsetExpirationConditionImplTest.java
+++ 
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/OffsetExpirationConditionImplTest.java
@@ -16,6 +16,8 @@
  */
 package org.apache.kafka.coordinator.group;
 
+import org.apache.kafka.common.Uuid;
+
 import org.junit.jupiter.api.Test;
 
 import java.util.OptionalInt;
@@ -39,7 +41,8 @@ public class OffsetExpirationConditionImplTest {
             OptionalInt.of(1),
             "metadata",
             commitTimestamp,
-            expireTimestampMs
+            expireTimestampMs,
+            Uuid.ZERO_UUID
         );
 
         // Test when expire timestamp exists (older versions with per 
partition retention)
@@ -56,7 +59,8 @@ public class OffsetExpirationConditionImplTest {
             OptionalInt.of(1),
             "metadata",
             commitTimestamp,
-            OptionalLong.empty()
+            OptionalLong.empty(),
+            Uuid.ZERO_UUID
         );
 
         // 3. Current timestamp - base timestamp >= offsets retention => 
should expire
diff --git 
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/OffsetMetadataManagerTest.java
 
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/OffsetMetadataManagerTest.java
index e90927cfdd7..d3ddef15773 100644
--- 
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/OffsetMetadataManagerTest.java
+++ 
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/OffsetMetadataManagerTest.java
@@ -68,6 +68,7 @@ import org.apache.kafka.timeline.SnapshotRegistry;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.params.ParameterizedTest;
 import org.junit.jupiter.params.provider.EnumSource;
+import org.junit.jupiter.params.provider.MethodSource;
 
 import java.net.InetAddress;
 import java.time.Duration;
@@ -80,6 +81,7 @@ import java.util.Optional;
 import java.util.OptionalInt;
 import java.util.OptionalLong;
 import java.util.Set;
+import java.util.stream.Stream;
 
 import static 
org.apache.kafka.common.requests.OffsetFetchResponse.INVALID_OFFSET;
 import static 
org.apache.kafka.coordinator.group.metrics.GroupCoordinatorMetrics.OFFSET_COMMITS_SENSOR_NAME;
@@ -449,7 +451,8 @@ public class OffsetMetadataManagerTest {
                     OptionalInt.of(leaderEpoch),
                     "metadata",
                     commitTimestamp,
-                    OptionalLong.empty()
+                    OptionalLong.empty(),
+                    Uuid.ZERO_UUID
                 )
             ));
         }
@@ -580,6 +583,13 @@ public class OffsetMetadataManagerTest {
         }
     }
 
+    private static Stream<Uuid> uuids() {
+        return Stream.of(
+            Uuid.ZERO_UUID,
+            Uuid.randomUuid()
+        );
+    }
+
     @ParameterizedTest
     @ApiKeyVersionsSource(apiKey = ApiKeys.OFFSET_COMMIT)
     public void testOffsetCommitWithUnknownGroup(short version) {
@@ -906,7 +916,8 @@ public class OffsetMetadataManagerTest {
                     OptionalInt.empty(),
                     "",
                     context.time.milliseconds(),
-                    OptionalLong.of(context.time.milliseconds() + 1234L)
+                    OptionalLong.of(context.time.milliseconds() + 1234L),
+                    Uuid.ZERO_UUID
                 )
             )),
             result.records()
@@ -971,8 +982,9 @@ public class OffsetMetadataManagerTest {
         assertFalse(group.hasMember(member.memberId()));
     }
 
-    @Test
-    public void testSimpleGroupOffsetCommit() {
+    @ParameterizedTest
+    @MethodSource("uuids")
+    public void testSimpleGroupOffsetCommit(Uuid topicId) {
         OffsetMetadataManagerTestContext context = new 
OffsetMetadataManagerTestContext.Builder().build();
 
         CoordinatorResult<OffsetCommitResponseData, CoordinatorRecord> result 
= context.commitOffset(
@@ -981,6 +993,7 @@ public class OffsetMetadataManagerTest {
                 .setTopics(List.of(
                     new OffsetCommitRequestData.OffsetCommitRequestTopic()
                         .setName("bar")
+                        .setTopicId(topicId)
                         .setPartitions(List.of(
                             new 
OffsetCommitRequestData.OffsetCommitRequestPartition()
                                 .setPartitionIndex(0)
@@ -994,6 +1007,7 @@ public class OffsetMetadataManagerTest {
                 .setTopics(List.of(
                     new OffsetCommitResponseData.OffsetCommitResponseTopic()
                         .setName("bar")
+                        .setTopicId(topicId)
                         .setPartitions(List.of(
                             new 
OffsetCommitResponseData.OffsetCommitResponsePartition()
                                 .setPartitionIndex(0)
@@ -1013,7 +1027,8 @@ public class OffsetMetadataManagerTest {
                     OptionalInt.empty(),
                     "",
                     context.time.milliseconds(),
-                    OptionalLong.empty()
+                    OptionalLong.empty(),
+                    topicId
                 )
             )),
             result.records()
@@ -1072,7 +1087,8 @@ public class OffsetMetadataManagerTest {
                     OptionalInt.empty(),
                     "",
                     context.time.milliseconds(),
-                    OptionalLong.empty()
+                    OptionalLong.empty(),
+                    Uuid.ZERO_UUID
                 )
             )),
             result.records()
@@ -1235,15 +1251,17 @@ public class OffsetMetadataManagerTest {
                     OptionalInt.empty(),
                     "",
                     context.time.milliseconds(),
-                    OptionalLong.empty()
+                    OptionalLong.empty(),
+                    Uuid.ZERO_UUID
                 )
             )),
             result.records()
         );
     }
 
-    @Test
-    public void testConsumerGroupOffsetCommit() {
+    @ParameterizedTest
+    @MethodSource("uuids")
+    public void testConsumerGroupOffsetCommit(Uuid topicId) {
         OffsetMetadataManagerTestContext context = new 
OffsetMetadataManagerTestContext.Builder().build();
 
         // Create an empty group.
@@ -1267,74 +1285,7 @@ public class OffsetMetadataManagerTest {
                 .setTopics(List.of(
                     new OffsetCommitRequestData.OffsetCommitRequestTopic()
                         .setName("bar")
-                        .setPartitions(List.of(
-                            new 
OffsetCommitRequestData.OffsetCommitRequestPartition()
-                                .setPartitionIndex(0)
-                                .setCommittedOffset(100L)
-                                .setCommittedLeaderEpoch(10)
-                                .setCommittedMetadata("metadata")
-                        ))
-                ))
-        );
-
-        assertEquals(
-            new OffsetCommitResponseData()
-                .setTopics(List.of(
-                    new OffsetCommitResponseData.OffsetCommitResponseTopic()
-                        .setName("bar")
-                        .setPartitions(List.of(
-                            new 
OffsetCommitResponseData.OffsetCommitResponsePartition()
-                                .setPartitionIndex(0)
-                                .setErrorCode(Errors.NONE.code())
-                        ))
-                )),
-            result.response()
-        );
-
-        assertEquals(
-            List.of(GroupCoordinatorRecordHelpers.newOffsetCommitRecord(
-                "foo",
-                "bar",
-                0,
-                new OffsetAndMetadata(
-                    100L,
-                    OptionalInt.of(10),
-                    "metadata",
-                    context.time.milliseconds(),
-                    OptionalLong.empty()
-                )
-            )),
-            result.records()
-        );
-    }
-
-    @Test
-    public void testConsumerGroupOffsetCommitWithTopicIds() {
-        Uuid topicId = Uuid.randomUuid();
-        OffsetMetadataManagerTestContext context = new 
OffsetMetadataManagerTestContext.Builder().build();
-
-        // Create an empty group.
-        ConsumerGroup group = 
context.groupMetadataManager.getOrMaybeCreatePersistedConsumerGroup(
-            "foo",
-            true
-        );
-
-        // Add member.
-        group.updateMember(new ConsumerGroupMember.Builder("member")
-            .setMemberEpoch(10)
-            .setPreviousMemberEpoch(10)
-            .build()
-        );
-
-        CoordinatorResult<OffsetCommitResponseData, CoordinatorRecord> result 
= context.commitOffset(
-            new OffsetCommitRequestData()
-                .setGroupId("foo")
-                .setMemberId("member")
-                .setGenerationIdOrMemberEpoch(10)
-                .setTopics(List.of(
-                    new OffsetCommitRequestData.OffsetCommitRequestTopic()
                         .setTopicId(topicId)
-                        .setName("bar")
                         .setPartitions(List.of(
                             new 
OffsetCommitRequestData.OffsetCommitRequestPartition()
                                 .setPartitionIndex(0)
@@ -1349,8 +1300,8 @@ public class OffsetMetadataManagerTest {
             new OffsetCommitResponseData()
                 .setTopics(List.of(
                     new OffsetCommitResponseData.OffsetCommitResponseTopic()
-                        .setTopicId(topicId)
                         .setName("bar")
+                        .setTopicId(topicId)
                         .setPartitions(List.of(
                             new 
OffsetCommitResponseData.OffsetCommitResponsePartition()
                                 .setPartitionIndex(0)
@@ -1370,7 +1321,8 @@ public class OffsetMetadataManagerTest {
                     OptionalInt.of(10),
                     "metadata",
                     context.time.milliseconds(),
-                    OptionalLong.empty()
+                    OptionalLong.empty(),
+                    topicId
                 )
             )),
             result.records()
@@ -1446,7 +1398,8 @@ public class OffsetMetadataManagerTest {
                     OptionalInt.of(10),
                     "small",
                     context.time.milliseconds(),
-                    OptionalLong.empty()
+                    OptionalLong.empty(),
+                    Uuid.ZERO_UUID
                 )
             )),
             result.records()
@@ -1512,7 +1465,8 @@ public class OffsetMetadataManagerTest {
                     OptionalInt.of(10),
                     "metadata",
                     context.time.milliseconds(),
-                    OptionalLong.empty()
+                    OptionalLong.empty(),
+                    Uuid.ZERO_UUID
                 )
             )),
             result.records()
@@ -1669,7 +1623,8 @@ public class OffsetMetadataManagerTest {
                     OptionalInt.of(10),
                     "metadata",
                     context.time.milliseconds(),
-                    OptionalLong.empty()
+                    OptionalLong.empty(),
+                    Uuid.ZERO_UUID
                 )
             )),
             result.records()
@@ -2766,8 +2721,9 @@ public class OffsetMetadataManagerTest {
             .setMetadata("");
     }
 
-    @Test
-    public void testReplay() {
+    @ParameterizedTest
+    @MethodSource("uuids")
+    public void testReplay(Uuid topicId) {
         OffsetMetadataManagerTestContext context = new 
OffsetMetadataManagerTestContext.Builder().build();
 
         verifyReplay(context, "foo", "bar", 0, new OffsetAndMetadata(
@@ -2776,7 +2732,8 @@ public class OffsetMetadataManagerTest {
             OptionalInt.empty(),
             "small",
             context.time.milliseconds(),
-            OptionalLong.empty()
+            OptionalLong.empty(),
+            topicId
         ));
 
         verifyReplay(context, "foo", "bar", 0, new OffsetAndMetadata(
@@ -2785,7 +2742,8 @@ public class OffsetMetadataManagerTest {
             OptionalInt.of(10),
             "small",
             context.time.milliseconds(),
-            OptionalLong.empty()
+            OptionalLong.empty(),
+            topicId
         ));
 
         verifyReplay(context, "foo", "bar", 1, new OffsetAndMetadata(
@@ -2794,7 +2752,8 @@ public class OffsetMetadataManagerTest {
             OptionalInt.of(10),
             "small",
             context.time.milliseconds(),
-            OptionalLong.empty()
+            OptionalLong.empty(),
+            topicId
         ));
 
         verifyReplay(context, "foo", "bar", 1, new OffsetAndMetadata(
@@ -2803,7 +2762,8 @@ public class OffsetMetadataManagerTest {
             OptionalInt.of(10),
             "small",
             context.time.milliseconds(),
-            OptionalLong.of(12345L)
+            OptionalLong.of(12345L),
+            topicId
         ));
     }
 
@@ -2817,7 +2777,8 @@ public class OffsetMetadataManagerTest {
             OptionalInt.empty(),
             "small",
             context.time.milliseconds(),
-            OptionalLong.empty()
+            OptionalLong.empty(),
+            Uuid.ZERO_UUID
         ));
 
         verifyTransactionalReplay(context, 5, "foo", "bar", 1, new OffsetAndMetadata(
@@ -2826,7 +2787,8 @@ public class OffsetMetadataManagerTest {
             OptionalInt.empty(),
             "small",
             context.time.milliseconds(),
-            OptionalLong.empty()
+            OptionalLong.empty(),
+            Uuid.ZERO_UUID
         ));
 
         verifyTransactionalReplay(context, 5, "bar", "zar", 0, new OffsetAndMetadata(
@@ -2835,7 +2797,8 @@ public class OffsetMetadataManagerTest {
             OptionalInt.empty(),
             "small",
             context.time.milliseconds(),
-            OptionalLong.empty()
+            OptionalLong.empty(),
+            Uuid.ZERO_UUID
         ));
 
         verifyTransactionalReplay(context, 5, "bar", "zar", 1, new OffsetAndMetadata(
@@ -2844,7 +2807,8 @@ public class OffsetMetadataManagerTest {
             OptionalInt.empty(),
             "small",
             context.time.milliseconds(),
-            OptionalLong.empty()
+            OptionalLong.empty(),
+            Uuid.ZERO_UUID
         ));
 
         verifyTransactionalReplay(context, 6, "foo", "bar", 2, new OffsetAndMetadata(
@@ -2853,7 +2817,8 @@ public class OffsetMetadataManagerTest {
             OptionalInt.empty(),
             "small",
             context.time.milliseconds(),
-            OptionalLong.empty()
+            OptionalLong.empty(),
+            Uuid.ZERO_UUID
         ));
 
         verifyTransactionalReplay(context, 6, "foo", "bar", 3, new OffsetAndMetadata(
@@ -2862,7 +2827,8 @@ public class OffsetMetadataManagerTest {
             OptionalInt.empty(),
             "small",
             context.time.milliseconds(),
-            OptionalLong.empty()
+            OptionalLong.empty(),
+            Uuid.ZERO_UUID
         ));
     }
 
@@ -2877,7 +2843,8 @@ public class OffsetMetadataManagerTest {
             OptionalInt.empty(),
             "small",
             context.time.milliseconds(),
-            OptionalLong.empty()
+            OptionalLong.empty(),
+            Uuid.ZERO_UUID
         ));
 
         verifyTransactionalReplay(context, 10L, "foo", "bar", 0, new OffsetAndMetadata(
@@ -2886,7 +2853,8 @@ public class OffsetMetadataManagerTest {
             OptionalInt.empty(),
             "small",
             context.time.milliseconds(),
-            OptionalLong.empty()
+            OptionalLong.empty(),
+            Uuid.ZERO_UUID
         ));
 
         verifyTransactionalReplay(context, 10L, "foo", "bar", 1, new OffsetAndMetadata(
@@ -2895,7 +2863,8 @@ public class OffsetMetadataManagerTest {
             OptionalInt.empty(),
             "small",
             context.time.milliseconds(),
-            OptionalLong.empty()
+            OptionalLong.empty(),
+            Uuid.ZERO_UUID
         ));
 
         // Delete the offsets.
@@ -2927,7 +2896,8 @@ public class OffsetMetadataManagerTest {
             OptionalInt.empty(),
             "small",
             context.time.milliseconds(),
-            OptionalLong.empty()
+            OptionalLong.empty(),
+            Uuid.ZERO_UUID
         ));
 
         // Add pending transactional commit for producer id 5.
@@ -2937,7 +2907,8 @@ public class OffsetMetadataManagerTest {
             OptionalInt.empty(),
             "small",
             context.time.milliseconds(),
-            OptionalLong.empty()
+            OptionalLong.empty(),
+            Uuid.ZERO_UUID
         ));
 
         // Add pending transactional commit for producer id 6.
@@ -2947,7 +2918,8 @@ public class OffsetMetadataManagerTest {
             OptionalInt.empty(),
             "small",
             context.time.milliseconds(),
-            OptionalLong.empty()
+            OptionalLong.empty(),
+            Uuid.ZERO_UUID
         ));
 
         // Replaying an end marker with an unknown producer id should not fail.
@@ -2971,7 +2943,8 @@ public class OffsetMetadataManagerTest {
             OptionalInt.empty(),
             "small",
             context.time.milliseconds(),
-            OptionalLong.empty()
+            OptionalLong.empty(),
+            Uuid.ZERO_UUID
         ), context.offsetMetadataManager.offset(
             "foo",
             "bar",
@@ -3007,7 +2980,8 @@ public class OffsetMetadataManagerTest {
             OptionalInt.empty(),
             "small",
             context.time.milliseconds(),
-            OptionalLong.empty()
+            OptionalLong.empty(),
+            Uuid.ZERO_UUID
         ));
 
         // Add regular offset commit.
@@ -3017,7 +2991,8 @@ public class OffsetMetadataManagerTest {
             OptionalInt.empty(),
             "small",
             context.time.milliseconds(),
-            OptionalLong.empty()
+            OptionalLong.empty(),
+            Uuid.ZERO_UUID
         ));
 
         // Replaying an end marker to commit transaction of producer id 5.
@@ -3039,7 +3014,8 @@ public class OffsetMetadataManagerTest {
             OptionalInt.empty(),
             "small",
             context.time.milliseconds(),
-            OptionalLong.empty()
+            OptionalLong.empty(),
+            Uuid.ZERO_UUID
         ), context.offsetMetadataManager.offset(
             "foo",
             "bar",
@@ -3058,7 +3034,8 @@ public class OffsetMetadataManagerTest {
             OptionalInt.empty(),
             "small",
             context.time.milliseconds(),
-            OptionalLong.empty()
+            OptionalLong.empty(),
+            Uuid.ZERO_UUID
         ));
 
         // Add pending transactional commit for producer id 5.
@@ -3068,7 +3045,8 @@ public class OffsetMetadataManagerTest {
             OptionalInt.empty(),
             "small",
             context.time.milliseconds(),
-            OptionalLong.empty()
+            OptionalLong.empty(),
+            Uuid.ZERO_UUID
         ));
 
         // Add pending transactional commit for producer id 6.
@@ -3078,7 +3056,8 @@ public class OffsetMetadataManagerTest {
             OptionalInt.empty(),
             "small",
             context.time.milliseconds(),
-            OptionalLong.empty()
+            OptionalLong.empty(),
+            Uuid.ZERO_UUID
         ));
 
         // Commit all the transactions.
diff --git a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/classic/ClassicGroupTest.java b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/classic/ClassicGroupTest.java
index a8b4d639e1c..dfcb415fd3e 100644
--- a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/classic/ClassicGroupTest.java
+++ b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/classic/ClassicGroupTest.java
@@ -1124,7 +1124,7 @@ public class ClassicGroupTest {
         long currentTimestamp = 30000L;
         long commitTimestamp = 20000L;
         long offsetsRetentionMs = 10000L;
-        OffsetAndMetadata offsetAndMetadata = new OffsetAndMetadata(15000L, OptionalInt.empty(), "", commitTimestamp, OptionalLong.empty());
+        OffsetAndMetadata offsetAndMetadata = new OffsetAndMetadata(15000L, OptionalInt.empty(), "", commitTimestamp, OptionalLong.empty(), Uuid.ZERO_UUID);
         MockTime time = new MockTime();
         long currentStateTimestamp = time.milliseconds();
         ClassicGroup group = new ClassicGroup(new LogContext(), "groupId", EMPTY, time);
diff --git a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/modern/consumer/ConsumerGroupTest.java b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/modern/consumer/ConsumerGroupTest.java
index 333df21d9c3..a6b91e5a83b 100644
--- a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/modern/consumer/ConsumerGroupTest.java
+++ b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/modern/consumer/ConsumerGroupTest.java
@@ -1198,7 +1198,7 @@ public class ConsumerGroupTest {
         long currentTimestamp = 30000L;
         long commitTimestamp = 20000L;
         long offsetsRetentionMs = 10000L;
-        OffsetAndMetadata offsetAndMetadata = new OffsetAndMetadata(15000L, OptionalInt.empty(), "", commitTimestamp, OptionalLong.empty());
+        OffsetAndMetadata offsetAndMetadata = new OffsetAndMetadata(15000L, OptionalInt.empty(), "", commitTimestamp, OptionalLong.empty(), Uuid.ZERO_UUID);
         ConsumerGroup group = new ConsumerGroup(new SnapshotRegistry(new LogContext()), "group-id", mock(GroupCoordinatorMetricsShard.class));
 
         Optional<OffsetExpirationCondition> offsetExpirationCondition = group.offsetExpirationCondition();
diff --git a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/StreamsGroupTest.java b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/StreamsGroupTest.java
index e9038b33402..99e13bbf155 100644
--- a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/StreamsGroupTest.java
+++ b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/StreamsGroupTest.java
@@ -791,7 +791,7 @@ public class StreamsGroupTest {
         long currentTimestamp = 30000L;
         long commitTimestamp = 20000L;
         long offsetsRetentionMs = 10000L;
-        OffsetAndMetadata offsetAndMetadata = new OffsetAndMetadata(15000L, OptionalInt.empty(), "", commitTimestamp, OptionalLong.empty());
+        OffsetAndMetadata offsetAndMetadata = new OffsetAndMetadata(15000L, OptionalInt.empty(), "", commitTimestamp, OptionalLong.empty(), Uuid.ZERO_UUID);
         StreamsGroup group = new StreamsGroup(LOG_CONTEXT, new SnapshotRegistry(LOG_CONTEXT), "group-id", mock(GroupCoordinatorMetricsShard.class));
 
         Optional<OffsetExpirationCondition> offsetExpirationCondition = group.offsetExpirationCondition();
diff --git a/tools/src/test/java/org/apache/kafka/tools/consumer/OffsetMessageFormatterTest.java b/tools/src/test/java/org/apache/kafka/tools/consumer/OffsetMessageFormatterTest.java
index 331b252dcc4..d774d0fed79 100644
--- a/tools/src/test/java/org/apache/kafka/tools/consumer/OffsetMessageFormatterTest.java
+++ b/tools/src/test/java/org/apache/kafka/tools/consumer/OffsetMessageFormatterTest.java
@@ -16,6 +16,7 @@
  */
 package org.apache.kafka.tools.consumer;
 
+import org.apache.kafka.common.Uuid;
 import org.apache.kafka.common.protocol.MessageUtil;
 import org.apache.kafka.coordinator.group.generated.GroupMetadataKey;
 import org.apache.kafka.coordinator.group.generated.GroupMetadataValue;
@@ -28,7 +29,6 @@ import java.util.Collections;
 import java.util.stream.Stream;
 
 public class OffsetMessageFormatterTest extends CoordinatorRecordMessageFormatterTest {
-
     private static final OffsetCommitKey OFFSET_COMMIT_KEY = new OffsetCommitKey()
         .setGroup("group-id")
         .setTopic("foo")
@@ -38,7 +38,8 @@ public class OffsetMessageFormatterTest extends CoordinatorRecordMessageFormatte
         .setLeaderEpoch(10)
         .setMetadata("metadata")
         .setCommitTimestamp(1234L)
-        .setExpireTimestamp(5678L);
+        .setExpireTimestamp(5678L)
+        .setTopicId(Uuid.fromString("MKXx1fIkQy2J9jXHhK8m1w"));
     private static final GroupMetadataKey GROUP_METADATA_KEY = new GroupMetadataKey().setGroup("group-id");
     private static final GroupMetadataValue GROUP_METADATA_VALUE = new GroupMetadataValue()
         .setProtocolType("consumer")
@@ -121,7 +122,8 @@ public class OffsetMessageFormatterTest extends CoordinatorRecordMessageFormatte
                               "data":{"offset":100,
                                       "leaderEpoch":10,
                                       "metadata":"metadata",
-                                      "commitTimestamp":1234}}}
+                                      "commitTimestamp":1234,
+                                      "topicId":"MKXx1fIkQy2J9jXHhK8m1w"}}}
                 """
             ),
             Arguments.of(


Reply via email to