This is an automated email from the ASF dual-hosted git repository.
dajac pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 522c2864cd5 KAFKA-14505; [2/N] Implement TxnOffsetCommit API (#14845)
522c2864cd5 is described below
commit 522c2864cd53e1a61dbc296a8cda062abf1002c5
Author: David Jacot <[email protected]>
AuthorDate: Thu Dec 7 11:51:22 2023 +0100
KAFKA-14505; [2/N] Implement TxnOffsetCommit API (#14845)
This patch implements the TxnOffsetCommit API. When a transactional offset
commit is received, it is stored in the pending transactional offsets structure
and waits there until the transaction is committed or aborted. Note that the
handling of the transaction completion is not implemented in this patch.
Reviewers: Justine Olshan <[email protected]>
---
checkstyle/suppressions.xml | 2 +-
.../org/apache/kafka/coordinator/group/Group.java | 4 +-
.../coordinator/group/GroupCoordinatorService.java | 18 +-
.../coordinator/group/GroupCoordinatorShard.java | 19 +
.../kafka/coordinator/group/OffsetAndMetadata.java | 18 +
.../coordinator/group/OffsetMetadataManager.java | 315 +++++++++++----
.../coordinator/group/consumer/ConsumerGroup.java | 5 +-
.../coordinator/group/generic/GenericGroup.java | 14 +-
.../group/GroupCoordinatorServiceTest.java | 126 ++++++
.../group/GroupCoordinatorShardTest.java | 89 ++++-
.../coordinator/group/OffsetAndMetadataTest.java | 43 ++
.../group/OffsetMetadataManagerTest.java | 443 ++++++++++++++++++++-
.../group/consumer/ConsumerGroupTest.java | 17 +-
.../group/generic/GenericGroupTest.java | 22 +-
14 files changed, 1028 insertions(+), 107 deletions(-)
diff --git a/checkstyle/suppressions.xml b/checkstyle/suppressions.xml
index 5e582599a40..bd31d3f3b80 100644
--- a/checkstyle/suppressions.xml
+++ b/checkstyle/suppressions.xml
@@ -338,7 +338,7 @@
<suppress checks="ParameterNumber"
files="(ConsumerGroupMember|GroupMetadataManager|GroupCoordinatorConfig).java"/>
<suppress checks="ClassDataAbstractionCouplingCheck"
-
files="(RecordHelpersTest|GroupMetadataManager|GroupMetadataManagerTest|GroupCoordinatorServiceTest|GroupCoordinatorShardTest).java"/>
+
files="(RecordHelpersTest|GroupMetadataManager|GroupMetadataManagerTest|OffsetMetadataManagerTest|GroupCoordinatorServiceTest|GroupCoordinatorShardTest).java"/>
<suppress checks="JavaNCSS"
files="GroupMetadataManagerTest.java"/>
diff --git
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/Group.java
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/Group.java
index 0cb10b12a51..9939b25737a 100644
---
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/Group.java
+++
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/Group.java
@@ -74,11 +74,13 @@ public interface Group {
* @param groupInstanceId The group instance id.
* @param generationIdOrMemberEpoch The generation id for generic groups
or the member epoch
* for consumer groups.
+ * @param isTransactional Whether the offset commit is
transactional or not.
*/
void validateOffsetCommit(
String memberId,
String groupInstanceId,
- int generationIdOrMemberEpoch
+ int generationIdOrMemberEpoch,
+ boolean isTransactional
) throws KafkaException;
/**
diff --git
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java
index a05402d410b..ec0de853d53 100644
---
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java
+++
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java
@@ -77,7 +77,6 @@ import
org.apache.kafka.coordinator.group.runtime.PartitionWriter;
import org.apache.kafka.image.MetadataDelta;
import org.apache.kafka.image.MetadataImage;
import org.apache.kafka.server.record.BrokerCompressionType;
-import org.apache.kafka.server.util.FutureUtils;
import org.apache.kafka.server.util.timer.Timer;
import org.slf4j.Logger;
@@ -884,9 +883,20 @@ public class GroupCoordinatorService implements
GroupCoordinator {
));
}
- return FutureUtils.failedFuture(Errors.UNSUPPORTED_VERSION.exception(
- "This API is not implemented yet."
- ));
+ if (!isGroupIdNotEmpty(request.groupId())) {
+ return
CompletableFuture.completedFuture(TxnOffsetCommitRequest.getErrorResponse(
+ request,
+ Errors.INVALID_GROUP_ID
+ ));
+ }
+
+ return runtime.scheduleWriteOperation(
+ "txn-commit-offset",
+ topicPartitionFor(request.groupId()),
+ coordinator -> coordinator.commitTransactionalOffset(context,
request)
+ ).exceptionally(exception ->
+ TxnOffsetCommitRequest.getErrorResponse(request,
normalizeException(exception))
+ );
}
/**
diff --git
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorShard.java
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorShard.java
index 3364260a5b5..b68549ef157 100644
---
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorShard.java
+++
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorShard.java
@@ -38,6 +38,8 @@ import
org.apache.kafka.common.message.OffsetFetchResponseData;
import org.apache.kafka.common.message.SyncGroupRequestData;
import org.apache.kafka.common.message.SyncGroupResponseData;
import org.apache.kafka.common.errors.ApiException;
+import org.apache.kafka.common.message.TxnOffsetCommitRequestData;
+import org.apache.kafka.common.message.TxnOffsetCommitResponseData;
import org.apache.kafka.common.protocol.ApiMessage;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.requests.RequestContext;
@@ -454,6 +456,22 @@ public class GroupCoordinatorShard implements
CoordinatorShard<Record> {
return offsetMetadataManager.commitOffset(context, request);
}
+ /**
+ * Handles a TxnOffsetCommit request.
+ *
+ * @param context The request context.
+ * @param request The actual TxnOffsetCommit request.
+ *
+ * @return A Result containing the TxnOffsetCommitResponse response and
+ * a list of records to update the state machine.
+ */
+ public CoordinatorResult<TxnOffsetCommitResponseData, Record>
commitTransactionalOffset(
+ RequestContext context,
+ TxnOffsetCommitRequestData request
+ ) throws ApiException {
+ return offsetMetadataManager.commitTransactionalOffset(context,
request);
+ }
+
/**
* Handles a ListGroups request.
*
@@ -638,6 +656,7 @@ public class GroupCoordinatorShard implements
CoordinatorShard<Record> {
case 0:
case 1:
offsetMetadataManager.replay(
+ producerId,
(OffsetCommitKey) key.message(),
(OffsetCommitValue) messageOrNull(value)
);
diff --git
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/OffsetAndMetadata.java
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/OffsetAndMetadata.java
index cf0b936917d..0eb1afaead5 100644
---
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/OffsetAndMetadata.java
+++
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/OffsetAndMetadata.java
@@ -17,6 +17,7 @@
package org.apache.kafka.coordinator.group;
import org.apache.kafka.common.message.OffsetCommitRequestData;
+import org.apache.kafka.common.message.TxnOffsetCommitRequestData;
import org.apache.kafka.common.requests.OffsetCommitRequest;
import org.apache.kafka.coordinator.group.generated.OffsetCommitValue;
@@ -141,4 +142,21 @@ public class OffsetAndMetadata {
expireTimestampMs
);
}
+
+ /**
+ * @return An OffsetAndMetadata created from a
TxnOffsetCommitRequestPartition request.
+ */
+ public static OffsetAndMetadata fromRequest(
+ TxnOffsetCommitRequestData.TxnOffsetCommitRequestPartition partition,
+ long currentTimeMs
+ ) {
+ return new OffsetAndMetadata(
+ partition.committedOffset(),
+ ofSentinel(partition.committedLeaderEpoch()),
+ partition.committedMetadata() == null ?
+ OffsetAndMetadata.NO_METADATA : partition.committedMetadata(),
+ currentTimeMs,
+ OptionalLong.empty()
+ );
+ }
}
diff --git
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/OffsetMetadataManager.java
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/OffsetMetadataManager.java
index faff257d584..fce7140559f 100644
---
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/OffsetMetadataManager.java
+++
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/OffsetMetadataManager.java
@@ -28,7 +28,12 @@ import
org.apache.kafka.common.message.OffsetDeleteRequestData;
import org.apache.kafka.common.message.OffsetFetchRequestData;
import org.apache.kafka.common.message.OffsetFetchResponseData;
import org.apache.kafka.common.message.OffsetDeleteResponseData;
+import org.apache.kafka.common.message.TxnOffsetCommitRequestData;
+import org.apache.kafka.common.message.TxnOffsetCommitResponseData;
+import
org.apache.kafka.common.message.TxnOffsetCommitResponseData.TxnOffsetCommitResponseTopic;
+import
org.apache.kafka.common.message.TxnOffsetCommitResponseData.TxnOffsetCommitResponsePartition;
import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.record.RecordBatch;
import org.apache.kafka.common.requests.OffsetCommitRequest;
import org.apache.kafka.common.requests.RequestContext;
import org.apache.kafka.common.utils.LogContext;
@@ -178,9 +183,83 @@ public class OffsetMetadataManager {
private final GroupCoordinatorConfig config;
/**
- * The offsets keyed by group id, topic name and partition id.
+ * The committed offsets.
*/
- private final TimelineHashMap<String, TimelineHashMap<String,
TimelineHashMap<Integer, OffsetAndMetadata>>> offsetsByGroup;
+ private final Offsets offsets;
+
+ /**
+ * The pending transactional offsets keyed by producer id. This structure
holds all the
+ * transactional offsets that are part of ongoing transactions. When the
transaction is
+ * committed, they are transferred to `offsets`; when the transaction is
aborted, they
+ * are removed.
+ */
+ private final TimelineHashMap<Long, Offsets> pendingTransactionalOffsets;
+
+ private class Offsets {
+ /**
+ * The offsets keyed by group id, topic name and partition id.
+ */
+ private final TimelineHashMap<String, TimelineHashMap<String,
TimelineHashMap<Integer, OffsetAndMetadata>>> offsetsByGroup;
+
+ private Offsets() {
+ this.offsetsByGroup = new TimelineHashMap<>(snapshotRegistry, 0);
+ }
+
+ private OffsetAndMetadata get(
+ String groupId,
+ String topic,
+ int partition
+ ) {
+ TimelineHashMap<String, TimelineHashMap<Integer,
OffsetAndMetadata>> topicOffsets = offsetsByGroup.get(groupId);
+ if (topicOffsets == null) {
+ return null;
+ } else {
+ TimelineHashMap<Integer, OffsetAndMetadata> partitionOffsets =
topicOffsets.get(topic);
+ if (partitionOffsets == null) {
+ return null;
+ } else {
+ return partitionOffsets.get(partition);
+ }
+ }
+ }
+
+ private OffsetAndMetadata put(
+ String groupId,
+ String topic,
+ int partition,
+ OffsetAndMetadata offsetAndMetadata
+ ) {
+ TimelineHashMap<String, TimelineHashMap<Integer,
OffsetAndMetadata>> topicOffsets = offsetsByGroup
+ .computeIfAbsent(groupId, __ -> new
TimelineHashMap<>(snapshotRegistry, 0));
+ TimelineHashMap<Integer, OffsetAndMetadata> partitionOffsets =
topicOffsets
+ .computeIfAbsent(topic, __ -> new
TimelineHashMap<>(snapshotRegistry, 0));
+ return partitionOffsets.put(partition, offsetAndMetadata);
+ }
+
+ private OffsetAndMetadata remove(
+ String groupId,
+ String topic,
+ int partition
+ ) {
+ TimelineHashMap<String, TimelineHashMap<Integer,
OffsetAndMetadata>> topicOffsets = offsetsByGroup.get(groupId);
+ if (topicOffsets == null)
+ return null;
+
+ TimelineHashMap<Integer, OffsetAndMetadata> partitionOffsets =
topicOffsets.get(topic);
+ if (partitionOffsets == null)
+ return null;
+
+ OffsetAndMetadata removedValue =
partitionOffsets.remove(partition);
+
+ if (partitionOffsets.isEmpty())
+ topicOffsets.remove(topic);
+
+ if (topicOffsets.isEmpty())
+ offsetsByGroup.remove(groupId);
+
+ return removedValue;
+ }
+ }
OffsetMetadataManager(
SnapshotRegistry snapshotRegistry,
@@ -198,7 +277,8 @@ public class OffsetMetadataManager {
this.groupMetadataManager = groupMetadataManager;
this.config = config;
this.metrics = metrics;
- this.offsetsByGroup = new TimelineHashMap<>(snapshotRegistry, 0);
+ this.offsets = new Offsets();
+ this.pendingTransactionalOffsets = new
TimelineHashMap<>(snapshotRegistry, 0);
}
/**
@@ -239,7 +319,8 @@ public class OffsetMetadataManager {
group.validateOffsetCommit(
request.memberId(),
request.groupInstanceId(),
- request.generationIdOrMemberEpoch()
+ request.generationIdOrMemberEpoch(),
+ false
);
} catch (StaleMemberEpochException ex) {
// The STALE_MEMBER_EPOCH error is only returned for new consumer
group (KIP-848). When
@@ -256,6 +337,43 @@ public class OffsetMetadataManager {
return group;
}
+ /**
+ * Validates a TxnOffsetCommit request.
+ *
+ * @param request The actual request.
+ */
+ private Group validateTransactionalOffsetCommit(
+ TxnOffsetCommitRequestData request
+ ) throws ApiException {
+ Group group;
+ try {
+ group = groupMetadataManager.group(request.groupId());
+ } catch (GroupIdNotFoundException ex) {
+ if (request.generationId() < 0) {
+ // If the group does not exist and generation id is -1, the
request comes from
+ // either the admin client or a consumer which does not use
the group management
+ // facility. In this case, a so-called simple group is created
and the request
+ // is accepted.
+ group =
groupMetadataManager.getOrMaybeCreateGenericGroup(request.groupId(), true);
+ } else {
+ throw Errors.ILLEGAL_GENERATION.exception();
+ }
+ }
+
+ try {
+ group.validateOffsetCommit(
+ request.memberId(),
+ request.groupInstanceId(),
+ request.generationId(),
+ true
+ );
+ } catch (StaleMemberEpochException ex) {
+ throw Errors.ILLEGAL_GENERATION.exception();
+ }
+
+ return group;
+ }
+
/**
* Validates an OffsetFetch request.
*
@@ -287,6 +405,13 @@ public class OffsetMetadataManager {
return group;
}
+ /**
+ * @return True if the committed metadata is invalid; False otherwise.
+ */
+ private boolean isMetadataInvalid(String metadata) {
+ return metadata != null && metadata.length() >
config.offsetMetadataMaxSize;
+ }
+
/**
* Computes the expiration timestamp based on the retention time provided
in the OffsetCommit
* request.
@@ -345,7 +470,7 @@ public class OffsetMetadataManager {
response.topics().add(topicResponse);
topic.partitions().forEach(partition -> {
- if (partition.committedMetadata() != null &&
partition.committedMetadata().length() > config.offsetMetadataMaxSize) {
+ if (isMetadataInvalid(partition.committedMetadata())) {
topicResponse.partitions().add(new
OffsetCommitResponsePartition()
.setPartitionIndex(partition.partitionIndex())
.setErrorCode(Errors.OFFSET_METADATA_TOO_LARGE.code()));
@@ -382,6 +507,66 @@ public class OffsetMetadataManager {
return new CoordinatorResult<>(records, response);
}
+ /**
+ * Handles a TxnOffsetCommit request.
+ *
+ * @param context The request context.
+ * @param request The TxnOffsetCommit request.
+ *
+ * @return A Result containing the TxnOffsetCommitResponseData response and
+ * a list of records to update the state machine.
+ */
+ public CoordinatorResult<TxnOffsetCommitResponseData, Record>
commitTransactionalOffset(
+ RequestContext context,
+ TxnOffsetCommitRequestData request
+ ) throws ApiException {
+ validateTransactionalOffsetCommit(request);
+
+ final TxnOffsetCommitResponseData response = new
TxnOffsetCommitResponseData();
+ final List<Record> records = new ArrayList<>();
+ final long currentTimeMs = time.milliseconds();
+
+ request.topics().forEach(topic -> {
+ final TxnOffsetCommitResponseTopic topicResponse = new
TxnOffsetCommitResponseTopic().setName(topic.name());
+ response.topics().add(topicResponse);
+
+ topic.partitions().forEach(partition -> {
+ if (isMetadataInvalid(partition.committedMetadata())) {
+ topicResponse.partitions().add(new
TxnOffsetCommitResponsePartition()
+ .setPartitionIndex(partition.partitionIndex())
+
.setErrorCode(Errors.OFFSET_METADATA_TOO_LARGE.code()));
+ } else {
+ log.debug("[GroupId {}] Committing transactional offsets
{} for partition {}-{} from member {} with leader epoch {}.",
+ request.groupId(), partition.committedOffset(),
topic.name(), partition.partitionIndex(),
+ request.memberId(), partition.committedLeaderEpoch());
+
+ topicResponse.partitions().add(new
TxnOffsetCommitResponsePartition()
+ .setPartitionIndex(partition.partitionIndex())
+ .setErrorCode(Errors.NONE.code()));
+
+ final OffsetAndMetadata offsetAndMetadata =
OffsetAndMetadata.fromRequest(
+ partition,
+ currentTimeMs
+ );
+
+ records.add(RecordHelpers.newOffsetCommitRecord(
+ request.groupId(),
+ topic.name(),
+ partition.partitionIndex(),
+ offsetAndMetadata,
+ metadataImage.features().metadataVersion()
+ ));
+ }
+ });
+ });
+
+ if (!records.isEmpty()) {
+ metrics.record(GroupCoordinatorMetrics.OFFSET_COMMITS_SENSOR_NAME,
records.size());
+ }
+
+ return new CoordinatorResult<>(records, response);
+ }
+
/**
* Handles an OffsetDelete request.
*
@@ -398,7 +583,7 @@ public class OffsetMetadataManager {
final OffsetDeleteResponseData.OffsetDeleteResponseTopicCollection
responseTopicCollection =
new OffsetDeleteResponseData.OffsetDeleteResponseTopicCollection();
final TimelineHashMap<String, TimelineHashMap<Integer,
OffsetAndMetadata>> offsetsByTopic =
- offsetsByGroup.get(request.groupId());
+ offsets.offsetsByGroup.get(request.groupId());
request.topics().forEach(topic -> {
final
OffsetDeleteResponseData.OffsetDeleteResponsePartitionCollection
responsePartitionCollection =
@@ -457,7 +642,7 @@ public class OffsetMetadataManager {
String groupId,
List<Record> records
) {
- TimelineHashMap<String, TimelineHashMap<Integer, OffsetAndMetadata>>
offsetsByTopic = offsetsByGroup.get(groupId);
+ TimelineHashMap<String, TimelineHashMap<Integer, OffsetAndMetadata>>
offsetsByTopic = offsets.offsetsByGroup.get(groupId);
AtomicInteger numDeletedOffsets = new AtomicInteger();
if (offsetsByTopic != null) {
@@ -492,7 +677,7 @@ public class OffsetMetadataManager {
final List<OffsetFetchResponseData.OffsetFetchResponseTopics>
topicResponses = new ArrayList<>(request.topics().size());
final TimelineHashMap<String, TimelineHashMap<Integer,
OffsetAndMetadata>> groupOffsets =
- failAllPartitions ? null : offsetsByGroup.get(request.groupId(),
lastCommittedOffset);
+ failAllPartitions ? null :
offsets.offsetsByGroup.get(request.groupId(), lastCommittedOffset);
request.topics().forEach(topic -> {
final OffsetFetchResponseData.OffsetFetchResponseTopics
topicResponse =
@@ -549,7 +734,7 @@ public class OffsetMetadataManager {
final List<OffsetFetchResponseData.OffsetFetchResponseTopics>
topicResponses = new ArrayList<>();
final TimelineHashMap<String, TimelineHashMap<Integer,
OffsetAndMetadata>> groupOffsets =
- offsetsByGroup.get(request.groupId(), lastCommittedOffset);
+ offsets.offsetsByGroup.get(request.groupId(), lastCommittedOffset);
if (groupOffsets != null) {
groupOffsets.entrySet(lastCommittedOffset).forEach(topicEntry -> {
@@ -587,7 +772,8 @@ public class OffsetMetadataManager {
* @return True if no offsets exist or if all offsets expired, false
otherwise.
*/
public boolean cleanupExpiredOffsets(String groupId, List<Record> records)
{
- TimelineHashMap<String, TimelineHashMap<Integer, OffsetAndMetadata>>
offsetsByTopic = offsetsByGroup.get(groupId);
+ TimelineHashMap<String, TimelineHashMap<Integer, OffsetAndMetadata>>
offsetsByTopic =
+ offsets.offsetsByGroup.get(groupId);
if (offsetsByTopic == null) {
return true;
}
@@ -649,10 +835,13 @@ public class OffsetMetadataManager {
/**
* Replays OffsetCommitKey/Value to update or delete the corresponding
offsets.
*
- * @param key A OffsetCommitKey key.
- * @param value A OffsetCommitValue value.
+ * @param producerId The producer id of the batch containing the provided
+ * key and value.
+ * @param key A OffsetCommitKey key.
+ * @param value A OffsetCommitValue value.
*/
public void replay(
+ long producerId,
OffsetCommitKey key,
OffsetCommitValue value
) {
@@ -672,18 +861,34 @@ public class OffsetMetadataManager {
groupMetadataManager.getOrMaybeCreateGenericGroup(groupId,
true);
}
- updateOffset(
- groupId,
- topic,
- partition,
- OffsetAndMetadata.fromRecord(value)
- );
+ if (producerId == RecordBatch.NO_PRODUCER_ID) {
+ // If the offset is not part of a transaction, it is directly
stored
+ // in the offsets store.
+ OffsetAndMetadata previousValue = offsets.put(
+ groupId,
+ topic,
+ partition,
+ OffsetAndMetadata.fromRecord(value)
+ );
+ if (previousValue == null) {
+ metrics.incrementLocalGauge(NUM_OFFSETS);
+ }
+ } else {
+ // Otherwise, the transaction offset is stored in the pending
transactional
+ // offsets store. Pending offsets there are moved to the main
store when
+ // the transaction is committed; or removed when the
transaction is aborted.
+ Offsets pendingOffsets =
pendingTransactionalOffsets.computeIfAbsent(producerId, __ -> new Offsets());
+ pendingOffsets.put(
+ groupId,
+ topic,
+ partition,
+ OffsetAndMetadata.fromRecord(value)
+ );
+ }
} else {
- removeOffset(
- groupId,
- topic,
- partition
- );
+ if (offsets.remove(groupId, topic, partition) != null) {
+ metrics.decrementLocalGauge(NUM_OFFSETS);
+ }
}
}
@@ -703,70 +908,28 @@ public class OffsetMetadataManager {
*
* package-private for testing.
*/
- OffsetAndMetadata offset(String groupId, String topic, int partition) {
- TimelineHashMap<String, TimelineHashMap<Integer, OffsetAndMetadata>>
topicOffsets = offsetsByGroup.get(groupId);
- if (topicOffsets == null) {
- return null;
- } else {
- TimelineHashMap<Integer, OffsetAndMetadata> partitionOffsets =
topicOffsets.get(topic);
- if (partitionOffsets == null) {
- return null;
- } else {
- return partitionOffsets.get(partition);
- }
- }
- }
-
- /**
- * Updates the offset.
- *
- * @param groupId The group id.
- * @param topic The topic name.
- * @param partition The partition id.
- * @param offsetAndMetadata The offset metadata.
- */
- private void updateOffset(
+ OffsetAndMetadata offset(
String groupId,
String topic,
- int partition,
- OffsetAndMetadata offsetAndMetadata
+ int partition
) {
- TimelineHashMap<String, TimelineHashMap<Integer, OffsetAndMetadata>>
topicOffsets = offsetsByGroup
- .computeIfAbsent(groupId, __ -> new
TimelineHashMap<>(snapshotRegistry, 0));
- TimelineHashMap<Integer, OffsetAndMetadata> partitionOffsets =
topicOffsets
- .computeIfAbsent(topic, __ -> new
TimelineHashMap<>(snapshotRegistry, 0));
- if (partitionOffsets.put(partition, offsetAndMetadata) == null) {
- metrics.incrementLocalGauge(NUM_OFFSETS);
- }
+ return offsets.get(groupId, topic, partition);
}
/**
- * Removes the offset.
+ * @return The pending transactional offset for the provided parameters or
null
+ * if it does not exist.
*
- * @param groupId The group id.
- * @param topic The topic name.
- * @param partition The partition id.
+ * package-private for testing.
*/
- private void removeOffset(
+ OffsetAndMetadata pendingTransactionalOffset(
+ long producerId,
String groupId,
String topic,
int partition
) {
- TimelineHashMap<String, TimelineHashMap<Integer, OffsetAndMetadata>>
topicOffsets = offsetsByGroup.get(groupId);
- if (topicOffsets == null)
- return;
-
- TimelineHashMap<Integer, OffsetAndMetadata> partitionOffsets =
topicOffsets.get(topic);
- if (partitionOffsets == null)
- return;
-
- partitionOffsets.remove(partition);
- metrics.decrementLocalGauge(NUM_OFFSETS);
-
- if (partitionOffsets.isEmpty())
- topicOffsets.remove(topic);
-
- if (topicOffsets.isEmpty())
- offsetsByGroup.remove(groupId);
+ Offsets offsets = pendingTransactionalOffsets.get(producerId);
+ if (offsets == null) return null;
+ return offsets.get(groupId, topic, partition);
}
}
diff --git
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroup.java
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroup.java
index 0fe5555ac31..3beaa5de1ef 100644
---
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroup.java
+++
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroup.java
@@ -653,12 +653,15 @@ public class ConsumerGroup implements Group {
* @param memberId The member id.
* @param groupInstanceId The group instance id.
* @param memberEpoch The member epoch.
+ * @param isTransactional Whether the offset commit is transactional or
not. It has no
+ * impact when a consumer group is used.
*/
@Override
public void validateOffsetCommit(
String memberId,
String groupInstanceId,
- int memberEpoch
+ int memberEpoch,
+ boolean isTransactional
) throws UnknownMemberIdException, StaleMemberEpochException {
// When the member epoch is -1, the request comes from either the
admin client
// or a consumer which does not use the group management facility. In
this case,
diff --git
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/generic/GenericGroup.java
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/generic/GenericGroup.java
index 72de3e8fd52..8903ef93213 100644
---
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/generic/GenericGroup.java
+++
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/generic/GenericGroup.java
@@ -809,12 +809,14 @@ public class GenericGroup implements Group {
* @param memberId The member id.
* @param groupInstanceId The group instance id.
* @param generationId The generation id.
+ * @param isTransactional Whether the offset commit is transactional or
not.
*/
@Override
public void validateOffsetCommit(
String memberId,
String groupInstanceId,
- int generationId
+ int generationId,
+ boolean isTransactional
) throws CoordinatorNotAvailableException, UnknownMemberIdException,
IllegalGenerationException, FencedInstanceIdException {
if (isInState(DEAD)) {
throw Errors.COORDINATOR_NOT_AVAILABLE.exception();
@@ -828,22 +830,26 @@ public class GenericGroup implements Group {
}
if (generationId >= 0 || !memberId.isEmpty() || groupInstanceId !=
null) {
- validateMember(memberId, groupInstanceId, "offset-commit");
+ validateMember(memberId, groupInstanceId, isTransactional ?
"txn-offset-commit" : "offset-commit");
if (generationId != this.generationId) {
throw Errors.ILLEGAL_GENERATION.exception();
}
- } else if (!isInState(EMPTY)) {
+ } else if (!isTransactional && !isInState(EMPTY)) {
// If the request does not contain the member id and the
generation id (version 0),
// offset commits are only accepted when the group is empty.
+ // This does not apply to transactional offset commits, since the
older versions
+ // of this protocol do not require member id and generation id.
throw Errors.UNKNOWN_MEMBER_ID.exception();
}
- if (isInState(COMPLETING_REBALANCE)) {
+ if (!isTransactional && isInState(COMPLETING_REBALANCE)) {
// We should not receive a commit request if the group has not
completed rebalance;
// but since the consumer's member.id and generation is valid, it
means it has received
// the latest group generation information from the JoinResponse.
// So let's return a REBALANCE_IN_PROGRESS to let consumer handle
it gracefully.
+ // This does not apply to transactional offset commits, since the
group state
+ // is not enforced for those.
throw Errors.REBALANCE_IN_PROGRESS.exception();
}
}
diff --git
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorServiceTest.java
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorServiceTest.java
index 4e0e2b2d573..bb813b4f700 100644
---
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorServiceTest.java
+++
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorServiceTest.java
@@ -51,6 +51,8 @@ import org.apache.kafka.common.message.LeaveGroupRequestData;
import org.apache.kafka.common.message.LeaveGroupResponseData;
import org.apache.kafka.common.message.SyncGroupRequestData;
import org.apache.kafka.common.message.SyncGroupResponseData;
+import org.apache.kafka.common.message.TxnOffsetCommitRequestData;
+import org.apache.kafka.common.message.TxnOffsetCommitResponseData;
import org.apache.kafka.common.network.ClientInformation;
import org.apache.kafka.common.network.ListenerName;
import org.apache.kafka.common.protocol.ApiKeys;
@@ -71,6 +73,7 @@ import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.MethodSource;
+import org.junit.jupiter.params.provider.NullSource;
import org.junit.jupiter.params.provider.ValueSource;
import org.mockito.ArgumentMatchers;
import org.mockito.internal.util.collections.Sets;
@@ -1753,4 +1756,127 @@ public class GroupCoordinatorServiceTest {
future.get()
);
}
+
+ @Test
+ public void testCommitTransactionalOffsetsWhenNotStarted() throws
ExecutionException, InterruptedException {
+ CoordinatorRuntime<GroupCoordinatorShard, Record> runtime =
mockRuntime();
+ GroupCoordinatorService service = new GroupCoordinatorService(
+ new LogContext(),
+ createConfig(),
+ runtime,
+ new GroupCoordinatorMetrics()
+ );
+
+ TxnOffsetCommitRequestData request = new TxnOffsetCommitRequestData()
+ .setGroupId("foo")
+ .setTransactionalId("transactional-id")
+ .setMemberId("member-id")
+ .setGenerationId(10)
+ .setTopics(Collections.singletonList(new
TxnOffsetCommitRequestData.TxnOffsetCommitRequestTopic()
+ .setName("topic")
+ .setPartitions(Collections.singletonList(new
TxnOffsetCommitRequestData.TxnOffsetCommitRequestPartition()
+ .setPartitionIndex(0)
+ .setCommittedOffset(100)))));
+
+ CompletableFuture<TxnOffsetCommitResponseData> future =
service.commitTransactionalOffsets(
+ requestContext(ApiKeys.TXN_OFFSET_COMMIT),
+ request,
+ BufferSupplier.NO_CACHING
+ );
+
+ assertEquals(
+ new TxnOffsetCommitResponseData()
+ .setTopics(Collections.singletonList(new
TxnOffsetCommitResponseData.TxnOffsetCommitResponseTopic()
+ .setName("topic")
+ .setPartitions(Collections.singletonList(new
TxnOffsetCommitResponseData.TxnOffsetCommitResponsePartition()
+ .setPartitionIndex(0)
+
.setErrorCode(Errors.COORDINATOR_NOT_AVAILABLE.code()))))),
+ future.get()
+ );
+ }
+
+ @ParameterizedTest
+ @NullSource
+ @ValueSource(strings = {""})
+ public void testCommitTransactionalOffsetsWithInvalidGroupId(String
groupId) throws ExecutionException, InterruptedException {
+ CoordinatorRuntime<GroupCoordinatorShard, Record> runtime =
mockRuntime();
+ GroupCoordinatorService service = new GroupCoordinatorService(
+ new LogContext(),
+ createConfig(),
+ runtime,
+ new GroupCoordinatorMetrics()
+ );
+ service.startup(() -> 1);
+
+ TxnOffsetCommitRequestData request = new TxnOffsetCommitRequestData()
+ .setGroupId(groupId)
+ .setTransactionalId("transactional-id")
+ .setMemberId("member-id")
+ .setGenerationId(10)
+ .setTopics(Collections.singletonList(new
TxnOffsetCommitRequestData.TxnOffsetCommitRequestTopic()
+ .setName("topic")
+ .setPartitions(Collections.singletonList(new
TxnOffsetCommitRequestData.TxnOffsetCommitRequestPartition()
+ .setPartitionIndex(0)
+ .setCommittedOffset(100)))));
+
+ CompletableFuture<TxnOffsetCommitResponseData> future =
service.commitTransactionalOffsets(
+ requestContext(ApiKeys.TXN_OFFSET_COMMIT),
+ request,
+ BufferSupplier.NO_CACHING
+ );
+
+ assertEquals(
+ new TxnOffsetCommitResponseData()
+ .setTopics(Collections.singletonList(new
TxnOffsetCommitResponseData.TxnOffsetCommitResponseTopic()
+ .setName("topic")
+ .setPartitions(Collections.singletonList(new
TxnOffsetCommitResponseData.TxnOffsetCommitResponsePartition()
+ .setPartitionIndex(0)
+ .setErrorCode(Errors.INVALID_GROUP_ID.code()))))),
+ future.get()
+ );
+ }
+
+ @Test
+ public void testCommitTransactionalOffsets() throws ExecutionException,
InterruptedException {
+ CoordinatorRuntime<GroupCoordinatorShard, Record> runtime =
mockRuntime();
+ GroupCoordinatorService service = new GroupCoordinatorService(
+ new LogContext(),
+ createConfig(),
+ runtime,
+ new GroupCoordinatorMetrics()
+ );
+ service.startup(() -> 1);
+
+ TxnOffsetCommitRequestData request = new TxnOffsetCommitRequestData()
+ .setGroupId("foo")
+ .setTransactionalId("transactional-id")
+ .setMemberId("member-id")
+ .setGenerationId(10)
+ .setTopics(Collections.singletonList(new
TxnOffsetCommitRequestData.TxnOffsetCommitRequestTopic()
+ .setName("topic")
+ .setPartitions(Collections.singletonList(new
TxnOffsetCommitRequestData.TxnOffsetCommitRequestPartition()
+ .setPartitionIndex(0)
+ .setCommittedOffset(100)))));
+
+ TxnOffsetCommitResponseData response = new
TxnOffsetCommitResponseData()
+ .setTopics(Collections.singletonList(new
TxnOffsetCommitResponseData.TxnOffsetCommitResponseTopic()
+ .setName("topic")
+ .setPartitions(Collections.singletonList(new
TxnOffsetCommitResponseData.TxnOffsetCommitResponsePartition()
+ .setPartitionIndex(0)
+ .setErrorCode(Errors.NONE.code())))));
+
+ when(runtime.scheduleWriteOperation(
+ ArgumentMatchers.eq("txn-commit-offset"),
+ ArgumentMatchers.eq(new TopicPartition("__consumer_offsets", 0)),
+ ArgumentMatchers.any()
+ )).thenReturn(CompletableFuture.completedFuture(response));
+
+ CompletableFuture<TxnOffsetCommitResponseData> future =
service.commitTransactionalOffsets(
+ requestContext(ApiKeys.TXN_OFFSET_COMMIT),
+ request,
+ BufferSupplier.NO_CACHING
+ );
+
+ assertEquals(response, future.get());
+ }
}
diff --git
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorShardTest.java
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorShardTest.java
index f15e30e11d6..9d0bccc012b 100644
---
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorShardTest.java
+++
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorShardTest.java
@@ -21,6 +21,8 @@ import
org.apache.kafka.common.message.ConsumerGroupHeartbeatResponseData;
import org.apache.kafka.common.message.DeleteGroupsResponseData;
import org.apache.kafka.common.message.OffsetCommitRequestData;
import org.apache.kafka.common.message.OffsetCommitResponseData;
+import org.apache.kafka.common.message.TxnOffsetCommitRequestData;
+import org.apache.kafka.common.message.TxnOffsetCommitResponseData;
import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.record.RecordBatch;
@@ -143,6 +145,38 @@ public class GroupCoordinatorShardTest {
assertEquals(result, coordinator.commitOffset(context, request));
}
+ @Test
+ public void testCommitTransactionalOffset() {
+ GroupMetadataManager groupMetadataManager = mock(GroupMetadataManager.class);
+ OffsetMetadataManager offsetMetadataManager = mock(OffsetMetadataManager.class);
+ CoordinatorMetrics coordinatorMetrics = mock(CoordinatorMetrics.class);
+ CoordinatorMetricsShard metricsShard = mock(CoordinatorMetricsShard.class);
+ GroupCoordinatorShard coordinator = new GroupCoordinatorShard(
+ new LogContext(),
+ groupMetadataManager,
+ offsetMetadataManager,
+ Time.SYSTEM,
+ new MockCoordinatorTimer<>(new MockTime()),
+ mock(GroupCoordinatorConfig.class),
+ coordinatorMetrics,
+ metricsShard
+ );
+
+ RequestContext context = requestContext(ApiKeys.TXN_OFFSET_COMMIT);
+ TxnOffsetCommitRequestData request = new TxnOffsetCommitRequestData();
+ CoordinatorResult<TxnOffsetCommitResponseData, Record> result = new CoordinatorResult<>(
+ Collections.emptyList(),
+ new TxnOffsetCommitResponseData()
+ );
+
+ when(offsetMetadataManager.commitTransactionalOffset(
+ context,
+ request
+ )).thenReturn(result);
+
+ assertEquals(result, coordinator.commitTransactionalOffset(context, request));
+ }
+
@Test
public void testDeleteGroups() {
GroupMetadataManager groupMetadataManager =
mock(GroupMetadataManager.class);
@@ -298,7 +332,54 @@ public class GroupCoordinatorShardTest {
new ApiMessageAndVersion(value, (short) 0)
));
- verify(offsetMetadataManager, times(2)).replay(key, value);
+ verify(offsetMetadataManager, times(2)).replay(
+ RecordBatch.NO_PRODUCER_ID,
+ key,
+ value
+ );
+ }
+
+ @Test
+ public void testReplayTransactionalOffsetCommit() {
+ GroupMetadataManager groupMetadataManager = mock(GroupMetadataManager.class);
+ OffsetMetadataManager offsetMetadataManager = mock(OffsetMetadataManager.class);
+ CoordinatorMetrics coordinatorMetrics = mock(CoordinatorMetrics.class);
+ CoordinatorMetricsShard metricsShard = mock(CoordinatorMetricsShard.class);
+ GroupCoordinatorShard coordinator = new GroupCoordinatorShard(
+ new LogContext(),
+ groupMetadataManager,
+ offsetMetadataManager,
+ Time.SYSTEM,
+ new MockCoordinatorTimer<>(new MockTime()),
+ mock(GroupCoordinatorConfig.class),
+ coordinatorMetrics,
+ metricsShard
+ );
+
+ OffsetCommitKey key = new OffsetCommitKey();
+ OffsetCommitValue value = new OffsetCommitValue();
+
+ coordinator.replay(100L, (short) 0, new Record(
+ new ApiMessageAndVersion(key, (short) 0),
+ new ApiMessageAndVersion(value, (short) 0)
+ ));
+
+ coordinator.replay(101L, (short) 1, new Record(
+ new ApiMessageAndVersion(key, (short) 1),
+ new ApiMessageAndVersion(value, (short) 0)
+ ));
+
+ verify(offsetMetadataManager, times(1)).replay(
+ 100L,
+ key,
+ value
+ );
+
+ verify(offsetMetadataManager, times(1)).replay(
+ 101L,
+ key,
+ value
+ );
}
@Test
@@ -330,7 +411,11 @@ public class GroupCoordinatorShardTest {
null
));
- verify(offsetMetadataManager, times(2)).replay(key, null);
+ verify(offsetMetadataManager, times(2)).replay(
+ RecordBatch.NO_PRODUCER_ID,
+ key,
+ null
+ );
}
@Test
diff --git
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/OffsetAndMetadataTest.java
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/OffsetAndMetadataTest.java
index 946fa701c61..46fe369974b 100644
---
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/OffsetAndMetadataTest.java
+++
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/OffsetAndMetadataTest.java
@@ -17,6 +17,7 @@
package org.apache.kafka.coordinator.group;
import org.apache.kafka.common.message.OffsetCommitRequestData;
+import org.apache.kafka.common.message.TxnOffsetCommitRequestData;
import org.apache.kafka.coordinator.group.generated.OffsetCommitValue;
import org.apache.kafka.server.util.MockTime;
import org.junit.jupiter.api.Test;
@@ -133,4 +134,46 @@ public class OffsetAndMetadataTest {
)
);
}
+
+ @Test
+ public void testFromTransactionalRequest() {
+ MockTime time = new MockTime();
+
+ TxnOffsetCommitRequestData.TxnOffsetCommitRequestPartition partition =
+ new TxnOffsetCommitRequestData.TxnOffsetCommitRequestPartition()
+ .setPartitionIndex(0)
+ .setCommittedOffset(100L)
+ .setCommittedLeaderEpoch(-1)
+ .setCommittedMetadata(null);
+
+ assertEquals(
+ new OffsetAndMetadata(
+ 100L,
+ OptionalInt.empty(),
+ "",
+ time.milliseconds(),
+ OptionalLong.empty()
+ ), OffsetAndMetadata.fromRequest(
+ partition,
+ time.milliseconds()
+ )
+ );
+
+ partition
+ .setCommittedLeaderEpoch(10)
+ .setCommittedMetadata("hello");
+
+ assertEquals(
+ new OffsetAndMetadata(
+ 100L,
+ OptionalInt.of(10),
+ "hello",
+ time.milliseconds(),
+ OptionalLong.empty()
+ ), OffsetAndMetadata.fromRequest(
+ partition,
+ time.milliseconds()
+ )
+ );
+ }
}
diff --git
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/OffsetMetadataManagerTest.java
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/OffsetMetadataManagerTest.java
index 792d1f62507..bf1ef890fff 100644
---
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/OffsetMetadataManagerTest.java
+++
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/OffsetMetadataManagerTest.java
@@ -31,11 +31,14 @@ import
org.apache.kafka.common.message.OffsetDeleteRequestData;
import org.apache.kafka.common.message.OffsetDeleteResponseData;
import org.apache.kafka.common.message.OffsetFetchRequestData;
import org.apache.kafka.common.message.OffsetFetchResponseData;
+import org.apache.kafka.common.message.TxnOffsetCommitRequestData;
+import org.apache.kafka.common.message.TxnOffsetCommitResponseData;
import org.apache.kafka.common.network.ClientInformation;
import org.apache.kafka.common.network.ListenerName;
import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.protocol.ApiMessage;
import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.record.RecordBatch;
import org.apache.kafka.common.requests.RequestContext;
import org.apache.kafka.common.requests.RequestHeader;
import org.apache.kafka.common.security.auth.KafkaPrincipal;
@@ -187,8 +190,6 @@ public class OffsetMetadataManagerTest {
short version,
OffsetCommitRequestData request
) {
- snapshotRegistry.getOrCreateSnapshot(lastCommittedOffset);
-
RequestContext context = new RequestContext(
new RequestHeader(
ApiKeys.OFFSET_COMMIT,
@@ -214,6 +215,38 @@ public class OffsetMetadataManagerTest {
return result;
}
+ public CoordinatorResult<TxnOffsetCommitResponseData, Record>
commitTransactionalOffset(
+ TxnOffsetCommitRequestData request
+ ) {
+ RequestContext context = new RequestContext(
+ new RequestHeader(
+ ApiKeys.TXN_OFFSET_COMMIT,
+ ApiKeys.TXN_OFFSET_COMMIT.latestVersion(),
+ "client",
+ 0
+ ),
+ "1",
+ InetAddress.getLoopbackAddress(),
+ KafkaPrincipal.ANONYMOUS,
+ ListenerName.forSecurityProtocol(SecurityProtocol.PLAINTEXT),
+ SecurityProtocol.PLAINTEXT,
+ ClientInformation.EMPTY,
+ false
+ );
+
+ CoordinatorResult<TxnOffsetCommitResponseData, Record> result =
offsetMetadataManager.commitTransactionalOffset(
+ context,
+ request
+ );
+
+ result.records().forEach(record -> replay(
+ request.producerId(),
+ record
+ ));
+
+ return result;
+ }
+
public CoordinatorResult<OffsetDeleteResponseData, Record>
deleteOffsets(
OffsetDeleteRequestData request
) {
@@ -372,6 +405,16 @@ public class OffsetMetadataManagerTest {
private void replay(
Record record
+ ) {
+ replay(
+ RecordBatch.NO_PRODUCER_ID,
+ record
+ );
+ }
+
+ private void replay(
+ long producerId,
+ Record record
) {
snapshotRegistry.getOrCreateSnapshot(lastWrittenOffset);
@@ -385,6 +428,7 @@ public class OffsetMetadataManagerTest {
switch (key.version()) {
case OffsetCommitKey.HIGHEST_SUPPORTED_VERSION:
offsetMetadataManager.replay(
+ producerId,
(OffsetCommitKey) key.message(),
(OffsetCommitValue) messageOrNull(value)
);
@@ -1276,6 +1320,324 @@ public class OffsetMetadataManagerTest {
);
}
+ @Test
+ public void testConsumerGroupTransactionalOffsetCommit() {
+ OffsetMetadataManagerTestContext context = new
OffsetMetadataManagerTestContext.Builder().build();
+
+ // Create an empty group.
+ ConsumerGroup group =
context.groupMetadataManager.getOrMaybeCreateConsumerGroup(
+ "foo",
+ true
+ );
+
+ // Add member.
+ group.updateMember(new ConsumerGroupMember.Builder("member")
+ .setMemberEpoch(10)
+ .setTargetMemberEpoch(10)
+ .setPreviousMemberEpoch(10)
+ .build()
+ );
+
+ CoordinatorResult<TxnOffsetCommitResponseData, Record> result =
context.commitTransactionalOffset(
+ new TxnOffsetCommitRequestData()
+ .setGroupId("foo")
+ .setMemberId("member")
+ .setGenerationId(10)
+ .setTopics(Collections.singletonList(
+ new
TxnOffsetCommitRequestData.TxnOffsetCommitRequestTopic()
+ .setName("bar")
+ .setPartitions(Collections.singletonList(
+ new
TxnOffsetCommitRequestData.TxnOffsetCommitRequestPartition()
+ .setPartitionIndex(0)
+ .setCommittedOffset(100L)
+ .setCommittedLeaderEpoch(10)
+ .setCommittedMetadata("metadata")
+ ))
+ ))
+ );
+
+ assertEquals(
+ new TxnOffsetCommitResponseData()
+ .setTopics(Collections.singletonList(
+ new
TxnOffsetCommitResponseData.TxnOffsetCommitResponseTopic()
+ .setName("bar")
+ .setPartitions(Collections.singletonList(
+ new
TxnOffsetCommitResponseData.TxnOffsetCommitResponsePartition()
+ .setPartitionIndex(0)
+ .setErrorCode(Errors.NONE.code())
+ ))
+ )),
+ result.response()
+ );
+
+ assertEquals(
+ Collections.singletonList(RecordHelpers.newOffsetCommitRecord(
+ "foo",
+ "bar",
+ 0,
+ new OffsetAndMetadata(
+ 100L,
+ OptionalInt.of(10),
+ "metadata",
+ context.time.milliseconds(),
+ OptionalLong.empty()
+ ),
+ MetadataImage.EMPTY.features().metadataVersion()
+ )),
+ result.records()
+ );
+ }
+
+ @Test
+ public void testConsumerGroupTransactionalOffsetCommitWithUnknownGroupId() {
+ OffsetMetadataManagerTestContext context = new
OffsetMetadataManagerTestContext.Builder().build();
+
+ assertThrows(IllegalGenerationException.class, () ->
context.commitTransactionalOffset(
+ new TxnOffsetCommitRequestData()
+ .setGroupId("foo")
+ .setMemberId("member")
+ .setGenerationId(10)
+ .setTopics(Collections.singletonList(
+ new
TxnOffsetCommitRequestData.TxnOffsetCommitRequestTopic()
+ .setName("bar")
+ .setPartitions(Collections.singletonList(
+ new
TxnOffsetCommitRequestData.TxnOffsetCommitRequestPartition()
+ .setPartitionIndex(0)
+ .setCommittedOffset(100L)
+ .setCommittedLeaderEpoch(10)
+ .setCommittedMetadata("metadata")
+ ))
+ ))
+ ));
+ }
+
+ @Test
+ public void testConsumerGroupTransactionalOffsetCommitWithUnknownMemberId() {
+ OffsetMetadataManagerTestContext context = new
OffsetMetadataManagerTestContext.Builder().build();
+
+ // Create an empty group.
+ context.groupMetadataManager.getOrMaybeCreateConsumerGroup(
+ "foo",
+ true
+ );
+
+ assertThrows(UnknownMemberIdException.class, () ->
context.commitTransactionalOffset(
+ new TxnOffsetCommitRequestData()
+ .setGroupId("foo")
+ .setMemberId("member")
+ .setGenerationId(10)
+ .setTopics(Collections.singletonList(
+ new
TxnOffsetCommitRequestData.TxnOffsetCommitRequestTopic()
+ .setName("bar")
+ .setPartitions(Collections.singletonList(
+ new
TxnOffsetCommitRequestData.TxnOffsetCommitRequestPartition()
+ .setPartitionIndex(0)
+ .setCommittedOffset(100L)
+ .setCommittedLeaderEpoch(10)
+ .setCommittedMetadata("metadata")
+ ))
+ ))
+ ));
+ }
+
+ @Test
+ public void testConsumerGroupTransactionalOffsetCommitWithStaleMemberEpoch() {
+ OffsetMetadataManagerTestContext context = new
OffsetMetadataManagerTestContext.Builder().build();
+
+ // Create an empty group.
+ ConsumerGroup group =
context.groupMetadataManager.getOrMaybeCreateConsumerGroup(
+ "foo",
+ true
+ );
+
+ // Add member.
+ group.updateMember(new ConsumerGroupMember.Builder("member")
+ .setMemberEpoch(10)
+ .setTargetMemberEpoch(10)
+ .setPreviousMemberEpoch(10)
+ .build()
+ );
+
+ assertThrows(IllegalGenerationException.class, () ->
context.commitTransactionalOffset(
+ new TxnOffsetCommitRequestData()
+ .setGroupId("foo")
+ .setMemberId("member")
+ .setGenerationId(100)
+ .setTopics(Collections.singletonList(
+ new
TxnOffsetCommitRequestData.TxnOffsetCommitRequestTopic()
+ .setName("bar")
+ .setPartitions(Collections.singletonList(
+ new
TxnOffsetCommitRequestData.TxnOffsetCommitRequestPartition()
+ .setPartitionIndex(0)
+ .setCommittedOffset(100L)
+ .setCommittedLeaderEpoch(10)
+ .setCommittedMetadata("metadata")
+ ))
+ ))
+ ));
+ }
+
+ @Test
+ public void testGenericGroupTransactionalOffsetCommit() {
+ OffsetMetadataManagerTestContext context = new
OffsetMetadataManagerTestContext.Builder().build();
+
+ // Create a group.
+ GenericGroup group =
context.groupMetadataManager.getOrMaybeCreateGenericGroup(
+ "foo",
+ true
+ );
+
+ // Add member.
+ GenericGroupMember member = mkGenericMember("member",
Optional.empty());
+ group.add(member);
+
+ // Transition to next generation.
+ group.transitionTo(GenericGroupState.PREPARING_REBALANCE);
+ group.initNextGeneration();
+ assertEquals(1, group.generationId());
+ group.transitionTo(GenericGroupState.STABLE);
+
+ CoordinatorResult<TxnOffsetCommitResponseData, Record> result =
context.commitTransactionalOffset(
+ new TxnOffsetCommitRequestData()
+ .setGroupId("foo")
+ .setMemberId("member")
+ .setGenerationId(1)
+ .setTopics(Collections.singletonList(
+ new
TxnOffsetCommitRequestData.TxnOffsetCommitRequestTopic()
+ .setName("bar")
+ .setPartitions(Collections.singletonList(
+ new
TxnOffsetCommitRequestData.TxnOffsetCommitRequestPartition()
+ .setPartitionIndex(0)
+ .setCommittedOffset(100L)
+ .setCommittedLeaderEpoch(10)
+ .setCommittedMetadata("metadata")
+ ))
+ ))
+ );
+
+ assertEquals(
+ new TxnOffsetCommitResponseData()
+ .setTopics(Collections.singletonList(
+ new
TxnOffsetCommitResponseData.TxnOffsetCommitResponseTopic()
+ .setName("bar")
+ .setPartitions(Collections.singletonList(
+ new
TxnOffsetCommitResponseData.TxnOffsetCommitResponsePartition()
+ .setPartitionIndex(0)
+ .setErrorCode(Errors.NONE.code())
+ ))
+ )),
+ result.response()
+ );
+
+ assertEquals(
+ Collections.singletonList(RecordHelpers.newOffsetCommitRecord(
+ "foo",
+ "bar",
+ 0,
+ new OffsetAndMetadata(
+ 100L,
+ OptionalInt.of(10),
+ "metadata",
+ context.time.milliseconds(),
+ OptionalLong.empty()
+ ),
+ MetadataImage.EMPTY.features().metadataVersion()
+ )),
+ result.records()
+ );
+ }
+
+ @Test
+ public void testGenericGroupTransactionalOffsetCommitWithUnknownGroupId() {
+ OffsetMetadataManagerTestContext context = new
OffsetMetadataManagerTestContext.Builder().build();
+
+ assertThrows(IllegalGenerationException.class, () ->
context.commitTransactionalOffset(
+ new TxnOffsetCommitRequestData()
+ .setGroupId("foo")
+ .setMemberId("member")
+ .setGenerationId(10)
+ .setTopics(Collections.singletonList(
+ new
TxnOffsetCommitRequestData.TxnOffsetCommitRequestTopic()
+ .setName("bar")
+ .setPartitions(Collections.singletonList(
+ new
TxnOffsetCommitRequestData.TxnOffsetCommitRequestPartition()
+ .setPartitionIndex(0)
+ .setCommittedOffset(100L)
+ .setCommittedLeaderEpoch(10)
+ .setCommittedMetadata("metadata")
+ ))
+ ))
+ ));
+ }
+
+ @Test
+ public void testGenericGroupTransactionalOffsetCommitWithUnknownMemberId() {
+ OffsetMetadataManagerTestContext context = new
OffsetMetadataManagerTestContext.Builder().build();
+
+ // Create an empty group.
+ context.groupMetadataManager.getOrMaybeCreateGenericGroup(
+ "foo",
+ true
+ );
+
+ assertThrows(UnknownMemberIdException.class, () ->
context.commitTransactionalOffset(
+ new TxnOffsetCommitRequestData()
+ .setGroupId("foo")
+ .setMemberId("member")
+ .setGenerationId(10)
+ .setTopics(Collections.singletonList(
+ new
TxnOffsetCommitRequestData.TxnOffsetCommitRequestTopic()
+ .setName("bar")
+ .setPartitions(Collections.singletonList(
+ new
TxnOffsetCommitRequestData.TxnOffsetCommitRequestPartition()
+ .setPartitionIndex(0)
+ .setCommittedOffset(100L)
+ .setCommittedLeaderEpoch(10)
+ .setCommittedMetadata("metadata")
+ ))
+ ))
+ ));
+ }
+
+ @Test
+ public void testGenericGroupTransactionalOffsetCommitWithIllegalGenerationId() {
+ OffsetMetadataManagerTestContext context = new
OffsetMetadataManagerTestContext.Builder().build();
+
+ // Create a group.
+ GenericGroup group =
context.groupMetadataManager.getOrMaybeCreateGenericGroup(
+ "foo",
+ true
+ );
+
+ // Add member.
+ GenericGroupMember member = mkGenericMember("member",
Optional.empty());
+ group.add(member);
+
+ // Transition to next generation.
+ group.transitionTo(GenericGroupState.PREPARING_REBALANCE);
+ group.initNextGeneration();
+ assertEquals(1, group.generationId());
+ group.transitionTo(GenericGroupState.STABLE);
+
+ assertThrows(IllegalGenerationException.class, () ->
context.commitTransactionalOffset(
+ new TxnOffsetCommitRequestData()
+ .setGroupId("foo")
+ .setMemberId("member")
+ .setGenerationId(100)
+ .setTopics(Collections.singletonList(
+ new
TxnOffsetCommitRequestData.TxnOffsetCommitRequestTopic()
+ .setName("bar")
+ .setPartitions(Collections.singletonList(
+ new
TxnOffsetCommitRequestData.TxnOffsetCommitRequestPartition()
+ .setPartitionIndex(0)
+ .setCommittedOffset(100L)
+ .setCommittedLeaderEpoch(10)
+ .setCommittedMetadata("metadata")
+ ))
+ ))
+ ));
+ }
+
@Test
public void testGenericGroupFetchOffsetsWithDeadGroup() {
OffsetMetadataManagerTestContext context = new
OffsetMetadataManagerTestContext.Builder().build();
@@ -1975,6 +2337,59 @@ public class OffsetMetadataManagerTest {
));
}
+ @Test
+ public void testTransactionalReplay() {
+ OffsetMetadataManagerTestContext context = new
OffsetMetadataManagerTestContext.Builder().build();
+
+ verifyTransactionalReplay(context, 5, "foo", "bar", 0, new
OffsetAndMetadata(
+ 100L,
+ OptionalInt.empty(),
+ "small",
+ context.time.milliseconds(),
+ OptionalLong.empty()
+ ));
+
+ verifyTransactionalReplay(context, 5, "foo", "bar", 1, new
OffsetAndMetadata(
+ 101L,
+ OptionalInt.empty(),
+ "small",
+ context.time.milliseconds(),
+ OptionalLong.empty()
+ ));
+
+ verifyTransactionalReplay(context, 5, "bar", "zar", 0, new
OffsetAndMetadata(
+ 100L,
+ OptionalInt.empty(),
+ "small",
+ context.time.milliseconds(),
+ OptionalLong.empty()
+ ));
+
+ verifyTransactionalReplay(context, 5, "bar", "zar", 1, new
OffsetAndMetadata(
+ 101L,
+ OptionalInt.empty(),
+ "small",
+ context.time.milliseconds(),
+ OptionalLong.empty()
+ ));
+
+ verifyTransactionalReplay(context, 6, "foo", "bar", 2, new
OffsetAndMetadata(
+ 102L,
+ OptionalInt.empty(),
+ "small",
+ context.time.milliseconds(),
+ OptionalLong.empty()
+ ));
+
+ verifyTransactionalReplay(context, 6, "foo", "bar", 3, new
OffsetAndMetadata(
+ 102L,
+ OptionalInt.empty(),
+ "small",
+ context.time.milliseconds(),
+ OptionalLong.empty()
+ ));
+ }
+
@Test
public void testReplayWithTombstone() {
OffsetMetadataManagerTestContext context = new
OffsetMetadataManagerTestContext.Builder().build();
@@ -2143,6 +2558,30 @@ public class OffsetMetadataManagerTest {
));
}
+ private void verifyTransactionalReplay(
+ OffsetMetadataManagerTestContext context,
+ long producerId,
+ String groupId,
+ String topic,
+ int partition,
+ OffsetAndMetadata offsetAndMetadata
+ ) {
+ context.replay(producerId, RecordHelpers.newOffsetCommitRecord(
+ groupId,
+ topic,
+ partition,
+ offsetAndMetadata,
+ MetadataImage.EMPTY.features().metadataVersion()
+ ));
+
+ assertEquals(offsetAndMetadata, context.offsetMetadataManager.pendingTransactionalOffset(
+ producerId,
+ groupId,
+ topic,
+ partition
+ ));
+ }
+
private GenericGroupMember mkGenericMember(
String memberId,
Optional<String> groupInstanceId
diff --git
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroupTest.java
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroupTest.java
index f54ad25e321..7263181322f 100644
---
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroupTest.java
+++
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroupTest.java
@@ -32,6 +32,8 @@ import
org.apache.kafka.coordinator.group.metrics.GroupCoordinatorMetricsShard;
import org.apache.kafka.image.MetadataImage;
import org.apache.kafka.timeline.SnapshotRegistry;
import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
import java.util.Arrays;
import java.util.Collections;
@@ -670,31 +672,32 @@ public class ConsumerGroupTest {
assertEquals(0, group.metadataRefreshDeadline().epoch);
}
- @Test
- public void testValidateOffsetCommit() {
+ @ParameterizedTest
+ @ValueSource(booleans = {false, true})
+ public void testValidateOffsetCommit(boolean isTransactional) {
ConsumerGroup group = createConsumerGroup("group-foo");
// Simulate a call from the admin client without member id and member
epoch.
// This should pass only if the group is empty.
- group.validateOffsetCommit("", "", -1);
+ group.validateOffsetCommit("", "", -1, isTransactional);
// The member does not exist.
assertThrows(UnknownMemberIdException.class, () ->
- group.validateOffsetCommit("member-id", null, 0));
+ group.validateOffsetCommit("member-id", null, 0, isTransactional));
// Create a member.
group.getOrMaybeCreateMember("member-id", true);
// A call from the admin client should fail as the group is not empty.
assertThrows(UnknownMemberIdException.class, () ->
- group.validateOffsetCommit("", "", -1));
+ group.validateOffsetCommit("", "", -1, isTransactional));
// The member epoch is stale.
assertThrows(StaleMemberEpochException.class, () ->
- group.validateOffsetCommit("member-id", "", 10));
+ group.validateOffsetCommit("member-id", "", 10, isTransactional));
// This should succeed.
- group.validateOffsetCommit("member-id", "", 0);
+ group.validateOffsetCommit("member-id", "", 0, isTransactional);
}
@Test
diff --git
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/generic/GenericGroupTest.java
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/generic/GenericGroupTest.java
index e5e71aaf2ab..6aa36541a5b 100644
---
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/generic/GenericGroupTest.java
+++
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/generic/GenericGroupTest.java
@@ -984,7 +984,7 @@ public class GenericGroupTest {
@Test
public void testValidateOffsetCommit() {
// A call from the admin client without any parameters should pass.
- group.validateOffsetCommit("", "", -1);
+ group.validateOffsetCommit("", "", -1, false);
// Add a member.
group.add(new GenericGroupMember(
@@ -1006,36 +1006,40 @@ public class GenericGroupTest {
// No parameters and the group is not empty.
assertThrows(UnknownMemberIdException.class,
- () -> group.validateOffsetCommit("", "", -1));
+ () -> group.validateOffsetCommit("", "", -1, false));
+
+ // A transactional offset commit without any parameters
+ // and a non-empty group is accepted.
+ group.validateOffsetCommit("", null, -1, true);
// The member id does not exist.
assertThrows(UnknownMemberIdException.class,
- () -> group.validateOffsetCommit("unknown", "unknown", -1));
+ () -> group.validateOffsetCommit("unknown", "unknown", -1, false));
// The instance id does not exist.
assertThrows(UnknownMemberIdException.class,
- () -> group.validateOffsetCommit("member-id", "unknown", -1));
+ () -> group.validateOffsetCommit("member-id", "unknown", -1,
false));
// The generation id is invalid.
assertThrows(IllegalGenerationException.class,
- () -> group.validateOffsetCommit("member-id", "instance-id", 0));
+ () -> group.validateOffsetCommit("member-id", "instance-id", 0,
false));
// Group is in prepare rebalance state.
assertThrows(RebalanceInProgressException.class,
- () -> group.validateOffsetCommit("member-id", "instance-id", 1));
+ () -> group.validateOffsetCommit("member-id", "instance-id", 1,
false));
// Group transitions to stable.
group.transitionTo(STABLE);
// This should work.
- group.validateOffsetCommit("member-id", "instance-id", 1);
+ group.validateOffsetCommit("member-id", "instance-id", 1, false);
// Replace static member.
group.replaceStaticMember("instance-id", "member-id", "new-member-id");
// The old instance id should be fenced.
assertThrows(FencedInstanceIdException.class,
- () -> group.validateOffsetCommit("member-id", "instance-id", 1));
+ () -> group.validateOffsetCommit("member-id", "instance-id", 1,
false));
// Remove member and transitions to dead.
group.remove("new-instance-id");
@@ -1043,7 +1047,7 @@ public class GenericGroupTest {
// This should fail with CoordinatorNotAvailableException.
assertThrows(CoordinatorNotAvailableException.class,
- () -> group.validateOffsetCommit("member-id", "new-instance-id",
1));
+ () -> group.validateOffsetCommit("member-id", "new-instance-id",
1, false));
}
@Test