[jira] [Comment Edited] (SPARK-12878) Dataframe fails with nested User Defined Types
[ https://issues.apache.org/jira/browse/SPARK-12878?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15160119#comment-15160119 ]

Jakob Odersky edited comment on SPARK-12878 at 2/25/16 10:22 PM:
-----------------------------------------------------------------

I just tried your example and get a slightly different exception: {{java.lang.ClassCastException: B cannot be cast to org.apache.spark.sql.catalyst.InternalRow}} (B, as opposed to BoxedUnit).

However, I actually don't understand why this worked in 1.5.2 in the first place. Consider the following extract from your snippet:

{code}
case A(list) =>
  val row = new GenericMutableRow(1)
  row.update(0, new GenericArrayData(list.map(_.asInstanceOf[Any]).toArray))
  row
{code}

Although {{list}} is a collection of B elements in this case, I don't think the individual Bs are serialized according to the definition in BUDT. I would assume you are solely responsible for the serialization and would have to call something like {{list.map(BUDT.serialize(_))}} to convert any child elements to an "SQL Datum" (I'm not sure exactly what that is, but the docs mention it: http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.sql.types.UserDefinedType).

Maybe someone with more knowledge of the topic ([~marmbrus], [~cloud_fan]) can clarify what's going on?

> Dataframe fails with nested User Defined Types
> ----------------------------------------------
>
>                 Key: SPARK-12878
>                 URL: https://issues.apache.org/jira/browse/SPARK-12878
>             Project: Spark
>          Issue Type: Bug
>          Components: SQL
>    Affects Versions: 1.6.0
>            Reporter: Joao
>            Priority: Blocker
>
> Spark 1.6.0 crashes when using nested User Defined Types in a Dataframe.
> In version 1.5.2 the code below worked just fine:
>
> import org.apache.spark.{SparkConf, SparkContext}
> import org.apache.spark.sql.catalyst.InternalRow
> import org.apache.spark.sql.catalyst.expressions.GenericMutableRow
> import org.apache.spark.sql.types._
>
> @SQLUserDefinedType(udt = classOf[AUDT])
> case class A(list: Seq[B])
>
> class AUDT extends UserDefinedType[A] {
>   override def sqlType: DataType = StructType(Seq(StructField("list", ArrayType(BUDT, containsNull = false), nullable = true)))
>   override def userClass: Class[A] = classOf[A]
>   override def serialize(obj: Any): Any = obj match {
>     case A(list) =>
>       val row = new GenericMutableRow(1)
>       row.update(0, new GenericArrayData(list.map(_.asInstanceOf[Any]).toArray))
>       row
>   }
>   override def deserialize(datum: Any): A = {
>     datum match {
>       case row: InternalRow => new A(row.getArray(0).toArray(BUDT).toSeq)
>     }
>   }
> }
>
> object AUDT extends AUDT
>
> @SQLUserDefinedType(udt = classOf[BUDT])
> case class B(text: Int)
>
> class BUDT extends UserDefinedType[B] {
>   override def sqlType: DataType = StructType(Seq(StructField("num", IntegerType, nullable = false)))
>   override def userClass: Class[B] = classOf[B]
>   override def serialize(obj: Any): Any = obj match {
>     case B(text) =>
>       val row = new GenericMutableRow(1)
>       row.setInt(0, text)
>       row
>   }
>   override def deserialize(datum: Any): B = {
>     datum match { case row: InternalRow => new B(row.getInt(0)) }
>   }
> }
>
> object BUDT extends BUDT
>
> object Test {
>   def main(args: Array[String]) = {
>     val col = Seq(new A(Seq(new B(1), new B(2))),
>                   new A(Seq(new B(3), new B(4))))
>     val sc = new SparkContext(new SparkConf().setMaster("local[1]").setAppName("TestSpark"))
>     val sqlContext = new org.apache.spark.sql.SQLContext(sc)
>     import sqlContext.implicits._
>     val df = sc.parallelize(1 to 2 zip col).toDF("id", "b")
>     df.select("b").show()
>     df.collect().foreach(println)
>   }
> }
>
> In the new version (1.6.0) I needed to include the following import:
>
> import org.apache.spark.sql.catalyst.expressions.GenericMutableRow
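For reference, here is a minimal sketch of the change the comment suggests, i.e. pushing each child B through {{BUDT.serialize}} on the way in (and, symmetrically, through {{BUDT.deserialize}} on the way out). It is untested, assumes the A, B and BUDT definitions from the quoted snippet are in scope, and whether this actually addresses the 1.6.0 failure is exactly what the comment asks to have clarified:

{code}
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.GenericMutableRow
import org.apache.spark.sql.catalyst.util.GenericArrayData // assumed location in 1.5/1.6
import org.apache.spark.sql.types._

class AUDT extends UserDefinedType[A] {
  override def sqlType: DataType = StructType(Seq(
    StructField("list", ArrayType(BUDT, containsNull = false), nullable = true)))

  override def userClass: Class[A] = classOf[A]

  override def serialize(obj: Any): Any = obj match {
    case A(list) =>
      val row = new GenericMutableRow(1)
      // Serialize each child through its own UDT, so the array holds the
      // Catalyst representation of B (an InternalRow matching BUDT.sqlType)
      // rather than raw B instances.
      row.update(0, new GenericArrayData(list.map(b => BUDT.serialize(b)).toArray))
      row
  }

  override def deserialize(datum: Any): A = datum match {
    case row: InternalRow =>
      // Symmetric to serialize: hand each stored element back to
      // BUDT.deserialize instead of casting it to B directly.
      A(row.getArray(0).toArray[Any](BUDT).map(BUDT.deserialize).toSeq)
  }
}

object AUDT extends AUDT
{code}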
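A quick way to sanity-check that serialize/deserialize pair in isolation, without spinning up a SparkContext (again just an illustrative, hypothetical check using the definitions above):

{code}
// Round-trip an A through the sketched AUDT: to Catalyst form and back.
val a = A(Seq(B(1), B(2)))
val roundTripped = AUDT.deserialize(AUDT.serialize(a))
assert(roundTripped == a)
{code}

This only exercises the Scala-side round trip, of course; it says nothing about what the DataFrame machinery in 1.6.0 does with the nested UDT, which is where the reported ClassCastException comes from.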