Repository: spark
Updated Branches:
  refs/heads/branch-2.0 8d2fc010b -> ee6eea644


[SPARK-16034][SQL] Checks the partition columns when calling 
dataFrame.write.mode("append").saveAsTable

## What changes were proposed in this pull request?

`DataFrameWriter` can be used to append data to an existing data source table. This 
becomes tricky when the partition columns passed to 
`DataFrameWriter.partitionBy(columns)` don't match the actual partition columns 
of the underlying table. This pull request enforces a check that the requested 
partition columns match the table's existing partition columns, and fails the 
append with an `AnalysisException` when they don't.
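
For example, after this change an append whose `partitionBy` columns differ from the 
table's partition columns fails fast. Below is a minimal sketch mirroring the new 
`DDLSuite` test, assuming a `SparkSession` instance named `spark` is in scope (the 
table name `partitionedTable` is taken from the test):

```scala
import scala.util.Try
import spark.implicits._

val df = Seq((1, 2, 3)).toDF("a", "b", "c")

// Create a data source table partitioned by "a" and "b".
df.write.mode("overwrite").partitionBy("a", "b").saveAsTable("partitionedTable")

// Appending with a different partitioning ("a" only) is now rejected with an
// AnalysisException instead of silently appending mismatched data.
val mismatch = Try(df.write.mode("append").partitionBy("a").saveAsTable("partitionedTable"))
assert(mismatch.isFailure)

// Appending with the matching partitioning still succeeds.
df.write.mode("append").partitionBy("a", "b").saveAsTable("partitionedTable")
```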

## How was this patch tested?

Unit test.

Author: Sean Zhong <seanzh...@databricks.com>

Closes #13749 from clockfly/SPARK-16034.

(cherry picked from commit ce3b98bae28af72299722f56e4e4ef831f471ec0)
Signed-off-by: Yin Huai <yh...@databricks.com>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/ee6eea64
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/ee6eea64
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/ee6eea64

Branch: refs/heads/branch-2.0
Commit: ee6eea644fe0197a183385ef5879911ae8ab9ccb
Parents: 8d2fc01
Author: Sean Zhong <seanzh...@databricks.com>
Authored: Sat Jun 18 10:41:33 2016 -0700
Committer: Yin Huai <yh...@databricks.com>
Committed: Sat Jun 18 10:41:46 2016 -0700

----------------------------------------------------------------------
 .../command/createDataSourceTables.scala        |  9 ++++-
 .../sql/execution/datasources/DataSource.scala  | 39 ++++++++++----------
 .../spark/sql/execution/command/DDLSuite.scala  | 24 ++++++++++++
 3 files changed, 50 insertions(+), 22 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/ee6eea64/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala
index 4918780..c38eca5 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala
@@ -242,8 +242,13 @@ case class CreateDataSourceTableAsSelectCommand(
       bucketSpec = bucketSpec,
       options = optionsWithPath)
 
-    val result = dataSource.write(mode, df)
-
+    val result = try {
+      dataSource.write(mode, df)
+    } catch {
+      case ex: AnalysisException =>
+        logError(s"Failed to write to table ${tableIdent.identifier} in $mode mode", ex)
+        throw ex
+    }
     if (createMetastoreTable) {
       // We will use the schema of resolved.relation as the schema of the table (instead of
       // the schema of df). It is important since the nullability may be changed by the relation

http://git-wip-us.apache.org/repos/asf/spark/blob/ee6eea64/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
index 7f3683f..f274fc7 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
@@ -435,26 +435,25 @@ case class DataSource(
         // If we are appending to a table that already exists, make sure the partitioning matches
         // up.  If we fail to load the table for whatever reason, ignore the check.
         if (mode == SaveMode.Append) {
-          val existingPartitionColumnSet = try {
-            Some(
-              resolveRelation()
-                .asInstanceOf[HadoopFsRelation]
-                .location
-                .partitionSpec()
-                .partitionColumns
-                .fieldNames
-                .toSet)
-          } catch {
-            case e: Exception =>
-              None
-          }
-
-          existingPartitionColumnSet.foreach { ex =>
-            if (ex.map(_.toLowerCase) != partitionColumns.map(_.toLowerCase()).toSet) {
-              throw new AnalysisException(
-                s"Requested partitioning does not equal existing partitioning: " +
-                s"$ex != ${partitionColumns.toSet}.")
-            }
+          val existingColumns = Try {
+            resolveRelation()
+              .asInstanceOf[HadoopFsRelation]
+              .location
+              .partitionSpec()
+              .partitionColumns
+              .fieldNames
+              .toSeq
+          }.getOrElse(Seq.empty[String])
+          val sameColumns =
+            existingColumns.map(_.toLowerCase) == partitionColumns.map(_.toLowerCase)
+          if (existingColumns.size > 0 && !sameColumns) {
+            throw new AnalysisException(
+              s"""Requested partitioning does not match existing partitioning.
+                 |Existing partitioning columns:
+                 |  ${existingColumns.mkString(", ")}
+                 |Requested partitioning columns:
+                 |  ${partitionColumns.mkString(", ")}
+                 |""".stripMargin)
           }
         }
 

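As a side note, the hunk above compares the column names case-insensitively but order-sensitively 
(a `Seq` comparison rather than the previous `Set` comparison), and skips the check entirely when 
the existing table cannot be loaded. A self-contained sketch of that logic follows, with 
hypothetical stand-ins for the partition columns that the real code obtains from 
`resolveRelation()` and from `DataFrameWriter.partitionBy`:

```scala
import scala.util.Try

// Hypothetical stand-ins: the real code reads these from the resolved
// HadoopFsRelation and from the columns passed to partitionBy(...).
def loadExistingPartitionColumns(): Seq[String] = Seq("a", "b")
val partitionColumns: Seq[String] = Seq("b", "a")

// If the table cannot be loaded for any reason, fall back to an empty Seq,
// which disables the check (same behaviour as the old Option-based code).
val existingColumns = Try(loadExistingPartitionColumns()).getOrElse(Seq.empty[String])

// Case-insensitive but order-sensitive: Seq("a", "b") != Seq("b", "a").
val sameColumns =
  existingColumns.map(_.toLowerCase) == partitionColumns.map(_.toLowerCase)

if (existingColumns.nonEmpty && !sameColumns) {
  // The real code throws org.apache.spark.sql.AnalysisException; a plain
  // exception keeps this sketch compilable outside Spark's sql package.
  throw new IllegalArgumentException(
    s"""Requested partitioning does not match existing partitioning.
       |Existing partitioning columns:
       |  ${existingColumns.mkString(", ")}
       |Requested partitioning columns:
       |  ${partitionColumns.mkString(", ")}
       |""".stripMargin)
}
```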
http://git-wip-us.apache.org/repos/asf/spark/blob/ee6eea64/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
index 7eb2fff..8827649 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
@@ -1317,4 +1317,28 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach {
     assertUnsupported("TRUNCATE TABLE my_tab PARTITION (age=10)")
   }
 
+  test("SPARK-16034 Partition columns should match when appending to existing data source tables") {
+    import testImplicits._
+    val df = Seq((1, 2, 3)).toDF("a", "b", "c")
+    withTable("partitionedTable") {
+      df.write.mode("overwrite").partitionBy("a", "b").saveAsTable("partitionedTable")
+      // Misses some partition columns
+      intercept[AnalysisException] {
+        df.write.mode("append").partitionBy("a").saveAsTable("partitionedTable")
+      }
+      // Wrong order
+      intercept[AnalysisException] {
+        df.write.mode("append").partitionBy("b", "a").saveAsTable("partitionedTable")
+      }
+      // Partition columns not specified
+      intercept[AnalysisException] {
+        df.write.mode("append").saveAsTable("partitionedTable")
+      }
+      assert(sql("select * from partitionedTable").collect().size == 1)
+      // Inserts new data successfully when partition columns are correctly specified in
+      // partitionBy(...).
+      df.write.mode("append").partitionBy("a", "b").saveAsTable("partitionedTable")
+      assert(sql("select * from partitionedTable").collect().size == 2)
+    }
+  }
 }

