kowshik commented on a change in pull request #10280: URL: https://github.com/apache/kafka/pull/10280#discussion_r643449279
########## File path: core/src/main/scala/kafka/log/Log.scala ########## @@ -1852,65 +1612,24 @@ class Log(@volatile private var _dir: File, logString.toString } - /** - * This method deletes the given log segments by doing the following for each of them: - * <ol> - * <li>It removes the segment from the segment map so that it will no longer be used for reads. - * <li>It renames the index and log files by appending .deleted to the respective file name - * <li>It can either schedule an asynchronous delete operation to occur in the future or perform the deletion synchronously - * </ol> - * Asynchronous deletion allows reads to happen concurrently without synchronization and without the possibility of - * physically deleting a file while it is being read. - * - * This method does not need to convert IOException to KafkaStorageException because it is either called before all logs are loaded - * or the immediate caller will catch and handle IOException - * - * @param segments The log segments to schedule for deletion - * @param asyncDelete Whether the segment files should be deleted asynchronously - */ - private def removeAndDeleteSegments(segments: Iterable[LogSegment], - asyncDelete: Boolean, - reason: SegmentDeletionReason): Unit = { - if (segments.nonEmpty) { - lock synchronized { - // As most callers hold an iterator into the `segments` collection and `removeAndDeleteSegment` mutates it by - // removing the deleted segment, we should force materialization of the iterator here, so that results of the - // iteration remain valid and deterministic. - val toDelete = segments.toList - reason.logReason(this, toDelete) - toDelete.foreach { segment => - this.segments.remove(segment.baseOffset) - } - deleteSegmentFiles(toDelete, asyncDelete) - } - } - } - - private def deleteSegmentFiles(segments: Iterable[LogSegment], asyncDelete: Boolean, deleteProducerStateSnapshots: Boolean = true): Unit = { - Log.deleteSegmentFiles(segments, asyncDelete, deleteProducerStateSnapshots, dir, topicPartition, - config, scheduler, logDirFailureChannel, producerStateManager, this.logIdent) - } - private[log] def replaceSegments(newSegments: Seq[LogSegment], oldSegments: Seq[LogSegment], isRecoveredSwapFile: Boolean = false): Unit = { Review comment: Great catch, I'll fix this. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org