This is an automated email from the ASF dual-hosted git repository.
lhotari pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 5506f50fa03 [fix][broker] Fix namespace unload might be blocked too long with extensible load manager (#23433)
5506f50fa03 is described below
commit 5506f50fa036f1ad19c3e0abc03e74ecd5c0a665
Author: Yunze Xu <[email protected]>
AuthorDate: Fri Oct 11 00:14:27 2024 +0800
[fix][broker] Fix namespace unload might be blocked too long with extensible load manager (#23433)
---
.../pulsar/broker/admin/impl/BrokersBase.java | 6 +++-
.../channel/ServiceUnitStateChannelImpl.java | 34 ++++++++++++++++++----
.../extensions/ExtensibleLoadManagerCloseTest.java | 10 +++++--
3 files changed, 41 insertions(+), 9 deletions(-)
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/BrokersBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/BrokersBase.java
index da4cee7b465..e397dbb64a0 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/BrokersBase.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/BrokersBase.java
@@ -388,7 +388,11 @@ public class BrokersBase extends AdminResource {
asyncResponse.resume(Response.ok("ok").build());
}).exceptionally(ex -> {
if (!isRedirectException(ex)) {
- LOG.error("[{}] Fail to run health check.",
clientAppId(), ex);
+ if (isNotFoundException(ex)) {
+ LOG.warn("[{}] Failed to run health check: {}",
clientAppId(), ex.getMessage());
+ } else {
+ LOG.error("[{}] Failed to run health check.",
clientAppId(), ex);
+ }
}
resumeAsyncResponseExceptionally(asyncResponse, ex);
return null;
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java
index 49d038d512e..ea1bf01be5b 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java
@@ -87,6 +87,7 @@ import org.apache.pulsar.broker.namespace.LookupOptions;
import org.apache.pulsar.broker.namespace.NamespaceService;
import org.apache.pulsar.broker.service.BrokerServiceException;
import org.apache.pulsar.client.admin.PulsarAdmin;
+import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.common.naming.NamespaceBundle;
import org.apache.pulsar.common.naming.NamespaceBundleFactory;
@@ -1291,6 +1292,14 @@ public class ServiceUnitStateChannelImpl implements ServiceUnitStateChannel {
broker, cleanupJobs.size());
}
}
+ })
+ .exceptionally(e -> {
+ if (FutureUtil.unwrapCompletionException(e) instanceof
PulsarAdminException.NotFoundException) {
+ log.warn("{} Failed to run health check: {}",
broker, e.getMessage());
+ } else {
+ log.error("{} Failed to run health check", broker,
e);
+ }
+ return null;
});
}
}
@@ -1323,12 +1332,19 @@ public class ServiceUnitStateChannelImpl implements ServiceUnitStateChannel {
}
}
+ private boolean channelDisabled() {
+ final var channelState = this.channelState;
+ if (channelState == Disabled || channelState == Closed) {
+ log.warn("[{}] Skip scheduleCleanup because the state is {} now",
brokerId, channelState);
+ return true;
+ }
+ return false;
+ }
+
private void scheduleCleanup(String broker, long delayInSecs) {
var scheduled = new MutableObject<CompletableFuture<Void>>();
try {
- final var channelState = this.channelState;
- if (channelState == Disabled || channelState == Closed) {
- log.warn("[{}] Skip scheduleCleanup because the state is {}
now", brokerId, channelState);
+ if (channelDisabled()) {
return;
}
cleanupJobs.computeIfAbsent(broker, k -> {
@@ -1462,6 +1478,10 @@ public class ServiceUnitStateChannelImpl implements ServiceUnitStateChannel {
}
private void doHealthCheckBrokerAsyncWithRetries(String brokerId, int
retry, CompletableFuture<Void> future) {
+ if (channelDisabled()) {
+ future.complete(null);
+ return;
+ }
try {
var admin = getPulsarAdmin();
admin.brokers().healthcheckAsync(TopicVersion.V2,
Optional.of(brokerId))
@@ -1472,7 +1492,6 @@ public class ServiceUnitStateChannelImpl implements ServiceUnitStateChannel {
return;
}
if (retry == MAX_BROKER_HEALTH_CHECK_RETRY) {
- log.error("Failed health-check broker :{}",
brokerId, e);
future.completeExceptionally(FutureUtil.unwrapCompletionException(e));
} else {
pulsar.getExecutor()
@@ -1509,7 +1528,12 @@ public class ServiceUnitStateChannelImpl implements ServiceUnitStateChannel {
return;
} catch (Exception e) {
if (debug()) {
- log.info("Failed to check broker:{} health", broker, e);
+ if (e instanceof ExecutionException
+ && e.getCause() instanceof
PulsarAdminException.NotFoundException) {
+ log.info("The broker {} is not healthy because it's
not found", broker);
+ } else {
+ log.info("Failed to check broker:{} health", broker,
e);
+ }
}
log.info("Checked the broker:{} health. Continue the orphan
bundle cleanup", broker);
}
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerCloseTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerCloseTest.java
index ca44f6bc4d6..c8427d1a66d 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerCloseTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerCloseTest.java
@@ -38,7 +38,7 @@ import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;
@Slf4j
-@Test(groups = "flaky")
+@Test(groups = "broker")
public class ExtensibleLoadManagerCloseTest {
private static final String clusterName = "test";
@@ -88,14 +88,18 @@ public class ExtensibleLoadManagerCloseTest {
config.setLoadManagerClassName(ExtensibleLoadManagerImpl.class.getName());
config.setLoadBalancerDebugModeEnabled(true);
config.setBrokerShutdownTimeoutMs(100);
+
+ // Reduce these timeout configs to avoid failed tests being blocked
too long
+ config.setMetadataStoreOperationTimeoutSeconds(5);
+ config.setNamespaceBundleUnloadingTimeoutMs(5000);
return config;
}
- @Test
+ @Test(invocationCount = 10)
public void testCloseAfterLoadingBundles() throws Exception {
setupBrokers(3);
- final var topic = "test";
+ final var topic = "test-" + System.currentTimeMillis();
final var admin = brokers.get(0).getAdminClient();
admin.topics().createPartitionedTopic(topic, 20);
admin.lookups().lookupPartitionedTopic(topic);