Github user wangyum commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19841#discussion_r154480032

--- Diff: sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala ---
@@ -104,147 +105,153 @@ case class InsertIntoHiveTable(
     val partitionColumns = fileSinkConf.getTableInfo.getProperties.getProperty("partition_columns")
     val partitionColumnNames = Option(partitionColumns).map(_.split("/")).getOrElse(Array.empty)

-    // By this time, the partition map must match the table's partition columns
-    if (partitionColumnNames.toSet != partition.keySet) {
-      throw new SparkException(
-        s"""Requested partitioning does not match the ${table.identifier.table} table:
-           |Requested partitions: ${partition.keys.mkString(",")}
-           |Table partitions: ${table.partitionColumnNames.mkString(",")}""".stripMargin)
-    }
-
-    // Validate partition spec if there exist any dynamic partitions
-    if (numDynamicPartitions > 0) {
-      // Report error if dynamic partitioning is not enabled
-      if (!hadoopConf.get("hive.exec.dynamic.partition", "true").toBoolean) {
-        throw new SparkException(ErrorMsg.DYNAMIC_PARTITION_DISABLED.getMsg)
+    def processInsert = {
+      // By this time, the partition map must match the table's partition columns
+      if (partitionColumnNames.toSet != partition.keySet) {
+        throw new SparkException(
+          s"""Requested partitioning does not match the ${table.identifier.table} table:
+             |Requested partitions: ${partition.keys.mkString(",")}
+             |Table partitions: ${table.partitionColumnNames.mkString(",")}""".stripMargin)
       }

-      // Report error if dynamic partition strict mode is on but no static partition is found
-      if (numStaticPartitions == 0 &&
-        hadoopConf.get("hive.exec.dynamic.partition.mode", "strict").equalsIgnoreCase("strict")) {
-        throw new SparkException(ErrorMsg.DYNAMIC_PARTITION_STRICT_MODE.getMsg)
-      }
+      // Validate partition spec if there exist any dynamic partitions
+      if (numDynamicPartitions > 0) {
+        // Report error if dynamic partitioning is not enabled
+        if (!hadoopConf.get("hive.exec.dynamic.partition", "true").toBoolean) {
+          throw new SparkException(ErrorMsg.DYNAMIC_PARTITION_DISABLED.getMsg)
+        }
+
+        // Report error if dynamic partition strict mode is on but no static partition is found
+        if (numStaticPartitions == 0 &&
+          hadoopConf.get("hive.exec.dynamic.partition.mode", "strict").equalsIgnoreCase("strict")) {
+          throw new SparkException(ErrorMsg.DYNAMIC_PARTITION_STRICT_MODE.getMsg)
+        }

-      // Report error if any static partition appears after a dynamic partition
-      val isDynamic = partitionColumnNames.map(partitionSpec(_).isEmpty)
-      if (isDynamic.init.zip(isDynamic.tail).contains((true, false))) {
-        throw new AnalysisException(ErrorMsg.PARTITION_DYN_STA_ORDER.getMsg)
+        // Report error if any static partition appears after a dynamic partition
+        val isDynamic = partitionColumnNames.map(partitionSpec(_).isEmpty)
+        if (isDynamic.init.zip(isDynamic.tail).contains((true, false))) {
+          throw new AnalysisException(ErrorMsg.PARTITION_DYN_STA_ORDER.getMsg)
+        }
       }
-    }

-    table.bucketSpec match {
-      case Some(bucketSpec) =>
-        // Writes to bucketed hive tables are allowed only if user does not care about maintaining
-        // table's bucketing ie. both "hive.enforce.bucketing" and "hive.enforce.sorting" are
-        // set to false
-        val enforceBucketingConfig = "hive.enforce.bucketing"
-        val enforceSortingConfig = "hive.enforce.sorting"
+      table.bucketSpec match {
+        case Some(bucketSpec) =>
+          // Writes to bucketed hive tables are allowed only if user does not care about maintaining
+          // table's bucketing ie. both "hive.enforce.bucketing" and "hive.enforce.sorting" are
+          // set to false
+          val enforceBucketingConfig = "hive.enforce.bucketing"
+          val enforceSortingConfig = "hive.enforce.sorting"

-        val message = s"Output Hive table ${table.identifier} is bucketed but Spark" +
-          "currently does NOT populate bucketed output which is compatible with Hive."
+          val message = s"Output Hive table ${table.identifier} is bucketed but Spark" +
+            "currently does NOT populate bucketed output which is compatible with Hive."

-        if (hadoopConf.get(enforceBucketingConfig, "true").toBoolean ||
-          hadoopConf.get(enforceSortingConfig, "true").toBoolean) {
-          throw new AnalysisException(message)
-        } else {
-          logWarning(message + s" Inserting data anyways since both $enforceBucketingConfig and " +
-            s"$enforceSortingConfig are set to false.")
-        }
-      case _ => // do nothing since table has no bucketing
-    }
+          if (hadoopConf.get(enforceBucketingConfig, "true").toBoolean ||
+            hadoopConf.get(enforceSortingConfig, "true").toBoolean) {
+            throw new AnalysisException(message)
+          } else {
+            logWarning(message + s" Inserting data anyways since both $enforceBucketingConfig and " +
+              s"$enforceSortingConfig are set to false.")
+          }
+        case _ => // do nothing since table has no bucketing
+      }

-    val partitionAttributes = partitionColumnNames.takeRight(numDynamicPartitions).map { name =>
-      query.resolve(name :: Nil, sparkSession.sessionState.analyzer.resolver).getOrElse {
-        throw new AnalysisException(
-          s"Unable to resolve $name given [${query.output.map(_.name).mkString(", ")}]")
-      }.asInstanceOf[Attribute]
-    }
+      val partitionAttributes = partitionColumnNames.takeRight(numDynamicPartitions).map { name =>
+        query.resolve(name :: Nil, sparkSession.sessionState.analyzer.resolver).getOrElse {
+          throw new AnalysisException(
+            s"Unable to resolve $name given [${query.output.map(_.name).mkString(", ")}]")
+        }.asInstanceOf[Attribute]
+      }

-    saveAsHiveFile(
-      sparkSession = sparkSession,
-      queryExecution = Dataset.ofRows(sparkSession, query).queryExecution,
-      hadoopConf = hadoopConf,
-      fileSinkConf = fileSinkConf,
-      outputLocation = tmpLocation.toString,
-      partitionAttributes = partitionAttributes)
+      saveAsHiveFile(
+        sparkSession = sparkSession,
+        queryExecution = Dataset.ofRows(sparkSession, query).queryExecution,
+        hadoopConf = hadoopConf,
+        fileSinkConf = fileSinkConf,
+        outputLocation = tmpLocation.toString,
+        partitionAttributes = partitionAttributes)

-    if (partition.nonEmpty) {
-      if (numDynamicPartitions > 0) {
-        externalCatalog.loadDynamicPartitions(
-          db = table.database,
-          table = table.identifier.table,
-          tmpLocation.toString,
-          partitionSpec,
-          overwrite,
-          numDynamicPartitions)
-      } else {
-        // scalastyle:off
-        // ifNotExists is only valid with static partition, refer to
-        // https://cwiki.apache.org/confluence/display/Hive/LanguageManual+DML#LanguageManualDML-InsertingdataintoHiveTablesfromqueries
-        // scalastyle:on
-        val oldPart =
+      if (partition.nonEmpty) {
+        if (numDynamicPartitions > 0) {
+          externalCatalog.loadDynamicPartitions(
+            db = table.database,
+            table = table.identifier.table,
+            tmpLocation.toString,
+            partitionSpec,
+            overwrite,
+            numDynamicPartitions)
+        } else {
+          // scalastyle:off
+          // ifNotExists is only valid with static partition, refer to
+          // https://cwiki.apache.org/confluence/display/Hive/LanguageManual+DML#LanguageManualDML-InsertingdataintoHiveTablesfromqueries
+          // scalastyle:on
+          val oldPart =
           externalCatalog.getPartitionOption(
             table.database,
             table.identifier.table,
             partitionSpec)

-        var doHiveOverwrite = overwrite
-
-        if (oldPart.isEmpty || !ifPartitionNotExists) {
-          // SPARK-18107: Insert overwrite runs much slower than hive-client.
-          // Newer Hive largely improves insert overwrite performance. As Spark uses older Hive
-          // version and we may not want to catch up new Hive version every time. We delete the
-          // Hive partition first and then load data file into the Hive partition.
-          if (oldPart.nonEmpty && overwrite) {
-            oldPart.get.storage.locationUri.foreach { uri =>
-              val partitionPath = new Path(uri)
-              val fs = partitionPath.getFileSystem(hadoopConf)
-              if (fs.exists(partitionPath)) {
-                if (!fs.delete(partitionPath, true)) {
-                  throw new RuntimeException(
-                    "Cannot remove partition directory '" + partitionPath.toString)
+          var doHiveOverwrite = overwrite
+
+          if (oldPart.isEmpty || !ifPartitionNotExists) {
+            // SPARK-18107: Insert overwrite runs much slower than hive-client.
+            // Newer Hive largely improves insert overwrite performance. As Spark uses older Hive
+            // version and we may not want to catch up new Hive version every time. We delete the
+            // Hive partition first and then load data file into the Hive partition.
+            if (oldPart.nonEmpty && overwrite) {
+              oldPart.get.storage.locationUri.foreach { uri =>
+                val partitionPath = new Path(uri)
+                val fs = partitionPath.getFileSystem(hadoopConf)
+                if (fs.exists(partitionPath)) {
+                  if (!fs.delete(partitionPath, true)) {
+                    throw new RuntimeException(
+                      "Cannot remove partition directory '" + partitionPath.toString)
+                  }
+                  // Don't let Hive do overwrite operation since it is slower.
+                  doHiveOverwrite = false
                 }
-                // Don't let Hive do overwrite operation since it is slower.
-                doHiveOverwrite = false
               }
             }
-          }

-          // inheritTableSpecs is set to true. It should be set to false for an IMPORT query
-          // which is currently considered as a Hive native command.
-          val inheritTableSpecs = true
-          externalCatalog.loadPartition(
-            table.database,
-            table.identifier.table,
-            tmpLocation.toString,
-            partitionSpec,
-            isOverwrite = doHiveOverwrite,
-            inheritTableSpecs = inheritTableSpecs,
-            isSrcLocal = false)
+            // inheritTableSpecs is set to true. It should be set to false for an IMPORT query
--- End diff --

The number of spaces is different
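
For readers following along: the comment most likely refers to the "val oldPart =" line near the end of the hunk. It is re-indented together with the rest of the new processInsert body, but its continuation lines (externalCatalog.getPartitionOption( and its arguments) are unchanged context lines in the diff, so they no longer sit one level deeper than the definition.

On the change itself, the hunk only wraps the existing body in a local "def processInsert" and re-indents it; the call site is outside the displayed lines. The usual motivation for this kind of refactoring is to run the whole insert under try/finally so the temporary output location is cleaned up even when a step throws. Below is a minimal, self-contained sketch of that pattern; tmpLocation, processInsert, and the simulated failure are illustrative stand-ins, not code from the PR:

    import java.nio.file.{Files, Path}

    object CleanupOnFailure extends App {
      // Illustrative stand-in for the insert's temporary output directory.
      val tmpLocation: Path = Files.createTempDirectory("hive-insert-")

      def processInsert(): Unit = {
        // Validation and file writes would happen here; any step may throw.
        throw new RuntimeException("simulated failure")
      }

      try processInsert()
      catch { case e: RuntimeException => println(s"insert failed: ${e.getMessage}") }
      finally {
        // Runs whether or not processInsert threw, so the scratch directory never leaks.
        Files.deleteIfExists(tmpLocation)
      }
    }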
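
Separately, the densest line in the validation block is the ordering check isDynamic.init.zip(isDynamic.tail).contains((true, false)). Here is a self-contained sketch of the same idiom; the column names and spec values are made up, with an empty string standing for a dynamic partition:

    object PartitionOrderCheck extends App {
      // Hypothetical spec: "year" and "day" are static, "month" is dynamic.
      val partitionSpec = Map("year" -> "2017", "month" -> "", "day" -> "01")
      val partitionColumnNames = Seq("year", "month", "day")

      // true marks a dynamic partition (no value supplied).
      val isDynamic = partitionColumnNames.map(partitionSpec(_).isEmpty)

      // Pairing every flag with its successor exposes any dynamic column that is
      // immediately followed by a static one.
      if (isDynamic.init.zip(isDynamic.tail).contains((true, false))) {
        println("invalid: static partition appears after a dynamic partition")
      }
    }

One pass over adjacent pairs suffices: if any static column sits anywhere to the right of a dynamic one, the last dynamic column before it is immediately followed by a static column, so a (true, false) pair must occur.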
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org