Hi Shreya,
GraphFrames just calls the GraphX strongly connected components code. (
https://github.com/graphframes/graphframes/blob/release-0.2.0/src/main/scala/org/graphframes/lib/StronglyConnectedComponents.scala#L51
)

For choosing the number of iterations: If the number of iterations is less
than the diameter of the graph, you may get an incorrect result. But
running for more iterations than that buys you nothing. The algorithm is
basically to broadcast your ID to all your neighbors in the first round,
and then broadcast the smallest ID that you have seen so far in the next
rounds. So with only 1 round you will get a wrong result unless each vertex
is connected to the vertex with the lowest ID in that component. (Unlikely
in a real graph.)

See
https://github.com/apache/spark/blob/v2.0.2/graphx/src/main/scala/org/apache/spark/graphx/lib/ConnectedComponents.scala
for the actual implementation.

A better algorithm exists for this problem that only requires O(log(N))
iterations when N is the largest component diameter. (It is described in "A
Model of Computation for MapReduce",
http://www.sidsuri.com/Publications_files/mrc.pdf.) This outperforms
GraphX's implementation immensely. (See the last slide of
http://www.slideshare.net/SparkSummit/interactive-graph-analytics-daniel-darabos#33.)
The large advantage is due to the lower number of necessary iterations.

For why this is failing even with one iteration: I would first check your
partitioning. Too many or too few partitions could equally cause the issue.
If you are lucky, there is no overlap between the "too many" and "too few"
domains :).

On Fri, Nov 11, 2016 at 7:39 PM, Shreya Agarwal <shrey...@microsoft.com>
wrote:

> Tried GraphFrames. Still faced the same – job died after a few hours . The
> errors I see (And I see tons of them) are –
>
> (I ran with 3 times the partitions as well, which was 12 times number of
> executors , but still the same.)
>
>
>
> -------------------------------------
>
> ERROR NativeAzureFileSystem: Encountered Storage Exception for write on
> Blob : hdp/spark2-events/application_1478717432179_0021.inprogress
> Exception details: null Error Code : RequestBodyTooLarge
>
>
>
> -------------------------------------
>
>
>
> 16/11/11 09:21:46 ERROR TransportResponseHandler: Still have 3 requests
> outstanding when connection from /10.0.0.95:43301 is closed
>
> 16/11/11 09:21:46 INFO RetryingBlockFetcher: Retrying fetch (1/3) for 2
> outstanding blocks after 5000 ms
>
> 16/11/11 09:21:46 INFO ShuffleBlockFetcherIterator: Getting 1500 non-empty
> blocks out of 1500 blocks
>
> 16/11/11 09:21:46 ERROR OneForOneBlockFetcher: Failed while starting block
> fetches
>
> java.io.IOException: Connection from /10.0.0.95:43301 closed
>
>
>
> -------------------------------------
>
>
>
> 16/11/11 09:21:46 ERROR OneForOneBlockFetcher: Failed while starting block
> fetches
>
> java.lang.RuntimeException: java.io.FileNotFoundException:
> /mnt/resource/hadoop/yarn/local/usercache/shreyagrssh/
> appcache/application_1478717432179_0021/blockmgr-b1dde30d-359e-4932-b7a4-
> a5e138a52360/37/shuffle_1346_21_0.index (No such file or directory)
>
>
>
> -------------------------------------
>
>
>
> org.apache.spark.SparkException: Exception thrown in awaitResult
>
>         at org.apache.spark.rpc.RpcTimeout$$anonfun$1.
> applyOrElse(RpcTimeout.scala:77)
>
>         at org.apache.spark.rpc.RpcTimeout$$anonfun$1.
> applyOrElse(RpcTimeout.scala:75)
>
>         at scala.runtime.AbstractPartialFunction.apply(
> AbstractPartialFunction.scala:36)
>
>         at org.apache.spark.rpc.RpcTimeout$$anonfun$addMessageIfTimeout$1.
> applyOrElse(RpcTimeout.scala:59)
>
>         at org.apache.spark.rpc.RpcTimeout$$anonfun$addMessageIfTimeout$1.
> applyOrElse(RpcTimeout.scala:59)
>
>         at scala.PartialFunction$OrElse.apply(PartialFunction.scala:167)
>
>         at org.apache.spark.rpc.RpcTimeout.awaitResult(
> RpcTimeout.scala:83)
>
>         at org.apache.spark.rpc.RpcEndpointRef.askWithRetry(
> RpcEndpointRef.scala:102)
>
>         at org.apache.spark.executor.Executor.org$apache$spark$
> executor$Executor$$reportHeartBeat(Executor.scala:518)
>
>         at org.apache.spark.executor.Executor$$anon$1$$anonfun$run$
> 1.apply$mcV$sp(Executor.scala:547)
>
>         at org.apache.spark.executor.Executor$$anon$1$$anonfun$run$
> 1.apply(Executor.scala:547)
>
>         at org.apache.spark.executor.Executor$$anon$1$$anonfun$run$
> 1.apply(Executor.scala:547)
>
>         at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.
> scala:1857)
>
>         at org.apache.spark.executor.Executor$$anon$1.run(Executor.
> scala:547)
>
>         at java.util.concurrent.Executors$RunnableAdapter.
> call(Executors.java:511)
>
>         at java.util.concurrent.FutureTask.runAndReset(
> FutureTask.java:308)
>
>         at java.util.concurrent.ScheduledThreadPoolExecutor$
> ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
>
>         at java.util.concurrent.ScheduledThreadPoolExecutor$
> ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
>
>         at java.util.concurrent.ThreadPoolExecutor.runWorker(
> ThreadPoolExecutor.java:1142)
>
>         at java.util.concurrent.ThreadPoolExecutor$Worker.run(
> ThreadPoolExecutor.java:617)
>
>         at java.lang.Thread.run(Thread.java:745)
>
> Caused by: java.util.ConcurrentModificationException
>
>         at java.util.ArrayList.writeObject(ArrayList.java:766)
>
>         at sun.reflect.GeneratedMethodAccessor25.invoke(Unknown Source)
>
>         at sun.reflect.DelegatingMethodAccessorImpl.invoke(
> DelegatingMethodAccessorImpl.java:43)
>
>         at java.lang.reflect.Method.invoke(Method.java:498)
>
>         at java.io.ObjectStreamClass.invokeWriteObject(
> ObjectStreamClass.java:1028)
>
>         at java.io.ObjectOutputStream.writeSerialData(
> ObjectOutputStream.java:1496)
>
>         at java.io.ObjectOutputStream.writeOrdinaryObject(
> ObjectOutputStream.java:1432)
>
>         at java.io.ObjectOutputStream.writeObject0(
> ObjectOutputStream.java:1178)
>
>
>
> -------------------------------------
>
>
>
> 16/11/11 13:21:54 WARN Executor: Issue communicating with driver in
> heartbeater
>
> org.apache.spark.SparkException: Error sending message [message =
> Heartbeat(537,[Lscala.Tuple2;@2999dae4,BlockManagerId(537, 10.0.0.103,
> 36162))]
>
>         at org.apache.spark.rpc.RpcEndpointRef.askWithRetry(
> RpcEndpointRef.scala:119)
>
>         at org.apache.spark.executor.Executor.org$apache$spark$
> executor$Executor$$reportHeartBeat(Executor.scala:518)
>
>         at org.apache.spark.executor.Executor$$anon$1$$anonfun$run$
> 1.apply$mcV$sp(Executor.scala:547)
>
>         at org.apache.spark.executor.Executor$$anon$1$$anonfun$run$
> 1.apply(Executor.scala:547)
>
>         at org.apache.spark.executor.Executor$$anon$1$$anonfun$run$
> 1.apply(Executor.scala:547)
>
>         at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.
> scala:1857)
>
>         at org.apache.spark.executor.Executor$$anon$1.run(Executor.
> scala:547)
>
>         at java.util.concurrent.Executors$RunnableAdapter.
> call(Executors.java:511)
>
>         at java.util.concurrent.FutureTask.runAndReset(
> FutureTask.java:308)
>
>         at java.util.concurrent.ScheduledThreadPoolExecutor$
> ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
>
>         at java.util.concurrent.ScheduledThreadPoolExecutor$
> ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
>
>         at java.util.concurrent.ThreadPoolExecutor.runWorker(
> ThreadPoolExecutor.java:1142)
>
>         at java.util.concurrent.ThreadPoolExecutor$Worker.run(
> ThreadPoolExecutor.java:617)
>
>         at java.lang.Thread.run(Thread.java:745)
>
> Caused by: org.apache.spark.rpc.RpcTimeoutException: Futures timed out
> after [10 seconds]. This timeout is controlled by spark.executor.
> heartbeatInterval
>
>         at org.apache.spark.rpc.RpcTimeout.org$apache$spark$
> rpc$RpcTimeout$$createRpcTimeoutException(RpcTimeout.scala:48)
>
>         at org.apache.spark.rpc.RpcTimeout$$anonfun$addMessageIfTimeout$1.
> applyOrElse(RpcTimeout.scala:63)
>
>         at org.apache.spark.rpc.RpcTimeout$$anonfun$addMessageIfTimeout$1.
> applyOrElse(RpcTimeout.scala:59)
>
>         at scala.PartialFunction$OrElse.apply(PartialFunction.scala:167)
>
>         at org.apache.spark.rpc.RpcTimeout.awaitResult(
> RpcTimeout.scala:83)
>
>         at org.apache.spark.rpc.RpcEndpointRef.askWithRetry(
> RpcEndpointRef.scala:102)
>
>         ... 13 more
>
> Caused by: java.util.concurrent.TimeoutException: Futures timed out after
> [10 seconds]
>
>         at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.
> scala:219)
>
>         at scala.concurrent.impl.Promise$DefaultPromise.result(Promise.
> scala:223)
>
>         at scala.concurrent.Await$$anonfun$result$1.apply(
> package.scala:190)
>
>
>
> *From:* Shreya Agarwal
> *Sent:* Thursday, November 10, 2016 8:16 PM
> *To:* 'Felix Cheung' <felixcheun...@hotmail.com>; user@spark.apache.org
> *Subject:* RE: Strongly Connected Components
>
>
>
> Yesterday’s run died sometime during the night, without any errors. Today,
> I am running it using GraphFrames instead. It is still spawning new tasks,
> so there is progress.
>
>
>
> *From:* Felix Cheung [mailto:felixcheun...@hotmail.com
> <felixcheun...@hotmail.com>]
> *Sent:* Thursday, November 10, 2016 7:50 PM
> *To:* user@spark.apache.org; Shreya Agarwal <shrey...@microsoft.com>
> *Subject:* Re: Strongly Connected Components
>
>
>
> It is possible it is dead. Could you check the Spark UI to see if there is
> any progress?
>
>
>
> _____________________________
> From: Shreya Agarwal <shrey...@microsoft.com>
> Sent: Thursday, November 10, 2016 12:45 AM
> Subject: RE: Strongly Connected Components
> To: <user@spark.apache.org>
>
>
> Bump. Anyone? Its been running for 10 hours now. No results.
>
>
>
> *From:* Shreya Agarwal
> *Sent:* Tuesday, November 8, 2016 9:05 PM
> *To:* user@spark.apache.org
> *Subject:* Strongly Connected Components
>
>
>
> Hi,
>
>
>
> I am running this on a graph with >5B edges and >3B edges and have 2
> questions –
>
>
>
>    1. What is the optimal number of iterations?
>    2. I am running it for 1 iteration right now on a beefy 100 node
>    cluster, with 300 executors each having 30GB RAM and 5 cores. I have
>    persisted the graph to MEMORY_AND_DISK. And it has been running for 3 hours
>    already. Any ideas on how to speed this up?
>
>
>
> Regards,
>
> Shreya
>
>
>

Reply via email to