Re: Memory-efficient successive calls to repartition()

2015-09-08 Thread Aurélien Bellet
for i in range(1000):
    print i
    data2=data.repartition(50).cache()
    if (i+1) % 10 == 0:
        data2.checkpoint()
    data2.first() # materialize rdd
    data.unpersist() # unpersist previous version
    data=data2

The data is checkpointed every 10 iterations to a directory that I specified. While this seems to improve things a little bit, there is still a lot of writing to disk (the appcache directory, shown as "non HDFS files" in Cloudera Manager) *besides* the checkpoint files (which are regular HDFS files), and the application eventually runs out of disk space. The same is true even if I checkpoint at every iteration.

What am I doing wrong? Maybe some garbage collector setting?

Thanks a lot for the help,

Aurelien


Re: Memory-efficient successive calls to repartition()

2015-09-02 Thread alexis GILLAIN
Aurélien,

From what you're saying, I can think of a couple of things, considering I
don't know what you are doing in the rest of the code:

- There are a lot of non-HDFS writes; they come from the rest of your code
and/or repartition(). Repartition involves a shuffle and the creation of
files on disk. I would have said that the problem comes from that, but I just
checked and checkpoint() is supposed to delete shuffle files:
https://forums.databricks.com/questions/277/how-do-i-avoid-the-no-space-left-on-device-error.html
(this looks exactly like your problem, so you could maybe try the other
workarounds there).
Still, you may be doing a lot of shuffling in the rest of the code (you should
see the amount of shuffle data written in the webui) and consider increasing
the available disk space... if you can do that.
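
One thing you could also try along those lines (this is only my assumption of
what the workarounds in that thread amount to, not something I have verified
there): shuffle files are only cleaned up once the driver garbage-collects the
RDDs that produced them, so forcing a GC on the driver from time to time can
help. A minimal PySpark sketch (sc._jvm is a private py4j handle, so treat
this as a best-effort hack rather than a stable API):

import gc

def force_driver_gc(sc):
    # Drop unreferenced Python-side RDD handles, then ask the JVM to GC so
    # that Spark's ContextCleaner can delete the shuffle files of RDDs that
    # are no longer referenced (e.g. the unpersisted previous iterations).
    gc.collect()
    sc._jvm.System.gc()

# e.g. inside your loop:
# if (i + 1) % 10 == 0:
#     force_driver_gc(sc)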

- On the HDFS side, the class I pointed to has an update function which
"automatically handles persisting and (optionally) checkpointing, as well as
unpersisting and removing checkpoint files". I am not sure your checkpointing
method removes the previous checkpoint files.
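
A rough sketch of that bookkeeping in PySpark (hypothetical code that just
mimics what PeriodicRDDCheckpointer does on the Scala side; the hdfs shell
call used to delete old checkpoint files is only for illustration):

import subprocess

prev_checkpoint = None            # last RDD whose checkpoint files are still on HDFS
for i in range(1000):
    data2 = data.repartition(50).cache()
    if (i + 1) % 10 == 0:
        data2.checkpoint()        # mark before materializing
    data2.count()                 # materialize; also writes the checkpoint if marked
    data.unpersist()              # drop the previous cached copy
    if data2.isCheckpointed():
        if prev_checkpoint is not None:
            # the older checkpoint is no longer needed once a newer one is written
            old_path = prev_checkpoint.getCheckpointFile()
            subprocess.call(["hdfs", "dfs", "-rm", "-r", old_path])
        prev_checkpoint = data2
    data = data2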

In the end, does the disk space error come from HDFS growing or from the
local disks growing?

You should check the webui to identify which tasks spill data to disk and
verify that the shuffle files are properly deleted when you checkpoint your
RDD.
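
You can also do a quick sanity check from the driver that the checkpoint
actually happened (both calls below are standard RDD methods):

print data2.isCheckpointed()     # should be True on checkpoint iterations
print data2.getCheckpointFile()  # location of the checkpoint data on HDFS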


Regards,



Re: Memory-efficient successive calls to repartition()

2015-09-01 Thread Aurélien Bellet

Dear Alexis,

Thanks again for your reply. After reading about checkpointing I have 
modified my sample code as follows:


for i in range(1000):
    print i
    data2=data.repartition(50).cache()
    if (i+1) % 10 == 0:
        data2.checkpoint()
    data2.first() # materialize rdd
    data.unpersist() # unpersist previous version
    data=data2

The data is checkpointed every 10 iterations to a directory that I specified.
While this seems to improve things a little bit, there is still a lot of
writing to disk (the appcache directory, shown as "non HDFS files" in
Cloudera Manager) *besides* the checkpoint files (which are regular HDFS
files), and the application eventually runs out of disk space. The same is
true even if I checkpoint at every iteration.


What am I doing wrong? Maybe some garbage collector setting?

Thanks a lot for the help,

Aurelien



Re: Memory-efficient successive calls to repartition()

2015-08-24 Thread Alexis Gillain
Hi Aurelien,

The first code should create a new RDD in memory at each iteration (check
the webui).
The second code will unpersist the RDD but that's not the main problem.

I think you are having trouble due to a long lineage, as .cache() keeps track
of the lineage for recovery.
You should have a look at checkpointing:
https://github.com/JerryLead/SparkInternals/blob/master/markdown/english/6-CacheAndCheckpoint.md
https://github.com/apache/spark/blob/master/mllib/src/main/scala/org/apache/spark/mllib/impl/PeriodicRDDCheckpointer.scala

You can also have a look at the code of other iterative algorithms in
MLlib for best practices.
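
As a minimal sketch of the suggested pattern, reusing the names from your code
quoted below (the checkpoint directory is just a placeholder):

sc.setCheckpointDir("hdfs:///tmp/spark-checkpoints")   # any HDFS dir you control

data = sc.textFile("ImageNet_gist_train.txt", 50).map(parseLine).cache()
data.count()
for i in range(1000):
    data2 = data.repartition(50).cache()
    if (i + 1) % 10 == 0:
        data2.checkpoint()   # mark for checkpointing before materializing
    data2.count()            # materialize; lineage is truncated at the checkpoint
    data.unpersist()         # unpersist the previous version
    data = data2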


2015-08-20 17:26 GMT+08:00 abellet aurelien.bel...@telecom-paristech.fr:

 Hello,

 For the need of my application, I need to periodically "shuffle" the data
 across nodes/partitions of a reasonably-large dataset. This is an expensive
 operation, but I only need to do it every now and then. However, it seems
 that I am doing something wrong, because as the iterations go on the memory
 usage increases, causing the job to spill onto HDFS, which eventually gets
 full. I am also getting some "Lost executor" errors that I don't get if I
 don't repartition.

 Here's a basic piece of code which reproduces the problem:

 data = sc.textFile("ImageNet_gist_train.txt",50).map(parseLine).cache()
 data.count()
 for i in range(1000):
     data=data.repartition(50).persist()
     # below several operations are done on data


 What am I doing wrong? I tried the following but it doesn't solve the
 issue:

 for i in range(1000):
     data2=data.repartition(50).persist()
     data2.count() # materialize rdd
     data.unpersist() # unpersist previous version
     data=data2


 Help and suggestions on this would be greatly appreciated! Thanks a lot!








-- 
Alexis GILLAIN

