(Note that UDT is not a public API yet.)

On Thu, May 7, 2015 at 7:11 AM, wjur <wojtek.jurc...@gmail.com> wrote:
> Hi all!
>
> I'm using Spark 1.3.0 and I'm struggling with the definition of a new type for
> a project I'm working on. I've created a case class Person(name: String) and
> now I'm trying to make Spark able to serialize and deserialize this
> type. I made a couple of attempts but none of them worked 100%
> (there were issues either in serialization or deserialization).
>
> This is my class and the corresponding UDT.
>
> @SQLUserDefinedType(udt = classOf[PersonUDT])
> case class Person(name: String)
>
> class PersonUDT extends UserDefinedType[Person] {
>   override def sqlType: DataType = StructType(Seq(StructField("name",
> StringType)))
>
>   override def serialize(obj: Any): Seq[Any] = {

This should return a Row instance instead of Seq[Any], because the
sqlType is a struct type.

>     obj match {
>       case c: Person =>
>         Seq(c.name)
>     }
>   }
>
>   override def userClass: Class[Person] = classOf[Person]
>
>   override def deserialize(datum: Any): Person = {
>     datum match {
>       case values: Seq[_] =>
>         assert(values.length == 1)
>         Person(values.head.asInstanceOf[String])
>       case values: util.ArrayList[_] =>
>         Person(values.get(0).asInstanceOf[String])
>     }
>   }
>
>   // In another attempt I created an RDD of Seq with manually serialized data, and
>   // I had to override equals because two DFs with the same type weren't
>   // considered equal:
>   // StructField(person,...types.PersonUDT@a096ac3)
>   // StructField(person,...types.PersonUDT@613fd937)
>   def canEqual(other: Any): Boolean = other.isInstanceOf[PersonUDT]
>
>   override def equals(other: Any): Boolean = other match {
>     case that: PersonUDT => true
>     case _ => false
>   }
>
>   override def hashCode(): Int = 1
> }
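
To make the Row suggestion above concrete, here is a sketch of how the
serialize/deserialize pair could look (untested, written against the 1.3-era
UserDefinedType API, which is internal and may change; imports assumed):

```scala
import org.apache.spark.sql.Row
import org.apache.spark.sql.types._

@SQLUserDefinedType(udt = classOf[PersonUDT])
case class Person(name: String)

class PersonUDT extends UserDefinedType[Person] {
  // The internal SQL representation: a one-field struct.
  override def sqlType: DataType = StructType(Seq(StructField("name", StringType)))

  // Produce a Row whose layout matches sqlType, not a bare Seq.
  override def serialize(obj: Any): Any = obj match {
    case p: Person => Row(p.name)
  }

  // The runtime hands back a Row for a struct sqlType.
  override def deserialize(datum: Any): Person = datum match {
    case row: Row => Person(row.getString(0))
  }

  override def userClass: Class[Person] = classOf[Person]
}
```

With serialize and deserialize symmetric around Row, there should be no need
for the ArrayList case in deserialize.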
>
> This is how I create RDD of Person and then try to create a DataFrame
> val rdd = sparkContext.parallelize((1 to 100).map(i => Person(i.toString)))
> val sparkDataFrame = sqlContext.createDataFrame(rdd)
>
> The second line throws an exception:
> java.lang.ClassCastException: ....types.PersonUDT cannot be cast to
> org.apache.spark.sql.types.StructType
> at org.apache.spark.sql.SQLContext.createDataFrame(SQLContext.scala:316)
>
> I looked into the code in SQLContext.scala and it seems that it
> requires the UDT to extend StructType, but in fact it extends
> UserDefinedType, which extends DataType directly.
> I'm not sure whether this is a bug or I just don't know how to use UDTs.
>
> Do you have any suggestions on how to solve this? I based my UDT on
> ExamplePointUDT but it seems to be incorrect. Is there a working example for
> a UDT?
>
>
> Thank you for the reply in advance!
> wjur
>
>
>
> --
> View this message in context: 
> http://apache-spark-user-list.1001560.n3.nabble.com/User-Defined-Type-UDT-tp22796.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> ---------------------------------------------------------------------
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
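
As for the ClassCastException: createDataFrame infers the schema of the
top-level row class, and for a class annotated with @SQLUserDefinedType that
inference yields the UDT itself rather than a StructType, hence the failed
cast. One possible workaround (a sketch, not an officially documented
pattern) is to keep the UDT-annotated class as a *field* of an outer case
class, so the top-level schema is a StructType; PersonRow below is a
hypothetical wrapper name:

```scala
// Wrap Person so that schema inference sees a StructType at the top level,
// with the PersonUDT applied only to the "person" column.
case class PersonRow(person: Person)

val rdd = sparkContext.parallelize((1 to 100).map(i => PersonRow(Person(i.toString))))
val sparkDataFrame = sqlContext.createDataFrame(rdd)
```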
