MyDenseVectorUDT does exist in the assembly jar, and in this example all the
code is in a single file to make sure everything is included.
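
One way to double-check that from inside the job itself (a minimal sketch;
Class.forName throws ClassNotFoundException when the class is missing from
the runtime classpath):

    // Sketch only: the classes below live in the default package,
    // so the full class name is just "MyDenseVectorUDT".
    val cls = Class.forName("MyDenseVectorUDT")
    println(s"Loaded ${cls.getName} from ${cls.getProtectionDomain.getCodeSource}")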

On Tue, Apr 21, 2015 at 1:17 AM, Xiangrui Meng <men...@gmail.com> wrote:

> You should check where MyDenseVectorUDT is defined and whether it was
> on the classpath (or in the assembly jar) at runtime. Make sure the
> full class name (with package name) is used. Btw, UDTs are not public
> yet, so please use them with caution.
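>
> For example (a sketch only; "com.example" is a placeholder package name),
> putting the classes in a package makes the recorded class name fully
> qualified:
>
>     package com.example
>
>     @SQLUserDefinedType(udt = classOf[MyDenseVectorUDT])
>     class MyDenseVector(val data: Array[Double]) extends Serializable
>
> The schema would then record "com.example.MyDenseVectorUDT" instead of a
> bare class name. -Xiangrui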
>
> On Fri, Apr 17, 2015 at 12:45 AM, Jaonary Rabarisoa <jaon...@gmail.com>
> wrote:
> > Dear all,
> >
> > Here is an example of code to reproduce the issue I mentioned in a
> > previous mail about saving a UserDefinedType into a Parquet file. The
> > problem here is that the code works when I run it inside IntelliJ IDEA
> > but fails when I create the assembly jar and run it with spark-submit.
> > I use the master version of Spark.
> >
> > import org.apache.spark.{SparkConf, SparkContext}
> > import org.apache.spark.sql.SQLContext
> > import org.apache.spark.sql.types._
> >
> > @SQLUserDefinedType(udt = classOf[MyDenseVectorUDT])
> > class MyDenseVector(val data: Array[Double]) extends Serializable {
> >   override def equals(other: Any): Boolean = other match {
> >     case v: MyDenseVector =>
> >       java.util.Arrays.equals(this.data, v.data)
> >     case _ => false
> >   }
> > }
> >
> > class MyDenseVectorUDT extends UserDefinedType[MyDenseVector] {
> >   override def sqlType: DataType = ArrayType(DoubleType, containsNull = false)
> >   override def serialize(obj: Any): Seq[Double] = {
> >     obj match {
> >       case features: MyDenseVector =>
> >         features.data.toSeq
> >     }
> >   }
> >
> >   override def deserialize(datum: Any): MyDenseVector = {
> >     datum match {
> >       case data: Seq[_] =>
> >         new MyDenseVector(data.asInstanceOf[Seq[Double]].toArray)
> >     }
> >   }
> >
> >   override def userClass: Class[MyDenseVector] = classOf[MyDenseVector]
> >
> > }
> >
> > case class Toto(imageAnnotation: MyDenseVector)
> >
> > object TestUserDefinedType {
> >
> >   case class Params(input: String = null,
> >                     partitions: Int = 12,
> >                     outputDir: String = "images.parquet")
> >
> >   def main(args: Array[String]): Unit = {
> >
> >     val conf = new SparkConf()
> >       .setAppName("ImportImageFolder")
> >       .setMaster("local[4]")
> >
> >     val sc = new SparkContext(conf)
> >     val sqlContext = new SQLContext(sc)
> >
> >     import sqlContext.implicits._
> >
> >     val rawImages = sc.parallelize(
> >       (1 to 5).map(x => Toto(new MyDenseVector(Array[Double](x.toDouble))))).toDF
> >
> >     rawImages.printSchema()
> >
> >     rawImages.show()
> >
> >     rawImages.save("toto.parquet") // This fails with assembly jar
> >     sc.stop()
> >
> >   }
> > }
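> >
> > (A quick way to sanity-check the UDT in isolation, independent of
> > Parquet; a minimal sketch using only the definitions above:)
> >
> > val udt = new MyDenseVectorUDT
> > val v = new MyDenseVector(Array(1.0, 2.0))
> > // serialize yields a Seq[Double]; deserialize should restore an equal vector
> > assert(udt.deserialize(udt.serialize(v)) == v)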
> >
> >
> > My build.sbt is as follows:
> >
> > libraryDependencies ++= Seq(
> >   "org.apache.spark" %% "spark-core" % sparkVersion % "provided",
> >   "org.apache.spark" %% "spark-sql" % sparkVersion,
> >   "org.apache.spark" %% "spark-mllib" % sparkVersion
> > )
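> >
> > (One thing I am not sure about: spark-sql and spark-mllib are bundled
> > into the assembly while spark-core is "provided". Since spark-submit
> > supplies the Spark classes at runtime, a variant that keeps them all out
> > of the jar would look like this; just a sketch, untested:)
> >
> > libraryDependencies ++= Seq(
> >   "org.apache.spark" %% "spark-core"  % sparkVersion % "provided",
> >   "org.apache.spark" %% "spark-sql"   % sparkVersion % "provided",
> >   "org.apache.spark" %% "spark-mllib" % sparkVersion % "provided"
> > )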
> >
> > assemblyMergeStrategy in assembly := {
> >   case PathList("javax", "servlet", xs @ _*) => MergeStrategy.first
> >   case PathList("org", "apache", xs @ _*) => MergeStrategy.first
> >   case PathList("org", "jboss", xs @ _*) => MergeStrategy.first
> > //  case PathList(ps @ _*) if ps.last endsWith ".html" => MergeStrategy.first
> > //  case "application.conf" => MergeStrategy.concat
> >   case m if m.startsWith("META-INF") => MergeStrategy.discard
> >   //case x =>
> >   //  val oldStrategy = (assemblyMergeStrategy in assembly).value
> >   //  oldStrategy(x)
> >   case _ => MergeStrategy.first
> > }
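> >
> > (I also wonder whether discarding everything under META-INF is too
> > aggressive, since that drops service-registration files along with the
> > signature files. A more conservative variant; a sketch, untested:)
> >
> > assemblyMergeStrategy in assembly := {
> >   case PathList("META-INF", "MANIFEST.MF") => MergeStrategy.discard
> >   case PathList("META-INF", "services", xs @ _*) => MergeStrategy.filterDistinctLines
> >   case _ => MergeStrategy.first
> > }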
> >
> >
> > As I said, this code works without problems when I execute it inside
> > IntelliJ IDEA. But when I generate the assembly jar with sbt-assembly
> > and run it with spark-submit, I get the following error:
> >
> > 15/04/17 09:34:01 INFO ParquetOutputFormat: Writer version is: PARQUET_1_0
> > 15/04/17 09:34:01 ERROR Executor: Exception in task 3.0 in stage 2.0 (TID 7)
> > java.lang.IllegalArgumentException: Unsupported dataType:
> > {"type":"struct","fields":[{"name":"imageAnnotation","type":{"type":"udt","class":"MyDenseVectorUDT","pyClass":null,"sqlType":{"type":"array","elementType":"double","containsNull":false}},"nullable":true,"metadata":{}}]},
> > [1.1] failure: `TimestampType' expected but `{' found
> >
> > {"type":"struct","fields":[{"name":"imageAnnotation","type":{"type":"udt","class":"MyDenseVectorUDT","pyClass":null,"sqlType":{"type":"array","elementType":"double","containsNull":false}},"nullable":true,"metadata":{}}]}
> > ^
> >       at org.apache.spark.sql.types.DataType$CaseClassStringParser$.apply(dataTypes.scala:163)
> >       at org.apache.spark.sql.types.DataType$.fromCaseClassString(dataTypes.scala:98)
> >       at org.apache.spark.sql.parquet.ParquetTypesConverter$$anonfun$6.apply(ParquetTypes.scala:402)
> >       at org.apache.spark.sql.parquet.ParquetTypesConverter$$anonfun$6.apply(ParquetTypes.scala:402)
> >       at scala.util.Try.getOrElse(Try.scala:77)
> >       at org.apache.spark.sql.parquet.ParquetTypesConverter$.convertFromString(ParquetTypes.scala:402)
> >       at org.apache.spark.sql.parquet.RowWriteSupport.init(ParquetTableSupport.scala:145)
> >       at parquet.hadoop.ParquetOutputFormat.getRecordWriter(ParquetOutputFormat.java:278)
> >       at parquet.hadoop.ParquetOutputFormat.getRecordWriter(ParquetOutputFormat.java:252)
> >       at org.apache.spark.sql.parquet.ParquetRelation2.org$apache$spark$sql$parquet$ParquetRelation2$$writeShard$1(newParquet.scala:694)
> >       at org.apache.spark.sql.parquet.ParquetRelation2$$anonfun$insert$2.apply(newParquet.scala:716)
> >       at org.apache.spark.sql.parquet.ParquetRelation2$$anonfun$insert$2.apply(newParquet.scala:716)
> >       at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
> >       at org.apache.spark.scheduler.Task.run(Task.scala:64)
> >       at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
> >       at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
> >       at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
> >       at java.lang.Thread.run(Thread.java:745)
> >
> >
> > The issue seems to be related to how the assembly jar is generated, but I
> > just can't figure out how to fix it. Any ideas would be helpful.
> >
> > Best regards,
> >
> >
> > Jao
>
