This is an automated email from the ASF dual-hosted git repository.
rdhabalia pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 6c7ec4c38c8 [feat] [broker] Add broker health check status into prometheus metrics (#20147)
6c7ec4c38c8 is described below
commit 6c7ec4c38c8dce8351096a494b412952be5cc77b
Author: vineeth1995 <[email protected]>
AuthorDate: Sun Oct 6 22:58:25 2024 -0700
[feat] [broker] Add broker health check status into prometheus metrics (#20147)
---
conf/broker.conf | 2 +
.../apache/pulsar/broker/ServiceConfiguration.java | 6 +++
.../pulsar/broker/admin/impl/BrokersBase.java | 50 ++++++++++++++--------
.../pulsar/broker/service/BrokerService.java | 23 ++++++++++
.../broker/stats/BrokerOperabilityMetrics.java | 18 +++++++-
.../broker/service/PersistentTopicE2ETest.java | 32 ++++++++++++++
.../pulsar/broker/stats/PrometheusMetricsTest.java | 17 +++++++-
7 files changed, 128 insertions(+), 20 deletions(-)
diff --git a/conf/broker.conf b/conf/broker.conf
index 617e202e5ec..e745fcb2b0a 100644
--- a/conf/broker.conf
+++ b/conf/broker.conf
@@ -1689,6 +1689,8 @@ exposePublisherStats=true
statsUpdateFrequencyInSecs=60
statsUpdateInitialDelayInSecs=60
+healthCheckMetricsUpdateTimeInSeconds=-1
+
# Enable expose the precise backlog stats.
# Set false to use published counter and consumed counter to calculate, this would be more efficient but may be inaccurate.
# Default is false.
diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
index 58d6444e719..81073b1731b 100644
--- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
+++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
@@ -3280,6 +3280,12 @@ public class ServiceConfiguration implements
PulsarConfiguration {
doc = "Stats update initial delay in seconds"
)
private int statsUpdateInitialDelayInSecs = 60;
+ @FieldContext(
+ category = CATEGORY_METRICS,
+ minValue = -1,
+ doc = "HealthCheck update frequency in seconds. Disable health
check with value -1 (Default value -1)"
+ )
+ private int healthCheckMetricsUpdateTimeInSeconds = -1;
@FieldContext(
category = CATEGORY_METRICS,
doc = "If true, aggregate publisher stats of PartitionedTopicStats by
producerName"
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/BrokersBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/BrokersBase.java
index e13cb1858f7..da4cee7b465 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/BrokersBase.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/BrokersBase.java
@@ -51,6 +51,7 @@ import javax.ws.rs.core.Response.Status;
import org.apache.commons.lang.StringUtils;
import org.apache.pulsar.PulsarVersion;
import org.apache.pulsar.broker.PulsarServerException;
+import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.PulsarService.State;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.admin.AdminResource;
@@ -422,26 +423,35 @@ public class BrokersBase extends AdminResource {
}
private CompletableFuture<Void> internalRunHealthCheck(TopicVersion
topicVersion) {
- String brokerId = pulsar().getBrokerId();
+ return internalRunHealthCheck(topicVersion, pulsar(), clientAppId());
+ }
+
+
+ public static CompletableFuture<Void> internalRunHealthCheck(TopicVersion
topicVersion, PulsarService pulsar,
+ String
clientAppId) {
+ NamespaceName namespaceName = (topicVersion == TopicVersion.V2)
+ ?
NamespaceService.getHeartbeatNamespaceV2(pulsar.getAdvertisedAddress(),
pulsar.getConfiguration())
+ :
NamespaceService.getHeartbeatNamespace(pulsar.getAdvertisedAddress(),
pulsar.getConfiguration());
+ String brokerId = pulsar.getBrokerId();
final String topicName =
- getHeartbeatTopicName(brokerId, pulsar().getConfiguration(),
(topicVersion == TopicVersion.V2));
- LOG.info("[{}] Running healthCheck with topic={}", clientAppId(),
topicName);
+ getHeartbeatTopicName(brokerId, pulsar.getConfiguration(),
(topicVersion == TopicVersion.V2));
+ LOG.info("[{}] Running healthCheck with topic={}", clientAppId,
topicName);
final String messageStr = UUID.randomUUID().toString();
final String subscriptionName = "healthCheck-" + messageStr;
// create non-partitioned topic manually and close the previous reader
if present.
- return pulsar().getBrokerService().getTopic(topicName, true)
+ return pulsar.getBrokerService().getTopic(topicName, true)
.thenCompose(topicOptional -> {
if (!topicOptional.isPresent()) {
LOG.error("[{}] Fail to run health check while get topic
{}. because get null value.",
- clientAppId(), topicName);
+ clientAppId, topicName);
throw new RestException(Status.NOT_FOUND,
String.format("Topic [%s] not found after
create.", topicName));
}
PulsarClient client;
try {
- client = pulsar().getClient();
+ client = pulsar.getClient();
} catch (PulsarServerException e) {
- LOG.error("[{}] Fail to run health check while get
client.", clientAppId());
+ LOG.error("[{}] Fail to run health check while get
client.", clientAppId);
throw new RestException(e);
}
CompletableFuture<Void> resultFuture = new
CompletableFuture<>();
@@ -451,17 +461,18 @@ public class BrokersBase extends AdminResource {
.startMessageId(MessageId.latest)
.createAsync().exceptionally(createException
-> {
producer.closeAsync().exceptionally(ex -> {
- LOG.error("[{}] Close producer fail
while heath check.", clientAppId());
+ LOG.error("[{}] Close producer fail
while heath check.", clientAppId);
return null;
});
throw
FutureUtil.wrapToCompletionException(createException);
}).thenCompose(reader ->
producer.sendAsync(messageStr)
.thenCompose(__ ->
FutureUtil.addTimeoutHandling(
healthCheckRecursiveReadNext(reader, messageStr),
- HEALTH_CHECK_READ_TIMEOUT,
pulsar().getBrokerService().executor(),
+ HEALTH_CHECK_READ_TIMEOUT,
pulsar.getBrokerService().executor(),
() ->
HEALTH_CHECK_TIMEOUT_EXCEPTION))
.whenComplete((__, ex) -> {
- closeAndReCheck(producer, reader,
topicOptional.get(), subscriptionName)
+ closeAndReCheck(producer, reader,
topicOptional.get(), subscriptionName,
+ clientAppId)
.whenComplete((unused,
innerEx) -> {
if (ex != null) {
resultFuture.completeExceptionally(ex);
@@ -479,6 +490,11 @@ public class BrokersBase extends AdminResource {
});
}
+ private CompletableFuture<Void> closeAndReCheck(Producer<String> producer,
Reader<String> reader,
+ Topic topic, String
subscriptionName) {
+ return closeAndReCheck(producer, reader, topic, subscriptionName,
clientAppId());
+ }
+
/**
* Close producer and reader and then to re-check if this operation is
success.
*
@@ -491,8 +507,8 @@ public class BrokersBase extends AdminResource {
* @param topic Topic
* @param subscriptionName Subscription name
*/
- private CompletableFuture<Void> closeAndReCheck(Producer<String> producer,
Reader<String> reader,
- Topic topic, String
subscriptionName) {
+ private static CompletableFuture<Void> closeAndReCheck(Producer<String>
producer, Reader<String> reader,
+ Topic topic, String
subscriptionName, String clientAppId) {
// no matter exception or success, we still need to
// close producer/reader
CompletableFuture<Void> producerFuture = producer.closeAsync();
@@ -503,7 +519,7 @@ public class BrokersBase extends AdminResource {
return FutureUtil.waitForAll(Collections.unmodifiableList(futures))
.exceptionally(closeException -> {
if (readerFuture.isCompletedExceptionally()) {
- LOG.error("[{}] Close reader fail while heath check.",
clientAppId());
+ LOG.error("[{}] Close reader fail while heath check.",
clientAppId);
Subscription subscription =
topic.getSubscription(subscriptionName);
// re-check subscription after reader close
@@ -511,24 +527,24 @@ public class BrokersBase extends AdminResource {
LOG.warn("[{}] Force delete subscription {} "
+ "when it still exists after the"
+ " reader is closed.",
- clientAppId(), subscription);
+ clientAppId, subscription);
subscription.deleteForcefully()
.exceptionally(ex -> {
LOG.error("[{}] Force delete
subscription fail"
+ " while health
check",
- clientAppId(), ex);
+ clientAppId, ex);
return null;
});
}
} else {
// producer future fail.
- LOG.error("[{}] Close producer fail while heath
check.", clientAppId());
+ LOG.error("[{}] Close producer fail while heath
check.", clientAppId);
}
return null;
});
}
- private CompletableFuture<Void>
healthCheckRecursiveReadNext(Reader<String> reader, String content) {
+ private static CompletableFuture<Void>
healthCheckRecursiveReadNext(Reader<String> reader, String content) {
return reader.readNextAsync()
.thenCompose(msg -> {
if (!Objects.equals(content, msg.getValue())) {
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
index dd722dffcfb..c240c758dcd 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
@@ -24,6 +24,7 @@ import static java.util.concurrent.TimeUnit.MILLISECONDS;
import static
org.apache.bookkeeper.mledger.ManagedLedgerConfig.PROPERTY_SOURCE_TOPIC_KEY;
import static org.apache.commons.collections4.CollectionUtils.isEmpty;
import static org.apache.commons.lang3.StringUtils.isNotBlank;
+import static
org.apache.pulsar.broker.admin.impl.BrokersBase.internalRunHealthCheck;
import static
org.apache.pulsar.client.util.RetryMessageUtil.DLQ_GROUP_TOPIC_SUFFIX;
import static
org.apache.pulsar.client.util.RetryMessageUtil.RETRY_GROUP_TOPIC_SUFFIX;
import static
org.apache.pulsar.common.naming.SystemTopicNames.isTransactionInternalName;
@@ -157,6 +158,7 @@ import org.apache.pulsar.common.naming.NamespaceName;
import org.apache.pulsar.common.naming.SystemTopicNames;
import org.apache.pulsar.common.naming.TopicDomain;
import org.apache.pulsar.common.naming.TopicName;
+import org.apache.pulsar.common.naming.TopicVersion;
import org.apache.pulsar.common.partition.PartitionedTopicMetadata;
import org.apache.pulsar.common.policies.data.AutoSubscriptionCreationOverride;
import org.apache.pulsar.common.policies.data.AutoTopicCreationOverride;
@@ -241,6 +243,7 @@ public class BrokerService implements Closeable {
private AuthorizationService authorizationService;
private final ScheduledExecutorService statsUpdater;
+
@Getter
private final ScheduledExecutorService backlogQuotaChecker;
@@ -346,6 +349,7 @@ public class BrokerService implements Closeable {
this.acceptorGroup = EventLoopUtil.newEventLoopGroup(
pulsar.getConfiguration().getNumAcceptorThreads(), false,
acceptorThreadFactory);
this.workerGroup = eventLoopGroup;
+
this.statsUpdater = OrderedScheduler.newSchedulerBuilder()
.name("pulsar-stats-updater")
.numThreads(1)
@@ -611,6 +615,7 @@ public class BrokerService implements Closeable {
this.startStatsUpdater(
serviceConfig.getStatsUpdateInitialDelayInSecs(),
serviceConfig.getStatsUpdateFrequencyInSecs());
+ this.initializeHealthChecker();
this.startInactivityMonitor();
this.startMessageExpiryMonitor();
this.startCompactionMonitor();
@@ -640,6 +645,24 @@ public class BrokerService implements Closeable {
updateRates();
}
+ protected void initializeHealthChecker() {
+ ServiceConfiguration config = pulsar().getConfiguration();
+ if (config.getHealthCheckMetricsUpdateTimeInSeconds() > 0) {
+ int interval = config.getHealthCheckMetricsUpdateTimeInSeconds();
+ statsUpdater.scheduleAtFixedRate(this::checkHealth,
+ interval, interval, TimeUnit.SECONDS);
+ }
+ }
+
+ public CompletableFuture<Void> checkHealth() {
+ return internalRunHealthCheck(TopicVersion.V2, pulsar(),
null).thenAccept(__ -> {
+
this.pulsarStats.getBrokerOperabilityMetrics().recordHealthCheckStatusSuccess();
+ }).exceptionally(ex -> {
+
this.pulsarStats.getBrokerOperabilityMetrics().recordHealthCheckStatusFail();
+ return null;
+ });
+ }
+
protected void startDeduplicationSnapshotMonitor() {
// We do not know whether users will enable deduplication on namespace
level/topic level or not, so keep this
// scheduled task runs.
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/BrokerOperabilityMetrics.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/BrokerOperabilityMetrics.java
index 3f991be8184..1855e1798b4 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/BrokerOperabilityMetrics.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/BrokerOperabilityMetrics.java
@@ -42,6 +42,7 @@ public class BrokerOperabilityMetrics implements
AutoCloseable {
private final LongAdder connectionTotalCreatedCount;
private final LongAdder connectionTotalClosedCount;
private final LongAdder connectionActive;
+ private volatile int healthCheckStatus; // 1=success, 0=failure, -1=unknown
private final LongAdder connectionCreateSuccessCount;
private final LongAdder connectionCreateFailCount;
@@ -61,7 +62,7 @@ public class BrokerOperabilityMetrics implements
AutoCloseable {
this.connectionTotalCreatedCount = new LongAdder();
this.connectionTotalClosedCount = new LongAdder();
this.connectionActive = new LongAdder();
-
+ this.healthCheckStatus = -1;
this.connectionCreateSuccessCount = new LongAdder();
this.connectionCreateFailCount = new LongAdder();
@@ -103,6 +104,7 @@ public class BrokerOperabilityMetrics implements
AutoCloseable {
reset();
metricsList.add(getTopicLoadMetrics());
metricsList.add(getConnectionMetrics());
+ metricsList.add(getHealthMetrics());
}
public Metrics generateConnectionMetrics() {
@@ -119,6 +121,12 @@ public class BrokerOperabilityMetrics implements
AutoCloseable {
return rMetrics;
}
+ Metrics getHealthMetrics() {
+ Metrics rMetrics = Metrics.create(getDimensionMap("broker_health"));
+ rMetrics.put("brk_health", healthCheckStatus);
+ return rMetrics;
+ }
+
Map<String, String> getDimensionMap(String metricsName) {
Map<String, String> dimensionMap = new HashMap<>();
dimensionMap.put("broker", brokerName);
@@ -179,4 +187,12 @@ public class BrokerOperabilityMetrics implements
AutoCloseable {
public void recordConnectionCreateFail() {
this.connectionCreateFailCount.increment();
}
+
+ public void recordHealthCheckStatusSuccess() {
+ this.healthCheckStatus = 1;
+ }
+
+ public void recordHealthCheckStatusFail() {
+ this.healthCheckStatus = 0;
+ }
}
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicE2ETest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicE2ETest.java
index 640cd2d37e3..36e741f8fa9 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicE2ETest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicE2ETest.java
@@ -1607,6 +1607,38 @@ public class PersistentTopicE2ETest extends
BrokerTestBase {
assertEquals((long) map.get("brk_connection_create_fail_count"), 1);
}
+ /**
+ * There is detailed info about this test.
+ * see:
https://github.com/apache/pulsar/issues/10150#issuecomment-1112380074
+ */
+ @Test
+ public void testBrokerHealthCheckStatus() throws Exception {
+
+ cleanup();
+ conf.setSystemTopicEnabled(false);
+ conf.setTopicLevelPoliciesEnabled(false);
+ conf.setHealthCheckMetricsUpdateTimeInSeconds(60);
+ setup();
+ BrokerService brokerService = this.pulsar.getBrokerService();
+
+ Map<String, Object> map = null;
+
+ brokerService.checkHealth().get();
+ brokerService.updateRates();
+ Awaitility.await().until(() -> this.activeCount.get() == 1);
+ List<Metrics> metrics = brokerService.getTopicMetrics();
+ System.out.println(metrics);
+
+ for (int i = 0; i < metrics.size(); i++) {
+ if (metrics.get(i).getDimensions().containsValue("broker_health"))
{
+ map = metrics.get(i).getMetrics();
+ break;
+ }
+ }
+ assertNotNull(map);
+ assertEquals(map.get("brk_health"), 1);
+ }
+
@Test
public void testPayloadCorruptionDetection() throws Exception {
final String topicName = "persistent://prop/ns-abc/topic1";
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/PrometheusMetricsTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/PrometheusMetricsTest.java
index a92f5a4acc2..fa073d3694b 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/PrometheusMetricsTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/PrometheusMetricsTest.java
@@ -79,6 +79,7 @@ import
org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitStateD
import org.apache.pulsar.broker.loadbalance.extensions.manager.UnloadManager;
import org.apache.pulsar.broker.loadbalance.impl.ModularLoadManagerWrapper;
import org.apache.pulsar.broker.service.AbstractTopic;
+import org.apache.pulsar.broker.service.BrokerService;
import org.apache.pulsar.broker.service.BrokerTestBase;
import org.apache.pulsar.broker.service.Topic;
import
org.apache.pulsar.broker.service.persistent.PersistentMessageExpiryMonitor;
@@ -1789,6 +1790,20 @@ public class PrometheusMetricsTest extends
BrokerTestBase {
compareBrokerConnectionStateCount(cm, 2.0);
}
+ @Test
+ public void testBrokerHealthCheckMetric() throws Exception {
+ conf.setHealthCheckMetricsUpdateTimeInSeconds(60);
+ BrokerService brokerService = pulsar.getBrokerService();
+ brokerService.checkHealth().get();
+ brokerService.updateRates();
+ ByteArrayOutputStream statsOut = new ByteArrayOutputStream();
+ PrometheusMetricsTestUtil.generate(pulsar, true, false, false,
statsOut);
+ String metricsStr = statsOut.toString();
+ Multimap<String, Metric> metrics = parseMetrics(metricsStr);
+ List<Metric> cm = (List<Metric>) metrics.get("pulsar_health");
+ compareBrokerConnectionStateCount(cm, 1);
+ }
+
private void compareBrokerConnectionStateCount(List<Metric> cm, double
count) {
assertEquals(cm.size(), 1);
assertEquals(cm.get(0).tags.get("cluster"), "test");
@@ -1894,7 +1909,6 @@ public class PrometheusMetricsTest extends BrokerTestBase
{
PrometheusMetricsGenerator prometheusMetricsGenerator =
new PrometheusMetricsGenerator(pulsar, true, false, false,
false, clock);
-
String previousMetrics = null;
for (int a = 0; a < 4; a++) {
ByteArrayOutputStream statsOut1 = new ByteArrayOutputStream();
@@ -1908,7 +1922,6 @@ public class PrometheusMetricsTest extends BrokerTestBase
{
assertEquals(metricsStr1, metricsStr2);
assertNotEquals(metricsStr1, previousMetrics);
previousMetrics = metricsStr1;
-
// move time forward
currentTimeMillis.addAndGet(TimeUnit.SECONDS.toMillis(2));
}