Github user rdblue commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21305#discussion_r200824602
  
    --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala ---
    @@ -2120,6 +2122,99 @@ class Analyzer(
         }
       }
     
    +  /**
    +   * Resolves columns of an output table from the data in a logical plan. This rule will:
    +   *
    +   * - Reorder columns when the write is by name
    +   * - Insert safe casts when data types do not match
    +   * - Insert aliases when column names do not match
    +   * - Detect plans that are not compatible with the output table and throw AnalysisException
    +   */
    +  object ResolveOutputRelation extends Rule[LogicalPlan] {
    +    override def apply(plan: LogicalPlan): LogicalPlan = plan transform {
    +      case append @ AppendData(table: NamedRelation, query, isByName)
    +          if table.resolved && query.resolved && !append.resolved =>
    +        val projection = resolveOutputColumns(table.name, table.output, query, isByName)
    +
    +        if (projection != query) {
    +          append.copy(query = projection)
    +        } else {
    +          append
    +        }
    +    }
    +
    +    def resolveOutputColumns(
    +        tableName: String,
    +        expected: Seq[Attribute],
    +        query: LogicalPlan,
    +        byName: Boolean): LogicalPlan = {
    +
    +      if (expected.size < query.output.size) {
    +        throw new AnalysisException(
    +          s"""Cannot write to '$tableName', too many data columns:
    +             |Table columns: ${expected.map(_.name).mkString(", ")}
    +             |Data columns: ${query.output.map(_.name).mkString(", ")}""".stripMargin)
    +      }
    +
    +      val errors = new mutable.ArrayBuffer[String]()
    +      val resolved: Seq[NamedExpression] = if (byName) {
    +        expected.flatMap { outAttr =>
    +          query.resolveQuoted(outAttr.name, resolver) match {
    +            case Some(inAttr) if inAttr.nullable && !outAttr.nullable =>
    +              errors += s"Cannot write nullable values to non-null column '${outAttr.name}'"
    +              None
    +
    +            case Some(inAttr) if !outAttr.dataType.sameType(inAttr.dataType) =>
    --- End diff --
    
    Yes, I'll update to check nested fields.

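For context, a rough sketch of what a nested-aware compatibility check could look like; the `NestedWriteCompat` object, the `canWrite` name, and the recursion structure are illustrative only, not the actual change in this PR:

```scala
import org.apache.spark.sql.types._

// Illustrative helper (not part of this PR): a recursive write-compatibility
// check that descends into nested struct, array, and map types.
object NestedWriteCompat {
  // Returns true if a value of type `from` can safely be written to a column
  // of type `to`, checking nullability and types of nested fields as well.
  def canWrite(from: DataType, to: DataType): Boolean = (from, to) match {
    case (fromStruct: StructType, toStruct: StructType) =>
      // fields are matched positionally here; by-name matching is omitted for brevity
      fromStruct.length == toStruct.length &&
        fromStruct.fields.zip(toStruct.fields).forall { case (f, t) =>
          // a nullable input field cannot be written to a non-null output field
          (!f.nullable || t.nullable) && canWrite(f.dataType, t.dataType)
        }

    case (ArrayType(fromElem, fromContainsNull), ArrayType(toElem, toContainsNull)) =>
      (!fromContainsNull || toContainsNull) && canWrite(fromElem, toElem)

    case (MapType(fromKey, fromValue, fromValueNull), MapType(toKey, toValue, toValueNull)) =>
      (!fromValueNull || toValueNull) &&
        canWrite(fromKey, toKey) && canWrite(fromValue, toValue)

    case (f, t) =>
      // leaf types: fall back to the existing nullability-insensitive comparison
      f.sameType(t)
  }
}
```

With something along these lines, the guard above could test `!canWrite(inAttr.dataType, outAttr.dataType)` instead of `!outAttr.dataType.sameType(inAttr.dataType)`, so nullability and type mismatches inside nested structs are reported too.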

---
