Repository: spark Updated Branches: refs/heads/master 8fe32b4f7 -> 6a7e537f3
[SPARK-8756] [SQL] Keep cached information and avoid re-calculating footers in ParquetRelation2 JIRA: https://issues.apache.org/jira/browse/SPARK-8756 Currently, in ParquetRelation2, footers are re-read every time refresh() is called. Reading all footers is expensive when there are many partitions. Instead, we can first check whether the leaf files have possibly changed, and skip the re-read when they have not. This PR does so by caching the previously listed leaf file statuses and comparing them against the current listing on each refresh. Author: Liang-Chi Hsieh <vii...@appier.com> Closes #7154 from viirya/cached_footer_parquet_relation and squashes the following commits: 92e9347 [Liang-Chi Hsieh] Fix indentation. ae0ec64 [Liang-Chi Hsieh] Fix wrong assignment. c8fdfb7 [Liang-Chi Hsieh] Fix it. a52b6d1 [Liang-Chi Hsieh] For comments. c2a2420 [Liang-Chi Hsieh] Merge remote-tracking branch 'upstream/master' into cached_footer_parquet_relation fa5458f [Liang-Chi Hsieh] Use Map to cache FileStatus and do merging previously loaded schema and newly loaded one. 6ae0911 [Liang-Chi Hsieh] Merge remote-tracking branch 'upstream/master' into cached_footer_parquet_relation 21bbdec [Liang-Chi Hsieh] Merge remote-tracking branch 'upstream/master' into cached_footer_parquet_relation 12a0ed9 [Liang-Chi Hsieh] Add check of FileStatus's modification time. 186429d [Liang-Chi Hsieh] Merge remote-tracking branch 'upstream/master' into cached_footer_parquet_relation 0ef8caf [Liang-Chi Hsieh] Keep cached information and avoid re-calculating footers. 
Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/6a7e537f Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/6a7e537f Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/6a7e537f Branch: refs/heads/master Commit: 6a7e537f3a4fd5e99a905f9842dc0ad4c348e4fd Parents: 8fe32b4 Author: Liang-Chi Hsieh <vii...@appier.com> Authored: Fri Jul 24 17:39:57 2015 +0800 Committer: Cheng Lian <l...@databricks.com> Committed: Fri Jul 24 17:39:57 2015 +0800 ---------------------------------------------------------------------- .../apache/spark/sql/parquet/newParquet.scala | 38 ++++++++++++-------- 1 file changed, 24 insertions(+), 14 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/6a7e537f/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala b/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala index 2f9f880..c384697 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala @@ -345,24 +345,34 @@ private[sql] class ParquetRelation2( // Schema of the whole table, including partition columns. var schema: StructType = _ + // Cached leaves + var cachedLeaves: Set[FileStatus] = null + /** * Refreshes `FileStatus`es, footers, partition spec, and table schema. */ def refresh(): Unit = { - // Lists `FileStatus`es of all leaf nodes (files) under all base directories. 
- val leaves = cachedLeafStatuses().filter { f => - isSummaryFile(f.getPath) || - !(f.getPath.getName.startsWith("_") || f.getPath.getName.startsWith(".")) - }.toArray - - dataStatuses = leaves.filterNot(f => isSummaryFile(f.getPath)) - metadataStatuses = leaves.filter(_.getPath.getName == ParquetFileWriter.PARQUET_METADATA_FILE) - commonMetadataStatuses = - leaves.filter(_.getPath.getName == ParquetFileWriter.PARQUET_COMMON_METADATA_FILE) - - // If we already get the schema, don't need to re-compute it since the schema merging is - // time-consuming. - if (dataSchema == null) { + val currentLeafStatuses = cachedLeafStatuses() + + // Check if cachedLeafStatuses is changed or not + val leafStatusesChanged = (cachedLeaves == null) || + !cachedLeaves.equals(currentLeafStatuses) + + if (leafStatusesChanged) { + cachedLeaves = currentLeafStatuses.toIterator.toSet + + // Lists `FileStatus`es of all leaf nodes (files) under all base directories. + val leaves = currentLeafStatuses.filter { f => + isSummaryFile(f.getPath) || + !(f.getPath.getName.startsWith("_") || f.getPath.getName.startsWith(".")) + }.toArray + + dataStatuses = leaves.filterNot(f => isSummaryFile(f.getPath)) + metadataStatuses = + leaves.filter(_.getPath.getName == ParquetFileWriter.PARQUET_METADATA_FILE) + commonMetadataStatuses = + leaves.filter(_.getPath.getName == ParquetFileWriter.PARQUET_COMMON_METADATA_FILE) + dataSchema = { val dataSchema0 = maybeDataSchema .orElse(readSchema()) --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org