merlimat closed pull request #2883: Add healthcheck HTTP endpoint
URL: https://github.com/apache/pulsar/pull/2883
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (GitHub would not display it otherwise):

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/BrokersBase.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/BrokersBase.java
index cab707e90c..4ebc4f6d89 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/BrokersBase.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/BrokersBase.java
@@ -23,11 +23,18 @@
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.TimeUnit;
 
 import javax.ws.rs.GET;
 import javax.ws.rs.POST;
 import javax.ws.rs.Path;
 import javax.ws.rs.PathParam;
+import javax.ws.rs.container.AsyncResponse;
+import javax.ws.rs.container.Suspended;
 import javax.ws.rs.core.Response.Status;
 
 import org.apache.bookkeeper.conf.ClientConfiguration;
@@ -35,8 +42,15 @@
 import org.apache.pulsar.broker.ServiceConfiguration;
 import org.apache.pulsar.broker.admin.AdminResource;
 import org.apache.pulsar.broker.loadbalance.LoadManager;
+import org.apache.pulsar.broker.namespace.NamespaceService;
 import org.apache.pulsar.broker.service.BrokerService;
 import org.apache.pulsar.broker.web.RestException;
+import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.client.api.MessageId;
+import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.api.Reader;
+import org.apache.pulsar.client.api.Schema;
 import org.apache.pulsar.common.conf.InternalConfigurationData;
 import org.apache.pulsar.common.policies.data.NamespaceOwnershipStatus;
 import org.apache.pulsar.common.util.ObjectMapperFactory;
@@ -196,4 +210,100 @@ public InternalConfigurationData getInternalConfigurationData() {
             conf.getZkLedgersRootPath());
     }
 
+    @GET
+    @Path("/health")
+    @ApiOperation(value = "Run a healthcheck against the broker")
+    @ApiResponses(value = { @ApiResponse(code = 200, message = "Everything is OK"),
+                            @ApiResponse(code = 403, message = "Don't have admin permission"),
+                            @ApiResponse(code = 404, message = "Cluster doesn't exist") })
+    public void healthcheck(@Suspended AsyncResponse asyncResponse) throws Exception {
+        validateSuperUserAccess();
+        String heartbeatNamespace = NamespaceService.getHeartbeatNamespace(
+                pulsar().getAdvertisedAddress(), pulsar().getConfiguration());
+        String topic = String.format("persistent://%s/healthcheck", heartbeatNamespace);
+
+        PulsarClient client = pulsar().getClient();
+
+        String messageStr = UUID.randomUUID().toString();
+        CompletableFuture<Void> completePromise = new CompletableFuture<>();
+
+        // Arm the timeout before creating the producer and reader, so that a
+        // creation that never completes still fails the healthcheck instead of
+        // leaving the suspended HTTP request without a response.
+        ScheduledFuture<?> timeout = pulsar().getExecutor().schedule(() -> {
+                completePromise.completeExceptionally(new TimeoutException("Timed out running healthcheck"));
+            }, 10, TimeUnit.SECONDS);
+        // don't leave the timeout task dangling once the check has finished
+        completePromise.whenComplete((ignore, exception) -> timeout.cancel(false));
+
+        CompletableFuture<Producer<String>> producerFuture =
+            client.newProducer(Schema.STRING).topic(topic).createAsync();
+        CompletableFuture<Reader<String>> readerFuture = client.newReader(Schema.STRING)
+            .topic(topic).startMessageId(MessageId.latest).createAsync();
+
+        CompletableFuture.allOf(producerFuture, readerFuture).whenComplete(
+                (ignore, exception) -> {
+                    if (exception != null) {
+                        completePromise.completeExceptionally(exception);
+                        return;
+                    }
+                    producerFuture.thenCompose((producer) -> producer.sendAsync(messageStr))
+                        .whenComplete((ignore2, exception2) -> {
+                                if (exception2 != null) {
+                                    completePromise.completeExceptionally(exception2);
+                                }
+                            });
+
+                    healthcheckReadLoop(readerFuture, completePromise, messageStr);
+                });
+
+        // Always release the producer and reader (including ones whose creation
+        // completes after a timeout), then report the outcome to the caller.
+        completePromise.whenComplete((ignore, exception) -> {
+                producerFuture.thenAccept((producer) -> {
+                        producer.closeAsync().whenComplete((ignore2, exception2) -> {
+                                if (exception2 != null) {
+                                    LOG.warn("Error closing producer for healthcheck", exception2);
+                                }
+                            });
+                    });
+                readerFuture.thenAccept((reader) -> {
+                        reader.closeAsync().whenComplete((ignore2, exception2) -> {
+                                if (exception2 != null) {
+                                    LOG.warn("Error closing reader for healthcheck", exception2);
+                                }
+                            });
+                    });
+                if (exception != null) {
+                    asyncResponse.resume(new RestException(exception));
+                } else {
+                    asyncResponse.resume("ok");
+                }
+            });
+    }
+
+    /**
+     * Reads from the healthcheck reader until the message carrying {@code messageStr}
+     * arrives, then completes {@code completablePromise}; a read error completes it
+     * exceptionally. Stops issuing reads once the promise is already done (e.g. timed out).
+     */
+    private void healthcheckReadLoop(CompletableFuture<Reader<String>> readerFuture,
+                                     CompletableFuture<?> completablePromise,
+                                     String messageStr) {
+        readerFuture.thenAccept((reader) -> {
+                if (completablePromise.isDone()) {
+                    return;
+                }
+                reader.readNextAsync().whenComplete((m, exception) -> {
+                        if (exception != null) {
+                            completablePromise.completeExceptionally(exception);
+                        } else if (messageStr.equals(m.getValue())) {
+                            completablePromise.complete(null);
+                        } else {
+                            healthcheckReadLoop(readerFuture, completablePromise, messageStr);
+                        }
+                    });
+            });
+    }
 }
+
diff --git 
a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Brokers.java 
b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Brokers.java
index a6fadd16db..50503c101e 100644
--- 
a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Brokers.java
+++ 
b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Brokers.java
@@ -105,4 +105,11 @@
      * @return internal configuration data.
      */
     InternalConfigurationData getInternalConfigurationData() throws 
PulsarAdminException;
+
+    /**
+     * Run a healthcheck on the broker.
+     *
+     * @throws PulsarAdminException if the healthcheck fails or returns an unexpected result
+     */
+    void healthcheck() throws PulsarAdminException;
 }
diff --git 
a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/BrokersImpl.java
 
b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/BrokersImpl.java
index 52270d3c2d..969f673f7a 100644
--- 
a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/BrokersImpl.java
+++ 
b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/BrokersImpl.java
@@ -101,4 +101,17 @@ public InternalConfigurationData getInternalConfigurationData() throws PulsarAdm
         }
     }
 
+    @Override
+    public void healthcheck() throws PulsarAdminException {
+        String result;
+        try {
+            result = request(adminBrokers.path("/health")).get(String.class);
+        } catch (Exception e) {
+            throw getApiException(e);
+        }
+        // Null-safe, case-insensitive check; thrown outside the try so it is not re-wrapped.
+        if (result == null || !"ok".equalsIgnoreCase(result.trim())) {
+            throw new PulsarAdminException("Healthcheck returned unexpected result: " + result);
+        }
+    }
 }
diff --git 
a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdBrokers.java 
b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdBrokers.java
index 9dc50a0579..9a934749e5 100644
--- 
a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdBrokers.java
+++ 
b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdBrokers.java
@@ -92,7 +92,18 @@ void run() throws Exception {
         }
 
     }
-    
+
+    @Parameters(commandDescription = "Run a health check against the broker")
+    private class HealthcheckCmd extends CliCommand {
+        // Prints "ok" only after admin.brokers().healthcheck() returns without throwing.
+        @Override
+        void run() throws Exception {
+            admin.brokers().healthcheck();
+            System.out.println("ok");
+        }
+
+    }
+
     public CmdBrokers(PulsarAdmin admin) {
         super("brokers", admin);
         jcommander.addCommand("list", new List());
@@ -101,5 +112,6 @@ public CmdBrokers(PulsarAdmin admin) {
         jcommander.addCommand("list-dynamic-config", new 
GetUpdatableConfigCmd());
         jcommander.addCommand("get-all-dynamic-config", new 
GetAllConfigurationsCmd());
         jcommander.addCommand("get-internal-config", new 
GetInternalConfigurationCmd());
+        jcommander.addCommand("healthcheck", new HealthcheckCmd());
     }
 }
diff --git 
a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/cli/HealthcheckTest.java
 
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/cli/HealthcheckTest.java
new file mode 100644
index 0000000000..7c7412333a
--- /dev/null
+++ 
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/cli/HealthcheckTest.java
@@ -0,0 +1,109 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.tests.integration.cli;
+
+import java.util.UUID;
+
+import org.apache.pulsar.tests.integration.containers.BKContainer;
+import org.apache.pulsar.tests.integration.containers.BrokerContainer;
+import org.apache.pulsar.tests.integration.docker.ContainerExecException;
+import org.apache.pulsar.tests.integration.docker.ContainerExecResult;
+
+import org.apache.pulsar.tests.integration.topologies.PulsarCluster;
+import org.apache.pulsar.tests.integration.topologies.PulsarClusterSpec;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.testng.Assert;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+/**
+ * Integration tests for the {@code brokers healthcheck} admin CLI command.
+ */
+public class HealthcheckTest {
+    private static final Logger log = LoggerFactory.getLogger(HealthcheckTest.class);
+    private final PulsarClusterSpec spec = PulsarClusterSpec.builder()
+        .clusterName("HealthcheckTest-" + UUID.randomUUID().toString().substring(0, 8))
+        .numProxies(0).numFunctionWorkers(0).enablePrestoWorker(false).build();
+    private PulsarCluster pulsarCluster = null;
+
+    @BeforeMethod
+    public void setupCluster() throws Exception {
+        pulsarCluster = PulsarCluster.forSpec(spec);
+        pulsarCluster.start();
+    }
+
+    @AfterMethod
+    public void tearDownCluster() throws Exception {
+        if (pulsarCluster != null) {
+            pulsarCluster.stop();
+            pulsarCluster = null;
+        }
+    }
+
+    @Test
+    public void testEverythingOK() throws Exception {
+        for (BrokerContainer b : pulsarCluster.getBrokers()) {
+            ContainerExecResult result = b.execCmd(PulsarCluster.ADMIN_SCRIPT, "brokers", "healthcheck");
+            Assert.assertEquals(result.getExitCode(), 0);
+            Assert.assertEquals(result.getStdout().trim(), "ok");
+        }
+    }
+
+    /**
+     * Asserts that running the healthcheck command on every broker exits with status 1.
+     */
+    private void assertHealthcheckFailure() throws Exception {
+        for (BrokerContainer b : pulsarCluster.getBrokers()) {
+            try {
+                b.execCmd(PulsarCluster.ADMIN_SCRIPT, "brokers", "healthcheck");
+                Assert.fail("Should always fail");
+            } catch (ContainerExecException e) {
+                Assert.assertEquals(e.getResult().getExitCode(), 1);
+            }
+        }
+    }
+
+    @Test
+    public void testZooKeeperDown() throws Exception {
+        // SIGSTOP suspends the process: it stays alive but stops responding
+        pulsarCluster.getZooKeeper().execCmd("pkill", "-STOP", "-f", "ZooKeeperStarter");
+        assertHealthcheckFailure();
+    }
+
+    // Disabled until PulsarAdmin can time out (#2891)
+    @Test(enabled = false)
+    public void testBrokerDown() throws Exception {
+        for (BrokerContainer b : pulsarCluster.getBrokers()) {
+            b.execCmd("pkill", "-STOP", "-f", "PulsarBrokerStarter");
+        }
+        assertHealthcheckFailure();
+    }
+
+    @Test
+    public void testBookKeeperDown() throws Exception {
+        for (BKContainer b : pulsarCluster.getBookies()) {
+            b.execCmd("pkill", "-STOP", "-f", "BookieServer");
+        }
+        assertHealthcheckFailure();
+    }
+}
diff --git 
a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/topologies/PulsarCluster.java
 
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/topologies/PulsarCluster.java
index 7f9986e5e5..bbb5bd4524 100644
--- 
a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/topologies/PulsarCluster.java
+++ 
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/topologies/PulsarCluster.java
@@ -448,6 +448,16 @@ public synchronized WorkerContainer getWorker(int index) {
         return brokerContainers.values();
     }
 
+    /** Returns the bookie containers of this cluster. */
+    public Collection<BKContainer> getBookies() {
+        return bookieContainers.values();
+    }
+
+    /** Returns the ZooKeeper container of this cluster. */
+    public ZKContainer getZooKeeper() {
+        return zkContainer;
+    }
+
     public ContainerExecResult runAdminCommandOnAnyBroker(String...commands) throws Exception {
         return runCommandOnAnyBrokerWithScript(ADMIN_SCRIPT, commands);
     }
@@ -472,6 +482,26 @@ public void startAllBrokers() {
         brokerContainers.values().forEach(BrokerContainer::start);
     }
 
+    /** Stops every bookie container of this cluster. */
+    public void stopAllBookies() {
+        bookieContainers.values().forEach(BKContainer::stop);
+    }
+
+    /** Starts every bookie container of this cluster. */
+    public void startAllBookies() {
+        bookieContainers.values().forEach(BKContainer::start);
+    }
+
+    /** Stops the ZooKeeper container of this cluster. */
+    public void stopZooKeeper() {
+        zkContainer.stop();
+    }
+
+    /** Starts the ZooKeeper container of this cluster. */
+    public void startZooKeeper() {
+        zkContainer.start();
+    }
+
     public ContainerExecResult createNamespace(String nsName) throws Exception {
         return runAdminCommandOnAnyBroker(
             "namespaces", "create", "public/" + nsName,
diff --git a/tests/integration/src/test/resources/pulsar-process.xml 
b/tests/integration/src/test/resources/pulsar-process.xml
index e29a05e223..946c2f9cef 100644
--- a/tests/integration/src/test/resources/pulsar-process.xml
+++ b/tests/integration/src/test/resources/pulsar-process.xml
@@ -26,6 +26,7 @@
             <class 
name="org.apache.pulsar.tests.integration.compaction.TestCompaction" />
             <class 
name="org.apache.pulsar.tests.integration.functions.PulsarFunctionsProcessTest" 
/>
             <class 
name="org.apache.pulsar.tests.integration.presto.TestBasicPresto" />
+            <class 
name="org.apache.pulsar.tests.integration.cli.HealthcheckTest" />
         </classes>
     </test>
-</suite>
\ No newline at end of file
+</suite>


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

Reply via email to