This is an automated email from the ASF dual-hosted git repository.
penghui pushed a commit to branch branch-2.9
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-2.9 by this push:
new 65af6ce [Branch-2.9] Fix Broker HealthCheck Endpoint Exposes Race
Conditions. (#14658)
65af6ce is described below
commit 65af6ce2aac7585cbbedb4e40edb52ff0e6c1ad7
Author: ran <[email protected]>
AuthorDate: Mon Mar 14 08:51:21 2022 +0800
[Branch-2.9] Fix Broker HealthCheck Endpoint Exposes Race Conditions.
(#14658)
Original PR https://github.com/apache/pulsar/pull/14367
Because PR https://github.com/apache/pulsar/pull/14367 is based on PR
https://github.com/apache/pulsar/pull/14091, I want to cherry-pick both
PRs to branch-2.9; PR https://github.com/apache/pulsar/pull/13525 is also
needed.
---
Fix Issue: #14362
### Motivation
According to the related PR #7724, we force-delete all subscriptions when
the ``healthCheck`` REST API is called, but this has a race condition when
two threads call the API concurrently.
Please consider this case:
> Thread A: Clean up all subscriptions, then create a reader.
> Thread B: Prepare to force delete all subscriptions.
In this case, the reader created by thread A is deleted by thread B, and
an NPE or other exception then occurs.
### Modifications
- Use ``CompletableFuture#handle`` to fix problem 1: the reader needs to be
closed regardless of whether there is an exception.
- Recheck the subscription after closing the reader, and force-delete it if
it still exists.
- Added multi-threaded tests for health checks.
---
.../org/apache/pulsar/broker/PulsarService.java | 3 +
.../pulsar/broker/admin/impl/BrokersBase.java | 223 +++++++++++----------
.../service/nonpersistent/NonPersistentTopic.java | 5 +
.../broker/service/persistent/PersistentTopic.java | 5 +
.../pulsar/broker/web/PulsarWebResource.java | 15 ++
.../broker/admin/AdminApiHealthCheckTest.java | 108 ++++++++--
.../org/apache/pulsar/common/util/FutureUtil.java | 14 ++
7 files changed, 256 insertions(+), 117 deletions(-)
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
index ba41784..56d8b2b 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
@@ -272,6 +272,7 @@ public class PulsarService implements AutoCloseable,
ShutdownService {
private volatile CompletableFuture<Void> closeFuture;
// key is listener name , value is pulsar address and pulsar ssl address
private Map<String, AdvertisedListener> advertisedListeners;
+ private NamespaceName heartbeatNamespaceV2;
public PulsarService(ServiceConfiguration config) {
this(config, Optional.empty(), (exitCode) -> {
@@ -683,6 +684,7 @@ public class PulsarService implements AutoCloseable,
ShutdownService {
this.addWebServerHandlers(webService, metricsServlet, this.config);
this.webService.start();
+ heartbeatNamespaceV2 =
NamespaceService.getHeartbeatNamespaceV2(this.advertisedAddress, this.config);
// Refresh addresses and update configuration, since the port
might have been dynamically assigned
if (config.getBrokerServicePort().equals(Optional.of(0))) {
@@ -696,6 +698,7 @@ public class PulsarService implements AutoCloseable,
ShutdownService {
this.brokerServiceUrl = brokerUrl(config);
this.brokerServiceUrlTls = brokerUrlTls(config);
+
if (null != this.webSocketService) {
ClusterDataImpl clusterData = ClusterDataImpl.builder()
.serviceUrl(webServiceAddress)
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/BrokersBase.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/BrokersBase.java
index bb9453e..cb579ec 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/BrokersBase.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/BrokersBase.java
@@ -24,9 +24,11 @@ import io.swagger.annotations.ApiParam;
import io.swagger.annotations.ApiResponse;
import io.swagger.annotations.ApiResponses;
import java.time.Duration;
+import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
+import java.util.Objects;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
@@ -47,9 +49,10 @@ import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.loadbalance.LeaderBroker;
import org.apache.pulsar.broker.namespace.NamespaceService;
import org.apache.pulsar.broker.service.BrokerService;
+import org.apache.pulsar.broker.service.Subscription;
+import org.apache.pulsar.broker.service.Topic;
import org.apache.pulsar.broker.web.PulsarWebResource;
import org.apache.pulsar.broker.web.RestException;
-import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClient;
@@ -294,123 +297,139 @@ public class BrokersBase extends PulsarWebResource {
@GET
@Path("/health")
- @ApiOperation(value = "Run a healthcheck against the broker")
+ @ApiOperation(value = "Run a healthCheck against the broker")
@ApiResponses(value = {
@ApiResponse(code = 200, message = "Everything is OK"),
@ApiResponse(code = 403, message = "Don't have admin permission"),
@ApiResponse(code = 404, message = "Cluster doesn't exist"),
@ApiResponse(code = 500, message = "Internal server error")})
@ApiParam(value = "Topic Version")
- public void healthcheck(@Suspended AsyncResponse asyncResponse,
- @QueryParam("topicVersion") TopicVersion
topicVersion) throws Exception {
- String topic;
- PulsarClient client;
- try {
- validateSuperUserAccess();
- NamespaceName heartbeatNamespace = (topicVersion ==
TopicVersion.V2)
- ?
- NamespaceService.getHeartbeatNamespaceV2(
- pulsar().getAdvertisedAddress(),
- pulsar().getConfiguration())
- :
- NamespaceService.getHeartbeatNamespace(
- pulsar().getAdvertisedAddress(),
- pulsar().getConfiguration());
-
-
- topic = String.format("persistent://%s/%s", heartbeatNamespace,
HEALTH_CHECK_TOPIC_SUFFIX);
-
- LOG.info("Running healthCheck with topic={}", topic);
-
- client = pulsar().getClient();
- } catch (Exception e) {
- LOG.error("Error getting heathcheck topic info", e);
- throw new PulsarServerException(e);
- }
+ public void healthCheck(@Suspended AsyncResponse asyncResponse,
+ @QueryParam("topicVersion") TopicVersion
topicVersion) {
+ validateSuperUserAccess();
+ internalRunHealthCheck(topicVersion)
+ .thenAccept(__ -> {
+ LOG.info("[{}] Successfully run health check.",
clientAppId());
+ asyncResponse.resume("ok");
+ }).exceptionally(ex -> {
+ LOG.error("[{}] Fail to run health check.", clientAppId(),
ex);
+ return handleCommonRestAsyncException(asyncResponse, ex);
+ });
+ }
- String messageStr = UUID.randomUUID().toString();
+ private CompletableFuture<Void> internalRunHealthCheck(TopicVersion
topicVersion) {
+ NamespaceName namespaceName = (topicVersion == TopicVersion.V2)
+ ?
NamespaceService.getHeartbeatNamespaceV2(pulsar().getAdvertisedAddress(),
pulsar().getConfiguration())
+ :
NamespaceService.getHeartbeatNamespace(pulsar().getAdvertisedAddress(),
pulsar().getConfiguration());
+ final String topicName = String.format("persistent://%s/%s",
namespaceName, HEALTH_CHECK_TOPIC_SUFFIX);
+ LOG.info("[{}] Running healthCheck with topic={}", clientAppId(),
topicName);
+ final String messageStr = UUID.randomUUID().toString();
+ final String subscriptionName = "healthCheck-" + messageStr;
// create non-partitioned topic manually and close the previous reader
if present.
- try {
- pulsar().getBrokerService().getTopic(topic,
true).get().ifPresent(t -> {
- t.getSubscriptions().forEach((__, value) -> {
- try {
- value.deleteForcefully();
- } catch (Exception e) {
- LOG.warn("Failed to delete previous subscription {}
for health check", value.getName(), e);
- }
- });
+ return pulsar().getBrokerService().getTopic(topicName, true)
+ .thenCompose(topicOptional -> {
+ if (!topicOptional.isPresent()) {
+ LOG.error("[{}] Fail to run health check while get topic
{}. because get null value.",
+ clientAppId(), topicName);
+ throw new RestException(Status.NOT_FOUND,
+ String.format("Topic [%s] not found after
create.", topicName));
+ }
+ PulsarClient client;
+ try {
+ client = pulsar().getClient();
+ } catch (PulsarServerException e) {
+ LOG.error("[{}] Fail to run health check while get
client.", clientAppId());
+ throw new RestException(e);
+ }
+ CompletableFuture<Void> resultFuture = new
CompletableFuture<>();
+
client.newProducer(Schema.STRING).topic(topicName).createAsync()
+ .thenCompose(producer ->
client.newReader(Schema.STRING).topic(topicName)
+ .subscriptionName(subscriptionName)
+ .startMessageId(MessageId.latest)
+ .createAsync().exceptionally(createException
-> {
+ producer.closeAsync().exceptionally(ex -> {
+ LOG.error("[{}] Close producer fail
while heath check.", clientAppId());
+ return null;
+ });
+ throw
FutureUtil.wrapToCompletionException(createException);
+ }).thenCompose(reader ->
producer.sendAsync(messageStr)
+ .thenCompose(__ ->
healthCheckRecursiveReadNext(reader, messageStr))
+ .whenComplete((__, ex) -> {
+ closeAndReCheck(producer, reader,
topicOptional.get(), subscriptionName)
+ .whenComplete((unused,
innerEx) -> {
+ if (ex != null) {
+
resultFuture.completeExceptionally(ex);
+ } else {
+
resultFuture.complete(null);
+ }
+ });
+ }
+ ))
+ ).exceptionally(ex -> {
+ resultFuture.completeExceptionally(ex);
+ return null;
+ });
+ return resultFuture;
});
- } catch (Exception e) {
- LOG.warn("Failed to try to delete subscriptions for health check",
e);
- }
-
- CompletableFuture<Producer<String>> producerFuture =
- client.newProducer(Schema.STRING).topic(topic).createAsync();
- CompletableFuture<Reader<String>> readerFuture =
client.newReader(Schema.STRING)
- .topic(topic).startMessageId(MessageId.latest).createAsync();
-
- CompletableFuture<Void> completePromise = new CompletableFuture<>();
+ }
- CompletableFuture.allOf(producerFuture, readerFuture).whenComplete(
- (ignore, exception) -> {
- if (exception != null) {
- completePromise.completeExceptionally(exception);
+ /**
+ * Close producer and reader and then to re-check if this operation is
success.
+ *
+ * Re-check
+ * - Producer: If close fails we will print error log to notify user.
+ * - Consumer: If close fails we will force delete subscription.
+ *
+ * @param producer Producer
+ * @param reader Reader
+ * @param topic Topic
+ * @param subscriptionName Subscription name
+ */
+ private CompletableFuture<Void> closeAndReCheck(Producer<String> producer,
Reader<String> reader,
+ Topic topic, String
subscriptionName) {
+ // no matter exception or success, we still need to
+ // close producer/reader
+ CompletableFuture<Void> producerFuture = producer.closeAsync();
+ CompletableFuture<Void> readerFuture = reader.closeAsync();
+ List<CompletableFuture<Void>> futures = new ArrayList<>(2);
+ futures.add(producerFuture);
+ futures.add(readerFuture);
+ return FutureUtil.waitForAll(Collections.unmodifiableList(futures))
+ .exceptionally(closeException -> {
+ if (readerFuture.isCompletedExceptionally()) {
+ LOG.error("[{}] Close reader fail while heath check.",
clientAppId());
+ Subscription subscription =
+ topic.getSubscription(subscriptionName);
+ // re-check subscription after reader close
+ if (subscription != null) {
+ LOG.warn("[{}] Force delete subscription {} "
+ + "when it still exists after the"
+ + " reader is closed.",
+ clientAppId(), subscription);
+ subscription.deleteForcefully()
+ .exceptionally(ex -> {
+ LOG.error("[{}] Force delete
subscription fail"
+ + " while health
check",
+ clientAppId(), ex);
+ return null;
+ });
+ }
} else {
- producerFuture.thenCompose((producer) ->
producer.sendAsync(messageStr))
- .whenComplete((ignore2, exception2) -> {
- if (exception2 != null) {
-
completePromise.completeExceptionally(exception2);
- }
- });
-
- healthcheckReadLoop(readerFuture, completePromise,
messageStr);
-
- // timeout read loop after 10 seconds
- FutureUtil.addTimeoutHandling(completePromise,
- HEALTHCHECK_READ_TIMEOUT,
pulsar().getExecutor(),
- () -> FutureUtil.createTimeoutException("Timed
out reading", getClass(),
- "healthcheck(...)"));
+ // producer future fail.
+ LOG.error("[{}] Close producer fail while heath
check.", clientAppId());
}
+ return null;
});
+ }
- completePromise.whenComplete((ignore, exception) -> {
- producerFuture.thenAccept((producer) -> {
- producer.closeAsync().whenComplete((ignore2, exception2) -> {
- if (exception2 != null) {
- LOG.warn("Error closing producer for healthcheck",
exception2);
+ private CompletableFuture<Void>
healthCheckRecursiveReadNext(Reader<String> reader, String content) {
+ return reader.readNextAsync()
+ .thenCompose(msg -> {
+ if (!Objects.equals(content, msg.getValue())) {
+ return healthCheckRecursiveReadNext(reader, content);
}
+ return CompletableFuture.completedFuture(null);
});
- });
- readerFuture.thenAccept((reader) -> {
- reader.closeAsync().whenComplete((ignore2, exception2) -> {
- if (exception2 != null) {
- LOG.warn("Error closing reader for healthcheck",
exception2);
- }
- });
- });
- if (exception != null) {
- asyncResponse.resume(new RestException(exception));
- } else {
- asyncResponse.resume("ok");
- }
- });
- }
-
- private void healthcheckReadLoop(CompletableFuture<Reader<String>>
readerFuture,
- CompletableFuture<?> completablePromise,
- String messageStr) {
- readerFuture.thenAccept((reader) -> {
- CompletableFuture<Message<String>> readFuture =
reader.readNextAsync()
- .whenComplete((m, exception) -> {
- if (exception != null) {
-
completablePromise.completeExceptionally(exception);
- } else if (m.getValue().equals(messageStr)) {
- completablePromise.complete(null);
- } else {
- healthcheckReadLoop(readerFuture,
completablePromise, messageStr);
- }
- });
- });
}
private synchronized void deleteDynamicConfigurationOnZk(String
configName) {
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java
index 808b403..4a5aa1f 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java
@@ -66,6 +66,7 @@ import org.apache.pulsar.client.impl.PulsarClientImpl;
import org.apache.pulsar.common.api.proto.CommandSubscribe.InitialPosition;
import org.apache.pulsar.common.api.proto.CommandSubscribe.SubType;
import org.apache.pulsar.common.api.proto.KeySharedMeta;
+import org.apache.pulsar.common.naming.NamespaceName;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.policies.data.BacklogQuota;
import
org.apache.pulsar.common.policies.data.ManagedLedgerInternalStats.CursorStats;
@@ -497,6 +498,10 @@ public class NonPersistentTopic extends AbstractTopic
implements Topic {
if (!name.isGlobal()) {
return CompletableFuture.completedFuture(null);
}
+ NamespaceName heartbeatNamespace =
brokerService.pulsar().getHeartbeatNamespaceV2();
+ if (name.getNamespaceObject().equals(heartbeatNamespace)) {
+ return CompletableFuture.completedFuture(null);
+ }
if (log.isDebugEnabled()) {
log.debug("[{}] Checking replication status", name);
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
index e96fdc4..e6cf77f 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
@@ -126,6 +126,7 @@ import org.apache.pulsar.common.api.proto.KeySharedMeta;
import org.apache.pulsar.common.api.proto.MessageMetadata;
import org.apache.pulsar.common.api.proto.TxnAction;
import org.apache.pulsar.common.events.EventsTopicNames;
+import org.apache.pulsar.common.naming.NamespaceName;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.policies.data.BacklogQuota;
import org.apache.pulsar.common.policies.data.InactiveTopicDeleteMode;
@@ -1354,6 +1355,10 @@ public class PersistentTopic extends AbstractTopic
if (!name.isGlobal()) {
return CompletableFuture.completedFuture(null);
}
+ NamespaceName heartbeatNamespace =
brokerService.pulsar().getHeartbeatNamespaceV2();
+ if (name.getNamespaceObject().equals(heartbeatNamespace)) {
+ return CompletableFuture.completedFuture(null);
+ }
if (log.isDebugEnabled()) {
log.debug("[{}] Checking replication status", name);
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java
index 8b6f30b..e3ae827 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java
@@ -38,6 +38,7 @@ import java.util.concurrent.TimeoutException;
import javax.servlet.ServletContext;
import javax.servlet.http.HttpServletRequest;
import javax.ws.rs.WebApplicationException;
+import javax.ws.rs.container.AsyncResponse;
import javax.ws.rs.core.Context;
import javax.ws.rs.core.Response;
import javax.ws.rs.core.Response.Status;
@@ -709,6 +710,11 @@ public abstract class PulsarWebResource {
if (!namespace.isGlobal()) {
return CompletableFuture.completedFuture(null);
}
+ NamespaceName heartbeatNamespace =
pulsarService.getHeartbeatNamespaceV2();
+ if (namespace.equals(heartbeatNamespace)) {
+ return CompletableFuture.completedFuture(null);
+ }
+
final CompletableFuture<ClusterDataImpl> validationFuture = new
CompletableFuture<>();
final String localCluster =
pulsarService.getConfiguration().getClusterName();
@@ -1055,4 +1061,13 @@ public abstract class PulsarWebResource {
}
}
+ protected Void handleCommonRestAsyncException(AsyncResponse asyncResponse,
Throwable ex) {
+ Throwable realCause = FutureUtil.unwrapCompletionException(ex);
+ if (realCause instanceof WebApplicationException) {
+ asyncResponse.resume(realCause);
+ } else {
+ asyncResponse.resume(new RestException(realCause));
+ }
+ return null;
+ }
}
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiHealthCheckTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiHealthCheckTest.java
index e0c887f..b4b97ca 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiHealthCheckTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiHealthCheckTest.java
@@ -25,11 +25,15 @@ import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.common.naming.TopicVersion;
import org.apache.pulsar.common.policies.data.ClusterData;
import org.apache.pulsar.common.policies.data.TenantInfoImpl;
+import org.apache.pulsar.compaction.Compactor;
+import org.awaitility.Awaitility;
+import org.junit.Assert;
+import org.springframework.util.CollectionUtils;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;
-
-import java.net.URL;
+import java.util.concurrent.CompletableFuture;
+import java.util.stream.Collectors;
@Test(groups = "broker")
@Slf4j
@@ -58,26 +62,100 @@ public class AdminApiHealthCheckTest extends
MockedPulsarServiceBaseTest {
@Test
public void testHealthCheckup() throws Exception {
- admin.brokers().healthcheck();
+ final int times = 30;
+ CompletableFuture<Void> future = new CompletableFuture<>();
+ pulsar.getExecutor().execute(() -> {
+ try {
+ for (int i = 0; i < times; i++) {
+ admin.brokers().healthcheck();
+ }
+ future.complete(null);
+ }catch (PulsarAdminException e) {
+ future.completeExceptionally(e);
+ }
+ });
+ for (int i = 0; i < times; i++) {
+ admin.brokers().healthcheck();
+ }
+ // To ensure we don't have any subscription
+ final String testHealthCheckTopic =
String.format("persistent://pulsar/test/localhost:%s/healthcheck",
+ pulsar.getConfig().getWebServicePort().get());
+ Awaitility.await().untilAsserted(() -> {
+ Assert.assertFalse(future.isCompletedExceptionally());
+ });
+ Awaitility.await().untilAsserted(() ->
+ Assert.assertTrue(CollectionUtils.isEmpty(admin.topics()
+ .getSubscriptions(testHealthCheckTopic).stream()
+ // All system topics are using compaction, even though
is not explicitly set in the policies.
+ .filter(v ->
!v.equals(Compactor.COMPACTION_SUBSCRIPTION))
+ .collect(Collectors.toList())
+ ))
+ );
}
@Test
public void testHealthCheckupV1() throws Exception {
- admin.brokers().healthcheck(TopicVersion.V1);
- }
-
- @Test(expectedExceptions = PulsarAdminException.class)
- public void testHealthCheckupV2Error() throws Exception {
- admin.brokers().healthcheck(TopicVersion.V2);
+ final int times = 30;
+ CompletableFuture<Void> future = new CompletableFuture<>();
+ pulsar.getExecutor().execute(() -> {
+ try {
+ for (int i = 0; i < times; i++) {
+ admin.brokers().healthcheck(TopicVersion.V1);
+ }
+ future.complete(null);
+ }catch (PulsarAdminException e) {
+ future.completeExceptionally(e);
+ }
+ });
+ for (int i = 0; i < times; i++) {
+ admin.brokers().healthcheck(TopicVersion.V1);
+ }
+ final String testHealthCheckTopic =
String.format("persistent://pulsar/test/localhost:%s/healthcheck",
+ pulsar.getConfig().getWebServicePort().get());
+ Awaitility.await().untilAsserted(() -> {
+ Assert.assertFalse(future.isCompletedExceptionally());
+ });
+ // To ensure we don't have any subscription
+ Awaitility.await().untilAsserted(() ->
+ Assert.assertTrue(CollectionUtils.isEmpty(admin.topics()
+ .getSubscriptions(testHealthCheckTopic).stream()
+ // All system topics are using compaction, even though
is not explicitly set in the policies.
+ .filter(v ->
!v.equals(Compactor.COMPACTION_SUBSCRIPTION))
+ .collect(Collectors.toList())
+ ))
+ );
}
@Test
public void testHealthCheckupV2() throws Exception {
- final URL pulsarWebAddress = new URL(pulsar.getWebServiceAddress());
- final String targetNameSpace = "pulsar/" +
- pulsarWebAddress.getHost() + ":" + pulsarWebAddress.getPort();
- log.info("Target namespace for broker admin healthcheck V2 endpoint is
{}", targetNameSpace);
- admin.namespaces().createNamespace(targetNameSpace);
- admin.brokers().healthcheck(TopicVersion.V2);
+ final int times = 30;
+ CompletableFuture<Void> future = new CompletableFuture<>();
+ pulsar.getExecutor().execute(() -> {
+ try {
+ for (int i = 0; i < times; i++) {
+ admin.brokers().healthcheck(TopicVersion.V2);
+ }
+ future.complete(null);
+ }catch (PulsarAdminException e) {
+ future.completeExceptionally(e);
+ }
+ });
+ for (int i = 0; i < times; i++) {
+ admin.brokers().healthcheck(TopicVersion.V2);
+ }
+ final String testHealthCheckTopic =
String.format("persistent://pulsar/localhost:%s/healthcheck",
+ pulsar.getConfig().getWebServicePort().get());
+ Awaitility.await().untilAsserted(() -> {
+ Assert.assertFalse(future.isCompletedExceptionally());
+ });
+ // To ensure we don't have any subscription
+ Awaitility.await().untilAsserted(() ->
+ Assert.assertTrue(CollectionUtils.isEmpty(admin.topics()
+ .getSubscriptions(testHealthCheckTopic).stream()
+ // All system topics are using compaction, even though
is not explicitly set in the policies.
+ .filter(v ->
!v.equals(Compactor.COMPACTION_SUBSCRIPTION))
+ .collect(Collectors.toList())
+ ))
+ );
}
}
diff --git
a/pulsar-common/src/main/java/org/apache/pulsar/common/util/FutureUtil.java
b/pulsar-common/src/main/java/org/apache/pulsar/common/util/FutureUtil.java
index 3d2fcae..2cdd9fc 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/util/FutureUtil.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/util/FutureUtil.java
@@ -187,4 +187,18 @@ public class FutureUtil {
}
return Optional.empty();
}
+
+ /**
+ * Wrap throwable exception to CompletionException if that exception is
not an instance of CompletionException.
+ *
+ * @param throwable Exception
+ * @return CompletionException
+ */
+ public static CompletionException wrapToCompletionException(Throwable
throwable) {
+ if (throwable instanceof CompletionException) {
+ return (CompletionException) throwable;
+ } else {
+ return new CompletionException(throwable);
+ }
+ }
}