Github user cloud-fan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22749#discussion_r226295859
  
    --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/encoders/ExpressionEncoder.scala ---
    @@ -103,75 +88,61 @@ object ExpressionEncoder {
        * name/positional binding is preserved.
        */
       def tuple(encoders: Seq[ExpressionEncoder[_]]): ExpressionEncoder[_] = {
    +    if (encoders.length > 22) {
     +      throw new RuntimeException("Can't construct a tuple encoder for more than 22 encoders.")
    +    }
    +
         encoders.foreach(_.assertUnresolved())
     
         val schema = StructType(encoders.zipWithIndex.map {
           case (e, i) =>
    -        val (dataType, nullable) = if (e.flat) {
    -          e.schema.head.dataType -> e.schema.head.nullable
    -        } else {
    -          e.schema -> true
    -        }
    -        StructField(s"_${i + 1}", dataType, nullable)
     +        StructField(s"_${i + 1}", e.objSerializer.dataType, e.objSerializer.nullable)
         })
     
         val cls = Utils.getContextOrSparkClassLoader.loadClass(s"scala.Tuple${encoders.size}")
     
    -    val serializer = encoders.zipWithIndex.map { case (enc, index) =>
     -      val originalInputObject = enc.serializer.head.collect { case b: BoundReference => b }.head
    +    val serializers = encoders.zipWithIndex.map { case (enc, index) =>
     +      val boundRefs = enc.objSerializer.collect { case b: BoundReference => b }.distinct
     +      assert(boundRefs.size == 1, "object serializer should have only one bound reference but " +
    +        s"there are ${boundRefs.size}")
    +
    +      val originalInputObject = boundRefs.head
           val newInputObject = Invoke(
             BoundReference(0, ObjectType(cls), nullable = true),
             s"_${index + 1}",
    -        originalInputObject.dataType)
    +        originalInputObject.dataType,
    +        returnNullable = originalInputObject.nullable)
     
    -      val newSerializer = enc.serializer.map(_.transformUp {
    +      val newSerializer = enc.objSerializer.transformUp {
             case b: BoundReference if b == originalInputObject => newInputObject
    --- End diff ---
    
    Since there is only one distinct `BoundReference`, we can just write `case b: BoundReference => newInputObject`.
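    
    For illustration, a minimal standalone sketch of that simplification (assuming Spark Catalyst is on the classpath; `rebind` is a hypothetical helper name, not part of the patch):
    
    ```scala
    import org.apache.spark.sql.catalyst.expressions.{BoundReference, Expression}
    
    // Since the object serializer is asserted to contain exactly one distinct
    // BoundReference, matching on the type alone is sufficient; the equality
    // guard against originalInputObject can be dropped.
    def rebind(objSerializer: Expression, newInputObject: Expression): Expression =
      objSerializer.transformUp {
        case _: BoundReference => newInputObject
      }
    ```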


---
