This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 689ab0ee8ca8 [SPARK-45827] Disallow partitioning on Variant column
689ab0ee8ca8 is described below

commit 689ab0ee8ca87286db6167f672348e73116b9186
Author: cashmand <david.cash...@databricks.com>
AuthorDate: Sat Jan 20 10:45:51 2024 +0800

    [SPARK-45827] Disallow partitioning on Variant column
    
    ### What changes were proposed in this pull request?
    
    Follow-up to https://github.com/apache/spark/pull/43984: we should not allow
    partitioning on VariantType. Even though it is an atomic type, it represents a
    nested semi-structured value, so disallowing partitioning is consistent with our
    decision not to allow partitioning on nested types. Also, for now at least, it is
    not even comparable, so attempting to partition on it fails with a confusing
    codegen error about a method named `compare` not being declared.
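
    As a rough illustration of the behavior this enforces, a minimal sketch (the
    DataFrame name and output path here are hypothetical, not part of this patch):

    ```scala
    // Try to partition a write by a Variant column produced with parse_json.
    val df = spark.sql("select id, parse_json(cast(id as string)) as v from range(10)")
    // Previously this surfaced a confusing codegen error about `compare`;
    // with this change it is rejected during analysis with an AnalysisException.
    df.write.partitionBy("v").parquet("/tmp/variant_partition_demo")
    ```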
    
    ### Why are the changes needed?
    
    Improves the error message when attempting to partition on Variant, and
    explicitly forbids a case that we do not intend to support.
    
    ### Does this PR introduce _any_ user-facing change?
    
    Improved error message if a user tries to partition on Variant.
    
    ### How was this patch tested?
    
    Added a unit test, which fails without the code change.
    
    ### Was this patch authored or co-authored using generative AI tooling?
    
    No.
    
    Closes #44742 from cashmand/SPARK-45827-no-partitioning.
    
    Authored-by: cashmand <david.cash...@databricks.com>
    Signed-off-by: Wenchen Fan <wenc...@databricks.com>
---
 .../execution/datasources/PartitioningUtils.scala  |  2 +-
 .../spark/sql/execution/datasources/rules.scala    |  5 +-
 .../scala/org/apache/spark/sql/VariantSuite.scala  | 56 ++++++++++++++++++++++
 3 files changed, 60 insertions(+), 3 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala
index 9905e9af9b0b..555099da221e 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala
@@ -564,7 +564,7 @@ object PartitioningUtils extends SQLConfHelper {
 
     partitionColumnsSchema(schema, partitionColumns).foreach {
       field => field.dataType match {
-        case _: AtomicType => // OK
+        case a: AtomicType if !a.isInstanceOf[VariantType] => // OK
         case _ => throw QueryCompilationErrors.cannotUseDataTypeForPartitionColumnError(field)
       }
     }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala
index c58815b6978e..bb2bad7a6867 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala
@@ -33,7 +33,7 @@ import org.apache.spark.sql.execution.command.DDLUtils
 import org.apache.spark.sql.execution.datasources.{CreateTable => CreateTableV1}
 import org.apache.spark.sql.execution.datasources.v2.FileDataSourceV2
 import org.apache.spark.sql.sources.InsertableRelation
-import org.apache.spark.sql.types.{AtomicType, StructType}
+import org.apache.spark.sql.types.{AtomicType, StructType, VariantType}
 import org.apache.spark.sql.util.PartitioningUtils.normalizePartitionSpec
 import org.apache.spark.sql.util.SchemaUtils
 import org.apache.spark.util.ArrayImplicits._
@@ -330,7 +330,8 @@ case class PreprocessTableCreation(catalog: SessionCatalog) extends Rule[Logical
     }
 
     schema.filter(f => normalizedPartitionCols.contains(f.name)).map(_.dataType).foreach {
-      case _: AtomicType => // OK
+      // VariantType values are not comparable, so can't be used as partition columns.
+      case a: AtomicType if !a.isInstanceOf[VariantType] => // OK
       case other => failAnalysis(s"Cannot use ${other.catalogString} for partition column")
     }
 
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/VariantSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/VariantSuite.scala
index 98d106f05f0c..af37445c1323 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/VariantSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/VariantSuite.scala
@@ -138,4 +138,60 @@ class VariantSuite extends QueryTest with SharedSparkSession {
       }
     }
   }
+
+  test("write partitioned file") {
+    def verifyResult(df: DataFrame): Unit = {
+      val result = df.selectExpr("v").collect()
+        .map(_.get(0).asInstanceOf[VariantVal].toString)
+        .sorted
+        .toSeq
+      val expected = (1 until 10).map(id => "1" * id)
+      assert(result == expected)
+    }
+
+    // At this point, JSON parsing logic is not really implemented. We just construct some number
+    // inputs that are also valid JSON. This exercises passing VariantVal throughout the system.
+    val queryString = "select id, parse_json(repeat('1', id)) as v from range(1, 10)"
+    val query = spark.sql(queryString)
+    verifyResult(query)
+
+    // Partition by another column should work.
+    withTempDir { dir =>
+      val tempDir = new File(dir, "files").getCanonicalPath
+      query.write.partitionBy("id").parquet(tempDir)
+      verifyResult(spark.read.parquet(tempDir))
+    }
+
+    // Partitioning by Variant column is not allowed.
+    withTempDir { dir =>
+      val tempDir = new File(dir, "files").getCanonicalPath
+      intercept[AnalysisException] {
+        query.write.partitionBy("v").parquet(tempDir)
+      }
+    }
+
+    // Same as above, using saveAsTable
+    withTable("t") {
+      query.write.partitionBy("id").saveAsTable("t")
+      verifyResult(spark.sql("select * from t"))
+    }
+
+    withTable("t") {
+      intercept[AnalysisException] {
+        query.write.partitionBy("v").saveAsTable("t")
+      }
+    }
+
+    // Same as above, using SQL CTAS
+    withTable("t") {
+      spark.sql(s"CREATE TABLE t USING PARQUET PARTITIONED BY (id) AS $queryString")
+      verifyResult(spark.sql("select * from t"))
+    }
+
+    withTable("t") {
+      intercept[AnalysisException] {
+        spark.sql(s"CREATE TABLE t USING PARQUET PARTITIONED BY (v) AS $queryString")
+      }
+    }
+  }
 }

