This is an automated email from the ASF dual-hosted git repository.

dajac pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new eb3714f022c KAFKA-19160;KAFKA-19164; Improve performance of fetching stable offsets (#19497)
eb3714f022c is described below

commit eb3714f022c948e9348b7e281c4bd69a38666013
Author: Sean Quah <[email protected]>
AuthorDate: Mon May 12 08:32:17 2025 +0100

    KAFKA-19160;KAFKA-19164; Improve performance of fetching stable offsets (#19497)
    
    When fetching stable offsets in the group coordinator, we iterate over
    all requested partitions. For each partition, we iterate over the
    group's ongoing transactions to check if there is a pending
    transactional offset commit for that partition.
    
    This can get slow when there are a large number of partitions and a
    large number of pending transactions. Instead, maintain a list of
    pending transactions per partition to speed up lookups.
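    
    As a rough sketch of the idea (illustrative only: plain java.util
    collections stand in for the coordinator's snapshot-aware
    TimelineHashMap/TimelineHashSet, and the names below are hypothetical
    rather than the actual OffsetMetadataManager fields):
    
        import java.util.HashMap;
        import java.util.HashSet;
        import java.util.Map;
        import java.util.Set;
    
        class OpenTransactionIndexSketch {
            // Before: open producer ids were tracked only per group, so asking
            // "does this partition have a pending transactional offset?" meant
            // scanning every open transaction's pending offsets, i.e.
            // O(transactions) work per partition.
            private final Map<String, Set<Long>> openTxnsByGroup = new HashMap<>();
    
            // After: producer ids keyed by group -> topic -> partition, so the
            // same question becomes a few map lookups, i.e. O(1) per partition.
            private final Map<String, Map<String, Map<Integer, Set<Long>>>>
                openTxnsByPartition = new HashMap<>();
    
            void add(String groupId, String topic, int partition, long producerId) {
                openTxnsByPartition
                    .computeIfAbsent(groupId, g -> new HashMap<>())
                    .computeIfAbsent(topic, t -> new HashMap<>())
                    .computeIfAbsent(partition, p -> new HashSet<>())
                    .add(producerId);
            }
    
            boolean hasPendingTransactionalOffsets(String groupId, String topic, int partition) {
                Map<String, Map<Integer, Set<Long>>> byTopic = openTxnsByPartition.get(groupId);
                if (byTopic == null) return false;
                Map<Integer, Set<Long>> byPartition = byTopic.get(topic);
                return byPartition != null && byPartition.containsKey(partition);
            }
        }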
    
    Reviewers: Shaan, Dongnuo Lyu <[email protected]>, Chia-Ping Tsai <[email protected]>, David Jacot <[email protected]>
---
 .../coordinator/group/OffsetMetadataManager.java   | 278 ++++++++++++++++-----
 .../TransactionalOffsetFetchBenchmark.java         | 142 +++++++++++
 2 files changed, 355 insertions(+), 65 deletions(-)

diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/OffsetMetadataManager.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/OffsetMetadataManager.java
index 0fa997c557a..54de475d4bf 100644
--- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/OffsetMetadataManager.java
+++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/OffsetMetadataManager.java
@@ -62,6 +62,7 @@ import java.util.Optional;
 import java.util.OptionalLong;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.BiConsumer;
 import java.util.function.Consumer;
 
 import static org.apache.kafka.common.requests.OffsetFetchResponse.INVALID_OFFSET;
@@ -88,37 +89,37 @@ public class OffsetMetadataManager {
         private GroupCoordinatorConfig config = null;
         private GroupCoordinatorMetricsShard metrics = null;
 
-        Builder withLogContext(LogContext logContext) {
+        public Builder withLogContext(LogContext logContext) {
             this.logContext = logContext;
             return this;
         }
 
-        Builder withSnapshotRegistry(SnapshotRegistry snapshotRegistry) {
+        public Builder withSnapshotRegistry(SnapshotRegistry snapshotRegistry) {
             this.snapshotRegistry = snapshotRegistry;
             return this;
         }
 
-        Builder withTime(Time time) {
+        public Builder withTime(Time time) {
             this.time = time;
             return this;
         }
 
-        Builder withGroupMetadataManager(GroupMetadataManager groupMetadataManager) {
+        public Builder withGroupMetadataManager(GroupMetadataManager groupMetadataManager) {
             this.groupMetadataManager = groupMetadataManager;
             return this;
         }
 
-        Builder withGroupCoordinatorConfig(GroupCoordinatorConfig config) {
+        public Builder withGroupCoordinatorConfig(GroupCoordinatorConfig config) {
             this.config = config;
             return this;
         }
 
-        Builder withMetadataImage(MetadataImage metadataImage) {
+        public Builder withMetadataImage(MetadataImage metadataImage) {
             this.metadataImage = metadataImage;
             return this;
         }
 
-        Builder withGroupCoordinatorMetricsShard(GroupCoordinatorMetricsShard metrics) {
+        public Builder withGroupCoordinatorMetricsShard(GroupCoordinatorMetricsShard metrics) {
             this.metrics = metrics;
             return this;
         }
@@ -193,9 +194,167 @@ public class OffsetMetadataManager {
     private final TimelineHashMap<Long, Offsets> pendingTransactionalOffsets;
 
     /**
-     * The open transactions (producer ids) keyed by group.
+     * The open transactions (producer ids) by group id, topic name and partition id.
      */
-    private final TimelineHashMap<String, TimelineHashSet<Long>> openTransactionsByGroup;
+    private final OpenTransactions openTransactions;
+
+    /**
+     * Tracks open transactions (producer ids) by group id, topic name and partition id.
+     * It is the responsibility of the caller to update {@link #pendingTransactionalOffsets}.
+     */
+    private class OpenTransactions {
+        /**
+         * The open transactions (producer ids) keyed by group id, topic name and partition id.
+         * Tracks whether partitions have any pending transactional offsets that have not been deleted.
+         *
+         * Values in each level of the map will never be empty collections.
+         */
+        private final TimelineHashMap<String, TimelineHashMap<String, TimelineHashMap<Integer, TimelineHashSet<Long>>>> openTransactionsByGroup;
+
+        private OpenTransactions() {
+            this.openTransactionsByGroup = new TimelineHashMap<>(snapshotRegistry, 0);
+        }
+
+        /**
+         * Adds a producer id to the open transactions for the given group and topic partition.
+         *
+         * @param groupId    The group id.
+         * @param topic      The topic name.
+         * @param partition  The partition.
+         * @param producerId The producer id.
+         * @return {@code true} if the partition did not already have a pending offset from the producer id.
+         */
+        private boolean add(String groupId, String topic, int partition, long producerId) {
+            return openTransactionsByGroup
+                .computeIfAbsent(groupId, __ -> new TimelineHashMap<>(snapshotRegistry, 1))
+                .computeIfAbsent(topic, __ -> new TimelineHashMap<>(snapshotRegistry, 1))
+                .computeIfAbsent(partition, __ -> new TimelineHashSet<>(snapshotRegistry, 1))
+                .add(producerId);
+        }
+
+        /**
+         * Clears all open transactions for the given group and topic partition.
+         *
+         * @param groupId    The group id.
+         * @param topic      The topic name.
+         * @param partition  The partition.
+         */
+        private void clear(String groupId, String topic, int partition) {
+            TimelineHashMap<String, TimelineHashMap<Integer, TimelineHashSet<Long>>> openTransactionsByTopic =
+                openTransactionsByGroup.get(groupId);
+            if (openTransactionsByTopic == null) return;
+
+            TimelineHashMap<Integer, TimelineHashSet<Long>> openTransactionsByPartition = openTransactionsByTopic.get(topic);
+            if (openTransactionsByPartition == null) return;
+
+            openTransactionsByPartition.remove(partition);
+
+            if (openTransactionsByPartition.isEmpty()) {
+                openTransactionsByTopic.remove(topic);
+                if (openTransactionsByTopic.isEmpty()) {
+                    openTransactionsByGroup.remove(groupId);
+                }
+            }
+        }
+
+        /**
+         * Returns {@code true} if the given group has any pending transactional offsets.
+         *
+         * @param groupId The group id.
+         * @return {@code true} if the given group has any pending transactional offsets.
+         */
+        private boolean contains(String groupId) {
+            return openTransactionsByGroup.containsKey(groupId);
+        }
+
+        /**
+         * Returns {@code true} if the given group has any pending transactional offsets for the given topic and partition.
+         *
+         * @param groupId   The group id.
+         * @param topic     The topic name.
+         * @param partition The partition.
+         * @return {@code true} if the given group has any pending transactional offsets for the given topic and partition.
+         */
+        private boolean contains(String groupId, String topic, int partition) {
+            TimelineHashSet<Long> openTransactions = get(groupId, topic, partition);
+            return openTransactions != null;
+        }
+
+        /**
+         * Performs the given action for each partition with a pending transactional offset for the given group.
+         *
+         * @param groupId The group id.
+         * @param action  The action to be performed for each partition with a pending transactional offset.
+         */
+        private void forEachTopicPartition(String groupId, BiConsumer<String, Integer> action) {
+            TimelineHashMap<String, TimelineHashMap<Integer, TimelineHashSet<Long>>> openTransactionsByTopic =
+                openTransactionsByGroup.get(groupId);
+            if (openTransactionsByTopic == null) return;
+
+            openTransactionsByTopic.forEach((topic, openTransactionsByPartition) -> {
+                openTransactionsByPartition.forEach((partition, producerIds) -> {
+                    action.accept(topic, partition);
+                });
+            });
+        }
+
+        /**
+         * Performs the given action for each producer id with a pending transactional offset for the given group and topic partition.
+         *
+         * @param groupId   The group id.
+         * @param topic     The topic name.
+         * @param partition The partition.
+         * @param action    The action to be performed for each producer id with a pending transactional offset.
+         */
+        private void forEach(String groupId, String topic, int partition, Consumer<Long> action) {
+            TimelineHashSet<Long> openTransactions = get(groupId, topic, partition);
+            if (openTransactions == null) return;
+
+            openTransactions.forEach(action);
+        }
+
+        /**
+         * Gets the set of producer ids with pending transactional offsets for the given group and topic partition.
+         *
+         * @param groupId   The group id.
+         * @param topic     The topic name.
+         * @param partition The partition.
+         * @return The set of producer ids with pending transactional offsets for the given group and topic partition.
+         */
+        private TimelineHashSet<Long> get(String groupId, String topic, int partition) {
+            TimelineHashMap<String, TimelineHashMap<Integer, TimelineHashSet<Long>>> openTransactionsByTopic =
+                openTransactionsByGroup.get(groupId);
+            if (openTransactionsByTopic == null) return null;
+
+            TimelineHashMap<Integer, TimelineHashSet<Long>> openTransactionsByPartition = openTransactionsByTopic.get(topic);
+            if (openTransactionsByPartition == null) return null;
+
+            return openTransactionsByPartition.get(partition);
+        }
+
+        /**
+         * Removes a producer id from the open transactions for the given group and topic partition.
+         *
+         * @param groupId    The group id.
+         * @param topic      The topic name.
+         * @param partition  The partition.
+         * @param producerId The producer id.
+         * @return {@code true} if the group and topic partition had a pending transactional offset from the producer id.
+         */
+        private boolean remove(String groupId, String topic, int partition, long producerId) {
+            TimelineHashSet<Long> openTransactions = get(groupId, topic, partition);
+            if (openTransactions == null) return false;
+
+            boolean removed = openTransactions.remove(producerId);
+
+            if (openTransactions.isEmpty()) {
+                // Re-use the cleanup in clear.
+                clear(groupId, topic, partition);
+            }
+
+            return removed;
+        }
+    }
 
     private class Offsets {
         /**
@@ -280,7 +439,7 @@ public class OffsetMetadataManager {
         this.metrics = metrics;
         this.offsets = new Offsets();
         this.pendingTransactionalOffsets = new TimelineHashMap<>(snapshotRegistry, 0);
-        this.openTransactionsByGroup = new TimelineHashMap<>(snapshotRegistry, 0);
+        this.openTransactions = new OpenTransactions();
     }
 
     /**
@@ -652,26 +811,12 @@ public class OffsetMetadataManager {
         // Delete all the pending transactional offsets too. Here we only write a tombstone
         // if the topic-partition was not in the main storage because we don't need to write
         // two consecutive tombstones.
-        TimelineHashSet<Long> openTransactions = openTransactionsByGroup.get(groupId);
-        if (openTransactions != null) {
-            openTransactions.forEach(producerId -> {
-                Offsets pendingOffsets = pendingTransactionalOffsets.get(producerId);
-                if (pendingOffsets != null) {
-                    TimelineHashMap<String, TimelineHashMap<Integer, OffsetAndMetadata>> pendingGroupOffsets =
-                        pendingOffsets.offsetsByGroup.get(groupId);
-                    if (pendingGroupOffsets != null) {
-                        pendingGroupOffsets.forEach((topic, offsetsByPartition) -> {
-                            offsetsByPartition.keySet().forEach(partition -> {
-                                if (!hasCommittedOffset(groupId, topic, partition)) {
-                                    records.add(GroupCoordinatorRecordHelpers.newOffsetCommitTombstoneRecord(groupId, topic, partition));
-                                    numDeletedOffsets.getAndIncrement();
-                                }
-                            });
-                        });
-                    }
-                }
-            });
-        }
+        openTransactions.forEachTopicPartition(groupId, (topic, partition) -> {
+            if (!hasCommittedOffset(groupId, topic, partition)) {
+                records.add(GroupCoordinatorRecordHelpers.newOffsetCommitTombstoneRecord(groupId, topic, partition));
+                numDeletedOffsets.getAndIncrement();
+            }
+        });
 
         return numDeletedOffsets.get();
     }
@@ -687,17 +832,7 @@ public class OffsetMetadataManager {
         String topic,
         int partition
     ) {
-        final TimelineHashSet<Long> openTransactions = openTransactionsByGroup.get(groupId);
-        if (openTransactions == null) return false;
-
-        for (Long producerId : openTransactions) {
-            Offsets offsets = pendingTransactionalOffsets.get(producerId);
-            if (offsets != null && offsets.get(groupId, topic, partition) != null) {
-                return true;
-            }
-        }
-
-        return false;
+        return openTransactions.contains(groupId, topic, partition);
     }
 
     /**
@@ -738,6 +873,11 @@ public class OffsetMetadataManager {
         final List<OffsetFetchResponseData.OffsetFetchResponseTopics> topicResponses = new ArrayList<>(request.topics().size());
         final TimelineHashMap<String, TimelineHashMap<Integer, OffsetAndMetadata>> groupOffsets =
             failAllPartitions ? null : offsets.offsetsByGroup.get(request.groupId(), lastCommittedOffset);
+        // We inline the lookups from hasPendingTransactionalOffsets here, to avoid repeating string
+        // comparisons of group ids and topic names for every partition. They're only used when the
+        // client has requested stable offsets.
+        final TimelineHashMap<String, TimelineHashMap<Integer, TimelineHashSet<Long>>> openTransactionsByTopic =
+            requireStable ? openTransactions.openTransactionsByGroup.get(request.groupId(), lastCommittedOffset) : null;
 
         request.topics().forEach(topic -> {
             final OffsetFetchResponseData.OffsetFetchResponseTopics topicResponse =
@@ -746,12 +886,16 @@ public class OffsetMetadataManager {
 
             final TimelineHashMap<Integer, OffsetAndMetadata> topicOffsets = groupOffsets == null ?
                 null : groupOffsets.get(topic.name(), lastCommittedOffset);
+            final TimelineHashMap<Integer, TimelineHashSet<Long>> openTransactionsByPartition =
+                (requireStable && openTransactionsByTopic != null) ? openTransactionsByTopic.get(topic.name(), lastCommittedOffset) : null;
 
             topic.partitionIndexes().forEach(partitionIndex -> {
                 final OffsetAndMetadata offsetAndMetadata = topicOffsets == null ?
                     null : topicOffsets.get(partitionIndex, lastCommittedOffset);
 
-                if (requireStable && hasPendingTransactionalOffsets(request.groupId(), topic.name(), partitionIndex)) {
+                if (requireStable &&
+                    openTransactionsByPartition != null &&
+                    openTransactionsByPartition.containsKey(partitionIndex, lastCommittedOffset)) {
                     topicResponse.partitions().add(new OffsetFetchResponseData.OffsetFetchResponsePartitions()
                         .setPartitionIndex(partitionIndex)
                         .setErrorCode(Errors.UNSTABLE_OFFSET_COMMIT.code())
@@ -804,11 +948,18 @@ public class OffsetMetadataManager {
         final List<OffsetFetchResponseData.OffsetFetchResponseTopics> topicResponses = new ArrayList<>();
         final TimelineHashMap<String, TimelineHashMap<Integer, OffsetAndMetadata>> groupOffsets =
             offsets.offsetsByGroup.get(request.groupId(), lastCommittedOffset);
+        // We inline the lookups from hasPendingTransactionalOffsets here, to avoid repeating string
+        // comparisons of group ids and topic names for every partition. They're only used when the
+        // client has requested stable offsets.
+        final TimelineHashMap<String, TimelineHashMap<Integer, TimelineHashSet<Long>>> openTransactionsByTopic =
+            requireStable ? openTransactions.openTransactionsByGroup.get(request.groupId(), lastCommittedOffset) : null;
 
         if (groupOffsets != null) {
             groupOffsets.entrySet(lastCommittedOffset).forEach(topicEntry -> {
                 final String topic = topicEntry.getKey();
                 final TimelineHashMap<Integer, OffsetAndMetadata> topicOffsets = topicEntry.getValue();
+                final TimelineHashMap<Integer, TimelineHashSet<Long>> openTransactionsByPartition =
+                    (requireStable && openTransactionsByTopic != null) ? openTransactionsByTopic.get(topic, lastCommittedOffset) : null;
 
                 final OffsetFetchResponseData.OffsetFetchResponseTopics topicResponse =
                     new OffsetFetchResponseData.OffsetFetchResponseTopics().setName(topic);
@@ -818,7 +969,9 @@ public class OffsetMetadataManager {
                     final int partition = partitionEntry.getKey();
                     final OffsetAndMetadata offsetAndMetadata = partitionEntry.getValue();
 
-                    if (requireStable && hasPendingTransactionalOffsets(request.groupId(), topic, partition)) {
+                    if (requireStable &&
+                        openTransactionsByPartition != null &&
+                        openTransactionsByPartition.containsKey(partition, lastCommittedOffset)) {
                         topicResponse.partitions().add(new OffsetFetchResponseData.OffsetFetchResponsePartitions()
                             .setPartitionIndex(partition)
                             .setErrorCode(Errors.UNSTABLE_OFFSET_COMMIT.code())
@@ -886,8 +1039,8 @@ public class OffsetMetadataManager {
         });
         metrics.record(OFFSET_EXPIRED_SENSOR_NAME, records.size());
 
-        // We don't want to remove the group if there are ongoing transactions.
-        return allOffsetsExpired.get() && !openTransactionsByGroup.containsKey(groupId);
+        // We don't want to remove the group if there are ongoing transactions with undeleted offsets.
+        return allOffsetsExpired.get() && !openTransactions.contains(groupId);
     }
 
     /**
@@ -1004,9 +1157,7 @@ public class OffsetMetadataManager {
                         partition,
                         OffsetAndMetadata.fromRecord(recordOffset, value)
                     );
-                openTransactionsByGroup
-                    .computeIfAbsent(groupId, __ -> new TimelineHashSet<>(snapshotRegistry, 1))
-                    .add(producerId);
+                openTransactions.add(groupId, topic, partition, producerId);
             }
         } else {
             if (offsets.remove(groupId, topic, partition) != null) {
@@ -1014,15 +1165,13 @@ public class OffsetMetadataManager {
             }
 
             // Remove all the pending offset commits related to the tombstone.
-            TimelineHashSet<Long> openTransactions = openTransactionsByGroup.get(groupId);
-            if (openTransactions != null) {
-                openTransactions.forEach(openProducerId -> {
-                    Offsets pendingOffsets = pendingTransactionalOffsets.get(openProducerId);
-                    if (pendingOffsets != null) {
-                        pendingOffsets.remove(groupId, topic, partition);
-                    }
-                });
-            }
+            openTransactions.forEach(groupId, topic, partition, openProducerId -> {
+                Offsets pendingOffsets = pendingTransactionalOffsets.get(openProducerId);
+                if (pendingOffsets != null) {
+                    pendingOffsets.remove(groupId, topic, partition);
+                }
+            });
+            openTransactions.clear(groupId, topic, partition);
         }
     }
 
@@ -1033,6 +1182,7 @@ public class OffsetMetadataManager {
      * @param result        The result of the transaction.
      * @throws RuntimeException if the transaction can not be completed.
      */
+    @SuppressWarnings("NPathComplexity")
     public void replayEndTransactionMarker(
         long producerId,
         TransactionResult result
@@ -1045,14 +1195,12 @@ public class OffsetMetadataManager {
             return;
         }
 
-        pendingOffsets.offsetsByGroup.keySet().forEach(groupId -> {
-            TimelineHashSet<Long> openTransactions = openTransactionsByGroup.get(groupId);
-            if (openTransactions != null) {
-                openTransactions.remove(producerId);
-                if (openTransactions.isEmpty()) {
-                    openTransactionsByGroup.remove(groupId);
-                }
-            }
+        pendingOffsets.offsetsByGroup.forEach((groupId, topicOffsets) -> {
+            topicOffsets.forEach((topic, partitionOffsets) -> {
+                partitionOffsets.keySet().forEach(partitionId -> {
+                    openTransactions.remove(groupId, topic, partitionId, producerId);
+                });
+            });
         });
 
         if (result == TransactionResult.COMMIT) {
diff --git a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/coordinator/TransactionalOffsetFetchBenchmark.java b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/coordinator/TransactionalOffsetFetchBenchmark.java
new file mode 100644
index 00000000000..b1986f639a7
--- /dev/null
+++ b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/coordinator/TransactionalOffsetFetchBenchmark.java
@@ -0,0 +1,142 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.jmh.coordinator;
+
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.message.OffsetFetchRequestData;
+import org.apache.kafka.common.metadata.TopicRecord;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.coordinator.group.Group;
+import org.apache.kafka.coordinator.group.GroupCoordinatorConfig;
+import org.apache.kafka.coordinator.group.GroupMetadataManager;
+import org.apache.kafka.coordinator.group.OffsetMetadataManager;
+import org.apache.kafka.coordinator.group.generated.OffsetCommitKey;
+import org.apache.kafka.coordinator.group.generated.OffsetCommitValue;
+import org.apache.kafka.coordinator.group.metrics.GroupCoordinatorMetricsShard;
+import org.apache.kafka.image.MetadataDelta;
+import org.apache.kafka.image.MetadataImage;
+import org.apache.kafka.image.MetadataProvenance;
+import org.apache.kafka.timeline.SnapshotRegistry;
+
+import org.openjdk.jmh.annotations.Benchmark;
+import org.openjdk.jmh.annotations.BenchmarkMode;
+import org.openjdk.jmh.annotations.Fork;
+import org.openjdk.jmh.annotations.Level;
+import org.openjdk.jmh.annotations.Measurement;
+import org.openjdk.jmh.annotations.Mode;
+import org.openjdk.jmh.annotations.OutputTimeUnit;
+import org.openjdk.jmh.annotations.Param;
+import org.openjdk.jmh.annotations.Scope;
+import org.openjdk.jmh.annotations.Setup;
+import org.openjdk.jmh.annotations.State;
+import org.openjdk.jmh.annotations.Threads;
+import org.openjdk.jmh.annotations.Warmup;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+
+import static org.mockito.ArgumentMatchers.anyLong;
+import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+@State(Scope.Benchmark)
+@Fork(value = 1)
+@Warmup(iterations = 5)
+@Measurement(iterations = 5)
+@BenchmarkMode(Mode.AverageTime)
+@OutputTimeUnit(TimeUnit.MILLISECONDS)
+public class TransactionalOffsetFetchBenchmark {
+    private static final Time TIME = Time.SYSTEM;
+
+    @Param({"4000"})
+    private int partitionCount;
+
+    @Param({"4000"})
+    private int transactionCount;
+
+    private static final String GROUP_ID = "my-group-id";
+    private static final String TOPIC_NAME = "my-topic-name";
+
+    private OffsetMetadataManager offsetMetadataManager;
+
+    /** A list of partition indexes from 0 to partitionCount - 1. */
+    private List<Integer> partitionIndexes;
+
+    @Setup(Level.Trial)
+    public void setup() {
+        LogContext logContext = new LogContext();
+        MetadataDelta delta = new MetadataDelta(MetadataImage.EMPTY);
+        delta.replay(new TopicRecord()
+            .setTopicId(Uuid.randomUuid())
+            .setName(TOPIC_NAME));
+        MetadataImage image = delta.apply(MetadataProvenance.EMPTY);
+
+        SnapshotRegistry snapshotRegistry = new SnapshotRegistry(logContext);
+
+        GroupMetadataManager groupMetadataManager = mock(GroupMetadataManager.class);
+        Group group = mock(Group.class);
+        when(groupMetadataManager.group(anyString(), anyLong())).thenReturn(group);
+
+        offsetMetadataManager = new OffsetMetadataManager.Builder()
+            .withLogContext(logContext)
+            .withSnapshotRegistry(snapshotRegistry)
+            .withTime(TIME)
+            .withGroupMetadataManager(groupMetadataManager)
+            .withGroupCoordinatorConfig(mock(GroupCoordinatorConfig.class))
+            .withMetadataImage(image)
+            .withGroupCoordinatorMetricsShard(mock(GroupCoordinatorMetricsShard.class))
+            .build();
+
+        for (int i = 0; i < transactionCount; i++) {
+            snapshotRegistry.idempotentCreateSnapshot(i);
+            offsetMetadataManager.replay(
+                i,
+                3193600 + i,
+                new OffsetCommitKey()
+                    .setGroup(GROUP_ID)
+                    .setTopic(TOPIC_NAME)
+                    .setPartition(i),
+                new OffsetCommitValue()
+                    .setOffset(100)
+            );
+        }
+
+        partitionIndexes = new ArrayList<>();
+        for (int i = 0; i < partitionCount; i++) {
+            partitionIndexes.add(i);
+        }
+    }
+
+    @Benchmark
+    @Threads(1)
+    @OutputTimeUnit(TimeUnit.MILLISECONDS)
+    public void run() {
+        offsetMetadataManager.fetchOffsets(
+            new OffsetFetchRequestData.OffsetFetchRequestGroup()
+                .setGroupId(GROUP_ID)
+                .setTopics(List.of(
+                    new OffsetFetchRequestData.OffsetFetchRequestTopics()
+                        .setName(TOPIC_NAME)
+                        .setPartitionIndexes(partitionIndexes)
+                )),
+            Long.MAX_VALUE
+        );
+    }
+}
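
If the jmh-benchmarks module's standard runner is unchanged, the new benchmark
can presumably be run from the repository root with something like:

    ./jmh-benchmarks/jmh.sh TransactionalOffsetFetchBenchmark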
