This is an automated email from the ASF dual-hosted git repository.

dajac pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new e6669672ef1 KAFKA-14367; Add `OffsetCommit` to the new `GroupCoordinator` interface (#12886)
e6669672ef1 is described below

commit e6669672ef11ee869e3e4e5ae04bad6c505d5342
Author: David Jacot <[email protected]>
AuthorDate: Thu Jan 12 18:05:49 2023 +0100

    KAFKA-14367; Add `OffsetCommit` to the new `GroupCoordinator` interface (#12886)
    
    This patch adds `OffsetCommit` to the new `GroupCoordinator` interface and updates `KafkaApis` to use it.
    
    Reviewers: Omnia G H Ibrahim <[email protected]>, Jeff Kim <[email protected]>, Justine Olshan <[email protected]>
---
 .../kafka/common/requests/OffsetCommitRequest.java |   5 +
 .../common/requests/OffsetCommitResponse.java      |  80 +++++++
 .../kafka/server/builders/KafkaApisBuilder.java    |   2 +-
 .../group/GroupCoordinatorAdapter.scala            |  87 +++++++-
 .../src/main/scala/kafka/server/BrokerServer.scala |   2 +-
 core/src/main/scala/kafka/server/KafkaApis.scala   | 239 +++++++++++----------
 core/src/main/scala/kafka/server/KafkaServer.scala |   2 +-
 .../group/GroupCoordinatorAdapterTest.scala        |  94 +++++++-
 .../scala/unit/kafka/server/KafkaApisTest.scala    | 233 ++++++++++++++++++++
 .../kafka/coordinator/group/GroupCoordinator.java  |  17 ++
 10 files changed, 627 insertions(+), 134 deletions(-)

diff --git a/clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitRequest.java
index 9869da5d254..8fc884e6614 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitRequest.java
@@ -118,6 +118,11 @@ public class OffsetCommitRequest extends AbstractRequest {
                 .setThrottleTimeMs(throttleTimeMs));
     }
 
+    @Override
+    public OffsetCommitResponse getErrorResponse(Throwable e) {
+        return getErrorResponse(AbstractResponse.DEFAULT_THROTTLE_TIME, e);
+    }
+
     public static OffsetCommitRequest parse(ByteBuffer buffer, short version) {
         return new OffsetCommitRequest(new OffsetCommitRequestData(new 
ByteBufferAccessor(buffer), version), version);
     }
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitResponse.java
index 713b68974a1..8c0bb9c182d 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitResponse.java
@@ -27,7 +27,9 @@ import org.apache.kafka.common.protocol.Errors;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
+import java.util.function.Function;
 
 /**
  * Possible error codes:
@@ -116,4 +118,82 @@ public class OffsetCommitResponse extends AbstractResponse {
     public boolean shouldClientThrottle(short version) {
         return version >= 4;
     }
+
+    public static class Builder {
+        OffsetCommitResponseData data = new OffsetCommitResponseData();
+        HashMap<String, OffsetCommitResponseTopic> byTopicName = new 
HashMap<>();
+
+        private OffsetCommitResponseTopic getOrCreateTopic(
+            String topicName
+        ) {
+            OffsetCommitResponseTopic topic = byTopicName.get(topicName);
+            if (topic == null) {
+                topic = new OffsetCommitResponseTopic().setName(topicName);
+                data.topics().add(topic);
+                byTopicName.put(topicName, topic);
+            }
+            return topic;
+        }
+
+        public Builder addPartition(
+            String topicName,
+            int partitionIndex,
+            Errors error
+        ) {
+            final OffsetCommitResponseTopic topicResponse = 
getOrCreateTopic(topicName);
+
+            topicResponse.partitions().add(new OffsetCommitResponsePartition()
+                .setPartitionIndex(partitionIndex)
+                .setErrorCode(error.code()));
+
+            return this;
+        }
+
+        public <P> Builder addPartitions(
+            String topicName,
+            List<P> partitions,
+            Function<P, Integer> partitionIndex,
+            Errors error
+        ) {
+            final OffsetCommitResponseTopic topicResponse = 
getOrCreateTopic(topicName);
+
+            partitions.forEach(partition -> {
+                topicResponse.partitions().add(new 
OffsetCommitResponsePartition()
+                    .setPartitionIndex(partitionIndex.apply(partition))
+                    .setErrorCode(error.code()));
+            });
+
+            return this;
+        }
+
+        public Builder merge(
+            OffsetCommitResponseData newData
+        ) {
+            if (data.topics().isEmpty()) {
+                // If the current data is empty, we can discard it and use the 
new data.
+                data = newData;
+            } else {
+                // Otherwise, we have to merge them together.
+                newData.topics().forEach(newTopic -> {
+                    OffsetCommitResponseTopic existingTopic = 
byTopicName.get(newTopic.name());
+                    if (existingTopic == null) {
+                        // If no topic exists, we can directly copy the new 
topic data.
+                        data.topics().add(newTopic);
+                        byTopicName.put(newTopic.name(), newTopic);
+                    } else {
+                        // Otherwise, we add the partitions to the existing 
one. Note we
+                        // expect non-overlapping partitions here as we don't 
verify
+                        // if the partition is already in the list before 
adding it.
+                        
existingTopic.partitions().addAll(newTopic.partitions());
+                    }
+                });
+            }
+
+            return this;
+        }
+
+        public OffsetCommitResponse build() {
+            return new OffsetCommitResponse(data);
+        }
+    }
 }
diff --git a/core/src/main/java/kafka/server/builders/KafkaApisBuilder.java b/core/src/main/java/kafka/server/builders/KafkaApisBuilder.java
index 95c18a6f3e3..18cd42c77cb 100644
--- a/core/src/main/java/kafka/server/builders/KafkaApisBuilder.java
+++ b/core/src/main/java/kafka/server/builders/KafkaApisBuilder.java
@@ -179,7 +179,7 @@ public class KafkaApisBuilder {
                              metadataSupport,
                              replicaManager,
                              groupCoordinator,
-                             new GroupCoordinatorAdapter(groupCoordinator),
+                             new GroupCoordinatorAdapter(groupCoordinator, 
time),
                              txnCoordinator,
                              autoTopicCreationManager,
                              brokerId,
diff --git a/core/src/main/scala/kafka/coordinator/group/GroupCoordinatorAdapter.scala b/core/src/main/scala/kafka/coordinator/group/GroupCoordinatorAdapter.scala
index 29242a8774a..1a97079ce39 100644
--- a/core/src/main/scala/kafka/coordinator/group/GroupCoordinatorAdapter.scala
+++ b/core/src/main/scala/kafka/coordinator/group/GroupCoordinatorAdapter.scala
@@ -16,15 +16,18 @@
  */
 package kafka.coordinator.group
 
+import kafka.common.OffsetAndMetadata
 import kafka.server.RequestLocal
 import kafka.utils.Implicits.MapExtensionMethods
 import org.apache.kafka.common.TopicPartition
-import org.apache.kafka.common.message.{DeleteGroupsResponseData, 
DescribeGroupsResponseData, HeartbeatRequestData, HeartbeatResponseData, 
JoinGroupRequestData, JoinGroupResponseData, LeaveGroupRequestData, 
LeaveGroupResponseData, ListGroupsRequestData, ListGroupsResponseData, 
OffsetFetchRequestData, OffsetFetchResponseData, SyncGroupRequestData, 
SyncGroupResponseData}
+import org.apache.kafka.common.message.{DeleteGroupsResponseData, 
DescribeGroupsResponseData, HeartbeatRequestData, HeartbeatResponseData, 
JoinGroupRequestData, JoinGroupResponseData, LeaveGroupRequestData, 
LeaveGroupResponseData, ListGroupsRequestData, ListGroupsResponseData, 
OffsetCommitRequestData, OffsetCommitResponseData, OffsetFetchRequestData, 
OffsetFetchResponseData, SyncGroupRequestData, SyncGroupResponseData}
 import org.apache.kafka.common.protocol.Errors
-import org.apache.kafka.common.requests.RequestContext
-import org.apache.kafka.common.utils.BufferSupplier
+import org.apache.kafka.common.record.RecordBatch
+import org.apache.kafka.common.requests.{OffsetCommitRequest, RequestContext}
+import org.apache.kafka.common.utils.{BufferSupplier, Time}
 
 import java.util
+import java.util.Optional
 import java.util.concurrent.CompletableFuture
 import scala.collection.{immutable, mutable}
 import scala.jdk.CollectionConverters._
@@ -34,7 +37,8 @@ import scala.jdk.CollectionConverters._
  * that exposes the new org.apache.kafka.coordinator.group.GroupCoordinator 
interface.
  */
 class GroupCoordinatorAdapter(
-  val coordinator: GroupCoordinator
+  private val coordinator: GroupCoordinator,
+  private val time: Time
 ) extends org.apache.kafka.coordinator.group.GroupCoordinator {
 
   override def joinGroup(
@@ -312,4 +316,79 @@ class GroupCoordinatorAdapter(
 
     future
   }
+
+  override def commitOffsets(
+    context: RequestContext,
+    request: OffsetCommitRequestData,
+    bufferSupplier: BufferSupplier
+  ): CompletableFuture[OffsetCommitResponseData] = {
+    val currentTimeMs = time.milliseconds
+    val future = new CompletableFuture[OffsetCommitResponseData]()
+
+    def callback(commitStatus: Map[TopicPartition, Errors]): Unit = {
+      val response = new OffsetCommitResponseData()
+      val byTopics = new mutable.HashMap[String, 
OffsetCommitResponseData.OffsetCommitResponseTopic]()
+
+      commitStatus.forKeyValue { (tp, error) =>
+        val topic = byTopics.get(tp.topic) match {
+          case Some(existingTopic) =>
+            existingTopic
+          case None =>
+            val newTopic = new 
OffsetCommitResponseData.OffsetCommitResponseTopic().setName(tp.topic)
+            byTopics += tp.topic -> newTopic
+            response.topics.add(newTopic)
+            newTopic
+        }
+
+        topic.partitions.add(new 
OffsetCommitResponseData.OffsetCommitResponsePartition()
+          .setPartitionIndex(tp.partition)
+          .setErrorCode(error.code))
+      }
+
+      future.complete(response)
+    }
+
+    // "default" expiration timestamp is defined as now + retention. The 
retention may be overridden
+    // in versions from v2 to v4. Otherwise, the retention defined on the 
broker is used. If an explicit
+    // commit timestamp is provided (v1 only), the expiration timestamp is 
computed based on that.
+    val expireTimeMs = request.retentionTimeMs match {
+      case OffsetCommitRequest.DEFAULT_RETENTION_TIME => None
+      case retentionTimeMs => Some(currentTimeMs + retentionTimeMs)
+    }
+
+    val partitions = new mutable.HashMap[TopicPartition, OffsetAndMetadata]()
+    request.topics.forEach { topic =>
+      topic.partitions.forEach { partition =>
+        val tp = new TopicPartition(topic.name, partition.partitionIndex)
+        partitions += tp -> new OffsetAndMetadata(
+          offset = partition.committedOffset,
+          leaderEpoch = partition.committedLeaderEpoch match {
+            case RecordBatch.NO_PARTITION_LEADER_EPOCH => 
Optional.empty[Integer]
+            case committedLeaderEpoch => 
Optional.of[Integer](committedLeaderEpoch)
+          },
+          metadata = partition.committedMetadata match {
+            case null => OffsetAndMetadata.NoMetadata
+            case metadata => metadata
+          },
+          commitTimestamp = partition.commitTimestamp match {
+            case OffsetCommitRequest.DEFAULT_TIMESTAMP => currentTimeMs
+            case customTimestamp => customTimestamp
+          },
+          expireTimestamp = expireTimeMs
+        )
+      }
+    }
+
+    coordinator.handleCommitOffsets(
+      request.groupId,
+      request.memberId,
+      Option(request.groupInstanceId),
+      request.generationId,
+      partitions.toMap,
+      callback,
+      RequestLocal(bufferSupplier)
+    )
+
+    future
+  }
 }
diff --git a/core/src/main/scala/kafka/server/BrokerServer.scala b/core/src/main/scala/kafka/server/BrokerServer.scala
index 83cdb046863..c72f297b2fc 100644
--- a/core/src/main/scala/kafka/server/BrokerServer.scala
+++ b/core/src/main/scala/kafka/server/BrokerServer.scala
@@ -409,7 +409,7 @@ class BrokerServer(
         metadataSupport = raftSupport,
         replicaManager = replicaManager,
         groupCoordinator = groupCoordinator,
-        newGroupCoordinator = new GroupCoordinatorAdapter(groupCoordinator),
+        newGroupCoordinator = new GroupCoordinatorAdapter(groupCoordinator, 
time),
         txnCoordinator = transactionCoordinator,
         autoTopicCreationManager = autoTopicCreationManager,
         brokerId = config.nodeId,
diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala
index 20cfe766eb2..42768fa98f9 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -187,7 +187,7 @@ class KafkaApis(val requestChannel: RequestChannel,
         case ApiKeys.STOP_REPLICA => handleStopReplicaRequest(request)
         case ApiKeys.UPDATE_METADATA => handleUpdateMetadataRequest(request, 
requestLocal)
         case ApiKeys.CONTROLLED_SHUTDOWN => 
handleControlledShutdownRequest(request)
-        case ApiKeys.OFFSET_COMMIT => handleOffsetCommitRequest(request, 
requestLocal)
+        case ApiKeys.OFFSET_COMMIT => handleOffsetCommitRequest(request, 
requestLocal).exceptionally(handleError)
         case ApiKeys.OFFSET_FETCH => 
handleOffsetFetchRequest(request).exceptionally(handleError)
         case ApiKeys.FIND_COORDINATOR => handleFindCoordinatorRequest(request)
         case ApiKeys.JOIN_GROUP => handleJoinGroupRequest(request, 
requestLocal).exceptionally(handleError)
@@ -410,137 +410,144 @@ class KafkaApis(val requestChannel: RequestChannel,
   /**
    * Handle an offset commit request
    */
-  def handleOffsetCommitRequest(request: RequestChannel.Request, requestLocal: 
RequestLocal): Unit = {
-    val header = request.header
+  def handleOffsetCommitRequest(
+    request: RequestChannel.Request,
+    requestLocal: RequestLocal
+  ): CompletableFuture[Unit] = {
     val offsetCommitRequest = request.body[OffsetCommitRequest]
 
-    val unauthorizedTopicErrors = mutable.Map[TopicPartition, Errors]()
-    val nonExistingTopicErrors = mutable.Map[TopicPartition, Errors]()
-    // the callback for sending an offset commit response
-    def sendResponseCallback(commitStatus: Map[TopicPartition, Errors]): Unit 
= {
-      val combinedCommitStatus = commitStatus ++ unauthorizedTopicErrors ++ 
nonExistingTopicErrors
-      if (isDebugEnabled)
-        combinedCommitStatus.forKeyValue { (topicPartition, error) =>
-          if (error != Errors.NONE) {
-            debug(s"Offset commit request with correlation id 
${header.correlationId} from client ${header.clientId} " +
-              s"on partition $topicPartition failed due to 
${error.exceptionName}")
-          }
-        }
-      requestHelper.sendResponseMaybeThrottle(request, requestThrottleMs =>
-        new OffsetCommitResponse(requestThrottleMs, 
combinedCommitStatus.asJava))
-    }
-
-      // reject the request if not authorized to the group
+    // Reject the request if not authorized to the group
     if (!authHelper.authorize(request.context, READ, GROUP, 
offsetCommitRequest.data.groupId)) {
-      val error = Errors.GROUP_AUTHORIZATION_FAILED
-      val responseTopicList = OffsetCommitRequest.getErrorResponseTopics(
-        offsetCommitRequest.data.topics,
-        error)
-
-      requestHelper.sendResponseMaybeThrottle(request, requestThrottleMs => 
new OffsetCommitResponse(
-        new OffsetCommitResponseData()
-            .setTopics(responseTopicList)
-            .setThrottleTimeMs(requestThrottleMs)
-      ))
+      requestHelper.sendMaybeThrottle(request, 
offsetCommitRequest.getErrorResponse(Errors.GROUP_AUTHORIZATION_FAILED.exception))
+      CompletableFuture.completedFuture[Unit](())
     } else if (offsetCommitRequest.data.groupInstanceId != null && 
config.interBrokerProtocolVersion.isLessThan(IBP_2_3_IV0)) {
       // Only enable static membership when IBP >= 2.3, because it is not safe 
for the broker to use the static member logic
       // until we are sure that all brokers support it. If static group being 
loaded by an older coordinator, it will discard
       // the group.instance.id field, so static members could accidentally 
become "dynamic", which leads to wrong states.
-      val errorMap = new mutable.HashMap[TopicPartition, Errors]
-      for (topicData <- offsetCommitRequest.data.topics.asScala) {
-        for (partitionData <- topicData.partitions.asScala) {
-          val topicPartition = new TopicPartition(topicData.name, 
partitionData.partitionIndex)
-          errorMap += topicPartition -> Errors.UNSUPPORTED_VERSION
-        }
-      }
-      sendResponseCallback(errorMap.toMap)
+      requestHelper.sendMaybeThrottle(request, 
offsetCommitRequest.getErrorResponse(Errors.UNSUPPORTED_VERSION.exception))
+      CompletableFuture.completedFuture[Unit](())
     } else {
-      val authorizedTopicRequestInfoBldr = 
immutable.Map.newBuilder[TopicPartition, 
OffsetCommitRequestData.OffsetCommitRequestPartition]
+      val authorizedTopics = authHelper.filterByAuthorized(
+        request.context,
+        READ,
+        TOPIC,
+        offsetCommitRequest.data.topics.asScala
+      )(_.name)
+
+      val responseBuilder = new OffsetCommitResponse.Builder()
+      val authorizedTopicsRequest = new 
mutable.ArrayBuffer[OffsetCommitRequestData.OffsetCommitRequestTopic]()
+      offsetCommitRequest.data.topics.forEach { topic =>
+        if (!authorizedTopics.contains(topic.name)) {
+          // If the topic is not authorized, we add the topic and all its 
partitions
+          // to the response with TOPIC_AUTHORIZATION_FAILED.
+          
responseBuilder.addPartitions[OffsetCommitRequestData.OffsetCommitRequestPartition](
+            topic.name, topic.partitions, _.partitionIndex, 
Errors.TOPIC_AUTHORIZATION_FAILED)
+        } else if (!metadataCache.contains(topic.name)) {
+          // If the topic is unknown, we add the topic and all its partitions
+          // to the response with UNKNOWN_TOPIC_OR_PARTITION.
+          
responseBuilder.addPartitions[OffsetCommitRequestData.OffsetCommitRequestPartition](
+            topic.name, topic.partitions, _.partitionIndex, 
Errors.UNKNOWN_TOPIC_OR_PARTITION)
+        } else {
+          // Otherwise, we check all partitions to ensure that they all exist.
+          val topicWithValidPartitions = new 
OffsetCommitRequestData.OffsetCommitRequestTopic().setName(topic.name)
 
-      val topics = offsetCommitRequest.data.topics.asScala
-      val authorizedTopics = authHelper.filterByAuthorized(request.context, 
READ, TOPIC, topics)(_.name)
-      for (topicData <- topics) {
-        for (partitionData <- topicData.partitions.asScala) {
-          val topicPartition = new TopicPartition(topicData.name, 
partitionData.partitionIndex)
-          if (!authorizedTopics.contains(topicData.name))
-            unauthorizedTopicErrors += (topicPartition -> 
Errors.TOPIC_AUTHORIZATION_FAILED)
-          else if (!metadataCache.contains(topicPartition))
-            nonExistingTopicErrors += (topicPartition -> 
Errors.UNKNOWN_TOPIC_OR_PARTITION)
-          else
-            authorizedTopicRequestInfoBldr += (topicPartition -> partitionData)
+          topic.partitions.forEach { partition =>
+            if (metadataCache.getPartitionInfo(topic.name, 
partition.partitionIndex).nonEmpty) {
+              topicWithValidPartitions.partitions.add(partition)
+            } else {
+              responseBuilder.addPartition(topic.name, 
partition.partitionIndex, Errors.UNKNOWN_TOPIC_OR_PARTITION)
+            }
+          }
+
+          if (!topicWithValidPartitions.partitions.isEmpty) {
+            authorizedTopicsRequest += topicWithValidPartitions
+          }
         }
       }
 
-      val authorizedTopicRequestInfo = authorizedTopicRequestInfoBldr.result()
-
-      if (authorizedTopicRequestInfo.isEmpty)
-        sendResponseCallback(Map.empty)
-      else if (header.apiVersion == 0) {
-        // for version 0 always store offsets to ZK
-        val zkSupport = 
metadataSupport.requireZkOrThrow(KafkaApis.unsupported("Version 0 offset commit 
requests"))
-        val responseInfo = authorizedTopicRequestInfo.map {
-          case (topicPartition, partitionData) =>
-            try {
-              if (partitionData.committedMetadata() != null
-                && partitionData.committedMetadata().length > 
config.offsetMetadataMaxSize)
-                (topicPartition, Errors.OFFSET_METADATA_TOO_LARGE)
-              else {
-                zkSupport.zkClient.setOrCreateConsumerOffset(
-                  offsetCommitRequest.data.groupId,
-                  topicPartition,
-                  partitionData.committedOffset)
-                (topicPartition, Errors.NONE)
-              }
-            } catch {
-              case e: Throwable => (topicPartition, Errors.forException(e))
-            }
-        }
-        sendResponseCallback(responseInfo)
+      if (authorizedTopicsRequest.isEmpty) {
+        requestHelper.sendMaybeThrottle(request, responseBuilder.build())
+        CompletableFuture.completedFuture(())
+      } else if (request.header.apiVersion == 0) {
+        // For version 0, always store offsets in ZK.
+        commitOffsetsToZookeeper(
+          request,
+          offsetCommitRequest,
+          authorizedTopicsRequest,
+          responseBuilder
+        )
       } else {
-        // for version 1 and beyond store offsets in offset manager
-
-        // "default" expiration timestamp is now + retention (and retention 
may be overridden if v2)
-        // expire timestamp is computed differently for v1 and v2.
-        //   - If v1 and no explicit commit timestamp is provided we treat it 
the same as v5.
-        //   - If v1 and explicit retention time is provided we calculate 
expiration timestamp based on that
-        //   - If v2/v3/v4 (no explicit commit timestamp) we treat it the same 
as v5.
-        //   - For v5 and beyond there is no per partition expiration 
timestamp, so this field is no longer in effect
-        val currentTimestamp = time.milliseconds
-        val partitionData = authorizedTopicRequestInfo.map { case (k, 
partitionData) =>
-          val metadata = if (partitionData.committedMetadata == null)
-            OffsetAndMetadata.NoMetadata
-          else
-            partitionData.committedMetadata
+        // For version > 0, store offsets in Coordinator.
+        commitOffsetsToCoordinator(
+          request,
+          offsetCommitRequest,
+          authorizedTopicsRequest,
+          responseBuilder,
+          requestLocal
+        )
+      }
+    }
+  }
 
-          val leaderEpochOpt = if (partitionData.committedLeaderEpoch == 
RecordBatch.NO_PARTITION_LEADER_EPOCH)
-            Optional.empty[Integer]
-          else
-            Optional.of[Integer](partitionData.committedLeaderEpoch)
-
-          k -> new OffsetAndMetadata(
-            offset = partitionData.committedOffset,
-            leaderEpoch = leaderEpochOpt,
-            metadata = metadata,
-            commitTimestamp = partitionData.commitTimestamp match {
-              case OffsetCommitRequest.DEFAULT_TIMESTAMP => currentTimestamp
-              case customTimestamp => customTimestamp
-            },
-            expireTimestamp = offsetCommitRequest.data.retentionTimeMs match {
-              case OffsetCommitRequest.DEFAULT_RETENTION_TIME => None
-              case retentionTime => Some(currentTimestamp + retentionTime)
-            }
-          )
+  private def commitOffsetsToZookeeper(
+    request: RequestChannel.Request,
+    offsetCommitRequest: OffsetCommitRequest,
+    authorizedTopicsRequest: 
mutable.ArrayBuffer[OffsetCommitRequestData.OffsetCommitRequestTopic],
+    responseBuilder: OffsetCommitResponse.Builder
+  ): CompletableFuture[Unit] = {
+    val zkSupport = metadataSupport.requireZkOrThrow(
+      KafkaApis.unsupported("Version 0 offset commit requests"))
+
+    authorizedTopicsRequest.foreach { topic =>
+      topic.partitions.forEach { partition =>
+        val error = try {
+          if (partition.committedMetadata != null && 
partition.committedMetadata.length > config.offsetMetadataMaxSize) {
+            Errors.OFFSET_METADATA_TOO_LARGE
+          } else {
+            zkSupport.zkClient.setOrCreateConsumerOffset(
+              offsetCommitRequest.data.groupId,
+              new TopicPartition(topic.name, partition.partitionIndex),
+              partition.committedOffset
+            )
+            Errors.NONE
+          }
+        } catch {
+          case e: Throwable =>
+            Errors.forException(e)
         }
 
-        // call coordinator to handle commit offset
-        groupCoordinator.handleCommitOffsets(
-          offsetCommitRequest.data.groupId,
-          offsetCommitRequest.data.memberId,
-          Option(offsetCommitRequest.data.groupInstanceId),
-          offsetCommitRequest.data.generationId,
-          partitionData,
-          sendResponseCallback,
-          requestLocal)
+        responseBuilder.addPartition(topic.name, partition.partitionIndex, 
error)
+      }
+    }
+
+    requestHelper.sendMaybeThrottle(request, responseBuilder.build())
+    CompletableFuture.completedFuture[Unit](())
+  }
+
+  private def commitOffsetsToCoordinator(
+    request: RequestChannel.Request,
+    offsetCommitRequest: OffsetCommitRequest,
+    authorizedTopicsRequest: 
mutable.ArrayBuffer[OffsetCommitRequestData.OffsetCommitRequestTopic],
+    responseBuilder: OffsetCommitResponse.Builder,
+    requestLocal: RequestLocal
+  ): CompletableFuture[Unit] = {
+    val offsetCommitRequestData = new OffsetCommitRequestData()
+      .setGroupId(offsetCommitRequest.data.groupId)
+      .setMemberId(offsetCommitRequest.data.memberId)
+      .setGenerationId(offsetCommitRequest.data.generationId)
+      .setRetentionTimeMs(offsetCommitRequest.data.retentionTimeMs)
+      .setGroupInstanceId(offsetCommitRequest.data.groupInstanceId)
+      .setTopics(authorizedTopicsRequest.asJava)
+
+    newGroupCoordinator.commitOffsets(
+      request.context,
+      offsetCommitRequestData,
+      requestLocal.bufferSupplier
+    ).handle[Unit] { (results, exception) =>
+      if (exception != null) {
+        requestHelper.sendMaybeThrottle(request, 
offsetCommitRequest.getErrorResponse(exception))
+      } else {
+        requestHelper.sendMaybeThrottle(request, 
responseBuilder.merge(results).build())
       }
     }
   }
diff --git a/core/src/main/scala/kafka/server/KafkaServer.scala b/core/src/main/scala/kafka/server/KafkaServer.scala
index 87b9d48d46c..30a96537cc1 100755
--- a/core/src/main/scala/kafka/server/KafkaServer.scala
+++ b/core/src/main/scala/kafka/server/KafkaServer.scala
@@ -506,7 +506,7 @@ class KafkaServer(
           metadataSupport = zkSupport,
           replicaManager = replicaManager,
           groupCoordinator = groupCoordinator,
-          newGroupCoordinator = new GroupCoordinatorAdapter(groupCoordinator),
+          newGroupCoordinator = new GroupCoordinatorAdapter(groupCoordinator, 
time),
           txnCoordinator = transactionCoordinator,
           autoTopicCreationManager = autoTopicCreationManager,
           brokerId = config.brokerId,
diff --git a/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorAdapterTest.scala b/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorAdapterTest.scala
index 1fbdf333a96..9ecbdd54e06 100644
--- a/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorAdapterTest.scala
+++ b/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorAdapterTest.scala
@@ -16,17 +16,19 @@
  */
 package kafka.coordinator.group
 
+import kafka.common.OffsetAndMetadata
 import 
kafka.coordinator.group.GroupCoordinatorConcurrencyTest.{JoinGroupCallback, 
SyncGroupCallback}
 import kafka.server.RequestLocal
+import kafka.utils.MockTime
 import org.apache.kafka.common.TopicPartition
-import org.apache.kafka.common.message.{DeleteGroupsResponseData, 
DescribeGroupsResponseData, HeartbeatRequestData, HeartbeatResponseData, 
JoinGroupRequestData, JoinGroupResponseData, LeaveGroupRequestData, 
LeaveGroupResponseData, ListGroupsRequestData, ListGroupsResponseData, 
OffsetFetchRequestData, OffsetFetchResponseData, SyncGroupRequestData, 
SyncGroupResponseData}
+import org.apache.kafka.common.message.{DeleteGroupsResponseData, 
DescribeGroupsResponseData, HeartbeatRequestData, HeartbeatResponseData, 
JoinGroupRequestData, JoinGroupResponseData, LeaveGroupRequestData, 
LeaveGroupResponseData, ListGroupsRequestData, ListGroupsResponseData, 
OffsetCommitRequestData, OffsetCommitResponseData, OffsetFetchRequestData, 
OffsetFetchResponseData, SyncGroupRequestData, SyncGroupResponseData}
 import 
org.apache.kafka.common.message.JoinGroupRequestData.JoinGroupRequestProtocol
 import 
org.apache.kafka.common.message.JoinGroupResponseData.JoinGroupResponseMember
 import org.apache.kafka.common.network.{ClientInformation, ListenerName}
 import org.apache.kafka.common.protocol.{ApiKeys, Errors}
 import org.apache.kafka.common.requests.{OffsetFetchResponse, RequestContext, 
RequestHeader}
 import org.apache.kafka.common.security.auth.{KafkaPrincipal, SecurityProtocol}
-import org.apache.kafka.common.utils.BufferSupplier
+import org.apache.kafka.common.utils.{BufferSupplier, Time}
 import org.apache.kafka.common.utils.annotation.ApiKeyVersionsSource
 import org.junit.jupiter.api.Assertions.{assertEquals, assertFalse, assertTrue}
 import org.junit.jupiter.api.Test
@@ -60,7 +62,7 @@ class GroupCoordinatorAdapterTest {
   @ApiKeyVersionsSource(apiKey = ApiKeys.JOIN_GROUP)
   def testJoinGroup(version: Short): Unit = {
     val groupCoordinator = mock(classOf[GroupCoordinator])
-    val adapter = new GroupCoordinatorAdapter(groupCoordinator)
+    val adapter = new GroupCoordinatorAdapter(groupCoordinator, Time.SYSTEM)
 
     val ctx = makeContext(ApiKeys.JOIN_GROUP, version)
     val request = new JoinGroupRequestData()
@@ -148,7 +150,7 @@ class GroupCoordinatorAdapterTest {
   @ApiKeyVersionsSource(apiKey = ApiKeys.SYNC_GROUP)
   def testSyncGroup(version: Short): Unit = {
     val groupCoordinator = mock(classOf[GroupCoordinator])
-    val adapter = new GroupCoordinatorAdapter(groupCoordinator)
+    val adapter = new GroupCoordinatorAdapter(groupCoordinator, Time.SYSTEM)
 
     val ctx = makeContext(ApiKeys.SYNC_GROUP, version)
     val data = new SyncGroupRequestData()
@@ -215,7 +217,7 @@ class GroupCoordinatorAdapterTest {
   @Test
   def testHeartbeat(): Unit = {
     val groupCoordinator = mock(classOf[GroupCoordinator])
-    val adapter = new GroupCoordinatorAdapter(groupCoordinator)
+    val adapter = new GroupCoordinatorAdapter(groupCoordinator, Time.SYSTEM)
 
     val ctx = makeContext(ApiKeys.HEARTBEAT, ApiKeys.HEARTBEAT.latestVersion)
     val data = new HeartbeatRequestData()
@@ -246,7 +248,7 @@ class GroupCoordinatorAdapterTest {
 
   def testLeaveGroup(): Unit = {
     val groupCoordinator = mock(classOf[GroupCoordinator])
-    val adapter = new GroupCoordinatorAdapter(groupCoordinator)
+    val adapter = new GroupCoordinatorAdapter(groupCoordinator, Time.SYSTEM)
 
     val ctx = makeContext(ApiKeys.LEAVE_GROUP, 
ApiKeys.LEAVE_GROUP.latestVersion)
     val data = new LeaveGroupRequestData()
@@ -315,7 +317,7 @@ class GroupCoordinatorAdapterTest {
     expectedStatesFilter: Set[String]
   ): Unit = {
     val groupCoordinator = mock(classOf[GroupCoordinator])
-    val adapter = new GroupCoordinatorAdapter(groupCoordinator)
+    val adapter = new GroupCoordinatorAdapter(groupCoordinator, Time.SYSTEM)
 
     val ctx = makeContext(ApiKeys.LIST_GROUPS, 
ApiKeys.LIST_GROUPS.latestVersion)
     val data = new ListGroupsRequestData()
@@ -350,7 +352,7 @@ class GroupCoordinatorAdapterTest {
   @Test
   def testDescribeGroup(): Unit = {
     val groupCoordinator = mock(classOf[GroupCoordinator])
-    val adapter = new GroupCoordinatorAdapter(groupCoordinator)
+    val adapter = new GroupCoordinatorAdapter(groupCoordinator, Time.SYSTEM)
 
     val groupId1 = "group-1"
     val groupId2 = "group-2"
@@ -407,7 +409,7 @@ class GroupCoordinatorAdapterTest {
   @Test
   def testDeleteGroups(): Unit = {
     val groupCoordinator = mock(classOf[GroupCoordinator])
-    val adapter = new GroupCoordinatorAdapter(groupCoordinator)
+    val adapter = new GroupCoordinatorAdapter(groupCoordinator, Time.SYSTEM)
 
     val ctx = makeContext(ApiKeys.DELETE_GROUPS, 
ApiKeys.DELETE_GROUPS.latestVersion)
     val groupIds = List("group-1", "group-2", "group-3")
@@ -446,7 +448,7 @@ class GroupCoordinatorAdapterTest {
     val bar1 = new TopicPartition("bar", 1)
 
     val groupCoordinator = mock(classOf[GroupCoordinator])
-    val adapter = new GroupCoordinatorAdapter(groupCoordinator)
+    val adapter = new GroupCoordinatorAdapter(groupCoordinator, Time.SYSTEM)
 
     when(groupCoordinator.handleFetchOffsets(
       "group",
@@ -527,7 +529,7 @@ class GroupCoordinatorAdapterTest {
     val bar1 = new TopicPartition("bar", 1)
 
     val groupCoordinator = mock(classOf[GroupCoordinator])
-    val adapter = new GroupCoordinatorAdapter(groupCoordinator)
+    val adapter = new GroupCoordinatorAdapter(groupCoordinator, Time.SYSTEM)
 
     when(groupCoordinator.handleFetchOffsets(
       "group",
@@ -608,4 +610,74 @@ class GroupCoordinatorAdapterTest {
       future.get().asScala.toList.sortWith(_.name > _.name)
     )
   }
+
+  @ParameterizedTest
+  @ApiKeyVersionsSource(apiKey = ApiKeys.OFFSET_COMMIT)
+  def testCommitOffsets(version: Short): Unit = {
+    val groupCoordinator = mock(classOf[GroupCoordinator])
+    val time = new MockTime()
+    val adapter = new GroupCoordinatorAdapter(groupCoordinator, time)
+    val now = time.milliseconds()
+
+    val ctx = makeContext(ApiKeys.OFFSET_COMMIT, version)
+    val data = new OffsetCommitRequestData()
+      .setGroupId("group")
+      .setMemberId("member")
+      .setGenerationId(10)
+      .setRetentionTimeMs(1000)
+      .setTopics(List(
+        new OffsetCommitRequestData.OffsetCommitRequestTopic()
+          .setName("foo")
+          .setPartitions(List(
+            new OffsetCommitRequestData.OffsetCommitRequestPartition()
+              .setPartitionIndex(0)
+              .setCommittedOffset(100)
+              .setCommitTimestamp(now)
+              .setCommittedLeaderEpoch(1)
+          ).asJava)
+      ).asJava)
+    val bufferSupplier = BufferSupplier.create()
+
+    val future = adapter.commitOffsets(ctx, data, bufferSupplier)
+    assertFalse(future.isDone)
+
+    val capturedCallback: ArgumentCaptor[Map[TopicPartition, Errors] => Unit] =
+      ArgumentCaptor.forClass(classOf[Map[TopicPartition, Errors] => Unit])
+
+    verify(groupCoordinator).handleCommitOffsets(
+      ArgumentMatchers.eq(data.groupId),
+      ArgumentMatchers.eq(data.memberId),
+      ArgumentMatchers.eq(None),
+      ArgumentMatchers.eq(data.generationId),
+      ArgumentMatchers.eq(Map(
+        new TopicPartition("foo", 0) -> new OffsetAndMetadata(
+          offset = 100,
+          leaderEpoch = Optional.of[Integer](1),
+          metadata = "",
+          commitTimestamp = now,
+          expireTimestamp = Some(now + 1000L)
+        )
+      )),
+      capturedCallback.capture(),
+      ArgumentMatchers.eq(RequestLocal(bufferSupplier))
+    )
+
+    capturedCallback.getValue.apply(Map(
+      new TopicPartition("foo", 0) -> Errors.NONE
+    ))
+
+    val expectedResponseData = new OffsetCommitResponseData()
+      .setTopics(List(
+        new OffsetCommitResponseData.OffsetCommitResponseTopic()
+          .setName("foo")
+          .setPartitions(List(
+            new OffsetCommitResponseData.OffsetCommitResponsePartition()
+              .setPartitionIndex(0)
+              .setErrorCode(Errors.NONE.code)
+          ).asJava)
+      ).asJava)
+
+    assertTrue(future.isDone)
+    assertEquals(expectedResponseData, future.get())
+  }
 }
diff --git a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala 
b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
index 27b2338a8e7..81c2c9ffbe2 100644
--- a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
+++ b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
@@ -1264,6 +1264,239 @@ class KafkaApisTest {
     )
   }
 
+  @Test
+  def testHandleOffsetCommitRequest(): Unit = {
+    addTopicToMetadataCache("foo", numPartitions = 1)
+
+    val offsetCommitRequest = new OffsetCommitRequestData()
+      .setGroupId("group")
+      .setMemberId("member")
+      .setTopics(List(
+        new OffsetCommitRequestData.OffsetCommitRequestTopic()
+          .setName("foo")
+          .setPartitions(List(
+            new OffsetCommitRequestData.OffsetCommitRequestPartition()
+              .setPartitionIndex(0)
+              .setCommittedOffset(10)).asJava)).asJava)
+
+    val requestChannelRequest = buildRequest(new 
OffsetCommitRequest.Builder(offsetCommitRequest).build())
+
+    val future = new CompletableFuture[OffsetCommitResponseData]()
+    when(newGroupCoordinator.commitOffsets(
+      requestChannelRequest.context,
+      offsetCommitRequest,
+      RequestLocal.NoCaching.bufferSupplier
+    )).thenReturn(future)
+
+    createKafkaApis().handle(
+      requestChannelRequest,
+      RequestLocal.NoCaching
+    )
+
+    // This is the response returned by the group coordinator.
+    val offsetCommitResponse = new OffsetCommitResponseData()
+      .setTopics(List(
+        new OffsetCommitResponseData.OffsetCommitResponseTopic()
+          .setName("foo")
+          .setPartitions(List(
+            new OffsetCommitResponseData.OffsetCommitResponsePartition()
+              .setPartitionIndex(0)
+              .setErrorCode(Errors.NONE.code)).asJava)).asJava)
+
+    future.complete(offsetCommitResponse)
+    val response = 
verifyNoThrottling[OffsetCommitResponse](requestChannelRequest)
+    assertEquals(offsetCommitResponse, response.data)
+  }
+
+  @Test
+  def testHandleOffsetCommitRequestFutureFailed(): Unit = {
+    addTopicToMetadataCache("foo", numPartitions = 1)
+
+    val offsetCommitRequest = new OffsetCommitRequestData()
+      .setGroupId("group")
+      .setMemberId("member")
+      .setTopics(List(
+        new OffsetCommitRequestData.OffsetCommitRequestTopic()
+          .setName("foo")
+          .setPartitions(List(
+            new OffsetCommitRequestData.OffsetCommitRequestPartition()
+              .setPartitionIndex(0)
+              .setCommittedOffset(10)).asJava)).asJava)
+
+    val requestChannelRequest = buildRequest(new 
OffsetCommitRequest.Builder(offsetCommitRequest).build())
+
+    val future = new CompletableFuture[OffsetCommitResponseData]()
+    when(newGroupCoordinator.commitOffsets(
+      requestChannelRequest.context,
+      offsetCommitRequest,
+      RequestLocal.NoCaching.bufferSupplier
+    )).thenReturn(future)
+
+    createKafkaApis().handle(
+      requestChannelRequest,
+      RequestLocal.NoCaching
+    )
+
+    val expectedOffsetCommitResponse = new OffsetCommitResponseData()
+      .setTopics(List(
+        new OffsetCommitResponseData.OffsetCommitResponseTopic()
+          .setName("foo")
+          .setPartitions(List(
+            new OffsetCommitResponseData.OffsetCommitResponsePartition()
+              .setPartitionIndex(0)
+              .setErrorCode(Errors.NOT_COORDINATOR.code)).asJava)).asJava)
+
+    future.completeExceptionally(Errors.NOT_COORDINATOR.exception)
+    val response = 
verifyNoThrottling[OffsetCommitResponse](requestChannelRequest)
+    assertEquals(expectedOffsetCommitResponse, response.data)
+  }
+
+  @Test
+  def testHandleOffsetCommitRequestTopicsAndPartitionsValidation(): Unit = {
+    addTopicToMetadataCache("foo", numPartitions = 2)
+    addTopicToMetadataCache("bar", numPartitions = 2)
+
+    val offsetCommitRequest = new OffsetCommitRequestData()
+      .setGroupId("group")
+      .setMemberId("member")
+      .setTopics(List(
+        // foo exists but only has 2 partitions.
+        new OffsetCommitRequestData.OffsetCommitRequestTopic()
+          .setName("foo")
+          .setPartitions(List(
+            new OffsetCommitRequestData.OffsetCommitRequestPartition()
+              .setPartitionIndex(0)
+              .setCommittedOffset(10),
+            new OffsetCommitRequestData.OffsetCommitRequestPartition()
+              .setPartitionIndex(1)
+              .setCommittedOffset(20),
+            new OffsetCommitRequestData.OffsetCommitRequestPartition()
+              .setPartitionIndex(2)
+              .setCommittedOffset(30)).asJava),
+        // bar exists.
+        new OffsetCommitRequestData.OffsetCommitRequestTopic()
+          .setName("bar")
+          .setPartitions(List(
+            new OffsetCommitRequestData.OffsetCommitRequestPartition()
+              .setPartitionIndex(0)
+              .setCommittedOffset(40),
+            new OffsetCommitRequestData.OffsetCommitRequestPartition()
+              .setPartitionIndex(1)
+              .setCommittedOffset(50)).asJava),
+        // zar does not exist.
+        new OffsetCommitRequestData.OffsetCommitRequestTopic()
+          .setName("zar")
+          .setPartitions(List(
+            new OffsetCommitRequestData.OffsetCommitRequestPartition()
+              .setPartitionIndex(0)
+              .setCommittedOffset(60),
+            new OffsetCommitRequestData.OffsetCommitRequestPartition()
+              .setPartitionIndex(1)
+              .setCommittedOffset(70)).asJava)).asJava)
+
+    val requestChannelRequest = buildRequest(new 
OffsetCommitRequest.Builder(offsetCommitRequest).build())
+
+    // This is the request expected by the group coordinator.
+    val expectedOffsetCommitRequest = new OffsetCommitRequestData()
+      .setGroupId("group")
+      .setMemberId("member")
+      .setTopics(List(
+        // foo exists but only has 2 partitions.
+        new OffsetCommitRequestData.OffsetCommitRequestTopic()
+          .setName("foo")
+          .setPartitions(List(
+            new OffsetCommitRequestData.OffsetCommitRequestPartition()
+              .setPartitionIndex(0)
+              .setCommittedOffset(10),
+            new OffsetCommitRequestData.OffsetCommitRequestPartition()
+              .setPartitionIndex(1)
+              .setCommittedOffset(20)).asJava),
+        new OffsetCommitRequestData.OffsetCommitRequestTopic()
+          .setName("bar")
+          .setPartitions(List(
+            new OffsetCommitRequestData.OffsetCommitRequestPartition()
+              .setPartitionIndex(0)
+              .setCommittedOffset(40),
+            new OffsetCommitRequestData.OffsetCommitRequestPartition()
+              .setPartitionIndex(1)
+              .setCommittedOffset(50)).asJava)).asJava)
+
+    val future = new CompletableFuture[OffsetCommitResponseData]()
+    when(newGroupCoordinator.commitOffsets(
+      requestChannelRequest.context,
+      expectedOffsetCommitRequest,
+      RequestLocal.NoCaching.bufferSupplier
+    )).thenReturn(future)
+
+    createKafkaApis().handle(
+      requestChannelRequest,
+      RequestLocal.NoCaching
+    )
+
+    // This is the response returned by the group coordinator.
+    val offsetCommitResponse = new OffsetCommitResponseData()
+      .setTopics(List(
+        new OffsetCommitResponseData.OffsetCommitResponseTopic()
+          .setName("foo")
+          .setPartitions(List(
+            new OffsetCommitResponseData.OffsetCommitResponsePartition()
+              .setPartitionIndex(0)
+              .setErrorCode(Errors.NONE.code),
+            new OffsetCommitResponseData.OffsetCommitResponsePartition()
+              .setPartitionIndex(1)
+              .setErrorCode(Errors.NONE.code)).asJava),
+        new OffsetCommitResponseData.OffsetCommitResponseTopic()
+          .setName("bar")
+          .setPartitions(List(
+            new OffsetCommitResponseData.OffsetCommitResponsePartition()
+              .setPartitionIndex(0)
+              .setErrorCode(Errors.NONE.code),
+            new OffsetCommitResponseData.OffsetCommitResponsePartition()
+              .setPartitionIndex(1)
+              .setErrorCode(Errors.NONE.code)).asJava)).asJava)
+
+    val expectedOffsetCommitResponse = new OffsetCommitResponseData()
+      .setTopics(List(
+        new OffsetCommitResponseData.OffsetCommitResponseTopic()
+          .setName("foo")
+          .setPartitions(List(
+            // foo-2 is first because partitions failing the validation
+            // are put in the response first.
+            new OffsetCommitResponseData.OffsetCommitResponsePartition()
+              .setPartitionIndex(2)
+              .setErrorCode(Errors.UNKNOWN_TOPIC_OR_PARTITION.code),
+            new OffsetCommitResponseData.OffsetCommitResponsePartition()
+              .setPartitionIndex(0)
+              .setErrorCode(Errors.NONE.code),
+            new OffsetCommitResponseData.OffsetCommitResponsePartition()
+              .setPartitionIndex(1)
+              .setErrorCode(Errors.NONE.code)).asJava),
+        // zar is before bar because topics failing the validation are
+        // put in the response first.
+        new OffsetCommitResponseData.OffsetCommitResponseTopic()
+          .setName("zar")
+          .setPartitions(List(
+            new OffsetCommitResponseData.OffsetCommitResponsePartition()
+              .setPartitionIndex(0)
+              .setErrorCode(Errors.UNKNOWN_TOPIC_OR_PARTITION.code),
+            new OffsetCommitResponseData.OffsetCommitResponsePartition()
+              .setPartitionIndex(1)
+              .setErrorCode(Errors.UNKNOWN_TOPIC_OR_PARTITION.code)).asJava),
+        new OffsetCommitResponseData.OffsetCommitResponseTopic()
+          .setName("bar")
+          .setPartitions(List(
+            new OffsetCommitResponseData.OffsetCommitResponsePartition()
+              .setPartitionIndex(0)
+              .setErrorCode(Errors.NONE.code),
+            new OffsetCommitResponseData.OffsetCommitResponsePartition()
+              .setPartitionIndex(1)
+              .setErrorCode(Errors.NONE.code)).asJava)).asJava)
+
+    future.complete(offsetCommitResponse)
+    val response = 
verifyNoThrottling[OffsetCommitResponse](requestChannelRequest)
+    assertEquals(expectedOffsetCommitResponse, response.data)
+  }
+
   @Test
   def testOffsetCommitWithInvalidPartition(): Unit = {
     val topic = "topic"
diff --git 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinator.java
 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinator.java
index 97bd1902460..30d48410822 100644
--- 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinator.java
+++ 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinator.java
@@ -22,6 +22,8 @@ import org.apache.kafka.common.message.HeartbeatRequestData;
 import org.apache.kafka.common.message.HeartbeatResponseData;
 import org.apache.kafka.common.message.JoinGroupRequestData;
 import org.apache.kafka.common.message.JoinGroupResponseData;
+import org.apache.kafka.common.message.OffsetCommitRequestData;
+import org.apache.kafka.common.message.OffsetCommitResponseData;
 import org.apache.kafka.common.message.LeaveGroupRequestData;
 import org.apache.kafka.common.message.LeaveGroupResponseData;
 import org.apache.kafka.common.message.ListGroupsRequestData;
@@ -164,5 +166,20 @@ public interface GroupCoordinator {
         String groupId,
         boolean requireStable
     );
+
+    /**
+     * Commit offsets for a given Group.
+     *
+     * @param context           The request context.
+     * @param request           The OffsetCommitRequest data.
+     * @param bufferSupplier    The buffer supplier tied to the request 
thread.
+     *
+     * @return A future yielding the response or an exception.
+     */
+    CompletableFuture<OffsetCommitResponseData> commitOffsets(
+        RequestContext context,
+        OffsetCommitRequestData request,
+        BufferSupplier bufferSupplier
+    );
 }
 


Reply via email to