This is an automated email from the ASF dual-hosted git repository.

lhotari pushed a commit to branch branch-4.0
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit edeb4e40063c51c5ac35bfb6da4d80ae11b66f10
Author: sinan liu <[email protected]>
AuthorDate: Sun Sep 28 23:12:08 2025 +0800

    [fix][broker] Flaky-test: ExtensibleLoadManagerImplTest.testDisableBroker 
(#24770)
    
    (cherry picked from commit e44e084faf66738a22d61892a86d9fc9943bc484)
---
 .../channel/ServiceUnitStateChannelImpl.java       | 27 +++++++++++++++++++++-
 1 file changed, 26 insertions(+), 1 deletion(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java
index 8247e566606..8e992095c52 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java
@@ -460,7 +460,32 @@ public class ServiceUnitStateChannelImpl implements 
ServiceUnitStateChannel {
             String serviceUnit,
             ServiceUnitState state,
             Optional<String> owner) {
-
+        // When the channel is disabled/closed, do not perform liveness 
verification, return according to the status:
+        if (channelState == Disabled || channelState == Closed) {
+            switch (state) {
+                // Owned/Splitting: Directly return owner (for isOwner 
judgment as true)
+                case Owned:
+                case Splitting:
+                    return CompletableFuture.completedFuture(owner);
+                case Assigning:
+                case Releasing:
+                    if (owner.isPresent()) {
+                        if (isTargetBroker(owner.get())) {
+                            // This machine is the target taker,
+                            // return an unfinished future with "waiting for 
ownership"
+                            return 
dedupeGetOwnerRequest(serviceUnit).thenApply(Optional::ofNullable);
+                        } else {
+                            // The target is another broker, return directly 
so that the upper layer can redirect
+                            return CompletableFuture.completedFuture(owner);
+                        }
+                    } else {
+                        return 
CompletableFuture.completedFuture(Optional.empty());
+                    }
+                // Other status: return empty
+                default:
+                    return CompletableFuture.completedFuture(Optional.empty());
+            }
+        }
         // If this broker's registry does not exist(possibly suffering from 
connecting to the metadata store),
         // we return the owner without its activeness check.
         // This broker tries to serve lookups on a best efforts basis when 
metadata store connection is unstable.

Reply via email to