This is an automated email from the ASF dual-hosted git repository.
lhotari pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 47eec875a81 [fix][broker] Fix PersistentMessageExpiryMonitor
findEntryComplete() method may lose mark-delete properties in race condition
(#25803)
47eec875a81 is described below
commit 47eec875a81d11a030a3781cbb123e273ced5e88
Author: Oneby Wang <[email protected]>
AuthorDate: Mon May 25 23:30:50 2026 +0800
[fix][broker] Fix PersistentMessageExpiryMonitor findEntryComplete() method
may lose mark-delete properties in race condition (#25803)
---
.../persistent/PersistentMessageExpiryMonitor.java | 3 +-
.../service/PersistentMessageFinderTest.java | 58 ++++++++++++++++++++++
2 files changed, 59 insertions(+), 2 deletions(-)
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentMessageExpiryMonitor.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentMessageExpiryMonitor.java
index a9f7e305104..2848ba2a0d2 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentMessageExpiryMonitor.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentMessageExpiryMonitor.java
@@ -245,8 +245,7 @@ public class PersistentMessageExpiryMonitor implements
FindEntryCallback, Messag
.attr("position", position)
.log("Expiring all messages until position");
Position prevMarkDeletePos = cursor.getMarkDeletedPosition();
- cursor.asyncMarkDelete(position, cursor.getProperties(),
markDeleteCallback,
- cursor.getNumberOfEntriesInBacklog(false));
+ cursor.asyncMarkDelete(position, null, markDeleteCallback,
cursor.getNumberOfEntriesInBacklog(false));
if (!Objects.equals(cursor.getMarkDeletedPosition(),
prevMarkDeletePos) && subscription != null) {
subscription.updateLastMarkDeleteAdvancedTimestamp();
}
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentMessageFinderTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentMessageFinderTest.java
index 8d9a2e45780..5bf4e1f97d4 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentMessageFinderTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentMessageFinderTest.java
@@ -19,6 +19,7 @@
package org.apache.pulsar.broker.service;
import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.nullable;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.mock;
@@ -34,11 +35,14 @@ import io.netty.buffer.ByteBuf;
import io.netty.buffer.UnpooledByteBufAllocator;
import java.lang.reflect.Field;
import java.util.ArrayList;
+import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
+import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
@@ -1098,4 +1102,58 @@ public class PersistentMessageFinderTest extends
MockedBookKeeperTestCase {
assertNull(range.getRight());
assertEquals(range.getLeft(), PositionFactory.create(1, 9));
}
+
+ @Test
+ @SuppressWarnings("unchecked")
+ void testExpireMessagesNeverLoseMarkDeleteProperties() throws Exception {
+ final String ledgerAndCursorName =
"testExpireMessagesNeverLoseMarkDeleteProperties";
+
+ ManagedLedgerConfig config = new ManagedLedgerConfig();
+ config.setRetentionSizeInMB(10);
+ config.setRetentionTime(1, TimeUnit.HOURS);
+ ManagedLedgerImpl ledger = (ManagedLedgerImpl)
factory.open(ledgerAndCursorName, config);
+ ManagedCursorImpl cursor = (ManagedCursorImpl)
ledger.openCursor(ledgerAndCursorName);
+ ManagedCursorImpl spyCursor = spy(cursor);
+
+ Position pos1 = ledger.addEntry(createMessageWrittenToLedger("msg-1"));
+ Position pos2 = ledger.addEntry(createMessageWrittenToLedger("msg-2"));
+
+ CountDownLatch expiryMarkDeleteEnteredLatch = new CountDownLatch(1);
+ CountDownLatch cursorMarkDeleteCompletedLatch = new CountDownLatch(1);
+ CountDownLatch expiryMarkDeleteCompletedLatch = new CountDownLatch(1);
+
+ doAnswer(invocation -> {
+ Map<String, Long> invocationProperties = invocation.getArgument(1);
+ // Pause the expiry-triggered mark-delete so the user markDelete()
can complete first.
+ if (invocationProperties == null ||
invocationProperties.isEmpty()) {
+ expiryMarkDeleteEnteredLatch.countDown();
+ assertTrue(cursorMarkDeleteCompletedLatch.await(5,
TimeUnit.SECONDS));
+ try {
+ return invocation.callRealMethod();
+ } finally {
+ expiryMarkDeleteCompletedLatch.countDown();
+ }
+ }
+
+ return invocation.callRealMethod();
+ }).when(spyCursor)
+ .asyncMarkDelete(any(Position.class), nullable(Map.class),
any(AsyncCallbacks.MarkDeleteCallback.class),
+ nullable(Object.class));
+
+ PersistentTopic topic = mockPersistentTopic("topicname");
+ PersistentMessageExpiryMonitor monitor = new
PersistentMessageExpiryMonitor(topic,
+ spyCursor.getName(), spyCursor, null);
+
+ CompletableFuture.runAsync(() -> monitor.findEntryComplete(pos2,
null));
+ assertTrue(expiryMarkDeleteEnteredLatch.await(5, TimeUnit.SECONDS));
+
+ Map<String, Long> properties = new HashMap<>();
+ properties.put("test-property", 1L);
+ spyCursor.markDelete(pos1, properties);
+ cursorMarkDeleteCompletedLatch.countDown();
+
+ assertTrue(expiryMarkDeleteCompletedLatch.await(5, TimeUnit.SECONDS));
+ assertEquals(spyCursor.getMarkDeletedPosition(), pos2);
+ assertEquals(spyCursor.getProperties(), properties);
+ }
}