Thanks, I didn’t know the Hadoop API supports file systems other than HDFS and 
the local file system (when there is a single node).
My main goal is indeed checkpointing: every N iterations I save the data for 
future use. The problem is that overwrite mode first deletes the existing data 
and only then writes the new copy, and that is what I am trying to solve.

I wasn’t aware of the issues with renaming in S3 (we are not currently using it; 
we just know we will probably need to support it or a similar store in the 
future). That said, how does Spark handle this case when writing a DataFrame? 
Does it currently write everything to a temporary subdirectory and rename it at 
the end?

In any case, I was hoping for some way internal to Spark to do a write that 
does not touch the previous version of the DataFrame on disk until the new one 
has been written successfully.
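The kind of thing I would otherwise end up writing by hand is roughly this 
(just a sketch, assuming a Hadoop-compatible FileSystem and Parquet output; the 
function and directory names are made up):

import org.apache.hadoop.fs.Path
import org.apache.spark.sql.{DataFrame, SparkSession}

// Write the new snapshot next to the old one and only swap it in
// once the write has fully succeeded.
def safeOverwrite(spark: SparkSession, df: DataFrame, basePath: String): Unit = {
  val base = new Path(basePath)
  val fs   = base.getFileSystem(spark.sparkContext.hadoopConfiguration)

  val tmp     = new Path(base, "_tmp_snapshot")
  val current = new Path(base, "current")
  val backup  = new Path(base, "previous")

  // A crash during this write leaves "current" untouched.
  df.write.mode("overwrite").parquet(tmp.toString)

  // Promote the new snapshot only after the write has succeeded.
  if (fs.exists(backup)) fs.delete(backup, true)
  if (fs.exists(current)) fs.rename(current, backup)
  fs.rename(tmp, current)
}

(I realise, given what you say below about S3, that the rename step is itself a 
problem there, which is why I was hoping for something built into Spark.)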
Thanks,
Assaf.


From: Steve Loughran [mailto:ste...@hortonworks.com]
Sent: Tuesday, February 14, 2017 3:25 PM
To: Mendelson, Assaf
Cc: Jörn Franke; user
Subject: Re: fault tolerant dataframe write with overwrite


On 14 Feb 2017, at 11:12, Mendelson, Assaf 
<assaf.mendel...@rsa.com<mailto:assaf.mendel...@rsa.com>> wrote:

I know how to get the filesystem; the problem is that this means using Hadoop 
directly, so if in the future we change to something else (e.g. S3) I would 
need to rewrite the code.

well, no, because the S3 and HDFS clients use the same API

FileSystem fs = FileSystem.get("hdfs://nn:8020/users/stevel", conf)

vs

FileSystem fs = FileSystem.get("s3a:/bucket1/dataset", conf)

same for wasb:// (which, being consistent and having fast atomic rename, can be 
used instead of HDFS) and for other cluster filesystems. If it's a native fs, 
then file:// should work everywhere, or some derivative (as Red Hat do with 
Gluster).
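For example (a sketch only; substitute whatever paths your job is configured with):

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}

// The scheme in the path picks the client; the calling code never changes.
val conf = new Configuration()
val path = new Path("s3a://bucket1/dataset")   // or hdfs://nn:8020/users/stevel, wasb://..., file:///...
val fs: FileSystem = path.getFileSystem(conf)
fs.listStatus(path).foreach(s => println(s.getPath))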


This also relates to finding the last iteration: I would need to use the Hadoop 
filesystem API, which is not agnostic to the deployment.


See above. If you are using a Spark cluster of size > 1 you will need some 
distributed filesystem, which is going to have to provide a Hadoop-compatible 
FileSystem client anyway.

If there is an issue here, it is that if you rely on FileSystem.rename() being 
an atomic O(1) operation then you are going to be disappointed on S3, as there 
it is a non-atomic O(data) copy & delete whose failure state is "undefined".


The solution here comes from having specific committer logic for the different 
object stores. You really, really don't want to go there. If you do, start by 
looking at the S3Guard work-in-progress committer: 
https://github.com/steveloughran/hadoop/blob/s3guard/HADOOP-13786-committer/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/s3a_committer.md

further reading: 
http://www.slideshare.net/steve_l/spark-summit-east-2017-apache-spark-and-object-stores


Kryo serialization still costs much more than a DataFrame write.

As for the use case, I am doing a very large number of iterations, so the idea 
is that every X iterations I want to save to disk so that if something crashes 
I do not have to start again from the first iteration, only from the most 
recent saved one.


Sounds like you don't really want the output to always be the final destination 
on the FS, so much as to checkpoint iterations. Couldn't you do something like: 
every 20 iterations, write() the relevant RDD or DataFrame to the DFS?
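Something along these lines (a rough sketch; step, basePath and numIterations 
are placeholders for whatever your job already has):

import org.apache.spark.sql.DataFrame

// Rough sketch: checkpoint every N iterations into its own directory,
// so earlier checkpoints are never touched by later writes.
def runWithCheckpoints(initial: DataFrame, step: DataFrame => DataFrame,
                       numIterations: Int, basePath: String,
                       checkpointEvery: Int = 20): DataFrame = {
  var df = initial
  for (i <- 1 to numIterations) {
    df = step(df)                                  // your per-iteration transformation
    if (i % checkpointEvery == 0) {
      df.write.parquet(s"$basePath/checkpoint-$i") // fresh directory per checkpoint
    }
  }
  df
}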


Basically, I would have liked to see something like a normal save in which the 
original data is not removed until the new write has succeeded.
Assaf.

From: Jörn Franke [mailto:jornfra...@gmail.com]
Sent: Tuesday, February 14, 2017 12:54 PM
To: Mendelson, Assaf
Cc: user
Subject: Re: fault tolerant dataframe write with overwrite

Normally you can fetch the filesystem interface from the configuration (I 
assume you mean the URI).
Managing to get the last iteration: I do not understand the issue. You can use 
the current timestamp as the directory name and at the end simply select the 
directory with the highest number.
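For example (just a sketch; basePath, df and spark stand in for whatever you 
already have):

import org.apache.hadoop.fs.Path
import org.apache.spark.sql.{DataFrame, SparkSession}

// Write each iteration into a directory named by the current timestamp...
def writeSnapshot(df: DataFrame, basePath: String): Unit =
  df.write.parquet(s"$basePath/${System.currentTimeMillis()}")

// ...and on restart pick the directory with the highest number.
def latestSnapshot(spark: SparkSession, basePath: String): Option[String] = {
  val fs = new Path(basePath).getFileSystem(spark.sparkContext.hadoopConfiguration)
  fs.listStatus(new Path(basePath))
    .filter(_.isDirectory)
    .map(_.getPath.getName)
    .filter(_.forall(_.isDigit))       // ignore anything that is not a timestamp directory
    .sortBy(_.toLong)
    .lastOption
    .map(name => s"$basePath/$name")
}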

Regarding checkpointing, you can also use the Kryo serializer to avoid some 
space overhead.
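E.g. (a sketch; configure it when building the session):

import org.apache.spark.sql.SparkSession

// Switch the default serializer to Kryo; registering your classes
// (spark.kryo.classesToRegister) shrinks the output further.
val spark = SparkSession.builder()
  .appName("iterative-job")                       // placeholder name
  .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
  .getOrCreate()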

Aside from that, can you elaborate on the use case: why do you need to write 
every iteration?

On 14 Feb 2017, at 11:22, Mendelson, Assaf 
<assaf.mendel...@rsa.com<mailto:assaf.mendel...@rsa.com>> wrote:
Hi,

I have an iterative process that overwrites the results of the previous 
iteration.
In every iteration I need to write a DataFrame with the results.
The problem is that simply overwriting the results of the previous iteration is 
not fault tolerant: if the program crashes in the middle of an iteration, the 
data from previous ones is lost, since overwrite first removes the previous 
data and then starts writing.

Currently we simply write to a new directory and then rename it, but this is 
not ideal, as it requires us to know the interfaces of the underlying file 
system (as well as some extra bookkeeping to track which directory is the 
latest, etc.).
I know I can also use checkpointing (although I haven’t fully tested that 
path); however, checkpointing converts the result to an RDD, which takes both 
more time and more space.
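What I mean by checkpointing, as a sketch (this assumes Spark 2.1's 
Dataset.checkpoint; the path and variable names are placeholders):

// Eager by default; it materialises the data through an internal RDD,
// which is where the extra time and space go.
spark.sparkContext.setCheckpointDir("hdfs:///tmp/checkpoints")
val checkpointed = df.checkpoint()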
I was wondering if there is any efficient way of managing this from inside 
Spark.
Thanks,
                Assaf.
