This is an automated email from the ASF dual-hosted git repository.

baodi pushed a commit to branch branch-3.3
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-3.3 by this push:
     new 17a44c1f214 [fix][broker] Increase readBuffer size for bookkeeper.DLOutputStream (#23548)
17a44c1f214 is described below

commit 17a44c1f21474a832b6843b4006097e71be0291f
Author: jiangpengcheng <[email protected]>
AuthorDate: Tue Nov 5 08:19:22 2024 +0800

    [fix][broker] Increase readBuffer size for bookkeeper.DLOutputStream (#23548)
    
    (cherry picked from commit 7a4788895e31dcd794fcb89b3af2bc36fa221343)
---
 .../management/storage/bookkeeper/DLOutputStream.java   | 17 +++++++++++++----
 .../storage/bookkeeper/DLOutputStreamTest.java          |  2 +-
 2 files changed, 14 insertions(+), 5 deletions(-)

diff --git a/pulsar-package-management/bookkeeper-storage/src/main/java/org/apache/pulsar/packages/management/storage/bookkeeper/DLOutputStream.java b/pulsar-package-management/bookkeeper-storage/src/main/java/org/apache/pulsar/packages/management/storage/bookkeeper/DLOutputStream.java
index 67345ebd47e..f446961c1d8 100644
--- a/pulsar-package-management/bookkeeper-storage/src/main/java/org/apache/pulsar/packages/management/storage/bookkeeper/DLOutputStream.java
+++ b/pulsar-package-management/bookkeeper-storage/src/main/java/org/apache/pulsar/packages/management/storage/bookkeeper/DLOutputStream.java
@@ -36,7 +36,16 @@ class DLOutputStream {
 
     private final DistributedLogManager distributedLogManager;
     private final AsyncLogWriter writer;
-    private final byte[] readBuffer = new byte[8192];
+    /*
+     * The LogRecord structure is:
+     * -------------------
+     * Bytes 0 - 7                      : Metadata (Long)
+     * Bytes 8 - 15                     : TxId (Long)
+     * Bytes 16 - 19                    : Payload length (Integer)
+     * Bytes 20 - 20+payload.length-1   : Payload (Byte[])
+     * So the max buffer size should be LogRecord.MAX_LOGRECORD_SIZE - 2 * (Long.SIZE / 8) - Integer.SIZE / 8
+     */
+    private final byte[] readBuffer = new byte[LogRecord.MAX_LOGRECORD_SIZE - 
2 * (Long.SIZE / 8) - Integer.SIZE / 8];
     private long offset = 0L;
 
     private DLOutputStream(DistributedLogManager distributedLogManager, 
AsyncLogWriter writer) {
@@ -51,9 +60,9 @@ class DLOutputStream {
 
     private void writeAsyncHelper(InputStream is, 
CompletableFuture<DLOutputStream> result) {
         try {
-            int read = is.read(readBuffer);
-            if (read != -1) {
-                log.info("write something into the ledgers offset: {}, length: 
{}", offset, read);
+            int read = is.readNBytes(readBuffer, 0, readBuffer.length);
+            if (read > 0) {
+                log.debug("write something into the ledgers offset: {}, 
length: {}", offset, read);
                 final ByteBuf writeBuf = Unpooled.wrappedBuffer(readBuffer, 0, 
read);
                 offset += writeBuf.readableBytes();
                 final LogRecord record = new LogRecord(offset, writeBuf);
diff --git a/pulsar-package-management/bookkeeper-storage/src/test/java/org/apache/pulsar/packages/management/storage/bookkeeper/DLOutputStreamTest.java b/pulsar-package-management/bookkeeper-storage/src/test/java/org/apache/pulsar/packages/management/storage/bookkeeper/DLOutputStreamTest.java
index b55e0e0d34a..235cb4fefc0 100644
--- a/pulsar-package-management/bookkeeper-storage/src/test/java/org/apache/pulsar/packages/management/storage/bookkeeper/DLOutputStreamTest.java
+++ b/pulsar-package-management/bookkeeper-storage/src/test/java/org/apache/pulsar/packages/management/storage/bookkeeper/DLOutputStreamTest.java
@@ -99,7 +99,7 @@ public class DLOutputStreamTest {
 
     @Test
     public void writeLongBytesArrayData() throws ExecutionException, 
InterruptedException {
-        byte[] data = new byte[8192 * 3 + 4096];
+        byte[] data = new byte[1040364 * 3 + 4096];
         DLOutputStream.openWriterAsync(dlm)
                 .thenCompose(w -> w.writeAsync(new ByteArrayInputStream(data))
                         .thenCompose(DLOutputStream::closeAsync)).get();

Reply via email to