Repository: spark
Updated Branches:
  refs/heads/master 57446eb69 -> d6035d97c


[SPARK-10304] [SQL] Partition discovery should throw an exception if the dir 
structure is invalid

JIRA: https://issues.apache.org/jira/browse/SPARK-10304

This patch detects if the structure of partition directories is not valid.

The test cases are from #8547. Thanks zhzhan.

cc liancheng

Author: Liang-Chi Hsieh <vii...@appier.com>

Closes #8840 from viirya/detect_invalid_part_dir.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/d6035d97
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/d6035d97
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/d6035d97

Branch: refs/heads/master
Commit: d6035d97c91fe78b1336ade48134252915263ea6
Parents: 57446eb
Author: Liang-Chi Hsieh <vii...@appier.com>
Authored: Tue Nov 3 07:41:50 2015 -0800
Committer: Davies Liu <davies....@gmail.com>
Committed: Tue Nov 3 07:41:50 2015 -0800

----------------------------------------------------------------------
 .../datasources/PartitioningUtils.scala         | 36 ++++++++++++++------
 .../ParquetPartitionDiscoverySuite.scala        | 36 ++++++++++++++++++--
 2 files changed, 59 insertions(+), 13 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/d6035d97/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala
index 0a2007e..628c5e1 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala
@@ -77,9 +77,11 @@ private[sql] object PartitioningUtils {
       defaultPartitionName: String,
       typeInference: Boolean): PartitionSpec = {
     // First, we need to parse every partition's path and see if we can find 
partition values.
-    val pathsWithPartitionValues = paths.flatMap { path =>
-      parsePartition(path, defaultPartitionName, typeInference).map(path -> _)
-    }
+    val (partitionValues, optBasePaths) = paths.map { path =>
+      parsePartition(path, defaultPartitionName, typeInference)
+    }.unzip
+
+    val pathsWithPartitionValues = paths.zip(partitionValues).flatMap(x => 
x._2.map(x._1 -> _))
 
     if (pathsWithPartitionValues.isEmpty) {
       // This dataset is not partitioned.
@@ -87,6 +89,12 @@ private[sql] object PartitioningUtils {
     } else {
       // This dataset is partitioned. We need to check whether all partitions 
have the same
       // partition columns and resolve potential type conflicts.
+      val basePaths = optBasePaths.flatMap(x => x)
+      assert(
+        basePaths.distinct.size == 1,
+        "Conflicting directory structures detected. Suspicious paths:\n" +
+          basePaths.mkString("\n\t", "\n\t", "\n\n"))
+
       val resolvedPartitionValues = resolvePartitions(pathsWithPartitionValues)
 
       // Creates the StructType which represents the partition columns.
@@ -110,12 +118,12 @@ private[sql] object PartitioningUtils {
   }
 
   /**
-   * Parses a single partition, returns column names and values of each 
partition column.  For
-   * example, given:
+   * Parses a single partition, returns column names and values of each 
partition column, also
+   * the base path.  For example, given:
    * {{{
    *   path = hdfs://<host>:<port>/path/to/partition/a=42/b=hello/c=3.14
    * }}}
-   * it returns:
+   * it returns the partition:
    * {{{
    *   PartitionValues(
    *     Seq("a", "b", "c"),
@@ -124,34 +132,40 @@ private[sql] object PartitioningUtils {
    *       Literal.create("hello", StringType),
    *       Literal.create(3.14, FloatType)))
    * }}}
+   * and the base path:
+   * {{{
+   *   /path/to/partition
+   * }}}
    */
   private[sql] def parsePartition(
       path: Path,
       defaultPartitionName: String,
-      typeInference: Boolean): Option[PartitionValues] = {
+      typeInference: Boolean): (Option[PartitionValues], Option[Path]) = {
     val columns = ArrayBuffer.empty[(String, Literal)]
     // Old Hadoop versions don't have `Path.isRoot`
     var finished = path.getParent == null
     var chopped = path
+    var basePath = path
 
     while (!finished) {
       // Sometimes (e.g., when speculative task is enabled), temporary 
directories may be left
       // uncleaned.  Here we simply ignore them.
       if (chopped.getName.toLowerCase == "_temporary") {
-        return None
+        return (None, None)
       }
 
       val maybeColumn = parsePartitionColumn(chopped.getName, 
defaultPartitionName, typeInference)
       maybeColumn.foreach(columns += _)
+      basePath = chopped
       chopped = chopped.getParent
-      finished = maybeColumn.isEmpty || chopped.getParent == null
+      finished = (maybeColumn.isEmpty && !columns.isEmpty) || 
chopped.getParent == null
     }
 
     if (columns.isEmpty) {
-      None
+      (None, Some(path))
     } else {
       val (columnNames, values) = columns.reverse.unzip
-      Some(PartitionValues(columnNames, values))
+      (Some(PartitionValues(columnNames, values)), Some(basePath))
     }
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/d6035d97/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetPartitionDiscoverySuite.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetPartitionDiscoverySuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetPartitionDiscoverySuite.scala
index 3a23b8e..67b6a37 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetPartitionDiscoverySuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetPartitionDiscoverySuite.scala
@@ -58,14 +58,46 @@ class ParquetPartitionDiscoverySuite extends QueryTest with 
ParquetTest with Sha
     check(defaultPartitionName, Literal.create(null, NullType))
   }
 
+  test("parse invalid partitioned directories") {
+    // Invalid
+    var paths = Seq(
+      "hdfs://host:9000/invalidPath",
+      "hdfs://host:9000/path/a=10/b=20",
+      "hdfs://host:9000/path/a=10.5/b=hello")
+
+    var exception = intercept[AssertionError] {
+      parsePartitions(paths.map(new Path(_)), defaultPartitionName, true)
+    }
+    assert(exception.getMessage().contains("Conflicting directory structures 
detected"))
+
+    // Valid
+    paths = Seq(
+      "hdfs://host:9000/path/_temporary",
+      "hdfs://host:9000/path/a=10/b=20",
+      "hdfs://host:9000/path/_temporary/path")
+
+    parsePartitions(paths.map(new Path(_)), defaultPartitionName, true)
+
+    // Invalid
+    paths = Seq(
+      "hdfs://host:9000/path/_temporary",
+      "hdfs://host:9000/path/a=10/b=20",
+      "hdfs://host:9000/path/path1")
+
+    exception = intercept[AssertionError] {
+      parsePartitions(paths.map(new Path(_)), defaultPartitionName, true)
+    }
+    assert(exception.getMessage().contains("Conflicting directory structures 
detected"))
+  }
+
   test("parse partition") {
     def check(path: String, expected: Option[PartitionValues]): Unit = {
-      assert(expected === parsePartition(new Path(path), defaultPartitionName, 
true))
+      assert(expected === parsePartition(new Path(path), defaultPartitionName, 
true)._1)
     }
 
     def checkThrows[T <: Throwable: Manifest](path: String, expected: String): 
Unit = {
       val message = intercept[T] {
-        parsePartition(new Path(path), defaultPartitionName, true).get
+        parsePartition(new Path(path), defaultPartitionName, true)
       }.getMessage
 
       assert(message.contains(expected))


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to