Repository: spark
Updated Branches:
  refs/heads/master 987df4bfc -> de289bf27


[SPARK-10304][SQL] Following up checking valid dir structure for partition 
discovery

This patch follows up #8840.

Author: Liang-Chi Hsieh <vii...@appier.com>

Closes #9459 from viirya/detect_invalid_part_dir_following.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/de289bf2
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/de289bf2
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/de289bf2

Branch: refs/heads/master
Commit: de289bf279e14e47859b5fbcd70e97b9d0759f14
Parents: 987df4b
Author: Liang-Chi Hsieh <vii...@appier.com>
Authored: Wed Nov 4 10:56:32 2015 -0800
Committer: Yin Huai <yh...@databricks.com>
Committed: Wed Nov 4 10:56:32 2015 -0800

----------------------------------------------------------------------
 .../execution/datasources/PartitioningUtils.scala   | 14 +++++++++++++-
 .../parquet/ParquetPartitionDiscoverySuite.scala    | 16 ++++++++++++++++
 2 files changed, 29 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/de289bf2/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala
index 16dc236..86bc3a1 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala
@@ -81,6 +81,8 @@ private[sql] object PartitioningUtils {
       parsePartition(path, defaultPartitionName, typeInference)
     }.unzip
 
+    // We create pairs of (path -> path's partition value) here
+    // If the corresponding partition value is None, the pair will be skipped.
     val pathsWithPartitionValues = paths.zip(partitionValues).flatMap(x => 
x._2.map(x._1 -> _))
 
     if (pathsWithPartitionValues.isEmpty) {
@@ -89,11 +91,21 @@ private[sql] object PartitioningUtils {
     } else {
       // This dataset is partitioned. We need to check whether all partitions 
have the same
       // partition columns and resolve potential type conflicts.
+
+      // Check if there is conflicting directory structure.
+      // For the paths such as:
+      // var paths = Seq(
+      //   "hdfs://host:9000/invalidPath",
+      //   "hdfs://host:9000/path/a=10/b=20",
+      //   "hdfs://host:9000/path/a=10.5/b=hello")
+      // It will be recognised as conflicting directory structure:
+      //   "hdfs://host:9000/invalidPath"
+      //   "hdfs://host:9000/path"
       val basePaths = optBasePaths.flatMap(x => x)
       assert(
         basePaths.distinct.size == 1,
         "Conflicting directory structures detected. Suspicious paths:\b" +
-          basePaths.mkString("\n\t", "\n\t", "\n\n"))
+          basePaths.distinct.mkString("\n\t", "\n\t", "\n\n"))
 
       val resolvedPartitionValues = resolvePartitions(pathsWithPartitionValues)
 

http://git-wip-us.apache.org/repos/asf/spark/blob/de289bf2/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetPartitionDiscoverySuite.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetPartitionDiscoverySuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetPartitionDiscoverySuite.scala
index 67b6a37..61cc0da 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetPartitionDiscoverySuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetPartitionDiscoverySuite.scala
@@ -88,6 +88,22 @@ class ParquetPartitionDiscoverySuite extends QueryTest with 
ParquetTest with Sha
       parsePartitions(paths.map(new Path(_)), defaultPartitionName, true)
     }
     assert(exception.getMessage().contains("Conflicting directory structures 
detected"))
+
+    // Invalid
+    // Conflicting directory structure:
+    // "hdfs://host:9000/tmp/tables/partitionedTable"
+    // "hdfs://host:9000/tmp/tables/nonPartitionedTable1"
+    // "hdfs://host:9000/tmp/tables/nonPartitionedTable2"
+    paths = Seq(
+      "hdfs://host:9000/tmp/tables/partitionedTable",
+      "hdfs://host:9000/tmp/tables/partitionedTable/p=1/",
+      "hdfs://host:9000/tmp/tables/nonPartitionedTable1",
+      "hdfs://host:9000/tmp/tables/nonPartitionedTable2")
+
+    exception = intercept[AssertionError] {
+      parsePartitions(paths.map(new Path(_)), defaultPartitionName, true)
+    }
+    assert(exception.getMessage().contains("Conflicting directory structures 
detected"))
   }
 
   test("parse partition") {


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to