ijuma commented on code in PR #14034:
URL: https://github.com/apache/kafka/pull/14034#discussion_r1448871794


##########
core/src/main/scala/kafka/log/LogLoader.scala:
##########
@@ -489,16 +488,16 @@ class LogLoader(
    *
    * @param segmentsToDelete The log segments to schedule for deletion
    */
-  private def removeAndDeleteSegmentsAsync(segmentsToDelete: 
Iterable[LogSegment]): Unit = {
-    if (segmentsToDelete.nonEmpty) {
+  private def removeAndDeleteSegmentsAsync(segmentsToDelete: 
java.util.Collection[LogSegment]): Unit = {
+    if (!segmentsToDelete.isEmpty) {
       // Most callers hold an iterator into the `params.segments` collection 
and
       // `removeAndDeleteSegmentAsync` mutates it by removing the deleted 
segment. Therefore,
       // we should force materialization of the iterator here, so that results 
of the iteration
       // remain valid and deterministic. We should also pass only the 
materialized view of the
       // iterator to the logic that deletes the segments.
-      val toDelete = segmentsToDelete.toList
-      info(s"Deleting segments as part of log recovery: 
${toDelete.mkString(",")}")
-      toDelete.foreach { segment =>
+      val toDelete = new util.ArrayList[LogSegment](segmentsToDelete)
+      info(s"Deleting segments as part of log recovery: 
${LocalLog.mkString(toDelete.iterator(), ",")}")

Review Comment:
   Could you use `Utils.join` instead of creating a new implementation here?



##########
core/src/main/scala/kafka/log/UnifiedLog.scala:
##########
@@ -2246,30 +2251,31 @@ object UnifiedLog extends Logging {
                                           config: LogConfig,
                                           scheduler: Scheduler,
                                           logDirFailureChannel: 
LogDirFailureChannel,
-                                          logPrefix: String): 
SplitSegmentResult = {
-    LocalLog.splitOverflowedSegment(segment, existingSegments, dir, 
topicPartition, config, scheduler, logDirFailureChannel, logPrefix)
+                                          logger: Logger): SplitSegmentResult 
= {
+    LocalLog.splitOverflowedSegment(segment, existingSegments, dir, 
topicPartition, config, scheduler, logDirFailureChannel, logger)
   }
 
-  private[log] def deleteProducerSnapshots(segments: Iterable[LogSegment],
+  private[log] def deleteProducerSnapshots(segments: 
util.Collection[LogSegment],
                                            producerStateManager: 
ProducerStateManager,
                                            asyncDelete: Boolean,
                                            scheduler: Scheduler,
                                            config: LogConfig,
                                            logDirFailureChannel: 
LogDirFailureChannel,
                                            parentDir: String,
                                            topicPartition: TopicPartition): 
Unit = {
-    val snapshotsToDelete = segments.flatMap { segment =>
-      
producerStateManager.removeAndMarkSnapshotForDeletion(segment.baseOffset).asScala
-    }
+    val snapshotsToDelete = segments.stream().flatMap { segment =>
+      val snapshotOptional = 
producerStateManager.removeAndMarkSnapshotForDeletion(segment.baseOffset)
+      if (snapshotOptional.isPresent) 
util.stream.Stream.of[SnapshotFile](snapshotOptional.get) else 
util.stream.Stream.empty[SnapshotFile]
+    }.collect(Collectors.toList[SnapshotFile])
 
     def deleteProducerSnapshots(): Unit = {
-      LocalLog.maybeHandleIOException(logDirFailureChannel,
-        parentDir,
-        s"Error while deleting producer state snapshots for $topicPartition in 
dir $parentDir") {
-        snapshotsToDelete.foreach { snapshot =>
-          snapshot.deleteIfExists()
-        }
-      }
+      LocalLog.maybeHandleIOException(logDirFailureChannel, parentDir,
+        () => s"Error while deleting producer state snapshots for 
$topicPartition in dir $parentDir",
+        () => {

Review Comment:
   I don't think you need the block since there is only one expression inside 
it.



##########
core/src/test/scala/unit/kafka/log/UnifiedLogTest.scala:
##########
@@ -3971,10 +3973,10 @@ class UnifiedLogTest {
       ))
       log.appendAsLeader(records, leaderEpoch = 0)
       log.maybeIncrementHighWatermark(log.logEndOffsetMetadata)
-      val deletable = log.deletableSegments((_: LogSegment, _: 
Option[LogSegment]) => true)
-      val expected = log.logSegments.asScala.toList
-      assertEquals(10, expected.length)
-      assertEquals(expected, deletable.toList)
+      val deletable = new util.ArrayList(log.deletableSegments((_: LogSegment, 
_: Optional[LogSegment]) => true))
+      val expected = new util.ArrayList(log.logSegments)

Review Comment:
   Are these copies needed?



##########
core/src/test/scala/unit/kafka/log/LocalLogTest.scala:
##########
@@ -362,8 +363,8 @@ class LocalLogTest {
     }
     assertEquals(5, log.segments.numberOfSegments)
     assertNotEquals(10L, log.segments.activeSegment.baseOffset)
-    val expected = log.segments.values.asScala.toVector
-    val deleted = log.truncateFullyAndStartAt(10L)
+    val expected = new util.ArrayList(log.segments.values)
+    val deleted = new util.ArrayList(log.truncateFullyAndStartAt(10L))

Review Comment:
   Why is this needed?



##########
core/src/main/scala/kafka/log/LogCleaner.scala:
##########
@@ -1245,8 +1246,8 @@ private[log] class CleanedTransactionMetadata {
    *
    * @param abortedTransactions The new found aborted transactions to add
    */
-  def addAbortedTransactions(abortedTransactions: List[AbortedTxn]): Unit = {
-    this.abortedTransactions ++= abortedTransactions
+  def addAbortedTransactions(abortedTransactions: util.List[AbortedTxn]): Unit 
= {
+    abortedTransactions.forEach(abortedTxn => this.abortedTransactions += 
abortedTxn)

Review Comment:
   Can this be `this.abortedTransactions ++= abortedTransactions.asScala`? 
Adding in bulk is usually more efficient.



##########
core/src/test/scala/unit/kafka/log/UnifiedLogTest.scala:
##########
@@ -4015,7 +4017,7 @@ class UnifiedLogTest {
         true
       })
     assertEquals(10L, log.logSegments.size())
-    assertEquals(log.nonActiveLogSegmentsFrom(0L).asScala.toSeq, 
deletableSegments.toSeq)
+    assertEquals(new util.ArrayList(log.nonActiveLogSegmentsFrom(0L)), 
deletableSegments)

Review Comment:
   Is this copy needed?



##########
core/src/test/scala/unit/kafka/log/UnifiedLogTest.scala:
##########
@@ -3951,18 +3953,18 @@ class UnifiedLogTest {
     assertEquals(10, log.logSegments.size())
 
     {
-      val deletable = log.deletableSegments(
-        (segment: LogSegment, _: Option[LogSegment]) => segment.baseOffset <= 
5)
-      val expected = log.nonActiveLogSegmentsFrom(0L).asScala.filter(segment 
=> segment.baseOffset <= 5).toList
-      assertEquals(6, expected.length)
-      assertEquals(expected, deletable.toList)
+      val deletable = new util.ArrayList(
+        log.deletableSegments((segment: LogSegment, _: Optional[LogSegment]) 
=> segment.baseOffset <= 5))
+      val expected = log.nonActiveLogSegmentsFrom(0L).stream().filter(segment 
=> segment.baseOffset <= 5).collect(Collectors.toList())
+      assertEquals(6, expected.size)
+      assertEquals(expected, deletable)
     }
 
     {
-      val deletable = log.deletableSegments((_: LogSegment, _: 
Option[LogSegment]) => true)
-      val expected = log.nonActiveLogSegmentsFrom(0L).asScala.toList
-      assertEquals(9, expected.length)
-      assertEquals(expected, deletable.toList)
+      val deletable = new util.ArrayList(log.deletableSegments((_: LogSegment, 
_: Optional[LogSegment]) => true))
+      val expected = new util.ArrayList(log.nonActiveLogSegmentsFrom(0L))

Review Comment:
   Are these copies needed?



##########
core/src/main/scala/kafka/log/UnifiedLog.scala:
##########
@@ -2246,30 +2251,31 @@ object UnifiedLog extends Logging {
                                           config: LogConfig,
                                           scheduler: Scheduler,
                                           logDirFailureChannel: 
LogDirFailureChannel,
-                                          logPrefix: String): 
SplitSegmentResult = {
-    LocalLog.splitOverflowedSegment(segment, existingSegments, dir, 
topicPartition, config, scheduler, logDirFailureChannel, logPrefix)
+                                          logger: Logger): SplitSegmentResult 
= {
+    LocalLog.splitOverflowedSegment(segment, existingSegments, dir, 
topicPartition, config, scheduler, logDirFailureChannel, logger)
   }
 
-  private[log] def deleteProducerSnapshots(segments: Iterable[LogSegment],
+  private[log] def deleteProducerSnapshots(segments: 
util.Collection[LogSegment],
                                            producerStateManager: 
ProducerStateManager,
                                            asyncDelete: Boolean,
                                            scheduler: Scheduler,
                                            config: LogConfig,
                                            logDirFailureChannel: 
LogDirFailureChannel,
                                            parentDir: String,
                                            topicPartition: TopicPartition): 
Unit = {
-    val snapshotsToDelete = segments.flatMap { segment =>
-      
producerStateManager.removeAndMarkSnapshotForDeletion(segment.baseOffset).asScala
-    }
+    val snapshotsToDelete = segments.stream().flatMap { segment =>
+      val snapshotOptional = 
producerStateManager.removeAndMarkSnapshotForDeletion(segment.baseOffset)
+      if (snapshotOptional.isPresent) 
util.stream.Stream.of[SnapshotFile](snapshotOptional.get) else 
util.stream.Stream.empty[SnapshotFile]

Review Comment:
   Nit: does `snapshotOptional.map(Stream::of).orElseGet(Stream::empty)` work? 
Also, it can probably be inlined (i.e. no need for the variable 
`snapshotOptional`).



##########
core/src/test/scala/unit/kafka/log/UnifiedLogTest.scala:
##########
@@ -3951,18 +3953,18 @@ class UnifiedLogTest {
     assertEquals(10, log.logSegments.size())
 
     {
-      val deletable = log.deletableSegments(
-        (segment: LogSegment, _: Option[LogSegment]) => segment.baseOffset <= 
5)
-      val expected = log.nonActiveLogSegmentsFrom(0L).asScala.filter(segment 
=> segment.baseOffset <= 5).toList
-      assertEquals(6, expected.length)
-      assertEquals(expected, deletable.toList)
+      val deletable = new util.ArrayList(

Review Comment:
   Do we actually need this copy?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org

Reply via email to