I just ran some tests on my laptop. The shuffle files are not deleted immediately, but a System.gc() call does the job for the shuffle files of a checkpointed RDD. That should solve your problem (in Python, use `sc._jvm.System.gc()`, as pointed out in the Databricks link in my previous message).
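To make this concrete, here is a minimal sketch of your loop with the GC call added after each checkpoint. As far as I understand, Spark only removes shuffle files once the driver-side objects that reference them have been garbage collected, which is why forcing a GC helps. The function name `repartition_with_cleanup` and its parameters are made up for illustration; I also use count() rather than first() to materialize the RDD, since count() touches every partition. It assumes a checkpoint directory has already been set with sc.setCheckpointDir().

```python
def repartition_with_cleanup(sc, data, num_iterations,
                             num_partitions=50, checkpoint_every=10):
    """Repartition `data` repeatedly, checkpointing every few iterations.

    Hypothetical helper, not part of Spark. `sc` is the SparkContext and
    `data` is an RDD; a checkpoint directory is assumed to be set already.
    """
    for i in range(num_iterations):
        data2 = data.repartition(num_partitions).cache()
        do_checkpoint = (i + 1) % checkpoint_every == 0
        if do_checkpoint:
            # checkpoint() has to be requested before the first action
            data2.checkpoint()
        data2.count()        # materialize the RDD (and write the checkpoint)
        data.unpersist()     # release the previous version
        if do_checkpoint:
            # Ask the driver JVM to run a GC so that Spark can clean up
            # shuffle files that are no longer referenced.
            sc._jvm.System.gc()
        data = data2
    return data
```

You would call it as `data = repartition_with_cleanup(sc, data, 1000)`. System.gc() is only a request to the JVM, so the cleanup may still lag a little behind the call.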

2015-09-02 20:55 GMT+08:00 Aurélien Bellet <aurelien.bel...@telecom-paristech.fr>:

> Thanks a lot for the useful link and comments Alexis!
>
> First of all, the problem occurs without doing anything else in the code
> (except of course loading my data from HDFS at the beginning) - so it
> definitely comes from the shuffling. You're right, in the current version,
> checkpoint files are not removed and take up some space in HDFS (this is
> easy to fix). But this is negligible compared to the non-HDFS files, which
> keep growing as iterations go. So I agree with you that this must come
> from the shuffling operations: it seems that the shuffle files are not
> removed along the execution (they are only removed if I stop/kill the
> application), despite the use of checkpoint.
>
> The class you mentioned is very interesting but I did not find a way to
> use it from pyspark. I will try to implement my own version, looking at the
> source code. But besides the queueing and removing of checkpoint files, I
> do not really see anything special there that could solve my issue.
>
> I will continue to investigate this. Just found out I can use a command
> line browser to look at the webui (I cannot access the server in graphical
> display mode), this should help me understand what's going on. I will also
> try the workarounds mentioned in the link. Keep you posted.
>
> Again, thanks a lot!
>
> Best,
>
> Aurelien
>
> On 02/09/2015 14:15, alexis GILLAIN wrote:
>
>> Aurélien,
>>
>> From what you're saying, I can think of a couple of things, considering
>> I don't know what you are doing in the rest of the code:
>>
>> - There are a lot of non-HDFS writes; they come from the rest of your
>> code and/or repartition(). Repartition involves a shuffle and the
>> creation of files on disk. I would have said that the problem comes
>> from that, but I just checked and checkpoint() is supposed to delete
>> shuffle files:
>>
>> https://forums.databricks.com/questions/277/how-do-i-avoid-the-no-space-left-on-device-error.html
>> (looks exactly like your problem, so you could maybe try the other
>> workarounds)
>> Still, you may do a lot of shuffling in the rest of the code (you should
>> see the amount of shuffle files written in the webui) and consider
>> increasing the disk space available... if you can do that.
>>
>> - On the HDFS side, the class I pointed to has an update function which
>> "automatically handles persisting and (optionally) checkpointing, as
>> well as unpersisting and removing checkpoint files". Not sure your
>> method for checkpointing removes the previous checkpoint files.
>>
>> In the end, does the disk space error come from HDFS growing or from
>> the local disk growing?
>>
>> You should check the webui to identify which tasks spill data to disk
>> and verify whether the shuffle files are properly deleted when you
>> checkpoint your RDD.
>>
>> Regards,
>>
>> 2015-09-01 22:48 GMT+08:00 Aurélien Bellet
>> <aurelien.bel...@telecom-paristech.fr>:
>>
>>     Dear Alexis,
>>
>>     Thanks again for your reply. After reading about checkpointing I
>>     have modified my sample code as follows:
>>
>>     for i in range(1000):
>>         print(i)
>>         data2=data.repartition(50).cache()
>>         if (i+1) % 10 == 0:
>>             data2.checkpoint()
>>         data2.first() # materialize rdd
>>         data.unpersist() # unpersist previous version
>>         data=data2
>>
>>     The data is checkpointed every 10 iterations to a directory that I
>>     specified. While this seems to improve things a little bit, there is
>>     still a lot of writing on disk (appcache directory, shown as "non
>>     HDFS files" in Cloudera Manager) *besides* the checkpoint files
>>     (which are regular HDFS files), and the application eventually runs
>>     out of disk space. The same is true even if I checkpoint at every
>>     iteration.
>>
>>     What am I doing wrong? Maybe some garbage collector setting?
>>
>>     Thanks a lot for the help,
>>
>>     Aurelien
>>
>>     On 24/08/2015 10:39, alexis GILLAIN wrote:
>>
>>         Hi Aurelien,
>>
>>         The first code should create a new RDD in memory at each
>>         iteration (check the webui).
>>         The second code will unpersist the RDD but that's not the main
>>         problem.
>>
>>         I think you have trouble due to long lineage, as .cache() keeps
>>         track of lineage for recovery.
>>         You should have a look at checkpointing:
>>
>>         https://github.com/JerryLead/SparkInternals/blob/master/markdown/english/6-CacheAndCheckpoint.md
>>
>>         https://github.com/apache/spark/blob/master/mllib/src/main/scala/org/apache/spark/mllib/impl/PeriodicRDDCheckpointer.scala
>>
>>         You can also have a look at the code of other iterative
>>         algorithms in mllib for best practices.
>>
>>         2015-08-20 17:26 GMT+08:00 abellet
>>         <aurelien.bel...@telecom-paristech.fr>:
>>
>>             Hello,
>>
>>             For the needs of my application, I need to periodically
>>             "shuffle" the data across nodes/partitions of a
>>             reasonably-large dataset. This is an expensive operation
>>             but I only need to do it every now and then. However it
>>             seems that I am doing something wrong because as the
>>             iterations go the memory usage increases, causing the job
>>             to spill onto HDFS, which eventually gets full. I am also
>>             getting some "Lost executor" errors that I don't get if I
>>             don't repartition.
>>
>>             Here's a basic piece of code which reproduces the problem:
>>
>>             data = sc.textFile("ImageNet_gist_train.txt",50).map(parseLine).cache()
>>             data.count()
>>             for i in range(1000):
>>                 data=data.repartition(50).persist()
>>                 # below several operations are done on data
>>
>>             What am I doing wrong? I tried the following but it doesn't
>>             solve the issue:
>>
>>             for i in range(1000):
>>                 data2=data.repartition(50).persist()
>>                 data2.count() # materialize rdd
>>                 data.unpersist() # unpersist previous version
>>                 data=data2
>>
>>             Help and suggestions on this would be greatly appreciated!
>>             Thanks a lot!
>>
>>             --
>>             View this message in context:
>>             http://apache-spark-user-list.1001560.n3.nabble.com/Memory-efficient-successive-calls-to-repartition-tp24358.html
>>             Sent from the Apache Spark User List mailing list archive
>>             at Nabble.com.
>>
>>             ---------------------------------------------------------------------
>>             To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>>             For additional commands, e-mail: user-h...@spark.apache.org