unsubscribe

2023-08-11 Thread Yifan LI
unsubscribe


Re: [Spark] java.lang.IllegalArgumentException: Size exceeds Integer.MAX_VALUE

2015-10-30 Thread Yifan LI
Thanks Deng,

Yes, I agree that there is a partition larger than 2GB which caused this
exception.

But in my case, simply increasing the number of partitions in the sortBy
operation did not help.

I think the partitioning produced by sortBy is not balanced: in my dataset
there are lots of elements with the same value, and they all end up in a
single partition whose size becomes quite large.

So I called the repartition method after sorting to get a balanced
partitioning, and it works well.
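
(For reference, a minimal sketch of this workaround, assuming a spark-shell sc;
the key function and partition counts are only illustrative.)

// skewed data: many records share the same sort key
val rdd = sc.parallelize(1 to 1000000).map(i => (i % 10, i))

// sortBy range-partitions on the key, so heavily repeated keys can all land in
// one oversized partition
val sorted = rdd.sortBy(_._1, ascending = true, numPartitions = 200)

// an explicit repartition spreads the records evenly again (note that it
// reshuffles, so the global ordering across partitions is no longer guaranteed)
val balanced = sorted.repartition(200)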

On 30 October 2015 at 03:13, Deng Ching-Mallete <och...@apache.org> wrote:

> Hi Yifan,
>
> This is a known issue, please refer to
> https://issues.apache.org/jira/browse/SPARK-6235 for more details. In
> your case, it looks like you are caching to disk a partition > 2G. A
> workaround would be to increase the number of your RDD partitions in order
> to make them smaller in size.
>
> HTH,
> Deng
>
> On Thu, Oct 29, 2015 at 8:40 PM, Yifan LI <iamyifa...@gmail.com> wrote:
>
>> My guess is that before scanning that RDD, I sorted it and set the
>> partitioning, so the result is not balanced:
>>
>> sortBy[S](f: Function
>> <http://spark.apache.org/docs/latest/api/scala/org/apache/spark/api/java/function/Function.html>
>> [T, S], ascending: Boolean, *numPartitions*: Int)
>>
>> I will try to repartition it to see if it helps.
>>
>> Best,
>> Yifan LI
>>
>>
>>
>>
>>
>> On 29 Oct 2015, at 12:52, Yifan LI <iamyifa...@gmail.com> wrote:
>>
>> Hey,
>>
>> I was just trying to scan a large RDD sortedRdd, ~1 billion elements,
>> using the toLocalIterator api, but an exception was returned when it was
>> almost finished:
>>
>> java.lang.RuntimeException: java.lang.IllegalArgumentException: Size
>> exceeds Integer.MAX_VALUE
>> at sun.nio.ch.FileChannelImpl.map(FileChannelImpl.java:821)
>> at
>> org.apache.spark.storage.DiskStore$$anonfun$getBytes$2.apply(DiskStore.scala:125)
>> at
>> org.apache.spark.storage.DiskStore$$anonfun$getBytes$2.apply(DiskStore.scala:113)
>> at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1285)
>> at org.apache.spark.storage.DiskStore.getBytes(DiskStore.scala:127)
>> at org.apache.spark.storage.DiskStore.getBytes(DiskStore.scala:134)
>> at
>> org.apache.spark.storage.BlockManager.doGetLocal(BlockManager.scala:511)
>> at
>> org.apache.spark.storage.BlockManager.getBlockData(BlockManager.scala:302)
>> at
>> org.apache.spark.network.netty.NettyBlockRpcServer$$anonfun$2.apply(NettyBlockRpcServer.scala:57)
>> at
>> org.apache.spark.network.netty.NettyBlockRpcServer$$anonfun$2.apply(NettyBlockRpcServer.scala:57)
>> at
>> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
>> at
>> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
>> at
>> scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
>> at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108)
>> at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
>> at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:108)
>> at
>> org.apache.spark.network.netty.NettyBlockRpcServer.receive(NettyBlockRpcServer.scala:57)
>> at
>> org.apache.spark.network.server.TransportRequestHandler.processRpcRequest(TransportRequestHandler.java:114)
>> at
>> org.apache.spark.network.server.TransportRequestHandler.handle(TransportRequestHandler.java:87)
>> at
>> org.apache.spark.network.server.TransportChannelHandler.channelRead0(TransportChannelHandler.java:101)
>> at
>> org.apache.spark.network.server.TransportChannelHandler.channelRead0(TransportChannelHandler.java:51)
>> at
>> io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:105)
>> at
>> io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:333)
>> at
>> io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:319)
>> at
>> io.netty.handler.timeout.IdleStateHandler.channelRead(IdleStateHandler.java:254)
>> at
>> io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:333)
>> at
>> io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:319)
>> at
>> io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:103)
>> at
>> io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:333

[Spark] java.lang.IllegalArgumentException: Size exceeds Integer.MAX_VALUE

2015-10-29 Thread Yifan LI
(IdleStateHandler.java:254)
at 
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:333)
at 
io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:319)
at 
io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:103)
at 
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:333)
at 
io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:319)
at 
io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:163)
at 
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:333)
at 
io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:319)
at 
io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:787)
at 
io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:130)
at 
io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:511)
at 
io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:468)
at 
io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:382)
at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:354)
at 
io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:116)
at java.lang.Thread.run(Thread.java:745)

Driver stacktrace:
at 
org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1273)
at 
org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1264)
at 
org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1263)
at 
scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
at 
org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1263)
at 
org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:730)
at 
org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:730)
at scala.Option.foreach(Option.scala:236)
at 
org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:730)
at 
org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1457)
at 
org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1418)
at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)

Do you have any idea? 

I have set partitioning quite big, like 4


Best,
Yifan LI







Re: [Spark] java.lang.IllegalArgumentException: Size exceeds Integer.MAX_VALUE

2015-10-29 Thread Yifan LI
My guess is that before scanning that RDD, I sorted it and set the partitioning,
so the result is not balanced:

sortBy[S](f: Function 
<http://spark.apache.org/docs/latest/api/scala/org/apache/spark/api/java/function/Function.html>[T,
 S], ascending: Boolean, numPartitions: Int)

I will try to repartition it to see if it helps.

Best,
Yifan LI





> On 29 Oct 2015, at 12:52, Yifan LI <iamyifa...@gmail.com> wrote:
> 
> Hey,
> 
> I was just trying to scan a large RDD sortedRdd, ~1 billion elements, using
> the toLocalIterator api, but an exception was returned when it was almost finished:
> 
> java.lang.RuntimeException: java.lang.IllegalArgumentException: Size exceeds 
> Integer.MAX_VALUE
>   at sun.nio.ch.FileChannelImpl.map(FileChannelImpl.java:821)
>   at 
> org.apache.spark.storage.DiskStore$$anonfun$getBytes$2.apply(DiskStore.scala:125)
>   at 
> org.apache.spark.storage.DiskStore$$anonfun$getBytes$2.apply(DiskStore.scala:113)
>   at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1285)
>   at org.apache.spark.storage.DiskStore.getBytes(DiskStore.scala:127)
>   at org.apache.spark.storage.DiskStore.getBytes(DiskStore.scala:134)
>   at 
> org.apache.spark.storage.BlockManager.doGetLocal(BlockManager.scala:511)
>   at 
> org.apache.spark.storage.BlockManager.getBlockData(BlockManager.scala:302)
>   at 
> org.apache.spark.network.netty.NettyBlockRpcServer$$anonfun$2.apply(NettyBlockRpcServer.scala:57)
>   at 
> org.apache.spark.network.netty.NettyBlockRpcServer$$anonfun$2.apply(NettyBlockRpcServer.scala:57)
>   at 
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
>   at 
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
>   at 
> scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
>   at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108)
>   at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
>   at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:108)
>   at 
> org.apache.spark.network.netty.NettyBlockRpcServer.receive(NettyBlockRpcServer.scala:57)
>   at 
> org.apache.spark.network.server.TransportRequestHandler.processRpcRequest(TransportRequestHandler.java:114)
>   at 
> org.apache.spark.network.server.TransportRequestHandler.handle(TransportRequestHandler.java:87)
>   at 
> org.apache.spark.network.server.TransportChannelHandler.channelRead0(TransportChannelHandler.java:101)
>   at 
> org.apache.spark.network.server.TransportChannelHandler.channelRead0(TransportChannelHandler.java:51)
>   at 
> io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:105)
>   at 
> io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:333)
>   at 
> io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:319)
>   at 
> io.netty.handler.timeout.IdleStateHandler.channelRead(IdleStateHandler.java:254)
>   at 
> io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:333)
>   at 
> io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:319)
>   at 
> io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:103)
>   at 
> io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:333)
>   at 
> io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:319)
>   at 
> io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:163)
>   at 
> io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:333)
>   at 
> io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:319)
>   at 
> io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:787)
>   at 
> io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:130)
>   at 
> io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:511)
>   at 
> io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:468)
>   at 
> io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:382)
>   at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:354)
>   at 
> io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:116)
>   at java.lang.T

Re: java.lang.NegativeArraySizeException? as iterating a big RDD

2015-10-23 Thread Yifan LI
Thanks for your advice, Jem. :)

I will increase the partitioning and see if it helps.
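
(For reference, a minimal sketch of what I will try, assuming a spark-shell sc;
the data and the partition count are only illustrative.)

val sRdd = sc.parallelize(1L to 1000000L)                  // stand-in for the real ~962M-element dataset
val sorted = sRdd.sortBy(identity, numPartitions = 4096)   // more, smaller partitions => smaller per-task buffers for Kryo
sorted.toLocalIterator.take(5).foreach(println)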

Best,
Yifan LI





> On 23 Oct 2015, at 12:48, Jem Tucker <jem.tuc...@gmail.com> wrote:
> 
> Hi Yifan, 
> 
> I think this is a result of Kryo trying to serialize something too large. 
> Have you tried to increase your partitioning? 
> 
> Cheers,
> 
> Jem
> 
> On Fri, Oct 23, 2015 at 11:24 AM Yifan LI <iamyifa...@gmail.com 
> <mailto:iamyifa...@gmail.com>> wrote:
> Hi,
> 
> I have a big sorted RDD, sRdd (~962 million elements), and need to scan its 
> elements in order (using sRdd.toLocalIterator).
> 
> But the process failed after scanning around 893 million elements, returning 
> the following exception:
> 
> Anyone have an idea? Thanks!
> 
> 
> Exception in thread "main" org.apache.spark.SparkException: Job aborted due 
> to stage failure: Task 0 in stage 421752.0 failed 128 times, most recent 
> failure: Lost task 0.127 in stage 421752.0 (TID 17304, 
> small15-tap1.common.lip6.fr <http://small15-tap1.common.lip6.fr/>): 
> java.lang.NegativeArraySizeException
>   at 
> com.esotericsoftware.kryo.util.IdentityObjectIntMap.resize(IdentityObjectIntMap.java:409)
>   at 
> com.esotericsoftware.kryo.util.IdentityObjectIntMap.putStash(IdentityObjectIntMap.java:227)
>   at 
> com.esotericsoftware.kryo.util.IdentityObjectIntMap.push(IdentityObjectIntMap.java:221)
>   at 
> com.esotericsoftware.kryo.util.IdentityObjectIntMap.put(IdentityObjectIntMap.java:117)
>   at 
> com.esotericsoftware.kryo.util.IdentityObjectIntMap.putStash(IdentityObjectIntMap.java:228)
>   at 
> com.esotericsoftware.kryo.util.IdentityObjectIntMap.push(IdentityObjectIntMap.java:221)
>   at 
> com.esotericsoftware.kryo.util.IdentityObjectIntMap.put(IdentityObjectIntMap.java:117)
>   at 
> com.esotericsoftware.kryo.util.MapReferenceResolver.addWrittenObject(MapReferenceResolver.java:23)
>   at com.esotericsoftware.kryo.Kryo.writeReferenceOrNull(Kryo.java:598)
>   at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:566)
>   at com.twitter.chill.Tuple2Serializer.write(TupleSerializers.scala:36)
>   at com.twitter.chill.Tuple2Serializer.write(TupleSerializers.scala:33)
>   at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:568)
>   at 
> com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.write(DefaultArraySerializers.java:318)
>   at 
> com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.write(DefaultArraySerializers.java:293)
>   at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:568)
>   at 
> org.apache.spark.serializer.KryoSerializerInstance.serialize(KryoSerializer.scala:250)
>   at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:236)
>   at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>   at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>   at java.lang.Thread.run(Thread.java:745)
> 
> Driver stacktrace:
>   at 
> org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1273)
>   at 
> org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1264)
>   at 
> org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1263)
>   at 
> scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
>   at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
>   at 
> org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1263)
>   at 
> org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:730)
>   at 
> org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:730)
>   at scala.Option.foreach(Option.scala:236)
>   at 
> org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:730)
>   at 
> org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1457)
>   at 
> org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1418)
>   at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
> 
> Best,
> Yifan LI
> 
> 
> 
> 
> 



java.lang.NegativeArraySizeException? as iterating a big RDD

2015-10-23 Thread Yifan LI
Hi,

I have a big sorted RDD, sRdd (~962 million elements), and need to scan its 
elements in order (using sRdd.toLocalIterator).

But the process failed after scanning around 893 million elements, returning 
the following exception:

Anyone have an idea? Thanks!


Exception in thread "main" org.apache.spark.SparkException: Job aborted due to 
stage failure: Task 0 in stage 421752.0 failed 128 times, most recent failure: 
Lost task 0.127 in stage 421752.0 (TID 17304, small15-tap1.common.lip6.fr): 
java.lang.NegativeArraySizeException
at 
com.esotericsoftware.kryo.util.IdentityObjectIntMap.resize(IdentityObjectIntMap.java:409)
at 
com.esotericsoftware.kryo.util.IdentityObjectIntMap.putStash(IdentityObjectIntMap.java:227)
at 
com.esotericsoftware.kryo.util.IdentityObjectIntMap.push(IdentityObjectIntMap.java:221)
at 
com.esotericsoftware.kryo.util.IdentityObjectIntMap.put(IdentityObjectIntMap.java:117)
at 
com.esotericsoftware.kryo.util.IdentityObjectIntMap.putStash(IdentityObjectIntMap.java:228)
at 
com.esotericsoftware.kryo.util.IdentityObjectIntMap.push(IdentityObjectIntMap.java:221)
at 
com.esotericsoftware.kryo.util.IdentityObjectIntMap.put(IdentityObjectIntMap.java:117)
at 
com.esotericsoftware.kryo.util.MapReferenceResolver.addWrittenObject(MapReferenceResolver.java:23)
at com.esotericsoftware.kryo.Kryo.writeReferenceOrNull(Kryo.java:598)
at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:566)
at com.twitter.chill.Tuple2Serializer.write(TupleSerializers.scala:36)
at com.twitter.chill.Tuple2Serializer.write(TupleSerializers.scala:33)
at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:568)
at 
com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.write(DefaultArraySerializers.java:318)
at 
com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.write(DefaultArraySerializers.java:293)
at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:568)
at 
org.apache.spark.serializer.KryoSerializerInstance.serialize(KryoSerializer.scala:250)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:236)
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)

Driver stacktrace:
at 
org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1273)
at 
org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1264)
at 
org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1263)
at 
scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
at 
org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1263)
at 
org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:730)
at 
org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:730)
at scala.Option.foreach(Option.scala:236)
at 
org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:730)
at 
org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1457)
at 
org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1418)
at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)

Best,
Yifan LI







"dynamically" sort a large collection?

2015-10-12 Thread Yifan LI
Hey,

I need to scan a large "key-value" collection as below:

1) sort it on an attribute of the “value”
2) scan it one by one, starting from the element with the largest value
2.1) if the current element matches a pre-defined condition, its value is 
reduced and the element is inserted back into the collection; 
if not, the current element is removed from the collection.


In my previous program, step 1) was easy to do in Spark (an RDD operation), 
but I am not sure how to do step 2.1), especially the “insert back” 
operation on a sorted RDD.
I have tried creating a new RDD every time an element needs to be inserted 
back, but it is very costly due to the re-sorting…


Does anyone have some ideas?

Thanks so much!

**
an example:

the sorted result of initial collection C(on bold value), sortedC:
(1, (71, “aaa"))
(2, (60, “bbb"))
(3, (53.5, “ccc”))
(4, (48, “ddd”))
(5, (29, “eee"))
…

pre-condition: its_value % 2 == 0
if the pre-condition is matched, its value is reduced by half.

Thus:

#1:
71 is not matched, so this element is removed.
(1, (71, “aaa”)) —> removed!
(2, (60, “bbb"))
(3, (53.5, “ccc”))
(4, (48, “ddd”))
(5, (29, “eee"))
…

#2:
60 is matched! 60/2 = 30, the collection right now should be as:
(3, (53.5, “ccc”))
(4, (48, “ddd”))
(2, (30, “bbb”)) <— inserted back here
(5, (29, “eee"))
…






Best,
Yifan LI







Re: "dynamically" sort a large collection?

2015-10-12 Thread Yifan LI
Hey Adrian,

Thanks for your fast reply. :)

Actually the “pre-condition” is not fixed in the real application, e.g. it 
changes based on the count of previously unmatched elements.
So I need an iterator-style operator, rather than flatMap-like operators…

Besides, do you have any idea how to avoid that “sort again”? It is too 
costly… :(

Anyway thank you again!

Best,
Yifan LI





> On 12 Oct 2015, at 12:19, Adrian Tanase <atan...@adobe.com> wrote:
> 
> I think you’re looking for the flatMap (or flatMapValues) operator – you can 
> do something like
> 
> sortedRdd.flatMapValues( v =>
>   if (v % 2 == 0) {
>     Some(v / 2)
>   } else {
>     None
>   }
> )
> 
> Then you need to sort again.
> 
> -adrian
> 
> From: Yifan LI
> Date: Monday, October 12, 2015 at 1:03 PM
> To: spark users
> Subject: "dynamically" sort a large collection?
> 
> Hey,
> 
> I need to scan a large "key-value" collection as below:
> 
> 1) sort it on an attribute of “value” 
> 2) scan it one by one, from element with largest value
> 2.1) if the current element matches a pre-defined condition, its value will 
> be reduced and the element will be inserted back to collection. 
> if not, this current element should be removed from collection.
> 
> 
> In my previous program, the 1) step can be easily conducted in Spark(RDD 
> operation), but I am not sure how to do 2.1) step, esp. the “put/inserted 
> back” operation on a sorted RDD.
> I have tried to make a new RDD at every-time an element was found to 
> inserted, but it is very costly due to a re-sorting…
> 
> 
> Is there anyone having some ideas?
> 
> Thanks so much!
> 
> **
> an example:
> 
> the sorted result of initial collection C(on bold value), sortedC:
> (1, (71, “aaa"))
> (2, (60, “bbb"))
> (3, (53.5, “ccc”))
> (4, (48, “ddd”))
> (5, (29, “eee"))
> …
> 
> pre-condition: its_value%2 == 0
> if pre-condition is matched, its value will be reduce on half.
> 
> Thus:
> 
> #1:
> 71 is not matched, so this element is removed.
> (1, (71, “aaa”)) —> removed!
> (2, (60, “bbb"))
> (3, (53.5, “ccc”))
> (4, (48, “ddd”))
> (5, (29, “eee"))
> …
> 
> #2:
> 60 is matched! 60/2 = 30, the collection right now should be as:
> (3, (53.5, “ccc”))
> (4, (48, “ddd”))
> (2, (30, “bbb”)) <— inserted back here
> (5, (29, “eee"))
> …
> 
> 
> 
> 
> 
> 
> Best,
> Yifan LI
> 
> 
> 
> 
> 



Re: "dynamically" sort a large collection?

2015-10-12 Thread Yifan LI
Shiwei, yes, you might be right. Thanks. :)
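
(For what it's worth, a minimal local sketch of the heap idea Shiwei mentions
below, assuming the whole collection fits in driver memory; the values and the
halving rule are taken from the example earlier in this thread.)

import scala.collection.mutable.PriorityQueue

// max-heap ordered on the numeric value
val heap = PriorityQueue(
  (1, (71.0, "aaa")), (2, (60.0, "bbb")), (3, (53.5, "ccc")),
  (4, (48.0, "ddd")), (5, (29.0, "eee"))
)(Ordering.by[(Int, (Double, String)), Double](_._2._1))

while (heap.nonEmpty) {
  val (id, (value, tag)) = heap.dequeue()    // current largest value
  if (value % 2 == 0)                        // pre-condition matched:
    heap.enqueue((id, (value / 2, tag)))     //   halve it and insert it back
  // otherwise the element is simply dropped
}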

Best,
Yifan LI





> On 12 Oct 2015, at 12:55, 郭士伟 <guoshi...@gmail.com> wrote:
> 
> I think this is not a problem Spark can solve effectively, because RDDs are 
> immutable. Every time you want to change an RDD, you create a new one and 
> sort again. Maybe HBase or some other DB system would be a more suitable 
> solution. Or, if the data can fit into memory, a simple heap will work.
> 
> 
> 2015-10-12 18:29 GMT+08:00 Yifan LI <iamyifa...@gmail.com 
> <mailto:iamyifa...@gmail.com>>:
> Hey Adrian,
> 
> Thanks for your fast reply. :)
> 
> Actually the “pre-condition” is not fixed in real application, e.g. it would 
> change based on counting of previous unmatched elements.
> So I need to use iterator operator, rather than flatMap-like operators…
> 
> Besides, do you have any idea on how to avoid that “sort again”? it is too 
> costly… :(
> 
> Anyway thank you again!
> 
> Best,
> Yifan LI
> 
> 
> 
> 
> 
>> On 12 Oct 2015, at 12:19, Adrian Tanase <atan...@adobe.com 
>> <mailto:atan...@adobe.com>> wrote:
>> 
>> I think you’re looking for the flatMap (or flatMapValues) operator – you can 
>> do something like
>> 
>> sortedRdd.flatMapValues( v =>
>>   if (v % 2 == 0) {
>>     Some(v / 2)
>>   } else {
>>     None
>>   }
>> )
>> 
>> Then you need to sort again.
>> 
>> -adrian
>> 
>> From: Yifan LI
>> Date: Monday, October 12, 2015 at 1:03 PM
>> To: spark users
>> Subject: "dynamically" sort a large collection?
>> 
>> Hey,
>> 
>> I need to scan a large "key-value" collection as below:
>> 
>> 1) sort it on an attribute of “value” 
>> 2) scan it one by one, from element with largest value
>> 2.1) if the current element matches a pre-defined condition, its value will 
>> be reduced and the element will be inserted back to collection. 
>> if not, this current element should be removed from collection.
>> 
>> 
>> In my previous program, the 1) step can be easily conducted in Spark(RDD 
>> operation), but I am not sure how to do 2.1) step, esp. the “put/inserted 
>> back” operation on a sorted RDD.
>> I have tried to make a new RDD at every-time an element was found to 
>> inserted, but it is very costly due to a re-sorting…
>> 
>> 
>> Is there anyone having some ideas?
>> 
>> Thanks so much!
>> 
>> **
>> an example:
>> 
>> the sorted result of initial collection C(on bold value), sortedC:
>> (1, (71, “aaa"))
>> (2, (60, “bbb"))
>> (3, (53.5, “ccc”))
>> (4, (48, “ddd”))
>> (5, (29, “eee"))
>> …
>> 
>> pre-condition: its_value%2 == 0
>> if pre-condition is matched, its value will be reduce on half.
>> 
>> Thus:
>> 
>> #1:
>> 71 is not matched, so this element is removed.
>> (1, (71, “aaa”)) —> removed!
>> (2, (60, “bbb"))
>> (3, (53.5, “ccc”))
>> (4, (48, “ddd”))
>> (5, (29, “eee"))
>> …
>> 
>> #2:
>> 60 is matched! 60/2 = 30, the collection right now should be as:
>> (3, (53.5, “ccc”))
>> (4, (48, “ddd”))
>> (2, (30, “bbb”)) <— inserted back here
>> (5, (29, “eee"))
>> …
>> 
>> 
>> 
>> 
>> 
>> 
>> Best,
>> Yifan LI
>> 
>> 
>> 
>> 
>> 
> 
> 



Re: Master dies after program finishes normally

2015-06-26 Thread Yifan LI
Hi,

I just encountered the same problem when running a PageRank program which has 
lots of stages (iterations)…

The master was lost after my program finished.

And the issue still remains even after I increased the driver memory.


Any idea? e.g. how to increase the master memory?
Thanks.



Best,
Yifan LI





 On 12 Feb 2015, at 20:05, Arush Kharbanda ar...@sigmoidanalytics.com wrote:
 
 What is your cluster configuration? Did you try looking at the Web UI? There 
 are many tips here 
 
 http://spark.apache.org/docs/1.2.0/tuning.html 
 http://spark.apache.org/docs/1.2.0/tuning.html
 
 Did you try these?
 
 On Fri, Feb 13, 2015 at 12:09 AM, Manas Kar manasdebashis...@gmail.com 
 mailto:manasdebashis...@gmail.com wrote:
 Hi, 
  I have a Hidden Markov Model running with 200MB data.
  Once the program finishes (i.e. all stages/jobs are done) the program hangs 
 for 20 minutes or so before killing master.
 
 In the spark master the following log appears.
 
 2015-02-12 13:00:05,035 ERROR akka.actor.ActorSystemImpl: Uncaught fatal 
 error from thread [sparkMaster-akka.actor.default-dispatcher-31] shutting 
 down ActorSystem [sparkMaster]
 java.lang.OutOfMemoryError: GC overhead limit exceeded
 at scala.collection.immutable.List$.newBuilder(List.scala:396)
 at 
 scala.collection.generic.GenericTraversableTemplate$class.genericBuilder(GenericTraversableTemplate.scala:69)
 at 
 scala.collection.AbstractTraversable.genericBuilder(Traversable.scala:105)
 at 
 scala.collection.generic.GenTraversableFactory$GenericCanBuildFrom.apply(GenTraversableFactory.scala:58)
 at 
 scala.collection.generic.GenTraversableFactory$GenericCanBuildFrom.apply(GenTraversableFactory.scala:53)
 at 
 scala.collection.TraversableLike$class.builder$1(TraversableLike.scala:239)
 at 
 scala.collection.TraversableLike$class.map(TraversableLike.scala:243)
 at scala.collection.AbstractTraversable.map(Traversable.scala:105)
 at 
 org.json4s.MonadicJValue$$anonfun$org$json4s$MonadicJValue$$findDirectByName$1.apply(MonadicJValue.scala:26)
 at 
 org.json4s.MonadicJValue$$anonfun$org$json4s$MonadicJValue$$findDirectByName$1.apply(MonadicJValue.scala:22)
 at 
 scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251)
 at 
 scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251)
 at scala.collection.immutable.List.foreach(List.scala:318)
 at 
 scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:251)
 at scala.collection.AbstractTraversable.flatMap(Traversable.scala:105)
 at 
 org.json4s.MonadicJValue.org$json4s$MonadicJValue$$findDirectByName(MonadicJValue.scala:22)
 at org.json4s.MonadicJValue.$bslash(MonadicJValue.scala:16)
 at 
 org.apache.spark.util.JsonProtocol$.taskStartFromJson(JsonProtocol.scala:450)
 at 
 org.apache.spark.util.JsonProtocol$.sparkEventFromJson(JsonProtocol.scala:423)
 at 
 org.apache.spark.scheduler.ReplayListenerBus$$anonfun$replay$2$$anonfun$apply$1.apply(ReplayListenerBus.scala:71)
 at 
 org.apache.spark.scheduler.ReplayListenerBus$$anonfun$replay$2$$anonfun$apply$1.apply(ReplayListenerBus.scala:69)
 at scala.collection.Iterator$class.foreach(Iterator.scala:727)
 at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
 at 
 org.apache.spark.scheduler.ReplayListenerBus$$anonfun$replay$2.apply(ReplayListenerBus.scala:69)
 at 
 org.apache.spark.scheduler.ReplayListenerBus$$anonfun$replay$2.apply(ReplayListenerBus.scala:55)
 at 
 scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
 at 
 scala.collection.mutable.WrappedArray.foreach(WrappedArray.scala:34)
 at 
 org.apache.spark.scheduler.ReplayListenerBus.replay(ReplayListenerBus.scala:55)
 at 
 org.apache.spark.deploy.master.Master.rebuildSparkUI(Master.scala:726)
 at 
 org.apache.spark.deploy.master.Master.removeApplication(Master.scala:675)
 at 
 org.apache.spark.deploy.master.Master.finishApplication(Master.scala:653)
 at 
 org.apache.spark.deploy.master.Master$$anonfun$receiveWithLogging$1$$anonfun$applyOrElse$29.apply(Master.scala:399)
 
 Can anyone help?
 
 ..Manas
 
 
 
 -- 
 Arush Kharbanda || Technical Teamlead
 
 ar...@sigmoidanalytics.com mailto:ar...@sigmoidanalytics.com || 
 www.sigmoidanalytics.com http://www.sigmoidanalytics.com/


large shuffling = executor lost?

2015-06-04 Thread Yifan LI
Hi,

I am running my graphx application with Spark 1.3.1 on a small cluster. It 
failed with this exception:

org.apache.spark.shuffle.MetadataFetchFailedException: Missing an output 
location for shuffle 1
 
But actually I found it is indeed caused by “ExecutorLostFailure”, and someone 
told me it might be because there was a large shuffle…

Does anyone have an idea how to fix it? Thanks in advance!




Best,
Yifan LI







com.esotericsoftware.kryo.KryoException: java.io.IOException: Stream is corrupted

2015-05-13 Thread Yifan LI
Hi,

I was running our graphx application (which worked fine on Spark 1.2.0) but it 
failed on Spark 1.3.1 with the exception below.

Anyone have an idea on this issue? I guess it was caused by using the LZ4 codec?

Exception in thread main org.apache.spark.SparkException: Job aborted due to 
stage failure: Task 54 in stage 7.6 failed 128 times, most recent failure: Lost 
task 54.127 in stage 7.6 (TID 5311, small15-tap1.common.lip6.fr): 
com.esotericsoftware.kryo.KryoException: java.io.IOException: Stream is 
corrupted
at com.esotericsoftware.kryo.io.Input.fill(Input.java:142)
at com.esotericsoftware.kryo.io.Input.require(Input.java:155)
at com.esotericsoftware.kryo.io.Input.readInt(Input.java:337)
at 
com.esotericsoftware.kryo.util.DefaultClassResolver.readClass(DefaultClassResolver.java:109)
at com.esotericsoftware.kryo.Kryo.readClass(Kryo.java:610)
at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:721)
at 
org.apache.spark.serializer.KryoDeserializationStream.readObject(KryoSerializer.scala:138)
at 
org.apache.spark.serializer.DeserializationStream$$anon$1.getNext(Serializer.scala:133)
at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:71)
at 
org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:32)
at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
at 
org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:32)
at 
org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:39)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
at scala.collection.Iterator$class.foreach(Iterator.scala:727)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
at 
org.apache.spark.graphx.impl.ShippableVertexPartition$.apply(ShippableVertexPartition.scala:60)
at 
org.apache.spark.graphx.VertexRDD$$anonfun$2.apply(VertexRDD.scala:300)
at 
org.apache.spark.graphx.VertexRDD$$anonfun$2.apply(VertexRDD.scala:297)
at 
org.apache.spark.rdd.ZippedPartitionsRDD2.compute(ZippedPartitionsRDD.scala:88)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:70)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:242)
at 
org.apache.spark.rdd.ZippedPartitionsRDD2.compute(ZippedPartitionsRDD.scala:88)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
at 
org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
at 
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:68)
at 
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
at org.apache.spark.scheduler.Task.run(Task.scala:64)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203)
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.io.IOException: Stream is corrupted
at 
net.jpountz.lz4.LZ4BlockInputStream.refill(LZ4BlockInputStream.java:152)
at 
net.jpountz.lz4.LZ4BlockInputStream.read(LZ4BlockInputStream.java:116)
at com.esotericsoftware.kryo.io.Input.fill(Input.java:140)
... 35 more

Driver stacktrace:
at 
org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1204)
at 
org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1193)
at 
org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1192)
at 
scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
at 
org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1192)
at 
org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:693)
at 
org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:693)
at scala.Option.foreach(Option.scala:236)
at 
org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:693)
at 
org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1393)
at 
org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1354)
at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48) 

Best,
Yifan LI







applications are still in progress?

2015-05-13 Thread Yifan LI
Hi,

I have some applications that finished (but actually failed earlier), and in the 
WebUI they show
Application myApp is still in progress.

and, in the eventlog folder, there are several log files like this:

app-20150512***.inprogress

So, I am wondering what the “inprogress” suffix means…

Thanks! :)


Best,
Yifan LI







No space left on device??

2015-05-06 Thread Yifan LI
Hi,

I am running my graphx application on Spark, but it failed because of an error 
on one executor node (on which the available hdfs space is small): “no space 
left on device”.

I can understand why it happened: my vertex(-attribute) rdd was becoming bigger 
and bigger during computation…, so at some point the space requested on that 
node may have been bigger than the available space.

But, is there any way to avoid this kind of error? I am sure that the overall 
disk space of all nodes is enough for my application.

Thanks in advance!



Best,
Yifan LI







Re: No space left on device??

2015-05-06 Thread Yifan LI
Thanks, Shao. :-)

I am wondering if Spark will rebalance the storage overhead at runtime… since 
there is still some available space on the other nodes.
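
(For reference, a minimal sketch of the spark.local.dir setting Saisai describes
in the quote below; the paths are only illustrative.)

import org.apache.spark.SparkConf

val conf = new SparkConf()
  // comma-separated list of scratch directories on different disks
  .set("spark.local.dir", "/disk1/spark-tmp,/disk2/spark-tmp")
// note: on Standalone/Mesos this is overridden by SPARK_LOCAL_DIRS, on YARN by LOCAL_DIRS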


Best,
Yifan LI





 On 06 May 2015, at 14:57, Saisai Shao sai.sai.s...@gmail.com wrote:
 
 I think you could configure multiple disks through spark.local.dir, default 
 is /tmp. Anyway if your intermediate data is larger than available disk 
 space, still will meet this issue.
 
 spark.local.dir   /tmpDirectory to use for scratch space in Spark, 
 including map output files and RDDs that get stored on disk. This should be 
 on a fast, local disk in your system. It can also be a comma-separated list 
 of multiple directories on different disks. NOTE: In Spark 1.0 and later this 
 will be overriden by SPARK_LOCAL_DIRS (Standalone, Mesos) or LOCAL_DIRS 
 (YARN) environment variables set by the cluster manager.
 
 2015-05-06 20:35 GMT+08:00 Yifan LI iamyifa...@gmail.com 
 mailto:iamyifa...@gmail.com:
 Hi,
 
 I am running my graphx application on Spark, but it failed since there is an 
 error on one executor node(on which available hdfs space is small) that “no 
 space left on device”.
 
 I can understand why it happened, because my vertex(-attribute) rdd was 
 becoming bigger and bigger during computation…, so maybe sometime the request 
 on that node was too bigger than available space.
 
 But, is there any way to avoid this kind of error? I am sure that the overall 
 disk space of all nodes is enough for my application.
 
 Thanks in advance!
 
 
 
 Best,
 Yifan LI
 
 
 
 
 
 



Re: No space left on device??

2015-05-06 Thread Yifan LI
Yes, you are right. For now I have to say the workload/executors are distributed 
evenly… so, as you said, it is difficult to improve the situation.

However, do you have any idea of how to make a *skewed* data/executor distribution?



Best,
Yifan LI





 On 06 May 2015, at 15:13, Saisai Shao sai.sai.s...@gmail.com wrote:
 
 I think it depends on your workload and executor distribution, if your 
 workload is evenly distributed without any big data skew, and executors are 
 evenly distributed on each nodes, the storage usage of each node is nearly 
 the same. Spark itself cannot rebalance the storage overhead as you mentioned.
 
 2015-05-06 21:09 GMT+08:00 Yifan LI iamyifa...@gmail.com 
 mailto:iamyifa...@gmail.com:
 Thanks, Shao. :-)
 
 I am wondering if the spark will rebalance the storage overhead in 
 runtime…since still there is some available space on other nodes.
 
 
 Best,
 Yifan LI
 
 
 
 
 
 On 06 May 2015, at 14:57, Saisai Shao sai.sai.s...@gmail.com 
 mailto:sai.sai.s...@gmail.com wrote:
 
 I think you could configure multiple disks through spark.local.dir, default 
 is /tmp. Anyway if your intermediate data is larger than available disk 
 space, still will meet this issue.
 
 spark.local.dir  /tmpDirectory to use for scratch space in Spark, 
 including map output files and RDDs that get stored on disk. This should be 
 on a fast, local disk in your system. It can also be a comma-separated list 
 of multiple directories on different disks. NOTE: In Spark 1.0 and later 
 this will be overriden by SPARK_LOCAL_DIRS (Standalone, Mesos) or LOCAL_DIRS 
 (YARN) environment variables set by the cluster manager.
 
 2015-05-06 20:35 GMT+08:00 Yifan LI iamyifa...@gmail.com 
 mailto:iamyifa...@gmail.com:
 Hi,
 
 I am running my graphx application on Spark, but it failed since there is an 
 error on one executor node(on which available hdfs space is small) that “no 
 space left on device”.
 
 I can understand why it happened, because my vertex(-attribute) rdd was 
 becoming bigger and bigger during computation…, so maybe sometime the 
 request on that node was too bigger than available space.
 
 But, is there any way to avoid this kind of error? I am sure that the 
 overall disk space of all nodes is enough for my application.
 
 Thanks in advance!
 
 
 
 Best,
 Yifan LI
 
 
 
 
 
 
 
 



to split an RDD to multiple ones?

2015-05-02 Thread Yifan LI
Hi,

I have an RDD srdd containing (unordered-)data like this:
s1_0, s3_0, s2_1, s2_2, s3_1, s1_3, s1_2, …

What I want is (it will be much better if they could be in ascending order):
srdd_s1:
s1_0, s1_1, s1_2, …, s1_n
srdd_s2:
s2_0, s2_1, s2_2, …, s2_n
srdd_s3:
s3_0, s3_1, s3_2, …, s3_n
…
…

Have any idea? Thanks in advance! :)


Best,
Yifan LI







Re: to split an RDD to multiple ones?

2015-05-02 Thread Yifan LI
Thanks, Olivier and Franz. :)


Best,
Yifan LI





 On 02 May 2015, at 23:23, Olivier Girardot ssab...@gmail.com wrote:
 
 I guess : 
 
 val srdd_s1 = srdd.filter(_.startsWith("s1_")).sortBy(identity)
 val srdd_s2 = srdd.filter(_.startsWith("s2_")).sortBy(identity)
 val srdd_s3 = srdd.filter(_.startsWith("s3_")).sortBy(identity)
 
 Regards, 
 
 Olivier.
 
 Le sam. 2 mai 2015 à 22:53, Yifan LI iamyifa...@gmail.com 
 mailto:iamyifa...@gmail.com a écrit :
 Hi,
 
 I have an RDD srdd containing (unordered-)data like this:
 s1_0, s3_0, s2_1, s2_2, s3_1, s1_3, s1_2, …
 
 What I want is (it will be much better if they could be in ascending order):
 srdd_s1:
 s1_0, s1_1, s1_2, …, s1_n
 srdd_s2:
 s2_0, s2_1, s2_2, …, s2_n
 srdd_s3:
 s3_0, s3_1, s3_2, …, s3_n
 …
 …
 
 Have any idea? Thanks in advance! :)
 
 
 Best,
 Yifan LI
 
 
 
 
 



How to avoid the repartitioning in graph construction

2015-03-27 Thread Yifan LI
Hi,

Now I have 10 edge data files in my HDFS directory, e.g. edges_part00, 
edges_part01, …, edges_part09
format: srcId tarId
(They make a good partitioning of the whole graph, so I don't want any 
change (re-partitioning operations) on them during graph building.)



I am thinking of how to use them to construct a graph with the GraphX API, 
without any repartitioning.

My idea:
1) build an RDD, edgeTupleRDD, using sc.textFile(“hdfs://myDirectory”), 
where each file is kept below 64MB (smaller than a HDFS block), 
so normally I would get 1 partition per file, right?

2) then build the graph using Graph.fromEdgeTuples(edgeTupleRDD, ..); 
according to the GraphX documentation, this operation will keep those 
partitions without any change, right?
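
A minimal sketch of steps 1) and 2), assuming a spark-shell sc and the directory
name above:

import org.apache.spark.graphx.{Graph, VertexId}

val edgeTuples = sc.textFile("hdfs://myDirectory")        // ideally one partition per (sub-block-sized) file
  .map { line =>
    val Array(src, dst) = line.split("\\s+")
    (src.toLong: VertexId, dst.toLong: VertexId)
  }

// leaving uniqueEdges as its default (None) means no PartitionStrategy is applied,
// so the partitioning of edgeTuples is kept as-is
val graph = Graph.fromEdgeTuples(edgeTuples, defaultValue = 1)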

——— — 
- Is there any other idea, or anything I missed?
- If a file is larger than 64MB (the default size of a HDFS block), is 
repartitioning inevitable?



Thanks in advance!

Best,
Yifan LI







Re: Processing graphs

2015-02-17 Thread Yifan LI
Hi Kannan,

I am not sure I have understood exactly what your question is, but maybe the 
reduceByKey or reduceByKeyLocally functionality is a better fit for your need.
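
(A minimal sketch of what I mean, assuming a spark-shell sc and an edge list of
(srcId, dstId) pairs; the data is only illustrative.)

val edges = sc.parallelize(Seq((1L, 2L), (1L, 3L), (2L, 3L)))

// key = source vertex, value = set of its outgoing edges; reduceByKey merges the
// sets per key on each executor before shuffling, unlike a plain groupBy
val outEdges = edges
  .map { case (src, dst) => (src, Set(dst)) }
  .reduceByKey(_ ++ _)

outEdges.collect().foreach(println)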



Best,
Yifan LI





 On 17 Feb 2015, at 17:37, Vijayasarathy Kannan kvi...@vt.edu wrote:
 
 Hi,
 
 I am working on a Spark application that processes graphs and I am trying to 
 do the following.
 
 - group the vertices (key - vertex, value - set of its outgoing edges)
 - distribute each key to separate processes and process them (like mapper)
 - reduce the results back at the main process
 
 Does the groupBy functionality do the distribution by default?
 Do we have to explicitly use RDDs to enable automatic distribution?
 
 It'd be great if you could help me understand these and how to go about with 
 the problem.
 
 Thanks.



Re: OutofMemoryError: Java heap space

2015-02-12 Thread Yifan LI
Thanks, Kelvin :)

The error seems to disappear after I decreased both 
spark.storage.memoryFraction and spark.shuffle.memoryFraction to 0.2,

plus a small increase in driver memory.
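
(For reference, a minimal sketch of those settings as SparkConf entries; the
values are the ones mentioned in this thread, and the Kryo buffer value is only
a guess.)

import org.apache.spark.SparkConf

val conf = new SparkConf()
  .set("spark.storage.memoryFraction", "0.2")
  .set("spark.shuffle.memoryFraction", "0.2")
  .set("spark.kryoserializer.buffer.max.mb", "512")  // Kelvin's suggestion below; the value is illustrative
// the driver memory itself is usually raised via --driver-memory when submitting the job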

 

Best,
Yifan LI





 On 10 Feb 2015, at 18:58, Kelvin Chu 2dot7kel...@gmail.com wrote:
 
 Since the stacktrace shows kryo is being used, maybe, you could also try 
 increasing spark.kryoserializer.buffer.max.mb. Hope this help.
 
 Kelvin
 
 On Tue, Feb 10, 2015 at 1:26 AM, Akhil Das ak...@sigmoidanalytics.com 
 mailto:ak...@sigmoidanalytics.com wrote:
 You could try increasing the driver memory. Also, can you be more specific 
 about the data volume?
 
 Thanks
 Best Regards
 
 On Mon, Feb 9, 2015 at 3:30 PM, Yifan LI iamyifa...@gmail.com 
 mailto:iamyifa...@gmail.com wrote:
 Hi,
 
 I just found the following errors during computation(graphx), anyone has 
 ideas on this? thanks so much!
 
 (I think the memory is sufficient, spark.executor.memory > 30GB )
 
 
 15/02/09 00:37:12 ERROR Executor: Exception in task 162.0 in stage 719.0 (TID 
 7653)
 java.lang.OutOfMemoryError: Java heap space
   at 
 com.esotericsoftware.kryo.util.IdentityObjectIntMap.resize(IdentityObjectIntMap.java:410)
   at 
 com.esotericsoftware.kryo.util.IdentityObjectIntMap.put(IdentityObjectIntMap.java:113)
   at 
 com.esotericsoftware.kryo.util.MapReferenceResolver.addWrittenObject(MapReferenceResolver.java:23)
   at com.esotericsoftware.kryo.Kryo.writeReferenceOrNull(Kryo.java:598)
   at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:566)
   at 
 com.twitter.chill.TraversableSerializer$$anonfun$write$1.apply(Traversable.scala:29)
   at 
 com.twitter.chill.TraversableSerializer$$anonfun$write$1.apply(Traversable.scala:27)
   at 
 scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:98)
   at 
 scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:98)
   at 
 scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:226)
   at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:39)
   at scala.collection.mutable.HashMap.foreach(HashMap.scala:98)
   at com.twitter.chill.TraversableSerializer.write(Traversable.scala:27)
   at com.twitter.chill.TraversableSerializer.write(Traversable.scala:21)
   at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:568)
   at com.twitter.chill.Tuple2Serializer.write(TupleSerializers.scala:36)
   at com.twitter.chill.Tuple2Serializer.write(TupleSerializers.scala:33)
   at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:568)
   at com.twitter.chill.Tuple2Serializer.write(TupleSerializers.scala:37)
   at com.twitter.chill.Tuple2Serializer.write(TupleSerializers.scala:33)
   at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:568)
   at 
 com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.write(DefaultArraySerializers.java:318)
   at 
 com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.write(DefaultArraySerializers.java:293)
   at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:501)
   at 
 com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.write(FieldSerializer.java:564)
   at 
 com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:213)
   at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:568)
   at com.twitter.chill.Tuple2Serializer.write(TupleSerializers.scala:37)
   at com.twitter.chill.Tuple2Serializer.write(TupleSerializers.scala:33)
   at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:568)
   at 
 org.apache.spark.serializer.KryoSerializationStream.writeObject(KryoSerializer.scala:128)
   at 
 org.apache.spark.serializer.SerializationStream.writeAll(Serializer.scala:110)
 15/02/09 00:37:12 ERROR SparkUncaughtExceptionHandler: Uncaught exception in 
 thread Thread[Executor task launch worker-15,5,main]
 java.lang.OutOfMemoryError: Java heap space
   at 
 com.esotericsoftware.kryo.util.IdentityObjectIntMap.resize(IdentityObjectIntMap.java:410)
   at 
 com.esotericsoftware.kryo.util.IdentityObjectIntMap.put(IdentityObjectIntMap.java:113)
   at 
 com.esotericsoftware.kryo.util.MapReferenceResolver.addWrittenObject(MapReferenceResolver.java:23)
   at com.esotericsoftware.kryo.Kryo.writeReferenceOrNull(Kryo.java:598)
   at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:566)
   at 
 com.twitter.chill.TraversableSerializer$$anonfun$write$1.apply(Traversable.scala:29)
   at 
 com.twitter.chill.TraversableSerializer$$anonfun$write$1.apply(Traversable.scala:27)
   at 
 scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:98)
   at 
 scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:98)
   at 
 scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:226

Re: OutofMemoryError: Java heap space

2015-02-10 Thread Yifan LI
Hi Akhil,

Excuse me, I am running a random-walk algorithm over a not-that-large graph 
(~1GB raw dataset, with ~5 million vertices and ~60 million edges) on a cluster 
of 20 machines.

And the property of each vertex in the graph is a hash map whose size increases 
dramatically during the Pregel supersteps, so it seems to suffer from 
high GC?

Best,
Yifan LI





 On 10 Feb 2015, at 10:26, Akhil Das ak...@sigmoidanalytics.com wrote:
 
 You could try increasing the driver memory. Also, can you be more specific 
 about the data volume?
 
 Thanks
 Best Regards
 
 On Mon, Feb 9, 2015 at 3:30 PM, Yifan LI iamyifa...@gmail.com 
 mailto:iamyifa...@gmail.com wrote:
 Hi,
 
 I just found the following errors during computation(graphx), anyone has 
 ideas on this? thanks so much!
 
 (I think the memory is sufficient, spark.executor.memory > 30GB )
 
 
 15/02/09 00:37:12 ERROR Executor: Exception in task 162.0 in stage 719.0 (TID 
 7653)
 java.lang.OutOfMemoryError: Java heap space
   at 
 com.esotericsoftware.kryo.util.IdentityObjectIntMap.resize(IdentityObjectIntMap.java:410)
   at 
 com.esotericsoftware.kryo.util.IdentityObjectIntMap.put(IdentityObjectIntMap.java:113)
   at 
 com.esotericsoftware.kryo.util.MapReferenceResolver.addWrittenObject(MapReferenceResolver.java:23)
   at com.esotericsoftware.kryo.Kryo.writeReferenceOrNull(Kryo.java:598)
   at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:566)
   at 
 com.twitter.chill.TraversableSerializer$$anonfun$write$1.apply(Traversable.scala:29)
   at 
 com.twitter.chill.TraversableSerializer$$anonfun$write$1.apply(Traversable.scala:27)
   at 
 scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:98)
   at 
 scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:98)
   at 
 scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:226)
   at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:39)
   at scala.collection.mutable.HashMap.foreach(HashMap.scala:98)
   at com.twitter.chill.TraversableSerializer.write(Traversable.scala:27)
   at com.twitter.chill.TraversableSerializer.write(Traversable.scala:21)
   at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:568)
   at com.twitter.chill.Tuple2Serializer.write(TupleSerializers.scala:36)
   at com.twitter.chill.Tuple2Serializer.write(TupleSerializers.scala:33)
   at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:568)
   at com.twitter.chill.Tuple2Serializer.write(TupleSerializers.scala:37)
   at com.twitter.chill.Tuple2Serializer.write(TupleSerializers.scala:33)
   at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:568)
   at 
 com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.write(DefaultArraySerializers.java:318)
   at 
 com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.write(DefaultArraySerializers.java:293)
   at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:501)
   at 
 com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.write(FieldSerializer.java:564)
   at 
 com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:213)
   at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:568)
   at com.twitter.chill.Tuple2Serializer.write(TupleSerializers.scala:37)
   at com.twitter.chill.Tuple2Serializer.write(TupleSerializers.scala:33)
   at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:568)
   at 
 org.apache.spark.serializer.KryoSerializationStream.writeObject(KryoSerializer.scala:128)
   at 
 org.apache.spark.serializer.SerializationStream.writeAll(Serializer.scala:110)
 15/02/09 00:37:12 ERROR SparkUncaughtExceptionHandler: Uncaught exception in 
 thread Thread[Executor task launch worker-15,5,main]
 java.lang.OutOfMemoryError: Java heap space
   at 
 com.esotericsoftware.kryo.util.IdentityObjectIntMap.resize(IdentityObjectIntMap.java:410)
   at 
 com.esotericsoftware.kryo.util.IdentityObjectIntMap.put(IdentityObjectIntMap.java:113)
   at 
 com.esotericsoftware.kryo.util.MapReferenceResolver.addWrittenObject(MapReferenceResolver.java:23)
   at com.esotericsoftware.kryo.Kryo.writeReferenceOrNull(Kryo.java:598)
   at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:566)
   at 
 com.twitter.chill.TraversableSerializer$$anonfun$write$1.apply(Traversable.scala:29)
   at 
 com.twitter.chill.TraversableSerializer$$anonfun$write$1.apply(Traversable.scala:27)
   at 
 scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:98)
   at 
 scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:98)
   at 
 scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:226)
   at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:39

Re: OutofMemoryError: Java heap space

2015-02-10 Thread Yifan LI
Yes, I have read it, and am trying to find some way to do that… Thanks :)

Best,
Yifan LI





 On 10 Feb 2015, at 12:06, Akhil Das ak...@sigmoidanalytics.com wrote:
 
 Did you have a chance to look at this doc 
 http://spark.apache.org/docs/1.2.0/tuning.html 
 http://spark.apache.org/docs/1.2.0/tuning.html
 
 Thanks
 Best Regards
 
 On Tue, Feb 10, 2015 at 4:13 PM, Yifan LI iamyifa...@gmail.com 
 mailto:iamyifa...@gmail.com wrote:
 Hi Akhil,
 
 Excuse me, I am trying a random-walk algorithm over a not that large 
 graph(~1GB raw dataset, including ~5million vertices and ~60million edges) on 
 a cluster which has 20 machines.
 
 And, the property of each vertex in graph is a hash map, of which size will 
 increase dramatically during pregel supersteps. so, it seems to suffer from 
 high GC?
 
 Best,
 Yifan LI
 
 
 
 
 
 On 10 Feb 2015, at 10:26, Akhil Das ak...@sigmoidanalytics.com 
 mailto:ak...@sigmoidanalytics.com wrote:
 
 You could try increasing the driver memory. Also, can you be more specific 
 about the data volume?
 
 Thanks
 Best Regards
 
 On Mon, Feb 9, 2015 at 3:30 PM, Yifan LI iamyifa...@gmail.com 
 mailto:iamyifa...@gmail.com wrote:
 Hi,
 
 I just found the following errors during computation(graphx), anyone has 
 ideas on this? thanks so much!
 
 (I think the memory is sufficient, spark.executor.memory > 30GB )
 
 
 15/02/09 00:37:12 ERROR Executor: Exception in task 162.0 in stage 719.0 
 (TID 7653)
 java.lang.OutOfMemoryError: Java heap space
  at 
 com.esotericsoftware.kryo.util.IdentityObjectIntMap.resize(IdentityObjectIntMap.java:410)
  at 
 com.esotericsoftware.kryo.util.IdentityObjectIntMap.put(IdentityObjectIntMap.java:113)
  at 
 com.esotericsoftware.kryo.util.MapReferenceResolver.addWrittenObject(MapReferenceResolver.java:23)
  at com.esotericsoftware.kryo.Kryo.writeReferenceOrNull(Kryo.java:598)
  at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:566)
  at 
 com.twitter.chill.TraversableSerializer$$anonfun$write$1.apply(Traversable.scala:29)
  at 
 com.twitter.chill.TraversableSerializer$$anonfun$write$1.apply(Traversable.scala:27)
  at 
 scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:98)
  at 
 scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:98)
  at 
 scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:226)
  at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:39)
  at scala.collection.mutable.HashMap.foreach(HashMap.scala:98)
  at com.twitter.chill.TraversableSerializer.write(Traversable.scala:27)
  at com.twitter.chill.TraversableSerializer.write(Traversable.scala:21)
  at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:568)
  at com.twitter.chill.Tuple2Serializer.write(TupleSerializers.scala:36)
  at com.twitter.chill.Tuple2Serializer.write(TupleSerializers.scala:33)
  at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:568)
  at com.twitter.chill.Tuple2Serializer.write(TupleSerializers.scala:37)
  at com.twitter.chill.Tuple2Serializer.write(TupleSerializers.scala:33)
  at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:568)
  at 
 com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.write(DefaultArraySerializers.java:318)
  at 
 com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.write(DefaultArraySerializers.java:293)
  at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:501)
  at 
 com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.write(FieldSerializer.java:564)
  at 
 com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:213)
  at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:568)
  at com.twitter.chill.Tuple2Serializer.write(TupleSerializers.scala:37)
  at com.twitter.chill.Tuple2Serializer.write(TupleSerializers.scala:33)
  at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:568)
  at 
 org.apache.spark.serializer.KryoSerializationStream.writeObject(KryoSerializer.scala:128)
  at 
 org.apache.spark.serializer.SerializationStream.writeAll(Serializer.scala:110)
 15/02/09 00:37:12 ERROR SparkUncaughtExceptionHandler: Uncaught exception in 
 thread Thread[Executor task launch worker-15,5,main]
 java.lang.OutOfMemoryError: Java heap space
  at 
 com.esotericsoftware.kryo.util.IdentityObjectIntMap.resize(IdentityObjectIntMap.java:410)
  at 
 com.esotericsoftware.kryo.util.IdentityObjectIntMap.put(IdentityObjectIntMap.java:113)
  at 
 com.esotericsoftware.kryo.util.MapReferenceResolver.addWrittenObject(MapReferenceResolver.java:23)
  at com.esotericsoftware.kryo.Kryo.writeReferenceOrNull(Kryo.java:598)
  at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:566)
  at 
 com.twitter.chill.TraversableSerializer$$anonfun$write$1.apply(Traversable.scala:29

OutofMemoryError: Java heap space

2015-02-09 Thread Yifan LI
(TupleSerializers.scala:33)
at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:568)
at 
com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.write(DefaultArraySerializers.java:318)
at 
com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.write(DefaultArraySerializers.java:293)
at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:501)
at 
com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.write(FieldSerializer.java:564)
at 
com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:213)
at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:568)
at com.twitter.chill.Tuple2Serializer.write(TupleSerializers.scala:37)
at com.twitter.chill.Tuple2Serializer.write(TupleSerializers.scala:33)
at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:568)
at 
org.apache.spark.serializer.KryoSerializationStream.writeObject(KryoSerializer.scala:128)
at 
org.apache.spark.serializer.SerializationStream.writeAll(Serializer.scala:110)


Best,
Yifan LI







how to debug this kind of error, e.g. lost executor?

2015-02-05 Thread Yifan LI
Hi,

I am running a GraphX application with heavy memory/CPU overhead; I think the memory 
is sufficient, and I set the RDDs’ StorageLevel to MEMORY_AND_DISK.

But I found that some tasks failed due to the following errors:

java.io.FileNotFoundException: 
/data/spark/local/spark-local-20150205151711-9700/09/rdd_3_275 (No files or 
folders of this type)

ExecutorLostFailure (executor 11 lost)


So, finally that stage failed:

org.apache.spark.shuffle.FetchFailedException: java.io.FileNotFoundException: 
/data/spark/local/spark-local-20150205151711-587a/16/shuffle_11_219_0.index


Does anyone have pointers? Where can I get more details on this issue?


Best,
Yifan LI







Re: how to debug this kind of error, e.g. lost executor?

2015-02-05 Thread Yifan LI

Does anyone have an idea where I can find the detailed log of that lost executor 
(i.e., why it was lost)?

Thanks in advance!





 On 05 Feb 2015, at 16:14, Yifan LI iamyifa...@gmail.com wrote:
 
 Hi,
 
 I am running a heavy memory/cpu overhead graphx application, I think the 
 memory is sufficient and set RDDs’ StorageLevel using MEMORY_AND_DISK.
 
 But I found there were some tasks failed due to following errors:
 
 java.io.FileNotFoundException: 
 /data/spark/local/spark-local-20150205151711-9700/09/rdd_3_275 (No files or 
 folders of this type)
 
 ExecutorLostFailure (executor 11 lost)
 
 
 So, finally that stage failed:
 
 org.apache.spark.shuffle.FetchFailedException: java.io.FileNotFoundException: 
 /data/spark/local/spark-local-20150205151711-587a/16/shuffle_11_219_0.index
 
 
 Anyone has points? Where I can get more details for this issue?
 
 
 Best,
 Yifan LI
 
 
 
 
 



Re: [Graphx Spark] Error of Lost executor and TimeoutException

2015-02-02 Thread Yifan LI
Thanks, Sonal.

But it seems to be an error that happened while “cleaning broadcast”? 

BTW, where does the “[30 seconds]” timeout come from? Can I increase it?
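
For reference, a minimal sketch of raising the relevant timeouts, assuming the 
30-second limit comes from the Akka ask timeout used by BlockManagerMaster in 
Spark 1.2 (spark.akka.askTimeout, 30 s by default); the values are illustrative:

  import org.apache.spark.{SparkConf, SparkContext}

  // Sketch only: raise the Akka ask/general timeouts (values in seconds).
  val conf = new SparkConf()
    .setAppName("graphx-app")                  // placeholder name
    .set("spark.akka.askTimeout", "120")
    .set("spark.akka.timeout", "120")
  val sc = new SparkContext(conf)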



Best,
Yifan LI





 On 02 Feb 2015, at 11:12, Sonal Goyal sonalgoy...@gmail.com wrote:
 
 That may be the cause of your issue. Take a look at the tuning guide[1] and 
 maybe also profile your application. See if you can reuse your objects. 
 
 1. http://spark.apache.org/docs/latest/tuning.html
 
 
 Best Regards,
 Sonal
 Founder, Nube Technologies http://www.nubetech.co/ 
 
  http://in.linkedin.com/in/sonalgoyal
 
 
 
 On Sat, Jan 31, 2015 at 4:21 AM, Yifan LI iamyifa...@gmail.com 
 mailto:iamyifa...@gmail.com wrote:
 Yes, I think so, esp. for a pregel application… have any suggestion?
 
 Best,
 Yifan LI
 
 
 
 
 
 On 30 Jan 2015, at 22:25, Sonal Goyal sonalgoy...@gmail.com 
 mailto:sonalgoy...@gmail.com wrote:
 
 Is your code hitting frequent garbage collection? 
 
 Best Regards,
 Sonal
 Founder, Nube Technologies http://www.nubetech.co/ 
 
  http://in.linkedin.com/in/sonalgoyal
 
 
 
 On Fri, Jan 30, 2015 at 7:52 PM, Yifan LI iamyifa...@gmail.com 
 mailto:iamyifa...@gmail.com wrote:
 
 
 
 Hi,
 
 I am running my graphx application on Spark 1.2.0(11 nodes cluster), has 
 requested 30GB memory per node and 100 cores for around 1GB input dataset(5 
 million vertices graph).
 
 But the error below always happen…
 
 Is there anyone could give me some points? 
 
 (BTW, the overall edge/vertex RDDs will reach more than 100GB during graph 
 computation, and another version of my application can work well on the 
 same dataset while it need much less memory during computation)
 
 Thanks in advance!!!
 
 
 15/01/29 18:05:08 ERROR ContextCleaner: Error cleaning broadcast 60
 java.util.concurrent.TimeoutException: Futures timed out after [30 seconds]
 at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:219)
 at 
 scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:223)
 at scala.concurrent.Await$$anonfun$result$1.apply(package.scala:107)
 at 
 scala.concurrent.BlockContext$DefaultBlockContext$.blockOn(BlockContext.scala:53)
 at scala.concurrent.Await$.result(package.scala:107)
 at 
 org.apache.spark.storage.BlockManagerMaster.removeBroadcast(BlockManagerMaster.scala:137)
 at 
 org.apache.spark.broadcast.TorrentBroadcast$.unpersist(TorrentBroadcast.scala:227)
 at 
 org.apache.spark.broadcast.TorrentBroadcastFactory.unbroadcast(TorrentBroadcastFactory.scala:45)
 at 
 org.apache.spark.broadcast.BroadcastManager.unbroadcast(BroadcastManager.scala:66)
 at 
 org.apache.spark.ContextCleaner.doCleanupBroadcast(ContextCleaner.scala:185)
 at 
 org.apache.spark.ContextCleaner$$anonfun$org$apache$spark$ContextCleaner$$keepCleaning$1$$anonfun$apply$mcV$sp$2.apply(ContextCleaner.scala:147)
 at 
 org.apache.spark.ContextCleaner$$anonfun$org$apache$spark$ContextCleaner$$keepCleaning$1$$anonfun$apply$mcV$sp$2.apply(ContextCleaner.scala:138)
 at scala.Option.foreach(Option.scala:236)
 at 
 org.apache.spark.ContextCleaner$$anonfun$org$apache$spark$ContextCleaner$$keepCleaning$1.apply$mcV$sp(ContextCleaner.scala:138)
 at 
 org.apache.spark.ContextCleaner$$anonfun$org$apache$spark$ContextCleaner$$keepCleaning$1.apply(ContextCleaner.scala:134)
 at 
 org.apache.spark.ContextCleaner$$anonfun$org$apache$spark$ContextCleaner$$keepCleaning$1.apply(ContextCleaner.scala:134)
 at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1460)
 at org.apache.spark.ContextCleaner.org$apache$spark$ContextCleaner$$keepCleaning(ContextCleaner.scala:133)
 at org.apache.spark.ContextCleaner$$anon$3.run(ContextCleaner.scala:65)
 [Stage 91:===  (2 + 4) 
 / 6]15/01/29 18:08:15 ERROR SparkDeploySchedulerBackend: Asked to remove 
 non-existent executor 0
 [Stage 93:  (29 + 20) 
 / 49]15/01/29 23:47:03 ERROR TaskSchedulerImpl: Lost executor 9 on 
 small11-tap1.common.lip6.fr http://small11-tap1.common.lip6.fr/: remote 
 Akka client disassociated
 [Stage 83:   (1 + 0) / 6][Stage 86:   (0 + 1) / 2][Stage 88:   (0 + 2) / 
 8]15/01/29 23:47:06 ERROR SparkDeploySchedulerBackend: Asked to remove 
 non-existent executor 9
 [Stage 83:===  (5 + 1) / 6][Stage 88:=   (9 + 2) 
 / 11]15/01/29 23:57:30 ERROR TaskSchedulerImpl: Lost executor 8 on 
 small10-tap1.common.lip6.fr http://small10-tap1.common.lip6.fr/: remote 
 Akka client disassociated
 15/01/29 23:57:30 ERROR SparkDeploySchedulerBackend: Asked to remove 
 non-existent executor 8
 15/01/29 23:57:30 ERROR SparkDeploySchedulerBackend: Asked to remove 
 non-existent executor 8
 
 Best,
 Yifan LI
 
 
 
 
 
 
 
 
 



Re: [Graphx Spark] Error of Lost executor and TimeoutException

2015-02-02 Thread Yifan LI
I think this broadcast-cleaning (memory block removal?) timeout exception was 
caused by the following:

15/02/02 11:48:49 ERROR TaskSchedulerImpl: Lost executor 13 on 
small18-tap1.common.lip6.fr: remote Akka client disassociated
15/02/02 11:48:49 ERROR SparkDeploySchedulerBackend: Asked to remove 
non-existent executor 13
15/02/02 11:48:49 ERROR SparkDeploySchedulerBackend: Asked to remove 
non-existent executor 13

Does anyone have pointers on this?


Best,
Yifan LI





 On 02 Feb 2015, at 11:47, Yifan LI iamyifa...@gmail.com wrote:
 
 Thanks, Sonal.
 
 But it seems to be an error happened when “cleaning broadcast”? 
 
 BTW, what is the timeout of “[30 seconds]”? can I increase it?
 
 
 
 Best,
 Yifan LI
 
 
 
 
 
 On 02 Feb 2015, at 11:12, Sonal Goyal sonalgoy...@gmail.com 
 mailto:sonalgoy...@gmail.com wrote:
 
 That may be the cause of your issue. Take a look at the tuning guide[1] and 
 maybe also profile your application. See if you can reuse your objects. 
 
 1. http://spark.apache.org/docs/latest/tuning.html
 
 
 Best Regards,
 Sonal
 Founder, Nube Technologies http://www.nubetech.co/ 
 
  http://in.linkedin.com/in/sonalgoyal
 
 
 
 On Sat, Jan 31, 2015 at 4:21 AM, Yifan LI iamyifa...@gmail.com 
 mailto:iamyifa...@gmail.com wrote:
 Yes, I think so, esp. for a pregel application… have any suggestion?
 
 Best,
 Yifan LI
 
 
 
 
 
 On 30 Jan 2015, at 22:25, Sonal Goyal sonalgoy...@gmail.com 
 mailto:sonalgoy...@gmail.com wrote:
 
 Is your code hitting frequent garbage collection? 
 
 Best Regards,
 Sonal
 Founder, Nube Technologies http://www.nubetech.co/ 
 
  http://in.linkedin.com/in/sonalgoyal
 
 
 
 On Fri, Jan 30, 2015 at 7:52 PM, Yifan LI iamyifa...@gmail.com 
 mailto:iamyifa...@gmail.com wrote:
 
 
 
 Hi,
 
 I am running my graphx application on Spark 1.2.0(11 nodes cluster), has 
 requested 30GB memory per node and 100 cores for around 1GB input 
 dataset(5 million vertices graph).
 
 But the error below always happen…
 
 Is there anyone could give me some points? 
 
 (BTW, the overall edge/vertex RDDs will reach more than 100GB during graph 
 computation, and another version of my application can work well on the 
 same dataset while it need much less memory during computation)
 
 Thanks in advance!!!
 
 
 15/01/29 18:05:08 ERROR ContextCleaner: Error cleaning broadcast 60
 java.util.concurrent.TimeoutException: Futures timed out after [30 seconds]
at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:219)
at 
 scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:223)
at scala.concurrent.Await$$anonfun$result$1.apply(package.scala:107)
at 
 scala.concurrent.BlockContext$DefaultBlockContext$.blockOn(BlockContext.scala:53)
at scala.concurrent.Await$.result(package.scala:107)
at 
 org.apache.spark.storage.BlockManagerMaster.removeBroadcast(BlockManagerMaster.scala:137)
at 
 org.apache.spark.broadcast.TorrentBroadcast$.unpersist(TorrentBroadcast.scala:227)
at 
 org.apache.spark.broadcast.TorrentBroadcastFactory.unbroadcast(TorrentBroadcastFactory.scala:45)
at 
 org.apache.spark.broadcast.BroadcastManager.unbroadcast(BroadcastManager.scala:66)
at 
 org.apache.spark.ContextCleaner.doCleanupBroadcast(ContextCleaner.scala:185)
at 
 org.apache.spark.ContextCleaner$$anonfun$org$apache$spark$ContextCleaner$$keepCleaning$1$$anonfun$apply$mcV$sp$2.apply(ContextCleaner.scala:147)
at 
 org.apache.spark.ContextCleaner$$anonfun$org$apache$spark$ContextCleaner$$keepCleaning$1$$anonfun$apply$mcV$sp$2.apply(ContextCleaner.scala:138)
at scala.Option.foreach(Option.scala:236)
at 
 org.apache.spark.ContextCleaner$$anonfun$org$apache$spark$ContextCleaner$$keepCleaning$1.apply$mcV$sp(ContextCleaner.scala:138)
at 
 org.apache.spark.ContextCleaner$$anonfun$org$apache$spark$ContextCleaner$$keepCleaning$1.apply(ContextCleaner.scala:134)
at 
 org.apache.spark.ContextCleaner$$anonfun$org$apache$spark$ContextCleaner$$keepCleaning$1.apply(ContextCleaner.scala:134)
at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1460)
 at org.apache.spark.ContextCleaner.org$apache$spark$ContextCleaner$$keepCleaning(ContextCleaner.scala:133)
at org.apache.spark.ContextCleaner$$anon$3.run(ContextCleaner.scala:65)
 [Stage 91:===  (2 + 
 4) / 6]15/01/29 18:08:15 ERROR SparkDeploySchedulerBackend: Asked to 
 remove non-existent executor 0
 [Stage 93:  (29 + 20) 
 / 49]15/01/29 23:47:03 ERROR TaskSchedulerImpl: Lost executor 9 on 
 small11-tap1.common.lip6.fr http://small11-tap1.common.lip6.fr/: remote 
 Akka client disassociated
 [Stage 83:   (1 + 0) / 6][Stage 86:   (0 + 1) / 2][Stage 88:   (0 + 2) 
 / 8]15/01/29 23:47:06 ERROR SparkDeploySchedulerBackend: Asked to remove 
 non-existent executor 9
 [Stage 83:===  (5 + 1) / 6][Stage 88

Re: [Graphx Spark] Error of Lost executor and TimeoutException

2015-01-30 Thread Yifan LI
Yes, I think so, esp. for a pregel application… have any suggestion?

Best,
Yifan LI





 On 30 Jan 2015, at 22:25, Sonal Goyal sonalgoy...@gmail.com wrote:
 
 Is your code hitting frequent garbage collection? 
 
 Best Regards,
 Sonal
 Founder, Nube Technologies http://www.nubetech.co/ 
 
  http://in.linkedin.com/in/sonalgoyal
 
 
 
 On Fri, Jan 30, 2015 at 7:52 PM, Yifan LI iamyifa...@gmail.com 
 mailto:iamyifa...@gmail.com wrote:
 
 
 
 Hi,
 
 I am running my graphx application on Spark 1.2.0(11 nodes cluster), has 
 requested 30GB memory per node and 100 cores for around 1GB input dataset(5 
 million vertices graph).
 
 But the error below always happen…
 
 Is there anyone could give me some points? 
 
 (BTW, the overall edge/vertex RDDs will reach more than 100GB during graph 
 computation, and another version of my application can work well on the same 
 dataset while it need much less memory during computation)
 
 Thanks in advance!!!
 
 
 15/01/29 18:05:08 ERROR ContextCleaner: Error cleaning broadcast 60
 java.util.concurrent.TimeoutException: Futures timed out after [30 seconds]
  at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:219)
  at 
 scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:223)
  at scala.concurrent.Await$$anonfun$result$1.apply(package.scala:107)
  at 
 scala.concurrent.BlockContext$DefaultBlockContext$.blockOn(BlockContext.scala:53)
  at scala.concurrent.Await$.result(package.scala:107)
  at 
 org.apache.spark.storage.BlockManagerMaster.removeBroadcast(BlockManagerMaster.scala:137)
  at 
 org.apache.spark.broadcast.TorrentBroadcast$.unpersist(TorrentBroadcast.scala:227)
  at 
 org.apache.spark.broadcast.TorrentBroadcastFactory.unbroadcast(TorrentBroadcastFactory.scala:45)
  at 
 org.apache.spark.broadcast.BroadcastManager.unbroadcast(BroadcastManager.scala:66)
  at 
 org.apache.spark.ContextCleaner.doCleanupBroadcast(ContextCleaner.scala:185)
  at 
 org.apache.spark.ContextCleaner$$anonfun$org$apache$spark$ContextCleaner$$keepCleaning$1$$anonfun$apply$mcV$sp$2.apply(ContextCleaner.scala:147)
  at 
 org.apache.spark.ContextCleaner$$anonfun$org$apache$spark$ContextCleaner$$keepCleaning$1$$anonfun$apply$mcV$sp$2.apply(ContextCleaner.scala:138)
  at scala.Option.foreach(Option.scala:236)
  at 
 org.apache.spark.ContextCleaner$$anonfun$org$apache$spark$ContextCleaner$$keepCleaning$1.apply$mcV$sp(ContextCleaner.scala:138)
  at 
 org.apache.spark.ContextCleaner$$anonfun$org$apache$spark$ContextCleaner$$keepCleaning$1.apply(ContextCleaner.scala:134)
  at 
 org.apache.spark.ContextCleaner$$anonfun$org$apache$spark$ContextCleaner$$keepCleaning$1.apply(ContextCleaner.scala:134)
  at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1460)
 at org.apache.spark.ContextCleaner.org$apache$spark$ContextCleaner$$keepCleaning(ContextCleaner.scala:133)
  at org.apache.spark.ContextCleaner$$anon$3.run(ContextCleaner.scala:65)
 [Stage 91:===  (2 + 4) 
 / 6]15/01/29 18:08:15 ERROR SparkDeploySchedulerBackend: Asked to remove 
 non-existent executor 0
 [Stage 93:  (29 + 20) / 
 49]15/01/29 23:47:03 ERROR TaskSchedulerImpl: Lost executor 9 on 
 small11-tap1.common.lip6.fr http://small11-tap1.common.lip6.fr/: remote 
 Akka client disassociated
 [Stage 83:   (1 + 0) / 6][Stage 86:   (0 + 1) / 2][Stage 88:   (0 + 2) / 
 8]15/01/29 23:47:06 ERROR SparkDeploySchedulerBackend: Asked to remove 
 non-existent executor 9
 [Stage 83:===  (5 + 1) / 6][Stage 88:=   (9 + 2) / 
 11]15/01/29 23:57:30 ERROR TaskSchedulerImpl: Lost executor 8 on 
 small10-tap1.common.lip6.fr http://small10-tap1.common.lip6.fr/: remote 
 Akka client disassociated
 15/01/29 23:57:30 ERROR SparkDeploySchedulerBackend: Asked to remove 
 non-existent executor 8
 15/01/29 23:57:30 ERROR SparkDeploySchedulerBackend: Asked to remove 
 non-existent executor 8
 
 Best,
 Yifan LI
 
 
 
 
 
 
 



[Graphx Spark] Error of Lost executor and TimeoutException

2015-01-30 Thread Yifan LI

 
 
 Hi,
 
 I am running my graphx application on Spark 1.2.0(11 nodes cluster), has 
 requested 30GB memory per node and 100 cores for around 1GB input dataset(5 
 million vertices graph).
 
 But the error below always happen…
 
 Is there anyone could give me some points? 
 
 (BTW, the overall edge/vertex RDDs will reach more than 100GB during graph 
 computation, and another version of my application can work well on the same 
 dataset while it need much less memory during computation)
 
 Thanks in advance!!!
 
 
 15/01/29 18:05:08 ERROR ContextCleaner: Error cleaning broadcast 60
 java.util.concurrent.TimeoutException: Futures timed out after [30 seconds]
   at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:219)
   at 
 scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:223)
   at scala.concurrent.Await$$anonfun$result$1.apply(package.scala:107)
   at 
 scala.concurrent.BlockContext$DefaultBlockContext$.blockOn(BlockContext.scala:53)
   at scala.concurrent.Await$.result(package.scala:107)
   at 
 org.apache.spark.storage.BlockManagerMaster.removeBroadcast(BlockManagerMaster.scala:137)
   at 
 org.apache.spark.broadcast.TorrentBroadcast$.unpersist(TorrentBroadcast.scala:227)
   at 
 org.apache.spark.broadcast.TorrentBroadcastFactory.unbroadcast(TorrentBroadcastFactory.scala:45)
   at 
 org.apache.spark.broadcast.BroadcastManager.unbroadcast(BroadcastManager.scala:66)
   at 
 org.apache.spark.ContextCleaner.doCleanupBroadcast(ContextCleaner.scala:185)
   at 
 org.apache.spark.ContextCleaner$$anonfun$org$apache$spark$ContextCleaner$$keepCleaning$1$$anonfun$apply$mcV$sp$2.apply(ContextCleaner.scala:147)
   at 
 org.apache.spark.ContextCleaner$$anonfun$org$apache$spark$ContextCleaner$$keepCleaning$1$$anonfun$apply$mcV$sp$2.apply(ContextCleaner.scala:138)
   at scala.Option.foreach(Option.scala:236)
   at 
 org.apache.spark.ContextCleaner$$anonfun$org$apache$spark$ContextCleaner$$keepCleaning$1.apply$mcV$sp(ContextCleaner.scala:138)
   at 
 org.apache.spark.ContextCleaner$$anonfun$org$apache$spark$ContextCleaner$$keepCleaning$1.apply(ContextCleaner.scala:134)
   at 
 org.apache.spark.ContextCleaner$$anonfun$org$apache$spark$ContextCleaner$$keepCleaning$1.apply(ContextCleaner.scala:134)
   at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1460)
 at org.apache.spark.ContextCleaner.org$apache$spark$ContextCleaner$$keepCleaning(ContextCleaner.scala:133)
   at org.apache.spark.ContextCleaner$$anon$3.run(ContextCleaner.scala:65)
 [Stage 91:===  (2 + 4) / 
 6]15/01/29 18:08:15 ERROR SparkDeploySchedulerBackend: Asked to remove 
 non-existent executor 0
 [Stage 93:  (29 + 20) / 
 49]15/01/29 23:47:03 ERROR TaskSchedulerImpl: Lost executor 9 on 
 small11-tap1.common.lip6.fr http://small11-tap1.common.lip6.fr/: remote 
 Akka client disassociated
 [Stage 83:   (1 + 0) / 6][Stage 86:   (0 + 1) / 2][Stage 88:   (0 + 2) / 
 8]15/01/29 23:47:06 ERROR SparkDeploySchedulerBackend: Asked to remove 
 non-existent executor 9
 [Stage 83:===  (5 + 1) / 6][Stage 88:=   (9 + 2) / 
 11]15/01/29 23:57:30 ERROR TaskSchedulerImpl: Lost executor 8 on 
 small10-tap1.common.lip6.fr http://small10-tap1.common.lip6.fr/: remote 
 Akka client disassociated
 15/01/29 23:57:30 ERROR SparkDeploySchedulerBackend: Asked to remove 
 non-existent executor 8
 15/01/29 23:57:30 ERROR SparkDeploySchedulerBackend: Asked to remove 
 non-existent executor 8
 
 Best,
 Yifan LI
 
 
 
 
 



[Graphx Spark] Error of Lost executor and TimeoutException

2015-01-30 Thread Yifan LI
Hi,

I am running my GraphX application on Spark 1.2.0 (an 11-node cluster), having 
requested 30GB of memory per node and 100 cores, for an input dataset of around 
1GB (a graph with 5 million vertices).

But the error below always happens…

Is there anyone who could give me some pointers?

(BTW, the overall edge/vertex RDDs will reach more than 100GB during graph 
computation, and another version of my application works well on the same 
dataset while needing much less memory during computation.)

Thanks in advance!!!


15/01/29 18:05:08 ERROR ContextCleaner: Error cleaning broadcast 60
java.util.concurrent.TimeoutException: Futures timed out after [30 seconds]
at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:219)
at 
scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:223)
at scala.concurrent.Await$$anonfun$result$1.apply(package.scala:107)
at 
scala.concurrent.BlockContext$DefaultBlockContext$.blockOn(BlockContext.scala:53)
at scala.concurrent.Await$.result(package.scala:107)
at 
org.apache.spark.storage.BlockManagerMaster.removeBroadcast(BlockManagerMaster.scala:137)
at 
org.apache.spark.broadcast.TorrentBroadcast$.unpersist(TorrentBroadcast.scala:227)
at 
org.apache.spark.broadcast.TorrentBroadcastFactory.unbroadcast(TorrentBroadcastFactory.scala:45)
at 
org.apache.spark.broadcast.BroadcastManager.unbroadcast(BroadcastManager.scala:66)
at 
org.apache.spark.ContextCleaner.doCleanupBroadcast(ContextCleaner.scala:185)
at 
org.apache.spark.ContextCleaner$$anonfun$org$apache$spark$ContextCleaner$$keepCleaning$1$$anonfun$apply$mcV$sp$2.apply(ContextCleaner.scala:147)
at 
org.apache.spark.ContextCleaner$$anonfun$org$apache$spark$ContextCleaner$$keepCleaning$1$$anonfun$apply$mcV$sp$2.apply(ContextCleaner.scala:138)
at scala.Option.foreach(Option.scala:236)
at 
org.apache.spark.ContextCleaner$$anonfun$org$apache$spark$ContextCleaner$$keepCleaning$1.apply$mcV$sp(ContextCleaner.scala:138)
at 
org.apache.spark.ContextCleaner$$anonfun$org$apache$spark$ContextCleaner$$keepCleaning$1.apply(ContextCleaner.scala:134)
at 
org.apache.spark.ContextCleaner$$anonfun$org$apache$spark$ContextCleaner$$keepCleaning$1.apply(ContextCleaner.scala:134)
at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1460)
at 
org.apache.spark.ContextCleaner.org$apache$spark$ContextCleaner$$keepCleaning(ContextCleaner.scala:133)
at org.apache.spark.ContextCleaner$$anon$3.run(ContextCleaner.scala:65)
[Stage 91:===  (2 + 4) / 
6]15/01/29 18:08:15 ERROR SparkDeploySchedulerBackend: Asked to remove 
non-existent executor 0
[Stage 93:  (29 + 20) / 
49]15/01/29 23:47:03 ERROR TaskSchedulerImpl: Lost executor 9 on 
small11-tap1.common.lip6.fr: remote Akka client disassociated
[Stage 83:   (1 + 0) / 6][Stage 86:   (0 + 1) / 2][Stage 88:   (0 + 2) / 
8]15/01/29 23:47:06 ERROR SparkDeploySchedulerBackend: Asked to remove 
non-existent executor 9
[Stage 83:===  (5 + 1) / 6][Stage 88:=   (9 + 2) / 
11]15/01/29 23:57:30 ERROR TaskSchedulerImpl: Lost executor 8 on 
small10-tap1.common.lip6.fr: remote Akka client disassociated
15/01/29 23:57:30 ERROR SparkDeploySchedulerBackend: Asked to remove 
non-existent executor 8
15/01/29 23:57:30 ERROR SparkDeploySchedulerBackend: Asked to remove 
non-existent executor 8

Best,
Yifan LI







300% Fraction Cached?

2014-12-19 Thread Yifan LI
Hi,

I just saw an Edge RDD at “300% Fraction Cached” in the Storage WebUI; what does 
that mean? I could understand it if the value were under 100%…

Thanks.

Best,
Yifan LI







[Graphx] the communication cost of leftJoin

2014-12-12 Thread Yifan LI
Hi,

I am trying to leftJoin another vertex RDD (e.g. vB) with this one (vA):
vA.leftJoin(vB)(f)

- vA is the vertex RDD of graph G, and G is edge-partitioned using 
EdgePartition2D.

- vB is created using the default partitioner (actually I am not sure...)

So I am wondering: if vB has the same partitioner as vA, how will 
GraphX (Spark) handle this case?
For instance, as below:
1) check the partitioner of vB;
2) do the leftJoin on each machine separately, over the co-located 
partitions of vA and vB.
Right?

But if vB's partitioner is different, what will happen? How do they communicate 
between partitions (and machines)?


Does anyone have pointers on this, or on communication between RDDs in general? 
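
For what it's worth, a minimal sketch of VertexRDD.leftJoin, assuming an 
existing SparkContext sc and graph G; the attribute types and values are 
hypothetical. As far as I understand, if vB is a VertexRDD co-partitioned with 
vA the join is done locally per partition; otherwise vB's entries are shuffled 
to vA's partitions first.

  import org.apache.spark.rdd.RDD
  import org.apache.spark.graphx._

  val vA: VertexRDD[Double] = G.vertices.mapValues(_ => 1.0)          // vertices of graph G
  val vB: RDD[(VertexId, Double)] = sc.parallelize(Seq((1L, 0.5), (2L, 0.7)))

  // f receives the optional matching value from vB for each vertex of vA.
  val joined: VertexRDD[Double] =
    vA.leftJoin(vB) { (vid, a, bOpt) => a + bOpt.getOrElse(0.0) }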

Thanks, :)

Best,
Yifan LI







[Graphx] which way is better to access faraway neighbors?

2014-12-05 Thread Yifan LI
Hi,

I have a graph in which each vertex keeps several messages for some faraway 
neighbours (I mean, not only immediate neighbours; at most k hops away, e.g. 
k = 5).

Now I propose to distribute these messages to their corresponding 
destinations (the “faraway neighbours”):

- by using the Pregel API, where one superstep is enough 
- by using basic Spark operations (groupByKey, leftJoin, etc.) on the vertices RDD 
and its intermediate results.

With respect to the communication among machines, and the high cost of 
groupByKey/leftJoin, I guess the 1st option is better?

What's your idea?


Best,
Yifan LI







map function

2014-12-04 Thread Yifan LI
Hi,

I have a RDD like below:
(1, (10, 20))
(2, (30, 40, 10))
(3, (30))
…

Is there any way to map it to this:
(10,1)
(20,1)
(30,2)
(40,2)
(10,2)
(30,3)
…

Generally, each element might be mapped to multiple output pairs.

Thanks in advance! 


Best,
Yifan LI







Re: map function

2014-12-04 Thread Yifan LI
Thanks, Paolo and Mark. :)


 On 04 Dec 2014, at 11:58, Paolo Platter paolo.plat...@agilelab.it wrote:
 
 Hi,
 
  rdd.flatMap( e => e._2.map( i => (i, e._1)))
 
 Should work, but I didn't test it so maybe I'm missing something.
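  
  For reference, a self-contained version of the same idea, assuming an existing 
  SparkContext sc and that the values are collections (e.g. Seq) rather than 
  tuples of varying arity:
  
    val rdd = sc.parallelize(Seq(
      (1, Seq(10, 20)),
      (2, Seq(30, 40, 10)),
      (3, Seq(30))
    ))
    // Invert each (key, values) pair into (value, key) pairs.
    val inverted = rdd.flatMap { case (k, vs) => vs.map(v => (v, k)) }
    // inverted.collect(): (10,1), (20,1), (30,2), (40,2), (10,2), (30,3)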
 
 Paolo
 
  Sent from my Windows Phone
  From: Yifan LI mailto:iamyifa...@gmail.com
  Sent: 04/12/2014 09:27
  To: user@spark.apache.org
  Subject: map function 
 
 Hi,
 
 I have a RDD like below:
 (1, (10, 20))
 (2, (30, 40, 10))
 (3, (30))
 …
 
 Is there any way to map it to this:
 (10,1)
 (20,1)
 (30,2)
 (40,2)
 (10,2)
 (30,3)
 …
 
 generally, for each element, it might be mapped to multiple.
 
 Thanks in advance! 
 
 
 Best,
 Yifan LI
 
 
 
 
 



[graphx] failed to submit an application with java.lang.ClassNotFoundException

2014-11-27 Thread Yifan LI
Hi,

I just tried to submit an application from graphx examples directory, but it 
failed:

yifan2:bin yifanli$ MASTER=local[*] ./run-example graphx.PPR_hubs
java.lang.ClassNotFoundException: org.apache.spark.examples.graphx.PPR_hubs
at java.net.URLClassLoader$1.run(URLClassLoader.java:202)
at java.security.AccessController.doPrivileged(Native Method)
at java.net.URLClassLoader.findClass(URLClassLoader.java:190)
at java.lang.ClassLoader.loadClass(ClassLoader.java:306)
at java.lang.ClassLoader.loadClass(ClassLoader.java:247)
at java.lang.Class.forName0(Native Method)
at java.lang.Class.forName(Class.java:249)
at org.apache.spark.deploy.SparkSubmit$.launch(SparkSubmit.scala:318)
at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:75)
at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)

and also,
yifan2:bin yifanli$ ./spark-submit --class 
org.apache.spark.examples.graphx.PPR_hubs 
../examples/target/scala-2.10/spark-examples-1.2.0-SNAPSHOT-hadoop1.0.4.jar
java.lang.ClassNotFoundException: org.apache.spark.examples.graphx.PPR_hubs
at java.net.URLClassLoader$1.run(URLClassLoader.java:202)
at java.security.AccessController.doPrivileged(Native Method)
at java.net.URLClassLoader.findClass(URLClassLoader.java:190)
at java.lang.ClassLoader.loadClass(ClassLoader.java:306)
at java.lang.ClassLoader.loadClass(ClassLoader.java:247)
at java.lang.Class.forName0(Native Method)
at java.lang.Class.forName(Class.java:249)
at org.apache.spark.deploy.SparkSubmit$.launch(SparkSubmit.scala:318)
at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:75)
at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)

Does anyone have pointers on this?



Best,
Yifan LI







Re: How to measure communication between nodes in Spark Standalone Cluster?

2014-11-17 Thread Yifan LI
I am not sure there is a direct way (an API in GraphX, etc.) to measure the 
number of vertex values transferred among nodes during computation.

It might depend on:
- the operations in your application, e.g. whether each vertex communicates 
only with its immediate neighbours.
- the partition strategy you chose, w.r.t. the vertex replication factor.
- the distribution of partitions across the cluster.
...


Best,
Yifan LI
LIP6, UPMC, Paris





 On 17 Nov 2014, at 11:59, Hlib Mykhailenko hlib.mykhaile...@inria.fr wrote:
 
 Hello,
 
 I use a Spark Standalone cluster and I want to somehow measure inter-node 
 communication. 
 As I understood, GraphX transfers only vertex values. Am I right?  
 
 But I do not just want the number of bytes transferred between any 
 two nodes.
 So is there a way to measure how many vertex values were transferred 
 among nodes?
 
 Thanks!
 
 --
 Regards,
 Hlib Mykhailenko
 PhD student at INRIA Sophia-Antipolis Méditerranée 
 http://www.inria.fr/centre/sophia/
 2004 Route des Lucioles BP93
 06902 SOPHIA ANTIPOLIS cedex
 



Re: How to set persistence level of graph in GraphX in spark 1.0.0

2014-10-28 Thread Yifan LI
Hi Arpit,

Try this:

val graph = GraphLoader.edgeListFile(sc, edgesFile, minEdgePartitions = 
numPartitions, edgeStorageLevel = StorageLevel.MEMORY_AND_DISK,
 vertexStorageLevel = StorageLevel.MEMORY_AND_DISK)


Best,
Yifan LI




On 28 Oct 2014, at 11:17, Arpit Kumar arp8...@gmail.com wrote:

 Any help regarding this issue please?
 
 Regards,
 Arpit
 
 On Sat, Oct 25, 2014 at 8:56 AM, Arpit Kumar arp8...@gmail.com wrote:
 Hi all,
 I am using the GraphLoader class to load graphs from edge list files. But 
 then I need to change the storage level of the graph to something other than 
 MEMORY_ONLY.
 
 val graph = GraphLoader.edgeListFile(sc, fname,
   minEdgePartitions = 
 numEPart).persist(StorageLevel.MEMORY_AND_DISK_SER)
 
 The error I am getting while executing this is:
 Exception in thread main java.lang.UnsupportedOperationException: Cannot 
 change storage level of an RDD after it was already assigned a level
 
 
 Then I looked into the GraphLoader class. I know that in the latest version 
 of spark support for setting persistence level is provided in this class. 
 Please suggest a workaround for spark 1.0.0 as I do not have the option to 
 shift to latest release.
 
 Note: I tried copying the GraphLoader class to my package as GraphLoader1 
 importing 
 
 package com.cloudera.xyz
 
 import org.apache.spark.storage.StorageLevel
 import org.apache.spark.graphx._
 import org.apache.spark.{Logging, SparkContext}
 import org.apache.spark.graphx.impl._
 
 and then changing the persistence level to my suitability as 
 .persist(gStorageLevel) instead of .cache()
 
 But while compiling I am getting the following errors
 
 GraphLoader1.scala:49: error: class EdgePartitionBuilder in package impl 
 cannot be accessed in package org.apache.spark.graphx.impl
 [INFO]   val builder = new EdgePartitionBuilder[Int, Int]
 
 I am also attaching the file with the mail. Maybe this way of doing thing is 
 not possible.
 
 
 Please suggest some workarounds so that I can set persistence level of my 
 graph to MEMORY_AND_DISK_SER for the graph I read from edge file list
 
 
 
 -- 
 Arpit Kumar
 Fourth Year Undergraduate
 Department of Computer Science and Engineering
 Indian Institute of Technology, Kharagpur



Re: How to set persistence level of graph in GraphX in spark 1.0.0

2014-10-28 Thread Yifan LI
I am not sure whether it works on Spark 1.0, but give it a try.

Or, maybe you can try:
1) constructing the edge and vertex RDDs yourself with the desired storage 
level;
2) then obtaining a graph using Graph(verticesRDD, edgesRDD), as sketched below.
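
A minimal sketch of option 2), assuming a whitespace-separated edge list file 
(srcId dstId per line); the path and the chosen attributes are placeholders, 
and whether GraphX's internally cached RDDs really pick up these levels on 
Spark 1.0 is exactly the open question of this thread:

  import org.apache.spark.SparkContext._
  import org.apache.spark.graphx._
  import org.apache.spark.storage.StorageLevel

  val edgesFile = "hdfs:///path/to/edge_list.txt"            // placeholder path
  val rawEdges = sc.textFile(edgesFile).map { line =>
    val fields = line.split("\\s+")
    Edge(fields(0).toLong, fields(1).toLong, 1)              // Int edge attribute
  }.persist(StorageLevel.MEMORY_AND_DISK_SER)

  val rawVertices = rawEdges
    .flatMap(e => Iterator((e.srcId, 1), (e.dstId, 1)))
    .reduceByKey(_ + _)                                      // e.g. degree as the vertex attribute
    .persist(StorageLevel.MEMORY_AND_DISK_SER)

  val graph = Graph(rawVertices, rawEdges)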

Best,
Yifan LI




On 28 Oct 2014, at 12:10, Arpit Kumar arp8...@gmail.com wrote:

 Hi Yifan LI,
 I am currently working on Spark 1.0 in which we can't pass edgeStorageLevel 
 as parameter. It implicitly caches the edges. So I am looking for a 
 workaround.
 
 http://spark.apache.org/docs/1.0.0/api/scala/index.html#org.apache.spark.graphx.GraphLoader$
 
 Regards,
 Arpit
 
 On Tue, Oct 28, 2014 at 4:25 PM, Yifan LI iamyifa...@gmail.com wrote:
 Hi Arpit,
 
 To try this:
 
 val graph = GraphLoader.edgeListFile(sc, edgesFile, minEdgePartitions = 
 numPartitions, edgeStorageLevel = StorageLevel.MEMORY_AND_DISK,
  vertexStorageLevel = StorageLevel.MEMORY_AND_DISK)
 
 
 Best,
 Yifan LI
 
 
 
 
 On 28 Oct 2014, at 11:17, Arpit Kumar arp8...@gmail.com wrote:
 
 Any help regarding this issue please?
 
 Regards,
 Arpit
 
 On Sat, Oct 25, 2014 at 8:56 AM, Arpit Kumar arp8...@gmail.com wrote:
 Hi all,
 I am using the GrpahLoader class to load graphs from edge list files. But 
 then I need to change the storage level of the graph to some other thing 
 than MEMORY_ONLY.
 
 val graph = GraphLoader.edgeListFile(sc, fname,
   minEdgePartitions = 
 numEPart).persist(StorageLevel.MEMORY_AND_DISK_SER)
 
 The error I am getting while executing this is:
 Exception in thread main java.lang.UnsupportedOperationException: Cannot 
 change storage level of an RDD after it was already assigned a level
 
 
 Then I looked into the GraphLoader class. I know that in the latest version 
 of spark support for setting persistence level is provided in this class. 
 Please suggest a workaround for spark 1.0.0 as I do not have the option to 
 shift to latest release.
 
 Note: I tried copying the GraphLoader class to my package as GraphLoader1 
 importing 
 
 package com.cloudera.xyz
 
 import org.apache.spark.storage.StorageLevel
 import org.apache.spark.graphx._
 import org.apache.spark.{Logging, SparkContext}
 import org.apache.spark.graphx.impl._
 
 and then changing the persistence level to my suitability as 
 .persist(gStorageLevel) instead of .cache()
 
 But while compiling I am getting the following errors
 
 GraphLoader1.scala:49: error: class EdgePartitionBuilder in package impl 
 cannot be accessed in package org.apache.spark.graphx.impl
 [INFO]   val builder = new EdgePartitionBuilder[Int, Int]
 
 I am also attaching the file with the mail. Maybe this way of doing thing is 
 not possible.
 
 
 Please suggest some workarounds so that I can set persistence level of my 
 graph to MEMORY_AND_DISK_SER for the graph I read from edge file list
 
 
 
 -- 
 Arpit Kumar
 Fourth Year Undergraduate
 Department of Computer Science and Engineering
 Indian Institute of Technology, Kharagpur
 
 
 
 
 -- 
 Arpit Kumar
 Fourth Year Undergraduate
 Department of Computer Science and Engineering
 Indian Institute of Technology, Kharagpur



how to send message to specific vertex by Pregel api

2014-10-02 Thread Yifan LI
Hi,

Does anyone have a clue about sending messages to a specific vertex (not to an 
immediate neighbour), whose vId is stored in a property of the source vertex, in 
the Pregel API?

More precisely, how can this be done in sendMessage()?
Could more general triplets be passed into that function?

(Obviously we can do it using basic Spark table operations (join, etc.), for 
instance as in [1]; a small sketch of that alternative follows the link below.)


[1] http://event.cwi.nl/grades2014/03-salihoglu.pdf
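
For comparison, a minimal sketch of the join-based alternative: every vertex 
emits (targetId, value) pairs for its faraway targets, the values are merged 
per target, and the result is joined back into the graph. It assumes a graph 
whose vertex attribute is an immutable Map[VertexId, Double] keyed by 
destination id; the delivery logic is illustrative only.

  import org.apache.spark.SparkContext._
  import org.apache.spark.graphx._

  // Emit (destination, value) pairs from every vertex's message map.
  val outgoing = graph.vertices.flatMap { case (_, attr) => attr.toIterator }

  // Merge all values addressed to the same destination vertex.
  val perTarget = outgoing.reduceByKey(_ + _)

  // Deliver: store the merged value under the vertex's own id (placeholder logic).
  val delivered = graph.outerJoinVertices(perTarget) { (vid, attr, sumOpt) =>
    sumOpt.map(s => attr + (vid -> s)).getOrElse(attr)
  }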


Best,
Yifan LI

Re: vertex active/inactive feature in Pregel API ?

2014-09-16 Thread Yifan LI
Dear Ankur,

Thanks! :)

- From [1] and my understanding, the existing inactive feature in the GraphX 
Pregel API is “if there are no in-edges, from an active vertex, to this vertex, 
then we say this one is inactive”, right?

For instance, suppose a graph in which every vertex has at least one in-edge, 
and we run static PageRank on it for 10 iterations. During this computation, 
would any vertex be set inactive?


- For more “explicit active vertex tracking”, e.g. vote-to-halt, how can it be 
achieved with the existing API?
(I am not sure I got the point of [2]; has that “vote” function already been 
introduced into the GraphX Pregel API?)


Best,
Yifan LI

On 15 Sep 2014, at 23:07, Ankur Dave ankurd...@gmail.com wrote:

 At 2014-09-15 16:25:04 +0200, Yifan LI iamyifa...@gmail.com wrote:
 I am wondering if the vertex active/inactive(corresponding the change of its 
 value between two supersteps) feature is introduced in Pregel API of GraphX?
 
 Vertex activeness in Pregel is controlled by messages: if a vertex did not 
 receive a message in the previous iteration, its vertex program will not run 
 in the current iteration. Also, inactive vertices will not be able to send 
 messages because by default the sendMsg function will only be run on edges 
 where at least one of the adjacent vertices received a message. You can 
 change this behavior -- see the documentation for the activeDirection 
 parameter to Pregel.apply [1].
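  
  For illustration, a minimal sketch of passing activeDirection to Pregel.apply 
  (the classic "propagate the maximum vertex id" toy; graph is assumed to exist 
  and the attribute types are chosen for the example):
  
    import org.apache.spark.graphx._
  
    // activeDirection = Out means sendMsg only runs on out-edges of vertices
    // that received a message in the previous round.
    val maxIdGraph = Pregel(
        graph.mapVertices((id, _) => id.toDouble),
        initialMsg = Double.NegativeInfinity,
        maxIterations = 20,
        activeDirection = EdgeDirection.Out
    )(
      vprog    = (id, attr, msg) => math.max(attr, msg),
      sendMsg  = t => if (t.srcAttr > t.dstAttr) Iterator((t.dstId, t.srcAttr)) else Iterator.empty,
      mergeMsg = (a, b) => math.max(a, b)
    )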
 
 There is also an open pull request to make active vertex tracking more 
 explicit by allowing vertices to vote to halt directly [2].
 
 Ankur
 
 [1] 
 http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.graphx.Pregel$
 [2] https://github.com/apache/spark/pull/1217



vertex active/inactive feature in Pregel API ?

2014-09-15 Thread Yifan LI
Hi,

I am wondering whether the vertex active/inactive feature (corresponding to the 
change of a vertex's value between two supersteps) is available in the Pregel API of GraphX?

If it is not a default setting, how do I enable it for the function below? 
  def sendMessage(edge: EdgeTriplet[(Int,HashMap[VertexId, Double]), Int]) =
    Iterator((edge.dstId, hmCal(edge.srcAttr)))

Or should I do that with a customised measure function, e.g. by keeping the 
change in the vertex attribute after each iteration?


I noticed that there is an optional parameter “skipStale” in the mrTriplets 
operator.


Best,
Yifan LI

Re: [GraphX] how to set memory configurations to avoid OutOfMemoryError GC overhead limit exceeded

2014-09-05 Thread Yifan LI
Thank you, Ankur! :)

But how do I assign the storage level to a new vertex RDD that is mapped from
an existing vertex RDD,
e.g.

  val newVertexRDD = graph.collectNeighborIds(EdgeDirection.Out)
    .map { case (id: VertexId, a: Array[VertexId]) => (id, initialHashMap(a)) }

The new one will be combined with the existing edge RDD (MEMORY_AND_DISK)
to construct a new graph,
e.g.

  val newGraph = Graph(newVertexRDD, graph.edges)


BTW, newVertexRDD.getStorageLevel returns StorageLevel(true, true,
false, true, 1); what does that mean?

Thanks in advance!
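
For reference, a minimal sketch (reusing the names above) of persisting the
derived vertex RDD explicitly before building the new graph; as far as I know
the printed fields are (useDisk, useMemory, useOffHeap, deserialized,
replication), so StorageLevel(true, true, false, true, 1) corresponds to
MEMORY_AND_DISK with replication 1:

  import org.apache.spark.storage.StorageLevel

  // Persist the derived RDD (only if it has not been assigned a level yet).
  val persistedVertexRDD = newVertexRDD.persist(StorageLevel.MEMORY_AND_DISK)
  val newGraph = Graph(persistedVertexRDD, graph.edges)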

Best,
Yifan



2014-09-03 22:42 GMT+02:00 Ankur Dave ankurd...@gmail.com:

 At 2014-09-03 17:58:09 +0200, Yifan LI iamyifa...@gmail.com wrote:
  val graph = GraphLoader.edgeListFile(sc, edgesFile, minEdgePartitions =
 numPartitions).partitionBy(PartitionStrategy.EdgePartition2D).persist(StorageLevel.MEMORY_AND_DISK)
 
  Error: java.lang.UnsupportedOperationException: Cannot change storage
 level
  of an RDD after it was already assigned a level

 You have to pass the StorageLevel to GraphLoader.edgeListFile:

 val graph = GraphLoader.edgeListFile(
   sc, edgesFile, minEdgePartitions = numPartitions,
   edgeStorageLevel = StorageLevel.MEMORY_AND_DISK,
   vertexStorageLevel = StorageLevel.MEMORY_AND_DISK)
   .partitionBy(PartitionStrategy.EdgePartition2D)

 Ankur



Re: [GraphX] how to set memory configurations to avoid OutOfMemoryError GC overhead limit exceeded

2014-09-03 Thread Yifan LI
Hi Ankur,

Thanks so much for your advice.

But it failed when I tried to set the storage level while constructing a graph.

val graph = GraphLoader.edgeListFile(sc, edgesFile, minEdgePartitions =
numPartitions).partitionBy(PartitionStrategy.EdgePartition2D).persist(StorageLevel.MEMORY_AND_DISK)

Error: java.lang.UnsupportedOperationException: Cannot change storage level
of an RDD after it was already assigned a level


Is there anyone who could help me?

Best,
Yifan




2014-08-18 23:52 GMT+02:00 Ankur Dave ankurd...@gmail.com:

 On Mon, Aug 18, 2014 at 6:29 AM, Yifan LI iamyifa...@gmail.com wrote:

  I am testing our application(similar to personalised page rank using
 Pregel, and note that each vertex property will need pretty much more space
 to store after new iteration)

 [...]

 But when we ran it on larger graph(e.g. LiveJouranl), it always end at the
 error GC overhead limit exceeded, even the partitions number is increased
 to 48 from 8.


 If the graph (including vertex properties) is too large to fit in memory,
 you might try allowing it to spill to disk. When constructing the graph,
 you can set vertexStorageLevel and edgeStorageLevel to
 StorageLevel.MEMORY_AND_DISK. This should allow the algorithm to finish.

 Ankur http://www.ankurdave.com/



Re: GraphX

2014-08-02 Thread Yifan LI
Try this:

./bin/run-example graphx.LiveJournalPageRank edge_list_file…


On Aug 2, 2014, at 5:55 PM, Deep Pradhan pradhandeep1...@gmail.com wrote:

 Hi,
 I am running Spark on a single-node cluster. I am able to run the codes in 
 Spark such as SparkPageRank.scala and SparkKMeans.scala with the following command,
 bin/run-example org.apache.spark.examples.SparkPageRank plus the required 
 arguments.
 Now I want to run the PageRank.scala that is in GraphX. Do we have a 
 similar command to the earlier one for this too?
 I also went through the run-example shell file and saw that the path is set 
 to org.apache.spark.examples.
 How should I run GraphX codes?
 Thank You
 



[GraphX] how to compute only a subset of vertices in the whole graph?

2014-08-02 Thread Yifan LI
Hi,

I just implemented our algorithm (similar to personalised PageRank) using the Pregel API, 
and it seems to work well.

But I am wondering whether I can compute only some selected vertices (hubs), rather than 
updating every vertex…

Is it possible to do this using the Pregel API? 

Or, more realistically, could only the hubs receive messages and compute results in 
the LAST iteration, since we don't need the final result of the non-hub vertices?
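
One simple option, sketched below (hubIds and resultGraph are hypothetical 
names), is to run the full Pregel computation and then materialise only the 
hub vertices:

  import org.apache.spark.graphx._

  val hubIds: Set[VertexId] = Set(1L, 2L, 3L)                 // placeholder hub ids
  val hubResults = resultGraph.vertices
    .filter { case (id, _) => hubIds.contains(id) }
    .collect()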



Best,
Yifan LI


-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



the implications of some items in webUI

2014-07-22 Thread Yifan LI
Hi,

I am analysing application processing on Spark (GraphX), but I am a 
little confused by some items in the web UI.

1) What is the difference between Duration (Stages -> Completed Stages) and 
Task Time (Executors)?
For instance, 43 s vs. 5.6 min.
Is Task Time approximately Duration multiplied by the number of Total Tasks?

2) What are the exact meanings of Shuffle Read / Shuffle Write?


Best,
Yifan LI




Re: the default GraphX graph-partition strategy on multicore machine?

2014-07-21 Thread Yifan LI
Thanks so much, Ankur, :))

Excuse me, but I am wondering:

(for a chosen partition strategy in my application)
1.1) How can I check the size of each partition? Is there any API, or a log file? (See the sketch below.)
1.2) How can I check the processing cost of each partition (time, memory, etc.)?

2.1) And how can I estimate the global communication cost of my application under the chosen 
partition strategy?
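
For 1.1, a minimal sketch (assuming an already constructed graph) that counts 
the edges in each edge partition; the Storage tab of the web UI also lists the 
cached blocks of each RDD with their sizes:

  // Count edges per edge partition (sketch only).
  val edgeCounts = graph.edges
    .mapPartitionsWithIndex { (i, iter) => Iterator((i, iter.size)) }
    .collect()
  edgeCounts.foreach { case (i, n) => println(s"edge partition $i: $n edges") }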


On Jul 18, 2014, at 9:18 PM, Ankur Dave ankurd...@gmail.com wrote:

 Sorry, I didn't read your vertex replication example carefully, so my 
 previous answer is wrong. Here's the correct one:
 
 On Fri, Jul 18, 2014 at 9:13 AM, Yifan LI iamyifa...@gmail.com wrote:
 I don't understand, for instance, we have 3 edge partition tables (EA: a -> b, 
 a -> c; EB: a -> d, a -> e; EC: d -> c), 2 vertex partition tables (VA: a, b, 
 c; VB: d, e), the whole vertex table VA will be replicated to all these 3 
 edge partitions? since each of them refers to some vertices in VA.
 
 Vertices can be replicated individually without requiring the entire vertex 
 partition to be replicated. In this case, here's what will get replicated to 
 each partition:
 
 EA: a (from VA), b (from VA), c (from VA)
 EB: a (from VA), d (from VB), e (from VB)
 EC: c (from VA), d (from VB)
 
 Ankur



Re: java.lang.OutOfMemoryError: GC overhead limit exceeded

2014-07-21 Thread Yifan LI
Thanks, Abel.


Best,
Yifan LI
On Jul 21, 2014, at 4:16 PM, Abel Coronado Iruegas acoronadoirue...@gmail.com 
wrote:

 Hi Yifan
 
 This works for me:
 
 export SPARK_JAVA_OPTS=-Xms10g -Xmx40g -XX:MaxPermSize=10g
 export ADD_JARS=/home/abel/spark/MLI/target/MLI-assembly-1.0.jar
 export SPARK_MEM=40g
 ./spark-shell 
 
 
 Regards
 
 
 On Mon, Jul 21, 2014 at 7:48 AM, Yifan LI iamyifa...@gmail.com wrote:
 Hi,
 
 I am trying to load the GraphX example dataset (LiveJournal, 1.08GB) through 
 the Scala shell on my standalone multicore machine (8 CPUs, 16GB mem), but an 
 OutOfMemory error was returned when the code below was running,
 
 val graph = GraphLoader.edgeListFile(sc, path, minEdgePartitions = 
 16).partitionBy(PartitionStrategy.RandomVertexCut)
 
 I guess I should set some JVM parameters, like -Xmx5120m?
 But how do I do this for the Scala shell? 
 I directly used bin/spark-shell to start Spark, and everything seems to 
 work correctly in the WebUI.
 
 Or should I set the parameters somewhere else (Spark 1.0.1)?
 
 
 
 Best,
 Yifan LI
 



Re: the default GraphX graph-partition strategy on multicore machine?

2014-07-18 Thread Yifan LI
Hi Ankur,

Thanks so much! :))

Yes. Is it possible to define a custom partition strategy?

And some other questions:
(a 2*4-core machine, 24GB memory)

- If I load one edge file (5 GB) without any cores/partitions setting, what is 
the default partitioning during graph construction, and how many cores will be used?
Or what if the size of the file is 50 GB (more than the available memory, without any 
partition setting)?

- Regarding “each vertex must be replicated to all partitions where it is 
referenced”:
I don't understand. For instance, if we have 3 edge partition tables (EA: a -> b, a 
-> c; EB: a -> d, a -> e; EC: d -> c) and 2 vertex partition tables (VA: a, b, c; 
VB: d, e), will the whole vertex table VA be replicated to all these 3 edge 
partitions, since each of them refers to some vertices in VA?

- Regarding “there is no shared-memory parallelism”:
Do you mean that each core is restricted to accessing only its own partition in memory?
How do they communicate when the required data (edges?) is in another partition?



On Jul 15, 2014, at 9:30 PM, Ankur Dave ankurd...@gmail.com wrote:

 On Jul 15, 2014, at 12:06 PM, Yifan LI iamyifa...@gmail.com wrote: 
 Btw, is there any possibility to customise the partition strategy as we 
 expect?
 
 I'm not sure I understand. Are you asking about defining a custom partition 
 strategy?
 
 On Tue, Jul 15, 2014 at 6:20 AM, Yifan LI iamyifa...@gmail.com wrote:
 when I load the file using sc.textFile (minPartitions = 16, 
 PartitionStrategy.RandomVertexCut)
 
 The easiest way to load the edge file would actually be to use 
 GraphLoader.edgeListFile(sc, path, minEdgePartitions = 
 16).partitionBy(PartitionStrategy.RandomVertexCut).
 
 1) how much data will be loaded into memory?
 
 The exact size of the graph (vertices + edges) in memory depends on the 
 graph's structure, the partition function, and the average vertex degree, 
 because each vertex must be replicated to all partitions where it is 
 referenced.
 
 It's easier to estimate the size of just the edges, which I did on the 
 mailing list a while ago. To summarize, during graph construction each edge 
 takes 60 bytes, and once the graph is constructed it takes 20 bytes.
 
 2) how many partitions will be stored in memory?
 
 Once you call cache() on the graph, all 16 partitions will be stored in 
 memory. You can also tell GraphX to spill them to disk in case of memory 
 pressure by passing edgeStorageLevel=StorageLevel.MEMORY_AND_DISK to 
 GraphLoader.edgeListFile.
 
 3) If the thread/task on each core will read only one edge from memory and 
 then compute it at every time?
 
 Yes, each task is single-threaded, so it will read the edges in its partition 
 sequentially.
 
 3.1) which edge on memory was determined to read into cache?
 
 In theory, each core should independently read the edges in its partition 
 into its own cache. Cache issues should be much less of a concern than in 
 most applications because different tasks (cores) operate on independent 
 partitions; there is no shared-memory parallelism. The tradeoff is heavier 
 reliance on shuffles.
 
 3.2) how are those partitions being scheduled?
 
 Spark handles the scheduling. There are details in Section 5.1 of the Spark 
 NSDI paper; in short, tasks are scheduled to run on the same machine as their 
 data partition, and by default each machine can accept at most one task per 
 core.
 
 Ankur
 



Re: the default GraphX graph-partition strategy on multicore machine?

2014-07-15 Thread Yifan LI
Dear Ankur,

Thanks so much!

Btw, is there any possibility to customise the partition strategy as we expect?


Best,
Yifan
On Jul 11, 2014, at 10:20 PM, Ankur Dave ankurd...@gmail.com wrote:

 Hi Yifan,
 
 When you run Spark on a single machine, it uses a local mode where one task 
 per core can be executed at at a time -- that is, the level of parallelism is 
 the same as the number of cores.
 
 To take advantage of this, when you load a file using sc.textFile, you should 
 set the minPartitions argument to be the number of cores (available from 
 sc.defaultParallelism) or a multiple thereof. This will split up your local 
 edge file and allow you to take advantage of all the machine's cores.
 
 Once you've loaded the edge RDD with the appropriate number of partitions and 
 constructed a graph using it, GraphX will leave the edge partitioning alone. 
 During graph computation, each vertex will automatically be copied to the 
 edge partitions where it is needed, and the computation will execute in 
 parallel on each of the edge partitions (cores).
 
 If you later call Graph.partitionBy, it will by default preserve the number 
 of edge partitions, but shuffle around the edges according to the partition 
 strategy. This won't change the level of parallelism, but it might decrease 
 the amount of inter-core communication.
 
 Hope that helps! By the way, do continue to post your GraphX questions to the 
 Spark user list if possible. I'll probably still be the one answering them, 
 but that way others can benefit as well.
 
 Ankur
 
 
 On Fri, Jul 11, 2014 at 3:05 AM, Yifan LI iamyifa...@gmail.com wrote:
 Hi Ankur,
 
 I am doing graph computation using GraphX on a single multicore machine(not a 
 cluster).
 But It seems that I couldn't find enough docs w.r.t how GraphX partition 
 graph on a multicore machine.
 Could you give me some introduction or docs?
 
 For instance, I have one single edges file(not HDFS, etc), which follows the 
 srcID, dstID, edgeProperties format, maybe 100MB or 500GB on size.
 and the latest Spark 1.0.0(with GraphX) has been installed on a 64bit, 8*CPU 
 machine.
 I propose to do my own algorithm application, 
 
 - as default, how the edges data is partitioned? to each CPU? or to each 
 process?
 
 - if later I specify partition strategy in partitionBy(), e.g. 
 PartitionStrategy.EdgePartition2D
 what will happen? it will work?
 
 
 Thanks in advance! :)
 
 Best, 
 Yifan LI
 Univ. Paris-Sud/ Inria, Paris, France
 



GraphX: how to specify partition strategy?

2014-07-10 Thread Yifan LI
Hi,

I am doing graph computation using GraphX, but I seem to get an error when 
specifying the graph partition strategy.
as in GraphX programming guide:
The Graph.partitionBy operator allows users to choose the graph partitioning 
strategy, but due to SPARK-1931, this method is broken in Spark 1.0.0. We 
encourage users to build the latest version of Spark from the master branch, 
which contains a fix. Alternatively, a workaround is to partition the edges 
before constructing the graph, as follows:

My questions:

- How do I build the latest version of Spark from the master branch, which 
contains a fix?

- How do I specify another partition strategy, e.g. CanonicalRandomVertexCut, 
EdgePartition1D, EdgePartition2D, RandomVertexCut?
(They are listed in the Scala API documentation, but it seems only EdgePartition2D is 
available? I am not sure about this!)

- Is it possible to add my own partition strategy (hash function, etc.) into 
GraphX? (See the sketch below.)
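
For the last question, a minimal sketch of a user-defined strategy (assuming 
the PartitionStrategy trait can be extended from user code, as in recent 
releases): it only needs a getPartition method, and the hash used here is 
purely illustrative:

  import org.apache.spark.graphx._

  // Assign each edge to a partition by hashing its source vertex id (sketch only).
  object SourceHashPartition extends PartitionStrategy {
    override def getPartition(src: VertexId, dst: VertexId, numParts: PartitionID): PartitionID =
      (((src % numParts) + numParts) % numParts).toInt
  }

  // Usage (graph is assumed to exist): graph.partitionBy(SourceHashPartition)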

Thanks in advance! :))

Best,
Yifan

which Spark package(wrt. graphX) I should install to do graph computation on cluster?

2014-07-07 Thread Yifan LI
Hi,

I am planning to do graph (social network) computation on a cluster (Hadoop has 
been installed), but it seems there are pre-built packages for Hadoop, and 
I am NOT sure whether GraphX is included in them.

Or should I install another released version (in which GraphX is obviously 
included)?

Looking forward to your reply! :)

Best,
Yifan