Copilot commented on code in PR #25134:
URL: https://github.com/apache/pulsar/pull/25134#discussion_r2685446889
##########
pip/pip-452.md:
##########
@@ -0,0 +1,241 @@
+# PIP-452: Customizable Topic Listing in GetTopicsOfNamespace with properties
+
+# Motivation
+Currently, the CommandGetTopicsOfNamespace logic in the Pulsar Broker is
hard-coded to scan the metadata store (ZooKeeper) for all children nodes under
a namespace.
+
+This implementation limits the flexibility required for complex multi-tenant
scenarios:
+
+No Client Context: The broker cannot distinguish who is asking for the topics
or why. It cannot filter topics based on client properties (This properties may
be related to topic properties).
+
+Inefficient Filtering: For namespaces with millions of topics, the broker must
fetch the full list into memory before applying the topics_pattern regex. There
is no way to "push down" the filtering to the data source (e.g., a database
with an index).
+
+To address these issues, I propose making the topic listing logic pluggable
and extending the protocol to accept client properties.
+
+# Goals
+Protocol: Add a properties field to `CommandGetTopicsOfNamespace` to carry
client-side context.
+
+Broker: Extend `BrokerInterceptor` to allow plugins to intercept the "Get
Topics" request.
+
+Client: Update the Java Client to forward Consumer properties to the lookup
service when using Regex subscriptions.
+
+# High Level Design
+We will modify the Pulsar Protocol to carry a properties map. On the Broker
side, we will add a new method to the BrokerInterceptor interface.
+The default implementation will preserve the existing behavior (fetching from
ZooKeeper). The Broker's connection handler will simply delegate the request to
this strategy.
+
+# Detailed Design
+
+## Public-facing Changes
+
+### Protocol Changes
+Update `PulsarApi.proto` to include the properties field.
+
+PulsarApi.proto
+```protobuf
+message CommandGetTopicsOfNamespace {
+ required uint64 request_id = 1;
+ required string namespace = 2;
+ optional Mode mode = 3 [default = PERSISTENT];
+
+ // Existing fields for filtering and hash optimization
+ optional string topics_pattern = 4;
+ optional string topics_hash = 5;
+
+ // New field: Context properties from the client
+ repeated KeyValue properties = 6;
+}
+```
+
+### REST API & CLI Changes
+
+Add `properties` parameter to the REST API endpoint for listing topics in a
namespace to list topic with specific properties for customizable topic listing.
+
+REST API:
+```
+GET /admin/v2/persistent/{tenant}/{namespace} with a query params
properties=k1=v1,k2=v2
+```
+
+CLI:
+```
+pulsar-admin topics list <tenant>/<namespace> -p k1=v1 -p k2=v2"
+```
+
+## Design & Implementation Details
+### Broker Changes
+
+Add `TopicListingResult` for listing topic results
+
+```java
+package org.apache.pulsar.broker.namespace;
+
+import java.util.List;
+import java.util.Objects;
+
+public record TopicListingResult (List<String> topics, boolean filtered) {
+
+ public TopicListingResult{
+ Objects.requireNonNull(topics, "topics");
+ }
+}
+```
+Update BrokerInterceptor Interface
+Add a default method in BrokerInterceptor. It returns null by default,
indicating no interception (to maintain backward compatibility).
+
+```Java
+
+package org.apache.pulsar.broker.intercept;
+
+import java.util.Optional;
+import org.apache.pulsar.common.naming.NamespaceName;
+import org.apache.pulsar.common.api.proto.CommandGetTopicsOfNamespace.Mode;
+import java.util.concurrent.CompletableFuture;
+
+public interface BrokerInterceptor extends AutoCloseable {
+
+ // ... existing methods ...
+
+ /**
+ * Intercept the GetTopicsOfNamespace request.
+ * <p>
+ * This method allows plugins to override the default topic discovery
logic (ZooKeeper scan).
+ * It enables fetching topics from external sources (e.g., databases,
other metadata stores)
+ * based on the provided client context properties.
+ *
+ * @param namespace The namespace being queried.
+ * @param mode The query mode (PERSISTENT, NON_PERSISTENT, or
ALL).
+ * @param topicsPattern Optional regex pattern provided by the client.
+ * @param properties Context properties provided by the client.
+ * @return A CompletableFuture containing the result:
+ * If the future completes with {@code Optional#empty}, proceed to the
next interceptor or broker's default logic to list all topic.
Review Comment:
Spelling/grammar: The sentence ends with "list all topic" but should be
"list all topics" (plural).
```suggestion
* If the future completes with {@code Optional#empty}, proceed to the
next interceptor or broker's default logic to list all topics.
```
##########
pip/pip-452.md:
##########
@@ -0,0 +1,241 @@
+# PIP-452: Customizable Topic Listing in GetTopicsOfNamespace with properties
+
+# Motivation
+Currently, the CommandGetTopicsOfNamespace logic in the Pulsar Broker is
hard-coded to scan the metadata store (ZooKeeper) for all children nodes under
a namespace.
+
+This implementation limits the flexibility required for complex multi-tenant
scenarios:
+
+No Client Context: The broker cannot distinguish who is asking for the topics
or why. It cannot filter topics based on client properties (This properties may
be related to topic properties).
+
+Inefficient Filtering: For namespaces with millions of topics, the broker must
fetch the full list into memory before applying the topics_pattern regex. There
is no way to "push down" the filtering to the data source (e.g., a database
with an index).
+
+To address these issues, I propose making the topic listing logic pluggable
and extending the protocol to accept client properties.
+
+# Goals
+Protocol: Add a properties field to `CommandGetTopicsOfNamespace` to carry
client-side context.
+
+Broker: Extend `BrokerInterceptor` to allow plugins to intercept the "Get
Topics" request.
+
+Client: Update the Java Client to forward Consumer properties to the lookup
service when using Regex subscriptions.
+
+# High Level Design
+We will modify the Pulsar Protocol to carry a properties map. On the Broker
side, we will add a new method to the BrokerInterceptor interface.
+The default implementation will preserve the existing behavior (fetching from
ZooKeeper). The Broker's connection handler will simply delegate the request to
this strategy.
+
+# Detailed Design
+
+## Public-facing Changes
+
+### Protocol Changes
+Update `PulsarApi.proto` to include the properties field.
+
+PulsarApi.proto
+```protobuf
+message CommandGetTopicsOfNamespace {
+ required uint64 request_id = 1;
+ required string namespace = 2;
+ optional Mode mode = 3 [default = PERSISTENT];
+
+ // Existing fields for filtering and hash optimization
+ optional string topics_pattern = 4;
+ optional string topics_hash = 5;
+
+ // New field: Context properties from the client
+ repeated KeyValue properties = 6;
+}
+```
+
+### REST API & CLI Changes
+
+Add `properties` parameter to the REST API endpoint for listing topics in a
namespace to list topic with specific properties for customizable topic listing.
+
+REST API:
+```
+GET /admin/v2/persistent/{tenant}/{namespace} with a query params
properties=k1=v1,k2=v2
+```
+
+CLI:
+```
+pulsar-admin topics list <tenant>/<namespace> -p k1=v1 -p k2=v2"
Review Comment:
Unbalanced quote: The CLI example ends with a double quote but has no opening
quote. Remove the trailing quote, or add a matching opening quote if quoting
is intended.
```suggestion
pulsar-admin topics list <tenant>/<namespace> -p k1=v1 -p k2=v2
```
##########
pip/pip-452.md:
##########
@@ -0,0 +1,241 @@
+# PIP-452: Customizable Topic Listing in GetTopicsOfNamespace with properties
+
+# Motivation
+Currently, the CommandGetTopicsOfNamespace logic in the Pulsar Broker is
hard-coded to scan the metadata store (ZooKeeper) for all children nodes under
a namespace.
+
+This implementation limits the flexibility required for complex multi-tenant
scenarios:
+
+No Client Context: The broker cannot distinguish who is asking for the topics
or why. It cannot filter topics based on client properties (This properties may
be related to topic properties).
+
+Inefficient Filtering: For namespaces with millions of topics, the broker must
fetch the full list into memory before applying the topics_pattern regex. There
is no way to "push down" the filtering to the data source (e.g., a database
with an index).
+
+To address these issues, I propose making the topic listing logic pluggable
and extending the protocol to accept client properties.
+
+# Goals
+Protocol: Add a properties field to `CommandGetTopicsOfNamespace` to carry
client-side context.
+
+Broker: Extend `BrokerInterceptor` to allow plugins to intercept the "Get
Topics" request.
+
+Client: Update the Java Client to forward Consumer properties to the lookup
service when using Regex subscriptions.
+
+# High Level Design
+We will modify the Pulsar Protocol to carry a properties map. On the Broker
side, we will add a new method to the BrokerInterceptor interface.
+The default implementation will preserve the existing behavior (fetching from
ZooKeeper). The Broker's connection handler will simply delegate the request to
this strategy.
+
+# Detailed Design
+
+## Public-facing Changes
+
+### Protocol Changes
+Update `PulsarApi.proto` to include the properties field.
+
+PulsarApi.proto
+```protobuf
+message CommandGetTopicsOfNamespace {
+ required uint64 request_id = 1;
+ required string namespace = 2;
+ optional Mode mode = 3 [default = PERSISTENT];
+
+ // Existing fields for filtering and hash optimization
+ optional string topics_pattern = 4;
+ optional string topics_hash = 5;
+
+ // New field: Context properties from the client
+ repeated KeyValue properties = 6;
+}
+```
+
+### REST API & CLI Changes
+
+Add `properties` parameter to the REST API endpoint for listing topics in a
namespace to list topic with specific properties for customizable topic listing.
+
+REST API:
+```
+GET /admin/v2/persistent/{tenant}/{namespace} with a query params
properties=k1=v1,k2=v2
+```
+
+CLI:
+```
+pulsar-admin topics list <tenant>/<namespace> -p k1=v1 -p k2=v2"
+```
+
+## Design & Implementation Details
+### Broker Changes
+
+Add `TopicListingResult` for listing topic results
+
+```java
+package org.apache.pulsar.broker.namespace;
+
+import java.util.List;
+import java.util.Objects;
+
+public record TopicListingResult (List<String> topics, boolean filtered) {
+
+ public TopicListingResult{
+ Objects.requireNonNull(topics, "topics");
+ }
+}
+```
+Update BrokerInterceptor Interface
+Add a default method in BrokerInterceptor. It returns null by default,
indicating no interception (to maintain backward compatibility).
+
+```Java
+
+package org.apache.pulsar.broker.intercept;
+
+import java.util.Optional;
+import org.apache.pulsar.common.naming.NamespaceName;
+import org.apache.pulsar.common.api.proto.CommandGetTopicsOfNamespace.Mode;
+import java.util.concurrent.CompletableFuture;
+
+public interface BrokerInterceptor extends AutoCloseable {
+
+ // ... existing methods ...
+
+ /**
+ * Intercept the GetTopicsOfNamespace request.
+ * <p>
+ * This method allows plugins to override the default topic discovery
logic (ZooKeeper scan).
+ * It enables fetching topics from external sources (e.g., databases,
other metadata stores)
+ * based on the provided client context properties.
+ *
+ * @param namespace The namespace being queried.
+ * @param mode The query mode (PERSISTENT, NON_PERSISTENT, or
ALL).
+ * @param topicsPattern Optional regex pattern provided by the client.
+ * @param properties Context properties provided by the client.
+ * @return A CompletableFuture containing the result:
+ * If the future completes with {@code Optional#empty}, proceed to the
next interceptor or broker's default logic to list all topic.
Review Comment:
Incorrect Javadoc syntax: `{@code Optional#empty}` should be `{@code
Optional.empty()}` or `{@link Optional#empty()}`. The `#` member notation is
only meaningful for cross-references such as `{@link}`; `{@code}` renders its
content verbatim, so it should show the actual method call, including the
parentheses.
```suggestion
* If the future completes with {@code Optional.empty()}, proceed to the
next interceptor or broker's default logic to list all topics.
```
##########
pip/pip-452.md:
##########
@@ -0,0 +1,241 @@
+# PIP-452: Customizable Topic Listing in GetTopicsOfNamespace with properties
+
+# Motivation
+Currently, the CommandGetTopicsOfNamespace logic in the Pulsar Broker is
hard-coded to scan the metadata store (ZooKeeper) for all children nodes under
a namespace.
+
+This implementation limits the flexibility required for complex multi-tenant
scenarios:
+
+No Client Context: The broker cannot distinguish who is asking for the topics
or why. It cannot filter topics based on client properties (This properties may
be related to topic properties).
+
+Inefficient Filtering: For namespaces with millions of topics, the broker must
fetch the full list into memory before applying the topics_pattern regex. There
is no way to "push down" the filtering to the data source (e.g., a database
with an index).
+
+To address these issues, I propose making the topic listing logic pluggable
and extending the protocol to accept client properties.
+
+# Goals
+Protocol: Add a properties field to `CommandGetTopicsOfNamespace` to carry
client-side context.
+
+Broker: Extend `BrokerInterceptor` to allow plugins to intercept the "Get
Topics" request.
+
+Client: Update the Java Client to forward Consumer properties to the lookup
service when using Regex subscriptions.
+
+# High Level Design
+We will modify the Pulsar Protocol to carry a properties map. On the Broker
side, we will add a new method to the BrokerInterceptor interface.
+The default implementation will preserve the existing behavior (fetching from
ZooKeeper). The Broker's connection handler will simply delegate the request to
this strategy.
+
+# Detailed Design
+
+## Public-facing Changes
+
+### Protocol Changes
+Update `PulsarApi.proto` to include the properties field.
+
+PulsarApi.proto
+```protobuf
+message CommandGetTopicsOfNamespace {
+ required uint64 request_id = 1;
+ required string namespace = 2;
+ optional Mode mode = 3 [default = PERSISTENT];
+
+ // Existing fields for filtering and hash optimization
+ optional string topics_pattern = 4;
+ optional string topics_hash = 5;
+
+ // New field: Context properties from the client
+ repeated KeyValue properties = 6;
+}
+```
+
+### REST API & CLI Changes
+
+Add `properties` parameter to the REST API endpoint for listing topics in a
namespace to list topic with specific properties for customizable topic listing.
+
+REST API:
+```
+GET /admin/v2/persistent/{tenant}/{namespace} with a query params
properties=k1=v1,k2=v2
+```
+
+CLI:
+```
+pulsar-admin topics list <tenant>/<namespace> -p k1=v1 -p k2=v2"
+```
+
+## Design & Implementation Details
+### Broker Changes
+
+Add `TopicListingResult` for listing topic results
+
+```java
+package org.apache.pulsar.broker.namespace;
+
+import java.util.List;
+import java.util.Objects;
+
+public record TopicListingResult (List<String> topics, boolean filtered) {
+
+ public TopicListingResult{
+ Objects.requireNonNull(topics, "topics");
+ }
+}
+```
+Update BrokerInterceptor Interface
+Add a default method in BrokerInterceptor. It returns null by default,
indicating no interception (to maintain backward compatibility).
+
+```Java
+
+package org.apache.pulsar.broker.intercept;
+
+import java.util.Optional;
+import org.apache.pulsar.common.naming.NamespaceName;
+import org.apache.pulsar.common.api.proto.CommandGetTopicsOfNamespace.Mode;
+import java.util.concurrent.CompletableFuture;
+
+public interface BrokerInterceptor extends AutoCloseable {
+
+ // ... existing methods ...
+
+ /**
+ * Intercept the GetTopicsOfNamespace request.
+ * <p>
+ * This method allows plugins to override the default topic discovery
logic (ZooKeeper scan).
+ * It enables fetching topics from external sources (e.g., databases,
other metadata stores)
+ * based on the provided client context properties.
+ *
+ * @param namespace The namespace being queried.
+ * @param mode The query mode (PERSISTENT, NON_PERSISTENT, or
ALL).
+ * @param topicsPattern Optional regex pattern provided by the client.
+ * @param properties Context properties provided by the client.
+ * @return A CompletableFuture containing the result:
+ * If the future completes with {@code Optional#empty}, proceed to the
next interceptor or broker's default logic to list all topic.
+ */
+ default CompletableFuture<Optional<TopicListingResult>>
interceptGetTopicsOfNamespace(
+ NamespaceName namespace,
+ CommandGetTopicsOfNamespace.Mode mode,
+ Optional<String> topicsPattern,
+ Map<String, String> properties) {
+ return CompletableFuture.completedFuture(Optional.empty());
+ }
+}
+```
+
+ServerCnx Logic Update
+In ServerCnx.java, the method internalHandleGetTopicsOfNamespace will be
updated to use this strategy.
+
+```java
+private void internalHandleGetTopicsOfNamespace(String namespace,
NamespaceName namespaceName, long requestId,
+
CommandGetTopicsOfNamespace.Mode mode,
+ Optional<String>
topicsPattern, Optional<String> topicsHash,
+ Map<String, String> properties,
+ Semaphore lookupSemaphore) {
+ BooleanSupplier isPermitRequestCancelled = () ->
!ctx().channel().isActive();
+ TopicListSizeResultCache.ResultHolder listSizeHolder =
+
service.getTopicListSizeResultCache().getTopicListSize(namespaceName.toString(),
mode);
+
+ listSizeHolder.getSizeAsync().thenAccept(initialSize -> {
+ maxTopicListInFlightLimiter.withAcquiredPermits(initialSize,
+ AsyncDualMemoryLimiter.LimitType.HEAP_MEMORY,
isPermitRequestCancelled, initialPermits -> {
+ // Call the Interceptor if exists
+ BrokerInterceptor brokerInterceptor =
getBrokerService().getPulsar().getBrokerInterceptor();
+ CompletableFuture<Optional<TopicListingResult>>
interceptorFuture =
+ brokerInterceptor == null ?
CompletableFuture.completedFuture(Optional.empty()) :
+
brokerInterceptor.interceptGetTopicsOfNamespace(namespaceName, mode,
topicsPattern, properties);
+
+ return interceptorFuture.thenCompose(interceptorResult -> {
+ // Decision branch: Use the interceptor result OR fall
back to the default logic
+ if (interceptorResult != null &&
interceptorResult.isPresent()) {
+ // case A: The interceptor has taken over the request
+ return
CompletableFuture.completedFuture(interceptorResult);
+ } else {
+ // case B: The interceptor did not handle, so the
original query logic is executed.
+ // Wrap List<String> into TopicListingResult(topics,
filtered=false) for unified processing
+ return
getBrokerService().pulsar().getNamespaceService()
+ .getListOfUserTopics(namespaceName, mode)
+ .thenApply(topics -> new
TopicListingResult(topics, false));
+ }
+ }).thenCompose(listingResult -> {
+ // Adjust memory permits based on ACTUAL size
+ long actualSize =
TopicListMemoryLimiter.estimateTopicListSize(listingResult.getTopics());
+ listSizeHolder.updateSize(actualSize);
+
+ return
maxTopicListInFlightLimiter.withUpdatedPermits(initialPermits, actualSize,
+ isPermitRequestCancelled, permits -> {
+
+ List<String> finalTopics =
listingResult.getTopics();
+ boolean hasAppliedFilter =
listingResult.isFiltered();
+
+ // Apply regex filtering (if the interceptor did
not filter and regex matching is enabled)
+ if (!isAlreadyFiltered &&
enableSubscriptionPatternEvaluation
Review Comment:
Undefined variable: `isAlreadyFiltered` is used at line 166 but is never
declared in the method signature or in the code shown. It should likely be
`hasAppliedFilter`, which is defined at line 163.
```suggestion
if (!hasAppliedFilter &&
enableSubscriptionPatternEvaluation
```
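For reference, a minimal standalone sketch of the guard with the renamed
variable. It is an illustration under assumptions, not the PIP's
implementation: the remainder of the condition is not visible in the hunk, so
the null check on the pattern, the `finalTopics` helper, and the use of
`java.util.regex` on the full topic name are all hypothetical. Note that a Java
record generates accessors named `topics()` and `filtered()`, which the sketch
uses.

```java
import java.util.List;
import java.util.Objects;
import java.util.regex.Pattern;
import java.util.stream.Collectors;

public class FilterGuardSketch {

    // Same shape as the record proposed in the PIP.
    public record TopicListingResult(List<String> topics, boolean filtered) {
        public TopicListingResult {
            Objects.requireNonNull(topics, "topics");
        }
    }

    // Hypothetical helper: applies the client's pattern only when the listing
    // has not been filtered already and broker-side evaluation is enabled.
    public static List<String> finalTopics(TopicListingResult listingResult,
                                           boolean enableSubscriptionPatternEvaluation,
                                           String topicsPattern) {
        boolean hasAppliedFilter = listingResult.filtered();
        if (!hasAppliedFilter && enableSubscriptionPatternEvaluation && topicsPattern != null) {
            // Assumption: plain java.util.regex matching against the full topic name.
            Pattern pattern = Pattern.compile(topicsPattern);
            return listingResult.topics().stream()
                    .filter(topic -> pattern.matcher(topic).matches())
                    .collect(Collectors.toList());
        }
        // Already filtered by the interceptor, or filtering is disabled.
        return listingResult.topics();
    }

    public static void main(String[] args) {
        List<String> topics = List.of("persistent://t/ns/orders-1", "persistent://t/ns/audit");
        String pattern = "persistent://t/ns/orders-.*";
        // Unfiltered listing: the broker applies the pattern.
        System.out.println(finalTopics(new TopicListingResult(topics, false), true, pattern));
        // Listing marked as filtered: the pattern is not applied again.
        System.out.println(finalTopics(new TopicListingResult(topics, true), true, pattern));
    }
}
```

With a single flag name throughout, an interceptor that sets `filtered=true`
is never filtered a second time by the broker.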
##########
pip/pip-452.md:
##########
@@ -0,0 +1,241 @@
+# PIP-452: Customizable Topic Listing in GetTopicsOfNamespace with properties
+
+# Motivation
+Currently, the CommandGetTopicsOfNamespace logic in the Pulsar Broker is
hard-coded to scan the metadata store (ZooKeeper) for all children nodes under
a namespace.
+
+This implementation limits the flexibility required for complex multi-tenant
scenarios:
+
+No Client Context: The broker cannot distinguish who is asking for the topics
or why. It cannot filter topics based on client properties (This properties may
be related to topic properties).
+
+Inefficient Filtering: For namespaces with millions of topics, the broker must
fetch the full list into memory before applying the topics_pattern regex. There
is no way to "push down" the filtering to the data source (e.g., a database
with an index).
+
+To address these issues, I propose making the topic listing logic pluggable
and extending the protocol to accept client properties.
+
+# Goals
+Protocol: Add a properties field to `CommandGetTopicsOfNamespace` to carry
client-side context.
+
+Broker: Extend `BrokerInterceptor` to allow plugins to intercept the "Get
Topics" request.
+
+Client: Update the Java Client to forward Consumer properties to the lookup
service when using Regex subscriptions.
+
+# High Level Design
+We will modify the Pulsar Protocol to carry a properties map. On the Broker
side, we will add a new method to the BrokerInterceptor interface.
+The default implementation will preserve the existing behavior (fetching from
ZooKeeper). The Broker's connection handler will simply delegate the request to
this strategy.
+
+# Detailed Design
+
+## Public-facing Changes
+
+### Protocol Changes
+Update `PulsarApi.proto` to include the properties field.
+
+PulsarApi.proto
+```protobuf
+message CommandGetTopicsOfNamespace {
+ required uint64 request_id = 1;
+ required string namespace = 2;
+ optional Mode mode = 3 [default = PERSISTENT];
+
+ // Existing fields for filtering and hash optimization
+ optional string topics_pattern = 4;
+ optional string topics_hash = 5;
+
+ // New field: Context properties from the client
+ repeated KeyValue properties = 6;
+}
+```
+
+### REST API & CLI Changes
+
+Add `properties` parameter to the REST API endpoint for listing topics in a
namespace to list topic with specific properties for customizable topic listing.
+
+REST API:
+```
+GET /admin/v2/persistent/{tenant}/{namespace} with a query params
properties=k1=v1,k2=v2
+```
+
+CLI:
+```
+pulsar-admin topics list <tenant>/<namespace> -p k1=v1 -p k2=v2"
+```
+
+## Design & Implementation Details
+### Broker Changes
+
+Add `TopicListingResult` for listing topic results
+
+```java
+package org.apache.pulsar.broker.namespace;
+
+import java.util.List;
+import java.util.Objects;
+
+public record TopicListingResult (List<String> topics, boolean filtered) {
+
+ public TopicListingResult{
+ Objects.requireNonNull(topics, "topics");
+ }
+}
+```
+Update BrokerInterceptor Interface
+Add a default method in BrokerInterceptor. It returns null by default,
indicating no interception (to maintain backward compatibility).
+
+```Java
+
+package org.apache.pulsar.broker.intercept;
+
+import java.util.Optional;
+import org.apache.pulsar.common.naming.NamespaceName;
+import org.apache.pulsar.common.api.proto.CommandGetTopicsOfNamespace.Mode;
+import java.util.concurrent.CompletableFuture;
+
+public interface BrokerInterceptor extends AutoCloseable {
+
+ // ... existing methods ...
+
+ /**
+ * Intercept the GetTopicsOfNamespace request.
+ * <p>
+ * This method allows plugins to override the default topic discovery
logic (ZooKeeper scan).
+ * It enables fetching topics from external sources (e.g., databases,
other metadata stores)
+ * based on the provided client context properties.
+ *
+ * @param namespace The namespace being queried.
+ * @param mode The query mode (PERSISTENT, NON_PERSISTENT, or
ALL).
+ * @param topicsPattern Optional regex pattern provided by the client.
+ * @param properties Context properties provided by the client.
+ * @return A CompletableFuture containing the result:
+ * If the future completes with {@code Optional#empty}, proceed to the
next interceptor or broker's default logic to list all topic.
+ */
+ default CompletableFuture<Optional<TopicListingResult>>
interceptGetTopicsOfNamespace(
+ NamespaceName namespace,
+ CommandGetTopicsOfNamespace.Mode mode,
+ Optional<String> topicsPattern,
+ Map<String, String> properties) {
+ return CompletableFuture.completedFuture(Optional.empty());
+ }
+}
+```
+
+ServerCnx Logic Update
+In ServerCnx.java, the method internalHandleGetTopicsOfNamespace will be
updated to use this strategy.
+
+```java
+private void internalHandleGetTopicsOfNamespace(String namespace,
NamespaceName namespaceName, long requestId,
+
CommandGetTopicsOfNamespace.Mode mode,
+ Optional<String>
topicsPattern, Optional<String> topicsHash,
+ Map<String, String> properties,
+ Semaphore lookupSemaphore) {
+ BooleanSupplier isPermitRequestCancelled = () ->
!ctx().channel().isActive();
+ TopicListSizeResultCache.ResultHolder listSizeHolder =
+
service.getTopicListSizeResultCache().getTopicListSize(namespaceName.toString(),
mode);
+
+ listSizeHolder.getSizeAsync().thenAccept(initialSize -> {
+ maxTopicListInFlightLimiter.withAcquiredPermits(initialSize,
+ AsyncDualMemoryLimiter.LimitType.HEAP_MEMORY,
isPermitRequestCancelled, initialPermits -> {
+ // Call the Interceptor if exists
+ BrokerInterceptor brokerInterceptor =
getBrokerService().getPulsar().getBrokerInterceptor();
+ CompletableFuture<Optional<TopicListingResult>>
interceptorFuture =
+ brokerInterceptor == null ?
CompletableFuture.completedFuture(Optional.empty()) :
+
brokerInterceptor.interceptGetTopicsOfNamespace(namespaceName, mode,
topicsPattern, properties);
+
+ return interceptorFuture.thenCompose(interceptorResult -> {
+ // Decision branch: Use the interceptor result OR fall
back to the default logic
+ if (interceptorResult != null &&
interceptorResult.isPresent()) {
+ // case A: The interceptor has taken over the request
+ return
CompletableFuture.completedFuture(interceptorResult);
+ } else {
+ // case B: The interceptor did not handle, so the
original query logic is executed.
+ // Wrap List<String> into TopicListingResult(topics,
filtered=false) for unified processing
+ return
getBrokerService().pulsar().getNamespaceService()
+ .getListOfUserTopics(namespaceName, mode)
+ .thenApply(topics -> new
TopicListingResult(topics, false));
+ }
+ }).thenCompose(listingResult -> {
Review Comment:
Inconsistent return types: The two branches of the `thenCompose` lambda at
lines 144-154 return different types. The interceptor branch returns a
`CompletableFuture<Optional<TopicListingResult>>`, while the fallback at line
152 maps the namespace service result to a bare `TopicListingResult(topics,
false)`, not wrapped in an Optional. The fallback should return
`Optional.of(new TopicListingResult(topics, false))` to maintain type
consistency, and the next stage should unwrap the Optional.
```suggestion
// Wrap List<String> into TopicListingResult(topics,
filtered=false) inside an Optional for unified processing
return
getBrokerService().pulsar().getNamespaceService()
.getListOfUserTopics(namespaceName, mode)
.thenApply(topics -> Optional.of(new
TopicListingResult(topics, false)));
}
}).thenCompose(listingResultOpt -> {
TopicListingResult listingResult =
listingResultOpt.get();
```
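A minimal standalone sketch of the type-consistent flow, for illustration
only. It assumes a stand-in `defaultListing()` in place of
`getListOfUserTopics` and a hypothetical `resolve` helper; neither is part of
the PIP or of Pulsar. Both branches produce a
`CompletableFuture<Optional<TopicListingResult>>`, and the Optional is
unwrapped once afterwards.

```java
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;

public class OptionalFallbackSketch {

    // Same shape as the record proposed in the PIP; accessors are topics() and filtered().
    public record TopicListingResult(List<String> topics, boolean filtered) {
        public TopicListingResult {
            Objects.requireNonNull(topics, "topics");
        }
    }

    // Hypothetical stand-in for the broker's default namespace listing.
    static CompletableFuture<List<String>> defaultListing() {
        return CompletableFuture.completedFuture(
                List.of("persistent://t/ns/a", "persistent://t/ns/b"));
    }

    // Hypothetical helper: uses the interceptor result if present, else the default listing.
    public static CompletableFuture<TopicListingResult> resolve(
            CompletableFuture<Optional<TopicListingResult>> interceptorFuture) {
        CompletableFuture<Optional<TopicListingResult>> resolved =
                interceptorFuture.thenCompose(interceptorResult -> {
                    if (interceptorResult != null && interceptorResult.isPresent()) {
                        // Case A: the interceptor handled the request.
                        return CompletableFuture.completedFuture(interceptorResult);
                    }
                    // Case B: fall back, wrapping in Optional so both branches agree.
                    return defaultListing().thenApply(
                            topics -> Optional.of(new TopicListingResult(topics, false)));
                });
        // Every path above yields a present Optional, so get() is safe here.
        return resolved.thenApply(Optional::get);
    }

    public static void main(String[] args) {
        Optional<TopicListingResult> notHandled = Optional.empty();
        System.out.println(resolve(CompletableFuture.completedFuture(notHandled)).join());

        Optional<TopicListingResult> handled =
                Optional.of(new TopicListingResult(List.of("persistent://t/ns/x"), true));
        System.out.println(resolve(CompletableFuture.completedFuture(handled)).join());
    }
}
```

An alternative with the same effect would be to unwrap the interceptor result
in case A and keep the rest of the chain typed as `TopicListingResult`.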
##########
pip/pip-452.md:
##########
@@ -0,0 +1,241 @@
+# PIP-452: Customizable Topic Listing in GetTopicsOfNamespace with properties
+
+# Motivation
+Currently, the CommandGetTopicsOfNamespace logic in the Pulsar Broker is
hard-coded to scan the metadata store (ZooKeeper) for all children nodes under
a namespace.
+
+This implementation limits the flexibility required for complex multi-tenant
scenarios:
+
+No Client Context: The broker cannot distinguish who is asking for the topics
or why. It cannot filter topics based on client properties (This properties may
be related to topic properties).
+
+Inefficient Filtering: For namespaces with millions of topics, the broker must
fetch the full list into memory before applying the topics_pattern regex. There
is no way to "push down" the filtering to the data source (e.g., a database
with an index).
+
+To address these issues, I propose making the topic listing logic pluggable
and extending the protocol to accept client properties.
+
+# Goals
+Protocol: Add a properties field to `CommandGetTopicsOfNamespace` to carry
client-side context.
+
+Broker: Extend `BrokerInterceptor` to allow plugins to intercept the "Get
Topics" request.
+
+Client: Update the Java Client to forward Consumer properties to the lookup
service when using Regex subscriptions.
+
+# High Level Design
+We will modify the Pulsar Protocol to carry a properties map. On the Broker
side, we will add a new method to the BrokerInterceptor interface.
+The default implementation will preserve the existing behavior (fetching from
ZooKeeper). The Broker's connection handler will simply delegate the request to
this strategy.
+
+# Detailed Design
+
+## Public-facing Changes
+
+### Protocol Changes
+Update `PulsarApi.proto` to include the properties field.
+
+PulsarApi.proto
+```protobuf
+message CommandGetTopicsOfNamespace {
+ required uint64 request_id = 1;
+ required string namespace = 2;
+ optional Mode mode = 3 [default = PERSISTENT];
+
+ // Existing fields for filtering and hash optimization
+ optional string topics_pattern = 4;
+ optional string topics_hash = 5;
+
+ // New field: Context properties from the client
+ repeated KeyValue properties = 6;
+}
+```
+
+### REST API & CLI Changes
+
+Add `properties` parameter to the REST API endpoint for listing topics in a
namespace to list topic with specific properties for customizable topic listing.
+
+REST API:
+```
+GET /admin/v2/persistent/{tenant}/{namespace} with a query params
properties=k1=v1,k2=v2
+```
+
+CLI:
+```
+pulsar-admin topics list <tenant>/<namespace> -p k1=v1 -p k2=v2"
+```
+
+## Design & Implementation Details
+### Broker Changes
+
+Add `TopicListingResult` for listing topic results
+
+```java
+package org.apache.pulsar.broker.namespace;
+
+import java.util.List;
+import java.util.Objects;
+
+public record TopicListingResult (List<String> topics, boolean filtered) {
+
+ public TopicListingResult{
+ Objects.requireNonNull(topics, "topics");
+ }
+}
+```
+Update BrokerInterceptor Interface
+Add a default method in BrokerInterceptor. It returns null by default,
indicating no interception (to maintain backward compatibility).
+
+```Java
+
+package org.apache.pulsar.broker.intercept;
+
+import java.util.Optional;
+import org.apache.pulsar.common.naming.NamespaceName;
+import org.apache.pulsar.common.api.proto.CommandGetTopicsOfNamespace.Mode;
+import java.util.concurrent.CompletableFuture;
+
+public interface BrokerInterceptor extends AutoCloseable {
+
+ // ... existing methods ...
+
+ /**
+ * Intercept the GetTopicsOfNamespace request.
+ * <p>
+ * This method allows plugins to override the default topic discovery
logic (ZooKeeper scan).
+ * It enables fetching topics from external sources (e.g., databases,
other metadata stores)
+ * based on the provided client context properties.
+ *
+ * @param namespace The namespace being queried.
+ * @param mode The query mode (PERSISTENT, NON_PERSISTENT, or
ALL).
+ * @param topicsPattern Optional regex pattern provided by the client.
+ * @param properties Context properties provided by the client.
+ * @return A CompletableFuture containing the result:
+ * If the future completes with {@code Optional#empty}, proceed to the
next interceptor or broker's default logic to list all topic.
+ */
+ default CompletableFuture<Optional<TopicListingResult>>
interceptGetTopicsOfNamespace(
+ NamespaceName namespace,
+ CommandGetTopicsOfNamespace.Mode mode,
+ Optional<String> topicsPattern,
+ Map<String, String> properties) {
+ return CompletableFuture.completedFuture(Optional.empty());
+ }
+}
+```
+
+ServerCnx Logic Update
+In ServerCnx.java, the method internalHandleGetTopicsOfNamespace will be
updated to use this strategy.
+
+```java
+private void internalHandleGetTopicsOfNamespace(String namespace,
NamespaceName namespaceName, long requestId,
+
CommandGetTopicsOfNamespace.Mode mode,
+ Optional<String>
topicsPattern, Optional<String> topicsHash,
+ Map<String, String> properties,
+ Semaphore lookupSemaphore) {
+ BooleanSupplier isPermitRequestCancelled = () ->
!ctx().channel().isActive();
+ TopicListSizeResultCache.ResultHolder listSizeHolder =
+
service.getTopicListSizeResultCache().getTopicListSize(namespaceName.toString(),
mode);
+
+ listSizeHolder.getSizeAsync().thenAccept(initialSize -> {
+ maxTopicListInFlightLimiter.withAcquiredPermits(initialSize,
+ AsyncDualMemoryLimiter.LimitType.HEAP_MEMORY,
isPermitRequestCancelled, initialPermits -> {
+ // Call the Interceptor if exists
+ BrokerInterceptor brokerInterceptor =
getBrokerService().getPulsar().getBrokerInterceptor();
+ CompletableFuture<Optional<TopicListingResult>>
interceptorFuture =
+ brokerInterceptor == null ?
CompletableFuture.completedFuture(Optional.empty()) :
+
brokerInterceptor.interceptGetTopicsOfNamespace(namespaceName, mode,
topicsPattern, properties);
+
+ return interceptorFuture.thenCompose(interceptorResult -> {
+ // Decision branch: Use the interceptor result OR fall
back to the default logic
+ if (interceptorResult != null &&
interceptorResult.isPresent()) {
+ // case A: The interceptor has taken over the request
+ return
CompletableFuture.completedFuture(interceptorResult);
+ } else {
+ // case B: The interceptor did not handle, so the
original query logic is executed.
+ // Wrap List<String> into TopicListingResult(topics,
filtered=false) for unified processing
+ return
getBrokerService().pulsar().getNamespaceService()
+ .getListOfUserTopics(namespaceName, mode)
+ .thenApply(topics -> new
TopicListingResult(topics, false));
+ }
+ }).thenCompose(listingResult -> {
+ // Adjust memory permits based on ACTUAL size
+ long actualSize =
TopicListMemoryLimiter.estimateTopicListSize(listingResult.getTopics());
+ listSizeHolder.updateSize(actualSize);
+
+ return
maxTopicListInFlightLimiter.withUpdatedPermits(initialPermits, actualSize,
+ isPermitRequestCancelled, permits -> {
+
+ List<String> finalTopics =
listingResult.getTopics();
Review Comment:
Incorrect accessor name: The code calls `listingResult.getTopics()`, but
`TopicListingResult` is defined as a Java record with a component named `topics`
(line 73). A record's accessor is named after its component, so the call
should be `topics()`, not `getTopics()`.
```suggestion
long actualSize =
TopicListMemoryLimiter.estimateTopicListSize(listingResult.topics());
listSizeHolder.updateSize(actualSize);
return
maxTopicListInFlightLimiter.withUpdatedPermits(initialPermits, actualSize,
isPermitRequestCancelled, permits -> {
List<String> finalTopics =
listingResult.topics();
```
##########
pip/pip-452.md:
##########
@@ -0,0 +1,241 @@
+# PIP-452: Customizable Topic Listing in GetTopicsOfNamespace with properties
+
+# Motivation
+Currently, the CommandGetTopicsOfNamespace logic in the Pulsar Broker is
hard-coded to scan the metadata store (ZooKeeper) for all children nodes under
a namespace.
+
+This implementation limits the flexibility required for complex multi-tenant
scenarios:
+
+No Client Context: The broker cannot distinguish who is asking for the topics
or why. It cannot filter topics based on client properties (This properties may
be related to topic properties).
+
+Inefficient Filtering: For namespaces with millions of topics, the broker must
fetch the full list into memory before applying the topics_pattern regex. There
is no way to "push down" the filtering to the data source (e.g., a database
with an index).
+
+To address these issues, I propose making the topic listing logic pluggable
and extending the protocol to accept client properties.
+
+# Goals
+Protocol: Add a properties field to `CommandGetTopicsOfNamespace` to carry
client-side context.
+
+Broker: Extend `BrokerInterceptor` to allow plugins to intercept the "Get
Topics" request.
+
+Client: Update the Java Client to forward Consumer properties to the lookup
service when using Regex subscriptions.
+
+# High Level Design
+We will modify the Pulsar Protocol to carry a properties map. On the Broker
side, we will add a new method to the BrokerInterceptor interface.
+The default implementation will preserve the existing behavior (fetching from
ZooKeeper). The Broker's connection handler will simply delegate the request to
this strategy.
+
+# Detailed Design
+
+## Public-facing Changes
+
+### Protocol Changes
+Update `PulsarApi.proto` to include the properties field.
+
+PulsarApi.proto
+```protobuf
+message CommandGetTopicsOfNamespace {
+ required uint64 request_id = 1;
+ required string namespace = 2;
+ optional Mode mode = 3 [default = PERSISTENT];
+
+ // Existing fields for filtering and hash optimization
+ optional string topics_pattern = 4;
+ optional string topics_hash = 5;
+
+ // New field: Context properties from the client
+ repeated KeyValue properties = 6;
+}
+```
+
+### REST API & CLI Changes
+
+Add `properties` parameter to the REST API endpoint for listing topics in a
namespace to list topic with specific properties for customizable topic listing.
+
+REST API:
+```
+GET /admin/v2/persistent/{tenant}/{namespace} with a query params
properties=k1=v1,k2=v2
+```
+
+CLI:
+```
+pulsar-admin topics list <tenant>/<namespace> -p k1=v1 -p k2=v2"
+```
+
+## Design & Implementation Details
+### Broker Changes
+
+Add `TopicListingResult` for listing topic results
+
+```java
+package org.apache.pulsar.broker.namespace;
+
+import java.util.List;
+import java.util.Objects;
+
+public record TopicListingResult (List<String> topics, boolean filtered) {
+
+ public TopicListingResult{
+ Objects.requireNonNull(topics, "topics");
+ }
+}
+```
+Update BrokerInterceptor Interface
+Add a default method in BrokerInterceptor. It returns null by default,
indicating no interception (to maintain backward compatibility).
+
+```Java
+
+package org.apache.pulsar.broker.intercept;
+
+import java.util.Optional;
+import org.apache.pulsar.common.naming.NamespaceName;
+import org.apache.pulsar.common.api.proto.CommandGetTopicsOfNamespace.Mode;
+import java.util.concurrent.CompletableFuture;
+
+public interface BrokerInterceptor extends AutoCloseable {
+
+ // ... existing methods ...
+
+ /**
+ * Intercept the GetTopicsOfNamespace request.
+ * <p>
+ * This method allows plugins to override the default topic discovery
logic (ZooKeeper scan).
+ * It enables fetching topics from external sources (e.g., databases,
other metadata stores)
+ * based on the provided client context properties.
+ *
+ * @param namespace The namespace being queried.
+ * @param mode The query mode (PERSISTENT, NON_PERSISTENT, or
ALL).
+ * @param topicsPattern Optional regex pattern provided by the client.
+ * @param properties Context properties provided by the client.
+ * @return A CompletableFuture containing the result:
+ * If the future completes with {@code Optional#empty}, proceed to the
next interceptor or broker's default logic to list all topic.
+ */
+ default CompletableFuture<Optional<TopicListingResult>>
interceptGetTopicsOfNamespace(
+ NamespaceName namespace,
+ CommandGetTopicsOfNamespace.Mode mode,
+ Optional<String> topicsPattern,
+ Map<String, String> properties) {
+ return CompletableFuture.completedFuture(Optional.empty());
+ }
+}
+```
+
+ServerCnx Logic Update
+In ServerCnx.java, the method internalHandleGetTopicsOfNamespace will be
updated to use this strategy.
+
+```java
+private void internalHandleGetTopicsOfNamespace(String namespace,
NamespaceName namespaceName, long requestId,
+
CommandGetTopicsOfNamespace.Mode mode,
+ Optional<String>
topicsPattern, Optional<String> topicsHash,
+ Map<String, String> properties,
+ Semaphore lookupSemaphore) {
+ BooleanSupplier isPermitRequestCancelled = () ->
!ctx().channel().isActive();
+ TopicListSizeResultCache.ResultHolder listSizeHolder =
+
service.getTopicListSizeResultCache().getTopicListSize(namespaceName.toString(),
mode);
+
+ listSizeHolder.getSizeAsync().thenAccept(initialSize -> {
+ maxTopicListInFlightLimiter.withAcquiredPermits(initialSize,
+ AsyncDualMemoryLimiter.LimitType.HEAP_MEMORY,
isPermitRequestCancelled, initialPermits -> {
+ // Call the Interceptor if exists
+ BrokerInterceptor brokerInterceptor =
getBrokerService().getPulsar().getBrokerInterceptor();
+ CompletableFuture<Optional<TopicListingResult>>
interceptorFuture =
+ brokerInterceptor == null ?
CompletableFuture.completedFuture(Optional.empty()) :
+
brokerInterceptor.interceptGetTopicsOfNamespace(namespaceName, mode,
topicsPattern, properties);
+
+ return interceptorFuture.thenCompose(interceptorResult -> {
+ // Decision branch: Use the interceptor result OR fall
back to the default logic
+ if (interceptorResult != null &&
interceptorResult.isPresent()) {
+ // case A: The interceptor has taken over the request
+ return
CompletableFuture.completedFuture(interceptorResult);
+ } else {
+ // case B: The interceptor did not handle, so the
original query logic is executed.
+ // Wrap List<String> into TopicListingResult(topics,
filtered=false) for unified processing
+ return
getBrokerService().pulsar().getNamespaceService()
+ .getListOfUserTopics(namespaceName, mode)
+ .thenApply(topics -> new
TopicListingResult(topics, false));
+ }
+ }).thenCompose(listingResult -> {
Review Comment:
Type inconsistency in the chain: Line 142 shows
`interceptorFuture.thenCompose(interceptorResult -> {...})`, where
`interceptorResult` is of type `Optional<TopicListingResult>`. The two
branches return different types: line 146 returns
`CompletableFuture.completedFuture(interceptorResult)` (a
`CompletableFuture<Optional<TopicListingResult>>`), while line 152
returns a `CompletableFuture<TopicListingResult>` without Optional wrapping.
Line 154 then expects `listingResult` to be a `TopicListingResult`. The chain
does not type-check as written and needs to be fixed.
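One way to make both branches agree is to unwrap the Optional in the interceptor branch, so that every path yields a `CompletableFuture<TopicListingResult>`. The following is only a minimal, self-contained sketch of that idea. The class `ListingChainSketch` and the helper `defaultListing()` are hypothetical stand-ins for the broker code and are not part of the PIP.

```java
import java.util.List;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;

class ListingChainSketch {

    // Same shape as the record proposed in the PIP.
    record TopicListingResult(List<String> topics, boolean filtered) { }

    // Hypothetical stand-in for the broker's default topic lookup.
    static CompletableFuture<List<String>> defaultListing() {
        return CompletableFuture.completedFuture(List.of("persistent://tenant/ns/topic-a"));
    }

    // Both branches return CompletableFuture<TopicListingResult>, so the chain type-checks.
    static CompletableFuture<TopicListingResult> resolve(
            CompletableFuture<Optional<TopicListingResult>> interceptorFuture) {
        return interceptorFuture.thenCompose(maybeResult -> {
            if (maybeResult != null && maybeResult.isPresent()) {
                // Case A: the interceptor handled the request; unwrap the Optional.
                return CompletableFuture.completedFuture(maybeResult.get());
            }
            // Case B: fall back to the default listing and mark it as unfiltered.
            return defaultListing().thenApply(topics -> new TopicListingResult(topics, false));
        });
    }

    public static void main(String[] args) {
        TopicListingResult fallback =
                resolve(CompletableFuture.completedFuture(Optional.empty())).join();
        System.out.println(fallback.topics() + " filtered=" + fallback.filtered());
    }
}
```

With this shape, a following `thenCompose(listingResult -> ...)` stage receives a plain `TopicListingResult` regardless of which branch ran.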
##########
pip/pip-452.md:
##########
@@ -0,0 +1,241 @@
+# PIP-452: Customizable Topic Listing in GetTopicsOfNamespace with properties
+
+# Motivation
+Currently, the CommandGetTopicsOfNamespace logic in the Pulsar Broker is
hard-coded to scan the metadata store (ZooKeeper) for all children nodes under
a namespace.
+
+This implementation limits the flexibility required for complex multi-tenant
scenarios:
+
+No Client Context: The broker cannot distinguish who is asking for the topics
or why. It cannot filter topics based on client properties (This properties may
be related to topic properties).
+
+Inefficient Filtering: For namespaces with millions of topics, the broker must
fetch the full list into memory before applying the topics_pattern regex. There
is no way to "push down" the filtering to the data source (e.g., a database
with an index).
+
+To address these issues, I propose making the topic listing logic pluggable
and extending the protocol to accept client properties.
+
+# Goals
+Protocol: Add a properties field to `CommandGetTopicsOfNamespace` to carry
client-side context.
+
+Broker: Extend `BrokerInterceptor` to allow plugins to intercept the "Get
Topics" request.
+
+Client: Update the Java Client to forward Consumer properties to the lookup
service when using Regex subscriptions.
+
+# High Level Design
+We will modify the Pulsar Protocol to carry a properties map. On the Broker
side, we will add a new method to the BrokerInterceptor interface.
+The default implementation will preserve the existing behavior (fetching from
ZooKeeper). The Broker's connection handler will simply delegate the request to
this strategy.
+
+# Detailed Design
+
+## Public-facing Changes
+
+### Protocol Changes
+Update `PulsarApi.proto` to include the properties field.
+
+PulsarApi.proto
+```protobuf
+message CommandGetTopicsOfNamespace {
+ required uint64 request_id = 1;
+ required string namespace = 2;
+ optional Mode mode = 3 [default = PERSISTENT];
+
+ // Existing fields for filtering and hash optimization
+ optional string topics_pattern = 4;
+ optional string topics_hash = 5;
+
+ // New field: Context properties from the client
+ repeated KeyValue properties = 6;
+}
+```
+
+### REST API & CLI Changes
+
+Add `properties` parameter to the REST API endpoint for listing topics in a
namespace to list topic with specific properties for customizable topic listing.
+
+REST API:
+```
+GET /admin/v2/persistent/{tenant}/{namespace} with a query params
properties=k1=v1,k2=v2
+```
+
+CLI:
+```
+pulsar-admin topics list <tenant>/<namespace> -p k1=v1 -p k2=v2"
+```
+
+## Design & Implementation Details
+### Broker Changes
+
+Add `TopicListingResult` for listing topic results
+
+```java
+package org.apache.pulsar.broker.namespace;
+
+import java.util.List;
+import java.util.Objects;
+
+public record TopicListingResult (List<String> topics, boolean filtered) {
+
+ public TopicListingResult{
+ Objects.requireNonNull(topics, "topics");
+ }
+}
+```
+Update BrokerInterceptor Interface
+Add a default method in BrokerInterceptor. It returns null by default,
indicating no interception (to maintain backward compatibility).
+
+```Java
+
+package org.apache.pulsar.broker.intercept;
+
+import java.util.Optional;
+import org.apache.pulsar.common.naming.NamespaceName;
+import org.apache.pulsar.common.api.proto.CommandGetTopicsOfNamespace.Mode;
+import java.util.concurrent.CompletableFuture;
+
+public interface BrokerInterceptor extends AutoCloseable {
+
+ // ... existing methods ...
+
+ /**
+ * Intercept the GetTopicsOfNamespace request.
+ * <p>
+ * This method allows plugins to override the default topic discovery
logic (ZooKeeper scan).
+ * It enables fetching topics from external sources (e.g., databases,
other metadata stores)
+ * based on the provided client context properties.
+ *
+ * @param namespace The namespace being queried.
+ * @param mode The query mode (PERSISTENT, NON_PERSISTENT, or
ALL).
+ * @param topicsPattern Optional regex pattern provided by the client.
+ * @param properties Context properties provided by the client.
+ * @return A CompletableFuture containing the result:
+ * If the future completes with {@code Optional#empty}, proceed to the
next interceptor or broker's default logic to list all topic.
+ */
+ default CompletableFuture<Optional<TopicListingResult>>
interceptGetTopicsOfNamespace(
+ NamespaceName namespace,
+ CommandGetTopicsOfNamespace.Mode mode,
+ Optional<String> topicsPattern,
+ Map<String, String> properties) {
+ return CompletableFuture.completedFuture(Optional.empty());
+ }
+}
Review Comment:
Missing import statement: The code snippet references the `Map` interface but
does not include the required import. Add `import java.util.Map;` to
the imports section.
##########
pip/pip-452.md:
##########
@@ -0,0 +1,241 @@
+# PIP-452: Customizable Topic Listing in GetTopicsOfNamespace with properties
+
+# Motivation
+Currently, the CommandGetTopicsOfNamespace logic in the Pulsar Broker is
hard-coded to scan the metadata store (ZooKeeper) for all children nodes under
a namespace.
+
+This implementation limits the flexibility required for complex multi-tenant
scenarios:
+
+No Client Context: The broker cannot distinguish who is asking for the topics
or why. It cannot filter topics based on client properties (This properties may
be related to topic properties).
+
+Inefficient Filtering: For namespaces with millions of topics, the broker must
fetch the full list into memory before applying the topics_pattern regex. There
is no way to "push down" the filtering to the data source (e.g., a database
with an index).
+
+To address these issues, I propose making the topic listing logic pluggable
and extending the protocol to accept client properties.
+
+# Goals
+Protocol: Add a properties field to `CommandGetTopicsOfNamespace` to carry
client-side context.
+
+Broker: Extend `BrokerInterceptor` to allow plugins to intercept the "Get
Topics" request.
+
+Client: Update the Java Client to forward Consumer properties to the lookup
service when using Regex subscriptions.
+
+# High Level Design
+We will modify the Pulsar Protocol to carry a properties map. On the Broker
side, we will add a new method to the BrokerInterceptor interface.
+The default implementation will preserve the existing behavior (fetching from
ZooKeeper). The Broker's connection handler will simply delegate the request to
this strategy.
+
+# Detailed Design
+
+## Public-facing Changes
+
+### Protocol Changes
+Update `PulsarApi.proto` to include the properties field.
+
+PulsarApi.proto
+```protobuf
+message CommandGetTopicsOfNamespace {
+ required uint64 request_id = 1;
+ required string namespace = 2;
+ optional Mode mode = 3 [default = PERSISTENT];
+
+ // Existing fields for filtering and hash optimization
+ optional string topics_pattern = 4;
+ optional string topics_hash = 5;
+
+ // New field: Context properties from the client
+ repeated KeyValue properties = 6;
+}
+```
+
+### REST API & CLI Changes
+
+Add `properties` parameter to the REST API endpoint for listing topics in a
namespace to list topic with specific properties for customizable topic listing.
+
+REST API:
+```
+GET /admin/v2/persistent/{tenant}/{namespace} with a query params
properties=k1=v1,k2=v2
Review Comment:
Unclear API design: The REST API format shows `properties=k1=v1,k2=v2` at
line 54, but it's not clear how this query parameter should be parsed. Is it a
single parameter with comma-separated key-value pairs, or multiple parameters?
The CLI example at line 59 shows `-p k1=v1 -p k2=v2` which suggests multiple
parameters. The REST API documentation should clarify the expected format and
whether URL encoding is required for values.
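To make the ambiguity concrete, here is a minimal sketch of one possible interpretation: a single `properties` parameter holding comma-separated `key=value` pairs, with keys and values percent-encoded individually. This is an assumption for illustration only. The class `PropertiesParamSketch` and its `parse` method are hypothetical, and the sketch assumes the raw, not yet decoded, parameter value is passed in.

```java
import java.net.URLDecoder;
import java.nio.charset.StandardCharsets;
import java.util.LinkedHashMap;
import java.util.Map;

class PropertiesParamSketch {

    // Parses "k1=v1,k2=v2" into an ordered map, decoding each key and value separately
    // so that an encoded comma (%2C) or equals sign (%3D) can appear inside a value.
    static Map<String, String> parse(String raw) {
        Map<String, String> props = new LinkedHashMap<>();
        if (raw == null || raw.isBlank()) {
            return props;
        }
        for (String pair : raw.split(",")) {
            int eq = pair.indexOf('=');
            if (eq <= 0) {
                throw new IllegalArgumentException("Expected key=value but got: " + pair);
            }
            String key = URLDecoder.decode(pair.substring(0, eq), StandardCharsets.UTF_8);
            String value = URLDecoder.decode(pair.substring(eq + 1), StandardCharsets.UTF_8);
            props.put(key, value);
        }
        return props;
    }

    public static void main(String[] args) {
        System.out.println(parse("k1=v1,k2=a%2Cb"));
    }
}
```

The alternative reading, repeated parameters such as `properties=k1=v1&properties=k2=v2`, would mirror the CLI's repeated `-p` flag more directly and avoid the need to escape commas.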
##########
pip/pip-452.md:
##########
@@ -0,0 +1,241 @@
+# PIP-452: Customizable Topic Listing in GetTopicsOfNamespace with properties
+
+# Motivation
+Currently, the CommandGetTopicsOfNamespace logic in the Pulsar Broker is
hard-coded to scan the metadata store (ZooKeeper) for all children nodes under
a namespace.
+
+This implementation limits the flexibility required for complex multi-tenant
scenarios:
+
+No Client Context: The broker cannot distinguish who is asking for the topics
or why. It cannot filter topics based on client properties (This properties may
be related to topic properties).
+
+Inefficient Filtering: For namespaces with millions of topics, the broker must
fetch the full list into memory before applying the topics_pattern regex. There
is no way to "push down" the filtering to the data source (e.g., a database
with an index).
+
+To address these issues, I propose making the topic listing logic pluggable
and extending the protocol to accept client properties.
+
+# Goals
+Protocol: Add a properties field to `CommandGetTopicsOfNamespace` to carry
client-side context.
+
+Broker: Extend `BrokerInterceptor` to allow plugins to intercept the "Get
Topics" request.
+
+Client: Update the Java Client to forward Consumer properties to the lookup
service when using Regex subscriptions.
+
+# High Level Design
+We will modify the Pulsar Protocol to carry a properties map. On the Broker
side, we will add a new method to the BrokerInterceptor interface.
+The default implementation will preserve the existing behavior (fetching from
ZooKeeper). The Broker's connection handler will simply delegate the request to
this strategy.
+
+# Detailed Design
+
+## Public-facing Changes
+
+### Protocol Changes
+Update `PulsarApi.proto` to include the properties field.
+
+PulsarApi.proto
+```protobuf
+message CommandGetTopicsOfNamespace {
+ required uint64 request_id = 1;
+ required string namespace = 2;
+ optional Mode mode = 3 [default = PERSISTENT];
+
+ // Existing fields for filtering and hash optimization
+ optional string topics_pattern = 4;
+ optional string topics_hash = 5;
+
+ // New field: Context properties from the client
+ repeated KeyValue properties = 6;
+}
+```
+
+### REST API & CLI Changes
+
+Add `properties` parameter to the REST API endpoint for listing topics in a
namespace to list topic with specific properties for customizable topic listing.
+
+REST API:
+```
+GET /admin/v2/persistent/{tenant}/{namespace} with a query params
properties=k1=v1,k2=v2
+```
+
+CLI:
+```
+pulsar-admin topics list <tenant>/<namespace> -p k1=v1 -p k2=v2"
+```
+
+## Design & Implementation Details
+### Broker Changes
+
+Add `TopicListingResult` for listing topic results
+
+```java
+package org.apache.pulsar.broker.namespace;
+
+import java.util.List;
+import java.util.Objects;
+
+public record TopicListingResult (List<String> topics, boolean filtered) {
+
+ public TopicListingResult{
+ Objects.requireNonNull(topics, "topics");
+ }
+}
+```
+Update BrokerInterceptor Interface
+Add a default method in BrokerInterceptor. It returns null by default,
indicating no interception (to maintain backward compatibility).
+
+```Java
+
+package org.apache.pulsar.broker.intercept;
+
+import java.util.Optional;
+import org.apache.pulsar.common.naming.NamespaceName;
+import org.apache.pulsar.common.api.proto.CommandGetTopicsOfNamespace.Mode;
+import java.util.concurrent.CompletableFuture;
+
+public interface BrokerInterceptor extends AutoCloseable {
+
+ // ... existing methods ...
+
+ /**
+ * Intercept the GetTopicsOfNamespace request.
+ * <p>
+ * This method allows plugins to override the default topic discovery
logic (ZooKeeper scan).
+ * It enables fetching topics from external sources (e.g., databases,
other metadata stores)
+ * based on the provided client context properties.
+ *
+ * @param namespace The namespace being queried.
+ * @param mode The query mode (PERSISTENT, NON_PERSISTENT, or
ALL).
+ * @param topicsPattern Optional regex pattern provided by the client.
+ * @param properties Context properties provided by the client.
+ * @return A CompletableFuture containing the result:
+ * If the future completes with {@code Optional#empty}, proceed to the
next interceptor or broker's default logic to list all topic.
+ */
+ default CompletableFuture<Optional<TopicListingResult>>
interceptGetTopicsOfNamespace(
+ NamespaceName namespace,
+ CommandGetTopicsOfNamespace.Mode mode,
+ Optional<String> topicsPattern,
+ Map<String, String> properties) {
+ return CompletableFuture.completedFuture(Optional.empty());
+ }
+}
+```
+
+ServerCnx Logic Update
+In ServerCnx.java, the method internalHandleGetTopicsOfNamespace will be
updated to use this strategy.
+
+```java
+private void internalHandleGetTopicsOfNamespace(String namespace,
NamespaceName namespaceName, long requestId,
+
CommandGetTopicsOfNamespace.Mode mode,
+ Optional<String>
topicsPattern, Optional<String> topicsHash,
+ Map<String, String> properties,
+ Semaphore lookupSemaphore) {
+ BooleanSupplier isPermitRequestCancelled = () ->
!ctx().channel().isActive();
+ TopicListSizeResultCache.ResultHolder listSizeHolder =
+
service.getTopicListSizeResultCache().getTopicListSize(namespaceName.toString(),
mode);
+
+ listSizeHolder.getSizeAsync().thenAccept(initialSize -> {
+ maxTopicListInFlightLimiter.withAcquiredPermits(initialSize,
+ AsyncDualMemoryLimiter.LimitType.HEAP_MEMORY,
isPermitRequestCancelled, initialPermits -> {
+ // Call the Interceptor if exists
+ BrokerInterceptor brokerInterceptor =
getBrokerService().getPulsar().getBrokerInterceptor();
+ CompletableFuture<Optional<TopicListingResult>>
interceptorFuture =
+ brokerInterceptor == null ?
CompletableFuture.completedFuture(Optional.empty()) :
+
brokerInterceptor.interceptGetTopicsOfNamespace(namespaceName, mode,
topicsPattern, properties);
+
+ return interceptorFuture.thenCompose(interceptorResult -> {
+ // Decision branch: Use the interceptor result OR fall
back to the default logic
+ if (interceptorResult != null &&
interceptorResult.isPresent()) {
+ // case A: The interceptor has taken over the request
+ return
CompletableFuture.completedFuture(interceptorResult);
+ } else {
+ // case B: The interceptor did not handle, so the
original query logic is executed.
+ // Wrap List<String> into TopicListingResult(topics,
filtered=false) for unified processing
+ return
getBrokerService().pulsar().getNamespaceService()
+ .getListOfUserTopics(namespaceName, mode)
+ .thenApply(topics -> new
TopicListingResult(topics, false));
+ }
+ }).thenCompose(listingResult -> {
+ // Adjust memory permits based on ACTUAL size
+ long actualSize =
TopicListMemoryLimiter.estimateTopicListSize(listingResult.getTopics());
+ listSizeHolder.updateSize(actualSize);
+
+ return
maxTopicListInFlightLimiter.withUpdatedPermits(initialPermits, actualSize,
+ isPermitRequestCancelled, permits -> {
+
+ List<String> finalTopics =
listingResult.getTopics();
Review Comment:
Incorrect accessor name: The code at line 156 calls
`listingResult.getTopics()`, but `TopicListingResult` is defined as a Java
record with a component named `topics` (line 73). A record's accessor is named
after its component, so the call should be `topics()`, not `getTopics()`. The
same issue occurs at line 162.
```suggestion
long actualSize =
TopicListMemoryLimiter.estimateTopicListSize(listingResult.topics());
listSizeHolder.updateSize(actualSize);
return
maxTopicListInFlightLimiter.withUpdatedPermits(initialPermits, actualSize,
isPermitRequestCancelled, permits -> {
List<String> finalTopics =
listingResult.topics();
```
##########
pip/pip-452.md:
##########
@@ -0,0 +1,241 @@
+# PIP-452: Customizable Topic Listing in GetTopicsOfNamespace with properties
+
+# Motivation
+Currently, the CommandGetTopicsOfNamespace logic in the Pulsar Broker is
hard-coded to scan the metadata store (ZooKeeper) for all children nodes under
a namespace.
+
+This implementation limits the flexibility required for complex multi-tenant
scenarios:
+
+No Client Context: The broker cannot distinguish who is asking for the topics
or why. It cannot filter topics based on client properties (This properties may
be related to topic properties).
+
+Inefficient Filtering: For namespaces with millions of topics, the broker must
fetch the full list into memory before applying the topics_pattern regex. There
is no way to "push down" the filtering to the data source (e.g., a database
with an index).
+
+To address these issues, I propose making the topic listing logic pluggable
and extending the protocol to accept client properties.
+
+# Goals
+Protocol: Add a properties field to `CommandGetTopicsOfNamespace` to carry
client-side context.
+
+Broker: Extend `BrokerInterceptor` to allow plugins to intercept the "Get
Topics" request.
+
+Client: Update the Java Client to forward Consumer properties to the lookup
service when using Regex subscriptions.
+
+# High Level Design
+We will modify the Pulsar Protocol to carry a properties map. On the Broker
side, we will add a new method to the BrokerInterceptor interface.
+The default implementation will preserve the existing behavior (fetching from
ZooKeeper). The Broker's connection handler will simply delegate the request to
this strategy.
+
+# Detailed Design
+
+## Public-facing Changes
+
+### Protocol Changes
+Update `PulsarApi.proto` to include the properties field.
+
+PulsarApi.proto
+```protobuf
+message CommandGetTopicsOfNamespace {
+ required uint64 request_id = 1;
+ required string namespace = 2;
+ optional Mode mode = 3 [default = PERSISTENT];
+
+ // Existing fields for filtering and hash optimization
+ optional string topics_pattern = 4;
+ optional string topics_hash = 5;
+
+ // New field: Context properties from the client
+ repeated KeyValue properties = 6;
+}
+```
+
+### REST API & CLI Changes
+
+Add `properties` parameter to the REST API endpoint for listing topics in a
namespace to list topic with specific properties for customizable topic listing.
+
+REST API:
+```
+GET /admin/v2/persistent/{tenant}/{namespace} with a query params
properties=k1=v1,k2=v2
+```
+
+CLI:
+```
+pulsar-admin topics list <tenant>/<namespace> -p k1=v1 -p k2=v2"
+```
+
+## Design & Implementation Details
+### Broker Changes
+
+Add `TopicListingResult` for listing topic results
+
+```java
+package org.apache.pulsar.broker.namespace;
+
+import java.util.List;
+import java.util.Objects;
+
+public record TopicListingResult (List<String> topics, boolean filtered) {
+
+ public TopicListingResult{
+ Objects.requireNonNull(topics, "topics");
+ }
+}
+```
+Update BrokerInterceptor Interface
+Add a default method in BrokerInterceptor. It returns null by default,
indicating no interception (to maintain backward compatibility).
+
+```Java
+
+package org.apache.pulsar.broker.intercept;
+
+import java.util.Optional;
+import org.apache.pulsar.common.naming.NamespaceName;
+import org.apache.pulsar.common.api.proto.CommandGetTopicsOfNamespace.Mode;
+import java.util.concurrent.CompletableFuture;
+
+public interface BrokerInterceptor extends AutoCloseable {
+
+ // ... existing methods ...
+
+ /**
+ * Intercept the GetTopicsOfNamespace request.
+ * <p>
+ * This method allows plugins to override the default topic discovery
logic (ZooKeeper scan).
+ * It enables fetching topics from external sources (e.g., databases,
other metadata stores)
+ * based on the provided client context properties.
+ *
+ * @param namespace The namespace being queried.
+ * @param mode The query mode (PERSISTENT, NON_PERSISTENT, or
ALL).
+ * @param topicsPattern Optional regex pattern provided by the client.
+ * @param properties Context properties provided by the client.
+ * @return A CompletableFuture containing the result:
+ * If the future completes with {@code Optional#empty}, proceed to the
next interceptor or broker's default logic to list all topic.
+ */
+ default CompletableFuture<Optional<TopicListingResult>>
interceptGetTopicsOfNamespace(
+ NamespaceName namespace,
+ CommandGetTopicsOfNamespace.Mode mode,
+ Optional<String> topicsPattern,
+ Map<String, String> properties) {
+ return CompletableFuture.completedFuture(Optional.empty());
+ }
+}
+```
+
+ServerCnx Logic Update
+In ServerCnx.java, the method internalHandleGetTopicsOfNamespace will be
updated to use this strategy.
+
+```java
+private void internalHandleGetTopicsOfNamespace(String namespace,
NamespaceName namespaceName, long requestId,
+
CommandGetTopicsOfNamespace.Mode mode,
+ Optional<String>
topicsPattern, Optional<String> topicsHash,
+ Map<String, String> properties,
+ Semaphore lookupSemaphore) {
+ BooleanSupplier isPermitRequestCancelled = () ->
!ctx().channel().isActive();
+ TopicListSizeResultCache.ResultHolder listSizeHolder =
+
service.getTopicListSizeResultCache().getTopicListSize(namespaceName.toString(),
mode);
+
+ listSizeHolder.getSizeAsync().thenAccept(initialSize -> {
+ maxTopicListInFlightLimiter.withAcquiredPermits(initialSize,
+ AsyncDualMemoryLimiter.LimitType.HEAP_MEMORY,
isPermitRequestCancelled, initialPermits -> {
+ // Call the Interceptor if exists
+ BrokerInterceptor brokerInterceptor =
getBrokerService().getPulsar().getBrokerInterceptor();
+ CompletableFuture<Optional<TopicListingResult>>
interceptorFuture =
+ brokerInterceptor == null ?
CompletableFuture.completedFuture(Optional.empty()) :
+
brokerInterceptor.interceptGetTopicsOfNamespace(namespaceName, mode,
topicsPattern, properties);
+
+ return interceptorFuture.thenCompose(interceptorResult -> {
+ // Decision branch: Use the interceptor result OR fall
back to the default logic
+ if (interceptorResult != null &&
interceptorResult.isPresent()) {
+ // case A: The interceptor has taken over the request
+ return
CompletableFuture.completedFuture(interceptorResult);
+ } else {
+ // case B: The interceptor did not handle, so the
original query logic is executed.
+ // Wrap List<String> into TopicListingResult(topics,
filtered=false) for unified processing
+ return
getBrokerService().pulsar().getNamespaceService()
+ .getListOfUserTopics(namespaceName, mode)
+ .thenApply(topics -> new
TopicListingResult(topics, false));
+ }
+ }).thenCompose(listingResult -> {
+ // Adjust memory permits based on ACTUAL size
+ long actualSize =
TopicListMemoryLimiter.estimateTopicListSize(listingResult.getTopics());
+ listSizeHolder.updateSize(actualSize);
+
+ return
maxTopicListInFlightLimiter.withUpdatedPermits(initialPermits, actualSize,
+ isPermitRequestCancelled, permits -> {
+
+ List<String> finalTopics =
listingResult.getTopics();
+ boolean hasAppliedFilter =
listingResult.isFiltered();
Review Comment:
Incorrect accessor name: The code calls `listingResult.isFiltered()`, but
`TopicListingResult` is defined as a Java record with a component named `filtered`
(line 73). For Java records, the accessor method is `filtered()`, not
`isFiltered()`. Record accessors for boolean components are simply
the component name, with no 'is' or 'get' prefix.
```suggestion
boolean hasAppliedFilter =
listingResult.filtered();
```
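A small self-contained check of the accessor naming may help here. The record below has the same shape as the one in the PIP; the wrapper class `RecordAccessorDemo` is hypothetical and exists only so the snippet runs on its own.

```java
import java.util.List;
import java.util.Objects;

class RecordAccessorDemo {

    // Same shape as the record proposed in the PIP, including the null check.
    record TopicListingResult(List<String> topics, boolean filtered) {
        TopicListingResult {
            Objects.requireNonNull(topics, "topics");
        }
    }

    public static void main(String[] args) {
        TopicListingResult result =
                new TopicListingResult(List.of("persistent://tenant/ns/topic-a"), true);
        // The compiler generates topics() and filtered(); there is no getTopics() or isFiltered().
        List<String> topics = result.topics();
        boolean filtered = result.filtered();
        System.out.println(topics.size() + " topic(s), filtered=" + filtered);
    }
}
```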
##########
pip/pip-452.md:
##########
@@ -0,0 +1,241 @@
+# PIP-452: Customizable Topic Listing in GetTopicsOfNamespace with properties
+
+# Motivation
+Currently, the CommandGetTopicsOfNamespace logic in the Pulsar Broker is
hard-coded to scan the metadata store (ZooKeeper) for all children nodes under
a namespace.
+
+This implementation limits the flexibility required for complex multi-tenant
scenarios:
+
+No Client Context: The broker cannot distinguish who is asking for the topics
or why. It cannot filter topics based on client properties (This properties may
be related to topic properties).
Review Comment:
Grammar issue: The parenthetical remark should be clearer. Consider revising
to "These properties may be related to topic properties" or better yet, clarify
what type of properties are being referenced and their relationship to topic
properties.
```suggestion
No Client Context: The broker cannot distinguish who is asking for the
topics or why. It cannot filter topics based on client properties (these client
properties may correspond to, or be derived from, topic properties).
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]