This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 5ac88b12f86b [SPARK-44886][SQL] Introduce CLUSTER BY clause for 
CREATE/REPLACE TABLE
5ac88b12f86b is described below

commit 5ac88b12f86b306e7612591154c26aebabb957a8
Author: Terry Kim <yumin...@gmail.com>
AuthorDate: Thu Nov 9 19:30:12 2023 +0800

    [SPARK-44886][SQL] Introduce CLUSTER BY clause for CREATE/REPLACE TABLE
    
    ### What changes were proposed in this pull request?
    This proposes to introduce `CLUSTER BY` SQL clause to CREATE/REPLACE SQL 
syntax:
    
    ```
    CREATE TABLE tbl(a int, b string) CLUSTER BY (a, b)
    ```
    
    This doesn't introduce a default implementation for clustering, but it's up 
to the catalog/datasource implementation to utilize the clustering information 
(e.g., Delta, Iceberg, etc.).
    
    ### Why are the changes needed?
    To introduce the concept of clustering to datasources.
    
    ### Does this PR introduce _any_ user-facing change?
    
    Yes, this introduces a new SQL keyword.
    
    ### How was this patch tested?
    Added extensive unit tests.
    
    ### Was this patch authored or co-authored using generative AI tooling?
    No
    
    Closes #42577 from imback82/cluster_by.
    
    Lead-authored-by: Terry Kim <yumin...@gmail.com>
    Co-authored-by: Terry Kim <terry....@databricks.com>
    Signed-off-by: Wenchen Fan <wenc...@databricks.com>
---
 .../src/main/resources/error/error-classes.json    |  12 +++
 docs/sql-error-conditions.md                       |  12 +++
 .../spark/sql/catalyst/parser/SqlBaseParser.g4     |   5 +
 .../spark/sql/errors/QueryParsingErrors.scala      |   8 ++
 .../spark/sql/catalyst/catalog/interface.scala     |  63 +++++++++++-
 .../spark/sql/catalyst/parser/AstBuilder.scala     |  46 +++++++--
 .../sql/connector/catalog/CatalogV2Implicits.scala |  26 ++++-
 .../sql/connector/expressions/expressions.scala    |  36 +++++++
 .../spark/sql/catalyst/parser/DDLParserSuite.scala | 110 ++++++++++++++++++++-
 .../sql/connector/catalog/InMemoryBaseTable.scala  |   1 +
 .../expressions/TransformExtractorSuite.scala      |  43 +++++++-
 .../catalyst/analysis/ResolveSessionCatalog.scala  |   8 +-
 .../spark/sql/execution/SparkSqlParser.scala       |   3 +-
 .../datasources/v2/V2SessionCatalog.scala          |   8 +-
 .../apache/spark/sql/internal/CatalogImpl.scala    |   3 +-
 .../command/CreateTableClusterBySuiteBase.scala    |  83 ++++++++++++++++
 .../command/v1/CreateTableClusterBySuite.scala     |  51 ++++++++++
 .../command/v2/CreateTableClusterBySuite.scala     |  50 ++++++++++
 .../command/CreateTableClusterBySuite.scala        |  39 ++++++++
 19 files changed, 583 insertions(+), 24 deletions(-)

diff --git a/common/utils/src/main/resources/error/error-classes.json 
b/common/utils/src/main/resources/error/error-classes.json
index c38171c3d9e6..26f6c0240afb 100644
--- a/common/utils/src/main/resources/error/error-classes.json
+++ b/common/utils/src/main/resources/error/error-classes.json
@@ -2963,6 +2963,18 @@
     ],
     "sqlState" : "42601"
   },
+  "SPECIFY_CLUSTER_BY_WITH_BUCKETING_IS_NOT_ALLOWED" : {
+    "message" : [
+      "Cannot specify both CLUSTER BY and CLUSTERED BY INTO BUCKETS."
+    ],
+    "sqlState" : "42908"
+  },
+  "SPECIFY_CLUSTER_BY_WITH_PARTITIONED_BY_IS_NOT_ALLOWED" : {
+    "message" : [
+      "Cannot specify both CLUSTER BY and PARTITIONED BY."
+    ],
+    "sqlState" : "42908"
+  },
   "SPECIFY_PARTITION_IS_NOT_ALLOWED" : {
     "message" : [
       "A CREATE TABLE without explicit column list cannot specify PARTITIONED 
BY.",
diff --git a/docs/sql-error-conditions.md b/docs/sql-error-conditions.md
index 8a5faa15dc9c..2cb433b19fa5 100644
--- a/docs/sql-error-conditions.md
+++ b/docs/sql-error-conditions.md
@@ -1852,6 +1852,18 @@ A CREATE TABLE without explicit column list cannot 
specify bucketing information
 Please use the form with explicit column list and specify bucketing 
information.
 Alternatively, allow bucketing information to be inferred by omitting the 
clause.
 
+### SPECIFY_CLUSTER_BY_WITH_BUCKETING_IS_NOT_ALLOWED
+
+[SQLSTATE: 
42908](sql-error-conditions-sqlstates.html#class-42-syntax-error-or-access-rule-violation)
+
+Cannot specify both CLUSTER BY and CLUSTERED BY INTO BUCKETS.
+
+### SPECIFY_CLUSTER_BY_WITH_PARTITIONED_BY_IS_NOT_ALLOWED
+
+[SQLSTATE: 
42908](sql-error-conditions-sqlstates.html#class-42-syntax-error-or-access-rule-violation)
+
+Cannot specify both CLUSTER BY and PARTITIONED BY.
+
 ### SPECIFY_PARTITION_IS_NOT_ALLOWED
 
 [SQLSTATE: 
42601](sql-error-conditions-sqlstates.html#class-42-syntax-error-or-access-rule-violation)
diff --git 
a/sql/api/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBaseParser.g4 
b/sql/api/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBaseParser.g4
index 84a31dafed98..bd449a4e194e 100644
--- 
a/sql/api/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBaseParser.g4
+++ 
b/sql/api/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBaseParser.g4
@@ -298,6 +298,10 @@ replaceTableHeader
     : (CREATE OR)? REPLACE TABLE identifierReference
     ;
 
+clusterBySpec
+    : CLUSTER BY LEFT_PAREN multipartIdentifierList RIGHT_PAREN
+    ;
+
 bucketSpec
     : CLUSTERED BY identifierList
       (SORTED BY orderedIdentifierList)?
@@ -383,6 +387,7 @@ createTableClauses
     :((OPTIONS options=expressionPropertyList) |
      (PARTITIONED BY partitioning=partitionFieldList) |
      skewSpec |
+     clusterBySpec |
      bucketSpec |
      rowFormat |
      createFileFormat |
diff --git 
a/sql/api/src/main/scala/org/apache/spark/sql/errors/QueryParsingErrors.scala 
b/sql/api/src/main/scala/org/apache/spark/sql/errors/QueryParsingErrors.scala
index 2067bf7d0955..841a678144f5 100644
--- 
a/sql/api/src/main/scala/org/apache/spark/sql/errors/QueryParsingErrors.scala
+++ 
b/sql/api/src/main/scala/org/apache/spark/sql/errors/QueryParsingErrors.scala
@@ -683,4 +683,12 @@ private[sql] object QueryParsingErrors extends 
DataTypeErrorsBase {
       ctx
     )
   }
+
+  def clusterByWithPartitionedBy(ctx: ParserRuleContext): Throwable = {
+    new ParseException(errorClass = 
"SPECIFY_CLUSTER_BY_WITH_PARTITIONED_BY_IS_NOT_ALLOWED", ctx)
+  }
+
+  def clusterByWithBucketing(ctx: ParserRuleContext): Throwable = {
+    new ParseException(errorClass = 
"SPECIFY_CLUSTER_BY_WITH_BUCKETING_IS_NOT_ALLOWED", ctx)
+  }
 }
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala
index 634afb47ea5e..066dbd9fad15 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala
@@ -24,6 +24,9 @@ import java.util.Date
 import scala.collection.mutable
 import scala.util.control.NonFatal
 
+import com.fasterxml.jackson.annotation.JsonInclude.Include
+import com.fasterxml.jackson.databind.{DeserializationFeature, ObjectMapper}
+import com.fasterxml.jackson.module.scala.{ClassTagExtensions, 
DefaultScalaModule}
 import org.apache.commons.lang3.StringUtils
 import org.json4s.JsonAST.{JArray, JString}
 import org.json4s.jackson.JsonMethods._
@@ -31,7 +34,7 @@ import org.json4s.jackson.JsonMethods._
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.AnalysisException
 import org.apache.spark.sql.catalyst.{CurrentUserContext, FunctionIdentifier, 
InternalRow, SQLConfHelper, TableIdentifier}
-import org.apache.spark.sql.catalyst.analysis.{MultiInstanceRelation, 
UnresolvedLeafNode}
+import org.apache.spark.sql.catalyst.analysis.{MultiInstanceRelation, 
Resolver, UnresolvedLeafNode}
 import 
org.apache.spark.sql.catalyst.catalog.CatalogTable.VIEW_STORING_ANALYZED_PLAN
 import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeMap, 
AttributeReference, Cast, ExprId, Literal}
 import org.apache.spark.sql.catalyst.plans.logical._
@@ -39,10 +42,11 @@ import 
org.apache.spark.sql.catalyst.plans.logical.statsEstimation.EstimationUti
 import org.apache.spark.sql.catalyst.types.DataTypeUtils
 import org.apache.spark.sql.catalyst.util._
 import org.apache.spark.sql.connector.catalog.CatalogManager
+import org.apache.spark.sql.connector.expressions.{FieldReference, 
NamedReference}
 import org.apache.spark.sql.errors.QueryCompilationErrors
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.types._
-import org.apache.spark.sql.util.CaseInsensitiveStringMap
+import org.apache.spark.sql.util.{CaseInsensitiveStringMap, SchemaUtils}
 
 
 /**
@@ -170,6 +174,55 @@ case class CatalogTablePartition(
   }
 }
 
+/**
+ * A container for clustering information.
+ *
+ * @param columnNames the names of the columns used for clustering.
+ */
+case class ClusterBySpec(columnNames: Seq[NamedReference]) {
+  override def toString: String = toJson
+
+  def toJson: String = 
ClusterBySpec.mapper.writeValueAsString(columnNames.map(_.fieldNames))
+}
+
+object ClusterBySpec {
+  private val mapper = {
+    val ret = new ObjectMapper() with ClassTagExtensions
+    ret.setSerializationInclusion(Include.NON_ABSENT)
+    ret.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false)
+    ret.registerModule(DefaultScalaModule)
+    ret
+  }
+
+  def fromProperty(columns: String): ClusterBySpec = {
+    
ClusterBySpec(mapper.readValue[Seq[Seq[String]]](columns).map(FieldReference(_)))
+  }
+
+  def toProperty(
+      schema: StructType,
+      clusterBySpec: ClusterBySpec,
+      resolver: Resolver): (String, String) = {
+    CatalogTable.PROP_CLUSTERING_COLUMNS ->
+      normalizeClusterBySpec(schema, clusterBySpec, resolver).toJson
+  }
+
+  private def normalizeClusterBySpec(
+      schema: StructType,
+      clusterBySpec: ClusterBySpec,
+      resolver: Resolver): ClusterBySpec = {
+    val normalizedColumns = clusterBySpec.columnNames.map { columnName =>
+      val position = SchemaUtils.findColumnPosition(
+        columnName.fieldNames(), schema, resolver)
+      FieldReference(SchemaUtils.getColumnName(position, schema))
+    }
+
+    SchemaUtils.checkColumnNameDuplication(
+      normalizedColumns.map(_.toString),
+      resolver)
+
+    ClusterBySpec(normalizedColumns)
+  }
+}
 
 /**
  * A container for bucketing information.
@@ -462,6 +515,10 @@ case class CatalogTable(
       if (value.isEmpty) key else s"$key: $value"
     }.mkString("", "\n", "")
   }
+
+  lazy val clusterBySpec: Option[ClusterBySpec] = {
+    properties.get(PROP_CLUSTERING_COLUMNS).map(ClusterBySpec.fromProperty)
+  }
 }
 
 object CatalogTable {
@@ -499,6 +556,8 @@ object CatalogTable {
 
   val VIEW_STORING_ANALYZED_PLAN = VIEW_PREFIX + "storingAnalyzedPlan"
 
+  val PROP_CLUSTERING_COLUMNS: String = "clusteringColumns"
+
   def splitLargeTableProp(
       key: String,
       value: String,
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
index 6d70ad29f876..eb501f56d81c 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
@@ -33,7 +33,7 @@ import org.apache.spark.{SparkArithmeticException, 
SparkException}
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.catalyst.{FunctionIdentifier, SQLConfHelper, 
TableIdentifier}
 import org.apache.spark.sql.catalyst.analysis._
-import org.apache.spark.sql.catalyst.catalog.{BucketSpec, CatalogStorageFormat}
+import org.apache.spark.sql.catalyst.catalog.{BucketSpec, 
CatalogStorageFormat, ClusterBySpec}
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.expressions.aggregate.{AnyValue, First, 
Last, PercentileCont, PercentileDisc}
 import org.apache.spark.sql.catalyst.parser.SqlBaseParser._
@@ -3241,6 +3241,15 @@ class AstBuilder extends DataTypeAstBuilder with 
SQLConfHelper with Logging {
           })
   }
 
+  /**
+   * Create a [[ClusterBySpec]].
+   */
+  override def visitClusterBySpec(ctx: ClusterBySpecContext): ClusterBySpec = 
withOrigin(ctx) {
+    val columnNames = ctx.multipartIdentifierList.multipartIdentifier.asScala
+      .map(typedVisit[Seq[String]]).map(FieldReference(_)).toSeq
+    ClusterBySpec(columnNames)
+  }
+
   /**
    * Convert a property list into a key-value map.
    * This should be called through [[visitPropertyKeyValues]] or 
[[visitPropertyKeys]].
@@ -3341,6 +3350,7 @@ class AstBuilder extends DataTypeAstBuilder with 
SQLConfHelper with Logging {
    * - location
    * - comment
    * - serde
+   * - clusterBySpec
    *
    * Note: Partition transforms are based on existing table schema definition. 
It can be simple
    * column names, or functions like `year(date_col)`. Partition columns are 
column names with data
@@ -3348,7 +3358,7 @@ class AstBuilder extends DataTypeAstBuilder with 
SQLConfHelper with Logging {
    */
   type TableClauses = (
       Seq[Transform], Seq[StructField], Option[BucketSpec], Map[String, 
String],
-      OptionList, Option[String], Option[String], Option[SerdeInfo])
+      OptionList, Option[String], Option[String], Option[SerdeInfo], 
Option[ClusterBySpec])
 
   /**
    * Validate a create table statement and return the [[TableIdentifier]].
@@ -3809,6 +3819,7 @@ class AstBuilder extends DataTypeAstBuilder with 
SQLConfHelper with Logging {
     checkDuplicateClauses(ctx.rowFormat, "ROW FORMAT", ctx)
     checkDuplicateClauses(ctx.commentSpec(), "COMMENT", ctx)
     checkDuplicateClauses(ctx.bucketSpec(), "CLUSTERED BY", ctx)
+    checkDuplicateClauses(ctx.clusterBySpec(), "CLUSTER BY", ctx)
     checkDuplicateClauses(ctx.locationSpec, "LOCATION", ctx)
 
     if (ctx.skewSpec.size > 0) {
@@ -3827,8 +3838,19 @@ class AstBuilder extends DataTypeAstBuilder with 
SQLConfHelper with Logging {
     val comment = visitCommentSpecList(ctx.commentSpec())
     val serdeInfo =
       getSerdeInfo(ctx.rowFormat.asScala.toSeq, 
ctx.createFileFormat.asScala.toSeq, ctx)
+    val clusterBySpec = 
ctx.clusterBySpec().asScala.headOption.map(visitClusterBySpec)
+
+    if (clusterBySpec.isDefined) {
+      if (partCols.nonEmpty || partTransforms.nonEmpty) {
+        throw QueryParsingErrors.clusterByWithPartitionedBy(ctx)
+      }
+      if (bucketSpec.isDefined) {
+        throw QueryParsingErrors.clusterByWithBucketing(ctx)
+      }
+    }
+
     (partTransforms, partCols, bucketSpec, cleanedProperties, cleanedOptions, 
newLocation, comment,
-      serdeInfo)
+      serdeInfo, clusterBySpec)
   }
 
   protected def getSerdeInfo(
@@ -3881,6 +3903,7 @@ class AstBuilder extends DataTypeAstBuilder with 
SQLConfHelper with Logging {
    *     [OPTIONS table_property_list]
    *     [ROW FORMAT row_format]
    *     [STORED AS file_format]
+   *     [CLUSTER BY (col_name, col_name, ...)]
    *     [CLUSTERED BY (col_name, col_name, ...)
    *       [SORTED BY (col_name [ASC|DESC], ...)]
    *       INTO num_buckets BUCKETS
@@ -3902,7 +3925,7 @@ class AstBuilder extends DataTypeAstBuilder with 
SQLConfHelper with Logging {
       .map(visitCreateOrReplaceTableColTypeList).getOrElse(Nil)
     val provider = Option(ctx.tableProvider).map(_.multipartIdentifier.getText)
     val (partTransforms, partCols, bucketSpec, properties, options, location,
-      comment, serdeInfo) = visitCreateTableClauses(ctx.createTableClauses())
+      comment, serdeInfo, clusterBySpec) = 
visitCreateTableClauses(ctx.createTableClauses())
 
     if (provider.isDefined && serdeInfo.isDefined) {
       operationNotAllowed(s"CREATE TABLE ... USING ... 
${serdeInfo.get.describe}", ctx)
@@ -3915,7 +3938,10 @@ class AstBuilder extends DataTypeAstBuilder with 
SQLConfHelper with Logging {
     }
 
     val partitioning =
-      partitionExpressions(partTransforms, partCols, ctx) ++ 
bucketSpec.map(_.asTransform)
+      partitionExpressions(partTransforms, partCols, ctx) ++
+        bucketSpec.map(_.asTransform) ++
+        clusterBySpec.map(_.asTransform)
+
     val tableSpec = UnresolvedTableSpec(properties, provider, options, 
location, comment,
       serdeInfo, external)
 
@@ -3958,6 +3984,7 @@ class AstBuilder extends DataTypeAstBuilder with 
SQLConfHelper with Logging {
    *   replace_table_clauses (order insensitive):
    *     [OPTIONS table_property_list]
    *     [PARTITIONED BY (partition_fields)]
+   *     [CLUSTER BY (col_name, col_name, ...)]
    *     [CLUSTERED BY (col_name, col_name, ...)
    *       [SORTED BY (col_name [ASC|DESC], ...)]
    *       INTO num_buckets BUCKETS
@@ -3973,8 +4000,8 @@ class AstBuilder extends DataTypeAstBuilder with 
SQLConfHelper with Logging {
    */
   override def visitReplaceTable(ctx: ReplaceTableContext): LogicalPlan = 
withOrigin(ctx) {
     val orCreate = ctx.replaceTableHeader().CREATE() != null
-    val (partTransforms, partCols, bucketSpec, properties, options, location, 
comment, serdeInfo) =
-      visitCreateTableClauses(ctx.createTableClauses())
+    val (partTransforms, partCols, bucketSpec, properties, options, location, 
comment, serdeInfo,
+      clusterBySpec) = visitCreateTableClauses(ctx.createTableClauses())
     val columns = Option(ctx.createOrReplaceTableColTypeList())
       .map(visitCreateOrReplaceTableColTypeList).getOrElse(Nil)
     val provider = Option(ctx.tableProvider).map(_.multipartIdentifier.getText)
@@ -3984,7 +4011,10 @@ class AstBuilder extends DataTypeAstBuilder with 
SQLConfHelper with Logging {
     }
 
     val partitioning =
-      partitionExpressions(partTransforms, partCols, ctx) ++ 
bucketSpec.map(_.asTransform)
+      partitionExpressions(partTransforms, partCols, ctx) ++
+        bucketSpec.map(_.asTransform) ++
+        clusterBySpec.map(_.asTransform)
+
     val tableSpec = UnresolvedTableSpec(properties, provider, options, 
location, comment,
       serdeInfo, external = false)
 
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/CatalogV2Implicits.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/CatalogV2Implicits.scala
index 8843a9fa237f..0c49f9e46730 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/CatalogV2Implicits.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/CatalogV2Implicits.scala
@@ -19,13 +19,14 @@ package org.apache.spark.sql.connector.catalog
 
 import scala.collection.mutable
 
+import org.apache.spark.SparkException
 import org.apache.spark.sql.catalyst.{FunctionIdentifier, TableIdentifier}
-import org.apache.spark.sql.catalyst.catalog.BucketSpec
+import org.apache.spark.sql.catalyst.catalog.{BucketSpec, ClusterBySpec}
 import org.apache.spark.sql.catalyst.expressions.AttributeReference
 import org.apache.spark.sql.catalyst.parser.CatalystSqlParser
 import org.apache.spark.sql.catalyst.types.DataTypeUtils
 import org.apache.spark.sql.catalyst.util.{quoteIfNeeded, QuotingUtils}
-import org.apache.spark.sql.connector.expressions.{BucketTransform, 
FieldReference, IdentityTransform, LogicalExpressions, Transform}
+import org.apache.spark.sql.connector.expressions.{BucketTransform, 
ClusterByTransform, FieldReference, IdentityTransform, LogicalExpressions, 
Transform}
 import org.apache.spark.sql.errors.{QueryCompilationErrors, 
QueryExecutionErrors}
 import org.apache.spark.sql.types.StructType
 
@@ -53,10 +54,15 @@ private[sql] object CatalogV2Implicits {
     }
   }
 
+  implicit class ClusterByHelper(spec: ClusterBySpec) {
+    def asTransform: Transform = clusterBy(spec.columnNames.toArray)
+  }
+
   implicit class TransformHelper(transforms: Seq[Transform]) {
-    def convertTransforms: (Seq[String], Option[BucketSpec]) = {
+    def convertTransforms: (Seq[String], Option[BucketSpec], 
Option[ClusterBySpec]) = {
       val identityCols = new mutable.ArrayBuffer[String]
       var bucketSpec = Option.empty[BucketSpec]
+      var clusterBySpec = Option.empty[ClusterBySpec]
 
       transforms.map {
         case IdentityTransform(FieldReference(Seq(col))) =>
@@ -73,11 +79,23 @@ private[sql] object CatalogV2Implicits {
               sortCol.map(_.fieldNames.mkString("."))))
           }
 
+        case ClusterByTransform(columnNames) =>
+          if (clusterBySpec.nonEmpty) {
+            // AstBuilder guarantees that it only passes down one 
ClusterByTransform.
+            throw SparkException.internalError("Cannot have multiple cluster 
by transforms.")
+          }
+          clusterBySpec = Some(ClusterBySpec(columnNames))
+
         case transform =>
           throw 
QueryExecutionErrors.unsupportedPartitionTransformError(transform)
       }
 
-      (identityCols.toSeq, bucketSpec)
+      // Parser guarantees that partition and clustering cannot co-exist.
+      assert(!(identityCols.toSeq.nonEmpty && clusterBySpec.nonEmpty))
+      // Parser guarantees that bucketing and clustering cannot co-exist.
+      assert(!(bucketSpec.nonEmpty && clusterBySpec.nonEmpty))
+
+      (identityCols.toSeq, bucketSpec, clusterBySpec)
     }
   }
 
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/expressions/expressions.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/expressions/expressions.scala
index fbd2520e2a77..0037f52a21b7 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/expressions/expressions.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/expressions/expressions.scala
@@ -52,6 +52,9 @@ private[sql] object LogicalExpressions {
       sortedCols: Array[NamedReference]): SortedBucketTransform =
     SortedBucketTransform(literal(numBuckets, IntegerType), references, 
sortedCols)
 
+  def clusterBy(references: Array[NamedReference]): ClusterByTransform =
+    ClusterByTransform(references)
+
   def identity(reference: NamedReference): IdentityTransform = 
IdentityTransform(reference)
 
   def years(reference: NamedReference): YearsTransform = 
YearsTransform(reference)
@@ -150,6 +153,39 @@ private[sql] object BucketTransform {
   }
 }
 
+/**
+ * This class represents a transform for [[ClusterBySpec]]. This is used to 
bundle
+ * ClusterBySpec in CreateTable's partitioning transforms to pass it down to 
analyzer.
+ */
+final case class ClusterByTransform(
+    columnNames: Seq[NamedReference]) extends RewritableTransform {
+
+  override val name: String = "cluster_by"
+
+  override def references: Array[NamedReference] = columnNames.toArray
+
+  override def arguments: Array[Expression] = columnNames.toArray
+
+  override def toString: String = 
s"$name(${arguments.map(_.describe).mkString(", ")})"
+
+  override def withReferences(newReferences: Seq[NamedReference]): Transform = 
{
+    this.copy(columnNames = newReferences)
+  }
+}
+
+/**
+ * Convenience extractor for ClusterByTransform.
+ */
+object ClusterByTransform {
+  def unapply(transform: Transform): Option[Seq[NamedReference]] =
+    transform match {
+      case NamedTransform("cluster_by", arguments) =>
+        Some(arguments.map(_.asInstanceOf[NamedReference]))
+      case _ =>
+        None
+    }
+}
+
 private[sql] final case class SortedBucketTransform(
     numBuckets: Literal[Int],
     columns: Seq[NamedReference],
diff --git 
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/DDLParserSuite.scala
 
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/DDLParserSuite.scala
index b1f734050449..2e896d563a73 100644
--- 
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/DDLParserSuite.scala
+++ 
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/DDLParserSuite.scala
@@ -25,7 +25,7 @@ import org.apache.spark.sql.catalyst.expressions.{EqualTo, 
Hex, Literal}
 import org.apache.spark.sql.catalyst.plans.logical._
 import org.apache.spark.sql.catalyst.util.{GeneratedColumn, 
ResolveDefaultColumns}
 import 
org.apache.spark.sql.connector.catalog.TableChange.ColumnPosition.{after, first}
-import org.apache.spark.sql.connector.expressions.{ApplyTransform, 
BucketTransform, DaysTransform, FieldReference, HoursTransform, 
IdentityTransform, LiteralValue, MonthsTransform, Transform, YearsTransform}
+import org.apache.spark.sql.connector.expressions.{ApplyTransform, 
BucketTransform, ClusterByTransform, DaysTransform, FieldReference, 
HoursTransform, IdentityTransform, LiteralValue, MonthsTransform, Transform, 
YearsTransform}
 import org.apache.spark.sql.connector.expressions.LogicalExpressions.bucket
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.types.{Decimal, IntegerType, LongType, 
MetadataBuilder, StringType, StructType, TimestampType}
@@ -190,6 +190,50 @@ class DDLParserSuite extends AnalysisTest {
     }
   }
 
+  test("create/replace table - with cluster by") {
+    // Testing cluster by single part and multipart name.
+    Seq(
+      ("a INT, b STRING, ts TIMESTAMP",
+        "a, b",
+        new StructType()
+          .add("a", IntegerType)
+          .add("b", StringType)
+          .add("ts", TimestampType),
+        ClusterByTransform(Seq(FieldReference("a"), FieldReference("b")))),
+      ("a STRUCT<b INT, c STRING>, ts TIMESTAMP",
+        "a.b, ts",
+        new StructType()
+          .add("a",
+            new StructType()
+              .add("b", IntegerType)
+              .add("c", StringType))
+          .add("ts", TimestampType),
+        ClusterByTransform(Seq(FieldReference(Seq("a", "b")), 
FieldReference("ts"))))
+    ).foreach { case (columns, clusteringColumns, schema, clusterByTransform) 
=>
+      val createSql =
+        s"""CREATE TABLE my_tab ($columns) USING parquet
+           |CLUSTER BY ($clusteringColumns)
+           |""".stripMargin
+      val replaceSql =
+        s"""REPLACE TABLE my_tab ($columns) USING parquet
+           |CLUSTER BY ($clusteringColumns)
+           |""".stripMargin
+      val expectedTableSpec = TableSpec(
+        Seq("my_tab"),
+        Some(schema),
+        Seq(clusterByTransform),
+        Map.empty[String, String],
+        Some("parquet"),
+        OptionList(Seq.empty),
+        None,
+        None,
+        None)
+      Seq(createSql, replaceSql).foreach { sql =>
+        testCreateOrReplaceDdl(sql, expectedTableSpec, expectedIfNotExists = 
false)
+      }
+    }
+  }
+
   test("create/replace table - with comment") {
     val createSql = "CREATE TABLE my_tab(a INT, b STRING) USING parquet 
COMMENT 'abc'"
     val replaceSql = "REPLACE TABLE my_tab(a INT, b STRING) USING parquet 
COMMENT 'abc'"
@@ -859,6 +903,26 @@ class DDLParserSuite extends AnalysisTest {
         fragment = sql18,
         start = 0,
         stop = 86))
+
+    val sql19 = createTableHeader("CLUSTER BY (a)")
+    checkError(
+      exception = parseException(sql19),
+      errorClass = "DUPLICATE_CLAUSES",
+      parameters = Map("clauseName" -> "CLUSTER BY"),
+      context = ExpectedContext(
+        fragment = sql19,
+        start = 0,
+        stop = 65))
+
+    val sql20 = replaceTableHeader("CLUSTER BY (a)")
+    checkError(
+      exception = parseException(sql20),
+      errorClass = "DUPLICATE_CLAUSES",
+      parameters = Map("clauseName" -> "CLUSTER BY"),
+      context = ExpectedContext(
+        fragment = sql20,
+        start = 0,
+        stop = 66))
   }
 
   test("support for other types in OPTIONS") {
@@ -2896,6 +2960,50 @@ class DDLParserSuite extends AnalysisTest {
     )
   }
 
+  test("create table cluster by with bucket") {
+    val sql1 = "CREATE TABLE my_tab(a INT, b STRING) " +
+      "USING parquet CLUSTERED BY (a) INTO 2 BUCKETS CLUSTER BY (a)"
+    checkError(
+      exception = parseException(sql1),
+      errorClass = "SPECIFY_CLUSTER_BY_WITH_BUCKETING_IS_NOT_ALLOWED",
+      parameters = Map.empty,
+      context = ExpectedContext(fragment = sql1, start = 0, stop = 96)
+    )
+  }
+
+  test("replace table cluster by with bucket") {
+    val sql1 = "REPLACE TABLE my_tab(a INT, b STRING) " +
+      "USING parquet CLUSTERED BY (a) INTO 2 BUCKETS CLUSTER BY (a)"
+    checkError(
+      exception = parseException(sql1),
+      errorClass = "SPECIFY_CLUSTER_BY_WITH_BUCKETING_IS_NOT_ALLOWED",
+      parameters = Map.empty,
+      context = ExpectedContext(fragment = sql1, start = 0, stop = 97)
+    )
+  }
+
+  test("create table cluster by with partitioned by") {
+    val sql1 = "CREATE TABLE my_tab(a INT, b STRING) " +
+      "USING parquet CLUSTER BY (a) PARTITIONED BY (a)"
+    checkError(
+      exception = parseException(sql1),
+      errorClass = "SPECIFY_CLUSTER_BY_WITH_PARTITIONED_BY_IS_NOT_ALLOWED",
+      parameters = Map.empty,
+      context = ExpectedContext(fragment = sql1, start = 0, stop = 83)
+    )
+  }
+
+  test("replace table cluster by with partitioned by") {
+    val sql1 = "REPLACE TABLE my_tab(a INT, b STRING) " +
+      "USING parquet CLUSTER BY (a) PARTITIONED BY (a)"
+    checkError(
+      exception = parseException(sql1),
+      errorClass = "SPECIFY_CLUSTER_BY_WITH_PARTITIONED_BY_IS_NOT_ALLOWED",
+      parameters = Map.empty,
+      context = ExpectedContext(fragment = sql1, start = 0, stop = 84)
+    )
+  }
+
   test("AstBuilder don't support `INSERT OVERWRITE DIRECTORY`") {
     val insertDirSql =
       s"""
diff --git 
a/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/InMemoryBaseTable.scala
 
b/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/InMemoryBaseTable.scala
index cd7f7295d5cb..318cbf6962c1 100644
--- 
a/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/InMemoryBaseTable.scala
+++ 
b/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/InMemoryBaseTable.scala
@@ -90,6 +90,7 @@ abstract class InMemoryBaseTable(
     case _: HoursTransform =>
     case _: BucketTransform =>
     case _: SortedBucketTransform =>
+    case _: ClusterByTransform =>
     case NamedTransform("truncate", Seq(_: NamedReference, _: Literal[_])) =>
     case t if !allowUnsupportedTransforms =>
       throw new IllegalArgumentException(s"Transform $t is not a supported 
transform")
diff --git 
a/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/expressions/TransformExtractorSuite.scala
 
b/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/expressions/TransformExtractorSuite.scala
index 62cae3c86107..8ac268df80bc 100644
--- 
a/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/expressions/TransformExtractorSuite.scala
+++ 
b/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/expressions/TransformExtractorSuite.scala
@@ -19,7 +19,7 @@ package org.apache.spark.sql.connector.expressions
 
 import org.apache.spark.SparkFunSuite
 import org.apache.spark.sql.catalyst
-import org.apache.spark.sql.connector.expressions.LogicalExpressions.bucket
+import org.apache.spark.sql.connector.expressions.LogicalExpressions.{bucket, 
clusterBy}
 import org.apache.spark.sql.types.DataType
 
 class TransformExtractorSuite extends SparkFunSuite {
@@ -210,4 +210,45 @@ class TransformExtractorSuite extends SparkFunSuite {
     val copied2 = sortedBucketTransform.withReferences(reference2)
     assert(copied2.equals(sortedBucketTransform))
   }
+
+  test("ClusterBySpec extractor") {
+    val col = ref("a", "b")
+    val clusterByTransform = new Transform {
+      override def name: String = "cluster_by"
+      override def references: Array[NamedReference] = Array(col)
+      override def arguments: Array[Expression] = Array(col)
+      override def toString: String = s"$name(${col.describe})"
+    }
+
+    clusterByTransform match {
+      case ClusterByTransform(columnNames) =>
+        assert(columnNames.size === 1)
+        assert(columnNames(0).fieldNames === Seq("a", "b"))
+      case _ =>
+        fail("Did not match ClusterByTransform extractor")
+    }
+
+    transform("unknown", ref("a", "b")) match {
+      case ClusterByTransform(_) =>
+        fail("Matched unknown transform")
+      case _ =>
+      // expected
+    }
+  }
+
+  test("test cluster by") {
+    val col = Array(ref("a a", "b"), ref("ts"))
+
+    val clusterByTransform = clusterBy(col)
+    val reference = clusterByTransform.references
+    assert(reference.length == 2)
+    assert(reference(0).fieldNames() === Seq("a a", "b"))
+    assert(reference(1).fieldNames() === Seq("ts"))
+    val arguments = clusterByTransform.arguments
+    assert(arguments.length == 2)
+    assert(arguments(0).asInstanceOf[NamedReference].fieldNames() === Seq("a 
a", "b"))
+    assert(arguments(1).asInstanceOf[NamedReference].fieldNames() === 
Seq("ts"))
+    val copied = clusterByTransform.withReferences(reference)
+    assert(copied.equals(clusterByTransform))
+  }
 }
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveSessionCatalog.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveSessionCatalog.scala
index 174881a46592..c557ec4a486d 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveSessionCatalog.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveSessionCatalog.scala
@@ -21,7 +21,7 @@ import org.apache.commons.lang3.StringUtils
 
 import org.apache.spark.sql.SaveMode
 import org.apache.spark.sql.catalyst.{FunctionIdentifier, TableIdentifier}
-import org.apache.spark.sql.catalyst.catalog.{CatalogStorageFormat, 
CatalogTable, CatalogTableType, CatalogUtils}
+import org.apache.spark.sql.catalyst.catalog.{CatalogStorageFormat, 
CatalogTable, CatalogTableType, CatalogUtils, ClusterBySpec}
 import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute}
 import org.apache.spark.sql.catalyst.plans.logical._
 import org.apache.spark.sql.catalyst.rules.Rule
@@ -532,7 +532,7 @@ class ResolveSessionCatalog(val catalogManager: 
CatalogManager)
     } else {
       CatalogTableType.MANAGED
     }
-    val (partitionColumns, maybeBucketSpec) = partitioning.convertTransforms
+    val (partitionColumns, maybeBucketSpec, maybeClusterBySpec) = 
partitioning.convertTransforms
 
     CatalogTable(
       identifier = table,
@@ -542,7 +542,9 @@ class ResolveSessionCatalog(val catalogManager: 
CatalogManager)
       provider = Some(provider),
       partitionColumnNames = partitionColumns,
       bucketSpec = maybeBucketSpec,
-      properties = properties,
+      properties = properties ++
+        maybeClusterBySpec.map(
+          clusterBySpec => ClusterBySpec.toProperty(schema, clusterBySpec, 
conf.resolver)),
       comment = comment)
   }
 
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala
index 1cc214772535..d8e5d4f22701 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala
@@ -323,7 +323,8 @@ class SparkSqlAstBuilder extends AstBuilder {
         operationNotAllowed("CREATE TEMPORARY TABLE IF NOT EXISTS", ctx)
       }
 
-      val (_, _, _, _, options, location, _, _) = 
visitCreateTableClauses(ctx.createTableClauses())
+      val (_, _, _, _, options, location, _, _, _) =
+        visitCreateTableClauses(ctx.createTableClauses())
       val provider = 
Option(ctx.tableProvider).map(_.multipartIdentifier.getText).getOrElse(
         throw QueryParsingErrors.createTempTableNotSpecifyProviderError(ctx))
       val schema = 
Option(ctx.createOrReplaceTableColTypeList()).map(createSchema)
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2SessionCatalog.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2SessionCatalog.scala
index bd2e65974931..6dd76973baa5 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2SessionCatalog.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2SessionCatalog.scala
@@ -25,7 +25,7 @@ import scala.jdk.CollectionConverters._
 
 import org.apache.spark.sql.catalyst.{FunctionIdentifier, SQLConfHelper, 
TableIdentifier}
 import org.apache.spark.sql.catalyst.analysis.{NoSuchDatabaseException, 
NoSuchTableException, TableAlreadyExistsException}
-import org.apache.spark.sql.catalyst.catalog.{CatalogDatabase, CatalogTable, 
CatalogTableType, CatalogUtils, SessionCatalog}
+import org.apache.spark.sql.catalyst.catalog.{CatalogDatabase, CatalogTable, 
CatalogTableType, CatalogUtils, ClusterBySpec, SessionCatalog}
 import org.apache.spark.sql.catalyst.util.TypeUtils._
 import org.apache.spark.sql.connector.catalog.{CatalogManager, CatalogV2Util, 
Column, FunctionCatalog, Identifier, NamespaceChange, SupportsNamespaces, 
Table, TableCatalog, TableCatalogCapability, TableChange, V1Table}
 import org.apache.spark.sql.connector.catalog.NamespaceChange.RemoveProperty
@@ -114,7 +114,7 @@ class V2SessionCatalog(catalog: SessionCatalog)
       partitions: Array[Transform],
       properties: util.Map[String, String]): Table = {
     import 
org.apache.spark.sql.connector.catalog.CatalogV2Implicits.TransformHelper
-    val (partitionColumns, maybeBucketSpec) = 
partitions.toSeq.convertTransforms
+    val (partitionColumns, maybeBucketSpec, maybeClusterBySpec) = 
partitions.toSeq.convertTransforms
     val provider = properties.getOrDefault(TableCatalog.PROP_PROVIDER, 
conf.defaultDataSourceName)
     val tableProperties = properties.asScala
     val location = Option(properties.get(TableCatalog.PROP_LOCATION))
@@ -135,7 +135,9 @@ class V2SessionCatalog(catalog: SessionCatalog)
       provider = Some(provider),
       partitionColumnNames = partitionColumns,
       bucketSpec = maybeBucketSpec,
-      properties = tableProperties.toMap,
+      properties = tableProperties.toMap ++
+        maybeClusterBySpec.map(
+          clusterBySpec => ClusterBySpec.toProperty(schema, clusterBySpec, 
conf.resolver)),
       tracksPartitionsInCatalog = conf.manageFilesourcePartitions,
       comment = Option(properties.get(TableCatalog.PROP_COMMENT)))
 
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/internal/CatalogImpl.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/internal/CatalogImpl.scala
index b1ad454fc041..d58cd001e941 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/internal/CatalogImpl.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/CatalogImpl.scala
@@ -379,7 +379,8 @@ class CatalogImpl(sparkSession: SparkSession) extends 
Catalog {
 
     val columns = sparkSession.sessionState.executePlan(plan).analyzed match {
       case ResolvedTable(_, _, table, _) =>
-        val (partitionColumnNames, bucketSpecOpt) = 
table.partitioning.toSeq.convertTransforms
+        // TODO (SPARK-45787): Support clusterBySpec for listColumns().
+        val (partitionColumnNames, bucketSpecOpt, _) = 
table.partitioning.toSeq.convertTransforms
         val bucketColumnNames = 
bucketSpecOpt.map(_.bucketColumnNames).getOrElse(Nil)
         schemaToColumns(table.schema(), partitionColumnNames.contains, 
bucketColumnNames.contains)
 
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/CreateTableClusterBySuiteBase.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/CreateTableClusterBySuiteBase.scala
new file mode 100644
index 000000000000..cb56d11b665d
--- /dev/null
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/CreateTableClusterBySuiteBase.scala
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.command
+
+import org.apache.spark.sql.{AnalysisException, QueryTest}
+
+/**
+ * This base suite contains unified tests for the `CREATE/REPLACE TABLE ... 
CLUSTER BY` command
+ * that check V1 and V2 table catalogs. The tests that cannot run for all 
supported catalogs are
+ * located in more specific test suites:
+ *
+ *   - V2 table catalog tests: 
`org.apache.spark.sql.execution.command.v2.CreateTableClusterBySuite`
+ *   - V1 table catalog tests:
+ *     
`org.apache.spark.sql.execution.command.v1.CreateTableClusterBySuiteBase`
+ *     - V1 In-Memory catalog: 
`org.apache.spark.sql.execution.command.v1.CreateTableClusterBySuite`
+ *     - V1 Hive External catalog:
+ *        
`org.apache.spark.sql.hive.execution.command.CreateTableClusterBySuite`
+ */
+trait CreateTableClusterBySuiteBase extends QueryTest with DDLCommandTestUtils 
{
+  override val command = "CREATE/REPLACE TABLE CLUSTER BY"
+
+  protected val nestedColumnSchema: String =
+    "col1 INT, col2 STRUCT<col3 INT, `col4 1` INT>, col3 STRUCT<`col4.1` INT>"
+  protected val nestedClusteringColumns: Seq[String] =
+    Seq("col2.col3", "col2.`col4 1`", "col3.`col4.1`")
+
+  def validateClusterBy(tableName: String, clusteringColumns: Seq[String]): 
Unit
+
+  test("test basic CREATE TABLE with clustering columns") {
+    withNamespaceAndTable("ns", "table") { tbl =>
+      spark.sql(s"CREATE TABLE $tbl (id INT, data STRING) $defaultUsing 
CLUSTER BY (id, data)")
+      validateClusterBy(tbl, Seq("id", "data"))
+    }
+  }
+
+  test("test clustering columns with comma") {
+    withNamespaceAndTable("ns", "table") { tbl =>
+      spark.sql(s"CREATE TABLE $tbl (`i,d` INT, data STRING) $defaultUsing " +
+        "CLUSTER BY (`i,d`, data)")
+      validateClusterBy(tbl, Seq("`i,d`", "data"))
+    }
+  }
+
+  test("test nested clustering columns") {
+    withNamespaceAndTable("ns", "table") { tbl =>
+      spark.sql(s"CREATE TABLE $tbl " +
+        s"($nestedColumnSchema) " +
+        s"$defaultUsing CLUSTER BY (${nestedClusteringColumns.mkString(",")})")
+      validateClusterBy(tbl, nestedClusteringColumns)
+    }
+  }
+
+  test("clustering columns not defined in schema") {
+    withNamespaceAndTable("ns", "table") { tbl =>
+      val err = intercept[AnalysisException] {
+        sql(s"CREATE TABLE $tbl (id bigint, data string) $defaultUsing CLUSTER 
BY (unknown)")
+      }
+      assert(err.message.contains("Couldn't find column unknown in:"))
+    }
+  }
+
+  // Converts three-part table name (catalog.namespace.table) to 
TableIdentifier.
+  protected def parseTableName(threePartTableName: String): (String, String, 
String) = {
+    val tablePath = threePartTableName.split('.')
+    assert(tablePath.length === 3)
+    (tablePath(0), tablePath(1), tablePath(2))
+  }
+}
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/v1/CreateTableClusterBySuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/v1/CreateTableClusterBySuite.scala
new file mode 100644
index 000000000000..2444fe062e28
--- /dev/null
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/v1/CreateTableClusterBySuite.scala
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.command.v1
+
+import org.apache.spark.sql.catalyst.TableIdentifier
+import org.apache.spark.sql.catalyst.catalog.ClusterBySpec
+import org.apache.spark.sql.connector.expressions.FieldReference
+import org.apache.spark.sql.execution.command
+
+/**
+ * This base suite contains unified tests for the `CREATE TABLE ... CLUSTER 
BY` command that
+ * checks V1 table catalogs. The tests that cannot run for all V1 catalogs are 
located in more
+ * specific test suites:
+ *
+ *   - V1 In-Memory catalog: 
`org.apache.spark.sql.execution.command.v1.CreateTableClusterBySuite`
+ *   - V1 Hive External catalog:
+ *     `org.apache.spark.sql.hive.execution.command.CreateTableClusterBySuite`
+ */
+trait CreateTableClusterBySuiteBase extends 
command.CreateTableClusterBySuiteBase
+    with command.TestsV1AndV2Commands {
+  override def validateClusterBy(tableName: String, clusteringColumns: 
Seq[String]): Unit = {
+    val catalog = spark.sessionState.catalog
+    val (_, db, t) = parseTableName(tableName)
+    val table = catalog.getTableMetadata(TableIdentifier.apply(t, Some(db)))
+    assert(table.clusterBySpec === 
Some(ClusterBySpec(clusteringColumns.map(FieldReference(_)))))
+  }
+}
+
+/**
+ * The class contains tests for the `CREATE TABLE ... CLUSTER BY` command to 
check V1 In-Memory
+ * table catalog.
+ */
+class CreateTableClusterBySuite extends CreateTableClusterBySuiteBase
+    with CommandSuiteBase {
+  override def commandVersion: String = 
super[CreateTableClusterBySuiteBase].commandVersion
+}
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/v2/CreateTableClusterBySuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/v2/CreateTableClusterBySuite.scala
new file mode 100644
index 000000000000..86b14d668038
--- /dev/null
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/v2/CreateTableClusterBySuite.scala
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.command.v2
+
+import org.apache.spark.sql.connector.catalog.{Identifier, 
InMemoryPartitionTable}
+import org.apache.spark.sql.connector.catalog.CatalogV2Implicits.CatalogHelper
+import org.apache.spark.sql.connector.expressions.{ClusterByTransform, 
FieldReference}
+import org.apache.spark.sql.execution.command
+
+/**
+ * The class contains tests for the `CREATE TABLE ... CLUSTER BY` command to 
check V2 table
+ * catalogs.
+ */
+class CreateTableClusterBySuite extends command.CreateTableClusterBySuiteBase
+  with CommandSuiteBase {
+  override def validateClusterBy(tableName: String, clusteringColumns: 
Seq[String]): Unit = {
+    val (catalog, namespace, table) = parseTableName(tableName)
+    val catalogPlugin = spark.sessionState.catalogManager.catalog(catalog)
+    val partTable = catalogPlugin.asTableCatalog
+      .loadTable(Identifier.of(Array(namespace), table))
+      .asInstanceOf[InMemoryPartitionTable]
+    assert(partTable.partitioning ===
+      Array(ClusterByTransform(clusteringColumns.map(FieldReference(_)))))
+  }
+
+  test("test REPLACE TABLE with clustering columns") {
+    withNamespaceAndTable("ns", "table") { tbl =>
+      spark.sql(s"CREATE TABLE $tbl (id INT) $defaultUsing CLUSTER BY (id)")
+      validateClusterBy(tbl, Seq("id"))
+
+      spark.sql(s"REPLACE TABLE $tbl (id2 INT) $defaultUsing CLUSTER BY (id2)")
+      validateClusterBy(tbl, Seq("id2"))
+    }
+  }
+}
diff --git 
a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/command/CreateTableClusterBySuite.scala
 
b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/command/CreateTableClusterBySuite.scala
new file mode 100644
index 000000000000..496cc13c4971
--- /dev/null
+++ 
b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/command/CreateTableClusterBySuite.scala
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hive.execution.command
+
+import org.apache.spark.sql.execution.command.v1
+
+/**
+ * The class contains tests for the `CREATE TABLE ... CLUSTER BY` command to 
check V1 Hive external
+ * table catalog.
+ */
+class CreateTableClusterBySuite extends v1.CreateTableClusterBySuiteBase with 
CommandSuiteBase {
+  // Hive doesn't support nested column names with space and dot.
+  override protected val nestedColumnSchema: String =
+    "col1 INT, col2 STRUCT<col3 INT, col4 INT>"
+  override protected val nestedClusteringColumns: Seq[String] =
+    Seq("col2.col3")
+
+  // Hive catalog doesn't support column names with commas.
+  override def excluded: Seq[String] = Seq(
+    s"$command using Hive V1 catalog V1 command: test clustering columns with 
comma",
+    s"$command using Hive V1 catalog V2 command: test clustering columns with 
comma")
+
+  override def commandVersion: String = 
super[CreateTableClusterBySuiteBase].commandVersion
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to