This is an automated email from the ASF dual-hosted git repository.
technoboy pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-3.0 by this push:
new 71640f696fd [improve][admin] Check if the topic existed before the
permission operations (#22742)
71640f696fd is described below
commit 71640f696fd9109bc677408e3c2cbacb2fb7252b
Author: Jiwei Guo <[email protected]>
AuthorDate: Sat May 18 22:57:45 2024 +0800
[improve][admin] Check if the topic existed before the permission
operations (#22742)
---
.../pulsar/broker/admin/impl/PersistentTopicsBase.java | 15 +++++++++------
.../pulsar/broker/admin/AdminApiSchemaWithAuthTest.java | 1 +
.../java/org/apache/pulsar/broker/admin/AdminApiTest.java | 12 ++++++++++++
.../apache/pulsar/broker/admin/PersistentTopicsTest.java | 10 ++++++++--
.../org/apache/pulsar/broker/auth/AuthorizationTest.java | 12 +++++++-----
.../client/api/AuthenticatedProducerConsumerTest.java | 5 +++--
.../client/api/AuthorizationProducerConsumerTest.java | 2 ++
.../pulsar/websocket/proxy/ProxyAuthorizationTest.java | 8 +++++---
.../java/org/apache/pulsar/sql/presto/TestPulsarAuth.java | 2 +-
.../tests/integration/presto/TestPulsarSQLAuth.java | 7 ++-----
10 files changed, 50 insertions(+), 24 deletions(-)
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
index ebc838756f9..978111d7187 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
@@ -218,6 +218,7 @@ public class PersistentTopicsBase extends AdminResource {
protected CompletableFuture<Map<String, Set<AuthAction>>>
internalGetPermissionsOnTopic() {
// This operation should be reading from zookeeper and it should be
allowed without having admin privileges
return validateAdminAccessForTenantAsync(namespaceName.getTenant())
+ .thenCompose(__ -> internalCheckTopicExists(topicName))
.thenCompose(__ ->
namespaceResources().getPoliciesAsync(namespaceName)
.thenApply(policies -> {
if (!policies.isPresent()) {
@@ -298,9 +299,10 @@ public class PersistentTopicsBase extends AdminResource {
Set<AuthAction> actions) {
// This operation should be reading from zookeeper and it should be
allowed without having admin privileges
validateAdminAccessForTenantAsync(namespaceName.getTenant())
- .thenCompose(__ ->
validatePoliciesReadOnlyAccessAsync().thenCompose(unused1 ->
- grantPermissionsAsync(topicName, role, actions)
- .thenAccept(unused ->
asyncResponse.resume(Response.noContent().build()))))
+ .thenCompose(__ -> validatePoliciesReadOnlyAccessAsync())
+ .thenCompose(__ -> internalCheckTopicExists(topicName))
+ .thenCompose(unused1 -> grantPermissionsAsync(topicName, role,
actions))
+ .thenAccept(unused ->
asyncResponse.resume(Response.noContent().build()))
.exceptionally(ex -> {
Throwable realCause =
FutureUtil.unwrapCompletionException(ex);
log.error("[{}] Failed to get permissions for topic {}",
clientAppId(), topicName, realCause);
@@ -346,8 +348,9 @@ public class PersistentTopicsBase extends AdminResource {
protected void internalRevokePermissionsOnTopic(AsyncResponse
asyncResponse, String role) {
// This operation should be reading from zookeeper and it should be
allowed without having admin privileges
validateAdminAccessForTenantAsync(namespaceName.getTenant())
- .thenCompose(__ ->
validatePoliciesReadOnlyAccessAsync().thenCompose(unused1 ->
- getPartitionedTopicMetadataAsync(topicName, true, false)
+ .thenCompose(__ -> validatePoliciesReadOnlyAccessAsync())
+ .thenCompose(__ -> internalCheckTopicExists(topicName))
+ .thenCompose(unused1 ->
getPartitionedTopicMetadataAsync(topicName, true, false)
.thenCompose(metadata -> {
int numPartitions = metadata.partitions;
CompletableFuture<Void> future =
CompletableFuture.completedFuture(null);
@@ -360,7 +363,7 @@ public class PersistentTopicsBase extends AdminResource {
}
return future.thenComposeAsync(unused ->
revokePermissionsAsync(topicName.toString(), role, false))
.thenAccept(unused ->
asyncResponse.resume(Response.noContent().build()));
- }))
+ })
).exceptionally(ex -> {
Throwable realCause =
FutureUtil.unwrapCompletionException(ex);
log.error("[{}] Failed to revoke permissions for topic
{}", clientAppId(), topicName, realCause);
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiSchemaWithAuthTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiSchemaWithAuthTest.java
index 5159d7b7141..15d6e509ca7 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiSchemaWithAuthTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiSchemaWithAuthTest.java
@@ -115,6 +115,7 @@ public class AdminApiSchemaWithAuthTest extends
MockedPulsarServiceBaseTest {
.serviceHttpUrl(brokerUrl != null ? brokerUrl.toString() :
brokerUrlTls.toString())
.authentication(AuthenticationToken.class.getName(),
PRODUCE_TOKEN)
.build();
+ admin.topics().createNonPartitionedTopic(topicName);
admin.topics().grantPermission(topicName, "consumer",
EnumSet.of(AuthAction.consume));
admin.topics().grantPermission(topicName, "producer",
EnumSet.of(AuthAction.produce));
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java
index b5ce948725f..697ce784eed 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java
@@ -3611,4 +3611,16 @@ public class AdminApiTest extends
MockedPulsarServiceBaseTest {
});
}
+
+ @Test
+ @SneakyThrows
+ public void testPermissions() {
+ String namespace = "prop-xyz/ns1/";
+ final String random = UUID.randomUUID().toString();
+ final String topic = "persistent://" + namespace + random;
+ final String subject = UUID.randomUUID().toString();
+ assertThrows(NotFoundException.class, () ->
admin.topics().getPermissions(topic));
+ assertThrows(NotFoundException.class, () ->
admin.topics().grantPermission(topic, subject, Set.of(AuthAction.produce)));
+ assertThrows(NotFoundException.class, () ->
admin.topics().revokePermissions(topic, subject));
+ }
}
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java
index 23ea5838d56..65b821d6ddc 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java
@@ -885,12 +885,15 @@ public class PersistentTopicsTest extends
MockedPulsarServiceBaseTest {
public void testGrantNonPartitionedTopic() {
final String topicName = "non-partitioned-topic";
AsyncResponse response = mock(AsyncResponse.class);
+ ArgumentCaptor<Response> responseCaptor =
ArgumentCaptor.forClass(Response.class);
persistentTopics.createNonPartitionedTopic(response, testTenant,
testNamespace, topicName, true, null);
+ verify(response,
timeout(5000).times(1)).resume(responseCaptor.capture());
+ Assert.assertEquals(responseCaptor.getValue().getStatus(),
Response.Status.NO_CONTENT.getStatusCode());
String role = "role";
Set<AuthAction> expectActions = new HashSet<>();
expectActions.add(AuthAction.produce);
response = mock(AsyncResponse.class);
- ArgumentCaptor<Response> responseCaptor =
ArgumentCaptor.forClass(Response.class);
+ responseCaptor = ArgumentCaptor.forClass(Response.class);
persistentTopics.grantPermissionsOnTopic(response, testTenant,
testNamespace, topicName, role, expectActions);
verify(response,
timeout(5000).times(1)).resume(responseCaptor.capture());
Assert.assertEquals(responseCaptor.getValue().getStatus(),
Response.Status.NO_CONTENT.getStatusCode());
@@ -948,12 +951,15 @@ public class PersistentTopicsTest extends
MockedPulsarServiceBaseTest {
public void testRevokeNonPartitionedTopic() {
final String topicName = "non-partitioned-topic";
AsyncResponse response = mock(AsyncResponse.class);
+ ArgumentCaptor<Response> responseCaptor =
ArgumentCaptor.forClass(Response.class);
persistentTopics.createNonPartitionedTopic(response, testTenant,
testNamespace, topicName, true, null);
+ verify(response,
timeout(5000).times(1)).resume(responseCaptor.capture());
+ Assert.assertEquals(responseCaptor.getValue().getStatus(),
Response.Status.NO_CONTENT.getStatusCode());
String role = "role";
Set<AuthAction> expectActions = new HashSet<>();
expectActions.add(AuthAction.produce);
response = mock(AsyncResponse.class);
- ArgumentCaptor<Response> responseCaptor =
ArgumentCaptor.forClass(Response.class);
+ responseCaptor = ArgumentCaptor.forClass(Response.class);
persistentTopics.grantPermissionsOnTopic(response, testTenant,
testNamespace, topicName, role, expectActions);
verify(response,
timeout(5000).times(1)).resume(responseCaptor.capture());
Assert.assertEquals(responseCaptor.getValue().getStatus(),
Response.Status.NO_CONTENT.getStatusCode());
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/AuthorizationTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/AuthorizationTest.java
index 7acd39d741d..6a75353240f 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/AuthorizationTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/AuthorizationTest.java
@@ -106,8 +106,9 @@ public class AuthorizationTest extends
MockedPulsarServiceBaseTest {
assertTrue(auth.canLookup(TopicName.get("persistent://p1/c1/ns1/ds1"),
"my-role", null));
assertTrue(auth.canProduce(TopicName.get("persistent://p1/c1/ns1/ds1"),
"my-role", null));
- admin.topics().grantPermission("persistent://p1/c1/ns1/ds2",
"other-role",
- EnumSet.of(AuthAction.consume));
+ String topic = "persistent://p1/c1/ns1/ds2";
+ admin.topics().createNonPartitionedTopic(topic);
+ admin.topics().grantPermission(topic, "other-role",
EnumSet.of(AuthAction.consume));
waitForChange();
assertTrue(auth.canLookup(TopicName.get("persistent://p1/c1/ns1/ds2"),
"other-role", null));
@@ -177,8 +178,9 @@ public class AuthorizationTest extends
MockedPulsarServiceBaseTest {
assertFalse(auth.canLookup(TopicName.get("persistent://p1/c1/ns1/ds2"),
"my.role.1", null));
assertFalse(auth.canLookup(TopicName.get("persistent://p1/c1/ns1/ds2"),
"my.role.2", null));
- admin.topics().grantPermission("persistent://p1/c1/ns1/ds1", "my.*",
- EnumSet.of(AuthAction.produce));
+ String topic1 = "persistent://p1/c1/ns1/ds1";
+ admin.topics().createNonPartitionedTopic(topic1);
+ admin.topics().grantPermission(topic1, "my.*",
EnumSet.of(AuthAction.produce));
waitForChange();
assertTrue(auth.canLookup(TopicName.get("persistent://p1/c1/ns1/ds1"),
"my.role.1", null));
@@ -241,7 +243,7 @@ public class AuthorizationTest extends
MockedPulsarServiceBaseTest {
assertTrue(auth.canConsume(TopicName.get("persistent://p1/c1/ns1/ds1"),
"role2", null, "role2-sub2"));
assertTrue(auth.canConsume(TopicName.get("persistent://p1/c1/ns1/ds1"),
"pulsar.super_user", null, "role3-sub1"));
- admin.namespaces().deleteNamespace("p1/c1/ns1");
+ admin.namespaces().deleteNamespace("p1/c1/ns1", true);
admin.tenants().deleteTenant("p1");
admin.clusters().deleteCluster("c1");
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/AuthenticatedProducerConsumerTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/AuthenticatedProducerConsumerTest.java
index 3bd8b920a30..44d8549fa62 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/AuthenticatedProducerConsumerTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/AuthenticatedProducerConsumerTest.java
@@ -262,8 +262,9 @@ public class AuthenticatedProducerConsumerTest extends
ProducerConsumerBase {
admin.close();
admin =
spy(PulsarAdmin.builder().serviceHttpUrl(brokerUrl.toString()).build());
admin.namespaces().createNamespace("my-property/my-ns",
Sets.newHashSet("test"));
-
admin.topics().grantPermission("persistent://my-property/my-ns/my-topic",
"anonymousUser",
- EnumSet.allOf(AuthAction.class));
+ String topic = "persistent://my-property/my-ns/my-topic";
+ admin.topics().createNonPartitionedTopic(topic);
+ admin.topics().grantPermission(topic, "anonymousUser",
EnumSet.allOf(AuthAction.class));
// setup the client
replacePulsarClient(PulsarClient.builder().serviceUrl(pulsar.getBrokerServiceUrl())
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/AuthorizationProducerConsumerTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/AuthorizationProducerConsumerTest.java
index ba41848bf2c..942afcd79e5 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/AuthorizationProducerConsumerTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/AuthorizationProducerConsumerTest.java
@@ -236,6 +236,7 @@ public class AuthorizationProducerConsumerTest extends
ProducerConsumerBase {
}
// grant topic consume authorization to the subscriptionRole
+ tenantAdmin.topics().createNonPartitionedTopic(topicName);
tenantAdmin.topics().grantPermission(topicName, subscriptionRole,
Collections.singleton(AuthAction.consume));
@@ -720,6 +721,7 @@ public class AuthorizationProducerConsumerTest extends
ProducerConsumerBase {
admin.tenants().createTenant("my-property",
new TenantInfoImpl(Sets.newHashSet("appid1", "appid2"),
Sets.newHashSet("test")));
admin.namespaces().createNamespace("my-property/my-ns",
Sets.newHashSet("test"));
+ admin.topics().createNonPartitionedTopic(topic);
admin.topics().grantPermission(topic, invalidRole,
Collections.singleton(AuthAction.produce));
admin.topics().grantPermission(topic, producerRole,
Sets.newHashSet(AuthAction.produce, AuthAction.consume));
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyAuthorizationTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyAuthorizationTest.java
index a3b26a4a9d1..9b45f38ca8b 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyAuthorizationTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyAuthorizationTest.java
@@ -55,6 +55,7 @@ public class ProxyAuthorizationTest extends
MockedPulsarServiceBaseTest {
@Override
protected void setup() throws Exception {
conf.setClusterName(configClusterName);
+ conf.setForceDeleteNamespaceAllowed(true);
internalSetup();
WebSocketProxyConfiguration config = new WebSocketProxyConfiguration();
@@ -99,8 +100,9 @@ public class ProxyAuthorizationTest extends
MockedPulsarServiceBaseTest {
assertTrue(auth.canLookup(TopicName.get("persistent://p1/c1/ns1/ds1"),
"my-role", null));
assertTrue(auth.canProduce(TopicName.get("persistent://p1/c1/ns1/ds1"),
"my-role", null));
- admin.topics().grantPermission("persistent://p1/c1/ns1/ds2",
"other-role",
- EnumSet.of(AuthAction.consume));
+ String topic = "persistent://p1/c1/ns1/ds2";
+ admin.topics().createNonPartitionedTopic(topic);
+ admin.topics().grantPermission(topic, "other-role",
EnumSet.of(AuthAction.consume));
waitForChange();
assertTrue(auth.canLookup(TopicName.get("persistent://p1/c1/ns1/ds2"),
"other-role", null));
@@ -117,7 +119,7 @@ public class ProxyAuthorizationTest extends
MockedPulsarServiceBaseTest {
assertTrue(auth.canProduce(TopicName.get("persistent://p1/c1/ns1/ds1"),
"my-role", null));
assertTrue(auth.canConsume(TopicName.get("persistent://p1/c1/ns1/ds1"),
"my-role", null, null));
- admin.namespaces().deleteNamespace("p1/c1/ns1");
+ admin.namespaces().deleteNamespace("p1/c1/ns1", true);
admin.tenants().deleteTenant("p1");
admin.clusters().deleteCluster("c1");
}
diff --git
a/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/TestPulsarAuth.java
b/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/TestPulsarAuth.java
index 7b550b7270f..412c41f8b89 100644
---
a/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/TestPulsarAuth.java
+++
b/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/TestPulsarAuth.java
@@ -154,7 +154,7 @@ public class TestPulsarAuth extends
MockedPulsarServiceBaseTest {
String partitionedTopic = "persistent://p1/c1/ns1/" +
RandomStringUtils.randomAlphabetic(4);
String passToken = AuthTokenUtils.createToken(secretKey, passRole,
Optional.empty());
String deniedToken = AuthTokenUtils.createToken(secretKey, deniedRole,
Optional.empty());
-
+ admin.topics().createNonPartitionedTopic(topic);
admin.topics().grantPermission(topic, passRole,
EnumSet.of(AuthAction.consume));
admin.topics().createPartitionedTopic(partitionedTopic, 2);
admin.topics().grantPermission(partitionedTopic, passRole,
EnumSet.of(AuthAction.consume));
diff --git
a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/presto/TestPulsarSQLAuth.java
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/presto/TestPulsarSQLAuth.java
index 87db46f2bb6..e3b232021b1 100644
---
a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/presto/TestPulsarSQLAuth.java
+++
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/presto/TestPulsarSQLAuth.java
@@ -106,10 +106,8 @@ public class TestPulsarSQLAuth extends TestPulsarSQLBase {
String passToken = AuthTokenUtils.createToken(secretKey, passRole,
Optional.empty());
String deniedToken = AuthTokenUtils.createToken(secretKey, deniedRole,
Optional.empty());
String topic = "testPulsarSQLAuthCheck";
-
- admin.topics().grantPermission(topic, passRole,
EnumSet.of(AuthAction.consume));
-
admin.topics().createNonPartitionedTopic(topic);
+ admin.topics().grantPermission(topic, passRole,
EnumSet.of(AuthAction.consume));
String queryAllDataSql = String.format("select * from
pulsar.\"%s\".\"%s\";", "public/default", topic);
@@ -173,9 +171,8 @@ public class TestPulsarSQLAuth extends TestPulsarSQLBase {
String topic1 = "testCheckAuthForMultipleTopics1";
String topic2 = "testCheckAuthForMultipleTopics2";
- admin.topics().grantPermission(topic1, testRole,
EnumSet.of(AuthAction.consume));
-
admin.topics().createNonPartitionedTopic(topic1);
+ admin.topics().grantPermission(topic1, testRole,
EnumSet.of(AuthAction.consume));
admin.topics().createPartitionedTopic(topic2, 2); // Test for
partitioned topic