This is an automated email from the ASF dual-hosted git repository.
frankvicky pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new a090dc3ba5e MINOR: Cleanup Core Module- Scala Modules (4/n) (#19805)
a090dc3ba5e is described below
commit a090dc3ba5e711133658d7ee794da2a6902b85f0
Author: Sanskar Jhajharia <[email protected]>
AuthorDate: Fri Jun 6 12:19:16 2025 +0530
MINOR: Cleanup Core Module- Scala Modules (4/n) (#19805)
Now that Kafka Brokers support Java 17, this PR makes some changes in
core module. The changes in this PR are limited to only some Scala files
in the Core module's tests. The changes mostly include:
- Collections.emptyList(), Collections.singletonList() and
Arrays.asList() are replaced with List.of()
- Collections.emptyMap() and Collections.singletonMap() are replaced
with Map.of()
- Collections.singleton() is replaced with Set.of()
To be clear, the directories being targeted in this PR from unit.kafka
module:
- log
- network
- security
- tools
- utils
Reviewers: TengYao Chi <[email protected]>
---
.../test/scala/unit/kafka/KafkaConfigTest.scala | 4 +-
.../unit/kafka/log/LogCleanerManagerTest.scala | 45 +++++++++++++--------
.../test/scala/unit/kafka/log/LogCleanerTest.scala | 12 +++---
.../test/scala/unit/kafka/log/LogConfigTest.scala | 47 +++++++++++-----------
.../test/scala/unit/kafka/log/LogLoaderTest.scala | 16 ++++----
.../test/scala/unit/kafka/log/LogManagerTest.scala | 2 +-
.../test/scala/unit/kafka/log/UnifiedLogTest.scala | 16 ++++----
.../unit/kafka/network/ConnectionQuotasTest.scala | 33 ++++++++-------
.../scala/unit/kafka/network/ProcessorTest.scala | 6 +--
.../unit/kafka/network/RequestChannelTest.scala | 41 +++++++++----------
.../unit/kafka/network/SocketServerTest.scala | 18 ++++-----
.../kafka/security/authorizer/AuthorizerTest.scala | 8 ++--
.../unit/kafka/tools/DumpLogSegmentsTest.scala | 30 +++++++-------
.../scala/unit/kafka/tools/StorageToolTest.scala | 22 +++++-----
.../test/scala/unit/kafka/utils/TestUtils.scala | 28 ++++++-------
15 files changed, 169 insertions(+), 159 deletions(-)
diff --git a/core/src/test/scala/unit/kafka/KafkaConfigTest.scala
b/core/src/test/scala/unit/kafka/KafkaConfigTest.scala
index 8834f6f3608..81f2c8e5e08 100644
--- a/core/src/test/scala/unit/kafka/KafkaConfigTest.scala
+++ b/core/src/test/scala/unit/kafka/KafkaConfigTest.scala
@@ -116,12 +116,12 @@ class KafkaConfigTest {
// We should be also able to set completely new property
val config3 =
KafkaConfig.fromProps(Kafka.getPropsFromArgs(Array(propertiesFile,
"--override", "log.cleanup.policy=compact")))
assertEquals(1, config3.nodeId)
- assertEquals(util.Arrays.asList("compact"), config3.logCleanupPolicy)
+ assertEquals(util.List.of("compact"), config3.logCleanupPolicy)
// We should be also able to set several properties
val config4 =
KafkaConfig.fromProps(Kafka.getPropsFromArgs(Array(propertiesFile,
"--override", "log.cleanup.policy=compact,delete", "--override", "node.id=2")))
assertEquals(2, config4.nodeId)
- assertEquals(util.Arrays.asList("compact","delete"),
config4.logCleanupPolicy)
+ assertEquals(util.List.of("compact","delete"), config4.logCleanupPolicy)
}
@Test
diff --git a/core/src/test/scala/unit/kafka/log/LogCleanerManagerTest.scala
b/core/src/test/scala/unit/kafka/log/LogCleanerManagerTest.scala
index 8445baa7719..dc9a1d0928a 100644
--- a/core/src/test/scala/unit/kafka/log/LogCleanerManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogCleanerManagerTest.scala
@@ -37,8 +37,7 @@ import org.junit.jupiter.api.{AfterEach, Test}
import java.lang.{Long => JLong}
import java.util
import java.util.concurrent.ConcurrentHashMap
-import scala.collection.mutable
-import scala.jdk.CollectionConverters._
+import java.util.stream.Collectors
import scala.jdk.OptionConverters.RichOptional
/**
@@ -61,13 +60,13 @@ class LogCleanerManagerTest extends Logging {
val offset = 999
val producerStateManagerConfig = new
ProducerStateManagerConfig(TransactionLogConfig.PRODUCER_ID_EXPIRATION_MS_DEFAULT,
false)
- val cleanerCheckpoints: mutable.Map[TopicPartition, JLong] =
mutable.Map[TopicPartition, JLong]()
+ val cleanerCheckpoints: util.HashMap[TopicPartition, JLong] = new
util.HashMap[TopicPartition, JLong]()
class LogCleanerManagerMock(logDirs: util.List[File],
logs:
util.concurrent.ConcurrentMap[TopicPartition, UnifiedLog],
logDirFailureChannel: LogDirFailureChannel)
extends LogCleanerManager(logDirs, logs, logDirFailureChannel) {
override def allCleanerCheckpoints: util.Map[TopicPartition, JLong] = {
- cleanerCheckpoints.toMap.asJava
+ cleanerCheckpoints
}
override def updateCheckpoints(dataDir: File, partitionToUpdateOrAdd:
Optional[util.Map.Entry[TopicPartition, JLong]],
@@ -382,7 +381,11 @@ class LogCleanerManagerTest extends Logging {
assertEquals(0, cleanable.size, "should have 0 logs ready to be compacted")
// log cleanup finished, and log can be picked up for compaction
-
cleanerManager.resumeCleaning(deletableLog.asScala.map(_.getKey).toSet.asJava)
+ cleanerManager.resumeCleaning(
+ deletableLog.stream()
+ .map[TopicPartition](entry => entry.getKey)
+ .collect(Collectors.toSet[TopicPartition]())
+ )
val cleanable2 = cleanerManager.grabFilthiestCompactedLog(time, new
PreCleanStats()).toScala
assertEquals(1, cleanable2.size, "should have 1 logs ready to be
compacted")
@@ -396,7 +399,7 @@ class LogCleanerManagerTest extends Logging {
assertEquals(0, deletableLog2.size, "should have 0 logs ready to be
deleted")
// compaction done, should have 1 log eligible for log cleanup
- cleanerManager.doneDeleting(Seq(cleanable2.get.topicPartition).asJava)
+ cleanerManager.doneDeleting(util.List.of(cleanable2.get.topicPartition))
val deletableLog3 = cleanerManager.pauseCleaningForNonCompactedPartitions()
assertEquals(1, deletableLog3.size, "should have 1 logs ready to be
deleted")
}
@@ -501,9 +504,13 @@ class LogCleanerManagerTest extends Logging {
val pausedPartitions =
cleanerManager.pauseCleaningForNonCompactedPartitions()
// Log truncation happens due to unclean leader election
cleanerManager.abortAndPauseCleaning(log.topicPartition)
- cleanerManager.resumeCleaning(Set(log.topicPartition).asJava)
+ cleanerManager.resumeCleaning(util.Set.of(log.topicPartition))
// log cleanup finishes and pausedPartitions are resumed
-
cleanerManager.resumeCleaning(pausedPartitions.asScala.map(_.getKey).toSet.asJava)
+ cleanerManager.resumeCleaning(
+ pausedPartitions.stream()
+ .map[TopicPartition](entry => entry.getKey)
+ .collect(Collectors.toSet[TopicPartition]())
+ )
assertEquals(Optional.empty(),
cleanerManager.cleaningState(log.topicPartition))
}
@@ -522,7 +529,11 @@ class LogCleanerManagerTest extends Logging {
// Broker processes StopReplicaRequest with delete=true
cleanerManager.abortCleaning(log.topicPartition)
// log cleanup finishes and pausedPartitions are resumed
-
cleanerManager.resumeCleaning(pausedPartitions.asScala.map(_.getKey).toSet.asJava)
+ cleanerManager.resumeCleaning(
+ pausedPartitions.stream()
+ .map[TopicPartition](entry => entry.getKey)
+ .collect(Collectors.toSet[TopicPartition]())
+ )
assertEquals(Optional.empty(),
cleanerManager.cleaningState(log.topicPartition))
}
@@ -743,17 +754,17 @@ class LogCleanerManagerTest extends Logging {
val cleanerManager: LogCleanerManager = createCleanerManager(log)
val tp = new TopicPartition("log", 0)
- assertThrows(classOf[IllegalStateException], () =>
cleanerManager.doneDeleting(Seq(tp).asJava))
+ assertThrows(classOf[IllegalStateException], () =>
cleanerManager.doneDeleting(util.List.of(tp)))
cleanerManager.setCleaningState(tp, LogCleaningState.logCleaningPaused(1))
- assertThrows(classOf[IllegalStateException], () =>
cleanerManager.doneDeleting(Seq(tp).asJava))
+ assertThrows(classOf[IllegalStateException], () =>
cleanerManager.doneDeleting(util.List.of(tp)))
cleanerManager.setCleaningState(tp, LOG_CLEANING_IN_PROGRESS)
- cleanerManager.doneDeleting(Seq(tp).asJava)
+ cleanerManager.doneDeleting(util.List.of(tp))
assertTrue(cleanerManager.cleaningState(tp).isEmpty)
cleanerManager.setCleaningState(tp, LOG_CLEANING_ABORTED)
- cleanerManager.doneDeleting(Seq(tp).asJava)
+ cleanerManager.doneDeleting(util.List.of(tp))
assertEquals(LogCleaningState.logCleaningPaused(1),
cleanerManager.cleaningState(tp).get)
}
@@ -771,7 +782,7 @@ class LogCleanerManagerTest extends Logging {
val filthiestLog = cleanerManager.grabFilthiestCompactedLog(time, new
PreCleanStats())
assertEquals(Optional.empty(), filthiestLog, "Log should not be selected
for cleaning")
- assertEquals(20L, cleanerCheckpoints(tp), "Unselected log should have
checkpoint offset updated")
+ assertEquals(20L, cleanerCheckpoints.get(tp), "Unselected log should have
checkpoint offset updated")
}
/**
@@ -793,17 +804,17 @@ class LogCleanerManagerTest extends Logging {
val filthiestLog = cleanerManager.grabFilthiestCompactedLog(time, new
PreCleanStats()).get
assertEquals(tp1, filthiestLog.topicPartition, "Dirtier log should be
selected")
- assertEquals(15L, cleanerCheckpoints(tp0), "Unselected log should have
checkpoint offset updated")
+ assertEquals(15L, cleanerCheckpoints.get(tp0), "Unselected log should have
checkpoint offset updated")
}
private def createCleanerManager(log: UnifiedLog): LogCleanerManager = {
val logs = new util.concurrent.ConcurrentHashMap[TopicPartition,
UnifiedLog]()
logs.put(topicPartition, log)
- new LogCleanerManager(Seq(logDir, logDir2).asJava, logs, null)
+ new LogCleanerManager(util.List.of(logDir, logDir2), logs, null)
}
private def createCleanerManagerMock(pool:
util.concurrent.ConcurrentMap[TopicPartition, UnifiedLog]):
LogCleanerManagerMock = {
- new LogCleanerManagerMock(Seq(logDir).asJava, pool, null)
+ new LogCleanerManagerMock(util.List.of(logDir), pool, null)
}
private def createLog(segmentSize: Int,
diff --git a/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala
b/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala
index 1bebfaa49e1..5b842d60e46 100644
--- a/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala
@@ -913,7 +913,7 @@ class LogCleanerTest extends Logging {
// clean the log
val stats = new CleanerStats(Time.SYSTEM)
- cleaner.cleanSegments(log, Seq(log.logSegments.asScala.head).asJava, map,
0L, stats, new CleanedTransactionMetadata, -1,
log.logSegments.asScala.head.readNextOffset)
+ cleaner.cleanSegments(log, util.List.of(log.logSegments.asScala.head),
map, 0L, stats, new CleanedTransactionMetadata, -1,
log.logSegments.asScala.head.readNextOffset)
val shouldRemain = LogTestUtils.keysInLog(log).filterNot(keys.contains)
assertEquals(shouldRemain, LogTestUtils.keysInLog(log))
}
@@ -926,7 +926,7 @@ class LogCleanerTest extends Logging {
val (log, offsetMap) =
createLogWithMessagesLargerThanMaxSize(largeMessageSize = 1024 * 1024)
val cleaner = makeCleaner(Int.MaxValue, maxMessageSize=1024)
- cleaner.cleanSegments(log, Seq(log.logSegments.asScala.head).asJava,
offsetMap, 0L, new CleanerStats(Time.SYSTEM), new CleanedTransactionMetadata,
-1, log.logSegments.asScala.head.readNextOffset)
+ cleaner.cleanSegments(log, util.List.of(log.logSegments.asScala.head),
offsetMap, 0L, new CleanerStats(Time.SYSTEM), new CleanedTransactionMetadata,
-1, log.logSegments.asScala.head.readNextOffset)
val shouldRemain = LogTestUtils.keysInLog(log).filter(k =>
!offsetMap.map.containsKey(k.toString))
assertEquals(shouldRemain, LogTestUtils.keysInLog(log))
}
@@ -945,7 +945,7 @@ class LogCleanerTest extends Logging {
val cleaner = makeCleaner(Int.MaxValue, maxMessageSize=1024)
assertThrows(classOf[CorruptRecordException], () =>
- cleaner.cleanSegments(log, Seq(log.logSegments.asScala.head).asJava,
offsetMap, 0L, new CleanerStats(Time.SYSTEM), new CleanedTransactionMetadata,
-1, log.logSegments.asScala.head.readNextOffset)
+ cleaner.cleanSegments(log, util.List.of(log.logSegments.asScala.head),
offsetMap, 0L, new CleanerStats(Time.SYSTEM), new CleanedTransactionMetadata,
-1, log.logSegments.asScala.head.readNextOffset)
)
}
@@ -962,7 +962,7 @@ class LogCleanerTest extends Logging {
val cleaner = makeCleaner(Int.MaxValue, maxMessageSize=1024)
assertThrows(classOf[CorruptRecordException], () =>
- cleaner.cleanSegments(log, Seq(log.logSegments.asScala.head).asJava,
offsetMap, 0L, new CleanerStats(Time.SYSTEM), new CleanedTransactionMetadata,
-1, log.logSegments.asScala.head.readNextOffset)
+ cleaner.cleanSegments(log, util.List.of(log.logSegments.asScala.head),
offsetMap, 0L, new CleanerStats(Time.SYSTEM), new CleanedTransactionMetadata,
-1, log.logSegments.asScala.head.readNextOffset)
)
}
@@ -1636,7 +1636,7 @@ class LogCleanerTest extends Logging {
// Try to clean segment with offset overflow. This will trigger log split
and the cleaning itself must abort.
assertThrows(classOf[LogCleaningAbortedException], () =>
- cleaner.cleanSegments(log, Seq(segmentWithOverflow).asJava, offsetMap,
0L, new CleanerStats(Time.SYSTEM),
+ cleaner.cleanSegments(log, util.List.of(segmentWithOverflow), offsetMap,
0L, new CleanerStats(Time.SYSTEM),
new CleanedTransactionMetadata, -1, segmentWithOverflow.readNextOffset)
)
assertEquals(numSegmentsInitial + 1, log.logSegments.size)
@@ -1646,7 +1646,7 @@ class LogCleanerTest extends Logging {
// Clean each segment now that split is complete.
val upperBoundOffset = log.logSegments.asScala.last.readNextOffset
for (segmentToClean <- log.logSegments.asScala)
- cleaner.cleanSegments(log, List(segmentToClean).asJava, offsetMap, 0L,
new CleanerStats(Time.SYSTEM),
+ cleaner.cleanSegments(log, util.List.of(segmentToClean), offsetMap, 0L,
new CleanerStats(Time.SYSTEM),
new CleanedTransactionMetadata, -1, upperBoundOffset)
assertEquals(expectedKeysAfterCleaning, LogTestUtils.keysInLog(log))
assertFalse(LogTestUtils.hasOffsetOverflow(log))
diff --git a/core/src/test/scala/unit/kafka/log/LogConfigTest.scala
b/core/src/test/scala/unit/kafka/log/LogConfigTest.scala
index 296736fc678..1b5c61fc777 100644
--- a/core/src/test/scala/unit/kafka/log/LogConfigTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogConfigTest.scala
@@ -27,7 +27,8 @@ import org.apache.kafka.server.common.MetadataVersion
import org.junit.jupiter.api.Assertions._
import org.junit.jupiter.api.Test
-import java.util.{Collections, Properties}
+import java.util
+import java.util.Properties
import org.apache.kafka.server.config.ServerLogConfigs
import org.apache.kafka.server.log.remote.storage.RemoteLogManagerConfig
import org.apache.kafka.storage.internals.log.{LogConfig,
ThrottledReplicaListValidator}
@@ -122,7 +123,7 @@ class LogConfigTest {
/* Sanity check that toHtml produces one of the expected configs */
@Test
def testToHtml(): Unit = {
- val html = LogConfig.configDefCopy.toHtml(4, (key: String) => "prefix_" +
key, Collections.emptyMap())
+ val html = LogConfig.configDefCopy.toHtml(4, (key: String) => "prefix_" +
key, util.Map.of)
val expectedConfig = "<h4><a id=\"file.delete.delay.ms\"></a><a
id=\"prefix_file.delete.delay.ms\"
href=\"#prefix_file.delete.delay.ms\">file.delete.delay.ms</a></h4>"
assertTrue(html.contains(expectedConfig), s"Could not find
`$expectedConfig` in:\n $html")
}
@@ -273,7 +274,7 @@ class LogConfigTest {
props.put(TopicConfig.LOCAL_LOG_RETENTION_MS_CONFIG,
localRetentionMs.toString)
props.put(TopicConfig.LOCAL_LOG_RETENTION_BYTES_CONFIG,
localRetentionBytes.toString)
assertThrows(classOf[ConfigException],
- () => LogConfig.validate(Collections.emptyMap(), props,
kafkaConfig.extractLogConfigMap,
kafkaConfig.remoteLogManagerConfig.isRemoteStorageSystemEnabled()))
+ () => LogConfig.validate(util.Map.of, props,
kafkaConfig.extractLogConfigMap,
kafkaConfig.remoteLogManagerConfig.isRemoteStorageSystemEnabled))
}
@Test
@@ -283,7 +284,7 @@ class LogConfigTest {
val kafkaConfig = KafkaConfig.fromProps(kafkaProps)
val logProps = new Properties()
def validateCleanupPolicy(): Unit = {
- LogConfig.validate(Collections.emptyMap(), logProps,
kafkaConfig.extractLogConfigMap,
kafkaConfig.remoteLogManagerConfig.isRemoteStorageSystemEnabled())
+ LogConfig.validate(util.Map.of, logProps,
kafkaConfig.extractLogConfigMap,
kafkaConfig.remoteLogManagerConfig.isRemoteStorageSystemEnabled)
}
logProps.put(TopicConfig.CLEANUP_POLICY_CONFIG,
TopicConfig.CLEANUP_POLICY_DELETE)
logProps.put(TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG, "true")
@@ -310,10 +311,10 @@ class LogConfigTest {
val logProps = new Properties()
logProps.put(TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG, "true")
if (sysRemoteStorageEnabled) {
- LogConfig.validate(Collections.emptyMap(), logProps,
kafkaConfig.extractLogConfigMap,
kafkaConfig.remoteLogManagerConfig.isRemoteStorageSystemEnabled())
+ LogConfig.validate(util.Map.of, logProps,
kafkaConfig.extractLogConfigMap,
kafkaConfig.remoteLogManagerConfig.isRemoteStorageSystemEnabled)
} else {
val message = assertThrows(classOf[ConfigException],
- () => LogConfig.validate(Collections.emptyMap(), logProps,
kafkaConfig.extractLogConfigMap,
kafkaConfig.remoteLogManagerConfig.isRemoteStorageSystemEnabled()))
+ () => LogConfig.validate(util.Map.of, logProps,
kafkaConfig.extractLogConfigMap,
kafkaConfig.remoteLogManagerConfig.isRemoteStorageSystemEnabled))
assertTrue(message.getMessage.contains("Tiered Storage functionality is
disabled in the broker"))
}
}
@@ -329,8 +330,8 @@ class LogConfigTest {
logProps.put(TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG, "false")
if (wasRemoteStorageEnabled) {
val message = assertThrows(classOf[InvalidConfigurationException],
- () =>
LogConfig.validate(Collections.singletonMap(TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG,
"true"),
- logProps, kafkaConfig.extractLogConfigMap,
kafkaConfig.remoteLogManagerConfig.isRemoteStorageSystemEnabled()))
+ () =>
LogConfig.validate(util.Map.of(TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG,
"true"),
+ logProps, kafkaConfig.extractLogConfigMap,
kafkaConfig.remoteLogManagerConfig.isRemoteStorageSystemEnabled))
assertTrue(message.getMessage.contains("It is invalid to disable remote
storage without deleting remote data. " +
"If you want to keep the remote data and turn to read only, please set
`remote.storage.enable=true,remote.log.copy.disable=true`. " +
"If you want to disable remote storage and delete all remote data,
please set `remote.storage.enable=false,remote.log.delete.on.disable=true`."))
@@ -338,12 +339,12 @@ class LogConfigTest {
// It should be able to disable the remote log storage when delete on
disable is set to true
logProps.put(TopicConfig.REMOTE_LOG_DELETE_ON_DISABLE_CONFIG, "true")
-
LogConfig.validate(Collections.singletonMap(TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG,
"true"),
- logProps, kafkaConfig.extractLogConfigMap,
kafkaConfig.remoteLogManagerConfig.isRemoteStorageSystemEnabled())
+
LogConfig.validate(util.Map.of(TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG,
"true"),
+ logProps, kafkaConfig.extractLogConfigMap,
kafkaConfig.remoteLogManagerConfig.isRemoteStorageSystemEnabled)
} else {
- LogConfig.validate(Collections.emptyMap(), logProps,
kafkaConfig.extractLogConfigMap,
kafkaConfig.remoteLogManagerConfig.isRemoteStorageSystemEnabled())
-
LogConfig.validate(Collections.singletonMap(TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG,
"false"), logProps,
- kafkaConfig.extractLogConfigMap,
kafkaConfig.remoteLogManagerConfig.isRemoteStorageSystemEnabled())
+ LogConfig.validate(util.Map.of, logProps,
kafkaConfig.extractLogConfigMap,
kafkaConfig.remoteLogManagerConfig.isRemoteStorageSystemEnabled)
+
LogConfig.validate(util.Map.of(TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG,
"false"), logProps,
+ kafkaConfig.extractLogConfigMap,
kafkaConfig.remoteLogManagerConfig.isRemoteStorageSystemEnabled)
}
}
@@ -362,12 +363,12 @@ class LogConfigTest {
logProps.put(TopicConfig.RETENTION_MS_CONFIG, "500")
if (sysRemoteStorageEnabled) {
val message = assertThrows(classOf[ConfigException],
- () => LogConfig.validate(Collections.emptyMap(), logProps,
kafkaConfig.extractLogConfigMap,
- kafkaConfig.remoteLogManagerConfig.isRemoteStorageSystemEnabled()))
+ () => LogConfig.validate(util.Map.of, logProps,
kafkaConfig.extractLogConfigMap,
+ kafkaConfig.remoteLogManagerConfig.isRemoteStorageSystemEnabled))
assertTrue(message.getMessage.contains(TopicConfig.LOCAL_LOG_RETENTION_MS_CONFIG))
} else {
- LogConfig.validate(Collections.emptyMap(), logProps,
kafkaConfig.extractLogConfigMap,
- kafkaConfig.remoteLogManagerConfig.isRemoteStorageSystemEnabled())
+ LogConfig.validate(util.Map.of, logProps,
kafkaConfig.extractLogConfigMap,
+ kafkaConfig.remoteLogManagerConfig.isRemoteStorageSystemEnabled)
}
}
@@ -386,12 +387,12 @@ class LogConfigTest {
logProps.put(TopicConfig.RETENTION_BYTES_CONFIG, "128")
if (sysRemoteStorageEnabled) {
val message = assertThrows(classOf[ConfigException],
- () => LogConfig.validate(Collections.emptyMap(), logProps,
kafkaConfig.extractLogConfigMap,
- kafkaConfig.remoteLogManagerConfig.isRemoteStorageSystemEnabled()))
+ () => LogConfig.validate(util.Map.of, logProps,
kafkaConfig.extractLogConfigMap,
+ kafkaConfig.remoteLogManagerConfig.isRemoteStorageSystemEnabled))
assertTrue(message.getMessage.contains(TopicConfig.LOCAL_LOG_RETENTION_BYTES_CONFIG))
} else {
- LogConfig.validate(Collections.emptyMap(), logProps,
kafkaConfig.extractLogConfigMap,
- kafkaConfig.remoteLogManagerConfig.isRemoteStorageSystemEnabled())
+ LogConfig.validate(util.Map.of, logProps,
kafkaConfig.extractLogConfigMap,
+ kafkaConfig.remoteLogManagerConfig.isRemoteStorageSystemEnabled)
}
}
@@ -406,10 +407,10 @@ class LogConfigTest {
if (sysRemoteStorageEnabled) {
val message = assertThrows(classOf[ConfigException],
- () =>
LogConfig.validateBrokerLogConfigValues(kafkaConfig.extractLogConfigMap,
kafkaConfig.remoteLogManagerConfig.isRemoteStorageSystemEnabled()))
+ () =>
LogConfig.validateBrokerLogConfigValues(kafkaConfig.extractLogConfigMap,
kafkaConfig.remoteLogManagerConfig.isRemoteStorageSystemEnabled))
assertTrue(message.getMessage.contains(TopicConfig.LOCAL_LOG_RETENTION_BYTES_CONFIG))
} else {
- LogConfig.validateBrokerLogConfigValues(kafkaConfig.extractLogConfigMap,
kafkaConfig.remoteLogManagerConfig.isRemoteStorageSystemEnabled())
+ LogConfig.validateBrokerLogConfigValues(kafkaConfig.extractLogConfigMap,
kafkaConfig.remoteLogManagerConfig.isRemoteStorageSystemEnabled)
}
}
diff --git a/core/src/test/scala/unit/kafka/log/LogLoaderTest.scala
b/core/src/test/scala/unit/kafka/log/LogLoaderTest.scala
index 0c465cf2138..c1d611ce6dc 100644
--- a/core/src/test/scala/unit/kafka/log/LogLoaderTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogLoaderTest.scala
@@ -286,7 +286,7 @@ class LogLoaderTest {
val wrapper = Mockito.spy(segment)
Mockito.doAnswer { in =>
segmentsWithReads += wrapper
- segment.read(in.getArgument(0, classOf[java.lang.Long]),
in.getArgument(1, classOf[java.lang.Integer]), in.getArgument(2,
classOf[java.util.Optional[java.lang.Long]]), in.getArgument(3,
classOf[java.lang.Boolean]))
+ segment.read(in.getArgument(0, classOf[java.lang.Long]),
in.getArgument(1, classOf[java.lang.Integer]), in.getArgument(2,
classOf[util.Optional[java.lang.Long]]), in.getArgument(3,
classOf[java.lang.Boolean]))
}.when(wrapper).read(ArgumentMatchers.any(), ArgumentMatchers.any(),
ArgumentMatchers.any(), ArgumentMatchers.any())
Mockito.doAnswer { in =>
recoveredSegments += wrapper
@@ -391,12 +391,12 @@ class LogLoaderTest {
codec: Compression =
Compression.NONE,
timestamp: Long =
RecordBatch.NO_TIMESTAMP,
magicValue: Byte =
RecordBatch.CURRENT_MAGIC_VALUE): MemoryRecords = {
- val records = Seq(new SimpleRecord(timestamp, key, value))
+ val records = util.List.of(new SimpleRecord(timestamp, key, value))
- val buf =
ByteBuffer.allocate(DefaultRecordBatch.sizeInBytes(records.asJava))
+ val buf = ByteBuffer.allocate(DefaultRecordBatch.sizeInBytes(records))
val builder = MemoryRecords.builder(buf, magicValue, codec,
TimestampType.CREATE_TIME, offset,
mockTime.milliseconds, leaderEpoch)
- records.foreach(builder.append)
+ records.forEach(builder.append)
builder.build()
}
@@ -559,7 +559,7 @@ class LogLoaderTest {
false,
LogOffsetsListener.NO_OP_OFFSETS_LISTENER)
-
verify(stateManager).removeStraySnapshots(any[java.util.List[java.lang.Long]])
+ verify(stateManager).removeStraySnapshots(any[util.List[java.lang.Long]])
verify(stateManager, times(2)).updateMapEndOffset(0L)
verify(stateManager, times(2)).takeSnapshot()
verify(stateManager).isEmpty
@@ -1215,11 +1215,11 @@ class LogLoaderTest {
val fourthBatch = singletonRecordsWithLeaderEpoch(value =
"random".getBytes, leaderEpoch = 3, offset = 3)
log.appendAsFollower(fourthBatch, Int.MaxValue)
- assertEquals(java.util.Arrays.asList(new EpochEntry(1, 0), new
EpochEntry(2, 1), new EpochEntry(3, 3)), leaderEpochCache.epochEntries)
+ assertEquals(util.List.of(new EpochEntry(1, 0), new EpochEntry(2, 1), new
EpochEntry(3, 3)), leaderEpochCache.epochEntries)
// deliberately remove some of the epoch entries
leaderEpochCache.truncateFromEndAsyncFlush(2)
- assertNotEquals(java.util.Arrays.asList(new EpochEntry(1, 0), new
EpochEntry(2, 1), new EpochEntry(3, 3)), leaderEpochCache.epochEntries)
+ assertNotEquals(util.List.of(new EpochEntry(1, 0), new EpochEntry(2, 1),
new EpochEntry(3, 3)), leaderEpochCache.epochEntries)
log.close()
// reopen the log and recover from the beginning
@@ -1227,7 +1227,7 @@ class LogLoaderTest {
val recoveredLeaderEpochCache = recoveredLog.leaderEpochCache
// epoch entries should be recovered
- assertEquals(java.util.Arrays.asList(new EpochEntry(1, 0), new
EpochEntry(2, 1), new EpochEntry(3, 3)), recoveredLeaderEpochCache.epochEntries)
+ assertEquals(util.List.of(new EpochEntry(1, 0), new EpochEntry(2, 1), new
EpochEntry(3, 3)), recoveredLeaderEpochCache.epochEntries)
recoveredLog.close()
}
diff --git a/core/src/test/scala/unit/kafka/log/LogManagerTest.scala
b/core/src/test/scala/unit/kafka/log/LogManagerTest.scala
index 04f4acca5de..a8946a3d139 100755
--- a/core/src/test/scala/unit/kafka/log/LogManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogManagerTest.scala
@@ -534,7 +534,7 @@ class LogManagerTest {
true
}
- logManager.loadLog(log.dir, hadCleanShutdown = true,
Collections.emptyMap[TopicPartition, JLong],
Collections.emptyMap[TopicPartition, JLong], logConfig, Map.empty, new
ConcurrentHashMap[String, Integer](), providedIsStray)
+ logManager.loadLog(log.dir, hadCleanShutdown = true,
util.Map.of[TopicPartition, JLong], util.Map.of[TopicPartition, JLong],
logConfig, Map.empty, new ConcurrentHashMap[String, Integer](),
providedIsStray)
assertEquals(1, invokedCount)
assertTrue(
logDir.listFiles().toSet
diff --git a/core/src/test/scala/unit/kafka/log/UnifiedLogTest.scala
b/core/src/test/scala/unit/kafka/log/UnifiedLogTest.scala
index 42c813074aa..e42b509b247 100755
--- a/core/src/test/scala/unit/kafka/log/UnifiedLogTest.scala
+++ b/core/src/test/scala/unit/kafka/log/UnifiedLogTest.scala
@@ -231,7 +231,7 @@ class UnifiedLogTest {
reopened.truncateFullyAndStartAt(2L, Optional.of(1L))
assertEquals(Optional.empty, reopened.firstUnstableOffset)
- assertEquals(java.util.Collections.emptyMap(),
reopened.producerStateManager.activeProducers)
+ assertEquals(util.Map.of, reopened.producerStateManager.activeProducers)
assertEquals(1L, reopened.logStartOffset)
assertEquals(2L, reopened.logEndOffset)
}
@@ -274,7 +274,7 @@ class UnifiedLogTest {
truncateFunc(reopened, 0L)
assertEquals(Optional.empty, reopened.firstUnstableOffset)
- assertEquals(java.util.Collections.emptyMap(),
reopened.producerStateManager.activeProducers)
+ assertEquals(util.Map.of, reopened.producerStateManager.activeProducers)
}
@Test
@@ -1980,7 +1980,7 @@ class UnifiedLogTest {
val log = createLog(logDir, LogTestUtils.createLogConfig(maxMessageBytes =
second.sizeInBytes - 1))
log.appendAsFollower(first, Int.MaxValue)
- // the second record is larger then limit but appendAsFollower does not
validate the size.
+ // the second record is larger than limit but appendAsFollower does not
validate the size.
log.appendAsFollower(second, Int.MaxValue)
}
@@ -1991,7 +1991,7 @@ class UnifiedLogTest {
val log = createLog(logDir, logConfig)
val previousEndOffset = log.logEndOffsetMetadata.messageOffset
- if (expectedException.isPresent()) {
+ if (expectedException.isPresent) {
assertThrows(
expectedException.get(),
() => log.appendAsFollower(records, Int.MaxValue)
@@ -2991,7 +2991,7 @@ class UnifiedLogTest {
for (_ <- 0 until 15)
log.appendAsLeader(createRecords, 0)
- // mark oldest segment as older the retention.ms
+ // mark the oldest segment as older the retention.ms
log.logSegments.asScala.head.setLastModified(mockTime.milliseconds - 20000)
val segments = log.numberOfSegments
@@ -3110,7 +3110,7 @@ class UnifiedLogTest {
log.deleteOldSegments()
//The oldest epoch entry should have been removed
- assertEquals(java.util.Arrays.asList(new EpochEntry(1, 5), new
EpochEntry(2, 10)), cache.epochEntries)
+ assertEquals(util.List.of(new EpochEntry(1, 5), new EpochEntry(2, 10)),
cache.epochEntries)
}
@Test
@@ -3135,7 +3135,7 @@ class UnifiedLogTest {
log.deleteOldSegments()
//The first entry should have gone from (0,0) => (0,5)
- assertEquals(java.util.Arrays.asList(new EpochEntry(0, 5), new
EpochEntry(1, 7), new EpochEntry(2, 10)), cache.epochEntries)
+ assertEquals(util.List.of(new EpochEntry(0, 5), new EpochEntry(1, 7), new
EpochEntry(2, 10)), cache.epochEntries)
}
@Test
@@ -4599,7 +4599,7 @@ class UnifiedLogTest {
def testGetFirstBatchTimestampForSegments(): Unit = {
val log = createLog(logDir, LogTestUtils.createLogConfig())
- val segments: java.util.List[LogSegment] = new
java.util.ArrayList[LogSegment]()
+ val segments: util.List[LogSegment] = new util.ArrayList[LogSegment]()
val seg1 = LogTestUtils.createSegment(1, logDir, 10, Time.SYSTEM)
val seg2 = LogTestUtils.createSegment(2, logDir, 10, Time.SYSTEM)
segments.add(seg1)
diff --git a/core/src/test/scala/unit/kafka/network/ConnectionQuotasTest.scala
b/core/src/test/scala/unit/kafka/network/ConnectionQuotasTest.scala
index 3906011a203..d9a64b4186f 100644
--- a/core/src/test/scala/unit/kafka/network/ConnectionQuotasTest.scala
+++ b/core/src/test/scala/unit/kafka/network/ConnectionQuotasTest.scala
@@ -20,7 +20,7 @@ package kafka.network
import java.net.InetAddress
import java.util
import java.util.concurrent.{Callable, ExecutorService, Executors, TimeUnit}
-import java.util.{Collections, Properties}
+import java.util.Properties
import com.yammer.metrics.core.Meter
import kafka.network.Processor.ListenerMetricTag
import kafka.server.KafkaConfig
@@ -37,7 +37,6 @@ import org.apache.kafka.server.util.MockTime
import org.junit.jupiter.api.Assertions._
import org.junit.jupiter.api._
-import scala.jdk.CollectionConverters._
import scala.collection.{Map, mutable}
import scala.concurrent.TimeoutException
@@ -91,12 +90,12 @@ class ConnectionQuotasTest {
listeners.keys.foreach { name =>
blockedPercentMeters.put(name, new
KafkaMetricsGroup(this.getClass).newMeter(
- s"${name}BlockedPercent", "blocked time", TimeUnit.NANOSECONDS,
Map(ListenerMetricTag -> name).asJava))
+ s"${name}BlockedPercent", "blocked time", TimeUnit.NANOSECONDS,
util.Map.of(ListenerMetricTag, name)))
}
// use system time, because ConnectionQuota causes the current thread to
wait with timeout, which waits based on
// system time; so using mock time will likely result in test flakiness
due to a mixed use of mock and system time
time = Time.SYSTEM
- metrics = new Metrics(new MetricConfig(), Collections.emptyList(), time)
+ metrics = new Metrics(new MetricConfig(), util.List.of, time)
executor = Executors.newFixedThreadPool(listeners.size)
}
@@ -282,7 +281,7 @@ class ConnectionQuotasTest {
addListenersAndVerify(config, connectionQuotas)
- val listenerConfig = Map(SocketServerConfigs.MAX_CONNECTIONS_CONFIG ->
listenerMaxConnections.toString).asJava
+ val listenerConfig =
util.Map.of(SocketServerConfigs.MAX_CONNECTIONS_CONFIG,
listenerMaxConnections.toString)
listeners.values.foreach { listener =>
connectionQuotas.maxConnectionsPerListener(listener.listenerName).configure(listenerConfig)
}
@@ -374,7 +373,7 @@ class ConnectionQuotasTest {
val config = KafkaConfig.fromProps(props)
connectionQuotas = new ConnectionQuotas(config, time, metrics)
- val listenerConfig =
Map(SocketServerConfigs.MAX_CONNECTION_CREATION_RATE_CONFIG ->
listenerRateLimit.toString).asJava
+ val listenerConfig =
util.Map.of(SocketServerConfigs.MAX_CONNECTION_CREATION_RATE_CONFIG,
listenerRateLimit.toString)
addListenersAndVerify(config, listenerConfig, connectionQuotas)
// create connections with the rate < listener quota on every listener,
and verify there is no throttling
@@ -400,7 +399,7 @@ class ConnectionQuotasTest {
val config = KafkaConfig.fromProps(props)
connectionQuotas = new ConnectionQuotas(config, time, metrics)
- val listenerConfig =
Map(SocketServerConfigs.MAX_CONNECTION_CREATION_RATE_CONFIG ->
listenerRateLimit.toString).asJava
+ val listenerConfig =
util.Map.of(SocketServerConfigs.MAX_CONNECTION_CREATION_RATE_CONFIG,
listenerRateLimit.toString)
addListenersAndVerify(config, listenerConfig, connectionQuotas)
// create connections with the rate > listener quota on every listener
@@ -498,7 +497,7 @@ class ConnectionQuotasTest {
// with a default per-IP limit of 25 and a listener rate of 30, only one
IP should be able to saturate their IP rate
// limit, the other IP will hit listener rate limits and block
connectionQuotas.updateIpConnectionRateQuota(None,
Some(ipConnectionRateLimit))
- val listenerConfig =
Map(SocketServerConfigs.MAX_CONNECTION_CREATION_RATE_CONFIG ->
listenerRateLimit.toString).asJava
+ val listenerConfig =
util.Map.of(SocketServerConfigs.MAX_CONNECTION_CREATION_RATE_CONFIG,
listenerRateLimit.toString)
addListenersAndVerify(config, listenerConfig, connectionQuotas)
val listener = listeners("EXTERNAL").listenerName
// use a small number of connections because a longer-running test will
have both IPs throttle at different times
@@ -556,7 +555,7 @@ class ConnectionQuotasTest {
connectionQuotas.addListener(config, listeners("EXTERNAL").listenerName)
val maxListenerConnectionRate = 0
- val listenerConfig =
Map(SocketServerConfigs.MAX_CONNECTION_CREATION_RATE_CONFIG ->
maxListenerConnectionRate.toString).asJava
+ val listenerConfig =
util.Map.of(SocketServerConfigs.MAX_CONNECTION_CREATION_RATE_CONFIG,
maxListenerConnectionRate.toString)
assertThrows(classOf[ConfigException],
() =>
connectionQuotas.maxConnectionsPerListener(listeners("EXTERNAL").listenerName).validateReconfiguration(listenerConfig)
)
@@ -569,11 +568,11 @@ class ConnectionQuotasTest {
connectionQuotas.addListener(config, listeners("EXTERNAL").listenerName)
val listenerRateLimit = 20
- val listenerConfig =
Map(SocketServerConfigs.MAX_CONNECTION_CREATION_RATE_CONFIG ->
listenerRateLimit.toString).asJava
+ val listenerConfig =
util.Map.of(SocketServerConfigs.MAX_CONNECTION_CREATION_RATE_CONFIG,
listenerRateLimit.toString)
connectionQuotas.maxConnectionsPerListener(listeners("EXTERNAL").listenerName).configure(listenerConfig)
// remove connection rate limit
-
connectionQuotas.maxConnectionsPerListener(listeners("EXTERNAL").listenerName).reconfigure(Map.empty.asJava)
+
connectionQuotas.maxConnectionsPerListener(listeners("EXTERNAL").listenerName).reconfigure(util.Map.of)
// create connections as fast as possible, will timeout if connections get
throttled with previous rate
// (50s to create 1000 connections)
@@ -586,7 +585,7 @@ class ConnectionQuotasTest {
// configure 100 connection/second rate limit
val newMaxListenerConnectionRate = 10
- val newListenerConfig =
Map(SocketServerConfigs.MAX_CONNECTION_CREATION_RATE_CONFIG ->
newMaxListenerConnectionRate.toString).asJava
+ val newListenerConfig =
util.Map.of(SocketServerConfigs.MAX_CONNECTION_CREATION_RATE_CONFIG,
newMaxListenerConnectionRate.toString)
connectionQuotas.maxConnectionsPerListener(listeners("EXTERNAL").listenerName).reconfigure(newListenerConfig)
// verify rate limit
@@ -750,7 +749,7 @@ class ConnectionQuotasTest {
}
private def addListenersAndVerify(config: KafkaConfig, connectionQuotas:
ConnectionQuotas) : Unit = {
- addListenersAndVerify(config, Map.empty.asJava, connectionQuotas)
+ addListenersAndVerify(config, util.Map.of, connectionQuotas)
}
private def addListenersAndVerify(config: KafkaConfig,
@@ -829,7 +828,7 @@ class ConnectionQuotasTest {
val metricName = metrics.metricName(
"connection-accept-throttle-time",
SocketServer.MetricsGroup,
- Collections.singletonMap(Processor.ListenerMetricTag, listener))
+ util.Map.of(Processor.ListenerMetricTag, listener))
metrics.metric(metricName)
}
@@ -837,7 +836,7 @@ class ConnectionQuotasTest {
val metricName = metrics.metricName(
"ip-connection-accept-throttle-time",
SocketServer.MetricsGroup,
- Collections.singletonMap(Processor.ListenerMetricTag, listener))
+ util.Map.of(Processor.ListenerMetricTag, listener))
metrics.metric(metricName)
}
@@ -845,7 +844,7 @@ class ConnectionQuotasTest {
val metricName = metrics.metricName(
"connection-accept-rate",
SocketServer.MetricsGroup,
- Collections.singletonMap(Processor.ListenerMetricTag, listener))
+ util.Map.of(Processor.ListenerMetricTag, listener))
metrics.metric(metricName)
}
@@ -860,7 +859,7 @@ class ConnectionQuotasTest {
val metricName = metrics.metricName(
s"connection-accept-rate",
SocketServer.MetricsGroup,
- Collections.singletonMap("ip", ip))
+ util.Map.of("ip", ip))
metrics.metric(metricName)
}
diff --git a/core/src/test/scala/unit/kafka/network/ProcessorTest.scala
b/core/src/test/scala/unit/kafka/network/ProcessorTest.scala
index 575f004fe0f..54bbd0bf201 100644
--- a/core/src/test/scala/unit/kafka/network/ProcessorTest.scala
+++ b/core/src/test/scala/unit/kafka/network/ProcessorTest.scala
@@ -31,8 +31,9 @@ import org.junit.jupiter.api.Test
import org.junit.jupiter.api.function.Executable
import org.mockito.Mockito.mock
+import java.util
import java.util.function.Supplier
-import java.util.{Collections, Optional}
+import java.util.Optional
class ProcessorTest {
@@ -41,7 +42,7 @@ class ProcessorTest {
val requestHeader = RequestTestUtils.serializeRequestHeader(
new RequestHeader(ApiKeys.INIT_PRODUCER_ID, 0, "clientid", 0))
val apiVersionManager = new
SimpleApiVersionManager(ListenerType.CONTROLLER, true,
- () => new FinalizedFeatures(MetadataVersion.latestTesting(),
Collections.emptyMap[String, java.lang.Short], 0))
+ () => new FinalizedFeatures(MetadataVersion.latestTesting(),
util.Map.of[String, java.lang.Short], 0))
val e = assertThrows(classOf[InvalidRequestException],
(() => Processor.parseRequestHeader(apiVersionManager, requestHeader)):
Executable,
"INIT_PRODUCER_ID with listener type CONTROLLER should throw
InvalidRequestException exception")
@@ -95,5 +96,4 @@ class ProcessorTest {
assertTrue(e.toString.contains("unsupported version"))
}
}
-
}
diff --git a/core/src/test/scala/unit/kafka/network/RequestChannelTest.scala
b/core/src/test/scala/unit/kafka/network/RequestChannelTest.scala
index ecea412e989..8dbfa808d7f 100644
--- a/core/src/test/scala/unit/kafka/network/RequestChannelTest.scala
+++ b/core/src/test/scala/unit/kafka/network/RequestChannelTest.scala
@@ -17,7 +17,6 @@
package kafka.network
-
import com.fasterxml.jackson.databind.ObjectMapper
import kafka.network
import kafka.server.EnvelopeUtils
@@ -47,9 +46,9 @@ import org.mockito.Mockito.mock
import java.io.IOException
import java.net.InetAddress
import java.nio.ByteBuffer
-import java.util.Collections
+import java.util
import java.util.concurrent.atomic.AtomicReference
-import scala.collection.{Map, Seq}
+import scala.collection.Map
import scala.jdk.CollectionConverters._
import scala.jdk.OptionConverters.RichOption
@@ -64,9 +63,9 @@ class RequestChannelTest {
def testAlterRequests(): Unit = {
val sensitiveValue = "secret"
- def verifyConfig(resource: ConfigResource, entries: Seq[ConfigEntry],
expectedValues: Map[String, String]): Unit = {
+ def verifyConfig(resource: ConfigResource, entries:
util.List[ConfigEntry], expectedValues: Map[String, String]): Unit = {
val alterConfigs = request(new AlterConfigsRequest.Builder(
- Collections.singletonMap(resource, new
Config(entries.asJavaCollection)), true).build())
+ util.Map.of(resource, new Config(entries)), true).build())
val loggableAlterConfigs =
alterConfigs.loggableRequest.asInstanceOf[AlterConfigsRequest]
val loggedConfig = loggableAlterConfigs.configs.get(resource)
@@ -77,37 +76,37 @@ class RequestChannelTest {
val brokerResource = new ConfigResource(ConfigResource.Type.BROKER, "1")
val keystorePassword = new
ConfigEntry(SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG, sensitiveValue)
- verifyConfig(brokerResource, Seq(keystorePassword),
Map(SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG -> Password.HIDDEN))
+ verifyConfig(brokerResource, util.List.of(keystorePassword),
Map(SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG -> Password.HIDDEN))
val keystoreLocation = new
ConfigEntry(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG, "/path/to/keystore")
- verifyConfig(brokerResource, Seq(keystoreLocation),
Map(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG -> "/path/to/keystore"))
- verifyConfig(brokerResource, Seq(keystoreLocation, keystorePassword),
+ verifyConfig(brokerResource, util.List.of(keystoreLocation),
Map(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG -> "/path/to/keystore"))
+ verifyConfig(brokerResource, util.List.of(keystoreLocation,
keystorePassword),
Map(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG -> "/path/to/keystore",
SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG -> Password.HIDDEN))
val listenerKeyPassword = new
ConfigEntry(s"listener.name.internal.${SslConfigs.SSL_KEY_PASSWORD_CONFIG}",
sensitiveValue)
- verifyConfig(brokerResource, Seq(listenerKeyPassword),
Map(listenerKeyPassword.name -> Password.HIDDEN))
+ verifyConfig(brokerResource, util.List.of(listenerKeyPassword),
Map(listenerKeyPassword.name -> Password.HIDDEN))
val listenerKeystore = new
ConfigEntry(s"listener.name.internal.${SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG}",
"/path/to/keystore")
- verifyConfig(brokerResource, Seq(listenerKeystore),
Map(listenerKeystore.name -> "/path/to/keystore"))
+ verifyConfig(brokerResource, util.List.of(listenerKeystore),
Map(listenerKeystore.name -> "/path/to/keystore"))
val plainJaasConfig = new
ConfigEntry(s"listener.name.internal.plain.${SaslConfigs.SASL_JAAS_CONFIG}",
sensitiveValue)
- verifyConfig(brokerResource, Seq(plainJaasConfig),
Map(plainJaasConfig.name -> Password.HIDDEN))
+ verifyConfig(brokerResource, util.List.of(plainJaasConfig),
Map(plainJaasConfig.name -> Password.HIDDEN))
val plainLoginCallback = new
ConfigEntry(s"listener.name.internal.plain.${SaslConfigs.SASL_LOGIN_CALLBACK_HANDLER_CLASS}",
"test.LoginClass")
- verifyConfig(brokerResource, Seq(plainLoginCallback),
Map(plainLoginCallback.name -> plainLoginCallback.value))
+ verifyConfig(brokerResource, util.List.of(plainLoginCallback),
Map(plainLoginCallback.name -> plainLoginCallback.value))
val customConfig = new ConfigEntry("custom.config", sensitiveValue)
- verifyConfig(brokerResource, Seq(customConfig), Map(customConfig.name ->
Password.HIDDEN))
+ verifyConfig(brokerResource, util.List.of(customConfig),
Map(customConfig.name -> Password.HIDDEN))
val topicResource = new ConfigResource(ConfigResource.Type.TOPIC,
"testTopic")
val compressionType = new ConfigEntry(TopicConfig.COMPRESSION_TYPE_CONFIG,
"lz4")
- verifyConfig(topicResource, Seq(compressionType),
Map(TopicConfig.COMPRESSION_TYPE_CONFIG -> "lz4"))
- verifyConfig(topicResource, Seq(customConfig), Map(customConfig.name ->
Password.HIDDEN))
+ verifyConfig(topicResource, util.List.of(compressionType),
Map(TopicConfig.COMPRESSION_TYPE_CONFIG -> "lz4"))
+ verifyConfig(topicResource, util.List.of(customConfig),
Map(customConfig.name -> Password.HIDDEN))
// Verify empty request
val alterConfigs = request(new AlterConfigsRequest.Builder(
- Collections.emptyMap[ConfigResource, Config], true).build())
- assertEquals(Collections.emptyMap,
alterConfigs.loggableRequest.asInstanceOf[AlterConfigsRequest].configs)
+ util.Map.of[ConfigResource, Config], true).build())
+ assertEquals(util.Map.of,
alterConfigs.loggableRequest.asInstanceOf[AlterConfigsRequest].configs)
}
@Test
@@ -189,7 +188,7 @@ class RequestChannelTest {
@Test
def testNonAlterRequestsNotTransformed(): Unit = {
- val metadataRequest = request(new
MetadataRequest.Builder(List("topic").asJava, true).build())
+ val metadataRequest = request(new
MetadataRequest.Builder(util.List.of("topic"), true).build())
assertSame(metadataRequest.body[MetadataRequest],
metadataRequest.loggableRequest)
}
@@ -198,10 +197,10 @@ class RequestChannelTest {
val sensitiveValue = "secret"
val resource = new ConfigResource(ConfigResource.Type.BROKER, "1")
val keystorePassword = new
ConfigEntry(SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG, sensitiveValue)
- val entries = Seq(keystorePassword)
+ val entries = util.List.of(keystorePassword)
- val alterConfigs = request(new
AlterConfigsRequest.Builder(Collections.singletonMap(resource,
- new Config(entries.asJavaCollection)), true).build())
+ val alterConfigs = request(new
AlterConfigsRequest.Builder(util.Map.of(resource,
+ new Config(entries)), true).build())
assertTrue(isValidJson(RequestConvertToJson.request(alterConfigs.loggableRequest).toString))
}
diff --git a/core/src/test/scala/unit/kafka/network/SocketServerTest.scala
b/core/src/test/scala/unit/kafka/network/SocketServerTest.scala
index ac1a98e7fc9..6a4b8d8ca67 100644
--- a/core/src/test/scala/unit/kafka/network/SocketServerTest.scala
+++ b/core/src/test/scala/unit/kafka/network/SocketServerTest.scala
@@ -59,7 +59,7 @@ import java.security.cert.X509Certificate
import java.util
import java.util.concurrent.atomic.AtomicInteger
import java.util.concurrent._
-import java.util.{Collections, Properties, Random}
+import java.util.{Properties, Random}
import javax.net.ssl._
import scala.collection.mutable
import scala.collection.mutable.ArrayBuffer
@@ -85,7 +85,7 @@ class SocketServerTest {
TestUtils.clearYammerMetrics()
private val apiVersionManager = new
SimpleApiVersionManager(ListenerType.BROKER, true,
- () => new FinalizedFeatures(MetadataVersion.latestTesting(),
Collections.emptyMap[String, java.lang.Short], 0))
+ () => new FinalizedFeatures(MetadataVersion.latestTesting(),
util.Map.of[String, java.lang.Short], 0))
var server: SocketServer = _
val sockets = new ArrayBuffer[Socket]
@@ -1722,7 +1722,7 @@ class SocketServerTest {
val testableServer = new
TestableSocketServer(KafkaConfig.fromProps(props), connectionQueueSize = 1)
testableServer.enableRequestProcessing(Map.empty).get(1, TimeUnit.MINUTES)
val testableSelector = testableServer.testableSelector
- val errors = new mutable.HashSet[String]
+ val errors = new util.HashSet[String]()
def acceptorStackTraces: scala.collection.Map[Thread, String] = {
Thread.getAllStackTraces.asScala.collect {
@@ -1746,7 +1746,7 @@ class SocketServerTest {
// Block selector until Acceptor is blocked while connections are pending
testableSelector.pollCallback = () => {
try {
- TestUtils.waitUntilTrue(() => errors.nonEmpty ||
registeredConnectionCount >= numConnections - 1 || acceptorBlocked,
+ TestUtils.waitUntilTrue(() => !errors.isEmpty ||
registeredConnectionCount >= numConnections - 1 || acceptorBlocked,
"Acceptor not blocked", waitTimeMs = 10000)
} catch {
case _: Throwable => errors.add(s"Acceptor not blocked:
$acceptorStackTraces")
@@ -1754,9 +1754,9 @@ class SocketServerTest {
}
testableSelector.operationCounts.clear()
val sockets = (1 to numConnections).map(_ => connect(testableServer))
- TestUtils.waitUntilTrue(() => errors.nonEmpty ||
registeredConnectionCount == numConnections,
+ TestUtils.waitUntilTrue(() => !errors.isEmpty ||
registeredConnectionCount == numConnections,
"Connections not registered", waitTimeMs = 15000)
- assertEquals(Set.empty, errors)
+ assertEquals(util.Set.of, errors)
testableSelector.waitForOperations(SelectorOperation.Register,
numConnections)
// In each iteration, SocketServer processes at most connectionQueueSize
(1 in this test)
@@ -2061,7 +2061,7 @@ class SocketServerTest {
private var conn: Option[Socket] = None
override protected[network] def createSelector(channelBuilder:
ChannelBuilder): Selector = {
- new TestableSelector(config, channelBuilder, time, metrics,
metricTags.asScala)
+ new TestableSelector(config, channelBuilder, time, metrics, metricTags)
}
override private[network] def processException(errorMessage: String,
throwable: Throwable): Unit = {
@@ -2159,9 +2159,9 @@ class SocketServerTest {
case object CloseSelector extends SelectorOperation
}
- class TestableSelector(config: KafkaConfig, channelBuilder: ChannelBuilder,
time: Time, metrics: Metrics, metricTags: mutable.Map[String, String] =
mutable.Map.empty)
+ class TestableSelector(config: KafkaConfig, channelBuilder: ChannelBuilder,
time: Time, metrics: Metrics, metricTags: util.Map[String, String] = new
util.HashMap())
extends Selector(config.socketRequestMaxBytes,
config.connectionsMaxIdleMs, config.failedAuthenticationDelayMs,
- metrics, time, "socket-server", metricTags.asJava, false, true,
channelBuilder, MemoryPool.NONE, new LogContext()) {
+ metrics, time, "socket-server", metricTags, false, true, channelBuilder,
MemoryPool.NONE, new LogContext()) {
val failures = mutable.Map[SelectorOperation, Throwable]()
val operationCounts = mutable.Map[SelectorOperation,
Int]().withDefaultValue(0)
diff --git
a/core/src/test/scala/unit/kafka/security/authorizer/AuthorizerTest.scala
b/core/src/test/scala/unit/kafka/security/authorizer/AuthorizerTest.scala
index 6a9d8dcd1d4..0839990868f 100644
--- a/core/src/test/scala/unit/kafka/security/authorizer/AuthorizerTest.scala
+++ b/core/src/test/scala/unit/kafka/security/authorizer/AuthorizerTest.scala
@@ -44,7 +44,7 @@ import org.junit.jupiter.api.Test
import java.net.InetAddress
import java.util
-import java.util.{Collections, Properties, UUID}
+import java.util.{Properties, UUID}
import scala.jdk.CollectionConverters._
class AuthorizerTest extends QuorumTestHarness with BaseAuthorizerTest {
@@ -567,7 +567,7 @@ class AuthorizerTest extends QuorumTestHarness with
BaseAuthorizerTest {
val acl3 = new AclBinding(resource2, new
AccessControlEntry(principal.toString, WILDCARD_HOST, DESCRIBE, ALLOW))
val acl4 = new AclBinding(prefixedResource, new
AccessControlEntry(wildcardPrincipal.toString, WILDCARD_HOST, READ, ALLOW))
- authorizer1.createAcls(requestContext, List(acl1, acl2, acl3, acl4).asJava)
+ authorizer1.createAcls(requestContext, util.List.of(acl1, acl2, acl3,
acl4))
assertEquals(Set(acl1, acl2, acl3, acl4),
authorizer1.acls(AclBindingFilter.ANY).asScala.toSet)
assertEquals(Set(acl1, acl2), authorizer1.acls(new
AclBindingFilter(resource1.toFilter,
AccessControlEntryFilter.ANY)).asScala.toSet)
assertEquals(Set(acl4), authorizer1.acls(new
AclBindingFilter(prefixedResource.toFilter,
AccessControlEntryFilter.ANY)).asScala.toSet)
@@ -632,7 +632,7 @@ class AuthorizerTest extends QuorumTestHarness with
BaseAuthorizerTest {
private def authorize(authorizer: Authorizer, requestContext:
RequestContext, operation: AclOperation, resource: ResourcePattern): Boolean = {
val action = new Action(operation, resource, 1, true, true)
- authorizer.authorize(requestContext, List(action).asJava).asScala.head ==
AuthorizationResult.ALLOWED
+ authorizer.authorize(requestContext, util.List.of(action)).asScala.head ==
AuthorizationResult.ALLOWED
}
private def getAcls(authorizer: Authorizer, resourcePattern:
ResourcePattern): Set[AccessControlEntry] = {
@@ -669,7 +669,7 @@ class AuthorizerTest extends QuorumTestHarness with
BaseAuthorizerTest {
pluginMetrics: PluginMetrics): Unit = {
standardAuthorizer.configure(configs)
standardAuthorizer.withPluginMetrics(pluginMetrics)
- initializeStandardAuthorizer(standardAuthorizer, new
AuthorizerTestServerInfo(Collections.singletonList(PLAINTEXT)))
+ initializeStandardAuthorizer(standardAuthorizer, new
AuthorizerTestServerInfo(util.List.of(PLAINTEXT)))
}
def initializeStandardAuthorizer(standardAuthorizer: StandardAuthorizer,
diff --git a/core/src/test/scala/unit/kafka/tools/DumpLogSegmentsTest.scala
b/core/src/test/scala/unit/kafka/tools/DumpLogSegmentsTest.scala
index 67df4f1a7e6..336a8dd55c3 100644
--- a/core/src/test/scala/unit/kafka/tools/DumpLogSegmentsTest.scala
+++ b/core/src/test/scala/unit/kafka/tools/DumpLogSegmentsTest.scala
@@ -485,7 +485,7 @@ class DumpLogSegmentsTest {
new TopicRecord().setName("test-topic").setTopicId(Uuid.randomUuid()),
0.toShort),
new ApiMessageAndVersion(
new PartitionChangeRecord().setTopicId(Uuid.randomUuid()).setLeader(1).
- setPartitionId(0).setIsr(util.Arrays.asList(0, 1, 2)), 0.toShort)
+ setPartitionId(0).setIsr(util.List.of(0, 1, 2)), 0.toShort)
)
val records: Array[SimpleRecord] = metadataRecords.map(message => {
@@ -565,7 +565,7 @@ class DumpLogSegmentsTest {
@Test
def testDumpMetadataSnapshot(): Unit = {
- val metadataRecords = Seq(
+ val metadataRecords = util.List.of(
new ApiMessageAndVersion(
new RegisterBrokerRecord().setBrokerId(0).setBrokerEpoch(10),
0.toShort),
new ApiMessageAndVersion(
@@ -574,7 +574,7 @@ class DumpLogSegmentsTest {
new TopicRecord().setName("test-topic").setTopicId(Uuid.randomUuid()),
0.toShort),
new ApiMessageAndVersion(
new PartitionChangeRecord().setTopicId(Uuid.randomUuid()).setLeader(1).
- setPartitionId(0).setIsr(util.Arrays.asList(0, 1, 2)), 0.toShort)
+ setPartitionId(0).setIsr(util.List.of(0, 1, 2)), 0.toShort)
)
val metadataLog = KafkaMetadataLog(
@@ -603,7 +603,7 @@ class DumpLogSegmentsTest {
.setVoterSet(Optional.of(VoterSetTest.voterSet(VoterSetTest.voterMap(IntStream.of(1,
2, 3), true))))
.build(MetadataRecordSerde.INSTANCE)
) { snapshotWriter =>
- snapshotWriter.append(metadataRecords.asJava)
+ snapshotWriter.append(metadataRecords)
snapshotWriter.freeze()
}
@@ -662,14 +662,14 @@ class DumpLogSegmentsTest {
// Get all the batches
val output = runDumpLogSegments(Array("--files", logFilePath))
- val lines = util.Arrays.asList(output.split("\n"): _*).listIterator()
+ val lines = util.List.of(output.split("\n"): _*).listIterator()
// Get total bytes of the partial batches
val partialBatchesBytes = readPartialBatchesBytes(lines, partialBatches)
// Request only the partial batches by bytes
val partialOutput = runDumpLogSegments(Array("--max-bytes",
partialBatchesBytes.toString, "--files", logFilePath))
- val partialLines = util.Arrays.asList(partialOutput.split("\n"):
_*).listIterator()
+ val partialLines = util.List.of(partialOutput.split("\n"):
_*).listIterator()
// Count the total of partial batches limited by bytes
val partialBatchesCount = countBatches(partialLines)
@@ -922,14 +922,14 @@ class DumpLogSegmentsTest {
.setProducerEpoch(2.toShort)
.setProducerId(12L)
.setTransactionLastUpdateTimestampMs(123L)
- .setTransactionPartitions(List(
+ .setTransactionPartitions(util.List.of(
new TransactionLogValue.PartitionsSchema()
.setTopic("topic1")
- .setPartitionIds(List(0, 1, 2).map(Integer.valueOf).asJava),
+ .setPartitionIds(util.List.of[Integer](0, 1, 2)),
new TransactionLogValue.PartitionsSchema()
.setTopic("topic2")
- .setPartitionIds(List(3, 4, 5).map(Integer.valueOf).asJava)
- ).asJava)
+ .setPartitionIds(util.List.of[Integer](3, 4, 5))
+ ))
.setTransactionStartTimestampMs(13L)
.setTransactionStatus(0)
.setTransactionTimeoutMs(14),
@@ -1024,7 +1024,7 @@ class DumpLogSegmentsTest {
)
val output = runDumpLogSegments(Array("--deep-iteration", "--files",
logFilePath))
- val lines = util.Arrays.asList(output.split("\n"): _*).listIterator()
+ val lines = util.List.of(output.split("\n"): _*).listIterator()
for (batch <- logReadInfo.records.batches.asScala) {
val parsedBatchOpt = readBatchMetadata(lines)
@@ -1101,13 +1101,13 @@ class DumpLogSegmentsTest {
.setStartOffset(0)
.setCreateTimestamp(timestamp)
.setWriteTimestamp(timestamp)
- .setStateBatches(List[ShareSnapshotValue.StateBatch](
+ .setStateBatches(util.List.of[ShareSnapshotValue.StateBatch](
new ShareSnapshotValue.StateBatch()
.setFirstOffset(0)
.setLastOffset(4)
.setDeliveryState(2)
.setDeliveryCount(1)
- ).asJava),
+ )),
0.toShort)
))
)
@@ -1127,13 +1127,13 @@ class DumpLogSegmentsTest {
.setSnapshotEpoch(0)
.setLeaderEpoch(0)
.setStartOffset(0)
- .setStateBatches(List[ShareUpdateValue.StateBatch](
+ .setStateBatches(util.List.of[ShareUpdateValue.StateBatch](
new ShareUpdateValue.StateBatch()
.setFirstOffset(0)
.setLastOffset(4)
.setDeliveryState(2)
.setDeliveryCount(1)
- ).asJava),
+ )),
0.toShort)
))
)
diff --git a/core/src/test/scala/unit/kafka/tools/StorageToolTest.scala
b/core/src/test/scala/unit/kafka/tools/StorageToolTest.scala
index 42e262a6858..28b132243e7 100644
--- a/core/src/test/scala/unit/kafka/tools/StorageToolTest.scala
+++ b/core/src/test/scala/unit/kafka/tools/StorageToolTest.scala
@@ -79,7 +79,7 @@ class StorageToolTest {
val tempDir = TestUtils.tempDir()
try {
assertEquals(1, StorageTool.
- infoCommand(new PrintStream(stream), true, Seq(tempDir.toString)))
+ infoCommand(new PrintStream(stream), kraftMode = true,
Seq(tempDir.toString)))
assertEquals(s"""Found log directory:
${tempDir.toString}
@@ -97,7 +97,7 @@ Found problem:
tempDir.delete()
try {
assertEquals(1, StorageTool.
- infoCommand(new PrintStream(stream), true, Seq(tempDir.toString)))
+ infoCommand(new PrintStream(stream), kraftMode = true,
Seq(tempDir.toString)))
assertEquals(s"""Found problem:
${tempDir.toString} does not exist
@@ -111,7 +111,7 @@ Found problem:
val tempFile = TestUtils.tempFile()
try {
assertEquals(1, StorageTool.
- infoCommand(new PrintStream(stream), true, Seq(tempFile.toString)))
+ infoCommand(new PrintStream(stream), kraftMode = true,
Seq(tempFile.toString)))
assertEquals(s"""Found problem:
${tempFile.toString} is not a directory
@@ -125,13 +125,13 @@ Found problem:
val tempDir = TestUtils.tempDir()
try {
Files.write(tempDir.toPath.resolve(MetaPropertiesEnsemble.META_PROPERTIES_NAME),
- String.join("\n", util.Arrays.asList(
+ String.join("\n", util.List.of(
"version=1",
"node.id=1",
"cluster.id=XcZZOzUqS4yHOjhMQB6JLQ")).
getBytes(StandardCharsets.UTF_8))
assertEquals(1, StorageTool.
- infoCommand(new PrintStream(stream), false, Seq(tempDir.toString)))
+ infoCommand(new PrintStream(stream), kraftMode = false,
Seq(tempDir.toString)))
assertEquals(s"""Found log directory:
${tempDir.toString}
@@ -150,13 +150,13 @@ Found problem:
val tempDir = TestUtils.tempDir()
try {
Files.write(tempDir.toPath.resolve(MetaPropertiesEnsemble.META_PROPERTIES_NAME),
- String.join("\n", util.Arrays.asList(
+ String.join("\n", util.List.of(
"version=0",
"broker.id=1",
"cluster.id=26c36907-4158-4a35-919d-6534229f5241")).
getBytes(StandardCharsets.UTF_8))
assertEquals(1, StorageTool.
- infoCommand(new PrintStream(stream), true, Seq(tempDir.toString)))
+ infoCommand(new PrintStream(stream), kraftMode = true,
Seq(tempDir.toString)))
assertEquals(s"""Found log directory:
${tempDir.toString}
@@ -193,7 +193,7 @@ Found problem:
): Int = {
val tempDir = TestUtils.tempDir()
try {
- val configPathString = new File(tempDir.getAbsolutePath(),
"format.props").toString
+ val configPathString = new File(tempDir.getAbsolutePath,
"format.props").toString
PropertiesUtils.writePropertiesFile(properties, configPathString, true)
val arguments = ListBuffer[String]("format",
"--cluster-id", "XcZZOzUqS4yHOjhMQB6JLQ")
@@ -234,7 +234,7 @@ Found problem:
val unavailableDir1 = TestUtils.tempFile()
val properties = new Properties()
properties.putAll(defaultStaticQuorumProperties)
- properties.setProperty("log.dirs", s"${availableDir1},${unavailableDir1}")
+ properties.setProperty("log.dirs", s"$availableDir1,$unavailableDir1")
val stream = new ByteArrayOutputStream()
assertEquals(0, runFormatCommand(stream, properties))
@@ -273,7 +273,7 @@ Found problem:
assertEquals(0, runFormatCommand(stream, properties))
properties.setProperty("log.dirs", availableDirs.mkString(","))
val stream2 = new ByteArrayOutputStream()
- assertEquals(0, runFormatCommand(stream2, properties, Seq(), true))
+ assertEquals(0, runFormatCommand(stream2, properties, Seq(),
ignoreFormatted = true))
}
@Test
@@ -282,7 +282,7 @@ Found problem:
val unavailableDir2 = TestUtils.tempFile()
val properties = new Properties()
properties.putAll(defaultStaticQuorumProperties)
- properties.setProperty("log.dirs",
s"${unavailableDir1},${unavailableDir2}")
+ properties.setProperty("log.dirs", s"$unavailableDir1,$unavailableDir2")
val stream = new ByteArrayOutputStream()
assertEquals("No available log directories to format.",
assertThrows(classOf[FormatterException],
() => runFormatCommand(stream, properties)).getMessage)
diff --git a/core/src/test/scala/unit/kafka/utils/TestUtils.scala
b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
index 23eeb7e9926..dd69155097b 100755
--- a/core/src/test/scala/unit/kafka/utils/TestUtils.scala
+++ b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
@@ -72,7 +72,7 @@ import java.time.Duration
import java.util
import java.util.concurrent._
import java.util.concurrent.atomic.AtomicBoolean
-import java.util.{Collections, Optional, Properties}
+import java.util.{Optional, Properties}
import scala.collection.mutable.ArrayBuffer
import scala.collection.{Map, Seq, mutable}
import scala.concurrent.duration.FiniteDuration
@@ -339,7 +339,7 @@ object TestUtils extends Logging {
topicConfig.forEach((k, v) => configsMap.put(k.toString, v.toString))
val result = if (replicaAssignment.isEmpty) {
- admin.createTopics(Collections.singletonList(new NewTopic(
+ admin.createTopics(util.List.of(new NewTopic(
topic, numPartitions, replicationFactor.toShort).configs(configsMap)))
} else {
val assignment = new util.HashMap[Integer, util.List[Integer]]()
@@ -348,7 +348,7 @@ object TestUtils extends Logging {
v.foreach(r => replicas.add(r.asInstanceOf[Integer]))
assignment.put(k.asInstanceOf[Integer], replicas)
}
- admin.createTopics(Collections.singletonList(new NewTopic(
+ admin.createTopics(util.List.of(new NewTopic(
topic, assignment).configs(configsMap)))
}
@@ -410,7 +410,7 @@ object TestUtils extends Logging {
topic: String
): TopicDescription = {
val describedTopics = admin.describeTopics(
- Collections.singleton(topic)
+ util.Set.of(topic)
).allTopicNames().get()
describedTopics.get(topic)
}
@@ -466,7 +466,7 @@ object TestUtils extends Logging {
controllers: Seq[ControllerServer]
): Unit = {
try {
- admin.deleteTopics(Collections.singletonList(topic)).all().get()
+ admin.deleteTopics(util.List.of(topic)).all().get()
} catch {
case e: ExecutionException if e.getCause != null &&
e.getCause.isInstanceOf[UnknownTopicOrPartitionException] =>
@@ -589,7 +589,7 @@ object TestUtils extends Logging {
newLeaderOpt: Option[Int] = None
): Int = {
def getPartitionLeader(topic: String, partition: Int): Option[Int] = {
-
admin.describeTopics(Collections.singletonList(topic)).allTopicNames().get().get(topic).partitions().asScala.
+
admin.describeTopics(util.List.of(topic)).allTopicNames().get().get(topic).partitions().asScala.
find(_.partition() == partition).
flatMap { p =>
if (p.leader().id() == Node.noNode().id()) {
@@ -821,7 +821,7 @@ object TestUtils extends Logging {
waitUntilTrue(
() => brokers.forall { broker =>
if (expectedNumPartitions == 0) {
- broker.metadataCache.numPartitions(topic).isEmpty()
+ broker.metadataCache.numPartitions(topic).isEmpty
} else {
broker.metadataCache.numPartitions(topic).orElse(null) ==
expectedNumPartitions
}
@@ -1098,7 +1098,7 @@ object TestUtils extends Logging {
waitUntilTrue(() => brokers.forall(broker =>
broker.config.logDirs.stream().allMatch { logDir =>
topicPartitions.forall { tp =>
- !util.Arrays.asList(new File(logDir).list()).asScala.exists {
partitionDirectoryNames =>
+ !util.List.of(new File(logDir).list()).asScala.exists {
partitionDirectoryNames =>
partitionDirectoryNames.exists { directoryName =>
directoryName.startsWith(tp.topic + "-" + tp.partition) &&
directoryName.endsWith(UnifiedLog.DELETE_DIR_SUFFIX)
@@ -1155,7 +1155,7 @@ object TestUtils extends Logging {
securityProtocol = securityProtocol,
trustStoreFile = trustStoreFile)
try {
- consumer.subscribe(Collections.singleton(topic))
+ consumer.subscribe(util.Set.of(topic))
consumeRecords(consumer, numMessages, waitTime)
} finally consumer.close()
}
@@ -1253,7 +1253,7 @@ object TestUtils extends Logging {
else
abortedValue
}
- new ProducerRecord[Array[Byte], Array[Byte]](topic, partition, key, value,
Collections.singleton(header))
+ new ProducerRecord[Array[Byte], Array[Byte]](topic, partition, key, value,
util.Set.of(header))
}
def producerRecordWithExpectedTransactionStatus(topic: String, partition:
Integer, key: String, value: String, willBeCommitted: Boolean):
ProducerRecord[Array[Byte], Array[Byte]] = {
@@ -1276,7 +1276,7 @@ object TestUtils extends Logging {
if (committed.contains(topicPartition))
consumer.seek(topicPartition, committed(topicPartition))
else
- consumer.seekToBeginning(Collections.singletonList(topicPartition))
+ consumer.seekToBeginning(util.List.of(topicPartition))
}
}
@@ -1293,7 +1293,7 @@ object TestUtils extends Logging {
(resource, configEntries)
}.toMap.asJava
} else {
- Map(new ConfigResource(ConfigResource.Type.BROKER, "") ->
configEntries).asJava
+ util.Map.of(new ConfigResource(ConfigResource.Type.BROKER, ""),
configEntries)
}
adminClient.incrementalAlterConfigs(configs)
}
@@ -1322,7 +1322,7 @@ object TestUtils extends Logging {
val partitionId = topicPartition.partition
def currentLeader: Try[Option[Int]] = Try {
- val topicDescription =
client.describeTopics(List(topic).asJava).allTopicNames.get.get(topic)
+ val topicDescription =
client.describeTopics(util.List.of(topic)).allTopicNames.get.get(topic)
topicDescription.partitions.asScala
.find(_.partition == partitionId)
.flatMap(partitionState => Option(partitionState.leader))
@@ -1356,7 +1356,7 @@ object TestUtils extends Logging {
}
def currentIsr(admin: Admin, partition: TopicPartition): Set[Int] = {
- val description = admin.describeTopics(Set(partition.topic).asJava)
+ val description = admin.describeTopics(util.Set.of(partition.topic))
.allTopicNames
.get
.asScala