This is an automated email from the ASF dual-hosted git repository.

yubiao pushed a commit to branch branch-3.3
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-3.3 by this push:
     new 00ee67df999 [improve][broker] Optimize subscription seek (cursor 
reset) by timestamp (#22792)
00ee67df999 is described below

commit 00ee67df999a6c9a6a60502a08ce5274c30ae49d
Author: 道君 <[email protected]>
AuthorDate: Thu Jan 9 21:05:39 2025 +0800

    [improve][broker] Optimize subscription seek (cursor reset) by timestamp 
(#22792)
    
    Co-authored-by: Lari Hotari <[email protected]>
    (cherry picked from commit 2eb4eabc84f68fef5b29d894631c7c23d06ec3af)
---
 .../apache/bookkeeper/mledger/ManagedCursor.java   |  25 ++
 .../apache/bookkeeper/mledger/ManagedLedger.java   |   6 +
 .../bookkeeper/mledger/impl/ManagedCursorImpl.java |  58 ++--
 .../bookkeeper/mledger/impl/ManagedCursorTest.java | 295 +++++++++++++++++++++
 .../apache/pulsar/broker/ServiceConfiguration.java |  18 ++
 .../broker/service/TopicPoliciesService.java       |  19 +-
 .../persistent/PersistentMessageFinder.java        |  69 ++++-
 .../service/persistent/PersistentSubscription.java |  17 +-
 .../service/PersistentMessageFinderTest.java       | 242 ++++++++++++++++-
 .../offload/jcloud/impl/MockManagedLedger.java     |   6 +
 10 files changed, 716 insertions(+), 39 deletions(-)

diff --git 
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedCursor.java 
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedCursor.java
index 227b5429abf..221ef56696b 100644
--- 
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedCursor.java
+++ 
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedCursor.java
@@ -658,6 +658,31 @@ public interface ManagedCursor {
     void asyncFindNewestMatching(FindPositionConstraint constraint, 
Predicate<Entry> condition,
             FindEntryCallback callback, Object ctx, boolean isFindFromLedger);
 
+
+    /**
+     * Find the newest entry that matches the given predicate.
+     *
+     * @param constraint
+     *            search only active entries or all entries
+     * @param condition
+     *            predicate that reads an entry and applies a condition
+     * @param callback
+     *            callback object returning the resultant position
+     * @param startPosition
+     *           start position to search from.
+     * @param endPosition
+     *          end position to search to.
+     * @param ctx
+     *            opaque context
+     * @param isFindFromLedger
+     *            find the newest entry from ledger
+     */
+    default void asyncFindNewestMatching(FindPositionConstraint constraint, 
Predicate<Entry> condition,
+                                 PositionImpl startPosition, PositionImpl 
endPosition, FindEntryCallback callback,
+                                 Object ctx, boolean isFindFromLedger) {
+        asyncFindNewestMatching(constraint, condition, callback, ctx, 
isFindFromLedger);
+    }
+
     /**
      * reset the cursor to specified position to enable replay of messages.
      *
diff --git 
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedger.java 
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedger.java
index 955a0d78502..c9507e38065 100644
--- 
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedger.java
+++ 
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedger.java
@@ -20,6 +20,7 @@ package org.apache.bookkeeper.mledger;
 
 import io.netty.buffer.ByteBuf;
 import java.util.Map;
+import java.util.NavigableMap;
 import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
 import java.util.function.Predicate;
@@ -696,4 +697,9 @@ public interface ManagedLedger {
      * Check if managed ledger should cache backlog reads.
      */
     void checkCursorsToCacheEntries();
+
+    /**
+     * Get the ledger info of all the ledgers in this managed ledger.
+     */
+    NavigableMap<Long, LedgerInfo> getLedgersInfo();
 }
diff --git 
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
 
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
index fce2a26e156..2005d3c08ef 100644
--- 
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
+++ 
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
@@ -1288,28 +1288,56 @@ public class ManagedCursorImpl implements ManagedCursor 
{
 
     @Override
     public void asyncFindNewestMatching(FindPositionConstraint constraint, 
Predicate<Entry> condition,
-            FindEntryCallback callback, Object ctx, boolean isFindFromLedger) {
-        OpFindNewest op;
-        PositionImpl startPosition = null;
-        long max = 0;
+                                        FindEntryCallback callback, Object 
ctx, boolean isFindFromLedger) {
+        asyncFindNewestMatching(constraint, condition, null, null, callback, 
ctx,
+                isFindFromLedger);
+    }
+
+
+    @Override
+    public void asyncFindNewestMatching(FindPositionConstraint constraint, 
Predicate<Entry> condition,
+                                        PositionImpl start, PositionImpl end, 
FindEntryCallback callback,
+                                        Object ctx, boolean isFindFromLedger) {
+        PositionImpl startPosition;
         switch (constraint) {
-        case SearchAllAvailableEntries:
-            startPosition = (PositionImpl) getFirstPosition();
-            max = ledger.getNumberOfEntries() - 1;
-            break;
-        case SearchActiveEntries:
-            startPosition = ledger.getNextValidPosition(markDeletePosition);
-            max = getNumberOfEntriesInStorage();
-            break;
-        default:
-            callback.findEntryFailed(new ManagedLedgerException("Unknown 
position constraint"), Optional.empty(), ctx);
-            return;
+            case SearchAllAvailableEntries ->
+                    startPosition = start == null ?  (PositionImpl) 
getFirstPosition() : start;
+            case SearchActiveEntries -> {
+                if (start == null) {
+                    startPosition = 
ledger.getNextValidPosition(markDeletePosition);
+                } else {
+                    startPosition = start;
+                    startPosition = 
startPosition.compareTo(markDeletePosition) <= 0
+                            ? ledger.getNextValidPosition(startPosition) : 
startPosition;
+                }
+            }
+            default -> {
+                callback.findEntryFailed(
+                        new ManagedLedgerException("Unknown position 
constraint"), Optional.empty(), ctx);
+                return;
+            }
         }
+        // startPosition can't be null, should never go here.
         if (startPosition == null) {
             callback.findEntryFailed(new ManagedLedgerException("Couldn't find 
start position"),
                     Optional.empty(), ctx);
             return;
         }
+        // Calculate the end position
+        PositionImpl endPosition = end == null ? ledger.lastConfirmedEntry : 
end;
+        endPosition = endPosition.compareTo(ledger.lastConfirmedEntry) > 0 ? 
ledger.lastConfirmedEntry : endPosition;
+        // Calculate the number of entries between the startPosition and 
endPosition
+        long max = 0;
+        if (startPosition.compareTo(endPosition) <= 0) {
+            max = ledger.getNumberOfEntries(Range.closed(startPosition, 
endPosition));
+        }
+
+        if (max <= 0) {
+            callback.findEntryComplete(null, ctx);
+            return;
+        }
+
+        OpFindNewest op;
         if (isFindFromLedger) {
             op = new OpFindNewest(this.ledger, startPosition, condition, max, 
callback, ctx);
         } else {
diff --git 
a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java
 
b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java
index a4bc7fad3ed..24e3da16e77 100644
--- 
a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java
+++ 
b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java
@@ -4902,5 +4902,300 @@ public class ManagedCursorTest extends 
MockedBookKeeperTestCase {
         ml.delete();
     }
 
+    static PositionImpl transferToPosImpl(Position position) {
+        return new PositionImpl(position.getLedgerId(), position.getEntryId());
+    }
+
+    @Test
+    public void 
testFindNewestMatching_SearchAllAvailableEntries_ByStartAndEnd() throws 
Exception {
+        ManagedLedgerConfig managedLedgerConfig = new ManagedLedgerConfig();
+        managedLedgerConfig.setMaxEntriesPerLedger(2);
+        managedLedgerConfig.setMinimumRolloverTime(0, TimeUnit.MILLISECONDS);
+        @Cleanup
+        ManagedLedgerImpl ledger = (ManagedLedgerImpl) 
factory.open("testFindNewestMatching_SearchAllAvailableEntries_ByStartAndEnd", 
managedLedgerConfig);
+        @Cleanup
+        ManagedCursor managedCursor = ledger.openCursor("test");
+
+        Position position = ledger.addEntry("test".getBytes(Encoding));
+        Position position1 = ledger.addEntry("test1".getBytes(Encoding));
+        Position position2 = ledger.addEntry("test2".getBytes(Encoding));
+        Position position3 = ledger.addEntry("test3".getBytes(Encoding));
+
+        Predicate<Entry> condition = entry -> {
+            try {
+                Position p = entry.getPosition();
+                return 
transferToPosImpl(p).compareTo(transferToPosImpl(position1)) <= 0;
+            } finally {
+                entry.release();
+            }
+        };
+
+        // find the newest entry with start and end position
+        AtomicBoolean failed = new AtomicBoolean(false);
+        CountDownLatch latch = new CountDownLatch(1);
+        AtomicReference<Position> positionRef = new AtomicReference<>();
+        
managedCursor.asyncFindNewestMatching(ManagedCursor.FindPositionConstraint.SearchAllAvailableEntries,
 condition, transferToPosImpl(position), transferToPosImpl(position2), new 
AsyncCallbacks.FindEntryCallback() {
+            @Override
+            public void findEntryComplete(Position position, Object ctx) {
+                positionRef.set(position);
+                latch.countDown();
+            }
+
+            @Override
+            public void findEntryFailed(ManagedLedgerException exception, 
Optional<Position> failedReadPosition, Object ctx) {
+                failed.set(true);
+                latch.countDown();
+            }
+        }, null, true);
+
+        latch.await();
+        assertFalse(failed.get());
+        assertNotNull(positionRef.get());
+        assertEquals(positionRef.get(), position1);
+
+        // find the newest entry with start
+        AtomicBoolean failed1 = new AtomicBoolean(false);
+        CountDownLatch latch1 = new CountDownLatch(1);
+        AtomicReference<Position> positionRef1 = new AtomicReference<>();
+        
managedCursor.asyncFindNewestMatching(ManagedCursor.FindPositionConstraint.SearchAllAvailableEntries,
 condition, transferToPosImpl(position), null, new 
AsyncCallbacks.FindEntryCallback() {
+            @Override
+            public void findEntryComplete(Position position, Object ctx) {
+                positionRef1.set(position);
+                latch1.countDown();
+            }
+
+            @Override
+            public void findEntryFailed(ManagedLedgerException exception, 
Optional<Position> failedReadPosition, Object ctx) {
+                failed1.set(true);
+                latch1.countDown();
+            }
+        }, null, true);
+        latch1.await();
+        assertFalse(failed1.get());
+        assertNotNull(positionRef1.get());
+        assertEquals(positionRef1.get(), position1);
+
+        // find the newest entry with end
+        AtomicBoolean failed2 = new AtomicBoolean(false);
+        CountDownLatch latch2 = new CountDownLatch(1);
+        AtomicReference<Position> positionRef2 = new AtomicReference<>();
+        
managedCursor.asyncFindNewestMatching(ManagedCursor.FindPositionConstraint.SearchAllAvailableEntries,
 condition, null, transferToPosImpl(position2), new 
AsyncCallbacks.FindEntryCallback() {
+            @Override
+            public void findEntryComplete(Position position, Object ctx) {
+                positionRef2.set(position);
+                latch2.countDown();
+            }
+
+            @Override
+            public void findEntryFailed(ManagedLedgerException exception, 
Optional<Position> failedReadPosition, Object ctx) {
+                failed2.set(true);
+                latch2.countDown();
+            }
+        }, null, true);
+        latch2.await();
+        assertFalse(failed2.get());
+        assertNotNull(positionRef2.get());
+        assertEquals(positionRef2.get(), position1);
+
+        // find the newest entry without start and end position
+        AtomicBoolean failed3 = new AtomicBoolean(false);
+        CountDownLatch latch3 = new CountDownLatch(1);
+        AtomicReference<Position> positionRef3 = new AtomicReference<>();
+        
managedCursor.asyncFindNewestMatching(ManagedCursor.FindPositionConstraint.SearchAllAvailableEntries,
 condition, null, null, new AsyncCallbacks.FindEntryCallback() {
+            @Override
+            public void findEntryComplete(Position position, Object ctx) {
+                positionRef3.set(position);
+                latch3.countDown();
+            }
+
+            @Override
+            public void findEntryFailed(ManagedLedgerException exception, 
Optional<Position> failedReadPosition, Object ctx) {
+                failed3.set(true);
+                latch3.countDown();
+            }
+        }, null, true);
+        latch3.await();
+        assertFalse(failed3.get());
+        assertNotNull(positionRef3.get());
+        assertEquals(positionRef3.get(), position1);
+
+        // find position3
+        AtomicBoolean failed4 = new AtomicBoolean(false);
+        CountDownLatch latch4 = new CountDownLatch(1);
+        AtomicReference<Position> positionRef4 = new AtomicReference<>();
+        
managedCursor.asyncFindNewestMatching(ManagedCursor.FindPositionConstraint.SearchAllAvailableEntries,
 entry -> {
+            try {
+                Position p = entry.getPosition();
+                return 
transferToPosImpl(p).compareTo(transferToPosImpl(position3)) <= 0;
+            } finally {
+                entry.release();
+            }
+        }, transferToPosImpl(position3), transferToPosImpl(position3), new 
AsyncCallbacks.FindEntryCallback() {
+            @Override
+            public void findEntryComplete(Position position, Object ctx) {
+                positionRef4.set(position);
+                latch4.countDown();
+            }
+
+            @Override
+            public void findEntryFailed(ManagedLedgerException exception, 
Optional<Position> failedReadPosition, Object ctx) {
+                failed4.set(true);
+                latch4.countDown();
+            }
+        }, null, true);
+        latch4.await();
+        assertFalse(failed4.get());
+        assertNotNull(positionRef4.get());
+        assertEquals(positionRef4.get(), position3);
+    }
+
+
+    @Test
+    public void testFindNewestMatching_SearchActiveEntries_ByStartAndEnd() 
throws Exception {
+        ManagedLedgerConfig managedLedgerConfig = new ManagedLedgerConfig();
+        managedLedgerConfig.setMaxEntriesPerLedger(2);
+        managedLedgerConfig.setMinimumRolloverTime(0, TimeUnit.MILLISECONDS);
+        @Cleanup
+        ManagedLedgerImpl ledger = (ManagedLedgerImpl) 
factory.open("testFindNewestMatching_SearchActiveEntries_ByStartAndEnd", 
managedLedgerConfig);
+        @Cleanup
+        ManagedCursorImpl managedCursor = (ManagedCursorImpl) 
ledger.openCursor("test");
+
+        Position position = ledger.addEntry("test".getBytes(Encoding));
+        Position position1 = ledger.addEntry("test1".getBytes(Encoding));
+        Position position2 = ledger.addEntry("test2".getBytes(Encoding));
+        Position position3 = ledger.addEntry("test3".getBytes(Encoding));
+        Position position4 = ledger.addEntry("test4".getBytes(Encoding));
+        managedCursor.markDelete(position1);
+        assertEquals(managedCursor.getNumberOfEntries(), 3);
+
+        Predicate<Entry> condition = entry -> {
+            try {
+                PositionImpl p = (PositionImpl) entry.getPosition();
+                return 
transferToPosImpl(p).compareTo(transferToPosImpl(position3)) <= 0;
+            } finally {
+                entry.release();
+            }
+        };
+
+        // find the newest entry with start and end position
+        AtomicBoolean failed = new AtomicBoolean(false);
+        CountDownLatch latch = new CountDownLatch(1);
+        AtomicReference<Position> positionRef = new AtomicReference<>();
+        
managedCursor.asyncFindNewestMatching(ManagedCursor.FindPositionConstraint.SearchActiveEntries,
 condition, transferToPosImpl(position2), transferToPosImpl(position4), new 
AsyncCallbacks.FindEntryCallback() {
+            @Override
+            public void findEntryComplete(Position position, Object ctx) {
+                positionRef.set(position);
+                latch.countDown();
+            }
+
+            @Override
+            public void findEntryFailed(ManagedLedgerException exception, 
Optional<Position> failedReadPosition, Object ctx) {
+                failed.set(true);
+                latch.countDown();
+            }
+        }, null, true);
+        latch.await();
+        assertFalse(failed.get());
+        assertNotNull(positionRef.get());
+        assertEquals(positionRef.get(), position3);
+
+        // find the newest entry with start
+        AtomicBoolean failed1 = new AtomicBoolean(false);
+        CountDownLatch latch1 = new CountDownLatch(1);
+        AtomicReference<Position> positionRef1 = new AtomicReference<>();
+        
managedCursor.asyncFindNewestMatching(ManagedCursor.FindPositionConstraint.SearchActiveEntries,
 condition, transferToPosImpl(position2), null, new 
AsyncCallbacks.FindEntryCallback() {
+            @Override
+            public void findEntryComplete(Position position, Object ctx) {
+                positionRef1.set(position);
+                latch1.countDown();
+            }
+
+            @Override
+            public void findEntryFailed(ManagedLedgerException exception, 
Optional<Position> failedReadPosition, Object ctx) {
+                failed1.set(true);
+                latch1.countDown();
+            }
+        }, null, true);
+
+        latch1.await();
+        assertFalse(failed1.get());
+        assertNotNull(positionRef1.get());
+        assertEquals(positionRef1.get(), position3);
+
+        // find the newest entry with end
+        AtomicBoolean failed2 = new AtomicBoolean(false);
+        CountDownLatch latch2 = new CountDownLatch(1);
+        AtomicReference<Position> positionRef2 = new AtomicReference<>();
+        
managedCursor.asyncFindNewestMatching(ManagedCursor.FindPositionConstraint.SearchActiveEntries,
 condition, null, transferToPosImpl(position4), new 
AsyncCallbacks.FindEntryCallback() {
+            @Override
+            public void findEntryComplete(Position position, Object ctx) {
+                positionRef2.set(position);
+                latch2.countDown();
+            }
+
+            @Override
+            public void findEntryFailed(ManagedLedgerException exception, 
Optional<Position> failedReadPosition, Object ctx) {
+                failed2.set(true);
+                latch2.countDown();
+            }
+        }, null, true);
+
+        latch2.await();
+        assertFalse(failed2.get());
+        assertNotNull(positionRef2.get());
+        assertEquals(positionRef2.get(), position3);
+
+        // find the newest entry without start and end position
+        AtomicBoolean failed3 = new AtomicBoolean(false);
+        CountDownLatch latch3 = new CountDownLatch(1);
+        AtomicReference<Position> positionRef3 = new AtomicReference<>();
+        
managedCursor.asyncFindNewestMatching(ManagedCursor.FindPositionConstraint.SearchActiveEntries,
 condition, null, null, new AsyncCallbacks.FindEntryCallback() {
+            @Override
+            public void findEntryComplete(Position position, Object ctx) {
+                positionRef3.set(position);
+                latch3.countDown();
+            }
+
+            @Override
+            public void findEntryFailed(ManagedLedgerException exception, 
Optional<Position> failedReadPosition, Object ctx) {
+                failed3.set(true);
+                latch3.countDown();
+            }
+        }, null, true);
+        latch3.await();
+        assertFalse(failed3.get());
+        assertNotNull(positionRef3.get());
+        assertEquals(positionRef3.get(), position3);
+
+        // find position4
+        AtomicBoolean failed4 = new AtomicBoolean(false);
+        CountDownLatch latch4 = new CountDownLatch(1);
+        AtomicReference<Position> positionRef4 = new AtomicReference<>();
+        
managedCursor.asyncFindNewestMatching(ManagedCursor.FindPositionConstraint.SearchActiveEntries,
 entry -> {
+            try {
+                Position p = entry.getPosition();
+                return 
transferToPosImpl(p).compareTo(transferToPosImpl(position4)) <= 0;
+            } finally {
+                entry.release();
+            }
+        }, transferToPosImpl(position4), transferToPosImpl(position4), new 
AsyncCallbacks.FindEntryCallback() {
+            @Override
+            public void findEntryComplete(Position position, Object ctx) {
+                positionRef4.set(position);
+                latch4.countDown();
+            }
+
+            @Override
+            public void findEntryFailed(ManagedLedgerException exception, 
Optional<Position> failedReadPosition, Object ctx) {
+                failed4.set(true);
+                latch4.countDown();
+            }
+        }, null, true);
+        latch4.await();
+        assertFalse(failed4.get());
+        assertNotNull(positionRef4.get());
+        assertEquals(positionRef4.get(), position4);
+    }
+
     private static final Logger log = 
LoggerFactory.getLogger(ManagedCursorTest.class);
 }
diff --git 
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
 
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
index 039b27e32b6..57ea9838b2d 100644
--- 
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
+++ 
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
@@ -2167,6 +2167,24 @@ public class ServiceConfiguration implements 
PulsarConfiguration {
         doc = "Max time before triggering a rollover on a cursor ledger"
     )
     private int managedLedgerCursorRolloverTimeInSeconds = 14400;
+
+    @FieldContext(
+            category = CATEGORY_STORAGE_ML,
+            dynamic = true,
+            doc = "When resetting a subscription by timestamp, the broker will 
use the"
+                    + " ledger closing timestamp metadata to determine the 
range of ledgers"
+                    + " to search for the message where the subscription 
position is reset to. "
+                    + " Since by default, the search condition is based on the 
message publish time provided by the "
+                    + " client at the publish time, there will be some clock 
skew between the ledger closing timestamp "
+                    + " metadata and the publish time."
+                    + " This configuration is used to set the max clock skew 
between the ledger closing"
+                    + " timestamp and the message publish time for finding the 
range of ledgers to open for searching."
+                    + " The default value is 60000 milliseconds (60 seconds). 
When set to -1, the broker will not"
+                    + " use the ledger closing timestamp metadata to determine 
the range of ledgers to search for the"
+                    + " message."
+    )
+    private int managedLedgerCursorResetLedgerCloseTimestampMaxClockSkewMillis 
= 60000;
+
     @FieldContext(
         category = CATEGORY_STORAGE_ML,
         doc = "Max number of `acknowledgment holes` that are going to be 
persistently stored.\n\n"
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/TopicPoliciesService.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/TopicPoliciesService.java
index eca31ec230a..f60e6047f56 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/TopicPoliciesService.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/TopicPoliciesService.java
@@ -22,7 +22,6 @@ import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
-import javax.annotation.Nonnull;
 import 
org.apache.pulsar.broker.service.BrokerServiceException.TopicPoliciesCacheNotInitException;
 import org.apache.pulsar.client.util.RetryUtil;
 import org.apache.pulsar.common.classification.InterfaceStability;
@@ -32,7 +31,7 @@ import org.apache.pulsar.common.policies.data.TopicPolicies;
 import org.apache.pulsar.common.util.Backoff;
 import org.apache.pulsar.common.util.BackoffBuilder;
 import org.apache.pulsar.common.util.FutureUtil;
-import org.jetbrains.annotations.NotNull;
+import org.jspecify.annotations.NonNull;
 
 /**
  * Topic policies service.
@@ -121,8 +120,8 @@ public interface TopicPoliciesService extends AutoCloseable 
{
      * @return A CompletableFuture containing an Optional of TopicPolicies.
      * @throws NullPointerException If the topicName is null.
      */
-    @Nonnull
-    CompletableFuture<Optional<TopicPolicies>> getTopicPoliciesAsync(@Nonnull 
TopicName topicName, boolean isGlobal);
+    @NonNull
+    CompletableFuture<Optional<TopicPolicies>> getTopicPoliciesAsync(@NonNull 
TopicName topicName, boolean isGlobal);
 
     /**
      * Asynchronously retrieves topic policies.
@@ -134,8 +133,8 @@ public interface TopicPoliciesService extends AutoCloseable 
{
      * @return A CompletableFuture containing an Optional of TopicPolicies.
      * @throws NullPointerException If the topicName is null.
      */
-    @Nonnull
-    CompletableFuture<Optional<TopicPolicies>> getTopicPoliciesAsync(@Nonnull 
TopicName topicName);
+    @NonNull
+    CompletableFuture<Optional<TopicPolicies>> getTopicPoliciesAsync(@NonNull 
TopicName topicName);
 
     /**
      * Get policies for a topic without cache async.
@@ -190,16 +189,16 @@ public interface TopicPoliciesService extends 
AutoCloseable {
             return null;
         }
 
-        @NotNull
+        @NonNull
         @Override
-        public CompletableFuture<Optional<TopicPolicies>> 
getTopicPoliciesAsync(@NotNull TopicName topicName,
+        public CompletableFuture<Optional<TopicPolicies>> 
getTopicPoliciesAsync(@NonNull TopicName topicName,
                                                                                
 boolean isGlobal) {
             return CompletableFuture.completedFuture(Optional.empty());
         }
 
-        @NotNull
+        @NonNull
         @Override
-        public CompletableFuture<Optional<TopicPolicies>> 
getTopicPoliciesAsync(@NotNull TopicName topicName) {
+        public CompletableFuture<Optional<TopicPolicies>> 
getTopicPoliciesAsync(@NonNull TopicName topicName) {
             return CompletableFuture.completedFuture(Optional.empty());
         }
 
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentMessageFinder.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentMessageFinder.java
index 08273155e4c..ae08cb7b28d 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentMessageFinder.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentMessageFinder.java
@@ -25,6 +25,9 @@ import org.apache.bookkeeper.mledger.AsyncCallbacks;
 import org.apache.bookkeeper.mledger.ManagedCursor;
 import org.apache.bookkeeper.mledger.ManagedLedgerException;
 import org.apache.bookkeeper.mledger.Position;
+import org.apache.bookkeeper.mledger.impl.PositionImpl;
+import 
org.apache.bookkeeper.mledger.proto.MLDataFormats.ManagedLedgerInfo.LedgerInfo;
+import org.apache.commons.lang3.tuple.Pair;
 import org.apache.pulsar.client.impl.MessageImpl;
 import org.apache.pulsar.common.protocol.Commands;
 import org.apache.pulsar.common.util.Codec;
@@ -37,6 +40,7 @@ import org.slf4j.LoggerFactory;
 public class PersistentMessageFinder implements 
AsyncCallbacks.FindEntryCallback {
     private final ManagedCursor cursor;
     private final String subName;
+    private final int ledgerCloseTimestampMaxClockSkewMillis;
     private final String topicName;
     private long timestamp = 0;
 
@@ -48,19 +52,23 @@ public class PersistentMessageFinder implements 
AsyncCallbacks.FindEntryCallback
             AtomicIntegerFieldUpdater
                     .newUpdater(PersistentMessageFinder.class, 
"messageFindInProgress");
 
-    public PersistentMessageFinder(String topicName, ManagedCursor cursor) {
+    public PersistentMessageFinder(String topicName, ManagedCursor cursor, int 
ledgerCloseTimestampMaxClockSkewMillis) {
         this.topicName = topicName;
         this.cursor = cursor;
         this.subName = Codec.decode(cursor.getName());
+        this.ledgerCloseTimestampMaxClockSkewMillis = 
ledgerCloseTimestampMaxClockSkewMillis;
     }
 
     public void findMessages(final long timestamp, 
AsyncCallbacks.FindEntryCallback callback) {
-        this.timestamp = timestamp;
         if (messageFindInProgressUpdater.compareAndSet(this, FALSE, TRUE)) {
+            this.timestamp = timestamp;
             if (log.isDebugEnabled()) {
                 log.debug("[{}] Starting message position find at timestamp 
{}", subName, timestamp);
             }
-
+            Pair<PositionImpl, PositionImpl> range =
+                    
getFindPositionRange(cursor.getManagedLedger().getLedgersInfo().values(),
+                            (PositionImpl) 
cursor.getManagedLedger().getLastConfirmedEntry(), timestamp,
+                            ledgerCloseTimestampMaxClockSkewMillis);
             
cursor.asyncFindNewestMatching(ManagedCursor.FindPositionConstraint.SearchAllAvailableEntries,
 entry -> {
                 try {
                     long entryTimestamp = 
Commands.getEntryTimestamp(entry.getDataBuffer());
@@ -71,7 +79,7 @@ public class PersistentMessageFinder implements 
AsyncCallbacks.FindEntryCallback
                     entry.release();
                 }
                 return false;
-            }, this, callback, true);
+            }, range.getLeft(), range.getRight(), this, callback, true);
         } else {
             if (log.isDebugEnabled()) {
                 log.debug("[{}][{}] Ignore message position find scheduled 
task, last find is still running", topicName,
@@ -83,6 +91,59 @@ public class PersistentMessageFinder implements 
AsyncCallbacks.FindEntryCallback
         }
     }
 
+    public static Pair<PositionImpl, PositionImpl> 
getFindPositionRange(Iterable<LedgerInfo> ledgerInfos,
+                                                                PositionImpl 
lastConfirmedEntry, long targetTimestamp,
+                                                                int 
ledgerCloseTimestampMaxClockSkewMillis) {
+        if (ledgerCloseTimestampMaxClockSkewMillis < 0) {
+            // this feature is disabled when the value is negative
+            return Pair.of(null, null);
+        }
+
+        long targetTimestampMin = targetTimestamp - 
ledgerCloseTimestampMaxClockSkewMillis;
+        long targetTimestampMax = targetTimestamp + 
ledgerCloseTimestampMaxClockSkewMillis;
+
+        PositionImpl start = null;
+        PositionImpl end = null;
+
+        LedgerInfo secondToLastLedgerInfo = null;
+        LedgerInfo lastLedgerInfo = null;
+        for (LedgerInfo info : ledgerInfos) {
+            if (!info.hasTimestamp()) {
+                // unexpected case, don't set start and end
+                return Pair.of(null, null);
+            }
+            secondToLastLedgerInfo = lastLedgerInfo;
+            lastLedgerInfo = info;
+            long closeTimestamp = info.getTimestamp();
+            // For an open ledger, closeTimestamp is 0
+            if (closeTimestamp == 0) {
+                end = null;
+                break;
+            }
+            if (closeTimestamp <= targetTimestampMin) {
+                start = new PositionImpl(info.getLedgerId(), 0);
+            } else if (closeTimestamp > targetTimestampMax) {
+                // Ledger closed after the target timestamp (plus clock skew): end the search at its last entry
+                end = new PositionImpl(info.getLedgerId(), info.getEntries() - 
1);
+                break;
+            }
+        }
+        // If the second-to-last visited ledger's close timestamp is less than the 
target timestamp minus the clock skew, then start from the
+        // first entry of the last visited ledger if the last confirmed entry is at or 
after it, otherwise from the last confirmed entry
+        if (lastLedgerInfo != null && secondToLastLedgerInfo != null
+                && secondToLastLedgerInfo.getTimestamp() > 0
+                && secondToLastLedgerInfo.getTimestamp() < targetTimestampMin) 
{
+            PositionImpl firstPositionInLedger = new 
PositionImpl(lastLedgerInfo.getLedgerId(), 0);
+            if (lastConfirmedEntry != null
+                    && lastConfirmedEntry.compareTo(firstPositionInLedger) >= 
0) {
+                start = firstPositionInLedger;
+            } else {
+                start = lastConfirmedEntry;
+            }
+        }
+        return Pair.of(start, end);
+    }
+
     private static final Logger log = 
LoggerFactory.getLogger(PersistentMessageFinder.class);
 
     @Override
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
index 4ae289d7bae..72b47fdbf96 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
@@ -136,6 +136,7 @@ public class PersistentSubscription extends 
AbstractSubscription {
     private volatile CompletableFuture<Void> fenceFuture;
     private volatile CompletableFuture<Void> inProgressResetCursorFuture;
     private volatile Boolean replicatedControlled;
+    private final ServiceConfiguration config;
 
     static Map<String, Long> getBaseCursorProperties(Boolean isReplicated) {
         return isReplicated != null && isReplicated ? 
REPLICATED_SUBSCRIPTION_CURSOR_PROPERTIES :
@@ -154,6 +155,7 @@ public class PersistentSubscription extends 
AbstractSubscription {
     public PersistentSubscription(PersistentTopic topic, String 
subscriptionName, ManagedCursor cursor,
                                   Boolean replicated, Map<String, String> 
subscriptionProperties) {
         this.topic = topic;
+        this.config = topic.getBrokerService().getPulsar().getConfig();
         this.cursor = cursor;
         this.topicName = topic.getName();
         this.subName = subscriptionName;
@@ -164,7 +166,7 @@ public class PersistentSubscription extends 
AbstractSubscription {
         }
         this.subscriptionProperties = MapUtils.isEmpty(subscriptionProperties)
                 ? Collections.emptyMap() : 
Collections.unmodifiableMap(subscriptionProperties);
-        if 
(topic.getBrokerService().getPulsar().getConfig().isTransactionCoordinatorEnabled()
+        if (config.isTransactionCoordinatorEnabled()
                 && !isEventSystemTopic(TopicName.get(topicName))
                 && !ExtensibleLoadManagerImpl.isInternalTopic(topicName)) {
             this.pendingAckHandle = new PendingAckHandleImpl(this);
@@ -201,7 +203,6 @@ public class PersistentSubscription extends 
AbstractSubscription {
 
     public boolean setReplicated(boolean replicated) {
         replicatedControlled = replicated;
-        ServiceConfiguration config = 
topic.getBrokerService().getPulsar().getConfig();
 
         if (!replicated || !config.isEnableReplicatedSubscriptions()) {
             this.replicatedSubscriptionSnapshotCache = null;
@@ -409,7 +410,7 @@ public class PersistentSubscription extends 
AbstractSubscription {
                 log.debug("[{}][{}] Individual acks on {}", topicName, 
subName, positions);
             }
             cursor.asyncDelete(positions, deleteCallback, 
previousMarkDeletePosition);
-            if 
(topic.getBrokerService().getPulsar().getConfig().isTransactionCoordinatorEnabled())
 {
+            if (config.isTransactionCoordinatorEnabled()) {
                 positions.forEach(position -> {
                     if (((ManagedCursorImpl) 
cursor).isMessageDeleted(position)) {
                         pendingAckHandle.clearIndividualPosition(position);
@@ -585,10 +586,9 @@ public class PersistentSubscription extends 
AbstractSubscription {
         final EntryFilterSupport entryFilterSupport = dispatcher != null
                 ? (EntryFilterSupport) dispatcher : new 
EntryFilterSupport(this);
         // we put some hard limits on the scan, in order to prevent denial of 
services
-        ServiceConfiguration configuration = 
topic.getBrokerService().getPulsar().getConfiguration();
-        long maxEntries = configuration.getSubscriptionBacklogScanMaxEntries();
-        long timeOutMs = configuration.getSubscriptionBacklogScanMaxTimeMs();
-        int batchSize = configuration.getDispatcherMaxReadBatchSize();
+        long maxEntries = config.getSubscriptionBacklogScanMaxEntries();
+        long timeOutMs = config.getSubscriptionBacklogScanMaxTimeMs();
+        int batchSize = config.getDispatcherMaxReadBatchSize();
         AtomicReference<Position> firstPosition = new AtomicReference<>();
         AtomicReference<Position> lastPosition = new AtomicReference<>();
         final Predicate<Entry> condition = entry -> {
@@ -763,7 +763,8 @@ public class PersistentSubscription extends 
AbstractSubscription {
     @Override
     public CompletableFuture<Void> resetCursor(long timestamp) {
         CompletableFuture<Void> future = new CompletableFuture<>();
-        PersistentMessageFinder persistentMessageFinder = new 
PersistentMessageFinder(topicName, cursor);
+        PersistentMessageFinder persistentMessageFinder = new 
PersistentMessageFinder(topicName, cursor,
+                
config.getManagedLedgerCursorResetLedgerCloseTimestampMaxClockSkewMillis());
 
         if (log.isDebugEnabled()) {
             log.debug("[{}][{}] Resetting subscription to timestamp {}", 
topicName, subName, timestamp);
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentMessageFinderTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentMessageFinderTest.java
index 0972c9098b5..806bd797fc8 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentMessageFinderTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentMessageFinderTest.java
@@ -59,6 +59,7 @@ import org.apache.bookkeeper.mledger.impl.PositionImpl;
 import 
org.apache.bookkeeper.mledger.proto.MLDataFormats.ManagedLedgerInfo.LedgerInfo;
 import org.apache.bookkeeper.test.MockedBookKeeperTestCase;
 import org.apache.commons.lang3.reflect.FieldUtils;
+import org.apache.commons.lang3.tuple.Pair;
 import 
org.apache.pulsar.broker.service.persistent.PersistentMessageExpiryMonitor;
 import org.apache.pulsar.broker.service.persistent.PersistentMessageFinder;
 import org.apache.pulsar.broker.service.persistent.PersistentSubscription;
@@ -138,7 +139,7 @@ public class PersistentMessageFinderTest extends 
MockedBookKeeperTestCase {
     }
 
     CompletableFuture<Void> findMessage(final Result result, final 
ManagedCursor c1, final long timestamp) {
-        PersistentMessageFinder messageFinder = new 
PersistentMessageFinder("topicname", c1);
+        PersistentMessageFinder messageFinder = new 
PersistentMessageFinder("topicname", c1, 0);
 
         final CompletableFuture<Void> future = new CompletableFuture<>();
         messageFinder.findMessages(timestamp, new 
AsyncCallbacks.FindEntryCallback() {
@@ -217,7 +218,7 @@ public class PersistentMessageFinderTest extends 
MockedBookKeeperTestCase {
         assertNotEquals(result.position, null);
         assertEquals(result.position, lastPosition);
 
-        PersistentMessageFinder messageFinder = new 
PersistentMessageFinder("topicname", c1);
+        PersistentMessageFinder messageFinder = new 
PersistentMessageFinder("topicname", c1, 0);
         final AtomicBoolean ex = new AtomicBoolean(false);
         messageFinder.findEntryFailed(new ManagedLedgerException("failed"), 
Optional.empty(),
                 new AsyncCallbacks.FindEntryCallback() {
@@ -589,4 +590,241 @@ public class PersistentMessageFinderTest extends 
MockedBookKeeperTestCase {
         resetCursorData.setExcluded(true);
         System.out.println(Entity.entity(resetCursorData, 
MediaType.APPLICATION_JSON));
     }
+
+    @Test
+    public void testGetFindPositionRange_EmptyLedgerInfos() {
+        List<LedgerInfo> ledgerInfos = new ArrayList<>();
+        PositionImpl lastConfirmedEntry = null;
+        long targetTimestamp = 2000;
+        Pair<PositionImpl, PositionImpl> range =
+                PersistentMessageFinder.getFindPositionRange(ledgerInfos, 
lastConfirmedEntry, targetTimestamp, 0);
+
+        assertNotNull(range);
+        assertNull(range.getLeft());
+        assertNull(range.getRight());
+    }
+
+    @Test
+    public void testGetFindPositionRange_AllTimestampsLessThanTarget() {
+        List<LedgerInfo> ledgerInfos = new ArrayList<>();
+        
ledgerInfos.add(LedgerInfo.newBuilder().setLedgerId(1).setEntries(10).setTimestamp(1000).build());
+        
ledgerInfos.add(LedgerInfo.newBuilder().setLedgerId(2).setEntries(10).setTimestamp(1500).build());
+        PositionImpl lastConfirmedEntry = new PositionImpl(2, 9);
+
+        long targetTimestamp = 2000;
+        Pair<PositionImpl, PositionImpl> range = 
PersistentMessageFinder.getFindPositionRange(ledgerInfos,
+                lastConfirmedEntry, targetTimestamp, 0);
+
+        assertNotNull(range);
+        assertNotNull(range.getLeft());
+        assertNull(range.getRight());
+        assertEquals(range.getLeft(), new PositionImpl(2, 0));
+    }
+
+    @Test
+    public void testGetFindPositionRange_LastTimestampIsZero() {
+        List<LedgerInfo> ledgerInfos = new ArrayList<>();
+        
ledgerInfos.add(LedgerInfo.newBuilder().setLedgerId(1).setEntries(10).setTimestamp(1000).build());
+        
ledgerInfos.add(LedgerInfo.newBuilder().setLedgerId(2).setEntries(10).setTimestamp(1500).build());
+        
ledgerInfos.add(LedgerInfo.newBuilder().setLedgerId(3).setEntries(10).setTimestamp(0).build());
+        PositionImpl lastConfirmedEntry = new PositionImpl(3, 5);
+
+        long targetTimestamp = 2000;
+        Pair<PositionImpl, PositionImpl> range = 
PersistentMessageFinder.getFindPositionRange(ledgerInfos,
+                lastConfirmedEntry, targetTimestamp, 0);
+
+        assertNotNull(range);
+        assertNotNull(range.getLeft());
+        assertNull(range.getRight());
+        assertEquals(range.getLeft(), new PositionImpl(3, 0));
+    }
+
+    @Test
+    public void testGetFindPositionRange_LastTimestampIsZeroWithNoEntries() {
+        List<LedgerInfo> ledgerInfos = new ArrayList<>();
+        
ledgerInfos.add(LedgerInfo.newBuilder().setLedgerId(1).setEntries(10).setTimestamp(1000).build());
+        
ledgerInfos.add(LedgerInfo.newBuilder().setLedgerId(2).setEntries(10).setTimestamp(1500).build());
+        
ledgerInfos.add(LedgerInfo.newBuilder().setLedgerId(3).setEntries(10).setTimestamp(0).build());
+        PositionImpl lastConfirmedEntry = new PositionImpl(2, 9);
+
+        long targetTimestamp = 2000;
+        Pair<PositionImpl, PositionImpl> range = 
PersistentMessageFinder.getFindPositionRange(ledgerInfos,
+                lastConfirmedEntry, targetTimestamp, 0);
+
+        assertNotNull(range);
+        assertNotNull(range.getLeft());
+        assertNull(range.getRight());
+        assertEquals(range.getLeft(), new PositionImpl(2, 9));
+    }
+
+    @Test
+    public void testGetFindPositionRange_AllTimestampsGreaterThanTarget() {
+        List<LedgerInfo> ledgerInfos = new ArrayList<>();
+        
ledgerInfos.add(LedgerInfo.newBuilder().setLedgerId(1).setEntries(10).setTimestamp(3000).build());
+        
ledgerInfos.add(LedgerInfo.newBuilder().setLedgerId(2).setEntries(10).setTimestamp(4000).build());
+        PositionImpl lastConfirmedEntry = new PositionImpl(2, 9);
+
+        long targetTimestamp = 2000;
+        Pair<PositionImpl, PositionImpl> range = 
PersistentMessageFinder.getFindPositionRange(ledgerInfos,
+                lastConfirmedEntry, targetTimestamp, 0);
+
+        assertNotNull(range);
+        assertNull(range.getLeft());
+        assertNotNull(range.getRight());
+        assertEquals(range.getRight(), new PositionImpl(1, 9));
+    }
+
+    @Test
+    public void testGetFindPositionRange_MixedTimestamps() {
+        List<LedgerInfo> ledgerInfos = new ArrayList<>();
+        
ledgerInfos.add(LedgerInfo.newBuilder().setLedgerId(1).setEntries(10).setTimestamp(1000).build());
+        
ledgerInfos.add(LedgerInfo.newBuilder().setLedgerId(2).setEntries(10).setTimestamp(2000).build());
+        
ledgerInfos.add(LedgerInfo.newBuilder().setLedgerId(3).setEntries(10).setTimestamp(3000).build());
+        PositionImpl lastConfirmedEntry = new PositionImpl(3, 9);
+
+        long targetTimestamp = 2500;
+        Pair<PositionImpl, PositionImpl> range = 
PersistentMessageFinder.getFindPositionRange(ledgerInfos,
+                lastConfirmedEntry, targetTimestamp, 0);
+
+        assertNotNull(range);
+        assertNotNull(range.getLeft());
+        assertNotNull(range.getRight());
+        assertEquals(range.getLeft(), new PositionImpl(3, 0));
+        assertEquals(range.getRight(), new PositionImpl(3, 9));
+    }
+
+    @Test
+    public void testGetFindPositionRange_TimestampAtBoundary() {
+        List<LedgerInfo> ledgerInfos = new ArrayList<>();
+        
ledgerInfos.add(LedgerInfo.newBuilder().setLedgerId(1).setEntries(10).setTimestamp(1000).build());
+        
ledgerInfos.add(LedgerInfo.newBuilder().setLedgerId(2).setEntries(10).setTimestamp(2000).build());
+        
ledgerInfos.add(LedgerInfo.newBuilder().setLedgerId(3).setEntries(10).setTimestamp(3000).build());
+        
ledgerInfos.add(LedgerInfo.newBuilder().setLedgerId(4).setEntries(10).setTimestamp(4000).build());
+        PositionImpl lastConfirmedEntry = new PositionImpl(4, 9);
+
+        long targetTimestamp = 3000;
+        Pair<PositionImpl, PositionImpl> range = 
PersistentMessageFinder.getFindPositionRange(ledgerInfos,
+                lastConfirmedEntry, targetTimestamp, 0);
+
+        assertNotNull(range);
+        assertNotNull(range.getLeft());
+        assertNotNull(range.getRight());
+        assertEquals(range.getLeft(), new PositionImpl(3, 0));
+        // there might be entries in the next ledger with the same timestamp 
as the target timestamp, even though
+        // the close timestamp of ledger 3 is equal to the target timestamp
+        assertEquals(range.getRight(), new PositionImpl(4, 9));
+    }
+
+    @Test
+    public void testGetFindPositionRange_ClockSkew() {
+        List<LedgerInfo> ledgerInfos = new ArrayList<>();
+        
ledgerInfos.add(LedgerInfo.newBuilder().setLedgerId(1).setEntries(10).setTimestamp(1000).build());
+        
ledgerInfos.add(LedgerInfo.newBuilder().setLedgerId(2).setEntries(10).setTimestamp(2000).build());
+        
ledgerInfos.add(LedgerInfo.newBuilder().setLedgerId(3).setEntries(10).setTimestamp(2010).build());
+        
ledgerInfos.add(LedgerInfo.newBuilder().setLedgerId(4).setEntries(10).setTimestamp(4000).build());
+        
ledgerInfos.add(LedgerInfo.newBuilder().setLedgerId(5).setTimestamp(0).build());
+        PositionImpl lastConfirmedEntry = new PositionImpl(5, 5);
+
+        long targetTimestamp = 2009;
+        Pair<PositionImpl, PositionImpl> range = 
PersistentMessageFinder.getFindPositionRange(ledgerInfos,
+                lastConfirmedEntry, targetTimestamp, 10);
+
+        assertNotNull(range);
+        assertNotNull(range.getLeft());
+        assertNotNull(range.getRight());
+        assertEquals(range.getLeft(), new PositionImpl(1, 0));
+        assertEquals(range.getRight(), new PositionImpl(4, 9));
+    }
+
+    @Test
+    public void testGetFindPositionRange_ClockSkewCase2() {
+        List<LedgerInfo> ledgerInfos = new ArrayList<>();
+        
ledgerInfos.add(LedgerInfo.newBuilder().setLedgerId(1).setEntries(10).setTimestamp(1000).build());
+        
ledgerInfos.add(LedgerInfo.newBuilder().setLedgerId(2).setEntries(10).setTimestamp(2000).build());
+        
ledgerInfos.add(LedgerInfo.newBuilder().setLedgerId(3).setEntries(10).setTimestamp(3000).build());
+        
ledgerInfos.add(LedgerInfo.newBuilder().setLedgerId(4).setEntries(10).setTimestamp(4000).build());
+        
ledgerInfos.add(LedgerInfo.newBuilder().setLedgerId(5).setTimestamp(0).build());
+        PositionImpl lastConfirmedEntry = new PositionImpl(5, 5);
+
+        long targetTimestamp = 2995;
+        Pair<PositionImpl, PositionImpl> range = 
PersistentMessageFinder.getFindPositionRange(ledgerInfos,
+                lastConfirmedEntry, targetTimestamp, 10);
+
+        assertNotNull(range);
+        assertNotNull(range.getLeft());
+        assertNotNull(range.getRight());
+        assertEquals(range.getLeft(), new PositionImpl(2, 0));
+        assertEquals(range.getRight(), new PositionImpl(4, 9));
+    }
+
+    @Test
+    public void testGetFindPositionRange_ClockSkewCase3() {
+        List<LedgerInfo> ledgerInfos = new ArrayList<>();
+        
ledgerInfos.add(LedgerInfo.newBuilder().setLedgerId(1).setEntries(10).setTimestamp(1000).build());
+        
ledgerInfos.add(LedgerInfo.newBuilder().setLedgerId(2).setEntries(10).setTimestamp(2000).build());
+        
ledgerInfos.add(LedgerInfo.newBuilder().setLedgerId(3).setEntries(10).setTimestamp(3000).build());
+        
ledgerInfos.add(LedgerInfo.newBuilder().setLedgerId(4).setEntries(10).setTimestamp(4000).build());
+        
ledgerInfos.add(LedgerInfo.newBuilder().setLedgerId(5).setTimestamp(0).build());
+        PositionImpl lastConfirmedEntry = new PositionImpl(5, 5);
+
+        long targetTimestamp = 3005;
+        Pair<PositionImpl, PositionImpl> range = 
PersistentMessageFinder.getFindPositionRange(ledgerInfos,
+                lastConfirmedEntry, targetTimestamp, 10);
+
+        assertNotNull(range);
+        assertNotNull(range.getLeft());
+        assertNotNull(range.getRight());
+        assertEquals(range.getLeft(), new PositionImpl(2, 0));
+        assertEquals(range.getRight(), new PositionImpl(4, 9));
+    }
+
+    @Test
+    public void 
testGetFindPositionRange_FeatureDisabledWithNegativeClockSkew() {
+        List<LedgerInfo> ledgerInfos = new ArrayList<>();
+        
ledgerInfos.add(LedgerInfo.newBuilder().setLedgerId(1).setEntries(10).setTimestamp(1000).build());
+        
ledgerInfos.add(LedgerInfo.newBuilder().setLedgerId(2).setEntries(10).setTimestamp(2000).build());
+        
ledgerInfos.add(LedgerInfo.newBuilder().setLedgerId(3).setEntries(10).setTimestamp(2010).build());
+        
ledgerInfos.add(LedgerInfo.newBuilder().setLedgerId(4).setEntries(10).setTimestamp(4000).build());
+        
ledgerInfos.add(LedgerInfo.newBuilder().setLedgerId(5).setTimestamp(0).build());
+        PositionImpl lastConfirmedEntry = new PositionImpl(5, 5);
+
+        long targetTimestamp = 2009;
+        Pair<PositionImpl, PositionImpl> range = 
PersistentMessageFinder.getFindPositionRange(ledgerInfos,
+                lastConfirmedEntry, targetTimestamp, -1);
+
+        assertNotNull(range);
+        assertNull(range.getLeft());
+        assertNull(range.getRight());
+    }
+
+    @Test
+    public void testGetFindPositionRange_SingleLedger() {
+        List<LedgerInfo> ledgerInfos = new ArrayList<>();
+        
ledgerInfos.add(LedgerInfo.newBuilder().setLedgerId(1).setTimestamp(0).build());
+        PositionImpl lastConfirmedEntry = new PositionImpl(1, 5);
+
+        long targetTimestamp = 2500;
+        Pair<PositionImpl, PositionImpl> range = 
PersistentMessageFinder.getFindPositionRange(ledgerInfos,
+                lastConfirmedEntry, targetTimestamp, 0);
+
+        assertNotNull(range);
+        assertNull(range.getLeft());
+        assertNull(range.getRight());
+    }
+
+    @Test
+    public void testGetFindPositionRange_SingleClosedLedger() {
+        List<LedgerInfo> ledgerInfos = new ArrayList<>();
+        
ledgerInfos.add(LedgerInfo.newBuilder().setLedgerId(1).setEntries(10).setTimestamp(1000).build());
+        PositionImpl lastConfirmedEntry = new PositionImpl(1, 9);
+
+        long targetTimestamp = 2500;
+        Pair<PositionImpl, PositionImpl> range = 
PersistentMessageFinder.getFindPositionRange(ledgerInfos,
+                lastConfirmedEntry, targetTimestamp, 0);
+
+        assertNotNull(range);
+        assertNotNull(range.getLeft());
+        assertNull(range.getRight());
+        assertEquals(range.getLeft(), new PositionImpl(1, 0));
+    }
 }
diff --git 
a/tiered-storage/jcloud/src/test/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/MockManagedLedger.java
 
b/tiered-storage/jcloud/src/test/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/MockManagedLedger.java
index 8f52d20c5ee..756e1a30f92 100644
--- 
a/tiered-storage/jcloud/src/test/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/MockManagedLedger.java
+++ 
b/tiered-storage/jcloud/src/test/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/MockManagedLedger.java
@@ -20,6 +20,7 @@ package org.apache.bookkeeper.mledger.offload.jcloud.impl;
 
 import io.netty.buffer.ByteBuf;
 import java.util.Map;
+import java.util.NavigableMap;
 import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
 import java.util.function.Predicate;
@@ -381,6 +382,11 @@ public class MockManagedLedger implements ManagedLedger {
         // no-op
     }
 
+    @Override
+    public NavigableMap<Long, LedgerInfo> getLedgersInfo() {
+        return null;
+    }
+
     @Override
     public CompletableFuture<Position> asyncMigrate() {
         // no-op

Reply via email to