This is an automated email from the ASF dual-hosted git repository.
houxiaoyu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 7bb290c8d39 [improve][broker][PIP-149]make setPersistence method async
in Namespaces (#17421)
7bb290c8d39 is described below
commit 7bb290c8d394811d436a3e97054d81d4c8208af4
Author: HuangZeGui <[email protected]>
AuthorDate: Sun Sep 11 15:34:50 2022 +0800
[improve][broker][PIP-149]make setPersistence method async in Namespaces
(#17421)
---
.../pulsar/broker/admin/impl/NamespacesBase.java | 26 ++++---------------
.../apache/pulsar/broker/admin/v1/Namespaces.java | 14 +++++++---
.../apache/pulsar/broker/admin/v2/Namespaces.java | 12 +++++++--
.../apache/pulsar/broker/admin/NamespacesTest.java | 30 +++++++++++++++-------
4 files changed, 47 insertions(+), 35 deletions(-)
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
index d0f1ef0acc2..b8f3ac8a7a3 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
@@ -1509,27 +1509,11 @@ public abstract class NamespacesBase extends
AdminResource {
.thenCompose(__ -> doUpdatePersistenceAsync(null));
}
- protected void internalSetPersistence(PersistencePolicies persistence) {
- validateNamespacePolicyOperation(namespaceName,
PolicyName.PERSISTENCE, PolicyOperation.WRITE);
- validatePoliciesReadOnlyAccess();
- validatePersistencePolicies(persistence);
-
- doUpdatePersistence(persistence);
- }
-
- private void doUpdatePersistence(PersistencePolicies persistence) {
- try {
- updatePolicies(namespaceName, policies -> {
- policies.persistence = persistence;
- return policies;
- });
- log.info("[{}] Successfully updated persistence configuration:
namespace={}, map={}", clientAppId(),
- namespaceName,
jsonMapper().writeValueAsString(persistence));
- } catch (Exception e) {
- log.error("[{}] Failed to update persistence configuration for
namespace {}", clientAppId(), namespaceName,
- e);
- throw new RestException(e);
- }
+ protected CompletableFuture<Void>
internalSetPersistenceAsync(PersistencePolicies persistence) {
+ return validateNamespacePolicyOperationAsync(namespaceName,
PolicyName.PERSISTENCE, PolicyOperation.WRITE)
+ .thenCompose(__ -> validatePoliciesReadOnlyAccessAsync())
+ .thenAccept(__ -> validatePersistencePolicies(persistence))
+ .thenCompose(__ -> doUpdatePersistenceAsync(persistence));
}
private CompletableFuture<Void>
doUpdatePersistenceAsync(PersistencePolicies persistence) {
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/Namespaces.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/Namespaces.java
index 4f603aaeb30..54f4c0c85c7 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/Namespaces.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/Namespaces.java
@@ -1213,10 +1213,18 @@ public class Namespaces extends NamespacesBase {
@ApiResponse(code = 404, message = "Namespace does not exist"),
@ApiResponse(code = 409, message = "Concurrent modification"),
@ApiResponse(code = 400, message = "Invalid persistence policies")
})
- public void setPersistence(@PathParam("property") String property,
@PathParam("cluster") String cluster,
- @PathParam("namespace") String namespace, PersistencePolicies
persistence) {
+ public void setPersistence(@Suspended final AsyncResponse asyncResponse,
@PathParam("property") String property,
+ @PathParam("cluster") String cluster,
@PathParam("namespace") String namespace,
+ PersistencePolicies persistence) {
validateNamespaceName(property, cluster, namespace);
- internalSetPersistence(persistence);
+ internalSetPersistenceAsync(persistence)
+ .thenAccept(__ ->
asyncResponse.resume(Response.noContent().build()))
+ .exceptionally(ex -> {
+ log.error("[{}] Failed to update the persistence for a
namespace {}", clientAppId(), namespaceName,
+ ex);
+ resumeAsyncResponseExceptionally(asyncResponse, ex);
+ return null;
+ });
}
@POST
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java
index 445641718da..03f5bbdd115 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java
@@ -1235,11 +1235,19 @@ public class Namespaces extends NamespacesBase {
@ApiResponse(code = 404, message = "Namespace does not exist"),
@ApiResponse(code = 409, message = "Concurrent modification"),
@ApiResponse(code = 400, message = "Invalid persistence
policies")})
- public void setPersistence(@PathParam("tenant") String tenant,
@PathParam("namespace") String namespace,
+ public void setPersistence(@Suspended final AsyncResponse asyncResponse,
@PathParam("tenant") String tenant,
+ @PathParam("namespace") String namespace,
@ApiParam(value = "Persistence policies for the
specified namespace", required = true)
PersistencePolicies persistence) {
validateNamespaceName(tenant, namespace);
- internalSetPersistence(persistence);
+ internalSetPersistenceAsync(persistence)
+ .thenAccept(__ ->
asyncResponse.resume(Response.noContent().build()))
+ .exceptionally(ex -> {
+ log.error("[{}] Failed to update the persistence for a
namespace {}", clientAppId(), namespaceName,
+ ex);
+ resumeAsyncResponseExceptionally(asyncResponse, ex);
+ return null;
+ });
}
@DELETE
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/NamespacesTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/NamespacesTest.java
index c23407bb447..73cf4914ffe 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/NamespacesTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/NamespacesTest.java
@@ -101,6 +101,7 @@ import org.apache.pulsar.common.policies.data.SubscribeRate;
import org.apache.pulsar.common.policies.data.TenantInfo;
import org.apache.pulsar.common.policies.data.TenantInfoImpl;
import org.apache.pulsar.common.policies.data.impl.DispatchRateImpl;
+import org.apache.pulsar.common.util.FutureUtil;
import org.apache.pulsar.metadata.cache.impl.MetadataCacheImpl;
import org.apache.pulsar.metadata.impl.AbstractMetadataStore;
import org.apache.zookeeper.KeeperException.Code;
@@ -196,6 +197,14 @@ public class NamespacesTest extends
MockedPulsarServiceBaseTest {
.validateNamespacePolicyOperation(NamespaceName.get("other-tenant/use/test-namespace-1"),
PolicyName.RETENTION, PolicyOperation.WRITE);
+ doReturn(FutureUtil.failedFuture(new
RestException(Status.UNAUTHORIZED, "unauthorized"))).when(namespaces)
+
.validateNamespacePolicyOperationAsync(NamespaceName.get("other-tenant/use/test-namespace-1"),
+ PolicyName.PERSISTENCE, PolicyOperation.WRITE);
+
+ doReturn(FutureUtil.failedFuture(new
RestException(Status.UNAUTHORIZED, "unauthorized"))).when(namespaces)
+
.validateNamespacePolicyOperationAsync(NamespaceName.get("other-tenant/use/test-namespace-1"),
+ PolicyName.RETENTION, PolicyOperation.WRITE);
+
nsSvc = pulsar.getNamespaceService();
}
@@ -1065,8 +1074,12 @@ public class NamespacesTest extends
MockedPulsarServiceBaseTest {
public void testPersistence() throws Exception {
NamespaceName testNs = this.testLocalNamespaces.get(0);
PersistencePolicies persistence1 = new PersistencePolicies(3, 2, 1,
0.0);
- namespaces.setPersistence(testNs.getTenant(), testNs.getCluster(),
testNs.getLocalName(), persistence1);
AsyncResponse response = mock(AsyncResponse.class);
+ namespaces.setPersistence(response, testNs.getTenant(),
testNs.getCluster(), testNs.getLocalName(), persistence1);
+ ArgumentCaptor<Response> responseCaptor =
ArgumentCaptor.forClass(Response.class);
+ verify(response,
timeout(5000).times(1)).resume(responseCaptor.capture());
+ assertEquals(responseCaptor.getValue().getStatus(),
Response.Status.NO_CONTENT.getStatusCode());
+ response = mock(AsyncResponse.class);
namespaces.getPersistence(response, testNs.getTenant(),
testNs.getCluster(), testNs.getLocalName());
ArgumentCaptor<PersistencePolicies> captor =
ArgumentCaptor.forClass(PersistencePolicies.class);
verify(response, timeout(5000).times(1)).resume(captor.capture());
@@ -1076,14 +1089,13 @@ public class NamespacesTest extends
MockedPulsarServiceBaseTest {
@Test
public void testPersistenceUnauthorized() throws Exception {
- try {
- NamespaceName testNs = this.testLocalNamespaces.get(3);
- PersistencePolicies persistence = new PersistencePolicies(3, 2, 1,
0.0);
- namespaces.setPersistence(testNs.getTenant(), testNs.getCluster(),
testNs.getLocalName(), persistence);
- fail("Should fail");
- } catch (RestException e) {
- assertEquals(e.getResponse().getStatus(),
Status.UNAUTHORIZED.getStatusCode());
- }
+ NamespaceName testNs = this.testLocalNamespaces.get(3);
+ PersistencePolicies persistence = new PersistencePolicies(3, 2, 1,
0.0);
+ AsyncResponse response = mock(AsyncResponse.class);
+ namespaces.setPersistence(response, testNs.getTenant(),
testNs.getCluster(), testNs.getLocalName(), persistence);
+ ArgumentCaptor<RestException> errorCaptor =
ArgumentCaptor.forClass(RestException.class);
+ verify(response, timeout(5000).times(1)).resume(errorCaptor.capture());
+ assertEquals(errorCaptor.getValue().getResponse().getStatus(),
Response.Status.UNAUTHORIZED.getStatusCode());
}
@Test