kowshik commented on a change in pull request #10280:
URL: https://github.com/apache/kafka/pull/10280#discussion_r643449279



##########
File path: core/src/main/scala/kafka/log/Log.scala
##########
@@ -1852,65 +1612,24 @@ class Log(@volatile private var _dir: File,
     logString.toString
   }
 
-  /**
-   * This method deletes the given log segments by doing the following for 
each of them:
-   * <ol>
-   *   <li>It removes the segment from the segment map so that it will no 
longer be used for reads.
-   *   <li>It renames the index and log files by appending .deleted to the 
respective file name
-   *   <li>It can either schedule an asynchronous delete operation to occur in 
the future or perform the deletion synchronously
-   * </ol>
-   * Asynchronous deletion allows reads to happen concurrently without 
synchronization and without the possibility of
-   * physically deleting a file while it is being read.
-   *
-   * This method does not need to convert IOException to KafkaStorageException 
because it is either called before all logs are loaded
-   * or the immediate caller will catch and handle IOException
-   *
-   * @param segments The log segments to schedule for deletion
-   * @param asyncDelete Whether the segment files should be deleted 
asynchronously
-   */
-  private def removeAndDeleteSegments(segments: Iterable[LogSegment],
-                                      asyncDelete: Boolean,
-                                      reason: SegmentDeletionReason): Unit = {
-    if (segments.nonEmpty) {
-      lock synchronized {
-        // As most callers hold an iterator into the `segments` collection and 
`removeAndDeleteSegment` mutates it by
-        // removing the deleted segment, we should force materialization of 
the iterator here, so that results of the
-        // iteration remain valid and deterministic.
-        val toDelete = segments.toList
-        reason.logReason(this, toDelete)
-        toDelete.foreach { segment =>
-          this.segments.remove(segment.baseOffset)
-        }
-        deleteSegmentFiles(toDelete, asyncDelete)
-      }
-    }
-  }
-
-  private def deleteSegmentFiles(segments: Iterable[LogSegment], asyncDelete: 
Boolean, deleteProducerStateSnapshots: Boolean = true): Unit = {
-    Log.deleteSegmentFiles(segments, asyncDelete, 
deleteProducerStateSnapshots, dir, topicPartition,
-      config, scheduler, logDirFailureChannel, producerStateManager, 
this.logIdent)
-  }
-
   private[log] def replaceSegments(newSegments: Seq[LogSegment], oldSegments: 
Seq[LogSegment], isRecoveredSwapFile: Boolean = false): Unit = {

Review comment:
       Great catch, I'll fix this.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to