This is an automated email from the ASF dual-hosted git repository.

zhaocong pushed a commit to branch branch-3.3
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-3.3 by this push:
     new 07a3767df36 [fix] [ml] Add entry fail due to race condition about add 
entry failed/timeout and switch ledger (#22221)
07a3767df36 is described below

commit 07a3767df36dd86717887075c3bf76ba0fbd2081
Author: fengyubiao <yubiao.f...@streamnative.io>
AuthorDate: Tue May 21 15:50:53 2024 +0800

    [fix] [ml] Add entry fail due to race condition about add entry 
failed/timeout and switch ledger (#22221)
    
    (cherry picked from commit ae9616b7ee41e77dde590045b8e1d5bd04192ffc)
---
 .../bookkeeper/mledger/impl/ManagedLedgerImpl.java | 30 ++++++++----
 .../apache/bookkeeper/mledger/impl/OpAddEntry.java | 39 +++++++++-------
 .../mledger/impl/ShadowManagedLedgerImpl.java      |  3 ++
 .../bookkeeper/mledger/impl/ManagedLedgerTest.java | 53 +++++++++++++++++++++-
 .../bookkeeper/client/PulsarMockBookKeeper.java    |  7 +++
 .../bookkeeper/client/PulsarMockLedgerHandle.java  |  7 +++
 6 files changed, 113 insertions(+), 26 deletions(-)

diff --git 
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
 
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
index 07d0d40569a..167c0a1bad3 100644
--- 
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
+++ 
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
@@ -59,6 +59,7 @@ import java.util.concurrent.Executor;
 import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLongFieldUpdater;
 import java.util.concurrent.atomic.AtomicReference;
@@ -240,6 +241,9 @@ public class ManagedLedgerImpl implements ManagedLedger, 
CreateCallback {
     protected volatile long lastAddEntryTimeMs = 0;
     private long inactiveLedgerRollOverTimeMs = 0;
 
+    /** A signal that may cause all subsequent OpAddEntry ops of the current 
ledger to fail due to timeout. **/
+    protected volatile AtomicBoolean currentLedgerTimeoutTriggered;
+
     protected static final int DEFAULT_LEDGER_DELETE_RETRIES = 3;
     protected static final int DEFAULT_LEDGER_DELETE_BACKOFF_TIME_SEC = 60;
     private static final String MIGRATION_STATE_PROPERTY = "migrated";
@@ -532,6 +536,7 @@ public class ManagedLedgerImpl implements ManagedLedger, 
CreateCallback {
                 STATE_UPDATER.set(this, State.LedgerOpened);
                 updateLastLedgerCreatedTimeAndScheduleRolloverTask();
                 currentLedger = lh;
+                currentLedgerTimeoutTriggered = new AtomicBoolean();
 
                 lastConfirmedEntry = new PositionImpl(lh.getId(), -1);
                 // bypass empty ledgers, find last ledger with Message if 
possible.
@@ -774,7 +779,8 @@ public class ManagedLedgerImpl implements ManagedLedger, 
CreateCallback {
 
         // Jump to specific thread to avoid contention from writers writing 
from different threads
         executor.execute(() -> {
-            OpAddEntry addOperation = OpAddEntry.createNoRetainBuffer(this, 
buffer, callback, ctx);
+            OpAddEntry addOperation = OpAddEntry.createNoRetainBuffer(this, 
buffer, callback, ctx,
+                    currentLedgerTimeoutTriggered);
             internalAsyncAddEntry(addOperation);
         });
     }
@@ -790,7 +796,8 @@ public class ManagedLedgerImpl implements ManagedLedger, 
CreateCallback {
 
         // Jump to specific thread to avoid contention from writers writing 
from different threads
         executor.execute(() -> {
-            OpAddEntry addOperation = OpAddEntry.createNoRetainBuffer(this, 
buffer, numberOfMessages, callback, ctx);
+            OpAddEntry addOperation = OpAddEntry.createNoRetainBuffer(this, 
buffer, numberOfMessages, callback, ctx,
+                    currentLedgerTimeoutTriggered);
             internalAsyncAddEntry(addOperation);
         });
     }
@@ -842,6 +849,7 @@ public class ManagedLedgerImpl implements ManagedLedger, 
CreateCallback {
 
             // Write into lastLedger
             addOperation.setLedger(currentLedger);
+            addOperation.setTimeoutTriggered(currentLedgerTimeoutTriggered);
 
             ++currentLedgerEntries;
             currentLedgerSize += addOperation.data.readableBytes();
@@ -1585,6 +1593,7 @@ public class ManagedLedgerImpl implements ManagedLedger, 
CreateCallback {
                         LedgerHandle originalCurrentLedger = currentLedger;
                         ledgers.put(lh.getId(), newLedger);
                         currentLedger = lh;
+                        currentLedgerTimeoutTriggered = new AtomicBoolean();
                         currentLedgerEntries = 0;
                         currentLedgerSize = 0;
                         updateLedgersIdsComplete(originalCurrentLedger);
@@ -1668,9 +1677,11 @@ public class ManagedLedgerImpl implements ManagedLedger, 
CreateCallback {
             if (existsOp != null) {
                 // If op is used by another ledger handle, we need to close it 
and create a new one
                 if (existsOp.ledger != null) {
-                    existsOp.close();
-                    existsOp = OpAddEntry.createNoRetainBuffer(existsOp.ml, 
existsOp.data,
-                            existsOp.getNumberOfMessages(), existsOp.callback, 
existsOp.ctx);
+                    existsOp = 
existsOp.duplicateAndClose(currentLedgerTimeoutTriggered);
+                } else {
+                    // This scenario should not happen.
+                    log.warn("[{}] An OpAddEntry's ledger is empty.", name);
+                    
existsOp.setTimeoutTriggered(currentLedgerTimeoutTriggered);
                 }
                 existsOp.setLedger(currentLedger);
                 pendingAddEntries.add(existsOp);
@@ -4164,13 +4175,14 @@ public class ManagedLedgerImpl implements 
ManagedLedger, CreateCallback {
         }
         OpAddEntry opAddEntry = pendingAddEntries.peek();
         if (opAddEntry != null) {
-            final long finalAddOpCount = opAddEntry.addOpCount;
             boolean isTimedOut = opAddEntry.lastInitTime != -1
                     && TimeUnit.NANOSECONDS.toSeconds(System.nanoTime() - 
opAddEntry.lastInitTime) >= timeoutSec;
             if (isTimedOut) {
-                log.error("Failed to add entry for ledger {} in time-out {} 
sec",
-                        (opAddEntry.ledger != null ? opAddEntry.ledger.getId() 
: -1), timeoutSec);
-                opAddEntry.handleAddTimeoutFailure(opAddEntry.ledger, 
finalAddOpCount);
+                log.warn("[{}] Failed to add entry {}:{} in time-out {} sec", 
this.name,
+                        opAddEntry.ledger != null ? opAddEntry.ledger.getId() 
: -1,
+                        opAddEntry.entryId, timeoutSec);
+                currentLedgerTimeoutTriggered.set(true);
+                opAddEntry.handleAddFailure(opAddEntry.ledger);
             }
         }
     }
diff --git 
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpAddEntry.java
 
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpAddEntry.java
index ae2beafb643..acbb0da5a4e 100644
--- 
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpAddEntry.java
+++ 
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpAddEntry.java
@@ -24,8 +24,10 @@ import io.netty.util.Recycler;
 import io.netty.util.Recycler.Handle;
 import io.netty.util.ReferenceCountUtil;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicLongFieldUpdater;
 import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
+import lombok.Setter;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.bookkeeper.client.AsyncCallback.AddCallback;
 import org.apache.bookkeeper.client.AsyncCallback.CloseCallback;
@@ -45,7 +47,7 @@ import 
org.apache.bookkeeper.mledger.intercept.ManagedLedgerInterceptor;
 public class OpAddEntry implements AddCallback, CloseCallback, Runnable {
     protected ManagedLedgerImpl ml;
     LedgerHandle ledger;
-    private long entryId;
+    long entryId;
     private int numberOfMessages;
 
     @SuppressWarnings("unused")
@@ -68,6 +70,9 @@ public class OpAddEntry implements AddCallback, 
CloseCallback, Runnable {
             AtomicReferenceFieldUpdater.newUpdater(OpAddEntry.class, 
OpAddEntry.State.class, "state");
     volatile State state;
 
+    @Setter
+    private AtomicBoolean timeoutTriggered;
+
     enum State {
         OPEN,
         INITIATED,
@@ -76,8 +81,8 @@ public class OpAddEntry implements AddCallback, 
CloseCallback, Runnable {
     }
 
     public static OpAddEntry createNoRetainBuffer(ManagedLedgerImpl ml, 
ByteBuf data, AddEntryCallback callback,
-                                                  Object ctx) {
-        OpAddEntry op = createOpAddEntryNoRetainBuffer(ml, data, callback, 
ctx);
+                                                  Object ctx, AtomicBoolean 
timeoutTriggered) {
+        OpAddEntry op = createOpAddEntryNoRetainBuffer(ml, data, callback, 
ctx, timeoutTriggered);
         if (log.isDebugEnabled()) {
             log.debug("Created new OpAddEntry {}", op);
         }
@@ -85,8 +90,9 @@ public class OpAddEntry implements AddCallback, 
CloseCallback, Runnable {
     }
 
     public static OpAddEntry createNoRetainBuffer(ManagedLedgerImpl ml, 
ByteBuf data, int numberOfMessages,
-                                                  AddEntryCallback callback, 
Object ctx) {
-        OpAddEntry op = createOpAddEntryNoRetainBuffer(ml, data, callback, 
ctx);
+                                                  AddEntryCallback callback, 
Object ctx,
+                                                  AtomicBoolean 
timeoutTriggered) {
+        OpAddEntry op = createOpAddEntryNoRetainBuffer(ml, data, callback, 
ctx, timeoutTriggered);
         op.numberOfMessages = numberOfMessages;
         if (log.isDebugEnabled()) {
             log.debug("Created new OpAddEntry {}", op);
@@ -95,7 +101,8 @@ public class OpAddEntry implements AddCallback, 
CloseCallback, Runnable {
     }
 
     private static OpAddEntry createOpAddEntryNoRetainBuffer(ManagedLedgerImpl 
ml, ByteBuf data,
-                                                             AddEntryCallback 
callback, Object ctx) {
+                                                             AddEntryCallback 
callback, Object ctx,
+                                                             AtomicBoolean 
timeoutTriggered) {
         OpAddEntry op = RECYCLER.get();
         op.ml = ml;
         op.ledger = null;
@@ -109,6 +116,7 @@ public class OpAddEntry implements AddCallback, 
CloseCallback, Runnable {
         op.startTime = System.nanoTime();
         op.state = State.OPEN;
         op.payloadProcessorHandle = null;
+        op.timeoutTriggered = timeoutTriggered;
         ml.mbean.addAddEntrySample(op.dataLength);
         return op;
     }
@@ -176,7 +184,9 @@ public class OpAddEntry implements AddCallback, 
CloseCallback, Runnable {
         if (!STATE_UPDATER.compareAndSet(OpAddEntry.this, State.INITIATED, 
State.COMPLETED)) {
             log.warn("[{}] The add op is terminal legacy callback for entry 
{}-{} adding.", ml.getName(), lh.getId(),
                     entryId);
-            OpAddEntry.this.recycle();
+            // Since another thread is copying this object, do not recycle 
this object, to avoid other problems.
+            // For example: if we recycled it, another thread could read a null 
"opAddEntry.{variable_name}".
+            // Recycling is not mandatory; the JVM GC will collect it.
             return;
         }
 
@@ -200,7 +210,7 @@ public class OpAddEntry implements AddCallback, 
CloseCallback, Runnable {
                     lh == null ? -1 : lh.getId(), entryId, dataLength, rc);
         }
 
-        if (rc != BKException.Code.OK) {
+        if (rc != BKException.Code.OK || timeoutTriggered.get()) {
             handleAddFailure(lh);
         } else {
             // Trigger addComplete callback in a thread hashed on the managed 
ledger name
@@ -307,13 +317,6 @@ public class OpAddEntry implements AddCallback, 
CloseCallback, Runnable {
         return false;
     }
 
-    void handleAddTimeoutFailure(final LedgerHandle ledger, Object ctx) {
-        if (checkAndCompleteOp(ctx)) {
-            this.close();
-            this.handleAddFailure(ledger);
-        }
-    }
-
     /**
      * It handles add failure on the given ledger. it can be triggered when 
add-entry fails or times out.
      *
@@ -333,8 +336,11 @@ public class OpAddEntry implements AddCallback, 
CloseCallback, Runnable {
         });
     }
 
-    void close() {
+    OpAddEntry duplicateAndClose(AtomicBoolean timeoutTriggered) {
         STATE_UPDATER.set(OpAddEntry.this, State.CLOSED);
+        OpAddEntry duplicate =
+                OpAddEntry.createNoRetainBuffer(ml, data, 
getNumberOfMessages(), callback, ctx, timeoutTriggered);
+        return duplicate;
     }
 
     public State getState() {
@@ -389,6 +395,7 @@ public class OpAddEntry implements AddCallback, 
CloseCallback, Runnable {
         startTime = -1;
         lastInitTime = -1;
         payloadProcessorHandle = null;
+        timeoutTriggered = null;
         recyclerHandle.recycle(this);
     }
 
diff --git 
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ShadowManagedLedgerImpl.java
 
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ShadowManagedLedgerImpl.java
index 8b2742d9587..ec5b006c474 100644
--- 
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ShadowManagedLedgerImpl.java
+++ 
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ShadowManagedLedgerImpl.java
@@ -25,6 +25,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.TreeMap;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.function.Supplier;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.bookkeeper.client.AsyncCallback;
@@ -54,6 +55,8 @@ public class ShadowManagedLedgerImpl extends 
ManagedLedgerImpl {
                                    String name, final 
Supplier<CompletableFuture<Boolean>> mlOwnershipChecker) {
         super(factory, bookKeeper, store, config, scheduledExecutor, name, 
mlOwnershipChecker);
         this.sourceMLName = config.getShadowSourceName();
+        // ShadowManagedLedgerImpl does not implement add entry timeout yet, 
so this variable will always be false.
+        this.currentLedgerTimeoutTriggered = new AtomicBoolean(false);
     }
 
     /**
diff --git 
a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java
 
b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java
index 122bada487a..4f521f1e99e 100644
--- 
a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java
+++ 
b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java
@@ -146,6 +146,7 @@ import org.apache.pulsar.metadata.api.extended.SessionEvent;
 import org.apache.pulsar.metadata.impl.FaultInjectionMetadataStore;
 import org.awaitility.Awaitility;
 import org.awaitility.reflect.WhiteboxImpl;
+import org.eclipse.jetty.util.BlockingArrayQueue;
 import org.mockito.Mockito;
 import org.testng.Assert;
 import org.testng.annotations.DataProvider;
@@ -3185,6 +3186,55 @@ public class ManagedLedgerTest extends 
MockedBookKeeperTestCase {
         ledger.close();
     }
 
+    @Test
+    public void testAddEntryResponseTimeout() throws Exception {
+        // Create ML with feature Add Entry Timeout Check.
+        final ManagedLedgerConfig config = new 
ManagedLedgerConfig().setAddEntryTimeoutSeconds(2);
+        final ManagedLedgerImpl ledger = (ManagedLedgerImpl) 
factory.open("ml1", config);
+        final ManagedCursor cursor = ledger.openCursor("c1");
+        final CollectCtxAddEntryCallback collectCtxAddEntryCallback = new 
CollectCtxAddEntryCallback();
+
+        // Insert a response delay.
+        bkc.addEntryResponseDelay(8, TimeUnit.SECONDS);
+
+        // Add two entries.
+        final byte[] msg1 = new byte[]{1};
+        final byte[] msg2 = new byte[]{2};
+        int ctx1 = 1;
+        int ctx2 = 2;
+        ledger.asyncAddEntry(msg1, collectCtxAddEntryCallback, ctx1);
+        ledger.asyncAddEntry(msg2, collectCtxAddEntryCallback, ctx2);
+        // Verify all write requests are completed.
+        Awaitility.await().untilAsserted(() -> {
+            assertEquals(collectCtxAddEntryCallback.addCompleteCtxList, 
Arrays.asList(1, 2));
+        });
+        Entry entry1 = cursor.readEntries(1).get(0);
+        assertEquals(entry1.getData(), msg1);
+        entry1.release();
+        Entry entry2 = cursor.readEntries(1).get(0);
+        assertEquals(entry2.getData(), msg2);
+        entry2.release();
+
+        // cleanup.
+        factory.delete(ledger.name);
+    }
+
+    private static class CollectCtxAddEntryCallback implements 
AddEntryCallback {
+
+        public List<Object> addCompleteCtxList = new BlockingArrayQueue<>();
+        public List<Object> addFailedCtxList = new BlockingArrayQueue<>();
+
+        @Override
+        public void addComplete(Position position, ByteBuf entryData, Object 
ctx) {
+            addCompleteCtxList.add(ctx);
+        }
+
+        @Override
+        public void addFailed(ManagedLedgerException exception, Object ctx) {
+            addFailedCtxList.add(ctx);
+        }
+    }
+
     /**
      * It verifies that if bk-client doesn't complete the add-entry in given 
time out then broker is resilient enough
      * to create new ledger and add entry successfully.
@@ -3260,7 +3310,8 @@ public class ManagedLedgerTest extends 
MockedBookKeeperTestCase {
 
         List<OpAddEntry> oldOps = new ArrayList<>();
         for (int i = 0; i < 10; i++) {
-            OpAddEntry op = OpAddEntry.createNoRetainBuffer(ledger, 
ByteBufAllocator.DEFAULT.buffer(128).retain(), null, null);
+            OpAddEntry op = OpAddEntry.createNoRetainBuffer(ledger,
+                    ByteBufAllocator.DEFAULT.buffer(128).retain(), null, null, 
new AtomicBoolean());
             if (i > 4) {
                 op.setLedger(mock(LedgerHandle.class));
             }
diff --git 
a/testmocks/src/main/java/org/apache/bookkeeper/client/PulsarMockBookKeeper.java
 
b/testmocks/src/main/java/org/apache/bookkeeper/client/PulsarMockBookKeeper.java
index f0d279ef250..4516cfea01f 100644
--- 
a/testmocks/src/main/java/org/apache/bookkeeper/client/PulsarMockBookKeeper.java
+++ 
b/testmocks/src/main/java/org/apache/bookkeeper/client/PulsarMockBookKeeper.java
@@ -18,6 +18,7 @@
  */
 package org.apache.bookkeeper.client;
 
+import static com.google.common.base.Preconditions.checkArgument;
 import com.google.common.collect.Lists;
 import java.util.ArrayList;
 import java.util.Arrays;
@@ -89,6 +90,7 @@ public class PulsarMockBookKeeper extends BookKeeper {
     }
 
     final Queue<Long> addEntryDelaysMillis = new ConcurrentLinkedQueue<>();
+    final Queue<Long> addEntryResponseDelaysMillis = new 
ConcurrentLinkedQueue<>();
     final List<CompletableFuture<Void>> failures = new ArrayList<>();
     final List<CompletableFuture<Void>> addEntryFailures = new ArrayList<>();
 
@@ -367,6 +369,11 @@ public class PulsarMockBookKeeper extends BookKeeper {
         addEntryDelaysMillis.add(unit.toMillis(delay));
     }
 
+    public synchronized void addEntryResponseDelay(long delay, TimeUnit unit) {
+        checkArgument(delay >= 0, "The delay time must not be negative.");
+        addEntryResponseDelaysMillis.add(unit.toMillis(delay));
+    }
+
     static int getExceptionCode(Throwable t) {
         if (t instanceof BKException) {
             return ((BKException) t).getCode();
diff --git 
a/testmocks/src/main/java/org/apache/bookkeeper/client/PulsarMockLedgerHandle.java
 
b/testmocks/src/main/java/org/apache/bookkeeper/client/PulsarMockLedgerHandle.java
index dea33a0e676..aa61e541d0d 100644
--- 
a/testmocks/src/main/java/org/apache/bookkeeper/client/PulsarMockLedgerHandle.java
+++ 
b/testmocks/src/main/java/org/apache/bookkeeper/client/PulsarMockLedgerHandle.java
@@ -197,6 +197,13 @@ public class PulsarMockLedgerHandle extends LedgerHandle {
                         
cb.addComplete(PulsarMockBookKeeper.getExceptionCode(exception),
                                        PulsarMockLedgerHandle.this, 
LedgerHandle.INVALID_ENTRY_ID, ctx);
                     } else {
+                        Long responseDelayMillis = 
bk.addEntryResponseDelaysMillis.poll();
+                        if (responseDelayMillis != null) {
+                            try {
+                                Thread.sleep(responseDelayMillis);
+                            } catch (InterruptedException e) {
+                            }
+                        }
                         cb.addComplete(BKException.Code.OK, 
PulsarMockLedgerHandle.this, entryId, ctx);
                     }
                 }, bk.executor);

Reply via email to