This is an automated email from the ASF dual-hosted git repository.
lhotari pushed a commit to branch branch-4.0
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-4.0 by this push:
new c23cfd35f79 [improve][broker] optimize namespaceBundle validation to
fix single-thread 100% CPU during unloading entire namespaces (#25626)
c23cfd35f79 is described below
commit c23cfd35f79b76fbdb5e4867bd44d45c3d962667
Author: Jiwei Guo <[email protected]>
AuthorDate: Tue May 12 19:31:40 2026 +0800
[improve][broker] optimize namespaceBundle validation to fix single-thread
100% CPU during unloading entire namespaces (#25626)
Co-authored-by: zhaizhibo <[email protected]>
(cherry picked from commit 893779df2cd97868efe01cda22e8b2e18ea4c7a9)
---
.../pulsar/broker/admin/impl/NamespacesBase.java | 36 +++---
.../broker/admin/impl/ResourceQuotasBase.java | 3 +-
.../broker/admin/v1/NonPersistentTopics.java | 15 ++-
.../broker/admin/v2/NonPersistentTopics.java | 5 +-
.../pulsar/broker/web/PulsarWebResource.java | 85 +++++++--------
.../apache/pulsar/broker/admin/NamespacesTest.java | 121 +++++++++++++++++++++
6 files changed, 186 insertions(+), 79 deletions(-)
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
index 9c26fc75cf3..293747b5264 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
@@ -618,8 +618,7 @@ public abstract class NamespacesBase extends AdminResource {
}
return future
.thenCompose(__ ->
-
validateNamespaceBundleOwnershipAsync(namespaceName, policies.bundles,
- bundleRange,
+
validateNamespaceBundleOwnershipAsync(namespaceName, bundleRange,
authoritative, true))
.thenCompose(bundle -> {
return
pulsar().getNamespaceService().getListOfPersistentTopics(namespaceName)
@@ -1166,9 +1165,8 @@ public abstract class NamespacesBase extends
AdminResource {
}
})
.thenCompose(__ -> validatePoliciesReadOnlyAccessAsync())
- .thenCompose(__ -> getNamespacePoliciesAsync(namespaceName))
- .thenCompose(policies ->
- isBundleOwnedByAnyBroker(namespaceName, policies.bundles,
bundleRange)
+ .thenCompose(__ ->
+ isBundleOwnedByAnyBroker(namespaceName, bundleRange)
.thenCompose(flag -> {
if (!flag) {
log.info("[{}] Namespace bundle is not owned
by any broker {}/{}", clientAppId(),
@@ -1176,7 +1174,7 @@ public abstract class NamespacesBase extends
AdminResource {
return CompletableFuture.completedFuture(null);
}
Optional<String> destinationBrokerOpt =
Optional.ofNullable(destinationBroker);
- return
validateNamespaceBundleOwnershipAsync(namespaceName, policies.bundles,
bundleRange,
+ return
validateNamespaceBundleOwnershipAsync(namespaceName, bundleRange,
authoritative, true)
.thenCompose(nsBundle ->
pulsar().getNamespaceService()
.unloadNamespaceBundle(nsBundle,
destinationBrokerOpt));
@@ -1222,10 +1220,8 @@ public abstract class NamespacesBase extends
AdminResource {
.thenCompose(__ -> validatePoliciesReadOnlyAccessAsync())
.thenCompose(__ -> getBundleRangeAsync(bundleName))
.thenCompose(bundleRange -> {
- return getNamespacePoliciesAsync(namespaceName)
- .thenCompose(policies ->
-
validateNamespaceBundleOwnershipAsync(namespaceName, policies.bundles,
bundleRange,
- authoritative, false))
+ return
validateNamespaceBundleOwnershipAsync(namespaceName, bundleRange,
+ authoritative, false)
.thenCompose(nsBundle ->
pulsar().getNamespaceService().splitAndOwnBundle(nsBundle, unload,
pulsar().getNamespaceService()
.getNamespaceBundleSplitAlgorithmByName(splitAlgorithmName),
@@ -1239,9 +1235,8 @@ public abstract class NamespacesBase extends
AdminResource {
log.debug("[{}] Getting hash position for topic list {}, bundle
{}", clientAppId(), topics, bundleRange);
}
return validateNamespacePolicyOperationAsync(namespaceName,
PolicyName.PERSISTENCE, PolicyOperation.READ)
- .thenCompose(__ -> getNamespacePoliciesAsync(namespaceName))
- .thenCompose(policies -> {
- return
validateNamespaceBundleOwnershipAsync(namespaceName, policies.bundles,
bundleRange,
+ .thenCompose(__ -> {
+ return
validateNamespaceBundleOwnershipAsync(namespaceName, bundleRange,
false, true)
.thenCompose(nsBundle ->
pulsar().getNamespaceService().getOwnedTopicListForNamespaceBundle(nsBundle))
@@ -1572,11 +1567,10 @@ public abstract class NamespacesBase extends
AdminResource {
// check cluster ownership for a given global namespace:
redirect if peer-cluster owns it
return
validateGlobalNamespaceOwnershipAsync(namespaceName);
})
- .thenCompose(__ -> getNamespacePoliciesAsync(namespaceName))
- .thenCompose(policies ->
+ .thenCompose(__ ->
// Allow acquiring ownership for an unassigned bundle
so backlog can be cleared
// even if not loaded.
- validateNamespaceBundleOwnershipAsync(namespaceName,
policies.bundles, bundleRange,
+ validateNamespaceBundleOwnershipAsync(namespaceName,
bundleRange,
authoritative, false))
.thenCompose(bundle -> clearBacklogAsync(bundle, null))
.thenRun(() -> log.info("[{}] Successfully cleared backlog on
namespace bundle {}/{}", clientAppId(),
@@ -1625,11 +1619,10 @@ public abstract class NamespacesBase extends
AdminResource {
// check cluster ownership for a given global namespace:
redirect if peer-cluster owns it
return
validateGlobalNamespaceOwnershipAsync(namespaceName);
})
- .thenCompose(__ -> getNamespacePoliciesAsync(namespaceName))
- .thenCompose(policies ->
+ .thenCompose(__ ->
// Allow acquiring ownership for an unassigned bundle
so backlog can be cleared
// even if not loaded.
- validateNamespaceBundleOwnershipAsync(namespaceName,
policies.bundles, bundleRange,
+ validateNamespaceBundleOwnershipAsync(namespaceName,
bundleRange,
authoritative, false))
.thenCompose(bundle -> clearBacklogAsync(bundle, subscription))
.thenRun(() -> log.info(
@@ -1681,9 +1674,8 @@ public abstract class NamespacesBase extends
AdminResource {
.thenCompose(unused ->
validateClusterForTenantAsync(namespaceName.getTenant(),
namespaceName.getCluster()));
})
- .thenCompose(__ -> getNamespacePoliciesAsync(namespaceName))
- .thenCompose(policies ->
- validateNamespaceBundleOwnershipAsync(namespaceName,
policies.bundles, bundleRange,
+ .thenCompose(__ ->
+ validateNamespaceBundleOwnershipAsync(namespaceName,
bundleRange,
authoritative, false))
.thenCompose(bundle -> unsubscribeAsync(bundle, subscription))
.thenRun(() -> log.info("[{}] Successfully unsubscribed {} on
namespace bundle {}/{}",
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/ResourceQuotasBase.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/ResourceQuotasBase.java
index 8e4d6f3211d..fe7c59ec4e9 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/ResourceQuotasBase.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/ResourceQuotasBase.java
@@ -76,7 +76,6 @@ public abstract class ResourceQuotasBase extends
NamespacesBase {
namespaceName.getCluster()));
}
return ret
- .thenCompose(__ -> getNamespacePoliciesAsync(namespaceName))
- .thenApply(policies ->
validateNamespaceBundleRange(namespaceName, policies.bundles, bundleRange));
+ .thenCompose(__ ->
validateNamespaceBundleRangeAsync(namespaceName, bundleRange));
}
}
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/NonPersistentTopics.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/NonPersistentTopics.java
index 1c1dd747196..7170669e491 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/NonPersistentTopics.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/NonPersistentTopics.java
@@ -250,7 +250,7 @@ public class NonPersistentTopics extends PersistentTopics {
bundleRange);
validateNamespaceName(property, cluster, namespace);
validateNamespaceOperation(namespaceName,
NamespaceOperation.GET_BUNDLE);
- Policies policies = getNamespacePolicies(property, cluster, namespace);
+ getNamespacePolicies(property, cluster, namespace);
if (!cluster.equals(Constants.GLOBAL_CLUSTER)) {
validateClusterOwnership(cluster);
validateClusterForTenant(property, cluster);
@@ -260,14 +260,14 @@ public class NonPersistentTopics extends PersistentTopics
{
}
NamespaceName fqnn = NamespaceName.get(property, cluster, namespace);
try {
- if (!isBundleOwnedByAnyBroker(fqnn, policies.bundles, bundleRange)
+ if (!isBundleOwnedByAnyBroker(fqnn, bundleRange)
.get(pulsar().getConfig().getMetadataStoreOperationTimeoutSeconds(),
TimeUnit.SECONDS)) {
log.info("[{}] Namespace bundle is not owned by any broker
{}/{}/{}/{}", clientAppId(), property,
cluster, namespace, bundleRange);
return null;
}
- NamespaceBundle nsBundle = validateNamespaceBundleOwnership(fqnn,
policies.bundles, bundleRange,
- true, true);
+ NamespaceBundle nsBundle =
validateNamespaceBundleOwnershipAsync(fqnn, bundleRange, true, true)
+
.get(pulsar().getConfig().getMetadataStoreOperationTimeoutSeconds(),
TimeUnit.SECONDS);
final List<String> topicList = new ArrayList<>();
pulsar().getBrokerService().forEachTopic(topic -> {
TopicName topicName = TopicName.get(topic.getName());
@@ -276,6 +276,13 @@ public class NonPersistentTopics extends PersistentTopics {
}
});
return topicList;
+ } catch (ExecutionException e) {
+ Throwable cause = e.getCause();
+ if (cause instanceof WebApplicationException wae) {
+ throw wae;
+ }
+ log.error("[{}] Failed to unload namespace bundle {}/{}",
clientAppId(), fqnn.toString(), bundleRange, e);
+ throw new RestException(cause != null ? cause : e);
} catch (WebApplicationException wae) {
throw wae;
} catch (Exception e) {
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/NonPersistentTopics.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/NonPersistentTopics.java
index edf4303e1ad..655108662ad 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/NonPersistentTopics.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/NonPersistentTopics.java
@@ -464,18 +464,17 @@ public class NonPersistentTopics extends PersistentTopics
{
}
validateNamespaceOperation(namespaceName,
NamespaceOperation.GET_BUNDLE);
- Policies policies = getNamespacePolicies(namespaceName);
// check cluster ownership for a given global namespace: redirect if
peer-cluster owns it
validateGlobalNamespaceOwnership(namespaceName);
- isBundleOwnedByAnyBroker(namespaceName, policies.bundles,
bundleRange).thenAccept(flag -> {
+ isBundleOwnedByAnyBroker(namespaceName, bundleRange).thenAccept(flag
-> {
if (!flag) {
log.info("[{}] Namespace bundle is not owned by any broker
{}/{}", clientAppId(), namespaceName,
bundleRange);
asyncResponse.resume(Response.noContent().build());
} else {
- validateNamespaceBundleOwnershipAsync(namespaceName,
policies.bundles, bundleRange, true, true)
+ validateNamespaceBundleOwnershipAsync(namespaceName,
bundleRange, true, true)
.thenAccept(nsBundle -> {
final var bundleTopics =
pulsar().getBrokerService().getMultiLayerTopicsMap()
.get(namespaceName.toString());
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java
index 9291097d271..873b5dce431 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java
@@ -82,7 +82,6 @@ import org.apache.pulsar.common.naming.NamespaceBundle;
import org.apache.pulsar.common.naming.NamespaceBundles;
import org.apache.pulsar.common.naming.NamespaceName;
import org.apache.pulsar.common.naming.TopicName;
-import org.apache.pulsar.common.policies.data.BundlesData;
import org.apache.pulsar.common.policies.data.ClusterData;
import org.apache.pulsar.common.policies.data.ClusterDataImpl;
import org.apache.pulsar.common.policies.data.NamespaceOperation;
@@ -585,7 +584,7 @@ public abstract class PulsarWebResource {
}
}
- protected NamespaceBundle validateNamespaceBundleRange(NamespaceName fqnn,
BundlesData bundles,
+ protected CompletableFuture<NamespaceBundle>
validateNamespaceBundleRangeAsync(NamespaceName fqnn,
String bundleRange) {
try {
checkArgument(bundleRange.contains("_"), "Invalid bundle range: "
+ bundleRange);
@@ -596,68 +595,58 @@ public abstract class PulsarWebResource {
(upperEndpoint.equals(NamespaceBundles.FULL_UPPER_BOUND))
? BoundType.CLOSED : BoundType.OPEN);
NamespaceBundle nsBundle =
pulsar().getNamespaceService().getNamespaceBundleFactory().getBundle(fqnn,
hashRange);
- NamespaceBundles nsBundles =
pulsar().getNamespaceService().getNamespaceBundleFactory().getBundles(fqnn,
- bundles);
- nsBundles.validateBundle(nsBundle);
- return nsBundle;
+ return
pulsar().getNamespaceService().getNamespaceBundleFactory().getBundlesAsync(fqnn)
+ .thenApply(nsBundles -> {
+ try {
+ nsBundles.validateBundle(nsBundle);
+ return nsBundle;
+ } catch (IllegalArgumentException e) {
+ log.error("Invalid bundle range, namespace={},
bundleRange={}",
+ fqnn.toString(), bundleRange, e);
+ throw new
RestException(Response.Status.PRECONDITION_FAILED, e.getMessage());
+ } catch (Exception e) {
+ log.error("Failed to validate namespace bundle,
namespace={}, bundleRange={}",
+ fqnn.toString(), bundleRange, e);
+ throw new RestException(e);
+ }
+ });
} catch (IllegalArgumentException e) {
log.error("[{}] Invalid bundle range {}/{}, {}", clientAppId(),
fqnn.toString(),
bundleRange, e.getMessage());
- throw new RestException(Response.Status.PRECONDITION_FAILED,
e.getMessage());
+ return CompletableFuture.failedFuture(
+ new RestException(Response.Status.PRECONDITION_FAILED,
e.getMessage()));
} catch (Exception e) {
log.error("[{}] Failed to validate namespace bundle {}/{}",
clientAppId(),
fqnn.toString(), bundleRange, e);
- throw new RestException(e);
+ return CompletableFuture.failedFuture(new RestException(e));
}
}
/**
* Checks whether a given bundle is currently loaded by any broker.
*/
- protected CompletableFuture<Boolean>
isBundleOwnedByAnyBroker(NamespaceName fqnn, BundlesData bundles,
- String bundleRange) {
- NamespaceBundle nsBundle = validateNamespaceBundleRange(fqnn, bundles,
bundleRange);
- NamespaceService nsService = pulsar().getNamespaceService();
-
- if (ExtensibleLoadManagerImpl.isLoadManagerExtensionEnabled(pulsar)) {
- return nsService.checkOwnershipPresentAsync(nsBundle);
- }
-
- LookupOptions options = LookupOptions.builder()
- .authoritative(false)
- .requestHttps(isRequestHttps())
- .readOnly(true)
- .loadTopicsInBundle(false).build();
-
- return nsService.getWebServiceUrlAsync(nsBundle,
options).thenApply(Optional::isPresent);
- }
-
- protected NamespaceBundle validateNamespaceBundleOwnership(NamespaceName
fqnn, BundlesData bundles,
- String bundleRange, boolean authoritative, boolean readOnly) {
- try {
- NamespaceBundle nsBundle = validateNamespaceBundleRange(fqnn,
bundles, bundleRange);
- validateBundleOwnership(nsBundle, authoritative, readOnly);
- return nsBundle;
- } catch (WebApplicationException wae) {
- throw wae;
- } catch (Exception e) {
- log.error("[{}] Failed to validate namespace bundle {}/{}",
clientAppId(),
- fqnn.toString(), bundleRange, e);
- throw new RestException(e);
- }
+ protected CompletableFuture<Boolean>
isBundleOwnedByAnyBroker(NamespaceName fqnn, String bundleRange) {
+ return validateNamespaceBundleRangeAsync(fqnn, bundleRange)
+ .thenCompose(nsBundle -> {
+ NamespaceService nsService =
pulsar().getNamespaceService();
+ if
(ExtensibleLoadManagerImpl.isLoadManagerExtensionEnabled(pulsar)) {
+ return nsService.checkOwnershipPresentAsync(nsBundle);
+ }
+ LookupOptions options = LookupOptions.builder()
+ .authoritative(false)
+ .requestHttps(isRequestHttps())
+ .readOnly(true)
+ .loadTopicsInBundle(false).build();
+ return nsService.getWebServiceUrlAsync(nsBundle,
options).thenApply(Optional::isPresent);
+ });
}
protected CompletableFuture<NamespaceBundle>
validateNamespaceBundleOwnershipAsync(
- NamespaceName fqnn, BundlesData bundles, String bundleRange,
+ NamespaceName fqnn, String bundleRange,
boolean authoritative, boolean readOnly) {
- NamespaceBundle nsBundle;
- try {
- nsBundle = validateNamespaceBundleRange(fqnn, bundles,
bundleRange);
- } catch (WebApplicationException wae) {
- return CompletableFuture.failedFuture(wae);
- }
- return validateBundleOwnershipAsync(nsBundle, authoritative, readOnly)
- .thenApply(__ -> nsBundle);
+ return validateNamespaceBundleRangeAsync(fqnn, bundleRange)
+ .thenCompose(nsBundle ->
validateBundleOwnershipAsync(nsBundle, authoritative, readOnly)
+ .thenApply(__ -> nsBundle));
}
public void validateBundleOwnership(NamespaceBundle bundle, boolean
authoritative, boolean readOnly)
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/NamespacesTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/NamespacesTest.java
index d1b16230e3e..706ac5c3d6d 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/NamespacesTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/NamespacesTest.java
@@ -96,6 +96,7 @@ import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.common.api.proto.CommandSubscribe;
+import org.apache.pulsar.common.naming.Constants;
import org.apache.pulsar.common.naming.NamespaceBundle;
import org.apache.pulsar.common.naming.NamespaceBundles;
import org.apache.pulsar.common.naming.NamespaceName;
@@ -2520,4 +2521,124 @@ public class NamespacesTest extends
MockedPulsarServiceBaseTest {
assertEquals(namespacesResp, namespacesWithFullPath);
}
+ @Test
+ public void testBundleValidationWithNonExistentNamespace() throws
Exception {
+ String nonExistentNs = "non-existent-namespace";
+ String bundleRange = "0x00000000_0x80000000";
+
+ // Test unload on non-existent namespace - should return 404
+ // The error is thrown by validateGlobalNamespaceOwnershipAsync before
reaching bundle validation
+ AsyncResponse unloadResponse = mock(AsyncResponse.class);
+ namespaces.unloadNamespaceBundle(unloadResponse, testTenant,
Constants.GLOBAL_CLUSTER, nonExistentNs,
+ bundleRange, false, null);
+ ArgumentCaptor<RestException> unloadCaptor =
ArgumentCaptor.forClass(RestException.class);
+ verify(unloadResponse,
timeout(5000).times(1)).resume(unloadCaptor.capture());
+ assertEquals(unloadCaptor.getValue().getResponse().getStatus(),
+ Response.Status.NOT_FOUND.getStatusCode(),
+ "Non-existent namespace should return 404");
+
+ // Test split on non-existent namespace - should return 404
+ AsyncResponse splitResponse = mock(AsyncResponse.class);
+ namespaces.splitNamespaceBundle(splitResponse, testTenant,
Constants.GLOBAL_CLUSTER, nonExistentNs,
+ bundleRange, false, true, null, null);
+ ArgumentCaptor<RestException> splitCaptor =
ArgumentCaptor.forClass(RestException.class);
+ verify(splitResponse,
timeout(5000).times(1)).resume(splitCaptor.capture());
+ assertEquals(splitCaptor.getValue().getResponse().getStatus(),
+ Response.Status.NOT_FOUND.getStatusCode(),
+ "Non-existent namespace should return 404");
+
+ // Test clear backlog on non-existent namespace - should return 404
+ AsyncResponse clearResponse = mock(AsyncResponse.class);
+ namespaces.clearNamespaceBundleBacklog(clearResponse, testTenant,
Constants.GLOBAL_CLUSTER, nonExistentNs,
+ bundleRange, false);
+ ArgumentCaptor<RestException> clearCaptor =
ArgumentCaptor.forClass(RestException.class);
+ verify(clearResponse,
timeout(5000).times(1)).resume(clearCaptor.capture());
+ assertEquals(clearCaptor.getValue().getResponse().getStatus(),
+ Response.Status.NOT_FOUND.getStatusCode(),
+ "Non-existent namespace should return 404");
+
+ // Test clear backlog for subscription on non-existent namespace -
should return 404
+ AsyncResponse clearSubResponse = mock(AsyncResponse.class);
+
namespaces.clearNamespaceBundleBacklogForSubscription(clearSubResponse,
testTenant, Constants.GLOBAL_CLUSTER,
+ nonExistentNs, bundleRange, "test-sub", false);
+ ArgumentCaptor<RestException> clearSubCaptor =
ArgumentCaptor.forClass(RestException.class);
+ verify(clearSubResponse,
timeout(5000).times(1)).resume(clearSubCaptor.capture());
+ assertEquals(clearSubCaptor.getValue().getResponse().getStatus(),
+ Response.Status.NOT_FOUND.getStatusCode(),
+ "Non-existent namespace should return 404");
+ }
+
+ @Test
+ public void testBundleValidationAfterSplit() throws Exception {
+ URL localWebServiceUrl = new URL(pulsar.getSafeWebServiceAddress());
+ String bundledNsLocal = "test-bundle-validation-after-split";
+ List<String> boundaries = List.of("0x00000000", "0xffffffff");
+ BundlesData bundleData = BundlesData.builder()
+ .boundaries(boundaries)
+ .numBundles(boundaries.size() - 1)
+ .build();
+ createBundledTestNamespaces(this.testTenant, this.testLocalCluster,
bundledNsLocal, bundleData);
+ final NamespaceName testNs = NamespaceName.get(this.testTenant,
this.testLocalCluster, bundledNsLocal);
+
+ OwnershipCache mockOwnershipCache =
spy(pulsar.getNamespaceService().getOwnershipCache());
+
doReturn(CompletableFuture.completedFuture(null)).when(mockOwnershipCache)
+ .disableOwnership(any(NamespaceBundle.class));
+ Field ownership =
NamespaceService.class.getDeclaredField("ownershipCache");
+ ownership.setAccessible(true);
+ ownership.set(pulsar.getNamespaceService(), mockOwnershipCache);
+ mockWebUrl(localWebServiceUrl, testNs);
+
+ // Split the bundle
+ AsyncResponse splitResponse = mock(AsyncResponse.class);
+ namespaces.splitNamespaceBundle(splitResponse, testTenant,
testLocalCluster, bundledNsLocal,
+ "0x00000000_0xffffffff",
+ false, true, null, null);
+ ArgumentCaptor<Response> splitCaptor =
ArgumentCaptor.forClass(Response.class);
+ verify(splitResponse,
timeout(5000).times(1)).resume(splitCaptor.capture());
+
+ // Verify split was successful
+ BundlesData bundlesDataAfterSplit = (BundlesData) asyncRequests(ctx ->
namespaces.getBundlesData(ctx,
+ testTenant, testLocalCluster, bundledNsLocal));
+ assertNotNull(bundlesDataAfterSplit);
+ assertEquals(bundlesDataAfterSplit.getBoundaries().size(), 3);
+ assertEquals(bundlesDataAfterSplit.getBoundaries().get(0),
"0x00000000");
+ assertEquals(bundlesDataAfterSplit.getBoundaries().get(1),
"0x7fffffff");
+ assertEquals(bundlesDataAfterSplit.getBoundaries().get(2),
"0xffffffff");
+
+ // Now test bundle validation with the old (invalid) bundle range -
should return 412
+ AsyncResponse unloadOldBundleResponse = mock(AsyncResponse.class);
+ namespaces.unloadNamespaceBundle(unloadOldBundleResponse, testTenant,
testLocalCluster, bundledNsLocal,
+ "0x00000000_0xffffffff", false, null);
+ ArgumentCaptor<RestException> unloadOldCaptor =
ArgumentCaptor.forClass(RestException.class);
+ verify(unloadOldBundleResponse,
timeout(5000).times(1)).resume(unloadOldCaptor.capture());
+ assertEquals(unloadOldCaptor.getValue().getResponse().getStatus(),
+ Response.Status.PRECONDITION_FAILED.getStatusCode(),
+ "Old bundle range after split should return 412");
+
+ // Test bundle validation with new valid bundle ranges - should succeed
+ doReturn(true).when(nsSvc)
+ .isServiceUnitOwned(Mockito.argThat(bundle ->
bundle.getNamespaceObject().equals(testNs)));
+ doReturn(CompletableFuture.completedFuture(null)).when(nsSvc)
+ .unloadNamespaceBundle(any(NamespaceBundle.class));
+
+ AsyncResponse unloadNewBundle1Response = mock(AsyncResponse.class);
+ namespaces.unloadNamespaceBundle(unloadNewBundle1Response, testTenant,
testLocalCluster, bundledNsLocal,
+ "0x00000000_0x7fffffff", false, null);
+ ArgumentCaptor<Response> newBundle1Captor =
ArgumentCaptor.forClass(Response.class);
+ verify(unloadNewBundle1Response,
timeout(5000).times(1)).resume(newBundle1Captor.capture());
+ assertEquals(newBundle1Captor.getValue().getStatus(),
Response.Status.NO_CONTENT.getStatusCode(),
+ "New bundle range should be valid");
+
+ AsyncResponse unloadNewBundle2Response = mock(AsyncResponse.class);
+ namespaces.unloadNamespaceBundle(unloadNewBundle2Response, testTenant,
testLocalCluster, bundledNsLocal,
+ "0x7fffffff_0xffffffff", false, null);
+ ArgumentCaptor<Response> newBundle2Captor =
ArgumentCaptor.forClass(Response.class);
+ verify(unloadNewBundle2Response,
timeout(5000).times(1)).resume(newBundle2Captor.capture());
+ assertEquals(newBundle2Captor.getValue().getStatus(),
Response.Status.NO_CONTENT.getStatusCode(),
+ "New bundle range should be valid");
+
+ // cleanup
+ resetBroker();
+ }
+
}