Repository: spark
Updated Branches:
  refs/heads/branch-2.0 22f9f5f97 -> dc1562e97


[SPARK-14997][SQL] Fixed FileCatalog to return correct set of files when there is no partitioning scheme in the given paths

## What changes were proposed in this pull request?
Let's say there are JSON files in the following directory structure
```
xyz/file0.json
xyz/subdir1/file1.json
xyz/subdir2/file2.json
xyz/subdir1/subsubdir1/file3.json
```
`sqlContext.read.json("xyz")` should read only file0.json, matching the behavior
of Spark 1.6.1. However, in the current master, all 4 files are read.

The fix is to make FileCatalog return only the immediate child files of each
given path when no partitioning is detected (instead of the full recursive
list of files).
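
As a minimal sketch of the intended behavior (using the hypothetical `xyz` layout above):
```scala
// Directory layout (hypothetical, from the example above):
//   xyz/file0.json
//   xyz/subdir1/file1.json
//   xyz/subdir2/file2.json
//   xyz/subdir1/subsubdir1/file3.json

val df = sqlContext.read.json("xyz")

// With this fix, only xyz/file0.json is read (as in Spark 1.6.1).
// Files under xyz/subdir1, xyz/subdir2, etc. are no longer included,
// because no partitioning scheme (e.g. key=value directories) is detected.
df.show()
```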

Closes #12774

## How was this patch tested?

Unit tests: a new FileCatalogSuite, plus new tests in ParquetPartitionDiscoverySuite, FileStreamSourceSuite, and HadoopFsRelationTest.

Author: Tathagata Das <tathagata.das1...@gmail.com>

Closes #12856 from tdas/SPARK-14997.

(cherry picked from commit f7b7ef41662d7d02fc4f834f3c6c4ee8802e949c)
Signed-off-by: Yin Huai <yh...@databricks.com>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/dc1562e9
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/dc1562e9
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/dc1562e9

Branch: refs/heads/branch-2.0
Commit: dc1562e97d570238f8532b3f8051e8df90722732
Parents: 22f9f5f
Author: Tathagata Das <tathagata.das1...@gmail.com>
Authored: Fri May 6 15:04:16 2016 -0700
Committer: Yin Huai <yh...@databricks.com>
Committed: Fri May 6 15:04:27 2016 -0700

----------------------------------------------------------------------
 .../PartitioningAwareFileCatalog.scala          |  24 +-
 .../datasources/FileCatalogSuite.scala          |  68 ++++++
 .../ParquetPartitionDiscoverySuite.scala        |  47 ++++
 .../sql/streaming/FileStreamSourceSuite.scala   |  15 +-
 .../sql/sources/HadoopFsRelationTest.scala      | 232 +++++++++++++++++--
 5 files changed, 356 insertions(+), 30 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/dc1562e9/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileCatalog.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileCatalog.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileCatalog.scala
index 2c44b39..5f04a6c 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileCatalog.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileCatalog.scala
@@ -61,7 +61,29 @@ abstract class PartitioningAwareFileCatalog(
     }
   }
 
-  override def allFiles(): Seq[FileStatus] = leafFiles.values.toSeq
+  override def allFiles(): Seq[FileStatus] = {
+    if (partitionSpec().partitionColumns.isEmpty) {
+      // For each of the input paths, get the list of files inside them
+      paths.flatMap { path =>
+        // Make the path qualified (consistent with listLeafFiles and listLeafFilesInParallel).
+        val fs = path.getFileSystem(hadoopConf)
+        val qualifiedPath = fs.makeQualified(path)
+
+        // There are three cases possible with each path
+        // 1. The path is a directory and has children files in it. Then it must be present in
+        //    leafDirToChildrenFiles as those children files will have been found as leaf files.
+        //    Find its children files from leafDirToChildrenFiles and include them.
+        // 2. The path is a file, then it will be present in leafFiles. Include this path.
+        // 3. The path is a directory, but has no children files. Do not include this path.
+
+        leafDirToChildrenFiles.get(qualifiedPath)
+          .orElse { leafFiles.get(qualifiedPath).map(Array(_)) }
+          .getOrElse(Array.empty)
+      }
+    } else {
+      leafFiles.values.toSeq
+    }
+  }
 
   protected def inferPartitioning(): PartitionSpec = {
     // We use leaf dirs containing data files to discover the schema.

http://git-wip-us.apache.org/repos/asf/spark/blob/dc1562e9/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileCatalogSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileCatalogSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileCatalogSuite.scala
new file mode 100644
index 0000000..dab5c76
--- /dev/null
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileCatalogSuite.scala
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources
+
+import java.io.File
+
+import org.apache.hadoop.fs.Path
+
+import org.apache.spark.sql.catalyst.util._
+import org.apache.spark.sql.test.SharedSQLContext
+
+class FileCatalogSuite extends SharedSQLContext {
+
+  test("ListingFileCatalog: leaf files are qualified paths") {
+    withTempDir { dir =>
+      val file = new File(dir, "text.txt")
+      stringToFile(file, "text")
+
+      val path = new Path(file.getCanonicalPath)
+      val catalog = new ListingFileCatalog(sqlContext.sparkSession, Seq(path), Map.empty, None) {
+        def leafFilePaths: Seq[Path] = leafFiles.keys.toSeq
+        def leafDirPaths: Seq[Path] = leafDirToChildrenFiles.keys.toSeq
+      }
+      assert(catalog.leafFilePaths.forall(p => p.toString.startsWith("file:/")))
+      assert(catalog.leafDirPaths.forall(p => p.toString.startsWith("file:/")))
+    }
+  }
+
+  test("ListingFileCatalog: input paths are converted to qualified paths") {
+    withTempDir { dir =>
+      val file = new File(dir, "text.txt")
+      stringToFile(file, "text")
+
+      val unqualifiedDirPath = new Path(dir.getCanonicalPath)
+      val unqualifiedFilePath = new Path(file.getCanonicalPath)
+      require(!unqualifiedDirPath.toString.contains("file:"))
+      require(!unqualifiedFilePath.toString.contains("file:"))
+
+      val fs = unqualifiedDirPath.getFileSystem(sparkContext.hadoopConfiguration)
+      val qualifiedFilePath = fs.makeQualified(new Path(file.getCanonicalPath))
+      require(qualifiedFilePath.toString.startsWith("file:"))
+
+      val catalog1 = new ListingFileCatalog(
+        sqlContext.sparkSession, Seq(unqualifiedDirPath), Map.empty, None)
+      assert(catalog1.allFiles.map(_.getPath) === Seq(qualifiedFilePath))
+
+      val catalog2 = new ListingFileCatalog(
+        sqlContext.sparkSession, Seq(unqualifiedFilePath), Map.empty, None)
+      assert(catalog2.allFiles.map(_.getPath) === Seq(qualifiedFilePath))
+
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/dc1562e9/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetPartitionDiscoverySuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetPartitionDiscoverySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetPartitionDiscoverySuite.scala
index cb2c252..b4d35be 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetPartitionDiscoverySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetPartitionDiscoverySuite.scala
@@ -765,6 +765,53 @@ class ParquetPartitionDiscoverySuite extends QueryTest with ParquetTest with Sha
     }
   }
 
+  test("use basePath and file globbing to selectively load partitioned table") 
{
+    withTempPath { dir =>
+
+      val df = Seq(
+        (1, "foo", 100),
+        (1, "bar", 200),
+        (2, "foo", 300),
+        (2, "bar", 400)
+      ).toDF("p1", "p2", "v")
+      df.write
+        .mode(SaveMode.Overwrite)
+        .partitionBy("p1", "p2")
+        .parquet(dir.getCanonicalPath)
+
+      def check(path: String, basePath: String, expectedDf: DataFrame): Unit = {
+        val testDf = sqlContext.read
+          .option("basePath", basePath)
+          .parquet(path)
+        checkAnswer(testDf, expectedDf)
+      }
+
+      // Should find all the data with partitioning columns when base path is set to the root
+      val resultDf = df.select("v", "p1", "p2")
+      check(path = s"$dir", basePath = s"$dir", resultDf)
+      check(path = s"$dir/*", basePath = s"$dir", resultDf)
+      check(path = s"$dir/*/*", basePath = s"$dir", resultDf)
+      check(path = s"$dir/*/*/*", basePath = s"$dir", resultDf)
+
+      // Should find selective partitions of the data if the base path is not set to root
+
+      check(          // read from ../p1=1 with base ../p1=1, should not infer p1 col
+        path = s"$dir/p1=1/*",
+        basePath = s"$dir/p1=1/",
+        resultDf.filter("p1 = 1").drop("p1"))
+
+      check(          // read from ../p1=1/p2=foo with base ../p1=1/ should not infer p1
+        path = s"$dir/p1=1/p2=foo/*",
+        basePath = s"$dir/p1=1/",
+        resultDf.filter("p1 = 1").filter("p2 = 'foo'").drop("p1"))
+
+      check(          // read from ../p1=1/p2=foo with base ../p1=1/p2=foo, should not infer p1, p2
+        path = s"$dir/p1=1/p2=foo/*",
+        basePath = s"$dir/p1=1/p2=foo/",
+        resultDf.filter("p1 = 1").filter("p2 = 'foo'").drop("p1", "p2"))
+    }
+  }
+
   test("_SUCCESS should not break partitioning discovery") {
     Seq(1, 32).foreach { threshold =>
       // We have two paths to list files, one at driver side, another one that we use

http://git-wip-us.apache.org/repos/asf/spark/blob/dc1562e9/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala
index bc5c0c1..a62852b 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala
@@ -18,6 +18,7 @@
 package org.apache.spark.sql.streaming
 
 import java.io.File
+import java.util.UUID
 
 import org.apache.spark.sql._
 import org.apache.spark.sql.catalyst.util._
@@ -84,10 +85,13 @@ class FileStreamSourceTest extends StreamTest with SharedSQLContext {
       AddParquetFileData(seq.toDS().toDF(), src, tmp)
     }
 
+    /** Write parquet files in a temp dir, and move the individual files to the 'src' dir */
     def writeToFile(df: DataFrame, src: File, tmp: File): Unit = {
-      val file = Utils.tempFileWith(new File(tmp, "parquet"))
-      df.write.parquet(file.getCanonicalPath)
-      file.renameTo(new File(src, file.getName))
+      val tmpDir = Utils.tempFileWith(new File(tmp, "parquet"))
+      df.write.parquet(tmpDir.getCanonicalPath)
+      tmpDir.listFiles().foreach { f =>
+        f.renameTo(new File(src, s"${f.getName}"))
+      }
     }
   }
 
@@ -210,8 +214,9 @@ class FileStreamSourceSuite extends FileStreamSourceTest with SharedSQLContext {
 
   test("FileStreamSource schema: parquet, existing files, no schema") {
     withTempDir { src =>
-      Seq("a", "b", "c").toDS().as("userColumn").toDF()
-        .write.parquet(new File(src, "1").getCanonicalPath)
+      Seq("a", "b", "c").toDS().as("userColumn").toDF().write
+        .mode(org.apache.spark.sql.SaveMode.Overwrite)
+        .parquet(src.getCanonicalPath)
       val schema = createFileStreamSourceAndGetSchema(
         format = Some("parquet"), path = Some(src.getCanonicalPath), schema = None)
       assert(schema === new StructType().add("value", StringType))

http://git-wip-us.apache.org/repos/asf/spark/blob/dc1562e9/sql/hive/src/test/scala/org/apache/spark/sql/sources/HadoopFsRelationTest.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/sources/HadoopFsRelationTest.scala b/sql/hive/src/test/scala/org/apache/spark/sql/sources/HadoopFsRelationTest.scala
index 67b403a..20c5f72 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/sources/HadoopFsRelationTest.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/sources/HadoopFsRelationTest.scala
@@ -17,6 +17,8 @@
 
 package org.apache.spark.sql.sources
 
+import java.io.File
+
 import scala.util.Random
 
 import org.apache.hadoop.fs.Path
@@ -486,40 +488,222 @@ abstract class HadoopFsRelationTest extends QueryTest with SQLTestUtils with Tes
     }
   }
 
-  test("Hadoop style globbing") {
+  test("load() - with directory of unpartitioned data in nested subdirs") {
+    withTempPath { dir =>
+      val subdir = new File(dir, "subdir")
+
+      val dataInDir = Seq(1, 2, 3).toDF("value")
+      val dataInSubdir = Seq(4, 5, 6).toDF("value")
+
+      /*
+
+        Directory structure to be generated
+
+        dir
+          |
+          |___ [ files of dataInDir ]
+          |
+          |___ subdir
+                    |
+                    |___ [ files of dataInSubdir ]
+      */
+
+      // Generate dataInSubdir here, not dataInDir
+      dataInSubdir.write
+        .format(dataSourceName)
+        .mode(SaveMode.Overwrite)
+        .save(subdir.getCanonicalPath)
+
+      // Inferring the schema should throw an error as there is no file to infer from
+      val e = intercept[Exception] {
+        sqlContext.read.format(dataSourceName).load(dir.getCanonicalPath)
+      }
+
+      e match {
+        case _: AnalysisException =>
+          assert(e.getMessage.contains("infer"))
+
+        case _: java.util.NoSuchElementException if e.getMessage.contains("dataSchema") =>
+          // Ignore error, the source format requires schema to be provided by user
+          // This is needed for SimpleTextHadoopFsRelationSuite as SimpleTextSource needs schema
+
+        case _ =>
+          fail("Unexpected error trying to infer schema from empty dir", e)
+      }
+
+      /** Test whether the data read with the given path matches the expected answer */
+      def testWithPath(path: File, expectedAnswer: Seq[Row]): Unit = {
+        val df = sqlContext.read
+          .format(dataSourceName)
+          .schema(dataInDir.schema) // avoid schema inference for any format
+          .load(path.getCanonicalPath)
+        checkAnswer(df, expectedAnswer)
+      }
+
+      // Verify that reading by path 'dir/' gives empty results as there are no files in 'dir'
+      // and it should not pick up files in 'dir/subdir'
+      require(subdir.exists)
+      require(subdir.listFiles().exists(!_.isDirectory))
+      testWithPath(dir, Seq.empty)
+
+      // Verify that if there is data in dir, then reading by path 'dir/' reads only dataInDir
+      dataInDir.write
+        .format(dataSourceName)
+        .mode(SaveMode.Append)   // append to prevent subdir from being deleted
+        .save(dir.getCanonicalPath)
+      require(dir.listFiles().exists(!_.isDirectory))
+      require(subdir.exists())
+      require(subdir.listFiles().exists(!_.isDirectory))
+      testWithPath(dir, dataInDir.collect())
+    }
+  }
+
+  test("Hadoop style globbing - unpartitioned data") {
     withTempPath { file =>
+
+      val dir = file.getCanonicalPath
+      val subdir = new File(dir, "subdir")
+      val subsubdir = new File(subdir, "subsubdir")
+      val anotherSubsubdir =
+        new File(new File(dir, "another-subdir"), "another-subsubdir")
+
+      val dataInSubdir = Seq(1, 2, 3).toDF("value")
+      val dataInSubsubdir = Seq(4, 5, 6).toDF("value")
+      val dataInAnotherSubsubdir = Seq(7, 8, 9).toDF("value")
+
+      dataInSubdir.write
+        .format (dataSourceName)
+        .mode (SaveMode.Overwrite)
+        .save (subdir.getCanonicalPath)
+
+      dataInSubsubdir.write
+        .format (dataSourceName)
+        .mode (SaveMode.Overwrite)
+        .save (subsubdir.getCanonicalPath)
+
+      dataInAnotherSubsubdir.write
+        .format (dataSourceName)
+        .mode (SaveMode.Overwrite)
+        .save (anotherSubsubdir.getCanonicalPath)
+
+      require(subdir.exists)
+      require(subdir.listFiles().exists(!_.isDirectory))
+      require(subsubdir.exists)
+      require(subsubdir.listFiles().exists(!_.isDirectory))
+      require(anotherSubsubdir.exists)
+      require(anotherSubsubdir.listFiles().exists(!_.isDirectory))
+
+      /*
+        Directory structure generated
+
+        dir
+          |
+          |___ subdir
+          |     |
+          |     |___ [ files of dataInSubdir ]
+          |     |
+          |     |___ subsubdir
+          |               |
+          |               |___ [ files of dataInSubsubdir ]
+          |
+          |
+          |___ anotherSubdir
+                |
+                |___ anotherSubsubdir
+                          |
+                          |___ [ files of dataInAnotherSubsubdir ]
+       */
+
+      val schema = dataInSubdir.schema
+
+      /** Check whether the data read with the given path matches the expected answer */
+      def check(path: String, expectedDf: DataFrame): Unit = {
+        val df = sqlContext.read
+          .format(dataSourceName)
+          .schema(schema) // avoid schema inference for any format, expected to be same format
+          .load(path)
+        checkAnswer(df, expectedDf)
+      }
+
+      check(s"$dir/*/", dataInSubdir)
+      check(s"$dir/sub*/*", dataInSubdir.union(dataInSubsubdir))
+      check(s"$dir/another*/*", dataInAnotherSubsubdir)
+      check(s"$dir/*/another*", dataInAnotherSubsubdir)
+      check(s"$dir/*/*", 
dataInSubdir.union(dataInSubsubdir).union(dataInAnotherSubsubdir))
+    }
+  }
+
+  test("Hadoop style globbing - partitioned data with schema inference") {
+
+    // Tests the following on partitioned data
+    // - partitions are not discovered with globbing and without base path set.
+    // - partitions are discovered with globbing and base path set, though more detailed
+    //   tests for this are in ParquetPartitionDiscoverySuite
+
+    withTempPath { path =>
+      val dir = path.getCanonicalPath
       partitionedTestDF.write
         .format(dataSourceName)
         .mode(SaveMode.Overwrite)
         .partitionBy("p1", "p2")
-        .save(file.getCanonicalPath)
+        .save(dir)
+
+      def check(
+          path: String,
+          expectedResult: Either[DataFrame, String],
+          basePath: Option[String] = None
+        ): Unit = {
+        try {
+          val reader = sqlContext.read
+          basePath.foreach(reader.option("basePath", _))
+          val testDf = reader
+            .format(dataSourceName)
+            .load(path)
+          assert(expectedResult.isLeft, s"Error was expected with $path but result found")
+          checkAnswer(testDf, expectedResult.left.get)
+        } catch {
+          case e: java.util.NoSuchElementException if e.getMessage.contains("dataSchema") =>
+            // Ignore error, the source format requires schema to be provided by user
+            // This is needed for SimpleTextHadoopFsRelationSuite as SimpleTextSource needs schema
+
+          case e: Throwable =>
+            assert(expectedResult.isRight, s"Was not expecting error with $path: " + e)
+            assert(
+              e.getMessage.contains(expectedResult.right.get),
+              s"Did not find expected error message wiht $path")
+        }
+      }
 
-      val df = sqlContext.read
-        .format(dataSourceName)
-        .option("dataSchema", dataSchema.json)
-        .option("basePath", file.getCanonicalPath)
-        .load(s"${file.getCanonicalPath}/p1=*/p2=???")
-
-      val expectedPaths = Set(
-        s"${file.getCanonicalFile}/p1=1/p2=foo",
-        s"${file.getCanonicalFile}/p1=2/p2=foo",
-        s"${file.getCanonicalFile}/p1=1/p2=bar",
-        s"${file.getCanonicalFile}/p1=2/p2=bar"
-      ).map { p =>
-        val path = new Path(p)
-        val fs = path.getFileSystem(sqlContext.sessionState.newHadoopConf())
-        path.makeQualified(fs.getUri, fs.getWorkingDirectory).toString
+      object Error {
+        def apply(msg: String): Either[DataFrame, String] = Right(msg)
       }
 
-      val actualPaths = df.queryExecution.analyzed.collectFirst {
-        case LogicalRelation(relation: HadoopFsRelation, _, _) =>
-          relation.location.paths.map(_.toString).toSet
-      }.getOrElse {
-        fail("Expect an FSBasedRelation, but none could be found")
+      object Result {
+        def apply(df: DataFrame): Either[DataFrame, String] = Left(df)
       }
 
-      assert(actualPaths === expectedPaths)
-      checkAnswer(df, partitionedTestDF.collect())
+      // ---- Without base path set ----
+      // Should find all the data with partitioning columns
+      check(s"$dir", Result(partitionedTestDF))
+
+      // Should fail as globbing finds dirs without files, only subdirs in them.
+      check(s"$dir/*/", Error("please set \"basePath\""))
+      check(s"$dir/p1=*/", Error("please set \"basePath\""))
+
+      // Should not find partition columns as the globs resolve to p2 dirs
+      // with files in them
+      check(s"$dir/*/*", Result(partitionedTestDF.drop("p1", "p2")))
+      check(s"$dir/p1=*/p2=foo", Result(partitionedTestDF.filter("p2 = 
'foo'").drop("p1", "p2")))
+      check(s"$dir/p1=1/p2=???", Result(partitionedTestDF.filter("p1 = 
1").drop("p1", "p2")))
+
+      // Should find all data without the partitioning columns as the globs resolve to the files
+      check(s"$dir/*/*/*", Result(partitionedTestDF.drop("p1", "p2")))
+
+      // ---- With base path set ----
+      val resultDf = partitionedTestDF.select("a", "b", "p1", "p2")
+      check(path = s"$dir/*", Result(resultDf), basePath = Some(dir))
+      check(path = s"$dir/*/*", Result(resultDf), basePath = Some(dir))
+      check(path = s"$dir/*/*/*", Result(resultDf), basePath = Some(dir))
     }
   }
 

