This is an automated email from the ASF dual-hosted git repository.

zhaocong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 35e9897742b [fix][broker] Release EntryBuffer after parse proto object (#20170)
35e9897742b is described below

commit 35e9897742b7db4bd29349940075a819b2ad6999
Author: Cong Zhao <[email protected]>
AuthorDate: Mon Apr 24 10:34:20 2023 +0800

    [fix][broker] Release EntryBuffer after parse proto object (#20170)
---
 .../bucket/BookkeeperBucketSnapshotStorage.java    | 30 ++++++++++++++++++----
 1 file changed, 25 insertions(+), 5 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/BookkeeperBucketSnapshotStorage.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/BookkeeperBucketSnapshotStorage.java
index 040bbbc586f..e99f39b382f 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/BookkeeperBucketSnapshotStorage.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/BookkeeperBucketSnapshotStorage.java
@@ -20,6 +20,7 @@ package org.apache.pulsar.broker.delayed.bucket;
 
 import com.google.protobuf.InvalidProtocolBufferException;
 import io.netty.buffer.ByteBuf;
+import io.netty.buffer.Unpooled;
 import java.util.ArrayList;
 import java.util.Enumeration;
 import java.util.List;
@@ -38,6 +39,7 @@ import org.apache.pulsar.broker.PulsarService;
 import org.apache.pulsar.broker.ServiceConfiguration;
 import org.apache.pulsar.broker.delayed.proto.SnapshotMetadata;
 import org.apache.pulsar.broker.delayed.proto.SnapshotSegment;
+import org.apache.pulsar.common.allocator.PulsarByteBufAllocator;
 import org.apache.pulsar.common.util.FutureUtil;
 
 @Slf4j
@@ -60,8 +62,9 @@ public class BookkeeperBucketSnapshotStorage implements BucketSnapshotStorage {
     public CompletableFuture<Long> createBucketSnapshot(SnapshotMetadata snapshotMetadata,
                                                         List<SnapshotSegment> bucketSnapshotSegments,
                                                         String bucketKey, String topicName, String cursorName) {
+        ByteBuf metadataByteBuf = Unpooled.wrappedBuffer(snapshotMetadata.toByteArray());
         return createLedger(bucketKey, topicName, cursorName)
-                .thenCompose(ledgerHandle -> addEntry(ledgerHandle, snapshotMetadata.toByteArray())
+                .thenCompose(ledgerHandle -> addEntry(ledgerHandle, metadataByteBuf)
                         .thenCompose(__ -> addSnapshotSegments(ledgerHandle, bucketSnapshotSegments))
                         .thenCompose(__ -> closeLedger(ledgerHandle))
                         .thenApply(__ -> ledgerHandle.getId()));
@@ -117,19 +120,32 @@ public class BookkeeperBucketSnapshotStorage implements BucketSnapshotStorage {
     private CompletableFuture<Void> addSnapshotSegments(LedgerHandle ledgerHandle,
                                                         List<SnapshotSegment> bucketSnapshotSegments) {
         List<CompletableFuture<Void>> addFutures = new ArrayList<>();
+        ByteBuf byteBuf;
         for (SnapshotSegment bucketSnapshotSegment : bucketSnapshotSegments) {
-            addFutures.add(addEntry(ledgerHandle, bucketSnapshotSegment.toByteArray()));
+            byteBuf = PulsarByteBufAllocator.DEFAULT.directBuffer(bucketSnapshotSegment.getSerializedSize());
+            try {
+                bucketSnapshotSegment.writeTo(byteBuf);
+            } catch (Exception e){
+                byteBuf.release();
+                throw e;
+            }
+            addFutures.add(addEntry(ledgerHandle, byteBuf));
         }
 
         return FutureUtil.waitForAll(addFutures);
     }
 
     private SnapshotMetadata parseSnapshotMetadataEntry(LedgerEntry ledgerEntry) {
+        ByteBuf entryBuffer = null;
         try {
-            ByteBuf entryBuffer = ledgerEntry.getEntryBuffer();
+            entryBuffer = ledgerEntry.getEntryBuffer();
             return SnapshotMetadata.parseFrom(entryBuffer.nioBuffer());
         } catch (InvalidProtocolBufferException e) {
             throw new BucketSnapshotSerializationException(e);
+        } finally {
+            if (entryBuffer != null) {
+                entryBuffer.release();
+            }
         }
     }
 
@@ -139,7 +155,11 @@ public class BookkeeperBucketSnapshotStorage implements BucketSnapshotStorage {
             LedgerEntry ledgerEntry = entryEnumeration.nextElement();
             SnapshotSegment snapshotSegment = new SnapshotSegment();
             ByteBuf entryBuffer = ledgerEntry.getEntryBuffer();
-            snapshotSegment.parseFrom(entryBuffer, entryBuffer.readableBytes());
+            try {
+                snapshotSegment.parseFrom(entryBuffer, entryBuffer.readableBytes());
+            } finally {
+                entryBuffer.release();
+            }
             snapshotMetadataList.add(snapshotSegment);
         }
         return snapshotMetadataList;
@@ -208,7 +228,7 @@ public class BookkeeperBucketSnapshotStorage implements BucketSnapshotStorage {
         return future;
     }
 
-    private CompletableFuture<Void> addEntry(LedgerHandle ledgerHandle, byte[] data) {
+    private CompletableFuture<Void> addEntry(LedgerHandle ledgerHandle, ByteBuf data) {
         final CompletableFuture<Void> future = new CompletableFuture<>();
         ledgerHandle.asyncAddEntry(data,
                 (rc, handle, entryId, ctx) -> {

Reply via email to