This is an automated email from the ASF dual-hosted git repository.

lhotari pushed a commit to branch branch-4.0
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 174a42401f83f719b990838e4744f619cb8b274f
Author: Penghui Li <[email protected]>
AuthorDate: Fri Jan 16 02:52:59 2026 -0800

    [fix][broker] Fence reset cursor by timestamp to avoid concurrent 
timestamp-based position lookups (#25151)
    
    (cherry picked from commit 6a3066d33024af2f30011a165e25a451e1074377)
---
 .../service/persistent/PersistentSubscription.java | 23 ++++++--
 .../broker/service/SubscriptionSeekTest.java       | 62 ++++++++++++++++++++++
 2 files changed, 81 insertions(+), 4 deletions(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
index 9e821925e28..d23db646963 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
@@ -817,7 +817,12 @@ public class PersistentSubscription extends 
AbstractSubscription {
 
     @Override
     public CompletableFuture<Void> resetCursor(long timestamp) {
-        CompletableFuture<Void> future = new CompletableFuture<>();
+        if (!IS_FENCED_UPDATER.compareAndSet(PersistentSubscription.this, 
FALSE, TRUE)) {
+            return CompletableFuture.failedFuture(new 
SubscriptionBusyException("Failed to fence subscription"));
+        }
+
+        final CompletableFuture<Void> future = new CompletableFuture<>();
+        inProgressResetCursorFuture = future;
         PersistentMessageFinder persistentMessageFinder = new 
PersistentMessageFinder(topicName, cursor,
                 
config.getManagedLedgerCursorResetLedgerCloseTimestampMaxClockSkewMillis());
 
@@ -836,6 +841,8 @@ public class PersistentSubscription extends 
AbstractSubscription {
                         log.warn("[{}][{}] Unable to find position for 
timestamp {}."
                                         + " Unable to reset cursor to first 
position",
                                 topicName, subName, timestamp);
+                        IS_FENCED_UPDATER.set(PersistentSubscription.this, 
FALSE);
+                        inProgressResetCursorFuture = null;
                         future.completeExceptionally(
                                 new SubscriptionInvalidCursorPosition(
                                         "Unable to find position for specified 
timestamp"));
@@ -848,7 +855,7 @@ public class PersistentSubscription extends 
AbstractSubscription {
                 } else {
                     finalPosition = position.getNext();
                 }
-                CompletableFuture<Void> resetCursorFuture = 
resetCursor(finalPosition);
+                CompletableFuture<Void> resetCursorFuture = 
resetCursorInternal(finalPosition, future, true);
                 FutureUtil.completeAfter(future, resetCursorFuture);
             }
 
@@ -856,6 +863,8 @@ public class PersistentSubscription extends 
AbstractSubscription {
             public void findEntryFailed(ManagedLedgerException exception,
                                         Optional<Position> failedReadPosition, 
Object ctx) {
                 // todo - what can go wrong here that needs to be retried?
+                IS_FENCED_UPDATER.set(PersistentSubscription.this, FALSE);
+                inProgressResetCursorFuture = null;
                 if (exception instanceof 
ConcurrentFindCursorPositionException) {
                     future.completeExceptionally(new 
SubscriptionBusyException(exception.getMessage()));
                 } else {
@@ -869,11 +878,17 @@ public class PersistentSubscription extends 
AbstractSubscription {
 
     @Override
     public CompletableFuture<Void> resetCursor(Position finalPosition) {
-        if (!IS_FENCED_UPDATER.compareAndSet(PersistentSubscription.this, 
FALSE, TRUE)) {
+        final CompletableFuture<Void> future = new CompletableFuture<>();
+        return resetCursorInternal(finalPosition, future, false);
+    }
+
+    private CompletableFuture<Void> resetCursorInternal(Position 
finalPosition, CompletableFuture<Void> future,
+                                                        boolean alreadyFenced) 
{
+        if (!alreadyFenced
+                && 
!IS_FENCED_UPDATER.compareAndSet(PersistentSubscription.this, FALSE, TRUE)) {
             return CompletableFuture.failedFuture(new 
SubscriptionBusyException("Failed to fence subscription"));
         }
 
-        final CompletableFuture<Void> future = new CompletableFuture<>();
         inProgressResetCursorFuture = future;
         final CompletableFuture<Void> disconnectFuture;
 
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SubscriptionSeekTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SubscriptionSeekTest.java
index ec16cc7d9eb..b95615990a0 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SubscriptionSeekTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SubscriptionSeekTest.java
@@ -20,12 +20,17 @@ package org.apache.pulsar.broker.service;
 
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.assertj.core.api.Assertions.assertThatThrownBy;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyBoolean;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.spy;
 import static org.testng.Assert.assertEquals;
 import static org.testng.Assert.assertFalse;
 import static org.testng.Assert.assertNotNull;
 import static org.testng.Assert.assertNull;
 import static org.testng.Assert.assertTrue;
 import static org.testng.Assert.fail;
+import java.lang.reflect.Field;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.HashMap;
@@ -36,6 +41,7 @@ import java.util.Map;
 import java.util.Set;
 import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CyclicBarrier;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
@@ -458,6 +464,62 @@ public class SubscriptionSeekTest extends BrokerTestBase {
         }
     }
 
+    @Test
+    public void testConcurrentResetCursorByTimestamp() throws Exception {
+        final String topicName = 
"persistent://prop/use/ns-abc/testConcurrentResetTimestamp_"
+                + System.currentTimeMillis();
+        final String subscriptionName = "test-sub-name";
+
+        @Cleanup
+        Producer<byte[]> producer = 
pulsarClient.newProducer().topic(topicName).create();
+
+        admin.topics().createSubscription(topicName, subscriptionName, 
MessageId.earliest);
+
+        PersistentTopic topicRef = (PersistentTopic) 
pulsar.getBrokerService().getTopicReference(topicName).get();
+        assertNotNull(topicRef);
+        assertEquals(topicRef.getProducers().size(), 1);
+
+        for (int i = 0; i < 10; i++) {
+            String message = "my-message-" + i;
+            producer.send(message.getBytes());
+        }
+
+        PersistentSubscription subscription = (PersistentSubscription) 
topicRef.getSubscription(subscriptionName);
+
+        ManagedCursor originalCursor = subscription.getCursor();
+        ManagedCursor spyCursor = spy(originalCursor);
+
+        Field cursorField = 
PersistentSubscription.class.getDeclaredField("cursor");
+        cursorField.setAccessible(true);
+        cursorField.set(subscription, spyCursor);
+
+        AtomicInteger findCallCount = new AtomicInteger(0);
+        doAnswer(invocation -> {
+            findCallCount.incrementAndGet();
+            return invocation.callRealMethod();
+        }).when(spyCursor).asyncFindNewestMatching(any(), any(), any(), any(), 
any(), any(), anyBoolean());
+
+        long resetTimestamp = System.currentTimeMillis();
+        List<CompletableFuture<Void>> futures = new ArrayList<>();
+        CyclicBarrier barrier = new CyclicBarrier(4);
+
+        for (int i = 0; i < 4; i++) {
+            CompletableFuture<Void> future = CompletableFuture.runAsync(() -> {
+                try {
+                    barrier.await();
+                    admin.topics().resetCursor(topicName, subscriptionName, 
resetTimestamp);
+                } catch (Exception e) {
+                }
+            });
+            futures.add(future);
+        }
+
+        CompletableFuture.allOf(futures.toArray(new 
CompletableFuture[0])).join();
+
+        assertEquals(findCallCount.get(), 1,
+                "asyncFindNewestMatching should only be called once due to 
subscription fencing");
+    }
+
     @Test
     public void testSeekOnPartitionedTopic() throws Exception {
         final String topicName = 
"persistent://prop/use/ns-abc/testSeekPartitions";

Reply via email to