If you use S3 you can first copy it into a temporary folder on HDFS. However, for the checkpointing I would use the Spark implementation. You can also load the file from S3 and checkpoint to HDFS.
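For reference, a minimal sketch of what that could look like (assuming Spark 2.1+, where Dataset.checkpoint() is available; the paths and bucket here are placeholders):

    import org.apache.spark.sql.SparkSession

    val spark = SparkSession.builder().appName("checkpoint-to-hdfs").getOrCreate()

    // checkpoints go to HDFS, regardless of where the input lives
    spark.sparkContext.setCheckpointDir("hdfs://nn:8020/tmp/checkpoints")

    // read the input from S3 (placeholder bucket/path)
    val df = spark.read.parquet("s3a://bucket1/dataset")

    // eager = true materialises the data and truncates the lineage immediately
    val checkpointed = df.checkpoint(eager = true)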
> On 14 Feb 2017, at 17:43, Mendelson, Assaf <assaf.mendel...@rsa.com> wrote:
>
> Thanks, I didn’t know the Hadoop API supports other file systems than HDFS and the local file system (when there is 1 node).
> My main goal is indeed checkpointing: every N iterations I save the data for future use. The problem is that if I use overwrite mode, it first deletes and then writes the new one, and that is what I am looking to solve.
>
> I wasn’t aware of the issues with renaming in S3 (we are currently not using it, we just know we will probably need to support it or a similar store in the future). That said, how does Spark handle this case when writing a dataframe? Currently it writes everything to a temporary sub-directory and renames it at the end?
>
> In any case, I was hoping for some way internal to Spark to do a write which does not harm the previous version of the dataframe on disk until the new one has been written successfully.
> Thanks,
> Assaf.
>
>
> From: Steve Loughran [mailto:ste...@hortonworks.com]
> Sent: Tuesday, February 14, 2017 3:25 PM
> To: Mendelson, Assaf
> Cc: Jörn Franke; user
> Subject: Re: fault tolerant dataframe write with overwrite
>
>
> On 14 Feb 2017, at 11:12, Mendelson, Assaf <assaf.mendel...@rsa.com> wrote:
>
> I know how to get the filesystem; the problem is that this means using Hadoop directly, so if in the future we change to something else (e.g. S3) I would need to rewrite the code.
>
> well, no, because the S3 and HDFS clients use the same API
>
> FileSystem fs = FileSystem.get(URI.create("hdfs://nn:8020/users/stevel"), conf)
>
> vs
>
> FileSystem fs = FileSystem.get(URI.create("s3a://bucket1/dataset"), conf)
>
> same for wasb:// (which, being consistent and with fast atomic rename, can be used instead of HDFS), and other cluster filesystems. If it's a native fs, then file:// should work everywhere, or some derivative (as Red Hat do with Gluster).
>
>
> This also relates to finding the last iteration: I would need to use the Hadoop filesystem, which is not agnostic to the deployment.
>
>
> see above. if you are using a spark cluster of size > 1 you will need some distributed filesystem, which is going to have to provide a
>
> If there is an issue here, it is that if you rely on FileSystem.rename() being an atomic O(1) operation then you are going to be disappointed on S3, as it's a non-atomic O(data) copy & delete whose failure state is "undefined".
>
>
> The solution here comes from having specific committer logic for the different object stores. You really, really don't want to go there. If you do, have a start by looking at the S3guard WiP one:
> https://github.com/steveloughran/hadoop/blob/s3guard/HADOOP-13786-committer/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/s3a_committer.md
>
> further reading: http://www.slideshare.net/steve_l/spark-summit-east-2017-apache-spark-and-object-stores
>
>
> Kryo serialization still costs much more than a dataframe write.
>
> As for the use case, I am doing a very large number of iterations. So the idea is that every X iterations I want to save to disk so that if something crashes I do not have to begin from the first iteration but just from the relevant iteration.
>
>
> sounds like you don't really want the output to always be the FS, more checkpointing iterations. Couldn't you do something like every 20 iterations, write() the relevant RDD to the DFS?
>
>
> Basically I would have liked to see something like saving normally and the original data would not be removed until a successful write.
> Assaf.
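As an aside, a rough sketch of the scheme-agnostic pattern discussed above: the FileSystem is resolved from the output path's URI (so hdfs://, s3a://, wasb:// and file:// all go through the same code), the new version is written to a temporary directory first, and the old data is only touched after that write has succeeded. The paths and helper name are made up for illustration, and the final rename is only atomic on HDFS-like stores, not on S3:

    import org.apache.hadoop.fs.Path
    import org.apache.spark.sql.{DataFrame, SaveMode}

    def safeOverwrite(df: DataFrame, output: String): Unit = {
      val conf    = df.sparkSession.sparkContext.hadoopConfiguration
      val outPath = new Path(output)
      val tmpPath = new Path(output + "_tmp")     // placeholder temp location next to the target
      val fs      = outPath.getFileSystem(conf)   // resolved from the URI scheme, not hard-coded to HDFS

      // write the new version first; the previous data is still intact if this fails
      df.write.mode(SaveMode.Overwrite).parquet(tmpPath.toString)

      // swap in the new version; there is still a brief window between delete and rename,
      // but the expensive (and failure-prone) write has already completed by this point
      fs.delete(outPath, true)
      fs.rename(tmpPath, outPath)
    }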
>
> From: Jörn Franke [mailto:jornfra...@gmail.com]
> Sent: Tuesday, February 14, 2017 12:54 PM
> To: Mendelson, Assaf
> Cc: user
> Subject: Re: fault tolerant dataframe write with overwrite
>
> Normally you can fetch the filesystem interface from the configuration (I assume you mean URI).
> Managing to get the last iteration: I do not understand the issue. You can use the current timestamp as the directory name and at the end you simply select the directory with the highest number.
>
> Regarding checkpointing, you can also use the Kryo serializer to avoid some space overhead.
>
> Aside from that, can you elaborate on the use case why you need to write every iteration?
>
> On 14 Feb 2017, at 11:22, Mendelson, Assaf <assaf.mendel...@rsa.com> wrote:
>
> Hi,
>
> I have a case where I have an iterative process which overwrites the results of a previous iteration.
> Every iteration I need to write a dataframe with the results.
> The problem is that when I write, if I simply overwrite the results of the previous iteration, this is not fault tolerant, i.e. if the program crashes in the middle of an iteration, the data from previous ones is lost, as overwrite first removes the previous data and then starts writing.
>
> Currently we simply write to a new directory and then rename, but this is not the best way, as it requires us to know the interfaces of the underlying file system (as well as requiring some extra work to manage which is the last one, etc.).
> I know I can also use checkpoint (although I haven’t fully tested the process there); however, checkpointing converts the result to an RDD, which takes both more time and more space.
> I was wondering if there is any efficient method of managing this from inside Spark.
> Thanks,
> Assaf.
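For completeness, a small sketch of the timestamp/iteration-directory idea from Jörn's reply (the base path, directory layout and helper names are invented for illustration): every iteration writes to its own directory, and on restart the job reloads the newest directory that contains the _SUCCESS marker Spark's default output committer writes when a job completes:

    import org.apache.hadoop.fs.Path
    import org.apache.spark.sql.{DataFrame, SparkSession}

    // hypothetical base location; works with hdfs://, s3a://, etc.
    val base = "hdfs://nn:8020/app/results"

    def writeIteration(df: DataFrame, iteration: Int): Unit = {
      // each iteration gets its own zero-padded directory; nothing existing is overwritten
      df.write.parquet(f"$base/iteration=$iteration%05d")
    }

    def loadLatest(spark: SparkSession): Option[DataFrame] = {
      val conf = spark.sparkContext.hadoopConfiguration
      val basePath = new Path(base)
      val fs = basePath.getFileSystem(conf)
      if (!fs.exists(basePath)) return None
      val complete = fs.listStatus(basePath)
        .filter(s => s.isDirectory && fs.exists(new Path(s.getPath, "_SUCCESS")))  // only finished writes
        .map(_.getPath)
        .sortBy(_.getName)  // zero-padding keeps lexicographic order == iteration order
      complete.lastOption.map(p => spark.read.parquet(p.toString))
    }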