codelipenghui commented on code in PR #24444: URL: https://github.com/apache/pulsar/pull/24444#discussion_r2232554669
##########
pip/pip-430.md:
##########
@@ -0,0 +1,335 @@
+# PIP-430: Pulsar Broker cache improvements: refactoring eviction and adding a new cache strategy based on expected read count
+
+# Background knowledge
+
+Apache Pulsar brokers maintain an in-memory entry cache to reduce latency and load on BookKeeper and tiered storage (S3) by serving frequently accessed message data directly from memory.
+
+Key concepts:
+- **`ManagedLedgerImpl`**: Manages the storage for a single topic partition. Each `ManagedLedgerImpl` instance has its own `EntryCache` (typically `RangeEntryCacheImpl`) instance.
+- **`EntryCache`**: Stores message entries (payloads) in memory. The default implementation is `RangeEntryCacheImpl`, which uses a `RangeCache` internally.
+- **`RangeCache`**: A specialized cache storing entries mapped by their `Position` (ledgerId, entryId). It supports range-based operations for retrieval and expiration.
+- **`EntryCacheManager` (`RangeEntryCacheManagerImpl`)**: A global component that limits the total size of all entry caches in a broker. When the total size exceeds a threshold, it triggers eviction.
+- **Cache Eviction Policies**: Mechanisms to remove entries from the cache to make space or remove old data.
+  - **Timestamp-based eviction**: Removes entries older than a configured threshold (e.g., `managedLedgerCacheEvictionTimeThresholdMillis`). This is currently handled periodically by `ManagedLedgerFactoryImpl` iterating over all managed ledgers.
+  - **Size-based eviction**: Removes entries when the total cache size exceeds a configured limit (`managedLedgerCacheSizeMB`). The current implementation resides in `EntryCacheDefaultEvictionPolicy`, which selects a subset of larger caches and proportionally evicts entries from each entry cache to keep the total cache size under the limit.
+  - **Cursor-based eviction**: Invalidates cache entries up to the slowest consumer's read position or mark-delete position, depending on `cacheEvictionByMarkDeletedPosition`.
+
+The broker cache serves various read patterns:
+- **Tailing reads**: Consumers reading the latest published messages.
+- **Catch-up reads (backlogged cursors)**: Consumers reading older messages to catch up.
+- **Key_Shared subscription reads**: Messages with the same key are routed to the same consumer. If a consumer is slow, messages might be replayed while the cursor reads more messages for available consumers.
+
+# Motivation
+
+The current Pulsar broker entry cache implementation and its eviction mechanisms face several challenges that impact performance, efficiency, and predictability:
+
+1. **Inefficient and Flawed Size-Based Eviction**:
+    The `EntryCacheDefaultEvictionPolicy` (the current default for size-based eviction) does not guarantee the removal of the oldest entries globally. It sorts individual `EntryCache` instances by their size, selects a percentage of the largest caches, and then asks each of them to evict a proportional amount of data. This can lead to newer entries being evicted from large, active caches while older, less relevant entries remain in smaller or less active caches, resulting in suboptimal cache utilization and potentially lower hit rates.
+
+2. **Inefficient and Incorrect Timestamp-Based Eviction**:
+    The existing timestamp-based eviction mechanism, triggered by `ManagedLedgerFactoryImpl`, has significant performance and correctness issues:
+    * **Performance**: It iterates through *all* `ManagedLedgerImpl` instances and their respective `EntryCache` instances periodically (default every 10ms). In brokers with a large number of topics, this frequent and exhaustive iteration leads to high CPU utilization and memory pressure.
+    * **Correctness**: The per-cache eviction (e.g., `RangeCache.evictLEntriesBeforeTimestamp`) often assumes entries within a single `RangeCache` are primarily ordered by timestamp due to typical append-only workloads. This assumption breaks down with mixed read patterns like catch-up reads or when entries are inserted out of their natural position order (Key_shared subscription replay queue scenario), potentially leading to incorrect eviction decisions or inefficient scanning.
+
+3. **Limited Cache Scope and Effectiveness for Diverse Read Patterns**:
+    The original `RangeCache` was primarily designed with tailing reads in mind. While support for caching for backlogged cursors and replay queue reads was added later, the eviction algorithms were not holistically updated to effectively manage mixed read patterns (tailing, catch-up, replays in Key_Shared). This can lead to:
+    * Unnecessary BookKeeper and tiered storage (S3) reads during catch-up scenarios, even if data was recently read for another consumer.
+    * Poor cache hit rates for Key_Shared subscriptions with slow consumers, as entries might be evicted before a replayed message (due to consumer unacknowledgment or redelivery request) is read again.
+
+4. **Foundation for Advanced Caching Strategies Needed**:
+    The current cache architecture makes it difficult to implement more intelligent caching strategies that could further optimize for common Pulsar use cases, such as efficiently handling fan-out to multiple shared consumers or retaining entries expected to be read by several cursors.
+
+Addressing these issues is crucial for improving broker performance, reducing operational costs (lower BookKeeper load), and providing a more robust caching layer that can adapt to diverse workloads.
+
+# Goals
+
+The refactoring aims to make the cache eviction more robust, performant, and predictable. The "expected read count" strategy is an attempt to make the cache more aware of Pulsar's specific consumption patterns.
+
+## In Scope
+
+- **Refactor Cache Eviction Mechanism**:
+  - Replace the existing per-cache iteration for timestamp eviction and the `EntryCacheDefaultEvictionPolicy` for size-based eviction with a centralized, insertion-order aware mechanism.
+  - Implement a `RangeCacheRemovalQueue` that tracks all cached entries globally in approximate insertion order.
+  - Ensure timestamp-based eviction reliably removes entries older than the threshold by processing this queue.
+  - Ensure size-based eviction globally removes the oldest entries first from this queue until the target size is met.
+- **Introduce "Expected Read Count" Cache Strategy**:
+  - Implement a new caching strategy where entries track an "expected read count," representing how many active cursors are anticipated to read that entry.
+  - Prioritize retaining entries with a positive expected read count during size-based eviction.
+  - Provide a new configuration option (`cacheEvictionByExpectedReadCount`) to enable this strategy.
+- **Improve Performance and Efficiency**:
+  - Reduce CPU overhead associated with cache eviction, particularly timestamp-based eviction.
+  - Improve overall cache hit rates by making better eviction decisions.
+- **Enhance Correctness**: Ensure eviction policies work correctly across various read patterns.
+- **Provide a Foundation for Future Cache Optimizations**: The refactored design should make it easier to implement further caching improvements, such as more sophisticated strategies for catch-up reads or Key_Shared subscriptions.
+- **Simplify RangeCache Implementation**: Remove unnecessary generic type parameters from the RangeCache implementation. Since RangeCache is used exclusively for a single purpose within the Pulsar codebase (caching managed ledger entries), the generic key and value parameters add unnecessary complexity without providing any practical benefit. This simplification will improve code readability and reduce cognitive overhead.
+
+# High Level Design
+
+The proposed solution involves two main components: a refactored eviction mechanism using a centralized removal queue and a new cache strategy based on expected read count.
+
+## 1. Centralized Cache Eviction with `RangeCacheRemovalQueue`
+
+A new component, `RangeCacheRemovalQueue`, will be introduced at the `EntryCacheManager` level.
+- **Entry Tracking**: When an entry is inserted into any `RangeEntryCacheImpl` (the per-ledger cache), a lightweight wrapper for this entry (`RangeCacheEntryWrapper`) is also added to this global `RangeCacheRemovalQueue`. This queue maintains entries in their global insertion order (FIFO). This wrapper is already necessary in `RangeEntryCacheImpl` to prevent consistency issues. The current internal wrapper class is refactored to a top-level class so that it can be used with the removal queue.
+- **Timestamp-Based Eviction**: A single, periodic task (e.g., managed by `ManagedLedgerFactoryImpl`'s `cacheEvictionExecutor`) will process the `RangeCacheRemovalQueue`. It will iterate from the head of the queue, removing entries whose `timestampNanos` are older than the `cacheEvictionTimeThresholdNanos`. Since the queue is insertion-ordered, this process can often stop early once it encounters an entry that is not expired.
+- **Size-Based Eviction**: When the `EntryCacheManager` detects that the total cache size exceeds `evictionTriggerThresholdPercent * maxSize`, it will trigger an eviction cycle. This cycle will also process the `RangeCacheRemovalQueue` from the head, removing the oldest entries (regardless of which specific ledger they belong to) until the cache size is brought down to `cacheEvictionWatermark * maxSize`.
+- **Stashing Mechanism**: During size-based eviction, if an entry is encountered that should not be evicted immediately (e.g., due to a positive "expected read count" as per the new strategy, and it hasn't met timestamp expiration), it will be temporarily "stashed" (moved to a secondary list within `RangeCacheRemovalQueue`). Stashed entries are reconsidered in subsequent eviction passes or if they eventually expire by timestamp. This prevents premature eviction of entries that are likely to be read again soon.
+
+This centralized approach replaces the distributed, per-cache iteration for timestamp eviction and the less precise `EntryCacheDefaultEvictionPolicy` for size eviction, leading to more globally optimal and efficient eviction decisions.
+
+## 2. "Expected Read Count" Cache Strategy
+
+This new strategy aims to improve cache hit rates by retaining entries that are likely to be read by multiple active consumers/cursors.
+- **`EntryReadCountHandler`**: Each cached entry (`CachedEntryImpl`) will be associated with an `EntryReadCountHandlerImpl`. This handler maintains an `expectedReadCount` (an atomic integer).
+- **Initialization**:
+  - When a new entry is added to the ledger (`OpAddEntry`), its `expectedReadCount` is initialized to the number of *active* cursors.
+  - When entries are read from BookKeeper or tiered storage and inserted into the cache (`RangeEntryCacheImpl.readFromStorage`), the `expectedReadCount` is initialized based on the current state of active cursors currently positioned before or at the entry being added. This information is sourced from `ManagedCursorContainer.getNumberOfCursorsAtSamePositionOrBefore(ManagedCursor)`.

Review Comment:
Would it be better to also check the read position of the active cursors? If a cursor is far behind this entry, we might not need to count it in `expectedReadCount`.
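
A rough sketch of the idea, only to illustrate the question and not a proposal for the actual implementation. Everything here is hypothetical: the class name, the `maxEntriesBehind` parameter, and the simplification of a cursor's read position to a single entry offset (real positions are `(ledgerId, entryId)` pairs, so the distance would have to be computed across ledgers).

```java
import java.util.List;

public class ExpectedReadCountSketch {

    /**
     * Counts the cursors that are at or before the entry AND no further
     * than maxEntriesBehind entries away from it. Cursors that are ahead
     * of the entry, or too far behind it, are not counted.
     *
     * Assumption: read positions are flattened to a single entry offset.
     */
    public static int expectedReadCount(List<Long> cursorReadOffsets,
                                        long entryOffset,
                                        long maxEntriesBehind) {
        int count = 0;
        for (long readOffset : cursorReadOffsets) {
            long distance = entryOffset - readOffset;
            // distance < 0: cursor has already passed the entry
            // distance > maxEntriesBehind: cursor is unlikely to reach it soon
            if (distance >= 0 && distance <= maxEntriesBehind) {
                count++;
            }
        }
        return count;
    }

    public static void main(String[] args) {
        // Four active cursors: two close behind the entry, one far behind, one ahead.
        List<Long> readOffsets = List.of(995L, 900L, 10L, 1200L);
        // Counting every cursor at or before offset 1000 would give 3;
        // with the distance limit the far-behind cursor is left out.
        System.out.println(expectedReadCount(readOffsets, 1000L, 500L)); // prints 2
    }
}
```

With a limit like this, an entry read for a cursor near the tail would not be held in the cache on behalf of a cursor that is still a long way behind and may not reach it before it expires by timestamp anyway.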
