Github user rdblue commented on a diff in the pull request: https://github.com/apache/spark/pull/21305#discussion_r206977289 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala --- @@ -2217,6 +2218,100 @@ class Analyzer( } } + /** + * Resolves columns of an output table from the data in a logical plan. This rule will: + * + * - Reorder columns when the write is by name + * - Insert safe casts when data types do not match + * - Insert aliases when column names do not match + * - Detect plans that are not compatible with the output table and throw AnalysisException + */ + object ResolveOutputRelation extends Rule[LogicalPlan] { + override def apply(plan: LogicalPlan): LogicalPlan = plan transform { + case append @ AppendData(table, query, isByName) + if table.resolved && query.resolved && !append.resolved => + val projection = resolveOutputColumns(table.name, table.output, query, isByName) + + if (projection != query) { + append.copy(query = projection) + } else { + append + } + } + + def resolveOutputColumns( + tableName: String, + expected: Seq[Attribute], + query: LogicalPlan, + byName: Boolean): LogicalPlan = { + + if (expected.size < query.output.size) { + throw new AnalysisException( + s"""Cannot write to '$tableName', too many data columns: + |Table columns: ${expected.map(_.name).mkString(", ")} + |Data columns: ${query.output.map(_.name).mkString(", ")}""".stripMargin) + } + + val errors = new mutable.ArrayBuffer[String]() + val resolved: Seq[NamedExpression] = if (byName) { + expected.flatMap { outAttr => + query.resolveQuoted(outAttr.name, resolver) match { + case Some(inAttr) if inAttr.nullable && !outAttr.nullable => + errors += s"Cannot write nullable values to non-null column '${outAttr.name}'" + None + + case Some(inAttr) if !DataType.canWrite(outAttr.dataType, inAttr.dataType, resolver) => + Some(upcast(inAttr, outAttr)) + + case Some(inAttr) => + Some(inAttr) // matches nullability, datatype, and name + + case _ => + errors += s"Cannot find data for output column '${outAttr.name}'" + None + } + } + + } else { + if (expected.size > query.output.size) { + throw new AnalysisException( + s"""Cannot write to '$tableName', not enough data columns: + |Table columns: ${expected.map(_.name).mkString(", ")} + |Data columns: ${query.output.map(_.name).mkString(", ")}""".stripMargin) + } + + query.output.zip(expected).flatMap { --- End diff -- This handles both append cases, write by name and write by position. This block is checking by position. I'll see if I can refactor the checks into a private method.
--- --------------------------------------------------------------------- To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org