Hi All, I'm unable to use the Kryo serializer in my Spark program. I'm loading a graph from an edgelist file using GraphLoader and performing a BFS using the Pregel API, but I get the error mentioned below when I run it. Can anybody tell me the right way to serialize a class in Spark, and which functions need to be implemented?
class MyRegistrator extends KryoRegistrator { override def registerClasses(kryo: Kryo) { kryo.register(classOf[GraphBFS]) kryo.register(classOf[Config]) kryo.register(classOf[Iterator[(Long, Double)]]) } } Class GraphBFS{ def vprog(id: VertexId, attr: Double, msg: Double): Double = math.min(attr,msg) def sendMessage(triplet: EdgeTriplet[Double, Int]) : Iterator[(VertexId, Double)] = { var iter:Iterator[(VertexId, Double)] = Iterator.empty val isSrcMarked = triplet.srcAttr != Double.PositiveInfinity val isDstMarked = triplet.dstAttr != Double.PositiveInfinity if(!(isSrcMarked && isDstMarked)){ if(isSrcMarked){ iter = Iterator((triplet.dstId,triplet.srcAttr+1)) }else{ iter = Iterator((triplet.srcId,triplet.dstAttr+1)) } } iter } def reduceMessage(a: Double, b: Double) = math.min(a,b) def main() { .......... val bfs = initialGraph.pregel(initialMessage, maxIterations, activeEdgeDirection)(vprog, sendMessage, reduceMessage) ..... } } 15/12/07 21:52:49 INFO BlockManager: Removing RDD 8 15/12/07 21:52:49 INFO BlockManager: Removing RDD 2 Exception in thread "main" org.apache.spark.SparkException: Task not serializable at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:315) at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:305) at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:132) at org.apache.spark.SparkContext.clean(SparkContext.scala:1893) at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1.apply(RDD.scala:683) at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1.apply(RDD.scala:682) at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:147) at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:108) at org.apache.spark.rdd.RDD.withScope(RDD.scala:286) at org.apache.spark.rdd.RDD.mapPartitions(RDD.scala:682) at org.apache.spark.graphx.impl.VertexRDDImpl.mapVertexPartitions(VertexRDDImpl.scala:96) at 
org.apache.spark.graphx.impl.GraphImpl.mapVertices(GraphImpl.scala:132) at org.apache.spark.graphx.Pregel$.apply(Pregel.scala:122) at org.apache.spark.graphx.GraphOps.pregel(GraphOps.scala:362) at GraphBFS.main(GraphBFS.scala:241) at run$.main(GraphBFS.scala:268) at run.main(GraphBFS.scala) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:497) at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:665) at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:170) at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:193) at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:112) at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala) Caused by: java.io.NotSerializableException: GraphBFS Serialization stack: - object not serializable (class: GraphBFS, value: GraphBFS@575c3e9b) - field (class: GraphBFS$$anonfun$17, name: $outer, type: class GraphBFS) - object (class GraphBFS$$anonfun$17, <function3>) - field (class: org.apache.spark.graphx.Pregel$$anonfun$1, name: vprog$1, type: interface scala.Function3) - object (class org.apache.spark.graphx.Pregel$$anonfun$1, <function2>) - field (class: org.apache.spark.graphx.impl.GraphImpl$$anonfun$5, name: f$1, type: interface scala.Function2) - object (class org.apache.spark.graphx.impl.GraphImpl$$anonfun$5, <function1>) - field (class: org.apache.spark.graphx.impl.VertexRDDImpl$$anonfun$1, name: f$1, type: interface scala.Function1) - object (class org.apache.spark.graphx.impl.VertexRDDImpl$$anonfun$1, <function1>) Thanks, Prasad ----- Thanks, Prasad -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Kryo-Serialization-in-Spark-tp25628.html Sent from the Apache Spark User List 
mailing list archive at Nabble.com. --------------------------------------------------------------------- To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org