Repository: spark
Updated Branches:
  refs/heads/branch-2.2 4f0eb0c86 -> 43f9c84b6


[SPARK-21374][CORE] Fix reading globbed paths from S3 into DF with disabled FS 
cache

This PR replaces #18623 to do some clean up.

Closes #18623

Jenkins

Author: Shixiong Zhu <shixi...@databricks.com>
Author: Andrey Taptunov <taptu...@amazon.com>

Closes #18848 from zsxwing/review-pr18623.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/43f9c84b
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/43f9c84b
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/43f9c84b

Branch: refs/heads/branch-2.2
Commit: 43f9c84b6749b2ebf802e1f062238167b2b1f3bb
Parents: 4f0eb0c
Author: Andrey Taptunov <taptu...@amazon.com>
Authored: Fri Aug 4 22:40:04 2017 -0700
Committer: Shixiong Zhu <shixi...@databricks.com>
Committed: Mon Aug 7 11:04:32 2017 -0700

----------------------------------------------------------------------
 .../apache/spark/deploy/SparkHadoopUtil.scala   |  8 ++++
 .../sql/execution/datasources/DataSource.scala  | 45 ++++++++++++--------
 2 files changed, 36 insertions(+), 17 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/43f9c84b/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala 
b/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala
index 6afe58b..550bd68 100644
--- a/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala
@@ -227,6 +227,10 @@ class SparkHadoopUtil extends Logging {
 
   def globPath(pattern: Path): Seq[Path] = {
     val fs = pattern.getFileSystem(conf)
+    globPath(fs, pattern)
+  }
+
+  def globPath(fs: FileSystem, pattern: Path): Seq[Path] = {
     Option(fs.globStatus(pattern)).map { statuses =>
       statuses.map(_.getPath.makeQualified(fs.getUri, 
fs.getWorkingDirectory)).toSeq
     }.getOrElse(Seq.empty[Path])
@@ -236,6 +240,10 @@ class SparkHadoopUtil extends Logging {
     if (isGlobPath(pattern)) globPath(pattern) else Seq(pattern)
   }
 
+  def globPathIfNecessary(fs: FileSystem, pattern: Path): Seq[Path] = {
+    if (isGlobPath(pattern)) globPath(fs, pattern) else Seq(pattern)
+  }
+
   /**
    * Lists all the files in a directory with the specified prefix, and does 
not end with the
    * given suffix. The returned {{FileStatus}} instances are sorted by the 
modification times of

http://git-wip-us.apache.org/repos/asf/spark/blob/43f9c84b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
index 0915bd3..a13bb24 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
@@ -23,6 +23,7 @@ import scala.collection.JavaConverters._
 import scala.language.{existentials, implicitConversions}
 import scala.util.{Failure, Success, Try}
 
+import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.Path
 
 import org.apache.spark.deploy.SparkHadoopUtil
@@ -123,7 +124,7 @@ case class DataSource(
         val hdfsPath = new Path(path)
         val fs = hdfsPath.getFileSystem(hadoopConf)
         val qualified = hdfsPath.makeQualified(fs.getUri, 
fs.getWorkingDirectory)
-        SparkHadoopUtil.get.globPathIfNecessary(qualified)
+        SparkHadoopUtil.get.globPathIfNecessary(fs, qualified)
       }.toArray
       new InMemoryFileIndex(sparkSession, globbedPaths, options, None, 
fileStatusCache)
     }
@@ -345,22 +346,8 @@ case class DataSource(
       case (format: FileFormat, _) =>
         val allPaths = caseInsensitiveOptions.get("path") ++ paths
         val hadoopConf = sparkSession.sessionState.newHadoopConf()
-        val globbedPaths = allPaths.flatMap { path =>
-          val hdfsPath = new Path(path)
-          val fs = hdfsPath.getFileSystem(hadoopConf)
-          val qualified = hdfsPath.makeQualified(fs.getUri, 
fs.getWorkingDirectory)
-          val globPath = SparkHadoopUtil.get.globPathIfNecessary(qualified)
-
-          if (globPath.isEmpty) {
-            throw new AnalysisException(s"Path does not exist: $qualified")
-          }
-          // Sufficient to check head of the globPath seq for non-glob scenario
-          // Don't need to check once again if files exist in streaming mode
-          if (checkFilesExist && !fs.exists(globPath.head)) {
-            throw new AnalysisException(s"Path does not exist: 
${globPath.head}")
-          }
-          globPath
-        }.toArray
+        val globbedPaths = allPaths.flatMap(
+          DataSource.checkAndGlobPathIfNecessary(hadoopConf, _, 
checkFilesExist)).toArray
 
         val fileStatusCache = FileStatusCache.getOrCreate(sparkSession)
         val (dataSchema, partitionSchema) = getOrInferFileFormatSchema(format, 
fileStatusCache)
@@ -606,4 +593,28 @@ object DataSource extends Logging {
     CatalogStorageFormat.empty.copy(
       locationUri = path.map(CatalogUtils.stringToURI), properties = 
optionsWithoutPath)
   }
+
+  /**
+   * If `path` is a file pattern, return all the files that match it. 
Otherwise, return itself.
+   * If `checkFilesExist` is `true`, also check the file existence.
+   */
+  private def checkAndGlobPathIfNecessary(
+      hadoopConf: Configuration,
+      path: String,
+      checkFilesExist: Boolean): Seq[Path] = {
+    val hdfsPath = new Path(path)
+    val fs = hdfsPath.getFileSystem(hadoopConf)
+    val qualified = hdfsPath.makeQualified(fs.getUri, fs.getWorkingDirectory)
+    val globPath = SparkHadoopUtil.get.globPathIfNecessary(fs, qualified)
+
+    if (globPath.isEmpty) {
+      throw new AnalysisException(s"Path does not exist: $qualified")
+    }
+    // Sufficient to check head of the globPath seq for non-glob scenario
+    // Don't need to check once again if files exist in streaming mode
+    if (checkFilesExist && !fs.exists(globPath.head)) {
+      throw new AnalysisException(s"Path does not exist: ${globPath.head}")
+    }
+    globPath
+  }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to