lhotari commented on code in PR #25134:
URL: https://github.com/apache/pulsar/pull/25134#discussion_r2691233659


##########
pip/pip-452.md:
##########
@@ -0,0 +1,245 @@
+# PIP-452: Customizable topic listing of namespace with properties
+
+# Motivation
+Currently, the CommandGetTopicsOfNamespace logic in the Pulsar Broker is 
hard-coded to scan the metadata store (ZooKeeper) for all children nodes under 
a namespace.
+
+This implementation limits the flexibility required for complex multi-tenant 
scenarios:
+
+No Client Context: The broker cannot distinguish who is asking for the topics 
or why. It cannot filter topics based on client properties (these client 
properties may correspond to, or be derived from topic properties).
+
+Inefficient Filtering: For namespaces with millions of topics, the broker must 
fetch the full list into memory before applying the topics_pattern regex. There 
is no way to "push down" the filtering to the data source (e.g., a database 
with an index).
+
+To address these issues, I propose making the topic listing logic pluggable 
and extending the protocol to accept client properties.
+
+# Goals
+Protocol: Add a properties field to `CommandGetTopicsOfNamespace` to carry 
client-side context.
+
+Broker: Extend `BrokerInterceptor` to allow plugins to intercept the "Get 
Topics" request.
+
+Client: Update the Java Client to forward Consumer properties to the lookup 
service when using Regex subscriptions.
+
+Admin API & CLI: Update the REST API and CLI to accept properties for listing 
topics in a namespace.
+
+# High Level Design
+We will modify the Pulsar Protocol to carry a properties map. On the Broker 
side, we will add a new method to the BrokerInterceptor interface.
+The default implementation will preserve the existing behavior (fetching from 
ZooKeeper). The Broker's connection handler will simply delegate the request to 
this strategy.
+
+# Detailed Design
+
+## Public-facing Changes
+
+### Protocol Changes
+Update `PulsarApi.proto` to include the properties field.
+
+PulsarApi.proto
+```protobuf
+message CommandGetTopicsOfNamespace {
+    required uint64 request_id = 1;
+    required string namespace = 2;
+    optional Mode mode = 3 [default = PERSISTENT];
+    
+    // Existing fields for filtering and hash optimization
+    optional string topics_pattern = 4;
+    optional string topics_hash = 5;
+
+    // New field: Context properties from the client
+    repeated KeyValue properties = 6;
+}
+```
+
+### REST API & CLI Changes
+
+Add a `properties` parameter to the REST API endpoint for listing topics in a 
namespace, so that the listing can be customized by the supplied properties.
+
+REST API (URL encoding is required for values):
+```
+GET /admin/v2/persistent/{tenant}/{namespace}?properties=k1=v1,k2=v2
+```
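As a rough illustration of the `properties=k1=v1,k2=v2` format, the sketch below splits the parameter into a map. `PropertiesParam` and `parse` are hypothetical names, not part of Pulsar. It assumes the helper receives the raw, still percent-encoded parameter value and that each key and value was encoded individually, so a literal `,` or `=` inside a value arrives as `%2C` or `%3D`.

```java
import java.net.URLDecoder;
import java.nio.charset.StandardCharsets;
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical helper (not a Pulsar class): parses "k1=v1,k2=v2" into a map.
final class PropertiesParam {
    private PropertiesParam() {}

    static Map<String, String> parse(String raw) {
        Map<String, String> result = new LinkedHashMap<>();
        if (raw == null || raw.isEmpty()) {
            return result;
        }
        for (String pair : raw.split(",")) {
            int eq = pair.indexOf('=');
            if (eq <= 0) {
                // Reject entries with no '=' or with an empty key.
                throw new IllegalArgumentException("Expected key=value but got: " + pair);
            }
            // Decode after splitting so encoded separators inside values survive.
            String key = URLDecoder.decode(pair.substring(0, eq), StandardCharsets.UTF_8);
            String value = URLDecoder.decode(pair.substring(eq + 1), StandardCharsets.UTF_8);
            result.put(key, value);
        }
        return result;
    }
}
```

Splitting before decoding is what makes the "URL encoding is required for values" rule workable: a value containing a comma does not get mistaken for two pairs.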
+
+CLI:
+```
+pulsar-admin topics list <tenant>/<namespace> -p k1=v1 -p k2=v2
+```
+
+## Design & Implementation Details
+### Broker Changes
+
+Add a `TopicListingResult` record that carries the topic listing result:
+
+```java
+package org.apache.pulsar.broker.namespace;
+
+import java.util.List;
+import java.util.Objects;
+
+public record TopicListingResult(List<String> topics, boolean filtered) {
+
+    public TopicListingResult {
+        Objects.requireNonNull(topics, "topics");
+    }
+}
+```
+Update BrokerInterceptor Interface
+Add a default method in BrokerInterceptor. By default it returns a future 
completed with `Optional.empty()`, indicating no interception (to maintain 
backward compatibility).
+
+```Java
+
+package org.apache.pulsar.broker.intercept;
+
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+import org.apache.pulsar.broker.namespace.TopicListingResult;
+import org.apache.pulsar.common.api.proto.CommandGetTopicsOfNamespace;
+import org.apache.pulsar.common.naming.NamespaceName;
+
+public interface BrokerInterceptor extends AutoCloseable {
+
+    // ... existing methods ...
+
+    /**
+     * Intercept the GetTopicsOfNamespace request.
+     * <p>
+     * This method allows plugins to override the default topic discovery 
logic (ZooKeeper scan).
+     * It enables fetching topics from external sources (e.g., databases, 
other metadata stores)
+     * based on the provided client context properties.
+     *
+     * @param namespace     The namespace being queried.
+     * @param mode          The query mode (PERSISTENT, NON_PERSISTENT, or 
ALL).
+     * @param topicsPattern Optional regex pattern provided by the client.
+     * @param properties    Context properties provided by the client.
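+     * @return A CompletableFuture containing the result. If the future 
completes with a present value,
+     * that result is used as the topic listing. If it completes with 
{@code Optional.empty()},
+     * processing proceeds to the next interceptor or the broker's default 
logic to list all topics.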
+     */
+    default CompletableFuture<Optional<TopicListingResult>> 
interceptGetTopicsOfNamespace(
+            NamespaceName namespace,
+            CommandGetTopicsOfNamespace.Mode mode,
+            Optional<String> topicsPattern,
+            Map<String, String> properties) {
+        return CompletableFuture.completedFuture(Optional.empty());
+    }
+}
+```
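To make the contract concrete, here is a minimal sketch of what an interceptor implementation might do. It is self-contained and does not use Pulsar classes: the nested `TopicListingResult` record is a stand-in for the one proposed above, `PropertyIndexInterceptor` is a hypothetical name, the namespace is a plain string, the `mode` parameter is omitted, and an in-memory map plays the role of the external data source.

```java
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;

// Hypothetical sketch: answers the listing from an index of topic -> topic properties.
final class PropertyIndexInterceptor {
    // Stand-in for the TopicListingResult record proposed in this PIP.
    record TopicListingResult(List<String> topics, boolean filtered) {}

    private final Map<String, Map<String, String>> topicProperties;

    PropertyIndexInterceptor(Map<String, Map<String, String>> topicProperties) {
        this.topicProperties = topicProperties;
    }

    CompletableFuture<Optional<TopicListingResult>> interceptGetTopicsOfNamespace(
            String namespace, Optional<String> topicsPattern, Map<String, String> properties) {
        if (properties == null || properties.isEmpty()) {
            // No client context: decline, so the default listing logic runs.
            return CompletableFuture.completedFuture(Optional.empty());
        }
        String prefix = "persistent://" + namespace + "/";
        List<String> topics = topicProperties.entrySet().stream()
                .filter(e -> e.getKey().startsWith(prefix))
                // Keep topics whose properties contain every requested key/value pair.
                .filter(e -> e.getValue().entrySet().containsAll(properties.entrySet()))
                .map(Map.Entry::getKey)
                .sorted()
                .toList();
        // filtered=false: topicsPattern was not applied here, so the caller still has to.
        return CompletableFuture.completedFuture(
                Optional.of(new TopicListingResult(topics, false)));
    }
}
```

Returning `filtered=false` leaves the regex to the broker; an implementation that pushes the pattern down to its data source would return `true` instead.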
+
+ServerCnx Logic Update
+In ServerCnx.java, the method internalHandleGetTopicsOfNamespace will be 
updated to use this strategy.
+
+```java
+private void internalHandleGetTopicsOfNamespace(String namespace, 
NamespaceName namespaceName, long requestId,
+                                                
CommandGetTopicsOfNamespace.Mode mode,
+                                                Optional<String> 
topicsPattern, Optional<String> topicsHash,
+                                                Map<String, String> properties,
+                                                Semaphore lookupSemaphore) {
+    BooleanSupplier isPermitRequestCancelled = () -> 
!ctx().channel().isActive();
+    TopicListSizeResultCache.ResultHolder listSizeHolder =
+        
service.getTopicListSizeResultCache().getTopicListSize(namespaceName.toString(),
 mode);
+
+    listSizeHolder.getSizeAsync().thenAccept(initialSize -> {
+        maxTopicListInFlightLimiter.withAcquiredPermits(initialSize,
+            AsyncDualMemoryLimiter.LimitType.HEAP_MEMORY, 
isPermitRequestCancelled, initialPermits -> {
+                // Call the Interceptor if exists
+                BrokerInterceptor brokerInterceptor = 
getBrokerService().getPulsar().getBrokerInterceptor();
+                CompletableFuture<Optional<TopicListingResult>> 
interceptorFuture =
+                        brokerInterceptor == null ? 
CompletableFuture.completedFuture(Optional.empty()) :
+                            
brokerInterceptor.interceptGetTopicsOfNamespace(namespaceName, mode, 
topicsPattern, properties);
+
+                return interceptorFuture.thenCompose(interceptorResult -> {
+                    // Decision branch: Use the interceptor result OR fall 
back to the default logic
+                    if (interceptorResult != null && 
interceptorResult.isPresent()) {
+                        // case A: The interceptor has taken over the request
+                        return 
CompletableFuture.completedFuture(interceptorResult.get());
+                    } else {
+                        // case B: The interceptor did not handle, so the 
original query logic is executed.
+                        // Wrap List<String> into TopicListingResult(topics, 
filtered=false) for unified processing
+                        return 
getBrokerService().pulsar().getNamespaceService()
+                            .getListOfUserTopics(namespaceName, mode)
+                            .thenApply(topics -> new 
TopicListingResult(topics, false));
+                    }
+                }).thenCompose(listingResult -> {
+                    // Adjust memory permits based on ACTUAL size
+                    long actualSize = 
TopicListMemoryLimiter.estimateTopicListSize(listingResult.topics());
+                    listSizeHolder.updateSize(actualSize);
+
+                    return 
maxTopicListInFlightLimiter.withUpdatedPermits(initialPermits, actualSize,
+                        isPermitRequestCancelled, permits -> {
+
+                            List<String> finalTopics = listingResult.topics();
+                            boolean isAlreadyFiltered = 
listingResult.filtered();
+
+                            // Apply regex filtering (if the interceptor did 
not filter and regex matching is enabled)
+                            if (!isAlreadyFiltered && 
enableSubscriptionPatternEvaluation
+                                && topicsPattern.isPresent()) {
+                                    ......
+                            }
+
+                            // Calculate the Hash and send the response
+                            String hash = TopicList.calculateHash(finalTopics);
+                            boolean hashUnchanged = topicsHash.isPresent() && 
topicsHash.get().equals(hash);
+                                ......
+                        },
+                        // Handle logic for heap memory limit exceeded inside 
withUpdatedPermits
+                        t -> {
+                                ......
+                            return CompletableFuture.completedFuture(null);
+                        });
+                }).whenComplete((__, ___) -> {
+                    lookupSemaphore.release();
+                }).exceptionally(ex -> {
+                    ......
+                    return null;
+                });
+            },
+            // Handle logic for initial acquire failure
+            t -> {
+                ......
+            });
+    });
+}
+```
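Stripped of the memory limiter, semaphore, and hash handling, the decision flow above reduces to roughly the following. This is a simplified sketch, not the broker code: `TopicListingFlow`, `Listing`, and `resolve` are hypothetical names, the `enableSubscriptionPatternEvaluation` check is left out, and the regex is matched against the full topic string, which may differ from how the broker's own filter treats topic names.

```java
import java.util.List;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.function.Supplier;
import java.util.regex.Pattern;

// Hypothetical sketch of "interceptor result, else default listing, then regex".
final class TopicListingFlow {
    // Stand-in for the TopicListingResult record proposed in this PIP.
    record Listing(List<String> topics, boolean filtered) {}

    static CompletableFuture<List<String>> resolve(
            CompletableFuture<Optional<Listing>> interceptorFuture,
            Supplier<CompletableFuture<List<String>>> defaultListing,
            Optional<String> topicsPattern) {
        CompletableFuture<Listing> listingFuture = interceptorFuture.thenCompose(result -> {
            if (result != null && result.isPresent()) {
                // Case A: the interceptor handled the request.
                return CompletableFuture.completedFuture(result.get());
            }
            // Case B: fall back to the default listing, marked as unfiltered.
            return defaultListing.get().thenApply(topics -> new Listing(topics, false));
        });
        return listingFuture.thenApply(listing -> {
            if (listing.filtered() || topicsPattern.isEmpty()) {
                return listing.topics();
            }
            // Apply the regex only when nothing upstream has applied it yet.
            Pattern pattern = Pattern.compile(topicsPattern.get());
            return listing.topics().stream()
                    .filter(topic -> pattern.matcher(topic).matches())
                    .toList();
        });
    }
}
```

Note that the default listing is passed as a `Supplier`, so the metadata store is not queried at all when the interceptor takes over.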
+
+### Client Changes
+Update LookupService
+The internal LookupService interface is updated to accept the properties map.
+
+```Java
+CompletableFuture<List<String>> getTopicsUnderNamespace(
+    NamespaceName namespace, 
+    Mode mode, 
+    String topicsPattern, 
+    String topicsHash, 
+    Map<String, String> properties
+);
+```
+Update `PatternMultiTopicsConsumerImpl`
+The regex consumer implementation (PatternMultiTopicsConsumerImpl) will be 
updated to extract the properties from ConsumerConfigurationData and pass them 
to the LookupService.
+
+```Java
+// In PatternMultiTopicsConsumerImpl.java
+ConsumerConfigurationData conf;
+Map<String, String> contextProperties = conf.getProperties();
+
+lookup.getTopicsUnderNamespace(
+    namespace, 
+    mode, 
+    topicsPattern.pattern(), 
+    topicsHash, 
+    contextProperties // Pass consumer properties here
+).thenAccept(topics -> {
+    // ... update subscriptions ...
+});
+```
+
+
+# Backward Compatibility
+Protocol: Adding a new repeated field (properties) to the Protobuf definition 
is non-breaking. Old clients will not send this field; old brokers will ignore 
it.
+
+Behavior: The default implementation preserves the current behavior exactly. 
The Broker still applies the client's requested pattern to any result that is 
not marked as `filtered`; an interceptor that returns `filtered = true` is 
itself responsible for honoring the pattern.
+
+# Security Considerations
+Input Validation: The properties map is user-supplied input. Implementers of 
the customizable strategy MUST validate and sanitize these inputs before using 
them, especially if they are used to construct database queries.
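One possible shape for such validation is sketched below. `PropertyValidator` is a hypothetical name, and every limit in it (entry count, key character set, value length) is an illustrative assumption rather than something this PIP specifies. Validation of this kind complements parameterized queries; it does not replace them.

```java
import java.util.Map;
import java.util.regex.Pattern;

// Hypothetical validator for client-supplied properties; all limits are assumptions.
final class PropertyValidator {
    private static final int MAX_ENTRIES = 16;
    private static final int MAX_VALUE_LENGTH = 256;
    // Keys: 1 to 64 characters from a conservative allow-list.
    private static final Pattern KEY = Pattern.compile("[A-Za-z0-9_.-]{1,64}");

    private PropertyValidator() {}

    static void validate(Map<String, String> properties) {
        if (properties.size() > MAX_ENTRIES) {
            throw new IllegalArgumentException("Too many properties: " + properties.size());
        }
        for (Map.Entry<String, String> entry : properties.entrySet()) {
            String key = entry.getKey();
            String value = entry.getValue();
            if (key == null || !KEY.matcher(key).matches()) {
                throw new IllegalArgumentException("Invalid property key");
            }
            if (value == null || value.length() > MAX_VALUE_LENGTH) {
                throw new IllegalArgumentException("Invalid value for key: " + key);
            }
            // Reject control characters, which have no place in a lookup value.
            if (value.chars().anyMatch(c -> Character.isISOControl(c))) {
                throw new IllegalArgumentException("Control character in value for key: " + key);
            }
        }
    }
}
```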
+
+Authorization: This PIP only controls the discovery (listing) of topics. It 
does not bypass the Authorization Service for consuming or producing to those 
topics.
+

Review Comment:
   Answering my own question about the topic properties: they were added with 
[PIP-110](https://github.com/apache/pulsar/blob/master/pip/pip-110.md), #12818.
   The topic properties for partitioned topics are stored in 
PartitionedTopicMetadata:
   
https://github.com/apache/pulsar/blob/a373258d0430aa04cc05c4c46b0ac5e0e38153cb/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/partition/PartitionedTopicMetadata.java#L33-L34
   However, they are stored in LedgerMetadata for non-partitioned topics:
   
https://github.com/apache/pulsar/blob/75658cc353e74e132ef30090767fe062eae9ca05/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java#L665-L666
   (PR #17238)
   
   
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
