This is an automated email from the ASF dual-hosted git repository.

lhotari pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 5506f50fa03 [fix][broker] Fix namespace unload might be blocked too long with extensible load manager (#23433)
5506f50fa03 is described below

commit 5506f50fa036f1ad19c3e0abc03e74ecd5c0a665
Author: Yunze Xu <[email protected]>
AuthorDate: Fri Oct 11 00:14:27 2024 +0800

    [fix][broker] Fix namespace unload might be blocked too long with extensible load manager (#23433)
---
 .../pulsar/broker/admin/impl/BrokersBase.java      |  6 +++-
 .../channel/ServiceUnitStateChannelImpl.java       | 34 ++++++++++++++++++----
 .../extensions/ExtensibleLoadManagerCloseTest.java | 10 +++++--
 3 files changed, 41 insertions(+), 9 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/BrokersBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/BrokersBase.java
index da4cee7b465..e397dbb64a0 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/BrokersBase.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/BrokersBase.java
@@ -388,7 +388,11 @@ public class BrokersBase extends AdminResource {
                     asyncResponse.resume(Response.ok("ok").build());
                 }).exceptionally(ex -> {
                     if (!isRedirectException(ex)) {
-                        LOG.error("[{}] Fail to run health check.", clientAppId(), ex);
+                        if (isNotFoundException(ex)) {
+                            LOG.warn("[{}] Failed to run health check: {}", clientAppId(), ex.getMessage());
+                        } else {
+                            LOG.error("[{}] Failed to run health check.", clientAppId(), ex);
+                        }
                     }
                     resumeAsyncResponseExceptionally(asyncResponse, ex);
                     return null;
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java
index 49d038d512e..ea1bf01be5b 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java
@@ -87,6 +87,7 @@ import org.apache.pulsar.broker.namespace.LookupOptions;
 import org.apache.pulsar.broker.namespace.NamespaceService;
 import org.apache.pulsar.broker.service.BrokerServiceException;
 import org.apache.pulsar.client.admin.PulsarAdmin;
+import org.apache.pulsar.client.admin.PulsarAdminException;
 import org.apache.pulsar.client.api.Schema;
 import org.apache.pulsar.common.naming.NamespaceBundle;
 import org.apache.pulsar.common.naming.NamespaceBundleFactory;
@@ -1291,6 +1292,14 @@ public class ServiceUnitStateChannelImpl implements ServiceUnitStateChannel {
                                         broker, cleanupJobs.size());
                             }
                         }
+                    })
+                    .exceptionally(e -> {
+                        if (FutureUtil.unwrapCompletionException(e) instanceof PulsarAdminException.NotFoundException) {
+                            log.warn("{} Failed to run health check: {}", broker, e.getMessage());
+                        } else {
+                            log.error("{} Failed to run health check", broker, e);
+                        }
+                        return null;
                     });
         }
     }
@@ -1323,12 +1332,19 @@ public class ServiceUnitStateChannelImpl implements ServiceUnitStateChannel {
         }
     }
 
+    private boolean channelDisabled() {
+        final var channelState = this.channelState;
+        if (channelState == Disabled || channelState == Closed) {
+            log.warn("[{}] Skip scheduleCleanup because the state is {} now", brokerId, channelState);
+            return true;
+        }
+        return false;
+    }
+
     private void scheduleCleanup(String broker, long delayInSecs) {
         var scheduled = new MutableObject<CompletableFuture<Void>>();
         try {
-            final var channelState = this.channelState;
-            if (channelState == Disabled || channelState == Closed) {
-                log.warn("[{}] Skip scheduleCleanup because the state is {} now", brokerId, channelState);
+            if (channelDisabled()) {
                 return;
             }
             cleanupJobs.computeIfAbsent(broker, k -> {
@@ -1462,6 +1478,10 @@ public class ServiceUnitStateChannelImpl implements ServiceUnitStateChannel {
     }
 
     private void doHealthCheckBrokerAsyncWithRetries(String brokerId, int retry, CompletableFuture<Void> future) {
+        if (channelDisabled()) {
+            future.complete(null);
+            return;
+        }
         try {
             var admin = getPulsarAdmin();
             admin.brokers().healthcheckAsync(TopicVersion.V2, Optional.of(brokerId))
@@ -1472,7 +1492,6 @@ public class ServiceUnitStateChannelImpl implements ServiceUnitStateChannel {
                             return;
                         }
                         if (retry == MAX_BROKER_HEALTH_CHECK_RETRY) {
-                            log.error("Failed health-check broker :{}", brokerId, e);
                             future.completeExceptionally(FutureUtil.unwrapCompletionException(e));
                         } else {
                             pulsar.getExecutor()
@@ -1509,7 +1528,12 @@ public class ServiceUnitStateChannelImpl implements ServiceUnitStateChannel {
                 return;
             } catch (Exception e) {
                 if (debug()) {
-                    log.info("Failed to check broker:{} health", broker, e);
+                    if (e instanceof ExecutionException
+                            && e.getCause() instanceof PulsarAdminException.NotFoundException) {
+                        log.info("The broker {} is not healthy because it's not found", broker);
+                    } else {
+                        log.info("Failed to check broker:{} health", broker, e);
+                    }
                 }
                 log.info("Checked the broker:{} health. Continue the orphan bundle cleanup", broker);
             }
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerCloseTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerCloseTest.java
index ca44f6bc4d6..c8427d1a66d 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerCloseTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerCloseTest.java
@@ -38,7 +38,7 @@ import org.testng.annotations.BeforeClass;
 import org.testng.annotations.Test;
 
 @Slf4j
-@Test(groups = "flaky")
+@Test(groups = "broker")
 public class ExtensibleLoadManagerCloseTest {
 
     private static final String clusterName = "test";
@@ -88,14 +88,18 @@ public class ExtensibleLoadManagerCloseTest {
         config.setLoadManagerClassName(ExtensibleLoadManagerImpl.class.getName());
         config.setLoadBalancerDebugModeEnabled(true);
         config.setBrokerShutdownTimeoutMs(100);
+
+        // Reduce these timeout configs to avoid failed tests being blocked too long
+        config.setMetadataStoreOperationTimeoutSeconds(5);
+        config.setNamespaceBundleUnloadingTimeoutMs(5000);
         return config;
     }
 
 
-    @Test
+    @Test(invocationCount = 10)
     public void testCloseAfterLoadingBundles() throws Exception {
         setupBrokers(3);
-        final var topic = "test";
+        final var topic = "test-" + System.currentTimeMillis();
         final var admin = brokers.get(0).getAdminClient();
         admin.topics().createPartitionedTopic(topic, 20);
         admin.lookups().lookupPartitionedTopic(topic);

Reply via email to