This is an automated email from the ASF dual-hosted git repository.
dajac pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new e6669672ef1 KAFKA-14367; Add `OffsetCommit` to the new
`GroupCoordinator` interface (#12886)
e6669672ef1 is described below
commit e6669672ef11ee869e3e4e5ae04bad6c505d5342
Author: David Jacot <[email protected]>
AuthorDate: Thu Jan 12 18:05:49 2023 +0100
KAFKA-14367; Add `OffsetCommit` to the new `GroupCoordinator` interface
(#12886)
This patch adds `OffsetCommit` to the new `GroupCoordinator` interface and
updates `KafkaApis` to use it.
Reviewers: Omnia G H Ibrahim <[email protected]>, Jeff Kim
<[email protected]>, Justine Olshan <[email protected]>
---
.../kafka/common/requests/OffsetCommitRequest.java | 5 +
.../common/requests/OffsetCommitResponse.java | 80 +++++++
.../kafka/server/builders/KafkaApisBuilder.java | 2 +-
.../group/GroupCoordinatorAdapter.scala | 87 +++++++-
.../src/main/scala/kafka/server/BrokerServer.scala | 2 +-
core/src/main/scala/kafka/server/KafkaApis.scala | 239 +++++++++++----------
core/src/main/scala/kafka/server/KafkaServer.scala | 2 +-
.../group/GroupCoordinatorAdapterTest.scala | 94 +++++++-
.../scala/unit/kafka/server/KafkaApisTest.scala | 233 ++++++++++++++++++++
.../kafka/coordinator/group/GroupCoordinator.java | 17 ++
10 files changed, 627 insertions(+), 134 deletions(-)
diff --git
a/clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitRequest.java
b/clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitRequest.java
index 9869da5d254..8fc884e6614 100644
---
a/clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitRequest.java
+++
b/clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitRequest.java
@@ -118,6 +118,11 @@ public class OffsetCommitRequest extends AbstractRequest {
.setThrottleTimeMs(throttleTimeMs));
}
+ @Override
+ public OffsetCommitResponse getErrorResponse(Throwable e) {
+ return getErrorResponse(AbstractResponse.DEFAULT_THROTTLE_TIME, e);
+ }
+
public static OffsetCommitRequest parse(ByteBuffer buffer, short version) {
return new OffsetCommitRequest(new OffsetCommitRequestData(new
ByteBufferAccessor(buffer), version), version);
}
diff --git
a/clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitResponse.java
b/clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitResponse.java
index 713b68974a1..8c0bb9c182d 100644
---
a/clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitResponse.java
+++
b/clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitResponse.java
@@ -27,7 +27,9 @@ import org.apache.kafka.common.protocol.Errors;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.HashMap;
+import java.util.List;
import java.util.Map;
+import java.util.function.Function;
/**
* Possible error codes:
@@ -116,4 +118,82 @@ public class OffsetCommitResponse extends AbstractResponse
{
public boolean shouldClientThrottle(short version) {
return version >= 4;
}
+
+ public static class Builder {
+ OffsetCommitResponseData data = new OffsetCommitResponseData();
+ HashMap<String, OffsetCommitResponseTopic> byTopicName = new
HashMap<>();
+
+ private OffsetCommitResponseTopic getOrCreateTopic(
+ String topicName
+ ) {
+ OffsetCommitResponseTopic topic = byTopicName.get(topicName);
+ if (topic == null) {
+ topic = new OffsetCommitResponseTopic().setName(topicName);
+ data.topics().add(topic);
+ byTopicName.put(topicName, topic);
+ }
+ return topic;
+ }
+
+ public Builder addPartition(
+ String topicName,
+ int partitionIndex,
+ Errors error
+ ) {
+ final OffsetCommitResponseTopic topicResponse =
getOrCreateTopic(topicName);
+
+ topicResponse.partitions().add(new OffsetCommitResponsePartition()
+ .setPartitionIndex(partitionIndex)
+ .setErrorCode(error.code()));
+
+ return this;
+ }
+
+ public <P> Builder addPartitions(
+ String topicName,
+ List<P> partitions,
+ Function<P, Integer> partitionIndex,
+ Errors error
+ ) {
+ final OffsetCommitResponseTopic topicResponse =
getOrCreateTopic(topicName);
+
+ partitions.forEach(partition -> {
+ topicResponse.partitions().add(new
OffsetCommitResponsePartition()
+ .setPartitionIndex(partitionIndex.apply(partition))
+ .setErrorCode(error.code()));
+ });
+
+ return this;
+ }
+
+ public Builder merge(
+ OffsetCommitResponseData newData
+ ) {
+ if (data.topics().isEmpty()) {
+ // If the current data is empty, we can discard it and use the new data.
+ data = newData;
+ } else {
+ // Otherwise, we have to merge them together.
+ newData.topics().forEach(newTopic -> {
+ OffsetCommitResponseTopic existingTopic =
byTopicName.get(newTopic.name());
+ if (existingTopic == null) {
+ // If no topic exists, we can directly copy the new topic data.
+ data.topics().add(newTopic);
+ byTopicName.put(newTopic.name(), newTopic);
+ } else {
+ // Otherwise, we add the partitions to the existing one. Note we
+ // expect non-overlapping partitions here as we don't verify
+ // if the partition is already in the list before adding it.
+
existingTopic.partitions().addAll(newTopic.partitions());
+ }
+ });
+ }
+
+ return this;
+ }
+
+ public OffsetCommitResponse build() {
+ return new OffsetCommitResponse(data);
+ }
+ }
}
diff --git a/core/src/main/java/kafka/server/builders/KafkaApisBuilder.java
b/core/src/main/java/kafka/server/builders/KafkaApisBuilder.java
index 95c18a6f3e3..18cd42c77cb 100644
--- a/core/src/main/java/kafka/server/builders/KafkaApisBuilder.java
+++ b/core/src/main/java/kafka/server/builders/KafkaApisBuilder.java
@@ -179,7 +179,7 @@ public class KafkaApisBuilder {
metadataSupport,
replicaManager,
groupCoordinator,
- new GroupCoordinatorAdapter(groupCoordinator),
+ new GroupCoordinatorAdapter(groupCoordinator,
time),
txnCoordinator,
autoTopicCreationManager,
brokerId,
diff --git
a/core/src/main/scala/kafka/coordinator/group/GroupCoordinatorAdapter.scala
b/core/src/main/scala/kafka/coordinator/group/GroupCoordinatorAdapter.scala
index 29242a8774a..1a97079ce39 100644
--- a/core/src/main/scala/kafka/coordinator/group/GroupCoordinatorAdapter.scala
+++ b/core/src/main/scala/kafka/coordinator/group/GroupCoordinatorAdapter.scala
@@ -16,15 +16,18 @@
*/
package kafka.coordinator.group
+import kafka.common.OffsetAndMetadata
import kafka.server.RequestLocal
import kafka.utils.Implicits.MapExtensionMethods
import org.apache.kafka.common.TopicPartition
-import org.apache.kafka.common.message.{DeleteGroupsResponseData,
DescribeGroupsResponseData, HeartbeatRequestData, HeartbeatResponseData,
JoinGroupRequestData, JoinGroupResponseData, LeaveGroupRequestData,
LeaveGroupResponseData, ListGroupsRequestData, ListGroupsResponseData,
OffsetFetchRequestData, OffsetFetchResponseData, SyncGroupRequestData,
SyncGroupResponseData}
+import org.apache.kafka.common.message.{DeleteGroupsResponseData,
DescribeGroupsResponseData, HeartbeatRequestData, HeartbeatResponseData,
JoinGroupRequestData, JoinGroupResponseData, LeaveGroupRequestData,
LeaveGroupResponseData, ListGroupsRequestData, ListGroupsResponseData,
OffsetCommitRequestData, OffsetCommitResponseData, OffsetFetchRequestData,
OffsetFetchResponseData, SyncGroupRequestData, SyncGroupResponseData}
import org.apache.kafka.common.protocol.Errors
-import org.apache.kafka.common.requests.RequestContext
-import org.apache.kafka.common.utils.BufferSupplier
+import org.apache.kafka.common.record.RecordBatch
+import org.apache.kafka.common.requests.{OffsetCommitRequest, RequestContext}
+import org.apache.kafka.common.utils.{BufferSupplier, Time}
import java.util
+import java.util.Optional
import java.util.concurrent.CompletableFuture
import scala.collection.{immutable, mutable}
import scala.jdk.CollectionConverters._
@@ -34,7 +37,8 @@ import scala.jdk.CollectionConverters._
* that exposes the new org.apache.kafka.coordinator.group.GroupCoordinator interface.
*/
class GroupCoordinatorAdapter(
- val coordinator: GroupCoordinator
+ private val coordinator: GroupCoordinator,
+ private val time: Time
) extends org.apache.kafka.coordinator.group.GroupCoordinator {
override def joinGroup(
@@ -312,4 +316,79 @@ class GroupCoordinatorAdapter(
future
}
+
+ override def commitOffsets(
+ context: RequestContext,
+ request: OffsetCommitRequestData,
+ bufferSupplier: BufferSupplier
+ ): CompletableFuture[OffsetCommitResponseData] = {
+ val currentTimeMs = time.milliseconds
+ val future = new CompletableFuture[OffsetCommitResponseData]()
+
+ def callback(commitStatus: Map[TopicPartition, Errors]): Unit = {
+ val response = new OffsetCommitResponseData()
+ val byTopics = new mutable.HashMap[String,
OffsetCommitResponseData.OffsetCommitResponseTopic]()
+
+ commitStatus.forKeyValue { (tp, error) =>
+ val topic = byTopics.get(tp.topic) match {
+ case Some(existingTopic) =>
+ existingTopic
+ case None =>
+ val newTopic = new
OffsetCommitResponseData.OffsetCommitResponseTopic().setName(tp.topic)
+ byTopics += tp.topic -> newTopic
+ response.topics.add(newTopic)
+ newTopic
+ }
+
+ topic.partitions.add(new
OffsetCommitResponseData.OffsetCommitResponsePartition()
+ .setPartitionIndex(tp.partition)
+ .setErrorCode(error.code))
+ }
+
+ future.complete(response)
+ }
+
+ // "default" expiration timestamp is defined as now + retention. The retention may be overridden
+ // in versions from v2 to v4. Otherwise, the retention defined on the broker is used. If an explicit
+ // commit timestamp is provided (v1 only), the expiration timestamp is computed based on that.
+ val expireTimeMs = request.retentionTimeMs match {
+ case OffsetCommitRequest.DEFAULT_RETENTION_TIME => None
+ case retentionTimeMs => Some(currentTimeMs + retentionTimeMs)
+ }
+
+ val partitions = new mutable.HashMap[TopicPartition, OffsetAndMetadata]()
+ request.topics.forEach { topic =>
+ topic.partitions.forEach { partition =>
+ val tp = new TopicPartition(topic.name, partition.partitionIndex)
+ partitions += tp -> new OffsetAndMetadata(
+ offset = partition.committedOffset,
+ leaderEpoch = partition.committedLeaderEpoch match {
+ case RecordBatch.NO_PARTITION_LEADER_EPOCH =>
Optional.empty[Integer]
+ case committedLeaderEpoch =>
Optional.of[Integer](committedLeaderEpoch)
+ },
+ metadata = partition.committedMetadata match {
+ case null => OffsetAndMetadata.NoMetadata
+ case metadata => metadata
+ },
+ commitTimestamp = partition.commitTimestamp match {
+ case OffsetCommitRequest.DEFAULT_TIMESTAMP => currentTimeMs
+ case customTimestamp => customTimestamp
+ },
+ expireTimestamp = expireTimeMs
+ )
+ }
+ }
+
+ coordinator.handleCommitOffsets(
+ request.groupId,
+ request.memberId,
+ Option(request.groupInstanceId),
+ request.generationId,
+ partitions.toMap,
+ callback,
+ RequestLocal(bufferSupplier)
+ )
+
+ future
+ }
}
diff --git a/core/src/main/scala/kafka/server/BrokerServer.scala
b/core/src/main/scala/kafka/server/BrokerServer.scala
index 83cdb046863..c72f297b2fc 100644
--- a/core/src/main/scala/kafka/server/BrokerServer.scala
+++ b/core/src/main/scala/kafka/server/BrokerServer.scala
@@ -409,7 +409,7 @@ class BrokerServer(
metadataSupport = raftSupport,
replicaManager = replicaManager,
groupCoordinator = groupCoordinator,
- newGroupCoordinator = new GroupCoordinatorAdapter(groupCoordinator),
+ newGroupCoordinator = new GroupCoordinatorAdapter(groupCoordinator,
time),
txnCoordinator = transactionCoordinator,
autoTopicCreationManager = autoTopicCreationManager,
brokerId = config.nodeId,
diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala
b/core/src/main/scala/kafka/server/KafkaApis.scala
index 20cfe766eb2..42768fa98f9 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -187,7 +187,7 @@ class KafkaApis(val requestChannel: RequestChannel,
case ApiKeys.STOP_REPLICA => handleStopReplicaRequest(request)
case ApiKeys.UPDATE_METADATA => handleUpdateMetadataRequest(request,
requestLocal)
case ApiKeys.CONTROLLED_SHUTDOWN =>
handleControlledShutdownRequest(request)
- case ApiKeys.OFFSET_COMMIT => handleOffsetCommitRequest(request,
requestLocal)
+ case ApiKeys.OFFSET_COMMIT => handleOffsetCommitRequest(request,
requestLocal).exceptionally(handleError)
case ApiKeys.OFFSET_FETCH =>
handleOffsetFetchRequest(request).exceptionally(handleError)
case ApiKeys.FIND_COORDINATOR => handleFindCoordinatorRequest(request)
case ApiKeys.JOIN_GROUP => handleJoinGroupRequest(request,
requestLocal).exceptionally(handleError)
@@ -410,137 +410,144 @@ class KafkaApis(val requestChannel: RequestChannel,
/**
* Handle an offset commit request
*/
- def handleOffsetCommitRequest(request: RequestChannel.Request, requestLocal:
RequestLocal): Unit = {
- val header = request.header
+ def handleOffsetCommitRequest(
+ request: RequestChannel.Request,
+ requestLocal: RequestLocal
+ ): CompletableFuture[Unit] = {
val offsetCommitRequest = request.body[OffsetCommitRequest]
- val unauthorizedTopicErrors = mutable.Map[TopicPartition, Errors]()
- val nonExistingTopicErrors = mutable.Map[TopicPartition, Errors]()
- // the callback for sending an offset commit response
- def sendResponseCallback(commitStatus: Map[TopicPartition, Errors]): Unit
= {
- val combinedCommitStatus = commitStatus ++ unauthorizedTopicErrors ++
nonExistingTopicErrors
- if (isDebugEnabled)
- combinedCommitStatus.forKeyValue { (topicPartition, error) =>
- if (error != Errors.NONE) {
- debug(s"Offset commit request with correlation id
${header.correlationId} from client ${header.clientId} " +
- s"on partition $topicPartition failed due to
${error.exceptionName}")
- }
- }
- requestHelper.sendResponseMaybeThrottle(request, requestThrottleMs =>
- new OffsetCommitResponse(requestThrottleMs,
combinedCommitStatus.asJava))
- }
-
- // reject the request if not authorized to the group
+ // Reject the request if not authorized to the group
if (!authHelper.authorize(request.context, READ, GROUP,
offsetCommitRequest.data.groupId)) {
- val error = Errors.GROUP_AUTHORIZATION_FAILED
- val responseTopicList = OffsetCommitRequest.getErrorResponseTopics(
- offsetCommitRequest.data.topics,
- error)
-
- requestHelper.sendResponseMaybeThrottle(request, requestThrottleMs =>
new OffsetCommitResponse(
- new OffsetCommitResponseData()
- .setTopics(responseTopicList)
- .setThrottleTimeMs(requestThrottleMs)
- ))
+ requestHelper.sendMaybeThrottle(request,
offsetCommitRequest.getErrorResponse(Errors.GROUP_AUTHORIZATION_FAILED.exception))
+ CompletableFuture.completedFuture[Unit](())
} else if (offsetCommitRequest.data.groupInstanceId != null &&
config.interBrokerProtocolVersion.isLessThan(IBP_2_3_IV0)) {
// Only enable static membership when IBP >= 2.3, because it is not safe for the broker to use the static member logic
// until we are sure that all brokers support it. If static group being loaded by an older coordinator, it will discard
// the group.instance.id field, so static members could accidentally become "dynamic", which leads to wrong states.
- val errorMap = new mutable.HashMap[TopicPartition, Errors]
- for (topicData <- offsetCommitRequest.data.topics.asScala) {
- for (partitionData <- topicData.partitions.asScala) {
- val topicPartition = new TopicPartition(topicData.name,
partitionData.partitionIndex)
- errorMap += topicPartition -> Errors.UNSUPPORTED_VERSION
- }
- }
- sendResponseCallback(errorMap.toMap)
+ requestHelper.sendMaybeThrottle(request,
offsetCommitRequest.getErrorResponse(Errors.UNSUPPORTED_VERSION.exception))
+ CompletableFuture.completedFuture[Unit](())
} else {
- val authorizedTopicRequestInfoBldr =
immutable.Map.newBuilder[TopicPartition,
OffsetCommitRequestData.OffsetCommitRequestPartition]
+ val authorizedTopics = authHelper.filterByAuthorized(
+ request.context,
+ READ,
+ TOPIC,
+ offsetCommitRequest.data.topics.asScala
+ )(_.name)
+
+ val responseBuilder = new OffsetCommitResponse.Builder()
+ val authorizedTopicsRequest = new
mutable.ArrayBuffer[OffsetCommitRequestData.OffsetCommitRequestTopic]()
+ offsetCommitRequest.data.topics.forEach { topic =>
+ if (!authorizedTopics.contains(topic.name)) {
+ // If the topic is not authorized, we add the topic and all its partitions
+ // to the response with TOPIC_AUTHORIZATION_FAILED.
+
responseBuilder.addPartitions[OffsetCommitRequestData.OffsetCommitRequestPartition](
+ topic.name, topic.partitions, _.partitionIndex,
Errors.TOPIC_AUTHORIZATION_FAILED)
+ } else if (!metadataCache.contains(topic.name)) {
+ // If the topic is unknown, we add the topic and all its partitions
+ // to the response with UNKNOWN_TOPIC_OR_PARTITION.
+
responseBuilder.addPartitions[OffsetCommitRequestData.OffsetCommitRequestPartition](
+ topic.name, topic.partitions, _.partitionIndex,
Errors.UNKNOWN_TOPIC_OR_PARTITION)
+ } else {
+ // Otherwise, we check all partitions to ensure that they all exist.
+ val topicWithValidPartitions = new
OffsetCommitRequestData.OffsetCommitRequestTopic().setName(topic.name)
- val topics = offsetCommitRequest.data.topics.asScala
- val authorizedTopics = authHelper.filterByAuthorized(request.context,
READ, TOPIC, topics)(_.name)
- for (topicData <- topics) {
- for (partitionData <- topicData.partitions.asScala) {
- val topicPartition = new TopicPartition(topicData.name,
partitionData.partitionIndex)
- if (!authorizedTopics.contains(topicData.name))
- unauthorizedTopicErrors += (topicPartition ->
Errors.TOPIC_AUTHORIZATION_FAILED)
- else if (!metadataCache.contains(topicPartition))
- nonExistingTopicErrors += (topicPartition ->
Errors.UNKNOWN_TOPIC_OR_PARTITION)
- else
- authorizedTopicRequestInfoBldr += (topicPartition -> partitionData)
+ topic.partitions.forEach { partition =>
+ if (metadataCache.getPartitionInfo(topic.name,
partition.partitionIndex).nonEmpty) {
+ topicWithValidPartitions.partitions.add(partition)
+ } else {
+ responseBuilder.addPartition(topic.name,
partition.partitionIndex, Errors.UNKNOWN_TOPIC_OR_PARTITION)
+ }
+ }
+
+ if (!topicWithValidPartitions.partitions.isEmpty) {
+ authorizedTopicsRequest += topicWithValidPartitions
+ }
}
}
- val authorizedTopicRequestInfo = authorizedTopicRequestInfoBldr.result()
-
- if (authorizedTopicRequestInfo.isEmpty)
- sendResponseCallback(Map.empty)
- else if (header.apiVersion == 0) {
- // for version 0 always store offsets to ZK
- val zkSupport =
metadataSupport.requireZkOrThrow(KafkaApis.unsupported("Version 0 offset commit
requests"))
- val responseInfo = authorizedTopicRequestInfo.map {
- case (topicPartition, partitionData) =>
- try {
- if (partitionData.committedMetadata() != null
- && partitionData.committedMetadata().length >
config.offsetMetadataMaxSize)
- (topicPartition, Errors.OFFSET_METADATA_TOO_LARGE)
- else {
- zkSupport.zkClient.setOrCreateConsumerOffset(
- offsetCommitRequest.data.groupId,
- topicPartition,
- partitionData.committedOffset)
- (topicPartition, Errors.NONE)
- }
- } catch {
- case e: Throwable => (topicPartition, Errors.forException(e))
- }
- }
- sendResponseCallback(responseInfo)
+ if (authorizedTopicsRequest.isEmpty) {
+ requestHelper.sendMaybeThrottle(request, responseBuilder.build())
+ CompletableFuture.completedFuture(())
+ } else if (request.header.apiVersion == 0) {
+ // For version 0, always store offsets in ZK.
+ commitOffsetsToZookeeper(
+ request,
+ offsetCommitRequest,
+ authorizedTopicsRequest,
+ responseBuilder
+ )
} else {
- // for version 1 and beyond store offsets in offset manager
-
- // "default" expiration timestamp is now + retention (and retention may be overridden if v2)
- // expire timestamp is computed differently for v1 and v2.
- // - If v1 and no explicit commit timestamp is provided we treat it the same as v5.
- // - If v1 and explicit retention time is provided we calculate expiration timestamp based on that
- // - If v2/v3/v4 (no explicit commit timestamp) we treat it the same as v5.
- // - For v5 and beyond there is no per partition expiration timestamp, so this field is no longer in effect
- val currentTimestamp = time.milliseconds
- val partitionData = authorizedTopicRequestInfo.map { case (k,
partitionData) =>
- val metadata = if (partitionData.committedMetadata == null)
- OffsetAndMetadata.NoMetadata
- else
- partitionData.committedMetadata
+ // For version > 0, store offsets in Coordinator.
+ commitOffsetsToCoordinator(
+ request,
+ offsetCommitRequest,
+ authorizedTopicsRequest,
+ responseBuilder,
+ requestLocal
+ )
+ }
+ }
+ }
- val leaderEpochOpt = if (partitionData.committedLeaderEpoch ==
RecordBatch.NO_PARTITION_LEADER_EPOCH)
- Optional.empty[Integer]
- else
- Optional.of[Integer](partitionData.committedLeaderEpoch)
-
- k -> new OffsetAndMetadata(
- offset = partitionData.committedOffset,
- leaderEpoch = leaderEpochOpt,
- metadata = metadata,
- commitTimestamp = partitionData.commitTimestamp match {
- case OffsetCommitRequest.DEFAULT_TIMESTAMP => currentTimestamp
- case customTimestamp => customTimestamp
- },
- expireTimestamp = offsetCommitRequest.data.retentionTimeMs match {
- case OffsetCommitRequest.DEFAULT_RETENTION_TIME => None
- case retentionTime => Some(currentTimestamp + retentionTime)
- }
- )
+ private def commitOffsetsToZookeeper(
+ request: RequestChannel.Request,
+ offsetCommitRequest: OffsetCommitRequest,
+ authorizedTopicsRequest:
mutable.ArrayBuffer[OffsetCommitRequestData.OffsetCommitRequestTopic],
+ responseBuilder: OffsetCommitResponse.Builder
+ ): CompletableFuture[Unit] = {
+ val zkSupport = metadataSupport.requireZkOrThrow(
+ KafkaApis.unsupported("Version 0 offset commit requests"))
+
+ authorizedTopicsRequest.foreach { topic =>
+ topic.partitions.forEach { partition =>
+ val error = try {
+ if (partition.committedMetadata != null &&
partition.committedMetadata.length > config.offsetMetadataMaxSize) {
+ Errors.OFFSET_METADATA_TOO_LARGE
+ } else {
+ zkSupport.zkClient.setOrCreateConsumerOffset(
+ offsetCommitRequest.data.groupId,
+ new TopicPartition(topic.name, partition.partitionIndex),
+ partition.committedOffset
+ )
+ Errors.NONE
+ }
+ } catch {
+ case e: Throwable =>
+ Errors.forException(e)
}
- // call coordinator to handle commit offset
- groupCoordinator.handleCommitOffsets(
- offsetCommitRequest.data.groupId,
- offsetCommitRequest.data.memberId,
- Option(offsetCommitRequest.data.groupInstanceId),
- offsetCommitRequest.data.generationId,
- partitionData,
- sendResponseCallback,
- requestLocal)
+ responseBuilder.addPartition(topic.name, partition.partitionIndex,
error)
+ }
+ }
+
+ requestHelper.sendMaybeThrottle(request, responseBuilder.build())
+ CompletableFuture.completedFuture[Unit](())
+ }
+
+ private def commitOffsetsToCoordinator(
+ request: RequestChannel.Request,
+ offsetCommitRequest: OffsetCommitRequest,
+ authorizedTopicsRequest:
mutable.ArrayBuffer[OffsetCommitRequestData.OffsetCommitRequestTopic],
+ responseBuilder: OffsetCommitResponse.Builder,
+ requestLocal: RequestLocal
+ ): CompletableFuture[Unit] = {
+ val offsetCommitRequestData = new OffsetCommitRequestData()
+ .setGroupId(offsetCommitRequest.data.groupId)
+ .setMemberId(offsetCommitRequest.data.memberId)
+ .setGenerationId(offsetCommitRequest.data.generationId)
+ .setRetentionTimeMs(offsetCommitRequest.data.retentionTimeMs)
+ .setGroupInstanceId(offsetCommitRequest.data.groupInstanceId)
+ .setTopics(authorizedTopicsRequest.asJava)
+
+ newGroupCoordinator.commitOffsets(
+ request.context,
+ offsetCommitRequestData,
+ requestLocal.bufferSupplier
+ ).handle[Unit] { (results, exception) =>
+ if (exception != null) {
+ requestHelper.sendMaybeThrottle(request,
offsetCommitRequest.getErrorResponse(exception))
+ } else {
+ requestHelper.sendMaybeThrottle(request,
responseBuilder.merge(results).build())
}
}
}
diff --git a/core/src/main/scala/kafka/server/KafkaServer.scala
b/core/src/main/scala/kafka/server/KafkaServer.scala
index 87b9d48d46c..30a96537cc1 100755
--- a/core/src/main/scala/kafka/server/KafkaServer.scala
+++ b/core/src/main/scala/kafka/server/KafkaServer.scala
@@ -506,7 +506,7 @@ class KafkaServer(
metadataSupport = zkSupport,
replicaManager = replicaManager,
groupCoordinator = groupCoordinator,
- newGroupCoordinator = new GroupCoordinatorAdapter(groupCoordinator),
+ newGroupCoordinator = new GroupCoordinatorAdapter(groupCoordinator,
time),
txnCoordinator = transactionCoordinator,
autoTopicCreationManager = autoTopicCreationManager,
brokerId = config.brokerId,
diff --git
a/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorAdapterTest.scala
b/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorAdapterTest.scala
index 1fbdf333a96..9ecbdd54e06 100644
---
a/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorAdapterTest.scala
+++
b/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorAdapterTest.scala
@@ -16,17 +16,19 @@
*/
package kafka.coordinator.group
+import kafka.common.OffsetAndMetadata
import
kafka.coordinator.group.GroupCoordinatorConcurrencyTest.{JoinGroupCallback,
SyncGroupCallback}
import kafka.server.RequestLocal
+import kafka.utils.MockTime
import org.apache.kafka.common.TopicPartition
-import org.apache.kafka.common.message.{DeleteGroupsResponseData,
DescribeGroupsResponseData, HeartbeatRequestData, HeartbeatResponseData,
JoinGroupRequestData, JoinGroupResponseData, LeaveGroupRequestData,
LeaveGroupResponseData, ListGroupsRequestData, ListGroupsResponseData,
OffsetFetchRequestData, OffsetFetchResponseData, SyncGroupRequestData,
SyncGroupResponseData}
+import org.apache.kafka.common.message.{DeleteGroupsResponseData,
DescribeGroupsResponseData, HeartbeatRequestData, HeartbeatResponseData,
JoinGroupRequestData, JoinGroupResponseData, LeaveGroupRequestData,
LeaveGroupResponseData, ListGroupsRequestData, ListGroupsResponseData,
OffsetCommitRequestData, OffsetCommitResponseData, OffsetFetchRequestData,
OffsetFetchResponseData, SyncGroupRequestData, SyncGroupResponseData}
import
org.apache.kafka.common.message.JoinGroupRequestData.JoinGroupRequestProtocol
import
org.apache.kafka.common.message.JoinGroupResponseData.JoinGroupResponseMember
import org.apache.kafka.common.network.{ClientInformation, ListenerName}
import org.apache.kafka.common.protocol.{ApiKeys, Errors}
import org.apache.kafka.common.requests.{OffsetFetchResponse, RequestContext,
RequestHeader}
import org.apache.kafka.common.security.auth.{KafkaPrincipal, SecurityProtocol}
-import org.apache.kafka.common.utils.BufferSupplier
+import org.apache.kafka.common.utils.{BufferSupplier, Time}
import org.apache.kafka.common.utils.annotation.ApiKeyVersionsSource
import org.junit.jupiter.api.Assertions.{assertEquals, assertFalse, assertTrue}
import org.junit.jupiter.api.Test
@@ -60,7 +62,7 @@ class GroupCoordinatorAdapterTest {
@ApiKeyVersionsSource(apiKey = ApiKeys.JOIN_GROUP)
def testJoinGroup(version: Short): Unit = {
val groupCoordinator = mock(classOf[GroupCoordinator])
- val adapter = new GroupCoordinatorAdapter(groupCoordinator)
+ val adapter = new GroupCoordinatorAdapter(groupCoordinator, Time.SYSTEM)
val ctx = makeContext(ApiKeys.JOIN_GROUP, version)
val request = new JoinGroupRequestData()
@@ -148,7 +150,7 @@ class GroupCoordinatorAdapterTest {
@ApiKeyVersionsSource(apiKey = ApiKeys.SYNC_GROUP)
def testSyncGroup(version: Short): Unit = {
val groupCoordinator = mock(classOf[GroupCoordinator])
- val adapter = new GroupCoordinatorAdapter(groupCoordinator)
+ val adapter = new GroupCoordinatorAdapter(groupCoordinator, Time.SYSTEM)
val ctx = makeContext(ApiKeys.SYNC_GROUP, version)
val data = new SyncGroupRequestData()
@@ -215,7 +217,7 @@ class GroupCoordinatorAdapterTest {
@Test
def testHeartbeat(): Unit = {
val groupCoordinator = mock(classOf[GroupCoordinator])
- val adapter = new GroupCoordinatorAdapter(groupCoordinator)
+ val adapter = new GroupCoordinatorAdapter(groupCoordinator, Time.SYSTEM)
val ctx = makeContext(ApiKeys.HEARTBEAT, ApiKeys.HEARTBEAT.latestVersion)
val data = new HeartbeatRequestData()
@@ -246,7 +248,7 @@ class GroupCoordinatorAdapterTest {
def testLeaveGroup(): Unit = {
val groupCoordinator = mock(classOf[GroupCoordinator])
- val adapter = new GroupCoordinatorAdapter(groupCoordinator)
+ val adapter = new GroupCoordinatorAdapter(groupCoordinator, Time.SYSTEM)
val ctx = makeContext(ApiKeys.LEAVE_GROUP,
ApiKeys.LEAVE_GROUP.latestVersion)
val data = new LeaveGroupRequestData()
@@ -315,7 +317,7 @@ class GroupCoordinatorAdapterTest {
expectedStatesFilter: Set[String]
): Unit = {
val groupCoordinator = mock(classOf[GroupCoordinator])
- val adapter = new GroupCoordinatorAdapter(groupCoordinator)
+ val adapter = new GroupCoordinatorAdapter(groupCoordinator, Time.SYSTEM)
val ctx = makeContext(ApiKeys.LIST_GROUPS,
ApiKeys.LIST_GROUPS.latestVersion)
val data = new ListGroupsRequestData()
@@ -350,7 +352,7 @@ class GroupCoordinatorAdapterTest {
@Test
def testDescribeGroup(): Unit = {
val groupCoordinator = mock(classOf[GroupCoordinator])
- val adapter = new GroupCoordinatorAdapter(groupCoordinator)
+ val adapter = new GroupCoordinatorAdapter(groupCoordinator, Time.SYSTEM)
val groupId1 = "group-1"
val groupId2 = "group-2"
@@ -407,7 +409,7 @@ class GroupCoordinatorAdapterTest {
@Test
def testDeleteGroups(): Unit = {
val groupCoordinator = mock(classOf[GroupCoordinator])
- val adapter = new GroupCoordinatorAdapter(groupCoordinator)
+ val adapter = new GroupCoordinatorAdapter(groupCoordinator, Time.SYSTEM)
val ctx = makeContext(ApiKeys.DELETE_GROUPS,
ApiKeys.DELETE_GROUPS.latestVersion)
val groupIds = List("group-1", "group-2", "group-3")
@@ -446,7 +448,7 @@ class GroupCoordinatorAdapterTest {
val bar1 = new TopicPartition("bar", 1)
val groupCoordinator = mock(classOf[GroupCoordinator])
- val adapter = new GroupCoordinatorAdapter(groupCoordinator)
+ val adapter = new GroupCoordinatorAdapter(groupCoordinator, Time.SYSTEM)
when(groupCoordinator.handleFetchOffsets(
"group",
@@ -527,7 +529,7 @@ class GroupCoordinatorAdapterTest {
val bar1 = new TopicPartition("bar", 1)
val groupCoordinator = mock(classOf[GroupCoordinator])
- val adapter = new GroupCoordinatorAdapter(groupCoordinator)
+ val adapter = new GroupCoordinatorAdapter(groupCoordinator, Time.SYSTEM)
when(groupCoordinator.handleFetchOffsets(
"group",
@@ -608,4 +610,74 @@ class GroupCoordinatorAdapterTest {
future.get().asScala.toList.sortWith(_.name > _.name)
)
}
+
+ @ParameterizedTest
+ @ApiKeyVersionsSource(apiKey = ApiKeys.OFFSET_COMMIT)
+ def testCommitOffsets(version: Short): Unit = {
+ val groupCoordinator = mock(classOf[GroupCoordinator])
+ val time = new MockTime()
+ val adapter = new GroupCoordinatorAdapter(groupCoordinator, time)
+ val now = time.milliseconds()
+
+ val ctx = makeContext(ApiKeys.OFFSET_COMMIT, version)
+ val data = new OffsetCommitRequestData()
+ .setGroupId("group")
+ .setMemberId("member")
+ .setGenerationId(10)
+ .setRetentionTimeMs(1000)
+ .setTopics(List(
+ new OffsetCommitRequestData.OffsetCommitRequestTopic()
+ .setName("foo")
+ .setPartitions(List(
+ new OffsetCommitRequestData.OffsetCommitRequestPartition()
+ .setPartitionIndex(0)
+ .setCommittedOffset(100)
+ .setCommitTimestamp(now)
+ .setCommittedLeaderEpoch(1)
+ ).asJava)
+ ).asJava)
+ val bufferSupplier = BufferSupplier.create()
+
+ val future = adapter.commitOffsets(ctx, data, bufferSupplier)
+ assertFalse(future.isDone)
+
+ val capturedCallback: ArgumentCaptor[Map[TopicPartition, Errors] => Unit] =
+ ArgumentCaptor.forClass(classOf[Map[TopicPartition, Errors] => Unit])
+
+ verify(groupCoordinator).handleCommitOffsets(
+ ArgumentMatchers.eq(data.groupId),
+ ArgumentMatchers.eq(data.memberId),
+ ArgumentMatchers.eq(None),
+ ArgumentMatchers.eq(data.generationId),
+ ArgumentMatchers.eq(Map(
+ new TopicPartition("foo", 0) -> new OffsetAndMetadata(
+ offset = 100,
+ leaderEpoch = Optional.of[Integer](1),
+ metadata = "",
+ commitTimestamp = now,
+ expireTimestamp = Some(now + 1000L)
+ )
+ )),
+ capturedCallback.capture(),
+ ArgumentMatchers.eq(RequestLocal(bufferSupplier))
+ )
+
+ capturedCallback.getValue.apply(Map(
+ new TopicPartition("foo", 0) -> Errors.NONE
+ ))
+
+ val expectedResponseData = new OffsetCommitResponseData()
+ .setTopics(List(
+ new OffsetCommitResponseData.OffsetCommitResponseTopic()
+ .setName("foo")
+ .setPartitions(List(
+ new OffsetCommitResponseData.OffsetCommitResponsePartition()
+ .setPartitionIndex(0)
+ .setErrorCode(Errors.NONE.code)
+ ).asJava)
+ ).asJava)
+
+ assertTrue(future.isDone)
+ assertEquals(expectedResponseData, future.get())
+ }
}
diff --git a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
index 27b2338a8e7..81c2c9ffbe2 100644
--- a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
+++ b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
@@ -1264,6 +1264,239 @@ class KafkaApisTest {
)
}
+ // Happy path: the OffsetCommit request targets foo-0, which is registered in the
+ // metadata cache, so the unchanged request data is expected to be passed to
+ // newGroupCoordinator.commitOffsets and the data the future completes with is
+ // expected to come back as the response data.
+ @Test
+ def testHandleOffsetCommitRequest(): Unit = {
+ addTopicToMetadataCache("foo", numPartitions = 1)
+
+ val offsetCommitRequest = new OffsetCommitRequestData()
+ .setGroupId("group")
+ .setMemberId("member")
+ .setTopics(List(
+ new OffsetCommitRequestData.OffsetCommitRequestTopic()
+ .setName("foo")
+ .setPartitions(List(
+ new OffsetCommitRequestData.OffsetCommitRequestPartition()
+ .setPartitionIndex(0)
+ .setCommittedOffset(10)).asJava)).asJava)
+
+ val requestChannelRequest = buildRequest(new
OffsetCommitRequest.Builder(offsetCommitRequest).build())
+
+ // Stub the coordinator: a call with this context, request data and buffer
+ // supplier returns a future that the test completes by hand further down.
+ val future = new CompletableFuture[OffsetCommitResponseData]()
+ when(newGroupCoordinator.commitOffsets(
+ requestChannelRequest.context,
+ offsetCommitRequest,
+ RequestLocal.NoCaching.bufferSupplier
+ )).thenReturn(future)
+
+ createKafkaApis().handle(
+ requestChannelRequest,
+ RequestLocal.NoCaching
+ )
+
+ // This is the response returned by the group coordinator.
+ val offsetCommitResponse = new OffsetCommitResponseData()
+ .setTopics(List(
+ new OffsetCommitResponseData.OffsetCommitResponseTopic()
+ .setName("foo")
+ .setPartitions(List(
+ new OffsetCommitResponseData.OffsetCommitResponsePartition()
+ .setPartitionIndex(0)
+ .setErrorCode(Errors.NONE.code)).asJava)).asJava)
+
+ // The future is completed only after handle() has returned.
+ // NOTE(review): verifyNoThrottling is defined outside this hunk; presumably it
+ // returns the response that was sent for the request -- confirm.
+ future.complete(offsetCommitResponse)
+ val response =
verifyNoThrottling[OffsetCommitResponse](requestChannelRequest)
+ assertEquals(offsetCommitResponse, response.data)
+ }
+
+ // Failure path: the future returned by newGroupCoordinator.commitOffsets is
+ // completed exceptionally with the NOT_COORDINATOR exception. The expected
+ // response carries the NOT_COORDINATOR error code for the requested partition foo-0.
+ @Test
+ def testHandleOffsetCommitRequestFutureFailed(): Unit = {
+ addTopicToMetadataCache("foo", numPartitions = 1)
+
+ val offsetCommitRequest = new OffsetCommitRequestData()
+ .setGroupId("group")
+ .setMemberId("member")
+ .setTopics(List(
+ new OffsetCommitRequestData.OffsetCommitRequestTopic()
+ .setName("foo")
+ .setPartitions(List(
+ new OffsetCommitRequestData.OffsetCommitRequestPartition()
+ .setPartitionIndex(0)
+ .setCommittedOffset(10)).asJava)).asJava)
+
+ val requestChannelRequest = buildRequest(new
OffsetCommitRequest.Builder(offsetCommitRequest).build())
+
+ // Stub the coordinator with a future that the test fails by hand further down.
+ val future = new CompletableFuture[OffsetCommitResponseData]()
+ when(newGroupCoordinator.commitOffsets(
+ requestChannelRequest.context,
+ offsetCommitRequest,
+ RequestLocal.NoCaching.bufferSupplier
+ )).thenReturn(future)
+
+ createKafkaApis().handle(
+ requestChannelRequest,
+ RequestLocal.NoCaching
+ )
+
+ // This is the response expected once the future has failed: the partition
+ // from the request, with the error code of the failure.
+ val expectedOffsetCommitResponse = new OffsetCommitResponseData()
+ .setTopics(List(
+ new OffsetCommitResponseData.OffsetCommitResponseTopic()
+ .setName("foo")
+ .setPartitions(List(
+ new OffsetCommitResponseData.OffsetCommitResponsePartition()
+ .setPartitionIndex(0)
+ .setErrorCode(Errors.NOT_COORDINATOR.code)).asJava)).asJava)
+
+ // The future is failed only after handle() has returned.
+ future.completeExceptionally(Errors.NOT_COORDINATOR.exception)
+ val response =
verifyNoThrottling[OffsetCommitResponse](requestChannelRequest)
+ assertEquals(expectedOffsetCommitResponse, response.data)
+ }
+
+ // Validation path: the request mixes known partitions (foo-0, foo-1, bar-0, bar-1)
+ // with a partition beyond foo's partition count (foo-2) and a topic that is not in
+ // the metadata cache (zar). Only the known partitions are expected to be passed to
+ // newGroupCoordinator.commitOffsets; the others are expected in the final response
+ // with UNKNOWN_TOPIC_OR_PARTITION, placed ahead of the coordinator's results.
+ @Test
+ def testHandleOffsetCommitRequestTopicsAndPartitionsValidation(): Unit = {
+ addTopicToMetadataCache("foo", numPartitions = 2)
+ addTopicToMetadataCache("bar", numPartitions = 2)
+
+ val offsetCommitRequest = new OffsetCommitRequestData()
+ .setGroupId("group")
+ .setMemberId("member")
+ .setTopics(List(
+ // foo exists but only has 2 partitions.
+ new OffsetCommitRequestData.OffsetCommitRequestTopic()
+ .setName("foo")
+ .setPartitions(List(
+ new OffsetCommitRequestData.OffsetCommitRequestPartition()
+ .setPartitionIndex(0)
+ .setCommittedOffset(10),
+ new OffsetCommitRequestData.OffsetCommitRequestPartition()
+ .setPartitionIndex(1)
+ .setCommittedOffset(20),
+ new OffsetCommitRequestData.OffsetCommitRequestPartition()
+ .setPartitionIndex(2)
+ .setCommittedOffset(30)).asJava),
+ // bar exists.
+ new OffsetCommitRequestData.OffsetCommitRequestTopic()
+ .setName("bar")
+ .setPartitions(List(
+ new OffsetCommitRequestData.OffsetCommitRequestPartition()
+ .setPartitionIndex(0)
+ .setCommittedOffset(40),
+ new OffsetCommitRequestData.OffsetCommitRequestPartition()
+ .setPartitionIndex(1)
+ .setCommittedOffset(50)).asJava),
+ // zar does not exist.
+ new OffsetCommitRequestData.OffsetCommitRequestTopic()
+ .setName("zar")
+ .setPartitions(List(
+ new OffsetCommitRequestData.OffsetCommitRequestPartition()
+ .setPartitionIndex(0)
+ .setCommittedOffset(60),
+ new OffsetCommitRequestData.OffsetCommitRequestPartition()
+ .setPartitionIndex(1)
+ .setCommittedOffset(70)).asJava)).asJava)
+
+ val requestChannelRequest = buildRequest(new
OffsetCommitRequest.Builder(offsetCommitRequest).build())
+
+ // This is the request expected by the group coordinator.
+ val expectedOffsetCommitRequest = new OffsetCommitRequestData()
+ .setGroupId("group")
+ .setMemberId("member")
+ .setTopics(List(
+ // Only foo-0 and foo-1 remain; foo-2 is not expected to reach the coordinator.
+ new OffsetCommitRequestData.OffsetCommitRequestTopic()
+ .setName("foo")
+ .setPartitions(List(
+ new OffsetCommitRequestData.OffsetCommitRequestPartition()
+ .setPartitionIndex(0)
+ .setCommittedOffset(10),
+ new OffsetCommitRequestData.OffsetCommitRequestPartition()
+ .setPartitionIndex(1)
+ .setCommittedOffset(20)).asJava),
+ new OffsetCommitRequestData.OffsetCommitRequestTopic()
+ .setName("bar")
+ .setPartitions(List(
+ new OffsetCommitRequestData.OffsetCommitRequestPartition()
+ .setPartitionIndex(0)
+ .setCommittedOffset(40),
+ new OffsetCommitRequestData.OffsetCommitRequestPartition()
+ .setPartitionIndex(1)
+ .setCommittedOffset(50)).asJava)).asJava)
+
+ // The stub is keyed on the filtered request, not on the request that was sent.
+ val future = new CompletableFuture[OffsetCommitResponseData]()
+ when(newGroupCoordinator.commitOffsets(
+ requestChannelRequest.context,
+ expectedOffsetCommitRequest,
+ RequestLocal.NoCaching.bufferSupplier
+ )).thenReturn(future)
+
+ createKafkaApis().handle(
+ requestChannelRequest,
+ RequestLocal.NoCaching
+ )
+
+ // This is the response returned by the group coordinator.
+ val offsetCommitResponse = new OffsetCommitResponseData()
+ .setTopics(List(
+ new OffsetCommitResponseData.OffsetCommitResponseTopic()
+ .setName("foo")
+ .setPartitions(List(
+ new OffsetCommitResponseData.OffsetCommitResponsePartition()
+ .setPartitionIndex(0)
+ .setErrorCode(Errors.NONE.code),
+ new OffsetCommitResponseData.OffsetCommitResponsePartition()
+ .setPartitionIndex(1)
+ .setErrorCode(Errors.NONE.code)).asJava),
+ new OffsetCommitResponseData.OffsetCommitResponseTopic()
+ .setName("bar")
+ .setPartitions(List(
+ new OffsetCommitResponseData.OffsetCommitResponsePartition()
+ .setPartitionIndex(0)
+ .setErrorCode(Errors.NONE.code),
+ new OffsetCommitResponseData.OffsetCommitResponsePartition()
+ .setPartitionIndex(1)
+ .setErrorCode(Errors.NONE.code)).asJava)).asJava)
+
+ // This is the response expected for the request: the coordinator's results
+ // combined with the entries that failed the validation.
+ val expectedOffsetCommitResponse = new OffsetCommitResponseData()
+ .setTopics(List(
+ new OffsetCommitResponseData.OffsetCommitResponseTopic()
+ .setName("foo")
+ .setPartitions(List(
+ // foo-2 is first because partitions failing the validation
+ // are put in the response first.
+ new OffsetCommitResponseData.OffsetCommitResponsePartition()
+ .setPartitionIndex(2)
+ .setErrorCode(Errors.UNKNOWN_TOPIC_OR_PARTITION.code),
+ new OffsetCommitResponseData.OffsetCommitResponsePartition()
+ .setPartitionIndex(0)
+ .setErrorCode(Errors.NONE.code),
+ new OffsetCommitResponseData.OffsetCommitResponsePartition()
+ .setPartitionIndex(1)
+ .setErrorCode(Errors.NONE.code)).asJava),
+ // zar is before bar because topics failing the validation are
+ // put in the response first.
+ new OffsetCommitResponseData.OffsetCommitResponseTopic()
+ .setName("zar")
+ .setPartitions(List(
+ new OffsetCommitResponseData.OffsetCommitResponsePartition()
+ .setPartitionIndex(0)
+ .setErrorCode(Errors.UNKNOWN_TOPIC_OR_PARTITION.code),
+ new OffsetCommitResponseData.OffsetCommitResponsePartition()
+ .setPartitionIndex(1)
+ .setErrorCode(Errors.UNKNOWN_TOPIC_OR_PARTITION.code)).asJava),
+ new OffsetCommitResponseData.OffsetCommitResponseTopic()
+ .setName("bar")
+ .setPartitions(List(
+ new OffsetCommitResponseData.OffsetCommitResponsePartition()
+ .setPartitionIndex(0)
+ .setErrorCode(Errors.NONE.code),
+ new OffsetCommitResponseData.OffsetCommitResponsePartition()
+ .setPartitionIndex(1)
+ .setErrorCode(Errors.NONE.code)).asJava)).asJava)
+
+ // The future is completed only after handle() has returned.
+ future.complete(offsetCommitResponse)
+ val response =
verifyNoThrottling[OffsetCommitResponse](requestChannelRequest)
+ assertEquals(expectedOffsetCommitResponse, response.data)
+ }
+
@Test
def testOffsetCommitWithInvalidPartition(): Unit = {
val topic = "topic"
diff --git
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinator.java
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinator.java
index 97bd1902460..30d48410822 100644
---
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinator.java
+++
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinator.java
@@ -22,6 +22,8 @@ import org.apache.kafka.common.message.HeartbeatRequestData;
import org.apache.kafka.common.message.HeartbeatResponseData;
import org.apache.kafka.common.message.JoinGroupRequestData;
import org.apache.kafka.common.message.JoinGroupResponseData;
+import org.apache.kafka.common.message.OffsetCommitRequestData;
+import org.apache.kafka.common.message.OffsetCommitResponseData;
import org.apache.kafka.common.message.LeaveGroupRequestData;
import org.apache.kafka.common.message.LeaveGroupResponseData;
import org.apache.kafka.common.message.ListGroupsRequestData;
@@ -164,5 +166,20 @@ public interface GroupCoordinator {
String groupId,
boolean requireStable
);
+
+ /**
+ * Commit offsets for a given Group.
+ *
+ * @param context The request context.
+ * @param request The OffsetCommitRequest data.
+ * @param bufferSupplier The buffer supplier tied to the request thread.
+ *
+ * @return A future yielding the response or an exception.
+ */
+ CompletableFuture<OffsetCommitResponseData> commitOffsets(
+ RequestContext context,
+ OffsetCommitRequestData request,
+ BufferSupplier bufferSupplier
+ );
}