Re: flatMap output on disk / flatMap memory overhead

2015-08-01 Thread Puneet Kapoor
Hi Octavian,

Just out of curiosity, did you try persisting your RDD in a serialized
format, "MEMORY_AND_DISK_SER" or "MEMORY_ONLY_SER"? i.e. changing your
"rdd.persist(MEMORY_AND_DISK)"
to
"rdd.persist(MEMORY_ONLY_SER)"

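For what it's worth, a minimal sketch of that change (the app name, input
path and Kryo setting below are illustrative assumptions, not taken from
your job):

  import org.apache.spark.{SparkConf, SparkContext}
  import org.apache.spark.storage.StorageLevel

  // Kryo is optional, but it usually shrinks the serialized blocks further.
  val conf = new SparkConf()
    .setAppName("serialized-persist-sketch")
    .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
  val sc = new SparkContext(conf)

  val docs = sc.textFile("hdfs:///path/to/docs") // placeholder input path

  // Cache partitions as serialized byte arrays instead of deserialized
  // Java objects: a bit more CPU per access, but a much smaller heap
  // footprint and cheaper spilling to disk.
  docs.persist(StorageLevel.MEMORY_AND_DISK_SER)
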
Regards



Re: flatMap output on disk / flatMap memory overhead

2015-06-09 Thread Imran Rashid
I agree with Richard. It looks like the issue here is shuffling, and
shuffle data is always written to disk, so the issue is definitely not that
all of the output of flatMap has to be stored in memory.

If at all possible, I'd first suggest upgrading to a newer version of Spark
-- even in 1.2 there were big improvements to shuffle, with sort-based
shuffle as the default.
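
If upgrading right away isn't possible, sort-based shuffle can also be opted
into on 1.1.x. A hedged sketch only -- double-check the spark.shuffle.manager
property and its "sort" value against the docs for your exact version:

  import org.apache.spark.{SparkConf, SparkContext}

  // On Spark 1.1.x the sort-based shuffle exists but is not the default,
  // so request it explicitly; on 1.2+ it is already the default and this
  // setting is unnecessary.
  val conf = new SparkConf()
    .setAppName("sort-shuffle-sketch")
    .set("spark.shuffle.manager", "sort")
  val sc = new SparkContext(conf)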



Re: flatMap output on disk / flatMap memory overhead

2015-06-02 Thread Richard Marscher
Are you sure it's memory related? What is the disk utilization and IO
performance on the workers? The error you posted looks to be related to
shuffle trying to obtain block data from another worker node and failing to
do so in a reasonable amount of time. It may still be memory related, but I'm
not sure that other resources have been ruled out yet.
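
While you investigate, one knob that is sometimes relevant to that specific
"ack was not received within 60 sec" failure is the connection ack timeout.
A sketch only -- spark.core.connection.ack.wait.timeout is the property I
believe governs that 60-second window, so please verify it for your exact
version, and note that raising it masks the symptom rather than fixing the
root cause:

  import org.apache.spark.{SparkConf, SparkContext}

  // Give remote workers more time to acknowledge block fetches, e.g. when
  // they are stuck in a long GC pause or waiting on slow disks.
  val conf = new SparkConf()
    .set("spark.core.connection.ack.wait.timeout", "600") // seconds
  val sc = new SparkContext(conf)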



Re: flatMap output on disk / flatMap memory overhead

2015-06-02 Thread octavian.ganea
I tried using reduceByKey, without success.

I also tried this: rdd.persist(MEMORY_AND_DISK).flatMap(...).reduceByKey.
However, I got the same error as before, namely the error described here:
http://apache-spark-user-list.1001560.n3.nabble.com/flatMap-output-on-disk-flatMap-memory-overhead-td23098.html

My task is to count the frequencies of pairs of words that occur at least 5
times in a set of documents. I know that this final output is sparse and
should comfortably fit in memory. However, the intermediate pairs emitted by
flatMap might need to be stored on disk, and I don't understand why the
persist option does not work and why my job fails.

My code:

rdd.persist(StorageLevel.MEMORY_AND_DISK)
  .flatMap(x => outputPairsOfWords(x)) // outputs pairs of type ((word1, word2), 1)
  .reduceByKey((a, b) => (a + b).toShort)
  .filter { case ((x, y), count) => count >= 5 }

My cluster has 8 nodes, each with 129 GB of RAM and 16 cores. I keep one
node for the master and 7 nodes for the workers.

my conf:

conf.set("spark.cores.max", "128")
conf.set("spark.akka.frameSize", "1024")
conf.set("spark.executor.memory", "115g")
conf.set("spark.shuffle.file.buffer.kb", "1000")

my spark-env.sh:
 ulimit -n 20
 SPARK_JAVA_OPTS="-Xss1g -Xmx129g -d64 -XX:-UseGCOverheadLimit
-XX:-UseCompressedOops"
 SPARK_DRIVER_MEMORY=129G

spark version: 1.1.1

Thanks a lot for your help!






Re: flatMap output on disk / flatMap memory overhead

2015-06-01 Thread Akhil Das
You could try rdd.persist(MEMORY_AND_DISK/DISK_ONLY).flatMap(...). I think
StorageLevel MEMORY_AND_DISK means Spark will try to keep the data in
memory, and if there isn't sufficient space it will be spilled to disk.
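
A rough sketch of what that looks like in context (rdd and
outputPairsOfWords are taken from Octavian's snippet; everything else here
is illustrative only):

  import org.apache.spark.storage.StorageLevel

  // Persist the *input* of flatMap. Cached partitions are materialized one
  // at a time, so the whole RDD never has to fit in memory at once:
  // partitions that don't fit under MEMORY_AND_DISK (or any partition,
  // under DISK_ONLY) are written to local disk.
  val cached = rdd.persist(StorageLevel.DISK_ONLY)

  val counts = cached
    .flatMap(x => outputPairsOfWords(x)) // ((word1, word2), 1) pairs
    .reduceByKey(_ + _)
    .filter { case (_, count) => count >= 5 }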

Thanks
Best Regards



flatMap output on disk / flatMap memory overhead

2015-06-01 Thread octavian.ganea
Hi,

Is there any way to force the output RDD of a flatMap op to be stored in
both memory and disk as it is computed? My RAM would not be able to fit the
entire output of flatMap, so it really needs to start using disk after the
RAM gets full. I didn't find any way to force this.

Also, what is the memory overhead of flatMap? From my computations, the
output RDD should fit in memory, but I get the following error after a while
(and I know it's because of memory issues, since running the program with
1/3 of the input data finishes successfully):

15/06/01 19:02:49 ERROR BlockFetcherIterator$BasicBlockFetcherIterator:
Could not get block(s) from ConnectionManagerId(dco-node036-mgt.dco.ethz.ch,57478)
java.io.IOException: sendMessageReliably failed because ack was not received within 60 sec
        at org.apache.spark.network.ConnectionManager$$anon$10$$anonfun$run$15.apply(ConnectionManager.scala:866)
        at org.apache.spark.network.ConnectionManager$$anon$10$$anonfun$run$15.apply(ConnectionManager.scala:865)
        at scala.Option.foreach(Option.scala:236)
        at org.apache.spark.network.ConnectionManager$$anon$10.run(ConnectionManager.scala:865)
        at io.netty.util.HashedWheelTimer$HashedWheelTimeout.expire(HashedWheelTimer.java:581)
        at io.netty.util.HashedWheelTimer$HashedWheelBucket.expireTimeouts(HashedWheelTimer.java:656)
        at io.netty.util.HashedWheelTimer$Worker.run(HashedWheelTimer.java:367)
        at java.lang.Thread.run(Thread.java:745)


I've also seen this:
https://spark.apache.org/docs/latest/programming-guide.html#rdd-persistence
but my understanding is that one should apply something like
rdd.flatMap(...).persist(MEMORY_AND_DISK), which assumes that the entire
output of flatMap is first stored in memory (which is not possible in my
case) and, only when that is done, is stored on the disk. Please correct me
if I'm wrong. Anyway, I've tried using this, but I got the same error.

My config:

conf.set("spark.cores.max", "128")
conf.set("spark.akka.frameSize", "1024")
conf.set("spark.executor.memory", "125g")
conf.set("spark.shuffle.file.buffer.kb", "1000")
conf.set("spark.shuffle.consolidateFiles", "true")


