Repository: spark
Updated Branches:
  refs/heads/master 2597674bc -> f3ed62a38


[SPARK-20831][SQL] Fix INSERT OVERWRITE data source tables with IF NOT EXISTS

### What changes were proposed in this pull request?
Currently, we have a bug when we specify `IF NOT EXISTS` in `INSERT OVERWRITE` 
data source tables. For example, given a query:
```SQL
INSERT OVERWRITE TABLE $tableName partition (b=2, c=3) IF NOT EXISTS SELECT 9, 
10
```
we will get the following error:
```
unresolved operator 'InsertIntoTable Relation[a#425,d#426,b#427,c#428] parquet, 
Map(b -> Some(2), c -> Some(3)), true, true;;
'InsertIntoTable Relation[a#425,d#426,b#427,c#428] parquet, Map(b -> Some(2), c 
-> Some(3)), true, true
+- Project [cast(9#423 as int) AS a#429, cast(10#424 as int) AS d#430]
   +- Project [9 AS 9#423, 10 AS 10#424]
      +- OneRowRelation$
```

This PR is to fix the issue to follow the behavior of Hive serde tables
> INSERT OVERWRITE will overwrite any existing data in the table or partition 
> unless IF NOT EXISTS is provided for a partition

### How was this patch tested?
Modified an existing test case

Author: gatorsmile <gatorsm...@gmail.com>

Closes #18050 from gatorsmile/insertPartitionIfNotExists.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/f3ed62a3
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/f3ed62a3
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/f3ed62a3

Branch: refs/heads/master
Commit: f3ed62a381897711d86fde27ab80bb70ed0b0513
Parents: 2597674
Author: gatorsmile <gatorsm...@gmail.com>
Authored: Mon May 22 22:24:50 2017 +0800
Committer: Wenchen Fan <wenc...@databricks.com>
Committed: Mon May 22 22:24:50 2017 +0800

----------------------------------------------------------------------
 .../apache/spark/sql/catalyst/dsl/package.scala |   2 +-
 .../plans/logical/basicLogicalOperators.scala   |  11 +-
 .../org/apache/spark/sql/internal/SQLConf.scala |   2 +-
 .../sql/catalyst/parser/PlanParserSuite.scala   |  10 +-
 .../org/apache/spark/sql/DataFrameWriter.scala  |   2 +-
 .../sql/execution/datasources/DataSource.scala  |   1 +
 .../datasources/DataSourceStrategy.scala        |   5 +-
 .../InsertIntoHadoopFsRelationCommand.scala     |  18 +++-
 .../apache/spark/sql/hive/HiveStrategies.scala  |  14 +--
 .../CreateHiveTableAsSelectCommand.scala        |   4 +-
 .../hive/execution/InsertIntoHiveTable.scala    |   7 +-
 .../sql/hive/InsertIntoHiveTableSuite.scala     | 108 ++++++++-----------
 12 files changed, 90 insertions(+), 94 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/f3ed62a3/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/dsl/package.scala
----------------------------------------------------------------------
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/dsl/package.scala 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/dsl/package.scala
index 75bf780..ed423e7 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/dsl/package.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/dsl/package.scala
@@ -366,7 +366,7 @@ package object dsl {
       def insertInto(tableName: String, overwrite: Boolean = false): 
LogicalPlan =
         InsertIntoTable(
           analysis.UnresolvedRelation(TableIdentifier(tableName)),
-          Map.empty, logicalPlan, overwrite, false)
+          Map.empty, logicalPlan, overwrite, ifPartitionNotExists = false)
 
       def as(alias: String): LogicalPlan = SubqueryAlias(alias, logicalPlan)
 

http://git-wip-us.apache.org/repos/asf/spark/blob/f3ed62a3/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala
----------------------------------------------------------------------
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala
index 7a54995..d291ca0 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala
@@ -410,17 +410,20 @@ case class Hint(name: String, parameters: Seq[String], 
child: LogicalPlan) exten
  *                  would have Map('a' -> Some('1'), 'b' -> None).
  * @param query the logical plan representing data to write to.
  * @param overwrite overwrite existing table or partitions.
- * @param ifNotExists If true, only write if the table or partition does not 
exist.
+ * @param ifPartitionNotExists If true, only write if the partition does not 
exist.
+ *                             Only valid for static partitions.
  */
 case class InsertIntoTable(
     table: LogicalPlan,
     partition: Map[String, Option[String]],
     query: LogicalPlan,
     overwrite: Boolean,
-    ifNotExists: Boolean)
+    ifPartitionNotExists: Boolean)
   extends LogicalPlan {
-  assert(overwrite || !ifNotExists)
-  assert(partition.values.forall(_.nonEmpty) || !ifNotExists)
+  // IF NOT EXISTS is only valid in INSERT OVERWRITE
+  assert(overwrite || !ifPartitionNotExists)
+  // IF NOT EXISTS is only valid in static partitions
+  assert(partition.values.forall(_.nonEmpty) || !ifPartitionNotExists)
 
   // We don't want `table` in children as sometimes we don't want to transform 
it.
   override def children: Seq[LogicalPlan] = query :: Nil

http://git-wip-us.apache.org/repos/asf/spark/blob/f3ed62a3/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
----------------------------------------------------------------------
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index b97adf7..c5d69c2 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -303,7 +303,7 @@ object SQLConf {
   val HIVE_MANAGE_FILESOURCE_PARTITIONS =
     buildConf("spark.sql.hive.manageFilesourcePartitions")
       .doc("When true, enable metastore partition management for file source 
tables as well. " +
-           "This includes both datasource and converted Hive tables. When 
partition managment " +
+           "This includes both datasource and converted Hive tables. When 
partition management " +
            "is enabled, datasource tables store partition in the Hive 
metastore, and use the " +
            "metastore to prune partitions during query planning.")
       .booleanConf

http://git-wip-us.apache.org/repos/asf/spark/blob/f3ed62a3/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/PlanParserSuite.scala
----------------------------------------------------------------------
diff --git 
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/PlanParserSuite.scala
 
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/PlanParserSuite.scala
index cca0291..d78741d 100644
--- 
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/PlanParserSuite.scala
+++ 
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/PlanParserSuite.scala
@@ -176,14 +176,14 @@ class PlanParserSuite extends PlanTest {
     def insert(
         partition: Map[String, Option[String]],
         overwrite: Boolean = false,
-        ifNotExists: Boolean = false): LogicalPlan =
-      InsertIntoTable(table("s"), partition, plan, overwrite, ifNotExists)
+        ifPartitionNotExists: Boolean = false): LogicalPlan =
+      InsertIntoTable(table("s"), partition, plan, overwrite, 
ifPartitionNotExists)
 
     // Single inserts
     assertEqual(s"insert overwrite table s $sql",
       insert(Map.empty, overwrite = true))
     assertEqual(s"insert overwrite table s partition (e = 1) if not exists 
$sql",
-      insert(Map("e" -> Option("1")), overwrite = true, ifNotExists = true))
+      insert(Map("e" -> Option("1")), overwrite = true, ifPartitionNotExists = 
true))
     assertEqual(s"insert into s $sql",
       insert(Map.empty))
     assertEqual(s"insert into table s partition (c = 'd', e = 1) $sql",
@@ -193,9 +193,9 @@ class PlanParserSuite extends PlanTest {
     val plan2 = table("t").where('x > 5).select(star())
     assertEqual("from t insert into s select * limit 1 insert into u select * 
where x > 5",
       InsertIntoTable(
-        table("s"), Map.empty, plan.limit(1), false, ifNotExists = 
false).union(
+        table("s"), Map.empty, plan.limit(1), false, ifPartitionNotExists = 
false).union(
         InsertIntoTable(
-          table("u"), Map.empty, plan2, false, ifNotExists = false)))
+          table("u"), Map.empty, plan2, false, ifPartitionNotExists = false)))
   }
 
   test ("insert with if not exists") {

http://git-wip-us.apache.org/repos/asf/spark/blob/f3ed62a3/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
index 1732a8e..b71c5eb 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
@@ -286,7 +286,7 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) 
{
         partition = Map.empty[String, Option[String]],
         query = df.logicalPlan,
         overwrite = mode == SaveMode.Overwrite,
-        ifNotExists = false)
+        ifPartitionNotExists = false)
     }
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/f3ed62a3/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
index bb7d1f7..14c4060 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
@@ -430,6 +430,7 @@ case class DataSource(
       InsertIntoHadoopFsRelationCommand(
         outputPath = outputPath,
         staticPartitions = Map.empty,
+        ifPartitionNotExists = false,
         partitionColumns = partitionAttributes,
         bucketSpec = bucketSpec,
         fileFormat = format,

http://git-wip-us.apache.org/repos/asf/spark/blob/f3ed62a3/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
index d307122..21d75a4 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
@@ -142,8 +142,8 @@ case class DataSourceAnalysis(conf: SQLConf) extends 
Rule[LogicalPlan] with Cast
         parts, query, overwrite, false) if parts.isEmpty =>
       InsertIntoDataSourceCommand(l, query, overwrite)
 
-    case InsertIntoTable(
-        l @ LogicalRelation(t: HadoopFsRelation, _, table), parts, query, 
overwrite, false) =>
+    case i @ InsertIntoTable(
+        l @ LogicalRelation(t: HadoopFsRelation, _, table), parts, query, 
overwrite, _) =>
       // If the InsertIntoTable command is for a partitioned HadoopFsRelation 
and
       // the user has specified static partitions, we add a Project operator 
on top of the query
       // to include those constant column values in the query result.
@@ -195,6 +195,7 @@ case class DataSourceAnalysis(conf: SQLConf) extends 
Rule[LogicalPlan] with Cast
       InsertIntoHadoopFsRelationCommand(
         outputPath,
         staticPartitions,
+        i.ifPartitionNotExists,
         partitionSchema,
         t.bucketSpec,
         t.fileFormat,

http://git-wip-us.apache.org/repos/asf/spark/blob/f3ed62a3/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelationCommand.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelationCommand.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelationCommand.scala
index 19b51d4..c9d3144 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelationCommand.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelationCommand.scala
@@ -37,10 +37,13 @@ import org.apache.spark.sql.execution.command._
  *                         overwrites: when the spec is empty, all partitions 
are overwritten.
  *                         When it covers a prefix of the partition keys, only 
partitions matching
  *                         the prefix are overwritten.
+ * @param ifPartitionNotExists If true, only write if the partition does not 
exist.
+ *                             Only valid for static partitions.
  */
 case class InsertIntoHadoopFsRelationCommand(
     outputPath: Path,
     staticPartitions: TablePartitionSpec,
+    ifPartitionNotExists: Boolean,
     partitionColumns: Seq[Attribute],
     bucketSpec: Option[BucketSpec],
     fileFormat: FileFormat,
@@ -61,8 +64,8 @@ case class InsertIntoHadoopFsRelationCommand(
       val duplicateColumns = query.schema.fieldNames.groupBy(identity).collect 
{
         case (x, ys) if ys.length > 1 => "\"" + x + "\""
       }.mkString(", ")
-      throw new AnalysisException(s"Duplicate column(s) : $duplicateColumns 
found, " +
-          s"cannot save to file.")
+      throw new AnalysisException(s"Duplicate column(s): $duplicateColumns 
found, " +
+        "cannot save to file.")
     }
 
     val hadoopConf = 
sparkSession.sessionState.newHadoopConfWithOptions(options)
@@ -76,11 +79,12 @@ case class InsertIntoHadoopFsRelationCommand(
 
     var initialMatchingPartitions: Seq[TablePartitionSpec] = Nil
     var customPartitionLocations: Map[TablePartitionSpec, String] = Map.empty
+    var matchingPartitions: Seq[CatalogTablePartition] = Seq.empty
 
     // When partitions are tracked by the catalog, compute all custom 
partition locations that
     // may be relevant to the insertion job.
     if (partitionsTrackedByCatalog) {
-      val matchingPartitions = 
sparkSession.sessionState.catalog.listPartitions(
+      matchingPartitions = sparkSession.sessionState.catalog.listPartitions(
         catalogTable.get.identifier, Some(staticPartitions))
       initialMatchingPartitions = matchingPartitions.map(_.spec)
       customPartitionLocations = getCustomPartitionLocations(
@@ -101,8 +105,12 @@ case class InsertIntoHadoopFsRelationCommand(
       case (SaveMode.ErrorIfExists, true) =>
         throw new AnalysisException(s"path $qualifiedOutputPath already 
exists.")
       case (SaveMode.Overwrite, true) =>
-        deleteMatchingPartitions(fs, qualifiedOutputPath, 
customPartitionLocations, committer)
-        true
+        if (ifPartitionNotExists && matchingPartitions.nonEmpty) {
+          false
+        } else {
+          deleteMatchingPartitions(fs, qualifiedOutputPath, 
customPartitionLocations, committer)
+          true
+        }
       case (SaveMode.Append, _) | (SaveMode.Overwrite, _) | 
(SaveMode.ErrorIfExists, false) =>
         true
       case (SaveMode.Ignore, exists) =>

http://git-wip-us.apache.org/repos/asf/spark/blob/f3ed62a3/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
----------------------------------------------------------------------
diff --git 
a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala 
b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
index 09a5eda..4f090d5 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
@@ -160,9 +160,9 @@ class DetermineTableStats(session: SparkSession) extends 
Rule[LogicalPlan] {
  */
 object HiveAnalysis extends Rule[LogicalPlan] {
   override def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperators {
-    case InsertIntoTable(relation: CatalogRelation, partSpec, query, 
overwrite, ifNotExists)
-        if DDLUtils.isHiveTable(relation.tableMeta) =>
-      InsertIntoHiveTable(relation.tableMeta, partSpec, query, overwrite, 
ifNotExists)
+    case InsertIntoTable(r: CatalogRelation, partSpec, query, overwrite, 
ifPartitionNotExists)
+        if DDLUtils.isHiveTable(r.tableMeta) =>
+      InsertIntoHiveTable(r.tableMeta, partSpec, query, overwrite, 
ifPartitionNotExists)
 
     case CreateTable(tableDesc, mode, None) if DDLUtils.isHiveTable(tableDesc) 
=>
       CreateTableCommand(tableDesc, ignoreIfExists = mode == SaveMode.Ignore)
@@ -207,11 +207,11 @@ case class RelationConversions(
   override def apply(plan: LogicalPlan): LogicalPlan = {
     plan transformUp {
       // Write path
-      case InsertIntoTable(r: CatalogRelation, partition, query, overwrite, 
ifNotExists)
+      case InsertIntoTable(r: CatalogRelation, partition, query, overwrite, 
ifPartitionNotExists)
         // Inserting into partitioned table is not supported in Parquet/Orc 
data source (yet).
-        if query.resolved && DDLUtils.isHiveTable(r.tableMeta) &&
-          !r.isPartitioned && isConvertible(r) =>
-        InsertIntoTable(convert(r), partition, query, overwrite, ifNotExists)
+          if query.resolved && DDLUtils.isHiveTable(r.tableMeta) &&
+            !r.isPartitioned && isConvertible(r) =>
+        InsertIntoTable(convert(r), partition, query, overwrite, 
ifPartitionNotExists)
 
       // Read path
       case relation: CatalogRelation

http://git-wip-us.apache.org/repos/asf/spark/blob/f3ed62a3/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateHiveTableAsSelectCommand.scala
----------------------------------------------------------------------
diff --git 
a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateHiveTableAsSelectCommand.scala
 
b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateHiveTableAsSelectCommand.scala
index 41c6b18..65e8b4e 100644
--- 
a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateHiveTableAsSelectCommand.scala
+++ 
b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateHiveTableAsSelectCommand.scala
@@ -62,7 +62,7 @@ case class CreateHiveTableAsSelectCommand(
           Map(),
           query,
           overwrite = false,
-          ifNotExists = false)).toRdd
+          ifPartitionNotExists = false)).toRdd
     } else {
       // TODO ideally, we should get the output data ready first and then
       // add the relation into catalog, just in case of failure occurs while 
data
@@ -78,7 +78,7 @@ case class CreateHiveTableAsSelectCommand(
             Map(),
             query,
             overwrite = true,
-            ifNotExists = false)).toRdd
+            ifPartitionNotExists = false)).toRdd
       } catch {
         case NonFatal(e) =>
           // drop the created table.

http://git-wip-us.apache.org/repos/asf/spark/blob/f3ed62a3/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala
----------------------------------------------------------------------
diff --git 
a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala
 
b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala
index 10e17c5..10ce8e3 100644
--- 
a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala
+++ 
b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala
@@ -71,14 +71,15 @@ import org.apache.spark.SparkException
  *                  }}}.
  * @param query the logical plan representing data to write to.
  * @param overwrite overwrite existing table or partitions.
- * @param ifNotExists If true, only write if the table or partition does not 
exist.
+ * @param ifPartitionNotExists If true, only write if the partition does not 
exist.
+ *                                   Only valid for static partitions.
  */
 case class InsertIntoHiveTable(
     table: CatalogTable,
     partition: Map[String, Option[String]],
     query: LogicalPlan,
     overwrite: Boolean,
-    ifNotExists: Boolean) extends RunnableCommand {
+    ifPartitionNotExists: Boolean) extends RunnableCommand {
 
   override protected def innerChildren: Seq[LogicalPlan] = query :: Nil
 
@@ -375,7 +376,7 @@ case class InsertIntoHiveTable(
 
         var doHiveOverwrite = overwrite
 
-        if (oldPart.isEmpty || !ifNotExists) {
+        if (oldPart.isEmpty || !ifPartitionNotExists) {
           // SPARK-18107: Insert overwrite runs much slower than hive-client.
           // Newer Hive largely improves insert overwrite performance. As 
Spark uses older Hive
           // version and we may not want to catch up new Hive version every 
time. We delete the

http://git-wip-us.apache.org/repos/asf/spark/blob/f3ed62a3/sql/hive/src/test/scala/org/apache/spark/sql/hive/InsertIntoHiveTableSuite.scala
----------------------------------------------------------------------
diff --git 
a/sql/hive/src/test/scala/org/apache/spark/sql/hive/InsertIntoHiveTableSuite.scala
 
b/sql/hive/src/test/scala/org/apache/spark/sql/hive/InsertIntoHiveTableSuite.scala
index 7bd3973..cc80f2e 100644
--- 
a/sql/hive/src/test/scala/org/apache/spark/sql/hive/InsertIntoHiveTableSuite.scala
+++ 
b/sql/hive/src/test/scala/org/apache/spark/sql/hive/InsertIntoHiveTableSuite.scala
@@ -166,72 +166,54 @@ class InsertIntoHiveTableSuite extends QueryTest with 
TestHiveSingleton with Bef
     sql("DROP TABLE tmp_table")
   }
 
-  test("INSERT OVERWRITE - partition IF NOT EXISTS") {
-    withTempDir { tmpDir =>
-      val table = "table_with_partition"
-      withTable(table) {
-        val selQuery = s"select c1, p1, p2 from $table"
-        sql(
-          s"""
-             |CREATE TABLE $table(c1 string)
-             |PARTITIONED by (p1 string,p2 string)
-             |location '${tmpDir.toURI.toString}'
-           """.stripMargin)
-        sql(
-          s"""
-             |INSERT OVERWRITE TABLE $table
-             |partition (p1='a',p2='b')
-             |SELECT 'blarr'
-           """.stripMargin)
-        checkAnswer(
-          sql(selQuery),
-          Row("blarr", "a", "b"))
-
-        sql(
-          s"""
-             |INSERT OVERWRITE TABLE $table
-             |partition (p1='a',p2='b')
-             |SELECT 'blarr2'
-           """.stripMargin)
-        checkAnswer(
-          sql(selQuery),
-          Row("blarr2", "a", "b"))
+  testPartitionedTable("INSERT OVERWRITE - partition IF NOT EXISTS") { 
tableName =>
+    val selQuery = s"select a, b, c, d from $tableName"
+    sql(
+      s"""
+         |INSERT OVERWRITE TABLE $tableName
+         |partition (b=2, c=3)
+         |SELECT 1, 4
+        """.stripMargin)
+    checkAnswer(sql(selQuery), Row(1, 2, 3, 4))
 
-        var e = intercept[AnalysisException] {
-          sql(
-            s"""
-               |INSERT OVERWRITE TABLE $table
-               |partition (p1='a',p2) IF NOT EXISTS
-               |SELECT 'blarr3', 'newPartition'
-             """.stripMargin)
-        }
-        assert(e.getMessage.contains(
-          "Dynamic partitions do not support IF NOT EXISTS. Specified 
partitions with value: [p2]"))
+    sql(
+      s"""
+         |INSERT OVERWRITE TABLE $tableName
+         |partition (b=2, c=3)
+         |SELECT 5, 6
+        """.stripMargin)
+    checkAnswer(sql(selQuery), Row(5, 2, 3, 6))
+
+    val e = intercept[AnalysisException] {
+      sql(
+        s"""
+           |INSERT OVERWRITE TABLE $tableName
+           |partition (b=2, c) IF NOT EXISTS
+           |SELECT 7, 8, 3
+          """.stripMargin)
+    }
+    assert(e.getMessage.contains(
+      "Dynamic partitions do not support IF NOT EXISTS. Specified partitions 
with value: [c]"))
 
-        e = intercept[AnalysisException] {
-          sql(
-            s"""
-               |INSERT OVERWRITE TABLE $table
-               |partition (p1='a',p2) IF NOT EXISTS
-               |SELECT 'blarr3', 'b'
-             """.stripMargin)
-        }
-        assert(e.getMessage.contains(
-          "Dynamic partitions do not support IF NOT EXISTS. Specified 
partitions with value: [p2]"))
+    // If the partition already exists, the insert will overwrite the data
+    // unless users specify IF NOT EXISTS
+    sql(
+      s"""
+         |INSERT OVERWRITE TABLE $tableName
+         |partition (b=2, c=3) IF NOT EXISTS
+         |SELECT 9, 10
+        """.stripMargin)
+    checkAnswer(sql(selQuery), Row(5, 2, 3, 6))
 
-        // If the partition already exists, the insert will overwrite the data
-        // unless users specify IF NOT EXISTS
-        sql(
-          s"""
-             |INSERT OVERWRITE TABLE $table
-             |partition (p1='a',p2='b') IF NOT EXISTS
-             |SELECT 'blarr3'
-           """.stripMargin)
-        checkAnswer(
-          sql(selQuery),
-          Row("blarr2", "a", "b"))
-      }
-    }
+    // ADD PARTITION has the same effect, even if no actual data is inserted.
+    sql(s"ALTER TABLE $tableName ADD PARTITION (b=21, c=31)")
+    sql(
+      s"""
+         |INSERT OVERWRITE TABLE $tableName
+         |partition (b=21, c=31) IF NOT EXISTS
+         |SELECT 20, 24
+        """.stripMargin)
+    checkAnswer(sql(selQuery), Row(5, 2, 3, 6))
   }
 
   test("Insert ArrayType.containsNull == false") {


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to