Hi all, I'm trying to implement a pipeline for computer vision based on the latest ML package in Spark. The first step of my pipeline is to decode images (JPEG, for instance) stored in a Parquet file. For this, I began by creating a UserDefinedType that represents a decoded image stored in an array of bytes. Here is my first attempt:
@SQLUserDefinedType(udt = classOf[ByteImageUDT])
class ByteImage(val channels: Int, val width: Int, val height: Int, val data: Array[Byte])

private[spark] class ByteImageUDT extends UserDefinedType[ByteImage] {

  override def sqlType: StructType = {
    StructType(Seq(
      StructField("channels", IntegerType, nullable = false),
      StructField("width", IntegerType, nullable = false),
      StructField("height", IntegerType, nullable = false),
      StructField("data", BinaryType, nullable = false)))
  }

  override def serialize(obj: Any): Row = {
    val row = new GenericMutableRow(4)
    val img = obj.asInstanceOf[ByteImage]
    ...
  }

  override def deserialize(datum: Any): ByteImage = {
    ...
  }

  override def userClass: Class[ByteImage] = classOf[ByteImage]
}

I took VectorUDT as a starting point, but there are a lot of things I don't really understand. So any help on defining the serialize and deserialize methods would be appreciated.

Best Regards,

Jao
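[For anyone searching the archives, here is a minimal sketch of what the two missing bodies might look like, following the same pattern VectorUDT uses in Spark 1.x: build the struct with a GenericMutableRow on the way in, pattern-match on Row on the way out. Untested, and it assumes the four fields are laid out in the order declared in sqlType:]

import org.apache.spark.sql.Row
import org.apache.spark.sql.catalyst.expressions.GenericMutableRow

// Inside ByteImageUDT -- a sketch only, not a tested implementation.

override def serialize(obj: Any): Row = {
  obj match {
    case img: ByteImage =>
      val row = new GenericMutableRow(4)
      // Field order must match sqlType: channels, width, height, data.
      row.setInt(0, img.channels)
      row.setInt(1, img.width)
      row.setInt(2, img.height)
      row.update(3, img.data) // BinaryType is stored as Array[Byte]
      row
  }
}

override def deserialize(datum: Any): ByteImage = {
  datum match {
    case row: Row =>
      // Same defensive check VectorUDT performs on its rows.
      require(row.length == 4,
        s"ByteImageUDT.deserialize given row with length ${row.length} but requires length == 4")
      new ByteImage(row.getInt(0), row.getInt(1), row.getInt(2),
        row.getAs[Array[Byte]](3))
  }
}

[Note that the UDT API has changed across Spark versions (later releases work with InternalRow rather than Row), so the sketch above follows the 1.x-style API that GenericMutableRow belongs to.]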