This is an automated email from the ASF dual-hosted git repository.

mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 7009071b6d5 [fix][broker] Optimize /metrics, fix unbounded request queue issue and fix race conditions in metricsBufferResponse mode (#22494)
7009071b6d5 is described below

commit 7009071b6d53bbc3d740ea99cdc0c010692679ab
Author: Lari Hotari <lhot...@users.noreply.github.com>
AuthorDate: Sat Apr 13 10:00:23 2024 -0700

    [fix][broker] Optimize /metrics, fix unbounded request queue issue and fix race conditions in metricsBufferResponse mode (#22494)
---
 conf/proxy.conf                                    |   6 +-
 .../PrometheusMetricsGeneratorUtils.java           |   2 +-
 .../stats/prometheus/PrometheusMetricsServlet.java | 149 +++++++---
 .../org/apache/pulsar/broker/stats/TimeWindow.java |  94 ------
 .../org/apache/pulsar/broker/stats/WindowWrap.java |  56 ----
 .../broker/stats/prometheus/MetricsExports.java    |  68 +++++
 .../stats/prometheus/PrometheusMetricStreams.java  |   2 +-
 .../prometheus/PrometheusMetricsGenerator.java     | 328 ++++++++++++---------
 .../prometheus/PulsarPrometheusMetricsServlet.java | 140 ++++++++-
 .../pulsar/broker/stats/prometheus/TopicStats.java |  12 +-
 .../apache/pulsar/PrometheusMetricsTestUtil.java   |  84 ++++++
 .../persistent/BucketDelayedDeliveryTest.java      |   6 +-
 .../service/persistent/PersistentTopicTest.java    |   4 +-
 .../broker/service/schema/SchemaServiceTest.java   |   4 +-
 .../pulsar/broker/stats/ConsumerStatsTest.java     |   4 +-
 .../broker/stats/MetadataStoreStatsTest.java       |   6 +-
 .../pulsar/broker/stats/PrometheusMetricsTest.java | 120 ++++----
 .../pulsar/broker/stats/SubscriptionStatsTest.java |   4 +-
 .../apache/pulsar/broker/stats/TimeWindowTest.java |  83 ------
 .../broker/stats/TransactionMetricsTest.java       |  18 +-
 .../buffer/TransactionBufferClientTest.java        |   4 +-
 .../pendingack/PendingAckPersistentTest.java       |   4 +-
 .../apache/pulsar/broker/web/WebServiceTest.java   |   4 +-
 .../pulsar/common/util/SimpleTextOutputStream.java |  16 +-
 .../pulsar/proxy/server/ProxyConfiguration.java    |   6 +
 .../apache/pulsar/proxy/server/ProxyService.java   |   3 +-
 .../pulsar/proxy/server/ProxyServiceStarter.java   |  40 ++-
 27 files changed, 739 insertions(+), 528 deletions(-)

diff --git a/conf/proxy.conf b/conf/proxy.conf
index 8285e1cb753..5a9d433f39c 100644
--- a/conf/proxy.conf
+++ b/conf/proxy.conf
@@ -376,5 +376,7 @@ zooKeeperCacheExpirySeconds=-1
 enableProxyStatsEndpoints=true
 # Whether the '/metrics' endpoint requires authentication. Defaults to true
 authenticateMetricsEndpoint=true
-# Enable cache metrics data, default value is false
-metricsBufferResponse=false
+# Time in milliseconds after which a metrics endpoint request times out. Default is 30000 (30s).
+# Set it to 0 to disable the timeout.
+metricsServletTimeoutMs=30000
+
diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/stats/prometheus/PrometheusMetricsGeneratorUtils.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/stats/prometheus/PrometheusMetricsGeneratorUtils.java
index 828d9871bb3..077d5280b51 100644
--- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/stats/prometheus/PrometheusMetricsGeneratorUtils.java
+++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/stats/prometheus/PrometheusMetricsGeneratorUtils.java
@@ -76,7 +76,7 @@ public class PrometheusMetricsGeneratorUtils {
                 }
                 for (int j = 0; j < sample.labelNames.size(); j++) {
                     String labelValue = sample.labelValues.get(j);
-                    if (labelValue != null) {
+                    if (labelValue != null && labelValue.indexOf('"') > -1) {
                         labelValue = labelValue.replace("\"", "\\\"");
                     }
                     if (j > 0) {
diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/stats/prometheus/PrometheusMetricsServlet.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/stats/prometheus/PrometheusMetricsServlet.java
index 64d1fcdab6f..8a41bed29d4 100644
--- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/stats/prometheus/PrometheusMetricsServlet.java
+++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/stats/prometheus/PrometheusMetricsServlet.java
@@ -25,9 +25,13 @@ import java.util.LinkedList;
 import java.util.List;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
 import javax.servlet.AsyncContext;
+import javax.servlet.AsyncEvent;
+import javax.servlet.AsyncListener;
 import javax.servlet.ServletException;
-import javax.servlet.ServletOutputStream;
 import javax.servlet.http.HttpServlet;
 import javax.servlet.http.HttpServletRequest;
 import javax.servlet.http.HttpServletResponse;
@@ -35,67 +39,132 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 public class PrometheusMetricsServlet extends HttpServlet {
-
     private static final long serialVersionUID = 1L;
-    private static final int HTTP_STATUS_OK_200 = 200;
-    private static final int HTTP_STATUS_INTERNAL_SERVER_ERROR_500 = 500;
-
-    private final long metricsServletTimeoutMs;
-    private final String cluster;
+    static final int HTTP_STATUS_OK_200 = 200;
+    static final int HTTP_STATUS_INTERNAL_SERVER_ERROR_500 = 500;
+    protected final long metricsServletTimeoutMs;
+    protected final String cluster;
     protected List<PrometheusRawMetricsProvider> metricsProviders;
 
-    private ExecutorService executor = null;
+    protected ExecutorService executor = null;
+    protected final int executorMaxThreads;
 
     public PrometheusMetricsServlet(long metricsServletTimeoutMs, String 
cluster) {
+        this(metricsServletTimeoutMs, cluster, 1);
+    }
+
+    public PrometheusMetricsServlet(long metricsServletTimeoutMs, String 
cluster, int executorMaxThreads) {
         this.metricsServletTimeoutMs = metricsServletTimeoutMs;
         this.cluster = cluster;
+        this.executorMaxThreads = executorMaxThreads;
     }
 
     @Override
     public void init() throws ServletException {
-        executor = Executors.newSingleThreadScheduledExecutor(new 
DefaultThreadFactory("prometheus-stats"));
+        if (executorMaxThreads > 0) {
+            executor =
+                    Executors.newScheduledThreadPool(executorMaxThreads, new 
DefaultThreadFactory("prometheus-stats"));
+        }
     }
 
     @Override
     protected void doGet(HttpServletRequest request, HttpServletResponse 
response) {
         AsyncContext context = request.startAsync();
-        context.setTimeout(metricsServletTimeoutMs);
-        executor.execute(() -> {
-            long start = System.currentTimeMillis();
-            HttpServletResponse res = (HttpServletResponse) 
context.getResponse();
-            try {
-                res.setStatus(HTTP_STATUS_OK_200);
-                res.setContentType("text/plain;charset=utf-8");
-                generateMetrics(cluster, res.getOutputStream());
-            } catch (Exception e) {
-                long end = System.currentTimeMillis();
-                long time = end - start;
-                if (e instanceof EOFException) {
-                    // NO STACKTRACE
-                    log.error("Failed to send metrics, "
-                            + "likely the client or this server closed "
-                            + "the connection due to a timeout ({} ms 
elapsed): {}", time, e + "");
-                } else {
-                    log.error("Failed to generate prometheus stats, {} ms 
elapsed", time, e);
+        // set hard timeout to 2 * timeout
+        if (metricsServletTimeoutMs > 0) {
+            context.setTimeout(metricsServletTimeoutMs * 2);
+        }
+        long startNanos = System.nanoTime();
+        AtomicBoolean taskStarted = new AtomicBoolean(false);
+        Future<?> future = executor.submit(() -> {
+            taskStarted.set(true);
+            long elapsedNanos = System.nanoTime() - startNanos;
+            // check if the request has been timed out, implement a soft 
timeout
+            // so that response writing can continue to up to 2 * timeout
+            if (metricsServletTimeoutMs > 0 && elapsedNanos > 
TimeUnit.MILLISECONDS.toNanos(metricsServletTimeoutMs)) {
+                log.warn("Prometheus metrics request was too long in queue 
({}ms). Skipping sending metrics.",
+                        TimeUnit.NANOSECONDS.toMillis(elapsedNanos));
+                if (!response.isCommitted()) {
+                    response.setStatus(HTTP_STATUS_INTERNAL_SERVER_ERROR_500);
                 }
-                res.setStatus(HTTP_STATUS_INTERNAL_SERVER_ERROR_500);
-            } finally {
-                long end = System.currentTimeMillis();
-                long time = end - start;
-                try {
-                    context.complete();
-                } catch (IllegalStateException e) {
-                    // this happens when metricsServletTimeoutMs expires
-                    // java.lang.IllegalStateException: AsyncContext completed 
and/or Request lifecycle recycled
-                    log.error("Failed to generate prometheus stats, "
-                            + "this is likely due to metricsServletTimeoutMs: 
{} ms elapsed: {}", time, e + "");
+                context.complete();
+                return;
+            }
+            handleAsyncMetricsRequest(context);
+        });
+        context.addListener(new AsyncListener() {
+            @Override
+            public void onComplete(AsyncEvent asyncEvent) throws IOException {
+                if (!taskStarted.get()) {
+                    future.cancel(false);
                 }
             }
+
+            @Override
+            public void onTimeout(AsyncEvent asyncEvent) throws IOException {
+                if (!taskStarted.get()) {
+                    future.cancel(false);
+                }
+                log.warn("Prometheus metrics request timed out");
+                HttpServletResponse res = (HttpServletResponse) 
context.getResponse();
+                if (!res.isCommitted()) {
+                    res.setStatus(HTTP_STATUS_INTERNAL_SERVER_ERROR_500);
+                }
+                context.complete();
+            }
+
+            @Override
+            public void onError(AsyncEvent asyncEvent) throws IOException {
+                if (!taskStarted.get()) {
+                    future.cancel(false);
+                }
+            }
+
+            @Override
+            public void onStartAsync(AsyncEvent asyncEvent) throws IOException 
{
+
+            }
         });
+
+    }
+
+    private void handleAsyncMetricsRequest(AsyncContext context) {
+        long start = System.currentTimeMillis();
+        HttpServletResponse res = (HttpServletResponse) context.getResponse();
+        try {
+            generateMetricsSynchronously(res);
+        } catch (Exception e) {
+            long end = System.currentTimeMillis();
+            long time = end - start;
+            if (e instanceof EOFException) {
+                // NO STACKTRACE
+                log.error("Failed to send metrics, "
+                        + "likely the client or this server closed "
+                        + "the connection due to a timeout ({} ms elapsed): 
{}", time, e + "");
+            } else {
+                log.error("Failed to generate prometheus stats, {} ms 
elapsed", time, e);
+            }
+            if (!res.isCommitted()) {
+                res.setStatus(HTTP_STATUS_INTERNAL_SERVER_ERROR_500);
+            }
+        } finally {
+            long end = System.currentTimeMillis();
+            long time = end - start;
+            try {
+                context.complete();
+            } catch (IllegalStateException e) {
+                // this happens when metricsServletTimeoutMs expires
+                // java.lang.IllegalStateException: AsyncContext completed 
and/or Request lifecycle recycled
+                log.error("Failed to generate prometheus stats, "
+                        + "this is likely due to metricsServletTimeoutMs: {} 
ms elapsed: {}", time, e + "");
+            }
+        }
     }
 
-    protected void generateMetrics(String cluster, ServletOutputStream 
outputStream) throws IOException {
-        PrometheusMetricsGeneratorUtils.generate(cluster, outputStream, 
metricsProviders);
+    private void generateMetricsSynchronously(HttpServletResponse res) throws 
IOException {
+        res.setStatus(HTTP_STATUS_OK_200);
+        res.setContentType("text/plain;charset=utf-8");
+        PrometheusMetricsGeneratorUtils.generate(cluster, 
res.getOutputStream(), metricsProviders);
     }
 
     @Override
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/TimeWindow.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/TimeWindow.java
deleted file mode 100644
index 08730189322..00000000000
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/TimeWindow.java
+++ /dev/null
@@ -1,94 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pulsar.broker.stats;
-
-import java.util.concurrent.atomic.AtomicReferenceArray;
-import java.util.function.Function;
-
-public final class TimeWindow<T> {
-    private final int interval;
-    private final int sampleCount;
-    private final AtomicReferenceArray<WindowWrap<T>> array;
-
-    public TimeWindow(int sampleCount, int interval) {
-        this.sampleCount = sampleCount;
-        this.interval = interval;
-        this.array = new AtomicReferenceArray<>(sampleCount);
-    }
-
-    /**
-     * return current time window data.
-     *
-     * @param function generate data.
-     * @return
-     */
-    public synchronized WindowWrap<T> current(Function<T, T> function) {
-        long millis = System.currentTimeMillis();
-
-        if (millis < 0) {
-            return null;
-        }
-        int idx = calculateTimeIdx(millis);
-        long windowStart = calculateWindowStart(millis);
-        while (true) {
-            WindowWrap<T> old = array.get(idx);
-            if (old == null) {
-                WindowWrap<T> window = new WindowWrap<>(interval, windowStart, 
null);
-                if (array.compareAndSet(idx, null, window)) {
-                    T value = null == function ? null : function.apply(null);
-                    window.value(value);
-                    return window;
-                } else {
-                    Thread.yield();
-                }
-            } else if (windowStart == old.start()) {
-                return old;
-            } else if (windowStart > old.start()) {
-                T value = null == function ? null : 
function.apply(old.value());
-                old.value(value);
-                old.resetWindowStart(windowStart);
-                return old;
-            } else {
-                //it should never goes here
-                throw new IllegalStateException();
-            }
-        }
-    }
-
-    private int calculateTimeIdx(long timeMillis) {
-        long timeId = timeMillis / this.interval;
-        return (int) (timeId % sampleCount);
-    }
-
-    private long calculateWindowStart(long timeMillis) {
-        return timeMillis - timeMillis % this.interval;
-    }
-
-    public int sampleCount() {
-        return sampleCount;
-    }
-
-    public int interval() {
-        return interval;
-    }
-
-    public long currentWindowStart(long millis) {
-        return this.calculateWindowStart(millis);
-    }
-}
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/WindowWrap.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/WindowWrap.java
deleted file mode 100644
index 12869b82921..00000000000
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/WindowWrap.java
+++ /dev/null
@@ -1,56 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pulsar.broker.stats;
-
-public final class WindowWrap<T> {
-    private final long interval;
-    private long start;
-    private T value;
-
-    public WindowWrap(long interval, long windowStart, T value) {
-        this.interval = interval;
-        this.start = windowStart;
-        this.value = value;
-    }
-
-    public long interval() {
-        return this.interval;
-    }
-
-    public long start() {
-        return this.start;
-    }
-
-    public T value() {
-        return value;
-    }
-
-    public void value(T value) {
-        this.value = value;
-    }
-
-    public WindowWrap<T> resetWindowStart(long startTime) {
-        this.start = startTime;
-        return this;
-    }
-
-    public boolean isTimeInWindow(long timeMillis) {
-        return start <= timeMillis && timeMillis < start + interval;
-    }
-}
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/MetricsExports.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/MetricsExports.java
new file mode 100644
index 00000000000..b80e5747d8a
--- /dev/null
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/MetricsExports.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.stats.prometheus;
+
+import static org.apache.pulsar.common.stats.JvmMetrics.getJvmDirectMemoryUsed;
+import io.prometheus.client.CollectorRegistry;
+import io.prometheus.client.Gauge;
+import io.prometheus.client.hotspot.DefaultExports;
+import org.apache.pulsar.PulsarVersion;
+import org.apache.pulsar.common.util.DirectMemoryUtils;
+
+public class MetricsExports {
+    private static boolean initialized = false;
+
+    private MetricsExports() {
+    }
+
+    public static synchronized void initialize() {
+        if (!initialized) {
+            DefaultExports.initialize();
+            register(CollectorRegistry.defaultRegistry);
+            initialized = true;
+        }
+    }
+
+    public static void register(CollectorRegistry registry) {
+        Gauge.build("jvm_memory_direct_bytes_used", "-").create().setChild(new 
Gauge.Child() {
+            @Override
+            public double get() {
+                return getJvmDirectMemoryUsed();
+            }
+        }).register(registry);
+
+        Gauge.build("jvm_memory_direct_bytes_max", "-").create().setChild(new 
Gauge.Child() {
+            @Override
+            public double get() {
+                return DirectMemoryUtils.jvmMaxDirectMemory();
+            }
+        }).register(registry);
+
+        // metric to export pulsar version info
+        Gauge.build("pulsar_version_info", "-")
+                .labelNames("version", "commit").create()
+                .setChild(new Gauge.Child() {
+                    @Override
+                    public double get() {
+                        return 1.0;
+                    }
+                }, PulsarVersion.getVersion(), PulsarVersion.getGitSha())
+                .register(registry);
+    }
+}
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/PrometheusMetricStreams.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/PrometheusMetricStreams.java
index 93cbad4e195..5a5a61404b8 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/PrometheusMetricStreams.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/PrometheusMetricStreams.java
@@ -42,7 +42,7 @@ public class PrometheusMetricStreams {
         stream.write(metricName).write('{');
         for (int i = 0; i < labelsAndValuesArray.length; i += 2) {
             String labelValue = labelsAndValuesArray[i + 1];
-            if (labelValue != null) {
+            if (labelValue != null && labelValue.indexOf('"') > -1) {
                 labelValue = labelValue.replace("\"", "\\\"");
             }
             
stream.write(labelsAndValuesArray[i]).write("=\"").write(labelValue).write('\"');
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/PrometheusMetricsGenerator.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/PrometheusMetricsGenerator.java
index 124f0d3e54e..bbd09335c0a 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/PrometheusMetricsGenerator.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/PrometheusMetricsGenerator.java
@@ -20,40 +20,39 @@ package org.apache.pulsar.broker.stats.prometheus;
 
 import static 
org.apache.pulsar.broker.stats.prometheus.PrometheusMetricsGeneratorUtils.generateSystemMetrics;
 import static 
org.apache.pulsar.broker.stats.prometheus.PrometheusMetricsGeneratorUtils.getTypeStr;
-import static org.apache.pulsar.common.stats.JvmMetrics.getJvmDirectMemoryUsed;
 import io.netty.buffer.ByteBuf;
-import io.netty.buffer.UnpooledByteBufAllocator;
+import io.netty.buffer.ByteBufAllocator;
+import io.netty.buffer.CompositeByteBuf;
+import io.netty.buffer.PooledByteBufAllocator;
 import io.prometheus.client.Collector;
-import io.prometheus.client.CollectorRegistry;
-import io.prometheus.client.Gauge;
-import io.prometheus.client.Gauge.Child;
-import io.prometheus.client.hotspot.DefaultExports;
+import java.io.BufferedOutputStream;
 import java.io.IOException;
 import java.io.OutputStream;
-import java.io.StringWriter;
+import java.io.OutputStreamWriter;
 import java.io.Writer;
-import java.nio.ByteBuffer;
+import java.nio.CharBuffer;
+import java.nio.charset.StandardCharsets;
+import java.time.Clock;
 import java.util.Collection;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.bookkeeper.stats.NullStatsProvider;
 import org.apache.bookkeeper.stats.StatsProvider;
-import org.apache.pulsar.PulsarVersion;
 import org.apache.pulsar.broker.PulsarService;
-import org.apache.pulsar.broker.stats.TimeWindow;
-import org.apache.pulsar.broker.stats.WindowWrap;
 import org.apache.pulsar.broker.stats.metrics.ManagedCursorMetrics;
 import org.apache.pulsar.broker.stats.metrics.ManagedLedgerCacheMetrics;
 import org.apache.pulsar.broker.stats.metrics.ManagedLedgerMetrics;
 import org.apache.pulsar.common.allocator.PulsarByteBufAllocator;
 import org.apache.pulsar.common.stats.Metrics;
-import org.apache.pulsar.common.util.DirectMemoryUtils;
 import org.apache.pulsar.common.util.SimpleTextOutputStream;
-import org.eclipse.jetty.server.HttpOutput;
 
 /**
  * Generate metrics aggregated at the namespace level and optionally at a 
topic level and formats them out
@@ -62,123 +61,80 @@ import org.eclipse.jetty.server.HttpOutput;
  * href="https://prometheus.io/docs/instrumenting/exposition_formats/">Exposition Formats</a>
  */
 @Slf4j
-public class PrometheusMetricsGenerator {
-    private static volatile TimeWindow<ByteBuf> timeWindow;
-    private static final int MAX_COMPONENTS = 64;
-
-    static {
-        DefaultExports.initialize();
-
-        Gauge.build("jvm_memory_direct_bytes_used", "-").create().setChild(new 
Child() {
-            @Override
-            public double get() {
-                return getJvmDirectMemoryUsed();
-            }
-        }).register(CollectorRegistry.defaultRegistry);
-
-        Gauge.build("jvm_memory_direct_bytes_max", "-").create().setChild(new 
Child() {
-            @Override
-            public double get() {
-                return DirectMemoryUtils.jvmMaxDirectMemory();
-            }
-        }).register(CollectorRegistry.defaultRegistry);
-
-        // metric to export pulsar version info
-        Gauge.build("pulsar_version_info", "-")
-                .labelNames("version", "commit").create()
-                .setChild(new Child() {
-                    @Override
-                    public double get() {
-                        return 1.0;
-                    }
-                }, PulsarVersion.getVersion(), PulsarVersion.getGitSha())
-                .register(CollectorRegistry.defaultRegistry);
-    }
-
-    public static void generate(PulsarService pulsar, boolean 
includeTopicMetrics, boolean includeConsumerMetrics,
-                                boolean includeProducerMetrics, OutputStream 
out) throws IOException {
-        generate(pulsar, includeTopicMetrics, includeConsumerMetrics, 
includeProducerMetrics, false, out, null);
-    }
-
-    public static void generate(PulsarService pulsar, boolean 
includeTopicMetrics, boolean includeConsumerMetrics,
-                                boolean includeProducerMetrics, boolean 
splitTopicAndPartitionIndexLabel,
-                                OutputStream out) throws IOException {
-        generate(pulsar, includeTopicMetrics, includeConsumerMetrics, 
includeProducerMetrics,
-                splitTopicAndPartitionIndexLabel, out, null);
-    }
-
-    public static synchronized void generate(PulsarService pulsar, boolean 
includeTopicMetrics,
-                                             boolean includeConsumerMetrics, 
boolean includeProducerMetrics,
-                                             boolean 
splitTopicAndPartitionIndexLabel, OutputStream out,
-                                             
List<PrometheusRawMetricsProvider> metricsProviders) throws IOException {
-        ByteBuf buffer;
-        boolean exposeBufferMetrics = 
pulsar.getConfiguration().isMetricsBufferResponse();
+public class PrometheusMetricsGenerator implements AutoCloseable {
+    private static final int DEFAULT_INITIAL_BUFFER_SIZE = 1024 * 1024; // 1MB
+    private static final int MINIMUM_FOR_MAX_COMPONENTS = 64;
+
+    private volatile MetricsBuffer metricsBuffer;
+    private static AtomicReferenceFieldUpdater<PrometheusMetricsGenerator, 
MetricsBuffer> metricsBufferFieldUpdater =
+            
AtomicReferenceFieldUpdater.newUpdater(PrometheusMetricsGenerator.class, 
MetricsBuffer.class,
+                    "metricsBuffer");
+    private volatile boolean closed;
+
+    public static class MetricsBuffer {
+        private final CompletableFuture<ByteBuf> bufferFuture;
+        private final long createTimeslot;
+        private final AtomicInteger refCnt = new AtomicInteger(2);
+
+        MetricsBuffer(long timeslot) {
+            bufferFuture = new CompletableFuture<>();
+            createTimeslot = timeslot;
+        }
 
-        if (!exposeBufferMetrics) {
-            buffer = generate0(pulsar, includeTopicMetrics, 
includeConsumerMetrics, includeProducerMetrics,
-                    splitTopicAndPartitionIndexLabel, metricsProviders);
-        } else {
-            if (null == timeWindow) {
-                int period = 
pulsar.getConfiguration().getManagedLedgerStatsPeriodSeconds();
-                timeWindow = new TimeWindow<>(1, (int) 
TimeUnit.SECONDS.toMillis(period));
-            }
-            WindowWrap<ByteBuf> window = timeWindow.current(oldBuf -> {
-                // release expired buffer, in case of memory leak
-                if (oldBuf != null && oldBuf.refCnt() > 0) {
-                    oldBuf.release();
-                    log.debug("Cached metrics buffer released");
-                }
+        public CompletableFuture<ByteBuf> getBufferFuture() {
+            return bufferFuture;
+        }
 
-                try {
-                    ByteBuf buf = generate0(pulsar, includeTopicMetrics, 
includeConsumerMetrics, includeProducerMetrics,
-                            splitTopicAndPartitionIndexLabel, 
metricsProviders);
-                    log.debug("Generated metrics buffer size {}", 
buf.readableBytes());
-                    return buf;
-                } catch (IOException e) {
-                    log.error("Generate metrics failed", e);
-                    //return empty buffer if exception happens
-                    return PulsarByteBufAllocator.DEFAULT.heapBuffer(0);
-                }
-            });
+        long getCreateTimeslot() {
+            return createTimeslot;
+        }
 
-            if (null == window || null == window.value()) {
-                return;
-            }
-            buffer = window.value();
-            log.debug("Current window start {}, current cached buf size {}", 
window.start(), buffer.readableBytes());
+        /**
+         * Retain the buffer. This is allowed, only when the buffer is not 
already released.
+         *
+         * @return true if the buffer is retained successfully, false 
otherwise.
+         */
+        boolean retain() {
+            return refCnt.updateAndGet(x -> x > 0 ? x + 1 : x) > 0;
         }
 
-        try {
-            if (out instanceof HttpOutput) {
-                HttpOutput output = (HttpOutput) out;
-                //no mem_copy and memory allocations here
-                ByteBuffer[] buffers = buffer.nioBuffers();
-                for (ByteBuffer buffer0 : buffers) {
-                    output.write(buffer0);
-                }
-            } else {
-                //read data from buffer and write it to output stream, with no 
more heap buffer(byte[]) allocation.
-                //not modify buffer readIndex/writeIndex here.
-                int readIndex = buffer.readerIndex();
-                int readableBytes = buffer.readableBytes();
-                for (int i = 0; i < readableBytes; i++) {
-                    out.write(buffer.getByte(readIndex + i));
-                }
-            }
-        } finally {
-            if (!exposeBufferMetrics && buffer.refCnt() > 0) {
-                buffer.release();
-                log.debug("Metrics buffer released.");
+        /**
+         * Release the buffer.
+         */
+        public void release() {
+            int newValue = refCnt.decrementAndGet();
+            if (newValue == 0) {
+                bufferFuture.whenComplete((byteBuf, throwable) -> {
+                    if (byteBuf != null) {
+                        byteBuf.release();
+                    }
+                });
             }
         }
     }
 
-    private static ByteBuf generate0(PulsarService pulsar, boolean 
includeTopicMetrics, boolean includeConsumerMetrics,
-                                     boolean includeProducerMetrics, boolean 
splitTopicAndPartitionIndexLabel,
-                                     List<PrometheusRawMetricsProvider> 
metricsProviders) throws IOException {
-        //Use unpooled buffers here to avoid direct buffer usage increasing.
-        //when write out 200MB data, MAX_COMPONENTS = 64 needn't mem_copy. 
see: CompositeByteBuf#consolidateIfNeeded()
-        ByteBuf buf = 
UnpooledByteBufAllocator.DEFAULT.compositeDirectBuffer(MAX_COMPONENTS);
+    private final PulsarService pulsar;
+    private final boolean includeTopicMetrics;
+    private final boolean includeConsumerMetrics;
+    private final boolean includeProducerMetrics;
+    private final boolean splitTopicAndPartitionIndexLabel;
+    private final Clock clock;
+
+    private volatile int initialBufferSize = DEFAULT_INITIAL_BUFFER_SIZE;
+
+    public PrometheusMetricsGenerator(PulsarService pulsar, boolean 
includeTopicMetrics,
+                                      boolean includeConsumerMetrics, boolean 
includeProducerMetrics,
+                                      boolean 
splitTopicAndPartitionIndexLabel, Clock clock) {
+        this.pulsar = pulsar;
+        this.includeTopicMetrics = includeTopicMetrics;
+        this.includeConsumerMetrics = includeConsumerMetrics;
+        this.includeProducerMetrics = includeProducerMetrics;
+        this.splitTopicAndPartitionIndexLabel = 
splitTopicAndPartitionIndexLabel;
+        this.clock = clock;
+    }
+
+    private ByteBuf generate0(List<PrometheusRawMetricsProvider> 
metricsProviders) {
+        ByteBuf buf = allocateMultipartCompositeDirectBuffer();
         boolean exceptionHappens = false;
         //Used in namespace/topic and transaction aggregators as share metric 
names
         PrometheusMetricStreams metricStreams = new PrometheusMetricStreams();
@@ -220,10 +176,34 @@ public class PrometheusMetricsGenerator {
             //if exception happens, release buffer
             if (exceptionHappens) {
                 buf.release();
+            } else {
+                // for the next time, the initial buffer size will be 
suggested by the last buffer size
+                initialBufferSize = Math.max(DEFAULT_INITIAL_BUFFER_SIZE, 
buf.readableBytes());
             }
         }
     }
 
+    private ByteBuf allocateMultipartCompositeDirectBuffer() {
+        // use composite buffer with pre-allocated buffers to ensure that the 
pooled allocator can be used
+        // for allocating the buffers
+        ByteBufAllocator byteBufAllocator = PulsarByteBufAllocator.DEFAULT;
+        int chunkSize;
+        if (byteBufAllocator instanceof PooledByteBufAllocator) {
+            PooledByteBufAllocator pooledByteBufAllocator = 
(PooledByteBufAllocator) byteBufAllocator;
+            chunkSize = Math.max(pooledByteBufAllocator.metric().chunkSize(), 
DEFAULT_INITIAL_BUFFER_SIZE);
+        } else {
+            chunkSize = DEFAULT_INITIAL_BUFFER_SIZE;
+        }
+        CompositeByteBuf buf = byteBufAllocator.compositeDirectBuffer(
+                Math.max(MINIMUM_FOR_MAX_COMPONENTS, (initialBufferSize / 
chunkSize) + 1));
+        int totalLen = 0;
+        while (totalLen < initialBufferSize) {
+            totalLen += chunkSize;
+            buf.addComponent(false, byteBufAllocator.directBuffer(chunkSize));
+        }
+        return buf;
+    }
+
     private static void generateBrokerBasicMetrics(PulsarService pulsar, 
SimpleTextOutputStream stream) {
         String clusterName = pulsar.getConfiguration().getClusterName();
         // generate managedLedgerCache metrics
@@ -269,12 +249,13 @@ public class PrometheusMetricsGenerator {
                         String name = key.substring(0, nameIndex);
                         value = key.substring(nameIndex + 1);
                         if (!names.contains(name)) {
-                            stream.write("# TYPE ").write(name.replace("brk_", 
"pulsar_")).write(' ')
-                                    .write(getTypeStr(metricType)).write("\n");
+                            stream.write("# TYPE ");
+                            writeNameReplacingBrkPrefix(stream, name);
+                            stream.write(' 
').write(getTypeStr(metricType)).write("\n");
                             names.add(name);
                         }
-                        stream.write(name.replace("brk_", "pulsar_"))
-                                
.write("{cluster=\"").write(cluster).write('"');
+                        writeNameReplacingBrkPrefix(stream, name);
+                        stream.write("{cluster=\"").write(cluster).write('"');
                     } catch (Exception e) {
                         continue;
                     }
@@ -283,12 +264,13 @@ public class PrometheusMetricsGenerator {
 
                     String name = entry.getKey();
                     if (!names.contains(name)) {
-                        stream.write("# TYPE 
").write(entry.getKey().replace("brk_", "pulsar_")).write(' ')
-                                .write(getTypeStr(metricType)).write('\n');
+                        stream.write("# TYPE ");
+                        writeNameReplacingBrkPrefix(stream, entry.getKey());
+                        stream.write(' 
').write(getTypeStr(metricType)).write('\n');
                         names.add(name);
                     }
-                    stream.write(name.replace("brk_", "pulsar_"))
-                            .write("{cluster=\"").write(cluster).write('"');
+                    writeNameReplacingBrkPrefix(stream, name);
+                    stream.write("{cluster=\"").write(cluster).write('"');
                 }
 
                 //to avoid quantile label duplicated
@@ -308,18 +290,98 @@ public class PrometheusMetricsGenerator {
         }
     }
 
+    private static SimpleTextOutputStream 
writeNameReplacingBrkPrefix(SimpleTextOutputStream stream, String name) {
+        if (name.startsWith("brk_")) {
+            return 
stream.write("pulsar_").write(CharBuffer.wrap(name).position("brk_".length()));
+        } else {
+            return stream.write(name);
+        }
+    }
+
     private static void generateManagedLedgerBookieClientMetrics(PulsarService 
pulsar, SimpleTextOutputStream stream) {
         StatsProvider statsProvider = 
pulsar.getManagedLedgerClientFactory().getStatsProvider();
         if (statsProvider instanceof NullStatsProvider) {
             return;
         }
 
-        try {
-            Writer writer = new StringWriter();
+        try (Writer writer = new OutputStreamWriter(new 
BufferedOutputStream(new OutputStream() {
+                @Override
+                public void write(int b) throws IOException {
+                    stream.writeByte(b);
+                }
+
+                @Override
+                public void write(byte b[], int off, int len) throws 
IOException {
+                    stream.write(b, off, len);
+                }
+            }), StandardCharsets.UTF_8)) {
             statsProvider.writeAllMetrics(writer);
-            stream.write(writer.toString());
         } catch (IOException e) {
-            // nop
+            log.error("Failed to write managed ledger bookie client metrics", 
e);
+        }
+    }
+
+    public MetricsBuffer renderToBuffer(Executor executor, 
List<PrometheusRawMetricsProvider> metricsProviders) {
+        boolean cacheMetricsResponse = 
pulsar.getConfiguration().isMetricsBufferResponse();
+        while (!closed && !Thread.currentThread().isInterrupted()) {
+            long currentTimeSlot = cacheMetricsResponse ? 
calculateCurrentTimeSlot() : 0;
+            MetricsBuffer currentMetricsBuffer = metricsBuffer;
+            if (currentMetricsBuffer == null || 
currentMetricsBuffer.getBufferFuture().isCompletedExceptionally()
+                    || (currentMetricsBuffer.getBufferFuture().isDone()
+                    && (currentMetricsBuffer.getCreateTimeslot() != 0
+                    && currentTimeSlot > 
currentMetricsBuffer.getCreateTimeslot()))) {
+                MetricsBuffer newMetricsBuffer = new 
MetricsBuffer(currentTimeSlot);
+                if (metricsBufferFieldUpdater.compareAndSet(this, 
currentMetricsBuffer, newMetricsBuffer)) {
+                    if (currentMetricsBuffer != null) {
+                        currentMetricsBuffer.release();
+                    }
+                    CompletableFuture<ByteBuf> bufferFuture = 
newMetricsBuffer.getBufferFuture();
+                    executor.execute(() -> {
+                        try {
+                            bufferFuture.complete(generate0(metricsProviders));
+                        } catch (Exception e) {
+                            bufferFuture.completeExceptionally(e);
+                        } finally {
+                            if (currentTimeSlot == 0) {
+                                // if the buffer is not cached, release it 
after the future is completed
+                                metricsBufferFieldUpdater.compareAndSet(this, 
newMetricsBuffer, null);
+                                newMetricsBuffer.release();
+                            }
+                        }
+                    });
+                    // no need to retain before returning since the new buffer 
starts with refCnt 2
+                    return newMetricsBuffer;
+                } else {
+                    currentMetricsBuffer = metricsBuffer;
+                }
+            }
+            // retain the buffer before returning
+            // if the buffer is already released, retaining won't succeed, 
retry in that case
+            if (currentMetricsBuffer != null && currentMetricsBuffer.retain()) 
{
+                return currentMetricsBuffer;
+            }
+        }
+        return null;
+    }
+
+    /**
+     * Calculate the current time slot based on the current time.
+     * This is to ensure that cached metrics are refreshed consistently at a 
fixed interval regardless of the request
+     * time.
+     */
+    private long calculateCurrentTimeSlot() {
+        long cacheTimeoutMillis =
+                TimeUnit.SECONDS.toMillis(Math.max(1, 
pulsar.getConfiguration().getManagedLedgerStatsPeriodSeconds()));
+        long now = clock.millis();
+        return now / cacheTimeoutMillis;
+    }
+
+    @Override
+    public void close() {
+        closed = true;
+        MetricsBuffer buffer = metricsBufferFieldUpdater.getAndSet(this, null);
+        if (buffer != null) {
+            buffer.release();
         }
     }
 }
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/PulsarPrometheusMetricsServlet.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/PulsarPrometheusMetricsServlet.java
index 42bd2652883..7fcc74e965c 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/PulsarPrometheusMetricsServlet.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/PulsarPrometheusMetricsServlet.java
@@ -18,34 +18,142 @@
  */
 package org.apache.pulsar.broker.stats.prometheus;
 
+import java.io.EOFException;
 import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.time.Clock;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import javax.servlet.AsyncContext;
+import javax.servlet.AsyncEvent;
+import javax.servlet.AsyncListener;
 import javax.servlet.ServletOutputStream;
+import javax.servlet.http.HttpServletRequest;
+import javax.servlet.http.HttpServletResponse;
+import lombok.extern.slf4j.Slf4j;
 import org.apache.pulsar.broker.PulsarService;
+import org.eclipse.jetty.server.HttpOutput;
 
+@Slf4j
 public class PulsarPrometheusMetricsServlet extends PrometheusMetricsServlet {
-
     private static final long serialVersionUID = 1L;
+    private static final int EXECUTOR_MAX_THREADS = 4;
 
-    private final PulsarService pulsar;
-    private final boolean shouldExportTopicMetrics;
-    private final boolean shouldExportConsumerMetrics;
-    private final boolean shouldExportProducerMetrics;
-    private final boolean splitTopicAndPartitionLabel;
+    private final PrometheusMetricsGenerator prometheusMetricsGenerator;
 
     public PulsarPrometheusMetricsServlet(PulsarService pulsar, boolean 
includeTopicMetrics,
-                                          boolean includeConsumerMetrics, 
boolean shouldExportProducerMetrics,
+                                          boolean includeConsumerMetrics, 
boolean includeProducerMetrics,
                                           boolean splitTopicAndPartitionLabel) 
{
-        super(pulsar.getConfiguration().getMetricsServletTimeoutMs(), 
pulsar.getConfiguration().getClusterName());
-        this.pulsar = pulsar;
-        this.shouldExportTopicMetrics = includeTopicMetrics;
-        this.shouldExportConsumerMetrics = includeConsumerMetrics;
-        this.shouldExportProducerMetrics = shouldExportProducerMetrics;
-        this.splitTopicAndPartitionLabel = splitTopicAndPartitionLabel;
+        super(pulsar.getConfiguration().getMetricsServletTimeoutMs(), 
pulsar.getConfiguration().getClusterName(),
+                EXECUTOR_MAX_THREADS);
+        MetricsExports.initialize();
+        prometheusMetricsGenerator =
+                new PrometheusMetricsGenerator(pulsar, includeTopicMetrics, 
includeConsumerMetrics,
+                        includeProducerMetrics, splitTopicAndPartitionLabel, 
Clock.systemUTC());
     }
 
+
     @Override
-    protected void generateMetrics(String cluster, ServletOutputStream 
outputStream) throws IOException {
-        PrometheusMetricsGenerator.generate(pulsar, shouldExportTopicMetrics, 
shouldExportConsumerMetrics,
-                shouldExportProducerMetrics, splitTopicAndPartitionLabel, 
outputStream, metricsProviders);
+    public void destroy() {
+        super.destroy();
+        prometheusMetricsGenerator.close();
+    }
+
+    protected void doGet(HttpServletRequest request, HttpServletResponse 
response) {
+        AsyncContext context = request.startAsync();
+        // set hard timeout to 2 * timeout
+        if (metricsServletTimeoutMs > 0) {
+            context.setTimeout(metricsServletTimeoutMs * 2);
+        }
+        long startNanos = System.nanoTime();
+        AtomicBoolean skipWritingResponse = new AtomicBoolean(false);
+        context.addListener(new AsyncListener() {
+            @Override
+            public void onComplete(AsyncEvent event) throws IOException {
+            }
+
+            @Override
+            public void onTimeout(AsyncEvent event) throws IOException {
+                log.warn("Prometheus metrics request timed out");
+                skipWritingResponse.set(true);
+                HttpServletResponse res = (HttpServletResponse) 
context.getResponse();
+                if (!res.isCommitted()) {
+                    res.setStatus(HTTP_STATUS_INTERNAL_SERVER_ERROR_500);
+                }
+                context.complete();
+            }
+
+            @Override
+            public void onError(AsyncEvent event) throws IOException {
+                skipWritingResponse.set(true);
+            }
+
+            @Override
+            public void onStartAsync(AsyncEvent event) throws IOException {
+            }
+        });
+        PrometheusMetricsGenerator.MetricsBuffer metricsBuffer =
+                prometheusMetricsGenerator.renderToBuffer(executor, 
metricsProviders);
+        if (metricsBuffer == null) {
+            log.info("Service is closing, skip writing metrics.");
+            response.setStatus(HTTP_STATUS_INTERNAL_SERVER_ERROR_500);
+            context.complete();
+            return;
+        }
+        metricsBuffer.getBufferFuture().whenComplete((buffer, ex) -> 
executor.execute(() -> {
+            try {
+                long elapsedNanos = System.nanoTime() - startNanos;
+                // check if the request has been timed out, implement a soft 
timeout
+                // so that response writing can continue to up to 2 * timeout
+                if (metricsServletTimeoutMs > 0 && elapsedNanos > 
TimeUnit.MILLISECONDS.toNanos(
+                        metricsServletTimeoutMs)) {
+                    log.warn("Prometheus metrics request was too long in queue 
({}ms). Skipping sending metrics.",
+                            TimeUnit.NANOSECONDS.toMillis(elapsedNanos));
+                    if (!response.isCommitted() && !skipWritingResponse.get()) 
{
+                        
response.setStatus(HTTP_STATUS_INTERNAL_SERVER_ERROR_500);
+                    }
+                    return;
+                }
+                if (skipWritingResponse.get()) {
+                    log.warn("Response has timed or failed, skip writing 
metrics.");
+                    return;
+                }
+                if (response.isCommitted()) {
+                    log.warn("Response is already committed, cannot write 
metrics");
+                    return;
+                }
+                if (ex != null) {
+                    log.error("Failed to generate metrics", ex);
+                    response.setStatus(HTTP_STATUS_INTERNAL_SERVER_ERROR_500);
+                    return;
+                }
+                if (buffer == null) {
+                    log.error("Failed to generate metrics, buffer is null");
+                    response.setStatus(HTTP_STATUS_INTERNAL_SERVER_ERROR_500);
+                } else {
+                    response.setStatus(HTTP_STATUS_OK_200);
+                    response.setContentType("text/plain;charset=utf-8");
+                    ServletOutputStream outputStream = 
response.getOutputStream();
+                    if (outputStream instanceof HttpOutput) {
+                        HttpOutput output = (HttpOutput) outputStream;
+                        for (ByteBuffer nioBuffer : buffer.nioBuffers()) {
+                            output.write(nioBuffer);
+                        }
+                    } else {
+                        int length = buffer.readableBytes();
+                        if (length > 0) {
+                            buffer.duplicate().readBytes(outputStream, length);
+                        }
+                    }
+                }
+            } catch (EOFException e) {
+                log.error("Failed to write metrics to response due to 
EOFException");
+            } catch (IOException e) {
+                log.error("Failed to write metrics to response", e);
+            } finally {
+                metricsBuffer.release();
+                context.complete();
+            }
+        }));
     }
 }
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/TopicStats.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/TopicStats.java
index 4be006423f5..27288291d29 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/TopicStats.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/TopicStats.java
@@ -507,7 +507,9 @@ class TopicStats {
     static void writeTopicMetric(PrometheusMetricStreams stream, String 
metricName, Number value, String cluster,
                                  String namespace, String topic, boolean 
splitTopicAndPartitionIndexLabel,
                                  String... extraLabelsAndValues) {
-        String[] labelsAndValues = new String[splitTopicAndPartitionIndexLabel 
? 8 : 6];
+        int baseLabelCount = splitTopicAndPartitionIndexLabel ? 8 : 6;
+        String[] labelsAndValues =
+                new String[baseLabelCount + (extraLabelsAndValues != null ? 
extraLabelsAndValues.length : 0)];
         labelsAndValues[0] = "cluster";
         labelsAndValues[1] = cluster;
         labelsAndValues[2] = "namespace";
@@ -527,7 +529,11 @@ class TopicStats {
         } else {
             labelsAndValues[5] = topic;
         }
-        String[] labels = ArrayUtils.addAll(labelsAndValues, 
extraLabelsAndValues);
-        stream.writeSample(metricName, value, labels);
+        if (extraLabelsAndValues != null) {
+            for (int i = 0; i < extraLabelsAndValues.length; i++) {
+                labelsAndValues[baseLabelCount + i] = extraLabelsAndValues[i];
+            }
+        }
+        stream.writeSample(metricName, value, labelsAndValues);
     }
 }
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/PrometheusMetricsTestUtil.java 
b/pulsar-broker/src/test/java/org/apache/pulsar/PrometheusMetricsTestUtil.java
new file mode 100644
index 00000000000..fcc3b6aa88f
--- /dev/null
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/PrometheusMetricsTestUtil.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.pulsar;
+
+import com.google.common.util.concurrent.MoreExecutors;
+import io.netty.buffer.ByteBuf;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.nio.ByteBuffer;
+import java.time.Clock;
+import java.util.List;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import org.apache.pulsar.broker.PulsarService;
+import org.apache.pulsar.broker.stats.prometheus.PrometheusMetricsGenerator;
+import org.apache.pulsar.broker.stats.prometheus.PrometheusRawMetricsProvider;
+import org.eclipse.jetty.server.HttpOutput;
+
+public class PrometheusMetricsTestUtil {
+    public static void generate(PulsarService pulsar, boolean 
includeTopicMetrics, boolean includeConsumerMetrics,
+                                boolean includeProducerMetrics, OutputStream 
out) throws IOException {
+        generate(new PrometheusMetricsGenerator(pulsar, includeTopicMetrics, 
includeConsumerMetrics,
+                includeProducerMetrics, false, Clock.systemUTC()), out, null);
+    }
+
+    public static void generate(PulsarService pulsar, boolean 
includeTopicMetrics, boolean includeConsumerMetrics,
+                                boolean includeProducerMetrics, boolean 
splitTopicAndPartitionIndexLabel,
+                                OutputStream out) throws IOException {
+        generate(new PrometheusMetricsGenerator(pulsar, includeTopicMetrics, 
includeConsumerMetrics,
+                includeProducerMetrics, splitTopicAndPartitionIndexLabel, 
Clock.systemUTC()), out, null);
+    }
+
+    public static void generate(PrometheusMetricsGenerator metricsGenerator, 
OutputStream out,
+                                List<PrometheusRawMetricsProvider> 
metricsProviders) throws IOException {
+        PrometheusMetricsGenerator.MetricsBuffer metricsBuffer =
+                
metricsGenerator.renderToBuffer(MoreExecutors.directExecutor(), 
metricsProviders);
+        try {
+            ByteBuf buffer = null;
+            try {
+                buffer = metricsBuffer.getBufferFuture().get(5, 
TimeUnit.SECONDS);
+            } catch (InterruptedException e) {
+                Thread.currentThread().interrupt();
+                throw new IOException(e);
+            } catch (ExecutionException | TimeoutException e) {
+                throw new IOException(e);
+            }
+            if (buffer == null) {
+                return;
+            }
+            if (out instanceof HttpOutput) {
+                HttpOutput output = (HttpOutput) out;
+                ByteBuffer[] nioBuffers = buffer.nioBuffers();
+                for (ByteBuffer nioBuffer : nioBuffers) {
+                    output.write(nioBuffer);
+                }
+            } else {
+                int length = buffer.readableBytes();
+                if (length > 0) {
+                    buffer.duplicate().readBytes(out, length);
+                }
+            }
+        } finally {
+            metricsBuffer.release();
+        }
+    }
+}
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/BucketDelayedDeliveryTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/BucketDelayedDeliveryTest.java
index 8be0aa4bc7d..ff8e418c024 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/BucketDelayedDeliveryTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/BucketDelayedDeliveryTest.java
@@ -40,10 +40,10 @@ import org.apache.bookkeeper.client.LedgerHandle;
 import org.apache.bookkeeper.mledger.ManagedCursor;
 import org.apache.bookkeeper.mledger.impl.ManagedCursorImpl;
 import org.apache.commons.lang3.mutable.MutableInt;
+import org.apache.pulsar.PrometheusMetricsTestUtil;
 import org.apache.pulsar.broker.BrokerTestUtil;
 import org.apache.pulsar.broker.delayed.BucketDelayedDeliveryTrackerFactory;
 import org.apache.pulsar.broker.service.Dispatcher;
-import org.apache.pulsar.broker.stats.prometheus.PrometheusMetricsGenerator;
 import org.apache.pulsar.client.api.Consumer;
 import org.apache.pulsar.client.api.MessageId;
 import org.apache.pulsar.client.api.Producer;
@@ -218,7 +218,7 @@ public class BucketDelayedDeliveryTest extends 
DelayedDeliveryTest {
         Thread.sleep(2000);
 
         ByteArrayOutputStream output = new ByteArrayOutputStream();
-        PrometheusMetricsGenerator.generate(pulsar, true, true, true, output);
+        PrometheusMetricsTestUtil.generate(pulsar, true, true, true, output);
         String metricsStr = output.toString(StandardCharsets.UTF_8);
         Multimap<String, Metric> metricsMap = parseMetrics(metricsStr);
 
@@ -304,7 +304,7 @@ public class BucketDelayedDeliveryTest extends 
DelayedDeliveryTest {
         assertEquals(opLatencyMetricsSum.intValue(), 
opLatencyTopicMetrics.get().value);
 
         ByteArrayOutputStream namespaceOutput = new ByteArrayOutputStream();
-        PrometheusMetricsGenerator.generate(pulsar, false, true, true, 
namespaceOutput);
+        PrometheusMetricsTestUtil.generate(pulsar, false, true, true, 
namespaceOutput);
         Multimap<String, Metric> namespaceMetricsMap = 
parseMetrics(namespaceOutput.toString(StandardCharsets.UTF_8));
 
         Optional<Metric> namespaceMetric =
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentTopicTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentTopicTest.java
index c214634e6ed..44d24668cc3 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentTopicTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentTopicTest.java
@@ -65,11 +65,11 @@ import org.apache.bookkeeper.mledger.ManagedLedger;
 import org.apache.bookkeeper.mledger.impl.ManagedCursorContainer;
 import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
 import org.apache.bookkeeper.mledger.impl.PositionImpl;
+import org.apache.pulsar.PrometheusMetricsTestUtil;
 import org.apache.pulsar.broker.service.BrokerService;
 import org.apache.pulsar.broker.service.BrokerTestBase;
 import org.apache.pulsar.broker.service.Topic;
 import org.apache.pulsar.broker.service.TopicPoliciesService;
-import org.apache.pulsar.broker.stats.prometheus.PrometheusMetricsGenerator;
 import org.apache.pulsar.client.admin.PulsarAdminException;
 import org.apache.pulsar.client.api.Consumer;
 import org.apache.pulsar.client.api.Message;
@@ -367,7 +367,7 @@ public class PersistentTopicTest extends BrokerTestBase {
 
         latch.await(10, TimeUnit.SECONDS);
         ByteArrayOutputStream output = new ByteArrayOutputStream();
-        PrometheusMetricsGenerator.generate(pulsar, exposeTopicLevelMetrics, 
true, true, output);
+        PrometheusMetricsTestUtil.generate(pulsar, exposeTopicLevelMetrics, 
true, true, output);
         String metricsStr = output.toString(StandardCharsets.UTF_8);
 
         Multimap<String, Metric> metricsMap = parseMetrics(metricsStr);
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/schema/SchemaServiceTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/schema/SchemaServiceTest.java
index a520b8c241b..3a4016eb79c 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/schema/SchemaServiceTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/schema/SchemaServiceTest.java
@@ -43,11 +43,11 @@ import java.util.TreeMap;
 import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutionException;
+import org.apache.pulsar.PrometheusMetricsTestUtil;
 import org.apache.pulsar.broker.BrokerTestUtil;
 import org.apache.pulsar.broker.PulsarServerException;
 import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
 import 
org.apache.pulsar.broker.service.schema.SchemaRegistry.SchemaAndMetadata;
-import org.apache.pulsar.broker.stats.prometheus.PrometheusMetricsGenerator;
 import org.apache.pulsar.client.admin.PulsarAdminException;
 import org.apache.pulsar.client.impl.schema.KeyValueSchemaInfo;
 import org.apache.pulsar.common.naming.TopicName;
@@ -121,7 +121,7 @@ public class SchemaServiceTest extends 
MockedPulsarServiceBaseTest {
         deleteSchema(schemaId, version(1));
 
         ByteArrayOutputStream output = new ByteArrayOutputStream();
-        PrometheusMetricsGenerator.generate(pulsar, false, false, false, 
output);
+        PrometheusMetricsTestUtil.generate(pulsar, false, false, false, 
output);
         output.flush();
         String metricsStr = output.toString(StandardCharsets.UTF_8);
         Multimap<String, Metric> metrics = parseMetrics(metricsStr);
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/ConsumerStatsTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/ConsumerStatsTest.java
index eb4500c1366..512a5cfcab6 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/ConsumerStatsTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/ConsumerStatsTest.java
@@ -45,6 +45,7 @@ import java.util.concurrent.TimeUnit;
 import lombok.Cleanup;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.commons.lang3.tuple.Pair;
+import org.apache.pulsar.PrometheusMetricsTestUtil;
 import org.apache.pulsar.broker.ServiceConfiguration;
 import org.apache.pulsar.broker.service.Subscription;
 import org.apache.pulsar.broker.service.Topic;
@@ -52,7 +53,6 @@ import 
org.apache.pulsar.broker.service.persistent.PersistentTopic;
 import org.apache.pulsar.broker.service.plugin.EntryFilter;
 import org.apache.pulsar.broker.service.plugin.EntryFilterProducerTest;
 import org.apache.pulsar.broker.service.plugin.EntryFilterWithClassLoader;
-import org.apache.pulsar.broker.stats.prometheus.PrometheusMetricsGenerator;
 import org.apache.pulsar.client.admin.PulsarAdminException;
 import org.apache.pulsar.client.api.Consumer;
 import org.apache.pulsar.client.api.Message;
@@ -335,7 +335,7 @@ public class ConsumerStatsTest extends ProducerConsumerBase 
{
         consumer2.updateRates();
 
         ByteArrayOutputStream output = new ByteArrayOutputStream();
-        PrometheusMetricsGenerator.generate(pulsar, exposeTopicLevelMetrics, 
true, true, output);
+        PrometheusMetricsTestUtil.generate(pulsar, exposeTopicLevelMetrics, 
true, true, output);
         String metricStr = output.toString(StandardCharsets.UTF_8);
 
         Multimap<String, Metric> metricsMap = parseMetrics(metricStr);
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/MetadataStoreStatsTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/MetadataStoreStatsTest.java
index 15f41365da8..726bde3f3d0 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/MetadataStoreStatsTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/MetadataStoreStatsTest.java
@@ -30,10 +30,10 @@ import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 import lombok.Cleanup;
 import org.apache.commons.lang3.StringUtils;
+import org.apache.pulsar.PrometheusMetricsTestUtil;
 import org.apache.pulsar.broker.ServiceConfiguration;
 import org.apache.pulsar.broker.authentication.AuthenticationProviderToken;
 import org.apache.pulsar.broker.service.BrokerTestBase;
-import org.apache.pulsar.broker.stats.prometheus.PrometheusMetricsGenerator;
 import org.apache.pulsar.client.api.Consumer;
 import org.apache.pulsar.client.api.Message;
 import org.apache.pulsar.client.api.Producer;
@@ -101,7 +101,7 @@ public class MetadataStoreStatsTest extends BrokerTestBase {
         }
 
         ByteArrayOutputStream output = new ByteArrayOutputStream();
-        PrometheusMetricsGenerator.generate(pulsar, false, false, false, 
false, output);
+        PrometheusMetricsTestUtil.generate(pulsar, false, false, false, false, 
output);
         String metricsStr = output.toString();
         Multimap<String, Metric> metricsMap = parseMetrics(metricsStr);
 
@@ -191,7 +191,7 @@ public class MetadataStoreStatsTest extends BrokerTestBase {
         }
 
         ByteArrayOutputStream output = new ByteArrayOutputStream();
-        PrometheusMetricsGenerator.generate(pulsar, false, false, false, false, output);
+        PrometheusMetricsTestUtil.generate(pulsar, false, false, false, false, output);
         String metricsStr = output.toString();
         Multimap<String, Metric> metricsMap = parseMetrics(metricsStr);
 
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/PrometheusMetricsTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/PrometheusMetricsTest.java
index d3891931496..1fe0e99b498 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/PrometheusMetricsTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/PrometheusMetricsTest.java
@@ -21,7 +21,10 @@ package org.apache.pulsar.broker.stats;
 import static com.google.common.base.Preconditions.checkArgument;
 import static org.apache.pulsar.broker.stats.prometheus.PrometheusMetricsClient.Metric;
 import static org.apache.pulsar.broker.stats.prometheus.PrometheusMetricsClient.parseMetrics;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
 import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertNotEquals;
 import static org.testng.Assert.assertNull;
 import static org.testng.Assert.assertTrue;
 import static org.testng.Assert.fail;
@@ -35,6 +38,7 @@ import java.lang.reflect.Field;
 import java.math.BigDecimal;
 import java.math.RoundingMode;
 import java.nio.charset.StandardCharsets;
+import java.time.Clock;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
@@ -51,6 +55,7 @@ import java.util.Random;
 import java.util.Set;
 import java.util.UUID;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
@@ -61,6 +66,7 @@ import lombok.Cleanup;
 import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
 import org.apache.commons.io.IOUtils;
 import org.apache.commons.lang3.StringUtils;
+import org.apache.pulsar.PrometheusMetricsTestUtil;
 import org.apache.pulsar.broker.ServiceConfiguration;
 import org.apache.pulsar.broker.authentication.AuthenticationDataSource;
 import org.apache.pulsar.broker.authentication.AuthenticationProviderToken;
@@ -85,7 +91,6 @@ import org.apache.pulsar.compaction.Compactor;
 import org.apache.pulsar.compaction.PulsarCompactionServiceFactory;
 import org.apache.zookeeper.CreateMode;
 import org.awaitility.Awaitility;
-import org.mockito.Mockito;
 import org.testng.Assert;
 import org.testng.annotations.AfterMethod;
 import org.testng.annotations.BeforeMethod;
@@ -155,7 +160,7 @@ public class PrometheusMetricsTest extends BrokerTestBase {
         });
         @Cleanup
         ByteArrayOutputStream statsOut = new ByteArrayOutputStream();
-        PrometheusMetricsGenerator.generate(pulsar, true, false, false, 
statsOut);
+        PrometheusMetricsTestUtil.generate(pulsar, true, false, false, 
statsOut);
         String metricsStr = statsOut.toString();
         Multimap<String, Metric> metrics = parseMetrics(metricsStr);
         assertTrue(metrics.containsKey("pulsar_publish_rate_limit_times"));
@@ -185,7 +190,7 @@ public class PrometheusMetricsTest extends BrokerTestBase {
 
         @Cleanup
         ByteArrayOutputStream statsOut2 = new ByteArrayOutputStream();
-        PrometheusMetricsGenerator.generate(pulsar, true, false, false, 
statsOut2);
+        PrometheusMetricsTestUtil.generate(pulsar, true, false, false, 
statsOut2);
         String metricsStr2 = statsOut2.toString();
         Multimap<String, Metric> metrics2 = parseMetrics(metricsStr2);
         assertTrue(metrics2.containsKey("pulsar_publish_rate_limit_times"));
@@ -217,7 +222,7 @@ public class PrometheusMetricsTest extends BrokerTestBase {
         Thread.sleep(ASYNC_EVENT_COMPLETION_WAIT);
         @Cleanup
         ByteArrayOutputStream statsOut = new ByteArrayOutputStream();
-        PrometheusMetricsGenerator.generate(pulsar, true, false, false, 
statsOut);
+        PrometheusMetricsTestUtil.generate(pulsar, true, false, false, 
statsOut);
         String metricsStr = statsOut.toString();
         Multimap<String, Metric> metrics = parseMetrics(metricsStr);
         Collection<Metric> metric = metrics.get("pulsar_topics_count");
@@ -254,7 +259,7 @@ public class PrometheusMetricsTest extends BrokerTestBase {
         producerInServer.getStats().msgThroughputIn = 100;
         @Cleanup
         ByteArrayOutputStream statsOut = new ByteArrayOutputStream();
-        PrometheusMetricsGenerator.generate(pulsar, true, false, true, 
statsOut);
+        PrometheusMetricsTestUtil.generate(pulsar, true, false, true, 
statsOut);
         String metricsStr = statsOut.toString();
         Multimap<String, Metric> metrics = parseMetrics(metricsStr);
         assertTrue(metrics.containsKey("pulsar_average_msg_size"));
@@ -297,7 +302,7 @@ public class PrometheusMetricsTest extends BrokerTestBase {
         }
 
         ByteArrayOutputStream statsOut = new ByteArrayOutputStream();
-        PrometheusMetricsGenerator.generate(pulsar, true, false, false, 
statsOut);
+        PrometheusMetricsTestUtil.generate(pulsar, true, false, false, 
statsOut);
         String metricsStr = statsOut.toString();
         Multimap<String, Metric> metrics = parseMetrics(metricsStr);
 
@@ -395,7 +400,7 @@ public class PrometheusMetricsTest extends BrokerTestBase {
         }
 
         ByteArrayOutputStream statsOut = new ByteArrayOutputStream();
-        PrometheusMetricsGenerator.generate(pulsar, true, false, false, 
statsOut);
+        PrometheusMetricsTestUtil.generate(pulsar, true, false, false, 
statsOut);
         String metricsStr = statsOut.toString();
         Multimap<String, Metric> metrics = parseMetrics(metricsStr);
 
@@ -504,7 +509,7 @@ public class PrometheusMetricsTest extends BrokerTestBase {
         c2.close();
 
         ByteArrayOutputStream statsOut = new ByteArrayOutputStream();
-        PrometheusMetricsGenerator.generate(pulsar, true, false, false, 
statsOut);
+        PrometheusMetricsTestUtil.generate(pulsar, true, false, false, 
statsOut);
         String metricsStr = statsOut.toString();
         Multimap<String, Metric> metrics = parseMetrics(metricsStr);
 
@@ -582,7 +587,7 @@ public class PrometheusMetricsTest extends BrokerTestBase {
 
         // includeTopicMetric true
         ByteArrayOutputStream statsOut = new ByteArrayOutputStream();
-        PrometheusMetricsGenerator.generate(pulsar, true, false, false, 
statsOut);
+        PrometheusMetricsTestUtil.generate(pulsar, true, false, false, 
statsOut);
         String metricsStr = statsOut.toString();
         Multimap<String, Metric> metrics = parseMetrics(metricsStr);
 
@@ -614,7 +619,7 @@ public class PrometheusMetricsTest extends BrokerTestBase {
 
         // includeTopicMetric false
         ByteArrayOutputStream statsOut2 = new ByteArrayOutputStream();
-        PrometheusMetricsGenerator.generate(pulsar, false, false, false, 
statsOut2);
+        PrometheusMetricsTestUtil.generate(pulsar, false, false, false, 
statsOut2);
         String metricsStr2 = statsOut2.toString();
         Multimap<String, Metric> metrics2 = parseMetrics(metricsStr2);
 
@@ -698,7 +703,7 @@ public class PrometheusMetricsTest extends BrokerTestBase {
         Awaitility.await().until(() -> sub2.getExpiredMessageRate() != 0.0);
 
         ByteArrayOutputStream statsOut = new ByteArrayOutputStream();
-        PrometheusMetricsGenerator.generate(pulsar, true, false, false, 
statsOut);
+        PrometheusMetricsTestUtil.generate(pulsar, true, false, false, 
statsOut);
         String metricsStr = statsOut.toString();
         Multimap<String, Metric> metrics = parseMetrics(metricsStr);
         // There should be 2 metrics with different tags for each topic
@@ -780,15 +785,15 @@ public class PrometheusMetricsTest extends BrokerTestBase {
         for (var latencyMetric : UnloadManager.LatencyMetric.values()) {
             var serviceUnit = "serviceUnit";
             var brokerLookupAddress = "lookupAddress";
-            var serviceUnitStateData = Mockito.mock(ServiceUnitStateData.class);
-            Mockito.when(serviceUnitStateData.sourceBroker()).thenReturn(brokerLookupAddress);
-            Mockito.when(serviceUnitStateData.dstBroker()).thenReturn(brokerLookupAddress);
+            var serviceUnitStateData = mock(ServiceUnitStateData.class);
+            when(serviceUnitStateData.sourceBroker()).thenReturn(brokerLookupAddress);
+            when(serviceUnitStateData.dstBroker()).thenReturn(brokerLookupAddress);
             latencyMetric.beginMeasurement(serviceUnit, brokerLookupAddress, serviceUnitStateData);
             latencyMetric.endMeasurement(serviceUnit);
         }
 
         ByteArrayOutputStream statsOut = new ByteArrayOutputStream();
-        PrometheusMetricsGenerator.generate(pulsar, false, false, false, statsOut);
+        PrometheusMetricsTestUtil.generate(pulsar, false, false, false, statsOut);
         String metricsStr = statsOut.toString();
         Multimap<String, Metric> metrics = parseMetrics(metricsStr);
         assertTrue(metrics.containsKey("pulsar_bundle_msg_rate_in"));
@@ -838,7 +843,7 @@ public class PrometheusMetricsTest extends BrokerTestBase {
         }
 
         ByteArrayOutputStream statsOut = new ByteArrayOutputStream();
-        PrometheusMetricsGenerator.generate(pulsar, true, false, false, 
statsOut);
+        PrometheusMetricsTestUtil.generate(pulsar, true, false, false, 
statsOut);
         String metricsStr = statsOut.toString();
         Multimap<String, Metric> metrics = parseMetrics(metricsStr);
         assertTrue(metrics.containsKey("pulsar_subscription_back_log"));
@@ -885,7 +890,7 @@ public class PrometheusMetricsTest extends BrokerTestBase {
         }
 
         ByteArrayOutputStream statsOut = new ByteArrayOutputStream();
-        PrometheusMetricsGenerator.generate(pulsar, false, false, false, 
statsOut);
+        PrometheusMetricsTestUtil.generate(pulsar, false, false, false, 
statsOut);
         String metricsStr = statsOut.toString();
 
         Multimap<String, Metric> metrics = parseMetrics(metricsStr);
@@ -958,7 +963,7 @@ public class PrometheusMetricsTest extends BrokerTestBase {
         }
 
         ByteArrayOutputStream statsOut = new ByteArrayOutputStream();
-        PrometheusMetricsGenerator.generate(pulsar, true, false, true, 
statsOut);
+        PrometheusMetricsTestUtil.generate(pulsar, true, false, true, 
statsOut);
         String metricsStr = statsOut.toString();
 
         Multimap<String, Metric> metrics = parseMetrics(metricsStr);
@@ -1026,7 +1031,7 @@ public class PrometheusMetricsTest extends BrokerTestBase 
{
         }
 
         ByteArrayOutputStream statsOut = new ByteArrayOutputStream();
-        PrometheusMetricsGenerator.generate(pulsar, true, true, false, 
statsOut);
+        PrometheusMetricsTestUtil.generate(pulsar, true, true, false, 
statsOut);
         String metricsStr = statsOut.toString();
 
         Multimap<String, Metric> metrics = parseMetrics(metricsStr);
@@ -1113,7 +1118,7 @@ public class PrometheusMetricsTest extends BrokerTestBase 
{
         }
 
         ByteArrayOutputStream statsOut = new ByteArrayOutputStream();
-        PrometheusMetricsGenerator.generate(pulsar, false, false, false, 
statsOut);
+        PrometheusMetricsTestUtil.generate(pulsar, false, false, false, 
statsOut);
         String metricsStr = statsOut.toString();
 
         Map<String, String> typeDefs = new HashMap<>();
@@ -1217,7 +1222,7 @@ public class PrometheusMetricsTest extends BrokerTestBase 
{
         }
 
         ByteArrayOutputStream statsOut = new ByteArrayOutputStream();
-        PrometheusMetricsGenerator.generate(pulsar, false, false, false, 
statsOut);
+        PrometheusMetricsTestUtil.generate(pulsar, false, false, false, 
statsOut);
         String metricsStr = statsOut.toString();
 
         Multimap<String, Metric> metrics = parseMetrics(metricsStr);
@@ -1253,7 +1258,7 @@ public class PrometheusMetricsTest extends BrokerTestBase 
{
         }
 
         ByteArrayOutputStream statsOut = new ByteArrayOutputStream();
-        PrometheusMetricsGenerator.generate(pulsar, false, false, false, 
statsOut);
+        PrometheusMetricsTestUtil.generate(pulsar, false, false, false, 
statsOut);
         String metricsStr = statsOut.toString();
 
         Multimap<String, Metric> metrics = parseMetrics(metricsStr);
@@ -1331,7 +1336,7 @@ public class PrometheusMetricsTest extends BrokerTestBase 
{
         }
 
         ByteArrayOutputStream statsOut = new ByteArrayOutputStream();
-        PrometheusMetricsGenerator.generate(pulsar, false, false, false, 
statsOut);
+        PrometheusMetricsTestUtil.generate(pulsar, false, false, false, 
statsOut);
         String metricsStr = statsOut.toString();
 
         Multimap<String, Metric> metrics = parseMetrics(metricsStr);
@@ -1412,7 +1417,7 @@ public class PrometheusMetricsTest extends BrokerTestBase 
{
         });
 
         ByteArrayOutputStream statsOut = new ByteArrayOutputStream();
-        PrometheusMetricsGenerator.generate(pulsar, false, false, false, 
statsOut);
+        PrometheusMetricsTestUtil.generate(pulsar, false, false, false, 
statsOut);
         String metricsStr = statsOut.toString();
         Multimap<String, Metric> metrics = parseMetrics(metricsStr);
         List<Metric> cm = (List<Metric>) 
metrics.get("pulsar_authentication_success_total");
@@ -1473,7 +1478,7 @@ public class PrometheusMetricsTest extends BrokerTestBase 
{
         }
 
         ByteArrayOutputStream statsOut = new ByteArrayOutputStream();
-        PrometheusMetricsGenerator.generate(pulsar, false, false, false, 
statsOut);
+        PrometheusMetricsTestUtil.generate(pulsar, false, false, false, 
statsOut);
         String metricsStr = statsOut.toString();
         Multimap<String, Metric> metrics = parseMetrics(metricsStr);
         List<Metric> cm = (List<Metric>) 
metrics.get("pulsar_expired_token_total");
@@ -1514,7 +1519,7 @@ public class PrometheusMetricsTest extends BrokerTestBase 
{
         }
 
         ByteArrayOutputStream statsOut = new ByteArrayOutputStream();
-        PrometheusMetricsGenerator.generate(pulsar, false, false, false, 
statsOut);
+        PrometheusMetricsTestUtil.generate(pulsar, false, false, false, 
statsOut);
         String metricsStr = statsOut.toString();
         Multimap<String, Metric> metrics = parseMetrics(metricsStr);
         Metric countMetric = ((List<Metric>) 
metrics.get("pulsar_expiring_token_minutes_count")).get(0);
@@ -1588,7 +1593,7 @@ public class PrometheusMetricsTest extends BrokerTestBase 
{
         // enable ExposeManagedCursorMetricsInPrometheus
         
pulsar.getConfiguration().setExposeManagedCursorMetricsInPrometheus(true);
         ByteArrayOutputStream statsOut = new ByteArrayOutputStream();
-        PrometheusMetricsGenerator.generate(pulsar, true, false, false, 
statsOut);
+        PrometheusMetricsTestUtil.generate(pulsar, true, false, false, 
statsOut);
         String metricsStr = statsOut.toString();
 
         Multimap<String, Metric> metrics = parseMetrics(metricsStr);
@@ -1601,7 +1606,7 @@ public class PrometheusMetricsTest extends BrokerTestBase 
{
         // disable ExposeManagedCursorMetricsInPrometheus
         
pulsar.getConfiguration().setExposeManagedCursorMetricsInPrometheus(false);
         ByteArrayOutputStream statsOut2 = new ByteArrayOutputStream();
-        PrometheusMetricsGenerator.generate(pulsar, true, false, false, 
statsOut2);
+        PrometheusMetricsTestUtil.generate(pulsar, true, false, false, 
statsOut2);
         String metricsStr2 = statsOut2.toString();
         Multimap<String, Metric> metrics2 = parseMetrics(metricsStr2);
         List<Metric> cm2 = (List<Metric>) 
metrics2.get("pulsar_ml_cursor_persistLedgerSucceed");
@@ -1620,7 +1625,7 @@ public class PrometheusMetricsTest extends BrokerTestBase 
{
                 .create();
 
         ByteArrayOutputStream statsOut = new ByteArrayOutputStream();
-        PrometheusMetricsGenerator.generate(pulsar, true, false, false, 
statsOut);
+        PrometheusMetricsTestUtil.generate(pulsar, true, false, false, 
statsOut);
         String metricsStr = statsOut.toString();
         Multimap<String, Metric> metrics = parseMetrics(metricsStr);
         List<Metric> cm = (List<Metric>) 
metrics.get("pulsar_connection_created_total_count");
@@ -1637,7 +1642,7 @@ public class PrometheusMetricsTest extends BrokerTestBase 
{
 
         pulsarClient.close();
         statsOut = new ByteArrayOutputStream();
-        PrometheusMetricsGenerator.generate(pulsar, true, false, false, 
statsOut);
+        PrometheusMetricsTestUtil.generate(pulsar, true, false, false, 
statsOut);
         metricsStr = statsOut.toString();
 
         metrics = parseMetrics(metricsStr);
@@ -1660,7 +1665,7 @@ public class PrometheusMetricsTest extends BrokerTestBase 
{
 
         pulsarClient.close();
         statsOut = new ByteArrayOutputStream();
-        PrometheusMetricsGenerator.generate(pulsar, true, false, false, 
statsOut);
+        PrometheusMetricsTestUtil.generate(pulsar, true, false, false, 
statsOut);
         metricsStr = statsOut.toString();
 
         metrics = parseMetrics(metricsStr);
@@ -1704,7 +1709,7 @@ public class PrometheusMetricsTest extends BrokerTestBase 
{
                 .messageRoutingMode(MessageRoutingMode.SinglePartition)
                 .create();
         ByteArrayOutputStream statsOut = new ByteArrayOutputStream();
-        PrometheusMetricsGenerator.generate(pulsar, true, false, false, 
statsOut);
+        PrometheusMetricsTestUtil.generate(pulsar, true, false, false, 
statsOut);
         String metricsStr = statsOut.toString();
         Multimap<String, Metric> metrics = parseMetrics(metricsStr);
         List<Metric> cm = (List<Metric>) 
metrics.get("pulsar_compaction_removed_event_count");
@@ -1739,7 +1744,7 @@ public class PrometheusMetricsTest extends BrokerTestBase 
{
         Compactor compactor = 
((PulsarCompactionServiceFactory)pulsar.getCompactionServiceFactory()).getCompactor();
         compactor.compact(topicName).get();
         statsOut = new ByteArrayOutputStream();
-        PrometheusMetricsGenerator.generate(pulsar, true, false, false, 
statsOut);
+        PrometheusMetricsTestUtil.generate(pulsar, true, false, false, 
statsOut);
         metricsStr = statsOut.toString();
         metrics = parseMetrics(metricsStr);
         cm = (List<Metric>) 
metrics.get("pulsar_compaction_removed_event_count");
@@ -1772,31 +1777,36 @@ public class PrometheusMetricsTest extends BrokerTestBase {
 
     @Test
     public void testMetricsWithCache() throws Throwable {
-        ServiceConfiguration configuration = Mockito.mock(ServiceConfiguration.class);
-        Mockito.when(configuration.getManagedLedgerStatsPeriodSeconds()).thenReturn(2);
-        Mockito.when(configuration.isMetricsBufferResponse()).thenReturn(true);
-        Mockito.when(configuration.getClusterName()).thenReturn(configClusterName);
-        Mockito.when(pulsar.getConfiguration()).thenReturn(configuration);
+        ServiceConfiguration configuration = pulsar.getConfiguration();
+        configuration.setManagedLedgerStatsPeriodSeconds(2);
+        configuration.setMetricsBufferResponse(true);
+        configuration.setClusterName(configClusterName);
 
-        int period = pulsar.getConfiguration().getManagedLedgerStatsPeriodSeconds();
-        TimeWindow<Object> timeWindow = new TimeWindow<>(2, (int) TimeUnit.SECONDS.toMillis(period));
+        // create a mock clock to control the time
+        AtomicLong currentTimeMillis = new AtomicLong(System.currentTimeMillis());
+        Clock clock = mock();
+        when(clock.millis()).thenAnswer(invocation -> currentTimeMillis.get());
 
+        PrometheusMetricsGenerator prometheusMetricsGenerator =
+                new PrometheusMetricsGenerator(pulsar, true, false, false,
+                        false, clock);
+
+        String previousMetrics = null;
         for (int a = 0; a < 4; a++) {
-            long start = System.currentTimeMillis();
             ByteArrayOutputStream statsOut1 = new ByteArrayOutputStream();
-            PrometheusMetricsGenerator.generate(pulsar, true, false, false, false, statsOut1, null);
+            PrometheusMetricsTestUtil.generate(prometheusMetricsGenerator, statsOut1, null);
             ByteArrayOutputStream statsOut2 = new ByteArrayOutputStream();
-            PrometheusMetricsGenerator.generate(pulsar, true, false, false, false, statsOut2, null);
-            long end = System.currentTimeMillis();
-
-            if (timeWindow.currentWindowStart(start) == timeWindow.currentWindowStart(end)) {
-                String metricsStr1 = statsOut1.toString();
-                String metricsStr2 = statsOut2.toString();
-                assertEquals(metricsStr1, metricsStr2);
-                Multimap<String, Metric> metrics = parseMetrics(metricsStr1);
-            }
+            PrometheusMetricsTestUtil.generate(prometheusMetricsGenerator, statsOut2, null);
+
+            String metricsStr1 = statsOut1.toString();
+            String metricsStr2 = statsOut2.toString();
+            assertTrue(metricsStr1.length() > 1000);
+            assertEquals(metricsStr1, metricsStr2);
+            assertNotEquals(metricsStr1, previousMetrics);
+            previousMetrics = metricsStr1;
 
-            Thread.sleep(TimeUnit.SECONDS.toMillis(period / 2));
+            // move time forward
+            currentTimeMillis.addAndGet(TimeUnit.SECONDS.toMillis(2));
         }
     }
 
@@ -1824,7 +1834,7 @@ public class PrometheusMetricsTest extends BrokerTestBase 
{
                 .subscribe();
         @Cleanup
         ByteArrayOutputStream statsOut = new ByteArrayOutputStream();
-        PrometheusMetricsGenerator.generate(pulsar, true, false, false, true,  
statsOut);
+        PrometheusMetricsTestUtil.generate(pulsar, true, false, false, true,  
statsOut);
         String metricsStr = statsOut.toString();
         Multimap<String, Metric> metrics = parseMetrics(metricsStr);
         Collection<Metric> metric = metrics.get("pulsar_consumers_count");
@@ -1860,7 +1870,7 @@ public class PrometheusMetricsTest extends BrokerTestBase 
{
         }
 
         ByteArrayOutputStream statsOut = new ByteArrayOutputStream();
-        PrometheusMetricsGenerator.generate(pulsar, false, false, false, 
statsOut);
+        PrometheusMetricsTestUtil.generate(pulsar, false, false, false, 
statsOut);
         String metricsStr = statsOut.toString();
 
         Pattern typePattern = 
Pattern.compile("^#\\s+TYPE\\s+(\\w+)\\s+(\\w+)");
@@ -1920,7 +1930,7 @@ public class PrometheusMetricsTest extends BrokerTestBase 
{
                 .subscribe();
         @Cleanup
         ByteArrayOutputStream statsOut = new ByteArrayOutputStream();
-        PrometheusMetricsGenerator.generate(pulsar, true, false,
+        PrometheusMetricsTestUtil.generate(pulsar, true, false,
                 false, statsOut);
         String metricsStr = statsOut.toString();
         final List<String> subCountLines = metricsStr.lines()
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/SubscriptionStatsTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/SubscriptionStatsTest.java
index e39860274d1..3e71d8f2111 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/SubscriptionStatsTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/SubscriptionStatsTest.java
@@ -31,13 +31,13 @@ import java.util.UUID;
 import java.util.concurrent.TimeUnit;
 import lombok.Cleanup;
 import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.PrometheusMetricsTestUtil;
 import org.apache.pulsar.broker.ServiceConfiguration;
 import org.apache.pulsar.broker.service.Dispatcher;
 import org.apache.pulsar.broker.service.EntryFilterSupport;
 import org.apache.pulsar.broker.service.plugin.EntryFilter;
 import org.apache.pulsar.broker.service.plugin.EntryFilterTest;
 import org.apache.pulsar.broker.service.plugin.EntryFilterWithClassLoader;
-import org.apache.pulsar.broker.stats.prometheus.PrometheusMetricsGenerator;
 import org.apache.pulsar.client.admin.PulsarAdminException;
 import org.apache.pulsar.client.api.Consumer;
 import org.apache.pulsar.client.api.Message;
@@ -233,7 +233,7 @@ public class SubscriptionStatsTest extends ProducerConsumerBase {
         }
 
         ByteArrayOutputStream output = new ByteArrayOutputStream();
-        PrometheusMetricsGenerator.generate(pulsar, enableTopicStats, false, false, output);
+        PrometheusMetricsTestUtil.generate(pulsar, enableTopicStats, false, false, output);
         String metricsStr = output.toString();
         Multimap<String, Metric> metrics = parseMetrics(metricsStr);
 
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/TimeWindowTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/TimeWindowTest.java
deleted file mode 100644
index 89528c19653..00000000000
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/TimeWindowTest.java
+++ /dev/null
@@ -1,83 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pulsar.broker.stats;
-
-import static org.testng.Assert.assertEquals;
-import static org.testng.Assert.assertNotNull;
-import org.testng.annotations.Test;
-
-public class TimeWindowTest {
-
-    @Test
-    public void windowTest() throws Exception {
-        int intervalInMs = 1000;
-        int sampleCount = 2;
-        TimeWindow<Integer> timeWindow = new TimeWindow<>(sampleCount, intervalInMs);
-
-        WindowWrap<Integer> expect1 = timeWindow.current(oldValue -> 1);
-        WindowWrap<Integer> expect2 = timeWindow.current(oldValue -> null);
-        assertNotNull(expect1);
-        assertNotNull(expect2);
-
-        if (expect1.start() == expect2.start()) {
-            assertEquals((int) expect1.value(), 1);
-            assertEquals(expect1, expect2);
-            assertEquals(expect1.value(), expect2.value());
-        }
-
-        Thread.sleep(intervalInMs);
-
-        WindowWrap<Integer> expect3 = timeWindow.current(oldValue -> 2);
-        WindowWrap<Integer> expect4 = timeWindow.current(oldValue -> null);
-        assertNotNull(expect3);
-        assertNotNull(expect4);
-
-        if (expect3.start() == expect4.start()) {
-            assertEquals((int) expect3.value(), 2);
-            assertEquals(expect3, expect4);
-            assertEquals(expect3.value(), expect4.value());
-        }
-
-        Thread.sleep(intervalInMs);
-
-        WindowWrap<Integer> expect5 = timeWindow.current(oldValue -> 3);
-        WindowWrap<Integer> expect6 = timeWindow.current(oldValue -> null);
-        assertNotNull(expect5);
-        assertNotNull(expect6);
-
-        if (expect5.start() == expect6.start()) {
-            assertEquals((int) expect5.value(), 3);
-            assertEquals(expect5, expect6);
-            assertEquals(expect5.value(), expect6.value());
-        }
-
-        Thread.sleep(intervalInMs);
-
-        WindowWrap<Integer> expect7 = timeWindow.current(oldValue -> 4);
-        WindowWrap<Integer> expect8 = timeWindow.current(oldValue -> null);
-        assertNotNull(expect7);
-        assertNotNull(expect8);
-
-        if (expect7.start() == expect8.start()) {
-            assertEquals((int) expect7.value(), 4);
-            assertEquals(expect7, expect8);
-            assertEquals(expect7.value(), expect8.value());
-        }
-    }
-}
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/TransactionMetricsTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/TransactionMetricsTest.java
index 723a493eca1..8d5cb9dc391 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/TransactionMetricsTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/TransactionMetricsTest.java
@@ -38,9 +38,9 @@ import java.util.concurrent.TimeUnit;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
 import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.PrometheusMetricsTestUtil;
 import org.apache.pulsar.broker.ServiceConfiguration;
 import org.apache.pulsar.broker.service.BrokerTestBase;
-import org.apache.pulsar.broker.stats.prometheus.PrometheusMetricsGenerator;
 import org.apache.pulsar.client.api.Consumer;
 import org.apache.pulsar.client.api.MessageId;
 import org.apache.pulsar.client.api.Producer;
@@ -118,7 +118,7 @@ public class TransactionMetricsTest extends BrokerTestBase {
         pulsar.getTransactionMetadataStoreService().getStores()
                 .get(transactionCoordinatorIDTwo).newTransaction(timeout, 
null).get();
         ByteArrayOutputStream statsOut = new ByteArrayOutputStream();
-        PrometheusMetricsGenerator.generate(pulsar, true, false, false, 
statsOut);
+        PrometheusMetricsTestUtil.generate(pulsar, true, false, false, 
statsOut);
         String metricsStr = statsOut.toString();
         Multimap<String, Metric> metrics = parseMetrics(metricsStr);
         Collection<Metric> metric = metrics.get("pulsar_txn_active_count");
@@ -186,7 +186,7 @@ public class TransactionMetricsTest extends BrokerTestBase {
         pulsar.getBrokerService().updateRates();
 
         ByteArrayOutputStream statsOut = new ByteArrayOutputStream();
-        PrometheusMetricsGenerator.generate(pulsar, true, false, false, 
statsOut);
+        PrometheusMetricsTestUtil.generate(pulsar, true, false, false, 
statsOut);
         String metricsStr = statsOut.toString();
         Multimap<String, Metric> metrics = parseMetrics(metricsStr);
 
@@ -216,7 +216,7 @@ public class TransactionMetricsTest extends BrokerTestBase {
         });
 
         statsOut = new ByteArrayOutputStream();
-        PrometheusMetricsGenerator.generate(pulsar, true, false, false, 
statsOut);
+        PrometheusMetricsTestUtil.generate(pulsar, true, false, false, 
statsOut);
         metricsStr = statsOut.toString();
         metrics = parseMetrics(metricsStr);
 
@@ -272,7 +272,7 @@ public class TransactionMetricsTest extends BrokerTestBase {
         producer.send("hello pulsar".getBytes());
         consumer.acknowledgeAsync(consumer.receive().getMessageId(), 
transaction).get();
         ByteArrayOutputStream statsOut = new ByteArrayOutputStream();
-        PrometheusMetricsGenerator.generate(pulsar, true, false, false, 
statsOut);
+        PrometheusMetricsTestUtil.generate(pulsar, true, false, false, 
statsOut);
         String metricsStr = statsOut.toString();
 
         Multimap<String, Metric> metrics = parseMetrics(metricsStr);
@@ -290,7 +290,7 @@ public class TransactionMetricsTest extends BrokerTestBase {
         
checkManagedLedgerMetrics(MLTransactionLogImpl.TRANSACTION_SUBSCRIPTION_NAME, 
126, metric);
 
         statsOut = new ByteArrayOutputStream();
-        PrometheusMetricsGenerator.generate(pulsar, false, false, false, 
statsOut);
+        PrometheusMetricsTestUtil.generate(pulsar, false, false, false, 
statsOut);
         metricsStr = statsOut.toString();
         metrics = parseMetrics(metricsStr);
         metric = metrics.get("pulsar_storage_size");
@@ -334,7 +334,7 @@ public class TransactionMetricsTest extends BrokerTestBase {
         producer.send("hello pulsar".getBytes());
         consumer.acknowledgeAsync(consumer.receive().getMessageId(), 
transaction).get();
         ByteArrayOutputStream statsOut = new ByteArrayOutputStream();
-        PrometheusMetricsGenerator.generate(pulsar, true, false, false, 
statsOut);
+        PrometheusMetricsTestUtil.generate(pulsar, true, false, false, 
statsOut);
         String metricsStr = statsOut.toString();
 
         Multimap<String, Metric> metrics = parseMetrics(metricsStr);
@@ -359,7 +359,7 @@ public class TransactionMetricsTest extends BrokerTestBase {
         consumer.acknowledgeAsync(consumer.receive().getMessageId(), 
transaction).get();
 
         statsOut = new ByteArrayOutputStream();
-        PrometheusMetricsGenerator.generate(pulsar, true, false, false, 
statsOut);
+        PrometheusMetricsTestUtil.generate(pulsar, true, false, false, 
statsOut);
         metricsStr = statsOut.toString();
         metrics = parseMetrics(metricsStr);
         metric = metrics.get("pulsar_storage_size");
@@ -393,7 +393,7 @@ public class TransactionMetricsTest extends BrokerTestBase {
                     .send();
         }
         ByteArrayOutputStream statsOut = new ByteArrayOutputStream();
-        PrometheusMetricsGenerator.generate(pulsar, false, false, false, 
statsOut);
+        PrometheusMetricsTestUtil.generate(pulsar, false, false, false, 
statsOut);
         String metricsStr = statsOut.toString();
 
         Map<String, String> typeDefs = new HashMap<>();
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/TransactionBufferClientTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/TransactionBufferClientTest.java
index be036a0cf59..1c3de777e93 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/TransactionBufferClientTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/TransactionBufferClientTest.java
@@ -46,9 +46,9 @@ import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
 import lombok.Cleanup;
 import org.apache.commons.lang3.tuple.Pair;
+import org.apache.pulsar.PrometheusMetricsTestUtil;
 import org.apache.pulsar.broker.PulsarServerException;
 import org.apache.pulsar.broker.PulsarService;
-import org.apache.pulsar.broker.stats.prometheus.PrometheusMetricsGenerator;
 import org.apache.pulsar.broker.transaction.TransactionTestBase;
 import 
org.apache.pulsar.broker.transaction.buffer.impl.TransactionBufferClientImpl;
 import 
org.apache.pulsar.broker.transaction.buffer.impl.TransactionBufferHandlerImpl;
@@ -229,7 +229,7 @@ public class TransactionBufferClientTest extends 
TransactionTestBase {
 
         @Cleanup
         ByteArrayOutputStream statsOut = new ByteArrayOutputStream();
-        PrometheusMetricsGenerator.generate(pulsarServiceList.get(0), true, 
false, false, statsOut);
+        PrometheusMetricsTestUtil.generate(pulsarServiceList.get(0), true, 
false, false, statsOut);
         String metricsStr = statsOut.toString();
         Multimap<String, Metric> metricsMap = parseMetrics(metricsStr);
 
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/pendingack/PendingAckPersistentTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/pendingack/PendingAckPersistentTest.java
index 6c24b6b3f01..db9daf56104 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/pendingack/PendingAckPersistentTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/pendingack/PendingAckPersistentTest.java
@@ -46,12 +46,12 @@ import lombok.extern.slf4j.Slf4j;
 import org.apache.bookkeeper.mledger.ManagedCursor;
 import org.apache.bookkeeper.mledger.impl.PositionImpl;
 import org.apache.commons.collections4.map.LinkedMap;
+import org.apache.pulsar.PrometheusMetricsTestUtil;
 import org.apache.pulsar.broker.service.AbstractTopic;
 import org.apache.pulsar.broker.service.BrokerService;
 import org.apache.pulsar.broker.service.BrokerServiceException;
 import org.apache.pulsar.broker.service.persistent.PersistentSubscription;
 import org.apache.pulsar.broker.service.persistent.PersistentTopic;
-import org.apache.pulsar.broker.stats.prometheus.PrometheusMetricsGenerator;
 import org.apache.pulsar.broker.transaction.TransactionTestBase;
 import org.apache.pulsar.broker.transaction.pendingack.impl.MLPendingAckStore;
 import 
org.apache.pulsar.broker.transaction.pendingack.impl.PendingAckHandleImpl;
@@ -255,7 +255,7 @@ public class PendingAckPersistentTest extends 
TransactionTestBase {
 
         @Cleanup
         ByteArrayOutputStream statsOut = new ByteArrayOutputStream();
-        PrometheusMetricsGenerator.generate(pulsarServiceList.get(0), true, 
false, false, statsOut);
+        PrometheusMetricsTestUtil.generate(pulsarServiceList.get(0), true, 
false, false, statsOut);
         String metricsStr = statsOut.toString();
         Multimap<String, Metric> metricsMap = parseMetrics(metricsStr);
 
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/web/WebServiceTest.java 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/web/WebServiceTest.java
index d2b59ed0e49..17588a7ecac 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/web/WebServiceTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/web/WebServiceTest.java
@@ -55,9 +55,9 @@ import javax.net.ssl.SSLContext;
 import javax.net.ssl.TrustManager;
 import lombok.Cleanup;
 import org.apache.commons.lang3.StringUtils;
+import org.apache.pulsar.PrometheusMetricsTestUtil;
 import org.apache.pulsar.broker.PulsarService;
 import org.apache.pulsar.broker.ServiceConfiguration;
-import org.apache.pulsar.broker.stats.prometheus.PrometheusMetricsGenerator;
 import org.apache.pulsar.broker.testcontext.PulsarTestContext;
 import org.apache.pulsar.client.admin.PulsarAdmin;
 import org.apache.pulsar.client.admin.PulsarAdminBuilder;
@@ -107,7 +107,7 @@ public class WebServiceTest {
     public void testWebExecutorMetrics() throws Exception {
         setupEnv(true, false, false, false, -1, false);
         ByteArrayOutputStream statsOut = new ByteArrayOutputStream();
-        PrometheusMetricsGenerator.generate(pulsar, false, false, false, 
statsOut);
+        PrometheusMetricsTestUtil.generate(pulsar, false, false, false, 
statsOut);
         String metricsStr = statsOut.toString();
         Multimap<String, Metric> metrics = parseMetrics(metricsStr);
 
diff --git 
a/pulsar-common/src/main/java/org/apache/pulsar/common/util/SimpleTextOutputStream.java
 
b/pulsar-common/src/main/java/org/apache/pulsar/common/util/SimpleTextOutputStream.java
index c8c639606aa..9bf6302f50f 100644
--- 
a/pulsar-common/src/main/java/org/apache/pulsar/common/util/SimpleTextOutputStream.java
+++ 
b/pulsar-common/src/main/java/org/apache/pulsar/common/util/SimpleTextOutputStream.java
@@ -20,6 +20,7 @@ package org.apache.pulsar.common.util;
 
 import io.netty.buffer.ByteBuf;
 import io.netty.util.CharsetUtil;
+import java.nio.CharBuffer;
 
 /**
  * Format strings and numbers into a ByteBuf without any memory allocation.
@@ -28,6 +29,7 @@ public class SimpleTextOutputStream {
     private final ByteBuf buffer;
     private static final char[] hexChars = {'0', '1', '2', '3', '4', '5', '6', 
'7', '8', '9', 'a', 'b', 'c', 'd', 'e',
             'f'};
+    private final CharBuffer singleCharBuffer = CharBuffer.allocate(1);
 
     public SimpleTextOutputStream(ByteBuf buffer) {
         this.buffer = buffer;
@@ -44,11 +46,17 @@ public class SimpleTextOutputStream {
     }
 
     public SimpleTextOutputStream write(char c) {
-        write(String.valueOf(c));
+        //  In UTF-8, any character from U+0000 to U+007F is encoded in one byte
+        if (c <= '\u007F') {
+            buffer.writeByte((byte) c);
+            return this;
+        }
+        singleCharBuffer.put(0, c);
+        buffer.writeCharSequence(singleCharBuffer, CharsetUtil.UTF_8);
         return this;
     }
 
-    public SimpleTextOutputStream write(String s) {
+    public SimpleTextOutputStream write(CharSequence s) {
         if (s == null) {
             return this;
         }
@@ -136,4 +144,8 @@ public class SimpleTextOutputStream {
     public ByteBuf getBuffer() {
         return buffer;
     }
+
+    public void writeByte(int b) {
+        buffer.writeByte(b);
+    }
 }
diff --git 
a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConfiguration.java
 
b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConfiguration.java
index db2969e3c39..39c8fb5e086 100644
--- 
a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConfiguration.java
+++ 
b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConfiguration.java
@@ -392,6 +392,12 @@ public class ProxyConfiguration implements 
PulsarConfiguration {
     )
     private boolean authenticateMetricsEndpoint = true;
 
+    @FieldContext(
+            category = CATEGORY_HTTP,
+            doc = "Time in milliseconds that metrics endpoint would time out. Default is 30s.\n"
+                    + " Set it to 0 to disable timeout."
+    )
+    private long metricsServletTimeoutMs = 30000;
 
     @FieldContext(
         category = CATEGORY_SASL_AUTH,
diff --git 
a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyService.java 
b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyService.java
index 61b00871cec..ea9e4ebfaa9 100644
--- 
a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyService.java
+++ 
b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyService.java
@@ -299,7 +299,8 @@ public class ProxyService implements Closeable {
     }
 
     private synchronized void createMetricsServlet() {
-        this.metricsServlet = new PrometheusMetricsServlet(-1L, 
proxyConfig.getClusterName());
+        this.metricsServlet =
+                new 
PrometheusMetricsServlet(proxyConfig.getMetricsServletTimeoutMs(), 
proxyConfig.getClusterName());
         if (pendingMetricsProviders != null) {
             pendingMetricsProviders.forEach(provider -> 
metricsServlet.addRawMetricsProvider(provider));
             this.pendingMetricsProviders = null;
diff --git 
a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyServiceStarter.java
 
b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyServiceStarter.java
index 72d54601995..50a8e3ab7d7 100644
--- 
a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyServiceStarter.java
+++ 
b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyServiceStarter.java
@@ -24,6 +24,7 @@ import static org.apache.commons.lang3.StringUtils.isEmpty;
 import static org.apache.commons.lang3.StringUtils.isNotBlank;
 import static org.apache.pulsar.common.stats.JvmMetrics.getJvmDirectMemoryUsed;
 import com.google.common.annotations.VisibleForTesting;
+import io.prometheus.client.Collector;
 import io.prometheus.client.CollectorRegistry;
 import io.prometheus.client.Gauge;
 import io.prometheus.client.Gauge.Child;
@@ -236,21 +237,36 @@ public class ProxyServiceStarter {
         if (!metricsInitialized) {
             // Setup metrics
             DefaultExports.initialize();
+            CollectorRegistry registry = CollectorRegistry.defaultRegistry;
 
             // Report direct memory from Netty counters
-            Gauge.build("jvm_memory_direct_bytes_used", 
"-").create().setChild(new Child() {
-                @Override
-                public double get() {
-                    return getJvmDirectMemoryUsed();
-                }
-            }).register(CollectorRegistry.defaultRegistry);
+            Collector jvmMemoryDirectBytesUsed =
+                    Gauge.build("jvm_memory_direct_bytes_used", 
"-").create().setChild(new Child() {
+                        @Override
+                        public double get() {
+                            return getJvmDirectMemoryUsed();
+                        }
+                    });
+            try {
+                registry.register(jvmMemoryDirectBytesUsed);
+            } catch (IllegalArgumentException e) {
+                // workaround issue in tests where the metric is already registered
+                log.debug("Failed to register jvm_memory_direct_bytes_used metric: {}", e.getMessage());
+            }
 
-            Gauge.build("jvm_memory_direct_bytes_max", 
"-").create().setChild(new Child() {
-                @Override
-                public double get() {
-                    return DirectMemoryUtils.jvmMaxDirectMemory();
-                }
-            }).register(CollectorRegistry.defaultRegistry);
+            Collector jvmMemoryDirectBytesMax =
+                    Gauge.build("jvm_memory_direct_bytes_max", 
"-").create().setChild(new Child() {
+                        @Override
+                        public double get() {
+                            return DirectMemoryUtils.jvmMaxDirectMemory();
+                        }
+                    });
+            try {
+                registry.register(jvmMemoryDirectBytesMax);
+            } catch (IllegalArgumentException e) {
+                // workaround issue in tests where the metric is already registered
+                log.debug("Failed to register jvm_memory_direct_bytes_max metric: {}", e.getMessage());
+            }
 
             metricsInitialized = true;
         }

Reply via email to