This is an automated email from the ASF dual-hosted git repository.

frankvicky pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new a090dc3ba5e MINOR: Cleanup Core Module- Scala Modules (4/n) (#19805)
a090dc3ba5e is described below

commit a090dc3ba5e711133658d7ee794da2a6902b85f0
Author: Sanskar Jhajharia <[email protected]>
AuthorDate: Fri Jun 6 12:19:16 2025 +0530

    MINOR: Cleanup Core Module- Scala Modules (4/n) (#19805)
    
    Now that Kafka Brokers support Java 17, this PR makes some changes in
    core module. The changes in this PR are limited to only some Scala files
    in the Core module's tests. The changes mostly include:
    - Collections.emptyList(), Collections.singletonList() and
    Arrays.asList() are replaced with List.of()
    - Collections.emptyMap() and Collections.singletonMap() are replaced
    with Map.of()
    - Collections.singleton() is replaced with Set.of()
    
    To be clear, the directories being targeted in this PR from unit.kafka
    module:
    - log
    - network
    - security
    - tools
    - utils
    
    Reviewers: TengYao Chi <[email protected]>
---
 .../test/scala/unit/kafka/KafkaConfigTest.scala    |  4 +-
 .../unit/kafka/log/LogCleanerManagerTest.scala     | 45 +++++++++++++--------
 .../test/scala/unit/kafka/log/LogCleanerTest.scala | 12 +++---
 .../test/scala/unit/kafka/log/LogConfigTest.scala  | 47 +++++++++++-----------
 .../test/scala/unit/kafka/log/LogLoaderTest.scala  | 16 ++++----
 .../test/scala/unit/kafka/log/LogManagerTest.scala |  2 +-
 .../test/scala/unit/kafka/log/UnifiedLogTest.scala | 16 ++++----
 .../unit/kafka/network/ConnectionQuotasTest.scala  | 33 ++++++++-------
 .../scala/unit/kafka/network/ProcessorTest.scala   |  6 +--
 .../unit/kafka/network/RequestChannelTest.scala    | 41 +++++++++----------
 .../unit/kafka/network/SocketServerTest.scala      | 18 ++++-----
 .../kafka/security/authorizer/AuthorizerTest.scala |  8 ++--
 .../unit/kafka/tools/DumpLogSegmentsTest.scala     | 30 +++++++-------
 .../scala/unit/kafka/tools/StorageToolTest.scala   | 22 +++++-----
 .../test/scala/unit/kafka/utils/TestUtils.scala    | 28 ++++++-------
 15 files changed, 169 insertions(+), 159 deletions(-)

diff --git a/core/src/test/scala/unit/kafka/KafkaConfigTest.scala 
b/core/src/test/scala/unit/kafka/KafkaConfigTest.scala
index 8834f6f3608..81f2c8e5e08 100644
--- a/core/src/test/scala/unit/kafka/KafkaConfigTest.scala
+++ b/core/src/test/scala/unit/kafka/KafkaConfigTest.scala
@@ -116,12 +116,12 @@ class KafkaConfigTest {
     // We should be also able to set completely new property
     val config3 = 
KafkaConfig.fromProps(Kafka.getPropsFromArgs(Array(propertiesFile, 
"--override", "log.cleanup.policy=compact")))
     assertEquals(1, config3.nodeId)
-    assertEquals(util.Arrays.asList("compact"), config3.logCleanupPolicy)
+    assertEquals(util.List.of("compact"), config3.logCleanupPolicy)
 
     // We should be also able to set several properties
     val config4 = 
KafkaConfig.fromProps(Kafka.getPropsFromArgs(Array(propertiesFile, 
"--override", "log.cleanup.policy=compact,delete", "--override", "node.id=2")))
     assertEquals(2, config4.nodeId)
-    assertEquals(util.Arrays.asList("compact","delete"), 
config4.logCleanupPolicy)
+    assertEquals(util.List.of("compact","delete"), config4.logCleanupPolicy)
   }
 
   @Test
diff --git a/core/src/test/scala/unit/kafka/log/LogCleanerManagerTest.scala 
b/core/src/test/scala/unit/kafka/log/LogCleanerManagerTest.scala
index 8445baa7719..dc9a1d0928a 100644
--- a/core/src/test/scala/unit/kafka/log/LogCleanerManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogCleanerManagerTest.scala
@@ -37,8 +37,7 @@ import org.junit.jupiter.api.{AfterEach, Test}
 import java.lang.{Long => JLong}
 import java.util
 import java.util.concurrent.ConcurrentHashMap
-import scala.collection.mutable
-import scala.jdk.CollectionConverters._
+import java.util.stream.Collectors
 import scala.jdk.OptionConverters.RichOptional
 
 /**
@@ -61,13 +60,13 @@ class LogCleanerManagerTest extends Logging {
   val offset = 999
   val producerStateManagerConfig = new 
ProducerStateManagerConfig(TransactionLogConfig.PRODUCER_ID_EXPIRATION_MS_DEFAULT,
 false)
 
-  val cleanerCheckpoints: mutable.Map[TopicPartition, JLong] = 
mutable.Map[TopicPartition, JLong]()
+  val cleanerCheckpoints: util.HashMap[TopicPartition, JLong] = new 
util.HashMap[TopicPartition, JLong]()
 
   class LogCleanerManagerMock(logDirs: util.List[File],
                               logs: 
util.concurrent.ConcurrentMap[TopicPartition, UnifiedLog],
                               logDirFailureChannel: LogDirFailureChannel) 
extends LogCleanerManager(logDirs, logs, logDirFailureChannel) {
     override def allCleanerCheckpoints: util.Map[TopicPartition, JLong] = {
-      cleanerCheckpoints.toMap.asJava
+      cleanerCheckpoints
     }
 
     override def updateCheckpoints(dataDir: File, partitionToUpdateOrAdd: 
Optional[util.Map.Entry[TopicPartition, JLong]],
@@ -382,7 +381,11 @@ class LogCleanerManagerTest extends Logging {
     assertEquals(0, cleanable.size, "should have 0 logs ready to be compacted")
 
     // log cleanup finished, and log can be picked up for compaction
-    
cleanerManager.resumeCleaning(deletableLog.asScala.map(_.getKey).toSet.asJava)
+    cleanerManager.resumeCleaning(
+      deletableLog.stream()
+        .map[TopicPartition](entry => entry.getKey)
+        .collect(Collectors.toSet[TopicPartition]())
+    )
     val cleanable2 = cleanerManager.grabFilthiestCompactedLog(time, new 
PreCleanStats()).toScala
     assertEquals(1, cleanable2.size, "should have 1 logs ready to be 
compacted")
 
@@ -396,7 +399,7 @@ class LogCleanerManagerTest extends Logging {
     assertEquals(0, deletableLog2.size, "should have 0 logs ready to be 
deleted")
 
     // compaction done, should have 1 log eligible for log cleanup
-    cleanerManager.doneDeleting(Seq(cleanable2.get.topicPartition).asJava)
+    cleanerManager.doneDeleting(util.List.of(cleanable2.get.topicPartition))
     val deletableLog3 = cleanerManager.pauseCleaningForNonCompactedPartitions()
     assertEquals(1, deletableLog3.size, "should have 1 logs ready to be 
deleted")
   }
@@ -501,9 +504,13 @@ class LogCleanerManagerTest extends Logging {
     val pausedPartitions = 
cleanerManager.pauseCleaningForNonCompactedPartitions()
     // Log truncation happens due to unclean leader election
     cleanerManager.abortAndPauseCleaning(log.topicPartition)
-    cleanerManager.resumeCleaning(Set(log.topicPartition).asJava)
+    cleanerManager.resumeCleaning(util.Set.of(log.topicPartition))
     // log cleanup finishes and pausedPartitions are resumed
-    
cleanerManager.resumeCleaning(pausedPartitions.asScala.map(_.getKey).toSet.asJava)
+    cleanerManager.resumeCleaning(
+      pausedPartitions.stream()
+        .map[TopicPartition](entry => entry.getKey)
+        .collect(Collectors.toSet[TopicPartition]())
+    )
 
     assertEquals(Optional.empty(), 
cleanerManager.cleaningState(log.topicPartition))
   }
@@ -522,7 +529,11 @@ class LogCleanerManagerTest extends Logging {
     // Broker processes StopReplicaRequest with delete=true
     cleanerManager.abortCleaning(log.topicPartition)
     // log cleanup finishes and pausedPartitions are resumed
-    
cleanerManager.resumeCleaning(pausedPartitions.asScala.map(_.getKey).toSet.asJava)
+    cleanerManager.resumeCleaning(
+      pausedPartitions.stream()
+        .map[TopicPartition](entry => entry.getKey)
+        .collect(Collectors.toSet[TopicPartition]())
+    )
 
     assertEquals(Optional.empty(), 
cleanerManager.cleaningState(log.topicPartition))
   }
@@ -743,17 +754,17 @@ class LogCleanerManagerTest extends Logging {
     val cleanerManager: LogCleanerManager = createCleanerManager(log)
     val tp = new TopicPartition("log", 0)
 
-    assertThrows(classOf[IllegalStateException], () => 
cleanerManager.doneDeleting(Seq(tp).asJava))
+    assertThrows(classOf[IllegalStateException], () => 
cleanerManager.doneDeleting(util.List.of(tp)))
 
     cleanerManager.setCleaningState(tp, LogCleaningState.logCleaningPaused(1))
-    assertThrows(classOf[IllegalStateException], () => 
cleanerManager.doneDeleting(Seq(tp).asJava))
+    assertThrows(classOf[IllegalStateException], () => 
cleanerManager.doneDeleting(util.List.of(tp)))
 
     cleanerManager.setCleaningState(tp, LOG_CLEANING_IN_PROGRESS)
-    cleanerManager.doneDeleting(Seq(tp).asJava)
+    cleanerManager.doneDeleting(util.List.of(tp))
     assertTrue(cleanerManager.cleaningState(tp).isEmpty)
 
     cleanerManager.setCleaningState(tp, LOG_CLEANING_ABORTED)
-    cleanerManager.doneDeleting(Seq(tp).asJava)
+    cleanerManager.doneDeleting(util.List.of(tp))
     assertEquals(LogCleaningState.logCleaningPaused(1), 
cleanerManager.cleaningState(tp).get)
   }
 
@@ -771,7 +782,7 @@ class LogCleanerManagerTest extends Logging {
 
     val filthiestLog = cleanerManager.grabFilthiestCompactedLog(time, new 
PreCleanStats())
     assertEquals(Optional.empty(), filthiestLog, "Log should not be selected 
for cleaning")
-    assertEquals(20L, cleanerCheckpoints(tp), "Unselected log should have 
checkpoint offset updated")
+    assertEquals(20L, cleanerCheckpoints.get(tp), "Unselected log should have 
checkpoint offset updated")
   }
 
   /**
@@ -793,17 +804,17 @@ class LogCleanerManagerTest extends Logging {
 
     val filthiestLog = cleanerManager.grabFilthiestCompactedLog(time, new 
PreCleanStats()).get
     assertEquals(tp1, filthiestLog.topicPartition, "Dirtier log should be 
selected")
-    assertEquals(15L, cleanerCheckpoints(tp0), "Unselected log should have 
checkpoint offset updated")
+    assertEquals(15L, cleanerCheckpoints.get(tp0), "Unselected log should have 
checkpoint offset updated")
   }
 
   private def createCleanerManager(log: UnifiedLog): LogCleanerManager = {
     val logs = new util.concurrent.ConcurrentHashMap[TopicPartition, 
UnifiedLog]()
     logs.put(topicPartition, log)
-    new LogCleanerManager(Seq(logDir, logDir2).asJava, logs, null)
+    new LogCleanerManager(util.List.of(logDir, logDir2), logs, null)
   }
 
   private def createCleanerManagerMock(pool: 
util.concurrent.ConcurrentMap[TopicPartition, UnifiedLog]): 
LogCleanerManagerMock = {
-    new LogCleanerManagerMock(Seq(logDir).asJava, pool, null)
+    new LogCleanerManagerMock(util.List.of(logDir), pool, null)
   }
 
   private def createLog(segmentSize: Int,
diff --git a/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala 
b/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala
index 1bebfaa49e1..5b842d60e46 100644
--- a/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala
@@ -913,7 +913,7 @@ class LogCleanerTest extends Logging {
 
     // clean the log
     val stats = new CleanerStats(Time.SYSTEM)
-    cleaner.cleanSegments(log, Seq(log.logSegments.asScala.head).asJava, map, 
0L, stats, new CleanedTransactionMetadata, -1, 
log.logSegments.asScala.head.readNextOffset)
+    cleaner.cleanSegments(log, util.List.of(log.logSegments.asScala.head), 
map, 0L, stats, new CleanedTransactionMetadata, -1, 
log.logSegments.asScala.head.readNextOffset)
     val shouldRemain = LogTestUtils.keysInLog(log).filterNot(keys.contains)
     assertEquals(shouldRemain, LogTestUtils.keysInLog(log))
   }
@@ -926,7 +926,7 @@ class LogCleanerTest extends Logging {
     val (log, offsetMap) = 
createLogWithMessagesLargerThanMaxSize(largeMessageSize = 1024 * 1024)
 
     val cleaner = makeCleaner(Int.MaxValue, maxMessageSize=1024)
-    cleaner.cleanSegments(log, Seq(log.logSegments.asScala.head).asJava, 
offsetMap, 0L, new CleanerStats(Time.SYSTEM), new CleanedTransactionMetadata, 
-1, log.logSegments.asScala.head.readNextOffset)
+    cleaner.cleanSegments(log, util.List.of(log.logSegments.asScala.head), 
offsetMap, 0L, new CleanerStats(Time.SYSTEM), new CleanedTransactionMetadata, 
-1, log.logSegments.asScala.head.readNextOffset)
     val shouldRemain = LogTestUtils.keysInLog(log).filter(k => 
!offsetMap.map.containsKey(k.toString))
     assertEquals(shouldRemain, LogTestUtils.keysInLog(log))
   }
@@ -945,7 +945,7 @@ class LogCleanerTest extends Logging {
 
     val cleaner = makeCleaner(Int.MaxValue, maxMessageSize=1024)
     assertThrows(classOf[CorruptRecordException], () =>
-      cleaner.cleanSegments(log, Seq(log.logSegments.asScala.head).asJava, 
offsetMap, 0L, new CleanerStats(Time.SYSTEM), new CleanedTransactionMetadata, 
-1, log.logSegments.asScala.head.readNextOffset)
+      cleaner.cleanSegments(log, util.List.of(log.logSegments.asScala.head), 
offsetMap, 0L, new CleanerStats(Time.SYSTEM), new CleanedTransactionMetadata, 
-1, log.logSegments.asScala.head.readNextOffset)
     )
   }
 
@@ -962,7 +962,7 @@ class LogCleanerTest extends Logging {
 
     val cleaner = makeCleaner(Int.MaxValue, maxMessageSize=1024)
     assertThrows(classOf[CorruptRecordException], () =>
-      cleaner.cleanSegments(log, Seq(log.logSegments.asScala.head).asJava, 
offsetMap, 0L, new CleanerStats(Time.SYSTEM), new CleanedTransactionMetadata, 
-1, log.logSegments.asScala.head.readNextOffset)
+      cleaner.cleanSegments(log, util.List.of(log.logSegments.asScala.head), 
offsetMap, 0L, new CleanerStats(Time.SYSTEM), new CleanedTransactionMetadata, 
-1, log.logSegments.asScala.head.readNextOffset)
     )
   }
 
@@ -1636,7 +1636,7 @@ class LogCleanerTest extends Logging {
 
     // Try to clean segment with offset overflow. This will trigger log split 
and the cleaning itself must abort.
     assertThrows(classOf[LogCleaningAbortedException], () =>
-      cleaner.cleanSegments(log, Seq(segmentWithOverflow).asJava, offsetMap, 
0L, new CleanerStats(Time.SYSTEM),
+      cleaner.cleanSegments(log, util.List.of(segmentWithOverflow), offsetMap, 
0L, new CleanerStats(Time.SYSTEM),
         new CleanedTransactionMetadata, -1, segmentWithOverflow.readNextOffset)
     )
     assertEquals(numSegmentsInitial + 1, log.logSegments.size)
@@ -1646,7 +1646,7 @@ class LogCleanerTest extends Logging {
     // Clean each segment now that split is complete.
     val upperBoundOffset = log.logSegments.asScala.last.readNextOffset
     for (segmentToClean <- log.logSegments.asScala)
-      cleaner.cleanSegments(log, List(segmentToClean).asJava, offsetMap, 0L, 
new CleanerStats(Time.SYSTEM),
+      cleaner.cleanSegments(log, util.List.of(segmentToClean), offsetMap, 0L, 
new CleanerStats(Time.SYSTEM),
         new CleanedTransactionMetadata, -1, upperBoundOffset)
     assertEquals(expectedKeysAfterCleaning, LogTestUtils.keysInLog(log))
     assertFalse(LogTestUtils.hasOffsetOverflow(log))
diff --git a/core/src/test/scala/unit/kafka/log/LogConfigTest.scala 
b/core/src/test/scala/unit/kafka/log/LogConfigTest.scala
index 296736fc678..1b5c61fc777 100644
--- a/core/src/test/scala/unit/kafka/log/LogConfigTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogConfigTest.scala
@@ -27,7 +27,8 @@ import org.apache.kafka.server.common.MetadataVersion
 import org.junit.jupiter.api.Assertions._
 import org.junit.jupiter.api.Test
 
-import java.util.{Collections, Properties}
+import java.util
+import java.util.Properties
 import org.apache.kafka.server.config.ServerLogConfigs
 import org.apache.kafka.server.log.remote.storage.RemoteLogManagerConfig
 import org.apache.kafka.storage.internals.log.{LogConfig, 
ThrottledReplicaListValidator}
@@ -122,7 +123,7 @@ class LogConfigTest {
   /* Sanity check that toHtml produces one of the expected configs */
   @Test
   def testToHtml(): Unit = {
-    val html = LogConfig.configDefCopy.toHtml(4, (key: String) => "prefix_" + 
key, Collections.emptyMap())
+    val html = LogConfig.configDefCopy.toHtml(4, (key: String) => "prefix_" + 
key, util.Map.of)
     val expectedConfig = "<h4><a id=\"file.delete.delay.ms\"></a><a 
id=\"prefix_file.delete.delay.ms\" 
href=\"#prefix_file.delete.delay.ms\">file.delete.delay.ms</a></h4>"
     assertTrue(html.contains(expectedConfig), s"Could not find 
`$expectedConfig` in:\n $html")
   }
@@ -273,7 +274,7 @@ class LogConfigTest {
     props.put(TopicConfig.LOCAL_LOG_RETENTION_MS_CONFIG, 
localRetentionMs.toString)
     props.put(TopicConfig.LOCAL_LOG_RETENTION_BYTES_CONFIG, 
localRetentionBytes.toString)
     assertThrows(classOf[ConfigException],
-      () => LogConfig.validate(Collections.emptyMap(), props, 
kafkaConfig.extractLogConfigMap, 
kafkaConfig.remoteLogManagerConfig.isRemoteStorageSystemEnabled()))
+      () => LogConfig.validate(util.Map.of, props, 
kafkaConfig.extractLogConfigMap, 
kafkaConfig.remoteLogManagerConfig.isRemoteStorageSystemEnabled))
   }
 
   @Test
@@ -283,7 +284,7 @@ class LogConfigTest {
     val kafkaConfig = KafkaConfig.fromProps(kafkaProps)
     val logProps = new Properties()
     def validateCleanupPolicy(): Unit = {
-      LogConfig.validate(Collections.emptyMap(), logProps, 
kafkaConfig.extractLogConfigMap, 
kafkaConfig.remoteLogManagerConfig.isRemoteStorageSystemEnabled())
+      LogConfig.validate(util.Map.of, logProps, 
kafkaConfig.extractLogConfigMap, 
kafkaConfig.remoteLogManagerConfig.isRemoteStorageSystemEnabled)
     }
     logProps.put(TopicConfig.CLEANUP_POLICY_CONFIG, 
TopicConfig.CLEANUP_POLICY_DELETE)
     logProps.put(TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG, "true")
@@ -310,10 +311,10 @@ class LogConfigTest {
     val logProps = new Properties()
     logProps.put(TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG, "true")
     if (sysRemoteStorageEnabled) {
-      LogConfig.validate(Collections.emptyMap(), logProps, 
kafkaConfig.extractLogConfigMap, 
kafkaConfig.remoteLogManagerConfig.isRemoteStorageSystemEnabled())
+      LogConfig.validate(util.Map.of, logProps, 
kafkaConfig.extractLogConfigMap, 
kafkaConfig.remoteLogManagerConfig.isRemoteStorageSystemEnabled)
     } else {
       val message = assertThrows(classOf[ConfigException],
-        () => LogConfig.validate(Collections.emptyMap(), logProps, 
kafkaConfig.extractLogConfigMap, 
kafkaConfig.remoteLogManagerConfig.isRemoteStorageSystemEnabled()))
+        () => LogConfig.validate(util.Map.of, logProps, 
kafkaConfig.extractLogConfigMap, 
kafkaConfig.remoteLogManagerConfig.isRemoteStorageSystemEnabled))
       assertTrue(message.getMessage.contains("Tiered Storage functionality is 
disabled in the broker"))
     }
   }
@@ -329,8 +330,8 @@ class LogConfigTest {
     logProps.put(TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG, "false")
     if (wasRemoteStorageEnabled) {
       val message = assertThrows(classOf[InvalidConfigurationException],
-        () => 
LogConfig.validate(Collections.singletonMap(TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG,
 "true"),
-          logProps, kafkaConfig.extractLogConfigMap, 
kafkaConfig.remoteLogManagerConfig.isRemoteStorageSystemEnabled()))
+        () => 
LogConfig.validate(util.Map.of(TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG, 
"true"),
+          logProps, kafkaConfig.extractLogConfigMap, 
kafkaConfig.remoteLogManagerConfig.isRemoteStorageSystemEnabled))
       assertTrue(message.getMessage.contains("It is invalid to disable remote 
storage without deleting remote data. " +
         "If you want to keep the remote data and turn to read only, please set 
`remote.storage.enable=true,remote.log.copy.disable=true`. " +
         "If you want to disable remote storage and delete all remote data, 
please set `remote.storage.enable=false,remote.log.delete.on.disable=true`."))
@@ -338,12 +339,12 @@ class LogConfigTest {
 
       // It should be able to disable the remote log storage when delete on 
disable is set to true
       logProps.put(TopicConfig.REMOTE_LOG_DELETE_ON_DISABLE_CONFIG, "true")
-      
LogConfig.validate(Collections.singletonMap(TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG,
 "true"),
-        logProps, kafkaConfig.extractLogConfigMap, 
kafkaConfig.remoteLogManagerConfig.isRemoteStorageSystemEnabled())
+      
LogConfig.validate(util.Map.of(TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG, 
"true"),
+        logProps, kafkaConfig.extractLogConfigMap, 
kafkaConfig.remoteLogManagerConfig.isRemoteStorageSystemEnabled)
     } else {
-      LogConfig.validate(Collections.emptyMap(), logProps, 
kafkaConfig.extractLogConfigMap, 
kafkaConfig.remoteLogManagerConfig.isRemoteStorageSystemEnabled())
-      
LogConfig.validate(Collections.singletonMap(TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG,
 "false"), logProps,
-        kafkaConfig.extractLogConfigMap, 
kafkaConfig.remoteLogManagerConfig.isRemoteStorageSystemEnabled())
+      LogConfig.validate(util.Map.of, logProps, 
kafkaConfig.extractLogConfigMap, 
kafkaConfig.remoteLogManagerConfig.isRemoteStorageSystemEnabled)
+      
LogConfig.validate(util.Map.of(TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG, 
"false"), logProps,
+        kafkaConfig.extractLogConfigMap, 
kafkaConfig.remoteLogManagerConfig.isRemoteStorageSystemEnabled)
     }
   }
 
@@ -362,12 +363,12 @@ class LogConfigTest {
     logProps.put(TopicConfig.RETENTION_MS_CONFIG, "500")
     if (sysRemoteStorageEnabled) {
       val message = assertThrows(classOf[ConfigException],
-        () => LogConfig.validate(Collections.emptyMap(), logProps, 
kafkaConfig.extractLogConfigMap,
-          kafkaConfig.remoteLogManagerConfig.isRemoteStorageSystemEnabled()))
+        () => LogConfig.validate(util.Map.of, logProps, 
kafkaConfig.extractLogConfigMap,
+          kafkaConfig.remoteLogManagerConfig.isRemoteStorageSystemEnabled))
       
assertTrue(message.getMessage.contains(TopicConfig.LOCAL_LOG_RETENTION_MS_CONFIG))
     } else {
-      LogConfig.validate(Collections.emptyMap(), logProps, 
kafkaConfig.extractLogConfigMap,
-        kafkaConfig.remoteLogManagerConfig.isRemoteStorageSystemEnabled())
+      LogConfig.validate(util.Map.of, logProps, 
kafkaConfig.extractLogConfigMap,
+        kafkaConfig.remoteLogManagerConfig.isRemoteStorageSystemEnabled)
     }
   }
 
@@ -386,12 +387,12 @@ class LogConfigTest {
     logProps.put(TopicConfig.RETENTION_BYTES_CONFIG, "128")
     if (sysRemoteStorageEnabled) {
       val message = assertThrows(classOf[ConfigException],
-        () => LogConfig.validate(Collections.emptyMap(), logProps, 
kafkaConfig.extractLogConfigMap,
-          kafkaConfig.remoteLogManagerConfig.isRemoteStorageSystemEnabled()))
+        () => LogConfig.validate(util.Map.of, logProps, 
kafkaConfig.extractLogConfigMap,
+          kafkaConfig.remoteLogManagerConfig.isRemoteStorageSystemEnabled))
       
assertTrue(message.getMessage.contains(TopicConfig.LOCAL_LOG_RETENTION_BYTES_CONFIG))
     } else {
-      LogConfig.validate(Collections.emptyMap(), logProps, 
kafkaConfig.extractLogConfigMap,
-        kafkaConfig.remoteLogManagerConfig.isRemoteStorageSystemEnabled())
+      LogConfig.validate(util.Map.of, logProps, 
kafkaConfig.extractLogConfigMap,
+        kafkaConfig.remoteLogManagerConfig.isRemoteStorageSystemEnabled)
     }
   }
 
@@ -406,10 +407,10 @@ class LogConfigTest {
 
     if (sysRemoteStorageEnabled) {
       val message = assertThrows(classOf[ConfigException],
-        () => 
LogConfig.validateBrokerLogConfigValues(kafkaConfig.extractLogConfigMap, 
kafkaConfig.remoteLogManagerConfig.isRemoteStorageSystemEnabled()))
+        () => 
LogConfig.validateBrokerLogConfigValues(kafkaConfig.extractLogConfigMap, 
kafkaConfig.remoteLogManagerConfig.isRemoteStorageSystemEnabled))
       
assertTrue(message.getMessage.contains(TopicConfig.LOCAL_LOG_RETENTION_BYTES_CONFIG))
     } else {
-      LogConfig.validateBrokerLogConfigValues(kafkaConfig.extractLogConfigMap, 
kafkaConfig.remoteLogManagerConfig.isRemoteStorageSystemEnabled())
+      LogConfig.validateBrokerLogConfigValues(kafkaConfig.extractLogConfigMap, 
kafkaConfig.remoteLogManagerConfig.isRemoteStorageSystemEnabled)
     }
   }
 
diff --git a/core/src/test/scala/unit/kafka/log/LogLoaderTest.scala 
b/core/src/test/scala/unit/kafka/log/LogLoaderTest.scala
index 0c465cf2138..c1d611ce6dc 100644
--- a/core/src/test/scala/unit/kafka/log/LogLoaderTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogLoaderTest.scala
@@ -286,7 +286,7 @@ class LogLoaderTest {
           val wrapper = Mockito.spy(segment)
           Mockito.doAnswer { in =>
             segmentsWithReads += wrapper
-            segment.read(in.getArgument(0, classOf[java.lang.Long]), 
in.getArgument(1, classOf[java.lang.Integer]), in.getArgument(2, 
classOf[java.util.Optional[java.lang.Long]]), in.getArgument(3, 
classOf[java.lang.Boolean]))
+            segment.read(in.getArgument(0, classOf[java.lang.Long]), 
in.getArgument(1, classOf[java.lang.Integer]), in.getArgument(2, 
classOf[util.Optional[java.lang.Long]]), in.getArgument(3, 
classOf[java.lang.Boolean]))
           }.when(wrapper).read(ArgumentMatchers.any(), ArgumentMatchers.any(), 
ArgumentMatchers.any(), ArgumentMatchers.any())
           Mockito.doAnswer { in =>
             recoveredSegments += wrapper
@@ -391,12 +391,12 @@ class LogLoaderTest {
                                               codec: Compression = 
Compression.NONE,
                                               timestamp: Long = 
RecordBatch.NO_TIMESTAMP,
                                               magicValue: Byte = 
RecordBatch.CURRENT_MAGIC_VALUE): MemoryRecords = {
-    val records = Seq(new SimpleRecord(timestamp, key, value))
+    val records = util.List.of(new SimpleRecord(timestamp, key, value))
 
-    val buf = 
ByteBuffer.allocate(DefaultRecordBatch.sizeInBytes(records.asJava))
+    val buf = ByteBuffer.allocate(DefaultRecordBatch.sizeInBytes(records))
     val builder = MemoryRecords.builder(buf, magicValue, codec, 
TimestampType.CREATE_TIME, offset,
       mockTime.milliseconds, leaderEpoch)
-    records.foreach(builder.append)
+    records.forEach(builder.append)
     builder.build()
   }
 
@@ -559,7 +559,7 @@ class LogLoaderTest {
       false,
       LogOffsetsListener.NO_OP_OFFSETS_LISTENER)
 
-    
verify(stateManager).removeStraySnapshots(any[java.util.List[java.lang.Long]])
+    verify(stateManager).removeStraySnapshots(any[util.List[java.lang.Long]])
     verify(stateManager, times(2)).updateMapEndOffset(0L)
     verify(stateManager, times(2)).takeSnapshot()
     verify(stateManager).isEmpty
@@ -1215,11 +1215,11 @@ class LogLoaderTest {
     val fourthBatch = singletonRecordsWithLeaderEpoch(value = 
"random".getBytes, leaderEpoch = 3, offset = 3)
     log.appendAsFollower(fourthBatch, Int.MaxValue)
 
-    assertEquals(java.util.Arrays.asList(new EpochEntry(1, 0), new 
EpochEntry(2, 1), new EpochEntry(3, 3)), leaderEpochCache.epochEntries)
+    assertEquals(util.List.of(new EpochEntry(1, 0), new EpochEntry(2, 1), new 
EpochEntry(3, 3)), leaderEpochCache.epochEntries)
 
     // deliberately remove some of the epoch entries
     leaderEpochCache.truncateFromEndAsyncFlush(2)
-    assertNotEquals(java.util.Arrays.asList(new EpochEntry(1, 0), new 
EpochEntry(2, 1), new EpochEntry(3, 3)), leaderEpochCache.epochEntries)
+    assertNotEquals(util.List.of(new EpochEntry(1, 0), new EpochEntry(2, 1), 
new EpochEntry(3, 3)), leaderEpochCache.epochEntries)
     log.close()
 
     // reopen the log and recover from the beginning
@@ -1227,7 +1227,7 @@ class LogLoaderTest {
     val recoveredLeaderEpochCache = recoveredLog.leaderEpochCache
 
     // epoch entries should be recovered
-    assertEquals(java.util.Arrays.asList(new EpochEntry(1, 0), new 
EpochEntry(2, 1), new EpochEntry(3, 3)), recoveredLeaderEpochCache.epochEntries)
+    assertEquals(util.List.of(new EpochEntry(1, 0), new EpochEntry(2, 1), new 
EpochEntry(3, 3)), recoveredLeaderEpochCache.epochEntries)
     recoveredLog.close()
   }
 
diff --git a/core/src/test/scala/unit/kafka/log/LogManagerTest.scala 
b/core/src/test/scala/unit/kafka/log/LogManagerTest.scala
index 04f4acca5de..a8946a3d139 100755
--- a/core/src/test/scala/unit/kafka/log/LogManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogManagerTest.scala
@@ -534,7 +534,7 @@ class LogManagerTest {
       true
     }
 
-    logManager.loadLog(log.dir, hadCleanShutdown = true, 
Collections.emptyMap[TopicPartition, JLong], 
Collections.emptyMap[TopicPartition, JLong], logConfig, Map.empty, new 
ConcurrentHashMap[String, Integer](),  providedIsStray)
+    logManager.loadLog(log.dir, hadCleanShutdown = true, 
util.Map.of[TopicPartition, JLong], util.Map.of[TopicPartition, JLong], 
logConfig, Map.empty, new ConcurrentHashMap[String, Integer](),  
providedIsStray)
     assertEquals(1, invokedCount)
     assertTrue(
       logDir.listFiles().toSet
diff --git a/core/src/test/scala/unit/kafka/log/UnifiedLogTest.scala 
b/core/src/test/scala/unit/kafka/log/UnifiedLogTest.scala
index 42c813074aa..e42b509b247 100755
--- a/core/src/test/scala/unit/kafka/log/UnifiedLogTest.scala
+++ b/core/src/test/scala/unit/kafka/log/UnifiedLogTest.scala
@@ -231,7 +231,7 @@ class UnifiedLogTest {
 
     reopened.truncateFullyAndStartAt(2L, Optional.of(1L))
     assertEquals(Optional.empty, reopened.firstUnstableOffset)
-    assertEquals(java.util.Collections.emptyMap(), 
reopened.producerStateManager.activeProducers)
+    assertEquals(util.Map.of, reopened.producerStateManager.activeProducers)
     assertEquals(1L, reopened.logStartOffset)
     assertEquals(2L, reopened.logEndOffset)
   }
@@ -274,7 +274,7 @@ class UnifiedLogTest {
 
     truncateFunc(reopened, 0L)
     assertEquals(Optional.empty, reopened.firstUnstableOffset)
-    assertEquals(java.util.Collections.emptyMap(), 
reopened.producerStateManager.activeProducers)
+    assertEquals(util.Map.of, reopened.producerStateManager.activeProducers)
   }
 
   @Test
@@ -1980,7 +1980,7 @@ class UnifiedLogTest {
     val log = createLog(logDir, LogTestUtils.createLogConfig(maxMessageBytes = 
second.sizeInBytes - 1))
 
     log.appendAsFollower(first, Int.MaxValue)
-    // the second record is larger then limit but appendAsFollower does not 
validate the size.
+    // the second record is larger than limit but appendAsFollower does not 
validate the size.
     log.appendAsFollower(second, Int.MaxValue)
   }
 
@@ -1991,7 +1991,7 @@ class UnifiedLogTest {
     val log = createLog(logDir, logConfig)
     val previousEndOffset = log.logEndOffsetMetadata.messageOffset
 
-    if (expectedException.isPresent()) {
+    if (expectedException.isPresent) {
       assertThrows(
         expectedException.get(),
         () => log.appendAsFollower(records, Int.MaxValue)
@@ -2991,7 +2991,7 @@ class UnifiedLogTest {
     for (_ <- 0 until 15)
       log.appendAsLeader(createRecords, 0)
 
-    // mark oldest segment as older the retention.ms
+    // mark the oldest segment as older the retention.ms
     log.logSegments.asScala.head.setLastModified(mockTime.milliseconds - 20000)
 
     val segments = log.numberOfSegments
@@ -3110,7 +3110,7 @@ class UnifiedLogTest {
     log.deleteOldSegments()
 
     //The oldest epoch entry should have been removed
-    assertEquals(java.util.Arrays.asList(new EpochEntry(1, 5), new 
EpochEntry(2, 10)), cache.epochEntries)
+    assertEquals(util.List.of(new EpochEntry(1, 5), new EpochEntry(2, 10)), 
cache.epochEntries)
   }
 
   @Test
@@ -3135,7 +3135,7 @@ class UnifiedLogTest {
     log.deleteOldSegments()
 
     //The first entry should have gone from (0,0) => (0,5)
-    assertEquals(java.util.Arrays.asList(new EpochEntry(0, 5), new 
EpochEntry(1, 7), new EpochEntry(2, 10)), cache.epochEntries)
+    assertEquals(util.List.of(new EpochEntry(0, 5), new EpochEntry(1, 7), new 
EpochEntry(2, 10)), cache.epochEntries)
   }
 
   @Test
@@ -4599,7 +4599,7 @@ class UnifiedLogTest {
   def testGetFirstBatchTimestampForSegments(): Unit = {
     val log = createLog(logDir, LogTestUtils.createLogConfig())
 
-    val segments: java.util.List[LogSegment] = new 
java.util.ArrayList[LogSegment]()
+    val segments: util.List[LogSegment] = new util.ArrayList[LogSegment]()
     val seg1 = LogTestUtils.createSegment(1, logDir, 10, Time.SYSTEM)
     val seg2 = LogTestUtils.createSegment(2, logDir, 10, Time.SYSTEM)
     segments.add(seg1)
diff --git a/core/src/test/scala/unit/kafka/network/ConnectionQuotasTest.scala 
b/core/src/test/scala/unit/kafka/network/ConnectionQuotasTest.scala
index 3906011a203..d9a64b4186f 100644
--- a/core/src/test/scala/unit/kafka/network/ConnectionQuotasTest.scala
+++ b/core/src/test/scala/unit/kafka/network/ConnectionQuotasTest.scala
@@ -20,7 +20,7 @@ package kafka.network
 import java.net.InetAddress
 import java.util
 import java.util.concurrent.{Callable, ExecutorService, Executors, TimeUnit}
-import java.util.{Collections, Properties}
+import java.util.Properties
 import com.yammer.metrics.core.Meter
 import kafka.network.Processor.ListenerMetricTag
 import kafka.server.KafkaConfig
@@ -37,7 +37,6 @@ import org.apache.kafka.server.util.MockTime
 import org.junit.jupiter.api.Assertions._
 import org.junit.jupiter.api._
 
-import scala.jdk.CollectionConverters._
 import scala.collection.{Map, mutable}
 import scala.concurrent.TimeoutException
 
@@ -91,12 +90,12 @@ class ConnectionQuotasTest {
 
     listeners.keys.foreach { name =>
         blockedPercentMeters.put(name, new 
KafkaMetricsGroup(this.getClass).newMeter(
-          s"${name}BlockedPercent", "blocked time", TimeUnit.NANOSECONDS, 
Map(ListenerMetricTag -> name).asJava))
+          s"${name}BlockedPercent", "blocked time", TimeUnit.NANOSECONDS, 
util.Map.of(ListenerMetricTag, name)))
     }
     // use system time, because ConnectionQuota causes the current thread to 
wait with timeout, which waits based on
     // system time; so using mock time will likely result in test flakiness 
due to a mixed use of mock and system time
     time = Time.SYSTEM
-    metrics = new Metrics(new MetricConfig(), Collections.emptyList(), time)
+    metrics = new Metrics(new MetricConfig(), util.List.of, time)
     executor = Executors.newFixedThreadPool(listeners.size)
   }
 
@@ -282,7 +281,7 @@ class ConnectionQuotasTest {
 
     addListenersAndVerify(config, connectionQuotas)
 
-    val listenerConfig = Map(SocketServerConfigs.MAX_CONNECTIONS_CONFIG -> 
listenerMaxConnections.toString).asJava
+    val listenerConfig = 
util.Map.of(SocketServerConfigs.MAX_CONNECTIONS_CONFIG, 
listenerMaxConnections.toString)
     listeners.values.foreach { listener =>
       
connectionQuotas.maxConnectionsPerListener(listener.listenerName).configure(listenerConfig)
     }
@@ -374,7 +373,7 @@ class ConnectionQuotasTest {
     val config = KafkaConfig.fromProps(props)
     connectionQuotas = new ConnectionQuotas(config, time, metrics)
 
-    val listenerConfig = 
Map(SocketServerConfigs.MAX_CONNECTION_CREATION_RATE_CONFIG -> 
listenerRateLimit.toString).asJava
+    val listenerConfig = 
util.Map.of(SocketServerConfigs.MAX_CONNECTION_CREATION_RATE_CONFIG, 
listenerRateLimit.toString)
     addListenersAndVerify(config, listenerConfig, connectionQuotas)
 
     // create connections with the rate < listener quota on every listener, 
and verify there is no throttling
@@ -400,7 +399,7 @@ class ConnectionQuotasTest {
     val config = KafkaConfig.fromProps(props)
     connectionQuotas = new ConnectionQuotas(config, time, metrics)
 
-    val listenerConfig = 
Map(SocketServerConfigs.MAX_CONNECTION_CREATION_RATE_CONFIG -> 
listenerRateLimit.toString).asJava
+    val listenerConfig = 
util.Map.of(SocketServerConfigs.MAX_CONNECTION_CREATION_RATE_CONFIG, 
listenerRateLimit.toString)
     addListenersAndVerify(config, listenerConfig, connectionQuotas)
 
     // create connections with the rate > listener quota on every listener
@@ -498,7 +497,7 @@ class ConnectionQuotasTest {
     // with a default per-IP limit of 25 and a listener rate of 30, only one 
IP should be able to saturate their IP rate
     // limit, the other IP will hit listener rate limits and block
     connectionQuotas.updateIpConnectionRateQuota(None, 
Some(ipConnectionRateLimit))
-    val listenerConfig = 
Map(SocketServerConfigs.MAX_CONNECTION_CREATION_RATE_CONFIG -> 
listenerRateLimit.toString).asJava
+    val listenerConfig = 
util.Map.of(SocketServerConfigs.MAX_CONNECTION_CREATION_RATE_CONFIG, 
listenerRateLimit.toString)
     addListenersAndVerify(config, listenerConfig, connectionQuotas)
     val listener = listeners("EXTERNAL").listenerName
     // use a small number of connections because a longer-running test will 
have both IPs throttle at different times
@@ -556,7 +555,7 @@ class ConnectionQuotasTest {
     connectionQuotas.addListener(config, listeners("EXTERNAL").listenerName)
 
     val maxListenerConnectionRate = 0
-    val listenerConfig = 
Map(SocketServerConfigs.MAX_CONNECTION_CREATION_RATE_CONFIG -> 
maxListenerConnectionRate.toString).asJava
+    val listenerConfig = 
util.Map.of(SocketServerConfigs.MAX_CONNECTION_CREATION_RATE_CONFIG, 
maxListenerConnectionRate.toString)
     assertThrows(classOf[ConfigException],
       () => 
connectionQuotas.maxConnectionsPerListener(listeners("EXTERNAL").listenerName).validateReconfiguration(listenerConfig)
     )
@@ -569,11 +568,11 @@ class ConnectionQuotasTest {
     connectionQuotas.addListener(config, listeners("EXTERNAL").listenerName)
 
     val listenerRateLimit = 20
-    val listenerConfig = 
Map(SocketServerConfigs.MAX_CONNECTION_CREATION_RATE_CONFIG -> 
listenerRateLimit.toString).asJava
+    val listenerConfig = 
util.Map.of(SocketServerConfigs.MAX_CONNECTION_CREATION_RATE_CONFIG, 
listenerRateLimit.toString)
     
connectionQuotas.maxConnectionsPerListener(listeners("EXTERNAL").listenerName).configure(listenerConfig)
 
     // remove connection rate limit
-    
connectionQuotas.maxConnectionsPerListener(listeners("EXTERNAL").listenerName).reconfigure(Map.empty.asJava)
+    
connectionQuotas.maxConnectionsPerListener(listeners("EXTERNAL").listenerName).reconfigure(util.Map.of)
 
     // create connections as fast as possible, will timeout if connections get 
throttled with previous rate
     // (50s to create 1000 connections)
@@ -586,7 +585,7 @@ class ConnectionQuotasTest {
 
     // configure 100 connection/second rate limit
     val newMaxListenerConnectionRate = 10
-    val newListenerConfig = 
Map(SocketServerConfigs.MAX_CONNECTION_CREATION_RATE_CONFIG -> 
newMaxListenerConnectionRate.toString).asJava
+    val newListenerConfig = 
util.Map.of(SocketServerConfigs.MAX_CONNECTION_CREATION_RATE_CONFIG, 
newMaxListenerConnectionRate.toString)
     
connectionQuotas.maxConnectionsPerListener(listeners("EXTERNAL").listenerName).reconfigure(newListenerConfig)
 
     // verify rate limit
@@ -750,7 +749,7 @@ class ConnectionQuotasTest {
   }
 
   private def addListenersAndVerify(config: KafkaConfig, connectionQuotas: 
ConnectionQuotas) : Unit = {
-    addListenersAndVerify(config, Map.empty.asJava, connectionQuotas)
+    addListenersAndVerify(config, util.Map.of, connectionQuotas)
   }
 
   private def addListenersAndVerify(config: KafkaConfig,
@@ -829,7 +828,7 @@ class ConnectionQuotasTest {
     val metricName = metrics.metricName(
       "connection-accept-throttle-time",
       SocketServer.MetricsGroup,
-      Collections.singletonMap(Processor.ListenerMetricTag, listener))
+      util.Map.of(Processor.ListenerMetricTag, listener))
     metrics.metric(metricName)
   }
 
@@ -837,7 +836,7 @@ class ConnectionQuotasTest {
     val metricName = metrics.metricName(
       "ip-connection-accept-throttle-time",
       SocketServer.MetricsGroup,
-      Collections.singletonMap(Processor.ListenerMetricTag, listener))
+      util.Map.of(Processor.ListenerMetricTag, listener))
     metrics.metric(metricName)
   }
 
@@ -845,7 +844,7 @@ class ConnectionQuotasTest {
     val metricName = metrics.metricName(
       "connection-accept-rate",
       SocketServer.MetricsGroup,
-      Collections.singletonMap(Processor.ListenerMetricTag, listener))
+      util.Map.of(Processor.ListenerMetricTag, listener))
     metrics.metric(metricName)
   }
 
@@ -860,7 +859,7 @@ class ConnectionQuotasTest {
     val metricName = metrics.metricName(
       s"connection-accept-rate",
       SocketServer.MetricsGroup,
-      Collections.singletonMap("ip", ip))
+      util.Map.of("ip", ip))
     metrics.metric(metricName)
   }
 
diff --git a/core/src/test/scala/unit/kafka/network/ProcessorTest.scala 
b/core/src/test/scala/unit/kafka/network/ProcessorTest.scala
index 575f004fe0f..54bbd0bf201 100644
--- a/core/src/test/scala/unit/kafka/network/ProcessorTest.scala
+++ b/core/src/test/scala/unit/kafka/network/ProcessorTest.scala
@@ -31,8 +31,9 @@ import org.junit.jupiter.api.Test
 import org.junit.jupiter.api.function.Executable
 import org.mockito.Mockito.mock
 
+import java.util
 import java.util.function.Supplier
-import java.util.{Collections, Optional}
+import java.util.Optional
 
 class ProcessorTest {
 
@@ -41,7 +42,7 @@ class ProcessorTest {
     val requestHeader = RequestTestUtils.serializeRequestHeader(
       new RequestHeader(ApiKeys.INIT_PRODUCER_ID, 0, "clientid", 0))
     val apiVersionManager = new 
SimpleApiVersionManager(ListenerType.CONTROLLER, true,
-      () => new FinalizedFeatures(MetadataVersion.latestTesting(), 
Collections.emptyMap[String, java.lang.Short], 0))
+      () => new FinalizedFeatures(MetadataVersion.latestTesting(), 
util.Map.of[String, java.lang.Short], 0))
     val e = assertThrows(classOf[InvalidRequestException],
       (() => Processor.parseRequestHeader(apiVersionManager, requestHeader)): 
Executable,
       "INIT_PRODUCER_ID with listener type CONTROLLER should throw 
InvalidRequestException exception")
@@ -95,5 +96,4 @@ class ProcessorTest {
       assertTrue(e.toString.contains("unsupported version"))
     }
   }
-
 }
diff --git a/core/src/test/scala/unit/kafka/network/RequestChannelTest.scala 
b/core/src/test/scala/unit/kafka/network/RequestChannelTest.scala
index ecea412e989..8dbfa808d7f 100644
--- a/core/src/test/scala/unit/kafka/network/RequestChannelTest.scala
+++ b/core/src/test/scala/unit/kafka/network/RequestChannelTest.scala
@@ -17,7 +17,6 @@
 
 package kafka.network
 
-
 import com.fasterxml.jackson.databind.ObjectMapper
 import kafka.network
 import kafka.server.EnvelopeUtils
@@ -47,9 +46,9 @@ import org.mockito.Mockito.mock
 import java.io.IOException
 import java.net.InetAddress
 import java.nio.ByteBuffer
-import java.util.Collections
+import java.util
 import java.util.concurrent.atomic.AtomicReference
-import scala.collection.{Map, Seq}
+import scala.collection.Map
 import scala.jdk.CollectionConverters._
 import scala.jdk.OptionConverters.RichOption
 
@@ -64,9 +63,9 @@ class RequestChannelTest {
   def testAlterRequests(): Unit = {
 
     val sensitiveValue = "secret"
-    def verifyConfig(resource: ConfigResource, entries: Seq[ConfigEntry], 
expectedValues: Map[String, String]): Unit = {
+    def verifyConfig(resource: ConfigResource, entries: 
util.List[ConfigEntry], expectedValues: Map[String, String]): Unit = {
       val alterConfigs = request(new AlterConfigsRequest.Builder(
-          Collections.singletonMap(resource, new 
Config(entries.asJavaCollection)), true).build())
+          util.Map.of(resource, new Config(entries)), true).build())
 
       val loggableAlterConfigs = 
alterConfigs.loggableRequest.asInstanceOf[AlterConfigsRequest]
       val loggedConfig = loggableAlterConfigs.configs.get(resource)
@@ -77,37 +76,37 @@ class RequestChannelTest {
 
     val brokerResource = new ConfigResource(ConfigResource.Type.BROKER, "1")
     val keystorePassword = new 
ConfigEntry(SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG, sensitiveValue)
-    verifyConfig(brokerResource, Seq(keystorePassword), 
Map(SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG -> Password.HIDDEN))
+    verifyConfig(brokerResource, util.List.of(keystorePassword), 
Map(SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG -> Password.HIDDEN))
 
     val keystoreLocation = new 
ConfigEntry(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG, "/path/to/keystore")
-    verifyConfig(brokerResource, Seq(keystoreLocation), 
Map(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG -> "/path/to/keystore"))
-    verifyConfig(brokerResource, Seq(keystoreLocation, keystorePassword),
+    verifyConfig(brokerResource, util.List.of(keystoreLocation), 
Map(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG -> "/path/to/keystore"))
+    verifyConfig(brokerResource, util.List.of(keystoreLocation, 
keystorePassword),
       Map(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG -> "/path/to/keystore", 
SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG -> Password.HIDDEN))
 
     val listenerKeyPassword = new 
ConfigEntry(s"listener.name.internal.${SslConfigs.SSL_KEY_PASSWORD_CONFIG}", 
sensitiveValue)
-    verifyConfig(brokerResource, Seq(listenerKeyPassword), 
Map(listenerKeyPassword.name -> Password.HIDDEN))
+    verifyConfig(brokerResource, util.List.of(listenerKeyPassword), 
Map(listenerKeyPassword.name -> Password.HIDDEN))
 
     val listenerKeystore = new 
ConfigEntry(s"listener.name.internal.${SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG}",
 "/path/to/keystore")
-    verifyConfig(brokerResource, Seq(listenerKeystore), 
Map(listenerKeystore.name -> "/path/to/keystore"))
+    verifyConfig(brokerResource, util.List.of(listenerKeystore), 
Map(listenerKeystore.name -> "/path/to/keystore"))
 
     val plainJaasConfig = new 
ConfigEntry(s"listener.name.internal.plain.${SaslConfigs.SASL_JAAS_CONFIG}", 
sensitiveValue)
-    verifyConfig(brokerResource, Seq(plainJaasConfig), 
Map(plainJaasConfig.name -> Password.HIDDEN))
+    verifyConfig(brokerResource, util.List.of(plainJaasConfig), 
Map(plainJaasConfig.name -> Password.HIDDEN))
 
     val plainLoginCallback = new 
ConfigEntry(s"listener.name.internal.plain.${SaslConfigs.SASL_LOGIN_CALLBACK_HANDLER_CLASS}",
 "test.LoginClass")
-    verifyConfig(brokerResource, Seq(plainLoginCallback), 
Map(plainLoginCallback.name -> plainLoginCallback.value))
+    verifyConfig(brokerResource, util.List.of(plainLoginCallback), 
Map(plainLoginCallback.name -> plainLoginCallback.value))
 
     val customConfig = new ConfigEntry("custom.config", sensitiveValue)
-    verifyConfig(brokerResource, Seq(customConfig), Map(customConfig.name -> 
Password.HIDDEN))
+    verifyConfig(brokerResource, util.List.of(customConfig), 
Map(customConfig.name -> Password.HIDDEN))
 
     val topicResource = new ConfigResource(ConfigResource.Type.TOPIC, 
"testTopic")
     val compressionType = new ConfigEntry(TopicConfig.COMPRESSION_TYPE_CONFIG, 
"lz4")
-    verifyConfig(topicResource, Seq(compressionType), 
Map(TopicConfig.COMPRESSION_TYPE_CONFIG -> "lz4"))
-    verifyConfig(topicResource, Seq(customConfig), Map(customConfig.name -> 
Password.HIDDEN))
+    verifyConfig(topicResource, util.List.of(compressionType), 
Map(TopicConfig.COMPRESSION_TYPE_CONFIG -> "lz4"))
+    verifyConfig(topicResource, util.List.of(customConfig), 
Map(customConfig.name -> Password.HIDDEN))
 
     // Verify empty request
     val alterConfigs = request(new AlterConfigsRequest.Builder(
-        Collections.emptyMap[ConfigResource, Config], true).build())
-    assertEquals(Collections.emptyMap, 
alterConfigs.loggableRequest.asInstanceOf[AlterConfigsRequest].configs)
+        util.Map.of[ConfigResource, Config], true).build())
+    assertEquals(util.Map.of, 
alterConfigs.loggableRequest.asInstanceOf[AlterConfigsRequest].configs)
   }
 
   @Test
@@ -189,7 +188,7 @@ class RequestChannelTest {
 
   @Test
   def testNonAlterRequestsNotTransformed(): Unit = {
-    val metadataRequest = request(new 
MetadataRequest.Builder(List("topic").asJava, true).build())
+    val metadataRequest = request(new 
MetadataRequest.Builder(util.List.of("topic"), true).build())
     assertSame(metadataRequest.body[MetadataRequest], 
metadataRequest.loggableRequest)
   }
 
@@ -198,10 +197,10 @@ class RequestChannelTest {
     val sensitiveValue = "secret"
     val resource = new ConfigResource(ConfigResource.Type.BROKER, "1")
     val keystorePassword = new 
ConfigEntry(SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG, sensitiveValue)
-    val entries = Seq(keystorePassword)
+    val entries = util.List.of(keystorePassword)
 
-    val alterConfigs = request(new 
AlterConfigsRequest.Builder(Collections.singletonMap(resource,
-      new Config(entries.asJavaCollection)), true).build())
+    val alterConfigs = request(new 
AlterConfigsRequest.Builder(util.Map.of(resource,
+      new Config(entries)), true).build())
 
     
assertTrue(isValidJson(RequestConvertToJson.request(alterConfigs.loggableRequest).toString))
   }
diff --git a/core/src/test/scala/unit/kafka/network/SocketServerTest.scala 
b/core/src/test/scala/unit/kafka/network/SocketServerTest.scala
index ac1a98e7fc9..6a4b8d8ca67 100644
--- a/core/src/test/scala/unit/kafka/network/SocketServerTest.scala
+++ b/core/src/test/scala/unit/kafka/network/SocketServerTest.scala
@@ -59,7 +59,7 @@ import java.security.cert.X509Certificate
 import java.util
 import java.util.concurrent.atomic.AtomicInteger
 import java.util.concurrent._
-import java.util.{Collections, Properties, Random}
+import java.util.{Properties, Random}
 import javax.net.ssl._
 import scala.collection.mutable
 import scala.collection.mutable.ArrayBuffer
@@ -85,7 +85,7 @@ class SocketServerTest {
   TestUtils.clearYammerMetrics()
 
   private val apiVersionManager = new 
SimpleApiVersionManager(ListenerType.BROKER, true,
-    () => new FinalizedFeatures(MetadataVersion.latestTesting(), 
Collections.emptyMap[String, java.lang.Short], 0))
+    () => new FinalizedFeatures(MetadataVersion.latestTesting(), 
util.Map.of[String, java.lang.Short], 0))
   var server: SocketServer = _
   val sockets = new ArrayBuffer[Socket]
 
@@ -1722,7 +1722,7 @@ class SocketServerTest {
     val testableServer = new 
TestableSocketServer(KafkaConfig.fromProps(props), connectionQueueSize = 1)
     testableServer.enableRequestProcessing(Map.empty).get(1, TimeUnit.MINUTES)
     val testableSelector = testableServer.testableSelector
-    val errors = new mutable.HashSet[String]
+    val errors = new util.HashSet[String]()
 
     def acceptorStackTraces: scala.collection.Map[Thread, String] = {
       Thread.getAllStackTraces.asScala.collect {
@@ -1746,7 +1746,7 @@ class SocketServerTest {
       // Block selector until Acceptor is blocked while connections are pending
       testableSelector.pollCallback = () => {
         try {
-          TestUtils.waitUntilTrue(() => errors.nonEmpty || 
registeredConnectionCount >= numConnections - 1 || acceptorBlocked,
+          TestUtils.waitUntilTrue(() => !errors.isEmpty || 
registeredConnectionCount >= numConnections - 1 || acceptorBlocked,
             "Acceptor not blocked", waitTimeMs = 10000)
         } catch {
           case _: Throwable => errors.add(s"Acceptor not blocked: 
$acceptorStackTraces")
@@ -1754,9 +1754,9 @@ class SocketServerTest {
       }
       testableSelector.operationCounts.clear()
       val sockets = (1 to numConnections).map(_ => connect(testableServer))
-      TestUtils.waitUntilTrue(() => errors.nonEmpty || 
registeredConnectionCount == numConnections,
+      TestUtils.waitUntilTrue(() => !errors.isEmpty || 
registeredConnectionCount == numConnections,
         "Connections not registered", waitTimeMs = 15000)
-      assertEquals(Set.empty, errors)
+      assertEquals(util.Set.of, errors)
       testableSelector.waitForOperations(SelectorOperation.Register, 
numConnections)
 
       // In each iteration, SocketServer processes at most connectionQueueSize 
(1 in this test)
@@ -2061,7 +2061,7 @@ class SocketServerTest {
     private var conn: Option[Socket] = None
 
     override protected[network] def createSelector(channelBuilder: 
ChannelBuilder): Selector = {
-      new TestableSelector(config, channelBuilder, time, metrics, 
metricTags.asScala)
+      new TestableSelector(config, channelBuilder, time, metrics, metricTags)
     }
 
     override private[network] def processException(errorMessage: String, 
throwable: Throwable): Unit = {
@@ -2159,9 +2159,9 @@ class SocketServerTest {
     case object CloseSelector extends SelectorOperation
   }
 
-  class TestableSelector(config: KafkaConfig, channelBuilder: ChannelBuilder, 
time: Time, metrics: Metrics, metricTags: mutable.Map[String, String] = 
mutable.Map.empty)
+  class TestableSelector(config: KafkaConfig, channelBuilder: ChannelBuilder, 
time: Time, metrics: Metrics, metricTags: util.Map[String, String] = new 
util.HashMap())
     extends Selector(config.socketRequestMaxBytes, 
config.connectionsMaxIdleMs, config.failedAuthenticationDelayMs,
-      metrics, time, "socket-server", metricTags.asJava, false, true, 
channelBuilder, MemoryPool.NONE, new LogContext()) {
+      metrics, time, "socket-server", metricTags, false, true, channelBuilder, 
MemoryPool.NONE, new LogContext()) {
 
     val failures = mutable.Map[SelectorOperation, Throwable]()
     val operationCounts = mutable.Map[SelectorOperation, 
Int]().withDefaultValue(0)
diff --git 
a/core/src/test/scala/unit/kafka/security/authorizer/AuthorizerTest.scala 
b/core/src/test/scala/unit/kafka/security/authorizer/AuthorizerTest.scala
index 6a9d8dcd1d4..0839990868f 100644
--- a/core/src/test/scala/unit/kafka/security/authorizer/AuthorizerTest.scala
+++ b/core/src/test/scala/unit/kafka/security/authorizer/AuthorizerTest.scala
@@ -44,7 +44,7 @@ import org.junit.jupiter.api.Test
 
 import java.net.InetAddress
 import java.util
-import java.util.{Collections, Properties, UUID}
+import java.util.{Properties, UUID}
 import scala.jdk.CollectionConverters._
 
 class AuthorizerTest extends QuorumTestHarness with BaseAuthorizerTest {
@@ -567,7 +567,7 @@ class AuthorizerTest extends QuorumTestHarness with 
BaseAuthorizerTest {
     val acl3 = new AclBinding(resource2, new 
AccessControlEntry(principal.toString, WILDCARD_HOST, DESCRIBE, ALLOW))
     val acl4 = new AclBinding(prefixedResource, new 
AccessControlEntry(wildcardPrincipal.toString, WILDCARD_HOST, READ, ALLOW))
 
-    authorizer1.createAcls(requestContext, List(acl1, acl2, acl3, acl4).asJava)
+    authorizer1.createAcls(requestContext, util.List.of(acl1, acl2, acl3, 
acl4))
     assertEquals(Set(acl1, acl2, acl3, acl4), 
authorizer1.acls(AclBindingFilter.ANY).asScala.toSet)
     assertEquals(Set(acl1, acl2), authorizer1.acls(new 
AclBindingFilter(resource1.toFilter, 
AccessControlEntryFilter.ANY)).asScala.toSet)
     assertEquals(Set(acl4), authorizer1.acls(new 
AclBindingFilter(prefixedResource.toFilter, 
AccessControlEntryFilter.ANY)).asScala.toSet)
@@ -632,7 +632,7 @@ class AuthorizerTest extends QuorumTestHarness with 
BaseAuthorizerTest {
 
   private def authorize(authorizer: Authorizer, requestContext: 
RequestContext, operation: AclOperation, resource: ResourcePattern): Boolean = {
     val action = new Action(operation, resource, 1, true, true)
-    authorizer.authorize(requestContext, List(action).asJava).asScala.head == 
AuthorizationResult.ALLOWED
+    authorizer.authorize(requestContext, util.List.of(action)).asScala.head == 
AuthorizationResult.ALLOWED
   }
 
   private def getAcls(authorizer: Authorizer, resourcePattern: 
ResourcePattern): Set[AccessControlEntry] = {
@@ -669,7 +669,7 @@ class AuthorizerTest extends QuorumTestHarness with 
BaseAuthorizerTest {
                                   pluginMetrics: PluginMetrics): Unit = {
     standardAuthorizer.configure(configs)
     standardAuthorizer.withPluginMetrics(pluginMetrics)
-    initializeStandardAuthorizer(standardAuthorizer, new 
AuthorizerTestServerInfo(Collections.singletonList(PLAINTEXT)))
+    initializeStandardAuthorizer(standardAuthorizer, new 
AuthorizerTestServerInfo(util.List.of(PLAINTEXT)))
   }
 
   def initializeStandardAuthorizer(standardAuthorizer: StandardAuthorizer,
diff --git a/core/src/test/scala/unit/kafka/tools/DumpLogSegmentsTest.scala 
b/core/src/test/scala/unit/kafka/tools/DumpLogSegmentsTest.scala
index 67df4f1a7e6..336a8dd55c3 100644
--- a/core/src/test/scala/unit/kafka/tools/DumpLogSegmentsTest.scala
+++ b/core/src/test/scala/unit/kafka/tools/DumpLogSegmentsTest.scala
@@ -485,7 +485,7 @@ class DumpLogSegmentsTest {
         new TopicRecord().setName("test-topic").setTopicId(Uuid.randomUuid()), 
0.toShort),
       new ApiMessageAndVersion(
         new PartitionChangeRecord().setTopicId(Uuid.randomUuid()).setLeader(1).
-          setPartitionId(0).setIsr(util.Arrays.asList(0, 1, 2)), 0.toShort)
+          setPartitionId(0).setIsr(util.List.of(0, 1, 2)), 0.toShort)
     )
 
     val records: Array[SimpleRecord] = metadataRecords.map(message => {
@@ -565,7 +565,7 @@ class DumpLogSegmentsTest {
 
   @Test
   def testDumpMetadataSnapshot(): Unit = {
-    val metadataRecords = Seq(
+    val metadataRecords = util.List.of(
       new ApiMessageAndVersion(
         new RegisterBrokerRecord().setBrokerId(0).setBrokerEpoch(10), 
0.toShort),
       new ApiMessageAndVersion(
@@ -574,7 +574,7 @@ class DumpLogSegmentsTest {
         new TopicRecord().setName("test-topic").setTopicId(Uuid.randomUuid()), 
0.toShort),
       new ApiMessageAndVersion(
         new PartitionChangeRecord().setTopicId(Uuid.randomUuid()).setLeader(1).
-          setPartitionId(0).setIsr(util.Arrays.asList(0, 1, 2)), 0.toShort)
+          setPartitionId(0).setIsr(util.List.of(0, 1, 2)), 0.toShort)
     )
 
     val metadataLog = KafkaMetadataLog(
@@ -603,7 +603,7 @@ class DumpLogSegmentsTest {
         
.setVoterSet(Optional.of(VoterSetTest.voterSet(VoterSetTest.voterMap(IntStream.of(1,
 2, 3), true))))
         .build(MetadataRecordSerde.INSTANCE)
     ) { snapshotWriter =>
-      snapshotWriter.append(metadataRecords.asJava)
+      snapshotWriter.append(metadataRecords)
       snapshotWriter.freeze()
     }
 
@@ -662,14 +662,14 @@ class DumpLogSegmentsTest {
 
     // Get all the batches
     val output = runDumpLogSegments(Array("--files", logFilePath))
-    val lines = util.Arrays.asList(output.split("\n"): _*).listIterator()
+    val lines = util.List.of(output.split("\n"): _*).listIterator()
 
     // Get total bytes of the partial batches
     val partialBatchesBytes = readPartialBatchesBytes(lines, partialBatches)
 
     // Request only the partial batches by bytes
     val partialOutput = runDumpLogSegments(Array("--max-bytes", 
partialBatchesBytes.toString, "--files", logFilePath))
-    val partialLines = util.Arrays.asList(partialOutput.split("\n"): 
_*).listIterator()
+    val partialLines = util.List.of(partialOutput.split("\n"): 
_*).listIterator()
 
     // Count the total of partial batches limited by bytes
     val partialBatchesCount = countBatches(partialLines)
@@ -922,14 +922,14 @@ class DumpLogSegmentsTest {
             .setProducerEpoch(2.toShort)
             .setProducerId(12L)
             .setTransactionLastUpdateTimestampMs(123L)
-            .setTransactionPartitions(List(
+            .setTransactionPartitions(util.List.of(
               new TransactionLogValue.PartitionsSchema()
                 .setTopic("topic1")
-                .setPartitionIds(List(0, 1, 2).map(Integer.valueOf).asJava),
+                .setPartitionIds(util.List.of[Integer](0, 1, 2)),
               new TransactionLogValue.PartitionsSchema()
                 .setTopic("topic2")
-                .setPartitionIds(List(3, 4, 5).map(Integer.valueOf).asJava)
-            ).asJava)
+                .setPartitionIds(util.List.of[Integer](3, 4, 5))
+            ))
             .setTransactionStartTimestampMs(13L)
             .setTransactionStatus(0)
             .setTransactionTimeoutMs(14),
@@ -1024,7 +1024,7 @@ class DumpLogSegmentsTest {
     )
 
     val output = runDumpLogSegments(Array("--deep-iteration", "--files", 
logFilePath))
-    val lines = util.Arrays.asList(output.split("\n"): _*).listIterator()
+    val lines = util.List.of(output.split("\n"): _*).listIterator()
 
     for (batch <- logReadInfo.records.batches.asScala) {
       val parsedBatchOpt = readBatchMetadata(lines)
@@ -1101,13 +1101,13 @@ class DumpLogSegmentsTest {
           .setStartOffset(0)
           .setCreateTimestamp(timestamp)
           .setWriteTimestamp(timestamp)
-          .setStateBatches(List[ShareSnapshotValue.StateBatch](
+          .setStateBatches(util.List.of[ShareSnapshotValue.StateBatch](
             new ShareSnapshotValue.StateBatch()
               .setFirstOffset(0)
               .setLastOffset(4)
               .setDeliveryState(2)
               .setDeliveryCount(1)
-          ).asJava),
+          )),
           0.toShort)
       ))
     )
@@ -1127,13 +1127,13 @@ class DumpLogSegmentsTest {
           .setSnapshotEpoch(0)
           .setLeaderEpoch(0)
           .setStartOffset(0)
-          .setStateBatches(List[ShareUpdateValue.StateBatch](
+          .setStateBatches(util.List.of[ShareUpdateValue.StateBatch](
             new ShareUpdateValue.StateBatch()
               .setFirstOffset(0)
               .setLastOffset(4)
               .setDeliveryState(2)
               .setDeliveryCount(1)
-          ).asJava),
+          )),
           0.toShort)
       ))
     )
diff --git a/core/src/test/scala/unit/kafka/tools/StorageToolTest.scala 
b/core/src/test/scala/unit/kafka/tools/StorageToolTest.scala
index 42e262a6858..28b132243e7 100644
--- a/core/src/test/scala/unit/kafka/tools/StorageToolTest.scala
+++ b/core/src/test/scala/unit/kafka/tools/StorageToolTest.scala
@@ -79,7 +79,7 @@ class StorageToolTest {
     val tempDir = TestUtils.tempDir()
     try {
       assertEquals(1, StorageTool.
-        infoCommand(new PrintStream(stream), true, Seq(tempDir.toString)))
+        infoCommand(new PrintStream(stream), kraftMode = true, 
Seq(tempDir.toString)))
       assertEquals(s"""Found log directory:
   ${tempDir.toString}
 
@@ -97,7 +97,7 @@ Found problem:
     tempDir.delete()
     try {
       assertEquals(1, StorageTool.
-        infoCommand(new PrintStream(stream), true, Seq(tempDir.toString)))
+        infoCommand(new PrintStream(stream), kraftMode = true, 
Seq(tempDir.toString)))
       assertEquals(s"""Found problem:
   ${tempDir.toString} does not exist
 
@@ -111,7 +111,7 @@ Found problem:
     val tempFile = TestUtils.tempFile()
     try {
       assertEquals(1, StorageTool.
-        infoCommand(new PrintStream(stream), true, Seq(tempFile.toString)))
+        infoCommand(new PrintStream(stream), kraftMode = true, 
Seq(tempFile.toString)))
       assertEquals(s"""Found problem:
   ${tempFile.toString} is not a directory
 
@@ -125,13 +125,13 @@ Found problem:
     val tempDir = TestUtils.tempDir()
     try {
       
Files.write(tempDir.toPath.resolve(MetaPropertiesEnsemble.META_PROPERTIES_NAME),
-        String.join("\n", util.Arrays.asList(
+        String.join("\n", util.List.of(
           "version=1",
           "node.id=1",
           "cluster.id=XcZZOzUqS4yHOjhMQB6JLQ")).
             getBytes(StandardCharsets.UTF_8))
       assertEquals(1, StorageTool.
-        infoCommand(new PrintStream(stream), false, Seq(tempDir.toString)))
+        infoCommand(new PrintStream(stream), kraftMode = false, 
Seq(tempDir.toString)))
       assertEquals(s"""Found log directory:
   ${tempDir.toString}
 
@@ -150,13 +150,13 @@ Found problem:
     val tempDir = TestUtils.tempDir()
     try {
       
Files.write(tempDir.toPath.resolve(MetaPropertiesEnsemble.META_PROPERTIES_NAME),
-        String.join("\n", util.Arrays.asList(
+        String.join("\n", util.List.of(
           "version=0",
           "broker.id=1",
           "cluster.id=26c36907-4158-4a35-919d-6534229f5241")).
           getBytes(StandardCharsets.UTF_8))
       assertEquals(1, StorageTool.
-        infoCommand(new PrintStream(stream), true, Seq(tempDir.toString)))
+        infoCommand(new PrintStream(stream), kraftMode = true, 
Seq(tempDir.toString)))
       assertEquals(s"""Found log directory:
   ${tempDir.toString}
 
@@ -193,7 +193,7 @@ Found problem:
    ): Int = {
     val tempDir = TestUtils.tempDir()
     try {
-      val configPathString = new File(tempDir.getAbsolutePath(), 
"format.props").toString
+      val configPathString = new File(tempDir.getAbsolutePath, 
"format.props").toString
       PropertiesUtils.writePropertiesFile(properties, configPathString, true)
       val arguments = ListBuffer[String]("format",
         "--cluster-id", "XcZZOzUqS4yHOjhMQB6JLQ")
@@ -234,7 +234,7 @@ Found problem:
     val unavailableDir1 = TestUtils.tempFile()
     val properties = new Properties()
     properties.putAll(defaultStaticQuorumProperties)
-    properties.setProperty("log.dirs", s"${availableDir1},${unavailableDir1}")
+    properties.setProperty("log.dirs", s"$availableDir1,$unavailableDir1")
     val stream = new ByteArrayOutputStream()
     assertEquals(0, runFormatCommand(stream, properties))
 
@@ -273,7 +273,7 @@ Found problem:
     assertEquals(0, runFormatCommand(stream, properties))
     properties.setProperty("log.dirs", availableDirs.mkString(","))
     val stream2 = new ByteArrayOutputStream()
-    assertEquals(0, runFormatCommand(stream2, properties, Seq(), true))
+    assertEquals(0, runFormatCommand(stream2, properties, Seq(), 
ignoreFormatted = true))
   }
 
   @Test
@@ -282,7 +282,7 @@ Found problem:
     val unavailableDir2 = TestUtils.tempFile()
     val properties = new Properties()
     properties.putAll(defaultStaticQuorumProperties)
-    properties.setProperty("log.dirs", 
s"${unavailableDir1},${unavailableDir2}")
+    properties.setProperty("log.dirs", s"$unavailableDir1,$unavailableDir2")
     val stream = new ByteArrayOutputStream()
     assertEquals("No available log directories to format.", 
assertThrows(classOf[FormatterException],
       () => runFormatCommand(stream, properties)).getMessage)
diff --git a/core/src/test/scala/unit/kafka/utils/TestUtils.scala 
b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
index 23eeb7e9926..dd69155097b 100755
--- a/core/src/test/scala/unit/kafka/utils/TestUtils.scala
+++ b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
@@ -72,7 +72,7 @@ import java.time.Duration
 import java.util
 import java.util.concurrent._
 import java.util.concurrent.atomic.AtomicBoolean
-import java.util.{Collections, Optional, Properties}
+import java.util.{Optional, Properties}
 import scala.collection.mutable.ArrayBuffer
 import scala.collection.{Map, Seq, mutable}
 import scala.concurrent.duration.FiniteDuration
@@ -339,7 +339,7 @@ object TestUtils extends Logging {
     topicConfig.forEach((k, v) => configsMap.put(k.toString, v.toString))
 
     val result = if (replicaAssignment.isEmpty) {
-      admin.createTopics(Collections.singletonList(new NewTopic(
+      admin.createTopics(util.List.of(new NewTopic(
         topic, numPartitions, replicationFactor.toShort).configs(configsMap)))
     } else {
       val assignment = new util.HashMap[Integer, util.List[Integer]]()
@@ -348,7 +348,7 @@ object TestUtils extends Logging {
         v.foreach(r => replicas.add(r.asInstanceOf[Integer]))
         assignment.put(k.asInstanceOf[Integer], replicas)
       }
-      admin.createTopics(Collections.singletonList(new NewTopic(
+      admin.createTopics(util.List.of(new NewTopic(
         topic, assignment).configs(configsMap)))
     }
 
@@ -410,7 +410,7 @@ object TestUtils extends Logging {
     topic: String
   ): TopicDescription = {
     val describedTopics = admin.describeTopics(
-      Collections.singleton(topic)
+      util.Set.of(topic)
     ).allTopicNames().get()
     describedTopics.get(topic)
   }
@@ -466,7 +466,7 @@ object TestUtils extends Logging {
     controllers: Seq[ControllerServer]
   ): Unit = {
     try {
-      admin.deleteTopics(Collections.singletonList(topic)).all().get()
+      admin.deleteTopics(util.List.of(topic)).all().get()
     } catch {
       case e: ExecutionException if e.getCause != null &&
         e.getCause.isInstanceOf[UnknownTopicOrPartitionException] =>
@@ -589,7 +589,7 @@ object TestUtils extends Logging {
     newLeaderOpt: Option[Int] = None
   ): Int = {
     def getPartitionLeader(topic: String, partition: Int): Option[Int] = {
-      
admin.describeTopics(Collections.singletonList(topic)).allTopicNames().get().get(topic).partitions().asScala.
+      
admin.describeTopics(util.List.of(topic)).allTopicNames().get().get(topic).partitions().asScala.
         find(_.partition() == partition).
         flatMap { p =>
           if (p.leader().id() == Node.noNode().id()) {
@@ -821,7 +821,7 @@ object TestUtils extends Logging {
     waitUntilTrue(
       () => brokers.forall { broker =>
         if (expectedNumPartitions == 0) {
-          broker.metadataCache.numPartitions(topic).isEmpty()
+          broker.metadataCache.numPartitions(topic).isEmpty
         } else {
           broker.metadataCache.numPartitions(topic).orElse(null) == 
expectedNumPartitions
         }
@@ -1098,7 +1098,7 @@ object TestUtils extends Logging {
     waitUntilTrue(() => brokers.forall(broker =>
       broker.config.logDirs.stream().allMatch { logDir =>
         topicPartitions.forall { tp =>
-          !util.Arrays.asList(new File(logDir).list()).asScala.exists { 
partitionDirectoryNames =>
+          !util.List.of(new File(logDir).list()).asScala.exists { 
partitionDirectoryNames =>
             partitionDirectoryNames.exists { directoryName =>
               directoryName.startsWith(tp.topic + "-" + tp.partition) &&
                 directoryName.endsWith(UnifiedLog.DELETE_DIR_SUFFIX)
@@ -1155,7 +1155,7 @@ object TestUtils extends Logging {
       securityProtocol = securityProtocol,
       trustStoreFile = trustStoreFile)
     try {
-      consumer.subscribe(Collections.singleton(topic))
+      consumer.subscribe(util.Set.of(topic))
       consumeRecords(consumer, numMessages, waitTime)
     } finally consumer.close()
   }
@@ -1253,7 +1253,7 @@ object TestUtils extends Logging {
       else
         abortedValue
     }
-    new ProducerRecord[Array[Byte], Array[Byte]](topic, partition, key, value, 
Collections.singleton(header))
+    new ProducerRecord[Array[Byte], Array[Byte]](topic, partition, key, value, 
util.Set.of(header))
   }
 
   def producerRecordWithExpectedTransactionStatus(topic: String, partition: 
Integer, key: String, value: String, willBeCommitted: Boolean): 
ProducerRecord[Array[Byte], Array[Byte]] = {
@@ -1276,7 +1276,7 @@ object TestUtils extends Logging {
       if (committed.contains(topicPartition))
         consumer.seek(topicPartition, committed(topicPartition))
       else
-        consumer.seekToBeginning(Collections.singletonList(topicPartition))
+        consumer.seekToBeginning(util.List.of(topicPartition))
     }
   }
 
@@ -1293,7 +1293,7 @@ object TestUtils extends Logging {
         (resource, configEntries)
       }.toMap.asJava
     } else {
-      Map(new ConfigResource(ConfigResource.Type.BROKER, "") -> 
configEntries).asJava
+      util.Map.of(new ConfigResource(ConfigResource.Type.BROKER, ""), 
configEntries)
     }
     adminClient.incrementalAlterConfigs(configs)
   }
@@ -1322,7 +1322,7 @@ object TestUtils extends Logging {
     val partitionId = topicPartition.partition
 
     def currentLeader: Try[Option[Int]] = Try {
-      val topicDescription = 
client.describeTopics(List(topic).asJava).allTopicNames.get.get(topic)
+      val topicDescription = 
client.describeTopics(util.List.of(topic)).allTopicNames.get.get(topic)
       topicDescription.partitions.asScala
         .find(_.partition == partitionId)
         .flatMap(partitionState => Option(partitionState.leader))
@@ -1356,7 +1356,7 @@ object TestUtils extends Logging {
   }
 
   def currentIsr(admin: Admin, partition: TopicPartition): Set[Int] = {
-    val description = admin.describeTopics(Set(partition.topic).asJava)
+    val description = admin.describeTopics(util.Set.of(partition.topic))
       .allTopicNames
       .get
       .asScala

Reply via email to