This is an automated email from the ASF dual-hosted git repository.
dajac pushed a commit to branch 4.0
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/4.0 by this push:
new 8248d1d2bb6 KAFKA-19732, KAFKA-19716: Clear out coordinator snapshots periodically while loading (#20590)
8248d1d2bb6 is described below
commit 8248d1d2bb6aed8422c752ad1c6721e5691fd6c3
Author: Sean Quah <[email protected]>
AuthorDate: Fri Sep 26 10:01:05 2025 +0100
KAFKA-19732, KAFKA-19716: Clear out coordinator snapshots periodically while loading (#20590)
When nested Timeline collections are created and discarded while loading
a coordinator partition, references to them accumulate in the current
snapshot. Allow the GC to reclaim them by starting a new snapshot and
discarding previous snapshots every 16,384 records.
Small intervals degrade loading times for non-transactional offset commit workloads,
while large intervals degrade loading times for transactional workloads. A default of
16,384 was chosen as a compromise.
Cherry pick of d067c6c04089a3d24e1f72e6cb1b10b0d85f76da.
Reviewers: David Jacot <[email protected]>
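For readers skimming the diff below, the following is a minimal, illustrative Scala sketch of the mechanism, assuming a simplified playback interface; SimplePlayback and replayAll are hypothetical names, not the actual CoordinatorPlayback or CoordinatorLoaderImpl API:

    object LoaderCommitSketch {
      // Hypothetical stand-in for the playback interface the loader drives.
      trait SimplePlayback[T] {
        def replay(offset: Long, record: T): Unit
        def updateLastWrittenOffset(offset: Long): Unit   // starts a new snapshot
        def updateLastCommittedOffset(offset: Long): Unit // lets older snapshots be discarded
      }

      // Replays (offset, record) pairs and commits every commitIntervalOffsets offsets so
      // that snapshots holding only discarded Timeline collections can be reclaimed by the GC.
      def replayAll[T](
        records: Iterator[(Long, T)],
        playback: SimplePlayback[T],
        commitIntervalOffsets: Long = 16384L // mirrors DEFAULT_COMMIT_INTERVAL_OFFSETS
      ): Unit = {
        var lastCommittedOffset = -1L
        records.foreach { case (offset, record) =>
          playback.replay(offset, record)
          if (offset - lastCommittedOffset >= commitIntervalOffsets) {
            playback.updateLastWrittenOffset(offset)
            playback.updateLastCommittedOffset(offset)
            lastCommittedOffset = offset
          }
        }
      }
    }

Committing on an offset interval rather than on every record keeps the overhead negligible for simple workloads while still bounding how many obsolete snapshots a transactional workload can accumulate.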
---
.../coordinator/group/CoordinatorLoaderImpl.scala | 36 +++++--
.../src/main/scala/kafka/server/BrokerServer.scala | 6 +-
.../group/CoordinatorLoaderImplTest.scala | 109 ++++++++++++++++++---
3 files changed, 130 insertions(+), 21 deletions(-)
diff --git a/core/src/main/scala/kafka/coordinator/group/CoordinatorLoaderImpl.scala b/core/src/main/scala/kafka/coordinator/group/CoordinatorLoaderImpl.scala
index 70536abecc0..86d8748462b 100644
--- a/core/src/main/scala/kafka/coordinator/group/CoordinatorLoaderImpl.scala
+++ b/core/src/main/scala/kafka/coordinator/group/CoordinatorLoaderImpl.scala
@@ -34,20 +34,38 @@ import java.util.concurrent.CompletableFuture
import java.util.concurrent.atomic.AtomicBoolean
import scala.jdk.CollectionConverters._
+object CoordinatorLoaderImpl {
+ /**
+ * The interval between updating the last committed offset during loading, in offsets. Smaller
+ * values commit more often at the expense of loading times when the workload is simple and does
+ * not create collections that need to participate in {@link CoordinatorPlayback} snapshotting.
+ * Larger values commit less often and allow more temporary data to accumulate before the next
+ * commit when the workload creates many temporary collections that need to be snapshotted.
+ *
+ * The value of 16,384 was chosen as a trade-off between the performance of these two workloads.
+ *
+ * When changing this value, please run the GroupCoordinatorShardLoadingBenchmark to evaluate
+ * the relative change in performance.
+ */
+ val DEFAULT_COMMIT_INTERVAL_OFFSETS = 16384L
+}
+
/**
* Coordinator loader which reads records from a partition and replays them
* to a group coordinator.
*
- * @param replicaManager The replica manager.
- * @param deserializer The deserializer to use.
- * @param loadBufferSize The load buffer size.
+ * @param replicaManager The replica manager.
+ * @param deserializer The deserializer to use.
+ * @param loadBufferSize The load buffer size.
+ * @param commitIntervalOffsets The interval between updating the last committed offset during loading, in offsets.
* @tparam T The record type.
*/
class CoordinatorLoaderImpl[T](
time: Time,
replicaManager: ReplicaManager,
deserializer: Deserializer[T],
- loadBufferSize: Int
+ loadBufferSize: Int,
+ commitIntervalOffsets: Long = CoordinatorLoaderImpl.DEFAULT_COMMIT_INTERVAL_OFFSETS
) extends CoordinatorLoader[T] with Logging {
private val isRunning = new AtomicBoolean(true)
private val scheduler = new KafkaScheduler(1)
@@ -99,7 +117,7 @@ class CoordinatorLoaderImpl[T](
// the log end offset but the log is empty. This could happen with compacted topics.
var readAtLeastOneRecord = true
- var previousHighWatermark = -1L
+ var lastCommittedOffset = -1L
var numRecords = 0L
var numBytes = 0L
while (currentOffset < logEndOffset && readAtLeastOneRecord && isRunning.get) {
@@ -213,10 +231,14 @@ class CoordinatorLoaderImpl[T](
if (currentOffset >= currentHighWatermark) {
coordinator.updateLastWrittenOffset(currentOffset)
- if (currentHighWatermark > previousHighWatermark) {
+ if (currentHighWatermark > lastCommittedOffset) {
coordinator.updateLastCommittedOffset(currentHighWatermark)
- previousHighWatermark = currentHighWatermark
+ lastCommittedOffset = currentHighWatermark
}
+ } else if (currentOffset - lastCommittedOffset >= commitIntervalOffsets) {
+ coordinator.updateLastWrittenOffset(currentOffset)
+ coordinator.updateLastCommittedOffset(currentOffset)
+ lastCommittedOffset = currentOffset
}
}
numBytes = numBytes + memoryRecords.sizeInBytes()
diff --git a/core/src/main/scala/kafka/server/BrokerServer.scala b/core/src/main/scala/kafka/server/BrokerServer.scala
index 85612c58d4a..f4c43adb772 100644
--- a/core/src/main/scala/kafka/server/BrokerServer.scala
+++ b/core/src/main/scala/kafka/server/BrokerServer.scala
@@ -634,7 +634,8 @@ class BrokerServer(
time,
replicaManager,
serde,
- config.groupCoordinatorConfig.offsetsLoadBufferSize
+ config.groupCoordinatorConfig.offsetsLoadBufferSize,
+ CoordinatorLoaderImpl.DEFAULT_COMMIT_INTERVAL_OFFSETS
)
val writer = new CoordinatorPartitionWriter(
replicaManager
@@ -673,7 +674,8 @@ class BrokerServer(
time,
replicaManager,
serde,
- config.shareCoordinatorConfig.shareCoordinatorLoadBufferSize()
+ config.shareCoordinatorConfig.shareCoordinatorLoadBufferSize(),
+ CoordinatorLoaderImpl.DEFAULT_COMMIT_INTERVAL_OFFSETS
)
val writer = new CoordinatorPartitionWriter(
replicaManager
diff --git a/core/src/test/scala/unit/kafka/coordinator/group/CoordinatorLoaderImplTest.scala b/core/src/test/scala/unit/kafka/coordinator/group/CoordinatorLoaderImplTest.scala
index dc4bbc830cd..585c18e2eca 100644
--- a/core/src/test/scala/unit/kafka/coordinator/group/CoordinatorLoaderImplTest.scala
+++ b/core/src/test/scala/unit/kafka/coordinator/group/CoordinatorLoaderImplTest.scala
@@ -63,7 +63,8 @@ class CoordinatorLoaderImplTest {
time = Time.SYSTEM,
replicaManager = replicaManager,
deserializer = serde,
- loadBufferSize = 1000
+ loadBufferSize = 1000,
+ commitIntervalOffsets = CoordinatorLoaderImpl.DEFAULT_COMMIT_INTERVAL_OFFSETS
)) { loader =>
when(replicaManager.getLog(tp)).thenReturn(None)
@@ -83,7 +84,8 @@ class CoordinatorLoaderImplTest {
time = Time.SYSTEM,
replicaManager = replicaManager,
deserializer = serde,
- loadBufferSize = 1000
+ loadBufferSize = 1000,
+ commitIntervalOffsets = CoordinatorLoaderImpl.DEFAULT_COMMIT_INTERVAL_OFFSETS
)) { loader =>
loader.close()
@@ -104,7 +106,8 @@ class CoordinatorLoaderImplTest {
time = Time.SYSTEM,
replicaManager = replicaManager,
deserializer = serde,
- loadBufferSize = 1000
+ loadBufferSize = 1000,
+ commitIntervalOffsets = CoordinatorLoaderImpl.DEFAULT_COMMIT_INTERVAL_OFFSETS
)) { loader =>
when(replicaManager.getLog(tp)).thenReturn(Some(log))
when(log.logStartOffset).thenReturn(0L)
@@ -207,7 +210,8 @@ class CoordinatorLoaderImplTest {
time = Time.SYSTEM,
replicaManager = replicaManager,
deserializer = serde,
- loadBufferSize = 1000
+ loadBufferSize = 1000,
+ commitIntervalOffsets = CoordinatorLoaderImpl.DEFAULT_COMMIT_INTERVAL_OFFSETS
)) { loader =>
when(replicaManager.getLog(tp)).thenReturn(Some(log))
when(log.logStartOffset).thenReturn(0L)
@@ -250,7 +254,8 @@ class CoordinatorLoaderImplTest {
time = Time.SYSTEM,
replicaManager = replicaManager,
deserializer = serde,
- loadBufferSize = 1000
+ loadBufferSize = 1000,
+ commitIntervalOffsets = CoordinatorLoaderImpl.DEFAULT_COMMIT_INTERVAL_OFFSETS
)) { loader =>
when(replicaManager.getLog(tp)).thenReturn(Some(log))
when(log.logStartOffset).thenReturn(0L)
@@ -290,7 +295,8 @@ class CoordinatorLoaderImplTest {
time = Time.SYSTEM,
replicaManager = replicaManager,
deserializer = serde,
- loadBufferSize = 1000
+ loadBufferSize = 1000,
+ commitIntervalOffsets = CoordinatorLoaderImpl.DEFAULT_COMMIT_INTERVAL_OFFSETS
)) { loader =>
when(replicaManager.getLog(tp)).thenReturn(Some(log))
when(log.logStartOffset).thenReturn(0L)
@@ -332,7 +338,8 @@ class CoordinatorLoaderImplTest {
time = Time.SYSTEM,
replicaManager = replicaManager,
deserializer = serde,
- loadBufferSize = 1000
+ loadBufferSize = 1000,
+ commitIntervalOffsets = CoordinatorLoaderImpl.DEFAULT_COMMIT_INTERVAL_OFFSETS
)) { loader =>
when(replicaManager.getLog(tp)).thenReturn(Some(log))
when(log.logStartOffset).thenReturn(0L)
@@ -364,7 +371,8 @@ class CoordinatorLoaderImplTest {
time,
replicaManager = replicaManager,
deserializer = serde,
- loadBufferSize = 1000
+ loadBufferSize = 1000,
+ commitIntervalOffsets = CoordinatorLoaderImpl.DEFAULT_COMMIT_INTERVAL_OFFSETS
)) { loader =>
val startTimeMs = time.milliseconds()
when(replicaManager.getLog(tp)).thenReturn(Some(log))
@@ -419,7 +427,8 @@ class CoordinatorLoaderImplTest {
time = Time.SYSTEM,
replicaManager = replicaManager,
deserializer = serde,
- loadBufferSize = 1000
+ loadBufferSize = 1000,
+ commitIntervalOffsets = CoordinatorLoaderImpl.DEFAULT_COMMIT_INTERVAL_OFFSETS
)) { loader =>
when(replicaManager.getLog(tp)).thenReturn(Some(log))
when(log.logStartOffset).thenReturn(0L)
@@ -494,7 +503,8 @@ class CoordinatorLoaderImplTest {
time = Time.SYSTEM,
replicaManager = replicaManager,
deserializer = serde,
- loadBufferSize = 1000
+ loadBufferSize = 1000,
+ commitIntervalOffsets = CoordinatorLoaderImpl.DEFAULT_COMMIT_INTERVAL_OFFSETS
)) { loader =>
when(replicaManager.getLog(tp)).thenReturn(Some(log))
when(log.logStartOffset).thenReturn(0L)
@@ -520,7 +530,8 @@ class CoordinatorLoaderImplTest {
time = Time.SYSTEM,
replicaManager = replicaManager,
deserializer = serde,
- loadBufferSize = 1000
+ loadBufferSize = 1000,
+ commitIntervalOffsets = CoordinatorLoaderImpl.DEFAULT_COMMIT_INTERVAL_OFFSETS
)) { loader =>
when(replicaManager.getLog(tp)).thenReturn(Some(log))
when(log.logStartOffset).thenReturn(0L)
@@ -596,7 +607,8 @@ class CoordinatorLoaderImplTest {
time = Time.SYSTEM,
replicaManager = replicaManager,
deserializer = serde,
- loadBufferSize = 1000
+ loadBufferSize = 1000,
+ commitIntervalOffsets = CoordinatorLoaderImpl.DEFAULT_COMMIT_INTERVAL_OFFSETS
)) { loader =>
when(replicaManager.getLog(tp)).thenReturn(Some(log))
when(log.logStartOffset).thenReturn(0L)
@@ -632,6 +644,79 @@ class CoordinatorLoaderImplTest {
}
}
+ @Test
+ def testUpdateLastWrittenOffsetCommitInterval(): Unit = {
+ val tp = new TopicPartition("foo", 0)
+ val replicaManager = mock(classOf[ReplicaManager])
+ val serde = new StringKeyValueDeserializer
+ val log = mock(classOf[UnifiedLog])
+ val coordinator = mock(classOf[CoordinatorPlayback[(String, String)]])
+
+ Using.resource(new CoordinatorLoaderImpl[(String, String)](
+ time = Time.SYSTEM,
+ replicaManager = replicaManager,
+ deserializer = serde,
+ loadBufferSize = 1000,
+ commitIntervalOffsets = 2L
+ )) { loader =>
+ when(replicaManager.getLog(tp)).thenReturn(Some(log))
+ when(log.logStartOffset).thenReturn(0L)
+ when(log.highWatermark).thenReturn(7L)
+ when(replicaManager.getLogEndOffset(tp)).thenReturn(Some(7L))
+
+ val readResult1 = logReadResult(startOffset = 0, records = Seq(
+ new SimpleRecord("k1".getBytes, "v1".getBytes),
+ new SimpleRecord("k2".getBytes, "v2".getBytes)
+ ))
+
+ when(log.read(0L, 1000, FetchIsolation.LOG_END, true)).thenReturn(readResult1)
+
+ val readResult2 = logReadResult(startOffset = 2, records = Seq(
+ new SimpleRecord("k3".getBytes, "v3".getBytes),
+ new SimpleRecord("k4".getBytes, "v4".getBytes),
+ new SimpleRecord("k5".getBytes, "v5".getBytes)
+ ))
+
+ when(log.read(2L, 1000, FetchIsolation.LOG_END, true)).thenReturn(readResult2)
+
+ val readResult3 = logReadResult(startOffset = 5, records = Seq(
+ new SimpleRecord("k6".getBytes, "v6".getBytes)
+ ))
+
+ when(log.read(5L, 1000, FetchIsolation.LOG_END, true)).thenReturn(readResult3)
+
+ val readResult4 = logReadResult(startOffset = 6, records = Seq(
+ new SimpleRecord("k7".getBytes, "v7".getBytes)
+ ))
+
+ when(log.read(6L, 1000, FetchIsolation.LOG_END, true)).thenReturn(readResult4)
+
+ assertNotNull(loader.load(tp, coordinator).get(10, TimeUnit.SECONDS))
+
+ verify(coordinator).replay(0L, RecordBatch.NO_PRODUCER_ID, RecordBatch.NO_PRODUCER_EPOCH, ("k1", "v1"))
+ verify(coordinator).replay(1L, RecordBatch.NO_PRODUCER_ID, RecordBatch.NO_PRODUCER_EPOCH, ("k2", "v2"))
+ verify(coordinator).replay(2L, RecordBatch.NO_PRODUCER_ID, RecordBatch.NO_PRODUCER_EPOCH, ("k3", "v3"))
+ verify(coordinator).replay(3L, RecordBatch.NO_PRODUCER_ID, RecordBatch.NO_PRODUCER_EPOCH, ("k4", "v4"))
+ verify(coordinator).replay(4L, RecordBatch.NO_PRODUCER_ID, RecordBatch.NO_PRODUCER_EPOCH, ("k5", "v5"))
+ verify(coordinator).replay(5L, RecordBatch.NO_PRODUCER_ID, RecordBatch.NO_PRODUCER_EPOCH, ("k6", "v6"))
+ verify(coordinator).replay(6L, RecordBatch.NO_PRODUCER_ID, RecordBatch.NO_PRODUCER_EPOCH, ("k7", "v7"))
+ verify(coordinator, times(0)).updateLastWrittenOffset(0L)
+ verify(coordinator, times(1)).updateLastWrittenOffset(2L)
+ verify(coordinator, times(1)).updateLastWrittenOffset(5L)
+ verify(coordinator, times(0)).updateLastWrittenOffset(6L)
+ verify(coordinator, times(1)).updateLastWrittenOffset(7L)
+ verify(coordinator, times(0)).updateLastCommittedOffset(0L)
+ verify(coordinator, times(1)).updateLastCommittedOffset(2L)
+ verify(coordinator, times(1)).updateLastCommittedOffset(5L)
+ verify(coordinator, times(0)).updateLastCommittedOffset(6L)
+ verify(coordinator, times(1)).updateLastCommittedOffset(7L)
+ }
+ }
+
private def logReadResult(
startOffset: Long,
producerId: Long = RecordBatch.NO_PRODUCER_ID,