This is an automated email from the ASF dual-hosted git repository.
dajac pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new eb3714f022c KAFKA-19160;KAFKA-19164; Improve performance of fetching
stable offsets (#19497)
eb3714f022c is described below
commit eb3714f022c948e9348b7e281c4bd69a38666013
Author: Sean Quah <[email protected]>
AuthorDate: Mon May 12 08:32:17 2025 +0100
KAFKA-19160;KAFKA-19164; Improve performance of fetching stable offsets
(#19497)
When fetching stable offsets in the group coordinator, we iterate over
all requested partitions. For each partition, we iterate over the
group's ongoing transactions to check if there is a pending
transactional offset commit for that partition.
This can get slow when there are a large number of partitions and a
large number of pending transactions. Instead, maintain a list of
pending transactions per partition to speed up lookups.
Reviewers: Shaan, Dongnuo Lyu <[email protected]>, Chia-Ping Tsai
<[email protected]>, David Jaco <[email protected]>
---
.../coordinator/group/OffsetMetadataManager.java | 278 ++++++++++++++++-----
.../TransactionalOffsetFetchBenchmark.java | 142 +++++++++++
2 files changed, 355 insertions(+), 65 deletions(-)
diff --git
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/OffsetMetadataManager.java
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/OffsetMetadataManager.java
index 0fa997c557a..54de475d4bf 100644
---
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/OffsetMetadataManager.java
+++
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/OffsetMetadataManager.java
@@ -62,6 +62,7 @@ import java.util.Optional;
import java.util.OptionalLong;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.BiConsumer;
import java.util.function.Consumer;
import static
org.apache.kafka.common.requests.OffsetFetchResponse.INVALID_OFFSET;
@@ -88,37 +89,37 @@ public class OffsetMetadataManager {
private GroupCoordinatorConfig config = null;
private GroupCoordinatorMetricsShard metrics = null;
- Builder withLogContext(LogContext logContext) {
+ public Builder withLogContext(LogContext logContext) {
this.logContext = logContext;
return this;
}
- Builder withSnapshotRegistry(SnapshotRegistry snapshotRegistry) {
+ public Builder withSnapshotRegistry(SnapshotRegistry snapshotRegistry)
{
this.snapshotRegistry = snapshotRegistry;
return this;
}
- Builder withTime(Time time) {
+ public Builder withTime(Time time) {
this.time = time;
return this;
}
- Builder withGroupMetadataManager(GroupMetadataManager
groupMetadataManager) {
+ public Builder withGroupMetadataManager(GroupMetadataManager
groupMetadataManager) {
this.groupMetadataManager = groupMetadataManager;
return this;
}
- Builder withGroupCoordinatorConfig(GroupCoordinatorConfig config) {
+ public Builder withGroupCoordinatorConfig(GroupCoordinatorConfig
config) {
this.config = config;
return this;
}
- Builder withMetadataImage(MetadataImage metadataImage) {
+ public Builder withMetadataImage(MetadataImage metadataImage) {
this.metadataImage = metadataImage;
return this;
}
- Builder withGroupCoordinatorMetricsShard(GroupCoordinatorMetricsShard
metrics) {
+ public Builder
withGroupCoordinatorMetricsShard(GroupCoordinatorMetricsShard metrics) {
this.metrics = metrics;
return this;
}
@@ -193,9 +194,167 @@ public class OffsetMetadataManager {
private final TimelineHashMap<Long, Offsets> pendingTransactionalOffsets;
/**
- * The open transactions (producer ids) keyed by group.
+ * The open transactions (producer ids) by group id, topic name and
partition id.
*/
- private final TimelineHashMap<String, TimelineHashSet<Long>>
openTransactionsByGroup;
+ private final OpenTransactions openTransactions;
+
+ /**
+ * Tracks open transactions (producer ids) by group id, topic name and
partition id.
+ * It is the responsiblity of the caller to update {@link
#pendingTransactionalOffsets}.
+ */
+ private class OpenTransactions {
+ /**
+ * The open transactions (producer ids) keyed by group id, topic name
and partition id.
+ * Tracks whether partitions have any pending transactional offsets
that have not been deleted.
+ *
+ * Values in each level of the map will never be empty collections.
+ */
+ private final TimelineHashMap<String, TimelineHashMap<String,
TimelineHashMap<Integer, TimelineHashSet<Long>>>> openTransactionsByGroup;
+
+ private OpenTransactions() {
+ this.openTransactionsByGroup = new
TimelineHashMap<>(snapshotRegistry, 0);
+ }
+
+ /**
+ * Adds a producer id to the open transactions for the given group and
topic partition.
+ *
+ * @param groupId The group id.
+ * @param topic The topic name.
+ * @param partition The partition.
+ * @param producerId The producer id.
+ * @return {@code true} if the partition did not already have a
pending offset from the producer id.
+ */
+ private boolean add(String groupId, String topic, int partition, long
producerId) {
+ return openTransactionsByGroup
+ .computeIfAbsent(groupId, __ -> new
TimelineHashMap<>(snapshotRegistry, 1))
+ .computeIfAbsent(topic, __ -> new
TimelineHashMap<>(snapshotRegistry, 1))
+ .computeIfAbsent(partition, __ -> new
TimelineHashSet<>(snapshotRegistry, 1))
+ .add(producerId);
+ }
+
+ /**
+ * Clears all open transactions for the given group and topic
partition.
+ *
+ * @param groupId The group id.
+ * @param topic The topic name.
+ * @param partition The partition.
+ */
+ private void clear(String groupId, String topic, int partition) {
+ TimelineHashMap<String, TimelineHashMap<Integer,
TimelineHashSet<Long>>> openTransactionsByTopic =
+ openTransactionsByGroup.get(groupId);
+ if (openTransactionsByTopic == null) return;
+
+ TimelineHashMap<Integer, TimelineHashSet<Long>>
openTransactionsByPartition = openTransactionsByTopic.get(topic);
+ if (openTransactionsByPartition == null) return;
+
+ openTransactionsByPartition.remove(partition);
+
+ if (openTransactionsByPartition.isEmpty()) {
+ openTransactionsByTopic.remove(topic);
+ if (openTransactionsByTopic.isEmpty()) {
+ openTransactionsByGroup.remove(groupId);
+ }
+ }
+ }
+
+ /**
+ * Returns {@code true} if the given group has any pending
transactional offsets.
+ *
+ * @param groupId The group id.
+ * @return {@code true} if the given group has any pending
transactional offsets.
+ */
+ private boolean contains(String groupId) {
+ return openTransactionsByGroup.containsKey(groupId);
+ }
+
+ /**
+ * Returns {@code true} if the given group has any pending
transactional offsets for the given topic and partition.
+ *
+ * @param groupId The group id.
+ * @param topic The topic name.
+ * @param partition The partition.
+ * @return {@code true} if the given group has any pending
transactional offsets for the given topic and partition.
+ */
+ private boolean contains(String groupId, String topic, int partition) {
+ TimelineHashSet<Long> openTransactions = get(groupId, topic,
partition);
+ return openTransactions != null;
+ }
+
+ /**
+ * Performs the given action for each partition with a pending
transactional offset for the given group.
+ *
+ * @param groupId The group id.
+ * @param action The action to be performed for each partition with a
pending transactional offset.
+ */
+ private void forEachTopicPartition(String groupId, BiConsumer<String,
Integer> action) {
+ TimelineHashMap<String, TimelineHashMap<Integer,
TimelineHashSet<Long>>> openTransactionsByTopic =
+ openTransactionsByGroup.get(groupId);
+ if (openTransactionsByTopic == null) return;
+
+ openTransactionsByTopic.forEach((topic,
openTransactionsByPartition) -> {
+ openTransactionsByPartition.forEach((partition, producerIds)
-> {
+ action.accept(topic, partition);
+ });
+ });
+ }
+
+ /**
+ * Performs the given action for each producer id with a pending
transactional offset for the given group and topic partition.
+ *
+ * @param groupId The group id.
+ * @param topic The topic name.
+ * @param partition The partition.
+ * @param action The action to be performed for each producer id
with a pending transactional offset.
+ */
+ private void forEach(String groupId, String topic, int partition,
Consumer<Long> action) {
+ TimelineHashSet<Long> openTransactions = get(groupId, topic,
partition);
+ if (openTransactions == null) return;
+
+ openTransactions.forEach(action);
+ }
+
+ /**
+ * Gets the set of producer ids with pending transactional offsets for
the given group and topic partition.
+ *
+ * @param groupId The group id.
+ * @param topic The topic name.
+ * @param partition The partition.
+ * @return The set of producer ids with pending transactional offsets
for the given group and topic partition.
+ */
+ private TimelineHashSet<Long> get(String groupId, String topic, int
partition) {
+ TimelineHashMap<String, TimelineHashMap<Integer,
TimelineHashSet<Long>>> openTransactionsByTopic =
+ openTransactionsByGroup.get(groupId);
+ if (openTransactionsByTopic == null) return null;
+
+ TimelineHashMap<Integer, TimelineHashSet<Long>>
openTransactionsByPartition = openTransactionsByTopic.get(topic);
+ if (openTransactionsByPartition == null) return null;
+
+ return openTransactionsByPartition.get(partition);
+ }
+
+ /**
+ * Removes a producer id from the open transactions for the given
group and topic partition.
+ *
+ * @param groupId The group id.
+ * @param topic The topic name.
+ * @param partition The partition.
+ * @param producerId The producer id.
+ * @return {@code true} if the group and topic partition had a pending
transactional offset from the producer id.
+ */
+ private boolean remove(String groupId, String topic, int partition,
long producerId) {
+ TimelineHashSet<Long> openTransactions = get(groupId, topic,
partition);
+ if (openTransactions == null) return false;
+
+ boolean removed = openTransactions.remove(producerId);
+
+ if (openTransactions.isEmpty()) {
+ // Re-use the clean up in clear.
+ clear(groupId, topic, partition);
+ }
+
+ return removed;
+ }
+ }
private class Offsets {
/**
@@ -280,7 +439,7 @@ public class OffsetMetadataManager {
this.metrics = metrics;
this.offsets = new Offsets();
this.pendingTransactionalOffsets = new
TimelineHashMap<>(snapshotRegistry, 0);
- this.openTransactionsByGroup = new TimelineHashMap<>(snapshotRegistry,
0);
+ this.openTransactions = new OpenTransactions();
}
/**
@@ -652,26 +811,12 @@ public class OffsetMetadataManager {
// Delete all the pending transactional offsets too. Here we only
write a tombstone
// if the topic-partition was not in the main storage because we don't
need to write
// two consecutive tombstones.
- TimelineHashSet<Long> openTransactions =
openTransactionsByGroup.get(groupId);
- if (openTransactions != null) {
- openTransactions.forEach(producerId -> {
- Offsets pendingOffsets =
pendingTransactionalOffsets.get(producerId);
- if (pendingOffsets != null) {
- TimelineHashMap<String, TimelineHashMap<Integer,
OffsetAndMetadata>> pendingGroupOffsets =
- pendingOffsets.offsetsByGroup.get(groupId);
- if (pendingGroupOffsets != null) {
- pendingGroupOffsets.forEach((topic,
offsetsByPartition) -> {
- offsetsByPartition.keySet().forEach(partition -> {
- if (!hasCommittedOffset(groupId, topic,
partition)) {
-
records.add(GroupCoordinatorRecordHelpers.newOffsetCommitTombstoneRecord(groupId,
topic, partition));
- numDeletedOffsets.getAndIncrement();
- }
- });
- });
- }
- }
- });
- }
+ openTransactions.forEachTopicPartition(groupId, (topic, partition) -> {
+ if (!hasCommittedOffset(groupId, topic, partition)) {
+
records.add(GroupCoordinatorRecordHelpers.newOffsetCommitTombstoneRecord(groupId,
topic, partition));
+ numDeletedOffsets.getAndIncrement();
+ }
+ });
return numDeletedOffsets.get();
}
@@ -687,17 +832,7 @@ public class OffsetMetadataManager {
String topic,
int partition
) {
- final TimelineHashSet<Long> openTransactions =
openTransactionsByGroup.get(groupId);
- if (openTransactions == null) return false;
-
- for (Long producerId : openTransactions) {
- Offsets offsets = pendingTransactionalOffsets.get(producerId);
- if (offsets != null && offsets.get(groupId, topic, partition) !=
null) {
- return true;
- }
- }
-
- return false;
+ return openTransactions.contains(groupId, topic, partition);
}
/**
@@ -738,6 +873,11 @@ public class OffsetMetadataManager {
final List<OffsetFetchResponseData.OffsetFetchResponseTopics>
topicResponses = new ArrayList<>(request.topics().size());
final TimelineHashMap<String, TimelineHashMap<Integer,
OffsetAndMetadata>> groupOffsets =
failAllPartitions ? null :
offsets.offsetsByGroup.get(request.groupId(), lastCommittedOffset);
+ // We inline the lookups from hasPendingTransactionalOffsets here, to
avoid repeating string
+ // comparisons of group ids and topic names for every partition.
They're only used when the
+ // client has requested stable offsets.
+ final TimelineHashMap<String, TimelineHashMap<Integer,
TimelineHashSet<Long>>> openTransactionsByTopic =
+ requireStable ?
openTransactions.openTransactionsByGroup.get(request.groupId(),
lastCommittedOffset) : null;
request.topics().forEach(topic -> {
final OffsetFetchResponseData.OffsetFetchResponseTopics
topicResponse =
@@ -746,12 +886,16 @@ public class OffsetMetadataManager {
final TimelineHashMap<Integer, OffsetAndMetadata> topicOffsets =
groupOffsets == null ?
null : groupOffsets.get(topic.name(), lastCommittedOffset);
+ final TimelineHashMap<Integer, TimelineHashSet<Long>>
openTransactionsByPartition =
+ (requireStable && openTransactionsByTopic != null) ?
openTransactionsByTopic.get(topic.name(), lastCommittedOffset) : null;
topic.partitionIndexes().forEach(partitionIndex -> {
final OffsetAndMetadata offsetAndMetadata = topicOffsets ==
null ?
null : topicOffsets.get(partitionIndex,
lastCommittedOffset);
- if (requireStable &&
hasPendingTransactionalOffsets(request.groupId(), topic.name(),
partitionIndex)) {
+ if (requireStable &&
+ openTransactionsByPartition != null &&
+ openTransactionsByPartition.containsKey(partitionIndex,
lastCommittedOffset)) {
topicResponse.partitions().add(new
OffsetFetchResponseData.OffsetFetchResponsePartitions()
.setPartitionIndex(partitionIndex)
.setErrorCode(Errors.UNSTABLE_OFFSET_COMMIT.code())
@@ -804,11 +948,18 @@ public class OffsetMetadataManager {
final List<OffsetFetchResponseData.OffsetFetchResponseTopics>
topicResponses = new ArrayList<>();
final TimelineHashMap<String, TimelineHashMap<Integer,
OffsetAndMetadata>> groupOffsets =
offsets.offsetsByGroup.get(request.groupId(), lastCommittedOffset);
+ // We inline the lookups from hasPendingTransactionalOffsets here, to
avoid repeating string
+ // comparisons of group ids and topic names for every partition.
They're only used when the
+ // client has requested stable offsets.
+ final TimelineHashMap<String, TimelineHashMap<Integer,
TimelineHashSet<Long>>> openTransactionsByTopic =
+ requireStable ?
openTransactions.openTransactionsByGroup.get(request.groupId(),
lastCommittedOffset) : null;
if (groupOffsets != null) {
groupOffsets.entrySet(lastCommittedOffset).forEach(topicEntry -> {
final String topic = topicEntry.getKey();
final TimelineHashMap<Integer, OffsetAndMetadata> topicOffsets
= topicEntry.getValue();
+ final TimelineHashMap<Integer, TimelineHashSet<Long>>
openTransactionsByPartition =
+ (requireStable && openTransactionsByTopic != null) ?
openTransactionsByTopic.get(topic, lastCommittedOffset) : null;
final OffsetFetchResponseData.OffsetFetchResponseTopics
topicResponse =
new
OffsetFetchResponseData.OffsetFetchResponseTopics().setName(topic);
@@ -818,7 +969,9 @@ public class OffsetMetadataManager {
final int partition = partitionEntry.getKey();
final OffsetAndMetadata offsetAndMetadata =
partitionEntry.getValue();
- if (requireStable &&
hasPendingTransactionalOffsets(request.groupId(), topic, partition)) {
+ if (requireStable &&
+ openTransactionsByPartition != null &&
+ openTransactionsByPartition.containsKey(partition,
lastCommittedOffset)) {
topicResponse.partitions().add(new
OffsetFetchResponseData.OffsetFetchResponsePartitions()
.setPartitionIndex(partition)
.setErrorCode(Errors.UNSTABLE_OFFSET_COMMIT.code())
@@ -886,8 +1039,8 @@ public class OffsetMetadataManager {
});
metrics.record(OFFSET_EXPIRED_SENSOR_NAME, records.size());
- // We don't want to remove the group if there are ongoing transactions.
- return allOffsetsExpired.get() &&
!openTransactionsByGroup.containsKey(groupId);
+ // We don't want to remove the group if there are ongoing transactions
with undeleted offsets.
+ return allOffsetsExpired.get() && !openTransactions.contains(groupId);
}
/**
@@ -1004,9 +1157,7 @@ public class OffsetMetadataManager {
partition,
OffsetAndMetadata.fromRecord(recordOffset, value)
);
- openTransactionsByGroup
- .computeIfAbsent(groupId, __ -> new
TimelineHashSet<>(snapshotRegistry, 1))
- .add(producerId);
+ openTransactions.add(groupId, topic, partition, producerId);
}
} else {
if (offsets.remove(groupId, topic, partition) != null) {
@@ -1014,15 +1165,13 @@ public class OffsetMetadataManager {
}
// Remove all the pending offset commits related to the tombstone.
- TimelineHashSet<Long> openTransactions =
openTransactionsByGroup.get(groupId);
- if (openTransactions != null) {
- openTransactions.forEach(openProducerId -> {
- Offsets pendingOffsets =
pendingTransactionalOffsets.get(openProducerId);
- if (pendingOffsets != null) {
- pendingOffsets.remove(groupId, topic, partition);
- }
- });
- }
+ openTransactions.forEach(groupId, topic, partition, openProducerId
-> {
+ Offsets pendingOffsets =
pendingTransactionalOffsets.get(openProducerId);
+ if (pendingOffsets != null) {
+ pendingOffsets.remove(groupId, topic, partition);
+ }
+ });
+ openTransactions.clear(groupId, topic, partition);
}
}
@@ -1033,6 +1182,7 @@ public class OffsetMetadataManager {
* @param result The result of the transaction.
* @throws RuntimeException if the transaction can not be completed.
*/
+ @SuppressWarnings("NPathComplexity")
public void replayEndTransactionMarker(
long producerId,
TransactionResult result
@@ -1045,14 +1195,12 @@ public class OffsetMetadataManager {
return;
}
- pendingOffsets.offsetsByGroup.keySet().forEach(groupId -> {
- TimelineHashSet<Long> openTransactions =
openTransactionsByGroup.get(groupId);
- if (openTransactions != null) {
- openTransactions.remove(producerId);
- if (openTransactions.isEmpty()) {
- openTransactionsByGroup.remove(groupId);
- }
- }
+ pendingOffsets.offsetsByGroup.forEach((groupId, topicOffsets) -> {
+ topicOffsets.forEach((topic, partitionOffsets) -> {
+ partitionOffsets.keySet().forEach(partitionId -> {
+ openTransactions.remove(groupId, topic, partitionId,
producerId);
+ });
+ });
});
if (result == TransactionResult.COMMIT) {
diff --git
a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/coordinator/TransactionalOffsetFetchBenchmark.java
b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/coordinator/TransactionalOffsetFetchBenchmark.java
new file mode 100644
index 00000000000..b1986f639a7
--- /dev/null
+++
b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/coordinator/TransactionalOffsetFetchBenchmark.java
@@ -0,0 +1,142 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.jmh.coordinator;
+
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.message.OffsetFetchRequestData;
+import org.apache.kafka.common.metadata.TopicRecord;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.coordinator.group.Group;
+import org.apache.kafka.coordinator.group.GroupCoordinatorConfig;
+import org.apache.kafka.coordinator.group.GroupMetadataManager;
+import org.apache.kafka.coordinator.group.OffsetMetadataManager;
+import org.apache.kafka.coordinator.group.generated.OffsetCommitKey;
+import org.apache.kafka.coordinator.group.generated.OffsetCommitValue;
+import org.apache.kafka.coordinator.group.metrics.GroupCoordinatorMetricsShard;
+import org.apache.kafka.image.MetadataDelta;
+import org.apache.kafka.image.MetadataImage;
+import org.apache.kafka.image.MetadataProvenance;
+import org.apache.kafka.timeline.SnapshotRegistry;
+
+import org.openjdk.jmh.annotations.Benchmark;
+import org.openjdk.jmh.annotations.BenchmarkMode;
+import org.openjdk.jmh.annotations.Fork;
+import org.openjdk.jmh.annotations.Level;
+import org.openjdk.jmh.annotations.Measurement;
+import org.openjdk.jmh.annotations.Mode;
+import org.openjdk.jmh.annotations.OutputTimeUnit;
+import org.openjdk.jmh.annotations.Param;
+import org.openjdk.jmh.annotations.Scope;
+import org.openjdk.jmh.annotations.Setup;
+import org.openjdk.jmh.annotations.State;
+import org.openjdk.jmh.annotations.Threads;
+import org.openjdk.jmh.annotations.Warmup;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+
+import static org.mockito.ArgumentMatchers.anyLong;
+import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+@State(Scope.Benchmark)
+@Fork(value = 1)
+@Warmup(iterations = 5)
+@Measurement(iterations = 5)
+@BenchmarkMode(Mode.AverageTime)
+@OutputTimeUnit(TimeUnit.MILLISECONDS)
+public class TransactionalOffsetFetchBenchmark {
+ private static final Time TIME = Time.SYSTEM;
+
+ @Param({"4000"})
+ private int partitionCount;
+
+ @Param({"4000"})
+ private int transactionCount;
+
+ private static final String GROUP_ID = "my-group-id";
+ private static final String TOPIC_NAME = "my-topic-name";
+
+ private OffsetMetadataManager offsetMetadataManager;
+
+ /** A list of partition indexes from 0 to partitionCount - 1. */
+ private List<Integer> partitionIndexes;
+
+ @Setup(Level.Trial)
+ public void setup() {
+ LogContext logContext = new LogContext();
+ MetadataDelta delta = new MetadataDelta(MetadataImage.EMPTY);
+ delta.replay(new TopicRecord()
+ .setTopicId(Uuid.randomUuid())
+ .setName(TOPIC_NAME));
+ MetadataImage image = delta.apply(MetadataProvenance.EMPTY);
+
+ SnapshotRegistry snapshotRegistry = new SnapshotRegistry(logContext);
+
+ GroupMetadataManager groupMetadataManager =
mock(GroupMetadataManager.class);
+ Group group = mock(Group.class);
+ when(groupMetadataManager.group(anyString(),
anyLong())).thenReturn(group);
+
+ offsetMetadataManager = new OffsetMetadataManager.Builder()
+ .withLogContext(logContext)
+ .withSnapshotRegistry(snapshotRegistry)
+ .withTime(TIME)
+ .withGroupMetadataManager(groupMetadataManager)
+ .withGroupCoordinatorConfig(mock(GroupCoordinatorConfig.class))
+ .withMetadataImage(image)
+
.withGroupCoordinatorMetricsShard(mock(GroupCoordinatorMetricsShard.class))
+ .build();
+
+ for (int i = 0; i < transactionCount; i++) {
+ snapshotRegistry.idempotentCreateSnapshot(i);
+ offsetMetadataManager.replay(
+ i,
+ 3193600 + i,
+ new OffsetCommitKey()
+ .setGroup(GROUP_ID)
+ .setTopic(TOPIC_NAME)
+ .setPartition(i),
+ new OffsetCommitValue()
+ .setOffset(100)
+ );
+ }
+
+ partitionIndexes = new ArrayList<>();
+ for (int i = 0; i < partitionCount; i++) {
+ partitionIndexes.add(i);
+ }
+ }
+
+ @Benchmark
+ @Threads(1)
+ @OutputTimeUnit(TimeUnit.MILLISECONDS)
+ public void run() {
+ offsetMetadataManager.fetchOffsets(
+ new OffsetFetchRequestData.OffsetFetchRequestGroup()
+ .setGroupId(GROUP_ID)
+ .setTopics(List.of(
+ new OffsetFetchRequestData.OffsetFetchRequestTopics()
+ .setName(TOPIC_NAME)
+ .setPartitionIndexes(partitionIndexes)
+ )),
+ Long.MAX_VALUE
+ );
+ }
+}