You probably don't cause a shuffle (which requires serialization) unless
there is a join or group by.

It's possible that we need to pass the Spark class loader to Kryo when
creating a new instance (you can get it from Utils, I believe).  We never
ran into this problem since this API is not public yet.  I'd start by
looking in SparkSqlSerializer.
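
To illustrate the class loader point, here is a minimal, standalone sketch.
It is not Spark's or Kryo's actual code. The names ClassLoaderSketch,
contextOrDefaultLoader and resolve are hypothetical and exist only for this
example. It shows that whether a class name resolves depends on which loader
is consulted, which is why the loader handed to Kryo matters. Kryo exposes a
setClassLoader method, which is presumably where such a loader would be
passed in; that wiring is an assumption and is not shown here.

```scala
import scala.util.Try

object ClassLoaderSketch {
  // Prefer the thread's context class loader; fall back to the loader that
  // loaded this object. (Assumption: on an executor, the context loader is
  // the one that can see user-supplied jars.)
  def contextOrDefaultLoader: ClassLoader =
    Option(Thread.currentThread().getContextClassLoader)
      .getOrElse(getClass.getClassLoader)

  // Resolve a class by name against an explicit loader without initializing
  // it. A ClassNotFoundException is captured as a Failure.
  def resolve(name: String, loader: ClassLoader): Try[Class[_]] =
    Try(Class.forName(name, false, loader))

  def main(args: Array[String]): Unit = {
    val loader = contextOrDefaultLoader
    // A JDK class is visible to any loader.
    println(resolve("java.lang.String", loader))
    // A user class resolves only if this loader can see the jar containing it.
    println(resolve("com.gis.io.GeometryWritable", loader))
  }
}
```

If the second lookup fails with the loader in use but succeeds with a loader
that includes the application jar, that points to the loader rather than the
serializer configuration.
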
On Mar 18, 2015 1:13 AM, "Zia Ur Rehman Kayani" <zia.kay...@platalytics.com>
wrote:

> Thanks for your reply. I've tried this as well, by passing the JAR file
> path to *spark.executor.extraClassPath*, but it doesn't help. I've
> actually figured out that the custom UDT works fine if I use only one
> RDD (table); the issue arises when we join two or more RDDs. According to
> this <https://datastax-oss.atlassian.net/browse/SPARKC-23>, it is a bug
> when we use a custom ROW and a JOIN. But the solution proposed there isn't
> working in my case.
>
> Any clue?
>
>
> On Tue, Mar 17, 2015 at 10:19 PM, Michael Armbrust <mich...@databricks.com
> > wrote:
>
>> I'll caution you that this is not a stable public API.
>>
>> That said, it seems that the issue is that you have not copied the jar
>> file containing your class to all of the executors.  You should not need to
>> do any special configuration of serialization (you can't for SQL, as we
>> hard code it for performance, since we generally know all the types that
>> are going to be shipped).
>>
>> On Tue, Mar 17, 2015 at 5:17 AM, zia_kayani <zia.kay...@platalytics.com>
>> wrote:
>>
>>> Hi,
>>> I want to introduce a custom type for SchemaRDD. I'm following this
>>> example:
>>> <https://github.com/apache/spark/blob/branch-1.2/sql/core/src/main/scala/org/apache/spark/sql/test/ExamplePointUDT.scala>
>>> But I'm having Kryo serialization issues; here is the stack trace:
>>>
>>> org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 6.0 failed 1 times, most recent failure:
>>>     Lost task 0.0 in stage 6.0 (TID 22, localhost):
>>>     *com.esotericsoftware.kryo.KryoException: Unable to find class: com.gis.io.GeometryWritable*
>>>     Serialization trace:
>>>     value (org.apache.spark.sql.catalyst.expressions.MutableAny)
>>>     values (org.apache.spark.sql.catalyst.expressions.SpecificMutableRow)
>>>        at com.esotericsoftware.kryo.util.DefaultClassResolver.readName(DefaultClassResolver.java:138)
>>>        at com.esotericsoftware.kryo.util.DefaultClassResolver.readClass(DefaultClassResolver.java:115)
>>>        at com.esotericsoftware.kryo.Kryo.readClass(Kryo.java:610)
>>>        at com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.read(FieldSerializer.java:599)
>>>        at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:221)
>>>        at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:732)
>>>        at com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.read(DefaultArraySerializers.java:338)
>>>        at com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.read(DefaultArraySerializers.java:293)
>>>        at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:651)
>>>        at com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.read(FieldSerializer.java:605)
>>>        at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:221)
>>>        at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:732)
>>>        at com.twitter.chill.Tuple2Serializer.read(TupleSerializers.scala:42)
>>>        at com.twitter.chill.Tuple2Serializer.read(TupleSerializers.scala:33)
>>>        at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:732)
>>>        at org.apache.spark.serializer.KryoDeserializationStream.readObject(KryoSerializer.scala:142)
>>>        at org.apache.spark.serializer.DeserializationStream$$anon$1.getNext(Serializer.scala:133)
>>>        at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:71)
>>>        at org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:32)
>>>        at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
>>>        at org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:32)
>>>        at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:39)
>>>        at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
>>>        at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
>>>        at org.apache.spark.sql.execution.joins.HashedRelation$.apply(HashedRelation.scala:80)
>>>        at org.apache.spark.sql.execution.joins.ShuffledHashJoin$$anonfun$execute$1.apply(ShuffledHashJoin.scala:46)
>>>        at org.apache.spark.sql.execution.joins.ShuffledHashJoin$$anonfun$execute$1.apply(ShuffledHashJoin.scala:45)
>>>        at org.apache.spark.rdd.ZippedPartitionsRDD2.compute(ZippedPartitionsRDD.scala:88)
>>>        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:263)
>>>        at org.apache.spark.rdd.RDD.iterator(RDD.scala:230)
>>>        at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
>>>        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:263)
>>>        at org.apache.spark.rdd.RDD.iterator(RDD.scala:230)
>>>        at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
>>>        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:263)
>>>        at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:61)
>>>        at org.apache.spark.rdd.RDD.iterator(RDD.scala:228)
>>>        at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
>>>        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:263)
>>>        at org.apache.spark.rdd.RDD.iterator(RDD.scala:230)
>>>        at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
>>>        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:263)
>>>        at org.apache.spark.rdd.RDD.iterator(RDD.scala:230)
>>>        at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
>>>        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:263)
>>>        at org.apache.spark.rdd.RDD.iterator(RDD.scala:230)
>>>        at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:68)
>>>        at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
>>>        at org.apache.spark.scheduler.Task.run(Task.scala:56)
>>>        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:196)
>>>        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>>>        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>>>        at java.lang.Thread.run(Thread.java:745)
>>>     *Caused by: java.lang.ClassNotFoundException: com.gis.io.GeometryWritable*
>>>        at java.net.URLClassLoader$1.run(URLClassLoader.java:366)
>>>        at java.net.URLClassLoader$1.run(URLClassLoader.java:355)
>>>        at java.security.AccessController.doPrivileged(Native Method)
>>>        at java.net.URLClassLoader.findClass(URLClassLoader.java:354)
>>>        at java.lang.ClassLoader.loadClass(ClassLoader.java:425)
>>>        at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:308)
>>>        at java.lang.ClassLoader.loadClass(ClassLoader.java:358)
>>>        at java.lang.Class.forName0(Native Method)
>>>        at java.lang.Class.forName(Class.java:270)
>>>        at com.esotericsoftware.kryo.util.DefaultClassResolver.readName(DefaultClassResolver.java:136)
>>>        ... 52 more
>>>
>>> Where com.gis.io.GeometryWritable is my custom class for which I'm
>>> creating the UDT; it is present in the app jar. I've also tried, as
>>> discussed here
>>> <https://groups.google.com/forum/#!topic/aureliusgraphs/iqDbeLJsGsg>,
>>> changing the Spark default serializer from Kryo to Java, but it doesn't
>>> help. Any suggestions? Am I missing something? Following are my
>>> classes:
>>>
>>>     @SQLUserDefinedType(udt = classOf[GeometryUDT])
>>>     class GeometryWritable(var _geometry: Geometry) extends Writable with Serializable {
>>>
>>>        def geometry = _geometry
>>>        def geometry_=(geometry: Geometry) = _geometry = geometry
>>>
>>>        def this() = this(null)
>>>
>>>        override def write(dataOutput: DataOutput) : Unit = {}
>>>        override def readFields(dataInput: DataInput) : Unit = {}
>>>        @throws(classOf[IOException])
>>>        private def writeObject(stream: ObjectOutputStream): Unit = {}
>>>        @throws(classOf[IOException])
>>>        private def readObject(stream: ObjectInputStream): Unit = {}
>>>     }
>>>
>>>     class GeometryUDT extends UserDefinedType[GeometryWritable] with Serializable {
>>>
>>>       override def sqlType: DataType = ArrayType(ByteType)
>>>       override def serialize(obj: Any): Array[Byte] = {}
>>>       override def deserialize(datum: Any): GeometryWritable = {}
>>>       override def userClass: Class[GeometryWritable] = classOf[GeometryWritable]
>>>
>>>     }
>>>
>>> This is how I'm using it.
>>>
>>>      val rdd = sc.textFile(args(0)).map(
>>>        line => {
>>>          val point = new Point
>>>          point.setY(line.split(" ")(0).toDouble)
>>>          point.setX(line.split(" ")(1).toDouble)
>>>          Row.fromSeq(Seq(new GeometryWritable(point)))
>>>        })
>>>      val schema = StructType(Seq(StructField("Geometry", new GeometryUDT, true)))
>>>
>>>      val schemaRDD = sqlContext.applySchema(rdd, schema).persist(StorageLevel.MEMORY_AND_DISK)
>>>
>>>
>>>
>>> --
>>> View this message in context:
>>> http://apache-spark-user-list.1001560.n3.nabble.com/Spark-SQL-UDT-Kryo-serialization-Unable-to-find-class-tp22101.html
>>> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>>>
>>
>
