Repository: spark
Updated Branches:
  refs/heads/master f1b220eee -> b4c32c495


[SPARK-15549][SQL] Disable bucketing when the output doesn't contain all bucketing columns

## What changes were proposed in this pull request?
I created a bucketed table `bucketed_table` with bucket column `i`:
```scala
case class Data(i: Int, j: Int, k: Int)
sc.makeRDD(Array((1, 2, 3))).map(x => Data(x._1, x._2, x._3)).toDF.write.bucketBy(2, "i").saveAsTable("bucketed_table")
```

and then ran the following SQL queries:
```sql
SELECT j FROM bucketed_table;
Error in query: bucket column i not found in existing columns (j);

SELECT j, MAX(k) FROM bucketed_table GROUP BY j;
Error in query: bucket column i not found in existing columns (j, k);
```
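
For reference, once bucketing is disabled for such scans (the fix below), both queries should simply succeed rather than fail analysis. A sketch of the expected spark-shell behavior, assuming the session and the `bucketed_table` from the repro above, with `spark` as the SparkSession:
```scala
// Expected behavior after the fix (sketch; assumes the bucketed_table
// created above, which holds the single row (1, 2, 3)).
spark.sql("SELECT j FROM bucketed_table").collect()
// => Array(Row(2))
spark.sql("SELECT j, MAX(k) FROM bucketed_table GROUP BY j").collect()
// => Array(Row(2, 3))
```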

I think we should add a check so that bucketing is enabled only when all of the following conditions hold (a standalone sketch of the check follows the list):
1. the conf is enabled
2. the relation is bucketed
3. the output contains all bucketing columns
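
A minimal, self-contained sketch of the guard in plain Scala; `Attribute`, `HashPartitioning`, and `UnknownPartitioning` here are simplified stand-ins for the Catalyst types touched by the patch below:
```scala
// Simplified stand-ins for the Catalyst types used in the patch.
case class Attribute(name: String)

sealed trait Partitioning
case class HashPartitioning(columns: Seq[Attribute], numBuckets: Int) extends Partitioning
case class UnknownPartitioning(numPartitions: Int) extends Partitioning

def outputPartitioning(
    output: Seq[Attribute],
    bucketColumnNames: Seq[String],
    numBuckets: Int): Partitioning = {
  // Resolve each bucket column against the scan's output schema.
  val bucketColumns = bucketColumnNames.flatMap(n => output.find(_.name == n))
  // Only claim hash partitioning when every bucket column was resolved;
  // otherwise fall back to an unknown partitioning instead of throwing.
  if (bucketColumns.size == bucketColumnNames.size) {
    HashPartitioning(bucketColumns, numBuckets)
  } else {
    UnknownPartitioning(0)
  }
}

// SELECT j FROM bucketed_table only outputs column j, so bucket column i
// is missing and the scan reports UnknownPartitioning(0) instead of failing.
assert(outputPartitioning(Seq(Attribute("j")), Seq("i"), numBuckets = 2)
  == UnknownPartitioning(0))
```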

## How was this patch tested?
Updated test cases to reflect the changes.

Author: Yadong Qi <qiyadong2...@gmail.com>

Closes #13321 from watermen/SPARK-15549.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/b4c32c49
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/b4c32c49
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/b4c32c49

Branch: refs/heads/master
Commit: b4c32c4952f7af2733258aa4e27f21e8832c8a3a
Parents: f1b220e
Author: Yadong Qi <qiyadong2...@gmail.com>
Authored: Sat May 28 10:19:29 2016 -0700
Committer: Wenchen Fan <wenc...@databricks.com>
Committed: Sat May 28 10:19:29 2016 -0700

----------------------------------------------------------------------
 .../org/apache/spark/sql/execution/ExistingRDD.scala   | 13 ++++++-------
 .../apache/spark/sql/sources/BucketedReadSuite.scala   | 11 +++++++++++
 2 files changed, 17 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/b4c32c49/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala
index 412f5fa..fef3255 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala
@@ -347,15 +347,14 @@ private[sql] object DataSourceScanExec {
         case _ => None
       }
 
-      def toAttribute(colName: String): Attribute = output.find(_.name == colName).getOrElse {
-        throw new AnalysisException(s"bucket column $colName not found in existing columns " +
-          s"(${output.map(_.name).mkString(", ")})")
-      }
-
       bucketSpec.map { spec =>
         val numBuckets = spec.numBuckets
-        val bucketColumns = spec.bucketColumnNames.map(toAttribute)
-        HashPartitioning(bucketColumns, numBuckets)
+        val bucketColumns = spec.bucketColumnNames.flatMap { n => output.find(_.name == n) }
+        if (bucketColumns.size == spec.bucketColumnNames.size) {
+          HashPartitioning(bucketColumns, numBuckets)
+        } else {
+          UnknownPartitioning(0)
+        }
       }.getOrElse {
         UnknownPartitioning(0)
       }

http://git-wip-us.apache.org/repos/asf/spark/blob/b4c32c49/sql/hive/src/test/scala/org/apache/spark/sql/sources/BucketedReadSuite.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/sources/BucketedReadSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/sources/BucketedReadSuite.scala
index f9891ac..bab0092 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/sources/BucketedReadSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/sources/BucketedReadSuite.scala
@@ -362,4 +362,15 @@ class BucketedReadSuite extends QueryTest with SQLTestUtils with TestHiveSinglet
       assert(error.toString contains "Invalid bucket file")
     }
   }
+
+  test("disable bucketing when the output doesn't contain all bucketing 
columns") {
+    withTable("bucketed_table") {
+      df1.write.format("parquet").bucketBy(8, "i").saveAsTable("bucketed_table")
+
+      checkAnswer(hiveContext.table("bucketed_table").select("j"), df1.select("j"))
+
+      checkAnswer(hiveContext.table("bucketed_table").groupBy("j").agg(max("k")),
+        df1.groupBy("j").agg(max("k")))
+    }
+  }
 }

