This is an automated email from the ASF dual-hosted git repository.

mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 8957e353ded [feat][misc] PIP-320: Add OpenTelemetry scaffolding (#22010)
8957e353ded is described below

commit 8957e353ded9ee24eccea349c7747da721d9e66a
Author: Dragos Misca <dragosvic...@users.noreply.github.com>
AuthorDate: Fri Feb 9 15:40:20 2024 -0800

    [feat][misc] PIP-320: Add OpenTelemetry scaffolding (#22010)
    
    Co-authored-by: Matteo Merli <mme...@apache.org>
---
 .github/workflows/pulsar-ci.yaml                   |   3 +
 build/run_integration_group.sh                     |   4 +
 distribution/server/src/assemble/LICENSE.bin.txt   |  28 +++
 pom.xml                                            |  31 ++++
 .../ProxySaslAuthenticationTest.java               |   1 +
 pulsar-broker-common/pom.xml                       |  18 ++
 .../stats/prometheus/PrometheusMetricsClient.java  |   0
 pulsar-broker/pom.xml                              |  14 ++
 .../org/apache/pulsar/broker/PulsarService.java    |   6 +
 .../broker/stats/PulsarBrokerOpenTelemetry.java    |  49 +++++
 pulsar-functions/worker/pom.xml                    |   6 +
 .../worker/PulsarWorkerOpenTelemetry.java          |  48 +++++
 .../functions/worker/PulsarWorkerService.java      |   6 +
 .../worker/FunctionAssignmentTailerTest.java       |   5 +
 .../pom.xml                                        |  88 ++++-----
 .../opentelemetry/OpenTelemetryAttributes.java     |  32 ++++
 .../pulsar/opentelemetry/OpenTelemetryService.java | 108 +++++++++++
 .../apache/pulsar/opentelemetry/package-info.java  |  24 +++
 .../opentelemetry/OpenTelemetryServiceTest.java    | 201 +++++++++++++++++++++
 pulsar-proxy/pom.xml                               |   6 +
 .../apache/pulsar/proxy/server/ProxyService.java   |   7 +
 .../proxy/stats/PulsarProxyOpenTelemetry.java      |  49 +++++
 .../extensions/SimpleProxyExtensionTestBase.java   |   1 +
 .../server/AdminProxyHandlerKeystoreTLSTest.java   |   1 +
 .../proxy/server/AuthedAdminProxyHandlerTest.java  |   1 +
 .../proxy/server/ProxyAdditionalServletTest.java   |   1 +
 .../ProxyAuthenticatedProducerConsumerTest.java    |   1 +
 .../proxy/server/ProxyAuthenticationTest.java      |   4 +-
 .../server/ProxyConnectionThrottlingTest.java      |   1 +
 .../server/ProxyEnableHAProxyProtocolTest.java     |   1 +
 .../proxy/server/ProxyForwardAuthDataTest.java     |   4 +-
 .../server/ProxyKeyStoreTlsTransportTest.java      |   1 +
 .../proxy/server/ProxyKeyStoreTlsWithAuthTest.java |   1 +
 .../server/ProxyKeyStoreTlsWithoutAuthTest.java    |   1 +
 .../proxy/server/ProxyLookupThrottlingTest.java    |   1 +
 .../pulsar/proxy/server/ProxyMutualTlsTest.java    |   1 +
 .../pulsar/proxy/server/ProxyParserTest.java       |   1 +
 .../pulsar/proxy/server/ProxyRefreshAuthTest.java  |   4 +-
 .../proxy/server/ProxyRolesEnforcementTest.java    |   4 +-
 .../ProxyServiceStarterDisableZeroCopyTest.java    |   3 +-
 .../proxy/server/ProxyServiceStarterTest.java      |   1 +
 .../proxy/server/ProxyServiceTlsStarterTest.java   |   1 +
 .../apache/pulsar/proxy/server/ProxyStatsTest.java |   1 +
 .../proxy/server/ProxyStuckConnectionTest.java     |   1 +
 .../org/apache/pulsar/proxy/server/ProxyTest.java  |   1 +
 .../apache/pulsar/proxy/server/ProxyTlsTest.java   |   1 +
 .../pulsar/proxy/server/ProxyTlsWithAuthTest.java  |   1 +
 .../server/ProxyWithAuthorizationNegTest.java      |   4 +-
 .../proxy/server/ProxyWithAuthorizationTest.java   |   5 +-
 .../server/ProxyWithExtensibleLoadManagerTest.java |   1 +
 .../server/ProxyWithJwtAuthorizationTest.java      |   4 +-
 .../server/ProxyWithoutServiceDiscoveryTest.java   |   5 +-
 .../SuperUserAuthedAdminProxyHandlerTest.java      |   1 +
 .../server/UnauthedAdminProxyHandlerTest.java      |   1 +
 tests/integration/pom.xml                          |  20 +-
 .../OpenTelemetryCollectorContainer.java           |  63 +++++++
 .../integration/containers/PulsarContainer.java    |   2 +
 .../metrics/OpenTelemetrySanityTest.java           | 165 +++++++++++++++++
 .../integration/topologies/PulsarCluster.java      |  43 +++--
 .../integration/topologies/PulsarClusterSpec.java  |  17 ++
 .../containers/otel-collector-config.yaml          |  43 +++++
 .../src/test/resources/pulsar-metrics.xml          |  28 +++
 tests/integration/src/test/resources/pulsar.xml    |   1 +
 63 files changed, 1100 insertions(+), 76 deletions(-)

diff --git a/.github/workflows/pulsar-ci.yaml b/.github/workflows/pulsar-ci.yaml
index 7767beaa9aa..effeab90beb 100644
--- a/.github/workflows/pulsar-ci.yaml
+++ b/.github/workflows/pulsar-ci.yaml
@@ -589,6 +589,9 @@ jobs:
           - name: Transaction
             group: TRANSACTION
 
+          - name: Metrics
+            group: METRICS
+
     steps:
       - name: checkout
         uses: actions/checkout@v4
diff --git a/build/run_integration_group.sh b/build/run_integration_group.sh
index f20a7ad0793..2d82fce0887 100755
--- a/build/run_integration_group.sh
+++ b/build/run_integration_group.sh
@@ -181,6 +181,10 @@ test_group_transaction() {
   mvn_run_integration_test "$@" 
-DintegrationTestSuiteFile=pulsar-transaction.xml -DintegrationTests
 }
 
+test_group_metrics() {
+   mvn_run_integration_test "$@" -DintegrationTestSuiteFile=pulsar-metrics.xml 
-DintegrationTests
+}
+
 test_group_tiered_filesystem() {
   mvn_run_integration_test "$@" 
-DintegrationTestSuiteFile=tiered-filesystem-storage.xml -DintegrationTests
 }
diff --git a/distribution/server/src/assemble/LICENSE.bin.txt 
b/distribution/server/src/assemble/LICENSE.bin.txt
index e3941c54a74..7c95811faf7 100644
--- a/distribution/server/src/assemble/LICENSE.bin.txt
+++ b/distribution/server/src/assemble/LICENSE.bin.txt
@@ -333,6 +333,13 @@ The Apache Software License, Version 2.0
     - io.prometheus-simpleclient_tracer_common-0.16.0.jar
     - io.prometheus-simpleclient_tracer_otel-0.16.0.jar
     - io.prometheus-simpleclient_tracer_otel_agent-0.16.0.jar
+ * Prometheus exporter
+    - io.prometheus-prometheus-metrics-config-1.1.0.jar
+    - io.prometheus-prometheus-metrics-exporter-common-1.1.0.jar
+    - io.prometheus-prometheus-metrics-exporter-httpserver-1.1.0.jar
+    - io.prometheus-prometheus-metrics-exposition-formats-1.1.0.jar
+    - io.prometheus-prometheus-metrics-model-1.1.0.jar
+    - io.prometheus-prometheus-metrics-shaded-protobuf-1.1.0.jar
  * Jakarta Bean Validation API
     - jakarta.validation-jakarta.validation-api-2.0.2.jar
     - javax.validation-validation-api-1.1.0.Final.jar
@@ -503,6 +510,27 @@ The Apache Software License, Version 2.0
   * RoaringBitmap
     - org.roaringbitmap-RoaringBitmap-0.9.44.jar
     - org.roaringbitmap-shims-0.9.44.jar
+  * OpenTelemetry
+    - io.opentelemetry-opentelemetry-api-1.34.1.jar
+    - io.opentelemetry-opentelemetry-api-events-1.34.1-alpha.jar
+    - io.opentelemetry-opentelemetry-context-1.34.1.jar
+    - io.opentelemetry-opentelemetry-exporter-common-1.34.1.jar
+    - io.opentelemetry-opentelemetry-exporter-otlp-1.34.1.jar
+    - io.opentelemetry-opentelemetry-exporter-otlp-common-1.34.1.jar
+    - io.opentelemetry-opentelemetry-exporter-prometheus-1.34.1-alpha.jar
+    - io.opentelemetry-opentelemetry-exporter-sender-okhttp-1.34.1.jar
+    - io.opentelemetry-opentelemetry-extension-incubator-1.34.1-alpha.jar
+    - io.opentelemetry-opentelemetry-sdk-1.34.1.jar
+    - io.opentelemetry-opentelemetry-sdk-common-1.34.1.jar
+    - io.opentelemetry-opentelemetry-sdk-extension-autoconfigure-1.34.1.jar
+    - io.opentelemetry-opentelemetry-sdk-extension-autoconfigure-spi-1.34.1.jar
+    - io.opentelemetry-opentelemetry-sdk-logs-1.34.1.jar
+    - io.opentelemetry-opentelemetry-sdk-metrics-1.34.1.jar
+    - io.opentelemetry-opentelemetry-sdk-trace-1.34.1.jar
+    - io.opentelemetry.instrumentation-opentelemetry-instrumentation-api-1.32.1.jar
+    - io.opentelemetry.instrumentation-opentelemetry-instrumentation-api-semconv-1.32.1-alpha.jar
+    - io.opentelemetry.instrumentation-opentelemetry-resources-1.32.1-alpha.jar
+    - io.opentelemetry.semconv-opentelemetry-semconv-1.23.1-alpha.jar
 
 BSD 3-clause "New" or "Revised" License
  * Google auth library
diff --git a/pom.xml b/pom.xml
index 4dfeb30821a..52a638ac09f 100644
--- a/pom.xml
+++ b/pom.xml
@@ -249,6 +249,10 @@ flexible messaging model and an intuitive client 
API.</description>
     <disruptor.version>3.4.3</disruptor.version>
     <zstd-jni.version>1.5.2-3</zstd-jni.version>
     <netty-reactive-streams.version>2.0.6</netty-reactive-streams.version>
+    <opentelemetry.version>1.34.1</opentelemetry.version>
+    <opentelemetry.alpha.version>1.34.1-alpha</opentelemetry.alpha.version>
+    <opentelemetry.instrumentation.version>1.32.1-alpha</opentelemetry.instrumentation.version>
+    <opentelemetry.semconv.version>1.23.1-alpha</opentelemetry.semconv.version>
 
     <!-- test dependencies -->
     <testcontainers.version>1.18.3</testcontainers.version>
@@ -1446,6 +1450,31 @@ flexible messaging model and an intuitive client 
API.</description>
         <version>${restassured.version}</version>
         <scope>test</scope>
       </dependency>
+
+      <dependency>
+        <groupId>io.opentelemetry</groupId>
+        <artifactId>opentelemetry-bom</artifactId>
+        <version>${opentelemetry.version}</version>
+        <type>pom</type>
+        <scope>import</scope>
+      </dependency>
+      <dependency>
+        <groupId>io.opentelemetry</groupId>
+        <artifactId>opentelemetry-bom-alpha</artifactId>
+        <version>${opentelemetry.alpha.version}</version>
+        <type>pom</type>
+        <scope>import</scope>
+      </dependency>
+      <dependency>
+        <groupId>io.opentelemetry.instrumentation</groupId>
+        <artifactId>opentelemetry-resources</artifactId>
+        <version>${opentelemetry.instrumentation.version}</version>
+      </dependency>
+      <dependency>
+        <groupId>io.opentelemetry.semconv</groupId>
+        <artifactId>opentelemetry-semconv</artifactId>
+        <version>${opentelemetry.semconv.version}</version>
+      </dependency>
     </dependencies>
   </dependencyManagement>
 
@@ -2266,6 +2295,7 @@ flexible messaging model and an intuitive client 
API.</description>
         <module>pulsar-broker-auth-sasl</module>
         <module>pulsar-client-auth-sasl</module>
         <module>pulsar-config-validation</module>
+        <module>pulsar-opentelemetry</module>
 
         <module>structured-event-log</module>
 
@@ -2330,6 +2360,7 @@ flexible messaging model and an intuitive client 
API.</description>
         <module>pulsar-broker-auth-sasl</module>
         <module>pulsar-client-auth-sasl</module>
         <module>pulsar-config-validation</module>
+        <module>pulsar-opentelemetry</module>
 
         <!-- transaction related modules -->
         <module>pulsar-transaction</module>
diff --git 
a/pulsar-broker-auth-sasl/src/test/java/org/apache/pulsar/broker/authentication/ProxySaslAuthenticationTest.java
 
b/pulsar-broker-auth-sasl/src/test/java/org/apache/pulsar/broker/authentication/ProxySaslAuthenticationTest.java
index f6ad76a083b..a27384c9890 100644
--- 
a/pulsar-broker-auth-sasl/src/test/java/org/apache/pulsar/broker/authentication/ProxySaslAuthenticationTest.java
+++ 
b/pulsar-broker-auth-sasl/src/test/java/org/apache/pulsar/broker/authentication/ProxySaslAuthenticationTest.java
@@ -242,6 +242,7 @@ public class ProxySaslAuthenticationTest extends 
ProducerConsumerBase {
                proxyConfig.setBrokerServiceURL(pulsar.getBrokerServiceUrl());
                proxyConfig.setSaslJaasClientAllowedIds(".*" + localHostname + 
".*");
                proxyConfig.setSaslJaasServerSectionName("PulsarProxy");
+               proxyConfig.setClusterName(configClusterName);
 
                // proxy connect to broker
                
proxyConfig.setBrokerClientAuthenticationPlugin(AuthenticationSasl.class.getName());
diff --git a/pulsar-broker-common/pom.xml b/pulsar-broker-common/pom.xml
index d73dba288a3..8e942c78d5b 100644
--- a/pulsar-broker-common/pom.xml
+++ b/pulsar-broker-common/pom.xml
@@ -82,10 +82,28 @@
       <artifactId>awaitility</artifactId>
       <scope>test</scope>
     </dependency>
+
+    <dependency>
+      <groupId>io.rest-assured</groupId>
+      <artifactId>rest-assured</artifactId>
+      <scope>test</scope>
+    </dependency>
   </dependencies>
 
   <build>
     <plugins>
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-jar-plugin</artifactId>
+        <executions>
+          <execution>
+            <goals>
+              <goal>test-jar</goal>
+            </goals>
+          </execution>
+        </executions>
+      </plugin>
+
       <plugin>
         <groupId>org.gaul</groupId>
         <artifactId>modernizer-maven-plugin</artifactId>
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/prometheus/PrometheusMetricsClient.java
 
b/pulsar-broker-common/src/test/java/org/apache/pulsar/broker/stats/prometheus/PrometheusMetricsClient.java
similarity index 100%
rename from 
pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/prometheus/PrometheusMetricsClient.java
rename to 
pulsar-broker-common/src/test/java/org/apache/pulsar/broker/stats/prometheus/PrometheusMetricsClient.java
diff --git a/pulsar-broker/pom.xml b/pulsar-broker/pom.xml
index c39de184b05..18da38b43dc 100644
--- a/pulsar-broker/pom.xml
+++ b/pulsar-broker/pom.xml
@@ -143,6 +143,12 @@
       <version>${project.version}</version>
     </dependency>
 
+    <dependency>
+      <groupId>${project.groupId}</groupId>
+      <artifactId>pulsar-opentelemetry</artifactId>
+      <version>${project.version}</version>
+    </dependency>
+
     <dependency>
       <groupId>${project.groupId}</groupId>
       <artifactId>pulsar-io-batch-discovery-triggerers</artifactId>
@@ -209,6 +215,14 @@
       <scope>test</scope>
     </dependency>
 
+    <dependency>
+      <groupId>${project.groupId}</groupId>
+      <artifactId>pulsar-broker-common</artifactId>
+      <version>${project.version}</version>
+      <type>test-jar</type>
+      <scope>test</scope>
+    </dependency>
+
     <!-- functions related dependencies (end) -->
 
     <dependency>
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
index 054411c49f6..3701f354b62 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
@@ -108,6 +108,7 @@ import 
org.apache.pulsar.broker.service.TransactionBufferSnapshotServiceFactory;
 import org.apache.pulsar.broker.service.schema.SchemaRegistryService;
 import org.apache.pulsar.broker.service.schema.SchemaStorageFactory;
 import org.apache.pulsar.broker.stats.MetricsGenerator;
+import org.apache.pulsar.broker.stats.PulsarBrokerOpenTelemetry;
 import org.apache.pulsar.broker.stats.prometheus.PrometheusRawMetricsProvider;
 import 
org.apache.pulsar.broker.stats.prometheus.PulsarPrometheusMetricsServlet;
 import org.apache.pulsar.broker.storage.ManagedLedgerStorage;
@@ -248,6 +249,7 @@ public class PulsarService implements AutoCloseable, 
ShutdownService {
     private final Timer brokerClientSharedTimer;
 
     private MetricsGenerator metricsGenerator;
+    private PulsarBrokerOpenTelemetry openTelemetry;
 
     private TransactionMetadataStoreService transactionMetadataStoreService;
     private TransactionBufferProvider transactionBufferProvider;
@@ -461,6 +463,9 @@ public class PulsarService implements AutoCloseable, 
ShutdownService {
             }
 
             resetMetricsServlet();
+            if (openTelemetry != null) {
+                openTelemetry.close();
+            }
 
             if (this.compactionServiceFactory != null) {
                 try {
@@ -897,6 +902,7 @@ public class PulsarService implements AutoCloseable, 
ShutdownService {
             }
 
             this.metricsGenerator = new MetricsGenerator(this);
+            this.openTelemetry = new PulsarBrokerOpenTelemetry(config);
 
             // Initialize the message protocol handlers.
             // start the protocol handlers only after the broker is ready,
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/PulsarBrokerOpenTelemetry.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/PulsarBrokerOpenTelemetry.java
new file mode 100644
index 00000000000..4b76b993001
--- /dev/null
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/PulsarBrokerOpenTelemetry.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.stats;
+
+import io.opentelemetry.api.metrics.Meter;
+import java.io.Closeable;
+import lombok.Getter;
+import org.apache.pulsar.PulsarVersion;
+import org.apache.pulsar.broker.ServiceConfiguration;
+import org.apache.pulsar.opentelemetry.OpenTelemetryService;
+
+public class PulsarBrokerOpenTelemetry implements Closeable {
+
+    public static final String SERVICE_NAME = "pulsar-broker";
+    private final OpenTelemetryService openTelemetryService;
+
+    @Getter
+    private final Meter meter;
+
+    public PulsarBrokerOpenTelemetry(ServiceConfiguration config) {
+        openTelemetryService = OpenTelemetryService.builder()
+                .clusterName(config.getClusterName())
+                .serviceName(SERVICE_NAME)
+                .serviceVersion(PulsarVersion.getVersion())
+                .build();
+        meter = 
openTelemetryService.getOpenTelemetry().getMeter("org.apache.pulsar.broker");
+    }
+
+    @Override
+    public void close() {
+        openTelemetryService.close();
+    }
+}
diff --git a/pulsar-functions/worker/pom.xml b/pulsar-functions/worker/pom.xml
index cd89bacbf9e..bb93eeb98d7 100644
--- a/pulsar-functions/worker/pom.xml
+++ b/pulsar-functions/worker/pom.xml
@@ -46,6 +46,12 @@
       <version>${project.version}</version>
     </dependency>
 
+    <dependency>
+      <groupId>${project.groupId}</groupId>
+      <artifactId>pulsar-opentelemetry</artifactId>
+      <version>${project.version}</version>
+    </dependency>
+
     <dependency>
       <groupId>${project.groupId}</groupId>
       <artifactId>pulsar-client-original</artifactId>
diff --git 
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/PulsarWorkerOpenTelemetry.java
 
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/PulsarWorkerOpenTelemetry.java
new file mode 100644
index 00000000000..be7c15dfd85
--- /dev/null
+++ 
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/PulsarWorkerOpenTelemetry.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.functions.worker;
+
+import io.opentelemetry.api.metrics.Meter;
+import java.io.Closeable;
+import lombok.Getter;
+import org.apache.pulsar.PulsarVersion;
+import org.apache.pulsar.opentelemetry.OpenTelemetryService;
+
+public class PulsarWorkerOpenTelemetry implements Closeable {
+
+    public static final String SERVICE_NAME = "pulsar-function-worker";
+    private final OpenTelemetryService openTelemetryService;
+
+    @Getter
+    private final Meter meter;
+
+    public PulsarWorkerOpenTelemetry(WorkerConfig workerConfig) {
+        openTelemetryService = OpenTelemetryService.builder()
+                .clusterName(workerConfig.getPulsarFunctionsCluster())
+                .serviceName(SERVICE_NAME)
+                .serviceVersion(PulsarVersion.getVersion())
+                .build();
+        meter = 
openTelemetryService.getOpenTelemetry().getMeter("org.apache.pulsar.function_worker");
+    }
+
+    @Override
+    public void close() {
+        openTelemetryService.close();
+    }
+}
diff --git 
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/PulsarWorkerService.java
 
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/PulsarWorkerService.java
index 16cf778e072..9f7d1996e0b 100644
--- 
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/PulsarWorkerService.java
+++ 
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/PulsarWorkerService.java
@@ -108,6 +108,7 @@ public class PulsarWorkerService implements WorkerService {
     private PulsarAdmin brokerAdmin;
     private PulsarAdmin functionAdmin;
     private MetricsGenerator metricsGenerator;
+    private PulsarWorkerOpenTelemetry openTelemetry;
     @VisibleForTesting
     private URI dlogUri;
     private LeaderService leaderService;
@@ -188,6 +189,7 @@ public class PulsarWorkerService implements WorkerService {
         this.statsUpdater = Executors
             .newSingleThreadScheduledExecutor(new 
DefaultThreadFactory("worker-stats-updater"));
         this.metricsGenerator = new MetricsGenerator(this.statsUpdater, 
workerConfig);
+        this.openTelemetry = new PulsarWorkerOpenTelemetry(workerConfig);
         this.workerConfig = workerConfig;
         this.dlogUri = dlogUri;
         this.workerStatsManager = new WorkerStatsManager(workerConfig, 
runAsStandalone);
@@ -659,6 +661,10 @@ public class PulsarWorkerService implements WorkerService {
         if (null != stateStoreProvider) {
             stateStoreProvider.close();
         }
+
+        if (null != openTelemetry) {
+            openTelemetry.close();
+        }
     }
 
 }
diff --git 
a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionAssignmentTailerTest.java
 
b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionAssignmentTailerTest.java
index 022ebd6ba48..c78c68f8923 100644
--- 
a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionAssignmentTailerTest.java
+++ 
b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionAssignmentTailerTest.java
@@ -60,6 +60,8 @@ import org.testng.annotations.Test;
 @Slf4j
 public class FunctionAssignmentTailerTest {
 
+    private static final String CLUSTER_NAME = "test-cluster";
+
     @Test(timeOut = 10000)
     public void testErrorNotifier() throws Exception {
         WorkerConfig workerConfig = new WorkerConfig();
@@ -71,6 +73,7 @@ public class FunctionAssignmentTailerTest {
         workerConfig.setPulsarServiceUrl("pulsar://localhost:6650");
         workerConfig.setStateStorageServiceUrl("foo");
         workerConfig.setFunctionAssignmentTopicName("assignments");
+        workerConfig.setPulsarFunctionsCluster(CLUSTER_NAME);
 
         Function.FunctionMetaData function1 = 
Function.FunctionMetaData.newBuilder().setFunctionDetails(
                 Function.FunctionDetails.newBuilder()
@@ -183,6 +186,7 @@ public class FunctionAssignmentTailerTest {
         workerConfig.setPulsarServiceUrl("pulsar://localhost:6650");
         workerConfig.setStateStorageServiceUrl("foo");
         workerConfig.setFunctionAssignmentTopicName("assignments");
+        workerConfig.setPulsarFunctionsCluster(CLUSTER_NAME);
 
         Function.FunctionMetaData function1 = 
Function.FunctionMetaData.newBuilder().setFunctionDetails(
                 Function.FunctionDetails.newBuilder()
@@ -307,6 +311,7 @@ public class FunctionAssignmentTailerTest {
         workerConfig.setPulsarServiceUrl("pulsar://localhost:6650");
         workerConfig.setStateStorageServiceUrl("foo");
         workerConfig.setFunctionAssignmentTopicName("assignments");
+        workerConfig.setPulsarFunctionsCluster(CLUSTER_NAME);
 
         Function.FunctionMetaData function1 = 
Function.FunctionMetaData.newBuilder().setFunctionDetails(
                 Function.FunctionDetails.newBuilder()
diff --git a/pulsar-broker-common/pom.xml b/pulsar-opentelemetry/pom.xml
similarity index 59%
copy from pulsar-broker-common/pom.xml
copy to pulsar-opentelemetry/pom.xml
index d73dba288a3..82a9658cc9d 100644
--- a/pulsar-broker-common/pom.xml
+++ b/pulsar-opentelemetry/pom.xml
@@ -20,8 +20,8 @@
 
 -->
 <project
-  xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"
-  xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance">
+        xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"
+        xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance">
   <modelVersion>4.0.0</modelVersion>
   <parent>
     <groupId>org.apache.pulsar</groupId>
@@ -29,51 +29,58 @@
     <version>3.3.0-SNAPSHOT</version>
   </parent>
 
-  <artifactId>pulsar-broker-common</artifactId>
-  <description>Common classes used in multiple broker modules</description>
+  <artifactId>pulsar-opentelemetry</artifactId>
+  <description>OpenTelemetry Integration</description>
 
   <dependencies>
+    <!-- OpenTelemetry dependencies -->
     <dependency>
-      <groupId>${project.groupId}</groupId>
-      <artifactId>pulsar-metadata</artifactId>
-      <version>${project.version}</version>
+      <groupId>io.opentelemetry</groupId>
+      <artifactId>opentelemetry-exporter-otlp</artifactId>
     </dependency>
-
     <dependency>
-      <groupId>com.google.guava</groupId>
-      <artifactId>guava</artifactId>
+      <groupId>io.opentelemetry</groupId>
+      <artifactId>opentelemetry-exporter-prometheus</artifactId>
     </dependency>
-
     <dependency>
-      <groupId>io.prometheus</groupId>
-      <artifactId>simpleclient_jetty</artifactId>
+      <groupId>io.opentelemetry</groupId>
+      <artifactId>opentelemetry-sdk</artifactId>
     </dependency>
-
     <dependency>
-      <groupId>javax.servlet</groupId>
-      <artifactId>javax.servlet-api</artifactId>
+      <groupId>io.opentelemetry</groupId>
+      <artifactId>opentelemetry-sdk-extension-autoconfigure</artifactId>
     </dependency>
-
     <dependency>
-      <groupId>javax.ws.rs</groupId>
-      <artifactId>javax.ws.rs-api</artifactId>
+      <groupId>io.opentelemetry.instrumentation</groupId>
+      <artifactId>opentelemetry-resources</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>io.opentelemetry.semconv</groupId>
+      <artifactId>opentelemetry-semconv</artifactId>
     </dependency>
 
     <dependency>
-      <groupId>io.jsonwebtoken</groupId>
-      <artifactId>jjwt-impl</artifactId>
+      <groupId>com.google.guava</groupId>
+      <artifactId>guava</artifactId>
     </dependency>
 
     <dependency>
-      <groupId>io.jsonwebtoken</groupId>
-      <artifactId>jjwt-jackson</artifactId>
+      <groupId>org.apache.commons</groupId>
+      <artifactId>commons-lang3</artifactId>
     </dependency>
 
     <!-- test -->
     <dependency>
-      <groupId>org.bouncycastle</groupId>
-      <artifactId>bc-fips</artifactId>
-      <version>${bouncycastle.bc-fips.version}</version>
+      <groupId>org.apache.pulsar</groupId>
+      <artifactId>pulsar-broker-common</artifactId>
+      <version>${project.version}</version>
+      <type>test-jar</type>
+      <scope>test</scope>
+    </dependency>
+
+    <dependency>
+      <groupId>io.rest-assured</groupId>
+      <artifactId>rest-assured</artifactId>
       <scope>test</scope>
     </dependency>
 
@@ -82,6 +89,12 @@
       <artifactId>awaitility</artifactId>
       <scope>test</scope>
     </dependency>
+
+    <dependency>
+      <groupId>io.opentelemetry</groupId>
+      <artifactId>opentelemetry-sdk-testing</artifactId>
+      <scope>test</scope>
+    </dependency>
   </dependencies>
 
   <build>
@@ -117,29 +130,6 @@
           </execution>
         </executions>
       </plugin>
-
-      <plugin>
-        <artifactId>maven-resources-plugin</artifactId>
-        <executions>
-          <execution>
-            <id>copy-resources</id>
-            <phase>test-compile</phase>
-            <goals>
-              <goal>copy-resources</goal>
-            </goals>
-            <configuration>
-              
<outputDirectory>${project.build.testOutputDirectory}/certificate-authority</outputDirectory>
-              <overwrite>true</overwrite>
-              <resources>
-                <resource>
-                  
<directory>${project.parent.basedir}/tests/certificate-authority</directory>
-                  <filtering>false</filtering>
-                </resource>
-              </resources>
-            </configuration>
-          </execution>
-        </executions>
-      </plugin>
     </plugins>
   </build>
 </project>
diff --git 
a/pulsar-opentelemetry/src/main/java/org/apache/pulsar/opentelemetry/OpenTelemetryAttributes.java
 
b/pulsar-opentelemetry/src/main/java/org/apache/pulsar/opentelemetry/OpenTelemetryAttributes.java
new file mode 100644
index 00000000000..bdb002cb359
--- /dev/null
+++ 
b/pulsar-opentelemetry/src/main/java/org/apache/pulsar/opentelemetry/OpenTelemetryAttributes.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.opentelemetry;
+
+import io.opentelemetry.api.common.AttributeKey;
+
+/**
+ * Common OpenTelemetry attributes to be used by Pulsar components.
+ */
+public interface OpenTelemetryAttributes {
+    /**
+     * The name of the Pulsar cluster. This attribute is automatically added 
to all signals by
+     * {@link OpenTelemetryService}.
+     */
+    AttributeKey<String> PULSAR_CLUSTER = 
AttributeKey.stringKey("pulsar.cluster");
+}
diff --git 
a/pulsar-opentelemetry/src/main/java/org/apache/pulsar/opentelemetry/OpenTelemetryService.java
 
b/pulsar-opentelemetry/src/main/java/org/apache/pulsar/opentelemetry/OpenTelemetryService.java
new file mode 100644
index 00000000000..5ead1ff265c
--- /dev/null
+++ 
b/pulsar-opentelemetry/src/main/java/org/apache/pulsar/opentelemetry/OpenTelemetryService.java
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.opentelemetry;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import com.google.common.annotations.VisibleForTesting;
+import io.opentelemetry.api.OpenTelemetry;
+import io.opentelemetry.sdk.OpenTelemetrySdk;
+import io.opentelemetry.sdk.autoconfigure.AutoConfiguredOpenTelemetrySdk;
+import 
io.opentelemetry.sdk.autoconfigure.AutoConfiguredOpenTelemetrySdkBuilder;
+import io.opentelemetry.sdk.resources.Resource;
+import io.opentelemetry.semconv.ResourceAttributes;
+import java.io.Closeable;
+import java.util.Map;
+import java.util.Objects;
+import java.util.function.Consumer;
+import lombok.Builder;
+import org.apache.commons.lang3.StringUtils;
+
+/**
+ * Shared entry point through which Pulsar components obtain OpenTelemetry. An instance builds the OpenTelemetry
+ * SDK through auto-configuration, layering Pulsar-specific default properties and resource attributes on top,
+ * and hands out the resulting {@link OpenTelemetry} handle until it is closed.
+ */
+public class OpenTelemetryService implements Closeable {
+
+    static final String OTEL_SDK_DISABLED_KEY = "otel.sdk.disabled";
+    static final int MAX_CARDINALITY_LIMIT = 10000;
+
+    private final OpenTelemetrySdk openTelemetrySdk;
+
+    /**
+     * Builds the OpenTelemetry SDK. The properties registered here are defaults (the SDK is disabled unless
+     * configured otherwise), and resource attributes are only filled in when the auto-configured resource does
+     * not already carry them, e.g. because they were supplied via system properties or environment variables.
+     *
+     * @param clusterName
+     *      The name of the Pulsar cluster. Cannot be null or blank.
+     * @param serviceName
+     *      The name of the service. Optional; ignored when blank.
+     * @param serviceVersion
+     *      The version of the service. Optional; ignored when blank.
+     * @param sdkBuilderConsumer
+     *      Hook invoked with the SDK builder right before the SDK is built; for testing purposes only.
+     * @throws IllegalArgumentException
+     *      If {@code clusterName} is null or blank.
+     */
+    @Builder
+    public OpenTelemetryService(String clusterName,
+                                String serviceName,
+                                String serviceVersion,
+                                @VisibleForTesting Consumer<AutoConfiguredOpenTelemetrySdkBuilder> sdkBuilderConsumer) {
+        checkArgument(StringUtils.isNotBlank(clusterName), "Cluster name cannot be empty");
+
+        final AutoConfiguredOpenTelemetrySdkBuilder builder = AutoConfiguredOpenTelemetrySdk.builder();
+
+        // Cardinality limit includes the overflow attribute set, so we need to add 1.
+        final Map<String, String> defaultProperties = Map.of(
+                OTEL_SDK_DISABLED_KEY, "true",
+                "otel.experimental.metrics.cardinality.limit", Integer.toString(MAX_CARDINALITY_LIMIT + 1));
+        builder.addPropertiesSupplier(() -> defaultProperties);
+
+        builder.addResourceCustomizer((detected, ignoredConfig) ->
+                withPulsarAttributes(detected, clusterName, serviceName, serviceVersion));
+
+        // Test hook runs last so that it can override anything registered above.
+        if (sdkBuilderConsumer != null) {
+            sdkBuilderConsumer.accept(builder);
+        }
+
+        openTelemetrySdk = builder.build().getOpenTelemetrySdk();
+    }
+
+    /**
+     * Returns {@code detected} merged with the Pulsar-provided attributes that it is still missing.
+     * Attributes already present on {@code detected} are never overridden.
+     */
+    private static Resource withPulsarAttributes(Resource detected,
+                                                 String clusterName,
+                                                 String serviceName,
+                                                 String serviceVersion) {
+        var additions = Resource.builder();
+
+        if (detected.getAttribute(OpenTelemetryAttributes.PULSAR_CLUSTER) == null) {
+            additions.put(OpenTelemetryAttributes.PULSAR_CLUSTER, clusterName);
+        }
+
+        // The service name counts as "not set" while it still equals the SDK's built-in default value.
+        if (StringUtils.isNotBlank(serviceName)
+                && Objects.equals(Resource.getDefault().getAttribute(ResourceAttributes.SERVICE_NAME),
+                                  detected.getAttribute(ResourceAttributes.SERVICE_NAME))) {
+            additions.put(ResourceAttributes.SERVICE_NAME, serviceName);
+        }
+
+        if (StringUtils.isNotBlank(serviceVersion)
+                && detected.getAttribute(ResourceAttributes.SERVICE_VERSION) == null) {
+            additions.put(ResourceAttributes.SERVICE_VERSION, serviceVersion);
+        }
+
+        return detected.merge(additions.build());
+    }
+
+    /**
+     * @return the OpenTelemetry instance backed by the SDK built in the constructor.
+     */
+    public OpenTelemetry getOpenTelemetry() {
+        return openTelemetrySdk;
+    }
+
+    /**
+     * Closes the underlying OpenTelemetry SDK.
+     */
+    @Override
+    public void close() {
+        openTelemetrySdk.close();
+    }
+}
diff --git 
a/pulsar-opentelemetry/src/main/java/org/apache/pulsar/opentelemetry/package-info.java
 
b/pulsar-opentelemetry/src/main/java/org/apache/pulsar/opentelemetry/package-info.java
new file mode 100644
index 00000000000..9a7426aa047
--- /dev/null
+++ 
b/pulsar-opentelemetry/src/main/java/org/apache/pulsar/opentelemetry/package-info.java
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+/**
+ * Provides a wrapper layer for the OpenTelemetry API to be used in Pulsar.
+ * @since 3.3.0
+ */
+package org.apache.pulsar.opentelemetry;
\ No newline at end of file
diff --git 
a/pulsar-opentelemetry/src/test/java/org/apache/pulsar/opentelemetry/OpenTelemetryServiceTest.java
 
b/pulsar-opentelemetry/src/test/java/org/apache/pulsar/opentelemetry/OpenTelemetryServiceTest.java
new file mode 100644
index 00000000000..e5c893794a0
--- /dev/null
+++ 
b/pulsar-opentelemetry/src/test/java/org/apache/pulsar/opentelemetry/OpenTelemetryServiceTest.java
@@ -0,0 +1,201 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.opentelemetry;
+
+import static 
io.opentelemetry.sdk.testing.assertj.OpenTelemetryAssertions.assertThat;
+import static 
io.opentelemetry.sdk.testing.assertj.OpenTelemetryAssertions.satisfies;
+import io.opentelemetry.api.common.AttributeKey;
+import io.opentelemetry.api.common.Attributes;
+import io.opentelemetry.api.metrics.LongCounterBuilder;
+import io.opentelemetry.api.metrics.Meter;
+import io.opentelemetry.instrumentation.resources.JarServiceNameDetector;
+import 
io.opentelemetry.sdk.autoconfigure.AutoConfiguredOpenTelemetrySdkBuilder;
+import io.opentelemetry.sdk.common.InstrumentationScopeInfo;
+import io.opentelemetry.sdk.metrics.export.MetricReader;
+import io.opentelemetry.sdk.testing.exporter.InMemoryMetricReader;
+import io.opentelemetry.semconv.ResourceAttributes;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.function.Consumer;
+import lombok.Cleanup;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.pulsar.broker.stats.prometheus.PrometheusMetricsClient;
+import org.assertj.core.api.AbstractCharSequenceAssert;
+import org.awaitility.Awaitility;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+/**
+ * Unit tests for {@link OpenTelemetryService}: argument validation, resource attributes, instrumentation scope,
+ * metric cardinality limit, basic counter aggregation, and the disabled-by-default mode.
+ */
+public class OpenTelemetryServiceTest {
+
+    // Service under test with the SDK explicitly enabled; recreated for every test method.
+    private OpenTelemetryService openTelemetryService;
+    // In-memory reader registered with the service above, used to inspect the recorded metrics.
+    private InMemoryMetricReader reader;
+    private Meter meter;
+
+    @BeforeMethod
+    public void setup() throws Exception {
+        reader = InMemoryMetricReader.create();
+        openTelemetryService = OpenTelemetryService.builder().
+                sdkBuilderConsumer(getSdkBuilderConsumer(reader,
+                        Map.of(OpenTelemetryService.OTEL_SDK_DISABLED_KEY, "false"))).
+                clusterName("openTelemetryServiceTestCluster").
+                build();
+        meter = openTelemetryService.getOpenTelemetry().getMeter("openTelemetryServiceTestInstrument");
+    }
+
+    @AfterMethod
+    public void teardown() throws Exception {
+        openTelemetryService.close();
+        reader.close();
+    }
+
+    // Customizes the SDK builder to include the MetricReader and extra properties for testing purposes.
+    // A null reader is allowed, in which case only the extra properties are registered.
+    private static Consumer<AutoConfiguredOpenTelemetrySdkBuilder> getSdkBuilderConsumer(
+            MetricReader extraReader, Map<String, String> extraProperties) {
+        return autoConfigurationCustomizer -> {
+            if (extraReader != null) {
+                autoConfigurationCustomizer.addMeterProviderCustomizer(
+                        (sdkMeterProviderBuilder, __) -> sdkMeterProviderBuilder.registerMetricReader(extraReader));
+            }
+            autoConfigurationCustomizer.addPropertiesSupplier(() -> extraProperties);
+        };
+    }
+
+    @Test(expectedExceptions = IllegalArgumentException.class)
+    public void testClusterNameCannotBeNull() {
+        @Cleanup
+        var ots = OpenTelemetryService.builder().build();
+    }
+
+    @Test(expectedExceptions = IllegalArgumentException.class)
+    public void testClusterNameCannotBeEmpty() {
+        @Cleanup
+        var ots = OpenTelemetryService.builder().clusterName(StringUtils.EMPTY).build();
+    }
+
+    @Test
+    public void testResourceAttributesAreSet() throws Exception {
+        @Cleanup
+        var reader = InMemoryMetricReader.create();
+
+        @Cleanup
+        var ots = OpenTelemetryService.builder().
+                sdkBuilderConsumer(getSdkBuilderConsumer(reader,
+                        Map.of(OpenTelemetryService.OTEL_SDK_DISABLED_KEY, "false",
+                               "otel.java.disabled.resource.providers", JarServiceNameDetector.class.getName()))).
+                clusterName("testServiceNameAndVersion").
+                serviceName("openTelemetryServiceTestService").
+                serviceVersion("1.0.0").
+                build();
+
+        // Record one data point so the reader has something to return. Without any instrument the collection
+        // is empty and allSatisfy() would pass vacuously, i.e. without checking a single resource attribute.
+        ots.getOpenTelemetry().getMeter("testResourceAttributes").counterBuilder("dummyCounter").build().add(1);
+
+        assertThat(reader.collectAllMetrics())
+            .isNotEmpty()
+            .allSatisfy(metric -> assertThat(metric)
+                .hasResourceSatisfying(resource -> resource
+                    .hasAttribute(OpenTelemetryAttributes.PULSAR_CLUSTER, "testServiceNameAndVersion")
+                    .hasAttribute(ResourceAttributes.SERVICE_NAME, "openTelemetryServiceTestService")
+                    .hasAttribute(ResourceAttributes.SERVICE_VERSION, "1.0.0")
+                    .hasAttribute(satisfies(ResourceAttributes.HOST_NAME, AbstractCharSequenceAssert::isNotBlank))));
+    }
+
+    @Test
+    public void testIsInstrumentationNameSetOnMeter() {
+        var meter = openTelemetryService.getOpenTelemetry().getMeter("testInstrumentationScope");
+        meter.counterBuilder("dummyCounter").build().add(1);
+        assertThat(reader.collectAllMetrics())
+            .anySatisfy(metricData -> assertThat(metricData)
+                .hasInstrumentationScope(InstrumentationScopeInfo.create("testInstrumentationScope")));
+    }
+
+    @Test
+    public void testMetricCardinalityIsSet() {
+        var prometheusExporterPort = 9464;
+        @Cleanup
+        var ots = OpenTelemetryService.builder().
+                sdkBuilderConsumer(getSdkBuilderConsumer(null,
+                        Map.of(OpenTelemetryService.OTEL_SDK_DISABLED_KEY, "false",
+                        "otel.metrics.exporter", "prometheus",
+                        "otel.exporter.prometheus.port", Integer.toString(prometheusExporterPort)))).
+                clusterName("openTelemetryServiceCardinalityTestCluster").
+                build();
+        var meter = ots.getOpenTelemetry().getMeter("openTelemetryMetricCardinalityTest");
+        var counter = meter.counterBuilder("dummyCounter").build();
+        // Record more distinct attribute sets than the limit allows, so that the overflow series must appear.
+        for (int i = 0; i < OpenTelemetryService.MAX_CARDINALITY_LIMIT + 100; i++) {
+            counter.add(1, Attributes.of(AttributeKey.stringKey("attribute"), "value" + i));
+        }
+
+        Awaitility.waitAtMost(30, TimeUnit.SECONDS).ignoreExceptions().until(() -> {
+            var client = new PrometheusMetricsClient("localhost", prometheusExporterPort);
+            var allMetrics = client.getMetrics();
+            var actualMetrics = allMetrics.findByNameAndLabels("dummyCounter_total");
+            var overflowMetric = allMetrics.findByNameAndLabels("dummyCounter_total", "otel_metric_overflow", "true");
+            return actualMetrics.size() == OpenTelemetryService.MAX_CARDINALITY_LIMIT + 1 && overflowMetric.size() == 1;
+        });
+    }
+
+    @Test
+    public void testLongCounter() {
+        var longCounter = meter.counterBuilder("dummyLongCounter").build();
+        var attributes = Attributes.of(AttributeKey.stringKey("dummyAttr"), "dummyValue");
+        longCounter.add(1, attributes);
+        longCounter.add(2, attributes);
+
+        assertThat(reader.collectAllMetrics())
+            .anySatisfy(metric -> assertThat(metric)
+                .hasName("dummyLongCounter")
+                .hasLongSumSatisfying(sum -> sum
+                    .hasPointsSatisfying(point -> point
+                        .hasAttributes(attributes)
+                        .hasValue(3))));
+    }
+
+    @Test
+    public void testServiceIsDisabledByDefault() throws Exception {
+        @Cleanup
+        var metricReader = InMemoryMetricReader.create();
+
+        // No override of OTEL_SDK_DISABLED_KEY here: the service's own default must apply.
+        @Cleanup
+        var ots = OpenTelemetryService.builder().
+                sdkBuilderConsumer(getSdkBuilderConsumer(metricReader, Map.of())).
+                clusterName("openTelemetryServiceTestCluster").
+                build();
+        var meter = ots.getOpenTelemetry().getMeter("openTelemetryServiceTestInstrument");
+
+        var builders = List.of(
+                meter.counterBuilder("dummyCounterA"),
+                meter.counterBuilder("dummyCounterB").setDescription("desc"),
+                meter.counterBuilder("dummyCounterC").setDescription("desc").setUnit("unit"),
+                meter.counterBuilder("dummyCounterD").setUnit("unit")
+        );
+
+        var callback = new AtomicBoolean();
+        // Validate that no matter how the counters are being built, they are all backed by the same underlying object.
+        // This ensures we conserve memory when the SDK is disabled.
+        assertThat(builders.stream().map(LongCounterBuilder::build).distinct()).hasSize(1);
+        assertThat(builders.stream().map(LongCounterBuilder::buildObserver).distinct()).hasSize(1);
+        assertThat(builders.stream().map(b -> b.buildWithCallback(__ -> callback.set(true))).distinct()).hasSize(1);
+
+        // Validate that no metrics are being emitted at all.
+        assertThat(metricReader.collectAllMetrics()).isEmpty();
+
+        // Validate that the callback has not been called.
+        assertThat(callback).isFalse();
+    }
+}
diff --git a/pulsar-proxy/pom.xml b/pulsar-proxy/pom.xml
index 8fb1313f9ce..55dfd11e40e 100644
--- a/pulsar-proxy/pom.xml
+++ b/pulsar-proxy/pom.xml
@@ -49,6 +49,12 @@
       <version>${project.version}</version>
     </dependency>
 
+    <dependency>
+      <groupId>${project.groupId}</groupId>
+      <artifactId>pulsar-opentelemetry</artifactId>
+      <version>${project.version}</version>
+    </dependency>
+
     <dependency>
       <groupId>${project.groupId}</groupId>
       <artifactId>pulsar-docs-tools</artifactId>
diff --git 
a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyService.java 
b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyService.java
index 719c7c2cbda..61b00871cec 100644
--- 
a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyService.java
+++ 
b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyService.java
@@ -73,6 +73,7 @@ import org.apache.pulsar.common.util.netty.EventLoopUtil;
 import org.apache.pulsar.metadata.api.MetadataStoreException;
 import org.apache.pulsar.metadata.api.extended.MetadataStoreExtended;
 import org.apache.pulsar.proxy.extensions.ProxyExtensions;
+import org.apache.pulsar.proxy.stats.PulsarProxyOpenTelemetry;
 import org.apache.pulsar.proxy.stats.TopicStats;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -148,6 +149,8 @@ public class ProxyService implements Closeable {
 
     private PrometheusMetricsServlet metricsServlet;
     private List<PrometheusRawMetricsProvider> pendingMetricsProviders;
+    @Getter
+    private PulsarProxyOpenTelemetry openTelemetry;
 
     @Getter
     private final ConnectionController connectionController;
@@ -284,6 +287,7 @@ public class ProxyService implements Closeable {
         }
 
         createMetricsServlet();
+        openTelemetry = new PulsarProxyOpenTelemetry(proxyConfig);
 
         // Initialize the message protocol handlers.
         // start the protocol handlers only after the broker is ready,
@@ -399,6 +403,9 @@ public class ProxyService implements Closeable {
             proxyAdditionalServlets = null;
         }
 
+        if (openTelemetry != null) {
+            openTelemetry.close();
+        }
         resetMetricsServlet();
 
         if (localMetadataStore != null) {
diff --git 
a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/stats/PulsarProxyOpenTelemetry.java
 
b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/stats/PulsarProxyOpenTelemetry.java
new file mode 100644
index 00000000000..14bbc649466
--- /dev/null
+++ 
b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/stats/PulsarProxyOpenTelemetry.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.proxy.stats;
+
+import io.opentelemetry.api.metrics.Meter;
+import java.io.Closeable;
+import lombok.Getter;
+import org.apache.pulsar.PulsarVersion;
+import org.apache.pulsar.opentelemetry.OpenTelemetryService;
+import org.apache.pulsar.proxy.server.ProxyConfiguration;
+
+/**
+ * Holds the {@link OpenTelemetryService} of the Pulsar proxy together with the {@link Meter} obtained from it.
+ * Closing this object closes the wrapped service.
+ */
+public class PulsarProxyOpenTelemetry implements Closeable {
+
+    public static final String SERVICE_NAME = "pulsar-proxy";
+
+    @Getter
+    private final Meter meter;
+    private final OpenTelemetryService openTelemetryService;
+
+    /**
+     * Creates the proxy's OpenTelemetry service and its meter.
+     *
+     * @param config
+     *      Proxy configuration; its cluster name is passed on to the OpenTelemetry service.
+     */
+    public PulsarProxyOpenTelemetry(ProxyConfiguration config) {
+        var serviceBuilder = OpenTelemetryService.builder();
+        serviceBuilder.clusterName(config.getClusterName());
+        serviceBuilder.serviceName(SERVICE_NAME);
+        serviceBuilder.serviceVersion(PulsarVersion.getVersion());
+        openTelemetryService = serviceBuilder.build();
+
+        var openTelemetry = openTelemetryService.getOpenTelemetry();
+        meter = openTelemetry.getMeter("org.apache.pulsar.proxy");
+    }
+
+    @Override
+    public void close() {
+        openTelemetryService.close();
+    }
+}
diff --git 
a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/extensions/SimpleProxyExtensionTestBase.java
 
b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/extensions/SimpleProxyExtensionTestBase.java
index fde7c938d0a..f9ace716ecd 100644
--- 
a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/extensions/SimpleProxyExtensionTestBase.java
+++ 
b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/extensions/SimpleProxyExtensionTestBase.java
@@ -140,6 +140,7 @@ public abstract class SimpleProxyExtensionTestBase extends 
MockedPulsarServiceBa
         proxyConfig.setBrokerProxyAllowedTargetPorts("*");
         proxyConfig.setMetadataStoreUrl(DUMMY_VALUE);
         proxyConfig.setConfigurationMetadataStoreUrl(GLOBAL_DUMMY_VALUE);
+        proxyConfig.setClusterName(configClusterName);
 
         proxyService = Mockito.spy(new ProxyService(proxyConfig, new 
AuthenticationService(
                 PulsarConfigurationLoader.convertFrom(proxyConfig))));
diff --git 
a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/AdminProxyHandlerKeystoreTLSTest.java
 
b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/AdminProxyHandlerKeystoreTLSTest.java
index bc2029861f4..92c644b470d 100644
--- 
a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/AdminProxyHandlerKeystoreTLSTest.java
+++ 
b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/AdminProxyHandlerKeystoreTLSTest.java
@@ -101,6 +101,7 @@ public class AdminProxyHandlerKeystoreTLSTest extends 
MockedPulsarServiceBaseTes
         
proxyConfig.setBrokerClientAuthenticationPlugin(AuthenticationKeyStoreTls.class.getName());
         
proxyConfig.setBrokerClientAuthenticationParameters(String.format("keyStoreType:%s,keyStorePath:%s,keyStorePassword:%s",
                 KEYSTORE_TYPE, BROKER_KEYSTORE_FILE_PATH, BROKER_KEYSTORE_PW));
+        proxyConfig.setClusterName(configClusterName);
 
         resource = new PulsarResources(registerCloseable(new 
ZKMetadataStore(mockZooKeeper)),
                 registerCloseable(new ZKMetadataStore(mockZooKeeperGlobal)));
diff --git 
a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/AuthedAdminProxyHandlerTest.java
 
b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/AuthedAdminProxyHandlerTest.java
index d83de9652cf..ef58648e35a 100644
--- 
a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/AuthedAdminProxyHandlerTest.java
+++ 
b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/AuthedAdminProxyHandlerTest.java
@@ -85,6 +85,7 @@ public class AuthedAdminProxyHandlerTest extends 
MockedPulsarServiceBaseTest {
         proxyConfig.setWebServicePortTls(Optional.of(0));
         proxyConfig.setTlsEnabledWithBroker(true);
         proxyConfig.setHttpMaxRequestHeaderSize(20000);
+        proxyConfig.setClusterName(configClusterName);
 
         // enable tls and auth&auth at proxy
         proxyConfig.setTlsCertificateFilePath(PROXY_CERT_FILE_PATH);
diff --git 
a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyAdditionalServletTest.java
 
b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyAdditionalServletTest.java
index 9f8efa1ec79..f61a73bbf91 100644
--- 
a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyAdditionalServletTest.java
+++ 
b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyAdditionalServletTest.java
@@ -78,6 +78,7 @@ public class ProxyAdditionalServletTest extends 
MockedPulsarServiceBaseTest {
         proxyConfig.setConfigurationMetadataStoreUrl(GLOBAL_DUMMY_VALUE);
         // enable full parsing feature
         proxyConfig.setProxyLogLevel(Optional.of(2));
+        proxyConfig.setClusterName(configClusterName);
 
         // this is for nar package test
 //        addServletNar();
diff --git 
a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyAuthenticatedProducerConsumerTest.java
 
b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyAuthenticatedProducerConsumerTest.java
index 1c93cb20c70..4083c984d98 100644
--- 
a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyAuthenticatedProducerConsumerTest.java
+++ 
b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyAuthenticatedProducerConsumerTest.java
@@ -137,6 +137,7 @@ public class ProxyAuthenticatedProducerConsumerTest extends 
ProducerConsumerBase
 
         proxyConfig.setMetadataStoreUrl(DUMMY_VALUE);
         proxyConfig.setConfigurationMetadataStoreUrl(GLOBAL_DUMMY_VALUE);
+        proxyConfig.setClusterName(configClusterName);
 
         proxyService = Mockito.spy(new ProxyService(proxyConfig, new 
AuthenticationService(
                                                             
PulsarConfigurationLoader.convertFrom(proxyConfig))));
diff --git 
a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyAuthenticationTest.java
 
b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyAuthenticationTest.java
index fec0673ff9b..662b8305c0e 100644
--- 
a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyAuthenticationTest.java
+++ 
b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyAuthenticationTest.java
@@ -58,6 +58,7 @@ import org.testng.annotations.Test;
 
 public class ProxyAuthenticationTest extends ProducerConsumerBase {
        private static final Logger log = 
LoggerFactory.getLogger(ProxyAuthenticationTest.class);
+       private static final String CLUSTER_NAME = "test";
 
        public static class BasicAuthenticationData implements 
AuthenticationDataProvider {
                private final String authParam;
@@ -178,7 +179,7 @@ public class ProxyAuthenticationTest extends 
ProducerConsumerBase {
                providers.add(BasicAuthenticationProvider.class.getName());
                conf.setAuthenticationProviders(providers);
 
-               conf.setClusterName("test");
+               conf.setClusterName(CLUSTER_NAME);
                Set<String> proxyRoles = new HashSet<>();
                proxyRoles.add("proxy");
                conf.setProxyRoles(proxyRoles);
@@ -222,6 +223,7 @@ public class ProxyAuthenticationTest extends 
ProducerConsumerBase {
                proxyConfig.setBrokerProxyAllowedTargetPorts("*");
                proxyConfig.setWebServicePort(Optional.of(0));
                proxyConfig.setBrokerServiceURL(pulsar.getBrokerServiceUrl());
+               proxyConfig.setClusterName(CLUSTER_NAME);
 
                
proxyConfig.setBrokerClientAuthenticationPlugin(BasicAuthentication.class.getName());
                
proxyConfig.setBrokerClientAuthenticationParameters(proxyAuthParams);
diff --git 
a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyConnectionThrottlingTest.java
 
b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyConnectionThrottlingTest.java
index a070d1e84d3..78ab9bd0d95 100644
--- 
a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyConnectionThrottlingTest.java
+++ 
b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyConnectionThrottlingTest.java
@@ -59,6 +59,7 @@ public class ProxyConnectionThrottlingTest extends 
MockedPulsarServiceBaseTest {
         proxyConfig.setMaxConcurrentLookupRequests(NUM_CONCURRENT_LOOKUP);
         
proxyConfig.setMaxConcurrentInboundConnections(NUM_CONCURRENT_INBOUND_CONNECTION);
         
proxyConfig.setMaxConcurrentInboundConnectionsPerIp(NUM_CONCURRENT_INBOUND_CONNECTION);
+        proxyConfig.setClusterName(configClusterName);
         proxyService = Mockito.spy(new ProxyService(proxyConfig, new 
AuthenticationService(
                 PulsarConfigurationLoader.convertFrom(proxyConfig))));
         doReturn(registerCloseable(new 
ZKMetadataStore(mockZooKeeper))).when(proxyService).createLocalMetadataStore();
diff --git 
a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyEnableHAProxyProtocolTest.java
 
b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyEnableHAProxyProtocolTest.java
index 5704ba55fed..413774daf2c 100644
--- 
a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyEnableHAProxyProtocolTest.java
+++ 
b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyEnableHAProxyProtocolTest.java
@@ -60,6 +60,7 @@ public class ProxyEnableHAProxyProtocolTest extends 
MockedPulsarServiceBaseTest
         proxyConfig.setMetadataStoreUrl(DUMMY_VALUE);
         proxyConfig.setConfigurationMetadataStoreUrl(GLOBAL_DUMMY_VALUE);
         proxyConfig.setHaProxyProtocolEnabled(true);
+        proxyConfig.setClusterName(configClusterName);
 
         proxyService = Mockito.spy(new ProxyService(proxyConfig, new 
AuthenticationService(
                 PulsarConfigurationLoader.convertFrom(proxyConfig))));
diff --git 
a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyForwardAuthDataTest.java
 
b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyForwardAuthDataTest.java
index 477fe597f26..5e969ca26e4 100644
--- 
a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyForwardAuthDataTest.java
+++ 
b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyForwardAuthDataTest.java
@@ -46,6 +46,7 @@ import org.testng.annotations.Test;
 
 public class ProxyForwardAuthDataTest extends ProducerConsumerBase {
     private static final Logger log = 
LoggerFactory.getLogger(ProxyForwardAuthDataTest.class);
+    private static final String CLUSTER_NAME = "test";
 
     @BeforeMethod
     @Override
@@ -64,7 +65,7 @@ public class ProxyForwardAuthDataTest extends 
ProducerConsumerBase {
         providers.add(BasicAuthenticationProvider.class.getName());
         conf.setAuthenticationProviders(providers);
 
-        conf.setClusterName("test");
+        conf.setClusterName(CLUSTER_NAME);
         Set<String> proxyRoles = new HashSet<String>();
         proxyRoles.add("proxy");
         conf.setProxyRoles(proxyRoles);
@@ -109,6 +110,7 @@ public class ProxyForwardAuthDataTest extends 
ProducerConsumerBase {
         proxyConfig.setBrokerServiceURL(pulsar.getBrokerServiceUrl());
         
proxyConfig.setBrokerClientAuthenticationPlugin(BasicAuthentication.class.getName());
         proxyConfig.setBrokerClientAuthenticationParameters(proxyAuthParams);
+        proxyConfig.setClusterName(CLUSTER_NAME);
 
         Set<String> providers = new HashSet<>();
         providers.add(BasicAuthenticationProvider.class.getName());
diff --git 
a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyKeyStoreTlsTransportTest.java
 
b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyKeyStoreTlsTransportTest.java
index 5ee03395b80..5671c527f68 100644
--- 
a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyKeyStoreTlsTransportTest.java
+++ 
b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyKeyStoreTlsTransportTest.java
@@ -77,6 +77,7 @@ public class ProxyKeyStoreTlsTransportTest extends 
MockedPulsarServiceBaseTest {
 
         proxyConfig.setMetadataStoreUrl(DUMMY_VALUE);
         proxyConfig.setConfigurationMetadataStoreUrl(GLOBAL_DUMMY_VALUE);
+        proxyConfig.setClusterName(configClusterName);
 
         proxyConfig.setTlsRequireTrustedClientCertOnConnect(false);
 
diff --git 
a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyKeyStoreTlsWithAuthTest.java
 
b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyKeyStoreTlsWithAuthTest.java
index 1f21281a6f6..99fb8c03a81 100644
--- 
a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyKeyStoreTlsWithAuthTest.java
+++ 
b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyKeyStoreTlsWithAuthTest.java
@@ -77,6 +77,7 @@ public class ProxyKeyStoreTlsWithAuthTest extends 
MockedPulsarServiceBaseTest {
 
         proxyConfig.setMetadataStoreUrl(DUMMY_VALUE);
         proxyConfig.setConfigurationMetadataStoreUrl(GLOBAL_DUMMY_VALUE);
+        proxyConfig.setClusterName(configClusterName);
 
         // config for authentication and authorization.
         proxyConfig.setTlsRequireTrustedClientCertOnConnect(true);
diff --git 
a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyKeyStoreTlsWithoutAuthTest.java
 
b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyKeyStoreTlsWithoutAuthTest.java
index d7935755ce0..1dcebda7935 100644
--- 
a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyKeyStoreTlsWithoutAuthTest.java
+++ 
b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyKeyStoreTlsWithoutAuthTest.java
@@ -74,6 +74,7 @@ public class ProxyKeyStoreTlsWithoutAuthTest extends 
MockedPulsarServiceBaseTest
 
         proxyConfig.setMetadataStoreUrl(DUMMY_VALUE);
         proxyConfig.setConfigurationMetadataStoreUrl(GLOBAL_DUMMY_VALUE);
+        proxyConfig.setClusterName(configClusterName);
 
         proxyService = Mockito.spy(new ProxyService(proxyConfig, new 
AuthenticationService(
                                                             
PulsarConfigurationLoader.convertFrom(proxyConfig))));
diff --git 
a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyLookupThrottlingTest.java
 
b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyLookupThrottlingTest.java
index 167c3b19646..a9017404d0e 100644
--- 
a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyLookupThrottlingTest.java
+++ 
b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyLookupThrottlingTest.java
@@ -65,6 +65,7 @@ public class ProxyLookupThrottlingTest extends 
MockedPulsarServiceBaseTest {
         proxyConfig.setConfigurationMetadataStoreUrl(GLOBAL_DUMMY_VALUE);
         proxyConfig.setMaxConcurrentLookupRequests(NUM_CONCURRENT_LOOKUP);
         
proxyConfig.setMaxConcurrentInboundConnections(NUM_CONCURRENT_INBOUND_CONNECTION);
+        proxyConfig.setClusterName(configClusterName);
 
         AuthenticationService authenticationService = new 
AuthenticationService(
                 PulsarConfigurationLoader.convertFrom(proxyConfig));
diff --git 
a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyMutualTlsTest.java
 
b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyMutualTlsTest.java
index 08066f2e5bf..fae44c00ada 100644
--- 
a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyMutualTlsTest.java
+++ 
b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyMutualTlsTest.java
@@ -66,6 +66,7 @@ public class ProxyMutualTlsTest extends 
MockedPulsarServiceBaseTest {
         proxyConfig.setConfigurationMetadataStoreUrl(GLOBAL_DUMMY_VALUE);
         proxyConfig.setTlsRequireTrustedClientCertOnConnect(true);
         proxyConfig.setTlsAllowInsecureConnection(false);
+        proxyConfig.setClusterName(configClusterName);
 
         proxyService = Mockito.spy(new ProxyService(proxyConfig, new 
AuthenticationService(
                                                             
PulsarConfigurationLoader.convertFrom(proxyConfig))));
diff --git 
a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyParserTest.java
 
b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyParserTest.java
index 0d93185f5e8..3f58250e6d6 100644
--- 
a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyParserTest.java
+++ 
b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyParserTest.java
@@ -71,6 +71,7 @@ public class ProxyParserTest extends 
MockedPulsarServiceBaseTest {
         proxyConfig.setBrokerProxyAllowedTargetPorts("*");
         proxyConfig.setMetadataStoreUrl(DUMMY_VALUE);
         proxyConfig.setConfigurationMetadataStoreUrl(GLOBAL_DUMMY_VALUE);
+        proxyConfig.setClusterName(configClusterName);
         //enable full parsing feature
         proxyConfig.setProxyLogLevel(Optional.ofNullable(2));
 
diff --git 
a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyRefreshAuthTest.java
 
b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyRefreshAuthTest.java
index 6beed27cb66..d06cf4201ff 100644
--- 
a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyRefreshAuthTest.java
+++ 
b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyRefreshAuthTest.java
@@ -52,6 +52,7 @@ import org.testng.annotations.Test;
 
 @Slf4j
 public class ProxyRefreshAuthTest extends ProducerConsumerBase {
+    private static final String CLUSTER_NAME = "proxy-authorization";
     private final SecretKey SECRET_KEY = 
AuthTokenUtils.createSecretKey(SignatureAlgorithm.HS256);
 
     private ProxyService proxyService;
@@ -84,7 +85,7 @@ public class ProxyRefreshAuthTest extends 
ProducerConsumerBase {
         properties.setProperty("tokenAllowedClockSkewSeconds", "2");
         conf.setProperties(properties);
 
-        conf.setClusterName("proxy-authorization");
+        conf.setClusterName(CLUSTER_NAME);
         conf.setNumExecutorThreadPoolSize(5);
 
         conf.setAuthenticationRefreshCheckSeconds(1);
@@ -116,6 +117,7 @@ public class ProxyRefreshAuthTest extends 
ProducerConsumerBase {
         proxyConfig.setServicePort(Optional.of(0));
         proxyConfig.setBrokerProxyAllowedTargetPorts("*");
         proxyConfig.setWebServicePort(Optional.of(0));
+        proxyConfig.setClusterName(CLUSTER_NAME);
 
         
proxyConfig.setBrokerClientAuthenticationPlugin(AuthenticationToken.class.getName());
         proxyConfig.setBrokerClientAuthenticationParameters(
diff --git 
a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyRolesEnforcementTest.java
 
b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyRolesEnforcementTest.java
index 137ea829515..a1ffc13ee93 100644
--- 
a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyRolesEnforcementTest.java
+++ 
b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyRolesEnforcementTest.java
@@ -49,6 +49,7 @@ import org.testng.annotations.Test;
 
 public class ProxyRolesEnforcementTest extends ProducerConsumerBase {
     private static final Logger log = 
LoggerFactory.getLogger(ProxyRolesEnforcementTest.class);
+    private static final String CLUSTER_NAME = "test";
 
     public static class BasicAuthenticationData implements 
AuthenticationDataProvider {
         private final String authParam;
@@ -154,7 +155,7 @@ public class ProxyRolesEnforcementTest extends 
ProducerConsumerBase {
         providers.add(BasicAuthenticationProvider.class.getName());
         conf.setAuthenticationProviders(providers);
 
-        conf.setClusterName("test");
+        conf.setClusterName(CLUSTER_NAME);
         Set<String> proxyRoles = new HashSet<>();
         proxyRoles.add("proxy");
         conf.setProxyRoles(proxyRoles);
@@ -209,6 +210,7 @@ public class ProxyRolesEnforcementTest extends 
ProducerConsumerBase {
         proxyConfig.setBrokerProxyAllowedTargetPorts("*");
         proxyConfig.setWebServicePort(Optional.of(0));
         proxyConfig.setBrokerServiceURL(pulsar.getBrokerServiceUrl());
+        proxyConfig.setClusterName(CLUSTER_NAME);
 
         
proxyConfig.setBrokerClientAuthenticationPlugin(BasicAuthentication.class.getName());
         proxyConfig.setBrokerClientAuthenticationParameters(proxyAuthParams);
diff --git 
a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyServiceStarterDisableZeroCopyTest.java
 
b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyServiceStarterDisableZeroCopyTest.java
index 0c9fa5c7ac3..3e598a57277 100644
--- 
a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyServiceStarterDisableZeroCopyTest.java
+++ 
b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyServiceStarterDisableZeroCopyTest.java
@@ -21,7 +21,7 @@ package org.apache.pulsar.proxy.server;
 import java.util.Optional;
 import org.testng.annotations.BeforeClass;
 
-public class ProxyServiceStarterDisableZeroCopyTest extends 
ProxyServiceStarterTest{
+public class ProxyServiceStarterDisableZeroCopyTest extends 
ProxyServiceStarterTest {
 
     @Override
     @BeforeClass
@@ -35,6 +35,7 @@ public class ProxyServiceStarterDisableZeroCopyTest extends 
ProxyServiceStarterT
         serviceStarter.getConfig().setWebSocketServiceEnabled(true);
         serviceStarter.getConfig().setBrokerProxyAllowedTargetPorts("*");
         serviceStarter.getConfig().setProxyZeroCopyModeEnabled(false);
+        serviceStarter.getConfig().setClusterName(configClusterName);
         serviceStarter.start();
         serviceUrl = serviceStarter.getProxyService().getServiceUrl();
     }
diff --git 
a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyServiceStarterTest.java
 
b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyServiceStarterTest.java
index 71b1087ee64..f2632861253 100644
--- 
a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyServiceStarterTest.java
+++ 
b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyServiceStarterTest.java
@@ -61,6 +61,7 @@ public class ProxyServiceStarterTest extends 
MockedPulsarServiceBaseTest {
         serviceStarter.getConfig().setServicePort(Optional.of(0));
         serviceStarter.getConfig().setWebSocketServiceEnabled(true);
         serviceStarter.getConfig().setBrokerProxyAllowedTargetPorts("*");
+        serviceStarter.getConfig().setClusterName(configClusterName);
         serviceStarter.start();
         serviceUrl = serviceStarter.getProxyService().getServiceUrl();
     }
diff --git 
a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyServiceTlsStarterTest.java
 
b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyServiceTlsStarterTest.java
index b21162577a2..61718bbac3a 100644
--- 
a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyServiceTlsStarterTest.java
+++ 
b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyServiceTlsStarterTest.java
@@ -68,6 +68,7 @@ public class ProxyServiceTlsStarterTest extends 
MockedPulsarServiceBaseTest {
         
serviceStarter.getConfig().setTlsCertificateFilePath(PROXY_CERT_FILE_PATH);
         serviceStarter.getConfig().setTlsKeyFilePath(PROXY_KEY_FILE_PATH);
         serviceStarter.getConfig().setBrokerProxyAllowedTargetPorts("*");
+        serviceStarter.getConfig().setClusterName(configClusterName);
         serviceStarter.start();
         serviceUrl = serviceStarter.getProxyService().getServiceUrlTls();
         webPort = serviceStarter.getServer().getListenPortHTTP().get();
diff --git 
a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyStatsTest.java 
b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyStatsTest.java
index 155fbf616b0..2866c6c2690 100644
--- 
a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyStatsTest.java
+++ 
b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyStatsTest.java
@@ -72,6 +72,7 @@ public class ProxyStatsTest extends 
MockedPulsarServiceBaseTest {
         proxyConfig.setWebServicePort(Optional.of(0));
         proxyConfig.setMetadataStoreUrl(DUMMY_VALUE);
         proxyConfig.setConfigurationMetadataStoreUrl(GLOBAL_DUMMY_VALUE);
+        proxyConfig.setClusterName(configClusterName);
         // enable full parsing feature
         proxyConfig.setProxyLogLevel(Optional.of(2));
 
diff --git 
a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyStuckConnectionTest.java
 
b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyStuckConnectionTest.java
index 79ea7c5d6a3..6e66008c15a 100644
--- 
a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyStuckConnectionTest.java
+++ 
b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyStuckConnectionTest.java
@@ -79,6 +79,7 @@ public class ProxyStuckConnectionTest extends 
MockedPulsarServiceBaseTest {
         proxyConfig.setServicePort(Optional.ofNullable(0));
         proxyConfig.setBrokerProxyAllowedTargetPorts("*");
         proxyConfig.setBrokerServiceURL(pulsar.getBrokerServiceUrl());
+        proxyConfig.setClusterName(configClusterName);
 
         startProxyService();
         // use the same port for subsequent restarts
diff --git 
a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyTest.java 
b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyTest.java
index ac08052aaf1..9bc12dcc6fc 100644
--- a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyTest.java
+++ b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyTest.java
@@ -106,6 +106,7 @@ public class ProxyTest extends MockedPulsarServiceBaseTest {
         proxyConfig.setBrokerProxyAllowedTargetPorts("*");
         proxyConfig.setMetadataStoreUrl(DUMMY_VALUE);
         proxyConfig.setConfigurationMetadataStoreUrl(GLOBAL_DUMMY_VALUE);
+        proxyConfig.setClusterName(configClusterName);
     }
 
     @Override
diff --git 
a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyTlsTest.java 
b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyTlsTest.java
index a1b27abece4..4e300d39741 100644
--- 
a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyTlsTest.java
+++ 
b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyTlsTest.java
@@ -61,6 +61,7 @@ public class ProxyTlsTest extends MockedPulsarServiceBaseTest 
{
         proxyConfig.setTlsKeyFilePath(PROXY_KEY_FILE_PATH);
         proxyConfig.setMetadataStoreUrl(DUMMY_VALUE);
         proxyConfig.setConfigurationMetadataStoreUrl(GLOBAL_DUMMY_VALUE);
+        proxyConfig.setClusterName(configClusterName);
 
         proxyService = Mockito.spy(new ProxyService(proxyConfig, new 
AuthenticationService(
                                                             
PulsarConfigurationLoader.convertFrom(proxyConfig))));
diff --git 
a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyTlsWithAuthTest.java
 
b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyTlsWithAuthTest.java
index ec5cace8a06..16f610d6d0a 100644
--- 
a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyTlsWithAuthTest.java
+++ 
b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyTlsWithAuthTest.java
@@ -73,6 +73,7 @@ public class ProxyTlsWithAuthTest extends 
MockedPulsarServiceBaseTest {
             " \"issuerUrl\":\"" + server.getIssuer() + "\"," +
             " \"audience\": \"an-audience\"," +
             " \"privateKey\":\"file://" + tempFile.getAbsolutePath() + "\"}");
+        proxyConfig.setClusterName(configClusterName);
 
         proxyService = Mockito.spy(new ProxyService(proxyConfig, new 
AuthenticationService(
             PulsarConfigurationLoader.convertFrom(proxyConfig))));
diff --git 
a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyWithAuthorizationNegTest.java
 
b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyWithAuthorizationNegTest.java
index e0dcefe2714..cf9ad5831ec 100644
--- 
a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyWithAuthorizationNegTest.java
+++ 
b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyWithAuthorizationNegTest.java
@@ -57,6 +57,7 @@ import org.testng.collections.Maps;
 
 public class ProxyWithAuthorizationNegTest extends ProducerConsumerBase {
     private static final Logger log = 
LoggerFactory.getLogger(ProxyWithAuthorizationNegTest.class);
+    private static final String CLUSTER_NAME = "proxy-authorization-neg";
 
     private final String TLS_PROXY_TRUST_CERT_FILE_PATH = 
"./src/test/resources/authentication/tls/ProxyWithAuthorizationTest/proxy-cacert.pem";
     private final String TLS_PROXY_CERT_FILE_PATH = 
"./src/test/resources/authentication/tls/ProxyWithAuthorizationTest/proxy-cert.pem";
@@ -104,7 +105,7 @@ public class ProxyWithAuthorizationNegTest extends 
ProducerConsumerBase {
         providers.add(AuthenticationProviderTls.class.getName());
         conf.setAuthenticationProviders(providers);
 
-        conf.setClusterName("proxy-authorization-neg");
+        conf.setClusterName(CLUSTER_NAME);
         conf.setNumExecutorThreadPoolSize(5);
 
         super.init();
@@ -121,6 +122,7 @@ public class ProxyWithAuthorizationNegTest extends 
ProducerConsumerBase {
         proxyConfig.setWebServicePort(Optional.of(0));
         proxyConfig.setWebServicePortTls(Optional.of(0));
         proxyConfig.setTlsEnabledWithBroker(true);
+        proxyConfig.setClusterName(CLUSTER_NAME);
 
         // enable tls and auth&auth at proxy
         proxyConfig.setTlsCertificateFilePath(TLS_PROXY_CERT_FILE_PATH);
diff --git 
a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyWithAuthorizationTest.java
 
b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyWithAuthorizationTest.java
index 4e4c3c550cf..bc96c7ea510 100644
--- 
a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyWithAuthorizationTest.java
+++ 
b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyWithAuthorizationTest.java
@@ -64,6 +64,7 @@ import org.testng.collections.Maps;
 
 public class ProxyWithAuthorizationTest extends ProducerConsumerBase {
     private static final Logger log = 
LoggerFactory.getLogger(ProxyWithAuthorizationTest.class);
+    private static final String CLUSTER_NAME = "proxy-authorization";
 
     private final SecretKey SECRET_KEY = 
AuthTokenUtils.createSecretKey(SignatureAlgorithm.HS256);
     private final String CLIENT_TOKEN = AuthTokenUtils.createToken(SECRET_KEY, 
"Client", Optional.empty());
@@ -189,7 +190,7 @@ public class ProxyWithAuthorizationTest extends 
ProducerConsumerBase {
         properties.setProperty("tokenSecretKey", 
AuthTokenUtils.encodeKeyBase64(SECRET_KEY));
         conf.setProperties(properties);
 
-        conf.setClusterName("proxy-authorization");
+        conf.setClusterName(CLUSTER_NAME);
         conf.setNumExecutorThreadPoolSize(5);
     }
 
@@ -206,6 +207,7 @@ public class ProxyWithAuthorizationTest extends 
ProducerConsumerBase {
         proxyConfig.setBrokerServiceURLTLS(pulsar.getBrokerServiceUrlTls());
         
proxyConfig.setBrokerWebServiceURLTLS(pulsar.getWebServiceAddressTls());
         proxyConfig.setAdvertisedAddress(null);
+        proxyConfig.setClusterName(CLUSTER_NAME);
 
         proxyConfig.setBrokerProxyAllowedTargetPorts("*");
         proxyConfig.setServicePortTls(Optional.of(0));
@@ -432,6 +434,7 @@ public class ProxyWithAuthorizationTest extends 
ProducerConsumerBase {
         proxyConfig.setBrokerServiceURL(pulsar.getBrokerServiceUrl());
         proxyConfig.setBrokerServiceURLTLS(pulsar.getBrokerServiceUrlTls());
         proxyConfig.setAdvertisedAddress(null);
+        proxyConfig.setClusterName(CLUSTER_NAME);
 
         proxyConfig.setServicePort(Optional.of(0));
         proxyConfig.setBrokerProxyAllowedTargetPorts("*");
diff --git 
a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyWithExtensibleLoadManagerTest.java
 
b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyWithExtensibleLoadManagerTest.java
index f997532b273..d3c05fec721 100644
--- 
a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyWithExtensibleLoadManagerTest.java
+++ 
b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyWithExtensibleLoadManagerTest.java
@@ -110,6 +110,7 @@ public class ProxyWithExtensibleLoadManagerTest extends 
MultiBrokerBaseTest {
         proxyConfig.setBrokerProxyAllowedTargetPorts("*");
         proxyConfig.setMetadataStoreUrl(DUMMY_VALUE);
         proxyConfig.setConfigurationMetadataStoreUrl(GLOBAL_DUMMY_VALUE);
+        proxyConfig.setClusterName(configClusterName);
         return proxyConfig;
     }
 
diff --git 
a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyWithJwtAuthorizationTest.java
 
b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyWithJwtAuthorizationTest.java
index 14be7dadc41..5fb3e046824 100644
--- 
a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyWithJwtAuthorizationTest.java
+++ 
b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyWithJwtAuthorizationTest.java
@@ -67,6 +67,7 @@ import org.testng.annotations.Test;
 
 public class ProxyWithJwtAuthorizationTest extends ProducerConsumerBase {
     private static final Logger log = 
LoggerFactory.getLogger(ProxyWithJwtAuthorizationTest.class);
+    private static final String CLUSTER_NAME = "proxy-authorization";
 
     private final String ADMIN_ROLE = "admin";
     private final String PROXY_ROLE = "proxy";
@@ -104,7 +105,7 @@ public class ProxyWithJwtAuthorizationTest extends 
ProducerConsumerBase {
         providers.add(AuthenticationProviderToken.class.getName());
         conf.setAuthenticationProviders(providers);
 
-        conf.setClusterName("proxy-authorization");
+        conf.setClusterName(CLUSTER_NAME);
         conf.setNumExecutorThreadPoolSize(5);
 
         super.init();
@@ -119,6 +120,7 @@ public class ProxyWithJwtAuthorizationTest extends 
ProducerConsumerBase {
         proxyConfig.setServicePort(Optional.of(0));
         proxyConfig.setBrokerProxyAllowedTargetPorts("*");
         proxyConfig.setWebServicePort(Optional.of(0));
+        proxyConfig.setClusterName(CLUSTER_NAME);
 
         // enable auth&auth and use JWT at proxy
         
proxyConfig.setBrokerClientAuthenticationPlugin(AuthenticationToken.class.getName());
diff --git 
a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyWithoutServiceDiscoveryTest.java
 
b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyWithoutServiceDiscoveryTest.java
index e09194bb21d..9d9490e74b5 100644
--- 
a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyWithoutServiceDiscoveryTest.java
+++ 
b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyWithoutServiceDiscoveryTest.java
@@ -54,9 +54,11 @@ import org.testng.collections.Maps;
 
 public class ProxyWithoutServiceDiscoveryTest extends ProducerConsumerBase {
     private static final Logger log = 
LoggerFactory.getLogger(ProxyWithoutServiceDiscoveryTest.class);
+    private static final String CLUSTER_NAME = "without-service-discovery";
     private ProxyService proxyService;
     private ProxyConfiguration proxyConfig = new ProxyConfiguration();
 
+
     @BeforeMethod
     @Override
     protected void setup() throws Exception {
@@ -89,7 +91,7 @@ public class ProxyWithoutServiceDiscoveryTest extends 
ProducerConsumerBase {
         providers.add(AuthenticationProviderTls.class.getName());
         conf.setAuthenticationProviders(providers);
 
-        conf.setClusterName("without-service-discovery");
+        conf.setClusterName(CLUSTER_NAME);
         conf.setNumExecutorThreadPoolSize(5);
 
         super.init();
@@ -106,6 +108,7 @@ public class ProxyWithoutServiceDiscoveryTest extends 
ProducerConsumerBase {
         proxyConfig.setWebServicePort(Optional.of(0));
         proxyConfig.setWebServicePortTls(Optional.of(0));
         proxyConfig.setTlsEnabledWithBroker(true);
+        proxyConfig.setClusterName(CLUSTER_NAME);
 
         // enable tls and auth&auth at proxy
         proxyConfig.setTlsCertificateFilePath(PROXY_CERT_FILE_PATH);
diff --git 
a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/SuperUserAuthedAdminProxyHandlerTest.java
 
b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/SuperUserAuthedAdminProxyHandlerTest.java
index a44e2a85efa..57522186c8f 100644
--- 
a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/SuperUserAuthedAdminProxyHandlerTest.java
+++ 
b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/SuperUserAuthedAdminProxyHandlerTest.java
@@ -80,6 +80,7 @@ public class SuperUserAuthedAdminProxyHandlerTest extends 
MockedPulsarServiceBas
         proxyConfig.setWebServicePort(Optional.of(0));
         proxyConfig.setWebServicePortTls(Optional.of(0));
         proxyConfig.setTlsEnabledWithBroker(true);
+        proxyConfig.setClusterName(configClusterName);
 
         // enable tls and auth&auth at proxy
         proxyConfig.setTlsCertificateFilePath(BROKER_CERT_FILE_PATH);
diff --git 
a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/UnauthedAdminProxyHandlerTest.java
 
b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/UnauthedAdminProxyHandlerTest.java
index d239815ae81..fe8b1f45385 100644
--- 
a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/UnauthedAdminProxyHandlerTest.java
+++ 
b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/UnauthedAdminProxyHandlerTest.java
@@ -75,6 +75,7 @@ public class UnauthedAdminProxyHandlerTest extends 
MockedPulsarServiceBaseTest {
         proxyConfig.setStatusFilePath(STATUS_FILE_PATH);
         proxyConfig.setMetadataStoreUrl(DUMMY_VALUE);
         proxyConfig.setConfigurationMetadataStoreUrl(GLOBAL_DUMMY_VALUE);
+        proxyConfig.setClusterName(configClusterName);
 
         webServer = new WebServer(proxyConfig, new AuthenticationService(
                                           
PulsarConfigurationLoader.convertFrom(proxyConfig)));
diff --git a/tests/integration/pom.xml b/tests/integration/pom.xml
index df36c35a191..5582931851b 100644
--- a/tests/integration/pom.xml
+++ b/tests/integration/pom.xml
@@ -55,6 +55,13 @@
       <version>${project.version}</version>
       <scope>test</scope>
     </dependency>
+    <dependency>
+      <groupId>org.apache.pulsar</groupId>
+      <artifactId>pulsar-broker-common</artifactId>
+      <version>${project.version}</version>
+      <type>test-jar</type>
+      <scope>test</scope>
+    </dependency>
     <dependency>
       <groupId>org.apache.pulsar</groupId>
       <artifactId>pulsar-common</artifactId>
@@ -73,6 +80,12 @@
       <version>${project.version}</version>
       <scope>test</scope>
     </dependency>
+    <dependency>
+      <groupId>org.apache.pulsar</groupId>
+      <artifactId>pulsar-proxy</artifactId>
+      <version>${project.version}</version>
+      <scope>test</scope>
+    </dependency>
     <dependency>
       <groupId>org.apache.pulsar</groupId>
       <artifactId>managed-ledger</artifactId>
@@ -169,7 +182,6 @@
       <scope>test</scope>
     </dependency>
 
-
     <dependency>
       <groupId>com.rabbitmq</groupId>
       <artifactId>amqp-client</artifactId>
@@ -189,6 +201,12 @@
       <scope>test</scope>
     </dependency>
 
+    <dependency>
+      <groupId>io.rest-assured</groupId>
+      <artifactId>rest-assured</artifactId>
+      <scope>test</scope>
+    </dependency>
+
 <!--    kinesis-->
     <dependency>
       <groupId>org.testcontainers</groupId>
diff --git 
a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/containers/OpenTelemetryCollectorContainer.java
 
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/containers/OpenTelemetryCollectorContainer.java
new file mode 100644
index 00000000000..2b115ca6b95
--- /dev/null
+++ 
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/containers/OpenTelemetryCollectorContainer.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.tests.integration.containers;
+
+import java.time.Duration;
+import org.apache.http.HttpStatus;
+import org.testcontainers.containers.wait.strategy.HttpWaitStrategy;
+import org.testcontainers.utility.MountableFile;
+
+public class OpenTelemetryCollectorContainer extends 
ChaosContainer<OpenTelemetryCollectorContainer> {
+
+    private static final String IMAGE_NAME = 
"otel/opentelemetry-collector-contrib:latest";
+    private static final String NAME = "otel-collector";
+
+    public static final int PROMETHEUS_EXPORTER_PORT = 8889;
+    private static final int OTLP_RECEIVER_PORT = 4317;
+    private static final int ZPAGES_PORT = 55679;
+
+    public OpenTelemetryCollectorContainer(String clusterName) {
+        super(clusterName, IMAGE_NAME);
+    }
+
+    @Override
+    protected void configure() {
+        super.configure();
+
+        this.withCopyFileToContainer(
+                
MountableFile.forClasspathResource("containers/otel-collector-config.yaml", 
0644),
+                "/etc/otel-collector-config.yaml")
+            .withCommand("--config=/etc/otel-collector-config.yaml")
+            .withExposedPorts(OTLP_RECEIVER_PORT, PROMETHEUS_EXPORTER_PORT, 
ZPAGES_PORT)
+            .waitingFor(new HttpWaitStrategy()
+                    .forPath("/debug/servicez")
+                    .forPort(ZPAGES_PORT)
+                    .forStatusCode(HttpStatus.SC_OK)
+                    .withStartupTimeout(Duration.ofSeconds(300)));
+    }
+
+    @Override
+    public String getContainerName() {
+        return clusterName + "-" + NAME;
+    }
+
+    public String getOtlpEndpoint() {
+        return String.format("http://%s:%d", NAME, OTLP_RECEIVER_PORT);
+    }
+}
diff --git 
a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/containers/PulsarContainer.java
 
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/containers/PulsarContainer.java
index 56d64ce5b2c..77cdc1bfd28 100644
--- 
a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/containers/PulsarContainer.java
+++ 
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/containers/PulsarContainer.java
@@ -26,6 +26,7 @@ import java.io.UncheckedIOException;
 import java.time.Duration;
 import java.util.Objects;
 import java.util.UUID;
+import lombok.Getter;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.commons.io.FileUtils;
 import org.apache.pulsar.tests.integration.docker.ContainerExecResult;
@@ -70,6 +71,7 @@ public abstract class PulsarContainer<SelfT extends 
PulsarContainer<SelfT>> exte
     public static final boolean PULSAR_CONTAINERS_LEAVE_RUNNING =
             
Boolean.parseBoolean(System.getenv("PULSAR_CONTAINERS_LEAVE_RUNNING"));
 
+    @Getter
     protected final String hostname;
     private final String serviceName;
     private final String serviceEntryPoint;
diff --git 
a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/metrics/OpenTelemetrySanityTest.java
 
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/metrics/OpenTelemetrySanityTest.java
new file mode 100644
index 00000000000..38afc1f127d
--- /dev/null
+++ 
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/metrics/OpenTelemetrySanityTest.java
@@ -0,0 +1,165 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.tests.integration.metrics;
+
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.TimeUnit;
+import lombok.Cleanup;
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.pulsar.PulsarVersion;
+import org.apache.pulsar.broker.stats.PulsarBrokerOpenTelemetry;
+import org.apache.pulsar.broker.stats.prometheus.PrometheusMetricsClient;
+import org.apache.pulsar.functions.worker.PulsarWorkerOpenTelemetry;
+import org.apache.pulsar.proxy.stats.PulsarProxyOpenTelemetry;
+import org.apache.pulsar.tests.integration.containers.ChaosContainer;
+import 
org.apache.pulsar.tests.integration.containers.OpenTelemetryCollectorContainer;
+import org.apache.pulsar.tests.integration.topologies.FunctionRuntimeType;
+import org.apache.pulsar.tests.integration.topologies.PulsarCluster;
+import org.apache.pulsar.tests.integration.topologies.PulsarClusterSpec;
+import org.apache.pulsar.tests.integration.topologies.PulsarTestBase;
+import org.awaitility.Awaitility;
+import org.testng.annotations.Test;
+
+public class OpenTelemetrySanityTest {
+
+    // Validate that the OpenTelemetry metrics can be exported to a remote OpenTelemetry collector.
+    @Test(timeOut = 360_000)
+    public void testOpenTelemetryMetricsOtlpExport() throws Exception {
+        var clusterName = "testOpenTelemetryMetrics-" + UUID.randomUUID();
+        var openTelemetryCollectorContainer = new 
OpenTelemetryCollectorContainer(clusterName);
+
+        var exporter = "otlp";
+        var otlpEndpointProp =
+                Pair.of("OTEL_EXPORTER_OTLP_ENDPOINT", 
openTelemetryCollectorContainer.getOtlpEndpoint());
+
+        var brokerCollectorProps = getOpenTelemetryProps(exporter, 
otlpEndpointProp);
+        var proxyCollectorProps = getOpenTelemetryProps(exporter, 
otlpEndpointProp);
+        var functionWorkerCollectorProps = getOpenTelemetryProps(exporter, 
otlpEndpointProp);
+
+        var spec = PulsarClusterSpec.builder()
+                .clusterName(clusterName)
+                .brokerEnvs(brokerCollectorProps)
+                .proxyEnvs(proxyCollectorProps)
+                .externalService("otel-collector", 
openTelemetryCollectorContainer)
+                .functionWorkerEnvs(functionWorkerCollectorProps)
+                .build();
+        @Cleanup("stop")
+        var pulsarCluster = PulsarCluster.forSpec(spec);
+        pulsarCluster.start();
+        pulsarCluster.setupFunctionWorkers(PulsarTestBase.randomName(), 
FunctionRuntimeType.PROCESS, 1);
+
+        // TODO: Validate cluster name and service version are present once
+        // https://github.com/open-telemetry/opentelemetry-java/issues/6108 is solved.
+        var metricName = "queueSize_ratio"; // Sent automatically by the OpenTelemetry SDK.
+        Awaitility.waitAtMost(90, 
TimeUnit.SECONDS).ignoreExceptions().pollInterval(1, TimeUnit.SECONDS).until(() 
-> {
+            var metrics = getMetricsFromPrometheus(
+                    openTelemetryCollectorContainer, 
OpenTelemetryCollectorContainer.PROMETHEUS_EXPORTER_PORT);
+            return !metrics.findByNameAndLabels(metricName, "job", 
PulsarBrokerOpenTelemetry.SERVICE_NAME).isEmpty();
+        });
+        Awaitility.waitAtMost(90, 
TimeUnit.SECONDS).ignoreExceptions().pollInterval(1, TimeUnit.SECONDS).until(() 
-> {
+            var metrics = getMetricsFromPrometheus(
+                    openTelemetryCollectorContainer, 
OpenTelemetryCollectorContainer.PROMETHEUS_EXPORTER_PORT);
+            return !metrics.findByNameAndLabels(metricName, "job", 
PulsarProxyOpenTelemetry.SERVICE_NAME).isEmpty();
+        });
+        Awaitility.waitAtMost(90, 
TimeUnit.SECONDS).ignoreExceptions().pollInterval(1, TimeUnit.SECONDS).until(() 
-> {
+            var metrics = getMetricsFromPrometheus(
+                    openTelemetryCollectorContainer, 
OpenTelemetryCollectorContainer.PROMETHEUS_EXPORTER_PORT);
+            return !metrics.findByNameAndLabels(metricName, "job", 
PulsarWorkerOpenTelemetry.SERVICE_NAME).isEmpty();
+        });
+    }
+
+    /*
+     * Validate that the OpenTelemetry metrics can be exported to a local Prometheus endpoint running in the same
+     * process space as the broker/proxy/function-worker.
+     * https://github.com/open-telemetry/opentelemetry-java/blob/main/sdk-extensions/autoconfigure/README.md#prometheus-exporter
+     */
+    @Test(timeOut = 360_000)
+    public void testOpenTelemetryMetricsPrometheusExport() throws Exception {
+        var prometheusExporterPort = 9464;
+        var clusterName = "testOpenTelemetryMetrics-" + UUID.randomUUID();
+
+        var exporter = "prometheus";
+        var prometheusExporterPortProp =
+                Pair.of("OTEL_EXPORTER_PROMETHEUS_PORT", 
Integer.toString(prometheusExporterPort));
+
+        var brokerCollectorProps = getOpenTelemetryProps(exporter, 
prometheusExporterPortProp);
+        var proxyCollectorProps = getOpenTelemetryProps(exporter, 
prometheusExporterPortProp);
+        var functionWorkerCollectorProps = getOpenTelemetryProps(exporter, 
prometheusExporterPortProp);
+
+        var spec = PulsarClusterSpec.builder()
+                .clusterName(clusterName)
+                .brokerEnvs(brokerCollectorProps)
+                .brokerAdditionalPorts(List.of(prometheusExporterPort))
+                .proxyEnvs(proxyCollectorProps)
+                .proxyAdditionalPorts(List.of(prometheusExporterPort))
+                .functionWorkerEnvs(functionWorkerCollectorProps)
+                .functionWorkerAdditionalPorts(List.of(prometheusExporterPort))
+                .build();
+        @Cleanup("stop")
+        var pulsarCluster = PulsarCluster.forSpec(spec);
+        pulsarCluster.start();
+        pulsarCluster.setupFunctionWorkers(PulsarTestBase.randomName(), 
FunctionRuntimeType.PROCESS, 1);
+
+        var metricName = "target_info"; // Sent automatically by the OpenTelemetry SDK.
+        Awaitility.waitAtMost(90, 
TimeUnit.SECONDS).ignoreExceptions().pollInterval(1, TimeUnit.SECONDS).until(() 
-> {
+            var metrics = getMetricsFromPrometheus(pulsarCluster.getBroker(0), 
prometheusExporterPort);
+            return !metrics.findByNameAndLabels(metricName,
+                    Pair.of("pulsar_cluster", clusterName),
+                    Pair.of("service_name", 
PulsarBrokerOpenTelemetry.SERVICE_NAME),
+                    Pair.of("service_version", PulsarVersion.getVersion()),
+                    Pair.of("host_name", 
pulsarCluster.getBroker(0).getHostname())).isEmpty();
+        });
+        Awaitility.waitAtMost(90, 
TimeUnit.SECONDS).ignoreExceptions().pollInterval(1, TimeUnit.SECONDS).until(() 
-> {
+            var metrics = getMetricsFromPrometheus(pulsarCluster.getProxy(), 
prometheusExporterPort);
+            return !metrics.findByNameAndLabels(metricName,
+                    Pair.of("pulsar_cluster", clusterName),
+                    Pair.of("service_name", 
PulsarProxyOpenTelemetry.SERVICE_NAME),
+                    Pair.of("service_version", PulsarVersion.getVersion()),
+                    Pair.of("host_name", 
pulsarCluster.getProxy().getHostname())).isEmpty();
+        });
+        Awaitility.waitAtMost(90, 
TimeUnit.SECONDS).ignoreExceptions().pollInterval(1, TimeUnit.SECONDS).until(() 
-> {
+            var metrics = 
getMetricsFromPrometheus(pulsarCluster.getAnyWorker(), prometheusExporterPort);
+            return !metrics.findByNameAndLabels(metricName,
+                    Pair.of("pulsar_cluster", clusterName),
+                    Pair.of("service_name", 
PulsarWorkerOpenTelemetry.SERVICE_NAME),
+                    Pair.of("service_version", PulsarVersion.getVersion()),
+                    Pair.of("host_name", 
pulsarCluster.getAnyWorker().getHostname())).isEmpty();
+        });
+    }
+
+    private static PrometheusMetricsClient.Metrics 
getMetricsFromPrometheus(ChaosContainer<?> container, int port) {
+        var client = new PrometheusMetricsClient(container.getHost(), 
container.getMappedPort(port));
+        return client.getMetrics();
+    }
+
+    private static Map<String, String> getOpenTelemetryProps(String exporter, 
Pair<String, String> ... extraProps) {
+        var defaultProps = Map.of(
+                "OTEL_SDK_DISABLED", "false",
+                "OTEL_METRIC_EXPORT_INTERVAL", "1000",
+                "OTEL_METRICS_EXPORTER", exporter
+        );
+        var props = new HashMap<>(defaultProps);
+        Arrays.stream(extraProps).forEach(p -> props.put(p.getKey(), 
p.getValue()));
+        return props;
+    }
+}
diff --git 
a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/topologies/PulsarCluster.java
 
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/topologies/PulsarCluster.java
index bc9b1e267b9..5f893f67f74 100644
--- 
a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/topologies/PulsarCluster.java
+++ 
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/topologies/PulsarCluster.java
@@ -102,6 +102,8 @@ public class PulsarCluster {
     private final ProxyContainer proxyContainer;
     private Map<String, GenericContainer<?>> externalServices = 
Collections.emptyMap();
     private Map<String, Map<String, String>> externalServiceEnvs;
+    private final Map<String, String> functionWorkerEnvs;
+    private final List<Integer> functionWorkerAdditionalPorts;
 
     private final String metadataStoreUrl;
     private final String configurationMetadataStoreUrl;
@@ -182,6 +184,9 @@ public class PulsarCluster {
         if (spec.proxyMountFiles != null) {
             
spec.proxyMountFiles.forEach(this.proxyContainer::withFileSystemBind);
         }
+        if (spec.proxyAdditionalPorts != null) {
+            
spec.proxyAdditionalPorts.forEach(this.proxyContainer::addExposedPort);
+        }
 
         // create bookies
         bookieContainers.putAll(
@@ -268,6 +273,8 @@ public class PulsarCluster {
             workerContainers.values().forEach(c -> 
c.withClasspathResourceMapping(key, value, BindMode.READ_WRITE));
         });
 
+        functionWorkerEnvs = spec.functionWorkerEnvs;
+        functionWorkerAdditionalPorts = spec.functionWorkerAdditionalPorts;
     }
 
     public String getPlainTextServiceUrl() {
@@ -475,23 +482,25 @@ public class PulsarCluster {
         String serviceUrl = "pulsar://pulsar-broker-0:" + 
PulsarContainer.BROKER_PORT;
         String httpServiceUrl = "http://pulsar-broker-0:" + 
PulsarContainer.BROKER_HTTP_PORT;
         workerContainers.putAll(runNumContainers(
-                "functions-worker-process-" + suffix,
-                numFunctionWorkers,
-                (name) -> new WorkerContainer(clusterName, name)
-                        .withNetwork(network)
-                        .withNetworkAliases(name)
-                        // worker settings
-                        .withEnv("PF_workerId", name)
-                        .withEnv("PF_workerHostname", name)
-                        .withEnv("PF_workerPort", "" + 
PulsarContainer.BROKER_HTTP_PORT)
-                        .withEnv("PF_pulsarFunctionsCluster", clusterName)
-                        .withEnv("PF_pulsarServiceUrl", serviceUrl)
-                        .withEnv("PF_pulsarWebServiceUrl", httpServiceUrl)
-                        // script
-                        .withEnv("clusterName", clusterName)
-                        .withEnv("zookeeperServers", ZKContainer.NAME)
-                        // bookkeeper tools
-                        .withEnv("zkServers", ZKContainer.NAME)
+            "functions-worker-process-" + suffix,
+            numFunctionWorkers,
+            (name) -> new WorkerContainer(clusterName, name)
+                .withNetwork(network)
+                .withNetworkAliases(name)
+                // worker settings
+                .withEnv("PF_workerId", name)
+                .withEnv("PF_workerHostname", name)
+                .withEnv("PF_workerPort", "" + 
PulsarContainer.BROKER_HTTP_PORT)
+                .withEnv("PF_pulsarFunctionsCluster", clusterName)
+                .withEnv("PF_pulsarServiceUrl", serviceUrl)
+                .withEnv("PF_pulsarWebServiceUrl", httpServiceUrl)
+                // script
+                .withEnv("clusterName", clusterName)
+                .withEnv("zookeeperServers", ZKContainer.NAME)
+                // bookkeeper tools
+                .withEnv("zkServers", ZKContainer.NAME)
+                .withEnv(functionWorkerEnvs)
+                .withExposedPorts(functionWorkerAdditionalPorts.toArray(new 
Integer[0]))
         ));
         this.startWorkers();
     }
diff --git 
a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/topologies/PulsarClusterSpec.java
 
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/topologies/PulsarClusterSpec.java
index b705b347cff..8a991be49fa 100644
--- 
a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/topologies/PulsarClusterSpec.java
+++ 
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/topologies/PulsarClusterSpec.java
@@ -147,6 +147,12 @@ public class PulsarClusterSpec {
      */
     Map<String, String> bookkeeperEnvs;
 
+    /**
+     * Specify envs for function workers.
+     */
+    @Singular
+    Map<String, String> functionWorkerEnvs;
+
     /**
      * Specify mount files.
      */
@@ -170,6 +176,17 @@ public class PulsarClusterSpec {
      */
     List<Integer> bookieAdditionalPorts;
 
+    /**
+     * Additional ports to expose on proxy containers.
+     */
+    List<Integer> proxyAdditionalPorts;
+
+    /**
+     * Additional ports to expose on function workers.
+     */
+    @Singular
+    List<Integer> functionWorkerAdditionalPorts;
+
     /**
      * Enable TLS for connection.
      */
diff --git 
a/tests/integration/src/test/resources/containers/otel-collector-config.yaml 
b/tests/integration/src/test/resources/containers/otel-collector-config.yaml
new file mode 100644
index 00000000000..bd332f04283
--- /dev/null
+++ b/tests/integration/src/test/resources/containers/otel-collector-config.yaml
@@ -0,0 +1,43 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+receivers:
+  otlp:
+    protocols:
+      grpc:
+
+exporters:
+  prometheus:
+    endpoint: "0.0.0.0:8889"
+
+processors:
+  batch:
+
+extensions:
+  health_check:
+  zpages:
+    endpoint: :55679
+
+service:
+  extensions: [zpages, health_check]
+  pipelines:
+    metrics:
+      receivers: [otlp]
+      processors: [batch]
+      exporters: [prometheus]
\ No newline at end of file
diff --git a/tests/integration/src/test/resources/pulsar-metrics.xml 
b/tests/integration/src/test/resources/pulsar-metrics.xml
new file mode 100644
index 00000000000..1c87f2bdf0d
--- /dev/null
+++ b/tests/integration/src/test/resources/pulsar-metrics.xml
@@ -0,0 +1,28 @@
+<!--
+
+    Licensed to the Apache Software Foundation (ASF) under one
+    or more contributor license agreements.  See the NOTICE file
+    distributed with this work for additional information
+    regarding copyright ownership.  The ASF licenses this file
+    to you under the Apache License, Version 2.0 (the
+    "License"); you may not use this file except in compliance
+    with the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing,
+    software distributed under the License is distributed on an
+    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+    KIND, either express or implied.  See the License for the
+    specific language governing permissions and limitations
+    under the License.
+
+-->
+<!DOCTYPE suite SYSTEM "https://testng.org/testng-1.0.dtd" >
+<suite name="Pulsar Metrics Integration Tests" verbose="2" annotations="JDK">
+  <test name="metrics-test-suite" preserve-order="true">
+    <classes>
+      <class 
name="org.apache.pulsar.tests.integration.metrics.OpenTelemetrySanityTest"/>
+    </classes>
+  </test>
+</suite>
diff --git a/tests/integration/src/test/resources/pulsar.xml 
b/tests/integration/src/test/resources/pulsar.xml
index bdc5f27cc78..aa9a59a6cda 100644
--- a/tests/integration/src/test/resources/pulsar.xml
+++ b/tests/integration/src/test/resources/pulsar.xml
@@ -37,5 +37,6 @@
         <suite-file path="./pulsar-python.xml" />
         <suite-file path="./pulsar-semantics.xml" />
         <suite-file path="./pulsar-upgrade.xml" />
+        <suite-file path="./pulsar-metrics.xml" />
     </suite-files>
 </suite>

Reply via email to