Hi, I get the same problem with both CanonicalVertexCut and RandomVertexCut, with the graph code as follows:

    val graph = Graph.fromEdgeTuples(indexedEdges, 0, None,
      StorageLevel.MEMORY_AND_DISK_SER, StorageLevel.MEMORY_AND_DISK_SER)
    graph.partitionBy(PartitionStrategy.RandomVertexCut)
    graph.connectedComponents().vertices
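Worth noting here: GraphX graphs are immutable, so partitionBy returns a new, repartitioned graph rather than modifying the receiver; as written above, connectedComponents still runs on the original, unpartitioned graph. A minimal sketch of keeping the returned graph, using the same inputs as above (variable names are illustrative):

    // partitionBy returns a new Graph; keep the reference and run the
    // algorithm on the repartitioned graph rather than the original
    val partitioned = graph.partitionBy(PartitionStrategy.RandomVertexCut)
    val components = partitioned.connectedComponents().vertices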
From: Robin East [mailto:robin.e...@xense.co.uk]
Sent: 5 October 2015 19:07
To: William Saar <william.s...@king.com>; user@spark.apache.org
Subject: Re: Graphx hangs and crashes on EdgeRDD creation

Have you tried using Graph.partitionBy, e.g. with PartitionStrategy.RandomVertexCut?

-------------------------------------------------------------------------------
Robin East
Spark GraphX in Action
Michael Malak and Robin East
Manning Publications Co.
http://www.manning.com/books/spark-graphx-in-action

On 5 Oct 2015, at 09:14, William Saar <william.s...@king.com> wrote:

Hi,
I am trying to run a GraphX job on 20 million edges with Spark 1.5.1, but the job seems to hang for 30 minutes on a single executor when creating the graph, and eventually crashes with "IllegalArgumentException: Size exceeds Integer.MAX_VALUE".

I suspect this is because of a partitioning problem, but how can I control the partitioning of the creation of the EdgeRDD? My graph code only does the following:

    val graph = Graph.fromEdgeTuples(indexedEdges, 0, None,
      StorageLevel.MEMORY_AND_DISK_SER, StorageLevel.MEMORY_AND_DISK_SER)
    graph.connectedComponents().vertices
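As far as I can tell, Graph.fromEdgeTuples creates one edge partition per partition of the input RDD, so the partitioning of the EdgeRDD can be controlled by repartitioning the tuples before building the graph. A minimal sketch, assuming indexedEdges is an RDD[(VertexId, VertexId)] and with an illustrative partition count:

    import org.apache.spark.graphx.Graph
    import org.apache.spark.storage.StorageLevel

    // The EdgeRDD inherits the partitioning of the input tuples,
    // so repartition the raw edges before calling fromEdgeTuples.
    val numPartitions = 400 // illustrative; tune to the data volume
    val graph = Graph.fromEdgeTuples(
      indexedEdges.repartition(numPartitions), 0, None,
      StorageLevel.MEMORY_AND_DISK_SER, StorageLevel.MEMORY_AND_DISK_SER)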
The web UI shows the following while the job is hanging (I am running this inside a transform operation on Spark Streaming):

transform at MyJob.scala:62
RDD: EdgeRDD, EdgeRDD
org.apache.spark.streaming.dstream.DStream.transform(DStream.scala:649)
com.example.MyJob$.main(MyJob.scala:62)
com.example.MyJob.main(MyJob.scala)
sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)

The executor thread dump while the job is hanging is the following:

Thread 66: Executor task launch worker-1 (RUNNABLE)
java.lang.System.identityHashCode(Native Method)
com.esotericsoftware.kryo.util.IdentityObjectIntMap.get(IdentityObjectIntMap.java:241)
com.esotericsoftware.kryo.util.MapReferenceResolver.getWrittenId(MapReferenceResolver.java:28)
com.esotericsoftware.kryo.Kryo.writeReferenceOrNull(Kryo.java:588)
com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:566)
org.apache.spark.serializer.KryoSerializationStream.writeObject(KryoSerializer.scala:158)
org.apache.spark.serializer.SerializationStream.writeAll(Serializer.scala:153)
org.apache.spark.storage.BlockManager.dataSerializeStream(BlockManager.scala:1190)
org.apache.spark.storage.DiskStore$$anonfun$putIterator$1.apply$mcV$sp(DiskStore.scala:81)
org.apache.spark.storage.DiskStore$$anonfun$putIterator$1.apply(DiskStore.scala:81)
org.apache.spark.storage.DiskStore$$anonfun$putIterator$1.apply(DiskStore.scala:81)
org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1206)
org.apache.spark.storage.DiskStore.putIterator(DiskStore.scala:82)
org.apache.spark.storage.BlockManager.doPut(BlockManager.scala:791)
org.apache.spark.storage.BlockManager.putIterator(BlockManager.scala:638)
org.apache.spark.CacheManager.putInBlockManager(CacheManager.scala:153)
org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:78)
org.apache.spark.rdd.RDD.iterator(RDD.scala:262)
org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:297)
org.apache.spark.rdd.RDD.iterator(RDD.scala:264)
org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:297)
org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:69)
org.apache.spark.rdd.RDD.iterator(RDD.scala:262)
org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:297)
org.apache.spark.rdd.RDD.iterator(RDD.scala:264)
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:73)
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
org.apache.spark.scheduler.Task.run(Task.scala:88)
org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
java.lang.Thread.run(Thread.java:745)

The failure stack trace is as follows:

15/10/02 17:09:54 ERROR JobScheduler: Error generating jobs for time 1443796200000 ms
org.apache.spark.SparkException: Job aborted due to stage failure: Task 39 in stage 10.0 failed 4 times, most recent failure: Lost task 39.3 in stage 10.0 (TID 168, 172.26.88.66): java.lang.RuntimeException: java.lang.IllegalArgumentException: Size exceeds Integer.MAX_VALUE
at sun.nio.ch.FileChannelImpl.map(FileChannelImpl.java:836)
at org.apache.spark.storage.DiskStore$$anonfun$getBytes$2.apply(DiskStore.scala:125)
at org.apache.spark.storage.DiskStore$$anonfun$getBytes$2.apply(DiskStore.scala:113)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1206)
at org.apache.spark.storage.DiskStore.getBytes(DiskStore.scala:127)
at org.apache.spark.storage.DiskStore.getBytes(DiskStore.scala:134)
at org.apache.spark.storage.BlockManager.doGetLocal(BlockManager.scala:512)
at org.apache.spark.storage.BlockManager.getBlockData(BlockManager.scala:302)
at org.apache.spark.network.netty.NettyBlockRpcServer$$anonfun$2.apply(NettyBlockRpcServer.scala:57)
at org.apache.spark.network.netty.NettyBlockRpcServer$$anonfun$2.apply(NettyBlockRpcServer.scala:57)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:108)
at org.apache.spark.network.netty.NettyBlockRpcServer.receive(NettyBlockRpcServer.scala:57)
at org.apache.spark.network.server.TransportRequestHandler.processRpcRequest(TransportRequestHandler.java:114)
at org.apache.spark.network.server.TransportRequestHandler.handle(TransportRequestHandler.java:87)
at org.apache.spark.network.server.TransportChannelHandler.channelRead0(TransportChannelHandler.java:101)
at org.apache.spark.network.server.TransportChannelHandler.channelRead0(TransportChannelHandler.java:51)
at io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:105)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:308)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:294)
at io.netty.handler.timeout.IdleStateHandler.channelRead(IdleStateHandler.java:266)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:308)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:294)
at io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:103)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:308)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:294)
at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:244)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:308)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:294)
at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:846)
at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:131)
at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:511)
at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:468)
at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:382)
at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:354)
at io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:111)
at java.lang.Thread.run(Thread.java:745)
at org.apache.spark.network.client.TransportResponseHandler.handle(TransportResponseHandler.java:162)
at org.apache.spark.network.server.TransportChannelHandler.channelRead0(TransportChannelHandler.java:103)
at org.apache.spark.network.server.TransportChannelHandler.channelRead0(TransportChannelHandler.java:51)
at io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:105)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:308)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:294)
at io.netty.handler.timeout.IdleStateHandler.channelRead(IdleStateHandler.java:266)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:308)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:294)
at io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:103)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:308)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:294)
at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:244)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:308)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:294)
at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:846)
at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:131)
at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:511)
at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:468)
at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:382)
at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:354)
at io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:111)
at java.lang.Thread.run(Thread.java:745)

Driver stacktrace:
at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1283)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1271)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1270)
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1270)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:697)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:697)
at scala.Option.foreach(Option.scala:236)
at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:697)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1496)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1458)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1447)
at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:567)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:1822)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:1942)
at org.apache.spark.rdd.RDD$$anonfun$reduce$1.apply(RDD.scala:1003)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:147)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:108)
at org.apache.spark.rdd.RDD.withScope(RDD.scala:306)
at org.apache.spark.rdd.RDD.reduce(RDD.scala:985)
at org.apache.spark.graphx.impl.VertexRDDImpl.count(VertexRDDImpl.scala:90)
at org.apache.spark.graphx.Pregel$.apply(Pregel.scala:125)
at org.apache.spark.graphx.lib.ConnectedComponents$.run(ConnectedComponents.scala:50)
at org.apache.spark.graphx.GraphOps.connectedComponents(GraphOps.scala:417)
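For what it's worth, the "Size exceeds Integer.MAX_VALUE" comes from FileChannelImpl.map, which cannot map a region larger than 2 GB, so the job fails once a single block spilled to disk exceeds that limit; the long hang is consistent with the thread dump above, where one executor is Kryo-serializing one huge edge partition to disk. The usual workaround is to use enough partitions that no single serialized block approaches 2 GB. A combined sketch of the two changes suggested above (partition count is illustrative, not a verified fix):

    import org.apache.spark.graphx.{Graph, PartitionStrategy}
    import org.apache.spark.storage.StorageLevel

    val numPartitions = 400 // illustrative; keep each serialized partition far below 2 GB
    val graph = Graph.fromEdgeTuples(
      indexedEdges.repartition(numPartitions), 0, None,
      StorageLevel.MEMORY_AND_DISK_SER, StorageLevel.MEMORY_AND_DISK_SER)
    // partitionBy returns a new graph; chain it so connectedComponents
    // runs on the repartitioned edges
    val components = graph
      .partitionBy(PartitionStrategy.RandomVertexCut, numPartitions)
      .connectedComponents()
      .vertices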