This is an automated email from the ASF dual-hosted git repository.
lhotari pushed a commit to branch branch-3.1
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-3.1 by this push:
new 99eb49a6898 [improve][broker] Add fine-grain authorization to ns/topic
management endpoints (#22305)
99eb49a6898 is described below
commit 99eb49a68982271562597d4c4cea127132bc0b35
Author: Jiwei Guo <[email protected]>
AuthorDate: Wed Mar 20 13:49:54 2024 +0800
[improve][broker] Add fine-grain authorization to ns/topic management
endpoints (#22305)
(cherry picked from commit fd34d4ab9c5aa7e0dca961d5a8badae4613fbe8e)
# Conflicts:
#
pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
---
.../authorization/PulsarAuthorizationProvider.java | 1 +
.../apache/pulsar/broker/admin/AdminResource.java | 7 +-
.../pulsar/broker/admin/impl/NamespacesBase.java | 166 +++++-----
.../broker/admin/impl/PersistentTopicsBase.java | 172 +++++-----
.../pulsar/broker/admin/NamespaceAuthZTest.java | 163 ++++++++++
.../apache/pulsar/broker/admin/TopicAuthZTest.java | 345 +++++++++++++++++++++
6 files changed, 683 insertions(+), 171 deletions(-)
diff --git
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/PulsarAuthorizationProvider.java
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/PulsarAuthorizationProvider.java
index acb6fce9b92..a39c3d05607 100644
---
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/PulsarAuthorizationProvider.java
+++
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/PulsarAuthorizationProvider.java
@@ -597,6 +597,7 @@ public class PulsarAuthorizationProvider implements
AuthorizationProvider {
case COMPACT:
case OFFLOAD:
case UNLOAD:
+ case TRIM_TOPIC:
case DELETE_METADATA:
case UPDATE_METADATA:
case ADD_BUNDLE_RANGE:
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java
index 7de426fa1f0..ec6d8a88828 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java
@@ -60,8 +60,6 @@ import org.apache.pulsar.common.policies.data.EntryFilters;
import org.apache.pulsar.common.policies.data.NamespaceOperation;
import org.apache.pulsar.common.policies.data.PersistencePolicies;
import org.apache.pulsar.common.policies.data.Policies;
-import org.apache.pulsar.common.policies.data.PolicyName;
-import org.apache.pulsar.common.policies.data.PolicyOperation;
import org.apache.pulsar.common.policies.data.RetentionPolicies;
import org.apache.pulsar.common.policies.data.SchemaCompatibilityStrategy;
import org.apache.pulsar.common.policies.data.SubscribeRate;
@@ -711,10 +709,7 @@ public abstract class AdminResource extends
PulsarWebResource {
}
protected CompletableFuture<SchemaCompatibilityStrategy>
getSchemaCompatibilityStrategyAsync() {
- return validateTopicPolicyOperationAsync(topicName,
- PolicyName.SCHEMA_COMPATIBILITY_STRATEGY,
- PolicyOperation.READ)
- .thenCompose((__) ->
getSchemaCompatibilityStrategyAsyncWithoutAuth()).whenComplete((__, ex) -> {
+ return
getSchemaCompatibilityStrategyAsyncWithoutAuth().whenComplete((__, ex) -> {
if (ex != null) {
log.error("[{}] Failed to get schema compatibility
strategy of topic {} {}",
clientAppId(), topicName, ex);
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
index 144545abb2c..9478857032f 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
@@ -2354,102 +2354,110 @@ public abstract class NamespacesBase extends
AdminResource {
}
protected void internalSetProperty(String key, String value, AsyncResponse
asyncResponse) {
- validatePoliciesReadOnlyAccess();
- updatePoliciesAsync(namespaceName, policies -> {
- policies.properties.put(key, value);
- return policies;
- }).thenAccept(v -> {
- log.info("[{}] Successfully set property for key {} on namespace
{}", clientAppId(), key,
- namespaceName);
- asyncResponse.resume(Response.noContent().build());
- }).exceptionally(ex -> {
- Throwable cause = ex.getCause();
- log.error("[{}] Failed to set property for key {} on namespace {}",
clientAppId(), key,
- namespaceName, cause);
- asyncResponse.resume(cause);
- return null;
- });
+ validateAdminAccessForTenantAsync(namespaceName.getTenant())
+ .thenCompose(__ -> validatePoliciesReadOnlyAccessAsync())
+ .thenCompose(__ -> updatePoliciesAsync(namespaceName, policies
-> {
+ policies.properties.put(key, value);
+ return policies;
+ }))
+ .thenAccept(v -> {
+ log.info("[{}] Successfully set property for key {} on
namespace {}", clientAppId(), key,
+ namespaceName);
+ asyncResponse.resume(Response.noContent().build());
+ }).exceptionally(ex -> {
+ Throwable cause = ex.getCause();
+ log.error("[{}] Failed to set property for key {} on
namespace {}", clientAppId(), key,
+ namespaceName, cause);
+ asyncResponse.resume(cause);
+ return null;
+ });
}
protected void internalSetProperties(Map<String, String> properties,
AsyncResponse asyncResponse) {
- validatePoliciesReadOnlyAccess();
- updatePoliciesAsync(namespaceName, policies -> {
- policies.properties.putAll(properties);
- return policies;
- }).thenAccept(v -> {
- log.info("[{}] Successfully set {} properties on namespace {}",
clientAppId(), properties.size(),
- namespaceName);
- asyncResponse.resume(Response.noContent().build());
- }).exceptionally(ex -> {
- Throwable cause = ex.getCause();
- log.error("[{}] Failed to set {} properties on namespace {}",
clientAppId(), properties.size(),
- namespaceName, cause);
- asyncResponse.resume(cause);
- return null;
- });
+ validateAdminAccessForTenantAsync(namespaceName.getTenant())
+ .thenCompose(__ -> validatePoliciesReadOnlyAccessAsync())
+ .thenCompose(__ -> updatePoliciesAsync(namespaceName, policies
-> {
+ policies.properties.putAll(properties);
+ return policies;
+ }))
+ .thenAccept(v -> {
+ log.info("[{}] Successfully set {} properties on namespace
{}", clientAppId(), properties.size(),
+ namespaceName);
+ asyncResponse.resume(Response.noContent().build());
+ }).exceptionally(ex -> {
+ Throwable cause = ex.getCause();
+ log.error("[{}] Failed to set {} properties on namespace
{}", clientAppId(), properties.size(),
+ namespaceName, cause);
+ asyncResponse.resume(cause);
+ return null;
+ });
}
protected void internalGetProperty(String key, AsyncResponse asyncResponse)
{
- getNamespacePoliciesAsync(namespaceName).thenAccept(policies -> {
- asyncResponse.resume(policies.properties.get(key));
- }).exceptionally(ex -> {
- Throwable cause = ex.getCause();
- log.error("[{}] Failed to get property for key {} of namespace
{}", clientAppId(), key,
- namespaceName, cause);
- asyncResponse.resume(cause);
- return null;
- });
+ validateAdminAccessForTenantAsync(namespaceName.getTenant())
+ .thenCompose(__ -> getNamespacePoliciesAsync(namespaceName))
+ .thenAccept(policies ->
asyncResponse.resume(policies.properties.get(key)))
+ .exceptionally(ex -> {
+ Throwable cause = ex.getCause();
+ log.error("[{}] Failed to get property for key {} of
namespace {}", clientAppId(), key,
+ namespaceName, cause);
+ asyncResponse.resume(cause);
+ return null;
+ });
}
protected void internalGetProperties(AsyncResponse asyncResponse) {
- getNamespacePoliciesAsync(namespaceName).thenAccept(policies -> {
- asyncResponse.resume(policies.properties);
- }).exceptionally(ex -> {
- Throwable cause = ex.getCause();
- log.error("[{}] Failed to get properties of namespace {}",
clientAppId(), namespaceName, cause);
- asyncResponse.resume(cause);
- return null;
- });
+ validateAdminAccessForTenantAsync(namespaceName.getTenant())
+ .thenCompose(__ -> getNamespacePoliciesAsync(namespaceName))
+ .thenAccept(policies ->
asyncResponse.resume(policies.properties))
+ .exceptionally(ex -> {
+ Throwable cause = ex.getCause();
+ log.error("[{}] Failed to get properties of namespace {}",
clientAppId(), namespaceName, cause);
+ asyncResponse.resume(cause);
+ return null;
+ });
}
protected void internalRemoveProperty(String key, AsyncResponse
asyncResponse) {
- validatePoliciesReadOnlyAccess();
-
AtomicReference<String> oldVal = new AtomicReference<>(null);
- updatePoliciesAsync(namespaceName, policies -> {
- oldVal.set(policies.properties.remove(key));
- return policies;
- }).thenAccept(v -> {
- asyncResponse.resume(oldVal.get());
- log.info("[{}] Successfully remove property for key {} on namespace
{}", clientAppId(), key,
- namespaceName);
- }).exceptionally(ex -> {
- Throwable cause = ex.getCause();
- log.error("[{}] Failed to remove property for key {} on namespace
{}", clientAppId(), key,
- namespaceName, cause);
- asyncResponse.resume(cause);
- return null;
- });
+ validateAdminAccessForTenantAsync(namespaceName.getTenant())
+ .thenCompose(__ -> validatePoliciesReadOnlyAccessAsync())
+ .thenCompose(__ -> updatePoliciesAsync(namespaceName, policies
-> {
+ oldVal.set(policies.properties.remove(key));
+ return policies;
+ })).thenAccept(v -> {
+ asyncResponse.resume(oldVal.get());
+ log.info("[{}] Successfully remove property for key {} on
namespace {}", clientAppId(), key,
+ namespaceName);
+ }).exceptionally(ex -> {
+ Throwable cause = ex.getCause();
+ log.error("[{}] Failed to remove property for key {} on
namespace {}", clientAppId(), key,
+ namespaceName, cause);
+ asyncResponse.resume(cause);
+ return null;
+ });
}
protected void internalClearProperties(AsyncResponse asyncResponse) {
- validatePoliciesReadOnlyAccess();
AtomicReference<Integer> clearedCount = new AtomicReference<>(0);
- updatePoliciesAsync(namespaceName, policies -> {
- clearedCount.set(policies.properties.size());
- policies.properties.clear();
- return policies;
- }).thenAccept(v -> {
- asyncResponse.resume(Response.noContent().build());
- log.info("[{}] Successfully clear {} properties on namespace {}",
clientAppId(), clearedCount.get(),
- namespaceName);
- }).exceptionally(ex -> {
- Throwable cause = ex.getCause();
- log.error("[{}] Failed to clear {} properties on namespace {}",
clientAppId(), clearedCount.get(),
- namespaceName, cause);
- asyncResponse.resume(cause);
- return null;
- });
+ validateAdminAccessForTenantAsync(namespaceName.getTenant())
+ .thenCompose(__ -> validatePoliciesReadOnlyAccessAsync())
+ .thenCompose(__ -> updatePoliciesAsync(namespaceName, policies
-> {
+ clearedCount.set(policies.properties.size());
+ policies.properties.clear();
+ return policies;
+ }))
+ .thenAccept(v -> {
+ asyncResponse.resume(Response.noContent().build());
+ log.info("[{}] Successfully clear {} properties on
namespace {}", clientAppId(), clearedCount.get(),
+ namespaceName);
+ }).exceptionally(ex -> {
+ Throwable cause = ex.getCause();
+ log.error("[{}] Failed to clear {} properties on namespace
{}", clientAppId(), clearedCount.get(),
+ namespaceName, cause);
+ asyncResponse.resume(cause);
+ return null;
+ });
}
private CompletableFuture<Void> updatePoliciesAsync(NamespaceName ns,
Function<Policies, Policies> updateFunction) {
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
index c5a8d83f9c5..fe451d1650e 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
@@ -501,7 +501,9 @@ public class PersistentTopicsBase extends AdminResource {
protected void internalCreateMissedPartitions(AsyncResponse asyncResponse)
{
getPartitionedTopicMetadataAsync(topicName, false,
false).thenAccept(metadata -> {
if (metadata != null) {
- tryCreatePartitionsAsync(metadata.partitions).thenAccept(v -> {
+ CompletableFuture<Void> future =
validateNamespaceOperationAsync(topicName.getNamespaceObject(),
+ NamespaceOperation.CREATE_TOPIC);
+ future.thenCompose(__ ->
tryCreatePartitionsAsync(metadata.partitions)).thenAccept(v -> {
asyncResponse.resume(Response.noContent().build());
}).exceptionally(e -> {
log.error("[{}] Failed to create partitions for topic {}",
clientAppId(), topicName);
@@ -833,13 +835,13 @@ public class PersistentTopicsBase extends AdminResource {
protected void internalUnloadTopic(AsyncResponse asyncResponse, boolean
authoritative) {
log.info("[{}] Unloading topic {}", clientAppId(), topicName);
- CompletableFuture<Void> future;
- if (topicName.isGlobal()) {
- future = validateGlobalNamespaceOwnershipAsync(namespaceName);
- } else {
- future = CompletableFuture.completedFuture(null);
- }
- future.thenAccept(__ -> {
+ CompletableFuture<Void> future =
validateTopicOperationAsync(topicName, TopicOperation.UNLOAD);
+ future.thenCompose(__ -> {
+ if (topicName.isGlobal()) {
+ return validateGlobalNamespaceOwnershipAsync(namespaceName);
+ }
+ return CompletableFuture.completedFuture(null);
+ }).thenAccept(__ -> {
// If the topic name is a partition name, no need to get partition
topic metadata again
if (topicName.isPartitioned()) {
if (isTransactionCoordinatorAssign(topicName)) {
@@ -1056,13 +1058,12 @@ public class PersistentTopicsBase extends AdminResource
{
private void internalUnloadNonPartitionedTopicAsync(AsyncResponse
asyncResponse, boolean authoritative) {
validateTopicOwnershipAsync(topicName, authoritative)
- .thenCompose(unused -> validateTopicOperationAsync(topicName,
TopicOperation.UNLOAD)
.thenCompose(__ -> getTopicReferenceAsync(topicName))
.thenCompose(topic -> topic.close(false))
.thenRun(() -> {
log.info("[{}] Successfully unloaded topic {}",
clientAppId(), topicName);
asyncResponse.resume(Response.noContent().build());
- }))
+ })
.exceptionally(ex -> {
// If the exception is not redirect exception we need to
log it.
if (!isNot307And404Exception(ex)) {
@@ -1075,16 +1076,14 @@ public class PersistentTopicsBase extends AdminResource
{
private void internalUnloadTransactionCoordinatorAsync(AsyncResponse
asyncResponse, boolean authoritative) {
validateTopicOwnershipAsync(topicName, authoritative)
- .thenCompose(__ -> validateTopicOperationAsync(topicName,
TopicOperation.UNLOAD)
- .thenCompose(v -> pulsar()
- .getTransactionMetadataStoreService()
- .removeTransactionMetadataStore(
-
TransactionCoordinatorID.get(topicName.getPartitionIndex())))
- .thenRun(() -> {
- log.info("[{}] Successfully unloaded tc {}",
clientAppId(),
- topicName.getPartitionIndex());
- asyncResponse.resume(Response.noContent().build());
- }))
+ .thenCompose(v -> pulsar()
+ .getTransactionMetadataStoreService()
+ .removeTransactionMetadataStore(
+
TransactionCoordinatorID.get(topicName.getPartitionIndex())))
+ .thenRun(() -> {
+ log.info("[{}] Successfully unloaded tc {}",
clientAppId(), topicName.getPartitionIndex());
+ asyncResponse.resume(Response.noContent().build());
+ })
.exceptionally(ex -> {
// If the exception is not redirect exception we need to
log it.
if (!isNot307And404Exception(ex)) {
@@ -1295,13 +1294,13 @@ public class PersistentTopicsBase extends AdminResource
{
}
protected void internalGetManagedLedgerInfo(AsyncResponse asyncResponse,
boolean authoritative) {
- CompletableFuture<Void> future;
- if (topicName.isGlobal()) {
- future = validateGlobalNamespaceOwnershipAsync(namespaceName);
- } else {
- future = CompletableFuture.completedFuture(null);
- }
- future.thenAccept(__ -> {
+ CompletableFuture<Void> future =
validateTopicOperationAsync(topicName, TopicOperation.GET_STATS);
+ future.thenCompose(__ -> {
+ if (topicName.isGlobal()) {
+ return validateGlobalNamespaceOwnershipAsync(namespaceName);
+ }
+ return CompletableFuture.completedFuture(null);
+ }).thenAccept(__ -> {
// If the topic name is a partition name, no need to get partition
topic metadata again
if (topicName.isPartitioned()) {
internalGetManagedLedgerInfoForNonPartitionedTopic(asyncResponse);
@@ -1406,13 +1405,13 @@ public class PersistentTopicsBase extends AdminResource
{
protected void internalGetPartitionedStats(AsyncResponse asyncResponse,
boolean authoritative, boolean perPartition,
boolean getPreciseBacklog,
boolean subscriptionBacklogSize,
boolean
getEarliestTimeInBacklog) {
- CompletableFuture<Void> future;
- if (topicName.isGlobal()) {
- future = validateGlobalNamespaceOwnershipAsync(namespaceName);
- } else {
- future = CompletableFuture.completedFuture(null);
- }
- future.thenCompose(__ -> getPartitionedTopicMetadataAsync(topicName,
+ CompletableFuture<Void> future =
validateTopicOperationAsync(topicName, TopicOperation.GET_STATS);
+ future.thenCompose(__ -> {
+ if (topicName.isGlobal()) {
+ return validateGlobalNamespaceOwnershipAsync(namespaceName);
+ }
+ return CompletableFuture.completedFuture(null);
+ }).thenCompose(__ -> getPartitionedTopicMetadataAsync(topicName,
authoritative, false)).thenAccept(partitionMetadata -> {
if (partitionMetadata.partitions == 0) {
asyncResponse.resume(new RestException(Status.NOT_FOUND,
@@ -1492,14 +1491,15 @@ public class PersistentTopicsBase extends AdminResource
{
}
protected void internalGetPartitionedStatsInternal(AsyncResponse
asyncResponse, boolean authoritative) {
- CompletableFuture<Void> future;
- if (topicName.isGlobal()) {
- future = validateGlobalNamespaceOwnershipAsync(namespaceName);
- } else {
- future = CompletableFuture.completedFuture(null);
- }
- future.thenCompose(__ -> getPartitionedTopicMetadataAsync(topicName,
authoritative, false))
- .thenAccept(partitionMetadata -> {
+ CompletableFuture<Void> future =
validateTopicOperationAsync(topicName, TopicOperation.GET_STATS);
+ future.thenCompose(__ -> {
+ if (topicName.isGlobal()) {
+ return validateGlobalNamespaceOwnershipAsync(namespaceName);
+ } else {
+ return CompletableFuture.completedFuture(null);
+ }
+ }).thenCompose(__ -> getPartitionedTopicMetadataAsync(topicName,
authoritative, false))
+ .thenAccept(partitionMetadata -> {
if (partitionMetadata.partitions == 0) {
asyncResponse.resume(new RestException(Status.NOT_FOUND,
getPartitionedTopicNotFoundErrorMessage(topicName.toString())));
@@ -2252,13 +2252,14 @@ public class PersistentTopicsBase extends AdminResource
{
protected void internalCreateSubscription(AsyncResponse asyncResponse,
String subscriptionName,
MessageIdImpl messageId, boolean authoritative, boolean
replicated, Map<String, String> properties) {
- CompletableFuture<Void> ret;
- if (topicName.isGlobal()) {
- ret = validateGlobalNamespaceOwnershipAsync(namespaceName);
- } else {
- ret = CompletableFuture.completedFuture(null);
- }
- ret.thenAccept(__ -> {
+ CompletableFuture<Void> ret = validateTopicOperationAsync(topicName,
TopicOperation.SUBSCRIBE,
+ subscriptionName);
+ ret.thenCompose(__ -> {
+ if (topicName.isGlobal()) {
+ return validateGlobalNamespaceOwnershipAsync(namespaceName);
+ }
+ return CompletableFuture.completedFuture(null);
+ }).thenAccept(__ -> {
final MessageIdImpl targetMessageId = messageId == null ?
(MessageIdImpl) MessageId.latest : messageId;
log.info("[{}][{}] Creating subscription {} at message id {} with
properties {}", clientAppId(),
topicName, subscriptionName, targetMessageId, properties);
@@ -2417,14 +2418,13 @@ public class PersistentTopicsBase extends AdminResource
{
protected void internalUpdateSubscriptionProperties(AsyncResponse
asyncResponse, String subName,
Map<String, String>
subscriptionProperties,
boolean authoritative)
{
- CompletableFuture<Void> future;
- if (topicName.isGlobal()) {
- future = validateGlobalNamespaceOwnershipAsync(namespaceName);
- } else {
- future = CompletableFuture.completedFuture(null);
- }
-
- future.thenCompose(__ -> validateTopicOwnershipAsync(topicName,
authoritative)).thenAccept(__ -> {
+ CompletableFuture<Void> future =
validateTopicOperationAsync(topicName, TopicOperation.SUBSCRIBE, subName);
+ future.thenCompose(__ -> {
+ if (topicName.isGlobal()) {
+ return validateGlobalNamespaceOwnershipAsync(namespaceName);
+ }
+ return CompletableFuture.completedFuture(null);
+ }).thenCompose(__ -> validateTopicOwnershipAsync(topicName,
authoritative)).thenAccept(__ -> {
if (topicName.isPartitioned()) {
internalUpdateSubscriptionPropertiesForNonPartitionedTopic(asyncResponse,
subName,
subscriptionProperties, authoritative);
@@ -2496,14 +2496,13 @@ public class PersistentTopicsBase extends AdminResource
{
protected void internalAnalyzeSubscriptionBacklog(AsyncResponse
asyncResponse, String subName,
Optional<Position>
position,
boolean authoritative) {
- CompletableFuture<Void> future;
- if (topicName.isGlobal()) {
- future = validateGlobalNamespaceOwnershipAsync(namespaceName);
- } else {
- future = CompletableFuture.completedFuture(null);
- }
-
- future.thenCompose(__ -> validateTopicOwnershipAsync(topicName,
authoritative))
+ CompletableFuture<Void> future =
validateTopicOperationAsync(topicName, TopicOperation.CONSUME, subName);
+ future.thenCompose(__ -> {
+ if (topicName.isGlobal()) {
+ return validateGlobalNamespaceOwnershipAsync(namespaceName);
+ }
+ return CompletableFuture.completedFuture(null);
+ }).thenCompose(__ -> validateTopicOwnershipAsync(topicName,
authoritative))
.thenCompose(__ -> {
if (topicName.isPartitioned()) {
return CompletableFuture.completedFuture(null);
@@ -2535,14 +2534,13 @@ public class PersistentTopicsBase extends AdminResource
{
protected void internalGetSubscriptionProperties(AsyncResponse
asyncResponse, String subName,
boolean authoritative)
{
- CompletableFuture<Void> future;
- if (topicName.isGlobal()) {
- future = validateGlobalNamespaceOwnershipAsync(namespaceName);
- } else {
- future = CompletableFuture.completedFuture(null);
- }
-
- future.thenCompose(__ -> validateTopicOwnershipAsync(topicName,
authoritative)).thenAccept(__ -> {
+ CompletableFuture<Void> future =
validateTopicOperationAsync(topicName, TopicOperation.CONSUME, subName);
+ future.thenCompose(__ -> {
+ if (topicName.isGlobal()) {
+ return validateGlobalNamespaceOwnershipAsync(namespaceName);
+ }
+ return CompletableFuture.completedFuture(null);
+ }).thenCompose(__ -> validateTopicOwnershipAsync(topicName,
authoritative)).thenAccept(__ -> {
if (topicName.isPartitioned()) {
internalGetSubscriptionPropertiesForNonPartitionedTopic(asyncResponse, subName,
authoritative);
@@ -4181,13 +4179,14 @@ public class PersistentTopicsBase extends AdminResource
{
protected void internalTriggerCompaction(AsyncResponse asyncResponse,
boolean authoritative) {
log.info("[{}] Trigger compaction on topic {}", clientAppId(),
topicName);
- CompletableFuture<Void> future;
- if (topicName.isGlobal()) {
- future = validateGlobalNamespaceOwnershipAsync(namespaceName);
- } else {
- future = CompletableFuture.completedFuture(null);
- }
- future.thenAccept(__ -> {
+ CompletableFuture<Void> future =
validateTopicOperationAsync(topicName, TopicOperation.COMPACT);
+ future.thenCompose(__ -> {
+ if (topicName.isGlobal()) {
+ return validateGlobalNamespaceOwnershipAsync(namespaceName);
+ } else {
+ return CompletableFuture.completedFuture(null);
+ }
+ }).thenAccept(__ -> {
// If the topic name is a partition name, no need to get partition
topic metadata again
if (topicName.isPartitioned()) {
internalTriggerCompactionNonPartitionedTopic(asyncResponse,
authoritative);
@@ -4667,11 +4666,12 @@ public class PersistentTopicsBase extends AdminResource
{
"Trim on a non-persistent topic is not allowed"));
return null;
}
+ CompletableFuture<Void> future =
validateTopicOperationAsync(topicName, TopicOperation.TRIM_TOPIC);
if (topicName.isPartitioned()) {
- return validateTopicOperationAsync(topicName,
TopicOperation.TRIM_TOPIC).thenCompose((x)
+ return future.thenCompose((x)
-> trimNonPartitionedTopic(asyncResponse, topicName,
authoritative));
}
- return validateTopicOperationAsync(topicName,
TopicOperation.TRIM_TOPIC)
+ return future
.thenCompose(__ ->
pulsar().getBrokerService().fetchPartitionedTopicMetadataAsync(topicName))
.thenCompose(metadata -> {
if (metadata.partitions > 0) {
@@ -5353,12 +5353,12 @@ public class PersistentTopicsBase extends AdminResource
{
}
protected CompletableFuture<SchemaCompatibilityStrategy>
internalGetSchemaCompatibilityStrategy(boolean applied) {
+ CompletableFuture<Void> future =
validateTopicPolicyOperationAsync(topicName,
+ PolicyName.SCHEMA_COMPATIBILITY_STRATEGY,
PolicyOperation.READ);
if (applied) {
- return getSchemaCompatibilityStrategyAsync();
+ return future.thenCompose(__ ->
getSchemaCompatibilityStrategyAsync());
}
- return validateTopicPolicyOperationAsync(topicName,
- PolicyName.SCHEMA_COMPATIBILITY_STRATEGY,
- PolicyOperation.READ)
+ return future
.thenCompose(n ->
getTopicPoliciesAsyncWithRetry(topicName).thenApply(op -> {
if (!op.isPresent()) {
return null;
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/NamespaceAuthZTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/NamespaceAuthZTest.java
new file mode 100644
index 00000000000..ce0b925614c
--- /dev/null
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/NamespaceAuthZTest.java
@@ -0,0 +1,163 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.pulsar.broker.admin;
+
+import io.jsonwebtoken.Jwts;
+import lombok.Cleanup;
+import lombok.SneakyThrows;
+import org.apache.pulsar.client.admin.PulsarAdmin;
+import org.apache.pulsar.client.admin.PulsarAdminException;
+import org.apache.pulsar.client.impl.auth.AuthenticationToken;
+import org.apache.pulsar.common.policies.data.AuthAction;
+import org.apache.pulsar.common.policies.data.TenantInfo;
+import org.apache.pulsar.security.MockedPulsarStandalone;
+import org.testng.Assert;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+
+@Test(groups = "broker-admin")
+public class NamespaceAuthZTest extends MockedPulsarStandalone {
+
+ private PulsarAdmin superUserAdmin;
+
+ private PulsarAdmin tenantManagerAdmin;
+
+ private static final String TENANT_ADMIN_SUBJECT =
UUID.randomUUID().toString();
+ private static final String TENANT_ADMIN_TOKEN = Jwts.builder()
+ .claim("sub", TENANT_ADMIN_SUBJECT).signWith(SECRET_KEY).compact();
+
+ @SneakyThrows
+ @BeforeClass
+ public void before() {
+ configureTokenAuthentication();
+ configureDefaultAuthorization();
+ start();
+ this.superUserAdmin =PulsarAdmin.builder()
+ .serviceHttpUrl(getPulsarService().getWebServiceAddress())
+ .authentication(new AuthenticationToken(SUPER_USER_TOKEN))
+ .build();
+ final TenantInfo tenantInfo =
superUserAdmin.tenants().getTenantInfo("public");
+ tenantInfo.getAdminRoles().add(TENANT_ADMIN_SUBJECT);
+ superUserAdmin.tenants().updateTenant("public", tenantInfo);
+ this.tenantManagerAdmin = PulsarAdmin.builder()
+ .serviceHttpUrl(getPulsarService().getWebServiceAddress())
+ .authentication(new AuthenticationToken(TENANT_ADMIN_TOKEN))
+ .build();
+ }
+
+
+ @SneakyThrows
+ @AfterClass
+ public void after() {
+ if (superUserAdmin != null) {
+ superUserAdmin.close();
+ }
+ if (tenantManagerAdmin != null) {
+ tenantManagerAdmin.close();
+ }
+ close();
+ }
+
+
+ @SneakyThrows
+ @Test
+ public void testProperties() {
+ final String random = UUID.randomUUID().toString();
+ final String namespace = "public/default";
+ final String topic = "persistent://public/default/" + random;
+ final String subject = UUID.randomUUID().toString();
+ final String token = Jwts.builder()
+ .claim("sub", subject).signWith(SECRET_KEY).compact();
+ superUserAdmin.topics().createNonPartitionedTopic(topic);
+
+ @Cleanup
+ final PulsarAdmin subAdmin = PulsarAdmin.builder()
+ .serviceHttpUrl(getPulsarService().getWebServiceAddress())
+ .authentication(new AuthenticationToken(token))
+ .build();
+ // test superuser
+ Map<String, String> properties = new HashMap<>();
+ properties.put("key1", "value1");
+ superUserAdmin.namespaces().setProperties(namespace, properties);
+ superUserAdmin.namespaces().setProperty(namespace, "key2", "value2");
+ superUserAdmin.namespaces().getProperties(namespace);
+ superUserAdmin.namespaces().getProperty(namespace, "key2");
+ superUserAdmin.namespaces().removeProperty(namespace, "key2");
+ superUserAdmin.namespaces().clearProperties(namespace);
+
+ // test tenant manager
+ tenantManagerAdmin.namespaces().setProperties(namespace, properties);
+ tenantManagerAdmin.namespaces().setProperty(namespace, "key2",
"value2");
+ tenantManagerAdmin.namespaces().getProperties(namespace);
+ tenantManagerAdmin.namespaces().getProperty(namespace, "key2");
+ tenantManagerAdmin.namespaces().removeProperty(namespace, "key2");
+ tenantManagerAdmin.namespaces().clearProperties(namespace);
+
+ // test nobody
+ Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class,
+ () -> subAdmin.namespaces().setProperties(namespace,
properties));
+
+ Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class,
+ () -> subAdmin.namespaces().setProperty(namespace, "key2",
"value2"));
+
+ Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class,
+ () -> subAdmin.namespaces().getProperties(namespace));
+
+ Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class,
+ () -> subAdmin.namespaces().getProperty(namespace, "key2"));
+
+
+ Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class,
+ () -> subAdmin.namespaces().removeProperty(namespace, "key2"));
+
+ Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class,
+ () -> subAdmin.namespaces().clearProperties(namespace));
+
+ for (AuthAction action : AuthAction.values()) {
+ superUserAdmin.namespaces().grantPermissionOnNamespace(namespace,
subject, Set.of(action));
+
Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class,
+ () -> subAdmin.namespaces().setProperties(namespace,
properties));
+
+
Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class,
+ () -> subAdmin.namespaces().setProperty(namespace, "key2",
"value2"));
+
+
Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class,
+ () -> subAdmin.namespaces().getProperties(namespace));
+
+
Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class,
+ () -> subAdmin.namespaces().getProperty(namespace,
"key2"));
+
+
+
Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class,
+ () -> subAdmin.namespaces().removeProperty(namespace,
"key2"));
+
+
Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class,
+ () -> subAdmin.namespaces().clearProperties(namespace));
+
+
superUserAdmin.namespaces().revokePermissionsOnNamespace(namespace, subject);
+ }
+ superUserAdmin.topics().delete(topic, true);
+ }
+}
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicAuthZTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicAuthZTest.java
new file mode 100644
index 00000000000..e23f9bbaf9b
--- /dev/null
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicAuthZTest.java
@@ -0,0 +1,345 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.pulsar.broker.admin;
+
+import io.jsonwebtoken.Jwts;
+import lombok.Cleanup;
+import lombok.SneakyThrows;
+import org.apache.pulsar.client.admin.PulsarAdmin;
+import org.apache.pulsar.client.admin.PulsarAdminException;
+import org.apache.pulsar.client.api.MessageId;
+import org.apache.pulsar.client.impl.auth.AuthenticationToken;
+import org.apache.pulsar.common.naming.TopicName;
+import org.apache.pulsar.common.policies.data.AuthAction;
+import org.apache.pulsar.common.policies.data.TenantInfo;
+import org.apache.pulsar.security.MockedPulsarStandalone;
+import org.testng.Assert;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.atomic.AtomicInteger;
+
+@Test(groups = "broker-admin")
+public class TopicAuthZTest extends MockedPulsarStandalone {
+
+ private PulsarAdmin superUserAdmin;
+
+ private PulsarAdmin tenantManagerAdmin;
+
+ private static final String TENANT_ADMIN_SUBJECT =
UUID.randomUUID().toString();
+ private static final String TENANT_ADMIN_TOKEN = Jwts.builder()
+ .claim("sub", TENANT_ADMIN_SUBJECT).signWith(SECRET_KEY).compact();
+
+ @SneakyThrows
+ @BeforeClass
+ public void before() {
+ configureTokenAuthentication();
+ configureDefaultAuthorization();
+ start();
+ this.superUserAdmin =PulsarAdmin.builder()
+ .serviceHttpUrl(getPulsarService().getWebServiceAddress())
+ .authentication(new AuthenticationToken(SUPER_USER_TOKEN))
+ .build();
+ final TenantInfo tenantInfo =
superUserAdmin.tenants().getTenantInfo("public");
+ tenantInfo.getAdminRoles().add(TENANT_ADMIN_SUBJECT);
+ superUserAdmin.tenants().updateTenant("public", tenantInfo);
+ this.tenantManagerAdmin = PulsarAdmin.builder()
+ .serviceHttpUrl(getPulsarService().getWebServiceAddress())
+ .authentication(new AuthenticationToken(TENANT_ADMIN_TOKEN))
+ .build();
+ }
+
+
+ @SneakyThrows
+ @AfterClass
+ public void after() {
+ if (superUserAdmin != null) {
+ superUserAdmin.close();
+ }
+ if (tenantManagerAdmin != null) {
+ tenantManagerAdmin.close();
+ }
+ close();
+ }
+
+
+ @SneakyThrows
+ @Test
+ public void testUnloadAndCompactAndTrim() {
+ final String random = UUID.randomUUID().toString();
+ final String topic = "persistent://public/default/" + random;
+ final String subject = UUID.randomUUID().toString();
+ final String token = Jwts.builder()
+ .claim("sub", subject).signWith(SECRET_KEY).compact();
+ superUserAdmin.topics().createPartitionedTopic(topic, 2);
+
+ @Cleanup
+ final PulsarAdmin subAdmin = PulsarAdmin.builder()
+ .serviceHttpUrl(getPulsarService().getWebServiceAddress())
+ .authentication(new AuthenticationToken(token))
+ .build();
+ // test superuser
+ superUserAdmin.topics().unload(topic);
+ superUserAdmin.topics().triggerCompaction(topic);
+
superUserAdmin.topics().trimTopic(TopicName.get(topic).getPartition(0).getLocalName());
+ superUserAdmin.topicPolicies().getSchemaCompatibilityStrategy(topic,
false);
+
+ // test tenant manager
+ tenantManagerAdmin.topics().unload(topic);
+ tenantManagerAdmin.topics().triggerCompaction(topic);
+
tenantManagerAdmin.topics().trimTopic(TopicName.get(topic).getPartition(0).getLocalName());
+
tenantManagerAdmin.topicPolicies().getSchemaCompatibilityStrategy(topic, false);
+
+ // test nobody
+ Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class,
+ () -> subAdmin.topics().unload(topic));
+
+ Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class,
+ () -> subAdmin.topics().triggerCompaction(topic));
+
+ Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class,
+ () ->
subAdmin.topics().trimTopic(TopicName.get(topic).getPartition(0).getLocalName()));
+
+ Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class,
+ () ->
subAdmin.topicPolicies().getSchemaCompatibilityStrategy(topic, false));
+
+ // Test only super/admin can do the operation, other auth are not
permitted.
+ for (AuthAction action : AuthAction.values()) {
+ superUserAdmin.topics().grantPermission(topic, subject,
Set.of(action));
+
+
Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class,
+ () -> subAdmin.topics().unload(topic));
+
+
Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class,
+ () -> subAdmin.topics().triggerCompaction(topic));
+
+
Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class,
+ () -> subAdmin.topics().trimTopic(topic));
+
+
Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class,
+ () ->
subAdmin.topicPolicies().getSchemaCompatibilityStrategy(topic, false));
+
+ superUserAdmin.topics().revokePermissions(topic, subject);
+ }
+ superUserAdmin.topics().deletePartitionedTopic(topic, true);
+ }
+
+ @Test
+ @SneakyThrows
+ public void testGetManagedLedgerInfo() {
+ final String random = UUID.randomUUID().toString();
+ final String topic = "persistent://public/default/" + random;
+ final String subject = UUID.randomUUID().toString();
+ final String token = Jwts.builder()
+ .claim("sub", subject).signWith(SECRET_KEY).compact();
+ superUserAdmin.topics().createPartitionedTopic(topic, 2);
+
+ @Cleanup
+ final PulsarAdmin subAdmin = PulsarAdmin.builder()
+ .serviceHttpUrl(getPulsarService().getWebServiceAddress())
+ .authentication(new AuthenticationToken(token))
+ .build();
+ // test superuser
+ superUserAdmin.topics().getInternalInfo(topic);
+
+ // test tenant manager
+ tenantManagerAdmin.topics().getInternalInfo(topic);
+
+ // test nobody
+ Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class,
+ () -> subAdmin.topics().getInternalInfo(topic));
+
+ for (AuthAction action : AuthAction.values()) {
+ superUserAdmin.topics().grantPermission(topic, subject,
Set.of(action));
+ if (action == AuthAction.produce || action == AuthAction.consume) {
+ subAdmin.topics().getInternalInfo(topic);
+ } else {
+
Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class,
+ () -> subAdmin.topics().getInternalInfo(topic));
+ }
+ superUserAdmin.topics().revokePermissions(topic, subject);
+ }
+ superUserAdmin.topics().deletePartitionedTopic(topic, true);
+ }
+
+ @Test
+ @SneakyThrows
+ public void testGetPartitionedStatsAndInternalStats() {
+ final String random = UUID.randomUUID().toString();
+ final String topic = "persistent://public/default/" + random;
+ final String subject = UUID.randomUUID().toString();
+ final String token = Jwts.builder()
+ .claim("sub", subject).signWith(SECRET_KEY).compact();
+ superUserAdmin.topics().createPartitionedTopic(topic, 2);
+
+ @Cleanup
+ final PulsarAdmin subAdmin = PulsarAdmin.builder()
+ .serviceHttpUrl(getPulsarService().getWebServiceAddress())
+ .authentication(new AuthenticationToken(token))
+ .build();
+ // test superuser
+ superUserAdmin.topics().getPartitionedStats(topic, false);
+ superUserAdmin.topics().getPartitionedInternalStats(topic);
+
+ // test tenant manager
+ tenantManagerAdmin.topics().getPartitionedStats(topic, false);
+ tenantManagerAdmin.topics().getPartitionedInternalStats(topic);
+
+ // test nobody
+ Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class,
+ () -> subAdmin.topics().getPartitionedStats(topic, false));
+
+ Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class,
+ () -> subAdmin.topics().getPartitionedInternalStats(topic));
+
+ for (AuthAction action : AuthAction.values()) {
+ superUserAdmin.topics().grantPermission(topic, subject,
Set.of(action));
+ if (action == AuthAction.produce || action == AuthAction.consume) {
+ subAdmin.topics().getPartitionedStats(topic, false);
+ subAdmin.topics().getPartitionedInternalStats(topic);
+ } else {
+
Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class,
+ () -> subAdmin.topics().getPartitionedStats(topic,
false));
+
+
Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class,
+ () ->
subAdmin.topics().getPartitionedInternalStats(topic));
+ }
+ superUserAdmin.topics().revokePermissions(topic, subject);
+ }
+ superUserAdmin.topics().deletePartitionedTopic(topic, true);
+ }
+
+ @Test
+ @SneakyThrows
+ public void
testCreateSubscriptionAndUpdateSubscriptionPropertiesAndAnalyzeSubscriptionBacklog()
{
+ final String random = UUID.randomUUID().toString();
+ final String topic = "persistent://public/default/" + random;
+ final String subject = UUID.randomUUID().toString();
+ final String token = Jwts.builder()
+ .claim("sub", subject).signWith(SECRET_KEY).compact();
+ superUserAdmin.topics().createPartitionedTopic(topic, 2);
+ AtomicInteger suffix = new AtomicInteger(1);
+ @Cleanup
+ final PulsarAdmin subAdmin = PulsarAdmin.builder()
+ .serviceHttpUrl(getPulsarService().getWebServiceAddress())
+ .authentication(new AuthenticationToken(token))
+ .build();
+ //
+ superUserAdmin.topics().createSubscription(topic, "test-sub" +
suffix.incrementAndGet(), MessageId.earliest);
+
+ // test tenant manager
+ tenantManagerAdmin.topics().createSubscription(topic, "test-sub" +
suffix.incrementAndGet(), MessageId.earliest);
+
+ // test nobody
+ Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class,
+ () -> subAdmin.topics().createSubscription(topic, "test-sub" +
suffix.incrementAndGet(), MessageId.earliest));
+
+ for (AuthAction action : AuthAction.values()) {
+ superUserAdmin.topics().grantPermission(topic, subject,
Set.of(action));
+ if (action == AuthAction.consume) {
+ subAdmin.topics().createSubscription(topic, "test-sub" +
suffix.incrementAndGet(), MessageId.earliest);
+ } else {
+
Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class,
+ () -> subAdmin.topics().createSubscription(topic,
"test-sub" + suffix.incrementAndGet(), MessageId.earliest));
+ }
+ superUserAdmin.topics().revokePermissions(topic, subject);
+ }
+ // test UpdateSubscriptionProperties
+ Map<String, String> properties = new HashMap<>();
+ superUserAdmin.topics().createSubscription(topic, "test-sub",
MessageId.earliest);
+ // test superuser
+ superUserAdmin.topics().updateSubscriptionProperties(topic, "test-sub"
, properties);
+ superUserAdmin.topics().getSubscriptionProperties(topic, "test-sub");
+
superUserAdmin.topics().analyzeSubscriptionBacklog(TopicName.get(topic).getPartition(0).getLocalName(),
"test-sub", Optional.empty());
+
+ // test tenant manager
+ tenantManagerAdmin.topics().updateSubscriptionProperties(topic,
"test-sub" , properties);
+ tenantManagerAdmin.topics().getSubscriptionProperties(topic,
"test-sub");
+
tenantManagerAdmin.topics().analyzeSubscriptionBacklog(TopicName.get(topic).getPartition(0).getLocalName(),
"test-sub", Optional.empty());
+
+ // test nobody
+ Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class,
+ () -> subAdmin.topics().updateSubscriptionProperties(topic,
"test-sub", properties));
+
+ Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class,
+ () -> subAdmin.topics().getSubscriptionProperties(topic,
"test-sub"));
+
+ Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class,
+ () ->
subAdmin.topics().analyzeSubscriptionBacklog(TopicName.get(topic).getPartition(0).getLocalName(),
"test-sub", Optional.empty()));
+
+ for (AuthAction action : AuthAction.values()) {
+ superUserAdmin.topics().grantPermission(topic, subject,
Set.of(action));
+ if (action == AuthAction.consume) {
+ subAdmin.topics().updateSubscriptionProperties(topic,
"test-sub", properties);
+ subAdmin.topics().getSubscriptionProperties(topic, "test-sub");
+
subAdmin.topics().analyzeSubscriptionBacklog(TopicName.get(topic).getPartition(0).getLocalName(),
"test-sub", Optional.empty());
+ } else {
+
Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class,
+ () ->
subAdmin.topics().updateSubscriptionProperties(topic, "test-sub", properties));
+
+
Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class,
+ () ->
subAdmin.topics().getSubscriptionProperties(topic, "test-sub"));
+
+
Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class,
+ () ->
subAdmin.topics().analyzeSubscriptionBacklog(TopicName.get(topic).getPartition(0).getLocalName(),
"test-sub", Optional.empty()));
+ }
+ superUserAdmin.topics().revokePermissions(topic, subject);
+ }
+ superUserAdmin.topics().deletePartitionedTopic(topic, true);
+ }
+
+ @Test
+ @SneakyThrows
+ public void testCreateMissingPartition() {
+ final String random = UUID.randomUUID().toString();
+ final String topic = "persistent://public/default/" + random;
+ final String subject = UUID.randomUUID().toString();
+ final String token = Jwts.builder()
+ .claim("sub", subject).signWith(SECRET_KEY).compact();
+ superUserAdmin.topics().createPartitionedTopic(topic, 2);
+ AtomicInteger suffix = new AtomicInteger(1);
+ @Cleanup
+ final PulsarAdmin subAdmin = PulsarAdmin.builder()
+ .serviceHttpUrl(getPulsarService().getWebServiceAddress())
+ .authentication(new AuthenticationToken(token))
+ .build();
+ //
+ superUserAdmin.topics().createMissedPartitions(topic);
+
+ // test tenant manager
+ tenantManagerAdmin.topics().createMissedPartitions(topic);
+
+ Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class,
+ () -> subAdmin.topics().createMissedPartitions(topic));
+
+ for (AuthAction action : AuthAction.values()) {
+ superUserAdmin.topics().grantPermission(topic, subject,
Set.of(action));
+
Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class,
+ () -> subAdmin.topics().createMissedPartitions(topic));
+ superUserAdmin.topics().revokePermissions(topic, subject);
+ }
+ superUserAdmin.topics().deletePartitionedTopic(topic, true);
+ }
+}