GitHub user viirya commented on a diff in the pull request: https://github.com/apache/spark/pull/20771#discussion_r177968292 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/objects/objects.scala --- @@ -599,8 +610,86 @@ case class MapObjects private( override def children: Seq[Expression] = lambdaFunction :: inputData :: Nil - override def eval(input: InternalRow): Any = - throw new UnsupportedOperationException("Only code-generated evaluation is supported") + // The data with UserDefinedType are actually stored with the data type of its sqlType. + // When we want to apply MapObjects on it, we have to use it. + lazy private val inputDataType = inputData.dataType match { + case u: UserDefinedType[_] => u.sqlType + case _ => inputData.dataType + } + + private def executeFuncOnCollection(inputCollection: Seq[_]): Seq[_] = { + inputCollection.map { element => + val row = InternalRow.fromSeq(Seq(element)) + lambdaFunction.eval(row) + } + } + + // Executes lambda function on input collection. + private lazy val executeFunc: Any => Seq[_] = inputDataType match { + case ObjectType(cls) if classOf[Seq[_]].isAssignableFrom(cls) => + x => executeFuncOnCollection(x.asInstanceOf[Seq[_]]) + case ObjectType(cls) if cls.isArray => + x => executeFuncOnCollection(x.asInstanceOf[Array[_]].toSeq) + case ObjectType(cls) if classOf[java.util.List[_]].isAssignableFrom(cls) => + x => executeFuncOnCollection(x.asInstanceOf[java.util.List[_]].asScala) + case ObjectType(cls) if cls == classOf[Object] => + if (cls.isArray) { + x => executeFuncOnCollection(x.asInstanceOf[Array[_]].toSeq) + } else { + x => executeFuncOnCollection(x.asInstanceOf[Seq[_]]) + } + case ArrayType(et, _) => + x => executeFuncOnCollection(x.asInstanceOf[ArrayData].array) --- End diff -- Shall we implement this wrapper here, or in a follow-up?
--- --------------------------------------------------------------------- To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org For additional commands, e-mail: reviews-help@spark.apache.org