This is an automated email from the ASF dual-hosted git repository.
lhotari pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 63450269932 [improve][broker] Improve replicated subscription snapshot
cache so that subscriptions can be replicated when mark delete position update
is not frequent (#25044)
63450269932 is described below
commit 634502699321469a6816d12a09081d3f63333bba
Author: Lari Hotari <[email protected]>
AuthorDate: Fri Dec 12 12:47:41 2025 +0200
[improve][broker] Improve replicated subscription snapshot cache so that
subscriptions can be replicated when mark delete position update is not
frequent (#25044)
---
conf/broker.conf | 2 +-
.../apache/pulsar/broker/ServiceConfiguration.java | 2 +-
.../service/persistent/PersistentSubscription.java | 6 +-
.../ReplicatedSubscriptionSnapshotCache.java | 354 +++++++++++++++++++--
.../ReplicatedSubscriptionsController.java | 18 +-
.../pulsar/broker/service/PersistentTopicTest.java | 2 +
.../ReplicatedSubscriptionSnapshotCacheTest.java | 241 +++++++++++++-
7 files changed, 571 insertions(+), 54 deletions(-)
diff --git a/conf/broker.conf b/conf/broker.conf
index 997556c1740..e4923172d13 100644
--- a/conf/broker.conf
+++ b/conf/broker.conf
@@ -735,7 +735,7 @@ replicatedSubscriptionsSnapshotFrequencyMillis=1000
replicatedSubscriptionsSnapshotTimeoutSeconds=30
# Max number of snapshot to be cached per subscription.
-replicatedSubscriptionsSnapshotMaxCachedPerSubscription=10
+replicatedSubscriptionsSnapshotMaxCachedPerSubscription=30
# Max memory size for broker handling messages sending from producers.
# If the processing message size exceed this value, broker will stop read data
diff --git
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
index c6bedcfd01d..e5b7f0e458d 100644
---
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
+++
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
@@ -1548,7 +1548,7 @@ public class ServiceConfiguration implements
PulsarConfiguration {
@FieldContext(
category = CATEGORY_SERVER,
doc = "Max number of snapshot to be cached per subscription.")
- private int replicatedSubscriptionsSnapshotMaxCachedPerSubscription = 10;
+ private int replicatedSubscriptionsSnapshotMaxCachedPerSubscription = 30;
@FieldContext(
category = CATEGORY_SERVER,
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
index a0be5c7c945..457bae5e69c 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
@@ -209,7 +209,8 @@ public class PersistentSubscription extends
AbstractSubscription {
this.replicatedSubscriptionSnapshotCache = null;
} else if (this.replicatedSubscriptionSnapshotCache == null) {
this.replicatedSubscriptionSnapshotCache = new
ReplicatedSubscriptionSnapshotCache(subName,
-
config.getReplicatedSubscriptionsSnapshotMaxCachedPerSubscription());
+
config.getReplicatedSubscriptionsSnapshotMaxCachedPerSubscription(),
+ getCursor().getManagedLedger()::getNumberOfEntries);
}
if (this.cursor != null) {
@@ -570,7 +571,8 @@ public class PersistentSubscription extends
AbstractSubscription {
private void handleReplicatedSubscriptionsUpdate(Position
markDeletePosition) {
ReplicatedSubscriptionSnapshotCache snapshotCache =
this.replicatedSubscriptionSnapshotCache;
if (snapshotCache != null) {
- ReplicatedSubscriptionsSnapshot snapshot =
snapshotCache.advancedMarkDeletePosition(markDeletePosition);
+ ReplicatedSubscriptionSnapshotCache.SnapshotResult snapshot =
snapshotCache
+ .advancedMarkDeletePosition(markDeletePosition);
if (snapshot != null) {
topic.getReplicatedSubscriptionController()
.ifPresent(c -> c.localSubscriptionUpdated(subName,
snapshot));
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/ReplicatedSubscriptionSnapshotCache.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/ReplicatedSubscriptionSnapshotCache.java
index 6b4e60dc63b..2e5dbbe0c78 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/ReplicatedSubscriptionSnapshotCache.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/ReplicatedSubscriptionSnapshotCache.java
@@ -18,14 +18,19 @@
*/
package org.apache.pulsar.broker.service.persistent;
-import java.util.Map;
-import java.util.NavigableMap;
-import java.util.TreeMap;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.Range;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.SortedSet;
+import java.util.TreeSet;
+import java.util.function.ToLongFunction;
import lombok.extern.slf4j.Slf4j;
import org.apache.bookkeeper.mledger.Position;
import org.apache.bookkeeper.mledger.PositionFactory;
import org.apache.pulsar.common.api.proto.MarkersMessageIdData;
import org.apache.pulsar.common.api.proto.ReplicatedSubscriptionsSnapshot;
+import org.apache.pulsar.common.util.StringInterner;
/**
* Store the last N snapshots that were scanned by a particular subscription.
@@ -33,66 +38,357 @@ import
org.apache.pulsar.common.api.proto.ReplicatedSubscriptionsSnapshot;
@Slf4j
public class ReplicatedSubscriptionSnapshotCache {
private final String subscription;
- private final NavigableMap<Position, ReplicatedSubscriptionsSnapshot>
snapshots;
+ private final ToLongFunction<Range<Position>> distanceFunction;
private final int maxSnapshotToCache;
+ private SnapshotEntry head;
+ private SnapshotEntry tail;
+ private int numberOfSnapshots = 0;
+ private SnapshotEntry lastSortedEntry;
+ private final SortedSet<SnapshotEntry> sortedSnapshots;
- public ReplicatedSubscriptionSnapshotCache(String subscription, int
maxSnapshotToCache) {
+ public ReplicatedSubscriptionSnapshotCache(String subscription, int
maxSnapshotToCache,
+ ToLongFunction<Range<Position>>
distanceFunction) {
this.subscription = subscription;
- this.snapshots = new TreeMap<>();
+ this.distanceFunction = distanceFunction;
+ if (maxSnapshotToCache < 3) {
+ throw new IllegalArgumentException("maxSnapshotToCache must be >=
3");
+ }
this.maxSnapshotToCache = maxSnapshotToCache;
+ this.sortedSnapshots = new TreeSet<>();
+ }
+
+ /**
+ * Memory footprint estimate for one SnapshotEntry with shared String cluster instances.
+ *
+ * Assumptions:
+ * - 64-bit JVM with compressed OOPs and compressed class pointers enabled (default for heap sizes < 32GB)
+ * - Cluster name strings are shared/interned across entries
+ * - 2 ClusterEntry objects per SnapshotEntry (typical case)
+ * - Each entry has its own Position objects
+ * - 1 of the ClusterEntry objects is for the local cluster and shares the local cluster position
+ * - List.of() creates ImmutableCollections.List12 for 2-element lists
+ * - 8-byte memory alignment padding applied where needed
+ *
+ * Breakdown:
+ *
+ * 1. SnapshotEntry object: ~40 bytes
+ * - Object header (mark + klass): 12 bytes
+ * - Position position: 4 bytes (reference)
+ * - List<ClusterEntry> clusters: 4 bytes (reference)
+ * - long distanceToPrevious: 8 bytes
+ * - SnapshotEntry next: 4 bytes (reference)
+ * - SnapshotEntry prev: 4 bytes (reference)
+ * - Alignment padding: 4 bytes
+ * Subtotal: 40 bytes
+ *
+ * 2. Position object (snapshot position): ~32 bytes
+ * - Object header: 12 bytes
+ * - long ledgerId: 8 bytes
+ * - long entryId: 8 bytes
+ * - Alignment padding: 4 bytes
+ * Subtotal: 32 bytes
+ *
+ * 3. ImmutableCollections.List12 (for 2 elements): ~24 bytes
+ * - Object header: 12 bytes
+ * - Object e0: 4 bytes (reference to first ClusterEntry)
+ * - Object e1: 4 bytes (reference to second ClusterEntry)
+ * - Alignment padding: 4 bytes
+ * Subtotal: 24 bytes
+ *
+ * 4. ClusterEntry objects (2 instances): ~48 bytes
+ * Each ClusterEntry (Java record): ~24 bytes
+ * - Object header: 12 bytes
+ * - String cluster: 4 bytes (reference, string itself is shared/interned)
+ * - Position position: 4 bytes (reference)
+ * - Alignment padding: 4 bytes
+ * Subtotal: 24 bytes × 2 = 48 bytes
+ *
+ * 5. Additional Position object (for non-local cluster): ~32 bytes
+ * - Object header: 12 bytes
+ * - long ledgerId: 8 bytes
+ * - long entryId: 8 bytes
+ * - Alignment padding: 4 bytes
+ * Subtotal: 32 bytes
+ *
+ * Total per SnapshotEntry: 40 + 32 + 24 + 48 + 32 = ~176 bytes
+ *
+ * 6. TreeSet node (TreeMap.Entry) in sortedSnapshots: ~40 bytes
+ * Entries between head and tail that have been sorted are also held in sortedSnapshots, which adds one
+ * TreeMap.Entry per entry:
+ * - Object header: 12 bytes
+ * - key, value, left, right, parent: 5 × 4 bytes (references)
+ * - boolean color: 1 byte
+ * - Alignment padding: 7 bytes
+ * Subtotal: 40 bytes
+ *
+ * Rounded estimate: ~176 bytes per entry, ~216 bytes per entry that is also in sortedSnapshots
+ *
+ * Note: Actual memory consumption may vary based on:
+ * - JVM implementation and version
+ * - Whether compressed OOPs are enabled
+ * - Garbage collection and heap layout
+ * - Runtime optimizations (escape analysis, object allocation elimination)
+ * - Number of clusters per snapshot (this estimate assumes 2)
+ */
+ static class SnapshotEntry implements Comparable<SnapshotEntry> {
+ private final Position position;
+ private final List<ClusterEntry> clusters;
+ private long distanceToPrevious = -1;
+ private SnapshotEntry next;
+ private SnapshotEntry prev;
+
+ SnapshotEntry(Position position, List<ClusterEntry> clusters) {
+ this.position = position;
+ this.clusters = clusters;
+ }
+
+ Position position() {
+ return position;
+ }
+
+ List<ClusterEntry> clusters() {
+ return clusters;
+ }
+
+ long distanceToPrevious() {
+ return distanceToPrevious;
+ }
+
+ void setDistanceToPrevious(long distanceToPrevious) {
+ this.distanceToPrevious = distanceToPrevious;
+ }
+
+ SnapshotEntry next() {
+ return next;
+ }
+
+ void setNext(SnapshotEntry next) {
+ this.next = next;
+ }
+
+ SnapshotEntry prev() {
+ return prev;
+ }
+
+ void setPrev(SnapshotEntry prev) {
+ this.prev = prev;
+ }
+
+ long totalDistance() {
+ return distanceToPrevious + (next != null ?
next.distanceToPrevious : 0L);
+ }
+
+ @Override
+ public int compareTo(SnapshotEntry o) {
+ int retval = Long.compare(totalDistance(), o.totalDistance());
+ if (retval != 0) {
+ return retval;
+ }
+ retval = position.compareTo(o.position);
+ if (retval != 0) {
+ return retval;
+ }
+ return Integer.compare(System.identityHashCode(this),
System.identityHashCode(o));
+ }
+
+ @Override
+ public String toString() {
+ return String.format("SnapshotEntry(position=%s, clusters=%s,
distanceToPrevious=%d)", position, clusters,
+ distanceToPrevious);
+ }
}
+ public record ClusterEntry(String cluster, Position position) {}
+
+ public record SnapshotResult(Position position, List<ClusterEntry>
clusters) {}
+
public synchronized void addNewSnapshot(ReplicatedSubscriptionsSnapshot
snapshot) {
MarkersMessageIdData msgId = snapshot.getLocalMessageId();
Position position = PositionFactory.create(msgId.getLedgerId(),
msgId.getEntryId());
+ if (tail != null && position.compareTo(tail.position) <= 0) {
+ // clear the entries in the cache if the new snapshot is not newer than the last one
+ // (older than, or at the same position as, the tail); this means that the subscription has been reset
+ head = null;
+ tail = null;
+ numberOfSnapshots = 0;
+ sortedSnapshots.clear();
+ lastSortedEntry = null;
+ }
+
+ List<ClusterEntry> clusterEntryList =
snapshot.getClustersList().stream()
+ .map(cmid -> {
+ Position clusterPosition =
+
PositionFactory.create(cmid.getMessageId().getLedgerId(),
cmid.getMessageId().getEntryId());
+ if (clusterPosition.equals(position)) {
+ // reduce memory usage by sharing the same instance
for the local cluster
+ clusterPosition = position;
+ }
+ return new
ClusterEntry(StringInterner.intern(cmid.getCluster()), clusterPosition);
+ })
+ .toList();
+
+ // optimize heap memory consumption of the cache
+ if (clusterEntryList.size() == 2) {
+ clusterEntryList = List.of(clusterEntryList.get(0),
clusterEntryList.get(1));
+ } else if (clusterEntryList.size() == 3) {
+ clusterEntryList = List.of(clusterEntryList.get(0),
clusterEntryList.get(1), clusterEntryList.get(2));
+ }
+
+ SnapshotEntry entry = new SnapshotEntry(position, clusterEntryList);
+
if (log.isDebugEnabled()) {
log.debug("[{}] Added new replicated-subscription snapshot at {}
-- {}", subscription, position,
snapshot.getSnapshotId());
}
- snapshots.put(position, snapshot);
+ // append to the double-linked list
+ if (head == null) {
+ head = entry;
+ tail = entry;
+ entry.setDistanceToPrevious(0);
+ } else {
+ tail.setNext(entry);
+ entry.setPrev(tail);
+ tail = entry;
+ }
+ numberOfSnapshots++;
// Prune the cache
- while (snapshots.size() > maxSnapshotToCache) {
- snapshots.pollFirstEntry();
+ if (numberOfSnapshots > maxSnapshotToCache) {
+ removeSingleEntryWithMinimumTotalDistanceToPreviousAndNext();
+ }
+ }
+
+ private void removeSingleEntryWithMinimumTotalDistanceToPreviousAndNext() {
+ updateSortedEntriesByTotalDistance();
+
+ SnapshotEntry minEntry = sortedSnapshots.first();
+ // Defensive check: minEntry should never be head or tail, as these
are boundary entries that must be preserved
+ if (minEntry == head || minEntry == tail) {
+ throw new IllegalStateException("minEntry should not be head or
tail boundary entry");
+ }
+ SnapshotEntry minEntryNext = minEntry.next;
+ SnapshotEntry minEntryPrevious = minEntry.prev;
+
+ // remove minEntry from the sorted set
+ sortedSnapshots.remove(minEntry);
+
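+ // remove minEntryPrevious and minEntryNext from the sorted set: their total distance (the sort key) changes
+ // once minEntry is unlinked, so they must be removed before relinking while the set can still locate them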
+ if (minEntryNext != tail) {
+ sortedSnapshots.remove(minEntryNext);
+ }
+ if (minEntryPrevious != head) {
+ sortedSnapshots.remove(minEntryPrevious);
+ }
+
+ // remove minEntry from the linked list
+ minEntryPrevious.setNext(minEntryNext);
+ minEntryNext.setPrev(minEntryPrevious);
+ numberOfSnapshots--;
+
+ // handle the case where the entry to remove is the last entry that
has been sorted
+ if (lastSortedEntry == minEntry) {
+ lastSortedEntry = minEntryPrevious;
+ }
+
+ // update distanceToPrevious for the next entry
+ minEntryNext.setDistanceToPrevious(minEntryNext.distanceToPrevious +
minEntry.distanceToPrevious);
+
+ // add entries back to the sorted set so that entries up to
lastSortedEntry are sorted
+ if (minEntryNext != tail) {
+ sortedSnapshots.add(minEntryNext);
+ }
+ if (minEntryPrevious != head) {
+ sortedSnapshots.add(minEntryPrevious);
+ }
+ }
+
+ /**
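+ * Maintains a sorted set of entries ordered by their total distance to adjacent entries.
+ * Starting after lastSortedEntry (or after head when nothing has been sorted yet), this method fills in any
+ * missing 'distanceToPrevious' values up to the tail. It adds each entry to the sorted set only once the
+ * 'distanceToPrevious' of its next entry is known, because the sort key depends on both values; as a result,
+ * head and tail are never added. lastSortedEntry is advanced to the last entry added, so subsequent calls
+ * continue from there.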
+ */
+ private void updateSortedEntriesByTotalDistance() {
+ SnapshotEntry current = lastSortedEntry != null ? lastSortedEntry.next
: head.next;
+ SnapshotEntry previousLoopEntry = null;
+ while (current != null) {
+ // calculate the distance to the previous snapshot entry
+ if (current.distanceToPrevious == -1) {
+ long distanceToPrevious =
+
distanceFunction.applyAsLong(Range.open(current.prev.position,
current.position));
+ current.setDistanceToPrevious(distanceToPrevious);
+ }
+ // Add the entry to the sorted set, which is sorted by total
distance to the previous and the next entry.
+ // We cannot add the current entry here since sorting requires
that the current and next entries have
+ // their distanceToPrevious field set.
+ if (previousLoopEntry != null) {
+ sortedSnapshots.add(previousLoopEntry);
+ lastSortedEntry = previousLoopEntry;
+ }
+ previousLoopEntry = current;
+ current = current.next;
}
}
/**
* Signal that the mark-delete position on the subscription has been
advanced. If there is a snapshot that
- * correspond to this position, it will returned, other it will return
null.
+ * corresponds to this position, it will be returned; otherwise it will
return null.
*/
- public synchronized ReplicatedSubscriptionsSnapshot
advancedMarkDeletePosition(Position pos) {
- ReplicatedSubscriptionsSnapshot snapshot = null;
- while (!snapshots.isEmpty()) {
- Map.Entry<Position, ReplicatedSubscriptionsSnapshot> firstEntry =
- snapshots.firstEntry();
- Position first = firstEntry.getKey();
- if (first.compareTo(pos) > 0) {
- // Snapshot is associated which an higher position, so it
cannot be used now
+ public synchronized SnapshotResult advancedMarkDeletePosition(Position
pos) {
+ SnapshotEntry snapshot = null;
+
+ SnapshotEntry current = head;
+
+ while (current != null) {
+ if (current.position.compareTo(pos) > 0) {
+ // Snapshot is associated with a higher position, so it cannot
be used now
if (log.isDebugEnabled()) {
- log.debug("[{}] Snapshot {} is associated with an higher
position {} so it cannot be used for mark "
- + "delete position {}", subscription,
firstEntry.getValue(), first, pos);
+ log.debug("[{}] Snapshot {} is associated with a higher
position {} so it cannot be used for mark "
+ + "delete position {}", subscription, current,
current.position, pos);
}
break;
- } else {
- // This snapshot is potentially good. Continue the search for
to see if there is a higher snapshot we
- // can use
- snapshot = snapshots.pollFirstEntry().getValue();
}
+ // This snapshot is potentially good. Continue the search to see
if there is a higher snapshot we
+ // can use
+ snapshot = current;
+ if (current == lastSortedEntry) {
+ lastSortedEntry = null;
+ }
+ current = current.next;
+ head = current;
+ if (head != null) {
+ sortedSnapshots.remove(head);
+ }
+ numberOfSnapshots--;
+ }
+
+ if (head == null) {
+ tail = null;
+ } else {
+ head.setPrev(null);
+ head.setDistanceToPrevious(0L);
}
if (log.isDebugEnabled()) {
if (snapshot != null) {
- log.debug("[{}] Advanced mark-delete position to {} -- found
snapshot {} at {}:{}", subscription, pos,
- snapshot.getSnapshotId(),
- snapshot.getLocalMessageId().getLedgerId(),
- snapshot.getLocalMessageId().getEntryId());
+ log.debug("[{}] Advanced mark-delete position to {} -- found
snapshot at {}", subscription, pos,
+ snapshot.position());
} else {
log.debug("[{}] Advanced mark-delete position to {} --
snapshot not found", subscription, pos);
}
}
- return snapshot;
+
+ return snapshot != null ? new SnapshotResult(snapshot.position(),
snapshot.clusters()) : null;
+ }
+
+ @VisibleForTesting
+ synchronized List<SnapshotEntry> getSnapshots() {
+ List<SnapshotEntry> snapshots = new ArrayList<>(numberOfSnapshots);
+ SnapshotEntry current = head;
+ while (current != null) {
+ snapshots.add(current);
+ current = current.next;
+ }
+ return snapshots;
+ }
+
+ @VisibleForTesting
+ synchronized int size() {
+ return numberOfSnapshots;
}
}
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/ReplicatedSubscriptionsController.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/ReplicatedSubscriptionsController.java
index 7ae48f7976b..5eaa313c3d0 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/ReplicatedSubscriptionsController.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/ReplicatedSubscriptionsController.java
@@ -47,7 +47,6 @@ import org.apache.pulsar.common.api.proto.CommandAck.AckType;
import org.apache.pulsar.common.api.proto.CommandSubscribe.InitialPosition;
import org.apache.pulsar.common.api.proto.MarkerType;
import org.apache.pulsar.common.api.proto.MarkersMessageIdData;
-import org.apache.pulsar.common.api.proto.ReplicatedSubscriptionsSnapshot;
import
org.apache.pulsar.common.api.proto.ReplicatedSubscriptionsSnapshotRequest;
import
org.apache.pulsar.common.api.proto.ReplicatedSubscriptionsSnapshotResponse;
import org.apache.pulsar.common.api.proto.ReplicatedSubscriptionsUpdate;
@@ -126,20 +125,19 @@ public class ReplicatedSubscriptionsController implements
AutoCloseable, Topic.P
}
}
- public void localSubscriptionUpdated(String subscriptionName,
ReplicatedSubscriptionsSnapshot snapshot) {
+ public void localSubscriptionUpdated(String subscriptionName,
+
ReplicatedSubscriptionSnapshotCache.SnapshotResult snapshot) {
if (log.isDebugEnabled()) {
- log.debug("[{}][{}][{}] Updating subscription to snapshot {}",
- topic.getBrokerService().pulsar().getBrokerId(), topic,
subscriptionName,
- snapshot.getClustersList().stream()
- .map(cmid -> String.format("%s -> %d:%d",
cmid.getCluster(),
- cmid.getMessageId().getLedgerId(),
cmid.getMessageId().getEntryId()))
+ log.debug("[{}][{}] Updating subscription to snapshot {}", topic,
subscriptionName,
+ snapshot.clusters().stream()
+ .map(entry -> String.format("%s -> %s",
entry.cluster(), entry.position()))
.collect(Collectors.toList()));
}
Map<String, MarkersMessageIdData> clusterIds = new TreeMap<>();
- for (int i = 0, size = snapshot.getClustersCount(); i < size; i++) {
- ClusterMessageId cmid = snapshot.getClusterAt(i);
- clusterIds.put(cmid.getCluster(), cmid.getMessageId());
+ for (ReplicatedSubscriptionSnapshotCache.ClusterEntry cluster :
snapshot.clusters()) {
+ clusterIds.put(cluster.cluster(), new
MarkersMessageIdData().setLedgerId(cluster.position().getLedgerId())
+ .setEntryId(cluster.position().getEntryId()));
}
ByteBuf subscriptionUpdate =
Markers.newReplicatedSubscriptionsUpdate(subscriptionName, clusterIds);
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java
index 097a4f6de05..3b64b2ecc2c 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java
@@ -1373,6 +1373,8 @@ public class PersistentTopicTest extends
MockedBookKeeperTestCase {
void setupMLAsyncCallbackMocks() {
ledgerMock = mock(ManagedLedger.class);
cursorMock = mock(ManagedCursorImpl.class);
+ doReturn(ledgerMock).when(cursorMock).getManagedLedger();
+ doReturn(0L).when(ledgerMock).getNumberOfEntries(any());
final CompletableFuture<Void> closeFuture = new CompletableFuture<>();
doReturn(new ArrayList<>()).when(ledgerMock).getCursors();
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/ReplicatedSubscriptionSnapshotCacheTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/ReplicatedSubscriptionSnapshotCacheTest.java
index 5f6ae405ccd..e1e7e0d4164 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/ReplicatedSubscriptionSnapshotCacheTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/ReplicatedSubscriptionSnapshotCacheTest.java
@@ -18,19 +18,27 @@
*/
package org.apache.pulsar.broker.service.persistent;
+import static org.assertj.core.api.Assertions.assertThat;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertNotNull;
import static org.testng.Assert.assertNull;
+import java.util.List;
+import java.util.Random;
+import java.util.UUID;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.bookkeeper.mledger.Position;
import org.apache.bookkeeper.mledger.PositionFactory;
import org.apache.pulsar.common.api.proto.ReplicatedSubscriptionsSnapshot;
import org.testng.annotations.Test;
+@Slf4j
@Test(groups = "broker")
public class ReplicatedSubscriptionSnapshotCacheTest {
@Test
public void testSnapshotCache() {
- ReplicatedSubscriptionSnapshotCache cache = new
ReplicatedSubscriptionSnapshotCache("my-subscription", 10);
+ ReplicatedSubscriptionSnapshotCache cache =
+ new ReplicatedSubscriptionSnapshotCache("my-subscription", 10,
range -> 0);
assertNull(cache.advancedMarkDeletePosition(PositionFactory.create(0,
0)));
assertNull(cache.advancedMarkDeletePosition(PositionFactory.create(100, 0)));
@@ -58,13 +66,14 @@ public class ReplicatedSubscriptionSnapshotCacheTest {
assertNull(cache.advancedMarkDeletePosition(PositionFactory.create(0,
0)));
assertNull(cache.advancedMarkDeletePosition(PositionFactory.create(1,
0)));
- ReplicatedSubscriptionsSnapshot snapshot =
cache.advancedMarkDeletePosition(PositionFactory.create(1, 1));
+ ReplicatedSubscriptionSnapshotCache.SnapshotResult
+ snapshot =
cache.advancedMarkDeletePosition(PositionFactory.create(1, 1));
assertNotNull(snapshot);
- assertEquals(snapshot.getSnapshotId(), "snapshot-1");
+ assertEquals(snapshot.position(), PositionFactory.create(1, 1));
snapshot = cache.advancedMarkDeletePosition(PositionFactory.create(5,
6));
assertNotNull(snapshot);
- assertEquals(snapshot.getSnapshotId(), "snapshot-5");
+ assertEquals(snapshot.position(), PositionFactory.create(5, 5));
// Snapshots should have been now removed
assertNull(cache.advancedMarkDeletePosition(PositionFactory.create(2,
2)));
@@ -73,7 +82,8 @@ public class ReplicatedSubscriptionSnapshotCacheTest {
@Test
public void testSnapshotCachePruning() {
- ReplicatedSubscriptionSnapshotCache cache = new
ReplicatedSubscriptionSnapshotCache("my-subscription", 3);
+ ReplicatedSubscriptionSnapshotCache cache =
+ new ReplicatedSubscriptionSnapshotCache("my-subscription", 3,
range -> 1);
ReplicatedSubscriptionsSnapshot s1 = new
ReplicatedSubscriptionsSnapshot()
.setSnapshotId("snapshot-1");
@@ -96,14 +106,223 @@ public class ReplicatedSubscriptionSnapshotCacheTest {
cache.addNewSnapshot(s3);
cache.addNewSnapshot(s4);
- // Snapshot-1 was already pruned
- assertNull(cache.advancedMarkDeletePosition(PositionFactory.create(1,
1)));
- ReplicatedSubscriptionsSnapshot snapshot =
cache.advancedMarkDeletePosition(PositionFactory.create(2, 2));
+ ReplicatedSubscriptionSnapshotCache.SnapshotResult
+ snapshot =
cache.advancedMarkDeletePosition(PositionFactory.create(2, 2));
assertNotNull(snapshot);
- assertEquals(snapshot.getSnapshotId(), "snapshot-2");
+ // Snapshot-2 was already pruned
+ assertEquals(snapshot.position(), PositionFactory.create(1, 1));
snapshot = cache.advancedMarkDeletePosition(PositionFactory.create(5,
5));
assertNotNull(snapshot);
- assertEquals(snapshot.getSnapshotId(), "snapshot-4");
+ assertEquals(snapshot.position(), PositionFactory.create(4, 4));
}
-}
+
+
+ @Test(timeOut = 15_000)
+ public void testSnapshotCachePruningByKeepingEqualDistance() {
+ int maxSnapshotToCache = 10_000;
+ int addSnapshotCount = 1_000_000;
+
+ ReplicatedSubscriptionSnapshotCache cache =
+ new ReplicatedSubscriptionSnapshotCache("my-subscription",
maxSnapshotToCache,
+ range -> range.upperEndpoint().getEntryId() -
range.lowerEndpoint().getEntryId());
+
+ long ledgerIdCluster1 = 1;
+ long entryIdCluster1 = 0;
+ long ledgerIdCluster2 = 2;
+ long entryIdCluster2 = 0;
+ Random random = new Random();
+
+ // create a large number of snapshots where the entry ids move forward
+ // 100 + 0-999 (random) entries at a time
+ for (int i = 0; i < addSnapshotCount; i++) {
+ ReplicatedSubscriptionsSnapshot snapshot = new
ReplicatedSubscriptionsSnapshot()
+ .setSnapshotId(UUID.randomUUID().toString());
+
snapshot.setLocalMessageId().setLedgerId(ledgerIdCluster1).setEntryId(entryIdCluster1);
+
snapshot.addCluster().setCluster("cluster1").setMessageId().setLedgerId(ledgerIdCluster1)
+ .setEntryId(entryIdCluster1);
+
snapshot.addCluster().setCluster("cluster2").setMessageId().setLedgerId(ledgerIdCluster2)
+ .setEntryId(entryIdCluster2);
+ cache.addNewSnapshot(snapshot);
+ entryIdCluster1 += 100 + random.nextInt(1000);
+ entryIdCluster2 += 100 + random.nextInt(1000);
+ }
+
+ // validate the state of snapshots
+ List<ReplicatedSubscriptionSnapshotCache.SnapshotEntry> snapshots =
cache.getSnapshots();
+ assertEquals(snapshots.size(), maxSnapshotToCache);
+ ReplicatedSubscriptionSnapshotCache.SnapshotEntry second =
snapshots.get(1);
+ ReplicatedSubscriptionSnapshotCache.SnapshotEntry secondLast =
snapshots.get(snapshots.size() - 2);
+ long distance = secondLast.position().getEntryId() -
second.position().getEntryId();
+ long expectedAverageDistance = distance / snapshots.size();
+
+ long maxDistance = 0;
+ long minDistance = Long.MAX_VALUE;
+ for (int i = 0; i < snapshots.size() - 1; i++) {
+ Position position = snapshots.get(i).position();
+ Position nextPosition = snapshots.get(i + 1).position();
+ long distanceToNext = nextPosition.getEntryId() -
position.getEntryId();
+ if (log.isDebugEnabled()) {
+ log.debug(i + ": " + position + " -> " + nextPosition + "
distance to next: " + distanceToNext
+ + " to previous: " +
snapshots.get(i).distanceToPrevious());
+ }
+ maxDistance = Math.max(maxDistance, distanceToNext);
+ minDistance = Math.min(minDistance, distanceToNext);
+
+ // ensure that each snapshot is within 2 * expected average
distance from the previous one
+ ReplicatedSubscriptionSnapshotCache.SnapshotEntry snapshotEntry =
snapshots.get(i);
+ assertThat(snapshotEntry.distanceToPrevious()).describedAs(
+ "distance to previous for snapshot entry: %s is
not expected", snapshotEntry)
+ .isLessThanOrEqualTo(expectedAverageDistance * 2);
+ }
+
+ log.info("Average distance, expected: {}", expectedAverageDistance);
+ log.info("Min distance: {}", minDistance);
+ log.info("Max distance: {}", maxDistance);
+
+ // check that picking a random markDeletePosition between the second and the second-to-last snapshot
+ // results in a snapshot that is within 2 * expectedAverageDistance from the markDeletePosition
+ Position markDeletePosition =
+ PositionFactory.create(ledgerIdCluster1,
+ second.position().getEntryId() +
random.nextLong(Math.max(1, distance)));
+
+
assertThat(cache.advancedMarkDeletePosition(markDeletePosition)).satisfies(snapshotResult
-> {
+ long snapshotDistance = markDeletePosition.getEntryId() -
snapshotResult.position().getEntryId();
+ assertThat(snapshotDistance).describedAs("snapshot result: %s
markDeletePosition: %s", snapshotResult,
+
markDeletePosition).isLessThanOrEqualTo(expectedAverageDistance * 2);
+ });
+
+ }
+
+ @Test
+ public void testSnapshotCachePruningScenarios() {
+ ReplicatedSubscriptionSnapshotCache cache = new
ReplicatedSubscriptionSnapshotCache("my-subscription", 5,
+ range -> range.upperEndpoint().getEntryId() -
range.lowerEndpoint().getEntryId());
+
+ ReplicatedSubscriptionsSnapshot s1 = new
ReplicatedSubscriptionsSnapshot();
+ s1.setLocalMessageId().setLedgerId(1).setEntryId(1);
+ cache.addNewSnapshot(s1);
+
+ ReplicatedSubscriptionsSnapshot s2 = new
ReplicatedSubscriptionsSnapshot();
+ s2.setLocalMessageId().setLedgerId(1).setEntryId(2);
+ cache.addNewSnapshot(s2);
+
+ ReplicatedSubscriptionsSnapshot s3 = new
ReplicatedSubscriptionsSnapshot();
+ s3.setLocalMessageId().setLedgerId(1).setEntryId(10);
+ cache.addNewSnapshot(s3);
+
+ ReplicatedSubscriptionsSnapshot s4 = new
ReplicatedSubscriptionsSnapshot();
+ s4.setLocalMessageId().setLedgerId(1).setEntryId(15);
+ cache.addNewSnapshot(s4);
+
+ ReplicatedSubscriptionsSnapshot s5 = new
ReplicatedSubscriptionsSnapshot();
+ s5.setLocalMessageId().setLedgerId(1).setEntryId(25);
+ cache.addNewSnapshot(s5);
+
+ ReplicatedSubscriptionsSnapshot s6 = new
ReplicatedSubscriptionsSnapshot();
+ s6.setLocalMessageId().setLedgerId(1).setEntryId(100);
+ cache.addNewSnapshot(s6);
+
+ // s2 should be pruned (special case where head is previous to the
removed one)
+ assertThat(cache.getSnapshots()).hasSize(5)
+ .allSatisfy(snapshotEntry ->
assertThat(snapshotEntry.position()).isNotEqualTo(
+ PositionFactory.create(1, 2)));
+
+ ReplicatedSubscriptionsSnapshot s7 = new
ReplicatedSubscriptionsSnapshot();
+ s7.setLocalMessageId().setLedgerId(1).setEntryId(110);
+ cache.addNewSnapshot(s7);
+
+ // s3 should be pruned (ordinary case where middle entry is removed)
+ assertThat(cache.getSnapshots()).hasSize(5)
+ .allSatisfy(snapshotEntry ->
assertThat(snapshotEntry.position()).isNotEqualTo(
+ PositionFactory.create(1, 10)));
+
+ ReplicatedSubscriptionsSnapshot s8 = new
ReplicatedSubscriptionsSnapshot();
+ s8.setLocalMessageId().setLedgerId(1).setEntryId(112);
+ cache.addNewSnapshot(s8);
+
+ // s7 should be pruned (special case where tail is after the removed
one)
+ assertThat(cache.getSnapshots()).hasSize(5)
+ .allSatisfy(snapshotEntry ->
assertThat(snapshotEntry.position()).isNotEqualTo(
+ PositionFactory.create(1, 110)));
+
+
+ ReplicatedSubscriptionsSnapshot s9 = new
ReplicatedSubscriptionsSnapshot();
+ s9.setLocalMessageId().setLedgerId(1).setEntryId(113);
+ cache.addNewSnapshot(s9);
+
+ // s8 should be pruned (check that pruning works after the one before
the tail was removed)
+ assertThat(cache.getSnapshots()).hasSize(5)
+ .allSatisfy(snapshotEntry ->
assertThat(snapshotEntry.position()).isNotEqualTo(
+ PositionFactory.create(1, 112)));
+
+ ReplicatedSubscriptionsSnapshot s10 = new
ReplicatedSubscriptionsSnapshot();
+ s10.setLocalMessageId().setLedgerId(1).setEntryId(200);
+ cache.addNewSnapshot(s10);
+
+ // s4 should be pruned (check that pruning still works immediately
after head)
+ assertThat(cache.getSnapshots()).hasSize(5)
+ .allSatisfy(snapshotEntry ->
assertThat(snapshotEntry.position()).isNotEqualTo(
+ PositionFactory.create(1, 15)));
+
+ ReplicatedSubscriptionsSnapshot s11 = new
ReplicatedSubscriptionsSnapshot();
+ // entry id that is before the tail
+ s11.setLocalMessageId().setLedgerId(1).setEntryId(50);
+ cache.addNewSnapshot(s11);
+
+ // all snapshots should be pruned, and s11 should be the only one
+ assertThat(cache.getSnapshots()).hasSize(1)
+ .first().satisfies(snapshotEntry ->
assertThat(snapshotEntry.position()).isEqualTo(
+ PositionFactory.create(1, 50)));
+ }
+
+ @Test(timeOut = 15_000)
+ public void testSnapshotCacheStressTest() {
+ int maxSnapshotToCache = 10_000;
+ int addSnapshotCount = 1_000_000;
+
+ ReplicatedSubscriptionSnapshotCache cache =
+ new ReplicatedSubscriptionSnapshotCache("my-subscription",
maxSnapshotToCache,
+ range -> range.upperEndpoint().getEntryId() -
range.lowerEndpoint().getEntryId());
+
+ long ledgerIdCluster1 = 1;
+ long entryIdCluster1 = 0;
+ long ledgerIdCluster2 = 2;
+ long entryIdCluster2 = 0;
+ Random random = new Random();
+
+ int addedSnapshots = 0;
+ long markDeletePositionEntryId = 0;
+ long firstSnapshotEntryId = -1L;
+
+ while (addedSnapshots < addSnapshotCount) {
+ // fill up the cache with random number of entries
+ int addInThisRound = 1 + random.nextInt(2 * maxSnapshotToCache);
+ for (int i = 0; i < addInThisRound; i++) {
+ ReplicatedSubscriptionsSnapshot snapshot = new
ReplicatedSubscriptionsSnapshot()
+ .setSnapshotId(UUID.randomUUID().toString());
+
snapshot.setLocalMessageId().setLedgerId(ledgerIdCluster1).setEntryId(entryIdCluster1);
+
snapshot.addCluster().setCluster("cluster1").setMessageId().setLedgerId(ledgerIdCluster1)
+ .setEntryId(entryIdCluster1);
+
snapshot.addCluster().setCluster("cluster2").setMessageId().setLedgerId(ledgerIdCluster2)
+ .setEntryId(entryIdCluster2);
+ cache.addNewSnapshot(snapshot);
+ if (firstSnapshotEntryId == -1L) {
+ firstSnapshotEntryId = entryIdCluster1;
+ }
+ addedSnapshots++;
+ entryIdCluster1 += 100 + random.nextInt(1000);
+ entryIdCluster2 += 100 + random.nextInt(1000);
+ }
+ long bound = entryIdCluster1 - firstSnapshotEntryId;
+ if (bound > 0) {
+ markDeletePositionEntryId = firstSnapshotEntryId +
random.nextLong(bound);
+ } else {
+ markDeletePositionEntryId = firstSnapshotEntryId;
+ }
+ ReplicatedSubscriptionSnapshotCache.SnapshotResult snapshotResult
= cache.advancedMarkDeletePosition(
+ PositionFactory.create(ledgerIdCluster1,
markDeletePositionEntryId));
+ assertNotNull(snapshotResult);
+ firstSnapshotEntryId = -1L;
+ }
+ }
+}
\ No newline at end of file