This is an automated email from the ASF dual-hosted git repository.

rdhabalia pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 6c7ec4c38c8 [feat] [broker] Add broker health check status into prometheus metrics  (#20147)
6c7ec4c38c8 is described below

commit 6c7ec4c38c8dce8351096a494b412952be5cc77b
Author: vineeth1995 <[email protected]>
AuthorDate: Sun Oct 6 22:58:25 2024 -0700

    [feat] [broker] Add broker health check status into prometheus metrics  (#20147)
---
 conf/broker.conf                                   |  2 +
 .../apache/pulsar/broker/ServiceConfiguration.java |  6 +++
 .../pulsar/broker/admin/impl/BrokersBase.java      | 50 ++++++++++++++--------
 .../pulsar/broker/service/BrokerService.java       | 23 ++++++++++
 .../broker/stats/BrokerOperabilityMetrics.java     | 18 +++++++-
 .../broker/service/PersistentTopicE2ETest.java     | 32 ++++++++++++++
 .../pulsar/broker/stats/PrometheusMetricsTest.java | 17 +++++++-
 7 files changed, 128 insertions(+), 20 deletions(-)

diff --git a/conf/broker.conf b/conf/broker.conf
index 617e202e5ec..e745fcb2b0a 100644
--- a/conf/broker.conf
+++ b/conf/broker.conf
@@ -1689,6 +1689,8 @@ exposePublisherStats=true
 statsUpdateFrequencyInSecs=60
 statsUpdateInitialDelayInSecs=60
 
+healthCheckMetricsUpdateTimeInSeconds=-1
+
 # Enable expose the precise backlog stats.
 # Set false to use published counter and consumed counter to calculate, this 
would be more efficient but may be inaccurate.
 # Default is false.
diff --git 
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
 
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
index 58d6444e719..81073b1731b 100644
--- 
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
+++ 
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
@@ -3280,6 +3280,12 @@ public class ServiceConfiguration implements 
PulsarConfiguration {
         doc = "Stats update initial delay in seconds"
     )
     private int statsUpdateInitialDelayInSecs = 60;
+    @FieldContext(
+            category = CATEGORY_METRICS,
+            minValue = -1,
+            doc = "HealthCheck update frequency in seconds. Disable health 
check with value -1 (Default value -1)"
+    )
+    private int healthCheckMetricsUpdateTimeInSeconds = -1;
     @FieldContext(
         category = CATEGORY_METRICS,
         doc = "If true, aggregate publisher stats of PartitionedTopicStats by 
producerName"
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/BrokersBase.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/BrokersBase.java
index e13cb1858f7..da4cee7b465 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/BrokersBase.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/BrokersBase.java
@@ -51,6 +51,7 @@ import javax.ws.rs.core.Response.Status;
 import org.apache.commons.lang.StringUtils;
 import org.apache.pulsar.PulsarVersion;
 import org.apache.pulsar.broker.PulsarServerException;
+import org.apache.pulsar.broker.PulsarService;
 import org.apache.pulsar.broker.PulsarService.State;
 import org.apache.pulsar.broker.ServiceConfiguration;
 import org.apache.pulsar.broker.admin.AdminResource;
@@ -422,26 +423,35 @@ public class BrokersBase extends AdminResource {
     }
 
     private CompletableFuture<Void> internalRunHealthCheck(TopicVersion 
topicVersion) {
-        String brokerId = pulsar().getBrokerId();
+        return internalRunHealthCheck(topicVersion, pulsar(), clientAppId());
+    }
+
+
+    public static CompletableFuture<Void> internalRunHealthCheck(TopicVersion 
topicVersion, PulsarService pulsar,
+                                                                 String 
clientAppId) {
+        NamespaceName namespaceName = (topicVersion == TopicVersion.V2)
+                ? 
NamespaceService.getHeartbeatNamespaceV2(pulsar.getAdvertisedAddress(), 
pulsar.getConfiguration())
+                : 
NamespaceService.getHeartbeatNamespace(pulsar.getAdvertisedAddress(), 
pulsar.getConfiguration());
+        String brokerId = pulsar.getBrokerId();
         final String topicName =
-                getHeartbeatTopicName(brokerId, pulsar().getConfiguration(), 
(topicVersion == TopicVersion.V2));
-        LOG.info("[{}] Running healthCheck with topic={}", clientAppId(), 
topicName);
+                getHeartbeatTopicName(brokerId, pulsar.getConfiguration(), 
(topicVersion == TopicVersion.V2));
+        LOG.info("[{}] Running healthCheck with topic={}", clientAppId, 
topicName);
         final String messageStr = UUID.randomUUID().toString();
         final String subscriptionName = "healthCheck-" + messageStr;
         // create non-partitioned topic manually and close the previous reader 
if present.
-        return pulsar().getBrokerService().getTopic(topicName, true)
+        return pulsar.getBrokerService().getTopic(topicName, true)
             .thenCompose(topicOptional -> {
                 if (!topicOptional.isPresent()) {
                     LOG.error("[{}] Fail to run health check while get topic 
{}. because get null value.",
-                            clientAppId(), topicName);
+                            clientAppId, topicName);
                     throw new RestException(Status.NOT_FOUND,
                             String.format("Topic [%s] not found after 
create.", topicName));
                 }
                 PulsarClient client;
                 try {
-                    client = pulsar().getClient();
+                    client = pulsar.getClient();
                 } catch (PulsarServerException e) {
-                    LOG.error("[{}] Fail to run health check while get 
client.", clientAppId());
+                    LOG.error("[{}] Fail to run health check while get 
client.", clientAppId);
                     throw new RestException(e);
                 }
                 CompletableFuture<Void> resultFuture = new 
CompletableFuture<>();
@@ -451,17 +461,18 @@ public class BrokersBase extends AdminResource {
                                 .startMessageId(MessageId.latest)
                                 .createAsync().exceptionally(createException 
-> {
                                     producer.closeAsync().exceptionally(ex -> {
-                                        LOG.error("[{}] Close producer fail 
while heath check.", clientAppId());
+                                        LOG.error("[{}] Close producer fail 
while heath check.", clientAppId);
                                         return null;
                                     });
                                     throw 
FutureUtil.wrapToCompletionException(createException);
                                 }).thenCompose(reader -> 
producer.sendAsync(messageStr)
                                         .thenCompose(__ -> 
FutureUtil.addTimeoutHandling(
                                                 
healthCheckRecursiveReadNext(reader, messageStr),
-                                                HEALTH_CHECK_READ_TIMEOUT, 
pulsar().getBrokerService().executor(),
+                                                HEALTH_CHECK_READ_TIMEOUT, 
pulsar.getBrokerService().executor(),
                                                 () -> 
HEALTH_CHECK_TIMEOUT_EXCEPTION))
                                         .whenComplete((__, ex) -> {
-                                            closeAndReCheck(producer, reader, 
topicOptional.get(), subscriptionName)
+                                            closeAndReCheck(producer, reader, 
topicOptional.get(), subscriptionName,
+                                                    clientAppId)
                                                     .whenComplete((unused, 
innerEx) -> {
                                                         if (ex != null) {
                                                             
resultFuture.completeExceptionally(ex);
@@ -479,6 +490,11 @@ public class BrokersBase extends AdminResource {
             });
     }
 
+    private CompletableFuture<Void> closeAndReCheck(Producer<String> producer, 
Reader<String> reader,
+                                                           Topic topic, String 
subscriptionName) {
+        return closeAndReCheck(producer, reader, topic, subscriptionName, 
clientAppId());
+    }
+
     /**
      * Close producer and reader and then to re-check if this operation is 
success.
      *
@@ -491,8 +507,8 @@ public class BrokersBase extends AdminResource {
      * @param topic  Topic
      * @param subscriptionName  Subscription name
      */
-    private CompletableFuture<Void> closeAndReCheck(Producer<String> producer, 
Reader<String> reader,
-                                                    Topic topic, String 
subscriptionName) {
+    private static CompletableFuture<Void> closeAndReCheck(Producer<String> 
producer, Reader<String> reader,
+                                                    Topic topic, String 
subscriptionName, String clientAppId) {
         // no matter exception or success, we still need to
         // close producer/reader
         CompletableFuture<Void> producerFuture = producer.closeAsync();
@@ -503,7 +519,7 @@ public class BrokersBase extends AdminResource {
         return FutureUtil.waitForAll(Collections.unmodifiableList(futures))
                 .exceptionally(closeException -> {
                     if (readerFuture.isCompletedExceptionally()) {
-                        LOG.error("[{}] Close reader fail while heath check.", 
clientAppId());
+                        LOG.error("[{}] Close reader fail while heath check.", 
clientAppId);
                         Subscription subscription =
                                 topic.getSubscription(subscriptionName);
                         // re-check subscription after reader close
@@ -511,24 +527,24 @@ public class BrokersBase extends AdminResource {
                             LOG.warn("[{}] Force delete subscription {} "
                                             + "when it still exists after the"
                                             + " reader is closed.",
-                                    clientAppId(), subscription);
+                                    clientAppId, subscription);
                             subscription.deleteForcefully()
                                     .exceptionally(ex -> {
                                         LOG.error("[{}] Force delete 
subscription fail"
                                                         + " while health 
check",
-                                                clientAppId(), ex);
+                                                clientAppId, ex);
                                         return null;
                                     });
                         }
                     } else {
                         // producer future fail.
-                        LOG.error("[{}] Close producer fail while heath 
check.", clientAppId());
+                        LOG.error("[{}] Close producer fail while heath 
check.", clientAppId);
                     }
                     return null;
                 });
     }
 
-    private CompletableFuture<Void> 
healthCheckRecursiveReadNext(Reader<String> reader, String content) {
+    private static CompletableFuture<Void> 
healthCheckRecursiveReadNext(Reader<String> reader, String content) {
         return reader.readNextAsync()
                 .thenCompose(msg -> {
                     if (!Objects.equals(content, msg.getValue())) {
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
index dd722dffcfb..c240c758dcd 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
@@ -24,6 +24,7 @@ import static java.util.concurrent.TimeUnit.MILLISECONDS;
 import static 
org.apache.bookkeeper.mledger.ManagedLedgerConfig.PROPERTY_SOURCE_TOPIC_KEY;
 import static org.apache.commons.collections4.CollectionUtils.isEmpty;
 import static org.apache.commons.lang3.StringUtils.isNotBlank;
+import static 
org.apache.pulsar.broker.admin.impl.BrokersBase.internalRunHealthCheck;
 import static 
org.apache.pulsar.client.util.RetryMessageUtil.DLQ_GROUP_TOPIC_SUFFIX;
 import static 
org.apache.pulsar.client.util.RetryMessageUtil.RETRY_GROUP_TOPIC_SUFFIX;
 import static 
org.apache.pulsar.common.naming.SystemTopicNames.isTransactionInternalName;
@@ -157,6 +158,7 @@ import org.apache.pulsar.common.naming.NamespaceName;
 import org.apache.pulsar.common.naming.SystemTopicNames;
 import org.apache.pulsar.common.naming.TopicDomain;
 import org.apache.pulsar.common.naming.TopicName;
+import org.apache.pulsar.common.naming.TopicVersion;
 import org.apache.pulsar.common.partition.PartitionedTopicMetadata;
 import org.apache.pulsar.common.policies.data.AutoSubscriptionCreationOverride;
 import org.apache.pulsar.common.policies.data.AutoTopicCreationOverride;
@@ -241,6 +243,7 @@ public class BrokerService implements Closeable {
 
     private AuthorizationService authorizationService;
     private final ScheduledExecutorService statsUpdater;
+
     @Getter
     private final ScheduledExecutorService backlogQuotaChecker;
 
@@ -346,6 +349,7 @@ public class BrokerService implements Closeable {
         this.acceptorGroup = EventLoopUtil.newEventLoopGroup(
                 pulsar.getConfiguration().getNumAcceptorThreads(), false, 
acceptorThreadFactory);
         this.workerGroup = eventLoopGroup;
+
         this.statsUpdater = OrderedScheduler.newSchedulerBuilder()
                 .name("pulsar-stats-updater")
                 .numThreads(1)
@@ -611,6 +615,7 @@ public class BrokerService implements Closeable {
         this.startStatsUpdater(
                 serviceConfig.getStatsUpdateInitialDelayInSecs(),
                 serviceConfig.getStatsUpdateFrequencyInSecs());
+        this.initializeHealthChecker();
         this.startInactivityMonitor();
         this.startMessageExpiryMonitor();
         this.startCompactionMonitor();
@@ -640,6 +645,24 @@ public class BrokerService implements Closeable {
         updateRates();
     }
 
+    protected void initializeHealthChecker() {
+        ServiceConfiguration config = pulsar().getConfiguration();
+        if (config.getHealthCheckMetricsUpdateTimeInSeconds() > 0) {
+            int interval = config.getHealthCheckMetricsUpdateTimeInSeconds();
+            statsUpdater.scheduleAtFixedRate(this::checkHealth,
+                    interval, interval, TimeUnit.SECONDS);
+        }
+    }
+
+    public CompletableFuture<Void> checkHealth() {
+        return internalRunHealthCheck(TopicVersion.V2, pulsar(), 
null).thenAccept(__ -> {
+            
this.pulsarStats.getBrokerOperabilityMetrics().recordHealthCheckStatusSuccess();
+        }).exceptionally(ex -> {
+            
this.pulsarStats.getBrokerOperabilityMetrics().recordHealthCheckStatusFail();
+            return null;
+        });
+    }
+
     protected void startDeduplicationSnapshotMonitor() {
         // We do not know whether users will enable deduplication on namespace 
level/topic level or not, so keep this
         // scheduled task runs.
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/BrokerOperabilityMetrics.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/BrokerOperabilityMetrics.java
index 3f991be8184..1855e1798b4 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/BrokerOperabilityMetrics.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/BrokerOperabilityMetrics.java
@@ -42,6 +42,7 @@ public class BrokerOperabilityMetrics implements 
AutoCloseable {
     private final LongAdder connectionTotalCreatedCount;
     private final LongAdder connectionTotalClosedCount;
     private final LongAdder connectionActive;
+    private volatile int healthCheckStatus; // 1=success, 0=failure, -1=unknown
 
     private final LongAdder connectionCreateSuccessCount;
     private final LongAdder connectionCreateFailCount;
@@ -61,7 +62,7 @@ public class BrokerOperabilityMetrics implements 
AutoCloseable {
         this.connectionTotalCreatedCount = new LongAdder();
         this.connectionTotalClosedCount = new LongAdder();
         this.connectionActive = new LongAdder();
-
+        this.healthCheckStatus = -1;
         this.connectionCreateSuccessCount = new LongAdder();
         this.connectionCreateFailCount = new LongAdder();
 
@@ -103,6 +104,7 @@ public class BrokerOperabilityMetrics implements 
AutoCloseable {
         reset();
         metricsList.add(getTopicLoadMetrics());
         metricsList.add(getConnectionMetrics());
+        metricsList.add(getHealthMetrics());
     }
 
     public Metrics generateConnectionMetrics() {
@@ -119,6 +121,12 @@ public class BrokerOperabilityMetrics implements 
AutoCloseable {
         return rMetrics;
     }
 
+    Metrics getHealthMetrics() {
+        Metrics rMetrics = Metrics.create(getDimensionMap("broker_health"));
+        rMetrics.put("brk_health", healthCheckStatus);
+        return rMetrics;
+    }
+
     Map<String, String> getDimensionMap(String metricsName) {
         Map<String, String> dimensionMap = new HashMap<>();
         dimensionMap.put("broker", brokerName);
@@ -179,4 +187,12 @@ public class BrokerOperabilityMetrics implements 
AutoCloseable {
     public void recordConnectionCreateFail() {
         this.connectionCreateFailCount.increment();
     }
+
+    public void recordHealthCheckStatusSuccess() {
+        this.healthCheckStatus = 1;
+    }
+
+    public void recordHealthCheckStatusFail() {
+        this.healthCheckStatus = 0;
+    }
 }
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicE2ETest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicE2ETest.java
index 640cd2d37e3..36e741f8fa9 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicE2ETest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicE2ETest.java
@@ -1607,6 +1607,38 @@ public class PersistentTopicE2ETest extends 
BrokerTestBase {
         assertEquals((long) map.get("brk_connection_create_fail_count"), 1);
     }
 
+    /**
+     * There is detailed info about this test.
+     * see: 
https://github.com/apache/pulsar/issues/10150#issuecomment-1112380074
+     */
+    @Test
+    public void testBrokerHealthCheckStatus() throws Exception {
+
+        cleanup();
+        conf.setSystemTopicEnabled(false);
+        conf.setTopicLevelPoliciesEnabled(false);
+        conf.setHealthCheckMetricsUpdateTimeInSeconds(60);
+        setup();
+        BrokerService brokerService = this.pulsar.getBrokerService();
+
+        Map<String, Object> map = null;
+
+        brokerService.checkHealth().get();
+        brokerService.updateRates();
+        Awaitility.await().until(() -> this.activeCount.get() == 1);
+        List<Metrics> metrics = brokerService.getTopicMetrics();
+        System.out.println(metrics);
+
+        for (int i = 0; i < metrics.size(); i++) {
+            if (metrics.get(i).getDimensions().containsValue("broker_health")) 
{
+                map = metrics.get(i).getMetrics();
+                break;
+            }
+        }
+        assertNotNull(map);
+        assertEquals(map.get("brk_health"), 1);
+    }
+
     @Test
     public void testPayloadCorruptionDetection() throws Exception {
         final String topicName = "persistent://prop/ns-abc/topic1";
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/PrometheusMetricsTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/PrometheusMetricsTest.java
index a92f5a4acc2..fa073d3694b 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/PrometheusMetricsTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/PrometheusMetricsTest.java
@@ -79,6 +79,7 @@ import 
org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitStateD
 import org.apache.pulsar.broker.loadbalance.extensions.manager.UnloadManager;
 import org.apache.pulsar.broker.loadbalance.impl.ModularLoadManagerWrapper;
 import org.apache.pulsar.broker.service.AbstractTopic;
+import org.apache.pulsar.broker.service.BrokerService;
 import org.apache.pulsar.broker.service.BrokerTestBase;
 import org.apache.pulsar.broker.service.Topic;
 import 
org.apache.pulsar.broker.service.persistent.PersistentMessageExpiryMonitor;
@@ -1789,6 +1790,20 @@ public class PrometheusMetricsTest extends 
BrokerTestBase {
         compareBrokerConnectionStateCount(cm, 2.0);
     }
 
+    @Test
+    public void testBrokerHealthCheckMetric() throws Exception {
+        conf.setHealthCheckMetricsUpdateTimeInSeconds(60);
+        BrokerService brokerService = pulsar.getBrokerService();
+        brokerService.checkHealth().get();
+        brokerService.updateRates();
+        ByteArrayOutputStream statsOut = new ByteArrayOutputStream();
+        PrometheusMetricsTestUtil.generate(pulsar, true, false, false, 
statsOut);
+        String metricsStr = statsOut.toString();
+        Multimap<String, Metric> metrics = parseMetrics(metricsStr);
+        List<Metric> cm = (List<Metric>) metrics.get("pulsar_health");
+        compareBrokerConnectionStateCount(cm, 1);
+    }
+
     private void compareBrokerConnectionStateCount(List<Metric> cm, double 
count) {
         assertEquals(cm.size(), 1);
         assertEquals(cm.get(0).tags.get("cluster"), "test");
@@ -1894,7 +1909,6 @@ public class PrometheusMetricsTest extends BrokerTestBase 
{
         PrometheusMetricsGenerator prometheusMetricsGenerator =
                 new PrometheusMetricsGenerator(pulsar, true, false, false,
                         false, clock);
-
         String previousMetrics = null;
         for (int a = 0; a < 4; a++) {
             ByteArrayOutputStream statsOut1 = new ByteArrayOutputStream();
@@ -1908,7 +1922,6 @@ public class PrometheusMetricsTest extends BrokerTestBase 
{
             assertEquals(metricsStr1, metricsStr2);
             assertNotEquals(metricsStr1, previousMetrics);
             previousMetrics = metricsStr1;
-
             // move time forward
             currentTimeMillis.addAndGet(TimeUnit.SECONDS.toMillis(2));
         }

Reply via email to