This is an automated email from the ASF dual-hosted git repository. lhotari pushed a commit to branch branch-4.0 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit 7c98c2f2c34f11da919422d49a5326e97a9f3b69 Author: Lari Hotari <[email protected]> AuthorDate: Fri Dec 12 12:47:41 2025 +0200 [improve][broker] Improve replicated subscription snapshot cache so that subscriptions can be replicated when mark delete position update is not frequent (#25044) (cherry picked from commit 634502699321469a6816d12a09081d3f63333bba) --- conf/broker.conf | 2 +- .../apache/pulsar/broker/ServiceConfiguration.java | 2 +- .../service/persistent/PersistentSubscription.java | 6 +- .../ReplicatedSubscriptionSnapshotCache.java | 354 +++++++++++++++++++-- .../ReplicatedSubscriptionsController.java | 18 +- .../pulsar/broker/service/PersistentTopicTest.java | 2 + .../ReplicatedSubscriptionSnapshotCacheTest.java | 241 +++++++++++++- 7 files changed, 571 insertions(+), 54 deletions(-) diff --git a/conf/broker.conf b/conf/broker.conf index 4d45d928d34..0ec722348a8 100644 --- a/conf/broker.conf +++ b/conf/broker.conf @@ -732,7 +732,7 @@ replicatedSubscriptionsSnapshotFrequencyMillis=1000 replicatedSubscriptionsSnapshotTimeoutSeconds=30 # Max number of snapshot to be cached per subscription. -replicatedSubscriptionsSnapshotMaxCachedPerSubscription=10 +replicatedSubscriptionsSnapshotMaxCachedPerSubscription=30 # Max memory size for broker handling messages sending from producers. 
# If the processing message size exceed this value, broker will stop read data diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java index c6618304914..d04d8d9be26 100644 --- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java +++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java @@ -1540,7 +1540,7 @@ public class ServiceConfiguration implements PulsarConfiguration { @FieldContext( category = CATEGORY_SERVER, doc = "Max number of snapshot to be cached per subscription.") - private int replicatedSubscriptionsSnapshotMaxCachedPerSubscription = 10; + private int replicatedSubscriptionsSnapshotMaxCachedPerSubscription = 30; @FieldContext( category = CATEGORY_SERVER, diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java index 66e00189c2e..5eb06b87628 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java @@ -206,7 +206,8 @@ public class PersistentSubscription extends AbstractSubscription { this.replicatedSubscriptionSnapshotCache = null; } else if (this.replicatedSubscriptionSnapshotCache == null) { this.replicatedSubscriptionSnapshotCache = new ReplicatedSubscriptionSnapshotCache(subName, - config.getReplicatedSubscriptionsSnapshotMaxCachedPerSubscription()); + config.getReplicatedSubscriptionsSnapshotMaxCachedPerSubscription(), + getCursor().getManagedLedger()::getNumberOfEntries); } if (this.cursor != null) { @@ -557,7 +558,8 @@ public class PersistentSubscription extends AbstractSubscription { private void 
handleReplicatedSubscriptionsUpdate(Position markDeletePosition) { ReplicatedSubscriptionSnapshotCache snapshotCache = this.replicatedSubscriptionSnapshotCache; if (snapshotCache != null) { - ReplicatedSubscriptionsSnapshot snapshot = snapshotCache.advancedMarkDeletePosition(markDeletePosition); + ReplicatedSubscriptionSnapshotCache.SnapshotResult snapshot = snapshotCache + .advancedMarkDeletePosition(markDeletePosition); if (snapshot != null) { topic.getReplicatedSubscriptionController() .ifPresent(c -> c.localSubscriptionUpdated(subName, snapshot)); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/ReplicatedSubscriptionSnapshotCache.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/ReplicatedSubscriptionSnapshotCache.java index 6b4e60dc63b..2e5dbbe0c78 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/ReplicatedSubscriptionSnapshotCache.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/ReplicatedSubscriptionSnapshotCache.java @@ -18,14 +18,19 @@ */ package org.apache.pulsar.broker.service.persistent; -import java.util.Map; -import java.util.NavigableMap; -import java.util.TreeMap; +import com.google.common.annotations.VisibleForTesting; +import com.google.common.collect.Range; +import java.util.ArrayList; +import java.util.List; +import java.util.SortedSet; +import java.util.TreeSet; +import java.util.function.ToLongFunction; import lombok.extern.slf4j.Slf4j; import org.apache.bookkeeper.mledger.Position; import org.apache.bookkeeper.mledger.PositionFactory; import org.apache.pulsar.common.api.proto.MarkersMessageIdData; import org.apache.pulsar.common.api.proto.ReplicatedSubscriptionsSnapshot; +import org.apache.pulsar.common.util.StringInterner; /** * Store the last N snapshots that were scanned by a particular subscription. 
@@ -33,66 +38,357 @@ import org.apache.pulsar.common.api.proto.ReplicatedSubscriptionsSnapshot; @Slf4j public class ReplicatedSubscriptionSnapshotCache { private final String subscription; - private final NavigableMap<Position, ReplicatedSubscriptionsSnapshot> snapshots; + private final ToLongFunction<Range<Position>> distanceFunction; private final int maxSnapshotToCache; + private SnapshotEntry head; + private SnapshotEntry tail; + private int numberOfSnapshots = 0; + private SnapshotEntry lastSortedEntry; + private final SortedSet<SnapshotEntry> sortedSnapshots; - public ReplicatedSubscriptionSnapshotCache(String subscription, int maxSnapshotToCache) { + public ReplicatedSubscriptionSnapshotCache(String subscription, int maxSnapshotToCache, + ToLongFunction<Range<Position>> distanceFunction) { this.subscription = subscription; - this.snapshots = new TreeMap<>(); + this.distanceFunction = distanceFunction; + if (maxSnapshotToCache < 3) { + throw new IllegalArgumentException("maxSnapshotToCache must be >= 3"); + } this.maxSnapshotToCache = maxSnapshotToCache; + this.sortedSnapshots = new TreeSet<>(); + } + + /** + * Memory footprint estimate for one SnapshotEntry with shared String cluster instances. + * + * Assumptions: + * - 64-bit JVM with compressed OOPs enabled (default for heap sizes < 32GB) + * - Cluster name strings are shared/interned across entries + * - 2 ClusterEntry objects per SnapshotEntry (typical case) + * - Each entry has its own Position objects + * - 1 of the ClusterEntry objects is for the local cluster and shares the local cluster position + * - List.of() creates ImmutableCollections.List12 for 2-element lists + * - 8-byte memory alignment padding applied where needed + * + * Breakdown: + * + * 1. 
SnapshotEntry object: ~40 bytes + * - Object header (mark + klass): 12 bytes + * - Position position: 4 bytes (reference) + * - List<ClusterEntry> clusters: 4 bytes (reference) + * - long distanceToPrevious: 8 bytes + * - SnapshotEntry next: 4 bytes (reference) + * - SnapshotEntry prev: 4 bytes (reference) + * - Alignment padding: 4 bytes + * Subtotal: 40 bytes + * + * 2. Position object (snapshot position): ~32 bytes + * - Object header: 12 bytes + * - long ledgerId: 8 bytes + * - long entryId: 8 bytes + * - Alignment padding: 4 bytes + * Subtotal: 32 bytes + * + * 3. ImmutableCollections.List12 (for 2 elements): ~24 bytes + * - Object header: 12 bytes + * - Object e0: 4 bytes (reference to first ClusterEntry) + * - Object e1: 4 bytes (reference to second ClusterEntry) + * - Alignment padding: 4 bytes + * Subtotal: 24 bytes + * + * 4. ClusterEntry objects (2 instances): ~48 bytes + * Each ClusterEntry (Java record): ~24 bytes + * - Object header: 12 bytes + * - String cluster: 4 bytes (reference, string itself is shared/interned) + * - Position position: 4 bytes (reference) + * - Alignment padding: 4 bytes + * Subtotal per entry: 24 bytes × 2 = 48 bytes + * + * With alignment to 8 bytes: 48 → 48 bytes + * Actual total for both: 48 bytes + * + * 5.
Additional Position object (for non-local cluster): ~32 bytes + * - Object header: 12 bytes + * - long ledgerId: 8 bytes + * - long entryId: 8 bytes + * - Alignment padding: 4 bytes + * Subtotal: 32 bytes + * + * Total per SnapshotEntry: 40 + 32 + 24 + 48 + 32 = ~176 bytes + * + * Rounded estimate: ~176-192 bytes per entry + * + * Note: Actual memory consumption may vary based on: + * - JVM implementation and version + * - Whether compressed OOPs are enabled + * - Garbage collection and heap layout + * - Runtime optimizations (escape analysis, object allocation elimination) + * - Number of clusters per snapshot (this estimate assumes 2) + */ + static class SnapshotEntry implements Comparable<SnapshotEntry> { + private final Position position; + private final List<ClusterEntry> clusters; + private long distanceToPrevious = -1; + private SnapshotEntry next; + private SnapshotEntry prev; + + SnapshotEntry(Position position, List<ClusterEntry> clusters) { + this.position = position; + this.clusters = clusters; + } + + Position position() { + return position; + } + + List<ClusterEntry> clusters() { + return clusters; + } + + long distanceToPrevious() { + return distanceToPrevious; + } + + void setDistanceToPrevious(long distanceToPrevious) { + this.distanceToPrevious = distanceToPrevious; + } + + SnapshotEntry next() { + return next; + } + + void setNext(SnapshotEntry next) { + this.next = next; + } + + SnapshotEntry prev() { + return prev; + } + + void setPrev(SnapshotEntry prev) { + this.prev = prev; + } + + long totalDistance() { + return distanceToPrevious + (next != null ?
next.distanceToPrevious : 0L); + } + + @Override + public int compareTo(SnapshotEntry o) { + int retval = Long.compare(totalDistance(), o.totalDistance()); + if (retval != 0) { + return retval; + } + retval = position.compareTo(o.position); + if (retval != 0) { + return retval; + } + return Integer.compare(System.identityHashCode(this), System.identityHashCode(o)); + } + + @Override + public String toString() { + return String.format("SnapshotEntry(position=%s, clusters=%s, distanceToPrevious=%d)", position, clusters, + distanceToPrevious); + } } + public record ClusterEntry(String cluster, Position position) {} + + public record SnapshotResult(Position position, List<ClusterEntry> clusters) {} + public synchronized void addNewSnapshot(ReplicatedSubscriptionsSnapshot snapshot) { MarkersMessageIdData msgId = snapshot.getLocalMessageId(); Position position = PositionFactory.create(msgId.getLedgerId(), msgId.getEntryId()); + if (tail != null && position.compareTo(tail.position) <= 0) { + // clear the entries in the cache if the new snapshot is older than the last one + // this means that the subscription has been reset + head = null; + tail = null; + numberOfSnapshots = 0; + sortedSnapshots.clear(); + lastSortedEntry = null; + } + + List<ClusterEntry> clusterEntryList = snapshot.getClustersList().stream() + .map(cmid -> { + Position clusterPosition = + PositionFactory.create(cmid.getMessageId().getLedgerId(), cmid.getMessageId().getEntryId()); + if (clusterPosition.equals(position)) { + // reduce memory usage by sharing the same instance for the local cluster + clusterPosition = position; + } + return new ClusterEntry(StringInterner.intern(cmid.getCluster()), clusterPosition); + }) + .toList(); + + // optimize heap memory consumption of the cache + if (clusterEntryList.size() == 2) { + clusterEntryList = List.of(clusterEntryList.get(0), clusterEntryList.get(1)); + } else if (clusterEntryList.size() == 3) { + clusterEntryList = List.of(clusterEntryList.get(0), 
clusterEntryList.get(1), clusterEntryList.get(2)); + } + + SnapshotEntry entry = new SnapshotEntry(position, clusterEntryList); + if (log.isDebugEnabled()) { log.debug("[{}] Added new replicated-subscription snapshot at {} -- {}", subscription, position, snapshot.getSnapshotId()); } - snapshots.put(position, snapshot); + // append to the double-linked list + if (head == null) { + head = entry; + tail = entry; + entry.setDistanceToPrevious(0); + } else { + tail.setNext(entry); + entry.setPrev(tail); + tail = entry; + } + numberOfSnapshots++; // Prune the cache - while (snapshots.size() > maxSnapshotToCache) { - snapshots.pollFirstEntry(); + if (numberOfSnapshots > maxSnapshotToCache) { + removeSingleEntryWithMinimumTotalDistanceToPreviousAndNext(); + } + } + + private void removeSingleEntryWithMinimumTotalDistanceToPreviousAndNext() { + updateSortedEntriesByTotalDistance(); + + SnapshotEntry minEntry = sortedSnapshots.first(); + // Defensive check: minEntry should never be head or tail, as these are boundary entries that must be preserved + if (minEntry == head || minEntry == tail) { + throw new IllegalStateException("minEntry should not be head or tail boundary entry"); + } + SnapshotEntry minEntryNext = minEntry.next; + SnapshotEntry minEntryPrevious = minEntry.prev; + + // remove minEntry from the sorted set + sortedSnapshots.remove(minEntry); + + // remove minEntryPrevious and minEntryNext from the sorted set since the distance will be updated + if (minEntryNext != tail) { + sortedSnapshots.remove(minEntryNext); + } + if (minEntryPrevious != head) { + sortedSnapshots.remove(minEntryPrevious); + } + + // remove minEntry from the linked list + minEntryPrevious.setNext(minEntryNext); + minEntryNext.setPrev(minEntryPrevious); + numberOfSnapshots--; + + // handle the case where the entry to remove is the last entry that has been sorted + if (lastSortedEntry == minEntry) { + lastSortedEntry = minEntryPrevious; + } + + // update distanceToPrevious for the next entry + 
minEntryNext.setDistanceToPrevious(minEntryNext.distanceToPrevious + minEntry.distanceToPrevious); + + // add entries back to the sorted set so that entries up to lastSortedEntry are sorted + if (minEntryNext != tail) { + sortedSnapshots.add(minEntryNext); + } + if (minEntryPrevious != head) { + sortedSnapshots.add(minEntryPrevious); + } + } + + /** + * Maintains a sorted set of entries ordered by their total distance to adjacent entries. + * This method calculates the 'distanceToPrevious' field for both current and next entries before adding them to the + * sorted set. Subsequent calls to this method will continue processing from where the last entry was added. + */ + private void updateSortedEntriesByTotalDistance() { + SnapshotEntry current = lastSortedEntry != null ? lastSortedEntry.next : head.next; + SnapshotEntry previousLoopEntry = null; + while (current != null) { + // calculate the distance to the previous snapshot entry + if (current.distanceToPrevious == -1) { + long distanceToPrevious = + distanceFunction.applyAsLong(Range.open(current.prev.position, current.position)); + current.setDistanceToPrevious(distanceToPrevious); + } + // Add the entry to the sorted set, which is sorted by total distance to the previous and the next entry. + // We cannot add the current entry here since sorting requires that the current and next entries have + // their distanceToPrevious field set. + if (previousLoopEntry != null) { + sortedSnapshots.add(previousLoopEntry); + lastSortedEntry = previousLoopEntry; + } + previousLoopEntry = current; + current = current.next; } } /** * Signal that the mark-delete position on the subscription has been advanced. If there is a snapshot that - * correspond to this position, it will returned, other it will return null. + * corresponds to this position, it will be returned; otherwise it will return null. 
*/ - public synchronized ReplicatedSubscriptionsSnapshot advancedMarkDeletePosition(Position pos) { - ReplicatedSubscriptionsSnapshot snapshot = null; - while (!snapshots.isEmpty()) { - Map.Entry<Position, ReplicatedSubscriptionsSnapshot> firstEntry = - snapshots.firstEntry(); - Position first = firstEntry.getKey(); - if (first.compareTo(pos) > 0) { - // Snapshot is associated which an higher position, so it cannot be used now + public synchronized SnapshotResult advancedMarkDeletePosition(Position pos) { + SnapshotEntry snapshot = null; + + SnapshotEntry current = head; + + while (current != null) { + if (current.position.compareTo(pos) > 0) { + // Snapshot is associated with a higher position, so it cannot be used now if (log.isDebugEnabled()) { - log.debug("[{}] Snapshot {} is associated with an higher position {} so it cannot be used for mark " - + "delete position {}", subscription, firstEntry.getValue(), first, pos); + log.debug("[{}] Snapshot {} is associated with a higher position {} so it cannot be used for mark " + + "delete position {}", subscription, current, current.position, pos); } break; - } else { - // This snapshot is potentially good. Continue the search for to see if there is a higher snapshot we - // can use - snapshot = snapshots.pollFirstEntry().getValue(); } + // This snapshot is potentially good. 
Continue the search to see if there is a higher snapshot we + // can use + snapshot = current; + if (current == lastSortedEntry) { + lastSortedEntry = null; + } + current = current.next; + head = current; + if (head != null) { + sortedSnapshots.remove(head); + } + numberOfSnapshots--; + } + + if (head == null) { + tail = null; + } else { + head.setPrev(null); + head.setDistanceToPrevious(0L); } if (log.isDebugEnabled()) { if (snapshot != null) { - log.debug("[{}] Advanced mark-delete position to {} -- found snapshot {} at {}:{}", subscription, pos, - snapshot.getSnapshotId(), - snapshot.getLocalMessageId().getLedgerId(), - snapshot.getLocalMessageId().getEntryId()); + log.debug("[{}] Advanced mark-delete position to {} -- found snapshot at {}", subscription, pos, + snapshot.position()); } else { log.debug("[{}] Advanced mark-delete position to {} -- snapshot not found", subscription, pos); } } - return snapshot; + + return snapshot != null ? new SnapshotResult(snapshot.position(), snapshot.clusters()) : null; + } + + @VisibleForTesting + synchronized List<SnapshotEntry> getSnapshots() { + List<SnapshotEntry> snapshots = new ArrayList<>(numberOfSnapshots); + SnapshotEntry current = head; + while (current != null) { + snapshots.add(current); + current = current.next; + } + return snapshots; + } + + @VisibleForTesting + synchronized int size() { + return numberOfSnapshots; } } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/ReplicatedSubscriptionsController.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/ReplicatedSubscriptionsController.java index 7ae48f7976b..5eaa313c3d0 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/ReplicatedSubscriptionsController.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/ReplicatedSubscriptionsController.java @@ -47,7 +47,6 @@ import org.apache.pulsar.common.api.proto.CommandAck.AckType; import 
org.apache.pulsar.common.api.proto.CommandSubscribe.InitialPosition; import org.apache.pulsar.common.api.proto.MarkerType; import org.apache.pulsar.common.api.proto.MarkersMessageIdData; -import org.apache.pulsar.common.api.proto.ReplicatedSubscriptionsSnapshot; import org.apache.pulsar.common.api.proto.ReplicatedSubscriptionsSnapshotRequest; import org.apache.pulsar.common.api.proto.ReplicatedSubscriptionsSnapshotResponse; import org.apache.pulsar.common.api.proto.ReplicatedSubscriptionsUpdate; @@ -126,20 +125,19 @@ public class ReplicatedSubscriptionsController implements AutoCloseable, Topic.P } } - public void localSubscriptionUpdated(String subscriptionName, ReplicatedSubscriptionsSnapshot snapshot) { + public void localSubscriptionUpdated(String subscriptionName, + ReplicatedSubscriptionSnapshotCache.SnapshotResult snapshot) { if (log.isDebugEnabled()) { - log.debug("[{}][{}][{}] Updating subscription to snapshot {}", - topic.getBrokerService().pulsar().getBrokerId(), topic, subscriptionName, - snapshot.getClustersList().stream() - .map(cmid -> String.format("%s -> %d:%d", cmid.getCluster(), - cmid.getMessageId().getLedgerId(), cmid.getMessageId().getEntryId())) + log.debug("[{}][{}] Updating subscription to snapshot {}", topic, subscriptionName, + snapshot.clusters().stream() + .map(entry -> String.format("%s -> %s", entry.cluster(), entry.position())) .collect(Collectors.toList())); } Map<String, MarkersMessageIdData> clusterIds = new TreeMap<>(); - for (int i = 0, size = snapshot.getClustersCount(); i < size; i++) { - ClusterMessageId cmid = snapshot.getClusterAt(i); - clusterIds.put(cmid.getCluster(), cmid.getMessageId()); + for (ReplicatedSubscriptionSnapshotCache.ClusterEntry cluster : snapshot.clusters()) { + clusterIds.put(cluster.cluster(), new MarkersMessageIdData().setLedgerId(cluster.position().getLedgerId()) + .setEntryId(cluster.position().getEntryId())); } ByteBuf subscriptionUpdate = Markers.newReplicatedSubscriptionsUpdate(subscriptionName, 
clusterIds); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java index 16dad41ba93..b6ac7923d2a 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java @@ -1375,6 +1375,8 @@ public class PersistentTopicTest extends MockedBookKeeperTestCase { void setupMLAsyncCallbackMocks() { ledgerMock = mock(ManagedLedger.class); cursorMock = mock(ManagedCursorImpl.class); + doReturn(ledgerMock).when(cursorMock).getManagedLedger(); + doReturn(0L).when(ledgerMock).getNumberOfEntries(any()); final CompletableFuture<Void> closeFuture = new CompletableFuture<>(); doReturn(new ArrayList<>()).when(ledgerMock).getCursors(); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/ReplicatedSubscriptionSnapshotCacheTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/ReplicatedSubscriptionSnapshotCacheTest.java index 5f6ae405ccd..e1e7e0d4164 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/ReplicatedSubscriptionSnapshotCacheTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/ReplicatedSubscriptionSnapshotCacheTest.java @@ -18,19 +18,27 @@ */ package org.apache.pulsar.broker.service.persistent; +import static org.assertj.core.api.Assertions.assertThat; import static org.testng.Assert.assertEquals; import static org.testng.Assert.assertNotNull; import static org.testng.Assert.assertNull; +import java.util.List; +import java.util.Random; +import java.util.UUID; +import lombok.extern.slf4j.Slf4j; +import org.apache.bookkeeper.mledger.Position; import org.apache.bookkeeper.mledger.PositionFactory; import org.apache.pulsar.common.api.proto.ReplicatedSubscriptionsSnapshot; import 
org.testng.annotations.Test; +@Slf4j @Test(groups = "broker") public class ReplicatedSubscriptionSnapshotCacheTest { @Test public void testSnapshotCache() { - ReplicatedSubscriptionSnapshotCache cache = new ReplicatedSubscriptionSnapshotCache("my-subscription", 10); + ReplicatedSubscriptionSnapshotCache cache = + new ReplicatedSubscriptionSnapshotCache("my-subscription", 10, range -> 0); assertNull(cache.advancedMarkDeletePosition(PositionFactory.create(0, 0))); assertNull(cache.advancedMarkDeletePosition(PositionFactory.create(100, 0))); @@ -58,13 +66,14 @@ public class ReplicatedSubscriptionSnapshotCacheTest { assertNull(cache.advancedMarkDeletePosition(PositionFactory.create(0, 0))); assertNull(cache.advancedMarkDeletePosition(PositionFactory.create(1, 0))); - ReplicatedSubscriptionsSnapshot snapshot = cache.advancedMarkDeletePosition(PositionFactory.create(1, 1)); + ReplicatedSubscriptionSnapshotCache.SnapshotResult + snapshot = cache.advancedMarkDeletePosition(PositionFactory.create(1, 1)); assertNotNull(snapshot); - assertEquals(snapshot.getSnapshotId(), "snapshot-1"); + assertEquals(snapshot.position(), PositionFactory.create(1, 1)); snapshot = cache.advancedMarkDeletePosition(PositionFactory.create(5, 6)); assertNotNull(snapshot); - assertEquals(snapshot.getSnapshotId(), "snapshot-5"); + assertEquals(snapshot.position(), PositionFactory.create(5, 5)); // Snapshots should have been now removed assertNull(cache.advancedMarkDeletePosition(PositionFactory.create(2, 2))); @@ -73,7 +82,8 @@ public class ReplicatedSubscriptionSnapshotCacheTest { @Test public void testSnapshotCachePruning() { - ReplicatedSubscriptionSnapshotCache cache = new ReplicatedSubscriptionSnapshotCache("my-subscription", 3); + ReplicatedSubscriptionSnapshotCache cache = + new ReplicatedSubscriptionSnapshotCache("my-subscription", 3, range -> 1); ReplicatedSubscriptionsSnapshot s1 = new ReplicatedSubscriptionsSnapshot() .setSnapshotId("snapshot-1"); @@ -96,14 +106,223 @@ public class 
ReplicatedSubscriptionSnapshotCacheTest { cache.addNewSnapshot(s3); cache.addNewSnapshot(s4); - // Snapshot-1 was already pruned - assertNull(cache.advancedMarkDeletePosition(PositionFactory.create(1, 1))); - ReplicatedSubscriptionsSnapshot snapshot = cache.advancedMarkDeletePosition(PositionFactory.create(2, 2)); + ReplicatedSubscriptionSnapshotCache.SnapshotResult + snapshot = cache.advancedMarkDeletePosition(PositionFactory.create(2, 2)); assertNotNull(snapshot); - assertEquals(snapshot.getSnapshotId(), "snapshot-2"); + // Snapshot-2 was already pruned + assertEquals(snapshot.position(), PositionFactory.create(1, 1)); snapshot = cache.advancedMarkDeletePosition(PositionFactory.create(5, 5)); assertNotNull(snapshot); - assertEquals(snapshot.getSnapshotId(), "snapshot-4"); + assertEquals(snapshot.position(), PositionFactory.create(4, 4)); } -} + + + @Test(timeOut = 15_000) + public void testSnapshotCachePruningByKeepingEqualDistance() { + int maxSnapshotToCache = 10_000; + int addSnapshotCount = 1_000_000; + + ReplicatedSubscriptionSnapshotCache cache = + new ReplicatedSubscriptionSnapshotCache("my-subscription", maxSnapshotToCache, + range -> range.upperEndpoint().getEntryId() - range.lowerEndpoint().getEntryId()); + + long ledgerIdCluster1 = 1; + long entryIdCluster1 = 0; + long ledgerIdCluster2 = 2; + long entryIdCluster2 = 0; + Random random = new Random(); + + // create a large number of snapshots where the entry ids move forward 100 + 0-1000 (random) entries at a time + for (int i = 0; i < addSnapshotCount; i++) { + ReplicatedSubscriptionsSnapshot snapshot = new ReplicatedSubscriptionsSnapshot() + .setSnapshotId(UUID.randomUUID().toString()); + snapshot.setLocalMessageId().setLedgerId(ledgerIdCluster1).setEntryId(entryIdCluster1); + snapshot.addCluster().setCluster("cluster1").setMessageId().setLedgerId(ledgerIdCluster1) + .setEntryId(entryIdCluster1); + snapshot.addCluster().setCluster("cluster2").setMessageId().setLedgerId(ledgerIdCluster2) + 
.setEntryId(entryIdCluster2); + cache.addNewSnapshot(snapshot); + entryIdCluster1 += 100 + random.nextInt(1000); + entryIdCluster2 += 100 + random.nextInt(1000); + } + + // validate the state of snapshots + List<ReplicatedSubscriptionSnapshotCache.SnapshotEntry> snapshots = cache.getSnapshots(); + assertEquals(snapshots.size(), maxSnapshotToCache); + ReplicatedSubscriptionSnapshotCache.SnapshotEntry second = snapshots.get(1); + ReplicatedSubscriptionSnapshotCache.SnapshotEntry secondLast = snapshots.get(snapshots.size() - 2); + long distance = secondLast.position().getEntryId() - second.position().getEntryId(); + long expectedAverageDistance = distance / snapshots.size(); + + long maxDistance = 0; + long minDistance = Long.MAX_VALUE; + for (int i = 0; i < snapshots.size() - 1; i++) { + Position position = snapshots.get(i).position(); + Position nextPosition = snapshots.get(i + 1).position(); + long distanceToNext = nextPosition.getEntryId() - position.getEntryId(); + if (log.isDebugEnabled()) { + log.debug(i + ": " + position + " -> " + nextPosition + " distance to next: " + distanceToNext + + " to previous: " + snapshots.get(i).distanceToPrevious()); + } + maxDistance = Math.max(maxDistance, distanceToNext); + minDistance = Math.min(minDistance, distanceToNext); + + // ensure that each snapshot is within 2 * expected average distance from the previous one + ReplicatedSubscriptionSnapshotCache.SnapshotEntry snapshotEntry = snapshots.get(i); + assertThat(snapshotEntry.distanceToPrevious()).describedAs( + "distance to previous for snapshot entry: %s is not expected", snapshotEntry) + .isLessThanOrEqualTo(expectedAverageDistance * 2); + } + + log.info("Average distance, expected: {}", expectedAverageDistance); + log.info("Min distance: {}", minDistance); + log.info("Max distance: {}", maxDistance); + + // check that picking a random markDeletePosition within the range of the second snapshot will result in a + // snapshot that is within 2 * expectedAverageDistance from 
the markDeletePosition + Position markDeletePosition = + PositionFactory.create(ledgerIdCluster1, + second.position().getEntryId() + random.nextLong(Math.max(1, distance))); + + assertThat(cache.advancedMarkDeletePosition(markDeletePosition)).satisfies(snapshotResult -> { + long snapshotDistance = markDeletePosition.getEntryId() - snapshotResult.position().getEntryId(); + assertThat(snapshotDistance).describedAs("snapshot result: %s markDeletePosition: %s", snapshotResult, + markDeletePosition).isLessThanOrEqualTo(expectedAverageDistance * 2); + }); + + } + + @Test + public void testSnapshotCachePruningScenarios() { + ReplicatedSubscriptionSnapshotCache cache = new ReplicatedSubscriptionSnapshotCache("my-subscription", 5, + range -> range.upperEndpoint().getEntryId() - range.lowerEndpoint().getEntryId()); + + ReplicatedSubscriptionsSnapshot s1 = new ReplicatedSubscriptionsSnapshot(); + s1.setLocalMessageId().setLedgerId(1).setEntryId(1); + cache.addNewSnapshot(s1); + + ReplicatedSubscriptionsSnapshot s2 = new ReplicatedSubscriptionsSnapshot(); + s2.setLocalMessageId().setLedgerId(1).setEntryId(2); + cache.addNewSnapshot(s2); + + ReplicatedSubscriptionsSnapshot s3 = new ReplicatedSubscriptionsSnapshot(); + s3.setLocalMessageId().setLedgerId(1).setEntryId(10); + cache.addNewSnapshot(s3); + + ReplicatedSubscriptionsSnapshot s4 = new ReplicatedSubscriptionsSnapshot(); + s4.setLocalMessageId().setLedgerId(1).setEntryId(15); + cache.addNewSnapshot(s4); + + ReplicatedSubscriptionsSnapshot s5 = new ReplicatedSubscriptionsSnapshot(); + s5.setLocalMessageId().setLedgerId(1).setEntryId(25); + cache.addNewSnapshot(s5); + + ReplicatedSubscriptionsSnapshot s6 = new ReplicatedSubscriptionsSnapshot(); + s6.setLocalMessageId().setLedgerId(1).setEntryId(100); + cache.addNewSnapshot(s6); + + // s2 should be pruned (special case where head is previous to the removed one) + assertThat(cache.getSnapshots()).hasSize(5) + .allSatisfy(snapshotEntry -> 
assertThat(snapshotEntry.position()).isNotEqualTo( + PositionFactory.create(1, 2))); + + ReplicatedSubscriptionsSnapshot s7 = new ReplicatedSubscriptionsSnapshot(); + s7.setLocalMessageId().setLedgerId(1).setEntryId(110); + cache.addNewSnapshot(s7); + + // s3 should be pruned (ordinary case where middle entry is removed) + assertThat(cache.getSnapshots()).hasSize(5) + .allSatisfy(snapshotEntry -> assertThat(snapshotEntry.position()).isNotEqualTo( + PositionFactory.create(1, 10))); + + ReplicatedSubscriptionsSnapshot s8 = new ReplicatedSubscriptionsSnapshot(); + s8.setLocalMessageId().setLedgerId(1).setEntryId(112); + cache.addNewSnapshot(s8); + + // s7 should be pruned (special case where tail is after the removed one) + assertThat(cache.getSnapshots()).hasSize(5) + .allSatisfy(snapshotEntry -> assertThat(snapshotEntry.position()).isNotEqualTo( + PositionFactory.create(1, 110))); + + + ReplicatedSubscriptionsSnapshot s9 = new ReplicatedSubscriptionsSnapshot(); + s9.setLocalMessageId().setLedgerId(1).setEntryId(113); + cache.addNewSnapshot(s9); + + // s8 should be pruned (check that pruning works after the one before the tail was removed) + assertThat(cache.getSnapshots()).hasSize(5) + .allSatisfy(snapshotEntry -> assertThat(snapshotEntry.position()).isNotEqualTo( + PositionFactory.create(1, 112))); + + ReplicatedSubscriptionsSnapshot s10 = new ReplicatedSubscriptionsSnapshot(); + s10.setLocalMessageId().setLedgerId(1).setEntryId(200); + cache.addNewSnapshot(s10); + + // s4 should be pruned (check that pruning still works immediately after head) + assertThat(cache.getSnapshots()).hasSize(5) + .allSatisfy(snapshotEntry -> assertThat(snapshotEntry.position()).isNotEqualTo( + PositionFactory.create(1, 15))); + + ReplicatedSubscriptionsSnapshot s11 = new ReplicatedSubscriptionsSnapshot(); + // entry id that is before the tail + s11.setLocalMessageId().setLedgerId(1).setEntryId(50); + cache.addNewSnapshot(s11); + + // all snapshots should be pruned, and s11 should be the 
only one + assertThat(cache.getSnapshots()).hasSize(1) + .first().satisfies(snapshotEntry -> assertThat(snapshotEntry.position()).isEqualTo( + PositionFactory.create(1, 50))); + } + + @Test(timeOut = 15_000) + public void testSnapshotCacheStressTest() { + int maxSnapshotToCache = 10_000; + int addSnapshotCount = 1_000_000; + + ReplicatedSubscriptionSnapshotCache cache = + new ReplicatedSubscriptionSnapshotCache("my-subscription", maxSnapshotToCache, + range -> range.upperEndpoint().getEntryId() - range.lowerEndpoint().getEntryId()); + + long ledgerIdCluster1 = 1; + long entryIdCluster1 = 0; + long ledgerIdCluster2 = 2; + long entryIdCluster2 = 0; + Random random = new Random(); + + int addedSnapshots = 0; + long markDeletePositionEntryId = 0; + long firstSnapshotEntryId = -1L; + + while (addedSnapshots < addSnapshotCount) { + // fill up the cache with random number of entries + int addInThisRound = 1 + random.nextInt(2 * maxSnapshotToCache); + for (int i = 0; i < addInThisRound; i++) { + ReplicatedSubscriptionsSnapshot snapshot = new ReplicatedSubscriptionsSnapshot() + .setSnapshotId(UUID.randomUUID().toString()); + snapshot.setLocalMessageId().setLedgerId(ledgerIdCluster1).setEntryId(entryIdCluster1); + snapshot.addCluster().setCluster("cluster1").setMessageId().setLedgerId(ledgerIdCluster1) + .setEntryId(entryIdCluster1); + snapshot.addCluster().setCluster("cluster2").setMessageId().setLedgerId(ledgerIdCluster2) + .setEntryId(entryIdCluster2); + cache.addNewSnapshot(snapshot); + if (firstSnapshotEntryId == -1L) { + firstSnapshotEntryId = entryIdCluster1; + } + addedSnapshots++; + entryIdCluster1 += 100 + random.nextInt(1000); + entryIdCluster2 += 100 + random.nextInt(1000); + } + long bound = entryIdCluster1 - firstSnapshotEntryId; + if (bound > 0) { + markDeletePositionEntryId = firstSnapshotEntryId + random.nextLong(bound); + } else { + markDeletePositionEntryId = firstSnapshotEntryId; + } + ReplicatedSubscriptionSnapshotCache.SnapshotResult snapshotResult = 
cache.advancedMarkDeletePosition( + PositionFactory.create(ledgerIdCluster1, markDeletePositionEntryId)); + assertNotNull(snapshotResult); + firstSnapshotEntryId = -1L; + } + } +} \ No newline at end of file
