Hi, I have successfully implemented the Breadth First Search algorithm using the Pregel operator in GraphX as follows:

import org.apache.spark.graphx._

val graph = GraphLoader.edgeListFile(sc, "graphx/data/test_graph.txt")
val root: VertexId = 1
// Distance 0 for the root, infinity for every other vertex
val initialGraph = graph.mapVertices((id, _) => if (id == root) 0.0 else Double.PositiveInfinity)

val bfs = initialGraph.pregel(Double.PositiveInfinity, 20)(
  // Vertex program: keep the smallest distance seen so far
  (id, attr, msg) => math.min(attr, msg),
  // Send message: every reached vertex offers its distance + 1 to its neighbour
  triplet => {
    if (triplet.srcAttr != Double.PositiveInfinity) {
      Iterator((triplet.dstId, triplet.srcAttr + 1))
    } else {
      Iterator.empty
    }
  },
  // Merge messages: keep the minimum
  (a, b) => math.min(a, b)
)

println(bfs.vertices.collect.mkString("\n"))

where test_graph.txt is:

1 2
2 1
2 3
2 4
3 2
3 3
4 2
4 3

and the output when I run my algorithm is:

(4,2.0)
(2,1.0)
(3,2.0)
(1,0.0)

which is the correct result.

I was hoping someone could improve upon my implementation by suggesting a way in which I do not need the maximum iteration number (20). If I remove it, my job runs for some time until eventually I receive the error:

7)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1896)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1993)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1918)
    at ....................carries on and on.................................
    at java.io.ObjectInputStream.readObject(ObjectInputStream.java:371)
    at java.io.ObjectInputStream.readObject(ObjectInputStream.java:371)
    at scala.collection.immutable.$colon$colon.readObject(List.scala:362)
    at sun.reflect.GeneratedMethodAccessor4.invoke(Unknown Source)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:483)
14/04/18 23:11:14 ERROR TaskSetManager: Task 81094.0:0 failed 1 times; aborting job
14/04/18 23:11:14 INFO DAGScheduler: Failed to run reduce at VertexRDD.scala:91
14/04/18 23:11:14 INFO TaskSchedulerImpl: Remove TaskSet 81094.0 from pool
org.apache.spark.SparkException: Job aborted: Task 81094.0:0 failed 1 times (most recent failure: Exception failure: java.lang.StackOverflowError)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$abortStage$1.apply(DAGScheduler.scala:1028)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$abortStage$1.apply(DAGScheduler.scala:1026)
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
    at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$abortStage(DAGScheduler.scala:1026)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$processEvent$10.apply(DAGScheduler.scala:619)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$processEvent$10.apply(DAGScheduler.scala:619)
    at scala.Option.foreach(Option.scala:236)
    at org.apache.spark.scheduler.DAGScheduler.processEvent(DAGScheduler.scala:619)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$start$1$$anon$2$$anonfun$receive$1.applyOrElse(DAGScheduler.scala:207)
    at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498)
    at akka.actor.ActorCell.invoke(ActorCell.scala:456)
    at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237)
    at akka.dispatch.Mailbox.run(Mailbox.scala:219)
    at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
    at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
    at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
    at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
    at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
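
One possible direction, offered as a sketch under assumptions rather than something verified on this Spark build: the Pregel operator stops by itself once a superstep produces no messages, so the send function could be guarded to emit a message only when it would actually lower the destination's distance. In the version above, every reached vertex keeps sending on every superstep, which would explain why the job never finishes without the cap. The object name BfsSketch, the helper bfs, the local[2] master and the in-code edge list (copied from test_graph.txt) are illustrative choices and not part of the original job.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.graphx._

// Hypothetical wrapper object, used only to keep the sketch self-contained.
object BfsSketch {

  // Hop counts from `root`; unreachable vertices stay at PositiveInfinity.
  def bfs(graph: Graph[Int, Int], root: VertexId): Graph[Double, Int] = {
    val initialGraph = graph.mapVertices((id, _) =>
      if (id == root) 0.0 else Double.PositiveInfinity)

    // maxIterations is omitted, so it falls back to its default (Int.MaxValue).
    initialGraph.pregel(Double.PositiveInfinity)(
      // Vertex program: keep the smallest distance seen so far.
      (id, attr, msg) => math.min(attr, msg),
      // Send a message only if it improves the destination's distance.
      // Infinity + 1 < x is always false, so unreached vertices stay silent.
      triplet =>
        if (triplet.srcAttr + 1 < triplet.dstAttr)
          Iterator((triplet.dstId, triplet.srcAttr + 1))
        else
          Iterator.empty,
      // Merge messages: keep the minimum.
      (a, b) => math.min(a, b)
    )
  }

  def main(args: Array[String]): Unit = {
    // Local context is an assumption for the sketch; in spark-shell `sc` already exists.
    val sc = new SparkContext(
      new SparkConf().setMaster("local[2]").setAppName("bfs-sketch"))
    try {
      // Same edges as test_graph.txt, inlined to avoid depending on the file.
      val edges = sc.parallelize(Seq(
        (1L, 2L), (2L, 1L), (2L, 3L), (2L, 4L),
        (3L, 2L), (3L, 3L), (4L, 2L), (4L, 3L)))
      val graph = Graph.fromEdgeTuples(edges, 1)
      bfs(graph, 1L).vertices.collect.sortBy(_._1).foreach(println)
    } finally {
      sc.stop()
    }
  }
}
```

With this guard, each vertex's distance can only decrease a finite number of times, so the messages should die out after a few supersteps on the test graph and the distances should match the ones listed above.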