Demogorgon314 commented on code in PR #19546:
URL: https://github.com/apache/pulsar/pull/19546#discussion_r1111661832


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java:
##########
@@ -538,7 +616,15 @@ private void handleSplitEvent(String serviceUnit, 
ServiceUnitStateData data) {
         }
     }
 
-    private void handleFreeEvent(String serviceUnit) {
+    private void handleDisableEvent(String serviceUnit, ServiceUnitStateData 
data) {

Review Comment:
   This method was never used. Please remove it.



##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java:
##########
@@ -522,12 +590,22 @@ private void handleAssignEvent(String serviceUnit, 
ServiceUnitStateData data) {
     }
 
     private void handleReleaseEvent(String serviceUnit, ServiceUnitStateData 
data) {
-        if (isTargetBroker(data.sourceBroker())) {
-            ServiceUnitStateData next = new ServiceUnitStateData(Owned, 
data.broker(), data.sourceBroker());
-            // TODO: when close, pass message to clients to connect to the new 
broker
-            closeServiceUnit(serviceUnit)
-                    .thenCompose(__ -> pubAsync(serviceUnit, next))
-                    .whenComplete((__, e) -> log(e, serviceUnit, data, next));
+
+        if (isTransferCommand(data)) {
+            if (isTargetBroker(data.sourceBroker())) {
+                ServiceUnitStateData next = new ServiceUnitStateData(Owned, 
data.broker(), data.sourceBroker());
+                // TODO: when close, pass message to clients to connect to the 
new broker
+                closeServiceUnit(serviceUnit)
+                        .thenCompose(__ -> pubAsync(serviceUnit, next))
+                        .whenComplete((__, e) -> log(e, serviceUnit, data, 
next));
+            }
+        } else {
+            if (isTargetBroker(data.broker())) {
+                ServiceUnitStateData next = new ServiceUnitStateData(Free, 
data.broker());
+                closeServiceUnit(serviceUnit)

Review Comment:
   Since we already closed the service unit here, do we need to close the 
service unit again in `handleInitEvent`?



##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java:
##########
@@ -522,12 +590,22 @@ private void handleAssignEvent(String serviceUnit, 
ServiceUnitStateData data) {
     }
 
     private void handleReleaseEvent(String serviceUnit, ServiceUnitStateData 
data) {
-        if (isTargetBroker(data.sourceBroker())) {
-            ServiceUnitStateData next = new ServiceUnitStateData(Owned, 
data.broker(), data.sourceBroker());
-            // TODO: when close, pass message to clients to connect to the new 
broker
-            closeServiceUnit(serviceUnit)
-                    .thenCompose(__ -> pubAsync(serviceUnit, next))
-                    .whenComplete((__, e) -> log(e, serviceUnit, data, next));
+
+        if (isTransferCommand(data)) {
+            if (isTargetBroker(data.sourceBroker())) {
+                ServiceUnitStateData next = new ServiceUnitStateData(Owned, 
data.broker(), data.sourceBroker());
+                // TODO: when close, pass message to clients to connect to the 
new broker
+                closeServiceUnit(serviceUnit)
+                        .thenCompose(__ -> pubAsync(serviceUnit, next))
+                        .whenComplete((__, e) -> log(e, serviceUnit, data, 
next));
+            }
+        } else {
+            if (isTargetBroker(data.broker())) {

Review Comment:
   I have been thinking, should we rename the `sourceBroker` to `destBroker`, 
since the `broker` will usually be treated as "sourceBroker".
   
   Then we can change the logic like this:
   
   ```
           if (isTargetBroker(data.broker())) {
               if (isTransferCommand(data)) {
                   ServiceUnitStateData next = new ServiceUnitStateData(Owned, 
data.broker(), data.destBroker());
                   // TODO: when close, pass message to clients to connect to 
the new broker
                   closeServiceUnit(serviceUnit)
                           .thenCompose(__ -> pubAsync(serviceUnit, next))
                           .whenComplete((__, e) -> log(e, serviceUnit, data, 
next));
               } else {
                   ServiceUnitStateData next = new ServiceUnitStateData(Free, 
data.broker());
                   closeServiceUnit(serviceUnit)
                           .thenCompose(__ -> pubAsync(serviceUnit, next))
                           .whenComplete((__, e) -> log(e, serviceUnit, data, 
next));
               }
           }
   ```



##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitState.java:
##########
@@ -24,55 +24,34 @@
 /**
  * Defines the possible states for service units.
  *
- * The following diagram defines the valid state changes
- *
- *                  ┌───────────┐
- *       ┌──────────┤ released  │◄────────┐
- *       │own       └───────────┘         │release
- *       │                                │
- *       │                                │
- *       ▼                                │
- *    ┌────────┐  assign(transfer)  ┌─────┴────┐
- *    │        ├───────────────────►│          │
- *    │ owned  │                    │ assigned │
- *    │        │◄───────────────────┤          │
- *    └──┬─────┤      own           └──────────┘
- *       │  ▲  │                         ▲
- *       │  │  │                         │
- *       │  │  └──────────────┐          │
- *       │  │                 │          │
- *       │  │        unload   │          │ assign(assignment)
- * split │  │                 │          │
- *       │  │                 │          │
- *       │  │ create(child)   │          │
- *       │  │                 │          │
- *       ▼  │                 │          │
- *    ┌─────┴─────┐           └─────►┌───┴──────┐
- *    │           │                  │          │
- *    │ splitting ├────────────────► │   free   │
- *    │           │   discard(parent)│          │
- *    └───────────┘                  └──────────┘
+ * Refer to Service Unit State Channel in 
https://github.com/apache/pulsar/issues/16691 for additional details.
  */
 public enum ServiceUnitState {
 
-    Free, // not owned by any broker (terminal state)
+    Init, // initializing the state. no previous state(terminal state)
+
+    Free, // not owned by any broker (semi-terminal state)
 
     Owned, // owned by a broker (terminal state)
 
     Assigned, // the ownership is assigned(but the assigned broker has not 
been notified the ownership yet)
 
     Released, // the source broker's ownership has been released (e.g. the 
topic connections are closed)
 
-    Splitting; // the service unit(e.g. bundle) is in the process of splitting.
+    Splitting, // the service unit(e.g. bundle) is in the process of splitting.
+
+    Disabled; // disabled in the system (semi-terminal state)
 
     private static Map<ServiceUnitState, Set<ServiceUnitState>> 
validTransitions = Map.of(

Review Comment:
   ```suggestion
       private static final Map<ServiceUnitState, Set<ServiceUnitState>> 
validTransitions = Map.of(
   ```



##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java:
##########
@@ -207,14 +243,23 @@ public synchronized void start() throws 
PulsarServerException {
             throw new IllegalStateException("Invalid channel state:" + 
channelState.name());
         }
 
+        boolean debug = debug();
         try {
+            this.brokerRegistry = getBrokerRegistry();
+            this.brokerRegistry.addListener((broker, type) -> {
+                handleBrokerRegistrationEvent(broker, type);
+            });

Review Comment:
   ```suggestion
               
this.brokerRegistry.addListener(this::handleBrokerRegistrationEvent);
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to