This is an automated email from the ASF dual-hosted git repository.
technoboy pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 9555504ee2c [improve][admin] Add authorization test for schema and
align auth for transaction (#22399)
9555504ee2c is described below
commit 9555504ee2c7adf9febddc585a699a1fdb724013
Author: Xiangying Meng <[email protected]>
AuthorDate: Tue Apr 9 22:43:35 2024 +0800
[improve][admin] Add authorization test for schema and align auth for
transaction (#22399)
---
.../apache/pulsar/broker/admin/TopicAuthZTest.java | 249 +++++++++++++++++++++
.../pulsar/security/MockedPulsarStandalone.java | 4 +-
2 files changed, 252 insertions(+), 1 deletion(-)
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicAuthZTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicAuthZTest.java
index 2e75b59ec85..d09bc0a3ffd 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicAuthZTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicAuthZTest.java
@@ -20,19 +20,27 @@
package org.apache.pulsar.broker.admin;
import io.jsonwebtoken.Jwts;
+import java.util.concurrent.TimeUnit;
import lombok.Cleanup;
import lombok.SneakyThrows;
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.admin.PulsarAdminException;
+import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.client.api.transaction.Transaction;
import org.apache.pulsar.client.impl.MessageIdImpl;
import org.apache.pulsar.client.impl.auth.AuthenticationToken;
+import org.apache.pulsar.common.naming.SystemTopicNames;
import org.apache.pulsar.common.naming.TopicName;
+import org.apache.pulsar.common.partition.PartitionedTopicMetadata;
import org.apache.pulsar.common.policies.data.AuthAction;
import org.apache.pulsar.common.policies.data.TenantInfo;
+import org.apache.pulsar.common.schema.SchemaInfo;
+import org.apache.pulsar.common.schema.SchemaType;
+import org.apache.pulsar.metadata.api.MetadataStoreException;
import org.apache.pulsar.security.MockedPulsarStandalone;
import org.testng.Assert;
import org.testng.annotations.AfterClass;
@@ -62,7 +70,9 @@ public class TopicAuthZTest extends MockedPulsarStandalone {
public void before() {
configureTokenAuthentication();
configureDefaultAuthorization();
+ enableTransaction();
start();
+ createTransactionCoordinatorAssign(16);
this.superUserAdmin =PulsarAdmin.builder()
.serviceHttpUrl(getPulsarService().getWebServiceAddress())
.authentication(new AuthenticationToken(SUPER_USER_TOKEN))
@@ -74,8 +84,18 @@ public class TopicAuthZTest extends MockedPulsarStandalone {
.serviceHttpUrl(getPulsarService().getWebServiceAddress())
.authentication(new AuthenticationToken(TENANT_ADMIN_TOKEN))
.build();
+
+ superUserAdmin.tenants().createTenant("pulsar", tenantInfo);
+ superUserAdmin.namespaces().createNamespace("pulsar/system");
}
+ protected void createTransactionCoordinatorAssign(int numPartitionsOfTC)
throws MetadataStoreException {
+ getPulsarService().getPulsarResources()
+ .getNamespaceResources()
+ .getPartitionedTopicResources()
+
.createPartitionedTopic(SystemTopicNames.TRANSACTION_COORDINATOR_ASSIGN,
+ new PartitionedTopicMetadata(numPartitionsOfTC));
+ }
@SneakyThrows
@AfterClass(alwaysRun = true)
@@ -1086,6 +1106,235 @@ public class TopicAuthZTest extends
MockedPulsarStandalone {
deleteTopic(topic, false);
}
+ public enum OperationAuthType {
+ Lookup,
+ Produce,
+ Consume,
+ AdminOrSuperUser,
+ NOAuth
+ }
+
+ private final String testTopic = "persistent://public/default/" +
UUID.randomUUID().toString();
+ @FunctionalInterface
+ public interface ThrowingBiConsumer<T> {
+ void accept(T t) throws PulsarAdminException;
+ }
+
+ @DataProvider(name = "authFunction")
+ public Object[][] authFunction () throws Exception {
+ String sub = "my-sub";
+ createTopic(testTopic, false);
+ @Cleanup final PulsarClient pulsarClient = PulsarClient.builder()
+ .serviceUrl(getPulsarService().getBrokerServiceUrl())
+ .authentication(new AuthenticationToken(SUPER_USER_TOKEN))
+ .enableTransaction(true)
+ .build();
+ @Cleanup final Producer<String> producer =
pulsarClient.newProducer(Schema.STRING).topic(testTopic).create();
+
+ @Cleanup final Consumer<String> consumer =
pulsarClient.newConsumer(Schema.STRING)
+ .topic(testTopic)
+ .subscriptionName(sub)
+ .subscribe();
+
+ Transaction transaction =
pulsarClient.newTransaction().withTransactionTimeout(5, TimeUnit.MINUTES)
+ .build().get();
+ MessageIdImpl messageId = (MessageIdImpl)
producer.newMessage().value("test message").send();
+
+ consumer.acknowledgeAsync(messageId, transaction).get();
+
+ return new Object[][]{
+ // SCHEMA
+ new Object[] {
+ (ThrowingBiConsumer<PulsarAdmin>) (admin) ->
admin.schemas().getSchemaInfo(testTopic),
+ OperationAuthType.Lookup
+ },
+ new Object[] {
+ (ThrowingBiConsumer<PulsarAdmin>) (admin) ->
admin.schemas().getSchemaInfo(
+ testTopic, 0),
+ OperationAuthType.Lookup
+ },
+ new Object[] {
+ (ThrowingBiConsumer<PulsarAdmin>) (admin) ->
admin.schemas().getAllSchemas(
+ testTopic),
+ OperationAuthType.Lookup
+ },
+ new Object[] {
+ (ThrowingBiConsumer<PulsarAdmin>) (admin) ->
admin.schemas().createSchema(testTopic,
+
SchemaInfo.builder().type(SchemaType.STRING).build()),
+ OperationAuthType.Produce
+ },
+ // TODO: improve the authorization check for testCompatibility
and deleteSchema
+ new Object[] {
+ (ThrowingBiConsumer<PulsarAdmin>) (admin) ->
admin.schemas().testCompatibility(
+ testTopic,
SchemaInfo.builder().type(SchemaType.STRING).build()),
+ OperationAuthType.AdminOrSuperUser
+ },
+ new Object[] {
+ (ThrowingBiConsumer<PulsarAdmin>) (admin) ->
admin.schemas().deleteSchema(
+ testTopic),
+ OperationAuthType.AdminOrSuperUser
+ },
+
+ // TRANSACTION
+
+ // Modify transaction coordinator
+ new Object[] {
+ (ThrowingBiConsumer<PulsarAdmin>) (admin) ->
admin.transactions()
+ .abortTransaction(transaction.getTxnID()),
+ OperationAuthType.AdminOrSuperUser
+ },
+ new Object[] {
+ (ThrowingBiConsumer<PulsarAdmin>) (admin) ->
admin.transactions()
+ .scaleTransactionCoordinators(17),
+ OperationAuthType.AdminOrSuperUser
+ },
+ // TODO: fix authorization check of check transaction
coordinator stats.
+ // Check transaction coordinator stats
+ new Object[] {
+ (ThrowingBiConsumer<PulsarAdmin>) (admin) ->
admin.transactions()
+ .getCoordinatorInternalStats(1, false),
+ OperationAuthType.NOAuth
+ },
+ new Object[] {
+ (ThrowingBiConsumer<PulsarAdmin>) (admin) ->
admin.transactions()
+ .getCoordinatorStats(),
+ OperationAuthType.AdminOrSuperUser
+ },
+ new Object[] {
+ (ThrowingBiConsumer<PulsarAdmin>) (admin) ->
admin.transactions()
+ .getSlowTransactionsByCoordinatorId(1, 5,
TimeUnit.SECONDS),
+ OperationAuthType.NOAuth
+ },
+ new Object[] {
+ (ThrowingBiConsumer<PulsarAdmin>) (admin) ->
admin.transactions()
+
.getTransactionMetadata(transaction.getTxnID()),
+ OperationAuthType.NOAuth
+ },
+ new Object[] {
+ (ThrowingBiConsumer<PulsarAdmin>) (admin) ->
admin.transactions()
+ .listTransactionCoordinators(),
+ OperationAuthType.NOAuth
+ },
+ new Object[] {
+ (ThrowingBiConsumer<PulsarAdmin>) (admin) ->
admin.transactions()
+ .getSlowTransactions(5, TimeUnit.SECONDS),
+ OperationAuthType.AdminOrSuperUser
+ },
+
+ // TODO: Check the authorization of the topic when get stats
of TB or TP
+ // Check stats related to transaction buffer and transaction
pending ack
+ new Object[] {
+ (ThrowingBiConsumer<PulsarAdmin>) (admin) ->
admin.transactions()
+ .getPendingAckInternalStats(testTopic, sub,
false),
+ OperationAuthType.NOAuth
+ },
+ new Object[] {
+ (ThrowingBiConsumer<PulsarAdmin>) (admin) ->
admin.transactions()
+ .getPendingAckStats(testTopic, sub, false),
+ OperationAuthType.NOAuth
+ },
+ new Object[] {
+ (ThrowingBiConsumer<PulsarAdmin>) (admin) ->
admin.transactions()
+ .getPositionStatsInPendingAck(testTopic, sub,
messageId.getLedgerId(),
+ messageId.getEntryId(), null),
+ OperationAuthType.NOAuth
+ },
+ new Object[] {
+ (ThrowingBiConsumer<PulsarAdmin>) (admin) ->
admin.transactions()
+ .getTransactionBufferInternalStats(testTopic,
false),
+ OperationAuthType.NOAuth
+ },
+ new Object[] {
+ (ThrowingBiConsumer<PulsarAdmin>) (admin) ->
admin.transactions()
+ .getTransactionBufferStats(testTopic, false),
+ OperationAuthType.NOAuth
+ },
+ new Object[] {
+ (ThrowingBiConsumer<PulsarAdmin>) (admin) ->
admin.transactions()
+ .getTransactionBufferStats(testTopic, false),
+ OperationAuthType.NOAuth
+ },
+ new Object[] {
+ (ThrowingBiConsumer<PulsarAdmin>) (admin) ->
admin.transactions()
+
.getTransactionInBufferStats(transaction.getTxnID(), testTopic),
+ OperationAuthType.NOAuth
+ },
+ new Object[] {
+ (ThrowingBiConsumer<PulsarAdmin>) (admin) ->
admin.transactions()
+
.getTransactionInBufferStats(transaction.getTxnID(), testTopic),
+ OperationAuthType.NOAuth
+ },
+ new Object[] {
+ (ThrowingBiConsumer<PulsarAdmin>) (admin) ->
admin.transactions()
+
.getTransactionInPendingAckStats(transaction.getTxnID(), testTopic, sub),
+ OperationAuthType.NOAuth
+ },
+ };
+ }
+
+ @Test(dataProvider = "authFunction")
+ public void
testSchemaAndTransactionAuthorization(ThrowingBiConsumer<PulsarAdmin>
adminConsumer, OperationAuthType topicOpType)
+ throws Exception {
+ final String subject = UUID.randomUUID().toString();
+ final String token = Jwts.builder()
+ .claim("sub", subject).signWith(SECRET_KEY).compact();
+
+ @Cleanup
+ final PulsarAdmin subAdmin = PulsarAdmin.builder()
+ .serviceHttpUrl(getPulsarService().getWebServiceAddress())
+ .authentication(new AuthenticationToken(token))
+ .build();
+ // test tenant manager
+ if (topicOpType != OperationAuthType.AdminOrSuperUser) {
+ adminConsumer.accept(tenantManagerAdmin);
+ }
+
+ if (topicOpType != OperationAuthType.NOAuth) {
+
Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class,
+ () -> adminConsumer.accept(subAdmin));
+ }
+
+ for (AuthAction action : AuthAction.values()) {
+ superUserAdmin.topics().grantPermission(testTopic, subject,
Set.of(action));
+
+ if (authActionMatchOperation(topicOpType, action)) {
+ adminConsumer.accept(subAdmin);
+ } else {
+
Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class,
+ () -> adminConsumer.accept(subAdmin));
+ }
+ superUserAdmin.topics().revokePermissions(testTopic, subject);
+ }
+ }
+
+
+ private boolean authActionMatchOperation(OperationAuthType
operationAuthType, AuthAction action) {
+ switch (operationAuthType) {
+ case Lookup -> {
+ if (AuthAction.consume == action || AuthAction.produce ==
action) {
+ return true;
+ }
+ }
+ case Consume -> {
+ if (AuthAction.consume == action) {
+ return true;
+ }
+ }
+ case Produce -> {
+ if (AuthAction.produce == action) {
+ return true;
+ }
+ }
+ case AdminOrSuperUser -> {
+ return false;
+ }
+ case NOAuth -> {
+ return true;
+ }
+ }
+ return false;
+ }
+
private void createTopic(String topic, boolean partitioned) throws
Exception {
if (partitioned) {
superUserAdmin.topics().createPartitionedTopic(topic, 2);
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/security/MockedPulsarStandalone.java
b/pulsar-broker/src/test/java/org/apache/pulsar/security/MockedPulsarStandalone.java
index 421727c0ed7..b82f3b58406 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/security/MockedPulsarStandalone.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/security/MockedPulsarStandalone.java
@@ -105,7 +105,9 @@ public abstract class MockedPulsarStandalone implements
AutoCloseable {
}
-
+ protected void enableTransaction() {
+ serviceConfiguration.setTransactionCoordinatorEnabled(true);
+ }
protected void configureDefaultAuthorization() {
serviceConfiguration.setAuthorizationEnabled(true);