You should check where MyDenseVectorUDT is defined and whether it was
on the classpath (or in the assembly jar) at runtime. Make sure the
full class name (with package name) is used. Btw, UDTs are not public
yet, so please use them with caution. -Xiangrui

On Fri, Apr 17, 2015 at 12:45 AM, Jaonary Rabarisoa <jaon...@gmail.com> wrote:
> Dear all,
>
> Here is an example of code to reproduce the issue I mentioned in a previous
> mail about saving a UserDefinedType into a Parquet file. The problem here
> is that the code works when I run it inside IntelliJ IDEA but fails when I
> create the assembly jar and run it with spark-submit. I use the master
> version of Spark.
>
> @SQLUserDefinedType(udt = classOf[MyDenseVectorUDT])
> class MyDenseVector(val data: Array[Double]) extends Serializable {
>   override def equals(other: Any): Boolean = other match {
>     case v: MyDenseVector =>
>       java.util.Arrays.equals(this.data, v.data)
>     case _ => false
>   }
> }
>
> class MyDenseVectorUDT extends UserDefinedType[MyDenseVector] {
>   override def sqlType: DataType = ArrayType(DoubleType, containsNull =
> false)
>   override def serialize(obj: Any): Seq[Double] = {
>     obj match {
>       case features: MyDenseVector =>
>         features.data.toSeq
>     }
>   }
>
>   override def deserialize(datum: Any): MyDenseVector = {
>     datum match {
>       case data: Seq[_] =>
>         new MyDenseVector(data.asInstanceOf[Seq[Double]].toArray)
>     }
>   }
>
>   override def userClass: Class[MyDenseVector] = classOf[MyDenseVector]
>
> }
>
> case class Toto(imageAnnotation: MyDenseVector)
>
> object TestUserDefinedType {
>
>   case class Params(input: String = null,
>                    partitions: Int = 12,
>                     outputDir: String = "images.parquet")
>
>   def main(args: Array[String]): Unit = {
>
>     val conf = new
> SparkConf().setAppName("ImportImageFolder").setMaster("local[4]")
>
>     val sc = new SparkContext(conf)
>     val sqlContext = new SQLContext(sc)
>
>     import sqlContext.implicits._
>
>     val rawImages = sc.parallelize((1 to 5).map(x => Toto(new
> MyDenseVector(Array[Double](x.toDouble))))).toDF
>
>     rawImages.printSchema()
>
>     rawImages.show()
>
>     rawImages.save("toto.parquet") // This fails with assembly jar
>     sc.stop()
>
>   }
> }
>
>
> My build.sbt is as follows:
>
> libraryDependencies ++= Seq(
>   "org.apache.spark" %% "spark-core" % sparkVersion % "provided",
>   "org.apache.spark" %% "spark-sql" % sparkVersion,
>   "org.apache.spark" %% "spark-mllib" % sparkVersion
> )
>
> assemblyMergeStrategy in assembly := {
>   case PathList("javax", "servlet", xs @ _*) => MergeStrategy.first
>   case PathList("org", "apache", xs @ _*) => MergeStrategy.first
>   case PathList("org", "jboss", xs @ _*) => MergeStrategy.first
> //  case PathList(ps @ _*) if ps.last endsWith ".html" =>
> MergeStrategy.first
> //  case "application.conf"                            =>
> MergeStrategy.concat
>   case m if m.startsWith("META-INF") => MergeStrategy.discard
>   //case x =>
>   //  val oldStrategy = (assemblyMergeStrategy in assembly).value
>   //  oldStrategy(x)
>   case _ => MergeStrategy.first
> }
>
>
> As I said, this code works without problem when I execute it inside IntelliJ
> IDEA. But when I generate the assembly jar with sbt-assembly and
>
> use spark-submit, I get the following error:
>
> 15/04/17 09:34:01 INFO ParquetOutputFormat: Writer version is: PARQUET_1_0
> 15/04/17 09:34:01 ERROR Executor: Exception in task 3.0 in stage 2.0 (TID 7)
> java.lang.IllegalArgumentException: Unsupported dataType:
> {"type":"struct","fields":[{"name":"imageAnnotation","type":{"type":"udt","class":"MyDenseVectorUDT","pyClass":null,"sqlType":{"type":"array","elementType":"double","containsNull":false}},"nullable":true,"metadata":{}}]},
> [1.1] failure: `TimestampType' expected but `{' found
>
> {"type":"struct","fields":[{"name":"imageAnnotation","type":{"type":"udt","class":"MyDenseVectorUDT","pyClass":null,"sqlType":{"type":"array","elementType":"double","containsNull":false}},"nullable":true,"metadata":{}}]}
> ^
>       at
> org.apache.spark.sql.types.DataType$CaseClassStringParser$.apply(dataTypes.scala:163)
>       at
> org.apache.spark.sql.types.DataType$.fromCaseClassString(dataTypes.scala:98)
>       at
> org.apache.spark.sql.parquet.ParquetTypesConverter$$anonfun$6.apply(ParquetTypes.scala:402)
>       at
> org.apache.spark.sql.parquet.ParquetTypesConverter$$anonfun$6.apply(ParquetTypes.scala:402)
>       at scala.util.Try.getOrElse(Try.scala:77)
>       at
> org.apache.spark.sql.parquet.ParquetTypesConverter$.convertFromString(ParquetTypes.scala:402)
>       at
> org.apache.spark.sql.parquet.RowWriteSupport.init(ParquetTableSupport.scala:145)
>       at
> parquet.hadoop.ParquetOutputFormat.getRecordWriter(ParquetOutputFormat.java:278)
>       at
> parquet.hadoop.ParquetOutputFormat.getRecordWriter(ParquetOutputFormat.java:252)
>       at
> org.apache.spark.sql.parquet.ParquetRelation2.org$apache$spark$sql$parquet$ParquetRelation2$$writeShard$1(newParquet.scala:694)
>       at
> org.apache.spark.sql.parquet.ParquetRelation2$$anonfun$insert$2.apply(newParquet.scala:716)
>       at
> org.apache.spark.sql.parquet.ParquetRelation2$$anonfun$insert$2.apply(newParquet.scala:716)
>       at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
>       at org.apache.spark.scheduler.Task.run(Task.scala:64)
>       at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
>       at
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>       at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>       at java.lang.Thread.run(Thread.java:745)
>
>
> The issue seems to be related to the generation of the assembly jar but I
> just can't figure out how to fix it. Any ideas would be helpful.
>
> Best regards,
>
>
> Jao

---------------------------------------------------------------------
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org

Reply via email to