This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new cbe6846c477b [SPARK-48760][SQL] Fix CatalogV2Util.applyClusterByChanges
cbe6846c477b is described below

commit cbe6846c477bc8b6d94385ddd0097c4e97b05d41
Author: Jiaheng Tang <jiaheng.t...@databricks.com>
AuthorDate: Fri Jul 12 11:10:38 2024 +0800

    [SPARK-48760][SQL] Fix CatalogV2Util.applyClusterByChanges
    
    ### What changes were proposed in this pull request?
    
    https://github.com/apache/spark/pull/47156/ introduced a bug in
    `CatalogV2Util.applyClusterByChanges`: it always removes the existing
    `ClusterByTransform` first, regardless of whether there is a `ClusterBy`
    table change. As a result, any table change removes the clustering columns
    from the table.
    
    This PR fixes the bug by replacing the existing `ClusterByTransform` only
    when there is a `ClusterBy` table change, and leaving the partitioning
    untouched otherwise.
    
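    ### Why are the changes needed?
    
    To fix the bug described above: without this change, any table change
    (for example, altering a column comment) drops the table's clustering
    columns.
    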
    ### Does this PR introduce _any_ user-facing change?
    
    No
    
    ### How was this patch tested?
    
    Amended the existing test to catch this bug.
    
    ### Was this patch authored or co-authored using generative AI tooling?
    
    No
    
    Closes #47288 from zedtang/fix-apply-cluster-by-changes.
    
    Authored-by: Jiaheng Tang <jiaheng.t...@databricks.com>
    Signed-off-by: Wenchen Fan <wenc...@databricks.com>
---
 .../spark/sql/connector/catalog/CatalogV2Util.scala   | 19 +++++++++++--------
 .../execution/command/DescribeTableSuiteBase.scala    |  3 ++-
 2 files changed, 13 insertions(+), 9 deletions(-)

diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/CatalogV2Util.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/CatalogV2Util.scala
index c5888d72c2b2..645ed9e6bb0c 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/CatalogV2Util.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/CatalogV2Util.scala
@@ -178,16 +178,19 @@ private[sql] object CatalogV2Util {
      schema: StructType,
      changes: Seq[TableChange]): Array[Transform] = {
 
-    val newPartitioning = 
partitioning.filterNot(_.isInstanceOf[ClusterByTransform]).toBuffer
-    changes.foreach {
-      case clusterBy: ClusterBy =>
-        newPartitioning += ClusterBySpec.extractClusterByTransform(
+    var newPartitioning = partitioning
+    // If there is a clusterBy change (only the first one), we overwrite the 
existing
+    // clustering columns.
+    val clusterByOpt = changes.collectFirst { case c: ClusterBy => c }
+    clusterByOpt.foreach { clusterBy =>
+      newPartitioning = partitioning.map {
+        case _: ClusterByTransform => ClusterBySpec.extractClusterByTransform(
           schema, ClusterBySpec(clusterBy.clusteringColumns.toIndexedSeq), 
conf.resolver)
-
-      case _ =>
-      // ignore other changes
+        case other => other
+      }
     }
-    newPartitioning.toArray
+
+    newPartitioning
   }
 
   /**
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DescribeTableSuiteBase.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DescribeTableSuiteBase.scala
index 2588aa4313fa..02e8a5e68999 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DescribeTableSuiteBase.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DescribeTableSuiteBase.scala
@@ -181,8 +181,9 @@ trait DescribeTableSuiteBase extends QueryTest with 
DDLCommandTestUtils {
 
   test("describe a clustered table") {
     withNamespaceAndTable("ns", "tbl") { tbl =>
-      sql(s"CREATE TABLE $tbl (col1 STRING COMMENT 'this is comment', col2 
struct<x:int, y:int>) " +
+      sql(s"CREATE TABLE $tbl (col1 STRING, col2 struct<x:int, y:int>) " +
         s"$defaultUsing CLUSTER BY (col1, col2.x)")
+      sql(s"ALTER TABLE $tbl ALTER COLUMN col1 COMMENT 'this is comment';")
       val descriptionDf = sql(s"DESC $tbl")
       assert(descriptionDf.schema.map(field => (field.name, field.dataType)) 
=== Seq(
         ("col_name", StringType),


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to