lhotari commented on code in PR #25101:
URL: https://github.com/apache/pulsar/pull/25101#discussion_r2642221774


##########
managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java:
##########
@@ -1385,16 +1390,173 @@ public void markDeleteComplete(Object ctx) {
         }
 
         latch.await();
-
         assertEquals(c1.getNumberOfEntries(), 0);
 
+        // Sleep 1s here to wait for the ledger rollover to finish
+        Thread.sleep(1000);
+
         // Reopen
-        @Cleanup("shutdown")
-        ManagedLedgerFactory factory2 = new 
ManagedLedgerFactoryImpl(metadataStore, bkc);
+        @Cleanup("shutdown") ManagedLedgerFactory factory2 = new 
ManagedLedgerFactoryImpl(metadataStore, bkc);
+        // Flaky test case: factory2.open() may throw 
MetadataStoreException$BadVersionException due to this race condition:
+        // 1. The my_test_ledger ledger rollover triggers a cursor.asyncMarkDelete() 
operation.
+        // 2. factory2.open() triggers ledger recovery and reads versionA of the 
ManagedLedgerInfo of the my_test_ledger ledger.
+        // 3. cursor.asyncMarkDelete() triggers 
MetaStoreImpl.asyncUpdateLedgerIds(), which writes versionB of the ManagedLedgerInfo
+        //    into the metaStore.
+        // 4. factory2.open() triggers MetaStoreImpl.asyncUpdateLedgerIds() to write 
the versionA-based ManagedLedgerInfo
+        //    into the metaStore, which throws BadVersionException and moves the 
my_test_ledger ledger to the fenced state.
+        // See PR https://github.com/apache/pulsar/pull/25087.
+        // Recovery-open the my_test_ledger ledger, 
ledgerId++
         ledger = factory2.open("my_test_ledger");
         ManagedCursor c2 = ledger.openCursor("c1");
 
-        assertEquals(c2.getMarkDeletedPosition(), lastPosition.get());
+        // Three cases:
+        // 1. cursor recovered with lastPosition markDeletePosition
+        // 2. cursor recovered with (lastPositionLedgerId+1:-1) 
markDeletePosition, cursor ledger not rolled over, we
+        //    move markDeletePosition to (lastPositionLedgerId+2:-1)
+        // 3. cursor recovered with (lastPositionLedgerId+1:-1) 
markDeletePosition, cursor ledger rolled over, we
+        //    move markDeletePosition to (lastPositionLedgerId+3:-1)
+        // See PR https://github.com/apache/pulsar/pull/25087.
+        log.info("c2 markDeletePosition: {}, lastPosition: {}", 
c2.getMarkDeletedPosition(), lastPosition);
+        Awaitility.await()
+                .untilAsserted(() -> 
assertTrue(c2.getMarkDeletedPosition().compareTo(lastPosition.get()) >= 0));
+    }
+
+    @Test(timeOut = 20000)
+    public void asyncMarkDeleteBlockingWithOneShot() throws Exception {
+        ManagedLedgerConfig config = new ManagedLedgerConfig();
+        config.setMaxEntriesPerLedger(10);
+        // open async_mark_delete_blocking_test_ledger ledger, create ledger 3.
+        ManagedLedger ledger = 
factory.open("async_mark_delete_blocking_test_ledger", config);
+        final ManagedCursor c1 = ledger.openCursor("c1");
+        final AtomicReference<Position> lastPosition = new AtomicReference<>();
+        // just for log debug purpose
+        Deque<Position> positions = new ConcurrentLinkedDeque<>();
+
+        // The previous version of this test used num=100, and PR 
https://github.com/apache/pulsar/pull/25087 made it
+        // even more flaky, because the cursor could be recovered in either of two states:
+        //   1. markDeletePosition 12:9, 
persistentMarkDeletePosition 12:9.
+        //   2. markDeletePosition 13:-1, 
persistentMarkDeletePosition 13:-1.
+        // Here we set num to 101 to make sure ledger 13 is created and 
becomes the active (last) ledger,
+        // so the cursor is always recovered with markDeletePosition 13:0, 
persistentMarkDeletePosition 13:0.
+        final int num = 101;
+        final CountDownLatch addEntryLatch = new CountDownLatch(num);
+        // 10 entries per ledger, create ledger 4~13
+        for (int i = 0; i < num; i++) {
+            String entryStr = "entry-" + i;
+            ledger.asyncAddEntry(entryStr.getBytes(Encoding), new 
AddEntryCallback() {
+                @Override
+                public void addFailed(ManagedLedgerException exception, Object 
ctx) {
+                }
+
+                @Override
+                public void addComplete(Position position, ByteBuf entryData, 
Object ctx) {
+                    lastPosition.set(position);
+                    positions.offer(position);
+                    addEntryLatch.countDown();
+                }
+            }, null);
+        }
+        addEntryLatch.await();
+
+        // With num=100, avoiding flakiness would require a 
Thread.sleep(1000) here to make sure the ledger rollover
+        // is finished, and even that sleep cannot guarantee that c1 is always recovered 
with markDeletePosition 12:9.
+        // Thread.sleep(1000);
+
+        final CountDownLatch markDeleteLatch = new CountDownLatch(1);
+        // Mark delete; this creates ledger 14 because the cursor ledger state is 
NoLedger.
+        // In the flaky num=100 case, the markDelete operation is triggered 
twice:
+        //   1. first by c1.asyncMarkDelete(), with markDeletePosition 
12:9.
+        //   2. second by 
ManagedLedgerImpl.updateLedgersIdsComplete() due to the ledger-full rollover.
+        //      All entries in ledger 12 are consumed, so 
persistentMarkDeletePosition and
+        //      markDeletePosition are moved to 13:-1 due to PR 
https://github.com/apache/pulsar/pull/25087.
+        //      Before that PR, persistentMarkDeletePosition was not moved.
+        // The two markDelete operations are triggered at almost the same time, 
with no ordering guarantee:
+        //   1. the main thread triggers c1.asyncMarkDelete().
+        //   2. the bookkeeper-ml-scheduler-OrderedScheduler-0-0 thread triggers 
creation of ledger 13 due to the ledger-full
+        //      rollover in OpAddEntry.
+        // OpAddEntry closes the ledger and creates a new one when closeWhenDone is 
true.
+        // In the ManagedLedgerImpl class, the MetaStoreCallback cb calls 
maybeUpdateCursorBeforeTrimmingConsumedLedger(),
+        // which calls cursor.asyncMarkDelete(), so the markDelete operation in the 
ledger rollover may execute after
+        // AddEntryCallback.addComplete(). The root cause is that 
cursor.asyncMarkDelete() does not propagate completion or
+        // failure to its caller's callback.
+        c1.asyncMarkDelete(lastPosition.get(), new MarkDeleteCallback() {
+            @Override
+            public void markDeleteFailed(ManagedLedgerException exception, 
Object ctx) {
+            }
+
+            @Override
+            public void markDeleteComplete(Object ctx) {
+                markDeleteLatch.countDown();
+            }
+        }, null);
+        markDeleteLatch.await();
+        assertEquals(c1.getNumberOfEntries(), 0);
+
+        // Reopen
+        @Cleanup("shutdown") ManagedLedgerFactory factory2 = new 
ManagedLedgerFactoryImpl(metadataStore, bkc);
+        // Recovery open async_mark_delete_blocking_test_ledger ledger, create 
ledger 15.
+        // When executing 
ManagedLedgerImpl.maybeUpdateCursorBeforeTrimmingConsumedLedger(), the 
curPointedLedger is 13,
+        // the nextPointedLedger is 15, ledger 13 only has 1 consumed entry 
13:0,
+        // so we will move markDeletePosition to 15:-1, see PR 
https://github.com/apache/pulsar/pull/25087.
+        ledger = factory2.open("async_mark_delete_blocking_test_ledger");
+        ManagedCursor c2 = ledger.openCursor("c1");
+
+        log.info("positions size: {}, positions: {}", positions.size(), 
positions);
+        // To make sure 
ManagedLedgerImpl.maybeUpdateCursorBeforeTrimmingConsumedLedger() is completed, 
we should
+        // wait until c2.getMarkDeletedPosition() equals 15:-1, see PR 
https://github.com/apache/pulsar/pull/25087.
+        Awaitility.await()
+                .untilAsserted(() -> 
assertTrue(c2.getMarkDeletedPosition().compareTo(lastPosition.get()) > 0));
+    }
+
+    @Test(timeOut = 20000)
+    public void asyncMarkDeleteBlockingWithMultiShots() throws Exception {
+        ManagedLedgerConfig config = new ManagedLedgerConfig();
+        config.setMaxEntriesPerLedger(10);
+        config.setMetadataMaxEntriesPerLedger(5);
+        ManagedLedger ledger = 
factory.open("async_mark_delete_blocking_test_ledger", config);
+        final ManagedCursor c1 = ledger.openCursor("c1");
+        final AtomicReference<Position> lastPosition = new AtomicReference<>();
+
+        final int num = 101;
+        final CountDownLatch addEntryLatch = new CountDownLatch(num);
+        for (int i = 0; i < num; i++) {
+            String entryStr = "entry-" + i;
+            ledger.asyncAddEntry(entryStr.getBytes(Encoding), new 
AddEntryCallback() {
+                @Override
+                public void addFailed(ManagedLedgerException exception, Object 
ctx) {
+                }
+
+                @Override
+                public void addComplete(Position position, ByteBuf entryData, 
Object ctx) {
+                    lastPosition.set(position);
+                    c1.asyncMarkDelete(lastPosition.get(), new 
MarkDeleteCallback() {
+                        @Override
+                        public void markDeleteFailed(ManagedLedgerException 
exception, Object ctx) {
+                        }
+
+                        @Override
+                        public void markDeleteComplete(Object ctx) {
+                            addEntryLatch.countDown();
+                        }
+                    }, null);
+
+                }
+            }, null);
+        }
+        addEntryLatch.await();
+        assertEquals(c1.getNumberOfEntries(), 0);
+
+        // Reopen
+        @Cleanup("shutdown") ManagedLedgerFactory factory2 = new 
ManagedLedgerFactoryImpl(metadataStore, bkc);
+        ledger = factory2.open("async_mark_delete_blocking_test_ledger");
+        ManagedCursor c2 = ledger.openCursor("c1");
+
+        // Flaky test case: the ledger id of c2.getMarkDeletedPosition() may equal 
lastPositionLedgerId+1 or lastPositionLedgerId+2,
+        // because the last c1.asyncMarkDelete() operation may trigger a cursor ledger 
rollover.
+        // See PR https://github.com/apache/pulsar/pull/25087.
+        log.info("c2 markDeletePosition: {}, lastPosition: {}", 
c2.getMarkDeletedPosition(), lastPosition);
+        Awaitility.await()
+                .untilAsserted(() -> 
assertTrue(c2.getMarkDeletedPosition().compareTo(lastPosition.get()) > 0));

Review Comment:
   ```suggestion
                   .untilAsserted(() -> 
assertThat(c2.getMarkDeletedPosition()).isGreaterThan(lastPosition.get()));
   ```



##########
managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java:
##########
@@ -1385,16 +1390,173 @@ public void markDeleteComplete(Object ctx) {
         }
 
         latch.await();
-
         assertEquals(c1.getNumberOfEntries(), 0);
 
+        // Sleep 1s here to wait for the ledger rollover to finish
+        Thread.sleep(1000);
+
         // Reopen
-        @Cleanup("shutdown")
-        ManagedLedgerFactory factory2 = new 
ManagedLedgerFactoryImpl(metadataStore, bkc);
+        @Cleanup("shutdown") ManagedLedgerFactory factory2 = new 
ManagedLedgerFactoryImpl(metadataStore, bkc);
+        // Flaky test case: factory2.open() may throw 
MetadataStoreException$BadVersionException due to this race condition:
+        // 1. The my_test_ledger ledger rollover triggers a cursor.asyncMarkDelete() 
operation.
+        // 2. factory2.open() triggers ledger recovery and reads versionA of the 
ManagedLedgerInfo of the my_test_ledger ledger.
+        // 3. cursor.asyncMarkDelete() triggers 
MetaStoreImpl.asyncUpdateLedgerIds(), which writes versionB of the ManagedLedgerInfo
+        //    into the metaStore.
+        // 4. factory2.open() triggers MetaStoreImpl.asyncUpdateLedgerIds() to write 
the versionA-based ManagedLedgerInfo
+        //    into the metaStore, which throws BadVersionException and moves the 
my_test_ledger ledger to the fenced state.
+        // See PR https://github.com/apache/pulsar/pull/25087.
+        // Recovery-open the my_test_ledger ledger, 
ledgerId++
         ledger = factory2.open("my_test_ledger");
         ManagedCursor c2 = ledger.openCursor("c1");
 
-        assertEquals(c2.getMarkDeletedPosition(), lastPosition.get());
+        // Three cases:
+        // 1. cursor recovered with lastPosition markDeletePosition
+        // 2. cursor recovered with (lastPositionLedgerId+1:-1) 
markDeletePosition, cursor ledger not rolled over, we
+        //    move markDeletePosition to (lastPositionLedgerId+2:-1)
+        // 3. cursor recovered with (lastPositionLedgerId+1:-1) 
markDeletePosition, cursor ledger rolled over, we
+        //    move markDeletePosition to (lastPositionLedgerId+3:-1)
+        // See PR https://github.com/apache/pulsar/pull/25087.
+        log.info("c2 markDeletePosition: {}, lastPosition: {}", 
c2.getMarkDeletedPosition(), lastPosition);
+        Awaitility.await()
+                .untilAsserted(() -> 
assertTrue(c2.getMarkDeletedPosition().compareTo(lastPosition.get()) >= 0));
+    }
+
+    @Test(timeOut = 20000)
+    public void asyncMarkDeleteBlockingWithOneShot() throws Exception {
+        ManagedLedgerConfig config = new ManagedLedgerConfig();
+        config.setMaxEntriesPerLedger(10);
+        // open async_mark_delete_blocking_test_ledger ledger, create ledger 3.
+        ManagedLedger ledger = 
factory.open("async_mark_delete_blocking_test_ledger", config);
+        final ManagedCursor c1 = ledger.openCursor("c1");
+        final AtomicReference<Position> lastPosition = new AtomicReference<>();
+        // just for log debug purpose
+        Deque<Position> positions = new ConcurrentLinkedDeque<>();
+
+        // The previous version of this test used num=100, and PR 
https://github.com/apache/pulsar/pull/25087 made it
+        // even more flaky, because the cursor could be recovered in either of two states:
+        //   1. markDeletePosition 12:9, 
persistentMarkDeletePosition 12:9.
+        //   2. markDeletePosition 13:-1, 
persistentMarkDeletePosition 13:-1.
+        // Here we set num to 101 to make sure ledger 13 is created and 
becomes the active (last) ledger,
+        // so the cursor is always recovered with markDeletePosition 13:0, 
persistentMarkDeletePosition 13:0.
+        final int num = 101;
+        final CountDownLatch addEntryLatch = new CountDownLatch(num);
+        // 10 entries per ledger, create ledger 4~13
+        for (int i = 0; i < num; i++) {
+            String entryStr = "entry-" + i;
+            ledger.asyncAddEntry(entryStr.getBytes(Encoding), new 
AddEntryCallback() {
+                @Override
+                public void addFailed(ManagedLedgerException exception, Object 
ctx) {
+                }
+
+                @Override
+                public void addComplete(Position position, ByteBuf entryData, 
Object ctx) {
+                    lastPosition.set(position);
+                    positions.offer(position);
+                    addEntryLatch.countDown();
+                }
+            }, null);
+        }
+        addEntryLatch.await();
+
+        // With num=100, avoiding flakiness would require a 
Thread.sleep(1000) here to make sure the ledger rollover
+        // is finished, and even that sleep cannot guarantee that c1 is always recovered 
with markDeletePosition 12:9.
+        // Thread.sleep(1000);
+
+        final CountDownLatch markDeleteLatch = new CountDownLatch(1);
+        // Mark delete; this creates ledger 14 because the cursor ledger state is 
NoLedger.
+        // In the flaky num=100 case, the markDelete operation is triggered 
twice:
+        //   1. first by c1.asyncMarkDelete(), with markDeletePosition 
12:9.
+        //   2. second by 
ManagedLedgerImpl.updateLedgersIdsComplete() due to the ledger-full rollover.
+        //      All entries in ledger 12 are consumed, so 
persistentMarkDeletePosition and
+        //      markDeletePosition are moved to 13:-1 due to PR 
https://github.com/apache/pulsar/pull/25087.
+        //      Before that PR, persistentMarkDeletePosition was not moved.
+        // The two markDelete operations are triggered at almost the same time, 
with no ordering guarantee:
+        //   1. the main thread triggers c1.asyncMarkDelete().
+        //   2. the bookkeeper-ml-scheduler-OrderedScheduler-0-0 thread triggers 
creation of ledger 13 due to the ledger-full
+        //      rollover in OpAddEntry.
+        // OpAddEntry closes the ledger and creates a new one when closeWhenDone is 
true.
+        // In the ManagedLedgerImpl class, the MetaStoreCallback cb calls 
maybeUpdateCursorBeforeTrimmingConsumedLedger(),
+        // which calls cursor.asyncMarkDelete(), so the markDelete operation in the 
ledger rollover may execute after
+        // AddEntryCallback.addComplete(). The root cause is that 
cursor.asyncMarkDelete() does not propagate completion or
+        // failure to its caller's callback.
+        c1.asyncMarkDelete(lastPosition.get(), new MarkDeleteCallback() {
+            @Override
+            public void markDeleteFailed(ManagedLedgerException exception, 
Object ctx) {
+            }
+
+            @Override
+            public void markDeleteComplete(Object ctx) {
+                markDeleteLatch.countDown();
+            }
+        }, null);
+        markDeleteLatch.await();
+        assertEquals(c1.getNumberOfEntries(), 0);
+
+        // Reopen
+        @Cleanup("shutdown") ManagedLedgerFactory factory2 = new 
ManagedLedgerFactoryImpl(metadataStore, bkc);
+        // Recovery open async_mark_delete_blocking_test_ledger ledger, create 
ledger 15.
+        // When executing 
ManagedLedgerImpl.maybeUpdateCursorBeforeTrimmingConsumedLedger(), the 
curPointedLedger is 13,
+        // the nextPointedLedger is 15, ledger 13 only has 1 consumed entry 
13:0,
+        // so we will move markDeletePosition to 15:-1, see PR 
https://github.com/apache/pulsar/pull/25087.
+        ledger = factory2.open("async_mark_delete_blocking_test_ledger");
+        ManagedCursor c2 = ledger.openCursor("c1");
+
+        log.info("positions size: {}, positions: {}", positions.size(), 
positions);
+        // To make sure 
ManagedLedgerImpl.maybeUpdateCursorBeforeTrimmingConsumedLedger() is completed, 
we should
+        // wait until c2.getMarkDeletedPosition() equals 15:-1, see PR 
https://github.com/apache/pulsar/pull/25087.
+        Awaitility.await()
+                .untilAsserted(() -> 
assertTrue(c2.getMarkDeletedPosition().compareTo(lastPosition.get()) > 0));

Review Comment:
   ```suggestion
                   .untilAsserted(() -> 
assertThat(c2.getMarkDeletedPosition()).isGreaterThan(lastPosition.get()));
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to