heesung-sn commented on code in PR #18858:
URL: https://github.com/apache/pulsar/pull/18858#discussion_r1050316454


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java:
##########
@@ -510,25 +524,120 @@ private CompletableFuture<Integer> closeServiceUnit(String serviceUnit) {
                 });
     }
 
-    private CompletableFuture<Void> splitServiceUnit(String serviceUnit) {
-        // TODO: after the split we need to write the child ownerships to BSC instead of ZK.
+    private CompletableFuture<Void> splitServiceUnit(String serviceUnit, ServiceUnitStateData data) {
+        // Write the child ownerships to BSC.
         long startTime = System.nanoTime();
-        return pulsar.getNamespaceService()
-                .splitAndOwnBundle(getNamespaceBundle(serviceUnit),
-                        false,
-                        NamespaceBundleSplitAlgorithm.of(pulsar.getConfig().getDefaultNamespaceBundleSplitAlgorithm()),
-                        null)
-                .whenComplete((__, ex) -> {
-                    double splitBundleTime = TimeUnit.NANOSECONDS
-                            .toMillis((System.nanoTime() - startTime));
-                    if (ex == null) {
-                        log.info("Successfully split {} namespace-bundle in {} ms",
-                                serviceUnit, splitBundleTime);
-                    } else {
-                        log.error("Failed to split {} namespace-bundle in {} ms",
-                                serviceUnit, splitBundleTime, ex);
-                    }
-                });
+        NamespaceService namespaceService = pulsar.getNamespaceService();
+        NamespaceBundleFactory bundleFactory = namespaceService.getNamespaceBundleFactory();
+        NamespaceBundle bundle = getNamespaceBundle(serviceUnit);
+        CompletableFuture<Void> completionFuture = new CompletableFuture<>();
+        final AtomicInteger counter = new AtomicInteger(0);
+        this.splitServiceUnitOnceAndRetry(namespaceService, bundleFactory, bundle, serviceUnit, data,
+                counter, startTime, completionFuture);
+        return completionFuture;
+    }
+
+    @VisibleForTesting
+    protected void splitServiceUnitOnceAndRetry(NamespaceService namespaceService,
+                                                NamespaceBundleFactory bundleFactory,
+                                                NamespaceBundle bundle,
+                                                String serviceUnit,
+                                                ServiceUnitStateData data,
+                                                AtomicInteger counter,
+                                                long startTime,
+                                                CompletableFuture<Void> completionFuture) {
+        CompletableFuture<List<NamespaceBundle>> updateFuture = new CompletableFuture<>();
+
+        getSplitBoundary(bundle).thenAccept(splitBundles -> {
+            // Split and updateNamespaceBundles. Update may fail because of concurrent write to Zookeeper.
+            if (splitBundles == null) {
+                String msg = format("Bundle %s not found under namespace", serviceUnit);
+                updateFuture.completeExceptionally(new BrokerServiceException.ServiceUnitNotReadyException(msg));
+                return;
+            }
+            List<CompletableFuture<Void>> futures = new ArrayList<>();
+            ServiceUnitStateData next = new ServiceUnitStateData(Owned, data.broker());
+            for (NamespaceBundle sBundle : splitBundles.getRight()) {
+                futures.add(pubAsync(sBundle.toString(), next).thenAccept(__ -> {}));
+            }
+            NamespaceName nsname = bundle.getNamespaceObject();
+            FutureUtil.waitForAll(futures).thenRun(() ->

Review Comment:
   Sorry, I think we need to revert `next` back to
   `ServiceUnitStateData next = new ServiceUnitStateData(Owned, data.broker());`.
   
   `Free->Assigned(Non-Transfer)` is for the case where broker A tries to 
assign bundle ownership to broker B when the bundle has no owner (`Free`). 
Since broker A only assumes that broker B is online, the `Assigned->Owned` 
change requires broker B to acknowledge `Assigned` by broadcasting `Owned`.
   
   As we defined in the `ServiceUnitState` state diagram, I think the 
children's state can be `Owned` initially. It does not need to be `Assigned`, 
because the parent's broker (the same broker) always owns the children.
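
To make the transitions mentioned above concrete, here is a minimal sketch of a transition check. It is an illustration only: the `TransitionSketch` class, the reduced three-state `State` enum, and `isValidTransition` are hypothetical names, not the actual `ServiceUnitState` implementation, and the table only encodes the transitions discussed in this comment (the real state diagram has more states and edges).

```java
import java.util.EnumMap;
import java.util.EnumSet;
import java.util.Map;
import java.util.Set;

// Hypothetical, simplified model of the transitions discussed in this comment.
class TransitionSketch {
    // Reduced state set (assumption); the real ServiceUnitState has more states.
    enum State { FREE, ASSIGNED, OWNED }

    private static final Map<State, Set<State>> VALID = new EnumMap<>(State.class);
    static {
        // Free -> Assigned: broker A assigns an unowned bundle to broker B.
        // Free -> Owned: split children are owned by the parent's broker right away.
        VALID.put(State.FREE, EnumSet.of(State.ASSIGNED, State.OWNED));
        // Assigned -> Owned: broker B acknowledges the assignment.
        VALID.put(State.ASSIGNED, EnumSet.of(State.OWNED));
        // Owned -> Assigned (non-transfer) is not listed, so it is rejected.
        // Transitions to states outside this sketch are omitted.
        VALID.put(State.OWNED, EnumSet.noneOf(State.class));
    }

    static boolean isValidTransition(State from, State to) {
        return VALID.getOrDefault(from, EnumSet.noneOf(State.class)).contains(to);
    }

    public static void main(String[] args) {
        System.out.println(isValidTransition(State.FREE, State.OWNED));     // true
        System.out.println(isValidTransition(State.OWNED, State.ASSIGNED)); // false
    }
}
```

With a table like this, publishing the children directly as `Owned` is accepted, while a later non-transfer `Assigned` message for an already owned child would be dropped.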
   
   My initial concern was a possible race condition if the broker does not 
wait until it receives its `Owned` message in its table-view before updating 
the bundle partitions in ZK (case 2 below). I think this wait is probably 
redundant, but it is still safe to have.
   
   Let's review the race conditions here.
   
   case 1: the parent bundle is looked up (before the zk bundle partitions 
are updated), before or after the broker receives the children's `Owned` in 
its table-view
   => happy case: the broker immediately returns the parent bundle lookup 
result (the same broker).
   
   case 2: the children bundles are looked up (after the zk bundle partitions 
are updated) before the broker receives the children's `Owned` in its 
table-view
   => unhappy case: the broker runs the bundle assignment logic and probably 
tries to assign the ownership to another broker. However, this 
`Owned->Assigned(Non-Transfer)` transition will be filtered out as invalid. 
Eventually, the children bundle lookup will either time out or correctly return 
the original owner broker when the first `Owned` msg arrives. Still, it would 
be safer to avoid this case by delaying the zk bundle partition update until 
`getOwner(children bundles).join(timeout).get() == this.broker`.
   
   case 3: the children bundles are looked up (after the zk bundle partitions 
are updated) after the broker receives the children's `Owned` in its 
table-view
   => happy case: the broker immediately returns the lookup result.
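
The delay suggested for case 2 could look roughly like the sketch below. This is only an illustration under stated assumptions: `OwnershipGateSketch`, `getOwnerAsync`, `onOwnedMessage`, and `updatePartitionsWhenOwned` are hypothetical names, the map of futures is a stand-in for the table-view, and the `Runnable` is a placeholder for the zk bundle partition update. It uses only `java.util.concurrent` (`orTimeout` needs Java 9+).

```java
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;

// Hypothetical sketch: run the partition update only after this broker has
// observed `Owned` for every child bundle, or fail after a timeout.
class OwnershipGateSketch {
    // Stand-in for the table-view: child bundle -> future of its owner broker.
    private final Map<String, CompletableFuture<String>> owners = new ConcurrentHashMap<>();
    private final String thisBroker;

    OwnershipGateSketch(String thisBroker) {
        this.thisBroker = thisBroker;
    }

    CompletableFuture<String> getOwnerAsync(String bundle) {
        return owners.computeIfAbsent(bundle, k -> new CompletableFuture<>());
    }

    // Called when an `Owned` message for the bundle shows up in the table-view.
    void onOwnedMessage(String bundle, String broker) {
        getOwnerAsync(bundle).complete(broker);
    }

    CompletableFuture<Void> updatePartitionsWhenOwned(List<String> children,
                                                      long timeoutMs,
                                                      Runnable updateBundlePartitions) {
        CompletableFuture<?>[] checks = children.stream()
                .map(child -> getOwnerAsync(child).thenAccept(owner -> {
                    if (!thisBroker.equals(owner)) {
                        throw new IllegalStateException(child + " is owned by " + owner);
                    }
                }))
                .toArray(CompletableFuture[]::new);
        // The timeout applies to the combined future, not to the shared owner futures.
        return CompletableFuture.allOf(checks)
                .orTimeout(timeoutMs, TimeUnit.MILLISECONDS)
                .thenRun(updateBundlePartitions);
    }

    public static void main(String[] args) {
        OwnershipGateSketch gate = new OwnershipGateSketch("broker-a");
        CompletableFuture<Void> done = gate.updatePartitionsWhenOwned(
                List.of("child-1", "child-2"), 5000,
                () -> System.out.println("updating bundle partitions"));
        gate.onOwnedMessage("child-1", "broker-a");
        gate.onOwnedMessage("child-2", "broker-a");
        done.join();
    }
}
```

In this shape, a children lookup can no longer hit the window described in case 2, at the cost of one extra wait on the split path.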



