Repository: spark Updated Branches: refs/heads/master 67fb33e7e -> 76e9bd748
[SPARK-18960][SQL][SS] Avoid double reading file which is being copied. ## What changes were proposed in this pull request? In HDFS, when we copy a file into the target directory, there will be a temporary `._COPYING_` file for a period of time. The duration depends on the file size. If we do not skip this file, we may read the same data twice. ## How was this patch tested? update unit test Author: uncleGen <husty...@gmail.com> Closes #16370 from uncleGen/SPARK-18960. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/76e9bd74 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/76e9bd74 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/76e9bd74 Branch: refs/heads/master Commit: 76e9bd74885a99462ed0957aad37cbead7f14de2 Parents: 67fb33e Author: uncleGen <husty...@gmail.com> Authored: Wed Dec 28 10:42:47 2016 +0000 Committer: Sean Owen <so...@cloudera.com> Committed: Wed Dec 28 10:42:47 2016 +0000 ---------------------------------------------------------------------- .../datasources/PartitioningAwareFileIndex.scala | 11 ++++++++--- .../spark/sql/execution/datasources/FileIndexSuite.scala | 1 + 2 files changed, 9 insertions(+), 3 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/76e9bd74/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileIndex.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileIndex.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileIndex.scala index 825a0f7..82c1599 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileIndex.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileIndex.scala @@ 
-439,10 +439,15 @@ object PartitioningAwareFileIndex extends Logging { /** Checks if we should filter out this path name. */ def shouldFilterOut(pathName: String): Boolean = { - // We filter everything that starts with _ and ., except _common_metadata and _metadata + // We filter follow paths: + // 1. everything that starts with _ and ., except _common_metadata and _metadata // because Parquet needs to find those metadata files from leaf files returned by this method. // We should refactor this logic to not mix metadata files with data files. - ((pathName.startsWith("_") && !pathName.contains("=")) || pathName.startsWith(".")) && - !pathName.startsWith("_common_metadata") && !pathName.startsWith("_metadata") + // 2. everything that ends with `._COPYING_`, because this is a intermediate state of file. we + // should skip this file in case of double reading. + val exclude = (pathName.startsWith("_") && !pathName.contains("=")) || + pathName.startsWith(".") || pathName.endsWith("._COPYING_") + val include = pathName.startsWith("_common_metadata") || pathName.startsWith("_metadata") + exclude && !include } } http://git-wip-us.apache.org/repos/asf/spark/blob/76e9bd74/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileIndexSuite.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileIndexSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileIndexSuite.scala index b7a472b..2b4c9f3 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileIndexSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileIndexSuite.scala @@ -142,6 +142,7 @@ class FileIndexSuite extends SharedSQLContext { assert(!PartitioningAwareFileIndex.shouldFilterOut("_common_metadata")) assert(PartitioningAwareFileIndex.shouldFilterOut("_ab_metadata")) 
assert(PartitioningAwareFileIndex.shouldFilterOut("_cd_common_metadata")) + assert(PartitioningAwareFileIndex.shouldFilterOut("a._COPYING_")) } test("SPARK-17613 - PartitioningAwareFileIndex: base path w/o '/' at end") { --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org