Hi,

This is what I tried:

for i in range(1000):
    print i
    data2=data.repartition(50).cache()
    if (i+1) % 10 == 0:
        data2.checkpoint()
    data2.first() # materialize rdd
    data.unpersist() # unpersist previous version
    sc._jvm.System.gc()
    data=data2

But unfortunately I do not get any significant improvement from the call to sc._jvm.System.gc()...

I checked the WebUI and I have a single RDD in memory, so unpersist() works as expected but still no solution to trigger the cleaning of shuffle files...

Aurélien

Le 9/2/15 4:11 PM, alexis GILLAIN a écrit :
Just made some tests on my laptop.

Deletion of the files is not immediate but a System.gc() call makes the
job on shuffle files of a checkpointed RDD.
It should solve your problem (`sc._jvm.System.gc()` in Python as pointed
in the databricks link in my previous message).


2015-09-02 20:55 GMT+08:00 Aurélien Bellet
<aurelien.bel...@telecom-paristech.fr
<mailto:aurelien.bel...@telecom-paristech.fr>>:

    Thanks a lot for the useful link and comments Alexis!

    First of all, the problem occurs without doing anything else in the
    code (except of course loading my data from HDFS at the beginning) -
    so it definitely comes from the shuffling. You're right, in the
    current version, checkpoint files are not removed and take up some
    space in HDFS (this is easy to fix). But this is negligible compared
    to the non hdfs files which keeps growing as iterations go. So I
    agree with you that this must come from the shuffling operations: it
    seems that the shuffle files are not removed along the execution
    (they are only removed if I stop/kill the application), despite the
    use of checkpoint.

    The class you mentioned is very interesting but I did not find a way
    to use it from pyspark. I will try to implement my own version,
    looking at the source code. But besides the queueing and removing of
    checkpoint files, I do not really see anything special there that
    could solve my issue.

    I will continue to investigate this. Just found out I can use a
    command line browser to look at the webui (I cannot access the
    server in graphical display mode), this should help me understand
    what's going on. I will also try the workarounds mentioned in the
    link. Keep you posted.

    Again, thanks a lot!

    Best,

    Aurelien


    Le 02/09/2015 14:15, alexis GILLAIN a écrit :

        Aurélien,

          From what you're saying, I can think of a couple of things
        considering
        I don't know what you are doing in the rest of the code :

        - There is lot of non hdfs writes, it comes from the rest of
        your code
        and/or repartittion(). Repartition involve a shuffling and
        creation of
        files on disk. I would have said that the problem come from that
        but I
        just checked and checkpoint() is supposed to delete shuffle files :
        
https://forums.databricks.com/questions/277/how-do-i-avoid-the-no-space-left-on-device-error.html
        (looks exactly as your problem so you could maybe try the others
        workarounds)
        Still, you may do a lot of shuffle in the rest of the code (you
        should
        see the amount of shuffle files written in the webui) and consider
        increasing the disk space available...if you can do that.

        - On the hdfs side, the class I pointed to has an update
        function which
        "automatically handles persisting and (optionally) checkpointing, as
        well as unpersisting and removing checkpoint files". Not sure your
        method for checkpointing remove previous checkpoint file.

        In the end, does the disk space error come from hdfs growing or
        local
        disk growing ?

        You should check the webui to identify which tasks spill data on
        disk
        and verify if the shuffle files are properly deleted when you
        checkpoint
        your rdd.


        Regards,


        2015-09-01 22:48 GMT+08:00 Aurélien Bellet
        <aurelien.bel...@telecom-paristech.fr
        <mailto:aurelien.bel...@telecom-paristech.fr>
        <mailto:aurelien.bel...@telecom-paristech.fr
        <mailto:aurelien.bel...@telecom-paristech.fr>>>:


             Dear Alexis,

             Thanks again for your reply. After reading about
        checkpointing I
             have modified my sample code as follows:

             for i in range(1000):
                  print i
                  data2=data.repartition(50).cache()
                  if (i+1) % 10 == 0:
                      data2.checkpoint()
                  data2.first() # materialize rdd
                  data.unpersist() # unpersist previous version
                  data=data2

             The data is checkpointed every 10 iterations to a directory
        that I
             specified. While this seems to improve things a little bit,
        there is
             still a lot of writing on disk (appcache directory, shown
        as "non
             HDFS files" in Cloudera Manager) *besides* the checkpoint files
             (which are regular HDFS files), and the application
        eventually runs
             out of disk space. The same is true even if I checkpoint at
        every
             iteration.

             What am I doing wrong? Maybe some garbage collector setting?

             Thanks a lot for the help,

             Aurelien

             Le 24/08/2015 10:39, alexis GILLAIN a écrit :

                 Hi Aurelien,

                 The first code should create a new RDD in memory at
        each iteration
                 (check the webui).
                 The second code will unpersist the RDD but that's not
        the main
                 problem.

                 I think you have trouble due to long lineage as
        .cache() keep
                 track of
                 lineage for recovery.
                 You should have a look at checkpointing :
        
https://github.com/JerryLead/SparkInternals/blob/master/markdown/english/6-CacheAndCheckpoint.md
        
https://github.com/apache/spark/blob/master/mllib/src/main/scala/org/apache/spark/mllib/impl/PeriodicRDDCheckpointer.scala

                 You can also have a look at the code of others iterative
                 algorithms in
                 mlllib for best practices.

                 2015-08-20 17:26 GMT+08:00 abellet
                 <aurelien.bel...@telecom-paristech.fr
        <mailto:aurelien.bel...@telecom-paristech.fr>
                 <mailto:aurelien.bel...@telecom-paristech.fr
        <mailto:aurelien.bel...@telecom-paristech.fr>>
                 <mailto:aurelien.bel...@telecom-paristech.fr
        <mailto:aurelien.bel...@telecom-paristech.fr>

                 <mailto:aurelien.bel...@telecom-paristech.fr
        <mailto:aurelien.bel...@telecom-paristech.fr>>>>:

                      Hello,

                      For the need of my application, I need to periodically
                 "shuffle" the
                      data
                      across nodes/partitions of a reasonably-large
        dataset. This
                 is an
                      expensive
                      operation but I only need to do it every now and then.
                 However it
                      seems that
                      I am doing something wrong because as the
        iterations go the
                 memory usage
                      increases, causing the job to spill onto HDFS, which
                 eventually gets
                      full. I
                      am also getting some "Lost executor" errors that I
        don't
                 get if I don't
                      repartition.

                      Here's a basic piece of code which reproduces the
        problem:

                      data =

        sc.textFile("ImageNet_gist_train.txt",50).map(parseLine).cache()
                      data.count()
                      for i in range(1000):
                               data=data.repartition(50).persist()
                               # below several operations are done on data


                      What am I doing wrong? I tried the following but
        it doesn't
                 solve
                      the issue:

                      for i in range(1000):
                               data2=data.repartition(50).persist()
                               data2.count() # materialize rdd
                               data.unpersist() # unpersist previous version
                               data=data2


                      Help and suggestions on this would be greatly
        appreciated!
                 Thanks a lot!




                      --
                      View this message in context:
        
http://apache-spark-user-list.1001560.n3.nabble.com/Memory-efficient-successive-calls-to-repartition-tp24358.html
                      Sent from the Apache Spark User List mailing list
        archive
                 at Nabble.com.



        ---------------------------------------------------------------------
                      To unsubscribe, e-mail:
        user-unsubscr...@spark.apache.org
        <mailto:user-unsubscr...@spark.apache.org>
                 <mailto:user-unsubscr...@spark.apache.org
        <mailto:user-unsubscr...@spark.apache.org>>
                      <mailto:user-unsubscr...@spark.apache.org
        <mailto:user-unsubscr...@spark.apache.org>
                 <mailto:user-unsubscr...@spark.apache.org
        <mailto:user-unsubscr...@spark.apache.org>>>
                      For additional commands, e-mail:
        user-h...@spark.apache.org <mailto:user-h...@spark.apache.org>
                 <mailto:user-h...@spark.apache.org
        <mailto:user-h...@spark.apache.org>>
                      <mailto:user-h...@spark.apache.org
        <mailto:user-h...@spark.apache.org>
                 <mailto:user-h...@spark.apache.org
        <mailto:user-h...@spark.apache.org>>>




        ---------------------------------------------------------------------
             To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
        <mailto:user-unsubscr...@spark.apache.org>
             <mailto:user-unsubscr...@spark.apache.org
        <mailto:user-unsubscr...@spark.apache.org>>
             For additional commands, e-mail: user-h...@spark.apache.org
        <mailto:user-h...@spark.apache.org>
             <mailto:user-h...@spark.apache.org
        <mailto:user-h...@spark.apache.org>>



    ---------------------------------------------------------------------
    To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
    <mailto:user-unsubscr...@spark.apache.org>
    For additional commands, e-mail: user-h...@spark.apache.org
    <mailto:user-h...@spark.apache.org>



---------------------------------------------------------------------
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org

Reply via email to