Github user rdblue commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21305#discussion_r208090428
  
    --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/types/DataType.scala ---
    @@ -336,4 +337,124 @@ object DataType {
           case (fromDataType, toDataType) => fromDataType == toDataType
         }
       }
    +
    +  private val SparkGeneratedName = """col\d+""".r
    +  private def isSparkGeneratedName(name: String): Boolean = name match {
    +    case SparkGeneratedName(_*) => true
    +    case _ => false
    +  }
    +
    +  /**
    +   * Returns true if the write data type can be read using the read data type.
    +   *
    +   * The write type is compatible with the read type if:
    +   * - Both types are arrays, the array element types are compatible, and element nullability is
    +   *   compatible (read allows nulls or write does not contain nulls).
    +   * - Both types are maps and the map key and value types are compatible, and value nullability
    +   *   is compatible (read allows nulls or write does not contain nulls).
    +   * - Both types are structs and each field in the read struct is present in the write struct and
    +   *   compatible (including nullability), or is nullable if the write struct does not contain the
    +   *   field. Write-side structs are not compatible if they contain fields that are not present in
    +   *   the read-side struct.
    +   * - Both types are atomic and the write type can be safely cast to the read type.
    +   *
    +   * Extra fields in write-side structs are not allowed, to avoid accidentally writing data that
    +   * the read schema will not read, and to ensure map key equality is not changed when data is read.
    +   *
    +   * @param write a write-side data type to validate against the read type
    +   * @param read a read-side data type
    +   * @return true if data written with the write type can be read using the read type
    +   */
    +  def canWrite(
    +      write: DataType,
    +      read: DataType,
    +      resolver: Resolver,
    +      context: String,
    +      addError: String => Unit = (_: String) => {}): Boolean = {
    +    (write, read) match {
    +      case (wArr: ArrayType, rArr: ArrayType) =>
    +        // run compatibility check first to produce all error messages
    +        val typesCompatible =
    +          canWrite(wArr.elementType, rArr.elementType, resolver, context + ".element", addError)
    +
    +        if (wArr.containsNull && !rArr.containsNull) {
    +          addError(s"Cannot write nullable elements to array of non-nulls: '$context'")
    +          false
    +        } else {
    +          typesCompatible
    +        }
    +
    +      case (wMap: MapType, rMap: MapType) =>
    +        // map keys cannot include data fields not in the read schema without changing equality when
    +        // read. map keys can be missing fields as long as they are nullable in the read schema.
    +
    +        // run compatibility check first to produce all error messages
    +        val keyCompatible =
    +          canWrite(wMap.keyType, rMap.keyType, resolver, context + ".key", addError)
    +        val valueCompatible =
    +          canWrite(wMap.valueType, rMap.valueType, resolver, context + ".value", addError)
    +        val typesCompatible = keyCompatible && valueCompatible
    +
    +        if (wMap.valueContainsNull && !rMap.valueContainsNull) {
    +          addError(s"Cannot write nullable values to map of non-nulls: '$context'")
    +          false
    +        } else {
    +          typesCompatible
    +        }
    +
    +      case (StructType(writeFields), StructType(readFields)) =>
    +        var fieldCompatible = true
    +        readFields.zip(writeFields).foreach {
    +          case (rField, wField) =>
    +            val namesMatch = resolver(wField.name, rField.name) || isSparkGeneratedName(wField.name)
    +            val fieldContext = s"$context.${rField.name}"
    +            val typesCompatible =
    +              canWrite(wField.dataType, rField.dataType, resolver, fieldContext, addError)
    +
    +            if (!namesMatch) {
    +              addError(s"Struct '$context' field name does not match (may be out of order): " +
    +                  s"expected '${rField.name}', found '${wField.name}'")
    +              fieldCompatible = false
    +            } else if (!rField.nullable && wField.nullable) {
    +              addError(s"Cannot write nullable values to non-null field: '$fieldContext'")
    +              fieldCompatible = false
    +            } else if (!typesCompatible) {
    +              // errors are added in the recursive call to canWrite above
    +              fieldCompatible = false
    +            }
    +        }
    +
    +        if (readFields.size > writeFields.size) {
    +          val missingFieldsStr = readFields.takeRight(readFields.size - writeFields.size)
    +                  .filterNot(_.nullable).map(f => s"'${f.name}'").mkString(", ")
    +          if (missingFieldsStr.nonEmpty) {
    +            addError(s"Struct '$context' missing required (non-null) fields: $missingFieldsStr")
    --- End diff ---
    
    I'll update it and open an issue for schema evolution. If we support ADD 
COLUMN then we'll need to do this.
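
    For reference, a minimal usage sketch of the check above, assuming the
    canWrite signature from this diff. The schemas, the "table" context string,
    and the plain equality resolver below are made up for illustration only:

        import org.apache.spark.sql.types._

        val writeSchema = StructType(Seq(
          StructField("id", IntegerType, nullable = false),
          StructField("name", StringType, nullable = true)))

        // The read schema widens id to long and keeps name nullable, so the write
        // schema should pass: int -> long is a safe cast and the nullability
        // constraints are satisfied.
        val readSchema = StructType(Seq(
          StructField("id", LongType, nullable = false),
          StructField("name", StringType, nullable = true)))

        // Collect every error message instead of failing on the first problem.
        val errors = scala.collection.mutable.ArrayBuffer.empty[String]
        val compatible = DataType.canWrite(
          writeSchema, readSchema, (a: String, b: String) => a == b, "table", errors += _)

        // compatible is true and errors stays empty here; an incompatible pair, for
        // example a non-nullable read field missing from the write struct, would
        // leave compatible false with one message per problem in errors.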


---
