codelipenghui commented on code in PR #24444:
URL: https://github.com/apache/pulsar/pull/24444#discussion_r2232554669


##########
pip/pip-430.md:
##########
@@ -0,0 +1,335 @@
+# PIP-430: Pulsar Broker cache improvements: refactoring eviction and adding a 
new cache strategy based on expected read count
+
+# Background knowledge
+
+Apache Pulsar brokers maintain an in-memory entry cache to reduce latency and 
load on BookKeeper and tiered storage (S3) by serving frequently accessed 
message data directly from memory.
+
+Key concepts:
+- **`ManagedLedgerImpl`**: Manages the storage for a single topic partition. 
Each `ManagedLedgerImpl` instance has its own `EntryCache` (typically 
`RangeEntryCacheImpl`) instance.
+- **`EntryCache`**: Stores message entries (payloads) in memory. The default 
implementation is `RangeEntryCacheImpl`, which uses a `RangeCache` internally.
+- **`RangeCache`**: A specialized cache storing entries mapped by their 
`Position` (ledgerId, entryId). It supports range-based operations for 
retrieval and expiration.
+- **`EntryCacheManager` (`RangeEntryCacheManagerImpl`)**: A global component 
that limits the total size of all entry caches in a broker. When the total size 
exceeds a threshold, it triggers eviction.
+- **Cache Eviction Policies**: Mechanisms to remove entries from the cache to 
make space or remove old data.
+    - **Timestamp-based eviction**: Removes entries older than a configured 
threshold (e.g., `managedLedgerCacheEvictionTimeThresholdMillis`). This is 
currently handled periodically by `ManagedLedgerFactoryImpl` iterating over all 
managed ledgers.
+    - **Size-based eviction**: Removes entries when the total cache size 
exceeds a configured limit (`managedLedgerCacheSizeMB`). The current 
implementation resides in `EntryCacheDefaultEvictionPolicy`, which selects a 
subset of larger caches and proportionally evicts entries from each entry cache 
to keep the total cache size under the limit.
+    - **Cursor-based eviction**: Invalidates cache entries up to the slowest 
consumer's read position or mark-delete position, depending on 
`cacheEvictionByMarkDeletedPosition`.
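
For reference, the settings mentioned above are broker-level configuration options (e.g. in `broker.conf`); the values shown here are illustrative only, not recommended defaults:

```properties
# Total size budget for all entry caches in the broker (illustrative value)
managedLedgerCacheSizeMB=512
# Entries older than this threshold are removed by timestamp-based eviction
managedLedgerCacheEvictionTimeThresholdMillis=1000
# When true, evict up to the slowest cursor's mark-delete position
# instead of its read position
cacheEvictionByMarkDeletedPosition=false
```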
+
+The broker cache serves various read patterns:
+- **Tailing reads**: Consumers reading the latest published messages.
+- **Catch-up reads (backlogged cursors)**: Consumers reading older messages to 
catch up.
+- **Key_Shared subscription reads**: Messages with the same key are routed to 
the same consumer. If a consumer is slow, messages may be replayed later while 
the cursor continues reading ahead for other available consumers.
+
+# Motivation
+
+The current Pulsar broker entry cache implementation and its eviction 
mechanisms face several challenges that impact performance, efficiency, and 
predictability:
+
+1.  **Inefficient and Flawed Size-Based Eviction**:
+    The `EntryCacheDefaultEvictionPolicy` (the current default for size-based 
eviction) does not guarantee the removal of the oldest entries globally. It 
sorts individual `EntryCache` instances by their size, selects a percentage of 
the largest caches, and then asks each of them to evict a proportional amount 
of data. This can lead to newer entries being evicted from large, active caches 
while older, less relevant entries remain in smaller or less active caches, 
resulting in suboptimal cache utilization and potentially lower hit rates.
+
+2.  **Inefficient and Incorrect Timestamp-Based Eviction**:
+    The existing timestamp-based eviction mechanism, triggered by 
`ManagedLedgerFactoryImpl`, has significant performance and correctness issues:
+    *   **Performance**: It iterates through *all* `ManagedLedgerImpl` 
instances and their respective `EntryCache` instances periodically (default 
every 10ms). In brokers with a large number of topics, this frequent and 
exhaustive iteration leads to high CPU utilization and memory pressure.
+    *   **Correctness**: The per-cache eviction (e.g., 
`RangeCache.evictLEntriesBeforeTimestamp`) often assumes entries within a 
single `RangeCache` are primarily ordered by timestamp due to typical 
append-only workloads. This assumption breaks down with mixed read patterns 
like catch-up reads or when entries are inserted out of their natural position 
order (Key_Shared subscription replay queue scenario), potentially leading to 
incorrect eviction decisions or inefficient scanning.
+
+3.  **Limited Cache Scope and Effectiveness for Diverse Read Patterns**:
+    The original `RangeCache` was primarily designed with tailing reads in 
mind. While support for caching for backlogged cursors and replay queue reads 
was added later, the eviction algorithms were not holistically updated to 
effectively manage mixed read patterns (tailing, catch-up, replays in 
Key_Shared). This can lead to:
+    *   Unnecessary BookKeeper and tiered storage (S3) reads during catch-up 
scenarios, even if data was recently read for another consumer.
+    *   Poor cache hit rates for Key_Shared subscriptions with slow consumers, 
as entries might be evicted before a replayed message (replayed due to a 
negative acknowledgment or a redelivery request) is read again.
+
+4.  **Foundation for Advanced Caching Strategies Needed**:
+    The current cache architecture makes it difficult to implement more 
intelligent caching strategies that could further optimize for common Pulsar 
use cases, such as efficiently handling fan-out to multiple shared consumers or 
retaining entries expected to be read by several cursors.
+
+Addressing these issues is crucial for improving broker performance, reducing 
operational costs (lower BookKeeper load), and providing a more robust caching 
layer that can adapt to diverse workloads.
+
+# Goals
+
+The refactoring aims to make the cache eviction more robust, performant, and 
predictable. The "expected read count" strategy is an attempt to make the cache 
more aware of Pulsar's specific consumption patterns.
+
+## In Scope
+
+-   **Refactor Cache Eviction Mechanism**:
+    -   Replace the existing per-cache iteration for timestamp eviction and 
the `EntryCacheDefaultEvictionPolicy` for size-based eviction with a 
centralized, insertion-order aware mechanism.
+    -   Implement a `RangeCacheRemovalQueue` that tracks all cached entries 
globally in approximate insertion order.
+    -   Ensure timestamp-based eviction reliably removes entries older than 
the threshold by processing this queue.
+    -   Ensure size-based eviction globally removes the oldest entries first 
from this queue until the target size is met.
+-   **Introduce "Expected Read Count" Cache Strategy**:
+    -   Implement a new caching strategy where entries track an "expected read 
count," representing how many active cursors are anticipated to read that entry.
+    -   Prioritize retaining entries with a positive expected read count 
during size-based eviction.
+    -   Provide a new configuration option 
(`cacheEvictionByExpectedReadCount`) to enable this strategy.
+-   **Improve Performance and Efficiency**:
+    -   Reduce CPU overhead associated with cache eviction, particularly 
timestamp-based eviction.
+    -   Improve overall cache hit rates by making better eviction decisions.
+-   **Enhance Correctness**: Ensure eviction policies work correctly across 
various read patterns.
+-   **Provide a Foundation for Future Cache Optimizations**: The refactored 
design should make it easier to implement further caching improvements, such as 
more sophisticated strategies for catch-up reads or Key_Shared subscriptions.
+-   **Simplify RangeCache Implementation**: Remove unnecessary generic type 
parameters from the RangeCache implementation. Since RangeCache is used 
exclusively for a single purpose within the Pulsar codebase (caching managed 
ledger entries), the generic key and value parameters add unnecessary 
complexity without providing any practical benefit. This simplification will 
improve code readability and reduce cognitive overhead.
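
As a rough illustration of the intended simplification: instead of `RangeCache<Key, Value>` with generic parameters, the cache can be specialized to its single use case. The `Position` record and skip-list store below are hypothetical stand-ins, not the actual `RangeCache` internals:

```java
import java.util.concurrent.ConcurrentSkipListMap;

// Illustrative sketch only. Before: RangeCache<Key extends Comparable<Key>, Value>,
// generic for no practical benefit. After: keys and values are fixed to the
// single use case (positions mapping to cached entry payloads).
class RangeCacheSketch {
    // Hypothetical stand-in for the managed-ledger Position (ledgerId, entryId).
    record Position(long ledgerId, long entryId) implements Comparable<Position> {
        public int compareTo(Position o) {
            int c = Long.compare(ledgerId, o.ledgerId);
            return c != 0 ? c : Long.compare(entryId, o.entryId);
        }
    }

    // Sorted map enables the range-based retrieval and invalidation operations.
    private final ConcurrentSkipListMap<Position, byte[]> entries =
            new ConcurrentSkipListMap<>();

    void put(Position p, byte[] data) { entries.put(p, data); }
    byte[] get(Position p) { return entries.get(p); }
    int size() { return entries.size(); }
}
```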
+
+# High Level Design
+
+The proposed solution involves two main components: a refactored eviction 
mechanism using a centralized removal queue and a new cache strategy based on 
expected read count.
+
+## 1. Centralized Cache Eviction with `RangeCacheRemovalQueue`
+
+A new component, `RangeCacheRemovalQueue`, will be introduced at the 
`EntryCacheManager` level.
+-   **Entry Tracking**: When an entry is inserted into any 
`RangeEntryCacheImpl` (the per-ledger cache), a lightweight wrapper for this 
entry (`RangeCacheEntryWrapper`) is also added to this global 
`RangeCacheRemovalQueue`. This queue maintains entries in their global 
insertion order (FIFO). This wrapper is already necessary in 
`RangeEntryCacheImpl` to prevent consistency issues. The current internal 
wrapper class is refactored to a top-level class so that it can be used with 
the removal queue.
+-   **Timestamp-Based Eviction**: A single, periodic task (e.g., managed by 
`ManagedLedgerFactoryImpl`'s `cacheEvictionExecutor`) will process the 
`RangeCacheRemovalQueue`. It will iterate from the head of the queue, removing 
entries whose `timestampNanos` are older than the 
`cacheEvictionTimeThresholdNanos`. Since the queue is insertion-ordered, this 
process can often stop early once it encounters an entry that is not expired.
+-   **Size-Based Eviction**: When the `EntryCacheManager` detects that the 
total cache size exceeds `evictionTriggerThresholdPercent * maxSize`, it will 
trigger an eviction cycle. This cycle will also process the 
`RangeCacheRemovalQueue` from the head, removing the oldest entries (regardless 
of which specific ledger they belong to) until the cache size is brought down 
to `cacheEvictionWatermark * maxSize`.
+-   **Stashing Mechanism**: During size-based eviction, if an entry is 
encountered that should not be evicted immediately (e.g., because it has a 
positive "expected read count" under the new strategy and has not yet expired 
by timestamp), it is temporarily "stashed" (moved to a secondary list within 
`RangeCacheRemovalQueue`). Stashed entries are reconsidered in subsequent 
eviction passes, or when they eventually expire by timestamp. This prevents 
premature eviction of entries that are likely to be read again soon.
+
+This centralized approach replaces the distributed, per-cache iteration for 
timestamp eviction and the less precise `EntryCacheDefaultEvictionPolicy` for 
size eviction, leading to more globally optimal and efficient eviction 
decisions.
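
The eviction flow described above can be sketched as follows. This is a minimal illustration under the stated design, not the actual PR code; class and member names such as `RemovalQueueSketch` and `EntryWrapper` are hypothetical:

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

// Hypothetical sketch of the centralized removal queue described above.
class RemovalQueueSketch {
    static final class EntryWrapper {
        final long timestampNanos;   // insertion time
        final long size;             // entry payload size in bytes
        int expectedReadCount;       // > 0 means more reads are anticipated
        EntryWrapper(long timestampNanos, long size, int expectedReadCount) {
            this.timestampNanos = timestampNanos;
            this.size = size;
            this.expectedReadCount = expectedReadCount;
        }
    }

    private final ArrayDeque<EntryWrapper> queue = new ArrayDeque<>(); // FIFO, insertion order
    private final List<EntryWrapper> stash = new ArrayList<>();        // entries spared by size eviction
    long cacheSize = 0;

    void add(EntryWrapper e) {
        queue.addLast(e);
        cacheSize += e.size;
    }

    // Timestamp-based eviction: the queue is insertion-ordered, so the scan
    // can stop at the first non-expired entry. Stashed entries are also
    // reconsidered and removed once they expire.
    long evictOlderThan(long cutoffNanos) {
        long freed = 0;
        for (Iterator<EntryWrapper> it = stash.iterator(); it.hasNext(); ) {
            EntryWrapper e = it.next();
            if (e.timestampNanos < cutoffNanos) {
                freed += e.size;
                it.remove();
            }
        }
        while (!queue.isEmpty() && queue.peekFirst().timestampNanos < cutoffNanos) {
            freed += queue.pollFirst().size;
        }
        cacheSize -= freed;
        return freed;
    }

    // Size-based eviction: remove globally oldest entries first, but stash
    // entries that still have a positive expected read count instead of
    // evicting them prematurely.
    long evictToSize(long targetSize) {
        long freed = 0;
        while (cacheSize - freed > targetSize && !queue.isEmpty()) {
            EntryWrapper e = queue.pollFirst();
            if (e.expectedReadCount > 0) {
                stash.add(e); // likely to be read again soon; keep for now
            } else {
                freed += e.size;
            }
        }
        cacheSize -= freed;
        return freed;
    }
}
```

Because the queue is FIFO in insertion order, both eviction paths only touch the head of the queue: timestamp eviction can stop early at the first live entry, and size eviction removes the globally oldest entries first.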
+
+## 2. "Expected Read Count" Cache Strategy
+
+This new strategy aims to improve cache hit rates by retaining entries that 
are likely to be read by multiple active consumers/cursors.
+-   **`EntryReadCountHandler`**: Each cached entry (`CachedEntryImpl`) will be 
associated with an `EntryReadCountHandlerImpl`. This handler maintains an 
`expectedReadCount` (an atomic integer).
+-   **Initialization**:
+    -   When a new entry is added to the ledger (`OpAddEntry`), its 
`expectedReadCount` is initialized to the number of *active* cursors.
+    -   When entries are read from BookKeeper or tiered storage and inserted 
into the cache (`RangeEntryCacheImpl.readFromStorage`), the `expectedReadCount` 
is initialized to the number of active cursors positioned at or before the 
entry being added. This information is sourced from 
`ManagedCursorContainer.getNumberOfCursorsAtSamePositionOrBefore(ManagedCursor)`.
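
A minimal sketch of the per-entry counter described here (names and methods are illustrative; the actual `EntryReadCountHandlerImpl` in the PR may differ in detail):

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical sketch of the per-entry expected-read-count handler.
class ReadCountSketch {
    static final class EntryReadCountHandler {
        private final AtomicInteger expectedReadCount;

        EntryReadCountHandler(int initialExpectedReads) {
            this.expectedReadCount = new AtomicInteger(initialExpectedReads);
        }

        // Called when a cursor actually reads this cached entry.
        void markRead() {
            expectedReadCount.decrementAndGet();
        }

        // Called when an additional read is anticipated,
        // e.g. a message added to a Key_Shared replay queue.
        void incrementExpected() {
            expectedReadCount.incrementAndGet();
        }

        // Size-based eviction prefers to retain entries where this is true.
        boolean shouldKeep() {
            return expectedReadCount.get() > 0;
        }
    }
}
```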

Review Comment:
   Is it better to also check the read position of the active cursors? If they 
are far away from this entry, we might not need to consider it in 
`expectedReadCount`?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]