Repository: spark
Updated Branches:
  refs/heads/master 67fb33e7e -> 76e9bd748


[SPARK-18960][SQL][SS] Avoid double reading file which is being copied.

## What changes were proposed in this pull request?

In HDFS, when we copy a file into target directory, there will a temporary 
`._COPY_` file for a period of time. The duration depends on file size. If we 
do not skip this file, we will may read the same data for two times.

## How was this patch tested?
update unit test

Author: uncleGen <husty...@gmail.com>

Closes #16370 from uncleGen/SPARK-18960.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/76e9bd74
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/76e9bd74
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/76e9bd74

Branch: refs/heads/master
Commit: 76e9bd74885a99462ed0957aad37cbead7f14de2
Parents: 67fb33e
Author: uncleGen <husty...@gmail.com>
Authored: Wed Dec 28 10:42:47 2016 +0000
Committer: Sean Owen <so...@cloudera.com>
Committed: Wed Dec 28 10:42:47 2016 +0000

----------------------------------------------------------------------
 .../datasources/PartitioningAwareFileIndex.scala         | 11 ++++++++---
 .../spark/sql/execution/datasources/FileIndexSuite.scala |  1 +
 2 files changed, 9 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/76e9bd74/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileIndex.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileIndex.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileIndex.scala
index 825a0f7..82c1599 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileIndex.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileIndex.scala
@@ -439,10 +439,15 @@ object PartitioningAwareFileIndex extends Logging {
 
   /** Checks if we should filter out this path name. */
   def shouldFilterOut(pathName: String): Boolean = {
-    // We filter everything that starts with _ and ., except _common_metadata 
and _metadata
+    // We filter follow paths:
+    // 1. everything that starts with _ and ., except _common_metadata and 
_metadata
     // because Parquet needs to find those metadata files from leaf files 
returned by this method.
     // We should refactor this logic to not mix metadata files with data files.
-    ((pathName.startsWith("_") && !pathName.contains("=")) || 
pathName.startsWith(".")) &&
-      !pathName.startsWith("_common_metadata") && 
!pathName.startsWith("_metadata")
+    // 2. everything that ends with `._COPYING_`, because this is a 
intermediate state of file. we
+    // should skip this file in case of double reading.
+    val exclude = (pathName.startsWith("_") && !pathName.contains("=")) ||
+      pathName.startsWith(".") || pathName.endsWith("._COPYING_")
+    val include = pathName.startsWith("_common_metadata") || 
pathName.startsWith("_metadata")
+    exclude && !include
   }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/76e9bd74/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileIndexSuite.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileIndexSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileIndexSuite.scala
index b7a472b..2b4c9f3 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileIndexSuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileIndexSuite.scala
@@ -142,6 +142,7 @@ class FileIndexSuite extends SharedSQLContext {
     assert(!PartitioningAwareFileIndex.shouldFilterOut("_common_metadata"))
     assert(PartitioningAwareFileIndex.shouldFilterOut("_ab_metadata"))
     assert(PartitioningAwareFileIndex.shouldFilterOut("_cd_common_metadata"))
+    assert(PartitioningAwareFileIndex.shouldFilterOut("a._COPYING_"))
   }
 
   test("SPARK-17613 - PartitioningAwareFileIndex: base path w/o '/' at end") {


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to