Copilot commented on code in PR #25117:
URL: https://github.com/apache/pulsar/pull/25117#discussion_r2657626689
##########
managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java:
##########
@@ -2181,35 +2181,51 @@ public void asyncMarkDelete(final Position position,
Map<String, Long> propertie
log.debug("[{}] Mark delete cursor {} up to position: {}",
ledger.getName(), name, position);
}
+ // Snapshot all positions into local variables to avoid race condition.
Position newPosition = ackBatchPosition(position);
- if (ledger.getLastConfirmedEntry().compareTo(newPosition) < 0) {
- boolean shouldCursorMoveForward = false;
- try {
- long ledgerEntries =
ledger.getLedgerInfo(markDeletePosition.getLedgerId()).get().getEntries();
- Long nextValidLedger =
ledger.getNextValidLedger(ledger.getLastConfirmedEntry().getLedgerId());
- shouldCursorMoveForward = nextValidLedger != null
- && (markDeletePosition.getEntryId() + 1 >=
ledgerEntries)
- && (newPosition.getLedgerId() == nextValidLedger);
- } catch (Exception e) {
- log.warn("Failed to get ledger entries while setting
mark-delete-position", e);
+ Position moveForwardPosition = newPosition;
+ Position markDeletePos = markDeletePosition;
+ Position lastConfirmedEntry = ledger.getLastConfirmedEntry();
+ boolean shouldCursorMoveForward = false;
+ try {
+ if (lastConfirmedEntry.getLedgerId() >= newPosition.getLedgerId())
{
+ LedgerInfo curMarkDeleteledgerInfo =
ledger.getLedgerInfo(newPosition.getLedgerId()).get();
+ Long nextValidLedger =
ledger.getNextValidLedger(newPosition.getLedgerId());
+ shouldCursorMoveForward = (nextValidLedger != null)
+ && (curMarkDeleteledgerInfo != null
+ && newPosition.getEntryId() + 1 >=
curMarkDeleteledgerInfo.getEntries());
Review Comment:
Variable name contains a typo. "curMarkDeleteledgerInfo" should be
"curMarkDeleteLedgerInfo" (capital 'L' in 'Ledger'). This inconsistency makes
the code harder to read.
```suggestion
LedgerInfo curMarkDeleteLedgerInfo = ledger.getLedgerInfo(newPosition.getLedgerId()).get();
Long nextValidLedger = ledger.getNextValidLedger(newPosition.getLedgerId());
shouldCursorMoveForward = (nextValidLedger != null)
        && (curMarkDeleteLedgerInfo != null
        && newPosition.getEntryId() + 1 >= curMarkDeleteLedgerInfo.getEntries());
```
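For context, the hunk above adds a comment about snapshotting positions into local variables to avoid a race condition. The following is a minimal, self-contained sketch of that general pattern, not the PR's code: `SnapshotSketch`, `Pos`, and `isAtLedgerEnd` are hypothetical names used only for illustration, and `Pos` is a stand-in rather than Pulsar's `Position` type.

```java
// Hypothetical illustration of "read a shared field once, then use the local copy".
class SnapshotSketch {
    // Immutable stand-in for a position (assumption: not Pulsar's Position).
    static final class Pos {
        final long ledgerId;
        final long entryId;

        Pos(long ledgerId, long entryId) {
            this.ledgerId = ledgerId;
            this.entryId = entryId;
        }
    }

    // Shared field that another thread may replace at any time.
    private volatile Pos markDeletePosition = new Pos(1, 9);

    void update(Pos next) {
        markDeletePosition = next;
    }

    // Reads the shared field exactly once, so ledgerId and entryId below
    // always come from the same Pos instance even if update() runs concurrently.
    boolean isAtLedgerEnd(long ledgerId, long entriesInLedger) {
        Pos snapshot = markDeletePosition;
        return snapshot.ledgerId == ledgerId
                && snapshot.entryId + 1 >= entriesInLedger;
    }

    public static void main(String[] args) {
        SnapshotSketch sketch = new SnapshotSketch();
        System.out.println(sketch.isAtLedgerEnd(1, 10));
        sketch.update(new Pos(2, 0));
        System.out.println(sketch.isAtLedgerEnd(1, 10));
    }
}
```

Reading the field twice instead (once for the ledger ID and once for the entry ID) could mix values from two different positions if the field changes in between, which is the kind of inconsistency a local snapshot avoids.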
##########
pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactedTopicTest.java:
##########
@@ -821,14 +823,20 @@ public void testReadCompactedLatestMessageWithInclusive()
throws Exception {
lastMessage.join();
admin.topics().unload(topic);
admin.topics().triggerCompaction(topic);
+ final MessageId finalLastMessage = lastMessage.get();
Awaitility.await().untilAsserted(() -> {
PersistentTopicInternalStats stats =
admin.topics().getInternalStats(topic);
Assert.assertNotEquals(stats.compactedLedger.ledgerId, -1);
Assert.assertEquals(stats.compactedLedger.entries, numMessages);
Assert.assertEquals(admin.topics().getStats(topic)
.getSubscriptions().get(COMPACTION_SUBSCRIPTION).getConsumers().size(), 0);
- Assert.assertEquals(stats.lastConfirmedEntry,
-
stats.cursors.get(COMPACTION_SUBSCRIPTION).markDeletePosition);
+ long ledgerId = ((MessageIdImpl) finalLastMessage).getLedgerId();
+ long entryId = ((MessageIdImpl) finalLastMessage).getEntryId();
+ Assert.assertEquals(stats.lastConfirmedEntry,
PositionFactory.create(ledgerId, entryId).toString());
+ // New ledger create, move markDeletePosition to currentLedgerId:-1
Review Comment:
The comment "New ledger create" should read "New ledger created" for
grammatical correctness.
```suggestion
// New ledger created, move markDeletePosition to currentLedgerId:-1
```
##########
pulsar-broker/src/test/java/org/apache/pulsar/broker/service/MessageTTLTest.java:
##########
@@ -113,12 +112,15 @@ public void testMessageExpiryAfterTopicUnload() throws
Exception {
.pollDelay(3, TimeUnit.SECONDS).untilAsserted(() -> {
this.runMessageExpiryCheck();
log.info("***** run message expiry now");
- // verify that the markDeletePosition was moved forward, and
exacly to the last message
+ // verify that the markDeletePosition was moved forward, and
exactly to the last message
PersistentTopicInternalStats internalStatsAfterExpire =
admin.topics().getInternalStats(topicName);
CursorStats statsAfterExpire =
internalStatsAfterExpire.cursors.get(subscriptionName);
log.info("markDeletePosition after expire {}",
statsAfterExpire.markDeletePosition);
- assertEquals(statsAfterExpire.markDeletePosition,
PositionFactory.create(lastMessageId.getLedgerId(),
- lastMessageId.getEntryId()).toString());
+ // New ledger create, move markDeletePosition to currentLedgerId:-1
Review Comment:
The comment "New ledger create" should be "New ledger created" for
grammatical correctness.
```suggestion
// New ledger created, move markDeletePosition to currentLedgerId:-1
```