Repository: spark
Updated Branches:
  refs/heads/master 67ea11ea0 -> d4f0b1d2c

[SPARK-22834][SQL] Make insertion commands have real children to fix UI issues

## What changes were proposed in this pull request?

After #19474, the children of insertion commands are missing in the UI. To fix this:
1. Create a new physical plan node `DataWritingCommandExec` that executes a `DataWritingCommand` with children, so that other commands are not affected.
2. On creation of a `DataWritingCommand`, a new field `outputColumns` must be specified, holding the output of the analyzed plan.
3. In `FileFormatWriter`, populate `allColumns` from these output columns instead of from the output of the optimized plan.

Before code changes:
![2017-12-19 10 27 10](https://user-images.githubusercontent.com/1097932/34161850-d2fd0acc-e50c-11e7-898a-177154fe7d8e.png)

After code changes:
![2017-12-19 10 27 04](https://user-images.githubusercontent.com/1097932/34161865-de23de26-e50c-11e7-9131-0c32f7b7b749.png)

## How was this patch tested?

Unit test
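The issue this guards against can be reproduced from user code (a minimal sketch, assuming a Hive-enabled session bound to `spark`; table names are illustrative):

```scala
// The analyzed plan of the SELECT keeps the column names as the user wrote
// them (COL1, COL2), while the optimizer may rewrite them back to the
// relation's lower-case col1/col2. The writer must see the analyzed schema,
// which is exactly what the new `outputColumns` field carries.
spark.range(1).selectExpr("id AS col1", "id AS col2").write.saveAsTable("t1")
spark.sql("SELECT COL1, COL2 FROM t1").write.format("hive").saveAsTable("t2")
spark.table("t2").show()  // expected: a single row (0, 0)
```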
Author: Wang Gengliang <ltn...@gmail.com>

Closes #20020 from gengliangwang/insert.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/d4f0b1d2
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/d4f0b1d2
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/d4f0b1d2

Branch: refs/heads/master
Commit: d4f0b1d2c546fd0a3141dfb31aeccad1b9788bf5
Parents: 67ea11e
Author: Wang Gengliang <ltn...@gmail.com>
Authored: Fri Dec 29 15:28:33 2017 +0800
Committer: Wenchen Fan <wenc...@databricks.com>
Committed: Fri Dec 29 15:28:33 2017 +0800

----------------------------------------------------------------------
 .../org/apache/spark/sql/DataFrameWriter.scala  |  4 +--
 .../spark/sql/execution/SparkStrategies.scala   |  1 +
 .../execution/command/DataWritingCommand.scala  | 28 ++++++++-------
 .../spark/sql/execution/command/commands.scala  | 38 +++++++++++++++++++-
 .../sql/execution/datasources/DataSource.scala  | 16 ++-------
 .../datasources/DataSourceStrategy.scala        |  3 +-
 .../datasources/FileFormatWriter.scala          | 24 ++++++-------
 .../InsertIntoHadoopFsRelationCommand.scala     | 10 +++---
 .../execution/streaming/FileStreamSink.scala    |  5 +--
 .../apache/spark/sql/hive/HiveStrategies.scala  |  5 +--
 .../execution/InsertIntoHiveDirCommand.scala    | 14 +++++---
 .../hive/execution/InsertIntoHiveTable.scala    | 16 +++++----
 .../sql/hive/execution/SaveAsHiveFile.scala     | 10 +++---
 .../spark/sql/hive/execution/HiveDDLSuite.scala | 13 +++++++
 14 files changed, 123 insertions(+), 64 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/d4f0b1d2/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
index bd216ff..3304f36 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
@@ -26,7 +26,7 @@ import org.apache.spark.annotation.InterfaceStability
 import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.analysis.{EliminateSubqueryAliases, UnresolvedRelation}
 import org.apache.spark.sql.catalyst.catalog._
-import org.apache.spark.sql.catalyst.plans.logical.{InsertIntoTable, LogicalPlan}
+import org.apache.spark.sql.catalyst.plans.logical.{AnalysisBarrier, InsertIntoTable, LogicalPlan}
 import org.apache.spark.sql.execution.SQLExecution
 import org.apache.spark.sql.execution.command.DDLUtils
 import org.apache.spark.sql.execution.datasources.{CreateTable, DataSource, LogicalRelation}
@@ -264,7 +264,7 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) {
       sparkSession = df.sparkSession,
       className = source,
       partitionColumns = partitioningColumns.getOrElse(Nil),
-      options = extraOptions.toMap).planForWriting(mode, df.logicalPlan)
+      options = extraOptions.toMap).planForWriting(mode, AnalysisBarrier(df.logicalPlan))
   }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/d4f0b1d2/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
index 8c6c324..6b3f301 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
@@ -403,6 +403,7 @@ abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
   // Can we automate these 'pass through' operations?
   object BasicOperators extends Strategy {
     def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
+      case d: DataWritingCommand => DataWritingCommandExec(d, planLater(d.query)) :: Nil
       case r: RunnableCommand => ExecutedCommandExec(r) :: Nil

       case MemoryPlan(sink, output) =>

http://git-wip-us.apache.org/repos/asf/spark/blob/d4f0b1d2/sql/core/src/main/scala/org/apache/spark/sql/execution/command/DataWritingCommand.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/DataWritingCommand.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/DataWritingCommand.scala
index 2cf0698..e56f810 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/DataWritingCommand.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/DataWritingCommand.scala
@@ -20,30 +20,32 @@ package org.apache.spark.sql.execution.command
 import org.apache.hadoop.conf.Configuration

 import org.apache.spark.SparkContext
-import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
+import org.apache.spark.sql.{Row, SparkSession}
+import org.apache.spark.sql.catalyst.expressions.Attribute
+import org.apache.spark.sql.catalyst.plans.logical.{Command, LogicalPlan}
+import org.apache.spark.sql.execution.SparkPlan
 import org.apache.spark.sql.execution.datasources.BasicWriteJobStatsTracker
+import org.apache.spark.sql.execution.datasources.FileFormatWriter
 import org.apache.spark.sql.execution.metric.{SQLMetric, SQLMetrics}
 import org.apache.spark.util.SerializableConfiguration

-
 /**
- * A special `RunnableCommand` which writes data out and updates metrics.
+ * A special `Command` which writes data out and updates metrics.
  */
-trait DataWritingCommand extends RunnableCommand {
-
+trait DataWritingCommand extends Command {
   /**
    * The input query plan that produces the data to be written.
+   * IMPORTANT: the input query plan MUST be analyzed, so that we can carry its output columns
+   * to [[FileFormatWriter]].
    */
   def query: LogicalPlan

-  // We make the input `query` an inner child instead of a child in order to hide it from the
-  // optimizer. This is because optimizer may not preserve the output schema names' case, and we
-  // have to keep the original analyzed plan here so that we can pass the corrected schema to the
-  // writer. The schema of analyzed plan is what user expects(or specifies), so we should respect
-  // it when writing.
-  override protected def innerChildren: Seq[LogicalPlan] = query :: Nil
+  override final def children: Seq[LogicalPlan] = query :: Nil

-  override lazy val metrics: Map[String, SQLMetric] = {
+  // Output columns of the analyzed input query plan
+  def outputColumns: Seq[Attribute]
+
+  lazy val metrics: Map[String, SQLMetric] = {
     val sparkContext = SparkContext.getActive.get
     Map(
       "numFiles" -> SQLMetrics.createMetric(sparkContext, "number of written files"),
@@ -57,4 +59,6 @@ trait DataWritingCommand extends RunnableCommand {
     val serializableHadoopConf = new SerializableConfiguration(hadoopConf)
     new BasicWriteJobStatsTracker(serializableHadoopConf, metrics)
   }
+
+  def run(sparkSession: SparkSession, child: SparkPlan): Seq[Row]
 }
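The new contract in a nutshell: a minimal, hypothetical implementation (`NoopWriteCommand` is not part of this patch; it only illustrates the shape):

```scala
import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.catalyst.expressions.Attribute
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.execution.SparkPlan
import org.apache.spark.sql.execution.command.DataWritingCommand

// `query` stays a real child (visible to the optimizer and the UI),
// `outputColumns` pins the analyzed, user-facing schema, and `run` receives
// the already-planned physical child instead of re-planning `query` itself.
case class NoopWriteCommand(
    query: LogicalPlan,
    outputColumns: Seq[Attribute]) extends DataWritingCommand {

  override def run(sparkSession: SparkSession, child: SparkPlan): Seq[Row] = {
    child.execute().count() // a real command would write these rows out
    Seq.empty
  }
}
```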

http://git-wip-us.apache.org/repos/asf/spark/blob/d4f0b1d2/sql/core/src/main/scala/org/apache/spark/sql/execution/command/commands.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/commands.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/commands.scala
index e28b5eb2..2cc0e38 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/commands.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/commands.scala
@@ -26,7 +26,7 @@ import org.apache.spark.sql.catalyst.errors.TreeNodeException
 import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference}
 import org.apache.spark.sql.catalyst.plans.QueryPlan
 import org.apache.spark.sql.catalyst.plans.logical.{Command, LogicalPlan}
-import org.apache.spark.sql.execution.LeafExecNode
+import org.apache.spark.sql.execution.{LeafExecNode, SparkPlan}
 import org.apache.spark.sql.execution.debug._
 import org.apache.spark.sql.execution.metric.SQLMetric
 import org.apache.spark.sql.execution.streaming.{IncrementalExecution, OffsetSeqMetadata}
@@ -88,6 +88,42 @@ case class ExecutedCommandExec(cmd: RunnableCommand) extends LeafExecNode {
 }

 /**
+ * A physical operator that executes the run method of a `DataWritingCommand` and
+ * saves the result to prevent multiple executions.
+ *
+ * @param cmd the `DataWritingCommand` this operator will run.
+ * @param child the physical plan child ran by the `DataWritingCommand`.
+ */
+case class DataWritingCommandExec(cmd: DataWritingCommand, child: SparkPlan)
+  extends SparkPlan {
+
+  override lazy val metrics: Map[String, SQLMetric] = cmd.metrics
+
+  protected[sql] lazy val sideEffectResult: Seq[InternalRow] = {
+    val converter = CatalystTypeConverters.createToCatalystConverter(schema)
+    val rows = cmd.run(sqlContext.sparkSession, child)
+
+    rows.map(converter(_).asInstanceOf[InternalRow])
+  }
+
+  override def children: Seq[SparkPlan] = child :: Nil
+
+  override def output: Seq[Attribute] = cmd.output
+
+  override def nodeName: String = "Execute " + cmd.nodeName
+
+  override def executeCollect(): Array[InternalRow] = sideEffectResult.toArray
+
+  override def executeToIterator: Iterator[InternalRow] = sideEffectResult.toIterator
+
+  override def executeTake(limit: Int): Array[InternalRow] = sideEffectResult.take(limit).toArray
+
+  protected override def doExecute(): RDD[InternalRow] = {
+    sqlContext.sparkContext.parallelize(sideEffectResult, 1)
+  }
+}
+
+/**
  * An explain command for users to see how a command will be executed.
  *
  * Note that this command takes in a logical plan, runs the optimizer on the logical plan
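Like `ExecutedCommandExec` just above it, the node memoizes its side effect: `sideEffectResult` is a `lazy val`, so the actual write runs at most once no matter which of `executeCollect`, `executeToIterator`, `executeTake` or `doExecute` is invoked first. The UI improvement is visible with any file write; a sketch (output path illustrative, tree shape approximate):

```scala
// After this patch, the SQL tab shows the write's child as a real plan node,
// roughly:
//   Execute InsertIntoHadoopFsRelationCommand
//   +- Range (0, 1, step=1, splits=8)
spark.range(1).write.mode("overwrite").parquet("/tmp/spark-22834-demo")
```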

http://git-wip-us.apache.org/repos/asf/spark/blob/d4f0b1d2/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
index b676672..25e1210 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
@@ -456,17 +456,6 @@ case class DataSource(
     val caseSensitive = sparkSession.sessionState.conf.caseSensitiveAnalysis
     PartitioningUtils.validatePartitionColumn(data.schema, partitionColumns, caseSensitive)

-    // SPARK-17230: Resolve the partition columns so InsertIntoHadoopFsRelationCommand does
-    // not need to have the query as child, to avoid to analyze an optimized query,
-    // because InsertIntoHadoopFsRelationCommand will be optimized first.
-    val partitionAttributes = partitionColumns.map { name =>
-      data.output.find(a => equality(a.name, name)).getOrElse {
-        throw new AnalysisException(
-          s"Unable to resolve $name given [${data.output.map(_.name).mkString(", ")}]")
-      }
-    }
-
     val fileIndex = catalogTable.map(_.identifier).map { tableIdent =>
       sparkSession.table(tableIdent).queryExecution.analyzed.collect {
         case LogicalRelation(t: HadoopFsRelation, _, _, _) => t.location
@@ -479,14 +468,15 @@ case class DataSource(
       outputPath = outputPath,
       staticPartitions = Map.empty,
       ifPartitionNotExists = false,
-      partitionColumns = partitionAttributes,
+      partitionColumns = partitionColumns.map(UnresolvedAttribute.quoted),
       bucketSpec = bucketSpec,
       fileFormat = format,
       options = options,
       query = data,
       mode = mode,
       catalogTable = catalogTable,
-      fileIndex = fileIndex)
+      fileIndex = fileIndex,
+      outputColumns = data.output)
   }

   /**

http://git-wip-us.apache.org/repos/asf/spark/blob/d4f0b1d2/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
index 400f2e0..d94c5bb 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
@@ -208,7 +208,8 @@ case class DataSourceAnalysis(conf: SQLConf) extends Rule[LogicalPlan] with Cast
         actualQuery,
         mode,
         table,
-        Some(t.location))
+        Some(t.location),
+        actualQuery.output)
   }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/d4f0b1d2/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala
index 1fac01a..1d80a69 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala
@@ -39,7 +39,7 @@ import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
 import org.apache.spark.sql.catalyst.expressions.{UnsafeProjection, _}
 import org.apache.spark.sql.catalyst.plans.physical.HashPartitioning
 import org.apache.spark.sql.catalyst.util.{CaseInsensitiveMap, DateTimeUtils}
-import org.apache.spark.sql.execution.{QueryExecution, SortExec, SQLExecution}
+import org.apache.spark.sql.execution.{SortExec, SparkPlan, SQLExecution}
 import org.apache.spark.sql.types.StringType
 import org.apache.spark.util.{SerializableConfiguration, Utils}

@@ -56,7 +56,9 @@ object FileFormatWriter extends Logging {

   /** Describes how output files should be placed in the filesystem. */
   case class OutputSpec(
-    outputPath: String, customPartitionLocations: Map[TablePartitionSpec, String])
+      outputPath: String,
+      customPartitionLocations: Map[TablePartitionSpec, String],
+      outputColumns: Seq[Attribute])

   /** A shared job description for all the write tasks. */
   private class WriteJobDescription(
@@ -101,7 +103,7 @@ object FileFormatWriter extends Logging {
    */
   def write(
       sparkSession: SparkSession,
-      queryExecution: QueryExecution,
+      plan: SparkPlan,
       fileFormat: FileFormat,
       committer: FileCommitProtocol,
       outputSpec: OutputSpec,
@@ -117,11 +119,8 @@ object FileFormatWriter extends Logging {
     job.setOutputValueClass(classOf[InternalRow])
     FileOutputFormat.setOutputPath(job, new Path(outputSpec.outputPath))

-    // Pick the attributes from analyzed plan, as optimizer may not preserve the output schema
-    // names' case.
-    val allColumns = queryExecution.analyzed.output
     val partitionSet = AttributeSet(partitionColumns)
-    val dataColumns = allColumns.filterNot(partitionSet.contains)
+    val dataColumns = outputSpec.outputColumns.filterNot(partitionSet.contains)

     val bucketIdExpression = bucketSpec.map { spec =>
       val bucketColumns = spec.bucketColumnNames.map(c => dataColumns.find(_.name == c).get)
@@ -144,7 +143,7 @@ object FileFormatWriter extends Logging {
       uuid = UUID.randomUUID().toString,
       serializableHadoopConf = new SerializableConfiguration(job.getConfiguration),
       outputWriterFactory = outputWriterFactory,
-      allColumns = allColumns,
+      allColumns = outputSpec.outputColumns,
       dataColumns = dataColumns,
       partitionColumns = partitionColumns,
       bucketIdExpression = bucketIdExpression,
@@ -160,7 +159,7 @@ object FileFormatWriter extends Logging {
     // We should first sort by partition columns, then bucket id, and finally sorting columns.
     val requiredOrdering = partitionColumns ++ bucketIdExpression ++ sortColumns
     // the sort order doesn't matter
-    val actualOrdering = queryExecution.executedPlan.outputOrdering.map(_.child)
+    val actualOrdering = plan.outputOrdering.map(_.child)
     val orderingMatched = if (requiredOrdering.length > actualOrdering.length) {
       false
     } else {
@@ -178,17 +177,18 @@ object FileFormatWriter extends Logging {

     try {
       val rdd = if (orderingMatched) {
-        queryExecution.toRdd
+        plan.execute()
       } else {
         // SPARK-21165: the `requiredOrdering` is based on the attributes from analyzed plan, and
         // the physical plan may have different attribute ids due to optimizer removing some
         // aliases. Here we bind the expression ahead to avoid potential attribute ids mismatch.
         val orderingExpr = requiredOrdering
-          .map(SortOrder(_, Ascending)).map(BindReferences.bindReference(_, allColumns))
+          .map(SortOrder(_, Ascending))
+          .map(BindReferences.bindReference(_, outputSpec.outputColumns))
         SortExec(
           orderingExpr,
           global = false,
-          child = queryExecution.executedPlan).execute()
+          child = plan).execute()
       }

       val ret = new Array[WriteTaskResult](rdd.partitions.length)
       sparkSession.sparkContext.runJob(

http://git-wip-us.apache.org/repos/asf/spark/blob/d4f0b1d2/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelationCommand.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelationCommand.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelationCommand.scala
index 675bee8..ad24e28 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelationCommand.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelationCommand.scala
@@ -27,6 +27,7 @@ import org.apache.spark.sql.catalyst.catalog.{BucketSpec, CatalogTable, CatalogT
 import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
 import org.apache.spark.sql.catalyst.expressions.Attribute
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
+import org.apache.spark.sql.execution.SparkPlan
 import org.apache.spark.sql.execution.command._
 import org.apache.spark.sql.util.SchemaUtils

@@ -52,11 +53,12 @@ case class InsertIntoHadoopFsRelationCommand(
     query: LogicalPlan,
     mode: SaveMode,
     catalogTable: Option[CatalogTable],
-    fileIndex: Option[FileIndex])
+    fileIndex: Option[FileIndex],
+    outputColumns: Seq[Attribute])
   extends DataWritingCommand {
   import org.apache.spark.sql.catalyst.catalog.ExternalCatalogUtils.escapePathName

-  override def run(sparkSession: SparkSession): Seq[Row] = {
+  override def run(sparkSession: SparkSession, child: SparkPlan): Seq[Row] = {
     // Most formats don't do well with duplicate columns, so lets not allow that
     SchemaUtils.checkSchemaColumnNameDuplication(
       query.schema,
@@ -139,11 +141,11 @@ case class InsertIntoHadoopFsRelationCommand(
     val updatedPartitionPaths =
       FileFormatWriter.write(
         sparkSession = sparkSession,
-        queryExecution = Dataset.ofRows(sparkSession, query).queryExecution,
+        plan = child,
         fileFormat = fileFormat,
         committer = committer,
         outputSpec = FileFormatWriter.OutputSpec(
-          qualifiedOutputPath.toString, customPartitionLocations),
+          qualifiedOutputPath.toString, customPartitionLocations, outputColumns),
         hadoopConf = hadoopConf,
         partitionColumns = partitionColumns,
         bucketSpec = bucketSpec,

http://git-wip-us.apache.org/repos/asf/spark/blob/d4f0b1d2/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSink.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSink.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSink.scala
index 6bd0696..2715fa9 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSink.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSink.scala
@@ -118,13 +118,14 @@ class FileStreamSink(
           throw new RuntimeException(s"Partition column $col not found in schema ${data.schema}")
         }
       }
+      val qe = data.queryExecution

       FileFormatWriter.write(
         sparkSession = sparkSession,
-        queryExecution = data.queryExecution,
+        plan = qe.executedPlan,
         fileFormat = fileFormat,
         committer = committer,
-        outputSpec = FileFormatWriter.OutputSpec(path, Map.empty),
+        outputSpec = FileFormatWriter.OutputSpec(path, Map.empty, qe.analyzed.output),
         hadoopConf = hadoopConf,
         partitionColumns = partitionColumns,
         bucketSpec = None,

http://git-wip-us.apache.org/repos/asf/spark/blob/d4f0b1d2/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
index a7961c7..ab857b9 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
@@ -148,7 +148,8 @@ object HiveAnalysis extends Rule[LogicalPlan] {
   override def apply(plan: LogicalPlan): LogicalPlan = plan transformUp {
     case InsertIntoTable(r: HiveTableRelation, partSpec, query, overwrite, ifPartitionNotExists)
         if DDLUtils.isHiveTable(r.tableMeta) =>
-      InsertIntoHiveTable(r.tableMeta, partSpec, query, overwrite, ifPartitionNotExists)
+      InsertIntoHiveTable(r.tableMeta, partSpec, query, overwrite,
+        ifPartitionNotExists, query.output)

     case CreateTable(tableDesc, mode, None) if DDLUtils.isHiveTable(tableDesc) =>
       DDLUtils.checkDataColNames(tableDesc)
@@ -163,7 +164,7 @@ object HiveAnalysis extends Rule[LogicalPlan] {
       val outputPath = new Path(storage.locationUri.get)
       if (overwrite) DDLUtils.verifyNotReadPath(child, outputPath)

-      InsertIntoHiveDirCommand(isLocal, storage, child, overwrite)
+      InsertIntoHiveDirCommand(isLocal, storage, child, overwrite, child.output)
   }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/d4f0b1d2/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveDirCommand.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveDirCommand.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveDirCommand.scala
index 1c6f8dd..cebeca0 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveDirCommand.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveDirCommand.scala
@@ -27,10 +27,12 @@ import org.apache.hadoop.hive.serde2.`lazy`.LazySimpleSerDe
 import org.apache.hadoop.mapred._

 import org.apache.spark.SparkException
-import org.apache.spark.sql.{Dataset, Row, SparkSession}
+import org.apache.spark.sql.{Row, SparkSession}
 import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.catalog.{CatalogStorageFormat, CatalogTable}
+import org.apache.spark.sql.catalyst.expressions.Attribute
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
+import org.apache.spark.sql.execution.SparkPlan
 import org.apache.spark.sql.hive.client.HiveClientImpl

 /**
@@ -54,9 +56,10 @@ case class InsertIntoHiveDirCommand(
     isLocal: Boolean,
     storage: CatalogStorageFormat,
     query: LogicalPlan,
-    overwrite: Boolean) extends SaveAsHiveFile {
+    overwrite: Boolean,
+    outputColumns: Seq[Attribute]) extends SaveAsHiveFile {

-  override def run(sparkSession: SparkSession): Seq[Row] = {
+  override def run(sparkSession: SparkSession, child: SparkPlan): Seq[Row] = {
     assert(storage.locationUri.nonEmpty)

     val hiveTable = HiveClientImpl.toHiveTable(CatalogTable(
@@ -98,10 +101,11 @@ case class InsertIntoHiveDirCommand(
     try {
       saveAsHiveFile(
         sparkSession = sparkSession,
-        queryExecution = Dataset.ofRows(sparkSession, query).queryExecution,
+        plan = child,
         hadoopConf = hadoopConf,
         fileSinkConf = fileSinkConf,
-        outputLocation = tmpPath.toString)
+        outputLocation = tmpPath.toString,
+        allColumns = outputColumns)

       val fs = writeToPath.getFileSystem(hadoopConf)
       if (overwrite && fs.exists(writeToPath)) {

http://git-wip-us.apache.org/repos/asf/spark/blob/d4f0b1d2/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala
index b46addb..3ce5b84 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala
@@ -23,10 +23,11 @@ import org.apache.hadoop.hive.ql.ErrorMsg
 import org.apache.hadoop.hive.ql.plan.TableDesc

 import org.apache.spark.SparkException
-import org.apache.spark.sql.{AnalysisException, Dataset, Row, SparkSession}
+import org.apache.spark.sql.{AnalysisException, Row, SparkSession}
 import org.apache.spark.sql.catalyst.catalog.{CatalogTable, ExternalCatalog}
 import org.apache.spark.sql.catalyst.expressions.Attribute
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
+import org.apache.spark.sql.execution.SparkPlan
 import org.apache.spark.sql.execution.command.CommandUtils
 import org.apache.spark.sql.hive.HiveShim.{ShimFileSinkDesc => FileSinkDesc}
 import org.apache.spark.sql.hive.client.HiveClientImpl
@@ -67,14 +68,15 @@ case class InsertIntoHiveTable(
     partition: Map[String, Option[String]],
     query: LogicalPlan,
     overwrite: Boolean,
-    ifPartitionNotExists: Boolean) extends SaveAsHiveFile {
+    ifPartitionNotExists: Boolean,
+    outputColumns: Seq[Attribute]) extends SaveAsHiveFile {

   /**
    * Inserts all the rows in the table into Hive.  Row objects are properly serialized with the
    * `org.apache.hadoop.hive.serde2.SerDe` and the
    * `org.apache.hadoop.mapred.OutputFormat` provided by the table definition.
    */
-  override def run(sparkSession: SparkSession): Seq[Row] = {
+  override def run(sparkSession: SparkSession, child: SparkPlan): Seq[Row] = {
     val externalCatalog = sparkSession.sharedState.externalCatalog
     val hadoopConf = sparkSession.sessionState.newHadoopConf()

@@ -94,7 +96,7 @@ case class InsertIntoHiveTable(
     val tmpLocation = getExternalTmpPath(sparkSession, hadoopConf, tableLocation)

     try {
-      processInsert(sparkSession, externalCatalog, hadoopConf, tableDesc, tmpLocation)
+      processInsert(sparkSession, externalCatalog, hadoopConf, tableDesc, tmpLocation, child)
     } finally {
       // Attempt to delete the staging directory and the inclusive files. If failed, the files are
       // expected to be dropped at the normal termination of VM since deleteOnExit is used.
@@ -119,7 +121,8 @@ case class InsertIntoHiveTable(
       externalCatalog: ExternalCatalog,
       hadoopConf: Configuration,
       tableDesc: TableDesc,
-      tmpLocation: Path): Unit = {
+      tmpLocation: Path,
+      child: SparkPlan): Unit = {
     val fileSinkConf = new FileSinkDesc(tmpLocation.toString, tableDesc, false)

     val numDynamicPartitions = partition.values.count(_.isEmpty)
@@ -191,10 +194,11 @@ case class InsertIntoHiveTable(

     saveAsHiveFile(
       sparkSession = sparkSession,
-      queryExecution = Dataset.ofRows(sparkSession, query).queryExecution,
+      plan = child,
       hadoopConf = hadoopConf,
       fileSinkConf = fileSinkConf,
       outputLocation = tmpLocation.toString,
+      allColumns = outputColumns,
       partitionAttributes = partitionAttributes)

     if (partition.nonEmpty) {

http://git-wip-us.apache.org/repos/asf/spark/blob/d4f0b1d2/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/SaveAsHiveFile.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/SaveAsHiveFile.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/SaveAsHiveFile.scala
index 6365759..9a6607f 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/SaveAsHiveFile.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/SaveAsHiveFile.scala
@@ -33,7 +33,7 @@ import org.apache.spark.internal.io.FileCommitProtocol
 import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
 import org.apache.spark.sql.catalyst.expressions.Attribute
-import org.apache.spark.sql.execution.QueryExecution
+import org.apache.spark.sql.execution.SparkPlan
 import org.apache.spark.sql.execution.command.DataWritingCommand
 import org.apache.spark.sql.execution.datasources.FileFormatWriter
 import org.apache.spark.sql.hive.HiveExternalCatalog
@@ -47,10 +47,11 @@ private[hive] trait SaveAsHiveFile extends DataWritingCommand {

   protected def saveAsHiveFile(
       sparkSession: SparkSession,
-      queryExecution: QueryExecution,
+      plan: SparkPlan,
       hadoopConf: Configuration,
       fileSinkConf: FileSinkDesc,
       outputLocation: String,
+      allColumns: Seq[Attribute],
       customPartitionLocations: Map[TablePartitionSpec, String] = Map.empty,
       partitionAttributes: Seq[Attribute] = Nil): Set[String] = {

@@ -75,10 +76,11 @@ private[hive] trait SaveAsHiveFile extends DataWritingCommand {

     FileFormatWriter.write(
       sparkSession = sparkSession,
-      queryExecution = queryExecution,
+      plan = plan,
       fileFormat = new HiveFileFormat(fileSinkConf),
       committer = committer,
-      outputSpec = FileFormatWriter.OutputSpec(outputLocation, customPartitionLocations),
+      outputSpec =
+        FileFormatWriter.OutputSpec(outputLocation, customPartitionLocations, allColumns),
       hadoopConf = hadoopConf,
       partitionColumns = partitionAttributes,
       bucketSpec = None,

http://git-wip-us.apache.org/repos/asf/spark/blob/d4f0b1d2/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
index 6c11905..f2e0c69 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
@@ -2150,4 +2150,17 @@ class HiveDDLSuite
       assert(e.message.contains("LOAD DATA input path does not exist"))
     }
   }
+
+  test("SPARK-22252: FileFormatWriter should respect the input query schema in HIVE") {
+    withTable("t1", "t2", "t3", "t4") {
+      spark.range(1).select('id as 'col1, 'id as 'col2).write.saveAsTable("t1")
+      spark.sql("select COL1, COL2 from t1").write.format("hive").saveAsTable("t2")
+      checkAnswer(spark.table("t2"), Row(0, 0))
+
+      // Test picking part of the columns when writing.
+      spark.range(1).select('id, 'id as 'col1, 'id as 'col2).write.saveAsTable("t3")
+      spark.sql("select COL1, COL2 from t3").write.format("hive").saveAsTable("t4")
+      checkAnswer(spark.table("t4"), Row(0, 0))
+    }
+  }
 }
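Note that the new test exercises both paths: writing all of the input query's columns (t1 -> t2) and writing only part of them (t3 -> t4), in each case referencing the columns with a different case than the stored schema.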