You probably don't cause a shuffle (which requires serialization) unless there is a join or group by.
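A minimal sketch of the shuffle point, assuming a live SparkContext `sc` (the data is made up): map-only work keeps rows inside a single task, while a join forces a shuffle, and it is at that shuffle boundary that the configured serializer (Kryo here) has to serialize every row, including any UDT payload.

    import org.apache.spark.SparkContext._ // pair-RDD operations (Spark 1.2)

    val left  = sc.parallelize(Seq((1, "a"), (2, "b")))
    val right = sc.parallelize(Seq((1, "x"), (2, "y")))

    // No shuffle: values never leave the executor, so nothing is serialized.
    left.mapValues(_.toUpperCase).count()

    // Shuffle: rows are serialized as they move between the map and reduce
    // sides, which is where a class missing from the executor classpath fails.
    left.join(right).count()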
It's possible that we need to pass the Spark class loader to Kryo when creating a new instance (you can get it from Utils, I believe). We never ran into this problem since this API is not public yet. I'd start by looking in SparkSqlSerializer.

On Mar 18, 2015 1:13 AM, "Zia Ur Rehman Kayani" <zia.kay...@platalytics.com> wrote:

> Thanks for your reply. I've tried this as well, by passing the JAR file
> path to spark.executor.extraClassPath, but it doesn't help. Actually,
> I've figured out that the custom UDT works fine if I use only one RDD
> (table); the issue arises when we join two or more RDDs. According to
> this <https://datastax-oss.atlassian.net/browse/SPARKC-23>, it is a bug
> when we use a custom Row and a JOIN. But the proposed solution isn't
> working in my case.
>
> Any clue?
>
> On Tue, Mar 17, 2015 at 10:19 PM, Michael Armbrust <mich...@databricks.com> wrote:
>
>> I'll caution you that this is not a stable public API.
>>
>> That said, it seems that the issue is that you have not copied the jar
>> file containing your class to all of the executors. You should not need
>> to do any special configuration of serialization (you can't for SQL, as
>> we hard-code it for performance, since we generally know all the types
>> that are going to be shipped).
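A hedged aside on the jar-shipping point above (paths and app name are hypothetical): spark.executor.extraClassPath only prepends an entry to the executor classpath and assumes the jar already exists at that path on every worker machine, whereas SparkContext.addJar (or spark-submit --jars) actually distributes the application jar to the executors.

    import org.apache.spark.{SparkConf, SparkContext}

    val conf = new SparkConf().setAppName("udt-example")
    // Alternative: only works if /opt/jars/app.jar is pre-installed on
    // every worker node at exactly this path.
    // conf.set("spark.executor.extraClassPath", "/opt/jars/app.jar")

    val sc = new SparkContext(conf)
    sc.addJar("/path/to/app.jar") // ships the jar to every executor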
>>
>> On Tue, Mar 17, 2015 at 5:17 AM, zia_kayani <zia.kay...@platalytics.com> wrote:
>>
>>> Hi,
>>> I want to introduce a custom type for SchemaRDD. I'm following this
>>> example:
>>> <https://github.com/apache/spark/blob/branch-1.2/sql/core/src/main/scala/org/apache/spark/sql/test/ExamplePointUDT.scala>
>>> But I'm having Kryo serialization issues; here is the stack trace:
>>>
>>> org.apache.spark.SparkException: Job aborted due to stage failure:
>>> Task 0 in stage 6.0 failed 1 times, most recent failure:
>>> Lost task 0.0 in stage 6.0 (TID 22, localhost):
>>> com.esotericsoftware.kryo.KryoException: Unable to find class: com.gis.io.GeometryWritable
>>> Serialization trace:
>>> value (org.apache.spark.sql.catalyst.expressions.MutableAny)
>>> values (org.apache.spark.sql.catalyst.expressions.SpecificMutableRow)
>>>     at com.esotericsoftware.kryo.util.DefaultClassResolver.readName(DefaultClassResolver.java:138)
>>>     at com.esotericsoftware.kryo.util.DefaultClassResolver.readClass(DefaultClassResolver.java:115)
>>>     at com.esotericsoftware.kryo.Kryo.readClass(Kryo.java:610)
>>>     at com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.read(FieldSerializer.java:599)
>>>     at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:221)
>>>     at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:732)
>>>     at com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.read(DefaultArraySerializers.java:338)
>>>     at com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.read(DefaultArraySerializers.java:293)
>>>     at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:651)
>>>     at com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.read(FieldSerializer.java:605)
>>>     at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:221)
>>>     at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:732)
>>>     at com.twitter.chill.Tuple2Serializer.read(TupleSerializers.scala:42)
>>>     at com.twitter.chill.Tuple2Serializer.read(TupleSerializers.scala:33)
>>>     at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:732)
>>>     at org.apache.spark.serializer.KryoDeserializationStream.readObject(KryoSerializer.scala:142)
>>>     at org.apache.spark.serializer.DeserializationStream$$anon$1.getNext(Serializer.scala:133)
>>>     at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:71)
>>>     at org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:32)
>>>     at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
>>>     at org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:32)
>>>     at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:39)
>>>     at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
>>>     at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
>>>     at org.apache.spark.sql.execution.joins.HashedRelation$.apply(HashedRelation.scala:80)
>>>     at org.apache.spark.sql.execution.joins.ShuffledHashJoin$$anonfun$execute$1.apply(ShuffledHashJoin.scala:46)
>>>     at org.apache.spark.sql.execution.joins.ShuffledHashJoin$$anonfun$execute$1.apply(ShuffledHashJoin.scala:45)
>>>     at org.apache.spark.rdd.ZippedPartitionsRDD2.compute(ZippedPartitionsRDD.scala:88)
>>>     at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:263)
>>>     at org.apache.spark.rdd.RDD.iterator(RDD.scala:230)
>>>     at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
>>>     at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:263)
>>>     at org.apache.spark.rdd.RDD.iterator(RDD.scala:230)
>>>     at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
>>>     at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:263)
>>>     at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:61)
>>>     at org.apache.spark.rdd.RDD.iterator(RDD.scala:228)
>>>     at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
>>>     at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:263)
>>>     at org.apache.spark.rdd.RDD.iterator(RDD.scala:230)
>>>     at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
>>>     at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:263)
>>>     at org.apache.spark.rdd.RDD.iterator(RDD.scala:230)
>>>     at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:68)
>>>     at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
>>>     at org.apache.spark.scheduler.Task.run(Task.scala:56)
>>>     at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:196)
>>>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>>>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>>>     at java.lang.Thread.run(Thread.java:745)
>>> Caused by: java.lang.ClassNotFoundException: com.gis.io.GeometryWritable
>>>     at java.net.URLClassLoader$1.run(URLClassLoader.java:366)
>>>     at java.net.URLClassLoader$1.run(URLClassLoader.java:355)
>>>     at java.security.AccessController.doPrivileged(Native Method)
>>>     at java.net.URLClassLoader.findClass(URLClassLoader.java:354)
>>>     at java.lang.ClassLoader.loadClass(ClassLoader.java:425)
>>>     at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:308)
>>>     at java.lang.ClassLoader.loadClass(ClassLoader.java:358)
>>>     at java.lang.Class.forName0(Native Method)
>>>     at java.lang.Class.forName(Class.java:270)
>>>     at com.esotericsoftware.kryo.util.DefaultClassResolver.readName(DefaultClassResolver.java:136)
>>>     ... 52 more
>>>
>>> Here com.gis.io.GeometryWritable is my custom class for which I'm
>>> creating the UDT; it is present in the application jar. I've also
>>> tried, as discussed here
>>> <https://groups.google.com/forum/#!topic/aureliusgraphs/iqDbeLJsGsg>,
>>> changing the Spark default serializer from Kryo to Java, but that
>>> doesn't help either. Any suggestions? Am I missing something?
>>> Following are my classes:
>>>
>>> @SQLUserDefinedType(udt = classOf[GeometryUDT])
>>> class GeometryWritable(var _geometry: Geometry) extends Writable with Serializable {
>>>
>>>   def geometry = _geometry
>>>   def geometry_=(geometry: Geometry) = _geometry = geometry
>>>
>>>   def this() = this(null)
>>>
>>>   override def write(dataOutput: DataOutput): Unit = {}
>>>   override def readFields(dataInput: DataInput): Unit = {}
>>>
>>>   @throws(classOf[IOException])
>>>   private def writeObject(stream: ObjectOutputStream): Unit = {}
>>>   @throws(classOf[IOException])
>>>   private def readObject(stream: ObjectInputStream): Unit = {}
>>> }
>>>
>>> class GeometryUDT extends UserDefinedType[GeometryWritable] with Serializable {
>>>
>>>   override def sqlType: DataType = ArrayType(ByteType)
>>>   override def serialize(obj: Any): Array[Byte] = {}
>>>   override def deserialize(datum: Any): GeometryWritable = {}
>>>   override def userClass: Class[GeometryWritable] = classOf[GeometryWritable]
>>> }
>>>
>>> This is how I'm using it:
>>>
>>> val rdd = sc.textFile(args(0)).map(
>>>   line => {
>>>     val point = new Point
>>>     point.setY(line.split(" ")(0).toDouble)
>>>     point.setX(line.split(" ")(1).toDouble)
>>>     Row.fromSeq(Seq(new GeometryWritable(point)))
>>>   })
>>> val schema = StructType(Seq(StructField("Geometry", new GeometryUDT, true)))
>>>
>>> val schemaRDD = sqlContext.applySchema(rdd, schema).persist(StorageLevel.MEMORY_AND_DISK)
>>>
>>> --
>>> View this message in context:
>>> http://apache-spark-user-list.1001560.n3.nabble.com/Spark-SQL-UDT-Kryo-serialization-Unable-to-find-class-tp22101.html
>>> Sent from the Apache Spark User List mailing list archive at Nabble.com.
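Following up on the suggestion at the top of the thread, here is a rough sketch of what such a fix inside Spark's SparkSqlSerializer might look like; it is an illustration, not a tested patch. Note that org.apache.spark.util.Utils is private[spark], so this only compiles inside Spark's own source tree (Spark 1.2-era names, Kryo 2.x API).

    import com.esotericsoftware.kryo.Kryo
    import org.apache.spark.util.Utils

    def newKryo(): Kryo = {
      val kryo = new Kryo()
      // Resolve classes through the thread's context class loader, which can
      // see jars shipped via sc.addJar, falling back to Spark's own loader.
      kryo.setClassLoader(Utils.getContextOrSparkClassLoader)
      kryo
    }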