This is an automated email from the ASF dual-hosted git repository. huaxingao pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push: new feba5ac [SPARK-37545][SQL] V2 CreateTableAsSelect command should qualify location feba5ac is described below commit feba5ac32f2598f6ca8a274850934106be0db64d Author: Terry Kim <yumin...@gmail.com> AuthorDate: Sat Dec 4 20:47:45 2021 -0800 [SPARK-37545][SQL] V2 CreateTableAsSelect command should qualify location ### What changes were proposed in this pull request? Currently, v2 CTAS command doesn't qualify the location: ``` spark.sql("CREATE TABLE testcat.t USING foo LOCATION '/tmp/foo' AS SELECT id FROM source") spark.sql("DESCRIBE EXTENDED testcat.t").filter("col_name = 'Location'").show +--------+-------------+-------+ |col_name| data_type|comment| +--------+-------------+-------+ |Location|/tmp/foo | | +--------+-------------+-------+ ``` , whereas v1 command qualifies the location as `file:/tmp/foo` which is the correct behavior since the default filesystem can change for different sessions. ### Why are the changes needed? This PR proposes to store the qualified location in order to prevent the issue where the default filesystem changes for different sessions. ### Does this PR introduce _any_ user-facing change? Yes, now, v2 CTAS command will store the qualified location: ``` +--------+-------------+-------+ |col_name| data_type|comment| +--------+-------------+-------+ |Location|file:/tmp/foo| | +--------+-------------+-------+ ``` ### How was this patch tested? Added new test Closes #34806 from imback82/v2_ctas_qualified_loc. 
Authored-by: Terry Kim <yumin...@gmail.com> Signed-off-by: Huaxin Gao <huaxin_...@apple.com> --- .../execution/datasources/v2/DataSourceV2Strategy.scala | 6 ++++-- .../DataSourceV2DataFrameSessionCatalogSuite.scala | 4 ++-- .../apache/spark/sql/connector/DataSourceV2SQLSuite.scala | 15 +++++++++++++++ 3 files changed, 21 insertions(+), 4 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala index f73b1a6..dbe4168 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala @@ -172,13 +172,15 @@ class DataSourceV2Strategy(session: SparkSession) extends Strategy with Predicat case CreateTableAsSelect(ResolvedDBObjectName(catalog, ident), parts, query, tableSpec, options, ifNotExists) => val writeOptions = new CaseInsensitiveStringMap(options.asJava) + val tableSpecWithQualifiedLocation = tableSpec.copy( + location = tableSpec.location.map(makeQualifiedDBObjectPath(_))) catalog match { case staging: StagingTableCatalog => AtomicCreateTableAsSelectExec(staging, ident.asIdentifier, parts, query, planLater(query), - tableSpec, writeOptions, ifNotExists) :: Nil + tableSpecWithQualifiedLocation, writeOptions, ifNotExists) :: Nil case _ => CreateTableAsSelectExec(catalog.asTableCatalog, ident.asIdentifier, parts, query, - planLater(query), tableSpec, writeOptions, ifNotExists) :: Nil + planLater(query), tableSpecWithQualifiedLocation, writeOptions, ifNotExists) :: Nil } case RefreshTable(r: ResolvedTable) => diff --git a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2DataFrameSessionCatalogSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2DataFrameSessionCatalogSuite.scala index 91ac7db..3edc4b9 100644 --- 
a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2DataFrameSessionCatalogSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2DataFrameSessionCatalogSuite.scala @@ -83,10 +83,10 @@ class DataSourceV2DataFrameSessionCatalogSuite test("saveAsTable passes path and provider information properly") { val t1 = "prop_table" withTable(t1) { - spark.range(20).write.format(v2Format).option("path", "abc").saveAsTable(t1) + spark.range(20).write.format(v2Format).option("path", "/abc").saveAsTable(t1) val cat = spark.sessionState.catalogManager.currentCatalog.asInstanceOf[TableCatalog] val tableInfo = cat.loadTable(Identifier.of(Array("default"), t1)) - assert(tableInfo.properties().get("location") === "abc") + assert(tableInfo.properties().get("location") === "file:/abc") assert(tableInfo.properties().get("provider") === v2Format) } } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala index 949abfe..6fe8e10 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala @@ -408,6 +408,21 @@ class DataSourceV2SQLSuite } } + test("SPARK-37545: CreateTableAsSelect should store location as qualified") { + val basicIdentifier = "testcat.table_name" + val atomicIdentifier = "testcat_atomic.table_name" + Seq(basicIdentifier, atomicIdentifier).foreach { identifier => + withTable(identifier) { + spark.sql(s"CREATE TABLE $identifier USING foo LOCATION '/tmp/foo' " + + "AS SELECT id FROM source") + val location = spark.sql(s"DESCRIBE EXTENDED $identifier") + .filter("col_name = 'Location'") + .select("data_type").head.getString(0) + assert(location === "file:/tmp/foo") + } + } + } + test("ReplaceTableAsSelect: basic v2 implementation.") { val basicCatalog = catalog("testcat").asTableCatalog val 
atomicCatalog = catalog("testcat_atomic").asTableCatalog --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org