This is an automated email from the ASF dual-hosted git repository. mmarshall pushed a commit to branch branch-2.8 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit bbe65290a64d34f33f22f66c4f8adf46b28359a2 Author: Ruguo Yu <jiang7chengz...@163.com> AuthorDate: Fri Nov 26 18:42:31 2021 +0800 [Authorization] Support CLEAR_BACKLOG namespace op after enable auth (#12963) (cherry picked from commit 64af8df83b7463d7e9231ddabc603705f15d30d6) --- .../authorization/PulsarAuthorizationProvider.java | 1 + .../api/AuthorizationProducerConsumerTest.java | 78 +++++++++++++++++++++- 2 files changed, 78 insertions(+), 1 deletion(-) diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/PulsarAuthorizationProvider.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/PulsarAuthorizationProvider.java index 411c253..7be0cac 100644 --- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/PulsarAuthorizationProvider.java +++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/PulsarAuthorizationProvider.java @@ -547,6 +547,7 @@ public class PulsarAuthorizationProvider implements AuthorizationProvider { break; case GET_TOPICS: case UNSUBSCRIBE: + case CLEAR_BACKLOG: isAuthorizedFuture = allowConsumeOpsAsync(namespaceName, role, authData); break; default: diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/AuthorizationProducerConsumerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/AuthorizationProducerConsumerTest.java index 1af36f5..603a816 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/AuthorizationProducerConsumerTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/AuthorizationProducerConsumerTest.java @@ -267,7 +267,7 @@ public class AuthorizationProducerConsumerTest extends ProducerConsumerBase { Collections.singleton(AuthAction.consume)); // now, subscriptionRole have consume authorization on namespace, so it will successfully unsubscribe namespace - superAdmin.namespaces().unsubscribeNamespaceBundle(namespace, "0x00000000_0xffffffff", subscriptionName2); + sub1Admin.namespaces().unsubscribeNamespaceBundle(namespace, "0x00000000_0xffffffff", subscriptionName2); subscriptions = sub1Admin.topics().getSubscriptions(topicName); assertEquals(subscriptions.size(), 1); @@ -323,6 +323,82 @@ public class AuthorizationProducerConsumerTest extends ProducerConsumerBase { } @Test + public void testClearBacklogPermission() throws Exception { + log.info("-- Starting {} test --", methodName); + + conf.setAuthorizationProvider(PulsarAuthorizationProvider.class.getName()); + setup(); + + final String subscriptionRole = "sub-role"; + final String subscriptionName = "sub1"; + final String namespace = "my-property/my-ns-sub-auth"; + final String topicName = "persistent://" + namespace + "/my-topic"; + Authentication adminAuthentication = new ClientAuthentication("superUser"); + + clientAuthProviderSupportedRoles.add(subscriptionRole); + + @Cleanup + PulsarAdmin superAdmin = spy(PulsarAdmin.builder().serviceHttpUrl(brokerUrl.toString()) + .authentication(adminAuthentication).build()); + + Authentication subAdminAuthentication = new ClientAuthentication(subscriptionRole); + @Cleanup + PulsarAdmin sub1Admin = spy(PulsarAdmin.builder().serviceHttpUrl(brokerUrl.toString()) + .authentication(subAdminAuthentication).build()); + + superAdmin.clusters().createCluster("test", + ClusterData.builder().serviceUrl(brokerUrl.toString()).build()); + superAdmin.tenants().createTenant("my-property", + new TenantInfoImpl(Sets.newHashSet(), Sets.newHashSet("test"))); + superAdmin.namespaces().createNamespace(namespace, Sets.newHashSet("test")); + superAdmin.topics().createPartitionedTopic(topicName, 1); + + // grant topic consume&produce authorization to the subscriptionRole + superAdmin.topics().grantPermission(topicName, subscriptionRole, + Sets.newHashSet(AuthAction.produce, AuthAction.consume)); + replacePulsarClient(PulsarClient.builder() + .serviceUrl(pulsar.getBrokerServiceUrl()) + .authentication(subAdminAuthentication)); + + @Cleanup + Producer<byte[]> batchProducer = pulsarClient.newProducer().topic(topicName) + .enableBatching(false) + .create(); + + @Cleanup + Consumer<byte[]> consumer = pulsarClient.newConsumer().topic(topicName) + .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest) + .subscriptionName(subscriptionName) + .subscribe(); + + CompletableFuture<MessageId> completableFuture = new CompletableFuture<>(); + for (int i = 0; i < 10; i++) { + completableFuture = batchProducer.sendAsync("a".getBytes()); + } + completableFuture.get(); + assertEquals(sub1Admin.topics().getStats(topicName + "-partition-0").getSubscriptions() + .get(subscriptionName).getMsgBacklog(), 10); + + // subscriptionRole doesn't have namespace-level authorization, so it will fail to clear backlog + try { + sub1Admin.namespaces().clearNamespaceBundleBacklog(namespace, "0x00000000_0xffffffff"); + fail("should have failed with authorization exception"); + } catch (Exception e) { + assertTrue(e.getMessage().startsWith( + "Unauthorized to validateNamespaceOperation for operation [CLEAR_BACKLOG]")); + } + + superAdmin.namespaces().grantPermissionOnNamespace(namespace, subscriptionRole, + Sets.newHashSet(AuthAction.consume)); + // now, subscriptionRole have consume authorization on namespace, so it will successfully clear backlog + sub1Admin.namespaces().clearNamespaceBundleBacklog(namespace, "0x00000000_0xffffffff"); + assertEquals(sub1Admin.topics().getStats(topicName + "-partition-0").getSubscriptions() + .get(subscriptionName).getMsgBacklog(), 0); + + log.info("-- Exiting {} test --", methodName); + } + + @Test public void testSubscriptionPrefixAuthorization() throws Exception { log.info("-- Starting {} test --", methodName);