Hello everyone,

I have a problem when setting the number of partitions in GraphX with
the connectedComponents function. When I launch the application with the
default number of partitions, everything runs smoothly. However, when I
increase the number of partitions, to 150 for example (it happens with
bigger values as well), the job gets stuck on the last task of stage 5.

Screenshot of the hung stage:
<http://apache-spark-user-list.1001560.n3.nabble.com/file/n27629/Screen_Shot_2016-08-31_at_13.png>

and I get the following error:


[Stage 5:=====================================================================> (190 + 10) / 200]241.445: [GC [PSYoungGen: 118560K->800K(233472K)] 418401K->301406K(932864K), 0.0029430 secs] [Times: user=0.02 sys=0.00, real=0.01 secs]
[Stage 5:=========================================================================>(199 + 1) / 200]16/08/31 11:09:23 ERROR spark.ContextCleaner: Error cleaning broadcast 4
org.apache.spark.rpc.RpcTimeoutException: Cannot receive any reply in 120 seconds. This timeout is controlled by spark.rpc.askTimeout
        at org.apache.spark.rpc.RpcTimeout.org$apache$spark$rpc$RpcTimeout$$createRpcTimeoutException(RpcTimeout.scala:48)
        at org.apache.spark.rpc.RpcTimeout$$anonfun$addMessageIfTimeout$1.applyOrElse(RpcTimeout.scala:63)
        at org.apache.spark.rpc.RpcTimeout$$anonfun$addMessageIfTimeout$1.applyOrElse(RpcTimeout.scala:59)
        at scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:33)
        at scala.util.Failure$$anonfun$recover$1.apply(Try.scala:185)
        at scala.util.Try$.apply(Try.scala:161)
        at scala.util.Failure.recover(Try.scala:185)
        at scala.concurrent.Future$$anonfun$recover$1.apply(Future.scala:324)
        at scala.concurrent.Future$$anonfun$recover$1.apply(Future.scala:324)
        at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:32)
        at org.spark-project.guava.util.concurrent.MoreExecutors$SameThreadExecutorService.execute(MoreExecutors.java:293)
        at scala.concurrent.impl.ExecutionContextImpl$$anon$1.execute(ExecutionContextImpl.scala:133)
        at scala.concurrent.impl.CallbackRunnable.executeWithValue(Promise.scala:40)
        at scala.concurrent.impl.Promise$DefaultPromise.tryComplete(Promise.scala:248)
        at scala.concurrent.Promise$class.complete(Promise.scala:55)
        at scala.concurrent.impl.Promise$DefaultPromise.complete(Promise.scala:153)
        at scala.concurrent.Future$$anonfun$map$1.apply(Future.scala:235)
        at scala.concurrent.Future$$anonfun$map$1.apply(Future.scala:235)
        at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:32)
        at scala.concurrent.Future$InternalCallbackExecutor$Batch$$anonfun$run$1.processBatch$1(Future.scala:643)
        at scala.concurrent.Future$InternalCallbackExecutor$Batch$$anonfun$run$1.apply$mcV$sp(Future.scala:658)
        at scala.concurrent.Future$InternalCallbackExecutor$Batch$$anonfun$run$1.apply(Future.scala:635)
        at scala.concurrent.Future$InternalCallbackExecutor$Batch$$anonfun$run$1.apply(Future.scala:635)
        at scala.concurrent.BlockContext$.withBlockContext(BlockContext.scala:72)
        at scala.concurrent.Future$InternalCallbackExecutor$Batch.run(Future.scala:634)
        at scala.concurrent.Future$InternalCallbackExecutor$.scala$concurrent$Future$InternalCallbackExecutor$$unbatchedExecute(Future.scala:694)
        at scala.concurrent.Future$InternalCallbackExecutor$.execute(Future.scala:685)
        at scala.concurrent.impl.CallbackRunnable.executeWithValue(Promise.scala:40)
        at scala.concurrent.impl.Promise$DefaultPromise.tryComplete(Promise.scala:248)
        at scala.concurrent.Promise$class.tryFailure(Promise.scala:112)
        at scala.concurrent.impl.Promise$DefaultPromise.tryFailure(Promise.scala:153)
        at org.apache.spark.rpc.netty.NettyRpcEnv$$anon$1.run(NettyRpcEnv.scala:241)
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471)
        at java.util.concurrent.FutureTask.run(FutureTask.java:262)
        at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:178)
        at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:292)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
        at java.lang.Thread.run(Thread.java:745)
Caused by: java.util.concurrent.TimeoutException: Cannot receive any reply in 120 seconds
        at org.apache.spark.rpc.netty.NettyRpcEnv$$anon$1.run(NettyRpcEnv.scala:242)
        ... 7 more
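
I assume the timeout named in the error could simply be raised, though I suspect that would only hide whatever makes the task hang. A rough sketch of what I mean (600s is an arbitrary example value, and spark.network.timeout is my assumption about the broader fallback setting):

import org.apache.spark.{SparkConf, SparkContext}

// Example only: raise the RPC ask timeout past the 120s default from the error above
val conf = new SparkConf()
  .set("spark.rpc.askTimeout", "600s")
  .set("spark.network.timeout", "600s") // assumption: general fallback for network timeouts
val sc = new SparkContext(conf)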

I set the number of partitions when loading the graph:

import org.apache.spark.graphx.GraphLoader
import org.apache.spark.storage.StorageLevel

val graph = GraphLoader.edgeListFile(sc, input, true, minEdge, // minEdge = numEdgePartitions (150 here)
  StorageLevel.MEMORY_AND_DISK, StorageLevel.MEMORY_AND_DISK)
val res = graph.connectedComponents().vertices
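
As far as I understand, an equivalent way to control edge partitioning is to repartition after loading; a sketch of that alternative (EdgePartition2D is just an example strategy, not necessarily the right choice):

import org.apache.spark.graphx.PartitionStrategy

// Example only: explicitly repartition the edges after loading
val repartitioned = graph.partitionBy(PartitionStrategy.EdgePartition2D, 150)
val res2 = repartitioned.connectedComponents().vertices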

I'm using Spark 1.6, and the input file is 5 GB, generated with GraphX's
built-in graph generation utility.
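
For context, the generation was roughly along these lines (logNormalGraph, the vertex count, and the output path are placeholders, not the exact generator and parameters I used):

import org.apache.spark.graphx.util.GraphGenerators

// Example only: generate a synthetic graph and dump it as an edge list
val g = GraphGenerators.logNormalGraph(sc, numVertices = 1000000)
g.edges.map(e => s"${e.srcId}\t${e.dstId}").saveAsTextFile("hdfs:///tmp/edges")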

Thanks in advance.


