This is an automated email from the ASF dual-hosted git repository.
showuon pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 748175ce62e KAFKA-15189: only init remote topic metrics when enabled
(#14133)
748175ce62e is described below
commit 748175ce62ea68d2a99ff9f96c7e349e1d0e1b96
Author: Luke Chen <[email protected]>
AuthorDate: Sat Aug 5 13:00:16 2023 +0800
KAFKA-15189: only init remote topic metrics when enabled (#14133)
Only initialize remote topic metrics when system-wide remote storage is
enabled to avoid impacting performance for existing brokers. Also add tests.
Reviewers: Divij Vaidya <[email protected]>, Kamal Chandraprakash
<[email protected]>
---
build.gradle | 2 +
checkstyle/import-control.xml | 4 +
.../java/kafka/log/remote/RemoteLogManager.java | 10 +--
.../kafka/server/builders/KafkaApisBuilder.java | 3 +-
.../server/builders/ReplicaManagerBuilder.java | 3 +-
.../src/main/scala/kafka/server/BrokerServer.scala | 3 +-
.../scala/kafka/server/KafkaRequestHandler.scala | 61 +++++++-------
core/src/main/scala/kafka/server/KafkaServer.scala | 2 +-
.../kafka/log/remote/RemoteLogManagerTest.java | 21 +++--
.../java/kafka/log/remote/RemoteLogReaderTest.java | 9 +-
.../scala/integration/kafka/api/MetricsTest.scala | 30 ++++++-
.../kafka/server/KafkaRequestHandlerTest.scala | 23 ++++-
.../unit/kafka/server/ReplicaManagerTest.scala | 3 +-
.../test/scala/unit/kafka/utils/TestUtils.scala | 4 +
.../jmh/fetcher/ReplicaFetcherThreadBenchmark.java | 3 +-
.../metadata/KRaftMetadataRequestBenchmark.java | 2 +-
.../jmh/metadata/MetadataRequestBenchmark.java | 2 +-
.../partition/PartitionMakeFollowerBenchmark.java | 2 +-
.../UpdateFollowerFetchStateBenchmark.java | 2 +-
.../kafka/jmh/record/BaseRecordBatchBenchmark.java | 3 +-
.../apache/kafka/jmh/server/CheckpointBench.java | 3 +-
.../kafka/jmh/server/PartitionCreationBench.java | 3 +-
.../log/remote/storage/RemoteStorageMetrics.java | 97 ++++++++++++++++++++++
.../internals/log/RemoteStorageThreadPool.java | 22 ++---
24 files changed, 236 insertions(+), 81 deletions(-)
diff --git a/build.gradle b/build.gradle
index 76dec696680..250aed44911 100644
--- a/build.gradle
+++ b/build.gradle
@@ -1638,6 +1638,8 @@ project(':storage:api') {
dependencies {
implementation project(':clients')
+ implementation project(':server-common')
+ implementation libs.metrics
implementation libs.slf4jApi
testImplementation project(':clients')
diff --git a/checkstyle/import-control.xml b/checkstyle/import-control.xml
index 7ecc4d5a1d1..8148de598fa 100644
--- a/checkstyle/import-control.xml
+++ b/checkstyle/import-control.xml
@@ -261,6 +261,10 @@
<allow pkg="org.apache.kafka.storage"/>
<subpackage name="remote">
<allow pkg="scala.collection" />
+ <subpackage name="storage">
+ <allow pkg="com.yammer.metrics.core" />
+ <allow pkg="org.apache.kafka.server.metrics" />
+ </subpackage>
</subpackage>
</subpackage>
diff --git a/core/src/main/java/kafka/log/remote/RemoteLogManager.java
b/core/src/main/java/kafka/log/remote/RemoteLogManager.java
index 33bde33882f..d07d60ce3a8 100644
--- a/core/src/main/java/kafka/log/remote/RemoteLogManager.java
+++ b/core/src/main/java/kafka/log/remote/RemoteLogManager.java
@@ -111,6 +111,7 @@ import java.util.stream.Collectors;
import java.util.stream.Stream;
import static
org.apache.kafka.server.log.remote.metadata.storage.TopicBasedRemoteLogMetadataManagerConfig.REMOTE_LOG_METADATA_COMMON_CLIENT_PREFIX;
+import static
org.apache.kafka.server.log.remote.storage.RemoteStorageMetrics.REMOTE_LOG_MANAGER_TASKS_AVG_IDLE_PERCENT_METRIC;
/**
* This class is responsible for
@@ -123,8 +124,6 @@ public class RemoteLogManager implements Closeable {
private static final Logger LOGGER =
LoggerFactory.getLogger(RemoteLogManager.class);
private static final String REMOTE_LOG_READER_THREAD_NAME_PREFIX =
"remote-log-reader";
- public static final String REMOTE_LOG_READER_METRICS_NAME_PREFIX =
"RemoteLogReader";
- public static final String REMOTE_LOG_MANAGER_TASKS_AVG_IDLE_PERCENT =
"RemoteLogManagerTasksAvgIdlePercent";
private final RemoteLogManagerConfig rlmConfig;
private final int brokerId;
private final String logDir;
@@ -185,7 +184,7 @@ public class RemoteLogManager implements Closeable {
delayInMs = rlmConfig.remoteLogManagerTaskIntervalMs();
rlmScheduledThreadPool = new
RLMScheduledThreadPool(rlmConfig.remoteLogManagerThreadPoolSize());
- metricsGroup.newGauge(REMOTE_LOG_MANAGER_TASKS_AVG_IDLE_PERCENT, new
Gauge<Double>() {
+
metricsGroup.newGauge(REMOTE_LOG_MANAGER_TASKS_AVG_IDLE_PERCENT_METRIC.getName(),
new Gauge<Double>() {
@Override
public Double value() {
return rlmScheduledThreadPool.getIdlePercent();
@@ -195,13 +194,12 @@ public class RemoteLogManager implements Closeable {
remoteStorageReaderThreadPool = new RemoteStorageThreadPool(
REMOTE_LOG_READER_THREAD_NAME_PREFIX,
rlmConfig.remoteLogReaderThreads(),
- rlmConfig.remoteLogReaderMaxPendingTasks(),
- REMOTE_LOG_READER_METRICS_NAME_PREFIX
+ rlmConfig.remoteLogReaderMaxPendingTasks()
);
}
private void removeMetrics() {
- metricsGroup.removeMetric(REMOTE_LOG_MANAGER_TASKS_AVG_IDLE_PERCENT);
+
metricsGroup.removeMetric(REMOTE_LOG_MANAGER_TASKS_AVG_IDLE_PERCENT_METRIC.getName());
remoteStorageReaderThreadPool.removeMetrics();
}
diff --git a/core/src/main/java/kafka/server/builders/KafkaApisBuilder.java
b/core/src/main/java/kafka/server/builders/KafkaApisBuilder.java
index d5403e0c57a..aed480beba5 100644
--- a/core/src/main/java/kafka/server/builders/KafkaApisBuilder.java
+++ b/core/src/main/java/kafka/server/builders/KafkaApisBuilder.java
@@ -38,6 +38,7 @@ import org.apache.kafka.server.authorizer.Authorizer;
import java.util.Collections;
import java.util.Optional;
+
import scala.compat.java8.OptionConverters;
@@ -171,7 +172,7 @@ public class KafkaApisBuilder {
if (metrics == null) throw new RuntimeException("You must set
metrics");
if (quotas == null) throw new RuntimeException("You must set quotas");
if (fetchManager == null) throw new RuntimeException("You must set
fetchManager");
- if (brokerTopicStats == null) brokerTopicStats = new
BrokerTopicStats();
+ if (brokerTopicStats == null) brokerTopicStats = new
BrokerTopicStats(Optional.of(config));
if (apiVersionManager == null) throw new RuntimeException("You must
set apiVersionManager");
return new KafkaApis(requestChannel,
diff --git
a/core/src/main/java/kafka/server/builders/ReplicaManagerBuilder.java
b/core/src/main/java/kafka/server/builders/ReplicaManagerBuilder.java
index 5860aa17693..2566e4bcfcb 100644
--- a/core/src/main/java/kafka/server/builders/ReplicaManagerBuilder.java
+++ b/core/src/main/java/kafka/server/builders/ReplicaManagerBuilder.java
@@ -54,7 +54,7 @@ public class ReplicaManagerBuilder {
private MetadataCache metadataCache = null;
private LogDirFailureChannel logDirFailureChannel = null;
private AlterPartitionManager alterPartitionManager = null;
- private BrokerTopicStats brokerTopicStats = new BrokerTopicStats();
+ private BrokerTopicStats brokerTopicStats = null;
private AtomicBoolean isShuttingDown = new AtomicBoolean(false);
private Optional<RemoteLogManager> remoteLogManager = Optional.empty();
private Optional<KafkaZkClient> zkClient = Optional.empty();
@@ -179,6 +179,7 @@ public class ReplicaManagerBuilder {
if (metadataCache == null) throw new RuntimeException("You must set
metadataCache");
if (logDirFailureChannel == null) throw new RuntimeException("You must
set logDirFailureChannel");
if (alterPartitionManager == null) throw new RuntimeException("You
must set alterIsrManager");
+ if (brokerTopicStats == null) brokerTopicStats = new
BrokerTopicStats(Optional.of(config));
return new ReplicaManager(config,
metrics,
time,
diff --git a/core/src/main/scala/kafka/server/BrokerServer.scala
b/core/src/main/scala/kafka/server/BrokerServer.scala
index 1d0bd0124b0..825d9eb8c19 100644
--- a/core/src/main/scala/kafka/server/BrokerServer.scala
+++ b/core/src/main/scala/kafka/server/BrokerServer.scala
@@ -188,8 +188,7 @@ class BrokerServer(
kafkaScheduler.startup()
/* register broker metrics */
- brokerTopicStats = new BrokerTopicStats
-
+ brokerTopicStats = new BrokerTopicStats(java.util.Optional.of(config))
quotaManagers = QuotaFactory.instantiate(config, metrics, time,
s"broker-${config.nodeId}-")
diff --git a/core/src/main/scala/kafka/server/KafkaRequestHandler.scala
b/core/src/main/scala/kafka/server/KafkaRequestHandler.scala
index 7a9f166b56d..6191bf88120 100755
--- a/core/src/main/scala/kafka/server/KafkaRequestHandler.scala
+++ b/core/src/main/scala/kafka/server/KafkaRequestHandler.scala
@@ -26,6 +26,7 @@ import java.util.concurrent.atomic.AtomicInteger
import com.yammer.metrics.core.Meter
import org.apache.kafka.common.internals.FatalExitError
import org.apache.kafka.common.utils.{KafkaThread, Time}
+import org.apache.kafka.server.log.remote.storage.RemoteStorageMetrics
import org.apache.kafka.server.metrics.KafkaMetricsGroup
import java.util.Collections
@@ -227,7 +228,7 @@ class KafkaRequestHandlerPool(val brokerId: Int,
}
}
-class BrokerTopicMetrics(name: Option[String]) {
+class BrokerTopicMetrics(name: Option[String], configOpt:
java.util.Optional[KafkaConfig]) {
private val metricsGroup = new KafkaMetricsGroup(this.getClass)
val tags: java.util.Map[String, String] = name match {
@@ -277,17 +278,12 @@ class BrokerTopicMetrics(name: Option[String]) {
BrokerTopicStats.TotalFetchRequestsPerSec ->
MeterWrapper(BrokerTopicStats.TotalFetchRequestsPerSec, "requests"),
BrokerTopicStats.FetchMessageConversionsPerSec ->
MeterWrapper(BrokerTopicStats.FetchMessageConversionsPerSec, "requests"),
BrokerTopicStats.ProduceMessageConversionsPerSec ->
MeterWrapper(BrokerTopicStats.ProduceMessageConversionsPerSec, "requests"),
- BrokerTopicStats.RemoteCopyBytesPerSec ->
MeterWrapper(BrokerTopicStats.RemoteCopyBytesPerSec, "bytes"),
- BrokerTopicStats.RemoteFetchBytesPerSec ->
MeterWrapper(BrokerTopicStats.RemoteFetchBytesPerSec, "bytes"),
- BrokerTopicStats.RemoteFetchRequestsPerSec ->
MeterWrapper(BrokerTopicStats.RemoteFetchRequestsPerSec, "requests"),
- BrokerTopicStats.RemoteCopyRequestsPerSec ->
MeterWrapper(BrokerTopicStats.RemoteCopyRequestsPerSec, "requests"),
- BrokerTopicStats.FailedRemoteFetchRequestsPerSec ->
MeterWrapper(BrokerTopicStats.FailedRemoteFetchRequestsPerSec, "requests"),
- BrokerTopicStats.FailedRemoteCopyRequestsPerSec ->
MeterWrapper(BrokerTopicStats.FailedRemoteCopyRequestsPerSec, "requests"),
BrokerTopicStats.NoKeyCompactedTopicRecordsPerSec ->
MeterWrapper(BrokerTopicStats.NoKeyCompactedTopicRecordsPerSec, "requests"),
BrokerTopicStats.InvalidMagicNumberRecordsPerSec ->
MeterWrapper(BrokerTopicStats.InvalidMagicNumberRecordsPerSec, "requests"),
BrokerTopicStats.InvalidMessageCrcRecordsPerSec ->
MeterWrapper(BrokerTopicStats.InvalidMessageCrcRecordsPerSec, "requests"),
BrokerTopicStats.InvalidOffsetOrSequenceRecordsPerSec ->
MeterWrapper(BrokerTopicStats.InvalidOffsetOrSequenceRecordsPerSec, "requests")
).asJava)
+
if (name.isEmpty) {
metricTypeMap.put(BrokerTopicStats.ReplicationBytesInPerSec,
MeterWrapper(BrokerTopicStats.ReplicationBytesInPerSec, "bytes"))
metricTypeMap.put(BrokerTopicStats.ReplicationBytesOutPerSec,
MeterWrapper(BrokerTopicStats.ReplicationBytesOutPerSec, "bytes"))
@@ -295,6 +291,18 @@ class BrokerTopicMetrics(name: Option[String]) {
metricTypeMap.put(BrokerTopicStats.ReassignmentBytesOutPerSec,
MeterWrapper(BrokerTopicStats.ReassignmentBytesOutPerSec, "bytes"))
}
+ configOpt.ifPresent(config =>
+ if (config.remoteLogManagerConfig.enableRemoteStorageSystem()) {
+ metricTypeMap.putAll(Map(
+ RemoteStorageMetrics.REMOTE_COPY_BYTES_PER_SEC_METRIC.getName ->
MeterWrapper(RemoteStorageMetrics.REMOTE_COPY_BYTES_PER_SEC_METRIC.getName,
"bytes"),
+ RemoteStorageMetrics.REMOTE_FETCH_BYTES_PER_SEC_METRIC.getName ->
MeterWrapper(RemoteStorageMetrics.REMOTE_FETCH_BYTES_PER_SEC_METRIC.getName,
"bytes"),
+ RemoteStorageMetrics.REMOTE_FETCH_REQUESTS_PER_SEC_METRIC.getName ->
MeterWrapper(RemoteStorageMetrics.REMOTE_FETCH_REQUESTS_PER_SEC_METRIC.getName,
"requests"),
+ RemoteStorageMetrics.REMOTE_COPY_REQUESTS_PER_SEC_METRIC.getName ->
MeterWrapper(RemoteStorageMetrics.REMOTE_COPY_REQUESTS_PER_SEC_METRIC.getName,
"requests"),
+ RemoteStorageMetrics.FAILED_REMOTE_FETCH_PER_SEC_METRIC.getName ->
MeterWrapper(RemoteStorageMetrics.FAILED_REMOTE_FETCH_PER_SEC_METRIC.getName,
"requests"),
+ RemoteStorageMetrics.FAILED_REMOTE_COPY_PER_SEC_METRIC.getName ->
MeterWrapper(RemoteStorageMetrics.FAILED_REMOTE_COPY_PER_SEC_METRIC.getName,
"requests")
+ ).asJava)
+ })
+
// used for testing only
def metricMap: Map[String, MeterWrapper] = metricTypeMap.toMap
@@ -342,17 +350,17 @@ class BrokerTopicMetrics(name: Option[String]) {
def invalidOffsetOrSequenceRecordsPerSec: Meter =
metricTypeMap.get(BrokerTopicStats.InvalidOffsetOrSequenceRecordsPerSec).meter()
- def remoteCopyBytesRate: Meter =
metricTypeMap.get(BrokerTopicStats.RemoteCopyBytesPerSec).meter()
+ def remoteCopyBytesRate: Meter =
metricTypeMap.get(RemoteStorageMetrics.REMOTE_COPY_BYTES_PER_SEC_METRIC.getName).meter()
- def remoteFetchBytesRate: Meter =
metricTypeMap.get(BrokerTopicStats.RemoteFetchBytesPerSec).meter()
+ def remoteFetchBytesRate: Meter =
metricTypeMap.get(RemoteStorageMetrics.REMOTE_FETCH_BYTES_PER_SEC_METRIC.getName).meter()
- def remoteFetchRequestRate: Meter =
metricTypeMap.get(BrokerTopicStats.RemoteFetchRequestsPerSec).meter()
+ def remoteFetchRequestRate: Meter =
metricTypeMap.get(RemoteStorageMetrics.REMOTE_FETCH_REQUESTS_PER_SEC_METRIC.getName).meter()
- def remoteCopyRequestRate: Meter =
metricTypeMap.get(BrokerTopicStats.RemoteCopyRequestsPerSec).meter()
+ def remoteCopyRequestRate: Meter =
metricTypeMap.get(RemoteStorageMetrics.REMOTE_COPY_REQUESTS_PER_SEC_METRIC.getName).meter()
- def failedRemoteFetchRequestRate: Meter =
metricTypeMap.get(BrokerTopicStats.FailedRemoteFetchRequestsPerSec).meter()
+ def failedRemoteFetchRequestRate: Meter =
metricTypeMap.get(RemoteStorageMetrics.FAILED_REMOTE_FETCH_PER_SEC_METRIC.getName).meter()
- def failedRemoteCopyRequestRate: Meter =
metricTypeMap.get(BrokerTopicStats.FailedRemoteCopyRequestsPerSec).meter()
+ def failedRemoteCopyRequestRate: Meter =
metricTypeMap.get(RemoteStorageMetrics.FAILED_REMOTE_COPY_PER_SEC_METRIC.getName).meter()
def closeMetric(metricType: String): Unit = {
val meter = metricTypeMap.get(metricType)
@@ -378,27 +386,18 @@ object BrokerTopicStats {
val ProduceMessageConversionsPerSec = "ProduceMessageConversionsPerSec"
val ReassignmentBytesInPerSec = "ReassignmentBytesInPerSec"
val ReassignmentBytesOutPerSec = "ReassignmentBytesOutPerSec"
- val RemoteCopyBytesPerSec = "RemoteCopyBytesPerSec"
- val RemoteFetchBytesPerSec = "RemoteFetchBytesPerSec"
- val RemoteFetchRequestsPerSec = "RemoteFetchRequestsPerSec"
- val RemoteCopyRequestsPerSec = "RemoteCopyRequestsPerSec"
- val FailedRemoteFetchRequestsPerSec = "RemoteFetchErrorsPerSec"
- val FailedRemoteCopyRequestsPerSec = "RemoteCopyErrorsPerSec"
-
// These following topics are for LogValidator for better debugging on
failed records
val NoKeyCompactedTopicRecordsPerSec = "NoKeyCompactedTopicRecordsPerSec"
val InvalidMagicNumberRecordsPerSec = "InvalidMagicNumberRecordsPerSec"
val InvalidMessageCrcRecordsPerSec = "InvalidMessageCrcRecordsPerSec"
val InvalidOffsetOrSequenceRecordsPerSec =
"InvalidOffsetOrSequenceRecordsPerSec"
-
- private val valueFactory = (k: String) => new BrokerTopicMetrics(Some(k))
}
-class BrokerTopicStats extends Logging {
- import BrokerTopicStats._
+class BrokerTopicStats(configOpt: java.util.Optional[KafkaConfig] =
java.util.Optional.empty()) extends Logging {
+ private val valueFactory = (k: String) => new BrokerTopicMetrics(Some(k),
configOpt)
private val stats = new Pool[String, BrokerTopicMetrics](Some(valueFactory))
- val allTopicsStats = new BrokerTopicMetrics(None)
+ val allTopicsStats = new BrokerTopicMetrics(None, configOpt)
def topicStats(topic: String): BrokerTopicMetrics =
stats.getAndMaybePut(topic)
@@ -439,12 +438,12 @@ class BrokerTopicStats extends Logging {
topicMetrics.closeMetric(BrokerTopicStats.ProduceMessageConversionsPerSec)
topicMetrics.closeMetric(BrokerTopicStats.ReplicationBytesOutPerSec)
topicMetrics.closeMetric(BrokerTopicStats.ReassignmentBytesOutPerSec)
- topicMetrics.closeMetric(BrokerTopicStats.RemoteCopyBytesPerSec)
- topicMetrics.closeMetric(BrokerTopicStats.RemoteFetchBytesPerSec)
- topicMetrics.closeMetric(BrokerTopicStats.RemoteFetchRequestsPerSec)
- topicMetrics.closeMetric(BrokerTopicStats.RemoteCopyRequestsPerSec)
-
topicMetrics.closeMetric(BrokerTopicStats.FailedRemoteFetchRequestsPerSec)
- topicMetrics.closeMetric(BrokerTopicStats.FailedRemoteCopyRequestsPerSec)
+
topicMetrics.closeMetric(RemoteStorageMetrics.REMOTE_COPY_BYTES_PER_SEC_METRIC.getName)
+
topicMetrics.closeMetric(RemoteStorageMetrics.REMOTE_FETCH_BYTES_PER_SEC_METRIC.getName)
+
topicMetrics.closeMetric(RemoteStorageMetrics.REMOTE_FETCH_REQUESTS_PER_SEC_METRIC.getName)
+
topicMetrics.closeMetric(RemoteStorageMetrics.REMOTE_COPY_REQUESTS_PER_SEC_METRIC.getName)
+
topicMetrics.closeMetric(RemoteStorageMetrics.FAILED_REMOTE_FETCH_PER_SEC_METRIC.getName)
+
topicMetrics.closeMetric(RemoteStorageMetrics.FAILED_REMOTE_COPY_PER_SEC_METRIC.getName)
}
}
diff --git a/core/src/main/scala/kafka/server/KafkaServer.scala
b/core/src/main/scala/kafka/server/KafkaServer.scala
index 0d10f1cdde3..a40c7d2bbea 100755
--- a/core/src/main/scala/kafka/server/KafkaServer.scala
+++ b/core/src/main/scala/kafka/server/KafkaServer.scala
@@ -261,7 +261,7 @@ class KafkaServer(
metrics = Server.initializeMetrics(config, time, clusterId)
/* register broker metrics */
- _brokerTopicStats = new BrokerTopicStats
+ _brokerTopicStats = new BrokerTopicStats(java.util.Optional.of(config))
quotaManagers = QuotaFactory.instantiate(config, metrics, time,
threadNamePrefix.getOrElse(""))
KafkaBroker.notifyClusterListeners(clusterId, kafkaMetricsReporters ++
metrics.reporters.asScala)
diff --git a/core/src/test/java/kafka/log/remote/RemoteLogManagerTest.java
b/core/src/test/java/kafka/log/remote/RemoteLogManagerTest.java
index 941f4dc961e..dedcea0e38d 100644
--- a/core/src/test/java/kafka/log/remote/RemoteLogManagerTest.java
+++ b/core/src/test/java/kafka/log/remote/RemoteLogManagerTest.java
@@ -59,7 +59,6 @@ import org.apache.kafka.storage.internals.log.EpochEntry;
import org.apache.kafka.storage.internals.log.LazyIndex;
import org.apache.kafka.storage.internals.log.OffsetIndex;
import org.apache.kafka.storage.internals.log.ProducerStateManager;
-import org.apache.kafka.storage.internals.log.RemoteStorageThreadPool;
import org.apache.kafka.storage.internals.log.TimeIndex;
import org.apache.kafka.storage.internals.log.TransactionIndex;
import org.apache.kafka.test.TestUtils;
@@ -88,14 +87,14 @@ import java.util.Map;
import java.util.NavigableMap;
import java.util.Optional;
import java.util.Properties;
+import java.util.Set;
import java.util.TreeMap;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
-import java.util.stream.Collectors;
-import static
kafka.log.remote.RemoteLogManager.REMOTE_LOG_MANAGER_TASKS_AVG_IDLE_PERCENT;
-import static
kafka.log.remote.RemoteLogManager.REMOTE_LOG_READER_METRICS_NAME_PREFIX;
import static
org.apache.kafka.server.log.remote.metadata.storage.TopicBasedRemoteLogMetadataManagerConfig.REMOTE_LOG_METADATA_COMMON_CLIENT_PREFIX;
+import static
org.apache.kafka.server.log.remote.storage.RemoteStorageMetrics.REMOTE_LOG_MANAGER_TASKS_AVG_IDLE_PERCENT_METRIC;
+import static
org.apache.kafka.server.log.remote.storage.RemoteStorageMetrics.REMOTE_STORAGE_THREAD_POOL_METRICS;
import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
@@ -127,7 +126,8 @@ public class RemoteLogManagerTest {
RemoteStorageManager remoteStorageManager =
mock(RemoteStorageManager.class);
RemoteLogMetadataManager remoteLogMetadataManager =
mock(RemoteLogMetadataManager.class);
RemoteLogManagerConfig remoteLogManagerConfig = null;
- BrokerTopicStats brokerTopicStats = new BrokerTopicStats();
+
+ BrokerTopicStats brokerTopicStats = null;
RemoteLogManager remoteLogManager = null;
TopicIdPartition leaderTopicIdPartition = new
TopicIdPartition(Uuid.randomUuid(), new TopicPartition("Leader", 0));
@@ -157,8 +157,10 @@ public class RemoteLogManagerTest {
void setUp() throws Exception {
topicIds.put(leaderTopicIdPartition.topicPartition().topic(),
leaderTopicIdPartition.topicId());
topicIds.put(followerTopicIdPartition.topicPartition().topic(),
followerTopicIdPartition.topicId());
- Properties props = new Properties();
+ Properties props = kafka.utils.TestUtils.createDummyBrokerConfig();
+
props.setProperty(RemoteLogManagerConfig.REMOTE_LOG_STORAGE_SYSTEM_ENABLE_PROP,
"true");
remoteLogManagerConfig = createRLMConfig(props);
+ brokerTopicStats = new
BrokerTopicStats(Optional.of(KafkaConfig.fromProps(props)));
kafka.utils.TestUtils.clearYammerMetrics();
@@ -922,11 +924,8 @@ public class RemoteLogManagerTest {
KafkaMetricsGroup mockRlmMetricsGroup =
mockMetricsGroupCtor.constructed().get(0);
KafkaMetricsGroup mockThreadPoolMetricsGroup =
mockMetricsGroupCtor.constructed().get(1);
- List<String> remoteLogManagerMetricNames =
Collections.singletonList(REMOTE_LOG_MANAGER_TASKS_AVG_IDLE_PERCENT);
- List<String> remoteStorageThreadPoolMetricNames =
RemoteStorageThreadPool.METRIC_SUFFIXES
- .stream()
- .map(suffix -> REMOTE_LOG_READER_METRICS_NAME_PREFIX + suffix)
- .collect(Collectors.toList());
+ List<String> remoteLogManagerMetricNames =
Collections.singletonList(REMOTE_LOG_MANAGER_TASKS_AVG_IDLE_PERCENT_METRIC.getName());
+ Set<String> remoteStorageThreadPoolMetricNames =
REMOTE_STORAGE_THREAD_POOL_METRICS;
verify(mockRlmMetricsGroup,
times(remoteLogManagerMetricNames.size())).newGauge(anyString(), any());
// Verify that the RemoteLogManager metrics are removed
diff --git a/core/src/test/java/kafka/log/remote/RemoteLogReaderTest.java
b/core/src/test/java/kafka/log/remote/RemoteLogReaderTest.java
index 3e8596f93cd..d1d35b5e5df 100644
--- a/core/src/test/java/kafka/log/remote/RemoteLogReaderTest.java
+++ b/core/src/test/java/kafka/log/remote/RemoteLogReaderTest.java
@@ -17,9 +17,11 @@
package kafka.log.remote;
import kafka.server.BrokerTopicStats;
+import kafka.server.KafkaConfig;
import kafka.utils.TestUtils;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.record.Records;
+import org.apache.kafka.server.log.remote.storage.RemoteLogManagerConfig;
import org.apache.kafka.server.log.remote.storage.RemoteStorageException;
import org.apache.kafka.storage.internals.log.FetchDataInfo;
import org.apache.kafka.storage.internals.log.LogOffsetMetadata;
@@ -30,6 +32,8 @@ import org.junit.jupiter.api.Test;
import org.mockito.ArgumentCaptor;
import java.io.IOException;
+import java.util.Optional;
+import java.util.Properties;
import java.util.function.Consumer;
import static org.junit.jupiter.api.Assertions.assertEquals;
@@ -44,13 +48,16 @@ import static org.mockito.Mockito.when;
public class RemoteLogReaderTest {
public static final String TOPIC = "test";
RemoteLogManager mockRLM = mock(RemoteLogManager.class);
- BrokerTopicStats brokerTopicStats = new BrokerTopicStats();
+ BrokerTopicStats brokerTopicStats = null;
LogOffsetMetadata logOffsetMetadata = new LogOffsetMetadata(100);
Records records = mock(Records.class);
@BeforeEach
public void setUp() {
TestUtils.clearYammerMetrics();
+ Properties props = kafka.utils.TestUtils.createDummyBrokerConfig();
+
props.setProperty(RemoteLogManagerConfig.REMOTE_LOG_STORAGE_SYSTEM_ENABLE_PROP,
"true");
+ brokerTopicStats = new
BrokerTopicStats(Optional.of(KafkaConfig.fromProps(props)));
}
@Test
diff --git a/core/src/test/scala/integration/kafka/api/MetricsTest.scala
b/core/src/test/scala/integration/kafka/api/MetricsTest.scala
index 09e21ccc084..c34d8ff0cdb 100644
--- a/core/src/test/scala/integration/kafka/api/MetricsTest.scala
+++ b/core/src/test/scala/integration/kafka/api/MetricsTest.scala
@@ -24,9 +24,12 @@ import
org.apache.kafka.common.errors.{InvalidTopicException, UnknownTopicOrPart
import org.apache.kafka.common.network.ListenerName
import org.apache.kafka.common.security.auth.SecurityProtocol
import org.apache.kafka.common.security.authenticator.TestJaasConfig
+import
org.apache.kafka.server.log.remote.storage.{NoOpRemoteLogMetadataManager,
NoOpRemoteStorageManager, RemoteLogManagerConfig, RemoteStorageMetrics}
import org.apache.kafka.server.metrics.KafkaYammerMetrics
-import org.junit.jupiter.api.{AfterEach, BeforeEach, Test, TestInfo}
+import org.junit.jupiter.api.{AfterEach, BeforeEach, TestInfo}
import org.junit.jupiter.api.Assertions._
+import org.junit.jupiter.params.ParameterizedTest
+import org.junit.jupiter.params.provider.ValueSource
import scala.annotation.nowarn
import scala.jdk.CollectionConverters._
@@ -54,6 +57,12 @@ class MetricsTest extends IntegrationTestHarness with
SaslSetup {
@BeforeEach
override def setUp(testInfo: TestInfo): Unit = {
+ if (testInfo.getDisplayName.contains("testMetrics") &&
testInfo.getDisplayName.endsWith("true")) {
+ // systemRemoteStorageEnabled is enabled
+
this.serverConfig.setProperty(RemoteLogManagerConfig.REMOTE_LOG_STORAGE_SYSTEM_ENABLE_PROP,
"true")
+
this.serverConfig.setProperty(RemoteLogManagerConfig.REMOTE_STORAGE_MANAGER_CLASS_NAME_PROP,
classOf[NoOpRemoteStorageManager].getName)
+
this.serverConfig.setProperty(RemoteLogManagerConfig.REMOTE_LOG_METADATA_MANAGER_CLASS_NAME_PROP,
classOf[NoOpRemoteLogMetadataManager].getName)
+ }
verifyNoRequestMetrics("Request metrics not removed in a previous test")
startSasl(jaasSections(kafkaServerSaslMechanisms,
Some(kafkaClientSaslMechanism), KafkaSasl, kafkaServerJaasEntryName))
super.setUp(testInfo)
@@ -70,8 +79,9 @@ class MetricsTest extends IntegrationTestHarness with
SaslSetup {
* Verifies some of the metrics of producer, consumer as well as server.
*/
@nowarn("cat=deprecation")
- @Test
- def testMetrics(): Unit = {
+ @ParameterizedTest(name = "testMetrics with systemRemoteStorageEnabled: {0}")
+ @ValueSource(booleans = Array(true, false))
+ def testMetrics(systemRemoteStorageEnabled: Boolean): Unit = {
val topic = "topicWithOldMessageFormat"
val props = new Properties
props.setProperty(TopicConfig.MESSAGE_FORMAT_VERSION_CONFIG, "0.9.0")
@@ -103,6 +113,7 @@ class MetricsTest extends IntegrationTestHarness with
SaslSetup {
generateAuthenticationFailure(tp)
verifyBrokerAuthenticationMetrics(server)
+ verifyRemoteStorageMetrics(systemRemoteStorageEnabled)
}
private def sendRecords(producer: KafkaProducer[Array[Byte], Array[Byte]],
numRecords: Int,
@@ -308,4 +319,17 @@ class MetricsTest extends IntegrationTestHarness with
SaslSetup {
}
assertTrue(metrics.isEmpty, s"$errorMessage: ${metrics.keys}")
}
+
+ private def verifyRemoteStorageMetrics(shouldContainMetrics: Boolean): Unit
= {
+ val metrics = RemoteStorageMetrics.allMetrics().asScala.filter(name =>
+ KafkaYammerMetrics.defaultRegistry.allMetrics.asScala.find(metric => {
+ metric._1.getMBeanName().equals(name.getMBeanName)
+ }).isDefined
+ ).toList
+ if (shouldContainMetrics) {
+ assertEquals(RemoteStorageMetrics.allMetrics().size(), metrics.size,
s"Only $metrics appear in the metrics")
+ } else {
+ assertEquals(0, metrics.size, s"$metrics should not appear in the
metrics")
+ }
+ }
}
diff --git a/core/src/test/scala/kafka/server/KafkaRequestHandlerTest.scala
b/core/src/test/scala/kafka/server/KafkaRequestHandlerTest.scala
index 5b0c0746054..bcdafd4bd4d 100644
--- a/core/src/test/scala/kafka/server/KafkaRequestHandlerTest.scala
+++ b/core/src/test/scala/kafka/server/KafkaRequestHandlerTest.scala
@@ -25,8 +25,11 @@ import org.apache.kafka.common.protocol.ApiKeys
import org.apache.kafka.common.requests.{RequestContext, RequestHeader}
import org.apache.kafka.common.security.auth.{KafkaPrincipal, SecurityProtocol}
import org.apache.kafka.common.utils.MockTime
-import org.junit.jupiter.api.Assertions.assertEquals
+import org.apache.kafka.server.log.remote.storage.{RemoteLogManagerConfig,
RemoteStorageMetrics}
+import org.junit.jupiter.api.Assertions.{assertEquals, assertFalse, assertTrue}
import org.junit.jupiter.api.Test
+import org.junit.jupiter.params.ParameterizedTest
+import org.junit.jupiter.params.provider.ValueSource
import org.mockito.ArgumentMatchers
import org.mockito.ArgumentMatchers.any
import org.mockito.Mockito.{mock, when}
@@ -77,4 +80,22 @@ class KafkaRequestHandlerTest {
assertEquals(Some(startTime + 2000000),
request.callbackRequestDequeueTimeNanos)
assertEquals(Some(startTime + 3000000),
request.callbackRequestCompleteTimeNanos)
}
+
+
+ @ParameterizedTest
+ @ValueSource(booleans = Array(true, false))
+ def testTopicStats(systemRemoteStorageEnabled: Boolean): Unit = {
+ val topic = "topic"
+ val props = kafka.utils.TestUtils.createDummyBrokerConfig()
+
props.setProperty(RemoteLogManagerConfig.REMOTE_LOG_STORAGE_SYSTEM_ENABLE_PROP,
systemRemoteStorageEnabled.toString)
+ val brokerTopicStats = new
BrokerTopicStats(java.util.Optional.of(KafkaConfig.fromProps(props)))
+ brokerTopicStats.topicStats(topic)
+ RemoteStorageMetrics.brokerTopicStatsMetrics.forEach(metric => {
+ if (systemRemoteStorageEnabled) {
+
assertTrue(brokerTopicStats.topicStats(topic).metricMap.contains(metric.getName))
+ } else {
+
assertFalse(brokerTopicStats.topicStats(topic).metricMap.contains(metric.getName))
+ }
+ })
+ }
}
diff --git a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
index 72483307411..92533d0e746 100644
--- a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
@@ -3583,6 +3583,7 @@ class ReplicaManagerTest {
val tidp0 = new TopicIdPartition(topicId, tp0)
val props = new Properties()
+ props.put("zookeeper.connect", "test")
props.put(RemoteLogManagerConfig.REMOTE_LOG_STORAGE_SYSTEM_ENABLE_PROP,
true.toString)
props.put(RemoteLogManagerConfig.REMOTE_STORAGE_MANAGER_CLASS_NAME_PROP,
classOf[NoOpRemoteStorageManager].getName)
props.put(RemoteLogManagerConfig.REMOTE_LOG_METADATA_MANAGER_CLASS_NAME_PROP,
classOf[NoOpRemoteLogMetadataManager].getName)
@@ -3591,7 +3592,7 @@ class ReplicaManagerTest {
val config = new AbstractConfig(RemoteLogManagerConfig.CONFIG_DEF, props)
val remoteLogManagerConfig = new RemoteLogManagerConfig(config)
val mockLog = mock(classOf[UnifiedLog])
- val brokerTopicStats = new BrokerTopicStats
+ val brokerTopicStats = new
BrokerTopicStats(java.util.Optional.of(KafkaConfig.fromProps(props)))
val remoteLogManager = new RemoteLogManager(
remoteLogManagerConfig,
0,
diff --git a/core/src/test/scala/unit/kafka/utils/TestUtils.scala
b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
index e587c8cb6e8..15ccb7d339b 100755
--- a/core/src/test/scala/unit/kafka/utils/TestUtils.scala
+++ b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
@@ -280,6 +280,10 @@ object TestUtils extends Logging {
Await.result(future, FiniteDuration(5, TimeUnit.MINUTES))
}
+ def createDummyBrokerConfig(): Properties = {
+ createBrokerConfig(0, "")
+ }
+
/**
* Create a test config for the provided parameters.
*
diff --git
a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/fetcher/ReplicaFetcherThreadBenchmark.java
b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/fetcher/ReplicaFetcherThreadBenchmark.java
index 12e2cd1468c..511f51064e1 100644
---
a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/fetcher/ReplicaFetcherThreadBenchmark.java
+++
b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/fetcher/ReplicaFetcherThreadBenchmark.java
@@ -95,6 +95,7 @@ import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
+import java.util.Optional;
import java.util.Properties;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
@@ -133,7 +134,7 @@ public class ReplicaFetcherThreadBenchmark {
KafkaConfig config = new KafkaConfig(props);
LogConfig logConfig = createLogConfig();
- BrokerTopicStats brokerTopicStats = new BrokerTopicStats();
+ BrokerTopicStats brokerTopicStats = new
BrokerTopicStats(Optional.empty());
LogDirFailureChannel logDirFailureChannel =
Mockito.mock(LogDirFailureChannel.class);
List<File> logDirs = Collections.singletonList(logDir);
logManager = new LogManagerBuilder().
diff --git
a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/metadata/KRaftMetadataRequestBenchmark.java
b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/metadata/KRaftMetadataRequestBenchmark.java
index 13409a57044..b3ab13fcb43 100644
---
a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/metadata/KRaftMetadataRequestBenchmark.java
+++
b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/metadata/KRaftMetadataRequestBenchmark.java
@@ -117,7 +117,7 @@ public class KRaftMetadataRequestBenchmark {
clientQuotaManager, clientRequestQuotaManager,
controllerMutationQuotaManager, replicaQuotaManager,
replicaQuotaManager, replicaQuotaManager, Option.empty());
private FetchManager fetchManager = Mockito.mock(FetchManager.class);
- private BrokerTopicStats brokerTopicStats = new BrokerTopicStats();
+ private BrokerTopicStats brokerTopicStats = new
BrokerTopicStats(Optional.empty());
private KafkaPrincipal principal = new
KafkaPrincipal(KafkaPrincipal.USER_TYPE, "test-user");
private KafkaApis kafkaApis;
private RequestChannel.Request allTopicMetadataRequest;
diff --git
a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/metadata/MetadataRequestBenchmark.java
b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/metadata/MetadataRequestBenchmark.java
index 5f0bcec62f0..187743ce0b1 100644
---
a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/metadata/MetadataRequestBenchmark.java
+++
b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/metadata/MetadataRequestBenchmark.java
@@ -122,7 +122,7 @@ public class MetadataRequestBenchmark {
clientQuotaManager, clientRequestQuotaManager,
controllerMutationQuotaManager, replicaQuotaManager,
replicaQuotaManager, replicaQuotaManager, Option.empty());
private FetchManager fetchManager = Mockito.mock(FetchManager.class);
- private BrokerTopicStats brokerTopicStats = new BrokerTopicStats();
+ private BrokerTopicStats brokerTopicStats = new
BrokerTopicStats(Optional.empty());
private KafkaPrincipal principal = new
KafkaPrincipal(KafkaPrincipal.USER_TYPE, "test-user");
private KafkaApis kafkaApis;
private RequestChannel.Request allTopicMetadataRequest;
diff --git
a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/partition/PartitionMakeFollowerBenchmark.java
b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/partition/PartitionMakeFollowerBenchmark.java
index 0e051ac5d89..b28ab90818f 100644
---
a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/partition/PartitionMakeFollowerBenchmark.java
+++
b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/partition/PartitionMakeFollowerBenchmark.java
@@ -94,7 +94,7 @@ public class PartitionMakeFollowerBenchmark {
scheduler.startup();
LogConfig logConfig = new LogConfig(new Properties());
- BrokerTopicStats brokerTopicStats = new BrokerTopicStats();
+ BrokerTopicStats brokerTopicStats = new
BrokerTopicStats(Optional.empty());
LogDirFailureChannel logDirFailureChannel =
Mockito.mock(LogDirFailureChannel.class);
logManager = new LogManagerBuilder().
setLogDirs(Collections.singletonList(logDir)).
diff --git
a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/partition/UpdateFollowerFetchStateBenchmark.java
b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/partition/UpdateFollowerFetchStateBenchmark.java
index a8fec969c87..eafe0e4b27e 100644
---
a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/partition/UpdateFollowerFetchStateBenchmark.java
+++
b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/partition/UpdateFollowerFetchStateBenchmark.java
@@ -74,7 +74,7 @@ public class UpdateFollowerFetchStateBenchmark {
private Option<Uuid> topicId =
OptionConverters.toScala(Optional.of(Uuid.randomUuid()));
private File logDir = new File(System.getProperty("java.io.tmpdir"),
topicPartition.toString());
private KafkaScheduler scheduler = new KafkaScheduler(1, true,
"scheduler");
- private BrokerTopicStats brokerTopicStats = new BrokerTopicStats();
+ private BrokerTopicStats brokerTopicStats = new
BrokerTopicStats(Optional.empty());
private LogDirFailureChannel logDirFailureChannel =
Mockito.mock(LogDirFailureChannel.class);
private long nextOffset = 0;
private LogManager logManager;
diff --git
a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/record/BaseRecordBatchBenchmark.java
b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/record/BaseRecordBatchBenchmark.java
index ac79fd357f8..ade2a8fe007 100644
---
a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/record/BaseRecordBatchBenchmark.java
+++
b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/record/BaseRecordBatchBenchmark.java
@@ -36,6 +36,7 @@ import org.openjdk.jmh.annotations.TearDown;
import java.nio.ByteBuffer;
import java.util.Arrays;
+import java.util.Optional;
import java.util.Random;
import java.util.stream.IntStream;
@@ -79,7 +80,7 @@ public abstract class BaseRecordBatchBenchmark {
ByteBuffer[] batchBuffers;
RequestLocal requestLocal;
LogValidator.MetricsRecorder validatorMetricsRecorder =
UnifiedLog.newValidatorMetricsRecorder(
- new BrokerTopicStats().allTopicsStats());
+ new BrokerTopicStats(Optional.empty()).allTopicsStats());
@Setup
public void init() {
diff --git
a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/server/CheckpointBench.java
b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/server/CheckpointBench.java
index 1315a2e183b..b0502c955a5 100644
---
a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/server/CheckpointBench.java
+++
b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/server/CheckpointBench.java
@@ -56,6 +56,7 @@ import org.openjdk.jmh.annotations.Warmup;
import java.io.File;
import java.util.ArrayList;
import java.util.List;
+import java.util.Optional;
import java.util.Properties;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
@@ -110,7 +111,7 @@ public class CheckpointBench {
new LogConfig(new Properties()), new MockConfigRepository(),
new CleanerConfig(1, 4 * 1024 * 1024L, 0.9d,
1024 * 1024, 32 * 1024 * 1024, Double.MAX_VALUE, 15 *
1000, true), time, MetadataVersion.latest(), 4, false, Option.empty());
scheduler.startup();
- final BrokerTopicStats brokerTopicStats = new BrokerTopicStats();
+ final BrokerTopicStats brokerTopicStats = new
BrokerTopicStats(Optional.empty());
final MetadataCache metadataCache =
MetadataCache.zkMetadataCache(this.brokerProperties.brokerId(),
this.brokerProperties.interBrokerProtocolVersion(),
diff --git
a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/server/PartitionCreationBench.java
b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/server/PartitionCreationBench.java
index befa467e224..0bbf934e6e8 100644
---
a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/server/PartitionCreationBench.java
+++
b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/server/PartitionCreationBench.java
@@ -63,6 +63,7 @@ import java.io.File;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
+import java.util.Optional;
import java.util.Properties;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
@@ -113,7 +114,7 @@ public class PartitionCreationBench {
this.metrics = new Metrics();
this.time = Time.SYSTEM;
this.failureChannel = new
LogDirFailureChannel(brokerProperties.logDirs().size());
- final BrokerTopicStats brokerTopicStats = new BrokerTopicStats();
+ final BrokerTopicStats brokerTopicStats = new
BrokerTopicStats(Optional.empty());
final List<File> files =
JavaConverters.seqAsJavaList(brokerProperties.logDirs()).stream().map(File::new).collect(Collectors.toList());
CleanerConfig cleanerConfig = new CleanerConfig(1,
diff --git
a/storage/api/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteStorageMetrics.java
b/storage/api/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteStorageMetrics.java
new file mode 100644
index 00000000000..7c5f74836ae
--- /dev/null
+++
b/storage/api/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteStorageMetrics.java
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.server.log.remote.storage;
+
+import com.yammer.metrics.core.MetricName;
+import org.apache.kafka.server.metrics.KafkaYammerMetrics;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Set;
+
+/**
+ * This class contains the metrics related to the tiered storage feature. It provides a centralized
+ * place to store them, so that we can verify all of them easily.
+ *
+ * @see kafka.api.MetricsTest
+ */
+public class RemoteStorageMetrics {
+ private static final String REMOTE_LOG_READER_METRICS_NAME_PREFIX =
"RemoteLogReader";
+ private static final String REMOTE_LOG_MANAGER_TASKS_AVG_IDLE_PERCENT =
"RemoteLogManagerTasksAvgIdlePercent";
+ private static final String TASK_QUEUE_SIZE = "TaskQueueSize";
+ private static final String AVG_IDLE_PERCENT = "AvgIdlePercent";
+ private static final String REMOTE_COPY_BYTES_PER_SEC =
"RemoteCopyBytesPerSec";
+ private static final String REMOTE_FETCH_BYTES_PER_SEC =
"RemoteFetchBytesPerSec";
+ private static final String REMOTE_FETCH_REQUESTS_PER_SEC =
"RemoteFetchRequestsPerSec";
+ private static final String REMOTE_COPY_REQUESTS_PER_SEC =
"RemoteCopyRequestsPerSec";
+ private static final String FAILED_REMOTE_FETCH_PER_SEC =
"RemoteFetchErrorsPerSec";
+ private static final String FAILED_REMOTE_COPY_PER_SEC =
"RemoteCopyErrorsPerSec";
+ private static final String REMOTE_LOG_READER_TASK_QUEUE_SIZE =
REMOTE_LOG_READER_METRICS_NAME_PREFIX + TASK_QUEUE_SIZE;
+ private static final String REMOTE_LOG_READER_AVG_IDLE_PERCENT =
REMOTE_LOG_READER_METRICS_NAME_PREFIX + AVG_IDLE_PERCENT;
+ public static final Set<String> REMOTE_STORAGE_THREAD_POOL_METRICS =
Collections.unmodifiableSet(
+ new HashSet<>(Arrays.asList(REMOTE_LOG_READER_TASK_QUEUE_SIZE,
REMOTE_LOG_READER_AVG_IDLE_PERCENT)));
+
+ public final static MetricName REMOTE_COPY_BYTES_PER_SEC_METRIC =
getMetricName(
+ "kafka.server", "BrokerTopicMetrics", REMOTE_COPY_BYTES_PER_SEC);
+ public final static MetricName REMOTE_FETCH_BYTES_PER_SEC_METRIC =
getMetricName(
+ "kafka.server", "BrokerTopicMetrics", REMOTE_FETCH_BYTES_PER_SEC);
+ public final static MetricName REMOTE_FETCH_REQUESTS_PER_SEC_METRIC =
getMetricName(
+ "kafka.server", "BrokerTopicMetrics",
REMOTE_FETCH_REQUESTS_PER_SEC);
+ public final static MetricName REMOTE_COPY_REQUESTS_PER_SEC_METRIC =
getMetricName(
+ "kafka.server", "BrokerTopicMetrics",
REMOTE_COPY_REQUESTS_PER_SEC);
+ public final static MetricName FAILED_REMOTE_FETCH_PER_SEC_METRIC =
getMetricName(
+ "kafka.server", "BrokerTopicMetrics", FAILED_REMOTE_FETCH_PER_SEC);
+ public final static MetricName FAILED_REMOTE_COPY_PER_SEC_METRIC =
getMetricName(
+ "kafka.server", "BrokerTopicMetrics", FAILED_REMOTE_COPY_PER_SEC);
+ public final static MetricName
REMOTE_LOG_MANAGER_TASKS_AVG_IDLE_PERCENT_METRIC = getMetricName(
+ "kafka.log.remote", "RemoteLogManager",
REMOTE_LOG_MANAGER_TASKS_AVG_IDLE_PERCENT);
+ public final static MetricName REMOTE_LOG_READER_TASK_QUEUE_SIZE_METRIC =
getMetricName(
+ "org.apache.kafka.storage.internals.log",
"RemoteStorageThreadPool", REMOTE_LOG_READER_TASK_QUEUE_SIZE);
+ public final static MetricName REMOTE_LOG_READER_AVG_IDLE_PERCENT_METRIC =
getMetricName(
+ "org.apache.kafka.storage.internals.log",
"RemoteStorageThreadPool", REMOTE_LOG_READER_AVG_IDLE_PERCENT);
+
+ public static Set<MetricName> allMetrics() {
+ Set<MetricName> metrics = new HashSet<>();
+ metrics.add(REMOTE_COPY_BYTES_PER_SEC_METRIC);
+ metrics.add(REMOTE_FETCH_BYTES_PER_SEC_METRIC);
+ metrics.add(REMOTE_FETCH_REQUESTS_PER_SEC_METRIC);
+ metrics.add(REMOTE_COPY_REQUESTS_PER_SEC_METRIC);
+ metrics.add(FAILED_REMOTE_FETCH_PER_SEC_METRIC);
+ metrics.add(FAILED_REMOTE_COPY_PER_SEC_METRIC);
+ metrics.add(REMOTE_LOG_MANAGER_TASKS_AVG_IDLE_PERCENT_METRIC);
+ metrics.add(REMOTE_LOG_READER_AVG_IDLE_PERCENT_METRIC);
+ metrics.add(REMOTE_LOG_READER_TASK_QUEUE_SIZE_METRIC);
+
+ return metrics;
+ }
+
+ public static Set<MetricName> brokerTopicStatsMetrics() {
+ Set<MetricName> metrics = new HashSet<>();
+ metrics.add(REMOTE_COPY_BYTES_PER_SEC_METRIC);
+ metrics.add(REMOTE_FETCH_BYTES_PER_SEC_METRIC);
+ metrics.add(REMOTE_FETCH_REQUESTS_PER_SEC_METRIC);
+ metrics.add(REMOTE_COPY_REQUESTS_PER_SEC_METRIC);
+ metrics.add(FAILED_REMOTE_FETCH_PER_SEC_METRIC);
+ metrics.add(FAILED_REMOTE_COPY_PER_SEC_METRIC);
+
+ return metrics;
+ }
+ private static MetricName getMetricName(String group, String type, String
name) {
+ return KafkaYammerMetrics.getMetricName(group, type, name);
+ }
+}
diff --git
a/storage/src/main/java/org/apache/kafka/storage/internals/log/RemoteStorageThreadPool.java
b/storage/src/main/java/org/apache/kafka/storage/internals/log/RemoteStorageThreadPool.java
index 22807a0271b..9c903e51d4b 100644
---
a/storage/src/main/java/org/apache/kafka/storage/internals/log/RemoteStorageThreadPool.java
+++
b/storage/src/main/java/org/apache/kafka/storage/internals/log/RemoteStorageThreadPool.java
@@ -23,28 +23,23 @@ import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.server.metrics.KafkaMetricsGroup;
import org.slf4j.Logger;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.HashSet;
-import java.util.Set;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
+import static
org.apache.kafka.server.log.remote.storage.RemoteStorageMetrics.REMOTE_LOG_READER_AVG_IDLE_PERCENT_METRIC;
+import static
org.apache.kafka.server.log.remote.storage.RemoteStorageMetrics.REMOTE_LOG_READER_TASK_QUEUE_SIZE_METRIC;
+import static
org.apache.kafka.server.log.remote.storage.RemoteStorageMetrics.REMOTE_STORAGE_THREAD_POOL_METRICS;
+
public class RemoteStorageThreadPool extends ThreadPoolExecutor {
- public static final String TASK_QUEUE_SIZE = "TaskQueueSize";
- public static final String AVG_IDLE_PERCENT = "AvgIdlePercent";
- public static final Set<String> METRIC_SUFFIXES =
Collections.unmodifiableSet(new HashSet<>(Arrays.asList(TASK_QUEUE_SIZE,
AVG_IDLE_PERCENT)));
private final Logger logger;
- private final String metricsNamePrefix;
private final KafkaMetricsGroup metricsGroup = new
KafkaMetricsGroup(this.getClass());
public RemoteStorageThreadPool(String threadNamePrefix,
int numThreads,
- int maxPendingTasks,
- String metricsNamePrefix) {
+ int maxPendingTasks) {
super(numThreads, numThreads, 0L, TimeUnit.MILLISECONDS, new
LinkedBlockingQueue<>(maxPendingTasks),
new RemoteStorageThreadFactory(threadNamePrefix));
logger = new LogContext() {
@@ -54,14 +49,13 @@ public class RemoteStorageThreadPool extends
ThreadPoolExecutor {
}
}.logger(RemoteStorageThreadPool.class);
- this.metricsNamePrefix = metricsNamePrefix;
- metricsGroup.newGauge(metricsNamePrefix.concat(TASK_QUEUE_SIZE), new
Gauge<Integer>() {
+
metricsGroup.newGauge(REMOTE_LOG_READER_TASK_QUEUE_SIZE_METRIC.getName(), new
Gauge<Integer>() {
@Override
public Integer value() {
return RemoteStorageThreadPool.this.getQueue().size();
}
});
- metricsGroup.newGauge(metricsNamePrefix.concat(AVG_IDLE_PERCENT), new
Gauge<Double>() {
+
metricsGroup.newGauge(REMOTE_LOG_READER_AVG_IDLE_PERCENT_METRIC.getName(), new
Gauge<Double>() {
@Override
public Double value() {
return 1 - (double)
RemoteStorageThreadPool.this.getActiveCount() / (double)
RemoteStorageThreadPool.this.getCorePoolSize();
@@ -98,6 +92,6 @@ public class RemoteStorageThreadPool extends
ThreadPoolExecutor {
}
public void removeMetrics() {
- METRIC_SUFFIXES.forEach(metric ->
metricsGroup.removeMetric(metricsNamePrefix.concat(metric)));
+ REMOTE_STORAGE_THREAD_POOL_METRICS.forEach(metricsGroup::removeMetric);
}
}