lhotari commented on code in PR #25117:
URL: https://github.com/apache/pulsar/pull/25117#discussion_r2765423672


##########
managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java:
##########
@@ -2257,37 +2257,51 @@ public void asyncMarkDelete(final Position position, 
Map<String, Long> propertie
             log.debug("[{}] Mark delete cursor {} up to position: {}", 
ledger.getName(), name, position);
         }
 
+        // Snapshot all positions into local variables to avoid race condition.
         Position newPosition = ackBatchPosition(position);
+        Position moveForwardPosition = newPosition;
         Position markDeletePos = markDeletePosition;
         Position lastConfirmedEntry = ledger.getLastConfirmedEntry();
-        if (lastConfirmedEntry.compareTo(newPosition) < 0) {
-            boolean shouldCursorMoveForward = false;
-            try {
-                long ledgerEntries = 
ledger.getLedgerInfo(markDeletePos.getLedgerId()).get().getEntries();
+        boolean shouldCursorMoveForward = false;
+        try {
+            if (lastConfirmedEntry.getLedgerId() >= newPosition.getLedgerId()) 
{
+                LedgerInfo curMarkDeleteLedgerInfo = 
ledger.getLedgerInfo(newPosition.getLedgerId()).get();
+                Long nextValidLedger = 
ledger.getNextValidLedger(newPosition.getLedgerId());
+                shouldCursorMoveForward = (nextValidLedger != null)
+                        && (curMarkDeleteLedgerInfo != null
+                        && newPosition.getEntryId() + 1 >= 
curMarkDeleteLedgerInfo.getEntries());
+                if (shouldCursorMoveForward) {
+                    moveForwardPosition = 
PositionFactory.create(nextValidLedger, -1);
+                }

Review Comment:
   This won't behave in the expected way in all cases. the `getNextValidLedger` 
will return the given ledger id if it's already the last ledger.
   Therefore `moveForwardPosition = PositionFactory.create(nextValidLedger, 
-1);` would result in the wrong behavior. 
   
   btw. For a `LedgerInfo` instance, it's currently possible to detect the 
rollover by checking the `timestamp` field. It's `!= 0` for rolled over ledgers.
   



##########
managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java:
##########
@@ -2257,37 +2257,51 @@ public void asyncMarkDelete(final Position position, 
Map<String, Long> propertie
             log.debug("[{}] Mark delete cursor {} up to position: {}", 
ledger.getName(), name, position);
         }
 
+        // Snapshot all positions into local variables to avoid race condition.
         Position newPosition = ackBatchPosition(position);
+        Position moveForwardPosition = newPosition;
         Position markDeletePos = markDeletePosition;
         Position lastConfirmedEntry = ledger.getLastConfirmedEntry();
-        if (lastConfirmedEntry.compareTo(newPosition) < 0) {
-            boolean shouldCursorMoveForward = false;
-            try {
-                long ledgerEntries = 
ledger.getLedgerInfo(markDeletePos.getLedgerId()).get().getEntries();
+        boolean shouldCursorMoveForward = false;
+        try {
+            if (lastConfirmedEntry.getLedgerId() >= newPosition.getLedgerId()) 
{
+                LedgerInfo curMarkDeleteLedgerInfo = 
ledger.getLedgerInfo(newPosition.getLedgerId()).get();
+                Long nextValidLedger = 
ledger.getNextValidLedger(newPosition.getLedgerId());
+                shouldCursorMoveForward = (nextValidLedger != null)
+                        && (curMarkDeleteLedgerInfo != null
+                        && newPosition.getEntryId() + 1 >= 
curMarkDeleteLedgerInfo.getEntries());
+                if (shouldCursorMoveForward) {
+                    moveForwardPosition = 
PositionFactory.create(nextValidLedger, -1);
+                }

Review Comment:
   Another detail of batch acknowledgements can be seen in `ackBatchPosition` 
method. If the last entry isn't fully acknowledged, the acked position would be 
the previous position. I'm not sure if this impacts this logic.
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to