This is an automated email from the ASF dual-hosted git repository.

dajac pushed a commit to branch 4.1
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/4.1 by this push:
     new b14efd87508 KAFKA-19732, KAFKA-19716: Clear out coordinator snapshots periodically while loading (#20591)
b14efd87508 is described below

commit b14efd87508519fc973ba9ea811ebb1b679ff36b
Author: Sean Quah <[email protected]>
AuthorDate: Fri Sep 26 10:01:16 2025 +0100

    KAFKA-19732, KAFKA-19716: Clear out coordinator snapshots periodically while loading (#20591)
    
    When nested Timeline collections are created and discarded while loading
    a coordinator partition, references to them accumulate in the current
    snapshot. Allow the GC to reclaim them by starting a new snapshot and
    discarding previous snapshots every 16,384 records.
    
    Small intervals degrade loading times for non-transactional offset
    commit workloads while large intervals degrade loading times for
    transactional workloads. A default of 16,384 was chosen as a compromise.
    
    Cherry pick of d067c6c04089a3d24e1f72e6cb1b10b0d85f76da.
    
    Reviewers: David Jacot <[email protected]>
---
 .../coordinator/group/CoordinatorLoaderImpl.scala  |  36 +++++--
 .../src/main/scala/kafka/server/BrokerServer.scala |   6 +-
 .../group/CoordinatorLoaderImplTest.scala          | 109 ++++++++++++++++++---
 3 files changed, 130 insertions(+), 21 deletions(-)
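
Before the diff, a minimal, hypothetical sketch of the loading pattern this change introduces. Playback, LoadingSketch and replayWithPeriodicSnapshots below are simplified stand-ins for Kafka's CoordinatorPlayback and CoordinatorLoaderImpl (not the actual classes), and the interval check runs per record here, whereas the real loader checks once per record batch.

// Simplified, hypothetical sketch of the commit-interval pattern this patch adds.
// "Playback" stands in for the subset of Kafka's CoordinatorPlayback that the
// loader uses; the real loader performs the check once per record batch.
trait Playback[T] {
  def replay(offset: Long, producerId: Long, producerEpoch: Short, record: T): Unit
  def updateLastWrittenOffset(offset: Long): Unit
  def updateLastCommittedOffset(offset: Long): Unit
}

object LoadingSketch {
  val NoProducerId: Long = -1L
  val NoProducerEpoch: Short = -1

  def replayWithPeriodicSnapshots[T](
    records: Iterator[(Long, T)],        // (offset, record) pairs read from the partition
    coordinator: Playback[T],
    commitIntervalOffsets: Long = 16384L // mirrors DEFAULT_COMMIT_INTERVAL_OFFSETS
  ): Unit = {
    var lastCommittedOffset = -1L
    records.foreach { case (offset, record) =>
      coordinator.replay(offset, NoProducerId, NoProducerEpoch, record)
      val nextOffset = offset + 1
      if (nextOffset - lastCommittedOffset >= commitIntervalOffsets) {
        // Advancing the written and committed offsets starts a new snapshot and
        // discards the older ones, so the Timeline collections they reference
        // can be reclaimed by the GC instead of accumulating during loading.
        coordinator.updateLastWrittenOffset(nextOffset)
        coordinator.updateLastCommittedOffset(nextOffset)
        lastCommittedOffset = nextOffset
      }
    }
  }
}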

diff --git a/core/src/main/scala/kafka/coordinator/group/CoordinatorLoaderImpl.scala b/core/src/main/scala/kafka/coordinator/group/CoordinatorLoaderImpl.scala
index 44a9b956584..170d7a49061 100644
--- a/core/src/main/scala/kafka/coordinator/group/CoordinatorLoaderImpl.scala
+++ b/core/src/main/scala/kafka/coordinator/group/CoordinatorLoaderImpl.scala
@@ -34,20 +34,38 @@ import java.util.concurrent.CompletableFuture
 import java.util.concurrent.atomic.AtomicBoolean
 import scala.jdk.CollectionConverters._
 
+object CoordinatorLoaderImpl {
+  /**
+   * The interval between updating the last committed offset during loading, in offsets. Smaller
+   * values commit more often at the expense of loading times when the workload is simple and does
+   * not create collections that need to participate in {@link CoordinatorPlayback} snapshotting.
+   * Larger values commit less often and allow more temporary data to accumulate before the next
+   * commit when the workload creates many temporary collections that need to be snapshotted.
+   *
+   * The value of 16,384 was chosen as a trade-off between the performance of these two workloads.
+   *
+   * When changing this value, please run the GroupCoordinatorShardLoadingBenchmark to evaluate
+   * the relative change in performance.
+   */
+  val DEFAULT_COMMIT_INTERVAL_OFFSETS = 16384L
+}
+
 /**
  * Coordinator loader which reads records from a partition and replays them
  * to a group coordinator.
  *
- * @param replicaManager  The replica manager.
- * @param deserializer    The deserializer to use.
- * @param loadBufferSize  The load buffer size.
+ * @param replicaManager        The replica manager.
+ * @param deserializer          The deserializer to use.
+ * @param loadBufferSize        The load buffer size.
+ * @param commitIntervalOffsets The interval between updating the last committed offset during loading, in offsets.
  * @tparam T The record type.
  */
 class CoordinatorLoaderImpl[T](
   time: Time,
   replicaManager: ReplicaManager,
   deserializer: Deserializer[T],
-  loadBufferSize: Int
+  loadBufferSize: Int,
+  commitIntervalOffsets: Long = CoordinatorLoaderImpl.DEFAULT_COMMIT_INTERVAL_OFFSETS
 ) extends CoordinatorLoader[T] with Logging {
   private val isRunning = new AtomicBoolean(true)
   private val scheduler = new KafkaScheduler(1)
@@ -99,7 +117,7 @@ class CoordinatorLoaderImpl[T](
          // the log end offset but the log is empty. This could happen with compacted topics.
           var readAtLeastOneRecord = true
 
-          var previousHighWatermark = -1L
+          var lastCommittedOffset = -1L
           var numRecords = 0L
           var numBytes = 0L
          while (currentOffset < logEndOffset && readAtLeastOneRecord && isRunning.get) {
@@ -208,10 +226,14 @@ class CoordinatorLoaderImpl[T](
               if (currentOffset >= currentHighWatermark) {
                 coordinator.updateLastWrittenOffset(currentOffset)
 
-                if (currentHighWatermark > previousHighWatermark) {
+                if (currentHighWatermark > lastCommittedOffset) {
                   coordinator.updateLastCommittedOffset(currentHighWatermark)
-                  previousHighWatermark = currentHighWatermark
+                  lastCommittedOffset = currentHighWatermark
                 }
+              } else if (currentOffset - lastCommittedOffset >= commitIntervalOffsets) {
+                coordinator.updateLastWrittenOffset(currentOffset)
+                coordinator.updateLastCommittedOffset(currentOffset)
+                lastCommittedOffset = currentOffset
               }
             }
             numBytes = numBytes + memoryRecords.sizeInBytes()
diff --git a/core/src/main/scala/kafka/server/BrokerServer.scala b/core/src/main/scala/kafka/server/BrokerServer.scala
index c56e178ca2e..06a1ab14b2e 100644
--- a/core/src/main/scala/kafka/server/BrokerServer.scala
+++ b/core/src/main/scala/kafka/server/BrokerServer.scala
@@ -620,7 +620,8 @@ class BrokerServer(
       time,
       replicaManager,
       serde,
-      config.groupCoordinatorConfig.offsetsLoadBufferSize
+      config.groupCoordinatorConfig.offsetsLoadBufferSize,
+      CoordinatorLoaderImpl.DEFAULT_COMMIT_INTERVAL_OFFSETS
     )
     val writer = new CoordinatorPartitionWriter(
       replicaManager
@@ -650,7 +651,8 @@ class BrokerServer(
       time,
       replicaManager,
       serde,
-      config.shareCoordinatorConfig.shareCoordinatorLoadBufferSize()
+      config.shareCoordinatorConfig.shareCoordinatorLoadBufferSize(),
+      CoordinatorLoaderImpl.DEFAULT_COMMIT_INTERVAL_OFFSETS
     )
     val writer = new CoordinatorPartitionWriter(
       replicaManager
diff --git a/core/src/test/scala/unit/kafka/coordinator/group/CoordinatorLoaderImplTest.scala b/core/src/test/scala/unit/kafka/coordinator/group/CoordinatorLoaderImplTest.scala
index a0ca3f23c48..a9e519de13a 100644
--- a/core/src/test/scala/unit/kafka/coordinator/group/CoordinatorLoaderImplTest.scala
+++ b/core/src/test/scala/unit/kafka/coordinator/group/CoordinatorLoaderImplTest.scala
@@ -62,7 +62,8 @@ class CoordinatorLoaderImplTest {
       time = Time.SYSTEM,
       replicaManager = replicaManager,
       deserializer = serde,
-      loadBufferSize = 1000
+      loadBufferSize = 1000,
+      commitIntervalOffsets = CoordinatorLoaderImpl.DEFAULT_COMMIT_INTERVAL_OFFSETS
     )) { loader =>
       when(replicaManager.getLog(tp)).thenReturn(None)
 
@@ -82,7 +83,8 @@ class CoordinatorLoaderImplTest {
       time = Time.SYSTEM,
       replicaManager = replicaManager,
       deserializer = serde,
-      loadBufferSize = 1000
+      loadBufferSize = 1000,
+      commitIntervalOffsets = CoordinatorLoaderImpl.DEFAULT_COMMIT_INTERVAL_OFFSETS
     )) { loader =>
       loader.close()
 
@@ -103,7 +105,8 @@ class CoordinatorLoaderImplTest {
       time = Time.SYSTEM,
       replicaManager = replicaManager,
       deserializer = serde,
-      loadBufferSize = 1000
+      loadBufferSize = 1000,
+      commitIntervalOffsets = CoordinatorLoaderImpl.DEFAULT_COMMIT_INTERVAL_OFFSETS
     )) { loader =>
       when(replicaManager.getLog(tp)).thenReturn(Some(log))
       when(log.logStartOffset).thenReturn(0L)
@@ -186,7 +189,8 @@ class CoordinatorLoaderImplTest {
       time = Time.SYSTEM,
       replicaManager = replicaManager,
       deserializer = serde,
-      loadBufferSize = 1000
+      loadBufferSize = 1000,
+      commitIntervalOffsets = CoordinatorLoaderImpl.DEFAULT_COMMIT_INTERVAL_OFFSETS
     )) { loader =>
       when(replicaManager.getLog(tp)).thenReturn(Some(log))
       when(log.logStartOffset).thenReturn(0L)
@@ -229,7 +233,8 @@ class CoordinatorLoaderImplTest {
       time = Time.SYSTEM,
       replicaManager = replicaManager,
       deserializer = serde,
-      loadBufferSize = 1000
+      loadBufferSize = 1000,
+      commitIntervalOffsets = CoordinatorLoaderImpl.DEFAULT_COMMIT_INTERVAL_OFFSETS
     )) { loader =>
       when(replicaManager.getLog(tp)).thenReturn(Some(log))
       when(log.logStartOffset).thenReturn(0L)
@@ -265,7 +270,8 @@ class CoordinatorLoaderImplTest {
       time = Time.SYSTEM,
       replicaManager = replicaManager,
       deserializer = serde,
-      loadBufferSize = 1000
+      loadBufferSize = 1000,
+      commitIntervalOffsets = CoordinatorLoaderImpl.DEFAULT_COMMIT_INTERVAL_OFFSETS
     )) { loader =>
       when(replicaManager.getLog(tp)).thenReturn(Some(log))
       when(log.logStartOffset).thenReturn(0L)
@@ -303,7 +309,8 @@ class CoordinatorLoaderImplTest {
       time = Time.SYSTEM,
       replicaManager = replicaManager,
       deserializer = serde,
-      loadBufferSize = 1000
+      loadBufferSize = 1000,
+      commitIntervalOffsets = CoordinatorLoaderImpl.DEFAULT_COMMIT_INTERVAL_OFFSETS
     )) { loader =>
       when(replicaManager.getLog(tp)).thenReturn(Some(log))
       when(log.logStartOffset).thenReturn(0L)
@@ -331,7 +338,8 @@ class CoordinatorLoaderImplTest {
       time,
       replicaManager = replicaManager,
       deserializer = serde,
-      loadBufferSize = 1000
+      loadBufferSize = 1000,
+      commitIntervalOffsets = CoordinatorLoaderImpl.DEFAULT_COMMIT_INTERVAL_OFFSETS
     )) { loader =>
       val startTimeMs = time.milliseconds()
       when(replicaManager.getLog(tp)).thenReturn(Some(log))
@@ -378,7 +386,8 @@ class CoordinatorLoaderImplTest {
       time = Time.SYSTEM,
       replicaManager = replicaManager,
       deserializer = serde,
-      loadBufferSize = 1000
+      loadBufferSize = 1000,
+      commitIntervalOffsets = CoordinatorLoaderImpl.DEFAULT_COMMIT_INTERVAL_OFFSETS
     )) { loader =>
       when(replicaManager.getLog(tp)).thenReturn(Some(log))
       when(log.logStartOffset).thenReturn(0L)
@@ -441,7 +450,8 @@ class CoordinatorLoaderImplTest {
       time = Time.SYSTEM,
       replicaManager = replicaManager,
       deserializer = serde,
-      loadBufferSize = 1000
+      loadBufferSize = 1000,
+      commitIntervalOffsets = CoordinatorLoaderImpl.DEFAULT_COMMIT_INTERVAL_OFFSETS
     )) { loader =>
       when(replicaManager.getLog(tp)).thenReturn(Some(log))
       when(log.logStartOffset).thenReturn(0L)
@@ -467,7 +477,8 @@ class CoordinatorLoaderImplTest {
       time = Time.SYSTEM,
       replicaManager = replicaManager,
       deserializer = serde,
-      loadBufferSize = 1000
+      loadBufferSize = 1000,
+      commitIntervalOffsets = CoordinatorLoaderImpl.DEFAULT_COMMIT_INTERVAL_OFFSETS
     )) { loader =>
       when(replicaManager.getLog(tp)).thenReturn(Some(log))
       when(log.logStartOffset).thenReturn(0L)
@@ -531,7 +542,8 @@ class CoordinatorLoaderImplTest {
       time = Time.SYSTEM,
       replicaManager = replicaManager,
       deserializer = serde,
-      loadBufferSize = 1000
+      loadBufferSize = 1000,
+      commitIntervalOffsets = CoordinatorLoaderImpl.DEFAULT_COMMIT_INTERVAL_OFFSETS
     )) { loader =>
       when(replicaManager.getLog(tp)).thenReturn(Some(log))
       when(log.logStartOffset).thenReturn(0L)
@@ -559,6 +571,79 @@ class CoordinatorLoaderImplTest {
     }
   }
 
+  @Test
+  def testUpdateLastWrittenOffsetCommitInterval(): Unit = {
+    val tp = new TopicPartition("foo", 0)
+    val replicaManager = mock(classOf[ReplicaManager])
+    val serde = new StringKeyValueDeserializer
+    val log = mock(classOf[UnifiedLog])
+    val coordinator = mock(classOf[CoordinatorPlayback[(String, String)]])
+
+    Using.resource(new CoordinatorLoaderImpl[(String, String)](
+      time = Time.SYSTEM,
+      replicaManager = replicaManager,
+      deserializer = serde,
+      loadBufferSize = 1000,
+      commitIntervalOffsets = 2L
+    )) { loader =>
+      when(replicaManager.getLog(tp)).thenReturn(Some(log))
+      when(log.logStartOffset).thenReturn(0L)
+      when(log.highWatermark).thenReturn(7L)
+      when(replicaManager.getLogEndOffset(tp)).thenReturn(Some(7L))
+
+      val readResult1 = logReadResult(startOffset = 0, records = Seq(
+        new SimpleRecord("k1".getBytes, "v1".getBytes),
+        new SimpleRecord("k2".getBytes, "v2".getBytes)
+      ))
+
+      when(log.read(0L, 1000, FetchIsolation.LOG_END, true
+      )).thenReturn(readResult1)
+
+      val readResult2 = logReadResult(startOffset = 2, records = Seq(
+        new SimpleRecord("k3".getBytes, "v3".getBytes),
+        new SimpleRecord("k4".getBytes, "v4".getBytes),
+        new SimpleRecord("k5".getBytes, "v5".getBytes)
+      ))
+
+      when(log.read(2L, 1000, FetchIsolation.LOG_END, true
+      )).thenReturn(readResult2)
+
+      val readResult3 = logReadResult(startOffset = 5, records = Seq(
+        new SimpleRecord("k6".getBytes, "v6".getBytes)
+      ))
+
+      when(log.read(5L, 1000, FetchIsolation.LOG_END, true
+      )).thenReturn(readResult3)
+
+      val readResult4 = logReadResult(startOffset = 6, records = Seq(
+        new SimpleRecord("k7".getBytes, "v7".getBytes)
+      ))
+
+      when(log.read(6L, 1000, FetchIsolation.LOG_END, true
+      )).thenReturn(readResult4)
+
+      assertNotNull(loader.load(tp, coordinator).get(10, TimeUnit.SECONDS))
+
+      verify(coordinator).replay(0L, RecordBatch.NO_PRODUCER_ID, RecordBatch.NO_PRODUCER_EPOCH, ("k1", "v1"))
+      verify(coordinator).replay(1L, RecordBatch.NO_PRODUCER_ID, RecordBatch.NO_PRODUCER_EPOCH, ("k2", "v2"))
+      verify(coordinator).replay(2L, RecordBatch.NO_PRODUCER_ID, RecordBatch.NO_PRODUCER_EPOCH, ("k3", "v3"))
+      verify(coordinator).replay(3L, RecordBatch.NO_PRODUCER_ID, RecordBatch.NO_PRODUCER_EPOCH, ("k4", "v4"))
+      verify(coordinator).replay(4L, RecordBatch.NO_PRODUCER_ID, RecordBatch.NO_PRODUCER_EPOCH, ("k5", "v5"))
+      verify(coordinator).replay(5L, RecordBatch.NO_PRODUCER_ID, RecordBatch.NO_PRODUCER_EPOCH, ("k6", "v6"))
+      verify(coordinator).replay(6L, RecordBatch.NO_PRODUCER_ID, RecordBatch.NO_PRODUCER_EPOCH, ("k7", "v7"))
+      verify(coordinator, times(0)).updateLastWrittenOffset(0L)
+      verify(coordinator, times(1)).updateLastWrittenOffset(2L)
+      verify(coordinator, times(1)).updateLastWrittenOffset(5L)
+      verify(coordinator, times(0)).updateLastWrittenOffset(6L)
+      verify(coordinator, times(1)).updateLastWrittenOffset(7L)
+      verify(coordinator, times(0)).updateLastCommittedOffset(0L)
+      verify(coordinator, times(1)).updateLastCommittedOffset(2L)
+      verify(coordinator, times(1)).updateLastCommittedOffset(5L)
+      verify(coordinator, times(0)).updateLastCommittedOffset(6L)
+      verify(coordinator, times(1)).updateLastCommittedOffset(7L)
+    }
+  }
+
   private def logReadResult(
     startOffset: Long,
     producerId: Long = RecordBatch.NO_PRODUCER_ID,

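As a cross-check on the expectations in testUpdateLastWrittenOffsetCommitInterval, here is a small, self-contained sketch (not part of the patch; CommitIntervalWalkthrough is a made-up name) that applies the same commit-interval rule to the batch end offsets used by the test and prints where commits are expected.

object CommitIntervalWalkthrough extends App {
  // Assumptions copied from testUpdateLastWrittenOffsetCommitInterval above:
  // commitIntervalOffsets = 2, high watermark = 7, and the mocked batches end
  // at offsets 2, 5, 6 and 7 respectively.
  val commitIntervalOffsets = 2L
  val highWatermark = 7L
  val batchEndOffsets = Seq(2L, 5L, 6L, 7L)

  var lastCommittedOffset = -1L
  batchEndOffsets.foreach { currentOffset =>
    if (currentOffset >= highWatermark) {
      // Caught up with the high watermark: written and committed offsets move to it.
      println(s"commit at $currentOffset (high watermark)")
      lastCommittedOffset = currentOffset
    } else if (currentOffset - lastCommittedOffset >= commitIntervalOffsets) {
      // At least commitIntervalOffsets offsets have been replayed since the last commit.
      println(s"commit at $currentOffset (interval)")
      lastCommittedOffset = currentOffset
    } else {
      println(s"no commit at $currentOffset")
    }
  }
  // Prints commits at 2, 5 and 7 and no commit at 6, matching the
  // verify(...) expectations in the test.
}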