This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-2.9
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-2.9 by this push:
     new 65af6ce  [Branch-2.9] Fix Broker HealthCheck Endpoint Exposes Race 
Conditions. (#14658)
65af6ce is described below

commit 65af6ce2aac7585cbbedb4e40edb52ff0e6c1ad7
Author: ran <[email protected]>
AuthorDate: Mon Mar 14 08:51:21 2022 +0800

    [Branch-2.9] Fix Broker HealthCheck Endpoint Exposes Race Conditions. 
(#14658)
    
    Original PR https://github.com/apache/pulsar/pull/14367
    
    Because PR https://github.com/apache/pulsar/pull/14367 is based on PR 
https://github.com/apache/pulsar/pull/14091, I want to cherry-pick both PRs to 
branch-2.9; PR https://github.com/apache/pulsar/pull/13525 is also needed.
    
    ---
    
    Fix Issue: #14362
    
    ### Motivation
    
    According to the related PR #7724, we force delete all subscriptions when 
the ``healthCheck`` REST API is called, but this has a race condition when two 
threads call the API at the same time.
    Please consider this case:
    
    > Thread A: Clean up all subscriptions, then create a reader.
    > Thread B: Prepare to force delete all subscriptions.
    
    So, in this case, the subscription of thread A's reader is deleted by 
thread B, and thread A then hits an NPE or another exception.
    
    ### Modifications
    
    - Use ``CompletableFuture#handle`` to fix problem 1: the reader needs to 
be closed regardless of whether there is an exception.
    - Recheck the subscription after closing the reader, and force delete it 
if it still exists.
    - Added multi-threaded tests for health checks.
---
 .../org/apache/pulsar/broker/PulsarService.java    |   3 +
 .../pulsar/broker/admin/impl/BrokersBase.java      | 223 +++++++++++----------
 .../service/nonpersistent/NonPersistentTopic.java  |   5 +
 .../broker/service/persistent/PersistentTopic.java |   5 +
 .../pulsar/broker/web/PulsarWebResource.java       |  15 ++
 .../broker/admin/AdminApiHealthCheckTest.java      | 108 ++++++++--
 .../org/apache/pulsar/common/util/FutureUtil.java  |  14 ++
 7 files changed, 256 insertions(+), 117 deletions(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
index ba41784..56d8b2b 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
@@ -272,6 +272,7 @@ public class PulsarService implements AutoCloseable, 
ShutdownService {
     private volatile CompletableFuture<Void> closeFuture;
     // key is listener name , value is pulsar address and pulsar ssl address
     private Map<String, AdvertisedListener> advertisedListeners;
+    private NamespaceName heartbeatNamespaceV2;
 
     public PulsarService(ServiceConfiguration config) {
         this(config, Optional.empty(), (exitCode) -> {
@@ -683,6 +684,7 @@ public class PulsarService implements AutoCloseable, 
ShutdownService {
 
             this.addWebServerHandlers(webService, metricsServlet, this.config);
             this.webService.start();
+            heartbeatNamespaceV2 = 
NamespaceService.getHeartbeatNamespaceV2(this.advertisedAddress, this.config);
 
             // Refresh addresses and update configuration, since the port 
might have been dynamically assigned
             if (config.getBrokerServicePort().equals(Optional.of(0))) {
@@ -696,6 +698,7 @@ public class PulsarService implements AutoCloseable, 
ShutdownService {
             this.brokerServiceUrl = brokerUrl(config);
             this.brokerServiceUrlTls = brokerUrlTls(config);
 
+
             if (null != this.webSocketService) {
                 ClusterDataImpl clusterData = ClusterDataImpl.builder()
                         .serviceUrl(webServiceAddress)
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/BrokersBase.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/BrokersBase.java
index bb9453e..cb579ec 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/BrokersBase.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/BrokersBase.java
@@ -24,9 +24,11 @@ import io.swagger.annotations.ApiParam;
 import io.swagger.annotations.ApiResponse;
 import io.swagger.annotations.ApiResponses;
 import java.time.Duration;
+import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
+import java.util.Objects;
 import java.util.Set;
 import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
@@ -47,9 +49,10 @@ import org.apache.pulsar.broker.ServiceConfiguration;
 import org.apache.pulsar.broker.loadbalance.LeaderBroker;
 import org.apache.pulsar.broker.namespace.NamespaceService;
 import org.apache.pulsar.broker.service.BrokerService;
+import org.apache.pulsar.broker.service.Subscription;
+import org.apache.pulsar.broker.service.Topic;
 import org.apache.pulsar.broker.web.PulsarWebResource;
 import org.apache.pulsar.broker.web.RestException;
-import org.apache.pulsar.client.api.Message;
 import org.apache.pulsar.client.api.MessageId;
 import org.apache.pulsar.client.api.Producer;
 import org.apache.pulsar.client.api.PulsarClient;
@@ -294,123 +297,139 @@ public class BrokersBase extends PulsarWebResource {
 
     @GET
     @Path("/health")
-    @ApiOperation(value = "Run a healthcheck against the broker")
+    @ApiOperation(value = "Run a healthCheck against the broker")
     @ApiResponses(value = {
         @ApiResponse(code = 200, message = "Everything is OK"),
         @ApiResponse(code = 403, message = "Don't have admin permission"),
         @ApiResponse(code = 404, message = "Cluster doesn't exist"),
         @ApiResponse(code = 500, message = "Internal server error")})
     @ApiParam(value = "Topic Version")
-    public void healthcheck(@Suspended AsyncResponse asyncResponse,
-                            @QueryParam("topicVersion") TopicVersion 
topicVersion) throws Exception {
-        String topic;
-        PulsarClient client;
-        try {
-            validateSuperUserAccess();
-            NamespaceName heartbeatNamespace = (topicVersion == 
TopicVersion.V2)
-                    ?
-                    NamespaceService.getHeartbeatNamespaceV2(
-                            pulsar().getAdvertisedAddress(),
-                            pulsar().getConfiguration())
-                    :
-                    NamespaceService.getHeartbeatNamespace(
-                            pulsar().getAdvertisedAddress(),
-                            pulsar().getConfiguration());
-
-
-            topic = String.format("persistent://%s/%s", heartbeatNamespace, 
HEALTH_CHECK_TOPIC_SUFFIX);
-
-            LOG.info("Running healthCheck with topic={}", topic);
-
-            client = pulsar().getClient();
-        } catch (Exception e) {
-            LOG.error("Error getting heathcheck topic info", e);
-            throw new PulsarServerException(e);
-        }
+    public void healthCheck(@Suspended AsyncResponse asyncResponse,
+                            @QueryParam("topicVersion") TopicVersion 
topicVersion) {
+        validateSuperUserAccess();
+        internalRunHealthCheck(topicVersion)
+                .thenAccept(__ -> {
+                    LOG.info("[{}] Successfully run health check.", 
clientAppId());
+                    asyncResponse.resume("ok");
+                }).exceptionally(ex -> {
+                    LOG.error("[{}] Fail to run health check.", clientAppId(), 
ex);
+                    return handleCommonRestAsyncException(asyncResponse, ex);
+                });
+    }
 
-        String messageStr = UUID.randomUUID().toString();
+    private CompletableFuture<Void> internalRunHealthCheck(TopicVersion 
topicVersion) {
+        NamespaceName namespaceName = (topicVersion == TopicVersion.V2)
+                ? 
NamespaceService.getHeartbeatNamespaceV2(pulsar().getAdvertisedAddress(), 
pulsar().getConfiguration())
+                : 
NamespaceService.getHeartbeatNamespace(pulsar().getAdvertisedAddress(), 
pulsar().getConfiguration());
+        final String topicName = String.format("persistent://%s/%s", 
namespaceName, HEALTH_CHECK_TOPIC_SUFFIX);
+        LOG.info("[{}] Running healthCheck with topic={}", clientAppId(), 
topicName);
+        final String messageStr = UUID.randomUUID().toString();
+        final String subscriptionName = "healthCheck-" + messageStr;
         // create non-partitioned topic manually and close the previous reader 
if present.
-        try {
-            pulsar().getBrokerService().getTopic(topic, 
true).get().ifPresent(t -> {
-                t.getSubscriptions().forEach((__, value) -> {
-                    try {
-                        value.deleteForcefully();
-                    } catch (Exception e) {
-                        LOG.warn("Failed to delete previous subscription {} 
for health check", value.getName(), e);
-                    }
-                });
+        return pulsar().getBrokerService().getTopic(topicName, true)
+            .thenCompose(topicOptional -> {
+                if (!topicOptional.isPresent()) {
+                    LOG.error("[{}] Fail to run health check while get topic 
{}. because get null value.",
+                            clientAppId(), topicName);
+                    throw new RestException(Status.NOT_FOUND,
+                            String.format("Topic [%s] not found after 
create.", topicName));
+                }
+                PulsarClient client;
+                try {
+                    client = pulsar().getClient();
+                } catch (PulsarServerException e) {
+                    LOG.error("[{}] Fail to run health check while get 
client.", clientAppId());
+                    throw new RestException(e);
+                }
+                CompletableFuture<Void> resultFuture = new 
CompletableFuture<>();
+                
client.newProducer(Schema.STRING).topic(topicName).createAsync()
+                        .thenCompose(producer -> 
client.newReader(Schema.STRING).topic(topicName)
+                                .subscriptionName(subscriptionName)
+                                .startMessageId(MessageId.latest)
+                                .createAsync().exceptionally(createException 
-> {
+                                    producer.closeAsync().exceptionally(ex -> {
+                                        LOG.error("[{}] Close producer fail 
while heath check.", clientAppId());
+                                        return null;
+                                    });
+                                    throw 
FutureUtil.wrapToCompletionException(createException);
+                                }).thenCompose(reader -> 
producer.sendAsync(messageStr)
+                                        .thenCompose(__ -> 
healthCheckRecursiveReadNext(reader, messageStr))
+                                        .whenComplete((__, ex) -> {
+                                            closeAndReCheck(producer, reader, 
topicOptional.get(), subscriptionName)
+                                                    .whenComplete((unused, 
innerEx) -> {
+                                                        if (ex != null) {
+                                                            
resultFuture.completeExceptionally(ex);
+                                                        } else {
+                                                            
resultFuture.complete(null);
+                                                        }
+                                                    });
+                                        }
+                                ))
+                        ).exceptionally(ex -> {
+                            resultFuture.completeExceptionally(ex);
+                            return null;
+                        });
+                return resultFuture;
             });
-        } catch (Exception e) {
-            LOG.warn("Failed to try to delete subscriptions for health check", 
e);
-        }
-
-        CompletableFuture<Producer<String>> producerFuture =
-                client.newProducer(Schema.STRING).topic(topic).createAsync();
-        CompletableFuture<Reader<String>> readerFuture = 
client.newReader(Schema.STRING)
-                .topic(topic).startMessageId(MessageId.latest).createAsync();
-
-        CompletableFuture<Void> completePromise = new CompletableFuture<>();
+    }
 
-        CompletableFuture.allOf(producerFuture, readerFuture).whenComplete(
-                (ignore, exception) -> {
-                    if (exception != null) {
-                        completePromise.completeExceptionally(exception);
+    /**
+     * Close producer and reader and then to re-check if this operation is 
success.
+     *
+     * Re-check
+     * - Producer: If close fails we will print error log to notify user.
+     * - Consumer: If close fails we will force delete subscription.
+     *
+     * @param producer Producer
+     * @param reader Reader
+     * @param topic  Topic
+     * @param subscriptionName  Subscription name
+     */
+    private CompletableFuture<Void> closeAndReCheck(Producer<String> producer, 
Reader<String> reader,
+                                                    Topic topic, String 
subscriptionName) {
+        // no matter exception or success, we still need to
+        // close producer/reader
+        CompletableFuture<Void> producerFuture = producer.closeAsync();
+        CompletableFuture<Void> readerFuture = reader.closeAsync();
+        List<CompletableFuture<Void>> futures = new ArrayList<>(2);
+        futures.add(producerFuture);
+        futures.add(readerFuture);
+        return FutureUtil.waitForAll(Collections.unmodifiableList(futures))
+                .exceptionally(closeException -> {
+                    if (readerFuture.isCompletedExceptionally()) {
+                        LOG.error("[{}] Close reader fail while heath check.", 
clientAppId());
+                        Subscription subscription =
+                                topic.getSubscription(subscriptionName);
+                        // re-check subscription after reader close
+                        if (subscription != null) {
+                            LOG.warn("[{}] Force delete subscription {} "
+                                            + "when it still exists after the"
+                                            + " reader is closed.",
+                                    clientAppId(), subscription);
+                            subscription.deleteForcefully()
+                                    .exceptionally(ex -> {
+                                        LOG.error("[{}] Force delete 
subscription fail"
+                                                        + " while health 
check",
+                                                clientAppId(), ex);
+                                        return null;
+                                    });
+                        }
                     } else {
-                        producerFuture.thenCompose((producer) -> 
producer.sendAsync(messageStr))
-                                .whenComplete((ignore2, exception2) -> {
-                                    if (exception2 != null) {
-                                        
completePromise.completeExceptionally(exception2);
-                                    }
-                                });
-
-                        healthcheckReadLoop(readerFuture, completePromise, 
messageStr);
-
-                        // timeout read loop after 10 seconds
-                        FutureUtil.addTimeoutHandling(completePromise,
-                                HEALTHCHECK_READ_TIMEOUT, 
pulsar().getExecutor(),
-                                () -> FutureUtil.createTimeoutException("Timed 
out reading", getClass(),
-                                        "healthcheck(...)"));
+                        // producer future fail.
+                        LOG.error("[{}] Close producer fail while heath 
check.", clientAppId());
                     }
+                    return null;
                 });
+    }
 
-        completePromise.whenComplete((ignore, exception) -> {
-            producerFuture.thenAccept((producer) -> {
-                producer.closeAsync().whenComplete((ignore2, exception2) -> {
-                    if (exception2 != null) {
-                        LOG.warn("Error closing producer for healthcheck", 
exception2);
+    private CompletableFuture<Void> 
healthCheckRecursiveReadNext(Reader<String> reader, String content) {
+        return reader.readNextAsync()
+                .thenCompose(msg -> {
+                    if (!Objects.equals(content, msg.getValue())) {
+                        return healthCheckRecursiveReadNext(reader, content);
                     }
+                    return CompletableFuture.completedFuture(null);
                 });
-            });
-            readerFuture.thenAccept((reader) -> {
-                reader.closeAsync().whenComplete((ignore2, exception2) -> {
-                    if (exception2 != null) {
-                        LOG.warn("Error closing reader for healthcheck", 
exception2);
-                    }
-                });
-            });
-            if (exception != null) {
-                asyncResponse.resume(new RestException(exception));
-            } else {
-                asyncResponse.resume("ok");
-            }
-        });
-    }
-
-    private void healthcheckReadLoop(CompletableFuture<Reader<String>> 
readerFuture,
-                                     CompletableFuture<?> completablePromise,
-                                     String messageStr) {
-        readerFuture.thenAccept((reader) -> {
-                CompletableFuture<Message<String>> readFuture = 
reader.readNextAsync()
-                    .whenComplete((m, exception) -> {
-                            if (exception != null) {
-                                
completablePromise.completeExceptionally(exception);
-                            } else if (m.getValue().equals(messageStr)) {
-                                completablePromise.complete(null);
-                            } else {
-                                healthcheckReadLoop(readerFuture, 
completablePromise, messageStr);
-                            }
-                        });
-            });
     }
 
     private synchronized void deleteDynamicConfigurationOnZk(String 
configName) {
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java
index 808b403..4a5aa1f 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java
@@ -66,6 +66,7 @@ import org.apache.pulsar.client.impl.PulsarClientImpl;
 import org.apache.pulsar.common.api.proto.CommandSubscribe.InitialPosition;
 import org.apache.pulsar.common.api.proto.CommandSubscribe.SubType;
 import org.apache.pulsar.common.api.proto.KeySharedMeta;
+import org.apache.pulsar.common.naming.NamespaceName;
 import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.common.policies.data.BacklogQuota;
 import 
org.apache.pulsar.common.policies.data.ManagedLedgerInternalStats.CursorStats;
@@ -497,6 +498,10 @@ public class NonPersistentTopic extends AbstractTopic 
implements Topic {
         if (!name.isGlobal()) {
             return CompletableFuture.completedFuture(null);
         }
+        NamespaceName heartbeatNamespace = 
brokerService.pulsar().getHeartbeatNamespaceV2();
+        if (name.getNamespaceObject().equals(heartbeatNamespace)) {
+            return CompletableFuture.completedFuture(null);
+        }
 
         if (log.isDebugEnabled()) {
             log.debug("[{}] Checking replication status", name);
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
index e96fdc4..e6cf77f 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
@@ -126,6 +126,7 @@ import org.apache.pulsar.common.api.proto.KeySharedMeta;
 import org.apache.pulsar.common.api.proto.MessageMetadata;
 import org.apache.pulsar.common.api.proto.TxnAction;
 import org.apache.pulsar.common.events.EventsTopicNames;
+import org.apache.pulsar.common.naming.NamespaceName;
 import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.common.policies.data.BacklogQuota;
 import org.apache.pulsar.common.policies.data.InactiveTopicDeleteMode;
@@ -1354,6 +1355,10 @@ public class PersistentTopic extends AbstractTopic
         if (!name.isGlobal()) {
             return CompletableFuture.completedFuture(null);
         }
+        NamespaceName heartbeatNamespace = 
brokerService.pulsar().getHeartbeatNamespaceV2();
+        if (name.getNamespaceObject().equals(heartbeatNamespace)) {
+            return CompletableFuture.completedFuture(null);
+        }
 
         if (log.isDebugEnabled()) {
             log.debug("[{}] Checking replication status", name);
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java
index 8b6f30b..e3ae827 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java
@@ -38,6 +38,7 @@ import java.util.concurrent.TimeoutException;
 import javax.servlet.ServletContext;
 import javax.servlet.http.HttpServletRequest;
 import javax.ws.rs.WebApplicationException;
+import javax.ws.rs.container.AsyncResponse;
 import javax.ws.rs.core.Context;
 import javax.ws.rs.core.Response;
 import javax.ws.rs.core.Response.Status;
@@ -709,6 +710,11 @@ public abstract class PulsarWebResource {
         if (!namespace.isGlobal()) {
             return CompletableFuture.completedFuture(null);
         }
+        NamespaceName heartbeatNamespace = 
pulsarService.getHeartbeatNamespaceV2();
+        if (namespace.equals(heartbeatNamespace)) {
+            return CompletableFuture.completedFuture(null);
+        }
+
         final CompletableFuture<ClusterDataImpl> validationFuture = new 
CompletableFuture<>();
         final String localCluster = 
pulsarService.getConfiguration().getClusterName();
 
@@ -1055,4 +1061,13 @@ public abstract class PulsarWebResource {
         }
     }
 
+    protected Void handleCommonRestAsyncException(AsyncResponse asyncResponse, 
Throwable ex) {
+        Throwable realCause = FutureUtil.unwrapCompletionException(ex);
+        if (realCause instanceof WebApplicationException) {
+            asyncResponse.resume(realCause);
+        } else {
+            asyncResponse.resume(new RestException(realCause));
+        }
+        return null;
+    }
 }
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiHealthCheckTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiHealthCheckTest.java
index e0c887f..b4b97ca 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiHealthCheckTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiHealthCheckTest.java
@@ -25,11 +25,15 @@ import org.apache.pulsar.client.admin.PulsarAdminException;
 import org.apache.pulsar.common.naming.TopicVersion;
 import org.apache.pulsar.common.policies.data.ClusterData;
 import org.apache.pulsar.common.policies.data.TenantInfoImpl;
+import org.apache.pulsar.compaction.Compactor;
+import org.awaitility.Awaitility;
+import org.junit.Assert;
+import org.springframework.util.CollectionUtils;
 import org.testng.annotations.AfterMethod;
 import org.testng.annotations.BeforeMethod;
 import org.testng.annotations.Test;
-
-import java.net.URL;
+import java.util.concurrent.CompletableFuture;
+import java.util.stream.Collectors;
 
 @Test(groups = "broker")
 @Slf4j
@@ -58,26 +62,100 @@ public class AdminApiHealthCheckTest extends 
MockedPulsarServiceBaseTest {
 
     @Test
     public void testHealthCheckup() throws Exception {
-        admin.brokers().healthcheck();
+        final int times = 30;
+        CompletableFuture<Void> future = new CompletableFuture<>();
+        pulsar.getExecutor().execute(() -> {
+            try {
+                for (int i = 0; i < times; i++) {
+                    admin.brokers().healthcheck();
+                }
+                future.complete(null);
+            }catch (PulsarAdminException e) {
+                future.completeExceptionally(e);
+            }
+        });
+        for (int i = 0; i < times; i++) {
+            admin.brokers().healthcheck();
+        }
+        // To ensure we don't have any subscription
+        final String testHealthCheckTopic = 
String.format("persistent://pulsar/test/localhost:%s/healthcheck",
+                pulsar.getConfig().getWebServicePort().get());
+        Awaitility.await().untilAsserted(() -> {
+            Assert.assertFalse(future.isCompletedExceptionally());
+        });
+        Awaitility.await().untilAsserted(() ->
+                Assert.assertTrue(CollectionUtils.isEmpty(admin.topics()
+                        .getSubscriptions(testHealthCheckTopic).stream()
+                        // All system topics are using compaction, even though 
is not explicitly set in the policies.
+                        .filter(v -> 
!v.equals(Compactor.COMPACTION_SUBSCRIPTION))
+                        .collect(Collectors.toList())
+                ))
+        );
     }
 
     @Test
     public void testHealthCheckupV1() throws Exception {
-        admin.brokers().healthcheck(TopicVersion.V1);
-    }
-
-    @Test(expectedExceptions = PulsarAdminException.class)
-    public void testHealthCheckupV2Error() throws Exception {
-        admin.brokers().healthcheck(TopicVersion.V2);
+        final int times = 30;
+        CompletableFuture<Void> future = new CompletableFuture<>();
+        pulsar.getExecutor().execute(() -> {
+            try {
+                for (int i = 0; i < times; i++) {
+                    admin.brokers().healthcheck(TopicVersion.V1);
+                }
+                future.complete(null);
+            }catch (PulsarAdminException e) {
+                future.completeExceptionally(e);
+            }
+        });
+        for (int i = 0; i < times; i++) {
+            admin.brokers().healthcheck(TopicVersion.V1);
+        }
+        final String testHealthCheckTopic = 
String.format("persistent://pulsar/test/localhost:%s/healthcheck",
+                pulsar.getConfig().getWebServicePort().get());
+        Awaitility.await().untilAsserted(() -> {
+            Assert.assertFalse(future.isCompletedExceptionally());
+        });
+        // To ensure we don't have any subscription
+        Awaitility.await().untilAsserted(() ->
+                Assert.assertTrue(CollectionUtils.isEmpty(admin.topics()
+                        .getSubscriptions(testHealthCheckTopic).stream()
+                        // All system topics are using compaction, even though 
is not explicitly set in the policies.
+                        .filter(v -> 
!v.equals(Compactor.COMPACTION_SUBSCRIPTION))
+                        .collect(Collectors.toList())
+                ))
+        );
     }
 
     @Test
     public void testHealthCheckupV2() throws Exception {
-        final URL pulsarWebAddress = new URL(pulsar.getWebServiceAddress());
-        final String targetNameSpace = "pulsar/" +
-                pulsarWebAddress.getHost() + ":" + pulsarWebAddress.getPort();
-        log.info("Target namespace for broker admin healthcheck V2 endpoint is 
{}", targetNameSpace);
-        admin.namespaces().createNamespace(targetNameSpace);
-        admin.brokers().healthcheck(TopicVersion.V2);
+        final int times = 30;
+        CompletableFuture<Void> future = new CompletableFuture<>();
+        pulsar.getExecutor().execute(() -> {
+            try {
+                for (int i = 0; i < times; i++) {
+                    admin.brokers().healthcheck(TopicVersion.V2);
+                }
+                future.complete(null);
+            }catch (PulsarAdminException e) {
+                future.completeExceptionally(e);
+            }
+        });
+        for (int i = 0; i < times; i++) {
+            admin.brokers().healthcheck(TopicVersion.V2);
+        }
+        final String testHealthCheckTopic = 
String.format("persistent://pulsar/localhost:%s/healthcheck",
+                pulsar.getConfig().getWebServicePort().get());
+        Awaitility.await().untilAsserted(() -> {
+            Assert.assertFalse(future.isCompletedExceptionally());
+        });
+        // To ensure we don't have any subscription
+        Awaitility.await().untilAsserted(() ->
+                Assert.assertTrue(CollectionUtils.isEmpty(admin.topics()
+                        .getSubscriptions(testHealthCheckTopic).stream()
+                        // All system topics are using compaction, even though 
is not explicitly set in the policies.
+                        .filter(v -> 
!v.equals(Compactor.COMPACTION_SUBSCRIPTION))
+                        .collect(Collectors.toList())
+                ))
+        );
     }
 }
diff --git 
a/pulsar-common/src/main/java/org/apache/pulsar/common/util/FutureUtil.java 
b/pulsar-common/src/main/java/org/apache/pulsar/common/util/FutureUtil.java
index 3d2fcae..2cdd9fc 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/util/FutureUtil.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/util/FutureUtil.java
@@ -187,4 +187,18 @@ public class FutureUtil {
         }
         return Optional.empty();
     }
+
+    /**
+     * Wrap throwable exception to CompletionException if that exception is 
not an instance of CompletionException.
+     *
+     * @param throwable Exception
+     * @return CompletionException
+     */
+    public static CompletionException wrapToCompletionException(Throwable 
throwable) {
+        if (throwable instanceof CompletionException) {
+            return (CompletionException) throwable;
+        } else {
+            return new CompletionException(throwable);
+        }
+    }
 }

Reply via email to