This is an automated email from the ASF dual-hosted git repository. xyz pushed a commit to branch branch-4.0 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit a316c52b857d178c792dbea815c6621e5b6f2d7e Author: Yunze Xu <[email protected]> AuthorDate: Mon Feb 10 11:19:33 2025 +0800 [improve][ml] Do not switch thread to execute asyncAddEntry's core logic (#23940) (cherry picked from commit 215b36dcc73dad91f4c9ba9a90da50540e4899a7) --- .../bookkeeper/mledger/impl/ManagedLedgerImpl.java | 52 ++++++++++++--------- .../mledger/impl/ShadowManagedLedgerImpl.java | 16 +++---- .../ManagedLedgerInterceptorImplTest.java | 53 ++++++++++++++++++++++ 3 files changed, 90 insertions(+), 31 deletions(-) diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java index 4f45fc67b63..f9a0ff26208 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java @@ -802,33 +802,41 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback { buffer.retain(); // Jump to specific thread to avoid contention from writers writing from different threads - executor.execute(() -> { - OpAddEntry addOperation = OpAddEntry.createNoRetainBuffer(this, buffer, numberOfMessages, callback, ctx, - currentLedgerTimeoutTriggered); - internalAsyncAddEntry(addOperation); - }); + final var addOperation = OpAddEntry.createNoRetainBuffer(this, buffer, numberOfMessages, callback, ctx, + currentLedgerTimeoutTriggered); + var added = false; + try { + // Use synchronized to ensure if `addOperation` is added to queue and fails later, it will be the first + // element in `pendingAddEntries`. + synchronized (this) { + if (managedLedgerInterceptor != null) { + managedLedgerInterceptor.beforeAddEntry(addOperation, addOperation.getNumberOfMessages()); + } + final var state = STATE_UPDATER.get(this); + beforeAddEntryToQueue(state); + pendingAddEntries.add(addOperation); + added = true; + afterAddEntryToQueue(state, addOperation); + } + } catch (Throwable throwable) { + if (!added) { + addOperation.failed(ManagedLedgerException.getManagedLedgerException(throwable)); + } // else: all elements of `pendingAddEntries` will fail in another thread + } } - protected synchronized void internalAsyncAddEntry(OpAddEntry addOperation) { - if (!beforeAddEntry(addOperation)) { - return; - } - final State state = STATE_UPDATER.get(this); + protected void beforeAddEntryToQueue(State state) throws ManagedLedgerException { if (state.isFenced()) { - addOperation.failed(new ManagedLedgerFencedException()); - return; - } else if (state == State.Terminated) { - addOperation.failed(new ManagedLedgerTerminatedException("Managed ledger was already terminated")); - return; - } else if (state == State.Closed) { - addOperation.failed(new ManagedLedgerAlreadyClosedException("Managed ledger was already closed")); - return; - } else if (state == State.WriteFailed) { - addOperation.failed(new ManagedLedgerAlreadyClosedException("Waiting to recover from failure")); - return; + throw new ManagedLedgerFencedException(); } - pendingAddEntries.add(addOperation); + switch (state) { + case Terminated -> throw new ManagedLedgerTerminatedException("Managed ledger was already terminated"); + case Closed -> throw new ManagedLedgerAlreadyClosedException("Managed ledger was already closed"); + case WriteFailed -> throw new ManagedLedgerAlreadyClosedException("Waiting to recover from failure"); + } + } + protected void afterAddEntryToQueue(State state, OpAddEntry addOperation) throws ManagedLedgerException { if (state == State.ClosingLedger || state == State.CreatingLedger) { // We don't have a ready ledger to write into // We are waiting for a new ledger to be created diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ShadowManagedLedgerImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ShadowManagedLedgerImpl.java index 4b03cad8e0a..bae6cd66d28 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ShadowManagedLedgerImpl.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ShadowManagedLedgerImpl.java @@ -223,25 +223,23 @@ public class ShadowManagedLedgerImpl extends ManagedLedgerImpl { } @Override - protected synchronized void internalAsyncAddEntry(OpAddEntry addOperation) { - if (!beforeAddEntry(addOperation)) { - return; - } + protected void beforeAddEntryToQueue(State state) throws ManagedLedgerException { if (state != State.LedgerOpened) { - addOperation.failed(new ManagedLedgerException("Managed ledger is not opened")); - return; + throw new ManagedLedgerException("Managed ledger is not opened"); } + } + @Override + protected void afterAddEntryToQueue(State state, OpAddEntry addOperation) throws ManagedLedgerException { if (addOperation.getCtx() == null || !(addOperation.getCtx() instanceof Position position)) { - addOperation.failed(new ManagedLedgerException("Illegal addOperation context object.")); - return; + pendingAddEntries.poll(); + throw new ManagedLedgerException("Illegal addOperation context object."); } if (log.isDebugEnabled()) { log.debug("[{}] Add entry into shadow ledger lh={} entries={}, pos=({},{})", name, currentLedger.getId(), currentLedgerEntries, position.getLedgerId(), position.getEntryId()); } - pendingAddEntries.add(addOperation); if (position.getLedgerId() <= currentLedger.getId()) { // Write into lastLedger if (position.getLedgerId() == currentLedger.getId()) { diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/intercept/ManagedLedgerInterceptorImplTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/intercept/ManagedLedgerInterceptorImplTest.java index b57b5ce94be..8663019efb8 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/intercept/ManagedLedgerInterceptorImplTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/intercept/ManagedLedgerInterceptorImplTest.java @@ -21,6 +21,7 @@ package org.apache.pulsar.broker.intercept; import static org.testng.Assert.assertEquals; import static org.testng.Assert.assertNotEquals; import static org.testng.Assert.assertNotNull; +import static org.testng.Assert.assertTrue; import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; @@ -29,9 +30,12 @@ import java.util.ArrayList; import java.util.Collections; import java.util.HashSet; import java.util.List; +import java.util.Random; import java.util.Set; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CountDownLatch; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import java.util.function.Predicate; import lombok.Cleanup; @@ -499,4 +503,53 @@ public class ManagedLedgerInterceptorImplTest extends MockedBookKeeperTestCase ledger.close(); } + @Test + public void testBeforeAddEntry() throws Exception { + final var interceptor = new ManagedLedgerInterceptorImpl(getBrokerEntryMetadataInterceptors(), null); + final var config = new ManagedLedgerConfig(); + final var numEntries = 100; + config.setMaxEntriesPerLedger(numEntries); + config.setManagedLedgerInterceptor(interceptor); + @Cleanup final var ml = (ManagedLedgerImpl) factory.open("test_concurrent_add_entry", config); + + final var indexesBeforeAdd = new ArrayList<Long>(); + final var batchSizes = new ArrayList<Long>(); + final var random = new Random(); + final var latch = new CountDownLatch(numEntries); + final var executor = Executors.newFixedThreadPool(3); + final var lock = new Object(); // make sure `asyncAddEntry` are called in order + for (int i = 0; i < numEntries; i++) { + final var batchSize = random.nextInt(0, 100); + final var msg = "msg-" + i; + final var callback = new AsyncCallbacks.AddEntryCallback() { + + @Override + public void addComplete(Position position, ByteBuf entryData, Object ctx) { + latch.countDown(); + } + + @Override + public void addFailed(ManagedLedgerException exception, Object ctx) { + log.error("Failed to add {}", msg, exception); + latch.countDown(); + } + }; + executor.execute(() -> { + synchronized (lock) { + batchSizes.add((long) batchSize); + indexesBeforeAdd.add(interceptor.getIndex() + 1); // index is updated in each asyncAddEntry call + ml.asyncAddEntry(Unpooled.wrappedBuffer(msg.getBytes()), batchSize, callback, null); + } + }); + } + assertTrue(latch.await(3, TimeUnit.SECONDS)); + synchronized (lock) { + for (int i = 1; i < numEntries; i++) { + final var sum = batchSizes.get(i) + batchSizes.get(i - 1); + batchSizes.set(i, sum); + } + assertEquals(indexesBeforeAdd.subList(1, numEntries), batchSizes.subList(0, numEntries - 1)); + } + executor.shutdown(); + } }
