Github user wangyum commented on a diff in the pull request:

https://github.com/apache/spark/pull/20020#discussion_r212850054

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/command/DataWritingCommand.scala ---
@@ -20,30 +20,32 @@ package org.apache.spark.sql.execution.command
 import org.apache.hadoop.conf.Configuration
 
 import org.apache.spark.SparkContext
-import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
+import org.apache.spark.sql.{Row, SparkSession}
+import org.apache.spark.sql.catalyst.expressions.Attribute
+import org.apache.spark.sql.catalyst.plans.logical.{Command, LogicalPlan}
+import org.apache.spark.sql.execution.SparkPlan
 import org.apache.spark.sql.execution.datasources.BasicWriteJobStatsTracker
+import org.apache.spark.sql.execution.datasources.FileFormatWriter
 import org.apache.spark.sql.execution.metric.{SQLMetric, SQLMetrics}
 import org.apache.spark.util.SerializableConfiguration
 
-
 /**
- * A special `RunnableCommand` which writes data out and updates metrics.
+ * A special `Command` which writes data out and updates metrics.
  */
-trait DataWritingCommand extends RunnableCommand {
-
+trait DataWritingCommand extends Command {
   /**
    * The input query plan that produces the data to be written.
+   * IMPORTANT: the input query plan MUST be analyzed, so that we can carry its output columns
+   * to [[FileFormatWriter]].
    */
   def query: LogicalPlan
 
-  // We make the input `query` an inner child instead of a child in order to hide it from the
-  // optimizer. This is because optimizer may not preserve the output schema names' case, and we
-  // have to keep the original analyzed plan here so that we can pass the corrected schema to the
-  // writer. The schema of analyzed plan is what user expects(or specifies), so we should respect
-  // it when writing.
-  override protected def innerChildren: Seq[LogicalPlan] = query :: Nil
+  override final def children: Seq[LogicalPlan] = query :: Nil
 
-  override lazy val metrics: Map[String, SQLMetric] = {
+  // Output columns of the analyzed input query plan
+  def outputColumns: Seq[Attribute]
--- End diff --

`outputColumns` changed from the analyzed plan's attributes to the optimized plan's attributes, so the user-specified case of the column names is lost. For example:

```scala
withTempDir { dir =>
  val path = dir.getCanonicalPath
  val cnt = 30
  val table1Path = s"$path/table1"
  val table3Path = s"$path/table3"
  spark.range(cnt).selectExpr("cast(id as bigint) as col1", "cast(id % 3 as bigint) as col2")
    .write.mode(SaveMode.Overwrite).parquet(table1Path)
  withTable("table1", "table3") {
    spark.sql(
      s"CREATE TABLE table1(col1 bigint, col2 bigint) using parquet location '$table1Path/'")
    spark.sql("CREATE TABLE table3(COL1 bigint, COL2 bigint) using parquet " +
      "PARTITIONED BY (COL2) " +
      s"CLUSTERED BY (COL1) INTO 2 BUCKETS location '$table3Path/'")
    withView("view1") {
      spark.sql("CREATE VIEW view1 as select col1, col2 from table1 where col1 > -20")
      spark.sql("INSERT OVERWRITE TABLE table3 select COL1, COL2 from view1 CLUSTER BY COL1")
      spark.table("table3").show
    }
  }
}
```

```
outputColumns: List(COL1#19L, COL2#20L)
outputColumns: List(COL1#19L, COL2#20L)
outputColumns: List(COL1#19L, COL2#20L)
outputColumns: List(COL1#19L, COL2#20L)
outputColumns: List(COL1#19L, COL2#20L)
outputColumns: List(COL1#19L, COL2#20L)
outputColumns: List(COL1#19L, COL2#20L)
outputColumns: List(COL1#19L, COL2#20L)
outputColumns: List(col1#16L, col2#17L)
outputColumns: List(col1#16L, col2#17L)
outputColumns: List(col1#16L, col2#17L)
```
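To make the hazard concrete, here is a minimal, self-contained Scala sketch of the failure mode, using hypothetical stand-in classes (`Attr`, `Relation`, `WriteCommand` are illustrative only, not Spark's `Attribute`, `LogicalPlan`, or `DataWritingCommand`). It shows why `outputColumns` must be captured from the analyzed plan at construction time rather than re-derived from the child after optimization:

```scala
// Hypothetical stand-ins for Attribute and LogicalPlan.
case class Attr(name: String, id: Long)
case class Relation(output: Seq[Attr])

// Mirrors the new DataWritingCommand shape: `outputColumns` is fixed at
// construction (i.e. analysis) time, independent of later rewrites of `child`.
case class WriteCommand(child: Relation, outputColumns: Seq[Attr])

object Demo extends App {
  // Analyzed plan: attributes carry the user-specified upper case.
  val analyzed = Relation(Seq(Attr("COL1", 19), Attr("COL2", 20)))
  val cmd = WriteCommand(analyzed, analyzed.output) // capture at analysis

  // An "optimizer" replaces the child with an equivalent plan whose
  // attributes come from the underlying table and are lower case.
  val rewritten = Relation(Seq(Attr("col1", 16), Attr("col2", 17)))

  // Intended behavior: swap the child but keep the captured columns.
  val ok = cmd.copy(child = rewritten)
  println(ok.outputColumns.map(_.name))    // List(COL1, COL2) -- case preserved

  // The bug this review flags, in miniature: re-deriving outputColumns
  // from the rewritten child loses the case the user specified.
  val buggy = WriteCommand(rewritten, rewritten.output)
  println(buggy.outputColumns.map(_.name)) // List(col1, col2)
}
```

This is exactly the trade-off the removed `innerChildren` comment described: the old code hid `query` from the optimizer to keep the analyzed schema intact, while the new code exposes `query` as a real child and therefore must carry the analyzed output separately; the `col1#16L` entries in the log above suggest the command is somewhere being rebuilt from the optimized child's output.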