This is an automated email from the ASF dual-hosted git repository.

dajac pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 522c2864cd5 KAFKA-14505; [2/N] Implement TxnOffsetCommit API (#14845)
522c2864cd5 is described below

commit 522c2864cd53e1a61dbc296a8cda062abf1002c5
Author: David Jacot <[email protected]>
AuthorDate: Thu Dec 7 11:51:22 2023 +0100

    KAFKA-14505; [2/N] Implement TxnOffsetCommit API (#14845)
    
    This patch implements the TxnOffsetCommit API. When a transactional offset 
commit is received, it is stored in the pending transactional offsets structure 
and waits there until the transaction is committed or aborted. Note that the 
handling of the transaction completion is not implemented in this patch.
    
    Reviewers: Justine Olshan <[email protected]>
---
 checkstyle/suppressions.xml                        |   2 +-
 .../org/apache/kafka/coordinator/group/Group.java  |   4 +-
 .../coordinator/group/GroupCoordinatorService.java |  18 +-
 .../coordinator/group/GroupCoordinatorShard.java   |  19 +
 .../kafka/coordinator/group/OffsetAndMetadata.java |  18 +
 .../coordinator/group/OffsetMetadataManager.java   | 315 +++++++++++----
 .../coordinator/group/consumer/ConsumerGroup.java  |   5 +-
 .../coordinator/group/generic/GenericGroup.java    |  14 +-
 .../group/GroupCoordinatorServiceTest.java         | 126 ++++++
 .../group/GroupCoordinatorShardTest.java           |  89 ++++-
 .../coordinator/group/OffsetAndMetadataTest.java   |  43 ++
 .../group/OffsetMetadataManagerTest.java           | 443 ++++++++++++++++++++-
 .../group/consumer/ConsumerGroupTest.java          |  17 +-
 .../group/generic/GenericGroupTest.java            |  22 +-
 14 files changed, 1028 insertions(+), 107 deletions(-)

diff --git a/checkstyle/suppressions.xml b/checkstyle/suppressions.xml
index 5e582599a40..bd31d3f3b80 100644
--- a/checkstyle/suppressions.xml
+++ b/checkstyle/suppressions.xml
@@ -338,7 +338,7 @@
     <suppress checks="ParameterNumber"
               
files="(ConsumerGroupMember|GroupMetadataManager|GroupCoordinatorConfig).java"/>
     <suppress checks="ClassDataAbstractionCouplingCheck"
-              
files="(RecordHelpersTest|GroupMetadataManager|GroupMetadataManagerTest|GroupCoordinatorServiceTest|GroupCoordinatorShardTest).java"/>
+              
files="(RecordHelpersTest|GroupMetadataManager|GroupMetadataManagerTest|OffsetMetadataManagerTest|GroupCoordinatorServiceTest|GroupCoordinatorShardTest).java"/>
     <suppress checks="JavaNCSS"
               files="GroupMetadataManagerTest.java"/>
 
diff --git 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/Group.java 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/Group.java
index 0cb10b12a51..9939b25737a 100644
--- 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/Group.java
+++ 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/Group.java
@@ -74,11 +74,13 @@ public interface Group {
      * @param groupInstanceId           The group instance id.
      * @param generationIdOrMemberEpoch The generation id for genetic groups 
or the member epoch
      *                                  for consumer groups.
+     * @param isTransactional           Whether the offset commit is 
transactional or not.
      */
     void validateOffsetCommit(
         String memberId,
         String groupInstanceId,
-        int generationIdOrMemberEpoch
+        int generationIdOrMemberEpoch,
+        boolean isTransactional
     ) throws KafkaException;
 
     /**
diff --git 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java
 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java
index a05402d410b..ec0de853d53 100644
--- 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java
+++ 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java
@@ -77,7 +77,6 @@ import 
org.apache.kafka.coordinator.group.runtime.PartitionWriter;
 import org.apache.kafka.image.MetadataDelta;
 import org.apache.kafka.image.MetadataImage;
 import org.apache.kafka.server.record.BrokerCompressionType;
-import org.apache.kafka.server.util.FutureUtils;
 import org.apache.kafka.server.util.timer.Timer;
 import org.slf4j.Logger;
 
@@ -884,9 +883,20 @@ public class GroupCoordinatorService implements 
GroupCoordinator {
             ));
         }
 
-        return FutureUtils.failedFuture(Errors.UNSUPPORTED_VERSION.exception(
-            "This API is not implemented yet."
-        ));
+        if (!isGroupIdNotEmpty(request.groupId())) {
+            return 
CompletableFuture.completedFuture(TxnOffsetCommitRequest.getErrorResponse(
+                request,
+                Errors.INVALID_GROUP_ID
+            ));
+        }
+
+        return runtime.scheduleWriteOperation(
+            "txn-commit-offset",
+            topicPartitionFor(request.groupId()),
+            coordinator -> coordinator.commitTransactionalOffset(context, 
request)
+        ).exceptionally(exception ->
+            TxnOffsetCommitRequest.getErrorResponse(request, 
normalizeException(exception))
+        );
     }
 
     /**
diff --git 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorShard.java
 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorShard.java
index 3364260a5b5..b68549ef157 100644
--- 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorShard.java
+++ 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorShard.java
@@ -38,6 +38,8 @@ import 
org.apache.kafka.common.message.OffsetFetchResponseData;
 import org.apache.kafka.common.message.SyncGroupRequestData;
 import org.apache.kafka.common.message.SyncGroupResponseData;
 import org.apache.kafka.common.errors.ApiException;
+import org.apache.kafka.common.message.TxnOffsetCommitRequestData;
+import org.apache.kafka.common.message.TxnOffsetCommitResponseData;
 import org.apache.kafka.common.protocol.ApiMessage;
 import org.apache.kafka.common.protocol.Errors;
 import org.apache.kafka.common.requests.RequestContext;
@@ -454,6 +456,22 @@ public class GroupCoordinatorShard implements 
CoordinatorShard<Record> {
         return offsetMetadataManager.commitOffset(context, request);
     }
 
+    /**
+     * Handles an TxnOffsetCommit request.
+     *
+     * @param context The request context.
+     * @param request The actual TxnOffsetCommit request.
+     *
+     * @return A Result containing the TxnOffsetCommitResponse response and
+     *         a list of records to update the state machine.
+     */
+    public CoordinatorResult<TxnOffsetCommitResponseData, Record> 
commitTransactionalOffset(
+        RequestContext context,
+        TxnOffsetCommitRequestData request
+    ) throws ApiException {
+        return offsetMetadataManager.commitTransactionalOffset(context, 
request);
+    }
+
     /**
      * Handles a ListGroups request.
      *
@@ -638,6 +656,7 @@ public class GroupCoordinatorShard implements 
CoordinatorShard<Record> {
             case 0:
             case 1:
                 offsetMetadataManager.replay(
+                    producerId,
                     (OffsetCommitKey) key.message(),
                     (OffsetCommitValue) messageOrNull(value)
                 );
diff --git 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/OffsetAndMetadata.java
 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/OffsetAndMetadata.java
index cf0b936917d..0eb1afaead5 100644
--- 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/OffsetAndMetadata.java
+++ 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/OffsetAndMetadata.java
@@ -17,6 +17,7 @@
 package org.apache.kafka.coordinator.group;
 
 import org.apache.kafka.common.message.OffsetCommitRequestData;
+import org.apache.kafka.common.message.TxnOffsetCommitRequestData;
 import org.apache.kafka.common.requests.OffsetCommitRequest;
 import org.apache.kafka.coordinator.group.generated.OffsetCommitValue;
 
@@ -141,4 +142,21 @@ public class OffsetAndMetadata {
             expireTimestampMs
         );
     }
+
+    /**
+     * @return An OffsetAndMetadata created from an 
OffsetCommitRequestPartition request.
+     */
+    public static OffsetAndMetadata fromRequest(
+        TxnOffsetCommitRequestData.TxnOffsetCommitRequestPartition partition,
+        long currentTimeMs
+    ) {
+        return new OffsetAndMetadata(
+            partition.committedOffset(),
+            ofSentinel(partition.committedLeaderEpoch()),
+            partition.committedMetadata() == null ?
+                OffsetAndMetadata.NO_METADATA : partition.committedMetadata(),
+            currentTimeMs,
+            OptionalLong.empty()
+        );
+    }
 }
diff --git 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/OffsetMetadataManager.java
 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/OffsetMetadataManager.java
index faff257d584..fce7140559f 100644
--- 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/OffsetMetadataManager.java
+++ 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/OffsetMetadataManager.java
@@ -28,7 +28,12 @@ import 
org.apache.kafka.common.message.OffsetDeleteRequestData;
 import org.apache.kafka.common.message.OffsetFetchRequestData;
 import org.apache.kafka.common.message.OffsetFetchResponseData;
 import org.apache.kafka.common.message.OffsetDeleteResponseData;
+import org.apache.kafka.common.message.TxnOffsetCommitRequestData;
+import org.apache.kafka.common.message.TxnOffsetCommitResponseData;
+import 
org.apache.kafka.common.message.TxnOffsetCommitResponseData.TxnOffsetCommitResponseTopic;
+import 
org.apache.kafka.common.message.TxnOffsetCommitResponseData.TxnOffsetCommitResponsePartition;
 import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.record.RecordBatch;
 import org.apache.kafka.common.requests.OffsetCommitRequest;
 import org.apache.kafka.common.requests.RequestContext;
 import org.apache.kafka.common.utils.LogContext;
@@ -178,9 +183,83 @@ public class OffsetMetadataManager {
     private final GroupCoordinatorConfig config;
 
     /**
-     * The offsets keyed by group id, topic name and partition id.
+     * The committed offsets.
      */
-    private final TimelineHashMap<String, TimelineHashMap<String, 
TimelineHashMap<Integer, OffsetAndMetadata>>> offsetsByGroup;
+    private final Offsets offsets;
+
+    /**
+     * The pending transactional offsets keyed by producer id. This structure 
holds all the
+     * transactional offsets that are part of ongoing transactions. When the 
transaction is
+     * committed, they are transferred to `offsets`; when the transaction is 
aborted, they
+     * are removed.
+     */
+    private final TimelineHashMap<Long, Offsets> pendingTransactionalOffsets;
+
+    private class Offsets {
+        /**
+         * The offsets keyed by group id, topic name and partition id.
+         */
+        private final TimelineHashMap<String, TimelineHashMap<String, 
TimelineHashMap<Integer, OffsetAndMetadata>>> offsetsByGroup;
+
+        private Offsets() {
+            this.offsetsByGroup = new TimelineHashMap<>(snapshotRegistry, 0);
+        }
+
+        private OffsetAndMetadata get(
+            String groupId,
+            String topic,
+            int partition
+        ) {
+            TimelineHashMap<String, TimelineHashMap<Integer, 
OffsetAndMetadata>> topicOffsets = offsetsByGroup.get(groupId);
+            if (topicOffsets == null) {
+                return null;
+            } else {
+                TimelineHashMap<Integer, OffsetAndMetadata> partitionOffsets = 
topicOffsets.get(topic);
+                if (partitionOffsets == null) {
+                    return null;
+                } else {
+                    return partitionOffsets.get(partition);
+                }
+            }
+        }
+
+        private OffsetAndMetadata put(
+            String groupId,
+            String topic,
+            int partition,
+            OffsetAndMetadata offsetAndMetadata
+        ) {
+            TimelineHashMap<String, TimelineHashMap<Integer, 
OffsetAndMetadata>> topicOffsets = offsetsByGroup
+                .computeIfAbsent(groupId, __ -> new 
TimelineHashMap<>(snapshotRegistry, 0));
+            TimelineHashMap<Integer, OffsetAndMetadata> partitionOffsets = 
topicOffsets
+                .computeIfAbsent(topic, __ -> new 
TimelineHashMap<>(snapshotRegistry, 0));
+            return partitionOffsets.put(partition, offsetAndMetadata);
+        }
+
+        private OffsetAndMetadata remove(
+            String groupId,
+            String topic,
+            int partition
+        ) {
+            TimelineHashMap<String, TimelineHashMap<Integer, 
OffsetAndMetadata>> topicOffsets = offsetsByGroup.get(groupId);
+            if (topicOffsets == null)
+                return null;
+
+            TimelineHashMap<Integer, OffsetAndMetadata> partitionOffsets = 
topicOffsets.get(topic);
+            if (partitionOffsets == null)
+                return null;
+
+            OffsetAndMetadata removedValue = 
partitionOffsets.remove(partition);
+
+            if (partitionOffsets.isEmpty())
+                topicOffsets.remove(topic);
+
+            if (topicOffsets.isEmpty())
+                offsetsByGroup.remove(groupId);
+
+            return removedValue;
+        }
+    }
 
     OffsetMetadataManager(
         SnapshotRegistry snapshotRegistry,
@@ -198,7 +277,8 @@ public class OffsetMetadataManager {
         this.groupMetadataManager = groupMetadataManager;
         this.config = config;
         this.metrics = metrics;
-        this.offsetsByGroup = new TimelineHashMap<>(snapshotRegistry, 0);
+        this.offsets = new Offsets();
+        this.pendingTransactionalOffsets = new 
TimelineHashMap<>(snapshotRegistry, 0);
     }
 
     /**
@@ -239,7 +319,8 @@ public class OffsetMetadataManager {
             group.validateOffsetCommit(
                 request.memberId(),
                 request.groupInstanceId(),
-                request.generationIdOrMemberEpoch()
+                request.generationIdOrMemberEpoch(),
+                false
             );
         } catch (StaleMemberEpochException ex) {
             // The STALE_MEMBER_EPOCH error is only returned for new consumer 
group (KIP-848). When
@@ -256,6 +337,43 @@ public class OffsetMetadataManager {
         return group;
     }
 
+    /**
+     * Validates an TxnOffsetCommit request.
+     *
+     * @param request The actual request.
+     */
+    private Group validateTransactionalOffsetCommit(
+        TxnOffsetCommitRequestData request
+    ) throws ApiException {
+        Group group;
+        try {
+            group = groupMetadataManager.group(request.groupId());
+        } catch (GroupIdNotFoundException ex) {
+            if (request.generationId() < 0) {
+                // If the group does not exist and generation id is -1, the 
request comes from
+                // either the admin client or a consumer which does not use 
the group management
+                // facility. In this case, a so-called simple group is created 
and the request
+                // is accepted.
+                group = 
groupMetadataManager.getOrMaybeCreateGenericGroup(request.groupId(), true);
+            } else {
+                throw Errors.ILLEGAL_GENERATION.exception();
+            }
+        }
+
+        try {
+            group.validateOffsetCommit(
+                request.memberId(),
+                request.groupInstanceId(),
+                request.generationId(),
+                true
+            );
+        } catch (StaleMemberEpochException ex) {
+            throw Errors.ILLEGAL_GENERATION.exception();
+        }
+
+        return group;
+    }
+
     /**
      * Validates an OffsetFetch request.
      *
@@ -287,6 +405,13 @@ public class OffsetMetadataManager {
         return group;
     }
 
+    /**
+     * @return True if the committed metadata is invalid; False otherwise.
+     */
+    private boolean isMetadataInvalid(String metadata) {
+        return metadata != null && metadata.length() > 
config.offsetMetadataMaxSize;
+    }
+
     /**
      * Computes the expiration timestamp based on the retention time provided 
in the OffsetCommit
      * request.
@@ -345,7 +470,7 @@ public class OffsetMetadataManager {
             response.topics().add(topicResponse);
 
             topic.partitions().forEach(partition -> {
-                if (partition.committedMetadata() != null && 
partition.committedMetadata().length() > config.offsetMetadataMaxSize) {
+                if (isMetadataInvalid(partition.committedMetadata())) {
                     topicResponse.partitions().add(new 
OffsetCommitResponsePartition()
                         .setPartitionIndex(partition.partitionIndex())
                         
.setErrorCode(Errors.OFFSET_METADATA_TOO_LARGE.code()));
@@ -382,6 +507,66 @@ public class OffsetMetadataManager {
         return new CoordinatorResult<>(records, response);
     }
 
+    /**
+     * Handles an TxnOffsetCommit request.
+     *
+     * @param context The request context.
+     * @param request The TxnOffsetCommit request.
+     *
+     * @return A Result containing the TxnOffsetCommitResponseData response and
+     *         a list of records to update the state machine.
+     */
+    public CoordinatorResult<TxnOffsetCommitResponseData, Record> 
commitTransactionalOffset(
+        RequestContext context,
+        TxnOffsetCommitRequestData request
+    ) throws ApiException {
+        validateTransactionalOffsetCommit(request);
+
+        final TxnOffsetCommitResponseData response = new 
TxnOffsetCommitResponseData();
+        final List<Record> records = new ArrayList<>();
+        final long currentTimeMs = time.milliseconds();
+
+        request.topics().forEach(topic -> {
+            final TxnOffsetCommitResponseTopic topicResponse = new 
TxnOffsetCommitResponseTopic().setName(topic.name());
+            response.topics().add(topicResponse);
+
+            topic.partitions().forEach(partition -> {
+                if (isMetadataInvalid(partition.committedMetadata())) {
+                    topicResponse.partitions().add(new 
TxnOffsetCommitResponsePartition()
+                        .setPartitionIndex(partition.partitionIndex())
+                        
.setErrorCode(Errors.OFFSET_METADATA_TOO_LARGE.code()));
+                } else {
+                    log.debug("[GroupId {}] Committing transactional offsets 
{} for partition {}-{} from member {} with leader epoch {}.",
+                        request.groupId(), partition.committedOffset(), 
topic.name(), partition.partitionIndex(),
+                        request.memberId(), partition.committedLeaderEpoch());
+
+                    topicResponse.partitions().add(new 
TxnOffsetCommitResponsePartition()
+                        .setPartitionIndex(partition.partitionIndex())
+                        .setErrorCode(Errors.NONE.code()));
+
+                    final OffsetAndMetadata offsetAndMetadata = 
OffsetAndMetadata.fromRequest(
+                        partition,
+                        currentTimeMs
+                    );
+
+                    records.add(RecordHelpers.newOffsetCommitRecord(
+                        request.groupId(),
+                        topic.name(),
+                        partition.partitionIndex(),
+                        offsetAndMetadata,
+                        metadataImage.features().metadataVersion()
+                    ));
+                }
+            });
+        });
+
+        if (!records.isEmpty()) {
+            metrics.record(GroupCoordinatorMetrics.OFFSET_COMMITS_SENSOR_NAME, 
records.size());
+        }
+
+        return new CoordinatorResult<>(records, response);
+    }
+
     /**
      * Handles an OffsetDelete request.
      *
@@ -398,7 +583,7 @@ public class OffsetMetadataManager {
         final OffsetDeleteResponseData.OffsetDeleteResponseTopicCollection 
responseTopicCollection =
             new OffsetDeleteResponseData.OffsetDeleteResponseTopicCollection();
         final TimelineHashMap<String, TimelineHashMap<Integer, 
OffsetAndMetadata>> offsetsByTopic =
-            offsetsByGroup.get(request.groupId());
+            offsets.offsetsByGroup.get(request.groupId());
 
         request.topics().forEach(topic -> {
             final 
OffsetDeleteResponseData.OffsetDeleteResponsePartitionCollection 
responsePartitionCollection =
@@ -457,7 +642,7 @@ public class OffsetMetadataManager {
         String groupId,
         List<Record> records
     ) {
-        TimelineHashMap<String, TimelineHashMap<Integer, OffsetAndMetadata>> 
offsetsByTopic = offsetsByGroup.get(groupId);
+        TimelineHashMap<String, TimelineHashMap<Integer, OffsetAndMetadata>> 
offsetsByTopic = offsets.offsetsByGroup.get(groupId);
         AtomicInteger numDeletedOffsets = new AtomicInteger();
 
         if (offsetsByTopic != null) {
@@ -492,7 +677,7 @@ public class OffsetMetadataManager {
 
         final List<OffsetFetchResponseData.OffsetFetchResponseTopics> 
topicResponses = new ArrayList<>(request.topics().size());
         final TimelineHashMap<String, TimelineHashMap<Integer, 
OffsetAndMetadata>> groupOffsets =
-            failAllPartitions ? null : offsetsByGroup.get(request.groupId(), 
lastCommittedOffset);
+            failAllPartitions ? null : 
offsets.offsetsByGroup.get(request.groupId(), lastCommittedOffset);
 
         request.topics().forEach(topic -> {
             final OffsetFetchResponseData.OffsetFetchResponseTopics 
topicResponse =
@@ -549,7 +734,7 @@ public class OffsetMetadataManager {
 
         final List<OffsetFetchResponseData.OffsetFetchResponseTopics> 
topicResponses = new ArrayList<>();
         final TimelineHashMap<String, TimelineHashMap<Integer, 
OffsetAndMetadata>> groupOffsets =
-            offsetsByGroup.get(request.groupId(), lastCommittedOffset);
+            offsets.offsetsByGroup.get(request.groupId(), lastCommittedOffset);
 
         if (groupOffsets != null) {
             groupOffsets.entrySet(lastCommittedOffset).forEach(topicEntry -> {
@@ -587,7 +772,8 @@ public class OffsetMetadataManager {
      * @return True if no offsets exist or if all offsets expired, false 
otherwise.
      */
     public boolean cleanupExpiredOffsets(String groupId, List<Record> records) 
{
-        TimelineHashMap<String, TimelineHashMap<Integer, OffsetAndMetadata>> 
offsetsByTopic = offsetsByGroup.get(groupId);
+        TimelineHashMap<String, TimelineHashMap<Integer, OffsetAndMetadata>> 
offsetsByTopic =
+            offsets.offsetsByGroup.get(groupId);
         if (offsetsByTopic == null) {
             return true;
         }
@@ -649,10 +835,13 @@ public class OffsetMetadataManager {
     /**
      * Replays OffsetCommitKey/Value to update or delete the corresponding 
offsets.
      *
-     * @param key   A OffsetCommitKey key.
-     * @param value A OffsetCommitValue value.
+     * @param producerId The producer id of the batch containing the provided
+     *                   key and value.
+     * @param key        A OffsetCommitKey key.
+     * @param value      A OffsetCommitValue value.
      */
     public void replay(
+        long producerId,
         OffsetCommitKey key,
         OffsetCommitValue value
     ) {
@@ -672,18 +861,34 @@ public class OffsetMetadataManager {
                 groupMetadataManager.getOrMaybeCreateGenericGroup(groupId, 
true);
             }
 
-            updateOffset(
-                groupId,
-                topic,
-                partition,
-                OffsetAndMetadata.fromRecord(value)
-            );
+            if (producerId == RecordBatch.NO_PRODUCER_ID) {
+                // If the offset is not part of a transaction, it is directly 
stored
+                // in the offsets store.
+                OffsetAndMetadata previousValue = offsets.put(
+                    groupId,
+                    topic,
+                    partition,
+                    OffsetAndMetadata.fromRecord(value)
+                );
+                if (previousValue == null) {
+                    metrics.incrementLocalGauge(NUM_OFFSETS);
+                }
+            } else {
+                // Otherwise, the transaction offset is stored in the pending 
transactional
+                // offsets store. Pending offsets there are moved to the main 
store when
+                // the transaction is committed; or removed when the 
transaction is aborted.
+                Offsets pendingOffsets = 
pendingTransactionalOffsets.computeIfAbsent(producerId, __ -> new Offsets());
+                pendingOffsets.put(
+                    groupId,
+                    topic,
+                    partition,
+                    OffsetAndMetadata.fromRecord(value)
+                );
+            }
         } else {
-            removeOffset(
-                groupId,
-                topic,
-                partition
-            );
+            if (offsets.remove(groupId, topic, partition) != null) {
+                metrics.decrementLocalGauge(NUM_OFFSETS);
+            }
         }
     }
 
@@ -703,70 +908,28 @@ public class OffsetMetadataManager {
      *
      * package-private for testing.
      */
-    OffsetAndMetadata offset(String groupId, String topic, int partition) {
-        TimelineHashMap<String, TimelineHashMap<Integer, OffsetAndMetadata>> 
topicOffsets = offsetsByGroup.get(groupId);
-        if (topicOffsets == null) {
-            return null;
-        } else {
-            TimelineHashMap<Integer, OffsetAndMetadata> partitionOffsets = 
topicOffsets.get(topic);
-            if (partitionOffsets == null) {
-                return null;
-            } else {
-                return partitionOffsets.get(partition);
-            }
-        }
-    }
-
-    /**
-     * Updates the offset.
-     *
-     * @param groupId           The group id.
-     * @param topic             The topic name.
-     * @param partition         The partition id.
-     * @param offsetAndMetadata The offset metadata.
-     */
-    private void updateOffset(
+    OffsetAndMetadata offset(
         String groupId,
         String topic,
-        int partition,
-        OffsetAndMetadata offsetAndMetadata
+        int partition
     ) {
-        TimelineHashMap<String, TimelineHashMap<Integer, OffsetAndMetadata>> 
topicOffsets = offsetsByGroup
-            .computeIfAbsent(groupId, __ -> new 
TimelineHashMap<>(snapshotRegistry, 0));
-        TimelineHashMap<Integer, OffsetAndMetadata> partitionOffsets = 
topicOffsets
-            .computeIfAbsent(topic, __ -> new 
TimelineHashMap<>(snapshotRegistry, 0));
-        if (partitionOffsets.put(partition, offsetAndMetadata) == null) {
-            metrics.incrementLocalGauge(NUM_OFFSETS);
-        }
+        return offsets.get(groupId, topic, partition);
     }
 
     /**
-     * Removes the offset.
+     * @return The pending transactional offset for the provided parameters or 
null
+     * if it does not exist.
      *
-     * @param groupId           The group id.
-     * @param topic             The topic name.
-     * @param partition         The partition id.
+     * package-private for testing.
      */
-    private void removeOffset(
+    OffsetAndMetadata pendingTransactionalOffset(
+        long producerId,
         String groupId,
         String topic,
         int partition
     ) {
-        TimelineHashMap<String, TimelineHashMap<Integer, OffsetAndMetadata>> 
topicOffsets = offsetsByGroup.get(groupId);
-        if (topicOffsets == null)
-            return;
-
-        TimelineHashMap<Integer, OffsetAndMetadata> partitionOffsets = 
topicOffsets.get(topic);
-        if (partitionOffsets == null)
-            return;
-
-        partitionOffsets.remove(partition);
-        metrics.decrementLocalGauge(NUM_OFFSETS);
-
-        if (partitionOffsets.isEmpty())
-            topicOffsets.remove(topic);
-
-        if (topicOffsets.isEmpty())
-            offsetsByGroup.remove(groupId);
+        Offsets offsets = pendingTransactionalOffsets.get(producerId);
+        if (offsets == null) return null;
+        return offsets.get(groupId, topic, partition);
     }
 }
diff --git 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroup.java
 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroup.java
index 0fe5555ac31..3beaa5de1ef 100644
--- 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroup.java
+++ 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroup.java
@@ -653,12 +653,15 @@ public class ConsumerGroup implements Group {
      * @param memberId          The member id.
      * @param groupInstanceId   The group instance id.
      * @param memberEpoch       The member epoch.
+     * @param isTransactional   Whether the offset commit is transactional or 
not. It has no
+     *                          impact when a consumer group is used.
      */
     @Override
     public void validateOffsetCommit(
         String memberId,
         String groupInstanceId,
-        int memberEpoch
+        int memberEpoch,
+        boolean isTransactional
     ) throws UnknownMemberIdException, StaleMemberEpochException {
         // When the member epoch is -1, the request comes from either the 
admin client
         // or a consumer which does not use the group management facility. In 
this case,
diff --git 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/generic/GenericGroup.java
 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/generic/GenericGroup.java
index 72de3e8fd52..8903ef93213 100644
--- 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/generic/GenericGroup.java
+++ 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/generic/GenericGroup.java
@@ -809,12 +809,14 @@ public class GenericGroup implements Group {
      * @param memberId          The member id.
      * @param groupInstanceId   The group instance id.
      * @param generationId      The generation id.
+     * @param isTransactional   Whether the offset commit is transactional or 
not.
      */
     @Override
     public void validateOffsetCommit(
         String memberId,
         String groupInstanceId,
-        int generationId
+        int generationId,
+        boolean isTransactional
     ) throws CoordinatorNotAvailableException, UnknownMemberIdException, 
IllegalGenerationException, FencedInstanceIdException {
         if (isInState(DEAD)) {
             throw Errors.COORDINATOR_NOT_AVAILABLE.exception();
@@ -828,22 +830,26 @@ public class GenericGroup implements Group {
         }
 
         if (generationId >= 0 || !memberId.isEmpty() || groupInstanceId != 
null) {
-            validateMember(memberId, groupInstanceId, "offset-commit");
+            validateMember(memberId, groupInstanceId, isTransactional ? 
"offset-commit" : "txn-offset-commit");
 
             if (generationId != this.generationId) {
                 throw Errors.ILLEGAL_GENERATION.exception();
             }
-        } else if (!isInState(EMPTY)) {
+        } else if (!isTransactional && !isInState(EMPTY)) {
             // If the request does not contain the member id and the 
generation id (version 0),
             // offset commits are only accepted when the group is empty.
+            // This does not apply to transactional offset commits, since the 
older versions
+            // of this protocol do not require member id and generation id.
             throw Errors.UNKNOWN_MEMBER_ID.exception();
         }
 
-        if (isInState(COMPLETING_REBALANCE)) {
+        if (!isTransactional && isInState(COMPLETING_REBALANCE)) {
             // We should not receive a commit request if the group has not 
completed rebalance;
             // but since the consumer's member.id and generation is valid, it 
means it has received
             // the latest group generation information from the JoinResponse.
             // So let's return a REBALANCE_IN_PROGRESS to let consumer handle 
it gracefully.
+            // This does not apply to transactional offset commits, since the 
group state
+            // is not enforced for those.
             throw Errors.REBALANCE_IN_PROGRESS.exception();
         }
     }
diff --git 
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorServiceTest.java
 
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorServiceTest.java
index 4e0e2b2d573..bb813b4f700 100644
--- 
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorServiceTest.java
+++ 
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorServiceTest.java
@@ -51,6 +51,8 @@ import org.apache.kafka.common.message.LeaveGroupRequestData;
 import org.apache.kafka.common.message.LeaveGroupResponseData;
 import org.apache.kafka.common.message.SyncGroupRequestData;
 import org.apache.kafka.common.message.SyncGroupResponseData;
+import org.apache.kafka.common.message.TxnOffsetCommitRequestData;
+import org.apache.kafka.common.message.TxnOffsetCommitResponseData;
 import org.apache.kafka.common.network.ClientInformation;
 import org.apache.kafka.common.network.ListenerName;
 import org.apache.kafka.common.protocol.ApiKeys;
@@ -71,6 +73,7 @@ import org.junit.jupiter.api.Test;
 import org.junit.jupiter.params.ParameterizedTest;
 import org.junit.jupiter.params.provider.Arguments;
 import org.junit.jupiter.params.provider.MethodSource;
+import org.junit.jupiter.params.provider.NullSource;
 import org.junit.jupiter.params.provider.ValueSource;
 import org.mockito.ArgumentMatchers;
 import org.mockito.internal.util.collections.Sets;
@@ -1753,4 +1756,127 @@ public class GroupCoordinatorServiceTest {
             future.get()
         );
     }
+
+    @Test
+    public void testCommitTransactionalOffsetsWhenNotStarted() throws 
ExecutionException, InterruptedException {
+        CoordinatorRuntime<GroupCoordinatorShard, Record> runtime = 
mockRuntime();
+        GroupCoordinatorService service = new GroupCoordinatorService(
+            new LogContext(),
+            createConfig(),
+            runtime,
+            new GroupCoordinatorMetrics()
+        );
+
+        TxnOffsetCommitRequestData request = new TxnOffsetCommitRequestData()
+            .setGroupId("foo")
+            .setTransactionalId("transactional-id")
+            .setMemberId("member-id")
+            .setGenerationId(10)
+            .setTopics(Collections.singletonList(new 
TxnOffsetCommitRequestData.TxnOffsetCommitRequestTopic()
+                .setName("topic")
+                .setPartitions(Collections.singletonList(new 
TxnOffsetCommitRequestData.TxnOffsetCommitRequestPartition()
+                    .setPartitionIndex(0)
+                    .setCommittedOffset(100)))));
+
+        CompletableFuture<TxnOffsetCommitResponseData> future = 
service.commitTransactionalOffsets(
+            requestContext(ApiKeys.TXN_OFFSET_COMMIT),
+            request,
+            BufferSupplier.NO_CACHING
+        );
+
+        assertEquals(
+            new TxnOffsetCommitResponseData()
+                .setTopics(Collections.singletonList(new 
TxnOffsetCommitResponseData.TxnOffsetCommitResponseTopic()
+                    .setName("topic")
+                    .setPartitions(Collections.singletonList(new 
TxnOffsetCommitResponseData.TxnOffsetCommitResponsePartition()
+                        .setPartitionIndex(0)
+                        
.setErrorCode(Errors.COORDINATOR_NOT_AVAILABLE.code()))))),
+            future.get()
+        );
+    }
+
+    @ParameterizedTest
+    @NullSource
+    @ValueSource(strings = {""})
+    public void testCommitTransactionalOffsetsWithInvalidGroupId(String 
groupId) throws ExecutionException, InterruptedException {
+        CoordinatorRuntime<GroupCoordinatorShard, Record> runtime = 
mockRuntime();
+        GroupCoordinatorService service = new GroupCoordinatorService(
+            new LogContext(),
+            createConfig(),
+            runtime,
+            new GroupCoordinatorMetrics()
+        );
+        service.startup(() -> 1);
+
+        TxnOffsetCommitRequestData request = new TxnOffsetCommitRequestData()
+            .setGroupId(groupId)
+            .setTransactionalId("transactional-id")
+            .setMemberId("member-id")
+            .setGenerationId(10)
+            .setTopics(Collections.singletonList(new 
TxnOffsetCommitRequestData.TxnOffsetCommitRequestTopic()
+                .setName("topic")
+                .setPartitions(Collections.singletonList(new 
TxnOffsetCommitRequestData.TxnOffsetCommitRequestPartition()
+                    .setPartitionIndex(0)
+                    .setCommittedOffset(100)))));
+
+        CompletableFuture<TxnOffsetCommitResponseData> future = 
service.commitTransactionalOffsets(
+            requestContext(ApiKeys.TXN_OFFSET_COMMIT),
+            request,
+            BufferSupplier.NO_CACHING
+        );
+
+        assertEquals(
+            new TxnOffsetCommitResponseData()
+                .setTopics(Collections.singletonList(new 
TxnOffsetCommitResponseData.TxnOffsetCommitResponseTopic()
+                    .setName("topic")
+                    .setPartitions(Collections.singletonList(new 
TxnOffsetCommitResponseData.TxnOffsetCommitResponsePartition()
+                        .setPartitionIndex(0)
+                        .setErrorCode(Errors.INVALID_GROUP_ID.code()))))),
+            future.get()
+        );
+    }
+
+    @Test
+    public void testCommitTransactionalOffsets() throws ExecutionException, 
InterruptedException {
+        CoordinatorRuntime<GroupCoordinatorShard, Record> runtime = 
mockRuntime();
+        GroupCoordinatorService service = new GroupCoordinatorService(
+            new LogContext(),
+            createConfig(),
+            runtime,
+            new GroupCoordinatorMetrics()
+        );
+        service.startup(() -> 1);
+
+        TxnOffsetCommitRequestData request = new TxnOffsetCommitRequestData()
+            .setGroupId("foo")
+            .setTransactionalId("transactional-id")
+            .setMemberId("member-id")
+            .setGenerationId(10)
+            .setTopics(Collections.singletonList(new 
TxnOffsetCommitRequestData.TxnOffsetCommitRequestTopic()
+                .setName("topic")
+                .setPartitions(Collections.singletonList(new 
TxnOffsetCommitRequestData.TxnOffsetCommitRequestPartition()
+                    .setPartitionIndex(0)
+                    .setCommittedOffset(100)))));
+
+        TxnOffsetCommitResponseData response = new 
TxnOffsetCommitResponseData()
+            .setTopics(Collections.singletonList(new 
TxnOffsetCommitResponseData.TxnOffsetCommitResponseTopic()
+                .setName("topic")
+                .setPartitions(Collections.singletonList(new 
TxnOffsetCommitResponseData.TxnOffsetCommitResponsePartition()
+                    .setPartitionIndex(0)
+                    .setErrorCode(Errors.NONE.code())))));
+
+        when(runtime.scheduleWriteOperation(
+            ArgumentMatchers.eq("txn-commit-offset"),
+            ArgumentMatchers.eq(new TopicPartition("__consumer_offsets", 0)),
+            ArgumentMatchers.any()
+        )).thenReturn(CompletableFuture.completedFuture(response));
+
+        CompletableFuture<TxnOffsetCommitResponseData> future = 
service.commitTransactionalOffsets(
+            requestContext(ApiKeys.TXN_OFFSET_COMMIT),
+            request,
+            BufferSupplier.NO_CACHING
+        );
+
+        assertEquals(response, future.get());
+    }
 }
diff --git 
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorShardTest.java
 
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorShardTest.java
index f15e30e11d6..9d0bccc012b 100644
--- 
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorShardTest.java
+++ 
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorShardTest.java
@@ -21,6 +21,8 @@ import 
org.apache.kafka.common.message.ConsumerGroupHeartbeatResponseData;
 import org.apache.kafka.common.message.DeleteGroupsResponseData;
 import org.apache.kafka.common.message.OffsetCommitRequestData;
 import org.apache.kafka.common.message.OffsetCommitResponseData;
+import org.apache.kafka.common.message.TxnOffsetCommitRequestData;
+import org.apache.kafka.common.message.TxnOffsetCommitResponseData;
 import org.apache.kafka.common.protocol.ApiKeys;
 import org.apache.kafka.common.protocol.Errors;
 import org.apache.kafka.common.record.RecordBatch;
@@ -143,6 +145,38 @@ public class GroupCoordinatorShardTest {
         assertEquals(result, coordinator.commitOffset(context, request));
     }
 
+    @Test
+    public void testCommitTransactionalOffset() {
+        GroupMetadataManager groupMetadataManager = 
mock(GroupMetadataManager.class);
+        OffsetMetadataManager offsetMetadataManager = 
mock(OffsetMetadataManager.class);
+        CoordinatorMetrics coordinatorMetrics = mock(CoordinatorMetrics.class);
+        CoordinatorMetricsShard metricsShard = 
mock(CoordinatorMetricsShard.class);
+        GroupCoordinatorShard coordinator = new GroupCoordinatorShard(
+            new LogContext(),
+            groupMetadataManager,
+            offsetMetadataManager,
+            Time.SYSTEM,
+            new MockCoordinatorTimer<>(new MockTime()),
+            mock(GroupCoordinatorConfig.class),
+            coordinatorMetrics,
+            metricsShard
+        );
+
+        RequestContext context = requestContext(ApiKeys.TXN_OFFSET_COMMIT);
+        TxnOffsetCommitRequestData request = new TxnOffsetCommitRequestData();
+        CoordinatorResult<TxnOffsetCommitResponseData, Record> result = new 
CoordinatorResult<>(
+            Collections.emptyList(),
+            new TxnOffsetCommitResponseData()
+        );
+
+        when(offsetMetadataManager.commitTransactionalOffset(
+            context,
+            request
+        )).thenReturn(result);
+
+        assertEquals(result, coordinator.commitTransactionalOffset(context, 
request));
+    }
+
     @Test
     public void testDeleteGroups() {
         GroupMetadataManager groupMetadataManager = 
mock(GroupMetadataManager.class);
@@ -298,7 +332,54 @@ public class GroupCoordinatorShardTest {
             new ApiMessageAndVersion(value, (short) 0)
         ));
 
-        verify(offsetMetadataManager, times(2)).replay(key, value);
+        verify(offsetMetadataManager, times(2)).replay(
+            RecordBatch.NO_PRODUCER_ID,
+            key,
+            value
+        );
+    }
+
+    @Test
+    public void testReplayTransactionalOffsetCommit() {
+        GroupMetadataManager groupMetadataManager = 
mock(GroupMetadataManager.class);
+        OffsetMetadataManager offsetMetadataManager = 
mock(OffsetMetadataManager.class);
+        CoordinatorMetrics coordinatorMetrics = mock(CoordinatorMetrics.class);
+        CoordinatorMetricsShard metricsShard = 
mock(CoordinatorMetricsShard.class);
+        GroupCoordinatorShard coordinator = new GroupCoordinatorShard(
+            new LogContext(),
+            groupMetadataManager,
+            offsetMetadataManager,
+            Time.SYSTEM,
+            new MockCoordinatorTimer<>(new MockTime()),
+            mock(GroupCoordinatorConfig.class),
+            coordinatorMetrics,
+            metricsShard
+        );
+
+        OffsetCommitKey key = new OffsetCommitKey();
+        OffsetCommitValue value = new OffsetCommitValue();
+
+        coordinator.replay(100L, (short) 0, new Record(
+            new ApiMessageAndVersion(key, (short) 0),
+            new ApiMessageAndVersion(value, (short) 0)
+        ));
+
+        coordinator.replay(101L, (short) 1, new Record(
+            new ApiMessageAndVersion(key, (short) 1),
+            new ApiMessageAndVersion(value, (short) 0)
+        ));
+
+        verify(offsetMetadataManager, times(1)).replay(
+            100L,
+            key,
+            value
+        );
+
+        verify(offsetMetadataManager, times(1)).replay(
+            101L,
+            key,
+            value
+        );
     }
 
     @Test
@@ -330,7 +411,11 @@ public class GroupCoordinatorShardTest {
             null
         ));
 
-        verify(offsetMetadataManager, times(2)).replay(key, null);
+        verify(offsetMetadataManager, times(2)).replay(
+            RecordBatch.NO_PRODUCER_ID,
+            key,
+            null
+        );
     }
 
     @Test
diff --git 
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/OffsetAndMetadataTest.java
 
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/OffsetAndMetadataTest.java
index 946fa701c61..46fe369974b 100644
--- 
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/OffsetAndMetadataTest.java
+++ 
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/OffsetAndMetadataTest.java
@@ -17,6 +17,7 @@
 package org.apache.kafka.coordinator.group;
 
 import org.apache.kafka.common.message.OffsetCommitRequestData;
+import org.apache.kafka.common.message.TxnOffsetCommitRequestData;
 import org.apache.kafka.coordinator.group.generated.OffsetCommitValue;
 import org.apache.kafka.server.util.MockTime;
 import org.junit.jupiter.api.Test;
@@ -133,4 +134,46 @@ public class OffsetAndMetadataTest {
             )
         );
     }
+
+    @Test
+    public void testFromTransactionalRequest() {
+        MockTime time = new MockTime();
+
+        TxnOffsetCommitRequestData.TxnOffsetCommitRequestPartition partition =
+            new TxnOffsetCommitRequestData.TxnOffsetCommitRequestPartition()
+                .setPartitionIndex(0)
+                .setCommittedOffset(100L)
+                .setCommittedLeaderEpoch(-1)
+                .setCommittedMetadata(null);
+
+        assertEquals(
+            new OffsetAndMetadata(
+                100L,
+                OptionalInt.empty(),
+                "",
+                time.milliseconds(),
+                OptionalLong.empty()
+            ), OffsetAndMetadata.fromRequest(
+                partition,
+                time.milliseconds()
+            )
+        );
+
+        partition
+            .setCommittedLeaderEpoch(10)
+            .setCommittedMetadata("hello");
+
+        assertEquals(
+            new OffsetAndMetadata(
+                100L,
+                OptionalInt.of(10),
+                "hello",
+                time.milliseconds(),
+                OptionalLong.empty()
+            ), OffsetAndMetadata.fromRequest(
+                partition,
+                time.milliseconds()
+            )
+        );
+    }
 }
diff --git 
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/OffsetMetadataManagerTest.java
 
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/OffsetMetadataManagerTest.java
index 792d1f62507..bf1ef890fff 100644
--- 
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/OffsetMetadataManagerTest.java
+++ 
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/OffsetMetadataManagerTest.java
@@ -31,11 +31,14 @@ import 
org.apache.kafka.common.message.OffsetDeleteRequestData;
 import org.apache.kafka.common.message.OffsetDeleteResponseData;
 import org.apache.kafka.common.message.OffsetFetchRequestData;
 import org.apache.kafka.common.message.OffsetFetchResponseData;
+import org.apache.kafka.common.message.TxnOffsetCommitRequestData;
+import org.apache.kafka.common.message.TxnOffsetCommitResponseData;
 import org.apache.kafka.common.network.ClientInformation;
 import org.apache.kafka.common.network.ListenerName;
 import org.apache.kafka.common.protocol.ApiKeys;
 import org.apache.kafka.common.protocol.ApiMessage;
 import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.record.RecordBatch;
 import org.apache.kafka.common.requests.RequestContext;
 import org.apache.kafka.common.requests.RequestHeader;
 import org.apache.kafka.common.security.auth.KafkaPrincipal;
@@ -187,8 +190,6 @@ public class OffsetMetadataManagerTest {
             short version,
             OffsetCommitRequestData request
         ) {
-            snapshotRegistry.getOrCreateSnapshot(lastCommittedOffset);
-
             RequestContext context = new RequestContext(
                 new RequestHeader(
                     ApiKeys.OFFSET_COMMIT,
@@ -214,6 +215,38 @@ public class OffsetMetadataManagerTest {
             return result;
         }
 
+        public CoordinatorResult<TxnOffsetCommitResponseData, Record> 
commitTransactionalOffset(
+            TxnOffsetCommitRequestData request
+        ) {
+            RequestContext context = new RequestContext(
+                new RequestHeader(
+                    ApiKeys.TXN_OFFSET_COMMIT,
+                    ApiKeys.TXN_OFFSET_COMMIT.latestVersion(),
+                    "client",
+                    0
+                ),
+                "1",
+                InetAddress.getLoopbackAddress(),
+                KafkaPrincipal.ANONYMOUS,
+                ListenerName.forSecurityProtocol(SecurityProtocol.PLAINTEXT),
+                SecurityProtocol.PLAINTEXT,
+                ClientInformation.EMPTY,
+                false
+            );
+
+            CoordinatorResult<TxnOffsetCommitResponseData, Record> result = 
offsetMetadataManager.commitTransactionalOffset(
+                context,
+                request
+            );
+
+            result.records().forEach(record -> replay(
+                request.producerId(),
+                record
+            ));
+
+            return result;
+        }
+
         public CoordinatorResult<OffsetDeleteResponseData, Record> 
deleteOffsets(
             OffsetDeleteRequestData request
         ) {
@@ -372,6 +405,16 @@ public class OffsetMetadataManagerTest {
 
         private void replay(
             Record record
+        ) {
+            replay(
+                RecordBatch.NO_PRODUCER_ID,
+                record
+            );
+        }
+
+        private void replay(
+            long producerId,
+            Record record
         ) {
             snapshotRegistry.getOrCreateSnapshot(lastWrittenOffset);
 
@@ -385,6 +428,7 @@ public class OffsetMetadataManagerTest {
             switch (key.version()) {
                 case OffsetCommitKey.HIGHEST_SUPPORTED_VERSION:
                     offsetMetadataManager.replay(
+                        producerId,
                         (OffsetCommitKey) key.message(),
                         (OffsetCommitValue) messageOrNull(value)
                     );
@@ -1276,6 +1320,324 @@ public class OffsetMetadataManagerTest {
         );
     }
 
+    @Test
+    public void testConsumerGroupTransactionalOffsetCommit() {
+        OffsetMetadataManagerTestContext context = new 
OffsetMetadataManagerTestContext.Builder().build();
+
+        // Create an empty group.
+        ConsumerGroup group = 
context.groupMetadataManager.getOrMaybeCreateConsumerGroup(
+            "foo",
+            true
+        );
+
+        // Add member.
+        group.updateMember(new ConsumerGroupMember.Builder("member")
+            .setMemberEpoch(10)
+            .setTargetMemberEpoch(10)
+            .setPreviousMemberEpoch(10)
+            .build()
+        );
+
+        CoordinatorResult<TxnOffsetCommitResponseData, Record> result = 
context.commitTransactionalOffset(
+            new TxnOffsetCommitRequestData()
+                .setGroupId("foo")
+                .setMemberId("member")
+                .setGenerationId(10)
+                .setTopics(Collections.singletonList(
+                    new 
TxnOffsetCommitRequestData.TxnOffsetCommitRequestTopic()
+                        .setName("bar")
+                        .setPartitions(Collections.singletonList(
+                            new 
TxnOffsetCommitRequestData.TxnOffsetCommitRequestPartition()
+                                .setPartitionIndex(0)
+                                .setCommittedOffset(100L)
+                                .setCommittedLeaderEpoch(10)
+                                .setCommittedMetadata("metadata")
+                        ))
+                ))
+        );
+
+        assertEquals(
+            new TxnOffsetCommitResponseData()
+                .setTopics(Collections.singletonList(
+                    new 
TxnOffsetCommitResponseData.TxnOffsetCommitResponseTopic()
+                        .setName("bar")
+                        .setPartitions(Collections.singletonList(
+                            new 
TxnOffsetCommitResponseData.TxnOffsetCommitResponsePartition()
+                                .setPartitionIndex(0)
+                                .setErrorCode(Errors.NONE.code())
+                        ))
+                )),
+            result.response()
+        );
+
+        assertEquals(
+            Collections.singletonList(RecordHelpers.newOffsetCommitRecord(
+                "foo",
+                "bar",
+                0,
+                new OffsetAndMetadata(
+                    100L,
+                    OptionalInt.of(10),
+                    "metadata",
+                    context.time.milliseconds(),
+                    OptionalLong.empty()
+                ),
+                MetadataImage.EMPTY.features().metadataVersion()
+            )),
+            result.records()
+        );
+    }
+
+    @Test
+    public void testConsumerGroupTransactionalOffsetCommitWithUnknownGroupId() 
{
+        OffsetMetadataManagerTestContext context = new 
OffsetMetadataManagerTestContext.Builder().build();
+
+        assertThrows(IllegalGenerationException.class, () -> 
context.commitTransactionalOffset(
+            new TxnOffsetCommitRequestData()
+                .setGroupId("foo")
+                .setMemberId("member")
+                .setGenerationId(10)
+                .setTopics(Collections.singletonList(
+                    new 
TxnOffsetCommitRequestData.TxnOffsetCommitRequestTopic()
+                        .setName("bar")
+                        .setPartitions(Collections.singletonList(
+                            new 
TxnOffsetCommitRequestData.TxnOffsetCommitRequestPartition()
+                                .setPartitionIndex(0)
+                                .setCommittedOffset(100L)
+                                .setCommittedLeaderEpoch(10)
+                                .setCommittedMetadata("metadata")
+                        ))
+                ))
+        ));
+    }
+
+    @Test
+    public void 
testConsumerGroupTransactionalOffsetCommitWithUnknownMemberId() {
+        OffsetMetadataManagerTestContext context = new 
OffsetMetadataManagerTestContext.Builder().build();
+
+        // Create an empty group.
+        context.groupMetadataManager.getOrMaybeCreateConsumerGroup(
+            "foo",
+            true
+        );
+
+        assertThrows(UnknownMemberIdException.class, () -> 
context.commitTransactionalOffset(
+            new TxnOffsetCommitRequestData()
+                .setGroupId("foo")
+                .setMemberId("member")
+                .setGenerationId(10)
+                .setTopics(Collections.singletonList(
+                    new 
TxnOffsetCommitRequestData.TxnOffsetCommitRequestTopic()
+                        .setName("bar")
+                        .setPartitions(Collections.singletonList(
+                            new 
TxnOffsetCommitRequestData.TxnOffsetCommitRequestPartition()
+                                .setPartitionIndex(0)
+                                .setCommittedOffset(100L)
+                                .setCommittedLeaderEpoch(10)
+                                .setCommittedMetadata("metadata")
+                        ))
+                ))
+        ));
+    }
+
+    @Test
+    public void 
testConsumerGroupTransactionalOffsetCommitWithStaleMemberEpoch() {
+        OffsetMetadataManagerTestContext context = new 
OffsetMetadataManagerTestContext.Builder().build();
+
+        // Create an empty group.
+        ConsumerGroup group = 
context.groupMetadataManager.getOrMaybeCreateConsumerGroup(
+            "foo",
+            true
+        );
+
+        // Add member.
+        group.updateMember(new ConsumerGroupMember.Builder("member")
+            .setMemberEpoch(10)
+            .setTargetMemberEpoch(10)
+            .setPreviousMemberEpoch(10)
+            .build()
+        );
+
+        assertThrows(IllegalGenerationException.class, () -> 
context.commitTransactionalOffset(
+            new TxnOffsetCommitRequestData()
+                .setGroupId("foo")
+                .setMemberId("member")
+                .setGenerationId(100)
+                .setTopics(Collections.singletonList(
+                    new 
TxnOffsetCommitRequestData.TxnOffsetCommitRequestTopic()
+                        .setName("bar")
+                        .setPartitions(Collections.singletonList(
+                            new 
TxnOffsetCommitRequestData.TxnOffsetCommitRequestPartition()
+                                .setPartitionIndex(0)
+                                .setCommittedOffset(100L)
+                                .setCommittedLeaderEpoch(10)
+                                .setCommittedMetadata("metadata")
+                        ))
+                ))
+        ));
+    }
+
+    @Test
+    public void testGenericGroupTransactionalOffsetCommit() {
+        OffsetMetadataManagerTestContext context = new 
OffsetMetadataManagerTestContext.Builder().build();
+
+        // Create a group.
+        GenericGroup group = 
context.groupMetadataManager.getOrMaybeCreateGenericGroup(
+            "foo",
+            true
+        );
+
+        // Add member.
+        GenericGroupMember member = mkGenericMember("member", 
Optional.empty());
+        group.add(member);
+
+        // Transition to next generation.
+        group.transitionTo(GenericGroupState.PREPARING_REBALANCE);
+        group.initNextGeneration();
+        assertEquals(1, group.generationId());
+        group.transitionTo(GenericGroupState.STABLE);
+
+        CoordinatorResult<TxnOffsetCommitResponseData, Record> result = 
context.commitTransactionalOffset(
+            new TxnOffsetCommitRequestData()
+                .setGroupId("foo")
+                .setMemberId("member")
+                .setGenerationId(1)
+                .setTopics(Collections.singletonList(
+                    new 
TxnOffsetCommitRequestData.TxnOffsetCommitRequestTopic()
+                        .setName("bar")
+                        .setPartitions(Collections.singletonList(
+                            new 
TxnOffsetCommitRequestData.TxnOffsetCommitRequestPartition()
+                                .setPartitionIndex(0)
+                                .setCommittedOffset(100L)
+                                .setCommittedLeaderEpoch(10)
+                                .setCommittedMetadata("metadata")
+                        ))
+                ))
+        );
+
+        assertEquals(
+            new TxnOffsetCommitResponseData()
+                .setTopics(Collections.singletonList(
+                    new 
TxnOffsetCommitResponseData.TxnOffsetCommitResponseTopic()
+                        .setName("bar")
+                        .setPartitions(Collections.singletonList(
+                            new 
TxnOffsetCommitResponseData.TxnOffsetCommitResponsePartition()
+                                .setPartitionIndex(0)
+                                .setErrorCode(Errors.NONE.code())
+                        ))
+                )),
+            result.response()
+        );
+
+        assertEquals(
+            Collections.singletonList(RecordHelpers.newOffsetCommitRecord(
+                "foo",
+                "bar",
+                0,
+                new OffsetAndMetadata(
+                    100L,
+                    OptionalInt.of(10),
+                    "metadata",
+                    context.time.milliseconds(),
+                    OptionalLong.empty()
+                ),
+                MetadataImage.EMPTY.features().metadataVersion()
+            )),
+            result.records()
+        );
+    }
+
+    @Test
+    public void testGenericGroupTransactionalOffsetCommitWithUnknownGroupId() {
+        OffsetMetadataManagerTestContext context = new 
OffsetMetadataManagerTestContext.Builder().build();
+
+        assertThrows(IllegalGenerationException.class, () -> 
context.commitTransactionalOffset(
+            new TxnOffsetCommitRequestData()
+                .setGroupId("foo")
+                .setMemberId("member")
+                .setGenerationId(10)
+                .setTopics(Collections.singletonList(
+                    new 
TxnOffsetCommitRequestData.TxnOffsetCommitRequestTopic()
+                        .setName("bar")
+                        .setPartitions(Collections.singletonList(
+                            new 
TxnOffsetCommitRequestData.TxnOffsetCommitRequestPartition()
+                                .setPartitionIndex(0)
+                                .setCommittedOffset(100L)
+                                .setCommittedLeaderEpoch(10)
+                                .setCommittedMetadata("metadata")
+                        ))
+                ))
+        ));
+    }
+
+    @Test
+    public void testGenericGroupTransactionalOffsetCommitWithUnknownMemberId() 
{
+        OffsetMetadataManagerTestContext context = new 
OffsetMetadataManagerTestContext.Builder().build();
+
+        // Create an empty group.
+        context.groupMetadataManager.getOrMaybeCreateGenericGroup(
+            "foo",
+            true
+        );
+
+        assertThrows(UnknownMemberIdException.class, () -> 
context.commitTransactionalOffset(
+            new TxnOffsetCommitRequestData()
+                .setGroupId("foo")
+                .setMemberId("member")
+                .setGenerationId(10)
+                .setTopics(Collections.singletonList(
+                    new 
TxnOffsetCommitRequestData.TxnOffsetCommitRequestTopic()
+                        .setName("bar")
+                        .setPartitions(Collections.singletonList(
+                            new 
TxnOffsetCommitRequestData.TxnOffsetCommitRequestPartition()
+                                .setPartitionIndex(0)
+                                .setCommittedOffset(100L)
+                                .setCommittedLeaderEpoch(10)
+                                .setCommittedMetadata("metadata")
+                        ))
+                ))
+        ));
+    }
+
+    @Test
+    public void 
testGenericGroupTransactionalOffsetCommitWithIllegalGenerationId() {
+        OffsetMetadataManagerTestContext context = new 
OffsetMetadataManagerTestContext.Builder().build();
+
+        // Create a group.
+        GenericGroup group = 
context.groupMetadataManager.getOrMaybeCreateGenericGroup(
+            "foo",
+            true
+        );
+
+        // Add member.
+        GenericGroupMember member = mkGenericMember("member", 
Optional.empty());
+        group.add(member);
+
+        // Transition to next generation.
+        group.transitionTo(GenericGroupState.PREPARING_REBALANCE);
+        group.initNextGeneration();
+        assertEquals(1, group.generationId());
+        group.transitionTo(GenericGroupState.STABLE);
+
+        assertThrows(IllegalGenerationException.class, () -> 
context.commitTransactionalOffset(
+            new TxnOffsetCommitRequestData()
+                .setGroupId("foo")
+                .setMemberId("member")
+                .setGenerationId(100)
+                .setTopics(Collections.singletonList(
+                    new 
TxnOffsetCommitRequestData.TxnOffsetCommitRequestTopic()
+                        .setName("bar")
+                        .setPartitions(Collections.singletonList(
+                            new 
TxnOffsetCommitRequestData.TxnOffsetCommitRequestPartition()
+                                .setPartitionIndex(0)
+                                .setCommittedOffset(100L)
+                                .setCommittedLeaderEpoch(10)
+                                .setCommittedMetadata("metadata")
+                        ))
+                ))
+        ));
+    }
+
     @Test
     public void testGenericGroupFetchOffsetsWithDeadGroup() {
         OffsetMetadataManagerTestContext context = new 
OffsetMetadataManagerTestContext.Builder().build();
@@ -1975,6 +2337,59 @@ public class OffsetMetadataManagerTest {
         ));
     }
 
+    @Test
+    public void testTransactionalReplay() {
+        OffsetMetadataManagerTestContext context = new 
OffsetMetadataManagerTestContext.Builder().build();
+
+        verifyTransactionalReplay(context, 5, "foo", "bar", 0, new 
OffsetAndMetadata(
+            100L,
+            OptionalInt.empty(),
+            "small",
+            context.time.milliseconds(),
+            OptionalLong.empty()
+        ));
+
+        verifyTransactionalReplay(context, 5, "foo", "bar", 1, new 
OffsetAndMetadata(
+            101L,
+            OptionalInt.empty(),
+            "small",
+            context.time.milliseconds(),
+            OptionalLong.empty()
+        ));
+
+        verifyTransactionalReplay(context, 5, "bar", "zar", 0, new 
OffsetAndMetadata(
+            100L,
+            OptionalInt.empty(),
+            "small",
+            context.time.milliseconds(),
+            OptionalLong.empty()
+        ));
+
+        verifyTransactionalReplay(context, 5, "bar", "zar", 1, new 
OffsetAndMetadata(
+            101L,
+            OptionalInt.empty(),
+            "small",
+            context.time.milliseconds(),
+            OptionalLong.empty()
+        ));
+
+        verifyTransactionalReplay(context, 6, "foo", "bar", 2, new 
OffsetAndMetadata(
+            102L,
+            OptionalInt.empty(),
+            "small",
+            context.time.milliseconds(),
+            OptionalLong.empty()
+        ));
+
+        verifyTransactionalReplay(context, 6, "foo", "bar", 3, new 
OffsetAndMetadata(
+            102L,
+            OptionalInt.empty(),
+            "small",
+            context.time.milliseconds(),
+            OptionalLong.empty()
+        ));
+    }
+
     @Test
     public void testReplayWithTombstone() {
         OffsetMetadataManagerTestContext context = new 
OffsetMetadataManagerTestContext.Builder().build();
@@ -2143,6 +2558,30 @@ public class OffsetMetadataManagerTest {
         ));
     }
 
+    private void verifyTransactionalReplay(
+        OffsetMetadataManagerTestContext context,
+        long producerId,
+        String groupId,
+        String topic,
+        int partition,
+        OffsetAndMetadata offsetAndMetadata
+    ) {
+        context.replay(producerId, RecordHelpers.newOffsetCommitRecord(
+            groupId,
+            topic,
+            partition,
+            offsetAndMetadata,
+            MetadataImage.EMPTY.features().metadataVersion()
+        ));
+
+        assertEquals(offsetAndMetadata, 
context.offsetMetadataManager.pendingTransactionalOffset(
+            producerId,
+            groupId,
+            topic,
+            partition
+        ));
+    }
+
     private GenericGroupMember mkGenericMember(
         String memberId,
         Optional<String> groupInstanceId
diff --git 
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroupTest.java
 
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroupTest.java
index f54ad25e321..7263181322f 100644
--- 
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroupTest.java
+++ 
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroupTest.java
@@ -32,6 +32,8 @@ import 
org.apache.kafka.coordinator.group.metrics.GroupCoordinatorMetricsShard;
 import org.apache.kafka.image.MetadataImage;
 import org.apache.kafka.timeline.SnapshotRegistry;
 import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
 
 import java.util.Arrays;
 import java.util.Collections;
@@ -670,31 +672,32 @@ public class ConsumerGroupTest {
         assertEquals(0, group.metadataRefreshDeadline().epoch);
     }
 
-    @Test
-    public void testValidateOffsetCommit() {
+    @ParameterizedTest
+    @ValueSource(booleans = {false, true})
+    public void testValidateOffsetCommit(boolean isTransactional) {
         ConsumerGroup group = createConsumerGroup("group-foo");
 
         // Simulate a call from the admin client without member id and member 
epoch.
         // This should pass only if the group is empty.
-        group.validateOffsetCommit("", "", -1);
+        group.validateOffsetCommit("", "", -1, isTransactional);
 
         // The member does not exist.
         assertThrows(UnknownMemberIdException.class, () ->
-            group.validateOffsetCommit("member-id", null, 0));
+            group.validateOffsetCommit("member-id", null, 0, isTransactional));
 
         // Create a member.
         group.getOrMaybeCreateMember("member-id", true);
 
         // A call from the admin client should fail as the group is not empty.
         assertThrows(UnknownMemberIdException.class, () ->
-            group.validateOffsetCommit("", "", -1));
+            group.validateOffsetCommit("", "", -1, isTransactional));
 
         // The member epoch is stale.
         assertThrows(StaleMemberEpochException.class, () ->
-            group.validateOffsetCommit("member-id", "", 10));
+            group.validateOffsetCommit("member-id", "", 10, isTransactional));
 
         // This should succeed.
-        group.validateOffsetCommit("member-id", "", 0);
+        group.validateOffsetCommit("member-id", "", 0, isTransactional);
     }
 
     @Test
diff --git 
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/generic/GenericGroupTest.java
 
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/generic/GenericGroupTest.java
index e5e71aaf2ab..6aa36541a5b 100644
--- 
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/generic/GenericGroupTest.java
+++ 
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/generic/GenericGroupTest.java
@@ -984,7 +984,7 @@ public class GenericGroupTest {
     @Test
     public void testValidateOffsetCommit() {
         // A call from the admin client without any parameters should pass.
-        group.validateOffsetCommit("", "", -1);
+        group.validateOffsetCommit("", "", -1, false);
 
         // Add a member.
         group.add(new GenericGroupMember(
@@ -1006,36 +1006,40 @@ public class GenericGroupTest {
 
         // No parameters and the group is not empty.
         assertThrows(UnknownMemberIdException.class,
-            () -> group.validateOffsetCommit("", "", -1));
+            () -> group.validateOffsetCommit("", "", -1, false));
+
+        // A transactional offset commit without any parameters
+        // and a non-empty group is accepted.
+        group.validateOffsetCommit("", null, -1, true);
 
         // The member id does not exist.
         assertThrows(UnknownMemberIdException.class,
-            () -> group.validateOffsetCommit("unknown", "unknown", -1));
+            () -> group.validateOffsetCommit("unknown", "unknown", -1, false));
 
         // The instance id does not exist.
         assertThrows(UnknownMemberIdException.class,
-            () -> group.validateOffsetCommit("member-id", "unknown", -1));
+            () -> group.validateOffsetCommit("member-id", "unknown", -1, 
false));
 
         // The generation id is invalid.
         assertThrows(IllegalGenerationException.class,
-            () -> group.validateOffsetCommit("member-id", "instance-id", 0));
+            () -> group.validateOffsetCommit("member-id", "instance-id", 0, 
false));
 
         // Group is in prepare rebalance state.
         assertThrows(RebalanceInProgressException.class,
-            () -> group.validateOffsetCommit("member-id", "instance-id", 1));
+            () -> group.validateOffsetCommit("member-id", "instance-id", 1, 
false));
 
         // Group transitions to stable.
         group.transitionTo(STABLE);
 
         // This should work.
-        group.validateOffsetCommit("member-id", "instance-id", 1);
+        group.validateOffsetCommit("member-id", "instance-id", 1, false);
 
         // Replace static member.
         group.replaceStaticMember("instance-id", "member-id", "new-member-id");
 
         // The old instance id should be fenced.
         assertThrows(FencedInstanceIdException.class,
-            () -> group.validateOffsetCommit("member-id", "instance-id", 1));
+            () -> group.validateOffsetCommit("member-id", "instance-id", 1, 
false));
 
         // Remove member and transitions to dead.
         group.remove("new-instance-id");
@@ -1043,7 +1047,7 @@ public class GenericGroupTest {
 
         // This should fail with CoordinatorNotAvailableException.
         assertThrows(CoordinatorNotAvailableException.class,
-            () -> group.validateOffsetCommit("member-id", "new-instance-id", 
1));
+            () -> group.validateOffsetCommit("member-id", "new-instance-id", 
1, false));
     }
 
     @Test

Reply via email to