This is an automated email from the ASF dual-hosted git repository.
mimaison pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 6be062a690e KAFKA-15599: Move DefaultExternalKRaftMetrics to the
server module (#21163)
6be062a690e is described below
commit 6be062a690ecaea0d384c2699af0c7341d4960e2
Author: Mickael Maison <[email protected]>
AuthorDate: Sat Dec 27 13:24:51 2025 +0100
KAFKA-15599: Move DefaultExternalKRaftMetrics to the server module (#21163)
Another small step in moving all of `kafka.raft` classes out of the
`core` module.
The `DefaultExternalKRaftMetrics` class needs:
- `ControllerMetadataMetrics` from `metadata`
- `ExternalKRaftMetrics` from `raft`
- `BrokerServerMetrics` from `server`.
The `server` module already depends on `metadata` and `raft`. It also
already has a bunch of metrics related classes in
`org.apache.kafka.server.metrics`, so it seems the best fit.
Reviewers: Vincent Jiang
<[email protected]>, Chia-Ping Tsai
<[email protected]>
---
checkstyle/import-control-server.xml | 1 +
.../src/main/scala/kafka/server/SharedServer.scala | 6 +--
.../main/scala/kafka/tools/TestRaftServer.scala | 6 ++-
.../raft/DefaultExternalKRaftMetricsTest.scala | 57 --------------------
.../scala/unit/kafka/raft/RaftManagerTest.scala | 5 +-
.../metrics/DefaultExternalKRaftMetrics.java | 24 ++++-----
.../metrics/DefaultExternalKRaftMetricsTest.java | 61 ++++++++++++++++++++++
7 files changed, 84 insertions(+), 76 deletions(-)
diff --git a/checkstyle/import-control-server.xml
b/checkstyle/import-control-server.xml
index f323a17d8e6..5c99f2539aa 100644
--- a/checkstyle/import-control-server.xml
+++ b/checkstyle/import-control-server.xml
@@ -91,6 +91,7 @@
<subpackage name="metrics">
<allow
class="org.apache.kafka.server.authorizer.AuthorizableRequestContext" />
<allow class="org.apache.kafka.controller.QuorumFeatures" />
+ <allow
class="org.apache.kafka.controller.metrics.ControllerMetadataMetrics" />
<allow pkg="org.apache.kafka.server.telemetry" />
</subpackage>
<subpackage name="share">
diff --git a/core/src/main/scala/kafka/server/SharedServer.scala
b/core/src/main/scala/kafka/server/SharedServer.scala
index 29a5a833b84..3acfc9bf0b9 100644
--- a/core/src/main/scala/kafka/server/SharedServer.scala
+++ b/core/src/main/scala/kafka/server/SharedServer.scala
@@ -18,7 +18,7 @@
package kafka.server
import kafka.metrics.KafkaMetricsReporter
-import kafka.raft.{DefaultExternalKRaftMetrics, KafkaRaftManager}
+import kafka.raft.KafkaRaftManager
import kafka.server.Server.MetricsPrefix
import kafka.utils.{CoreUtils, Logging, VerifiableProperties}
import org.apache.kafka.common.metrics.Metrics
@@ -37,7 +37,7 @@ import org.apache.kafka.raft.Endpoints
import org.apache.kafka.server.{ProcessRole, ServerSocketFactory}
import org.apache.kafka.server.common.ApiMessageAndVersion
import org.apache.kafka.server.fault.{FaultHandler, LoggingFaultHandler,
ProcessTerminatingFaultHandler}
-import org.apache.kafka.server.metrics.{BrokerServerMetrics,
KafkaYammerMetrics, NodeMetrics}
+import org.apache.kafka.server.metrics.{BrokerServerMetrics,
DefaultExternalKRaftMetrics, KafkaYammerMetrics, NodeMetrics}
import java.net.InetSocketAddress
import java.util.Arrays
@@ -282,7 +282,7 @@ class SharedServer(
controllerServerMetrics = new
ControllerMetadataMetrics(Optional.of(KafkaYammerMetrics.defaultRegistry()))
}
- val externalKRaftMetrics = new
DefaultExternalKRaftMetrics(Option(brokerMetrics),
Option(controllerServerMetrics))
+ val externalKRaftMetrics = new
DefaultExternalKRaftMetrics(Optional.ofNullable(brokerMetrics),
Optional.ofNullable(controllerServerMetrics))
val _raftManager = new KafkaRaftManager[ApiMessageAndVersion](
clusterId,
diff --git a/core/src/main/scala/kafka/tools/TestRaftServer.scala
b/core/src/main/scala/kafka/tools/TestRaftServer.scala
index 1d20463955a..3f12d5c009e 100644
--- a/core/src/main/scala/kafka/tools/TestRaftServer.scala
+++ b/core/src/main/scala/kafka/tools/TestRaftServer.scala
@@ -22,7 +22,7 @@ import java.util.concurrent.atomic.{AtomicInteger, AtomicLong}
import java.util.concurrent.{CompletableFuture, CountDownLatch,
LinkedBlockingDeque, TimeUnit}
import joptsimple.{OptionException, OptionSpec}
import kafka.network.SocketServer
-import kafka.raft.{DefaultExternalKRaftMetrics, KafkaRaftManager}
+import kafka.raft.KafkaRaftManager
import kafka.server.{KafkaConfig, KafkaRequestHandlerPool,
KafkaRequestHandlerPoolFactory}
import kafka.utils.{CoreUtils, Logging}
import org.apache.kafka.common.message.ApiMessageType.ListenerType
@@ -42,9 +42,11 @@ import org.apache.kafka.server.SimpleApiVersionManager
import org.apache.kafka.server.common.{FinalizedFeatures, MetadataVersion}
import org.apache.kafka.server.common.serialization.RecordSerde
import org.apache.kafka.server.fault.ProcessTerminatingFaultHandler
+import org.apache.kafka.server.metrics.DefaultExternalKRaftMetrics
import org.apache.kafka.server.util.{CommandDefaultOptions, CommandLineUtils,
ShutdownableThread}
import org.apache.kafka.snapshot.SnapshotReader
+import java.util.Optional
import scala.jdk.CollectionConverters._
/**
@@ -103,7 +105,7 @@ class TestRaftServer(
topicId,
time,
metrics,
- new DefaultExternalKRaftMetrics(None, None),
+ new DefaultExternalKRaftMetrics(Optional.empty, Optional.empty),
Some(threadNamePrefix),
CompletableFuture.completedFuture(QuorumConfig.parseVoterConnections(config.quorumConfig.voters)),
QuorumConfig.parseBootstrapServers(config.quorumConfig.bootstrapServers),
diff --git
a/core/src/test/scala/kafka/raft/DefaultExternalKRaftMetricsTest.scala
b/core/src/test/scala/kafka/raft/DefaultExternalKRaftMetricsTest.scala
deleted file mode 100644
index d81769e4f48..00000000000
--- a/core/src/test/scala/kafka/raft/DefaultExternalKRaftMetricsTest.scala
+++ /dev/null
@@ -1,57 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package kafka.raft
-
-import com.yammer.metrics.core.MetricsRegistry
-import org.apache.kafka.common.metrics.Metrics
-import org.apache.kafka.controller.metrics.ControllerMetadataMetrics
-import org.apache.kafka.server.metrics.BrokerServerMetrics
-import org.junit.jupiter.api.Assertions.{assertFalse, assertTrue}
-import org.junit.jupiter.api.Test
-
-import java.util.Optional
-
-final class DefaultExternalKRaftMetricsTest {
- @Test
- def testDefaultExternalKRaftMetrics(): Unit = {
- val brokerServerMetrics = new BrokerServerMetrics(new Metrics())
- val controllerMetadataMetrics = new
ControllerMetadataMetrics(Optional.of(new MetricsRegistry()))
- val metrics = new DefaultExternalKRaftMetrics(
- Option(brokerServerMetrics),
- Option(controllerMetadataMetrics)
- )
-
- assertFalse(brokerServerMetrics.ignoredStaticVoters())
- assertFalse(controllerMetadataMetrics.ignoredStaticVoters())
-
- metrics.setIgnoredStaticVoters(true)
-
- assertTrue(brokerServerMetrics.ignoredStaticVoters())
- assertTrue(controllerMetadataMetrics.ignoredStaticVoters())
-
- metrics.setIgnoredStaticVoters(false)
-
- assertFalse(brokerServerMetrics.ignoredStaticVoters())
- assertFalse(controllerMetadataMetrics.ignoredStaticVoters())
- }
-
- @Test
- def testEmptyDefaultExternalKRaftMetrics(): Unit = {
- val metrics = new DefaultExternalKRaftMetrics(None, None)
- metrics.setIgnoredStaticVoters(true)
- }
-}
diff --git a/core/src/test/scala/unit/kafka/raft/RaftManagerTest.scala
b/core/src/test/scala/unit/kafka/raft/RaftManagerTest.scala
index 0486ae20756..9f5f9e1cb34 100644
--- a/core/src/test/scala/unit/kafka/raft/RaftManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/raft/RaftManagerTest.scala
@@ -20,7 +20,7 @@ import java.net.InetSocketAddress
import java.nio.channels.FileChannel
import java.nio.channels.OverlappingFileLockException
import java.nio.file.{Files, Path, StandardOpenOption}
-import java.util.Properties
+import java.util.{Optional, Properties}
import java.util.concurrent.CompletableFuture
import kafka.server.KafkaConfig
import kafka.tools.TestRaftServer.ByteArraySerde
@@ -36,6 +36,7 @@ import org.apache.kafka.raft.{Endpoints, KRaftConfigs,
MetadataLogConfig, Quorum
import org.apache.kafka.server.ProcessRole
import org.apache.kafka.server.config.{ReplicationConfigs, ServerLogConfigs}
import org.apache.kafka.server.fault.FaultHandler
+import org.apache.kafka.server.metrics.DefaultExternalKRaftMetrics
import org.apache.kafka.storage.internals.log.LogManager
import org.junit.jupiter.api.Assertions._
import org.junit.jupiter.api.Test
@@ -105,7 +106,7 @@ class RaftManagerTest {
topicId,
Time.SYSTEM,
new Metrics(Time.SYSTEM),
- new DefaultExternalKRaftMetrics(None, None),
+ new DefaultExternalKRaftMetrics(Optional.empty, Optional.empty),
Option.empty,
CompletableFuture.completedFuture(QuorumConfig.parseVoterConnections(config.quorumConfig.voters)),
QuorumConfig.parseBootstrapServers(config.quorumConfig.bootstrapServers),
diff --git a/core/src/main/scala/kafka/raft/DefaultExternalKRaftMetrics.scala
b/server/src/main/java/org/apache/kafka/server/metrics/DefaultExternalKRaftMetrics.java
similarity index 57%
rename from core/src/main/scala/kafka/raft/DefaultExternalKRaftMetrics.scala
rename to
server/src/main/java/org/apache/kafka/server/metrics/DefaultExternalKRaftMetrics.java
index 87ac2a5b2b5..01da081a004 100644
--- a/core/src/main/scala/kafka/raft/DefaultExternalKRaftMetrics.scala
+++
b/server/src/main/java/org/apache/kafka/server/metrics/DefaultExternalKRaftMetrics.java
@@ -14,20 +14,20 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+package org.apache.kafka.server.metrics;
-package kafka.raft
+import org.apache.kafka.controller.metrics.ControllerMetadataMetrics;
+import org.apache.kafka.raft.ExternalKRaftMetrics;
-import org.apache.kafka.controller.metrics.ControllerMetadataMetrics
-import org.apache.kafka.raft.ExternalKRaftMetrics
-import org.apache.kafka.server.metrics.BrokerServerMetrics
+import java.util.Optional;
-class DefaultExternalKRaftMetrics(
- val brokerServerMetrics: Option[BrokerServerMetrics],
- val controllerMetadataMetrics: Option[ControllerMetadataMetrics]
-) extends ExternalKRaftMetrics {
+public record DefaultExternalKRaftMetrics(
+ Optional<BrokerServerMetrics> brokerServerMetrics,
+ Optional<ControllerMetadataMetrics> controllerMetadataMetrics)
implements ExternalKRaftMetrics {
- override def setIgnoredStaticVoters(ignoredStaticVoters: Boolean): Unit = {
- brokerServerMetrics.foreach(metrics =>
metrics.setIgnoredStaticVoters(ignoredStaticVoters))
- controllerMetadataMetrics.foreach(metrics =>
metrics.setIgnoredStaticVoters(ignoredStaticVoters))
- }
+ @Override
+ public void setIgnoredStaticVoters(boolean ignoredStaticVoters) {
+ brokerServerMetrics.ifPresent(metrics ->
metrics.setIgnoredStaticVoters(ignoredStaticVoters));
+ controllerMetadataMetrics.ifPresent(metrics ->
metrics.setIgnoredStaticVoters(ignoredStaticVoters));
+ }
}
diff --git
a/server/src/test/java/org/apache/kafka/server/metrics/DefaultExternalKRaftMetricsTest.java
b/server/src/test/java/org/apache/kafka/server/metrics/DefaultExternalKRaftMetricsTest.java
new file mode 100644
index 00000000000..15a934e95a8
--- /dev/null
+++
b/server/src/test/java/org/apache/kafka/server/metrics/DefaultExternalKRaftMetricsTest.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.server.metrics;
+
+import org.apache.kafka.common.metrics.Metrics;
+import org.apache.kafka.controller.metrics.ControllerMetadataMetrics;
+
+import com.yammer.metrics.core.MetricsRegistry;
+
+import org.junit.jupiter.api.Test;
+
+import java.util.Optional;
+
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+public class DefaultExternalKRaftMetricsTest {
+
+ @Test
+ public void testDefaultExternalKRaftMetrics() {
+ BrokerServerMetrics brokerServerMetrics = new BrokerServerMetrics(new
Metrics());
+ ControllerMetadataMetrics controllerMetadataMetrics = new
ControllerMetadataMetrics(Optional.of(new MetricsRegistry()));
+ DefaultExternalKRaftMetrics metrics = new DefaultExternalKRaftMetrics(
+ Optional.of(brokerServerMetrics),
+ Optional.of(controllerMetadataMetrics)
+ );
+
+ assertFalse(brokerServerMetrics.ignoredStaticVoters());
+ assertFalse(controllerMetadataMetrics.ignoredStaticVoters());
+
+ metrics.setIgnoredStaticVoters(true);
+
+ assertTrue(brokerServerMetrics.ignoredStaticVoters());
+ assertTrue(controllerMetadataMetrics.ignoredStaticVoters());
+
+ metrics.setIgnoredStaticVoters(false);
+
+ assertFalse(brokerServerMetrics.ignoredStaticVoters());
+ assertFalse(controllerMetadataMetrics.ignoredStaticVoters());
+ }
+
+ @Test
+ public void testEmptyDefaultExternalKRaftMetrics() {
+ DefaultExternalKRaftMetrics metrics = new
DefaultExternalKRaftMetrics(Optional.empty(), Optional.empty());
+ metrics.setIgnoredStaticVoters(true);
+ }
+}