Thanks for the detailed response ☺ I will try the things you mentioned!

From: Daniel Darabos [mailto:daniel.dara...@lynxanalytics.com]
Sent: Friday, November 11, 2016 4:59 PM
To: Shreya Agarwal <shrey...@microsoft.com>
Cc: Felix Cheung <felixcheun...@hotmail.com>; user@spark.apache.org; Denny Lee 
<denny....@microsoft.com>
Subject: Re: Strongly Connected Components

Hi Shreya,
GraphFrames just calls the GraphX strongly connected components code. 
(https://github.com/graphframes/graphframes/blob/release-0.2.0/src/main/scala/org/graphframes/lib/StronglyConnectedComponents.scala#L51)
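For reference, here is a minimal sketch of what that call looks like from the
GraphFrames side. The toy vertex/edge DataFrames are made up for illustration;
only GraphFrame(...), stronglyConnectedComponents, maxIter and run are the
actual API.

import org.apache.spark.sql.SparkSession
import org.graphframes.GraphFrame

val spark = SparkSession.builder().appName("scc-sketch").getOrCreate()
import spark.implicits._

// Toy graph: a 3-cycle (one strongly connected component) plus a dangling vertex.
val vertices = Seq(1L, 2L, 3L, 4L).toDF("id")
val edges = Seq((1L, 2L), (2L, 3L), (3L, 1L), (3L, 4L)).toDF("src", "dst")

val g = GraphFrame(vertices, edges)
val components = g.stronglyConnectedComponents
  .maxIter(10) // iteration budget; see the note on choosing this below
  .run()       // the vertex DataFrame with an added "component" column
components.show()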

For choosing the number of iterations: If the number of iterations is less than 
the diameter of the graph, you may get an incorrect result. But running for 
more iterations than that buys you nothing. The algorithm is basically to 
broadcast your ID to all your neighbors in the first round, and then broadcast 
the smallest ID that you have seen so far in the next rounds. So with only 1 
round you will get a wrong result unless each vertex is connected to the vertex 
with the lowest ID in that component. (Unlikely in a real graph.)

See 
https://github.com/apache/spark/blob/v2.0.2/graphx/src/main/scala/org/apache/spark/graphx/lib/ConnectedComponents.scala
 for the actual implementation.
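To make the iteration structure concrete, here is a rough sketch of that
broadcast-the-smallest-ID idea written against the GraphX Pregel operator. It is
a simplified stand-in, not the linked implementation itself, but the real
ConnectedComponents code is essentially this plus some bookkeeping.

import scala.reflect.ClassTag
import org.apache.spark.graphx._

// Every vertex starts with its own ID as the candidate component ID and keeps
// the smallest ID it has heard about; maxIterations caps the number of rounds.
def connectedComponentsSketch[VD: ClassTag, ED: ClassTag](
    graph: Graph[VD, ED], maxIterations: Int): Graph[VertexId, ED] = {
  val ccGraph = graph.mapVertices { case (vid, _) => vid }
  ccGraph.pregel(Long.MaxValue, maxIterations, EdgeDirection.Either)(
    // Vertex program: remember the smallest ID seen so far.
    (id, attr, msg) => math.min(attr, msg),
    // Send program: notify a neighbor only if we know a smaller ID than it does.
    triplet =>
      if (triplet.srcAttr < triplet.dstAttr) Iterator((triplet.dstId, triplet.srcAttr))
      else if (triplet.dstAttr < triplet.srcAttr) Iterator((triplet.srcId, triplet.dstAttr))
      else Iterator.empty,
    // Merge program: the smaller of two candidate IDs wins.
    (a, b) => math.min(a, b))
}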

A better algorithm exists for this problem that only requires O(log(N)) 
iterations when N is the largest component diameter. (It is described in "A 
Model of Computation for MapReduce", 
http://www.sidsuri.com/Publications_files/mrc.pdf.) This outperforms GraphX's 
implementation immensely. (See the last slide of 
http://www.slideshare.net/SparkSummit/interactive-graph-analytics-daniel-darabos#33.)
 Most of the advantage comes from the smaller number of iterations required.

As for why this is failing even with one iteration: I would first check your 
partitioning. Too many partitions and too few partitions can both cause the issue. 
If you are lucky, there is no overlap between the "too many" and "too few" domains 
:).
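
If it helps as a starting point, here is a sketch of experimenting with the
partition count before building the graph, reusing the names from the sketch
above; the multiplier is purely illustrative and needs tuning in both directions.

// Repartition the edges before constructing the GraphFrame and re-run; try a
// few multiples (and fractions) of the cluster's parallelism and compare.
val targetPartitions = 4 * spark.sparkContext.defaultParallelism
val repartitionedEdges = edges.repartition(targetPartitions)
val repartitionedGraph = GraphFrame(vertices, repartitionedEdges)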

On Fri, Nov 11, 2016 at 7:39 PM, Shreya Agarwal 
<shrey...@microsoft.com> wrote:
Tried GraphFrames. Still faced the same issue – the job died after a few hours. The 
errors I see (and I see tons of them) are below. (I also ran with 3 times as many 
partitions, which was 12 times the number of executors, but still the same.)

-------------------------------------
ERROR NativeAzureFileSystem: Encountered Storage Exception for write on Blob : 
hdp/spark2-events/application_1478717432179_0021.inprogress Exception details: 
null Error Code : RequestBodyTooLarge

-------------------------------------

16/11/11 09:21:46 ERROR TransportResponseHandler: Still have 3 requests 
outstanding when connection from /10.0.0.95:43301 is closed
16/11/11 09:21:46 INFO RetryingBlockFetcher: Retrying fetch (1/3) for 2 
outstanding blocks after 5000 ms
16/11/11 09:21:46 INFO ShuffleBlockFetcherIterator: Getting 1500 non-empty 
blocks out of 1500 blocks
16/11/11 09:21:46 ERROR OneForOneBlockFetcher: Failed while starting block 
fetches
java.io.IOException: Connection from /10.0.0.95:43301 closed

-------------------------------------

16/11/11 09:21:46 ERROR OneForOneBlockFetcher: Failed while starting block 
fetches
java.lang.RuntimeException: java.io.FileNotFoundException: 
/mnt/resource/hadoop/yarn/local/usercache/shreyagrssh/appcache/application_1478717432179_0021/blockmgr-b1dde30d-359e-4932-b7a4-a5e138a52360/37/shuffle_1346_21_0.index
 (No such file or directory)

-------------------------------------

org.apache.spark.SparkException: Exception thrown in awaitResult
        at 
org.apache.spark.rpc.RpcTimeout$$anonfun$1.applyOrElse(RpcTimeout.scala:77)
        at 
org.apache.spark.rpc.RpcTimeout$$anonfun$1.applyOrElse(RpcTimeout.scala:75)
        at 
scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:36)
        at 
org.apache.spark.rpc.RpcTimeout$$anonfun$addMessageIfTimeout$1.applyOrElse(RpcTimeout.scala:59)
        at 
org.apache.spark.rpc.RpcTimeout$$anonfun$addMessageIfTimeout$1.applyOrElse(RpcTimeout.scala:59)
        at scala.PartialFunction$OrElse.apply(PartialFunction.scala:167)
        at org.apache.spark.rpc.RpcTimeout.awaitResult(RpcTimeout.scala:83)
        at 
org.apache.spark.rpc.RpcEndpointRef.askWithRetry(RpcEndpointRef.scala:102)
        at 
org.apache.spark.executor.Executor.org$apache$spark$executor$Executor$$reportHeartBeat(Executor.scala:518)
        at 
org.apache.spark.executor.Executor$$anon$1$$anonfun$run$1.apply$mcV$sp(Executor.scala:547)
        at 
org.apache.spark.executor.Executor$$anon$1$$anonfun$run$1.apply(Executor.scala:547)
        at 
org.apache.spark.executor.Executor$$anon$1$$anonfun$run$1.apply(Executor.scala:547)
        at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1857)
        at org.apache.spark.executor.Executor$$anon$1.run(Executor.scala:547)
        at 
java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
        at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
        at 
java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
        at 
java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
        at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        at java.lang.Thread.run(Thread.java:745)
Caused by: java.util.ConcurrentModificationException
        at java.util.ArrayList.writeObject(ArrayList.java:766)
        at sun.reflect.GeneratedMethodAccessor25.invoke(Unknown Source)
        at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at 
java.io.ObjectStreamClass.invokeWriteObject(ObjectStreamClass.java:1028)
        at 
java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1496)
        at 
java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
        at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)

-------------------------------------

16/11/11 13:21:54 WARN Executor: Issue communicating with driver in heartbeater
org.apache.spark.SparkException: Error sending message [message = 
Heartbeat(537,[Lscala.Tuple2;@2999dae4,BlockManagerId(537, 10.0.0.103, 36162))]
        at 
org.apache.spark.rpc.RpcEndpointRef.askWithRetry(RpcEndpointRef.scala:119)
        at 
org.apache.spark.executor.Executor.org$apache$spark$executor$Executor$$reportHeartBeat(Executor.scala:518)
        at 
org.apache.spark.executor.Executor$$anon$1$$anonfun$run$1.apply$mcV$sp(Executor.scala:547)
        at 
org.apache.spark.executor.Executor$$anon$1$$anonfun$run$1.apply(Executor.scala:547)
        at 
org.apache.spark.executor.Executor$$anon$1$$anonfun$run$1.apply(Executor.scala:547)
        at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1857)
        at org.apache.spark.executor.Executor$$anon$1.run(Executor.scala:547)
        at 
java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
        at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
        at 
java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
        at 
java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
        at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        at java.lang.Thread.run(Thread.java:745)
Caused by: org.apache.spark.rpc.RpcTimeoutException: Futures timed out after 
[10 seconds]. This timeout is controlled by spark.executor.heartbeatInterval
        at 
org.apache.spark.rpc.RpcTimeout.org$apache$spark$rpc$RpcTimeout$$createRpcTimeoutException(RpcTimeout.scala:48)
        at 
org.apache.spark.rpc.RpcTimeout$$anonfun$addMessageIfTimeout$1.applyOrElse(RpcTimeout.scala:63)
        at 
org.apache.spark.rpc.RpcTimeout$$anonfun$addMessageIfTimeout$1.applyOrElse(RpcTimeout.scala:59)
        at scala.PartialFunction$OrElse.apply(PartialFunction.scala:167)
        at org.apache.spark.rpc.RpcTimeout.awaitResult(RpcTimeout.scala:83)
        at 
org.apache.spark.rpc.RpcEndpointRef.askWithRetry(RpcEndpointRef.scala:102)
        ... 13 more
Caused by: java.util.concurrent.TimeoutException: Futures timed out after [10 
seconds]
        at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:219)
        at 
scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:223)
        at scala.concurrent.Await$$anonfun$result$1.apply(package.scala:190)
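
As an aside, the heartbeat timeout in the last trace is controlled by an
ordinary Spark property. A minimal sketch of raising it when building the
session follows; the values are illustrative only, and spark.network.timeout is
an assumed related setting that the log does not mention.

import org.apache.spark.sql.SparkSession

// Raise the executor heartbeat interval named in the error above and keep the
// network timeout comfortably larger than it. Example values, not recommendations.
val spark = SparkSession.builder()
  .appName("scc-job")
  .config("spark.executor.heartbeatInterval", "60s")
  .config("spark.network.timeout", "600s")
  .getOrCreate()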

From: Shreya Agarwal
Sent: Thursday, November 10, 2016 8:16 PM
To: 'Felix Cheung' 
<felixcheun...@hotmail.com>; user@spark.apache.org
Subject: RE: Strongly Connected Components

Yesterday’s run died sometime during the night, without any errors. Today, I am 
running it using GraphFrames instead. It is still spawning new tasks, so there 
is progress.

From: Felix Cheung [mailto:felixcheun...@hotmail.com]
Sent: Thursday, November 10, 2016 7:50 PM
To: user@spark.apache.org; Shreya Agarwal <shrey...@microsoft.com>
Subject: Re: Strongly Connected Components

It is possible it is dead. Could you check the Spark UI to see if there is any 
progress?

_____________________________
From: Shreya Agarwal <shrey...@microsoft.com>
Sent: Thursday, November 10, 2016 12:45 AM
Subject: RE: Strongly Connected Components
To: <user@spark.apache.org>

Bump. Anyone? It's been running for 10 hours now. No results.

From: Shreya Agarwal
Sent: Tuesday, November 8, 2016 9:05 PM
To: user@spark.apache.org
Subject: Strongly Connected Components

Hi,

I am running this on a graph with >5B edges and >3B vertices and have 2 questions –


  1.  What is the optimal number of iterations?
  2.  I am running it for 1 iteration right now on a beefy 100-node cluster with 
300 executors, each with 30 GB RAM and 5 cores. I have persisted the graph to 
MEMORY_AND_DISK, and it has been running for 3 hours already. Any ideas on how to 
speed this up?

Regards,
Shreya

