junrao commented on a change in pull request #9178: URL: https://github.com/apache/kafka/pull/9178#discussion_r485958438
########## File path: core/src/main/scala/kafka/server/ReplicaManager.scala ########## @@ -1698,8 +1698,12 @@ class ReplicaManager(val config: KafkaConfig, Partition.removeMetrics(tp) } - // logDir should be an absolute path - // sendZkNotification is needed for unit test + /** + * The log directory failure handler for the replica + * + * @param dir the absooute path of the log directory Review comment: typo absooute ########## File path: core/src/main/scala/kafka/log/LogCleanerManager.scala ########## @@ -354,12 +354,24 @@ private[log] class LogCleanerManager(val logDirs: Seq[File], } } - def updateCheckpoints(dataDir: File, update: Option[(TopicPartition, Long)]): Unit = { + /** + * Update checkpoint file, or removing topics and partitions that no longer exist Review comment: removing => remove ########## File path: core/src/main/scala/kafka/log/LogCleanerManager.scala ########## @@ -354,12 +354,24 @@ private[log] class LogCleanerManager(val logDirs: Seq[File], } } - def updateCheckpoints(dataDir: File, update: Option[(TopicPartition, Long)]): Unit = { + /** + * Update checkpoint file, or removing topics and partitions that no longer exist + * + * @param dataDir The File object to be updated + * @param update The [TopicPartition, Long] map data to be updated. pass "none" if doing remove, not add + * @param topicPartitionToBeRemoved The TopicPartition to be removed + */ + def updateCheckpoints(dataDir: File, update: Option[(TopicPartition, Long)], topicPartitionToBeRemoved: TopicPartition = null): Unit = { inLock(lock) { val checkpoint = checkpoints(dataDir) if (checkpoint != null) { try { - val existing = checkpoint.read().filter { case (tp, _) => logs.keys.contains(tp) } ++ update + val existing = update match { + case Some(updatedOffset) => Review comment: updatedOffset is not being used. 
########## File path: core/src/main/scala/kafka/log/LogCleanerManager.scala ########## @@ -354,12 +354,24 @@ private[log] class LogCleanerManager(val logDirs: Seq[File], } } - def updateCheckpoints(dataDir: File, update: Option[(TopicPartition, Long)]): Unit = { + /** + * Update checkpoint file, or removing topics and partitions that no longer exist + * + * @param dataDir The File object to be updated + * @param update The [TopicPartition, Long] map data to be updated. pass "none" if doing remove, not add + * @param topicPartitionToBeRemoved The TopicPartition to be removed + */ + def updateCheckpoints(dataDir: File, update: Option[(TopicPartition, Long)], topicPartitionToBeRemoved: TopicPartition = null): Unit = { Review comment: Could we make topicPartitionToBeRemoved an Option[TopicPartition]? ########## File path: core/src/main/scala/kafka/log/LogCleanerManager.scala ########## @@ -369,13 +381,21 @@ private[log] class LogCleanerManager(val logDirs: Seq[File], } } + /** + * alter the checkpoint directory for the topicPartition, to remove the data in sourceLogDir, and add the data in destLogDir + */ def alterCheckpointDir(topicPartition: TopicPartition, sourceLogDir: File, destLogDir: File): Unit = { inLock(lock) { try { checkpoints.get(sourceLogDir).flatMap(_.read().get(topicPartition)) match { case Some(offset) => - // Remove this partition from the checkpoint file in the source log directory - updateCheckpoints(sourceLogDir, None) + debug(s"Removing the partition offset data in checkpoint file for '${topicPartition}' " + + s"from ${sourceLogDir.getAbsoluteFile} direcotory.") Review comment: typo direcotory ########## File path: core/src/main/scala/kafka/log/LogManager.scala ########## @@ -184,7 +184,11 @@ class LogManager(logDirs: Seq[File], numRecoveryThreadsPerDataDir = newSize } - // dir should be an absolute path + /** + * The log diretory failure handler.
It'll remove all the checkpoint files located in the directory Review comment: typo diretory ########## File path: core/src/main/scala/kafka/log/LogCleanerManager.scala ########## @@ -393,13 +413,21 @@ private[log] class LogCleanerManager(val logDirs: Seq[File], } } + /** + * Stop the cleaning logs in the provided directory Review comment: Stop the cleaning logs => Stop cleaning logs ########## File path: core/src/main/scala/kafka/server/checkpoints/CheckpointFile.scala ########## @@ -75,6 +75,17 @@ class CheckpointReadBuffer[T](location: String, } } +/** + * This class interacts with the checkpoint file to read or write [TopicPartition, Offset] entries + * + * The format in the checkpoint file is like this: + * -----checkpoint file content------ + * 0 <- OffsetCheckpointFile.currentVersion + * 2 <- following entries size + * tp1 par1 1 <- the format is: TOPIC PARTITION OFFSET + * tp1 par2 2 + * -----checkpoint file end---------- + */ Review comment: We now have 2 different formats for checkpoint files, one for OffsetCheckpointFile and another for LeaderEpochCheckpointFile. Perhaps we can add the above comment to the appropriate class. ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org