Repository: spark Updated Branches: refs/heads/branch-2.2 81f63c892 -> a57553279
[SPARK-20831][SQL] Fix INSERT OVERWRITE data source tables with IF NOT EXISTS ### What changes were proposed in this pull request? Currently, we have a bug when we specify `IF NOT EXISTS` in `INSERT OVERWRITE` on data source tables. For example, given a query: ```SQL INSERT OVERWRITE TABLE $tableName partition (b=2, c=3) IF NOT EXISTS SELECT 9, 10 ``` we will get the following error: ``` unresolved operator 'InsertIntoTable Relation[a#425,d#426,b#427,c#428] parquet, Map(b -> Some(2), c -> Some(3)), true, true;; 'InsertIntoTable Relation[a#425,d#426,b#427,c#428] parquet, Map(b -> Some(2), c -> Some(3)), true, true +- Project [cast(9#423 as int) AS a#429, cast(10#424 as int) AS d#430] +- Project [9 AS 9#423, 10 AS 10#424] +- OneRowRelation$ ``` This PR fixes the issue so that data source tables follow the behavior of Hive serde tables: > INSERT OVERWRITE will overwrite any existing data in the table or partition > unless IF NOT EXISTS is provided for a partition ### How was this patch tested? Modified an existing test case. Author: gatorsmile <gatorsm...@gmail.com> Closes #18050 from gatorsmile/insertPartitionIfNotExists. 
(cherry picked from commit f3ed62a381897711d86fde27ab80bb70ed0b0513) Signed-off-by: Wenchen Fan <wenc...@databricks.com> Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/a5755327 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/a5755327 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/a5755327 Branch: refs/heads/branch-2.2 Commit: a57553279b1c680c370b7ee4e617f8a623030d60 Parents: 81f63c8 Author: gatorsmile <gatorsm...@gmail.com> Authored: Mon May 22 22:24:50 2017 +0800 Committer: Wenchen Fan <wenc...@databricks.com> Committed: Mon May 22 22:25:19 2017 +0800 ---------------------------------------------------------------------- .../apache/spark/sql/catalyst/dsl/package.scala | 2 +- .../plans/logical/basicLogicalOperators.scala | 11 +- .../org/apache/spark/sql/internal/SQLConf.scala | 2 +- .../sql/catalyst/parser/PlanParserSuite.scala | 10 +- .../org/apache/spark/sql/DataFrameWriter.scala | 2 +- .../sql/execution/datasources/DataSource.scala | 1 + .../datasources/DataSourceStrategy.scala | 5 +- .../InsertIntoHadoopFsRelationCommand.scala | 18 +++- .../apache/spark/sql/hive/HiveStrategies.scala | 14 +-- .../CreateHiveTableAsSelectCommand.scala | 4 +- .../hive/execution/InsertIntoHiveTable.scala | 7 +- .../sql/hive/InsertIntoHiveTableSuite.scala | 108 ++++++++----------- 12 files changed, 90 insertions(+), 94 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/a5755327/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/dsl/package.scala ---------------------------------------------------------------------- diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/dsl/package.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/dsl/package.scala index 75bf780..ed423e7 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/dsl/package.scala +++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/dsl/package.scala @@ -366,7 +366,7 @@ package object dsl { def insertInto(tableName: String, overwrite: Boolean = false): LogicalPlan = InsertIntoTable( analysis.UnresolvedRelation(TableIdentifier(tableName)), - Map.empty, logicalPlan, overwrite, false) + Map.empty, logicalPlan, overwrite, ifPartitionNotExists = false) def as(alias: String): LogicalPlan = SubqueryAlias(alias, logicalPlan) http://git-wip-us.apache.org/repos/asf/spark/blob/a5755327/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala ---------------------------------------------------------------------- diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala index 2c19265..64c3649 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala @@ -410,17 +410,20 @@ case class Hint(name: String, parameters: Seq[String], child: LogicalPlan) exten * would have Map('a' -> Some('1'), 'b' -> None). * @param query the logical plan representing data to write to. * @param overwrite overwrite existing table or partitions. - * @param ifNotExists If true, only write if the table or partition does not exist. + * @param ifPartitionNotExists If true, only write if the partition does not exist. + * Only valid for static partitions. 
*/ case class InsertIntoTable( table: LogicalPlan, partition: Map[String, Option[String]], query: LogicalPlan, overwrite: Boolean, - ifNotExists: Boolean) + ifPartitionNotExists: Boolean) extends LogicalPlan { - assert(overwrite || !ifNotExists) - assert(partition.values.forall(_.nonEmpty) || !ifNotExists) + // IF NOT EXISTS is only valid in INSERT OVERWRITE + assert(overwrite || !ifPartitionNotExists) + // IF NOT EXISTS is only valid in static partitions + assert(partition.values.forall(_.nonEmpty) || !ifPartitionNotExists) // We don't want `table` in children as sometimes we don't want to transform it. override def children: Seq[LogicalPlan] = query :: Nil http://git-wip-us.apache.org/repos/asf/spark/blob/a5755327/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala ---------------------------------------------------------------------- diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala index b97adf7..c5d69c2 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala @@ -303,7 +303,7 @@ object SQLConf { val HIVE_MANAGE_FILESOURCE_PARTITIONS = buildConf("spark.sql.hive.manageFilesourcePartitions") .doc("When true, enable metastore partition management for file source tables as well. " + - "This includes both datasource and converted Hive tables. When partition managment " + + "This includes both datasource and converted Hive tables. 
When partition management " + "is enabled, datasource tables store partition in the Hive metastore, and use the " + "metastore to prune partitions during query planning.") .booleanConf http://git-wip-us.apache.org/repos/asf/spark/blob/a5755327/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/PlanParserSuite.scala ---------------------------------------------------------------------- diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/PlanParserSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/PlanParserSuite.scala index 411777d..fa42efa 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/PlanParserSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/PlanParserSuite.scala @@ -176,14 +176,14 @@ class PlanParserSuite extends PlanTest { def insert( partition: Map[String, Option[String]], overwrite: Boolean = false, - ifNotExists: Boolean = false): LogicalPlan = - InsertIntoTable(table("s"), partition, plan, overwrite, ifNotExists) + ifPartitionNotExists: Boolean = false): LogicalPlan = + InsertIntoTable(table("s"), partition, plan, overwrite, ifPartitionNotExists) // Single inserts assertEqual(s"insert overwrite table s $sql", insert(Map.empty, overwrite = true)) assertEqual(s"insert overwrite table s partition (e = 1) if not exists $sql", - insert(Map("e" -> Option("1")), overwrite = true, ifNotExists = true)) + insert(Map("e" -> Option("1")), overwrite = true, ifPartitionNotExists = true)) assertEqual(s"insert into s $sql", insert(Map.empty)) assertEqual(s"insert into table s partition (c = 'd', e = 1) $sql", @@ -193,9 +193,9 @@ class PlanParserSuite extends PlanTest { val plan2 = table("t").where('x > 5).select(star()) assertEqual("from t insert into s select * limit 1 insert into u select * where x > 5", InsertIntoTable( - table("s"), Map.empty, plan.limit(1), false, ifNotExists = false).union( + table("s"), Map.empty, plan.limit(1), false, 
ifPartitionNotExists = false).union( InsertIntoTable( - table("u"), Map.empty, plan2, false, ifNotExists = false))) + table("u"), Map.empty, plan2, false, ifPartitionNotExists = false))) } test ("insert with if not exists") { http://git-wip-us.apache.org/repos/asf/spark/blob/a5755327/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala index 1732a8e..b71c5eb 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala @@ -286,7 +286,7 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) { partition = Map.empty[String, Option[String]], query = df.logicalPlan, overwrite = mode == SaveMode.Overwrite, - ifNotExists = false) + ifPartitionNotExists = false) } } http://git-wip-us.apache.org/repos/asf/spark/blob/a5755327/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala index bb7d1f7..14c4060 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala @@ -430,6 +430,7 @@ case class DataSource( InsertIntoHadoopFsRelationCommand( outputPath = outputPath, staticPartitions = Map.empty, + ifPartitionNotExists = false, partitionColumns = partitionAttributes, bucketSpec = bucketSpec, fileFormat = format, 
http://git-wip-us.apache.org/repos/asf/spark/blob/a5755327/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala index d307122..21d75a4 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala @@ -142,8 +142,8 @@ case class DataSourceAnalysis(conf: SQLConf) extends Rule[LogicalPlan] with Cast parts, query, overwrite, false) if parts.isEmpty => InsertIntoDataSourceCommand(l, query, overwrite) - case InsertIntoTable( - l @ LogicalRelation(t: HadoopFsRelation, _, table), parts, query, overwrite, false) => + case i @ InsertIntoTable( + l @ LogicalRelation(t: HadoopFsRelation, _, table), parts, query, overwrite, _) => // If the InsertIntoTable command is for a partitioned HadoopFsRelation and // the user has specified static partitions, we add a Project operator on top of the query // to include those constant column values in the query result. 
@@ -195,6 +195,7 @@ case class DataSourceAnalysis(conf: SQLConf) extends Rule[LogicalPlan] with Cast InsertIntoHadoopFsRelationCommand( outputPath, staticPartitions, + i.ifPartitionNotExists, partitionSchema, t.bucketSpec, t.fileFormat, http://git-wip-us.apache.org/repos/asf/spark/blob/a5755327/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelationCommand.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelationCommand.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelationCommand.scala index 19b51d4..c9d3144 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelationCommand.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelationCommand.scala @@ -37,10 +37,13 @@ import org.apache.spark.sql.execution.command._ * overwrites: when the spec is empty, all partitions are overwritten. * When it covers a prefix of the partition keys, only partitions matching * the prefix are overwritten. + * @param ifPartitionNotExists If true, only write if the partition does not exist. + * Only valid for static partitions. 
*/ case class InsertIntoHadoopFsRelationCommand( outputPath: Path, staticPartitions: TablePartitionSpec, + ifPartitionNotExists: Boolean, partitionColumns: Seq[Attribute], bucketSpec: Option[BucketSpec], fileFormat: FileFormat, @@ -61,8 +64,8 @@ case class InsertIntoHadoopFsRelationCommand( val duplicateColumns = query.schema.fieldNames.groupBy(identity).collect { case (x, ys) if ys.length > 1 => "\"" + x + "\"" }.mkString(", ") - throw new AnalysisException(s"Duplicate column(s) : $duplicateColumns found, " + - s"cannot save to file.") + throw new AnalysisException(s"Duplicate column(s): $duplicateColumns found, " + + "cannot save to file.") } val hadoopConf = sparkSession.sessionState.newHadoopConfWithOptions(options) @@ -76,11 +79,12 @@ case class InsertIntoHadoopFsRelationCommand( var initialMatchingPartitions: Seq[TablePartitionSpec] = Nil var customPartitionLocations: Map[TablePartitionSpec, String] = Map.empty + var matchingPartitions: Seq[CatalogTablePartition] = Seq.empty // When partitions are tracked by the catalog, compute all custom partition locations that // may be relevant to the insertion job. 
if (partitionsTrackedByCatalog) { - val matchingPartitions = sparkSession.sessionState.catalog.listPartitions( + matchingPartitions = sparkSession.sessionState.catalog.listPartitions( catalogTable.get.identifier, Some(staticPartitions)) initialMatchingPartitions = matchingPartitions.map(_.spec) customPartitionLocations = getCustomPartitionLocations( @@ -101,8 +105,12 @@ case class InsertIntoHadoopFsRelationCommand( case (SaveMode.ErrorIfExists, true) => throw new AnalysisException(s"path $qualifiedOutputPath already exists.") case (SaveMode.Overwrite, true) => - deleteMatchingPartitions(fs, qualifiedOutputPath, customPartitionLocations, committer) - true + if (ifPartitionNotExists && matchingPartitions.nonEmpty) { + false + } else { + deleteMatchingPartitions(fs, qualifiedOutputPath, customPartitionLocations, committer) + true + } case (SaveMode.Append, _) | (SaveMode.Overwrite, _) | (SaveMode.ErrorIfExists, false) => true case (SaveMode.Ignore, exists) => http://git-wip-us.apache.org/repos/asf/spark/blob/a5755327/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala ---------------------------------------------------------------------- diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala index 09a5eda..4f090d5 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala @@ -160,9 +160,9 @@ class DetermineTableStats(session: SparkSession) extends Rule[LogicalPlan] { */ object HiveAnalysis extends Rule[LogicalPlan] { override def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperators { - case InsertIntoTable(relation: CatalogRelation, partSpec, query, overwrite, ifNotExists) - if DDLUtils.isHiveTable(relation.tableMeta) => - InsertIntoHiveTable(relation.tableMeta, partSpec, query, overwrite, ifNotExists) + case InsertIntoTable(r: 
CatalogRelation, partSpec, query, overwrite, ifPartitionNotExists) + if DDLUtils.isHiveTable(r.tableMeta) => + InsertIntoHiveTable(r.tableMeta, partSpec, query, overwrite, ifPartitionNotExists) case CreateTable(tableDesc, mode, None) if DDLUtils.isHiveTable(tableDesc) => CreateTableCommand(tableDesc, ignoreIfExists = mode == SaveMode.Ignore) @@ -207,11 +207,11 @@ case class RelationConversions( override def apply(plan: LogicalPlan): LogicalPlan = { plan transformUp { // Write path - case InsertIntoTable(r: CatalogRelation, partition, query, overwrite, ifNotExists) + case InsertIntoTable(r: CatalogRelation, partition, query, overwrite, ifPartitionNotExists) // Inserting into partitioned table is not supported in Parquet/Orc data source (yet). - if query.resolved && DDLUtils.isHiveTable(r.tableMeta) && - !r.isPartitioned && isConvertible(r) => - InsertIntoTable(convert(r), partition, query, overwrite, ifNotExists) + if query.resolved && DDLUtils.isHiveTable(r.tableMeta) && + !r.isPartitioned && isConvertible(r) => + InsertIntoTable(convert(r), partition, query, overwrite, ifPartitionNotExists) // Read path case relation: CatalogRelation http://git-wip-us.apache.org/repos/asf/spark/blob/a5755327/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateHiveTableAsSelectCommand.scala ---------------------------------------------------------------------- diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateHiveTableAsSelectCommand.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateHiveTableAsSelectCommand.scala index 41c6b18..65e8b4e 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateHiveTableAsSelectCommand.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateHiveTableAsSelectCommand.scala @@ -62,7 +62,7 @@ case class CreateHiveTableAsSelectCommand( Map(), query, overwrite = false, - ifNotExists = false)).toRdd + ifPartitionNotExists = false)).toRdd } else { // 
TODO ideally, we should get the output data ready first and then // add the relation into catalog, just in case of failure occurs while data @@ -78,7 +78,7 @@ case class CreateHiveTableAsSelectCommand( Map(), query, overwrite = true, - ifNotExists = false)).toRdd + ifPartitionNotExists = false)).toRdd } catch { case NonFatal(e) => // drop the created table. http://git-wip-us.apache.org/repos/asf/spark/blob/a5755327/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala ---------------------------------------------------------------------- diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala index 3facf9f..35f65e9 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala @@ -71,14 +71,15 @@ import org.apache.spark.SparkException * }}}. * @param query the logical plan representing data to write to. * @param overwrite overwrite existing table or partitions. - * @param ifNotExists If true, only write if the table or partition does not exist. + * @param ifPartitionNotExists If true, only write if the partition does not exist. + * Only valid for static partitions. */ case class InsertIntoHiveTable( table: CatalogTable, partition: Map[String, Option[String]], query: LogicalPlan, overwrite: Boolean, - ifNotExists: Boolean) extends RunnableCommand { + ifPartitionNotExists: Boolean) extends RunnableCommand { override protected def innerChildren: Seq[LogicalPlan] = query :: Nil @@ -354,7 +355,7 @@ case class InsertIntoHiveTable( var doHiveOverwrite = overwrite - if (oldPart.isEmpty || !ifNotExists) { + if (oldPart.isEmpty || !ifPartitionNotExists) { // SPARK-18107: Insert overwrite runs much slower than hive-client. // Newer Hive largely improves insert overwrite performance. 
As Spark uses older Hive // version and we may not want to catch up new Hive version every time. We delete the http://git-wip-us.apache.org/repos/asf/spark/blob/a5755327/sql/hive/src/test/scala/org/apache/spark/sql/hive/InsertIntoHiveTableSuite.scala ---------------------------------------------------------------------- diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/InsertIntoHiveTableSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/InsertIntoHiveTableSuite.scala index 2c724f8..58ab0c2 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/InsertIntoHiveTableSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/InsertIntoHiveTableSuite.scala @@ -166,72 +166,54 @@ class InsertIntoHiveTableSuite extends QueryTest with TestHiveSingleton with Bef sql("DROP TABLE tmp_table") } - test("INSERT OVERWRITE - partition IF NOT EXISTS") { - withTempDir { tmpDir => - val table = "table_with_partition" - withTable(table) { - val selQuery = s"select c1, p1, p2 from $table" - sql( - s""" - |CREATE TABLE $table(c1 string) - |PARTITIONED by (p1 string,p2 string) - |location '${tmpDir.toURI.toString}' - """.stripMargin) - sql( - s""" - |INSERT OVERWRITE TABLE $table - |partition (p1='a',p2='b') - |SELECT 'blarr' - """.stripMargin) - checkAnswer( - sql(selQuery), - Row("blarr", "a", "b")) - - sql( - s""" - |INSERT OVERWRITE TABLE $table - |partition (p1='a',p2='b') - |SELECT 'blarr2' - """.stripMargin) - checkAnswer( - sql(selQuery), - Row("blarr2", "a", "b")) + testPartitionedTable("INSERT OVERWRITE - partition IF NOT EXISTS") { tableName => + val selQuery = s"select a, b, c, d from $tableName" + sql( + s""" + |INSERT OVERWRITE TABLE $tableName + |partition (b=2, c=3) + |SELECT 1, 4 + """.stripMargin) + checkAnswer(sql(selQuery), Row(1, 2, 3, 4)) - var e = intercept[AnalysisException] { - sql( - s""" - |INSERT OVERWRITE TABLE $table - |partition (p1='a',p2) IF NOT EXISTS - |SELECT 'blarr3', 'newPartition' - """.stripMargin) - } - 
assert(e.getMessage.contains( - "Dynamic partitions do not support IF NOT EXISTS. Specified partitions with value: [p2]")) + sql( + s""" + |INSERT OVERWRITE TABLE $tableName + |partition (b=2, c=3) + |SELECT 5, 6 + """.stripMargin) + checkAnswer(sql(selQuery), Row(5, 2, 3, 6)) + + val e = intercept[AnalysisException] { + sql( + s""" + |INSERT OVERWRITE TABLE $tableName + |partition (b=2, c) IF NOT EXISTS + |SELECT 7, 8, 3 + """.stripMargin) + } + assert(e.getMessage.contains( + "Dynamic partitions do not support IF NOT EXISTS. Specified partitions with value: [c]")) - e = intercept[AnalysisException] { - sql( - s""" - |INSERT OVERWRITE TABLE $table - |partition (p1='a',p2) IF NOT EXISTS - |SELECT 'blarr3', 'b' - """.stripMargin) - } - assert(e.getMessage.contains( - "Dynamic partitions do not support IF NOT EXISTS. Specified partitions with value: [p2]")) + // If the partition already exists, the insert will overwrite the data + // unless users specify IF NOT EXISTS + sql( + s""" + |INSERT OVERWRITE TABLE $tableName + |partition (b=2, c=3) IF NOT EXISTS + |SELECT 9, 10 + """.stripMargin) + checkAnswer(sql(selQuery), Row(5, 2, 3, 6)) - // If the partition already exists, the insert will overwrite the data - // unless users specify IF NOT EXISTS - sql( - s""" - |INSERT OVERWRITE TABLE $table - |partition (p1='a',p2='b') IF NOT EXISTS - |SELECT 'blarr3' - """.stripMargin) - checkAnswer( - sql(selQuery), - Row("blarr2", "a", "b")) - } - } + // ADD PARTITION has the same effect, even if no actual data is inserted. 
+ sql(s"ALTER TABLE $tableName ADD PARTITION (b=21, c=31)") + sql( + s""" + |INSERT OVERWRITE TABLE $tableName + |partition (b=21, c=31) IF NOT EXISTS + |SELECT 20, 24 + """.stripMargin) + checkAnswer(sql(selQuery), Row(5, 2, 3, 6)) } test("Insert ArrayType.containsNull == false") { --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org