Re: [Spark] java.lang.IllegalArgumentException: Size exceeds Integer.MAX_VALUE
Thanks Deng. Yes, I agree that a partition larger than 2 GB caused this exception. In my case, however, it was not enough to simply increase the number of partitions in the sortBy operation: the partitioning that sortBy produces is not balanced. My dataset contains many elements with the same sort key, so they are all kept in one common partition, which grows very large. Instead, I called the repartition method after sorting to rebalance the partitions, and it works well.

On 30 October 2015 at 03:13, Deng Ching-Mallete <och...@apache.org> wrote:
> Hi Yifan,
>
> This is a known issue, please refer to
> https://issues.apache.org/jira/browse/SPARK-6235 for more details. In
> your case, it looks like you are caching to disk a partition > 2G. A
> workaround would be to increase the number of your RDD partitions in order
> to make them smaller in size.
>
> HTH,
> Deng
>
> On Thu, Oct 29, 2015 at 8:40 PM, Yifan LI <iamyifa...@gmail.com> wrote:
>
>> I have a guess that before scanning that RDD, I sorted it and set
>> partitioning, so the result is not balanced:
>>
>> sortBy[S](f: Function
>> <http://spark.apache.org/docs/latest/api/scala/org/apache/spark/api/java/function/Function.html>
>> [T, S], ascending: Boolean, *numPartitions*: Int)
>>
>> I will try to repartition it to see if it helps.
>> Best,
>> Yifan LI
>>
>> On 29 Oct 2015, at 12:52, Yifan LI <iamyifa...@gmail.com> wrote:
>>
>> Hey,
>>
>> I was just trying to scan a large RDD sortedRdd, ~1billion elements,
>> using toLocalIterator api, but an exception returned as it was almost
>> finished:
>>
>> java.lang.RuntimeException: java.lang.IllegalArgumentException: Size exceeds Integer.MAX_VALUE
>> at sun.nio.ch.FileChannelImpl.map(FileChannelImpl.java:821)
>> at org.apache.spark.storage.DiskStore$$anonfun$getBytes$2.apply(DiskStore.scala:125)
>> at org.apache.spark.storage.DiskStore$$anonfun$getBytes$2.apply(DiskStore.scala:113)
>> at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1285)
>> at org.apache.spark.storage.DiskStore.getBytes(DiskStore.scala:127)
>> at org.apache.spark.storage.DiskStore.getBytes(DiskStore.scala:134)
>> at org.apache.spark.storage.BlockManager.doGetLocal(BlockManager.scala:511)
>> at org.apache.spark.storage.BlockManager.getBlockData(BlockManager.scala:302)
>> at org.apache.spark.network.netty.NettyBlockRpcServer$$anonfun$2.apply(NettyBlockRpcServer.scala:57)
>> at org.apache.spark.network.netty.NettyBlockRpcServer$$anonfun$2.apply(NettyBlockRpcServer.scala:57)
>> at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
>> at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
>> at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
>> at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108)
>> at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
>> at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:108)
>> at org.apache.spark.network.netty.NettyBlockRpcServer.receive(NettyBlockRpcServer.scala:57)
>> at org.apache.spark.network.server.TransportRequestHandler.processRpcRequest(TransportRequestHandler.java:114)
>> at org.apache.spark.network.server.TransportRequestHandler.handle(TransportRequestHandler.java:87)
>> at org.apache.spark.network.server.TransportChannelHandler.channelRead0(TransportChannelHandler.java:101)
>> at org.apache.spark.network.server.TransportChannelHandler.channelRead0(TransportChannelHandler.java:51)
>> at io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:105)
>> at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:333)
>> at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:319)
>> at io.netty.handler.timeout.IdleStateHandler.channelRead(IdleStateHandler.java:254)
>> at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:333)
>> at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:319)
>> at io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:103)
>> at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:333)
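Deng's workaround (more, smaller partitions) can be sanity-checked with a little arithmetic: each cached block is memory-mapped through FileChannel.map, which caps a block at Integer.MAX_VALUE bytes (about 2 GB, the limit behind SPARK-6235). A minimal sketch of the partition-count math, using a made-up 300 GB dataset size (the class name and all sizes here are illustrative assumptions, not from the thread):

```java
public class PartitionSizing {
    public static void main(String[] args) {
        long totalBytes = 300L << 30;        // assumption: ~300 GB of cached RDD data
        long maxBlock = Integer.MAX_VALUE;   // hard per-block limit (~2 GB)
        long targetBlock = 512L << 20;       // safer target: ~512 MB per partition

        // ceil(total / blockSize) gives the minimum number of partitions
        long bareMinimum = (totalBytes + maxBlock - 1) / maxBlock;
        long comfortable = (totalBytes + targetBlock - 1) / targetBlock;

        System.out.println("bare minimum partitions: " + bareMinimum);  // 151
        System.out.println("comfortable partitions:  " + comfortable);  // 600
    }
}
```

Note this arithmetic only holds if records are spread evenly: with heavy key skew, as in Yifan's case, one partition can still blow past the limit no matter how many partitions sortBy is asked for, which is why rebalancing after the sort was needed.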
[Spark] java.lang.IllegalArgumentException: Size exceeds Integer.MAX_VALUE
(IdleStateHandler.java:254) at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:333) at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:319) at io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:103) at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:333) at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:319) at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:163) at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:333) at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:319) at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:787) at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:130) at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:511) at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:468) at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:382) at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:354) at io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:116) at java.lang.Thread.run(Thread.java:745) Driver stacktrace: at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1273) at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1264) at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1263) at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47) at 
org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1263) at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:730) at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:730) at scala.Option.foreach(Option.scala:236) at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:730) at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1457) at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1418) at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48) Do you have any idea? I have set partitioning quite big, like 4 Best, Yifan LI
Re: [Spark] java.lang.IllegalArgumentException: Size exceeds Integer.MAX_VALUE
I have a guess that before scanning that RDD, I sorted it and set partitioning, so the result is not balanced: sortBy[S](f: Function <http://spark.apache.org/docs/latest/api/scala/org/apache/spark/api/java/function/Function.html>[T, S], ascending: Boolean, numPartitions: Int) I will try to repartition it to see if it helps. Best, Yifan LI > On 29 Oct 2015, at 12:52, Yifan LI <iamyifa...@gmail.com> wrote: > > Hey, > > I was just trying to scan a large RDD sortedRdd, ~1billion elements, using > toLocalIterator api, but an exception returned as it was almost finished: > > java.lang.RuntimeException: java.lang.IllegalArgumentException: Size exceeds > Integer.MAX_VALUE > at sun.nio.ch.FileChannelImpl.map(FileChannelImpl.java:821) > at > org.apache.spark.storage.DiskStore$$anonfun$getBytes$2.apply(DiskStore.scala:125) > at > org.apache.spark.storage.DiskStore$$anonfun$getBytes$2.apply(DiskStore.scala:113) > at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1285) > at org.apache.spark.storage.DiskStore.getBytes(DiskStore.scala:127) > at org.apache.spark.storage.DiskStore.getBytes(DiskStore.scala:134) > at > org.apache.spark.storage.BlockManager.doGetLocal(BlockManager.scala:511) > at > org.apache.spark.storage.BlockManager.getBlockData(BlockManager.scala:302) > at > org.apache.spark.network.netty.NettyBlockRpcServer$$anonfun$2.apply(NettyBlockRpcServer.scala:57) > at > org.apache.spark.network.netty.NettyBlockRpcServer$$anonfun$2.apply(NettyBlockRpcServer.scala:57) > at > scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244) > at > scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244) > at > scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33) > at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108) > at scala.collection.TraversableLike$class.map(TraversableLike.scala:244) > at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:108) > at > 
org.apache.spark.network.netty.NettyBlockRpcServer.receive(NettyBlockRpcServer.scala:57) > at > org.apache.spark.network.server.TransportRequestHandler.processRpcRequest(TransportRequestHandler.java:114) > at > org.apache.spark.network.server.TransportRequestHandler.handle(TransportRequestHandler.java:87) > at > org.apache.spark.network.server.TransportChannelHandler.channelRead0(TransportChannelHandler.java:101) > at > org.apache.spark.network.server.TransportChannelHandler.channelRead0(TransportChannelHandler.java:51) > at > io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:105) > at > io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:333) > at > io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:319) > at > io.netty.handler.timeout.IdleStateHandler.channelRead(IdleStateHandler.java:254) > at > io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:333) > at > io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:319) > at > io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:103) > at > io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:333) > at > io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:319) > at > io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:163) > at > io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:333) > at > io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:319) > at > io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:787) > at > io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:130) > 
at > io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:511) > at > io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:468) > at > io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:382) > at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:354) > at > io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:116) > at java.lang.T
Re: java.lang.NegativeArraySizeException? as iterating a big RDD
Thanks for your advice, Jem. :) I will increase the partitioning and see if it helps. Best, Yifan LI > On 23 Oct 2015, at 12:48, Jem Tucker <jem.tuc...@gmail.com> wrote: > > Hi Yifan, > > I think this is a result of Kryo trying to seriallize something too large. > Have you tried to increase your partitioning? > > Cheers, > > Jem > > On Fri, Oct 23, 2015 at 11:24 AM Yifan LI <iamyifa...@gmail.com > <mailto:iamyifa...@gmail.com>> wrote: > Hi, > > I have a big sorted RDD sRdd(~962million elements), and need to scan its > elements in order(using sRdd.toLocalIterator). > > But the process failed when the scanning was done after around 893million > elements, returned with following exception: > > Anyone has idea? Thanks! > > > Exception in thread "main" org.apache.spark.SparkException: Job aborted due > to stage failure: Task 0 in stage 421752.0 failed 128 times, most recent > failure: Lost task 0.127 in stage 421752.0 (TID 17304, > small15-tap1.common.lip6.fr <http://small15-tap1.common.lip6.fr/>): > java.lang.NegativeArraySizeException > at > com.esotericsoftware.kryo.util.IdentityObjectIntMap.resize(IdentityObjectIntMap.java:409) > at > com.esotericsoftware.kryo.util.IdentityObjectIntMap.putStash(IdentityObjectIntMap.java:227) > at > com.esotericsoftware.kryo.util.IdentityObjectIntMap.push(IdentityObjectIntMap.java:221) > at > com.esotericsoftware.kryo.util.IdentityObjectIntMap.put(IdentityObjectIntMap.java:117) > at > com.esotericsoftware.kryo.util.IdentityObjectIntMap.putStash(IdentityObjectIntMap.java:228) > at > com.esotericsoftware.kryo.util.IdentityObjectIntMap.push(IdentityObjectIntMap.java:221) > at > com.esotericsoftware.kryo.util.IdentityObjectIntMap.put(IdentityObjectIntMap.java:117) > at > com.esotericsoftware.kryo.util.MapReferenceResolver.addWrittenObject(MapReferenceResolver.java:23) > at com.esotericsoftware.kryo.Kryo.writeReferenceOrNull(Kryo.java:598) > at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:566) > at 
com.twitter.chill.Tuple2Serializer.write(TupleSerializers.scala:36) > at com.twitter.chill.Tuple2Serializer.write(TupleSerializers.scala:33) > at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:568) > at > com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.write(DefaultArraySerializers.java:318) > at > com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.write(DefaultArraySerializers.java:293) > at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:568) > at > org.apache.spark.serializer.KryoSerializerInstance.serialize(KryoSerializer.scala:250) > at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:236) > at > java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) > at > java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) > at java.lang.Thread.run(Thread.java:745) > > Driver stacktrace: > at org.apache.spark.scheduler.DAGScheduler.org > <http://org.apache.spark.scheduler.dagscheduler.org/>$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1273) > at > org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1264) > at > org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1263) > at > scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) > at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47) > at > org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1263) > at > org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:730) > at > org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:730) > at scala.Option.foreach(Option.scala:236) > at > org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:730) > at > 
org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1457) > at > org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1418) > at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48) > > Best, > Yifan LI > > > > >
java.lang.NegativeArraySizeException? as iterating a big RDD
Hi, I have a big sorted RDD sRdd(~962million elements), and need to scan its elements in order(using sRdd.toLocalIterator). But the process failed when the scanning was done after around 893million elements, returned with following exception: Anyone has idea? Thanks! Exception in thread "main" org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 421752.0 failed 128 times, most recent failure: Lost task 0.127 in stage 421752.0 (TID 17304, small15-tap1.common.lip6.fr): java.lang.NegativeArraySizeException at com.esotericsoftware.kryo.util.IdentityObjectIntMap.resize(IdentityObjectIntMap.java:409) at com.esotericsoftware.kryo.util.IdentityObjectIntMap.putStash(IdentityObjectIntMap.java:227) at com.esotericsoftware.kryo.util.IdentityObjectIntMap.push(IdentityObjectIntMap.java:221) at com.esotericsoftware.kryo.util.IdentityObjectIntMap.put(IdentityObjectIntMap.java:117) at com.esotericsoftware.kryo.util.IdentityObjectIntMap.putStash(IdentityObjectIntMap.java:228) at com.esotericsoftware.kryo.util.IdentityObjectIntMap.push(IdentityObjectIntMap.java:221) at com.esotericsoftware.kryo.util.IdentityObjectIntMap.put(IdentityObjectIntMap.java:117) at com.esotericsoftware.kryo.util.MapReferenceResolver.addWrittenObject(MapReferenceResolver.java:23) at com.esotericsoftware.kryo.Kryo.writeReferenceOrNull(Kryo.java:598) at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:566) at com.twitter.chill.Tuple2Serializer.write(TupleSerializers.scala:36) at com.twitter.chill.Tuple2Serializer.write(TupleSerializers.scala:33) at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:568) at com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.write(DefaultArraySerializers.java:318) at com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.write(DefaultArraySerializers.java:293) at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:568) at 
org.apache.spark.serializer.KryoSerializerInstance.serialize(KryoSerializer.scala:250) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:236) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) at java.lang.Thread.run(Thread.java:745) Driver stacktrace: at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1273) at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1264) at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1263) at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47) at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1263) at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:730) at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:730) at scala.Option.foreach(Option.scala:236) at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:730) at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1457) at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1418) at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48) Best, Yifan LI
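Jem's diagnosis matches the arithmetic in the trace: Kryo's IdentityObjectIntMap sizes its backing arrays with int math, so once a serialized partition needs a capacity past 2^31 - 1, the computed size wraps negative and the array allocation throws. A minimal illustration of the overflow mechanism (this is not Kryo's actual resize code, just the int arithmetic it falls victim to):

```java
public class NegativeSize {
    public static void main(String[] args) {
        int capacity = 1 << 30;      // 1073741824, the largest power-of-two int capacity
        int doubled = capacity * 2;  // overflows: wraps to Integer.MIN_VALUE
        System.out.println(doubled); // prints -2147483648

        try {
            int[] resized = new int[doubled];  // what a naive doubling resize would attempt
        } catch (NegativeArraySizeException e) {
            System.out.println("caught NegativeArraySizeException");
        }
    }
}
```

Increasing the partition count shrinks each serialized chunk, keeping the required capacity safely inside the positive int range.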
"dynamically" sort a large collection?
Hey,

I need to scan a large "key-value" collection as below:

1) sort it on an attribute of the “value”
2) scan it one by one, starting from the element with the largest value
2.1) if the current element matches a pre-defined condition, its value is reduced and the element is inserted back into the collection; if not, the current element is removed from the collection.

In my previous program, step 1) was easy to do in Spark (an RDD operation), but I am not sure how to do step 2.1), especially the “insert back” operation on a sorted RDD. I tried to make a new RDD every time an element needed to be inserted, but that is very costly due to the re-sorting…

Does anyone have some ideas? Thanks so much!

** an example:

the sorted result of the initial collection C (on the bold value), sortedC:
(1, (71, “aaa"))
(2, (60, “bbb"))
(3, (53.5, “ccc”))
(4, (48, “ddd”))
(5, (29, “eee"))
…

pre-condition: its_value % 2 == 0
if the pre-condition is matched, the value is reduced by half.

Thus:

#1: 71 is not matched, so this element is removed.
(1, (71, “aaa”)) —> removed!
(2, (60, “bbb"))
(3, (53.5, “ccc”))
(4, (48, “ddd”))
(5, (29, “eee"))
…

#2: 60 is matched! 60/2 = 30, so the collection should now be:
(3, (53.5, “ccc”))
(4, (48, “ddd”))
(2, (30, “bbb”)) <— inserted back here
(5, (29, “eee"))
…

Best,
Yifan LI
Re: "dynamically" sort a large collection?
Hey Adrian, Thanks for your fast reply. :) Actually the “pre-condition” is not fixed in real application, e.g. it would change based on counting of previous unmatched elements. So I need to use iterator operator, rather than flatMap-like operators… Besides, do you have any idea on how to avoid that “sort again”? it is too costly… :( Anyway thank you again! Best, Yifan LI > On 12 Oct 2015, at 12:19, Adrian Tanase <atan...@adobe.com> wrote: > > I think you’re looking for the flatMap (or flatMapValues) operator – you can > do something like > > sortedRdd.flatMapValues( v => > If (v % 2 == 0) { > Some(v / 2) > } else { > None > } > ) > > Then you need to sort again. > > -adrian > > From: Yifan LI > Date: Monday, October 12, 2015 at 1:03 PM > To: spark users > Subject: "dynamically" sort a large collection? > > Hey, > > I need to scan a large "key-value" collection as below: > > 1) sort it on an attribute of “value” > 2) scan it one by one, from element with largest value > 2.1) if the current element matches a pre-defined condition, its value will > be reduced and the element will be inserted back to collection. > if not, this current element should be removed from collection. > > > In my previous program, the 1) step can be easily conducted in Spark(RDD > operation), but I am not sure how to do 2.1) step, esp. the “put/inserted > back” operation on a sorted RDD. > I have tried to make a new RDD at every-time an element was found to > inserted, but it is very costly due to a re-sorting… > > > Is there anyone having some ideas? > > Thanks so much! > > ** > an example: > > the sorted result of initial collection C(on bold value), sortedC: > (1, (71, “aaa")) > (2, (60, “bbb")) > (3, (53.5, “ccc”)) > (4, (48, “ddd”)) > (5, (29, “eee")) > … > > pre-condition: its_value%2 == 0 > if pre-condition is matched, its value will be reduce on half. > > Thus: > > #1: > 71 is not matched, so this element is removed. > (1, (71, “aaa”)) —> removed! 
> (2, (60, “bbb")) > (3, (53.5, “ccc”)) > (4, (48, “ddd”)) > (5, (29, “eee")) > … > > #2: > 60 is matched! 60/2 = 30, the collection right now should be as: > (3, (53.5, “ccc”)) > (4, (48, “ddd”)) > (2, (30, “bbb”)) <— inserted back here > (5, (29, “eee")) > … > > > > > > > Best, > Yifan LI > > > > >
Re: "dynamically" sort a large collection?
Shiwei, yes, you might be right. Thanks. :) Best, Yifan LI > On 12 Oct 2015, at 12:55, 郭士伟 <guoshi...@gmail.com> wrote: > > I think this is not a problem Spark can solve effectively, cause RDD in > immutable. Every time you want to change an RDD, you create a new one, and > sort again. Maybe hbase or some other DB system will be a more suitable > solution. Or, if the data can fit into memory, use a simple heap will work. > > > 2015-10-12 18:29 GMT+08:00 Yifan LI <iamyifa...@gmail.com > <mailto:iamyifa...@gmail.com>>: > Hey Adrian, > > Thanks for your fast reply. :) > > Actually the “pre-condition” is not fixed in real application, e.g. it would > change based on counting of previous unmatched elements. > So I need to use iterator operator, rather than flatMap-like operators… > > Besides, do you have any idea on how to avoid that “sort again”? it is too > costly… :( > > Anyway thank you again! > > Best, > Yifan LI > > > > > >> On 12 Oct 2015, at 12:19, Adrian Tanase <atan...@adobe.com >> <mailto:atan...@adobe.com>> wrote: >> >> I think you’re looking for the flatMap (or flatMapValues) operator – you can >> do something like >> >> sortedRdd.flatMapValues( v => >> If (v % 2 == 0) { >> Some(v / 2) >> } else { >> None >> } >> ) >> >> Then you need to sort again. >> >> -adrian >> >> From: Yifan LI >> Date: Monday, October 12, 2015 at 1:03 PM >> To: spark users >> Subject: "dynamically" sort a large collection? >> >> Hey, >> >> I need to scan a large "key-value" collection as below: >> >> 1) sort it on an attribute of “value” >> 2) scan it one by one, from element with largest value >> 2.1) if the current element matches a pre-defined condition, its value will >> be reduced and the element will be inserted back to collection. >> if not, this current element should be removed from collection. >> >> >> In my previous program, the 1) step can be easily conducted in Spark(RDD >> operation), but I am not sure how to do 2.1) step, esp. 
the “put/inserted >> back” operation on a sorted RDD. >> I have tried to make a new RDD at every-time an element was found to >> inserted, but it is very costly due to a re-sorting… >> >> >> Is there anyone having some ideas? >> >> Thanks so much! >> >> ** >> an example: >> >> the sorted result of initial collection C(on bold value), sortedC: >> (1, (71, “aaa")) >> (2, (60, “bbb")) >> (3, (53.5, “ccc”)) >> (4, (48, “ddd”)) >> (5, (29, “eee")) >> … >> >> pre-condition: its_value%2 == 0 >> if pre-condition is matched, its value will be reduce on half. >> >> Thus: >> >> #1: >> 71 is not matched, so this element is removed. >> (1, (71, “aaa”)) —> removed! >> (2, (60, “bbb")) >> (3, (53.5, “ccc”)) >> (4, (48, “ddd”)) >> (5, (29, “eee")) >> … >> >> #2: >> 60 is matched! 60/2 = 30, the collection right now should be as: >> (3, (53.5, “ccc”)) >> (4, (48, “ddd”)) >> (2, (30, “bbb”)) <— inserted back here >> (5, (29, “eee")) >> … >> >> >> >> >> >> >> Best, >> Yifan LI >> >> >> >> >> > >
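The thread leaves the heap idea abstract; for data that fits in memory on one machine, 郭士伟's suggestion can be sketched as below, using the example from the original post (ids and values only, payloads dropped for brevity; the fixed "halve if even" rule stands in for the real, changing pre-condition):

```java
import java.util.ArrayList;
import java.util.List;
import java.util.PriorityQueue;

public class HeapScan {
    public static void main(String[] args) {
        // (id, value) pairs from the example in the original post
        double[][] init = {{1, 71}, {2, 60}, {3, 53.5}, {4, 48}, {5, 29}};

        // Max-heap on value, so poll() always returns the current largest element
        PriorityQueue<double[]> heap =
            new PriorityQueue<>((a, b) -> Double.compare(b[1], a[1]));
        for (double[] e : init) heap.add(e);

        List<String> removed = new ArrayList<>();
        while (!heap.isEmpty()) {
            double[] e = heap.poll();
            if (e[1] % 2 == 0) {
                heap.add(new double[]{e[0], e[1] / 2});  // matched: halve and reinsert
            } else {
                removed.add((int) e[0] + ":" + e[1]);    // not matched: drop
            }
        }
        System.out.println(removed);  // prints [1:71.0, 3:53.5, 5:29.0, 2:15.0, 4:3.0]
    }
}
```

Reinsertion is O(log n) per element instead of a full re-sort per update, which is exactly what the sorted-RDD approach cannot offer.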
Re: Master dies after program finishes normally
Hi, I just encountered the same problem, when running a PageRank program that has lots of stages (iterations)… The master was lost after my program finished. And the issue remains even after I increased the driver memory. Any idea, e.g. how to increase the master's memory? Thanks. Best, Yifan LI On 12 Feb 2015, at 20:05, Arush Kharbanda ar...@sigmoidanalytics.com wrote: What is your cluster configuration? Did you try looking at the Web UI? There are many tips here: http://spark.apache.org/docs/1.2.0/tuning.html Did you try these? On Fri, Feb 13, 2015 at 12:09 AM, Manas Kar manasdebashis...@gmail.com wrote: Hi, I have a Hidden Markov Model running with 200MB data. Once the program finishes (i.e. all stages/jobs are done) the program hangs for 20 minutes or so before killing the master. In the spark master the following log appears. 2015-02-12 13:00:05,035 ERROR akka.actor.ActorSystemImpl: Uncaught fatal error from thread [sparkMaster-akka.actor.default-dispatcher-31] shutting down ActorSystem [sparkMaster] java.lang.OutOfMemoryError: GC overhead limit exceeded at scala.collection.immutable.List$.newBuilder(List.scala:396) at scala.collection.generic.GenericTraversableTemplate$class.genericBuilder(GenericTraversableTemplate.scala:69) at scala.collection.AbstractTraversable.genericBuilder(Traversable.scala:105) at scala.collection.generic.GenTraversableFactory$GenericCanBuildFrom.apply(GenTraversableFactory.scala:58) at scala.collection.generic.GenTraversableFactory$GenericCanBuildFrom.apply(GenTraversableFactory.scala:53) at scala.collection.TraversableLike$class.builder$1(TraversableLike.scala:239) at scala.collection.TraversableLike$class.map(TraversableLike.scala:243) at scala.collection.AbstractTraversable.map(Traversable.scala:105) at org.json4s.MonadicJValue$$anonfun$org$json4s$MonadicJValue$$findDirectByName$1.apply(MonadicJValue.scala:26) at 
org.json4s.MonadicJValue$$anonfun$org$json4s$MonadicJValue$$findDirectByName$1.apply(MonadicJValue.scala:22) at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251) at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251) at scala.collection.immutable.List.foreach(List.scala:318) at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:251) at scala.collection.AbstractTraversable.flatMap(Traversable.scala:105) at org.json4s.MonadicJValue.org http://org.json4s.monadicjvalue.org/$json4s$MonadicJValue$$findDirectByName(MonadicJValue.scala:22) at org.json4s.MonadicJValue.$bslash(MonadicJValue.scala:16) at org.apache.spark.util.JsonProtocol$.taskStartFromJson(JsonProtocol.scala:450) at org.apache.spark.util.JsonProtocol$.sparkEventFromJson(JsonProtocol.scala:423) at org.apache.spark.scheduler.ReplayListenerBus$$anonfun$replay$2$$anonfun$apply$1.apply(ReplayListenerBus.scala:71) at org.apache.spark.scheduler.ReplayListenerBus$$anonfun$replay$2$$anonfun$apply$1.apply(ReplayListenerBus.scala:69) at scala.collection.Iterator$class.foreach(Iterator.scala:727) at scala.collection.AbstractIterator.foreach(Iterator.scala:1157) at org.apache.spark.scheduler.ReplayListenerBus$$anonfun$replay$2.apply(ReplayListenerBus.scala:69) at org.apache.spark.scheduler.ReplayListenerBus$$anonfun$replay$2.apply(ReplayListenerBus.scala:55) at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33) at scala.collection.mutable.WrappedArray.foreach(WrappedArray.scala:34) at org.apache.spark.scheduler.ReplayListenerBus.replay(ReplayListenerBus.scala:55) at org.apache.spark.deploy.master.Master.rebuildSparkUI(Master.scala:726) at org.apache.spark.deploy.master.Master.removeApplication(Master.scala:675) at org.apache.spark.deploy.master.Master.finishApplication(Master.scala:653) at org.apache.spark.deploy.master.Master$$anonfun$receiveWithLogging$1$$anonfun$applyOrElse$29.apply(Master.scala:399) Can 
anyone help? ..Manas -- Arush Kharbanda || Technical Teamlead ar...@sigmoidanalytics.com || www.sigmoidanalytics.com
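The OutOfMemoryError above happens while the master replays the application's event log to rebuild the web UI (Master.rebuildSparkUI in the trace), so increasing driver memory cannot help: the relevant heap is the standalone Master daemon's. A configuration sketch for a standalone cluster (the 4g value is illustrative, not from the thread):

```shell
# conf/spark-env.sh on the master host: give the standalone daemons a
# bigger heap (SPARK_DAEMON_MEMORY applies to the Master and Workers)
export SPARK_DAEMON_MEMORY=4g

# conf/spark-defaults.conf: if the UI rebuild itself is the problem,
# disabling event logging skips the replay entirely, at the cost of
# losing the post-mortem application UI:
# spark.eventLog.enabled  false
```

Restart the master after changing spark-env.sh for the new heap size to take effect.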
large shuffling = executor lost?
Hi, I am running my GraphX application with Spark 1.3.1 on a small cluster, and it failed with this exception: org.apache.spark.shuffle.MetadataFetchFailedException: Missing an output location for shuffle 1. But I found it was actually caused by an “ExecutorLostFailure”, and someone told me it might be because there was a large shuffle… Does anyone have an idea how to fix it? Thanks in advance! Best, Yifan LI
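MetadataFetchFailedException here reports a symptom rather than the cause: the executor that held the shuffle output died, often because a large shuffle exhausted its memory. Common first steps, sketched as illustrative spark-submit settings for Spark 1.3.x (the values and jar name are assumptions, not advice from the thread):

```shell
# Illustrative values only -- tune for your cluster
spark-submit \
  --conf spark.executor.memory=8g \
  --conf spark.default.parallelism=2000 \
  --conf spark.shuffle.consolidateFiles=true \
  my-graphx-app.jar
```

A higher parallelism shrinks each shuffle block, and consolidating shuffle files reduces the per-executor file count; if executors are still lost, the executor logs (not the driver's) usually name the real failure.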
com.esotericsoftware.kryo.KryoException: java.io.IOException: Stream is corrupted
Hi, I was running our GraphX application (it worked fine on Spark 1.2.0) but it failed on Spark 1.3.1 with the exception below. Does anyone have an idea on this issue? I guess it was caused by using the LZ4 codec? Exception in thread "main" org.apache.spark.SparkException: Job aborted due to stage failure: Task 54 in stage 7.6 failed 128 times, most recent failure: Lost task 54.127 in stage 7.6 (TID 5311, small15-tap1.common.lip6.fr): com.esotericsoftware.kryo.KryoException: java.io.IOException: Stream is corrupted at com.esotericsoftware.kryo.io.Input.fill(Input.java:142) at com.esotericsoftware.kryo.io.Input.require(Input.java:155) at com.esotericsoftware.kryo.io.Input.readInt(Input.java:337) at com.esotericsoftware.kryo.util.DefaultClassResolver.readClass(DefaultClassResolver.java:109) at com.esotericsoftware.kryo.Kryo.readClass(Kryo.java:610) at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:721) at org.apache.spark.serializer.KryoDeserializationStream.readObject(KryoSerializer.scala:138) at org.apache.spark.serializer.DeserializationStream$$anon$1.getNext(Serializer.scala:133) at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:71) at org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:32) at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371) at org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:32) at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:39) at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327) at scala.collection.Iterator$class.foreach(Iterator.scala:727) at scala.collection.AbstractIterator.foreach(Iterator.scala:1157) at org.apache.spark.graphx.impl.ShippableVertexPartition$.apply(ShippableVertexPartition.scala:60) at org.apache.spark.graphx.VertexRDD$$anonfun$2.apply(VertexRDD.scala:300) at org.apache.spark.graphx.VertexRDD$$anonfun$2.apply(VertexRDD.scala:297) at org.apache.spark.rdd.ZippedPartitionsRDD2.compute(ZippedPartitionsRDD.scala:88) at 
org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277) at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:70) at org.apache.spark.rdd.RDD.iterator(RDD.scala:242) at org.apache.spark.rdd.ZippedPartitionsRDD2.compute(ZippedPartitionsRDD.scala:88) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277) at org.apache.spark.rdd.RDD.iterator(RDD.scala:244) at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277) at org.apache.spark.rdd.RDD.iterator(RDD.scala:244) at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:68) at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41) at org.apache.spark.scheduler.Task.run(Task.scala:64) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) at java.lang.Thread.run(Thread.java:745) Caused by: java.io.IOException: Stream is corrupted at net.jpountz.lz4.LZ4BlockInputStream.refill(LZ4BlockInputStream.java:152) at net.jpountz.lz4.LZ4BlockInputStream.read(LZ4BlockInputStream.java:116) at com.esotericsoftware.kryo.io.Input.fill(Input.java:140) ... 
35 more Driver stacktrace: at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1204) at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1193) at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1192) at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47) at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1192) at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:693) at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:693) at scala.Option.foreach(Option.scala:236) at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:693) at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1393) at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1354) at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48) Best, Yifan LI
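Since the LZ4 codec is only a guess here, one low-risk experiment is to switch the compression codec and see whether the "Stream is corrupted" error persists. A hedged spark-defaults.conf sketch (this is a diagnostic workaround, not a confirmed fix; check which codecs your Spark version supports):

```
# spark-defaults.conf (diagnostic sketch): replace LZ4 with another supported
# codec, e.g. snappy or lzf, to test whether the corruption is codec-related
spark.io.compression.codec   snappy
```

If the job then succeeds, that points at the LZ4 path; if it still fails, the corruption likely has another cause (e.g. a flaky disk or network on the shuffle path).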
applications are still in progress?
Hi, I have some applications that have finished (they actually failed earlier), but the WebUI shows "Application myApp is still in progress". Also, in the event-log folder there are several log files like this: app-20150512***.inprogress So, I am wondering what the "inprogress" means… Thanks! :) Best, Yifan LI
No space left on device??
Hi, I am running my graphx application on Spark, but it failed with a "no space left on device" error on one executor node (on which the available disk space is small). I can understand why it happened: my vertex(-attribute) RDD kept growing during the computation, so at some point the space requested on that node was larger than what was available. But is there any way to avoid this kind of error? I am sure the overall disk space across all nodes is enough for my application. Thanks in advance! Best, Yifan LI
Re: No space left on device??
Thanks, Shao. :-) I am wondering whether Spark will rebalance the storage overhead at runtime… since there is still some available space on other nodes. Best, Yifan LI On 06 May 2015, at 14:57, Saisai Shao sai.sai.s...@gmail.com wrote: I think you could configure multiple disks through spark.local.dir; the default is /tmp. Anyway, if your intermediate data is larger than the available disk space, you will still hit this issue. spark.local.dir /tmp — Directory to use for scratch space in Spark, including map output files and RDDs that get stored on disk. This should be on a fast, local disk in your system. It can also be a comma-separated list of multiple directories on different disks. NOTE: In Spark 1.0 and later this will be overridden by SPARK_LOCAL_DIRS (Standalone, Mesos) or LOCAL_DIRS (YARN) environment variables set by the cluster manager. 2015-05-06 20:35 GMT+08:00 Yifan LI iamyifa...@gmail.com: Hi, I am running my graphx application on Spark, but it failed with a "no space left on device" error on one executor node (on which the available disk space is small). I can understand why it happened: my vertex(-attribute) RDD kept growing during the computation, so at some point the space requested on that node was larger than what was available. But is there any way to avoid this kind of error? I am sure the overall disk space across all nodes is enough for my application. Thanks in advance! Best, Yifan LI
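Shao's suggestion can be written as a minimal spark-defaults.conf sketch; the mount points /disk1–/disk3 are placeholders, not paths from the thread:

```
# spark-defaults.conf (sketch; /disk1, /disk2, /disk3 are placeholder mount points)
# Comma-separated list: shuffle files and spilled RDD blocks rotate across these dirs,
# so scratch space is spread over several physical disks.
spark.local.dir   /disk1/spark-tmp,/disk2/spark-tmp,/disk3/spark-tmp
```

Note that, as the quoted docs say, a cluster manager's SPARK_LOCAL_DIRS (Standalone, Mesos) or LOCAL_DIRS (YARN) environment variable will override this property.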
Re: No space left on device??
Yes, you are right. For now I have to say the workload/executor is distributed evenly…so, like you said, it is difficult to improve the situation. However, have you any idea of how to make a *skew* data/executor distribution? Best, Yifan LI On 06 May 2015, at 15:13, Saisai Shao sai.sai.s...@gmail.com wrote: I think it depends on your workload and executor distribution, if your workload is evenly distributed without any big data skew, and executors are evenly distributed on each nodes, the storage usage of each node is nearly the same. Spark itself cannot rebalance the storage overhead as you mentioned. 2015-05-06 21:09 GMT+08:00 Yifan LI iamyifa...@gmail.com mailto:iamyifa...@gmail.com: Thanks, Shao. :-) I am wondering if the spark will rebalance the storage overhead in runtime…since still there is some available space on other nodes. Best, Yifan LI On 06 May 2015, at 14:57, Saisai Shao sai.sai.s...@gmail.com mailto:sai.sai.s...@gmail.com wrote: I think you could configure multiple disks through spark.local.dir, default is /tmp. Anyway if your intermediate data is larger than available disk space, still will meet this issue. spark.local.dir /tmpDirectory to use for scratch space in Spark, including map output files and RDDs that get stored on disk. This should be on a fast, local disk in your system. It can also be a comma-separated list of multiple directories on different disks. NOTE: In Spark 1.0 and later this will be overriden by SPARK_LOCAL_DIRS (Standalone, Mesos) or LOCAL_DIRS (YARN) environment variables set by the cluster manager. 2015-05-06 20:35 GMT+08:00 Yifan LI iamyifa...@gmail.com mailto:iamyifa...@gmail.com: Hi, I am running my graphx application on Spark, but it failed since there is an error on one executor node(on which available hdfs space is small) that “no space left on device”. 
I can understand why it happened, because my vertex(-attribute) rdd was becoming bigger and bigger during computation…, so maybe sometime the request on that node was too bigger than available space. But, is there any way to avoid this kind of error? I am sure that the overall disk space of all nodes is enough for my application. Thanks in advance! Best, Yifan LI
to split an RDD to multiple ones?
Hi, I have an RDD srdd containing (unordered) data like this: s1_0, s3_0, s2_1, s2_2, s3_1, s1_3, s1_2, … What I want is to split it into several RDDs (it would be much better if their elements could be in ascending order): srdd_s1: s1_0, s1_1, s1_2, …, s1_n srdd_s2: s2_0, s2_1, s2_2, …, s2_n srdd_s3: s3_0, s3_1, s3_2, …, s3_n … … Any ideas? Thanks in advance! :) Best, Yifan LI
Re: to split an RDD to multiple ones?
Thanks, Olivier and Franz. :) Best, Yifan LI On 02 May 2015, at 23:23, Olivier Girardot ssab...@gmail.com wrote: I guess: val srdd_s1 = srdd.filter(_.startsWith("s1_")).sortBy(identity) val srdd_s2 = srdd.filter(_.startsWith("s2_")).sortBy(identity) val srdd_s3 = srdd.filter(_.startsWith("s3_")).sortBy(identity) Regards, Olivier. On Sat, 2 May 2015 at 22:53, Yifan LI iamyifa...@gmail.com wrote: Hi, I have an RDD srdd containing (unordered) data like this: s1_0, s3_0, s2_1, s2_2, s3_1, s1_3, s1_2, … What I want is (it will be much better if they could be in ascending order): srdd_s1: s1_0, s1_1, s1_2, …, s1_n srdd_s2: s2_0, s2_1, s2_2, …, s2_n srdd_s3: s3_0, s3_1, s3_2, …, s3_n … … Have any idea? Thanks in advance! :) Best, Yifan LI
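A slightly more general sketch of the filter-per-prefix idea, under stated assumptions: the prefixes are known up front, srdd is an existing RDD[String], and the helper name splitByPrefix is hypothetical. Each resulting RDD re-scans srdd when evaluated, so caching srdd first is worthwhile when there are many prefixes:

```scala
import org.apache.spark.rdd.RDD

// Hypothetical helper: one filtered, sorted RDD per known prefix.
// Each result triggers a separate pass over srdd, hence the cache() hint below.
def splitByPrefix(srdd: RDD[String], prefixes: Seq[String]): Map[String, RDD[String]] = {
  srdd.cache()  // avoid recomputing the source once per prefix
  prefixes.map { p =>
    p -> srdd.filter(_.startsWith(p + "_")).sortBy(identity)
  }.toMap
}

val splits = splitByPrefix(srdd, Seq("s1", "s2", "s3"))
// splits("s1") then holds the s1_* elements in ascending order
```

One caveat: sortBy(identity) on strings sorts lexicographically, so "s1_10" comes before "s1_2"; if numeric order is needed, sort by the parsed suffix instead, e.g. sortBy(_.split("_")(1).toInt).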
How to avoid the repartitioning in graph construction
Hi, I now have 10 edge data files in my HDFS directory, e.g. edges_part00, edges_part01, …, edges_part09 (format: srcId tarId). They already form a good partitioning of the whole graph, so I never want any re-partitioning of them during graph building. I am thinking about how to use them to construct a graph with the GraphX API, without any repartitioning. My idea: 1) build an RDD, edgeTupleRDD, using sc.textFile("hdfs://myDirectory"), where each file size is kept below 64MB (smaller than an HDFS block); so, normally I should get 1 partition per file, right? 2) then build the graph using Graph.fromEdgeTuples(edgeTupleRDD, …); from the graphx documentation, this operation will keep those partitions without any change, right? ——— - Is there any other idea, or anything I missed? - If a file is larger than 64MB (the default size of an HDFS block), is repartitioning inevitable? Thanks in advance! Best, Yifan LI
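The two steps above can be sketched as follows. This is a hedged sketch: sc is an existing SparkContext, the HDFS path comes from the question, defaultValue = 1 is an arbitrary placeholder vertex attribute, and the assumption is that Graph.fromEdgeTuples leaves the edge partitioning untouched when no uniqueEdges partition strategy is passed:

```scala
import org.apache.spark.graphx.Graph

// Step 1: one partition per HDFS block; with each file under 64MB,
// that should mean one partition per file.
val edgeTupleRDD = sc.textFile("hdfs://myDirectory").map { line =>
  val Array(src, dst) = line.trim.split("\\s+")  // line format: "srcId tarId"
  (src.toLong, dst.toLong)
}

// Step 2: omitting the uniqueEdges argument avoids an edge repartition;
// defaultValue = 1 is just a placeholder attribute for all vertices.
val graph = Graph.fromEdgeTuples(edgeTupleRDD, defaultValue = 1)
```

On the follow-up question: a file larger than one HDFS block will normally be split into multiple partitions by textFile, which is not a shuffle-style repartition, but it does break the one-file-one-partition mapping.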
Re: Processing graphs
Hi Kannan, I am not sure I have understood exactly what your question is, but maybe the reduceByKey or reduceByKeyLocally functionality fits your need better. Best, Yifan LI On 17 Feb 2015, at 17:37, Vijayasarathy Kannan kvi...@vt.edu wrote: Hi, I am working on a Spark application that processes graphs and I am trying to do the following: - group the vertices (key: vertex, value: set of its outgoing edges) - distribute each key to separate processes and process them (like a mapper) - reduce the results back at the main process Does the groupBy functionality do the distribution by default? Do we have to explicitly use RDDs to enable automatic distribution? It'd be great if you could help me understand these and how to go about the problem. Thanks.
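A hedged sketch of the reduceByKey suggestion on edge-list data (sc is an existing SparkContext, and the sample edges are made-up placeholders): collecting each vertex's outgoing targets with a per-key reduce does the distribution automatically, without a groupBy.

```scala
// Edges as (srcId, dstId) pairs; build each vertex's outgoing-edge set
// with one distributed shuffle instead of groupBy + map.
val edges = sc.parallelize(Seq((1L, 2L), (1L, 3L), (2L, 3L)))

val outgoing = edges
  .mapValues(dst => Set(dst))
  .reduceByKey(_ ++ _)   // (vertex, set of outgoing targets), computed on the executors

// reduceByKeyLocally does the same combine but returns a Map on the driver,
// matching the "reduce the results back at the main process" step:
val local = edges.mapValues(Set(_)).reduceByKeyLocally(_ ++ _)
```

Since the combine happens partition-locally before the shuffle, reduceByKey typically moves far less data than a groupBy that ships whole value lists.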
Re: OutofMemoryError: Java heap space
Thanks, Kelvin :) The error seems to disappear after I decreased both spark.storage.memoryFraction and spark.shuffle.memoryFraction to 0.2 And, some increase on driver memory. Best, Yifan LI On 10 Feb 2015, at 18:58, Kelvin Chu 2dot7kel...@gmail.com wrote: Since the stacktrace shows kryo is being used, maybe, you could also try increasing spark.kryoserializer.buffer.max.mb. Hope this help. Kelvin On Tue, Feb 10, 2015 at 1:26 AM, Akhil Das ak...@sigmoidanalytics.com mailto:ak...@sigmoidanalytics.com wrote: You could try increasing the driver memory. Also, can you be more specific about the data volume? Thanks Best Regards On Mon, Feb 9, 2015 at 3:30 PM, Yifan LI iamyifa...@gmail.com mailto:iamyifa...@gmail.com wrote: Hi, I just found the following errors during computation(graphx), anyone has ideas on this? thanks so much! (I think the memory is sufficient, spark.executor.memory 30GB ) 15/02/09 00:37:12 ERROR Executor: Exception in task 162.0 in stage 719.0 (TID 7653) java.lang.OutOfMemoryError: Java heap space at com.esotericsoftware.kryo.util.IdentityObjectIntMap.resize(IdentityObjectIntMap.java:410) at com.esotericsoftware.kryo.util.IdentityObjectIntMap.put(IdentityObjectIntMap.java:113) at com.esotericsoftware.kryo.util.MapReferenceResolver.addWrittenObject(MapReferenceResolver.java:23) at com.esotericsoftware.kryo.Kryo.writeReferenceOrNull(Kryo.java:598) at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:566) at com.twitter.chill.TraversableSerializer$$anonfun$write$1.apply(Traversable.scala:29) at com.twitter.chill.TraversableSerializer$$anonfun$write$1.apply(Traversable.scala:27) at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:98) at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:98) at scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:226) at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:39) at scala.collection.mutable.HashMap.foreach(HashMap.scala:98) at 
com.twitter.chill.TraversableSerializer.write(Traversable.scala:27) at com.twitter.chill.TraversableSerializer.write(Traversable.scala:21) at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:568) at com.twitter.chill.Tuple2Serializer.write(TupleSerializers.scala:36) at com.twitter.chill.Tuple2Serializer.write(TupleSerializers.scala:33) at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:568) at com.twitter.chill.Tuple2Serializer.write(TupleSerializers.scala:37) at com.twitter.chill.Tuple2Serializer.write(TupleSerializers.scala:33) at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:568) at com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.write(DefaultArraySerializers.java:318) at com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.write(DefaultArraySerializers.java:293) at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:501) at com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.write(FieldSerializer.java:564) at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:213) at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:568) at com.twitter.chill.Tuple2Serializer.write(TupleSerializers.scala:37) at com.twitter.chill.Tuple2Serializer.write(TupleSerializers.scala:33) at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:568) at org.apache.spark.serializer.KryoSerializationStream.writeObject(KryoSerializer.scala:128) at org.apache.spark.serializer.SerializationStream.writeAll(Serializer.scala:110) 15/02/09 00:37:12 ERROR SparkUncaughtExceptionHandler: Uncaught exception in thread Thread[Executor task launch worker-15,5,main] java.lang.OutOfMemoryError: Java heap space at com.esotericsoftware.kryo.util.IdentityObjectIntMap.resize(IdentityObjectIntMap.java:410) at com.esotericsoftware.kryo.util.IdentityObjectIntMap.put(IdentityObjectIntMap.java:113) at 
com.esotericsoftware.kryo.util.MapReferenceResolver.addWrittenObject(MapReferenceResolver.java:23) at com.esotericsoftware.kryo.Kryo.writeReferenceOrNull(Kryo.java:598) at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:566) at com.twitter.chill.TraversableSerializer$$anonfun$write$1.apply(Traversable.scala:29) at com.twitter.chill.TraversableSerializer$$anonfun$write$1.apply(Traversable.scala:27) at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:98) at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:98) at scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:226
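The settings that made this OOM disappear, written out as a spark-defaults.conf sketch. The two fraction values are the ones reported in the thread; spark.kryoserializer.buffer.max.mb is Kelvin's additional suggestion, and the 512 here is a placeholder value, not one from the thread:

```
# spark-defaults.conf (sketch, Spark 1.x property names)
spark.storage.memoryFraction        0.2
spark.shuffle.memoryFraction        0.2
# Kelvin's suggestion for the Kryo stacktrace; 512 (MB) is a placeholder value
spark.kryoserializer.buffer.max.mb  512
```

Lowering the storage/shuffle fractions leaves more of the executor heap for task objects, which is what the Kryo write path was exhausting here.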
Re: OutofMemoryError: Java heap space
Hi Akhil, Excuse me, I am trying a random-walk algorithm over a not that large graph(~1GB raw dataset, including ~5million vertices and ~60million edges) on a cluster which has 20 machines. And, the property of each vertex in graph is a hash map, of which size will increase dramatically during pregel supersteps. so, it seems to suffer from high GC? Best, Yifan LI On 10 Feb 2015, at 10:26, Akhil Das ak...@sigmoidanalytics.com wrote: You could try increasing the driver memory. Also, can you be more specific about the data volume? Thanks Best Regards On Mon, Feb 9, 2015 at 3:30 PM, Yifan LI iamyifa...@gmail.com mailto:iamyifa...@gmail.com wrote: Hi, I just found the following errors during computation(graphx), anyone has ideas on this? thanks so much! (I think the memory is sufficient, spark.executor.memory 30GB ) 15/02/09 00:37:12 ERROR Executor: Exception in task 162.0 in stage 719.0 (TID 7653) java.lang.OutOfMemoryError: Java heap space at com.esotericsoftware.kryo.util.IdentityObjectIntMap.resize(IdentityObjectIntMap.java:410) at com.esotericsoftware.kryo.util.IdentityObjectIntMap.put(IdentityObjectIntMap.java:113) at com.esotericsoftware.kryo.util.MapReferenceResolver.addWrittenObject(MapReferenceResolver.java:23) at com.esotericsoftware.kryo.Kryo.writeReferenceOrNull(Kryo.java:598) at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:566) at com.twitter.chill.TraversableSerializer$$anonfun$write$1.apply(Traversable.scala:29) at com.twitter.chill.TraversableSerializer$$anonfun$write$1.apply(Traversable.scala:27) at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:98) at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:98) at scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:226) at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:39) at scala.collection.mutable.HashMap.foreach(HashMap.scala:98) at com.twitter.chill.TraversableSerializer.write(Traversable.scala:27) at 
com.twitter.chill.TraversableSerializer.write(Traversable.scala:21) at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:568) at com.twitter.chill.Tuple2Serializer.write(TupleSerializers.scala:36) at com.twitter.chill.Tuple2Serializer.write(TupleSerializers.scala:33) at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:568) at com.twitter.chill.Tuple2Serializer.write(TupleSerializers.scala:37) at com.twitter.chill.Tuple2Serializer.write(TupleSerializers.scala:33) at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:568) at com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.write(DefaultArraySerializers.java:318) at com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.write(DefaultArraySerializers.java:293) at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:501) at com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.write(FieldSerializer.java:564) at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:213) at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:568) at com.twitter.chill.Tuple2Serializer.write(TupleSerializers.scala:37) at com.twitter.chill.Tuple2Serializer.write(TupleSerializers.scala:33) at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:568) at org.apache.spark.serializer.KryoSerializationStream.writeObject(KryoSerializer.scala:128) at org.apache.spark.serializer.SerializationStream.writeAll(Serializer.scala:110) 15/02/09 00:37:12 ERROR SparkUncaughtExceptionHandler: Uncaught exception in thread Thread[Executor task launch worker-15,5,main] java.lang.OutOfMemoryError: Java heap space at com.esotericsoftware.kryo.util.IdentityObjectIntMap.resize(IdentityObjectIntMap.java:410) at com.esotericsoftware.kryo.util.IdentityObjectIntMap.put(IdentityObjectIntMap.java:113) at com.esotericsoftware.kryo.util.MapReferenceResolver.addWrittenObject(MapReferenceResolver.java:23) at 
com.esotericsoftware.kryo.Kryo.writeReferenceOrNull(Kryo.java:598) at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:566) at com.twitter.chill.TraversableSerializer$$anonfun$write$1.apply(Traversable.scala:29) at com.twitter.chill.TraversableSerializer$$anonfun$write$1.apply(Traversable.scala:27) at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:98) at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:98) at scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:226) at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:39
Re: OutofMemoryError: Java heap space
Yes, I have read it, and am trying to find some way to do that… Thanks :) Best, Yifan LI On 10 Feb 2015, at 12:06, Akhil Das ak...@sigmoidanalytics.com wrote: Did you have a chance to look at this doc http://spark.apache.org/docs/1.2.0/tuning.html http://spark.apache.org/docs/1.2.0/tuning.html Thanks Best Regards On Tue, Feb 10, 2015 at 4:13 PM, Yifan LI iamyifa...@gmail.com mailto:iamyifa...@gmail.com wrote: Hi Akhil, Excuse me, I am trying a random-walk algorithm over a not that large graph(~1GB raw dataset, including ~5million vertices and ~60million edges) on a cluster which has 20 machines. And, the property of each vertex in graph is a hash map, of which size will increase dramatically during pregel supersteps. so, it seems to suffer from high GC? Best, Yifan LI On 10 Feb 2015, at 10:26, Akhil Das ak...@sigmoidanalytics.com mailto:ak...@sigmoidanalytics.com wrote: You could try increasing the driver memory. Also, can you be more specific about the data volume? Thanks Best Regards On Mon, Feb 9, 2015 at 3:30 PM, Yifan LI iamyifa...@gmail.com mailto:iamyifa...@gmail.com wrote: Hi, I just found the following errors during computation(graphx), anyone has ideas on this? thanks so much! 
(I think the memory is sufficient, spark.executor.memory 30GB ) 15/02/09 00:37:12 ERROR Executor: Exception in task 162.0 in stage 719.0 (TID 7653) java.lang.OutOfMemoryError: Java heap space at com.esotericsoftware.kryo.util.IdentityObjectIntMap.resize(IdentityObjectIntMap.java:410) at com.esotericsoftware.kryo.util.IdentityObjectIntMap.put(IdentityObjectIntMap.java:113) at com.esotericsoftware.kryo.util.MapReferenceResolver.addWrittenObject(MapReferenceResolver.java:23) at com.esotericsoftware.kryo.Kryo.writeReferenceOrNull(Kryo.java:598) at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:566) at com.twitter.chill.TraversableSerializer$$anonfun$write$1.apply(Traversable.scala:29) at com.twitter.chill.TraversableSerializer$$anonfun$write$1.apply(Traversable.scala:27) at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:98) at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:98) at scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:226) at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:39) at scala.collection.mutable.HashMap.foreach(HashMap.scala:98) at com.twitter.chill.TraversableSerializer.write(Traversable.scala:27) at com.twitter.chill.TraversableSerializer.write(Traversable.scala:21) at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:568) at com.twitter.chill.Tuple2Serializer.write(TupleSerializers.scala:36) at com.twitter.chill.Tuple2Serializer.write(TupleSerializers.scala:33) at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:568) at com.twitter.chill.Tuple2Serializer.write(TupleSerializers.scala:37) at com.twitter.chill.Tuple2Serializer.write(TupleSerializers.scala:33) at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:568) at com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.write(DefaultArraySerializers.java:318) at 
com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.write(DefaultArraySerializers.java:293) at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:501) at com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.write(FieldSerializer.java:564) at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:213) at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:568) at com.twitter.chill.Tuple2Serializer.write(TupleSerializers.scala:37) at com.twitter.chill.Tuple2Serializer.write(TupleSerializers.scala:33) at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:568) at org.apache.spark.serializer.KryoSerializationStream.writeObject(KryoSerializer.scala:128) at org.apache.spark.serializer.SerializationStream.writeAll(Serializer.scala:110) 15/02/09 00:37:12 ERROR SparkUncaughtExceptionHandler: Uncaught exception in thread Thread[Executor task launch worker-15,5,main] java.lang.OutOfMemoryError: Java heap space at com.esotericsoftware.kryo.util.IdentityObjectIntMap.resize(IdentityObjectIntMap.java:410) at com.esotericsoftware.kryo.util.IdentityObjectIntMap.put(IdentityObjectIntMap.java:113) at com.esotericsoftware.kryo.util.MapReferenceResolver.addWrittenObject(MapReferenceResolver.java:23) at com.esotericsoftware.kryo.Kryo.writeReferenceOrNull(Kryo.java:598) at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:566) at com.twitter.chill.TraversableSerializer$$anonfun$write$1.apply(Traversable.scala:29
OutofMemoryError: Java heap space
(TupleSerializers.scala:33) at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:568) at com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.write(DefaultArraySerializers.java:318) at com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.write(DefaultArraySerializers.java:293) at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:501) at com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.write(FieldSerializer.java:564) at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:213) at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:568) at com.twitter.chill.Tuple2Serializer.write(TupleSerializers.scala:37) at com.twitter.chill.Tuple2Serializer.write(TupleSerializers.scala:33) at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:568) at org.apache.spark.serializer.KryoSerializationStream.writeObject(KryoSerializer.scala:128) at org.apache.spark.serializer.SerializationStream.writeAll(Serializer.scala:110) Best, Yifan LI
how to debug this kind of error, e.g. lost executor?
Hi, I am running a graphx application with heavy memory/cpu overhead. I think the memory is sufficient, and I set the RDDs' StorageLevel to MEMORY_AND_DISK. But I found that some tasks failed with the following errors: java.io.FileNotFoundException: /data/spark/local/spark-local-20150205151711-9700/09/rdd_3_275 (No files or folders of this type) ExecutorLostFailure (executor 11 lost) So, finally that stage failed: org.apache.spark.shuffle.FetchFailedException: java.io.FileNotFoundException: /data/spark/local/spark-local-20150205151711-587a/16/shuffle_11_219_0.index Does anyone have pointers? Where can I get more details on this issue? Best, Yifan LI
Re: how to debug this kind of error, e.g. lost executor?
Does anyone have an idea of where I can find the detailed log of that lost executor (i.e., why it was lost)? Thanks in advance! On 05 Feb 2015, at 16:14, Yifan LI iamyifa...@gmail.com wrote: Hi, I am running a graphx application with heavy memory/cpu overhead. I think the memory is sufficient, and I set the RDDs' StorageLevel to MEMORY_AND_DISK. But I found that some tasks failed with the following errors: java.io.FileNotFoundException: /data/spark/local/spark-local-20150205151711-9700/09/rdd_3_275 (No files or folders of this type) ExecutorLostFailure (executor 11 lost) So, finally that stage failed: org.apache.spark.shuffle.FetchFailedException: java.io.FileNotFoundException: /data/spark/local/spark-local-20150205151711-587a/16/shuffle_11_219_0.index Does anyone have pointers? Where can I get more details on this issue? Best, Yifan LI
Re: [Graphx Spark] Error of Lost executor and TimeoutException
Thanks, Sonal. But it seems the error happened while "cleaning broadcast"? BTW, what does the timeout of "[30 seconds]" correspond to? Can I increase it? Best, Yifan LI On 02 Feb 2015, at 11:12, Sonal Goyal sonalgoy...@gmail.com wrote: That may be the cause of your issue. Take a look at the tuning guide[1] and maybe also profile your application. See if you can reuse your objects. 1. http://spark.apache.org/docs/latest/tuning.html Best Regards, Sonal Founder, Nube Technologies http://www.nubetech.co/ http://in.linkedin.com/in/sonalgoyal On Sat, Jan 31, 2015 at 4:21 AM, Yifan LI iamyifa...@gmail.com wrote: Yes, I think so, especially for a pregel application… any suggestions? Best, Yifan LI On 30 Jan 2015, at 22:25, Sonal Goyal sonalgoy...@gmail.com wrote: Is your code hitting frequent garbage collection? Best Regards, Sonal Founder, Nube Technologies http://www.nubetech.co/ http://in.linkedin.com/in/sonalgoyal On Fri, Jan 30, 2015 at 7:52 PM, Yifan LI iamyifa...@gmail.com wrote: Hi, I am running my graphx application on Spark 1.2.0 (an 11-node cluster), with 30GB memory per node and 100 cores requested, for an input dataset of around 1GB (a 5-million-vertex graph). But the error below always happens… Could anyone give me some pointers? (BTW, the overall edge/vertex RDDs reach more than 100GB during the graph computation, and another version of my application works well on the same dataset while needing much less memory.) Thanks in advance!!!
15/01/29 18:05:08 ERROR ContextCleaner: Error cleaning broadcast 60 java.util.concurrent.TimeoutException: Futures timed out after [30 seconds] at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:219) at scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:223) at scala.concurrent.Await$$anonfun$result$1.apply(package.scala:107) at scala.concurrent.BlockContext$DefaultBlockContext$.blockOn(BlockContext.scala:53) at scala.concurrent.Await$.result(package.scala:107) at org.apache.spark.storage.BlockManagerMaster.removeBroadcast(BlockManagerMaster.scala:137) at org.apache.spark.broadcast.TorrentBroadcast$.unpersist(TorrentBroadcast.scala:227) at org.apache.spark.broadcast.TorrentBroadcastFactory.unbroadcast(TorrentBroadcastFactory.scala:45) at org.apache.spark.broadcast.BroadcastManager.unbroadcast(BroadcastManager.scala:66) at org.apache.spark.ContextCleaner.doCleanupBroadcast(ContextCleaner.scala:185) at org.apache.spark.ContextCleaner$$anonfun$org$apache$spark$ContextCleaner$$keepCleaning$1$$anonfun$apply$mcV$sp$2.apply(ContextCleaner.scala:147) at org.apache.spark.ContextCleaner$$anonfun$org$apache$spark$ContextCleaner$$keepCleaning$1$$anonfun$apply$mcV$sp$2.apply(ContextCleaner.scala:138) at scala.Option.foreach(Option.scala:236) at org.apache.spark.ContextCleaner$$anonfun$org$apache$spark$ContextCleaner$$keepCleaning$1.apply$mcV$sp(ContextCleaner.scala:138) at org.apache.spark.ContextCleaner$$anonfun$org$apache$spark$ContextCleaner$$keepCleaning$1.apply(ContextCleaner.scala:134) at org.apache.spark.ContextCleaner$$anonfun$org$apache$spark$ContextCleaner$$keepCleaning$1.apply(ContextCleaner.scala:134) at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1460) at org.apache.spark.ContextCleaner.org http://org.apache.spark.contextcleaner.org/$apache$spark$ContextCleaner$$keepCleaning(ContextCleaner.scala:133) at org.apache.spark.ContextCleaner$$anon$3.run(ContextCleaner.scala:65) [Stage 91:=== (2 + 4) / 6]15/01/29 18:08:15 
ERROR SparkDeploySchedulerBackend: Asked to remove non-existent executor 0 [Stage 93: (29 + 20) / 49]15/01/29 23:47:03 ERROR TaskSchedulerImpl: Lost executor 9 on small11-tap1.common.lip6.fr http://small11-tap1.common.lip6.fr/: remote Akka client disassociated [Stage 83: (1 + 0) / 6][Stage 86: (0 + 1) / 2][Stage 88: (0 + 2) / 8]15/01/29 23:47:06 ERROR SparkDeploySchedulerBackend: Asked to remove non-existent executor 9 [Stage 83:=== (5 + 1) / 6][Stage 88:= (9 + 2) / 11]15/01/29 23:57:30 ERROR TaskSchedulerImpl: Lost executor 8 on small10-tap1.common.lip6.fr http://small10-tap1.common.lip6.fr/: remote Akka client disassociated 15/01/29 23:57:30 ERROR SparkDeploySchedulerBackend: Asked to remove non-existent executor 8 15/01/29 23:57:30 ERROR SparkDeploySchedulerBackend: Asked to remove non-existent executor 8 Best, Yifan LI
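On the "[30 seconds]" question: in Spark 1.x that limit on block-manager requests (including broadcast cleanup) comes from the Akka ask timeout, which is configurable. A hedged spark-defaults.conf sketch; the 120-second value is an arbitrary example, and the property name should be verified against the configuration docs for your exact Spark version:

```
# spark-defaults.conf (sketch; verify the property name for your Spark version)
spark.akka.askTimeout   120
```

Raising the timeout only masks the symptom, though; a GC-stalled or lost executor (as the "remote Akka client disassociated" lines suggest) is the usual root cause, so the GC tuning Sonal points to is likely the more durable fix.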
Re: [Graphx Spark] Error of Lost executor and TimeoutException
I think this broadcast cleaning(memory block remove?) timeout exception was caused by: 15/02/02 11:48:49 ERROR TaskSchedulerImpl: Lost executor 13 on small18-tap1.common.lip6.fr: remote Akka client disassociated 15/02/02 11:48:49 ERROR SparkDeploySchedulerBackend: Asked to remove non-existent executor 13 15/02/02 11:48:49 ERROR SparkDeploySchedulerBackend: Asked to remove non-existent executor 13 Anyone has points on this? Best, Yifan LI On 02 Feb 2015, at 11:47, Yifan LI iamyifa...@gmail.com wrote: Thanks, Sonal. But it seems to be an error happened when “cleaning broadcast”? BTW, what is the timeout of “[30 seconds]”? can I increase it? Best, Yifan LI On 02 Feb 2015, at 11:12, Sonal Goyal sonalgoy...@gmail.com mailto:sonalgoy...@gmail.com wrote: That may be the cause of your issue. Take a look at the tuning guide[1] and maybe also profile your application. See if you can reuse your objects. 1. http://spark.apache.org/docs/latest/tuning.html http://spark.apache.org/docs/latest/tuning.html Best Regards, Sonal Founder, Nube Technologies http://www.nubetech.co/ http://in.linkedin.com/in/sonalgoyal On Sat, Jan 31, 2015 at 4:21 AM, Yifan LI iamyifa...@gmail.com mailto:iamyifa...@gmail.com wrote: Yes, I think so, esp. for a pregel application… have any suggestion? Best, Yifan LI On 30 Jan 2015, at 22:25, Sonal Goyal sonalgoy...@gmail.com mailto:sonalgoy...@gmail.com wrote: Is your code hitting frequent garbage collection? Best Regards, Sonal Founder, Nube Technologies http://www.nubetech.co/ http://in.linkedin.com/in/sonalgoyal On Fri, Jan 30, 2015 at 7:52 PM, Yifan LI iamyifa...@gmail.com mailto:iamyifa...@gmail.com wrote: Hi, I am running my graphx application on Spark 1.2.0(11 nodes cluster), has requested 30GB memory per node and 100 cores for around 1GB input dataset(5 million vertices graph). But the error below always happen… Is there anyone could give me some points? 
(BTW, the overall edge/vertex RDDs will reach more than 100GB during graph computation, and another version of my application can work well on the same dataset while it need much less memory during computation) Thanks in advance!!! 15/01/29 18:05:08 ERROR ContextCleaner: Error cleaning broadcast 60 java.util.concurrent.TimeoutException: Futures timed out after [30 seconds] at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:219) at scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:223) at scala.concurrent.Await$$anonfun$result$1.apply(package.scala:107) at scala.concurrent.BlockContext$DefaultBlockContext$.blockOn(BlockContext.scala:53) at scala.concurrent.Await$.result(package.scala:107) at org.apache.spark.storage.BlockManagerMaster.removeBroadcast(BlockManagerMaster.scala:137) at org.apache.spark.broadcast.TorrentBroadcast$.unpersist(TorrentBroadcast.scala:227) at org.apache.spark.broadcast.TorrentBroadcastFactory.unbroadcast(TorrentBroadcastFactory.scala:45) at org.apache.spark.broadcast.BroadcastManager.unbroadcast(BroadcastManager.scala:66) at org.apache.spark.ContextCleaner.doCleanupBroadcast(ContextCleaner.scala:185) at org.apache.spark.ContextCleaner$$anonfun$org$apache$spark$ContextCleaner$$keepCleaning$1$$anonfun$apply$mcV$sp$2.apply(ContextCleaner.scala:147) at org.apache.spark.ContextCleaner$$anonfun$org$apache$spark$ContextCleaner$$keepCleaning$1$$anonfun$apply$mcV$sp$2.apply(ContextCleaner.scala:138) at scala.Option.foreach(Option.scala:236) at org.apache.spark.ContextCleaner$$anonfun$org$apache$spark$ContextCleaner$$keepCleaning$1.apply$mcV$sp(ContextCleaner.scala:138) at org.apache.spark.ContextCleaner$$anonfun$org$apache$spark$ContextCleaner$$keepCleaning$1.apply(ContextCleaner.scala:134) at org.apache.spark.ContextCleaner$$anonfun$org$apache$spark$ContextCleaner$$keepCleaning$1.apply(ContextCleaner.scala:134) at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1460) at 
org.apache.spark.ContextCleaner.org$apache$spark$ContextCleaner$$keepCleaning(ContextCleaner.scala:133) at org.apache.spark.ContextCleaner$$anon$3.run(ContextCleaner.scala:65) [Stage 91:=== (2 + 4) / 6]15/01/29 18:08:15 ERROR SparkDeploySchedulerBackend: Asked to remove non-existent executor 0 [Stage 93: (29 + 20) / 49]15/01/29 23:47:03 ERROR TaskSchedulerImpl: Lost executor 9 on small11-tap1.common.lip6.fr: remote Akka client disassociated [Stage 83: (1 + 0) / 6][Stage 86: (0 + 1) / 2][Stage 88: (0 + 2) / 8]15/01/29 23:47:06 ERROR SparkDeploySchedulerBackend: Asked to remove non-existent executor 9 [Stage 83:=== (5 + 1) / 6][Stage 88
Re: [Graphx Spark] Error of Lost executor and TimeoutException
Yes, I think so, esp. for a Pregel application… any suggestions? Best, Yifan LI On 30 Jan 2015, at 22:25, Sonal Goyal sonalgoy...@gmail.com wrote: Is your code hitting frequent garbage collection? Best Regards, Sonal Founder, Nube Technologies http://www.nubetech.co/ http://in.linkedin.com/in/sonalgoyal On Fri, Jan 30, 2015 at 7:52 PM, Yifan LI iamyifa...@gmail.com wrote: Hi, I am running my GraphX application on Spark 1.2.0 (an 11-node cluster), with 30GB memory per node and 100 cores requested, for an ~1GB input dataset (a 5-million-vertex graph). But the error below always happens… Could anyone give me some pointers? (BTW, the overall edge/vertex RDDs will reach more than 100GB during graph computation, and another version of my application works well on the same dataset while needing much less memory during computation.) Thanks in advance!!! 15/01/29 18:05:08 ERROR ContextCleaner: Error cleaning broadcast 60 java.util.concurrent.TimeoutException: Futures timed out after [30 seconds] at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:219) at scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:223) at scala.concurrent.Await$$anonfun$result$1.apply(package.scala:107) at scala.concurrent.BlockContext$DefaultBlockContext$.blockOn(BlockContext.scala:53) at scala.concurrent.Await$.result(package.scala:107) at org.apache.spark.storage.BlockManagerMaster.removeBroadcast(BlockManagerMaster.scala:137) at org.apache.spark.broadcast.TorrentBroadcast$.unpersist(TorrentBroadcast.scala:227) at org.apache.spark.broadcast.TorrentBroadcastFactory.unbroadcast(TorrentBroadcastFactory.scala:45) at org.apache.spark.broadcast.BroadcastManager.unbroadcast(BroadcastManager.scala:66) at org.apache.spark.ContextCleaner.doCleanupBroadcast(ContextCleaner.scala:185) at org.apache.spark.ContextCleaner$$anonfun$org$apache$spark$ContextCleaner$$keepCleaning$1$$anonfun$apply$mcV$sp$2.apply(ContextCleaner.scala:147) at
org.apache.spark.ContextCleaner$$anonfun$org$apache$spark$ContextCleaner$$keepCleaning$1$$anonfun$apply$mcV$sp$2.apply(ContextCleaner.scala:138) at scala.Option.foreach(Option.scala:236) at org.apache.spark.ContextCleaner$$anonfun$org$apache$spark$ContextCleaner$$keepCleaning$1.apply$mcV$sp(ContextCleaner.scala:138) at org.apache.spark.ContextCleaner$$anonfun$org$apache$spark$ContextCleaner$$keepCleaning$1.apply(ContextCleaner.scala:134) at org.apache.spark.ContextCleaner$$anonfun$org$apache$spark$ContextCleaner$$keepCleaning$1.apply(ContextCleaner.scala:134) at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1460) at org.apache.spark.ContextCleaner.org$apache$spark$ContextCleaner$$keepCleaning(ContextCleaner.scala:133) at org.apache.spark.ContextCleaner$$anon$3.run(ContextCleaner.scala:65) [Stage 91:=== (2 + 4) / 6]15/01/29 18:08:15 ERROR SparkDeploySchedulerBackend: Asked to remove non-existent executor 0 [Stage 93: (29 + 20) / 49]15/01/29 23:47:03 ERROR TaskSchedulerImpl: Lost executor 9 on small11-tap1.common.lip6.fr: remote Akka client disassociated [Stage 83: (1 + 0) / 6][Stage 86: (0 + 1) / 2][Stage 88: (0 + 2) / 8]15/01/29 23:47:06 ERROR SparkDeploySchedulerBackend: Asked to remove non-existent executor 9 [Stage 83:=== (5 + 1) / 6][Stage 88:= (9 + 2) / 11]15/01/29 23:57:30 ERROR TaskSchedulerImpl: Lost executor 8 on small10-tap1.common.lip6.fr: remote Akka client disassociated 15/01/29 23:57:30 ERROR SparkDeploySchedulerBackend: Asked to remove non-existent executor 8 15/01/29 23:57:30 ERROR SparkDeploySchedulerBackend: Asked to remove non-existent executor 8 Best, Yifan LI
[Graphx Spark] Error of Lost executor and TimeoutException
Hi, I am running my graphx application on Spark 1.2.0(11 nodes cluster), has requested 30GB memory per node and 100 cores for around 1GB input dataset(5 million vertices graph). But the error below always happen… Is there anyone could give me some points? (BTW, the overall edge/vertex RDDs will reach more than 100GB during graph computation, and another version of my application can work well on the same dataset while it need much less memory during computation) Thanks in advance!!! 15/01/29 18:05:08 ERROR ContextCleaner: Error cleaning broadcast 60 java.util.concurrent.TimeoutException: Futures timed out after [30 seconds] at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:219) at scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:223) at scala.concurrent.Await$$anonfun$result$1.apply(package.scala:107) at scala.concurrent.BlockContext$DefaultBlockContext$.blockOn(BlockContext.scala:53) at scala.concurrent.Await$.result(package.scala:107) at org.apache.spark.storage.BlockManagerMaster.removeBroadcast(BlockManagerMaster.scala:137) at org.apache.spark.broadcast.TorrentBroadcast$.unpersist(TorrentBroadcast.scala:227) at org.apache.spark.broadcast.TorrentBroadcastFactory.unbroadcast(TorrentBroadcastFactory.scala:45) at org.apache.spark.broadcast.BroadcastManager.unbroadcast(BroadcastManager.scala:66) at org.apache.spark.ContextCleaner.doCleanupBroadcast(ContextCleaner.scala:185) at org.apache.spark.ContextCleaner$$anonfun$org$apache$spark$ContextCleaner$$keepCleaning$1$$anonfun$apply$mcV$sp$2.apply(ContextCleaner.scala:147) at org.apache.spark.ContextCleaner$$anonfun$org$apache$spark$ContextCleaner$$keepCleaning$1$$anonfun$apply$mcV$sp$2.apply(ContextCleaner.scala:138) at scala.Option.foreach(Option.scala:236) at org.apache.spark.ContextCleaner$$anonfun$org$apache$spark$ContextCleaner$$keepCleaning$1.apply$mcV$sp(ContextCleaner.scala:138) at 
org.apache.spark.ContextCleaner$$anonfun$org$apache$spark$ContextCleaner$$keepCleaning$1.apply(ContextCleaner.scala:134) at org.apache.spark.ContextCleaner$$anonfun$org$apache$spark$ContextCleaner$$keepCleaning$1.apply(ContextCleaner.scala:134) at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1460) at org.apache.spark.ContextCleaner.org$apache$spark$ContextCleaner$$keepCleaning(ContextCleaner.scala:133) at org.apache.spark.ContextCleaner$$anon$3.run(ContextCleaner.scala:65) [Stage 91:=== (2 + 4) / 6]15/01/29 18:08:15 ERROR SparkDeploySchedulerBackend: Asked to remove non-existent executor 0 [Stage 93: (29 + 20) / 49]15/01/29 23:47:03 ERROR TaskSchedulerImpl: Lost executor 9 on small11-tap1.common.lip6.fr: remote Akka client disassociated [Stage 83: (1 + 0) / 6][Stage 86: (0 + 1) / 2][Stage 88: (0 + 2) / 8]15/01/29 23:47:06 ERROR SparkDeploySchedulerBackend: Asked to remove non-existent executor 9 [Stage 83:=== (5 + 1) / 6][Stage 88:= (9 + 2) / 11]15/01/29 23:57:30 ERROR TaskSchedulerImpl: Lost executor 8 on small10-tap1.common.lip6.fr: remote Akka client disassociated 15/01/29 23:57:30 ERROR SparkDeploySchedulerBackend: Asked to remove non-existent executor 8 15/01/29 23:57:30 ERROR SparkDeploySchedulerBackend: Asked to remove non-existent executor 8 Best, Yifan LI
300% Fraction Cached?
Hi, I just saw an edge RDD reported as "300% Fraction Cached" in the Storage web UI. What does that mean? I could understand it if the value were under 100%… Thanks. Best, Yifan LI
[Graphx] the communication cost of leftJoin
Hi, I am trying to leftJoin another vertex RDD (e.g. vB) with this one (vA): vA.leftJoin(vB)(f) - vA is the vertices RDD in graph G, and G is edge-partitioned using EdgePartition2D. - vB is created using the default partitioner (actually I am not sure...) So, I am wondering: if vB has the same partitioner as vA, what will GraphX (Spark) do to handle this case? For instance, as below: 1) check the partitioner of vB; 2) do the leftJoin, on each machine separately, for the co-located partitions of vA and vB. Right? But if vB's partitioner is different, what will happen? How do the partitions (and machines) communicate? Does anyone have pointers on this, or on communication between RDDs in general? Thanks :) Best, Yifan LI
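For reference, a minimal sketch of the join in question. The names vA and vB come from the question; the Double attribute types and the combine function f are illustrative assumptions:

```scala
import org.apache.spark.graphx._
import org.apache.spark.rdd.RDD

// Sketch: merge optional updates from vB into vA's attributes.
def join(vA: VertexRDD[Double], vB: RDD[(VertexId, Double)]): VertexRDD[Double] =
  vA.leftJoin(vB) { (id, attrA, attrBOpt) =>
    // f: keep vA's value when the vertex has no entry in vB
    attrBOpt.getOrElse(attrA)
  }
```

On the communication question: when the right side is a VertexRDD sharing vA's partitioning, GraphX can join the co-located partitions in place; otherwise the entries of vB are shuffled to vA's partitioning first, so it is vB's data, not vA's, that moves across the network.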
[Graphx] which way is better to access faraway neighbors?
Hi, I have a graph where each vertex keeps several messages for some faraway neighbours (I mean, not only immediate neighbours: at most k hops away, e.g. k = 5). Now, I propose to distribute these messages to their corresponding destinations (the "faraway neighbours"): - by using the Pregel API, where one superstep is enough; - by using basic Spark operations (groupByKey, leftJoin, etc.) on the vertices RDD and its intermediate results. W.r.t. the communication among machines, and the high cost of groupByKey/leftJoin, I guess the 1st option is better? What's your view? Best, Yifan LI
map function
Hi, I have an RDD like below: (1, (10, 20)) (2, (30, 40, 10)) (3, (30)) … Is there any way to map it to this: (10,1) (20,1) (30,2) (40,2) (10,2) (30,3) … Generally, each element might be mapped to multiple output pairs. Thanks in advance! Best, Yifan LI
Re: map function
Thanks, Paolo and Mark. :) On 04 Dec 2014, at 11:58, Paolo Platter paolo.plat...@agilelab.it wrote: Hi, rdd.flatMap(e => e._2.map(i => (i, e._1))) should work, but I didn't test it, so maybe I'm missing something. Paolo Sent from my Windows Phone From: Yifan LI iamyifa...@gmail.com Sent: 04/12/2014 09:27 To: user@spark.apache.org Subject: map function Hi, I have an RDD like below: (1, (10, 20)) (2, (30, 40, 10)) (3, (30)) … Is there any way to map it to this: (10,1) (20,1) (30,2) (40,2) (10,2) (30,3) … Generally, each element might be mapped to multiple output pairs. Thanks in advance! Best, Yifan LI
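Paolo's one-liner can be sanity-checked on a plain Scala collection first, since flatMap has the same semantics there as on an RDD (the Seq values stand in for the tuples in the question):

```scala
// Each (key, values) pair becomes one (value, key) pair per value.
val data = Seq((1, Seq(10, 20)), (2, Seq(30, 40, 10)), (3, Seq(30)))
val flipped = data.flatMap { case (k, vs) => vs.map(v => (v, k)) }
// flipped == Seq((10,1), (20,1), (30,2), (40,2), (10,2), (30,3))
```

The same closure works unchanged on an RDD[(Int, Seq[Int])].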
[graphx] failed to submit an application with java.lang.ClassNotFoundException
Hi, I just tried to submit an application from graphx examples directory, but it failed: yifan2:bin yifanli$ MASTER=local[*] ./run-example graphx.PPR_hubs java.lang.ClassNotFoundException: org.apache.spark.examples.graphx.PPR_hubs at java.net.URLClassLoader$1.run(URLClassLoader.java:202) at java.security.AccessController.doPrivileged(Native Method) at java.net.URLClassLoader.findClass(URLClassLoader.java:190) at java.lang.ClassLoader.loadClass(ClassLoader.java:306) at java.lang.ClassLoader.loadClass(ClassLoader.java:247) at java.lang.Class.forName0(Native Method) at java.lang.Class.forName(Class.java:249) at org.apache.spark.deploy.SparkSubmit$.launch(SparkSubmit.scala:318) at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:75) at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala) and also, yifan2:bin yifanli$ ./spark-submit --class org.apache.spark.examples.graphx.PPR_hubs ../examples/target/scala-2.10/spark-examples-1.2.0-SNAPSHOT-hadoop1.0.4.jar java.lang.ClassNotFoundException: org.apache.spark.examples.graphx.PPR_hubs at java.net.URLClassLoader$1.run(URLClassLoader.java:202) at java.security.AccessController.doPrivileged(Native Method) at java.net.URLClassLoader.findClass(URLClassLoader.java:190) at java.lang.ClassLoader.loadClass(ClassLoader.java:306) at java.lang.ClassLoader.loadClass(ClassLoader.java:247) at java.lang.Class.forName0(Native Method) at java.lang.Class.forName(Class.java:249) at org.apache.spark.deploy.SparkSubmit$.launch(SparkSubmit.scala:318) at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:75) at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala) anyone has some points on this? Best, Yifan LI
Re: How to measure communication between nodes in Spark Standalone Cluster?
I am not sure there is a direct way(an api in graphx, etc) to measure the number of transferred vertex values among nodes during computation. It might depend on: - the operations in your application, e.g. only communicate with its immediate neighbours for each vertex. - the partition strategy you chose, wrt the vertices replication factor - the distribution of partitions on cluster ... Best, Yifan LI LIP6, UPMC, Paris On 17 Nov 2014, at 11:59, Hlib Mykhailenko hlib.mykhaile...@inria.fr wrote: Hello, I use Spark Standalone Cluster and I want to measure somehow internode communication. As I understood, Graphx transfers only vertices values. Am I right? But I do not want to get number of bytes which were transferred between any two nodes. So is there way to measure how many values of vertices were transferred among nodes? Thanks! -- Cordialement, Hlib Mykhailenko Doctorant à INRIA Sophia-Antipolis Méditerranée http://www.inria.fr/centre/sophia/ 2004 Route des Lucioles BP93 06902 SOPHIA ANTIPOLIS cedex
Re: How to set persistence level of graph in GraphX in spark 1.0.0
Hi Arpit, Try this: val graph = GraphLoader.edgeListFile(sc, edgesFile, minEdgePartitions = numPartitions, edgeStorageLevel = StorageLevel.MEMORY_AND_DISK, vertexStorageLevel = StorageLevel.MEMORY_AND_DISK) Best, Yifan LI On 28 Oct 2014, at 11:17, Arpit Kumar arp8...@gmail.com wrote: Any help regarding this issue please? Regards, Arpit On Sat, Oct 25, 2014 at 8:56 AM, Arpit Kumar arp8...@gmail.com wrote: Hi all, I am using the GraphLoader class to load graphs from edge list files. But then I need to change the storage level of the graph to something other than MEMORY_ONLY. val graph = GraphLoader.edgeListFile(sc, fname, minEdgePartitions = numEPart).persist(StorageLevel.MEMORY_AND_DISK_SER) The error I am getting while executing this is: Exception in thread "main" java.lang.UnsupportedOperationException: Cannot change storage level of an RDD after it was already assigned a level Then I looked into the GraphLoader class. I know that in the latest version of Spark, support for setting the persistence level is provided in this class. Please suggest a workaround for Spark 1.0.0, as I do not have the option to move to the latest release. Note: I tried copying the GraphLoader class into my own package as GraphLoader1, importing package com.cloudera.xyz import org.apache.spark.storage.StorageLevel import org.apache.spark.graphx._ import org.apache.spark.{Logging, SparkContext} import org.apache.spark.graphx.impl._ and then changing the persistence level to my suitability as .persist(gStorageLevel) instead of .cache() But while compiling I am getting the following errors: GraphLoader1.scala:49: error: class EdgePartitionBuilder in package impl cannot be accessed in package org.apache.spark.graphx.impl [INFO] val builder = new EdgePartitionBuilder[Int, Int] I am also attaching the file with the mail. Maybe this way of doing things is not possible.
Please suggest some workarounds so that I can set persistence level of my graph to MEMORY_AND_DISK_SER for the graph I read from edge file list -- Arpit Kumar Fourth Year Undergraduate Department of Computer Science and Engineering Indian Institute of Technology, Kharagpur
Re: How to set persistence level of graph in GraphX in spark 1.0.0
I am not sure if it can work on Spark 1.0, but give it a try. or, Maybe you can try: 1) to construct the edges and vertices RDDs respectively with desired storage level. 2) then, to obtain a graph by using Graph(verticesRDD, edgesRDD). Best, Yifan LI On 28 Oct 2014, at 12:10, Arpit Kumar arp8...@gmail.com wrote: Hi Yifan LI, I am currently working on Spark 1.0 in which we can't pass edgeStorageLevel as parameter. It implicitly caches the edges. So I am looking for a workaround. http://spark.apache.org/docs/1.0.0/api/scala/index.html#org.apache.spark.graphx.GraphLoader$ Regards, Arpit On Tue, Oct 28, 2014 at 4:25 PM, Yifan LI iamyifa...@gmail.com wrote: Hi Arpit, To try this: val graph = GraphLoader.edgeListFile(sc, edgesFile, minEdgePartitions = numPartitions, edgeStorageLevel = StorageLevel.MEMORY_AND_DISK, vertexStorageLevel = StorageLevel.MEMORY_AND_DISK) Best, Yifan LI On 28 Oct 2014, at 11:17, Arpit Kumar arp8...@gmail.com wrote: Any help regarding this issue please? Regards, Arpit On Sat, Oct 25, 2014 at 8:56 AM, Arpit Kumar arp8...@gmail.com wrote: Hi all, I am using the GrpahLoader class to load graphs from edge list files. But then I need to change the storage level of the graph to some other thing than MEMORY_ONLY. val graph = GraphLoader.edgeListFile(sc, fname, minEdgePartitions = numEPart).persist(StorageLevel.MEMORY_AND_DISK_SER) The error I am getting while executing this is: Exception in thread main java.lang.UnsupportedOperationException: Cannot change storage level of an RDD after it was already assigned a level Then I looked into the GraphLoader class. I know that in the latest version of spark support for setting persistence level is provided in this class. Please suggest a workaround for spark 1.0.0 as I do not have the option to shift to latest release. 
Note: I tried copying the GraphLoader class to my package as GraphLoader1 importing package com.cloudera.xyz import org.apache.spark.storage.StorageLevel import org.apache.spark.graphx._ import org.apache.spark.{Logging, SparkContext} import org.apache.spark.graphx.impl._ and then changing the persistence level to my suitability as .persist(gStorageLevel) instead of .cache() But while compiling I am getting the following errors GraphLoader1.scala:49: error: class EdgePartitionBuilder in package impl cannot be accessed in package org.apache.spark.graphx.impl [INFO] val builder = new EdgePartitionBuilder[Int, Int] I am also attaching the file with the mail. Maybe this way of doing thing is not possible. Please suggest some workarounds so that I can set persistence level of my graph to MEMORY_AND_DISK_SER for the graph I read from edge file list -- Arpit Kumar Fourth Year Undergraduate Department of Computer Science and Engineering Indian Institute of Technology, Kharagpur -- Arpit Kumar Fourth Year Undergraduate Department of Computer Science and Engineering Indian Institute of Technology, Kharagpur
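A sketch of the second workaround suggested above (building the RDDs yourself and assembling the graph from them). This is untested on 1.0.0; the whitespace-separated edge-list format, the variable names sc/edgesFile, and the default attribute values are all assumptions:

```scala
import org.apache.spark.graphx._
import org.apache.spark.storage.StorageLevel

// Build the edge RDD yourself so its storage level is set before first use,
// then assemble the graph from it instead of going through GraphLoader.
val edges = sc.textFile(edgesFile).map { line =>
  val f = line.split("\\s+")
  Edge(f(0).toLong, f(1).toLong, 1)   // edge attribute 1 is arbitrary
}.persist(StorageLevel.MEMORY_AND_DISK_SER)

val graph = Graph.fromEdges(edges, defaultValue = 1)
```

Because persist is called before any action runs on the RDD, the "Cannot change storage level" exception from the quoted thread should not occur.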
how to send message to specific vertex by Pregel api
Hi, does anyone have a clue about sending messages to a specific vertex (not an immediate neighbour), whose vId is stored in a property of the source vertex, in the Pregel API? More precisely, how can this be done in sendMessage()? Is it possible to pass more general triplets into that function? (Obviously we can do it using basic Spark table operations (join, etc.), for instance as in [1].) [1] http://event.cwi.nl/grades2014/03-salihoglu.pdf Best, Yifan LI
Re: vertex active/inactive feature in Pregel API ?
Dear Ankur, Thanks! :) - From [1] and my understanding, the existing inactive feature in the GraphX Pregel API is: "if there are no in-edges, from an active vertex, to this vertex, then we say this vertex is inactive", right? For instance, take a graph in which every vertex has at least one in-edge, and run static PageRank on it for 10 iterations. During this computation, would any vertex be set inactive? - For more "explicit active vertex tracking", e.g. vote-to-halt, how can that be achieved in the existing API? (I am not sure I got the point of [2]; has that "vote" function already been introduced in the GraphX Pregel API?) Best, Yifan LI On 15 Sep 2014, at 23:07, Ankur Dave ankurd...@gmail.com wrote: At 2014-09-15 16:25:04 +0200, Yifan LI iamyifa...@gmail.com wrote: I am wondering if the vertex active/inactive (corresponding to the change of its value between two supersteps) feature is introduced in the Pregel API of GraphX? Vertex activeness in Pregel is controlled by messages: if a vertex did not receive a message in the previous iteration, its vertex program will not run in the current iteration. Also, inactive vertices will not be able to send messages because by default the sendMsg function will only be run on edges where at least one of the adjacent vertices received a message. You can change this behavior -- see the documentation for the activeDirection parameter to Pregel.apply [1]. There is also an open pull request to make active vertex tracking more explicit by allowing vertices to vote to halt directly [2]. Ankur [1] http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.graphx.Pregel$ [2] https://github.com/apache/spark/pull/1217
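For reference, the activeDirection knob Ankur mentions is passed in the first argument list of Pregel. A schematic sketch follows; the Double/Int attribute types and the three placeholder functions are illustrative assumptions, not from the thread:

```scala
import org.apache.spark.graphx._

// With EdgeDirection.Out, sendMsg runs only on edges whose *source* vertex
// received a message in the previous superstep.
def run(graph: Graph[Double, Int]): Graph[Double, Int] =
  Pregel(graph, initialMsg = 0.0,
         maxIterations = 10, activeDirection = EdgeDirection.Out)(
    vprog    = (id, attr, msg) => attr + msg,                  // vertex program
    sendMsg  = t => Iterator((t.dstId, t.srcAttr)),            // message per triplet
    mergeMsg = (a, b) => a + b                                 // combine messages
  )
```

A vertex that sends no messages to its neighbours effectively deactivates them for the next superstep, which is the implicit halting mechanism described in the reply.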
vertex active/inactive feature in Pregel API ?
Hi, I am wondering if the vertex active/inactive feature (corresponding to the change of its value between two supersteps) is introduced in the Pregel API of GraphX? If it is not a default setting, how do I enable it below? def sendMessage(edge: EdgeTriplet[(Int, HashMap[VertexId, Double]), Int]) = Iterator((edge.dstId, hmCal(edge.srcAttr))) Or should I do it with a customised measure function, e.g. by keeping track of the change in a vertex attribute after each iteration? I noticed that there is an optional parameter "skipStale" in the mrTriplets operator. Best, Yifan LI
Re: [GraphX] how to set memory configurations to avoid OutOfMemoryError GC overhead limit exceeded
Thank you, Ankur! :) But how do I assign the storage level to a new vertices RDD that is mapped from an existing vertices RDD? E.g. val newVertexRDD = graph.collectNeighborIds(EdgeDirection.Out).map{ case (id: VertexId, a: Array[VertexId]) => (id, initialHashMap(a)) } The new one will be combined with the existing edges RDD (MEMORY_AND_DISK) to construct a new graph, e.g. val newGraph = Graph(newVertexRDD, graph.edges) BTW, the return of newVertexRDD.getStorageLevel is StorageLevel(true, true, false, true, 1); what does that mean? Thanks in advance! Best, Yifan 2014-09-03 22:42 GMT+02:00 Ankur Dave ankurd...@gmail.com: At 2014-09-03 17:58:09 +0200, Yifan LI iamyifa...@gmail.com wrote: val graph = GraphLoader.edgeListFile(sc, edgesFile, minEdgePartitions = numPartitions).partitionBy(PartitionStrategy.EdgePartition2D).persist(StorageLevel.MEMORY_AND_DISK) Error: java.lang.UnsupportedOperationException: Cannot change storage level of an RDD after it was already assigned a level You have to pass the StorageLevel to GraphLoader.edgeListFile: val graph = GraphLoader.edgeListFile( sc, edgesFile, minEdgePartitions = numPartitions, edgeStorageLevel = StorageLevel.MEMORY_AND_DISK, vertexStorageLevel = StorageLevel.MEMORY_AND_DISK) .partitionBy(PartitionStrategy.EdgePartition2D) Ankur
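One way to pin the derived RDD's storage level is to call persist on it before it is first materialised. A sketch, where initialHashMap is the question's own helper (assumed in scope):

```scala
import org.apache.spark.graphx._
import org.apache.spark.storage.StorageLevel

// Persist must be called before the first action touches the RDD;
// afterwards the level can no longer be changed.
val newVertexRDD = graph.collectNeighborIds(EdgeDirection.Out)
  .map { case (id, nbrs) => (id, initialHashMap(nbrs)) }
  .persist(StorageLevel.MEMORY_AND_DISK)

val newGraph = Graph(newVertexRDD, graph.edges)
```

As for StorageLevel(true, true, false, true, 1): the flags read (useDisk, useMemory, useOffHeap, deserialized, replication), so that value is MEMORY_AND_DISK with a single replica.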
Re: [GraphX] how to set memory configurations to avoid OutOfMemoryError GC overhead limit exceeded
Hi Ankur, Thanks so much for your advice. But it failed when I tried to set the storage level in constructing a graph. val graph = GraphLoader.edgeListFile(sc, edgesFile, minEdgePartitions = numPartitions).partitionBy(PartitionStrategy.EdgePartition2D).persist(StorageLevel.MEMORY_AND_DISK) Error: java.lang.UnsupportedOperationException: Cannot change storage level of an RDD after it was already assigned a level Is there anyone could give me help? Best, Yifan 2014-08-18 23:52 GMT+02:00 Ankur Dave ankurd...@gmail.com: On Mon, Aug 18, 2014 at 6:29 AM, Yifan LI iamyifa...@gmail.com wrote: I am testing our application(similar to personalised page rank using Pregel, and note that each vertex property will need pretty much more space to store after new iteration) [...] But when we ran it on larger graph(e.g. LiveJouranl), it always end at the error GC overhead limit exceeded, even the partitions number is increased to 48 from 8. If the graph (including vertex properties) is too large to fit in memory, you might try allowing it to spill to disk. When constructing the graph, you can set vertexStorageLevel and edgeStorageLevel to StorageLevel.MEMORY_AND_DISK. This should allow the algorithm to finish. Ankur http://www.ankurdave.com/
Re: GraphX
Try this: ./bin/run-example graphx.LiveJournalPageRank edge_list_file… On Aug 2, 2014, at 5:55 PM, Deep Pradhan pradhandeep1...@gmail.com wrote: Hi, I am running Spark in a single node cluster. I am able to run the codes in Spark like SparkPageRank.scala, SparkKMeans.scala by the following command, bin/run-examples org.apache.spark.examples.SparkPageRank and the required things Now, I want to run the Pagerank.scala that is there in GraphX. Do we have a similar command like earlier one for this too? I also went through the run-example shell file and saw that the path is set to org.apache.spark.examples How should I run graphx codes? Thank You
[GraphX] how to compute only a subset of vertices in the whole graph?
Hi, I just implemented our algorithm (like personalised PageRank) using the Pregel API, and it seems to work well. But I am wondering whether I can compute only some selected vertices (hubs), rather than updating every vertex… Is it possible to do this using the Pregel API? Or, more realistically, could only hubs receive messages and compute results in the LAST iteration, since we don't need the final result of the non-hub vertices? Best, Yifan LI
the implications of some items in webUI
Hi, I am analysing application processing on Spark (GraphX), but I feel a little confused by some items in the web UI. 1) What is the difference between Duration (Stages -> Completed Stages) and Task Time (Executors)? For instance, 43 s vs. 5.6 min. Is Task Time approximately Duration multiplied by Total Tasks? 2) What are the exact meanings of Shuffle Read / Shuffle Write? Best, Yifan LI
Re: the default GraphX graph-partition strategy on multicore machine?
Thanks so much, Ankur! :)) Excuse me, but I am wondering (for a chosen partition strategy for my application): 1.1) How to check the size of each partition? Is there any API, or a log file? 1.2) How to check the processing cost of each partition (time, memory, etc.)? 2.1) And the global communication cost of my application under the chosen partition strategy? On Jul 18, 2014, at 9:18 PM, Ankur Dave ankurd...@gmail.com wrote: Sorry, I didn't read your vertex replication example carefully, so my previous answer is wrong. Here's the correct one: On Fri, Jul 18, 2014 at 9:13 AM, Yifan LI iamyifa...@gmail.com wrote: I don't understand. For instance, we have 3 edge partition tables (EA: a -> b, a -> c; EB: a -> d, a -> e; EC: d -> c) and 2 vertex partition tables (VA: a, b, c; VB: d, e). The whole vertex table VA will be replicated to all these 3 edge partitions, since each of them refers to some vertices in VA? Vertices can be replicated individually without requiring the entire vertex partition to be replicated. In this case, here's what will get replicated to each partition: EA: a (from VA), b (from VA), c (from VA) EB: a (from VA), d (from VB), e (from VB) EC: c (from VA), d (from VB) Ankur
Re: java.lang.OutOfMemoryError: GC overhead limit exceeded
Thanks, Abel. Best, Yifan LI On Jul 21, 2014, at 4:16 PM, Abel Coronado Iruegas acoronadoirue...@gmail.com wrote: Hi Yifan, this works for me: export SPARK_JAVA_OPTS="-Xms10g -Xmx40g -XX:MaxPermSize=10g" export ADD_JARS=/home/abel/spark/MLI/target/MLI-assembly-1.0.jar export SPARK_MEM=40g ./spark-shell Regards On Mon, Jul 21, 2014 at 7:48 AM, Yifan LI iamyifa...@gmail.com wrote: Hi, I am trying to load the GraphX example dataset (LiveJournal, 1.08 GB) through the Scala shell on my standalone multicore machine (8 CPUs, 16 GB memory), but an OutOfMemory error was returned when the code below was run: val graph = GraphLoader.edgeListFile(sc, path, minEdgePartitions = 16).partitionBy(PartitionStrategy.RandomVertexCut) I guess I should pass some parameters to the JVM, like -Xmx5120m, but how do I do this in the Scala shell? I started Spark directly with bin/spark-shell and everything seems to work correctly in the web UI. Or should I set these parameters somewhere else (Spark 1.0.1)? Best, Yifan LI
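A hedged alternative to the environment variables quoted above: in Spark 1.x the shell also accepts memory flags directly (they are forwarded to spark-submit; exact flag support depends on your Spark version, so treat this as a sketch):

```shell
# Set the driver JVM heap for the Scala shell (Spark 1.x style).
# --driver-memory is the documented successor to SPARK_MEM/SPARK_JAVA_OPTS;
# pick a value that fits your machine (here 8g for a 16 GB box).
./bin/spark-shell --driver-memory 8g
```

Note that on a 16 GB machine the 40 g values from the quoted reply would not fit; size the heap below physical memory.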
Re: the default GraphX graph-partition strategy on multicore machine?
Hi Ankur, Thanks so much! :)) Yes, is it possible to define a custom partition strategy? And some other questions (2*4-core machine, 24 GB memory): - If I load one edge file (5 GB) without any cores/partitions setting, what is the default partitioning during graph construction, and how many cores will be used? And what if the file is 50 GB (more than the available memory, without any partition setting)? - Regarding "each vertex must be replicated to all partitions where it is referenced": I don't understand; for instance, if we have 3 edge partitions (EA: a - b, a - c; EB: a - d, a - e; EC: d - c) and 2 vertex partitions (VA: a, b, c; VB: d, e), will the whole vertex table VA be replicated to all 3 edge partitions, since each of them refers to some vertices in VA? - Regarding "there is no shared-memory parallelism": do you mean that each core is restricted to accessing only its own partition in memory? How do cores communicate when the required data (edges?) is in another partition? On Jul 15, 2014, at 9:30 PM, Ankur Dave ankurd...@gmail.com wrote: On Jul 15, 2014, at 12:06 PM, Yifan LI iamyifa...@gmail.com wrote: Btw, is there any possibility to customise the partition strategy as we expect? I'm not sure I understand. Are you asking about defining a custom partition strategy? On Tue, Jul 15, 2014 at 6:20 AM, Yifan LI iamyifa...@gmail.com wrote: when I load the file using sc.textFile (minPartitions = 16, PartitionStrategy.RandomVertexCut) The easiest way to load the edge file would actually be to use GraphLoader.edgeListFile(sc, path, minEdgePartitions = 16).partitionBy(PartitionStrategy.RandomVertexCut). 1) how much data will be loaded into memory? The exact size of the graph (vertices + edges) in memory depends on the graph's structure, the partition function, and the average vertex degree, because each vertex must be replicated to all partitions where it is referenced. It's easier to estimate the size of just the edges, which I did on the mailing list a while ago.
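On the custom-partition-strategy question: in GraphX a strategy is an implementation of the PartitionStrategy trait, whose core method is getPartition(src, dst, numParts). The sketch below shows the shape of such a function without Spark on the classpath, hashing on the source vertex id the way EdgePartition1D does (the object name is hypothetical; the mixing prime is the constant GraphX uses for this purpose).

```scala
// Shape of a custom GraphX partition strategy, minus the trait itself:
// assign each edge to a partition by hashing its source vertex id, so all
// out-edges of a vertex land in the same partition (EdgePartition1D-style).
object CustomPartition {
  val mixingPrime: Long = 1125899906842597L // large prime to spread ids

  def getPartition(src: Long, dst: Long, numParts: Int): Int =
    (math.abs(src * mixingPrime) % numParts).toInt
}
```

In real GraphX code you would extend PartitionStrategy with this method and pass the instance to Graph.partitionBy.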
To summarize, during graph construction each edge takes 60 bytes, and once the graph is constructed it takes 20 bytes. 2) how many partitions will be stored in memory? Once you call cache() on the graph, all 16 partitions will be stored in memory. You can also tell GraphX to spill them to disk in case of memory pressure by passing edgeStorageLevel=StorageLevel.MEMORY_AND_DISK to GraphLoader.edgeListFile. 3) Will the thread/task on each core read only one edge from memory at a time and then compute on it? Yes, each task is single-threaded, so it will read the edges in its partition sequentially. 3.1) Which edges in memory will be read into the cache? In theory, each core should independently read the edges in its partition into its own cache. Cache issues should be much less of a concern than in most applications because different tasks (cores) operate on independent partitions; there is no shared-memory parallelism. The tradeoff is heavier reliance on shuffles. 3.2) How are those partitions scheduled? Spark handles the scheduling. There are details in Section 5.1 of the Spark NSDI paper; in short, tasks are scheduled to run on the same machine as their data partition, and by default each machine can accept at most one task per core. Ankur
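A back-of-envelope sketch of the 60-byte/20-byte figures quoted above (the constants come from the mailing-list discussion, not from measurement here; the helper is hypothetical and ignores vertex replication):

```scala
// Rough per-edge memory estimate for a GraphX edge RDD, using the quoted
// figures: ~60 bytes per edge during construction, ~20 bytes once built.
object EdgeMemoryEstimate {
  val constructionBytesPerEdge = 60L
  val builtBytesPerEdge = 20L

  def constructionGB(numEdges: Long): Double =
    numEdges * constructionBytesPerEdge / 1e9

  def builtGB(numEdges: Long): Double =
    numEdges * builtBytesPerEdge / 1e9
}
```

For example, a 1-billion-edge graph would need on the order of 60 GB while being constructed and around 20 GB afterwards, before counting the replicated vertices.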
Re: the default GraphX graph-partition strategy on multicore machine?
Dear Ankur, Thanks so much! Btw, is there any possibility to customise the partition strategy as we expect? Best, Yifan On Jul 11, 2014, at 10:20 PM, Ankur Dave ankurd...@gmail.com wrote: Hi Yifan, When you run Spark on a single machine, it uses a local mode where one task per core can be executed at a time -- that is, the level of parallelism is the same as the number of cores. To take advantage of this, when you load a file using sc.textFile, you should set the minPartitions argument to be the number of cores (available from sc.defaultParallelism) or a multiple thereof. This will split up your local edge file and allow you to take advantage of all the machine's cores. Once you've loaded the edge RDD with the appropriate number of partitions and constructed a graph using it, GraphX will leave the edge partitioning alone. During graph computation, each vertex will automatically be copied to the edge partitions where it is needed, and the computation will execute in parallel on each of the edge partitions (cores). If you later call Graph.partitionBy, it will by default preserve the number of edge partitions, but shuffle around the edges according to the partition strategy. This won't change the level of parallelism, but it might decrease the amount of inter-core communication. Hope that helps! By the way, do continue to post your GraphX questions to the Spark user list if possible. I'll probably still be the one answering them, but that way others can benefit as well. Ankur On Fri, Jul 11, 2014 at 3:05 AM, Yifan LI iamyifa...@gmail.com wrote: Hi Ankur, I am doing graph computation using GraphX on a single multicore machine (not a cluster), but it seems that I couldn't find enough docs on how GraphX partitions a graph on a multicore machine. Could you point me to an introduction or some docs? For instance, I have a single edge file (not on HDFS, etc.), which follows the "srcID, dstID, edgeProperties" format, maybe 100 MB or 500 GB in size.
and the latest Spark 1.0.0 (with GraphX) has been installed on a 64-bit, 8-CPU machine. I plan to implement my own algorithm. - By default, how is the edge data partitioned? To each CPU, or to each process? - If I later specify a partition strategy in partitionBy(), e.g. PartitionStrategy.EdgePartition2D, what will happen? Will it work? Thanks in advance! :) Best, Yifan LI Univ. Paris-Sud / Inria, Paris, France
GraphX: how to specify partition strategy?
Hi, I am doing graph computation using GraphX, but I ran into an error when specifying the graph partition strategy. As the GraphX programming guide says: "The Graph.partitionBy operator allows users to choose the graph partitioning strategy, but due to SPARK-1931, this method is broken in Spark 1.0.0. We encourage users to build the latest version of Spark from the master branch, which contains a fix. Alternatively, a workaround is to partition the edges before constructing the graph." My questions: - How do I build the latest version of Spark from the master branch, which contains the fix? - How do I specify other partition strategies, e.g. CanonicalRandomVertexCut, EdgePartition1D, EdgePartition2D, RandomVertexCut (listed in the Scala API docs, but it seems only EdgePartition2D is available? I am not sure about this!) - Is it possible to add my own partition strategy (hash function, etc.) to GraphX? Thanks in advance! :)) Best, Yifan
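A hedged, Spark-free simulation of the guide's workaround of "partitioning the edges before constructing the graph": bucket the edge list by a partition function up front, which mimics what calling partitionBy with a HashPartitioner on the edge RDD does before handing it to the Graph constructor. The object and parameter names are hypothetical illustrations.

```scala
// Bucket edges by a caller-supplied strategy (src, dst, numParts) => part,
// simulating pre-partitioning an edge RDD before graph construction.
object PrePartition {
  def partitionEdges(
      edges: Seq[(Long, Long)],
      numParts: Int,
      strategy: (Long, Long, Int) => Int): Map[Int, Seq[(Long, Long)]] =
    edges.groupBy { case (src, dst) => strategy(src, dst, numParts) }
}
```

Any of the strategies named above (EdgePartition1D, RandomVertexCut, a custom hash, ...) can be plugged in as the strategy function.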
Which Spark package (w.r.t. GraphX) should I install to do graph computation on a cluster?
Hi, I am planning to do graph (social network) computation on a cluster (Hadoop is already installed), but there is a pre-built package for Hadoop and I am not sure whether GraphX is included in it. Or should I install another released version (one that obviously includes GraphX)? Looking forward to your reply! :) Best, Yifan