This is an automated email from the ASF dual-hosted git repository.
penghui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new c24a6615f71 [Broker]make getPermissionsOnTopic method async (#14179)
c24a6615f71 is described below
commit c24a6615f7190615f54b2cbf6d2612488c96a33b
Author: Dezhi LIiu <[email protected]>
AuthorDate: Wed Apr 27 15:11:30 2022 +0800
[Broker]make getPermissionsOnTopic method async (#14179)
---
.../broker/admin/impl/PersistentTopicsBase.java | 63 ++++++++++------------
.../pulsar/broker/admin/v1/PersistentTopics.java | 23 +++++---
.../pulsar/broker/admin/v2/PersistentTopics.java | 17 ++++--
.../org/apache/pulsar/broker/admin/AdminTest.java | 13 +++--
.../pulsar/broker/admin/PersistentTopicsTest.java | 42 +++++++++++----
5 files changed, 101 insertions(+), 57 deletions(-)
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
index 6c28bd134f7..a50f54dfa5d 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
@@ -204,44 +204,39 @@ public class PersistentTopicsBase extends AdminResource {
return getPartitionedTopicList(TopicDomain.getEnum(domain()));
}
- protected Map<String, Set<AuthAction>> internalGetPermissionsOnTopic() {
+ protected CompletableFuture<Map<String, Set<AuthAction>>>
internalGetPermissionsOnTopic() {
// This operation should be reading from zookeeper and it should be
allowed without having admin privileges
- validateAdminAccessForTenant(namespaceName.getTenant());
-
- String topicUri = topicName.toString();
-
- try {
- Policies policies = namespaceResources().getPolicies(namespaceName)
- .orElseThrow(() -> new RestException(Status.NOT_FOUND,
"Namespace does not exist"));
-
- Map<String, Set<AuthAction>> permissions = Maps.newHashMap();
- AuthPolicies auth = policies.auth_policies;
-
- // First add namespace level permissions
- auth.getNamespaceAuthentication().forEach(permissions::put);
-
- // Then add topic level permissions
- if (auth.getTopicAuthentication().containsKey(topicUri)) {
- for (Map.Entry<String, Set<AuthAction>> entry :
-
auth.getTopicAuthentication().get(topicUri).entrySet()) {
- String role = entry.getKey();
- Set<AuthAction> topicPermissions = entry.getValue();
+ return validateAdminAccessForTenantAsync(namespaceName.getTenant())
+ .thenCompose(__ ->
namespaceResources().getPoliciesAsync(namespaceName)
+ .thenApply(policies -> {
+ if (!policies.isPresent()) {
+ throw new RestException(Status.NOT_FOUND, "Namespace does
not exist");
+ }
- if (!permissions.containsKey(role)) {
- permissions.put(role, topicPermissions);
- } else {
- // Do the union between namespace and topic level
- Set<AuthAction> union =
Sets.union(permissions.get(role), topicPermissions);
- permissions.put(role, union);
+ Map<String, Set<AuthAction>> permissions = Maps.newHashMap();
+ String topicUri = topicName.toString();
+ AuthPolicies auth = policies.get().auth_policies;
+ // First add namespace level permissions
+ auth.getNamespaceAuthentication().forEach(permissions::put);
+
+ // Then add topic level permissions
+ if (auth.getTopicAuthentication().containsKey(topicUri)) {
+ for (Map.Entry<String, Set<AuthAction>> entry :
+
auth.getTopicAuthentication().get(topicUri).entrySet()) {
+ String role = entry.getKey();
+ Set<AuthAction> topicPermissions = entry.getValue();
+
+ if (!permissions.containsKey(role)) {
+ permissions.put(role, topicPermissions);
+ } else {
+ // Do the union between namespace and topic level
+ Set<AuthAction> union =
Sets.union(permissions.get(role), topicPermissions);
+ permissions.put(role, union);
+ }
}
}
- }
-
- return permissions;
- } catch (Exception e) {
- log.error("[{}] Failed to get permissions for topic {}",
clientAppId(), topicUri, e);
- throw new RestException(e);
- }
+ return permissions;
+ }));
}
protected void validateCreateTopic(TopicName topicName) {
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/PersistentTopics.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/PersistentTopics.java
index c24e0c34b40..61f785c5a34 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/PersistentTopics.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/PersistentTopics.java
@@ -25,7 +25,6 @@ import io.swagger.annotations.ApiParam;
import io.swagger.annotations.ApiResponse;
import io.swagger.annotations.ApiResponses;
import java.util.List;
-import java.util.Map;
import java.util.Optional;
import java.util.Set;
import javax.ws.rs.DELETE;
@@ -107,11 +106,23 @@ public class PersistentTopics extends
PersistentTopicsBase {
+ "namespace level combined (union) with any eventual
specific permission set on the topic.")
@ApiResponses(value = {@ApiResponse(code = 403, message = "Don't have
admin permission"),
@ApiResponse(code = 404, message = "Namespace doesn't exist")})
- public Map<String, Set<AuthAction>>
getPermissionsOnTopic(@PathParam("property") String property,
- @PathParam("cluster") String cluster, @PathParam("namespace")
String namespace,
- @PathParam("topic") @Encoded String encodedTopic) {
- validateTopicName(property, cluster, namespace, encodedTopic);
- return internalGetPermissionsOnTopic();
+ public void getPermissionsOnTopic(@Suspended AsyncResponse asyncResponse,
+
@PathParam("property") String property,
+
@PathParam("cluster") String cluster,
+
@PathParam("namespace") String namespace,
+
@PathParam("topic") @Encoded String encodedTopic) {
+ try {
+ validateTopicName(property, cluster, namespace, encodedTopic);
+ internalGetPermissionsOnTopic().thenAccept(permissions ->
asyncResponse.resume(permissions))
+ .exceptionally(ex -> {
+ log.error("[{}] Failed to get permissions for topic
{}", clientAppId(), topicName, ex);
+ resumeAsyncResponseExceptionally(asyncResponse, ex);
+ return null;
+ });
+ } catch (Exception e) {
+ log.error("[{}] Failed to validate topic name {}", clientAppId(),
topicName, e);
+ resumeAsyncResponseExceptionally(asyncResponse, e);
+ }
}
@POST
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
index 3f12df47f4e..5b1544fd069 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
@@ -140,15 +140,26 @@ public class PersistentTopics extends
PersistentTopicsBase {
@ApiResponse(code = 404, message = "tenant/namespace/topic doesn't
exit"),
@ApiResponse(code = 412, message = "Topic name is not valid"),
@ApiResponse(code = 500, message = "Internal server error")})
- public Map<String, Set<AuthAction>> getPermissionsOnTopic(
+ public void getPermissionsOnTopic(
+ @Suspended final AsyncResponse asyncResponse,
@ApiParam(value = "Specify the tenant", required = true)
@PathParam("tenant") String tenant,
@ApiParam(value = "Specify the namespace", required = true)
@PathParam("namespace") String namespace,
@ApiParam(value = "Specify topic name", required = true)
@PathParam("topic") @Encoded String encodedTopic) {
- validateTopicName(tenant, namespace, encodedTopic);
- return internalGetPermissionsOnTopic();
+ try {
+ validateTopicName(tenant, namespace, encodedTopic);
+ internalGetPermissionsOnTopic().thenAccept(permissions ->
asyncResponse.resume(permissions))
+ .exceptionally(ex -> {
+ log.error("[{}] Failed to get permissions for topic
{}", clientAppId(), topicName, ex);
+ resumeAsyncResponseExceptionally(asyncResponse, ex);
+ return null;
+ });
+ } catch (Exception e) {
+ log.error("[{}] Failed to validate topic name {}", clientAppId(),
topicName, e);
+ resumeAsyncResponseExceptionally(asyncResponse, e);
+ }
}
@POST
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminTest.java
index b6f2e3b8b8e..b0e6ecf984b 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminTest.java
@@ -788,8 +788,11 @@ public class AdminTest extends MockedPulsarServiceBaseTest
{
verify(response,
timeout(5000).times(1)).resume(responseCaptor.capture());
Assert.assertEquals(responseCaptor.getValue().getStatus(),
Response.Status.NO_CONTENT.getStatusCode());
// verify permission
- Map<String, Set<AuthAction>> permission =
persistentTopics.getPermissionsOnTopic(property, cluster,
- namespace, topic);
+ response = mock(AsyncResponse.class);
+ responseCaptor = ArgumentCaptor.forClass(Response.class);
+ persistentTopics.getPermissionsOnTopic(response, property, cluster,
namespace, topic);
+ verify(response,
timeout(5000).times(1)).resume(responseCaptor.capture());
+ Map<String, Set<AuthAction>> permission = (Map<String,
Set<AuthAction>>) responseCaptor.getValue();
assertEquals(permission.get(role), actions);
// remove permission
response = mock(AsyncResponse.class);
@@ -799,7 +802,11 @@ public class AdminTest extends MockedPulsarServiceBaseTest
{
Assert.assertEquals(responseCaptor.getValue().getStatus(),
Response.Status.NO_CONTENT.getStatusCode());
// verify removed permission
Awaitility.await().untilAsserted(() -> {
- Map<String, Set<AuthAction>> p =
persistentTopics.getPermissionsOnTopic(property, cluster, namespace, topic);
+ AsyncResponse response1 = mock(AsyncResponse.class);
+ ArgumentCaptor<Response> responseCaptor1 =
ArgumentCaptor.forClass(Response.class);
+ persistentTopics.getPermissionsOnTopic(response1, property,
cluster, namespace, topic);
+ verify(response1,
timeout(5000).times(1)).resume(responseCaptor1.capture());
+ Map<String, Set<AuthAction>> p = (Map<String, Set<AuthAction>>)
responseCaptor1.getValue();
assertTrue(p.isEmpty());
});
}
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java
index 6c72e19437f..6426dcd6001 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java
@@ -628,7 +628,11 @@ public class PersistentTopicsTest extends
MockedPulsarServiceBaseTest {
persistentTopics.grantPermissionsOnTopic(response, testTenant,
testNamespace, topicName, role, expectActions);
verify(response,
timeout(5000).times(1)).resume(responseCaptor.capture());
Assert.assertEquals(responseCaptor.getValue().getStatus(),
Response.Status.NO_CONTENT.getStatusCode());
- Map<String, Set<AuthAction>> permissions =
persistentTopics.getPermissionsOnTopic(testTenant, testNamespace, topicName);
+ response = mock(AsyncResponse.class);
+ responseCaptor = ArgumentCaptor.forClass(Response.class);
+ persistentTopics.getPermissionsOnTopic(response, testTenant,
testNamespace, topicName);
+ verify(response,
timeout(5000).times(1)).resume(responseCaptor.capture());
+ Map<String, Set<AuthAction>> permissions = (Map<String,
Set<AuthAction>>) responseCaptor.getValue();
Assert.assertEquals(permissions.get(role), expectActions);
}
@@ -669,15 +673,22 @@ public class PersistentTopicsTest extends
MockedPulsarServiceBaseTest {
expectActions);
verify(response,
timeout(5000).times(1)).resume(responseCaptor.capture());
Assert.assertEquals(responseCaptor.getValue().getStatus(),
Response.Status.NO_CONTENT.getStatusCode());
- Map<String, Set<AuthAction>> permissions =
persistentTopics.getPermissionsOnTopic(testTenant, testNamespace,
- partitionedTopicName);
+ response = mock(AsyncResponse.class);
+ responseCaptor = ArgumentCaptor.forClass(Response.class);
+ persistentTopics.getPermissionsOnTopic(response, testTenant,
testNamespace, partitionedTopicName);
+ verify(response,
timeout(5000).times(1)).resume(responseCaptor.capture());
+ Map<String, Set<AuthAction>> permissions = (Map<String,
Set<AuthAction>>) responseCaptor.getValue();
Assert.assertEquals(permissions.get(role), expectActions);
TopicName topicName = TopicName.get(TopicDomain.persistent.value(),
testTenant, testNamespace,
partitionedTopicName);
for (int i = 0; i < numPartitions; i++) {
TopicName partition = topicName.getPartition(i);
- Map<String, Set<AuthAction>> partitionPermissions =
persistentTopics.getPermissionsOnTopic(testTenant,
- testNamespace, partition.getEncodedLocalName());
+ response = mock(AsyncResponse.class);
+ responseCaptor = ArgumentCaptor.forClass(Response.class);
+ persistentTopics.getPermissionsOnTopic(response, testTenant,
testNamespace, partition.getEncodedLocalName());
+ verify(response,
timeout(5000).times(1)).resume(responseCaptor.capture());
+ Map<String, Set<AuthAction>> partitionPermissions =
+ (Map<String, Set<AuthAction>>) responseCaptor.getValue();
Assert.assertEquals(partitionPermissions.get(role), expectActions);
}
}
@@ -699,7 +710,11 @@ public class PersistentTopicsTest extends
MockedPulsarServiceBaseTest {
persistentTopics.revokePermissionsOnTopic(response, testTenant,
testNamespace, topicName, role);
verify(response,
timeout(5000).times(1)).resume(responseCaptor.capture());
Assert.assertEquals(responseCaptor.getValue().getStatus(),
Response.Status.NO_CONTENT.getStatusCode());
- Map<String, Set<AuthAction>> permissions =
persistentTopics.getPermissionsOnTopic(testTenant, testNamespace, topicName);
+ response = mock(AsyncResponse.class);
+ responseCaptor = ArgumentCaptor.forClass(Response.class);
+ persistentTopics.getPermissionsOnTopic(response, testTenant,
testNamespace, topicName);
+ verify(response,
timeout(5000).times(1)).resume(responseCaptor.capture());
+ Map<String, Set<AuthAction>> permissions = (Map<String,
Set<AuthAction>>) responseCaptor.getValue();
Assert.assertEquals(permissions.get(role), null);
}
@@ -727,16 +742,21 @@ public class PersistentTopicsTest extends
MockedPulsarServiceBaseTest {
responseCaptor = ArgumentCaptor.forClass(Response.class);
verify(response,
timeout(5000).times(1)).resume(responseCaptor.capture());
Assert.assertEquals(responseCaptor.getValue().getStatus(),
Response.Status.NO_CONTENT.getStatusCode());
-
- Map<String, Set<AuthAction>> permissions =
persistentTopics.getPermissionsOnTopic(testTenant, testNamespace,
- partitionedTopicName);
+ response = mock(AsyncResponse.class);
+ responseCaptor = ArgumentCaptor.forClass(Response.class);
+ persistentTopics.getPermissionsOnTopic(response, testTenant,
testNamespace, partitionedTopicName);
+ verify(response,
timeout(5000).times(1)).resume(responseCaptor.capture());
+ Map<String, Set<AuthAction>> permissions = (Map<String,
Set<AuthAction>>) responseCaptor.getValue();
Assert.assertEquals(permissions.get(role), null);
TopicName topicName = TopicName.get(TopicDomain.persistent.value(),
testTenant, testNamespace,
partitionedTopicName);
for (int i = 0; i < numPartitions; i++) {
TopicName partition = topicName.getPartition(i);
- Map<String, Set<AuthAction>> partitionPermissions =
persistentTopics.getPermissionsOnTopic(testTenant,
- testNamespace, partition.getEncodedLocalName());
+ response = mock(AsyncResponse.class);
+ responseCaptor = ArgumentCaptor.forClass(Response.class);
+ persistentTopics.getPermissionsOnTopic(response, testTenant,
testNamespace, partition.getEncodedLocalName());
+ verify(response,
timeout(5000).times(1)).resume(responseCaptor.capture());
+ Map<String, Set<AuthAction>> partitionPermissions = (Map<String,
Set<AuthAction>>) responseCaptor.getValue();
Assert.assertEquals(partitionPermissions.get(role), null);
}
}