Eugene, if I get this right, you're saying that IGNORE_MISSING_FILES is unsafe because it will skip (src, dst) pairs where neither exists? (It only checks whether src exists.)
For GCS, we can construct a safe retryable rename() operation, assuming that copy() and delete() are atomic for a single file or pair of files. (A minimal sketch of such an operation follows at the end of this mail.)

On Wed, Jan 31, 2018 at 4:00 PM Raghu Angadi <[email protected]> wrote:

> On Wed, Jan 31, 2018 at 2:43 PM, Eugene Kirpichov <[email protected]> wrote:
>
>> As far as I know, the current implementation of file sinks is the only
>> reason why the flag IGNORE_MISSING for copying even exists - there's no
>> other compelling reason to justify it. We implement "rename" as "copy,
>> then delete" (in a single DoFn), so for idempotency of this operation we
>> need to ignore the copying of a non-existent file.
>>
>> I think the right way to go would be to change the implementation of
>> renaming to have a @RequiresStableInput (or reshuffle) in the middle, so
>> it's made of 2 individually idempotent operations:
>> 1) copy, which would fail if input is missing, and would overwrite output
>> if it exists
>> -- reshuffle --
>> 2) delete, which would not fail if input is missing.
>
> Something like this is needed only in streaming, right?
>
> Raghu.
>
>> That way first everything is copied (possibly via multiple attempts), and
>> then old files are deleted (possibly via multiple attempts).
>>
>> On Wed, Jan 31, 2018 at 2:26 PM Udi Meiri <[email protected]> wrote:
>>
>>> I agree that overwriting is more in line with user expectations.
>>> I believe that the sink should not ignore errors from the filesystem
>>> layer. Instead, the FileSystem API should be better defined. Examples:
>>> rename() and copy() should overwrite existing files at the destination,
>>> and copy() should have an ignore_missing flag.
>>>
>>> On Wed, Jan 31, 2018 at 1:49 PM Raghu Angadi <[email protected]> wrote:
>>>
>>>> The original mail mentions that output from a second run of word_count
>>>> is ignored. That does not seem as safe as ignoring an error from a
>>>> second attempt of a step. How do we know the second run didn't produce
>>>> different output? Overwriting seems more accurate than ignoring. Does
>>>> handling this error at the sink level distinguish between the two
>>>> (another run vs. a second attempt)?
>>>>
>>>> On Wed, Jan 31, 2018 at 12:32 PM, Udi Meiri <[email protected]> wrote:
>>>>
>>>>> Yeah, another round of refactoring is due in order to move the
>>>>> rename-via-copy+delete logic up to the file-based sink level.
>>>>>
>>>>> On Wed, Jan 31, 2018, 10:42 Chamikara Jayalath <[email protected]>
>>>>> wrote:
>>>>>
>>>>>> Good point. There's always the chance of the step that performs the
>>>>>> final rename being retried, so we'll have to ignore this error at the
>>>>>> sink level. We don't necessarily have to do this at the FileSystem
>>>>>> level, though. I think the proper behavior might be to raise an error
>>>>>> for the rename at the FileSystem level if the destination already
>>>>>> exists (or the source doesn't exist), while ignoring that error (and
>>>>>> possibly logging a warning) at the sink level.
>>>>>>
>>>>>> - Cham
>>>>>>
>>>>>> On Tue, Jan 30, 2018 at 6:47 PM Reuven Lax <[email protected]> wrote:
>>>>>>
>>>>>>> I think the idea was to ignore "already exists" errors. The reason
>>>>>>> is that any step in Beam can be executed multiple times, including
>>>>>>> the rename step. If the rename step gets run twice, the second run
>>>>>>> should succeed vacuously.
>>>>>>> On Tue, Jan 30, 2018 at 6:19 PM, Udi Meiri <[email protected]> wrote:
>>>>>>>
>>>>>>>> Hi,
>>>>>>>> I've been working on HDFS code for the Python SDK and I've noticed
>>>>>>>> some behaviors which are surprising. I wanted to know if these
>>>>>>>> behaviors are known and intended.
>>>>>>>>
>>>>>>>> 1. When renaming files during finalize_write, rename errors are
>>>>>>>> ignored
>>>>>>>> <https://github.com/apache/beam/blob/3aa2bef87c93d2844dd7c8dbaf45db75ec607792/sdks/python/apache_beam/io/filebasedsink.py#L232>.
>>>>>>>> For example, if I run wordcount twice using HDFS code I get a
>>>>>>>> warning the second time because the file already exists:
>>>>>>>>
>>>>>>>> WARNING:root:Rename not successful:
>>>>>>>> hdfs://beam-temp-counts2-7cb0a78005f211e8b6a08851fb5da245/1059f870-d64f-4f63-b1de-e4bd20fcd70a.counts2
>>>>>>>> -> hdfs://counts2-00000-of-00001, libhdfs error in renaming
>>>>>>>> hdfs://beam-temp-counts2-7cb0a78005f211e8b6a08851fb5da245/1059f870-d64f-4f63-b1de-e4bd20fcd70a.counts2
>>>>>>>> to hdfs://counts2-00000-of-00001 with exceptions Unable to rename
>>>>>>>> '/beam-temp-counts2-7cb0a78005f211e8b6a08851fb5da245/1059f870-d64f-4f63-b1de-e4bd20fcd70a.counts2'
>>>>>>>> to '/counts2-00000-of-00001'.
>>>>>>>>
>>>>>>>> For GCS and local files there are no rename errors (in this case),
>>>>>>>> since the rename operation silently overwrites existing destination
>>>>>>>> files. However, blindly ignoring these errors might make the
>>>>>>>> pipeline report success even though output files are missing.
>>>>>>>>
>>>>>>>> 2. Output files (--output) overwrite existing files.
>>>>>>>>
>>>>>>>> 3. The Python SDK doesn't use Filesystems.copy(). The Java SDK
>>>>>>>> doesn't use Filesystem.Rename().
>>>>>>>>
>>>>>>>> Thanks,
>>>>>>>> - Udi
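Concretely, here's a minimal (untested) sketch of the retryable rename() I mean for GCS. It uses the Python SDK's FileSystems API, but everything else is assumption: that copy() overwrites an existing destination, that per-file copy and delete are each atomic, and the retryable_rename() name itself is made up for illustration.

from apache_beam.io.filesystems import FileSystems


def retryable_rename(src, dst):
  """A rename that is safe to retry, assuming per-file copy() and delete()
  are atomic and copy() overwrites an existing destination (as on GCS)."""
  if FileSystems.exists(src):
    # Re-copying after a partial earlier attempt just rewrites the same
    # bytes to dst, so repeating this branch is harmless.
    FileSystems.copy([src], [dst])
    FileSystems.delete([src])
  elif not FileSystems.exists(dst):
    # Neither src nor dst exists: the case IGNORE_MISSING_FILES would
    # silently skip. Surface it instead of reporting success with
    # missing output files.
    raise IOError(
        'rename %s -> %s: source missing and destination absent' % (src, dst))
  # Otherwise src is gone and dst exists: a previous attempt finished.

Every retry converges to the same end state (dst written, src gone), and the "neither exists" case from my question to Eugene above becomes an error instead of a silent skip.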
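For reference, Eugene's two-phase proposal quoted above could look roughly like this in the Python SDK, with Reshuffle standing in for @RequiresStableInput. Again an untested sketch: the helper names are mine, and so is the assumption that FileSystems.copy() overwrites an existing destination.

import apache_beam as beam
from apache_beam.io.filesystems import FileSystems
from apache_beam.transforms.util import Reshuffle


def _copy_overwriting(src_dst):
  # Phase 1: idempotent copy. Assumed to overwrite dst if present and to
  # raise if src is missing, so a retry either re-does the copy or fails
  # loudly instead of being silently skipped.
  src, dst = src_dst
  FileSystems.copy([src], [dst])
  return src_dst


def _delete_ignoring_missing(src_dst):
  # Phase 2: idempotent delete. "Already gone" counts as success, so a
  # bundle retried after a successful delete still succeeds.
  src, _ = src_dst
  if FileSystems.exists(src):
    FileSystems.delete([src])


def two_phase_rename(pairs):
  # pairs: a PCollection of (temp_path, final_path) tuples.
  return (
      pairs
      | 'CopyToFinal' >> beam.Map(_copy_overwriting)
      # The reshuffle is the stable-input barrier: deletes only run on
      # pairs whose copy has been committed, so no interleaving of
      # retries can delete a temp file before its copy is durable.
      | 'StableInput' >> Reshuffle()
      | 'DeleteTemp' >> beam.Map(_delete_ignoring_missing))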
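And a rough sketch of the layering Cham suggests: the FileSystem refuses ambiguous renames, while the sink decides which failures are benign on retry. BeamIOError is the SDK's real exception type, but strict_rename()/finalize_renames() are hypothetical names for illustration, not the actual filebasedsink code.

import logging

from apache_beam.io.filesystem import BeamIOError
from apache_beam.io.filesystems import FileSystems


def strict_rename(src, dst):
  # FileSystem layer: fail loudly instead of silently overwriting or
  # silently skipping, so callers can decide what is benign.
  if not FileSystems.exists(src):
    raise BeamIOError('source missing: %s' % src)
  if FileSystems.exists(dst):
    raise BeamIOError('destination exists: %s' % dst)
  FileSystems.rename([src], [dst])


def finalize_renames(pairs):
  # Sink layer: a retried finalize step treats "destination already
  # written" as vacuous success, logging a warning instead of failing.
  for src, dst in pairs:
    try:
      strict_rename(src, dst)
    except BeamIOError:
      if FileSystems.exists(dst) and not FileSystems.exists(src):
        logging.warning('Skipping rename %s -> %s: already finalized',
                        src, dst)
      else:
        raise

Note the sink-side check only proves that some attempt finished; distinguishing "another run" from "a second attempt of this run", as Raghu raises, would need more context than the filesystem state alone.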
