BewareMyPower commented on code in PR #25134:
URL: https://github.com/apache/pulsar/pull/25134#discussion_r2684561910


##########
pip/pip-452.md:
##########
@@ -0,0 +1,306 @@
+# PIP-452: Customizable Topic Listing in GetTopicsOfNamespace with properties
+
+# Motivation
+Currently, the CommandGetTopicsOfNamespace logic in the Pulsar Broker is 
hard-coded to scan the metadata store (ZooKeeper) for all children nodes under 
a namespace.
+
+This implementation limits the flexibility required for complex multi-tenant 
scenarios:
+
+No Client Context: The broker cannot distinguish who is asking for the topics or why. It cannot filter topics based on client properties (these properties may be related to topic properties).
+
+Inefficient Filtering: For namespaces with millions of topics, the broker must 
fetch the full list into memory before applying the topics_pattern regex. There 
is no way to "push down" the filtering to the data source (e.g., a database 
with an index).
+
+To address these issues, I propose making the topic listing logic pluggable 
and extending the protocol to accept client properties.
+
+# Goals
+Protocol: Add a properties field to `CommandGetTopicsOfNamespace` to carry 
client-side context.
+
+Broker: Extend `BrokerInterceptor` to allow plugins to intercept the "Get 
Topics" request.
+
+Client: Update the Java Client to forward Consumer properties to the lookup 
service when using Regex subscriptions.
+
+# High Level Design
+We will modify the Pulsar Protocol to carry a properties map. On the Broker 
side, we will add a new method to the BrokerInterceptor interface.
+The default implementation will preserve the existing behavior (fetching from 
ZooKeeper). The Broker's connection handler will simply delegate the request to 
this strategy.
+
+# Detailed Design
+
+## Public-facing Changes
+
+### Protocol Changes
+Update `PulsarApi.proto` to include the properties field.
+
+PulsarApi.proto
+```protobuf
+message CommandGetTopicsOfNamespace {
+    required uint64 request_id = 1;
+    required string namespace = 2;
+    optional Mode mode = 3 [default = PERSISTENT];
+    
+    // Existing fields for filtering and hash optimization
+    optional string topics_pattern = 4;
+    optional string topics_hash = 5;
+
+    // New field: Context properties from the client
+    repeated KeyValue properties = 6;
+}
+```
+
+### REST API & CLI Changes
+
+Add a `properties` parameter to the REST API endpoint that lists the topics in a namespace, so that callers can list only topics with specific properties (customizable topic listing).
+
+REST API:
+```
+GET /admin/v2/persistent/{tenant}/{namespace} with query parameter properties=k1=v1,k2=v2
+```
+
+CLI:
+```
+pulsar-admin topics list <tenant>/<namespace> -p k1=v1 -p k2=v2
+```
+
+## Design & Implementation Details
+### Broker Changes
+
+Add `TopicListingResult` for listing topic results
+
+```java
+package org.apache.pulsar.broker.intercept;
+
+import java.util.List;
+import lombok.Getter;
+
+@Getter
+public class TopicListingResult {
+
+    // Define a singleton "Pass Through" object
+    private static final TopicListingResult PASS_THROUGH_INSTANCE = new 
TopicListingResult(null, false);
+
+    private final List<String> topics;
+    private final boolean filtered;
+
+    private TopicListingResult(List<String> topics, boolean filtered) {
+        this.topics = topics;
+        this.filtered = filtered;
+    }
+
+
+    /**
+     * Return an invalid result indicating that the caller should continue 
with the default logic.
+     */
+    public static TopicListingResult passThrough() {
+        return PASS_THROUGH_INSTANCE;
+    }
+
+    /**
+     * Return a successful result with the provided topics list.
+     */
+    public static TopicListingResult success(List<String> topics, boolean filtered) {
+        if (topics == null) {
+            throw new IllegalArgumentException("Topics list cannot be null in success result");
+        }
+        return new TopicListingResult(topics, filtered);
+    }
+
+    /**
+     * Check if this result is a "Pass Through" result.
+     */
+    public boolean isPassThrough() {
+        return this == PASS_THROUGH_INSTANCE;
+    }
+}
+```
+Update BrokerInterceptor Interface
+Add a default method to BrokerInterceptor. By default it returns `TopicListingResult.passThrough()`, indicating no interception (to maintain backward compatibility).
+
+```java
+
+package org.apache.pulsar.broker.intercept;
+
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+import org.apache.pulsar.common.api.proto.CommandGetTopicsOfNamespace;
+import org.apache.pulsar.common.naming.NamespaceName;
+
+public interface BrokerInterceptor extends AutoCloseable {
+
+    // ... existing methods ...
+
+    /**
+     * Intercept the GetTopicsOfNamespace request.
+     * <p>
+     * This method allows plugins to override the default topic discovery 
logic (ZooKeeper scan).
+     * It enables fetching topics from external sources (e.g., databases, 
other metadata stores)
+     * based on the provided client context properties.
+     *
+     * @param namespace     The namespace being queried.
+     * @param mode          The query mode (PERSISTENT, NON_PERSISTENT, or 
ALL).
+     * @param topicsPattern Optional regex pattern provided by the client.
+     * @param properties    Context properties provided by the client.
+     * @return A CompletableFuture containing the result:
+     * <ul>
+     * <li>{@link TopicListingResult#passThrough()} :
+     * (Default) Proceed to the next interceptor or Broker's default 
logic.</li>
+     * <li>{@link TopicListingResult#success(java.util.List, boolean)} :
+     *      Stop the chain and return these topics immediately.</li>
+     * </ul>
+     * If the future completes with {@code null}, it is treated as {@code 
passThrough()}.
+     */
+    default CompletableFuture<TopicListingResult> 
interceptGetTopicsOfNamespace(
+            NamespaceName namespace,
+            CommandGetTopicsOfNamespace.Mode mode,
+            Optional<String> topicsPattern,
+            Map<String, String> properties) {
+        return 
CompletableFuture.completedFuture(TopicListingResult.passThrough());
+    }
+}
+```
+
+ServerCnx Logic Update
+In ServerCnx.java, the method internalHandleGetTopicsOfNamespace will be 
updated to use this strategy.
+
+```java
+private void internalHandleGetTopicsOfNamespace(String namespace, NamespaceName namespaceName, long requestId,
+                                                CommandGetTopicsOfNamespace.Mode mode,
+                                                Optional<String> topicsPattern, Optional<String> topicsHash,
+                                                Map<String, String> properties,
+                                                Semaphore lookupSemaphore) {
+    BooleanSupplier isPermitRequestCancelled = () -> !ctx().channel().isActive();
+    TopicListSizeResultCache.ResultHolder listSizeHolder =
+            service.getTopicListSizeResultCache().getTopicListSize(namespaceName.toString(), mode);
+
+    listSizeHolder.getSizeAsync().thenAccept(initialSize -> {
+        maxTopicListInFlightLimiter.withAcquiredPermits(initialSize,
+            AsyncDualMemoryLimiter.LimitType.HEAP_MEMORY, isPermitRequestCancelled, initialPermits -> {
+                // Call the Interceptor
+                CompletableFuture<TopicListingResult> interceptorFuture =
+                    getBrokerService().getPulsar().getBrokerInterceptor()
+                        .interceptGetTopicsOfNamespace(namespaceName, mode, topicsPattern, properties);
+
+                return interceptorFuture.thenCompose(interceptorResult -> {
+                    // Decision branch: Use the interceptor result OR fall back to the default logic
+                    if (interceptorResult != null && !interceptorResult.isPassThrough()) {
+                        // case A: The interceptor has taken over the request
+                        return CompletableFuture.completedFuture(interceptorResult);
+                    } else {
+                        // case B: The interceptor did not handle, so the original query logic is executed.
+                        // Wrap List<String> into TopicListingResult(topics, filtered=false) for unified processing
+                        return getBrokerService().pulsar().getNamespaceService()
+                                .getListOfUserTopics(namespaceName, mode)
+                                .thenApply(topics -> TopicListingResult.success(topics, false));
+                    }
+                }).thenCompose(listingResult -> {
+                    List<String> rawTopics = listingResult.getTopics();

Review Comment:
   You don't have to copy so much code here; most of it is not relevant to the key change. The only thing that matters is what will replace the following existing code:
   
   ```java
                           return getBrokerService().pulsar().getNamespaceService()
                                   .getListOfUserTopics(namespaceName, mode)
   ```
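
   For illustration only, here is a minimal, self-contained sketch of the replacement in isolation: ask the interceptor first, and fall back to the default listing when it passes through or completes with null. The names `TopicListingFallbackSketch`, `TopicListingInterceptor`, `listTopics` and `defaultListing` are hypothetical stand-ins, and the namespace and mode are plain strings here rather than the real Pulsar types, so this is not the actual `ServerCnx` code.

   ```java
   import java.util.List;
   import java.util.Map;
   import java.util.Optional;
   import java.util.concurrent.CompletableFuture;
   import java.util.function.Supplier;

   class TopicListingFallbackSketch {

       // Simplified stand-in for the TopicListingResult proposed in the PIP.
       static final class TopicListingResult {
           private static final TopicListingResult PASS_THROUGH = new TopicListingResult(null, false);
           final List<String> topics;
           final boolean filtered;

           private TopicListingResult(List<String> topics, boolean filtered) {
               this.topics = topics;
               this.filtered = filtered;
           }

           static TopicListingResult passThrough() {
               return PASS_THROUGH;
           }

           static TopicListingResult success(List<String> topics, boolean filtered) {
               if (topics == null) {
                   throw new IllegalArgumentException("Topics list cannot be null in success result");
               }
               return new TopicListingResult(topics, filtered);
           }

           boolean isPassThrough() {
               return this == PASS_THROUGH;
           }
       }

       // Hypothetical stand-in for the new BrokerInterceptor method (assumption: types simplified).
       interface TopicListingInterceptor {
           CompletableFuture<TopicListingResult> interceptGetTopicsOfNamespace(
                   String namespace, String mode, Optional<String> topicsPattern, Map<String, String> properties);
       }

       // The key change: this call takes the place of the direct getListOfUserTopics(...) call.
       // defaultListing stands in for the existing metadata-store lookup and is only invoked on fallback.
       static CompletableFuture<TopicListingResult> listTopics(
               TopicListingInterceptor interceptor,
               Supplier<CompletableFuture<List<String>>> defaultListing,
               String namespace, String mode, Optional<String> topicsPattern, Map<String, String> properties) {
           return interceptor.interceptGetTopicsOfNamespace(namespace, mode, topicsPattern, properties)
                   .thenCompose(result -> {
                       if (result != null && !result.isPassThrough()) {
                           // The interceptor handled the request.
                           return CompletableFuture.completedFuture(result);
                       }
                       // Pass-through (or null): run the default listing and mark it as unfiltered.
                       return defaultListing.get()
                               .thenApply(topics -> TopicListingResult.success(topics, false));
                   });
       }

       public static void main(String[] args) {
           // An interceptor that never handles the request, so the default listing is used.
           TopicListingInterceptor noop = (ns, mode, pattern, props) ->
                   CompletableFuture.completedFuture(TopicListingResult.passThrough());
           TopicListingResult result = listTopics(noop,
                   () -> CompletableFuture.completedFuture(List.of("persistent://tenant/ns/topic-a")),
                   "tenant/ns", "PERSISTENT", Optional.empty(), Map.of("k1", "v1")).join();
           System.out.println(result.topics + " filtered=" + result.filtered);
       }
   }
   ```

   Showing only this branch in the PIP, instead of the whole method, would keep the focus on the decision between the interceptor result and the default logic.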
   
   
   


