This is an automated email from the ASF dual-hosted git repository. sanjeevrk pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push: new 36ea153 Instead of always using admin access for topic, use read/write/admin access for topic (#6504) 36ea153 is described below commit 36ea153c0ff4fc3e3f04de4a37b658daa9f116fa Author: Sanjeev Kulkarni <sanjee...@gmail.com> AuthorDate: Sat Mar 7 18:10:03 2020 -0800 Instead of always using admin access for topic, use read/write/admin access for topic (#6504) Co-authored-by: Sanjeev Kulkarni <sanje...@splunk.com> --- .../broker/admin/impl/PersistentTopicsBase.java | 64 +++++++++++++++++----- 1 file changed, 51 insertions(+), 13 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java index d74017c..685d195 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java @@ -234,6 +234,44 @@ public class PersistentTopicsBase extends AdminResource { validateTopicOwnership(topicName, authoritative); } + public void validateReadOperationOnTopic(boolean authoritative) { + validateTopicOwnership(topicName, authoritative); + try { + validateAdminAccessForTenant(topicName.getTenant()); + } catch (Exception e) { + if (log.isDebugEnabled()) { + log.debug("[{}] failed to validate admin access for {}", topicName, clientAppId()); + } + validateAdminAccessForSubscriber(""); + } + } + + public void validateWriteOperationOnTopic(boolean authoritative) { + validateTopicOwnership(topicName, authoritative); + try { + validateAdminAccessForTenant(topicName.getTenant()); + } catch (Exception e) { + if (log.isDebugEnabled()) { + log.debug("[{}] failed to validate admin access for {}", topicName, clientAppId()); + } + try { + if (!pulsar().getBrokerService().getAuthorizationService().canProduce(topicName, clientAppId(), + clientAuthData())) { + log.warn("[{}} Subscriber {} is not authorized to access api", topicName, clientAppId()); + throw new RestException(Status.UNAUTHORIZED, + String.format("Subscriber %s is not authorized to access this operation", clientAppId())); + } + } catch (RestException re) { + throw re; + } catch (Exception ex) { + // unknown error marked as internal server error + log.warn("Unexpected error while authorizing request. topic={}, role={}. Error: {}", topicName, + clientAppId(), e.getMessage(), ex); + throw new RestException(ex); + } + } + } + protected void validateAdminAccessForSubscriber(String subscriptionName, boolean authoritative) { validateTopicOwnership(topicName, authoritative); try { @@ -317,7 +355,7 @@ public class PersistentTopicsBase extends AdminResource { } protected void internalDeleteTopicForcefully(boolean authoritative) { - validateAdminOperationOnTopic(authoritative); + validateWriteOperationOnTopic(authoritative); Topic topic = getTopicReference(topicName); try { topic.deleteForcefully().get(); @@ -391,7 +429,7 @@ public class PersistentTopicsBase extends AdminResource { } protected void internalCreateNonPartitionedTopic(boolean authoritative) { - validateAdminAccessForTenant(topicName.getTenant()); + validateWriteOperationOnTopic(authoritative); validateNonPartitionTopicName(topicName.getLocalName()); if (topicName.isGlobal()) { validateGlobalNamespaceOwnership(namespaceName); @@ -426,7 +464,7 @@ public class PersistentTopicsBase extends AdminResource { * @param numPartitions */ protected void internalUpdatePartitionedTopic(int numPartitions, boolean updateLocalTopicOnly) { - validateAdminAccessForTenant(topicName.getTenant()); + validateWriteOperationOnTopic(false); // Only do the validation if it's the first hop. if (!updateLocalTopicOnly) { validatePartitionTopicUpdate(topicName.getLocalName(), numPartitions); @@ -540,7 +578,7 @@ public class PersistentTopicsBase extends AdminResource { protected void internalDeletePartitionedTopic(AsyncResponse asyncResponse, boolean authoritative, boolean force) { try { - validateAdminAccessForTenant(topicName.getTenant()); + validateWriteOperationOnTopic(authoritative); } catch (Exception e) { log.error("[{}] Failed to delete partitioned topic {}", clientAppId(), topicName, e); resumeAsyncResponseExceptionally(asyncResponse, e); @@ -738,7 +776,7 @@ public class PersistentTopicsBase extends AdminResource { } protected void internalDeleteTopic(boolean authoritative) { - validateAdminOperationOnTopic(authoritative); + validateWriteOperationOnTopic(authoritative); Topic topic = getTopicReference(topicName); // v2 topics have a global name so check if the topic is replicated. @@ -825,7 +863,7 @@ public class PersistentTopicsBase extends AdminResource { private void internalGetSubscriptionsForNonPartitionedTopic(AsyncResponse asyncResponse, boolean authoritative) { try { - validateAdminOperationOnTopic(authoritative); + validateReadOperationOnTopic(authoritative); Topic topic = getTopicReference(topicName); final List<String> subscriptions = Lists.newArrayList(); topic.getSubscriptions().forEach((subName, sub) -> subscriptions.add(subName)); @@ -1279,7 +1317,7 @@ public class PersistentTopicsBase extends AdminResource { // validate ownership and redirect if current broker is not owner PersistentTopic topic; try { - validateAdminOperationOnTopic(authoritative); + validateWriteOperationOnTopic(authoritative); topic = (PersistentTopic) getTopicReference(topicName); } catch (Exception e) { @@ -1744,7 +1782,7 @@ public class PersistentTopicsBase extends AdminResource { if (partitionMetadata.partitions > 0) { throw new RestException(Status.METHOD_NOT_ALLOWED, "Termination of a partitioned topic is not allowed"); } - validateAdminOperationOnTopic(authoritative); + validateWriteOperationOnTopic(authoritative); Topic topic = getTopicReference(topicName); try { return ((PersistentTopic) topic).terminate().get(); @@ -1867,7 +1905,7 @@ public class PersistentTopicsBase extends AdminResource { } protected void internalTriggerCompaction(boolean authoritative) { - validateAdminOperationOnTopic(authoritative); + validateWriteOperationOnTopic(authoritative); PersistentTopic topic = (PersistentTopic) getTopicReference(topicName); try { @@ -1880,13 +1918,13 @@ public class PersistentTopicsBase extends AdminResource { } protected LongRunningProcessStatus internalCompactionStatus(boolean authoritative) { - validateAdminOperationOnTopic(authoritative); + validateReadOperationOnTopic(authoritative); PersistentTopic topic = (PersistentTopic) getTopicReference(topicName); return topic.compactionStatus(); } protected void internalTriggerOffload(boolean authoritative, MessageIdImpl messageId) { - validateAdminOperationOnTopic(authoritative); + validateWriteOperationOnTopic(authoritative); PersistentTopic topic = (PersistentTopic) getTopicReference(topicName); try { topic.triggerOffload(messageId); @@ -1899,7 +1937,7 @@ public class PersistentTopicsBase extends AdminResource { } protected OffloadProcessStatus internalOffloadStatus(boolean authoritative) { - validateAdminOperationOnTopic(authoritative); + validateReadOperationOnTopic(authoritative); PersistentTopic topic = (PersistentTopic) getTopicReference(topicName); return topic.offloadStatus(); } @@ -2237,7 +2275,7 @@ public class PersistentTopicsBase extends AdminResource { } protected MessageId internalGetLastMessageId(boolean authoritative) { - validateAdminOperationOnTopic(authoritative); + validateReadOperationOnTopic(authoritative); if (!(getTopicReference(topicName) instanceof PersistentTopic)) { log.error("[{}] Not supported operation of non-persistent topic {}", clientAppId(), topicName);