This is an automated email from the ASF dual-hosted git repository.
lhotari pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 9c5e1c310e2 [fix][test] Fix ManagedCursorTest and NonDurableCursorTest
flaky tests (#25101)
9c5e1c310e2 is described below
commit 9c5e1c310e20d15141da6792fbf3434007278d48
Author: Oneby Wang <[email protected]>
AuthorDate: Wed Dec 24 00:02:01 2025 +0800
[fix][test] Fix ManagedCursorTest and NonDurableCursorTest flaky tests
(#25101)
---
.../bookkeeper/mledger/impl/ManagedCursorTest.java | 194 +++++++++++++++++++--
.../mledger/impl/NonDurableCursorTest.java | 12 +-
.../mledger/util/ManagedLedgerTestUtil.java | 54 ++++++
3 files changed, 240 insertions(+), 20 deletions(-)
diff --git
a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java
b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java
index d56352c1197..a65d830096e 100644
---
a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java
+++
b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java
@@ -20,6 +20,7 @@ package org.apache.bookkeeper.mledger.impl;
import static
org.apache.bookkeeper.mledger.impl.EntryCountEstimator.estimateEntryCountByBytesSize;
import static
org.apache.bookkeeper.mledger.impl.cache.RangeEntryCacheImpl.BOOKKEEPER_READ_OVERHEAD_PER_ENTRY;
+import static org.assertj.core.api.AssertionsForInterfaceTypes.assertThat;
import static org.mockito.ArgumentMatchers.anyBoolean;
import static org.mockito.ArgumentMatchers.anyInt;
import static org.mockito.Mockito.any;
@@ -49,6 +50,7 @@ import java.util.ArrayList;
import java.util.Arrays;
import java.util.BitSet;
import java.util.Collections;
+import java.util.Deque;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
@@ -60,6 +62,7 @@ import java.util.UUID;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentLinkedDeque;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
@@ -110,6 +113,7 @@ import
org.apache.bookkeeper.mledger.impl.MetaStore.MetaStoreCallback;
import org.apache.bookkeeper.mledger.proto.MLDataFormats;
import org.apache.bookkeeper.mledger.proto.MLDataFormats.ManagedCursorInfo;
import org.apache.bookkeeper.mledger.proto.MLDataFormats.PositionInfo;
+import org.apache.bookkeeper.mledger.util.ManagedLedgerTestUtil;
import org.apache.bookkeeper.mledger.util.ManagedLedgerUtils;
import org.apache.bookkeeper.test.MockedBookKeeperTestCase;
import org.apache.commons.collections4.iterators.EmptyIterator;
@@ -579,7 +583,10 @@ public class ManagedCursorTest extends
MockedBookKeeperTestCase {
@Cleanup("shutdown")
ManagedLedgerFactory factory2 = new
ManagedLedgerFactoryImpl(metadataStore, bkc);
- ledger = factory2.open("my_test_ledger", new
ManagedLedgerConfig().setMaxEntriesPerLedger(1));
+ // Retry here to show that the open operation eventually succeeds
despite the race condition.
+ ledger = ManagedLedgerTestUtil.retry(
+ () -> factory2.open("my_test_ledger", new
ManagedLedgerConfig().setMaxEntriesPerLedger(1)));
+
c1 = ledger.openCursor("c1");
c2 = ledger.openCursor("c2");
@@ -1248,7 +1255,8 @@ public class ManagedCursorTest extends
MockedBookKeeperTestCase {
@Test(timeOut = 20000)
void cursorPersistence() throws Exception {
- ManagedLedger ledger = factory.open("my_test_ledger");
+ // Open cursor_persistence_ledger ledger, create ledger 3.
+ ManagedLedger ledger = factory.open("cursor_persistence_ledger");
ManagedCursor c1 = ledger.openCursor("c1");
ManagedCursor c2 = ledger.openCursor("c2");
ledger.addEntry("dummy-entry-1".getBytes(Encoding));
@@ -1260,26 +1268,28 @@ public class ManagedCursorTest extends
MockedBookKeeperTestCase {
List<Entry> entries = c1.readEntries(3);
Position p1 = entries.get(2).getPosition();
+ // Mark delete; this creates ledger 4 because the cursor ledger state is NoLedger.
c1.markDelete(p1);
entries.forEach(Entry::release);
entries = c1.readEntries(4);
Position p2 = entries.get(2).getPosition();
+ // Mark delete; this creates ledger 5 because the cursor ledger state is NoLedger.
c2.markDelete(p2);
entries.forEach(Entry::release);
// Reopen
-
@Cleanup("shutdown")
ManagedLedgerFactory factory2 = new
ManagedLedgerFactoryImpl(metadataStore, bkc);
- ledger = factory2.open("my_test_ledger");
+ // Recovery open cursor_persistence_ledger ledger, create ledger 6,
and move mark delete position to 6:-1.
+ // See PR https://github.com/apache/pulsar/pull/25087.
+ ledger = factory2.open("cursor_persistence_ledger");
c1 = ledger.openCursor("c1");
c2 = ledger.openCursor("c2");
assertEquals(c1.getMarkDeletedPosition(), p1);
- // move mark-delete-position from 3:5 to 6:-1 since all the entries
have been consumed
ManagedCursor finalC2 = c2;
- Awaitility.await().untilAsserted(() ->
assertNotEquals(finalC2.getMarkDeletedPosition(), p2));
+ Awaitility.await().untilAsserted(() ->
assertThat(finalC2.getMarkDeletedPosition()).isGreaterThan(p2));
}
@Test(timeOut = 20000)
@@ -1350,7 +1360,7 @@ public class ManagedCursorTest extends
MockedBookKeeperTestCase {
assertEquals(c4.getMarkDeletedPosition(), p1);
}
- @Test(groups = "flaky")
+ @Test(timeOut = 20000)
public void asyncMarkDeleteBlocking() throws Exception {
ManagedLedgerConfig config = new ManagedLedgerConfig();
config.setMaxEntriesPerLedger(10);
@@ -1385,16 +1395,166 @@ public class ManagedCursorTest extends
MockedBookKeeperTestCase {
}
latch.await();
+ assertEquals(c1.getNumberOfEntries(), 0);
+
+ // Reopen
+ @Cleanup("shutdown") ManagedLedgerFactory factory2 = new
ManagedLedgerFactoryImpl(metadataStore, bkc);
+ // Flaky case: factory2.open() may throw
MetadataStoreException$BadVersionException because of this race condition:
+ // 1. factory2.open() triggers ledger recovery and reads versionA of the
ManagedLedgerInfo of the my_test_ledger ledger.
+ // 2. The my_test_ledger ledger rollover triggers
MetaStoreImpl.asyncUpdateLedgerIds(), which writes versionB of the
+ // ManagedLedgerInfo into the metaStore.
+ // 3. factory2.open() triggers MetaStoreImpl.asyncUpdateLedgerIds(),
which attempts to update the versionA ManagedLedgerInfo
+ // into the metaStore, then throws BadVersionException and moves the
my_test_ledger ledger to the fenced state.
+ // Recovery open of the my_test_ledger ledger,
ledgerId++
+ // Retry here to show that the open operation eventually succeeds
despite the race condition.
+ ledger = ManagedLedgerTestUtil.retry(() ->
factory2.open("my_test_ledger"));
+ ManagedCursor c2 = ledger.openCursor("c1");
+
+ // Three cases:
+ // 1. cursor recovered with lastPosition markDeletePosition
+ // 2. cursor recovered with (lastPositionLedgerId+1:-1)
markDeletePosition, cursor ledger not rolled over, we
+ // move markDeletePosition to (lastPositionLedgerId+2:-1)
+ // 3. cursor recovered with (lastPositionLedgerId+1:-1)
markDeletePosition, cursor ledger rolled over, we
+ // move markDeletePosition to (lastPositionLedgerId+3:-1)
+ // See PR https://github.com/apache/pulsar/pull/25087.
+
assertThat(c2.getMarkDeletedPosition()).isGreaterThanOrEqualTo(lastPosition.get());
+ }
+
+ @Test(timeOut = 20000)
+ public void asyncMarkDeleteBlockingWithOneShot() throws Exception {
+ ManagedLedgerConfig config = new ManagedLedgerConfig();
+ config.setMaxEntriesPerLedger(10);
+ // open async_mark_delete_blocking_test_ledger ledger, create ledger 3.
+ ManagedLedger ledger =
factory.open("async_mark_delete_blocking_test_ledger", config);
+ final ManagedCursor c1 = ledger.openCursor("c1");
+ final AtomicReference<Position> lastPosition = new AtomicReference<>();
+ // just for log debug purpose
+ Deque<Position> positions = new ConcurrentLinkedDeque<>();
+
+ // In the previous flaky test we set num=100, and PR
https://github.com/apache/pulsar/pull/25087 makes that test
+ // more flaky. Flaky case:
+ // 1. cursor recovered with markDeletePosition 12:9,
persistentMarkDeletePosition 12:9.
+ // 2. cursor recovered with markDeletePosition 13:-1,
persistentMarkDeletePosition 13:-1.
+ // Here, we set num to 101 to make sure that ledger 13 is created and
becomes the active (last) ledger,
+ // so the cursor will always be recovered with markDeletePosition 13:0,
persistentMarkDeletePosition 13:0.
+ final int num = 101;
+ final CountDownLatch addEntryLatch = new CountDownLatch(num);
+ // 10 entries per ledger, create ledger 4~13
+ for (int i = 0; i < num; i++) {
+ String entryStr = "entry-" + i;
+ ledger.asyncAddEntry(entryStr.getBytes(Encoding), new
AddEntryCallback() {
+ @Override
+ public void addFailed(ManagedLedgerException exception, Object
ctx) {
+ }
+ @Override
+ public void addComplete(Position position, ByteBuf entryData,
Object ctx) {
+ lastPosition.set(position);
+ positions.offer(position);
+ addEntryLatch.countDown();
+ }
+ }, null);
+ }
+ addEntryLatch.await();
+
+ // If we set num=100, to avoid a flaky test we would have to add
Thread.sleep(1000) here to make sure the ledger rollover
+ // is finished, but this sleep cannot guarantee that c1 is always recovered
with markDeletePosition 12:9.
+ // Thread.sleep(1000);
+
+ final CountDownLatch markDeleteLatch = new CountDownLatch(1);
+ // Mark delete; this creates ledger 14 because the cursor ledger state is
NoLedger.
+ // In the flaky num=100 case, the markDelete operation is triggered
twice:
+ // 1. The first is triggered by c1.asyncMarkDelete(); markDeletePosition
is 12:9.
+ // 2. The second is triggered by
ManagedLedgerImpl.updateLedgersIdsComplete() due to ledger full rollover.
+ // The entries in ledger 12 are all consumed, and we move
persistentMarkDeletePosition and
+ // markDeletePosition to 13:-1 due to PR
https://github.com/apache/pulsar/pull/25087.
+ // Before that PR, persistentMarkDeletePosition was not moved.
+ // The two markDelete operations are triggered at almost the same time,
without any ordering guarantee:
+ // 1. The main thread triggers c1.asyncMarkDelete().
+ // 2. The bookkeeper-ml-scheduler-OrderedScheduler-0-0 thread triggers
the creation of ledger 13 due to ledger full
+ // rollover by OpAddEntry.
+ // OpAddEntry will close and create a new ledger when closeWhenDone is
true.
+ // In the ManagedLedgerImpl class, MetaStoreCallback cb calls
maybeUpdateCursorBeforeTrimmingConsumedLedger(),
+ // which calls cursor.asyncMarkDelete(), so the markDelete operation in
ledger rollover may execute after
+ // AddEntryCallback.addComplete(). The root cause is that
cursor.asyncMarkDelete() does not propagate completion or
+ // failure to its caller's callback.
+ c1.asyncMarkDelete(lastPosition.get(), new MarkDeleteCallback() {
+ @Override
+ public void markDeleteFailed(ManagedLedgerException exception,
Object ctx) {
+ }
+
+ @Override
+ public void markDeleteComplete(Object ctx) {
+ markDeleteLatch.countDown();
+ }
+ }, null);
+ markDeleteLatch.await();
assertEquals(c1.getNumberOfEntries(), 0);
// Reopen
- @Cleanup("shutdown")
- ManagedLedgerFactory factory2 = new
ManagedLedgerFactoryImpl(metadataStore, bkc);
- ledger = factory2.open("my_test_ledger");
+ @Cleanup("shutdown") ManagedLedgerFactory factory2 = new
ManagedLedgerFactoryImpl(metadataStore, bkc);
+ // Recovery open async_mark_delete_blocking_test_ledger ledger, create
ledger 15.
+ // When executing
ManagedLedgerImpl.maybeUpdateCursorBeforeTrimmingConsumedLedger(), the
curPointedLedger is 13,
+ // the nextPointedLedger is 15, ledger 13 only has 1 consumed entry
13:0,
+ // so we will move markDeletePosition to 15:-1, see PR
https://github.com/apache/pulsar/pull/25087.
+ ledger = factory2.open("async_mark_delete_blocking_test_ledger");
ManagedCursor c2 = ledger.openCursor("c1");
- assertEquals(c2.getMarkDeletedPosition(), lastPosition.get());
+ log.info("positions size: {}, positions: {}", positions.size(),
positions);
+ // To make sure
ManagedLedgerImpl.maybeUpdateCursorBeforeTrimmingConsumedLedger() is completed,
we should
+ // wait until c2.getMarkDeletedPosition() has moved past lastPosition (expected to be 15:-1), see PR
https://github.com/apache/pulsar/pull/25087.
+ Awaitility.await()
+ .untilAsserted(() ->
assertThat(c2.getMarkDeletedPosition()).isGreaterThan(lastPosition.get()));
+ }
+
+ @Test(timeOut = 20000)
+ public void asyncMarkDeleteBlockingWithMultiShots() throws Exception {
+ ManagedLedgerConfig config = new ManagedLedgerConfig();
+ config.setMaxEntriesPerLedger(10);
+ config.setMetadataMaxEntriesPerLedger(5);
+ ManagedLedger ledger =
factory.open("async_mark_delete_blocking_test_ledger", config);
+ final ManagedCursor c1 = ledger.openCursor("c1");
+ final AtomicReference<Position> lastPosition = new AtomicReference<>();
+
+ final int num = 101;
+ final CountDownLatch addEntryLatch = new CountDownLatch(num);
+ for (int i = 0; i < num; i++) {
+ String entryStr = "entry-" + i;
+ ledger.asyncAddEntry(entryStr.getBytes(Encoding), new
AddEntryCallback() {
+ @Override
+ public void addFailed(ManagedLedgerException exception, Object
ctx) {
+ }
+
+ @Override
+ public void addComplete(Position position, ByteBuf entryData,
Object ctx) {
+ lastPosition.set(position);
+ c1.asyncMarkDelete(lastPosition.get(), new
MarkDeleteCallback() {
+ @Override
+ public void markDeleteFailed(ManagedLedgerException
exception, Object ctx) {
+ }
+
+ @Override
+ public void markDeleteComplete(Object ctx) {
+ addEntryLatch.countDown();
+ }
+ }, null);
+
+ }
+ }, null);
+ }
+ addEntryLatch.await();
+ assertEquals(c1.getNumberOfEntries(), 0);
+
+ // Reopen
+ @Cleanup("shutdown") ManagedLedgerFactory factory2 = new
ManagedLedgerFactoryImpl(metadataStore, bkc);
+ ledger = factory2.open("async_mark_delete_blocking_test_ledger");
+ ManagedCursor c2 = ledger.openCursor("c1");
+
+ // Flaky case: the ledger id of c2.getMarkDeletedPosition() may be
lastPositionLedgerId+1 or lastPositionLedgerId+2,
+ // because the last c1.asyncMarkDelete() operation may trigger a cursor ledger
rollover.
+ // See PR https://github.com/apache/pulsar/pull/25087.
+ Awaitility.await()
+ .untilAsserted(() ->
assertThat(c2.getMarkDeletedPosition()).isGreaterThan(lastPosition.get()));
}
@Test(timeOut = 20000)
@@ -1404,7 +1564,7 @@ public class ManagedCursorTest extends
MockedBookKeeperTestCase {
final ManagedCursor c1 = ledger.openCursor("c1");
final int num = 100;
- List<Position> positions = new ArrayList();
+ List<Position> positions = new ArrayList<>();
for (int i = 0; i < num; i++) {
Position p = ledger.addEntry("dummy-entry".getBytes(Encoding));
positions.add(p);
@@ -1433,10 +1593,11 @@ public class ManagedCursorTest extends
MockedBookKeeperTestCase {
// Reopen
@Cleanup("shutdown")
ManagedLedgerFactory factory2 = new
ManagedLedgerFactoryImpl(metadataStore, bkc);
- ledger = factory2.open("my_test_ledger");
+ // Retry here to show that the open operation eventually succeeds
despite the race condition.
+ ledger = ManagedLedgerTestUtil.retry(() ->
factory2.open("my_test_ledger"));
ManagedCursor c2 = ledger.openCursor("c1");
- assertEquals(c2.getMarkDeletedPosition(), lastPosition);
+
assertThat(c2.getMarkDeletedPosition()).isGreaterThanOrEqualTo(lastPosition);
}
@Test(timeOut = 20000)
@@ -4509,7 +4670,7 @@ public class ManagedCursorTest extends
MockedBookKeeperTestCase {
ledger.close();
}
- @Test(groups = "flaky")
+ @Test(timeOut = 20000)
public void testLazyCursorLedgerCreationForSubscriptionCreation() throws
Exception {
ManagedLedgerConfig managedLedgerConfig = new ManagedLedgerConfig();
ManagedLedgerImpl ledger =
@@ -4521,7 +4682,8 @@ public class ManagedCursorTest extends
MockedBookKeeperTestCase {
ledger = (ManagedLedgerImpl)
factory2.open("testLazyCursorLedgerCreation", managedLedgerConfig);
assertNotNull(ledger.getCursors().get("test"));
ManagedCursorImpl cursor1 = (ManagedCursorImpl)
ledger.openCursor("test");
- assertEquals(cursor1.getMarkDeletedPosition(), p1);
+ // Reopen ledger may move cursor to next position. See PR
https://github.com/apache/pulsar/pull/25087.
+
assertThat(cursor1.getMarkDeletedPosition()).isGreaterThanOrEqualTo(p1);
factory2.shutdown();
}
diff --git
a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/NonDurableCursorTest.java
b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/NonDurableCursorTest.java
index 69927f1ebaf..d53036df26b 100644
---
a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/NonDurableCursorTest.java
+++
b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/NonDurableCursorTest.java
@@ -20,6 +20,7 @@ package org.apache.bookkeeper.mledger.impl;
import static java.nio.charset.StandardCharsets.UTF_8;
import static
org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl.ENTRIES_ADDED_COUNTER_UPDATER;
+import static org.assertj.core.api.AssertionsForInterfaceTypes.assertThat;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertFalse;
import static org.testng.Assert.assertNotEquals;
@@ -50,6 +51,7 @@ import org.apache.bookkeeper.mledger.ManagedLedgerFactory;
import org.apache.bookkeeper.mledger.ManagedLedgerFactoryConfig;
import org.apache.bookkeeper.mledger.Position;
import org.apache.bookkeeper.mledger.PositionFactory;
+import org.apache.bookkeeper.mledger.util.ManagedLedgerTestUtil;
import org.apache.bookkeeper.test.MockedBookKeeperTestCase;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.pulsar.common.api.proto.CommandSubscribe;
@@ -532,7 +534,7 @@ public class NonDurableCursorTest extends
MockedBookKeeperTestCase {
assertEquals(cursor.getReadPosition(),
PositionFactory.create(p4.getLedgerId(), p4.getEntryId() + 1));
}
- @Test(timeOut = 20000, groups = "flaky")
+ @Test(timeOut = 20000)
public void asyncMarkDeleteBlocking() throws Exception {
ManagedLedgerConfig config = new ManagedLedgerConfig();
config.setMaxEntriesPerLedger(10);
@@ -567,16 +569,18 @@ public class NonDurableCursorTest extends
MockedBookKeeperTestCase {
}
latch.await();
-
assertEquals(c1.getNumberOfEntries(), 0);
// Reopen
@Cleanup("shutdown")
ManagedLedgerFactory factory2 = new
ManagedLedgerFactoryImpl(metadataStore, bkc);
- ledger = factory2.open("my_test_ledger");
+ ledger = ManagedLedgerTestUtil.retry(() ->
factory2.open("my_test_ledger"));
ManagedCursor c2 = ledger.openCursor("c1");
- assertEquals(c2.getMarkDeletedPosition(), lastPosition.get());
+ // Since all entries are consumed, we should move mark delete position
to nextLedgerId:-1.
+ // See PR https://github.com/apache/pulsar/pull/25087.
+ Awaitility.await().untilAsserted(
+ () ->
assertThat(c2.getMarkDeletedPosition()).isGreaterThanOrEqualTo(lastPosition.get()));
}
@Test(timeOut = 20000)
diff --git
a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/util/ManagedLedgerTestUtil.java
b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/util/ManagedLedgerTestUtil.java
new file mode 100644
index 00000000000..0acca723f8f
--- /dev/null
+++
b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/util/ManagedLedgerTestUtil.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.bookkeeper.mledger.util;
+
+import lombok.extern.slf4j.Slf4j;
+
+@Slf4j
+public abstract class ManagedLedgerTestUtil {
+
+ public static <T> T retry(ThrowingSupplier<T> supplier) {
+ return retry(10, supplier);
+ }
+
+ public static <T> T retry(int retryCount, ThrowingSupplier<T> supplier) {
+ for (int i = 0; i < retryCount; i++) {
+ if (i > 0) {
+ try {
+ log.info("Retrying after 100ms {}/{}", i, retryCount);
+ Thread.sleep(100);
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ }
+ }
+ try {
+ return supplier.get();
+ } catch (Exception e) {
+ log.warn("Failed to execute supplier: {}", supplier, e);
+ }
+ }
+ throw new RuntimeException("Failed to execute supplier after " +
retryCount + " retries");
+ }
+
+ @FunctionalInterface
+ public interface ThrowingSupplier<T> {
+ T get() throws Exception;
+ }
+
+}