For GCS, I would do what I believe we already do.
rename(src, dst):
- if !exists(src) and exists(dst) return 0
- if !exists(src) and !exists(dst) return error
- if exists(src) and exists(dst) {
      if checksum(src) == checksum(dst) return 0
      else delete(dst)
  }
- Start a GCS copy from src to dst.
- Wait for GCS copy to complete.
- delete(src)

For filesystems that don't have checksum() metadata, size() can be used
instead.
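The steps above could be sketched in Python roughly as follows. This is a minimal illustration under stated assumptions, not Beam's GCS or FileSystem code: InMemoryFS and idempotent_rename are hypothetical names, an MD5 of the object bytes stands in for the store's checksum metadata, and copy() and delete() are assumed atomic per object, as assumed elsewhere in this thread. The optional fingerprint argument lets fs.size replace fs.checksum for filesystems without checksums.

```python
import hashlib


class InMemoryFS:
    """Hypothetical stand-in for an object store; not a Beam or GCS API."""

    def __init__(self):
        self.objects = {}  # path -> bytes

    def exists(self, path):
        return path in self.objects

    def checksum(self, path):
        # Stand-in for per-object checksum metadata (assumption: MD5 of the bytes).
        return hashlib.md5(self.objects[path]).hexdigest()

    def size(self, path):
        return len(self.objects[path])

    def copy(self, src, dst):
        # Assumed atomic for a single object.
        self.objects[dst] = self.objects[src]

    def delete(self, path):
        del self.objects[path]


def idempotent_rename(fs, src, dst, fingerprint=None):
    """Rename src to dst so that a retried attempt is safe (sketch)."""
    # Compare by checksum by default; pass fs.size where no checksum exists.
    fingerprint = fingerprint or fs.checksum
    src_exists, dst_exists = fs.exists(src), fs.exists(dst)
    if not src_exists and dst_exists:
        return  # An earlier attempt already copied and deleted.
    if not src_exists and not dst_exists:
        raise FileNotFoundError(src)  # Nothing to rename: report it.
    if dst_exists:
        if fingerprint(src) == fingerprint(dst):
            return  # Treat dst as already copied; as in the steps above, src is left as is.
        fs.delete(dst)  # Different content at dst: replace it.
    fs.copy(src, dst)  # With GCS this would be a copy request we wait on.
    fs.delete(src)
```

Calling idempotent_rename twice on the same pair leaves a single object at dst; the second call returns without touching anything.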

I've opened a bug to track this:
https://issues.apache.org/jira/browse/BEAM-3600

On Thu, Feb 1, 2018 at 2:25 PM Eugene Kirpichov <[email protected]>
wrote:

> Yes, IGNORE_MISSING_FILES is unsafe because it will - well - ignore files
> that are missing for more ominous reasons than just this being a non-first
> attempt at renaming src to dst. E.g. if there was a bug in constructing the
> filename to be renamed, or if we somehow messed up the order of rename vs
> cleanup, etc. - these situations with IGNORE_MISSING_FILES would lead to
> silent data loss (likely caught by unit tests though - so this is not a
> super serious issue).
>
> Basically I just can't think of a case when I was copying files and
> thinking "oh man, I wish it didn't give an error if the stuff I'm copying
> doesn't exist" - the option exists only because we couldn't come up with
> another way to implement idempotent rename on GCS.
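To make the IGNORE_MISSING_FILES concern above concrete, here is a small hypothetical sketch (plain Python, not Beam's FileSystem API) in which an ignore_missing flag stands in for that option. A bug in constructing the temporary filename turns the copy into a silent no-op, so the output never appears; without the flag, the same bug fails loudly.

```python
class InMemoryFS:
    """Hypothetical in-memory store used only to illustrate the failure mode."""

    def __init__(self):
        self.objects = {}  # path -> bytes

    def copy(self, src, dst, ignore_missing=False):
        # ignore_missing plays the role of IGNORE_MISSING_FILES (an assumption).
        if src not in self.objects:
            if ignore_missing:
                return  # Skipped; the caller cannot tell a retry from a bug.
            raise FileNotFoundError(src)
        self.objects[dst] = self.objects[src]


def finalize(fs, pairs, ignore_missing):
    """Copy each temporary file to its final name (sketch of a sink's last step)."""
    for src, dst in pairs:
        fs.copy(src, dst, ignore_missing=ignore_missing)


fs = InMemoryFS()
fs.objects["tmp/shard_0"] = b"counts"
# Bug: the temporary name is built with '-' instead of '_'.
buggy_pairs = [("tmp/shard-0", "out/shard-0")]
finalize(fs, buggy_pairs, ignore_missing=True)
print("out/shard-0" in fs.objects)  # False: the output is silently lost
```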
>
> What's your idea of how a safe retryable GCS rename() could work?
>
> On Wed, Jan 31, 2018 at 6:45 PM Udi Meiri <[email protected]> wrote:
>
>> Eugene, if I get this right, you're saying that IGNORE_MISSING_FILES is
>> unsafe because it will skip (src, dst) pairs where neither exists? (It only
>> checks whether src exists.)
>>
>> For GCS, we can construct a safe retryable rename() operation, assuming
>> that copy() and delete() are atomic for a single file or pair of files.
>>
>>
>>
>> On Wed, Jan 31, 2018 at 4:00 PM Raghu Angadi <[email protected]> wrote:
>>
>>> On Wed, Jan 31, 2018 at 2:43 PM, Eugene Kirpichov <[email protected]>
>>> wrote:
>>>
>>>> As far as I know, the current implementation of file sinks is the only
>>>> reason why the flag IGNORE_MISSING for copying even exists - there's no
>>>> other compelling reason to justify it. We implement "rename" as "copy, then
>>>> delete" (in a single DoFn), so for idempotency of this operation we need to
>>>> ignore attempts to copy a non-existent file.
>>>>
>>>> I think the right way to go would be to change the implementation of
>>>> renaming to have a @RequiresStableInput (or reshuffle) in the middle, so
>>>> it's made of 2 individually idempotent operations:
>>>> 1) copy, which would fail if input is missing, and would overwrite
>>>> output if it exists
>>>> -- reshuffle --
>>>> 2) delete, which would not fail if input is missing.
>>>>
>>>
>>> Something like this is needed only in streaming, right?
>>>
>>> Raghu.
>>>
>>>
>>>> That way first everything is copied (possibly via multiple attempts),
>>>> and then old files are deleted (possibly via multiple attempts).
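The two-step scheme described above could look roughly like this, as a hypothetical in-memory sketch rather than Beam code (InMemoryFS, copy_phase, delete_phase and rename_all are illustrative names). copy() fails on a missing source and overwrites the destination, delete() ignores a missing path, and the comment in rename_all marks where the reshuffle or @RequiresStableInput barrier would sit in a real pipeline.

```python
class InMemoryFS:
    """Hypothetical in-memory store; not a Beam or GCS API."""

    def __init__(self):
        self.objects = {}  # path -> bytes

    def copy(self, src, dst):
        # Fails if src is missing; overwrites dst. Retrying is safe while src exists.
        if src not in self.objects:
            raise FileNotFoundError(src)
        self.objects[dst] = self.objects[src]

    def delete(self, path):
        # A missing path is not an error, so retrying a delete is harmless.
        self.objects.pop(path, None)


def copy_phase(fs, pairs):
    for src, dst in pairs:
        fs.copy(src, dst)


def delete_phase(fs, pairs):
    for src, _ in pairs:
        fs.delete(src)


def rename_all(fs, pairs):
    copy_phase(fs, pairs)  # May be retried any number of times.
    # Barrier (reshuffle / @RequiresStableInput in a pipeline): deletes start
    # only after every copy has succeeded, and retrying a delete never reruns
    # a copy whose source is already gone.
    delete_phase(fs, pairs)  # May also be retried any number of times.
```

Running copy_phase twice and then delete_phase twice ends in the same state as running each once. The only failing order is a copy after the delete, which is exactly what the barrier rules out.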
>>>>
>>>> On Wed, Jan 31, 2018 at 2:26 PM Udi Meiri <[email protected]> wrote:
>>>>
>>>>> I agree that overwriting is more in line with user expectations.
>>>>> I believe that the sink should not ignore errors from the filesystem
>>>>> layer. Instead, the FileSystem API should be better defined.
>>>>> Examples: rename() and copy() should overwrite existing files at the
>>>>> destination, copy() should have an ignore_missing flag.
>>>>>
>>>>> On Wed, Jan 31, 2018 at 1:49 PM Raghu Angadi <[email protected]>
>>>>> wrote:
>>>>>
>>>>>> The original mail mentions that output from the second run of word_count
>>>>>> is ignored. That does not seem as safe as ignoring an error from a second
>>>>>> attempt of a step. How do we know the second run didn't produce different
>>>>>> output? Overwriting seems more accurate than ignoring. Does handling this
>>>>>> error at the sink level distinguish between the two (another run vs. a
>>>>>> second attempt)?
>>>>>>
>>>>>>
>>>>>> On Wed, Jan 31, 2018 at 12:32 PM, Udi Meiri <[email protected]> wrote:
>>>>>>
>>>>>>> Yeah, another round of refactoring is due to move the rename via
>>>>>>> copy+delete logic up to the file-based sink level.
>>>>>>>
>>>>>>> On Wed, Jan 31, 2018, 10:42 Chamikara Jayalath <[email protected]>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> Good point. There's always a chance that the step performing the final
>>>>>>>> rename gets retried, so we'll have to ignore this error at the sink
>>>>>>>> level. We don't necessarily have to do this at the FileSystem level,
>>>>>>>> though. I think the proper behavior might be to raise an error for the
>>>>>>>> rename at the FileSystem level if the destination already exists (or
>>>>>>>> the source doesn't exist), while ignoring that error (and possibly
>>>>>>>> logging a warning) at the sink level.
>>>>>>>>
>>>>>>>> - Cham
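A minimal Python sketch of that split, with hypothetical names throughout (StrictFS, FileAlreadyExistsError and finalize_renames are not Beam APIs): the filesystem layer raises on a missing source or an existing destination, and the sink layer catches exactly those two errors, logs a warning, and reports which pairs it skipped.

```python
import logging


class FileAlreadyExistsError(Exception):
    """Hypothetical error for a rename whose destination already exists."""


class StrictFS:
    """Hypothetical filesystem layer that never silently overwrites or skips."""

    def __init__(self):
        self.objects = {}  # path -> bytes

    def rename(self, src, dst):
        if src not in self.objects:
            raise FileNotFoundError(src)
        if dst in self.objects:
            raise FileAlreadyExistsError(dst)
        self.objects[dst] = self.objects.pop(src)


def finalize_renames(fs, pairs):
    """Sink layer: tolerate the errors a retried rename step can produce."""
    skipped = []
    for src, dst in pairs:
        try:
            fs.rename(src, dst)
        except (FileNotFoundError, FileAlreadyExistsError) as err:
            logging.warning("Rename %s -> %s skipped: %r", src, dst, err)
            skipped.append((src, dst))
    return skipped
```

Catching FileNotFoundError at the sink layer is the trade-off debated elsewhere in this thread: at this level, a retried step and a bug in filename construction look the same.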
>>>>>>>>
>>>>>>>>
>>>>>>>> On Tue, Jan 30, 2018 at 6:47 PM Reuven Lax <[email protected]>
>>>>>>>> wrote:
>>>>>>>>
>>>>>>>>> I think the idea was to ignore "already exists" errors, because any
>>>>>>>>> step in Beam can be executed multiple times, including the rename
>>>>>>>>> step. If the rename step runs twice, the second run should succeed
>>>>>>>>> vacuously.
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> On Tue, Jan 30, 2018 at 6:19 PM, Udi Meiri <[email protected]>
>>>>>>>>> wrote:
>>>>>>>>>
>>>>>>>>>> Hi,
>>>>>>>>>> I've been working on HDFS code for the Python SDK and I've
>>>>>>>>>> noticed some behaviors that are surprising. I wanted to know whether
>>>>>>>>>> these behaviors are known and intended.
>>>>>>>>>>
>>>>>>>>>> 1. When renaming files during finalize_write, rename errors are
>>>>>>>>>> ignored
>>>>>>>>>> <https://github.com/apache/beam/blob/3aa2bef87c93d2844dd7c8dbaf45db75ec607792/sdks/python/apache_beam/io/filebasedsink.py#L232>.
>>>>>>>>>> For example, if I run wordcount twice using the HDFS code, I get a
>>>>>>>>>> warning the second time because the file already exists:
>>>>>>>>>>
>>>>>>>>>> WARNING:root:Rename not successful:
>>>>>>>>>> hdfs://beam-temp-counts2-7cb0a78005f211e8b6a08851fb5da245/1059f870-d64f-4f63-b1de-e4bd20fcd70a.counts2
>>>>>>>>>> -> hdfs://counts2-00000-of-00001, libhdfs error in renaming
>>>>>>>>>> hdfs://beam-temp-counts2-7cb0a78005f211e8b6a08851fb5da245/1059f870-d64f-4f63-b1de-e4bd20fcd70a.counts2
>>>>>>>>>> to hdfs://counts2-00000-of-00001 with exceptions Unable to rename
>>>>>>>>>> '/beam-temp-counts2-7cb0a78005f211e8b6a08851fb5da245/1059f870-d64f-4f63-b1de-e4bd20fcd70a.counts2'
>>>>>>>>>> to '/counts2-00000-of-00001'.
>>>>>>>>>>
>>>>>>>>>> For GCS and local files there are no rename errors (in this
>>>>>>>>>> case), since the rename operation silently overwrites existing
>>>>>>>>>> destination files. However, blindly ignoring these errors might make
>>>>>>>>>> the pipeline report success even though output files are missing.
>>>>>>>>>>
>>>>>>>>>> 2. Output files (--output) overwrite existing files.
>>>>>>>>>>
>>>>>>>>>> 3. The Python SDK doesn't use FileSystems.copy(). The Java SDK
>>>>>>>>>> doesn't use FileSystems.rename().
>>>>>>>>>>
>>>>>>>>>> Thanks,
>>>>>>>>>> - Udi
>>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>
