Dear Alexis,
Thanks again for your reply. After reading about checkpointing I have
modified my sample code as follows:
for i in range(1000):
    print(i)
    data2 = data.repartition(50).cache()
    if (i + 1) % 10 == 0:
        data2.checkpoint()
    data2.first()     # materialize the RDD
    data.unpersist()  # unpersist the previous version
    data = data2
The data is checkpointed every 10 iterations to a directory that I
specified. While this seems to improve things a little, there is still
a lot of data written to local disk (the appcache directory, shown as
"non HDFS files" in Cloudera Manager) *besides* the checkpoint files
(which are regular HDFS files), and the application eventually runs out
of disk space. The same is true even if I checkpoint at every iteration.
What am I doing wrong? Maybe some garbage collection setting?
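One thing I plan to try next (just a sketch on my side, based on the
assumption that the appcache files are shuffle files which Spark's
ContextCleaner only deletes once the corresponding driver-side RDD
objects have been garbage-collected) is to force a collection in the loop:

import gc

for i in range(1000):
    data2 = data.repartition(50).cache()
    if (i + 1) % 10 == 0:
        data2.checkpoint()
    data2.first()     # materialize the RDD
    data.unpersist()  # unpersist the previous version
    data = data2
    # Assumption: forcing a driver-side garbage collection releases the
    # Python references so that Spark's ContextCleaner can reclaim the
    # shuffle files left over from earlier iterations.
    gc.collect()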
Thanks a lot for the help,
Aurelien
On 24/08/2015 10:39, alexis GILLAIN wrote:
Hi Aurelien,
The first code should create a new RDD in memory at each iteration
(check the web UI).
The second code will unpersist the RDD, but that's not the main problem.
I think you are running into trouble due to the long lineage, as
.cache() keeps track of the lineage for recovery.
You should have a look at checkpointing:
https://github.com/JerryLead/SparkInternals/blob/master/markdown/english/6-CacheAndCheckpoint.md
https://github.com/apache/spark/blob/master/mllib/src/main/scala/org/apache/spark/mllib/impl/PeriodicRDDCheckpointer.scala
You can also have a look at the code of other iterative algorithms in
mllib for best practices.
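For reference, the basic pattern looks like this (a minimal sketch,
assuming a SparkContext sc and a checkpoint directory you can write to;
the path and dataset here are placeholders):

sc.setCheckpointDir("hdfs:///tmp/spark-checkpoints")  # placeholder path

data = sc.parallelize(range(100000), 50)
for i in range(100):
    data = data.repartition(50).cache()
    if (i + 1) % 10 == 0:
        data.checkpoint()  # truncates the lineage once materialized
    data.count()           # checkpoint() is lazy; an action triggers it

Caching before checkpointing matters: checkpoint() recomputes the RDD
to write it out unless it is already persisted.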
2015-08-20 17:26 GMT+08:00 abellet <aurelien.bel...@telecom-paristech.fr>:
Hello,
For the needs of my application, I need to periodically "shuffle" the
data across nodes/partitions of a reasonably large dataset. This is an
expensive operation, but I only need to do it every now and then.
However, it seems that I am doing something wrong: as the iterations go
on, memory usage increases, causing the job to spill onto HDFS, which
eventually gets full. I am also getting some "Lost executor" errors
that I don't get if I don't repartition.
Here's a basic piece of code which reproduces the problem:
data = sc.textFile("ImageNet_gist_train.txt", 50).map(parseLine).cache()
data.count()
for i in range(1000):
    data = data.repartition(50).persist()
    # several operations are then performed on data
What am I doing wrong? I tried the following but it doesn't solve the issue:
for i in range(1000):
    data2 = data.repartition(50).persist()
    data2.count()     # materialize the RDD
    data.unpersist()  # unpersist the previous version
    data = data2
Help and suggestions on this would be greatly appreciated! Thanks a lot!
--
View this message in context:
http://apache-spark-user-list.1001560.n3.nabble.com/Memory-efficient-successive-calls-to-repartition-tp24358.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.
---------------------------------------------------------------------
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org