This is an automated email from the ASF dual-hosted git repository. wenchen pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push: new 27e20fe9eb1 [SPARK-41708][SQL] Pull v1write information to `WriteFiles` 27e20fe9eb1 is described below commit 27e20fe9eb1b1ef1b3d32e180de55931f31fc345 Author: ulysses-you <ulyssesyo...@gmail.com> AuthorDate: Fri Jan 6 12:13:30 2023 +0800 [SPARK-41708][SQL] Pull v1write information to `WriteFiles` ### What changes were proposed in this pull request? This PR moves the v1 write information out of `V1WriteCommand` and into `WriteFiles`: ```scala case class WriteFiles(child: LogicalPlan) => case class WriteFiles( child: LogicalPlan, fileFormat: FileFormat, partitionColumns: Seq[Attribute], bucketSpec: Option[BucketSpec], options: Map[String, String], staticPartitions: TablePartitionSpec) ``` It also removes the `WriteSpec` interface, which is no longer necessary: `SparkPlan.executeWrite` now takes a `WriteFilesSpec` directly. ### Why are the changes needed? After this PR, `WriteFiles` (and its physical counterpart `WriteFilesExec`) holds the write information, namely the file format, partition columns, bucket spec, options and static partitions. Developers can therefore read it directly from the plan. ### Does this PR introduce _any_ user-facing change? No. ### How was this patch tested? Passed CI. Closes #39277 from ulysses-you/SPARK-41708. 
Authored-by: ulysses-you <ulyssesyo...@gmail.com> Signed-off-by: Wenchen Fan <wenc...@databricks.com> --- .../org/apache/spark/sql/internal/WriteSpec.java | 33 --- .../org/apache/spark/sql/execution/SparkPlan.scala | 9 +- .../spark/sql/execution/SparkStrategies.scala | 5 +- .../spark/sql/execution/datasources/V1Writes.scala | 24 ++- .../sql/execution/datasources/WriteFiles.scala | 26 ++- .../org/apache/spark/sql/hive/HiveStrategies.scala | 3 +- .../{SaveAsHiveFile.scala => HiveTempPath.scala} | 204 ++++++------------- .../hive/execution/InsertIntoHiveDirCommand.scala | 13 +- .../sql/hive/execution/InsertIntoHiveTable.scala | 88 +++++--- .../spark/sql/hive/execution/SaveAsHiveFile.scala | 221 +-------------------- .../sql/hive/execution/V1WritesHiveUtils.scala | 33 ++- .../org/apache/spark/sql/hive/InsertSuite.scala | 15 +- 12 files changed, 224 insertions(+), 450 deletions(-) diff --git a/sql/core/src/main/java/org/apache/spark/sql/internal/WriteSpec.java b/sql/core/src/main/java/org/apache/spark/sql/internal/WriteSpec.java deleted file mode 100644 index c51a3ed7dc6..00000000000 --- a/sql/core/src/main/java/org/apache/spark/sql/internal/WriteSpec.java +++ /dev/null @@ -1,33 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
- * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.sql.internal; - -import java.io.Serializable; - -/** - * Write spec is a input parameter of - * {@link org.apache.spark.sql.execution.SparkPlan#executeWrite}. - * - * <p> - * This is an empty interface, the concrete class which implements - * {@link org.apache.spark.sql.execution.SparkPlan#doExecuteWrite} - * should define its own class and use it. - * - * @since 3.4.0 - */ -public interface WriteSpec extends Serializable {} diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala index 401302e5bde..5ca36a8a216 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala @@ -36,8 +36,9 @@ import org.apache.spark.sql.catalyst.plans.physical._ import org.apache.spark.sql.catalyst.trees.{BinaryLike, LeafLike, TreeNodeTag, UnaryLike} import org.apache.spark.sql.connector.write.WriterCommitMessage import org.apache.spark.sql.errors.QueryExecutionErrors +import org.apache.spark.sql.execution.datasources.WriteFilesSpec import org.apache.spark.sql.execution.metric.SQLMetric -import org.apache.spark.sql.internal.{SQLConf, WriteSpec} +import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.vectorized.ColumnarBatch import org.apache.spark.util.NextIterator import org.apache.spark.util.io.{ChunkedByteBuffer, ChunkedByteBufferOutputStream} @@ -230,11 +231,11 @@ abstract class SparkPlan extends QueryPlan[SparkPlan] with Logging with Serializ * * Concrete implementations of SparkPlan should override `doExecuteWrite`. 
*/ - def executeWrite(writeSpec: WriteSpec): RDD[WriterCommitMessage] = executeQuery { + def executeWrite(writeFilesSpec: WriteFilesSpec): RDD[WriterCommitMessage] = executeQuery { if (isCanonicalizedPlan) { throw SparkException.internalError("A canonicalized plan is not supposed to be executed.") } - doExecuteWrite(writeSpec) + doExecuteWrite(writeFilesSpec) } /** @@ -343,7 +344,7 @@ abstract class SparkPlan extends QueryPlan[SparkPlan] with Logging with Serializ * * Overridden by concrete implementations of SparkPlan. */ - protected def doExecuteWrite(writeSpec: WriteSpec): RDD[WriterCommitMessage] = { + protected def doExecuteWrite(writeFilesSpec: WriteFilesSpec): RDD[WriterCommitMessage] = { throw SparkException.internalError(s"Internal Error ${this.getClass} has write support" + s" mismatch:\n${this}") } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala index 51a0c837c3e..110fe45cc12 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala @@ -895,8 +895,9 @@ abstract class SparkStrategies extends QueryPlanner[SparkPlan] { throw QueryExecutionErrors.ddlUnsupportedTemporarilyError("MERGE INTO TABLE") case logical.CollectMetrics(name, metrics, child) => execution.CollectMetricsExec(name, metrics, planLater(child)) :: Nil - case WriteFiles(child) => - WriteFilesExec(planLater(child)) :: Nil + case WriteFiles(child, fileFormat, partitionColumns, bucket, options, staticPartitions) => + WriteFilesExec(planLater(child), fileFormat, partitionColumns, bucket, options, + staticPartitions) :: Nil case _ => Nil } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/V1Writes.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/V1Writes.scala index 3ed04e5bd6d..7b4fa7ad80b 100644 --- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/V1Writes.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/V1Writes.scala @@ -19,6 +19,7 @@ package org.apache.spark.sql.execution.datasources import org.apache.spark.sql.catalyst.SQLConfHelper import org.apache.spark.sql.catalyst.catalog.BucketSpec +import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec import org.apache.spark.sql.catalyst.expressions.{Alias, Ascending, Attribute, AttributeMap, AttributeSet, BitwiseAnd, Expression, HiveHash, Literal, NamedExpression, Pmod, SortOrder, String2StringExpression, UnaryExpression} import org.apache.spark.sql.catalyst.expressions.codegen.{CodegenContext, ExprCode} import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Project, Sort} @@ -31,11 +32,31 @@ import org.apache.spark.sql.types.StringType import org.apache.spark.unsafe.types.UTF8String trait V1WriteCommand extends DataWritingCommand { + /** + * Specify the [[FileFormat]] of the provider of V1 write command. + */ + def fileFormat: FileFormat + /** * Specify the partition columns of the V1 write command. */ def partitionColumns: Seq[Attribute] + /** + * Specify the partition spec of the V1 write command. + */ + def staticPartitions: TablePartitionSpec + + /** + * Specify the bucket spec of the V1 write command. + */ + def bucketSpec: Option[BucketSpec] + + /** + * Specify the storage options of the V1 write command. + */ + def options: Map[String, String] + /** * Specify the required ordering for the V1 write command. `FileFormatWriter` will * add SortExec if necessary when the requiredOrdering is empty. 
@@ -56,7 +77,8 @@ object V1Writes extends Rule[LogicalPlan] with SQLConfHelper { case write: V1WriteCommand if !write.child.isInstanceOf[WriteFiles] => val newQuery = prepareQuery(write, write.query) val attrMap = AttributeMap(write.query.output.zip(newQuery.output)) - val newChild = WriteFiles(newQuery) + val newChild = WriteFiles(newQuery, write.fileFormat, write.partitionColumns, + write.bucketSpec, write.options, write.staticPartitions) val newWrite = write.withNewChildren(newChild :: Nil).transformExpressions { case a: Attribute if attrMap.contains(a) => a.withExprId(attrMap(a).exprId) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/WriteFiles.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/WriteFiles.scala index 5bc8f9db32b..53d94704471 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/WriteFiles.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/WriteFiles.scala @@ -23,12 +23,13 @@ import org.apache.spark.{SparkException, TaskContext} import org.apache.spark.internal.io.{FileCommitProtocol, SparkHadoopWriterUtils} import org.apache.spark.rdd.RDD import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.catalog.BucketSpec +import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec import org.apache.spark.sql.catalyst.expressions.Attribute import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, UnaryNode} import org.apache.spark.sql.connector.write.WriterCommitMessage import org.apache.spark.sql.execution.{SparkPlan, UnaryExecNode} import org.apache.spark.sql.execution.datasources.FileFormatWriter.ConcurrentOutputWriterSpec -import org.apache.spark.sql.internal.WriteSpec /** * The write files spec holds all information of [[V1WriteCommand]] if its provider is @@ -38,13 +39,18 @@ case class WriteFilesSpec( description: WriteJobDescription, committer: FileCommitProtocol, 
concurrentOutputWriterSpecFunc: SparkPlan => Option[ConcurrentOutputWriterSpec]) - extends WriteSpec /** * During Optimizer, [[V1Writes]] injects the [[WriteFiles]] between [[V1WriteCommand]] and query. * [[WriteFiles]] must be the root plan as the child of [[V1WriteCommand]]. */ -case class WriteFiles(child: LogicalPlan) extends UnaryNode { +case class WriteFiles( + child: LogicalPlan, + fileFormat: FileFormat, + partitionColumns: Seq[Attribute], + bucketSpec: Option[BucketSpec], + options: Map[String, String], + staticPartitions: TablePartitionSpec) extends UnaryNode { override def output: Seq[Attribute] = child.output override protected def withNewChildInternal(newChild: LogicalPlan): WriteFiles = copy(child = newChild) @@ -53,13 +59,17 @@ case class WriteFiles(child: LogicalPlan) extends UnaryNode { /** * Responsible for writing files. */ -case class WriteFilesExec(child: SparkPlan) extends UnaryExecNode { +case class WriteFilesExec( + child: SparkPlan, + fileFormat: FileFormat, + partitionColumns: Seq[Attribute], + bucketSpec: Option[BucketSpec], + options: Map[String, String], + staticPartitions: TablePartitionSpec) extends UnaryExecNode { override def output: Seq[Attribute] = Seq.empty - override protected def doExecuteWrite(writeSpec: WriteSpec): RDD[WriterCommitMessage] = { - assert(writeSpec.isInstanceOf[WriteFilesSpec]) - val writeFilesSpec: WriteFilesSpec = writeSpec.asInstanceOf[WriteFilesSpec] - + override protected def doExecuteWrite( + writeFilesSpec: WriteFilesSpec): RDD[WriterCommitMessage] = { val rdd = child.execute() // SPARK-23271 If we are attempting to write a zero partition rdd, create a dummy single // partition rdd to make sure we at least set up one write task to write the metadata. 
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala index af727f966e5..6c5646a2416 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala @@ -238,7 +238,8 @@ case class RelationConversions( // that only matches table insertion inside Hive CTAS. // This pattern would not cause conflicts because this rule is always applied before // `HiveAnalysis` and both of these rules are running once. - case InsertIntoHiveTable(tableDesc, _, query, overwrite, ifPartitionNotExists, _) + case InsertIntoHiveTable( + tableDesc, _, query, overwrite, ifPartitionNotExists, _, _, _, _, _, _) if query.resolved && DDLUtils.isHiveTable(tableDesc) && tableDesc.partitionColumnNames.isEmpty && isConvertible(tableDesc) && conf.getConf(HiveUtils.CONVERT_METASTORE_CTAS) => diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/SaveAsHiveFile.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveTempPath.scala similarity index 54% copy from sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/SaveAsHiveFile.scala copy to sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveTempPath.scala index 799cea42e1e..9981ae4cc31 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/SaveAsHiveFile.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveTempPath.scala @@ -29,83 +29,19 @@ import org.apache.hadoop.fs.{FileSystem, Path} import org.apache.hadoop.hive.common.FileUtils import org.apache.hadoop.hive.ql.exec.TaskRunner -import org.apache.spark.internal.io.FileCommitProtocol +import org.apache.spark.internal.Logging import org.apache.spark.sql.SparkSession -import org.apache.spark.sql.catalyst.catalog.BucketSpec -import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec -import 
org.apache.spark.sql.catalyst.expressions.Attribute import org.apache.spark.sql.errors.QueryExecutionErrors -import org.apache.spark.sql.execution.SparkPlan -import org.apache.spark.sql.execution.command.DataWritingCommand -import org.apache.spark.sql.execution.datasources.FileFormatWriter import org.apache.spark.sql.hive.HiveExternalCatalog -import org.apache.spark.sql.hive.HiveShim.{ShimFileSinkDesc => FileSinkDesc} import org.apache.spark.sql.hive.client.HiveVersion -// Base trait from which all hive insert statement physical execution extends. -private[hive] trait SaveAsHiveFile extends DataWritingCommand with V1WritesHiveUtils { +class HiveTempPath(session: SparkSession, val hadoopConf: Configuration, path: Path) + extends Logging { + private var stagingDirForCreating: Option[Path] = None - var createdTempDir: Option[Path] = None + lazy val externalTempPath: Path = getExternalTmpPath(path) - protected def saveAsHiveFile( - sparkSession: SparkSession, - plan: SparkPlan, - hadoopConf: Configuration, - fileSinkConf: FileSinkDesc, - outputLocation: String, - customPartitionLocations: Map[TablePartitionSpec, String] = Map.empty, - partitionAttributes: Seq[Attribute] = Nil, - bucketSpec: Option[BucketSpec] = None): Set[String] = { - - val isCompressed = - fileSinkConf.getTableInfo.getOutputFileFormatClassName.toLowerCase(Locale.ROOT) match { - case formatName if formatName.endsWith("orcoutputformat") => - // For ORC,"mapreduce.output.fileoutputformat.compress", - // "mapreduce.output.fileoutputformat.compress.codec", and - // "mapreduce.output.fileoutputformat.compress.type" - // have no impact because it uses table properties to store compression information. 
- false - case _ => hadoopConf.get("hive.exec.compress.output", "false").toBoolean - } - - if (isCompressed) { - hadoopConf.set("mapreduce.output.fileoutputformat.compress", "true") - fileSinkConf.setCompressed(true) - fileSinkConf.setCompressCodec(hadoopConf - .get("mapreduce.output.fileoutputformat.compress.codec")) - fileSinkConf.setCompressType(hadoopConf - .get("mapreduce.output.fileoutputformat.compress.type")) - } else { - // Set compression by priority - HiveOptions.getHiveWriteCompression(fileSinkConf.getTableInfo, sparkSession.sessionState.conf) - .foreach { case (compression, codec) => hadoopConf.set(compression, codec) } - } - - val committer = FileCommitProtocol.instantiate( - sparkSession.sessionState.conf.fileCommitProtocolClass, - jobId = java.util.UUID.randomUUID().toString, - outputPath = outputLocation) - - val options = getOptionsWithHiveBucketWrite(bucketSpec) - - FileFormatWriter.write( - sparkSession = sparkSession, - plan = plan, - fileFormat = new HiveFileFormat(fileSinkConf), - committer = committer, - outputSpec = - FileFormatWriter.OutputSpec(outputLocation, customPartitionLocations, outputColumns), - hadoopConf = hadoopConf, - partitionColumns = partitionAttributes, - bucketSpec = bucketSpec, - statsTrackers = Seq(basicWriteJobStatsTracker(hadoopConf)), - options = options) - } - - protected def getExternalTmpPath( - sparkSession: SparkSession, - hadoopConf: Configuration, - path: Path): Path = { + private def getExternalTmpPath(path: Path): Path = { import org.apache.spark.sql.hive.client.hive._ // Before Hive 1.1, when inserting into a table, Hive will create the staging directory under @@ -125,43 +61,22 @@ private[hive] trait SaveAsHiveFile extends DataWritingCommand with V1WritesHiveU assert(hiveVersionsUsingNewExternalTempPath ++ hiveVersionsUsingOldExternalTempPath == allSupportedHiveVersions) - val externalCatalog = sparkSession.sharedState.externalCatalog + val externalCatalog = session.sharedState.externalCatalog val 
hiveVersion = externalCatalog.unwrapped.asInstanceOf[HiveExternalCatalog].client.version val stagingDir = hadoopConf.get("hive.exec.stagingdir", ".hive-staging") val scratchDir = hadoopConf.get("hive.exec.scratchdir", "/tmp/hive") if (hiveVersionsUsingOldExternalTempPath.contains(hiveVersion)) { - oldVersionExternalTempPath(path, hadoopConf, scratchDir) + oldVersionExternalTempPath(path, scratchDir) } else if (hiveVersionsUsingNewExternalTempPath.contains(hiveVersion)) { - newVersionExternalTempPath(path, hadoopConf, stagingDir) + newVersionExternalTempPath(path, stagingDir) } else { throw new IllegalStateException("Unsupported hive version: " + hiveVersion.fullVersion) } } - protected def deleteExternalTmpPath(hadoopConf: Configuration) : Unit = { - // Attempt to delete the staging directory and the inclusive files. If failed, the files are - // expected to be dropped at the normal termination of VM since deleteOnExit is used. - try { - createdTempDir.foreach { path => - val fs = path.getFileSystem(hadoopConf) - if (fs.delete(path, true)) { - // If we successfully delete the staging directory, remove it from FileSystem's cache. 
- fs.cancelDeleteOnExit(path) - } - } - } catch { - case NonFatal(e) => - val stagingDir = hadoopConf.get("hive.exec.stagingdir", ".hive-staging") - logWarning(s"Unable to delete staging directory: $stagingDir.\n" + e) - } - } - // Mostly copied from Context.java#getExternalTmpPath of Hive 0.13 - private def oldVersionExternalTempPath( - path: Path, - hadoopConf: Configuration, - scratchDir: String): Path = { + private def oldVersionExternalTempPath(path: Path, scratchDir: String): Path = { val extURI: URI = path.toUri val scratchPath = new Path(scratchDir, executionId) var dirPath = new Path( @@ -169,56 +84,34 @@ private[hive] trait SaveAsHiveFile extends DataWritingCommand with V1WritesHiveU extURI.getAuthority, scratchPath.toUri.getPath + "-" + TaskRunner.getTaskRunnerID()) - try { - val fs: FileSystem = dirPath.getFileSystem(hadoopConf) - dirPath = new Path(fs.makeQualified(dirPath).toString()) - - if (!FileUtils.mkdir(fs, dirPath, true, hadoopConf)) { - throw new IllegalStateException("Cannot create staging directory: " + dirPath.toString) - } - createdTempDir = Some(dirPath) - fs.deleteOnExit(dirPath) - } catch { - case e: IOException => - throw QueryExecutionErrors.cannotCreateStagingDirError(dirPath.toString, e) - } + val fs = dirPath.getFileSystem(hadoopConf) + dirPath = new Path(fs.makeQualified(dirPath).toString()) + stagingDirForCreating = Some(dirPath) dirPath } // Mostly copied from Context.java#getExternalTmpPath of Hive 1.2 - private def newVersionExternalTempPath( - path: Path, - hadoopConf: Configuration, - stagingDir: String): Path = { + private def newVersionExternalTempPath(path: Path, stagingDir: String): Path = { val extURI: URI = path.toUri if (extURI.getScheme == "viewfs") { - getExtTmpPathRelTo(path, hadoopConf, stagingDir) + val qualifiedStagingDir = getStagingDir(path, stagingDir) + stagingDirForCreating = Some(qualifiedStagingDir) + // Hive uses 10000 + new Path(qualifiedStagingDir, "-ext-10000") } else { - new 
Path(getExternalScratchDir(extURI, hadoopConf, stagingDir), "-ext-10000") + val qualifiedStagingDir = getExternalScratchDir(extURI, stagingDir) + stagingDirForCreating = Some(qualifiedStagingDir) + new Path(qualifiedStagingDir, "-ext-10000") } } - private def getExtTmpPathRelTo( - path: Path, - hadoopConf: Configuration, - stagingDir: String): Path = { - new Path(getStagingDir(path, hadoopConf, stagingDir), "-ext-10000") // Hive uses 10000 - } - - private def getExternalScratchDir( - extURI: URI, - hadoopConf: Configuration, - stagingDir: String): Path = { + private def getExternalScratchDir(extURI: URI, stagingDir: String): Path = { getStagingDir( new Path(extURI.getScheme, extURI.getAuthority, extURI.getPath), - hadoopConf, stagingDir) } - private[hive] def getStagingDir( - inputPath: Path, - hadoopConf: Configuration, - stagingDir: String): Path = { + private[hive] def getStagingDir(inputPath: Path, stagingDir: String): Path = { val inputPathName: String = inputPath.toString val fs: FileSystem = inputPath.getFileSystem(hadoopConf) var stagingPathName: String = @@ -243,17 +136,6 @@ private[hive] trait SaveAsHiveFile extends DataWritingCommand with V1WritesHiveU fs.makeQualified( new Path(stagingPathName + "_" + executionId + "-" + TaskRunner.getTaskRunnerID)) logDebug("Created staging dir = " + dir + " for path = " + inputPath) - try { - if (!FileUtils.mkdir(fs, dir, true, hadoopConf)) { - throw new IllegalStateException("Cannot create staging directory '" + dir.toString + "'") - } - createdTempDir = Some(dir) - fs.deleteOnExit(dir) - } catch { - case e: IOException => - throw QueryExecutionErrors.cannotCreateStagingDirError( - s"'${dir.toString}': ${e.getMessage}", e) - } dir } @@ -269,5 +151,43 @@ private[hive] trait SaveAsHiveFile extends DataWritingCommand with V1WritesHiveU val format = new SimpleDateFormat("yyyy-MM-dd_HH-mm-ss_SSS", Locale.US) "hive_" + format.format(new Date) + "_" + Math.abs(rand.nextLong) } -} + def deleteTmpPath() : Unit = { + // 
Attempt to delete the staging directory and the inclusive files. If failed, the files are + // expected to be dropped at the normal termination of VM since deleteOnExit is used. + try { + stagingDirForCreating.foreach { stagingDir => + val fs = stagingDir.getFileSystem(hadoopConf) + if (fs.delete(stagingDir, true)) { + // If we successfully delete the staging directory, remove it from FileSystem's cache. + fs.cancelDeleteOnExit(stagingDir) + } + } + } catch { + case NonFatal(e) => + val stagingDir = hadoopConf.get("hive.exec.stagingdir", ".hive-staging") + logWarning(s"Unable to delete staging directory: $stagingDir.\n" + e) + } + } + + def createTmpPath(): Unit = { + try { + stagingDirForCreating.foreach { stagingDir => + val fs: FileSystem = stagingDir.getFileSystem(hadoopConf) + if (!FileUtils.mkdir(fs, stagingDir, true, hadoopConf)) { + throw new IllegalStateException( + "Cannot create staging directory '" + stagingDir.toString + "'") + } + fs.deleteOnExit(stagingDir) + } + } catch { + case e: IOException => + throw QueryExecutionErrors.cannotCreateStagingDirError( + s"'${stagingDirForCreating.toString}': ${e.getMessage}", e) + } + } + + def deleteIfNotStagingDir(path: Path, fs: FileSystem): Unit = { + if (Option(path) != stagingDirForCreating) fs.delete(path, true) + } +} diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveDirCommand.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveDirCommand.scala index fb15432013d..bd6278473a7 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveDirCommand.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveDirCommand.scala @@ -55,7 +55,7 @@ case class InsertIntoHiveDirCommand( storage: CatalogStorageFormat, query: LogicalPlan, overwrite: Boolean, - outputColumnNames: Seq[String]) extends SaveAsHiveFile { + outputColumnNames: Seq[String]) extends SaveAsHiveFile with V1WritesHiveUtils { 
override def run(sparkSession: SparkSession, child: SparkPlan): Seq[Row] = { assert(storage.locationUri.nonEmpty) @@ -99,21 +99,24 @@ case class InsertIntoHiveDirCommand( } // The temporary path must be a HDFS path, not a local path. - val tmpPath = getExternalTmpPath(sparkSession, hadoopConf, qualifiedPath) + val hiveTempPath = new HiveTempPath(sparkSession, hadoopConf, qualifiedPath) + val tmpPath = hiveTempPath.externalTempPath val fileSinkConf = new org.apache.spark.sql.hive.HiveShim.ShimFileSinkDesc( tmpPath.toString, tableDesc, false) + setupHadoopConfForCompression(fileSinkConf, hadoopConf, sparkSession) + hiveTempPath.createTmpPath() try { saveAsHiveFile( sparkSession = sparkSession, plan = child, hadoopConf = hadoopConf, - fileSinkConf = fileSinkConf, + fileFormat = new HiveFileFormat(fileSinkConf), outputLocation = tmpPath.toString) if (overwrite && fs.exists(writeToPath)) { fs.listStatus(writeToPath).foreach { existFile => - if (Option(existFile.getPath) != createdTempDir) fs.delete(existFile.getPath, true) + hiveTempPath.deleteIfNotStagingDir(existFile.getPath, fs) } } @@ -131,7 +134,7 @@ case class InsertIntoHiveDirCommand( throw new SparkException( "Failed inserting overwrite directory " + storage.locationUri.get, e) } finally { - deleteExternalTmpPath(hadoopConf) + hiveTempPath.deleteTmpPath() } Seq.empty[Row] diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala index 8c3aa0a80c1..6785b5d96d9 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala @@ -22,15 +22,17 @@ import org.apache.hadoop.fs.Path import org.apache.hadoop.hive.conf.HiveConf import org.apache.hadoop.hive.ql.plan.TableDesc +import org.apache.spark.internal.Logging import org.apache.spark.sql.{Row, SparkSession} import 
org.apache.spark.sql.catalyst.catalog._ +import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec import org.apache.spark.sql.catalyst.expressions.{Attribute, SortOrder} import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap import org.apache.spark.sql.errors.QueryExecutionErrors import org.apache.spark.sql.execution.SparkPlan import org.apache.spark.sql.execution.command.CommandUtils -import org.apache.spark.sql.execution.datasources.{V1WriteCommand, V1WritesUtils} +import org.apache.spark.sql.execution.datasources.{FileFormat, V1WriteCommand, V1WritesUtils} import org.apache.spark.sql.hive.HiveExternalCatalog import org.apache.spark.sql.hive.HiveShim.{ShimFileSinkDesc => FileSinkDesc} import org.apache.spark.sql.hive.client.HiveClientImpl @@ -73,16 +75,20 @@ case class InsertIntoHiveTable( query: LogicalPlan, overwrite: Boolean, ifPartitionNotExists: Boolean, - outputColumnNames: Seq[String] + outputColumnNames: Seq[String], + partitionColumns: Seq[Attribute], + bucketSpec: Option[BucketSpec], + options: Map[String, String], + fileFormat: FileFormat, + @transient hiveTmpPath: HiveTempPath ) extends SaveAsHiveFile with V1WriteCommand with V1WritesHiveUtils { - override lazy val partitionColumns: Seq[Attribute] = { - getDynamicPartitionColumns(table, partition, query) + override def staticPartitions: TablePartitionSpec = { + partition.filter(_._2.nonEmpty).map { case (k, v) => k -> v.get } } override def requiredOrdering: Seq[SortOrder] = { - val options = getOptionsWithHiveBucketWrite(table.bucketSpec) - V1WritesUtils.getSortOrder(outputColumns, partitionColumns, table.bucketSpec, options) + V1WritesUtils.getSortOrder(outputColumns, partitionColumns, bucketSpec, options) } /** @@ -92,29 +98,16 @@ case class InsertIntoHiveTable( */ override def run(sparkSession: SparkSession, child: SparkPlan): Seq[Row] = { val externalCatalog = sparkSession.sharedState.externalCatalog - val 
hadoopConf = sparkSession.sessionState.newHadoopConf() - - val hiveQlTable = HiveClientImpl.toHiveTable(table) - // Have to pass the TableDesc object to RDD.mapPartitions and then instantiate new serializer - // instances within the closure, since Serializer is not serializable while TableDesc is. - val tableDesc = new TableDesc( - hiveQlTable.getInputFormatClass, - // The class of table should be org.apache.hadoop.hive.ql.metadata.Table because - // getOutputFormatClass will use HiveFileFormatUtils.getOutputFormatSubstitute to - // substitute some output formats, e.g. substituting SequenceFileOutputFormat to - // HiveSequenceFileOutputFormat. - hiveQlTable.getOutputFormatClass, - hiveQlTable.getMetadata - ) - val tableLocation = hiveQlTable.getDataLocation - val tmpLocation = getExternalTmpPath(sparkSession, hadoopConf, tableLocation) + val hadoopConf = hiveTmpPath.hadoopConf + val tmpLocation = hiveTmpPath.externalTempPath + hiveTmpPath.createTmpPath() try { - processInsert(sparkSession, externalCatalog, hadoopConf, tableDesc, tmpLocation, child) + processInsert(sparkSession, externalCatalog, hadoopConf, tmpLocation, child) } finally { // Attempt to delete the staging directory and the inclusive files. If failed, the files are // expected to be dropped at the normal termination of VM since deleteOnExit is used. - deleteExternalTmpPath(hadoopConf) + hiveTmpPath.deleteTmpPath() } // un-cache this table. 
@@ -134,23 +127,21 @@ case class InsertIntoHiveTable( sparkSession: SparkSession, externalCatalog: ExternalCatalog, hadoopConf: Configuration, - tableDesc: TableDesc, tmpLocation: Path, child: SparkPlan): Unit = { - val fileSinkConf = new FileSinkDesc(tmpLocation.toString, tableDesc, false) val numDynamicPartitions = partition.values.count(_.isEmpty) val partitionSpec = getPartitionSpec(partition) - val partitionAttributes = getDynamicPartitionColumns(table, partition, query) val writtenParts = saveAsHiveFile( sparkSession = sparkSession, plan = child, hadoopConf = hadoopConf, - fileSinkConf = fileSinkConf, + fileFormat = fileFormat, outputLocation = tmpLocation.toString, - partitionAttributes = partitionAttributes, - bucketSpec = table.bucketSpec) + partitionAttributes = partitionColumns, + bucketSpec = bucketSpec, + options = options) if (partition.nonEmpty) { if (numDynamicPartitions > 0) { @@ -294,3 +285,40 @@ case class InsertIntoHiveTable( override protected def withNewChildInternal(newChild: LogicalPlan): InsertIntoHiveTable = copy(query = newChild) } + +object InsertIntoHiveTable extends V1WritesHiveUtils with Logging { + def apply( + table: CatalogTable, + partition: Map[String, Option[String]], + query: LogicalPlan, + overwrite: Boolean, + ifPartitionNotExists: Boolean, + outputColumnNames: Seq[String]): InsertIntoHiveTable = { + val sparkSession = SparkSession.getActiveSession.orNull + val hiveQlTable = HiveClientImpl.toHiveTable(table) + // Have to pass the TableDesc object to RDD.mapPartitions and then instantiate new serializer + // instances within the closure, since Serializer is not serializable while TableDesc is. + val tableDesc = new TableDesc( + hiveQlTable.getInputFormatClass, + // The class of table should be org.apache.hadoop.hive.ql.metadata.Table because + // getOutputFormatClass will use HiveFileFormatUtils.getOutputFormatSubstitute to + // substitute some output formats, e.g. 
substituting SequenceFileOutputFormat to + // HiveSequenceFileOutputFormat. + hiveQlTable.getOutputFormatClass, + hiveQlTable.getMetadata + ) + val hadoopConf = sparkSession.sessionState.newHadoopConf() + val tableLocation = hiveQlTable.getDataLocation + val hiveTempPath = new HiveTempPath(sparkSession, hadoopConf, tableLocation) + val fileSinkConf = new FileSinkDesc(hiveTempPath.externalTempPath.toString, tableDesc, false) + setupHadoopConfForCompression(fileSinkConf, hadoopConf, sparkSession) + val fileFormat: FileFormat = new HiveFileFormat(fileSinkConf) + + val partitionColumns = getDynamicPartitionColumns(table, partition, query) + val bucketSpec = table.bucketSpec + val options = getOptionsWithHiveBucketWrite(bucketSpec) + + new InsertIntoHiveTable(table, partition, query, overwrite, ifPartitionNotExists, + outputColumnNames, partitionColumns, bucketSpec, options, fileFormat, hiveTempPath) + } +} diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/SaveAsHiveFile.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/SaveAsHiveFile.scala index 799cea42e1e..47d402c2e8b 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/SaveAsHiveFile.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/SaveAsHiveFile.scala @@ -17,81 +17,40 @@ package org.apache.spark.sql.hive.execution -import java.io.IOException -import java.net.URI -import java.text.SimpleDateFormat -import java.util.{Date, Locale, Random} - -import scala.util.control.NonFatal - import org.apache.hadoop.conf.Configuration -import org.apache.hadoop.fs.{FileSystem, Path} -import org.apache.hadoop.hive.common.FileUtils -import org.apache.hadoop.hive.ql.exec.TaskRunner import org.apache.spark.internal.io.FileCommitProtocol import org.apache.spark.sql.SparkSession import org.apache.spark.sql.catalyst.catalog.BucketSpec import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec import 
org.apache.spark.sql.catalyst.expressions.Attribute -import org.apache.spark.sql.errors.QueryExecutionErrors import org.apache.spark.sql.execution.SparkPlan import org.apache.spark.sql.execution.command.DataWritingCommand -import org.apache.spark.sql.execution.datasources.FileFormatWriter -import org.apache.spark.sql.hive.HiveExternalCatalog -import org.apache.spark.sql.hive.HiveShim.{ShimFileSinkDesc => FileSinkDesc} -import org.apache.spark.sql.hive.client.HiveVersion +import org.apache.spark.sql.execution.datasources.{FileFormat, FileFormatWriter} // Base trait from which all hive insert statement physical execution extends. -private[hive] trait SaveAsHiveFile extends DataWritingCommand with V1WritesHiveUtils { - - var createdTempDir: Option[Path] = None +private[hive] trait SaveAsHiveFile extends DataWritingCommand { protected def saveAsHiveFile( sparkSession: SparkSession, plan: SparkPlan, hadoopConf: Configuration, - fileSinkConf: FileSinkDesc, + fileFormat: FileFormat, outputLocation: String, customPartitionLocations: Map[TablePartitionSpec, String] = Map.empty, partitionAttributes: Seq[Attribute] = Nil, - bucketSpec: Option[BucketSpec] = None): Set[String] = { - - val isCompressed = - fileSinkConf.getTableInfo.getOutputFileFormatClassName.toLowerCase(Locale.ROOT) match { - case formatName if formatName.endsWith("orcoutputformat") => - // For ORC,"mapreduce.output.fileoutputformat.compress", - // "mapreduce.output.fileoutputformat.compress.codec", and - // "mapreduce.output.fileoutputformat.compress.type" - // have no impact because it uses table properties to store compression information. 
- false - case _ => hadoopConf.get("hive.exec.compress.output", "false").toBoolean - } - - if (isCompressed) { - hadoopConf.set("mapreduce.output.fileoutputformat.compress", "true") - fileSinkConf.setCompressed(true) - fileSinkConf.setCompressCodec(hadoopConf - .get("mapreduce.output.fileoutputformat.compress.codec")) - fileSinkConf.setCompressType(hadoopConf - .get("mapreduce.output.fileoutputformat.compress.type")) - } else { - // Set compression by priority - HiveOptions.getHiveWriteCompression(fileSinkConf.getTableInfo, sparkSession.sessionState.conf) - .foreach { case (compression, codec) => hadoopConf.set(compression, codec) } - } + bucketSpec: Option[BucketSpec] = None, + options: Map[String, String] = Map.empty): Set[String] = { val committer = FileCommitProtocol.instantiate( sparkSession.sessionState.conf.fileCommitProtocolClass, jobId = java.util.UUID.randomUUID().toString, outputPath = outputLocation) - val options = getOptionsWithHiveBucketWrite(bucketSpec) - FileFormatWriter.write( sparkSession = sparkSession, plan = plan, - fileFormat = new HiveFileFormat(fileSinkConf), + fileFormat = fileFormat, committer = committer, outputSpec = FileFormatWriter.OutputSpec(outputLocation, customPartitionLocations, outputColumns), @@ -101,173 +60,5 @@ private[hive] trait SaveAsHiveFile extends DataWritingCommand with V1WritesHiveU statsTrackers = Seq(basicWriteJobStatsTracker(hadoopConf)), options = options) } - - protected def getExternalTmpPath( - sparkSession: SparkSession, - hadoopConf: Configuration, - path: Path): Path = { - import org.apache.spark.sql.hive.client.hive._ - - // Before Hive 1.1, when inserting into a table, Hive will create the staging directory under - // a common scratch directory. After the writing is finished, Hive will simply empty the table - // directory and move the staging directory to it. 
- // After Hive 1.1, Hive will create the staging directory under the table directory, and when - // moving staging directory to table directory, Hive will still empty the table directory, but - // will exclude the staging directory there. - // We have to follow the Hive behavior here, to avoid troubles. For example, if we create - // staging directory under the table director for Hive prior to 1.1, the staging directory will - // be removed by Hive when Hive is trying to empty the table directory. - val hiveVersionsUsingOldExternalTempPath: Set[HiveVersion] = Set(v12, v13, v14, v1_0) - val hiveVersionsUsingNewExternalTempPath: Set[HiveVersion] = - Set(v1_1, v1_2, v2_0, v2_1, v2_2, v2_3, v3_0, v3_1) - - // Ensure all the supported versions are considered here. - assert(hiveVersionsUsingNewExternalTempPath ++ hiveVersionsUsingOldExternalTempPath == - allSupportedHiveVersions) - - val externalCatalog = sparkSession.sharedState.externalCatalog - val hiveVersion = externalCatalog.unwrapped.asInstanceOf[HiveExternalCatalog].client.version - val stagingDir = hadoopConf.get("hive.exec.stagingdir", ".hive-staging") - val scratchDir = hadoopConf.get("hive.exec.scratchdir", "/tmp/hive") - - if (hiveVersionsUsingOldExternalTempPath.contains(hiveVersion)) { - oldVersionExternalTempPath(path, hadoopConf, scratchDir) - } else if (hiveVersionsUsingNewExternalTempPath.contains(hiveVersion)) { - newVersionExternalTempPath(path, hadoopConf, stagingDir) - } else { - throw new IllegalStateException("Unsupported hive version: " + hiveVersion.fullVersion) - } - } - - protected def deleteExternalTmpPath(hadoopConf: Configuration) : Unit = { - // Attempt to delete the staging directory and the inclusive files. If failed, the files are - // expected to be dropped at the normal termination of VM since deleteOnExit is used. 
- try { - createdTempDir.foreach { path => - val fs = path.getFileSystem(hadoopConf) - if (fs.delete(path, true)) { - // If we successfully delete the staging directory, remove it from FileSystem's cache. - fs.cancelDeleteOnExit(path) - } - } - } catch { - case NonFatal(e) => - val stagingDir = hadoopConf.get("hive.exec.stagingdir", ".hive-staging") - logWarning(s"Unable to delete staging directory: $stagingDir.\n" + e) - } - } - - // Mostly copied from Context.java#getExternalTmpPath of Hive 0.13 - private def oldVersionExternalTempPath( - path: Path, - hadoopConf: Configuration, - scratchDir: String): Path = { - val extURI: URI = path.toUri - val scratchPath = new Path(scratchDir, executionId) - var dirPath = new Path( - extURI.getScheme, - extURI.getAuthority, - scratchPath.toUri.getPath + "-" + TaskRunner.getTaskRunnerID()) - - try { - val fs: FileSystem = dirPath.getFileSystem(hadoopConf) - dirPath = new Path(fs.makeQualified(dirPath).toString()) - - if (!FileUtils.mkdir(fs, dirPath, true, hadoopConf)) { - throw new IllegalStateException("Cannot create staging directory: " + dirPath.toString) - } - createdTempDir = Some(dirPath) - fs.deleteOnExit(dirPath) - } catch { - case e: IOException => - throw QueryExecutionErrors.cannotCreateStagingDirError(dirPath.toString, e) - } - dirPath - } - - // Mostly copied from Context.java#getExternalTmpPath of Hive 1.2 - private def newVersionExternalTempPath( - path: Path, - hadoopConf: Configuration, - stagingDir: String): Path = { - val extURI: URI = path.toUri - if (extURI.getScheme == "viewfs") { - getExtTmpPathRelTo(path, hadoopConf, stagingDir) - } else { - new Path(getExternalScratchDir(extURI, hadoopConf, stagingDir), "-ext-10000") - } - } - - private def getExtTmpPathRelTo( - path: Path, - hadoopConf: Configuration, - stagingDir: String): Path = { - new Path(getStagingDir(path, hadoopConf, stagingDir), "-ext-10000") // Hive uses 10000 - } - - private def getExternalScratchDir( - extURI: URI, - hadoopConf: 
Configuration, - stagingDir: String): Path = { - getStagingDir( - new Path(extURI.getScheme, extURI.getAuthority, extURI.getPath), - hadoopConf, - stagingDir) - } - - private[hive] def getStagingDir( - inputPath: Path, - hadoopConf: Configuration, - stagingDir: String): Path = { - val inputPathName: String = inputPath.toString - val fs: FileSystem = inputPath.getFileSystem(hadoopConf) - var stagingPathName: String = - if (inputPathName.indexOf(stagingDir) == -1) { - new Path(inputPathName, stagingDir).toString - } else { - inputPathName.substring(0, inputPathName.indexOf(stagingDir) + stagingDir.length) - } - - // SPARK-20594: This is a walk-around fix to resolve a Hive bug. Hive requires that the - // staging directory needs to avoid being deleted when users set hive.exec.stagingdir - // under the table directory. - if (isSubDir(new Path(stagingPathName), inputPath, fs) && - !stagingPathName.stripPrefix(inputPathName).stripPrefix("/").startsWith(".")) { - logDebug(s"The staging dir '$stagingPathName' should be a child directory starts " + - "with '.' to avoid being deleted if we set hive.exec.stagingdir under the table " + - "directory.") - stagingPathName = new Path(inputPathName, ".hive-staging").toString - } - - val dir: Path = - fs.makeQualified( - new Path(stagingPathName + "_" + executionId + "-" + TaskRunner.getTaskRunnerID)) - logDebug("Created staging dir = " + dir + " for path = " + inputPath) - try { - if (!FileUtils.mkdir(fs, dir, true, hadoopConf)) { - throw new IllegalStateException("Cannot create staging directory '" + dir.toString + "'") - } - createdTempDir = Some(dir) - fs.deleteOnExit(dir) - } catch { - case e: IOException => - throw QueryExecutionErrors.cannotCreateStagingDirError( - s"'${dir.toString}': ${e.getMessage}", e) - } - dir - } - - // HIVE-14259 removed FileUtils.isSubDir(). Adapted it from Hive 1.2's FileUtils.isSubDir(). 
- private def isSubDir(p1: Path, p2: Path, fs: FileSystem): Boolean = { - val path1 = fs.makeQualified(p1).toString + Path.SEPARATOR - val path2 = fs.makeQualified(p2).toString + Path.SEPARATOR - path1.startsWith(path2) - } - - private def executionId: String = { - val rand: Random = new Random - val format = new SimpleDateFormat("yyyy-MM-dd_HH-mm-ss_SSS", Locale.US) - "hive_" + format.format(new Date) + "_" + Math.abs(rand.nextLong) - } } diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/V1WritesHiveUtils.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/V1WritesHiveUtils.scala index 752753f334a..6421dd184ae 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/V1WritesHiveUtils.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/V1WritesHiveUtils.scala @@ -19,8 +19,9 @@ package org.apache.spark.sql.hive.execution import java.util.Locale +import org.apache.hadoop.conf.Configuration import org.apache.hadoop.hive.ql.ErrorMsg -import org.apache.hadoop.hive.ql.plan.TableDesc +import org.apache.hadoop.hive.ql.plan.{FileSinkDesc, TableDesc} import org.apache.spark.SparkException import org.apache.spark.sql.{AnalysisException, SparkSession} @@ -32,6 +33,7 @@ import org.apache.spark.sql.execution.datasources.BucketingUtils import org.apache.spark.sql.hive.client.HiveClientImpl trait V1WritesHiveUtils { + def getPartitionSpec(partition: Map[String, Option[String]]): Map[String, String] = { partition.map { case (key, Some(null)) => key -> ExternalCatalogUtils.DEFAULT_PARTITION_NAME @@ -105,4 +107,33 @@ trait V1WritesHiveUtils { .map(_ => Map(BucketingUtils.optionForHiveCompatibleBucketWrite -> "true")) .getOrElse(Map.empty) } + + def setupHadoopConfForCompression( + fileSinkConf: FileSinkDesc, + hadoopConf: Configuration, + sparkSession: SparkSession): Unit = { + val isCompressed = + fileSinkConf.getTableInfo.getOutputFileFormatClassName.toLowerCase(Locale.ROOT) match { + case formatName if 
formatName.endsWith("orcoutputformat") => + // For ORC,"mapreduce.output.fileoutputformat.compress", + // "mapreduce.output.fileoutputformat.compress.codec", and + // "mapreduce.output.fileoutputformat.compress.type" + // have no impact because it uses table properties to store compression information. + false + case _ => hadoopConf.get("hive.exec.compress.output", "false").toBoolean + } + + if (isCompressed) { + hadoopConf.set("mapreduce.output.fileoutputformat.compress", "true") + fileSinkConf.setCompressed(true) + fileSinkConf.setCompressCodec(hadoopConf + .get("mapreduce.output.fileoutputformat.compress.codec")) + fileSinkConf.setCompressType(hadoopConf + .get("mapreduce.output.fileoutputformat.compress.type")) + } else { + // Set compression by priority + HiveOptions.getHiveWriteCompression(fileSinkConf.getTableInfo, sparkSession.sessionState.conf) + .foreach { case (compression, codec) => hadoopConf.set(compression, codec) } + } + } } diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/InsertSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/InsertSuite.scala index f62d941746b..13f2a865936 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/InsertSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/InsertSuite.scala @@ -27,7 +27,7 @@ import org.scalatest.{BeforeAndAfter, PrivateMethodTester} import org.apache.spark.SparkException import org.apache.spark.sql.{QueryTest, _} import org.apache.spark.sql.catalyst.parser.ParseException -import org.apache.spark.sql.hive.execution.InsertIntoHiveTable +import org.apache.spark.sql.hive.execution.HiveTempPath import org.apache.spark.sql.hive.test.TestHiveSingleton import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.test.SQLTestUtils @@ -541,25 +541,24 @@ class InsertSuite extends QueryTest with TestHiveSingleton with BeforeAndAfter val conf = spark.sessionState.newHadoopConf() val inputPath = new Path("/tmp/b/c") var stagingDir = "tmp/b" - val 
saveHiveFile = InsertIntoHiveTable(null, Map.empty, null, false, false, null) - val getStagingDir = PrivateMethod[Path](Symbol("getStagingDir")) - var path = saveHiveFile invokePrivate getStagingDir(inputPath, conf, stagingDir) + val hiveTempPath = new HiveTempPath(null, conf, null) + var path = hiveTempPath.getStagingDir(inputPath, stagingDir) assert(path.toString.indexOf("/tmp/b_hive_") != -1) stagingDir = "tmp/b/c" - path = saveHiveFile invokePrivate getStagingDir(inputPath, conf, stagingDir) + path = hiveTempPath.getStagingDir(inputPath, stagingDir) assert(path.toString.indexOf("/tmp/b/c/.hive-staging_hive_") != -1) stagingDir = "d/e" - path = saveHiveFile invokePrivate getStagingDir(inputPath, conf, stagingDir) + path = hiveTempPath.getStagingDir(inputPath, stagingDir) assert(path.toString.indexOf("/tmp/b/c/.hive-staging_hive_") != -1) stagingDir = ".d/e" - path = saveHiveFile invokePrivate getStagingDir(inputPath, conf, stagingDir) + path = hiveTempPath.getStagingDir(inputPath, stagingDir) assert(path.toString.indexOf("/tmp/b/c/.d/e_hive_") != -1) stagingDir = "/tmp/c/" - path = saveHiveFile invokePrivate getStagingDir(inputPath, conf, stagingDir) + path = hiveTempPath.getStagingDir(inputPath, stagingDir) assert(path.toString.indexOf("/tmp/c_hive_") != -1) } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org