This is an automated email from the ASF dual-hosted git repository.

dajac pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 71d08780d11 KAFKA-14690; Add TopicId to OffsetCommit API (#19461)
71d08780d11 is described below

commit 71d08780d11b23ec4e931efaa8ca329c03f161e3
Author: David Jacot <[email protected]>
AuthorDate: Wed Apr 23 08:22:09 2025 +0200

    KAFKA-14690; Add TopicId to OffsetCommit API (#19461)
    
    This patch extends the OffsetCommit API to support topic ids. From
    version 10 of the API, topic ids must be used. Originally, we wanted to
    support using both topic ids and topic names from version 10, but it
    turns out that this makes everything more complicated. Hence we propose
    to only support topic ids from version 10. Clients which only support
    topic names can either look up the topic ids using the Metadata API or
    stay on an earlier version.
    
    The patch only contains the server-side changes and keeps version 10
    unstable for now. We will mark the version as stable when the
    client-side changes are merged in.
    
    Reviewers: Lianet Magrans <[email protected]>, PoAn Yang 
<[email protected]>
---
 .../AlterConsumerGroupOffsetsHandler.java          |   2 +-
 .../consumer/internals/CommitRequestManager.java   |   2 +-
 .../consumer/internals/ConsumerCoordinator.java    |   2 +-
 .../kafka/common/requests/OffsetCommitRequest.java |  33 ++-
 .../common/requests/OffsetCommitResponse.java      | 128 ++++++++--
 .../common/message/OffsetCommitRequest.json        |   9 +-
 .../common/message/OffsetCommitResponse.json       |   9 +-
 .../internals/ConsumerCoordinatorTest.java         |   2 +-
 .../apache/kafka/common/message/MessageTest.java   | 125 ++++------
 .../common/requests/OffsetCommitRequestTest.java   |  11 +-
 .../kafka/common/requests/RequestResponseTest.java |   6 +-
 core/src/main/scala/kafka/server/KafkaApis.scala   |  91 +++----
 .../kafka/api/AuthorizerIntegrationTest.scala      |   2 +-
 .../server/ConsumerProtocolMigrationTest.scala     |   9 +-
 .../kafka/server/DeleteGroupsRequestTest.scala     |   3 +-
 .../server/GroupCoordinatorBaseRequestTest.scala   |  21 +-
 .../scala/unit/kafka/server/KafkaApisTest.scala    | 261 ++++++++++++++++-----
 .../kafka/server/OffsetCommitRequestTest.scala     |  25 +-
 .../kafka/server/OffsetDeleteRequestTest.scala     |   3 +-
 .../unit/kafka/server/OffsetFetchRequestTest.scala |   9 +-
 .../scala/unit/kafka/server/RequestQuotaTest.scala |   2 +-
 .../coordinator/group/OffsetMetadataManager.java   |   8 +-
 .../group/OffsetMetadataManagerTest.java           |  69 ++++++
 23 files changed, 592 insertions(+), 240 deletions(-)

diff --git 
a/clients/src/main/java/org/apache/kafka/clients/admin/internals/AlterConsumerGroupOffsetsHandler.java
 
b/clients/src/main/java/org/apache/kafka/clients/admin/internals/AlterConsumerGroupOffsetsHandler.java
index 5ef72f327d6..99111a70d4b 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/admin/internals/AlterConsumerGroupOffsetsHandler.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/admin/internals/AlterConsumerGroupOffsetsHandler.java
@@ -108,7 +108,7 @@ public class AlterConsumerGroupOffsetsHandler extends 
AdminApiHandler.Batched<Co
             .setGroupId(groupId.idValue)
             .setTopics(new ArrayList<>(offsetData.values()));
 
-        return new OffsetCommitRequest.Builder(data);
+        return OffsetCommitRequest.Builder.forTopicNames(data);
     }
 
     @Override
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java
 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java
index 284707a812b..62d1fe3a866 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java
@@ -727,7 +727,7 @@ public class CommitRequestManager implements 
RequestManager, MemberStateListener
                 lastEpochSentOnCommit = Optional.empty();
             }
 
-            OffsetCommitRequest.Builder builder = new 
OffsetCommitRequest.Builder(data);
+            OffsetCommitRequest.Builder builder = 
OffsetCommitRequest.Builder.forTopicNames(data);
 
             return buildRequestWithResponseHandling(builder);
         }
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
index 01fc605ea79..1cba10ef15d 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
@@ -1327,7 +1327,7 @@ public final class ConsumerCoordinator extends 
AbstractCoordinator {
             groupInstanceId = null;
         }
 
-        OffsetCommitRequest.Builder builder = new OffsetCommitRequest.Builder(
+        OffsetCommitRequest.Builder builder = 
OffsetCommitRequest.Builder.forTopicNames(
                 new OffsetCommitRequestData()
                         .setGroupId(this.rebalanceConfig.groupId)
                         .setGenerationIdOrMemberEpoch(generation.generationId)
diff --git 
a/clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitRequest.java
 
b/clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitRequest.java
index 8f6ab39d1fc..1bd9c41f668 100644
--- 
a/clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitRequest.java
+++ 
b/clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitRequest.java
@@ -17,6 +17,7 @@
 package org.apache.kafka.common.requests;
 
 import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.Uuid;
 import org.apache.kafka.common.errors.UnsupportedVersionException;
 import org.apache.kafka.common.message.OffsetCommitRequestData;
 import 
org.apache.kafka.common.message.OffsetCommitRequestData.OffsetCommitRequestTopic;
@@ -45,20 +46,39 @@ public class OffsetCommitRequest extends AbstractRequest {
 
         private final OffsetCommitRequestData data;
 
-        public Builder(OffsetCommitRequestData data, boolean 
enableUnstableLastVersion) {
-            super(ApiKeys.OFFSET_COMMIT, enableUnstableLastVersion);
+        private Builder(OffsetCommitRequestData data, short 
oldestAllowedVersion, short latestAllowedVersion) {
+            super(ApiKeys.OFFSET_COMMIT, oldestAllowedVersion, 
latestAllowedVersion);
             this.data = data;
         }
 
-        public Builder(OffsetCommitRequestData data) {
-            this(data, false);
+        public static Builder forTopicIdsOrNames(OffsetCommitRequestData data, 
boolean enableUnstableLastVersion) {
+            return new Builder(data, ApiKeys.OFFSET_COMMIT.oldestVersion(), 
ApiKeys.OFFSET_COMMIT.latestVersion(enableUnstableLastVersion));
+        }
+
+        public static Builder forTopicNames(OffsetCommitRequestData data) {
+            return new Builder(data, ApiKeys.OFFSET_COMMIT.oldestVersion(), 
(short) 9);
         }
 
         @Override
         public OffsetCommitRequest build(short version) {
             if (data.groupInstanceId() != null && version < 7) {
-                throw new UnsupportedVersionException("The broker offset 
commit protocol version " +
-                        version + " does not support usage of config 
group.instance.id.");
+                throw new UnsupportedVersionException("The broker offset 
commit api version " +
+                    version + " does not support usage of config 
group.instance.id.");
+            }
+            if (version >= 10) {
+                data.topics().forEach(topic -> {
+                    if (topic.topicId() == null || 
topic.topicId().equals(Uuid.ZERO_UUID)) {
+                        throw new UnsupportedVersionException("The broker 
offset commit api version " +
+                            version + " does require usage of topic ids.");
+                    }
+                });
+            } else {
+                data.topics().forEach(topic -> {
+                    if (topic.name() == null || topic.name().isEmpty()) {
+                        throw new UnsupportedVersionException("The broker 
offset commit api version " +
+                            version + " does require usage of topic names.");
+                    }
+                });
             }
             return new OffsetCommitRequest(data, version);
         }
@@ -97,6 +117,7 @@ public class OffsetCommitRequest extends AbstractRequest {
         OffsetCommitResponseData response = new OffsetCommitResponseData();
         request.topics().forEach(topic -> {
             OffsetCommitResponseTopic responseTopic = new 
OffsetCommitResponseTopic()
+                .setTopicId(topic.topicId())
                 .setName(topic.name());
             response.topics().add(responseTopic);
 
diff --git 
a/clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitResponse.java
 
b/clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitResponse.java
index 2b6d00b1a47..a4d740c06f9 100644
--- 
a/clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitResponse.java
+++ 
b/clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitResponse.java
@@ -17,6 +17,7 @@
 package org.apache.kafka.common.requests;
 
 import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.Uuid;
 import org.apache.kafka.common.message.OffsetCommitResponseData;
 import 
org.apache.kafka.common.message.OffsetCommitResponseData.OffsetCommitResponsePartition;
 import 
org.apache.kafka.common.message.OffsetCommitResponseData.OffsetCommitResponseTopic;
@@ -123,43 +124,56 @@ public class OffsetCommitResponse extends 
AbstractResponse {
         return version >= 4;
     }
 
-    public static class Builder {
-        OffsetCommitResponseData data = new OffsetCommitResponseData();
-        HashMap<String, OffsetCommitResponseTopic> byTopicName = new 
HashMap<>();
+    public static boolean useTopicIds(short version) {
+        return version >= 10;
+    }
 
-        private OffsetCommitResponseTopic getOrCreateTopic(
-            String topicName
-        ) {
-            OffsetCommitResponseTopic topic = byTopicName.get(topicName);
-            if (topic == null) {
-                topic = new OffsetCommitResponseTopic().setName(topicName);
-                data.topics().add(topic);
-                byTopicName.put(topicName, topic);
-            }
-            return topic;
+    public static Builder newBuilder(boolean useTopicIds) {
+        if (useTopicIds) {
+            return new TopicIdBuilder();
+        } else {
+            return new TopicNameBuilder();
         }
+    }
+
+    public abstract static class Builder {
+        protected OffsetCommitResponseData data = new 
OffsetCommitResponseData();
+
+        protected abstract void add(
+            OffsetCommitResponseTopic topic
+        );
+
+        protected abstract OffsetCommitResponseTopic get(
+            Uuid topicId,
+            String topicName
+        );
+
+        protected abstract OffsetCommitResponseTopic getOrCreate(
+            Uuid topicId,
+            String topicName
+        );
 
         public Builder addPartition(
+            Uuid topicId,
             String topicName,
             int partitionIndex,
             Errors error
         ) {
-            final OffsetCommitResponseTopic topicResponse = 
getOrCreateTopic(topicName);
-
+            final OffsetCommitResponseTopic topicResponse = 
getOrCreate(topicId, topicName);
             topicResponse.partitions().add(new OffsetCommitResponsePartition()
                 .setPartitionIndex(partitionIndex)
                 .setErrorCode(error.code()));
-
             return this;
         }
 
         public <P> Builder addPartitions(
+            Uuid topicId,
             String topicName,
             List<P> partitions,
             Function<P, Integer> partitionIndex,
             Errors error
         ) {
-            final OffsetCommitResponseTopic topicResponse = 
getOrCreateTopic(topicName);
+            final OffsetCommitResponseTopic topicResponse = 
getOrCreate(topicId, topicName);
             partitions.forEach(partition ->
                 topicResponse.partitions().add(new 
OffsetCommitResponsePartition()
                     .setPartitionIndex(partitionIndex.apply(partition))
@@ -177,11 +191,10 @@ public class OffsetCommitResponse extends 
AbstractResponse {
             } else {
                 // Otherwise, we have to merge them together.
                 newData.topics().forEach(newTopic -> {
-                    OffsetCommitResponseTopic existingTopic = 
byTopicName.get(newTopic.name());
+                    OffsetCommitResponseTopic existingTopic = 
get(newTopic.topicId(), newTopic.name());
                     if (existingTopic == null) {
                         // If no topic exists, we can directly copy the new 
topic data.
-                        data.topics().add(newTopic);
-                        byTopicName.put(newTopic.name(), newTopic);
+                        add(newTopic);
                     } else {
                         // Otherwise, we add the partitions to the existing 
one. Note we
                         // expect non-overlapping partitions here as we don't 
verify
@@ -190,7 +203,6 @@ public class OffsetCommitResponse extends AbstractResponse {
                     }
                 });
             }
-
             return this;
         }
 
@@ -198,4 +210,78 @@ public class OffsetCommitResponse extends AbstractResponse 
{
             return new OffsetCommitResponse(data);
         }
     }
+
+    public static class TopicIdBuilder extends Builder {
+        private final HashMap<Uuid, OffsetCommitResponseTopic> byTopicId = new 
HashMap<>();
+
+        @Override
+        protected void add(OffsetCommitResponseTopic topic) {
+            throwIfTopicIdIsNull(topic.topicId());
+            data.topics().add(topic);
+            byTopicId.put(topic.topicId(), topic);
+        }
+
+        @Override
+        protected OffsetCommitResponseTopic get(Uuid topicId, String 
topicName) {
+            throwIfTopicIdIsNull(topicId);
+            return byTopicId.get(topicId);
+        }
+
+        @Override
+        protected OffsetCommitResponseTopic getOrCreate(Uuid topicId, String 
topicName) {
+            throwIfTopicIdIsNull(topicId);
+            OffsetCommitResponseTopic topic = byTopicId.get(topicId);
+            if (topic == null) {
+                topic = new OffsetCommitResponseTopic()
+                    .setName(topicName)
+                    .setTopicId(topicId);
+                data.topics().add(topic);
+                byTopicId.put(topicId, topic);
+            }
+            return topic;
+        }
+
+        private static void throwIfTopicIdIsNull(Uuid topicId) {
+            if (topicId == null) {
+                throw new IllegalArgumentException("TopicId cannot be null.");
+            }
+        }
+    }
+
+    public static class TopicNameBuilder extends Builder {
+        private final HashMap<String, OffsetCommitResponseTopic> byTopicName = 
new HashMap<>();
+
+        @Override
+        protected void add(OffsetCommitResponseTopic topic) {
+            throwIfTopicNameIsNull(topic.name());
+            data.topics().add(topic);
+            byTopicName.put(topic.name(), topic);
+        }
+
+        @Override
+        protected OffsetCommitResponseTopic get(Uuid topicId, String 
topicName) {
+            throwIfTopicNameIsNull(topicName);
+            return byTopicName.get(topicName);
+        }
+
+        @Override
+        protected OffsetCommitResponseTopic getOrCreate(Uuid topicId, String 
topicName) {
+            throwIfTopicNameIsNull(topicName);
+            OffsetCommitResponseTopic topic = byTopicName.get(topicName);
+            if (topic == null) {
+                topic = new OffsetCommitResponseTopic()
+                    .setName(topicName)
+                    .setTopicId(topicId);
+                data.topics().add(topic);
+                byTopicName.put(topicName, topic);
+            }
+            return topic;
+        }
+
+        private void throwIfTopicNameIsNull(String topicName) {
+            if (topicName == null) {
+                throw new IllegalArgumentException("TopicName cannot be 
null.");
+            }
+        }
+    }
 }
diff --git a/clients/src/main/resources/common/message/OffsetCommitRequest.json 
b/clients/src/main/resources/common/message/OffsetCommitRequest.json
index 348ed2b90c5..ba3c12f0e2b 100644
--- a/clients/src/main/resources/common/message/OffsetCommitRequest.json
+++ b/clients/src/main/resources/common/message/OffsetCommitRequest.json
@@ -36,8 +36,11 @@
   //
   // Version 9 is the first version that can be used with the new consumer 
group protocol (KIP-848). The
   // request is the same as version 8.
-  "validVersions": "2-9",
+  //
+  // Version 10 adds support for topic ids and removes support for topic names 
(KIP-848).
+  "validVersions": "2-10",
   "flexibleVersions": "8+",
+  "latestVersionUnstable": true,
   "fields": [
     { "name": "GroupId", "type": "string", "versions": "0+", "entityType": 
"groupId",
       "about": "The unique group identifier." },
@@ -52,8 +55,10 @@
       "about": "The time period in ms to retain the offset." },
     { "name": "Topics", "type": "[]OffsetCommitRequestTopic", "versions": "0+",
       "about": "The topics to commit offsets for.",  "fields": [
-      { "name": "Name", "type": "string", "versions": "0+", "entityType": 
"topicName",
+      { "name": "Name", "type": "string", "versions": "0-9", "entityType": 
"topicName", "ignorable": true,
         "about": "The topic name." },
+      { "name":  "TopicId", "type": "uuid", "versions": "10+", "ignorable": 
true,
+        "about": "The topic ID." },
       { "name": "Partitions", "type": "[]OffsetCommitRequestPartition", 
"versions": "0+",
         "about": "Each partition to commit offsets for.", "fields": [
         { "name": "PartitionIndex", "type": "int32", "versions": "0+",
diff --git 
a/clients/src/main/resources/common/message/OffsetCommitResponse.json 
b/clients/src/main/resources/common/message/OffsetCommitResponse.json
index 0cccd64816c..0228733ce6b 100644
--- a/clients/src/main/resources/common/message/OffsetCommitResponse.json
+++ b/clients/src/main/resources/common/message/OffsetCommitResponse.json
@@ -34,7 +34,9 @@
   // Version 9 is the first version that can be used with the new consumer 
group protocol (KIP-848). The response is
   // the same as version 8 but can return STALE_MEMBER_EPOCH when the new 
consumer group protocol is used and
   // GROUP_ID_NOT_FOUND when the group does not exist for both protocols.
-  "validVersions": "2-9",
+  //
+  // Version 10 adds support for topic ids and removes support for topic names 
(KIP-848).
+  "validVersions": "2-10",
   "flexibleVersions": "8+",
   // Supported errors:
   // - GROUP_AUTHORIZATION_FAILED (version 0+)
@@ -47,13 +49,16 @@
   // - FENCED_MEMBER_EPOCH (version 7+)
   // - GROUP_ID_NOT_FOUND (version 9+)
   // - STALE_MEMBER_EPOCH (version 9+)
+  // - UNKNOWN_TOPIC_ID (version 10+)
   "fields": [
     { "name": "ThrottleTimeMs", "type": "int32", "versions": "3+", 
"ignorable": true,
       "about": "The duration in milliseconds for which the request was 
throttled due to a quota violation, or zero if the request did not violate any 
quota." },
     { "name": "Topics", "type": "[]OffsetCommitResponseTopic", "versions": 
"0+",
       "about": "The responses for each topic.", "fields": [
-      { "name": "Name", "type": "string", "versions": "0+", "entityType": 
"topicName",
+      { "name": "Name", "type": "string", "versions": "0-9", "entityType": 
"topicName", "ignorable": true,
         "about": "The topic name." },
+      { "name":  "TopicId", "type": "uuid", "versions": "10+", "ignorable": 
true,
+        "about": "The topic ID." },
       { "name": "Partitions", "type": "[]OffsetCommitResponsePartition", 
"versions": "0+",
         "about": "The responses for each partition in the topic.",  "fields": [
         { "name": "PartitionIndex", "type": "int32", "versions": "0+",
diff --git 
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
 
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
index 5c9e06ff90d..a34f2f16337 100644
--- 
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
+++ 
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
@@ -682,7 +682,7 @@ public abstract class ConsumerCoordinatorTest {
                         )
                 );
 
-        consumerClient.send(coordinator.checkAndGetCoordinator(), new 
OffsetCommitRequest.Builder(offsetCommitRequestData))
+        consumerClient.send(coordinator.checkAndGetCoordinator(), 
OffsetCommitRequest.Builder.forTopicNames(offsetCommitRequestData))
                 .compose(new RequestFutureAdapter<ClientResponse, Object>() {
                     @Override
                     public void onSuccess(ClientResponse value, 
RequestFuture<Object> future) {}
diff --git 
a/clients/src/test/java/org/apache/kafka/common/message/MessageTest.java 
b/clients/src/test/java/org/apache/kafka/common/message/MessageTest.java
index b28b8274f58..4674bf2013e 100644
--- a/clients/src/test/java/org/apache/kafka/common/message/MessageTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/message/MessageTest.java
@@ -56,11 +56,13 @@ import org.apache.kafka.common.protocol.Message;
 import org.apache.kafka.common.protocol.MessageUtil;
 import org.apache.kafka.common.protocol.ObjectSerializationCache;
 import org.apache.kafka.common.protocol.types.RawTaggedField;
+import org.apache.kafka.common.utils.annotation.ApiKeyVersionsSource;
 
 import com.fasterxml.jackson.databind.JsonNode;
 
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.Timeout;
+import org.junit.jupiter.params.ParameterizedTest;
 
 import java.lang.reflect.Method;
 import java.nio.ByteBuffer;
@@ -409,90 +411,49 @@ public final class MessageTest {
                 new OffsetForLeaderEpochRequestData().setReplicaId(-2));
     }
 
-    @Test
-    public void testOffsetCommitRequestVersions() throws Exception {
-        String groupId = "groupId";
-        String topicName = "topic";
-        String metadata = "metadata";
-        int partition = 2;
-        int offset = 100;
-
-        testAllMessageRoundTrips(new OffsetCommitRequestData()
-                                     .setGroupId(groupId)
-                                     .setTopics(Collections.singletonList(
-                                         new OffsetCommitRequestTopic()
-                                             .setName(topicName)
-                                             
.setPartitions(Collections.singletonList(
-                                                 new 
OffsetCommitRequestPartition()
-                                                     
.setPartitionIndex(partition)
-                                                     
.setCommittedMetadata(metadata)
-                                                     
.setCommittedOffset(offset)
-                                             )))));
-
-        Supplier<OffsetCommitRequestData> request =
-            () -> new OffsetCommitRequestData()
-                      .setGroupId(groupId)
-                      .setMemberId("memberId")
-                      .setGroupInstanceId("instanceId")
-                      .setTopics(Collections.singletonList(
-                          new OffsetCommitRequestTopic()
-                              .setName(topicName)
-                              .setPartitions(Collections.singletonList(
-                                  new OffsetCommitRequestPartition()
-                                      .setPartitionIndex(partition)
-                                      .setCommittedLeaderEpoch(10)
-                                      .setCommittedMetadata(metadata)
-                                      .setCommittedOffset(offset)
-                            ))))
-                    .setRetentionTimeMs(20);
-
-        for (short version : ApiKeys.OFFSET_COMMIT.allVersions()) {
-            OffsetCommitRequestData requestData = request.get();
-
-            if (version > 4) {
-                requestData.setRetentionTimeMs(-1);
-            }
-
-            if (version < 6) {
-                
requestData.topics().get(0).partitions().get(0).setCommittedLeaderEpoch(-1);
-            }
-
-            if (version < 7) {
-                requestData.setGroupInstanceId(null);
-            }
-
-            if (version >= 2 && version <= 4) {
-                testAllMessageRoundTripsBetweenVersions(version, (short) 5, 
requestData, requestData);
-            } else {
-                testAllMessageRoundTripsFromVersion(version, requestData);
-            }
-        }
+    @ParameterizedTest
+    @ApiKeyVersionsSource(apiKey = ApiKeys.OFFSET_COMMIT)
+    public void testOffsetCommitRequestVersions(short version) throws 
Exception {
+        OffsetCommitRequestData request = new OffsetCommitRequestData()
+            .setGroupId("groupId")
+            .setMemberId("memberId")
+            .setGenerationIdOrMemberEpoch(version >= 1 ? 10 : -1)
+            .setGroupInstanceId(version >= 7 ? "instanceId" : null)
+            .setRetentionTimeMs((version >= 2 && version <= 4) ? 20 : -1)
+            .setTopics(singletonList(
+                new OffsetCommitRequestTopic()
+                    .setTopicId(version >= 10 ? Uuid.randomUuid() : 
Uuid.ZERO_UUID)
+                    .setName(version < 10 ? "topic" : "")
+                    .setPartitions(singletonList(
+                        new OffsetCommitRequestPartition()
+                            .setPartitionIndex(1)
+                            .setCommittedMetadata("metadata")
+                            .setCommittedOffset(100)
+                            .setCommittedLeaderEpoch(version >= 6 ? 10 : -1)
+
+                    ))
+            ));
+
+        testMessageRoundTrip(version, request, request);
     }
 
-    @Test
-    public void testOffsetCommitResponseVersions() throws Exception {
-        Supplier<OffsetCommitResponseData> response =
-            () -> new OffsetCommitResponseData()
-                      .setTopics(
-                          singletonList(
-                              new OffsetCommitResponseTopic()
-                                  .setName("topic")
-                                  .setPartitions(singletonList(
-                                      new OffsetCommitResponsePartition()
-                                          .setPartitionIndex(1)
-                                          
.setErrorCode(Errors.UNKNOWN_MEMBER_ID.code())
-                                  ))
-                          )
-                      )
-                      .setThrottleTimeMs(20);
-
-        for (short version : ApiKeys.OFFSET_COMMIT.allVersions()) {
-            OffsetCommitResponseData responseData = response.get();
-            if (version < 3) {
-                responseData.setThrottleTimeMs(0);
-            }
-            testAllMessageRoundTripsFromVersion(version, responseData);
-        }
+    @ParameterizedTest
+    @ApiKeyVersionsSource(apiKey = ApiKeys.OFFSET_COMMIT)
+    public void testOffsetCommitResponseVersions(short version) throws 
Exception {
+        OffsetCommitResponseData response = new OffsetCommitResponseData()
+            .setThrottleTimeMs(version >= 3 ? 20 : 0)
+            .setTopics(singletonList(
+                new OffsetCommitResponseTopic()
+                    .setTopicId(version >= 10 ? Uuid.randomUuid() : 
Uuid.ZERO_UUID)
+                    .setName(version < 10 ? "topic" : "")
+                    .setPartitions(singletonList(
+                        new OffsetCommitResponsePartition()
+                            .setPartitionIndex(1)
+                            .setErrorCode(Errors.UNKNOWN_MEMBER_ID.code())
+                    ))
+            ));
+
+        testMessageRoundTrip(version, response, response);
     }
 
     @Test
diff --git 
a/clients/src/test/java/org/apache/kafka/common/requests/OffsetCommitRequestTest.java
 
b/clients/src/test/java/org/apache/kafka/common/requests/OffsetCommitRequestTest.java
index 161a4dd5f11..9cd95cfec76 100644
--- 
a/clients/src/test/java/org/apache/kafka/common/requests/OffsetCommitRequestTest.java
+++ 
b/clients/src/test/java/org/apache/kafka/common/requests/OffsetCommitRequestTest.java
@@ -17,6 +17,7 @@
 package org.apache.kafka.common.requests;
 
 import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.Uuid;
 import org.apache.kafka.common.errors.UnsupportedVersionException;
 import org.apache.kafka.common.message.OffsetCommitRequestData;
 import 
org.apache.kafka.common.message.OffsetCommitRequestData.OffsetCommitRequestPartition;
@@ -45,6 +46,8 @@ public class OffsetCommitRequestTest {
     protected static String groupId = "groupId";
     protected static String memberId = "consumerId";
     protected static String groupInstanceId = "groupInstanceId";
+    protected static Uuid topicIdOne = Uuid.randomUuid();
+    protected static Uuid topicIdTwo = Uuid.randomUuid();
     protected static String topicOne = "topicOne";
     protected static String topicTwo = "topicTwo";
     protected static int partitionOne = 1;
@@ -61,6 +64,7 @@ public class OffsetCommitRequestTest {
     public void setUp() {
         List<OffsetCommitRequestTopic> topics = Arrays.asList(
             new OffsetCommitRequestTopic()
+                .setTopicId(topicIdOne)
                 .setName(topicOne)
                 .setPartitions(Collections.singletonList(
                     new OffsetCommitRequestPartition()
@@ -70,6 +74,7 @@ public class OffsetCommitRequestTest {
                         .setCommittedMetadata(metadata)
                 )),
             new OffsetCommitRequestTopic()
+                .setTopicId(topicIdTwo)
                 .setName(topicTwo)
                 .setPartitions(Collections.singletonList(
                     new OffsetCommitRequestPartition()
@@ -90,7 +95,7 @@ public class OffsetCommitRequestTest {
         expectedOffsets.put(new TopicPartition(topicOne, partitionOne), 
offset);
         expectedOffsets.put(new TopicPartition(topicTwo, partitionTwo), 
offset);
 
-        OffsetCommitRequest.Builder builder = new 
OffsetCommitRequest.Builder(data);
+        OffsetCommitRequest.Builder builder = 
OffsetCommitRequest.Builder.forTopicNames(data);
 
         for (short version : ApiKeys.OFFSET_COMMIT.allVersions()) {
             OffsetCommitRequest request = builder.build(version);
@@ -105,7 +110,7 @@ public class OffsetCommitRequestTest {
 
     @Test
     public void testVersionSupportForGroupInstanceId() {
-        OffsetCommitRequest.Builder builder = new OffsetCommitRequest.Builder(
+        OffsetCommitRequest.Builder builder = 
OffsetCommitRequest.Builder.forTopicNames(
             new OffsetCommitRequestData()
                 .setGroupId(groupId)
                 .setMemberId(memberId)
@@ -127,12 +132,14 @@ public class OffsetCommitRequestTest {
         OffsetCommitResponseData expectedResponse = new OffsetCommitResponseData()
             .setTopics(Arrays.asList(
                 new OffsetCommitResponseTopic()
+                    .setTopicId(topicIdOne)
                     .setName(topicOne)
                     .setPartitions(Collections.singletonList(
                         new OffsetCommitResponsePartition()
                             .setErrorCode(Errors.UNKNOWN_MEMBER_ID.code())
                             .setPartitionIndex(partitionOne))),
                 new OffsetCommitResponseTopic()
+                    .setTopicId(topicIdTwo)
                     .setName(topicTwo)
                     .setPartitions(Collections.singletonList(
                         new OffsetCommitResponsePartition()
diff --git a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
index b33dec17d9a..5d1e853e691 100644
--- a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
@@ -329,6 +329,8 @@ import static org.junit.jupiter.api.Assertions.fail;
 // This class performs tests requests and responses for all API keys
 public class RequestResponseTest {
 
+    private static final Uuid TOPIC_ID = Uuid.randomUuid();
+
     // Exception includes a message that we verify is not included in error responses
     private final UnknownServerException unknownServerException = new UnknownServerException("secret");
 
@@ -2401,7 +2403,7 @@ public class RequestResponseTest {
     }
 
     private OffsetCommitRequest createOffsetCommitRequest(short version) {
-        return new OffsetCommitRequest.Builder(new OffsetCommitRequestData()
+        return OffsetCommitRequest.Builder.forTopicNames(new OffsetCommitRequestData()
                 .setGroupId("group1")
                 .setMemberId("consumer1")
                 .setGroupInstanceId(null)
@@ -2409,6 +2411,7 @@ public class RequestResponseTest {
                 .setTopics(singletonList(
                         new OffsetCommitRequestData.OffsetCommitRequestTopic()
                                 .setName("test")
+                                .setTopicId(TOPIC_ID)
                                 .setPartitions(asList(
                                         new OffsetCommitRequestData.OffsetCommitRequestPartition()
                                                 .setPartitionIndex(0)
@@ -2430,6 +2433,7 @@ public class RequestResponseTest {
                 .setTopics(singletonList(
                         new OffsetCommitResponseData.OffsetCommitResponseTopic()
                                 .setName("test")
+                                .setTopicId(TOPIC_ID)
                                 .setPartitions(singletonList(
                                         new OffsetCommitResponseData.OffsetCommitResponsePartition()
                                                 .setPartitionIndex(0)
diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala
index 6a22963ac7d..ca8cc311b96 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -275,11 +275,21 @@ class KafkaApis(val requestChannel: RequestChannel,
   ): CompletableFuture[Unit] = {
     val offsetCommitRequest = request.body[OffsetCommitRequest]
 
-    // Reject the request if not authorized to the group
+    // Reject the request if not authorized to the group.
     if (!authHelper.authorize(request.context, READ, GROUP, offsetCommitRequest.data.groupId)) {
       requestHelper.sendMaybeThrottle(request, offsetCommitRequest.getErrorResponse(Errors.GROUP_AUTHORIZATION_FAILED.exception))
       CompletableFuture.completedFuture[Unit](())
     } else {
+      val useTopicIds = OffsetCommitResponse.useTopicIds(request.header.apiVersion)
+
+      if (useTopicIds) {
+        offsetCommitRequest.data.topics.forEach { topic =>
+          if (topic.topicId != Uuid.ZERO_UUID) {
+            metadataCache.getTopicName(topic.topicId).ifPresent(name => topic.setName(name))
+          }
+        }
+      }
+
       val authorizedTopics = authHelper.filterByAuthorized(
         request.context,
         READ,
@@ -287,28 +297,40 @@ class KafkaApis(val requestChannel: RequestChannel,
         offsetCommitRequest.data.topics.asScala
       )(_.name)
 
-      val responseBuilder = new OffsetCommitResponse.Builder()
+      val responseBuilder = OffsetCommitResponse.newBuilder(useTopicIds)
       val authorizedTopicsRequest = new mutable.ArrayBuffer[OffsetCommitRequestData.OffsetCommitRequestTopic]()
       offsetCommitRequest.data.topics.forEach { topic =>
-        if (!authorizedTopics.contains(topic.name)) {
+        if (useTopicIds && topic.name.isEmpty) {
+          // If the topic name is undefined, it means that the topic id is unknown so we add
+          // the topic and all its partitions to the response with UNKNOWN_TOPIC_ID.
+          responseBuilder.addPartitions[OffsetCommitRequestData.OffsetCommitRequestPartition](
+            topic.topicId, topic.name, topic.partitions, _.partitionIndex, Errors.UNKNOWN_TOPIC_ID)
+        } else if (!authorizedTopics.contains(topic.name)) {
           // If the topic is not authorized, we add the topic and all its partitions
           // to the response with TOPIC_AUTHORIZATION_FAILED.
           responseBuilder.addPartitions[OffsetCommitRequestData.OffsetCommitRequestPartition](
-            topic.name, topic.partitions, _.partitionIndex, Errors.TOPIC_AUTHORIZATION_FAILED)
+            topic.topicId, topic.name, topic.partitions, _.partitionIndex, Errors.TOPIC_AUTHORIZATION_FAILED)
         } else if (!metadataCache.contains(topic.name)) {
           // If the topic is unknown, we add the topic and all its partitions
           // to the response with UNKNOWN_TOPIC_OR_PARTITION.
           responseBuilder.addPartitions[OffsetCommitRequestData.OffsetCommitRequestPartition](
-            topic.name, topic.partitions, _.partitionIndex, Errors.UNKNOWN_TOPIC_OR_PARTITION)
+            topic.topicId, topic.name, topic.partitions, _.partitionIndex, Errors.UNKNOWN_TOPIC_OR_PARTITION)
         } else {
           // Otherwise, we check all partitions to ensure that they all exist.
-          val topicWithValidPartitions = new OffsetCommitRequestData.OffsetCommitRequestTopic().setName(topic.name)
+          val topicWithValidPartitions = new OffsetCommitRequestData.OffsetCommitRequestTopic()
+            .setTopicId(topic.topicId)
+            .setName(topic.name)
 
           topic.partitions.forEach { partition =>
-            if (metadataCache.getLeaderAndIsr(topic.name, partition.partitionIndex).isPresent()) {
+            if (metadataCache.getLeaderAndIsr(topic.name, partition.partitionIndex).isPresent) {
               topicWithValidPartitions.partitions.add(partition)
             } else {
-              responseBuilder.addPartition(topic.name, partition.partitionIndex, Errors.UNKNOWN_TOPIC_OR_PARTITION)
+              responseBuilder.addPartition(
+                topic.topicId,
+                topic.name,
+                partition.partitionIndex,
+                Errors.UNKNOWN_TOPIC_OR_PARTITION
+              )
             }
           }
 
@@ -322,42 +344,23 @@ class KafkaApis(val requestChannel: RequestChannel,
         requestHelper.sendMaybeThrottle(request, responseBuilder.build())
         CompletableFuture.completedFuture(())
       } else {
-        // For version > 0, store offsets in Coordinator.
-        commitOffsetsToCoordinator(
-          request,
-          offsetCommitRequest,
-          authorizedTopicsRequest,
-          responseBuilder,
-          requestLocal
-        )
-      }
-    }
-  }
-
-  private def commitOffsetsToCoordinator(
-    request: RequestChannel.Request,
-    offsetCommitRequest: OffsetCommitRequest,
-    authorizedTopicsRequest: mutable.ArrayBuffer[OffsetCommitRequestData.OffsetCommitRequestTopic],
-    responseBuilder: OffsetCommitResponse.Builder,
-    requestLocal: RequestLocal
-  ): CompletableFuture[Unit] = {
-    val offsetCommitRequestData = new OffsetCommitRequestData()
-      .setGroupId(offsetCommitRequest.data.groupId)
-      .setMemberId(offsetCommitRequest.data.memberId)
-      .setGenerationIdOrMemberEpoch(offsetCommitRequest.data.generationIdOrMemberEpoch)
-      .setRetentionTimeMs(offsetCommitRequest.data.retentionTimeMs)
-      .setGroupInstanceId(offsetCommitRequest.data.groupInstanceId)
-      .setTopics(authorizedTopicsRequest.asJava)
-
-    groupCoordinator.commitOffsets(
-      request.context,
-      offsetCommitRequestData,
-      requestLocal.bufferSupplier
-    ).handle[Unit] { (results, exception) =>
-      if (exception != null) {
-        requestHelper.sendMaybeThrottle(request, offsetCommitRequest.getErrorResponse(exception))
-      } else {
-        requestHelper.sendMaybeThrottle(request, responseBuilder.merge(results).build())
+        groupCoordinator.commitOffsets(
+          request.context,
+          new OffsetCommitRequestData()
+            .setGroupId(offsetCommitRequest.data.groupId)
+            .setMemberId(offsetCommitRequest.data.memberId)
+            .setGenerationIdOrMemberEpoch(offsetCommitRequest.data.generationIdOrMemberEpoch)
+            .setRetentionTimeMs(offsetCommitRequest.data.retentionTimeMs)
+            .setGroupInstanceId(offsetCommitRequest.data.groupInstanceId)
+            .setTopics(authorizedTopicsRequest.asJava),
+          requestLocal.bufferSupplier
+        ).handle[Unit] { (results, exception) =>
+          if (exception != null) {
+            requestHelper.sendMaybeThrottle(request, offsetCommitRequest.getErrorResponse(exception))
+          } else {
+            requestHelper.sendMaybeThrottle(request, responseBuilder.merge(results).build())
+          }
+        }
       }
     }
   }
diff --git a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
index 7fccbdc9e28..e9e476fcd12 100644
--- a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
+++ b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
@@ -372,7 +372,7 @@ class AuthorizerIntegrationTest extends AbstractAuthorizerIntegrationTest {
   }
 
   private def createOffsetCommitRequest = {
-    new requests.OffsetCommitRequest.Builder(
+    requests.OffsetCommitRequest.Builder.forTopicNames(
         new OffsetCommitRequestData()
           .setGroupId(group)
           .setMemberId(JoinGroupRequest.UNKNOWN_MEMBER_ID)
diff --git a/core/src/test/scala/unit/kafka/server/ConsumerProtocolMigrationTest.scala b/core/src/test/scala/unit/kafka/server/ConsumerProtocolMigrationTest.scala
index 37c81ce20e5..8f5f759250b 100644
--- a/core/src/test/scala/unit/kafka/server/ConsumerProtocolMigrationTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ConsumerProtocolMigrationTest.scala
@@ -690,7 +690,7 @@ class ConsumerProtocolMigrationTest(cluster: ClusterInstance) extends GroupCoord
 
     val topicName = "foo"
     // Create the topic.
-    createTopic(
+    val topicId = createTopic(
       topic = topicName,
       numPartitions = 3
     )
@@ -702,6 +702,7 @@ class ConsumerProtocolMigrationTest(cluster: ClusterInstance) extends GroupCoord
       memberId = "member-id",
       memberEpoch = -1,
       topic = topicName,
+      topicId = topicId,
       partition = 0,
       offset = 1000L,
       expectedError = Errors.NONE,
@@ -765,7 +766,7 @@ class ConsumerProtocolMigrationTest(cluster: ClusterInstance) extends GroupCoord
     createOffsetsTopic()
 
     // Create the topic.
-    createTopic(
+    val topicId = createTopic(
       topic = "foo",
       numPartitions = 3
     )
@@ -865,6 +866,7 @@ class ConsumerProtocolMigrationTest(cluster: ClusterInstance) extends GroupCoord
           memberId = memberId1,
           memberEpoch = 1,
           topic = "foo",
+          topicId = topicId,
           partition = partitionId,
           offset = 100L + 10 * version + partitionId,
           expectedError = Errors.NONE,
@@ -1096,7 +1098,7 @@ class ConsumerProtocolMigrationTest(cluster: ClusterInstance) extends GroupCoord
     createOffsetsTopic()
 
     // Create the topic.
-    createTopic(
+    val topicId = createTopic(
       topic = "foo",
       numPartitions = 3
     )
@@ -1164,6 +1166,7 @@ class ConsumerProtocolMigrationTest(cluster: ClusterInstance) extends GroupCoord
           memberId = memberId1,
           memberEpoch = 1,
           topic = "foo",
+          topicId = topicId,
           partition = partitionId,
           offset = 100L + 10 * version + partitionId,
           expectedError = Errors.NONE,
diff --git a/core/src/test/scala/unit/kafka/server/DeleteGroupsRequestTest.scala b/core/src/test/scala/unit/kafka/server/DeleteGroupsRequestTest.scala
index 88733a86576..fe4501e640a 100644
--- a/core/src/test/scala/unit/kafka/server/DeleteGroupsRequestTest.scala
+++ b/core/src/test/scala/unit/kafka/server/DeleteGroupsRequestTest.scala
@@ -48,7 +48,7 @@ class DeleteGroupsRequestTest(cluster: ClusterInstance) extends GroupCoordinator
     createOffsetsTopic()
 
     // Create the topic.
-    createTopic(
+    val topicId = createTopic(
       topic = "foo",
       numPartitions = 3
     )
@@ -89,6 +89,7 @@ class DeleteGroupsRequestTest(cluster: ClusterInstance) extends GroupCoordinator
           memberId = memberId,
           memberEpoch = memberEpoch,
           topic = "foo",
+          topicId = topicId,
           partition = 0,
           offset = 100L,
           expectedError = Errors.GROUP_ID_NOT_FOUND,
diff --git a/core/src/test/scala/unit/kafka/server/GroupCoordinatorBaseRequestTest.scala b/core/src/test/scala/unit/kafka/server/GroupCoordinatorBaseRequestTest.scala
index c9b1dda51a5..be96826858a 100644
--- a/core/src/test/scala/unit/kafka/server/GroupCoordinatorBaseRequestTest.scala
+++ b/core/src/test/scala/unit/kafka/server/GroupCoordinatorBaseRequestTest.scala
@@ -19,7 +19,7 @@ package kafka.server
 import kafka.network.SocketServer
 import kafka.utils.TestUtils
 import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord, RecordMetadata}
-import org.apache.kafka.common.{TopicIdPartition, TopicPartition, Uuid}
+import org.apache.kafka.common.{TopicCollection, TopicIdPartition, TopicPartition, Uuid}
 import org.apache.kafka.common.message.DeleteGroupsResponseData.{DeletableGroupResult, DeletableGroupResultCollection}
 import org.apache.kafka.common.message.LeaveGroupRequestData.MemberIdentity
 import org.apache.kafka.common.message.LeaveGroupResponseData.MemberResponse
@@ -75,7 +75,7 @@ class GroupCoordinatorBaseRequestTest(cluster: ClusterInstance) {
   protected def createTopic(
     topic: String,
     numPartitions: Int
-  ): Unit = {
+  ): Uuid = {
     val admin = cluster.admin()
     try {
       TestUtils.createTopicWithAdmin(
@@ -85,6 +85,12 @@ class GroupCoordinatorBaseRequestTest(cluster: ClusterInstance) {
         topic = topic,
         numPartitions = numPartitions
       )
+      admin
+        .describeTopics(TopicCollection.ofTopicNames(List(topic).asJava))
+        .allTopicNames()
+        .get()
+        .get(topic)
+        .topicId()
     } finally {
       admin.close()
     }
@@ -166,18 +172,24 @@ class GroupCoordinatorBaseRequestTest(cluster: ClusterInstance) {
     memberId: String,
     memberEpoch: Int,
     topic: String,
+    topicId: Uuid,
     partition: Int,
     offset: Long,
     expectedError: Errors,
     version: Short = ApiKeys.OFFSET_COMMIT.latestVersion(isUnstableApiEnabled)
   ): Unit = {
-    val request = new OffsetCommitRequest.Builder(
+    if (version >= 10 && topicId == Uuid.ZERO_UUID) {
+      throw new IllegalArgumentException(s"Cannot call OffsetCommit API version $version without a topic id")
+    }
+
+    val request = OffsetCommitRequest.Builder.forTopicIdsOrNames(
       new OffsetCommitRequestData()
         .setGroupId(groupId)
         .setMemberId(memberId)
         .setGenerationIdOrMemberEpoch(memberEpoch)
         .setTopics(List(
           new OffsetCommitRequestData.OffsetCommitRequestTopic()
+            .setTopicId(topicId)
             .setName(topic)
             .setPartitions(List(
               new OffsetCommitRequestData.OffsetCommitRequestPartition()
@@ -191,7 +203,8 @@ class GroupCoordinatorBaseRequestTest(cluster: ClusterInstance) {
     val expectedResponse = new OffsetCommitResponseData()
       .setTopics(List(
         new OffsetCommitResponseData.OffsetCommitResponseTopic()
-          .setName(topic)
+          .setTopicId(if (version >= 10) topicId else Uuid.ZERO_UUID)
+          .setName(if (version < 10) topic else "")
           .setPartitions(List(
             new OffsetCommitResponseData.OffsetCommitResponsePartition()
               .setPartitionIndex(partition)
diff --git a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
index 02541097d4c..42c1131af61 100644
--- a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
+++ b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
@@ -992,27 +992,43 @@ class KafkaApisTest extends Logging {
     )
   }
 
-  @Test
-  def testHandleOffsetCommitRequest(): Unit = {
-    addTopicToMetadataCache("foo", numPartitions = 1)
+  @ParameterizedTest
+  @ApiKeyVersionsSource(apiKey = ApiKeys.OFFSET_COMMIT)
+  def testHandleOffsetCommitRequest(version: Short): Unit = {
+    val topicName = "foo"
+    val topicId = Uuid.randomUuid()
+    addTopicToMetadataCache(topicName, topicId = topicId, numPartitions = 1)
 
     val offsetCommitRequest = new OffsetCommitRequestData()
       .setGroupId("group")
       .setMemberId("member")
       .setTopics(List(
         new OffsetCommitRequestData.OffsetCommitRequestTopic()
-          .setName("foo")
+          .setTopicId(if (version >= 10) topicId else Uuid.ZERO_UUID)
+          .setName(if (version < 10) topicName else "")
+          .setPartitions(List(
+            new OffsetCommitRequestData.OffsetCommitRequestPartition()
+              .setPartitionIndex(0)
+              .setCommittedOffset(10)).asJava)).asJava)
+
+    val expectedOffsetCommitRequest = new OffsetCommitRequestData()
+      .setGroupId("group")
+      .setMemberId("member")
+      .setTopics(List(
+        new OffsetCommitRequestData.OffsetCommitRequestTopic()
+          .setTopicId(if (version >= 10) topicId else Uuid.ZERO_UUID)
+          .setName(topicName)
           .setPartitions(List(
             new OffsetCommitRequestData.OffsetCommitRequestPartition()
               .setPartitionIndex(0)
               .setCommittedOffset(10)).asJava)).asJava)
 
-    val requestChannelRequest = buildRequest(new OffsetCommitRequest.Builder(offsetCommitRequest).build())
+    val requestChannelRequest = buildRequest(OffsetCommitRequest.Builder.forTopicIdsOrNames(offsetCommitRequest, true).build(version))
 
     val future = new CompletableFuture[OffsetCommitResponseData]()
     when(groupCoordinator.commitOffsets(
       requestChannelRequest.context,
-      offsetCommitRequest,
+      expectedOffsetCommitRequest,
       RequestLocal.noCaching.bufferSupplier
     )).thenReturn(future)
     kafkaApis = createKafkaApis()
@@ -1025,7 +1041,8 @@ class KafkaApisTest extends Logging {
     val offsetCommitResponse = new OffsetCommitResponseData()
       .setTopics(List(
         new OffsetCommitResponseData.OffsetCommitResponseTopic()
-          .setName("foo")
+          .setTopicId(if (version >= 10) topicId else Uuid.ZERO_UUID)
+          .setName(if (version < 10) topicName else "")
           .setPartitions(List(
             new OffsetCommitResponseData.OffsetCommitResponsePartition()
               .setPartitionIndex(0)
@@ -1036,27 +1053,43 @@ class KafkaApisTest extends Logging {
     assertEquals(offsetCommitResponse, response.data)
   }
 
-  @Test
-  def testHandleOffsetCommitRequestFutureFailed(): Unit = {
-    addTopicToMetadataCache("foo", numPartitions = 1)
+  @ParameterizedTest
+  @ApiKeyVersionsSource(apiKey = ApiKeys.OFFSET_COMMIT)
+  def testHandleOffsetCommitRequestFutureFailed(version: Short): Unit = {
+    val topicName = "foo"
+    val topicId = Uuid.randomUuid()
+    addTopicToMetadataCache(topicName, topicId = topicId, numPartitions = 1)
 
     val offsetCommitRequest = new OffsetCommitRequestData()
       .setGroupId("group")
       .setMemberId("member")
       .setTopics(List(
         new OffsetCommitRequestData.OffsetCommitRequestTopic()
-          .setName("foo")
+          .setTopicId(if (version >= 10) topicId else Uuid.ZERO_UUID)
+          .setName(if (version < 10) topicName else "")
           .setPartitions(List(
             new OffsetCommitRequestData.OffsetCommitRequestPartition()
               .setPartitionIndex(0)
               .setCommittedOffset(10)).asJava)).asJava)
 
-    val requestChannelRequest = buildRequest(new OffsetCommitRequest.Builder(offsetCommitRequest).build())
+    val expectedOffsetCommitRequest = new OffsetCommitRequestData()
+      .setGroupId("group")
+      .setMemberId("member")
+      .setTopics(List(
+        new OffsetCommitRequestData.OffsetCommitRequestTopic()
+          .setTopicId(if (version >= 10) topicId else Uuid.ZERO_UUID)
+          .setName(topicName)
+          .setPartitions(List(
+            new OffsetCommitRequestData.OffsetCommitRequestPartition()
+              .setPartitionIndex(0)
+              .setCommittedOffset(10)).asJava)).asJava)
+
+    val requestChannelRequest = buildRequest(OffsetCommitRequest.Builder.forTopicIdsOrNames(offsetCommitRequest, true).build(version))
 
     val future = new CompletableFuture[OffsetCommitResponseData]()
     when(groupCoordinator.commitOffsets(
       requestChannelRequest.context,
-      offsetCommitRequest,
+      expectedOffsetCommitRequest,
       RequestLocal.noCaching.bufferSupplier
     )).thenReturn(future)
 
@@ -1069,7 +1102,8 @@ class KafkaApisTest extends Logging {
     val expectedOffsetCommitResponse = new OffsetCommitResponseData()
       .setTopics(List(
         new OffsetCommitResponseData.OffsetCommitResponseTopic()
-          .setName("foo")
+          .setTopicId(if (version >= 10) topicId else Uuid.ZERO_UUID)
+          .setName(if (version < 10) topicName else "")
           .setPartitions(List(
             new OffsetCommitResponseData.OffsetCommitResponsePartition()
               .setPartitionIndex(0)
@@ -1080,6 +1114,161 @@ class KafkaApisTest extends Logging {
     assertEquals(expectedOffsetCommitResponse, response.data)
   }
 
+  @Test
+  def testHandleOffsetCommitRequestTopicsAndPartitionsValidationWithTopicIds(): Unit = {
+    val fooId = Uuid.randomUuid()
+    val barId = Uuid.randomUuid()
+    val zarId = Uuid.randomUuid()
+    val fooName = "foo"
+    val barName = "bar"
+    addTopicToMetadataCache(fooName, topicId = fooId, numPartitions = 2)
+    addTopicToMetadataCache(barName, topicId = barId, numPartitions = 2)
+
+    val offsetCommitRequest = new OffsetCommitRequestData()
+      .setGroupId("group")
+      .setMemberId("member")
+      .setTopics(List(
+        // foo exists but only has 2 partitions.
+        new OffsetCommitRequestData.OffsetCommitRequestTopic()
+          .setTopicId(fooId)
+          .setPartitions(List(
+            new OffsetCommitRequestData.OffsetCommitRequestPartition()
+              .setPartitionIndex(0)
+              .setCommittedOffset(10),
+            new OffsetCommitRequestData.OffsetCommitRequestPartition()
+              .setPartitionIndex(1)
+              .setCommittedOffset(20),
+            new OffsetCommitRequestData.OffsetCommitRequestPartition()
+              .setPartitionIndex(2)
+              .setCommittedOffset(30)).asJava),
+        // bar exists.
+        new OffsetCommitRequestData.OffsetCommitRequestTopic()
+          .setTopicId(barId)
+          .setPartitions(List(
+            new OffsetCommitRequestData.OffsetCommitRequestPartition()
+              .setPartitionIndex(0)
+              .setCommittedOffset(40),
+            new OffsetCommitRequestData.OffsetCommitRequestPartition()
+              .setPartitionIndex(1)
+              .setCommittedOffset(50)).asJava),
+        // zar does not exist.
+        new OffsetCommitRequestData.OffsetCommitRequestTopic()
+          .setTopicId(zarId)
+          .setPartitions(List(
+            new OffsetCommitRequestData.OffsetCommitRequestPartition()
+              .setPartitionIndex(0)
+              .setCommittedOffset(60),
+            new OffsetCommitRequestData.OffsetCommitRequestPartition()
+              .setPartitionIndex(1)
+              .setCommittedOffset(70)).asJava)).asJava)
+
+    val requestChannelRequest = buildRequest(OffsetCommitRequest.Builder.forTopicIdsOrNames(offsetCommitRequest, true).build())
+
+    // This is the request expected by the group coordinator.
+    val expectedOffsetCommitRequest = new OffsetCommitRequestData()
+      .setGroupId("group")
+      .setMemberId("member")
+      .setTopics(List(
+        // foo exists but only has 2 partitions.
+        new OffsetCommitRequestData.OffsetCommitRequestTopic()
+          .setTopicId(fooId)
+          .setName(fooName)
+          .setPartitions(List(
+            new OffsetCommitRequestData.OffsetCommitRequestPartition()
+              .setPartitionIndex(0)
+              .setCommittedOffset(10),
+            new OffsetCommitRequestData.OffsetCommitRequestPartition()
+              .setPartitionIndex(1)
+              .setCommittedOffset(20)).asJava),
+        new OffsetCommitRequestData.OffsetCommitRequestTopic()
+          .setTopicId(barId)
+          .setName(barName)
+          .setPartitions(List(
+            new OffsetCommitRequestData.OffsetCommitRequestPartition()
+              .setPartitionIndex(0)
+              .setCommittedOffset(40),
+            new OffsetCommitRequestData.OffsetCommitRequestPartition()
+              .setPartitionIndex(1)
+              .setCommittedOffset(50)).asJava)).asJava)
+
+    val future = new CompletableFuture[OffsetCommitResponseData]()
+    when(groupCoordinator.commitOffsets(
+      requestChannelRequest.context,
+      expectedOffsetCommitRequest,
+      RequestLocal.noCaching.bufferSupplier
+    )).thenReturn(future)
+    kafkaApis = createKafkaApis()
+    kafkaApis.handle(
+      requestChannelRequest,
+      RequestLocal.noCaching
+    )
+
+    // This is the response returned by the group coordinator.
+    val offsetCommitResponse = new OffsetCommitResponseData()
+      .setTopics(List(
+        new OffsetCommitResponseData.OffsetCommitResponseTopic()
+          .setTopicId(fooId)
+          .setName(fooName)
+          .setPartitions(List(
+            new OffsetCommitResponseData.OffsetCommitResponsePartition()
+              .setPartitionIndex(0)
+              .setErrorCode(Errors.NONE.code),
+            new OffsetCommitResponseData.OffsetCommitResponsePartition()
+              .setPartitionIndex(1)
+              .setErrorCode(Errors.NONE.code)).asJava),
+        new OffsetCommitResponseData.OffsetCommitResponseTopic()
+          .setTopicId(barId)
+          .setName(barName)
+          .setPartitions(List(
+            new OffsetCommitResponseData.OffsetCommitResponsePartition()
+              .setPartitionIndex(0)
+              .setErrorCode(Errors.NONE.code),
+            new OffsetCommitResponseData.OffsetCommitResponsePartition()
+              .setPartitionIndex(1)
+              .setErrorCode(Errors.NONE.code)).asJava)).asJava)
+
+    val expectedOffsetCommitResponse = new OffsetCommitResponseData()
+      .setTopics(List(
+        new OffsetCommitResponseData.OffsetCommitResponseTopic()
+          .setTopicId(fooId)
+          .setPartitions(List(
+            // foo-2 is first because partitions failing the validation
+            // are put in the response first.
+            new OffsetCommitResponseData.OffsetCommitResponsePartition()
+              .setPartitionIndex(2)
+              .setErrorCode(Errors.UNKNOWN_TOPIC_OR_PARTITION.code),
+            new OffsetCommitResponseData.OffsetCommitResponsePartition()
+              .setPartitionIndex(0)
+              .setErrorCode(Errors.NONE.code),
+            new OffsetCommitResponseData.OffsetCommitResponsePartition()
+              .setPartitionIndex(1)
+              .setErrorCode(Errors.NONE.code)).asJava),
+        // zar is before bar because topics failing the validation are
+        // put in the response first.
+        new OffsetCommitResponseData.OffsetCommitResponseTopic()
+          .setTopicId(zarId)
+          .setPartitions(List(
+            new OffsetCommitResponseData.OffsetCommitResponsePartition()
+              .setPartitionIndex(0)
+              .setErrorCode(Errors.UNKNOWN_TOPIC_ID.code),
+            new OffsetCommitResponseData.OffsetCommitResponsePartition()
+              .setPartitionIndex(1)
+              .setErrorCode(Errors.UNKNOWN_TOPIC_ID.code)).asJava),
+        new OffsetCommitResponseData.OffsetCommitResponseTopic()
+          .setTopicId(barId)
+          .setPartitions(List(
+            new OffsetCommitResponseData.OffsetCommitResponsePartition()
+              .setPartitionIndex(0)
+              .setErrorCode(Errors.NONE.code),
+            new OffsetCommitResponseData.OffsetCommitResponsePartition()
+              .setPartitionIndex(1)
+              .setErrorCode(Errors.NONE.code)).asJava)).asJava)
+
+    future.complete(offsetCommitResponse)
+    val response = verifyNoThrottling[OffsetCommitResponse](requestChannelRequest)
+    assertEquals(expectedOffsetCommitResponse, response.data)
+  }
+
   @Test
   def testHandleOffsetCommitRequestTopicsAndPartitionsValidation(): Unit = {
     addTopicToMetadataCache("foo", numPartitions = 2)
@@ -1123,7 +1312,7 @@ class KafkaApisTest extends Logging {
               .setPartitionIndex(1)
               .setCommittedOffset(70)).asJava)).asJava)
 
-    val requestChannelRequest = buildRequest(new OffsetCommitRequest.Builder(offsetCommitRequest).build())
+    val requestChannelRequest = buildRequest(OffsetCommitRequest.Builder.forTopicNames(offsetCommitRequest).build())
 
     // This is the request expected by the group coordinator.
     val expectedOffsetCommitRequest = new OffsetCommitRequestData()
@@ -1226,48 +1415,6 @@ class KafkaApisTest extends Logging {
     assertEquals(expectedOffsetCommitResponse, response.data)
   }
 
-  @Test
-  def testOffsetCommitWithInvalidPartition(): Unit = {
-    val topic = "topic"
-    addTopicToMetadataCache(topic, numPartitions = 1)
-
-    def checkInvalidPartition(invalidPartitionId: Int): Unit = {
-      reset(replicaManager, clientRequestQuotaManager, requestChannel)
-
-      val offsetCommitRequest = new OffsetCommitRequest.Builder(
-        new OffsetCommitRequestData()
-          .setGroupId("groupId")
-          .setTopics(Collections.singletonList(
-            new OffsetCommitRequestData.OffsetCommitRequestTopic()
-              .setName(topic)
-              .setPartitions(Collections.singletonList(
-                new OffsetCommitRequestData.OffsetCommitRequestPartition()
-                  .setPartitionIndex(invalidPartitionId)
-                  .setCommittedOffset(15)
-                  .setCommittedLeaderEpoch(RecordBatch.NO_PARTITION_LEADER_EPOCH)
-                  .setCommittedMetadata(""))
-              )
-          ))).build()
-
-      val request = buildRequest(offsetCommitRequest)
-      when(clientRequestQuotaManager.maybeRecordAndGetThrottleTimeMs(any[RequestChannel.Request](),
-        any[Long])).thenReturn(0)
-      val kafkaApis = createKafkaApis()
-      try {
-        kafkaApis.handleOffsetCommitRequest(request, RequestLocal.withThreadConfinedCaching)
-
-        val response = verifyNoThrottling[OffsetCommitResponse](request)
-        assertEquals(Errors.UNKNOWN_TOPIC_OR_PARTITION,
-          Errors.forCode(response.data.topics().get(0).partitions().get(0).errorCode))
-      } finally {
-        kafkaApis.close()
-      }
-    }
-
-    checkInvalidPartition(-1)
-    checkInvalidPartition(1) // topic has only one partition
-  }
-
   @Test
   def testTxnOffsetCommitWithInvalidPartition(): Unit = {
     val topic = "topic"
diff --git a/core/src/test/scala/unit/kafka/server/OffsetCommitRequestTest.scala b/core/src/test/scala/unit/kafka/server/OffsetCommitRequestTest.scala
index f289c241d1b..eceb21a4077 100644
--- a/core/src/test/scala/unit/kafka/server/OffsetCommitRequestTest.scala
+++ b/core/src/test/scala/unit/kafka/server/OffsetCommitRequestTest.scala
@@ -16,6 +16,7 @@
  */
 package kafka.server
 
+import org.apache.kafka.common.Uuid
 import org.apache.kafka.common.test.api.{ClusterConfigProperty, ClusterTest, ClusterTestDefaults, Type}
 import org.apache.kafka.common.protocol.{ApiKeys, Errors}
 import org.apache.kafka.common.test.ClusterInstance
@@ -46,7 +47,7 @@ class OffsetCommitRequestTest(cluster: ClusterInstance) extends GroupCoordinator
     createOffsetsTopic()
 
     // Create the topic.
-    createTopic(
+    val topicId = createTopic(
       topic = "foo",
       numPartitions = 3
     )
@@ -55,7 +56,6 @@ class OffsetCommitRequestTest(cluster: ClusterInstance) extends GroupCoordinator
     // a session long enough for the duration of the test.
     val (memberId, memberEpoch) = joinConsumerGroup("grp", useNewProtocol)
 
-    // Start from version 1 because version 0 goes to ZK.
     for (version <- ApiKeys.OFFSET_COMMIT.oldestVersion to ApiKeys.OFFSET_COMMIT.latestVersion(isUnstableApiEnabled)) {
       // Commit offset.
       commitOffset(
@@ -63,6 +63,7 @@ class OffsetCommitRequestTest(cluster: ClusterInstance) extends GroupCoordinator
         memberId = memberId,
         memberEpoch = memberEpoch,
         topic = "foo",
+        topicId = topicId,
         partition = 0,
         offset = 100L,
         expectedError = if (useNewProtocol && version < 9) Errors.UNSUPPORTED_VERSION else Errors.NONE,
@@ -75,6 +76,7 @@ class OffsetCommitRequestTest(cluster: ClusterInstance) extends GroupCoordinator
         memberId = memberId,
         memberEpoch = memberEpoch,
         topic = "foo",
+        topicId = topicId,
         partition = 0,
         offset = 100L,
         expectedError =
@@ -89,6 +91,7 @@ class OffsetCommitRequestTest(cluster: ClusterInstance) extends GroupCoordinator
         memberId = memberId,
         memberEpoch = memberEpoch,
         topic = "foo",
+        topicId = topicId,
         partition = 0,
         offset = 100L,
         expectedError =
@@ -103,6 +106,7 @@ class OffsetCommitRequestTest(cluster: ClusterInstance) extends GroupCoordinator
         memberId = "",
         memberEpoch = memberEpoch,
         topic = "foo",
+        topicId = topicId,
         partition = 0,
         offset = 100L,
         expectedError = Errors.UNKNOWN_MEMBER_ID,
@@ -115,6 +119,7 @@ class OffsetCommitRequestTest(cluster: ClusterInstance) extends GroupCoordinator
         memberId = memberId,
         memberEpoch = memberEpoch + 1,
         topic = "foo",
+        topicId = topicId,
         partition = 0,
         offset = 100L,
         expectedError =
@@ -131,11 +136,27 @@ class OffsetCommitRequestTest(cluster: ClusterInstance) extends GroupCoordinator
         memberId = "",
         memberEpoch = -1,
         topic = "foo",
+        topicId = topicId,
         partition = 0,
         offset = 100L,
         expectedError = Errors.NONE,
         version = version.toShort
       )
+
+      // Commit offset to a group with an unknown topic id.
+      if (version >= 10) {
+        commitOffset(
+          groupId = "grp",
+          memberId = memberId,
+          memberEpoch = memberEpoch,
+          topic = "bar",
+          topicId = Uuid.randomUuid(),
+          partition = 0,
+          offset = 100L,
+          expectedError = Errors.UNKNOWN_TOPIC_ID,
+          version = version.toShort
+        )
+      }
     }
   }
 }
diff --git a/core/src/test/scala/unit/kafka/server/OffsetDeleteRequestTest.scala b/core/src/test/scala/unit/kafka/server/OffsetDeleteRequestTest.scala
index c9201b24e98..0fc414e24c9 100644
--- a/core/src/test/scala/unit/kafka/server/OffsetDeleteRequestTest.scala
+++ b/core/src/test/scala/unit/kafka/server/OffsetDeleteRequestTest.scala
@@ -45,7 +45,7 @@ class OffsetDeleteRequestTest(cluster: ClusterInstance) extends GroupCoordinator
     createOffsetsTopic()
 
     // Create the topic.
-    createTopic(
+    val topicId = createTopic(
       topic = "foo",
       numPartitions = 3
     )
@@ -65,6 +65,7 @@ class OffsetDeleteRequestTest(cluster: ClusterInstance) extends GroupCoordinator
           memberId = memberId,
           memberEpoch = memberEpoch,
           topic = "foo",
+          topicId = topicId,
           partition = partitionId,
           offset = 100L + partitionId,
           expectedError = Errors.NONE,
diff --git a/core/src/test/scala/unit/kafka/server/OffsetFetchRequestTest.scala b/core/src/test/scala/unit/kafka/server/OffsetFetchRequestTest.scala
index b49de577931..be95cef7844 100644
--- a/core/src/test/scala/unit/kafka/server/OffsetFetchRequestTest.scala
+++ b/core/src/test/scala/unit/kafka/server/OffsetFetchRequestTest.scala
@@ -71,7 +71,7 @@ class OffsetFetchRequestTest(cluster: ClusterInstance) extends GroupCoordinatorB
     createOffsetsTopic()
 
     // Create the topic.
-    createTopic(
+    val topicId = createTopic(
       topic = "foo",
       numPartitions = 3
     )
@@ -87,6 +87,7 @@ class OffsetFetchRequestTest(cluster: ClusterInstance) extends GroupCoordinatorB
         memberId = memberId,
         memberEpoch = memberEpoch,
         topic = "foo",
+        topicId = topicId,
         partition = partitionId,
         offset = 100L + partitionId,
         expectedError = Errors.NONE,
@@ -239,7 +240,7 @@ class OffsetFetchRequestTest(cluster: ClusterInstance) extends GroupCoordinatorB
     createOffsetsTopic()
 
     // Create the topic.
-    createTopic(
+    val topicId = createTopic(
       topic = "foo",
       numPartitions = 3
     )
@@ -255,6 +256,7 @@ class OffsetFetchRequestTest(cluster: ClusterInstance) extends GroupCoordinatorB
         memberId = memberId,
         memberEpoch = memberEpoch,
         topic = "foo",
+        topicId = topicId,
         partition = partitionId,
         offset = 100L + partitionId,
         expectedError = Errors.NONE,
@@ -348,7 +350,7 @@ class OffsetFetchRequestTest(cluster: ClusterInstance) extends GroupCoordinatorB
     createOffsetsTopic()
 
     // Create the topic.
-    createTopic(
+    val topicId = createTopic(
       topic = "foo",
       numPartitions = 3
     )
@@ -365,6 +367,7 @@ class OffsetFetchRequestTest(cluster: ClusterInstance) extends GroupCoordinatorB
           memberId = memberId,
           memberEpoch = memberEpoch,
           topic = "foo",
+          topicId = topicId,
           partition = partitionId,
           offset = 100L + partitionId,
           expectedError = Errors.NONE,
diff --git a/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala b/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala
index 154afd34a60..7610e466207 100644
--- a/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala
+++ b/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala
@@ -270,7 +270,7 @@ class RequestQuotaTest extends BaseRequestTest {
             .setTargetTimes(List(topic).asJava)
 
         case ApiKeys.OFFSET_COMMIT =>
-          new OffsetCommitRequest.Builder(
+          OffsetCommitRequest.Builder.forTopicNames(
             new OffsetCommitRequestData()
               .setGroupId("test-group")
               .setGenerationIdOrMemberEpoch(1)
diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/OffsetMetadataManager.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/OffsetMetadataManager.java
index 2b50071a7f7..0fa997c557a 100644
--- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/OffsetMetadataManager.java
+++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/OffsetMetadataManager.java
@@ -461,7 +461,9 @@ public class OffsetMetadataManager {
         final OptionalLong expireTimestampMs = expireTimestampMs(request.retentionTimeMs(), currentTimeMs);
 
         request.topics().forEach(topic -> {
-            final OffsetCommitResponseTopic topicResponse = new OffsetCommitResponseTopic().setName(topic.name());
+            final OffsetCommitResponseTopic topicResponse = new OffsetCommitResponseTopic()
+                .setTopicId(topic.topicId())
+                .setName(topic.name());
             response.topics().add(topicResponse);
 
             topic.partitions().forEach(partition -> {
@@ -470,8 +472,8 @@ public class OffsetMetadataManager {
                         .setPartitionIndex(partition.partitionIndex())
                         .setErrorCode(Errors.OFFSET_METADATA_TOO_LARGE.code()));
                 } else {
-                    log.debug("[GroupId {}] Committing offsets {} for partition {}-{} from member {} with leader epoch {}.",
-                        request.groupId(), partition.committedOffset(), topic.name(), partition.partitionIndex(),
+                    log.debug("[GroupId {}] Committing offsets {} for partition {}-{}-{} from member {} with leader epoch {}.",
+                        request.groupId(), partition.committedOffset(), topic.topicId(), topic.name(), partition.partitionIndex(),
                         request.memberId(), partition.committedLeaderEpoch());
 
                     topicResponse.partitions().add(new OffsetCommitResponsePartition()
diff --git a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/OffsetMetadataManagerTest.java b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/OffsetMetadataManagerTest.java
index 6f788d84fd0..382b2a9b0e5 100644
--- a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/OffsetMetadataManagerTest.java
+++ b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/OffsetMetadataManagerTest.java
@@ -1308,6 +1308,75 @@ public class OffsetMetadataManagerTest {
         );
     }
 
+    @Test
+    public void testConsumerGroupOffsetCommitWithTopicIds() {
+        Uuid topicId = Uuid.randomUuid();
+        OffsetMetadataManagerTestContext context = new OffsetMetadataManagerTestContext.Builder().build();
+
+        // Create an empty group.
+        ConsumerGroup group = context.groupMetadataManager.getOrMaybeCreatePersistedConsumerGroup(
+            "foo",
+            true
+        );
+
+        // Add member.
+        group.updateMember(new ConsumerGroupMember.Builder("member")
+            .setMemberEpoch(10)
+            .setPreviousMemberEpoch(10)
+            .build()
+        );
+
+        CoordinatorResult<OffsetCommitResponseData, CoordinatorRecord> result = context.commitOffset(
+            new OffsetCommitRequestData()
+                .setGroupId("foo")
+                .setMemberId("member")
+                .setGenerationIdOrMemberEpoch(10)
+                .setTopics(List.of(
+                    new OffsetCommitRequestData.OffsetCommitRequestTopic()
+                        .setTopicId(topicId)
+                        .setName("bar")
+                        .setPartitions(List.of(
+                            new OffsetCommitRequestData.OffsetCommitRequestPartition()
+                                .setPartitionIndex(0)
+                                .setCommittedOffset(100L)
+                                .setCommittedLeaderEpoch(10)
+                                .setCommittedMetadata("metadata")
+                        ))
+                ))
+        );
+
+        assertEquals(
+            new OffsetCommitResponseData()
+                .setTopics(List.of(
+                    new OffsetCommitResponseData.OffsetCommitResponseTopic()
+                        .setTopicId(topicId)
+                        .setName("bar")
+                        .setPartitions(List.of(
+                            new OffsetCommitResponseData.OffsetCommitResponsePartition()
+                                .setPartitionIndex(0)
+                                .setErrorCode(Errors.NONE.code())
+                        ))
+                )),
+            result.response()
+        );
+
+        assertEquals(
+            List.of(GroupCoordinatorRecordHelpers.newOffsetCommitRecord(
+                "foo",
+                "bar",
+                0,
+                new OffsetAndMetadata(
+                    100L,
+                    OptionalInt.of(10),
+                    "metadata",
+                    context.time.milliseconds(),
+                    OptionalLong.empty()
+                )
+            )),
+            result.records()
+        );
+    }
+
     @Test
     public void testConsumerGroupOffsetCommitWithOffsetMetadataTooLarge() {
         OffsetMetadataManagerTestContext context = new OffsetMetadataManagerTestContext.Builder()

Reply via email to