This is an automated email from the ASF dual-hosted git repository.

technoboy pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new f61adc03d5d [improve][broker] Make some operation auto topic creation 
in Namespaces async. (#15621)
f61adc03d5d is described below

commit f61adc03d5db26809d935c4fe4631384cfbf5036
Author: Baodi Shi <[email protected]>
AuthorDate: Thu May 26 10:50:03 2022 +0800

    [improve][broker] Make some operation auto topic creation in Namespaces 
async. (#15621)
---
 .../pulsar/broker/admin/impl/NamespacesBase.java   | 81 +++++++++-------------
 .../apache/pulsar/broker/admin/v1/Namespaces.java  | 69 +++++++++++++-----
 .../apache/pulsar/broker/admin/v2/Namespaces.java  | 70 ++++++++++++++-----
 .../apache/pulsar/broker/admin/NamespacesTest.java | 39 +++++++++++
 4 files changed, 174 insertions(+), 85 deletions(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
index 8701618ee39..dd882dfdadf 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
@@ -819,56 +819,43 @@ public abstract class NamespacesBase extends 
AdminResource {
         });
     }
 
-    protected AutoTopicCreationOverride internalGetAutoTopicCreation() {
-        validateNamespacePolicyOperation(namespaceName, 
PolicyName.AUTO_TOPIC_CREATION, PolicyOperation.READ);
-        Policies policies = getNamespacePolicies(namespaceName);
-        return policies.autoTopicCreationOverride;
+    protected CompletableFuture<AutoTopicCreationOverride> 
internalGetAutoTopicCreationAsync() {
+        return validateNamespacePolicyOperationAsync(namespaceName, 
PolicyName.AUTO_TOPIC_CREATION,
+                PolicyOperation.READ)
+                .thenCompose(__ -> getNamespacePoliciesAsync(namespaceName))
+                .thenApply(policies -> policies.autoTopicCreationOverride);
     }
 
-    protected void internalSetAutoTopicCreation(AsyncResponse asyncResponse,
-                                                AutoTopicCreationOverride 
autoTopicCreationOverride) {
-        final int maxPartitions = 
pulsar().getConfig().getMaxNumPartitionsPerPartitionedTopic();
-        validateNamespacePolicyOperation(namespaceName, 
PolicyName.AUTO_TOPIC_CREATION, PolicyOperation.WRITE);
-        validatePoliciesReadOnlyAccess();
-        if (autoTopicCreationOverride != null) {
-            ValidateResult validateResult = 
AutoTopicCreationOverrideImpl.validateOverride(autoTopicCreationOverride);
-            if (!validateResult.isSuccess()) {
-                throw new RestException(Status.PRECONDITION_FAILED,
-                        "Invalid configuration for autoTopicCreationOverride. 
the detail is "
-                                + validateResult.getErrorInfo());
-            }
-            if (Objects.equals(autoTopicCreationOverride.getTopicType(), 
TopicType.PARTITIONED.toString())) {
-                if (maxPartitions > 0 && 
autoTopicCreationOverride.getDefaultNumPartitions() > maxPartitions) {
-                    throw new RestException(Status.NOT_ACCEPTABLE,
-                            "Number of partitions should be less than or equal 
to " + maxPartitions);
-                }
-            }
-        }
-        // Force to read the data s.t. the watch to the cache content is setup.
-        namespaceResources().setPoliciesAsync(namespaceName, policies -> {
-            policies.autoTopicCreationOverride = autoTopicCreationOverride;
-            return policies;
-        }).thenApply(r -> {
-            String autoOverride = (autoTopicCreationOverride != null
-                    && autoTopicCreationOverride.isAllowAutoTopicCreation()) ? 
"enabled" : "disabled";
-            log.info("[{}] Successfully {} autoTopicCreation on namespace {}", 
clientAppId(),
-                    autoOverride != null ? autoOverride : "removed", 
namespaceName);
-            asyncResponse.resume(Response.noContent().build());
-            return null;
-        }).exceptionally(e -> {
-            log.error("[{}] Failed to modify autoTopicCreation status on 
namespace {}", clientAppId(), namespaceName,
-                    e.getCause());
-            if (e.getCause() instanceof NotFoundException) {
-                asyncResponse.resume(new RestException(Status.NOT_FOUND, 
"Namespace does not exist"));
-                return null;
-            }
-            asyncResponse.resume(new RestException(e.getCause()));
-            return null;
-        });
-    }
+    protected CompletableFuture<Void> internalSetAutoTopicCreationAsync(
+            AutoTopicCreationOverride autoTopicCreationOverride) {
+        return validateNamespacePolicyOperationAsync(namespaceName,
+                PolicyName.AUTO_TOPIC_CREATION, PolicyOperation.WRITE)
+                .thenCompose(__ -> validatePoliciesReadOnlyAccessAsync())
+                .thenAccept(__ -> {
+                    int maxPartitions = 
pulsar().getConfig().getMaxNumPartitionsPerPartitionedTopic();
+                    if (autoTopicCreationOverride != null) {
+                        ValidateResult validateResult =
+                                
AutoTopicCreationOverrideImpl.validateOverride(autoTopicCreationOverride);
+                        if (!validateResult.isSuccess()) {
+                            throw new RestException(Status.PRECONDITION_FAILED,
+                                    "Invalid configuration for 
autoTopicCreationOverride. the detail is "
+                                            + validateResult.getErrorInfo());
+                        }
+                        if 
(Objects.equals(autoTopicCreationOverride.getTopicType(),
+                                                                  
TopicType.PARTITIONED.toString())){
+                            if (maxPartitions > 0
+                                    && 
autoTopicCreationOverride.getDefaultNumPartitions() > maxPartitions) {
+                                throw new RestException(Status.NOT_ACCEPTABLE,
+                                        "Number of partitions should be less 
than or equal to " + maxPartitions);
+                            }
 
-    protected void internalRemoveAutoTopicCreation(AsyncResponse 
asyncResponse) {
-        internalSetAutoTopicCreation(asyncResponse, null);
+                        }
+                    }
+                })
+                .thenCompose(__ -> 
namespaceResources().setPoliciesAsync(namespaceName, policies -> {
+                    policies.autoTopicCreationOverride = 
autoTopicCreationOverride;
+                    return policies;
+                }));
     }
 
     protected void internalSetAutoSubscriptionCreation(
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/Namespaces.java 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/Namespaces.java
index 6076ba10c52..fc207efd7a7 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/Namespaces.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/Namespaces.java
@@ -553,11 +553,18 @@ public class Namespaces extends NamespacesBase {
     @ApiOperation(value = "Get autoTopicCreation info in a namespace")
     @ApiResponses(value = {@ApiResponse(code = 403, message = "Don't have 
admin permission"),
             @ApiResponse(code = 404, message = "Tenant or cluster or namespace 
doesn't exist")})
-    public AutoTopicCreationOverride 
getAutoTopicCreation(@PathParam("property") String property,
-                                                          
@PathParam("cluster") String cluster,
-                                                          
@PathParam("namespace") String namespace) {
+    public void getAutoTopicCreation(@Suspended AsyncResponse asyncResponse,
+                                     @PathParam("property") String property,
+                                     @PathParam("cluster") String cluster,
+                                     @PathParam("namespace") String namespace) 
{
         validateNamespaceName(property, cluster, namespace);
-        return internalGetAutoTopicCreation();
+        internalGetAutoTopicCreationAsync()
+                .thenAccept(asyncResponse::resume)
+                .exceptionally(ex -> {
+                    log.error("Failed to get autoTopicCreation info for 
namespace {}", namespaceName, ex);
+                    resumeAsyncResponseExceptionally(asyncResponse, ex);
+                    return null;
+                });
     }
 
     @POST
@@ -572,14 +579,27 @@ public class Namespaces extends NamespacesBase {
                                      @PathParam("property") String property, 
@PathParam("cluster") String cluster,
                                      @PathParam("namespace") String namespace,
                                      AutoTopicCreationOverride 
autoTopicCreationOverride) {
-        try {
-            validateNamespaceName(property, cluster, namespace);
-            internalSetAutoTopicCreation(asyncResponse, 
autoTopicCreationOverride);
-        } catch (RestException e) {
-            asyncResponse.resume(e);
-        } catch (Exception e) {
-            asyncResponse.resume(new RestException(e));
-        }
+        validateNamespaceName(property, cluster, namespace);
+        internalSetAutoTopicCreationAsync(autoTopicCreationOverride)
+                .thenAccept(__ -> {
+                    String autoOverride = (autoTopicCreationOverride != null
+                            && 
autoTopicCreationOverride.isAllowAutoTopicCreation()) ? "enabled" : "disabled";
+                    log.info("[{}] Successfully {} autoTopicCreation on 
namespace {}", clientAppId(),
+                                                                               
          autoOverride, namespaceName);
+                    asyncResponse.resume(Response.noContent().build());
+                })
+                .exceptionally(e -> {
+                    Throwable ex = FutureUtil.unwrapCompletionException(e);
+                    log.error("[{}] Failed to set autoTopicCreation status on 
namespace {}", clientAppId(),
+                            namespaceName,
+                            ex);
+                    if (ex instanceof NotFoundException) {
+                        asyncResponse.resume(new 
RestException(Status.NOT_FOUND, "Namespace does not exist"));
+                    } else {
+                        resumeAsyncResponseExceptionally(asyncResponse, ex);
+                    }
+                    return null;
+                });
     }
 
     @DELETE
@@ -590,14 +610,25 @@ public class Namespaces extends NamespacesBase {
     public void removeAutoTopicCreation(@Suspended final AsyncResponse 
asyncResponse,
                                         @PathParam("property") String 
property, @PathParam("cluster") String cluster,
                                         @PathParam("namespace") String 
namespace) {
-        try {
             validateNamespaceName(property, cluster, namespace);
-            internalRemoveAutoTopicCreation(asyncResponse);
-        } catch (RestException e) {
-            asyncResponse.resume(e);
-        } catch (Exception e) {
-            asyncResponse.resume(new RestException(e));
-        }
+            internalSetAutoTopicCreationAsync(null)
+                    .thenAccept(__ -> {
+                        log.info("[{}] Successfully remove autoTopicCreation 
on namespace {}",
+                                clientAppId(), namespaceName);
+                        asyncResponse.resume(Response.noContent().build());
+                    })
+                    .exceptionally(e -> {
+                        Throwable ex = FutureUtil.unwrapCompletionException(e);
+                        log.error("[{}] Failed to remove autoTopicCreation 
status on namespace {}", clientAppId(),
+                                namespaceName,
+                                ex);
+                        if (ex instanceof NotFoundException) {
+                            asyncResponse.resume(new 
RestException(Status.NOT_FOUND, "Namespace does not exist"));
+                        } else {
+                            resumeAsyncResponseExceptionally(asyncResponse, 
ex);
+                        }
+                        return null;
+                    });
     }
 
     @POST
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java
index 2162fa3ee87..52e47b534dc 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java
@@ -473,7 +473,8 @@ public class Namespaces extends NamespacesBase {
         validateNamespaceName(tenant, namespace);
         internalModifyDeduplicationAsync(null)
                 .thenAccept(__ -> 
asyncResponse.resume(Response.noContent().build()))
-                .exceptionally(ex -> {
+                .exceptionally(e -> {
+                    Throwable ex = FutureUtil.unwrapCompletionException(e);
                     log.error("Failed to remove broker deduplication config 
for namespace {}", namespaceName, ex);
                     resumeAsyncResponseExceptionally(asyncResponse, ex);
                     return null;
@@ -485,10 +486,17 @@ public class Namespaces extends NamespacesBase {
     @ApiOperation(value = "Get autoTopicCreation info in a namespace")
     @ApiResponses(value = {@ApiResponse(code = 403, message = "Don't have 
admin permission"),
             @ApiResponse(code = 404, message = "Tenant or namespace doesn't 
exist")})
-    public AutoTopicCreationOverride getAutoTopicCreation(@PathParam("tenant") 
String tenant,
+    public void getAutoTopicCreation(@Suspended AsyncResponse asyncResponse,
+                                                          @PathParam("tenant") 
String tenant,
                                                           
@PathParam("namespace") String namespace) {
         validateNamespaceName(tenant, namespace);
-        return internalGetAutoTopicCreation();
+        internalGetAutoTopicCreationAsync()
+                .thenAccept(asyncResponse::resume)
+                .exceptionally(ex -> {
+                    log.error("Failed to get autoTopicCreation info for 
namespace {}", namespaceName, ex);
+                    resumeAsyncResponseExceptionally(asyncResponse, ex);
+                    return null;
+                });
     }
 
     @POST
@@ -504,14 +512,27 @@ public class Namespaces extends NamespacesBase {
             @PathParam("tenant") String tenant, @PathParam("namespace") String 
namespace,
             @ApiParam(value = "Settings for automatic topic creation", 
required = true)
                     AutoTopicCreationOverride autoTopicCreationOverride) {
-        try {
-            validateNamespaceName(tenant, namespace);
-            internalSetAutoTopicCreation(asyncResponse, 
autoTopicCreationOverride);
-        } catch (RestException e) {
-            asyncResponse.resume(e);
-        } catch (Exception e) {
-            asyncResponse.resume(new RestException(e));
-        }
+        validateNamespaceName(tenant, namespace);
+        internalSetAutoTopicCreationAsync(autoTopicCreationOverride)
+                .thenAccept(__ -> {
+                    String autoOverride = (autoTopicCreationOverride != null
+                            && 
autoTopicCreationOverride.isAllowAutoTopicCreation()) ? "enabled" : "disabled";
+                    log.info("[{}] Successfully {} autoTopicCreation on 
namespace {}", clientAppId(),
+                            autoOverride, namespaceName);
+                    asyncResponse.resume(Response.noContent().build());
+                })
+                .exceptionally(e -> {
+                    Throwable ex = FutureUtil.unwrapCompletionException(e);
+                    log.error("[{}] Failed to set autoTopicCreation status on 
namespace {}", clientAppId(),
+                            namespaceName,
+                            ex);
+                    if (ex instanceof 
MetadataStoreException.NotFoundException) {
+                        asyncResponse.resume(new 
RestException(Response.Status.NOT_FOUND, "Namespace does not exist"));
+                    } else {
+                        resumeAsyncResponseExceptionally(asyncResponse, ex);
+                    }
+                    return null;
+                });
     }
 
     @DELETE
@@ -521,14 +542,25 @@ public class Namespaces extends NamespacesBase {
             @ApiResponse(code = 404, message = "Tenant or cluster or namespace 
doesn't exist") })
     public void removeAutoTopicCreation(@Suspended final AsyncResponse 
asyncResponse,
                                         @PathParam("tenant") String tenant, 
@PathParam("namespace") String namespace) {
-        try {
-            validateNamespaceName(tenant, namespace);
-            internalRemoveAutoTopicCreation(asyncResponse);
-        } catch (RestException e) {
-            asyncResponse.resume(e);
-        } catch (Exception e) {
-            asyncResponse.resume(new RestException(e));
-        }
+        validateNamespaceName(tenant, namespace);
+        internalSetAutoTopicCreationAsync(null)
+                .thenAccept(__ -> {
+                    log.info("[{}] Successfully remove autoTopicCreation on 
namespace {}",
+                            clientAppId(), namespaceName);
+                    asyncResponse.resume(Response.noContent().build());
+                })
+                .exceptionally(e -> {
+                    Throwable ex = FutureUtil.unwrapCompletionException(e);
+                    log.error("[{}] Failed to remove autoTopicCreation status 
on namespace {}", clientAppId(),
+                            namespaceName,
+                            ex);
+                    if (ex instanceof 
MetadataStoreException.NotFoundException) {
+                        asyncResponse.resume(new 
RestException(Response.Status.NOT_FOUND, "Namespace does not exist"));
+                    } else {
+                        resumeAsyncResponseExceptionally(asyncResponse, ex);
+                    }
+                    return null;
+                });
     }
 
     @POST
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/NamespacesTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/NamespacesTest.java
index c64dc7bdf4e..dba0b6bd6ba 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/NamespacesTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/NamespacesTest.java
@@ -87,6 +87,7 @@ import org.apache.pulsar.common.naming.NamespaceBundles;
 import org.apache.pulsar.common.naming.NamespaceName;
 import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.common.policies.data.AuthAction;
+import org.apache.pulsar.common.policies.data.AutoTopicCreationOverride;
 import org.apache.pulsar.common.policies.data.BundlesData;
 import org.apache.pulsar.common.policies.data.ClusterData;
 import org.apache.pulsar.common.policies.data.OffloadPoliciesImpl;
@@ -1677,6 +1678,44 @@ public class NamespacesTest extends 
MockedPulsarServiceBaseTest {
         assertInvalidRetentionPolicyAsPartOfAllPolicies(policies, 1, -2);
     }
 
+    @Test
+    public void testOptionsAutoTopicCreation() throws Exception {
+        String namespace = "auto_topic_namespace";
+        AutoTopicCreationOverride autoTopicCreationOverride =
+                
AutoTopicCreationOverride.builder().allowAutoTopicCreation(true).topicType("partitioned")
+                        .defaultNumPartitions(4).build();
+        try {
+            asyncRequests(response -> 
namespaces.setAutoTopicCreation(response, this.testTenant, 
this.testLocalCluster,
+                    namespace, autoTopicCreationOverride));
+            fail("should have failed");
+        } catch (RestException e) {
+            assertEquals(e.getResponse().getStatus(), 
Status.NOT_FOUND.getStatusCode());
+        }
+
+        asyncRequests(response -> namespaces.createNamespace(response, 
this.testTenant, this.testLocalCluster,
+                namespace, BundlesData.builder().build()));
+
+        // 1. set auto topic creation
+        asyncRequests(response -> namespaces.setAutoTopicCreation(response, 
this.testTenant, this.testLocalCluster,
+                namespace, autoTopicCreationOverride));
+
+        // 2. assert get auto topic creation
+        AutoTopicCreationOverride autoTopicCreationOverrideRsp = 
(AutoTopicCreationOverride) asyncRequests(
+                response -> namespaces.getAutoTopicCreation(response, 
this.testTenant, this.testLocalCluster,
+                        namespace));
+        assertEquals(autoTopicCreationOverride.getTopicType(), 
autoTopicCreationOverrideRsp.getTopicType());
+        assertEquals(autoTopicCreationOverride.getDefaultNumPartitions(),
+                                          
autoTopicCreationOverrideRsp.getDefaultNumPartitions());
+        assertEquals(autoTopicCreationOverride.isAllowAutoTopicCreation(),
+                                          
autoTopicCreationOverrideRsp.isAllowAutoTopicCreation());
+        // 2. remove auto topic creation and assert get null
+        asyncRequests(response -> namespaces.removeAutoTopicCreation(response, 
this.testTenant,
+                this.testLocalCluster, namespace));
+        assertNull(asyncRequests(
+                response -> namespaces.getAutoTopicCreation(response, 
this.testTenant, this.testLocalCluster,
+                        namespace)));
+    }
+
     @Test
     public void testSubscriptionTypesEnabled() throws PulsarAdminException, 
PulsarClientException {
         pulsar.getConfiguration().setAuthorizationEnabled(false);

Reply via email to