This is an automated email from the ASF dual-hosted git repository. lhotari pushed a commit to branch branch-4.0 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit edeb4e40063c51c5ac35bfb6da4d80ae11b66f10 Author: sinan liu <[email protected]> AuthorDate: Sun Sep 28 23:12:08 2025 +0800 [fix][broker] Flaky-test: ExtensibleLoadManagerImplTest.testDisableBroker (#24770) (cherry picked from commit e44e084faf66738a22d61892a86d9fc9943bc484) --- .../channel/ServiceUnitStateChannelImpl.java | 27 +++++++++++++++++++++- 1 file changed, 26 insertions(+), 1 deletion(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java index 8247e566606..8e992095c52 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java @@ -460,7 +460,32 @@ public class ServiceUnitStateChannelImpl implements ServiceUnitStateChannel { String serviceUnit, ServiceUnitState state, Optional<String> owner) { - + // When the channel is disabled/closed, do not perform liveness verification, return according to the status: + if (channelState == Disabled || channelState == Closed) { + switch (state) { + // Owned/Splitting: Directly return owner (for isOwner judgment as true) + case Owned: + case Splitting: + return CompletableFuture.completedFuture(owner); + case Assigning: + case Releasing: + if (owner.isPresent()) { + if (isTargetBroker(owner.get())) { + // This machine is the target taker, + // return an unfinished future with "waiting for ownership" + return dedupeGetOwnerRequest(serviceUnit).thenApply(Optional::ofNullable); + } else { + // The target is another broker, return directly so that the upper layer can redirect + return CompletableFuture.completedFuture(owner); + } + } else { + return CompletableFuture.completedFuture(Optional.empty()); + } + // Other status: return empty + default: + return CompletableFuture.completedFuture(Optional.empty()); + } + } // If this broker's registry does not exist(possibly suffering from connecting to the metadata store), // we return the owner without its activeness check. // This broker tries to serve lookups on a best efforts basis when metadata store connection is unstable.
