Repository: spark
Updated Branches:
  refs/heads/master 8fe32b4f7 -> 6a7e537f3


[SPARK-8756] [SQL] Keep cached information and avoid re-calculating footers in ParquetRelation2

JIRA: https://issues.apache.org/jira/browse/SPARK-8756

Currently, in ParquetRelation2, footers are re-read every time refresh() is
called. Reading all footers is expensive when there are many partitions, so we
can first check whether the underlying files have changed and skip the re-read
when they have not. This PR does so by caching information about the previously
seen leaf files and comparing it against the current listing.

Author: Liang-Chi Hsieh <vii...@appier.com>

Closes #7154 from viirya/cached_footer_parquet_relation and squashes the following commits:

92e9347 [Liang-Chi Hsieh] Fix indentation.
ae0ec64 [Liang-Chi Hsieh] Fix wrong assignment.
c8fdfb7 [Liang-Chi Hsieh] Fix it.
a52b6d1 [Liang-Chi Hsieh] For comments.
c2a2420 [Liang-Chi Hsieh] Merge remote-tracking branch 'upstream/master' into cached_footer_parquet_relation
fa5458f [Liang-Chi Hsieh] Use Map to cache FileStatus and do merging previously loaded schema and newly loaded one.
6ae0911 [Liang-Chi Hsieh] Merge remote-tracking branch 'upstream/master' into cached_footer_parquet_relation
21bbdec [Liang-Chi Hsieh] Merge remote-tracking branch 'upstream/master' into cached_footer_parquet_relation
12a0ed9 [Liang-Chi Hsieh] Add check of FileStatus's modification time.
186429d [Liang-Chi Hsieh] Merge remote-tracking branch 'upstream/master' into cached_footer_parquet_relation
0ef8caf [Liang-Chi Hsieh] Keep cached information and avoid re-calculating footers.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/6a7e537f
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/6a7e537f
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/6a7e537f

Branch: refs/heads/master
Commit: 6a7e537f3a4fd5e99a905f9842dc0ad4c348e4fd
Parents: 8fe32b4
Author: Liang-Chi Hsieh <vii...@appier.com>
Authored: Fri Jul 24 17:39:57 2015 +0800
Committer: Cheng Lian <l...@databricks.com>
Committed: Fri Jul 24 17:39:57 2015 +0800

----------------------------------------------------------------------
 .../apache/spark/sql/parquet/newParquet.scala   | 38 ++++++++++++--------
 1 file changed, 24 insertions(+), 14 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/6a7e537f/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala b/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala
index 2f9f880..c384697 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala
@@ -345,24 +345,34 @@ private[sql] class ParquetRelation2(
     // Schema of the whole table, including partition columns.
     var schema: StructType = _
 
+    // Cached leaves
+    var cachedLeaves: Set[FileStatus] = null
+
     /**
      * Refreshes `FileStatus`es, footers, partition spec, and table schema.
      */
     def refresh(): Unit = {
-      // Lists `FileStatus`es of all leaf nodes (files) under all base directories.
-      val leaves = cachedLeafStatuses().filter { f =>
-        isSummaryFile(f.getPath) ||
-          !(f.getPath.getName.startsWith("_") || f.getPath.getName.startsWith("."))
-      }.toArray
-
-      dataStatuses = leaves.filterNot(f => isSummaryFile(f.getPath))
-      metadataStatuses = leaves.filter(_.getPath.getName == ParquetFileWriter.PARQUET_METADATA_FILE)
-      commonMetadataStatuses =
-        leaves.filter(_.getPath.getName == ParquetFileWriter.PARQUET_COMMON_METADATA_FILE)
-
-      // If we already get the schema, don't need to re-compute it since the schema merging is
-      // time-consuming.
-      if (dataSchema == null) {
+      val currentLeafStatuses = cachedLeafStatuses()
+
+      // Check if cachedLeafStatuses is changed or not
+      val leafStatusesChanged = (cachedLeaves == null) ||
+        !cachedLeaves.equals(currentLeafStatuses)
+
+      if (leafStatusesChanged) {
+        cachedLeaves = currentLeafStatuses.toIterator.toSet
+
+        // Lists `FileStatus`es of all leaf nodes (files) under all base directories.
+        val leaves = currentLeafStatuses.filter { f =>
+          isSummaryFile(f.getPath) ||
+            !(f.getPath.getName.startsWith("_") || f.getPath.getName.startsWith("."))
+        }.toArray
+
+        dataStatuses = leaves.filterNot(f => isSummaryFile(f.getPath))
+        metadataStatuses =
+          leaves.filter(_.getPath.getName == ParquetFileWriter.PARQUET_METADATA_FILE)
+        commonMetadataStatuses =
+          leaves.filter(_.getPath.getName == ParquetFileWriter.PARQUET_COMMON_METADATA_FILE)
+
         dataSchema = {
           val dataSchema0 = maybeDataSchema
             .orElse(readSchema())

