This is an automated email from the ASF dual-hosted git repository.

xyz pushed a commit to branch branch-4.0
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit a316c52b857d178c792dbea815c6621e5b6f2d7e
Author: Yunze Xu <[email protected]>
AuthorDate: Mon Feb 10 11:19:33 2025 +0800

    [improve][ml] Do not switch thread to execute asyncAddEntry's core logic 
(#23940)
    
    (cherry picked from commit 215b36dcc73dad91f4c9ba9a90da50540e4899a7)
---
 .../bookkeeper/mledger/impl/ManagedLedgerImpl.java | 52 ++++++++++++---------
 .../mledger/impl/ShadowManagedLedgerImpl.java      | 16 +++----
 .../ManagedLedgerInterceptorImplTest.java          | 53 ++++++++++++++++++++++
 3 files changed, 90 insertions(+), 31 deletions(-)

diff --git 
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
 
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
index 4f45fc67b63..f9a0ff26208 100644
--- 
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
+++ 
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
@@ -802,33 +802,41 @@ public class ManagedLedgerImpl implements ManagedLedger, 
CreateCallback {
         buffer.retain();
 
         // Jump to specific thread to avoid contention from writers writing 
from different threads
-        executor.execute(() -> {
-            OpAddEntry addOperation = OpAddEntry.createNoRetainBuffer(this, 
buffer, numberOfMessages, callback, ctx,
-                    currentLedgerTimeoutTriggered);
-            internalAsyncAddEntry(addOperation);
-        });
+        final var addOperation = OpAddEntry.createNoRetainBuffer(this, buffer, 
numberOfMessages, callback, ctx,
+                currentLedgerTimeoutTriggered);
+        var added = false;
+        try {
+            // Use synchronized to ensure if `addOperation` is added to queue 
and fails later, it will be the first
+            // element in `pendingAddEntries`.
+            synchronized (this) {
+                if (managedLedgerInterceptor != null) {
+                    managedLedgerInterceptor.beforeAddEntry(addOperation, 
addOperation.getNumberOfMessages());
+                }
+                final var state = STATE_UPDATER.get(this);
+                beforeAddEntryToQueue(state);
+                pendingAddEntries.add(addOperation);
+                added = true;
+                afterAddEntryToQueue(state, addOperation);
+            }
+        } catch (Throwable throwable) {
+            if (!added) {
+                
addOperation.failed(ManagedLedgerException.getManagedLedgerException(throwable));
+            } // else: all elements of `pendingAddEntries` will fail in 
another thread
+        }
     }
 
-    protected synchronized void internalAsyncAddEntry(OpAddEntry addOperation) 
{
-        if (!beforeAddEntry(addOperation)) {
-            return;
-        }
-        final State state = STATE_UPDATER.get(this);
+    protected void beforeAddEntryToQueue(State state) throws 
ManagedLedgerException {
         if (state.isFenced()) {
-            addOperation.failed(new ManagedLedgerFencedException());
-            return;
-        } else if (state == State.Terminated) {
-            addOperation.failed(new ManagedLedgerTerminatedException("Managed 
ledger was already terminated"));
-            return;
-        } else if (state == State.Closed) {
-            addOperation.failed(new 
ManagedLedgerAlreadyClosedException("Managed ledger was already closed"));
-            return;
-        } else if (state == State.WriteFailed) {
-            addOperation.failed(new 
ManagedLedgerAlreadyClosedException("Waiting to recover from failure"));
-            return;
+            throw new ManagedLedgerFencedException();
         }
-        pendingAddEntries.add(addOperation);
+        switch (state) {
+            case Terminated -> throw new 
ManagedLedgerTerminatedException("Managed ledger was already terminated");
+            case Closed -> throw new 
ManagedLedgerAlreadyClosedException("Managed ledger was already closed");
+            case WriteFailed -> throw new 
ManagedLedgerAlreadyClosedException("Waiting to recover from failure");
+        }
+    }
 
+    protected void afterAddEntryToQueue(State state, OpAddEntry addOperation) 
throws ManagedLedgerException {
         if (state == State.ClosingLedger || state == State.CreatingLedger) {
             // We don't have a ready ledger to write into
             // We are waiting for a new ledger to be created
diff --git 
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ShadowManagedLedgerImpl.java
 
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ShadowManagedLedgerImpl.java
index 4b03cad8e0a..bae6cd66d28 100644
--- 
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ShadowManagedLedgerImpl.java
+++ 
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ShadowManagedLedgerImpl.java
@@ -223,25 +223,23 @@ public class ShadowManagedLedgerImpl extends 
ManagedLedgerImpl {
     }
 
     @Override
-    protected synchronized void internalAsyncAddEntry(OpAddEntry addOperation) 
{
-        if (!beforeAddEntry(addOperation)) {
-            return;
-        }
+    protected void beforeAddEntryToQueue(State state) throws 
ManagedLedgerException {
         if (state != State.LedgerOpened) {
-            addOperation.failed(new ManagedLedgerException("Managed ledger is 
not opened"));
-            return;
+            throw new ManagedLedgerException("Managed ledger is not opened");
         }
+    }
 
+    @Override
+    protected void afterAddEntryToQueue(State state, OpAddEntry addOperation) 
throws ManagedLedgerException {
         if (addOperation.getCtx() == null || !(addOperation.getCtx() 
instanceof Position position)) {
-            addOperation.failed(new ManagedLedgerException("Illegal 
addOperation context object."));
-            return;
+            pendingAddEntries.poll();
+            throw new ManagedLedgerException("Illegal addOperation context 
object.");
         }
 
         if (log.isDebugEnabled()) {
             log.debug("[{}] Add entry into shadow ledger lh={} entries={}, 
pos=({},{})",
                     name, currentLedger.getId(), currentLedgerEntries, 
position.getLedgerId(), position.getEntryId());
         }
-        pendingAddEntries.add(addOperation);
         if (position.getLedgerId() <= currentLedger.getId()) {
             // Write into lastLedger
             if (position.getLedgerId() == currentLedger.getId()) {
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/intercept/ManagedLedgerInterceptorImplTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/intercept/ManagedLedgerInterceptorImplTest.java
index b57b5ce94be..8663019efb8 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/intercept/ManagedLedgerInterceptorImplTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/intercept/ManagedLedgerInterceptorImplTest.java
@@ -21,6 +21,7 @@ package org.apache.pulsar.broker.intercept;
 import static org.testng.Assert.assertEquals;
 import static org.testng.Assert.assertNotEquals;
 import static org.testng.Assert.assertNotNull;
+import static org.testng.Assert.assertTrue;
 
 import io.netty.buffer.ByteBuf;
 import io.netty.buffer.Unpooled;
@@ -29,9 +30,12 @@ import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashSet;
 import java.util.List;
+import java.util.Random;
 import java.util.Set;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.function.Predicate;
 import lombok.Cleanup;
@@ -499,4 +503,53 @@ public class ManagedLedgerInterceptorImplTest  extends 
MockedBookKeeperTestCase
         ledger.close();
     }
 
+    @Test
+    public void testBeforeAddEntry() throws Exception {
+        final var interceptor = new 
ManagedLedgerInterceptorImpl(getBrokerEntryMetadataInterceptors(), null);
+        final var config = new ManagedLedgerConfig();
+        final var numEntries = 100;
+        config.setMaxEntriesPerLedger(numEntries);
+        config.setManagedLedgerInterceptor(interceptor);
+        @Cleanup final var ml = (ManagedLedgerImpl) 
factory.open("test_concurrent_add_entry", config);
+
+        final var indexesBeforeAdd = new ArrayList<Long>();
+        final var batchSizes = new ArrayList<Long>();
+        final var random = new Random();
+        final var latch = new CountDownLatch(numEntries);
+        final var executor = Executors.newFixedThreadPool(3);
+        final var lock = new Object(); // make sure `asyncAddEntry` are called 
in order
+        for (int i = 0; i < numEntries; i++) {
+            final var batchSize = random.nextInt(0, 100);
+            final var msg = "msg-" + i;
+            final var callback = new AsyncCallbacks.AddEntryCallback() {
+
+                @Override
+                public void addComplete(Position position, ByteBuf entryData, 
Object ctx) {
+                    latch.countDown();
+                }
+
+                @Override
+                public void addFailed(ManagedLedgerException exception, Object 
ctx) {
+                    log.error("Failed to add {}", msg, exception);
+                    latch.countDown();
+                }
+            };
+            executor.execute(() -> {
+                synchronized (lock) {
+                    batchSizes.add((long) batchSize);
+                    indexesBeforeAdd.add(interceptor.getIndex() + 1); // index 
is updated in each asyncAddEntry call
+                    ml.asyncAddEntry(Unpooled.wrappedBuffer(msg.getBytes()), 
batchSize, callback, null);
+                }
+            });
+        }
+        assertTrue(latch.await(3, TimeUnit.SECONDS));
+        synchronized (lock) {
+            for (int i = 1; i < numEntries; i++) {
+                final var sum = batchSizes.get(i) + batchSizes.get(i - 1);
+                batchSizes.set(i, sum);
+            }
+            assertEquals(indexesBeforeAdd.subList(1, numEntries), 
batchSizes.subList(0, numEntries - 1));
+        }
+        executor.shutdown();
+    }
 }

Reply via email to