This is an automated email from the ASF dual-hosted git repository.

lhotari pushed a commit to branch branch-2.11
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-2.11 by this push:
     new 5c4f4cb6416 [improve][broker] Add fine-grain authorization to ns/topic management endpoints (#22305)
5c4f4cb6416 is described below

commit 5c4f4cb64167c0ea1a5775f6d991b386ad95c786
Author: Jiwei Guo <[email protected]>
AuthorDate: Wed Mar 20 13:49:54 2024 +0800

    [improve][broker] Add fine-grain authorization to ns/topic management endpoints (#22305)
    
    (cherry picked from commit fd34d4ab9c5aa7e0dca961d5a8badae4613fbe8e)
    
    # Conflicts:
    #       pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
    (cherry picked from commit 99eb49a68982271562597d4c4cea127132bc0b35)
    
    # Conflicts:
    #       pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/PulsarAuthorizationProvider.java
    #       pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
---
 .../apache/pulsar/broker/admin/AdminResource.java  |   7 +-
 .../pulsar/broker/admin/impl/NamespacesBase.java   | 166 ++++++------
 .../broker/admin/impl/PersistentTopicsBase.java    | 167 ++++++------
 .../apache/pulsar/broker/admin/BaseAuthZTest.java  | 107 ++++++++
 .../pulsar/broker/admin/NamespaceAuthZTest.java    | 117 +++++++++
 .../apache/pulsar/broker/admin/TopicAuthZTest.java | 292 +++++++++++++++++++++
 .../broker/admin/TopicPoliciesAuthZTest.java       |  87 +-----
 7 files changed, 688 insertions(+), 255 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java
index 5eddb4cecf2..07459b9190a 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java
@@ -54,8 +54,6 @@ import org.apache.pulsar.common.policies.data.BundlesData;
 import org.apache.pulsar.common.policies.data.NamespaceOperation;
 import org.apache.pulsar.common.policies.data.PersistencePolicies;
 import org.apache.pulsar.common.policies.data.Policies;
-import org.apache.pulsar.common.policies.data.PolicyName;
-import org.apache.pulsar.common.policies.data.PolicyOperation;
 import org.apache.pulsar.common.policies.data.RetentionPolicies;
 import org.apache.pulsar.common.policies.data.SchemaCompatibilityStrategy;
 import org.apache.pulsar.common.policies.data.SubscribeRate;
@@ -772,10 +770,7 @@ public abstract class AdminResource extends 
PulsarWebResource {
     }
 
     protected CompletableFuture<SchemaCompatibilityStrategy> 
getSchemaCompatibilityStrategyAsync() {
-        return validateTopicPolicyOperationAsync(topicName,
-                PolicyName.SCHEMA_COMPATIBILITY_STRATEGY,
-                PolicyOperation.READ)
-                .thenCompose((__) -> 
getSchemaCompatibilityStrategyAsyncWithoutAuth()).whenComplete((__, ex) -> {
+        return 
getSchemaCompatibilityStrategyAsyncWithoutAuth().whenComplete((__, ex) -> {
                     if (ex != null) {
                         log.error("[{}] Failed to get schema compatibility 
strategy of topic {} {}",
                                 clientAppId(), topicName, ex);
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
index 0a602a631c2..fc111cd2370 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
@@ -2639,102 +2639,110 @@ public abstract class NamespacesBase extends 
AdminResource {
    }
 
    protected void internalSetProperty(String key, String value, AsyncResponse 
asyncResponse) {
-       validatePoliciesReadOnlyAccess();
-       updatePoliciesAsync(namespaceName, policies -> {
-           policies.properties.put(key, value);
-           return policies;
-       }).thenAccept(v -> {
-           log.info("[{}] Successfully set property for key {} on namespace 
{}", clientAppId(), key,
-                   namespaceName);
-           asyncResponse.resume(Response.noContent().build());
-       }).exceptionally(ex -> {
-           Throwable cause = ex.getCause();
-           log.error("[{}] Failed to set property for key {} on namespace {}", 
clientAppId(), key,
-                   namespaceName, cause);
-           asyncResponse.resume(cause);
-           return null;
-       });
+       validateAdminAccessForTenantAsync(namespaceName.getTenant())
+               .thenCompose(__ -> validatePoliciesReadOnlyAccessAsync())
+               .thenCompose(__ -> updatePoliciesAsync(namespaceName, policies 
-> {
+                   policies.properties.put(key, value);
+                   return policies;
+               }))
+               .thenAccept(v -> {
+                   log.info("[{}] Successfully set property for key {} on 
namespace {}", clientAppId(), key,
+                           namespaceName);
+                   asyncResponse.resume(Response.noContent().build());
+               }).exceptionally(ex -> {
+                   Throwable cause = ex.getCause();
+                   log.error("[{}] Failed to set property for key {} on 
namespace {}", clientAppId(), key,
+                           namespaceName, cause);
+                   asyncResponse.resume(cause);
+                   return null;
+               });
    }
 
    protected void internalSetProperties(Map<String, String> properties, 
AsyncResponse asyncResponse) {
-       validatePoliciesReadOnlyAccess();
-       updatePoliciesAsync(namespaceName, policies -> {
-           policies.properties.putAll(properties);
-           return policies;
-       }).thenAccept(v -> {
-           log.info("[{}] Successfully set {} properties on namespace {}", 
clientAppId(), properties.size(),
-                   namespaceName);
-           asyncResponse.resume(Response.noContent().build());
-       }).exceptionally(ex -> {
-           Throwable cause = ex.getCause();
-           log.error("[{}] Failed to set {} properties on namespace {}", 
clientAppId(), properties.size(),
-                   namespaceName, cause);
-           asyncResponse.resume(cause);
-           return null;
-       });
+       validateAdminAccessForTenantAsync(namespaceName.getTenant())
+               .thenCompose(__ -> validatePoliciesReadOnlyAccessAsync())
+               .thenCompose(__ -> updatePoliciesAsync(namespaceName, policies 
-> {
+                   policies.properties.putAll(properties);
+                   return policies;
+               }))
+               .thenAccept(v -> {
+                   log.info("[{}] Successfully set {} properties on namespace 
{}", clientAppId(), properties.size(),
+                           namespaceName);
+                   asyncResponse.resume(Response.noContent().build());
+               }).exceptionally(ex -> {
+                   Throwable cause = ex.getCause();
+                   log.error("[{}] Failed to set {} properties on namespace 
{}", clientAppId(), properties.size(),
+                           namespaceName, cause);
+                   asyncResponse.resume(cause);
+                   return null;
+               });
    }
 
    protected void internalGetProperty(String key, AsyncResponse asyncResponse) 
{
-        getNamespacePoliciesAsync(namespaceName).thenAccept(policies -> {
-            asyncResponse.resume(policies.properties.get(key));
-        }).exceptionally(ex -> {
-            Throwable cause = ex.getCause();
-            log.error("[{}] Failed to get property for key {} of namespace 
{}", clientAppId(), key,
-                    namespaceName, cause);
-            asyncResponse.resume(cause);
-            return null;
-        });
+       validateAdminAccessForTenantAsync(namespaceName.getTenant())
+               .thenCompose(__ -> getNamespacePoliciesAsync(namespaceName))
+               .thenAccept(policies -> 
asyncResponse.resume(policies.properties.get(key)))
+               .exceptionally(ex -> {
+                   Throwable cause = ex.getCause();
+                   log.error("[{}] Failed to get property for key {} of 
namespace {}", clientAppId(), key,
+                           namespaceName, cause);
+                   asyncResponse.resume(cause);
+                   return null;
+               });
    }
 
    protected void internalGetProperties(AsyncResponse asyncResponse) {
-       getNamespacePoliciesAsync(namespaceName).thenAccept(policies -> {
-           asyncResponse.resume(policies.properties);
-       }).exceptionally(ex -> {
-           Throwable cause = ex.getCause();
-           log.error("[{}] Failed to get properties of namespace {}", 
clientAppId(), namespaceName, cause);
-           asyncResponse.resume(cause);
-           return null;
-       });
+       validateAdminAccessForTenantAsync(namespaceName.getTenant())
+               .thenCompose(__ -> getNamespacePoliciesAsync(namespaceName))
+               .thenAccept(policies -> 
asyncResponse.resume(policies.properties))
+               .exceptionally(ex -> {
+                   Throwable cause = ex.getCause();
+                   log.error("[{}] Failed to get properties of namespace {}", 
clientAppId(), namespaceName, cause);
+                   asyncResponse.resume(cause);
+                   return null;
+               });
    }
 
    protected void internalRemoveProperty(String key, AsyncResponse 
asyncResponse) {
-       validatePoliciesReadOnlyAccess();
-
        AtomicReference<String> oldVal = new AtomicReference<>(null);
-       updatePoliciesAsync(namespaceName, policies -> {
-           oldVal.set(policies.properties.remove(key));
-           return policies;
-       }).thenAccept(v -> {
-           asyncResponse.resume(oldVal.get());
-           log.info("[{}] Successfully remove property for key {} on namespace 
{}", clientAppId(), key,
-                   namespaceName);
-       }).exceptionally(ex -> {
-           Throwable cause = ex.getCause();
-           log.error("[{}] Failed to remove property for key {} on namespace 
{}", clientAppId(), key,
-                   namespaceName, cause);
-           asyncResponse.resume(cause);
-          return null;
-       });
+       validateAdminAccessForTenantAsync(namespaceName.getTenant())
+               .thenCompose(__ -> validatePoliciesReadOnlyAccessAsync())
+               .thenCompose(__ -> updatePoliciesAsync(namespaceName, policies 
-> {
+                   oldVal.set(policies.properties.remove(key));
+                   return policies;
+               })).thenAccept(v -> {
+                   asyncResponse.resume(oldVal.get());
+                   log.info("[{}] Successfully remove property for key {} on 
namespace {}", clientAppId(), key,
+                           namespaceName);
+               }).exceptionally(ex -> {
+                   Throwable cause = ex.getCause();
+                   log.error("[{}] Failed to remove property for key {} on 
namespace {}", clientAppId(), key,
+                           namespaceName, cause);
+                   asyncResponse.resume(cause);
+                   return null;
+               });
    }
 
    protected void internalClearProperties(AsyncResponse asyncResponse) {
-       validatePoliciesReadOnlyAccess();
        AtomicReference<Integer> clearedCount = new AtomicReference<>(0);
-       updatePoliciesAsync(namespaceName, policies -> {
-           clearedCount.set(policies.properties.size());
-           policies.properties.clear();
-           return policies;
-       }).thenAccept(v -> {
-           asyncResponse.resume(Response.noContent().build());
-           log.info("[{}] Successfully clear {} properties on namespace {}", 
clientAppId(), clearedCount.get(),
-                   namespaceName);
-       }).exceptionally(ex -> {
-           Throwable cause = ex.getCause();
-           log.error("[{}] Failed to clear {} properties on namespace {}", 
clientAppId(), clearedCount.get(),
-                   namespaceName, cause);
-           asyncResponse.resume(cause);
-           return null;
-       });
+       validateAdminAccessForTenantAsync(namespaceName.getTenant())
+               .thenCompose(__ -> validatePoliciesReadOnlyAccessAsync())
+               .thenCompose(__ -> updatePoliciesAsync(namespaceName, policies 
-> {
+                   clearedCount.set(policies.properties.size());
+                   policies.properties.clear();
+                   return policies;
+               }))
+               .thenAccept(v -> {
+                   asyncResponse.resume(Response.noContent().build());
+                   log.info("[{}] Successfully clear {} properties on 
namespace {}", clientAppId(), clearedCount.get(),
+                           namespaceName);
+               }).exceptionally(ex -> {
+                   Throwable cause = ex.getCause();
+                   log.error("[{}] Failed to clear {} properties on namespace 
{}", clientAppId(), clearedCount.get(),
+                           namespaceName, cause);
+                   asyncResponse.resume(cause);
+                   return null;
+               });
    }
 
    private CompletableFuture<Void> updatePoliciesAsync(NamespaceName ns, 
Function<Policies, Policies> updateFunction) {
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
index 3c1a43a9c69..8f6191938ed 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
@@ -503,7 +503,9 @@ public class PersistentTopicsBase extends AdminResource {
     protected void internalCreateMissedPartitions(AsyncResponse asyncResponse) 
{
         getPartitionedTopicMetadataAsync(topicName, false, 
false).thenAccept(metadata -> {
             if (metadata != null) {
-                tryCreatePartitionsAsync(metadata.partitions).thenAccept(v -> {
+                CompletableFuture<Void> future = 
validateNamespaceOperationAsync(topicName.getNamespaceObject(),
+                        NamespaceOperation.CREATE_TOPIC);
+                future.thenCompose(__ -> 
tryCreatePartitionsAsync(metadata.partitions)).thenAccept(v -> {
                     asyncResponse.resume(Response.noContent().build());
                 }).exceptionally(e -> {
                     log.error("[{}] Failed to create partitions for topic {}", 
clientAppId(), topicName);
@@ -741,13 +743,13 @@ public class PersistentTopicsBase extends AdminResource {
 
     protected void internalUnloadTopic(AsyncResponse asyncResponse, boolean 
authoritative) {
         log.info("[{}] Unloading topic {}", clientAppId(), topicName);
-        CompletableFuture<Void> future;
-        if (topicName.isGlobal()) {
-            future = validateGlobalNamespaceOwnershipAsync(namespaceName);
-        } else {
-            future = CompletableFuture.completedFuture(null);
-        }
-       future.thenAccept(__ -> {
+        CompletableFuture<Void> future = 
validateTopicOperationAsync(topicName, TopicOperation.UNLOAD);
+        future.thenCompose(__ -> {
+            if (topicName.isGlobal()) {
+                return validateGlobalNamespaceOwnershipAsync(namespaceName);
+            }
+            return CompletableFuture.completedFuture(null);
+        }).thenAccept(__ -> {
            // If the topic name is a partition name, no need to get partition 
topic metadata again
            if (topicName.isPartitioned()) {
                if (isTransactionCoordinatorAssign(topicName)) {
@@ -965,13 +967,12 @@ public class PersistentTopicsBase extends AdminResource {
 
     private void internalUnloadNonPartitionedTopicAsync(AsyncResponse 
asyncResponse, boolean authoritative) {
         validateTopicOwnershipAsync(topicName, authoritative)
-                .thenCompose(unused -> validateTopicOperationAsync(topicName, 
TopicOperation.UNLOAD)
                         .thenCompose(__ -> getTopicReferenceAsync(topicName))
                         .thenCompose(topic -> topic.close(false))
                         .thenRun(() -> {
                             log.info("[{}] Successfully unloaded topic {}", 
clientAppId(), topicName);
                             asyncResponse.resume(Response.noContent().build());
-                        }))
+                        })
                 .exceptionally(ex -> {
                     // If the exception is not redirect exception we need to 
log it.
                     if (!isNot307And404Exception(ex)) {
@@ -984,16 +985,14 @@ public class PersistentTopicsBase extends AdminResource {
 
     private void internalUnloadTransactionCoordinatorAsync(AsyncResponse 
asyncResponse, boolean authoritative) {
         validateTopicOwnershipAsync(topicName, authoritative)
-                .thenCompose(__ -> validateTopicOperationAsync(topicName, 
TopicOperation.UNLOAD)
-                        .thenCompose(v -> pulsar()
-                                .getTransactionMetadataStoreService()
-                                .removeTransactionMetadataStore(
-                                        
TransactionCoordinatorID.get(topicName.getPartitionIndex())))
-                        .thenRun(() -> {
-                            log.info("[{}] Successfully unloaded tc {}", 
clientAppId(),
-                                    topicName.getPartitionIndex());
-                            asyncResponse.resume(Response.noContent().build());
-                        }))
+                .thenCompose(v -> pulsar()
+                        .getTransactionMetadataStoreService()
+                        .removeTransactionMetadataStore(
+                                
TransactionCoordinatorID.get(topicName.getPartitionIndex())))
+                .thenRun(() -> {
+                    log.info("[{}] Successfully unloaded tc {}", 
clientAppId(), topicName.getPartitionIndex());
+                    asyncResponse.resume(Response.noContent().build());
+                })
                 .exceptionally(ex -> {
                     // If the exception is not redirect exception we need to 
log it.
                     if (!isNot307And404Exception(ex)) {
@@ -1204,13 +1203,13 @@ public class PersistentTopicsBase extends AdminResource 
{
     }
 
     protected void internalGetManagedLedgerInfo(AsyncResponse asyncResponse, 
boolean authoritative) {
-        CompletableFuture<Void> future;
-        if (topicName.isGlobal()) {
-            future = validateGlobalNamespaceOwnershipAsync(namespaceName);
-        } else {
-            future = CompletableFuture.completedFuture(null);
-        }
-        future.thenAccept(__ -> {
+        CompletableFuture<Void> future = 
validateTopicOperationAsync(topicName, TopicOperation.GET_STATS);
+        future.thenCompose(__ -> {
+            if (topicName.isGlobal()) {
+                return validateGlobalNamespaceOwnershipAsync(namespaceName);
+            }
+            return CompletableFuture.completedFuture(null);
+        }).thenAccept(__ -> {
             // If the topic name is a partition name, no need to get partition 
topic metadata again
             if (topicName.isPartitioned()) {
                 
internalGetManagedLedgerInfoForNonPartitionedTopic(asyncResponse);
@@ -1317,13 +1316,13 @@ public class PersistentTopicsBase extends AdminResource 
{
     protected void internalGetPartitionedStats(AsyncResponse asyncResponse, 
boolean authoritative, boolean perPartition,
                                                boolean getPreciseBacklog, 
boolean subscriptionBacklogSize,
                                                boolean 
getEarliestTimeInBacklog) {
-        CompletableFuture<Void> future;
-        if (topicName.isGlobal()) {
-            future = validateGlobalNamespaceOwnershipAsync(namespaceName);
-        } else {
-            future = CompletableFuture.completedFuture(null);
-        }
-        future.thenCompose(__ -> getPartitionedTopicMetadataAsync(topicName,
+        CompletableFuture<Void> future = 
validateTopicOperationAsync(topicName, TopicOperation.GET_STATS);
+        future.thenCompose(__ -> {
+            if (topicName.isGlobal()) {
+                return validateGlobalNamespaceOwnershipAsync(namespaceName);
+            }
+            return  CompletableFuture.completedFuture(null);
+        }).thenCompose(__ -> getPartitionedTopicMetadataAsync(topicName,
                 authoritative, false)).thenAccept(partitionMetadata -> {
             if (partitionMetadata.partitions == 0) {
                 asyncResponse.resume(new RestException(Status.NOT_FOUND,
@@ -1403,14 +1402,15 @@ public class PersistentTopicsBase extends AdminResource 
{
     }
 
     protected void internalGetPartitionedStatsInternal(AsyncResponse 
asyncResponse, boolean authoritative) {
-        CompletableFuture<Void> future;
-        if (topicName.isGlobal()) {
-            future = validateGlobalNamespaceOwnershipAsync(namespaceName);
-        } else {
-            future = CompletableFuture.completedFuture(null);
-        }
-        future.thenCompose(__ -> getPartitionedTopicMetadataAsync(topicName, 
authoritative, false))
-                .thenAccept(partitionMetadata -> {
+        CompletableFuture<Void> future = 
validateTopicOperationAsync(topicName, TopicOperation.GET_STATS);
+        future.thenCompose(__ -> {
+            if (topicName.isGlobal()) {
+                return validateGlobalNamespaceOwnershipAsync(namespaceName);
+            } else {
+                return CompletableFuture.completedFuture(null);
+            }
+        }).thenCompose(__ -> getPartitionedTopicMetadataAsync(topicName, 
authoritative, false))
+        .thenAccept(partitionMetadata -> {
             if (partitionMetadata.partitions == 0) {
                 asyncResponse.resume(new RestException(Status.NOT_FOUND,
                         
getPartitionedTopicNotFoundErrorMessage(topicName.toString())));
@@ -2213,13 +2213,14 @@ public class PersistentTopicsBase extends AdminResource 
{
 
     protected void internalCreateSubscription(AsyncResponse asyncResponse, 
String subscriptionName,
             MessageIdImpl messageId, boolean authoritative, boolean 
replicated, Map<String, String> properties) {
-        CompletableFuture<Void> ret;
-        if (topicName.isGlobal()) {
-            ret = validateGlobalNamespaceOwnershipAsync(namespaceName);
-        } else {
-            ret = CompletableFuture.completedFuture(null);
-        }
-        ret.thenAccept(__ -> {
+        CompletableFuture<Void> ret = validateTopicOperationAsync(topicName, 
TopicOperation.SUBSCRIBE,
+                subscriptionName);
+        ret.thenCompose(__ -> {
+            if (topicName.isGlobal()) {
+                return validateGlobalNamespaceOwnershipAsync(namespaceName);
+            }
+            return CompletableFuture.completedFuture(null);
+        }).thenAccept(__ -> {
             final MessageIdImpl targetMessageId = messageId == null ? 
(MessageIdImpl) MessageId.latest : messageId;
             log.info("[{}][{}] Creating subscription {} at message id {} with 
properties {}", clientAppId(),
                     topicName, subscriptionName, targetMessageId, properties);
@@ -2378,14 +2379,13 @@ public class PersistentTopicsBase extends AdminResource 
{
     protected void internalUpdateSubscriptionProperties(AsyncResponse 
asyncResponse, String subName,
                                                         Map<String, String> 
subscriptionProperties,
                                                         boolean authoritative) 
{
-        CompletableFuture<Void> future;
-        if (topicName.isGlobal()) {
-            future = validateGlobalNamespaceOwnershipAsync(namespaceName);
-        } else {
-            future = CompletableFuture.completedFuture(null);
-        }
-
-        future.thenCompose(__ -> validateTopicOwnershipAsync(topicName, 
authoritative)).thenAccept(__ -> {
+        CompletableFuture<Void> future = 
validateTopicOperationAsync(topicName, TopicOperation.SUBSCRIBE, subName);
+        future.thenCompose(__ -> {
+            if (topicName.isGlobal()) {
+                return validateGlobalNamespaceOwnershipAsync(namespaceName);
+            }
+            return CompletableFuture.completedFuture(null);
+        }).thenCompose(__ -> validateTopicOwnershipAsync(topicName, 
authoritative)).thenAccept(__ -> {
             if (topicName.isPartitioned()) {
                 
internalUpdateSubscriptionPropertiesForNonPartitionedTopic(asyncResponse, 
subName,
                         subscriptionProperties, authoritative);
@@ -2457,14 +2457,13 @@ public class PersistentTopicsBase extends AdminResource 
{
     protected void internalAnalyzeSubscriptionBacklog(AsyncResponse 
asyncResponse, String subName,
                                                       Optional<Position> 
position,
                                                       boolean authoritative) {
-        CompletableFuture<Void> future;
-        if (topicName.isGlobal()) {
-            future = validateGlobalNamespaceOwnershipAsync(namespaceName);
-        } else {
-            future = CompletableFuture.completedFuture(null);
-        }
-
-        future.thenCompose(__ -> validateTopicOwnershipAsync(topicName, 
authoritative))
+        CompletableFuture<Void> future = 
validateTopicOperationAsync(topicName, TopicOperation.CONSUME, subName);
+        future.thenCompose(__ -> {
+            if (topicName.isGlobal()) {
+                return validateGlobalNamespaceOwnershipAsync(namespaceName);
+            }
+            return CompletableFuture.completedFuture(null);
+        }).thenCompose(__ -> validateTopicOwnershipAsync(topicName, 
authoritative))
                 .thenCompose(__ -> {
                     if (topicName.isPartitioned()) {
                         return CompletableFuture.completedFuture(null);
@@ -2496,14 +2495,13 @@ public class PersistentTopicsBase extends AdminResource 
{
 
     protected void internalGetSubscriptionProperties(AsyncResponse 
asyncResponse, String subName,
                                                         boolean authoritative) 
{
-        CompletableFuture<Void> future;
-        if (topicName.isGlobal()) {
-            future = validateGlobalNamespaceOwnershipAsync(namespaceName);
-        } else {
-            future = CompletableFuture.completedFuture(null);
-        }
-
-        future.thenCompose(__ -> validateTopicOwnershipAsync(topicName, 
authoritative)).thenAccept(__ -> {
+        CompletableFuture<Void> future = 
validateTopicOperationAsync(topicName, TopicOperation.CONSUME, subName);
+        future.thenCompose(__ -> {
+            if (topicName.isGlobal()) {
+                return validateGlobalNamespaceOwnershipAsync(namespaceName);
+            }
+            return CompletableFuture.completedFuture(null);
+        }).thenCompose(__ -> validateTopicOwnershipAsync(topicName, 
authoritative)).thenAccept(__ -> {
             if (topicName.isPartitioned()) {
                 
internalGetSubscriptionPropertiesForNonPartitionedTopic(asyncResponse, subName,
                         authoritative);
@@ -4119,13 +4117,14 @@ public class PersistentTopicsBase extends AdminResource 
{
 
     protected void internalTriggerCompaction(AsyncResponse asyncResponse, 
boolean authoritative) {
         log.info("[{}] Trigger compaction on topic {}", clientAppId(), 
topicName);
-        CompletableFuture<Void> future;
-        if (topicName.isGlobal()) {
-            future = validateGlobalNamespaceOwnershipAsync(namespaceName);
-        } else {
-            future = CompletableFuture.completedFuture(null);
-        }
-        future.thenAccept(__ -> {
+        CompletableFuture<Void> future = 
validateTopicOperationAsync(topicName, TopicOperation.COMPACT);
+        future.thenCompose(__ -> {
+            if (topicName.isGlobal()) {
+                return validateGlobalNamespaceOwnershipAsync(namespaceName);
+            } else {
+                return CompletableFuture.completedFuture(null);
+            }
+        }).thenAccept(__ -> {
             // If the topic name is a partition name, no need to get partition 
topic metadata again
             if (topicName.isPartitioned()) {
                 internalTriggerCompactionNonPartitionedTopic(asyncResponse, 
authoritative);
@@ -5344,12 +5343,12 @@ public class PersistentTopicsBase extends AdminResource 
{
     }
 
     protected CompletableFuture<SchemaCompatibilityStrategy> 
internalGetSchemaCompatibilityStrategy(boolean applied) {
+        CompletableFuture<Void> future = 
validateTopicPolicyOperationAsync(topicName,
+                PolicyName.SCHEMA_COMPATIBILITY_STRATEGY, 
PolicyOperation.READ);
         if (applied) {
-            return getSchemaCompatibilityStrategyAsync();
+            return future.thenCompose(__ -> 
getSchemaCompatibilityStrategyAsync());
         }
-        return validateTopicPolicyOperationAsync(topicName,
-                PolicyName.SCHEMA_COMPATIBILITY_STRATEGY,
-                PolicyOperation.READ)
+        return future
                 .thenCompose(n -> 
getTopicPoliciesAsyncWithRetry(topicName).thenApply(op -> {
                     if (!op.isPresent()) {
                         return null;
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/BaseAuthZTest.java 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/BaseAuthZTest.java
new file mode 100644
index 00000000000..2300a92e2df
--- /dev/null
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/BaseAuthZTest.java
@@ -0,0 +1,107 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.admin;
+
+import io.jsonwebtoken.Jwts;
+import io.jsonwebtoken.SignatureAlgorithm;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Set;
+import java.util.UUID;
+import javax.crypto.SecretKey;
+import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
+import org.apache.pulsar.broker.authentication.AuthenticationProviderToken;
+import org.apache.pulsar.broker.authentication.utils.AuthTokenUtils;
+import org.apache.pulsar.broker.authorization.PulsarAuthorizationProvider;
+import org.apache.pulsar.client.admin.PulsarAdmin;
+import org.apache.pulsar.client.admin.PulsarAdminBuilder;
+import org.apache.pulsar.client.impl.auth.AuthenticationToken;
+import org.apache.pulsar.common.policies.data.TenantInfo;
+import org.apache.pulsar.common.util.ObjectMapperFactory;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+
+/**
+ * Shared fixture for admin-API authorization tests.
+ *
+ * <p>{@link #setup()} enables token authentication and {@link PulsarAuthorizationProvider} on the broker
+ * configuration, starts the mocked broker, and creates two admin clients for subclasses:
+ * {@link #superUserAdmin} (authenticated as a configured super-user role) and {@link #tenantManagerAdmin}
+ * (authenticated as a role that is added to the admin roles of the {@code public} tenant).
+ *
+ * <p>Subclasses sign tokens for additional roles with {@link #SECRET_KEY}.
+ */
+public abstract class BaseAuthZTest extends MockedPulsarServiceBaseTest {
+    /** Key used both by the broker to validate tokens and by tests to sign them. */
+    protected static final SecretKey SECRET_KEY = AuthTokenUtils.createSecretKey(SignatureAlgorithm.HS256);
+    private static final String TENANT_ADMIN_SUBJECT = UUID.randomUUID().toString();
+    private static final String TENANT_ADMIN_TOKEN = Jwts.builder()
+            .claim("sub", TENANT_ADMIN_SUBJECT).signWith(SECRET_KEY).compact();
+    private static final String BROKER_INTERNAL_CLIENT_SUBJECT = "broker_internal";
+    private static final String BROKER_INTERNAL_CLIENT_TOKEN = Jwts.builder()
+            .claim("sub", BROKER_INTERNAL_CLIENT_SUBJECT).signWith(SECRET_KEY).compact();
+    private static final String SUPER_USER_SUBJECT = "super-user";
+    private static final String SUPER_USER_TOKEN = Jwts.builder()
+            .claim("sub", SUPER_USER_SUBJECT).signWith(SECRET_KEY).compact();
+
+    /** Admin client authenticated as {@code super-user}; created in {@link #setup()}. */
+    protected PulsarAdmin superUserAdmin;
+    /** Admin client authenticated as an admin role of the {@code public} tenant; created in {@link #setup()}. */
+    protected PulsarAdmin tenantManagerAdmin;
+
+    /**
+     * Configures authentication/authorization, starts the broker and builds the shared admin clients.
+     *
+     * @throws Exception if the broker cannot be started or an admin call fails
+     */
+    @BeforeClass
+    @Override
+    protected void setup() throws Exception {
+        conf.setAuthorizationEnabled(true);
+        conf.setAuthorizationProvider(PulsarAuthorizationProvider.class.getName());
+        conf.setSuperUserRoles(Set.of(SUPER_USER_SUBJECT, BROKER_INTERNAL_CLIENT_SUBJECT));
+        conf.setAuthenticationEnabled(true);
+        conf.setAuthenticationProviders(Set.of(AuthenticationProviderToken.class.getName()));
+        // internal client
+        conf.setBrokerClientAuthenticationPlugin(AuthenticationToken.class.getName());
+        final Map<String, String> brokerClientAuthParams = new HashMap<>();
+        brokerClientAuthParams.put("token", BROKER_INTERNAL_CLIENT_TOKEN);
+        final String brokerClientAuthParamStr = ObjectMapperFactory.getThreadLocal()
+                .writeValueAsString(brokerClientAuthParams);
+        conf.setBrokerClientAuthenticationParameters(brokerClientAuthParamStr);
+
+        // The token provider reads the signing key from the broker properties.
+        Properties properties = conf.getProperties();
+        if (properties == null) {
+            properties = new Properties();
+            conf.setProperties(properties);
+        }
+        properties.put("tokenSecretKey", AuthTokenUtils.encodeKeyBase64(SECRET_KEY));
+
+        internalSetup();
+        setupDefaultTenantAndNamespace();
+
+        this.superUserAdmin = PulsarAdmin.builder()
+                .serviceHttpUrl(pulsar.getWebServiceAddress())
+                .authentication(new AuthenticationToken(SUPER_USER_TOKEN))
+                .build();
+        // Register the tenant-admin role on the "public" tenant before creating its client.
+        final TenantInfo tenantInfo = superUserAdmin.tenants().getTenantInfo("public");
+        tenantInfo.getAdminRoles().add(TENANT_ADMIN_SUBJECT);
+        superUserAdmin.tenants().updateTenant("public", tenantInfo);
+        this.tenantManagerAdmin = PulsarAdmin.builder()
+                .serviceHttpUrl(pulsar.getWebServiceAddress())
+                .authentication(new AuthenticationToken(TENANT_ADMIN_TOKEN))
+                .build();
+    }
+
+    @Override
+    protected void customizeNewPulsarAdminBuilder(PulsarAdminBuilder pulsarAdminBuilder) {
+        pulsarAdminBuilder.authentication(new AuthenticationToken(SUPER_USER_TOKEN));
+    }
+
+    /**
+     * Closes the admin clients built in {@link #setup()} and then shuts the broker down.
+     *
+     * <p>Runs even when setup or a test failed, so both clients are null-checked.
+     *
+     * @throws Exception if broker shutdown fails
+     */
+    @AfterClass(alwaysRun = true)
+    @Override
+    protected void cleanup() throws Exception {
+        try {
+            if (superUserAdmin != null) {
+                superUserAdmin.close();
+                superUserAdmin = null;
+            }
+            if (tenantManagerAdmin != null) {
+                tenantManagerAdmin.close();
+                tenantManagerAdmin = null;
+            }
+        } finally {
+            // Always stop the broker, even if closing a client throws.
+            internalCleanup();
+        }
+    }
+}
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/NamespaceAuthZTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/NamespaceAuthZTest.java
new file mode 100644
index 00000000000..a96de5a1136
--- /dev/null
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/NamespaceAuthZTest.java
@@ -0,0 +1,117 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.pulsar.broker.admin;
+
+import io.jsonwebtoken.Jwts;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+import lombok.Cleanup;
+import lombok.SneakyThrows;
+import org.apache.pulsar.client.admin.PulsarAdmin;
+import org.apache.pulsar.client.admin.PulsarAdminException;
+import org.apache.pulsar.client.impl.auth.AuthenticationToken;
+import org.apache.pulsar.common.policies.data.AuthAction;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+/**
+ * Authorization tests for the namespace-properties admin endpoints.
+ */
+@Test(groups = "broker-admin")
+public class NamespaceAuthZTest extends BaseAuthZTest {
+
+    /**
+     * Verifies that the super user and the tenant admin can set, get, remove and clear namespace properties,
+     * and that another role is rejected both without any permission and after being granted each
+     * {@link AuthAction} on the namespace in turn.
+     */
+    @SneakyThrows
+    @Test
+    public void testProperties() {
+        final String random = UUID.randomUUID().toString();
+        final String namespace = "public/default";
+        final String topic = "persistent://public/default/" + random;
+        final String subject = UUID.randomUUID().toString();
+        final String token = Jwts.builder()
+                .claim("sub", subject).signWith(SECRET_KEY).compact();
+        superUserAdmin.topics().createNonPartitionedTopic(topic);
+
+        @Cleanup
+        final PulsarAdmin subAdmin = PulsarAdmin.builder()
+                .serviceHttpUrl(pulsar.getWebServiceAddress())
+                .authentication(new AuthenticationToken(token))
+                .build();
+        // test superuser
+        Map<String, String> properties = new HashMap<>();
+        properties.put("key1", "value1");
+        superUserAdmin.namespaces().setProperties(namespace, properties);
+        superUserAdmin.namespaces().setProperty(namespace, "key2", "value2");
+        superUserAdmin.namespaces().getProperties(namespace);
+        superUserAdmin.namespaces().getProperty(namespace, "key2");
+        superUserAdmin.namespaces().removeProperty(namespace, "key2");
+        superUserAdmin.namespaces().clearProperties(namespace);
+
+        // test tenant manager
+        tenantManagerAdmin.namespaces().setProperties(namespace, properties);
+        tenantManagerAdmin.namespaces().setProperty(namespace, "key2", "value2");
+        tenantManagerAdmin.namespaces().getProperties(namespace);
+        tenantManagerAdmin.namespaces().getProperty(namespace, "key2");
+        tenantManagerAdmin.namespaces().removeProperty(namespace, "key2");
+        tenantManagerAdmin.namespaces().clearProperties(namespace);
+
+        // test nobody
+        assertPropertyOperationsDenied(subAdmin, namespace, properties);
+
+        // Granting any single action on the namespace must still not allow property management.
+        for (AuthAction action : AuthAction.values()) {
+            superUserAdmin.namespaces().grantPermissionOnNamespace(namespace, subject, Set.of(action));
+            assertPropertyOperationsDenied(subAdmin, namespace, properties);
+            superUserAdmin.namespaces().revokePermissionsOnNamespace(namespace, subject);
+        }
+        superUserAdmin.topics().delete(topic, true);
+    }
+
+    /**
+     * Asserts that every namespace-properties operation issued through {@code admin} fails with
+     * {@link PulsarAdminException.NotAuthorizedException}.
+     *
+     * @param admin      client whose role is expected to be rejected
+     * @param namespace  namespace to operate on
+     * @param properties properties passed to the bulk set call
+     */
+    private static void assertPropertyOperationsDenied(PulsarAdmin admin, String namespace,
+                                                       Map<String, String> properties) {
+        Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class,
+                () -> admin.namespaces().setProperties(namespace, properties));
+
+        Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class,
+                () -> admin.namespaces().setProperty(namespace, "key2", "value2"));
+
+        Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class,
+                () -> admin.namespaces().getProperties(namespace));
+
+        Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class,
+                () -> admin.namespaces().getProperty(namespace, "key2"));
+
+        Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class,
+                () -> admin.namespaces().removeProperty(namespace, "key2"));
+
+        Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class,
+                () -> admin.namespaces().clearProperties(namespace));
+    }
+}
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicAuthZTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicAuthZTest.java
new file mode 100644
index 00000000000..9da4f6fcc40
--- /dev/null
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicAuthZTest.java
@@ -0,0 +1,292 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.pulsar.broker.admin;
+
+import io.jsonwebtoken.Jwts;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.atomic.AtomicInteger;
+import lombok.Cleanup;
+import lombok.SneakyThrows;
+import org.apache.pulsar.client.admin.PulsarAdmin;
+import org.apache.pulsar.client.admin.PulsarAdminException;
+import org.apache.pulsar.client.api.MessageId;
+import org.apache.pulsar.client.impl.auth.AuthenticationToken;
+import org.apache.pulsar.common.naming.TopicName;
+import org.apache.pulsar.common.policies.data.AuthAction;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+/**
+ * Authorization tests for topic-level admin endpoints.
+ *
+ * <p>Each test calls the endpoints as the super user, as the tenant admin, as a role without any permission,
+ * and as that same role after it has been granted each {@link AuthAction} on the topic in turn.
+ */
+@Test(groups = "broker-admin")
+public class TopicAuthZTest extends BaseAuthZTest {
+
+    /**
+     * Builds an admin client that authenticates with a token signed for {@code subject}.
+     * The caller owns the returned client and is responsible for closing it.
+     *
+     * @param subject value of the token's {@code sub} claim
+     * @return a new admin client pointing at the test broker
+     * @throws Exception if the client cannot be built
+     */
+    private PulsarAdmin newAdminFor(String subject) throws Exception {
+        final String token = Jwts.builder()
+                .claim("sub", subject).signWith(SECRET_KEY).compact();
+        return PulsarAdmin.builder()
+                .serviceHttpUrl(pulsar.getWebServiceAddress())
+                .authentication(new AuthenticationToken(token))
+                .build();
+    }
+
+    /**
+     * Unload, compaction trigger and topic-level schema-compatibility read are allowed for super user and
+     * tenant admin only; no single {@link AuthAction} grant is sufficient.
+     * Despite the method name, no trim endpoint is exercised here.
+     */
+    @SneakyThrows
+    @Test
+    public void testUnloadAndCompactAndTrim() {
+        final String topic = "persistent://public/default/" + UUID.randomUUID();
+        final String subject = UUID.randomUUID().toString();
+        superUserAdmin.topics().createPartitionedTopic(topic, 2);
+
+        @Cleanup
+        final PulsarAdmin subAdmin = newAdminFor(subject);
+        // test superuser
+        superUserAdmin.topics().unload(topic);
+        superUserAdmin.topics().triggerCompaction(topic);
+        superUserAdmin.topicPolicies().getSchemaCompatibilityStrategy(topic, false);
+
+        // test tenant manager
+        tenantManagerAdmin.topics().unload(topic);
+        tenantManagerAdmin.topics().triggerCompaction(topic);
+        tenantManagerAdmin.topicPolicies().getSchemaCompatibilityStrategy(topic, false);
+
+        // test nobody
+        Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class,
+                () -> subAdmin.topics().unload(topic));
+
+        Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class,
+                () -> subAdmin.topics().triggerCompaction(topic));
+
+        Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class,
+                () -> subAdmin.topicPolicies().getSchemaCompatibilityStrategy(topic, false));
+
+        // Only the super user and tenant admin may perform these operations; no other grant is sufficient.
+        for (AuthAction action : AuthAction.values()) {
+            superUserAdmin.topics().grantPermission(topic, subject, Set.of(action));
+
+            Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class,
+                    () -> subAdmin.topics().unload(topic));
+
+            Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class,
+                    () -> subAdmin.topics().triggerCompaction(topic));
+
+            Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class,
+                    () -> subAdmin.topicPolicies().getSchemaCompatibilityStrategy(topic, false));
+
+            superUserAdmin.topics().revokePermissions(topic, subject);
+        }
+        superUserAdmin.topics().deletePartitionedTopic(topic, true);
+    }
+
+    /**
+     * Reading managed-ledger info is allowed for super user, tenant admin, and roles holding
+     * {@code produce} or {@code consume} on the topic.
+     */
+    @Test
+    @SneakyThrows
+    public void testGetManagedLedgerInfo() {
+        final String topic = "persistent://public/default/" + UUID.randomUUID();
+        final String subject = UUID.randomUUID().toString();
+        superUserAdmin.topics().createPartitionedTopic(topic, 2);
+
+        @Cleanup
+        final PulsarAdmin subAdmin = newAdminFor(subject);
+        // test superuser
+        superUserAdmin.topics().getInternalInfo(topic);
+
+        // test tenant manager
+        tenantManagerAdmin.topics().getInternalInfo(topic);
+
+        // test nobody
+        Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class,
+                () -> subAdmin.topics().getInternalInfo(topic));
+
+        for (AuthAction action : AuthAction.values()) {
+            superUserAdmin.topics().grantPermission(topic, subject, Set.of(action));
+            if (action == AuthAction.produce || action == AuthAction.consume) {
+                subAdmin.topics().getInternalInfo(topic);
+            } else {
+                Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class,
+                        () -> subAdmin.topics().getInternalInfo(topic));
+            }
+            superUserAdmin.topics().revokePermissions(topic, subject);
+        }
+        superUserAdmin.topics().deletePartitionedTopic(topic, true);
+    }
+
+    /**
+     * Partitioned stats and partitioned internal stats are allowed for super user, tenant admin, and roles
+     * holding {@code produce} or {@code consume} on the topic.
+     */
+    @Test
+    @SneakyThrows
+    public void testGetPartitionedStatsAndInternalStats() {
+        final String topic = "persistent://public/default/" + UUID.randomUUID();
+        final String subject = UUID.randomUUID().toString();
+        superUserAdmin.topics().createPartitionedTopic(topic, 2);
+
+        @Cleanup
+        final PulsarAdmin subAdmin = newAdminFor(subject);
+        // test superuser
+        superUserAdmin.topics().getPartitionedStats(topic, false);
+        superUserAdmin.topics().getPartitionedInternalStats(topic);
+
+        // test tenant manager
+        tenantManagerAdmin.topics().getPartitionedStats(topic, false);
+        tenantManagerAdmin.topics().getPartitionedInternalStats(topic);
+
+        // test nobody
+        Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class,
+                () -> subAdmin.topics().getPartitionedStats(topic, false));
+
+        Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class,
+                () -> subAdmin.topics().getPartitionedInternalStats(topic));
+
+        for (AuthAction action : AuthAction.values()) {
+            superUserAdmin.topics().grantPermission(topic, subject, Set.of(action));
+            if (action == AuthAction.produce || action == AuthAction.consume) {
+                subAdmin.topics().getPartitionedStats(topic, false);
+                subAdmin.topics().getPartitionedInternalStats(topic);
+            } else {
+                Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class,
+                        () -> subAdmin.topics().getPartitionedStats(topic, false));
+
+                Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class,
+                        () -> subAdmin.topics().getPartitionedInternalStats(topic));
+            }
+            superUserAdmin.topics().revokePermissions(topic, subject);
+        }
+        superUserAdmin.topics().deletePartitionedTopic(topic, true);
+    }
+
+    /**
+     * Creating a subscription, updating/reading its properties and analyzing its backlog are allowed for
+     * super user, tenant admin, and roles holding {@code consume} on the topic.
+     */
+    @Test
+    @SneakyThrows
+    public void testCreateSubscriptionAndUpdateSubscriptionPropertiesAndAnalyzeSubscriptionBacklog() {
+        final String topic = "persistent://public/default/" + UUID.randomUUID();
+        final String subject = UUID.randomUUID().toString();
+        superUserAdmin.topics().createPartitionedTopic(topic, 2);
+        // Every createSubscription attempt uses a fresh subscription name.
+        AtomicInteger suffix = new AtomicInteger(1);
+        @Cleanup
+        final PulsarAdmin subAdmin = newAdminFor(subject);
+        // test superuser
+        superUserAdmin.topics().createSubscription(topic, "test-sub" + suffix.incrementAndGet(), MessageId.earliest);
+
+        // test tenant manager
+        tenantManagerAdmin.topics().createSubscription(topic,
+                "test-sub" + suffix.incrementAndGet(), MessageId.earliest);
+
+        // test nobody
+        Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class,
+                () -> subAdmin.topics().createSubscription(topic,
+                        "test-sub" + suffix.incrementAndGet(), MessageId.earliest));
+
+        for (AuthAction action : AuthAction.values()) {
+            superUserAdmin.topics().grantPermission(topic, subject, Set.of(action));
+            if (action == AuthAction.consume) {
+                subAdmin.topics().createSubscription(topic,
+                        "test-sub" + suffix.incrementAndGet(), MessageId.earliest);
+            } else {
+                Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class,
+                        () -> subAdmin.topics().createSubscription(topic,
+                                "test-sub" + suffix.incrementAndGet(), MessageId.earliest));
+            }
+            superUserAdmin.topics().revokePermissions(topic, subject);
+        }
+        // test UpdateSubscriptionProperties
+        Map<String, String> properties = new HashMap<>();
+        superUserAdmin.topics().createSubscription(topic, "test-sub", MessageId.earliest);
+        // Backlog analysis is issued against the first partition, addressed by its local name.
+        final String partition0 = TopicName.get(topic).getPartition(0).getLocalName();
+        // test superuser
+        superUserAdmin.topics().updateSubscriptionProperties(topic, "test-sub", properties);
+        superUserAdmin.topics().getSubscriptionProperties(topic, "test-sub");
+        superUserAdmin.topics().analyzeSubscriptionBacklog(partition0, "test-sub", Optional.empty());
+
+        // test tenant manager
+        tenantManagerAdmin.topics().updateSubscriptionProperties(topic, "test-sub", properties);
+        tenantManagerAdmin.topics().getSubscriptionProperties(topic, "test-sub");
+        tenantManagerAdmin.topics().analyzeSubscriptionBacklog(partition0, "test-sub", Optional.empty());
+
+        // test nobody
+        Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class,
+                () -> subAdmin.topics().updateSubscriptionProperties(topic, "test-sub", properties));
+
+        Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class,
+                () -> subAdmin.topics().getSubscriptionProperties(topic, "test-sub"));
+
+        Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class,
+                () -> subAdmin.topics().analyzeSubscriptionBacklog(partition0, "test-sub", Optional.empty()));
+
+        for (AuthAction action : AuthAction.values()) {
+            superUserAdmin.topics().grantPermission(topic, subject, Set.of(action));
+            if (action == AuthAction.consume) {
+                subAdmin.topics().updateSubscriptionProperties(topic, "test-sub", properties);
+                subAdmin.topics().getSubscriptionProperties(topic, "test-sub");
+                subAdmin.topics().analyzeSubscriptionBacklog(partition0, "test-sub", Optional.empty());
+            } else {
+                Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class,
+                        () -> subAdmin.topics().updateSubscriptionProperties(topic, "test-sub", properties));
+
+                Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class,
+                        () -> subAdmin.topics().getSubscriptionProperties(topic, "test-sub"));
+
+                Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class,
+                        () -> subAdmin.topics().analyzeSubscriptionBacklog(
+                                partition0, "test-sub", Optional.empty()));
+            }
+            superUserAdmin.topics().revokePermissions(topic, subject);
+        }
+        superUserAdmin.topics().deletePartitionedTopic(topic, true);
+    }
+
+    /**
+     * Creating missed partitions is allowed for super user and tenant admin only; no single
+     * {@link AuthAction} grant is sufficient.
+     */
+    @Test
+    @SneakyThrows
+    public void testCreateMissingPartition() {
+        final String topic = "persistent://public/default/" + UUID.randomUUID();
+        final String subject = UUID.randomUUID().toString();
+        superUserAdmin.topics().createPartitionedTopic(topic, 2);
+        @Cleanup
+        final PulsarAdmin subAdmin = newAdminFor(subject);
+        // test superuser
+        superUserAdmin.topics().createMissedPartitions(topic);
+
+        // test tenant manager
+        tenantManagerAdmin.topics().createMissedPartitions(topic);
+
+        // test nobody
+        Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class,
+                () -> subAdmin.topics().createMissedPartitions(topic));
+
+        for (AuthAction action : AuthAction.values()) {
+            superUserAdmin.topics().grantPermission(topic, subject, Set.of(action));
+            Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class,
+                    () -> subAdmin.topics().createMissedPartitions(topic));
+            superUserAdmin.topics().revokePermissions(topic, subject);
+        }
+        superUserAdmin.topics().deletePartitionedTopic(topic, true);
+    }
+}
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesAuthZTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesAuthZTest.java
index 5966fe81e44..660c8b08378 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesAuthZTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesAuthZTest.java
@@ -20,107 +20,22 @@ package org.apache.pulsar.broker.admin;
 
 import static org.awaitility.Awaitility.await;
 import io.jsonwebtoken.Jwts;
-import io.jsonwebtoken.SignatureAlgorithm;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.Properties;
 import java.util.Set;
 import java.util.UUID;
-import javax.crypto.SecretKey;
 import lombok.Cleanup;
 import lombok.SneakyThrows;
-import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
-import org.apache.pulsar.broker.authentication.AuthenticationProviderToken;
-import org.apache.pulsar.broker.authentication.utils.AuthTokenUtils;
-import org.apache.pulsar.broker.authorization.PulsarAuthorizationProvider;
 import org.apache.pulsar.client.admin.PulsarAdmin;
-import org.apache.pulsar.client.admin.PulsarAdminBuilder;
 import org.apache.pulsar.client.admin.PulsarAdminException;
 import org.apache.pulsar.client.impl.auth.AuthenticationToken;
 import org.apache.pulsar.common.policies.data.AuthAction;
 import org.apache.pulsar.common.policies.data.OffloadPolicies;
 import org.apache.pulsar.common.policies.data.OffloadPoliciesImpl;
 import org.apache.pulsar.common.policies.data.RetentionPolicies;
-import org.apache.pulsar.common.policies.data.TenantInfo;
-import org.apache.pulsar.common.util.ObjectMapperFactory;
 import org.testng.Assert;
-import org.testng.annotations.AfterClass;
-import org.testng.annotations.BeforeClass;
 import org.testng.annotations.Test;
 
 
-public final class TopicPoliciesAuthZTest extends MockedPulsarServiceBaseTest {
-
-    private PulsarAdmin superUserAdmin;
-
-    private PulsarAdmin tenantManagerAdmin;
-
-    private static final SecretKey SECRET_KEY = 
AuthTokenUtils.createSecretKey(SignatureAlgorithm.HS256);
-    private static final String TENANT_ADMIN_SUBJECT =  
UUID.randomUUID().toString();
-    private static final String TENANT_ADMIN_TOKEN = Jwts.builder()
-            .claim("sub", TENANT_ADMIN_SUBJECT).signWith(SECRET_KEY).compact();
-
-
-    private static final String BROKER_INTERNAL_CLIENT_SUBJECT = 
"broker_internal";
-    private static final String BROKER_INTERNAL_CLIENT_TOKEN = Jwts.builder()
-            .claim("sub", 
BROKER_INTERNAL_CLIENT_SUBJECT).signWith(SECRET_KEY).compact();
-    private static final String SUPER_USER_SUBJECT = "super-user";
-    private static final String SUPER_USER_TOKEN = Jwts.builder()
-            .claim("sub", SUPER_USER_SUBJECT).signWith(SECRET_KEY).compact();
-    private static final String NOBODY_SUBJECT =  "nobody";
-    private static final String NOBODY_TOKEN = Jwts.builder()
-            .claim("sub", NOBODY_SUBJECT).signWith(SECRET_KEY).compact();
-
-
-    @BeforeClass
-    @Override
-    protected void setup() throws Exception {
-        conf.setAuthorizationEnabled(true);
-        
conf.setAuthorizationProvider(PulsarAuthorizationProvider.class.getName());
-        conf.setSuperUserRoles(Set.of(SUPER_USER_SUBJECT, 
BROKER_INTERNAL_CLIENT_SUBJECT));
-        conf.setAuthenticationEnabled(true);
-        
conf.setAuthenticationProviders(Set.of(AuthenticationProviderToken.class.getName()));
-        // internal client
-        
conf.setBrokerClientAuthenticationPlugin(AuthenticationToken.class.getName());
-        final Map<String, String> brokerClientAuthParams = new HashMap<>();
-        brokerClientAuthParams.put("token", BROKER_INTERNAL_CLIENT_TOKEN);
-        final String brokerClientAuthParamStr = 
ObjectMapperFactory.getThreadLocal()
-                .writeValueAsString(brokerClientAuthParams);
-        conf.setBrokerClientAuthenticationParameters(brokerClientAuthParamStr);
-
-        Properties properties = conf.getProperties();
-        if (properties == null) {
-            properties = new Properties();
-            conf.setProperties(properties);
-        }
-        properties.put("tokenSecretKey", 
AuthTokenUtils.encodeKeyBase64(SECRET_KEY));
-
-        internalSetup();
-        setupDefaultTenantAndNamespace();
-
-        this.superUserAdmin =PulsarAdmin.builder()
-                .serviceHttpUrl(pulsar.getWebServiceAddress())
-                .authentication(new AuthenticationToken(SUPER_USER_TOKEN))
-                .build();
-        final TenantInfo tenantInfo = 
superUserAdmin.tenants().getTenantInfo("public");
-        tenantInfo.getAdminRoles().add(TENANT_ADMIN_SUBJECT);
-        superUserAdmin.tenants().updateTenant("public", tenantInfo);
-        this.tenantManagerAdmin = PulsarAdmin.builder()
-                .serviceHttpUrl(pulsar.getWebServiceAddress())
-                .authentication(new AuthenticationToken(TENANT_ADMIN_TOKEN))
-                .build();
-    }
-
-    @Override
-    protected void customizeNewPulsarAdminBuilder(PulsarAdminBuilder 
pulsarAdminBuilder) {
-        pulsarAdminBuilder.authentication(new 
AuthenticationToken(SUPER_USER_TOKEN));
-    }
-
-    @AfterClass
-    @Override
-    protected void cleanup() throws Exception {
-     internalCleanup();
-    }
+public final class TopicPoliciesAuthZTest extends BaseAuthZTest {
 
 
     @SneakyThrows

Reply via email to