For GCS, I would do what I believe we already do.
rename(src, dst):
- if !exists(src) and exists(dst) return 0
- if !exists(src) and !exists(dst) return error
- if exists(src) and exists(dst) { if checksum(src) == checksum(dst) return 0 else delete(dst) }
- Start a GCS copy from src to dst.
- Wait for GCS copy to complete.
- delete(src)

For filesystems that don't have checksum() metadata, size() can be used instead.

I've opened a bug to track this: https://issues.apache.org/jira/browse/BEAM-3600

On Thu, Feb 1, 2018 at 2:25 PM Eugene Kirpichov <[email protected]> wrote:

> Yes, IGNORE_MISSING_FILES is unsafe because it will - well - ignore files
> that are missing for more ominous reasons than just this being a non-first
> attempt at renaming src to dst. E.g. if there was a bug in constructing the
> filename to be renamed, or if we somehow messed up the order of rename vs
> cleanup, etc. - these situations with IGNORE_MISSING_FILES would lead to
> silent data loss (likely caught by unit tests though - so this is not a
> super serious issue).
>
> Basically I just can't think of a case when I was copying files and
> thinking "oh man, I wish it didn't give an error if the stuff I'm copying
> doesn't exist" - the option exists only because we couldn't come up with
> another way to implement idempotent rename on GCS.
>
> What's your idea of how a safe retryable GCS rename() could work?
>
> On Wed, Jan 31, 2018 at 6:45 PM Udi Meiri <[email protected]> wrote:
>
>> Eugene, if I get this right, you're saying that IGNORE_MISSING_FILES is
>> unsafe because it will skip (src, dst) pairs where neither exist? (it only
>> looks if src exists)
>>
>> For GCS, we can construct a safe retryable rename() operation, assuming
>> that copy() and delete() are atomic for a single file or pair of files.
>>
>> On Wed, Jan 31, 2018 at 4:00 PM Raghu Angadi <[email protected]> wrote:
>>
>>> On Wed, Jan 31, 2018 at 2:43 PM, Eugene Kirpichov <[email protected]> wrote:
>>>
>>>> As far as I know, the current implementation of file sinks is the only
>>>> reason why the flag IGNORE_MISSING for copying even exists - there's no
>>>> other compelling reason to justify it.
>>>> We implement "rename" as "copy, then delete" (in a single DoFn), so for
>>>> idempotency of this operation we need to ignore the copying of a
>>>> non-existent file.
>>>>
>>>> I think the right way to go would be to change the implementation of
>>>> renaming to have a @RequiresStableInput (or reshuffle) in the middle, so
>>>> it's made of 2 individually idempotent operations:
>>>> 1) copy, which would fail if input is missing, and would overwrite
>>>> output if it exists
>>>> -- reshuffle --
>>>> 2) delete, which would not fail if input is missing.
>>>
>>> Something like this is needed only in streaming, right?
>>>
>>> Raghu.
>>>
>>>> That way first everything is copied (possibly via multiple attempts),
>>>> and then old files are deleted (possibly via multiple attempts).
>>>>
>>>> On Wed, Jan 31, 2018 at 2:26 PM Udi Meiri <[email protected]> wrote:
>>>>
>>>>> I agree that overwriting is more in line with user expectations.
>>>>> I believe that the sink should not ignore errors from the filesystem
>>>>> layer. Instead, the FileSystem API should be more well defined.
>>>>> Examples: rename() and copy() should overwrite existing files at the
>>>>> destination, and copy() should have an ignore_missing flag.
>>>>>
>>>>> On Wed, Jan 31, 2018 at 1:49 PM Raghu Angadi <[email protected]> wrote:
>>>>>
>>>>>> The original mail mentions that output from a second run of word_count
>>>>>> is ignored. That does not seem as safe as ignoring an error from a
>>>>>> second attempt of a step. How do we know the second run didn't produce
>>>>>> different output? Overwriting seems more accurate than ignoring. Does
>>>>>> handling this error at the sink level distinguish between the two
>>>>>> (another run vs. a second attempt)?
>>>>>>
>>>>>> On Wed, Jan 31, 2018 at 12:32 PM, Udi Meiri <[email protected]> wrote:
>>>>>>
>>>>>>> Yeah, another round of refactoring is due to move the rename via
>>>>>>> copy+delete logic up to the file-based sink level.
>>>>>>>
>>>>>>> On Wed, Jan 31, 2018, 10:42 Chamikara Jayalath <[email protected]> wrote:
>>>>>>>
>>>>>>>> Good point. There's always the chance of the step that performs the
>>>>>>>> final rename being retried, so we'll have to ignore this error at the
>>>>>>>> sink level. We don't necessarily have to do this at the FileSystem
>>>>>>>> level though. I think the proper behavior might be to raise an error
>>>>>>>> for the rename at the FileSystem level if the destination already
>>>>>>>> exists (or the source doesn't exist) while ignoring that error (and
>>>>>>>> possibly logging a warning) at the sink level.
>>>>>>>>
>>>>>>>> - Cham
>>>>>>>>
>>>>>>>> On Tue, Jan 30, 2018 at 6:47 PM Reuven Lax <[email protected]> wrote:
>>>>>>>>
>>>>>>>>> I think the idea was to ignore "already exists" errors. The reason
>>>>>>>>> being that any step in Beam can be executed multiple times, including
>>>>>>>>> the rename step. If the rename step gets run twice, the second run
>>>>>>>>> should succeed vacuously.
>>>>>>>>>
>>>>>>>>> On Tue, Jan 30, 2018 at 6:19 PM, Udi Meiri <[email protected]> wrote:
>>>>>>>>>
>>>>>>>>>> Hi,
>>>>>>>>>> I've been working on HDFS code for the Python SDK and I've noticed
>>>>>>>>>> some behaviors which are surprising. I wanted to know if these
>>>>>>>>>> behaviors are known and intended.
>>>>>>>>>>
>>>>>>>>>> 1. When renaming files during finalize_write, rename errors are
>>>>>>>>>> ignored
>>>>>>>>>> <https://github.com/apache/beam/blob/3aa2bef87c93d2844dd7c8dbaf45db75ec607792/sdks/python/apache_beam/io/filebasedsink.py#L232>.
>>>>>>>>>> For example, if I run wordcount twice using the HDFS code, I get a
>>>>>>>>>> warning the second time because the file already exists:
>>>>>>>>>>
>>>>>>>>>> WARNING:root:Rename not successful:
>>>>>>>>>> hdfs://beam-temp-counts2-7cb0a78005f211e8b6a08851fb5da245/1059f870-d64f-4f63-b1de-e4bd20fcd70a.counts2
>>>>>>>>>> -> hdfs://counts2-00000-of-00001, libhdfs error in renaming
>>>>>>>>>> hdfs://beam-temp-counts2-7cb0a78005f211e8b6a08851fb5da245/1059f870-d64f-4f63-b1de-e4bd20fcd70a.counts2
>>>>>>>>>> to hdfs://counts2-00000-of-00001 with exceptions Unable to rename
>>>>>>>>>> '/beam-temp-counts2-7cb0a78005f211e8b6a08851fb5da245/1059f870-d64f-4f63-b1de-e4bd20fcd70a.counts2'
>>>>>>>>>> to '/counts2-00000-of-00001'.
>>>>>>>>>>
>>>>>>>>>> For GCS and local files there are no rename errors (in this case),
>>>>>>>>>> since the rename operation silently overwrites existing destination
>>>>>>>>>> files. However, blindly ignoring these errors might make the
>>>>>>>>>> pipeline report success even though output files are missing.
>>>>>>>>>>
>>>>>>>>>> 2. Output files (--output) overwrite existing files.
>>>>>>>>>>
>>>>>>>>>> 3. The Python SDK doesn't use Filesystems.copy(). The Java SDK
>>>>>>>>>> doesn't use Filesystem.Rename().
>>>>>>>>>>
>>>>>>>>>> Thanks,
>>>>>>>>>> - Udi
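Spelled out, the rename() steps at the top of this message could look like the Python sketch below. The `InMemoryFS` object and its exists/checksum/copy/delete methods are hypothetical stand-ins for a real filesystem with atomic single-file operations (not the actual Beam FileSystems API). One small completion of the pseudocode: in the checksums-match branch the sketch also deletes src before returning, so a retried rename still finishes the delete half.

```python
class RenameError(Exception):
    pass


class InMemoryFS(object):
    """Toy stand-in for a filesystem with atomic single-file operations."""

    def __init__(self, files=None):
        self.files = dict(files or {})  # path -> contents

    def exists(self, path):
        return path in self.files

    def checksum(self, path):
        # Stand-in for real checksum metadata (e.g. a GCS object checksum).
        return hash(self.files[path])

    def copy(self, src, dst):
        self.files[dst] = self.files[src]  # overwrites dst if it exists

    def delete(self, path):
        del self.files[path]


def rename(fs, src, dst):
    """Retry-safe rename: every path through here may be re-run after a crash."""
    if not fs.exists(src):
        if fs.exists(dst):
            return  # A previous attempt already completed; vacuous success.
        raise RenameError('neither %s nor %s exists' % (src, dst))
    if fs.exists(dst):
        if fs.checksum(src) == fs.checksum(dst):
            fs.delete(src)  # Copy already done by a prior attempt; finish up.
            return
        fs.delete(dst)  # Stale destination; redo the copy below.
    fs.copy(src, dst)
    fs.delete(src)
```

Running it twice is then harmless: the second call hits the "src missing, dst present" branch and returns immediately instead of erroring.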
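Eugene's two-phase proposal quoted above (copy, then a @RequiresStableInput/reshuffle barrier, then delete) can be sketched outside of Beam as two individually idempotent passes over the (src, dst) pairs. The function names and the plain-dict "filesystem" here are invented for illustration; in a real pipeline each phase would be a DoFn with a Reshuffle between them.

```python
def copy_phase(files, pairs):
    """Phase 1: copy everything; fail loudly if a source is missing.

    Overwrite semantics make a retry of this whole phase safe.
    """
    for src, dst in pairs:
        if src not in files:
            raise IOError('missing source: %s' % src)
        files[dst] = files[src]


# In Beam, the Reshuffle / @RequiresStableInput barrier would sit here, so
# phase 2 only ever runs on pairs whose copies are durably committed.


def delete_phase(files, pairs):
    """Phase 2: delete sources; a missing source just means a prior attempt
    already deleted it, so it is not an error."""
    for src, _ in pairs:
        files.pop(src, None)
```

Note that a phase-1 retry is only safe before phase 2 starts deleting sources, which is exactly the ordering guarantee the barrier provides.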