This is an automated email from the ASF dual-hosted git repository. wenchen pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push: new 689ab0ee8ca8 [SPARK-45827] Disallow partitioning on Variant column 689ab0ee8ca8 is described below commit 689ab0ee8ca87286db6167f672348e73116b9186 Author: cashmand <david.cash...@databricks.com> AuthorDate: Sat Jan 20 10:45:51 2024 +0800 [SPARK-45827] Disallow partitioning on Variant column ### What changes were proposed in this pull request? Follow-up to https://github.com/apache/spark/pull/43984: we should not allow partitioning on VariantType. Even though it is an atomic type, it represents a nested semi-structured value, so not allowing partitioning is consistent with our decision to not allow partitioning on nested types. Also, for now at least, it is not even comparable, so attempting to partition fails with a confusing codegen error about a method named `compare` not being declared. ### Why are the changes needed? Improves error message when attempting to partition on Variant, and explicitly forbids a case that we do not intend to support. ### Does this PR introduce _any_ user-facing change? Improved error message if a user tries to partition on Variant. ### How was this patch tested? Added unit test, which fails without the code change. ### Was this patch authored or co-authored using generative AI tooling? No. Closes #44742 from cashmand/SPARK-45827-no-partitioning. 
Authored-by: cashmand <david.cash...@databricks.com> Signed-off-by: Wenchen Fan <wenc...@databricks.com> --- .../execution/datasources/PartitioningUtils.scala | 2 +- .../spark/sql/execution/datasources/rules.scala | 5 +- .../scala/org/apache/spark/sql/VariantSuite.scala | 56 ++++++++++++++++++++++ 3 files changed, 60 insertions(+), 3 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala index 9905e9af9b0b..555099da221e 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala @@ -564,7 +564,7 @@ object PartitioningUtils extends SQLConfHelper { partitionColumnsSchema(schema, partitionColumns).foreach { field => field.dataType match { - case _: AtomicType => // OK + case a: AtomicType if !a.isInstanceOf[VariantType] => // OK case _ => throw QueryCompilationErrors.cannotUseDataTypeForPartitionColumnError(field) } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala index c58815b6978e..bb2bad7a6867 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala @@ -33,7 +33,7 @@ import org.apache.spark.sql.execution.command.DDLUtils import org.apache.spark.sql.execution.datasources.{CreateTable => CreateTableV1} import org.apache.spark.sql.execution.datasources.v2.FileDataSourceV2 import org.apache.spark.sql.sources.InsertableRelation -import org.apache.spark.sql.types.{AtomicType, StructType} +import org.apache.spark.sql.types.{AtomicType, StructType, VariantType} import org.apache.spark.sql.util.PartitioningUtils.normalizePartitionSpec import 
org.apache.spark.sql.util.SchemaUtils import org.apache.spark.util.ArrayImplicits._ @@ -330,7 +330,8 @@ case class PreprocessTableCreation(catalog: SessionCatalog) extends Rule[Logical } schema.filter(f => normalizedPartitionCols.contains(f.name)).map(_.dataType).foreach { - case _: AtomicType => // OK + // VariantType values are not comparable, so can't be used as partition columns. + case a: AtomicType if !a.isInstanceOf[VariantType] => // OK case other => failAnalysis(s"Cannot use ${other.catalogString} for partition column") } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/VariantSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/VariantSuite.scala index 98d106f05f0c..af37445c1323 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/VariantSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/VariantSuite.scala @@ -138,4 +138,60 @@ class VariantSuite extends QueryTest with SharedSparkSession { } } } + + test("write partitioned file") { + def verifyResult(df: DataFrame): Unit = { + val result = df.selectExpr("v").collect() + .map(_.get(0).asInstanceOf[VariantVal].toString) + .sorted + .toSeq + val expected = (1 until 10).map(id => "1" * id) + assert(result == expected) + } + + // At this point, JSON parsing logic is not really implemented. We just construct some number + // inputs that are also valid JSON. This exercises passing VariantVal throughout the system. + val queryString = "select id, parse_json(repeat('1', id)) as v from range(1, 10)" + val query = spark.sql(queryString) + verifyResult(query) + + // Partition by another column should work. + withTempDir { dir => + val tempDir = new File(dir, "files").getCanonicalPath + query.write.partitionBy("id").parquet(tempDir) + verifyResult(spark.read.parquet(tempDir)) + } + + // Partitioning by Variant column is not allowed. 
+ withTempDir { dir => + val tempDir = new File(dir, "files").getCanonicalPath + intercept[AnalysisException] { + query.write.partitionBy("v").parquet(tempDir) + } + } + + // Same as above, using saveAsTable + withTable("t") { + query.write.partitionBy("id").saveAsTable("t") + verifyResult(spark.sql("select * from t")) + } + + withTable("t") { + intercept[AnalysisException] { + query.write.partitionBy("v").saveAsTable("t") + } + } + + // Same as above, using SQL CTAS + withTable("t") { + spark.sql(s"CREATE TABLE t USING PARQUET PARTITIONED BY (id) AS $queryString") + verifyResult(spark.sql("select * from t")) + } + + withTable("t") { + intercept[AnalysisException] { + spark.sql(s"CREATE TABLE t USING PARQUET PARTITIONED BY (v) AS $queryString") + } + } + } } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org