Repository: spark
Updated Branches:
  refs/heads/master d449988b8 -> e2318ede0
[SPARK-18544][SQL] Append with df.saveAsTable writes data to wrong location

## What changes were proposed in this pull request?

We failed to properly propagate table metadata for existing tables for the saveAsTable command. This caused a downstream component to think the table was MANAGED, writing data to the wrong location.

## How was this patch tested?

Unit test that fails before the patch.

Author: Eric Liang <e...@databricks.com>

Closes #15983 from ericl/spark-18544.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/e2318ede
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/e2318ede
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/e2318ede

Branch: refs/heads/master
Commit: e2318ede04fa7a756d1c8151775e1f2406a176ca
Parents: d449988
Author: Eric Liang <e...@databricks.com>
Authored: Mon Nov 28 21:58:01 2016 -0800
Committer: Reynold Xin <r...@databricks.com>
Committed: Mon Nov 28 21:58:01 2016 -0800

----------------------------------------------------------------------
 .../org/apache/spark/sql/DataFrameWriter.scala  | 21 ++++++++++++--------
 .../command/createDataSourceTables.scala        |  3 ++-
 .../PartitionProviderCompatibilitySuite.scala   | 19 ++++++++++++++++++
 3 files changed, 34 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/e2318ede/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
index 2d86342..8294e41 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
@@ -373,8 +373,19 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) {
         throw new AnalysisException(s"Table $tableIdent already exists.")
 
       case _ =>
-        val storage = DataSource.buildStorageFormatFromOptions(extraOptions.toMap)
-        val tableType = if (storage.locationUri.isDefined) {
+        val existingTable = if (tableExists) {
+          Some(df.sparkSession.sessionState.catalog.getTableMetadata(tableIdent))
+        } else {
+          None
+        }
+        val storage = if (tableExists) {
+          existingTable.get.storage
+        } else {
+          DataSource.buildStorageFormatFromOptions(extraOptions.toMap)
+        }
+        val tableType = if (tableExists) {
+          existingTable.get.tableType
+        } else if (storage.locationUri.isDefined) {
           CatalogTableType.EXTERNAL
         } else {
           CatalogTableType.MANAGED
@@ -391,12 +402,6 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) {
         )
         df.sparkSession.sessionState.executePlan(
           CreateTable(tableDesc, mode, Some(df.logicalPlan))).toRdd
-        if (tableDesc.partitionColumnNames.nonEmpty &&
-            df.sparkSession.sqlContext.conf.manageFilesourcePartitions) {
-          // Need to recover partitions into the metastore so our saved data is visible.
-          df.sparkSession.sessionState.executePlan(
-            AlterTableRecoverPartitionsCommand(tableDesc.identifier)).toRdd
-        }
     }
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/e2318ede/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala
index add732c..422700c 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala
@@ -212,7 +212,8 @@ case class CreateDataSourceTableAsSelectCommand(
       className = provider,
       partitionColumns = table.partitionColumnNames,
       bucketSpec = table.bucketSpec,
-      options = table.storage.properties ++ pathOption)
+      options = table.storage.properties ++ pathOption,
+      catalogTable = Some(table))
 
     val result = try {
       dataSource.write(mode, df)

http://git-wip-us.apache.org/repos/asf/spark/blob/e2318ede/sql/hive/src/test/scala/org/apache/spark/sql/hive/PartitionProviderCompatibilitySuite.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/PartitionProviderCompatibilitySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/PartitionProviderCompatibilitySuite.scala
index a1aa074..cace5fa 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/PartitionProviderCompatibilitySuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/PartitionProviderCompatibilitySuite.scala
@@ -188,6 +188,25 @@ class PartitionProviderCompatibilitySuite
     }
   }
 
+  for (enabled <- Seq(true, false)) {
+    test(s"SPARK-18544 append with saveAsTable - partition management $enabled") {
+      withSQLConf(SQLConf.HIVE_MANAGE_FILESOURCE_PARTITIONS.key -> enabled.toString) {
+        withTable("test") {
+          withTempDir { dir =>
+            setupPartitionedDatasourceTable("test", dir)
+            if (enabled) {
+              spark.sql("msck repair table test")
+            }
+            assert(spark.sql("select * from test").count() == 5)
+            spark.range(10).selectExpr("id as fieldOne", "id as partCol")
+              .write.partitionBy("partCol").mode("append").saveAsTable("test")
+            assert(spark.sql("select * from test").count() == 15)
+          }
+        }
+      }
+    }
+  }
+
   /**
    * Runs a test against a multi-level partitioned table, then validates that the custom locations
    * were respected by the output writer.

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org