dajac commented on a change in pull request #8672:
URL: https://github.com/apache/kafka/pull/8672#discussion_r436519796



##########
File path: core/src/main/scala/kafka/log/LogManager.scala
##########
@@ -580,53 +586,105 @@ class LogManager(logDirs: Seq[File],
    * to avoid exposing data that have been deleted by DeleteRecordsRequest
    */
   def checkpointLogStartOffsets(): Unit = {
-    liveLogDirs.foreach(checkpointLogStartOffsetsInDir)
+    val logsByDirCached = logsByDir
+    liveLogDirs.foreach { logDir =>
+      checkpointLogsStartOffsetsInDir(
+        logsByDirCached.getOrElse(logDir.getAbsolutePath, Map.empty), logDir)
+    }
   }
 
   /**
-    * Write the recovery checkpoint file for all logs in provided directory 
and clean older snapshots for provided logs.
-    *
-    * @param dir the directory in which logs are checkpointed
-    * @param logsToCleanSnapshot logs whose snapshots need to be cleaned
-    */
+   * Write the checkpoint files for all the provided directories. This is used 
to cleanup
+   * checkpoints after having deleted partitions.
+   */
+  def checkpoint(logDirs: Set[File]): Unit = {
+    val logsByDirCached = logsByDir
+    logDirs.foreach { logDir =>
+      val partitionToLog = logsByDirCached.getOrElse(logDir.getAbsolutePath, 
Map.empty)
+      if (cleaner != null) {
+        cleaner.updateCheckpoints(logDir)
+      }
+      checkpointLogsRecoveryOffsetsInDir(partitionToLog, logDir)
+      checkpointLogsStartOffsetsInDir(partitionToLog, logDir)
+    }
+  }
+
+  /**
+   * Clean snapshots of the provided logs in the provided directory.
+   *
+   * @param logsToCleanSnapshot the logs whose snapshots will be cleaned
+   * @param dir the directory in which the logs are
+   */
   // Only for testing
-  private[log] def checkpointRecoveryOffsetsAndCleanSnapshot(dir: File, 
logsToCleanSnapshot: Seq[Log]): Unit = {
+  private[log] def cleanSnapshotsInDir(logsToCleanSnapshot: Seq[Log], dir: 
File): Unit = {
     try {
-      checkpointLogRecoveryOffsetsInDir(dir)
       
logsToCleanSnapshot.foreach(_.deleteSnapshotsAfterRecoveryPointCheckpoint())
     } catch {
       case e: IOException =>
-        logDirFailureChannel.maybeAddOfflineLogDir(dir.getAbsolutePath, s"Disk 
error while writing to recovery point " +
-          s"file in directory $dir", e)
+        logDirFailureChannel.maybeAddOfflineLogDir(dir.getAbsolutePath,
+          s"Disk error while writing to recovery point file in directory 
$dir", e)
     }
   }
 
-  private def checkpointLogRecoveryOffsetsInDir(dir: File): Unit = {
-    for {
-      partitionToLog <- logsByDir.get(dir.getAbsolutePath)
-      checkpoint <- recoveryPointCheckpoints.get(dir)
-    } {
-      checkpoint.write(partitionToLog.map { case (tp, log) => tp -> 
log.recoveryPoint })
+  /**
+   * Checkpoint log recovery offsets for all the logs in the provided 
directory.
+   *
+   * @param dir the directory in which logs are checkpointed
+   */
+  // Only for testing
+  private[log] def checkpointRecoveryOffsetsInDir(dir: File): Unit = {
+    val partitionToLog = logsByDir.getOrElse(dir.getAbsolutePath, Map.empty)
+    checkpointLogsRecoveryOffsetsInDir(partitionToLog, dir)
+  }
+
+  /**
+   * Checkpoint log recovery and start offsets for all logs in the provided 
directory.
+   *
+   * @param dir the directory in which logs are checkpointed
+   */
+  private def checkpointRecoveryAndLogStartOffsetsInDir(dir: File): Unit = {

Review comment:
       It is a small optimization to avoid computing the `logsByDir` mapping 
twice when both we checkpoint both at the same time. Computing the mapping is 
quite expensive when there are many logs.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to