In this example, everything works except saving to the Parquet file.

On Mon, May 11, 2015 at 4:39 PM, Jaonary Rabarisoa <jaon...@gmail.com> wrote:
> MyDenseVectorUDT does exist in the assembly jar, and in this example all the
> code is in a single file to make sure everything is included.
>
> On Tue, Apr 21, 2015 at 1:17 AM, Xiangrui Meng <men...@gmail.com> wrote:
>
>> You should check where MyDenseVectorUDT is defined and whether it was
>> on the classpath (or in the assembly jar) at runtime. Make sure the
>> full class name (with package name) is used. Btw, UDTs are not public
>> yet, so please use them with caution. -Xiangrui
>>
>> On Fri, Apr 17, 2015 at 12:45 AM, Jaonary Rabarisoa <jaon...@gmail.com>
>> wrote:
>> > Dear all,
>> >
>> > Here is an example of code to reproduce the issue I mentioned in a
>> > previous mail about saving a UserDefinedType into a Parquet file. The
>> > problem here is that the code works when I run it inside IntelliJ IDEA
>> > but fails when I create the assembly jar and run it with spark-submit.
>> > I use the master version of Spark.
>> >
>> > @SQLUserDefinedType(udt = classOf[MyDenseVectorUDT])
>> > class MyDenseVector(val data: Array[Double]) extends Serializable {
>> >   override def equals(other: Any): Boolean = other match {
>> >     case v: MyDenseVector =>
>> >       java.util.Arrays.equals(this.data, v.data)
>> >     case _ => false
>> >   }
>> > }
>> >
>> > class MyDenseVectorUDT extends UserDefinedType[MyDenseVector] {
>> >   override def sqlType: DataType = ArrayType(DoubleType, containsNull = false)
>> >
>> >   override def serialize(obj: Any): Seq[Double] = {
>> >     obj match {
>> >       case features: MyDenseVector =>
>> >         features.data.toSeq
>> >     }
>> >   }
>> >
>> >   override def deserialize(datum: Any): MyDenseVector = {
>> >     datum match {
>> >       case data: Seq[_] =>
>> >         new MyDenseVector(data.asInstanceOf[Seq[Double]].toArray)
>> >     }
>> >   }
>> >
>> >   override def userClass: Class[MyDenseVector] = classOf[MyDenseVector]
>> > }
>> >
>> > case class Toto(imageAnnotation: MyDenseVector)
>> >
>> > object TestUserDefinedType {
>> >
>> >   case class Params(input: String = null,
>> >                     partitions: Int = 12,
>> >                     outputDir: String = "images.parquet")
>> >
>> >   def main(args: Array[String]): Unit = {
>> >
>> >     val conf = new SparkConf().setAppName("ImportImageFolder").setMaster("local[4]")
>> >
>> >     val sc = new SparkContext(conf)
>> >     val sqlContext = new SQLContext(sc)
>> >
>> >     import sqlContext.implicits._
>> >
>> >     val rawImages = sc.parallelize((1 to 5).map(x =>
>> >       Toto(new MyDenseVector(Array[Double](x.toDouble))))).toDF
>> >
>> >     rawImages.printSchema()
>> >
>> >     rawImages.show()
>> >
>> >     rawImages.save("toto.parquet") // This fails with assembly jar
>> >     sc.stop()
>> >   }
>> > }
>> >
>> > My build.sbt is as follows:
>> >
>> > libraryDependencies ++= Seq(
>> >   "org.apache.spark" %% "spark-core" % sparkVersion % "provided",
>> >   "org.apache.spark" %% "spark-sql" % sparkVersion,
>> >   "org.apache.spark" %% "spark-mllib" % sparkVersion
>> > )
>> >
>> > assemblyMergeStrategy in assembly := {
>> >   case PathList("javax", "servlet", xs @ _*) => MergeStrategy.first
>> >   case PathList("org", "apache", xs @ _*) => MergeStrategy.first
>> >   case PathList("org", "jboss", xs @ _*) => MergeStrategy.first
>> >   // case PathList(ps @ _*) if ps.last endsWith ".html" => MergeStrategy.first
>> >   // case "application.conf" => MergeStrategy.concat
>> >   case m if m.startsWith("META-INF") => MergeStrategy.discard
>> >   // case x =>
>> >   //   val oldStrategy = (assemblyMergeStrategy in assembly).value
>> >   //   oldStrategy(x)
>> >   case _ => MergeStrategy.first
>> > }
>> >
>> > As I said, this code works without problem when I execute it inside
>> > IntelliJ IDEA. But when I generate the assembly jar with sbt-assembly and
>> > use spark-submit, I get the following error:
>> >
>> > 15/04/17 09:34:01 INFO ParquetOutputFormat: Writer version is: PARQUET_1_0
>> > 15/04/17 09:34:01 ERROR Executor: Exception in task 3.0 in stage 2.0 (TID 7)
>> > java.lang.IllegalArgumentException: Unsupported dataType:
>> > {"type":"struct","fields":[{"name":"imageAnnotation","type":{"type":"udt","class":"MyDenseVectorUDT","pyClass":null,"sqlType":{"type":"array","elementType":"double","containsNull":false}},"nullable":true,"metadata":{}}]},
>> > [1.1] failure: `TimestampType' expected but `{' found
>> >
>> > {"type":"struct","fields":[{"name":"imageAnnotation","type":{"type":"udt","class":"MyDenseVectorUDT","pyClass":null,"sqlType":{"type":"array","elementType":"double","containsNull":false}},"nullable":true,"metadata":{}}]}
>> > ^
>> >     at org.apache.spark.sql.types.DataType$CaseClassStringParser$.apply(dataTypes.scala:163)
>> >     at org.apache.spark.sql.types.DataType$.fromCaseClassString(dataTypes.scala:98)
>> >     at org.apache.spark.sql.parquet.ParquetTypesConverter$$anonfun$6.apply(ParquetTypes.scala:402)
>> >     at org.apache.spark.sql.parquet.ParquetTypesConverter$$anonfun$6.apply(ParquetTypes.scala:402)
>> >     at scala.util.Try.getOrElse(Try.scala:77)
>> >     at org.apache.spark.sql.parquet.ParquetTypesConverter$.convertFromString(ParquetTypes.scala:402)
>> >     at org.apache.spark.sql.parquet.RowWriteSupport.init(ParquetTableSupport.scala:145)
>> >     at parquet.hadoop.ParquetOutputFormat.getRecordWriter(ParquetOutputFormat.java:278)
>> >     at parquet.hadoop.ParquetOutputFormat.getRecordWriter(ParquetOutputFormat.java:252)
>> >     at org.apache.spark.sql.parquet.ParquetRelation2.org$apache$spark$sql$parquet$ParquetRelation2$$writeShard$1(newParquet.scala:694)
>> >     at org.apache.spark.sql.parquet.ParquetRelation2$$anonfun$insert$2.apply(newParquet.scala:716)
>> >     at org.apache.spark.sql.parquet.ParquetRelation2$$anonfun$insert$2.apply(newParquet.scala:716)
>> >     at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
>> >     at org.apache.spark.scheduler.Task.run(Task.scala:64)
>> >     at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
>> >     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>> >     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>> >     at java.lang.Thread.run(Thread.java:745)
>> >
>> > The issue seems to be related to the generation of the assembly jar, but I
>> > just can't figure out how to fix it. Any ideas would be helpful.
>> >
>> > Best regards,
>> >
>> > Jao
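
A concrete version of Xiangrui's suggestion about the full class name: the schema JSON in the error above records "class":"MyDenseVectorUDT" with no package prefix, because the UDT lives in the default package. A minimal sketch of the same UDT placed in a named package (com.example.types is a hypothetical name chosen for illustration), so that a fully qualified class name is recorded in the Parquet metadata and can be resolved by reflection on the executors:

package com.example.types // hypothetical package name, for illustration only

import org.apache.spark.sql.types._

// With a named package, the serialized schema carries the fully qualified
// name "com.example.types.MyDenseVectorUDT", which executors load by
// reflection when reading or writing the Parquet file.
@SQLUserDefinedType(udt = classOf[MyDenseVectorUDT])
class MyDenseVector(val data: Array[Double]) extends Serializable

class MyDenseVectorUDT extends UserDefinedType[MyDenseVector] {
  // Underlying SQL representation: a non-nullable array of doubles.
  override def sqlType: DataType = ArrayType(DoubleType, containsNull = false)

  override def serialize(obj: Any): Seq[Double] = obj match {
    case v: MyDenseVector => v.data.toSeq
  }

  override def deserialize(datum: Any): MyDenseVector = datum match {
    case data: Seq[_] => new MyDenseVector(data.asInstanceOf[Seq[Double]].toArray)
  }

  override def userClass: Class[MyDenseVector] = classOf[MyDenseVector]
}

It is also worth confirming the class really ended up in the fat jar, e.g. by listing its contents with jar tf and searching for MyDenseVectorUDT, as Xiangrui suggested.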
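Separately, the scala.util.Try.getOrElse frame in the trace shows Spark falling back from the JSON schema parser to the legacy DataType$CaseClassStringParser, which cannot read the JSON form. That fallback fires when the first parser throws, e.g. if the UDT class named in the JSON cannot be loaded in that classloader, or if the assembly bundled a second, stale copy of spark-sql that the MergeStrategy.first rule for org/apache picked over the runtime's classes. One hedged thing to try, given that only spark-core is marked "provided" above, is to mark every Spark module "provided" so the fat jar carries no Spark classes at all; whether this resolves this particular failure is an assumption, not confirmed in the thread:

// build.sbt (sketch): all Spark modules "provided", so the assembly contains
// no org.apache.spark classes of its own; spark-submit supplies them at
// runtime and no stale copy can shadow the cluster's spark-sql.
libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-core"  % sparkVersion % "provided",
  "org.apache.spark" %% "spark-sql"   % sparkVersion % "provided",
  "org.apache.spark" %% "spark-mllib" % sparkVersion % "provided"
)

With Spark out of the jar, the MergeStrategy.first rule for org/apache no longer has competing Spark class files to choose between. The trade-off is that running from the IDE may then require adding the provided dependencies back to the run classpath.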