This is an automated email from the ASF dual-hosted git repository.

jinrongtong pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git


The following commit(s) were added to refs/heads/develop by this push:
     new e262890216 [ISSUE #9776] Make SharedByteBuffer size configurable via 
MessageStoreConfig.maxMessageSize (#9775)
e262890216 is described below

commit e26289021614d7407c5e2648af5bd708aa90b064
Author: rongtong <[email protected]>
AuthorDate: Fri Oct 24 13:36:52 2025 +0800

    [ISSUE #9776] Make SharedByteBuffer size configurable via 
MessageStoreConfig.maxMessageSize (#9775)
    
    * Make SharedByteBuffer size configurable via 
MessageStoreConfig.maxMessageSize
    
    Change-Id: Ie3c291ba10b84963fb3ba0af90afa323d9b955ff
    
    * Fix checkstyle
    
    Change-Id: I75f9f767e30f33fc2ea4ceafd59b9d950875c765
    
    * Fix UTs
    
    Change-Id: I57b3c904d37558e4301394fc1dd4188b0866718b
    
    * Fix UTs
    
    Change-Id: I87775116926d3f5271eb13f0a86c0a40446ae432
    
    * Fix bugs
    
    Change-Id: Ib76596b91621b59d1e189642d081d259880f9ac8
    
    * Fix comments
    
    Change-Id: I7bdd3b9f24172afe77a7023b9aa4109dc271c27b
    
    * refactor: make SharedByteBufferManager buffer count configurable
    
    Change-Id: Ia97908f7e96f23542e8acf1a2cc6c1407d3d8e87
---
 .../apache/rocketmq/store/DefaultMessageStore.java |   5 +
 .../rocketmq/store/config/MessageStoreConfig.java  |  11 ++
 .../rocketmq/store/logfile/DefaultMappedFile.java  | 117 ++++++++-----------
 .../store/logfile/SharedByteBufferManager.java     | 128 +++++++++++++++++++++
 .../logfile/DefaultMappedFileConcurrencyTest.java  |   3 +
 .../DefaultMappedFileErrorHandlingTest.java        |   3 +
 .../logfile/DefaultMappedFilePerformanceTest.java  |   3 +
 .../DefaultMappedFileWriteWithoutMmapTest.java     |   3 +
 8 files changed, 204 insertions(+), 69 deletions(-)

diff --git 
a/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java 
b/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java
index 7d939a969a..7db1daa39c 100644
--- a/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java
+++ b/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java
@@ -103,6 +103,7 @@ import 
org.apache.rocketmq.store.kv.CommitLogDispatcherCompaction;
 import org.apache.rocketmq.store.kv.CompactionService;
 import org.apache.rocketmq.store.kv.CompactionStore;
 import org.apache.rocketmq.store.logfile.MappedFile;
+import org.apache.rocketmq.store.logfile.SharedByteBufferManager;
 import org.apache.rocketmq.store.metrics.DefaultStoreMetricsManager;
 import org.apache.rocketmq.store.queue.CombineConsumeQueueStore;
 import org.apache.rocketmq.store.queue.ConsumeQueueInterface;
@@ -240,6 +241,10 @@ public class DefaultMessageStore implements MessageStore {
 
         this.transientStorePool = new 
TransientStorePool(messageStoreConfig.getTransientStorePoolSize(), 
messageStoreConfig.getMappedFileSizeCommitLog());
 
+        if (messageStoreConfig.isWriteWithoutMmap()) {
+            
SharedByteBufferManager.getInstance().init(messageStoreConfig.getMaxMessageSize(),
 messageStoreConfig.getSharedByteBufferNum());
+        }
+
         this.defaultStoreMetricsManager = new DefaultStoreMetricsManager();
 
         this.scheduledExecutorService =
diff --git 
a/store/src/main/java/org/apache/rocketmq/store/config/MessageStoreConfig.java 
b/store/src/main/java/org/apache/rocketmq/store/config/MessageStoreConfig.java
index 2e72f9e6f2..85d19f31b4 100644
--- 
a/store/src/main/java/org/apache/rocketmq/store/config/MessageStoreConfig.java
+++ 
b/store/src/main/java/org/apache/rocketmq/store/config/MessageStoreConfig.java
@@ -488,6 +488,9 @@ public class MessageStoreConfig {
      */
     private boolean enableAcceleratedRecovery = false;
 
+    // Shared byte buffer manager configuration
+    private int sharedByteBufferNum = 16;
+
     public String getRocksdbCompressionType() {
         return rocksdbCompressionType;
     }
@@ -2060,4 +2063,12 @@ public class MessageStoreConfig {
     public void setEnableRunningFlagsInFlush(boolean 
enableRunningFlagsInFlush) {
         this.enableRunningFlagsInFlush = enableRunningFlagsInFlush;
     }
+
+    public int getSharedByteBufferNum() {
+        return sharedByteBufferNum;
+    }
+
+    public void setSharedByteBufferNum(int sharedByteBufferNum) {
+        this.sharedByteBufferNum = sharedByteBufferNum;
+    }
 }
diff --git 
a/store/src/main/java/org/apache/rocketmq/store/logfile/DefaultMappedFile.java 
b/store/src/main/java/org/apache/rocketmq/store/logfile/DefaultMappedFile.java
index 147eb3d708..0c16d705bd 100644
--- 
a/store/src/main/java/org/apache/rocketmq/store/logfile/DefaultMappedFile.java
+++ 
b/store/src/main/java/org/apache/rocketmq/store/logfile/DefaultMappedFile.java
@@ -33,11 +33,9 @@ import java.nio.file.Path;
 import java.nio.file.Paths;
 import java.nio.file.StandardCopyOption;
 import java.util.Iterator;
-import java.util.concurrent.ThreadLocalRandom;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
 import java.util.concurrent.atomic.AtomicLong;
-import java.util.concurrent.locks.ReentrantLock;
 import java.util.function.Consumer;
 import org.apache.commons.lang3.SystemUtils;
 import org.apache.rocketmq.common.UtilAll;
@@ -116,29 +114,11 @@ public class DefaultMappedFile extends AbstractMappedFile 
{
      */
     private long stopTimestamp = -1;
 
-    private static int maxSharedNum = 16;
-    private static final SharedByteBuffer[] SHARED_BYTE_BUFFER;
 
-    protected RunningFlags runningFlags;
-
-    static class SharedByteBuffer {
-        private final ReentrantLock lock;
-        private final ByteBuffer buffer;
 
-        public SharedByteBuffer(int size) {
-            this.lock = new ReentrantLock();
-            this.buffer = ByteBuffer.allocateDirect(size);
-        }
+    protected RunningFlags runningFlags;
 
-        public void release() {
-            this.lock.unlock();
-        }
 
-        public ByteBuffer acquire() {
-            this.lock.lock();
-            return buffer;
-        }
-    }
 
     static {
         WROTE_POSITION_UPDATER = 
AtomicIntegerFieldUpdater.newUpdater(DefaultMappedFile.class, "wrotePosition");
@@ -156,18 +136,9 @@ public class DefaultMappedFile extends AbstractMappedFile {
             }
         }
         IS_LOADED_METHOD = isLoaded0method;
-
-        SHARED_BYTE_BUFFER = new SharedByteBuffer[maxSharedNum];
-        for (int i = 0; i < maxSharedNum; i++) {
-            SHARED_BYTE_BUFFER[i] = new SharedByteBuffer(4 * 1024 * 1024);
-        }
     }
 
-    private static SharedByteBuffer borrowSharedByteBuffer() {
-        int idx = ThreadLocalRandom.current().nextInt(maxSharedNum);
-        SharedByteBuffer buffer = SHARED_BYTE_BUFFER[idx];
-        return buffer;
-    }
+
 
     public DefaultMappedFile() {
     }
@@ -324,10 +295,10 @@ public class DefaultMappedFile extends AbstractMappedFile 
{
         long fileFromOffset = this.getFileFromOffset();
 
         if (currentPos < this.fileSize) {
-            SharedByteBuffer sharedByteBuffer = null;
+            SharedByteBufferManager.SharedByteBuffer sharedByteBuffer = null;
             ByteBuffer byteBuffer;
             if (writeWithoutMmap) {
-                sharedByteBuffer = borrowSharedByteBuffer();
+                sharedByteBuffer = 
SharedByteBufferManager.getInstance().borrowSharedByteBuffer();
                 byteBuffer = sharedByteBuffer.acquire();
                 byteBuffer.position(0).limit(byteBuffer.capacity());
                 fileFromOffset += currentPos;
@@ -336,24 +307,28 @@ public class DefaultMappedFile extends AbstractMappedFile 
{
                 byteBuffer.position(currentPos);
             }
 
-            AppendMessageResult result = cb.doAppend(byteBuffer, 
fileFromOffset, this.fileSize - currentPos, byteBufferMsg);
+            try {
+                AppendMessageResult result = cb.doAppend(byteBuffer, 
fileFromOffset, this.fileSize - currentPos, byteBufferMsg);
+
+                if (sharedByteBuffer != null) {
+                    try {
+                        this.fileChannel.position(currentPos);
+                        byteBuffer.position(0).limit(result.getWroteBytes());
+                        this.fileChannel.write(byteBuffer);
+                    } catch (Throwable t) {
+                        log.error("Failed to write to mappedFile {}", 
this.fileName, t);
+                        return new 
AppendMessageResult(AppendMessageStatus.UNKNOWN_ERROR);
+                    }
+                }
 
-            if (sharedByteBuffer != null) {
-                try {
-                    this.fileChannel.position(currentPos);
-                    byteBuffer.position(0).limit(result.getWroteBytes());
-                    this.fileChannel.write(byteBuffer);
-                } catch (Throwable t) {
-                    log.error("Failed to write to mappedFile {}", 
this.fileName, t);
-                    return new 
AppendMessageResult(AppendMessageStatus.UNKNOWN_ERROR);
-                } finally {
+                WROTE_POSITION_UPDATER.addAndGet(this, result.getWroteBytes());
+                this.storeTimestamp = result.getStoreTimestamp();
+                return result;
+            } finally {
+                if (sharedByteBuffer != null) {
                     sharedByteBuffer.release();
                 }
             }
-
-            WROTE_POSITION_UPDATER.addAndGet(this, result.getWroteBytes());
-            this.storeTimestamp = result.getStoreTimestamp();
-            return result;
         }
         log.error("MappedFile.appendMessage return null, wrotePosition: {} 
fileSize: {}", currentPos, this.fileSize);
         return new AppendMessageResult(AppendMessageStatus.UNKNOWN_ERROR);
@@ -380,10 +355,10 @@ public class DefaultMappedFile extends AbstractMappedFile 
{
         long fileFromOffset = this.getFileFromOffset();
 
         if (currentPos < this.fileSize) {
-            SharedByteBuffer sharedByteBuffer = null;
+            SharedByteBufferManager.SharedByteBuffer sharedByteBuffer = null;
             ByteBuffer byteBuffer;
             if (writeWithoutMmap) {
-                sharedByteBuffer = borrowSharedByteBuffer();
+                sharedByteBuffer = 
SharedByteBufferManager.getInstance().borrowSharedByteBuffer();
                 byteBuffer = sharedByteBuffer.acquire();
                 byteBuffer.position(0).limit(byteBuffer.capacity());
                 fileFromOffset += currentPos;
@@ -393,27 +368,31 @@ public class DefaultMappedFile extends AbstractMappedFile 
{
             }
 
             AppendMessageResult result;
-            if (messageExt instanceof MessageExtBatch && !((MessageExtBatch) 
messageExt).isInnerBatch()) {
-                // traditional batch message
-                result = cb.doAppend(fileFromOffset, byteBuffer, this.fileSize 
- currentPos,
-                    (MessageExtBatch) messageExt, putMessageContext);
-            } else if (messageExt instanceof MessageExtBrokerInner) {
-                // traditional single message or newly introduced inner-batch 
message
-                result = cb.doAppend(fileFromOffset, byteBuffer, this.fileSize 
- currentPos,
-                    (MessageExtBrokerInner) messageExt, putMessageContext);
-            } else {
-                return new 
AppendMessageResult(AppendMessageStatus.UNKNOWN_ERROR);
-            }
-
-            if (sharedByteBuffer != null) {
-                try {
-                    this.fileChannel.position(currentPos);
-                    byteBuffer.position(0).limit(result.getWroteBytes());
-                    this.fileChannel.write(byteBuffer);
-                } catch (Throwable t) {
-                    log.error("Failed to write to mappedFile {}", 
this.fileName, t);
+            try {
+                if (messageExt instanceof MessageExtBatch && 
!((MessageExtBatch) messageExt).isInnerBatch()) {
+                    // traditional batch message
+                    result = cb.doAppend(fileFromOffset, byteBuffer, 
this.fileSize - currentPos,
+                        (MessageExtBatch) messageExt, putMessageContext);
+                } else if (messageExt instanceof MessageExtBrokerInner) {
+                    // traditional single message or newly introduced 
inner-batch message
+                    result = cb.doAppend(fileFromOffset, byteBuffer, 
this.fileSize - currentPos,
+                        (MessageExtBrokerInner) messageExt, putMessageContext);
+                } else {
                     return new 
AppendMessageResult(AppendMessageStatus.UNKNOWN_ERROR);
-                } finally {
+                }
+
+                if (sharedByteBuffer != null) {
+                    try {
+                        this.fileChannel.position(currentPos);
+                        byteBuffer.position(0).limit(result.getWroteBytes());
+                        this.fileChannel.write(byteBuffer);
+                    } catch (Throwable t) {
+                        log.error("Failed to write to mappedFile {}", 
this.fileName, t);
+                        return new 
AppendMessageResult(AppendMessageStatus.UNKNOWN_ERROR);
+                    }
+                }
+            } finally {
+                if (sharedByteBuffer != null) {
                     sharedByteBuffer.release();
                 }
             }
diff --git 
a/store/src/main/java/org/apache/rocketmq/store/logfile/SharedByteBufferManager.java
 
b/store/src/main/java/org/apache/rocketmq/store/logfile/SharedByteBufferManager.java
new file mode 100644
index 0000000000..f7caff866b
--- /dev/null
+++ 
b/store/src/main/java/org/apache/rocketmq/store/logfile/SharedByteBufferManager.java
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.store.logfile;
+
+import java.nio.ByteBuffer;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.locks.ReentrantLock;
+
+/**
+ * Shared byte buffer manager for managing some shared ByteBuffers. Buffer size 
is set based on MessageStoreConfig's
+ * maxMessageSize.
+ */
+public class SharedByteBufferManager {
+
+    private static volatile SharedByteBufferManager instance;
+    private static final Object LOCK = new Object();
+
+    private SharedByteBuffer[] sharedByteBuffers;
+    private int bufferSize;
+    private int maxSharedNum;
+    private volatile boolean initialized = false;
+
+    private SharedByteBufferManager() {
+        // Private constructor
+    }
+
+    /**
+     * Get singleton instance
+     */
+    public static SharedByteBufferManager getInstance() {
+        if (instance == null) {
+            synchronized (LOCK) {
+                if (instance == null) {
+                    instance = new SharedByteBufferManager();
+                }
+            }
+        }
+        return instance;
+    }
+
+    /**
+     * Initialize shared buffers with the specified max message size and shared 
buffer number
+     *
+     * @param maxMessageSize max message size
+     * @param sharedBufferNum number of shared buffers
+     */
+    public synchronized void init(int maxMessageSize, int sharedBufferNum) {
+        if (!initialized) {
+            // Reserve 64 KB of headroom for encoded content outside the message body
+            bufferSize = Integer.MAX_VALUE - maxMessageSize >= 64 * 1024 ?
+                maxMessageSize + 64 * 1024 : Integer.MAX_VALUE;
+
+            this.maxSharedNum = sharedBufferNum;
+            this.sharedByteBuffers = new SharedByteBuffer[maxSharedNum];
+            for (int i = 0; i < maxSharedNum; i++) {
+                this.sharedByteBuffers[i] = new SharedByteBuffer(bufferSize);
+            }
+            this.initialized = true;
+        }
+    }
+
+    /**
+     * Borrow a shared buffer
+     *
+     * @return Shared buffer
+     */
+    public SharedByteBuffer borrowSharedByteBuffer() {
+        if (!initialized) {
+            throw new IllegalStateException("SharedByteBufferManager not 
initialized");
+        }
+        int idx = ThreadLocalRandom.current().nextInt(maxSharedNum);
+        return sharedByteBuffers[idx];
+    }
+
+    /**
+     * Get current buffer size
+     *
+     * @return Buffer size
+     */
+    public int getBufferSize() {
+        return bufferSize;
+    }
+
+    /**
+     * Check if initialized
+     *
+     * @return Whether initialized
+     */
+    public boolean isInitialized() {
+        return initialized;
+    }
+
+    /**
+     * Shared byte buffer class
+     */
+    public static class SharedByteBuffer {
+        private final ReentrantLock lock;
+        private final ByteBuffer buffer;
+
+        public SharedByteBuffer(int size) {
+            this.lock = new ReentrantLock();
+            this.buffer = ByteBuffer.allocateDirect(size);
+        }
+
+        public void release() {
+            this.lock.unlock();
+        }
+
+        public ByteBuffer acquire() {
+            this.lock.lock();
+            return buffer;
+        }
+    }
+}
diff --git 
a/store/src/test/java/org/apache/rocketmq/store/logfile/DefaultMappedFileConcurrencyTest.java
 
b/store/src/test/java/org/apache/rocketmq/store/logfile/DefaultMappedFileConcurrencyTest.java
index 06f94727d6..d8f3816c25 100644
--- 
a/store/src/test/java/org/apache/rocketmq/store/logfile/DefaultMappedFileConcurrencyTest.java
+++ 
b/store/src/test/java/org/apache/rocketmq/store/logfile/DefaultMappedFileConcurrencyTest.java
@@ -40,6 +40,9 @@ public class DefaultMappedFileConcurrencyTest {
         storePath = System.getProperty("user.home") + File.separator + 
"unitteststore" + System.currentTimeMillis();
         fileName = storePath + File.separator + "00000000000000000000";
         UtilAll.ensureDirOK(storePath);
+
+        // Initialize SharedByteBufferManager for tests
+        SharedByteBufferManager.getInstance().init(4 * 1024 * 1024, 16); // 
4MB default, 16 shared buffers
     }
 
     @After
diff --git 
a/store/src/test/java/org/apache/rocketmq/store/logfile/DefaultMappedFileErrorHandlingTest.java
 
b/store/src/test/java/org/apache/rocketmq/store/logfile/DefaultMappedFileErrorHandlingTest.java
index 649e8071cc..efda8d84aa 100644
--- 
a/store/src/test/java/org/apache/rocketmq/store/logfile/DefaultMappedFileErrorHandlingTest.java
+++ 
b/store/src/test/java/org/apache/rocketmq/store/logfile/DefaultMappedFileErrorHandlingTest.java
@@ -43,6 +43,9 @@ public class DefaultMappedFileErrorHandlingTest {
         storePath = System.getProperty("user.home") + File.separator + 
"unitteststore" + System.currentTimeMillis();
         fileName = storePath + File.separator + "00000000000000000000";
         UtilAll.ensureDirOK(storePath);
+
+        // Initialize SharedByteBufferManager for tests
+        SharedByteBufferManager.getInstance().init(4 * 1024 * 1024, 16); // 
4MB default, 16 shared buffers
     }
 
     @After
diff --git 
a/store/src/test/java/org/apache/rocketmq/store/logfile/DefaultMappedFilePerformanceTest.java
 
b/store/src/test/java/org/apache/rocketmq/store/logfile/DefaultMappedFilePerformanceTest.java
index b958487add..e418aecadb 100644
--- 
a/store/src/test/java/org/apache/rocketmq/store/logfile/DefaultMappedFilePerformanceTest.java
+++ 
b/store/src/test/java/org/apache/rocketmq/store/logfile/DefaultMappedFilePerformanceTest.java
@@ -38,6 +38,9 @@ public class DefaultMappedFilePerformanceTest {
         storePath = System.getProperty("user.home") + File.separator + 
"unitteststore" + System.currentTimeMillis();
         fileName = storePath + File.separator + "00000000000000000000";
         UtilAll.ensureDirOK(storePath);
+
+        // Initialize SharedByteBufferManager for tests
+        SharedByteBufferManager.getInstance().init(4 * 1024 * 1024, 16); // 
4MB default, 16 shared buffers
     }
 
     @After
diff --git 
a/store/src/test/java/org/apache/rocketmq/store/logfile/DefaultMappedFileWriteWithoutMmapTest.java
 
b/store/src/test/java/org/apache/rocketmq/store/logfile/DefaultMappedFileWriteWithoutMmapTest.java
index 79bca016e4..8734a55b07 100644
--- 
a/store/src/test/java/org/apache/rocketmq/store/logfile/DefaultMappedFileWriteWithoutMmapTest.java
+++ 
b/store/src/test/java/org/apache/rocketmq/store/logfile/DefaultMappedFileWriteWithoutMmapTest.java
@@ -37,6 +37,9 @@ public class DefaultMappedFileWriteWithoutMmapTest {
         storePath = System.getProperty("user.home") + File.separator + 
"unitteststore" + System.currentTimeMillis();
         fileName = storePath + File.separator + "00000000000000000000";
         UtilAll.ensureDirOK(storePath);
+
+        // Initialize SharedByteBufferManager for tests
+        SharedByteBufferManager.getInstance().init(4 * 1024 * 1024, 16); // 
4MB default, 16 shared buffers
     }
 
     @After

Reply via email to