This is an automated email from the ASF dual-hosted git repository.

showuon pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 748175ce62e KAFKA-15189: only init remote topic metrics when enabled 
(#14133)
748175ce62e is described below

commit 748175ce62ea68d2a99ff9f96c7e349e1d0e1b96
Author: Luke Chen <[email protected]>
AuthorDate: Sat Aug 5 13:00:16 2023 +0800

    KAFKA-15189: only init remote topic metrics when enabled (#14133)
    
    Only initialize remote topic metrics when system-wise remote storage is 
enabled to avoid impacting performance for existing brokers. Also add tests.
    
    Reviewers: Divij Vaidya <[email protected]>, Kamal Chandraprakash 
<[email protected]>
---
 build.gradle                                       |  2 +
 checkstyle/import-control.xml                      |  4 +
 .../java/kafka/log/remote/RemoteLogManager.java    | 10 +--
 .../kafka/server/builders/KafkaApisBuilder.java    |  3 +-
 .../server/builders/ReplicaManagerBuilder.java     |  3 +-
 .../src/main/scala/kafka/server/BrokerServer.scala |  3 +-
 .../scala/kafka/server/KafkaRequestHandler.scala   | 61 +++++++-------
 core/src/main/scala/kafka/server/KafkaServer.scala |  2 +-
 .../kafka/log/remote/RemoteLogManagerTest.java     | 21 +++--
 .../java/kafka/log/remote/RemoteLogReaderTest.java |  9 +-
 .../scala/integration/kafka/api/MetricsTest.scala  | 30 ++++++-
 .../kafka/server/KafkaRequestHandlerTest.scala     | 23 ++++-
 .../unit/kafka/server/ReplicaManagerTest.scala     |  3 +-
 .../test/scala/unit/kafka/utils/TestUtils.scala    |  4 +
 .../jmh/fetcher/ReplicaFetcherThreadBenchmark.java |  3 +-
 .../metadata/KRaftMetadataRequestBenchmark.java    |  2 +-
 .../jmh/metadata/MetadataRequestBenchmark.java     |  2 +-
 .../partition/PartitionMakeFollowerBenchmark.java  |  2 +-
 .../UpdateFollowerFetchStateBenchmark.java         |  2 +-
 .../kafka/jmh/record/BaseRecordBatchBenchmark.java |  3 +-
 .../apache/kafka/jmh/server/CheckpointBench.java   |  3 +-
 .../kafka/jmh/server/PartitionCreationBench.java   |  3 +-
 .../log/remote/storage/RemoteStorageMetrics.java   | 97 ++++++++++++++++++++++
 .../internals/log/RemoteStorageThreadPool.java     | 22 ++---
 24 files changed, 236 insertions(+), 81 deletions(-)

diff --git a/build.gradle b/build.gradle
index 76dec696680..250aed44911 100644
--- a/build.gradle
+++ b/build.gradle
@@ -1638,6 +1638,8 @@ project(':storage:api') {
 
   dependencies {
     implementation project(':clients')
+    implementation project(':server-common')
+    implementation libs.metrics
     implementation libs.slf4jApi
 
     testImplementation project(':clients')
diff --git a/checkstyle/import-control.xml b/checkstyle/import-control.xml
index 7ecc4d5a1d1..8148de598fa 100644
--- a/checkstyle/import-control.xml
+++ b/checkstyle/import-control.xml
@@ -261,6 +261,10 @@
       <allow pkg="org.apache.kafka.storage"/>
       <subpackage name="remote">
         <allow pkg="scala.collection" />
+        <subpackage name="storage">
+          <allow pkg="com.yammer.metrics.core" />
+          <allow pkg="org.apache.kafka.server.metrics" />
+        </subpackage>
       </subpackage>
 
     </subpackage>
diff --git a/core/src/main/java/kafka/log/remote/RemoteLogManager.java 
b/core/src/main/java/kafka/log/remote/RemoteLogManager.java
index 33bde33882f..d07d60ce3a8 100644
--- a/core/src/main/java/kafka/log/remote/RemoteLogManager.java
+++ b/core/src/main/java/kafka/log/remote/RemoteLogManager.java
@@ -111,6 +111,7 @@ import java.util.stream.Collectors;
 import java.util.stream.Stream;
 
 import static 
org.apache.kafka.server.log.remote.metadata.storage.TopicBasedRemoteLogMetadataManagerConfig.REMOTE_LOG_METADATA_COMMON_CLIENT_PREFIX;
+import static 
org.apache.kafka.server.log.remote.storage.RemoteStorageMetrics.REMOTE_LOG_MANAGER_TASKS_AVG_IDLE_PERCENT_METRIC;
 
 /**
  * This class is responsible for
@@ -123,8 +124,6 @@ public class RemoteLogManager implements Closeable {
 
     private static final Logger LOGGER = 
LoggerFactory.getLogger(RemoteLogManager.class);
     private static final String REMOTE_LOG_READER_THREAD_NAME_PREFIX = 
"remote-log-reader";
-    public static final String REMOTE_LOG_READER_METRICS_NAME_PREFIX = 
"RemoteLogReader";
-    public static final String REMOTE_LOG_MANAGER_TASKS_AVG_IDLE_PERCENT = 
"RemoteLogManagerTasksAvgIdlePercent";
     private final RemoteLogManagerConfig rlmConfig;
     private final int brokerId;
     private final String logDir;
@@ -185,7 +184,7 @@ public class RemoteLogManager implements Closeable {
         delayInMs = rlmConfig.remoteLogManagerTaskIntervalMs();
         rlmScheduledThreadPool = new 
RLMScheduledThreadPool(rlmConfig.remoteLogManagerThreadPoolSize());
 
-        metricsGroup.newGauge(REMOTE_LOG_MANAGER_TASKS_AVG_IDLE_PERCENT, new 
Gauge<Double>() {
+        
metricsGroup.newGauge(REMOTE_LOG_MANAGER_TASKS_AVG_IDLE_PERCENT_METRIC.getName(),
 new Gauge<Double>() {
             @Override
             public Double value() {
                 return rlmScheduledThreadPool.getIdlePercent();
@@ -195,13 +194,12 @@ public class RemoteLogManager implements Closeable {
         remoteStorageReaderThreadPool = new RemoteStorageThreadPool(
                 REMOTE_LOG_READER_THREAD_NAME_PREFIX,
                 rlmConfig.remoteLogReaderThreads(),
-                rlmConfig.remoteLogReaderMaxPendingTasks(),
-                REMOTE_LOG_READER_METRICS_NAME_PREFIX
+                rlmConfig.remoteLogReaderMaxPendingTasks()
         );
     }
 
     private void removeMetrics() {
-        metricsGroup.removeMetric(REMOTE_LOG_MANAGER_TASKS_AVG_IDLE_PERCENT);
+        
metricsGroup.removeMetric(REMOTE_LOG_MANAGER_TASKS_AVG_IDLE_PERCENT_METRIC.getName());
         remoteStorageReaderThreadPool.removeMetrics();
     }
 
diff --git a/core/src/main/java/kafka/server/builders/KafkaApisBuilder.java 
b/core/src/main/java/kafka/server/builders/KafkaApisBuilder.java
index d5403e0c57a..aed480beba5 100644
--- a/core/src/main/java/kafka/server/builders/KafkaApisBuilder.java
+++ b/core/src/main/java/kafka/server/builders/KafkaApisBuilder.java
@@ -38,6 +38,7 @@ import org.apache.kafka.server.authorizer.Authorizer;
 
 import java.util.Collections;
 import java.util.Optional;
+
 import scala.compat.java8.OptionConverters;
 
 
@@ -171,7 +172,7 @@ public class KafkaApisBuilder {
         if (metrics == null) throw new RuntimeException("You must set 
metrics");
         if (quotas == null) throw new RuntimeException("You must set quotas");
         if (fetchManager == null) throw new RuntimeException("You must set 
fetchManager");
-        if (brokerTopicStats == null) brokerTopicStats = new 
BrokerTopicStats();
+        if (brokerTopicStats == null) brokerTopicStats = new 
BrokerTopicStats(Optional.of(config));
         if (apiVersionManager == null) throw new RuntimeException("You must 
set apiVersionManager");
 
         return new KafkaApis(requestChannel,
diff --git 
a/core/src/main/java/kafka/server/builders/ReplicaManagerBuilder.java 
b/core/src/main/java/kafka/server/builders/ReplicaManagerBuilder.java
index 5860aa17693..2566e4bcfcb 100644
--- a/core/src/main/java/kafka/server/builders/ReplicaManagerBuilder.java
+++ b/core/src/main/java/kafka/server/builders/ReplicaManagerBuilder.java
@@ -54,7 +54,7 @@ public class ReplicaManagerBuilder {
     private MetadataCache metadataCache = null;
     private LogDirFailureChannel logDirFailureChannel = null;
     private AlterPartitionManager alterPartitionManager = null;
-    private BrokerTopicStats brokerTopicStats = new BrokerTopicStats();
+    private BrokerTopicStats brokerTopicStats = null;
     private AtomicBoolean isShuttingDown = new AtomicBoolean(false);
     private Optional<RemoteLogManager> remoteLogManager = Optional.empty();
     private Optional<KafkaZkClient> zkClient = Optional.empty();
@@ -179,6 +179,7 @@ public class ReplicaManagerBuilder {
         if (metadataCache == null) throw new RuntimeException("You must set 
metadataCache");
         if (logDirFailureChannel == null) throw new RuntimeException("You must 
set logDirFailureChannel");
         if (alterPartitionManager == null) throw new RuntimeException("You 
must set alterIsrManager");
+        if (brokerTopicStats == null) brokerTopicStats = new 
BrokerTopicStats(Optional.of(config));
         return new ReplicaManager(config,
                              metrics,
                              time,
diff --git a/core/src/main/scala/kafka/server/BrokerServer.scala 
b/core/src/main/scala/kafka/server/BrokerServer.scala
index 1d0bd0124b0..825d9eb8c19 100644
--- a/core/src/main/scala/kafka/server/BrokerServer.scala
+++ b/core/src/main/scala/kafka/server/BrokerServer.scala
@@ -188,8 +188,7 @@ class BrokerServer(
       kafkaScheduler.startup()
 
       /* register broker metrics */
-      brokerTopicStats = new BrokerTopicStats
-
+      brokerTopicStats = new BrokerTopicStats(java.util.Optional.of(config))
 
       quotaManagers = QuotaFactory.instantiate(config, metrics, time, 
s"broker-${config.nodeId}-")
 
diff --git a/core/src/main/scala/kafka/server/KafkaRequestHandler.scala 
b/core/src/main/scala/kafka/server/KafkaRequestHandler.scala
index 7a9f166b56d..6191bf88120 100755
--- a/core/src/main/scala/kafka/server/KafkaRequestHandler.scala
+++ b/core/src/main/scala/kafka/server/KafkaRequestHandler.scala
@@ -26,6 +26,7 @@ import java.util.concurrent.atomic.AtomicInteger
 import com.yammer.metrics.core.Meter
 import org.apache.kafka.common.internals.FatalExitError
 import org.apache.kafka.common.utils.{KafkaThread, Time}
+import org.apache.kafka.server.log.remote.storage.RemoteStorageMetrics
 import org.apache.kafka.server.metrics.KafkaMetricsGroup
 
 import java.util.Collections
@@ -227,7 +228,7 @@ class KafkaRequestHandlerPool(val brokerId: Int,
   }
 }
 
-class BrokerTopicMetrics(name: Option[String]) {
+class BrokerTopicMetrics(name: Option[String], configOpt: 
java.util.Optional[KafkaConfig]) {
   private val metricsGroup = new KafkaMetricsGroup(this.getClass)
 
   val tags: java.util.Map[String, String] = name match {
@@ -277,17 +278,12 @@ class BrokerTopicMetrics(name: Option[String]) {
     BrokerTopicStats.TotalFetchRequestsPerSec -> 
MeterWrapper(BrokerTopicStats.TotalFetchRequestsPerSec, "requests"),
     BrokerTopicStats.FetchMessageConversionsPerSec -> 
MeterWrapper(BrokerTopicStats.FetchMessageConversionsPerSec, "requests"),
     BrokerTopicStats.ProduceMessageConversionsPerSec -> 
MeterWrapper(BrokerTopicStats.ProduceMessageConversionsPerSec, "requests"),
-    BrokerTopicStats.RemoteCopyBytesPerSec -> 
MeterWrapper(BrokerTopicStats.RemoteCopyBytesPerSec, "bytes"),
-    BrokerTopicStats.RemoteFetchBytesPerSec -> 
MeterWrapper(BrokerTopicStats.RemoteFetchBytesPerSec, "bytes"),
-    BrokerTopicStats.RemoteFetchRequestsPerSec -> 
MeterWrapper(BrokerTopicStats.RemoteFetchRequestsPerSec, "requests"),
-    BrokerTopicStats.RemoteCopyRequestsPerSec -> 
MeterWrapper(BrokerTopicStats.RemoteCopyRequestsPerSec, "requests"),
-    BrokerTopicStats.FailedRemoteFetchRequestsPerSec -> 
MeterWrapper(BrokerTopicStats.FailedRemoteFetchRequestsPerSec, "requests"),
-    BrokerTopicStats.FailedRemoteCopyRequestsPerSec -> 
MeterWrapper(BrokerTopicStats.FailedRemoteCopyRequestsPerSec, "requests"),
     BrokerTopicStats.NoKeyCompactedTopicRecordsPerSec -> 
MeterWrapper(BrokerTopicStats.NoKeyCompactedTopicRecordsPerSec, "requests"),
     BrokerTopicStats.InvalidMagicNumberRecordsPerSec -> 
MeterWrapper(BrokerTopicStats.InvalidMagicNumberRecordsPerSec, "requests"),
     BrokerTopicStats.InvalidMessageCrcRecordsPerSec -> 
MeterWrapper(BrokerTopicStats.InvalidMessageCrcRecordsPerSec, "requests"),
     BrokerTopicStats.InvalidOffsetOrSequenceRecordsPerSec -> 
MeterWrapper(BrokerTopicStats.InvalidOffsetOrSequenceRecordsPerSec, "requests")
   ).asJava)
+
   if (name.isEmpty) {
     metricTypeMap.put(BrokerTopicStats.ReplicationBytesInPerSec, 
MeterWrapper(BrokerTopicStats.ReplicationBytesInPerSec, "bytes"))
     metricTypeMap.put(BrokerTopicStats.ReplicationBytesOutPerSec, 
MeterWrapper(BrokerTopicStats.ReplicationBytesOutPerSec, "bytes"))
@@ -295,6 +291,18 @@ class BrokerTopicMetrics(name: Option[String]) {
     metricTypeMap.put(BrokerTopicStats.ReassignmentBytesOutPerSec, 
MeterWrapper(BrokerTopicStats.ReassignmentBytesOutPerSec, "bytes"))
   }
 
+  configOpt.ifPresent(config =>
+    if (config.remoteLogManagerConfig.enableRemoteStorageSystem()) {
+      metricTypeMap.putAll(Map(
+        RemoteStorageMetrics.REMOTE_COPY_BYTES_PER_SEC_METRIC.getName -> 
MeterWrapper(RemoteStorageMetrics.REMOTE_COPY_BYTES_PER_SEC_METRIC.getName, 
"bytes"),
+        RemoteStorageMetrics.REMOTE_FETCH_BYTES_PER_SEC_METRIC.getName -> 
MeterWrapper(RemoteStorageMetrics.REMOTE_FETCH_BYTES_PER_SEC_METRIC.getName, 
"bytes"),
+        RemoteStorageMetrics.REMOTE_FETCH_REQUESTS_PER_SEC_METRIC.getName -> 
MeterWrapper(RemoteStorageMetrics.REMOTE_FETCH_REQUESTS_PER_SEC_METRIC.getName, 
"requests"),
+        RemoteStorageMetrics.REMOTE_COPY_REQUESTS_PER_SEC_METRIC.getName -> 
MeterWrapper(RemoteStorageMetrics.REMOTE_COPY_REQUESTS_PER_SEC_METRIC.getName, 
"requests"),
+        RemoteStorageMetrics.FAILED_REMOTE_FETCH_PER_SEC_METRIC.getName -> 
MeterWrapper(RemoteStorageMetrics.FAILED_REMOTE_FETCH_PER_SEC_METRIC.getName, 
"requests"),
+        RemoteStorageMetrics.FAILED_REMOTE_COPY_PER_SEC_METRIC.getName -> 
MeterWrapper(RemoteStorageMetrics.FAILED_REMOTE_COPY_PER_SEC_METRIC.getName, 
"requests")
+      ).asJava)
+    })
+
   // used for testing only
   def metricMap: Map[String, MeterWrapper] = metricTypeMap.toMap
 
@@ -342,17 +350,17 @@ class BrokerTopicMetrics(name: Option[String]) {
 
   def invalidOffsetOrSequenceRecordsPerSec: Meter = 
metricTypeMap.get(BrokerTopicStats.InvalidOffsetOrSequenceRecordsPerSec).meter()
 
-  def remoteCopyBytesRate: Meter = 
metricTypeMap.get(BrokerTopicStats.RemoteCopyBytesPerSec).meter()
+  def remoteCopyBytesRate: Meter = 
metricTypeMap.get(RemoteStorageMetrics.REMOTE_COPY_BYTES_PER_SEC_METRIC.getName).meter()
 
-  def remoteFetchBytesRate: Meter = 
metricTypeMap.get(BrokerTopicStats.RemoteFetchBytesPerSec).meter()
+  def remoteFetchBytesRate: Meter = 
metricTypeMap.get(RemoteStorageMetrics.REMOTE_FETCH_BYTES_PER_SEC_METRIC.getName).meter()
 
-  def remoteFetchRequestRate: Meter = 
metricTypeMap.get(BrokerTopicStats.RemoteFetchRequestsPerSec).meter()
+  def remoteFetchRequestRate: Meter = 
metricTypeMap.get(RemoteStorageMetrics.REMOTE_FETCH_REQUESTS_PER_SEC_METRIC.getName).meter()
 
-  def remoteCopyRequestRate: Meter = 
metricTypeMap.get(BrokerTopicStats.RemoteCopyRequestsPerSec).meter()
+  def remoteCopyRequestRate: Meter = 
metricTypeMap.get(RemoteStorageMetrics.REMOTE_COPY_REQUESTS_PER_SEC_METRIC.getName).meter()
 
-  def failedRemoteFetchRequestRate: Meter = 
metricTypeMap.get(BrokerTopicStats.FailedRemoteFetchRequestsPerSec).meter()
+  def failedRemoteFetchRequestRate: Meter = 
metricTypeMap.get(RemoteStorageMetrics.FAILED_REMOTE_FETCH_PER_SEC_METRIC.getName).meter()
 
-  def failedRemoteCopyRequestRate: Meter = 
metricTypeMap.get(BrokerTopicStats.FailedRemoteCopyRequestsPerSec).meter()
+  def failedRemoteCopyRequestRate: Meter = 
metricTypeMap.get(RemoteStorageMetrics.FAILED_REMOTE_COPY_PER_SEC_METRIC.getName).meter()
 
   def closeMetric(metricType: String): Unit = {
     val meter = metricTypeMap.get(metricType)
@@ -378,27 +386,18 @@ object BrokerTopicStats {
   val ProduceMessageConversionsPerSec = "ProduceMessageConversionsPerSec"
   val ReassignmentBytesInPerSec = "ReassignmentBytesInPerSec"
   val ReassignmentBytesOutPerSec = "ReassignmentBytesOutPerSec"
-  val RemoteCopyBytesPerSec = "RemoteCopyBytesPerSec"
-  val RemoteFetchBytesPerSec = "RemoteFetchBytesPerSec"
-  val RemoteFetchRequestsPerSec = "RemoteFetchRequestsPerSec"
-  val RemoteCopyRequestsPerSec = "RemoteCopyRequestsPerSec"
-  val FailedRemoteFetchRequestsPerSec = "RemoteFetchErrorsPerSec"
-  val FailedRemoteCopyRequestsPerSec = "RemoteCopyErrorsPerSec"
-
   // These following topics are for LogValidator for better debugging on 
failed records
   val NoKeyCompactedTopicRecordsPerSec = "NoKeyCompactedTopicRecordsPerSec"
   val InvalidMagicNumberRecordsPerSec = "InvalidMagicNumberRecordsPerSec"
   val InvalidMessageCrcRecordsPerSec = "InvalidMessageCrcRecordsPerSec"
   val InvalidOffsetOrSequenceRecordsPerSec = 
"InvalidOffsetOrSequenceRecordsPerSec"
-
-  private val valueFactory = (k: String) => new BrokerTopicMetrics(Some(k))
 }
 
-class BrokerTopicStats extends Logging {
-  import BrokerTopicStats._
+class BrokerTopicStats(configOpt: java.util.Optional[KafkaConfig] = 
java.util.Optional.empty()) extends Logging {
 
+  private val valueFactory = (k: String) => new BrokerTopicMetrics(Some(k), 
configOpt)
   private val stats = new Pool[String, BrokerTopicMetrics](Some(valueFactory))
-  val allTopicsStats = new BrokerTopicMetrics(None)
+  val allTopicsStats = new BrokerTopicMetrics(None, configOpt)
 
   def topicStats(topic: String): BrokerTopicMetrics =
     stats.getAndMaybePut(topic)
@@ -439,12 +438,12 @@ class BrokerTopicStats extends Logging {
       
topicMetrics.closeMetric(BrokerTopicStats.ProduceMessageConversionsPerSec)
       topicMetrics.closeMetric(BrokerTopicStats.ReplicationBytesOutPerSec)
       topicMetrics.closeMetric(BrokerTopicStats.ReassignmentBytesOutPerSec)
-      topicMetrics.closeMetric(BrokerTopicStats.RemoteCopyBytesPerSec)
-      topicMetrics.closeMetric(BrokerTopicStats.RemoteFetchBytesPerSec)
-      topicMetrics.closeMetric(BrokerTopicStats.RemoteFetchRequestsPerSec)
-      topicMetrics.closeMetric(BrokerTopicStats.RemoteCopyRequestsPerSec)
-      
topicMetrics.closeMetric(BrokerTopicStats.FailedRemoteFetchRequestsPerSec)
-      topicMetrics.closeMetric(BrokerTopicStats.FailedRemoteCopyRequestsPerSec)
+      
topicMetrics.closeMetric(RemoteStorageMetrics.REMOTE_COPY_BYTES_PER_SEC_METRIC.getName)
+      
topicMetrics.closeMetric(RemoteStorageMetrics.REMOTE_FETCH_BYTES_PER_SEC_METRIC.getName)
+      
topicMetrics.closeMetric(RemoteStorageMetrics.REMOTE_FETCH_REQUESTS_PER_SEC_METRIC.getName)
+      
topicMetrics.closeMetric(RemoteStorageMetrics.REMOTE_COPY_REQUESTS_PER_SEC_METRIC.getName)
+      
topicMetrics.closeMetric(RemoteStorageMetrics.FAILED_REMOTE_FETCH_PER_SEC_METRIC.getName)
+      
topicMetrics.closeMetric(RemoteStorageMetrics.FAILED_REMOTE_COPY_PER_SEC_METRIC.getName)
     }
   }
 
diff --git a/core/src/main/scala/kafka/server/KafkaServer.scala 
b/core/src/main/scala/kafka/server/KafkaServer.scala
index 0d10f1cdde3..a40c7d2bbea 100755
--- a/core/src/main/scala/kafka/server/KafkaServer.scala
+++ b/core/src/main/scala/kafka/server/KafkaServer.scala
@@ -261,7 +261,7 @@ class KafkaServer(
         metrics = Server.initializeMetrics(config, time, clusterId)
 
         /* register broker metrics */
-        _brokerTopicStats = new BrokerTopicStats
+        _brokerTopicStats = new BrokerTopicStats(java.util.Optional.of(config))
 
         quotaManagers = QuotaFactory.instantiate(config, metrics, time, 
threadNamePrefix.getOrElse(""))
         KafkaBroker.notifyClusterListeners(clusterId, kafkaMetricsReporters ++ 
metrics.reporters.asScala)
diff --git a/core/src/test/java/kafka/log/remote/RemoteLogManagerTest.java 
b/core/src/test/java/kafka/log/remote/RemoteLogManagerTest.java
index 941f4dc961e..dedcea0e38d 100644
--- a/core/src/test/java/kafka/log/remote/RemoteLogManagerTest.java
+++ b/core/src/test/java/kafka/log/remote/RemoteLogManagerTest.java
@@ -59,7 +59,6 @@ import org.apache.kafka.storage.internals.log.EpochEntry;
 import org.apache.kafka.storage.internals.log.LazyIndex;
 import org.apache.kafka.storage.internals.log.OffsetIndex;
 import org.apache.kafka.storage.internals.log.ProducerStateManager;
-import org.apache.kafka.storage.internals.log.RemoteStorageThreadPool;
 import org.apache.kafka.storage.internals.log.TimeIndex;
 import org.apache.kafka.storage.internals.log.TransactionIndex;
 import org.apache.kafka.test.TestUtils;
@@ -88,14 +87,14 @@ import java.util.Map;
 import java.util.NavigableMap;
 import java.util.Optional;
 import java.util.Properties;
+import java.util.Set;
 import java.util.TreeMap;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CountDownLatch;
-import java.util.stream.Collectors;
 
-import static 
kafka.log.remote.RemoteLogManager.REMOTE_LOG_MANAGER_TASKS_AVG_IDLE_PERCENT;
-import static 
kafka.log.remote.RemoteLogManager.REMOTE_LOG_READER_METRICS_NAME_PREFIX;
 import static 
org.apache.kafka.server.log.remote.metadata.storage.TopicBasedRemoteLogMetadataManagerConfig.REMOTE_LOG_METADATA_COMMON_CLIENT_PREFIX;
+import static 
org.apache.kafka.server.log.remote.storage.RemoteStorageMetrics.REMOTE_LOG_MANAGER_TASKS_AVG_IDLE_PERCENT_METRIC;
+import static 
org.apache.kafka.server.log.remote.storage.RemoteStorageMetrics.REMOTE_STORAGE_THREAD_POOL_METRICS;
 import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertFalse;
@@ -127,7 +126,8 @@ public class RemoteLogManagerTest {
     RemoteStorageManager remoteStorageManager = 
mock(RemoteStorageManager.class);
     RemoteLogMetadataManager remoteLogMetadataManager = 
mock(RemoteLogMetadataManager.class);
     RemoteLogManagerConfig remoteLogManagerConfig = null;
-    BrokerTopicStats brokerTopicStats = new BrokerTopicStats();
+
+    BrokerTopicStats brokerTopicStats = null;
     RemoteLogManager remoteLogManager = null;
 
     TopicIdPartition leaderTopicIdPartition = new 
TopicIdPartition(Uuid.randomUuid(), new TopicPartition("Leader", 0));
@@ -157,8 +157,10 @@ public class RemoteLogManagerTest {
     void setUp() throws Exception {
         topicIds.put(leaderTopicIdPartition.topicPartition().topic(), 
leaderTopicIdPartition.topicId());
         topicIds.put(followerTopicIdPartition.topicPartition().topic(), 
followerTopicIdPartition.topicId());
-        Properties props = new Properties();
+        Properties props = kafka.utils.TestUtils.createDummyBrokerConfig();
+        
props.setProperty(RemoteLogManagerConfig.REMOTE_LOG_STORAGE_SYSTEM_ENABLE_PROP, 
"true");
         remoteLogManagerConfig = createRLMConfig(props);
+        brokerTopicStats = new 
BrokerTopicStats(Optional.of(KafkaConfig.fromProps(props)));
 
         kafka.utils.TestUtils.clearYammerMetrics();
 
@@ -922,11 +924,8 @@ public class RemoteLogManagerTest {
             KafkaMetricsGroup mockRlmMetricsGroup = 
mockMetricsGroupCtor.constructed().get(0);
             KafkaMetricsGroup mockThreadPoolMetricsGroup = 
mockMetricsGroupCtor.constructed().get(1);
 
-            List<String> remoteLogManagerMetricNames = 
Collections.singletonList(REMOTE_LOG_MANAGER_TASKS_AVG_IDLE_PERCENT);
-            List<String> remoteStorageThreadPoolMetricNames = 
RemoteStorageThreadPool.METRIC_SUFFIXES
-                .stream()
-                .map(suffix -> REMOTE_LOG_READER_METRICS_NAME_PREFIX + suffix)
-                .collect(Collectors.toList());
+            List<String> remoteLogManagerMetricNames = 
Collections.singletonList(REMOTE_LOG_MANAGER_TASKS_AVG_IDLE_PERCENT_METRIC.getName());
+            Set<String> remoteStorageThreadPoolMetricNames = 
REMOTE_STORAGE_THREAD_POOL_METRICS;
 
             verify(mockRlmMetricsGroup, 
times(remoteLogManagerMetricNames.size())).newGauge(anyString(), any());
             // Verify that the RemoteLogManager metrics are removed
diff --git a/core/src/test/java/kafka/log/remote/RemoteLogReaderTest.java 
b/core/src/test/java/kafka/log/remote/RemoteLogReaderTest.java
index 3e8596f93cd..d1d35b5e5df 100644
--- a/core/src/test/java/kafka/log/remote/RemoteLogReaderTest.java
+++ b/core/src/test/java/kafka/log/remote/RemoteLogReaderTest.java
@@ -17,9 +17,11 @@
 package kafka.log.remote;
 
 import kafka.server.BrokerTopicStats;
+import kafka.server.KafkaConfig;
 import kafka.utils.TestUtils;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.record.Records;
+import org.apache.kafka.server.log.remote.storage.RemoteLogManagerConfig;
 import org.apache.kafka.server.log.remote.storage.RemoteStorageException;
 import org.apache.kafka.storage.internals.log.FetchDataInfo;
 import org.apache.kafka.storage.internals.log.LogOffsetMetadata;
@@ -30,6 +32,8 @@ import org.junit.jupiter.api.Test;
 import org.mockito.ArgumentCaptor;
 
 import java.io.IOException;
+import java.util.Optional;
+import java.util.Properties;
 import java.util.function.Consumer;
 
 import static org.junit.jupiter.api.Assertions.assertEquals;
@@ -44,13 +48,16 @@ import static org.mockito.Mockito.when;
 public class RemoteLogReaderTest {
     public static final String TOPIC = "test";
     RemoteLogManager mockRLM = mock(RemoteLogManager.class);
-    BrokerTopicStats brokerTopicStats = new BrokerTopicStats();
+    BrokerTopicStats brokerTopicStats = null;
     LogOffsetMetadata logOffsetMetadata = new LogOffsetMetadata(100);
     Records records = mock(Records.class);
 
     @BeforeEach
     public void setUp() {
         TestUtils.clearYammerMetrics();
+        Properties props = kafka.utils.TestUtils.createDummyBrokerConfig();
+        
props.setProperty(RemoteLogManagerConfig.REMOTE_LOG_STORAGE_SYSTEM_ENABLE_PROP, 
"true");
+        brokerTopicStats = new 
BrokerTopicStats(Optional.of(KafkaConfig.fromProps(props)));
     }
 
     @Test
diff --git a/core/src/test/scala/integration/kafka/api/MetricsTest.scala 
b/core/src/test/scala/integration/kafka/api/MetricsTest.scala
index 09e21ccc084..c34d8ff0cdb 100644
--- a/core/src/test/scala/integration/kafka/api/MetricsTest.scala
+++ b/core/src/test/scala/integration/kafka/api/MetricsTest.scala
@@ -24,9 +24,12 @@ import 
org.apache.kafka.common.errors.{InvalidTopicException, UnknownTopicOrPart
 import org.apache.kafka.common.network.ListenerName
 import org.apache.kafka.common.security.auth.SecurityProtocol
 import org.apache.kafka.common.security.authenticator.TestJaasConfig
+import 
org.apache.kafka.server.log.remote.storage.{NoOpRemoteLogMetadataManager, 
NoOpRemoteStorageManager, RemoteLogManagerConfig, RemoteStorageMetrics}
 import org.apache.kafka.server.metrics.KafkaYammerMetrics
-import org.junit.jupiter.api.{AfterEach, BeforeEach, Test, TestInfo}
+import org.junit.jupiter.api.{AfterEach, BeforeEach, TestInfo}
 import org.junit.jupiter.api.Assertions._
+import org.junit.jupiter.params.ParameterizedTest
+import org.junit.jupiter.params.provider.ValueSource
 
 import scala.annotation.nowarn
 import scala.jdk.CollectionConverters._
@@ -54,6 +57,12 @@ class MetricsTest extends IntegrationTestHarness with 
SaslSetup {
 
   @BeforeEach
   override def setUp(testInfo: TestInfo): Unit = {
+    if (testInfo.getDisplayName.contains("testMetrics") && 
testInfo.getDisplayName.endsWith("true")) {
+      // systemRemoteStorageEnabled is enabled
+      
this.serverConfig.setProperty(RemoteLogManagerConfig.REMOTE_LOG_STORAGE_SYSTEM_ENABLE_PROP,
 "true")
+      
this.serverConfig.setProperty(RemoteLogManagerConfig.REMOTE_STORAGE_MANAGER_CLASS_NAME_PROP,
 classOf[NoOpRemoteStorageManager].getName)
+      
this.serverConfig.setProperty(RemoteLogManagerConfig.REMOTE_LOG_METADATA_MANAGER_CLASS_NAME_PROP,
 classOf[NoOpRemoteLogMetadataManager].getName)
+    }
     verifyNoRequestMetrics("Request metrics not removed in a previous test")
     startSasl(jaasSections(kafkaServerSaslMechanisms, 
Some(kafkaClientSaslMechanism), KafkaSasl, kafkaServerJaasEntryName))
     super.setUp(testInfo)
@@ -70,8 +79,9 @@ class MetricsTest extends IntegrationTestHarness with 
SaslSetup {
    * Verifies some of the metrics of producer, consumer as well as server.
    */
   @nowarn("cat=deprecation")
-  @Test
-  def testMetrics(): Unit = {
+  @ParameterizedTest(name = "testMetrics with systemRemoteStorageEnabled: {0}")
+  @ValueSource(booleans = Array(true, false))
+  def testMetrics(systemRemoteStorageEnabled: Boolean): Unit = {
     val topic = "topicWithOldMessageFormat"
     val props = new Properties
     props.setProperty(TopicConfig.MESSAGE_FORMAT_VERSION_CONFIG, "0.9.0")
@@ -103,6 +113,7 @@ class MetricsTest extends IntegrationTestHarness with 
SaslSetup {
 
     generateAuthenticationFailure(tp)
     verifyBrokerAuthenticationMetrics(server)
+    verifyRemoteStorageMetrics(systemRemoteStorageEnabled)
   }
 
   private def sendRecords(producer: KafkaProducer[Array[Byte], Array[Byte]], 
numRecords: Int,
@@ -308,4 +319,17 @@ class MetricsTest extends IntegrationTestHarness with 
SaslSetup {
     }
     assertTrue(metrics.isEmpty, s"$errorMessage: ${metrics.keys}")
   }
+
+  private def verifyRemoteStorageMetrics(shouldContainMetrics: Boolean): Unit 
= {
+    val metrics = RemoteStorageMetrics.allMetrics().asScala.filter(name =>
+      KafkaYammerMetrics.defaultRegistry.allMetrics.asScala.find(metric => {
+        metric._1.getMBeanName().equals(name.getMBeanName)
+      }).isDefined
+    ).toList
+    if (shouldContainMetrics) {
+      assertEquals(RemoteStorageMetrics.allMetrics().size(), metrics.size, 
s"Only $metrics appear in the metrics")
+    } else {
+      assertEquals(0, metrics.size, s"$metrics should not appear in the 
metrics")
+    }
+  }
 }
diff --git a/core/src/test/scala/kafka/server/KafkaRequestHandlerTest.scala 
b/core/src/test/scala/kafka/server/KafkaRequestHandlerTest.scala
index 5b0c0746054..bcdafd4bd4d 100644
--- a/core/src/test/scala/kafka/server/KafkaRequestHandlerTest.scala
+++ b/core/src/test/scala/kafka/server/KafkaRequestHandlerTest.scala
@@ -25,8 +25,11 @@ import org.apache.kafka.common.protocol.ApiKeys
 import org.apache.kafka.common.requests.{RequestContext, RequestHeader}
 import org.apache.kafka.common.security.auth.{KafkaPrincipal, SecurityProtocol}
 import org.apache.kafka.common.utils.MockTime
-import org.junit.jupiter.api.Assertions.assertEquals
+import org.apache.kafka.server.log.remote.storage.{RemoteLogManagerConfig, 
RemoteStorageMetrics}
+import org.junit.jupiter.api.Assertions.{assertEquals, assertFalse, assertTrue}
 import org.junit.jupiter.api.Test
+import org.junit.jupiter.params.ParameterizedTest
+import org.junit.jupiter.params.provider.ValueSource
 import org.mockito.ArgumentMatchers
 import org.mockito.ArgumentMatchers.any
 import org.mockito.Mockito.{mock, when}
@@ -77,4 +80,22 @@ class KafkaRequestHandlerTest {
     assertEquals(Some(startTime + 2000000), 
request.callbackRequestDequeueTimeNanos)
     assertEquals(Some(startTime + 3000000), 
request.callbackRequestCompleteTimeNanos)
   }
+
+
+  @ParameterizedTest
+  @ValueSource(booleans = Array(true, false))
+  def testTopicStats(systemRemoteStorageEnabled: Boolean): Unit = {
+    val topic = "topic"
+    val props = kafka.utils.TestUtils.createDummyBrokerConfig()
+    
props.setProperty(RemoteLogManagerConfig.REMOTE_LOG_STORAGE_SYSTEM_ENABLE_PROP, 
systemRemoteStorageEnabled.toString)
+    val brokerTopicStats = new 
BrokerTopicStats(java.util.Optional.of(KafkaConfig.fromProps(props)))
+    brokerTopicStats.topicStats(topic)
+    RemoteStorageMetrics.brokerTopicStatsMetrics.forEach(metric => {
+      if (systemRemoteStorageEnabled) {
+        
assertTrue(brokerTopicStats.topicStats(topic).metricMap.contains(metric.getName))
+      } else {
+        
assertFalse(brokerTopicStats.topicStats(topic).metricMap.contains(metric.getName))
+      }
+    })
+  }
 }
diff --git a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala 
b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
index 72483307411..92533d0e746 100644
--- a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
@@ -3583,6 +3583,7 @@ class ReplicaManagerTest {
     val tidp0 = new TopicIdPartition(topicId, tp0)
 
     val props = new Properties()
+    props.put("zookeeper.connect", "test")
     props.put(RemoteLogManagerConfig.REMOTE_LOG_STORAGE_SYSTEM_ENABLE_PROP, 
true.toString)
     props.put(RemoteLogManagerConfig.REMOTE_STORAGE_MANAGER_CLASS_NAME_PROP, 
classOf[NoOpRemoteStorageManager].getName)
     
props.put(RemoteLogManagerConfig.REMOTE_LOG_METADATA_MANAGER_CLASS_NAME_PROP, 
classOf[NoOpRemoteLogMetadataManager].getName)
@@ -3591,7 +3592,7 @@ class ReplicaManagerTest {
     val config = new AbstractConfig(RemoteLogManagerConfig.CONFIG_DEF, props)
     val remoteLogManagerConfig = new RemoteLogManagerConfig(config)
     val mockLog = mock(classOf[UnifiedLog])
-    val brokerTopicStats = new BrokerTopicStats
+    val brokerTopicStats = new 
BrokerTopicStats(java.util.Optional.of(KafkaConfig.fromProps(props)))
     val remoteLogManager = new RemoteLogManager(
       remoteLogManagerConfig,
       0,
diff --git a/core/src/test/scala/unit/kafka/utils/TestUtils.scala 
b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
index e587c8cb6e8..15ccb7d339b 100755
--- a/core/src/test/scala/unit/kafka/utils/TestUtils.scala
+++ b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
@@ -280,6 +280,10 @@ object TestUtils extends Logging {
     Await.result(future, FiniteDuration(5, TimeUnit.MINUTES))
   }
 
+  def createDummyBrokerConfig(): Properties = {
+    createBrokerConfig(0, "")
+  }
+
   /**
     * Create a test config for the provided parameters.
     *
diff --git 
a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/fetcher/ReplicaFetcherThreadBenchmark.java
 
b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/fetcher/ReplicaFetcherThreadBenchmark.java
index 12e2cd1468c..511f51064e1 100644
--- 
a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/fetcher/ReplicaFetcherThreadBenchmark.java
+++ 
b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/fetcher/ReplicaFetcherThreadBenchmark.java
@@ -95,6 +95,7 @@ import java.util.Collections;
 import java.util.HashMap;
 import java.util.LinkedHashMap;
 import java.util.List;
+import java.util.Optional;
 import java.util.Properties;
 import java.util.UUID;
 import java.util.concurrent.TimeUnit;
@@ -133,7 +134,7 @@ public class ReplicaFetcherThreadBenchmark {
         KafkaConfig config = new KafkaConfig(props);
         LogConfig logConfig = createLogConfig();
 
-        BrokerTopicStats brokerTopicStats = new BrokerTopicStats();
+        BrokerTopicStats brokerTopicStats = new 
BrokerTopicStats(Optional.empty());
         LogDirFailureChannel logDirFailureChannel = 
Mockito.mock(LogDirFailureChannel.class);
         List<File> logDirs = Collections.singletonList(logDir);
         logManager = new LogManagerBuilder().
diff --git 
a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/metadata/KRaftMetadataRequestBenchmark.java
 
b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/metadata/KRaftMetadataRequestBenchmark.java
index 13409a57044..b3ab13fcb43 100644
--- 
a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/metadata/KRaftMetadataRequestBenchmark.java
+++ 
b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/metadata/KRaftMetadataRequestBenchmark.java
@@ -117,7 +117,7 @@ public class KRaftMetadataRequestBenchmark {
             clientQuotaManager, clientRequestQuotaManager, 
controllerMutationQuotaManager, replicaQuotaManager,
             replicaQuotaManager, replicaQuotaManager, Option.empty());
     private FetchManager fetchManager = Mockito.mock(FetchManager.class);
-    private BrokerTopicStats brokerTopicStats = new BrokerTopicStats();
+    private BrokerTopicStats brokerTopicStats = new 
BrokerTopicStats(Optional.empty());
     private KafkaPrincipal principal = new 
KafkaPrincipal(KafkaPrincipal.USER_TYPE, "test-user");
     private KafkaApis kafkaApis;
     private RequestChannel.Request allTopicMetadataRequest;
diff --git 
a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/metadata/MetadataRequestBenchmark.java
 
b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/metadata/MetadataRequestBenchmark.java
index 5f0bcec62f0..187743ce0b1 100644
--- 
a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/metadata/MetadataRequestBenchmark.java
+++ 
b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/metadata/MetadataRequestBenchmark.java
@@ -122,7 +122,7 @@ public class MetadataRequestBenchmark {
         clientQuotaManager, clientRequestQuotaManager, 
controllerMutationQuotaManager, replicaQuotaManager,
         replicaQuotaManager, replicaQuotaManager, Option.empty());
     private FetchManager fetchManager = Mockito.mock(FetchManager.class);
-    private BrokerTopicStats brokerTopicStats = new BrokerTopicStats();
+    private BrokerTopicStats brokerTopicStats = new 
BrokerTopicStats(Optional.empty());
     private KafkaPrincipal principal = new 
KafkaPrincipal(KafkaPrincipal.USER_TYPE, "test-user");
     private KafkaApis kafkaApis;
     private RequestChannel.Request allTopicMetadataRequest;
diff --git 
a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/partition/PartitionMakeFollowerBenchmark.java
 
b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/partition/PartitionMakeFollowerBenchmark.java
index 0e051ac5d89..b28ab90818f 100644
--- 
a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/partition/PartitionMakeFollowerBenchmark.java
+++ 
b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/partition/PartitionMakeFollowerBenchmark.java
@@ -94,7 +94,7 @@ public class PartitionMakeFollowerBenchmark {
         scheduler.startup();
         LogConfig logConfig = new LogConfig(new Properties());
 
-        BrokerTopicStats brokerTopicStats = new BrokerTopicStats();
+        BrokerTopicStats brokerTopicStats = new 
BrokerTopicStats(Optional.empty());
         LogDirFailureChannel logDirFailureChannel = 
Mockito.mock(LogDirFailureChannel.class);
         logManager = new LogManagerBuilder().
             setLogDirs(Collections.singletonList(logDir)).
diff --git 
a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/partition/UpdateFollowerFetchStateBenchmark.java
 
b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/partition/UpdateFollowerFetchStateBenchmark.java
index a8fec969c87..eafe0e4b27e 100644
--- 
a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/partition/UpdateFollowerFetchStateBenchmark.java
+++ 
b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/partition/UpdateFollowerFetchStateBenchmark.java
@@ -74,7 +74,7 @@ public class UpdateFollowerFetchStateBenchmark {
     private Option<Uuid> topicId = 
OptionConverters.toScala(Optional.of(Uuid.randomUuid()));
     private File logDir = new File(System.getProperty("java.io.tmpdir"), 
topicPartition.toString());
     private KafkaScheduler scheduler = new KafkaScheduler(1, true, 
"scheduler");
-    private BrokerTopicStats brokerTopicStats = new BrokerTopicStats();
+    private BrokerTopicStats brokerTopicStats = new 
BrokerTopicStats(Optional.empty());
     private LogDirFailureChannel logDirFailureChannel = 
Mockito.mock(LogDirFailureChannel.class);
     private long nextOffset = 0;
     private LogManager logManager;
diff --git 
a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/record/BaseRecordBatchBenchmark.java
 
b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/record/BaseRecordBatchBenchmark.java
index ac79fd357f8..ade2a8fe007 100644
--- 
a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/record/BaseRecordBatchBenchmark.java
+++ 
b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/record/BaseRecordBatchBenchmark.java
@@ -36,6 +36,7 @@ import org.openjdk.jmh.annotations.TearDown;
 
 import java.nio.ByteBuffer;
 import java.util.Arrays;
+import java.util.Optional;
 import java.util.Random;
 import java.util.stream.IntStream;
 
@@ -79,7 +80,7 @@ public abstract class BaseRecordBatchBenchmark {
     ByteBuffer[] batchBuffers;
     RequestLocal requestLocal;
     LogValidator.MetricsRecorder validatorMetricsRecorder = 
UnifiedLog.newValidatorMetricsRecorder(
-        new BrokerTopicStats().allTopicsStats());
+        new BrokerTopicStats(Optional.empty()).allTopicsStats());
 
     @Setup
     public void init() {
diff --git 
a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/server/CheckpointBench.java 
b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/server/CheckpointBench.java
index 1315a2e183b..b0502c955a5 100644
--- 
a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/server/CheckpointBench.java
+++ 
b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/server/CheckpointBench.java
@@ -56,6 +56,7 @@ import org.openjdk.jmh.annotations.Warmup;
 import java.io.File;
 import java.util.ArrayList;
 import java.util.List;
+import java.util.Optional;
 import java.util.Properties;
 import java.util.concurrent.TimeUnit;
 import java.util.stream.Collectors;
@@ -110,7 +111,7 @@ public class CheckpointBench {
                 new LogConfig(new Properties()), new MockConfigRepository(), 
new CleanerConfig(1, 4 * 1024 * 1024L, 0.9d,
                         1024 * 1024, 32 * 1024 * 1024, Double.MAX_VALUE, 15 * 
1000, true), time, MetadataVersion.latest(), 4, false, Option.empty());
         scheduler.startup();
-        final BrokerTopicStats brokerTopicStats = new BrokerTopicStats();
+        final BrokerTopicStats brokerTopicStats = new 
BrokerTopicStats(Optional.empty());
         final MetadataCache metadataCache =
                 MetadataCache.zkMetadataCache(this.brokerProperties.brokerId(),
                     this.brokerProperties.interBrokerProtocolVersion(),
diff --git 
a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/server/PartitionCreationBench.java
 
b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/server/PartitionCreationBench.java
index befa467e224..0bbf934e6e8 100644
--- 
a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/server/PartitionCreationBench.java
+++ 
b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/server/PartitionCreationBench.java
@@ -63,6 +63,7 @@ import java.io.File;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
+import java.util.Optional;
 import java.util.Properties;
 import java.util.concurrent.TimeUnit;
 import java.util.stream.Collectors;
@@ -113,7 +114,7 @@ public class PartitionCreationBench {
         this.metrics = new Metrics();
         this.time = Time.SYSTEM;
         this.failureChannel = new 
LogDirFailureChannel(brokerProperties.logDirs().size());
-        final BrokerTopicStats brokerTopicStats = new BrokerTopicStats();
+        final BrokerTopicStats brokerTopicStats = new 
BrokerTopicStats(Optional.empty());
         final List<File> files =
                 
JavaConverters.seqAsJavaList(brokerProperties.logDirs()).stream().map(File::new).collect(Collectors.toList());
         CleanerConfig cleanerConfig = new CleanerConfig(1,
diff --git 
a/storage/api/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteStorageMetrics.java
 
b/storage/api/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteStorageMetrics.java
new file mode 100644
index 00000000000..7c5f74836ae
--- /dev/null
+++ 
b/storage/api/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteStorageMetrics.java
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.server.log.remote.storage;
+
+import com.yammer.metrics.core.MetricName;
+import org.apache.kafka.server.metrics.KafkaYammerMetrics;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Set;
+
+/**
+ * This class contains the metrics related to tiered storage feature, which is 
to have a centralized
+ * place to store them, so that we can verify all of them easily.
+ *
+ * @see kafka.api.MetricsTest
+ */
+public class RemoteStorageMetrics {
+    private static final String REMOTE_LOG_READER_METRICS_NAME_PREFIX = 
"RemoteLogReader";
+    private static final String REMOTE_LOG_MANAGER_TASKS_AVG_IDLE_PERCENT = 
"RemoteLogManagerTasksAvgIdlePercent";
+    private static final String TASK_QUEUE_SIZE = "TaskQueueSize";
+    private static final String AVG_IDLE_PERCENT = "AvgIdlePercent";
+    private static final String REMOTE_COPY_BYTES_PER_SEC = 
"RemoteCopyBytesPerSec";
+    private static final String REMOTE_FETCH_BYTES_PER_SEC = 
"RemoteFetchBytesPerSec";
+    private static final String REMOTE_FETCH_REQUESTS_PER_SEC = 
"RemoteFetchRequestsPerSec";
+    private static final String REMOTE_COPY_REQUESTS_PER_SEC = 
"RemoteCopyRequestsPerSec";
+    private static final String FAILED_REMOTE_FETCH_PER_SEC = 
"RemoteFetchErrorsPerSec";
+    private static final String FAILED_REMOTE_COPY_PER_SEC = 
"RemoteCopyErrorsPerSec";
+    private static final String REMOTE_LOG_READER_TASK_QUEUE_SIZE = 
REMOTE_LOG_READER_METRICS_NAME_PREFIX + TASK_QUEUE_SIZE;
+    private static final String REMOTE_LOG_READER_AVG_IDLE_PERCENT = 
REMOTE_LOG_READER_METRICS_NAME_PREFIX + AVG_IDLE_PERCENT;
+    public static final Set<String> REMOTE_STORAGE_THREAD_POOL_METRICS = 
Collections.unmodifiableSet(
+            new HashSet<>(Arrays.asList(REMOTE_LOG_READER_TASK_QUEUE_SIZE, 
REMOTE_LOG_READER_AVG_IDLE_PERCENT)));
+
+    public final static MetricName REMOTE_COPY_BYTES_PER_SEC_METRIC = 
getMetricName(
+            "kafka.server", "BrokerTopicMetrics", REMOTE_COPY_BYTES_PER_SEC);
+    public final static MetricName REMOTE_FETCH_BYTES_PER_SEC_METRIC = 
getMetricName(
+            "kafka.server", "BrokerTopicMetrics", REMOTE_FETCH_BYTES_PER_SEC);
+    public final static MetricName REMOTE_FETCH_REQUESTS_PER_SEC_METRIC = 
getMetricName(
+            "kafka.server", "BrokerTopicMetrics", 
REMOTE_FETCH_REQUESTS_PER_SEC);
+    public final static MetricName REMOTE_COPY_REQUESTS_PER_SEC_METRIC = 
getMetricName(
+            "kafka.server", "BrokerTopicMetrics", 
REMOTE_COPY_REQUESTS_PER_SEC);
+    public final static MetricName FAILED_REMOTE_FETCH_PER_SEC_METRIC = 
getMetricName(
+            "kafka.server", "BrokerTopicMetrics", FAILED_REMOTE_FETCH_PER_SEC);
+    public final static MetricName FAILED_REMOTE_COPY_PER_SEC_METRIC = 
getMetricName(
+            "kafka.server", "BrokerTopicMetrics", FAILED_REMOTE_COPY_PER_SEC);
+    public final static MetricName 
REMOTE_LOG_MANAGER_TASKS_AVG_IDLE_PERCENT_METRIC = getMetricName(
+            "kafka.log.remote", "RemoteLogManager", 
REMOTE_LOG_MANAGER_TASKS_AVG_IDLE_PERCENT);
+    public final static MetricName REMOTE_LOG_READER_TASK_QUEUE_SIZE_METRIC = 
getMetricName(
+            "org.apache.kafka.storage.internals.log", 
"RemoteStorageThreadPool", REMOTE_LOG_READER_TASK_QUEUE_SIZE);
+    public final static MetricName REMOTE_LOG_READER_AVG_IDLE_PERCENT_METRIC = 
getMetricName(
+            "org.apache.kafka.storage.internals.log", 
"RemoteStorageThreadPool", REMOTE_LOG_READER_AVG_IDLE_PERCENT);
+
+    public static Set<MetricName> allMetrics() {
+        Set<MetricName> metrics = new HashSet<>();
+        metrics.add(REMOTE_COPY_BYTES_PER_SEC_METRIC);
+        metrics.add(REMOTE_FETCH_BYTES_PER_SEC_METRIC);
+        metrics.add(REMOTE_FETCH_REQUESTS_PER_SEC_METRIC);
+        metrics.add(REMOTE_COPY_REQUESTS_PER_SEC_METRIC);
+        metrics.add(FAILED_REMOTE_FETCH_PER_SEC_METRIC);
+        metrics.add(FAILED_REMOTE_COPY_PER_SEC_METRIC);
+        metrics.add(REMOTE_LOG_MANAGER_TASKS_AVG_IDLE_PERCENT_METRIC);
+        metrics.add(REMOTE_LOG_READER_AVG_IDLE_PERCENT_METRIC);
+        metrics.add(REMOTE_LOG_READER_TASK_QUEUE_SIZE_METRIC);
+
+        return metrics;
+    }
+
+    public static Set<MetricName> brokerTopicStatsMetrics() {
+        Set<MetricName> metrics = new HashSet<>();
+        metrics.add(REMOTE_COPY_BYTES_PER_SEC_METRIC);
+        metrics.add(REMOTE_FETCH_BYTES_PER_SEC_METRIC);
+        metrics.add(REMOTE_FETCH_REQUESTS_PER_SEC_METRIC);
+        metrics.add(REMOTE_COPY_REQUESTS_PER_SEC_METRIC);
+        metrics.add(FAILED_REMOTE_FETCH_PER_SEC_METRIC);
+        metrics.add(FAILED_REMOTE_COPY_PER_SEC_METRIC);
+
+        return metrics;
+    }
+    private static MetricName getMetricName(String group, String type, String 
name) {
+        return KafkaYammerMetrics.getMetricName(group, type, name);
+    }
+}
diff --git 
a/storage/src/main/java/org/apache/kafka/storage/internals/log/RemoteStorageThreadPool.java
 
b/storage/src/main/java/org/apache/kafka/storage/internals/log/RemoteStorageThreadPool.java
index 22807a0271b..9c903e51d4b 100644
--- 
a/storage/src/main/java/org/apache/kafka/storage/internals/log/RemoteStorageThreadPool.java
+++ 
b/storage/src/main/java/org/apache/kafka/storage/internals/log/RemoteStorageThreadPool.java
@@ -23,28 +23,23 @@ import org.apache.kafka.common.utils.LogContext;
 import org.apache.kafka.server.metrics.KafkaMetricsGroup;
 import org.slf4j.Logger;
 
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.HashSet;
-import java.util.Set;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 
+import static 
org.apache.kafka.server.log.remote.storage.RemoteStorageMetrics.REMOTE_LOG_READER_AVG_IDLE_PERCENT_METRIC;
+import static 
org.apache.kafka.server.log.remote.storage.RemoteStorageMetrics.REMOTE_LOG_READER_TASK_QUEUE_SIZE_METRIC;
+import static 
org.apache.kafka.server.log.remote.storage.RemoteStorageMetrics.REMOTE_STORAGE_THREAD_POOL_METRICS;
+
 public class RemoteStorageThreadPool extends ThreadPoolExecutor {
-    public static final String TASK_QUEUE_SIZE = "TaskQueueSize";
-    public static final String AVG_IDLE_PERCENT = "AvgIdlePercent";
-    public static final Set<String> METRIC_SUFFIXES = 
Collections.unmodifiableSet(new HashSet<>(Arrays.asList(TASK_QUEUE_SIZE, 
AVG_IDLE_PERCENT)));
     private final Logger logger;
-    private final String metricsNamePrefix;
     private final KafkaMetricsGroup metricsGroup = new 
KafkaMetricsGroup(this.getClass());
 
     public RemoteStorageThreadPool(String threadNamePrefix,
                                    int numThreads,
-                                   int maxPendingTasks,
-                                   String metricsNamePrefix) {
+                                   int maxPendingTasks) {
         super(numThreads, numThreads, 0L, TimeUnit.MILLISECONDS, new 
LinkedBlockingQueue<>(maxPendingTasks),
                 new RemoteStorageThreadFactory(threadNamePrefix));
         logger = new LogContext() {
@@ -54,14 +49,13 @@ public class RemoteStorageThreadPool extends 
ThreadPoolExecutor {
             }
         }.logger(RemoteStorageThreadPool.class);
 
-        this.metricsNamePrefix = metricsNamePrefix;
-        metricsGroup.newGauge(metricsNamePrefix.concat(TASK_QUEUE_SIZE), new 
Gauge<Integer>() {
+        
metricsGroup.newGauge(REMOTE_LOG_READER_TASK_QUEUE_SIZE_METRIC.getName(), new 
Gauge<Integer>() {
             @Override
             public Integer value() {
                 return RemoteStorageThreadPool.this.getQueue().size();
             }
         });
-        metricsGroup.newGauge(metricsNamePrefix.concat(AVG_IDLE_PERCENT), new 
Gauge<Double>() {
+        
metricsGroup.newGauge(REMOTE_LOG_READER_AVG_IDLE_PERCENT_METRIC.getName(), new 
Gauge<Double>() {
             @Override
             public Double value() {
                 return 1 - (double) 
RemoteStorageThreadPool.this.getActiveCount() / (double) 
RemoteStorageThreadPool.this.getCorePoolSize();
@@ -98,6 +92,6 @@ public class RemoteStorageThreadPool extends 
ThreadPoolExecutor {
     }
 
     public void removeMetrics() {
-        METRIC_SUFFIXES.forEach(metric -> 
metricsGroup.removeMetric(metricsNamePrefix.concat(metric)));
+        REMOTE_STORAGE_THREAD_POOL_METRICS.forEach(metricsGroup::removeMetric);
     }
 }

Reply via email to