If you use S3 you can first copy the data into a temporary folder on HDFS. However, 
for the checkpointing I would use the Spark implementation. You can also load 
the file from S3 and checkpoint to HDFS.
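A minimal sketch of that approach (the bucket, namenode and paths below are 
placeholders, not taken from your setup):

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("checkpoint-to-hdfs").getOrCreate()

// Keep checkpoints on HDFS, where rename/delete semantics are well behaved.
spark.sparkContext.setCheckpointDir("hdfs://namenode:8020/tmp/checkpoints")

// The input can still be read from S3; the URI scheme selects the client.
val df = spark.read.parquet("s3a://some-bucket/input/dataset")

// Spark 2.1+ can checkpoint a Dataset directly; on older versions use df.rdd.checkpoint().
val saved = df.checkpoint()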

> On 14 Feb 2017, at 17:43, Mendelson, Assaf <assaf.mendel...@rsa.com> wrote:
> 
> Thanks, I didn’t know the Hadoop API supports file systems other than 
> HDFS and the local file system (when there is one node).
> My main goal is indeed checkpointing: every N iterations I save the data 
> for future use. The problem is that if I use overwrite mode, it first 
> deletes the old data and then writes the new data, and that is what I am trying to solve.
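> (For reference, the plain overwrite we use today looks roughly like this; the 
> path is a placeholder:)
> 
> // Simplified: "overwrite" deletes the existing output before the new write has succeeded.
> df.write.mode("overwrite").parquet("hdfs://nn:8020/app/result")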
>  
> I wasn’t aware of the issues with renaming in S3 (we are not using it currently; 
> we just know we will probably need to support it or a similar store in the 
> future). That said, how does Spark handle this case when writing a 
> dataframe? Does it currently write everything to a temporary sub-directory and 
> rename it at the end?
>  
> In any case, I was hoping for some way internal to Spark to do a write that 
> does not harm the previous version of the dataframe on disk until the new 
> one has been written successfully.
> Thanks,
> Assaf.
>  
>  
> From: Steve Loughran [mailto:ste...@hortonworks.com] 
> Sent: Tuesday, February 14, 2017 3:25 PM
> To: Mendelson, Assaf
> Cc: Jörn Franke; user
> Subject: Re: fault tolerant dataframe write with overwrite
>  
>  
> On 14 Feb 2017, at 11:12, Mendelson, Assaf <assaf.mendel...@rsa.com> wrote:
>  
> I know how to get the filesystem; the problem is that this means using Hadoop 
> directly, so if in the future we change to something else (e.g. S3) I would 
> need to rewrite the code.
>  
> well, no, because the s3a and hdfs clients use the same API:
>  
> FileSystem fs = FileSystem.get(URI.create("hdfs://nn:8020/users/stevel"), conf);
>  
> vs
>  
> FileSystem fs = FileSystem.get(URI.create("s3a://bucket1/dataset"), conf);
>  
> same for wasb:// (which, being consistent and with fast atomic rename, can 
> be used instead of HDFS) and for other cluster filesystems. If it's a native fs, 
> then file:// should work everywhere, or some derivative (as Red Hat do with 
> Gluster).
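> A rough sketch of keeping the code deployment-agnostic: put only the base URI in 
> configuration and resolve the FileSystem from it (the property name and paths 
> here are made up):
> 
> import java.net.URI
> import org.apache.hadoop.conf.Configuration
> import org.apache.hadoop.fs.{FileSystem, Path}
> 
> // The scheme (hdfs://, s3a://, wasb://, file://) is just part of the URI,
> // so switching stores means changing configuration, not code.
> val baseUri = sys.props.getOrElse("output.base.uri", "hdfs://nn:8020/users/stevel")
> val conf = new Configuration()
> val fs = FileSystem.get(URI.create(baseUri), conf)
> val datasetExists = fs.exists(new Path(new Path(baseUri), "dataset"))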
> 
> 
> This also relates to finding the last iteration: I would need to use the Hadoop 
> filesystem API, which is not agnostic to the deployment.
>  
>  
> see above. if you are using a spark cluster of size > 1 you will need some 
> distributed filesystem, which is going to have to provide a Hadoop-compatible 
> FileSystem client anyway.
>  
> If there is an issue here, it is that if you rely on FileSystem.rename() 
> being an atomic O(1) operation then you are going to be disappointed on S3, 
> as it's a non-atomic O(data) copy & delete whose failure state is "undefined". 
>  
>  
> The solution here comes from having specific committer logic for the different 
> object stores. You really, really don't want to go there. If you do, start 
> by looking at the S3Guard WiP one: 
> https://github.com/steveloughran/hadoop/blob/s3guard/HADOOP-13786-committer/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/s3a_committer.md
>  
> further reading: 
> http://www.slideshare.net/steve_l/spark-summit-east-2017-apache-spark-and-object-stores
> 
> 
> Kryo serialization still costs much more than a dataframe write.
>  
> As for the use case, I am doing a very large number of iterations. The 
> idea is that every X iterations I want to save to disk, so that if something 
> crashes I do not have to start again from the first iteration, only from the 
> last saved one.
>  
>  
> sounds like you don't really want the output to always be the FS so much as 
> checkpointing of iterations. Couldn't you do something like: every 20 iterations, 
> write() the relevant RDD to the DFS?
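> Illustration only (directory layout, helper names and the interval are made up):
> 
> // Sketch: persist every N-th iteration to its own directory, so an interrupted
> // write never touches the previous good copy.
> val checkpointEvery = 20                               // hypothetical interval
> val base = "hdfs://nn:8020/app/state"                  // hypothetical base path
> 
> var df = initialDf                                     // your starting DataFrame
> for (i <- 1 to numIterations) {
>   df = step(df)                                        // your per-iteration logic
>   if (i % checkpointEvery == 0) {
>     df.write.parquet(s"$base/iteration=$i")            // fresh directory per save
>   }
> }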
>  
>  
> Basically I would have liked to see something like a normal save where the 
> original data is not removed until the new write has completed successfully.
> Assaf.
>  
> From: Jörn Franke [mailto:jornfra...@gmail.com] 
> Sent: Tuesday, February 14, 2017 12:54 PM
> To: Mendelson, Assaf
> Cc: user
> Subject: Re: fault tolerant dataframe write with overwrite
>  
> Normally you can fetch the filesystem interface from the configuration (I 
> assume you mean URI).
> On managing to get the last iteration: I do not understand the issue. You can 
> use the current timestamp as the directory name and at the end simply select 
> the directory with the highest number.
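> For illustration, picking up the latest save on restart could look roughly like 
> this (the base path is a placeholder; it assumes each save went to 
> <base>/<timestamp> with zero-padded or ISO-style timestamps, which sort lexically):
> 
> import org.apache.hadoop.fs.{FileSystem, Path}
> 
> val base = new Path("hdfs://nn:8020/app/state")     // hypothetical base path
> val fs = base.getFileSystem(spark.sparkContext.hadoopConfiguration)
> 
> // The lexically last directory name is the most recent save.
> val latest = fs.listStatus(base)
>   .filter(_.isDirectory)
>   .map(_.getPath.getName)
>   .sorted
>   .lastOption
> 
> val restored = latest.map(name => spark.read.parquet(s"$base/$name"))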
>  
> Regarding checkpointing, you can also use the Kryo serializer to avoid some 
> space overhead.
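> e.g. the standard configuration switch (the record class here is a stand-in for 
> your own type):
> 
> import org.apache.spark.SparkConf
> 
> case class MyRecord(id: Long, value: Double)       // stand-in for your row type
> 
> // Use Kryo for RDD serialization (what checkpoint/persist of RDDs writes out).
> val sparkConf = new SparkConf()
>   .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
>   .registerKryoClasses(Array(classOf[MyRecord]))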
>  
> Aside from that, can you elaborate on the use case? Why do you need to write 
> every iteration?
> 
> On 14 Feb 2017, at 11:22, Mendelson, Assaf <assaf.mendel...@rsa.com> wrote:
> 
> Hi,
>  
> I have a case where I have an iterative process which overwrites the results 
> of the previous iteration.
> In every iteration I need to write a dataframe with the results.
> The problem is that simply overwriting the results of the previous iteration 
> is not fault tolerant: if the program crashes in the middle of an iteration, 
> the data from previous ones is lost, because overwrite first removes the 
> previous data and then starts writing.
>  
> Currently we simply write to a new directory and then rename, but this is not 
> ideal, as it requires us to know the interfaces of the underlying file 
> system (as well as some extra bookkeeping to track which directory is the 
> latest, etc.).
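> (A rough sketch of that kind of workaround, with placeholder paths, just to make 
> the pattern concrete:)
> 
> import org.apache.hadoop.fs.{FileSystem, Path}
> 
> // df: the results dataframe of the current iteration.
> val target = new Path("hdfs://nn:8020/app/result")       // placeholder
> val tmp    = new Path("hdfs://nn:8020/app/result_tmp")   // placeholder
> val fs = target.getFileSystem(spark.sparkContext.hadoopConfiguration)
> 
> fs.delete(tmp, true)             // clear leftovers from a crashed previous attempt
> df.write.parquet(tmp.toString)   // if this fails, the old result is untouched
> fs.delete(target, true)          // remove the old copy only after a successful write
> fs.rename(tmp, target)           // atomic on HDFS; copy + delete on S3
> // Note: there is still a small window between delete() and rename().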
> I know I can also use checkpoint (although I haven’t fully tested that path); 
> however, checkpointing converts the result to an RDD, which takes both more 
> time and more space.
> I was wondering whether there is any efficient way of managing this from inside 
> Spark.
> Thanks,
>                 Assaf.
>  
