This is an automated email from the ASF dual-hosted git repository.

mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 4341f0f301e [feat][broker] PIP-264: Add broker web executor metrics 
(#22816)
4341f0f301e is described below

commit 4341f0f301e0da344bb5ce07bc62c373e7ce48ef
Author: Dragos Misca <dragosvic...@users.noreply.github.com>
AuthorDate: Wed Jun 5 16:34:56 2024 -0700

    [feat][broker] PIP-264: Add broker web executor metrics (#22816)
---
 .../broker/web/WebExecutorThreadPoolStats.java     | 83 ++++++++++++++++++++++
 .../apache/pulsar/broker/web/WebExecutorStats.java |  7 ++
 .../org/apache/pulsar/broker/web/WebService.java   |  5 ++
 .../apache/pulsar/broker/web/WebServiceTest.java   | 18 +++++
 4 files changed, 113 insertions(+)

diff --git 
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/web/WebExecutorThreadPoolStats.java
 
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/web/WebExecutorThreadPoolStats.java
new file mode 100644
index 00000000000..6bfe4e33b8e
--- /dev/null
+++ 
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/web/WebExecutorThreadPoolStats.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.web;
+
+import com.google.common.annotations.VisibleForTesting;
+import io.opentelemetry.api.common.AttributeKey;
+import io.opentelemetry.api.common.Attributes;
+import io.opentelemetry.api.metrics.Meter;
+import io.opentelemetry.api.metrics.ObservableLongUpDownCounter;
+
+public class WebExecutorThreadPoolStats implements AutoCloseable {
+    // Replaces ['pulsar_web_executor_max_threads', 
'pulsar_web_executor_min_threads']
+    public static final String LIMIT_COUNTER = 
"pulsar.web.executor.thread.limit";
+    private final ObservableLongUpDownCounter limitCounter;
+
+    // Replaces
+    // ['pulsar_web_executor_active_threads', 
'pulsar_web_executor_current_threads', 'pulsar_web_executor_idle_threads']
+    public static final String USAGE_COUNTER = 
"pulsar.web.executor.thread.usage";
+    private final ObservableLongUpDownCounter usageCounter;
+
+    public static final AttributeKey<String> LIMIT_TYPE_KEY =
+            AttributeKey.stringKey("pulsar.web.executor.thread.limit.type");
+    @VisibleForTesting
+    enum LimitType {
+        MAX,
+        MIN;
+        public final Attributes attributes = Attributes.of(LIMIT_TYPE_KEY, 
name().toLowerCase());
+    }
+
+    public static final AttributeKey<String> USAGE_TYPE_KEY =
+            AttributeKey.stringKey("pulsar.web.executor.thread.usage.type");
+    @VisibleForTesting
+    enum UsageType {
+        ACTIVE,
+        CURRENT,
+        IDLE;
+        public final Attributes attributes = Attributes.of(USAGE_TYPE_KEY, 
name().toLowerCase());
+    }
+
+    public WebExecutorThreadPoolStats(Meter meter, WebExecutorThreadPool 
executor) {
+        limitCounter = meter
+                .upDownCounterBuilder(LIMIT_COUNTER)
+                .setUnit("{thread}")
+                .setDescription("The thread limits for the pulsar-web executor 
pool.")
+                .buildWithCallback(measurement -> {
+                    measurement.record(executor.getMaxThreads(), 
LimitType.MAX.attributes);
+                    measurement.record(executor.getMinThreads(), 
LimitType.MIN.attributes);
+                });
+        usageCounter = meter
+                .upDownCounterBuilder(USAGE_COUNTER)
+                .setUnit("{thread}")
+                .setDescription("The current usage of threads in the 
pulsar-web executor pool.")
+                .buildWithCallback(measurement -> {
+                    var idleThreads = executor.getIdleThreads();
+                    var currentThreads = executor.getThreads();
+                    measurement.record(idleThreads, UsageType.IDLE.attributes);
+                    measurement.record(currentThreads, 
UsageType.CURRENT.attributes);
+                    measurement.record(currentThreads - idleThreads, 
UsageType.ACTIVE.attributes);
+                });
+    }
+
+    @Override
+    public synchronized void close() {
+        limitCounter.close();
+        usageCounter.close();
+    }
+}
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/WebExecutorStats.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/WebExecutorStats.java
index 585df813027..28cfa7430cb 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/WebExecutorStats.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/WebExecutorStats.java
@@ -21,14 +21,21 @@ package org.apache.pulsar.broker.web;
 import io.prometheus.client.CollectorRegistry;
 import io.prometheus.client.Gauge;
 import java.util.concurrent.atomic.AtomicBoolean;
+import org.apache.pulsar.opentelemetry.annotations.PulsarDeprecatedMetric;
 
+@Deprecated
 class WebExecutorStats implements AutoCloseable {
     private static final AtomicBoolean CLOSED = new AtomicBoolean(false);
 
+    @PulsarDeprecatedMetric(newMetricName = 
WebExecutorThreadPoolStats.LIMIT_COUNTER)
     private final Gauge maxThreads;
+    @PulsarDeprecatedMetric(newMetricName = 
WebExecutorThreadPoolStats.LIMIT_COUNTER)
     private final Gauge minThreads;
+    @PulsarDeprecatedMetric(newMetricName = 
WebExecutorThreadPoolStats.USAGE_COUNTER)
     private final Gauge idleThreads;
+    @PulsarDeprecatedMetric(newMetricName = 
WebExecutorThreadPoolStats.USAGE_COUNTER)
     private final Gauge activeThreads;
+    @PulsarDeprecatedMetric(newMetricName = 
WebExecutorThreadPoolStats.USAGE_COUNTER)
     private final Gauge currentThreads;
     private final WebExecutorThreadPool executor;
 
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/WebService.java 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/WebService.java
index 9a439268a8b..bf484d4f41f 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/WebService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/WebService.java
@@ -75,7 +75,9 @@ public class WebService implements AutoCloseable {
     private final PulsarService pulsar;
     private final Server server;
     private final List<Handler> handlers;
+    @Deprecated
     private final WebExecutorStats executorStats;
+    private final WebExecutorThreadPoolStats webExecutorThreadPoolStats;
     private final WebExecutorThreadPool webServiceExecutor;
 
     private final ServerConnector httpConnector;
@@ -101,6 +103,8 @@ public class WebService implements AutoCloseable {
                 "pulsar-web",
                 config.getHttpServerThreadPoolQueueSize());
         this.executorStats = WebExecutorStats.getStats(webServiceExecutor);
+        this.webExecutorThreadPoolStats =
+                new 
WebExecutorThreadPoolStats(pulsar.getOpenTelemetry().getMeter(), 
webServiceExecutor);
         this.server = new Server(webServiceExecutor);
         if (config.getMaxHttpServerConnections() > 0) {
             server.addBean(new 
ConnectionLimit(config.getMaxHttpServerConnections(), server));
@@ -376,6 +380,7 @@ public class WebService implements AutoCloseable {
                 jettyStatisticsCollector = null;
             }
             webServiceExecutor.join();
+            webExecutorThreadPoolStats.close();
             this.executorStats.close();
             log.info("Web service closed");
         } catch (Exception e) {
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/web/WebServiceTest.java 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/web/WebServiceTest.java
index 17588a7ecac..30644237a74 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/web/WebServiceTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/web/WebServiceTest.java
@@ -18,8 +18,10 @@
  */
 package org.apache.pulsar.broker.web;
 
+import static 
org.apache.pulsar.broker.stats.BrokerOpenTelemetryTestUtil.assertMetricLongSumValue;
 import static 
org.apache.pulsar.broker.stats.prometheus.PrometheusMetricsClient.Metric;
 import static 
org.apache.pulsar.broker.stats.prometheus.PrometheusMetricsClient.parseMetrics;
+import static org.assertj.core.api.Assertions.assertThat;
 import static org.testng.Assert.assertEquals;
 import static org.testng.Assert.assertTrue;
 import static org.testng.Assert.fail;
@@ -59,6 +61,8 @@ import org.apache.pulsar.PrometheusMetricsTestUtil;
 import org.apache.pulsar.broker.PulsarService;
 import org.apache.pulsar.broker.ServiceConfiguration;
 import org.apache.pulsar.broker.testcontext.PulsarTestContext;
+import org.apache.pulsar.broker.web.WebExecutorThreadPoolStats.LimitType;
+import org.apache.pulsar.broker.web.WebExecutorThreadPoolStats.UsageType;
 import org.apache.pulsar.client.admin.PulsarAdmin;
 import org.apache.pulsar.client.admin.PulsarAdminBuilder;
 import org.apache.pulsar.client.admin.PulsarAdminException.ConflictException;
@@ -106,6 +110,19 @@ public class WebServiceTest {
     @Test
     public void testWebExecutorMetrics() throws Exception {
         setupEnv(true, false, false, false, -1, false);
+
+        var otelMetrics = 
pulsarTestContext.getOpenTelemetryMetricReader().collectAllMetrics();
+        assertMetricLongSumValue(otelMetrics, 
WebExecutorThreadPoolStats.LIMIT_COUNTER, LimitType.MAX.attributes,
+                value -> assertThat(value).isPositive());
+        assertMetricLongSumValue(otelMetrics, 
WebExecutorThreadPoolStats.LIMIT_COUNTER, LimitType.MIN.attributes,
+                value -> assertThat(value).isPositive());
+        assertMetricLongSumValue(otelMetrics, 
WebExecutorThreadPoolStats.USAGE_COUNTER, UsageType.ACTIVE.attributes,
+                value -> assertThat(value).isNotNegative());
+        assertMetricLongSumValue(otelMetrics, 
WebExecutorThreadPoolStats.USAGE_COUNTER, UsageType.CURRENT.attributes,
+                value -> assertThat(value).isPositive());
+        assertMetricLongSumValue(otelMetrics, 
WebExecutorThreadPoolStats.USAGE_COUNTER, UsageType.IDLE.attributes,
+                value -> assertThat(value).isNotNegative());
+
         ByteArrayOutputStream statsOut = new ByteArrayOutputStream();
         PrometheusMetricsTestUtil.generate(pulsar, false, false, false, 
statsOut);
         String metricsStr = statsOut.toString();
@@ -498,6 +515,7 @@ public class WebServiceTest {
         pulsarTestContext = PulsarTestContext.builder()
                 .spyByDefault()
                 .config(config)
+                .enableOpenTelemetry(true)
                 .build();
 
         pulsar = pulsarTestContext.getPulsarService();

Reply via email to