Hi, I get the same problem with both the CanonicalRandomVertexCut and 
RandomVertexCut strategies, with the graph code as follows:

val graph = Graph.fromEdgeTuples(indexedEdges, 0, None, 
StorageLevel.MEMORY_AND_DISK_SER, StorageLevel.MEMORY_AND_DISK_SER);
graph.partitionBy(PartitionStrategy.RandomVertexCut);
graph.connectedComponents().vertices
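
For completeness: Graph.partitionBy returns a new Graph rather than repartitioning the existing one in place, so a chained version of the same pipeline would look roughly like the sketch below (same input and storage levels as above; only the result of partitionBy is used further).

import org.apache.spark.graphx.{Graph, PartitionStrategy}
import org.apache.spark.storage.StorageLevel

// Sketch only: partitionBy returns a new, repartitioned Graph, so
// connectedComponents is called on its result, not on the original graph.
val graph = Graph.fromEdgeTuples(indexedEdges, 0, None,
  StorageLevel.MEMORY_AND_DISK_SER, StorageLevel.MEMORY_AND_DISK_SER)
val partitioned = graph.partitionBy(PartitionStrategy.RandomVertexCut)
partitioned.connectedComponents().vertices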


From: Robin East [mailto:robin.e...@xense.co.uk]
Sent: 5 October 2015 19:07
To: William Saar <william.s...@king.com>; user@spark.apache.org
Subject: Re: Graphx hangs and crashes on EdgeRDD creation

Have you tried using Graph.partitionBy? e.g. using 
PartitionStrategy.RandomVertexCut?
-------------------------------------------------------------------------------
Robin East
Spark GraphX in Action, by Michael Malak and Robin East
Manning Publications Co.
http://www.manning.com/books/spark-graphx-in-action




On 5 Oct 2015, at 09:14, William Saar <william.s...@king.com> wrote:

Hi,
I am trying to run a GraphX job on 20 million edges with Spark 1.5.1, but the 
job seems to hang for 30 minutes on a single executor while creating the graph, 
and it eventually crashes with “IllegalArgumentException: Size exceeds 
Integer.MAX_VALUE”.

I suspect this is because of a partitioning problem, but how can I control the 
partitioning when the EdgeRDD is created?

My graph code only does the following:
val graph = Graph.fromEdgeTuples(indexedEdges, 0, None, 
StorageLevel.MEMORY_AND_DISK_SER, StorageLevel.MEMORY_AND_DISK_SER);
graph.connectedComponents().vertices
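
One option would presumably be to repartition indexedEdges before building the graph, since fromEdgeTuples appears to build one edge partition per partition of the input RDD, but I am not sure this is the intended way. A minimal sketch, assuming indexedEdges is an RDD[(VertexId, VertexId)] and numPartitions is just a placeholder value to tune per cluster:

import org.apache.spark.graphx.Graph
import org.apache.spark.storage.StorageLevel

// Sketch only: the EdgeRDD keeps the partition count of the input RDD,
// so repartitioning indexedEdges first spreads the edges over more,
// smaller tasks.
val numPartitions = 400  // hypothetical value, not from the original job
val repartitionedEdges = indexedEdges.repartition(numPartitions)
val graph = Graph.fromEdgeTuples(repartitionedEdges, 0, None,
  StorageLevel.MEMORY_AND_DISK_SER, StorageLevel.MEMORY_AND_DISK_SER)
graph.connectedComponents().vertices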

The web UI shows the following while the job is hanging (I am running this 
inside a transform operation in Spark Streaming):
transform at MyJob.scala:62
RDD: EdgeRDD

org.apache.spark.streaming.dstream.DStream.transform(DStream.scala:649)
com.example.MyJob$.main(MyJob.scala:62)
com.example.MyJob.main(MyJob.scala)
sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)

The executor thread dump while the job is hanging is the following:
Thread 66: Executor task launch worker-1 (RUNNABLE)
java.lang.System.identityHashCode(Native Method)
com.esotericsoftware.kryo.util.IdentityObjectIntMap.get(IdentityObjectIntMap.java:241)
com.esotericsoftware.kryo.util.MapReferenceResolver.getWrittenId(MapReferenceResolver.java:28)
com.esotericsoftware.kryo.Kryo.writeReferenceOrNull(Kryo.java:588)
com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:566)
org.apache.spark.serializer.KryoSerializationStream.writeObject(KryoSerializer.scala:158)
org.apache.spark.serializer.SerializationStream.writeAll(Serializer.scala:153)
org.apache.spark.storage.BlockManager.dataSerializeStream(BlockManager.scala:1190)
org.apache.spark.storage.DiskStore$$anonfun$putIterator$1.apply$mcV$sp(DiskStore.scala:81)
org.apache.spark.storage.DiskStore$$anonfun$putIterator$1.apply(DiskStore.scala:81)
org.apache.spark.storage.DiskStore$$anonfun$putIterator$1.apply(DiskStore.scala:81)
org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1206)
org.apache.spark.storage.DiskStore.putIterator(DiskStore.scala:82)
org.apache.spark.storage.BlockManager.doPut(BlockManager.scala:791)
org.apache.spark.storage.BlockManager.putIterator(BlockManager.scala:638)
org.apache.spark.CacheManager.putInBlockManager(CacheManager.scala:153)
org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:78)
org.apache.spark.rdd.RDD.iterator(RDD.scala:262)
org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:297)
org.apache.spark.rdd.RDD.iterator(RDD.scala:264)
org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:297)
org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:69)
org.apache.spark.rdd.RDD.iterator(RDD.scala:262)
org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:297)
org.apache.spark.rdd.RDD.iterator(RDD.scala:264)
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:73)
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
org.apache.spark.scheduler.Task.run(Task.scala:88)
org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
java.lang.Thread.run(Thread.java:745)

The failure stack trace is as follows:
15/10/02 17:09:54 ERROR JobScheduler: Error generating jobs for time 1443796200000 ms
org.apache.spark.SparkException: Job aborted due to stage failure: Task 39 in stage 10.0 failed 4 times, most recent failure: Lost task 39.3 in stage 10.0 (TID 168, 172.26.88.66): java.lang.RuntimeException: java.lang.IllegalArgumentException: Size exceeds Integer.MAX_VALUE
        at sun.nio.ch.FileChannelImpl.map(FileChannelImpl.java:836)
        at org.apache.spark.storage.DiskStore$$anonfun$getBytes$2.apply(DiskStore.scala:125)
        at org.apache.spark.storage.DiskStore$$anonfun$getBytes$2.apply(DiskStore.scala:113)
        at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1206)
        at org.apache.spark.storage.DiskStore.getBytes(DiskStore.scala:127)
        at org.apache.spark.storage.DiskStore.getBytes(DiskStore.scala:134)
        at org.apache.spark.storage.BlockManager.doGetLocal(BlockManager.scala:512)
        at org.apache.spark.storage.BlockManager.getBlockData(BlockManager.scala:302)
        at org.apache.spark.network.netty.NettyBlockRpcServer$$anonfun$2.apply(NettyBlockRpcServer.scala:57)
        at org.apache.spark.network.netty.NettyBlockRpcServer$$anonfun$2.apply(NettyBlockRpcServer.scala:57)
        at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
        at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
        at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
        at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108)
        at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
        at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:108)
        at org.apache.spark.network.netty.NettyBlockRpcServer.receive(NettyBlockRpcServer.scala:57)
        at org.apache.spark.network.server.TransportRequestHandler.processRpcRequest(TransportRequestHandler.java:114)
        at org.apache.spark.network.server.TransportRequestHandler.handle(TransportRequestHandler.java:87)
        at org.apache.spark.network.server.TransportChannelHandler.channelRead0(TransportChannelHandler.java:101)
        at org.apache.spark.network.server.TransportChannelHandler.channelRead0(TransportChannelHandler.java:51)
        at io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:105)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:308)
        at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:294)
        at io.netty.handler.timeout.IdleStateHandler.channelRead(IdleStateHandler.java:266)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:308)
        at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:294)
        at io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:103)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:308)
        at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:294)
        at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:244)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:308)
        at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:294)
        at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:846)
        at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:131)
        at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:511)
        at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:468)
        at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:382)
        at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:354)
        at io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:111)
        at java.lang.Thread.run(Thread.java:745)

        at org.apache.spark.network.client.TransportResponseHandler.handle(TransportResponseHandler.java:162)
        at org.apache.spark.network.server.TransportChannelHandler.channelRead0(TransportChannelHandler.java:103)
        at org.apache.spark.network.server.TransportChannelHandler.channelRead0(TransportChannelHandler.java:51)
        at io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:105)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:308)
        at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:294)
        at io.netty.handler.timeout.IdleStateHandler.channelRead(IdleStateHandler.java:266)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:308)
        at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:294)
        at io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:103)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:308)
        at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:294)
        at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:244)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:308)
        at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:294)
        at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:846)
        at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:131)
        at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:511)
        at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:468)
        at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:382)
        at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:354)
        at io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:111)
        at java.lang.Thread.run(Thread.java:745)

Driver stacktrace:
        at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1283)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1271)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1270)
        at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
        at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
        at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1270)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:697)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:697)
        at scala.Option.foreach(Option.scala:236)
        at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:697)
        at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1496)
        at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1458)
        at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1447)
        at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
        at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:567)
        at org.apache.spark.SparkContext.runJob(SparkContext.scala:1822)
        at org.apache.spark.SparkContext.runJob(SparkContext.scala:1942)
        at org.apache.spark.rdd.RDD$$anonfun$reduce$1.apply(RDD.scala:1003)
        at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:147)
        at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:108)
        at org.apache.spark.rdd.RDD.withScope(RDD.scala:306)
        at org.apache.spark.rdd.RDD.reduce(RDD.scala:985)
        at org.apache.spark.graphx.impl.VertexRDDImpl.count(VertexRDDImpl.scala:90)
        at org.apache.spark.graphx.Pregel$.apply(Pregel.scala:125)
        at org.apache.spark.graphx.lib.ConnectedComponents$.run(ConnectedComponents.scala:50)
        at org.apache.spark.graphx.GraphOps.connectedComponents(GraphOps.scala:417)
