Hi all!

I'm using Spark 1.3.0 and I'm struggling with the definition of a new type for
a project I'm working on. I've created a case class Person(name: String) and
now I'm trying to make Spark able to serialize and deserialize the defined
type. I made a couple of attempts, but none of them worked completely (there
were issues either in serialization or in deserialization).

This is my class and the corresponding UDT.

import java.util

import org.apache.spark.sql.types._

@SQLUserDefinedType(udt = classOf[PersonUDT])
case class Person(name: String)

class PersonUDT extends UserDefinedType[Person] {
  override def sqlType: DataType =
    StructType(Seq(StructField("name", StringType)))

  override def serialize(obj: Any): Seq[Any] = {
    obj match {
      case c: Person =>
        Seq(c.name)
    }
  }

  override def userClass: Class[Person] = classOf[Person]

  override def deserialize(datum: Any): Person = {
    datum match {
      case values: Seq[_] =>
        assert(values.length == 1)
        Person(values.head.asInstanceOf[String])
      case values: util.ArrayList[_] =>
        Person(values.get(0).asInstanceOf[String])
    }
  }
  
  // In another attempt I created an RDD of Seqs with manually serialized data
  // (sketch below), and I had to override equals because two DataFrames with
  // the same type weren't considered equal:
  //   StructField(person,...types.PersonUDT@a096ac3)
  //   StructField(person,...types.PersonUDT@613fd937)
  def canEqual(other: Any): Boolean = other.isInstanceOf[PersonUDT]
 
  override def equals(other: Any): Boolean = other match {
    case _: PersonUDT => true
    case _ => false
  }

  override def hashCode(): Int = 1
}
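
For reference, the manual attempt mentioned in the comment above looked
roughly like this (reconstructed from memory, so the exact shapes are
approximate): I serialized each Person by hand and passed an explicit schema
that uses the UDT as the field type.

import org.apache.spark.sql.Row

// Rough sketch of the manual attempt: serialize each Person by hand and
// declare a schema whose single field has PersonUDT as its type.
val manualSchema = StructType(Seq(StructField("person", new PersonUDT)))
val rowRdd = sparkContext.parallelize((1 to 100).map { i =>
  Row(new PersonUDT().serialize(Person(i.toString)))
})
val manualDF = sqlContext.createDataFrame(rowRdd, manualSchema)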

This is how I create an RDD of Person and then try to create a DataFrame:
val rdd = sparkContext.parallelize((1 to 100).map(i => Person(i.toString)))
val sparkDataFrame = sqlContext.createDataFrame(rdd)

The second line throws an exception:
java.lang.ClassCastException: ....types.PersonUDT cannot be cast to
org.apache.spark.sql.types.StructType 
at org.apache.spark.sql.SQLContext.createDataFrame(SQLContext.scala:316)

I looked into the code in SQLContext.scala, and it seems that createDataFrame
expects the type to be a StructType, but a UDT extends UserDefinedType, which
extends DataType directly, not StructType.
I'm not sure whether this is a bug or whether I just don't know how to use
UDTs.
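
One thing I haven't verified yet, so treat it as a guess rather than a
finding: since createDataFrame apparently expects the top-level schema to be
a StructType, maybe the UDT is only meant to appear as a field of a regular
case class rather than as the element type itself. A minimal sketch of what
I mean (PersonWrapper is just a name I made up):

// Hypothetical wrapper, untested: the UDT only appears as a field type,
// so the inferred top-level schema should be a StructType.
case class PersonWrapper(person: Person)

val wrappedRdd = sparkContext.parallelize((1 to 100).map(i => PersonWrapper(Person(i.toString))))
val wrappedDF = sqlContext.createDataFrame(wrappedRdd)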

Do you have any suggestions on how to solve this? I based my UDT on
ExamplePointUDT, but it seems to be incorrect. Is there a working example of
a UDT anywhere?


Thank you in advance for your replies!
wjur


