This is an automated email from the ASF dual-hosted git repository.
yubiao pushed a commit to branch branch-3.3
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-3.3 by this push:
new 00ee67df999 [improve][broker] Optimize subscription seek (cursor
reset) by timestamp (#22792)
00ee67df999 is described below
commit 00ee67df999a6c9a6a60502a08ce5274c30ae49d
Author: 道君 <[email protected]>
AuthorDate: Thu Jan 9 21:05:39 2025 +0800
[improve][broker] Optimize subscription seek (cursor reset) by timestamp
(#22792)
Co-authored-by: Lari Hotari <[email protected]>
(cherry picked from commit 2eb4eabc84f68fef5b29d894631c7c23d06ec3af)
---
.../apache/bookkeeper/mledger/ManagedCursor.java | 25 ++
.../apache/bookkeeper/mledger/ManagedLedger.java | 6 +
.../bookkeeper/mledger/impl/ManagedCursorImpl.java | 58 ++--
.../bookkeeper/mledger/impl/ManagedCursorTest.java | 295 +++++++++++++++++++++
.../apache/pulsar/broker/ServiceConfiguration.java | 18 ++
.../broker/service/TopicPoliciesService.java | 19 +-
.../persistent/PersistentMessageFinder.java | 69 ++++-
.../service/persistent/PersistentSubscription.java | 17 +-
.../service/PersistentMessageFinderTest.java | 242 ++++++++++++++++-
.../offload/jcloud/impl/MockManagedLedger.java | 6 +
10 files changed, 716 insertions(+), 39 deletions(-)
diff --git
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedCursor.java
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedCursor.java
index 227b5429abf..221ef56696b 100644
---
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedCursor.java
+++
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedCursor.java
@@ -658,6 +658,31 @@ public interface ManagedCursor {
void asyncFindNewestMatching(FindPositionConstraint constraint,
Predicate<Entry> condition,
FindEntryCallback callback, Object ctx, boolean isFindFromLedger);
+
+ /**
+ * Find the newest entry that matches the given predicate.
+ *
+ * @param constraint
+ * search only active entries or all entries
+ * @param condition
+ * predicate that reads an entry and applies a condition
+ * @param callback
+ * callback object returning the resultant position
+ * @param startPosition
+ * start position to search from.
+ * @param endPosition
+ * end position to search to.
+ * @param ctx
+ * opaque context
+ * @param isFindFromLedger
+ * find the newest entry from ledger
+ */
+ default void asyncFindNewestMatching(FindPositionConstraint constraint,
Predicate<Entry> condition,
+ PositionImpl startPosition, PositionImpl
endPosition, FindEntryCallback callback,
+ Object ctx, boolean isFindFromLedger) {
+ asyncFindNewestMatching(constraint, condition, callback, ctx,
isFindFromLedger);
+ }
+
/**
* reset the cursor to specified position to enable replay of messages.
*
diff --git
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedger.java
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedger.java
index 955a0d78502..c9507e38065 100644
---
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedger.java
+++
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedger.java
@@ -20,6 +20,7 @@ package org.apache.bookkeeper.mledger;
import io.netty.buffer.ByteBuf;
import java.util.Map;
+import java.util.NavigableMap;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.function.Predicate;
@@ -696,4 +697,9 @@ public interface ManagedLedger {
* Check if managed ledger should cache backlog reads.
*/
void checkCursorsToCacheEntries();
+
+ /**
+ * Get the info of all the ledgers in this managed ledger.
+ */
+ NavigableMap<Long, LedgerInfo> getLedgersInfo();
}
diff --git
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
index fce2a26e156..2005d3c08ef 100644
---
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
+++
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
@@ -1288,28 +1288,56 @@ public class ManagedCursorImpl implements ManagedCursor
{
@Override
public void asyncFindNewestMatching(FindPositionConstraint constraint,
Predicate<Entry> condition,
- FindEntryCallback callback, Object ctx, boolean isFindFromLedger) {
- OpFindNewest op;
- PositionImpl startPosition = null;
- long max = 0;
+ FindEntryCallback callback, Object
ctx, boolean isFindFromLedger) {
+ asyncFindNewestMatching(constraint, condition, null, null, callback,
ctx,
+ isFindFromLedger);
+ }
+
+
+ @Override
+ public void asyncFindNewestMatching(FindPositionConstraint constraint,
Predicate<Entry> condition,
+ PositionImpl start, PositionImpl end,
FindEntryCallback callback,
+ Object ctx, boolean isFindFromLedger) {
+ PositionImpl startPosition;
switch (constraint) {
- case SearchAllAvailableEntries:
- startPosition = (PositionImpl) getFirstPosition();
- max = ledger.getNumberOfEntries() - 1;
- break;
- case SearchActiveEntries:
- startPosition = ledger.getNextValidPosition(markDeletePosition);
- max = getNumberOfEntriesInStorage();
- break;
- default:
- callback.findEntryFailed(new ManagedLedgerException("Unknown
position constraint"), Optional.empty(), ctx);
- return;
+ case SearchAllAvailableEntries ->
+ startPosition = start == null ? (PositionImpl)
getFirstPosition() : start;
+ case SearchActiveEntries -> {
+ if (start == null) {
+ startPosition =
ledger.getNextValidPosition(markDeletePosition);
+ } else {
+ startPosition = start;
+ startPosition =
startPosition.compareTo(markDeletePosition) <= 0
+ ? ledger.getNextValidPosition(startPosition) :
startPosition;
+ }
+ }
+ default -> {
+ callback.findEntryFailed(
+ new ManagedLedgerException("Unknown position
constraint"), Optional.empty(), ctx);
+ return;
+ }
}
+ // Defensive check: startPosition is not expected to be null at this point.
if (startPosition == null) {
callback.findEntryFailed(new ManagedLedgerException("Couldn't find
start position"),
Optional.empty(), ctx);
return;
}
+ // Calculate the end position
+ PositionImpl endPosition = end == null ? ledger.lastConfirmedEntry :
end;
+ endPosition = endPosition.compareTo(ledger.lastConfirmedEntry) > 0 ?
ledger.lastConfirmedEntry : endPosition;
+ // Calculate the number of entries between the startPosition and
endPosition
+ long max = 0;
+ if (startPosition.compareTo(endPosition) <= 0) {
+ max = ledger.getNumberOfEntries(Range.closed(startPosition,
endPosition));
+ }
+
+ if (max <= 0) {
+ callback.findEntryComplete(null, ctx);
+ return;
+ }
+
+ OpFindNewest op;
if (isFindFromLedger) {
op = new OpFindNewest(this.ledger, startPosition, condition, max,
callback, ctx);
} else {
diff --git
a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java
b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java
index a4bc7fad3ed..24e3da16e77 100644
---
a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java
+++
b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java
@@ -4902,5 +4902,300 @@ public class ManagedCursorTest extends
MockedBookKeeperTestCase {
ml.delete();
}
+ static PositionImpl transferToPosImpl(Position position) {
+ return new PositionImpl(position.getLedgerId(), position.getEntryId());
+ }
+
+ @Test
+ public void
testFindNewestMatching_SearchAllAvailableEntries_ByStartAndEnd() throws
Exception {
+ ManagedLedgerConfig managedLedgerConfig = new ManagedLedgerConfig();
+ managedLedgerConfig.setMaxEntriesPerLedger(2);
+ managedLedgerConfig.setMinimumRolloverTime(0, TimeUnit.MILLISECONDS);
+ @Cleanup
+ ManagedLedgerImpl ledger = (ManagedLedgerImpl)
factory.open("testFindNewestMatching_SearchAllAvailableEntries_ByStartAndEnd",
managedLedgerConfig);
+ @Cleanup
+ ManagedCursor managedCursor = ledger.openCursor("test");
+
+ Position position = ledger.addEntry("test".getBytes(Encoding));
+ Position position1 = ledger.addEntry("test1".getBytes(Encoding));
+ Position position2 = ledger.addEntry("test2".getBytes(Encoding));
+ Position position3 = ledger.addEntry("test3".getBytes(Encoding));
+
+ Predicate<Entry> condition = entry -> {
+ try {
+ Position p = entry.getPosition();
+ return
transferToPosImpl(p).compareTo(transferToPosImpl(position1)) <= 0;
+ } finally {
+ entry.release();
+ }
+ };
+
+ // find the newest entry with start and end position
+ AtomicBoolean failed = new AtomicBoolean(false);
+ CountDownLatch latch = new CountDownLatch(1);
+ AtomicReference<Position> positionRef = new AtomicReference<>();
+
managedCursor.asyncFindNewestMatching(ManagedCursor.FindPositionConstraint.SearchAllAvailableEntries,
condition, transferToPosImpl(position), transferToPosImpl(position2), new
AsyncCallbacks.FindEntryCallback() {
+ @Override
+ public void findEntryComplete(Position position, Object ctx) {
+ positionRef.set(position);
+ latch.countDown();
+ }
+
+ @Override
+ public void findEntryFailed(ManagedLedgerException exception,
Optional<Position> failedReadPosition, Object ctx) {
+ failed.set(true);
+ latch.countDown();
+ }
+ }, null, true);
+
+ latch.await();
+ assertFalse(failed.get());
+ assertNotNull(positionRef.get());
+ assertEquals(positionRef.get(), position1);
+
+ // find the newest entry with start
+ AtomicBoolean failed1 = new AtomicBoolean(false);
+ CountDownLatch latch1 = new CountDownLatch(1);
+ AtomicReference<Position> positionRef1 = new AtomicReference<>();
+
managedCursor.asyncFindNewestMatching(ManagedCursor.FindPositionConstraint.SearchAllAvailableEntries,
condition, transferToPosImpl(position), null, new
AsyncCallbacks.FindEntryCallback() {
+ @Override
+ public void findEntryComplete(Position position, Object ctx) {
+ positionRef1.set(position);
+ latch1.countDown();
+ }
+
+ @Override
+ public void findEntryFailed(ManagedLedgerException exception,
Optional<Position> failedReadPosition, Object ctx) {
+ failed1.set(true);
+ latch1.countDown();
+ }
+ }, null, true);
+ latch1.await();
+ assertFalse(failed1.get());
+ assertNotNull(positionRef1.get());
+ assertEquals(positionRef1.get(), position1);
+
+ // find the newest entry with end
+ AtomicBoolean failed2 = new AtomicBoolean(false);
+ CountDownLatch latch2 = new CountDownLatch(1);
+ AtomicReference<Position> positionRef2 = new AtomicReference<>();
+
managedCursor.asyncFindNewestMatching(ManagedCursor.FindPositionConstraint.SearchAllAvailableEntries,
condition, null, transferToPosImpl(position2), new
AsyncCallbacks.FindEntryCallback() {
+ @Override
+ public void findEntryComplete(Position position, Object ctx) {
+ positionRef2.set(position);
+ latch2.countDown();
+ }
+
+ @Override
+ public void findEntryFailed(ManagedLedgerException exception,
Optional<Position> failedReadPosition, Object ctx) {
+ failed2.set(true);
+ latch2.countDown();
+ }
+ }, null, true);
+ latch2.await();
+ assertFalse(failed2.get());
+ assertNotNull(positionRef2.get());
+ assertEquals(positionRef2.get(), position1);
+
+ // find the newest entry without start and end position
+ AtomicBoolean failed3 = new AtomicBoolean(false);
+ CountDownLatch latch3 = new CountDownLatch(1);
+ AtomicReference<Position> positionRef3 = new AtomicReference<>();
+
managedCursor.asyncFindNewestMatching(ManagedCursor.FindPositionConstraint.SearchAllAvailableEntries,
condition, null, null, new AsyncCallbacks.FindEntryCallback() {
+ @Override
+ public void findEntryComplete(Position position, Object ctx) {
+ positionRef3.set(position);
+ latch3.countDown();
+ }
+
+ @Override
+ public void findEntryFailed(ManagedLedgerException exception,
Optional<Position> failedReadPosition, Object ctx) {
+ failed3.set(true);
+ latch3.countDown();
+ }
+ }, null, true);
+ latch3.await();
+ assertFalse(failed3.get());
+ assertNotNull(positionRef3.get());
+ assertEquals(positionRef3.get(), position1);
+
+ // find position3
+ AtomicBoolean failed4 = new AtomicBoolean(false);
+ CountDownLatch latch4 = new CountDownLatch(1);
+ AtomicReference<Position> positionRef4 = new AtomicReference<>();
+
managedCursor.asyncFindNewestMatching(ManagedCursor.FindPositionConstraint.SearchAllAvailableEntries,
entry -> {
+ try {
+ Position p = entry.getPosition();
+ return
transferToPosImpl(p).compareTo(transferToPosImpl(position3)) <= 0;
+ } finally {
+ entry.release();
+ }
+ }, transferToPosImpl(position3), transferToPosImpl(position3), new
AsyncCallbacks.FindEntryCallback() {
+ @Override
+ public void findEntryComplete(Position position, Object ctx) {
+ positionRef4.set(position);
+ latch4.countDown();
+ }
+
+ @Override
+ public void findEntryFailed(ManagedLedgerException exception,
Optional<Position> failedReadPosition, Object ctx) {
+ failed4.set(true);
+ latch4.countDown();
+ }
+ }, null, true);
+ latch4.await();
+ assertFalse(failed4.get());
+ assertNotNull(positionRef4.get());
+ assertEquals(positionRef4.get(), position3);
+ }
+
+
+ @Test
+ public void testFindNewestMatching_SearchActiveEntries_ByStartAndEnd()
throws Exception {
+ ManagedLedgerConfig managedLedgerConfig = new ManagedLedgerConfig();
+ managedLedgerConfig.setMaxEntriesPerLedger(2);
+ managedLedgerConfig.setMinimumRolloverTime(0, TimeUnit.MILLISECONDS);
+ @Cleanup
+ ManagedLedgerImpl ledger = (ManagedLedgerImpl)
factory.open("testFindNewestMatching_SearchActiveEntries_ByStartAndEnd",
managedLedgerConfig);
+ @Cleanup
+ ManagedCursorImpl managedCursor = (ManagedCursorImpl)
ledger.openCursor("test");
+
+ Position position = ledger.addEntry("test".getBytes(Encoding));
+ Position position1 = ledger.addEntry("test1".getBytes(Encoding));
+ Position position2 = ledger.addEntry("test2".getBytes(Encoding));
+ Position position3 = ledger.addEntry("test3".getBytes(Encoding));
+ Position position4 = ledger.addEntry("test4".getBytes(Encoding));
+ managedCursor.markDelete(position1);
+ assertEquals(managedCursor.getNumberOfEntries(), 3);
+
+ Predicate<Entry> condition = entry -> {
+ try {
+ PositionImpl p = (PositionImpl) entry.getPosition();
+ return
transferToPosImpl(p).compareTo(transferToPosImpl(position3)) <= 0;
+ } finally {
+ entry.release();
+ }
+ };
+
+ // find the newest entry with start and end position
+ AtomicBoolean failed = new AtomicBoolean(false);
+ CountDownLatch latch = new CountDownLatch(1);
+ AtomicReference<Position> positionRef = new AtomicReference<>();
+
managedCursor.asyncFindNewestMatching(ManagedCursor.FindPositionConstraint.SearchActiveEntries,
condition, transferToPosImpl(position2), transferToPosImpl(position4), new
AsyncCallbacks.FindEntryCallback() {
+ @Override
+ public void findEntryComplete(Position position, Object ctx) {
+ positionRef.set(position);
+ latch.countDown();
+ }
+
+ @Override
+ public void findEntryFailed(ManagedLedgerException exception,
Optional<Position> failedReadPosition, Object ctx) {
+ failed.set(true);
+ latch.countDown();
+ }
+ }, null, true);
+ latch.await();
+ assertFalse(failed.get());
+ assertNotNull(positionRef.get());
+ assertEquals(positionRef.get(), position3);
+
+ // find the newest entry with start
+ AtomicBoolean failed1 = new AtomicBoolean(false);
+ CountDownLatch latch1 = new CountDownLatch(1);
+ AtomicReference<Position> positionRef1 = new AtomicReference<>();
+
managedCursor.asyncFindNewestMatching(ManagedCursor.FindPositionConstraint.SearchActiveEntries,
condition, transferToPosImpl(position2), null, new
AsyncCallbacks.FindEntryCallback() {
+ @Override
+ public void findEntryComplete(Position position, Object ctx) {
+ positionRef1.set(position);
+ latch1.countDown();
+ }
+
+ @Override
+ public void findEntryFailed(ManagedLedgerException exception,
Optional<Position> failedReadPosition, Object ctx) {
+ failed1.set(true);
+ latch1.countDown();
+ }
+ }, null, true);
+
+ latch1.await();
+ assertFalse(failed1.get());
+ assertNotNull(positionRef1.get());
+ assertEquals(positionRef1.get(), position3);
+
+ // find the newest entry with end
+ AtomicBoolean failed2 = new AtomicBoolean(false);
+ CountDownLatch latch2 = new CountDownLatch(1);
+ AtomicReference<Position> positionRef2 = new AtomicReference<>();
+
managedCursor.asyncFindNewestMatching(ManagedCursor.FindPositionConstraint.SearchActiveEntries,
condition, null, transferToPosImpl(position4), new
AsyncCallbacks.FindEntryCallback() {
+ @Override
+ public void findEntryComplete(Position position, Object ctx) {
+ positionRef2.set(position);
+ latch2.countDown();
+ }
+
+ @Override
+ public void findEntryFailed(ManagedLedgerException exception,
Optional<Position> failedReadPosition, Object ctx) {
+ failed2.set(true);
+ latch2.countDown();
+ }
+ }, null, true);
+
+ latch2.await();
+ assertFalse(failed2.get());
+ assertNotNull(positionRef2.get());
+ assertEquals(positionRef2.get(), position3);
+
+ // find the newest entry without start and end position
+ AtomicBoolean failed3 = new AtomicBoolean(false);
+ CountDownLatch latch3 = new CountDownLatch(1);
+ AtomicReference<Position> positionRef3 = new AtomicReference<>();
+
managedCursor.asyncFindNewestMatching(ManagedCursor.FindPositionConstraint.SearchActiveEntries,
condition, null, null, new AsyncCallbacks.FindEntryCallback() {
+ @Override
+ public void findEntryComplete(Position position, Object ctx) {
+ positionRef3.set(position);
+ latch3.countDown();
+ }
+
+ @Override
+ public void findEntryFailed(ManagedLedgerException exception,
Optional<Position> failedReadPosition, Object ctx) {
+ failed3.set(true);
+ latch3.countDown();
+ }
+ }, null, true);
+ latch3.await();
+ assertFalse(failed3.get());
+ assertNotNull(positionRef3.get());
+ assertEquals(positionRef3.get(), position3);
+
+ // find position4
+ AtomicBoolean failed4 = new AtomicBoolean(false);
+ CountDownLatch latch4 = new CountDownLatch(1);
+ AtomicReference<Position> positionRef4 = new AtomicReference<>();
+
managedCursor.asyncFindNewestMatching(ManagedCursor.FindPositionConstraint.SearchActiveEntries,
entry -> {
+ try {
+ Position p = entry.getPosition();
+ return
transferToPosImpl(p).compareTo(transferToPosImpl(position4)) <= 0;
+ } finally {
+ entry.release();
+ }
+ }, transferToPosImpl(position4), transferToPosImpl(position4), new
AsyncCallbacks.FindEntryCallback() {
+ @Override
+ public void findEntryComplete(Position position, Object ctx) {
+ positionRef4.set(position);
+ latch4.countDown();
+ }
+
+ @Override
+ public void findEntryFailed(ManagedLedgerException exception,
Optional<Position> failedReadPosition, Object ctx) {
+ failed4.set(true);
+ latch4.countDown();
+ }
+ }, null, true);
+ latch4.await();
+ assertFalse(failed4.get());
+ assertNotNull(positionRef4.get());
+ assertEquals(positionRef4.get(), position4);
+ }
+
private static final Logger log =
LoggerFactory.getLogger(ManagedCursorTest.class);
}
diff --git
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
index 039b27e32b6..57ea9838b2d 100644
---
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
+++
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
@@ -2167,6 +2167,24 @@ public class ServiceConfiguration implements
PulsarConfiguration {
doc = "Max time before triggering a rollover on a cursor ledger"
)
private int managedLedgerCursorRolloverTimeInSeconds = 14400;
+
+ @FieldContext(
+ category = CATEGORY_STORAGE_ML,
+ dynamic = true,
+ doc = "When resetting a subscription by timestamp, the broker will
use the"
+ + " ledger closing timestamp metadata to determine the
range of ledgers"
+ + " to search for the message where the subscription
position is reset to. "
+ + " Since by default, the search condition is based on the
message publish time provided by the "
+ + " client at the publish time, there will be some clock
skew between the ledger closing timestamp "
+ + " metadata and the publish time."
+ + " This configuration is used to set the max clock skew
between the ledger closing"
+ + " timestamp and the message publish time for finding the
range of ledgers to open for searching."
+ + " The default value is 60000 milliseconds (60 seconds).
When set to -1, the broker will not"
+ + " use the ledger closing timestamp metadata to determine
the range of ledgers to search for the"
+ + " message."
+ )
+ private int managedLedgerCursorResetLedgerCloseTimestampMaxClockSkewMillis
= 60000;
+
@FieldContext(
category = CATEGORY_STORAGE_ML,
doc = "Max number of `acknowledgment holes` that are going to be
persistently stored.\n\n"
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/TopicPoliciesService.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/TopicPoliciesService.java
index eca31ec230a..f60e6047f56 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/TopicPoliciesService.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/TopicPoliciesService.java
@@ -22,7 +22,6 @@ import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
-import javax.annotation.Nonnull;
import
org.apache.pulsar.broker.service.BrokerServiceException.TopicPoliciesCacheNotInitException;
import org.apache.pulsar.client.util.RetryUtil;
import org.apache.pulsar.common.classification.InterfaceStability;
@@ -32,7 +31,7 @@ import org.apache.pulsar.common.policies.data.TopicPolicies;
import org.apache.pulsar.common.util.Backoff;
import org.apache.pulsar.common.util.BackoffBuilder;
import org.apache.pulsar.common.util.FutureUtil;
-import org.jetbrains.annotations.NotNull;
+import org.jspecify.annotations.NonNull;
/**
* Topic policies service.
@@ -121,8 +120,8 @@ public interface TopicPoliciesService extends AutoCloseable
{
* @return A CompletableFuture containing an Optional of TopicPolicies.
* @throws NullPointerException If the topicName is null.
*/
- @Nonnull
- CompletableFuture<Optional<TopicPolicies>> getTopicPoliciesAsync(@Nonnull
TopicName topicName, boolean isGlobal);
+ @NonNull
+ CompletableFuture<Optional<TopicPolicies>> getTopicPoliciesAsync(@NonNull
TopicName topicName, boolean isGlobal);
/**
* Asynchronously retrieves topic policies.
@@ -134,8 +133,8 @@ public interface TopicPoliciesService extends AutoCloseable
{
* @return A CompletableFuture containing an Optional of TopicPolicies.
* @throws NullPointerException If the topicName is null.
*/
- @Nonnull
- CompletableFuture<Optional<TopicPolicies>> getTopicPoliciesAsync(@Nonnull
TopicName topicName);
+ @NonNull
+ CompletableFuture<Optional<TopicPolicies>> getTopicPoliciesAsync(@NonNull
TopicName topicName);
/**
* Get policies for a topic without cache async.
@@ -190,16 +189,16 @@ public interface TopicPoliciesService extends
AutoCloseable {
return null;
}
- @NotNull
+ @NonNull
@Override
- public CompletableFuture<Optional<TopicPolicies>>
getTopicPoliciesAsync(@NotNull TopicName topicName,
+ public CompletableFuture<Optional<TopicPolicies>>
getTopicPoliciesAsync(@NonNull TopicName topicName,
boolean isGlobal) {
return CompletableFuture.completedFuture(Optional.empty());
}
- @NotNull
+ @NonNull
@Override
- public CompletableFuture<Optional<TopicPolicies>>
getTopicPoliciesAsync(@NotNull TopicName topicName) {
+ public CompletableFuture<Optional<TopicPolicies>>
getTopicPoliciesAsync(@NonNull TopicName topicName) {
return CompletableFuture.completedFuture(Optional.empty());
}
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentMessageFinder.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentMessageFinder.java
index 08273155e4c..ae08cb7b28d 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentMessageFinder.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentMessageFinder.java
@@ -25,6 +25,9 @@ import org.apache.bookkeeper.mledger.AsyncCallbacks;
import org.apache.bookkeeper.mledger.ManagedCursor;
import org.apache.bookkeeper.mledger.ManagedLedgerException;
import org.apache.bookkeeper.mledger.Position;
+import org.apache.bookkeeper.mledger.impl.PositionImpl;
+import
org.apache.bookkeeper.mledger.proto.MLDataFormats.ManagedLedgerInfo.LedgerInfo;
+import org.apache.commons.lang3.tuple.Pair;
import org.apache.pulsar.client.impl.MessageImpl;
import org.apache.pulsar.common.protocol.Commands;
import org.apache.pulsar.common.util.Codec;
@@ -37,6 +40,7 @@ import org.slf4j.LoggerFactory;
public class PersistentMessageFinder implements
AsyncCallbacks.FindEntryCallback {
private final ManagedCursor cursor;
private final String subName;
+ private final int ledgerCloseTimestampMaxClockSkewMillis;
private final String topicName;
private long timestamp = 0;
@@ -48,19 +52,23 @@ public class PersistentMessageFinder implements
AsyncCallbacks.FindEntryCallback
AtomicIntegerFieldUpdater
.newUpdater(PersistentMessageFinder.class,
"messageFindInProgress");
- public PersistentMessageFinder(String topicName, ManagedCursor cursor) {
+ public PersistentMessageFinder(String topicName, ManagedCursor cursor, int
ledgerCloseTimestampMaxClockSkewMillis) {
this.topicName = topicName;
this.cursor = cursor;
this.subName = Codec.decode(cursor.getName());
+ this.ledgerCloseTimestampMaxClockSkewMillis =
ledgerCloseTimestampMaxClockSkewMillis;
}
public void findMessages(final long timestamp,
AsyncCallbacks.FindEntryCallback callback) {
- this.timestamp = timestamp;
if (messageFindInProgressUpdater.compareAndSet(this, FALSE, TRUE)) {
+ this.timestamp = timestamp;
if (log.isDebugEnabled()) {
log.debug("[{}] Starting message position find at timestamp
{}", subName, timestamp);
}
-
+ Pair<PositionImpl, PositionImpl> range =
+
getFindPositionRange(cursor.getManagedLedger().getLedgersInfo().values(),
+ (PositionImpl)
cursor.getManagedLedger().getLastConfirmedEntry(), timestamp,
+ ledgerCloseTimestampMaxClockSkewMillis);
cursor.asyncFindNewestMatching(ManagedCursor.FindPositionConstraint.SearchAllAvailableEntries,
entry -> {
try {
long entryTimestamp =
Commands.getEntryTimestamp(entry.getDataBuffer());
@@ -71,7 +79,7 @@ public class PersistentMessageFinder implements
AsyncCallbacks.FindEntryCallback
entry.release();
}
return false;
- }, this, callback, true);
+ }, range.getLeft(), range.getRight(), this, callback, true);
} else {
if (log.isDebugEnabled()) {
log.debug("[{}][{}] Ignore message position find scheduled
task, last find is still running", topicName,
@@ -83,6 +91,59 @@ public class PersistentMessageFinder implements
AsyncCallbacks.FindEntryCallback
}
}
+ public static Pair<PositionImpl, PositionImpl>
getFindPositionRange(Iterable<LedgerInfo> ledgerInfos,
+ PositionImpl
lastConfirmedEntry, long targetTimestamp,
+ int
ledgerCloseTimestampMaxClockSkewMillis) {
+ if (ledgerCloseTimestampMaxClockSkewMillis < 0) {
+ // this feature is disabled when the value is negative
+ return Pair.of(null, null);
+ }
+
+ long targetTimestampMin = targetTimestamp -
ledgerCloseTimestampMaxClockSkewMillis;
+ long targetTimestampMax = targetTimestamp +
ledgerCloseTimestampMaxClockSkewMillis;
+
+ PositionImpl start = null;
+ PositionImpl end = null;
+
+ LedgerInfo secondToLastLedgerInfo = null;
+ LedgerInfo lastLedgerInfo = null;
+ for (LedgerInfo info : ledgerInfos) {
+ if (!info.hasTimestamp()) {
+ // unexpected case, don't set start and end
+ return Pair.of(null, null);
+ }
+ secondToLastLedgerInfo = lastLedgerInfo;
+ lastLedgerInfo = info;
+ long closeTimestamp = info.getTimestamp();
+ // For an open ledger, closeTimestamp is 0
+ if (closeTimestamp == 0) {
+ end = null;
+ break;
+ }
+ if (closeTimestamp <= targetTimestampMin) {
+ start = new PositionImpl(info.getLedgerId(), 0);
+ } else if (closeTimestamp > targetTimestampMax) {
+ // The ledger was closed after the target timestamp plus the max clock skew, so end the search at its last entry
+ end = new PositionImpl(info.getLedgerId(), info.getEntries() -
1);
+ break;
+ }
+ }
+ // If the second-to-last visited ledger's close timestamp is less than the
target timestamp minus the max clock skew, then start from the
+ // first entry of the last visited ledger when there are confirmed entries in
that ledger; otherwise start from the last confirmed entry
+ if (lastLedgerInfo != null && secondToLastLedgerInfo != null
+ && secondToLastLedgerInfo.getTimestamp() > 0
+ && secondToLastLedgerInfo.getTimestamp() < targetTimestampMin)
{
+ PositionImpl firstPositionInLedger = new
PositionImpl(lastLedgerInfo.getLedgerId(), 0);
+ if (lastConfirmedEntry != null
+ && lastConfirmedEntry.compareTo(firstPositionInLedger) >=
0) {
+ start = firstPositionInLedger;
+ } else {
+ start = lastConfirmedEntry;
+ }
+ }
+ return Pair.of(start, end);
+ }
+
private static final Logger log =
LoggerFactory.getLogger(PersistentMessageFinder.class);
@Override
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
index 4ae289d7bae..72b47fdbf96 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
@@ -136,6 +136,7 @@ public class PersistentSubscription extends
AbstractSubscription {
private volatile CompletableFuture<Void> fenceFuture;
private volatile CompletableFuture<Void> inProgressResetCursorFuture;
private volatile Boolean replicatedControlled;
+ private final ServiceConfiguration config;
static Map<String, Long> getBaseCursorProperties(Boolean isReplicated) {
return isReplicated != null && isReplicated ?
REPLICATED_SUBSCRIPTION_CURSOR_PROPERTIES :
@@ -154,6 +155,7 @@ public class PersistentSubscription extends
AbstractSubscription {
public PersistentSubscription(PersistentTopic topic, String
subscriptionName, ManagedCursor cursor,
Boolean replicated, Map<String, String>
subscriptionProperties) {
this.topic = topic;
+ this.config = topic.getBrokerService().getPulsar().getConfig();
this.cursor = cursor;
this.topicName = topic.getName();
this.subName = subscriptionName;
@@ -164,7 +166,7 @@ public class PersistentSubscription extends
AbstractSubscription {
}
this.subscriptionProperties = MapUtils.isEmpty(subscriptionProperties)
? Collections.emptyMap() :
Collections.unmodifiableMap(subscriptionProperties);
- if
(topic.getBrokerService().getPulsar().getConfig().isTransactionCoordinatorEnabled()
+ if (config.isTransactionCoordinatorEnabled()
&& !isEventSystemTopic(TopicName.get(topicName))
&& !ExtensibleLoadManagerImpl.isInternalTopic(topicName)) {
this.pendingAckHandle = new PendingAckHandleImpl(this);
@@ -201,7 +203,6 @@ public class PersistentSubscription extends
AbstractSubscription {
public boolean setReplicated(boolean replicated) {
replicatedControlled = replicated;
- ServiceConfiguration config =
topic.getBrokerService().getPulsar().getConfig();
if (!replicated || !config.isEnableReplicatedSubscriptions()) {
this.replicatedSubscriptionSnapshotCache = null;
@@ -409,7 +410,7 @@ public class PersistentSubscription extends
AbstractSubscription {
log.debug("[{}][{}] Individual acks on {}", topicName,
subName, positions);
}
cursor.asyncDelete(positions, deleteCallback,
previousMarkDeletePosition);
- if
(topic.getBrokerService().getPulsar().getConfig().isTransactionCoordinatorEnabled())
{
+ if (config.isTransactionCoordinatorEnabled()) {
positions.forEach(position -> {
if (((ManagedCursorImpl)
cursor).isMessageDeleted(position)) {
pendingAckHandle.clearIndividualPosition(position);
@@ -585,10 +586,9 @@ public class PersistentSubscription extends
AbstractSubscription {
final EntryFilterSupport entryFilterSupport = dispatcher != null
? (EntryFilterSupport) dispatcher : new
EntryFilterSupport(this);
// we put some hard limits on the scan, in order to prevent denial of
services
- ServiceConfiguration configuration =
topic.getBrokerService().getPulsar().getConfiguration();
- long maxEntries = configuration.getSubscriptionBacklogScanMaxEntries();
- long timeOutMs = configuration.getSubscriptionBacklogScanMaxTimeMs();
- int batchSize = configuration.getDispatcherMaxReadBatchSize();
+ long maxEntries = config.getSubscriptionBacklogScanMaxEntries();
+ long timeOutMs = config.getSubscriptionBacklogScanMaxTimeMs();
+ int batchSize = config.getDispatcherMaxReadBatchSize();
AtomicReference<Position> firstPosition = new AtomicReference<>();
AtomicReference<Position> lastPosition = new AtomicReference<>();
final Predicate<Entry> condition = entry -> {
@@ -763,7 +763,8 @@ public class PersistentSubscription extends
AbstractSubscription {
@Override
public CompletableFuture<Void> resetCursor(long timestamp) {
CompletableFuture<Void> future = new CompletableFuture<>();
- PersistentMessageFinder persistentMessageFinder = new
PersistentMessageFinder(topicName, cursor);
+ PersistentMessageFinder persistentMessageFinder = new
PersistentMessageFinder(topicName, cursor,
+
config.getManagedLedgerCursorResetLedgerCloseTimestampMaxClockSkewMillis());
if (log.isDebugEnabled()) {
log.debug("[{}][{}] Resetting subscription to timestamp {}",
topicName, subName, timestamp);
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentMessageFinderTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentMessageFinderTest.java
index 0972c9098b5..806bd797fc8 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentMessageFinderTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentMessageFinderTest.java
@@ -59,6 +59,7 @@ import org.apache.bookkeeper.mledger.impl.PositionImpl;
import
org.apache.bookkeeper.mledger.proto.MLDataFormats.ManagedLedgerInfo.LedgerInfo;
import org.apache.bookkeeper.test.MockedBookKeeperTestCase;
import org.apache.commons.lang3.reflect.FieldUtils;
+import org.apache.commons.lang3.tuple.Pair;
import
org.apache.pulsar.broker.service.persistent.PersistentMessageExpiryMonitor;
import org.apache.pulsar.broker.service.persistent.PersistentMessageFinder;
import org.apache.pulsar.broker.service.persistent.PersistentSubscription;
@@ -138,7 +139,7 @@ public class PersistentMessageFinderTest extends
MockedBookKeeperTestCase {
}
CompletableFuture<Void> findMessage(final Result result, final
ManagedCursor c1, final long timestamp) {
- PersistentMessageFinder messageFinder = new
PersistentMessageFinder("topicname", c1);
+ PersistentMessageFinder messageFinder = new
PersistentMessageFinder("topicname", c1, 0);
final CompletableFuture<Void> future = new CompletableFuture<>();
messageFinder.findMessages(timestamp, new
AsyncCallbacks.FindEntryCallback() {
@@ -217,7 +218,7 @@ public class PersistentMessageFinderTest extends
MockedBookKeeperTestCase {
assertNotEquals(result.position, null);
assertEquals(result.position, lastPosition);
- PersistentMessageFinder messageFinder = new
PersistentMessageFinder("topicname", c1);
+ PersistentMessageFinder messageFinder = new
PersistentMessageFinder("topicname", c1, 0);
final AtomicBoolean ex = new AtomicBoolean(false);
messageFinder.findEntryFailed(new ManagedLedgerException("failed"),
Optional.empty(),
new AsyncCallbacks.FindEntryCallback() {
@@ -589,4 +590,241 @@ public class PersistentMessageFinderTest extends
MockedBookKeeperTestCase {
resetCursorData.setExcluded(true);
System.out.println(Entity.entity(resetCursorData,
MediaType.APPLICATION_JSON));
}
+
+ @Test
+ public void testGetFindPositionRange_EmptyLedgerInfos() {
+ List<LedgerInfo> ledgerInfos = new ArrayList<>();
+ PositionImpl lastConfirmedEntry = null;
+ long targetTimestamp = 2000;
+ Pair<PositionImpl, PositionImpl> range =
+ PersistentMessageFinder.getFindPositionRange(ledgerInfos,
lastConfirmedEntry, targetTimestamp, 0);
+
+ assertNotNull(range);
+ assertNull(range.getLeft());
+ assertNull(range.getRight());
+ }
+
+ @Test
+ public void testGetFindPositionRange_AllTimestampsLessThanTarget() {
+ List<LedgerInfo> ledgerInfos = new ArrayList<>();
+
ledgerInfos.add(LedgerInfo.newBuilder().setLedgerId(1).setEntries(10).setTimestamp(1000).build());
+
ledgerInfos.add(LedgerInfo.newBuilder().setLedgerId(2).setEntries(10).setTimestamp(1500).build());
+ PositionImpl lastConfirmedEntry = new PositionImpl(2, 9);
+
+ long targetTimestamp = 2000;
+ Pair<PositionImpl, PositionImpl> range =
PersistentMessageFinder.getFindPositionRange(ledgerInfos,
+ lastConfirmedEntry, targetTimestamp, 0);
+
+ assertNotNull(range);
+ assertNotNull(range.getLeft());
+ assertNull(range.getRight());
+ assertEquals(range.getLeft(), new PositionImpl(2, 0));
+ }
+
+ @Test
+ public void testGetFindPositionRange_LastTimestampIsZero() {
+ List<LedgerInfo> ledgerInfos = new ArrayList<>();
+
ledgerInfos.add(LedgerInfo.newBuilder().setLedgerId(1).setEntries(10).setTimestamp(1000).build());
+
ledgerInfos.add(LedgerInfo.newBuilder().setLedgerId(2).setEntries(10).setTimestamp(1500).build());
+
ledgerInfos.add(LedgerInfo.newBuilder().setLedgerId(3).setEntries(10).setTimestamp(0).build());
+ PositionImpl lastConfirmedEntry = new PositionImpl(3, 5);
+
+ long targetTimestamp = 2000;
+ Pair<PositionImpl, PositionImpl> range =
PersistentMessageFinder.getFindPositionRange(ledgerInfos,
+ lastConfirmedEntry, targetTimestamp, 0);
+
+ assertNotNull(range);
+ assertNotNull(range.getLeft());
+ assertNull(range.getRight());
+ assertEquals(range.getLeft(), new PositionImpl(3, 0));
+ }
+
+ @Test
+ public void testGetFindPositionRange_LastTimestampIsZeroWithNoEntries() {
+ List<LedgerInfo> ledgerInfos = new ArrayList<>();
+
ledgerInfos.add(LedgerInfo.newBuilder().setLedgerId(1).setEntries(10).setTimestamp(1000).build());
+
ledgerInfos.add(LedgerInfo.newBuilder().setLedgerId(2).setEntries(10).setTimestamp(1500).build());
+
ledgerInfos.add(LedgerInfo.newBuilder().setLedgerId(3).setEntries(10).setTimestamp(0).build());
+ PositionImpl lastConfirmedEntry = new PositionImpl(2, 9);
+
+ long targetTimestamp = 2000;
+ Pair<PositionImpl, PositionImpl> range =
PersistentMessageFinder.getFindPositionRange(ledgerInfos,
+ lastConfirmedEntry, targetTimestamp, 0);
+
+ assertNotNull(range);
+ assertNotNull(range.getLeft());
+ assertNull(range.getRight());
+ assertEquals(range.getLeft(), new PositionImpl(2, 9));
+ }
+
+ @Test
+ public void testGetFindPositionRange_AllTimestampsGreaterThanTarget() {
+ List<LedgerInfo> ledgerInfos = new ArrayList<>();
+
ledgerInfos.add(LedgerInfo.newBuilder().setLedgerId(1).setEntries(10).setTimestamp(3000).build());
+
ledgerInfos.add(LedgerInfo.newBuilder().setLedgerId(2).setEntries(10).setTimestamp(4000).build());
+ PositionImpl lastConfirmedEntry = new PositionImpl(2, 9);
+
+ long targetTimestamp = 2000;
+ Pair<PositionImpl, PositionImpl> range =
PersistentMessageFinder.getFindPositionRange(ledgerInfos,
+ lastConfirmedEntry, targetTimestamp, 0);
+
+ assertNotNull(range);
+ assertNull(range.getLeft());
+ assertNotNull(range.getRight());
+ assertEquals(range.getRight(), new PositionImpl(1, 9));
+ }
+
+ @Test
+ public void testGetFindPositionRange_MixedTimestamps() {
+ List<LedgerInfo> ledgerInfos = new ArrayList<>();
+
ledgerInfos.add(LedgerInfo.newBuilder().setLedgerId(1).setEntries(10).setTimestamp(1000).build());
+
ledgerInfos.add(LedgerInfo.newBuilder().setLedgerId(2).setEntries(10).setTimestamp(2000).build());
+
ledgerInfos.add(LedgerInfo.newBuilder().setLedgerId(3).setEntries(10).setTimestamp(3000).build());
+ PositionImpl lastConfirmedEntry = new PositionImpl(3, 9);
+
+ long targetTimestamp = 2500;
+ Pair<PositionImpl, PositionImpl> range =
PersistentMessageFinder.getFindPositionRange(ledgerInfos,
+ lastConfirmedEntry, targetTimestamp, 0);
+
+ assertNotNull(range);
+ assertNotNull(range.getLeft());
+ assertNotNull(range.getRight());
+ assertEquals(range.getLeft(), new PositionImpl(3, 0));
+ assertEquals(range.getRight(), new PositionImpl(3, 9));
+ }
+
+ @Test
+ public void testGetFindPositionRange_TimestampAtBoundary() {
+ List<LedgerInfo> ledgerInfos = new ArrayList<>();
+
ledgerInfos.add(LedgerInfo.newBuilder().setLedgerId(1).setEntries(10).setTimestamp(1000).build());
+
ledgerInfos.add(LedgerInfo.newBuilder().setLedgerId(2).setEntries(10).setTimestamp(2000).build());
+
ledgerInfos.add(LedgerInfo.newBuilder().setLedgerId(3).setEntries(10).setTimestamp(3000).build());
+
ledgerInfos.add(LedgerInfo.newBuilder().setLedgerId(4).setEntries(10).setTimestamp(4000).build());
+ PositionImpl lastConfirmedEntry = new PositionImpl(4, 9);
+
+ long targetTimestamp = 3000;
+ Pair<PositionImpl, PositionImpl> range =
PersistentMessageFinder.getFindPositionRange(ledgerInfos,
+ lastConfirmedEntry, targetTimestamp, 0);
+
+ assertNotNull(range);
+ assertNotNull(range.getLeft());
+ assertNotNull(range.getRight());
+ assertEquals(range.getLeft(), new PositionImpl(3, 0));
+ // there might be entries in the next ledger with the same timestamp
as the target timestamp, even though
+ // the close timestamp of ledger 3 is equal to the target timestamp
+ assertEquals(range.getRight(), new PositionImpl(4, 9));
+ }
+
+ @Test
+ public void testGetFindPositionRange_ClockSkew() {
+ List<LedgerInfo> ledgerInfos = new ArrayList<>();
+
ledgerInfos.add(LedgerInfo.newBuilder().setLedgerId(1).setEntries(10).setTimestamp(1000).build());
+
ledgerInfos.add(LedgerInfo.newBuilder().setLedgerId(2).setEntries(10).setTimestamp(2000).build());
+
ledgerInfos.add(LedgerInfo.newBuilder().setLedgerId(3).setEntries(10).setTimestamp(2010).build());
+
ledgerInfos.add(LedgerInfo.newBuilder().setLedgerId(4).setEntries(10).setTimestamp(4000).build());
+
ledgerInfos.add(LedgerInfo.newBuilder().setLedgerId(5).setTimestamp(0).build());
+ PositionImpl lastConfirmedEntry = new PositionImpl(5, 5);
+
+ long targetTimestamp = 2009;
+ Pair<PositionImpl, PositionImpl> range =
PersistentMessageFinder.getFindPositionRange(ledgerInfos,
+ lastConfirmedEntry, targetTimestamp, 10);
+
+ assertNotNull(range);
+ assertNotNull(range.getLeft());
+ assertNotNull(range.getRight());
+ assertEquals(range.getLeft(), new PositionImpl(1, 0));
+ assertEquals(range.getRight(), new PositionImpl(4, 9));
+ }
+
+ @Test
+ public void testGetFindPositionRange_ClockSkewCase2() {
+ List<LedgerInfo> ledgerInfos = new ArrayList<>();
+
ledgerInfos.add(LedgerInfo.newBuilder().setLedgerId(1).setEntries(10).setTimestamp(1000).build());
+
ledgerInfos.add(LedgerInfo.newBuilder().setLedgerId(2).setEntries(10).setTimestamp(2000).build());
+
ledgerInfos.add(LedgerInfo.newBuilder().setLedgerId(3).setEntries(10).setTimestamp(3000).build());
+
ledgerInfos.add(LedgerInfo.newBuilder().setLedgerId(4).setEntries(10).setTimestamp(4000).build());
+
ledgerInfos.add(LedgerInfo.newBuilder().setLedgerId(5).setTimestamp(0).build());
+ PositionImpl lastConfirmedEntry = new PositionImpl(5, 5);
+
+ long targetTimestamp = 2995;
+ Pair<PositionImpl, PositionImpl> range =
PersistentMessageFinder.getFindPositionRange(ledgerInfos,
+ lastConfirmedEntry, targetTimestamp, 10);
+
+ assertNotNull(range);
+ assertNotNull(range.getLeft());
+ assertNotNull(range.getRight());
+ assertEquals(range.getLeft(), new PositionImpl(2, 0));
+ assertEquals(range.getRight(), new PositionImpl(4, 9));
+ }
+
+ @Test
+ public void testGetFindPositionRange_ClockSkewCase3() {
+ List<LedgerInfo> ledgerInfos = new ArrayList<>();
+
ledgerInfos.add(LedgerInfo.newBuilder().setLedgerId(1).setEntries(10).setTimestamp(1000).build());
+
ledgerInfos.add(LedgerInfo.newBuilder().setLedgerId(2).setEntries(10).setTimestamp(2000).build());
+
ledgerInfos.add(LedgerInfo.newBuilder().setLedgerId(3).setEntries(10).setTimestamp(3000).build());
+
ledgerInfos.add(LedgerInfo.newBuilder().setLedgerId(4).setEntries(10).setTimestamp(4000).build());
+
ledgerInfos.add(LedgerInfo.newBuilder().setLedgerId(5).setTimestamp(0).build());
+ PositionImpl lastConfirmedEntry = new PositionImpl(5, 5);
+
+ long targetTimestamp = 3005;
+ Pair<PositionImpl, PositionImpl> range =
PersistentMessageFinder.getFindPositionRange(ledgerInfos,
+ lastConfirmedEntry, targetTimestamp, 10);
+
+ assertNotNull(range);
+ assertNotNull(range.getLeft());
+ assertNotNull(range.getRight());
+ assertEquals(range.getLeft(), new PositionImpl(2, 0));
+ assertEquals(range.getRight(), new PositionImpl(4, 9));
+ }
+
+ @Test
+ public void
testGetFindPositionRange_FeatureDisabledWithNegativeClockSkew() {
+ List<LedgerInfo> ledgerInfos = new ArrayList<>();
+
ledgerInfos.add(LedgerInfo.newBuilder().setLedgerId(1).setEntries(10).setTimestamp(1000).build());
+
ledgerInfos.add(LedgerInfo.newBuilder().setLedgerId(2).setEntries(10).setTimestamp(2000).build());
+
ledgerInfos.add(LedgerInfo.newBuilder().setLedgerId(3).setEntries(10).setTimestamp(2010).build());
+
ledgerInfos.add(LedgerInfo.newBuilder().setLedgerId(4).setEntries(10).setTimestamp(4000).build());
+
ledgerInfos.add(LedgerInfo.newBuilder().setLedgerId(5).setTimestamp(0).build());
+ PositionImpl lastConfirmedEntry = new PositionImpl(5, 5);
+
+ long targetTimestamp = 2009;
+ Pair<PositionImpl, PositionImpl> range =
PersistentMessageFinder.getFindPositionRange(ledgerInfos,
+ lastConfirmedEntry, targetTimestamp, -1);
+
+ assertNotNull(range);
+ assertNull(range.getLeft());
+ assertNull(range.getRight());
+ }
+
+ @Test
+ public void testGetFindPositionRange_SingleLedger() {
+ List<LedgerInfo> ledgerInfos = new ArrayList<>();
+
ledgerInfos.add(LedgerInfo.newBuilder().setLedgerId(1).setTimestamp(0).build());
+ PositionImpl lastConfirmedEntry = new PositionImpl(1, 5);
+
+ long targetTimestamp = 2500;
+ Pair<PositionImpl, PositionImpl> range =
PersistentMessageFinder.getFindPositionRange(ledgerInfos,
+ lastConfirmedEntry, targetTimestamp, 0);
+
+ assertNotNull(range);
+ assertNull(range.getLeft());
+ assertNull(range.getRight());
+ }
+
+ @Test
+ public void testGetFindPositionRange_SingleClosedLedger() {
+ List<LedgerInfo> ledgerInfos = new ArrayList<>();
+
ledgerInfos.add(LedgerInfo.newBuilder().setLedgerId(1).setEntries(10).setTimestamp(1000).build());
+ PositionImpl lastConfirmedEntry = new PositionImpl(1, 9);
+
+ long targetTimestamp = 2500;
+ Pair<PositionImpl, PositionImpl> range =
PersistentMessageFinder.getFindPositionRange(ledgerInfos,
+ lastConfirmedEntry, targetTimestamp, 0);
+
+ assertNotNull(range);
+ assertNotNull(range.getLeft());
+ assertNull(range.getRight());
+ assertEquals(range.getLeft(), new PositionImpl(1, 0));
+ }
}
diff --git
a/tiered-storage/jcloud/src/test/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/MockManagedLedger.java
b/tiered-storage/jcloud/src/test/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/MockManagedLedger.java
index 8f52d20c5ee..756e1a30f92 100644
---
a/tiered-storage/jcloud/src/test/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/MockManagedLedger.java
+++
b/tiered-storage/jcloud/src/test/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/MockManagedLedger.java
@@ -20,6 +20,7 @@ package org.apache.bookkeeper.mledger.offload.jcloud.impl;
import io.netty.buffer.ByteBuf;
import java.util.Map;
+import java.util.NavigableMap;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.function.Predicate;
@@ -381,6 +382,11 @@ public class MockManagedLedger implements ManagedLedger {
// no-op
}
+ @Override
+ public NavigableMap<Long, LedgerInfo> getLedgersInfo() {
+ return null;
+ }
+
@Override
public CompletableFuture<Position> asyncMigrate() {
// no-op