This is an automated email from the ASF dual-hosted git repository. eolivelli pushed a commit to branch branch-2.7 in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-2.7 by this push: new fe7ee88 [Issue 10221] Fix authorization error while using proxy and `Prefix` subscription authentication mode (#10226) fe7ee88 is described below commit fe7ee885a153c105a41eb215a4673597b81286d2 Author: Shen Liu <liushen...@126.com> AuthorDate: Mon Apr 19 11:11:09 2021 +0800 [Issue 10221] Fix authorization error while using proxy and `Prefix` subscription authentication mode (#10226) Fixes #10221 ### Motivation When the Pulsar proxy is used together with the `Prefix` subscription authentication mode, [org.apache.pulsar.broker.authorization.PulsarAuthorizationProvider#canConsumeAsync](https://github.com/apache/pulsar/blob/master/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/PulsarAuthorizationProvider.java#L135) throws an exception, which causes the consumer to fail. ### Modifications Update the `org.apache.pulsar.broker.authorization.PulsarAuthorizationProvider#allowTopicOperationAsync` logic to check `isSuperUser` first and to return `isAuthorizedFuture` only when the role is not a super user. ### Verifying this change - [x] Make sure that the change passes the CI checks. 
(cherry picked from commit 889b9b8e5efc62d2d0cbc761205fba5759c97af0) --- .../authorization/PulsarAuthorizationProvider.java | 18 +- .../server/ProxyWithJwtAuthorizationTest.java | 286 +++++++++++++++++++++ 2 files changed, 297 insertions(+), 7 deletions(-) diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/PulsarAuthorizationProvider.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/PulsarAuthorizationProvider.java index ca7ca9e..e20c89c 100644 --- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/PulsarAuthorizationProvider.java +++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/PulsarAuthorizationProvider.java @@ -564,20 +564,24 @@ public class PulsarAuthorizationProvider implements AuthorizationProvider { break; case CONSUME: isAuthorizedFuture = canConsumeAsync(topicName, role, authData, authData.getSubscription()); break; - default: isAuthorizedFuture = FutureUtil.failedFuture( - new IllegalStateException("TopicOperation is not supported.")); + default: + return FutureUtil.failedFuture(new IllegalStateException("TopicOperation is not supported.")); } CompletableFuture<Boolean> isSuperUserFuture = isSuperUser(role, authData, conf); + // check isSuperUser first return isSuperUserFuture - .thenCombine(isAuthorizedFuture, (isSuperUser, isAuthorized) -> { + .thenCompose(isSuperUser -> { if (log.isDebugEnabled()) { - log.debug("Verify if role {} is allowed to {} to topic {}:" - + " isSuperUser={}, isAuthorized={}", - role, operation, topicName, isSuperUser, isAuthorized); + log.debug("Verify if role {} is allowed to {} to topic {}: isSuperUser={}", + role, operation, topicName, isSuperUser); + } + if (isSuperUser) { + return CompletableFuture.completedFuture(true); + } else { + return isAuthorizedFuture; } - return isSuperUser || isAuthorized; }); } diff --git 
a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyWithJwtAuthorizationTest.java b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyWithJwtAuthorizationTest.java new file mode 100644 index 0000000..f683adf --- /dev/null +++ b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyWithJwtAuthorizationTest.java @@ -0,0 +1,286 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.pulsar.proxy.server; + +import static org.mockito.Mockito.spy; + +import com.google.common.collect.Sets; + +import java.util.*; +import java.util.concurrent.TimeUnit; + +import io.jsonwebtoken.Jwts; +import io.jsonwebtoken.SignatureAlgorithm; +import org.apache.pulsar.broker.authentication.AuthenticationProviderToken; +import org.apache.pulsar.broker.authentication.AuthenticationService; +import org.apache.pulsar.broker.authentication.utils.AuthTokenUtils; +import org.apache.pulsar.client.admin.PulsarAdmin; +import org.apache.pulsar.client.api.*; +import org.apache.pulsar.client.impl.auth.AuthenticationToken; +import org.apache.pulsar.common.configuration.PulsarConfigurationLoader; +import org.apache.pulsar.common.policies.data.AuthAction; +import org.apache.pulsar.common.policies.data.ClusterData; +import org.apache.pulsar.common.policies.data.SubscriptionAuthMode; +import org.apache.pulsar.common.policies.data.TenantInfo; +import org.mockito.Mockito; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.testng.Assert; +import org.testng.annotations.AfterMethod; +import org.testng.annotations.BeforeMethod; +import org.testng.annotations.Test; + +import javax.crypto.SecretKey; + +public class ProxyWithJwtAuthorizationTest extends ProducerConsumerBase { + private static final Logger log = LoggerFactory.getLogger(ProxyWithAuthorizationTest.class); + + private final String ADMIN_ROLE = "admin"; + private final String PROXY_ROLE = "proxy"; + private final String BROKER_ROLE = "broker"; + private final String CLIENT_ROLE = "client"; + private final SecretKey SECRET_KEY = AuthTokenUtils.createSecretKey(SignatureAlgorithm.HS256); + + private final String ADMIN_TOKEN = Jwts.builder().setSubject(ADMIN_ROLE).signWith(SECRET_KEY).compact(); + private final String PROXY_TOKEN = Jwts.builder().setSubject(PROXY_ROLE).signWith(SECRET_KEY).compact(); + private final String BROKER_TOKEN = 
Jwts.builder().setSubject(BROKER_ROLE).signWith(SECRET_KEY).compact(); + private final String CLIENT_TOKEN = Jwts.builder().setSubject(CLIENT_ROLE).signWith(SECRET_KEY).compact(); + + private ProxyService proxyService; + private final ProxyConfiguration proxyConfig = new ProxyConfiguration(); + + @BeforeMethod + @Override + protected void setup() throws Exception { + // enable auth&auth and use JWT at broker + conf.setAuthenticationEnabled(true); + conf.setAuthorizationEnabled(true); + conf.getProperties().setProperty("tokenSecretKey", "data:;base64," + Base64.getEncoder().encodeToString(SECRET_KEY.getEncoded())); + + Set<String> superUserRoles = new HashSet<>(); + superUserRoles.add(ADMIN_ROLE); + superUserRoles.add(PROXY_ROLE); + superUserRoles.add(BROKER_ROLE); + conf.setSuperUserRoles(superUserRoles); + + conf.setBrokerClientAuthenticationPlugin(AuthenticationToken.class.getName()); + conf.setBrokerClientAuthenticationParameters(BROKER_TOKEN); + Set<String> providers = new HashSet<>(); + providers.add(AuthenticationProviderToken.class.getName()); + conf.setAuthenticationProviders(providers); + + conf.setClusterName("proxy-authorization"); + conf.setNumExecutorThreadPoolSize(5); + + super.init(); + + // start proxy service + proxyConfig.setAuthenticationEnabled(true); + proxyConfig.setAuthorizationEnabled(false); + proxyConfig.getProperties().setProperty("tokenSecretKey", "data:;base64," + Base64.getEncoder().encodeToString(SECRET_KEY.getEncoded())); + proxyConfig.setBrokerServiceURL(pulsar.getBrokerServiceUrl()); + + proxyConfig.setServicePort(Optional.of(0)); + proxyConfig.setWebServicePort(Optional.of(0)); + + // enable auth&auth and use JWT at proxy + proxyConfig.setBrokerClientAuthenticationPlugin(AuthenticationToken.class.getName()); + proxyConfig.setBrokerClientAuthenticationParameters(PROXY_TOKEN); + proxyConfig.setAuthenticationProviders(providers); + + proxyService = Mockito.spy(new ProxyService(proxyConfig, + new AuthenticationService( + 
PulsarConfigurationLoader.convertFrom(proxyConfig)))); + } + + @AfterMethod(alwaysRun = true) + @Override + protected void cleanup() throws Exception { + super.internalCleanup(); + proxyService.close(); + } + + private void startProxy() throws Exception { + proxyService.start(); + } + + /** + * <pre> + * It verifies jwt + Authentication + Authorization (client -> proxy -> broker). + * + * 1. client connects to proxy over jwt and pass auth-data + * 2. proxy authenticate client and retrieve client-role + * and send it to broker as originalPrincipal over jwt + * 3. client creates producer/consumer via proxy + * 4. broker authorize producer/consumer create request using originalPrincipal + * + * </pre> + */ + @Test + public void testProxyAuthorization() throws Exception { + log.info("-- Starting {} test --", methodName); + + startProxy(); + createAdminClient(); + PulsarClient proxyClient = createPulsarClient(proxyService.getServiceUrl(), PulsarClient.builder()); + + String namespaceName = "my-property/proxy-authorization/my-ns"; + + admin.clusters().createCluster("proxy-authorization", new ClusterData(brokerUrl.toString())); + + admin.tenants().createTenant("my-property", + new TenantInfo(Sets.newHashSet("appid1", "appid2"), Sets.newHashSet("proxy-authorization"))); + admin.namespaces().createNamespace(namespaceName); + + Consumer<byte[]> consumer; + try { + consumer = proxyClient.newConsumer() + .topic("persistent://my-property/proxy-authorization/my-ns/my-topic1") + .subscriptionName("my-subscriber-name").subscribe(); + Assert.fail("should have failed with authorization error"); + } catch (Exception ex) { + // excepted + admin.namespaces().grantPermissionOnNamespace(namespaceName, CLIENT_ROLE, + Sets.newHashSet(AuthAction.consume)); + log.info("-- Admin permissions {} ---", admin.namespaces().getPermissions(namespaceName)); + consumer = proxyClient.newConsumer() + .topic("persistent://my-property/proxy-authorization/my-ns/my-topic1") + 
.subscriptionName("my-subscriber-name").subscribe(); + } + + Producer<byte[]> producer; + try { + producer = proxyClient.newProducer(Schema.BYTES) + .topic("persistent://my-property/proxy-authorization/my-ns/my-topic1").create(); + Assert.fail("should have failed with authorization error"); + } catch (Exception ex) { + // excepted + admin.namespaces().grantPermissionOnNamespace(namespaceName, CLIENT_ROLE, + Sets.newHashSet(AuthAction.produce, AuthAction.consume)); + log.info("-- Admin permissions {} ---", admin.namespaces().getPermissions(namespaceName)); + producer = proxyClient.newProducer(Schema.BYTES) + .topic("persistent://my-property/proxy-authorization/my-ns/my-topic1").create(); + } + final int msgs = 10; + for (int i = 0; i < msgs; i++) { + String message = "my-message-" + i; + producer.send(message.getBytes()); + } + + Message<byte[]> msg = null; + Set<String> messageSet = Sets.newHashSet(); + int count = 0; + for (int i = 0; i < 10; i++) { + msg = consumer.receive(5, TimeUnit.SECONDS); + String receivedMessage = new String(msg.getData()); + log.debug("Received message: [{}]", receivedMessage); + String expectedMessage = "my-message-" + i; + testMessageOrderAndDuplicates(messageSet, receivedMessage, expectedMessage); + count++; + } + // Acknowledge the consumption of all messages at once + Assert.assertEquals(msgs, count); + consumer.acknowledgeCumulative(msg); + consumer.close(); + log.info("-- Exiting {} test --", methodName); + } + + /** + * <pre> + * It verifies jwt + Authentication + Authorization (client -> proxy -> broker). + * It also test `SubscriptionAuthMode.Prefix` mode. + * + * 1. client connects to proxy over jwt and pass auth-data + * 2. proxy authenticate client and retrieve client-role + * and send it to broker as originalPrincipal over jwt + * 3. client creates producer/consumer via proxy + * 4. 
broker authorize producer/consumer create request using originalPrincipal + * + * </pre> + */ + @Test + public void testProxyAuthorizationWithPrefixSubscriptionAuthMode() throws Exception { + log.info("-- Starting {} test --", methodName); + + startProxy(); + createAdminClient(); + PulsarClient proxyClient = createPulsarClient(proxyService.getServiceUrl(), PulsarClient.builder()); + + String namespaceName = "my-property/proxy-authorization/my-ns"; + + admin.clusters().createCluster("proxy-authorization", new ClusterData(brokerUrl.toString())); + + admin.tenants().createTenant("my-property", + new TenantInfo(Sets.newHashSet("appid1", "appid2"), Sets.newHashSet("proxy-authorization"))); + admin.namespaces().createNamespace(namespaceName); + admin.namespaces().grantPermissionOnNamespace(namespaceName, CLIENT_ROLE, + Sets.newHashSet(AuthAction.produce, AuthAction.consume)); + admin.namespaces().setSubscriptionAuthMode(namespaceName, SubscriptionAuthMode.Prefix); + + Consumer<byte[]> consumer; + try { + consumer = proxyClient.newConsumer() + .topic("persistent://my-property/proxy-authorization/my-ns/my-topic1") + .subscriptionName("my-subscriber-name").subscribe(); + Assert.fail("should have failed with authorization error"); + } catch (Exception ex) { + // excepted + consumer = proxyClient.newConsumer() + .topic("persistent://my-property/proxy-authorization/my-ns/my-topic1") + .subscriptionName(CLIENT_ROLE + "-sub1").subscribe(); + } + + Producer<byte[]> producer = proxyClient.newProducer(Schema.BYTES) + .topic("persistent://my-property/proxy-authorization/my-ns/my-topic1").create(); + final int msgs = 10; + for (int i = 0; i < msgs; i++) { + String message = "my-message-" + i; + producer.send(message.getBytes()); + } + + Message<byte[]> msg = null; + Set<String> messageSet = Sets.newHashSet(); + int count = 0; + for (int i = 0; i < 10; i++) { + msg = consumer.receive(5, TimeUnit.SECONDS); + String receivedMessage = new String(msg.getData()); + log.debug("Received 
message: [{}]", receivedMessage); + String expectedMessage = "my-message-" + i; + testMessageOrderAndDuplicates(messageSet, receivedMessage, expectedMessage); + count++; + } + // Acknowledge the consumption of all messages at once + Assert.assertEquals(msgs, count); + consumer.acknowledgeCumulative(msg); + consumer.close(); + log.info("-- Exiting {} test --", methodName); + } + + private void createAdminClient() throws Exception { + admin = spy(PulsarAdmin.builder().serviceHttpUrl(brokerUrl.toString()) + .authentication(AuthenticationFactory.token(ADMIN_TOKEN)).build()); + } + + private PulsarClient createPulsarClient(String proxyServiceUrl, ClientBuilder clientBuilder) + throws PulsarClientException { + return clientBuilder.serviceUrl(proxyServiceUrl).statsInterval(0, TimeUnit.SECONDS) + .authentication(AuthenticationFactory.token(CLIENT_TOKEN)) + .operationTimeout(1000, TimeUnit.MILLISECONDS).build(); + } +}