This is an automated email from the ASF dual-hosted git repository.
bogong pushed a commit to branch branch-2.9
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-2.9 by this push:
new e6364bc41f0 [fix][storage]fix OpAddEntry release error when exception
in ManagedLedgerInterceptor (#17929)
e6364bc41f0 is described below
commit e6364bc41f0588586b62af4ed433e39b5c25d7ec
Author: Qiang Huang <[email protected]>
AuthorDate: Mon Nov 14 11:00:07 2022 +0800
[fix][storage]fix OpAddEntry release error when exception in
ManagedLedgerInterceptor (#17929)
### Motivation
This PR is fully based on https://github.com/apache/pulsar/pull/17394.
### Modifications
- Remove duplicate release in `ManagedLedgerInterceptor`
---
.../bookkeeper/mledger/impl/ManagedLedgerImpl.java | 2 -
.../intercept/MangedLedgerInterceptorImplTest.java | 65 ++++++++++++++++++++++
2 files changed, 65 insertions(+), 2 deletions(-)
diff --git
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
index 6280888174c..e1555fa1ae5 100644
---
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
+++
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
@@ -34,7 +34,6 @@ import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.util.Recycler;
import io.netty.util.Recycler.Handle;
-import io.netty.util.ReferenceCountUtil;
import java.time.Clock;
import java.util.Collections;
import java.util.HashMap;
@@ -808,7 +807,6 @@ public class ManagedLedgerImpl implements ManagedLedger,
CreateCallback {
} catch (Exception e) {
addOperation.failed(
new ManagedLedgerInterceptException("Interceptor managed
ledger before add to bookie failed."));
- ReferenceCountUtil.release(addOperation.data);
log.error("[{}] Failed to intercept adding an entry to bookie.",
name, e);
return false;
}
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/intercept/MangedLedgerInterceptorImplTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/intercept/MangedLedgerInterceptorImplTest.java
index 75dc8917193..f75ea9d4ad1 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/intercept/MangedLedgerInterceptorImplTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/intercept/MangedLedgerInterceptorImplTest.java
@@ -18,13 +18,20 @@
*/
package org.apache.pulsar.broker.intercept;
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.Unpooled;
+import java.util.concurrent.CountDownLatch;
import lombok.Cleanup;
+import org.apache.bookkeeper.mledger.AsyncCallbacks;
import org.apache.bookkeeper.mledger.Entry;
import org.apache.bookkeeper.mledger.ManagedCursor;
import org.apache.bookkeeper.mledger.ManagedLedger;
import org.apache.bookkeeper.mledger.ManagedLedgerConfig;
+import org.apache.bookkeeper.mledger.ManagedLedgerException;
+import org.apache.bookkeeper.mledger.Position;
import org.apache.bookkeeper.mledger.impl.ManagedCursorImpl;
import org.apache.bookkeeper.mledger.impl.ManagedLedgerFactoryImpl;
+import org.apache.bookkeeper.mledger.impl.OpAddEntry;
import org.apache.bookkeeper.mledger.impl.PositionImpl;
import org.apache.bookkeeper.mledger.intercept.ManagedLedgerInterceptor;
import org.apache.bookkeeper.test.MockedBookKeeperTestCase;
@@ -187,6 +194,64 @@ public class MangedLedgerInterceptorImplTest extends
MockedBookKeeperTestCase {
ledger.close();
}
+ @Test
+ public void testBeforeAddEntryWithException() throws Exception {
+ final int MOCK_BATCH_SIZE = 2;
+ final String ledgerAndCursorName = "testBeforeAddEntryWithException";
+
+ ManagedLedgerInterceptor interceptor =
+ new
MockManagedLedgerInterceptorImpl(getBrokerEntryMetadataInterceptors());
+
+ ManagedLedgerConfig config = new ManagedLedgerConfig();
+ config.setMaxEntriesPerLedger(2);
+ config.setManagedLedgerInterceptor(interceptor);
+
+ ByteBuf buffer = Unpooled.wrappedBuffer("message".getBytes());
+ ManagedLedger ledger = factory.open(ledgerAndCursorName, config);
+ CountDownLatch countDownLatch = new CountDownLatch(1);
+ try {
+ ledger.asyncAddEntry(buffer, MOCK_BATCH_SIZE, new
AsyncCallbacks.AddEntryCallback() {
+ @Override
+ public void addComplete(Position position, ByteBuf entryData,
Object ctx) {
+ countDownLatch.countDown();
+ }
+
+ @Override
+ public void addFailed(ManagedLedgerException exception, Object
ctx) {
+ countDownLatch.countDown();
+ }
+ }, null);
+ countDownLatch.await();
+ assertEquals(buffer.refCnt(), 1);
+ } finally {
+ ledger.close();
+ factory.shutdown();
+ }
+ }
+
+ private class MockManagedLedgerInterceptorImpl extends
ManagedLedgerInterceptorImpl {
+ private final Set<BrokerEntryMetadataInterceptor>
brokerEntryMetadataInterceptors;
+
+ public MockManagedLedgerInterceptorImpl(
+ Set<BrokerEntryMetadataInterceptor>
brokerEntryMetadataInterceptors) {
+ super(brokerEntryMetadataInterceptors);
+ this.brokerEntryMetadataInterceptors =
brokerEntryMetadataInterceptors;
+ }
+
+ @Override
+ public OpAddEntry beforeAddEntry(OpAddEntry op, int numberOfMessages) {
+ if (op == null || numberOfMessages <= 0) {
+ return op;
+ }
+ op.setData(Commands.addBrokerEntryMetadata(op.getData(),
brokerEntryMetadataInterceptors,
+ numberOfMessages));
+ if (op != null) {
+ throw new RuntimeException("throw exception before add entry
for test");
+ }
+ return op;
+ }
+ }
+
public static Set<BrokerEntryMetadataInterceptor>
getBrokerEntryMetadataInterceptors() {
Set<String> interceptorNames = new HashSet<>();
interceptorNames.add("org.apache.pulsar.common.intercept.AppendBrokerTimestampMetadataInterceptor");