Hey Jaonary, I saw this line in the error message:
org.apache.spark.sql.types.DataType$CaseClassStringParser$.apply(dataTypes.scala:163)

CaseClassStringParser is only used in older versions of Spark to parse a schema from JSON, so I suspect that the cluster was running an old version of Spark when you used spark-submit to run your assembly jar.
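If you want to confirm which version actually runs the job, one quick check is to print the runtime version from inside the application. This is just a minimal sketch, assuming a 1.x Spark where SparkContext.version is available; sc is the SparkContext from your example below:

// Runs on the driver: reports the Spark version the application is linked
// against at runtime, which can differ from the version it was compiled with.
println(s"Runtime Spark version: ${sc.version}")

You can then compare that with what the cluster's launcher itself reports via spark-submit --version.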
Best,
Xiangrui

On Mon, May 11, 2015 at 7:40 AM, Jaonary Rabarisoa <jaon...@gmail.com> wrote:
> In this example, everything works except saving to the Parquet file.
>
> On Mon, May 11, 2015 at 4:39 PM, Jaonary Rabarisoa <jaon...@gmail.com>
> wrote:
>>
>> MyDenseVectorUDT does exist in the assembly jar, and in this example all
>> the code is in a single file to make sure everything is included.
>>
>> On Tue, Apr 21, 2015 at 1:17 AM, Xiangrui Meng <men...@gmail.com> wrote:
>>>
>>> You should check where MyDenseVectorUDT is defined and whether it was
>>> on the classpath (or in the assembly jar) at runtime. Make sure the
>>> full class name (with package name) is used. Btw, UDTs are not public
>>> yet, so please use them with caution. -Xiangrui
>>>
>>> On Fri, Apr 17, 2015 at 12:45 AM, Jaonary Rabarisoa <jaon...@gmail.com>
>>> wrote:
>>> > Dear all,
>>> >
>>> > Here is an example of code to reproduce the issue I mentioned in a
>>> > previous mail about saving a UserDefinedType into a Parquet file. The
>>> > problem here is that the code works when I run it inside IntelliJ IDEA
>>> > but fails when I create the assembly jar and run it with spark-submit.
>>> > I use the master version of Spark.
>>> >
>>> > import org.apache.spark.{SparkConf, SparkContext}
>>> > import org.apache.spark.sql.SQLContext
>>> > import org.apache.spark.sql.types._
>>> >
>>> > @SQLUserDefinedType(udt = classOf[MyDenseVectorUDT])
>>> > class MyDenseVector(val data: Array[Double]) extends Serializable {
>>> >   override def equals(other: Any): Boolean = other match {
>>> >     case v: MyDenseVector => java.util.Arrays.equals(this.data, v.data)
>>> >     case _ => false
>>> >   }
>>> > }
>>> >
>>> > class MyDenseVectorUDT extends UserDefinedType[MyDenseVector] {
>>> >   override def sqlType: DataType = ArrayType(DoubleType, containsNull = false)
>>> >
>>> >   override def serialize(obj: Any): Seq[Double] = obj match {
>>> >     case features: MyDenseVector => features.data.toSeq
>>> >   }
>>> >
>>> >   override def deserialize(datum: Any): MyDenseVector = datum match {
>>> >     case data: Seq[_] => new MyDenseVector(data.asInstanceOf[Seq[Double]].toArray)
>>> >   }
>>> >
>>> >   override def userClass: Class[MyDenseVector] = classOf[MyDenseVector]
>>> > }
>>> >
>>> > case class Toto(imageAnnotation: MyDenseVector)
>>> >
>>> > object TestUserDefinedType {
>>> >
>>> >   case class Params(input: String = null,
>>> >                     partitions: Int = 12,
>>> >                     outputDir: String = "images.parquet")
>>> >
>>> >   def main(args: Array[String]): Unit = {
>>> >
>>> >     val conf = new SparkConf()
>>> >       .setAppName("ImportImageFolder")
>>> >       .setMaster("local[4]")
>>> >
>>> >     val sc = new SparkContext(conf)
>>> >     val sqlContext = new SQLContext(sc)
>>> >
>>> >     import sqlContext.implicits._
>>> >
>>> >     val rawImages = sc.parallelize(
>>> >       (1 to 5).map(x => Toto(new MyDenseVector(Array[Double](x.toDouble))))).toDF
>>> >
>>> >     rawImages.printSchema()
>>> >     rawImages.show()
>>> >
>>> >     rawImages.save("toto.parquet") // This fails with the assembly jar
>>> >
>>> >     sc.stop()
>>> >   }
>>> > }
>>> >
>>> > My build.sbt is as follows:
>>> >
>>> > libraryDependencies ++= Seq(
>>> >   "org.apache.spark" %% "spark-core" % sparkVersion % "provided",
>>> >   "org.apache.spark" %% "spark-sql" % sparkVersion,
>>> >   "org.apache.spark" %% "spark-mllib" % sparkVersion
>>> > )
>>> >
>>> > assemblyMergeStrategy in assembly := {
>>> >   case PathList("javax", "servlet", xs @ _*) => MergeStrategy.first
>>> >   case PathList("org", "apache", xs @ _*) => MergeStrategy.first
>>> >   case PathList("org", "jboss", xs @ _*) => MergeStrategy.first
>>> >   // case PathList(ps @ _*) if ps.last endsWith ".html" => MergeStrategy.first
>>> >   // case "application.conf" => MergeStrategy.concat
>>> >   case m if m.startsWith("META-INF") => MergeStrategy.discard
>>> >   // case x =>
>>> >   //   val oldStrategy = (assemblyMergeStrategy in assembly).value
>>> >   //   oldStrategy(x)
>>> >   case _ => MergeStrategy.first
>>> > }
>>> >
>>> > As I said, this code works without problems when I execute it inside
>>> > IntelliJ IDEA. But when I generate the assembly jar with sbt-assembly
>>> > and run it with spark-submit, I get the following error:
>>> >
>>> > 15/04/17 09:34:01 INFO ParquetOutputFormat: Writer version is: PARQUET_1_0
>>> > 15/04/17 09:34:01 ERROR Executor: Exception in task 3.0 in stage 2.0 (TID 7)
>>> > java.lang.IllegalArgumentException: Unsupported dataType:
>>> > {"type":"struct","fields":[{"name":"imageAnnotation","type":{"type":"udt","class":"MyDenseVectorUDT","pyClass":null,"sqlType":{"type":"array","elementType":"double","containsNull":false}},"nullable":true,"metadata":{}}]},
>>> > [1.1] failure: `TimestampType' expected but `{' found
>>> >
>>> > {"type":"struct","fields":[{"name":"imageAnnotation","type":{"type":"udt","class":"MyDenseVectorUDT","pyClass":null,"sqlType":{"type":"array","elementType":"double","containsNull":false}},"nullable":true,"metadata":{}}]}
>>> > ^
>>> >   at org.apache.spark.sql.types.DataType$CaseClassStringParser$.apply(dataTypes.scala:163)
>>> >   at org.apache.spark.sql.types.DataType$.fromCaseClassString(dataTypes.scala:98)
>>> >   at org.apache.spark.sql.parquet.ParquetTypesConverter$$anonfun$6.apply(ParquetTypes.scala:402)
>>> >   at org.apache.spark.sql.parquet.ParquetTypesConverter$$anonfun$6.apply(ParquetTypes.scala:402)
>>> >   at scala.util.Try.getOrElse(Try.scala:77)
>>> >   at org.apache.spark.sql.parquet.ParquetTypesConverter$.convertFromString(ParquetTypes.scala:402)
>>> >   at org.apache.spark.sql.parquet.RowWriteSupport.init(ParquetTableSupport.scala:145)
>>> >   at parquet.hadoop.ParquetOutputFormat.getRecordWriter(ParquetOutputFormat.java:278)
>>> >   at parquet.hadoop.ParquetOutputFormat.getRecordWriter(ParquetOutputFormat.java:252)
>>> >   at org.apache.spark.sql.parquet.ParquetRelation2.org$apache$spark$sql$parquet$ParquetRelation2$$writeShard$1(newParquet.scala:694)
>>> >   at org.apache.spark.sql.parquet.ParquetRelation2$$anonfun$insert$2.apply(newParquet.scala:716)
>>> >   at org.apache.spark.sql.parquet.ParquetRelation2$$anonfun$insert$2.apply(newParquet.scala:716)
>>> >   at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
>>> >   at org.apache.spark.scheduler.Task.run(Task.scala:64)
>>> >   at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
>>> >   at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>>> >   at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>>> >   at java.lang.Thread.run(Thread.java:745)
>>> >
>>> > The issue seems to be related to the generation of the assembly jar,
>>> > but I just can't figure out how to fix it. Any ideas would be helpful.
>>> >
>>> > Best regards,
>>> >
>>> > Jao
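Two things in the quoted build.sbt and code may also be worth a look. First, only spark-core is marked "provided", so the assembly bundles its own copies of spark-sql and spark-mllib; if the cluster ships a different Spark version, the bundled classes and the cluster's can disagree, which would fit the schema-parser error above. A minimal change, just a sketch assuming the cluster provides all three modules at runtime:

libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-core"  % sparkVersion % "provided",
  "org.apache.spark" %% "spark-sql"   % sparkVersion % "provided",
  "org.apache.spark" %% "spark-mllib" % sparkVersion % "provided"
)

Second, the schema in the error shows "class":"MyDenseVectorUDT" with no package prefix, which suggests the classes live in the default package. Since the UDT is recorded and looked up by its full class name, declaring everything in a named package is worth trying. For example (com.example.udt is just a placeholder, not from the original code):

package com.example.udt // placeholder package name

import org.apache.spark.sql.types._

// Same MyDenseVector as quoted above; MyDenseVectorUDT, Toto, and
// TestUserDefinedType stay as they are, just inside this package, so the
// schema records com.example.udt.MyDenseVectorUDT instead of a bare name.
@SQLUserDefinedType(udt = classOf[MyDenseVectorUDT])
class MyDenseVector(val data: Array[Double]) extends Serializable {
  override def equals(other: Any): Boolean = other match {
    case v: MyDenseVector => java.util.Arrays.equals(this.data, v.data)
    case _ => false
  }
}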
---------------------------------------------------------------------
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org