Repository: spark
Updated Branches:
  refs/heads/branch-1.6 c3da2bd46 -> 9fa9ad0e2


[SPARK-11678][SQL] Partition discovery should stop at the root path of the 
table.

https://issues.apache.org/jira/browse/SPARK-11678

The change of this PR is to pass root paths of table to the partition discovery 
logic. So, the process of partition discovery stops at those root paths instead 
of going all the way to the root path of the file system.

Author: Yin Huai <yh...@databricks.com>

Closes #9651 from yhuai/SPARK-11678.

(cherry picked from commit 7b5d9051cf91c099458d092a6705545899134b3b)
Signed-off-by: Cheng Lian <l...@databricks.com>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/9fa9ad0e
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/9fa9ad0e
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/9fa9ad0e

Branch: refs/heads/branch-1.6
Commit: 9fa9ad0e2301d7f066fae6ef29a9cbc099f84566
Parents: c3da2bd
Author: Yin Huai <yh...@databricks.com>
Authored: Fri Nov 13 18:36:56 2015 +0800
Committer: Cheng Lian <l...@databricks.com>
Committed: Fri Nov 13 18:45:02 2015 +0800

----------------------------------------------------------------------
 .../datasources/PartitioningUtils.scala         |  68 +++++++---
 .../datasources/json/JSONRelation.scala         |  21 +--
 .../datasources/parquet/ParquetRelation.scala   |   2 +-
 .../datasources/text/DefaultSource.scala        |   5 +-
 .../apache/spark/sql/sources/interfaces.scala   |  49 ++++++-
 .../parquet/ParquetFilterSuite.scala            |   4 +-
 .../ParquetPartitionDiscoverySuite.scala        | 132 +++++++++++++++++--
 .../apache/spark/sql/hive/orc/OrcRelation.scala |   2 +-
 .../spark/sql/sources/SimpleTextRelation.scala  |   2 +-
 .../sql/sources/hadoopFsRelationSuites.scala    |   1 +
 10 files changed, 235 insertions(+), 51 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/9fa9ad0e/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala
index 86bc3a1..81962f8 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala
@@ -75,10 +75,11 @@ private[sql] object PartitioningUtils {
   private[sql] def parsePartitions(
       paths: Seq[Path],
       defaultPartitionName: String,
-      typeInference: Boolean): PartitionSpec = {
+      typeInference: Boolean,
+      basePaths: Set[Path]): PartitionSpec = {
     // First, we need to parse every partition's path and see if we can find 
partition values.
-    val (partitionValues, optBasePaths) = paths.map { path =>
-      parsePartition(path, defaultPartitionName, typeInference)
+    val (partitionValues, optDiscoveredBasePaths) = paths.map { path =>
+      parsePartition(path, defaultPartitionName, typeInference, basePaths)
     }.unzip
 
     // We create pairs of (path -> path's partition value) here
@@ -101,11 +102,15 @@ private[sql] object PartitioningUtils {
       // It will be recognised as conflicting directory structure:
       //   "hdfs://host:9000/invalidPath"
       //   "hdfs://host:9000/path"
-      val basePaths = optBasePaths.flatMap(x => x)
+      val disvoeredBasePaths = optDiscoveredBasePaths.flatMap(x => x)
       assert(
-        basePaths.distinct.size == 1,
+        disvoeredBasePaths.distinct.size == 1,
         "Conflicting directory structures detected. Suspicious paths:\b" +
-          basePaths.distinct.mkString("\n\t", "\n\t", "\n\n"))
+          disvoeredBasePaths.distinct.mkString("\n\t", "\n\t", "\n\n") +
+          "If provided paths are partition directories, please set " +
+          "\"basePath\" in the options of the data source to specify the " +
+          "root directory of the table. If there are multiple root 
directories, " +
+          "please load them separately and then union them.")
 
       val resolvedPartitionValues = resolvePartitions(pathsWithPartitionValues)
 
@@ -131,7 +136,7 @@ private[sql] object PartitioningUtils {
 
   /**
    * Parses a single partition, returns column names and values of each 
partition column, also
-   * the base path.  For example, given:
+   * the path when we stop partition discovery.  For example, given:
    * {{{
    *   path = hdfs://<host>:<port>/path/to/partition/a=42/b=hello/c=3.14
    * }}}
@@ -144,40 +149,63 @@ private[sql] object PartitioningUtils {
    *       Literal.create("hello", StringType),
    *       Literal.create(3.14, FloatType)))
    * }}}
-   * and the base path:
+   * and the path when we stop the discovery is:
    * {{{
-   *   /path/to/partition
+   *   hdfs://<host>:<port>/path/to/partition
    * }}}
    */
   private[sql] def parsePartition(
       path: Path,
       defaultPartitionName: String,
-      typeInference: Boolean): (Option[PartitionValues], Option[Path]) = {
+      typeInference: Boolean,
+      basePaths: Set[Path]): (Option[PartitionValues], Option[Path]) = {
     val columns = ArrayBuffer.empty[(String, Literal)]
     // Old Hadoop versions don't have `Path.isRoot`
     var finished = path.getParent == null
-    var chopped = path
-    var basePath = path
+    // currentPath is the current path that we will use to parse partition 
column value.
+    var currentPath: Path = path
 
     while (!finished) {
       // Sometimes (e.g., when speculative task is enabled), temporary 
directories may be left
-      // uncleaned.  Here we simply ignore them.
-      if (chopped.getName.toLowerCase == "_temporary") {
+      // uncleaned. Here we simply ignore them.
+      if (currentPath.getName.toLowerCase == "_temporary") {
         return (None, None)
       }
 
-      val maybeColumn = parsePartitionColumn(chopped.getName, 
defaultPartitionName, typeInference)
-      maybeColumn.foreach(columns += _)
-      basePath = chopped
-      chopped = chopped.getParent
-      finished = (maybeColumn.isEmpty && !columns.isEmpty) || 
chopped.getParent == null
+      if (basePaths.contains(currentPath)) {
+        // If the currentPath is one of base paths. We should stop.
+        finished = true
+      } else {
+        // Let's say currentPath is a path of "/table/a=1/", 
currentPath.getName will give us a=1.
+        // Once we get the string, we try to parse it and find the partition 
column and value.
+        val maybeColumn =
+          parsePartitionColumn(currentPath.getName, defaultPartitionName, 
typeInference)
+        maybeColumn.foreach(columns += _)
+
+        // Now, we determine if we should stop.
+        // When we hit any of the following cases, we will stop:
+        //  - In this iteration, we could not parse the value of partition 
column and value,
+        //    i.e. maybeColumn is None, and columns is not empty. At here we 
check if columns is
+        //    empty to handle cases like /table/a=1/_temporary/something (we 
need to find a=1 in
+        //    this case).
+        //  - After we get the new currentPath, this new currentPath represent 
the top level dir
+        //    i.e. currentPath.getParent == null. For the example of 
"/table/a=1/",
+        //    the top level dir is "/table".
+        finished =
+          (maybeColumn.isEmpty && !columns.isEmpty) || currentPath.getParent 
== null
+
+        if (!finished) {
+          // For the above example, currentPath will be "/table/".
+          currentPath = currentPath.getParent
+        }
+      }
     }
 
     if (columns.isEmpty) {
       (None, Some(path))
     } else {
       val (columnNames, values) = columns.reverse.unzip
-      (Some(PartitionValues(columnNames, values)), Some(basePath))
+      (Some(PartitionValues(columnNames, values)), Some(currentPath))
     }
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/9fa9ad0e/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JSONRelation.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JSONRelation.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JSONRelation.scala
index 85b52f0..dca638b 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JSONRelation.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JSONRelation.scala
@@ -56,13 +56,14 @@ class DefaultSource extends HadoopFsRelationProvider with 
DataSourceRegister {
     val primitivesAsString = 
parameters.get("primitivesAsString").map(_.toBoolean).getOrElse(false)
 
     new JSONRelation(
-      None,
-      samplingRatio,
-      primitivesAsString,
-      dataSchema,
-      None,
-      partitionColumns,
-      paths)(sqlContext)
+      inputRDD = None,
+      samplingRatio = samplingRatio,
+      primitivesAsString = primitivesAsString,
+      maybeDataSchema = dataSchema,
+      maybePartitionSpec = None,
+      userDefinedPartitionColumns = partitionColumns,
+      paths = paths,
+      parameters = parameters)(sqlContext)
   }
 }
 
@@ -73,8 +74,10 @@ private[sql] class JSONRelation(
     val maybeDataSchema: Option[StructType],
     val maybePartitionSpec: Option[PartitionSpec],
     override val userDefinedPartitionColumns: Option[StructType],
-    override val paths: Array[String] = Array.empty[String])(@transient val 
sqlContext: SQLContext)
-  extends HadoopFsRelation(maybePartitionSpec) {
+    override val paths: Array[String] = Array.empty[String],
+    parameters: Map[String, String] = Map.empty[String, String])
+    (@transient val sqlContext: SQLContext)
+  extends HadoopFsRelation(maybePartitionSpec, parameters) {
 
   /** Constraints to be imposed on schema to be stored. */
   private def checkConstraints(schema: StructType): Unit = {

http://git-wip-us.apache.org/repos/asf/spark/blob/9fa9ad0e/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRelation.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRelation.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRelation.scala
index 21337b2..cb0aab8 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRelation.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRelation.scala
@@ -109,7 +109,7 @@ private[sql] class ParquetRelation(
     override val userDefinedPartitionColumns: Option[StructType],
     parameters: Map[String, String])(
     val sqlContext: SQLContext)
-  extends HadoopFsRelation(maybePartitionSpec)
+  extends HadoopFsRelation(maybePartitionSpec, parameters)
   with Logging {
 
   private[sql] def this(

http://git-wip-us.apache.org/repos/asf/spark/blob/9fa9ad0e/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/text/DefaultSource.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/text/DefaultSource.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/text/DefaultSource.scala
index 4b8b8e4..fbd387b 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/text/DefaultSource.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/text/DefaultSource.scala
@@ -71,9 +71,10 @@ class DefaultSource extends HadoopFsRelationProvider with 
DataSourceRegister {
 private[sql] class TextRelation(
     val maybePartitionSpec: Option[PartitionSpec],
     override val userDefinedPartitionColumns: Option[StructType],
-    override val paths: Array[String] = Array.empty[String])
+    override val paths: Array[String] = Array.empty[String],
+    parameters: Map[String, String] = Map.empty[String, String])
     (@transient val sqlContext: SQLContext)
-  extends HadoopFsRelation(maybePartitionSpec) {
+  extends HadoopFsRelation(maybePartitionSpec, parameters) {
 
   /** Data schema is always a single column, named "text". */
   override def dataSchema: StructType = new StructType().add("value", 
StringType)

http://git-wip-us.apache.org/repos/asf/spark/blob/9fa9ad0e/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala
index 2be6cd4..b3d3bdf 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala
@@ -416,12 +416,19 @@ abstract class OutputWriter {
  * @since 1.4.0
  */
 @Experimental
-abstract class HadoopFsRelation private[sql](maybePartitionSpec: 
Option[PartitionSpec])
+abstract class HadoopFsRelation private[sql](
+    maybePartitionSpec: Option[PartitionSpec],
+    parameters: Map[String, String])
   extends BaseRelation with FileRelation with Logging {
 
   override def toString: String = getClass.getSimpleName + paths.mkString("[", 
",", "]")
 
-  def this() = this(None)
+  def this() = this(None, Map.empty[String, String])
+
+  def this(parameters: Map[String, String]) = this(None, parameters)
+
+  private[sql] def this(maybePartitionSpec: Option[PartitionSpec]) =
+    this(maybePartitionSpec, Map.empty[String, String])
 
   private val hadoopConf = new 
Configuration(sqlContext.sparkContext.hadoopConfiguration)
 
@@ -519,13 +526,37 @@ abstract class HadoopFsRelation 
private[sql](maybePartitionSpec: Option[Partitio
   }
 
   /**
-   * Base paths of this relation.  For partitioned relations, it should be 
either root directories
+   * Paths of this relation.  For partitioned relations, it should be root 
directories
    * of all partition directories.
    *
    * @since 1.4.0
    */
   def paths: Array[String]
 
+  /**
+   * Contains a set of paths that are considered as the base dirs of the input 
datasets.
+   * The partitioning discovery logic will make sure it will stop when it 
reaches any
+   * base path. By default, the paths of the dataset provided by users will be 
base paths.
+   * For example, if a user uses 
`sqlContext.read.parquet("/path/something=true/")`, the base path
+   * will be `/path/something=true/`, and the returned DataFrame will not 
contain a column of
+   * `something`. If users want to override the basePath. They can set 
`basePath` in the options
+   * to pass the new base path to the data source.
+   * For the above example, if the user-provided base path is `/path/`, the 
returned
+   * DataFrame will have the column of `something`.
+   */
+  private def basePaths: Set[Path] = {
+    val userDefinedBasePath = parameters.get("basePath").map(basePath => 
Set(new Path(basePath)))
+    userDefinedBasePath.getOrElse {
+      // If the user does not provide basePath, we will just use paths.
+      val pathSet = paths.toSet
+      pathSet.map(p => new Path(p))
+    }.map { hdfsPath =>
+      // Make the path qualified (consistent with listLeafFiles and 
listLeafFilesInParallel).
+      val fs = hdfsPath.getFileSystem(hadoopConf)
+      hdfsPath.makeQualified(fs.getUri, fs.getWorkingDirectory)
+    }
+  }
+
   override def inputFiles: Array[String] = 
cachedLeafStatuses().map(_.getPath.toString).toArray
 
   override def sizeInBytes: Long = cachedLeafStatuses().map(_.getLen).sum
@@ -559,7 +590,10 @@ abstract class HadoopFsRelation 
private[sql](maybePartitionSpec: Option[Partitio
     userDefinedPartitionColumns match {
       case Some(userProvidedSchema) if userProvidedSchema.nonEmpty =>
         val spec = PartitioningUtils.parsePartitions(
-          leafDirs, PartitioningUtils.DEFAULT_PARTITION_NAME, typeInference = 
false)
+          leafDirs,
+          PartitioningUtils.DEFAULT_PARTITION_NAME,
+          typeInference = false,
+          basePaths = basePaths)
 
         // Without auto inference, all of value in the `row` should be null or 
in StringType,
         // we need to cast into the data type that user specified.
@@ -577,8 +611,11 @@ abstract class HadoopFsRelation 
private[sql](maybePartitionSpec: Option[Partitio
 
       case _ =>
         // user did not provide a partitioning schema
-        PartitioningUtils.parsePartitions(leafDirs, 
PartitioningUtils.DEFAULT_PARTITION_NAME,
-          typeInference = 
sqlContext.conf.partitionColumnTypeInferenceEnabled())
+        PartitioningUtils.parsePartitions(
+          leafDirs,
+          PartitioningUtils.DEFAULT_PARTITION_NAME,
+          typeInference = 
sqlContext.conf.partitionColumnTypeInferenceEnabled(),
+          basePaths = basePaths)
     }
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/9fa9ad0e/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala
index 2ac87ad..458786f 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala
@@ -294,7 +294,7 @@ class ParquetFilterSuite extends QueryTest with ParquetTest 
with SharedSQLContex
         // If the "part = 1" filter gets pushed down, this query will throw an 
exception since
         // "part" is not a valid column in the actual Parquet file
         checkAnswer(
-          sqlContext.read.parquet(path).filter("part = 1"),
+          sqlContext.read.parquet(dir.getCanonicalPath).filter("part = 1"),
           (1 to 3).map(i => Row(i, i.toString, 1)))
       }
     }
@@ -311,7 +311,7 @@ class ParquetFilterSuite extends QueryTest with ParquetTest 
with SharedSQLContex
         // If the "part = 1" filter gets pushed down, this query will throw an 
exception since
         // "part" is not a valid column in the actual Parquet file
         checkAnswer(
-          sqlContext.read.parquet(path).filter("a > 0 and (part = 0 or a > 
1)"),
+          sqlContext.read.parquet(dir.getCanonicalPath).filter("a > 0 and 
(part = 0 or a > 1)"),
           (2 to 3).map(i => Row(i, i.toString, 1)))
       }
     }

http://git-wip-us.apache.org/repos/asf/spark/blob/9fa9ad0e/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetPartitionDiscoverySuite.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetPartitionDiscoverySuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetPartitionDiscoverySuite.scala
index 61cc0da..71e9034 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetPartitionDiscoverySuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetPartitionDiscoverySuite.scala
@@ -66,7 +66,7 @@ class ParquetPartitionDiscoverySuite extends QueryTest with 
ParquetTest with Sha
       "hdfs://host:9000/path/a=10.5/b=hello")
 
     var exception = intercept[AssertionError] {
-      parsePartitions(paths.map(new Path(_)), defaultPartitionName, true)
+      parsePartitions(paths.map(new Path(_)), defaultPartitionName, true, 
Set.empty[Path])
     }
     assert(exception.getMessage().contains("Conflicting directory structures 
detected"))
 
@@ -76,7 +76,37 @@ class ParquetPartitionDiscoverySuite extends QueryTest with 
ParquetTest with Sha
       "hdfs://host:9000/path/a=10/b=20",
       "hdfs://host:9000/path/_temporary/path")
 
-    parsePartitions(paths.map(new Path(_)), defaultPartitionName, true)
+    parsePartitions(
+      paths.map(new Path(_)),
+      defaultPartitionName,
+      true,
+      Set(new Path("hdfs://host:9000/path/")))
+
+    // Valid
+    paths = Seq(
+      "hdfs://host:9000/path/something=true/table/",
+      "hdfs://host:9000/path/something=true/table/_temporary",
+      "hdfs://host:9000/path/something=true/table/a=10/b=20",
+      "hdfs://host:9000/path/something=true/table/_temporary/path")
+
+    parsePartitions(
+      paths.map(new Path(_)),
+      defaultPartitionName,
+      true,
+      Set(new Path("hdfs://host:9000/path/something=true/table")))
+
+    // Valid
+    paths = Seq(
+      "hdfs://host:9000/path/table=true/",
+      "hdfs://host:9000/path/table=true/_temporary",
+      "hdfs://host:9000/path/table=true/a=10/b=20",
+      "hdfs://host:9000/path/table=true/_temporary/path")
+
+    parsePartitions(
+      paths.map(new Path(_)),
+      defaultPartitionName,
+      true,
+      Set(new Path("hdfs://host:9000/path/table=true")))
 
     // Invalid
     paths = Seq(
@@ -85,7 +115,11 @@ class ParquetPartitionDiscoverySuite extends QueryTest with 
ParquetTest with Sha
       "hdfs://host:9000/path/path1")
 
     exception = intercept[AssertionError] {
-      parsePartitions(paths.map(new Path(_)), defaultPartitionName, true)
+      parsePartitions(
+        paths.map(new Path(_)),
+        defaultPartitionName,
+        true,
+        Set(new Path("hdfs://host:9000/path/")))
     }
     assert(exception.getMessage().contains("Conflicting directory structures 
detected"))
 
@@ -101,19 +135,24 @@ class ParquetPartitionDiscoverySuite extends QueryTest 
with ParquetTest with Sha
       "hdfs://host:9000/tmp/tables/nonPartitionedTable2")
 
     exception = intercept[AssertionError] {
-      parsePartitions(paths.map(new Path(_)), defaultPartitionName, true)
+      parsePartitions(
+        paths.map(new Path(_)),
+        defaultPartitionName,
+        true,
+        Set(new Path("hdfs://host:9000/tmp/tables/")))
     }
     assert(exception.getMessage().contains("Conflicting directory structures 
detected"))
   }
 
   test("parse partition") {
     def check(path: String, expected: Option[PartitionValues]): Unit = {
-      assert(expected === parsePartition(new Path(path), defaultPartitionName, 
true)._1)
+      val actual = parsePartition(new Path(path), defaultPartitionName, true, 
Set.empty[Path])._1
+      assert(expected === actual)
     }
 
     def checkThrows[T <: Throwable: Manifest](path: String, expected: String): 
Unit = {
       val message = intercept[T] {
-        parsePartition(new Path(path), defaultPartitionName, true)
+        parsePartition(new Path(path), defaultPartitionName, true, 
Set.empty[Path])
       }.getMessage
 
       assert(message.contains(expected))
@@ -152,8 +191,17 @@ class ParquetPartitionDiscoverySuite extends QueryTest 
with ParquetTest with Sha
   }
 
   test("parse partitions") {
-    def check(paths: Seq[String], spec: PartitionSpec): Unit = {
-      assert(parsePartitions(paths.map(new Path(_)), defaultPartitionName, 
true) === spec)
+    def check(
+        paths: Seq[String],
+        spec: PartitionSpec,
+        rootPaths: Set[Path] = Set.empty[Path]): Unit = {
+      val actualSpec =
+        parsePartitions(
+          paths.map(new Path(_)),
+          defaultPartitionName,
+          true,
+          rootPaths)
+      assert(actualSpec === spec)
     }
 
     check(Seq(
@@ -232,7 +280,9 @@ class ParquetPartitionDiscoverySuite extends QueryTest with 
ParquetTest with Sha
 
   test("parse partitions with type inference disabled") {
     def check(paths: Seq[String], spec: PartitionSpec): Unit = {
-      assert(parsePartitions(paths.map(new Path(_)), defaultPartitionName, 
false) === spec)
+      val actualSpec =
+        parsePartitions(paths.map(new Path(_)), defaultPartitionName, false, 
Set.empty[Path])
+      assert(actualSpec === spec)
     }
 
     check(Seq(
@@ -590,6 +640,70 @@ class ParquetPartitionDiscoverySuite extends QueryTest 
with ParquetTest with Sha
     }
   }
 
+  test("SPARK-11678: Partition discovery stops at the root path of the 
dataset") {
+    withTempPath { dir =>
+      val tablePath = new File(dir, "key=value")
+      val df = (1 to 3).map(i => (i, i, i, i)).toDF("a", "b", "c", "d")
+
+      df.write
+        .format("parquet")
+        .partitionBy("b", "c", "d")
+        .save(tablePath.getCanonicalPath)
+
+      Files.touch(new File(s"${tablePath.getCanonicalPath}/", "_SUCCESS"))
+      Files.createParentDirs(new 
File(s"${dir.getCanonicalPath}/b=1/c=1/.foo/bar"))
+
+      
checkAnswer(sqlContext.read.format("parquet").load(tablePath.getCanonicalPath), 
df)
+    }
+
+    withTempPath { dir =>
+      val path = new File(dir, "key=value")
+      val tablePath = new File(path, "table")
+
+      val df = (1 to 3).map(i => (i, i, i, i)).toDF("a", "b", "c", "d")
+
+      df.write
+        .format("parquet")
+        .partitionBy("b", "c", "d")
+        .save(tablePath.getCanonicalPath)
+
+      Files.touch(new File(s"${tablePath.getCanonicalPath}/", "_SUCCESS"))
+      Files.createParentDirs(new 
File(s"${dir.getCanonicalPath}/b=1/c=1/.foo/bar"))
+
+      
checkAnswer(sqlContext.read.format("parquet").load(tablePath.getCanonicalPath), 
df)
+    }
+  }
+
+  test("use basePath to specify the root dir of a partitioned table.") {
+    withTempPath { dir =>
+      val tablePath = new File(dir, "table")
+      val df = (1 to 3).map(i => (i, i, i, i)).toDF("a", "b", "c", "d")
+
+      df.write
+        .format("parquet")
+        .partitionBy("b", "c", "d")
+        .save(tablePath.getCanonicalPath)
+
+      val twoPartitionsDF =
+        sqlContext
+          .read
+          .option("basePath", tablePath.getCanonicalPath)
+          .parquet(
+            s"${tablePath.getCanonicalPath}/b=1",
+            s"${tablePath.getCanonicalPath}/b=2")
+
+      checkAnswer(twoPartitionsDF, df.filter("b != 3"))
+
+      intercept[AssertionError] {
+        sqlContext
+          .read
+          .parquet(
+            s"${tablePath.getCanonicalPath}/b=1",
+            s"${tablePath.getCanonicalPath}/b=2")
+      }
+    }
+  }
+
   test("listConflictingPartitionColumns") {
     def makeExpectedMessage(colNameLists: Seq[String], paths: Seq[String]): 
String = {
       val conflictingColNameLists = colNameLists.zipWithIndex.map { case 
(list, index) =>

http://git-wip-us.apache.org/repos/asf/spark/blob/9fa9ad0e/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcRelation.scala
----------------------------------------------------------------------
diff --git 
a/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcRelation.scala 
b/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcRelation.scala
index 45de567..1136670 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcRelation.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcRelation.scala
@@ -157,7 +157,7 @@ private[sql] class OrcRelation(
     override val userDefinedPartitionColumns: Option[StructType],
     parameters: Map[String, String])(
     @transient val sqlContext: SQLContext)
-  extends HadoopFsRelation(maybePartitionSpec)
+  extends HadoopFsRelation(maybePartitionSpec, parameters)
   with Logging {
 
   private[sql] def this(

http://git-wip-us.apache.org/repos/asf/spark/blob/9fa9ad0e/sql/hive/src/test/scala/org/apache/spark/sql/sources/SimpleTextRelation.scala
----------------------------------------------------------------------
diff --git 
a/sql/hive/src/test/scala/org/apache/spark/sql/sources/SimpleTextRelation.scala 
b/sql/hive/src/test/scala/org/apache/spark/sql/sources/SimpleTextRelation.scala
index bdc48a3..01960fd 100644
--- 
a/sql/hive/src/test/scala/org/apache/spark/sql/sources/SimpleTextRelation.scala
+++ 
b/sql/hive/src/test/scala/org/apache/spark/sql/sources/SimpleTextRelation.scala
@@ -89,7 +89,7 @@ class SimpleTextRelation(
     override val userDefinedPartitionColumns: Option[StructType],
     parameters: Map[String, String])(
     @transient val sqlContext: SQLContext)
-  extends HadoopFsRelation {
+  extends HadoopFsRelation(parameters) {
 
   import sqlContext.sparkContext
 

http://git-wip-us.apache.org/repos/asf/spark/blob/9fa9ad0e/sql/hive/src/test/scala/org/apache/spark/sql/sources/hadoopFsRelationSuites.scala
----------------------------------------------------------------------
diff --git 
a/sql/hive/src/test/scala/org/apache/spark/sql/sources/hadoopFsRelationSuites.scala
 
b/sql/hive/src/test/scala/org/apache/spark/sql/sources/hadoopFsRelationSuites.scala
index 100b971..665e87e 100644
--- 
a/sql/hive/src/test/scala/org/apache/spark/sql/sources/hadoopFsRelationSuites.scala
+++ 
b/sql/hive/src/test/scala/org/apache/spark/sql/sources/hadoopFsRelationSuites.scala
@@ -486,6 +486,7 @@ abstract class HadoopFsRelationTest extends QueryTest with 
SQLTestUtils with Tes
       val df = sqlContext.read
         .format(dataSourceName)
         .option("dataSchema", dataSchema.json)
+        .option("basePath", file.getCanonicalPath)
         .load(s"${file.getCanonicalPath}/p1=*/p2=???")
 
       val expectedPaths = Set(


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to