BewareMyPower commented on code in PR #25134:
URL: https://github.com/apache/pulsar/pull/25134#discussion_r2684561910
##########
pip/pip-452.md:
##########
@@ -0,0 +1,306 @@
+# PIP-452: Customizable Topic Listing in GetTopicsOfNamespace with properties
+
+# Motivation
+Currently, the `CommandGetTopicsOfNamespace` logic in the Pulsar Broker is hard-coded to scan the metadata store (ZooKeeper) for all child nodes under a namespace.
+
+This implementation limits the flexibility required for complex multi-tenant scenarios:
+
+No Client Context: The broker cannot distinguish who is asking for the topics or why. It cannot filter topics based on client properties (these properties may correspond to topic properties).
+
+Inefficient Filtering: For namespaces with millions of topics, the broker must fetch the full list into memory before applying the `topics_pattern` regex. There is no way to "push down" the filtering to the data source (e.g., a database with an index).
+
+To address these issues, I propose making the topic listing logic pluggable and extending the protocol to accept client properties.
+
+# Goals
+Protocol: Add a `properties` field to `CommandGetTopicsOfNamespace` to carry client-side context.
+
+Broker: Extend `BrokerInterceptor` to allow plugins to intercept the "Get Topics" request.
+
+Client: Update the Java client to forward consumer properties to the lookup service when using regex subscriptions.
+
+# High Level Design
+We will modify the Pulsar Protocol to carry a properties map. On the Broker side, we will add a new method to the `BrokerInterceptor` interface.
+The default implementation will preserve the existing behavior (fetching from ZooKeeper). The Broker's connection handler (`ServerCnx`) will first delegate the request to the interceptor and fall back to the existing logic whenever the interceptor passes through.
+
+# Detailed Design
+
+## Public-facing Changes
+
+### Protocol Changes
+Update `PulsarApi.proto` to include the properties field.
+
+PulsarApi.proto
+```protobuf
+message CommandGetTopicsOfNamespace {
+ required uint64 request_id = 1;
+ required string namespace = 2;
+ optional Mode mode = 3 [default = PERSISTENT];
+
+ // Existing fields for filtering and hash optimization
+ optional string topics_pattern = 4;
+ optional string topics_hash = 5;
+
+ // New field: Context properties from the client
+ repeated KeyValue properties = 6;
+}
+```
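Because `properties` is declared as `repeated KeyValue`, the broker would have to turn it into the `Map<String, String>` that the interceptor receives. The sketch below shows that conversion under two assumptions. `KeyValuePair` is a hypothetical stand-in for the generated `KeyValue` class. A key sent more than once keeps its last value, since the proposal does not specify how duplicates are handled.

```java
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical stand-in for the generated protobuf KeyValue message. */
final class KeyValuePair {
    final String key;
    final String value;

    KeyValuePair(String key, String value) {
        this.key = key;
        this.value = value;
    }
}

/** Hypothetical helper that turns the repeated properties field into the interceptor's map. */
final class PropertiesConverter {
    private PropertiesConverter() {
    }

    /** Assumption: when a key is sent more than once, the last value wins. */
    static Map<String, String> toMap(List<KeyValuePair> properties) {
        if (properties == null || properties.isEmpty()) {
            return Collections.emptyMap();
        }
        // LinkedHashMap keeps the client's ordering, which helps when logging the properties
        Map<String, String> result = new LinkedHashMap<>();
        for (KeyValuePair kv : properties) {
            result.put(kv.key, kv.value);
        }
        return Collections.unmodifiableMap(result);
    }
}
```

With the generated `KeyValue` class, the loop would presumably read `kv.getKey()` and `kv.getValue()` instead of the stand-in fields.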
+
+### REST API & CLI Changes
+
+Add a `properties` parameter to the REST API endpoint that lists the topics of a namespace, so that topics with specific properties can be listed through the customizable topic listing.
+
+REST API:
+```
+GET /admin/v2/persistent/{tenant}/{namespace}?properties=k1=v1,k2=v2
+```
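The comma-separated `properties` query value also needs to be parsed into a map on the broker side. The sketch below makes a few assumptions. Entries are separated by `,`, and each entry is split at its first `=`. Keys contain neither `=` nor `,`, and values contain no `,`. `QueryPropertiesParser` is a hypothetical helper, not an existing Pulsar class.

```java
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical parser for the "k1=v1,k2=v2" query parameter format. */
final class QueryPropertiesParser {
    private QueryPropertiesParser() {
    }

    /** Parses "k1=v1,k2=v2" into {k1=v1, k2=v2}; a null or blank value yields an empty map. */
    static Map<String, String> parse(String raw) {
        if (raw == null || raw.isBlank()) {
            return Collections.emptyMap();
        }
        Map<String, String> result = new LinkedHashMap<>();
        for (String entry : raw.split(",")) {
            String trimmed = entry.trim();
            if (trimmed.isEmpty()) {
                continue; // tolerate stray commas such as "k1=v1,"
            }
            int eq = trimmed.indexOf('=');
            if (eq <= 0) {
                // Missing '=' or an empty key
                throw new IllegalArgumentException("Invalid property entry: " + trimmed);
            }
            // Split at the first '=' only, so the value may itself contain '='
            result.put(trimmed.substring(0, eq), trimmed.substring(eq + 1));
        }
        return Collections.unmodifiableMap(result);
    }
}
```

Splitting at the first `=` keeps values such as `a=b=c` intact (`a` maps to `b=c`). A value that contains `,` would still need an escaping or encoding rule, which the proposal does not yet define.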
+
+CLI:
+```
+pulsar-admin topics list <tenant>/<namespace> -p k1=v1 -p k2=v2
+```
+
+## Design & Implementation Details
+### Broker Changes
+
+Add a `TopicListingResult` class to represent the result of a topic listing.
+
+```java
+package org.apache.pulsar.broker.intercept;
+
+import java.util.List;
+import lombok.Getter;
+
+@Getter
+public class TopicListingResult {
+
+    // Singleton "pass-through" instance
+    private static final TopicListingResult PASS_THROUGH_INSTANCE = new TopicListingResult(null, false);
+
+    private final List<String> topics;
+    private final boolean filtered;
+
+    private TopicListingResult(List<String> topics, boolean filtered) {
+        this.topics = topics;
+        this.filtered = filtered;
+    }
+
+    /**
+     * Return the pass-through result, indicating that the caller should continue with the default logic.
+     */
+    public static TopicListingResult passThrough() {
+        return PASS_THROUGH_INSTANCE;
+    }
+
+    /**
+     * Return a successful result with the provided topics list.
+     */
+    public static TopicListingResult success(List<String> topics, boolean filtered) {
+        if (topics == null) {
+            throw new IllegalArgumentException("Topics list cannot be null in success result");
+        }
+        return new TopicListingResult(topics, filtered);
+    }
+
+    /**
+     * Check if this result is the "pass-through" result.
+     */
+    public boolean isPassThrough() {
+        return this == PASS_THROUGH_INSTANCE;
+    }
+}
+```
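Pass-through is detected by identity rather than by a `null` topics list. A caller can therefore tell "the plugin declined" apart from "the plugin found nothing". The sketch below shows how a caller might consume the class. It uses a Lombok-free copy of the class, with explicit getters in place of `@Getter`. It also uses a hypothetical `topicsOrDefault` helper that is not part of the proposal.

```java
import java.util.List;
import java.util.function.Supplier;

// Lombok-free copy of the proposed TopicListingResult: explicit getters replace @Getter.
final class TopicListingResult {
    private static final TopicListingResult PASS_THROUGH_INSTANCE = new TopicListingResult(null, false);

    private final List<String> topics;
    private final boolean filtered;

    private TopicListingResult(List<String> topics, boolean filtered) {
        this.topics = topics;
        this.filtered = filtered;
    }

    static TopicListingResult passThrough() {
        return PASS_THROUGH_INSTANCE;
    }

    static TopicListingResult success(List<String> topics, boolean filtered) {
        if (topics == null) {
            throw new IllegalArgumentException("Topics list cannot be null in success result");
        }
        return new TopicListingResult(topics, filtered);
    }

    List<String> getTopics() {
        return topics;
    }

    boolean isFiltered() {
        return filtered;
    }

    boolean isPassThrough() {
        return this == PASS_THROUGH_INSTANCE;
    }
}

final class ListingResults {
    private ListingResults() {
    }

    /**
     * Hypothetical helper: returns the plugin's topics, or runs the default listing when the
     * plugin passed through (a null result is treated the same way).
     */
    static List<String> topicsOrDefault(TopicListingResult result, Supplier<List<String>> defaultListing) {
        if (result == null || result.isPassThrough()) {
            return defaultListing.get();
        }
        return result.getTopics();
    }
}
```

An interceptor can therefore return `success(List.of(), true)` to report that no topic matches, and the broker will not mistake it for pass-through.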
+#### Update `BrokerInterceptor` Interface
+Add a default method to `BrokerInterceptor`. By default it returns a future completed with `TopicListingResult.passThrough()`, indicating no interception (to maintain backward compatibility).
+
+```java
+
+package org.apache.pulsar.broker.intercept;
+
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+import org.apache.pulsar.common.api.proto.CommandGetTopicsOfNamespace;
+import org.apache.pulsar.common.naming.NamespaceName;
+
+public interface BrokerInterceptor extends AutoCloseable {
+
+    // ... existing methods ...
+
+    /**
+     * Intercept the GetTopicsOfNamespace request.
+     * <p>
+     * This method allows plugins to override the default topic discovery logic (ZooKeeper scan).
+     * It enables fetching topics from external sources (e.g., databases, other metadata stores)
+     * based on the provided client context properties.
+     *
+     * @param namespace     The namespace being queried.
+     * @param mode          The query mode (PERSISTENT, NON_PERSISTENT, or ALL).
+     * @param topicsPattern Optional regex pattern provided by the client.
+     * @param properties    Context properties provided by the client.
+     * @return A CompletableFuture containing the result:
+     *         <ul>
+     *         <li>{@link TopicListingResult#passThrough()}:
+     *         (Default) Proceed to the next interceptor or the broker's default logic.</li>
+     *         <li>{@link TopicListingResult#success(java.util.List, boolean)}:
+     *         Stop the chain and return these topics immediately.</li>
+     *         </ul>
+     *         If the future completes with {@code null}, it is treated as {@code passThrough()}.
+     */
+    default CompletableFuture<TopicListingResult> interceptGetTopicsOfNamespace(
+            NamespaceName namespace,
+            CommandGetTopicsOfNamespace.Mode mode,
+            Optional<String> topicsPattern,
+            Map<String, String> properties) {
+        return CompletableFuture.completedFuture(TopicListingResult.passThrough());
+    }
+}
+```
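The sketch below shows what a plugin's topic lookup could look like behind this hook, using a hypothetical in-memory property index (`PropertyTopicIndex`). It makes several simplifying assumptions:
- A topic matches when its properties contain every client property.
- Topics are stored under full names such as `persistent://tenant/ns/topic`.
- `mode` is ignored.
- The regex is matched against the full name, which may differ from how the broker applies `topics_pattern` today.

A real plugin would query its own indexed store instead.

```java
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.regex.Pattern;
import java.util.stream.Collectors;

/**
 * Hypothetical plugin-side index: topic name -> topic properties.
 * A real plugin would query a database or another metadata store instead.
 */
final class PropertyTopicIndex {
    private final Map<String, Map<String, String>> topicProperties;

    PropertyTopicIndex(Map<String, Map<String, String>> topicProperties) {
        this.topicProperties = topicProperties;
    }

    /**
     * Returns, sorted by name, the topics of the namespace whose properties contain every
     * client property and whose full name matches the optional regex.
     */
    CompletableFuture<List<String>> findTopics(String namespace, Optional<String> topicsPattern,
                                               Map<String, String> clientProperties) {
        Optional<Pattern> pattern = topicsPattern.map(Pattern::compile);
        List<String> matches = topicProperties.entrySet().stream()
                .filter(e -> belongsTo(e.getKey(), namespace))
                // Every client key/value pair must be present on the topic
                .filter(e -> e.getValue().entrySet().containsAll(clientProperties.entrySet()))
                .map(Map.Entry::getKey)
                .filter(topic -> pattern.map(p -> p.matcher(topic).matches()).orElse(true))
                .sorted()
                .collect(Collectors.toList());
        return CompletableFuture.completedFuture(matches);
    }

    /** Strips the "persistent://" style scheme and checks the "tenant/ns/" prefix. */
    private static boolean belongsTo(String topic, String namespace) {
        int schemeEnd = topic.indexOf("://");
        String rest = schemeEnd >= 0 ? topic.substring(schemeEnd + 3) : topic;
        return rest.startsWith(namespace + "/");
    }
}
```

Inside `interceptGetTopicsOfNamespace`, a plugin could wrap the lookup as `findTopics(...).thenApply(topics -> TopicListingResult.success(topics, true))`. Here `filtered = true` presumably means that `topics_pattern` has already been applied. For requests whose properties it does not recognize, the plugin would return `TopicListingResult.passThrough()`.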
+
+#### `ServerCnx` Logic Update
+In `ServerCnx.java`, the method `internalHandleGetTopicsOfNamespace` will be updated to consult the interceptor before falling back to the default topic listing.
+
+```java
+private void internalHandleGetTopicsOfNamespace(String namespace, NamespaceName namespaceName, long requestId,
+                                                CommandGetTopicsOfNamespace.Mode mode,
+                                                Optional<String> topicsPattern, Optional<String> topicsHash,
+                                                Map<String, String> properties,
+                                                Semaphore lookupSemaphore) {
+    BooleanSupplier isPermitRequestCancelled = () -> !ctx().channel().isActive();
+    TopicListSizeResultCache.ResultHolder listSizeHolder =
+            service.getTopicListSizeResultCache().getTopicListSize(namespaceName.toString(), mode);
+
+    listSizeHolder.getSizeAsync().thenAccept(initialSize -> {
+        maxTopicListInFlightLimiter.withAcquiredPermits(initialSize,
+                AsyncDualMemoryLimiter.LimitType.HEAP_MEMORY, isPermitRequestCancelled, initialPermits -> {
+            // Call the interceptor
+            CompletableFuture<TopicListingResult> interceptorFuture =
+                    getBrokerService().getPulsar().getBrokerInterceptor()
+                            .interceptGetTopicsOfNamespace(namespaceName, mode, topicsPattern, properties);
+
+            return interceptorFuture.thenCompose(interceptorResult -> {
+                // Decision branch: use the interceptor result or fall back to the default logic
+                if (interceptorResult != null && !interceptorResult.isPassThrough()) {
+                    // Case A: the interceptor has taken over the request
+                    return CompletableFuture.completedFuture(interceptorResult);
+                } else {
+                    // Case B: the interceptor did not handle it, so the original query logic is executed.
+                    // Wrap List<String> into TopicListingResult(topics, filtered=false) for unified processing
+                    return getBrokerService().pulsar().getNamespaceService()
+                            .getListOfUserTopics(namespaceName, mode)
+                            .thenApply(topics -> TopicListingResult.success(topics, false));
+                }
+            }).thenCompose(listingResult -> {
+                List<String> rawTopics = listingResult.getTopics();
Review Comment:
You don't have to copy so much code here; most of it is unrelated to the key change. The only thing that matters is what will replace the following existing code:
```java
return getBrokerService().pulsar().getNamespaceService()
        .getListOfUserTopics(namespaceName, mode)
```
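Following that suggestion, the key change comes down to one function that replaces the direct `getListOfUserTopics(namespaceName, mode)` call. The sketch below uses hypothetical stand-ins: `TopicListing`, a trimmed `Result` in place of `TopicListingResult`, and `Supplier`s for the interceptor call and the default listing. It assumes the broker applies `topics_pattern` itself only when the result is not already filtered.

```java
import java.util.List;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.function.Supplier;
import java.util.regex.Pattern;
import java.util.stream.Collectors;

final class TopicListing {
    /** Trimmed stand-in for the proposed TopicListingResult (pass-through is a singleton). */
    static final class Result {
        static final Result PASS_THROUGH = new Result(null, false);
        final List<String> topics;
        final boolean filtered;

        Result(List<String> topics, boolean filtered) {
            this.topics = topics;
            this.filtered = filtered;
        }
    }

    private TopicListing() {
    }

    /**
     * Asks the interceptor first and runs the default listing on pass-through or a null result.
     * Assumption: topicsPattern is applied here only when the topics were not already filtered.
     */
    static CompletableFuture<List<String>> listTopics(
            Supplier<CompletableFuture<Result>> interceptor,
            Supplier<CompletableFuture<List<String>>> defaultListing,
            Optional<String> topicsPattern) {
        return interceptor.get()
                .thenCompose(result -> {
                    if (result != null && result != Result.PASS_THROUGH) {
                        // The interceptor took over the request
                        return CompletableFuture.completedFuture(result);
                    }
                    // Pass-through: run the existing listing and mark it as not filtered
                    return defaultListing.get().thenApply(topics -> new Result(topics, false));
                })
                .thenApply(result -> {
                    if (result.filtered || topicsPattern.isEmpty()) {
                        return result.topics;
                    }
                    return applyPattern(result.topics, topicsPattern.get());
                });
    }

    private static List<String> applyPattern(List<String> topics, String regex) {
        Pattern pattern = Pattern.compile(regex);
        return topics.stream().filter(t -> pattern.matcher(t).matches()).collect(Collectors.toList());
    }
}
```

Plugging the real interceptor call and `getListOfUserTopics(namespaceName, mode)` into the two `Supplier`s would confine the change in `ServerCnx` to that single call site. The permit handling and response writing would stay untouched.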