This is an automated email from the ASF dual-hosted git repository.
lhotari pushed a commit to branch branch-2.11
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-2.11 by this push:
new 5c4f4cb6416 [improve][broker] Add fine-grain authorization to ns/topic
management endpoints (#22305)
5c4f4cb6416 is described below
commit 5c4f4cb64167c0ea1a5775f6d991b386ad95c786
Author: Jiwei Guo <[email protected]>
AuthorDate: Wed Mar 20 13:49:54 2024 +0800
[improve][broker] Add fine-grain authorization to ns/topic management
endpoints (#22305)
(cherry picked from commit fd34d4ab9c5aa7e0dca961d5a8badae4613fbe8e)
# Conflicts:
#
pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
(cherry picked from commit 99eb49a68982271562597d4c4cea127132bc0b35)
# Conflicts:
#
pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/PulsarAuthorizationProvider.java
#
pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
---
.../apache/pulsar/broker/admin/AdminResource.java | 7 +-
.../pulsar/broker/admin/impl/NamespacesBase.java | 166 ++++++------
.../broker/admin/impl/PersistentTopicsBase.java | 167 ++++++------
.../apache/pulsar/broker/admin/BaseAuthZTest.java | 107 ++++++++
.../pulsar/broker/admin/NamespaceAuthZTest.java | 117 +++++++++
.../apache/pulsar/broker/admin/TopicAuthZTest.java | 292 +++++++++++++++++++++
.../broker/admin/TopicPoliciesAuthZTest.java | 87 +-----
7 files changed, 688 insertions(+), 255 deletions(-)
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java
index 5eddb4cecf2..07459b9190a 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java
@@ -54,8 +54,6 @@ import org.apache.pulsar.common.policies.data.BundlesData;
import org.apache.pulsar.common.policies.data.NamespaceOperation;
import org.apache.pulsar.common.policies.data.PersistencePolicies;
import org.apache.pulsar.common.policies.data.Policies;
-import org.apache.pulsar.common.policies.data.PolicyName;
-import org.apache.pulsar.common.policies.data.PolicyOperation;
import org.apache.pulsar.common.policies.data.RetentionPolicies;
import org.apache.pulsar.common.policies.data.SchemaCompatibilityStrategy;
import org.apache.pulsar.common.policies.data.SubscribeRate;
@@ -772,10 +770,7 @@ public abstract class AdminResource extends
PulsarWebResource {
}
protected CompletableFuture<SchemaCompatibilityStrategy>
getSchemaCompatibilityStrategyAsync() {
- return validateTopicPolicyOperationAsync(topicName,
- PolicyName.SCHEMA_COMPATIBILITY_STRATEGY,
- PolicyOperation.READ)
- .thenCompose((__) ->
getSchemaCompatibilityStrategyAsyncWithoutAuth()).whenComplete((__, ex) -> {
+ return
getSchemaCompatibilityStrategyAsyncWithoutAuth().whenComplete((__, ex) -> {
if (ex != null) {
log.error("[{}] Failed to get schema compatibility
strategy of topic {} {}",
clientAppId(), topicName, ex);
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
index 0a602a631c2..fc111cd2370 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
@@ -2639,102 +2639,110 @@ public abstract class NamespacesBase extends
AdminResource {
}
protected void internalSetProperty(String key, String value, AsyncResponse
asyncResponse) {
- validatePoliciesReadOnlyAccess();
- updatePoliciesAsync(namespaceName, policies -> {
- policies.properties.put(key, value);
- return policies;
- }).thenAccept(v -> {
- log.info("[{}] Successfully set property for key {} on namespace
{}", clientAppId(), key,
- namespaceName);
- asyncResponse.resume(Response.noContent().build());
- }).exceptionally(ex -> {
- Throwable cause = ex.getCause();
- log.error("[{}] Failed to set property for key {} on namespace {}",
clientAppId(), key,
- namespaceName, cause);
- asyncResponse.resume(cause);
- return null;
- });
+ validateAdminAccessForTenantAsync(namespaceName.getTenant())
+ .thenCompose(__ -> validatePoliciesReadOnlyAccessAsync())
+ .thenCompose(__ -> updatePoliciesAsync(namespaceName, policies
-> {
+ policies.properties.put(key, value);
+ return policies;
+ }))
+ .thenAccept(v -> {
+ log.info("[{}] Successfully set property for key {} on
namespace {}", clientAppId(), key,
+ namespaceName);
+ asyncResponse.resume(Response.noContent().build());
+ }).exceptionally(ex -> {
+ Throwable cause = ex.getCause();
+ log.error("[{}] Failed to set property for key {} on
namespace {}", clientAppId(), key,
+ namespaceName, cause);
+ asyncResponse.resume(cause);
+ return null;
+ });
}
protected void internalSetProperties(Map<String, String> properties,
AsyncResponse asyncResponse) {
- validatePoliciesReadOnlyAccess();
- updatePoliciesAsync(namespaceName, policies -> {
- policies.properties.putAll(properties);
- return policies;
- }).thenAccept(v -> {
- log.info("[{}] Successfully set {} properties on namespace {}",
clientAppId(), properties.size(),
- namespaceName);
- asyncResponse.resume(Response.noContent().build());
- }).exceptionally(ex -> {
- Throwable cause = ex.getCause();
- log.error("[{}] Failed to set {} properties on namespace {}",
clientAppId(), properties.size(),
- namespaceName, cause);
- asyncResponse.resume(cause);
- return null;
- });
+ validateAdminAccessForTenantAsync(namespaceName.getTenant())
+ .thenCompose(__ -> validatePoliciesReadOnlyAccessAsync())
+ .thenCompose(__ -> updatePoliciesAsync(namespaceName, policies
-> {
+ policies.properties.putAll(properties);
+ return policies;
+ }))
+ .thenAccept(v -> {
+ log.info("[{}] Successfully set {} properties on namespace
{}", clientAppId(), properties.size(),
+ namespaceName);
+ asyncResponse.resume(Response.noContent().build());
+ }).exceptionally(ex -> {
+ Throwable cause = ex.getCause();
+ log.error("[{}] Failed to set {} properties on namespace
{}", clientAppId(), properties.size(),
+ namespaceName, cause);
+ asyncResponse.resume(cause);
+ return null;
+ });
}
protected void internalGetProperty(String key, AsyncResponse asyncResponse)
{
- getNamespacePoliciesAsync(namespaceName).thenAccept(policies -> {
- asyncResponse.resume(policies.properties.get(key));
- }).exceptionally(ex -> {
- Throwable cause = ex.getCause();
- log.error("[{}] Failed to get property for key {} of namespace
{}", clientAppId(), key,
- namespaceName, cause);
- asyncResponse.resume(cause);
- return null;
- });
+ validateAdminAccessForTenantAsync(namespaceName.getTenant())
+ .thenCompose(__ -> getNamespacePoliciesAsync(namespaceName))
+ .thenAccept(policies ->
asyncResponse.resume(policies.properties.get(key)))
+ .exceptionally(ex -> {
+ Throwable cause = ex.getCause();
+ log.error("[{}] Failed to get property for key {} of
namespace {}", clientAppId(), key,
+ namespaceName, cause);
+ asyncResponse.resume(cause);
+ return null;
+ });
}
protected void internalGetProperties(AsyncResponse asyncResponse) {
- getNamespacePoliciesAsync(namespaceName).thenAccept(policies -> {
- asyncResponse.resume(policies.properties);
- }).exceptionally(ex -> {
- Throwable cause = ex.getCause();
- log.error("[{}] Failed to get properties of namespace {}",
clientAppId(), namespaceName, cause);
- asyncResponse.resume(cause);
- return null;
- });
+ validateAdminAccessForTenantAsync(namespaceName.getTenant())
+ .thenCompose(__ -> getNamespacePoliciesAsync(namespaceName))
+ .thenAccept(policies ->
asyncResponse.resume(policies.properties))
+ .exceptionally(ex -> {
+ Throwable cause = ex.getCause();
+ log.error("[{}] Failed to get properties of namespace {}",
clientAppId(), namespaceName, cause);
+ asyncResponse.resume(cause);
+ return null;
+ });
}
protected void internalRemoveProperty(String key, AsyncResponse
asyncResponse) {
- validatePoliciesReadOnlyAccess();
-
AtomicReference<String> oldVal = new AtomicReference<>(null);
- updatePoliciesAsync(namespaceName, policies -> {
- oldVal.set(policies.properties.remove(key));
- return policies;
- }).thenAccept(v -> {
- asyncResponse.resume(oldVal.get());
- log.info("[{}] Successfully remove property for key {} on namespace
{}", clientAppId(), key,
- namespaceName);
- }).exceptionally(ex -> {
- Throwable cause = ex.getCause();
- log.error("[{}] Failed to remove property for key {} on namespace
{}", clientAppId(), key,
- namespaceName, cause);
- asyncResponse.resume(cause);
- return null;
- });
+ validateAdminAccessForTenantAsync(namespaceName.getTenant())
+ .thenCompose(__ -> validatePoliciesReadOnlyAccessAsync())
+ .thenCompose(__ -> updatePoliciesAsync(namespaceName, policies
-> {
+ oldVal.set(policies.properties.remove(key));
+ return policies;
+ })).thenAccept(v -> {
+ asyncResponse.resume(oldVal.get());
+ log.info("[{}] Successfully remove property for key {} on
namespace {}", clientAppId(), key,
+ namespaceName);
+ }).exceptionally(ex -> {
+ Throwable cause = ex.getCause();
+ log.error("[{}] Failed to remove property for key {} on
namespace {}", clientAppId(), key,
+ namespaceName, cause);
+ asyncResponse.resume(cause);
+ return null;
+ });
}
protected void internalClearProperties(AsyncResponse asyncResponse) {
- validatePoliciesReadOnlyAccess();
AtomicReference<Integer> clearedCount = new AtomicReference<>(0);
- updatePoliciesAsync(namespaceName, policies -> {
- clearedCount.set(policies.properties.size());
- policies.properties.clear();
- return policies;
- }).thenAccept(v -> {
- asyncResponse.resume(Response.noContent().build());
- log.info("[{}] Successfully clear {} properties on namespace {}",
clientAppId(), clearedCount.get(),
- namespaceName);
- }).exceptionally(ex -> {
- Throwable cause = ex.getCause();
- log.error("[{}] Failed to clear {} properties on namespace {}",
clientAppId(), clearedCount.get(),
- namespaceName, cause);
- asyncResponse.resume(cause);
- return null;
- });
+ validateAdminAccessForTenantAsync(namespaceName.getTenant())
+ .thenCompose(__ -> validatePoliciesReadOnlyAccessAsync())
+ .thenCompose(__ -> updatePoliciesAsync(namespaceName, policies
-> {
+ clearedCount.set(policies.properties.size());
+ policies.properties.clear();
+ return policies;
+ }))
+ .thenAccept(v -> {
+ asyncResponse.resume(Response.noContent().build());
+ log.info("[{}] Successfully clear {} properties on
namespace {}", clientAppId(), clearedCount.get(),
+ namespaceName);
+ }).exceptionally(ex -> {
+ Throwable cause = ex.getCause();
+ log.error("[{}] Failed to clear {} properties on namespace
{}", clientAppId(), clearedCount.get(),
+ namespaceName, cause);
+ asyncResponse.resume(cause);
+ return null;
+ });
}
private CompletableFuture<Void> updatePoliciesAsync(NamespaceName ns,
Function<Policies, Policies> updateFunction) {
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
index 3c1a43a9c69..8f6191938ed 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
@@ -503,7 +503,9 @@ public class PersistentTopicsBase extends AdminResource {
protected void internalCreateMissedPartitions(AsyncResponse asyncResponse)
{
getPartitionedTopicMetadataAsync(topicName, false,
false).thenAccept(metadata -> {
if (metadata != null) {
- tryCreatePartitionsAsync(metadata.partitions).thenAccept(v -> {
+ CompletableFuture<Void> future =
validateNamespaceOperationAsync(topicName.getNamespaceObject(),
+ NamespaceOperation.CREATE_TOPIC);
+ future.thenCompose(__ ->
tryCreatePartitionsAsync(metadata.partitions)).thenAccept(v -> {
asyncResponse.resume(Response.noContent().build());
}).exceptionally(e -> {
log.error("[{}] Failed to create partitions for topic {}",
clientAppId(), topicName);
@@ -741,13 +743,13 @@ public class PersistentTopicsBase extends AdminResource {
protected void internalUnloadTopic(AsyncResponse asyncResponse, boolean
authoritative) {
log.info("[{}] Unloading topic {}", clientAppId(), topicName);
- CompletableFuture<Void> future;
- if (topicName.isGlobal()) {
- future = validateGlobalNamespaceOwnershipAsync(namespaceName);
- } else {
- future = CompletableFuture.completedFuture(null);
- }
- future.thenAccept(__ -> {
+ CompletableFuture<Void> future =
validateTopicOperationAsync(topicName, TopicOperation.UNLOAD);
+ future.thenCompose(__ -> {
+ if (topicName.isGlobal()) {
+ return validateGlobalNamespaceOwnershipAsync(namespaceName);
+ }
+ return CompletableFuture.completedFuture(null);
+ }).thenAccept(__ -> {
// If the topic name is a partition name, no need to get partition
topic metadata again
if (topicName.isPartitioned()) {
if (isTransactionCoordinatorAssign(topicName)) {
@@ -965,13 +967,12 @@ public class PersistentTopicsBase extends AdminResource {
private void internalUnloadNonPartitionedTopicAsync(AsyncResponse
asyncResponse, boolean authoritative) {
validateTopicOwnershipAsync(topicName, authoritative)
- .thenCompose(unused -> validateTopicOperationAsync(topicName,
TopicOperation.UNLOAD)
.thenCompose(__ -> getTopicReferenceAsync(topicName))
.thenCompose(topic -> topic.close(false))
.thenRun(() -> {
log.info("[{}] Successfully unloaded topic {}",
clientAppId(), topicName);
asyncResponse.resume(Response.noContent().build());
- }))
+ })
.exceptionally(ex -> {
// If the exception is not redirect exception we need to
log it.
if (!isNot307And404Exception(ex)) {
@@ -984,16 +985,14 @@ public class PersistentTopicsBase extends AdminResource {
private void internalUnloadTransactionCoordinatorAsync(AsyncResponse
asyncResponse, boolean authoritative) {
validateTopicOwnershipAsync(topicName, authoritative)
- .thenCompose(__ -> validateTopicOperationAsync(topicName,
TopicOperation.UNLOAD)
- .thenCompose(v -> pulsar()
- .getTransactionMetadataStoreService()
- .removeTransactionMetadataStore(
-
TransactionCoordinatorID.get(topicName.getPartitionIndex())))
- .thenRun(() -> {
- log.info("[{}] Successfully unloaded tc {}",
clientAppId(),
- topicName.getPartitionIndex());
- asyncResponse.resume(Response.noContent().build());
- }))
+ .thenCompose(v -> pulsar()
+ .getTransactionMetadataStoreService()
+ .removeTransactionMetadataStore(
+
TransactionCoordinatorID.get(topicName.getPartitionIndex())))
+ .thenRun(() -> {
+ log.info("[{}] Successfully unloaded tc {}",
clientAppId(), topicName.getPartitionIndex());
+ asyncResponse.resume(Response.noContent().build());
+ })
.exceptionally(ex -> {
// If the exception is not redirect exception we need to
log it.
if (!isNot307And404Exception(ex)) {
@@ -1204,13 +1203,13 @@ public class PersistentTopicsBase extends AdminResource
{
}
protected void internalGetManagedLedgerInfo(AsyncResponse asyncResponse,
boolean authoritative) {
- CompletableFuture<Void> future;
- if (topicName.isGlobal()) {
- future = validateGlobalNamespaceOwnershipAsync(namespaceName);
- } else {
- future = CompletableFuture.completedFuture(null);
- }
- future.thenAccept(__ -> {
+ CompletableFuture<Void> future =
validateTopicOperationAsync(topicName, TopicOperation.GET_STATS);
+ future.thenCompose(__ -> {
+ if (topicName.isGlobal()) {
+ return validateGlobalNamespaceOwnershipAsync(namespaceName);
+ }
+ return CompletableFuture.completedFuture(null);
+ }).thenAccept(__ -> {
// If the topic name is a partition name, no need to get partition
topic metadata again
if (topicName.isPartitioned()) {
internalGetManagedLedgerInfoForNonPartitionedTopic(asyncResponse);
@@ -1317,13 +1316,13 @@ public class PersistentTopicsBase extends AdminResource
{
protected void internalGetPartitionedStats(AsyncResponse asyncResponse,
boolean authoritative, boolean perPartition,
boolean getPreciseBacklog,
boolean subscriptionBacklogSize,
boolean
getEarliestTimeInBacklog) {
- CompletableFuture<Void> future;
- if (topicName.isGlobal()) {
- future = validateGlobalNamespaceOwnershipAsync(namespaceName);
- } else {
- future = CompletableFuture.completedFuture(null);
- }
- future.thenCompose(__ -> getPartitionedTopicMetadataAsync(topicName,
+ CompletableFuture<Void> future =
validateTopicOperationAsync(topicName, TopicOperation.GET_STATS);
+ future.thenCompose(__ -> {
+ if (topicName.isGlobal()) {
+ return validateGlobalNamespaceOwnershipAsync(namespaceName);
+ }
+ return CompletableFuture.completedFuture(null);
+ }).thenCompose(__ -> getPartitionedTopicMetadataAsync(topicName,
authoritative, false)).thenAccept(partitionMetadata -> {
if (partitionMetadata.partitions == 0) {
asyncResponse.resume(new RestException(Status.NOT_FOUND,
@@ -1403,14 +1402,15 @@ public class PersistentTopicsBase extends AdminResource
{
}
protected void internalGetPartitionedStatsInternal(AsyncResponse
asyncResponse, boolean authoritative) {
- CompletableFuture<Void> future;
- if (topicName.isGlobal()) {
- future = validateGlobalNamespaceOwnershipAsync(namespaceName);
- } else {
- future = CompletableFuture.completedFuture(null);
- }
- future.thenCompose(__ -> getPartitionedTopicMetadataAsync(topicName,
authoritative, false))
- .thenAccept(partitionMetadata -> {
+ CompletableFuture<Void> future =
validateTopicOperationAsync(topicName, TopicOperation.GET_STATS);
+ future.thenCompose(__ -> {
+ if (topicName.isGlobal()) {
+ return validateGlobalNamespaceOwnershipAsync(namespaceName);
+ } else {
+ return CompletableFuture.completedFuture(null);
+ }
+ }).thenCompose(__ -> getPartitionedTopicMetadataAsync(topicName,
authoritative, false))
+ .thenAccept(partitionMetadata -> {
if (partitionMetadata.partitions == 0) {
asyncResponse.resume(new RestException(Status.NOT_FOUND,
getPartitionedTopicNotFoundErrorMessage(topicName.toString())));
@@ -2213,13 +2213,14 @@ public class PersistentTopicsBase extends AdminResource
{
protected void internalCreateSubscription(AsyncResponse asyncResponse,
String subscriptionName,
MessageIdImpl messageId, boolean authoritative, boolean
replicated, Map<String, String> properties) {
- CompletableFuture<Void> ret;
- if (topicName.isGlobal()) {
- ret = validateGlobalNamespaceOwnershipAsync(namespaceName);
- } else {
- ret = CompletableFuture.completedFuture(null);
- }
- ret.thenAccept(__ -> {
+ CompletableFuture<Void> ret = validateTopicOperationAsync(topicName,
TopicOperation.SUBSCRIBE,
+ subscriptionName);
+ ret.thenCompose(__ -> {
+ if (topicName.isGlobal()) {
+ return validateGlobalNamespaceOwnershipAsync(namespaceName);
+ }
+ return CompletableFuture.completedFuture(null);
+ }).thenAccept(__ -> {
final MessageIdImpl targetMessageId = messageId == null ?
(MessageIdImpl) MessageId.latest : messageId;
log.info("[{}][{}] Creating subscription {} at message id {} with
properties {}", clientAppId(),
topicName, subscriptionName, targetMessageId, properties);
@@ -2378,14 +2379,13 @@ public class PersistentTopicsBase extends AdminResource
{
protected void internalUpdateSubscriptionProperties(AsyncResponse
asyncResponse, String subName,
Map<String, String>
subscriptionProperties,
boolean authoritative)
{
- CompletableFuture<Void> future;
- if (topicName.isGlobal()) {
- future = validateGlobalNamespaceOwnershipAsync(namespaceName);
- } else {
- future = CompletableFuture.completedFuture(null);
- }
-
- future.thenCompose(__ -> validateTopicOwnershipAsync(topicName,
authoritative)).thenAccept(__ -> {
+ CompletableFuture<Void> future =
validateTopicOperationAsync(topicName, TopicOperation.SUBSCRIBE, subName);
+ future.thenCompose(__ -> {
+ if (topicName.isGlobal()) {
+ return validateGlobalNamespaceOwnershipAsync(namespaceName);
+ }
+ return CompletableFuture.completedFuture(null);
+ }).thenCompose(__ -> validateTopicOwnershipAsync(topicName,
authoritative)).thenAccept(__ -> {
if (topicName.isPartitioned()) {
internalUpdateSubscriptionPropertiesForNonPartitionedTopic(asyncResponse,
subName,
subscriptionProperties, authoritative);
@@ -2457,14 +2457,13 @@ public class PersistentTopicsBase extends AdminResource
{
protected void internalAnalyzeSubscriptionBacklog(AsyncResponse
asyncResponse, String subName,
Optional<Position>
position,
boolean authoritative) {
- CompletableFuture<Void> future;
- if (topicName.isGlobal()) {
- future = validateGlobalNamespaceOwnershipAsync(namespaceName);
- } else {
- future = CompletableFuture.completedFuture(null);
- }
-
- future.thenCompose(__ -> validateTopicOwnershipAsync(topicName,
authoritative))
+ CompletableFuture<Void> future =
validateTopicOperationAsync(topicName, TopicOperation.CONSUME, subName);
+ future.thenCompose(__ -> {
+ if (topicName.isGlobal()) {
+ return validateGlobalNamespaceOwnershipAsync(namespaceName);
+ }
+ return CompletableFuture.completedFuture(null);
+ }).thenCompose(__ -> validateTopicOwnershipAsync(topicName,
authoritative))
.thenCompose(__ -> {
if (topicName.isPartitioned()) {
return CompletableFuture.completedFuture(null);
@@ -2496,14 +2495,13 @@ public class PersistentTopicsBase extends AdminResource
{
protected void internalGetSubscriptionProperties(AsyncResponse
asyncResponse, String subName,
boolean authoritative)
{
- CompletableFuture<Void> future;
- if (topicName.isGlobal()) {
- future = validateGlobalNamespaceOwnershipAsync(namespaceName);
- } else {
- future = CompletableFuture.completedFuture(null);
- }
-
- future.thenCompose(__ -> validateTopicOwnershipAsync(topicName,
authoritative)).thenAccept(__ -> {
+ CompletableFuture<Void> future =
validateTopicOperationAsync(topicName, TopicOperation.CONSUME, subName);
+ future.thenCompose(__ -> {
+ if (topicName.isGlobal()) {
+ return validateGlobalNamespaceOwnershipAsync(namespaceName);
+ }
+ return CompletableFuture.completedFuture(null);
+ }).thenCompose(__ -> validateTopicOwnershipAsync(topicName,
authoritative)).thenAccept(__ -> {
if (topicName.isPartitioned()) {
internalGetSubscriptionPropertiesForNonPartitionedTopic(asyncResponse, subName,
authoritative);
@@ -4119,13 +4117,14 @@ public class PersistentTopicsBase extends AdminResource
{
protected void internalTriggerCompaction(AsyncResponse asyncResponse,
boolean authoritative) {
log.info("[{}] Trigger compaction on topic {}", clientAppId(),
topicName);
- CompletableFuture<Void> future;
- if (topicName.isGlobal()) {
- future = validateGlobalNamespaceOwnershipAsync(namespaceName);
- } else {
- future = CompletableFuture.completedFuture(null);
- }
- future.thenAccept(__ -> {
+ CompletableFuture<Void> future =
validateTopicOperationAsync(topicName, TopicOperation.COMPACT);
+ future.thenCompose(__ -> {
+ if (topicName.isGlobal()) {
+ return validateGlobalNamespaceOwnershipAsync(namespaceName);
+ } else {
+ return CompletableFuture.completedFuture(null);
+ }
+ }).thenAccept(__ -> {
// If the topic name is a partition name, no need to get partition
topic metadata again
if (topicName.isPartitioned()) {
internalTriggerCompactionNonPartitionedTopic(asyncResponse,
authoritative);
@@ -5344,12 +5343,12 @@ public class PersistentTopicsBase extends AdminResource
{
}
protected CompletableFuture<SchemaCompatibilityStrategy>
internalGetSchemaCompatibilityStrategy(boolean applied) {
+ CompletableFuture<Void> future =
validateTopicPolicyOperationAsync(topicName,
+ PolicyName.SCHEMA_COMPATIBILITY_STRATEGY,
PolicyOperation.READ);
if (applied) {
- return getSchemaCompatibilityStrategyAsync();
+ return future.thenCompose(__ ->
getSchemaCompatibilityStrategyAsync());
}
- return validateTopicPolicyOperationAsync(topicName,
- PolicyName.SCHEMA_COMPATIBILITY_STRATEGY,
- PolicyOperation.READ)
+ return future
.thenCompose(n ->
getTopicPoliciesAsyncWithRetry(topicName).thenApply(op -> {
if (!op.isPresent()) {
return null;
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/BaseAuthZTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/BaseAuthZTest.java
new file mode 100644
index 00000000000..2300a92e2df
--- /dev/null
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/BaseAuthZTest.java
@@ -0,0 +1,107 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.admin;
+
+import io.jsonwebtoken.Jwts;
+import io.jsonwebtoken.SignatureAlgorithm;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Set;
+import java.util.UUID;
+import javax.crypto.SecretKey;
+import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
+import org.apache.pulsar.broker.authentication.AuthenticationProviderToken;
+import org.apache.pulsar.broker.authentication.utils.AuthTokenUtils;
+import org.apache.pulsar.broker.authorization.PulsarAuthorizationProvider;
+import org.apache.pulsar.client.admin.PulsarAdmin;
+import org.apache.pulsar.client.admin.PulsarAdminBuilder;
+import org.apache.pulsar.client.impl.auth.AuthenticationToken;
+import org.apache.pulsar.common.policies.data.TenantInfo;
+import org.apache.pulsar.common.util.ObjectMapperFactory;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+
+public abstract class BaseAuthZTest extends MockedPulsarServiceBaseTest {
+ protected static final SecretKey SECRET_KEY =
AuthTokenUtils.createSecretKey(SignatureAlgorithm.HS256);
+ private static final String TENANT_ADMIN_SUBJECT =
UUID.randomUUID().toString();
+ private static final String TENANT_ADMIN_TOKEN = Jwts.builder()
+ .claim("sub", TENANT_ADMIN_SUBJECT).signWith(SECRET_KEY).compact();
+ private static final String BROKER_INTERNAL_CLIENT_SUBJECT =
"broker_internal";
+ private static final String BROKER_INTERNAL_CLIENT_TOKEN = Jwts.builder()
+ .claim("sub",
BROKER_INTERNAL_CLIENT_SUBJECT).signWith(SECRET_KEY).compact();
+ private static final String SUPER_USER_SUBJECT = "super-user";
+ private static final String SUPER_USER_TOKEN = Jwts.builder()
+ .claim("sub", SUPER_USER_SUBJECT).signWith(SECRET_KEY).compact();
+ private static final String NOBODY_SUBJECT = "nobody";
+ private static final String NOBODY_TOKEN = Jwts.builder()
+ .claim("sub", NOBODY_SUBJECT).signWith(SECRET_KEY).compact();
+ protected PulsarAdmin superUserAdmin;
+ protected PulsarAdmin tenantManagerAdmin;
+
+ @BeforeClass
+ @Override
+ protected void setup() throws Exception {
+ conf.setAuthorizationEnabled(true);
+
conf.setAuthorizationProvider(PulsarAuthorizationProvider.class.getName());
+ conf.setSuperUserRoles(Set.of(SUPER_USER_SUBJECT,
BROKER_INTERNAL_CLIENT_SUBJECT));
+ conf.setAuthenticationEnabled(true);
+
conf.setAuthenticationProviders(Set.of(AuthenticationProviderToken.class.getName()));
+ // internal client
+
conf.setBrokerClientAuthenticationPlugin(AuthenticationToken.class.getName());
+ final Map<String, String> brokerClientAuthParams = new HashMap<>();
+ brokerClientAuthParams.put("token", BROKER_INTERNAL_CLIENT_TOKEN);
+ final String brokerClientAuthParamStr =
ObjectMapperFactory.getThreadLocal()
+ .writeValueAsString(brokerClientAuthParams);
+ conf.setBrokerClientAuthenticationParameters(brokerClientAuthParamStr);
+
+ Properties properties = conf.getProperties();
+ if (properties == null) {
+ properties = new Properties();
+ conf.setProperties(properties);
+ }
+ properties.put("tokenSecretKey",
AuthTokenUtils.encodeKeyBase64(SECRET_KEY));
+
+ internalSetup();
+ setupDefaultTenantAndNamespace();
+
+ this.superUserAdmin = PulsarAdmin.builder()
+ .serviceHttpUrl(pulsar.getWebServiceAddress())
+ .authentication(new AuthenticationToken(SUPER_USER_TOKEN))
+ .build();
+ final TenantInfo tenantInfo =
superUserAdmin.tenants().getTenantInfo("public");
+ tenantInfo.getAdminRoles().add(TENANT_ADMIN_SUBJECT);
+ superUserAdmin.tenants().updateTenant("public", tenantInfo);
+ this.tenantManagerAdmin = PulsarAdmin.builder()
+ .serviceHttpUrl(pulsar.getWebServiceAddress())
+ .authentication(new AuthenticationToken(TENANT_ADMIN_TOKEN))
+ .build();
+ }
+
+ @Override
+ protected void customizeNewPulsarAdminBuilder(PulsarAdminBuilder
pulsarAdminBuilder) {
+ pulsarAdminBuilder.authentication(new
AuthenticationToken(SUPER_USER_TOKEN));
+ }
+
+ @AfterClass
+ @Override
+ protected void cleanup() throws Exception {
+ internalCleanup();
+ }
+}
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/NamespaceAuthZTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/NamespaceAuthZTest.java
new file mode 100644
index 00000000000..a96de5a1136
--- /dev/null
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/NamespaceAuthZTest.java
@@ -0,0 +1,117 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.pulsar.broker.admin;
+
+import io.jsonwebtoken.Jwts;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+import lombok.Cleanup;
+import lombok.SneakyThrows;
+import org.apache.pulsar.client.admin.PulsarAdmin;
+import org.apache.pulsar.client.admin.PulsarAdminException;
+import org.apache.pulsar.client.impl.auth.AuthenticationToken;
+import org.apache.pulsar.common.policies.data.AuthAction;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+@Test(groups = "broker-admin")
+public class NamespaceAuthZTest extends BaseAuthZTest {
+ @SneakyThrows
+ @Test
+ public void testProperties() {
+ final String random = UUID.randomUUID().toString();
+ final String namespace = "public/default";
+ final String topic = "persistent://public/default/" + random;
+ final String subject = UUID.randomUUID().toString();
+ final String token = Jwts.builder()
+ .claim("sub", subject).signWith(SECRET_KEY).compact();
+ superUserAdmin.topics().createNonPartitionedTopic(topic);
+
+ @Cleanup
+ final PulsarAdmin subAdmin = PulsarAdmin.builder()
+ .serviceHttpUrl(pulsar.getWebServiceAddress())
+ .authentication(new AuthenticationToken(token))
+ .build();
+ // test superuser
+ Map<String, String> properties = new HashMap<>();
+ properties.put("key1", "value1");
+ superUserAdmin.namespaces().setProperties(namespace, properties);
+ superUserAdmin.namespaces().setProperty(namespace, "key2", "value2");
+ superUserAdmin.namespaces().getProperties(namespace);
+ superUserAdmin.namespaces().getProperty(namespace, "key2");
+ superUserAdmin.namespaces().removeProperty(namespace, "key2");
+ superUserAdmin.namespaces().clearProperties(namespace);
+
+ // test tenant manager
+ tenantManagerAdmin.namespaces().setProperties(namespace, properties);
+ tenantManagerAdmin.namespaces().setProperty(namespace, "key2",
"value2");
+ tenantManagerAdmin.namespaces().getProperties(namespace);
+ tenantManagerAdmin.namespaces().getProperty(namespace, "key2");
+ tenantManagerAdmin.namespaces().removeProperty(namespace, "key2");
+ tenantManagerAdmin.namespaces().clearProperties(namespace);
+
+ // test nobody
+ Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class,
+ () -> subAdmin.namespaces().setProperties(namespace,
properties));
+
+ Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class,
+ () -> subAdmin.namespaces().setProperty(namespace, "key2",
"value2"));
+
+ Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class,
+ () -> subAdmin.namespaces().getProperties(namespace));
+
+ Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class,
+ () -> subAdmin.namespaces().getProperty(namespace, "key2"));
+
+
+ Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class,
+ () -> subAdmin.namespaces().removeProperty(namespace, "key2"));
+
+ Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class,
+ () -> subAdmin.namespaces().clearProperties(namespace));
+
+ for (AuthAction action : AuthAction.values()) {
+ superUserAdmin.namespaces().grantPermissionOnNamespace(namespace,
subject, Set.of(action));
+
Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class,
+ () -> subAdmin.namespaces().setProperties(namespace,
properties));
+
+
Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class,
+ () -> subAdmin.namespaces().setProperty(namespace, "key2",
"value2"));
+
+
Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class,
+ () -> subAdmin.namespaces().getProperties(namespace));
+
+
Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class,
+ () -> subAdmin.namespaces().getProperty(namespace,
"key2"));
+
+
+
Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class,
+ () -> subAdmin.namespaces().removeProperty(namespace,
"key2"));
+
+
Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class,
+ () -> subAdmin.namespaces().clearProperties(namespace));
+
+
superUserAdmin.namespaces().revokePermissionsOnNamespace(namespace, subject);
+ }
+ superUserAdmin.topics().delete(topic, true);
+ }
+}
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicAuthZTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicAuthZTest.java
new file mode 100644
index 00000000000..9da4f6fcc40
--- /dev/null
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicAuthZTest.java
@@ -0,0 +1,292 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.pulsar.broker.admin;
+
+import io.jsonwebtoken.Jwts;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.atomic.AtomicInteger;
+import lombok.Cleanup;
+import lombok.SneakyThrows;
+import org.apache.pulsar.client.admin.PulsarAdmin;
+import org.apache.pulsar.client.admin.PulsarAdminException;
+import org.apache.pulsar.client.api.MessageId;
+import org.apache.pulsar.client.impl.auth.AuthenticationToken;
+import org.apache.pulsar.common.naming.TopicName;
+import org.apache.pulsar.common.policies.data.AuthAction;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+@Test(groups = "broker-admin")
+public class TopicAuthZTest extends BaseAuthZTest {
+
+ @SneakyThrows
+ @Test
+ public void testUnloadAndCompactAndTrim() {
+ final String random = UUID.randomUUID().toString();
+ final String topic = "persistent://public/default/" + random;
+ final String subject = UUID.randomUUID().toString();
+ final String token = Jwts.builder()
+ .claim("sub", subject).signWith(SECRET_KEY).compact();
+ superUserAdmin.topics().createPartitionedTopic(topic, 2);
+
+ @Cleanup
+ final PulsarAdmin subAdmin = PulsarAdmin.builder()
+ .serviceHttpUrl(pulsar.getWebServiceAddress())
+ .authentication(new AuthenticationToken(token))
+ .build();
+ // test superuser
+ superUserAdmin.topics().unload(topic);
+ superUserAdmin.topics().triggerCompaction(topic);
+ superUserAdmin.topicPolicies().getSchemaCompatibilityStrategy(topic,
false);
+
+ // test tenant manager
+ tenantManagerAdmin.topics().unload(topic);
+ tenantManagerAdmin.topics().triggerCompaction(topic);
+
tenantManagerAdmin.topicPolicies().getSchemaCompatibilityStrategy(topic, false);
+
+ // test nobody
+ Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class,
+ () -> subAdmin.topics().unload(topic));
+
+ Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class,
+ () -> subAdmin.topics().triggerCompaction(topic));
+
+ Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class,
+ () ->
subAdmin.topicPolicies().getSchemaCompatibilityStrategy(topic, false));
+
+ // Test only super/admin can do the operation, other auth are not
permitted.
+ for (AuthAction action : AuthAction.values()) {
+ superUserAdmin.topics().grantPermission(topic, subject,
Set.of(action));
+
+
Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class,
+ () -> subAdmin.topics().unload(topic));
+
+
Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class,
+ () -> subAdmin.topics().triggerCompaction(topic));
+
+
Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class,
+ () ->
subAdmin.topicPolicies().getSchemaCompatibilityStrategy(topic, false));
+
+ superUserAdmin.topics().revokePermissions(topic, subject);
+ }
+ superUserAdmin.topics().deletePartitionedTopic(topic, true);
+ }
+
+ @Test
+ @SneakyThrows
+ public void testGetManagedLedgerInfo() {
+ final String random = UUID.randomUUID().toString();
+ final String topic = "persistent://public/default/" + random;
+ final String subject = UUID.randomUUID().toString();
+ final String token = Jwts.builder()
+ .claim("sub", subject).signWith(SECRET_KEY).compact();
+ superUserAdmin.topics().createPartitionedTopic(topic, 2);
+
+ @Cleanup
+ final PulsarAdmin subAdmin = PulsarAdmin.builder()
+ .serviceHttpUrl(pulsar.getWebServiceAddress())
+ .authentication(new AuthenticationToken(token))
+ .build();
+ // test superuser
+ superUserAdmin.topics().getInternalInfo(topic);
+
+ // test tenant manager
+ tenantManagerAdmin.topics().getInternalInfo(topic);
+
+ // test nobody
+ Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class,
+ () -> subAdmin.topics().getInternalInfo(topic));
+
+ for (AuthAction action : AuthAction.values()) {
+ superUserAdmin.topics().grantPermission(topic, subject,
Set.of(action));
+ if (action == AuthAction.produce || action == AuthAction.consume) {
+ subAdmin.topics().getInternalInfo(topic);
+ } else {
+
Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class,
+ () -> subAdmin.topics().getInternalInfo(topic));
+ }
+ superUserAdmin.topics().revokePermissions(topic, subject);
+ }
+ superUserAdmin.topics().deletePartitionedTopic(topic, true);
+ }
+
+ @Test
+ @SneakyThrows
+ public void testGetPartitionedStatsAndInternalStats() {
+ final String random = UUID.randomUUID().toString();
+ final String topic = "persistent://public/default/" + random;
+ final String subject = UUID.randomUUID().toString();
+ final String token = Jwts.builder()
+ .claim("sub", subject).signWith(SECRET_KEY).compact();
+ superUserAdmin.topics().createPartitionedTopic(topic, 2);
+
+ @Cleanup
+ final PulsarAdmin subAdmin = PulsarAdmin.builder()
+ .serviceHttpUrl(pulsar.getWebServiceAddress())
+ .authentication(new AuthenticationToken(token))
+ .build();
+ // test superuser
+ superUserAdmin.topics().getPartitionedStats(topic, false);
+ superUserAdmin.topics().getPartitionedInternalStats(topic);
+
+ // test tenant manager
+ tenantManagerAdmin.topics().getPartitionedStats(topic, false);
+ tenantManagerAdmin.topics().getPartitionedInternalStats(topic);
+
+ // test nobody
+ Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class,
+ () -> subAdmin.topics().getPartitionedStats(topic, false));
+
+ Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class,
+ () -> subAdmin.topics().getPartitionedInternalStats(topic));
+
+ for (AuthAction action : AuthAction.values()) {
+ superUserAdmin.topics().grantPermission(topic, subject,
Set.of(action));
+ if (action == AuthAction.produce || action == AuthAction.consume) {
+ subAdmin.topics().getPartitionedStats(topic, false);
+ subAdmin.topics().getPartitionedInternalStats(topic);
+ } else {
+
Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class,
+ () -> subAdmin.topics().getPartitionedStats(topic,
false));
+
+
Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class,
+ () ->
subAdmin.topics().getPartitionedInternalStats(topic));
+ }
+ superUserAdmin.topics().revokePermissions(topic, subject);
+ }
+ superUserAdmin.topics().deletePartitionedTopic(topic, true);
+ }
+
+ @Test
+ @SneakyThrows
+ public void
testCreateSubscriptionAndUpdateSubscriptionPropertiesAndAnalyzeSubscriptionBacklog()
{
+ final String random = UUID.randomUUID().toString();
+ final String topic = "persistent://public/default/" + random;
+ final String subject = UUID.randomUUID().toString();
+ final String token = Jwts.builder()
+ .claim("sub", subject).signWith(SECRET_KEY).compact();
+ superUserAdmin.topics().createPartitionedTopic(topic, 2);
+ AtomicInteger suffix = new AtomicInteger(1);
+ @Cleanup
+ final PulsarAdmin subAdmin = PulsarAdmin.builder()
+ .serviceHttpUrl(pulsar.getWebServiceAddress())
+ .authentication(new AuthenticationToken(token))
+ .build();
+ //
+ superUserAdmin.topics().createSubscription(topic, "test-sub" +
suffix.incrementAndGet(), MessageId.earliest);
+
+ // test tenant manager
+ tenantManagerAdmin.topics().createSubscription(topic, "test-sub" +
suffix.incrementAndGet(), MessageId.earliest);
+
+ // test nobody
+ Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class,
+ () -> subAdmin.topics().createSubscription(topic, "test-sub" +
suffix.incrementAndGet(), MessageId.earliest));
+
+ for (AuthAction action : AuthAction.values()) {
+ superUserAdmin.topics().grantPermission(topic, subject,
Set.of(action));
+ if (action == AuthAction.consume) {
+ subAdmin.topics().createSubscription(topic, "test-sub" +
suffix.incrementAndGet(), MessageId.earliest);
+ } else {
+
Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class,
+ () -> subAdmin.topics().createSubscription(topic,
"test-sub" + suffix.incrementAndGet(), MessageId.earliest));
+ }
+ superUserAdmin.topics().revokePermissions(topic, subject);
+ }
+ // test UpdateSubscriptionProperties
+ Map<String, String> properties = new HashMap<>();
+ superUserAdmin.topics().createSubscription(topic, "test-sub",
MessageId.earliest);
+ // test superuser
+ superUserAdmin.topics().updateSubscriptionProperties(topic, "test-sub"
, properties);
+ superUserAdmin.topics().getSubscriptionProperties(topic, "test-sub");
+
superUserAdmin.topics().analyzeSubscriptionBacklog(TopicName.get(topic).getPartition(0).getLocalName(),
"test-sub", Optional.empty());
+
+ // test tenant manager
+ tenantManagerAdmin.topics().updateSubscriptionProperties(topic,
"test-sub" , properties);
+ tenantManagerAdmin.topics().getSubscriptionProperties(topic,
"test-sub");
+
tenantManagerAdmin.topics().analyzeSubscriptionBacklog(TopicName.get(topic).getPartition(0).getLocalName(),
"test-sub", Optional.empty());
+
+ // test nobody
+ Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class,
+ () -> subAdmin.topics().updateSubscriptionProperties(topic,
"test-sub", properties));
+
+ Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class,
+ () -> subAdmin.topics().getSubscriptionProperties(topic,
"test-sub"));
+
+ Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class,
+ () ->
subAdmin.topics().analyzeSubscriptionBacklog(TopicName.get(topic).getPartition(0).getLocalName(),
"test-sub", Optional.empty()));
+
+ for (AuthAction action : AuthAction.values()) {
+ superUserAdmin.topics().grantPermission(topic, subject,
Set.of(action));
+ if (action == AuthAction.consume) {
+ subAdmin.topics().updateSubscriptionProperties(topic,
"test-sub", properties);
+ subAdmin.topics().getSubscriptionProperties(topic, "test-sub");
+
subAdmin.topics().analyzeSubscriptionBacklog(TopicName.get(topic).getPartition(0).getLocalName(),
"test-sub", Optional.empty());
+ } else {
+
Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class,
+ () ->
subAdmin.topics().updateSubscriptionProperties(topic, "test-sub", properties));
+
+
Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class,
+ () ->
subAdmin.topics().getSubscriptionProperties(topic, "test-sub"));
+
+
Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class,
+ () ->
subAdmin.topics().analyzeSubscriptionBacklog(TopicName.get(topic).getPartition(0).getLocalName(),
"test-sub", Optional.empty()));
+ }
+ superUserAdmin.topics().revokePermissions(topic, subject);
+ }
+ superUserAdmin.topics().deletePartitionedTopic(topic, true);
+ }
+
+ @Test
+ @SneakyThrows
+ public void testCreateMissingPartition() {
+ final String random = UUID.randomUUID().toString();
+ final String topic = "persistent://public/default/" + random;
+ final String subject = UUID.randomUUID().toString();
+ final String token = Jwts.builder()
+ .claim("sub", subject).signWith(SECRET_KEY).compact();
+ superUserAdmin.topics().createPartitionedTopic(topic, 2);
+ AtomicInteger suffix = new AtomicInteger(1);
+ @Cleanup
+ final PulsarAdmin subAdmin = PulsarAdmin.builder()
+ .serviceHttpUrl(pulsar.getWebServiceAddress())
+ .authentication(new AuthenticationToken(token))
+ .build();
+ //
+ superUserAdmin.topics().createMissedPartitions(topic);
+
+ // test tenant manager
+ tenantManagerAdmin.topics().createMissedPartitions(topic);
+
+ Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class,
+ () -> subAdmin.topics().createMissedPartitions(topic));
+
+ for (AuthAction action : AuthAction.values()) {
+ superUserAdmin.topics().grantPermission(topic, subject,
Set.of(action));
+
Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class,
+ () -> subAdmin.topics().createMissedPartitions(topic));
+ superUserAdmin.topics().revokePermissions(topic, subject);
+ }
+ superUserAdmin.topics().deletePartitionedTopic(topic, true);
+ }
+}
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesAuthZTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesAuthZTest.java
index 5966fe81e44..660c8b08378 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesAuthZTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesAuthZTest.java
@@ -20,107 +20,22 @@ package org.apache.pulsar.broker.admin;
import static org.awaitility.Awaitility.await;
import io.jsonwebtoken.Jwts;
-import io.jsonwebtoken.SignatureAlgorithm;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.Properties;
import java.util.Set;
import java.util.UUID;
-import javax.crypto.SecretKey;
import lombok.Cleanup;
import lombok.SneakyThrows;
-import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
-import org.apache.pulsar.broker.authentication.AuthenticationProviderToken;
-import org.apache.pulsar.broker.authentication.utils.AuthTokenUtils;
-import org.apache.pulsar.broker.authorization.PulsarAuthorizationProvider;
import org.apache.pulsar.client.admin.PulsarAdmin;
-import org.apache.pulsar.client.admin.PulsarAdminBuilder;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.impl.auth.AuthenticationToken;
import org.apache.pulsar.common.policies.data.AuthAction;
import org.apache.pulsar.common.policies.data.OffloadPolicies;
import org.apache.pulsar.common.policies.data.OffloadPoliciesImpl;
import org.apache.pulsar.common.policies.data.RetentionPolicies;
-import org.apache.pulsar.common.policies.data.TenantInfo;
-import org.apache.pulsar.common.util.ObjectMapperFactory;
import org.testng.Assert;
-import org.testng.annotations.AfterClass;
-import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;
-public final class TopicPoliciesAuthZTest extends MockedPulsarServiceBaseTest {
-
- private PulsarAdmin superUserAdmin;
-
- private PulsarAdmin tenantManagerAdmin;
-
- private static final SecretKey SECRET_KEY =
AuthTokenUtils.createSecretKey(SignatureAlgorithm.HS256);
- private static final String TENANT_ADMIN_SUBJECT =
UUID.randomUUID().toString();
- private static final String TENANT_ADMIN_TOKEN = Jwts.builder()
- .claim("sub", TENANT_ADMIN_SUBJECT).signWith(SECRET_KEY).compact();
-
-
- private static final String BROKER_INTERNAL_CLIENT_SUBJECT =
"broker_internal";
- private static final String BROKER_INTERNAL_CLIENT_TOKEN = Jwts.builder()
- .claim("sub",
BROKER_INTERNAL_CLIENT_SUBJECT).signWith(SECRET_KEY).compact();
- private static final String SUPER_USER_SUBJECT = "super-user";
- private static final String SUPER_USER_TOKEN = Jwts.builder()
- .claim("sub", SUPER_USER_SUBJECT).signWith(SECRET_KEY).compact();
- private static final String NOBODY_SUBJECT = "nobody";
- private static final String NOBODY_TOKEN = Jwts.builder()
- .claim("sub", NOBODY_SUBJECT).signWith(SECRET_KEY).compact();
-
-
- @BeforeClass
- @Override
- protected void setup() throws Exception {
- conf.setAuthorizationEnabled(true);
-
conf.setAuthorizationProvider(PulsarAuthorizationProvider.class.getName());
- conf.setSuperUserRoles(Set.of(SUPER_USER_SUBJECT,
BROKER_INTERNAL_CLIENT_SUBJECT));
- conf.setAuthenticationEnabled(true);
-
conf.setAuthenticationProviders(Set.of(AuthenticationProviderToken.class.getName()));
- // internal client
-
conf.setBrokerClientAuthenticationPlugin(AuthenticationToken.class.getName());
- final Map<String, String> brokerClientAuthParams = new HashMap<>();
- brokerClientAuthParams.put("token", BROKER_INTERNAL_CLIENT_TOKEN);
- final String brokerClientAuthParamStr =
ObjectMapperFactory.getThreadLocal()
- .writeValueAsString(brokerClientAuthParams);
- conf.setBrokerClientAuthenticationParameters(brokerClientAuthParamStr);
-
- Properties properties = conf.getProperties();
- if (properties == null) {
- properties = new Properties();
- conf.setProperties(properties);
- }
- properties.put("tokenSecretKey",
AuthTokenUtils.encodeKeyBase64(SECRET_KEY));
-
- internalSetup();
- setupDefaultTenantAndNamespace();
-
- this.superUserAdmin =PulsarAdmin.builder()
- .serviceHttpUrl(pulsar.getWebServiceAddress())
- .authentication(new AuthenticationToken(SUPER_USER_TOKEN))
- .build();
- final TenantInfo tenantInfo =
superUserAdmin.tenants().getTenantInfo("public");
- tenantInfo.getAdminRoles().add(TENANT_ADMIN_SUBJECT);
- superUserAdmin.tenants().updateTenant("public", tenantInfo);
- this.tenantManagerAdmin = PulsarAdmin.builder()
- .serviceHttpUrl(pulsar.getWebServiceAddress())
- .authentication(new AuthenticationToken(TENANT_ADMIN_TOKEN))
- .build();
- }
-
- @Override
- protected void customizeNewPulsarAdminBuilder(PulsarAdminBuilder
pulsarAdminBuilder) {
- pulsarAdminBuilder.authentication(new
AuthenticationToken(SUPER_USER_TOKEN));
- }
-
- @AfterClass
- @Override
- protected void cleanup() throws Exception {
- internalCleanup();
- }
+public final class TopicPoliciesAuthZTest extends BaseAuthZTest {
@SneakyThrows