Repository: spark
Updated Branches:
  refs/heads/master d27d362eb -> 64529b186


[SPARK-16691][SQL] move BucketSpec to catalyst module and use it in CatalogTable

## What changes were proposed in this pull request?

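It's weird that we have `BucketSpec` to abstract bucket info, but don't use it 
in `CatalogTable`. This PR moves `BucketSpec` into the catalyst module and uses it in `CatalogTable`, replacing the separate `sortColumnNames`, `bucketColumnNames` and `numBuckets` fields.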

## How was this patch tested?

existing tests.

Author: Wenchen Fan <wenc...@databricks.com>

Closes #14331 from cloud-fan/check.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/64529b18
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/64529b18
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/64529b18

Branch: refs/heads/master
Commit: 64529b186a1c33740067cc7639d630bc5b9ae6e8
Parents: d27d362
Author: Wenchen Fan <wenc...@databricks.com>
Authored: Mon Jul 25 22:05:48 2016 +0800
Committer: Cheng Lian <l...@databricks.com>
Committed: Mon Jul 25 22:05:48 2016 +0800

----------------------------------------------------------------------
 .../spark/sql/catalyst/catalog/interface.scala  | 49 ++++++++++++----
 .../catalyst/catalog/ExternalCatalogSuite.scala |  2 +-
 .../org/apache/spark/sql/DataFrameWriter.scala  |  5 +-
 .../spark/sql/execution/command/ddl.scala       |  3 +-
 .../spark/sql/execution/command/tables.scala    | 30 +++++-----
 .../execution/datasources/BucketingUtils.scala  | 39 +++++++++++++
 .../sql/execution/datasources/DataSource.scala  |  1 +
 .../datasources/FileSourceStrategy.scala        |  1 +
 .../InsertIntoHadoopFsRelationCommand.scala     |  2 +-
 .../execution/datasources/WriterContainer.scala |  1 +
 .../sql/execution/datasources/bucket.scala      | 59 --------------------
 .../spark/sql/execution/datasources/ddl.scala   |  1 +
 .../datasources/fileSourceInterfaces.scala      |  2 +-
 .../apache/spark/sql/internal/CatalogImpl.scala |  2 +-
 .../sql/execution/command/DDLCommandSuite.scala |  6 +-
 .../spark/sql/execution/command/DDLSuite.scala  |  3 +-
 .../datasources/FileSourceStrategySuite.scala   |  1 +
 .../spark/sql/internal/CatalogSuite.scala       |  5 +-
 .../sql/sources/CreateTableAsSelectSuite.scala  |  2 +-
 .../spark/sql/hive/client/HiveClientImpl.scala  |  9 +--
 .../spark/sql/hive/HiveDDLCommandSuite.scala    |  8 +--
 .../spark/sql/sources/BucketedReadSuite.scala   |  3 +-
 22 files changed, 117 insertions(+), 117 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/64529b18/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala
----------------------------------------------------------------------
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala
index 2a20651..710bce5 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala
@@ -25,6 +25,7 @@ import org.apache.spark.sql.catalyst.{FunctionIdentifier, 
TableIdentifier}
 import org.apache.spark.sql.catalyst.expressions.{Attribute, 
AttributeReference}
 import org.apache.spark.sql.catalyst.parser.CatalystSqlParser
 import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, LogicalPlan}
+import org.apache.spark.sql.catalyst.util.quoteIdentifier
 
 
 /**
@@ -110,6 +111,24 @@ case class CatalogTablePartition(
 
 
 /**
+ * A container for bucketing information.
+ * Bucketing is a technology for decomposing data sets into more manageable 
parts, and the number
+ * of buckets is fixed so it does not fluctuate with data.
+ *
+ * @param numBuckets number of buckets.
+ * @param bucketColumnNames the names of the columns that used to generate the 
bucket id.
+ * @param sortColumnNames the names of the columns that used to sort data in 
each bucket.
+ */
+case class BucketSpec(
+    numBuckets: Int,
+    bucketColumnNames: Seq[String],
+    sortColumnNames: Seq[String]) {
+  if (numBuckets <= 0) {
+    throw new AnalysisException(s"Expected positive number of buckets, but got 
`$numBuckets`.")
+  }
+}
+
+/**
  * A table defined in the catalog.
  *
  * Note that Hive's metastore also tracks skewed columns. We should consider 
adding that in the
@@ -124,9 +143,7 @@ case class CatalogTable(
     storage: CatalogStorageFormat,
     schema: Seq[CatalogColumn],
     partitionColumnNames: Seq[String] = Seq.empty,
-    sortColumnNames: Seq[String] = Seq.empty,
-    bucketColumnNames: Seq[String] = Seq.empty,
-    numBuckets: Int = -1,
+    bucketSpec: Option[BucketSpec] = None,
     owner: String = "",
     createTime: Long = System.currentTimeMillis,
     lastAccessTime: Long = -1,
@@ -143,8 +160,8 @@ case class CatalogTable(
       s"must be a subset of schema (${colNames.mkString(", ")}) in table 
'$identifier'")
   }
   requireSubsetOfSchema(partitionColumnNames, "partition")
-  requireSubsetOfSchema(sortColumnNames, "sort")
-  requireSubsetOfSchema(bucketColumnNames, "bucket")
+  requireSubsetOfSchema(bucketSpec.map(_.sortColumnNames).getOrElse(Nil), 
"sort")
+  requireSubsetOfSchema(bucketSpec.map(_.bucketColumnNames).getOrElse(Nil), 
"bucket")
 
   /** Columns this table is partitioned by. */
   def partitionColumns: Seq[CatalogColumn] =
@@ -172,9 +189,19 @@ case class CatalogTable(
 
   override def toString: String = {
     val tableProperties = properties.map(p => p._1 + "=" + p._2).mkString("[", 
", ", "]")
-    val partitionColumns = partitionColumnNames.map("`" + _ + 
"`").mkString("[", ", ", "]")
-    val sortColumns = sortColumnNames.map("`" + _ + "`").mkString("[", ", ", 
"]")
-    val bucketColumns = bucketColumnNames.map("`" + _ + "`").mkString("[", ", 
", "]")
+    val partitionColumns = 
partitionColumnNames.map(quoteIdentifier).mkString("[", ", ", "]")
+    val bucketStrings = bucketSpec match {
+      case Some(BucketSpec(numBuckets, bucketColumnNames, sortColumnNames)) =>
+        val bucketColumnsString = 
bucketColumnNames.map(quoteIdentifier).mkString("[", ", ", "]")
+        val sortColumnsString = 
sortColumnNames.map(quoteIdentifier).mkString("[", ", ", "]")
+        Seq(
+          s"Num Buckets: $numBuckets",
+          if (bucketColumnNames.nonEmpty) s"Bucket Columns: 
$bucketColumnsString" else "",
+          if (sortColumnNames.nonEmpty) s"Sort Columns: $sortColumnsString" 
else ""
+        )
+
+      case _ => Nil
+    }
 
     val output =
       Seq(s"Table: ${identifier.quotedString}",
@@ -183,10 +210,8 @@ case class CatalogTable(
         s"Last Access: ${new Date(lastAccessTime).toString}",
         s"Type: ${tableType.name}",
         if (schema.nonEmpty) s"Schema: ${schema.mkString("[", ", ", "]")}" 
else "",
-        if (partitionColumnNames.nonEmpty) s"Partition Columns: 
$partitionColumns" else "",
-        if (numBuckets != -1) s"Num Buckets: $numBuckets" else "",
-        if (bucketColumnNames.nonEmpty) s"Bucket Columns: $bucketColumns" else 
"",
-        if (sortColumnNames.nonEmpty) s"Sort Columns: $sortColumns" else "",
+        if (partitionColumnNames.nonEmpty) s"Partition Columns: 
$partitionColumns" else ""
+      ) ++ bucketStrings ++ Seq(
         viewOriginalText.map("Original View: " + _).getOrElse(""),
         viewText.map("View: " + _).getOrElse(""),
         comment.map("Comment: " + _).getOrElse(""),

http://git-wip-us.apache.org/repos/asf/spark/blob/64529b18/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogSuite.scala
----------------------------------------------------------------------
diff --git 
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogSuite.scala
 
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogSuite.scala
index 5bb50cb..3a0dcea 100644
--- 
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogSuite.scala
+++ 
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogSuite.scala
@@ -692,7 +692,7 @@ abstract class CatalogTestUtils {
         CatalogColumn("a", "int"),
         CatalogColumn("b", "string")),
       partitionColumnNames = Seq("a", "b"),
-      bucketColumnNames = Seq("col1"))
+      bucketSpec = Some(BucketSpec(4, Seq("col1"), Nil)))
   }
 
   def newFunc(name: String, database: Option[String] = None): CatalogFunction 
= {

http://git-wip-us.apache.org/repos/asf/spark/blob/64529b18/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
index 753b64b..4418988 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
@@ -23,8 +23,9 @@ import scala.collection.JavaConverters._
 
 import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation
-import org.apache.spark.sql.catalyst.plans.logical.{InsertIntoTable, Project}
-import org.apache.spark.sql.execution.datasources.{BucketSpec, 
CreateTableUsingAsSelect, DataSource, HadoopFsRelation}
+import org.apache.spark.sql.catalyst.catalog.BucketSpec
+import org.apache.spark.sql.catalyst.plans.logical.InsertIntoTable
+import org.apache.spark.sql.execution.datasources.{CreateTableUsingAsSelect, 
DataSource, HadoopFsRelation}
 import org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils
 
 /**

http://git-wip-us.apache.org/repos/asf/spark/blob/64529b18/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
index 2a62b86..03f81c4 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
@@ -21,12 +21,11 @@ import scala.util.control.NonFatal
 
 import org.apache.spark.sql.{AnalysisException, Row, SparkSession}
 import org.apache.spark.sql.catalyst.TableIdentifier
-import org.apache.spark.sql.catalyst.catalog.{CatalogDatabase, CatalogTable}
+import org.apache.spark.sql.catalyst.catalog.{BucketSpec, CatalogDatabase, 
CatalogTable}
 import org.apache.spark.sql.catalyst.catalog.{CatalogTablePartition, 
CatalogTableType, SessionCatalog}
 import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
 import org.apache.spark.sql.catalyst.expressions.{Attribute, 
AttributeReference}
 import org.apache.spark.sql.execution.command.CreateDataSourceTableUtils._
-import org.apache.spark.sql.execution.datasources.BucketSpec
 import org.apache.spark.sql.types._
 
 

http://git-wip-us.apache.org/repos/asf/spark/blob/64529b18/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala
index a62853b..8f3adad 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala
@@ -29,7 +29,7 @@ import org.apache.hadoop.fs.Path
 
 import org.apache.spark.sql.{AnalysisException, Row, SparkSession}
 import org.apache.spark.sql.catalyst.TableIdentifier
-import org.apache.spark.sql.catalyst.catalog.{CatalogColumn, CatalogTable, 
CatalogTableType}
+import org.apache.spark.sql.catalyst.catalog.{BucketSpec, CatalogColumn, 
CatalogTable, CatalogTableType}
 import org.apache.spark.sql.catalyst.catalog.CatalogTableType._
 import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
 import org.apache.spark.sql.catalyst.expressions.{Attribute, 
AttributeReference}
@@ -498,23 +498,19 @@ case class DescribeTableCommand(table: TableIdentifier, 
isExtended: Boolean, isF
   }
 
   private def describeBucketingInfo(metadata: CatalogTable, buffer: 
ArrayBuffer[Row]): Unit = {
-    def appendBucketInfo(numBuckets: Int, bucketColumns: Seq[String], 
sortColumns: Seq[String]) = {
-      append(buffer, "Num Buckets:", numBuckets.toString, "")
-      append(buffer, "Bucket Columns:", bucketColumns.mkString("[", ", ", 
"]"), "")
-      append(buffer, "Sort Columns:", sortColumns.mkString("[", ", ", "]"), "")
+    def appendBucketInfo(bucketSpec: Option[BucketSpec]) = bucketSpec match {
+      case Some(BucketSpec(numBuckets, bucketColumnNames, sortColumnNames)) =>
+        append(buffer, "Num Buckets:", numBuckets.toString, "")
+        append(buffer, "Bucket Columns:", bucketColumnNames.mkString("[", ", 
", "]"), "")
+        append(buffer, "Sort Columns:", sortColumnNames.mkString("[", ", ", 
"]"), "")
+
+      case _ =>
     }
 
-    DDLUtils.getBucketSpecFromTableProperties(metadata) match {
-      case Some(bucketSpec) =>
-        appendBucketInfo(
-          bucketSpec.numBuckets,
-          bucketSpec.bucketColumnNames,
-          bucketSpec.sortColumnNames)
-      case None =>
-        appendBucketInfo(
-          metadata.numBuckets,
-          metadata.bucketColumnNames,
-          metadata.sortColumnNames)
+    if (DDLUtils.isDatasourceTable(metadata)) {
+      appendBucketInfo(DDLUtils.getBucketSpecFromTableProperties(metadata))
+    } else {
+      appendBucketInfo(metadata.bucketSpec)
     }
   }
 
@@ -808,7 +804,7 @@ case class ShowCreateTableCommand(table: TableIdentifier) 
extends RunnableComman
       builder ++= partCols.mkString("PARTITIONED BY (", ", ", ")\n")
     }
 
-    if (metadata.bucketColumnNames.nonEmpty) {
+    if (metadata.bucketSpec.isDefined) {
       throw new UnsupportedOperationException(
         "Creating Hive table with bucket spec is not supported yet.")
     }

http://git-wip-us.apache.org/repos/asf/spark/blob/64529b18/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/BucketingUtils.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/BucketingUtils.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/BucketingUtils.scala
new file mode 100644
index 0000000..377b818
--- /dev/null
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/BucketingUtils.scala
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources
+
+private[sql] object BucketingUtils {
+  // The file name of bucketed data should have 3 parts:
+  //   1. some other information in the head of file name
+  //   2. bucket id part, some numbers, starts with "_"
+  //      * The other-information part may use `-` as separator and may have 
numbers at the end,
+  //        e.g. a normal parquet file without bucketing may have name:
+  //        part-r-00000-2dd664f9-d2c4-4ffe-878f-431234567891.gz.parquet, and 
we will mistakenly
+  //        treat `431234567891` as bucket id. So here we pick `_` as 
separator.
+  //   3. optional file extension part, in the tail of file name, starts with 
`.`
+  // An example of bucketed parquet file name with bucket id 3:
+  //   part-r-00000-2dd664f9-d2c4-4ffe-878f-c6c70c1fb0cb_00003.gz.parquet
+  private val bucketedFileName = """.*_(\d+)(?:\..*)?$""".r
+
+  def getBucketId(fileName: String): Option[Int] = fileName match {
+    case bucketedFileName(bucketId) => Some(bucketId.toInt)
+    case other => None
+  }
+
+  def bucketIdToString(id: Int): String = f"_$id%05d"
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/64529b18/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
index f572b93..79024fd 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
@@ -30,6 +30,7 @@ import org.apache.spark.deploy.SparkHadoopUtil
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql._
 import org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute
+import org.apache.spark.sql.catalyst.catalog.BucketSpec
 import org.apache.spark.sql.execution.datasources.csv.CSVFileFormat
 import org.apache.spark.sql.execution.datasources.jdbc.JdbcRelationProvider
 import org.apache.spark.sql.execution.datasources.json.JsonFileFormat

http://git-wip-us.apache.org/repos/asf/spark/blob/64529b18/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala
index 04f166f..32aa471 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala
@@ -25,6 +25,7 @@ import org.apache.spark.internal.Logging
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql._
 import org.apache.spark.sql.catalyst.{expressions, InternalRow}
+import org.apache.spark.sql.catalyst.catalog.BucketSpec
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.planning.PhysicalOperation
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan

http://git-wip-us.apache.org/repos/asf/spark/blob/64529b18/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelationCommand.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelationCommand.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelationCommand.scala
index 1426dcf..b49525c 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelationCommand.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelationCommand.scala
@@ -19,13 +19,13 @@ package org.apache.spark.sql.execution.datasources
 
 import java.io.IOException
 
-import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.Path
 import org.apache.hadoop.mapreduce._
 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat
 
 import org.apache.spark._
 import org.apache.spark.sql._
+import org.apache.spark.sql.catalyst.catalog.BucketSpec
 import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeSet}
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 import org.apache.spark.sql.catalyst.InternalRow

http://git-wip-us.apache.org/repos/asf/spark/blob/64529b18/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/WriterContainer.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/WriterContainer.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/WriterContainer.scala
index 9a0b46c..c801436 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/WriterContainer.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/WriterContainer.scala
@@ -28,6 +28,7 @@ import org.apache.spark._
 import org.apache.spark.internal.Logging
 import org.apache.spark.mapred.SparkHadoopMapRedUtil
 import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.catalyst.catalog.BucketSpec
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.plans.physical.HashPartitioning
 import org.apache.spark.sql.catalyst.InternalRow

http://git-wip-us.apache.org/repos/asf/spark/blob/64529b18/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/bucket.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/bucket.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/bucket.scala
deleted file mode 100644
index 961d035..0000000
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/bucket.scala
+++ /dev/null
@@ -1,59 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.execution.datasources
-
-import org.apache.spark.sql.AnalysisException
-
-/**
- * A container for bucketing information.
- * Bucketing is a technology for decomposing data sets into more manageable 
parts, and the number
- * of buckets is fixed so it does not fluctuate with data.
- *
- * @param numBuckets number of buckets.
- * @param bucketColumnNames the names of the columns that used to generate the 
bucket id.
- * @param sortColumnNames the names of the columns that used to sort data in 
each bucket.
- */
-private[sql] case class BucketSpec(
-    numBuckets: Int,
-    bucketColumnNames: Seq[String],
-    sortColumnNames: Seq[String]) {
-  if (numBuckets <= 0) {
-    throw new AnalysisException(s"Expected positive number of buckets, but got 
`$numBuckets`.")
-  }
-}
-
-private[sql] object BucketingUtils {
-  // The file name of bucketed data should have 3 parts:
-  //   1. some other information in the head of file name
-  //   2. bucket id part, some numbers, starts with "_"
-  //      * The other-information part may use `-` as separator and may have 
numbers at the end,
-  //        e.g. a normal parquet file without bucketing may have name:
-  //        part-r-00000-2dd664f9-d2c4-4ffe-878f-431234567891.gz.parquet, and 
we will mistakenly
-  //        treat `431234567891` as bucket id. So here we pick `_` as 
separator.
-  //   3. optional file extension part, in the tail of file name, starts with 
`.`
-  // An example of bucketed parquet file name with bucket id 3:
-  //   part-r-00000-2dd664f9-d2c4-4ffe-878f-c6c70c1fb0cb_00003.gz.parquet
-  private val bucketedFileName = """.*_(\d+)(?:\..*)?$""".r
-
-  def getBucketId(fileName: String): Option[Int] = fileName match {
-    case bucketedFileName(bucketId) => Some(bucketId.toInt)
-    case other => None
-  }
-
-  def bucketIdToString(id: Int): String = f"_$id%05d"
-}

http://git-wip-us.apache.org/repos/asf/spark/blob/64529b18/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ddl.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ddl.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ddl.scala
index 31a2075..18369b5 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ddl.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ddl.scala
@@ -19,6 +19,7 @@ package org.apache.spark.sql.execution.datasources
 
 import org.apache.spark.sql._
 import org.apache.spark.sql.catalyst.TableIdentifier
+import org.apache.spark.sql.catalyst.catalog.BucketSpec
 import org.apache.spark.sql.catalyst.expressions.Attribute
 import org.apache.spark.sql.catalyst.plans.logical
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan

http://git-wip-us.apache.org/repos/asf/spark/blob/64529b18/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/fileSourceInterfaces.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/fileSourceInterfaces.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/fileSourceInterfaces.scala
index d238da2..5ce8350 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/fileSourceInterfaces.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/fileSourceInterfaces.scala
@@ -18,7 +18,6 @@
 package org.apache.spark.sql.execution.datasources
 
 import scala.collection.mutable
-import scala.util.Try
 
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs._
@@ -30,6 +29,7 @@ import org.apache.spark.annotation.Experimental
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql._
 import org.apache.spark.sql.catalyst.{CatalystTypeConverters, InternalRow}
+import org.apache.spark.sql.catalyst.catalog.BucketSpec
 import org.apache.spark.sql.catalyst.expressions._
 import 
org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeProjection
 import org.apache.spark.sql.execution.FileRelation

http://git-wip-us.apache.org/repos/asf/spark/blob/64529b18/sql/core/src/main/scala/org/apache/spark/sql/internal/CatalogImpl.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/internal/CatalogImpl.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/internal/CatalogImpl.scala
index 1ae9b55..05dfb8c 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/internal/CatalogImpl.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/CatalogImpl.scala
@@ -153,7 +153,7 @@ class CatalogImpl(sparkSession: SparkSession) extends 
Catalog {
   private def listColumns(tableIdentifier: TableIdentifier): Dataset[Column] = 
{
     val tableMetadata = sessionCatalog.getTableMetadata(tableIdentifier)
     val partitionColumnNames = tableMetadata.partitionColumnNames.toSet
-    val bucketColumnNames = tableMetadata.bucketColumnNames.toSet
+    val bucketColumnNames = 
tableMetadata.bucketSpec.map(_.bucketColumnNames).getOrElse(Nil).toSet
     val columns = tableMetadata.schema.map { c =>
       new Column(
         name = c.name,

http://git-wip-us.apache.org/repos/asf/spark/blob/64529b18/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLCommandSuite.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLCommandSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLCommandSuite.scala
index b170a3a..999afc9 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLCommandSuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLCommandSuite.scala
@@ -20,15 +20,15 @@ package org.apache.spark.sql.execution.command
 import scala.reflect.{classTag, ClassTag}
 
 import org.apache.spark.sql.catalyst.TableIdentifier
-import org.apache.spark.sql.catalyst.catalog.{CatalogTableType, 
FunctionResource}
+import org.apache.spark.sql.catalyst.catalog.{BucketSpec, CatalogTableType, 
FunctionResource}
 import org.apache.spark.sql.catalyst.catalog.FunctionResourceType
 import org.apache.spark.sql.catalyst.parser.ParseException
 import org.apache.spark.sql.catalyst.plans.PlanTest
 import org.apache.spark.sql.catalyst.plans.logical.Project
 import org.apache.spark.sql.execution.SparkSqlParser
-import org.apache.spark.sql.execution.datasources.{BucketSpec, 
CreateTableUsing}
+import org.apache.spark.sql.execution.datasources.CreateTableUsing
 import org.apache.spark.sql.internal.{HiveSerDe, SQLConf}
-import org.apache.spark.sql.types.{IntegerType, MetadataBuilder, StringType, 
StructType}
+import org.apache.spark.sql.types.{IntegerType, StringType, StructType}
 
 
 // TODO: merge this with DDLSuite (SPARK-14441)

http://git-wip-us.apache.org/repos/asf/spark/blob/64529b18/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
index 34c980e..a354594 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
@@ -26,13 +26,12 @@ import org.apache.spark.internal.config._
 import org.apache.spark.sql.{AnalysisException, QueryTest, Row}
 import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.analysis.{DatabaseAlreadyExistsException, 
FunctionRegistry, NoSuchPartitionException, NoSuchTableException, 
TempTableAlreadyExistsException}
-import org.apache.spark.sql.catalyst.catalog.{CatalogDatabase, 
CatalogStorageFormat}
+import org.apache.spark.sql.catalyst.catalog.{BucketSpec, CatalogDatabase, 
CatalogStorageFormat}
 import org.apache.spark.sql.catalyst.catalog.{CatalogColumn, CatalogTable, 
CatalogTableType}
 import org.apache.spark.sql.catalyst.catalog.{CatalogTablePartition, 
SessionCatalog}
 import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
 import org.apache.spark.sql.catalyst.parser.ParseException
 import org.apache.spark.sql.execution.command.CreateDataSourceTableUtils._
-import org.apache.spark.sql.execution.datasources.BucketSpec
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.test.SharedSQLContext
 import org.apache.spark.sql.types.{IntegerType, StructType}

http://git-wip-us.apache.org/repos/asf/spark/blob/64529b18/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategySuite.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategySuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategySuite.scala
index 8d8a18f..ddcc24a 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategySuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategySuite.scala
@@ -27,6 +27,7 @@ import org.apache.hadoop.mapreduce.Job
 import org.apache.spark.SparkConf
 import org.apache.spark.sql._
 import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.catalog.BucketSpec
 import org.apache.spark.sql.catalyst.expressions.{Expression, ExpressionSet, PredicateHelper}
 import org.apache.spark.sql.catalyst.util
 import org.apache.spark.sql.execution.DataSourceScanExec

http://git-wip-us.apache.org/repos/asf/spark/blob/64529b18/sql/core/src/test/scala/org/apache/spark/sql/internal/CatalogSuite.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/internal/CatalogSuite.scala 
b/sql/core/src/test/scala/org/apache/spark/sql/internal/CatalogSuite.scala
index 07aeaeb..8aa8185 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/internal/CatalogSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/internal/CatalogSuite.scala
@@ -90,11 +90,12 @@ class CatalogSuite
       .getOrElse { spark.catalog.listColumns(tableName) }
     assume(tableMetadata.schema.nonEmpty, "bad test")
     assume(tableMetadata.partitionColumnNames.nonEmpty, "bad test")
-    assume(tableMetadata.bucketColumnNames.nonEmpty, "bad test")
+    assume(tableMetadata.bucketSpec.isDefined, "bad test")
     assert(columns.collect().map(_.name).toSet == tableMetadata.schema.map(_.name).toSet)
+    val bucketColumnNames = tableMetadata.bucketSpec.map(_.bucketColumnNames).getOrElse(Nil).toSet
     columns.collect().foreach { col =>
       assert(col.isPartition == tableMetadata.partitionColumnNames.contains(col.name))
-      assert(col.isBucket == tableMetadata.bucketColumnNames.contains(col.name))
+      assert(col.isBucket == bucketColumnNames.contains(col.name))
     }
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/64529b18/sql/core/src/test/scala/org/apache/spark/sql/sources/CreateTableAsSelectSuite.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/sources/CreateTableAsSelectSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/sources/CreateTableAsSelectSuite.scala
index 5ab585f..49153f7 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/sources/CreateTableAsSelectSuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/sources/CreateTableAsSelectSuite.scala
@@ -24,9 +24,9 @@ import org.scalatest.BeforeAndAfterEach
 import org.apache.spark.SparkException
 import org.apache.spark.sql.AnalysisException
 import org.apache.spark.sql.catalyst.TableIdentifier
+import org.apache.spark.sql.catalyst.catalog.BucketSpec
 import org.apache.spark.sql.catalyst.parser.ParseException
 import org.apache.spark.sql.execution.command.DDLUtils
-import org.apache.spark.sql.execution.datasources.BucketSpec
 import org.apache.spark.sql.test.SharedSQLContext
 import org.apache.spark.util.Utils
 

http://git-wip-us.apache.org/repos/asf/spark/blob/64529b18/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
----------------------------------------------------------------------
diff --git 
a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala 
b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
index 9f5782f..2392cc0 100644
--- 
a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
+++ 
b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
@@ -365,9 +365,9 @@ private[hive] class HiveClientImpl(
         },
         schema = schema,
         partitionColumnNames = partCols.map(_.name),
-        sortColumnNames = Seq(), // TODO: populate this
-        bucketColumnNames = h.getBucketCols.asScala,
-        numBuckets = h.getNumBuckets,
+        // We can not populate bucketing information for Hive tables as Spark SQL has a different
+        // implementation of hash function from Hive.
+        bucketSpec = None,
         owner = h.getOwner,
         createTime = h.getTTable.getCreateTime.toLong * 1000,
         lastAccessTime = h.getLastAccessTime.toLong * 1000,
@@ -764,10 +764,7 @@ private[hive] class HiveClientImpl(
       hiveTable.setFields(schema.asJava)
     }
     hiveTable.setPartCols(partCols.asJava)
-    // TODO: set sort columns here too
-    hiveTable.setBucketCols(table.bucketColumnNames.asJava)
     hiveTable.setOwner(conf.getUser)
-    hiveTable.setNumBuckets(table.numBuckets)
     hiveTable.setCreateTime((table.createTime / 1000).toInt)
     hiveTable.setLastAccessTime((table.lastAccessTime / 1000).toInt)
     table.storage.locationUri.foreach { loc => shim.setDataLocation(hiveTable, loc) }

http://git-wip-us.apache.org/repos/asf/spark/blob/64529b18/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveDDLCommandSuite.scala
----------------------------------------------------------------------
diff --git 
a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveDDLCommandSuite.scala 
b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveDDLCommandSuite.scala
index a708434..5450fba 100644
--- 
a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveDDLCommandSuite.scala
+++ 
b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveDDLCommandSuite.scala
@@ -293,9 +293,7 @@ class HiveDDLCommandSuite extends PlanTest {
     assert(desc.tableType == CatalogTableType.MANAGED)
     assert(desc.schema == Seq(CatalogColumn("id", "int"), CatalogColumn("name", "string")))
     assert(desc.partitionColumnNames.isEmpty)
-    assert(desc.sortColumnNames.isEmpty)
-    assert(desc.bucketColumnNames.isEmpty)
-    assert(desc.numBuckets == -1)
+    assert(desc.bucketSpec.isEmpty)
     assert(desc.viewText.isEmpty)
     assert(desc.viewOriginalText.isEmpty)
     assert(desc.storage.locationUri.isEmpty)
@@ -453,9 +451,7 @@ class HiveDDLCommandSuite extends PlanTest {
       CatalogColumn("name", "string"),
       CatalogColumn("month", "int")))
     assert(desc.partitionColumnNames == Seq("month"))
-    assert(desc.sortColumnNames.isEmpty)
-    assert(desc.bucketColumnNames.isEmpty)
-    assert(desc.numBuckets == -1)
+    assert(desc.bucketSpec.isEmpty)
     assert(desc.viewText.isEmpty)
     assert(desc.viewOriginalText.isEmpty)
     assert(desc.storage.locationUri == Some("/path/to/mercury"))

http://git-wip-us.apache.org/repos/asf/spark/blob/64529b18/sql/hive/src/test/scala/org/apache/spark/sql/sources/BucketedReadSuite.scala
----------------------------------------------------------------------
diff --git 
a/sql/hive/src/test/scala/org/apache/spark/sql/sources/BucketedReadSuite.scala 
b/sql/hive/src/test/scala/org/apache/spark/sql/sources/BucketedReadSuite.scala
index fc01ff3..e461490 100644
--- 
a/sql/hive/src/test/scala/org/apache/spark/sql/sources/BucketedReadSuite.scala
+++ 
b/sql/hive/src/test/scala/org/apache/spark/sql/sources/BucketedReadSuite.scala
@@ -20,10 +20,11 @@ package org.apache.spark.sql.sources
 import java.io.File
 
 import org.apache.spark.sql._
+import org.apache.spark.sql.catalyst.catalog.BucketSpec
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.plans.physical.HashPartitioning
 import org.apache.spark.sql.execution.DataSourceScanExec
-import org.apache.spark.sql.execution.datasources.{BucketSpec, DataSourceStrategy}
+import org.apache.spark.sql.execution.datasources.DataSourceStrategy
 import org.apache.spark.sql.execution.exchange.ShuffleExchange
 import org.apache.spark.sql.execution.joins.SortMergeJoinExec
 import org.apache.spark.sql.functions._


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org

Reply via email to