It would be good if you could contribute this as an example. BFS is a common
enough algorithm.
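
A minimal sketch of what such an example might look like, combining the code quoted below with the revised send condition from the follow-up. The object name `BfsExample` and the helper `bfsLevels` are made up for illustration, and the graph type assumes the `Graph[Int, Int]` that `GraphLoader.edgeListFile` returns. The iteration cap is left out on the assumption that the run stops by itself once no vertex is left to reach:

```scala
import org.apache.spark.graphx._

object BfsExample {
  // Hypothetical helper: BFS level of every vertex, measured from `root`.
  // Unreachable vertices keep Double.PositiveInfinity.
  def bfsLevels(graph: Graph[Int, Int], root: VertexId): Graph[Double, Int] = {
    val initialGraph = graph.mapVertices((id, _) =>
      if (id == root) 0.0 else Double.PositiveInfinity)

    // No maxIterations argument: rely on the default and on the send
    // condition below, which stops sending once all reachable vertices are set.
    initialGraph.pregel(Double.PositiveInfinity)(
      // Vertex program: keep the smallest level seen so far.
      (id, attr, msg) => math.min(attr, msg),
      // Send only from a reached source to a not-yet-reached destination.
      triplet =>
        if (triplet.srcAttr != Double.PositiveInfinity &&
            triplet.dstAttr == Double.PositiveInfinity)
          Iterator((triplet.dstId, triplet.srcAttr + 1))
        else
          Iterator.empty,
      // Merge: several parents may reach a vertex in the same superstep.
      (a, b) => math.min(a, b)
    )
  }
}
```

From a shell with `sc` available, something like `BfsExample.bfsLevels(GraphLoader.edgeListFile(sc, "graphx/data/test_graph.txt"), 1L).vertices.collect` should give the per-vertex levels.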

Mayur Rustagi
Ph: +1 (760) 203 3257
http://www.sigmoidanalytics.com
@mayur_rustagi <https://twitter.com/mayur_rustagi>



On Sat, Apr 19, 2014 at 4:16 AM, Ghufran Malik <gooksma...@gmail.com> wrote:

> Ahh nvm I found the solution :)
>
> triplet.srcAttr != Double.PositiveInfinity &&
>   triplet.dstAttr == Double.PositiveInfinity
>
> as my new if condition.
>
>
> ---------- Forwarded message ----------
> From: Ghufran Malik <gooksma...@gmail.com>
> Date: 18 April 2014 23:15
> Subject: BFS implemented
> To: user@spark.apache.org
>
>
> Hi, I have successfully implemented the breadth-first search algorithm using
> the Pregel operator in GraphX as follows:
>
> val graph = GraphLoader.edgeListFile(sc, "graphx/data/test_graph.txt")
>
> val root: VertexId = 1
> val initialGraph = graph.mapVertices((id, _) =>
>   if (id == root) 0.0 else Double.PositiveInfinity)
>
> val bfs = initialGraph.pregel(Double.PositiveInfinity, 20)(
>   (id, attr, msg) => math.min(attr, msg),
>   triplet => {
>     if (triplet.srcAttr != Double.PositiveInfinity) {
>       Iterator((triplet.dstId, triplet.srcAttr + 1))
>     } else {
>       Iterator.empty
>     }
>   },
>   (a, b) => math.min(a, b)
> )
>
> println(bfs.vertices.collect.mkString("\n"))
>
> where the test_graph.txt is:
> 1 2
> 2 1
> 2 3
> 2 4
> 3 2
> 3 3
> 4 2
> 4 3
>
> and the result outputted after I run my algorithm is:
> (4,2.0)
> (2,1.0)
> (3,2.0)
> (1,0.0)
>
> which is the correct result.
>
> I was hoping someone could improve upon my implementation by suggesting a
> way in which I do not need the maximum iteration number (20). If I remove it,
> my job continues for some time until I eventually receive the error:
>
>
> 7)
>     at
> java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1896)
>     at
> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801)
>     at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
>     at
> java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1993)
>     at
> java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1918)
>     at
> ....................carries on and on.................................
>     at java.io.ObjectInputStream.readObject(ObjectInputStream.java:371)
>     at java.io.ObjectInputStream.readObject(ObjectInputStream.java:371)
>     at scala.collection.immutable.$colon$colon.readObject(List.scala:362)
>     at sun.reflect.GeneratedMethodAccessor4.invoke(Unknown Source)
>     at
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>     at java.lang.reflect.Method.invoke(Method.java:483)
> 14/04/18 23:11:14 ERROR TaskSetManager: Task 81094.0:0 failed 1 times;
> aborting job
> 14/04/18 23:11:14 INFO DAGScheduler: Failed to run reduce at
> VertexRDD.scala:91
> 14/04/18 23:11:14 INFO TaskSchedulerImpl: Remove TaskSet 81094.0 from pool
> org.apache.spark.SparkException: Job aborted: Task 81094.0:0 failed 1
> times (most recent failure: Exception failure: java.lang.StackOverflowError)
>     at
> org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$abortStage$1.apply(DAGScheduler.scala:1028)
>     at
> org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$abortStage$1.apply(DAGScheduler.scala:1026)
>     at
> scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
>     at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
>     at
> org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$abortStage(DAGScheduler.scala:1026)
>     at
> org.apache.spark.scheduler.DAGScheduler$$anonfun$processEvent$10.apply(DAGScheduler.scala:619)
>     at
> org.apache.spark.scheduler.DAGScheduler$$anonfun$processEvent$10.apply(DAGScheduler.scala:619)
>     at scala.Option.foreach(Option.scala:236)
>     at
> org.apache.spark.scheduler.DAGScheduler.processEvent(DAGScheduler.scala:619)
>     at
> org.apache.spark.scheduler.DAGScheduler$$anonfun$start$1$$anon$2$$anonfun$receive$1.applyOrElse(DAGScheduler.scala:207)
>     at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498)
>     at akka.actor.ActorCell.invoke(ActorCell.scala:456)
>     at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237)
>     at akka.dispatch.Mailbox.run(Mailbox.scala:219)
>     at
> akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
>     at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>     at
> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>     at
> scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>     at
> scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
>
>
>
>
>
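
One way to see why the cap of 20 was needed in the original version: as far as I understand the Pregel operator, it keeps running supersteps for as long as any messages are sent. The sketch below is a simplified plain-Scala simulation (no Spark) of that loop on the test graph above; `BfsTermination`, `simulate`, `sendAlways` and `sendToUnreached` are made-up names, and the loop only approximates what GraphX does internally:

```scala
object BfsTermination {
  // (srcAttr, dstAttr) => should a message be sent along this edge?
  type Send = (Double, Double) => Boolean

  // Edge list from test_graph.txt.
  val edges: Seq[(Long, Long)] =
    Seq((1L, 2L), (2L, 1L), (2L, 3L), (2L, 4L), (3L, 2L), (3L, 3L), (4L, 2L), (4L, 3L))

  // Original condition: send whenever the source has been reached.
  val sendAlways: Send = (src, _) => src != Double.PositiveInfinity

  // Revised condition: send only to vertices that have not been reached yet.
  val sendToUnreached: Send =
    (src, dst) => src != Double.PositiveInfinity && dst == Double.PositiveInfinity

  /** Returns (final levels, supersteps that delivered messages).
    * Stops when a superstep sends nothing or when maxIterations is hit. */
  def simulate(root: Long, send: Send, maxIterations: Int): (Map[Long, Double], Int) = {
    val ids = edges.flatMap { case (s, d) => Seq(s, d) }.distinct
    var dist = ids.map(id => id -> (if (id == root) 0.0 else Double.PositiveInfinity)).toMap
    var steps = 0
    var active = true
    while (active && steps < maxIterations) {
      // Collect messages per destination and merge them with min.
      val msgs = edges
        .collect { case (s, d) if send(dist(s), dist(d)) => d -> (dist(s) + 1) }
        .groupBy(_._1)
        .map { case (d, ms) => d -> ms.map(_._2).min }
      active = msgs.nonEmpty
      if (active) {
        // Vertex program: keep the smaller of the current value and the message.
        dist = dist.map { case (id, a) => id -> math.min(a, msgs.getOrElse(id, a)) }
        steps += 1
      }
    }
    (dist, steps)
  }
}
```

With `sendAlways`, the edge from vertex 1 to vertex 2 produces a message in every superstep, so the loop only stops at the cap. With `sendToUnreached`, messages dry up as soon as every reachable vertex has a level, and the loop ends without needing a cap.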
