Repository: spark
Updated Branches:
  refs/heads/master 6f7ba6409 -> 917d3fc06


[SPARK-12539][SQL] support writing bucketed table

This PR adds bucket write support to Spark SQL. Users can specify bucketing 
columns, the number of buckets, and sorting columns, with or without partition 
columns. For example:
```
df.write.partitionBy("year").bucketBy(8, "country").sortBy("amount").saveAsTable("sales")
```

When bucketing is used, we calculate a bucket id for each record and group the 
records by bucket id. For each group, we create a file with the bucket id in 
its name and write the data into it. If sorting columns are specified, the data 
in each bucket file is sorted before it is written (see the sketch below).
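
Concretely, the bucket id is a positive modulus of a hash over the bucketing 
columns: the patch computes `Pmod(new Murmur3Hash(bucketColumns), Literal(numBuckets))`. 
A minimal plain-Scala sketch of that logic, with `hashOfBucketColumns` standing 
in for the Murmur3 hash of a record's bucketing columns:

```
// Sketch only: mirrors Pmod(Murmur3Hash(bucketColumns), numBuckets) from
// DynamicPartitionWriterContainer. `hashOfBucketColumns` is a placeholder for
// the Murmur3 hash computed over the bucketing columns of a record.
def bucketId(hashOfBucketColumns: Int, numBuckets: Int): Int = {
  // Pmod: a modulus that is never negative, even when the hash is negative.
  val mod = hashOfBucketColumns % numBuckets
  if (mod < 0) mod + numBuckets else mod
}

// e.g. a record whose bucketing columns hash to -7, with 8 buckets, goes to bucket 1.
assert(bucketId(-7, 8) == 1)
```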

Note that there may be multiple files for one bucket, as the data is written by 
distributed tasks (illustrated below).
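
For illustration, two write tasks that both produce data for bucket 2 of the 
`sales` example above would leave two files for that bucket; the directory and 
task/job ids here are hypothetical, but the `-00002` suffix before the extension 
is the bucket id encoded by this patch:

```
sales/year=2015/part-r-00000-ea518ad4-455a-4431-b471-d24e03814677-00002.gz.parquet
sales/year=2015/part-r-00001-ea518ad4-455a-4431-b471-d24e03814677-00002.gz.parquet
```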

Currently we store the bucket metadata in the Hive metastore in a 
non-Hive-compatible way. We use a different bucketing hash function than Hive, 
so we cannot be compatible anyway.

Limitations:

* Can't write bucketed data without a Hive metastore (see the sketch after this list).
* Can't insert bucketed data into existing Hive tables.
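
A hedged sketch of what these limitations mean in practice, assuming a DataFrame 
`df` and an output `path` (both placeholders): the path-based `save()` and 
`insertInto()` entry points reject bucketing via the new `assertNotBucketed()` 
check, while `saveAsTable()` through a HiveContext is the supported route:

```
// Sketch under the assumptions above; `df`, `path` and the table name are placeholders.
val writer = df.write.bucketBy(8, "country").sortBy("amount")

// Rejected: both paths call assertNotBucketed() and throw
// "Currently we don't support writing bucketed data to this data source."
// writer.save(path)            // IllegalArgumentException
// writer.insertInto("sales")   // IllegalArgumentException

// Supported: creating a new table through the Hive metastore.
writer.saveAsTable("sales")
```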

Author: Wenchen Fan <wenc...@databricks.com>

Closes #10498 from cloud-fan/bucket-write.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/917d3fc0
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/917d3fc0
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/917d3fc0

Branch: refs/heads/master
Commit: 917d3fc069fb9ea1c1487119c9c12b373f4f9b77
Parents: 6f7ba64
Author: Wenchen Fan <wenc...@databricks.com>
Authored: Wed Jan 6 16:58:10 2016 -0800
Committer: Reynold Xin <r...@databricks.com>
Committed: Wed Jan 6 16:58:10 2016 -0800

----------------------------------------------------------------------
 .../org/apache/spark/sql/DataFrameWriter.scala  |  89 +++++++-
 .../spark/sql/execution/SparkStrategies.scala   |   7 +-
 .../sql/execution/datasources/DDLParser.scala   |   1 +
 .../InsertIntoHadoopFsRelation.scala            |   2 +-
 .../datasources/ResolvedDataSource.scala        |   2 +
 .../execution/datasources/WriterContainer.scala | 219 +++++++++++++------
 .../sql/execution/datasources/bucket.scala      |  57 +++++
 .../spark/sql/execution/datasources/ddl.scala   |  10 +-
 .../datasources/json/JSONRelation.scala         |  35 ++-
 .../datasources/parquet/ParquetRelation.scala   |  28 ++-
 .../spark/sql/execution/datasources/rules.scala |  24 +-
 .../apache/spark/sql/sources/interfaces.scala   |  34 ++-
 .../spark/sql/hive/HiveMetastoreCatalog.scala   |  23 +-
 .../apache/spark/sql/hive/HiveStrategies.scala  |   7 +-
 .../spark/sql/hive/execution/commands.scala     |  15 +-
 .../apache/spark/sql/hive/orc/OrcRelation.scala |  20 +-
 .../sql/hive/MetastoreDataSourcesSuite.scala    |   1 +
 .../spark/sql/sources/BucketedWriteSuite.scala  | 169 ++++++++++++++
 18 files changed, 626 insertions(+), 117 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/917d3fc0/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
index e2d72a5..00f9817 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
@@ -23,9 +23,9 @@ import scala.collection.JavaConverters._
 
 import org.apache.spark.annotation.Experimental
 import org.apache.spark.sql.catalyst.{SqlParser, TableIdentifier}
-import org.apache.spark.sql.catalyst.analysis.{UnresolvedAttribute, 
UnresolvedRelation}
+import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation
 import org.apache.spark.sql.catalyst.plans.logical.{InsertIntoTable, Project}
-import org.apache.spark.sql.execution.datasources.{CreateTableUsingAsSelect, 
ResolvedDataSource}
+import org.apache.spark.sql.execution.datasources.{BucketSpec, 
CreateTableUsingAsSelect, ResolvedDataSource}
 import org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils
 import org.apache.spark.sql.sources.HadoopFsRelation
 
@@ -129,6 +129,34 @@ final class DataFrameWriter private[sql](df: DataFrame) {
   }
 
   /**
+   * Buckets the output by the given columns. If specified, the output is laid 
out on the file
+   * system similar to Hive's bucketing scheme.
+   *
+   * This is applicable for Parquet, JSON and ORC.
+   *
+   * @since 2.0
+   */
+  @scala.annotation.varargs
+  def bucketBy(numBuckets: Int, colName: String, colNames: String*): 
DataFrameWriter = {
+    this.numBuckets = Option(numBuckets)
+    this.bucketColumnNames = Option(colName +: colNames)
+    this
+  }
+
+  /**
+   * Sorts the output in each bucket by the given columns.
+   *
+   * This is applicable for Parquet, JSON and ORC.
+   *
+   * @since 2.0
+   */
+  @scala.annotation.varargs
+  def sortBy(colName: String, colNames: String*): DataFrameWriter = {
+    this.sortColumnNames = Option(colName +: colNames)
+    this
+  }
+
+  /**
    * Saves the content of the [[DataFrame]] at the specified path.
    *
    * @since 1.4.0
@@ -144,10 +172,12 @@ final class DataFrameWriter private[sql](df: DataFrame) {
    * @since 1.4.0
    */
   def save(): Unit = {
+    assertNotBucketed()
     ResolvedDataSource(
       df.sqlContext,
       source,
       partitioningColumns.map(_.toArray).getOrElse(Array.empty[String]),
+      getBucketSpec,
       mode,
       extraOptions.toMap,
       df)
@@ -166,6 +196,7 @@ final class DataFrameWriter private[sql](df: DataFrame) {
   }
 
   private def insertInto(tableIdent: TableIdentifier): Unit = {
+    assertNotBucketed()
     val partitions = normalizedParCols.map(_.map(col => col -> (None: 
Option[String])).toMap)
     val overwrite = mode == SaveMode.Overwrite
 
@@ -188,13 +219,47 @@ final class DataFrameWriter private[sql](df: DataFrame) {
         ifNotExists = false)).toRdd
   }
 
-  private def normalizedParCols: Option[Seq[String]] = partitioningColumns.map 
{ parCols =>
-    parCols.map { col =>
-      df.logicalPlan.output
-        .map(_.name)
-        .find(df.sqlContext.analyzer.resolver(_, col))
-        .getOrElse(throw new AnalysisException(s"Partition column $col not 
found in existing " +
-          s"columns (${df.logicalPlan.output.map(_.name).mkString(", ")})"))
+  private def normalizedParCols: Option[Seq[String]] = partitioningColumns.map 
{ cols =>
+    cols.map(normalize(_, "Partition"))
+  }
+
+  private def normalizedBucketColNames: Option[Seq[String]] = 
bucketColumnNames.map { cols =>
+    cols.map(normalize(_, "Bucketing"))
+  }
+
+  private def normalizedSortColNames: Option[Seq[String]] = 
sortColumnNames.map { cols =>
+    cols.map(normalize(_, "Sorting"))
+  }
+
+  private def getBucketSpec: Option[BucketSpec] = {
+    if (sortColumnNames.isDefined) {
+      require(numBuckets.isDefined, "sortBy must be used together with 
bucketBy")
+    }
+
+    for {
+      n <- numBuckets
+    } yield {
+      require(n > 0 && n < 100000, "Bucket number must be greater than 0 and 
less than 100000.")
+      BucketSpec(n, normalizedBucketColNames.get, 
normalizedSortColNames.getOrElse(Nil))
+    }
+  }
+
+  /**
+   * The given column name may not be equal to any of the existing column 
names if we were in
+   * case-insensitive context.  Normalize the given column name to the real 
one so that we don't
+   * need to care about case sensitivity afterwards.
+   */
+  private def normalize(columnName: String, columnType: String): String = {
+    val validColumnNames = df.logicalPlan.output.map(_.name)
+    validColumnNames.find(df.sqlContext.analyzer.resolver(_, columnName))
+      .getOrElse(throw new AnalysisException(s"$columnType column $columnName 
not found in " +
+        s"existing columns (${validColumnNames.mkString(", ")})"))
+  }
+
+  private def assertNotBucketed(): Unit = {
+    if (numBuckets.isDefined || sortColumnNames.isDefined) {
+      throw new IllegalArgumentException(
+        "Currently we don't support writing bucketed data to this data 
source.")
     }
   }
 
@@ -244,6 +309,7 @@ final class DataFrameWriter private[sql](df: DataFrame) {
             source,
             temporary = false,
             partitioningColumns.map(_.toArray).getOrElse(Array.empty[String]),
+            getBucketSpec,
             mode,
             extraOptions.toMap,
             df.logicalPlan)
@@ -372,4 +438,9 @@ final class DataFrameWriter private[sql](df: DataFrame) {
 
   private var partitioningColumns: Option[Seq[String]] = None
 
+  private var bucketColumnNames: Option[Seq[String]] = None
+
+  private var numBuckets: Option[Int] = None
+
+  private var sortColumnNames: Option[Seq[String]] = None
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/917d3fc0/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
index 6cf75bc..482130a 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
@@ -382,13 +382,12 @@ private[sql] abstract class SparkStrategies extends 
QueryPlanner[SparkPlan] {
       case c: CreateTableUsing if c.temporary && c.allowExisting =>
         sys.error("allowExisting should be set to false when creating a 
temporary table.")
 
-      case CreateTableUsingAsSelect(tableIdent, provider, true, 
partitionsCols, mode, opts, query)
-          if partitionsCols.nonEmpty =>
+      case c: CreateTableUsingAsSelect if c.temporary && 
c.partitionColumns.nonEmpty =>
         sys.error("Cannot create temporary partitioned table.")
 
-      case CreateTableUsingAsSelect(tableIdent, provider, true, _, mode, opts, 
query) =>
+      case c: CreateTableUsingAsSelect if c.temporary =>
         val cmd = CreateTempTableUsingAsSelect(
-          tableIdent, provider, Array.empty[String], mode, opts, query)
+          c.tableIdent, c.provider, Array.empty[String], c.mode, c.options, 
c.child)
         ExecutedCommand(cmd) :: Nil
       case c: CreateTableUsingAsSelect if !c.temporary =>
         sys.error("Tables created with SQLContext must be TEMPORARY. Use a 
HiveContext instead.")

http://git-wip-us.apache.org/repos/asf/spark/blob/917d3fc0/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DDLParser.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DDLParser.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DDLParser.scala
index 48eff62..d8d21b0 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DDLParser.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DDLParser.scala
@@ -109,6 +109,7 @@ class DDLParser(parseQuery: String => LogicalPlan)
             provider,
             temp.isDefined,
             Array.empty[String],
+            bucketSpec = None,
             mode,
             options,
             queryPlan)

http://git-wip-us.apache.org/repos/asf/spark/blob/917d3fc0/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelation.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelation.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelation.scala
index 38152d0..7a8691e 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelation.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelation.scala
@@ -125,7 +125,7 @@ private[sql] case class InsertIntoHadoopFsRelation(
              |Actual: ${partitionColumns.mkString(", ")}
           """.stripMargin)
 
-        val writerContainer = if (partitionColumns.isEmpty) {
+        val writerContainer = if (partitionColumns.isEmpty && 
relation.bucketSpec.isEmpty) {
           new DefaultWriterContainer(relation, job, isAppend)
         } else {
           val output = df.queryExecution.executedPlan.output

http://git-wip-us.apache.org/repos/asf/spark/blob/917d3fc0/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ResolvedDataSource.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ResolvedDataSource.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ResolvedDataSource.scala
index 0ca0a38..ece9b8a 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ResolvedDataSource.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ResolvedDataSource.scala
@@ -210,6 +210,7 @@ object ResolvedDataSource extends Logging {
       sqlContext: SQLContext,
       provider: String,
       partitionColumns: Array[String],
+      bucketSpec: Option[BucketSpec],
       mode: SaveMode,
       options: Map[String, String],
       data: DataFrame): ResolvedDataSource = {
@@ -244,6 +245,7 @@ object ResolvedDataSource extends Logging {
           Array(outputPath.toString),
           Some(dataSchema.asNullable),
           Some(partitionColumnsSchema(data.schema, partitionColumns, 
caseSensitive)),
+          bucketSpec,
           caseInsensitiveOptions)
 
         // For partitioned relation r, r.schema's column ordering can be 
different from the column

http://git-wip-us.apache.org/repos/asf/spark/blob/917d3fc0/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/WriterContainer.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/WriterContainer.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/WriterContainer.scala
index 9f23d53..4f8524f 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/WriterContainer.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/WriterContainer.scala
@@ -33,7 +33,7 @@ import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.execution.UnsafeKVExternalSorter
 import org.apache.spark.sql.sources.{HadoopFsRelation, OutputWriter, 
OutputWriterFactory}
-import org.apache.spark.sql.types.{StringType, StructType}
+import org.apache.spark.sql.types.{IntegerType, StructType, StringType}
 import org.apache.spark.util.SerializableConfiguration
 
 
@@ -121,9 +121,9 @@ private[sql] abstract class BaseWriterContainer(
     }
   }
 
-  protected def newOutputWriter(path: String): OutputWriter = {
+  protected def newOutputWriter(path: String, bucketId: Option[Int] = None): 
OutputWriter = {
     try {
-      outputWriterFactory.newInstance(path, dataSchema, taskAttemptContext)
+      outputWriterFactory.newInstance(path, bucketId, dataSchema, 
taskAttemptContext)
     } catch {
       case e: org.apache.hadoop.fs.FileAlreadyExistsException =>
         if 
(outputCommitter.isInstanceOf[parquet.DirectParquetOutputCommitter]) {
@@ -312,19 +312,23 @@ private[sql] class DynamicPartitionWriterContainer(
     isAppend: Boolean)
   extends BaseWriterContainer(relation, job, isAppend) {
 
-  def writeRows(taskContext: TaskContext, iterator: Iterator[InternalRow]): 
Unit = {
-    val outputWriters = new java.util.HashMap[InternalRow, OutputWriter]
-    executorSideSetup(taskContext)
+  private val bucketSpec = relation.bucketSpec
 
-    var outputWritersCleared = false
+  private val bucketColumns: Seq[Attribute] = bucketSpec.toSeq.flatMap {
+    spec => spec.bucketColumnNames.map(c => inputSchema.find(_.name == c).get)
+  }
 
-    // Returns the partition key given an input row
-    val getPartitionKey = UnsafeProjection.create(partitionColumns, 
inputSchema)
-    // Returns the data columns to be written given an input row
-    val getOutputRow = UnsafeProjection.create(dataColumns, inputSchema)
+  private val sortColumns: Seq[Attribute] = bucketSpec.toSeq.flatMap {
+    spec => spec.sortColumnNames.map(c => inputSchema.find(_.name == c).get)
+  }
+
+  private def bucketIdExpression: Option[Expression] = for {
+    BucketSpec(numBuckets, _, _) <- bucketSpec
+  } yield Pmod(new Murmur3Hash(bucketColumns), Literal(numBuckets))
 
-    // Expressions that given a partition key build a string like: 
col1=val/col2=val/...
-    val partitionStringExpression = partitionColumns.zipWithIndex.flatMap { 
case (c, i) =>
+  // Expressions that given a partition key build a string like: 
col1=val/col2=val/...
+  private def partitionStringExpression: Seq[Expression] = {
+    partitionColumns.zipWithIndex.flatMap { case (c, i) =>
       val escaped =
         ScalaUDF(
           PartitioningUtils.escapePathName _,
@@ -335,6 +339,121 @@ private[sql] class DynamicPartitionWriterContainer(
       val partitionName = Literal(c.name + "=") :: str :: Nil
       if (i == 0) partitionName else Literal(Path.SEPARATOR) :: partitionName
     }
+  }
+
+  private def getBucketIdFromKey(key: InternalRow): Option[Int] = {
+    if (bucketSpec.isDefined) {
+      Some(key.getInt(partitionColumns.length))
+    } else {
+      None
+    }
+  }
+
+  private def sameBucket(key1: UnsafeRow, key2: UnsafeRow): Boolean = {
+    val bucketIdIndex = partitionColumns.length
+    if (key1.getInt(bucketIdIndex) != key2.getInt(bucketIdIndex)) {
+      false
+    } else {
+      var i = partitionColumns.length - 1
+      while (i >= 0) {
+        val dt = partitionColumns(i).dataType
+        if (key1.get(i, dt) != key2.get(i, dt)) return false
+        i -= 1
+      }
+      true
+    }
+  }
+
+  private def sortBasedWrite(
+      sorter: UnsafeKVExternalSorter,
+      iterator: Iterator[InternalRow],
+      getSortingKey: UnsafeProjection,
+      getOutputRow: UnsafeProjection,
+      getPartitionString: UnsafeProjection,
+      outputWriters: java.util.HashMap[InternalRow, OutputWriter]): Unit = {
+    while (iterator.hasNext) {
+      val currentRow = iterator.next()
+      sorter.insertKV(getSortingKey(currentRow), getOutputRow(currentRow))
+    }
+
+    logInfo(s"Sorting complete. Writing out partition files one at a time.")
+
+    val needNewWriter: (UnsafeRow, UnsafeRow) => Boolean = if 
(sortColumns.isEmpty) {
+      (key1, key2) => key1 != key2
+    } else {
+      (key1, key2) => key1 == null || !sameBucket(key1, key2)
+    }
+
+    val sortedIterator = sorter.sortedIterator()
+    var currentKey: UnsafeRow = null
+    var currentWriter: OutputWriter = null
+    try {
+      while (sortedIterator.next()) {
+        if (needNewWriter(currentKey, sortedIterator.getKey)) {
+          if (currentWriter != null) {
+            currentWriter.close()
+          }
+          currentKey = sortedIterator.getKey.copy()
+          logDebug(s"Writing partition: $currentKey")
+
+          // Either use an existing file from before, or open a new one.
+          currentWriter = outputWriters.remove(currentKey)
+          if (currentWriter == null) {
+            currentWriter = newOutputWriter(currentKey, getPartitionString)
+          }
+        }
+
+        currentWriter.writeInternal(sortedIterator.getValue)
+      }
+    } finally {
+      if (currentWriter != null) { currentWriter.close() }
+    }
+  }
+
+  /**
+   * Open and returns a new OutputWriter given a partition key and optional 
bucket id.
+   * If bucket id is specified, we will append it to the end of the file name, 
but before the
+   * file extension, e.g. 
part-r-00009-ea518ad4-455a-4431-b471-d24e03814677-00002.gz.parquet
+   */
+  private def newOutputWriter(
+      key: InternalRow,
+      getPartitionString: UnsafeProjection): OutputWriter = {
+    val configuration = taskAttemptContext.getConfiguration
+    val path = if (partitionColumns.nonEmpty) {
+      val partitionPath = getPartitionString(key).getString(0)
+      configuration.set(
+        "spark.sql.sources.output.path", new Path(outputPath, 
partitionPath).toString)
+      new Path(getWorkPath, partitionPath).toString
+    } else {
+      configuration.set("spark.sql.sources.output.path", outputPath)
+      getWorkPath
+    }
+    val bucketId = getBucketIdFromKey(key)
+    val newWriter = super.newOutputWriter(path, bucketId)
+    newWriter.initConverter(dataSchema)
+    newWriter
+  }
+
+  def writeRows(taskContext: TaskContext, iterator: Iterator[InternalRow]): 
Unit = {
+    val outputWriters = new java.util.HashMap[InternalRow, OutputWriter]
+    executorSideSetup(taskContext)
+
+    var outputWritersCleared = false
+
+    // We should first sort by partition columns, then bucket id, and finally 
sorting columns.
+    val getSortingKey =
+      UnsafeProjection.create(partitionColumns ++ bucketIdExpression ++ 
sortColumns, inputSchema)
+
+    val sortingKeySchema = if (bucketSpec.isEmpty) {
+      StructType.fromAttributes(partitionColumns)
+    } else { // If it's bucketed, we should also consider bucket id as part of 
the key.
+      val fields = StructType.fromAttributes(partitionColumns)
+        .add("bucketId", IntegerType, nullable = false) ++ 
StructType.fromAttributes(sortColumns)
+      StructType(fields)
+    }
+
+    // Returns the data columns to be written given an input row
+    val getOutputRow = UnsafeProjection.create(dataColumns, inputSchema)
 
     // Returns the partition path given a partition key.
     val getPartitionString =
@@ -342,22 +461,34 @@ private[sql] class DynamicPartitionWriterContainer(
 
     // If anything below fails, we should abort the task.
     try {
-      // This will be filled in if we have to fall back on sorting.
-      var sorter: UnsafeKVExternalSorter = null
+      // If there is no sorting columns, we set sorter to null and try the 
hash-based writing first,
+      // and fill the sorter if there are too many writers and we need to fall 
back on sorting.
+      // If there are sorting columns, then we have to sort the data anyway, 
and no need to try the
+      // hash-based writing first.
+      var sorter: UnsafeKVExternalSorter = if (sortColumns.nonEmpty) {
+        new UnsafeKVExternalSorter(
+          sortingKeySchema,
+          StructType.fromAttributes(dataColumns),
+          SparkEnv.get.blockManager,
+          TaskContext.get().taskMemoryManager().pageSizeBytes)
+      } else {
+        null
+      }
       while (iterator.hasNext && sorter == null) {
         val inputRow = iterator.next()
-        val currentKey = getPartitionKey(inputRow)
+        // When we reach here, the `sortColumns` must be empty, so the sorting 
key is hashing key.
+        val currentKey = getSortingKey(inputRow)
         var currentWriter = outputWriters.get(currentKey)
 
         if (currentWriter == null) {
           if (outputWriters.size < maxOpenFiles) {
-            currentWriter = newOutputWriter(currentKey)
+            currentWriter = newOutputWriter(currentKey, getPartitionString)
             outputWriters.put(currentKey.copy(), currentWriter)
             currentWriter.writeInternal(getOutputRow(inputRow))
           } else {
             logInfo(s"Maximum partitions reached, falling back on sorting.")
             sorter = new UnsafeKVExternalSorter(
-              StructType.fromAttributes(partitionColumns),
+              sortingKeySchema,
               StructType.fromAttributes(dataColumns),
               SparkEnv.get.blockManager,
               TaskContext.get().taskMemoryManager().pageSizeBytes)
@@ -369,39 +500,15 @@ private[sql] class DynamicPartitionWriterContainer(
       }
 
       // If the sorter is not null that means that we reached the maxFiles 
above and need to finish
-      // using external sort.
+      // using external sort, or there are sorting columns and we need to sort 
the whole data set.
       if (sorter != null) {
-        while (iterator.hasNext) {
-          val currentRow = iterator.next()
-          sorter.insertKV(getPartitionKey(currentRow), 
getOutputRow(currentRow))
-        }
-
-        logInfo(s"Sorting complete. Writing out partition files one at a 
time.")
-
-        val sortedIterator = sorter.sortedIterator()
-        var currentKey: InternalRow = null
-        var currentWriter: OutputWriter = null
-        try {
-          while (sortedIterator.next()) {
-            if (currentKey != sortedIterator.getKey) {
-              if (currentWriter != null) {
-                currentWriter.close()
-              }
-              currentKey = sortedIterator.getKey.copy()
-              logDebug(s"Writing partition: $currentKey")
-
-              // Either use an existing file from before, or open a new one.
-              currentWriter = outputWriters.remove(currentKey)
-              if (currentWriter == null) {
-                currentWriter = newOutputWriter(currentKey)
-              }
-            }
-
-            currentWriter.writeInternal(sortedIterator.getValue)
-          }
-        } finally {
-          if (currentWriter != null) { currentWriter.close() }
-        }
+        sortBasedWrite(
+          sorter,
+          iterator,
+          getSortingKey,
+          getOutputRow,
+          getPartitionString,
+          outputWriters)
       }
 
       commitTask()
@@ -412,18 +519,6 @@ private[sql] class DynamicPartitionWriterContainer(
         throw new SparkException("Task failed while writing rows.", cause)
     }
 
-    /** Open and returns a new OutputWriter given a partition key. */
-    def newOutputWriter(key: InternalRow): OutputWriter = {
-      val partitionPath = getPartitionString(key).getString(0)
-      val path = new Path(getWorkPath, partitionPath)
-      val configuration = taskAttemptContext.getConfiguration
-      configuration.set(
-        "spark.sql.sources.output.path", new Path(outputPath, 
partitionPath).toString)
-      val newWriter = super.newOutputWriter(path.toString)
-      newWriter.initConverter(dataSchema)
-      newWriter
-    }
-
     def clearOutputWriters(): Unit = {
       if (!outputWritersCleared) {
         outputWriters.asScala.values.foreach(_.close())

http://git-wip-us.apache.org/repos/asf/spark/blob/917d3fc0/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/bucket.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/bucket.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/bucket.scala
new file mode 100644
index 0000000..82287c8
--- /dev/null
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/bucket.scala
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources
+
+import org.apache.hadoop.mapreduce.TaskAttemptContext
+import org.apache.spark.sql.SQLContext
+import org.apache.spark.sql.sources.{OutputWriter, OutputWriterFactory, 
HadoopFsRelationProvider, HadoopFsRelation}
+import org.apache.spark.sql.types.StructType
+
+/**
+ * A container for bucketing information.
+ * Bucketing is a technology for decomposing data sets into more manageable 
parts, and the number
+ * of buckets is fixed so it does not fluctuate with data.
+ *
+ * @param numBuckets number of buckets.
+ * @param bucketColumnNames the names of the columns that used to generate the 
bucket id.
+ * @param sortColumnNames the names of the columns that used to sort data in 
each bucket.
+ */
+private[sql] case class BucketSpec(
+    numBuckets: Int,
+    bucketColumnNames: Seq[String],
+    sortColumnNames: Seq[String])
+
+private[sql] trait BucketedHadoopFsRelationProvider extends 
HadoopFsRelationProvider {
+  final override def createRelation(
+      sqlContext: SQLContext,
+      paths: Array[String],
+      dataSchema: Option[StructType],
+      partitionColumns: Option[StructType],
+      parameters: Map[String, String]): HadoopFsRelation =
+    // TODO: throw exception here as we won't call this method during 
execution, after bucketed read
+    // support is finished.
+    createRelation(sqlContext, paths, dataSchema, partitionColumns, bucketSpec 
= None, parameters)
+}
+
+private[sql] abstract class BucketedOutputWriterFactory extends 
OutputWriterFactory {
+  final override def newInstance(
+      path: String,
+      dataSchema: StructType,
+      context: TaskAttemptContext): OutputWriter =
+    throw new UnsupportedOperationException("use bucket version")
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/917d3fc0/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ddl.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ddl.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ddl.scala
index aed5d0d..0897fca 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ddl.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ddl.scala
@@ -76,6 +76,7 @@ case class CreateTableUsingAsSelect(
     provider: String,
     temporary: Boolean,
     partitionColumns: Array[String],
+    bucketSpec: Option[BucketSpec],
     mode: SaveMode,
     options: Map[String, String],
     child: LogicalPlan) extends UnaryNode {
@@ -109,7 +110,14 @@ case class CreateTempTableUsingAsSelect(
 
   override def run(sqlContext: SQLContext): Seq[Row] = {
     val df = DataFrame(sqlContext, query)
-    val resolved = ResolvedDataSource(sqlContext, provider, partitionColumns, 
mode, options, df)
+    val resolved = ResolvedDataSource(
+      sqlContext,
+      provider,
+      partitionColumns,
+      bucketSpec = None,
+      mode,
+      options,
+      df)
     sqlContext.catalog.registerTable(
       tableIdent,
       DataFrame(sqlContext, LogicalRelation(resolved.relation)).logicalPlan)

http://git-wip-us.apache.org/repos/asf/spark/blob/917d3fc0/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JSONRelation.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JSONRelation.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JSONRelation.scala
index 8bf5381..b92edf6 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JSONRelation.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JSONRelation.scala
@@ -34,13 +34,13 @@ import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.{AnalysisException, Row, SQLContext}
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions.UnsafeProjection
-import org.apache.spark.sql.execution.datasources.PartitionSpec
+import org.apache.spark.sql.execution.datasources._
 import org.apache.spark.sql.sources._
 import org.apache.spark.sql.types.StructType
 import org.apache.spark.util.SerializableConfiguration
 
 
-class DefaultSource extends HadoopFsRelationProvider with DataSourceRegister {
+class DefaultSource extends BucketedHadoopFsRelationProvider with 
DataSourceRegister {
 
   override def shortName(): String = "json"
 
@@ -49,6 +49,7 @@ class DefaultSource extends HadoopFsRelationProvider with 
DataSourceRegister {
       paths: Array[String],
       dataSchema: Option[StructType],
       partitionColumns: Option[StructType],
+      bucketSpec: Option[BucketSpec],
       parameters: Map[String, String]): HadoopFsRelation = {
 
     new JSONRelation(
@@ -56,6 +57,7 @@ class DefaultSource extends HadoopFsRelationProvider with 
DataSourceRegister {
       maybeDataSchema = dataSchema,
       maybePartitionSpec = None,
       userDefinedPartitionColumns = partitionColumns,
+      bucketSpec = bucketSpec,
       paths = paths,
       parameters = parameters)(sqlContext)
   }
@@ -66,11 +68,29 @@ private[sql] class JSONRelation(
     val maybeDataSchema: Option[StructType],
     val maybePartitionSpec: Option[PartitionSpec],
     override val userDefinedPartitionColumns: Option[StructType],
+    override val bucketSpec: Option[BucketSpec],
     override val paths: Array[String] = Array.empty[String],
     parameters: Map[String, String] = Map.empty[String, String])
     (@transient val sqlContext: SQLContext)
   extends HadoopFsRelation(maybePartitionSpec, parameters) {
 
+  def this(
+      inputRDD: Option[RDD[String]],
+      maybeDataSchema: Option[StructType],
+      maybePartitionSpec: Option[PartitionSpec],
+      userDefinedPartitionColumns: Option[StructType],
+      paths: Array[String] = Array.empty[String],
+      parameters: Map[String, String] = Map.empty[String, String])(sqlContext: 
SQLContext) = {
+    this(
+      inputRDD,
+      maybeDataSchema,
+      maybePartitionSpec,
+      userDefinedPartitionColumns,
+      None,
+      paths,
+      parameters)(sqlContext)
+  }
+
   val options: JSONOptions = JSONOptions.createFromConfigMap(parameters)
 
   /** Constraints to be imposed on schema to be stored. */
@@ -158,13 +178,14 @@ private[sql] class JSONRelation(
       partitionColumns)
   }
 
-  override def prepareJobForWrite(job: Job): OutputWriterFactory = {
-    new OutputWriterFactory {
+  override def prepareJobForWrite(job: Job): BucketedOutputWriterFactory = {
+    new BucketedOutputWriterFactory {
       override def newInstance(
           path: String,
+          bucketId: Option[Int],
           dataSchema: StructType,
           context: TaskAttemptContext): OutputWriter = {
-        new JsonOutputWriter(path, dataSchema, context)
+        new JsonOutputWriter(path, bucketId, dataSchema, context)
       }
     }
   }
@@ -172,6 +193,7 @@ private[sql] class JSONRelation(
 
 private[json] class JsonOutputWriter(
     path: String,
+    bucketId: Option[Int],
     dataSchema: StructType,
     context: TaskAttemptContext)
   extends OutputWriter with Logging {
@@ -188,7 +210,8 @@ private[json] class JsonOutputWriter(
         val uniqueWriteJobId = 
configuration.get("spark.sql.sources.writeJobUUID")
         val taskAttemptId = context.getTaskAttemptID
         val split = taskAttemptId.getTaskID.getId
-        new Path(path, f"part-r-$split%05d-$uniqueWriteJobId$extension")
+        val bucketString = bucketId.map(id => f"-$id%05d").getOrElse("")
+        new Path(path, 
f"part-r-$split%05d-$uniqueWriteJobId$bucketString$extension")
       }
     }.getRecordWriter(context)
   }

http://git-wip-us.apache.org/repos/asf/spark/blob/917d3fc0/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRelation.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRelation.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRelation.scala
index 45f1dff..4b375de 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRelation.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRelation.scala
@@ -45,13 +45,13 @@ import org.apache.spark.broadcast.Broadcast
 import org.apache.spark.rdd.{RDD, SqlNewHadoopPartition, SqlNewHadoopRDD}
 import org.apache.spark.sql._
 import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.execution.datasources._
 import org.apache.spark.sql.catalyst.util.LegacyTypeStringParser
-import org.apache.spark.sql.execution.datasources.PartitionSpec
 import org.apache.spark.sql.sources._
 import org.apache.spark.sql.types.{DataType, StructType}
 import org.apache.spark.util.{SerializableConfiguration, Utils}
 
-private[sql] class DefaultSource extends HadoopFsRelationProvider with 
DataSourceRegister {
+private[sql] class DefaultSource extends BucketedHadoopFsRelationProvider with 
DataSourceRegister {
 
   override def shortName(): String = "parquet"
 
@@ -60,13 +60,17 @@ private[sql] class DefaultSource extends 
HadoopFsRelationProvider with DataSourc
       paths: Array[String],
       schema: Option[StructType],
       partitionColumns: Option[StructType],
+      bucketSpec: Option[BucketSpec],
       parameters: Map[String, String]): HadoopFsRelation = {
-    new ParquetRelation(paths, schema, None, partitionColumns, 
parameters)(sqlContext)
+    new ParquetRelation(paths, schema, None, partitionColumns, bucketSpec, 
parameters)(sqlContext)
   }
 }
 
 // NOTE: This class is instantiated and used on executor side only, no need to 
be serializable.
-private[sql] class ParquetOutputWriter(path: String, context: 
TaskAttemptContext)
+private[sql] class ParquetOutputWriter(
+    path: String,
+    bucketId: Option[Int],
+    context: TaskAttemptContext)
   extends OutputWriter {
 
   private val recordWriter: RecordWriter[Void, InternalRow] = {
@@ -86,7 +90,8 @@ private[sql] class ParquetOutputWriter(path: String, context: 
TaskAttemptContext
           val uniqueWriteJobId = 
configuration.get("spark.sql.sources.writeJobUUID")
           val taskAttemptId = context.getTaskAttemptID
           val split = taskAttemptId.getTaskID.getId
-          new Path(path, f"part-r-$split%05d-$uniqueWriteJobId$extension")
+          val bucketString = bucketId.map(id => f"-$id%05d").getOrElse("")
+          new Path(path, 
f"part-r-$split%05d-$uniqueWriteJobId$bucketString$extension")
         }
       }
     }
@@ -107,6 +112,7 @@ private[sql] class ParquetRelation(
     // This is for metastore conversion.
     private val maybePartitionSpec: Option[PartitionSpec],
     override val userDefinedPartitionColumns: Option[StructType],
+    override val bucketSpec: Option[BucketSpec],
     parameters: Map[String, String])(
     val sqlContext: SQLContext)
   extends HadoopFsRelation(maybePartitionSpec, parameters)
@@ -123,6 +129,7 @@ private[sql] class ParquetRelation(
       maybeDataSchema,
       maybePartitionSpec,
       maybePartitionSpec.map(_.partitionColumns),
+      None,
       parameters)(sqlContext)
   }
 
@@ -216,7 +223,7 @@ private[sql] class ParquetRelation(
 
   override def sizeInBytes: Long = metadataCache.dataStatuses.map(_.getLen).sum
 
-  override def prepareJobForWrite(job: Job): OutputWriterFactory = {
+  override def prepareJobForWrite(job: Job): BucketedOutputWriterFactory = {
     val conf = ContextUtil.getConfiguration(job)
 
     // SPARK-9849 DirectParquetOutputCommitter qualified name should be 
backward compatible
@@ -276,10 +283,13 @@ private[sql] class ParquetRelation(
           sqlContext.conf.parquetCompressionCodec.toUpperCase,
           CompressionCodecName.UNCOMPRESSED).name())
 
-    new OutputWriterFactory {
+    new BucketedOutputWriterFactory {
       override def newInstance(
-          path: String, dataSchema: StructType, context: TaskAttemptContext): 
OutputWriter = {
-        new ParquetOutputWriter(path, context)
+          path: String,
+          bucketId: Option[Int],
+          dataSchema: StructType,
+          context: TaskAttemptContext): OutputWriter = {
+        new ParquetOutputWriter(path, bucketId, context)
       }
     }
   }

http://git-wip-us.apache.org/repos/asf/spark/blob/917d3fc0/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala
index 50ecbd3..d484403 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala
@@ -19,7 +19,7 @@ package org.apache.spark.sql.execution.datasources
 
 import org.apache.spark.sql.{AnalysisException, SaveMode, SQLContext}
 import org.apache.spark.sql.catalyst.analysis._
-import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, Cast}
+import org.apache.spark.sql.catalyst.expressions.{RowOrdering, Alias, 
Attribute, Cast}
 import org.apache.spark.sql.catalyst.plans.logical
 import org.apache.spark.sql.catalyst.plans.logical._
 import org.apache.spark.sql.catalyst.rules.Rule
@@ -165,22 +165,22 @@ private[sql] case class PreWriteCheck(catalog: Catalog) 
extends (LogicalPlan =>
           // OK
         }
 
-      case CreateTableUsingAsSelect(tableIdent, _, _, partitionColumns, mode, 
_, query) =>
+      case c: CreateTableUsingAsSelect =>
         // When the SaveMode is Overwrite, we need to check if the table is an 
input table of
         // the query. If so, we will throw an AnalysisException to let users 
know it is not allowed.
-        if (mode == SaveMode.Overwrite && catalog.tableExists(tableIdent)) {
+        if (c.mode == SaveMode.Overwrite && catalog.tableExists(c.tableIdent)) 
{
           // Need to remove SubQuery operator.
-          EliminateSubQueries(catalog.lookupRelation(tableIdent)) match {
+          EliminateSubQueries(catalog.lookupRelation(c.tableIdent)) match {
             // Only do the check if the table is a data source table
             // (the relation is a BaseRelation).
             case l @ LogicalRelation(dest: BaseRelation, _) =>
               // Get all input data source relations of the query.
-              val srcRelations = query.collect {
+              val srcRelations = c.child.collect {
                 case LogicalRelation(src: BaseRelation, _) => src
               }
               if (srcRelations.contains(dest)) {
                 failAnalysis(
-                  s"Cannot overwrite table $tableIdent that is also being read 
from.")
+                  s"Cannot overwrite table ${c.tableIdent} that is also being 
read from.")
               } else {
                 // OK
               }
@@ -192,7 +192,17 @@ private[sql] case class PreWriteCheck(catalog: Catalog) 
extends (LogicalPlan =>
         }
 
         PartitioningUtils.validatePartitionColumnDataTypes(
-          query.schema, partitionColumns, catalog.conf.caseSensitiveAnalysis)
+          c.child.schema, c.partitionColumns, 
catalog.conf.caseSensitiveAnalysis)
+
+        for {
+          spec <- c.bucketSpec
+          sortColumnName <- spec.sortColumnNames
+          sortColumn <- c.child.schema.find(_.name == sortColumnName)
+        } {
+          if (!RowOrdering.isOrderable(sortColumn.dataType)) {
+            failAnalysis(s"Cannot use ${sortColumn.dataType.simpleString} for 
sorting column.")
+          }
+        }
 
       case _ => // OK
     }

http://git-wip-us.apache.org/repos/asf/spark/blob/917d3fc0/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala
index f4c7f0a..c35f331 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala
@@ -21,7 +21,7 @@ import scala.collection.mutable
 import scala.util.Try
 
 import org.apache.hadoop.conf.Configuration
-import org.apache.hadoop.fs.{FileStatus, FileSystem, Path, PathFilter}
+import org.apache.hadoop.fs.{FileStatus, FileSystem, Path}
 import org.apache.hadoop.mapred.{FileInputFormat, JobConf}
 import org.apache.hadoop.mapreduce.{Job, TaskAttemptContext}
 
@@ -34,7 +34,7 @@ import org.apache.spark.sql.catalyst.{CatalystTypeConverters, 
InternalRow}
 import org.apache.spark.sql.catalyst.expressions._
 import 
org.apache.spark.sql.catalyst.expressions.codegen.GenerateMutableProjection
 import org.apache.spark.sql.execution.{FileRelation, RDDConversions}
-import org.apache.spark.sql.execution.datasources.{Partition, 
PartitioningUtils, PartitionSpec}
+import org.apache.spark.sql.execution.datasources.{BucketSpec, Partition, 
PartitioningUtils, PartitionSpec}
 import org.apache.spark.sql.types.{StringType, StructType}
 import org.apache.spark.util.SerializableConfiguration
 
@@ -161,6 +161,20 @@ trait HadoopFsRelationProvider {
       dataSchema: Option[StructType],
       partitionColumns: Option[StructType],
       parameters: Map[String, String]): HadoopFsRelation
+
+  // TODO: expose bucket API to users.
+  private[sql] def createRelation(
+      sqlContext: SQLContext,
+      paths: Array[String],
+      dataSchema: Option[StructType],
+      partitionColumns: Option[StructType],
+      bucketSpec: Option[BucketSpec],
+      parameters: Map[String, String]): HadoopFsRelation = {
+    if (bucketSpec.isDefined) {
+      throw new AnalysisException("Currently we don't support bucketing for 
this data source.")
+    }
+    createRelation(sqlContext, paths, dataSchema, partitionColumns, parameters)
+  }
 }
 
 /**
@@ -351,7 +365,18 @@ abstract class OutputWriterFactory extends Serializable {
    *
    * @since 1.4.0
    */
-  def newInstance(path: String, dataSchema: StructType, context: 
TaskAttemptContext): OutputWriter
+  def newInstance(
+      path: String,
+      dataSchema: StructType,
+      context: TaskAttemptContext): OutputWriter
+
+  // TODO: expose bucket API to users.
+  private[sql] def newInstance(
+      path: String,
+      bucketId: Option[Int],
+      dataSchema: StructType,
+      context: TaskAttemptContext): OutputWriter =
+    newInstance(path, dataSchema, context)
 }
 
 /**
@@ -435,6 +460,9 @@ abstract class HadoopFsRelation private[sql](
 
   private var _partitionSpec: PartitionSpec = _
 
+  // TODO: expose bucket API to users.
+  private[sql] def bucketSpec: Option[BucketSpec] = None
+
   private class FileStatusCache {
     var leafFiles = mutable.LinkedHashMap.empty[Path, FileStatus]
 

http://git-wip-us.apache.org/repos/asf/spark/blob/917d3fc0/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
----------------------------------------------------------------------
diff --git 
a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala 
b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
index 1616c45..43d84d5 100644
--- 
a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
+++ 
b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
@@ -40,7 +40,7 @@ import org.apache.spark.sql.catalyst.plans.logical._
 import org.apache.spark.sql.catalyst.rules._
 import org.apache.spark.sql.catalyst.util.DataTypeParser
 import org.apache.spark.sql.execution.{datasources, FileRelation}
-import org.apache.spark.sql.execution.datasources.{CreateTableUsingAsSelect, 
LogicalRelation, Partition => ParquetPartition, PartitionSpec, 
ResolvedDataSource}
+import org.apache.spark.sql.execution.datasources.{Partition => 
ParquetPartition, _}
 import org.apache.spark.sql.execution.datasources.parquet.ParquetRelation
 import org.apache.spark.sql.hive.client._
 import org.apache.spark.sql.hive.execution.HiveNativeCommand
@@ -211,6 +211,7 @@ private[hive] class HiveMetastoreCatalog(val client: 
ClientInterface, hive: Hive
       tableIdent: TableIdentifier,
       userSpecifiedSchema: Option[StructType],
       partitionColumns: Array[String],
+      bucketSpec: Option[BucketSpec],
       provider: String,
       options: Map[String, String],
       isExternal: Boolean): Unit = {
@@ -240,6 +241,25 @@ private[hive] class HiveMetastoreCatalog(val client: 
ClientInterface, hive: Hive
       }
     }
 
+    if (userSpecifiedSchema.isDefined && bucketSpec.isDefined) {
+      val BucketSpec(numBuckets, bucketColumnNames, sortColumnNames) = 
bucketSpec.get
+
+      tableProperties.put("spark.sql.sources.schema.numBuckets", 
numBuckets.toString)
+      tableProperties.put("spark.sql.sources.schema.numBucketCols",
+        bucketColumnNames.length.toString)
+      bucketColumnNames.zipWithIndex.foreach { case (bucketCol, index) =>
+        tableProperties.put(s"spark.sql.sources.schema.bucketCol.$index", 
bucketCol)
+      }
+
+      if (sortColumnNames.nonEmpty) {
+        tableProperties.put("spark.sql.sources.schema.numSortCols",
+          sortColumnNames.length.toString)
+        sortColumnNames.zipWithIndex.foreach { case (sortCol, index) =>
+          tableProperties.put(s"spark.sql.sources.schema.sortCol.$index", 
sortCol)
+        }
+      }
+    }
+
     if (userSpecifiedSchema.isEmpty && partitionColumns.length > 0) {
       // The table does not have a specified schema, which means that the 
schema will be inferred
       // when we load the table. So, we are not expecting partition columns 
and we will discover
@@ -596,6 +616,7 @@ private[hive] class HiveMetastoreCatalog(val client: 
ClientInterface, hive: Hive
             conf.defaultDataSourceName,
             temporary = false,
             Array.empty[String],
+            bucketSpec = None,
             mode,
             options = Map.empty[String, String],
             child

http://git-wip-us.apache.org/repos/asf/spark/blob/917d3fc0/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
----------------------------------------------------------------------
diff --git 
a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala 
b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
index 0b4f5a0..3687dd6 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
@@ -88,10 +88,9 @@ private[hive] trait HiveStrategies {
             tableIdent, userSpecifiedSchema, provider, opts, allowExisting, 
managedIfNoPath)
         ExecutedCommand(cmd) :: Nil
 
-      case CreateTableUsingAsSelect(
-        tableIdent, provider, false, partitionCols, mode, opts, query) =>
-        val cmd =
-          CreateMetastoreDataSourceAsSelect(tableIdent, provider, 
partitionCols, mode, opts, query)
+      case c: CreateTableUsingAsSelect =>
+        val cmd = CreateMetastoreDataSourceAsSelect(c.tableIdent, c.provider, 
c.partitionColumns,
+          c.bucketSpec, c.mode, c.options, c.child)
         ExecutedCommand(cmd) :: Nil
 
       case _ => Nil

http://git-wip-us.apache.org/repos/asf/spark/blob/917d3fc0/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/commands.scala
----------------------------------------------------------------------
diff --git 
a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/commands.scala 
b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/commands.scala
index 94210a5..612f01c 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/commands.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/commands.scala
@@ -26,7 +26,7 @@ import org.apache.spark.sql.catalyst.expressions.Attribute
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 import org.apache.spark.sql.catalyst.util._
 import org.apache.spark.sql.execution.RunnableCommand
-import org.apache.spark.sql.execution.datasources.{LogicalRelation, 
ResolvedDataSource}
+import org.apache.spark.sql.execution.datasources.{BucketSpec, 
LogicalRelation, ResolvedDataSource}
 import org.apache.spark.sql.hive.HiveContext
 import org.apache.spark.sql.sources._
 import org.apache.spark.sql.types._
@@ -151,6 +151,7 @@ case class CreateMetastoreDataSource(
       tableIdent,
       userSpecifiedSchema,
       Array.empty[String],
+      bucketSpec = None,
       provider,
       optionsWithPath,
       isExternal)
@@ -164,6 +165,7 @@ case class CreateMetastoreDataSourceAsSelect(
     tableIdent: TableIdentifier,
     provider: String,
     partitionColumns: Array[String],
+    bucketSpec: Option[BucketSpec],
     mode: SaveMode,
     options: Map[String, String],
     query: LogicalPlan) extends RunnableCommand {
@@ -254,8 +256,14 @@ case class CreateMetastoreDataSourceAsSelect(
     }
 
     // Create the relation based on the data of df.
-    val resolved =
-      ResolvedDataSource(sqlContext, provider, partitionColumns, mode, 
optionsWithPath, df)
+    val resolved = ResolvedDataSource(
+      sqlContext,
+      provider,
+      partitionColumns,
+      bucketSpec,
+      mode,
+      optionsWithPath,
+      df)
 
     if (createMetastoreTable) {
       // We will use the schema of resolved.relation as the schema of the 
table (instead of
@@ -265,6 +273,7 @@ case class CreateMetastoreDataSourceAsSelect(
         tableIdent,
         Some(resolved.relation.schema),
         partitionColumns,
+        bucketSpec,
         provider,
         optionsWithPath,
         isExternal)

http://git-wip-us.apache.org/repos/asf/spark/blob/917d3fc0/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcRelation.scala
----------------------------------------------------------------------
diff --git 
a/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcRelation.scala 
b/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcRelation.scala
index 3538d64..14fa152 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcRelation.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcRelation.scala
@@ -37,13 +37,13 @@ import org.apache.spark.rdd.{HadoopRDD, RDD}
 import org.apache.spark.sql.{Row, SQLContext}
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions._
-import org.apache.spark.sql.execution.datasources.PartitionSpec
+import org.apache.spark.sql.execution.datasources._
 import org.apache.spark.sql.hive.{HiveContext, HiveInspectors, 
HiveMetastoreTypes, HiveShim}
 import org.apache.spark.sql.sources.{Filter, _}
 import org.apache.spark.sql.types.StructType
 import org.apache.spark.util.SerializableConfiguration
 
-private[sql] class DefaultSource extends HadoopFsRelationProvider with 
DataSourceRegister {
+private[sql] class DefaultSource extends BucketedHadoopFsRelationProvider with 
DataSourceRegister {
 
   override def shortName(): String = "orc"
 
@@ -52,17 +52,19 @@ private[sql] class DefaultSource extends 
HadoopFsRelationProvider with DataSourc
       paths: Array[String],
       dataSchema: Option[StructType],
       partitionColumns: Option[StructType],
+      bucketSpec: Option[BucketSpec],
       parameters: Map[String, String]): HadoopFsRelation = {
     assert(
       sqlContext.isInstanceOf[HiveContext],
       "The ORC data source can only be used with HiveContext.")
 
-    new OrcRelation(paths, dataSchema, None, partitionColumns, 
parameters)(sqlContext)
+    new OrcRelation(paths, dataSchema, None, partitionColumns, bucketSpec, 
parameters)(sqlContext)
   }
 }
 
 private[orc] class OrcOutputWriter(
     path: String,
+    bucketId: Option[Int],
     dataSchema: StructType,
     context: TaskAttemptContext)
   extends OutputWriter with HiveInspectors {
@@ -101,7 +103,8 @@ private[orc] class OrcOutputWriter(
     val uniqueWriteJobId = conf.get("spark.sql.sources.writeJobUUID")
     val taskAttemptId = context.getTaskAttemptID
     val partition = taskAttemptId.getTaskID.getId
-    val filename = f"part-r-$partition%05d-$uniqueWriteJobId.orc"
+    val bucketString = bucketId.map(id => f"-$id%05d").getOrElse("")
+    val filename = f"part-r-$partition%05d-$uniqueWriteJobId$bucketString.orc"
 
     new OrcOutputFormat().getRecordWriter(
       new Path(path, filename).getFileSystem(conf),
@@ -153,6 +156,7 @@ private[sql] class OrcRelation(
     maybeDataSchema: Option[StructType],
     maybePartitionSpec: Option[PartitionSpec],
     override val userDefinedPartitionColumns: Option[StructType],
+    override val bucketSpec: Option[BucketSpec],
     parameters: Map[String, String])(
     @transient val sqlContext: SQLContext)
   extends HadoopFsRelation(maybePartitionSpec, parameters)
@@ -169,6 +173,7 @@ private[sql] class OrcRelation(
       maybeDataSchema,
       maybePartitionSpec,
       maybePartitionSpec.map(_.partitionColumns),
+      None,
       parameters)(sqlContext)
   }
 
@@ -205,7 +210,7 @@ private[sql] class OrcRelation(
     OrcTableScan(output, this, filters, inputPaths).execute()
   }
 
-  override def prepareJobForWrite(job: Job): OutputWriterFactory = {
+  override def prepareJobForWrite(job: Job): BucketedOutputWriterFactory = {
     job.getConfiguration match {
       case conf: JobConf =>
         conf.setOutputFormat(classOf[OrcOutputFormat])
@@ -216,12 +221,13 @@ private[sql] class OrcRelation(
           classOf[MapRedOutputFormat[_, _]])
     }
 
-    new OutputWriterFactory {
+    new BucketedOutputWriterFactory {
       override def newInstance(
           path: String,
+          bucketId: Option[Int],
           dataSchema: StructType,
           context: TaskAttemptContext): OutputWriter = {
-        new OrcOutputWriter(path, dataSchema, context)
+        new OrcOutputWriter(path, bucketId, dataSchema, context)
       }
     }
   }
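
As a side note on the OrcOutputWriter change above: the only difference in the output file name is an extra zero-padded bucket suffix when a bucket id is present. A small standalone sketch of that same formatting logic (partition and UUID values below are made up):

```
// Mirrors the filename construction in the hunk above; inputs are made up.
def orcFileName(partition: Int, jobUUID: String, bucketId: Option[Int]): String = {
  val bucketString = bucketId.map(id => f"-$id%05d").getOrElse("")
  f"part-r-$partition%05d-$jobUUID$bucketString.orc"
}

orcFileName(0, "uuid", Some(3))  // part-r-00000-uuid-00003.orc
orcFileName(0, "uuid", None)     // part-r-00000-uuid.orc
```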

http://git-wip-us.apache.org/repos/asf/spark/blob/917d3fc0/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala
index e22dac3..202851a 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala
@@ -707,6 +707,7 @@ class MetastoreDataSourcesSuite extends QueryTest with SQLTestUtils with TestHiv
           tableIdent = TableIdentifier("wide_schema"),
           userSpecifiedSchema = Some(schema),
           partitionColumns = Array.empty[String],
+          bucketSpec = None,
           provider = "json",
           options = Map("path" -> "just a dummy path"),
           isExternal = false)

http://git-wip-us.apache.org/repos/asf/spark/blob/917d3fc0/sql/hive/src/test/scala/org/apache/spark/sql/sources/BucketedWriteSuite.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/sources/BucketedWriteSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/sources/BucketedWriteSuite.scala
new file mode 100644
index 0000000..579da02
--- /dev/null
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/sources/BucketedWriteSuite.scala
@@ -0,0 +1,169 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.sources
+
+import java.io.File
+
+import org.apache.spark.sql.functions._
+import org.apache.spark.sql.catalyst.expressions.UnsafeRow
+import org.apache.spark.sql.hive.test.TestHiveSingleton
+import org.apache.spark.sql.test.SQLTestUtils
+import org.apache.spark.sql.{AnalysisException, QueryTest}
+
+class BucketedWriteSuite extends QueryTest with SQLTestUtils with TestHiveSingleton {
+  import testImplicits._
+
+  test("bucketed by non-existing column") {
+    val df = Seq(1 -> "a", 2 -> "b").toDF("i", "j")
+    intercept[AnalysisException](df.write.bucketBy(2, "k").saveAsTable("tt"))
+  }
+
+  test("numBuckets not greater than 0 or less than 100000") {
+    val df = Seq(1 -> "a", 2 -> "b").toDF("i", "j")
+    intercept[IllegalArgumentException](df.write.bucketBy(0, "i").saveAsTable("tt"))
+    intercept[IllegalArgumentException](df.write.bucketBy(100000, "i").saveAsTable("tt"))
+  }
+
+  test("specify sorting columns without bucketing columns") {
+    val df = Seq(1 -> "a", 2 -> "b").toDF("i", "j")
+    intercept[IllegalArgumentException](df.write.sortBy("j").saveAsTable("tt"))
+  }
+
+  test("sorting by non-orderable column") {
+    val df = Seq("a" -> Map(1 -> 1), "b" -> Map(2 -> 2)).toDF("i", "j")
+    intercept[AnalysisException](df.write.bucketBy(2, "i").sortBy("j").saveAsTable("tt"))
+  }
+
+  test("write bucketed data to unsupported data source") {
+    val df = Seq(Tuple1("a"), Tuple1("b")).toDF("i")
+    intercept[AnalysisException](df.write.bucketBy(3, "i").format("text").saveAsTable("tt"))
+  }
+
+  test("write bucketed data to non-hive-table or existing hive table") {
+    val df = Seq(1 -> "a", 2 -> "b").toDF("i", "j")
+    intercept[IllegalArgumentException](df.write.bucketBy(2, "i").parquet("/tmp/path"))
+    intercept[IllegalArgumentException](df.write.bucketBy(2, "i").json("/tmp/path"))
+    intercept[IllegalArgumentException](df.write.bucketBy(2, "i").insertInto("tt"))
+  }
+
+  private val testFileName = """.*-(\d+)$""".r
+  private val otherFileName = """.*-(\d+)\..*""".r
+  private def getBucketId(fileName: String): Int = {
+    fileName match {
+      case testFileName(bucketId) => bucketId.toInt
+      case otherFileName(bucketId) => bucketId.toInt
+    }
+  }
+
+  private def testBucketing(
+      dataDir: File,
+      source: String,
+      bucketCols: Seq[String],
+      sortCols: Seq[String] = Nil): Unit = {
+    val allBucketFiles = dataDir.listFiles().filterNot(f =>
+      f.getName.startsWith(".") || f.getName.startsWith("_")
+    )
+    val groupedBucketFiles = allBucketFiles.groupBy(f => getBucketId(f.getName))
+    assert(groupedBucketFiles.size <= 8)
+
+    for ((bucketId, bucketFiles) <- groupedBucketFiles) {
+      for (bucketFile <- bucketFiles) {
+        val df = sqlContext.read.format(source).load(bucketFile.getAbsolutePath)
+          .select((bucketCols ++ sortCols).map(col): _*)
+
+        if (sortCols.nonEmpty) {
+          checkAnswer(df.sort(sortCols.map(col): _*), df.collect())
+        }
+
+        val rows = df.select(bucketCols.map(col): _*).queryExecution.toRdd.map(_.copy()).collect()
+
+        for (row <- rows) {
+          assert(row.isInstanceOf[UnsafeRow])
+          val actualBucketId = (row.hashCode() % 8 + 8) % 8
+          assert(actualBucketId == bucketId)
+        }
+      }
+    }
+  }
+
+  private val df = (0 until 50).map(i => (i % 5, i % 13, i.toString)).toDF("i", "j", "k")
+
+  test("write bucketed data") {
+    for (source <- Seq("parquet", "json", "orc")) {
+      withTable("bucketed_table") {
+        df.write
+          .format(source)
+          .partitionBy("i")
+          .bucketBy(8, "j", "k")
+          .saveAsTable("bucketed_table")
+
+        val tableDir = new File(hiveContext.warehousePath, "bucketed_table")
+        for (i <- 0 until 5) {
+          testBucketing(new File(tableDir, s"i=$i"), source, Seq("j", "k"))
+        }
+      }
+    }
+  }
+
+  test("write bucketed data with sortBy") {
+    for (source <- Seq("parquet", "json", "orc")) {
+      withTable("bucketed_table") {
+        df.write
+          .format(source)
+          .partitionBy("i")
+          .bucketBy(8, "j")
+          .sortBy("k")
+          .saveAsTable("bucketed_table")
+
+        val tableDir = new File(hiveContext.warehousePath, "bucketed_table")
+        for (i <- 0 until 5) {
+          testBucketing(new File(tableDir, s"i=$i"), source, Seq("j"), Seq("k"))
+        }
+      }
+    }
+  }
+
+  test("write bucketed data without partitionBy") {
+    for (source <- Seq("parquet", "json", "orc")) {
+      withTable("bucketed_table") {
+        df.write
+          .format(source)
+          .bucketBy(8, "i", "j")
+          .saveAsTable("bucketed_table")
+
+        val tableDir = new File(hiveContext.warehousePath, "bucketed_table")
+        testBucketing(tableDir, source, Seq("i", "j"))
+      }
+    }
+  }
+
+  test("write bucketed data without partitionBy with sortBy") {
+    for (source <- Seq("parquet", "json", "orc")) {
+      withTable("bucketed_table") {
+        df.write
+          .format(source)
+          .bucketBy(8, "i", "j")
+          .sortBy("k")
+          .saveAsTable("bucketed_table")
+
+        val tableDir = new File(hiveContext.warehousePath, "bucketed_table")
+        testBucketing(tableDir, source, Seq("i", "j"), Seq("k"))
+      }
+    }
+  }
+}
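
The core assertion in testBucketing above maps each row's UnsafeRow hash to a bucket id with a positive modulo, so negative hashes still land in the range [0, numBuckets). A tiny standalone sketch of just that arithmetic (the hash values below are arbitrary stand-ins for UnsafeRow.hashCode):

```
// Positive modulo, matching the (hash % 8 + 8) % 8 check in the suite above.
def bucketIdOf(rowHash: Int, numBuckets: Int): Int =
  (rowHash % numBuckets + numBuckets) % numBuckets

bucketIdOf(42, 8)   // 2
bucketIdOf(-42, 8)  // 6: negative hashes still map into [0, 8)
```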

