ijuma commented on code in PR #14034: URL: https://github.com/apache/kafka/pull/14034#discussion_r1448871794
########## core/src/main/scala/kafka/log/LogLoader.scala: ########## @@ -489,16 +488,16 @@ class LogLoader( * * @param segmentsToDelete The log segments to schedule for deletion */ - private def removeAndDeleteSegmentsAsync(segmentsToDelete: Iterable[LogSegment]): Unit = { - if (segmentsToDelete.nonEmpty) { + private def removeAndDeleteSegmentsAsync(segmentsToDelete: java.util.Collection[LogSegment]): Unit = { + if (!segmentsToDelete.isEmpty) { // Most callers hold an iterator into the `params.segments` collection and // `removeAndDeleteSegmentAsync` mutates it by removing the deleted segment. Therefore, // we should force materialization of the iterator here, so that results of the iteration // remain valid and deterministic. We should also pass only the materialized view of the // iterator to the logic that deletes the segments. - val toDelete = segmentsToDelete.toList - info(s"Deleting segments as part of log recovery: ${toDelete.mkString(",")}") - toDelete.foreach { segment => + val toDelete = new util.ArrayList[LogSegment](segmentsToDelete) + info(s"Deleting segments as part of log recovery: ${LocalLog.mkString(toDelete.iterator(), ",")}") Review Comment: Can you not use `Utils.join` instead of creating a new implementation here? 
########## core/src/main/scala/kafka/log/UnifiedLog.scala: ########## @@ -2246,30 +2251,31 @@ object UnifiedLog extends Logging { config: LogConfig, scheduler: Scheduler, logDirFailureChannel: LogDirFailureChannel, - logPrefix: String): SplitSegmentResult = { - LocalLog.splitOverflowedSegment(segment, existingSegments, dir, topicPartition, config, scheduler, logDirFailureChannel, logPrefix) + logger: Logger): SplitSegmentResult = { + LocalLog.splitOverflowedSegment(segment, existingSegments, dir, topicPartition, config, scheduler, logDirFailureChannel, logger) } - private[log] def deleteProducerSnapshots(segments: Iterable[LogSegment], + private[log] def deleteProducerSnapshots(segments: util.Collection[LogSegment], producerStateManager: ProducerStateManager, asyncDelete: Boolean, scheduler: Scheduler, config: LogConfig, logDirFailureChannel: LogDirFailureChannel, parentDir: String, topicPartition: TopicPartition): Unit = { - val snapshotsToDelete = segments.flatMap { segment => - producerStateManager.removeAndMarkSnapshotForDeletion(segment.baseOffset).asScala - } + val snapshotsToDelete = segments.stream().flatMap { segment => + val snapshotOptional = producerStateManager.removeAndMarkSnapshotForDeletion(segment.baseOffset) + if (snapshotOptional.isPresent) util.stream.Stream.of[SnapshotFile](snapshotOptional.get) else util.stream.Stream.empty[SnapshotFile] + }.collect(Collectors.toList[SnapshotFile]) def deleteProducerSnapshots(): Unit = { - LocalLog.maybeHandleIOException(logDirFailureChannel, - parentDir, - s"Error while deleting producer state snapshots for $topicPartition in dir $parentDir") { - snapshotsToDelete.foreach { snapshot => - snapshot.deleteIfExists() - } - } + LocalLog.maybeHandleIOException(logDirFailureChannel, parentDir, + () => s"Error while deleting producer state snapshots for $topicPartition in dir $parentDir", + () => { Review Comment: I don't think you need the block since there is only one expression inside it. 
########## core/src/test/scala/unit/kafka/log/UnifiedLogTest.scala: ########## @@ -3971,10 +3973,10 @@ class UnifiedLogTest { )) log.appendAsLeader(records, leaderEpoch = 0) log.maybeIncrementHighWatermark(log.logEndOffsetMetadata) - val deletable = log.deletableSegments((_: LogSegment, _: Option[LogSegment]) => true) - val expected = log.logSegments.asScala.toList - assertEquals(10, expected.length) - assertEquals(expected, deletable.toList) + val deletable = new util.ArrayList(log.deletableSegments((_: LogSegment, _: Optional[LogSegment]) => true)) + val expected = new util.ArrayList(log.logSegments) Review Comment: Are these copies needed? ########## core/src/test/scala/unit/kafka/log/LocalLogTest.scala: ########## @@ -362,8 +363,8 @@ class LocalLogTest { } assertEquals(5, log.segments.numberOfSegments) assertNotEquals(10L, log.segments.activeSegment.baseOffset) - val expected = log.segments.values.asScala.toVector - val deleted = log.truncateFullyAndStartAt(10L) + val expected = new util.ArrayList(log.segments.values) + val deleted = new util.ArrayList(log.truncateFullyAndStartAt(10L)) Review Comment: Why is this needed? ########## core/src/main/scala/kafka/log/LogCleaner.scala: ########## @@ -1245,8 +1246,8 @@ private[log] class CleanedTransactionMetadata { * * @param abortedTransactions The new found aborted transactions to add */ - def addAbortedTransactions(abortedTransactions: List[AbortedTxn]): Unit = { - this.abortedTransactions ++= abortedTransactions + def addAbortedTransactions(abortedTransactions: util.List[AbortedTxn]): Unit = { + abortedTransactions.forEach(abortedTxn => this.abortedTransactions += abortedTxn) Review Comment: Can this be `this.abortedTransactions ++= abortedTransactions.asScala`? Adding in bulk is usually more efficient. 
########## core/src/test/scala/unit/kafka/log/UnifiedLogTest.scala: ########## @@ -4015,7 +4017,7 @@ class UnifiedLogTest { true }) assertEquals(10L, log.logSegments.size()) - assertEquals(log.nonActiveLogSegmentsFrom(0L).asScala.toSeq, deletableSegments.toSeq) + assertEquals(new util.ArrayList(log.nonActiveLogSegmentsFrom(0L)), deletableSegments) Review Comment: Is this copy needed? ########## core/src/test/scala/unit/kafka/log/UnifiedLogTest.scala: ########## @@ -3951,18 +3953,18 @@ class UnifiedLogTest { assertEquals(10, log.logSegments.size()) { - val deletable = log.deletableSegments( - (segment: LogSegment, _: Option[LogSegment]) => segment.baseOffset <= 5) - val expected = log.nonActiveLogSegmentsFrom(0L).asScala.filter(segment => segment.baseOffset <= 5).toList - assertEquals(6, expected.length) - assertEquals(expected, deletable.toList) + val deletable = new util.ArrayList( + log.deletableSegments((segment: LogSegment, _: Optional[LogSegment]) => segment.baseOffset <= 5)) + val expected = log.nonActiveLogSegmentsFrom(0L).stream().filter(segment => segment.baseOffset <= 5).collect(Collectors.toList()) + assertEquals(6, expected.size) + assertEquals(expected, deletable) } { - val deletable = log.deletableSegments((_: LogSegment, _: Option[LogSegment]) => true) - val expected = log.nonActiveLogSegmentsFrom(0L).asScala.toList - assertEquals(9, expected.length) - assertEquals(expected, deletable.toList) + val deletable = new util.ArrayList(log.deletableSegments((_: LogSegment, _: Optional[LogSegment]) => true)) + val expected = new util.ArrayList(log.nonActiveLogSegmentsFrom(0L)) Review Comment: Are these copies needed? 
########## core/src/main/scala/kafka/log/UnifiedLog.scala: ########## @@ -2246,30 +2251,31 @@ object UnifiedLog extends Logging { config: LogConfig, scheduler: Scheduler, logDirFailureChannel: LogDirFailureChannel, - logPrefix: String): SplitSegmentResult = { - LocalLog.splitOverflowedSegment(segment, existingSegments, dir, topicPartition, config, scheduler, logDirFailureChannel, logPrefix) + logger: Logger): SplitSegmentResult = { + LocalLog.splitOverflowedSegment(segment, existingSegments, dir, topicPartition, config, scheduler, logDirFailureChannel, logger) } - private[log] def deleteProducerSnapshots(segments: Iterable[LogSegment], + private[log] def deleteProducerSnapshots(segments: util.Collection[LogSegment], producerStateManager: ProducerStateManager, asyncDelete: Boolean, scheduler: Scheduler, config: LogConfig, logDirFailureChannel: LogDirFailureChannel, parentDir: String, topicPartition: TopicPartition): Unit = { - val snapshotsToDelete = segments.flatMap { segment => - producerStateManager.removeAndMarkSnapshotForDeletion(segment.baseOffset).asScala - } + val snapshotsToDelete = segments.stream().flatMap { segment => + val snapshotOptional = producerStateManager.removeAndMarkSnapshotForDeletion(segment.baseOffset) + if (snapshotOptional.isPresent) util.stream.Stream.of[SnapshotFile](snapshotOptional.get) else util.stream.Stream.empty[SnapshotFile] Review Comment: Nit: does `snapshotOptional.map(Stream::of).orElseGet(Stream::empty);` work? Also, it can probably be inlined (i.e. no need for the variable `snapshotOptional`). 
########## core/src/test/scala/unit/kafka/log/UnifiedLogTest.scala: ########## @@ -3951,18 +3953,18 @@ class UnifiedLogTest { assertEquals(10, log.logSegments.size()) { - val deletable = log.deletableSegments( - (segment: LogSegment, _: Option[LogSegment]) => segment.baseOffset <= 5) - val expected = log.nonActiveLogSegmentsFrom(0L).asScala.filter(segment => segment.baseOffset <= 5).toList - assertEquals(6, expected.length) - assertEquals(expected, deletable.toList) + val deletable = new util.ArrayList( Review Comment: Do we actually need this copy? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org For queries about this service, please contact Infrastructure at: users@infra.apache.org