HeartSaVioR commented on a change in pull request #27664: URL: https://github.com/apache/spark/pull/27664#discussion_r428668027
########## File path: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLog.scala ########## @@ -182,19 +182,34 @@ class HDFSMetadataLog[T <: AnyRef : ClassTag](sparkSession: SparkSession, path: } } - override def getLatest(): Option[(Long, T)] = { + /** + * Return the latest batch Id without reading the file. This method only checks for existence of + * file to avoid cost on reading and deserializing log file. + */ + def getLatestBatchId(): Option[Long] = { val batchIds = fileManager.list(metadataPath, batchFilesFilter) .map(f => pathToBatchId(f.getPath)) .sorted(Ordering.Long.reverse) for (batchId <- batchIds) { - val batch = get(batchId) - if (batch.isDefined) { - return Some((batchId, batch.get)) + val batchMetadataFile = batchIdToPath(batchId) + if (fileManager.exists(batchMetadataFile)) { + return Some(batchId) } } None } + override def getLatest(): Option[(Long, T)] = { + getLatestBatchId().map { batchId => + val content = get(batchId).getOrElse { + // This only happens in odd case where the file exists when getLatestBatchId() is called, + // but get() doesn't find it. + throw new IllegalStateException(s"failed to read log file for batch $batchId") Review comment: I just pulled the comment in here. Either this PR or #25965 will have to resolve a merge conflict, but I wanted to make sure the code comment is clear either way. ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org