codelipenghui commented on code in PR #23248:
URL: https://github.com/apache/pulsar/pull/23248#discussion_r1742801847


##########
pip/pip-376.md:
##########
@@ -0,0 +1,138 @@
+# PIP-376: Make topic policies service pluggable
+
+# Background knowledge
+
+## Topic policies service and system topics
+
+[PIP-39](https://github.com/apache/pulsar/wiki/PIP-39%3A-Namespace-Change-Events)
 introduces system topics and topic-level policies. However, the topic policies
service (`TopicPoliciesService`) has only one implementation
(`SystemTopicBasedTopicPoliciesService`), which depends on system topics. So both
of the following configs are required (though both are enabled by default now):
+
+```properties
+systemTopicEnabled=true
+topicLevelPoliciesEnabled=true
+```
+
+However, if the Pulsar storage is switched to an S3-based solution (by
modifying the `managedLedgerStorageClassName` config), using system topics to
manage topic policies could result in poor performance (due to S3 read and write
latency) and higher cost (due to redundant S3 API calls).
+
+## Badly designed TopicPoliciesService interface
+
+The `TopicPoliciesService` interface is a terrible abstraction because it was
never designed for third-party implementations.
+
+1. Methods that should not be exposed
+
+`addOwnedNamespaceBundleAsync` and `removeOwnedNamespaceBundleAsync` are only 
used internally in `SystemTopicBasedTopicPoliciesService`.
+
+`getTopicPoliciesBypassCacheAsync` is only used in tests. This method just
creates a reader to replay the `__change_events` topic and constructs the topic
policies map.
+
+2. Confusing and inconsistent `getTopicPolicies` family
+
+There are two overloads of `getTopicPolicies`:
+
+```java
+TopicPolicies getTopicPolicies(TopicName topicName, boolean isGlobal) throws TopicPoliciesCacheNotInitException;
+TopicPolicies getTopicPolicies(TopicName topicName) throws TopicPoliciesCacheNotInitException;

Review Comment:
   It looks like this one is only used by tests, so we can remove it from the
interface.
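
To make this point concrete, here is a minimal sketch assuming, as the PIP text states, that `getTopicPolicies(topicName)` behaves exactly like `getTopicPolicies(topicName, false)`. `SyncPoliciesLookup` and `MapBackedLookup` are hypothetical names, `String` stands in for `TopicName` and `TopicPolicies`, and the checked `TopicPoliciesCacheNotInitException` is omitted for brevity. If the one-argument overload is only this delegation, removing it from the interface loses no behavior.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical, simplified shape of the two synchronous getters
// (String stands in for TopicName and TopicPolicies).
interface SyncPoliciesLookup {
    String getTopicPolicies(String topic, boolean isGlobal);

    // Pure delegation: the one-argument overload only fixes isGlobal = false,
    // so callers could pass `false` themselves.
    default String getTopicPolicies(String topic) {
        return getTopicPolicies(topic, false);
    }
}

// Toy implementation that keeps local and global policies in separate maps.
class MapBackedLookup implements SyncPoliciesLookup {
    final Map<String, String> local = new HashMap<>();
    final Map<String, String> global = new HashMap<>();

    @Override
    public String getTopicPolicies(String topic, boolean isGlobal) {
        return (isGlobal ? global : local).get(topic);
    }
}
```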



##########
pip/pip-376.md:
##########
@@ -0,0 +1,138 @@
+TopicPolicies getTopicPolicies(TopicName topicName, boolean isGlobal) throws TopicPoliciesCacheNotInitException;

Review Comment:
   It looks like this one is only used internally by
`SystemTopicBasedTopicPoliciesService` and by tests, so we can remove it from
the interface.
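
The semantics the PIP describes for this method (start the asynchronous cache initialization, read the cache, and throw instead of waiting) could stay inside the implementation while the interface exposes only an asynchronous getter. A minimal sketch under that assumption follows. `AsyncTopicPoliciesLookup`, `CachingTopicPoliciesService`, `getCachedOrThrow`, `finishInit`, and `CacheNotInitException` (an unchecked stand-in for `TopicPoliciesCacheNotInitException`) are all hypothetical, and this sketch throws whenever initialization has not completed, which may differ in detail from `SystemTopicBasedTopicPoliciesService`.

```java
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical unchecked stand-in for TopicPoliciesCacheNotInitException.
class CacheNotInitException extends RuntimeException {
    CacheNotInitException(String namespace) {
        super("topic policies cache of " + namespace + " is not initialized");
    }
}

// Hypothetical plugin-facing contract: only the asynchronous getter is exposed.
interface AsyncTopicPoliciesLookup {
    CompletableFuture<Optional<String>> getTopicPoliciesAsync(String topic, boolean isGlobal);
}

// Toy implementation; topics are "tenant/namespace/topic" strings and
// policies are plain strings (both simplifications).
class CachingTopicPoliciesService implements AsyncTopicPoliciesLookup {
    private final Map<String, CompletableFuture<Void>> initFutures = new ConcurrentHashMap<>();
    private final Map<String, String> localCache = new ConcurrentHashMap<>();
    private final Map<String, String> globalCache = new ConcurrentHashMap<>();

    private static String namespaceOf(String topic) {
        return topic.substring(0, topic.lastIndexOf('/'));
    }

    // Returns the per-namespace init future, creating it on first use; a real
    // implementation would start replaying __change_events here.
    private CompletableFuture<Void> initFuture(String namespace) {
        return initFutures.computeIfAbsent(namespace, ns -> new CompletableFuture<>());
    }

    @Override
    public CompletableFuture<Optional<String>> getTopicPoliciesAsync(String topic, boolean isGlobal) {
        // Waits for initialization instead of throwing.
        return initFuture(namespaceOf(topic)).thenApply(ignored -> lookup(topic, isGlobal));
    }

    // Internal, non-blocking counterpart of getTopicPolicies(topicName, isGlobal):
    // ensures initialization is tracked and throws if it has not completed yet.
    Optional<String> getCachedOrThrow(String topic, boolean isGlobal) {
        String namespace = namespaceOf(topic);
        if (!initFuture(namespace).isDone()) {
            throw new CacheNotInitException(namespace);
        }
        return lookup(topic, isGlobal);
    }

    private Optional<String> lookup(String topic, boolean isGlobal) {
        return Optional.ofNullable((isGlobal ? globalCache : localCache).get(topic));
    }

    // Test hook simulating the end of the replay: fill the cache, then mark it ready.
    void finishInit(String namespace, String topic, String localPolicies) {
        localCache.put(topic, localPolicies);
        initFuture(namespace).complete(null);
    }
}
```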



##########
pip/pip-376.md:
##########
@@ -0,0 +1,138 @@
+The second method, `getTopicPolicies(topicName)`, is equivalent to `getTopicPolicies(topicName, false)`.
+
+The semantics of these two methods are counterintuitive. They are not
synchronous methods that block waiting on a future. They just start an
asynchronous policies initialization (creating a reader to replay the
`__change_events` topic) and then try to get the policies from the cache. If
the asynchronous initialization had not been started, they simply throw
`TopicPoliciesCacheNotInitException`.
+
+As you can see, these two methods are hard to use. Apart from tests, they are
only called by the `getTopicPoliciesAsyncWithRetry` method, which uses a
user-provided executor and backoff policy to call `getTopicPolicies` repeatedly
until it no longer throws `TopicPoliciesCacheNotInitException`:
+
+```java
+    default CompletableFuture<Optional<TopicPolicies>> getTopicPoliciesAsyncWithRetry(TopicName topicName,
+              final Backoff backoff, ScheduledExecutorService scheduledExecutorService, boolean isGlobal) {
+```
+
+The `getTopicPolicies` overloads are only called directly in tests, while
`getTopicPoliciesAsyncWithRetry` is used in the core. This is very confusing
for users who want to implement their own topic policies service: they have to
dig deep into Pulsar's source code to learn these details.
+
+https://github.com/apache/pulsar/pull/21231 adds two asynchronous overloads
that are much more user-friendly:
+
+```java
+CompletableFuture<Optional<TopicPolicies>> getTopicPoliciesAsync(@Nonnull TopicName topicName, boolean isGlobal);
+CompletableFuture<Optional<TopicPolicies>> getTopicPoliciesAsync(@Nonnull TopicName topicName);
+```
+
+Now we have **5** get methods. What's worse, unlike `getTopicPolicies`,
`getTopicPoliciesAsync(topic)` is not equivalent to
`getTopicPoliciesAsync(topic, false)`. Instead:
+- `getTopicPoliciesAsync(topic)` tries to get the local policies first and, if
they are absent, falls back to the global policies
+- `getTopicPoliciesAsync(topic, true)` only gets the global policies
+- `getTopicPoliciesAsync(topic, false)` only gets the local policies
+
+Note that topic policies have supported global policies across clusters since
[#12517](https://github.com/apache/pulsar/pull/12517), so a topic can have both
local policies and global policies.
+
+Currently,
+- `getTopicPoliciesAsync(TopicName)` is used in
`BrokerService#getTopicPoliciesBypassSystemTopic`, which is called when
initializing the topic policies of `PersistentTopic` objects. So it uses the
"local-first" semantics in case either the global or the local policies are
deleted.
+- `getTopicPoliciesAsyncWithRetry` is used in
`AdminResource#getTopicPoliciesAsyncWithRetry`, which is called by all topic
policies admin APIs. Since these admin APIs all have an `isGlobal` field to
indicate whether to get the global policies, it uses the "local only" or
"global only" semantics.
+- The other methods are never called directly except in tests.
+
+Actually, there is a 6th method, `getTopicPoliciesIfExists`, which just tries
to get the local topic policies from the cache. It is the clearest and simplest
of all these methods.
+
+```java
+    TopicPolicies getTopicPoliciesIfExists(TopicName topicName);
+```

Review Comment:
   This one is also confusing for plugin implementers. If a topic has no
policies, users can already express that by implementing
   
   ```
   CompletableFuture<Optional<TopicPolicies>> getTopicPoliciesAsync(@Nonnull TopicName topicName, boolean isGlobal);
   ```
   
   I remember it was also added to resolve issues in the SystemTopic-based
solution, which doesn't want to wait for the message replay from the system topic.
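
A sketch of how the broker could derive a non-blocking "if exists" lookup from the asynchronous getter alone, assuming plugins implement only `getTopicPoliciesAsync(topicName, isGlobal)`. `PoliciesPlugin` and `IfExistsHelper` are hypothetical, `String` stands in for `TopicName` and `TopicPolicies`, and the helper returns an `Optional` rather than a nullable `TopicPolicies`. It reports a value only when the plugin's future has already completed normally, so it never waits for a replay.

```java
import java.util.Optional;
import java.util.concurrent.CompletableFuture;

// Hypothetical plugin contract: only the async getter is implemented by users.
interface PoliciesPlugin {
    CompletableFuture<Optional<String>> getTopicPoliciesAsync(String topic, boolean isGlobal);
}

final class IfExistsHelper {
    private IfExistsHelper() {
    }

    // Non-blocking lookup of the local policies: if the plugin's future has
    // already completed normally, return its value; if it is still loading,
    // failed, or was cancelled, report "absent" instead of waiting.
    static Optional<String> getLocalPoliciesIfExists(PoliciesPlugin plugin, String topic) {
        CompletableFuture<Optional<String>> future = plugin.getTopicPoliciesAsync(topic, false);
        if (future.isDone() && !future.isCompletedExceptionally()) {
            return future.join();
        }
        return Optional.empty();
    }
}
```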



##########
pip/pip-376.md:
##########
@@ -0,0 +1,138 @@
+```java
+    default CompletableFuture<Optional<TopicPolicies>> getTopicPoliciesAsyncWithRetry(TopicName topicName,
+              final Backoff backoff, ScheduledExecutorService scheduledExecutorService, boolean isGlobal) {
+```

Review Comment:
   Can we remove this one from the interface? It was added to fix a bug in the
SystemTopic-based solution. Eventually, we should also improve it in a purely
asynchronous way for the SystemTopic-based solution, but that's not required in
this proposal.
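
For reference, the retry loop that the PIP attributes to `getTopicPoliciesAsyncWithRetry` (call a getter on a `ScheduledExecutorService` with backoff until `TopicPoliciesCacheNotInitException` stops being thrown) can be sketched as follows. `RetryingGetter` and `NotInitYetException` are hypothetical, and a simple capped doubling delay replaces Pulsar's `Backoff` class.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;

// Hypothetical unchecked stand-in for TopicPoliciesCacheNotInitException.
class NotInitYetException extends RuntimeException {
    NotInitYetException() {
        super("cache not initialized");
    }
}

final class RetryingGetter {
    private RetryingGetter() {
    }

    // Calls `getter` until it stops throwing NotInitYetException, doubling the
    // delay after each failed attempt (capped at maxDelayMs). Any other error,
    // or running out of attempts, fails the returned future.
    static <T> CompletableFuture<T> getWithRetry(Supplier<T> getter, ScheduledExecutorService executor,
                                                 long initialDelayMs, long maxDelayMs, int maxAttempts) {
        CompletableFuture<T> result = new CompletableFuture<>();
        attempt(getter, executor, initialDelayMs, maxDelayMs, maxAttempts, result);
        return result;
    }

    private static <T> void attempt(Supplier<T> getter, ScheduledExecutorService executor, long delayMs,
                                    long maxDelayMs, int attemptsLeft, CompletableFuture<T> result) {
        try {
            result.complete(getter.get());
        } catch (NotInitYetException e) {
            if (attemptsLeft <= 1) {
                result.completeExceptionally(e);
                return;
            }
            long nextDelayMs = Math.min(delayMs * 2, maxDelayMs);
            executor.schedule(
                    () -> attempt(getter, executor, nextDelayMs, maxDelayMs, attemptsLeft - 1, result),
                    delayMs, TimeUnit.MILLISECONDS);
        } catch (RuntimeException e) {
            result.completeExceptionally(e);
        }
    }
}
```

With a purely asynchronous getter, as suggested in the comment above, this scaffolding becomes unnecessary: the implementation's future simply completes once its cache is ready.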



##########
pip/pip-376.md:
##########
@@ -0,0 +1,138 @@
+- `getTopicPoliciesAsync(topic)` tries to get the local policies first and, if
they are absent, falls back to the global policies

Review Comment:
   Would it be better not to expose this method? Users should only implement
   
   ```
   CompletableFuture<Optional<TopicPolicies>> getTopicPoliciesAsync(@Nonnull TopicName topicName, boolean isGlobal);
   ```
   
   Let the broker decide when to get the local or the global topic policies. At
least for now, I don't think users should customize this behavior.
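
A minimal sketch of the suggested split, where the plugin implements only the `(topicName, isGlobal)` getter and the broker composes the "local first" behavior itself. `TopicPoliciesPlugin` and `BrokerSideLookup.getLocalFirst` are hypothetical names, with `String` standing in for `TopicName` and `TopicPolicies`.

```java
import java.util.Optional;
import java.util.concurrent.CompletableFuture;

// Hypothetical plugin contract: users implement only this method.
interface TopicPoliciesPlugin {
    CompletableFuture<Optional<String>> getTopicPoliciesAsync(String topic, boolean isGlobal);
}

final class BrokerSideLookup {
    private BrokerSideLookup() {
    }

    // "Local first" decided by the broker: query the local policies and only
    // if they are absent fall back to the global policies.
    static CompletableFuture<Optional<String>> getLocalFirst(TopicPoliciesPlugin plugin, String topic) {
        return plugin.getTopicPoliciesAsync(topic, false)
                .thenCompose(local -> local.isPresent()
                        ? CompletableFuture.completedFuture(local)
                        : plugin.getTopicPoliciesAsync(topic, true));
    }
}
```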



##########
pip/pip-376.md:
##########
@@ -0,0 +1,138 @@
+# Motivation
+
+Make `TopicPoliciesService` pluggable so that users can customize the topic
policies service with another backend metadata store.
+
+# Goals
+
+## In Scope
+
+Redesign a clear and simple `TopicPoliciesService` interface for users to 
customize.
+
+# High Level Design
+
+Add a `topicPoliciesServiceClassName` config to specify the topic policies 
service class name. If the class name is not the default 
`SystemTopicBasedTopicPoliciesService`, `systemTopicEnabled` will not be 
required unless the implementation requires it.
+
+# Detailed Design
+
+## Design & Implementation Details
+
+1. Add a unified method to get topic policies.
+
+```java
+    enum GetType {
+        LOCAL_FIRST, // try getting the local topic policies; if not present, get the global policies
+        GLOBAL_ONLY, // only get the global policies
+        LOCAL_ONLY,  // only get the local policies
+    }
+    CompletableFuture<Optional<TopicPolicies>> getTopicPoliciesAsync(TopicName topicName, GetType type);
+```

Review Comment:
   Do we really need to expose LOCAL_FIRST? The priority of the policies
defined in Pulsar is:
   
   local_topic > global_topic > namespace > dynamic_config > broker.conf
   
   Do we want users to customize the priority? If not, we can let users
implement only the methods to get the local policies and the global policies.
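
If the priority chain quoted above is fixed, the broker can apply it in one place and plugins only need to supply the local and global topic levels. The sketch below resolves a single numeric setting through that chain; `PolicyResolver` is hypothetical, each level is modeled as an `Optional<Integer>`, and the code is an illustration rather than something taken from Pulsar.

```java
import java.util.Optional;

// Hypothetical resolver for one numeric setting (e.g. a max-consumers limit).
// Every level is optional except broker.conf, which always provides a default.
final class PolicyResolver {
    private PolicyResolver() {
    }

    static int resolve(Optional<Integer> localTopic, Optional<Integer> globalTopic,
                       Optional<Integer> namespace, Optional<Integer> dynamicConfig, int brokerConf) {
        // The first present value wins:
        // local_topic > global_topic > namespace > dynamic_config > broker.conf
        return localTopic
                .or(() -> globalTopic)
                .or(() -> namespace)
                .or(() -> dynamicConfig)
                .orElse(brokerConf);
    }
}
```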



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
