Demogorgon314 commented on code in PR #19988:
URL: https://github.com/apache/pulsar/pull/19988#discussion_r1155046081
##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java:
##########
@@ -826,110 +826,156 @@ private CompletableFuture<Void> splitServiceUnit(String
serviceUnit, ServiceUnit
boundariesSet.add(subBundle.getKeyRange().upperEndpoint());
});
boundaries = new ArrayList<>(boundariesSet);
- nsBundleSplitAlgorithm =
NamespaceBundleSplitAlgorithm.SPECIFIED_POSITIONS_DIVIDE_ALGO;
+ nsBundleSplitAlgorithm =
NamespaceBundleSplitAlgorithm.SPECIFIED_POSITIONS_DIVIDE_FORCE_ALGO;
}
final AtomicInteger counter = new AtomicInteger(0);
+ var childBundles =
data.splitServiceUnitToDestBroker().keySet().stream()
+ .map(child -> bundleFactory.getBundle(
+ bundle.getNamespaceObject().toString(), child))
+ .collect(Collectors.toList());
this.splitServiceUnitOnceAndRetry(namespaceService, bundleFactory,
nsBundleSplitAlgorithm,
- bundle, boundaries, serviceUnit, data, counter, startTime,
completionFuture);
+ bundle, childBundles, boundaries, data, counter, startTime,
completionFuture);
return completionFuture;
}
+
+
@VisibleForTesting
protected void splitServiceUnitOnceAndRetry(NamespaceService
namespaceService,
NamespaceBundleFactory
bundleFactory,
NamespaceBundleSplitAlgorithm
algorithm,
- NamespaceBundle bundle,
+ NamespaceBundle parentBundle,
+ List<NamespaceBundle>
childBundles,
List<Long> boundaries,
- String serviceUnit,
- ServiceUnitStateData data,
+ ServiceUnitStateData
parentData,
AtomicInteger counter,
long startTime,
CompletableFuture<Void>
completionFuture) {
- CompletableFuture<List<NamespaceBundle>> updateFuture = new
CompletableFuture<>();
-
- namespaceService.getSplitBoundary(bundle, algorithm, boundaries)
- .thenAccept(splitBundlesPair -> {
- // Split and updateNamespaceBundles. Update may fail because of
concurrent write to Zookeeper.
- if (splitBundlesPair == null) {
- String msg = format("Bundle %s not found under namespace",
serviceUnit);
- updateFuture.completeExceptionally(new
BrokerServiceException.ServiceUnitNotReadyException(msg));
- return;
+ ownChildBundles(childBundles, parentData)
+ .thenCompose(__ -> updateSplitNamespaceBundlesAsync(
+ namespaceService, bundleFactory, algorithm,
parentBundle, childBundles, boundaries))
+ .thenAccept(__ -> // Update bundled_topic cache for
load-report-generation
+
pulsar.getBrokerService().refreshTopicToStatsMaps(parentBundle))
+ .thenAccept(__ -> pubAsync(parentBundle.toString(), new
ServiceUnitStateData(
+ Deleted, null, parentData.sourceBroker(),
getNextVersionId(parentData))))
+ .thenAccept(__ -> {
+ double splitBundleTime =
TimeUnit.NANOSECONDS.toMillis((System.nanoTime() - startTime));
+ log.info("Successfully split {} parent namespace-bundle to
{} in {} ms",
+ parentBundle, childBundles, splitBundleTime);
+ completionFuture.complete(null);
+ })
+ .exceptionally(ex -> {
+ // Retry several times on BadVersion
+ Throwable throwable =
FutureUtil.unwrapCompletionException(ex);
+ if ((throwable instanceof
MetadataStoreException.BadVersionException)
+ && (counter.incrementAndGet() <
NamespaceService.BUNDLE_SPLIT_RETRY_LIMIT)) {
+ log.warn("Failed to update bundle range in metadata
store. Retrying {} th / {} limit",
+ counter.get(),
NamespaceService.BUNDLE_SPLIT_RETRY_LIMIT, ex);
+ pulsar.getExecutor().schedule(() ->
splitServiceUnitOnceAndRetry(
+ namespaceService, bundleFactory, algorithm,
parentBundle, childBundles,
+ boundaries, parentData, counter,
startTime, completionFuture),
+ 100, MILLISECONDS);
+ } else {
+ // Retry enough, or meet other exception
+ String msg = format("Failed to split bundle %s,
Retried %d th / %d limit, reason %s",
+ parentBundle.toString(), counter.get(),
+ NamespaceService.BUNDLE_SPLIT_RETRY_LIMIT,
throwable.getMessage());
+ log.warn(msg, throwable);
+ completionFuture.completeExceptionally(
+ new
BrokerServiceException.ServiceUnitNotReadyException(msg));
+ }
+ return null;
+ });
+ }
+
+ private CompletableFuture<Void> ownChildBundles(List<NamespaceBundle>
childBundles,
+ ServiceUnitStateData
parentData) {
+ List<CompletableFuture<Void>> futures = new
ArrayList<>(childBundles.size());
+ var debug = debug();
+ for (var childBundle : childBundles) {
+ var childBundleStr = childBundle.toString();
+ var childData = tableview.get(childBundleStr);
+ if (childData != null) {
+ if (debug) {
+ log.info("Already owned child bundle:{}", childBundleStr);
+ }
+ } else {
+ childData = new ServiceUnitStateData(Owned,
parentData.sourceBroker(),
+ VERSION_ID_INIT);
+ futures.add(pubAsync(childBundleStr, childData).thenApply(__
-> null));
+ }
+ }
+
+ if (!futures.isEmpty()) {
+ return FutureUtil.waitForAll(futures);
+ } else {
+ return CompletableFuture.completedFuture(null);
+ }
+ }
+
+ private CompletableFuture<Void> updateSplitNamespaceBundlesAsync(
+ NamespaceService namespaceService,
+ NamespaceBundleFactory bundleFactory,
+ NamespaceBundleSplitAlgorithm algorithm,
+ NamespaceBundle parentBundle,
+ List<NamespaceBundle> childBundles,
+ List<Long> boundaries) {
+ CompletableFuture<Void> updateSplitNamespaceBundlesFuture = new
CompletableFuture<>();
+ var namespaceName = parentBundle.getNamespaceObject();
+ final var debug = debug();
+ var targetNsBundle =
bundleFactory.getBundles(parentBundle.getNamespaceObject());
+ boolean updated = false;
+ try {
+ targetNsBundle.validateBundle(parentBundle);
+ } catch (IllegalArgumentException e) {
+ if (debug) {
+ log.info("Namespace bundles do not contain the parent
bundle:{}",
Review Comment:
If namespace bundles do not contain the parent bundle, we shouldn't continue
to verify the child bundles, right?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]