This is an automated email from the ASF dual-hosted git repository.

technoboy pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 9555504ee2c [improve][admin] Add authorization test for schema and 
align auth for transaction (#22399)
9555504ee2c is described below

commit 9555504ee2c7adf9febddc585a699a1fdb724013
Author: Xiangying Meng <[email protected]>
AuthorDate: Tue Apr 9 22:43:35 2024 +0800

    [improve][admin] Add authorization test for schema and align auth for 
transaction (#22399)
---
 .../apache/pulsar/broker/admin/TopicAuthZTest.java | 249 +++++++++++++++++++++
 .../pulsar/security/MockedPulsarStandalone.java    |   4 +-
 2 files changed, 252 insertions(+), 1 deletion(-)

diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicAuthZTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicAuthZTest.java
index 2e75b59ec85..d09bc0a3ffd 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicAuthZTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicAuthZTest.java
@@ -20,19 +20,27 @@
 package org.apache.pulsar.broker.admin;
 
 import io.jsonwebtoken.Jwts;
+import java.util.concurrent.TimeUnit;
 import lombok.Cleanup;
 import lombok.SneakyThrows;
 import org.apache.pulsar.client.admin.PulsarAdmin;
 import org.apache.pulsar.client.admin.PulsarAdminException;
+import org.apache.pulsar.client.api.Consumer;
 import org.apache.pulsar.client.api.MessageId;
 import org.apache.pulsar.client.api.Producer;
 import org.apache.pulsar.client.api.PulsarClient;
 import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.client.api.transaction.Transaction;
 import org.apache.pulsar.client.impl.MessageIdImpl;
 import org.apache.pulsar.client.impl.auth.AuthenticationToken;
+import org.apache.pulsar.common.naming.SystemTopicNames;
 import org.apache.pulsar.common.naming.TopicName;
+import org.apache.pulsar.common.partition.PartitionedTopicMetadata;
 import org.apache.pulsar.common.policies.data.AuthAction;
 import org.apache.pulsar.common.policies.data.TenantInfo;
+import org.apache.pulsar.common.schema.SchemaInfo;
+import org.apache.pulsar.common.schema.SchemaType;
+import org.apache.pulsar.metadata.api.MetadataStoreException;
 import org.apache.pulsar.security.MockedPulsarStandalone;
 import org.testng.Assert;
 import org.testng.annotations.AfterClass;
@@ -62,7 +70,9 @@ public class TopicAuthZTest extends MockedPulsarStandalone {
     public void before() {
         configureTokenAuthentication();
         configureDefaultAuthorization();
+        enableTransaction();
         start();
+        createTransactionCoordinatorAssign(16);
         this.superUserAdmin =PulsarAdmin.builder()
                 .serviceHttpUrl(getPulsarService().getWebServiceAddress())
                 .authentication(new AuthenticationToken(SUPER_USER_TOKEN))
@@ -74,8 +84,18 @@ public class TopicAuthZTest extends MockedPulsarStandalone {
                 .serviceHttpUrl(getPulsarService().getWebServiceAddress())
                 .authentication(new AuthenticationToken(TENANT_ADMIN_TOKEN))
                 .build();
+
+        superUserAdmin.tenants().createTenant("pulsar", tenantInfo);
+        superUserAdmin.namespaces().createNamespace("pulsar/system");
     }
 
+    protected void createTransactionCoordinatorAssign(int numPartitionsOfTC) 
throws MetadataStoreException {
+        getPulsarService().getPulsarResources()
+                .getNamespaceResources()
+                .getPartitionedTopicResources()
+                
.createPartitionedTopic(SystemTopicNames.TRANSACTION_COORDINATOR_ASSIGN,
+                        new PartitionedTopicMetadata(numPartitionsOfTC));
+    }
 
     @SneakyThrows
     @AfterClass(alwaysRun = true)
@@ -1086,6 +1106,235 @@ public class TopicAuthZTest extends 
MockedPulsarStandalone {
         deleteTopic(topic, false);
     }
 
+    public enum OperationAuthType {
+        Lookup,
+        Produce,
+        Consume,
+        AdminOrSuperUser,
+        NOAuth
+    }
+
+    private final String testTopic = "persistent://public/default/" + 
UUID.randomUUID().toString();
+    @FunctionalInterface
+    public interface ThrowingBiConsumer<T> {
+        void accept(T t) throws PulsarAdminException;
+    }
+
+    @DataProvider(name = "authFunction")
+    public Object[][] authFunction () throws Exception {
+        String sub = "my-sub";
+        createTopic(testTopic, false);
+        @Cleanup final PulsarClient pulsarClient = PulsarClient.builder()
+                .serviceUrl(getPulsarService().getBrokerServiceUrl())
+                .authentication(new AuthenticationToken(SUPER_USER_TOKEN))
+                .enableTransaction(true)
+                .build();
+        @Cleanup final Producer<String> producer = 
pulsarClient.newProducer(Schema.STRING).topic(testTopic).create();
+
+        @Cleanup final Consumer<String> consumer = 
pulsarClient.newConsumer(Schema.STRING)
+                .topic(testTopic)
+                .subscriptionName(sub)
+                .subscribe();
+
+        Transaction transaction = 
pulsarClient.newTransaction().withTransactionTimeout(5, TimeUnit.MINUTES)
+                .build().get();
+        MessageIdImpl messageId = (MessageIdImpl) 
producer.newMessage().value("test message").send();
+
+        consumer.acknowledgeAsync(messageId, transaction).get();
+
+        return new Object[][]{
+                // SCHEMA
+               new Object[] {
+                       (ThrowingBiConsumer<PulsarAdmin>) (admin) -> 
admin.schemas().getSchemaInfo(testTopic),
+                       OperationAuthType.Lookup
+               },
+                new Object[] {
+                        (ThrowingBiConsumer<PulsarAdmin>) (admin) -> 
admin.schemas().getSchemaInfo(
+                                testTopic, 0),
+                        OperationAuthType.Lookup
+                },
+                new Object[] {
+                        (ThrowingBiConsumer<PulsarAdmin>) (admin) -> 
admin.schemas().getAllSchemas(
+                                testTopic),
+                        OperationAuthType.Lookup
+                },
+                new Object[] {
+                        (ThrowingBiConsumer<PulsarAdmin>) (admin) -> 
admin.schemas().createSchema(testTopic,
+                                
SchemaInfo.builder().type(SchemaType.STRING).build()),
+                        OperationAuthType.Produce
+                },
+                // TODO: improve the authorization check for testCompatibility 
and deleteSchema
+                new Object[] {
+                        (ThrowingBiConsumer<PulsarAdmin>) (admin) -> 
admin.schemas().testCompatibility(
+                                testTopic, 
SchemaInfo.builder().type(SchemaType.STRING).build()),
+                        OperationAuthType.AdminOrSuperUser
+                },
+                new Object[] {
+                        (ThrowingBiConsumer<PulsarAdmin>) (admin) -> 
admin.schemas().deleteSchema(
+                                testTopic),
+                        OperationAuthType.AdminOrSuperUser
+                },
+
+                // TRANSACTION
+
+                // Modify transaction coordinator
+                new Object[] {
+                        (ThrowingBiConsumer<PulsarAdmin>) (admin) -> 
admin.transactions()
+                                .abortTransaction(transaction.getTxnID()),
+                        OperationAuthType.AdminOrSuperUser
+                },
+                new Object[] {
+                        (ThrowingBiConsumer<PulsarAdmin>) (admin) -> 
admin.transactions()
+                                .scaleTransactionCoordinators(17),
+                        OperationAuthType.AdminOrSuperUser
+                },
+                // TODO: fix authorization check of check transaction 
coordinator stats.
+                // Check transaction coordinator stats
+                new Object[] {
+                        (ThrowingBiConsumer<PulsarAdmin>) (admin) -> 
admin.transactions()
+                                .getCoordinatorInternalStats(1, false),
+                        OperationAuthType.NOAuth
+                },
+                new Object[] {
+                        (ThrowingBiConsumer<PulsarAdmin>) (admin) -> 
admin.transactions()
+                                .getCoordinatorStats(),
+                        OperationAuthType.AdminOrSuperUser
+                },
+                new Object[] {
+                        (ThrowingBiConsumer<PulsarAdmin>) (admin) -> 
admin.transactions()
+                                .getSlowTransactionsByCoordinatorId(1, 5, 
TimeUnit.SECONDS),
+                        OperationAuthType.NOAuth
+                },
+                new Object[] {
+                        (ThrowingBiConsumer<PulsarAdmin>) (admin) -> 
admin.transactions()
+                                
.getTransactionMetadata(transaction.getTxnID()),
+                        OperationAuthType.NOAuth
+                },
+                new Object[] {
+                        (ThrowingBiConsumer<PulsarAdmin>) (admin) -> 
admin.transactions()
+                                .listTransactionCoordinators(),
+                        OperationAuthType.NOAuth
+                },
+                new Object[] {
+                        (ThrowingBiConsumer<PulsarAdmin>) (admin) -> 
admin.transactions()
+                                .getSlowTransactions(5, TimeUnit.SECONDS),
+                        OperationAuthType.AdminOrSuperUser
+                },
+
+                // TODO: Check the authorization of the topic when get stats 
of TB or TP
+                // Check stats related to transaction buffer and transaction 
pending ack
+                new Object[] {
+                        (ThrowingBiConsumer<PulsarAdmin>) (admin) -> 
admin.transactions()
+                                .getPendingAckInternalStats(testTopic, sub, 
false),
+                        OperationAuthType.NOAuth
+                },
+                new Object[] {
+                        (ThrowingBiConsumer<PulsarAdmin>) (admin) -> 
admin.transactions()
+                                .getPendingAckStats(testTopic, sub, false),
+                        OperationAuthType.NOAuth
+                },
+                new Object[] {
+                        (ThrowingBiConsumer<PulsarAdmin>) (admin) -> 
admin.transactions()
+                                .getPositionStatsInPendingAck(testTopic, sub, 
messageId.getLedgerId(),
+                                        messageId.getEntryId(), null),
+                        OperationAuthType.NOAuth
+                },
+                new Object[] {
+                        (ThrowingBiConsumer<PulsarAdmin>) (admin) -> 
admin.transactions()
+                                .getTransactionBufferInternalStats(testTopic, 
false),
+                        OperationAuthType.NOAuth
+                },
+                new Object[] {
+                        (ThrowingBiConsumer<PulsarAdmin>) (admin) -> 
admin.transactions()
+                                .getTransactionBufferStats(testTopic, false),
+                        OperationAuthType.NOAuth
+                },
+                new Object[] {
+                        (ThrowingBiConsumer<PulsarAdmin>) (admin) -> 
admin.transactions()
+                                .getTransactionBufferStats(testTopic, false),
+                        OperationAuthType.NOAuth
+                },
+                new Object[] {
+                        (ThrowingBiConsumer<PulsarAdmin>) (admin) -> 
admin.transactions()
+                                
.getTransactionInBufferStats(transaction.getTxnID(), testTopic),
+                        OperationAuthType.NOAuth
+                },
+                new Object[] {
+                        (ThrowingBiConsumer<PulsarAdmin>) (admin) -> 
admin.transactions()
+                                
.getTransactionInBufferStats(transaction.getTxnID(), testTopic),
+                        OperationAuthType.NOAuth
+                },
+                new Object[] {
+                        (ThrowingBiConsumer<PulsarAdmin>) (admin) -> 
admin.transactions()
+                                
.getTransactionInPendingAckStats(transaction.getTxnID(), testTopic, sub),
+                        OperationAuthType.NOAuth
+                },
+        };
+    }
+
+    @Test(dataProvider = "authFunction")
+    public void 
testSchemaAndTransactionAuthorization(ThrowingBiConsumer<PulsarAdmin> 
adminConsumer, OperationAuthType topicOpType)
+            throws Exception {
+        final String subject =  UUID.randomUUID().toString();
+        final String token = Jwts.builder()
+                .claim("sub", subject).signWith(SECRET_KEY).compact();
+
+        @Cleanup
+        final PulsarAdmin subAdmin = PulsarAdmin.builder()
+                .serviceHttpUrl(getPulsarService().getWebServiceAddress())
+                .authentication(new AuthenticationToken(token))
+                .build();
+        // test tenant manager
+        if (topicOpType != OperationAuthType.AdminOrSuperUser) {
+            adminConsumer.accept(tenantManagerAdmin);
+        }
+
+        if (topicOpType != OperationAuthType.NOAuth) {
+            
Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class,
+                    () -> adminConsumer.accept(subAdmin));
+        }
+
+        for (AuthAction action : AuthAction.values()) {
+            superUserAdmin.topics().grantPermission(testTopic, subject, 
Set.of(action));
+
+            if (authActionMatchOperation(topicOpType, action)) {
+                adminConsumer.accept(subAdmin);
+            } else {
+                
Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class,
+                        () -> adminConsumer.accept(subAdmin));
+            }
+            superUserAdmin.topics().revokePermissions(testTopic, subject);
+        }
+    }
+
+
+    private boolean authActionMatchOperation(OperationAuthType 
operationAuthType, AuthAction action) {
+        switch (operationAuthType) {
+            case Lookup -> {
+                if (AuthAction.consume == action || AuthAction.produce == 
action) {
+                    return true;
+                }
+            }
+            case Consume -> {
+                if (AuthAction.consume == action) {
+                    return true;
+                }
+            }
+            case Produce -> {
+                if (AuthAction.produce == action) {
+                    return true;
+                }
+            }
+            case AdminOrSuperUser -> {
+                return false;
+            }
+            case NOAuth -> {
+                return true;
+            }
+        }
+        return false;
+    }
+
     private void createTopic(String topic, boolean partitioned) throws 
Exception {
         if (partitioned) {
             superUserAdmin.topics().createPartitionedTopic(topic, 2);
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/security/MockedPulsarStandalone.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/security/MockedPulsarStandalone.java
index 421727c0ed7..b82f3b58406 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/security/MockedPulsarStandalone.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/security/MockedPulsarStandalone.java
@@ -105,7 +105,9 @@ public abstract class MockedPulsarStandalone implements 
AutoCloseable {
 
     }
 
-
+    protected void enableTransaction() {
+        serviceConfiguration.setTransactionCoordinatorEnabled(true);
+    }
 
     protected void configureDefaultAuthorization() {
         serviceConfiguration.setAuthorizationEnabled(true);

Reply via email to