This is an automated email from the ASF dual-hosted git repository.

jianghaiting pushed a commit to branch branch-2.10
in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit d17079a0134acf4f3feaebd207ac50d2a75f79df
Author: Qiang Huang <[email protected]>
AuthorDate: Thu Sep 8 11:08:49 2022 +0800

    [fix][storage]fix OpAddEntry release error when exception in ManagedLedgerInterceptor (#17394)

    (cherry picked from commit c6a1875db2f7a9e5cab187dc98e90487cf08a805)
---
 .../bookkeeper/mledger/impl/ManagedLedgerImpl.java |  2 -
 .../intercept/MangedLedgerInterceptorImplTest.java | 64 ++++++++++++++++++++++
 2 files changed, 64 insertions(+), 2 deletions(-)

diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
index 6d3a628a367..438539bfdd5 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
@@ -34,7 +34,6 @@ import io.netty.buffer.ByteBuf;
 import io.netty.buffer.Unpooled;
 import io.netty.util.Recycler;
 import io.netty.util.Recycler.Handle;
-import io.netty.util.ReferenceCountUtil;
 import java.io.IOException;
 import java.time.Clock;
 import java.util.ArrayList;
@@ -825,7 +824,6 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback {
         } catch (Exception e) {
             addOperation.failed(
                     new ManagedLedgerInterceptException("Interceptor managed ledger before add to bookie failed."));
-            ReferenceCountUtil.release(addOperation.data);
             log.error("[{}] Failed to intercept adding an entry to bookie.", name, e);
             return false;
         }
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/intercept/MangedLedgerInterceptorImplTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/intercept/MangedLedgerInterceptorImplTest.java
index 60067a7016c..830e1894832 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/intercept/MangedLedgerInterceptorImplTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/intercept/MangedLedgerInterceptorImplTest.java
@@ -20,13 +20,18 @@ package org.apache.pulsar.broker.intercept;

 import io.netty.buffer.ByteBuf;
 import io.netty.buffer.Unpooled;
+import java.util.concurrent.CountDownLatch;
 import lombok.Cleanup;
+import org.apache.bookkeeper.mledger.AsyncCallbacks;
 import org.apache.bookkeeper.mledger.Entry;
 import org.apache.bookkeeper.mledger.ManagedCursor;
 import org.apache.bookkeeper.mledger.ManagedLedger;
 import org.apache.bookkeeper.mledger.ManagedLedgerConfig;
+import org.apache.bookkeeper.mledger.ManagedLedgerException;
+import org.apache.bookkeeper.mledger.Position;
 import org.apache.bookkeeper.mledger.impl.ManagedCursorImpl;
 import org.apache.bookkeeper.mledger.impl.ManagedLedgerFactoryImpl;
+import org.apache.bookkeeper.mledger.impl.OpAddEntry;
 import org.apache.bookkeeper.mledger.impl.PositionImpl;
 import org.apache.bookkeeper.mledger.intercept.ManagedLedgerInterceptor;
 import org.apache.bookkeeper.test.MockedBookKeeperTestCase;
@@ -256,6 +261,65 @@ public class MangedLedgerInterceptorImplTest extends MockedBookKeeperTestCase {
         ledger.close();
     }

+    @Test
+    public void testBeforeAddEntryWithException() throws Exception {
+        final int MOCK_BATCH_SIZE = 2;
+        final String ledgerAndCursorName = "testBeforeAddEntryWithException";
+
+        ManagedLedgerInterceptor interceptor =
+                new MockManagedLedgerInterceptorImpl(getBrokerEntryMetadataInterceptors(), null);
+
+        ManagedLedgerConfig config = new ManagedLedgerConfig();
+        config.setMaxEntriesPerLedger(2);
+        config.setManagedLedgerInterceptor(interceptor);
+
+        ByteBuf buffer = Unpooled.wrappedBuffer("message".getBytes());
+        ManagedLedger ledger = factory.open(ledgerAndCursorName, config);
+        CountDownLatch countDownLatch = new CountDownLatch(1);
+        try {
+            ledger.asyncAddEntry(buffer, MOCK_BATCH_SIZE, new AsyncCallbacks.AddEntryCallback() {
+                @Override
+                public void addComplete(Position position, ByteBuf entryData, Object ctx) {
+                    countDownLatch.countDown();
+                }
+
+                @Override
+                public void addFailed(ManagedLedgerException exception, Object ctx) {
+                    countDownLatch.countDown();
+                }
+            }, null);
+            countDownLatch.await();
+            assertEquals(buffer.refCnt(), 1);
+        } finally {
+            ledger.close();
+            factory.shutdown();
+        }
+    }
+
+    private class MockManagedLedgerInterceptorImpl extends ManagedLedgerInterceptorImpl {
+        private final Set<BrokerEntryMetadataInterceptor> brokerEntryMetadataInterceptors;
+
+        public MockManagedLedgerInterceptorImpl(
+                Set<BrokerEntryMetadataInterceptor> brokerEntryMetadataInterceptors,
+                Set<ManagedLedgerPayloadProcessor> brokerEntryPayloadProcessors) {
+            super(brokerEntryMetadataInterceptors, brokerEntryPayloadProcessors);
+            this.brokerEntryMetadataInterceptors = brokerEntryMetadataInterceptors;
+        }
+
+        @Override
+        public OpAddEntry beforeAddEntry(OpAddEntry op, int numberOfMessages) {
+            if (op == null || numberOfMessages <= 0) {
+                return op;
+            }
+            op.setData(Commands.addBrokerEntryMetadata(op.getData(), brokerEntryMetadataInterceptors,
+                    numberOfMessages));
+            if (op != null) {
+                throw new RuntimeException("throw exception before add entry for test");
+            }
+            return op;
+        }
+    }
+
     public static Set<BrokerEntryMetadataInterceptor> getBrokerEntryMetadataInterceptors() {
         Set<String> interceptorNames = new HashSet<>();
         interceptorNames.add("org.apache.pulsar.common.intercept.AppendBrokerTimestampMetadataInterceptor");
