Joao created SPARK-12878: ---------------------------- Summary: Dataframe fails with nested User Defined Types Key: SPARK-12878 URL: https://issues.apache.org/jira/browse/SPARK-12878 Project: Spark Issue Type: Bug Components: SQL Affects Versions: 1.6.0 Reporter: Joao Priority: Blocker
Spark 1.6.0 crashes when using nested User Defined Types in a Dataframe. In version 1.5.2 the code below worked just fine: import org.apache.spark.{SparkConf, SparkContext} import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.expressions.GenericMutableRow import org.apache.spark.sql.types._ @SQLUserDefinedType(udt = classOf[AUDT]) case class A(list:Seq[B]) class AUDT extends UserDefinedType[A] { override def sqlType: DataType = StructType(Seq(StructField("list", ArrayType(BUDT, containsNull = false), nullable = true))) override def userClass: Class[A] = classOf[A] override def serialize(obj: Any): Any = obj match { case A(list) => val row = new GenericMutableRow(1).update(0, new GenericArrayData(list.map(_.asInstanceOf[Any]).toArray)) row } override def deserialize(datum: Any): A = { datum match { case row: InternalRow => new A(row.getArray(0).toArray(BUDT).toSeq) } } } object AUDT extends AUDT @SQLUserDefinedType(udt = classOf[BUDT]) case class B(text:Int) class BUDT extends UserDefinedType[B] { override def sqlType: DataType = StructType(Seq(StructField("num", IntegerType, nullable = false))) override def userClass: Class[B] = classOf[B] override def serialize(obj: Any): Any = obj match { case B(text) => val row = new GenericMutableRow(1).setInt(0, text) row } override def deserialize(datum: Any): B = { datum match { case row: InternalRow => new B(row.getInt(0)) } } } object BUDT extends BUDT object Test { def main(args:Array[String]) = { val col = Seq(new A(Seq(new B(1), new B(2))), new A(Seq(new B(3), new B(4)))) val sc = new SparkContext(new SparkConf().setMaster("local[1]").setAppName("TestSpark")) val sqlContext = new org.apache.spark.sql.SQLContext(sc) import sqlContext.implicits._ val df = sc.parallelize(1 to 2 zip col).toDF("id","b") df.select("b").show() df.collect().foreach(println) } } In the new version (1.6.0) I needed to include the following import: import org.apache.spark.sql.catalyst.expressions.GenericMutableRow However, Spark crashes in runtime: 16/01/18 14:36:22 ERROR Executor: Exception in task 0.0 in stage 0.0 (TID 0) java.lang.ClassCastException: scala.runtime.BoxedUnit cannot be cast to org.apache.spark.sql.catalyst.InternalRow at org.apache.spark.sql.catalyst.expressions.BaseGenericInternalRow$class.getStruct(rows.scala:51) at org.apache.spark.sql.catalyst.expressions.GenericMutableRow.getStruct(rows.scala:248) at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.apply(Unknown Source) at org.apache.spark.sql.execution.Project$$anonfun$1$$anonfun$apply$1.apply(basicOperators.scala:51) at org.apache.spark.sql.execution.Project$$anonfun$1$$anonfun$apply$1.apply(basicOperators.scala:49) at scala.collection.Iterator$$anon$11.next(Iterator.scala:328) at scala.collection.Iterator$$anon$11.next(Iterator.scala:328) at scala.collection.Iterator$$anon$11.next(Iterator.scala:328) at scala.collection.Iterator$$anon$10.next(Iterator.scala:312) at scala.collection.Iterator$class.foreach(Iterator.scala:727) at scala.collection.AbstractIterator.foreach(Iterator.scala:1157) at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48) at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103) at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:47) at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:273) at scala.collection.AbstractIterator.to(Iterator.scala:1157) at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:265) at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1157) at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:252) at scala.collection.AbstractIterator.toArray(Iterator.scala:1157) at org.apache.spark.sql.execution.SparkPlan$$anonfun$5.apply(SparkPlan.scala:212) at org.apache.spark.sql.execution.SparkPlan$$anonfun$5.apply(SparkPlan.scala:212) at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1858) at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1858) at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66) at org.apache.spark.scheduler.Task.run(Task.scala:89) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) at java.lang.Thread.run(Thread.java:745) -- This message was sent by Atlassian JIRA (v6.3.4#6332) --------------------------------------------------------------------- To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org