Hey Jaonary,

I saw this line in the error message:

org.apache.spark.sql.types.DataType$CaseClassStringParser$.apply(dataTypes.scala:163)

CaseClassStringParser is only used in older versions of Spark to parse
schema from JSON. So I suspect that the cluster was running on a old
version of Spark when you use spark-submit to run your assembly jar.

Best,
Xiangrui

On Mon, May 11, 2015 at 7:40 AM, Jaonary Rabarisoa <jaon...@gmail.com> wrote:
> In this example, every thing work expect save to parquet file.
>
> On Mon, May 11, 2015 at 4:39 PM, Jaonary Rabarisoa <jaon...@gmail.com>
> wrote:
>>
>> MyDenseVectorUDT do exist in the assembly jar and in this example all the
>> code is in a single file to make sure every thing is included.
>>
>> On Tue, Apr 21, 2015 at 1:17 AM, Xiangrui Meng <men...@gmail.com> wrote:
>>>
>>> You should check where MyDenseVectorUDT is defined and whether it was
>>> on the classpath (or in the assembly jar) at runtime. Make sure the
>>> full class name (with package name) is used. Btw, UDTs are not public
>>> yet, so please use it with caution. -Xiangrui
>>>
>>> On Fri, Apr 17, 2015 at 12:45 AM, Jaonary Rabarisoa <jaon...@gmail.com>
>>> wrote:
>>> > Dear all,
>>> >
>>> > Here is an example of code to reproduce the issue I mentioned in a
>>> > previous
>>> > mail about saving an UserDefinedType into a parquet file. The problem
>>> > here
>>> > is that the code works when I run it inside intellij idea but fails
>>> > when I
>>> > create the assembly jar and run it with spark-submit. I use the master
>>> > version of  Spark.
>>> >
>>> > @SQLUserDefinedType(udt = classOf[MyDenseVectorUDT])
>>> > class MyDenseVector(val data: Array[Double]) extends Serializable {
>>> >   override def equals(other: Any): Boolean = other match {
>>> >     case v: MyDenseVector =>
>>> >       java.util.Arrays.equals(this.data, v.data)
>>> >     case _ => false
>>> >   }
>>> > }
>>> >
>>> > class MyDenseVectorUDT extends UserDefinedType[MyDenseVector] {
>>> >   override def sqlType: DataType = ArrayType(DoubleType, containsNull =
>>> > false)
>>> >   override def serialize(obj: Any): Seq[Double] = {
>>> >     obj match {
>>> >       case features: MyDenseVector =>
>>> >         features.data.toSeq
>>> >     }
>>> >   }
>>> >
>>> >   override def deserialize(datum: Any): MyDenseVector = {
>>> >     datum match {
>>> >       case data: Seq[_] =>
>>> >         new MyDenseVector(data.asInstanceOf[Seq[Double]].toArray)
>>> >     }
>>> >   }
>>> >
>>> >   override def userClass: Class[MyDenseVector] = classOf[MyDenseVector]
>>> >
>>> > }
>>> >
>>> > case class Toto(imageAnnotation: MyDenseVector)
>>> >
>>> > object TestUserDefinedType {
>>> >
>>> >   case class Params(input: String = null,
>>> >                    partitions: Int = 12,
>>> >                     outputDir: String = "images.parquet")
>>> >
>>> >   def main(args: Array[String]): Unit = {
>>> >
>>> >     val conf = new
>>> > SparkConf().setAppName("ImportImageFolder").setMaster("local[4]")
>>> >
>>> >     val sc = new SparkContext(conf)
>>> >     val sqlContext = new SQLContext(sc)
>>> >
>>> >     import sqlContext.implicits._
>>> >
>>> >     val rawImages = sc.parallelize((1 to 5).map(x => Toto(new
>>> > MyDenseVector(Array[Double](x.toDouble))))).toDF
>>> >
>>> >     rawImages.printSchema()
>>> >
>>> >     rawImages.show()
>>> >
>>> >     rawImages.save("toto.parquet") // This fails with assembly jar
>>> >     sc.stop()
>>> >
>>> >   }
>>> > }
>>> >
>>> >
>>> > My build.sbt is as follow :
>>> >
>>> > libraryDependencies ++= Seq(
>>> >   "org.apache.spark" %% "spark-core" % sparkVersion % "provided",
>>> >   "org.apache.spark" %% "spark-sql" % sparkVersion,
>>> >   "org.apache.spark" %% "spark-mllib" % sparkVersion
>>> > )
>>> >
>>> > assemblyMergeStrategy in assembly := {
>>> >   case PathList("javax", "servlet", xs @ _*) => MergeStrategy.first
>>> >   case PathList("org", "apache", xs @ _*) => MergeStrategy.first
>>> >   case PathList("org", "jboss", xs @ _*) => MergeStrategy.first
>>> > //  case PathList(ps @ _*) if ps.last endsWith ".html" =>
>>> > MergeStrategy.first
>>> > //  case "application.conf"                            =>
>>> > MergeStrategy.concat
>>> >   case m if m.startsWith("META-INF") => MergeStrategy.discard
>>> >   //case x =>
>>> >   //  val oldStrategy = (assemblyMergeStrategy in assembly).value
>>> >   //  oldStrategy(x)
>>> >   case _ => MergeStrategy.first
>>> > }
>>> >
>>> >
>>> > As I said, this code works without problem when I execute it inside
>>> > intellij
>>> > idea. But when generate the assembly jar with sbt-assembly and
>>> >
>>> > use spark-submit I got the following error :
>>> >
>>> > 15/04/17 09:34:01 INFO ParquetOutputFormat: Writer version is:
>>> > PARQUET_1_0
>>> > 15/04/17 09:34:01 ERROR Executor: Exception in task 3.0 in stage 2.0
>>> > (TID 7)
>>> > java.lang.IllegalArgumentException: Unsupported dataType:
>>> >
>>> > {"type":"struct","fields":[{"name":"imageAnnotation","type":{"type":"udt","class":"MyDenseVectorUDT","pyClass":null,"sqlType":{"type":"array","elementType":"double","containsNull":false}},"nullable":true,"metadata":{}}]},
>>> > [1.1] failure: `TimestampType' expected but `{' found
>>> >
>>> >
>>> > {"type":"struct","fields":[{"name":"imageAnnotation","type":{"type":"udt","class":"MyDenseVectorUDT","pyClass":null,"sqlType":{"type":"array","elementType":"double","containsNull":false}},"nullable":true,"metadata":{}}]}
>>> > ^
>>> >       at
>>> >
>>> > org.apache.spark.sql.types.DataType$CaseClassStringParser$.apply(dataTypes.scala:163)
>>> >       at
>>> >
>>> > org.apache.spark.sql.types.DataType$.fromCaseClassString(dataTypes.scala:98)
>>> >       at
>>> >
>>> > org.apache.spark.sql.parquet.ParquetTypesConverter$$anonfun$6.apply(ParquetTypes.scala:402)
>>> >       at
>>> >
>>> > org.apache.spark.sql.parquet.ParquetTypesConverter$$anonfun$6.apply(ParquetTypes.scala:402)
>>> >       at scala.util.Try.getOrElse(Try.scala:77)
>>> >       at
>>> >
>>> > org.apache.spark.sql.parquet.ParquetTypesConverter$.convertFromString(ParquetTypes.scala:402)
>>> >       at
>>> >
>>> > org.apache.spark.sql.parquet.RowWriteSupport.init(ParquetTableSupport.scala:145)
>>> >       at
>>> >
>>> > parquet.hadoop.ParquetOutputFormat.getRecordWriter(ParquetOutputFormat.java:278)
>>> >       at
>>> >
>>> > parquet.hadoop.ParquetOutputFormat.getRecordWriter(ParquetOutputFormat.java:252)
>>> >       at
>>> >
>>> > org.apache.spark.sql.parquet.ParquetRelation2.org$apache$spark$sql$parquet$ParquetRelation2$$writeShard$1(newParquet.scala:694)
>>> >       at
>>> >
>>> > org.apache.spark.sql.parquet.ParquetRelation2$$anonfun$insert$2.apply(newParquet.scala:716)
>>> >       at
>>> >
>>> > org.apache.spark.sql.parquet.ParquetRelation2$$anonfun$insert$2.apply(newParquet.scala:716)
>>> >       at
>>> > org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
>>> >       at org.apache.spark.scheduler.Task.run(Task.scala:64)
>>> >       at
>>> > org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
>>> >       at
>>> >
>>> > java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>>> >       at
>>> >
>>> > java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>>> >       at java.lang.Thread.run(Thread.java:745)
>>> >
>>> >
>>> > The issue seems to be related to the generation of the assembly jar but
>>> > I
>>> > just can't figure out how to fix it. Any ideas will be helpfull.
>>> >
>>> > Best regards,
>>> >
>>> >
>>> > Jao
>>
>>
>

---------------------------------------------------------------------
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org

Reply via email to