This is an automated email from the ASF dual-hosted git repository.
technoboy pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new f61adc03d5d [improve][broker] Make some operation auto topic creation
in Namespaces async. (#15621)
f61adc03d5d is described below
commit f61adc03d5db26809d935c4fe4631384cfbf5036
Author: Baodi Shi <[email protected]>
AuthorDate: Thu May 26 10:50:03 2022 +0800
[improve][broker] Make some operation auto topic creation in Namespaces
async. (#15621)
---
.../pulsar/broker/admin/impl/NamespacesBase.java | 81 +++++++++-------------
.../apache/pulsar/broker/admin/v1/Namespaces.java | 69 +++++++++++++-----
.../apache/pulsar/broker/admin/v2/Namespaces.java | 70 ++++++++++++++-----
.../apache/pulsar/broker/admin/NamespacesTest.java | 39 +++++++++++
4 files changed, 174 insertions(+), 85 deletions(-)
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
index 8701618ee39..dd882dfdadf 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
@@ -819,56 +819,43 @@ public abstract class NamespacesBase extends
AdminResource {
});
}
- protected AutoTopicCreationOverride internalGetAutoTopicCreation() {
- validateNamespacePolicyOperation(namespaceName,
PolicyName.AUTO_TOPIC_CREATION, PolicyOperation.READ);
- Policies policies = getNamespacePolicies(namespaceName);
- return policies.autoTopicCreationOverride;
+ protected CompletableFuture<AutoTopicCreationOverride>
internalGetAutoTopicCreationAsync() {
+ return validateNamespacePolicyOperationAsync(namespaceName,
PolicyName.AUTO_TOPIC_CREATION,
+ PolicyOperation.READ)
+ .thenCompose(__ -> getNamespacePoliciesAsync(namespaceName))
+ .thenApply(policies -> policies.autoTopicCreationOverride);
}
- protected void internalSetAutoTopicCreation(AsyncResponse asyncResponse,
- AutoTopicCreationOverride
autoTopicCreationOverride) {
- final int maxPartitions =
pulsar().getConfig().getMaxNumPartitionsPerPartitionedTopic();
- validateNamespacePolicyOperation(namespaceName,
PolicyName.AUTO_TOPIC_CREATION, PolicyOperation.WRITE);
- validatePoliciesReadOnlyAccess();
- if (autoTopicCreationOverride != null) {
- ValidateResult validateResult =
AutoTopicCreationOverrideImpl.validateOverride(autoTopicCreationOverride);
- if (!validateResult.isSuccess()) {
- throw new RestException(Status.PRECONDITION_FAILED,
- "Invalid configuration for autoTopicCreationOverride.
the detail is "
- + validateResult.getErrorInfo());
- }
- if (Objects.equals(autoTopicCreationOverride.getTopicType(),
TopicType.PARTITIONED.toString())) {
- if (maxPartitions > 0 &&
autoTopicCreationOverride.getDefaultNumPartitions() > maxPartitions) {
- throw new RestException(Status.NOT_ACCEPTABLE,
- "Number of partitions should be less than or equal
to " + maxPartitions);
- }
- }
- }
- // Force to read the data s.t. the watch to the cache content is setup.
- namespaceResources().setPoliciesAsync(namespaceName, policies -> {
- policies.autoTopicCreationOverride = autoTopicCreationOverride;
- return policies;
- }).thenApply(r -> {
- String autoOverride = (autoTopicCreationOverride != null
- && autoTopicCreationOverride.isAllowAutoTopicCreation()) ?
"enabled" : "disabled";
- log.info("[{}] Successfully {} autoTopicCreation on namespace {}",
clientAppId(),
- autoOverride != null ? autoOverride : "removed",
namespaceName);
- asyncResponse.resume(Response.noContent().build());
- return null;
- }).exceptionally(e -> {
- log.error("[{}] Failed to modify autoTopicCreation status on
namespace {}", clientAppId(), namespaceName,
- e.getCause());
- if (e.getCause() instanceof NotFoundException) {
- asyncResponse.resume(new RestException(Status.NOT_FOUND,
"Namespace does not exist"));
- return null;
- }
- asyncResponse.resume(new RestException(e.getCause()));
- return null;
- });
- }
+ protected CompletableFuture<Void> internalSetAutoTopicCreationAsync(
+ AutoTopicCreationOverride autoTopicCreationOverride) {
+ return validateNamespacePolicyOperationAsync(namespaceName,
+ PolicyName.AUTO_TOPIC_CREATION, PolicyOperation.WRITE)
+ .thenCompose(__ -> validatePoliciesReadOnlyAccessAsync())
+ .thenAccept(__ -> {
+ int maxPartitions =
pulsar().getConfig().getMaxNumPartitionsPerPartitionedTopic();
+ if (autoTopicCreationOverride != null) {
+ ValidateResult validateResult =
+
AutoTopicCreationOverrideImpl.validateOverride(autoTopicCreationOverride);
+ if (!validateResult.isSuccess()) {
+ throw new RestException(Status.PRECONDITION_FAILED,
+ "Invalid configuration for
autoTopicCreationOverride. the detail is "
+ + validateResult.getErrorInfo());
+ }
+ if
(Objects.equals(autoTopicCreationOverride.getTopicType(),
+
TopicType.PARTITIONED.toString())){
+ if (maxPartitions > 0
+ &&
autoTopicCreationOverride.getDefaultNumPartitions() > maxPartitions) {
+ throw new RestException(Status.NOT_ACCEPTABLE,
+ "Number of partitions should be less
than or equal to " + maxPartitions);
+ }
- protected void internalRemoveAutoTopicCreation(AsyncResponse
asyncResponse) {
- internalSetAutoTopicCreation(asyncResponse, null);
+ }
+ }
+ })
+ .thenCompose(__ ->
namespaceResources().setPoliciesAsync(namespaceName, policies -> {
+ policies.autoTopicCreationOverride =
autoTopicCreationOverride;
+ return policies;
+ }));
}
protected void internalSetAutoSubscriptionCreation(
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/Namespaces.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/Namespaces.java
index 6076ba10c52..fc207efd7a7 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/Namespaces.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/Namespaces.java
@@ -553,11 +553,18 @@ public class Namespaces extends NamespacesBase {
@ApiOperation(value = "Get autoTopicCreation info in a namespace")
@ApiResponses(value = {@ApiResponse(code = 403, message = "Don't have
admin permission"),
@ApiResponse(code = 404, message = "Tenant or cluster or namespace
doesn't exist")})
- public AutoTopicCreationOverride
getAutoTopicCreation(@PathParam("property") String property,
-
@PathParam("cluster") String cluster,
-
@PathParam("namespace") String namespace) {
+ public void getAutoTopicCreation(@Suspended AsyncResponse asyncResponse,
+ @PathParam("property") String property,
+ @PathParam("cluster") String cluster,
+ @PathParam("namespace") String namespace)
{
validateNamespaceName(property, cluster, namespace);
- return internalGetAutoTopicCreation();
+ internalGetAutoTopicCreationAsync()
+ .thenAccept(asyncResponse::resume)
+ .exceptionally(ex -> {
+ log.error("Failed to get autoTopicCreation info for
namespace {}", namespaceName, ex);
+ resumeAsyncResponseExceptionally(asyncResponse, ex);
+ return null;
+ });
}
@POST
@@ -572,14 +579,27 @@ public class Namespaces extends NamespacesBase {
@PathParam("property") String property,
@PathParam("cluster") String cluster,
@PathParam("namespace") String namespace,
AutoTopicCreationOverride
autoTopicCreationOverride) {
- try {
- validateNamespaceName(property, cluster, namespace);
- internalSetAutoTopicCreation(asyncResponse,
autoTopicCreationOverride);
- } catch (RestException e) {
- asyncResponse.resume(e);
- } catch (Exception e) {
- asyncResponse.resume(new RestException(e));
- }
+ validateNamespaceName(property, cluster, namespace);
+ internalSetAutoTopicCreationAsync(autoTopicCreationOverride)
+ .thenAccept(__ -> {
+ String autoOverride = (autoTopicCreationOverride != null
+ &&
autoTopicCreationOverride.isAllowAutoTopicCreation()) ? "enabled" : "disabled";
+ log.info("[{}] Successfully {} autoTopicCreation on
namespace {}", clientAppId(),
+
autoOverride, namespaceName);
+ asyncResponse.resume(Response.noContent().build());
+ })
+ .exceptionally(e -> {
+ Throwable ex = FutureUtil.unwrapCompletionException(e);
+ log.error("[{}] Failed to set autoTopicCreation status on
namespace {}", clientAppId(),
+ namespaceName,
+ ex);
+ if (ex instanceof NotFoundException) {
+ asyncResponse.resume(new
RestException(Status.NOT_FOUND, "Namespace does not exist"));
+ } else {
+ resumeAsyncResponseExceptionally(asyncResponse, ex);
+ }
+ return null;
+ });
}
@DELETE
@@ -590,14 +610,25 @@ public class Namespaces extends NamespacesBase {
public void removeAutoTopicCreation(@Suspended final AsyncResponse
asyncResponse,
@PathParam("property") String
property, @PathParam("cluster") String cluster,
@PathParam("namespace") String
namespace) {
- try {
validateNamespaceName(property, cluster, namespace);
- internalRemoveAutoTopicCreation(asyncResponse);
- } catch (RestException e) {
- asyncResponse.resume(e);
- } catch (Exception e) {
- asyncResponse.resume(new RestException(e));
- }
+ internalSetAutoTopicCreationAsync(null)
+ .thenAccept(__ -> {
+ log.info("[{}] Successfully remove autoTopicCreation
on namespace {}",
+ clientAppId(), namespaceName);
+ asyncResponse.resume(Response.noContent().build());
+ })
+ .exceptionally(e -> {
+ Throwable ex = FutureUtil.unwrapCompletionException(e);
+ log.error("[{}] Failed to remove autoTopicCreation
status on namespace {}", clientAppId(),
+ namespaceName,
+ ex);
+ if (ex instanceof NotFoundException) {
+ asyncResponse.resume(new
RestException(Status.NOT_FOUND, "Namespace does not exist"));
+ } else {
+ resumeAsyncResponseExceptionally(asyncResponse,
ex);
+ }
+ return null;
+ });
}
@POST
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java
index 2162fa3ee87..52e47b534dc 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java
@@ -473,7 +473,8 @@ public class Namespaces extends NamespacesBase {
validateNamespaceName(tenant, namespace);
internalModifyDeduplicationAsync(null)
.thenAccept(__ ->
asyncResponse.resume(Response.noContent().build()))
- .exceptionally(ex -> {
+ .exceptionally(e -> {
+ Throwable ex = FutureUtil.unwrapCompletionException(e);
log.error("Failed to remove broker deduplication config
for namespace {}", namespaceName, ex);
resumeAsyncResponseExceptionally(asyncResponse, ex);
return null;
@@ -485,10 +486,17 @@ public class Namespaces extends NamespacesBase {
@ApiOperation(value = "Get autoTopicCreation info in a namespace")
@ApiResponses(value = {@ApiResponse(code = 403, message = "Don't have
admin permission"),
@ApiResponse(code = 404, message = "Tenant or namespace doesn't
exist")})
- public AutoTopicCreationOverride getAutoTopicCreation(@PathParam("tenant")
String tenant,
+ public void getAutoTopicCreation(@Suspended AsyncResponse asyncResponse,
+ @PathParam("tenant")
String tenant,
@PathParam("namespace") String namespace) {
validateNamespaceName(tenant, namespace);
- return internalGetAutoTopicCreation();
+ internalGetAutoTopicCreationAsync()
+ .thenAccept(asyncResponse::resume)
+ .exceptionally(ex -> {
+ log.error("Failed to get autoTopicCreation info for
namespace {}", namespaceName, ex);
+ resumeAsyncResponseExceptionally(asyncResponse, ex);
+ return null;
+ });
}
@POST
@@ -504,14 +512,27 @@ public class Namespaces extends NamespacesBase {
@PathParam("tenant") String tenant, @PathParam("namespace") String
namespace,
@ApiParam(value = "Settings for automatic topic creation",
required = true)
AutoTopicCreationOverride autoTopicCreationOverride) {
- try {
- validateNamespaceName(tenant, namespace);
- internalSetAutoTopicCreation(asyncResponse,
autoTopicCreationOverride);
- } catch (RestException e) {
- asyncResponse.resume(e);
- } catch (Exception e) {
- asyncResponse.resume(new RestException(e));
- }
+ validateNamespaceName(tenant, namespace);
+ internalSetAutoTopicCreationAsync(autoTopicCreationOverride)
+ .thenAccept(__ -> {
+ String autoOverride = (autoTopicCreationOverride != null
+ &&
autoTopicCreationOverride.isAllowAutoTopicCreation()) ? "enabled" : "disabled";
+ log.info("[{}] Successfully {} autoTopicCreation on
namespace {}", clientAppId(),
+ autoOverride, namespaceName);
+ asyncResponse.resume(Response.noContent().build());
+ })
+ .exceptionally(e -> {
+ Throwable ex = FutureUtil.unwrapCompletionException(e);
+ log.error("[{}] Failed to set autoTopicCreation status on
namespace {}", clientAppId(),
+ namespaceName,
+ ex);
+ if (ex instanceof
MetadataStoreException.NotFoundException) {
+ asyncResponse.resume(new
RestException(Response.Status.NOT_FOUND, "Namespace does not exist"));
+ } else {
+ resumeAsyncResponseExceptionally(asyncResponse, ex);
+ }
+ return null;
+ });
}
@DELETE
@@ -521,14 +542,25 @@ public class Namespaces extends NamespacesBase {
@ApiResponse(code = 404, message = "Tenant or cluster or namespace
doesn't exist") })
public void removeAutoTopicCreation(@Suspended final AsyncResponse
asyncResponse,
@PathParam("tenant") String tenant,
@PathParam("namespace") String namespace) {
- try {
- validateNamespaceName(tenant, namespace);
- internalRemoveAutoTopicCreation(asyncResponse);
- } catch (RestException e) {
- asyncResponse.resume(e);
- } catch (Exception e) {
- asyncResponse.resume(new RestException(e));
- }
+ validateNamespaceName(tenant, namespace);
+ internalSetAutoTopicCreationAsync(null)
+ .thenAccept(__ -> {
+ log.info("[{}] Successfully remove autoTopicCreation on
namespace {}",
+ clientAppId(), namespaceName);
+ asyncResponse.resume(Response.noContent().build());
+ })
+ .exceptionally(e -> {
+ Throwable ex = FutureUtil.unwrapCompletionException(e);
+ log.error("[{}] Failed to remove autoTopicCreation status
on namespace {}", clientAppId(),
+ namespaceName,
+ ex);
+ if (ex instanceof
MetadataStoreException.NotFoundException) {
+ asyncResponse.resume(new
RestException(Response.Status.NOT_FOUND, "Namespace does not exist"));
+ } else {
+ resumeAsyncResponseExceptionally(asyncResponse, ex);
+ }
+ return null;
+ });
}
@POST
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/NamespacesTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/NamespacesTest.java
index c64dc7bdf4e..dba0b6bd6ba 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/NamespacesTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/NamespacesTest.java
@@ -87,6 +87,7 @@ import org.apache.pulsar.common.naming.NamespaceBundles;
import org.apache.pulsar.common.naming.NamespaceName;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.policies.data.AuthAction;
+import org.apache.pulsar.common.policies.data.AutoTopicCreationOverride;
import org.apache.pulsar.common.policies.data.BundlesData;
import org.apache.pulsar.common.policies.data.ClusterData;
import org.apache.pulsar.common.policies.data.OffloadPoliciesImpl;
@@ -1677,6 +1678,44 @@ public class NamespacesTest extends
MockedPulsarServiceBaseTest {
assertInvalidRetentionPolicyAsPartOfAllPolicies(policies, 1, -2);
}
+ @Test
+ public void testOptionsAutoTopicCreation() throws Exception {
+ String namespace = "auto_topic_namespace";
+ AutoTopicCreationOverride autoTopicCreationOverride =
+
AutoTopicCreationOverride.builder().allowAutoTopicCreation(true).topicType("partitioned")
+ .defaultNumPartitions(4).build();
+ try {
+ asyncRequests(response ->
namespaces.setAutoTopicCreation(response, this.testTenant,
this.testLocalCluster,
+ namespace, autoTopicCreationOverride));
+ fail("should have failed");
+ } catch (RestException e) {
+ assertEquals(e.getResponse().getStatus(),
Status.NOT_FOUND.getStatusCode());
+ }
+
+ asyncRequests(response -> namespaces.createNamespace(response,
this.testTenant, this.testLocalCluster,
+ namespace, BundlesData.builder().build()));
+
+ // 1. set auto topic creation
+ asyncRequests(response -> namespaces.setAutoTopicCreation(response,
this.testTenant, this.testLocalCluster,
+ namespace, autoTopicCreationOverride));
+
+ // 2. assert get auto topic creation
+ AutoTopicCreationOverride autoTopicCreationOverrideRsp =
(AutoTopicCreationOverride) asyncRequests(
+ response -> namespaces.getAutoTopicCreation(response,
this.testTenant, this.testLocalCluster,
+ namespace));
+ assertEquals(autoTopicCreationOverride.getTopicType(),
autoTopicCreationOverrideRsp.getTopicType());
+ assertEquals(autoTopicCreationOverride.getDefaultNumPartitions(),
+
autoTopicCreationOverrideRsp.getDefaultNumPartitions());
+ assertEquals(autoTopicCreationOverride.isAllowAutoTopicCreation(),
+
autoTopicCreationOverrideRsp.isAllowAutoTopicCreation());
+ // 2. remove auto topic creation and assert get null
+ asyncRequests(response -> namespaces.removeAutoTopicCreation(response,
this.testTenant,
+ this.testLocalCluster, namespace));
+ assertNull(asyncRequests(
+ response -> namespaces.getAutoTopicCreation(response,
this.testTenant, this.testLocalCluster,
+ namespace)));
+ }
+
@Test
public void testSubscriptionTypesEnabled() throws PulsarAdminException,
PulsarClientException {
pulsar.getConfiguration().setAuthorizationEnabled(false);