This is an automated email from the ASF dual-hosted git repository.

technoboy pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 3a0f908e80d [improve][test] Add topic policy test for topic API 
(#22546)
3a0f908e80d is described below

commit 3a0f908e80d0863920a1258362fd782e95fe8f17
Author: Jiwei Guo <techno...@apache.org>
AuthorDate: Mon Apr 22 19:47:03 2024 +0800

    [improve][test] Add topic policy test for topic API (#22546)
---
 .../org/apache/pulsar/broker/admin/AuthZTest.java  |  113 ++
 .../apache/pulsar/broker/admin/TopicAuthZTest.java | 1121 ++++++++++++++------
 .../admin/TransactionAndSchemaAuthZTest.java       |  359 +++++++
 3 files changed, 1270 insertions(+), 323 deletions(-)

diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AuthZTest.java 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AuthZTest.java
new file mode 100644
index 00000000000..a710a03970d
--- /dev/null
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AuthZTest.java
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.admin;
+
+import io.jsonwebtoken.Jwts;
+import org.apache.commons.lang3.reflect.FieldUtils;
+import org.apache.pulsar.broker.authorization.AuthorizationService;
+import org.apache.pulsar.client.admin.PulsarAdmin;
+import org.apache.pulsar.common.policies.data.NamespaceOperation;
+import org.apache.pulsar.common.policies.data.TopicOperation;
+import org.apache.pulsar.security.MockedPulsarStandalone;
+import org.mockito.Mockito;
+import org.testng.Assert;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
+import java.util.UUID;
+import java.util.concurrent.atomic.AtomicBoolean;
+import static org.mockito.Mockito.doReturn;
+
+public class AuthZTest extends MockedPulsarStandalone {
+
+    protected PulsarAdmin superUserAdmin;
+
+    protected PulsarAdmin tenantManagerAdmin;
+
+    protected AuthorizationService authorizationService;
+
+    protected AuthorizationService orignalAuthorizationService;
+
+    protected static final String TENANT_ADMIN_SUBJECT =  
UUID.randomUUID().toString();
+    protected static final String TENANT_ADMIN_TOKEN = Jwts.builder()
+            .claim("sub", TENANT_ADMIN_SUBJECT).signWith(SECRET_KEY).compact();
+
+    @BeforeMethod(alwaysRun = true)
+    public void before() throws IllegalAccessException {
+        orignalAuthorizationService = 
getPulsarService().getBrokerService().getAuthorizationService();
+        authorizationService = Mockito.spy(orignalAuthorizationService);
+        FieldUtils.writeField(getPulsarService().getBrokerService(), 
"authorizationService",
+                authorizationService, true);
+    }
+
+    @AfterMethod(alwaysRun = true)
+    public void after() throws IllegalAccessException {
+        FieldUtils.writeField(getPulsarService().getBrokerService(), 
"authorizationService",
+                orignalAuthorizationService, true);
+    }
+
+    protected AtomicBoolean setAuthorizationTopicOperationChecker(String role, 
Object operation) {
+        AtomicBoolean execFlag = new AtomicBoolean(false);
+        if (operation instanceof TopicOperation) {
+            Mockito.doAnswer(invocationOnMock -> {
+                String role_ = invocationOnMock.getArgument(2);
+                if (role.equals(role_)) {
+                    TopicOperation operation_ = 
invocationOnMock.getArgument(1);
+                    Assert.assertEquals(operation_, operation);
+                }
+                execFlag.set(true);
+                return invocationOnMock.callRealMethod();
+            
}).when(authorizationService).allowTopicOperationAsync(Mockito.any(), 
Mockito.any(), Mockito.any(),
+                    Mockito.any(), Mockito.any());
+        } else if (operation instanceof NamespaceOperation) {
+            doReturn(true)
+                    
.when(authorizationService).isValidOriginalPrincipal(Mockito.any(), 
Mockito.any(), Mockito.any());
+            Mockito.doAnswer(invocationOnMock -> {
+                String role_ = invocationOnMock.getArgument(2);
+                if (role.equals(role_)) {
+                    TopicOperation operation_ = 
invocationOnMock.getArgument(1);
+                    Assert.assertEquals(operation_, operation);
+                }
+                execFlag.set(true);
+                return invocationOnMock.callRealMethod();
+            
}).when(authorizationService).allowNamespaceOperationAsync(Mockito.any(), 
Mockito.any(), Mockito.any(),
+                    Mockito.any(), Mockito.any());
+        } else {
+            throw new IllegalArgumentException("");
+        }
+
+
+        return execFlag;
+    }
+
+    protected void createTopic(String topic, boolean partitioned) throws 
Exception {
+        if (partitioned) {
+            superUserAdmin.topics().createPartitionedTopic(topic, 2);
+        } else {
+            superUserAdmin.topics().createNonPartitionedTopic(topic);
+        }
+    }
+
+    protected void deleteTopic(String topic, boolean partitioned) throws 
Exception {
+        if (partitioned) {
+            superUserAdmin.topics().deletePartitionedTopic(topic, true);
+        } else {
+            superUserAdmin.topics().delete(topic, true);
+        }
+    }
+}
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicAuthZTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicAuthZTest.java
index 3c0596d531f..ad47ac74a89 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicAuthZTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicAuthZTest.java
@@ -20,6 +20,7 @@
 package org.apache.pulsar.broker.admin;
 
 import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
 import io.jsonwebtoken.Jwts;
 import lombok.Cleanup;
 import lombok.SneakyThrows;
@@ -38,59 +39,48 @@ import org.apache.pulsar.client.impl.MessageIdImpl;
 import org.apache.pulsar.client.impl.auth.AuthenticationToken;
 import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.common.policies.data.AuthAction;
+import org.apache.pulsar.common.policies.data.AutoSubscriptionCreationOverride;
+import org.apache.pulsar.common.policies.data.BacklogQuota;
+import org.apache.pulsar.common.policies.data.DelayedDeliveryPolicies;
+import org.apache.pulsar.common.policies.data.DispatchRate;
 import org.apache.pulsar.common.policies.data.EntryFilters;
+import org.apache.pulsar.common.policies.data.InactiveTopicPolicies;
+import org.apache.pulsar.common.policies.data.OffloadPolicies;
+import org.apache.pulsar.common.policies.data.PersistencePolicies;
+import org.apache.pulsar.common.policies.data.PolicyName;
+import org.apache.pulsar.common.policies.data.PolicyOperation;
+import org.apache.pulsar.common.policies.data.PublishRate;
+import org.apache.pulsar.common.policies.data.RetentionPolicies;
+import org.apache.pulsar.common.policies.data.SubscribeRate;
 import org.apache.pulsar.common.policies.data.TenantInfo;
-import org.apache.pulsar.security.MockedPulsarStandalone;
 import org.testng.Assert;
 import org.testng.annotations.AfterClass;
 import org.testng.annotations.BeforeClass;
 import org.testng.annotations.DataProvider;
 import org.testng.annotations.Test;
+import java.util.ArrayList;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.Map;
 import java.util.Optional;
 import java.util.Set;
 import java.util.UUID;
-import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
-import org.apache.pulsar.broker.authorization.AuthorizationService;
-import org.apache.pulsar.client.api.Consumer;
-import org.apache.pulsar.client.api.transaction.Transaction;
-import org.apache.pulsar.common.naming.SystemTopicNames;
-import org.apache.pulsar.common.partition.PartitionedTopicMetadata;
 import org.apache.pulsar.common.policies.data.NamespaceOperation;
 import org.apache.pulsar.common.policies.data.TopicOperation;
-import org.apache.pulsar.common.schema.SchemaInfo;
-import org.apache.pulsar.common.schema.SchemaType;
-import org.apache.pulsar.metadata.api.MetadataStoreException;
 import org.mockito.Mockito;
-import org.testng.annotations.AfterMethod;
-import org.testng.annotations.BeforeMethod;
+import static org.mockito.Mockito.doReturn;
 
 @Test(groups = "broker-admin")
-public class TopicAuthZTest extends MockedPulsarStandalone {
-
-    private PulsarAdmin superUserAdmin;
-
-    private PulsarAdmin tenantManagerAdmin;
-
-    private AuthorizationService authorizationService;
-
-    private AuthorizationService orignalAuthorizationService;
-
-    private static final String TENANT_ADMIN_SUBJECT =  
UUID.randomUUID().toString();
-    private static final String TENANT_ADMIN_TOKEN = Jwts.builder()
-            .claim("sub", TENANT_ADMIN_SUBJECT).signWith(SECRET_KEY).compact();
+public class TopicAuthZTest extends AuthZTest {
 
     @SneakyThrows
     @BeforeClass(alwaysRun = true)
     public void setup() {
         configureTokenAuthentication();
         configureDefaultAuthorization();
-        enableTransaction();
         start();
-        createTransactionCoordinatorAssign(16);
         this.superUserAdmin =PulsarAdmin.builder()
                 .serviceHttpUrl(getPulsarService().getWebServiceAddress())
                 .authentication(new AuthenticationToken(SUPER_USER_TOKEN))
@@ -103,16 +93,6 @@ public class TopicAuthZTest extends MockedPulsarStandalone {
                 .authentication(new AuthenticationToken(TENANT_ADMIN_TOKEN))
                 .build();
 
-        superUserAdmin.tenants().createTenant("pulsar", tenantInfo);
-        superUserAdmin.namespaces().createNamespace("pulsar/system");
-    }
-
-    protected void createTransactionCoordinatorAssign(int numPartitionsOfTC) 
throws MetadataStoreException {
-        getPulsarService().getPulsarResources()
-                .getNamespaceResources()
-                .getPartitionedTopicResources()
-                
.createPartitionedTopic(SystemTopicNames.TRANSACTION_COORDINATOR_ASSIGN,
-                        new PartitionedTopicMetadata(numPartitionsOfTC));
     }
 
     @SneakyThrows
@@ -127,48 +107,28 @@ public class TopicAuthZTest extends 
MockedPulsarStandalone {
         close();
     }
 
-    @BeforeMethod
-    public void before() throws IllegalAccessException {
-        orignalAuthorizationService = 
getPulsarService().getBrokerService().getAuthorizationService();
-        authorizationService = Mockito.spy(orignalAuthorizationService);
-        FieldUtils.writeField(getPulsarService().getBrokerService(), 
"authorizationService",
-                authorizationService, true);
-    }
+    private AtomicBoolean setAuthorizationPolicyOperationChecker(String role, 
Object policyName, Object operation) {
+        AtomicBoolean execFlag = new AtomicBoolean(false);
+        if (operation instanceof PolicyOperation ) {
 
-    @AfterMethod
-    public void after() throws IllegalAccessException {
-        FieldUtils.writeField(getPulsarService().getBrokerService(), 
"authorizationService",
-                orignalAuthorizationService, true);
-    }
+            doReturn(true)
+            
.when(authorizationService).isValidOriginalPrincipal(Mockito.any(), 
Mockito.any(), Mockito.any());
 
-    private AtomicBoolean setAuthorizationTopicOperationChecker(String role, 
Object operation) {
-        AtomicBoolean execFlag = new AtomicBoolean(false);
-        if (operation instanceof TopicOperation) {
-            Mockito.doAnswer(invocationOnMock -> {
-            String role_ = invocationOnMock.getArgument(2);
-            if (role.equals(role_)) {
-                TopicOperation operation_ = invocationOnMock.getArgument(1);
-                Assert.assertEquals(operation_, operation);
-            }
-            execFlag.set(true);
-            return invocationOnMock.callRealMethod();
-        }).when(authorizationService).allowTopicOperationAsync(Mockito.any(), 
Mockito.any(), Mockito.any(),
-                Mockito.any(), Mockito.any());
-        } else if (operation instanceof NamespaceOperation) {
             Mockito.doAnswer(invocationOnMock -> {
-            String role_ = invocationOnMock.getArgument(2);
-            if (role.equals(role_)) {
-                TopicOperation operation_ = invocationOnMock.getArgument(1);
-                Assert.assertEquals(operation_, operation);
-            }
-            execFlag.set(true);
-            return invocationOnMock.callRealMethod();
-        
}).when(authorizationService).allowNamespaceOperationAsync(Mockito.any(), 
Mockito.any(), Mockito.any(),
-                Mockito.any(), Mockito.any());
+                String role_ = invocationOnMock.getArgument(4);
+                if (role.equals(role_)) {
+                    PolicyName policyName_ = invocationOnMock.getArgument(1);
+                    PolicyOperation operation_ = 
invocationOnMock.getArgument(2);
+                    Assert.assertEquals(operation_, operation);
+                    Assert.assertEquals(policyName_, policyName);
+                }
+                execFlag.set(true);
+                return invocationOnMock.callRealMethod();
+            
}).when(authorizationService).allowTopicPolicyOperationAsync(Mockito.any(), 
Mockito.any(), Mockito.any(),
+                    Mockito.any(), Mockito.any(), Mockito.any());
         } else {
             throw new IllegalArgumentException("");
         }
-
         return execFlag;
     }
 
@@ -1213,171 +1173,8 @@ public class TopicAuthZTest extends 
MockedPulsarStandalone {
         deleteTopic(topic, false);
     }
 
-    public enum OperationAuthType {
-        Lookup,
-        Produce,
-        Consume,
-        AdminOrSuperUser,
-        NOAuth
-    }
 
-    private final String testTopic = "persistent://public/default/" + 
UUID.randomUUID().toString();
-    @FunctionalInterface
-    public interface ThrowingBiConsumer<T> {
-        void accept(T t) throws PulsarAdminException;
-    }
 
-    @DataProvider(name = "authFunction")
-    public Object[][] authFunction () throws Exception {
-        String sub = "my-sub";
-        createTopic(testTopic, false);
-        @Cleanup final PulsarClient pulsarClient = PulsarClient.builder()
-                .serviceUrl(getPulsarService().getBrokerServiceUrl())
-                .authentication(new AuthenticationToken(SUPER_USER_TOKEN))
-                .enableTransaction(true)
-                .build();
-        @Cleanup final Producer<String> producer = 
pulsarClient.newProducer(Schema.STRING).topic(testTopic).create();
-
-        @Cleanup final Consumer<String> consumer = 
pulsarClient.newConsumer(Schema.STRING)
-                .topic(testTopic)
-                .subscriptionName(sub)
-                .subscribe();
-
-        Transaction transaction = 
pulsarClient.newTransaction().withTransactionTimeout(5, TimeUnit.MINUTES)
-                .build().get();
-        MessageIdImpl messageId = (MessageIdImpl) 
producer.newMessage().value("test message").send();
-
-        consumer.acknowledgeAsync(messageId, transaction).get();
-
-        return new Object[][]{
-                // SCHEMA
-               new Object[] {
-                       (ThrowingBiConsumer<PulsarAdmin>) (admin) -> 
admin.schemas().getSchemaInfo(testTopic),
-                       OperationAuthType.Lookup
-               },
-                new Object[] {
-                        (ThrowingBiConsumer<PulsarAdmin>) (admin) -> 
admin.schemas().getSchemaInfo(
-                                testTopic, 0),
-                        OperationAuthType.Lookup
-                },
-                new Object[] {
-                        (ThrowingBiConsumer<PulsarAdmin>) (admin) -> 
admin.schemas().getAllSchemas(
-                                testTopic),
-                        OperationAuthType.Lookup
-                },
-                new Object[] {
-                        (ThrowingBiConsumer<PulsarAdmin>) (admin) -> 
admin.schemas().createSchema(testTopic,
-                                
SchemaInfo.builder().type(SchemaType.STRING).build()),
-                        OperationAuthType.Produce
-                },
-                // TODO: improve the authorization check for testCompatibility 
and deleteSchema
-                new Object[] {
-                        (ThrowingBiConsumer<PulsarAdmin>) (admin) -> 
admin.schemas().testCompatibility(
-                                testTopic, 
SchemaInfo.builder().type(SchemaType.STRING).build()),
-                        OperationAuthType.AdminOrSuperUser
-                },
-                new Object[] {
-                        (ThrowingBiConsumer<PulsarAdmin>) (admin) -> 
admin.schemas().deleteSchema(
-                                testTopic),
-                        OperationAuthType.AdminOrSuperUser
-                },
-
-                // TRANSACTION
-
-                // Modify transaction coordinator
-                new Object[] {
-                        (ThrowingBiConsumer<PulsarAdmin>) (admin) -> 
admin.transactions()
-                                .abortTransaction(transaction.getTxnID()),
-                        OperationAuthType.AdminOrSuperUser
-                },
-                new Object[] {
-                        (ThrowingBiConsumer<PulsarAdmin>) (admin) -> 
admin.transactions()
-                                .scaleTransactionCoordinators(17),
-                        OperationAuthType.AdminOrSuperUser
-                },
-                // TODO: fix authorization check of check transaction 
coordinator stats.
-                // Check transaction coordinator stats
-                new Object[] {
-                        (ThrowingBiConsumer<PulsarAdmin>) (admin) -> 
admin.transactions()
-                                .getCoordinatorInternalStats(1, false),
-                        OperationAuthType.NOAuth
-                },
-                new Object[] {
-                        (ThrowingBiConsumer<PulsarAdmin>) (admin) -> 
admin.transactions()
-                                .getCoordinatorStats(),
-                        OperationAuthType.AdminOrSuperUser
-                },
-                new Object[] {
-                        (ThrowingBiConsumer<PulsarAdmin>) (admin) -> 
admin.transactions()
-                                .getSlowTransactionsByCoordinatorId(1, 5, 
TimeUnit.SECONDS),
-                        OperationAuthType.NOAuth
-                },
-                new Object[] {
-                        (ThrowingBiConsumer<PulsarAdmin>) (admin) -> 
admin.transactions()
-                                
.getTransactionMetadata(transaction.getTxnID()),
-                        OperationAuthType.NOAuth
-                },
-                new Object[] {
-                        (ThrowingBiConsumer<PulsarAdmin>) (admin) -> 
admin.transactions()
-                                .listTransactionCoordinators(),
-                        OperationAuthType.NOAuth
-                },
-                new Object[] {
-                        (ThrowingBiConsumer<PulsarAdmin>) (admin) -> 
admin.transactions()
-                                .getSlowTransactions(5, TimeUnit.SECONDS),
-                        OperationAuthType.AdminOrSuperUser
-                },
-
-                // TODO: Check the authorization of the topic when get stats 
of TB or TP
-                // Check stats related to transaction buffer and transaction 
pending ack
-                new Object[] {
-                        (ThrowingBiConsumer<PulsarAdmin>) (admin) -> 
admin.transactions()
-                                .getPendingAckInternalStats(testTopic, sub, 
false),
-                        OperationAuthType.NOAuth
-                },
-                new Object[] {
-                        (ThrowingBiConsumer<PulsarAdmin>) (admin) -> 
admin.transactions()
-                                .getPendingAckStats(testTopic, sub, false),
-                        OperationAuthType.NOAuth
-                },
-                new Object[] {
-                        (ThrowingBiConsumer<PulsarAdmin>) (admin) -> 
admin.transactions()
-                                .getPositionStatsInPendingAck(testTopic, sub, 
messageId.getLedgerId(),
-                                        messageId.getEntryId(), null),
-                        OperationAuthType.NOAuth
-                },
-                new Object[] {
-                        (ThrowingBiConsumer<PulsarAdmin>) (admin) -> 
admin.transactions()
-                                .getTransactionBufferInternalStats(testTopic, 
false),
-                        OperationAuthType.NOAuth
-                },
-                new Object[] {
-                        (ThrowingBiConsumer<PulsarAdmin>) (admin) -> 
admin.transactions()
-                                .getTransactionBufferStats(testTopic, false),
-                        OperationAuthType.NOAuth
-                },
-                new Object[] {
-                        (ThrowingBiConsumer<PulsarAdmin>) (admin) -> 
admin.transactions()
-                                .getTransactionBufferStats(testTopic, false),
-                        OperationAuthType.NOAuth
-                },
-                new Object[] {
-                        (ThrowingBiConsumer<PulsarAdmin>) (admin) -> 
admin.transactions()
-                                
.getTransactionInBufferStats(transaction.getTxnID(), testTopic),
-                        OperationAuthType.NOAuth
-                },
-                new Object[] {
-                        (ThrowingBiConsumer<PulsarAdmin>) (admin) -> 
admin.transactions()
-                                
.getTransactionInBufferStats(transaction.getTxnID(), testTopic),
-                        OperationAuthType.NOAuth
-                },
-                new Object[] {
-                        (ThrowingBiConsumer<PulsarAdmin>) (admin) -> 
admin.transactions()
-                                
.getTransactionInPendingAckStats(transaction.getTxnID(), testTopic, sub),
-                        OperationAuthType.NOAuth
-                },
-        };
-    }
 
     @Test
     @SneakyThrows
@@ -1410,82 +1207,7 @@ public class TopicAuthZTest extends 
MockedPulsarStandalone {
         deleteTopic(topic, false);
     }
 
-    @Test(dataProvider = "authFunction")
-    public void 
testSchemaAndTransactionAuthorization(ThrowingBiConsumer<PulsarAdmin> 
adminConsumer, OperationAuthType topicOpType)
-            throws Exception {
-        final String subject =  UUID.randomUUID().toString();
-        final String token = Jwts.builder()
-                .claim("sub", subject).signWith(SECRET_KEY).compact();
-
-
-        @Cleanup
-        final PulsarAdmin subAdmin = PulsarAdmin.builder()
-                .serviceHttpUrl(getPulsarService().getWebServiceAddress())
-                .authentication(new AuthenticationToken(token))
-                .build();
-        // test tenant manager
-        if (topicOpType != OperationAuthType.AdminOrSuperUser) {
-            adminConsumer.accept(tenantManagerAdmin);
-        }
-
-        if (topicOpType != OperationAuthType.NOAuth) {
-            
Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class,
-                    () -> adminConsumer.accept(subAdmin));
-        }
-
-        AtomicBoolean execFlag = null;
-        if (topicOpType == OperationAuthType.Lookup) {
-            execFlag = setAuthorizationTopicOperationChecker(subject, 
TopicOperation.LOOKUP);
-        } else if (topicOpType == OperationAuthType.Produce) {
-            execFlag = setAuthorizationTopicOperationChecker(subject, 
TopicOperation.PRODUCE);
-        } else if (topicOpType == OperationAuthType.Consume) {
-            execFlag = setAuthorizationTopicOperationChecker(subject, 
TopicOperation.CONSUME);
-        }
-
-        for (AuthAction action : AuthAction.values()) {
-            superUserAdmin.topics().grantPermission(testTopic, subject, 
Set.of(action));
-
-            if (authActionMatchOperation(topicOpType, action)) {
-                adminConsumer.accept(subAdmin);
-            } else {
-                
Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class,
-                        () -> adminConsumer.accept(subAdmin));
-            }
-            superUserAdmin.topics().revokePermissions(testTopic, subject);
-        }
-
-        if (execFlag != null) {
-            Assert.assertTrue(execFlag.get());
-        }
-
-    }
 
-    private boolean authActionMatchOperation(OperationAuthType 
operationAuthType, AuthAction action) {
-        switch (operationAuthType) {
-            case Lookup -> {
-                if (AuthAction.consume == action || AuthAction.produce == 
action) {
-                    return true;
-                }
-            }
-            case Consume -> {
-                if (AuthAction.consume == action) {
-                    return true;
-                }
-            }
-            case Produce -> {
-                if (AuthAction.produce == action) {
-                    return true;
-                }
-            }
-            case AdminOrSuperUser -> {
-                return false;
-            }
-            case NOAuth -> {
-                return true;
-            }
-        }
-        return false;
-    }
 
     @Test
     @SneakyThrows
@@ -1507,8 +1229,10 @@ public class TopicAuthZTest extends 
MockedPulsarStandalone {
         // test tenant manager
         tenantManagerAdmin.topicPolicies().getEntryFiltersPerTopic(topic, 
true);
 
+        AtomicBoolean execFlag = 
setAuthorizationPolicyOperationChecker(subject, PolicyName.ENTRY_FILTERS, 
PolicyOperation.READ);
         Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class,
                 () -> subAdmin.topicPolicies().getEntryFiltersPerTopic(topic, 
false));
+        Assert.assertTrue(execFlag.get());
 
         for (AuthAction action : AuthAction.values()) {
             superUserAdmin.topics().grantPermission(topic, subject, 
Set.of(action));
@@ -1553,8 +1277,10 @@ public class TopicAuthZTest extends 
MockedPulsarStandalone {
         // test tenant manager
         tenantManagerAdmin.topicPolicies().setEntryFiltersPerTopic(topic, 
entryFilter);
 
+        AtomicBoolean execFlag = 
setAuthorizationPolicyOperationChecker(subject, PolicyName.ENTRY_FILTERS, 
PolicyOperation.WRITE);
         Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class,
                 () -> subAdmin.topicPolicies().setEntryFiltersPerTopic(topic, 
entryFilter));
+        Assert.assertTrue(execFlag.get());
 
         for (AuthAction action : AuthAction.values()) {
             superUserAdmin.topics().grantPermission(topic, subject, 
Set.of(action));
@@ -1656,19 +1382,768 @@ public class TopicAuthZTest extends 
MockedPulsarStandalone {
         deleteTopic(topic, false);
     }
 
-    private void createTopic(String topic, boolean partitioned) throws 
Exception {
-        if (partitioned) {
-            superUserAdmin.topics().createPartitionedTopic(topic, 2);
-        } else {
-            superUserAdmin.topics().createNonPartitionedTopic(topic);
-        }
+    @Test
+    @SneakyThrows
+    public void testList() {
+        final String random = UUID.randomUUID().toString();
+        final String topic = "persistent://public/default/" + random;
+        final String subject =  UUID.randomUUID().toString();
+        final String token = Jwts.builder()
+                .claim("sub", subject).signWith(SECRET_KEY).compact();
+        createTopic(topic, false);
+        @Cleanup
+        final PulsarAdmin subAdmin = PulsarAdmin.builder()
+                .serviceHttpUrl(getPulsarService().getWebServiceAddress())
+                .authentication(new AuthenticationToken(token))
+                .build();
+        AtomicBoolean execFlag = 
setAuthorizationTopicOperationChecker(subject, NamespaceOperation.GET_TOPICS);
+        Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class,
+                () -> subAdmin.topics().getList("public/default"));
+        Assert.assertTrue(execFlag.get());
+
+        execFlag = setAuthorizationTopicOperationChecker(subject, 
NamespaceOperation.GET_TOPICS);
+        Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class,
+                () -> 
subAdmin.topics().getPartitionedTopicList("public/default"));
+        Assert.assertTrue(execFlag.get());
+
+        deleteTopic(topic, false);
     }
 
-    private void deleteTopic(String topic, boolean partitioned) throws 
Exception {
-        if (partitioned) {
-            superUserAdmin.topics().deletePartitionedTopic(topic, true);
-        } else {
-            superUserAdmin.topics().delete(topic, true);
-        }
+    @Test
+    @SneakyThrows
+    public void testPermissionsOnTopic() {
+        final String random = UUID.randomUUID().toString();
+        final String topic = "persistent://public/default/" + random;
+        final String subject =  UUID.randomUUID().toString();
+        final String token = Jwts.builder()
+                .claim("sub", subject).signWith(SECRET_KEY).compact();
+        createTopic(topic, false);
+        @Cleanup
+        final PulsarAdmin subAdmin = PulsarAdmin.builder()
+                .serviceHttpUrl(getPulsarService().getWebServiceAddress())
+                .authentication(new AuthenticationToken(token))
+                .build();
+        //
+        superUserAdmin.topics().getPermissions(topic);
+        superUserAdmin.topics().grantPermission(topic, subject, 
Sets.newHashSet(AuthAction.functions));
+        superUserAdmin.topics().revokePermissions(topic, subject);
+
+        // test tenant manager
+        tenantManagerAdmin.topics().getPermissions(topic);
+        tenantManagerAdmin.topics().grantPermission(topic, subject, 
Sets.newHashSet(AuthAction.functions));
+        tenantManagerAdmin.topics().revokePermissions(topic, subject);
+
+        Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class,
+                () -> subAdmin.topics().getPermissions(topic));
+        Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class,
+                () -> subAdmin.topics().grantPermission(topic, subject, 
Sets.newHashSet(AuthAction.functions)));
+        Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class,
+                () -> subAdmin.topics().revokePermissions(topic, subject));
+
+        deleteTopic(topic, false);
+    }
+
+    @Test
+    @SneakyThrows
+    public void testOffloadPolicies() {
+        final String random = UUID.randomUUID().toString();
+        final String topic = "persistent://public/default/" + random;
+        final String subject =  UUID.randomUUID().toString();
+        final String token = Jwts.builder()
+                .claim("sub", subject).signWith(SECRET_KEY).compact();
+        createTopic(topic, false);
+        @Cleanup
+        final PulsarAdmin subAdmin = PulsarAdmin.builder()
+                .serviceHttpUrl(getPulsarService().getWebServiceAddress())
+                .authentication(new AuthenticationToken(token))
+                .build();
+        AtomicBoolean execFlag = 
setAuthorizationPolicyOperationChecker(subject, PolicyName.OFFLOAD, 
PolicyOperation.READ);
+        Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class,
+                () -> subAdmin.topicPolicies().getOffloadPolicies(topic));
+        Assert.assertTrue(execFlag.get());
+
+        execFlag = setAuthorizationPolicyOperationChecker(subject, 
PolicyName.OFFLOAD, PolicyOperation.WRITE);
+        Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class,
+                () -> subAdmin.topicPolicies().setOffloadPolicies(topic, 
OffloadPolicies.builder().build()));
+        Assert.assertTrue(execFlag.get());
+
+        execFlag = setAuthorizationPolicyOperationChecker(subject, 
PolicyName.OFFLOAD, PolicyOperation.WRITE);
+        Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class,
+                () -> subAdmin.topicPolicies().removeOffloadPolicies(topic));
+        Assert.assertTrue(execFlag.get());
+
+        deleteTopic(topic, false);
+    }
+
+    @Test
+    @SneakyThrows
+    public void testMaxUnackedMessagesOnConsumer() {
+        final String random = UUID.randomUUID().toString();
+        final String topic = "persistent://public/default/" + random;
+        final String subject =  UUID.randomUUID().toString();
+        final String token = Jwts.builder()
+                .claim("sub", subject).signWith(SECRET_KEY).compact();
+        createTopic(topic, false);
+        @Cleanup
+        final PulsarAdmin subAdmin = PulsarAdmin.builder()
+                .serviceHttpUrl(getPulsarService().getWebServiceAddress())
+                .authentication(new AuthenticationToken(token))
+                .build();
+        AtomicBoolean execFlag = 
setAuthorizationPolicyOperationChecker(subject, PolicyName.MAX_UNACKED, 
PolicyOperation.READ);
+        Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class,
+                () -> 
subAdmin.topicPolicies().getMaxUnackedMessagesOnConsumer(topic));
+        Assert.assertTrue(execFlag.get());
+
+        execFlag = setAuthorizationPolicyOperationChecker(subject, 
PolicyName.MAX_UNACKED, PolicyOperation.WRITE);
+        Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class,
+                () -> 
subAdmin.topicPolicies().setMaxUnackedMessagesOnConsumer(topic, 2));
+        Assert.assertTrue(execFlag.get());
+
+        execFlag = setAuthorizationPolicyOperationChecker(subject, 
PolicyName.MAX_UNACKED, PolicyOperation.WRITE);
+        Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class,
+                () -> 
subAdmin.topicPolicies().removeMaxUnackedMessagesOnConsumer(topic));
+        Assert.assertTrue(execFlag.get());
+
+        deleteTopic(topic, false);
+    }
+
+    @Test
+    @SneakyThrows
+    public void testDeduplicationSnapshotInterval() {
+        final String random = UUID.randomUUID().toString();
+        final String topic = "persistent://public/default/" + random;
+        final String subject =  UUID.randomUUID().toString();
+        final String token = Jwts.builder()
+                .claim("sub", subject).signWith(SECRET_KEY).compact();
+        createTopic(topic, false);
+        @Cleanup
+        final PulsarAdmin subAdmin = PulsarAdmin.builder()
+                .serviceHttpUrl(getPulsarService().getWebServiceAddress())
+                .authentication(new AuthenticationToken(token))
+                .build();
+        AtomicBoolean execFlag = 
setAuthorizationPolicyOperationChecker(subject, 
PolicyName.DEDUPLICATION_SNAPSHOT, PolicyOperation.READ);
+        Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class,
+                () -> 
subAdmin.topicPolicies().getDeduplicationSnapshotInterval(topic));
+        Assert.assertTrue(execFlag.get());
+
+        execFlag = setAuthorizationPolicyOperationChecker(subject, 
PolicyName.DEDUPLICATION_SNAPSHOT, PolicyOperation.WRITE);
+        Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class,
+                () -> 
subAdmin.topicPolicies().setDeduplicationSnapshotInterval(topic, 2));
+        Assert.assertTrue(execFlag.get());
+
+        execFlag = setAuthorizationPolicyOperationChecker(subject, 
PolicyName.DEDUPLICATION_SNAPSHOT, PolicyOperation.WRITE);
+        Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class,
+                () -> 
subAdmin.topicPolicies().removeDeduplicationSnapshotInterval(topic));
+        Assert.assertTrue(execFlag.get());
+
+        deleteTopic(topic, false);
+    }
+
+    @Test
+    @SneakyThrows
+    public void testInactiveTopicPolicies() {
+        final String random = UUID.randomUUID().toString();
+        final String topic = "persistent://public/default/" + random;
+        final String subject =  UUID.randomUUID().toString();
+        final String token = Jwts.builder()
+                .claim("sub", subject).signWith(SECRET_KEY).compact();
+        createTopic(topic, false);
+        @Cleanup
+        final PulsarAdmin subAdmin = PulsarAdmin.builder()
+                .serviceHttpUrl(getPulsarService().getWebServiceAddress())
+                .authentication(new AuthenticationToken(token))
+                .build();
+        AtomicBoolean execFlag = 
setAuthorizationPolicyOperationChecker(subject, PolicyName.INACTIVE_TOPIC, 
PolicyOperation.READ);
+        Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class,
+                () -> 
subAdmin.topicPolicies().getInactiveTopicPolicies(topic));
+        Assert.assertTrue(execFlag.get());
+
+        execFlag = setAuthorizationPolicyOperationChecker(subject, 
PolicyName.INACTIVE_TOPIC, PolicyOperation.WRITE);
+        Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class,
+                () -> subAdmin.topicPolicies().setInactiveTopicPolicies(topic, 
new InactiveTopicPolicies()));
+        Assert.assertTrue(execFlag.get());
+
+        execFlag = setAuthorizationPolicyOperationChecker(subject, 
PolicyName.INACTIVE_TOPIC, PolicyOperation.WRITE);
+        Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class,
+                () -> 
subAdmin.topicPolicies().removeInactiveTopicPolicies(topic));
+        Assert.assertTrue(execFlag.get());
+
+        deleteTopic(topic, false);
+    }
+
+    @Test
+    @SneakyThrows
+    public void testMaxUnackedMessagesOnSubscription() {
+        final String random = UUID.randomUUID().toString();
+        final String topic = "persistent://public/default/" + random;
+        final String subject =  UUID.randomUUID().toString();
+        final String token = Jwts.builder()
+                .claim("sub", subject).signWith(SECRET_KEY).compact();
+        createTopic(topic, false);
+        @Cleanup
+        final PulsarAdmin subAdmin = PulsarAdmin.builder()
+                .serviceHttpUrl(getPulsarService().getWebServiceAddress())
+                .authentication(new AuthenticationToken(token))
+                .build();
+        AtomicBoolean execFlag = 
setAuthorizationPolicyOperationChecker(subject, PolicyName.MAX_UNACKED, 
PolicyOperation.READ);
+        Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class,
+                () -> 
subAdmin.topicPolicies().getMaxUnackedMessagesOnSubscription(topic));
+        Assert.assertTrue(execFlag.get());
+
+        execFlag = setAuthorizationPolicyOperationChecker(subject, 
PolicyName.MAX_UNACKED, PolicyOperation.WRITE);
+        Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class,
+                () -> 
subAdmin.topicPolicies().setMaxUnackedMessagesOnSubscription(topic, 2));
+        Assert.assertTrue(execFlag.get());
+
+        execFlag = setAuthorizationPolicyOperationChecker(subject, 
PolicyName.MAX_UNACKED, PolicyOperation.WRITE);
+        Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class,
+                () -> 
subAdmin.topicPolicies().removeMaxUnackedMessagesOnSubscription(topic));
+        Assert.assertTrue(execFlag.get());
+
+        deleteTopic(topic, false);
+    }
+
+    @Test
+    @SneakyThrows
+    public void testDelayedDeliveryPolicies() {
+        final String random = UUID.randomUUID().toString();
+        final String topic = "persistent://public/default/" + random;
+        final String subject =  UUID.randomUUID().toString();
+        final String token = Jwts.builder()
+                .claim("sub", subject).signWith(SECRET_KEY).compact();
+        createTopic(topic, false);
+        @Cleanup
+        final PulsarAdmin subAdmin = PulsarAdmin.builder()
+                .serviceHttpUrl(getPulsarService().getWebServiceAddress())
+                .authentication(new AuthenticationToken(token))
+                .build();
+        AtomicBoolean execFlag = 
setAuthorizationPolicyOperationChecker(subject, PolicyName.DELAYED_DELIVERY, 
PolicyOperation.READ);
+        Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class,
+                () -> 
subAdmin.topicPolicies().getDelayedDeliveryPolicy(topic));
+        Assert.assertTrue(execFlag.get());
+
+        execFlag = setAuthorizationPolicyOperationChecker(subject, 
PolicyName.DELAYED_DELIVERY, PolicyOperation.WRITE);
+        Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class,
+                () -> subAdmin.topicPolicies().setDelayedDeliveryPolicy(topic, 
DelayedDeliveryPolicies.builder().build()));
+        Assert.assertTrue(execFlag.get());
+
+        execFlag = setAuthorizationPolicyOperationChecker(subject, 
PolicyName.DELAYED_DELIVERY, PolicyOperation.WRITE);
+        Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class,
+                () -> 
subAdmin.topicPolicies().removeDelayedDeliveryPolicy(topic));
+        Assert.assertTrue(execFlag.get());
+
+        deleteTopic(topic, false);
+    }
+
+    @Test
+    @SneakyThrows
+    public void testAutoSubscriptionCreation() {
+        final String random = UUID.randomUUID().toString();
+        final String topic = "persistent://public/default/" + random;
+        final String subject =  UUID.randomUUID().toString();
+        final String token = Jwts.builder()
+                .claim("sub", subject).signWith(SECRET_KEY).compact();
+        createTopic(topic, false);
+        @Cleanup
+        final PulsarAdmin subAdmin = PulsarAdmin.builder()
+                .serviceHttpUrl(getPulsarService().getWebServiceAddress())
+                .authentication(new AuthenticationToken(token))
+                .build();
+        AtomicBoolean execFlag = 
setAuthorizationPolicyOperationChecker(subject, 
PolicyName.AUTO_SUBSCRIPTION_CREATION, PolicyOperation.READ);
+        Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class,
+                () -> 
subAdmin.topicPolicies().getAutoSubscriptionCreation(topic, false));
+        Assert.assertTrue(execFlag.get());
+
+        execFlag = setAuthorizationPolicyOperationChecker(subject, 
PolicyName.AUTO_SUBSCRIPTION_CREATION, PolicyOperation.WRITE);
+        Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class,
+                () -> 
subAdmin.topicPolicies().setAutoSubscriptionCreation(topic, 
AutoSubscriptionCreationOverride.builder().build()));
+        Assert.assertTrue(execFlag.get());
+
+        execFlag = setAuthorizationPolicyOperationChecker(subject, 
PolicyName.AUTO_SUBSCRIPTION_CREATION, PolicyOperation.WRITE);
+        Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class,
+                () -> 
subAdmin.topicPolicies().removeAutoSubscriptionCreation(topic));
+        Assert.assertTrue(execFlag.get());
+
+        deleteTopic(topic, false);
+    }
+
+    @Test
+    @SneakyThrows
+    public void testSubscribeRate() {
+        final String random = UUID.randomUUID().toString();
+        final String topic = "persistent://public/default/" + random;
+        final String subject =  UUID.randomUUID().toString();
+        final String token = Jwts.builder()
+                .claim("sub", subject).signWith(SECRET_KEY).compact();
+        createTopic(topic, false);
+        @Cleanup
+        final PulsarAdmin subAdmin = PulsarAdmin.builder()
+                .serviceHttpUrl(getPulsarService().getWebServiceAddress())
+                .authentication(new AuthenticationToken(token))
+                .build();
+        AtomicBoolean execFlag = 
setAuthorizationPolicyOperationChecker(subject, PolicyName.RATE, 
PolicyOperation.READ);
+        Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class,
+                () -> subAdmin.topicPolicies().getSubscribeRate(topic));
+        Assert.assertTrue(execFlag.get());
+
+        execFlag = setAuthorizationPolicyOperationChecker(subject, 
PolicyName.RATE, PolicyOperation.WRITE);
+        Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class,
+                () -> subAdmin.topicPolicies().setSubscribeRate(topic, new 
SubscribeRate()));
+        Assert.assertTrue(execFlag.get());
+
+        execFlag = setAuthorizationPolicyOperationChecker(subject, 
PolicyName.RATE, PolicyOperation.WRITE);
+        Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class,
+                () -> subAdmin.topicPolicies().removeSubscribeRate(topic));
+        Assert.assertTrue(execFlag.get());
+
+        deleteTopic(topic, false);
+    }
+
+    @Test
+    @SneakyThrows
+    public void testSubscriptionTypesEnabled() {
+        final String random = UUID.randomUUID().toString();
+        final String topic = "persistent://public/default/" + random;
+        final String subject =  UUID.randomUUID().toString();
+        final String token = Jwts.builder()
+                .claim("sub", subject).signWith(SECRET_KEY).compact();
+        createTopic(topic, false);
+        @Cleanup
+        final PulsarAdmin subAdmin = PulsarAdmin.builder()
+                .serviceHttpUrl(getPulsarService().getWebServiceAddress())
+                .authentication(new AuthenticationToken(token))
+                .build();
+        AtomicBoolean execFlag = 
setAuthorizationPolicyOperationChecker(subject, 
PolicyName.SUBSCRIPTION_AUTH_MODE, PolicyOperation.READ);
+        Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class,
+                () -> 
subAdmin.topicPolicies().getSubscriptionTypesEnabled(topic));
+        Assert.assertTrue(execFlag.get());
+
+        execFlag = setAuthorizationPolicyOperationChecker(subject, 
PolicyName.SUBSCRIPTION_AUTH_MODE, PolicyOperation.WRITE);
+        Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class,
+                () -> 
subAdmin.topicPolicies().setSubscriptionTypesEnabled(topic, new HashSet<>()));
+        Assert.assertTrue(execFlag.get());
+
+        execFlag = setAuthorizationPolicyOperationChecker(subject, 
PolicyName.SUBSCRIPTION_AUTH_MODE, PolicyOperation.WRITE);
+        Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class,
+                () -> 
subAdmin.topicPolicies().removeSubscriptionTypesEnabled(topic));
+        Assert.assertTrue(execFlag.get());
+
+        deleteTopic(topic, false);
+    }
+
+    @Test
+    @SneakyThrows
+    public void testPublishRate() {
+        final String random = UUID.randomUUID().toString();
+        final String topic = "persistent://public/default/" + random;
+        final String subject =  UUID.randomUUID().toString();
+        final String token = Jwts.builder()
+                .claim("sub", subject).signWith(SECRET_KEY).compact();
+        createTopic(topic, false);
+        @Cleanup
+        final PulsarAdmin subAdmin = PulsarAdmin.builder()
+                .serviceHttpUrl(getPulsarService().getWebServiceAddress())
+                .authentication(new AuthenticationToken(token))
+                .build();
+        AtomicBoolean execFlag = 
setAuthorizationPolicyOperationChecker(subject, PolicyName.RATE, 
PolicyOperation.READ);
+        Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class,
+                () -> subAdmin.topicPolicies().getPublishRate(topic));
+        Assert.assertTrue(execFlag.get());
+
+        execFlag = setAuthorizationPolicyOperationChecker(subject, 
PolicyName.RATE, PolicyOperation.WRITE);
+        Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class,
+                () -> subAdmin.topicPolicies().setPublishRate(topic, new 
PublishRate()));
+        Assert.assertTrue(execFlag.get());
+
+        execFlag = setAuthorizationPolicyOperationChecker(subject, 
PolicyName.RATE, PolicyOperation.WRITE);
+        Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class,
+                () -> subAdmin.topicPolicies().removePublishRate(topic));
+        Assert.assertTrue(execFlag.get());
+
+        deleteTopic(topic, false);
+    }
+
+    @Test
+    @SneakyThrows
+    public void testMaxConsumersPerSubscription() {
+        final String random = UUID.randomUUID().toString();
+        final String topic = "persistent://public/default/" + random;
+        final String subject =  UUID.randomUUID().toString();
+        final String token = Jwts.builder()
+                .claim("sub", subject).signWith(SECRET_KEY).compact();
+        createTopic(topic, false);
+        @Cleanup
+        final PulsarAdmin subAdmin = PulsarAdmin.builder()
+                .serviceHttpUrl(getPulsarService().getWebServiceAddress())
+                .authentication(new AuthenticationToken(token))
+                .build();
+        AtomicBoolean execFlag = 
setAuthorizationPolicyOperationChecker(subject, PolicyName.MAX_CONSUMERS, 
PolicyOperation.READ);
+        Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class,
+                () -> 
subAdmin.topicPolicies().getMaxConsumersPerSubscription(topic));
+        Assert.assertTrue(execFlag.get());
+
+        execFlag = setAuthorizationPolicyOperationChecker(subject, 
PolicyName.MAX_CONSUMERS, PolicyOperation.WRITE);
+        Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class,
+                () -> 
subAdmin.topicPolicies().setMaxConsumersPerSubscription(topic, 2));
+        Assert.assertTrue(execFlag.get());
+
+        execFlag = setAuthorizationPolicyOperationChecker(subject, 
PolicyName.MAX_CONSUMERS, PolicyOperation.WRITE);
+        Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class,
+                () -> 
subAdmin.topicPolicies().removeMaxConsumersPerSubscription(topic));
+        Assert.assertTrue(execFlag.get());
+
+        deleteTopic(topic, false);
+    }
+
+    @Test
+    @SneakyThrows
+    public void testCompactionThreshold() {
+        final String random = UUID.randomUUID().toString();
+        final String topic = "persistent://public/default/" + random;
+        final String subject =  UUID.randomUUID().toString();
+        final String token = Jwts.builder()
+                .claim("sub", subject).signWith(SECRET_KEY).compact();
+        createTopic(topic, false);
+        @Cleanup
+        final PulsarAdmin subAdmin = PulsarAdmin.builder()
+                .serviceHttpUrl(getPulsarService().getWebServiceAddress())
+                .authentication(new AuthenticationToken(token))
+                .build();
+        AtomicBoolean execFlag = 
setAuthorizationPolicyOperationChecker(subject, PolicyName.COMPACTION, 
PolicyOperation.READ);
+        Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class,
+                () -> subAdmin.topicPolicies().getCompactionThreshold(topic));
+        Assert.assertTrue(execFlag.get());
+
+        execFlag = setAuthorizationPolicyOperationChecker(subject, 
PolicyName.COMPACTION, PolicyOperation.WRITE);
+        Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class,
+                () -> subAdmin.topicPolicies().setCompactionThreshold(topic, 
20000));
+        Assert.assertTrue(execFlag.get());
+
+        execFlag = setAuthorizationPolicyOperationChecker(subject, 
PolicyName.COMPACTION, PolicyOperation.WRITE);
+        Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class,
+                () -> 
subAdmin.topicPolicies().removeCompactionThreshold(topic));
+        Assert.assertTrue(execFlag.get());
+
+        deleteTopic(topic, false);
+    }
+
+    @Test
+    @SneakyThrows
+    public void testDispatchRate() {
+        final String random = UUID.randomUUID().toString();
+        final String topic = "persistent://public/default/" + random;
+        final String subject =  UUID.randomUUID().toString();
+        final String token = Jwts.builder()
+                .claim("sub", subject).signWith(SECRET_KEY).compact();
+        createTopic(topic, false);
+        @Cleanup
+        final PulsarAdmin subAdmin = PulsarAdmin.builder()
+                .serviceHttpUrl(getPulsarService().getWebServiceAddress())
+                .authentication(new AuthenticationToken(token))
+                .build();
+        AtomicBoolean execFlag = 
setAuthorizationPolicyOperationChecker(subject, PolicyName.RATE, 
PolicyOperation.READ);
+        Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class,
+                () -> subAdmin.topicPolicies().getDispatchRate(topic));
+        Assert.assertTrue(execFlag.get());
+
+        execFlag = setAuthorizationPolicyOperationChecker(subject, 
PolicyName.RATE, PolicyOperation.WRITE);
+        Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class,
+                () -> subAdmin.topicPolicies().setDispatchRate(topic, 
DispatchRate.builder().build()));
+        Assert.assertTrue(execFlag.get());
+
+        execFlag = setAuthorizationPolicyOperationChecker(subject, 
PolicyName.RATE, PolicyOperation.WRITE);
+        Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class,
+                () -> subAdmin.topicPolicies().removeDispatchRate(topic));
+        Assert.assertTrue(execFlag.get());
+
+        deleteTopic(topic, false);
+    }
+
+    @Test
+    @SneakyThrows
+    public void testMaxConsumers() {
+        final String random = UUID.randomUUID().toString();
+        final String topic = "persistent://public/default/" + random;
+        final String subject =  UUID.randomUUID().toString();
+        final String token = Jwts.builder()
+                .claim("sub", subject).signWith(SECRET_KEY).compact();
+        createTopic(topic, false);
+        @Cleanup
+        final PulsarAdmin subAdmin = PulsarAdmin.builder()
+                .serviceHttpUrl(getPulsarService().getWebServiceAddress())
+                .authentication(new AuthenticationToken(token))
+                .build();
+        AtomicBoolean execFlag = 
setAuthorizationPolicyOperationChecker(subject, PolicyName.MAX_CONSUMERS, 
PolicyOperation.READ);
+        Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class,
+                () -> subAdmin.topicPolicies().getMaxConsumers(topic));
+        Assert.assertTrue(execFlag.get());
+
+        execFlag = setAuthorizationPolicyOperationChecker(subject, 
PolicyName.MAX_CONSUMERS, PolicyOperation.WRITE);
+        Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class,
+                () -> subAdmin.topicPolicies().setMaxConsumers(topic, 2));
+        Assert.assertTrue(execFlag.get());
+
+        execFlag = setAuthorizationPolicyOperationChecker(subject, 
PolicyName.MAX_CONSUMERS, PolicyOperation.WRITE);
+        Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class,
+                () -> subAdmin.topicPolicies().removeMaxConsumers(topic));
+        Assert.assertTrue(execFlag.get());
+
+        deleteTopic(topic, false);
+    }
+
+    @Test
+    @SneakyThrows
+    public void testMaxProducers() {
+        final String random = UUID.randomUUID().toString();
+        final String topic = "persistent://public/default/" + random;
+        final String subject =  UUID.randomUUID().toString();
+        final String token = Jwts.builder()
+                .claim("sub", subject).signWith(SECRET_KEY).compact();
+        createTopic(topic, false);
+        @Cleanup
+        final PulsarAdmin subAdmin = PulsarAdmin.builder()
+                .serviceHttpUrl(getPulsarService().getWebServiceAddress())
+                .authentication(new AuthenticationToken(token))
+                .build();
+        AtomicBoolean execFlag = 
setAuthorizationPolicyOperationChecker(subject, PolicyName.MAX_PRODUCERS, 
PolicyOperation.READ);
+        Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class,
+                () -> subAdmin.topicPolicies().getMaxProducers(topic));
+        Assert.assertTrue(execFlag.get());
+
+        execFlag = setAuthorizationPolicyOperationChecker(subject, 
PolicyName.MAX_PRODUCERS, PolicyOperation.WRITE);
+        Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class,
+                () -> subAdmin.topicPolicies().setMaxProducers(topic, 2));
+        Assert.assertTrue(execFlag.get());
+
+        execFlag = setAuthorizationPolicyOperationChecker(subject, 
PolicyName.MAX_PRODUCERS, PolicyOperation.WRITE);
+        Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class,
+                () -> subAdmin.topicPolicies().removeMaxProducers(topic));
+        Assert.assertTrue(execFlag.get());
+
+        deleteTopic(topic, false);
+    }
+
+    @Test
+    @SneakyThrows
+    public void testReplicatorDispatchRate() {
+        final String random = UUID.randomUUID().toString();
+        final String topic = "persistent://public/default/" + random;
+        final String subject =  UUID.randomUUID().toString();
+        final String token = Jwts.builder()
+                .claim("sub", subject).signWith(SECRET_KEY).compact();
+        createTopic(topic, false);
+        @Cleanup
+        final PulsarAdmin subAdmin = PulsarAdmin.builder()
+                .serviceHttpUrl(getPulsarService().getWebServiceAddress())
+                .authentication(new AuthenticationToken(token))
+                .build();
+        AtomicBoolean execFlag = 
setAuthorizationPolicyOperationChecker(subject, PolicyName.REPLICATION_RATE, 
PolicyOperation.READ);
+        Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class,
+                () -> 
subAdmin.topicPolicies().getReplicatorDispatchRate(topic));
+        Assert.assertTrue(execFlag.get());
+
+        execFlag = setAuthorizationPolicyOperationChecker(subject, 
PolicyName.REPLICATION_RATE, PolicyOperation.WRITE);
+        Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class,
+                () -> 
subAdmin.topicPolicies().setReplicatorDispatchRate(topic, 
DispatchRate.builder().build()));
+        Assert.assertTrue(execFlag.get());
+
+        execFlag = setAuthorizationPolicyOperationChecker(subject, 
PolicyName.REPLICATION_RATE, PolicyOperation.WRITE);
+        Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class,
+                () -> 
subAdmin.topicPolicies().removeReplicatorDispatchRate(topic));
+        Assert.assertTrue(execFlag.get());
+
+        deleteTopic(topic, false);
+    }
+
+    @Test
+    @SneakyThrows
+    public void testPersistence() {
+        final String random = UUID.randomUUID().toString();
+        final String topic = "persistent://public/default/" + random;
+        final String subject =  UUID.randomUUID().toString();
+        final String token = Jwts.builder()
+                .claim("sub", subject).signWith(SECRET_KEY).compact();
+        createTopic(topic, false);
+        @Cleanup
+        final PulsarAdmin subAdmin = PulsarAdmin.builder()
+                .serviceHttpUrl(getPulsarService().getWebServiceAddress())
+                .authentication(new AuthenticationToken(token))
+                .build();
+        AtomicBoolean execFlag = 
setAuthorizationPolicyOperationChecker(subject, PolicyName.PERSISTENCE, 
PolicyOperation.READ);
+        Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class,
+                () -> subAdmin.topicPolicies().getPersistence(topic));
+        Assert.assertTrue(execFlag.get());
+
+        execFlag = setAuthorizationPolicyOperationChecker(subject, 
PolicyName.PERSISTENCE, PolicyOperation.WRITE);
+        Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class,
+                () -> subAdmin.topicPolicies().setPersistence(topic, new 
PersistencePolicies()));
+        Assert.assertTrue(execFlag.get());
+
+        execFlag = setAuthorizationPolicyOperationChecker(subject, 
PolicyName.PERSISTENCE, PolicyOperation.WRITE);
+        Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class,
+                () -> subAdmin.topicPolicies().removePersistence(topic));
+        Assert.assertTrue(execFlag.get());
+
+        deleteTopic(topic, false);
+    }
+
+    @Test
+    @SneakyThrows
+    public void testRetention() {
+        final String random = UUID.randomUUID().toString();
+        final String topic = "persistent://public/default/" + random;
+        final String subject =  UUID.randomUUID().toString();
+        final String token = Jwts.builder()
+                .claim("sub", subject).signWith(SECRET_KEY).compact();
+        createTopic(topic, false);
+        @Cleanup
+        final PulsarAdmin subAdmin = PulsarAdmin.builder()
+                .serviceHttpUrl(getPulsarService().getWebServiceAddress())
+                .authentication(new AuthenticationToken(token))
+                .build();
+        AtomicBoolean execFlag = 
setAuthorizationPolicyOperationChecker(subject, PolicyName.RETENTION, 
PolicyOperation.READ);
+        Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class,
+                () -> subAdmin.topicPolicies().getRetention(topic, false));
+        Assert.assertTrue(execFlag.get());
+
+        execFlag = setAuthorizationPolicyOperationChecker(subject, 
PolicyName.RETENTION, PolicyOperation.WRITE);
+        Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class,
+                () -> subAdmin.topicPolicies().setRetention(topic, new 
RetentionPolicies()));
+        Assert.assertTrue(execFlag.get());
+
+        execFlag = setAuthorizationPolicyOperationChecker(subject, 
PolicyName.RETENTION, PolicyOperation.WRITE);
+        Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class,
+                () -> subAdmin.topicPolicies().removeRetention(topic));
+        Assert.assertTrue(execFlag.get());
+
+        deleteTopic(topic, false);
+    }
+
+    @Test
+    @SneakyThrows
+    public void testDeduplication() {
+        final String random = UUID.randomUUID().toString();
+        final String topic = "persistent://public/default/" + random;
+        final String subject =  UUID.randomUUID().toString();
+        final String token = Jwts.builder()
+                .claim("sub", subject).signWith(SECRET_KEY).compact();
+        createTopic(topic, false);
+        @Cleanup
+        final PulsarAdmin subAdmin = PulsarAdmin.builder()
+                .serviceHttpUrl(getPulsarService().getWebServiceAddress())
+                .authentication(new AuthenticationToken(token))
+                .build();
+        AtomicBoolean execFlag = 
setAuthorizationPolicyOperationChecker(subject, PolicyName.DEDUPLICATION, 
PolicyOperation.READ);
+        Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class,
+                () -> subAdmin.topicPolicies().getDeduplicationStatus(topic, 
false));
+        Assert.assertTrue(execFlag.get());
+
+        execFlag = setAuthorizationPolicyOperationChecker(subject, 
PolicyName.DEDUPLICATION, PolicyOperation.WRITE);
+        Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class,
+                () -> subAdmin.topicPolicies().setDeduplicationStatus(topic, 
false));
+        Assert.assertTrue(execFlag.get());
+
+        execFlag = setAuthorizationPolicyOperationChecker(subject, 
PolicyName.DEDUPLICATION, PolicyOperation.WRITE);
+        Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class,
+                () -> 
subAdmin.topicPolicies().removeDeduplicationStatus(topic));
+        Assert.assertTrue(execFlag.get());
+
+        deleteTopic(topic, false);
+    }
+
+    @Test
+    @SneakyThrows
+    public void testMessageTTL() {
+        final String random = UUID.randomUUID().toString();
+        final String topic = "persistent://public/default/" + random;
+        final String subject =  UUID.randomUUID().toString();
+        final String token = Jwts.builder()
+                .claim("sub", subject).signWith(SECRET_KEY).compact();
+        createTopic(topic, false);
+        @Cleanup
+        final PulsarAdmin subAdmin = PulsarAdmin.builder()
+                .serviceHttpUrl(getPulsarService().getWebServiceAddress())
+                .authentication(new AuthenticationToken(token))
+                .build();
+        AtomicBoolean execFlag = 
setAuthorizationPolicyOperationChecker(subject, PolicyName.TTL, 
PolicyOperation.READ);
+        Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class,
+                () -> subAdmin.topicPolicies().getMessageTTL(topic, false));
+        Assert.assertTrue(execFlag.get());
+
+        execFlag = setAuthorizationPolicyOperationChecker(subject, 
PolicyName.TTL, PolicyOperation.WRITE);
+        Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class,
+                () -> subAdmin.topicPolicies().setMessageTTL(topic, 2));
+        Assert.assertTrue(execFlag.get());
+
+        execFlag = setAuthorizationPolicyOperationChecker(subject, 
PolicyName.TTL, PolicyOperation.WRITE);
+        Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class,
+                () -> subAdmin.topicPolicies().removeMessageTTL(topic));
+        Assert.assertTrue(execFlag.get());
+
+        deleteTopic(topic, false);
+    }
+
+    @Test
+    @SneakyThrows
+    public void testBacklogQuota() {
+        final String random = UUID.randomUUID().toString();
+        final String topic = "persistent://public/default/" + random;
+        final String subject =  UUID.randomUUID().toString();
+        final String token = Jwts.builder()
+                .claim("sub", subject).signWith(SECRET_KEY).compact();
+        createTopic(topic, false);
+        @Cleanup
+        final PulsarAdmin subAdmin = PulsarAdmin.builder()
+                .serviceHttpUrl(getPulsarService().getWebServiceAddress())
+                .authentication(new AuthenticationToken(token))
+                .build();
+        AtomicBoolean execFlag = 
setAuthorizationPolicyOperationChecker(subject, PolicyName.BACKLOG, 
PolicyOperation.READ);
+        Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class,
+                () -> subAdmin.topicPolicies().getBacklogQuotaMap(topic, 
false));
+        Assert.assertTrue(execFlag.get());
+
+        execFlag = setAuthorizationPolicyOperationChecker(subject, 
PolicyName.BACKLOG, PolicyOperation.WRITE);
+        Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class,
+                () -> subAdmin.topicPolicies().setBacklogQuota(topic, 
BacklogQuota.builder().build()));
+        Assert.assertTrue(execFlag.get());
+
+        execFlag = setAuthorizationPolicyOperationChecker(subject, 
PolicyName.BACKLOG, PolicyOperation.WRITE);
+        Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class,
+                () -> subAdmin.topicPolicies().removeBacklogQuota(topic));
+        Assert.assertTrue(execFlag.get());
+
+        deleteTopic(topic, false);
+    }
+
+    @Test
+    @SneakyThrows
+    public void testReplicationClusters() {
+        final String random = UUID.randomUUID().toString();
+        final String topic = "persistent://public/default/" + random;
+        final String subject =  UUID.randomUUID().toString();
+        final String token = Jwts.builder()
+                .claim("sub", subject).signWith(SECRET_KEY).compact();
+        createTopic(topic, false);
+        @Cleanup
+        final PulsarAdmin subAdmin = PulsarAdmin.builder()
+                .serviceHttpUrl(getPulsarService().getWebServiceAddress())
+                .authentication(new AuthenticationToken(token))
+                .build();
+        AtomicBoolean execFlag = 
setAuthorizationPolicyOperationChecker(subject, PolicyName.REPLICATION, 
PolicyOperation.READ);
+        Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class,
+                () -> subAdmin.topics().getReplicationClusters(topic, false));
+        Assert.assertTrue(execFlag.get());
+
+        execFlag = setAuthorizationPolicyOperationChecker(subject, 
PolicyName.REPLICATION, PolicyOperation.WRITE);
+        Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class,
+                () -> subAdmin.topics().setReplicationClusters(topic, new 
ArrayList<>()));
+        Assert.assertTrue(execFlag.get());
+
+        execFlag = setAuthorizationPolicyOperationChecker(subject, 
PolicyName.REPLICATION, PolicyOperation.WRITE);
+        Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class,
+                () -> subAdmin.topics().removeReplicationClusters(topic));
+        Assert.assertTrue(execFlag.get());
+
+        deleteTopic(topic, false);
     }
 }
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TransactionAndSchemaAuthZTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TransactionAndSchemaAuthZTest.java
new file mode 100644
index 00000000000..1bca6f6e308
--- /dev/null
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TransactionAndSchemaAuthZTest.java
@@ -0,0 +1,359 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.admin;
+
+import io.jsonwebtoken.Jwts;
+import lombok.Cleanup;
+import lombok.SneakyThrows;
+import org.apache.commons.lang3.reflect.FieldUtils;
+import org.apache.pulsar.client.admin.PulsarAdmin;
+import org.apache.pulsar.client.admin.PulsarAdminException;
+import org.apache.pulsar.client.api.Consumer;
+import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.client.api.transaction.Transaction;
+import org.apache.pulsar.client.impl.MessageIdImpl;
+import org.apache.pulsar.client.impl.auth.AuthenticationToken;
+import org.apache.pulsar.common.naming.SystemTopicNames;
+import org.apache.pulsar.common.partition.PartitionedTopicMetadata;
+import org.apache.pulsar.common.policies.data.AuthAction;
+import org.apache.pulsar.common.policies.data.TenantInfo;
+import org.apache.pulsar.common.policies.data.TopicOperation;
+import org.apache.pulsar.common.schema.SchemaInfo;
+import org.apache.pulsar.common.schema.SchemaType;
+import org.apache.pulsar.metadata.api.MetadataStoreException;
+import org.mockito.Mockito;
+import org.testng.Assert;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.DataProvider;
+import org.testng.annotations.Test;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+@Test(groups = "broker-admin")
+public class TransactionAndSchemaAuthZTest extends AuthZTest {
+
+    @SneakyThrows
+    @BeforeClass(alwaysRun = true)
+    public void setup() {
+        configureTokenAuthentication();
+        configureDefaultAuthorization();
+        enableTransaction();
+        start();
+        createTransactionCoordinatorAssign(16);
+        this.superUserAdmin = PulsarAdmin.builder()
+                .serviceHttpUrl(getPulsarService().getWebServiceAddress())
+                .authentication(new AuthenticationToken(SUPER_USER_TOKEN))
+                .build();
+        final TenantInfo tenantInfo = 
superUserAdmin.tenants().getTenantInfo("public");
+        tenantInfo.getAdminRoles().add(TENANT_ADMIN_SUBJECT);
+        superUserAdmin.tenants().updateTenant("public", tenantInfo);
+        this.tenantManagerAdmin = PulsarAdmin.builder()
+                .serviceHttpUrl(getPulsarService().getWebServiceAddress())
+                .authentication(new AuthenticationToken(TENANT_ADMIN_TOKEN))
+                .build();
+
+        superUserAdmin.tenants().createTenant("pulsar", tenantInfo);
+        superUserAdmin.namespaces().createNamespace("pulsar/system");
+    }
+
+    @SneakyThrows
+    @AfterClass(alwaysRun = true)
+    public void cleanup() {
+        if (superUserAdmin != null) {
+            superUserAdmin.close();
+        }
+        if (tenantManagerAdmin != null) {
+            tenantManagerAdmin.close();
+        }
+        close();
+    }
+
+    @BeforeMethod
+    public void before() throws IllegalAccessException {
+        orignalAuthorizationService = 
getPulsarService().getBrokerService().getAuthorizationService();
+        authorizationService = Mockito.spy(orignalAuthorizationService);
+        FieldUtils.writeField(getPulsarService().getBrokerService(), 
"authorizationService",
+                authorizationService, true);
+    }
+
+    @AfterMethod
+    public void after() throws IllegalAccessException {
+        FieldUtils.writeField(getPulsarService().getBrokerService(), 
"authorizationService",
+                orignalAuthorizationService, true);
+    }
+
+    protected void createTransactionCoordinatorAssign(int numPartitionsOfTC) 
throws MetadataStoreException {
+        getPulsarService().getPulsarResources()
+                .getNamespaceResources()
+                .getPartitionedTopicResources()
+                
.createPartitionedTopic(SystemTopicNames.TRANSACTION_COORDINATOR_ASSIGN,
+                        new PartitionedTopicMetadata(numPartitionsOfTC));
+    }
+
+    public enum OperationAuthType {
+        Lookup,
+        Produce,
+        Consume,
+        AdminOrSuperUser,
+        NOAuth
+    }
+
+    private final String testTopic = "persistent://public/default/" + 
UUID.randomUUID().toString();
+    @FunctionalInterface
+    public interface ThrowingBiConsumer<T> {
+        void accept(T t) throws PulsarAdminException;
+    }
+
+    @DataProvider(name = "authFunction")
+    public Object[][] authFunction () throws Exception {
+        String sub = "my-sub";
+        createTopic(testTopic, false);
+        @Cleanup final PulsarClient pulsarClient = PulsarClient.builder()
+                .serviceUrl(getPulsarService().getBrokerServiceUrl())
+                .authentication(new AuthenticationToken(SUPER_USER_TOKEN))
+                .enableTransaction(true)
+                .build();
+        @Cleanup final Producer<String> producer = 
pulsarClient.newProducer(Schema.STRING).topic(testTopic).create();
+
+        @Cleanup final Consumer<String> consumer = 
pulsarClient.newConsumer(Schema.STRING)
+                .topic(testTopic)
+                .subscriptionName(sub)
+                .subscribe();
+
+        Transaction transaction = 
pulsarClient.newTransaction().withTransactionTimeout(5, TimeUnit.MINUTES)
+                .build().get();
+        MessageIdImpl messageId = (MessageIdImpl) 
producer.newMessage().value("test message").send();
+
+        consumer.acknowledgeAsync(messageId, transaction).get();
+
+        return new Object[][]{
+                // SCHEMA
+                new Object[] {
+                        (ThrowingBiConsumer<PulsarAdmin>) (admin) -> 
admin.schemas().getSchemaInfo(testTopic),
+                        OperationAuthType.Lookup
+                },
+                new Object[] {
+                        (ThrowingBiConsumer<PulsarAdmin>) (admin) -> 
admin.schemas().getSchemaInfo(
+                                testTopic, 0),
+                        OperationAuthType.Lookup
+                },
+                new Object[] {
+                        (ThrowingBiConsumer<PulsarAdmin>) (admin) -> 
admin.schemas().getAllSchemas(
+                                testTopic),
+                        OperationAuthType.Lookup
+                },
+                new Object[] {
+                        (ThrowingBiConsumer<PulsarAdmin>) (admin) -> 
admin.schemas().createSchema(testTopic,
+                                
SchemaInfo.builder().type(SchemaType.STRING).build()),
+                        OperationAuthType.Produce
+                },
+                // TODO: improve the authorization check for testCompatibility 
and deleteSchema
+                new Object[] {
+                        (ThrowingBiConsumer<PulsarAdmin>) (admin) -> 
admin.schemas().testCompatibility(
+                                testTopic, 
SchemaInfo.builder().type(SchemaType.STRING).build()),
+                        OperationAuthType.AdminOrSuperUser
+                },
+                new Object[] {
+                        (ThrowingBiConsumer<PulsarAdmin>) (admin) -> 
admin.schemas().deleteSchema(
+                                testTopic),
+                        OperationAuthType.AdminOrSuperUser
+                },
+
+                // TRANSACTION
+
+                // Modify transaction coordinator
+                new Object[] {
+                        (ThrowingBiConsumer<PulsarAdmin>) (admin) -> 
admin.transactions()
+                                .abortTransaction(transaction.getTxnID()),
+                        OperationAuthType.AdminOrSuperUser
+                },
+                new Object[] {
+                        (ThrowingBiConsumer<PulsarAdmin>) (admin) -> 
admin.transactions()
+                                .scaleTransactionCoordinators(17),
+                        OperationAuthType.AdminOrSuperUser
+                },
+                // TODO: fix authorization check of check transaction 
coordinator stats.
+                // Check transaction coordinator stats
+                new Object[] {
+                        (ThrowingBiConsumer<PulsarAdmin>) (admin) -> 
admin.transactions()
+                                .getCoordinatorInternalStats(1, false),
+                        OperationAuthType.NOAuth
+                },
+                new Object[] {
+                        (ThrowingBiConsumer<PulsarAdmin>) (admin) -> 
admin.transactions()
+                                .getCoordinatorStats(),
+                        OperationAuthType.AdminOrSuperUser
+                },
+                new Object[] {
+                        (ThrowingBiConsumer<PulsarAdmin>) (admin) -> 
admin.transactions()
+                                .getSlowTransactionsByCoordinatorId(1, 5, 
TimeUnit.SECONDS),
+                        OperationAuthType.NOAuth
+                },
+                new Object[] {
+                        (ThrowingBiConsumer<PulsarAdmin>) (admin) -> 
admin.transactions()
+                                
.getTransactionMetadata(transaction.getTxnID()),
+                        OperationAuthType.NOAuth
+                },
+                new Object[] {
+                        (ThrowingBiConsumer<PulsarAdmin>) (admin) -> 
admin.transactions()
+                                .listTransactionCoordinators(),
+                        OperationAuthType.NOAuth
+                },
+                new Object[] {
+                        (ThrowingBiConsumer<PulsarAdmin>) (admin) -> 
admin.transactions()
+                                .getSlowTransactions(5, TimeUnit.SECONDS),
+                        OperationAuthType.AdminOrSuperUser
+                },
+
+                // TODO: Check the authorization of the topic when get stats 
of TB or TP
+                // Check stats related to transaction buffer and transaction 
pending ack
+                new Object[] {
+                        (ThrowingBiConsumer<PulsarAdmin>) (admin) -> 
admin.transactions()
+                                .getPendingAckInternalStats(testTopic, sub, 
false),
+                        OperationAuthType.NOAuth
+                },
+                new Object[] {
+                        (ThrowingBiConsumer<PulsarAdmin>) (admin) -> 
admin.transactions()
+                                .getPendingAckStats(testTopic, sub, false),
+                        OperationAuthType.NOAuth
+                },
+                new Object[] {
+                        (ThrowingBiConsumer<PulsarAdmin>) (admin) -> 
admin.transactions()
+                                .getPositionStatsInPendingAck(testTopic, sub, 
messageId.getLedgerId(),
+                                        messageId.getEntryId(), null),
+                        OperationAuthType.NOAuth
+                },
+                new Object[] {
+                        (ThrowingBiConsumer<PulsarAdmin>) (admin) -> 
admin.transactions()
+                                .getTransactionBufferInternalStats(testTopic, 
false),
+                        OperationAuthType.NOAuth
+                },
+                new Object[] {
+                        (ThrowingBiConsumer<PulsarAdmin>) (admin) -> 
admin.transactions()
+                                .getTransactionBufferStats(testTopic, false),
+                        OperationAuthType.NOAuth
+                },
+                new Object[] {
+                        (ThrowingBiConsumer<PulsarAdmin>) (admin) -> 
admin.transactions()
+                                .getTransactionBufferStats(testTopic, false),
+                        OperationAuthType.NOAuth
+                },
+                new Object[] {
+                        (ThrowingBiConsumer<PulsarAdmin>) (admin) -> 
admin.transactions()
+                                
.getTransactionInBufferStats(transaction.getTxnID(), testTopic),
+                        OperationAuthType.NOAuth
+                },
+                new Object[] {
+                        (ThrowingBiConsumer<PulsarAdmin>) (admin) -> 
admin.transactions()
+                                
.getTransactionInBufferStats(transaction.getTxnID(), testTopic),
+                        OperationAuthType.NOAuth
+                },
+                new Object[] {
+                        (ThrowingBiConsumer<PulsarAdmin>) (admin) -> 
admin.transactions()
+                                
.getTransactionInPendingAckStats(transaction.getTxnID(), testTopic, sub),
+                        OperationAuthType.NOAuth
+                },
+        };
+    }
+
+    @Test(dataProvider = "authFunction")
+    public void 
testSchemaAndTransactionAuthorization(ThrowingBiConsumer<PulsarAdmin> 
adminConsumer, OperationAuthType topicOpType)
+            throws Exception {
+        final String subject =  UUID.randomUUID().toString();
+        final String token = Jwts.builder()
+                .claim("sub", subject).signWith(SECRET_KEY).compact();
+
+
+        @Cleanup
+        final PulsarAdmin subAdmin = PulsarAdmin.builder()
+                .serviceHttpUrl(getPulsarService().getWebServiceAddress())
+                .authentication(new AuthenticationToken(token))
+                .build();
+        // test tenant manager
+        if (topicOpType != OperationAuthType.AdminOrSuperUser) {
+            adminConsumer.accept(tenantManagerAdmin);
+        }
+
+        if (topicOpType != OperationAuthType.NOAuth) {
+            
Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class,
+                    () -> adminConsumer.accept(subAdmin));
+        }
+
+        AtomicBoolean execFlag = null;
+        if (topicOpType == OperationAuthType.Lookup) {
+            execFlag = setAuthorizationTopicOperationChecker(subject, 
TopicOperation.LOOKUP);
+        } else if (topicOpType == OperationAuthType.Produce) {
+            execFlag = setAuthorizationTopicOperationChecker(subject, 
TopicOperation.PRODUCE);
+        } else if (topicOpType == OperationAuthType.Consume) {
+            execFlag = setAuthorizationTopicOperationChecker(subject, 
TopicOperation.CONSUME);
+        }
+
+        for (AuthAction action : AuthAction.values()) {
+            superUserAdmin.topics().grantPermission(testTopic, subject, 
Set.of(action));
+
+            if (authActionMatchOperation(topicOpType, action)) {
+                adminConsumer.accept(subAdmin);
+            } else {
+                
Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class,
+                        () -> adminConsumer.accept(subAdmin));
+            }
+            superUserAdmin.topics().revokePermissions(testTopic, subject);
+        }
+
+        if (execFlag != null) {
+            Assert.assertTrue(execFlag.get());
+        }
+
+    }
+
+    private boolean authActionMatchOperation(OperationAuthType 
operationAuthType, AuthAction action) {
+        switch (operationAuthType) {
+            case Lookup -> {
+                if (AuthAction.consume == action || AuthAction.produce == 
action) {
+                    return true;
+                }
+            }
+            case Consume -> {
+                if (AuthAction.consume == action) {
+                    return true;
+                }
+            }
+            case Produce -> {
+                if (AuthAction.produce == action) {
+                    return true;
+                }
+            }
+            case AdminOrSuperUser -> {
+                return false;
+            }
+            case NOAuth -> {
+                return true;
+            }
+        }
+        return false;
+    }
+
+}

Reply via email to