FrankYang0529 commented on code in PR #19579:
URL: https://github.com/apache/kafka/pull/19579#discussion_r2063791648
##########
core/src/main/scala/kafka/server/BrokerServer.scala:
##########
@@ -709,7 +709,7 @@ class BrokerServer(
None
}
- val rlm = new RemoteLogManager(config.remoteLogManagerConfig, config.brokerId, config.logDirs.head, clusterId, time,
+ val rlm = new RemoteLogManager(config.remoteLogManagerConfig, config.brokerId, config.logDirs.asScala.head, clusterId, time,
Review Comment:
It looks like we can use `getFirst` to avoid redundant conversion.
```suggestion
val rlm = new RemoteLogManager(config.remoteLogManagerConfig, config.brokerId, config.logDirs.getFirst, clusterId, time,
```
##########
core/src/test/scala/integration/kafka/server/KRaftClusterTest.scala:
##########
@@ -1551,7 +1551,7 @@ class KRaftClusterTest {
// Copy foo-0 to targetParentDir
// This is so that we can rename the main replica to a future down below
val parentDir = log.parentDir
- val targetParentDir = broker0.config.logDirs.filter(_ != parentDir).head
+ val targetParentDir = broker0.config.logDirs.asScala.filter(_ != parentDir).head
val targetDirFile = new File(targetParentDir, log.dir.getName)
Review Comment:
```suggestion
val targetParentDir = broker0.config.logDirs.stream().filter(l => !l.equals(parentDir)).findFirst()
assertTrue(targetParentDir.isPresent)
val targetDirFile = new File(targetParentDir.get(), log.dir.getName)
```
##########
core/src/main/scala/kafka/server/KafkaConfig.scala:
##########
@@ -241,7 +241,7 @@ class KafkaConfig private(doLog: Boolean, val props: util.Map[_, _])
def metadataLogDir: String = {
Option(getString(MetadataLogConfig.METADATA_LOG_DIR_CONFIG)) match {
case Some(dir) => dir
- case None => logDirs.head
+ case None => logDirs.asScala.head
Review Comment:
ditto: `getFirst` works here as well.
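For concreteness, a minimal sketch of that change (a hypothetical standalone method, not the actual `KafkaConfig` code, assuming `logDirs` is a `java.util.List[String]` on a JDK that provides `getFirst`):
```scala
// Hedged sketch only: mirrors metadataLogDir's fallback branch, with getFirst
// replacing asScala.head.
def metadataLogDir(configuredDir: Option[String], logDirs: java.util.List[String]): String =
  configuredDir match {
    case Some(dir) => dir
    case None      => logDirs.getFirst
  }
```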
##########
core/src/test/scala/unit/kafka/server/AlterReplicaLogDirsRequestTest.scala:
##########
@@ -91,10 +91,10 @@ class AlterReplicaLogDirsRequestTest extends BaseRequestTest {
@ParameterizedTest
@ValueSource(strings = Array("kraft"))
def testAlterReplicaLogDirsRequestErrorCode(quorum: String): Unit = {
- val offlineDir = new File(brokers.head.config.logDirs.tail.head).getAbsolutePath
- val validDir1 = new File(brokers.head.config.logDirs(1)).getAbsolutePath
- val validDir2 = new File(brokers.head.config.logDirs(2)).getAbsolutePath
- val validDir3 = new File(brokers.head.config.logDirs(3)).getAbsolutePath
+ val offlineDir = new File(brokers.head.config.logDirs.asScala.tail.head).getAbsolutePath
Review Comment:
```suggestion
val offlineDir = new File(brokers.head.config.logDirs.getLast).getAbsolutePath
```
##########
core/src/test/scala/unit/kafka/server/DescribeLogDirsRequestTest.scala:
##########
@@ -42,8 +42,8 @@ class DescribeLogDirsRequestTest extends BaseRequestTest {
@ParameterizedTest
@ValueSource(strings = Array("kraft"))
def testDescribeLogDirsRequest(quorum: String): Unit = {
- val onlineDir = new File(brokers.head.config.logDirs.head).getAbsolutePath
- val offlineDir = new File(brokers.head.config.logDirs.tail.head).getAbsolutePath
+ val onlineDir = new File(brokers.head.config.logDirs.asScala.head).getAbsolutePath
+ val offlineDir = new File(brokers.head.config.logDirs.asScala.tail.head).getAbsolutePath
Review Comment:
```suggestion
val onlineDir = new File(brokers.head.config.logDirs.getFirst).getAbsolutePath
val offlineDir = new File(brokers.head.config.logDirs.getLast).getAbsolutePath
```
##########
core/src/main/scala/kafka/server/ReplicaManager.scala:
##########
@@ -1237,7 +1237,7 @@ class ReplicaManager(val config: KafkaConfig,
def describeLogDirs(partitions: Set[TopicPartition]): List[DescribeLogDirsResponseData.DescribeLogDirsResult] = {
val logsByDir = logManager.allLogs.groupBy(log => log.parentDir)
- config.logDirs.toSet.map { logDir: String =>
+ config.logDirs.asScala.toSet.map { logDir: String =>
Review Comment:
Can we change `describeLogDirs` to return `java.util.List`, so we don't need to convert `logDirs` to Scala here and then convert the result back to Java in `KafkaApis`?
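If that direction makes sense, here is a rough sketch of the shape it could take (a simplified, hypothetical stand-in: the real method builds `DescribeLogDirsResult` objects rather than strings; the point is only that a Java stream can produce the `java.util.List` directly, with no `asScala`/`asJava` round trip):
```scala
import java.util
import java.util.stream.Collectors

// Hypothetical simplification of describeLogDirs: consume config.logDirs as a
// java.util.List and return a java.util.List, skipping Scala conversions.
def describeLogDirs(logDirs: util.List[String]): util.List[String] =
  logDirs.stream()
    .distinct()                                   // stands in for the original .toSet de-duplication
    .map[String](logDir => s"result for $logDir") // placeholder for the per-dir DescribeLogDirsResult
    .collect(Collectors.toList[String]())
```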
##########
core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala:
##########
@@ -1500,7 +1500,7 @@ class KafkaConfigTest {
val config = KafkaConfig.fromProps(props)
assertEquals(metadataDir, config.metadataLogDir)
- assertEquals(Seq(dataDir), config.logDirs)
+ assertEquals(Seq(dataDir), config.logDirs.asScala)
Review Comment:
```suggestion
assertEquals(util.List.of(dataDir), config.logDirs)
```
##########
core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala:
##########
@@ -1165,7 +1165,7 @@ class KafkaConfigTest {
assertEquals(1, config.brokerId)
assertEquals(Seq("PLAINTEXT://127.0.0.1:1122"),
config.effectiveAdvertisedBrokerListeners.map(JTestUtils.endpointToString))
assertEquals(Map("127.0.0.1" -> 2, "127.0.0.2" -> 3),
config.maxConnectionsPerIpOverrides)
- assertEquals(List("/tmp1", "/tmp2"), config.logDirs)
+ assertEquals(List("/tmp1", "/tmp2"), config.logDirs.asScala.toList)
Review Comment:
```suggestion
assertEquals(util.List.of("/tmp1", "/tmp2"), config.logDirs)
```
##########
core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala:
##########
@@ -1518,7 +1518,7 @@ class KafkaConfigTest {
val config = KafkaConfig.fromProps(props)
assertEquals(dataDir1, config.metadataLogDir)
- assertEquals(Seq(dataDir1, dataDir2), config.logDirs)
+ assertEquals(Seq(dataDir1, dataDir2), config.logDirs.asScala)
Review Comment:
```suggestion
assertEquals(util.List.of(dataDir1, dataDir2), config.logDirs)
```
##########
core/src/test/scala/unit/kafka/server/LogRecoveryTest.scala:
##########
@@ -62,8 +64,8 @@ class LogRecoveryTest extends QuorumTestHarness {
var admin: Admin = _
var producer: KafkaProducer[Integer, String] = _
- def hwFile1 = new OffsetCheckpointFile(new File(configProps1.logDirs.head, ReplicaManager.HighWatermarkFilename), null)
- def hwFile2 = new OffsetCheckpointFile(new File(configProps2.logDirs.head, ReplicaManager.HighWatermarkFilename), null)
+ def hwFile1 = new OffsetCheckpointFile(new File(configProps1.logDirs.asScala.head, ReplicaManager.HighWatermarkFilename), null)
+ def hwFile2 = new OffsetCheckpointFile(new File(configProps2.logDirs.asScala.head, ReplicaManager.HighWatermarkFilename), null)
Review Comment:
```suggestion
def hwFile1 = new OffsetCheckpointFile(new File(configProps1.logDirs.getFirst, ReplicaManager.HighWatermarkFilename), null)
def hwFile2 = new OffsetCheckpointFile(new File(configProps2.logDirs.getFirst, ReplicaManager.HighWatermarkFilename), null)
```
##########
jmh-benchmarks/src/main/java/org/apache/kafka/jmh/server/PartitionCreationBench.java:
##########
@@ -115,8 +115,7 @@ public void setup() {
this.time = Time.SYSTEM;
this.failureChannel = new LogDirFailureChannel(brokerProperties.logDirs().size());
final BrokerTopicStats brokerTopicStats = new BrokerTopicStats(false);
- final List<File> files =
- CollectionConverters.asJava(brokerProperties.logDirs()).stream().map(File::new).collect(Collectors.toList());
+ final List<File> files = brokerProperties.logDirs().stream().map(File::new).collect(Collectors.toList());
Review Comment:
```suggestion
final List<File> files = brokerProperties.logDirs().stream().map(File::new).toList();
```
##########
jmh-benchmarks/src/main/java/org/apache/kafka/jmh/server/CheckpointBench.java:
##########
@@ -108,8 +108,7 @@ public void setup() {
this.metrics = new Metrics();
this.time = new MockTime();
this.failureChannel = new LogDirFailureChannel(brokerProperties.logDirs().size());
- final List<File> files =
- CollectionConverters.asJava(brokerProperties.logDirs()).stream().map(File::new).collect(Collectors.toList());
+ final List<File> files = brokerProperties.logDirs().stream().map(File::new).collect(Collectors.toList());
Review Comment:
```suggestion
final List<File> files = brokerProperties.logDirs().stream().map(File::new).toList();
```
##########
core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala:
##########
@@ -2771,7 +2771,7 @@ class ReplicaManagerTest {
props.asScala ++= extraProps.asScala
val config = KafkaConfig.fromProps(props)
val logConfig = new LogConfig(new Properties)
- val logDir = new File(new File(config.logDirs.head), s"$topic-$topicPartition")
+ val logDir = new File(new File(config.logDirs.asScala.head), s"$topic-$topicPartition")
Review Comment:
```suggestion
val logDir = new File(new File(config.logDirs.getFirst), s"$topic-$topicPartition")
```
##########
core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala:
##########
@@ -214,7 +214,7 @@ class ReplicaManagerTest {
partition.createLogIfNotExists(isNew = false, isFutureReplica = false,
new LazyOffsetCheckpoints(rm.highWatermarkCheckpoints.asJava), None)
rm.checkpointHighWatermarks()
- config.logDirs.map(s => Paths.get(s, ReplicaManager.HighWatermarkFilename))
+ config.logDirs.asScala.map(s => Paths.get(s, ReplicaManager.HighWatermarkFilename))
.foreach(checkpointFile => assertTrue(Files.exists(checkpointFile),
Review Comment:
```suggestion
config.logDirs.stream().map(s => Paths.get(s, ReplicaManager.HighWatermarkFilename))
.forEach(checkpointFile => assertTrue(Files.exists(checkpointFile),
```
##########
core/src/test/scala/unit/kafka/server/HighwatermarkPersistenceTest.scala:
##########
@@ -195,7 +197,7 @@ class HighwatermarkPersistenceTest {
}
private def hwmFor(replicaManager: ReplicaManager, topic: String, partition: Int): Long = {
- replicaManager.highWatermarkCheckpoints(new File(replicaManager.config.logDirs.head).getAbsolutePath).read().getOrDefault(
+ replicaManager.highWatermarkCheckpoints(new File(replicaManager.config.logDirs.asScala.head).getAbsolutePath).read().getOrDefault(
Review Comment:
```suggestion
replicaManager.highWatermarkCheckpoints(new File(replicaManager.config.logDirs.getFirst).getAbsolutePath).read().getOrDefault(
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]