I'll caution you that this is not a stable public API.

That said, it seems that the issue is that you have not copied the jar file
containing your class to all of the executors.  You should not need to do
any special configuration of serialization (you can't for SQL, as we hard
code it for performance, since we generally know all the types that are
going to be shipped)

On Tue, Mar 17, 2015 at 5:17 AM, zia_kayani <zia.kay...@platalytics.com>
wrote:

> Hi,
> I want to introduce custom type for SchemaRDD, I'm following  this
> <
> https://github.com/apache/spark/blob/branch-1.2/sql/core/src/main/scala/org/apache/spark/sql/test/ExamplePointUDT.scala
> >
> example. But I'm having Kryo Serialization issues, here is stack trace:
>
> org.apache.spark.SparkException: Job aborted due to stage failure: Task 0
> in
> stage 6.0 failed 1 times, most recent failure:
>     Lost task 0.0 in stage 6.0 (TID 22, localhost):
>     *com.esotericsoftware.kryo.KryoException: Unable to find class:
> com.gis.io.GeometryWritable*
>     Serialization trace:
>     value (org.apache.spark.sql.catalyst.expressions.MutableAny)
>     values (org.apache.spark.sql.catalyst.expressions.SpecificMutableRow)
>        at
>
> com.esotericsoftware.kryo.util.DefaultClassResolver.readName(DefaultClassResolver.java:138)
>        at
>
> com.esotericsoftware.kryo.util.DefaultClassResolver.readClass(DefaultClassResolver.java:115)
>        at com.esotericsoftware.kryo.Kryo.readClass(Kryo.java:610)
>        at
>
> com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.read(FieldSerializer.java:599)
>        at
>
> com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:221)
>        at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:732)
>        at
>
> com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.read(DefaultArraySerializers.java:338)
>        at
>
> com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.read(DefaultArraySerializers.java:293)
>        at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:651)
>        at
>
> com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.read(FieldSerializer.java:605)
>        at
>
> com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:221)
>        at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:732)
>        at
> com.twitter.chill.Tuple2Serializer.read(TupleSerializers.scala:42)
>        at
> com.twitter.chill.Tuple2Serializer.read(TupleSerializers.scala:33)
>        at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:732)
>        at
>
> org.apache.spark.serializer.KryoDeserializationStream.readObject(KryoSerializer.scala:142)
>        at
>
> org.apache.spark.serializer.DeserializationStream$$anon$1.getNext(Serializer.scala:133)
>        at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:71)
>        at
>
> org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:32)
>        at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
>        at
>
> org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:32)
>        at
>
> org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:39)
>        at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
>        at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
>        at
>
> org.apache.spark.sql.execution.joins.HashedRelation$.apply(HashedRelation.scala:80)
>        at
>
> org.apache.spark.sql.execution.joins.ShuffledHashJoin$$anonfun$execute$1.apply(ShuffledHashJoin.scala:46)
>        at
>
> org.apache.spark.sql.execution.joins.ShuffledHashJoin$$anonfun$execute$1.apply(ShuffledHashJoin.scala:45)
>        at
>
> org.apache.spark.rdd.ZippedPartitionsRDD2.compute(ZippedPartitionsRDD.scala:88)
>        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:263)
>        at org.apache.spark.rdd.RDD.iterator(RDD.scala:230)
>        at
> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
>        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:263)
>        at org.apache.spark.rdd.RDD.iterator(RDD.scala:230)
>        at
> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
>        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:263)
>        at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:61)
>        at org.apache.spark.rdd.RDD.iterator(RDD.scala:228)
>        at
> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
>        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:263)
>        at org.apache.spark.rdd.RDD.iterator(RDD.scala:230)
>        at
> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
>        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:263)
>        at org.apache.spark.rdd.RDD.iterator(RDD.scala:230)
>        at
> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
>        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:263)
>        at org.apache.spark.rdd.RDD.iterator(RDD.scala:230)
>        at
> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:68)
>        at
> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
>        at org.apache.spark.scheduler.Task.run(Task.scala:56)
>        at
> org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:196)
>        at
>
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>        at
>
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>        at java.lang.Thread.run(Thread.java:745)
>     *Caused by: java.lang.ClassNotFoundException:
> com.gis.io.GeometryWritable*
>        at java.net.URLClassLoader$1.run(URLClassLoader.java:366)
>        at java.net.URLClassLoader$1.run(URLClassLoader.java:355)
>        at java.security.AccessController.doPrivileged(Native Method)
>        at java.net.URLClassLoader.findClass(URLClassLoader.java:354)
>        at java.lang.ClassLoader.loadClass(ClassLoader.java:425)
>        at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:308)
>        at java.lang.ClassLoader.loadClass(ClassLoader.java:358)
>        at java.lang.Class.forName0(Native Method)
>        at java.lang.Class.forName(Class.java:270)
>        at
>
> com.esotericsoftware.kryo.util.DefaultClassResolver.readName(DefaultClassResolver.java:136)
>        ... 52 more
>
> Where /com.gis.io.GeometryWritable/ is my Custom class for which I'm
> creating UDT, which is present in the APP jar. I've tried as disused  here
> <https://groups.google.com/forum/#!topic/aureliusgraphs/iqDbeLJsGsg>
>  also
> by changing spark default serializer from kryo to Java. But it doesn't help
> me out. Any Suggestions ?? If I'm missing something? Following are my
> classes:
>
> @SQLUserDefinedType(udt = classOf[GeometryUDT])
>     class GeometryWritable(var _geometry: Geometry) extends Writable with
> Serializable {
>
>        def geometry = _geometry
>        def geometry_=(geometry: Geometry) = _geometry = geometry
>
>        def this() = this(null)
>
>        override def write(dataOutput: DataOutput) : Unit = {}
>        override def readFields(dataInput: DataInput) : Unit = {}
>        @throws(classOf[IOException])
>        private def writeObject(stream: ObjectOutputStream): Unit = {}
>        @throws(classOf[IOException])
>        private def readObject(stream: ObjectInputStream): Unit = {}
>     }
>
>     class GeometryUDT extends UserDefinedType[GeometryWritable] with
> Serializable {
>
>       override def sqlType: DataType = ArrayType(ByteType)
>       override def serialize(obj: Any): Array[Byte] = {}
>       override def deserialize(datum: Any): GeometryWritable = {}
>       override def userClass: Class[GeometryWritable] =
> classOf[GeometryWritable]
>
>    }
>
> This is how I'm using it.
>
>      val rdd = sc.textFile(args(0)).map(
>        line => {
>         val point = new Point
>         point.setY(line.split(" ")(0).toDouble)
>         point.setX(line.split(" ")(1).toDouble)
>        Row.fromSeq(Seq(new GeometryWritable(point)))
>      })
>      val schema = StructType(Seq(StructField("Geometry",new GeometryUDT,
> true)))
>
>      val schemaRDD = sqlContext.applySchema(rdd,
> schema).persist(StorageLevel.MEMORY_AND_DISK)
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/Spark-SQL-UDT-Kryo-serialization-Unable-to-find-class-tp22101.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> ---------------------------------------------------------------------
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>

Reply via email to