This is an automated email from the ASF dual-hosted git repository.
xyz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 215b36dcc73 [improve][ml] Do not switch thread to execute asyncAddEntry's core logic (#23940)
215b36dcc73 is described below
commit 215b36dcc73dad91f4c9ba9a90da50540e4899a7
Author: Yunze Xu <[email protected]>
AuthorDate: Mon Feb 10 11:19:33 2025 +0800
[improve][ml] Do not switch thread to execute asyncAddEntry's core logic (#23940)
---
.../bookkeeper/mledger/impl/ManagedLedgerImpl.java | 52 ++++++++++++---------
.../mledger/impl/ShadowManagedLedgerImpl.java | 16 +++----
.../ManagedLedgerInterceptorImplTest.java | 53 ++++++++++++++++++++++
3 files changed, 90 insertions(+), 31 deletions(-)
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
index 4f45fc67b63..f9a0ff26208 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
@@ -802,33 +802,41 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback {
buffer.retain();
// Jump to specific thread to avoid contention from writers writing
from different threads
- executor.execute(() -> {
- OpAddEntry addOperation = OpAddEntry.createNoRetainBuffer(this,
buffer, numberOfMessages, callback, ctx,
- currentLedgerTimeoutTriggered);
- internalAsyncAddEntry(addOperation);
- });
+ final var addOperation = OpAddEntry.createNoRetainBuffer(this, buffer,
numberOfMessages, callback, ctx,
+ currentLedgerTimeoutTriggered);
+ var added = false;
+ try {
+ // Use synchronized to ensure if `addOperation` is added to queue
and fails later, it will be the first
+ // element in `pendingAddEntries`.
+ synchronized (this) {
+ if (managedLedgerInterceptor != null) {
+ managedLedgerInterceptor.beforeAddEntry(addOperation,
addOperation.getNumberOfMessages());
+ }
+ final var state = STATE_UPDATER.get(this);
+ beforeAddEntryToQueue(state);
+ pendingAddEntries.add(addOperation);
+ added = true;
+ afterAddEntryToQueue(state, addOperation);
+ }
+ } catch (Throwable throwable) {
+ if (!added) {
+
addOperation.failed(ManagedLedgerException.getManagedLedgerException(throwable));
+ } // else: all elements of `pendingAddEntries` will fail in
another thread
+ }
}
- protected synchronized void internalAsyncAddEntry(OpAddEntry addOperation)
{
- if (!beforeAddEntry(addOperation)) {
- return;
- }
- final State state = STATE_UPDATER.get(this);
+ protected void beforeAddEntryToQueue(State state) throws
ManagedLedgerException {
if (state.isFenced()) {
- addOperation.failed(new ManagedLedgerFencedException());
- return;
- } else if (state == State.Terminated) {
- addOperation.failed(new ManagedLedgerTerminatedException("Managed
ledger was already terminated"));
- return;
- } else if (state == State.Closed) {
- addOperation.failed(new
ManagedLedgerAlreadyClosedException("Managed ledger was already closed"));
- return;
- } else if (state == State.WriteFailed) {
- addOperation.failed(new
ManagedLedgerAlreadyClosedException("Waiting to recover from failure"));
- return;
+ throw new ManagedLedgerFencedException();
}
- pendingAddEntries.add(addOperation);
+ switch (state) {
+ case Terminated -> throw new
ManagedLedgerTerminatedException("Managed ledger was already terminated");
+ case Closed -> throw new
ManagedLedgerAlreadyClosedException("Managed ledger was already closed");
+ case WriteFailed -> throw new
ManagedLedgerAlreadyClosedException("Waiting to recover from failure");
+ }
+ }
+ protected void afterAddEntryToQueue(State state, OpAddEntry addOperation)
throws ManagedLedgerException {
if (state == State.ClosingLedger || state == State.CreatingLedger) {
// We don't have a ready ledger to write into
// We are waiting for a new ledger to be created
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ShadowManagedLedgerImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ShadowManagedLedgerImpl.java
index 4b03cad8e0a..bae6cd66d28 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ShadowManagedLedgerImpl.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ShadowManagedLedgerImpl.java
@@ -223,25 +223,23 @@ public class ShadowManagedLedgerImpl extends ManagedLedgerImpl {
}
@Override
- protected synchronized void internalAsyncAddEntry(OpAddEntry addOperation)
{
- if (!beforeAddEntry(addOperation)) {
- return;
- }
+ protected void beforeAddEntryToQueue(State state) throws
ManagedLedgerException {
if (state != State.LedgerOpened) {
- addOperation.failed(new ManagedLedgerException("Managed ledger is
not opened"));
- return;
+ throw new ManagedLedgerException("Managed ledger is not opened");
}
+ }
+ @Override
+ protected void afterAddEntryToQueue(State state, OpAddEntry addOperation)
throws ManagedLedgerException {
if (addOperation.getCtx() == null || !(addOperation.getCtx()
instanceof Position position)) {
- addOperation.failed(new ManagedLedgerException("Illegal
addOperation context object."));
- return;
+ pendingAddEntries.poll();
+ throw new ManagedLedgerException("Illegal addOperation context
object.");
}
if (log.isDebugEnabled()) {
log.debug("[{}] Add entry into shadow ledger lh={} entries={},
pos=({},{})",
name, currentLedger.getId(), currentLedgerEntries,
position.getLedgerId(), position.getEntryId());
}
- pendingAddEntries.add(addOperation);
if (position.getLedgerId() <= currentLedger.getId()) {
// Write into lastLedger
if (position.getLedgerId() == currentLedger.getId()) {
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/intercept/ManagedLedgerInterceptorImplTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/intercept/ManagedLedgerInterceptorImplTest.java
index b57b5ce94be..8663019efb8 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/intercept/ManagedLedgerInterceptorImplTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/intercept/ManagedLedgerInterceptorImplTest.java
@@ -21,6 +21,7 @@ package org.apache.pulsar.broker.intercept;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertNotEquals;
import static org.testng.Assert.assertNotNull;
+import static org.testng.Assert.assertTrue;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
@@ -29,9 +30,12 @@ import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
+import java.util.Random;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Predicate;
import lombok.Cleanup;
@@ -499,4 +503,53 @@ public class ManagedLedgerInterceptorImplTest extends MockedBookKeeperTestCase
ledger.close();
}
+ @Test
+ public void testBeforeAddEntry() throws Exception {
+ final var interceptor = new
ManagedLedgerInterceptorImpl(getBrokerEntryMetadataInterceptors(), null);
+ final var config = new ManagedLedgerConfig();
+ final var numEntries = 100;
+ config.setMaxEntriesPerLedger(numEntries);
+ config.setManagedLedgerInterceptor(interceptor);
+ @Cleanup final var ml = (ManagedLedgerImpl)
factory.open("test_concurrent_add_entry", config);
+
+ final var indexesBeforeAdd = new ArrayList<Long>();
+ final var batchSizes = new ArrayList<Long>();
+ final var random = new Random();
+ final var latch = new CountDownLatch(numEntries);
+ final var executor = Executors.newFixedThreadPool(3);
+ final var lock = new Object(); // make sure `asyncAddEntry` are called
in order
+ for (int i = 0; i < numEntries; i++) {
+ final var batchSize = random.nextInt(0, 100);
+ final var msg = "msg-" + i;
+ final var callback = new AsyncCallbacks.AddEntryCallback() {
+
+ @Override
+ public void addComplete(Position position, ByteBuf entryData,
Object ctx) {
+ latch.countDown();
+ }
+
+ @Override
+ public void addFailed(ManagedLedgerException exception, Object
ctx) {
+ log.error("Failed to add {}", msg, exception);
+ latch.countDown();
+ }
+ };
+ executor.execute(() -> {
+ synchronized (lock) {
+ batchSizes.add((long) batchSize);
+ indexesBeforeAdd.add(interceptor.getIndex() + 1); // index
is updated in each asyncAddEntry call
+ ml.asyncAddEntry(Unpooled.wrappedBuffer(msg.getBytes()),
batchSize, callback, null);
+ }
+ });
+ }
+ assertTrue(latch.await(3, TimeUnit.SECONDS));
+ synchronized (lock) {
+ for (int i = 1; i < numEntries; i++) {
+ final var sum = batchSizes.get(i) + batchSizes.get(i - 1);
+ batchSizes.set(i, sum);
+ }
+ assertEquals(indexesBeforeAdd.subList(1, numEntries),
batchSizes.subList(0, numEntries - 1));
+ }
+ executor.shutdown();
+ }
}