junrao commented on a change in pull request #9178:
URL: https://github.com/apache/kafka/pull/9178#discussion_r485958438



##########
File path: core/src/main/scala/kafka/server/ReplicaManager.scala
##########
@@ -1698,8 +1698,12 @@ class ReplicaManager(val config: KafkaConfig,
     Partition.removeMetrics(tp)
   }
 
-  // logDir should be an absolute path
-  // sendZkNotification is needed for unit test
+  /**
+   * The log directory failure handler for the replica
+   *
+   * @param dir                     the absooute path of the log directory

Review comment:
       typo: "absooute" should be "absolute"

##########
File path: core/src/main/scala/kafka/log/LogCleanerManager.scala
##########
@@ -354,12 +354,24 @@ private[log] class LogCleanerManager(val logDirs: 
Seq[File],
     }
   }
 
-  def updateCheckpoints(dataDir: File, update: Option[(TopicPartition, 
Long)]): Unit = {
+  /**
+   * Update checkpoint file, or removing topics and partitions that no longer 
exist

Review comment:
       removing => remove

##########
File path: core/src/main/scala/kafka/log/LogCleanerManager.scala
##########
@@ -354,12 +354,24 @@ private[log] class LogCleanerManager(val logDirs: 
Seq[File],
     }
   }
 
-  def updateCheckpoints(dataDir: File, update: Option[(TopicPartition, 
Long)]): Unit = {
+  /**
+   * Update checkpoint file, or removing topics and partitions that no longer 
exist
+   *
+   * @param dataDir                       The File object to be updated
+   * @param update                        The [TopicPartition, Long] map data 
to be updated. pass "none" if doing remove, not add
+   * @param topicPartitionToBeRemoved     The TopicPartition to be removed
+   */
+  def updateCheckpoints(dataDir: File, update: Option[(TopicPartition, Long)], 
topicPartitionToBeRemoved: TopicPartition = null): Unit = {
     inLock(lock) {
       val checkpoint = checkpoints(dataDir)
       if (checkpoint != null) {
         try {
-          val existing = checkpoint.read().filter { case (tp, _) => 
logs.keys.contains(tp) } ++ update
+          val existing = update match {
+            case Some(updatedOffset) =>

Review comment:
       updatedOffset is not being used.

##########
File path: core/src/main/scala/kafka/log/LogCleanerManager.scala
##########
@@ -354,12 +354,24 @@ private[log] class LogCleanerManager(val logDirs: 
Seq[File],
     }
   }
 
-  def updateCheckpoints(dataDir: File, update: Option[(TopicPartition, 
Long)]): Unit = {
+  /**
+   * Update checkpoint file, or removing topics and partitions that no longer 
exist
+   *
+   * @param dataDir                       The File object to be updated
+   * @param update                        The [TopicPartition, Long] map data 
to be updated. pass "none" if doing remove, not add
+   * @param topicPartitionToBeRemoved     The TopicPartition to be removed
+   */
+  def updateCheckpoints(dataDir: File, update: Option[(TopicPartition, Long)], 
topicPartitionToBeRemoved: TopicPartition = null): Unit = {

Review comment:
       Could we make topicPartitionToBeRemoved an Option[TopicPartition]?

##########
File path: core/src/main/scala/kafka/log/LogCleanerManager.scala
##########
@@ -369,13 +381,21 @@ private[log] class LogCleanerManager(val logDirs: 
Seq[File],
     }
   }
 
+  /**
+   * alter the checkpoint directory for the topicPartition, to remove the data 
in sourceLogDir, and add the data in destLogDir
+   */
   def alterCheckpointDir(topicPartition: TopicPartition, sourceLogDir: File, 
destLogDir: File): Unit = {
     inLock(lock) {
       try {
         checkpoints.get(sourceLogDir).flatMap(_.read().get(topicPartition)) 
match {
           case Some(offset) =>
-            // Remove this partition from the checkpoint file in the source 
log directory
-            updateCheckpoints(sourceLogDir, None)
+            debug(s"Removing the partition offset data in checkpoint file for 
'${topicPartition}' " +
+              s"from ${sourceLogDir.getAbsoluteFile} direcotory.")

Review comment:
       typo: "direcotory" should be "directory"

##########
File path: core/src/main/scala/kafka/log/LogManager.scala
##########
@@ -184,7 +184,11 @@ class LogManager(logDirs: Seq[File],
     numRecoveryThreadsPerDataDir = newSize
   }
 
-  // dir should be an absolute path
+  /**
+   * The log diretory failure handler. It'll remove all the checkpoint files 
located in the directory

Review comment:
       typo: "diretory" should be "directory"

##########
File path: core/src/main/scala/kafka/log/LogCleanerManager.scala
##########
@@ -393,13 +413,21 @@ private[log] class LogCleanerManager(val logDirs: 
Seq[File],
     }
   }
 
+  /**
+   * Stop the cleaning logs in the provided directory

Review comment:
       Stop the cleaning logs => Stop cleaning logs 

##########
File path: core/src/main/scala/kafka/server/checkpoints/CheckpointFile.scala
##########
@@ -75,6 +75,17 @@ class CheckpointReadBuffer[T](location: String,
   }
 }
 
+/**
+ * This class interacts with the checkpoint file to read or write 
[TopicPartition, Offset] entries
+ *
+ * The format in the checkpoint file is like this:
+ *  -----checkpoint file content------
+ *  0                <- OffsetCheckpointFile.currentVersion
+ *  2                <- following entries size
+ *  tp1  par1  1     <- the format is: TOPIC  PARTITION  OFFSET
+ *  tp1  par2  2
+ *  -----checkpoint file end----------
+ */

Review comment:
       We now have 2 different formats for checkpoint files, one for 
OffsetCheckpointFile and another for LeaderEpochCheckpointFile. Perhaps we can 
add the above comment to the appropriate class.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to