In this example, everything works except saving to the Parquet file.

On Mon, May 11, 2015 at 4:39 PM, Jaonary Rabarisoa <jaon...@gmail.com>
wrote:

> MyDenseVectorUDT does exist in the assembly jar, and in this example all the
> code is in a single file to make sure everything is included.
>
> On Tue, Apr 21, 2015 at 1:17 AM, Xiangrui Meng <men...@gmail.com> wrote:
>
>> You should check where MyDenseVectorUDT is defined and whether it was
>> on the classpath (or in the assembly jar) at runtime. Make sure the
>> full class name (with package name) is used. Btw, UDTs are not public
>> yet, so please use them with caution. -Xiangrui
>>
>> On Fri, Apr 17, 2015 at 12:45 AM, Jaonary Rabarisoa <jaon...@gmail.com>
>> wrote:
>> > Dear all,
>> >
>> > Here is an example of code to reproduce the issue I mentioned in a previous
>> > mail about saving a UserDefinedType to a Parquet file. The problem here is
>> > that the code works when I run it inside IntelliJ IDEA but fails when I
>> > create the assembly jar and run it with spark-submit. I use the master
>> > version of Spark.
>> >
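>> > // Imports assumed for Spark 1.3-era APIs; they were not part of the original message.
>> > import org.apache.spark.{SparkConf, SparkContext}
>> > import org.apache.spark.sql.SQLContext
>> > import org.apache.spark.sql.types._
>> >
>> > @SQLUserDefinedType(udt = classOf[MyDenseVectorUDT])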
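>> > class MyDenseVector(val data: Array[Double]) extends Serializable {
>> >   override def equals(other: Any): Boolean = other match {
>> >     case v: MyDenseVector =>
>> >       java.util.Arrays.equals(this.data, v.data)
>> >     case _ => false
>> >   }
>> >
>> >   // Keep hashCode consistent with equals (content-based on the array).
>> >   override def hashCode(): Int = java.util.Arrays.hashCode(data)
>> > }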
>> >
>> > class MyDenseVectorUDT extends UserDefinedType[MyDenseVector] {
>> >   override def sqlType: DataType = ArrayType(DoubleType, containsNull = false)
>> >   override def serialize(obj: Any): Seq[Double] = {
>> >     obj match {
>> >       case features: MyDenseVector =>
>> >         features.data.toSeq
>> >     }
>> >   }
>> >
>> >   override def deserialize(datum: Any): MyDenseVector = {
>> >     datum match {
>> >       case data: Seq[_] =>
>> >         new MyDenseVector(data.asInstanceOf[Seq[Double]].toArray)
>> >     }
>> >   }
>> >
>> >   override def userClass: Class[MyDenseVector] = classOf[MyDenseVector]
>> >
>> > }
>> >
>> > case class Toto(imageAnnotation: MyDenseVector)
>> >
>> > object TestUserDefinedType {
>> >
>> >   case class Params(input: String = null,
>> >                     partitions: Int = 12,
>> >                     outputDir: String = "images.parquet")
>> >
>> >   def main(args: Array[String]): Unit = {
>> >
>> >     val conf = new SparkConf().setAppName("ImportImageFolder").setMaster("local[4]")
>> >
>> >     val sc = new SparkContext(conf)
>> >     val sqlContext = new SQLContext(sc)
>> >
>> >     import sqlContext.implicits._
>> >
>> >     val rawImages = sc.parallelize(
>> >       (1 to 5).map(x => Toto(new MyDenseVector(Array[Double](x.toDouble))))).toDF
>> >
>> >     rawImages.printSchema()
>> >
>> >     rawImages.show()
>> >
>> >     rawImages.save("toto.parquet") // This fails with assembly jar
>> >     sc.stop()
>> >
>> >   }
>> > }
>> >
>> >
>> > My build.sbt is as follows:
>> >
>> > libraryDependencies ++= Seq(
>> >   "org.apache.spark" %% "spark-core" % sparkVersion % "provided",
>> >   "org.apache.spark" %% "spark-sql" % sparkVersion,
>> >   "org.apache.spark" %% "spark-mllib" % sparkVersion
>> > )
>> >
>> > assemblyMergeStrategy in assembly := {
>> >   case PathList("javax", "servlet", xs @ _*) => MergeStrategy.first
>> >   case PathList("org", "apache", xs @ _*) => MergeStrategy.first
>> >   case PathList("org", "jboss", xs @ _*) => MergeStrategy.first
>> > //  case PathList(ps @ _*) if ps.last endsWith ".html" => MergeStrategy.first
>> > //  case "application.conf" => MergeStrategy.concat
>> >   case m if m.startsWith("META-INF") => MergeStrategy.discard
>> >   //case x =>
>> >   //  val oldStrategy = (assemblyMergeStrategy in assembly).value
>> >   //  oldStrategy(x)
>> >   case _ => MergeStrategy.first
>> > }
>> >
>> >
>> > As I said, this code works without problems when I execute it inside
>> > IntelliJ IDEA. But when I generate the assembly jar with sbt-assembly and
>> > use spark-submit, I get the following error:
>> >
>> > 15/04/17 09:34:01 INFO ParquetOutputFormat: Writer version is: PARQUET_1_0
>> > 15/04/17 09:34:01 ERROR Executor: Exception in task 3.0 in stage 2.0 (TID 7)
>> > java.lang.IllegalArgumentException: Unsupported dataType: {"type":"struct","fields":[{"name":"imageAnnotation","type":{"type":"udt","class":"MyDenseVectorUDT","pyClass":null,"sqlType":{"type":"array","elementType":"double","containsNull":false}},"nullable":true,"metadata":{}}]},
>> > [1.1] failure: `TimestampType' expected but `{' found
>> >
>> > {"type":"struct","fields":[{"name":"imageAnnotation","type":{"type":"udt","class":"MyDenseVectorUDT","pyClass":null,"sqlType":{"type":"array","elementType":"double","containsNull":false}},"nullable":true,"metadata":{}}]}
>> > ^
>> >       at org.apache.spark.sql.types.DataType$CaseClassStringParser$.apply(dataTypes.scala:163)
>> >       at org.apache.spark.sql.types.DataType$.fromCaseClassString(dataTypes.scala:98)
>> >       at org.apache.spark.sql.parquet.ParquetTypesConverter$$anonfun$6.apply(ParquetTypes.scala:402)
>> >       at org.apache.spark.sql.parquet.ParquetTypesConverter$$anonfun$6.apply(ParquetTypes.scala:402)
>> >       at scala.util.Try.getOrElse(Try.scala:77)
>> >       at org.apache.spark.sql.parquet.ParquetTypesConverter$.convertFromString(ParquetTypes.scala:402)
>> >       at org.apache.spark.sql.parquet.RowWriteSupport.init(ParquetTableSupport.scala:145)
>> >       at parquet.hadoop.ParquetOutputFormat.getRecordWriter(ParquetOutputFormat.java:278)
>> >       at parquet.hadoop.ParquetOutputFormat.getRecordWriter(ParquetOutputFormat.java:252)
>> >       at org.apache.spark.sql.parquet.ParquetRelation2.org$apache$spark$sql$parquet$ParquetRelation2$$writeShard$1(newParquet.scala:694)
>> >       at org.apache.spark.sql.parquet.ParquetRelation2$$anonfun$insert$2.apply(newParquet.scala:716)
>> >       at org.apache.spark.sql.parquet.ParquetRelation2$$anonfun$insert$2.apply(newParquet.scala:716)
>> >       at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
>> >       at org.apache.spark.scheduler.Task.run(Task.scala:64)
>> >       at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
>> >       at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>> >       at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>> >       at java.lang.Thread.run(Thread.java:745)
>> >
>> >
>> > The issue seems to be related to the generation of the assembly jar, but I
>> > just can't figure out how to fix it. Any ideas would be helpful.
>> >
>> > Best regards,
>> >
>> >
>> > Jao
>>
>
>
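
One way to act on the classpath advice in this thread is to ask the JVM directly whether the UDT class can be loaded, and by which class loader. The following is only a diagnostic sketch: `ClasspathCheck` and `isLoadable` are hypothetical names that do not appear in the original messages, and the sketch makes no claim about the actual cause of the failure. It assumes the class is looked up by its fully qualified name (in the example above `MyDenseVectorUDT` is declared without a package).

```scala
import scala.util.Try

// Hypothetical helper, not part of Spark or of the original example.
object ClasspathCheck {

  /** Returns true if `className` can be loaded through `loader` (without initializing it). */
  def isLoadable(className: String, loader: ClassLoader): Boolean =
    Try(Class.forName(className, false, loader)).isSuccess

  def main(args: Array[String]): Unit = {
    // Class name to probe; defaults to the UDT from the example in this thread.
    val name = args.headOption.getOrElse("MyDenseVectorUDT")

    // The thread's context class loader and the loader that defined this object
    // can differ depending on how the application was launched.
    val contextLoader = Thread.currentThread().getContextClassLoader
    val definingLoader = getClass.getClassLoader

    println(s"$name via context class loader:  ${isLoadable(name, contextLoader)}")
    println(s"$name via defining class loader: ${isLoadable(name, definingLoader)}")
  }
}
```

Running this once from the IDE and once through spark-submit against the same assembly jar, then comparing the two results, may show whether the class is visible to only one of the loaders.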
