This is an automated email from the ASF dual-hosted git repository.
mmarshall pushed a commit to branch branch-2.11
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-2.11 by this push:
new 02b27e8e5aa [refactor][broker] Use AuthenticationParameters for rest
producer (#20046)
02b27e8e5aa is described below
commit 02b27e8e5aa0280a5b0f8a8cd80409b1cfe8d2db
Author: Michael Marshall <[email protected]>
AuthorDate: Mon Apr 10 12:13:13 2023 -0500
[refactor][broker] Use AuthenticationParameters for rest producer (#20046)
In #19975, we introduced `AuthenticationParameters`, a wrapper for all
authentication parameters. This PR uses that wrapper in the REST producer.
* Use `AuthenticationParameters` to simplify parameter management in Rest
Producer.
* Add method to the `AuthorizationService` that takes the
`AuthenticationParameters`.
* Update annotations on Rest Producer to indicate that a 401 is an expected
response.
This change is covered by the `TopicsAuthTest`.
- [x] `doc-not-needed`
This is an internal change that does not need to be documented.
PR in forked repository: skipping PR since the relevant tests pass locally
(cherry picked from commit 7990948a73e2c4dfa0e9c99ff223f0ee90e82dc3)
---
.../broker/authorization/AuthorizationService.java | 7 ++++++
.../java/org/apache/pulsar/broker/rest/Topics.java | 4 ++++
.../org/apache/pulsar/broker/rest/TopicsBase.java | 25 ++++++++++++++++------
3 files changed, 30 insertions(+), 6 deletions(-)
diff --git
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/AuthorizationService.java
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/AuthorizationService.java
index 759f253db98..4b59f1628cb 100644
---
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/AuthorizationService.java
+++
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/AuthorizationService.java
@@ -734,6 +734,13 @@ public class AuthorizationService {
}
}
+ public CompletableFuture<Boolean> allowTopicOperationAsync(TopicName
topicName,
+ TopicOperation
operation,
+
AuthenticationParameters authParams) {
+ return allowTopicOperationAsync(topicName, operation,
authParams.getOriginalPrincipal(),
+ authParams.getClientRole(),
authParams.getClientAuthenticationDataSource());
+ }
+
public CompletableFuture<Boolean> allowTopicOperationAsync(TopicName
topicName,
TopicOperation
operation,
String
originalRole,
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/rest/Topics.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/rest/Topics.java
index a8095f03279..1a6bb524309 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/rest/Topics.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/rest/Topics.java
@@ -49,6 +49,7 @@ public class Topics extends TopicsBase {
@Path("/persistent/{tenant}/{namespace}/{topic}")
@ApiOperation(value = "Produce message to a persistent topic.", response =
String.class, responseContainer = "List")
@ApiResponses(value = {
+ @ApiResponse(code = 401, message = "Client is not authorized to
perform operation"),
@ApiResponse(code = 404, message = "tenant/namespace/topic doesn't
exit"),
@ApiResponse(code = 412, message = "Namespace name is not valid"),
@ApiResponse(code = 500, message = "Internal server error") })
@@ -76,6 +77,7 @@ public class Topics extends TopicsBase {
@ApiOperation(value = "Produce message to a partition of a persistent
topic.",
response = String.class, responseContainer = "List")
@ApiResponses(value = {
+ @ApiResponse(code = 401, message = "Client is not authorized to
perform operation"),
@ApiResponse(code = 404, message = "tenant/namespace/topic doesn't
exit"),
@ApiResponse(code = 412, message = "Namespace name is not valid"),
@ApiResponse(code = 500, message = "Internal server error") })
@@ -104,6 +106,7 @@ public class Topics extends TopicsBase {
@Path("/non-persistent/{tenant}/{namespace}/{topic}")
@ApiOperation(value = "Produce message to a persistent topic.", response =
String.class, responseContainer = "List")
@ApiResponses(value = {
+ @ApiResponse(code = 401, message = "Client is not authorized to
perform operation"),
@ApiResponse(code = 404, message = "tenant/namespace/topic doesn't
exit"),
@ApiResponse(code = 412, message = "Namespace name is not valid"),
@ApiResponse(code = 500, message = "Internal server error") })
@@ -132,6 +135,7 @@ public class Topics extends TopicsBase {
@ApiOperation(value = "Produce message to a partition of a persistent
topic.",
response = String.class, responseContainer = "List")
@ApiResponses(value = {
+ @ApiResponse(code = 401, message = "Client is not authorized to
perform operation"),
@ApiResponse(code = 404, message = "tenant/namespace/topic doesn't
exit"),
@ApiResponse(code = 412, message = "Namespace name is not valid"),
@ApiResponse(code = 500, message = "Internal server error") })
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/rest/TopicsBase.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/rest/TopicsBase.java
index 86e8956d950..14996c35b19 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/rest/TopicsBase.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/rest/TopicsBase.java
@@ -18,6 +18,7 @@
*/
package org.apache.pulsar.broker.rest;
+import static java.util.concurrent.TimeUnit.SECONDS;
import io.netty.buffer.ByteBuf;
import java.io.IOException;
import java.net.URI;
@@ -53,6 +54,7 @@ import org.apache.bookkeeper.mledger.ManagedLedgerException;
import org.apache.bookkeeper.mledger.impl.PositionImpl;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.pulsar.broker.admin.impl.PersistentTopicsBase;
+import org.apache.pulsar.broker.authentication.AuthenticationParameters;
import org.apache.pulsar.broker.lookup.LookupResult;
import org.apache.pulsar.broker.namespace.LookupOptions;
import org.apache.pulsar.broker.service.BrokerServiceException;
@@ -80,6 +82,7 @@ import org.apache.pulsar.common.api.proto.MessageMetadata;
import org.apache.pulsar.common.compression.CompressionCodecProvider;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.partition.PartitionedTopicMetadata;
+import org.apache.pulsar.common.policies.data.TopicOperation;
import org.apache.pulsar.common.protocol.Commands;
import org.apache.pulsar.common.protocol.schema.SchemaData;
import org.apache.pulsar.common.protocol.schema.SchemaVersion;
@@ -760,14 +763,24 @@ public class TopicsBase extends PersistentTopicsBase {
if (!isClientAuthenticated(clientAppId())) {
throw new RestException(Status.UNAUTHORIZED, "Need to
authenticate to perform the request");
}
+ AuthenticationParameters authParams = authParams();
+ boolean isAuthorized;
+ try {
+ isAuthorized =
pulsar().getBrokerService().getAuthorizationService()
+ .allowTopicOperationAsync(topicName,
TopicOperation.PRODUCE, authParams)
+
.get(config().getMetadataStoreOperationTimeoutSeconds(), SECONDS);
+ } catch (InterruptedException e) {
+ log.warn("Time-out {} sec while checking authorization on {} ",
+ config().getMetadataStoreOperationTimeoutSeconds(),
topicName);
+ throw new RestException(Status.INTERNAL_SERVER_ERROR,
"Time-out while checking authorization");
+ } catch (Exception e) {
+ log.warn("Producer-client with Role - {} {} failed to get
permissions for topic - {}. {}",
+ authParams.getClientRole(),
authParams.getOriginalPrincipal(), topicName, e.getMessage());
+ throw new RestException(Status.INTERNAL_SERVER_ERROR, "Failed
to get permissions");
+ }
- boolean isAuthorized =
pulsar().getBrokerService().getAuthorizationService()
- .canProduce(topicName, originalPrincipal() == null ?
clientAppId() : originalPrincipal(),
- clientAuthData());
if (!isAuthorized) {
- throw new RestException(Status.UNAUTHORIZED,
String.format("Unauthorized to produce to topic %s"
- + " with clientAppId [%s] and authdata
%s", topicName.toString(),
- clientAppId(), clientAuthData()));
+ throw new RestException(Status.UNAUTHORIZED, "Unauthorized to
produce to topic " + topicName);
}
}
}