This is an automated email from the ASF dual-hosted git repository.

chia7712 pushed a commit to branch 4.0
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/4.0 by this push:
     new eefee6d58d8 KAFKA-19427 Allow the coordinator to grow its buffer 
dynamically (#20040)
eefee6d58d8 is described below

commit eefee6d58d8bc426ab54d4bc05d49ddb5566c025
Author: Ming-Yen Chung <[email protected]>
AuthorDate: Wed Jul 16 22:06:33 2025 +0800

    KAFKA-19427 Allow the coordinator to grow its buffer dynamically (#20040)
    
    * Coordinator starts with a smaller buffer, which can grow as needed.
    
    * In freeCurrentBatch, release the appropriate buffer:
      * The Coordinator recycles the expanded buffer
    (`currentBatch.builder.buffer()`), not `currentBatch.buffer`, because
    `MemoryBuilder` may allocate a new `ByteBuffer` if the existing one
    isn't large enough.
    
      * There are two cases that buffer may exceeds `maxMessageSize`      1.
    If there's a single record whose size exceeds `maxMessageSize` (which,
    so far, is derived from `max.message.bytes`) and the write is in
    `non-atomic` mode, it's still possible for the buffer to grow beyond
    `maxMessageSize`. In this case, the Coordinator should revert to using a
    smaller buffer afterward.      2. Coordinator do not recycles the buffer
    that larger than `maxMessageSize`. If the user dynamically reduces
    `maxMessageSize` to a value even smaller than `INITIAL_BUFFER_SIZE`, the
    Coordinator should avoid recycling any buffer larger than
    `maxMessageSize` so that Coordinator can allocate the smaller buffer in
    the next round.
    
    * Add tests to verify the above scenarios.
    
    Reviewers: David Jacot <[email protected]>, Sean Quah
    <[email protected]>, Ken Huang <[email protected]>, PoAn Yang
    <[email protected]>, TaiJuWu <[email protected]>, Jhen-Yung Hsu
    <[email protected]>, Chia-Ping Tsai <[email protected]>
---
 .../common/runtime/CoordinatorRuntime.java         |  17 +-
 .../common/runtime/CoordinatorRuntimeTest.java     | 215 ++++++++++++++++++++-
 2 files changed, 222 insertions(+), 10 deletions(-)

diff --git 
a/coordinator-common/src/main/java/org/apache/kafka/coordinator/common/runtime/CoordinatorRuntime.java
 
b/coordinator-common/src/main/java/org/apache/kafka/coordinator/common/runtime/CoordinatorRuntime.java
index 1e9724a57aa..e1e80476cf8 100644
--- 
a/coordinator-common/src/main/java/org/apache/kafka/coordinator/common/runtime/CoordinatorRuntime.java
+++ 
b/coordinator-common/src/main/java/org/apache/kafka/coordinator/common/runtime/CoordinatorRuntime.java
@@ -70,6 +70,7 @@ import java.util.concurrent.locks.ReentrantLock;
 import java.util.function.Consumer;
 import java.util.stream.Collectors;
 
+import static java.lang.Math.min;
 import static 
org.apache.kafka.coordinator.common.runtime.CoordinatorRuntime.CoordinatorWriteEvent.NOT_QUEUED;
 
 /**
@@ -758,8 +759,14 @@ public class CoordinatorRuntime<S extends 
CoordinatorShard<U>, U> implements Aut
             // Cancel the linger timeout.
             currentBatch.lingerTimeoutTask.ifPresent(TimerTask::cancel);
 
-            // Release the buffer.
-            bufferSupplier.release(currentBatch.buffer);
+            // Release the buffer only if it is not larger than the 
maxBatchSize.
+            int maxBatchSize = partitionWriter.config(tp).maxMessageSize();
+
+            if (currentBatch.builder.buffer().capacity() <= maxBatchSize) {
+                bufferSupplier.release(currentBatch.builder.buffer());
+            } else if (currentBatch.buffer.capacity() <= maxBatchSize) {
+                bufferSupplier.release(currentBatch.buffer);
+            }
 
             currentBatch = null;
         }
@@ -859,7 +866,7 @@ public class CoordinatorRuntime<S extends 
CoordinatorShard<U>, U> implements Aut
                 LogConfig logConfig = partitionWriter.config(tp);
                 int maxBatchSize = logConfig.maxMessageSize();
                 long prevLastWrittenOffset = coordinator.lastWrittenOffset();
-                ByteBuffer buffer = bufferSupplier.get(maxBatchSize);
+                ByteBuffer buffer = 
bufferSupplier.get(min(INITIAL_BUFFER_SIZE, maxBatchSize));
 
                 MemoryRecordsBuilder builder = new MemoryRecordsBuilder(
                     buffer,
@@ -1888,9 +1895,9 @@ public class CoordinatorRuntime<S extends 
CoordinatorShard<U>, U> implements Aut
     }
 
     /**
-     * 16KB. Used for initial buffer size for write operations.
+     * 512KB. Used for initial buffer size for write operations.
      */
-    static final int MIN_BUFFER_SIZE = 16384;
+    static final int INITIAL_BUFFER_SIZE = 512 * 1024;
 
     /**
      * The log prefix.
diff --git 
a/coordinator-common/src/test/java/org/apache/kafka/coordinator/common/runtime/CoordinatorRuntimeTest.java
 
b/coordinator-common/src/test/java/org/apache/kafka/coordinator/common/runtime/CoordinatorRuntimeTest.java
index 9e4e6f7bb9b..9198e207e4b 100644
--- 
a/coordinator-common/src/test/java/org/apache/kafka/coordinator/common/runtime/CoordinatorRuntimeTest.java
+++ 
b/coordinator-common/src/test/java/org/apache/kafka/coordinator/common/runtime/CoordinatorRuntimeTest.java
@@ -19,6 +19,7 @@ package org.apache.kafka.coordinator.common.runtime;
 import org.apache.kafka.common.KafkaException;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.compress.Compression;
+import org.apache.kafka.common.config.TopicConfig;
 import org.apache.kafka.common.errors.NotCoordinatorException;
 import org.apache.kafka.common.errors.NotEnoughReplicasException;
 import org.apache.kafka.common.errors.RecordTooLargeException;
@@ -65,6 +66,7 @@ import java.util.Comparator;
 import java.util.Deque;
 import java.util.LinkedList;
 import java.util.List;
+import java.util.Map;
 import java.util.Objects;
 import java.util.OptionalInt;
 import java.util.Set;
@@ -84,7 +86,7 @@ import static 
org.apache.kafka.coordinator.common.runtime.CoordinatorRuntime.Coo
 import static 
org.apache.kafka.coordinator.common.runtime.CoordinatorRuntime.CoordinatorState.INITIAL;
 import static 
org.apache.kafka.coordinator.common.runtime.CoordinatorRuntime.CoordinatorState.LOADING;
 import static 
org.apache.kafka.coordinator.common.runtime.CoordinatorRuntime.HighWatermarkListener.NO_OFFSET;
-import static 
org.apache.kafka.coordinator.common.runtime.CoordinatorRuntime.MIN_BUFFER_SIZE;
+import static 
org.apache.kafka.coordinator.common.runtime.CoordinatorRuntime.INITIAL_BUFFER_SIZE;
 import static org.apache.kafka.test.TestUtils.assertFutureThrows;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertFalse;
@@ -3486,11 +3488,11 @@ public class CoordinatorRuntimeTest {
         assertEquals(List.of(0L), 
ctx.coordinator.snapshotRegistry().epochsList());
 
         int maxBatchSize = writer.config(TP).maxMessageSize();
-        assertTrue(maxBatchSize > MIN_BUFFER_SIZE);
+        assertTrue(maxBatchSize > INITIAL_BUFFER_SIZE);
 
-        // Generate enough records to create a batch that has 16KB < batchSize 
< maxBatchSize
+        // Generate enough records to create a batch that has 
INITIAL_BUFFER_SIZE < batchSize < maxBatchSize
         List<String> records = new ArrayList<>();
-        for (int i = 0; i < 3000; i++) {
+        for (int i = 0; i < 50000; i++) {
             records.add("record-" + i);
         }
 
@@ -3504,7 +3506,210 @@ public class CoordinatorRuntimeTest {
         assertFalse(write1.isCompletedExceptionally());
 
         int batchSize = writer.entries(TP).get(0).sizeInBytes();
-        assertTrue(batchSize > MIN_BUFFER_SIZE && batchSize < maxBatchSize);
+        assertTrue(batchSize > INITIAL_BUFFER_SIZE && batchSize < 
maxBatchSize);
+    }
+
+    @Test
+    public void testCoordinatorDoNotRetainBufferLargeThanMaxMessageSize() {
+        MockTimer timer = new MockTimer();
+        InMemoryPartitionWriter mockWriter = new 
InMemoryPartitionWriter(false) {
+            @Override
+            public LogConfig config(TopicPartition tp) {
+                return new LogConfig(Map.of(
+                    TopicConfig.MAX_MESSAGE_BYTES_CONFIG, String.valueOf(1024 
* 1024) // 1MB
+                ));
+            }
+        };
+        StringSerializer serializer = new StringSerializer();
+
+        CoordinatorRuntime<MockCoordinatorShard, String> runtime =
+            new CoordinatorRuntime.Builder<MockCoordinatorShard, String>()
+                .withTime(timer.time())
+                .withTimer(timer)
+                .withDefaultWriteTimeOut(DEFAULT_WRITE_TIMEOUT)
+                .withLoader(new MockCoordinatorLoader())
+                .withEventProcessor(new DirectEventProcessor())
+                .withPartitionWriter(mockWriter)
+                .withCoordinatorShardBuilderSupplier(new 
MockCoordinatorShardBuilderSupplier())
+                
.withCoordinatorRuntimeMetrics(mock(CoordinatorRuntimeMetrics.class))
+                .withCoordinatorMetrics(mock(CoordinatorMetrics.class))
+                .withSerializer(serializer)
+                .withExecutorService(mock(ExecutorService.class))
+                .build();
+
+        // Schedule the loading.
+        runtime.scheduleLoadOperation(TP, 10);
+
+        // Verify the initial state.
+        CoordinatorRuntime<MockCoordinatorShard, String>.CoordinatorContext 
ctx = runtime.contextOrThrow(TP);
+        assertEquals(0L, ctx.coordinator.lastWrittenOffset());
+        assertEquals(0L, ctx.coordinator.lastCommittedOffset());
+        assertEquals(List.of(0L), 
ctx.coordinator.snapshotRegistry().epochsList());
+
+        // Generate a record larger than the maxBatchSize.
+        List<String> largeRecords = List.of("A".repeat(100 * 1024 * 1024));
+
+        // Write #1.
+        CompletableFuture<String> write1 = 
runtime.scheduleWriteOperation("write#1", TP, DEFAULT_WRITE_TIMEOUT,
+            state -> new CoordinatorResult<>(largeRecords, "response1", null, 
true, false)
+        );
+
+        // Verify that the write has not completed exceptionally.
+        // This will catch any exceptions thrown including 
RecordTooLargeException.
+        assertFalse(write1.isCompletedExceptionally());
+
+        // Verify that the next buffer retrieved from the bufferSupplier is 
the initial small one, not the large buffer.
+        assertEquals(INITIAL_BUFFER_SIZE, 
ctx.bufferSupplier.get(1).capacity());
+    }
+
+    @Test
+    public void 
testCoordinatorRetainExpandedBufferLessOrEqualToMaxMessageSize() {
+        MockTimer timer = new MockTimer();
+        InMemoryPartitionWriter mockWriter = new 
InMemoryPartitionWriter(false) {
+            @Override
+            public LogConfig config(TopicPartition tp) {
+                return new LogConfig(Map.of(
+                    TopicConfig.MAX_MESSAGE_BYTES_CONFIG, String.valueOf(1024 
* 1024 * 1024) // 1GB
+                ));
+            }
+        };
+        StringSerializer serializer = new StringSerializer();
+
+        CoordinatorRuntime<MockCoordinatorShard, String> runtime =
+            new CoordinatorRuntime.Builder<MockCoordinatorShard, String>()
+                .withTime(timer.time())
+                .withTimer(timer)
+                .withDefaultWriteTimeOut(DEFAULT_WRITE_TIMEOUT)
+                .withLoader(new MockCoordinatorLoader())
+                .withEventProcessor(new DirectEventProcessor())
+                .withPartitionWriter(mockWriter)
+                .withCoordinatorShardBuilderSupplier(new 
MockCoordinatorShardBuilderSupplier())
+                
.withCoordinatorRuntimeMetrics(mock(CoordinatorRuntimeMetrics.class))
+                .withCoordinatorMetrics(mock(CoordinatorMetrics.class))
+                .withSerializer(serializer)
+                .withExecutorService(mock(ExecutorService.class))
+                .build();
+
+        // Schedule the loading.
+        runtime.scheduleLoadOperation(TP, 10);
+
+        // Verify the initial state.
+        CoordinatorRuntime<MockCoordinatorShard, String>.CoordinatorContext 
ctx = runtime.contextOrThrow(TP);
+        assertEquals(0L, ctx.coordinator.lastWrittenOffset());
+        assertEquals(0L, ctx.coordinator.lastCommittedOffset());
+        assertEquals(List.of(0L), 
ctx.coordinator.snapshotRegistry().epochsList());
+
+        // Generate enough records to create a batch that has 
INITIAL_BUFFER_SIZE < batchSize < maxBatchSize
+        List<String> records = new ArrayList<>();
+        for (int i = 0; i < 1000000; i++) {
+            records.add("record-" + i);
+        }
+
+        // Write #1.
+        CompletableFuture<String> write1 = 
runtime.scheduleWriteOperation("write#1", TP, DEFAULT_WRITE_TIMEOUT,
+            state -> new CoordinatorResult<>(records, "response1")
+        );
+
+        // Verify that the write has not completed exceptionally.
+        // This will catch any exceptions thrown including 
RecordTooLargeException.
+        assertFalse(write1.isCompletedExceptionally());
+
+        int batchSize = mockWriter.entries(TP).get(0).sizeInBytes();
+        int maxBatchSize = mockWriter.config(TP).maxMessageSize();
+        assertTrue(INITIAL_BUFFER_SIZE < batchSize && batchSize <= 
maxBatchSize);
+
+        // Verify that the next buffer retrieved from the bufferSupplier is 
the expanded buffer.
+        assertTrue(ctx.bufferSupplier.get(1).capacity() > INITIAL_BUFFER_SIZE);
+    }
+
+    @Test
+    public void 
testBufferShrinkWhenMaxMessageSizeReducedBelowInitialBufferSize() {
+        MockTimer timer = new MockTimer();
+        var mockWriter = new InMemoryPartitionWriter(false) {
+            private LogConfig config = new LogConfig(Map.of(
+                TopicConfig.MAX_MESSAGE_BYTES_CONFIG, String.valueOf(1024 * 
1024) // 1MB
+            ));
+
+            @Override
+            public LogConfig config(TopicPartition tp) {
+                return config;
+            }
+
+            public void updateConfig(LogConfig newConfig) {
+                this.config = newConfig;
+            }
+        };
+        StringSerializer serializer = new StringSerializer();
+
+        CoordinatorRuntime<MockCoordinatorShard, String> runtime =
+            new CoordinatorRuntime.Builder<MockCoordinatorShard, String>()
+                .withTime(timer.time())
+                .withTimer(timer)
+                .withDefaultWriteTimeOut(DEFAULT_WRITE_TIMEOUT)
+                .withLoader(new MockCoordinatorLoader())
+                .withEventProcessor(new DirectEventProcessor())
+                .withPartitionWriter(mockWriter)
+                .withCoordinatorShardBuilderSupplier(new 
MockCoordinatorShardBuilderSupplier())
+                
.withCoordinatorRuntimeMetrics(mock(CoordinatorRuntimeMetrics.class))
+                .withCoordinatorMetrics(mock(CoordinatorMetrics.class))
+                .withSerializer(serializer)
+                .withExecutorService(mock(ExecutorService.class))
+                .build();
+
+        // Schedule the loading.
+        runtime.scheduleLoadOperation(TP, 10);
+
+        // Verify the initial state.
+        CoordinatorRuntime<MockCoordinatorShard, String>.CoordinatorContext 
ctx = runtime.contextOrThrow(TP);
+        assertEquals(0L, ctx.coordinator.lastWrittenOffset());
+        assertEquals(0L, ctx.coordinator.lastCommittedOffset());
+        assertEquals(List.of(0L), 
ctx.coordinator.snapshotRegistry().epochsList());
+
+        List<String> records = new ArrayList<>();
+        for (int i = 0; i < 1000; i++) {
+            records.add("record-" + i);
+        }
+
+        // Write #1.
+        CompletableFuture<String> write1 = 
runtime.scheduleWriteOperation("write#1", TP, DEFAULT_WRITE_TIMEOUT,
+            state -> new CoordinatorResult<>(records, "response1")
+        );
+
+        // Verify that the write has not completed exceptionally.
+        // This will catch any exceptions thrown including 
RecordTooLargeException.
+        assertFalse(write1.isCompletedExceptionally());
+
+        int batchSize = mockWriter.entries(TP).get(0).sizeInBytes();
+        int maxBatchSize = mockWriter.config(TP).maxMessageSize();
+        assertTrue(batchSize <= INITIAL_BUFFER_SIZE && INITIAL_BUFFER_SIZE <= 
maxBatchSize);
+
+        ByteBuffer cachedBuffer = ctx.bufferSupplier.get(1);
+        assertEquals(INITIAL_BUFFER_SIZE, cachedBuffer.capacity());
+        // ctx.bufferSupplier.get(1); will clear cachedBuffer in 
bufferSupplier. Use release to put it back to bufferSupplier
+        ctx.bufferSupplier.release(cachedBuffer);
+
+        // Reduce max message size below initial buffer size.
+        mockWriter.updateConfig(new LogConfig(
+            Map.of(TopicConfig.MAX_MESSAGE_BYTES_CONFIG, 
String.valueOf(INITIAL_BUFFER_SIZE - 66))));
+        assertEquals(INITIAL_BUFFER_SIZE - 66, 
mockWriter.config(TP).maxMessageSize());
+
+        // Write #2.
+        CompletableFuture<String> write2 = 
runtime.scheduleWriteOperation("write#2", TP, DEFAULT_WRITE_TIMEOUT,
+            state -> new CoordinatorResult<>(records, "response2")
+        );
+        assertFalse(write2.isCompletedExceptionally());
+
+        // Verify that there is no cached buffer since the cached buffer size 
is greater than new maxMessageSize.
+        assertEquals(1, ctx.bufferSupplier.get(1).capacity());
+
+        // Write #3.
+        CompletableFuture<String> write3 = 
runtime.scheduleWriteOperation("write#3", TP, DEFAULT_WRITE_TIMEOUT,
+            state -> new CoordinatorResult<>(records, "response3")
+        );
+        assertFalse(write3.isCompletedExceptionally());
+
+        // Verify that the cached buffer size is equals to new maxMessageSize 
that less than INITIAL_BUFFER_SIZE.
+        assertEquals(mockWriter.config(TP).maxMessageSize(), 
ctx.bufferSupplier.get(1).capacity());
     }
 
     @Test

Reply via email to