Github user hvanhovell commented on a diff in the pull request:

    https://github.com/apache/spark/pull/20771#discussion_r173231642
  
    --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/objects/objects.scala ---
    @@ -599,8 +610,71 @@ case class MapObjects private(
     
      override def children: Seq[Expression] = lambdaFunction :: inputData :: Nil
     
    -  override def eval(input: InternalRow): Any =
    -    throw new UnsupportedOperationException("Only code-generated evaluation is supported")
    +  // Data with a PythonUserDefinedType is actually stored using the data type of
    +  // its sqlType. When we apply MapObjects to such data, we have to use that sqlType.
    +  private lazy val inputDataType = inputData.dataType match {
    +    case p: PythonUserDefinedType => p.sqlType
    +    case _ => inputData.dataType
    +  }
    +
    +  private def executeFuncOnCollection(inputCollection: Seq[_]): Seq[_] = {
    +    inputCollection.map { element =>
    +      val row = InternalRow.fromSeq(Seq(element))
    +      lambdaFunction.eval(row)
    +    }
    +  }
    +
    +  override def eval(input: InternalRow): Any = {
    +    val inputCollection = inputData.eval(input)
    +
    +    if (inputCollection == null) {
    +      return inputCollection
    +    }
    +
    +    val results = inputDataType match {
    +      case ObjectType(cls) if classOf[Seq[_]].isAssignableFrom(cls) =>
    +        executeFuncOnCollection(inputCollection.asInstanceOf[Seq[_]])
    +      case ObjectType(cls) if cls.isArray =>
    +        executeFuncOnCollection(inputCollection.asInstanceOf[Array[_]].toSeq)
    +      case ObjectType(cls) if classOf[java.util.List[_]].isAssignableFrom(cls) =>
    +        executeFuncOnCollection(inputCollection.asInstanceOf[java.util.List[_]].asScala)
    +      case ObjectType(cls) if cls == classOf[Object] =>
    +        if (inputCollection.getClass.isArray) {
    +          executeFuncOnCollection(inputCollection.asInstanceOf[Array[_]].toSeq)
    +        } else {
    +          executeFuncOnCollection(inputCollection.asInstanceOf[Seq[_]])
    +        }
    +      case ArrayType(et, _) =>
    +        executeFuncOnCollection(inputCollection.asInstanceOf[ArrayData].array)
    +    }
    +
    +    customCollectionCls match {
    --- End diff ---
    
    We shouldn't be doing this during eval. Please move this into a function val.
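    
    For illustration, here is a minimal, self-contained sketch of that shape (the class and branch set below are hypothetical, not the actual MapObjects code): the match on customCollectionCls is resolved into a function val once, at construction time, so the per-row eval path only invokes a precomputed closure.
    
    ```scala
    import scala.collection.JavaConverters._
    
    // Hypothetical standalone sketch (not the real MapObjects): hoist the
    // customCollectionCls dispatch out of the per-row path into a function val.
    class CollectionConverter(customCollectionCls: Option[Class[_]]) {
    
      // The pattern match runs once, when `convert` is first forced; after
      // that, every row pays only the cost of a closure call.
      private lazy val convert: Seq[Any] => Any = customCollectionCls match {
        case Some(cls) if classOf[java.util.List[_]].isAssignableFrom(cls) =>
          results => results.asJava
        case Some(cls) if classOf[Seq[_]].isAssignableFrom(cls) =>
          results => results
        case Some(cls) =>
          throw new UnsupportedOperationException(s"Unsupported collection class: $cls")
        case None =>
          results => results.toArray
      }
    
      // Per-row hot path: no pattern matching here, just the prepared closure.
      def eval(results: Seq[Any]): Any = convert(results)
    }
    ```
    
    The point is that the dispatch cost is paid once per expression instance rather than once per row, which matters for interpreted evaluation.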

