This is an automated email from the ASF dual-hosted git repository.
Technoboy- pushed a commit to branch branch-4.2
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-4.2 by this push:
new 34354be6a68 [improve][broker] optimize namespaceBundle validation to
fix single-thread 100% CPU during unloading entire namespaces (#25626)
34354be6a68 is described below
commit 34354be6a6856b6bd3961f849eefd5d7b40844fe
Author: Jiwei Guo <[email protected]>
AuthorDate: Tue May 12 19:31:40 2026 +0800
[improve][broker] optimize namespaceBundle validation to fix single-thread
100% CPU during unloading entire namespaces (#25626)
---
.../pulsar/broker/admin/impl/NamespacesBase.java | 36 +++----
.../broker/admin/impl/ResourceQuotasBase.java | 3 +-
.../broker/admin/v2/NonPersistentTopics.java | 5 +-
.../pulsar/broker/web/PulsarWebResource.java | 85 +++++++--------
.../apache/pulsar/broker/admin/NamespacesTest.java | 120 +++++++++++++++++++++
5 files changed, 174 insertions(+), 75 deletions(-)
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
index 0b8a0591ae1..1e99aa0716b 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
@@ -596,8 +596,7 @@ public abstract class NamespacesBase extends AdminResource {
}
return future
.thenCompose(__ ->
-
validateNamespaceBundleOwnershipAsync(namespaceName, policies.bundles,
- bundleRange,
+
validateNamespaceBundleOwnershipAsync(namespaceName, bundleRange,
authoritative, true))
.thenCompose(bundle -> {
return
pulsar().getNamespaceService().getListOfPersistentTopics(namespaceName)
@@ -1459,9 +1458,8 @@ public abstract class NamespacesBase extends
AdminResource {
}
})
.thenCompose(__ -> validatePoliciesReadOnlyAccessAsync())
- .thenCompose(__ -> getNamespacePoliciesAsync(namespaceName))
- .thenCompose(policies ->
- isBundleOwnedByAnyBroker(namespaceName, policies.bundles,
bundleRange)
+ .thenCompose(__ ->
+ isBundleOwnedByAnyBroker(namespaceName, bundleRange)
.thenCompose(flag -> {
if (!flag) {
log.info("[{}] Namespace bundle is not owned
by any broker {}/{}", clientAppId(),
@@ -1469,7 +1467,7 @@ public abstract class NamespacesBase extends
AdminResource {
return CompletableFuture.completedFuture(null);
}
Optional<String> destinationBrokerOpt =
Optional.ofNullable(destinationBroker);
- return
validateNamespaceBundleOwnershipAsync(namespaceName, policies.bundles,
bundleRange,
+ return
validateNamespaceBundleOwnershipAsync(namespaceName, bundleRange,
authoritative, true)
.thenCompose(nsBundle ->
pulsar().getNamespaceService()
.unloadNamespaceBundle(nsBundle,
destinationBrokerOpt));
@@ -1509,10 +1507,8 @@ public abstract class NamespacesBase extends
AdminResource {
.thenCompose(__ -> validatePoliciesReadOnlyAccessAsync())
.thenCompose(__ -> getBundleRangeAsync(bundleName))
.thenCompose(bundleRange -> {
- return getNamespacePoliciesAsync(namespaceName)
- .thenCompose(policies ->
-
validateNamespaceBundleOwnershipAsync(namespaceName, policies.bundles,
bundleRange,
- authoritative, false))
+ return
validateNamespaceBundleOwnershipAsync(namespaceName, bundleRange,
+ authoritative, false)
.thenCompose(nsBundle ->
pulsar().getNamespaceService().splitAndOwnBundle(nsBundle, unload,
pulsar().getNamespaceService()
.getNamespaceBundleSplitAlgorithmByName(splitAlgorithmName),
@@ -1526,9 +1522,8 @@ public abstract class NamespacesBase extends
AdminResource {
log.debug("[{}] Getting hash position for topic list {}, bundle
{}", clientAppId(), topics, bundleRange);
}
return validateNamespacePolicyOperationAsync(namespaceName,
PolicyName.PERSISTENCE, PolicyOperation.READ)
- .thenCompose(__ -> getNamespacePoliciesAsync(namespaceName))
- .thenCompose(policies -> {
- return
validateNamespaceBundleOwnershipAsync(namespaceName, policies.bundles,
bundleRange,
+ .thenCompose(__ -> {
+ return
validateNamespaceBundleOwnershipAsync(namespaceName, bundleRange,
false, true)
.thenCompose(nsBundle ->
pulsar().getNamespaceService().getOwnedTopicListForNamespaceBundle(nsBundle))
@@ -1859,11 +1854,10 @@ public abstract class NamespacesBase extends
AdminResource {
// check cluster ownership for a given global namespace:
redirect if peer-cluster owns it
return
validateGlobalNamespaceOwnershipAsync(namespaceName);
})
- .thenCompose(__ -> getNamespacePoliciesAsync(namespaceName))
- .thenCompose(policies ->
+ .thenCompose(__ ->
// Allow acquiring ownership for an unassigned bundle
so backlog can be cleared
// even if not loaded.
- validateNamespaceBundleOwnershipAsync(namespaceName,
policies.bundles, bundleRange,
+ validateNamespaceBundleOwnershipAsync(namespaceName,
bundleRange,
authoritative, false))
.thenCompose(bundle -> clearBacklogAsync(bundle, null))
.thenRun(() -> log.info("[{}] Successfully cleared backlog on
namespace bundle {}/{}", clientAppId(),
@@ -1912,11 +1906,10 @@ public abstract class NamespacesBase extends
AdminResource {
// check cluster ownership for a given global namespace:
redirect if peer-cluster owns it
return
validateGlobalNamespaceOwnershipAsync(namespaceName);
})
- .thenCompose(__ -> getNamespacePoliciesAsync(namespaceName))
- .thenCompose(policies ->
+ .thenCompose(__ ->
// Allow acquiring ownership for an unassigned bundle
so backlog can be cleared
// even if not loaded.
- validateNamespaceBundleOwnershipAsync(namespaceName,
policies.bundles, bundleRange,
+ validateNamespaceBundleOwnershipAsync(namespaceName,
bundleRange,
authoritative, false))
.thenCompose(bundle -> clearBacklogAsync(bundle, subscription))
.thenRun(() -> log.info(
@@ -1960,9 +1953,8 @@ public abstract class NamespacesBase extends
AdminResource {
return validateNamespaceOperationAsync(namespaceName,
NamespaceOperation.UNSUBSCRIBE)
.thenCompose(__ ->
validateGlobalNamespaceOwnershipAsync(namespaceName))
- .thenCompose(__ -> getNamespacePoliciesAsync(namespaceName))
- .thenCompose(policies ->
- validateNamespaceBundleOwnershipAsync(namespaceName,
policies.bundles, bundleRange,
+ .thenCompose(__ ->
+ validateNamespaceBundleOwnershipAsync(namespaceName,
bundleRange,
authoritative, false))
.thenCompose(bundle -> unsubscribeAsync(bundle, subscription))
.thenRun(() -> log.info("[{}] Successfully unsubscribed {} on
namespace bundle {}/{}",
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/ResourceQuotasBase.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/ResourceQuotasBase.java
index a1a3ce65583..8e1956e6d0d 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/ResourceQuotasBase.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/ResourceQuotasBase.java
@@ -71,7 +71,6 @@ public abstract class ResourceQuotasBase extends
NamespacesBase {
}
});
return ret
- .thenCompose(__ -> getNamespacePoliciesAsync(namespaceName))
- .thenApply(policies ->
validateNamespaceBundleRange(namespaceName, policies.bundles, bundleRange));
+ .thenCompose(__ ->
validateNamespaceBundleRangeAsync(namespaceName, bundleRange));
}
}
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/NonPersistentTopics.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/NonPersistentTopics.java
index 16cbc7104d4..cf06b84c365 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/NonPersistentTopics.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/NonPersistentTopics.java
@@ -463,18 +463,17 @@ public class NonPersistentTopics extends PersistentTopics
{
}
validateNamespaceOperation(namespaceName,
NamespaceOperation.GET_BUNDLE);
- Policies policies = getNamespacePolicies(namespaceName);
// check cluster ownership for a given global namespace: redirect if
peer-cluster owns it
validateGlobalNamespaceOwnership(namespaceName);
- isBundleOwnedByAnyBroker(namespaceName, policies.bundles,
bundleRange).thenAccept(flag -> {
+ isBundleOwnedByAnyBroker(namespaceName, bundleRange).thenAccept(flag
-> {
if (!flag) {
log.info("[{}] Namespace bundle is not owned by any broker
{}/{}", clientAppId(), namespaceName,
bundleRange);
asyncResponse.resume(Response.noContent().build());
} else {
- validateNamespaceBundleOwnershipAsync(namespaceName,
policies.bundles, bundleRange, true, true)
+ validateNamespaceBundleOwnershipAsync(namespaceName,
bundleRange, true, true)
.thenAccept(nsBundle -> {
final var bundleTopics =
pulsar().getBrokerService().getMultiLayerTopicsMap()
.get(namespaceName.toString());
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java
index 3fe51df90f7..41c1e709aee 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java
@@ -79,7 +79,6 @@ import org.apache.pulsar.common.naming.NamespaceBundle;
import org.apache.pulsar.common.naming.NamespaceBundles;
import org.apache.pulsar.common.naming.NamespaceName;
import org.apache.pulsar.common.naming.TopicName;
-import org.apache.pulsar.common.policies.data.BundlesData;
import org.apache.pulsar.common.policies.data.ClusterData;
import org.apache.pulsar.common.policies.data.ClusterDataImpl;
import org.apache.pulsar.common.policies.data.NamespaceOperation;
@@ -563,7 +562,7 @@ public abstract class PulsarWebResource {
return !pulsarService.getConfiguration().isAuthorizationEnabled();
}
- protected NamespaceBundle validateNamespaceBundleRange(NamespaceName fqnn,
BundlesData bundles,
+ protected CompletableFuture<NamespaceBundle>
validateNamespaceBundleRangeAsync(NamespaceName fqnn,
String bundleRange) {
try {
checkArgument(bundleRange.contains("_"), "Invalid bundle range: "
+ bundleRange);
@@ -574,68 +573,58 @@ public abstract class PulsarWebResource {
(upperEndpoint.equals(NamespaceBundles.FULL_UPPER_BOUND))
? BoundType.CLOSED : BoundType.OPEN);
NamespaceBundle nsBundle =
pulsar().getNamespaceService().getNamespaceBundleFactory().getBundle(fqnn,
hashRange);
- NamespaceBundles nsBundles =
pulsar().getNamespaceService().getNamespaceBundleFactory().getBundles(fqnn,
- bundles);
- nsBundles.validateBundle(nsBundle);
- return nsBundle;
+ return
pulsar().getNamespaceService().getNamespaceBundleFactory().getBundlesAsync(fqnn)
+ .thenApply(nsBundles -> {
+ try {
+ nsBundles.validateBundle(nsBundle);
+ return nsBundle;
+ } catch (IllegalArgumentException e) {
+ log.error("Invalid bundle range, namespace={},
bundleRange={}",
+ fqnn.toString(), bundleRange, e);
+ throw new
RestException(Response.Status.PRECONDITION_FAILED, e.getMessage());
+ } catch (Exception e) {
+ log.error("Failed to validate namespace bundle,
namespace={}, bundleRange={}",
+ fqnn.toString(), bundleRange, e);
+ throw new RestException(e);
+ }
+ });
} catch (IllegalArgumentException e) {
log.error("[{}] Invalid bundle range {}/{}, {}", clientAppId(),
fqnn.toString(),
bundleRange, e.getMessage());
- throw new RestException(Response.Status.PRECONDITION_FAILED,
e.getMessage());
+ return CompletableFuture.failedFuture(
+ new RestException(Response.Status.PRECONDITION_FAILED,
e.getMessage()));
} catch (Exception e) {
log.error("[{}] Failed to validate namespace bundle {}/{}",
clientAppId(),
fqnn.toString(), bundleRange, e);
- throw new RestException(e);
+ return CompletableFuture.failedFuture(new RestException(e));
}
}
/**
* Checks whether a given bundle is currently loaded by any broker.
*/
- protected CompletableFuture<Boolean>
isBundleOwnedByAnyBroker(NamespaceName fqnn, BundlesData bundles,
- String bundleRange) {
- NamespaceBundle nsBundle = validateNamespaceBundleRange(fqnn, bundles,
bundleRange);
- NamespaceService nsService = pulsar().getNamespaceService();
-
- if (ExtensibleLoadManagerImpl.isLoadManagerExtensionEnabled(pulsar)) {
- return nsService.checkOwnershipPresentAsync(nsBundle);
- }
-
- LookupOptions options = LookupOptions.builder()
- .authoritative(false)
- .requestHttps(isRequestHttps())
- .readOnly(true)
- .loadTopicsInBundle(false).build();
-
- return nsService.getWebServiceUrlAsync(nsBundle,
options).thenApply(Optional::isPresent);
- }
-
- protected NamespaceBundle validateNamespaceBundleOwnership(NamespaceName
fqnn, BundlesData bundles,
- String bundleRange, boolean authoritative, boolean readOnly) {
- try {
- NamespaceBundle nsBundle = validateNamespaceBundleRange(fqnn,
bundles, bundleRange);
- validateBundleOwnership(nsBundle, authoritative, readOnly);
- return nsBundle;
- } catch (WebApplicationException wae) {
- throw wae;
- } catch (Exception e) {
- log.error("[{}] Failed to validate namespace bundle {}/{}",
clientAppId(),
- fqnn.toString(), bundleRange, e);
- throw new RestException(e);
- }
+ protected CompletableFuture<Boolean>
isBundleOwnedByAnyBroker(NamespaceName fqnn, String bundleRange) {
+ return validateNamespaceBundleRangeAsync(fqnn, bundleRange)
+ .thenCompose(nsBundle -> {
+ NamespaceService nsService =
pulsar().getNamespaceService();
+ if
(ExtensibleLoadManagerImpl.isLoadManagerExtensionEnabled(pulsar)) {
+ return nsService.checkOwnershipPresentAsync(nsBundle);
+ }
+ LookupOptions options = LookupOptions.builder()
+ .authoritative(false)
+ .requestHttps(isRequestHttps())
+ .readOnly(true)
+ .loadTopicsInBundle(false).build();
+ return nsService.getWebServiceUrlAsync(nsBundle,
options).thenApply(Optional::isPresent);
+ });
}
protected CompletableFuture<NamespaceBundle>
validateNamespaceBundleOwnershipAsync(
- NamespaceName fqnn, BundlesData bundles, String bundleRange,
+ NamespaceName fqnn, String bundleRange,
boolean authoritative, boolean readOnly) {
- NamespaceBundle nsBundle;
- try {
- nsBundle = validateNamespaceBundleRange(fqnn, bundles,
bundleRange);
- } catch (WebApplicationException wae) {
- return CompletableFuture.failedFuture(wae);
- }
- return validateBundleOwnershipAsync(nsBundle, authoritative, readOnly)
- .thenApply(__ -> nsBundle);
+ return validateNamespaceBundleRangeAsync(fqnn, bundleRange)
+ .thenCompose(nsBundle ->
validateBundleOwnershipAsync(nsBundle, authoritative, readOnly)
+ .thenApply(__ -> nsBundle));
}
public void validateBundleOwnership(NamespaceBundle bundle, boolean
authoritative, boolean readOnly)
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/NamespacesTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/NamespacesTest.java
index c40e8a84c5e..6ff2d8a8216 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/NamespacesTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/NamespacesTest.java
@@ -2559,4 +2559,124 @@ public class NamespacesTest extends
MockedPulsarServiceBaseTest {
assertEquals(namespacesResp, namespacesWithFullPath);
}
+ @Test
+ public void testBundleValidationWithNonExistentNamespace() throws
Exception {
+ String nonExistentNs = "non-existent-namespace";
+ String bundleRange = "0x00000000_0x80000000";
+
+ // Test unload on non-existent namespace - should return 404
+ // The error is thrown by validateGlobalNamespaceOwnershipAsync before
reaching bundle validation
+ AsyncResponse unloadResponse = mock(AsyncResponse.class);
+ namespaces.unloadNamespaceBundle(unloadResponse, testTenant,
nonExistentNs,
+ bundleRange, false, null);
+ ArgumentCaptor<RestException> unloadCaptor =
ArgumentCaptor.forClass(RestException.class);
+ verify(unloadResponse,
timeout(5000).times(1)).resume(unloadCaptor.capture());
+ assertEquals(unloadCaptor.getValue().getResponse().getStatus(),
+ Response.Status.NOT_FOUND.getStatusCode(),
+ "Non-existent namespace should return 404");
+
+ // Test split on non-existent namespace - should return 404
+ AsyncResponse splitResponse = mock(AsyncResponse.class);
+ namespaces.splitNamespaceBundle(splitResponse, testTenant,
nonExistentNs,
+ bundleRange, false, true, null, null);
+ ArgumentCaptor<RestException> splitCaptor =
ArgumentCaptor.forClass(RestException.class);
+ verify(splitResponse,
timeout(5000).times(1)).resume(splitCaptor.capture());
+ assertEquals(splitCaptor.getValue().getResponse().getStatus(),
+ Response.Status.NOT_FOUND.getStatusCode(),
+ "Non-existent namespace should return 404");
+
+ // Test clear backlog on non-existent namespace - should return 404
+ AsyncResponse clearResponse = mock(AsyncResponse.class);
+ namespaces.clearNamespaceBundleBacklog(clearResponse, testTenant,
nonExistentNs,
+ bundleRange, false);
+ ArgumentCaptor<RestException> clearCaptor =
ArgumentCaptor.forClass(RestException.class);
+ verify(clearResponse,
timeout(5000).times(1)).resume(clearCaptor.capture());
+ assertEquals(clearCaptor.getValue().getResponse().getStatus(),
+ Response.Status.NOT_FOUND.getStatusCode(),
+ "Non-existent namespace should return 404");
+
+ // Test clear backlog for subscription on non-existent namespace -
should return 404
+ AsyncResponse clearSubResponse = mock(AsyncResponse.class);
+
namespaces.clearNamespaceBundleBacklogForSubscription(clearSubResponse,
testTenant, nonExistentNs,
+ bundleRange, "test-sub", false);
+ ArgumentCaptor<RestException> clearSubCaptor =
ArgumentCaptor.forClass(RestException.class);
+ verify(clearSubResponse,
timeout(5000).times(1)).resume(clearSubCaptor.capture());
+ assertEquals(clearSubCaptor.getValue().getResponse().getStatus(),
+ Response.Status.NOT_FOUND.getStatusCode(),
+ "Non-existent namespace should return 404");
+ }
+
+ @Test
+ public void testBundleValidationAfterSplit() throws Exception {
+ URL localWebServiceUrl = new URL(pulsar.getSafeWebServiceAddress());
+ String bundledNsLocal = "test-bundle-validation-after-split";
+ List<String> boundaries = List.of("0x00000000", "0xffffffff");
+ BundlesData bundleData = BundlesData.builder()
+ .boundaries(boundaries)
+ .numBundles(boundaries.size() - 1)
+ .build();
+ createBundledTestNamespaces(this.testTenant, bundledNsLocal,
bundleData);
+ final NamespaceName testNs = NamespaceName.get(this.testTenant,
bundledNsLocal);
+
+ OwnershipCache mockOwnershipCache =
spy(pulsar.getNamespaceService().getOwnershipCache());
+
doReturn(CompletableFuture.completedFuture(null)).when(mockOwnershipCache)
+ .disableOwnership(any(NamespaceBundle.class));
+ Field ownership =
NamespaceService.class.getDeclaredField("ownershipCache");
+ ownership.setAccessible(true);
+ ownership.set(pulsar.getNamespaceService(), mockOwnershipCache);
+ mockWebUrl(localWebServiceUrl, testNs);
+
+ // Split the bundle
+ AsyncResponse splitResponse = mock(AsyncResponse.class);
+ namespaces.splitNamespaceBundle(splitResponse, testTenant,
bundledNsLocal,
+ "0x00000000_0xffffffff",
+ false, true, null, null);
+ ArgumentCaptor<Response> splitCaptor =
ArgumentCaptor.forClass(Response.class);
+ verify(splitResponse,
timeout(5000).times(1)).resume(splitCaptor.capture());
+
+ // Verify split was successful
+ BundlesData bundlesDataAfterSplit = (BundlesData) asyncRequests(ctx ->
namespaces.getBundlesData(ctx,
+ testTenant, bundledNsLocal));
+ assertNotNull(bundlesDataAfterSplit);
+ assertEquals(bundlesDataAfterSplit.getBoundaries().size(), 3);
+ assertEquals(bundlesDataAfterSplit.getBoundaries().get(0),
"0x00000000");
+ assertEquals(bundlesDataAfterSplit.getBoundaries().get(1),
"0x7fffffff");
+ assertEquals(bundlesDataAfterSplit.getBoundaries().get(2),
"0xffffffff");
+
+ // Now test bundle validation with the old (invalid) bundle range -
should return 412
+ AsyncResponse unloadOldBundleResponse = mock(AsyncResponse.class);
+ namespaces.unloadNamespaceBundle(unloadOldBundleResponse, testTenant,
bundledNsLocal,
+ "0x00000000_0xffffffff", false, null);
+ ArgumentCaptor<RestException> unloadOldCaptor =
ArgumentCaptor.forClass(RestException.class);
+ verify(unloadOldBundleResponse,
timeout(5000).times(1)).resume(unloadOldCaptor.capture());
+ assertEquals(unloadOldCaptor.getValue().getResponse().getStatus(),
+ Response.Status.PRECONDITION_FAILED.getStatusCode(),
+ "Old bundle range after split should return 412");
+
+ // Test bundle validation with new valid bundle ranges - should succeed
+ doReturn(true).when(nsSvc)
+ .isServiceUnitOwned(Mockito.argThat(bundle ->
bundle.getNamespaceObject().equals(testNs)));
+ doReturn(CompletableFuture.completedFuture(null)).when(nsSvc)
+ .unloadNamespaceBundle(any(NamespaceBundle.class));
+
+ AsyncResponse unloadNewBundle1Response = mock(AsyncResponse.class);
+ namespaces.unloadNamespaceBundle(unloadNewBundle1Response, testTenant,
bundledNsLocal,
+ "0x00000000_0x7fffffff", false, null);
+ ArgumentCaptor<Response> newBundle1Captor =
ArgumentCaptor.forClass(Response.class);
+ verify(unloadNewBundle1Response,
timeout(5000).times(1)).resume(newBundle1Captor.capture());
+ assertEquals(newBundle1Captor.getValue().getStatus(),
Response.Status.NO_CONTENT.getStatusCode(),
+ "New bundle range should be valid");
+
+ AsyncResponse unloadNewBundle2Response = mock(AsyncResponse.class);
+ namespaces.unloadNamespaceBundle(unloadNewBundle2Response, testTenant,
bundledNsLocal,
+ "0x7fffffff_0xffffffff", false, null);
+ ArgumentCaptor<Response> newBundle2Captor =
ArgumentCaptor.forClass(Response.class);
+ verify(unloadNewBundle2Response,
timeout(5000).times(1)).resume(newBundle2Captor.capture());
+ assertEquals(newBundle2Captor.getValue().getStatus(),
Response.Status.NO_CONTENT.getStatusCode(),
+ "New bundle range should be valid");
+
+ // cleanup
+ resetBroker();
+ }
+
}