This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new c24a6615f71 [Broker]make getPermissionsOnTopic method async (#14179)
c24a6615f71 is described below

commit c24a6615f7190615f54b2cbf6d2612488c96a33b
Author: Dezhi LIiu <[email protected]>
AuthorDate: Wed Apr 27 15:11:30 2022 +0800

    [Broker]make getPermissionsOnTopic method async (#14179)
---
 .../broker/admin/impl/PersistentTopicsBase.java    | 63 ++++++++++------------
 .../pulsar/broker/admin/v1/PersistentTopics.java   | 23 +++++---
 .../pulsar/broker/admin/v2/PersistentTopics.java   | 17 ++++--
 .../org/apache/pulsar/broker/admin/AdminTest.java  | 13 +++--
 .../pulsar/broker/admin/PersistentTopicsTest.java  | 42 +++++++++++----
 5 files changed, 101 insertions(+), 57 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
index 6c28bd134f7..a50f54dfa5d 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
@@ -204,44 +204,39 @@ public class PersistentTopicsBase extends AdminResource {
         return getPartitionedTopicList(TopicDomain.getEnum(domain()));
     }
 
-    protected Map<String, Set<AuthAction>> internalGetPermissionsOnTopic() {
+    protected CompletableFuture<Map<String, Set<AuthAction>>> 
internalGetPermissionsOnTopic() {
         // This operation should be reading from zookeeper and it should be 
allowed without having admin privileges
-        validateAdminAccessForTenant(namespaceName.getTenant());
-
-        String topicUri = topicName.toString();
-
-        try {
-            Policies policies = namespaceResources().getPolicies(namespaceName)
-                    .orElseThrow(() -> new RestException(Status.NOT_FOUND, 
"Namespace does not exist"));
-
-            Map<String, Set<AuthAction>> permissions = Maps.newHashMap();
-            AuthPolicies auth = policies.auth_policies;
-
-            // First add namespace level permissions
-            auth.getNamespaceAuthentication().forEach(permissions::put);
-
-            // Then add topic level permissions
-            if (auth.getTopicAuthentication().containsKey(topicUri)) {
-                for (Map.Entry<String, Set<AuthAction>> entry :
-                        
auth.getTopicAuthentication().get(topicUri).entrySet()) {
-                    String role = entry.getKey();
-                    Set<AuthAction> topicPermissions = entry.getValue();
+        return validateAdminAccessForTenantAsync(namespaceName.getTenant())
+                .thenCompose(__ -> 
namespaceResources().getPoliciesAsync(namespaceName)
+            .thenApply(policies -> {
+                if (!policies.isPresent()) {
+                    throw new RestException(Status.NOT_FOUND, "Namespace does 
not exist");
+                }
 
-                    if (!permissions.containsKey(role)) {
-                        permissions.put(role, topicPermissions);
-                    } else {
-                        // Do the union between namespace and topic level
-                        Set<AuthAction> union = 
Sets.union(permissions.get(role), topicPermissions);
-                        permissions.put(role, union);
+                Map<String, Set<AuthAction>> permissions = Maps.newHashMap();
+                String topicUri = topicName.toString();
+                AuthPolicies auth = policies.get().auth_policies;
+                // First add namespace level permissions
+                auth.getNamespaceAuthentication().forEach(permissions::put);
+
+                // Then add topic level permissions
+                if (auth.getTopicAuthentication().containsKey(topicUri)) {
+                    for (Map.Entry<String, Set<AuthAction>> entry :
+                            
auth.getTopicAuthentication().get(topicUri).entrySet()) {
+                        String role = entry.getKey();
+                        Set<AuthAction> topicPermissions = entry.getValue();
+
+                        if (!permissions.containsKey(role)) {
+                            permissions.put(role, topicPermissions);
+                        } else {
+                            // Do the union between namespace and topic level
+                            Set<AuthAction> union = 
Sets.union(permissions.get(role), topicPermissions);
+                            permissions.put(role, union);
+                        }
                     }
                 }
-            }
-
-            return permissions;
-        } catch (Exception e) {
-            log.error("[{}] Failed to get permissions for topic {}", 
clientAppId(), topicUri, e);
-            throw new RestException(e);
-        }
+                return permissions;
+            }));
     }
 
     protected void validateCreateTopic(TopicName topicName) {
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/PersistentTopics.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/PersistentTopics.java
index c24e0c34b40..61f785c5a34 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/PersistentTopics.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/PersistentTopics.java
@@ -25,7 +25,6 @@ import io.swagger.annotations.ApiParam;
 import io.swagger.annotations.ApiResponse;
 import io.swagger.annotations.ApiResponses;
 import java.util.List;
-import java.util.Map;
 import java.util.Optional;
 import java.util.Set;
 import javax.ws.rs.DELETE;
@@ -107,11 +106,23 @@ public class PersistentTopics extends PersistentTopicsBase {
                     + "namespace level combined (union) with any eventual 
specific permission set on the topic.")
     @ApiResponses(value = {@ApiResponse(code = 403, message = "Don't have 
admin permission"),
             @ApiResponse(code = 404, message = "Namespace doesn't exist")})
-    public Map<String, Set<AuthAction>> 
getPermissionsOnTopic(@PathParam("property") String property,
-            @PathParam("cluster") String cluster, @PathParam("namespace") 
String namespace,
-            @PathParam("topic") @Encoded String encodedTopic) {
-        validateTopicName(property, cluster, namespace, encodedTopic);
-        return internalGetPermissionsOnTopic();
+    public void getPermissionsOnTopic(@Suspended AsyncResponse asyncResponse,
+                                                              
@PathParam("property") String property,
+                                                              
@PathParam("cluster") String cluster,
+                                                              
@PathParam("namespace") String namespace,
+                                                              
@PathParam("topic") @Encoded String encodedTopic) {
+        try {
+            validateTopicName(property, cluster, namespace, encodedTopic);
+            internalGetPermissionsOnTopic().thenAccept(permissions -> 
asyncResponse.resume(permissions))
+                    .exceptionally(ex -> {
+                        log.error("[{}] Failed to get permissions for topic 
{}", clientAppId(), topicName, ex);
+                        resumeAsyncResponseExceptionally(asyncResponse, ex);
+                        return null;
+                    });
+        } catch (Exception e) {
+            log.error("[{}] Failed to validate topic name {}", clientAppId(), 
topicName, e);
+            resumeAsyncResponseExceptionally(asyncResponse, e);
+        }
     }
 
     @POST
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
index 3f12df47f4e..5b1544fd069 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
@@ -140,15 +140,26 @@ public class PersistentTopics extends PersistentTopicsBase {
             @ApiResponse(code = 404, message = "tenant/namespace/topic doesn't 
exit"),
             @ApiResponse(code = 412, message = "Topic name is not valid"),
             @ApiResponse(code = 500, message = "Internal server error")})
-    public Map<String, Set<AuthAction>> getPermissionsOnTopic(
+    public void getPermissionsOnTopic(
+            @Suspended final AsyncResponse asyncResponse,
             @ApiParam(value = "Specify the tenant", required = true)
             @PathParam("tenant") String tenant,
             @ApiParam(value = "Specify the namespace", required = true)
             @PathParam("namespace") String namespace,
             @ApiParam(value = "Specify topic name", required = true)
             @PathParam("topic") @Encoded String encodedTopic) {
-        validateTopicName(tenant, namespace, encodedTopic);
-        return internalGetPermissionsOnTopic();
+        try {
+            validateTopicName(tenant, namespace, encodedTopic);
+            internalGetPermissionsOnTopic().thenAccept(permissions -> 
asyncResponse.resume(permissions))
+                    .exceptionally(ex -> {
+                        log.error("[{}] Failed to get permissions for topic 
{}", clientAppId(), topicName, ex);
+                        resumeAsyncResponseExceptionally(asyncResponse, ex);
+                        return null;
+                    });
+        } catch (Exception e) {
+            log.error("[{}] Failed to validate topic name {}", clientAppId(), 
topicName, e);
+            resumeAsyncResponseExceptionally(asyncResponse, e);
+        }
     }
 
     @POST
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminTest.java
index b6f2e3b8b8e..b0e6ecf984b 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminTest.java
@@ -788,8 +788,11 @@ public class AdminTest extends MockedPulsarServiceBaseTest {
         verify(response, 
timeout(5000).times(1)).resume(responseCaptor.capture());
         Assert.assertEquals(responseCaptor.getValue().getStatus(), 
Response.Status.NO_CONTENT.getStatusCode());
         // verify permission
-        Map<String, Set<AuthAction>> permission = 
persistentTopics.getPermissionsOnTopic(property, cluster,
-                namespace, topic);
+        response = mock(AsyncResponse.class);
+        responseCaptor = ArgumentCaptor.forClass(Response.class);
+        persistentTopics.getPermissionsOnTopic(response, property, cluster, 
namespace, topic);
+        verify(response, 
timeout(5000).times(1)).resume(responseCaptor.capture());
+        Map<String, Set<AuthAction>> permission = (Map<String, 
Set<AuthAction>>) responseCaptor.getValue();
         assertEquals(permission.get(role), actions);
         // remove permission
         response = mock(AsyncResponse.class);
@@ -799,7 +802,11 @@ public class AdminTest extends MockedPulsarServiceBaseTest {
         Assert.assertEquals(responseCaptor.getValue().getStatus(), 
Response.Status.NO_CONTENT.getStatusCode());
         // verify removed permission
         Awaitility.await().untilAsserted(() -> {
-            Map<String, Set<AuthAction>> p = 
persistentTopics.getPermissionsOnTopic(property, cluster, namespace, topic);
+            AsyncResponse response1 = mock(AsyncResponse.class);
+            ArgumentCaptor<Response> responseCaptor1 = 
ArgumentCaptor.forClass(Response.class);
+            persistentTopics.getPermissionsOnTopic(response1, property, 
cluster, namespace, topic);
+            verify(response1, 
timeout(5000).times(1)).resume(responseCaptor1.capture());
+            Map<String, Set<AuthAction>> p = (Map<String, Set<AuthAction>>) 
responseCaptor1.getValue();
             assertTrue(p.isEmpty());
         });
     }
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java
index 6c72e19437f..6426dcd6001 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java
@@ -628,7 +628,11 @@ public class PersistentTopicsTest extends MockedPulsarServiceBaseTest {
         persistentTopics.grantPermissionsOnTopic(response, testTenant, 
testNamespace, topicName, role, expectActions);
         verify(response, 
timeout(5000).times(1)).resume(responseCaptor.capture());
         Assert.assertEquals(responseCaptor.getValue().getStatus(), 
Response.Status.NO_CONTENT.getStatusCode());
-        Map<String, Set<AuthAction>> permissions = 
persistentTopics.getPermissionsOnTopic(testTenant, testNamespace, topicName);
+        response = mock(AsyncResponse.class);
+        responseCaptor = ArgumentCaptor.forClass(Response.class);
+        persistentTopics.getPermissionsOnTopic(response, testTenant, 
testNamespace, topicName);
+        verify(response, 
timeout(5000).times(1)).resume(responseCaptor.capture());
+        Map<String, Set<AuthAction>> permissions = (Map<String, 
Set<AuthAction>>) responseCaptor.getValue();
         Assert.assertEquals(permissions.get(role), expectActions);
     }
 
@@ -669,15 +673,22 @@ public class PersistentTopicsTest extends MockedPulsarServiceBaseTest {
                 expectActions);
         verify(response, 
timeout(5000).times(1)).resume(responseCaptor.capture());
         Assert.assertEquals(responseCaptor.getValue().getStatus(), 
Response.Status.NO_CONTENT.getStatusCode());
-        Map<String, Set<AuthAction>> permissions = 
persistentTopics.getPermissionsOnTopic(testTenant, testNamespace,
-                partitionedTopicName);
+        response = mock(AsyncResponse.class);
+        responseCaptor = ArgumentCaptor.forClass(Response.class);
+        persistentTopics.getPermissionsOnTopic(response, testTenant, 
testNamespace, partitionedTopicName);
+        verify(response, 
timeout(5000).times(1)).resume(responseCaptor.capture());
+        Map<String, Set<AuthAction>> permissions = (Map<String, 
Set<AuthAction>>) responseCaptor.getValue();
         Assert.assertEquals(permissions.get(role), expectActions);
         TopicName topicName = TopicName.get(TopicDomain.persistent.value(), 
testTenant, testNamespace,
                 partitionedTopicName);
         for (int i = 0; i < numPartitions; i++) {
             TopicName partition = topicName.getPartition(i);
-            Map<String, Set<AuthAction>> partitionPermissions = 
persistentTopics.getPermissionsOnTopic(testTenant,
-                    testNamespace, partition.getEncodedLocalName());
+            response = mock(AsyncResponse.class);
+            responseCaptor = ArgumentCaptor.forClass(Response.class);
+            persistentTopics.getPermissionsOnTopic(response, testTenant, 
testNamespace, partition.getEncodedLocalName());
+            verify(response, 
timeout(5000).times(1)).resume(responseCaptor.capture());
+            Map<String, Set<AuthAction>> partitionPermissions =
+                    (Map<String, Set<AuthAction>>) responseCaptor.getValue();
             Assert.assertEquals(partitionPermissions.get(role), expectActions);
         }
     }
@@ -699,7 +710,11 @@ public class PersistentTopicsTest extends MockedPulsarServiceBaseTest {
         persistentTopics.revokePermissionsOnTopic(response, testTenant, 
testNamespace, topicName, role);
         verify(response, 
timeout(5000).times(1)).resume(responseCaptor.capture());
         Assert.assertEquals(responseCaptor.getValue().getStatus(), 
Response.Status.NO_CONTENT.getStatusCode());
-        Map<String, Set<AuthAction>> permissions = 
persistentTopics.getPermissionsOnTopic(testTenant, testNamespace, topicName);
+        response = mock(AsyncResponse.class);
+        responseCaptor = ArgumentCaptor.forClass(Response.class);
+        persistentTopics.getPermissionsOnTopic(response, testTenant, 
testNamespace, topicName);
+        verify(response, 
timeout(5000).times(1)).resume(responseCaptor.capture());
+        Map<String, Set<AuthAction>> permissions = (Map<String, 
Set<AuthAction>>) responseCaptor.getValue();
         Assert.assertEquals(permissions.get(role), null);
     }
 
@@ -727,16 +742,21 @@ public class PersistentTopicsTest extends MockedPulsarServiceBaseTest {
         responseCaptor = ArgumentCaptor.forClass(Response.class);
         verify(response, 
timeout(5000).times(1)).resume(responseCaptor.capture());
         Assert.assertEquals(responseCaptor.getValue().getStatus(), 
Response.Status.NO_CONTENT.getStatusCode());
-
-        Map<String, Set<AuthAction>> permissions = 
persistentTopics.getPermissionsOnTopic(testTenant, testNamespace,
-                partitionedTopicName);
+        response = mock(AsyncResponse.class);
+        responseCaptor = ArgumentCaptor.forClass(Response.class);
+        persistentTopics.getPermissionsOnTopic(response, testTenant, 
testNamespace, partitionedTopicName);
+        verify(response, 
timeout(5000).times(1)).resume(responseCaptor.capture());
+        Map<String, Set<AuthAction>> permissions = (Map<String, 
Set<AuthAction>>) responseCaptor.getValue();
         Assert.assertEquals(permissions.get(role), null);
         TopicName topicName = TopicName.get(TopicDomain.persistent.value(), 
testTenant, testNamespace,
                 partitionedTopicName);
         for (int i = 0; i < numPartitions; i++) {
             TopicName partition = topicName.getPartition(i);
-            Map<String, Set<AuthAction>> partitionPermissions = 
persistentTopics.getPermissionsOnTopic(testTenant,
-                    testNamespace, partition.getEncodedLocalName());
+            response = mock(AsyncResponse.class);
+            responseCaptor = ArgumentCaptor.forClass(Response.class);
+            persistentTopics.getPermissionsOnTopic(response, testTenant, 
testNamespace, partition.getEncodedLocalName());
+            verify(response, 
timeout(5000).times(1)).resume(responseCaptor.capture());
+            Map<String, Set<AuthAction>> partitionPermissions = (Map<String, 
Set<AuthAction>>) responseCaptor.getValue();
             Assert.assertEquals(partitionPermissions.get(role), null);
         }
     }

Reply via email to