Github user liancheng commented on a diff in the pull request:

    https://github.com/apache/spark/pull/13269#discussion_r65492813
  
    --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala ---
    @@ -1884,10 +1884,62 @@ class Analyzer(
               } else {
                 inputAttributes
               }
    -          val unbound = deserializer transform {
    -            case b: BoundReference => inputs(b.ordinal)
    +
    +          validateTupleColumns(deserializer, inputs)
    +          val ordinalResolved = deserializer transform {
    +            case GetColumnByOrdinal(ordinal, _) => inputs(ordinal)
    +          }
    +          val attrResolved = resolveExpression(
    +            ordinalResolved, LocalRelation(inputs), throws = true)
    +          validateInnerTupleFields(attrResolved)
    +          attrResolved
    +      }
    +    }
    +
    +    private def fail(schema: StructType, maxOrdinal: Int): Unit = {
    +      throw new AnalysisException(s"Try to map ${schema.simpleString} to Tuple${maxOrdinal + 1}, " +
    +        "but failed as the number of fields does not line up.")
    +    }
    +
    +    /**
    +     * For each Tuple field, we use [[GetColumnByOrdinal]] to get its corresponding column by
    +     * position.  However, the actual number of columns may be different from the number of Tuple
    +     * fields.  This method is used to check the number of columns and fields, and throw an
    +     * exception if they do not match.
    +     */
    +    private def validateTupleColumns(deserializer: Expression, inputs: Seq[Attribute]): Unit = {
    +      var maxOrdinal = -1
    +      deserializer.foreach {
    +        case GetColumnByOrdinal(ordinal, _) => if (ordinal > maxOrdinal) maxOrdinal = ordinal
    +        case _ =>
    +      }
    +      if (maxOrdinal >= 0 && maxOrdinal != inputs.length - 1) {
    +        fail(inputs.toStructType, maxOrdinal)
    +      }
    --- End diff ---
    
    Actually, we should also check that each ordinal from 0 to `inputs.length - 1` appears in the deserializer expression:
    
    ```scala
    val ordinals = deserializer.collect {
      case GetColumnByOrdinal(ordinal, _) => ordinal
    }.distinct.sorted
    
    if (ordinals.nonEmpty && ordinals != (0 until inputs.length)) {
      fail(inputs.toStructType, ordinals.max)
    }
    ```
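    
    To see why the max-ordinal check alone is not enough, here is a standalone sketch (plain Scala, no Spark dependencies, with made-up ordinals): a deserializer that references only columns 0 and 2 of a 3-column input passes the current check, since `maxOrdinal == 2 == inputs.length - 1`, but silently drops column 1. The stricter check catches it:
    
    ```scala
    // Hypothetical scenario: the deserializer references columns 0 and 2
    // of a 3-column input (column 1 is never read).
    val referencedOrdinals = Seq(0, 2, 2)
    val numInputs = 3
    
    val ordinals = referencedOrdinals.distinct.sorted  // List(0, 2)
    
    // Current check: the max ordinal lines up with the last column, so no error.
    assert(!(ordinals.max >= 0 && ordinals.max != numInputs - 1))
    
    // Proposed check: ordinal 1 never appears, so the mismatch is detected.
    assert(ordinals.nonEmpty && ordinals != (0 until numInputs))
    ```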


