oneby-wang commented on code in PR #25101:
URL: https://github.com/apache/pulsar/pull/25101#discussion_r2639836537
##########
managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java:
##########
@@ -1385,16 +1391,182 @@ public void markDeleteComplete(Object ctx) {
}
latch.await();
-
assertEquals(c1.getNumberOfEntries(), 0);
+ // Sleep 1s here to wait ledger rollover finished
+ Thread.sleep(1000);
+
// Reopen
- @Cleanup("shutdown")
- ManagedLedgerFactory factory2 = new
ManagedLedgerFactoryImpl(metadataStore, bkc);
+ @Cleanup("shutdown") ManagedLedgerFactory factory2 = new
ManagedLedgerFactoryImpl(metadataStore, bkc);
+ // flaky test case: factory2.open() may throw
MetadataStoreException$BadVersionException, race condition:
+ // 1. my_test_ledger ledger rollover triggers cursor.asyncMarkDelete()
operation.
+ // 2. factory2.open() triggers ledger recovery, read versionA
ManagedLedgerInfo of my_test_ledger ledger.
+ // 3. cursor.asyncMarkDelete() triggers
MetaStoreImpl.asyncUpdateLedgerIds(), update versionB ManagedLedgerInfo
+ // into metaStore.
+ // 4. factory2.open() triggers MetaStoreImpl.asyncUpdateLedgerIds(),
update versionA ManagedLedgerInfo
+ // into metaStore, then throws BadVersionException and moves
my_test_ledger ledger to fenced state.
+ // See PR https://github.com/apache/pulsar/pull/25087.
+ // Recovery open async_mark_delete_blocking_test_ledger ledger,
ledgerId++
ledger = factory2.open("my_test_ledger");
ManagedCursor c2 = ledger.openCursor("c1");
- assertEquals(c2.getMarkDeletedPosition(), lastPosition.get());
+ // Three cases:
+ // 1. cursor recovered with lastPosition markDeletePosition
+ // 2. cursor recovered with (lastPositionLedgerId+1:-1)
markDeletePosition, cursor ledger not rolled over, we
+ // move markDeletePosition to (lastPositionLegderId+2:-1)
+ // 3. cursor recovered with (lastPositionLedgerId+1:-1)
markDeletePosition, cursor ledger rolled over, we
+ // move markDeletePosition to (lastPositionLegderId+3:-1)
+ // See PR https://github.com/apache/pulsar/pull/25087.
+ log.info("c2 markDeletePosition: {}, lastPosition: {}",
c2.getMarkDeletedPosition(), lastPosition);
+ long lastPositionLedgerId = lastPosition.get().getLedgerId();
+ Awaitility.await().untilAsserted(() ->
+ assertTrue(
+ c2.getMarkDeletedPosition().equals(lastPosition.get())
+ ||
c2.getMarkDeletedPosition().equals(PositionFactory.create(lastPositionLedgerId
+ 2, -1))
+ ||
c2.getMarkDeletedPosition().equals(PositionFactory.create(lastPositionLedgerId
+ 3, -1))
+ )
+ );
+ }
+
+ @Test(timeOut = 20000)
+ public void asyncMarkDeleteBlockingWithOneShot() throws Exception {
+ ManagedLedgerConfig config = new ManagedLedgerConfig();
+ config.setMaxEntriesPerLedger(10);
+ // open async_mark_delete_blocking_test_ledger ledger, create ledger 3.
+ ManagedLedger ledger =
factory.open("async_mark_delete_blocking_test_ledger", config);
+ final ManagedCursor c1 = ledger.openCursor("c1");
+ final AtomicReference<Position> lastPosition = new AtomicReference<>();
+ // just for log debug purpose
+ Deque<Position> positions = new ConcurrentLinkedDeque<>();
+
+ // In previous flaky test, we set num=100, PR
https://github.com/apache/pulsar/pull/25087 will make the test
+ // more flaky. Flaky case:
+ // 1. cursor recovered with markDeletePosition 12:9,
persistentMarkDeletePosition 12:9.
+ // 2. cursor recovered with mark markDeletePosition 13:-1,
persistentMarkDeletePosition 13:-1.
+ // Here, we set num to 101, make sure the ledger 13 is created and
become the active(last) ledger,
+ // and cursor will always be recovered with markDeletePosition 13:0,
persistentMarkDeletePosition 13:0.
+ final int num = 101;
+ final CountDownLatch addEntryLatch = new CountDownLatch(num);
+ // 10 entries per ledger, create ledger 4~13
+ for (int i = 0; i < num; i++) {
+ String entryStr = "entry-" + i;
+ ledger.asyncAddEntry(entryStr.getBytes(Encoding), new
AddEntryCallback() {
+ @Override
+ public void addFailed(ManagedLedgerException exception, Object
ctx) {
+ }
+
+ @Override
+ public void addComplete(Position position, ByteBuf entryData,
Object ctx) {
+ lastPosition.set(position);
+ positions.offer(position);
+ addEntryLatch.countDown();
+ }
+ }, null);
+ }
+ addEntryLatch.await();
+ assertEquals(lastPosition.get(), PositionFactory.create(13, 0));
Review Comment:
> Unit tests should be tied to implementation details too closely.
Got. Thanks for clearing up. Seems typo? Unit tests shouldn't be tied to
implementation details too closely.
I'll modify all these assetions.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]