This is an automated email from the ASF dual-hosted git repository.

eolivelli pushed a commit to branch branch-2.7
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-2.7 by this push:
     new fe7ee88  [Issue 10221] Fix authorization error while using proxy and 
`Prefix` subscription authentication mode (#10226)
fe7ee88 is described below

commit fe7ee885a153c105a41eb215a4673597b81286d2
Author: Shen Liu <liushen...@126.com>
AuthorDate: Mon Apr 19 11:11:09 2021 +0800

    [Issue 10221] Fix authorization error while using proxy and `Prefix` 
subscription authentication mode (#10226)
    
    Fixes #10221
    
    ### Motivation
    When using the Pulsar proxy together with the `Prefix` subscription authentication mode, 
[org.apache.pulsar.broker.authorization.PulsarAuthorizationProvider#canConsumeAsync](https://github.com/apache/pulsar/blob/master/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/PulsarAuthorizationProvider.java#L135)
 throws an exception, which causes the consumer to fail.
    
    ### Modifications
    
    Update the 
`org.apache.pulsar.broker.authorization.PulsarAuthorizationProvider#allowTopicOperationAsync`
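 logic: check `isSuperUser` first, and return `isAuthorizedFuture` only when the role is not a super user.
    Previously the two futures were combined, so an exception from `isAuthorizedFuture` failed the whole check even when the role was a super user.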
    
    ### Verifying this change
    
    - [x] Make sure that the change passes the CI checks.
    
    (cherry picked from commit 889b9b8e5efc62d2d0cbc761205fba5759c97af0)
---
 .../authorization/PulsarAuthorizationProvider.java |  18 +-
 .../server/ProxyWithJwtAuthorizationTest.java      | 286 +++++++++++++++++++++
 2 files changed, 297 insertions(+), 7 deletions(-)

diff --git 
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/PulsarAuthorizationProvider.java
 
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/PulsarAuthorizationProvider.java
index ca7ca9e..e20c89c 100644
--- 
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/PulsarAuthorizationProvider.java
+++ 
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/PulsarAuthorizationProvider.java
@@ -564,20 +564,24 @@ public class PulsarAuthorizationProvider implements 
AuthorizationProvider {
                 break;
             case CONSUME: isAuthorizedFuture = canConsumeAsync(topicName, 
role, authData, authData.getSubscription());
                 break;
-            default: isAuthorizedFuture = FutureUtil.failedFuture(
-                    new IllegalStateException("TopicOperation is not 
supported."));
+            default:
+                return FutureUtil.failedFuture(new 
IllegalStateException("TopicOperation is not supported."));
         }
 
         CompletableFuture<Boolean> isSuperUserFuture = isSuperUser(role, 
authData, conf);
 
+        // check isSuperUser first
         return isSuperUserFuture
-                .thenCombine(isAuthorizedFuture, (isSuperUser, isAuthorized) 
-> {
+                .thenCompose(isSuperUser -> {
                     if (log.isDebugEnabled()) {
-                        log.debug("Verify if role {} is allowed to {} to topic 
{}:"
-                                + " isSuperUser={}, isAuthorized={}",
-                            role, operation, topicName, isSuperUser, 
isAuthorized);
+                        log.debug("Verify if role {} is allowed to {} to topic 
{}: isSuperUser={}",
+                                role, operation, topicName, isSuperUser);
+                    }
+                    if (isSuperUser) {
+                        return CompletableFuture.completedFuture(true);
+                    } else {
+                        return isAuthorizedFuture;
                     }
-                    return isSuperUser || isAuthorized;
                 });
     }
 
diff --git 
a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyWithJwtAuthorizationTest.java
 
b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyWithJwtAuthorizationTest.java
new file mode 100644
index 0000000..f683adf
--- /dev/null
+++ 
b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyWithJwtAuthorizationTest.java
@@ -0,0 +1,286 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.proxy.server;
+
+import static org.mockito.Mockito.spy;
+
+import com.google.common.collect.Sets;
+
+import java.util.*;
+import java.util.concurrent.TimeUnit;
+
+import io.jsonwebtoken.Jwts;
+import io.jsonwebtoken.SignatureAlgorithm;
+import org.apache.pulsar.broker.authentication.AuthenticationProviderToken;
+import org.apache.pulsar.broker.authentication.AuthenticationService;
+import org.apache.pulsar.broker.authentication.utils.AuthTokenUtils;
+import org.apache.pulsar.client.admin.PulsarAdmin;
+import org.apache.pulsar.client.api.*;
+import org.apache.pulsar.client.impl.auth.AuthenticationToken;
+import org.apache.pulsar.common.configuration.PulsarConfigurationLoader;
+import org.apache.pulsar.common.policies.data.AuthAction;
+import org.apache.pulsar.common.policies.data.ClusterData;
+import org.apache.pulsar.common.policies.data.SubscriptionAuthMode;
+import org.apache.pulsar.common.policies.data.TenantInfo;
+import org.mockito.Mockito;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testng.Assert;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+import javax.crypto.SecretKey;
+
+public class ProxyWithJwtAuthorizationTest extends ProducerConsumerBase {
+    private static final Logger log = 
LoggerFactory.getLogger(ProxyWithAuthorizationTest.class);
+
+    private final String ADMIN_ROLE = "admin";
+    private final String PROXY_ROLE = "proxy";
+    private final String BROKER_ROLE = "broker";
+    private final String CLIENT_ROLE = "client";
+    private final SecretKey SECRET_KEY = 
AuthTokenUtils.createSecretKey(SignatureAlgorithm.HS256);
+
+    private final String ADMIN_TOKEN = 
Jwts.builder().setSubject(ADMIN_ROLE).signWith(SECRET_KEY).compact();
+    private final String PROXY_TOKEN = 
Jwts.builder().setSubject(PROXY_ROLE).signWith(SECRET_KEY).compact();
+    private final String BROKER_TOKEN = 
Jwts.builder().setSubject(BROKER_ROLE).signWith(SECRET_KEY).compact();
+    private final String CLIENT_TOKEN = 
Jwts.builder().setSubject(CLIENT_ROLE).signWith(SECRET_KEY).compact();
+
+    private ProxyService proxyService;
+    private final ProxyConfiguration proxyConfig = new ProxyConfiguration();
+
+    @BeforeMethod
+    @Override
+    protected void setup() throws Exception {
+        // enable auth&auth and use JWT at broker
+        conf.setAuthenticationEnabled(true);
+        conf.setAuthorizationEnabled(true);
+        conf.getProperties().setProperty("tokenSecretKey", "data:;base64," + 
Base64.getEncoder().encodeToString(SECRET_KEY.getEncoded()));
+
+        Set<String> superUserRoles = new HashSet<>();
+        superUserRoles.add(ADMIN_ROLE);
+        superUserRoles.add(PROXY_ROLE);
+        superUserRoles.add(BROKER_ROLE);
+        conf.setSuperUserRoles(superUserRoles);
+
+        
conf.setBrokerClientAuthenticationPlugin(AuthenticationToken.class.getName());
+        conf.setBrokerClientAuthenticationParameters(BROKER_TOKEN);
+        Set<String> providers = new HashSet<>();
+        providers.add(AuthenticationProviderToken.class.getName());
+        conf.setAuthenticationProviders(providers);
+
+        conf.setClusterName("proxy-authorization");
+        conf.setNumExecutorThreadPoolSize(5);
+
+        super.init();
+
+        // start proxy service
+        proxyConfig.setAuthenticationEnabled(true);
+        proxyConfig.setAuthorizationEnabled(false);
+        proxyConfig.getProperties().setProperty("tokenSecretKey", 
"data:;base64," + Base64.getEncoder().encodeToString(SECRET_KEY.getEncoded()));
+        proxyConfig.setBrokerServiceURL(pulsar.getBrokerServiceUrl());
+
+        proxyConfig.setServicePort(Optional.of(0));
+        proxyConfig.setWebServicePort(Optional.of(0));
+
+        // enable auth&auth and use JWT at proxy
+        
proxyConfig.setBrokerClientAuthenticationPlugin(AuthenticationToken.class.getName());
+        proxyConfig.setBrokerClientAuthenticationParameters(PROXY_TOKEN);
+        proxyConfig.setAuthenticationProviders(providers);
+
+        proxyService = Mockito.spy(new ProxyService(proxyConfig,
+                new AuthenticationService(
+                        PulsarConfigurationLoader.convertFrom(proxyConfig))));
+    }
+
+    @AfterMethod(alwaysRun = true)
+    @Override
+    protected void cleanup() throws Exception {
+        super.internalCleanup();
+        proxyService.close();
+    }
+
+    private void startProxy() throws Exception {
+        proxyService.start();
+    }
+
+    /**
+     * <pre>
+     * It verifies jwt + Authentication + Authorization (client -> proxy -> 
broker).
+     *
+     * 1. client connects to proxy over jwt and pass auth-data
+     * 2. proxy authenticate client and retrieve client-role
+     *    and send it to broker as originalPrincipal over jwt
+     * 3. client creates producer/consumer via proxy
+     * 4. broker authorize producer/consumer create request using 
originalPrincipal
+     *
+     * </pre>
+     */
+    @Test
+    public void testProxyAuthorization() throws Exception {
+        log.info("-- Starting {} test --", methodName);
+
+        startProxy();
+        createAdminClient();
+        PulsarClient proxyClient = 
createPulsarClient(proxyService.getServiceUrl(), PulsarClient.builder());
+
+        String namespaceName = "my-property/proxy-authorization/my-ns";
+
+        admin.clusters().createCluster("proxy-authorization", new 
ClusterData(brokerUrl.toString()));
+
+        admin.tenants().createTenant("my-property",
+                new TenantInfo(Sets.newHashSet("appid1", "appid2"), 
Sets.newHashSet("proxy-authorization")));
+        admin.namespaces().createNamespace(namespaceName);
+
+        Consumer<byte[]> consumer;
+        try {
+            consumer = proxyClient.newConsumer()
+                    
.topic("persistent://my-property/proxy-authorization/my-ns/my-topic1")
+                    .subscriptionName("my-subscriber-name").subscribe();
+            Assert.fail("should have failed with authorization error");
+        } catch (Exception ex) {
+            // excepted
+            admin.namespaces().grantPermissionOnNamespace(namespaceName, 
CLIENT_ROLE,
+                    Sets.newHashSet(AuthAction.consume));
+            log.info("-- Admin permissions {} ---", 
admin.namespaces().getPermissions(namespaceName));
+            consumer = proxyClient.newConsumer()
+                    
.topic("persistent://my-property/proxy-authorization/my-ns/my-topic1")
+                    .subscriptionName("my-subscriber-name").subscribe();
+        }
+
+        Producer<byte[]> producer;
+        try {
+            producer = proxyClient.newProducer(Schema.BYTES)
+                    
.topic("persistent://my-property/proxy-authorization/my-ns/my-topic1").create();
+            Assert.fail("should have failed with authorization error");
+        } catch (Exception ex) {
+            // excepted
+            admin.namespaces().grantPermissionOnNamespace(namespaceName, 
CLIENT_ROLE,
+                    Sets.newHashSet(AuthAction.produce, AuthAction.consume));
+            log.info("-- Admin permissions {} ---", 
admin.namespaces().getPermissions(namespaceName));
+            producer = proxyClient.newProducer(Schema.BYTES)
+                    
.topic("persistent://my-property/proxy-authorization/my-ns/my-topic1").create();
+        }
+        final int msgs = 10;
+        for (int i = 0; i < msgs; i++) {
+            String message = "my-message-" + i;
+            producer.send(message.getBytes());
+        }
+
+        Message<byte[]> msg = null;
+        Set<String> messageSet = Sets.newHashSet();
+        int count = 0;
+        for (int i = 0; i < 10; i++) {
+            msg = consumer.receive(5, TimeUnit.SECONDS);
+            String receivedMessage = new String(msg.getData());
+            log.debug("Received message: [{}]", receivedMessage);
+            String expectedMessage = "my-message-" + i;
+            testMessageOrderAndDuplicates(messageSet, receivedMessage, 
expectedMessage);
+            count++;
+        }
+        // Acknowledge the consumption of all messages at once
+        Assert.assertEquals(msgs, count);
+        consumer.acknowledgeCumulative(msg);
+        consumer.close();
+        log.info("-- Exiting {} test --", methodName);
+    }
+
+    /**
+     * <pre>
+     * It verifies jwt + Authentication + Authorization (client -> proxy -> 
broker).
+     * It also test `SubscriptionAuthMode.Prefix` mode.
+     *
+     * 1. client connects to proxy over jwt and pass auth-data
+     * 2. proxy authenticate client and retrieve client-role
+     *    and send it to broker as originalPrincipal over jwt
+     * 3. client creates producer/consumer via proxy
+     * 4. broker authorize producer/consumer create request using 
originalPrincipal
+     *
+     * </pre>
+     */
+    @Test
+    public void testProxyAuthorizationWithPrefixSubscriptionAuthMode() throws 
Exception {
+        log.info("-- Starting {} test --", methodName);
+
+        startProxy();
+        createAdminClient();
+        PulsarClient proxyClient = 
createPulsarClient(proxyService.getServiceUrl(), PulsarClient.builder());
+
+        String namespaceName = "my-property/proxy-authorization/my-ns";
+
+        admin.clusters().createCluster("proxy-authorization", new 
ClusterData(brokerUrl.toString()));
+
+        admin.tenants().createTenant("my-property",
+                new TenantInfo(Sets.newHashSet("appid1", "appid2"), 
Sets.newHashSet("proxy-authorization")));
+        admin.namespaces().createNamespace(namespaceName);
+        admin.namespaces().grantPermissionOnNamespace(namespaceName, 
CLIENT_ROLE,
+                Sets.newHashSet(AuthAction.produce, AuthAction.consume));
+        admin.namespaces().setSubscriptionAuthMode(namespaceName, 
SubscriptionAuthMode.Prefix);
+
+        Consumer<byte[]> consumer;
+        try {
+            consumer = proxyClient.newConsumer()
+                    
.topic("persistent://my-property/proxy-authorization/my-ns/my-topic1")
+                    .subscriptionName("my-subscriber-name").subscribe();
+            Assert.fail("should have failed with authorization error");
+        } catch (Exception ex) {
+            // excepted
+            consumer = proxyClient.newConsumer()
+                    
.topic("persistent://my-property/proxy-authorization/my-ns/my-topic1")
+                    .subscriptionName(CLIENT_ROLE + "-sub1").subscribe();
+        }
+
+        Producer<byte[]> producer = proxyClient.newProducer(Schema.BYTES)
+                
.topic("persistent://my-property/proxy-authorization/my-ns/my-topic1").create();
+        final int msgs = 10;
+        for (int i = 0; i < msgs; i++) {
+            String message = "my-message-" + i;
+            producer.send(message.getBytes());
+        }
+
+        Message<byte[]> msg = null;
+        Set<String> messageSet = Sets.newHashSet();
+        int count = 0;
+        for (int i = 0; i < 10; i++) {
+            msg = consumer.receive(5, TimeUnit.SECONDS);
+            String receivedMessage = new String(msg.getData());
+            log.debug("Received message: [{}]", receivedMessage);
+            String expectedMessage = "my-message-" + i;
+            testMessageOrderAndDuplicates(messageSet, receivedMessage, 
expectedMessage);
+            count++;
+        }
+        // Acknowledge the consumption of all messages at once
+        Assert.assertEquals(msgs, count);
+        consumer.acknowledgeCumulative(msg);
+        consumer.close();
+        log.info("-- Exiting {} test --", methodName);
+    }
+
+    private void createAdminClient() throws Exception {
+        admin = spy(PulsarAdmin.builder().serviceHttpUrl(brokerUrl.toString())
+                
.authentication(AuthenticationFactory.token(ADMIN_TOKEN)).build());
+    }
+
+    private PulsarClient createPulsarClient(String proxyServiceUrl, 
ClientBuilder clientBuilder)
+            throws PulsarClientException {
+        return clientBuilder.serviceUrl(proxyServiceUrl).statsInterval(0, 
TimeUnit.SECONDS)
+                .authentication(AuthenticationFactory.token(CLIENT_TOKEN))
+                .operationTimeout(1000, TimeUnit.MILLISECONDS).build();
+    }
+}

Reply via email to