Ahh nvm I found the solution :)

triplet.srcAttr != Double.PositiveInfinity && triplet.dstAttr ==
Double.PositiveInfinity

as my new if condition.

---------- Forwarded message ----------
From: Ghufran Malik <gooksma...@gmail.com>
Date: 18 April 2014 23:15
Subject: BFS implemented
To: user@spark.apache.org


Hi I have sucessfully implemented the Breadth First Search algorithm using
the Pregel operator in graphX as follows:

val graph = GraphLoader.edgeListFile(sc, "graphx/data/test_graph.txt")

val root: VertexId = 1
val initialGraph = graph.mapVertices((id, _) => if (id == root) 0.0 else
Double.PositiveInfinity)


val bfs = initialGraph.pregel(Double.PositiveInfinity, 20)( (id, attr, msg)
=> math.min(attr, msg), triplet => { if (triplet.srcAttr !=
Double.PositiveInfinity) { Iterator((triplet.dstId, triplet.srcAttr+1)) }
else { Iterator.empty } }, (a,b) => math.min(a,b) )

println(bfs.vertices.collect.mkString("\n"))

where the test_graph.txt is:
1 2
2 1
2 3
2 4
3 2
3 3
4 2
4 3

and the result outputted after I run my algorithm is:
(4,2.0)
(2,1.0)
(3,2.0)
(1,0.0)

which is the correct result.

I was hoping someone could improve upon my implementation by suggesting a
way in which I do not need the max iteration number (20). If I remove this
my job will continue on for sometime until eventual I receive the error:


7)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1896)
    at
java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
    at
java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1993)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1918)
    at
....................carries on and on.................................
    at java.io.ObjectInputStream.readObject(ObjectInputStream.java:371)
    at java.io.ObjectInputStream.readObject(ObjectInputStream.java:371)
    at scala.collection.immutable.$colon$colon.readObject(List.scala:362)
    at sun.reflect.GeneratedMethodAccessor4.invoke(Unknown Source)
    at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:483)
14/04/18 23:11:14 ERROR TaskSetManager: Task 81094.0:0 failed 1 times;
aborting job
14/04/18 23:11:14 INFO DAGScheduler: Failed to run reduce at
VertexRDD.scala:91
14/04/18 23:11:14 INFO TaskSchedulerImpl: Remove TaskSet 81094.0 from pool
org.apache.spark.SparkException: Job aborted: Task 81094.0:0 failed 1 times
(most recent failure: Exception failure: java.lang.StackOverflowError)
    at
org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$abortStage$1.apply(DAGScheduler.scala:1028)
    at
org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$abortStage$1.apply(DAGScheduler.scala:1026)
    at
scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
    at org.apache.spark.scheduler.DAGScheduler.org
$apache$spark$scheduler$DAGScheduler$$abortStage(DAGScheduler.scala:1026)
    at
org.apache.spark.scheduler.DAGScheduler$$anonfun$processEvent$10.apply(DAGScheduler.scala:619)
    at
org.apache.spark.scheduler.DAGScheduler$$anonfun$processEvent$10.apply(DAGScheduler.scala:619)
    at scala.Option.foreach(Option.scala:236)
    at
org.apache.spark.scheduler.DAGScheduler.processEvent(DAGScheduler.scala:619)
    at
org.apache.spark.scheduler.DAGScheduler$$anonfun$start$1$$anon$2$$anonfun$receive$1.applyOrElse(DAGScheduler.scala:207)
    at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498)
    at akka.actor.ActorCell.invoke(ActorCell.scala:456)
    at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237)
    at akka.dispatch.Mailbox.run(Mailbox.scala:219)
    at
akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
    at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
    at
scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
    at
scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
    at
scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)

Reply via email to