On 11 Aug 2018, at 17:33, chandan prakash <chandanbaran...@gmail.com> wrote:

Hi All,
I was going through this pull request about the new CheckpointFileManager 
abstraction in Structured Streaming coming in 2.4:
https://issues.apache.org/jira/browse/SPARK-23966
https://github.com/apache/spark/pull/21048

I went through the code in detail and found it introduces a very nice 
abstraction which is much cleaner and extensible to direct-write file systems 
like S3 (in addition to the current HDFS file system).

But I am unable to understand: is it really solving some problem in the 
existing State Store code in Spark 2.3?

My questions relate to the following PR statements about the State Store code:
PR description: "Checkpoint files must be written atomically such that no 
partial files are generated."
QUESTION: When are partial files generated in the current code? I can see that 
data is first written to a temp-delta file and then renamed to the 
version.delta file. If something bad happens, the task will fail due to the 
thrown exception, and abort() will be called on the store to close and delete 
tempDeltaFileStream. I think it is quite clean; in what case might partial 
files be generated?

I suspect the issue is that as files are written to a "classic" POSIX store, 
flush/sync operations can result in the intermediate data being visible to 
others. Which is why the convention for checkpointing/commit operations is: 
write to a temp file & rename. Which is not what you want for object stores, 
especially S3.
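A minimal sketch of that temp-file-and-rename pattern with the stock Hadoop 
FileSystem API (the paths and payload here are illustrative, not the actual 
State Store code):

    import java.nio.charset.StandardCharsets;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FSDataOutputStream;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    public class TempThenRename {
      public static void main(String[] args) throws Exception {
        FileSystem fs = FileSystem.get(new Configuration());
        Path tmp = new Path("/checkpoint/state/1.delta.tmp");
        Path dst = new Path("/checkpoint/state/1.delta");
        // Readers may observe partial bytes at tmp while the stream is
        // open, but never at dst.
        try (FSDataOutputStream out = fs.create(tmp, true)) {
          out.write("state snapshot".getBytes(StandardCharsets.UTF_8));
        }
        // On HDFS this is one atomic metadata operation; on S3 it is a
        // client-side copy + delete: O(data), observable, non-atomic.
        if (!fs.rename(tmp, dst)) {
          fs.delete(tmp, false);
          throw new java.io.IOException("rename to " + dst + " failed");
        }
      }
    }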



PR description: "State Store behavior is incorrect - HDFS FileSystem 
implementation does not have atomic rename"
QUESTION: The HDFS filesystem rename operation is atomic. I think the above 
line refers to checking whether the destination file already exists and then 
taking the appropriate action, which together makes the renaming multi-step 
and hence non-atomic. But why is this behaviour incorrect? Even if multiple 
executors try to write to the same version.delta file, only the first of them 
will succeed; the second one will see the file exists and will delete its 
temp-delta file. Looks good.


HDFS single file and dir rename is atomic; it grabs a lock on the metadata 
store, does the change, unlocks it. If you are doing any FS op which 
explicitly renames more than one file in your commit, you lose atomicity. If 
there's a check + rename then yes, it's two-step, unless you can use 
create(path, overwrite=false) to create some lease file where you know that 
the creation is exclusive & atomic. That holds for HDFS + POSIX; generally 
not at all for the stores, especially S3, which can actually cache the 404 in 
its load balancers for a few tens of milliseconds.
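The exclusive-create trick, as a sketch (the lease path and helper name are 
mine, for illustration):

    import java.io.IOException;
    import org.apache.hadoop.fs.FileAlreadyExistsException;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    class LeaseFile {
      // overwrite=false makes create() exclusive and atomic on HDFS and
      // POSIX: exactly one caller wins, the rest get
      // FileAlreadyExistsException. Object stores give no such guarantee.
      static boolean tryAcquire(FileSystem fs, Path lease) throws IOException {
        try {
          fs.create(lease, /* overwrite = */ false).close();
          return true;   // we own this version
        } catch (FileAlreadyExistsException e) {
          return false;  // someone else got there first
        }
      }
    }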

For object stores, you are in a different world of pain:

S3: nope; rename is O(files + data), observable while in progress, and can 
fail partway through. Plus list inconsistency, and the caching of negative 
GET/HEAD responses to defend against DoS.
wasb: no, except for bits of the tree where you enable leases, something which 
increases the cost of operations; O(files), with the odd pause if some shard 
movement has to take place.
Google GCS: not sure, but it is O(files).
Azure abfs: not atomic yet. As the code says:

    if (isAtomicRenameKey(source.getName())) {
      LOG.warn("The atomic rename feature is not supported by the ABFS scheme;"
          + " however rename, create and delete operations are atomic if"
          + " Namespace is enabled for your Azure Storage account.");
    }

From my reading of the SPARK-23966 PR, it's the object store problem which is 
being addressed: both correctness and performance.


Anything I am missing here?
Really curious to know which corner cases we are trying to solve with this new 
pull request.


Object stores as the back end: S3 in particular, where that rename is O(data) 
and a direct PUT to the destination gives you that atomicity.
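Paraphrasing, the shape of the abstraction the PR introduces is roughly this 
(a Java paraphrase for illustration, not the actual Scala signatures):

    import java.io.IOException;
    import java.io.OutputStream;
    import org.apache.hadoop.fs.Path;

    // A stream whose close() publishes the checkpoint file and whose
    // cancel() guarantees no partial file ever becomes visible.
    abstract class CancellableStream extends OutputStream {
      abstract void cancel() throws IOException;
    }

    interface CheckpointFileManager {
      // A rename-based implementation writes to a temp file and renames
      // it in close(); a direct-write implementation (S3) relies on the
      // store only making the object visible once the PUT completes.
      CancellableStream createAtomic(Path path, boolean overwriteIfPossible)
          throws IOException;
    }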


Someone needs to sit down and write that reference implementation.

Whoever does want to do that:

- I believe it can all be done with the normal Hadoop FS APIs, simply knowing 
that, for the store, OutputStream.close() is (a) atomic, (b) potentially 
really slow as the remaining data gets uploaded, and (c) when it fails, can 
mean all your data just got lost (see the sketch after this list).
- I've got the TLA+ spec for the S3 API which they can use as the foundation 
for their proofs of correctness: 
https://issues.apache.org/jira/secure/attachment/12865161/objectstore.pdf
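As a sketch of that direct-write commit, under the close()-is-atomic 
assumption above (bucket and path are illustrative):

    import java.io.IOException;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FSDataOutputStream;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    class DirectWriteCommit {
      // No temp file, no rename: nothing is visible at dst until close()
      // returns. close() is the commit, so it may block while buffered
      // data is uploaded, and if it throws, the data was not saved.
      static void writeCheckpoint(byte[] data) throws IOException {
        Path dst = new Path("s3a://bucket/checkpoint/state/1.delta");
        FileSystem fs = dst.getFileSystem(new Configuration());
        try (FSDataOutputStream out = fs.create(dst, true)) {
          out.write(data);
        } // close() here is the atomic publish
      }
    }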


-Steve
