MyDenseVectorUDT does exist in the assembly jar, and in this example all the code is in a single file to make sure everything is included.
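In case it helps to double-check, here is a minimal sketch (not part of the original example; the small dummy RDD is only there to run the check inside a task) of how one might verify at runtime that the class is actually visible to both the driver and the executors:

// Minimal classpath check, assuming the same `sc` as in the example below.
// Use the fully qualified name if the class lives in a package.
val className = "MyDenseVectorUDT"

def canLoad(name: String): Boolean =
  try { Class.forName(name); true }
  catch { case _: ClassNotFoundException => false }

println(s"driver can load $className: ${canLoad(className)}")

// Running the same check inside a task tells us whether the executors
// (i.e. the code shipped via spark-submit) can see the class as well.
val onExecutors = sc.parallelize(1 to 4, 4).map { _ =>
  try { Class.forName(className); true }
  catch { case _: ClassNotFoundException => false }
}.collect()

println(s"executors can load $className: ${onExecutors.mkString(", ")}")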
On Tue, Apr 21, 2015 at 1:17 AM, Xiangrui Meng <men...@gmail.com> wrote:
> You should check where MyDenseVectorUDT is defined and whether it was
> on the classpath (or in the assembly jar) at runtime. Make sure the
> full class name (with package name) is used. Btw, UDTs are not public
> yet, so please use it with caution. -Xiangrui
>
> On Fri, Apr 17, 2015 at 12:45 AM, Jaonary Rabarisoa <jaon...@gmail.com> wrote:
> > Dear all,
> >
> > Here is an example of code to reproduce the issue I mentioned in a previous
> > mail about saving a UserDefinedType into a parquet file. The problem here
> > is that the code works when I run it inside IntelliJ IDEA but fails when I
> > create the assembly jar and run it with spark-submit. I use the master
> > version of Spark.
> >
> > @SQLUserDefinedType(udt = classOf[MyDenseVectorUDT])
> > class MyDenseVector(val data: Array[Double]) extends Serializable {
> >   override def equals(other: Any): Boolean = other match {
> >     case v: MyDenseVector => java.util.Arrays.equals(this.data, v.data)
> >     case _ => false
> >   }
> > }
> >
> > class MyDenseVectorUDT extends UserDefinedType[MyDenseVector] {
> >   override def sqlType: DataType = ArrayType(DoubleType, containsNull = false)
> >
> >   override def serialize(obj: Any): Seq[Double] = obj match {
> >     case features: MyDenseVector => features.data.toSeq
> >   }
> >
> >   override def deserialize(datum: Any): MyDenseVector = datum match {
> >     case data: Seq[_] => new MyDenseVector(data.asInstanceOf[Seq[Double]].toArray)
> >   }
> >
> >   override def userClass: Class[MyDenseVector] = classOf[MyDenseVector]
> > }
> >
> > case class Toto(imageAnnotation: MyDenseVector)
> >
> > object TestUserDefinedType {
> >
> >   case class Params(input: String = null,
> >                     partitions: Int = 12,
> >                     outputDir: String = "images.parquet")
> >
> >   def main(args: Array[String]): Unit = {
> >
> >     val conf = new SparkConf().setAppName("ImportImageFolder").setMaster("local[4]")
> >
> >     val sc = new SparkContext(conf)
> >     val sqlContext = new SQLContext(sc)
> >
> >     import sqlContext.implicits._
> >
> >     val rawImages = sc.parallelize(
> >       (1 to 5).map(x => Toto(new MyDenseVector(Array[Double](x.toDouble))))).toDF
> >
> >     rawImages.printSchema()
> >     rawImages.show()
> >
> >     rawImages.save("toto.parquet") // This fails with assembly jar
> >
> >     sc.stop()
> >   }
> > }
> >
> > My build.sbt is as follows:
> >
> > libraryDependencies ++= Seq(
> >   "org.apache.spark" %% "spark-core"  % sparkVersion % "provided",
> >   "org.apache.spark" %% "spark-sql"   % sparkVersion,
> >   "org.apache.spark" %% "spark-mllib" % sparkVersion
> > )
> >
> > assemblyMergeStrategy in assembly := {
> >   case PathList("javax", "servlet", xs @ _*) => MergeStrategy.first
> >   case PathList("org", "apache", xs @ _*)    => MergeStrategy.first
> >   case PathList("org", "jboss", xs @ _*)     => MergeStrategy.first
> >   // case PathList(ps @ _*) if ps.last endsWith ".html" => MergeStrategy.first
> >   // case "application.conf" => MergeStrategy.concat
> >   case m if m.startsWith("META-INF") => MergeStrategy.discard
> >   // case x =>
> >   //   val oldStrategy = (assemblyMergeStrategy in assembly).value
> >   //   oldStrategy(x)
> >   case _ => MergeStrategy.first
> > }
> >
> > As I said, this code works without problem when I execute it inside IntelliJ IDEA.
> > But when I generate the assembly jar with sbt-assembly and use
> > spark-submit, I get the following error:
> >
> > 15/04/17 09:34:01 INFO ParquetOutputFormat: Writer version is: PARQUET_1_0
> > 15/04/17 09:34:01 ERROR Executor: Exception in task 3.0 in stage 2.0 (TID 7)
> > java.lang.IllegalArgumentException: Unsupported dataType:
> > {"type":"struct","fields":[{"name":"imageAnnotation","type":{"type":"udt","class":"MyDenseVectorUDT","pyClass":null,"sqlType":{"type":"array","elementType":"double","containsNull":false}},"nullable":true,"metadata":{}}]},
> > [1.1] failure: `TimestampType' expected but `{' found
> >
> > {"type":"struct","fields":[{"name":"imageAnnotation","type":{"type":"udt","class":"MyDenseVectorUDT","pyClass":null,"sqlType":{"type":"array","elementType":"double","containsNull":false}},"nullable":true,"metadata":{}}]}
> > ^
> >     at org.apache.spark.sql.types.DataType$CaseClassStringParser$.apply(dataTypes.scala:163)
> >     at org.apache.spark.sql.types.DataType$.fromCaseClassString(dataTypes.scala:98)
> >     at org.apache.spark.sql.parquet.ParquetTypesConverter$$anonfun$6.apply(ParquetTypes.scala:402)
> >     at org.apache.spark.sql.parquet.ParquetTypesConverter$$anonfun$6.apply(ParquetTypes.scala:402)
> >     at scala.util.Try.getOrElse(Try.scala:77)
> >     at org.apache.spark.sql.parquet.ParquetTypesConverter$.convertFromString(ParquetTypes.scala:402)
> >     at org.apache.spark.sql.parquet.RowWriteSupport.init(ParquetTableSupport.scala:145)
> >     at parquet.hadoop.ParquetOutputFormat.getRecordWriter(ParquetOutputFormat.java:278)
> >     at parquet.hadoop.ParquetOutputFormat.getRecordWriter(ParquetOutputFormat.java:252)
> >     at org.apache.spark.sql.parquet.ParquetRelation2.org$apache$spark$sql$parquet$ParquetRelation2$$writeShard$1(newParquet.scala:694)
> >     at org.apache.spark.sql.parquet.ParquetRelation2$$anonfun$insert$2.apply(newParquet.scala:716)
> >     at org.apache.spark.sql.parquet.ParquetRelation2$$anonfun$insert$2.apply(newParquet.scala:716)
> >     at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
> >     at org.apache.spark.scheduler.Task.run(Task.scala:64)
> >     at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
> >     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
> >     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
> >     at java.lang.Thread.run(Thread.java:745)
> >
> > The issue seems to be related to the generation of the assembly jar but I
> > just can't figure out how to fix it. Any ideas would be helpful.
> >
> > Best regards,
> >
> > Jao
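Following up on Xiangrui's note about using the full class name: the schema JSON in the error above records the UDT as just "class":"MyDenseVectorUDT", i.e. a class in the default package. Purely as a sketch (the package name com.example.udt is made up for illustration, and I have not verified that this alone changes the spark-submit behaviour), the same UDT with an explicit package would look like:

package com.example.udt // hypothetical package, for illustration only

import org.apache.spark.sql.types._

@SQLUserDefinedType(udt = classOf[MyDenseVectorUDT])
class MyDenseVector(val data: Array[Double]) extends Serializable {
  override def equals(other: Any): Boolean = other match {
    case v: MyDenseVector => java.util.Arrays.equals(this.data, v.data)
    case _ => false
  }
}

class MyDenseVectorUDT extends UserDefinedType[MyDenseVector] {
  // With the package in place, the Parquet schema metadata should record
  // "com.example.udt.MyDenseVectorUDT" rather than a bare "MyDenseVectorUDT".
  override def sqlType: DataType = ArrayType(DoubleType, containsNull = false)

  override def serialize(obj: Any): Seq[Double] = obj match {
    case features: MyDenseVector => features.data.toSeq
  }

  override def deserialize(datum: Any): MyDenseVector = datum match {
    case data: Seq[_] => new MyDenseVector(data.asInstanceOf[Seq[Double]].toArray)
  }

  override def userClass: Class[MyDenseVector] = classOf[MyDenseVector]
}

Separately, since the build.sbt above already has the default-strategy fallback commented out (case x => oldStrategy(x)), re-enabling it instead of the blanket case _ => MergeStrategy.first might help rule the merge strategy in or out as the cause.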