This is an automated email from the ASF dual-hosted git repository.

dajac pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new d067c6c0408 KAFKA-19716: Clear out coordinator snapshots periodically while loading (#20547)
d067c6c0408 is described below

commit d067c6c04089a3d24e1f72e6cb1b10b0d85f76da
Author: Sean Quah <[email protected]>
AuthorDate: Fri Sep 19 08:44:07 2025 +0100

    KAFKA-19716: Clear out coordinator snapshots periodically while loading (#20547)
    
    When nested Timeline collections are created and discarded while loading
    a coordinator partition, references to them accumulate in the current
    snapshot. Allow the GC to reclaim them by starting a new snapshot and
    discarding previous snapshots every 16,384 records.
    
    Small intervals degrade loading times for non-transactional offset
    commit workloads while large intervals degrade loading times for
    transactional workloads. A default of 16,384 was chosen as a compromise.
    
    Also add a benchmark for group coordinator loading.
    
    Reviewers: David Jacot <[email protected]>
---
 build.gradle                                       |   1 +
 checkstyle/import-control-jmh-benchmarks.xml       |   4 +-
 .../common/runtime/CoordinatorLoaderImpl.java      |  39 ++-
 .../common/runtime/SnapshottableCoordinator.java   |   2 +-
 .../common/runtime/CoordinatorLoaderImplTest.java  | 109 ++++++-
 .../src/main/scala/kafka/server/BrokerServer.scala |   6 +-
 .../GroupCoordinatorShardLoadingBenchmark.java     | 355 +++++++++++++++++++++
 .../org/apache/kafka/jmh/coordinator/MockLog.java  |  74 +++++
 8 files changed, 563 insertions(+), 27 deletions(-)
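In short, the loader used to advance the coordinator's last written/committed offsets only when replay crossed the high watermark; with this change it also commits every commitIntervalOffsets records, so the snapshot registry can drop the snapshots that pin discarded Timeline collections. A minimal sketch of the replay-loop logic this adds (condensed from the processMemoryRecords() hunk below, not a verbatim excerpt):

    if (currentOffset >= currentHighWatermark) {
        coordinator.updateLastWrittenOffset(currentOffset);
        if (currentHighWatermark > lastCommittedOffset) {
            coordinator.updateLastCommittedOffset(currentHighWatermark);
            lastCommittedOffset = currentHighWatermark;
        }
    } else if (currentOffset - lastCommittedOffset >= commitIntervalOffsets) {
        // New in this commit: commit periodically while still below the high
        // watermark so older snapshots can be discarded during loading.
        coordinator.updateLastWrittenOffset(currentOffset);
        coordinator.updateLastCommittedOffset(currentOffset);
        lastCommittedOffset = currentOffset;
    }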

diff --git a/build.gradle b/build.gradle
index 26ca93d0738..753a86cfc35 100644
--- a/build.gradle
+++ b/build.gradle
@@ -3350,6 +3350,7 @@ project(':jmh-benchmarks') {
     implementation project(':raft')
     implementation project(':clients')
     implementation project(':coordinator-common')
+    implementation project(':coordinator-common').sourceSets.test.output
     implementation project(':group-coordinator')
     implementation project(':group-coordinator:group-coordinator-api')
     implementation project(':metadata')
diff --git a/checkstyle/import-control-jmh-benchmarks.xml b/checkstyle/import-control-jmh-benchmarks.xml
index d2f87a3577f..4c11bc3acb4 100644
--- a/checkstyle/import-control-jmh-benchmarks.xml
+++ b/checkstyle/import-control-jmh-benchmarks.xml
@@ -52,9 +52,7 @@
     <allow pkg="org.apache.kafka.server"/>
     <allow pkg="org.apache.kafka.storage"/>
     <allow pkg="org.apache.kafka.clients"/>
-    <allow class="org.apache.kafka.coordinator.common.runtime.CoordinatorMetadataImage"/>
-    <allow class="org.apache.kafka.coordinator.common.runtime.KRaftCoordinatorMetadataImage"/>
-    <allow class="org.apache.kafka.coordinator.common.runtime.HdrHistogram"/>
+    <allow pkg="org.apache.kafka.coordinator.common.runtime"/>
     <allow pkg="org.apache.kafka.coordinator.group"/>
     <allow pkg="org.apache.kafka.image"/>
     <allow pkg="org.apache.kafka.metadata"/>
diff --git a/coordinator-common/src/main/java/org/apache/kafka/coordinator/common/runtime/CoordinatorLoaderImpl.java b/coordinator-common/src/main/java/org/apache/kafka/coordinator/common/runtime/CoordinatorLoaderImpl.java
index 6613ce25fc8..3a8b7434326 100644
--- a/coordinator-common/src/main/java/org/apache/kafka/coordinator/common/runtime/CoordinatorLoaderImpl.java
+++ b/coordinator-common/src/main/java/org/apache/kafka/coordinator/common/runtime/CoordinatorLoaderImpl.java
@@ -50,6 +50,20 @@ import java.util.function.Function;
  */
 public class CoordinatorLoaderImpl<T> implements CoordinatorLoader<T> {
 
+    /**
+     * The interval between updating the last committed offset during loading, in offsets. Smaller
+     * values commit more often at the expense of loading times when the workload is simple and does
+     * not create collections that need to participate in {@link CoordinatorPlayback} snapshotting.
+     * Larger values commit less often and allow more temporary data to accumulate before the next
+     * commit when the workload creates many temporary collections that need to be snapshotted.
+     *
+     * The value of 16,384 was chosen as a trade-off between the performance of these two workloads.
+     *
+     * When changing this value, please run the GroupCoordinatorShardLoadingBenchmark to evaluate
+     * the relative change in performance.
+     */
+    public static final long DEFAULT_COMMIT_INTERVAL_OFFSETS = 16384;
+
     private static final Logger LOG = LoggerFactory.getLogger(CoordinatorLoaderImpl.class);
 
     private final Time time;
@@ -57,6 +71,7 @@ public class CoordinatorLoaderImpl<T> implements CoordinatorLoader<T> {
     private final Function<TopicPartition, Optional<Long>> partitionLogEndOffsetSupplier;
     private final Deserializer<T> deserializer;
     private final int loadBufferSize;
+    private final long commitIntervalOffsets;
 
     private final AtomicBoolean isRunning = new AtomicBoolean(true);
     private final KafkaScheduler scheduler = new KafkaScheduler(1);
@@ -66,13 +81,15 @@ public class CoordinatorLoaderImpl<T> implements CoordinatorLoader<T> {
         Function<TopicPartition, Optional<UnifiedLog>> partitionLogSupplier,
         Function<TopicPartition, Optional<Long>> partitionLogEndOffsetSupplier,
         Deserializer<T> deserializer,
-        int loadBufferSize
+        int loadBufferSize,
+        long commitIntervalOffsets
     ) {
         this.time = time;
         this.partitionLogSupplier = partitionLogSupplier;
         this.partitionLogEndOffsetSupplier = partitionLogEndOffsetSupplier;
         this.deserializer = deserializer;
         this.loadBufferSize = loadBufferSize;
+        this.commitIntervalOffsets = commitIntervalOffsets;
         this.scheduler.startup();
     }
 
@@ -121,7 +138,7 @@ public class CoordinatorLoaderImpl<T> implements CoordinatorLoader<T> {
             long currentOffset = log.logStartOffset();
             LoadStats stats = new LoadStats();
 
-            long previousHighWatermark = -1L;
+            long lastCommittedOffset = -1L;
             while (shouldFetchNextBatch(currentOffset, logEndOffset(tp), stats.readAtLeastOneRecord)) {
                 FetchDataInfo fetchDataInfo = log.read(currentOffset, loadBufferSize, FetchIsolation.LOG_END, true);
 
@@ -133,9 +150,9 @@ public class CoordinatorLoaderImpl<T> implements CoordinatorLoader<T> {
                     buffer = memoryRecords.buffer();
                 }
 
-                ReplayResult replayResult = processMemoryRecords(tp, log, memoryRecords, coordinator, stats, currentOffset, previousHighWatermark);
+                ReplayResult replayResult = processMemoryRecords(tp, log, memoryRecords, coordinator, stats, currentOffset, lastCommittedOffset);
                 currentOffset = replayResult.nextOffset;
-                previousHighWatermark = replayResult.highWatermark;
+                lastCommittedOffset = replayResult.lastCommittedOffset;
             }
 
             long endTimeMs = time.milliseconds();
@@ -207,7 +224,7 @@ public class CoordinatorLoaderImpl<T> implements CoordinatorLoader<T> {
         CoordinatorPlayback<T> coordinator,
         LoadStats loadStats,
         long currentOffset,
-        long previousHighWatermark
+        long lastCommittedOffset
     ) {
         for (MutableRecordBatch batch : memoryRecords.batches()) {
             if (batch.isControlBatch()) {
@@ -286,14 +303,18 @@ public class CoordinatorLoaderImpl<T> implements CoordinatorLoader<T> {
             if (currentOffset >= currentHighWatermark) {
                 coordinator.updateLastWrittenOffset(currentOffset);
 
-                if (currentHighWatermark > previousHighWatermark) {
+                if (currentHighWatermark > lastCommittedOffset) {
                     coordinator.updateLastCommittedOffset(currentHighWatermark);
-                    previousHighWatermark = currentHighWatermark;
+                    lastCommittedOffset = currentHighWatermark;
                 }
+            } else if (currentOffset - lastCommittedOffset >= commitIntervalOffsets) {
+                coordinator.updateLastWrittenOffset(currentOffset);
+                coordinator.updateLastCommittedOffset(currentOffset);
+                lastCommittedOffset = currentOffset;
             }
         }
         loadStats.numBytes += memoryRecords.sizeInBytes();
-        return new ReplayResult(currentOffset, previousHighWatermark);
+        return new ReplayResult(currentOffset, lastCommittedOffset);
     }
 
     /**
@@ -326,5 +347,5 @@ public class CoordinatorLoaderImpl<T> implements CoordinatorLoader<T> {
         }
     }
 
-    private record ReplayResult(long nextOffset, long highWatermark) { }
+    private record ReplayResult(long nextOffset, long lastCommittedOffset) { }
 }
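The reason the extra commits help is that SnapshottableCoordinator (whose constructor is made public below so the new benchmark can construct one) tracks both offsets against a SnapshotRegistry: advancing the last written offset starts a new snapshot, and advancing the last committed offset deletes the snapshots behind it, releasing the references to collections that were created and discarded during replay. Conceptually (a simplified sketch of that behaviour, not the actual source; offset validation omitted):

    public synchronized void updateLastWrittenOffset(long offset) {
        lastWrittenOffset = offset;
        snapshotRegistry.getOrCreateSnapshot(offset);    // start a new snapshot
    }

    public synchronized void updateLastCommittedOffset(long offset) {
        lastCommittedOffset = offset;
        snapshotRegistry.deleteSnapshotsUpTo(offset);    // discard previous snapshots
    }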
diff --git a/coordinator-common/src/main/java/org/apache/kafka/coordinator/common/runtime/SnapshottableCoordinator.java b/coordinator-common/src/main/java/org/apache/kafka/coordinator/common/runtime/SnapshottableCoordinator.java
index 1550e444bf4..278373e6842 100644
--- a/coordinator-common/src/main/java/org/apache/kafka/coordinator/common/runtime/SnapshottableCoordinator.java
+++ b/coordinator-common/src/main/java/org/apache/kafka/coordinator/common/runtime/SnapshottableCoordinator.java
@@ -61,7 +61,7 @@ public class SnapshottableCoordinator<S extends CoordinatorShard<U>, U> implemen
      */
     private long lastCommittedOffset;
 
-    SnapshottableCoordinator(
+    public SnapshottableCoordinator(
         LogContext logContext,
         SnapshotRegistry snapshotRegistry,
         S coordinator,
diff --git a/coordinator-common/src/test/java/org/apache/kafka/coordinator/common/runtime/CoordinatorLoaderImplTest.java b/coordinator-common/src/test/java/org/apache/kafka/coordinator/common/runtime/CoordinatorLoaderImplTest.java
index 9f8ab68c66f..11cdab83cac 100644
--- a/coordinator-common/src/test/java/org/apache/kafka/coordinator/common/runtime/CoordinatorLoaderImplTest.java
+++ b/coordinator-common/src/test/java/org/apache/kafka/coordinator/common/runtime/CoordinatorLoaderImplTest.java
@@ -91,7 +91,8 @@ class CoordinatorLoaderImplTest {
             partitionLogSupplier,
             partitionLogEndOffsetSupplier,
             serde,
-            1000
+            1000,
+            CoordinatorLoaderImpl.DEFAULT_COMMIT_INTERVAL_OFFSETS
         )) {
             assertFutureThrows(NotLeaderOrFollowerException.class, loader.load(tp, coordinator));
         }
@@ -110,7 +111,8 @@ class CoordinatorLoaderImplTest {
             partitionLogSupplier,
             partitionLogEndOffsetSupplier,
             serde,
-            1000
+            1000,
+            CoordinatorLoaderImpl.DEFAULT_COMMIT_INTERVAL_OFFSETS
         )) {
             loader.close();
             assertFutureThrows(RuntimeException.class, loader.load(tp, coordinator));
@@ -131,7 +133,8 @@ class CoordinatorLoaderImplTest {
             partitionLogSupplier,
             partitionLogEndOffsetSupplier,
             serde,
-            1000
+            1000,
+            CoordinatorLoaderImpl.DEFAULT_COMMIT_INTERVAL_OFFSETS
         )) {
             when(log.logStartOffset()).thenReturn(0L);
             when(log.highWatermark()).thenReturn(0L);
@@ -217,7 +220,8 @@ class CoordinatorLoaderImplTest {
             partitionLogSupplier,
             partitionLogEndOffsetSupplier,
             serde,
-            1000
+            1000,
+            CoordinatorLoaderImpl.DEFAULT_COMMIT_INTERVAL_OFFSETS
         )) {
             when(log.logStartOffset()).thenReturn(0L);
 
@@ -262,7 +266,8 @@ class CoordinatorLoaderImplTest {
             partitionLogSupplier,
             partitionLogEndOffsetSupplier,
             serde,
-            1000
+            1000,
+            CoordinatorLoaderImpl.DEFAULT_COMMIT_INTERVAL_OFFSETS
         )) {
             when(log.logStartOffset()).thenReturn(0L);
 
@@ -298,7 +303,8 @@ class CoordinatorLoaderImplTest {
             partitionLogSupplier,
             partitionLogEndOffsetSupplier,
             serde,
-            1000
+            1000,
+            CoordinatorLoaderImpl.DEFAULT_COMMIT_INTERVAL_OFFSETS
         )) {
             when(log.logStartOffset()).thenReturn(0L);
 
@@ -337,7 +343,8 @@ class CoordinatorLoaderImplTest {
             partitionLogSupplier,
             partitionLogEndOffsetSupplier,
             serde,
-            1000
+            1000,
+            CoordinatorLoaderImpl.DEFAULT_COMMIT_INTERVAL_OFFSETS
         )) {
             when(log.logStartOffset()).thenReturn(0L);
 
@@ -365,7 +372,8 @@ class CoordinatorLoaderImplTest {
             partitionLogSupplier,
             partitionLogEndOffsetSupplier,
             serde,
-            1000
+            1000,
+            CoordinatorLoaderImpl.DEFAULT_COMMIT_INTERVAL_OFFSETS
         )) {
             long startTimeMs = time.milliseconds();
             when(log.logStartOffset()).thenReturn(0L);
@@ -412,7 +420,8 @@ class CoordinatorLoaderImplTest {
             partitionLogSupplier,
             partitionLogEndOffsetSupplier,
             serde,
-            1000
+            1000,
+            CoordinatorLoaderImpl.DEFAULT_COMMIT_INTERVAL_OFFSETS
         )) {
             when(log.logStartOffset()).thenReturn(0L);
             when(log.highWatermark()).thenReturn(0L, 0L, 2L);
@@ -475,7 +484,8 @@ class CoordinatorLoaderImplTest {
             partitionLogSupplier,
             partitionLogEndOffsetSupplier,
             serde,
-            1000
+            1000,
+            CoordinatorLoaderImpl.DEFAULT_COMMIT_INTERVAL_OFFSETS
         )) {
             when(log.logStartOffset()).thenReturn(0L);
             when(log.highWatermark()).thenReturn(0L);
@@ -501,7 +511,8 @@ class CoordinatorLoaderImplTest {
             partitionLogSupplier,
             partitionLogEndOffsetSupplier,
             serde,
-            1000
+            1000,
+            CoordinatorLoaderImpl.DEFAULT_COMMIT_INTERVAL_OFFSETS
         )) {
             when(log.logStartOffset()).thenReturn(0L);
             when(log.highWatermark()).thenReturn(5L, 7L, 7L);
@@ -551,6 +562,79 @@ class CoordinatorLoaderImplTest {
         }
     }
 
+    @Test
+    void testUpdateLastWrittenOffsetCommitInterval() throws Exception {
+        TopicPartition tp = new TopicPartition("foo", 0);
+        UnifiedLog log = mock(UnifiedLog.class);
+        Function<TopicPartition, Optional<UnifiedLog>> partitionLogSupplier = partition -> Optional.of(log);
+        Function<TopicPartition, Optional<Long>> partitionLogEndOffsetSupplier = partition -> Optional.of(7L);
+        StringKeyValueDeserializer serde = new StringKeyValueDeserializer();
+        CoordinatorPlayback<Map.Entry<String, String>> coordinator = mock(CoordinatorPlayback.class);
+
+        try (CoordinatorLoaderImpl<Map.Entry<String, String>> loader = new CoordinatorLoaderImpl<>(
+            Time.SYSTEM,
+            partitionLogSupplier,
+            partitionLogEndOffsetSupplier,
+            serde,
+            1000,
+            2L
+        )) {
+            when(log.logStartOffset()).thenReturn(0L);
+            when(log.highWatermark()).thenReturn(7L);
+
+            FetchDataInfo readResult1 = logReadResult(0, Arrays.asList(
+                new SimpleRecord("k1".getBytes(), "v1".getBytes()),
+                new SimpleRecord("k2".getBytes(), "v2".getBytes())
+            ));
+
+            when(log.read(0L, 1000, FetchIsolation.LOG_END, true))
+                .thenReturn(readResult1);
+
+            FetchDataInfo readResult2 = logReadResult(2, Arrays.asList(
+                new SimpleRecord("k3".getBytes(), "v3".getBytes()),
+                new SimpleRecord("k4".getBytes(), "v4".getBytes()),
+                new SimpleRecord("k5".getBytes(), "v5".getBytes())
+            ));
+
+            when(log.read(2L, 1000, FetchIsolation.LOG_END, true))
+                .thenReturn(readResult2);
+
+            FetchDataInfo readResult3 = logReadResult(5, Arrays.asList(
+                new SimpleRecord("k6".getBytes(), "v6".getBytes())
+            ));
+
+            when(log.read(5L, 1000, FetchIsolation.LOG_END, true))
+                .thenReturn(readResult3);
+
+            FetchDataInfo readResult4 = logReadResult(6, Arrays.asList(
+                new SimpleRecord("k7".getBytes(), "v7".getBytes())
+            ));
+
+            when(log.read(6L, 1000, FetchIsolation.LOG_END, true))
+                .thenReturn(readResult4);
+
+            assertNotNull(loader.load(tp, coordinator).get(10, TimeUnit.SECONDS));
+
+            verify(coordinator).replay(0L, RecordBatch.NO_PRODUCER_ID, RecordBatch.NO_PRODUCER_EPOCH, Map.entry("k1", "v1"));
+            verify(coordinator).replay(1L, RecordBatch.NO_PRODUCER_ID, RecordBatch.NO_PRODUCER_EPOCH, Map.entry("k2", "v2"));
+            verify(coordinator).replay(2L, RecordBatch.NO_PRODUCER_ID, RecordBatch.NO_PRODUCER_EPOCH, Map.entry("k3", "v3"));
+            verify(coordinator).replay(3L, RecordBatch.NO_PRODUCER_ID, RecordBatch.NO_PRODUCER_EPOCH, Map.entry("k4", "v4"));
+            verify(coordinator).replay(4L, RecordBatch.NO_PRODUCER_ID, RecordBatch.NO_PRODUCER_EPOCH, Map.entry("k5", "v5"));
+            verify(coordinator).replay(5L, RecordBatch.NO_PRODUCER_ID, RecordBatch.NO_PRODUCER_EPOCH, Map.entry("k6", "v6"));
+            verify(coordinator).replay(6L, RecordBatch.NO_PRODUCER_ID, RecordBatch.NO_PRODUCER_EPOCH, Map.entry("k7", "v7"));
+            verify(coordinator, times(0)).updateLastWrittenOffset(0L);
+            verify(coordinator, times(1)).updateLastWrittenOffset(2L);
+            verify(coordinator, times(1)).updateLastWrittenOffset(5L);
+            verify(coordinator, times(0)).updateLastWrittenOffset(6L);
+            verify(coordinator, times(1)).updateLastWrittenOffset(7L);
+            verify(coordinator, times(0)).updateLastCommittedOffset(0L);
+            verify(coordinator, times(1)).updateLastCommittedOffset(2L);
+            verify(coordinator, times(1)).updateLastCommittedOffset(5L);
+            verify(coordinator, times(0)).updateLastCommittedOffset(6L);
+            verify(coordinator, times(1)).updateLastCommittedOffset(7L);
+        }
+    }
+
     @Test
     void testPartitionGoesOfflineDuringLoad() throws Exception {
         TopicPartition tp = new TopicPartition("foo", 0);
@@ -565,7 +649,8 @@ class CoordinatorLoaderImplTest {
             partitionLogSupplier,
             partitionLogEndOffsetSupplier,
             serde,
-            1000
+            1000,
+            CoordinatorLoaderImpl.DEFAULT_COMMIT_INTERVAL_OFFSETS
         )) {
             when(log.logStartOffset()).thenReturn(0L);
             when(log.highWatermark()).thenReturn(0L);
diff --git a/core/src/main/scala/kafka/server/BrokerServer.scala b/core/src/main/scala/kafka/server/BrokerServer.scala
index 3ded033020b..47085169979 100644
--- a/core/src/main/scala/kafka/server/BrokerServer.scala
+++ b/core/src/main/scala/kafka/server/BrokerServer.scala
@@ -613,7 +613,8 @@ class BrokerServer(
       tp => replicaManager.getLog(tp).toJava,
       tp => replicaManager.getLogEndOffset(tp).map(Long.box).toJava,
       serde,
-      config.groupCoordinatorConfig.offsetsLoadBufferSize
+      config.groupCoordinatorConfig.offsetsLoadBufferSize,
+      CoordinatorLoaderImpl.DEFAULT_COMMIT_INTERVAL_OFFSETS
     )
     val writer = new CoordinatorPartitionWriter(
       replicaManager
@@ -644,7 +645,8 @@ class BrokerServer(
       tp => replicaManager.getLog(tp).toJava,
       tp => replicaManager.getLogEndOffset(tp).map(Long.box).toJava,
       serde,
-      config.shareCoordinatorConfig.shareCoordinatorLoadBufferSize()
+      config.shareCoordinatorConfig.shareCoordinatorLoadBufferSize(),
+      CoordinatorLoaderImpl.DEFAULT_COMMIT_INTERVAL_OFFSETS
     )
     val writer = new CoordinatorPartitionWriter(
       replicaManager
diff --git a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/coordinator/GroupCoordinatorShardLoadingBenchmark.java b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/coordinator/GroupCoordinatorShardLoadingBenchmark.java
new file mode 100644
index 00000000000..0a6612ca8fd
--- /dev/null
+++ b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/coordinator/GroupCoordinatorShardLoadingBenchmark.java
@@ -0,0 +1,355 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.jmh.coordinator;
+
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.compress.Compression;
+import org.apache.kafka.common.internals.Topic;
+import org.apache.kafka.common.metrics.Metrics;
+import org.apache.kafka.common.record.ControlRecordType;
+import org.apache.kafka.common.record.EndTransactionMarker;
+import org.apache.kafka.common.record.MemoryRecords;
+import org.apache.kafka.common.record.SimpleRecord;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.MockTime;
+import org.apache.kafka.coordinator.common.runtime.CoordinatorLoader;
+import org.apache.kafka.coordinator.common.runtime.CoordinatorLoaderImpl;
+import org.apache.kafka.coordinator.common.runtime.CoordinatorRecord;
+import org.apache.kafka.coordinator.common.runtime.MockCoordinatorExecutor;
+import org.apache.kafka.coordinator.common.runtime.MockCoordinatorTimer;
+import org.apache.kafka.coordinator.common.runtime.SnapshottableCoordinator;
+import org.apache.kafka.coordinator.group.GroupConfigManager;
+import org.apache.kafka.coordinator.group.GroupCoordinatorConfig;
+import org.apache.kafka.coordinator.group.GroupCoordinatorRecordHelpers;
+import org.apache.kafka.coordinator.group.GroupCoordinatorRecordSerde;
+import org.apache.kafka.coordinator.group.GroupCoordinatorShard;
+import org.apache.kafka.coordinator.group.OffsetAndMetadata;
+import org.apache.kafka.coordinator.group.metrics.GroupCoordinatorMetrics;
+import org.apache.kafka.server.storage.log.FetchIsolation;
+import org.apache.kafka.storage.internals.log.FetchDataInfo;
+import org.apache.kafka.storage.internals.log.LogOffsetMetadata;
+import org.apache.kafka.storage.internals.log.UnifiedLog;
+import org.apache.kafka.timeline.SnapshotRegistry;
+
+import com.yammer.metrics.core.MetricsRegistry;
+
+import org.openjdk.jmh.annotations.Benchmark;
+import org.openjdk.jmh.annotations.BenchmarkMode;
+import org.openjdk.jmh.annotations.Fork;
+import org.openjdk.jmh.annotations.Level;
+import org.openjdk.jmh.annotations.Measurement;
+import org.openjdk.jmh.annotations.Mode;
+import org.openjdk.jmh.annotations.OutputTimeUnit;
+import org.openjdk.jmh.annotations.Param;
+import org.openjdk.jmh.annotations.Scope;
+import org.openjdk.jmh.annotations.Setup;
+import org.openjdk.jmh.annotations.State;
+import org.openjdk.jmh.annotations.Threads;
+import org.openjdk.jmh.annotations.Warmup;
+import org.openjdk.jmh.infra.BenchmarkParams;
+import org.openjdk.jmh.infra.IterationParams;
+import org.openjdk.jmh.runner.IterationType;
+import org.openjdk.jmh.runner.Runner;
+import org.openjdk.jmh.runner.RunnerException;
+import org.openjdk.jmh.runner.options.Options;
+import org.openjdk.jmh.runner.options.OptionsBuilder;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.OptionalInt;
+import java.util.OptionalLong;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Function;
+
+@State(Scope.Benchmark)
+@Fork(value = 1)
+@Warmup(iterations = 1)
+@Measurement(iterations = 1)
+@BenchmarkMode(Mode.AverageTime)
+@OutputTimeUnit(TimeUnit.MILLISECONDS)
+public class GroupCoordinatorShardLoadingBenchmark {
+
+    private static final String GROUP_ID = "test-group";
+
+    @Param({"1", "4", "16", "64", "256", "1024", "4096", "16384", "65536", 
"262144", "1048576"})
+    private long commitInterval;
+
+    @Param({"8192"})
+    private int batchCount;
+
+    @Param({"2048"})
+    private int batchSize;
+
+    private TopicPartition topicPartition;
+    private MockTime time;
+    private GroupCoordinatorConfig config;
+    private GroupCoordinatorRecordSerde serde;
+    private GroupCoordinatorShard coordinatorShard;
+    private SnapshottableCoordinator<GroupCoordinatorShard, CoordinatorRecord> snapshottableCoordinator;
+    private UnifiedLog offsetCommitLog;
+    private UnifiedLog transactionalOffsetCommitLog;
+
+    static class OffsetCommitLog extends MockLog {
+        private final int batchCount;
+        private final SimpleRecord[] batch;
+
+        public OffsetCommitLog(TopicPartition tp, int batchSize, int batchCount) throws IOException {
+            super(tp);
+
+            this.batchCount = batchCount;
+
+            List<SimpleRecord> batchRecords = new ArrayList<>();
+            for (int i = 0; i < batchSize; i++) {
+                String topic = "topic-" + i;
+                int partition = 0;
+
+                OffsetAndMetadata offsetAndMetadata = new OffsetAndMetadata(
+                    0L,
+                    OptionalInt.of(0),
+                    OffsetAndMetadata.NO_METADATA,
+                    0L,
+                    OptionalLong.empty(),
+                    Uuid.randomUuid()
+                );
+
+                CoordinatorRecord coordinatorRecord = GroupCoordinatorRecordHelpers.newOffsetCommitRecord(
+                    GROUP_ID, topic, partition, offsetAndMetadata
+                );
+
+                byte[] keyBytes = new GroupCoordinatorRecordSerde().serializeKey(coordinatorRecord);
+                byte[] valueBytes = new GroupCoordinatorRecordSerde().serializeValue(coordinatorRecord);
+                SimpleRecord simpleRecord = new SimpleRecord(keyBytes, valueBytes);
+                batchRecords.add(simpleRecord);
+            }
+
+            this.batch = batchRecords.toArray(new SimpleRecord[0]);
+        }
+
+        @Override
+        public long logStartOffset() {
+            return 0L;
+        }
+
+        @Override
+        public long logEndOffset() {
+            if (batch == null) {
+                return 0L;
+            }
+
+            return (long) batchCount * (long) batch.length;
+        }
+
+        @Override
+        public FetchDataInfo read(long startOffset, int maxLength, FetchIsolation isolation, boolean minOneMessage) {
+            if (startOffset < 0 || startOffset >= logEndOffset()) {
+                return new FetchDataInfo(new LogOffsetMetadata(startOffset), MemoryRecords.EMPTY);
+            }
+
+            MemoryRecords records = MemoryRecords.withRecords(
+                startOffset,
+                Compression.NONE,
+                batch
+            );
+            return new FetchDataInfo(new LogOffsetMetadata(startOffset), records);
+        }
+    }
+
+    static class TransactionalOffsetCommitLog extends MockLog {
+        private final int batchCount;
+        private final SimpleRecord[] batch;
+        private final long producerId;
+        private final short producerEpoch;
+        private final int coordinatorEpoch;
+
+        public TransactionalOffsetCommitLog(TopicPartition tp, int batchSize, int batchCount) throws IOException {
+            super(tp);
+
+            this.batchCount = batchCount;
+            this.producerId = 1000L;
+            this.producerEpoch = 0;
+            this.coordinatorEpoch = 100;
+
+            List<SimpleRecord> batchRecords = new ArrayList<>();
+            for (int i = 0; i < batchSize - 1; i++) {
+                String topic = "topic-" + i;
+                int partition = 0;
+
+                OffsetAndMetadata offsetAndMetadata = new OffsetAndMetadata(
+                    0L,
+                    OptionalInt.of(0),
+                    OffsetAndMetadata.NO_METADATA,
+                    0L,
+                    OptionalLong.empty(),
+                    Uuid.randomUuid()
+                );
+
+                CoordinatorRecord coordinatorRecord = GroupCoordinatorRecordHelpers.newOffsetCommitRecord(
+                    GROUP_ID, topic, partition, offsetAndMetadata
+                );
+
+                byte[] keyBytes = new GroupCoordinatorRecordSerde().serializeKey(coordinatorRecord);
+                byte[] valueBytes = new GroupCoordinatorRecordSerde().serializeValue(coordinatorRecord);
+                SimpleRecord simpleRecord = new SimpleRecord(keyBytes, valueBytes);
+                batchRecords.add(simpleRecord);
+            }
+
+            this.batch = batchRecords.toArray(new SimpleRecord[0]);
+        }
+
+        @Override
+        public long logStartOffset() {
+            return 0L;
+        }
+
+        @Override
+        public long logEndOffset() {
+            if (batch == null) {
+                return 0L;
+            }
+
+            return (long) (batch.length + 1) * (long) batchCount;
+        }
+
+        @Override
+        public FetchDataInfo read(long startOffset, int maxLength, FetchIsolation isolation, boolean minOneMessage) {
+            if (startOffset < 0 || startOffset >= logEndOffset()) {
+                return new FetchDataInfo(new LogOffsetMetadata(startOffset), MemoryRecords.EMPTY);
+            }
+
+            // Repeat the batch followed by a commit marker.
+            long patternLength = batch.length + 1;
+            if (startOffset % patternLength < batch.length) {
+                MemoryRecords records = MemoryRecords.withTransactionalRecords(
+                    startOffset,
+                    Compression.NONE,
+                    producerId,
+                    producerEpoch,
+                    0,
+                    0,
+                    batch
+                );
+                return new FetchDataInfo(new LogOffsetMetadata(startOffset), records);
+            } else {
+                MemoryRecords records = MemoryRecords.withEndTransactionMarker(
+                    startOffset,
+                    0L,
+                    0,
+                    producerId,
+                    producerEpoch,
+                    new EndTransactionMarker(ControlRecordType.COMMIT, coordinatorEpoch)
+                );
+                return new FetchDataInfo(new LogOffsetMetadata(startOffset), records);
+            }
+        }
+    }
+
+    @Setup(Level.Trial)
+    public void setup() throws Exception {
+        topicPartition = new TopicPartition("__consumer_offsets", 0);
+        time = new MockTime();
+        Map<String, Object> props = new HashMap<>();
+        config = GroupCoordinatorConfig.fromProps(props);
+        serde = new GroupCoordinatorRecordSerde();
+    }
+
+    @Setup(Level.Iteration)
+    public void setupIteration(BenchmarkParams benchmarkParams, IterationParams iterationParams) throws IOException {
+        // Reduce the data size for warmup iterations, since transactional offset commit loading
+        // takes longer than 20 seconds.
+        int iterationBatchCount = batchCount;
+        if (iterationParams.getType() == IterationType.WARMUP) {
+            iterationBatchCount = Math.min(iterationBatchCount, 1024);
+        }
+
+        offsetCommitLog = new OffsetCommitLog(new TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, 0), batchSize, iterationBatchCount);
+        transactionalOffsetCommitLog = new TransactionalOffsetCommitLog(new TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, 0), batchSize, iterationBatchCount);
+    }
+
+    @Setup(Level.Invocation)
+    public void setupInvocation() {
+        GroupConfigManager configManager = new GroupConfigManager(new HashMap<>());
+        LogContext logContext = new LogContext();
+        SnapshotRegistry snapshotRegistry = new SnapshotRegistry(logContext);
+
+        MetricsRegistry metricsRegistry = new MetricsRegistry();
+        Metrics metrics = new Metrics();
+        GroupCoordinatorMetrics coordinatorMetrics = new GroupCoordinatorMetrics(metricsRegistry, metrics);
+
+        coordinatorShard = new GroupCoordinatorShard.Builder(config, configManager)
+            .withAuthorizerPlugin(Optional.empty())
+            .withLogContext(logContext)
+            .withSnapshotRegistry(snapshotRegistry)
+            .withTime(time)
+            .withTimer(new MockCoordinatorTimer<>(time))
+            .withExecutor(new MockCoordinatorExecutor<>())
+            .withCoordinatorMetrics(coordinatorMetrics)
+            .withTopicPartition(topicPartition)
+            .build();
+
+        snapshottableCoordinator = new SnapshottableCoordinator<>(
+            logContext,
+            snapshotRegistry,
+            coordinatorShard,
+            topicPartition
+        );
+    }
+
+    private CoordinatorLoader.LoadSummary loadRecords(UnifiedLog log) throws ExecutionException, InterruptedException {
+        Function<TopicPartition, Optional<UnifiedLog>> partitionLogSupplier = tp -> Optional.of(log);
+        Function<TopicPartition, Optional<Long>> partitionLogEndOffsetSupplier = tp -> Optional.of(log.logEndOffset());
+
+        CoordinatorLoaderImpl<CoordinatorRecord> loader = new CoordinatorLoaderImpl<>(
+            time,
+            partitionLogSupplier,
+            partitionLogEndOffsetSupplier,
+            serde,
+            config.offsetsLoadBufferSize(),
+            commitInterval
+        );
+
+        return loader.load(topicPartition, snapshottableCoordinator).get();
+    }
+
+    @Benchmark
+    @Threads(1)
+    @OutputTimeUnit(TimeUnit.MILLISECONDS)
+    public CoordinatorLoader.LoadSummary loadOffsetCommitRecords() throws ExecutionException, InterruptedException {
+        return loadRecords(offsetCommitLog);
+    }
+
+    @Benchmark
+    @Threads(1)
+    @OutputTimeUnit(TimeUnit.MILLISECONDS)
+    public CoordinatorLoader.LoadSummary loadTransactionalOffsetCommitRecords() throws ExecutionException, InterruptedException {
+        return loadRecords(transactionalOffsetCommitLog);
+    }
+
+    public static void main(String[] args) throws RunnerException {
+        Options opt = new OptionsBuilder()
+                .include(GroupCoordinatorShardLoadingBenchmark.class.getSimpleName())
+                .forks(1)
+                .build();
+
+        new Runner(opt).run();
+    }
+}
diff --git a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/coordinator/MockLog.java b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/coordinator/MockLog.java
new file mode 100644
index 00000000000..a7de83d5bcb
--- /dev/null
+++ b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/coordinator/MockLog.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.jmh.coordinator;
+
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.server.storage.log.FetchIsolation;
+import org.apache.kafka.server.util.Scheduler;
+import org.apache.kafka.storage.internals.epoch.LeaderEpochFileCache;
+import org.apache.kafka.storage.internals.log.FetchDataInfo;
+import org.apache.kafka.storage.internals.log.LocalLog;
+import org.apache.kafka.storage.internals.log.LogOffsetsListener;
+import org.apache.kafka.storage.internals.log.LogSegments;
+import org.apache.kafka.storage.internals.log.ProducerStateManager;
+import org.apache.kafka.storage.internals.log.UnifiedLog;
+import org.apache.kafka.storage.log.metrics.BrokerTopicStats;
+
+import java.io.IOException;
+import java.util.Optional;
+
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+public abstract class MockLog extends UnifiedLog {
+
+    public MockLog(TopicPartition tp) throws IOException {
+        super(
+            0,
+            createMockLocalLog(tp),
+            mock(BrokerTopicStats.class),
+            Integer.MAX_VALUE,
+            mock(LeaderEpochFileCache.class),
+            mock(ProducerStateManager.class),
+            Optional.empty(),
+            false,
+            LogOffsetsListener.NO_OP_OFFSETS_LISTENER
+        );
+    }
+
+    @Override
+    public abstract long logStartOffset();
+
+    @Override
+    public abstract long logEndOffset();
+
+    @Override
+    public long highWatermark() {
+        return logEndOffset();
+    }
+
+    @Override
+    public abstract FetchDataInfo read(long startOffset, int maxLength, FetchIsolation isolation, boolean minOneMessage);
+
+    private static LocalLog createMockLocalLog(TopicPartition tp) {
+        LocalLog localLog = mock(LocalLog.class);
+        when(localLog.scheduler()).thenReturn(mock(Scheduler.class));
+        when(localLog.segments()).thenReturn(mock(LogSegments.class));
+        when(localLog.topicPartition()).thenReturn(tp);
+        return localLog;
+    }
+}
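As the Javadoc on DEFAULT_COMMIT_INTERVAL_OFFSETS asks, changes to the interval should be evaluated with the new benchmark. It can be launched through its main() method as-is, or narrowed to a subset of interval values via JMH's parameter override, along these lines (an illustrative variant of the main() above; the chosen values are arbitrary):

    Options opt = new OptionsBuilder()
            .include(GroupCoordinatorShardLoadingBenchmark.class.getSimpleName())
            .param("commitInterval", "1024", "16384", "262144")  // override the @Param sweep
            .forks(1)
            .build();
    new Runner(opt).run();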

