Copilot commented on code in PR #25117:
URL: https://github.com/apache/pulsar/pull/25117#discussion_r2657626689


##########
managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java:
##########
@@ -2181,35 +2181,51 @@ public void asyncMarkDelete(final Position position, Map<String, Long> propertie
             log.debug("[{}] Mark delete cursor {} up to position: {}", ledger.getName(), name, position);
         }
 
+        // Snapshot all positions into local variables to avoid race condition.
         Position newPosition = ackBatchPosition(position);
-        if (ledger.getLastConfirmedEntry().compareTo(newPosition) < 0) {
-            boolean shouldCursorMoveForward = false;
-            try {
-                long ledgerEntries = ledger.getLedgerInfo(markDeletePosition.getLedgerId()).get().getEntries();
-                Long nextValidLedger = ledger.getNextValidLedger(ledger.getLastConfirmedEntry().getLedgerId());
-                shouldCursorMoveForward = nextValidLedger != null
-                        && (markDeletePosition.getEntryId() + 1 >= ledgerEntries)
-                        && (newPosition.getLedgerId() == nextValidLedger);
-            } catch (Exception e) {
-                log.warn("Failed to get ledger entries while setting mark-delete-position", e);
+        Position moveForwardPosition = newPosition;
+        Position markDeletePos = markDeletePosition;
+        Position lastConfirmedEntry = ledger.getLastConfirmedEntry();
+        boolean shouldCursorMoveForward = false;
+        try {
+            if (lastConfirmedEntry.getLedgerId() >= newPosition.getLedgerId()) {
+                LedgerInfo curMarkDeleteledgerInfo = ledger.getLedgerInfo(newPosition.getLedgerId()).get();
+                Long nextValidLedger = ledger.getNextValidLedger(newPosition.getLedgerId());
+                shouldCursorMoveForward = (nextValidLedger != null)
+                        && (curMarkDeleteledgerInfo != null
+                        && newPosition.getEntryId() + 1 >= curMarkDeleteledgerInfo.getEntries());

Review Comment:
   The variable name contains a typo: "curMarkDeleteledgerInfo" should be 
"curMarkDeleteLedgerInfo" (capital 'L' in 'Ledger'). The inconsistent 
camel-casing makes the code harder to read.
   ```suggestion
                   LedgerInfo curMarkDeleteLedgerInfo = ledger.getLedgerInfo(newPosition.getLedgerId()).get();
                   Long nextValidLedger = ledger.getNextValidLedger(newPosition.getLedgerId());
                   shouldCursorMoveForward = (nextValidLedger != null)
                           && (curMarkDeleteLedgerInfo != null
                           && newPosition.getEntryId() + 1 >= curMarkDeleteLedgerInfo.getEntries());
   ```



##########
pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactedTopicTest.java:
##########
@@ -821,14 +823,20 @@ public void testReadCompactedLatestMessageWithInclusive() throws Exception {
         lastMessage.join();
         admin.topics().unload(topic);
         admin.topics().triggerCompaction(topic);
+        final MessageId finalLastMessage = lastMessage.get();
         Awaitility.await().untilAsserted(() -> {
             PersistentTopicInternalStats stats = admin.topics().getInternalStats(topic);
             Assert.assertNotEquals(stats.compactedLedger.ledgerId, -1);
             Assert.assertEquals(stats.compactedLedger.entries, numMessages);
             Assert.assertEquals(admin.topics().getStats(topic)
                     .getSubscriptions().get(COMPACTION_SUBSCRIPTION).getConsumers().size(), 0);
-            Assert.assertEquals(stats.lastConfirmedEntry,
-                    stats.cursors.get(COMPACTION_SUBSCRIPTION).markDeletePosition);
+            long ledgerId = ((MessageIdImpl) finalLastMessage).getLedgerId();
+            long entryId = ((MessageIdImpl) finalLastMessage).getEntryId();
+            Assert.assertEquals(stats.lastConfirmedEntry, PositionFactory.create(ledgerId, entryId).toString());
+            // New ledger create, move markDeletePosition to currentLedgerId:-1

Review Comment:
   The comment "New ledger create" should be "New ledger created" for 
grammatical correctness.
   ```suggestion
               // New ledger created, move markDeletePosition to currentLedgerId:-1
   ```



##########
pulsar-broker/src/test/java/org/apache/pulsar/broker/service/MessageTTLTest.java:
##########
@@ -113,12 +112,15 @@ public void testMessageExpiryAfterTopicUnload() throws Exception {
                 .pollDelay(3, TimeUnit.SECONDS).untilAsserted(() -> {
             this.runMessageExpiryCheck();
             log.info("***** run message expiry now");
-            // verify that the markDeletePosition was moved forward, and exacly to the last message
+            // verify that the markDeletePosition was moved forward, and exactly to the last message
             PersistentTopicInternalStats internalStatsAfterExpire = admin.topics().getInternalStats(topicName);
             CursorStats statsAfterExpire = internalStatsAfterExpire.cursors.get(subscriptionName);
             log.info("markDeletePosition after expire {}", statsAfterExpire.markDeletePosition);
-            assertEquals(statsAfterExpire.markDeletePosition, PositionFactory.create(lastMessageId.getLedgerId(),
-                    lastMessageId.getEntryId()).toString());
+            // New ledger create, move markDeletePosition to currentLedgerId:-1

Review Comment:
   The comment "New ledger create" should be "New ledger created" for 
grammatical correctness.
   ```suggestion
               // New ledger created, move markDeletePosition to currentLedgerId:-1
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
