Just made some tests on my laptop.

Deletion of the files is not immediate but a System.gc() call makes the job
on shuffle files of a checkpointed RDD.
It should solve your problem (`sc._jvm.System.gc()` in Python as pointed in
the databricks link in my previous message).


2015-09-02 20:55 GMT+08:00 Aurélien Bellet <
aurelien.bel...@telecom-paristech.fr>:

> Thanks a lot for the useful link and comments Alexis!
>
> First of all, the problem occurs without doing anything else in the code
> (except of course loading my data from HDFS at the beginning) - so it
> definitely comes from the shuffling. You're right, in the current version,
> checkpoint files are not removed and take up some space in HDFS (this is
> easy to fix). But this is negligible compared to the non hdfs files which
> keeps growing as iterations go. So I agree with you that this must come
> from the shuffling operations: it seems that the shuffle files are not
> removed along the execution (they are only removed if I stop/kill the
> application), despite the use of checkpoint.
>
> The class you mentioned is very interesting but I did not find a way to
> use it from pyspark. I will try to implement my own version, looking at the
> source code. But besides the queueing and removing of checkpoint files, I
> do not really see anything special there that could solve my issue.
>
> I will continue to investigate this. Just found out I can use a command
> line browser to look at the webui (I cannot access the server in graphical
> display mode), this should help me understand what's going on. I will also
> try the workarounds mentioned in the link. Keep you posted.
>
> Again, thanks a lot!
>
> Best,
>
> Aurelien
>
>
> Le 02/09/2015 14:15, alexis GILLAIN a écrit :
>
>> Aurélien,
>>
>>  From what you're saying, I can think of a couple of things considering
>> I don't know what you are doing in the rest of the code :
>>
>> - There is lot of non hdfs writes, it comes from the rest of your code
>> and/or repartittion(). Repartition involve a shuffling and creation of
>> files on disk. I would have said that the problem come from that but I
>> just checked and checkpoint() is supposed to delete shuffle files :
>>
>> https://forums.databricks.com/questions/277/how-do-i-avoid-the-no-space-left-on-device-error.html
>> (looks exactly as your problem so you could maybe try the others
>> workarounds)
>> Still, you may do a lot of shuffle in the rest of the code (you should
>> see the amount of shuffle files written in the webui) and consider
>> increasing the disk space available...if you can do that.
>>
>> - On the hdfs side, the class I pointed to has an update function which
>> "automatically handles persisting and (optionally) checkpointing, as
>> well as unpersisting and removing checkpoint files". Not sure your
>> method for checkpointing remove previous checkpoint file.
>>
>> In the end, does the disk space error come from hdfs growing or local
>> disk growing ?
>>
>> You should check the webui to identify which tasks spill data on disk
>> and verify if the shuffle files are properly deleted when you checkpoint
>> your rdd.
>>
>>
>> Regards,
>>
>>
>> 2015-09-01 22:48 GMT+08:00 Aurélien Bellet
>> <aurelien.bel...@telecom-paristech.fr
>> <mailto:aurelien.bel...@telecom-paristech.fr>>:
>>
>>
>>     Dear Alexis,
>>
>>     Thanks again for your reply. After reading about checkpointing I
>>     have modified my sample code as follows:
>>
>>     for i in range(1000):
>>          print i
>>          data2=data.repartition(50).cache()
>>          if (i+1) % 10 == 0:
>>              data2.checkpoint()
>>          data2.first() # materialize rdd
>>          data.unpersist() # unpersist previous version
>>          data=data2
>>
>>     The data is checkpointed every 10 iterations to a directory that I
>>     specified. While this seems to improve things a little bit, there is
>>     still a lot of writing on disk (appcache directory, shown as "non
>>     HDFS files" in Cloudera Manager) *besides* the checkpoint files
>>     (which are regular HDFS files), and the application eventually runs
>>     out of disk space. The same is true even if I checkpoint at every
>>     iteration.
>>
>>     What am I doing wrong? Maybe some garbage collector setting?
>>
>>     Thanks a lot for the help,
>>
>>     Aurelien
>>
>>     Le 24/08/2015 10:39, alexis GILLAIN a écrit :
>>
>>         Hi Aurelien,
>>
>>         The first code should create a new RDD in memory at each iteration
>>         (check the webui).
>>         The second code will unpersist the RDD but that's not the main
>>         problem.
>>
>>         I think you have trouble due to long lineage as .cache() keep
>>         track of
>>         lineage for recovery.
>>         You should have a look at checkpointing :
>>
>> https://github.com/JerryLead/SparkInternals/blob/master/markdown/english/6-CacheAndCheckpoint.md
>>
>> https://github.com/apache/spark/blob/master/mllib/src/main/scala/org/apache/spark/mllib/impl/PeriodicRDDCheckpointer.scala
>>
>>         You can also have a look at the code of others iterative
>>         algorithms in
>>         mlllib for best practices.
>>
>>         2015-08-20 17:26 GMT+08:00 abellet
>>         <aurelien.bel...@telecom-paristech.fr
>>         <mailto:aurelien.bel...@telecom-paristech.fr>
>>         <mailto:aurelien.bel...@telecom-paristech.fr
>>
>>         <mailto:aurelien.bel...@telecom-paristech.fr>>>:
>>
>>              Hello,
>>
>>              For the need of my application, I need to periodically
>>         "shuffle" the
>>              data
>>              across nodes/partitions of a reasonably-large dataset. This
>>         is an
>>              expensive
>>              operation but I only need to do it every now and then.
>>         However it
>>              seems that
>>              I am doing something wrong because as the iterations go the
>>         memory usage
>>              increases, causing the job to spill onto HDFS, which
>>         eventually gets
>>              full. I
>>              am also getting some "Lost executor" errors that I don't
>>         get if I don't
>>              repartition.
>>
>>              Here's a basic piece of code which reproduces the problem:
>>
>>              data =
>>         sc.textFile("ImageNet_gist_train.txt",50).map(parseLine).cache()
>>              data.count()
>>              for i in range(1000):
>>                       data=data.repartition(50).persist()
>>                       # below several operations are done on data
>>
>>
>>              What am I doing wrong? I tried the following but it doesn't
>>         solve
>>              the issue:
>>
>>              for i in range(1000):
>>                       data2=data.repartition(50).persist()
>>                       data2.count() # materialize rdd
>>                       data.unpersist() # unpersist previous version
>>                       data=data2
>>
>>
>>              Help and suggestions on this would be greatly appreciated!
>>         Thanks a lot!
>>
>>
>>
>>
>>              --
>>              View this message in context:
>>
>> http://apache-spark-user-list.1001560.n3.nabble.com/Memory-efficient-successive-calls-to-repartition-tp24358.html
>>              Sent from the Apache Spark User List mailing list archive
>>         at Nabble.com.
>>
>>
>>
>> ---------------------------------------------------------------------
>>              To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>>         <mailto:user-unsubscr...@spark.apache.org>
>>              <mailto:user-unsubscr...@spark.apache.org
>>         <mailto:user-unsubscr...@spark.apache.org>>
>>              For additional commands, e-mail: user-h...@spark.apache.org
>>         <mailto:user-h...@spark.apache.org>
>>              <mailto:user-h...@spark.apache.org
>>         <mailto:user-h...@spark.apache.org>>
>>
>>
>>
>>     ---------------------------------------------------------------------
>>     To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>>     <mailto:user-unsubscr...@spark.apache.org>
>>     For additional commands, e-mail: user-h...@spark.apache.org
>>     <mailto:user-h...@spark.apache.org>
>>
>>
>>
> ---------------------------------------------------------------------
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>

Reply via email to