You could add a step that deletes all of dest, followed by a barrier and then the rename step as outlined. In that case, any dest file that exists must be good, since after the barrier it can only have been written by this run's rename step.
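A minimal sketch of this suggestion follows. It assumes a local filesystem (os/shutil) as a stand-in for GCS or HDFS. The helper names delete_destinations, rename_one and finalize_with_predelete are hypothetical. Each phase is written to be retried on its own. The barrier is only marked by a comment, since in a pipeline it would be a separate step such as a reshuffle.

```python
import os
import shutil


def delete_destinations(pairs):
    """Phase 1 (hypothetical helper): remove every dst; safe to retry."""
    for _, dst in pairs:
        try:
            os.remove(dst)
        except FileNotFoundError:
            pass  # already removed by an earlier attempt


def rename_one(src, dst):
    """Phase 2 (hypothetical helper): retryable rename of a single file."""
    if not os.path.exists(src):
        if os.path.exists(dst):
            # Phase 1 removed every stale dst, so this one must have been
            # written by an earlier attempt of this same rename.
            return
        raise FileNotFoundError(f"neither {src} nor {dst} exists")
    shutil.copyfile(src, dst)  # overwrites a dst left by an interrupted attempt
    os.remove(src)


def finalize_with_predelete(pairs):
    delete_destinations(pairs)
    # --- barrier: in a pipeline, no rename may start until every delete is done ---
    for src, dst in pairs:
        rename_one(src, dst)
```

The two phases must be retried independently. Re-running phase 1 after some renames have completed would delete good outputs, which is exactly what the barrier is meant to prevent.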
On Thu, Feb 1, 2018 at 2:52 PM, Eugene Kirpichov wrote:
> I think this is still unsafe in case exists(dst) (e.g. this is a re-run of a pipeline) but src is missing due to some bad reason. However it's probably better than what we have (e.g. we currently certainly don't perform checksum checks).
>
> On Thu, Feb 1, 2018 at 2:45 PM Udi Meiri wrote:
>>
>> For GCS, I would do what I believe we already do.
>> rename(src, dst):
>> - if !exists(src) and exists(dst) return 0
>> - if !exists(src) and !exists(dst) return error
>> - if exists(src) and exists(dst) { if checksum(src) == checksum(dst) return 0 else delete(dst) }
>> - Start a GCS copy from src to dst.
>> - Wait for GCS copy to complete.
>> - delete(src)
>>
>> For filesystems that don't have checksum() metadata, size() can be used instead.
>>
>> I've opened a bug to track this:
>> https://issues.apache.org/jira/browse/BEAM-3600
>>
>> On Thu, Feb 1, 2018 at 2:25 PM Eugene Kirpichov wrote:
>>>
>>> Yes, IGNORE_MISSING_FILES is unsafe because it will - well - ignore files that are missing for more ominous reasons than just this being a non-first attempt at renaming src to dst. E.g. if there was a bug in constructing the filename to be renamed, or if we somehow messed up the order of rename vs cleanup, etc. - these situations with IGNORE_MISSING_FILES would lead to silent data loss (likely caught by unit tests though - so this is not a super serious issue).
>>>
>>> Basically I just can't think of a case when I was copying files and thinking "oh man, I wish it didn't give an error if the stuff I'm copying doesn't exist" - the option exists only because we couldn't come up with another way to implement idempotent rename on GCS.
>>>
>>> What's your idea of how a safe retryable GCS rename() could work?
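Udi's rename(src, dst) steps can be sketched in Python as follows. This is only an illustration under stated assumptions:

- a local filesystem stands in for GCS;
- an MD5 of the file contents stands in for the object checksum;
- a blocking shutil.copyfile stands in for "start a GCS copy and wait for it";
- raising FileNotFoundError stands in for "return error".

idempotent_rename and checksum are hypothetical names. Following the steps as written, the matching-checksum branch returns without deleting src.

```python
import hashlib
import os
import shutil


def checksum(path):
    """Hypothetical stand-in for GCS checksum metadata: MD5 of the contents."""
    h = hashlib.md5()
    with open(path, "rb") as f:
        for chunk in iter(lambda: f.read(1 << 16), b""):
            h.update(chunk)
    return h.hexdigest()


def idempotent_rename(src, dst, use_checksum=True):
    """Retryable rename following Udi's steps; returns 0 on success."""
    src_exists = os.path.exists(src)
    dst_exists = os.path.exists(dst)
    if not src_exists and dst_exists:
        # Assumed to be a retry whose earlier attempt finished. Eugene's concern:
        # a dst left over from a previous pipeline run also lands here.
        return 0
    if not src_exists:
        raise FileNotFoundError(f"rename({src}, {dst}): neither file exists")
    if dst_exists:
        if use_checksum:
            same = checksum(src) == checksum(dst)
        else:
            # Fallback for filesystems without checksum metadata: compare sizes.
            same = os.path.getsize(src) == os.path.getsize(dst)
        if same:
            return 0  # as written in the steps, src is left in place here
        os.remove(dst)
    shutil.copyfile(src, dst)  # stands in for "start a copy and wait for it"
    os.remove(src)
    return 0
```

The size fallback is weaker than the checksum: two different files of equal length would be treated as identical and dst would be kept.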
>>>
>>> On Wed, Jan 31, 2018 at 6:45 PM Udi Meiri wrote:
>>>>
>>>> Eugene, if I get this right, you're saying that IGNORE_MISSING_FILES is unsafe because it will skip (src, dst) pairs where neither exists? (it only looks if src exists)
>>>>
>>>> For GCS, we can construct a safe retryable rename() operation, assuming that copy() and delete() are atomic for a single file or pair of files.
>>>>
>>>> On Wed, Jan 31, 2018 at 4:00 PM Raghu Angadi wrote:
>>>>>
>>>>> On Wed, Jan 31, 2018 at 2:43 PM, Eugene Kirpichov wrote:
>>>>>>
>>>>>> As far as I know, the current implementation of file sinks is the only reason why the flag IGNORE_MISSING for copying even exists - there's no other compelling reason to justify it. We implement "rename" as "copy, then delete" (in a single DoFn), so for idempotency of this operation we need to ignore the copying of a non-existent file.
>>>>>>
>>>>>> I think the right way to go would be to change the implementation of renaming to have a @RequiresStableInput (or reshuffle) in the middle, so it's made of 2 individually idempotent operations:
>>>>>> 1) copy, which would fail if input is missing, and would overwrite output if it exists
>>>>>> -- reshuffle --
>>>>>> 2) delete, which would not fail if input is missing.
>>>>>
>>>>> Something like this is needed only in streaming, right?
>>>>>
>>>>> Raghu.
>>>>>
>>>>>>
>>>>>> That way first everything is copied (possibly via multiple attempts), and then old files are deleted (possibly via multiple attempts).
>>>>>>
>>>>>> On Wed, Jan 31, 2018 at 2:26 PM Udi Meiri wrote:
>>>>>>>
>>>>>>> I agree that overwriting is more in line with user expectations.
>>>>>>> I believe that the sink should not ignore errors from the filesystem layer. Instead, the FileSystem API should be better defined.
>>>>>>> Examples: rename() and copy() should overwrite existing files at the destination, copy() should have an ignore_missing flag.
>>>>>>>
>>>>>>> On Wed, Jan 31, 2018 at 1:49 PM Raghu Angadi wrote:
>>>>>>>>
>>>>>>>> Original mail mentions that output from second run of word_count is ignored. That does not seem as safe as ignoring error from a second attempt of a step. How do we know second run didn't run on different output? Overwriting seems more accurate than ignoring. Does handling this error at sink level distinguish between the two (another run vs second attempt)?
>>>>>>>>
>>>>>>>> On Wed, Jan 31, 2018 at 12:32 PM, Udi Meiri wrote:
>>>>>>>>>
>>>>>>>>> Yeah, another round of refactoring is due to move the rename via copy+delete logic up to the file-based sink level.
>>>>>>>>>
>>>>>>>>> On Wed, Jan 31, 2018, 10:42 Chamikara Jayalath wrote:
>>>>>>>>>>
>>>>>>>>>> Good point. There's always the chance of the step that performs the final rename being retried. So we'll have to ignore this error at the sink level. We don't necessarily have to do this at the FileSystem level though. I think the proper behavior might be to raise an error for the rename at the FileSystem level if the destination already exists (or source doesn't exist) while ignoring that error (and possibly logging a warning) at the sink level.
>>>>>>>>>>
>>>>>>>>>> - Cham
>>>>>>>>>>
>>>>>>>>>> On Tue, Jan 30, 2018 at 6:47 PM Reuven Lax wrote:
>>>>>>>>>>>
>>>>>>>>>>> I think the idea was to ignore "already exists" errors. The reason being that any step in Beam can be executed multiple times, including the rename step.
>>>>>>>>>>> If the rename step gets run twice, the second run should succeed vacuously.
>>>>>>>>>>>
>>>>>>>>>>> On Tue, Jan 30, 2018 at 6:19 PM, Udi Meiri wrote:
>>>>>>>>>>>>
>>>>>>>>>>>> Hi,
>>>>>>>>>>>> I've been working on HDFS code for the Python SDK and I've noticed some behaviors which are surprising. I wanted to know if these behaviors are known and intended.
>>>>>>>>>>>>
>>>>>>>>>>>> 1. When renaming files during finalize_write, rename errors are ignored. For example, if I run wordcount twice using HDFS code I get a warning the second time because the file already exists:
>>>>>>>>>>>>
>>>>>>>>>>>> WARNING:root:Rename not successful: hdfs://beam-temp-counts2-7cb0a78005f211e8b6a08851fb5da245/1059f870-d64f-4f63-b1de-e4bd20fcd70a.counts2 -> hdfs://counts2-00000-of-00001, libhdfs error in renaming hdfs://beam-temp-counts2-7cb0a78005f211e8b6a08851fb5da245/1059f870-d64f-4f63-b1de-e4bd20fcd70a.counts2 to hdfs://counts2-00000-of-00001 with exceptions Unable to rename '/beam-temp-counts2-7cb0a78005f211e8b6a08851fb5da245/1059f870-d64f-4f63-b1de-e4bd20fcd70a.counts2' to '/counts2-00000-of-00001'.
>>>>>>>>>>>>
>>>>>>>>>>>> For GCS and local files there are no rename errors (in this case), since the rename operation silently overwrites existing destination files. However, blindly ignoring these errors might make the pipeline report success even though output files are missing.
>>>>>>>>>>>>
>>>>>>>>>>>> 2. Output files (--output) overwrite existing files.
>>>>>>>>>>>>
>>>>>>>>>>>> 3. The Python SDK doesn't use Filesystems.copy(). The Java SDK doesn't use Filesystem.Rename().
>>>>>>>>>>>>
>>>>>>>>>>>> Thanks,
>>>>>>>>>>>> - Udi
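Eugene's Jan 31 proposal quoted above (copy, then a reshuffle or @RequiresStableInput, then delete) can be sketched as two separately retryable phases. This is a minimal illustration, not Beam code. It assumes a local filesystem, the function names copy_phase, delete_phase and finalize_two_phase are hypothetical, and the reshuffle is only marked by a comment between the two calls.

```python
import os
import shutil


def copy_phase(pairs):
    """Phase 1 (hypothetical): fail loudly on a missing src, overwrite any existing dst."""
    for src, dst in pairs:
        if not os.path.exists(src):
            raise FileNotFoundError(f"copy source missing: {src}")
        shutil.copyfile(src, dst)  # retry-safe: rewrites the same bytes


def delete_phase(pairs):
    """Phase 2 (hypothetical): remove sources; an already-missing src is fine."""
    for src, _ in pairs:
        try:
            os.remove(src)
        except FileNotFoundError:
            pass  # deleted by an earlier attempt


def finalize_two_phase(pairs):
    copy_phase(pairs)
    # --- reshuffle / @RequiresStableInput: copies are committed before any delete ---
    delete_phase(pairs)
```

Unlike IGNORE_MISSING_FILES, a missing source is reported during the copy phase. The only error that is swallowed is a missing source during deletion, which is what a retried delete looks like. Because the copy overwrites, output left by a previous run is replaced rather than silently kept.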
