This is an automated email from the ASF dual-hosted git repository. lhotari pushed a commit to branch branch-4.1 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit e55a7262daf8c14794036a6f9b621878472d22bf Author: Penghui Li <[email protected]> AuthorDate: Fri Jan 16 02:52:59 2026 -0800 [fix][broker] Fence reset cursor by timestamp to avoid concurrent timestamp-based position lookups (#25151) (cherry picked from commit 6a3066d33024af2f30011a165e25a451e1074377) --- .../service/persistent/PersistentSubscription.java | 23 ++++++-- .../broker/service/SubscriptionSeekTest.java | 62 ++++++++++++++++++++++ 2 files changed, 81 insertions(+), 4 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java index 786b902b3ba..a5cd832e99f 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java @@ -835,7 +835,12 @@ public class PersistentSubscription extends AbstractSubscription { @Override public CompletableFuture<Void> resetCursor(long timestamp) { - CompletableFuture<Void> future = new CompletableFuture<>(); + if (!IS_FENCED_UPDATER.compareAndSet(PersistentSubscription.this, FALSE, TRUE)) { + return CompletableFuture.failedFuture(new SubscriptionBusyException("Failed to fence subscription")); + } + + final CompletableFuture<Void> future = new CompletableFuture<>(); + inProgressResetCursorFuture = future; PersistentMessageFinder persistentMessageFinder = new PersistentMessageFinder(topicName, cursor, config.getManagedLedgerCursorResetLedgerCloseTimestampMaxClockSkewMillis()); @@ -854,6 +859,8 @@ public class PersistentSubscription extends AbstractSubscription { log.warn("[{}][{}] Unable to find position for timestamp {}." + " Unable to reset cursor to first position", topicName, subName, timestamp); + IS_FENCED_UPDATER.set(PersistentSubscription.this, FALSE); + inProgressResetCursorFuture = null; future.completeExceptionally( new SubscriptionInvalidCursorPosition( "Unable to find position for specified timestamp")); @@ -866,7 +873,7 @@ public class PersistentSubscription extends AbstractSubscription { } else { finalPosition = position.getNext(); } - CompletableFuture<Void> resetCursorFuture = resetCursor(finalPosition); + CompletableFuture<Void> resetCursorFuture = resetCursorInternal(finalPosition, future, true); FutureUtil.completeAfter(future, resetCursorFuture); } @@ -874,6 +881,8 @@ public class PersistentSubscription extends AbstractSubscription { public void findEntryFailed(ManagedLedgerException exception, Optional<Position> failedReadPosition, Object ctx) { // todo - what can go wrong here that needs to be retried? + IS_FENCED_UPDATER.set(PersistentSubscription.this, FALSE); + inProgressResetCursorFuture = null; if (exception instanceof ConcurrentFindCursorPositionException) { future.completeExceptionally(new SubscriptionBusyException(exception.getMessage())); } else { @@ -887,11 +896,17 @@ public class PersistentSubscription extends AbstractSubscription { @Override public CompletableFuture<Void> resetCursor(Position finalPosition) { - if (!IS_FENCED_UPDATER.compareAndSet(PersistentSubscription.this, FALSE, TRUE)) { + final CompletableFuture<Void> future = new CompletableFuture<>(); + return resetCursorInternal(finalPosition, future, false); + } + + private CompletableFuture<Void> resetCursorInternal(Position finalPosition, CompletableFuture<Void> future, + boolean alreadyFenced) { + if (!alreadyFenced + && !IS_FENCED_UPDATER.compareAndSet(PersistentSubscription.this, FALSE, TRUE)) { return CompletableFuture.failedFuture(new SubscriptionBusyException("Failed to fence subscription")); } - final CompletableFuture<Void> future = new CompletableFuture<>(); inProgressResetCursorFuture = future; final CompletableFuture<Void> disconnectFuture; diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SubscriptionSeekTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SubscriptionSeekTest.java index 2b9924d1d5b..556a3124522 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SubscriptionSeekTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SubscriptionSeekTest.java @@ -20,12 +20,17 @@ package org.apache.pulsar.broker.service; import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatThrownBy; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyBoolean; +import static org.mockito.Mockito.doAnswer; +import static org.mockito.Mockito.spy; import static org.testng.Assert.assertEquals; import static org.testng.Assert.assertFalse; import static org.testng.Assert.assertNotNull; import static org.testng.Assert.assertNull; import static org.testng.Assert.assertTrue; import static org.testng.Assert.fail; +import java.lang.reflect.Field; import java.util.ArrayList; import java.util.Arrays; import java.util.HashMap; @@ -36,6 +41,7 @@ import java.util.Map; import java.util.Set; import java.util.UUID; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CyclicBarrier; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; @@ -457,6 +463,62 @@ public class SubscriptionSeekTest extends BrokerTestBase { } } + @Test + public void testConcurrentResetCursorByTimestamp() throws Exception { + final String topicName = "persistent://prop/use/ns-abc/testConcurrentResetTimestamp_" + + System.currentTimeMillis(); + final String subscriptionName = "test-sub-name"; + + @Cleanup + Producer<byte[]> producer = pulsarClient.newProducer().topic(topicName).create(); + + admin.topics().createSubscription(topicName, subscriptionName, MessageId.earliest); + + PersistentTopic topicRef = (PersistentTopic) pulsar.getBrokerService().getTopicReference(topicName).get(); + assertNotNull(topicRef); + assertEquals(topicRef.getProducers().size(), 1); + + for (int i = 0; i < 10; i++) { + String message = "my-message-" + i; + producer.send(message.getBytes()); + } + + PersistentSubscription subscription = (PersistentSubscription) topicRef.getSubscription(subscriptionName); + + ManagedCursor originalCursor = subscription.getCursor(); + ManagedCursor spyCursor = spy(originalCursor); + + Field cursorField = PersistentSubscription.class.getDeclaredField("cursor"); + cursorField.setAccessible(true); + cursorField.set(subscription, spyCursor); + + AtomicInteger findCallCount = new AtomicInteger(0); + doAnswer(invocation -> { + findCallCount.incrementAndGet(); + return invocation.callRealMethod(); + }).when(spyCursor).asyncFindNewestMatching(any(), any(), any(), any(), any(), any(), anyBoolean()); + + long resetTimestamp = System.currentTimeMillis(); + List<CompletableFuture<Void>> futures = new ArrayList<>(); + CyclicBarrier barrier = new CyclicBarrier(4); + + for (int i = 0; i < 4; i++) { + CompletableFuture<Void> future = CompletableFuture.runAsync(() -> { + try { + barrier.await(); + admin.topics().resetCursor(topicName, subscriptionName, resetTimestamp); + } catch (Exception e) { + } + }); + futures.add(future); + } + + CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).join(); + + assertEquals(findCallCount.get(), 1, + "asyncFindNewestMatching should only be called once due to subscription fencing"); + } + @Test public void testSeekOnPartitionedTopic() throws Exception { final String topicName = "persistent://prop/use/ns-abc/testSeekPartitions";
