Repository: spark
Updated Branches:
  refs/heads/master 00026fa99 -> 9348431da


[SPARK-12975][SQL] Throwing Exception when Bucketing Columns are part of 
Partitioning Columns

When users are using `partitionBy` and `bucketBy` at the same time, some 
bucketing columns might be part of partitioning columns. For example,
```
        df.write
          .format(source)
          .partitionBy("i")
          .bucketBy(8, "i", "k")
          .saveAsTable("bucketed_table")
```
However, in the above case, adding column `i` into `bucketBy` is useless. It is 
just wasting extra CPU when reading or writing bucket tables. Thus, like Hive, 
we can issue an exception and let users do the change.

Also added a test case for checking if the information of `sortBy` and 
`bucketBy` columns are correctly saved in the metastore table.

Could you check if my understanding is correct? cloud-fan rxin marmbrus Thanks!

Author: gatorsmile <gatorsm...@gmail.com>

Closes #10891 from gatorsmile/commonKeysInPartitionByBucketBy.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/9348431d
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/9348431d
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/9348431d

Branch: refs/heads/master
Commit: 9348431da212ec3ab7be2b8e89a952a48b4e2a31
Parents: 00026fa
Author: gatorsmile <gatorsm...@gmail.com>
Authored: Mon Jan 25 13:38:09 2016 -0800
Committer: Michael Armbrust <mich...@databricks.com>
Committed: Mon Jan 25 13:38:09 2016 -0800

----------------------------------------------------------------------
 .../org/apache/spark/sql/DataFrameWriter.scala  |  9 ++++
 .../sql/hive/MetastoreDataSourcesSuite.scala    | 55 +++++++++++++++++++-
 .../spark/sql/sources/BucketedWriteSuite.scala  | 22 +++++++-
 3 files changed, 83 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/9348431d/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
index ab63fe4..12eb239 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
@@ -240,6 +240,15 @@ final class DataFrameWriter private[sql](df: DataFrame) {
       n <- numBuckets
     } yield {
       require(n > 0 && n < 100000, "Bucket number must be greater than 0 and 
less than 100000.")
+
+      // partitionBy columns cannot be used in bucketBy
+      if (normalizedParCols.nonEmpty &&
+        
normalizedBucketColNames.get.toSet.intersect(normalizedParCols.get.toSet).nonEmpty)
 {
+          throw new AnalysisException(
+            s"bucketBy columns '${bucketColumnNames.get.mkString(", ")}' 
should not be part of " +
+            s"partitionBy columns '${partitioningColumns.get.mkString(", ")}'")
+      }
+
       BucketSpec(n, normalizedBucketColNames.get, 
normalizedSortColNames.getOrElse(Nil))
     }
   }

http://git-wip-us.apache.org/repos/asf/spark/blob/9348431d/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala
----------------------------------------------------------------------
diff --git 
a/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala
 
b/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala
index 253f13c..211932f 100644
--- 
a/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala
+++ 
b/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala
@@ -745,7 +745,7 @@ class MetastoreDataSourcesSuite extends QueryTest with 
SQLTestUtils with TestHiv
     }
   }
 
-  test("Saving partition columns information") {
+  test("Saving partitionBy columns information") {
     val df = (1 to 10).map(i => (i, i + 1, s"str$i", s"str${i + 
1}")).toDF("a", "b", "c", "d")
     val tableName = s"partitionInfo_${System.currentTimeMillis()}"
 
@@ -776,6 +776,59 @@ class MetastoreDataSourcesSuite extends QueryTest with 
SQLTestUtils with TestHiv
     }
   }
 
+  test("Saving information for sortBy and bucketBy columns") {
+    val df = (1 to 10).map(i => (i, i + 1, s"str$i", s"str${i + 
1}")).toDF("a", "b", "c", "d")
+    val tableName = s"bucketingInfo_${System.currentTimeMillis()}"
+
+    withTable(tableName) {
+      df.write
+        .format("parquet")
+        .bucketBy(8, "d", "b")
+        .sortBy("c")
+        .saveAsTable(tableName)
+      invalidateTable(tableName)
+      val metastoreTable = catalog.client.getTable("default", tableName)
+      val expectedBucketByColumns = StructType(df.schema("d") :: 
df.schema("b") :: Nil)
+      val expectedSortByColumns = StructType(df.schema("c") :: Nil)
+
+      val numBuckets = 
metastoreTable.properties("spark.sql.sources.schema.numBuckets").toInt
+      assert(numBuckets == 8)
+
+      val numBucketCols = 
metastoreTable.properties("spark.sql.sources.schema.numBucketCols").toInt
+      assert(numBucketCols == 2)
+
+      val numSortCols = 
metastoreTable.properties("spark.sql.sources.schema.numSortCols").toInt
+      assert(numSortCols == 1)
+
+      val actualBucketByColumns =
+        StructType(
+          (0 until numBucketCols).map { index =>
+            
df.schema(metastoreTable.properties(s"spark.sql.sources.schema.bucketCol.$index"))
+          })
+      // Make sure bucketBy columns are correctly stored in metastore.
+      assert(
+        expectedBucketByColumns.sameType(actualBucketByColumns),
+        s"Partitions columns stored in metastore $actualBucketByColumns is not 
the " +
+          s"partition columns defined by the saveAsTable operation 
$expectedBucketByColumns.")
+
+      val actualSortByColumns =
+        StructType(
+          (0 until numSortCols).map { index =>
+            
df.schema(metastoreTable.properties(s"spark.sql.sources.schema.sortCol.$index"))
+          })
+      // Make sure sortBy columns are correctly stored in metastore.
+      assert(
+        expectedSortByColumns.sameType(actualSortByColumns),
+        s"Partitions columns stored in metastore $actualSortByColumns is not 
the " +
+          s"partition columns defined by the saveAsTable operation 
$expectedSortByColumns.")
+
+      // Check the content of the saved table.
+      checkAnswer(
+        table(tableName).select("c", "b", "d", "a"),
+        df.select("c", "b", "d", "a"))
+    }
+  }
+
   test("insert into a table") {
     def createDF(from: Int, to: Int): DataFrame = {
       (from to to).map(i => i -> s"str$i").toDF("c1", "c2")

http://git-wip-us.apache.org/repos/asf/spark/blob/9348431d/sql/hive/src/test/scala/org/apache/spark/sql/sources/BucketedWriteSuite.scala
----------------------------------------------------------------------
diff --git 
a/sql/hive/src/test/scala/org/apache/spark/sql/sources/BucketedWriteSuite.scala 
b/sql/hive/src/test/scala/org/apache/spark/sql/sources/BucketedWriteSuite.scala
index 59b74d2..a32f8fb 100644
--- 
a/sql/hive/src/test/scala/org/apache/spark/sql/sources/BucketedWriteSuite.scala
+++ 
b/sql/hive/src/test/scala/org/apache/spark/sql/sources/BucketedWriteSuite.scala
@@ -92,10 +92,13 @@ class BucketedWriteSuite extends QueryTest with 
SQLTestUtils with TestHiveSingle
         fail(s"Unable to find the related bucket files.")
       }
 
+      // Remove the duplicate columns in bucketCols and sortCols;
+      // Otherwise, we got analysis errors due to duplicate names
+      val selectedColumns = (bucketCols ++ sortCols).distinct
       // We may lose the type information after write(e.g. json format doesn't 
keep schema
       // information), here we get the types from the original dataframe.
-      val types = df.select((bucketCols ++ sortCols).map(col): 
_*).schema.map(_.dataType)
-      val columns = (bucketCols ++ sortCols).zip(types).map {
+      val types = df.select(selectedColumns.map(col): 
_*).schema.map(_.dataType)
+      val columns = selectedColumns.zip(types).map {
         case (colName, dt) => col(colName).cast(dt)
       }
 
@@ -158,6 +161,21 @@ class BucketedWriteSuite extends QueryTest with 
SQLTestUtils with TestHiveSingle
     }
   }
 
+  test("write bucketed data with the overlapping bucketBy and partitionBy 
columns") {
+    intercept[AnalysisException](df.write
+      .partitionBy("i", "j")
+      .bucketBy(8, "j", "k")
+      .sortBy("k")
+      .saveAsTable("bucketed_table"))
+  }
+
+  test("write bucketed data with the identical bucketBy and partitionBy 
columns") {
+    intercept[AnalysisException](df.write
+      .partitionBy("i")
+      .bucketBy(8, "i")
+      .saveAsTable("bucketed_table"))
+  }
+
   test("write bucketed data without partitionBy") {
     for (source <- Seq("parquet", "json", "orc")) {
       withTable("bucketed_table") {


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to