I agree that using an atomic rename operation is even better. I'm mainly
opposed to having a copy option that ignores missing files, and to our
implementation of rename using that option, because it's unsafe.
Unfortunately GCS doesn't have an atomic rename, so I'm not sure what's the
best way to go for GCS without introducing the unsafe operations.

On Wed, Jan 31, 2018, 3:39 PM Chamikara Jayalath <chamik...@google.com>
wrote:

> Agree with what Robert said. We have a rename() operation in the
> FileSystem abstraction and some file-systems might be able to implement
> this more efficiently than copy+delete. Also note that the same issue could
> arise in any other usage of rename operation. So I agree that a
> retry-tolerant version of rename will be useful. Note that we can do this
> without making all FileSystem.rename() implementations unsafe. For example,
> in Java, IGNORE_MISSING_FILES options is implemented by filtering out
> non-existing files in FileSystems.rename() before invoking
> FileSystem.rename().
>
>
> https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileSystems.java#L316
>
> - Cham
>
>
> On Wed, Jan 31, 2018 at 3:14 PM Robert Bradshaw <rober...@google.com>
> wrote:
>
>> For very large filesets, it may be too much to assume that the copy
>> succeed in its entirety on the first try. (I suppose we could chunk
>> copies into individual retryable bundles, but this may not respect the
>> filesystem's default chunking/bulk operations.) The other downside of
>> copying entirely before any deletion is that unless the filesystem is
>> smart about copies, it may double the required intermediate storage
>> size (v.s. deleting once a particular shard has been copied). Also,
>> some filesystems may support rename (even bulk rename) that's cheaper
>> than copy + delete. For these reasons I think a (optionally
>> retry-tolerant) bulk rename makes sense as an operation on the
>> filesystem API rather than implemented as a composite operation built
>> on lower-level filesystem primitives.
>>
>> On Wed, Jan 31, 2018 at 2:43 PM, Eugene Kirpichov <kirpic...@google.com>
>> wrote:
>> > As far as I know, the current implementation of file sinks is the only
>> > reason why the flag IGNORE_MISSING for copying even exists - there's no
>> > other compelling reason to justify it. We implement "rename" as "copy,
>> then
>> > delete" (in a single DoFn), so for idempodency of this operation we
>> need to
>> > ignore the copying of a non-existent file.
>> >
>> > I think the right way to go would be to change the implementation of
>> > renaming to have a @RequiresStableInput (or reshuffle) in the middle, so
>> > it's made of 2 individually idempotent operations:
>> > 1) copy, which would fail if input is missing, and would overwrite
>> output if
>> > it exists
>> > -- reshuffle --
>> > 2) delete, which would not fail if input is missing.
>> >
>> > That way first everything is copied (possibly via multiple attempts),
>> and
>> > then old files are deleted (possibly via multiple attempts).
>> >
>> > On Wed, Jan 31, 2018 at 2:26 PM Udi Meiri <eh...@google.com> wrote:
>> >>
>> >> I agree that overwriting is more in line with user expectations.
>> >> I believe that the sink should not ignore errors from the filesystem
>> >> layer. Instead, the FileSystem API should be more well defined.
>> >> Examples: rename() and copy() should overwrite existing files at the
>> >> destination, copy() should have an ignore_missing flag.
>> >>
>> >> On Wed, Jan 31, 2018 at 1:49 PM Raghu Angadi <rang...@google.com>
>> wrote:
>> >>>
>> >>> Original mail mentions that output from second run of word_count is
>> >>> ignored. That does not seem as safe as ignoring error from a second
>> attempt
>> >>> of a step. How do we know second run didn't run on different output?
>> >>> Overwriting seems more accurate than ignoring. Does handling this
>> error at
>> >>> sink level distinguish between the two (another run vs second
>> attempt)?
>> >>>
>> >>>
>> >>> On Wed, Jan 31, 2018 at 12:32 PM, Udi Meiri <eh...@google.com> wrote:
>> >>>>
>> >>>> Yeah, another round of refactoring is due to move the rename via
>> >>>> copy+delete logic up to the file-based sink level.
>> >>>>
>> >>>>
>> >>>> On Wed, Jan 31, 2018, 10:42 Chamikara Jayalath <chamik...@google.com
>> >
>> >>>> wrote:
>> >>>>>
>> >>>>> Good point. There's always the chance of step that performs final
>> >>>>> rename being retried. So we'll have to ignore this error at the
>> sink level.
>> >>>>> We don't necessarily have to do this at the FileSystem level
>> though. I think
>> >>>>> the proper behavior might be to raise an error for the rename at the
>> >>>>> FileSystem level if the destination already exists (or source
>> doesn't exist)
>> >>>>> while ignoring that error (and possibly logging a warning) at the
>> sink
>> >>>>> level.
>> >>>>>
>> >>>>> - Cham
>> >>>>>
>> >>>>>
>> >>>>> On Tue, Jan 30, 2018 at 6:47 PM Reuven Lax <re...@google.com>
>> wrote:
>> >>>>>>
>> >>>>>> I think the idea was to ignore "already exists" errors. The reason
>> >>>>>> being that any step in Beam can be executed multiple times,
>> including the
>> >>>>>> rename step. If the rename step gets run twice, the second run
>> should
>> >>>>>> succeed vacuously.
>> >>>>>>
>> >>>>>>
>> >>>>>> On Tue, Jan 30, 2018 at 6:19 PM, Udi Meiri <eh...@google.com>
>> wrote:
>> >>>>>>>
>> >>>>>>> Hi,
>> >>>>>>> I've been working on HDFS code for the Python SDK and I've noticed
>> >>>>>>> some behaviors which are surprising. I wanted to know if these
>> behaviors are
>> >>>>>>> known and intended.
>> >>>>>>>
>> >>>>>>> 1. When renaming files during finalize_write, rename errors are
>> >>>>>>> ignored. For example, if I run wordcount twice using HDFS code I
>> get a
>> >>>>>>> warning the second time because the file already exists:
>> >>>>>>>
>> >>>>>>> WARNING:root:Rename not successful:
>> >>>>>>>
>> hdfs://beam-temp-counts2-7cb0a78005f211e8b6a08851fb5da245/1059f870-d64f-4f63-b1de-e4bd20fcd70a.counts2
>> >>>>>>> -> hdfs://counts2-00000-of-00001, libhdfs error in renaming
>> >>>>>>>
>> hdfs://beam-temp-counts2-7cb0a78005f211e8b6a08851fb5da245/1059f870-d64f-4f63-b1de-e4bd20fcd70a.counts2
>> >>>>>>> to hdfs://counts2-00000-of-00001 with exceptions Unable to rename
>> >>>>>>>
>> '/beam-temp-counts2-7cb0a78005f211e8b6a08851fb5da245/1059f870-d64f-4f63-b1de-e4bd20fcd70a.counts2'
>> >>>>>>> to '/counts2-00000-of-00001'.
>> >>>>>>>
>> >>>>>>> For GCS and local files there are no rename errors (in this case),
>> >>>>>>> since the rename operation silently overwrites existing
>> destination files.
>> >>>>>>> However, blindly ignoring these errors might make the pipeline to
>> report
>> >>>>>>> success even though output files are missing.
>> >>>>>>>
>> >>>>>>> 2. Output files (--ouput) overwrite existing files.
>> >>>>>>>
>> >>>>>>> 3. The Python SDK doesn't use Filesystems.copy(). The Java SDK
>> >>>>>>> doesn't use Filesystem.Rename().
>> >>>>>>>
>> >>>>>>> Thanks,
>> >>>>>>> - Udi
>> >>>>>>
>> >>>>>>
>> >>>
>> >
>>
>

Reply via email to