This is an automated email from the ASF dual-hosted git repository.
zhaocong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 35e9897742b [fix][broker] Release EntryBuffer after parse proto object
(#20170)
35e9897742b is described below
commit 35e9897742b7db4bd29349940075a819b2ad6999
Author: Cong Zhao <[email protected]>
AuthorDate: Mon Apr 24 10:34:20 2023 +0800
[fix][broker] Release EntryBuffer after parse proto object (#20170)
---
.../bucket/BookkeeperBucketSnapshotStorage.java | 30 ++++++++++++++++++----
1 file changed, 25 insertions(+), 5 deletions(-)
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/BookkeeperBucketSnapshotStorage.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/BookkeeperBucketSnapshotStorage.java
index 040bbbc586f..e99f39b382f 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/BookkeeperBucketSnapshotStorage.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/BookkeeperBucketSnapshotStorage.java
@@ -20,6 +20,7 @@ package org.apache.pulsar.broker.delayed.bucket;
import com.google.protobuf.InvalidProtocolBufferException;
import io.netty.buffer.ByteBuf;
+import io.netty.buffer.Unpooled;
import java.util.ArrayList;
import java.util.Enumeration;
import java.util.List;
@@ -38,6 +39,7 @@ import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.delayed.proto.SnapshotMetadata;
import org.apache.pulsar.broker.delayed.proto.SnapshotSegment;
+import org.apache.pulsar.common.allocator.PulsarByteBufAllocator;
import org.apache.pulsar.common.util.FutureUtil;
@Slf4j
@@ -60,8 +62,9 @@ public class BookkeeperBucketSnapshotStorage implements
BucketSnapshotStorage {
public CompletableFuture<Long> createBucketSnapshot(SnapshotMetadata
snapshotMetadata,
List<SnapshotSegment>
bucketSnapshotSegments,
String bucketKey,
String topicName, String cursorName) {
+ ByteBuf metadataByteBuf =
Unpooled.wrappedBuffer(snapshotMetadata.toByteArray());
return createLedger(bucketKey, topicName, cursorName)
- .thenCompose(ledgerHandle -> addEntry(ledgerHandle,
snapshotMetadata.toByteArray())
+ .thenCompose(ledgerHandle -> addEntry(ledgerHandle,
metadataByteBuf)
.thenCompose(__ -> addSnapshotSegments(ledgerHandle,
bucketSnapshotSegments))
.thenCompose(__ -> closeLedger(ledgerHandle))
.thenApply(__ -> ledgerHandle.getId()));
@@ -117,19 +120,32 @@ public class BookkeeperBucketSnapshotStorage implements
BucketSnapshotStorage {
private CompletableFuture<Void> addSnapshotSegments(LedgerHandle
ledgerHandle,
List<SnapshotSegment>
bucketSnapshotSegments) {
List<CompletableFuture<Void>> addFutures = new ArrayList<>();
+ ByteBuf byteBuf;
for (SnapshotSegment bucketSnapshotSegment : bucketSnapshotSegments) {
- addFutures.add(addEntry(ledgerHandle,
bucketSnapshotSegment.toByteArray()));
+ byteBuf =
PulsarByteBufAllocator.DEFAULT.directBuffer(bucketSnapshotSegment.getSerializedSize());
+ try {
+ bucketSnapshotSegment.writeTo(byteBuf);
+ } catch (Exception e){
+ byteBuf.release();
+ throw e;
+ }
+ addFutures.add(addEntry(ledgerHandle, byteBuf));
}
return FutureUtil.waitForAll(addFutures);
}
private SnapshotMetadata parseSnapshotMetadataEntry(LedgerEntry
ledgerEntry) {
+ ByteBuf entryBuffer = null;
try {
- ByteBuf entryBuffer = ledgerEntry.getEntryBuffer();
+ entryBuffer = ledgerEntry.getEntryBuffer();
return SnapshotMetadata.parseFrom(entryBuffer.nioBuffer());
} catch (InvalidProtocolBufferException e) {
throw new BucketSnapshotSerializationException(e);
+ } finally {
+ if (entryBuffer != null) {
+ entryBuffer.release();
+ }
}
}
@@ -139,7 +155,11 @@ public class BookkeeperBucketSnapshotStorage implements
BucketSnapshotStorage {
LedgerEntry ledgerEntry = entryEnumeration.nextElement();
SnapshotSegment snapshotSegment = new SnapshotSegment();
ByteBuf entryBuffer = ledgerEntry.getEntryBuffer();
- snapshotSegment.parseFrom(entryBuffer,
entryBuffer.readableBytes());
+ try {
+ snapshotSegment.parseFrom(entryBuffer,
entryBuffer.readableBytes());
+ } finally {
+ entryBuffer.release();
+ }
snapshotMetadataList.add(snapshotSegment);
}
return snapshotMetadataList;
@@ -208,7 +228,7 @@ public class BookkeeperBucketSnapshotStorage implements
BucketSnapshotStorage {
return future;
}
- private CompletableFuture<Void> addEntry(LedgerHandle ledgerHandle, byte[]
data) {
+ private CompletableFuture<Void> addEntry(LedgerHandle ledgerHandle,
ByteBuf data) {
final CompletableFuture<Void> future = new CompletableFuture<>();
ledgerHandle.asyncAddEntry(data,
(rc, handle, entryId, ctx) -> {