oh ok, i see now it's not the same.

On Sat, Nov 12, 2016 at 2:48 PM, Koert Kuipers <ko...@tresata.com> wrote:
> not sure i see the faster algo in the paper you mention.
>
> i see this in section 6.1.2:
> "In what follows we give a simple labeling algorithm that computes
> connectivity on sparse graphs in O(log N) rounds."
> N here is the size of the graph, not the largest component diameter.
>
> that is the exact same algo as is implemented in graphx i think. or is
> it not?
>
> On Fri, Nov 11, 2016 at 7:58 PM, Daniel Darabos
> <daniel.dara...@lynxanalytics.com> wrote:
>
>> Hi Shreya,
>> GraphFrames just calls the GraphX strongly connected components code.
>> (https://github.com/graphframes/graphframes/blob/release-0.2.0/src/main/scala/org/graphframes/lib/StronglyConnectedComponents.scala#L51)
>>
>> For choosing the number of iterations: If the number of iterations is
>> less than the diameter of the graph, you may get an incorrect result.
>> But running for more iterations than that buys you nothing. The
>> algorithm is basically to broadcast your ID to all your neighbors in
>> the first round, and then broadcast the smallest ID that you have seen
>> so far in the next rounds. So with only 1 round you will get a wrong
>> result unless each vertex is connected to the vertex with the lowest
>> ID in that component. (Unlikely in a real graph.)
>>
>> See
>> https://github.com/apache/spark/blob/v2.0.2/graphx/src/main/scala/org/apache/spark/graphx/lib/ConnectedComponents.scala
>> for the actual implementation.
>>
>> A better algorithm exists for this problem that only requires
>> O(log(N)) iterations when N is the largest component diameter. (It is
>> described in "A Model of Computation for MapReduce",
>> http://www.sidsuri.com/Publications_files/mrc.pdf.) This outperforms
>> GraphX's implementation immensely. (See the last slide of
>> http://www.slideshare.net/SparkSummit/interactive-graph-analytics-daniel-darabos#33.)
>> The large advantage is due to the lower number of necessary iterations.
>>
>> For why this is failing even with one iteration: I would first check
>> your partitioning. Too many or too few partitions could equally cause
>> the issue. If you are lucky, there is no overlap between the "too many"
>> and "too few" domains :).
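A minimal sketch of the min-ID propagation described above, written against the GraphX Pregel API. It mirrors the idea in the linked ConnectedComponents.scala rather than copying it; the names edgeTuples and maxIterations are placeholders introduced for illustration.

import org.apache.spark.graphx._
import org.apache.spark.rdd.RDD

// Min-ID label propagation: every vertex starts with its own ID as its
// component label and repeatedly adopts the smallest ID seen among its
// neighbors.
def minLabelPropagation(edgeTuples: RDD[(VertexId, VertexId)],
                        maxIterations: Int): Graph[VertexId, Int] = {
  // Build a graph and label every vertex with its own ID.
  val graph = Graph.fromEdgeTuples(edgeTuples, defaultValue = 1)
  val labeled = graph.mapVertices((id, _) => id)

  labeled.pregel(Long.MaxValue, maxIterations, EdgeDirection.Either)(
    // Keep the smallest label seen so far.
    (_, label, incoming) => math.min(label, incoming),
    // Only message a neighbor whose label we could lower.
    triplet =>
      if (triplet.srcAttr < triplet.dstAttr) Iterator((triplet.dstId, triplet.srcAttr))
      else if (triplet.dstAttr < triplet.srcAttr) Iterator((triplet.srcId, triplet.dstAttr))
      else Iterator.empty,
    // Combine concurrent messages by taking the minimum.
    (a, b) => math.min(a, b)
  )
}

Once the iteration count reaches at least the component diameter, every vertex in a component carries that component's smallest vertex ID, which is why fewer iterations than the diameter can leave the labels wrong.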
>> On Fri, Nov 11, 2016 at 7:39 PM, Shreya Agarwal
>> <shrey...@microsoft.com> wrote:
>>
>>> Tried GraphFrames. Still faced the same issue – the job died after a
>>> few hours. The errors I see (and I see tons of them) are below.
>>>
>>> (I ran with 3 times the partitions as well, which was 12 times the
>>> number of executors, but still the same.)
>>>
>>> -------------------------------------
>>>
>>> ERROR NativeAzureFileSystem: Encountered Storage Exception for write
>>> on Blob : hdp/spark2-events/application_1478717432179_0021.inprogress
>>> Exception details: null Error Code : RequestBodyTooLarge
>>>
>>> -------------------------------------
>>>
>>> 16/11/11 09:21:46 ERROR TransportResponseHandler: Still have 3
>>> requests outstanding when connection from /10.0.0.95:43301 is closed
>>> 16/11/11 09:21:46 INFO RetryingBlockFetcher: Retrying fetch (1/3) for
>>> 2 outstanding blocks after 5000 ms
>>> 16/11/11 09:21:46 INFO ShuffleBlockFetcherIterator: Getting 1500
>>> non-empty blocks out of 1500 blocks
>>> 16/11/11 09:21:46 ERROR OneForOneBlockFetcher: Failed while starting
>>> block fetches
>>> java.io.IOException: Connection from /10.0.0.95:43301 closed
>>>
>>> -------------------------------------
>>>
>>> 16/11/11 09:21:46 ERROR OneForOneBlockFetcher: Failed while starting
>>> block fetches
>>> java.lang.RuntimeException: java.io.FileNotFoundException:
>>> /mnt/resource/hadoop/yarn/local/usercache/shreyagrssh/appcache/application_1478717432179_0021/blockmgr-b1dde30d-359e-4932-b7a4-a5e138a52360/37/shuffle_1346_21_0.index
>>> (No such file or directory)
>>>
>>> -------------------------------------
>>>
>>> org.apache.spark.SparkException: Exception thrown in awaitResult
>>>   at org.apache.spark.rpc.RpcTimeout$$anonfun$1.applyOrElse(RpcTimeout.scala:77)
>>>   at org.apache.spark.rpc.RpcTimeout$$anonfun$1.applyOrElse(RpcTimeout.scala:75)
>>>   at scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:36)
>>>   at org.apache.spark.rpc.RpcTimeout$$anonfun$addMessageIfTimeout$1.applyOrElse(RpcTimeout.scala:59)
>>>   at org.apache.spark.rpc.RpcTimeout$$anonfun$addMessageIfTimeout$1.applyOrElse(RpcTimeout.scala:59)
>>>   at scala.PartialFunction$OrElse.apply(PartialFunction.scala:167)
>>>   at org.apache.spark.rpc.RpcTimeout.awaitResult(RpcTimeout.scala:83)
>>>   at org.apache.spark.rpc.RpcEndpointRef.askWithRetry(RpcEndpointRef.scala:102)
>>>   at org.apache.spark.executor.Executor.org$apache$spark$executor$Executor$$reportHeartBeat(Executor.scala:518)
>>>   at org.apache.spark.executor.Executor$$anon$1$$anonfun$run$1.apply$mcV$sp(Executor.scala:547)
>>>   at org.apache.spark.executor.Executor$$anon$1$$anonfun$run$1.apply(Executor.scala:547)
>>>   at org.apache.spark.executor.Executor$$anon$1$$anonfun$run$1.apply(Executor.scala:547)
>>>   at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1857)
>>>   at org.apache.spark.executor.Executor$$anon$1.run(Executor.scala:547)
>>>   at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>>>   at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
>>>   at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
>>>   at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
>>>   at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>>>   at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>>>   at java.lang.Thread.run(Thread.java:745)
>>> Caused by: java.util.ConcurrentModificationException
>>>   at java.util.ArrayList.writeObject(ArrayList.java:766)
>>>   at sun.reflect.GeneratedMethodAccessor25.invoke(Unknown Source)
>>>   at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>>>   at java.lang.reflect.Method.invoke(Method.java:498)
>>>   at java.io.ObjectStreamClass.invokeWriteObject(ObjectStreamClass.java:1028)
>>>   at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1496)
>>>   at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
>>>   at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
>>>
>>> -------------------------------------
>>>
>>> 16/11/11 13:21:54 WARN Executor: Issue communicating with driver in
>>> heartbeater
>>> org.apache.spark.SparkException: Error sending message [message =
>>> Heartbeat(537,[Lscala.Tuple2;@2999dae4,BlockManagerId(537, 10.0.0.103,
>>> 36162))]
>>>   at org.apache.spark.rpc.RpcEndpointRef.askWithRetry(RpcEndpointRef.scala:119)
>>>   at org.apache.spark.executor.Executor.org$apache$spark$executor$Executor$$reportHeartBeat(Executor.scala:518)
>>>   at org.apache.spark.executor.Executor$$anon$1$$anonfun$run$1.apply$mcV$sp(Executor.scala:547)
>>>   at org.apache.spark.executor.Executor$$anon$1$$anonfun$run$1.apply(Executor.scala:547)
>>>   at org.apache.spark.executor.Executor$$anon$1$$anonfun$run$1.apply(Executor.scala:547)
>>>   at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1857)
>>>   at org.apache.spark.executor.Executor$$anon$1.run(Executor.scala:547)
>>>   at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>>>   at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
>>>   at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
>>>   at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
>>>   at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>>>   at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>>>   at java.lang.Thread.run(Thread.java:745)
>>> Caused by: org.apache.spark.rpc.RpcTimeoutException: Futures timed out
>>> after [10 seconds]. This timeout is controlled by
>>> spark.executor.heartbeatInterval
>>>   at org.apache.spark.rpc.RpcTimeout.org$apache$spark$rpc$RpcTimeout$$createRpcTimeoutException(RpcTimeout.scala:48)
>>>   at org.apache.spark.rpc.RpcTimeout$$anonfun$addMessageIfTimeout$1.applyOrElse(RpcTimeout.scala:63)
>>>   at org.apache.spark.rpc.RpcTimeout$$anonfun$addMessageIfTimeout$1.applyOrElse(RpcTimeout.scala:59)
>>>   at scala.PartialFunction$OrElse.apply(PartialFunction.scala:167)
>>>   at org.apache.spark.rpc.RpcTimeout.awaitResult(RpcTimeout.scala:83)
>>>   at org.apache.spark.rpc.RpcEndpointRef.askWithRetry(RpcEndpointRef.scala:102)
>>>   ... 13 more
>>> Caused by: java.util.concurrent.TimeoutException: Futures timed out
>>> after [10 seconds]
>>>   at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:219)
>>>   at scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:223)
>>>   at scala.concurrent.Await$$anonfun$result$1.apply(package.scala:190)
>>>
>>> From: Shreya Agarwal
>>> Sent: Thursday, November 10, 2016 8:16 PM
>>> To: 'Felix Cheung' <felixcheun...@hotmail.com>; user@spark.apache.org
>>> Subject: RE: Strongly Connected Components
>>>
>>> Yesterday's run died sometime during the night, without any errors.
>>> Today, I am running it using GraphFrames instead. It is still spawning
>>> new tasks, so there is progress.
>>>
>>> From: Felix Cheung [mailto:felixcheun...@hotmail.com]
>>> Sent: Thursday, November 10, 2016 7:50 PM
>>> To: user@spark.apache.org; Shreya Agarwal <shrey...@microsoft.com>
>>> Subject: Re: Strongly Connected Components
>>>
>>> It is possible it is dead. Could you check the Spark UI to see if
>>> there is any progress?
>>>
>>> _____________________________
>>> From: Shreya Agarwal <shrey...@microsoft.com>
>>> Sent: Thursday, November 10, 2016 12:45 AM
>>> Subject: RE: Strongly Connected Components
>>> To: <user@spark.apache.org>
>>>
>>> Bump. Anyone? It's been running for 10 hours now. No results.
>>>
>>> From: Shreya Agarwal
>>> Sent: Tuesday, November 8, 2016 9:05 PM
>>> To: user@spark.apache.org
>>> Subject: Strongly Connected Components
>>>
>>> Hi,
>>>
>>> I am running this on a graph with >5B edges and >3B vertices and have
>>> 2 questions –
>>>
>>> 1. What is the optimal number of iterations?
>>> 2. I am running it for 1 iteration right now on a beefy 100-node
>>> cluster, with 300 executors each having 30GB RAM and 5 cores. I have
>>> persisted the graph to MEMORY_AND_DISK, and it has been running for
>>> 3 hours already. Any ideas on how to speed this up?
>>>
>>> Regards,
>>> Shreya
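For reference, a minimal sketch of how the GraphFrames strongly connected components call discussed in this thread is typically invoked from the Spark shell, assuming GraphFrames 0.2.x on Spark 2.0.x. The input paths, DataFrame names, and the maxIter value are placeholders introduced for illustration, not details from this thread.

// Spark shell sketch; assumes the GraphFrames package is on the classpath
// and that `spark` is the SparkSession provided by spark-shell.
import org.apache.spark.storage.StorageLevel
import org.graphframes.GraphFrame

// Placeholder inputs: a vertex DataFrame with an "id" column and an edge
// DataFrame with "src" and "dst" columns (paths are made up for illustration).
val vertices = spark.read.parquet("/placeholder/vertices")
val edges = spark.read.parquet("/placeholder/edges")

// Persist both sides, as in the thread (MEMORY_AND_DISK).
vertices.persist(StorageLevel.MEMORY_AND_DISK)
edges.persist(StorageLevel.MEMORY_AND_DISK)

val g = GraphFrame(vertices, edges)

// GraphFrames delegates to GraphX's strongly connected components;
// maxIter caps the number of Pregel iterations.
val components = g.stronglyConnectedComponents.maxIter(10).run()

// The result keeps the vertex columns and adds a "component" column.
components.select("id", "component").show()

As noted earlier in the thread, a maxIter below the graph diameter can produce an incorrect result, while iterations beyond that only add cost.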