Repository: spark Updated Branches: refs/heads/branch-2.0 36045106d -> 2c1b6b58d
[SPARK-15549][SQL] Disable bucketing when the output doesn't contain all bucketing columns ## What changes were proposed in this pull request? I created a bucketed table `bucketed_table` with bucket column `i`: ```scala case class Data(i: Int, j: Int, k: Int) sc.makeRDD(Array((1, 2, 3))).map(x => Data(x._1, x._2, x._3)).toDF.write.bucketBy(2, "i").saveAsTable("bucketed_table") ``` and then ran the following SQL queries: ```sql SELECT j FROM bucketed_table; Error in query: bucket column i not found in existing columns (j); SELECT j, MAX(k) FROM bucketed_table GROUP BY j; Error in query: bucket column i not found in existing columns (j, k); ``` I think we should add a check so that bucketing is enabled only when all of the following conditions hold: 1. the conf is enabled 2. the relation is bucketed 3. the output contains all bucketing columns ## How was this patch tested? Updated test cases to reflect the changes. Author: Yadong Qi <qiyadong2...@gmail.com> Closes #13321 from watermen/SPARK-15549. (cherry picked from commit b4c32c4952f7af2733258aa4e27f21e8832c8a3a) Signed-off-by: Wenchen Fan <wenc...@databricks.com> Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/2c1b6b58 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/2c1b6b58 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/2c1b6b58 Branch: refs/heads/branch-2.0 Commit: 2c1b6b58d161dadfab1208e05d4ef549cc2e735c Parents: 3604510 Author: Yadong Qi <qiyadong2...@gmail.com> Authored: Sat May 28 10:19:29 2016 -0700 Committer: Wenchen Fan <wenc...@databricks.com> Committed: Sat May 28 10:19:41 2016 -0700 ---------------------------------------------------------------------- .../org/apache/spark/sql/execution/ExistingRDD.scala | 13 ++++++------- .../apache/spark/sql/sources/BucketedReadSuite.scala | 11 +++++++++++ 2 files changed, 17 insertions(+), 7 deletions(-) ---------------------------------------------------------------------- 
http://git-wip-us.apache.org/repos/asf/spark/blob/2c1b6b58/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala index 412f5fa..fef3255 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala @@ -347,15 +347,14 @@ private[sql] object DataSourceScanExec { case _ => None } - def toAttribute(colName: String): Attribute = output.find(_.name == colName).getOrElse { - throw new AnalysisException(s"bucket column $colName not found in existing columns " + - s"(${output.map(_.name).mkString(", ")})") - } - bucketSpec.map { spec => val numBuckets = spec.numBuckets - val bucketColumns = spec.bucketColumnNames.map(toAttribute) - HashPartitioning(bucketColumns, numBuckets) + val bucketColumns = spec.bucketColumnNames.flatMap { n => output.find(_.name == n) } + if (bucketColumns.size == spec.bucketColumnNames.size) { + HashPartitioning(bucketColumns, numBuckets) + } else { + UnknownPartitioning(0) + } }.getOrElse { UnknownPartitioning(0) } http://git-wip-us.apache.org/repos/asf/spark/blob/2c1b6b58/sql/hive/src/test/scala/org/apache/spark/sql/sources/BucketedReadSuite.scala ---------------------------------------------------------------------- diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/sources/BucketedReadSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/sources/BucketedReadSuite.scala index f9891ac..bab0092 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/sources/BucketedReadSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/sources/BucketedReadSuite.scala @@ -362,4 +362,15 @@ class BucketedReadSuite extends QueryTest with SQLTestUtils with TestHiveSinglet assert(error.toString 
contains "Invalid bucket file") } } + + test("disable bucketing when the output doesn't contain all bucketing columns") { + withTable("bucketed_table") { + df1.write.format("parquet").bucketBy(8, "i").saveAsTable("bucketed_table") + + checkAnswer(hiveContext.table("bucketed_table").select("j"), df1.select("j")) + + checkAnswer(hiveContext.table("bucketed_table").groupBy("j").agg(max("k")), + df1.groupBy("j").agg(max("k"))) + } + } } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org