This is an automated email from the ASF dual-hosted git repository.
mimaison pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 55d65cb3bad MINOR: Cleanups in CoreUtils (#19175)
55d65cb3bad is described below
commit 55d65cb3badaa8ec89d0085a59474ed9455d5ba0
Author: Mickael Maison <[email protected]>
AuthorDate: Wed Mar 12 19:43:30 2025 +0100
MINOR: Cleanups in CoreUtils (#19175)
Delete unused methods in CoreUtils and switch to Utils.newInstance().
Reviewers: Chia-Ping Tsai <[email protected]>
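Note on the substitution itself: the two helpers are not interchangeable in general. CoreUtils.createObject matched a constructor against arbitrary arguments, while Utils.newInstance(className, baseClass) always invokes the public no-arg constructor and verifies the result against the given base type. Every call site touched by this patch passed zero constructor arguments, which is what makes the swap behavior-preserving. A minimal sketch of the new pattern (the reporter class name is a placeholder, not from this patch):

    import kafka.metrics.KafkaMetricsReporter
    import org.apache.kafka.common.utils.Utils

    object NewInstanceSketch {
      def main(args: Array[String]): Unit = {
        // Loads the named class and calls its public no-arg constructor;
        // throws if the class is absent or is not a KafkaMetricsReporter.
        val reporter = Utils.newInstance("com.example.MyReporter", classOf[KafkaMetricsReporter])
        println(reporter.getClass.getName)
      }
    }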
---
.../org/apache/kafka/common/utils/UtilsTest.java | 1 +
.../scala/kafka/metrics/KafkaMetricsReporter.scala | 5 +-
.../main/scala/kafka/server/ReplicaManager.scala | 6 +--
.../main/scala/kafka/tools/DumpLogSegments.scala | 5 +-
core/src/main/scala/kafka/utils/CoreUtils.scala | 55 ++--------------------
.../kafka/server/DynamicConfigChangeTest.scala | 6 +--
.../scala/unit/kafka/utils/CoreUtilsTest.scala | 46 ------------------
7 files changed, 15 insertions(+), 109 deletions(-)
diff --git a/clients/src/test/java/org/apache/kafka/common/utils/UtilsTest.java b/clients/src/test/java/org/apache/kafka/common/utils/UtilsTest.java
index 16fc6af154b..4220e84b7cc 100755
--- a/clients/src/test/java/org/apache/kafka/common/utils/UtilsTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/utils/UtilsTest.java
@@ -202,6 +202,7 @@ public class UtilsTest {
assertEquals(10, Utils.abs(10));
assertEquals(0, Utils.abs(0));
assertEquals(1, Utils.abs(-1));
+ assertEquals(Integer.MAX_VALUE, Utils.abs(Integer.MAX_VALUE));
}
@Test
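The new assertion rounds out coverage that previously lived only in the Scala-side test deleted below (CoreUtilsTest.testAbs). Together the assertions pin down a contract worth noting: unlike Math.abs, Utils.abs(Integer.MIN_VALUE) returns 0 rather than a negative value. The behavior as the tests describe it, in a standalone sketch (assertions only, not the implementation):

    import org.apache.kafka.common.utils.Utils

    // Per the tests in this commit: Utils.abs never goes negative, whereas
    // Math.abs(Integer.MIN_VALUE) overflows back to Integer.MIN_VALUE.
    object AbsContract extends App {
      assert(Utils.abs(Integer.MIN_VALUE) == 0)
      assert(Utils.abs(-1) == 1)
      assert(Utils.abs(0) == 0)
      assert(Utils.abs(Integer.MAX_VALUE) == Integer.MAX_VALUE)
    }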
diff --git a/core/src/main/scala/kafka/metrics/KafkaMetricsReporter.scala b/core/src/main/scala/kafka/metrics/KafkaMetricsReporter.scala
index 136bb88b289..eb6bae3ced6 100755
--- a/core/src/main/scala/kafka/metrics/KafkaMetricsReporter.scala
+++ b/core/src/main/scala/kafka/metrics/KafkaMetricsReporter.scala
@@ -21,8 +21,9 @@
package kafka.metrics
import kafka.utils.{CoreUtils, VerifiableProperties}
-import java.util.concurrent.atomic.AtomicBoolean
+import org.apache.kafka.common.utils.Utils
+import java.util.concurrent.atomic.AtomicBoolean
import scala.collection.Seq
import scala.collection.mutable.ArrayBuffer
@@ -62,7 +63,7 @@ object KafkaMetricsReporter {
val metricsConfig = new KafkaMetricsConfig(verifiableProps)
if (metricsConfig.reporters.nonEmpty) {
metricsConfig.reporters.foreach(reporterType => {
- val reporter = CoreUtils.createObject[KafkaMetricsReporter](reporterType)
+ val reporter = Utils.newInstance(reporterType, classOf[KafkaMetricsReporter])
reporter.init(verifiableProps)
reporters += reporter
reporter match {
diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala
index 9fdd42b3174..e052d840cb2 100644
--- a/core/src/main/scala/kafka/server/ReplicaManager.scala
+++ b/core/src/main/scala/kafka/server/ReplicaManager.scala
@@ -45,7 +45,7 @@ import org.apache.kafka.common.replica._
import org.apache.kafka.common.requests.FetchRequest.PartitionData
import org.apache.kafka.common.requests.ProduceResponse.PartitionResponse
import org.apache.kafka.common.requests._
-import org.apache.kafka.common.utils.{Exit, Time}
+import org.apache.kafka.common.utils.{Exit, Time, Utils}
import org.apache.kafka.common.{IsolationLevel, Node, TopicIdPartition, TopicPartition, Uuid}
import org.apache.kafka.image.{LocalReplicaChanges, MetadataImage, TopicsDelta}
import org.apache.kafka.metadata.LeaderConstants.NO_LEADER
@@ -58,7 +58,7 @@ import org.apache.kafka.server.share.fetch.{DelayedShareFetchKey, DelayedShareFe
import org.apache.kafka.server.storage.log.{FetchParams, FetchPartitionData}
import org.apache.kafka.server.util.{Scheduler, ShutdownableThread}
import org.apache.kafka.storage.internals.checkpoint.{LazyOffsetCheckpoints, OffsetCheckpointFile, OffsetCheckpoints}
-import org.apache.kafka.storage.internals.log.{AppendOrigin, FetchDataInfo, LeaderHwChange, LogAppendInfo, LogConfig, LogDirFailureChannel, LogOffsetMetadata, LogReadInfo, OffsetResultHolder, RecordValidationException, RemoteLogReadResult, RemoteStorageFetchInfo, UnifiedLog => JUnifiedLog, VerificationGuard}
+import org.apache.kafka.storage.internals.log.{AppendOrigin, FetchDataInfo, LeaderHwChange, LogAppendInfo, LogConfig, LogDirFailureChannel, LogOffsetMetadata, LogReadInfo, OffsetResultHolder, RecordValidationException, RemoteLogReadResult, RemoteStorageFetchInfo, VerificationGuard, UnifiedLog => JUnifiedLog}
import org.apache.kafka.storage.log.metrics.BrokerTopicStats
import java.io.File
@@ -2585,7 +2585,7 @@ class ReplicaManager(val config: KafkaConfig,
private def createReplicaSelector(): Option[ReplicaSelector] = {
config.replicaSelectorClassName.map { className =>
- val tmpReplicaSelector: ReplicaSelector = CoreUtils.createObject[ReplicaSelector](className)
+ val tmpReplicaSelector: ReplicaSelector = Utils.newInstance(className, classOf[ReplicaSelector])
tmpReplicaSelector.configure(config.originals())
tmpReplicaSelector
}
diff --git a/core/src/main/scala/kafka/tools/DumpLogSegments.scala b/core/src/main/scala/kafka/tools/DumpLogSegments.scala
index 68545e0271c..0791d73b6af 100755
--- a/core/src/main/scala/kafka/tools/DumpLogSegments.scala
+++ b/core/src/main/scala/kafka/tools/DumpLogSegments.scala
@@ -21,7 +21,6 @@ import com.fasterxml.jackson.databind.JsonNode
import java.io._
import com.fasterxml.jackson.databind.node.{IntNode, JsonNodeFactory, ObjectNode, TextNode}
-import kafka.utils.CoreUtils
import org.apache.kafka.clients.consumer.internals.ConsumerProtocol
import org.apache.kafka.common.message.ConsumerProtocolAssignment
import org.apache.kafka.common.message.ConsumerProtocolAssignmentJsonConverter
@@ -646,8 +645,8 @@ object DumpLogSegments {
} else if (options.has(shareStateOpt)) {
new ShareGroupStateMessageParser
} else {
- val valueDecoder = CoreUtils.createObject[org.apache.kafka.tools.api.Decoder[_]](options.valueOf(valueDecoderOpt))
- val keyDecoder = CoreUtils.createObject[org.apache.kafka.tools.api.Decoder[_]](options.valueOf(keyDecoderOpt))
+ val valueDecoder = Utils.newInstance(options.valueOf(valueDecoderOpt), classOf[Decoder[_]])
+ val keyDecoder = Utils.newInstance(options.valueOf(keyDecoderOpt), classOf[Decoder[_]])
new DecoderMessageParser(keyDecoder, valueDecoder)
}
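A practical consequence of the change above: decoder classes supplied via the --key-decoder-class and --value-decoder-class options must expose a public no-arg constructor, since Utils.newInstance does not do the argument-based constructor matching that createObject did (no call site relied on it). A hedged sketch of a conforming decoder (the class is hypothetical, not part of this patch):

    import org.apache.kafka.tools.api.Decoder

    // Hypothetical decoder rendering message bytes as hex; its implicit
    // public no-arg constructor is what Utils.newInstance requires.
    class HexDecoder extends Decoder[String] {
      override def fromBytes(bytes: Array[Byte]): String =
        bytes.map(b => "%02x".format(b & 0xff)).mkString
    }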
diff --git a/core/src/main/scala/kafka/utils/CoreUtils.scala b/core/src/main/scala/kafka/utils/CoreUtils.scala
index 41af6c752fa..98a06f80f46 100755
--- a/core/src/main/scala/kafka/utils/CoreUtils.scala
+++ b/core/src/main/scala/kafka/utils/CoreUtils.scala
@@ -17,14 +17,12 @@
package kafka.utils
-import java.io._
-import java.nio._
+import java.io.File
import java.util.concurrent.locks.{Lock, ReadWriteLock}
-import java.lang.management._
-import java.util.{Base64, Properties, UUID}
+import java.lang.management.ManagementFactory
import com.typesafe.scalalogging.Logger
-import javax.management._
+import javax.management.ObjectName
import scala.collection._
import scala.collection.Seq
import org.apache.kafka.network.EndPoint
@@ -35,7 +33,6 @@ import org.apache.kafka.common.utils.Utils
import org.apache.kafka.network.SocketServerConfigs
import org.slf4j.event.Level
-import java.util
import scala.jdk.CollectionConverters._
/**
@@ -109,15 +106,6 @@ object CoreUtils {
}
}
- /**
- * Create an instance of the class with the given class name
- */
- def createObject[T <: AnyRef](className: String, args: AnyRef*): T = {
- val klass = Utils.loadClass(className, classOf[Object]).asInstanceOf[Class[T]]
- val constructor = klass.getConstructor(args.map(_.getClass): _*)
- constructor.newInstance(args: _*)
- }
-
/**
* Execute the given function inside the lock
*/
@@ -134,16 +122,6 @@ object CoreUtils {
def inWriteLock[T](lock: ReadWriteLock)(fun: => T): T =
inLock[T](lock.writeLock)(fun)
- /**
- * Returns a list of duplicated items
- */
- def duplicates[T](s: Iterable[T]): Iterable[T] = {
- s.groupBy(identity)
- .map { case (k, l) => (k, l.size)}
- .filter { case (_, l) => l > 1 }
- .keys
- }
-
def listenerListToEndPoints(listeners: String, securityProtocolMap: Map[ListenerName, SecurityProtocol]): Seq[EndPoint] = {
listenerListToEndPoints(listeners, securityProtocolMap, requireDistinctPorts = true)
}
@@ -217,31 +195,4 @@ object CoreUtils {
validate(endPoints)
endPoints
}
-
- def generateUuidAsBase64(): String = {
- val uuid = UUID.randomUUID()
- Base64.getUrlEncoder.withoutPadding.encodeToString(getBytesFromUuid(uuid))
- }
-
- def getBytesFromUuid(uuid: UUID): Array[Byte] = {
- // Extract bytes for uuid which is 128 bits (or 16 bytes) long.
- val uuidBytes = ByteBuffer.wrap(new Array[Byte](16))
- uuidBytes.putLong(uuid.getMostSignificantBits)
- uuidBytes.putLong(uuid.getLeastSignificantBits)
- uuidBytes.array
- }
-
- def propsWith(key: String, value: String): Properties = {
- propsWith((key, value))
- }
-
- def propsWith(props: (String, String)*): Properties = {
- val properties = new Properties()
- props.foreach { case (k, v) => properties.put(k, v) }
- properties
- }
-
- def replicaToBrokerAssignmentAsScala(map: util.Map[Integer, util.List[Integer]]): Map[Int, Seq[Int]] = {
- map.asScala.map(e => (e._1.asInstanceOf[Int], e._2.asScala.map(_.asInstanceOf[Int])))
- }
}
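The helpers removed above were thin wrappers over the standard library, so any future caller can inline them. A sketch of plain-Scala equivalents, included here only for reference (hypothetical code, not part of the patch):

    import java.nio.ByteBuffer
    import java.util.{Base64, Properties, UUID}

    object CoreUtilsReplacements {
      // duplicates: every item that occurs more than once
      def duplicates[T](s: Iterable[T]): Iterable[T] =
        s.groupBy(identity).collect { case (k, v) if v.size > 1 => k }

      // getBytesFromUuid + generateUuidAsBase64: the 16 UUID bytes encoded
      // as URL-safe base64 without padding (22 characters)
      def generateUuidAsBase64(): String = {
        val uuid = UUID.randomUUID()
        val buf = ByteBuffer.allocate(16)
        buf.putLong(uuid.getMostSignificantBits).putLong(uuid.getLeastSignificantBits)
        Base64.getUrlEncoder.withoutPadding.encodeToString(buf.array)
      }

      // propsWith: build a Properties from key/value pairs
      def propsWith(props: (String, String)*): Properties = {
        val p = new Properties()
        props.foreach { case (k, v) => p.put(k, v) }
        p
      }
    }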
diff --git a/core/src/test/scala/unit/kafka/server/DynamicConfigChangeTest.scala b/core/src/test/scala/unit/kafka/server/DynamicConfigChangeTest.scala
index f18c4ca980d..048da3eedc8 100644
--- a/core/src/test/scala/unit/kafka/server/DynamicConfigChangeTest.scala
+++ b/core/src/test/scala/unit/kafka/server/DynamicConfigChangeTest.scala
@@ -518,9 +518,9 @@ class DynamicConfigChangeUnitTest {
@Test
def shouldParseRegardlessOfWhitespaceAroundValues(): Unit = {
def parse(configHandler: TopicConfigHandler, value: String): Seq[Int] = {
- configHandler.parseThrottledPartitions(
- CoreUtils.propsWith(QuotaConfig.LEADER_REPLICATION_THROTTLED_REPLICAS_CONFIG, value),
- 102, QuotaConfig.LEADER_REPLICATION_THROTTLED_REPLICAS_CONFIG)
+ val props = new Properties()
+ props.put(QuotaConfig.LEADER_REPLICATION_THROTTLED_REPLICAS_CONFIG, value)
+ configHandler.parseThrottledPartitions(props, 102, QuotaConfig.LEADER_REPLICATION_THROTTLED_REPLICAS_CONFIG)
}
val configHandler: TopicConfigHandler = new TopicConfigHandler(null, null, null)
assertEquals(ReplicationQuotaManager.ALL_REPLICAS.asScala.map(_.toInt).toSeq, parse(configHandler, "* "))
diff --git a/core/src/test/scala/unit/kafka/utils/CoreUtilsTest.scala b/core/src/test/scala/unit/kafka/utils/CoreUtilsTest.scala
index 5f703c7be33..73a2403870f 100755
--- a/core/src/test/scala/unit/kafka/utils/CoreUtilsTest.scala
+++ b/core/src/test/scala/unit/kafka/utils/CoreUtilsTest.scala
@@ -17,16 +17,12 @@
package kafka.utils
-import java.util
-import java.util.{Base64, UUID}
import java.util.concurrent.locks.ReentrantLock
-import java.nio.ByteBuffer
import java.util.regex.Pattern
import org.junit.jupiter.api.Assertions._
import org.junit.jupiter.api.Test
import kafka.utils.CoreUtils.inLock
import org.apache.kafka.common.KafkaException
-import org.apache.kafka.common.utils.Utils
import org.slf4j.event.Level
@@ -67,23 +63,6 @@ class CoreUtilsTest extends Logging {
assertEquals(Some("test"+Level.ERROR),loggedMessage)
}
- @Test
- def testReadBytes(): Unit = {
- for (testCase <- List("", "a", "abcd")) {
- val bytes = testCase.getBytes
- assertTrue(util.Arrays.equals(bytes, Utils.readBytes(ByteBuffer.wrap(bytes))))
- }
- }
-
- @Test
- def testAbs(): Unit = {
- assertEquals(0, Utils.abs(Integer.MIN_VALUE))
- assertEquals(1, Utils.abs(-1))
- assertEquals(0, Utils.abs(0))
- assertEquals(1, Utils.abs(1))
- assertEquals(Integer.MAX_VALUE, Utils.abs(Integer.MAX_VALUE))
- }
-
@Test
def testInLock(): Unit = {
val lock = new ReentrantLock()
@@ -94,29 +73,4 @@ class CoreUtilsTest extends Logging {
assertEquals(2, result)
assertFalse(lock.isLocked, "Should be unlocked")
}
-
- @Test
- def testUrlSafeBase64EncodeUUID(): Unit = {
-
- // Test a UUID that has no + or / characters in base64 encoding [a149b4a3-06e1-4b49-a8cb-8a9c4a59fa46 ->(base64)-> oUm0owbhS0moy4qcSln6Rg==]
- val clusterId1 = Base64.getUrlEncoder.withoutPadding.encodeToString(CoreUtils.getBytesFromUuid(UUID.fromString("a149b4a3-06e1-4b49-a8cb-8a9c4a59fa46")))
- assertEquals(clusterId1, "oUm0owbhS0moy4qcSln6Rg")
- assertEquals(clusterId1.length, 22)
- assertTrue(clusterIdPattern.matcher(clusterId1).matches())
-
- // Test a UUID that has + or / characters in base64 encoding [d418ec02-277e-4853-81e6-afe30259daec ->(base64)-> 1BjsAid+SFOB5q/jAlna7A==]
- val clusterId2 = Base64.getUrlEncoder.withoutPadding.encodeToString(CoreUtils.getBytesFromUuid(UUID.fromString("d418ec02-277e-4853-81e6-afe30259daec")))
- assertEquals(clusterId2, "1BjsAid-SFOB5q_jAlna7A")
- assertEquals(clusterId2.length, 22)
- assertTrue(clusterIdPattern.matcher(clusterId2).matches())
- }
-
- @Test
- def testGenerateUuidAsBase64(): Unit = {
- val clusterId = CoreUtils.generateUuidAsBase64()
- assertEquals(clusterId.length, 22)
- assertTrue(clusterIdPattern.matcher(clusterId).matches())
- }
}
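The deleted base64 tests encoded two fixed UUIDs, one of which produces '+' and '/' under the standard base64 alphabet; the URL-safe encoder maps those to '-' and '_', which is what keeps cluster ids filesystem- and URL-friendly. That property can still be checked by hand against the vector from the removed test:

    import java.nio.ByteBuffer
    import java.util.{Base64, UUID}

    // This UUID contains '+' and '/' under standard base64 (1BjsAid+SFOB5q/jAlna7A==)
    // but '-' and '_' under the URL-safe alphabet, per the removed test.
    val uuid = UUID.fromString("d418ec02-277e-4853-81e6-afe30259daec")
    val buf = ByteBuffer.allocate(16)
    buf.putLong(uuid.getMostSignificantBits).putLong(uuid.getLeastSignificantBits)
    assert(Base64.getUrlEncoder.withoutPadding.encodeToString(buf.array) == "1BjsAid-SFOB5q_jAlna7A")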