ccding commented on a change in pull request #10763: URL: https://github.com/apache/kafka/pull/10763#discussion_r660101025
########## File path: core/src/main/scala/kafka/log/LogLoader.scala ########## @@ -90,11 +90,58 @@ object LogLoader extends Logging { * overflow index offset */ def load(params: LoadLogParams): LoadedLogOffsets = { - // first do a pass through the files in the log directory and remove any temporary files + + // First pass: through the files in the log directory and remove any temporary files // and find any interrupted swap operations val swapFiles = removeTempFilesAndCollectSwapFiles(params) - // Now do a second pass and load all the log and index files. + // The remaining valid swap files must come from compaction or segment split operation. We can + // simply rename them to regular segment files. But, before renaming, we should figure out which + // segments are compacted and delete these segment files: this is done by calculating min/maxSwapFileOffset. Review comment: fixed ########## File path: core/src/main/scala/kafka/log/LogLoader.scala ########## @@ -90,11 +90,58 @@ object LogLoader extends Logging { * overflow index offset */ def load(params: LoadLogParams): LoadedLogOffsets = { - // first do a pass through the files in the log directory and remove any temporary files + + // First pass: through the files in the log directory and remove any temporary files // and find any interrupted swap operations val swapFiles = removeTempFilesAndCollectSwapFiles(params) - // Now do a second pass and load all the log and index files. + // The remaining valid swap files must come from compaction or segment split operation. We can + // simply rename them to regular segment files. But, before renaming, we should figure out which + // segments are compacted and delete these segment files: this is done by calculating min/maxSwapFileOffset. + // We store segments that require renaming in this code block, and do the actual renaming later. 
+ var minSwapFileOffset = Long.MaxValue + var maxSwapFileOffset = Long.MinValue + swapFiles.filter(f => Log.isLogFile(new File(CoreUtils.replaceSuffix(f.getPath, SwapFileSuffix, "")))).foreach { f => + val baseOffset = offsetFromFile(f) + val segment = LogSegment.open(f.getParentFile, + baseOffset = baseOffset, + params.config, + time = params.time, + fileSuffix = Log.SwapFileSuffix) + info(s"${params.logIdentifier}Found log file ${f.getPath} from interrupted swap operation, which is recoverable from ${Log.SwapFileSuffix} files by renaming.") + minSwapFileOffset = Math.min(segment.baseOffset, minSwapFileOffset) + maxSwapFileOffset = Math.max(segment.offsetIndex.lastOffset, maxSwapFileOffset) + } + + // Second pass: delete segments that are between minSwapFileOffset and maxSwapFileOffset. As + // discussed above, these segments were compacted or split but haven't been renamed to .delete + // before shutting down the broker. + for (file <- params.dir.listFiles if file.isFile) { + try { + if (!file.getName.endsWith(SwapFileSuffix)) { + val offset = offsetFromFile(file) + if (offset >= minSwapFileOffset && offset <= maxSwapFileOffset) { Review comment: fixed -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org For queries about this service, please contact Infrastructure at: users@infra.apache.org