This is an automated email from the ASF dual-hosted git repository. wenchen pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push: new e5b7fb85b2d [SPARK-39543] The option of DataFrameWriterV2 should be passed to storage properties if fallback to v1 e5b7fb85b2d is described below commit e5b7fb85b2d91f2e84dc60888c94e15b53751078 Author: Yikf <yikaif...@gmail.com> AuthorDate: Thu Jun 23 13:04:05 2022 +0800 [SPARK-39543] The option of DataFrameWriterV2 should be passed to storage properties if fallback to v1 ### What changes were proposed in this pull request? The options of DataFrameWriterV2 should be passed to the storage properties when falling back to v1, to support features such as compression formats ### Why are the changes needed? example: `spark.range(0, 100).writeTo("t1").option("compression", "zstd").using("parquet").create` **before** gen: part-00000-644a65ed-0e7a-43d5-8d30-b610a0fb19dc-c000.**snappy**.parquet ... **after** gen: part-00000-6eb9d1ae-8fdb-4428-aea3-bd6553954cdd-c000.**zstd**.parquet ... ### Does this PR introduce _any_ user-facing change? No ### How was this patch tested? new test Closes #36941 from Yikf/writeV2option. 
Authored-by: Yikf <yikaif...@gmail.com> Signed-off-by: Wenchen Fan <wenc...@databricks.com> --- .../catalyst/analysis/ResolveSessionCatalog.scala | 8 ++++++-- .../apache/spark/sql/DataFrameWriterV2Suite.scala | 20 ++++++++++++++++++++ 2 files changed, 26 insertions(+), 2 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveSessionCatalog.scala b/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveSessionCatalog.scala index f4df3bea532..41b0599848e 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveSessionCatalog.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveSessionCatalog.scala @@ -161,11 +161,15 @@ class ResolveSessionCatalog(val catalogManager: CatalogManager) c } - case c @ CreateTableAsSelect(ResolvedDBObjectName(catalog, name), _, _, _, _, _) + case c @ CreateTableAsSelect(ResolvedDBObjectName(catalog, name), _, _, _, writeOptions, _) if isSessionCatalog(catalog) => val (storageFormat, provider) = getStorageFormatAndProvider( - c.tableSpec.provider, c.tableSpec.options, c.tableSpec.location, c.tableSpec.serde, + c.tableSpec.provider, + c.tableSpec.options ++ writeOptions, + c.tableSpec.location, + c.tableSpec.serde, ctas = true) + if (!isV2Provider(provider)) { constructV1TableCmd(Some(c.query), c.tableSpec, name, new StructType, c.partitioning, c.ignoreIfExists, storageFormat, provider) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameWriterV2Suite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameWriterV2Suite.scala index 8aef27a1b66..86108a81da8 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameWriterV2Suite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameWriterV2Suite.scala @@ -23,12 +23,15 @@ import scala.collection.JavaConverters._ import org.scalatest.BeforeAndAfter +import org.apache.spark.sql.catalyst.TableIdentifier import 
org.apache.spark.sql.catalyst.analysis.{CannotReplaceMissingTableException, TableAlreadyExistsException} import org.apache.spark.sql.catalyst.plans.logical.{AppendData, LogicalPlan, OverwriteByExpression, OverwritePartitionsDynamic} +import org.apache.spark.sql.connector.InMemoryV1Provider import org.apache.spark.sql.connector.catalog.{Identifier, InMemoryTable, InMemoryTableCatalog, TableCatalog} import org.apache.spark.sql.connector.expressions.{BucketTransform, DaysTransform, FieldReference, HoursTransform, IdentityTransform, LiteralValue, MonthsTransform, YearsTransform} import org.apache.spark.sql.execution.QueryExecution import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation +import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.sources.FakeSourceOne import org.apache.spark.sql.test.SharedSparkSession import org.apache.spark.sql.types.{IntegerType, LongType, StringType, StructType, TimestampType} @@ -531,6 +534,23 @@ class DataFrameWriterV2Suite extends QueryTest with SharedSparkSession with Befo assert(table.properties === (Map("provider" -> "foo") ++ defaultOwnership).asJava) } + test("SPARK-39543 writeOption should be passed to storage properties when fallback to v1") { + val provider = classOf[InMemoryV1Provider].getName + + withSQLConf((SQLConf.USE_V1_SOURCE_LIST.key, provider)) { + spark.range(10) + .writeTo("table_name") + .option("compression", "zstd").option("name", "table_name") + .using(provider) + .create() + val table = spark.sessionState.catalog.getTableMetadata(TableIdentifier("table_name")) + + assert(table.identifier === TableIdentifier("table_name", Some("default"))) + assert(table.storage.properties.contains("compression")) + assert(table.storage.properties.getOrElse("compression", "foo") == "zstd") + } + } + test("Replace: basic behavior") { spark.sql( "CREATE TABLE testcat.table_name (id bigint, data string) USING foo PARTITIONED BY (id)") 
--------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org