This is an automated email from the ASF dual-hosted git repository.

jerrypeng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 5657797   refactor AuthorizationProvider interface to allow custom logic for determining super user (#3383)
5657797 is described below

commit 5657797e1ef75b882e85759bd3a5f52b70584d3f
Author: Boyang Jerry Peng <jerry.boyang.p...@gmail.com>
AuthorDate: Fri Jan 25 01:08:53 2019 -0600

    refactor AuthorizationProvider interface to allow custom logic for determining super user (#3383)
    
    * refactor AuthorizationProvider interface to allow custom logic for determining super user
    
    * remove redundant super user checks
    
    * addressing comments
    
    * simplifying code
    
    * cleaning up code
    
    * making API backwards compatible
    
    * fixing tests
---
 .../authorization/AuthorizationProvider.java       | 13 ++++-
 .../broker/authorization/AuthorizationService.java | 38 +++++++++++----
 .../authorization/PulsarAuthorizationProvider.java | 21 ++------
 .../pulsar/broker/web/PulsarWebResource.java       | 56 ++++++++++++++++------
 .../pulsar/broker/service/ServerCnxTest.java       |  6 +--
 .../api/AuthorizationProducerConsumerTest.java     |  9 ++++
 6 files changed, 98 insertions(+), 45 deletions(-)

diff --git 
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/AuthorizationProvider.java
 
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/AuthorizationProvider.java
index 5d8b930..b25c789 100644
--- 
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/AuthorizationProvider.java
+++ 
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/AuthorizationProvider.java
@@ -36,9 +36,20 @@ import org.apache.pulsar.common.policies.data.AuthAction;
 public interface AuthorizationProvider extends Closeable {
 
     /**
+     * Check if specified role is a super user
+     * @param role the role to check
+     * @return a CompletableFuture containing a boolean in which true means 
the role is a super user
+     * and false if it is not
+     */
+    default CompletableFuture<Boolean> isSuperUser(String role, 
ServiceConfiguration serviceConfiguration) {
+        Set<String> superUserRoles = serviceConfiguration.getSuperUserRoles();
+        return CompletableFuture.completedFuture(role != null && 
superUserRoles.contains(role) ? true : false);
+    }
+
+    /**
      * Perform initialization for the authorization provider
      *
-     * @param config
+     * @param conf
      *            broker config object
      * @param configCache
      *            pulsar zk configuration cache service
diff --git 
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/AuthorizationService.java
 
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/AuthorizationService.java
index 95d012f..42d935c 100644
--- 
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/AuthorizationService.java
+++ 
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/AuthorizationService.java
@@ -18,12 +18,6 @@
  */
 package org.apache.pulsar.broker.authorization;
 
-import static java.util.concurrent.TimeUnit.SECONDS;
-import static org.apache.pulsar.zookeeper.ZooKeeperCache.cacheTimeOutInSec;
-
-import java.util.Set;
-import java.util.concurrent.CompletableFuture;
-
 import org.apache.commons.lang3.StringUtils;
 import org.apache.pulsar.broker.PulsarServerException;
 import org.apache.pulsar.broker.ServiceConfiguration;
@@ -36,6 +30,14 @@ import org.apache.pulsar.common.util.FutureUtil;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionStage;
+import java.util.function.Function;
+
+import static java.util.concurrent.TimeUnit.SECONDS;
+import static org.apache.pulsar.zookeeper.ZooKeeperCache.cacheTimeOutInSec;
+
 /**
  * Authorization service that manages pluggable authorization provider and 
authorize requests accordingly.
  *
@@ -70,6 +72,13 @@ public class AuthorizationService {
         }
     }
 
+    public CompletableFuture<Boolean> isSuperUser(String user) {
+        if (provider != null) {
+           return provider.isSuperUser(user, conf);
+        }
+        return FutureUtil.failedFuture(new IllegalStateException("No 
authorization provider configured"));
+    }
+
     /**
      *
      * Grant authorization-action permission on a namespace to the given client
@@ -164,9 +173,14 @@ public class AuthorizationService {
         if (!this.conf.isAuthorizationEnabled()) {
             return CompletableFuture.completedFuture(true);
         }
-
         if (provider != null) {
-            return provider.canProduceAsync(topicName, role, 
authenticationData);
+            return provider.isSuperUser(role, 
conf).thenComposeAsync(isSuperUser -> {
+                if (isSuperUser) {
+                    return CompletableFuture.completedFuture(true);
+                } else {
+                    return provider.canProduceAsync(topicName, role, 
authenticationData);
+                }
+            });
         }
         return FutureUtil.failedFuture(new IllegalStateException("No 
authorization provider configured"));
     }
@@ -187,7 +201,13 @@ public class AuthorizationService {
             return CompletableFuture.completedFuture(true);
         }
         if (provider != null) {
-            return provider.canConsumeAsync(topicName, role, 
authenticationData, subscription);
+            return provider.isSuperUser(role, 
conf).thenComposeAsync(isSuperUser -> {
+                if (isSuperUser) {
+                    return CompletableFuture.completedFuture(true);
+                } else {
+                    return provider.canConsumeAsync(topicName, role, 
authenticationData, subscription);
+                }
+            });
         }
         return FutureUtil.failedFuture(new IllegalStateException("No 
authorization provider configured"));
     }
diff --git 
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/PulsarAuthorizationProvider.java
 
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/PulsarAuthorizationProvider.java
index 229efec..7914168 100644
--- 
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/PulsarAuthorizationProvider.java
+++ 
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/PulsarAuthorizationProvider.java
@@ -27,6 +27,7 @@ import java.util.Collections;
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
 
 import org.apache.pulsar.broker.PulsarServerException;
 import org.apache.pulsar.broker.ServiceConfiguration;
@@ -112,7 +113,7 @@ public class PulsarAuthorizationProvider implements 
AuthorizationProvider {
                         log.debug("Policies node couldn't be found for topic : 
{}", topicName);
                     }
                 } else {
-                    if (isNotBlank(subscription) && !isSuperUser(role)) {
+                    if (isNotBlank(subscription)) {
                         // validate if role is authorize to access 
subscription. (skip validatation if authorization
                         // list is empty)
                         Set<String> roles = 
policies.get().auth_policies.subscription_auth_roles.get(subscription);
@@ -323,12 +324,8 @@ public class PulsarAuthorizationProvider implements 
AuthorizationProvider {
     }
 
     private CompletableFuture<Boolean> checkAuthorization(TopicName topicName, 
String role, AuthAction action) {
-        if (isSuperUser(role)) {
-            return CompletableFuture.completedFuture(true);
-        } else {
-            return checkPermission(topicName, role, action)
-                    .thenApply(isPermission -> isPermission && 
checkCluster(topicName));
-        }
+        return checkPermission(topicName, role, action)
+                .thenApply(isPermission -> isPermission && 
checkCluster(topicName));
     }
 
     private boolean checkCluster(TopicName topicName) {
@@ -424,16 +421,6 @@ public class PulsarAuthorizationProvider implements 
AuthorizationProvider {
         return false;
     }
 
-    /**
-     * Super user roles are allowed to do anything, used for replication 
primarily
-     *
-     * @param role
-     *            the app id used to receive messages from the topic.
-     */
-    public boolean isSuperUser(String role) {
-        Set<String> superUserRoles = conf.getSuperUserRoles();
-        return role != null && superUserRoles.contains(role) ? true : false;
-    }
 
     @Override
     public void close() throws IOException {
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java
index 81e5a40..dc631d6 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java
@@ -32,6 +32,7 @@ import java.util.List;
 import java.util.Optional;
 import java.util.Set;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
 
 import javax.servlet.ServletContext;
 import javax.servlet.http.HttpServletRequest;
@@ -176,14 +177,26 @@ public abstract class PulsarWebResource {
             
validateOriginalPrincipal(pulsar.getConfiguration().getProxyRoles(), appId, 
originalPrincipal);
 
             if (pulsar.getConfiguration().getProxyRoles().contains(appId)) {
-                Set<String> superUserRoles = 
pulsar.getConfiguration().getSuperUserRoles();
-                boolean proxyAuthorized = superUserRoles.contains(appId);
-                boolean originalPrincipalAuthorized = 
superUserRoles.contains(originalPrincipal);
 
-                if (!proxyAuthorized || !originalPrincipalAuthorized) {
-                    throw new RestException(Status.UNAUTHORIZED,
-                            String.format("Proxy not authorized for super-user 
operation (proxy:%s,original:%s)",
-                                          appId, originalPrincipal));
+                CompletableFuture<Boolean> proxyAuthorizedFuture;
+                CompletableFuture<Boolean> originalPrincipalAuthorizedFuture;
+
+                try {
+                    proxyAuthorizedFuture = pulsar.getBrokerService()
+                            .getAuthorizationService()
+                            .isSuperUser(appId);
+
+                    originalPrincipalAuthorizedFuture = 
pulsar.getBrokerService()
+                            .getAuthorizationService()
+                            .isSuperUser(originalPrincipal);
+
+                    if (!proxyAuthorizedFuture.get() || 
!originalPrincipalAuthorizedFuture.get()) {
+                        throw new RestException(Status.UNAUTHORIZED,
+                                String.format("Proxy not authorized for 
super-user operation (proxy:%s,original:%s)",
+                                              appId, originalPrincipal));
+                    }
+                } catch (InterruptedException | ExecutionException e) {
+                    throw new RestException(Status.INTERNAL_SERVER_ERROR, 
e.getMessage());
                 }
                 log.debug("Successfully authorized {} (proxied by {}) as 
super-user",
                           originalPrincipal, appId);
@@ -238,16 +251,29 @@ public abstract class PulsarWebResource {
             
validateOriginalPrincipal(pulsar.getConfiguration().getProxyRoles(), 
clientAppId, originalPrincipal);
 
             if 
(pulsar.getConfiguration().getProxyRoles().contains(clientAppId)) {
-                Set<String> superUserRoles = 
pulsar.getConfiguration().getSuperUserRoles();
+
+                CompletableFuture<Boolean> isProxySuperUserFuture;
+                CompletableFuture<Boolean> isOriginalPrincipalSuperUserFuture;
+                try {
+                    isProxySuperUserFuture = pulsar.getBrokerService()
+                            .getAuthorizationService()
+                            .isSuperUser(clientAppId);
+
+                    isOriginalPrincipalSuperUserFuture = 
pulsar.getBrokerService()
+                            .getAuthorizationService()
+                            .isSuperUser(originalPrincipal);
+
                 Set<String> adminRoles = tenantInfo.getAdminRoles();
-                boolean proxyAuthorized = superUserRoles.contains(clientAppId) 
|| adminRoles.contains(clientAppId);
+                boolean proxyAuthorized = isProxySuperUserFuture.get() || 
adminRoles.contains(clientAppId);
                 boolean originalPrincipalAuthorized
-                    = superUserRoles.contains(originalPrincipal) || 
adminRoles.contains(originalPrincipal);
-
-                if (!proxyAuthorized || !originalPrincipalAuthorized) {
-                    throw new RestException(Status.UNAUTHORIZED,
-                            String.format("Proxy not authorized to access 
resource (proxy:%s,original:%s)",
-                                          clientAppId, originalPrincipal));
+                    = isOriginalPrincipalSuperUserFuture.get() || 
adminRoles.contains(originalPrincipal);
+                    if (!proxyAuthorized || !originalPrincipalAuthorized) {
+                        throw new RestException(Status.UNAUTHORIZED,
+                                String.format("Proxy not authorized to access 
resource (proxy:%s,original:%s)",
+                                              clientAppId, originalPrincipal));
+                    }
+                } catch (InterruptedException | ExecutionException e) {
+                    throw new RestException(Status.INTERNAL_SERVER_ERROR, 
e.getMessage());
                 }
                 log.debug("Successfully authorized {} (proxied by {}) on 
tenant {}",
                           originalPrincipal, clientAppId, tenant);
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java
index 1145321..545817f 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java
@@ -490,7 +490,7 @@ public class ServerCnxTest {
         providerField.setAccessible(true);
         PulsarAuthorizationProvider authorizationProvider = spy(new 
PulsarAuthorizationProvider(svcConfig, configCacheService));
         providerField.set(authorizationService, authorizationProvider);
-        
doReturn(false).when(authorizationProvider).isSuperUser(Mockito.anyString());
+        
doReturn(CompletableFuture.completedFuture(false)).when(authorizationProvider).isSuperUser(Mockito.anyString(),
 Mockito.any());
 
         // Test producer creation
         resetChannel();
@@ -520,7 +520,7 @@ public class ServerCnxTest {
         providerField.set(authorizationService, authorizationProvider);
         
doReturn(authorizationService).when(brokerService).getAuthorizationService();
         doReturn(true).when(brokerService).isAuthorizationEnabled();
-        
doReturn(false).when(authorizationProvider).isSuperUser(Mockito.anyString());
+        
doReturn(CompletableFuture.completedFuture(false)).when(authorizationProvider).isSuperUser(Mockito.anyString(),
  Mockito.any());
         
doReturn(CompletableFuture.completedFuture(true)).when(authorizationProvider).checkPermission(any(TopicName.class),
 Mockito.anyString(),
                 any(AuthAction.class));
 
@@ -548,7 +548,7 @@ public class ServerCnxTest {
         providerField.setAccessible(true);
         PulsarAuthorizationProvider authorizationProvider = spy(new 
PulsarAuthorizationProvider(svcConfig, configCacheService));
         providerField.set(authorizationService, authorizationProvider);
-        
doReturn(true).when(authorizationProvider).isSuperUser(Mockito.anyString());
+        
doReturn(CompletableFuture.completedFuture(true)).when(authorizationProvider).isSuperUser(Mockito.anyString(),
  Mockito.any());
 
         // Test producer creation
         resetChannel();
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/AuthorizationProducerConsumerTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/AuthorizationProducerConsumerTest.java
index 033c638..72838e8 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/AuthorizationProducerConsumerTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/AuthorizationProducerConsumerTest.java
@@ -402,13 +402,22 @@ public class AuthorizationProducerConsumerTest extends 
ProducerConsumerBase {
 
     public static class TestAuthorizationProvider implements 
AuthorizationProvider {
 
+        public ServiceConfiguration conf;
+
         @Override
         public void close() throws IOException {
             // No-op
         }
 
         @Override
+        public CompletableFuture<Boolean> isSuperUser(String role, 
ServiceConfiguration serviceConfiguration) {
+            Set<String> superUserRoles = 
serviceConfiguration.getSuperUserRoles();
+            return CompletableFuture.completedFuture(role != null && 
superUserRoles.contains(role) ? true : false);
+        }
+
+        @Override
         public void initialize(ServiceConfiguration conf, 
ConfigurationCacheService configCache) throws IOException {
+            this.conf = conf;
             // No-op
         }
 

Reply via email to