Dear Alexis,

Thanks again for your reply. After reading about checkpointing I have modified my sample code as follows:

for i in range(1000):
    print(i)
    data2 = data.repartition(50).cache()
    if (i + 1) % 10 == 0:
        data2.checkpoint()  # mark for checkpointing; written when the next action runs
    data2.first()           # materialize the RDD
    data.unpersist()        # unpersist the previous version
    data = data2

The data is checkpointed every 10 iterations to a directory that I specified. While this seems to improve things a little, there is still a lot of writing to disk (the appcache directory, shown as "non HDFS files" in Cloudera Manager) *besides* the checkpoint files (which are regular HDFS files), and the application eventually runs out of disk space. The same happens even if I checkpoint at every iteration.
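
For reference, a quick way to check that the lineage really is truncated once a checkpoint has been written would be something like the following, placed right after data2.first() in the loop above (just a sketch using the standard RDD inspection methods):

# Sketch: verify that checkpointing actually truncated the lineage of data2.
if (i + 1) % 10 == 0:
    print(data2.isCheckpointed())      # True once the checkpoint job has run
    print(data2.getCheckpointFile())   # HDFS location of the checkpoint files
    print(data2.toDebugString())       # lineage should now start at the checkpoint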

What am I doing wrong? Maybe some garbage collector setting?
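
In case it is relevant, my understanding is that the "non HDFS files" are the shuffle files kept in the executors' local appcache directories, and that these are only removed once the corresponding RDDs are garbage-collected on the driver. Below is a sketch of what I am considering to force that cleanup; the configuration key and the explicit gc.collect() call are assumptions on my part taken from the documentation and list archives, not something I have validated:

# Sketch only: untested assumptions about forcing cleanup of old shuffle
# and checkpoint files.
import gc
from pyspark import SparkConf, SparkContext

conf = (SparkConf()
        # ask the ContextCleaner to also delete checkpoint files of RDDs
        # that have gone out of scope
        .set("spark.cleaner.referenceTracking.cleanCheckpoints", "true"))
sc = SparkContext(conf=conf)
sc.setCheckpointDir("hdfs:///user/aurelien/checkpoints")  # hypothetical path

# ... then, inside the iteration loop, right after data.unpersist() and data = data2:
gc.collect()  # drop Python references so the JVM-side cleaner can act on old RDDs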

Thanks a lot for the help,

Aurelien

On 24/08/2015 10:39, alexis GILLAIN wrote:
Hi Aurelien,

The first piece of code should create a new RDD in memory at each
iteration (check the web UI).
The second one will unpersist the RDD, but that's not the main problem.

I think your trouble comes from the long lineage, since .cache() keeps
track of the full lineage for recovery.
You should have a look at checkpointing :
https://github.com/JerryLead/SparkInternals/blob/master/markdown/english/6-CacheAndCheckpoint.md
https://github.com/apache/spark/blob/master/mllib/src/main/scala/org/apache/spark/mllib/impl/PeriodicRDDCheckpointer.scala

You can also have a look at the code of other iterative algorithms in
MLlib for best practices.
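
To illustrate, PeriodicRDDCheckpointer persists each new RDD, checkpoints it every N updates, and deletes the checkpoint files of older RDDs once a newer checkpoint has been written. A rough PySpark transcription of that idea (a simplified sketch only: the Scala class keeps the last couple of checkpoints around and uses the Hadoop FileSystem API rather than the hdfs CLI; CHECKPOINT_INTERVAL and prev_checkpoint_file are names I made up):

import subprocess
from pyspark import SparkContext

sc = SparkContext(appName="periodic-checkpoint-sketch")
sc.setCheckpointDir("hdfs:///tmp/rdd-checkpoints")   # any HDFS directory you control

CHECKPOINT_INTERVAL = 10      # illustrative value
prev_checkpoint_file = None   # HDFS path of the previous checkpoint, if any

data = sc.textFile("ImageNet_gist_train.txt", 50).map(parseLine).cache()  # parseLine as in your code
data.count()

for i in range(1000):
    new_data = data.repartition(50).cache()
    if (i + 1) % CHECKPOINT_INTERVAL == 0:
        new_data.checkpoint()            # cuts the lineage once materialized
    new_data.count()                     # materialize the cache (and checkpoint, if marked)
    data.unpersist()                     # drop the previous cached copy
    if new_data.isCheckpointed():
        # Once the new checkpoint is on HDFS, the previous one is no longer
        # needed for recovery and its files can be removed.
        if prev_checkpoint_file is not None:
            subprocess.call(["hdfs", "dfs", "-rm", "-r", prev_checkpoint_file])
        prev_checkpoint_file = new_data.getCheckpointFile()
    data = new_data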

2015-08-20 17:26 GMT+08:00 abellet <aurelien.bel...@telecom-paristech.fr>:

    Hello,

    For the needs of my application, I need to periodically "shuffle" the
    data across the nodes/partitions of a reasonably large dataset. This is
    an expensive operation, but I only need to do it every now and then.
    However, it seems that I am doing something wrong, because as the
    iterations go on the memory usage increases, causing the job to spill
    onto HDFS, which eventually gets full. I am also getting some "Lost
    executor" errors that I don't get if I don't repartition.

    Here's a basic piece of code which reproduces the problem:

    data = sc.textFile("ImageNet_gist_train.txt", 50).map(parseLine).cache()
    data.count()
    for i in range(1000):
        data = data.repartition(50).persist()
        # below, several operations are performed on data


    What am I doing wrong? I tried the following but it doesn't solve
    the issue:

    for i in range(1000):
        data2 = data.repartition(50).persist()
        data2.count()       # materialize the RDD
        data.unpersist()    # unpersist the previous version
        data = data2


    Help and suggestions on this would be greatly appreciated! Thanks a lot!








---------------------------------------------------------------------
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org
