gaoran10 commented on code in PR #19546:
URL: https://github.com/apache/pulsar/pull/19546#discussion_r1111056732

##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitState.java:
##########
@@ -24,55 +24,37 @@
 /**
  * Defines the possible states for service units.
  *
- * The following diagram defines the valid state changes
- *
- *                  ┌───────────┐
- *       ┌──────────┤ released  │◄────────┐
- *       │own       └───────────┘         │release
- *       │                                │
- *       │                                │
- *       ▼                                │
- *    ┌────────┐  assign(transfer)  ┌─────┴────┐
- *    │        ├───────────────────►│          │
- *    │ owned  │                    │ assigned │
- *    │        │◄───────────────────┤          │
- *    └──┬─────┤      own           └──────────┘
- *       │  ▲  │                         ▲
- *       │  │  │                         │
- *       │  │  └──────────────┐          │
- *       │  │                 │          │
- *       │  │        unload   │          │ assign(assignment)
- * split │  │                 │          │
- *       │  │                 │          │
- *       │  │ create(child)   │          │
- *       │  │                 │          │
- *       ▼  │                 │          │
- *    ┌─────┴─────┐           └─────►┌───┴──────┐
- *    │           │                  │          │
- *    │ splitting ├────────────────► │   free   │
- *    │           │   discard(parent)│          │
- *    └───────────┘                  └──────────┘
+ * Refer to Service Unit State Channel in https://github.com/apache/pulsar/issues/16691 for additional details.
  */
 public enum ServiceUnitState {

-    Free, // not owned by any broker (terminal state)
+    Init, // initializing the state. no previous state(terminal state)
+
+    Disabled, // disabled by the owner broker
+
+    Free, // not owned by any broker (semi-terminal state)

     Owned, // owned by a broker (terminal state)

     Assigned, // the ownership is assigned(but the assigned broker has not been notified the ownership yet)

     Released, // the source broker's ownership has been released (e.g. the topic connections are closed)

-    Splitting; // the service unit(e.g. bundle) is in the process of splitting.
+    Splitting, // the service unit(e.g. bundle) is in the process of splitting.
+
+    Deleted; // deleted in the system (semi-terminal state)

     private static Map<ServiceUnitState, Set<ServiceUnitState>> validTransitions = Map.of(
-            // (Free -> Released | Splitting) transitions are required
-            // when the topic is compacted in the middle of transfer or split.
-            Free, Set.of(Owned, Assigned, Released, Splitting),
-            Owned, Set.of(Assigned, Splitting, Free),
-            Assigned, Set.of(Owned, Released, Free),
-            Released, Set.of(Owned, Free),
-            Splitting, Set.of(Free)
+            // (Init -> all states) transitions are required
+            // when the topic is compacted in the middle of assign, transfer or split.
+            Init, Set.of(Disabled, Owned, Assigned, Released, Splitting, Deleted, Init),
+            Disabled, Set.of(Free, Init),
+            Free, Set.of(Assigned, Init),
+            Owned, Set.of(Assigned, Splitting, Disabled, Init),

Review Comment:
   Can the `Owned`, `Assigned`, `Released`, `Splitting`, and `Disabled` states transition to the `Init` state?



##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java:
##########
@@ -268,11 +338,13 @@ public synchronized void close() throws PulsarServerException {
                 log.info("Successfully closed the channel producer.");
             }

-            // TODO: clean brokerRegistry
+            if (brokerRegistry != null) {
+                brokerRegistry = null;

Review Comment:
   Do we need to close the broker registry?



##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitState.java:
##########
@@ -24,55 +24,37 @@
 /**
  * Defines the possible states for service units.
  *
- * The following diagram defines the valid state changes
- *
- *                  ┌───────────┐
- *       ┌──────────┤ released  │◄────────┐
- *       │own       └───────────┘         │release
- *       │                                │
- *       │                                │
- *       ▼                                │
- *    ┌────────┐  assign(transfer)  ┌─────┴────┐
- *    │        ├───────────────────►│          │
- *    │ owned  │                    │ assigned │
- *    │        │◄───────────────────┤          │
- *    └──┬─────┤      own           └──────────┘
- *       │  ▲  │                         ▲
- *       │  │  │                         │
- *       │  │  └──────────────┐          │
- *       │  │                 │          │
- *       │  │        unload   │          │ assign(assignment)
- * split │  │                 │          │
- *       │  │                 │          │
- *       │  │ create(child)   │          │
- *       │  │                 │          │
- *       ▼  │                 │          │
- *    ┌─────┴─────┐           └─────►┌───┴──────┐
- *    │           │                  │          │
- *    │ splitting ├────────────────► │   free   │
- *    │           │   discard(parent)│          │
- *    └───────────┘                  └──────────┘
+ * Refer to Service Unit State Channel in https://github.com/apache/pulsar/issues/16691 for additional details.
  */
 public enum ServiceUnitState {

-    Free, // not owned by any broker (terminal state)
+    Init, // initializing the state. no previous state(terminal state)
+
+    Disabled, // disabled by the owner broker
+
+    Free, // not owned by any broker (semi-terminal state)

     Owned, // owned by a broker (terminal state)

     Assigned, // the ownership is assigned(but the assigned broker has not been notified the ownership yet)

     Released, // the source broker's ownership has been released (e.g. the topic connections are closed)

-    Splitting; // the service unit(e.g. bundle) is in the process of splitting.
+    Splitting, // the service unit(e.g. bundle) is in the process of splitting.
+
+    Deleted; // deleted in the system (semi-terminal state)

     private static Map<ServiceUnitState, Set<ServiceUnitState>> validTransitions = Map.of(
-            // (Free -> Released | Splitting) transitions are required
-            // when the topic is compacted in the middle of transfer or split.
-            Free, Set.of(Owned, Assigned, Released, Splitting),
-            Owned, Set.of(Assigned, Splitting, Free),
-            Assigned, Set.of(Owned, Released, Free),
-            Released, Set.of(Owned, Free),
-            Splitting, Set.of(Free)
+            // (Init -> all states) transitions are required
+            // when the topic is compacted in the middle of assign, transfer or split.
+            Init, Set.of(Disabled, Owned, Assigned, Released, Splitting, Deleted, Init),
+            Disabled, Set.of(Free, Init),

Review Comment:
   Why do we need to add the `Disabled` state? It seems that only the `Disabled` state can transition to the `Free` state.



##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java:
##########
@@ -207,14 +244,29 @@ public synchronized void start() throws PulsarServerException {
             throw new IllegalStateException("Invalid channel state:" + channelState.name());
         }

+        boolean debug = debug();
         try {
+            this.brokerRegistry = getBrokerRegistry();
+            this.brokerRegistry.addListener((broker, type) -> {
+                if (type == NotificationType.Deleted) {

Review Comment:
   There is a method `handleBrokerRegistrationEvent`. Is it useful?



##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java:
##########
@@ -294,7 +366,7 @@ private boolean validateChannelState(ChannelState targetState, boolean checkLowe
     }

     private boolean debug() {
-        return pulsar.getConfiguration().isLoadBalancerDebugModeEnabled() || log.isDebugEnabled();
+        return config.isLoadBalancerDebugModeEnabled() || log.isDebugEnabled();

Review Comment:
   I'm not sure why we need to check whether the logger is at debug level. If it is, the logger will print too many logs, which makes the load balancer logs hard to check.



--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
