This is an automated email from the ASF dual-hosted git repository.

lhotari pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 893779df2cd [improve][broker] optimize namespaceBundle validation to 
fix single-thread 100% CPU during unloading entire namespaces. (#25626)
893779df2cd is described below

commit 893779df2cd97868efe01cda22e8b2e18ea4c7a9
Author: zzb <[email protected]>
AuthorDate: Mon May 11 20:13:52 2026 +0800

    [improve][broker] optimize namespaceBundle validation to fix single-thread 
100% CPU during unloading entire namespaces. (#25626)
    
    Co-authored-by: zhaizhibo <[email protected]>
---
 .../pulsar/broker/admin/impl/NamespacesBase.java   |  37 +++----
 .../broker/admin/impl/ResourceQuotasBase.java      |   3 +-
 .../broker/admin/v2/NonPersistentTopics.java       |   5 +-
 .../pulsar/broker/web/PulsarWebResource.java       |  96 ++++++++---------
 .../apache/pulsar/broker/admin/NamespacesTest.java | 120 +++++++++++++++++++++
 5 files changed, 181 insertions(+), 80 deletions(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
index c94c59df2bf..4820a61eebe 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
@@ -612,8 +612,7 @@ public abstract class NamespacesBase extends AdminResource {
                     }
                     return future
                             .thenCompose(__ ->
-                                    
validateNamespaceBundleOwnershipAsync(namespaceName, policies.bundles,
-                                            bundleRange,
+                                    
validateNamespaceBundleOwnershipAsync(namespaceName, bundleRange,
                                             authoritative, true))
                             .thenCompose(bundle -> {
                                 return 
pulsar().getNamespaceService().getListOfPersistentTopics(namespaceName)
@@ -1528,9 +1527,8 @@ public abstract class NamespacesBase extends 
AdminResource {
                     }
                 })
                 .thenCompose(__ -> validatePoliciesReadOnlyAccessAsync())
-                .thenCompose(__ -> getNamespacePoliciesAsync(namespaceName))
-                .thenCompose(policies ->
-                     isBundleOwnedByAnyBroker(namespaceName, policies.bundles, 
bundleRange)
+                .thenCompose(__ ->
+                     isBundleOwnedByAnyBroker(namespaceName, bundleRange)
                         .thenCompose(flag -> {
                             if (!flag) {
                                 log.info()
@@ -1540,7 +1538,7 @@ public abstract class NamespacesBase extends 
AdminResource {
                                 return CompletableFuture.completedFuture(null);
                             }
                             Optional<String> destinationBrokerOpt = 
Optional.ofNullable(destinationBroker);
-                            return 
validateNamespaceBundleOwnershipAsync(namespaceName, policies.bundles, 
bundleRange,
+                            return 
validateNamespaceBundleOwnershipAsync(namespaceName, bundleRange,
                                     authoritative, true)
                                     .thenCompose(nsBundle -> 
pulsar().getNamespaceService()
                                             .unloadNamespaceBundle(nsBundle, 
destinationBrokerOpt));
@@ -1583,10 +1581,8 @@ public abstract class NamespacesBase extends 
AdminResource {
                 .thenCompose(__ -> validatePoliciesReadOnlyAccessAsync())
                 .thenCompose(__ -> getBundleRangeAsync(bundleName))
                 .thenCompose(bundleRange -> {
-                    return getNamespacePoliciesAsync(namespaceName)
-                            .thenCompose(policies ->
-                                    
validateNamespaceBundleOwnershipAsync(namespaceName, policies.bundles, 
bundleRange,
-                                        authoritative, false))
+                    return 
validateNamespaceBundleOwnershipAsync(namespaceName, bundleRange,
+                                        authoritative, false)
                             .thenCompose(nsBundle -> 
pulsar().getNamespaceService().splitAndOwnBundle(nsBundle, unload,
                                     pulsar().getNamespaceService()
                                             
.getNamespaceBundleSplitAlgorithmByName(splitAlgorithmName),
@@ -1602,9 +1598,8 @@ public abstract class NamespacesBase extends 
AdminResource {
                     .log("Getting hash position for topic list , bundle");
                 return validateNamespacePolicyOperationAsync(namespaceName, 
PolicyName.PERSISTENCE,
                         PolicyOperation.READ)
-                .thenCompose(__ -> getNamespacePoliciesAsync(namespaceName))
-                .thenCompose(policies -> {
-                    return 
validateNamespaceBundleOwnershipAsync(namespaceName, policies.bundles, 
bundleRange,
+                .thenCompose(__ -> {
+                    return 
validateNamespaceBundleOwnershipAsync(namespaceName, bundleRange,
                             false, true)
                             .thenCompose(nsBundle ->
                                     
pulsar().getNamespaceService().getOwnedTopicListForNamespaceBundle(nsBundle))
@@ -1972,11 +1967,10 @@ public abstract class NamespacesBase extends 
AdminResource {
                     // check cluster ownership for a given global namespace: 
redirect if peer-cluster owns it
                     return 
validateGlobalNamespaceOwnershipAsync(namespaceName);
                 })
-                .thenCompose(__ -> getNamespacePoliciesAsync(namespaceName))
-                .thenCompose(policies ->
+                .thenCompose(__ ->
                         // Allow acquiring ownership for an unassigned bundle 
so backlog can be cleared
                         // even if not loaded.
-                        validateNamespaceBundleOwnershipAsync(namespaceName, 
policies.bundles, bundleRange,
+                        validateNamespaceBundleOwnershipAsync(namespaceName, 
bundleRange,
                                 authoritative, false))
                 .thenCompose(bundle -> clearBacklogAsync(bundle, null))
                 .thenRun(() -> log.info()
@@ -2028,11 +2022,10 @@ public abstract class NamespacesBase extends 
AdminResource {
                     // check cluster ownership for a given global namespace: 
redirect if peer-cluster owns it
                     return 
validateGlobalNamespaceOwnershipAsync(namespaceName);
                 })
-                .thenCompose(__ -> getNamespacePoliciesAsync(namespaceName))
-                .thenCompose(policies ->
+                .thenCompose(__ ->
                         // Allow acquiring ownership for an unassigned bundle 
so backlog can be cleared
                         // even if not loaded.
-                        validateNamespaceBundleOwnershipAsync(namespaceName, 
policies.bundles, bundleRange,
+                        validateNamespaceBundleOwnershipAsync(namespaceName, 
bundleRange,
                                 authoritative, false))
                 .thenCompose(bundle -> clearBacklogAsync(bundle, subscription))
                 .thenRun(() -> log.info()
@@ -2080,10 +2073,8 @@ public abstract class NamespacesBase extends 
AdminResource {
 
         return validateNamespaceOperationAsync(namespaceName, 
NamespaceOperation.UNSUBSCRIBE)
                 .thenCompose(__ -> 
validateGlobalNamespaceOwnershipAsync(namespaceName))
-                .thenCompose(__ -> getNamespacePoliciesAsync(namespaceName))
-                .thenCompose(policies ->
-                        validateNamespaceBundleOwnershipAsync(namespaceName, 
policies.bundles, bundleRange,
-                                authoritative, false))
+                .thenCompose(__ -> 
validateNamespaceBundleOwnershipAsync(namespaceName, bundleRange,
+                        authoritative, false))
                 .thenCompose(bundle -> unsubscribeAsync(bundle, subscription))
                 .thenRun(() -> log.info()
                         .attr("subscription", subscription)
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/ResourceQuotasBase.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/ResourceQuotasBase.java
index 2b9c13596c1..2c448a5e9d5 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/ResourceQuotasBase.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/ResourceQuotasBase.java
@@ -69,7 +69,6 @@ public abstract class ResourceQuotasBase extends 
NamespacesBase {
                     }
                 });
         return ret
-                .thenCompose(__ -> getNamespacePoliciesAsync(namespaceName))
-                .thenApply(policies -> 
validateNamespaceBundleRange(namespaceName, policies.bundles, bundleRange));
+                .thenCompose(__ -> 
validateNamespaceBundleRangeAsync(namespaceName, bundleRange));
     }
 }
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/NonPersistentTopics.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/NonPersistentTopics.java
index 2f33c15d77f..52c4addd991 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/NonPersistentTopics.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/NonPersistentTopics.java
@@ -478,12 +478,11 @@ public class NonPersistentTopics extends PersistentTopics 
{
                     .attr("bundleRange", bundleRange)
                     .log("list of topics on namespace bundle");
                 validateNamespaceOperation(namespaceName, 
NamespaceOperation.GET_BUNDLE);
-        Policies policies = getNamespacePolicies(namespaceName);
 
         // check cluster ownership for a given global namespace: redirect if 
peer-cluster owns it
         validateGlobalNamespaceOwnership(namespaceName);
 
-        isBundleOwnedByAnyBroker(namespaceName, policies.bundles, 
bundleRange).thenAccept(flag -> {
+        isBundleOwnedByAnyBroker(namespaceName, bundleRange).thenAccept(flag 
-> {
             if (!flag) {
                 log.info()
                         .attr("namespace", namespaceName)
@@ -491,7 +490,7 @@ public class NonPersistentTopics extends PersistentTopics {
                         .log("Namespace bundle is not owned by any broker");
                 asyncResponse.resume(Response.noContent().build());
             } else {
-                validateNamespaceBundleOwnershipAsync(namespaceName, 
policies.bundles, bundleRange, true, true)
+                validateNamespaceBundleOwnershipAsync(namespaceName, 
bundleRange, true, true)
                         .thenAccept(nsBundle -> {
                             final var bundleTopics = 
pulsar().getBrokerService().getMultiLayerTopicsMap()
                                     .get(namespaceName.toString());
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java
index 109d44ad851..fbb65de9fe7 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java
@@ -79,7 +79,6 @@ import org.apache.pulsar.common.naming.NamespaceBundle;
 import org.apache.pulsar.common.naming.NamespaceBundles;
 import org.apache.pulsar.common.naming.NamespaceName;
 import org.apache.pulsar.common.naming.TopicName;
-import org.apache.pulsar.common.policies.data.BundlesData;
 import org.apache.pulsar.common.policies.data.ClusterData;
 import org.apache.pulsar.common.policies.data.ClusterDataImpl;
 import org.apache.pulsar.common.policies.data.NamespaceOperation;
@@ -585,7 +584,7 @@ public abstract class PulsarWebResource {
         return !pulsarService.getConfiguration().isAuthorizationEnabled();
     }
 
-    protected NamespaceBundle validateNamespaceBundleRange(NamespaceName fqnn, 
BundlesData bundles,
+    protected CompletableFuture<NamespaceBundle> 
validateNamespaceBundleRangeAsync(NamespaceName fqnn,
             String bundleRange) {
         try {
             checkArgument(bundleRange.contains("_"), "Invalid bundle range: " 
+ bundleRange);
@@ -596,77 +595,70 @@ public abstract class PulsarWebResource {
                     (upperEndpoint.equals(NamespaceBundles.FULL_UPPER_BOUND)) 
? BoundType.CLOSED : BoundType.OPEN);
             NamespaceBundle nsBundle = 
pulsar().getNamespaceService().getNamespaceBundleFactory().getBundle(fqnn,
                     hashRange);
-            NamespaceBundles nsBundles = 
pulsar().getNamespaceService().getNamespaceBundleFactory().getBundles(fqnn,
-                    bundles);
-            nsBundles.validateBundle(nsBundle);
-            return nsBundle;
+            return 
pulsar().getNamespaceService().getNamespaceBundleFactory().getBundlesAsync(fqnn)
+                    .thenApply(nsBundles -> {
+                        try {
+                            nsBundles.validateBundle(nsBundle);
+                            return nsBundle;
+                        } catch (IllegalArgumentException e) {
+                            log.error()
+                                    .attr("namespace", fqnn.toString())
+                                    .attr("bundleRange", bundleRange)
+                                    .exceptionMessage(e)
+                                    .log("Invalid bundle range");
+                            throw new 
RestException(Response.Status.PRECONDITION_FAILED, e.getMessage());
+                        } catch (Exception e) {
+                            log.error()
+                                    .attr("namespace", fqnn.toString())
+                                    .attr("bundleRange", bundleRange)
+                                    .exception(e)
+                                    .log("Failed to validate namespace 
bundle");
+                            throw new RestException(e);
+                        }
+                    });
         } catch (IllegalArgumentException e) {
             log.error()
                     .attr("namespace", fqnn.toString())
                     .attr("bundleRange", bundleRange)
                     .exceptionMessage(e)
                     .log("Invalid bundle range");
-            throw new RestException(Response.Status.PRECONDITION_FAILED, 
e.getMessage());
+            return CompletableFuture.failedFuture(
+                    new RestException(Response.Status.PRECONDITION_FAILED, 
e.getMessage()));
         } catch (Exception e) {
             log.error()
-                    .attr("bundle", fqnn.toString())
+                    .attr("namespace", fqnn.toString())
                     .attr("bundleRange", bundleRange)
                     .exception(e)
                     .log("Failed to validate namespace bundle");
-            throw new RestException(e);
+            return CompletableFuture.failedFuture(new RestException(e));
         }
     }
 
     /**
      * Checks whether a given bundle is currently loaded by any broker.
      */
-    protected CompletableFuture<Boolean> 
isBundleOwnedByAnyBroker(NamespaceName fqnn, BundlesData bundles,
-            String bundleRange) {
-        NamespaceBundle nsBundle = validateNamespaceBundleRange(fqnn, bundles, 
bundleRange);
-        NamespaceService nsService = pulsar().getNamespaceService();
-
-        if (ExtensibleLoadManagerImpl.isLoadManagerExtensionEnabled(pulsar)) {
-            return nsService.checkOwnershipPresentAsync(nsBundle);
-        }
-
-        LookupOptions options = LookupOptions.builder()
-                .authoritative(false)
-                .requestHttps(isRequestHttps())
-                .readOnly(true)
-                .loadTopicsInBundle(false).build();
-
-        return nsService.getWebServiceUrlAsync(nsBundle, 
options).thenApply(Optional::isPresent);
-    }
-
-    protected NamespaceBundle validateNamespaceBundleOwnership(NamespaceName 
fqnn, BundlesData bundles,
-            String bundleRange, boolean authoritative, boolean readOnly) {
-        try {
-            NamespaceBundle nsBundle = validateNamespaceBundleRange(fqnn, 
bundles, bundleRange);
-            validateBundleOwnership(nsBundle, authoritative, readOnly);
-            return nsBundle;
-        } catch (WebApplicationException wae) {
-            throw wae;
-        } catch (Exception e) {
-            log.error()
-                    .attr("bundle", fqnn.toString())
-                    .attr("bundleRange", bundleRange)
-                    .exception(e)
-                    .log("Failed to validate namespace bundle");
-            throw new RestException(e);
-        }
+    protected CompletableFuture<Boolean> 
isBundleOwnedByAnyBroker(NamespaceName fqnn, String bundleRange) {
+        return validateNamespaceBundleRangeAsync(fqnn, bundleRange)
+                .thenCompose(nsBundle -> {
+                    NamespaceService nsService = 
pulsar().getNamespaceService();
+                    if 
(ExtensibleLoadManagerImpl.isLoadManagerExtensionEnabled(pulsar)) {
+                       return nsService.checkOwnershipPresentAsync(nsBundle);
+                    }
+                    LookupOptions options = LookupOptions.builder()
+                        .authoritative(false)
+                        .requestHttps(isRequestHttps())
+                        .readOnly(true)
+                        .loadTopicsInBundle(false).build();
+                    return nsService.getWebServiceUrlAsync(nsBundle, 
options).thenApply(Optional::isPresent);
+                });
     }
 
     protected CompletableFuture<NamespaceBundle> 
validateNamespaceBundleOwnershipAsync(
-            NamespaceName fqnn, BundlesData bundles, String bundleRange,
+            NamespaceName fqnn, String bundleRange,
             boolean authoritative, boolean readOnly) {
-        NamespaceBundle nsBundle;
-        try {
-            nsBundle = validateNamespaceBundleRange(fqnn, bundles, 
bundleRange);
-        } catch (WebApplicationException wae) {
-            return CompletableFuture.failedFuture(wae);
-        }
-        return validateBundleOwnershipAsync(nsBundle, authoritative, readOnly)
-                .thenApply(__ -> nsBundle);
+        return validateNamespaceBundleRangeAsync(fqnn, bundleRange)
+                .thenCompose(nsBundle -> 
validateBundleOwnershipAsync(nsBundle, authoritative, readOnly)
+                        .thenApply(__ -> nsBundle));
     }
 
     public void validateBundleOwnership(NamespaceBundle bundle, boolean 
authoritative, boolean readOnly)
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/NamespacesTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/NamespacesTest.java
index 4ac64405c60..e3f5dba1e6b 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/NamespacesTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/NamespacesTest.java
@@ -2571,4 +2571,124 @@ public class NamespacesTest extends 
MockedPulsarServiceBaseTest {
         assertEquals(namespacesResp, namespacesWithFullPath);
     }
 
+    @Test
+    public void testBundleValidationWithNonExistentNamespace() throws 
Exception {
+        String nonExistentNs = "non-existent-namespace";
+        String bundleRange = "0x00000000_0x80000000";
+
+        // Test unload on non-existent namespace - should return 404
+        // The error is thrown by validateGlobalNamespaceOwnershipAsync before 
reaching bundle validation
+        AsyncResponse unloadResponse = mock(AsyncResponse.class);
+        namespaces.unloadNamespaceBundle(unloadResponse, testTenant, 
nonExistentNs,
+                bundleRange, false, null);
+        ArgumentCaptor<RestException> unloadCaptor = 
ArgumentCaptor.forClass(RestException.class);
+        verify(unloadResponse, 
timeout(5000).times(1)).resume(unloadCaptor.capture());
+        assertEquals(unloadCaptor.getValue().getResponse().getStatus(),
+                Response.Status.NOT_FOUND.getStatusCode(),
+                "Non-existent namespace should return 404");
+
+        // Test split on non-existent namespace - should return 404
+        AsyncResponse splitResponse = mock(AsyncResponse.class);
+        namespaces.splitNamespaceBundle(splitResponse, testTenant, 
nonExistentNs,
+                bundleRange, false, true, null, null);
+        ArgumentCaptor<RestException> splitCaptor = 
ArgumentCaptor.forClass(RestException.class);
+        verify(splitResponse, 
timeout(5000).times(1)).resume(splitCaptor.capture());
+        assertEquals(splitCaptor.getValue().getResponse().getStatus(),
+                Response.Status.NOT_FOUND.getStatusCode(),
+                "Non-existent namespace should return 404");
+
+        // Test clear backlog on non-existent namespace - should return 404
+        AsyncResponse clearResponse = mock(AsyncResponse.class);
+        namespaces.clearNamespaceBundleBacklog(clearResponse, testTenant, 
nonExistentNs,
+                bundleRange, false);
+        ArgumentCaptor<RestException> clearCaptor = 
ArgumentCaptor.forClass(RestException.class);
+        verify(clearResponse, 
timeout(5000).times(1)).resume(clearCaptor.capture());
+        assertEquals(clearCaptor.getValue().getResponse().getStatus(),
+                Response.Status.NOT_FOUND.getStatusCode(),
+                "Non-existent namespace should return 404");
+
+        // Test clear backlog for subscription on non-existent namespace - 
should return 404
+        AsyncResponse clearSubResponse = mock(AsyncResponse.class);
+        
namespaces.clearNamespaceBundleBacklogForSubscription(clearSubResponse, 
testTenant, nonExistentNs,
+                bundleRange, "test-sub", false);
+        ArgumentCaptor<RestException> clearSubCaptor = 
ArgumentCaptor.forClass(RestException.class);
+        verify(clearSubResponse, 
timeout(5000).times(1)).resume(clearSubCaptor.capture());
+        assertEquals(clearSubCaptor.getValue().getResponse().getStatus(),
+                Response.Status.NOT_FOUND.getStatusCode(),
+                "Non-existent namespace should return 404");
+    }
+
+    @Test
+    public void testBundleValidationAfterSplit() throws Exception {
+        URL localWebServiceUrl = new URL(pulsar.getSafeWebServiceAddress());
+        String bundledNsLocal = "test-bundle-validation-after-split";
+        List<String> boundaries = List.of("0x00000000", "0xffffffff");
+        BundlesData bundleData = BundlesData.builder()
+                .boundaries(boundaries)
+                .numBundles(boundaries.size() - 1)
+                .build();
+        createBundledTestNamespaces(this.testTenant, bundledNsLocal, 
bundleData);
+        final NamespaceName testNs = NamespaceName.get(this.testTenant, 
bundledNsLocal);
+
+        OwnershipCache mockOwnershipCache = 
spy(pulsar.getNamespaceService().getOwnershipCache());
+        
doReturn(CompletableFuture.completedFuture(null)).when(mockOwnershipCache)
+                .disableOwnership(any(NamespaceBundle.class));
+        Field ownership = 
NamespaceService.class.getDeclaredField("ownershipCache");
+        ownership.setAccessible(true);
+        ownership.set(pulsar.getNamespaceService(), mockOwnershipCache);
+        mockWebUrl(localWebServiceUrl, testNs);
+
+        // Split the bundle
+        AsyncResponse splitResponse = mock(AsyncResponse.class);
+        namespaces.splitNamespaceBundle(splitResponse, testTenant, 
bundledNsLocal,
+                "0x00000000_0xffffffff",
+                false, true, null, null);
+        ArgumentCaptor<Response> splitCaptor = 
ArgumentCaptor.forClass(Response.class);
+        verify(splitResponse, 
timeout(5000).times(1)).resume(splitCaptor.capture());
+
+        // Verify split was successful
+        BundlesData bundlesDataAfterSplit = (BundlesData) asyncRequests(ctx -> 
namespaces.getBundlesData(ctx,
+                testTenant, bundledNsLocal));
+        assertNotNull(bundlesDataAfterSplit);
+        assertEquals(bundlesDataAfterSplit.getBoundaries().size(), 3);
+        assertEquals(bundlesDataAfterSplit.getBoundaries().get(0), 
"0x00000000");
+        assertEquals(bundlesDataAfterSplit.getBoundaries().get(1), 
"0x7fffffff");
+        assertEquals(bundlesDataAfterSplit.getBoundaries().get(2), 
"0xffffffff");
+
+        // Now test bundle validation with the old (invalid) bundle range - 
should return 412
+        AsyncResponse unloadOldBundleResponse = mock(AsyncResponse.class);
+        namespaces.unloadNamespaceBundle(unloadOldBundleResponse, testTenant, 
bundledNsLocal,
+                "0x00000000_0xffffffff", false, null);
+        ArgumentCaptor<RestException> unloadOldCaptor = 
ArgumentCaptor.forClass(RestException.class);
+        verify(unloadOldBundleResponse, 
timeout(5000).times(1)).resume(unloadOldCaptor.capture());
+        assertEquals(unloadOldCaptor.getValue().getResponse().getStatus(),
+                Response.Status.PRECONDITION_FAILED.getStatusCode(),
+                "Old bundle range after split should return 412");
+
+        // Test bundle validation with new valid bundle ranges - should succeed
+        doReturn(true).when(nsSvc)
+                .isServiceUnitOwned(Mockito.argThat(bundle -> 
bundle.getNamespaceObject().equals(testNs)));
+        doReturn(CompletableFuture.completedFuture(null)).when(nsSvc)
+                .unloadNamespaceBundle(any(NamespaceBundle.class));
+
+        AsyncResponse unloadNewBundle1Response = mock(AsyncResponse.class);
+        namespaces.unloadNamespaceBundle(unloadNewBundle1Response, testTenant, 
bundledNsLocal,
+                "0x00000000_0x7fffffff", false, null);
+        ArgumentCaptor<Response> newBundle1Captor = 
ArgumentCaptor.forClass(Response.class);
+        verify(unloadNewBundle1Response, 
timeout(5000).times(1)).resume(newBundle1Captor.capture());
+        assertEquals(newBundle1Captor.getValue().getStatus(), 
Response.Status.NO_CONTENT.getStatusCode(),
+                "New bundle range should be valid");
+
+        AsyncResponse unloadNewBundle2Response = mock(AsyncResponse.class);
+        namespaces.unloadNamespaceBundle(unloadNewBundle2Response, testTenant, 
bundledNsLocal,
+                "0x7fffffff_0xffffffff", false, null);
+        ArgumentCaptor<Response> newBundle2Captor = 
ArgumentCaptor.forClass(Response.class);
+        verify(unloadNewBundle2Response, 
timeout(5000).times(1)).resume(newBundle2Captor.capture());
+        assertEquals(newBundle2Captor.getValue().getStatus(), 
Response.Status.NO_CONTENT.getStatusCode(),
+                "New bundle range should be valid");
+
+        // cleanup
+        resetBroker();
+    }
+
 }

Reply via email to