Repository: spark
Updated Branches:
  refs/heads/branch-2.0 f14f47f07 -> 243bdb11d


[SPARK-17613] S3A base paths with no '/' at the end return empty DataFrames

Suppose you have a bucket `s3a://some-bucket`
and under it you have files:
```
s3a://some-bucket/file1.parquet
s3a://some-bucket/file2.parquet
```
Getting the parent path of `s3a://some-bucket/file1.parquet` yields
`s3a://some-bucket/` and the ListingFileCatalog uses this as the key in the 
hash map.

When catalog.allFiles is called, we use `s3a://some-bucket` (no slash at the 
end) to get the list of files, and we're left with an empty list!

This PR fixes this by adding a `/` at the end of the `URI` iff the given `Path` 
doesn't have a parent, i.e. is the root. This is a no-op if the path already 
ends with a `/`, and is handled through Hadoop's `Path` path-merging 
semantics.

Unit test in `FileCatalogSuite`.

Author: Burak Yavuz <brk...@gmail.com>

Closes #15169 from brkyvz/SPARK-17613.

(cherry picked from commit 85d609cf25c1da2df3cd4f5d5aeaf3cbcf0d674c)
Signed-off-by: Josh Rosen <joshro...@databricks.com>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/243bdb11
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/243bdb11
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/243bdb11

Branch: refs/heads/branch-2.0
Commit: 243bdb11d89ee379acae1ea1ed78df10797e86d1
Parents: f14f47f
Author: Burak Yavuz <brk...@gmail.com>
Authored: Thu Sep 22 13:05:41 2016 -0700
Committer: Josh Rosen <joshro...@databricks.com>
Committed: Thu Sep 22 13:06:15 2016 -0700

----------------------------------------------------------------------
 .../PartitioningAwareFileCatalog.scala          | 10 ++++-
 .../datasources/FileCatalogSuite.scala          | 45 +++++++++++++++++++-
 2 files changed, 53 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/243bdb11/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileCatalog.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileCatalog.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileCatalog.scala
index cef9d4d..2130c27 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileCatalog.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileCatalog.scala
@@ -76,7 +76,15 @@ abstract class PartitioningAwareFileCatalog(
       paths.flatMap { path =>
         // Make the path qualified (consistent with listLeafFiles and 
listLeafFilesInParallel).
         val fs = path.getFileSystem(hadoopConf)
-        val qualifiedPath = fs.makeQualified(path)
+        val qualifiedPathPre = fs.makeQualified(path)
+        val qualifiedPath: Path = if (qualifiedPathPre.isRoot && 
!qualifiedPathPre.isAbsolute) {
+          // SPARK-17613: Always append `Path.SEPARATOR` to the end of parent 
directories,
+          // because the `leafFile.getParent` would have returned an absolute 
path with the
+          // separator at the end.
+          new Path(qualifiedPathPre, Path.SEPARATOR)
+        } else {
+          qualifiedPathPre
+        }
 
         // There are three cases possible with each path
         // 1. The path is a directory and has children files in it. Then it 
must be present in

http://git-wip-us.apache.org/repos/asf/spark/blob/243bdb11/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileCatalogSuite.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileCatalogSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileCatalogSuite.scala
index 0d9ea51..563f340 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileCatalogSuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileCatalogSuite.scala
@@ -18,10 +18,12 @@
 package org.apache.spark.sql.execution.datasources
 
 import java.io.File
+import java.net.URI
 
+import scala.collection.mutable
 import scala.language.reflectiveCalls
 
-import org.apache.hadoop.fs.Path
+import org.apache.hadoop.fs.{FileStatus, Path, RawLocalFileSystem}
 
 import org.apache.spark.sql.catalyst.util._
 import org.apache.spark.sql.test.SharedSQLContext
@@ -67,4 +69,45 @@ class FileCatalogSuite extends SharedSQLContext {
 
     }
   }
+
+  test("SPARK-17613 - PartitioningAwareFileCatalog: base path w/o '/' at end") 
{
+    class MockCatalog(
+      override val paths: Seq[Path]) extends 
PartitioningAwareFileCatalog(spark, Map.empty, None) {
+
+      override def refresh(): Unit = {}
+
+      override def leafFiles: mutable.LinkedHashMap[Path, FileStatus] = 
mutable.LinkedHashMap(
+        new Path("mockFs://some-bucket/file1.json") -> new FileStatus()
+      )
+
+      override def leafDirToChildrenFiles: Map[Path, Array[FileStatus]] = Map(
+        new Path("mockFs://some-bucket/") -> Array(new FileStatus())
+      )
+
+      override def partitionSpec(): PartitionSpec = {
+        PartitionSpec.emptySpec
+      }
+    }
+
+    withSQLConf(
+        "fs.mockFs.impl" -> classOf[FakeParentPathFileSystem].getName,
+        "fs.mockFs.impl.disable.cache" -> "true") {
+      val pathWithSlash = new Path("mockFs://some-bucket/")
+      assert(pathWithSlash.getParent === null)
+      val pathWithoutSlash = new Path("mockFs://some-bucket")
+      assert(pathWithoutSlash.getParent === null)
+      val catalog1 = new MockCatalog(Seq(pathWithSlash))
+      val catalog2 = new MockCatalog(Seq(pathWithoutSlash))
+      assert(catalog1.allFiles().nonEmpty)
+      assert(catalog2.allFiles().nonEmpty)
+    }
+  }
+}
+
+class FakeParentPathFileSystem extends RawLocalFileSystem {
+  override def getScheme: String = "mockFs"
+
+  override def getUri: URI = {
+    URI.create("mockFs://some-bucket")
+  }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to