I agree with what Robert said. We have a rename() operation in the FileSystem abstraction, and some filesystems might be able to implement it more efficiently than copy+delete. Also note that the same issue could arise in any other usage of the rename operation. So I agree that a retry-tolerant version of rename will be useful. Note that we can do this without making all FileSystem.rename() implementations unsafe. For example, in Java, the IGNORE_MISSING_FILES option is implemented by filtering out non-existent files in FileSystems.rename() before invoking FileSystem.rename().
https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileSystems.java#L316

- Cham

On Wed, Jan 31, 2018 at 3:14 PM Robert Bradshaw <rober...@google.com> wrote:

> For very large filesets, it may be too much to assume that the copy
> succeeds in its entirety on the first try. (I suppose we could chunk
> copies into individual retryable bundles, but this may not respect the
> filesystem's default chunking/bulk operations.) The other downside of
> copying entirely before any deletion is that unless the filesystem is
> smart about copies, it may double the required intermediate storage
> size (vs. deleting once a particular shard has been copied). Also,
> some filesystems may support rename (even bulk rename) that's cheaper
> than copy + delete. For these reasons I think an (optionally
> retry-tolerant) bulk rename makes sense as an operation on the
> filesystem API rather than implemented as a composite operation built
> on lower-level filesystem primitives.
>
> On Wed, Jan 31, 2018 at 2:43 PM, Eugene Kirpichov <kirpic...@google.com> wrote:
> > As far as I know, the current implementation of file sinks is the only
> > reason why the flag IGNORE_MISSING for copying even exists; there's no
> > other compelling reason to justify it. We implement "rename" as "copy, then
> > delete" (in a single DoFn), so for idempotency of this operation we need to
> > ignore the copying of a non-existent file.
> >
> > I think the right way to go would be to change the implementation of
> > renaming to have a @RequiresStableInput (or reshuffle) in the middle, so
> > it's made of 2 individually idempotent operations:
> > 1) copy, which would fail if input is missing, and would overwrite output if
> > it exists
> > -- reshuffle --
> > 2) delete, which would not fail if input is missing.
> >
> > That way first everything is copied (possibly via multiple attempts), and
> > then old files are deleted (possibly via multiple attempts).
> >
> > On Wed, Jan 31, 2018 at 2:26 PM Udi Meiri <eh...@google.com> wrote:
> >>
> >> I agree that overwriting is more in line with user expectations.
> >> I believe that the sink should not ignore errors from the filesystem
> >> layer. Instead, the FileSystem API should be better defined.
> >> Examples: rename() and copy() should overwrite existing files at the
> >> destination, and copy() should have an ignore_missing flag.
> >>
> >> On Wed, Jan 31, 2018 at 1:49 PM Raghu Angadi <rang...@google.com> wrote:
> >>>
> >>> The original mail mentions that the output from the second run of word_count
> >>> is ignored. That does not seem as safe as ignoring an error from a second
> >>> attempt of a step. How do we know the second run didn't produce different
> >>> output? Overwriting seems more accurate than ignoring. Does handling this
> >>> error at the sink level distinguish between the two (another run vs. a
> >>> second attempt)?
> >>>
> >>>
> >>> On Wed, Jan 31, 2018 at 12:32 PM, Udi Meiri <eh...@google.com> wrote:
> >>>>
> >>>> Yeah, another round of refactoring is due to move the rename via
> >>>> copy+delete logic up to the file-based sink level.
> >>>>
> >>>>
> >>>> On Wed, Jan 31, 2018, 10:42 Chamikara Jayalath <chamik...@google.com> wrote:
> >>>>>
> >>>>> Good point. There's always the chance of the step that performs the final
> >>>>> rename being retried. So we'll have to ignore this error at the sink level.
> >>>>> We don't necessarily have to do this at the FileSystem level, though. I think
> >>>>> the proper behavior might be to raise an error for the rename at the
> >>>>> FileSystem level if the destination already exists (or the source doesn't
> >>>>> exist), while ignoring that error (and possibly logging a warning) at the
> >>>>> sink level.
> >>>>>
> >>>>> - Cham
> >>>>>
> >>>>>
> >>>>> On Tue, Jan 30, 2018 at 6:47 PM Reuven Lax <re...@google.com> wrote:
> >>>>>>
> >>>>>> I think the idea was to ignore "already exists" errors.
> >>>>>> The reason is that any step in Beam can be executed multiple times,
> >>>>>> including the rename step. If the rename step gets run twice, the second
> >>>>>> run should succeed vacuously.
> >>>>>>
> >>>>>>
> >>>>>> On Tue, Jan 30, 2018 at 6:19 PM, Udi Meiri <eh...@google.com> wrote:
> >>>>>>>
> >>>>>>> Hi,
> >>>>>>> I've been working on HDFS code for the Python SDK and I've noticed
> >>>>>>> some behaviors which are surprising. I wanted to know if these
> >>>>>>> behaviors are known and intended.
> >>>>>>>
> >>>>>>> 1. When renaming files during finalize_write, rename errors are
> >>>>>>> ignored. For example, if I run wordcount twice using the HDFS code, I
> >>>>>>> get a warning the second time because the file already exists:
> >>>>>>>
> >>>>>>> WARNING:root:Rename not successful:
> >>>>>>> hdfs://beam-temp-counts2-7cb0a78005f211e8b6a08851fb5da245/1059f870-d64f-4f63-b1de-e4bd20fcd70a.counts2
> >>>>>>> -> hdfs://counts2-00000-of-00001, libhdfs error in renaming
> >>>>>>> hdfs://beam-temp-counts2-7cb0a78005f211e8b6a08851fb5da245/1059f870-d64f-4f63-b1de-e4bd20fcd70a.counts2
> >>>>>>> to hdfs://counts2-00000-of-00001 with exceptions Unable to rename
> >>>>>>> '/beam-temp-counts2-7cb0a78005f211e8b6a08851fb5da245/1059f870-d64f-4f63-b1de-e4bd20fcd70a.counts2'
> >>>>>>> to '/counts2-00000-of-00001'.
> >>>>>>>
> >>>>>>> For GCS and local files there are no rename errors (in this case),
> >>>>>>> since the rename operation silently overwrites existing destination
> >>>>>>> files. However, blindly ignoring these errors might make the pipeline
> >>>>>>> report success even though output files are missing.
> >>>>>>>
> >>>>>>> 2. Output files (--output) overwrite existing files.
> >>>>>>>
> >>>>>>> 3. The Python SDK doesn't use FileSystems.copy(). The Java SDK
> >>>>>>> doesn't use FileSystem.rename().
> >>>>>>>
> >>>>>>> Thanks,
> >>>>>>> - Udi
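The two retry-tolerant strategies discussed in this thread can be sketched roughly as follows. This is a minimal, hypothetical illustration on local files using only the Python standard library; the function names `rename_ignoring_missing`, `copy_step` and `delete_step` are made up for this sketch and are not part of Beam's FileSystems API. The first function mirrors the approach Cham describes for Java's IGNORE_MISSING_FILES (filter out missing sources, then call a strict rename that overwrites). The other two follow Eugene's proposal: a copy that fails on a missing source and overwrites its destination, then a delete that tolerates a missing source. The reshuffle / @RequiresStableInput barrier between them is assumed, not modeled.

```python
import os
import shutil
import tempfile
from typing import List, Tuple


def rename_ignoring_missing(srcs: List[str], dsts: List[str]) -> List[Tuple[str, str]]:
    """Hypothetical retry-tolerant bulk rename (local files only).

    Pairs whose source is already gone (for example, moved by an earlier
    attempt of the same step) are filtered out first, so the underlying
    rename stays strict and never has to swallow errors.
    Returns the pairs that were actually renamed.
    """
    if len(srcs) != len(dsts):
        raise ValueError("srcs and dsts must have the same length")
    pending = [(s, d) for s, d in zip(srcs, dsts) if os.path.exists(s)]
    for src, dst in pending:
        # os.replace silently overwrites an existing destination file.
        os.replace(src, dst)
    return pending


def copy_step(srcs: List[str], dsts: List[str]) -> None:
    """Step 1 of a copy/delete rename: fail on a missing source, overwrite dsts.

    Re-running this step after a partial failure just redoes the copies.
    """
    for src, dst in zip(srcs, dsts):
        # Raises FileNotFoundError if src is missing; replaces dst if it exists.
        shutil.copyfile(src, dst)


def delete_step(srcs: List[str]) -> None:
    """Step 2: a source that is already deleted is not an error."""
    for src in srcs:
        try:
            os.remove(src)
        except FileNotFoundError:
            pass  # Deleted by an earlier attempt of this step.


if __name__ == "__main__":
    with tempfile.TemporaryDirectory() as tmp:
        src = os.path.join(tmp, "temp-shard-0")
        dst = os.path.join(tmp, "counts-00000-of-00001")
        with open(src, "w") as f:
            f.write("counts")
        rename_ignoring_missing([src], [dst])
        # A retried attempt of the same step finds nothing left to rename.
        print(rename_ignoring_missing([src], [dst]))  # prints []
```

Note that the check-then-rename in the first function is not atomic, and it cannot tell a retried attempt apart from a separate run, which is Raghu's concern above. Neither sketch reflects the bulk or chunked operations Robert mentions.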