This is an automated email from the ASF dual-hosted git repository.

lhotari pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 8879b3ba9da [feat][pip] PIP-442: Add memory limits for 
CommandGetTopicsOfNamespace and CommandWatchTopicList on Broker and Proxy 
(#24727)
8879b3ba9da is described below

commit 8879b3ba9daa6de9b8b78bbce89bab68cc37cf0b
Author: Lari Hotari <[email protected]>
AuthorDate: Thu Sep 18 21:48:38 2025 +0300

    [feat][pip] PIP-442: Add memory limits for CommandGetTopicsOfNamespace and 
CommandWatchTopicList on Broker and Proxy (#24727)
---
 pip/pip-442.md | 537 +++++++++++++++++++++++++++++++++++++++++++++++++++++++++
 1 file changed, 537 insertions(+)

diff --git a/pip/pip-442.md b/pip/pip-442.md
new file mode 100644
index 00000000000..19e0500d2de
--- /dev/null
+++ b/pip/pip-442.md
@@ -0,0 +1,537 @@
+# PIP-442: Add memory limits for CommandGetTopicsOfNamespace and 
CommandWatchTopicList on Broker and Proxy
+
+## Background Knowledge
+
+Apache Pulsar brokers provide commands for clients to discover topics within a 
namespace and watch for topic updates.
+These commands are critical for client operations but currently lack memory 
limits and flow control mechanisms,
+creating potential memory and stability issues at scale.
+
+### Existing Broker Memory Management
+
+Pulsar brokers already implement comprehensive memory management for most 
operations through several key configurations:
+
+**Message Publishing Memory Limits:**
+- `maxMessagePublishBufferSizeInMB` (default: 50% direct memory): Limits 
memory used for buffering messages during publishing, providing backpressure 
when producers exceed broker capacity
+
+**Managed Ledger Memory Limits:**
+- `managedLedgerMaxReadsInFlightSizeInMB` (default: 0, disabled): Controls 
memory allocation for concurrent read operations from BookKeeper, preventing 
excessive memory usage during high read loads. This limit extends to cover 
buffers that were read from BookKeeper and are waiting in channel outbound 
buffers to be written to client sockets. 
+- `managedLedgerCacheSizeMB` (default: 20% of direct memory): Limits cache 
memory for recently read ledger entries, ensuring predictable memory usage for 
read caching. This limit extends to cover buffers that were read from the cache 
and are waiting in channel outbound buffers to be written to client sockets. 
+
+**Additional Memory Controls:**
+- `maxConcurrentLookupRequest` (default: 50000): Limits concurrent topic 
lookup requests. The unit of this limit is the number of requests; it is not 
expressed in memory size.
+- `maxConcurrentTopicLoadRequest` (default: 5000): Controls concurrent topic 
loading operations. The unit of this limit is the number of requests; it is not 
expressed in memory size.
+
+These existing limits effectively bound memory usage for message handling, 
storage operations, and most broker functions. However, there is a significant 
gap in memory management for topic discovery operations.
+
+### The Memory Management Gap
+
+Major unbounded memory allocation in Pulsar brokers occurs during topic 
listing operations:
+
+- `CommandGetTopicsOfNamespace` / `CommandGetTopicsOfNamespaceResponse`
+- `CommandWatchTopicList` / `CommandWatchTopicListSuccess` & 
`CommandWatchTopicUpdate`
+
+These operations can allocate arbitrary amounts of memory based on namespace 
size, with no limits or backpressure mechanisms.
+
+### Key Components
+
+**Topic Discovery Commands:**
+- **`CommandGetTopicsOfNamespace`**: Binary protocol command that retrieves 
all topics within a namespace
+- **`CommandGetTopicsOfNamespaceResponse`**: Response containing the list of 
topics
+- **`CommandWatchTopicList`**: Command to establish a watch for topic list 
changes
+- **`CommandWatchTopicListSuccess`**: Initial response confirming watch 
establishment and returning current topic list
+- **`CommandWatchTopicUpdate`**: Notifications sent when topics are added or 
removed
+
+**Current Implementation Flow:**
+
+The `getTopicsOfNamespace` request follows this path:
+
+1. **Client Request**: Sends `CommandGetTopicsOfNamespace` via binary protocol
+2. **Request Handling**: 
+   - Broker: `ServerCnx.handleGetTopicsOfNamespace()` 
+   - Proxy: `ProxyConnection.handleGetTopicsOfNamespace()`
+3. **Topic Retrieval**: `NamespaceService.getListOfUserTopics()` orchestrates:
+   - Fetches persistent topics from `TopicResources`
+   - Retrieves non-persistent topics from local cache or peer clusters
+   - Filters system topics using `TopicList.filterSystemTopic()`
+   - Implements caching via `inProgressQueryUserTopics` to prevent duplicate 
queries
+4. **Response Construction**: Packages results with hash calculation and 
filtering metadata
+5. **Response Transmission**: Sends complete response back to client
+
+### The Unbounded Memory Problem
+
+Unlike other broker operations that have memory limits, topic listing 
operations create unbounded memory allocation scenarios:
+
+**Memory Allocation Points:**
+1. **Topic List Assembly**: When retrieving topics from metadata store, the 
entire list is materialized in heap memory
+2. **Response Object Creation**: The complete topic list is serialized into a 
response object
+3. **Network Buffers**: Netty allocates direct memory for the serialized 
response
+4. **Proxy Buffering**: Proxy deserializes broker response then re-serializes 
for client
+
+**Scale Impact:**
+- Namespace with 10,000 topics × 100 bytes average topic name = ~1MB per 
response
+- With 1000 concurrent requests: ~1GB memory pressure
+
+## Motivation
+
+The lack of memory limits for topic listing commands creates the final 
significant gap in Pulsar's otherwise comprehensive memory management system:
+
+1. **Memory Management Consistency**: While all other broker operations have 
memory limits and backpressure mechanisms, topic listing operations remain 
unbounded, creating an inconsistent and unpredictable memory profile.
+
+2. **Broker Memory Exhaustion Risk**: Large clusters can trigger 
OutOfMemoryErrors when multiple clients simultaneously request topic lists, 
causing broker crashes and service disruption despite other memory controls 
being in place.
+
+3. **Proxy Memory Exhaustion Risk**: Proxies are also impacted for 
`CommandGetTopicsOfNamespace` since the request is forwarded to a broker and 
the response is deserialized and reserialized without limits.
+
+4. **Unpredictable Resource Usage**: Operators cannot reliably predict or 
limit total broker or proxy memory consumption due to this unbounded allocation 
path, undermining capacity planning and resource management.
+
+5. **Performance Degradation**: Even without OOM, large topic list operations 
cause GC pressure and latency spikes affecting all broker operations, 
counteracting the stability provided by other memory limits.
+
+## Goals
+
+### In Scope
+
+- Close the memory management gap by implementing configurable memory limits 
for topic listing operations
+- Add memory-based flow control and backpressure for both 
`CommandGetTopicsOfNamespace` and `CommandWatchTopicList` commands
+- Provide separate limits for heap and direct memory consumption, consistent 
with existing broker memory management patterns
+- Ensure fairness through queueing mechanisms when memory limits are reached
+- Add comprehensive metrics for monitoring and alerting, similar to existing 
memory limit metrics
+- Maintain backward compatibility with existing clients
+
+### Out of Scope
+
+- Pagination or streaming of topic lists (requires protocol changes)
+- Compression of topic list responses (separate optimization)
+- Changes to topic discovery semantics or filtering capabilities
+- Memory limits for other broker commands (already covered by existing 
configurations)
+
+## High-Level Design
+
+The solution introduces an `AsyncDualMemoryLimiter` that acts as a 
memory-aware semaphore for topic listing operations, completing Pulsar's memory 
management framework:
+
+1. **Memory Tracking**: Before processing requests or sending responses, the 
system estimates memory requirements and acquires permits from the limiter. 
When the permit cannot be estimated and allocated before the operation, an 
initial permit is acquired and updated before continuing with handling. 
Although not optimal, this will effectively limit memory usage across the 
broker.
+
+2. **Dual Memory Pools**: Separate tracking for heap memory (topic list 
assembly) and direct memory (network buffers) with independent limits, since 
topic listing operations use both types of memory.
+
+3. **Asynchronous Backpressure**: When memory limits are reached, requests 
queue with configurable timeouts rather than failing immediately, providing 
graceful degradation similar to `managedLedgerMaxReadsInFlightSizeInMB` 
behavior. This type of solution is helpful since rejecting requests and 
requiring clients to retry can cause more load on the system and would cause 
unfair queueing. When the queue is completely full, requests are rejected.
+
+4. **Graceful Degradation**: The system continues processing within memory 
limits rather than crashing, with clear metrics indicating when memory-based 
throttling occurs.
+
+5. **Release Guarantees**: Memory permits are released after response 
transmission completes or on request failure, preventing memory leaks and 
ensuring accurate memory tracking.
+
+## Detailed Design
+
+### Design & Implementation Details
+
+#### AsyncSemaphore Interface
+
+This is an abstraction for a generic asynchronous semaphore. The memory 
limiter implementation will use this abstraction to implement separate limiters 
for heap and direct memory.
+
+```java
+public interface AsyncSemaphore {
+    /**
+     * Acquire permits from the semaphore.
+     * Returned future completes when permits are available.
+     * It will complete exceptionally with 
AsyncSemaphorePermitAcquireTimeoutException on timeout
+     * and exceptionally with AsyncSemaphorePermitAcquireQueueFullException 
when queue full
+     * @return CompletableFuture that completes with permit when available
+     */
+    CompletableFuture<AsyncSemaphorePermitResult> acquire(long permits);
+
+    /**
+     * Acquire or release permits for previously acquired permits by updating 
the permits.
+     * Returns a future that completes when permits are available.
+     * It will complete exceptionally with 
AsyncSemaphorePermitAcquireTimeoutException on timeout
+     * and exceptionally with AsyncSemaphorePermitAcquireQueueFullException 
when queue full
+     * @return CompletableFuture that completes with permit when available
+     */
+    CompletableFuture<AsyncSemaphorePermit> update(AsyncSemaphorePermit 
permit, long newPermits);
+    
+    /**
+     * Release previously acquired permit.
+     * Must be called to prevent memory permit leaks.
+     */
+    void release(AsyncSemaphorePermit permit);
+}
+```
+
+#### AsyncDualMemoryLimiter Interface
+
+This is an abstraction for an asynchronous memory semaphore that tracks 
separate limits for heap and direct memory.
+
+```java
+public interface AsyncDualMemoryLimiter {
+    enum LimitType {
+        HEAP_MEMORY,    // For heap memory allocation
+        DIRECT_MEMORY   // For direct memory allocation
+    }
+    
+    /**
+     * Acquire permits for the specified memory size.
+     * Returned future completes when memory permits are available.
+     * It will complete exceptionally with 
AsyncSemaphorePermitAcquireTimeoutException on timeout
+     * and exceptionally with AsyncSemaphorePermitAcquireQueueFullException 
when queue full
+     * @return CompletableFuture that completes with permit when available
+     */
+    CompletableFuture<AsyncDualMemoryLimiterPermit> acquire(long memorySize, 
LimitType limitType);
+
+    /**
+     * Acquire or release permits for previously acquired permits by updating 
the requested memory size.
+     * Returns a future that completes when permits are available.
+     * It will complete exceptionally with 
AsyncSemaphorePermitAcquireTimeoutException on timeout
+     * and exceptionally with AsyncSemaphorePermitAcquireQueueFullException 
when queue full
+     * @return CompletableFuture that completes with permit when available
+     */
+    CompletableFuture<AsyncDualMemoryLimiterPermit> 
update(AsyncDualMemoryLimiterPermit permit, long newMemorySize);
+    
+    /**
+     * Release previously acquired permit.
+     * Must be called to prevent memory permit leaks.
+     */
+    void release(AsyncDualMemoryLimiterPermit permit);
+}
+```
+
+#### Integration Points
+
+**1. Heap Memory Limiting (Post-Retrieval)**
+
+In `ServerCnx.handleGetTopicsOfNamespace`:
+
+```java
+// Acquire a fixed amount of permits initially since it's not known how much 
memory will be used
+// This will ensure that the operation continues only after it has the initial 
permits
+// It would be possible to use statistics for initial estimate, but this is 
simpler and sufficient
+maxTopicListInFlightLimiter.acquire(INITIAL_SIZE, 
AsyncDualMemoryLimiter.LimitType.HEAP_MEMORY)
+    .thenCompose(initialPermit -> {
+        
getBrokerService().pulsar().getNamespaceService().getListOfUserTopics(namespaceName,
 mode)
+            .thenCompose(topics -> {
+                // Estimate memory after retrieval and update the permits to 
reflect the actual size
+                long estimatedSize = 
topics.stream().mapToInt(String::length).sum();
+                return maxTopicListInFlightLimiter
+                    .update(initialPermit, estimatedSize)
+                    .thenApply(permit -> Pair.of(topics, permit));
+            })
+            .thenAccept(topicsAndPermit -> {
+                try {
+                    // Process and send response
+                    ...
+                } finally {
+                    
maxTopicListInFlightLimiter.release(topicsAndPermit.getRight());
+                }
+            });
+        ...
+    // For exceptional paths, initialPermit would need to be released
+```
+
+**2. Direct Memory Limiting (Pre-Serialization)**
+
+Modified `CommandSender` implementation:
+
+```java
+@Override
+public void sendGetTopicsOfNamespaceResponse(List<String> topics, String 
topicsHash,
+                                             boolean filtered, boolean 
changed, long requestId) {
+    BaseCommand command = 
Commands.newGetTopicsOfNamespaceResponseCommand(topics, topicsHash,
+            filtered, changed, requestId);
+    safeIntercept(command, cnx);
+    acquireMaxTopicListInFlightPermitsAndWriteAndFlush(command);
+}
+
+private void acquireMaxTopicListInFlightPermitsAndWriteAndFlush(BaseCommand 
command) {
+    // Calculate serialized size before acquiring permits
+    int serializedSize = command.getSerializedSize();
+    // Acquire permits
+    maxTopicListInFlightLimiter.acquire(serializedSize, 
AsyncDualMemoryLimiter.LimitType.DIRECT_MEMORY)
+            .thenAcceptAsync(permits -> {
+                try {
+                    // Serialize the response
+                    ByteBuf outBuf = 
Commands.serializeWithPrecalculatedSerializedSize(command, serializedSize);
+                    // Write the response
+                    cnx.ctx().writeAndFlush(outBuf).addListener(future -> {
+                        // Release permits after the response has been written 
to the socket
+                        maxTopicListInFlightLimiter.release(permits);
+                    });
+                } catch (Exception e) {
+                    // Return permits if an exception occurs before 
writeAndFlush is called successfully
+                    maxTopicListInFlightLimiter.release(permits);
+                    throw e;
+                }
+            }, cnx.ctx().executor());
+}
+```
+
+**3. Watch Command Memory Control**
+
+Similar memory limiting patterns apply to watch commands:
+
+```java
+public void sendWatchTopicListSuccess(long requestId, long watcherId, String 
topicsHash, List<String> topics) {
+    BaseCommand command = Commands.newWatchTopicListSuccess(requestId, 
watcherId, topicsHash, topics);
+    acquireMaxTopicListInFlightPermitsAndWriteAndFlush(command);
+}
+
+public void sendWatchTopicListUpdate(long watcherId, List<String> newTopics, 
List<String> deletedTopics, String topicsHash) {
+    BaseCommand command = Commands.newWatchTopicUpdate(watcherId, newTopics, 
deletedTopics, topicsHash);
+    acquireMaxTopicListInFlightPermitsAndWriteAndFlush(command);
+}
+```
+
+**4. Proxy Reading Memory Control**
+
+On the Pulsar Proxy side, the problem is slightly different. The problem 
occurs when the proxy receives a `CommandGetTopicsOfNamespace` command, 
forwards it to a broker, and receives a response. The proxy needs to 
deserialize and serialize the response before sending it to the client.
+Memory is allocated for both deserialization and serialization.
+
+Solving this requires a slight modification to PulsarDecoder.
+
+In `PulsarDecoder.channelRead`, it would be necessary to record the size of 
the incoming message:
+
+```java
+        // Get a buffer that contains the full frame
+        ByteBuf buffer = (ByteBuf) msg;
+        try {
+            // De-serialize the command
+            int cmdSize = (int) buffer.readUnsignedInt();
+            cmd.parseFrom(buffer, cmdSize);
+```
+
+It could be modified to store the `cmdSize` in a field instead of a local 
variable:
+```java
+    protected int cmdSize;
+...
+        // Get a buffer that contains the full frame
+        ByteBuf buffer = (ByteBuf) msg;
+        try {
+            // De-serialize the command
+            cmdSize = (int) buffer.readUnsignedInt();
+            cmd.parseFrom(buffer, cmdSize);
+```
+
+Changes would be needed to be able to use this serialized size so that it 
doesn't need to be re-calculated.
+`cmdSize` would be added as a field to `GetTopicsResult`:
+
+```java
+@Override
+protected void 
handleGetTopicsOfNamespaceSuccess(CommandGetTopicsOfNamespaceResponse success) {
+    checkArgument(state == State.Ready);
+
+    long requestId = success.getRequestId();
+    List<String> topics = success.getTopicsList();
+
+
+    if (log.isDebugEnabled()) {
+        log.debug("{} Received get topics of namespace success response from 
server: {} - topics.size: {}",
+                ctx.channel(), success.getRequestId(), topics.size());
+    }
+
+    CompletableFuture<GetTopicsResult> requestFuture =
+            (CompletableFuture<GetTopicsResult>) 
pendingRequests.remove(requestId);
+    if (requestFuture != null) {
+        requestFuture.complete(new GetTopicsResult(topics,
+                success.hasTopicsHash() ? success.getTopicsHash() : null,
+                success.isFiltered(),
+                success.isChanged(),
+                // Store cmdSize in the GetTopicsResult <----
+                cmdSize));
+    } else {
+        duplicatedResponseCounter.incrementAndGet();
+        log.warn("{} Received unknown request id from server: {}", 
ctx.channel(), success.getRequestId());
+    }
+}
+```
+
+The limiter would be integrated into `LookupProxyHandler`'s 
`performGetTopicsOfNamespace` in this way:
+
+```java
+    
proxyConnection.getConnectionPool().getConnection(addr).thenAccept(clientCnx -> 
{
+        // Connected to backend broker
+        long requestId = proxyConnection.newRequestId();
+        ByteBuf command;
+        command = Commands.newGetTopicsOfNamespaceRequest(namespaceName, 
requestId, mode,
+                topicsPattern, topicsHash);
+        // Acquire a fixed amount of permits initially since it's not known 
how much memory will be used
+        // This will ensure that the operation continues only after it has the 
initial permits
+        // It would be possible to use statistics for initial estimate, but 
this is simpler and sufficient
+        maxTopicListInFlightLimiter.acquire(INITIAL_SIZE, 
AsyncDualMemoryLimiter.LimitType.HEAP_MEMORY)
+            .thenCompose(initialPermit -> {
+                clientCnx.newGetTopicsOfNamespace(command, 
requestId).whenComplete((r, t) -> {
+                    if (t != null) {
+                        maxTopicListInFlightLimiter.release(initialPermit);    
                        
+                        log.warn("[{}] Failed to get TopicsOfNamespace {}: {}",
+                                clientAddress, namespaceName, t.getMessage());
+                        writeAndFlush(
+                            Commands.newError(clientRequestId, 
getServerError(t), t.getMessage()));
+                    } else {
+                        // Update the initial permits to reflect the actual 
size of the response
+                        maxTopicListInFlightLimiter.update(initialPermit, 
r.getSerializedSize())
+                            .thenCompose(heapPermit -> {
+                                // Acquire a direct memory permit for 
serialization
+                                
maxTopicListInFlightLimiter.acquire(r.getSerializedSize(), 
AsyncDualMemoryLimiter.LimitType.DIRECT_MEMORY)
+                                    .thenAccept(directPermit -> {
+                                        proxyConnection.ctx().writeAndFlush(
+                                            
Commands.newGetTopicsOfNamespaceResponse(r.getNonPartitionedOrPartitionTopics(),
+                                                                               
     r.getTopicsHash(), r.isFiltered(),
+                                                                               
     r.isChanged(), clientRequestId)
+                                            ).addListener(future -> {
+                                                // Release permits after the 
response has been written to the socket
+                                                
maxTopicListInFlightLimiter.release(heapPermit);
+                                                
maxTopicListInFlightLimiter.release(directPermit);
+                                            });
+                                    }) // Add exception handling for releasing 
directPermit
+                            }); // Add exception handling for releasing 
heapPermit
+                    }
+                });
+            });
+        proxyConnection.getConnectionPool().releaseConnection(clientCnx);
+    }).exceptionally(ex -> {
+```
+
+### Public-facing Changes
+
+#### Configuration
+
+**broker.conf**/**proxy.conf** additions to complete the memory management 
configuration set:
+
+```properties
+# Maximum heap memory for inflight topic list operations (MB)
+# Default: 100 MB (supports ~1M topic names assuming 100 bytes each)
+maxTopicListInFlightHeapMemSizeMB=100
+
+# Maximum direct memory for inflight topic list responses (MB)  
+# Default: 100 MB (network buffers for serialized responses)
+maxTopicListInFlightDirectMemSizeMB=100
+
+# Timeout for acquiring heap memory permits (milliseconds)
+# Default: 25000 (25 seconds)
+maxTopicListInFlightHeapMemSizePermitsAcquireTimeoutMillis=25000
+
+# Maximum queue size for heap memory permit requests
+# Default: 1000 (prevent unbounded queueing)
+maxTopicListInFlightHeapMemSizePermitsAcquireQueueSize=1000
+
+# Timeout for acquiring direct memory permits (milliseconds)
+# Default: 25000 (25 seconds)  
+maxTopicListInFlightDirectMemSizePermitsAcquireTimeoutMillis=25000
+
+# Maximum queue size for direct memory permit requests
+# Default: 1000 (prevent unbounded queueing)
+maxTopicListInFlightDirectMemSizePermitsAcquireQueueSize=1000
+```
+
+#### Metrics
+
+New metrics under `pulsar_broker_topic_list_`/`pulsar_proxy_topic_list` 
prefix, complementing existing memory metrics:
+
+| Metric Name | Type | Description | Labels |
+|------------|------|-------------|--------|
+| `heap_memory_used_bytes` | Gauge | Current heap memory used by topic 
listings | `cluster` |
+| `heap_memory_limit_bytes` | Gauge | Configured heap memory limit | `cluster` 
|
+| `direct_memory_used_bytes` | Gauge | Current direct memory used by topic 
listings | `cluster` |
+| `direct_memory_limit_bytes` | Gauge | Configured direct memory limit | 
`cluster` |
+| `heap_queue_size` | Gauge | Current heap memory limiter queue size | 
`cluster` |
+| `heap_queue_max_size` | Gauge | Maximum heap memory limiter queue size | 
`cluster` |
+| `direct_queue_size` | Gauge | Current direct memory limiter queue size | 
`cluster` |
+| `direct_queue_max_size` | Gauge | Maximum direct memory limiter queue size | 
`cluster` |
+| `heap_wait_time_ms` | Histogram | Wait time for heap memory permits | 
`cluster` |
+| `direct_wait_time_ms` | Histogram | Wait time for direct memory permits | 
`cluster` |
+| `heap_timeout_total` | Counter | Total heap memory permit timeouts | 
`cluster` |
+| `direct_timeout_total` | Counter | Total direct memory permit timeouts | 
`cluster` |
+
+#### Public API
+
+No changes to REST API.
+
+#### Binary Protocol
+
+No protocol changes. Existing commands continue to work with added server-side 
memory limits and backpressure.
+
+## Monitoring
+
+Operators should monitor the following metrics alongside existing memory 
management metrics and set up alerts:
+
+1. **Memory Utilization Alert**:
+   - Alert when `heap_memory_used_bytes / heap_memory_limit_bytes > 0.8`
+   - Indicates the need to increase limits or investigate namespace growth
+
+2. **Queue Saturation Alert**:
+   - Alert when `heap_queue_size / heap_queue_max_size > 0.9`
+   - Indicates sustained memory pressure requiring capacity adjustment
+
+3. **Timeout Rate Alert**:
+   - Alert when `rate(heap_timeout_total[5m]) > 1`
+   - Indicates clients experiencing failures due to memory-based flow control
+
+4. **P99 Wait Time Alert**:
+   - Alert when `heap_wait_time_ms{quantile="0.99"} > 10000`
+   - Indicates degraded client experience due to memory pressure
+
+These alerts should be configured alongside existing memory alerts for 
`managedLedgerCacheSizeMB`, `maxMessagePublishBufferSizeInMB`, and other memory 
limits to provide comprehensive memory management visibility.
+
+## Security Considerations
+
+The memory limiting mechanism introduces new denial-of-service protections:
+
+1. **Resource Exhaustion Protection**: The limits prevent bad clients from 
triggering OOM by requesting large topic lists repeatedly, completing the 
broker's defense against memory-based attacks.
+
+2. **Fair Queueing**: The queue size limits prevent bad clients from 
monopolizing memory permits and blocking legitimate requests.
+
+3. **Multi-tenancy Isolation**: Consider per-tenant memory limits in future 
iterations to prevent one tenant from consuming all available topic listing 
memory permits, similar to how other memory limits could benefit from tenant 
isolation.
+
+## Backward & Forward Compatibility
+
+### Upgrade
+
+1. The feature can be disabled setting the limits set to 0 initially to 
maintain full compatibility
+2. After upgrade, gradually enable memory limits:
+   ```bash
+   # Start with high limits to understand current usage
+   pulsar-admin brokers update-dynamic-config --config 
maxTopicListInFlightHeapMemSizeMB --value 512
+   pulsar-admin brokers update-dynamic-config --config 
maxTopicListInFlightDirectMemSizeMB --value 512
+   
+   # Monitor metrics and adjust downward based on actual usage patterns
+   pulsar-admin brokers update-dynamic-config --config 
maxTopicListInFlightHeapMemSizeMB --value 200
+   pulsar-admin brokers update-dynamic-config --config 
maxTopicListInFlightDirectMemSizeMB --value 200
+   pulsar-admin brokers update-dynamic-config --config 
maxTopicListInFlightHeapMemSizeMB --value 100
+   pulsar-admin brokers update-dynamic-config --config 
maxTopicListInFlightDirectMemSizeMB --value 100
+   ```
+3. No client changes are required
+
+### Downgrade / Rollback
+
+- No changes required
+
+### Pulsar Geo-Replication Upgrade & Downgrade/Rollback Considerations
+
+- No impact on replication protocol
+
+## Alternatives
+
+### Alternative 1: Pagination Protocol
+- **Approach**: Modify protocol to support paginated topic listing
+- **Rejected because**: Requires breaking protocol changes and client updates
+
+### Alternative 2: Response Streaming
+- **Approach**: Stream topics as they're retrieved rather than buffering
+- **Rejected because**: Streaming in smaller "chunks" doesn't solve the memory 
issue since the Pulsar client could have multiple outstanding requests. The 
topic list watcher is already designed to handle this scenario to reduce the 
load on the broker.
+
+### Alternative 3: Hard Memory Limits with Immediate Failure
+- **Approach**: Fail requests immediately when memory threshold reached
+- **Rejected because**: Client retries would add more load and wouldn't 
provide graceful degradation under peak load
+
+### Alternative 4: Extend Existing Memory Limits
+- **Approach**: Include topic listing memory in `managedLedgerCacheSizeMB` or 
similar
+- **Rejected because**: Topic listing memory has different characteristics and 
usage patterns, requiring separate tuning and monitoring
+
+## General Notes
+
+- Memory estimates are conservative and may overestimate actual usage to 
ensure safety
+- The solution prioritizes memory management consistency and stability over 
perfect memory accuracy
+- This completes Pulsar's comprehensive memory management framework by 
addressing the final unbounded allocation path
+- Future enhancements could include:
+  - Per-tenant memory limits for topic listing operations
+  - Per-namespace memory limits
+  - Per-connection memory limits to prevent single clients from queueing up 
many topic listing requests
+  - Integration with overall broker memory management policies
+
+## Links
+
+* Mailing List discussion thread: 
https://lists.apache.org/thread/16fz747yqcr5kjkw9p5r6sc09rmcsyxr
+* Mailing List voting thread: 
https://lists.apache.org/thread/pwptwg3h6y4nn3whmc7y172cpopd36gd
\ No newline at end of file

Reply via email to