This is an automated email from the ASF dual-hosted git repository.

mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new de06bd4  Fix bug that consumer which specify incorrect subscription 
hangs up w… (#1256)
de06bd4 is described below

commit de06bd433fc501b98fd5e57404ba9bba7325ff29
Author: massakam <massa...@yahoo-corp.jp>
AuthorDate: Wed Feb 21 02:53:45 2018 +0900

    Fix bug that consumer which specify incorrect subscription hangs up w… 
(#1256)
    
    * Fix bug where a consumer that specifies an incorrect subscription hangs 
when subscription_auth_mode is Prefix
    
    * Add test for subscription prefix authorization
---
 .../apache/pulsar/broker/service/ServerCnx.java    | 10 ++++
 .../api/AuthorizationProducerConsumerTest.java     | 60 ++++++++++++++++++++++
 2 files changed, 70 insertions(+)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
index cb473c8..b951e39 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
@@ -669,6 +669,11 @@ public class ServerCnx extends PulsarHandler {
                         ctx.writeAndFlush(Commands.newError(requestId, 
ServerError.AuthorizationError, msg));
                     }
                     return null;
+                }).exceptionally(e -> {
+                    String msg = String.format("[%s] %s with role %s", 
remoteAddress, e.getMessage(), authRole);
+                    log.warn(msg);
+                    ctx.writeAndFlush(Commands.newError(requestId, 
ServerError.AuthorizationError, e.getMessage()));
+                    return null;
                 });
             } else {
                 final String msg = "Proxy Client is not authorized to 
subscribe";
@@ -855,6 +860,11 @@ public class ServerCnx extends PulsarHandler {
                         ctx.writeAndFlush(Commands.newError(requestId, 
ServerError.AuthorizationError, msg));
                     }
                     return null;
+                }).exceptionally(e -> {
+                    String msg = String.format("[%s] %s with role %s", 
remoteAddress, e.getMessage(), authRole);
+                    log.warn(msg);
+                    ctx.writeAndFlush(Commands.newError(requestId, 
ServerError.AuthorizationError, e.getMessage()));
+                    return null;
                 });
             } else {
                 final String msg = "Proxy Client is not authorized to Produce";
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/AuthorizationProducerConsumerTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/AuthorizationProducerConsumerTest.java
index 4f4fcd6..465cc6d 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/AuthorizationProducerConsumerTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/AuthorizationProducerConsumerTest.java
@@ -18,6 +18,7 @@
  */
 package org.apache.pulsar.client.api;
 
+import static org.apache.commons.lang3.StringUtils.isNotBlank;
 import static org.mockito.Mockito.spy;
 
 import java.io.IOException;
@@ -29,6 +30,7 @@ import java.util.concurrent.CompletableFuture;
 
 import javax.naming.AuthenticationException;
 
+import org.apache.pulsar.broker.PulsarServerException;
 import org.apache.pulsar.broker.ServiceConfiguration;
 import org.apache.pulsar.broker.authentication.AuthenticationDataCommand;
 import org.apache.pulsar.broker.authentication.AuthenticationDataSource;
@@ -148,6 +150,46 @@ public class AuthorizationProducerConsumerTest extends 
ProducerConsumerBase {
     }
 
     @Test
+    public void testSubscriptionPrefixAuthorization() throws Exception {
+        log.info("-- Starting {} test --", methodName);
+
+        
conf.setAuthorizationProvider(TestAuthorizationProviderWithSubscriptionPrefix.class.getName());
+        setup();
+
+        ClientConfiguration adminConf = new ClientConfiguration();
+        Authentication adminAuthentication = new 
ClientAuthentication("superUser");
+        adminConf.setAuthentication(adminAuthentication);
+        admin = spy(new PulsarAdmin(brokerUrl, adminConf));
+
+        String lookupUrl;
+        lookupUrl = new URI("pulsar://localhost:" + BROKER_PORT).toString();
+
+        ClientConfiguration clientConfValid = new ClientConfiguration();
+        Authentication authentication = new ClientAuthentication(clientRole);
+        clientConfValid.setAuthentication(authentication);
+
+        pulsarClient = PulsarClient.create(lookupUrl, clientConfValid);
+
+        admin.properties().createProperty("prop-prefix",
+                new PropertyAdmin(Lists.newArrayList("appid1", "appid2"), 
Sets.newHashSet("use")));
+        admin.namespaces().createNamespace("prop-prefix/use/ns");
+
+        // (1) Valid subscription name will be approved by authorization 
service
+        Consumer consumer = 
pulsarClient.subscribe("persistent://prop-prefix/use/ns/t1", clientRole + 
"-sub1");
+        consumer.close();
+
+        // (2) Invalid subscription name will be rejected by authorization 
service
+        try {
+            consumer = 
pulsarClient.subscribe("persistent://prop-prefix/use/ns/t1", "sub1");
+            Assert.fail("should have failed with authorization error");
+        } catch (PulsarClientException.AuthorizationException pa) {
+            // Ok
+        }
+
+        log.info("-- Exiting {} test --", methodName);
+    }
+
+    @Test
     public void testGrantPermission() throws Exception {
         log.info("-- Starting {} test --", methodName);
 
@@ -337,6 +379,24 @@ public class AuthorizationProducerConsumerTest extends 
ProducerConsumerBase {
         }
     }
 
+    public static class TestAuthorizationProviderWithSubscriptionPrefix 
extends TestAuthorizationProvider {
+
+        @Override
+        public CompletableFuture<Boolean> canConsumeAsync(DestinationName 
destination, String role,
+                AuthenticationDataSource authenticationData, String 
subscription) {
+            CompletableFuture<Boolean> future = new CompletableFuture<>();
+            if (isNotBlank(subscription)) {
+                if (!subscription.startsWith(role)) {
+                    future.completeExceptionally(new PulsarServerException(
+                            "The subscription name needs to be prefixed by the 
authentication role"));
+                }
+            }
+            future.complete(clientRole.equals(role));
+            return future;
+        }
+
+    }
+
     public static class TestAuthorizationProviderWithGrantPermission extends 
TestAuthorizationProvider {
 
         private Set<String> grantRoles = Sets.newHashSet();

-- 
To stop receiving notification emails like this one, please contact
mme...@apache.org.

Reply via email to