This is an automated email from the ASF dual-hosted git repository.

mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new f323342a4aa [feat][broker] PIP-264: Add OpenTelemetry broker replicator metrics (#22972)
f323342a4aa is described below

commit f323342a4aa158ac72a9a3dc3cc67b8c2c5fd986
Author: Dragos Misca <dragosvic...@users.noreply.github.com>
AuthorDate: Tue Jun 25 07:34:59 2024 -0700

    [feat][broker] PIP-264: Add OpenTelemetry broker replicator metrics (#22972)
---
 .../org/apache/pulsar/broker/PulsarService.java    |   7 +
 .../pulsar/broker/service/AbstractReplicator.java  |  40 +++++
 .../pulsar/broker/service/AbstractTopic.java       |   8 +
 .../apache/pulsar/broker/service/Replicator.java   |   6 +-
 .../nonpersistent/NonPersistentReplicator.java     |  25 ++--
 .../service/nonpersistent/NonPersistentTopic.java  |   2 +-
 .../persistent/GeoPersistentReplicator.java        |   2 +
 .../service/persistent/PersistentReplicator.java   |  48 +++---
 .../broker/service/persistent/PersistentTopic.java |   4 +-
 .../service/persistent/ShadowReplicator.java       |   2 +
 .../broker/stats/OpenTelemetryReplicatorStats.java | 166 +++++++++++++++++++++
 .../stats/prometheus/NamespaceStatsAggregator.java |   2 +-
 .../broker/service/AbstractReplicatorTest.java     |   5 +
 .../pulsar/broker/service/ReplicatorTest.java      | 107 ++++++++++++-
 .../pulsar/broker/service/ReplicatorTestBase.java  |  56 +++++--
 .../broker/stats/BrokerOpenTelemetryTestUtil.java  |  18 +++
 .../pulsar/client/api/BrokerServiceLookupTest.java |   1 +
 .../data/NonPersistentReplicatorStats.java         |   3 +
 .../common/policies/data/ReplicatorStats.java      |  20 +++
 .../stats/NonPersistentReplicatorStatsImpl.java    |  24 ++-
 .../policies/data/stats/ReplicatorStatsImpl.java   |  62 +++++++-
 .../opentelemetry/OpenTelemetryAttributes.java     |   6 +
 22 files changed, 539 insertions(+), 75 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
index 65dd90f7a12..8cf1376642b 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
@@ -113,6 +113,7 @@ import org.apache.pulsar.broker.service.schema.SchemaStorageFactory;
 import org.apache.pulsar.broker.stats.MetricsGenerator;
 import org.apache.pulsar.broker.stats.OpenTelemetryConsumerStats;
 import org.apache.pulsar.broker.stats.OpenTelemetryProducerStats;
+import org.apache.pulsar.broker.stats.OpenTelemetryReplicatorStats;
 import org.apache.pulsar.broker.stats.OpenTelemetryTopicStats;
 import org.apache.pulsar.broker.stats.PulsarBrokerOpenTelemetry;
 import org.apache.pulsar.broker.stats.prometheus.PrometheusMetricsServlet;
@@ -260,6 +261,7 @@ public class PulsarService implements AutoCloseable, ShutdownService {
     private OpenTelemetryTopicStats openTelemetryTopicStats;
     private OpenTelemetryConsumerStats openTelemetryConsumerStats;
     private OpenTelemetryProducerStats openTelemetryProducerStats;
+    private OpenTelemetryReplicatorStats openTelemetryReplicatorStats;
 
     private TransactionMetadataStoreService transactionMetadataStoreService;
     private TransactionBufferProvider transactionBufferProvider;
@@ -678,6 +680,10 @@ public class PulsarService implements AutoCloseable, ShutdownService {
             brokerClientSharedTimer.stop();
             monotonicSnapshotClock.close();
 
+            if (openTelemetryReplicatorStats != null) {
+                openTelemetryReplicatorStats.close();
+                openTelemetryReplicatorStats = null;
+            }
             if (openTelemetryProducerStats != null) {
                 openTelemetryProducerStats.close();
                 openTelemetryProducerStats = null;
@@ -834,6 +840,7 @@ public class PulsarService implements AutoCloseable, ShutdownService {
             openTelemetryTopicStats = new OpenTelemetryTopicStats(this);
             openTelemetryConsumerStats = new OpenTelemetryConsumerStats(this);
             openTelemetryProducerStats = new OpenTelemetryProducerStats(this);
+            openTelemetryReplicatorStats = new OpenTelemetryReplicatorStats(this);
 
             localMetadataSynchronizer = StringUtils.isNotBlank(config.getMetadataSyncEventTopic())
                     ? new PulsarMetadataEventSynchronizer(this, config.getMetadataSyncEventTopic())
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractReplicator.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractReplicator.java
index 869a4bc81d3..8552a9f09e9 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractReplicator.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractReplicator.java
@@ -19,6 +19,7 @@
 package org.apache.pulsar.broker.service;
 
 import com.google.common.annotations.VisibleForTesting;
+import io.opentelemetry.api.common.Attributes;
 import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.TimeUnit;
@@ -42,6 +43,7 @@ import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.common.util.Backoff;
 import org.apache.pulsar.common.util.FutureUtil;
 import org.apache.pulsar.common.util.StringInterner;
+import org.apache.pulsar.opentelemetry.OpenTelemetryAttributes;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -55,6 +57,7 @@ public abstract class AbstractReplicator implements Replicator {
     protected final PulsarClientImpl replicationClient;
     protected final PulsarClientImpl client;
     protected String replicatorId;
+    @Getter
     protected final Topic localTopic;
 
     protected volatile ProducerImpl producer;
@@ -74,6 +77,10 @@ public abstract class AbstractReplicator implements Replicator {
     @Getter
     protected volatile State state = State.Disconnected;
 
+    private volatile Attributes attributes = null;
+    private static final AtomicReferenceFieldUpdater<AbstractReplicator, Attributes> ATTRIBUTES_UPDATER =
+            AtomicReferenceFieldUpdater.newUpdater(AbstractReplicator.class, Attributes.class, "attributes");
+
     public enum State {
         /**
          * This enum has two mean meanings:
@@ -136,6 +143,17 @@ public abstract class AbstractReplicator implements Replicator {
 
     protected abstract void disableReplicatorRead();
 
+    @Override
+    public boolean isConnected() {
+        var producer = this.producer;
+        return producer != null && producer.isConnected();
+    }
+
+    public long getReplicationDelayMs() {
+        var producer = this.producer;
+        return producer == null ? 0 : producer.getDelayInMillis();
+    }
+
     public String getRemoteCluster() {
         return remoteCluster;
     }
@@ -476,4 +494,26 @@ public abstract class AbstractReplicator implements Replicator {
     public boolean isTerminated() {
         return state == State.Terminating || state == State.Terminated;
     }
+
+    public Attributes getAttributes() {
+        if (attributes != null) {
+            return attributes;
+        }
+        return ATTRIBUTES_UPDATER.updateAndGet(this, old -> {
+            if (old != null) {
+                return old;
+            }
+            var topicName = TopicName.get(getLocalTopic().getName());
+            var builder = Attributes.builder()
+                    .put(OpenTelemetryAttributes.PULSAR_DOMAIN, topicName.getDomain().toString())
+                    .put(OpenTelemetryAttributes.PULSAR_TENANT, topicName.getTenant())
+                    .put(OpenTelemetryAttributes.PULSAR_NAMESPACE, topicName.getNamespace())
+                    .put(OpenTelemetryAttributes.PULSAR_TOPIC, topicName.getPartitionedTopicName());
+            if (topicName.isPartitioned()) {
+                builder.put(OpenTelemetryAttributes.PULSAR_PARTITION_INDEX, topicName.getPartitionIndex());
+            }
+            builder.put(OpenTelemetryAttributes.PULSAR_REPLICATION_REMOTE_CLUSTER_NAME, getRemoteCluster());
+            return builder.build();
+        });
+    }
 }
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java
index 572b54e0d3e..fbf11f1d0ad 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java
@@ -933,6 +933,14 @@ public abstract class AbstractTopic implements Topic, TopicPolicyListener<TopicP
         if (isSystemTopic()) {
             systemTopicBytesInCounter.add(msgSizeInBytes);
         }
+
+        if (producer.isRemote()) {
+            var remoteClusterName = producer.getRemoteCluster();
+            var replicator = getReplicators().get(remoteClusterName);
+            if (replicator != null) {
+                replicator.getStats().incrementPublishCount(numOfMessages, msgSizeInBytes);
+            }
+        }
     }
 
     private void handlePublishThrottling(Producer producer, int numOfMessages, long msgSizeInBytes) {
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Replicator.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Replicator.java
index 5c314397da8..667063e4910 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Replicator.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Replicator.java
@@ -27,7 +27,9 @@ public interface Replicator {
 
     void startProducer();
 
-    ReplicatorStatsImpl getStats();
+    Topic getLocalTopic();
+
+    ReplicatorStatsImpl computeStats();
 
     CompletableFuture<Void> terminate();
 
@@ -53,4 +55,6 @@ public interface Replicator {
     long getNumberOfEntriesInBacklog();
 
     boolean isTerminated();
+
+    ReplicatorStatsImpl getStats();
 }
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentReplicator.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentReplicator.java
index 51509f3818a..6441230fad8 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentReplicator.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentReplicator.java
@@ -116,6 +116,8 @@ public class NonPersistentReplicator extends AbstractReplicator implements Repli
             }
 
             msgOut.recordEvent(headersAndPayload.readableBytes());
+            stats.incrementMsgOutCounter();
+            stats.incrementBytesOutCounter(headersAndPayload.readableBytes());
 
             msg.setReplicatedFrom(localCluster);
 
@@ -129,6 +131,7 @@ public class NonPersistentReplicator extends AbstractReplicator implements Repli
                         replicatorId);
             }
             msgDrop.recordEvent();
+            stats.incrementMsgDropCount();
             entry.release();
         }
     }
@@ -143,11 +146,11 @@ public class NonPersistentReplicator extends AbstractReplicator implements Repli
     }
 
     @Override
-    public NonPersistentReplicatorStatsImpl getStats() {
-        stats.connected = producer != null && producer.isConnected();
-        stats.replicationDelayInSeconds = getReplicationDelayInSeconds();
-
+    public NonPersistentReplicatorStatsImpl computeStats() {
         ProducerImpl producer = this.producer;
+        stats.connected = isConnected();
+        stats.replicationDelayInSeconds = TimeUnit.MILLISECONDS.toSeconds(getReplicationDelayMs());
+
         if (producer != null) {
             stats.outboundConnection = producer.getConnectionId();
             stats.outboundConnectedSince = producer.getConnectedSince();
@@ -159,11 +162,9 @@ public class NonPersistentReplicator extends AbstractReplicator implements Repli
         return stats;
     }
 
-    private long getReplicationDelayInSeconds() {
-        if (producer != null) {
-            return TimeUnit.MILLISECONDS.toSeconds(producer.getDelayInMillis());
-        }
-        return 0L;
+    @Override
+    public NonPersistentReplicatorStatsImpl getStats() {
+        return stats;
     }
 
     private static final class ProducerSendCallback implements SendCallback {
@@ -256,10 +257,4 @@ public class NonPersistentReplicator extends AbstractReplicator implements Repli
     protected void disableReplicatorRead() {
         // No-op
     }
-
-    @Override
-    public boolean isConnected() {
-        ProducerImpl<?> producer = this.producer;
-        return producer != null && producer.isConnected();
-    }
 }
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java
index a6f65f6da32..0c6ebdfefa0 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java
@@ -961,7 +961,7 @@ public class NonPersistentTopic extends AbstractTopic implements Topic, TopicPol
         });
 
         replicators.forEach((cluster, replicator) -> {
-            NonPersistentReplicatorStatsImpl replicatorStats = replicator.getStats();
+            NonPersistentReplicatorStatsImpl replicatorStats = replicator.computeStats();
 
             // Add incoming msg rates
             PublisherStatsImpl pubStats = remotePublishersStats.get(replicator.getRemoteCluster());
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/GeoPersistentReplicator.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/GeoPersistentReplicator.java
index 1314b2d2ed0..1d9df2bcccd 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/GeoPersistentReplicator.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/GeoPersistentReplicator.java
@@ -166,6 +166,8 @@ public class GeoPersistentReplicator extends PersistentReplicator {
                     msg.getMessageBuilder().clearTxnidMostBits();
                     msg.getMessageBuilder().clearTxnidLeastBits();
                     msgOut.recordEvent(headersAndPayload.readableBytes());
+                    stats.incrementMsgOutCounter();
+                    stats.incrementBytesOutCounter(headersAndPayload.readableBytes());
                     // Increment pending messages for messages produced locally
                     PENDING_MESSAGES_UPDATER.incrementAndGet(this);
                     producer.sendAsync(msg, ProducerSendCallback.create(this, entry, msg));
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentReplicator.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentReplicator.java
index 6263c512997..aa53a93da5c 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentReplicator.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentReplicator.java
@@ -28,11 +28,13 @@ import io.netty.buffer.ByteBuf;
 import io.netty.util.Recycler;
 import io.netty.util.Recycler.Handle;
 import java.util.List;
+import java.util.Objects;
 import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
+import lombok.Getter;
 import org.apache.bookkeeper.mledger.AsyncCallbacks;
 import org.apache.bookkeeper.mledger.AsyncCallbacks.ClearBacklogCallback;
 import org.apache.bookkeeper.mledger.AsyncCallbacks.DeleteCallback;
@@ -107,7 +109,8 @@ public abstract class PersistentReplicator extends AbstractReplicator
     // for connected subscriptions, message expiry will be checked if the backlog is greater than this threshold
     private static final int MINIMUM_BACKLOG_FOR_EXPIRY_CHECK = 1000;
 
-    private final ReplicatorStatsImpl stats = new ReplicatorStatsImpl();
+    @Getter
+    protected final ReplicatorStatsImpl stats = new ReplicatorStatsImpl();
 
     protected volatile boolean fetchSchemaInProgress = false;
 
@@ -118,7 +121,7 @@ public abstract class PersistentReplicator extends AbstractReplicator
         super(localCluster, localTopic, remoteCluster, remoteTopic, localTopic.getReplicatorPrefix(),
                 brokerService, replicationClient);
         this.topic = localTopic;
-        this.cursor = cursor;
+        this.cursor = Objects.requireNonNull(cursor);
         this.expiryMonitor = new PersistentMessageExpiryMonitor(localTopic,
                 Codec.decode(cursor.getName()), cursor, null);
         HAVE_PENDING_READ_UPDATER.set(this, FALSE);
@@ -186,12 +189,14 @@ public abstract class PersistentReplicator extends AbstractReplicator
         return cursor.getNumberOfEntriesInBacklog(true);
     }
 
+    public long getMessageExpiredCount() {
+        return expiryMonitor.getTotalMessageExpired();
+    }
+
     @Override
     protected void disableReplicatorRead() {
-        if (this.cursor != null) {
-            // deactivate cursor after successfully close the producer
-            this.cursor.setInactive();
-        }
+        // deactivate cursor after successfully close the producer
+        this.cursor.setInactive();
     }
 
     /**
@@ -330,12 +335,10 @@ public abstract class PersistentReplicator extends AbstractReplicator
     }
 
     public void updateCursorState() {
-        if (this.cursor != null) {
-            if (producer != null && producer.isConnected()) {
-                this.cursor.setActive();
-            } else {
-                this.cursor.setInactive();
-            }
+        if (isConnected()) {
+            cursor.setActive();
+        } else {
+            cursor.setInactive();
         }
     }
 
@@ -595,10 +598,10 @@ public abstract class PersistentReplicator extends AbstractReplicator
         stats.msgRateExpired = msgExpired.getRate() + expiryMonitor.getMessageExpiryRate();
     }
 
-    public ReplicatorStatsImpl getStats() {
-        stats.replicationBacklog = cursor != null ? cursor.getNumberOfEntriesInBacklog(false) : 0;
-        stats.connected = producer != null && producer.isConnected();
-        stats.replicationDelayInSeconds = getReplicationDelayInSeconds();
+    public ReplicatorStatsImpl computeStats() {
+        stats.replicationBacklog = cursor.getNumberOfEntriesInBacklog(false);
+        stats.connected = isConnected();
+        stats.replicationDelayInSeconds = TimeUnit.MILLISECONDS.toSeconds(getReplicationDelayMs());
 
         ProducerImpl producer = this.producer;
         if (producer != null) {
@@ -616,13 +619,6 @@ public abstract class PersistentReplicator extends AbstractReplicator
         this.messageTTLInSeconds = messageTTLInSeconds;
     }
 
-    private long getReplicationDelayInSeconds() {
-        if (producer != null) {
-            return TimeUnit.MILLISECONDS.toSeconds(producer.getDelayInMillis());
-        }
-        return 0L;
-    }
-
     @Override
     public boolean expireMessages(int messageTTLInSeconds) {
         if ((cursor.getNumberOfEntriesInBacklog(false) == 0)
@@ -691,12 +687,6 @@ public abstract class PersistentReplicator extends AbstractReplicator
         }
     }
 
-    @Override
-    public boolean isConnected() {
-        ProducerImpl<?> producer = this.producer;
-        return producer != null && producer.isConnected();
-    }
-
     @Override
     protected void doReleaseResources() {
         dispatchRateLimiter.ifPresent(DispatchRateLimiter::close);
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
index 1983fa3c383..6e3d49fbe9f 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
@@ -2353,7 +2353,7 @@ public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCal
             }
 
             // Update replicator stats
-            ReplicatorStatsImpl rStat = replicator.getStats();
+            ReplicatorStatsImpl rStat = replicator.computeStats();
 
             // Add incoming msg rates
             PublisherStatsImpl pubStats = topicStatsHelper.remotePublishersStats.get(replicator.getRemoteCluster());
@@ -2636,7 +2636,7 @@ public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCal
         });
 
         replicators.forEach((cluster, replicator) -> {
-            ReplicatorStatsImpl replicatorStats = replicator.getStats();
+            ReplicatorStatsImpl replicatorStats = replicator.computeStats();
 
             // Add incoming msg rates
             PublisherStatsImpl pubStats = remotePublishersStats.get(replicator.getRemoteCluster());
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/ShadowReplicator.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/ShadowReplicator.java
index 85e837ff187..25591857aa1 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/ShadowReplicator.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/ShadowReplicator.java
@@ -92,6 +92,8 @@ public class ShadowReplicator extends PersistentReplicator {
                 dispatchRateLimiter.ifPresent(rateLimiter -> rateLimiter.consumeDispatchQuota(1, entry.getLength()));
 
                 msgOut.recordEvent(headersAndPayload.readableBytes());
+                stats.incrementMsgOutCounter();
+                stats.incrementBytesOutCounter(headersAndPayload.readableBytes());
 
                 msg.setReplicatedFrom(localCluster);
 
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/OpenTelemetryReplicatorStats.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/OpenTelemetryReplicatorStats.java
new file mode 100644
index 00000000000..04bc805a64b
--- /dev/null
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/OpenTelemetryReplicatorStats.java
@@ -0,0 +1,166 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.stats;
+
+import io.opentelemetry.api.metrics.BatchCallback;
+import io.opentelemetry.api.metrics.ObservableDoubleMeasurement;
+import io.opentelemetry.api.metrics.ObservableLongMeasurement;
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
+import org.apache.pulsar.broker.PulsarService;
+import org.apache.pulsar.broker.service.AbstractReplicator;
+import org.apache.pulsar.broker.service.nonpersistent.NonPersistentReplicator;
+import org.apache.pulsar.broker.service.persistent.PersistentReplicator;
+import org.apache.pulsar.common.stats.MetricsUtil;
+
+public class OpenTelemetryReplicatorStats implements AutoCloseable {
+
+    // Replaces pulsar_replication_rate_in
+    public static final String MESSAGE_IN_COUNTER = "pulsar.broker.replication.message.incoming.count";
+    private final ObservableLongMeasurement messageInCounter;
+
+    // Replaces pulsar_replication_rate_out
+    public static final String MESSAGE_OUT_COUNTER = "pulsar.broker.replication.message.outgoing.count";
+    private final ObservableLongMeasurement messageOutCounter;
+
+    // Replaces pulsar_replication_throughput_in
+    public static final String BYTES_IN_COUNTER = "pulsar.broker.replication.message.incoming.size";
+    private final ObservableLongMeasurement bytesInCounter;
+
+    // Replaces pulsar_replication_throughput_out
+    public static final String BYTES_OUT_COUNTER = "pulsar.broker.replication.message.outgoing.size";
+    private final ObservableLongMeasurement bytesOutCounter;
+
+    // Replaces pulsar_replication_backlog
+    public static final String BACKLOG_COUNTER = "pulsar.broker.replication.message.backlog.count";
+    private final ObservableLongMeasurement backlogCounter;
+
+    // Replaces pulsar_replication_delay_in_seconds
+    public static final String DELAY_GAUGE = "pulsar.broker.replication.message.backlog.age";
+    private final ObservableDoubleMeasurement delayGauge;
+
+    // Replaces pulsar_replication_rate_expired
+    public static final String EXPIRED_COUNTER = "pulsar.broker.replication.message.expired.count";
+    private final ObservableLongMeasurement expiredCounter;
+
+    public static final String DROPPED_COUNTER = "pulsar.broker.replication.message.dropped.count";
+    private final ObservableLongMeasurement droppedCounter;
+
+    private final BatchCallback batchCallback;
+
+    public OpenTelemetryReplicatorStats(PulsarService pulsar) {
+        var meter = pulsar.getOpenTelemetry().getMeter();
+
+        messageInCounter = meter
+                .upDownCounterBuilder(MESSAGE_IN_COUNTER)
+                .setUnit("{message}")
+                .setDescription(
+                        "The total number of messages received from the remote cluster through this replicator.")
+                .buildObserver();
+
+        messageOutCounter = meter
+                .upDownCounterBuilder(MESSAGE_OUT_COUNTER)
+                .setUnit("{message}")
+                .setDescription("The total number of messages sent to the remote cluster through this replicator.")
+                .buildObserver();
+
+        bytesInCounter = meter
+                .upDownCounterBuilder(BYTES_IN_COUNTER)
+                .setUnit("{By}")
+                .setDescription(
+                        "The total number of messages bytes received from the remote cluster through this replicator.")
+                .buildObserver();
+
+        bytesOutCounter = meter
+                .upDownCounterBuilder(BYTES_OUT_COUNTER)
+                .setUnit("{By}")
+                .setDescription(
+                        "The total number of messages bytes sent to the remote cluster through this replicator.")
+                .buildObserver();
+
+        backlogCounter = meter
+                .upDownCounterBuilder(BACKLOG_COUNTER)
+                .setUnit("{message}")
+                .setDescription("The total number of messages in the backlog for this replicator.")
+                .buildObserver();
+
+        delayGauge = meter
+                .gaugeBuilder(DELAY_GAUGE)
+                .setUnit("s")
+                .setDescription("The age of the oldest message in the replicator backlog.")
+                .buildObserver();
+
+        expiredCounter = meter
+                .upDownCounterBuilder(EXPIRED_COUNTER)
+                .setUnit("{message}")
+                .setDescription("The total number of messages that expired for this replicator.")
+                .buildObserver();
+
+        droppedCounter = meter
+                .upDownCounterBuilder(DROPPED_COUNTER)
+                .setUnit("{message}")
+                .setDescription("The total number of messages dropped by this replicator.")
+                .buildObserver();
+
+        batchCallback = meter.batchCallback(() -> pulsar.getBrokerService()
+                        .getTopics()
+                        .values()
+                        .stream()
+                        .filter(topicFuture -> topicFuture.isDone() && !topicFuture.isCompletedExceptionally())
+                        .map(CompletableFuture::join)
+                        .filter(Optional::isPresent)
+                        .map(Optional::get)
+                        .flatMap(topic -> topic.getReplicators().values().stream())
+                        .map(AbstractReplicator.class::cast)
+                        .forEach(this::recordMetricsForReplicator),
+                messageInCounter,
+                messageOutCounter,
+                bytesInCounter,
+                bytesOutCounter,
+                backlogCounter,
+                delayGauge,
+                expiredCounter,
+                droppedCounter);
+    }
+
+    @Override
+    public void close() {
+        batchCallback.close();
+    }
+
+    private void recordMetricsForReplicator(AbstractReplicator replicator) {
+        var attributes = replicator.getAttributes();
+        var stats = replicator.getStats();
+
+        messageInCounter.record(stats.getMsgInCount(), attributes);
+        messageOutCounter.record(stats.getMsgOutCount(), attributes);
+        bytesInCounter.record(stats.getBytesInCount(), attributes);
+        bytesOutCounter.record(stats.getBytesOutCount(), attributes);
+        var delaySeconds = MetricsUtil.convertToSeconds(replicator.getReplicationDelayMs(), TimeUnit.MILLISECONDS);
+        delayGauge.record(delaySeconds, attributes);
+
+        if (replicator instanceof PersistentReplicator persistentReplicator) {
+            expiredCounter.record(persistentReplicator.getMessageExpiredCount(), attributes);
+            backlogCounter.record(persistentReplicator.getNumberOfEntriesInBacklog(), attributes);
+        } else if (replicator instanceof NonPersistentReplicator nonPersistentReplicator) {
+            droppedCounter.record(nonPersistentReplicator.getStats().getMsgDropCount(), attributes);
+        }
+    }
+}
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/NamespaceStatsAggregator.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/NamespaceStatsAggregator.java
index 3bbc9100b36..a229ef54c79 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/NamespaceStatsAggregator.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/NamespaceStatsAggregator.java
@@ -290,7 +290,7 @@ public class NamespaceStatsAggregator {
         }
 
         topic.getReplicators().forEach((cluster, replicator) -> {
-            ReplicatorStatsImpl replStats = replicator.getStats();
+            ReplicatorStatsImpl replStats = replicator.computeStats();
             AggregatedReplicationStats aggReplStats = stats.replicationStats.get(replicator.getRemoteCluster());
             if (aggReplStats == null) {
                 aggReplStats = new AggregatedReplicationStats();
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/AbstractReplicatorTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/AbstractReplicatorTest.java
index d20f5f0d520..64d3088b206 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/AbstractReplicatorTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/AbstractReplicatorTest.java
@@ -139,6 +139,11 @@ public class AbstractReplicatorTest {
             return PositionFactory.EARLIEST;
         }
 
+        // Test stub: computes no statistics and returns null, like the getStats() override of this class.
+        @Override
+        public ReplicatorStatsImpl computeStats() {
+            return null;
+        }
+
         @Override
         public ReplicatorStatsImpl getStats() {
             return null;
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java
index d83b2ed4ee6..1c47abab775 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java
@@ -18,11 +18,15 @@
  */
 package org.apache.pulsar.broker.service;
 
+import static 
io.opentelemetry.sdk.testing.assertj.OpenTelemetryAssertions.assertThat;
 import static org.apache.pulsar.broker.BrokerTestUtil.newUniqueName;
+import static 
org.apache.pulsar.broker.stats.BrokerOpenTelemetryTestUtil.assertMetricDoubleGaugeValue;
+import static 
org.apache.pulsar.broker.stats.BrokerOpenTelemetryTestUtil.assertMetricLongSumValue;
 import static org.mockito.ArgumentMatchers.eq;
 import static org.mockito.Mockito.doAnswer;
 import static org.mockito.Mockito.spy;
 import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertFalse;
 import static org.testng.Assert.assertNotNull;
 import static org.testng.Assert.assertNull;
 import static org.testng.Assert.assertTrue;
@@ -31,6 +35,7 @@ import com.google.common.collect.Lists;
 import com.google.common.collect.Sets;
 import com.scurrilous.circe.checksum.Crc32cIntChecksum;
 import io.netty.buffer.ByteBuf;
+import io.opentelemetry.api.common.Attributes;
 import java.lang.reflect.Field;
 import java.lang.reflect.Method;
 import java.nio.charset.StandardCharsets;
@@ -58,10 +63,10 @@ import org.apache.bookkeeper.mledger.ManagedCursor;
 import org.apache.bookkeeper.mledger.ManagedLedgerException;
 import 
org.apache.bookkeeper.mledger.ManagedLedgerException.CursorAlreadyClosedException;
 import org.apache.bookkeeper.mledger.Position;
+import org.apache.bookkeeper.mledger.PositionFactory;
 import org.apache.bookkeeper.mledger.impl.ManagedCursorImpl;
 import org.apache.bookkeeper.mledger.impl.ManagedLedgerFactoryImpl;
 import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
-import org.apache.bookkeeper.mledger.PositionFactory;
 import org.apache.commons.lang3.RandomUtils;
 import org.apache.pulsar.broker.BrokerTestUtil;
 import org.apache.pulsar.broker.PulsarService;
@@ -69,6 +74,7 @@ import 
org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
 import org.apache.pulsar.broker.service.BrokerServiceException.NamingException;
 import org.apache.pulsar.broker.service.persistent.PersistentReplicator;
 import org.apache.pulsar.broker.service.persistent.PersistentTopic;
+import org.apache.pulsar.broker.stats.OpenTelemetryReplicatorStats;
 import org.apache.pulsar.client.admin.PulsarAdmin;
 import org.apache.pulsar.client.admin.PulsarAdminException;
 import org.apache.pulsar.client.api.Consumer;
@@ -104,6 +110,7 @@ import 
org.apache.pulsar.common.policies.data.SchemaCompatibilityStrategy;
 import org.apache.pulsar.common.policies.data.TopicStats;
 import org.apache.pulsar.common.protocol.Commands;
 import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap;
+import org.apache.pulsar.opentelemetry.OpenTelemetryAttributes;
 import org.apache.pulsar.schema.Schemas;
 import org.awaitility.Awaitility;
 import org.awaitility.reflect.WhiteboxImpl;
@@ -667,7 +674,7 @@ public class ReplicatorTest extends ReplicatorTestBase {
         Thread.sleep(100);
         replicator.updateRates(); // for code-coverage
         replicator.expireMessages(1); // for code-coverage
-        ReplicatorStats status = replicator.getStats();
+        ReplicatorStats status = replicator.computeStats();
         assertEquals(status.getReplicationBacklog(), 0);
     }
 
@@ -697,7 +704,7 @@ public class ReplicatorTest extends ReplicatorTestBase {
 
         replicator.updateRates();
 
-        ReplicatorStats status = replicator.getStats();
+        ReplicatorStats status = replicator.computeStats();
         assertEquals(status.getReplicationBacklog(), 0);
     }
 
@@ -997,14 +1004,28 @@ public class ReplicatorTest extends ReplicatorTestBase {
 
             Thread.sleep((TIME_TO_CHECK_BACKLOG_QUOTA + 1) * 1000);
 
-            assertEquals(replicator.getStats().replicationBacklog, 0);
+            assertEquals(replicator.computeStats().replicationBacklog, 0);
+            var attributes = Attributes.of(
+                    OpenTelemetryAttributes.PULSAR_DOMAIN, 
dest.getDomain().value(),
+                    OpenTelemetryAttributes.PULSAR_TENANT, dest.getTenant(),
+                    OpenTelemetryAttributes.PULSAR_NAMESPACE, 
dest.getNamespace(),
+                    OpenTelemetryAttributes.PULSAR_TOPIC, 
dest.getPartitionedTopicName(),
+                    
OpenTelemetryAttributes.PULSAR_REPLICATION_REMOTE_CLUSTER_NAME, cluster2
+            );
+            var metrics = metricReader1.collectAllMetrics();
+            assertMetricLongSumValue(metrics, 
OpenTelemetryReplicatorStats.BACKLOG_COUNTER, attributes, 0);
+            assertMetricDoubleGaugeValue(metrics, 
OpenTelemetryReplicatorStats.DELAY_GAUGE, attributes, 0.0);
 
             // Next message will not be replicated, because r2 has reached the 
quota
             producer1.produce(1);
 
             Thread.sleep(500);
 
-            assertEquals(replicator.getStats().replicationBacklog, 1);
+            assertEquals(replicator.computeStats().replicationBacklog, 1);
+            metrics = metricReader1.collectAllMetrics();
+            assertMetricLongSumValue(metrics, 
OpenTelemetryReplicatorStats.BACKLOG_COUNTER, attributes, 1);
+            assertMetricDoubleGaugeValue(metrics, 
OpenTelemetryReplicatorStats.DELAY_GAUGE, attributes,
+                    aDouble -> assertThat(aDouble).isPositive());
 
             // Consumer will now drain 1 message and the replication backlog 
will be cleared
             consumer2.receive(1);
@@ -1013,13 +1034,16 @@ public class ReplicatorTest extends ReplicatorTestBase {
             consumer2.receive(1);
 
             int retry = 10;
-            for (int i = 0; i < retry && 
replicator.getStats().replicationBacklog > 0; i++) {
+            for (int i = 0; i < retry && 
replicator.computeStats().replicationBacklog > 0; i++) {
                 if (i != retry - 1) {
                     Thread.sleep(100);
                 }
             }
 
-            assertEquals(replicator.getStats().replicationBacklog, 0);
+            assertEquals(replicator.computeStats().replicationBacklog, 0);
+            metrics = metricReader1.collectAllMetrics();
+            assertMetricLongSumValue(metrics, 
OpenTelemetryReplicatorStats.BACKLOG_COUNTER, attributes, 0);
+            assertMetricDoubleGaugeValue(metrics, 
OpenTelemetryReplicatorStats.DELAY_GAUGE, attributes, 0.0);
         }
     }
 
@@ -1813,6 +1837,72 @@ public class ReplicatorTest extends ReplicatorTestBase {
         assertEquals(result, Lists.newArrayList("V1", "V2", "V3", "V4"));
     }
 
+    /**
+     * Checks the OpenTelemetry replicator metrics on both ends of the cluster1 -> cluster2 replication link.
+     *
+     * <p>On cluster 1 it verifies the outbound message/byte counters and, after pausing the replicators and
+     * expiring the new backlog, the expired-message counter. On cluster 2 it verifies the inbound message/byte
+     * counters. Counter assertions are polled because replication happens asynchronously.
+     */
+    @Test
+    public void testReplicationMetrics() throws Exception {
+        var destTopicName = TopicName.get(BrokerTestUtil.newUniqueName("persistent://pulsar/ns/replicationMetrics"));
+
+        @Cleanup
+        var producer1 = new MessageProducer(url1, destTopicName);
+
+        @Cleanup
+        var consumer1 = new MessageConsumer(url1, destTopicName);
+
+        @Cleanup
+        var consumer2 = new MessageConsumer(url2, destTopicName);
+
+        // Produce from cluster 1 and consume from clusters 1 and 2.
+        producer1.produce(3);
+        consumer1.receive(2);
+        consumer2.receive(1);
+
+        {
+            // Validate replicator metrics on cluster 1 for the replicator towards cluster 2.
+            var attributes = Attributes.of(
+                    OpenTelemetryAttributes.PULSAR_DOMAIN, destTopicName.getDomain().value(),
+                    OpenTelemetryAttributes.PULSAR_TENANT, destTopicName.getTenant(),
+                    OpenTelemetryAttributes.PULSAR_NAMESPACE, destTopicName.getNamespace(),
+                    OpenTelemetryAttributes.PULSAR_TOPIC, destTopicName.getPartitionedTopicName(),
+                    OpenTelemetryAttributes.PULSAR_REPLICATION_REMOTE_CLUSTER_NAME, cluster2
+            );
+            // consumer2 was only asked for one of the three messages, so the replicator may still be sending
+            // the rest. Poll instead of asserting on a single snapshot to keep the test from being flaky.
+            Awaitility.await().untilAsserted(() -> {
+                var metrics = metricReader1.collectAllMetrics();
+                assertMetricLongSumValue(metrics, OpenTelemetryReplicatorStats.MESSAGE_OUT_COUNTER, attributes, 3);
+                assertMetricLongSumValue(metrics, OpenTelemetryReplicatorStats.BYTES_OUT_COUNTER, attributes,
+                        aLong -> assertThat(aLong).isPositive());
+            });
+
+            var topicOpt = pulsar1.getBrokerService().getTopicReference(destTopicName.toString());
+            assertThat(topicOpt).isPresent();
+            var topic = topicOpt.get();
+            var persistentReplicators = topic.getReplicators()
+                    .values()
+                    .stream()
+                    .map(PersistentReplicator.class::cast)
+                    .toList();
+            // pauseReplicator (defined elsewhere in this class) is expected to stop delivery, so the next
+            // 5 messages should remain in the replication backlog where they can be expired.
+            persistentReplicators.forEach(this::pauseReplicator);
+            producer1.produce(5);
+            Awaitility.await().untilAsserted(() -> {
+                // Re-run expiry on each attempt: expireMessages(1) presumably only expires entries older than
+                // 1 second, so early attempts may expire nothing.
+                persistentReplicators.forEach(repl -> repl.expireMessages(1));
+                assertMetricLongSumValue(metricReader1.collectAllMetrics(),
+                        OpenTelemetryReplicatorStats.EXPIRED_COUNTER,
+                        attributes, 5);
+            });
+        }
+
+        {
+            // Validate replicator metrics on cluster 2 for messages replicated from cluster 1.
+            var attributes = Attributes.of(
+                    OpenTelemetryAttributes.PULSAR_DOMAIN, destTopicName.getDomain().value(),
+                    OpenTelemetryAttributes.PULSAR_TENANT, destTopicName.getTenant(),
+                    OpenTelemetryAttributes.PULSAR_NAMESPACE, destTopicName.getNamespace(),
+                    OpenTelemetryAttributes.PULSAR_TOPIC, destTopicName.getPartitionedTopicName(),
+                    OpenTelemetryAttributes.PULSAR_REPLICATION_REMOTE_CLUSTER_NAME, cluster1
+            );
+            // Only the first 3 messages are expected to have been replicated before the pause; poll because
+            // the inbound side is also updated asynchronously.
+            Awaitility.await().untilAsserted(() -> {
+                var metrics = metricReader2.collectAllMetrics();
+                assertMetricLongSumValue(metrics, OpenTelemetryReplicatorStats.MESSAGE_IN_COUNTER, attributes, 3);
+                assertMetricLongSumValue(metrics, OpenTelemetryReplicatorStats.BYTES_IN_COUNTER, attributes,
+                        aLong -> assertThat(aLong).isPositive());
+            });
+        }
+    }
+
     @Test
     public void testEnableReplicationWithNamespaceAllowedClustersPolices() 
throws Exception {
         log.info("--- testEnableReplicationWithNamespaceAllowedClustersPolices 
---");
@@ -1873,5 +1963,8 @@ public class ReplicatorTest extends ReplicatorTestBase {
             assertTrue(replicator.isConnected());
         });
         replicator.closeProducerAsync(true);
+        Awaitility.await().untilAsserted(() -> {
+            assertFalse(replicator.isConnected());
+        });
     }
 }
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTestBase.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTestBase.java
index 838632febd8..33877b68118 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTestBase.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTestBase.java
@@ -21,12 +21,10 @@ package org.apache.pulsar.broker.service;
 import static org.testng.Assert.assertEquals;
 import static org.testng.Assert.assertNotNull;
 import static org.testng.Assert.assertNull;
-
-import com.google.common.io.Resources;
 import com.google.common.collect.Sets;
-
+import com.google.common.io.Resources;
 import io.netty.util.concurrent.DefaultThreadFactory;
-
+import io.opentelemetry.sdk.testing.exporter.InMemoryMetricReader;
 import java.net.URL;
 import java.util.Optional;
 import java.util.Set;
@@ -35,12 +33,9 @@ import java.util.concurrent.ExecutorService;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
-
 import org.apache.pulsar.broker.PulsarService;
 import org.apache.pulsar.broker.ServiceConfiguration;
-import org.apache.pulsar.common.policies.data.ClusterData;
-import org.apache.pulsar.common.policies.data.TopicType;
-import org.apache.pulsar.tests.TestRetrySupport;
+import org.apache.pulsar.broker.stats.BrokerOpenTelemetryTestUtil;
 import org.apache.pulsar.client.admin.PulsarAdmin;
 import org.apache.pulsar.client.api.Consumer;
 import org.apache.pulsar.client.api.Message;
@@ -51,7 +46,11 @@ import org.apache.pulsar.client.api.PulsarClient;
 import org.apache.pulsar.client.api.PulsarClientException;
 import org.apache.pulsar.client.api.TypedMessageBuilder;
 import org.apache.pulsar.common.naming.TopicName;
+import org.apache.pulsar.common.policies.data.ClusterData;
 import org.apache.pulsar.common.policies.data.TenantInfoImpl;
+import org.apache.pulsar.common.policies.data.TopicType;
+import org.apache.pulsar.functions.worker.WorkerConfig;
+import org.apache.pulsar.tests.TestRetrySupport;
 import org.apache.pulsar.zookeeper.LocalBookkeeperEnsemble;
 import org.apache.pulsar.zookeeper.ZookeeperServerTest;
 import org.slf4j.Logger;
@@ -63,6 +62,7 @@ public abstract class ReplicatorTestBase extends 
TestRetrySupport {
     ServiceConfiguration config1 = new ServiceConfiguration();
     PulsarService pulsar1;
     BrokerService ns1;
+    protected InMemoryMetricReader metricReader1;
 
     PulsarAdmin admin1;
     LocalBookkeeperEnsemble bkEnsemble1;
@@ -74,6 +74,7 @@ public abstract class ReplicatorTestBase extends 
TestRetrySupport {
     BrokerService ns2;
     PulsarAdmin admin2;
     LocalBookkeeperEnsemble bkEnsemble2;
+    protected InMemoryMetricReader metricReader2;
 
     URL url3;
     URL urlTls3;
@@ -82,6 +83,7 @@ public abstract class ReplicatorTestBase extends 
TestRetrySupport {
     BrokerService ns3;
     PulsarAdmin admin3;
     LocalBookkeeperEnsemble bkEnsemble3;
+    protected InMemoryMetricReader metricReader3;
 
     URL url4;
     URL urlTls4;
@@ -89,6 +91,7 @@ public abstract class ReplicatorTestBase extends 
TestRetrySupport {
     PulsarService pulsar4;
     PulsarAdmin admin4;
     LocalBookkeeperEnsemble bkEnsemble4;
+    protected InMemoryMetricReader metricReader4;
 
     ZookeeperServerTest globalZkS;
 
@@ -154,7 +157,8 @@ public abstract class ReplicatorTestBase extends 
TestRetrySupport {
         // completely
         // independent config objects instead of referring to the same 
properties object
         setConfig1DefaultValue();
-        pulsar1 = new PulsarService(config1);
+        metricReader1 = InMemoryMetricReader.create();
+        pulsar1 = buildPulsarService(config1, metricReader1);
         pulsar1.start();
         ns1 = pulsar1.getBrokerService();
 
@@ -169,7 +173,8 @@ public abstract class ReplicatorTestBase extends 
TestRetrySupport {
         bkEnsemble2.start();
 
         setConfig2DefaultValue();
-        pulsar2 = new PulsarService(config2);
+        metricReader2 = InMemoryMetricReader.create();
+        pulsar2 = buildPulsarService(config2, metricReader2);
         pulsar2.start();
         ns2 = pulsar2.getBrokerService();
 
@@ -184,7 +189,8 @@ public abstract class ReplicatorTestBase extends 
TestRetrySupport {
         bkEnsemble3.start();
 
         setConfig3DefaultValue();
-        pulsar3 = new PulsarService(config3);
+        metricReader3 = InMemoryMetricReader.create();
+        pulsar3 = buildPulsarService(config3, metricReader3);
         pulsar3.start();
         ns3 = pulsar3.getBrokerService();
 
@@ -199,7 +205,8 @@ public abstract class ReplicatorTestBase extends 
TestRetrySupport {
         bkEnsemble4.start();
 
         setConfig4DefaultValue();
-        pulsar4 = new PulsarService(config4);
+        metricReader4 = InMemoryMetricReader.create();
+        pulsar4 = buildPulsarService(config4, metricReader4);
         pulsar4.start();
 
         url4 = new URL(pulsar4.getWebServiceAddress());
@@ -312,6 +319,14 @@ public abstract class ReplicatorTestBase extends 
TestRetrySupport {
 
     }
 
+    /**
+     * Creates a {@link PulsarService} for {@code config} whose OpenTelemetry SDK builder is customized through
+     * {@link BrokerOpenTelemetryTestUtil#getOpenTelemetrySdkBuilderConsumer} with {@code metricReader}, so tests
+     * can collect the broker's metrics from that reader.
+     *
+     * <p>Passes a fresh {@link WorkerConfig} and {@code Optional.empty()} (presumably "no function worker
+     * service"; confirm against the PulsarService constructor). The exit-code callback only logs the code.
+     *
+     * @param config       broker configuration for the service
+     * @param metricReader in-memory reader the test will collect metrics from
+     * @return the constructed, not yet started, service
+     */
+    private PulsarService buildPulsarService(ServiceConfiguration config, InMemoryMetricReader metricReader) {
+        return new PulsarService(config,
+                new WorkerConfig(),
+                Optional.empty(),
+                exitCode -> log.info("Pulsar service finished with exit code {}", exitCode),
+                BrokerOpenTelemetryTestUtil.getOpenTelemetrySdkBuilderConsumer(metricReader));
+    }
+
     public void setConfig3DefaultValue() {
         setConfigDefaults(config3, cluster3, bkEnsemble3);
         config3.setTlsEnabled(true);
@@ -409,6 +424,23 @@ public abstract class ReplicatorTestBase extends 
TestRetrySupport {
             admin4 = null;
         }
 
+        if (metricReader4 != null) {
+            metricReader4.close();
+            metricReader4 = null;
+        }
+        if (metricReader3 != null) {
+            metricReader3.close();
+            metricReader3 = null;
+        }
+        if (metricReader2 != null) {
+            metricReader2.close();
+            metricReader2 = null;
+        }
+        if (metricReader1 != null) {
+            metricReader1.close();
+            metricReader1 = null;
+        }
+
         if (pulsar4 != null) {
             pulsar4.close();
             pulsar4 = null;
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/BrokerOpenTelemetryTestUtil.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/BrokerOpenTelemetryTestUtil.java
index cb61677ab95..d7ad0588201 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/BrokerOpenTelemetryTestUtil.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/BrokerOpenTelemetryTestUtil.java
@@ -89,4 +89,22 @@ public class BrokerOpenTelemetryTestUtil {
                                             
valueConsumer.accept(point.getValue());
                                         }))));
     }
+
+    /**
+     * Asserts that {@code metrics} contains a double gauge named {@code metricName} with a point whose
+     * attributes equal {@code attributes} and whose value is exactly {@code expected}.
+     */
+    public static void assertMetricDoubleGaugeValue(Collection<MetricData> metrics, String metricName,
+                                                    Attributes attributes, double expected) {
+        assertMetricDoubleGaugeValue(metrics, metricName, attributes, actual -> assertThat(actual).isEqualTo(expected));
+    }
+
+    /**
+     * Asserts that {@code metrics} contains a double gauge named {@code metricName} with at least one point whose
+     * attributes equal {@code attributes} and whose value is accepted by {@code valueConsumer}.
+     *
+     * @param valueConsumer assertion run against the point value; it should throw to reject the value
+     */
+    public static void assertMetricDoubleGaugeValue(Collection<MetricData> metrics, String metricName,
+                                                  Attributes attributes, Consumer<Double> valueConsumer) {
+        assertThat(metrics)
+                .anySatisfy(metric -> assertThat(metric)
+                        .hasName(metricName)
+                        .hasDoubleGaugeSatisfying(gauge -> gauge.satisfies(
+                                pointData -> assertThat(pointData.getPoints()).anySatisfy(
+                                        point -> {
+                                            assertThat(point.getAttributes()).isEqualTo(attributes);
+                                            valueConsumer.accept(point.getValue());
+                                        }))));
+    }
 }
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/BrokerServiceLookupTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/BrokerServiceLookupTest.java
index e99802a5bc5..157df118530 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/BrokerServiceLookupTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/BrokerServiceLookupTest.java
@@ -193,6 +193,7 @@ public class BrokerServiceLookupTest extends 
ProducerConsumerBase {
         pulsar2.getOpenTelemetryTopicStats().close();
         pulsar2.getOpenTelemetryConsumerStats().close();
         pulsar2.getOpenTelemetryProducerStats().close();
+        pulsar2.getOpenTelemetryReplicatorStats().close();
 
         var metricReader = pulsarTestContext.getOpenTelemetryMetricReader();
         var lookupRequestSemaphoreField = 
BrokerService.class.getDeclaredField("lookupRequestSemaphore");
diff --git 
a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/NonPersistentReplicatorStats.java
 
b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/NonPersistentReplicatorStats.java
index 6c77de91957..bfeeb6d037a 100644
--- 
a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/NonPersistentReplicatorStats.java
+++ 
b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/NonPersistentReplicatorStats.java
@@ -27,4 +27,7 @@ public interface NonPersistentReplicatorStats extends 
ReplicatorStats {
      * for non-persistent topic: broker drops msg for replicator if replicator 
connection is not writable.
      **/
     double getMsgDropRate();
+
+    /** Total number of messages dropped by the broker for the replicator. */
+    long getMsgDropCount();
 }
diff --git 
a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/ReplicatorStats.java
 
b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/ReplicatorStats.java
index 24be2f9380b..1790cc35f50 100644
--- 
a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/ReplicatorStats.java
+++ 
b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/ReplicatorStats.java
@@ -24,20 +24,40 @@ package org.apache.pulsar.common.policies.data;
 public interface ReplicatorStats {
 
     /** Total rate of messages received from the remote cluster (msg/s). */
+    @Deprecated
     double getMsgRateIn();
 
+    /** Total number of messages received from the remote cluster. */
+    long getMsgInCount();
+
     /** Total throughput received from the remote cluster (bytes/s). */
+    @Deprecated
     double getMsgThroughputIn();
 
+    /** Total number of bytes received from the remote cluster. */
+    long getBytesInCount();
+
     /** Total rate of messages delivered to the replication-subscriber 
(msg/s). */
+    @Deprecated
     double getMsgRateOut();
 
+    /** Total number of messages sent to the remote cluster. */
+    long getMsgOutCount();
+
     /** Total throughput delivered to the replication-subscriber (bytes/s). */
+    @Deprecated
     double getMsgThroughputOut();
 
+    /** Total number of bytes sent to the remote cluster. */
+    long getBytesOutCount();
+
     /** Total rate of messages expired (msg/s). */
+    @Deprecated
     double getMsgRateExpired();
 
+    /** Total number of messages expired. */
+    long getMsgExpiredCount();
+
     /** Number of messages pending to be replicated to remote cluster. */
     long getReplicationBacklog();
 
diff --git 
a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/stats/NonPersistentReplicatorStatsImpl.java
 
b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/stats/NonPersistentReplicatorStatsImpl.java
index 98f838a9449..a09d03b21a0 100644
--- 
a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/stats/NonPersistentReplicatorStatsImpl.java
+++ 
b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/stats/NonPersistentReplicatorStatsImpl.java
@@ -18,27 +18,43 @@
  */
 package org.apache.pulsar.common.policies.data.stats;
 
-import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
+import com.fasterxml.jackson.annotation.JsonIgnore;
+import com.fasterxml.jackson.annotation.JsonProperty;
 import java.util.Objects;
-import lombok.Getter;
+import java.util.concurrent.atomic.LongAdder;
+import lombok.Data;
+import lombok.EqualsAndHashCode;
 import org.apache.pulsar.common.policies.data.NonPersistentReplicatorStats;
 
 /**
  * Statistics for a non-persistent replicator.
  */
-@SuppressFBWarnings("EQ_DOESNT_OVERRIDE_EQUALS")
+@Data
+@EqualsAndHashCode(callSuper = true)
 public class NonPersistentReplicatorStatsImpl extends ReplicatorStatsImpl implements NonPersistentReplicatorStats {
 
     /**
      * for non-persistent topic: broker drops msg for replicator if replicator connection is not writable.
      **/
-    @Getter
     public double msgDropRate;
 
+    /**
+     * Running total of dropped messages, exposed through {@link #getMsgDropCount()}.
+     * NOTE(review): LongAdder does not override equals/hashCode, so the Lombok-generated equals/hashCode
+     * compare this field by reference.
+     */
+    @JsonIgnore
+    private final LongAdder msgDropCount = new LongAdder();
+
+    /**
+     * Merges {@code stats} into this instance.
+     *
+     * <p>The drop rate is summed and the drop counter is accumulated, so aggregated stats report the combined
+     * drop count rather than only this instance's own count.
+     * NOTE(review): the counters declared in ReplicatorStatsImpl do not appear to be merged by super.add();
+     * confirm.
+     *
+     * @param stats stats to merge into this instance; must not be null
+     * @return this instance
+     * @throws NullPointerException if {@code stats} is null
+     */
     public NonPersistentReplicatorStatsImpl add(NonPersistentReplicatorStatsImpl stats) {
         Objects.requireNonNull(stats);
         super.add(stats);
         this.msgDropRate += stats.msgDropRate;
+        this.msgDropCount.add(stats.getMsgDropCount());
         return this;
     }
+
+    /** Returns the total number of messages dropped for this replicator. */
+    @Override
+    @JsonProperty
+    public long getMsgDropCount() {
+        return msgDropCount.sum();
+    }
+
+    /** Records one dropped message. */
+    public void incrementMsgDropCount() {
+        msgDropCount.increment();
+    }
 }
diff --git 
a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/stats/ReplicatorStatsImpl.java
 
b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/stats/ReplicatorStatsImpl.java
index 6933f5cc7ed..c19169cbee5 100644
--- 
a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/stats/ReplicatorStatsImpl.java
+++ 
b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/stats/ReplicatorStatsImpl.java
@@ -18,7 +18,10 @@
  */
 package org.apache.pulsar.common.policies.data.stats;
 
+import com.fasterxml.jackson.annotation.JsonIgnore;
+import com.fasterxml.jackson.annotation.JsonProperty;
 import java.util.Objects;
+import java.util.concurrent.atomic.LongAdder;
 import lombok.Data;
 import org.apache.pulsar.common.policies.data.ReplicatorStats;
 
@@ -31,15 +34,27 @@ public class ReplicatorStatsImpl implements ReplicatorStats 
{
     /** Total rate of messages received from the remote cluster (msg/s). */
     public double msgRateIn;
 
+    @JsonIgnore
+    private final LongAdder msgInCount = new LongAdder();
+
     /** Total throughput received from the remote cluster (bytes/s). */
     public double msgThroughputIn;
 
+    @JsonIgnore
+    private final LongAdder bytesInCount = new LongAdder();
+
     /** Total rate of messages delivered to the replication-subscriber 
(msg/s). */
     public double msgRateOut;
 
+    @JsonIgnore
+    private final LongAdder msgOutCount = new LongAdder();
+
     /** Total throughput delivered to the replication-subscriber (bytes/s). */
     public double msgThroughputOut;
 
+    @JsonIgnore
+    private final LongAdder bytesOutCount = new LongAdder();
+
     /** Total rate of messages expired (msg/s). */
     public double msgRateExpired;
 
@@ -72,10 +87,51 @@ public class ReplicatorStatsImpl implements ReplicatorStats 
{
         this.msgThroughputOut += stats.msgThroughputOut;
         this.msgRateExpired += stats.msgRateExpired;
         this.replicationBacklog += stats.replicationBacklog;
-        if (this.connected) {
-            this.connected &= stats.connected;
-        }
+        this.connected &= stats.connected;
         this.replicationDelayInSeconds = 
Math.max(this.replicationDelayInSeconds, stats.replicationDelayInSeconds);
         return this;
     }
+
+    /** Returns the total number of messages received from the remote cluster. */
+    @Override
+    @JsonProperty
+    public long getMsgInCount() {
+        return msgInCount.sum();
+    }
+
+    /** Returns the total number of bytes received from the remote cluster. */
+    @Override
+    @JsonProperty
+    public long getBytesInCount() {
+        return bytesInCount.sum();
+    }
+
+    /**
+     * Adds {@code numOfMessages} to the inbound message counter and {@code msgSizeInBytes} to the inbound
+     * byte counter.
+     */
+    public void incrementPublishCount(int numOfMessages, long msgSizeInBytes) {
+        msgInCount.add(numOfMessages);
+        bytesInCount.add(msgSizeInBytes);
+    }
+
+    /** Returns the total number of messages sent to the remote cluster. */
+    @Override
+    @JsonProperty
+    public long getMsgOutCount() {
+        return msgOutCount.sum();
+    }
+
+    /** Records one message sent to the remote cluster. */
+    public void incrementMsgOutCounter() {
+        msgOutCount.increment();
+    }
+
+    /** Returns the total number of bytes sent to the remote cluster. */
+    @Override
+    @JsonProperty
+    public long getBytesOutCount() {
+        return bytesOutCount.sum();
+    }
+
+    /** Adds {@code bytes} to the outbound byte counter. */
+    public void incrementBytesOutCounter(long bytes) {
+        bytesOutCount.add(bytes);
+    }
+
+    // NOTE(review): always returns 0. This class keeps no expired-message counter, so the serialized
+    // msgExpiredCount is 0 unless a subclass overrides this; confirm that is intended.
+    @Override
+    @JsonProperty
+    public long getMsgExpiredCount() {
+        return 0;
+    }
 }
diff --git 
a/pulsar-opentelemetry/src/main/java/org/apache/pulsar/opentelemetry/OpenTelemetryAttributes.java
 
b/pulsar-opentelemetry/src/main/java/org/apache/pulsar/opentelemetry/OpenTelemetryAttributes.java
index 6639cd68b39..31e527f0286 100644
--- 
a/pulsar-opentelemetry/src/main/java/org/apache/pulsar/opentelemetry/OpenTelemetryAttributes.java
+++ 
b/pulsar-opentelemetry/src/main/java/org/apache/pulsar/opentelemetry/OpenTelemetryAttributes.java
@@ -143,6 +143,12 @@ public interface OpenTelemetryAttributes {
         public final Attributes attributes = 
Attributes.of(PULSAR_BACKLOG_QUOTA_TYPE, name().toLowerCase());
     }
 
+    /**
+     * The name of the remote cluster for a Pulsar replicator.
+     */
+    AttributeKey<String> PULSAR_REPLICATION_REMOTE_CLUSTER_NAME =
+            AttributeKey.stringKey("pulsar.replication.remote.cluster.name");
+
     AttributeKey<String> PULSAR_CONNECTION_STATUS = 
AttributeKey.stringKey("pulsar.connection.status");
     enum ConnectionStatus {
         ACTIVE,

Reply via email to