This is an automated email from the ASF dual-hosted git repository. wenchen pushed a commit to branch branch-3.2 in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-3.2 by this push:
     new 725ce337cb1 [SPARK-39543] The option of DataFrameWriterV2 should be passed to storage properties if fallback to v1
725ce337cb1 is described below

commit 725ce337cb1f24f666e99f7f0a3742333d116abb
Author: Yikf <yikaif...@gmail.com>
AuthorDate: Thu Jun 23 13:04:05 2022 +0800

    [SPARK-39543] The option of DataFrameWriterV2 should be passed to storage properties if fallback to v1

    The option of DataFrameWriterV2 should be passed to storage properties if fallback to v1, to support something such as compressed formats

    example:

    `spark.range(0, 100).writeTo("t1").option("compression", "zstd").using("parquet").create`

    **before**

    gen: part-00000-644a65ed-0e7a-43d5-8d30-b610a0fb19dc-c000.**snappy**.parquet ...

    **after**

    gen: part-00000-6eb9d1ae-8fdb-4428-aea3-bd6553954cdd-c000.**zstd**.parquet ...

    No new test

    Closes #36941 from Yikf/writeV2option.

    Authored-by: Yikf <yikaif...@gmail.com>
    Signed-off-by: Wenchen Fan <wenc...@databricks.com>
    (cherry picked from commit e5b7fb85b2d91f2e84dc60888c94e15b53751078)
    Signed-off-by: Wenchen Fan <wenc...@databricks.com>
---
 .../catalyst/analysis/ResolveSessionCatalog.scala    |  9 +++++++--
 .../apache/spark/sql/DataFrameWriterV2Suite.scala    | 20 ++++++++++++++++++++
 2 files changed, 27 insertions(+), 2 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveSessionCatalog.scala b/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveSessionCatalog.scala
index b73ccbbdb5e..6694d9b5843 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveSessionCatalog.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveSessionCatalog.scala
@@ -180,9 +180,14 @@ class ResolveSessionCatalog(val catalogManager: CatalogManager)
       }
 
     case c @ CreateTableAsSelectStatement(
-         SessionCatalogAndTable(catalog, tbl), _, _, _, _, _, _, _, _, _, _, _, _) =>
+         SessionCatalogAndTable(catalog, tbl), _, _, _, _, _, _, _, _, writeOptions, _, _, _) =>
       val (storageFormat, provider) = getStorageFormatAndProvider(
-        c.provider, c.options, c.location, c.serde, ctas = true)
+        c.provider,
+        c.options ++ writeOptions,
+        c.location,
+        c.serde,
+        ctas = true)
+
       if (!isV2Provider(provider)) {
         val tableDesc = buildCatalogTable(tbl.asTableIdentifier, new StructType,
           c.partitioning, c.bucketSpec, c.properties, provider, c.location,
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameWriterV2Suite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameWriterV2Suite.scala
index 8aef27a1b66..86108a81da8 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameWriterV2Suite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameWriterV2Suite.scala
@@ -23,12 +23,15 @@ import scala.collection.JavaConverters._
 
 import org.scalatest.BeforeAndAfter
 
+import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.analysis.{CannotReplaceMissingTableException, TableAlreadyExistsException}
 import org.apache.spark.sql.catalyst.plans.logical.{AppendData, LogicalPlan, OverwriteByExpression, OverwritePartitionsDynamic}
+import org.apache.spark.sql.connector.InMemoryV1Provider
 import org.apache.spark.sql.connector.catalog.{Identifier, InMemoryTable, InMemoryTableCatalog, TableCatalog}
 import org.apache.spark.sql.connector.expressions.{BucketTransform, DaysTransform, FieldReference, HoursTransform, IdentityTransform, LiteralValue, MonthsTransform, YearsTransform}
 import org.apache.spark.sql.execution.QueryExecution
 import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation
+import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.sources.FakeSourceOne
 import org.apache.spark.sql.test.SharedSparkSession
 import org.apache.spark.sql.types.{IntegerType, LongType, StringType, StructType, TimestampType}
@@ -531,6 +534,23 @@ class DataFrameWriterV2Suite extends QueryTest with SharedSparkSession with Befo
     assert(table.properties === (Map("provider" -> "foo") ++ defaultOwnership).asJava)
   }
 
+  test("SPARK-39543 writeOption should be passed to storage properties when fallback to v1") {
+    val provider = classOf[InMemoryV1Provider].getName
+
+    withSQLConf((SQLConf.USE_V1_SOURCE_LIST.key, provider)) {
+      spark.range(10)
+        .writeTo("table_name")
+        .option("compression", "zstd").option("name", "table_name")
+        .using(provider)
+        .create()
+      val table = spark.sessionState.catalog.getTableMetadata(TableIdentifier("table_name"))
+
+      assert(table.identifier === TableIdentifier("table_name", Some("default")))
+      assert(table.storage.properties.contains("compression"))
+      assert(table.storage.properties.getOrElse("compression", "foo") == "zstd")
+    }
+  }
+
   test("Replace: basic behavior") {
     spark.sql(
       "CREATE TABLE testcat.table_name (id bigint, data string) USING foo PARTITIONED BY (id)")

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org