This is an automated email from the ASF dual-hosted git repository. technoboy pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push: new 3a0f908e80d [improve][test] Add topic policy test for topic API (#22546) 3a0f908e80d is described below commit 3a0f908e80d0863920a1258362fd782e95fe8f17 Author: Jiwei Guo <techno...@apache.org> AuthorDate: Mon Apr 22 19:47:03 2024 +0800 [improve][test] Add topic policy test for topic API (#22546) --- .../org/apache/pulsar/broker/admin/AuthZTest.java | 113 ++ .../apache/pulsar/broker/admin/TopicAuthZTest.java | 1121 ++++++++++++++------ .../admin/TransactionAndSchemaAuthZTest.java | 359 +++++++ 3 files changed, 1270 insertions(+), 323 deletions(-) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AuthZTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AuthZTest.java new file mode 100644 index 00000000000..a710a03970d --- /dev/null +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AuthZTest.java @@ -0,0 +1,113 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.broker.admin; + +import io.jsonwebtoken.Jwts; +import org.apache.commons.lang3.reflect.FieldUtils; +import org.apache.pulsar.broker.authorization.AuthorizationService; +import org.apache.pulsar.client.admin.PulsarAdmin; +import org.apache.pulsar.common.policies.data.NamespaceOperation; +import org.apache.pulsar.common.policies.data.TopicOperation; +import org.apache.pulsar.security.MockedPulsarStandalone; +import org.mockito.Mockito; +import org.testng.Assert; +import org.testng.annotations.AfterMethod; +import org.testng.annotations.BeforeMethod; +import java.util.UUID; +import java.util.concurrent.atomic.AtomicBoolean; +import static org.mockito.Mockito.doReturn; + +public class AuthZTest extends MockedPulsarStandalone { + + protected PulsarAdmin superUserAdmin; + + protected PulsarAdmin tenantManagerAdmin; + + protected AuthorizationService authorizationService; + + protected AuthorizationService orignalAuthorizationService; + + protected static final String TENANT_ADMIN_SUBJECT = UUID.randomUUID().toString(); + protected static final String TENANT_ADMIN_TOKEN = Jwts.builder() + .claim("sub", TENANT_ADMIN_SUBJECT).signWith(SECRET_KEY).compact(); + + @BeforeMethod(alwaysRun = true) + public void before() throws IllegalAccessException { + orignalAuthorizationService = getPulsarService().getBrokerService().getAuthorizationService(); + authorizationService = Mockito.spy(orignalAuthorizationService); + FieldUtils.writeField(getPulsarService().getBrokerService(), "authorizationService", + authorizationService, true); + } + + @AfterMethod(alwaysRun = true) + public void after() throws IllegalAccessException { + FieldUtils.writeField(getPulsarService().getBrokerService(), "authorizationService", + orignalAuthorizationService, true); + } + + protected AtomicBoolean setAuthorizationTopicOperationChecker(String role, Object operation) { + AtomicBoolean execFlag = new AtomicBoolean(false); + if (operation instanceof TopicOperation) { + Mockito.doAnswer(invocationOnMock -> { + String role_ = invocationOnMock.getArgument(2); + if (role.equals(role_)) { + TopicOperation operation_ = invocationOnMock.getArgument(1); + Assert.assertEquals(operation_, operation); + } + execFlag.set(true); + return invocationOnMock.callRealMethod(); + }).when(authorizationService).allowTopicOperationAsync(Mockito.any(), Mockito.any(), Mockito.any(), + Mockito.any(), Mockito.any()); + } else if (operation instanceof NamespaceOperation) { + doReturn(true) + .when(authorizationService).isValidOriginalPrincipal(Mockito.any(), Mockito.any(), Mockito.any()); + Mockito.doAnswer(invocationOnMock -> { + String role_ = invocationOnMock.getArgument(2); + if (role.equals(role_)) { + TopicOperation operation_ = invocationOnMock.getArgument(1); + Assert.assertEquals(operation_, operation); + } + execFlag.set(true); + return invocationOnMock.callRealMethod(); + }).when(authorizationService).allowNamespaceOperationAsync(Mockito.any(), Mockito.any(), Mockito.any(), + Mockito.any(), Mockito.any()); + } else { + throw new IllegalArgumentException(""); + } + + + return execFlag; + } + + protected void createTopic(String topic, boolean partitioned) throws Exception { + if (partitioned) { + superUserAdmin.topics().createPartitionedTopic(topic, 2); + } else { + superUserAdmin.topics().createNonPartitionedTopic(topic); + } + } + + protected void deleteTopic(String topic, boolean partitioned) throws Exception { + if (partitioned) { + superUserAdmin.topics().deletePartitionedTopic(topic, true); + } else { + superUserAdmin.topics().delete(topic, true); + } + } +} diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicAuthZTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicAuthZTest.java index 3c0596d531f..ad47ac74a89 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicAuthZTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicAuthZTest.java @@ -20,6 +20,7 @@ package org.apache.pulsar.broker.admin; import com.google.common.collect.Lists; +import com.google.common.collect.Sets; import io.jsonwebtoken.Jwts; import lombok.Cleanup; import lombok.SneakyThrows; @@ -38,59 +39,48 @@ import org.apache.pulsar.client.impl.MessageIdImpl; import org.apache.pulsar.client.impl.auth.AuthenticationToken; import org.apache.pulsar.common.naming.TopicName; import org.apache.pulsar.common.policies.data.AuthAction; +import org.apache.pulsar.common.policies.data.AutoSubscriptionCreationOverride; +import org.apache.pulsar.common.policies.data.BacklogQuota; +import org.apache.pulsar.common.policies.data.DelayedDeliveryPolicies; +import org.apache.pulsar.common.policies.data.DispatchRate; import org.apache.pulsar.common.policies.data.EntryFilters; +import org.apache.pulsar.common.policies.data.InactiveTopicPolicies; +import org.apache.pulsar.common.policies.data.OffloadPolicies; +import org.apache.pulsar.common.policies.data.PersistencePolicies; +import org.apache.pulsar.common.policies.data.PolicyName; +import org.apache.pulsar.common.policies.data.PolicyOperation; +import org.apache.pulsar.common.policies.data.PublishRate; +import org.apache.pulsar.common.policies.data.RetentionPolicies; +import org.apache.pulsar.common.policies.data.SubscribeRate; import org.apache.pulsar.common.policies.data.TenantInfo; -import org.apache.pulsar.security.MockedPulsarStandalone; import org.testng.Assert; import org.testng.annotations.AfterClass; import org.testng.annotations.BeforeClass; import org.testng.annotations.DataProvider; import org.testng.annotations.Test; +import java.util.ArrayList; import java.util.HashMap; +import java.util.HashSet; import java.util.Map; import java.util.Optional; import java.util.Set; import java.util.UUID; -import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; -import org.apache.pulsar.broker.authorization.AuthorizationService; -import org.apache.pulsar.client.api.Consumer; -import org.apache.pulsar.client.api.transaction.Transaction; -import org.apache.pulsar.common.naming.SystemTopicNames; -import org.apache.pulsar.common.partition.PartitionedTopicMetadata; import org.apache.pulsar.common.policies.data.NamespaceOperation; import org.apache.pulsar.common.policies.data.TopicOperation; -import org.apache.pulsar.common.schema.SchemaInfo; -import org.apache.pulsar.common.schema.SchemaType; -import org.apache.pulsar.metadata.api.MetadataStoreException; import org.mockito.Mockito; -import org.testng.annotations.AfterMethod; -import org.testng.annotations.BeforeMethod; +import static org.mockito.Mockito.doReturn; @Test(groups = "broker-admin") -public class TopicAuthZTest extends MockedPulsarStandalone { - - private PulsarAdmin superUserAdmin; - - private PulsarAdmin tenantManagerAdmin; - - private AuthorizationService authorizationService; - - private AuthorizationService orignalAuthorizationService; - - private static final String TENANT_ADMIN_SUBJECT = UUID.randomUUID().toString(); - private static final String TENANT_ADMIN_TOKEN = Jwts.builder() - .claim("sub", TENANT_ADMIN_SUBJECT).signWith(SECRET_KEY).compact(); +public class TopicAuthZTest extends AuthZTest { @SneakyThrows @BeforeClass(alwaysRun = true) public void setup() { configureTokenAuthentication(); configureDefaultAuthorization(); - enableTransaction(); start(); - createTransactionCoordinatorAssign(16); this.superUserAdmin =PulsarAdmin.builder() .serviceHttpUrl(getPulsarService().getWebServiceAddress()) .authentication(new AuthenticationToken(SUPER_USER_TOKEN)) @@ -103,16 +93,6 @@ public class TopicAuthZTest extends MockedPulsarStandalone { .authentication(new AuthenticationToken(TENANT_ADMIN_TOKEN)) .build(); - superUserAdmin.tenants().createTenant("pulsar", tenantInfo); - superUserAdmin.namespaces().createNamespace("pulsar/system"); - } - - protected void createTransactionCoordinatorAssign(int numPartitionsOfTC) throws MetadataStoreException { - getPulsarService().getPulsarResources() - .getNamespaceResources() - .getPartitionedTopicResources() - .createPartitionedTopic(SystemTopicNames.TRANSACTION_COORDINATOR_ASSIGN, - new PartitionedTopicMetadata(numPartitionsOfTC)); } @SneakyThrows @@ -127,48 +107,28 @@ public class TopicAuthZTest extends MockedPulsarStandalone { close(); } - @BeforeMethod - public void before() throws IllegalAccessException { - orignalAuthorizationService = getPulsarService().getBrokerService().getAuthorizationService(); - authorizationService = Mockito.spy(orignalAuthorizationService); - FieldUtils.writeField(getPulsarService().getBrokerService(), "authorizationService", - authorizationService, true); - } + private AtomicBoolean setAuthorizationPolicyOperationChecker(String role, Object policyName, Object operation) { + AtomicBoolean execFlag = new AtomicBoolean(false); + if (operation instanceof PolicyOperation ) { - @AfterMethod - public void after() throws IllegalAccessException { - FieldUtils.writeField(getPulsarService().getBrokerService(), "authorizationService", - orignalAuthorizationService, true); - } + doReturn(true) + .when(authorizationService).isValidOriginalPrincipal(Mockito.any(), Mockito.any(), Mockito.any()); - private AtomicBoolean setAuthorizationTopicOperationChecker(String role, Object operation) { - AtomicBoolean execFlag = new AtomicBoolean(false); - if (operation instanceof TopicOperation) { - Mockito.doAnswer(invocationOnMock -> { - String role_ = invocationOnMock.getArgument(2); - if (role.equals(role_)) { - TopicOperation operation_ = invocationOnMock.getArgument(1); - Assert.assertEquals(operation_, operation); - } - execFlag.set(true); - return invocationOnMock.callRealMethod(); - }).when(authorizationService).allowTopicOperationAsync(Mockito.any(), Mockito.any(), Mockito.any(), - Mockito.any(), Mockito.any()); - } else if (operation instanceof NamespaceOperation) { Mockito.doAnswer(invocationOnMock -> { - String role_ = invocationOnMock.getArgument(2); - if (role.equals(role_)) { - TopicOperation operation_ = invocationOnMock.getArgument(1); - Assert.assertEquals(operation_, operation); - } - execFlag.set(true); - return invocationOnMock.callRealMethod(); - }).when(authorizationService).allowNamespaceOperationAsync(Mockito.any(), Mockito.any(), Mockito.any(), - Mockito.any(), Mockito.any()); + String role_ = invocationOnMock.getArgument(4); + if (role.equals(role_)) { + PolicyName policyName_ = invocationOnMock.getArgument(1); + PolicyOperation operation_ = invocationOnMock.getArgument(2); + Assert.assertEquals(operation_, operation); + Assert.assertEquals(policyName_, policyName); + } + execFlag.set(true); + return invocationOnMock.callRealMethod(); + }).when(authorizationService).allowTopicPolicyOperationAsync(Mockito.any(), Mockito.any(), Mockito.any(), + Mockito.any(), Mockito.any(), Mockito.any()); } else { throw new IllegalArgumentException(""); } - return execFlag; } @@ -1213,171 +1173,8 @@ public class TopicAuthZTest extends MockedPulsarStandalone { deleteTopic(topic, false); } - public enum OperationAuthType { - Lookup, - Produce, - Consume, - AdminOrSuperUser, - NOAuth - } - private final String testTopic = "persistent://public/default/" + UUID.randomUUID().toString(); - @FunctionalInterface - public interface ThrowingBiConsumer<T> { - void accept(T t) throws PulsarAdminException; - } - @DataProvider(name = "authFunction") - public Object[][] authFunction () throws Exception { - String sub = "my-sub"; - createTopic(testTopic, false); - @Cleanup final PulsarClient pulsarClient = PulsarClient.builder() - .serviceUrl(getPulsarService().getBrokerServiceUrl()) - .authentication(new AuthenticationToken(SUPER_USER_TOKEN)) - .enableTransaction(true) - .build(); - @Cleanup final Producer<String> producer = pulsarClient.newProducer(Schema.STRING).topic(testTopic).create(); - - @Cleanup final Consumer<String> consumer = pulsarClient.newConsumer(Schema.STRING) - .topic(testTopic) - .subscriptionName(sub) - .subscribe(); - - Transaction transaction = pulsarClient.newTransaction().withTransactionTimeout(5, TimeUnit.MINUTES) - .build().get(); - MessageIdImpl messageId = (MessageIdImpl) producer.newMessage().value("test message").send(); - - consumer.acknowledgeAsync(messageId, transaction).get(); - - return new Object[][]{ - // SCHEMA - new Object[] { - (ThrowingBiConsumer<PulsarAdmin>) (admin) -> admin.schemas().getSchemaInfo(testTopic), - OperationAuthType.Lookup - }, - new Object[] { - (ThrowingBiConsumer<PulsarAdmin>) (admin) -> admin.schemas().getSchemaInfo( - testTopic, 0), - OperationAuthType.Lookup - }, - new Object[] { - (ThrowingBiConsumer<PulsarAdmin>) (admin) -> admin.schemas().getAllSchemas( - testTopic), - OperationAuthType.Lookup - }, - new Object[] { - (ThrowingBiConsumer<PulsarAdmin>) (admin) -> admin.schemas().createSchema(testTopic, - SchemaInfo.builder().type(SchemaType.STRING).build()), - OperationAuthType.Produce - }, - // TODO: improve the authorization check for testCompatibility and deleteSchema - new Object[] { - (ThrowingBiConsumer<PulsarAdmin>) (admin) -> admin.schemas().testCompatibility( - testTopic, SchemaInfo.builder().type(SchemaType.STRING).build()), - OperationAuthType.AdminOrSuperUser - }, - new Object[] { - (ThrowingBiConsumer<PulsarAdmin>) (admin) -> admin.schemas().deleteSchema( - testTopic), - OperationAuthType.AdminOrSuperUser - }, - - // TRANSACTION - - // Modify transaction coordinator - new Object[] { - (ThrowingBiConsumer<PulsarAdmin>) (admin) -> admin.transactions() - .abortTransaction(transaction.getTxnID()), - OperationAuthType.AdminOrSuperUser - }, - new Object[] { - (ThrowingBiConsumer<PulsarAdmin>) (admin) -> admin.transactions() - .scaleTransactionCoordinators(17), - OperationAuthType.AdminOrSuperUser - }, - // TODO: fix authorization check of check transaction coordinator stats. - // Check transaction coordinator stats - new Object[] { - (ThrowingBiConsumer<PulsarAdmin>) (admin) -> admin.transactions() - .getCoordinatorInternalStats(1, false), - OperationAuthType.NOAuth - }, - new Object[] { - (ThrowingBiConsumer<PulsarAdmin>) (admin) -> admin.transactions() - .getCoordinatorStats(), - OperationAuthType.AdminOrSuperUser - }, - new Object[] { - (ThrowingBiConsumer<PulsarAdmin>) (admin) -> admin.transactions() - .getSlowTransactionsByCoordinatorId(1, 5, TimeUnit.SECONDS), - OperationAuthType.NOAuth - }, - new Object[] { - (ThrowingBiConsumer<PulsarAdmin>) (admin) -> admin.transactions() - .getTransactionMetadata(transaction.getTxnID()), - OperationAuthType.NOAuth - }, - new Object[] { - (ThrowingBiConsumer<PulsarAdmin>) (admin) -> admin.transactions() - .listTransactionCoordinators(), - OperationAuthType.NOAuth - }, - new Object[] { - (ThrowingBiConsumer<PulsarAdmin>) (admin) -> admin.transactions() - .getSlowTransactions(5, TimeUnit.SECONDS), - OperationAuthType.AdminOrSuperUser - }, - - // TODO: Check the authorization of the topic when get stats of TB or TP - // Check stats related to transaction buffer and transaction pending ack - new Object[] { - (ThrowingBiConsumer<PulsarAdmin>) (admin) -> admin.transactions() - .getPendingAckInternalStats(testTopic, sub, false), - OperationAuthType.NOAuth - }, - new Object[] { - (ThrowingBiConsumer<PulsarAdmin>) (admin) -> admin.transactions() - .getPendingAckStats(testTopic, sub, false), - OperationAuthType.NOAuth - }, - new Object[] { - (ThrowingBiConsumer<PulsarAdmin>) (admin) -> admin.transactions() - .getPositionStatsInPendingAck(testTopic, sub, messageId.getLedgerId(), - messageId.getEntryId(), null), - OperationAuthType.NOAuth - }, - new Object[] { - (ThrowingBiConsumer<PulsarAdmin>) (admin) -> admin.transactions() - .getTransactionBufferInternalStats(testTopic, false), - OperationAuthType.NOAuth - }, - new Object[] { - (ThrowingBiConsumer<PulsarAdmin>) (admin) -> admin.transactions() - .getTransactionBufferStats(testTopic, false), - OperationAuthType.NOAuth - }, - new Object[] { - (ThrowingBiConsumer<PulsarAdmin>) (admin) -> admin.transactions() - .getTransactionBufferStats(testTopic, false), - OperationAuthType.NOAuth - }, - new Object[] { - (ThrowingBiConsumer<PulsarAdmin>) (admin) -> admin.transactions() - .getTransactionInBufferStats(transaction.getTxnID(), testTopic), - OperationAuthType.NOAuth - }, - new Object[] { - (ThrowingBiConsumer<PulsarAdmin>) (admin) -> admin.transactions() - .getTransactionInBufferStats(transaction.getTxnID(), testTopic), - OperationAuthType.NOAuth - }, - new Object[] { - (ThrowingBiConsumer<PulsarAdmin>) (admin) -> admin.transactions() - .getTransactionInPendingAckStats(transaction.getTxnID(), testTopic, sub), - OperationAuthType.NOAuth - }, - }; - } @Test @SneakyThrows @@ -1410,82 +1207,7 @@ public class TopicAuthZTest extends MockedPulsarStandalone { deleteTopic(topic, false); } - @Test(dataProvider = "authFunction") - public void testSchemaAndTransactionAuthorization(ThrowingBiConsumer<PulsarAdmin> adminConsumer, OperationAuthType topicOpType) - throws Exception { - final String subject = UUID.randomUUID().toString(); - final String token = Jwts.builder() - .claim("sub", subject).signWith(SECRET_KEY).compact(); - - - @Cleanup - final PulsarAdmin subAdmin = PulsarAdmin.builder() - .serviceHttpUrl(getPulsarService().getWebServiceAddress()) - .authentication(new AuthenticationToken(token)) - .build(); - // test tenant manager - if (topicOpType != OperationAuthType.AdminOrSuperUser) { - adminConsumer.accept(tenantManagerAdmin); - } - - if (topicOpType != OperationAuthType.NOAuth) { - Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class, - () -> adminConsumer.accept(subAdmin)); - } - - AtomicBoolean execFlag = null; - if (topicOpType == OperationAuthType.Lookup) { - execFlag = setAuthorizationTopicOperationChecker(subject, TopicOperation.LOOKUP); - } else if (topicOpType == OperationAuthType.Produce) { - execFlag = setAuthorizationTopicOperationChecker(subject, TopicOperation.PRODUCE); - } else if (topicOpType == OperationAuthType.Consume) { - execFlag = setAuthorizationTopicOperationChecker(subject, TopicOperation.CONSUME); - } - - for (AuthAction action : AuthAction.values()) { - superUserAdmin.topics().grantPermission(testTopic, subject, Set.of(action)); - - if (authActionMatchOperation(topicOpType, action)) { - adminConsumer.accept(subAdmin); - } else { - Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class, - () -> adminConsumer.accept(subAdmin)); - } - superUserAdmin.topics().revokePermissions(testTopic, subject); - } - - if (execFlag != null) { - Assert.assertTrue(execFlag.get()); - } - - } - private boolean authActionMatchOperation(OperationAuthType operationAuthType, AuthAction action) { - switch (operationAuthType) { - case Lookup -> { - if (AuthAction.consume == action || AuthAction.produce == action) { - return true; - } - } - case Consume -> { - if (AuthAction.consume == action) { - return true; - } - } - case Produce -> { - if (AuthAction.produce == action) { - return true; - } - } - case AdminOrSuperUser -> { - return false; - } - case NOAuth -> { - return true; - } - } - return false; - } @Test @SneakyThrows @@ -1507,8 +1229,10 @@ public class TopicAuthZTest extends MockedPulsarStandalone { // test tenant manager tenantManagerAdmin.topicPolicies().getEntryFiltersPerTopic(topic, true); + AtomicBoolean execFlag = setAuthorizationPolicyOperationChecker(subject, PolicyName.ENTRY_FILTERS, PolicyOperation.READ); Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class, () -> subAdmin.topicPolicies().getEntryFiltersPerTopic(topic, false)); + Assert.assertTrue(execFlag.get()); for (AuthAction action : AuthAction.values()) { superUserAdmin.topics().grantPermission(topic, subject, Set.of(action)); @@ -1553,8 +1277,10 @@ public class TopicAuthZTest extends MockedPulsarStandalone { // test tenant manager tenantManagerAdmin.topicPolicies().setEntryFiltersPerTopic(topic, entryFilter); + AtomicBoolean execFlag = setAuthorizationPolicyOperationChecker(subject, PolicyName.ENTRY_FILTERS, PolicyOperation.WRITE); Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class, () -> subAdmin.topicPolicies().setEntryFiltersPerTopic(topic, entryFilter)); + Assert.assertTrue(execFlag.get()); for (AuthAction action : AuthAction.values()) { superUserAdmin.topics().grantPermission(topic, subject, Set.of(action)); @@ -1656,19 +1382,768 @@ public class TopicAuthZTest extends MockedPulsarStandalone { deleteTopic(topic, false); } - private void createTopic(String topic, boolean partitioned) throws Exception { - if (partitioned) { - superUserAdmin.topics().createPartitionedTopic(topic, 2); - } else { - superUserAdmin.topics().createNonPartitionedTopic(topic); - } + @Test + @SneakyThrows + public void testList() { + final String random = UUID.randomUUID().toString(); + final String topic = "persistent://public/default/" + random; + final String subject = UUID.randomUUID().toString(); + final String token = Jwts.builder() + .claim("sub", subject).signWith(SECRET_KEY).compact(); + createTopic(topic, false); + @Cleanup + final PulsarAdmin subAdmin = PulsarAdmin.builder() + .serviceHttpUrl(getPulsarService().getWebServiceAddress()) + .authentication(new AuthenticationToken(token)) + .build(); + AtomicBoolean execFlag = setAuthorizationTopicOperationChecker(subject, NamespaceOperation.GET_TOPICS); + Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class, + () -> subAdmin.topics().getList("public/default")); + Assert.assertTrue(execFlag.get()); + + execFlag = setAuthorizationTopicOperationChecker(subject, NamespaceOperation.GET_TOPICS); + Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class, + () -> subAdmin.topics().getPartitionedTopicList("public/default")); + Assert.assertTrue(execFlag.get()); + + deleteTopic(topic, false); } - private void deleteTopic(String topic, boolean partitioned) throws Exception { - if (partitioned) { - superUserAdmin.topics().deletePartitionedTopic(topic, true); - } else { - superUserAdmin.topics().delete(topic, true); - } + @Test + @SneakyThrows + public void testPermissionsOnTopic() { + final String random = UUID.randomUUID().toString(); + final String topic = "persistent://public/default/" + random; + final String subject = UUID.randomUUID().toString(); + final String token = Jwts.builder() + .claim("sub", subject).signWith(SECRET_KEY).compact(); + createTopic(topic, false); + @Cleanup + final PulsarAdmin subAdmin = PulsarAdmin.builder() + .serviceHttpUrl(getPulsarService().getWebServiceAddress()) + .authentication(new AuthenticationToken(token)) + .build(); + // + superUserAdmin.topics().getPermissions(topic); + superUserAdmin.topics().grantPermission(topic, subject, Sets.newHashSet(AuthAction.functions)); + superUserAdmin.topics().revokePermissions(topic, subject); + + // test tenant manager + tenantManagerAdmin.topics().getPermissions(topic); + tenantManagerAdmin.topics().grantPermission(topic, subject, Sets.newHashSet(AuthAction.functions)); + tenantManagerAdmin.topics().revokePermissions(topic, subject); + + Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class, + () -> subAdmin.topics().getPermissions(topic)); + Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class, + () -> subAdmin.topics().grantPermission(topic, subject, Sets.newHashSet(AuthAction.functions))); + Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class, + () -> subAdmin.topics().revokePermissions(topic, subject)); + + deleteTopic(topic, false); + } + + @Test + @SneakyThrows + public void testOffloadPolicies() { + final String random = UUID.randomUUID().toString(); + final String topic = "persistent://public/default/" + random; + final String subject = UUID.randomUUID().toString(); + final String token = Jwts.builder() + .claim("sub", subject).signWith(SECRET_KEY).compact(); + createTopic(topic, false); + @Cleanup + final PulsarAdmin subAdmin = PulsarAdmin.builder() + .serviceHttpUrl(getPulsarService().getWebServiceAddress()) + .authentication(new AuthenticationToken(token)) + .build(); + AtomicBoolean execFlag = setAuthorizationPolicyOperationChecker(subject, PolicyName.OFFLOAD, PolicyOperation.READ); + Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class, + () -> subAdmin.topicPolicies().getOffloadPolicies(topic)); + Assert.assertTrue(execFlag.get()); + + execFlag = setAuthorizationPolicyOperationChecker(subject, PolicyName.OFFLOAD, PolicyOperation.WRITE); + Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class, + () -> subAdmin.topicPolicies().setOffloadPolicies(topic, OffloadPolicies.builder().build())); + Assert.assertTrue(execFlag.get()); + + execFlag = setAuthorizationPolicyOperationChecker(subject, PolicyName.OFFLOAD, PolicyOperation.WRITE); + Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class, + () -> subAdmin.topicPolicies().removeOffloadPolicies(topic)); + Assert.assertTrue(execFlag.get()); + + deleteTopic(topic, false); + } + + @Test + @SneakyThrows + public void testMaxUnackedMessagesOnConsumer() { + final String random = UUID.randomUUID().toString(); + final String topic = "persistent://public/default/" + random; + final String subject = UUID.randomUUID().toString(); + final String token = Jwts.builder() + .claim("sub", subject).signWith(SECRET_KEY).compact(); + createTopic(topic, false); + @Cleanup + final PulsarAdmin subAdmin = PulsarAdmin.builder() + .serviceHttpUrl(getPulsarService().getWebServiceAddress()) + .authentication(new AuthenticationToken(token)) + .build(); + AtomicBoolean execFlag = setAuthorizationPolicyOperationChecker(subject, PolicyName.MAX_UNACKED, PolicyOperation.READ); + Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class, + () -> subAdmin.topicPolicies().getMaxUnackedMessagesOnConsumer(topic)); + Assert.assertTrue(execFlag.get()); + + execFlag = setAuthorizationPolicyOperationChecker(subject, PolicyName.MAX_UNACKED, PolicyOperation.WRITE); + Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class, + () -> subAdmin.topicPolicies().setMaxUnackedMessagesOnConsumer(topic, 2)); + Assert.assertTrue(execFlag.get()); + + execFlag = setAuthorizationPolicyOperationChecker(subject, PolicyName.MAX_UNACKED, PolicyOperation.WRITE); + Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class, + () -> subAdmin.topicPolicies().removeMaxUnackedMessagesOnConsumer(topic)); + Assert.assertTrue(execFlag.get()); + + deleteTopic(topic, false); + } + + @Test + @SneakyThrows + public void testDeduplicationSnapshotInterval() { + final String random = UUID.randomUUID().toString(); + final String topic = "persistent://public/default/" + random; + final String subject = UUID.randomUUID().toString(); + final String token = Jwts.builder() + .claim("sub", subject).signWith(SECRET_KEY).compact(); + createTopic(topic, false); + @Cleanup + final PulsarAdmin subAdmin = PulsarAdmin.builder() + .serviceHttpUrl(getPulsarService().getWebServiceAddress()) + .authentication(new AuthenticationToken(token)) + .build(); + AtomicBoolean execFlag = setAuthorizationPolicyOperationChecker(subject, PolicyName.DEDUPLICATION_SNAPSHOT, PolicyOperation.READ); + Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class, + () -> subAdmin.topicPolicies().getDeduplicationSnapshotInterval(topic)); + Assert.assertTrue(execFlag.get()); + + execFlag = setAuthorizationPolicyOperationChecker(subject, PolicyName.DEDUPLICATION_SNAPSHOT, PolicyOperation.WRITE); + Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class, + () -> subAdmin.topicPolicies().setDeduplicationSnapshotInterval(topic, 2)); + Assert.assertTrue(execFlag.get()); + + execFlag = setAuthorizationPolicyOperationChecker(subject, PolicyName.DEDUPLICATION_SNAPSHOT, PolicyOperation.WRITE); + Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class, + () -> subAdmin.topicPolicies().removeDeduplicationSnapshotInterval(topic)); + Assert.assertTrue(execFlag.get()); + + deleteTopic(topic, false); + } + + @Test + @SneakyThrows + public void testInactiveTopicPolicies() { + final String random = UUID.randomUUID().toString(); + final String topic = "persistent://public/default/" + random; + final String subject = UUID.randomUUID().toString(); + final String token = Jwts.builder() + .claim("sub", subject).signWith(SECRET_KEY).compact(); + createTopic(topic, false); + @Cleanup + final PulsarAdmin subAdmin = PulsarAdmin.builder() + .serviceHttpUrl(getPulsarService().getWebServiceAddress()) + .authentication(new AuthenticationToken(token)) + .build(); + AtomicBoolean execFlag = setAuthorizationPolicyOperationChecker(subject, PolicyName.INACTIVE_TOPIC, PolicyOperation.READ); + Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class, + () -> subAdmin.topicPolicies().getInactiveTopicPolicies(topic)); + Assert.assertTrue(execFlag.get()); + + execFlag = setAuthorizationPolicyOperationChecker(subject, PolicyName.INACTIVE_TOPIC, PolicyOperation.WRITE); + Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class, + () -> subAdmin.topicPolicies().setInactiveTopicPolicies(topic, new InactiveTopicPolicies())); + Assert.assertTrue(execFlag.get()); + + execFlag = setAuthorizationPolicyOperationChecker(subject, PolicyName.INACTIVE_TOPIC, PolicyOperation.WRITE); + Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class, + () -> subAdmin.topicPolicies().removeInactiveTopicPolicies(topic)); + Assert.assertTrue(execFlag.get()); + + deleteTopic(topic, false); + } + + @Test + @SneakyThrows + public void testMaxUnackedMessagesOnSubscription() { + final String random = UUID.randomUUID().toString(); + final String topic = "persistent://public/default/" + random; + final String subject = UUID.randomUUID().toString(); + final String token = Jwts.builder() + .claim("sub", subject).signWith(SECRET_KEY).compact(); + createTopic(topic, false); + @Cleanup + final PulsarAdmin subAdmin = PulsarAdmin.builder() + .serviceHttpUrl(getPulsarService().getWebServiceAddress()) + .authentication(new AuthenticationToken(token)) + .build(); + AtomicBoolean execFlag = setAuthorizationPolicyOperationChecker(subject, PolicyName.MAX_UNACKED, PolicyOperation.READ); + Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class, + () -> subAdmin.topicPolicies().getMaxUnackedMessagesOnSubscription(topic)); + Assert.assertTrue(execFlag.get()); + + execFlag = setAuthorizationPolicyOperationChecker(subject, PolicyName.MAX_UNACKED, PolicyOperation.WRITE); + Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class, + () -> subAdmin.topicPolicies().setMaxUnackedMessagesOnSubscription(topic, 2)); + Assert.assertTrue(execFlag.get()); + + execFlag = setAuthorizationPolicyOperationChecker(subject, PolicyName.MAX_UNACKED, PolicyOperation.WRITE); + Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class, + () -> subAdmin.topicPolicies().removeMaxUnackedMessagesOnSubscription(topic)); + Assert.assertTrue(execFlag.get()); + + deleteTopic(topic, false); + } + + @Test + @SneakyThrows + public void testDelayedDeliveryPolicies() { + final String random = UUID.randomUUID().toString(); + final String topic = "persistent://public/default/" + random; + final String subject = UUID.randomUUID().toString(); + final String token = Jwts.builder() + .claim("sub", subject).signWith(SECRET_KEY).compact(); + createTopic(topic, false); + @Cleanup + final PulsarAdmin subAdmin = PulsarAdmin.builder() + .serviceHttpUrl(getPulsarService().getWebServiceAddress()) + .authentication(new AuthenticationToken(token)) + .build(); + AtomicBoolean execFlag = setAuthorizationPolicyOperationChecker(subject, PolicyName.DELAYED_DELIVERY, PolicyOperation.READ); + Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class, + () -> subAdmin.topicPolicies().getDelayedDeliveryPolicy(topic)); + Assert.assertTrue(execFlag.get()); + + execFlag = setAuthorizationPolicyOperationChecker(subject, PolicyName.DELAYED_DELIVERY, PolicyOperation.WRITE); + Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class, + () -> subAdmin.topicPolicies().setDelayedDeliveryPolicy(topic, DelayedDeliveryPolicies.builder().build())); + Assert.assertTrue(execFlag.get()); + + execFlag = setAuthorizationPolicyOperationChecker(subject, PolicyName.DELAYED_DELIVERY, PolicyOperation.WRITE); + Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class, + () -> subAdmin.topicPolicies().removeDelayedDeliveryPolicy(topic)); + Assert.assertTrue(execFlag.get()); + + deleteTopic(topic, false); + } + + @Test + @SneakyThrows + public void testAutoSubscriptionCreation() { + final String random = UUID.randomUUID().toString(); + final String topic = "persistent://public/default/" + random; + final String subject = UUID.randomUUID().toString(); + final String token = Jwts.builder() + .claim("sub", subject).signWith(SECRET_KEY).compact(); + createTopic(topic, false); + @Cleanup + final PulsarAdmin subAdmin = PulsarAdmin.builder() + .serviceHttpUrl(getPulsarService().getWebServiceAddress()) + .authentication(new AuthenticationToken(token)) + .build(); + AtomicBoolean execFlag = setAuthorizationPolicyOperationChecker(subject, PolicyName.AUTO_SUBSCRIPTION_CREATION, PolicyOperation.READ); + Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class, + () -> subAdmin.topicPolicies().getAutoSubscriptionCreation(topic, false)); + Assert.assertTrue(execFlag.get()); + + execFlag = setAuthorizationPolicyOperationChecker(subject, PolicyName.AUTO_SUBSCRIPTION_CREATION, PolicyOperation.WRITE); + Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class, + () -> subAdmin.topicPolicies().setAutoSubscriptionCreation(topic, AutoSubscriptionCreationOverride.builder().build())); + Assert.assertTrue(execFlag.get()); + + execFlag = setAuthorizationPolicyOperationChecker(subject, PolicyName.AUTO_SUBSCRIPTION_CREATION, PolicyOperation.WRITE); + Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class, + () -> subAdmin.topicPolicies().removeAutoSubscriptionCreation(topic)); + Assert.assertTrue(execFlag.get()); + + deleteTopic(topic, false); + } + + @Test + @SneakyThrows + public void testSubscribeRate() { + final String random = UUID.randomUUID().toString(); + final String topic = "persistent://public/default/" + random; + final String subject = UUID.randomUUID().toString(); + final String token = Jwts.builder() + .claim("sub", subject).signWith(SECRET_KEY).compact(); + createTopic(topic, false); + @Cleanup + final PulsarAdmin subAdmin = PulsarAdmin.builder() + .serviceHttpUrl(getPulsarService().getWebServiceAddress()) + .authentication(new AuthenticationToken(token)) + .build(); + AtomicBoolean execFlag = setAuthorizationPolicyOperationChecker(subject, PolicyName.RATE, PolicyOperation.READ); + Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class, + () -> subAdmin.topicPolicies().getSubscribeRate(topic)); + Assert.assertTrue(execFlag.get()); + + execFlag = setAuthorizationPolicyOperationChecker(subject, PolicyName.RATE, PolicyOperation.WRITE); + Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class, + () -> subAdmin.topicPolicies().setSubscribeRate(topic, new SubscribeRate())); + Assert.assertTrue(execFlag.get()); + + execFlag = setAuthorizationPolicyOperationChecker(subject, PolicyName.RATE, PolicyOperation.WRITE); + Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class, + () -> subAdmin.topicPolicies().removeSubscribeRate(topic)); + Assert.assertTrue(execFlag.get()); + + deleteTopic(topic, false); + } + + @Test + @SneakyThrows + public void testSubscriptionTypesEnabled() { + final String random = UUID.randomUUID().toString(); + final String topic = "persistent://public/default/" + random; + final String subject = UUID.randomUUID().toString(); + final String token = Jwts.builder() + .claim("sub", subject).signWith(SECRET_KEY).compact(); + createTopic(topic, false); + @Cleanup + final PulsarAdmin subAdmin = PulsarAdmin.builder() + .serviceHttpUrl(getPulsarService().getWebServiceAddress()) + .authentication(new AuthenticationToken(token)) + .build(); + AtomicBoolean execFlag = setAuthorizationPolicyOperationChecker(subject, PolicyName.SUBSCRIPTION_AUTH_MODE, PolicyOperation.READ); + Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class, + () -> subAdmin.topicPolicies().getSubscriptionTypesEnabled(topic)); + Assert.assertTrue(execFlag.get()); + + execFlag = setAuthorizationPolicyOperationChecker(subject, PolicyName.SUBSCRIPTION_AUTH_MODE, PolicyOperation.WRITE); + Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class, + () -> subAdmin.topicPolicies().setSubscriptionTypesEnabled(topic, new HashSet<>())); + Assert.assertTrue(execFlag.get()); + + execFlag = setAuthorizationPolicyOperationChecker(subject, PolicyName.SUBSCRIPTION_AUTH_MODE, PolicyOperation.WRITE); + Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class, + () -> subAdmin.topicPolicies().removeSubscriptionTypesEnabled(topic)); + Assert.assertTrue(execFlag.get()); + + deleteTopic(topic, false); + } + + @Test + @SneakyThrows + public void testPublishRate() { + final String random = UUID.randomUUID().toString(); + final String topic = "persistent://public/default/" + random; + final String subject = UUID.randomUUID().toString(); + final String token = Jwts.builder() + .claim("sub", subject).signWith(SECRET_KEY).compact(); + createTopic(topic, false); + @Cleanup + final PulsarAdmin subAdmin = PulsarAdmin.builder() + .serviceHttpUrl(getPulsarService().getWebServiceAddress()) + .authentication(new AuthenticationToken(token)) + .build(); + AtomicBoolean execFlag = setAuthorizationPolicyOperationChecker(subject, PolicyName.RATE, PolicyOperation.READ); + Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class, + () -> subAdmin.topicPolicies().getPublishRate(topic)); + Assert.assertTrue(execFlag.get()); + + execFlag = setAuthorizationPolicyOperationChecker(subject, PolicyName.RATE, PolicyOperation.WRITE); + Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class, + () -> subAdmin.topicPolicies().setPublishRate(topic, new PublishRate())); + Assert.assertTrue(execFlag.get()); + + execFlag = setAuthorizationPolicyOperationChecker(subject, PolicyName.RATE, PolicyOperation.WRITE); + Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class, + () -> subAdmin.topicPolicies().removePublishRate(topic)); + Assert.assertTrue(execFlag.get()); + + deleteTopic(topic, false); + } + + @Test + @SneakyThrows + public void testMaxConsumersPerSubscription() { + final String random = UUID.randomUUID().toString(); + final String topic = "persistent://public/default/" + random; + final String subject = UUID.randomUUID().toString(); + final String token = Jwts.builder() + .claim("sub", subject).signWith(SECRET_KEY).compact(); + createTopic(topic, false); + @Cleanup + final PulsarAdmin subAdmin = PulsarAdmin.builder() + .serviceHttpUrl(getPulsarService().getWebServiceAddress()) + .authentication(new AuthenticationToken(token)) + .build(); + AtomicBoolean execFlag = setAuthorizationPolicyOperationChecker(subject, PolicyName.MAX_CONSUMERS, PolicyOperation.READ); + Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class, + () -> subAdmin.topicPolicies().getMaxConsumersPerSubscription(topic)); + Assert.assertTrue(execFlag.get()); + + execFlag = setAuthorizationPolicyOperationChecker(subject, PolicyName.MAX_CONSUMERS, PolicyOperation.WRITE); + Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class, + () -> subAdmin.topicPolicies().setMaxConsumersPerSubscription(topic, 2)); + Assert.assertTrue(execFlag.get()); + + execFlag = setAuthorizationPolicyOperationChecker(subject, PolicyName.MAX_CONSUMERS, PolicyOperation.WRITE); + Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class, + () -> subAdmin.topicPolicies().removeMaxConsumersPerSubscription(topic)); + Assert.assertTrue(execFlag.get()); + + deleteTopic(topic, false); + } + + @Test + @SneakyThrows + public void testCompactionThreshold() { + final String random = UUID.randomUUID().toString(); + final String topic = "persistent://public/default/" + random; + final String subject = UUID.randomUUID().toString(); + final String token = Jwts.builder() + .claim("sub", subject).signWith(SECRET_KEY).compact(); + createTopic(topic, false); + @Cleanup + final PulsarAdmin subAdmin = PulsarAdmin.builder() + .serviceHttpUrl(getPulsarService().getWebServiceAddress()) + .authentication(new AuthenticationToken(token)) + .build(); + AtomicBoolean execFlag = setAuthorizationPolicyOperationChecker(subject, PolicyName.COMPACTION, PolicyOperation.READ); + Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class, + () -> subAdmin.topicPolicies().getCompactionThreshold(topic)); + Assert.assertTrue(execFlag.get()); + + execFlag = setAuthorizationPolicyOperationChecker(subject, PolicyName.COMPACTION, PolicyOperation.WRITE); + Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class, + () -> subAdmin.topicPolicies().setCompactionThreshold(topic, 20000)); + Assert.assertTrue(execFlag.get()); + + execFlag = setAuthorizationPolicyOperationChecker(subject, PolicyName.COMPACTION, PolicyOperation.WRITE); + Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class, + () -> subAdmin.topicPolicies().removeCompactionThreshold(topic)); + Assert.assertTrue(execFlag.get()); + + deleteTopic(topic, false); + } + + @Test + @SneakyThrows + public void testDispatchRate() { + final String random = UUID.randomUUID().toString(); + final String topic = "persistent://public/default/" + random; + final String subject = UUID.randomUUID().toString(); + final String token = Jwts.builder() + .claim("sub", subject).signWith(SECRET_KEY).compact(); + createTopic(topic, false); + @Cleanup + final PulsarAdmin subAdmin = PulsarAdmin.builder() + .serviceHttpUrl(getPulsarService().getWebServiceAddress()) + .authentication(new AuthenticationToken(token)) + .build(); + AtomicBoolean execFlag = setAuthorizationPolicyOperationChecker(subject, PolicyName.RATE, PolicyOperation.READ); + Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class, + () -> subAdmin.topicPolicies().getDispatchRate(topic)); + Assert.assertTrue(execFlag.get()); + + execFlag = setAuthorizationPolicyOperationChecker(subject, PolicyName.RATE, PolicyOperation.WRITE); + Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class, + () -> subAdmin.topicPolicies().setDispatchRate(topic, DispatchRate.builder().build())); + Assert.assertTrue(execFlag.get()); + + execFlag = setAuthorizationPolicyOperationChecker(subject, PolicyName.RATE, PolicyOperation.WRITE); + Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class, + () -> subAdmin.topicPolicies().removeDispatchRate(topic)); + Assert.assertTrue(execFlag.get()); + + deleteTopic(topic, false); + } + + @Test + @SneakyThrows + public void testMaxConsumers() { + final String random = UUID.randomUUID().toString(); + final String topic = "persistent://public/default/" + random; + final String subject = UUID.randomUUID().toString(); + final String token = Jwts.builder() + .claim("sub", subject).signWith(SECRET_KEY).compact(); + createTopic(topic, false); + @Cleanup + final PulsarAdmin subAdmin = PulsarAdmin.builder() + .serviceHttpUrl(getPulsarService().getWebServiceAddress()) + .authentication(new AuthenticationToken(token)) + .build(); + AtomicBoolean execFlag = setAuthorizationPolicyOperationChecker(subject, PolicyName.MAX_CONSUMERS, PolicyOperation.READ); + Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class, + () -> subAdmin.topicPolicies().getMaxConsumers(topic)); + Assert.assertTrue(execFlag.get()); + + execFlag = setAuthorizationPolicyOperationChecker(subject, PolicyName.MAX_CONSUMERS, PolicyOperation.WRITE); + Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class, + () -> subAdmin.topicPolicies().setMaxConsumers(topic, 2)); + Assert.assertTrue(execFlag.get()); + + execFlag = setAuthorizationPolicyOperationChecker(subject, PolicyName.MAX_CONSUMERS, PolicyOperation.WRITE); + Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class, + () -> subAdmin.topicPolicies().removeMaxConsumers(topic)); + Assert.assertTrue(execFlag.get()); + + deleteTopic(topic, false); + } + + @Test + @SneakyThrows + public void testMaxProducers() { + final String random = UUID.randomUUID().toString(); + final String topic = "persistent://public/default/" + random; + final String subject = UUID.randomUUID().toString(); + final String token = Jwts.builder() + .claim("sub", subject).signWith(SECRET_KEY).compact(); + createTopic(topic, false); + @Cleanup + final PulsarAdmin subAdmin = PulsarAdmin.builder() + .serviceHttpUrl(getPulsarService().getWebServiceAddress()) + .authentication(new AuthenticationToken(token)) + .build(); + AtomicBoolean execFlag = setAuthorizationPolicyOperationChecker(subject, PolicyName.MAX_PRODUCERS, PolicyOperation.READ); + Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class, + () -> subAdmin.topicPolicies().getMaxProducers(topic)); + Assert.assertTrue(execFlag.get()); + + execFlag = setAuthorizationPolicyOperationChecker(subject, PolicyName.MAX_PRODUCERS, PolicyOperation.WRITE); + Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class, + () -> subAdmin.topicPolicies().setMaxProducers(topic, 2)); + Assert.assertTrue(execFlag.get()); + + execFlag = setAuthorizationPolicyOperationChecker(subject, PolicyName.MAX_PRODUCERS, PolicyOperation.WRITE); + Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class, + () -> subAdmin.topicPolicies().removeMaxProducers(topic)); + Assert.assertTrue(execFlag.get()); + + deleteTopic(topic, false); + } + + @Test + @SneakyThrows + public void testReplicatorDispatchRate() { + final String random = UUID.randomUUID().toString(); + final String topic = "persistent://public/default/" + random; + final String subject = UUID.randomUUID().toString(); + final String token = Jwts.builder() + .claim("sub", subject).signWith(SECRET_KEY).compact(); + createTopic(topic, false); + @Cleanup + final PulsarAdmin subAdmin = PulsarAdmin.builder() + .serviceHttpUrl(getPulsarService().getWebServiceAddress()) + .authentication(new AuthenticationToken(token)) + .build(); + AtomicBoolean execFlag = setAuthorizationPolicyOperationChecker(subject, PolicyName.REPLICATION_RATE, PolicyOperation.READ); + Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class, + () -> subAdmin.topicPolicies().getReplicatorDispatchRate(topic)); + Assert.assertTrue(execFlag.get()); + + execFlag = setAuthorizationPolicyOperationChecker(subject, PolicyName.REPLICATION_RATE, PolicyOperation.WRITE); + Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class, + () -> subAdmin.topicPolicies().setReplicatorDispatchRate(topic, DispatchRate.builder().build())); + Assert.assertTrue(execFlag.get()); + + execFlag = setAuthorizationPolicyOperationChecker(subject, PolicyName.REPLICATION_RATE, PolicyOperation.WRITE); + Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class, + () -> subAdmin.topicPolicies().removeReplicatorDispatchRate(topic)); + Assert.assertTrue(execFlag.get()); + + deleteTopic(topic, false); + } + + @Test + @SneakyThrows + public void testPersistence() { + final String random = UUID.randomUUID().toString(); + final String topic = "persistent://public/default/" + random; + final String subject = UUID.randomUUID().toString(); + final String token = Jwts.builder() + .claim("sub", subject).signWith(SECRET_KEY).compact(); + createTopic(topic, false); + @Cleanup + final PulsarAdmin subAdmin = PulsarAdmin.builder() + .serviceHttpUrl(getPulsarService().getWebServiceAddress()) + .authentication(new AuthenticationToken(token)) + .build(); + AtomicBoolean execFlag = setAuthorizationPolicyOperationChecker(subject, PolicyName.PERSISTENCE, PolicyOperation.READ); + Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class, + () -> subAdmin.topicPolicies().getPersistence(topic)); + Assert.assertTrue(execFlag.get()); + + execFlag = setAuthorizationPolicyOperationChecker(subject, PolicyName.PERSISTENCE, PolicyOperation.WRITE); + Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class, + () -> subAdmin.topicPolicies().setPersistence(topic, new PersistencePolicies())); + Assert.assertTrue(execFlag.get()); + + execFlag = setAuthorizationPolicyOperationChecker(subject, PolicyName.PERSISTENCE, PolicyOperation.WRITE); + Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class, + () -> subAdmin.topicPolicies().removePersistence(topic)); + Assert.assertTrue(execFlag.get()); + + deleteTopic(topic, false); + } + + @Test + @SneakyThrows + public void testRetention() { + final String random = UUID.randomUUID().toString(); + final String topic = "persistent://public/default/" + random; + final String subject = UUID.randomUUID().toString(); + final String token = Jwts.builder() + .claim("sub", subject).signWith(SECRET_KEY).compact(); + createTopic(topic, false); + @Cleanup + final PulsarAdmin subAdmin = PulsarAdmin.builder() + .serviceHttpUrl(getPulsarService().getWebServiceAddress()) + .authentication(new AuthenticationToken(token)) + .build(); + AtomicBoolean execFlag = setAuthorizationPolicyOperationChecker(subject, PolicyName.RETENTION, PolicyOperation.READ); + Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class, + () -> subAdmin.topicPolicies().getRetention(topic, false)); + Assert.assertTrue(execFlag.get()); + + execFlag = setAuthorizationPolicyOperationChecker(subject, PolicyName.RETENTION, PolicyOperation.WRITE); + Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class, + () -> subAdmin.topicPolicies().setRetention(topic, new RetentionPolicies())); + Assert.assertTrue(execFlag.get()); + + execFlag = setAuthorizationPolicyOperationChecker(subject, PolicyName.RETENTION, PolicyOperation.WRITE); + Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class, + () -> subAdmin.topicPolicies().removeRetention(topic)); + Assert.assertTrue(execFlag.get()); + + deleteTopic(topic, false); + } + + @Test + @SneakyThrows + public void testDeduplication() { + final String random = UUID.randomUUID().toString(); + final String topic = "persistent://public/default/" + random; + final String subject = UUID.randomUUID().toString(); + final String token = Jwts.builder() + .claim("sub", subject).signWith(SECRET_KEY).compact(); + createTopic(topic, false); + @Cleanup + final PulsarAdmin subAdmin = PulsarAdmin.builder() + .serviceHttpUrl(getPulsarService().getWebServiceAddress()) + .authentication(new AuthenticationToken(token)) + .build(); + AtomicBoolean execFlag = setAuthorizationPolicyOperationChecker(subject, PolicyName.DEDUPLICATION, PolicyOperation.READ); + Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class, + () -> subAdmin.topicPolicies().getDeduplicationStatus(topic, false)); + Assert.assertTrue(execFlag.get()); + + execFlag = setAuthorizationPolicyOperationChecker(subject, PolicyName.DEDUPLICATION, PolicyOperation.WRITE); + Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class, + () -> subAdmin.topicPolicies().setDeduplicationStatus(topic, false)); + Assert.assertTrue(execFlag.get()); + + execFlag = setAuthorizationPolicyOperationChecker(subject, PolicyName.DEDUPLICATION, PolicyOperation.WRITE); + Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class, + () -> subAdmin.topicPolicies().removeDeduplicationStatus(topic)); + Assert.assertTrue(execFlag.get()); + + deleteTopic(topic, false); + } + + @Test + @SneakyThrows + public void testMessageTTL() { + final String random = UUID.randomUUID().toString(); + final String topic = "persistent://public/default/" + random; + final String subject = UUID.randomUUID().toString(); + final String token = Jwts.builder() + .claim("sub", subject).signWith(SECRET_KEY).compact(); + createTopic(topic, false); + @Cleanup + final PulsarAdmin subAdmin = PulsarAdmin.builder() + .serviceHttpUrl(getPulsarService().getWebServiceAddress()) + .authentication(new AuthenticationToken(token)) + .build(); + AtomicBoolean execFlag = setAuthorizationPolicyOperationChecker(subject, PolicyName.TTL, PolicyOperation.READ); + Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class, + () -> subAdmin.topicPolicies().getMessageTTL(topic, false)); + Assert.assertTrue(execFlag.get()); + + execFlag = setAuthorizationPolicyOperationChecker(subject, PolicyName.TTL, PolicyOperation.WRITE); + Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class, + () -> subAdmin.topicPolicies().setMessageTTL(topic, 2)); + Assert.assertTrue(execFlag.get()); + + execFlag = setAuthorizationPolicyOperationChecker(subject, PolicyName.TTL, PolicyOperation.WRITE); + Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class, + () -> subAdmin.topicPolicies().removeMessageTTL(topic)); + Assert.assertTrue(execFlag.get()); + + deleteTopic(topic, false); + } + + @Test + @SneakyThrows + public void testBacklogQuota() { + final String random = UUID.randomUUID().toString(); + final String topic = "persistent://public/default/" + random; + final String subject = UUID.randomUUID().toString(); + final String token = Jwts.builder() + .claim("sub", subject).signWith(SECRET_KEY).compact(); + createTopic(topic, false); + @Cleanup + final PulsarAdmin subAdmin = PulsarAdmin.builder() + .serviceHttpUrl(getPulsarService().getWebServiceAddress()) + .authentication(new AuthenticationToken(token)) + .build(); + AtomicBoolean execFlag = setAuthorizationPolicyOperationChecker(subject, PolicyName.BACKLOG, PolicyOperation.READ); + Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class, + () -> subAdmin.topicPolicies().getBacklogQuotaMap(topic, false)); + Assert.assertTrue(execFlag.get()); + + execFlag = setAuthorizationPolicyOperationChecker(subject, PolicyName.BACKLOG, PolicyOperation.WRITE); + Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class, + () -> subAdmin.topicPolicies().setBacklogQuota(topic, BacklogQuota.builder().build())); + Assert.assertTrue(execFlag.get()); + + execFlag = setAuthorizationPolicyOperationChecker(subject, PolicyName.BACKLOG, PolicyOperation.WRITE); + Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class, + () -> subAdmin.topicPolicies().removeBacklogQuota(topic)); + Assert.assertTrue(execFlag.get()); + + deleteTopic(topic, false); + } + + @Test + @SneakyThrows + public void testReplicationClusters() { + final String random = UUID.randomUUID().toString(); + final String topic = "persistent://public/default/" + random; + final String subject = UUID.randomUUID().toString(); + final String token = Jwts.builder() + .claim("sub", subject).signWith(SECRET_KEY).compact(); + createTopic(topic, false); + @Cleanup + final PulsarAdmin subAdmin = PulsarAdmin.builder() + .serviceHttpUrl(getPulsarService().getWebServiceAddress()) + .authentication(new AuthenticationToken(token)) + .build(); + AtomicBoolean execFlag = setAuthorizationPolicyOperationChecker(subject, PolicyName.REPLICATION, PolicyOperation.READ); + Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class, + () -> subAdmin.topics().getReplicationClusters(topic, false)); + Assert.assertTrue(execFlag.get()); + + execFlag = setAuthorizationPolicyOperationChecker(subject, PolicyName.REPLICATION, PolicyOperation.WRITE); + Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class, + () -> subAdmin.topics().setReplicationClusters(topic, new ArrayList<>())); + Assert.assertTrue(execFlag.get()); + + execFlag = setAuthorizationPolicyOperationChecker(subject, PolicyName.REPLICATION, PolicyOperation.WRITE); + Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class, + () -> subAdmin.topics().removeReplicationClusters(topic)); + Assert.assertTrue(execFlag.get()); + + deleteTopic(topic, false); } } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TransactionAndSchemaAuthZTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TransactionAndSchemaAuthZTest.java new file mode 100644 index 00000000000..1bca6f6e308 --- /dev/null +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TransactionAndSchemaAuthZTest.java @@ -0,0 +1,359 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.broker.admin; + +import io.jsonwebtoken.Jwts; +import lombok.Cleanup; +import lombok.SneakyThrows; +import org.apache.commons.lang3.reflect.FieldUtils; +import org.apache.pulsar.client.admin.PulsarAdmin; +import org.apache.pulsar.client.admin.PulsarAdminException; +import org.apache.pulsar.client.api.Consumer; +import org.apache.pulsar.client.api.Producer; +import org.apache.pulsar.client.api.PulsarClient; +import org.apache.pulsar.client.api.Schema; +import org.apache.pulsar.client.api.transaction.Transaction; +import org.apache.pulsar.client.impl.MessageIdImpl; +import org.apache.pulsar.client.impl.auth.AuthenticationToken; +import org.apache.pulsar.common.naming.SystemTopicNames; +import org.apache.pulsar.common.partition.PartitionedTopicMetadata; +import org.apache.pulsar.common.policies.data.AuthAction; +import org.apache.pulsar.common.policies.data.TenantInfo; +import org.apache.pulsar.common.policies.data.TopicOperation; +import org.apache.pulsar.common.schema.SchemaInfo; +import org.apache.pulsar.common.schema.SchemaType; +import org.apache.pulsar.metadata.api.MetadataStoreException; +import org.mockito.Mockito; +import org.testng.Assert; +import org.testng.annotations.AfterClass; +import org.testng.annotations.AfterMethod; +import org.testng.annotations.BeforeClass; +import org.testng.annotations.BeforeMethod; +import org.testng.annotations.DataProvider; +import org.testng.annotations.Test; +import java.util.Set; +import java.util.UUID; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; + +@Test(groups = "broker-admin") +public class TransactionAndSchemaAuthZTest extends AuthZTest { + + @SneakyThrows + @BeforeClass(alwaysRun = true) + public void setup() { + configureTokenAuthentication(); + configureDefaultAuthorization(); + enableTransaction(); + start(); + createTransactionCoordinatorAssign(16); + this.superUserAdmin = PulsarAdmin.builder() + .serviceHttpUrl(getPulsarService().getWebServiceAddress()) + .authentication(new AuthenticationToken(SUPER_USER_TOKEN)) + .build(); + final TenantInfo tenantInfo = superUserAdmin.tenants().getTenantInfo("public"); + tenantInfo.getAdminRoles().add(TENANT_ADMIN_SUBJECT); + superUserAdmin.tenants().updateTenant("public", tenantInfo); + this.tenantManagerAdmin = PulsarAdmin.builder() + .serviceHttpUrl(getPulsarService().getWebServiceAddress()) + .authentication(new AuthenticationToken(TENANT_ADMIN_TOKEN)) + .build(); + + superUserAdmin.tenants().createTenant("pulsar", tenantInfo); + superUserAdmin.namespaces().createNamespace("pulsar/system"); + } + + @SneakyThrows + @AfterClass(alwaysRun = true) + public void cleanup() { + if (superUserAdmin != null) { + superUserAdmin.close(); + } + if (tenantManagerAdmin != null) { + tenantManagerAdmin.close(); + } + close(); + } + + @BeforeMethod + public void before() throws IllegalAccessException { + orignalAuthorizationService = getPulsarService().getBrokerService().getAuthorizationService(); + authorizationService = Mockito.spy(orignalAuthorizationService); + FieldUtils.writeField(getPulsarService().getBrokerService(), "authorizationService", + authorizationService, true); + } + + @AfterMethod + public void after() throws IllegalAccessException { + FieldUtils.writeField(getPulsarService().getBrokerService(), "authorizationService", + orignalAuthorizationService, true); + } + + protected void createTransactionCoordinatorAssign(int numPartitionsOfTC) throws MetadataStoreException { + getPulsarService().getPulsarResources() + .getNamespaceResources() + .getPartitionedTopicResources() + .createPartitionedTopic(SystemTopicNames.TRANSACTION_COORDINATOR_ASSIGN, + new PartitionedTopicMetadata(numPartitionsOfTC)); + } + + public enum OperationAuthType { + Lookup, + Produce, + Consume, + AdminOrSuperUser, + NOAuth + } + + private final String testTopic = "persistent://public/default/" + UUID.randomUUID().toString(); + @FunctionalInterface + public interface ThrowingBiConsumer<T> { + void accept(T t) throws PulsarAdminException; + } + + @DataProvider(name = "authFunction") + public Object[][] authFunction () throws Exception { + String sub = "my-sub"; + createTopic(testTopic, false); + @Cleanup final PulsarClient pulsarClient = PulsarClient.builder() + .serviceUrl(getPulsarService().getBrokerServiceUrl()) + .authentication(new AuthenticationToken(SUPER_USER_TOKEN)) + .enableTransaction(true) + .build(); + @Cleanup final Producer<String> producer = pulsarClient.newProducer(Schema.STRING).topic(testTopic).create(); + + @Cleanup final Consumer<String> consumer = pulsarClient.newConsumer(Schema.STRING) + .topic(testTopic) + .subscriptionName(sub) + .subscribe(); + + Transaction transaction = pulsarClient.newTransaction().withTransactionTimeout(5, TimeUnit.MINUTES) + .build().get(); + MessageIdImpl messageId = (MessageIdImpl) producer.newMessage().value("test message").send(); + + consumer.acknowledgeAsync(messageId, transaction).get(); + + return new Object[][]{ + // SCHEMA + new Object[] { + (ThrowingBiConsumer<PulsarAdmin>) (admin) -> admin.schemas().getSchemaInfo(testTopic), + OperationAuthType.Lookup + }, + new Object[] { + (ThrowingBiConsumer<PulsarAdmin>) (admin) -> admin.schemas().getSchemaInfo( + testTopic, 0), + OperationAuthType.Lookup + }, + new Object[] { + (ThrowingBiConsumer<PulsarAdmin>) (admin) -> admin.schemas().getAllSchemas( + testTopic), + OperationAuthType.Lookup + }, + new Object[] { + (ThrowingBiConsumer<PulsarAdmin>) (admin) -> admin.schemas().createSchema(testTopic, + SchemaInfo.builder().type(SchemaType.STRING).build()), + OperationAuthType.Produce + }, + // TODO: improve the authorization check for testCompatibility and deleteSchema + new Object[] { + (ThrowingBiConsumer<PulsarAdmin>) (admin) -> admin.schemas().testCompatibility( + testTopic, SchemaInfo.builder().type(SchemaType.STRING).build()), + OperationAuthType.AdminOrSuperUser + }, + new Object[] { + (ThrowingBiConsumer<PulsarAdmin>) (admin) -> admin.schemas().deleteSchema( + testTopic), + OperationAuthType.AdminOrSuperUser + }, + + // TRANSACTION + + // Modify transaction coordinator + new Object[] { + (ThrowingBiConsumer<PulsarAdmin>) (admin) -> admin.transactions() + .abortTransaction(transaction.getTxnID()), + OperationAuthType.AdminOrSuperUser + }, + new Object[] { + (ThrowingBiConsumer<PulsarAdmin>) (admin) -> admin.transactions() + .scaleTransactionCoordinators(17), + OperationAuthType.AdminOrSuperUser + }, + // TODO: fix authorization check of check transaction coordinator stats. + // Check transaction coordinator stats + new Object[] { + (ThrowingBiConsumer<PulsarAdmin>) (admin) -> admin.transactions() + .getCoordinatorInternalStats(1, false), + OperationAuthType.NOAuth + }, + new Object[] { + (ThrowingBiConsumer<PulsarAdmin>) (admin) -> admin.transactions() + .getCoordinatorStats(), + OperationAuthType.AdminOrSuperUser + }, + new Object[] { + (ThrowingBiConsumer<PulsarAdmin>) (admin) -> admin.transactions() + .getSlowTransactionsByCoordinatorId(1, 5, TimeUnit.SECONDS), + OperationAuthType.NOAuth + }, + new Object[] { + (ThrowingBiConsumer<PulsarAdmin>) (admin) -> admin.transactions() + .getTransactionMetadata(transaction.getTxnID()), + OperationAuthType.NOAuth + }, + new Object[] { + (ThrowingBiConsumer<PulsarAdmin>) (admin) -> admin.transactions() + .listTransactionCoordinators(), + OperationAuthType.NOAuth + }, + new Object[] { + (ThrowingBiConsumer<PulsarAdmin>) (admin) -> admin.transactions() + .getSlowTransactions(5, TimeUnit.SECONDS), + OperationAuthType.AdminOrSuperUser + }, + + // TODO: Check the authorization of the topic when get stats of TB or TP + // Check stats related to transaction buffer and transaction pending ack + new Object[] { + (ThrowingBiConsumer<PulsarAdmin>) (admin) -> admin.transactions() + .getPendingAckInternalStats(testTopic, sub, false), + OperationAuthType.NOAuth + }, + new Object[] { + (ThrowingBiConsumer<PulsarAdmin>) (admin) -> admin.transactions() + .getPendingAckStats(testTopic, sub, false), + OperationAuthType.NOAuth + }, + new Object[] { + (ThrowingBiConsumer<PulsarAdmin>) (admin) -> admin.transactions() + .getPositionStatsInPendingAck(testTopic, sub, messageId.getLedgerId(), + messageId.getEntryId(), null), + OperationAuthType.NOAuth + }, + new Object[] { + (ThrowingBiConsumer<PulsarAdmin>) (admin) -> admin.transactions() + .getTransactionBufferInternalStats(testTopic, false), + OperationAuthType.NOAuth + }, + new Object[] { + (ThrowingBiConsumer<PulsarAdmin>) (admin) -> admin.transactions() + .getTransactionBufferStats(testTopic, false), + OperationAuthType.NOAuth + }, + new Object[] { + (ThrowingBiConsumer<PulsarAdmin>) (admin) -> admin.transactions() + .getTransactionBufferStats(testTopic, false), + OperationAuthType.NOAuth + }, + new Object[] { + (ThrowingBiConsumer<PulsarAdmin>) (admin) -> admin.transactions() + .getTransactionInBufferStats(transaction.getTxnID(), testTopic), + OperationAuthType.NOAuth + }, + new Object[] { + (ThrowingBiConsumer<PulsarAdmin>) (admin) -> admin.transactions() + .getTransactionInBufferStats(transaction.getTxnID(), testTopic), + OperationAuthType.NOAuth + }, + new Object[] { + (ThrowingBiConsumer<PulsarAdmin>) (admin) -> admin.transactions() + .getTransactionInPendingAckStats(transaction.getTxnID(), testTopic, sub), + OperationAuthType.NOAuth + }, + }; + } + + @Test(dataProvider = "authFunction") + public void testSchemaAndTransactionAuthorization(ThrowingBiConsumer<PulsarAdmin> adminConsumer, OperationAuthType topicOpType) + throws Exception { + final String subject = UUID.randomUUID().toString(); + final String token = Jwts.builder() + .claim("sub", subject).signWith(SECRET_KEY).compact(); + + + @Cleanup + final PulsarAdmin subAdmin = PulsarAdmin.builder() + .serviceHttpUrl(getPulsarService().getWebServiceAddress()) + .authentication(new AuthenticationToken(token)) + .build(); + // test tenant manager + if (topicOpType != OperationAuthType.AdminOrSuperUser) { + adminConsumer.accept(tenantManagerAdmin); + } + + if (topicOpType != OperationAuthType.NOAuth) { + Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class, + () -> adminConsumer.accept(subAdmin)); + } + + AtomicBoolean execFlag = null; + if (topicOpType == OperationAuthType.Lookup) { + execFlag = setAuthorizationTopicOperationChecker(subject, TopicOperation.LOOKUP); + } else if (topicOpType == OperationAuthType.Produce) { + execFlag = setAuthorizationTopicOperationChecker(subject, TopicOperation.PRODUCE); + } else if (topicOpType == OperationAuthType.Consume) { + execFlag = setAuthorizationTopicOperationChecker(subject, TopicOperation.CONSUME); + } + + for (AuthAction action : AuthAction.values()) { + superUserAdmin.topics().grantPermission(testTopic, subject, Set.of(action)); + + if (authActionMatchOperation(topicOpType, action)) { + adminConsumer.accept(subAdmin); + } else { + Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class, + () -> adminConsumer.accept(subAdmin)); + } + superUserAdmin.topics().revokePermissions(testTopic, subject); + } + + if (execFlag != null) { + Assert.assertTrue(execFlag.get()); + } + + } + + private boolean authActionMatchOperation(OperationAuthType operationAuthType, AuthAction action) { + switch (operationAuthType) { + case Lookup -> { + if (AuthAction.consume == action || AuthAction.produce == action) { + return true; + } + } + case Consume -> { + if (AuthAction.consume == action) { + return true; + } + } + case Produce -> { + if (AuthAction.produce == action) { + return true; + } + } + case AdminOrSuperUser -> { + return false; + } + case NOAuth -> { + return true; + } + } + return false; + } + +}