This is an automated email from the ASF dual-hosted git repository.
chia7712 pushed a commit to branch 4.0
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/4.0 by this push:
new eefee6d58d8 KAFKA-19427 Allow the coordinator to grow its buffer dynamically (#20040)
eefee6d58d8 is described below
commit eefee6d58d8bc426ab54d4bc05d49ddb5566c025
Author: Ming-Yen Chung <[email protected]>
AuthorDate: Wed Jul 16 22:06:33 2025 +0800
KAFKA-19427 Allow the coordinator to grow its buffer dynamically (#20040)
* The Coordinator starts with a smaller buffer, which can grow as needed.
* In freeCurrentBatch, release the appropriate buffer (see the sketch
after this list):
* The Coordinator recycles the expanded buffer
(`currentBatch.builder.buffer()`), not `currentBatch.buffer`, because
`MemoryRecordsBuilder` may allocate a new `ByteBuffer` if the existing
one isn't large enough.
* There are two cases where the buffer may exceed `maxMessageSize`:
1. If a single record's size exceeds `maxMessageSize` (which, so far,
is derived from `max.message.bytes`) and the write is in non-atomic
mode, the buffer can still grow beyond `maxMessageSize`. In this case,
the Coordinator should revert to a smaller buffer afterward.
2. The Coordinator does not recycle a buffer larger than
`maxMessageSize`. If the user dynamically reduces `maxMessageSize` to a
value even smaller than `INITIAL_BUFFER_SIZE`, the Coordinator should
avoid recycling any buffer larger than `maxMessageSize`, so that it can
allocate the smaller buffer in the next round.
* Add tests to verify the above scenarios.
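As a rough sketch (not the verbatim patch; see the diff below for the
actual change), the release decision in freeCurrentBatch works like this:

    int maxBatchSize = partitionWriter.config(tp).maxMessageSize();

    if (currentBatch.builder.buffer().capacity() <= maxBatchSize) {
        // The builder may have replaced the original buffer with a larger
        // one; recycle the expanded buffer rather than the original.
        bufferSupplier.release(currentBatch.builder.buffer());
    } else if (currentBatch.buffer.capacity() <= maxBatchSize) {
        // The expanded buffer is too large to keep; recycle the original
        // buffer if it still fits under maxBatchSize.
        bufferSupplier.release(currentBatch.buffer);
    }
    // Otherwise neither buffer is recycled, so the next write allocates a
    // fresh buffer of at most min(INITIAL_BUFFER_SIZE, maxBatchSize).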
Reviewers: David Jacot <[email protected]>, Sean Quah
<[email protected]>, Ken Huang <[email protected]>, PoAn Yang
<[email protected]>, TaiJuWu <[email protected]>, Jhen-Yung Hsu
<[email protected]>, Chia-Ping Tsai <[email protected]>
---
.../common/runtime/CoordinatorRuntime.java | 17 +-
.../common/runtime/CoordinatorRuntimeTest.java | 215 ++++++++++++++++++++-
2 files changed, 222 insertions(+), 10 deletions(-)
diff --git a/coordinator-common/src/main/java/org/apache/kafka/coordinator/common/runtime/CoordinatorRuntime.java b/coordinator-common/src/main/java/org/apache/kafka/coordinator/common/runtime/CoordinatorRuntime.java
index 1e9724a57aa..e1e80476cf8 100644
--- a/coordinator-common/src/main/java/org/apache/kafka/coordinator/common/runtime/CoordinatorRuntime.java
+++ b/coordinator-common/src/main/java/org/apache/kafka/coordinator/common/runtime/CoordinatorRuntime.java
@@ -70,6 +70,7 @@ import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Consumer;
import java.util.stream.Collectors;
+import static java.lang.Math.min;
import static org.apache.kafka.coordinator.common.runtime.CoordinatorRuntime.CoordinatorWriteEvent.NOT_QUEUED;
/**
@@ -758,8 +759,14 @@ public class CoordinatorRuntime<S extends CoordinatorShard<U>, U> implements Aut
// Cancel the linger timeout.
currentBatch.lingerTimeoutTask.ifPresent(TimerTask::cancel);
- // Release the buffer.
- bufferSupplier.release(currentBatch.buffer);
+ // Release the buffer only if it is not larger than the maxBatchSize.
+ int maxBatchSize = partitionWriter.config(tp).maxMessageSize();
+
+ if (currentBatch.builder.buffer().capacity() <= maxBatchSize) {
+ bufferSupplier.release(currentBatch.builder.buffer());
+ } else if (currentBatch.buffer.capacity() <= maxBatchSize) {
+ bufferSupplier.release(currentBatch.buffer);
+ }
currentBatch = null;
}
@@ -859,7 +866,7 @@ public class CoordinatorRuntime<S extends CoordinatorShard<U>, U> implements Aut
LogConfig logConfig = partitionWriter.config(tp);
int maxBatchSize = logConfig.maxMessageSize();
long prevLastWrittenOffset = coordinator.lastWrittenOffset();
- ByteBuffer buffer = bufferSupplier.get(maxBatchSize);
+ ByteBuffer buffer = bufferSupplier.get(min(INITIAL_BUFFER_SIZE, maxBatchSize));
MemoryRecordsBuilder builder = new MemoryRecordsBuilder(
buffer,
@@ -1888,9 +1895,9 @@ public class CoordinatorRuntime<S extends CoordinatorShard<U>, U> implements Aut
}
/**
- * 16KB. Used for initial buffer size for write operations.
+ * 512KB. Used for initial buffer size for write operations.
*/
- static final int MIN_BUFFER_SIZE = 16384;
+ static final int INITIAL_BUFFER_SIZE = 512 * 1024;
/**
* The log prefix.
diff --git a/coordinator-common/src/test/java/org/apache/kafka/coordinator/common/runtime/CoordinatorRuntimeTest.java b/coordinator-common/src/test/java/org/apache/kafka/coordinator/common/runtime/CoordinatorRuntimeTest.java
index 9e4e6f7bb9b..9198e207e4b 100644
--- a/coordinator-common/src/test/java/org/apache/kafka/coordinator/common/runtime/CoordinatorRuntimeTest.java
+++ b/coordinator-common/src/test/java/org/apache/kafka/coordinator/common/runtime/CoordinatorRuntimeTest.java
@@ -19,6 +19,7 @@ package org.apache.kafka.coordinator.common.runtime;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.compress.Compression;
+import org.apache.kafka.common.config.TopicConfig;
import org.apache.kafka.common.errors.NotCoordinatorException;
import org.apache.kafka.common.errors.NotEnoughReplicasException;
import org.apache.kafka.common.errors.RecordTooLargeException;
@@ -65,6 +66,7 @@ import java.util.Comparator;
import java.util.Deque;
import java.util.LinkedList;
import java.util.List;
+import java.util.Map;
import java.util.Objects;
import java.util.OptionalInt;
import java.util.Set;
@@ -84,7 +86,7 @@ import static org.apache.kafka.coordinator.common.runtime.CoordinatorRuntime.Coo
import static org.apache.kafka.coordinator.common.runtime.CoordinatorRuntime.CoordinatorState.INITIAL;
import static org.apache.kafka.coordinator.common.runtime.CoordinatorRuntime.CoordinatorState.LOADING;
import static org.apache.kafka.coordinator.common.runtime.CoordinatorRuntime.HighWatermarkListener.NO_OFFSET;
-import static org.apache.kafka.coordinator.common.runtime.CoordinatorRuntime.MIN_BUFFER_SIZE;
+import static org.apache.kafka.coordinator.common.runtime.CoordinatorRuntime.INITIAL_BUFFER_SIZE;
import static org.apache.kafka.test.TestUtils.assertFutureThrows;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
@@ -3486,11 +3488,11 @@ public class CoordinatorRuntimeTest {
assertEquals(List.of(0L), ctx.coordinator.snapshotRegistry().epochsList());
int maxBatchSize = writer.config(TP).maxMessageSize();
- assertTrue(maxBatchSize > MIN_BUFFER_SIZE);
+ assertTrue(maxBatchSize > INITIAL_BUFFER_SIZE);
- // Generate enough records to create a batch that has 16KB < batchSize < maxBatchSize
+ // Generate enough records to create a batch that has INITIAL_BUFFER_SIZE < batchSize < maxBatchSize
List<String> records = new ArrayList<>();
- for (int i = 0; i < 3000; i++) {
+ for (int i = 0; i < 50000; i++) {
records.add("record-" + i);
}
@@ -3504,7 +3506,210 @@ public class CoordinatorRuntimeTest {
assertFalse(write1.isCompletedExceptionally());
int batchSize = writer.entries(TP).get(0).sizeInBytes();
- assertTrue(batchSize > MIN_BUFFER_SIZE && batchSize < maxBatchSize);
+ assertTrue(batchSize > INITIAL_BUFFER_SIZE && batchSize < maxBatchSize);
+ }
+
+ @Test
+ public void testCoordinatorDoNotRetainBufferLargeThanMaxMessageSize() {
+ MockTimer timer = new MockTimer();
+ InMemoryPartitionWriter mockWriter = new InMemoryPartitionWriter(false) {
+ @Override
+ public LogConfig config(TopicPartition tp) {
+ return new LogConfig(Map.of(
+ TopicConfig.MAX_MESSAGE_BYTES_CONFIG, String.valueOf(1024 * 1024) // 1MB
+ ));
+ }
+ };
+ StringSerializer serializer = new StringSerializer();
+
+ CoordinatorRuntime<MockCoordinatorShard, String> runtime =
+ new CoordinatorRuntime.Builder<MockCoordinatorShard, String>()
+ .withTime(timer.time())
+ .withTimer(timer)
+ .withDefaultWriteTimeOut(DEFAULT_WRITE_TIMEOUT)
+ .withLoader(new MockCoordinatorLoader())
+ .withEventProcessor(new DirectEventProcessor())
+ .withPartitionWriter(mockWriter)
+ .withCoordinatorShardBuilderSupplier(new MockCoordinatorShardBuilderSupplier())
+ .withCoordinatorRuntimeMetrics(mock(CoordinatorRuntimeMetrics.class))
+ .withCoordinatorMetrics(mock(CoordinatorMetrics.class))
+ .withSerializer(serializer)
+ .withExecutorService(mock(ExecutorService.class))
+ .build();
+
+ // Schedule the loading.
+ runtime.scheduleLoadOperation(TP, 10);
+
+ // Verify the initial state.
+ CoordinatorRuntime<MockCoordinatorShard, String>.CoordinatorContext ctx = runtime.contextOrThrow(TP);
+ assertEquals(0L, ctx.coordinator.lastWrittenOffset());
+ assertEquals(0L, ctx.coordinator.lastCommittedOffset());
+ assertEquals(List.of(0L), ctx.coordinator.snapshotRegistry().epochsList());
+
+ // Generate a record larger than the maxBatchSize.
+ List<String> largeRecords = List.of("A".repeat(100 * 1024 * 1024));
+
+ // Write #1.
+ CompletableFuture<String> write1 = runtime.scheduleWriteOperation("write#1", TP, DEFAULT_WRITE_TIMEOUT,
+ state -> new CoordinatorResult<>(largeRecords, "response1", null, true, false)
+ );
+
+ // Verify that the write has not completed exceptionally.
+ // This will catch any exceptions thrown including RecordTooLargeException.
+ assertFalse(write1.isCompletedExceptionally());
+
+ // Verify that the next buffer retrieved from the bufferSupplier is the initial small one, not the large buffer.
+ assertEquals(INITIAL_BUFFER_SIZE, ctx.bufferSupplier.get(1).capacity());
+ }
+
+ @Test
+ public void testCoordinatorRetainExpandedBufferLessOrEqualToMaxMessageSize() {
+ MockTimer timer = new MockTimer();
+ InMemoryPartitionWriter mockWriter = new InMemoryPartitionWriter(false) {
+ @Override
+ public LogConfig config(TopicPartition tp) {
+ return new LogConfig(Map.of(
+ TopicConfig.MAX_MESSAGE_BYTES_CONFIG, String.valueOf(1024 * 1024 * 1024) // 1GB
+ ));
+ }
+ };
+ StringSerializer serializer = new StringSerializer();
+
+ CoordinatorRuntime<MockCoordinatorShard, String> runtime =
+ new CoordinatorRuntime.Builder<MockCoordinatorShard, String>()
+ .withTime(timer.time())
+ .withTimer(timer)
+ .withDefaultWriteTimeOut(DEFAULT_WRITE_TIMEOUT)
+ .withLoader(new MockCoordinatorLoader())
+ .withEventProcessor(new DirectEventProcessor())
+ .withPartitionWriter(mockWriter)
+ .withCoordinatorShardBuilderSupplier(new MockCoordinatorShardBuilderSupplier())
+ .withCoordinatorRuntimeMetrics(mock(CoordinatorRuntimeMetrics.class))
+ .withCoordinatorMetrics(mock(CoordinatorMetrics.class))
+ .withSerializer(serializer)
+ .withExecutorService(mock(ExecutorService.class))
+ .build();
+
+ // Schedule the loading.
+ runtime.scheduleLoadOperation(TP, 10);
+
+ // Verify the initial state.
+ CoordinatorRuntime<MockCoordinatorShard, String>.CoordinatorContext ctx = runtime.contextOrThrow(TP);
+ assertEquals(0L, ctx.coordinator.lastWrittenOffset());
+ assertEquals(0L, ctx.coordinator.lastCommittedOffset());
+ assertEquals(List.of(0L), ctx.coordinator.snapshotRegistry().epochsList());
+
+ // Generate enough records to create a batch that has INITIAL_BUFFER_SIZE < batchSize < maxBatchSize
+ List<String> records = new ArrayList<>();
+ for (int i = 0; i < 1000000; i++) {
+ records.add("record-" + i);
+ }
+
+ // Write #1.
+ CompletableFuture<String> write1 = runtime.scheduleWriteOperation("write#1", TP, DEFAULT_WRITE_TIMEOUT,
+ state -> new CoordinatorResult<>(records, "response1")
+ );
+
+ // Verify that the write has not completed exceptionally.
+ // This will catch any exceptions thrown including RecordTooLargeException.
+ assertFalse(write1.isCompletedExceptionally());
+
+ int batchSize = mockWriter.entries(TP).get(0).sizeInBytes();
+ int maxBatchSize = mockWriter.config(TP).maxMessageSize();
+ assertTrue(INITIAL_BUFFER_SIZE < batchSize && batchSize <= maxBatchSize);
+
+ // Verify that the next buffer retrieved from the bufferSupplier is the expanded buffer.
+ assertTrue(ctx.bufferSupplier.get(1).capacity() > INITIAL_BUFFER_SIZE);
+ }
+
+ @Test
+ public void testBufferShrinkWhenMaxMessageSizeReducedBelowInitialBufferSize() {
+ MockTimer timer = new MockTimer();
+ var mockWriter = new InMemoryPartitionWriter(false) {
+ private LogConfig config = new LogConfig(Map.of(
+ TopicConfig.MAX_MESSAGE_BYTES_CONFIG, String.valueOf(1024 * 1024) // 1MB
+ ));
+
+ @Override
+ public LogConfig config(TopicPartition tp) {
+ return config;
+ }
+
+ public void updateConfig(LogConfig newConfig) {
+ this.config = newConfig;
+ }
+ };
+ StringSerializer serializer = new StringSerializer();
+
+ CoordinatorRuntime<MockCoordinatorShard, String> runtime =
+ new CoordinatorRuntime.Builder<MockCoordinatorShard, String>()
+ .withTime(timer.time())
+ .withTimer(timer)
+ .withDefaultWriteTimeOut(DEFAULT_WRITE_TIMEOUT)
+ .withLoader(new MockCoordinatorLoader())
+ .withEventProcessor(new DirectEventProcessor())
+ .withPartitionWriter(mockWriter)
+ .withCoordinatorShardBuilderSupplier(new MockCoordinatorShardBuilderSupplier())
+ .withCoordinatorRuntimeMetrics(mock(CoordinatorRuntimeMetrics.class))
+ .withCoordinatorMetrics(mock(CoordinatorMetrics.class))
+ .withSerializer(serializer)
+ .withExecutorService(mock(ExecutorService.class))
+ .build();
+
+ // Schedule the loading.
+ runtime.scheduleLoadOperation(TP, 10);
+
+ // Verify the initial state.
+ CoordinatorRuntime<MockCoordinatorShard, String>.CoordinatorContext ctx = runtime.contextOrThrow(TP);
+ assertEquals(0L, ctx.coordinator.lastWrittenOffset());
+ assertEquals(0L, ctx.coordinator.lastCommittedOffset());
+ assertEquals(List.of(0L), ctx.coordinator.snapshotRegistry().epochsList());
+
+ List<String> records = new ArrayList<>();
+ for (int i = 0; i < 1000; i++) {
+ records.add("record-" + i);
+ }
+
+ // Write #1.
+ CompletableFuture<String> write1 = runtime.scheduleWriteOperation("write#1", TP, DEFAULT_WRITE_TIMEOUT,
+ state -> new CoordinatorResult<>(records, "response1")
+ );
+
+ // Verify that the write has not completed exceptionally.
+ // This will catch any exceptions thrown including RecordTooLargeException.
+ assertFalse(write1.isCompletedExceptionally());
+
+ int batchSize = mockWriter.entries(TP).get(0).sizeInBytes();
+ int maxBatchSize = mockWriter.config(TP).maxMessageSize();
+ assertTrue(batchSize <= INITIAL_BUFFER_SIZE && INITIAL_BUFFER_SIZE <= maxBatchSize);
+
+ ByteBuffer cachedBuffer = ctx.bufferSupplier.get(1);
+ assertEquals(INITIAL_BUFFER_SIZE, cachedBuffer.capacity());
+ // ctx.bufferSupplier.get(1) removes the cached buffer from the bufferSupplier, so release it to put it back.
+ ctx.bufferSupplier.release(cachedBuffer);
+
+ // Reduce max message size below initial buffer size.
+ mockWriter.updateConfig(new LogConfig(
+ Map.of(TopicConfig.MAX_MESSAGE_BYTES_CONFIG, String.valueOf(INITIAL_BUFFER_SIZE - 66))));
+ assertEquals(INITIAL_BUFFER_SIZE - 66, mockWriter.config(TP).maxMessageSize());
+
+ // Write #2.
+ CompletableFuture<String> write2 = runtime.scheduleWriteOperation("write#2", TP, DEFAULT_WRITE_TIMEOUT,
+ state -> new CoordinatorResult<>(records, "response2")
+ );
+ assertFalse(write2.isCompletedExceptionally());
+
+ // Verify that there is no cached buffer since the cached buffer size is greater than the new maxMessageSize.
+ assertEquals(1, ctx.bufferSupplier.get(1).capacity());
+
+ // Write #3.
+ CompletableFuture<String> write3 = runtime.scheduleWriteOperation("write#3", TP, DEFAULT_WRITE_TIMEOUT,
+ state -> new CoordinatorResult<>(records, "response3")
+ );
+ assertFalse(write3.isCompletedExceptionally());
+
+ // Verify that the cached buffer size equals the new maxMessageSize, which is less than INITIAL_BUFFER_SIZE.
+ assertEquals(mockWriter.config(TP).maxMessageSize(), ctx.bufferSupplier.get(1).capacity());
}
@Test