Thanks, Steve, for answering in such detail. I had the same impression as Chandan from that line: it went against my understanding, since the rename operation itself in HDFS is atomic, and I didn't realize the change was aimed at tackling object stores.
I learned a lot about object stores from your answer. Thanks again.

Jungtaek Lim (HeartSaVioR)

On Wed, Oct 3, 2018 at 2:48 AM, chandan prakash <chandanbaran...@gmail.com> wrote:

> Thanks a lot Steve and Jungtaek for your answers.
> Steve,
> You explained really well, in depth.
>
> I understood that the existing old implementation was not correct for
> object stores like S3, and that the new implementation will address that.
> For better performance we should choose a direct-write-based checkpoint
> rather than a rename-based one (which we can implement using the new
> CheckpointFileManager abstraction).
> My confusion was because of this line in the PR:
> *This is incorrect as rename is not atomic in HDFS FileSystem
> implementation*
> I thought the above line meant that the existing old implementation is not
> correct for the HDFS file system as well, so I wanted to understand if
> there is something I am missing. The new implementation addresses the
> issue for object stores like S3, not for HDFS.
> Thanks again for your explanation; I am sure it will help a lot of other
> code readers as well.
>
> Regards,
> Chandan
>
> On Mon, Oct 1, 2018 at 5:37 PM Steve Loughran <ste...@hortonworks.com>
> wrote:
>
>> On 11 Aug 2018, at 17:33, chandan prakash <chandanbaran...@gmail.com>
>> wrote:
>>
>> Hi All,
>> I was going through this pull request about the new CheckpointFileManager
>> abstraction in structured streaming coming in 2.4:
>> https://issues.apache.org/jira/browse/SPARK-23966
>> https://github.com/apache/spark/pull/21048
>>
>> I went through the code in detail and found it will introduce a very
>> nice abstraction which is much cleaner and more extensible for
>> direct-write file systems like S3 (in addition to the current HDFS file
>> system).
>>
>> *But I am unable to understand: is it really solving some problem in the
>> existing State Store code in Spark 2.3?*
>>
>> *My questions related to the above statements in the State Store code:*
>>
>> *PR description:* "Checkpoint files must be written atomically such
>> that *no partial files are generated*."
>> *QUESTION:* When are partial files generated in the current code? I can
>> see that data is first written to a temp-delta file and then renamed to
>> the version.delta file. If something bad happens, the task will fail due
>> to the thrown exception and abort() will be called on the store to close
>> and delete tempDeltaFileStream. I think it is quite clean; in what case
>> might partial files be generated?
>>
>> I suspect the issue is that as files are written to a "classic" POSIX
>> store, flush/sync operations can result in the intermediate data being
>> visible to others. That is why the convention for checkpoint/commit
>> operations is: write to temp & rename. Which is not what you want for
>> object stores, especially S3.
>>
>> *PR description:* "*State Store behavior is incorrect - HDFS FileSystem
>> implementation does not have atomic rename*"
>> *QUESTION:* The HDFS filesystem rename operation is atomic. I think the
>> above line refers to checking whether the file already exists and then
>> taking the appropriate action, which together makes the renaming
>> operation multi-step and hence non-atomic. But why is this behaviour
>> incorrect? Even if multiple executors try to write to the same
>> version.delta file, only the first of them will succeed; the second one
>> will see that the file exists and will delete its temp-delta file. Looks
>> good.
>>
>> HDFS single-file and directory rename is atomic: it grabs a lock on the
>> metadata store, does the change, and unlocks it. If you are doing any FS
>> op which explicitly renames more than one file in your commit, you lose
>> atomicity.
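The write-to-temp-then-rename convention discussed above can be sketched as follows. This is only an illustration, not Spark's actual StateStore code: it uses local-filesystem java.nio as a stand-in for the Hadoop FileSystem API, and the class and method names are made up.

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;

public class RenameBasedCommit {
    // Commit `data` to `dest` so that readers never observe a partial file:
    // write everything to a temp file first, then rename it into place.
    // On HDFS (and POSIX filesystems) a single-file rename is atomic; on S3
    // a "rename" is a copy + delete and carries no such guarantee, which is
    // the problem SPARK-23966 addresses.
    static void commit(Path dest, byte[] data) throws IOException {
        Path tmp = dest.resolveSibling(dest.getFileName() + ".tmp");
        try {
            // Partial writes (crash, exception) only ever land in the temp file.
            Files.write(tmp, data);
            Files.move(tmp, dest, StandardCopyOption.ATOMIC_MOVE);
        } catch (IOException e) {
            // Rough analogue of abort(): clean up the temp file on failure.
            Files.deleteIfExists(tmp);
            throw e;
        }
    }

    public static void main(String[] args) throws IOException {
        Path dir = Files.createTempDirectory("ckpt");
        Path delta = dir.resolve("1.delta");
        commit(delta, "state".getBytes(StandardCharsets.UTF_8));
        // The committed file exists and no temp file is left behind.
        System.out.println(
            Files.exists(delta) && !Files.exists(dir.resolve("1.delta.tmp")));
    }
}
```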
>> If there's a check + rename then yes, it's a two-step operation, unless
>> you can use create(path, overwrite=false) to create some lease file,
>> where you know that the creation is exclusive & atomic for HDFS + POSIX,
>> and generally not at all for the object stores, especially S3, which can
>> actually cache the 404 in its load balancers for a few tens of
>> milliseconds.
>>
>> For object stores, you are in a different world of pain:
>>
>> S3: nope; O(files + data) + observable + partial failures. List
>> inconsistency + caching of negative GET/HEAD to defend against DoS.
>> wasb: no, except for bits of the tree where you enable leases, something
>> which increases the cost of operations. O(files), with the odd pause if
>> some shard movement has to take place.
>> Google GCS: not sure, but it is O(files).
>> Azure abfs: not atomic yet. As the code says:
>>
>> if (isAtomicRenameKey(source.getName())) {
>>   LOG.warn("The atomic rename feature is not supported by the ABFS scheme; however rename,"
>>       + " create and delete operations are atomic if Namespace is enabled for your Azure Storage account.");
>> }
>>
>> From my reading of the SPARK-23966 PR, it's the object store problem
>> which is being addressed: both correctness and performance.
>>
>> Anything I am missing here?
>> Really curious to know which corner cases we are trying to solve by this
>> new pull request?
>>
>> Object stores as the back end. For S3 in particular, where that rename
>> is O(data), a direct PUT to the destination gives you that atomicness.
>>
>> Someone needs to sit down and write that reference implementation.
>>
>> Whoever does want to do that:
>>
>> - I believe it can all be done with the normal Hadoop FS APIs, simply
>> knowing that for the store, OutputStream.close() is (a) atomic, (b)
>> potentially really slow as the remaining data gets uploaded, and (c)
>> when it fails, can mean all your data just got lost.
>> - I've got the TLA+ spec for the S3 API which they can use as the
>> foundation for their proofs of correctness:
>> https://issues.apache.org/jira/secure/attachment/12865161/objectstore.pdf
>>
>> -Steve
>
>
> --
> Chandan Prakash
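The create(path, overwrite=false) lease-file idea Steve mentions can be sketched like this. Again a minimal local-filesystem illustration, not real Spark or Hadoop code: on HDFS and POSIX an exclusive create is atomic, so whichever task creates the lease file first wins, while on stores like S3 this exclusivity is not guaranteed.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.FileAlreadyExistsException;
import java.nio.file.Files;
import java.nio.file.Path;

public class ExclusiveCreateLease {
    // Try to acquire a commit "lease" by exclusively creating a marker file.
    // Files.createFile fails atomically if the file already exists, so only
    // one caller can ever win; the loser should back off and clean up its
    // own temp files, as in the duplicate-executor scenario discussed above.
    static boolean tryAcquire(Path lease) {
        try {
            Files.createFile(lease);
            return true;
        } catch (FileAlreadyExistsException e) {
            return false;   // someone else already committed this version
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) throws IOException {
        Path lease = Files.createTempDirectory("lease").resolve("1.delta.lock");
        boolean first = tryAcquire(lease);   // first attempt wins
        boolean second = tryAcquire(lease);  // second attempt loses
        System.out.println(first + " " + second);
    }
}
```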