Github user rdblue commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21305#discussion_r200423206
  
    --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala ---
    @@ -2120,6 +2122,99 @@ class Analyzer(
         }
       }
     
    +  /**
    +   * Resolves columns of an output table from the data in a logical plan. This rule will:
    +   *
    +   * - Reorder columns when the write is by name
    +   * - Insert safe casts when data types do not match
    +   * - Insert aliases when column names do not match
    +   * - Detect plans that are not compatible with the output table and throw AnalysisException
    +   */
    +  object ResolveOutputRelation extends Rule[LogicalPlan] {
    +    override def apply(plan: LogicalPlan): LogicalPlan = plan transform {
    +      case append @ AppendData(table: NamedRelation, query, isByName)
    +          if table.resolved && query.resolved && !append.resolved =>
    +        val projection = resolveOutputColumns(table.name, table.output, query, isByName)
    +
    +        if (projection != query) {
    +          append.copy(query = projection)
    +        } else {
    +          append
    +        }
    +    }
    +
    +    def resolveOutputColumns(
    +        tableName: String,
    +        expected: Seq[Attribute],
    +        query: LogicalPlan,
    +        byName: Boolean): LogicalPlan = {
    +
    +      if (expected.size < query.output.size) {
    +        throw new AnalysisException(
    +          s"""Cannot write to '$tableName', too many data columns:
    +             |Table columns: ${expected.map(_.name).mkString(", ")}
    +             |Data columns: ${query.output.map(_.name).mkString(", ")}""".stripMargin)
    +      }
    +
    +      val errors = new mutable.ArrayBuffer[String]()
    +      val resolved: Seq[NamedExpression] = if (byName) {
    +        expected.flatMap { outAttr =>
    +          query.resolveQuoted(outAttr.name, resolver) match {
    +            case Some(inAttr) if inAttr.nullable && !outAttr.nullable =>
    +              errors += s"Cannot write nullable values to non-null column '${outAttr.name}'"
    +              None
    +
    +            case Some(inAttr) if !outAttr.dataType.sameType(inAttr.dataType) =>
    +              Some(upcast(inAttr, outAttr))
    +
    +            case Some(inAttr) =>
    +              Some(inAttr) // matches nullability, datatype, and name
    +
    +            case _ =>
    +              errors += s"Cannot find data for output column '${outAttr.name}'"
    +              None
    +          }
    +        }
    +
    +      } else {
    +        if (expected.size > query.output.size) {
    +          throw new AnalysisException(
    +            s"""Cannot write to '$tableName', not enough data columns:
    +               |Table columns: ${expected.map(_.name).mkString(", ")}
    +               |Data columns: ${query.output.map(_.name).mkString(", ")}""".stripMargin)
    +        }
    +
    +        query.output.zip(expected).flatMap {
    +          case (inAttr, outAttr) if inAttr.nullable && !outAttr.nullable =>
    +            errors += s"Cannot write nullable values to non-null column '${outAttr.name}'"
    +            None
    +
    +          case (inAttr, outAttr)
    +            if !inAttr.dataType.sameType(outAttr.dataType) || inAttr.name != outAttr.name =>
    +            Some(upcast(inAttr, outAttr))
    +
    +          case (inAttr, _) =>
    +            Some(inAttr) // matches nullability, datatype, and name
    +        }
    +      }
    +
    +      if (errors.nonEmpty) {
    +        throw new AnalysisException(
    +          s"Cannot write incompatible data to table '$tableName':\n- ${errors.mkString("\n- ")}")
    +      }
    +
    +      Project(resolved, query)
    +    }
    +
    +    private def upcast(inAttr: NamedExpression, outAttr: Attribute): NamedExpression = {
    +      Alias(
    +        UpCast(inAttr, outAttr.dataType, Seq()), outAttr.name
    --- End diff --
    
    The purpose of `UpCast` here is to prevent Spark from automatically 
inserting casts that could lose information, like `long` to `int` or `string` 
to `int`.
    
    I would support the same for `string` to `boolean` to catch destructive 
problems from accidental column alignment (in SQL) or similar errors. The main 
problem here is that Spark inserts casts instead of alerting the user that 
there's a problem. When the write succeeds, it may be a while before the user 
realizes the mistake and can't recover the original data.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org

Reply via email to