You could add a step that deletes all of dest, followed by a barrier and
then the step that does the rename as outlined. In that case, any dest
file that exists must have been written by the rename step, so it must be
good.
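
A minimal local-filesystem sketch of that ordering, assuming Python's os
module as a stand-in for the real FileSystem layer. The function names
(delete_destinations, rename_one, finalize) are hypothetical, and the
barrier is only modeled by sequential execution here. In a pipeline it
would be a reshuffle or a similar stable-input boundary.

```python
import os


def delete_destinations(pairs):
    """Phase 1: remove any pre-existing destination files.

    Idempotent: a missing destination is not an error, so retries are safe.
    """
    for _, dst in pairs:
        try:
            os.remove(dst)
        except FileNotFoundError:
            pass


def rename_one(src, dst):
    """Phase 2: a rename that tolerates being retried.

    Every destination was deleted before the barrier, so a dst that exists
    here can only come from an earlier attempt of this same step. A missing
    src is therefore acceptable only when dst is present.
    """
    if os.path.exists(src):
        os.replace(src, dst)  # atomic within a single local filesystem
    elif not os.path.exists(dst):
        raise FileNotFoundError(f"neither {src} nor {dst} exists")


def finalize(pairs):
    delete_destinations(pairs)
    # Barrier: no rename may start until every delete has finished.
    # In this single-process sketch, sequential execution plays that role.
    for src, dst in pairs:
        rename_one(src, dst)
```

Each phase is safe to retry on its own. Re-running finalize() as a whole
after the renames have happened would not be, since phase 1 would delete
good output; that is why the barrier has to separate the two steps.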

On Thu, Feb 1, 2018 at 2:52 PM, Eugene Kirpichov <[email protected]> wrote:
> I think this is still unsafe in the case where exists(dst) (e.g. this is a
> re-run of a pipeline) but src is missing for some bad reason. However, it's
> probably better than what we have (e.g. we certainly don't perform checksum
> checks currently).
>
> On Thu, Feb 1, 2018 at 2:45 PM Udi Meiri <[email protected]> wrote:
>>
>> For GCS, I would do what I believe we already do.
>> rename(src, dst):
>> - if !exists(src) and exists(dst) return 0
>> - if !exists(src) and !exists(dst) return error
>> - if exists(src) and exists(dst) { if checksum(src) == checksum(dst)
>> return 0 else delete(dst) }
>> - Start a GCS copy from src to dst.
>> - Wait for GCS copy to complete.
>> - delete(src)
>>
>> For filesystems that don't have checksum() metadata, size() can be used
>> instead.
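
A rough Python sketch of the rename steps above against a local filesystem,
assuming shutil.copyfile stands in for starting a GCS copy and waiting for
it, and an MD5 of the file contents stands in for checksum metadata. The
names retryable_rename, checksum and fingerprint are illustrative only. One
deliberate deviation: when src and dst match, this sketch also deletes src,
so a retry after a crash between the copy and delete(src) does not leave
src behind.

```python
import hashlib
import os
import shutil


def checksum(path):
    """MD5 of a file's contents; a stand-in for object checksum metadata."""
    h = hashlib.md5()
    with open(path, "rb") as f:
        for chunk in iter(lambda: f.read(1 << 16), b""):
            h.update(chunk)
    return h.hexdigest()


def retryable_rename(src, dst, fingerprint=checksum):
    """Rename src to dst via copy + delete, following the steps above.

    Pass fingerprint=os.path.getsize for filesystems without checksums.
    """
    src_exists = os.path.exists(src)
    dst_exists = os.path.exists(dst)
    if not src_exists:
        if dst_exists:
            return  # assume an earlier attempt already finished
        raise FileNotFoundError(f"neither {src} nor {dst} exists")
    if dst_exists:
        if fingerprint(src) == fingerprint(dst):
            os.remove(src)  # deviation: also clean up src (see lead-in)
            return
        os.remove(dst)
    shutil.copyfile(src, dst)  # stands in for "start copy, wait for it"
    os.remove(src)
```

The first branch (src missing, dst present, treat as done) is the case
Eugene points out as still unsafe: it cannot tell a completed earlier
attempt from a src that went missing for a bad reason.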
>>
>> I've opened a bug to track this:
>> https://issues.apache.org/jira/browse/BEAM-3600
>>
>> On Thu, Feb 1, 2018 at 2:25 PM Eugene Kirpichov <[email protected]>
>> wrote:
>>>
>>> Yes, IGNORE_MISSING_FILES is unsafe because it will - well - ignore files
>>> that are missing for more ominous reasons than just this being a non-first
>>> attempt at renaming src to dst. E.g. if there was a bug in constructing the
>>> filename to be renamed, or if we somehow messed up the order of rename vs
>>> cleanup, etc. - these situations with IGNORE_MISSING_FILES would lead to
>>> silent data loss (likely caught by unit tests though - so this is not a
>>> super serious issue).
>>>
>>> Basically I just can't think of a case when I was copying files and
>>> thinking "oh man, I wish it didn't give an error if the stuff I'm copying
>>> doesn't exist" - the option exists only because we couldn't come up with
>>> another way to implement idempotent rename on GCS.
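
To make the silent-data-loss risk concrete, here is a small sketch with a
hypothetical copy_files() helper whose ignore_missing flag mimics the
option being discussed (it is not Beam's actual API). A bug in constructing
a source path makes the "ignore" variant return cleanly with nothing
copied, while the strict variant fails loudly.

```python
import os
import shutil


def copy_files(pairs, ignore_missing=False):
    """Copy each (src, dst) pair, overwriting dst; return the dsts written.

    ignore_missing is a hypothetical flag modeled on the option discussed
    in this thread, not a real Beam parameter.
    """
    copied = []
    for src, dst in pairs:
        if not os.path.exists(src):
            if ignore_missing:
                continue  # silently skipped: no error and no output
            raise FileNotFoundError(src)
        shutil.copyfile(src, dst)
        copied.append(dst)
    return copied
```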
>>>
>>> What's your idea of how a safe retryable GCS rename() could work?
>>>
>>> On Wed, Jan 31, 2018 at 6:45 PM Udi Meiri <[email protected]> wrote:
>>>>
>>>> Eugene, if I get this right, you're saying that IGNORE_MISSING_FILES is
>>>> unsafe because it will skip (src, dst) pairs where neither exists? (It only
>>>> checks whether src exists.)
>>>>
>>>> For GCS, we can construct a safe retryable rename() operation, assuming
>>>> that copy() and delete() are atomic for a single file or pair of files.
>>>>
>>>>
>>>>
>>>> On Wed, Jan 31, 2018 at 4:00 PM Raghu Angadi <[email protected]> wrote:
>>>>>
>>>>> On Wed, Jan 31, 2018 at 2:43 PM, Eugene Kirpichov
>>>>> <[email protected]> wrote:
>>>>>>
>>>>>> As far as I know, the current implementation of file sinks is the only
>>>>>> reason why the flag IGNORE_MISSING_FILES for copying even exists; there's
>>>>>> no other compelling reason to justify it. We implement "rename" as "copy,
>>>>>> then delete" (in a single DoFn), so for idempotency of this operation we
>>>>>> need to ignore attempts to copy a non-existent file.
>>>>>>
>>>>>> I think the right way to go would be to change the implementation of
>>>>>> renaming to have a @RequiresStableInput (or reshuffle) in the middle, so
>>>>>> it's made of 2 individually idempotent operations:
>>>>>> 1) copy, which would fail if input is missing, and would overwrite
>>>>>> output if it exists
>>>>>> -- reshuffle --
>>>>>> 2) delete, which would not fail if input is missing.
>>>>>
>>>>>
>>>>> Something like this is needed only in streaming, right?
>>>>>
>>>>> Raghu.
>>>>>
>>>>>>
>>>>>> That way first everything is copied (possibly via multiple attempts),
>>>>>> and then old files are deleted (possibly via multiple attempts).
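
A local sketch of that two-step structure, assuming plain Python file
operations in place of the Beam FileSystem calls; copy_phase, delete_phase
and two_phase_rename are made-up names. The comment in two_phase_rename
marks where the reshuffle or @RequiresStableInput boundary would go in a
real pipeline.

```python
import os
import shutil


def copy_phase(pairs):
    """Step 1: fail if a source is missing; overwrite existing destinations.

    Re-running it after a partial failure is safe because nothing has been
    deleted yet, so every source is still present.
    """
    for src, dst in pairs:
        if not os.path.exists(src):
            raise FileNotFoundError(src)
        shutil.copyfile(src, dst)  # overwrites dst if present


def delete_phase(pairs):
    """Step 2: delete sources; a missing source was removed by an earlier attempt."""
    for src, _ in pairs:
        try:
            os.remove(src)
        except FileNotFoundError:
            pass


def two_phase_rename(pairs):
    copy_phase(pairs)
    # Reshuffle / @RequiresStableInput boundary goes here, so that step 2
    # runs only after every copy has been committed.
    delete_phase(pairs)
```

Both phases tolerate being re-run on their own. What must not happen is
re-running the copy phase after deletes have started, which is what the
boundary prevents.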
>>>>>>
>>>>>> On Wed, Jan 31, 2018 at 2:26 PM Udi Meiri <[email protected]> wrote:
>>>>>>>
>>>>>>> I agree that overwriting is more in line with user expectations.
>>>>>>> I believe that the sink should not ignore errors from the filesystem
>>>>>>> layer. Instead, the FileSystem API should be better defined. For
>>>>>>> example, rename() and copy() should overwrite existing files at the
>>>>>>> destination, and copy() should have an ignore_missing flag.
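
For local files, Python's standard library already provides overwrite
semantics for both operations, which may be a useful reference point when
pinning down the contract. A minimal sketch, assuming the stdlib calls as
stand-ins for FileSystem rename() and copy() (the wrapper names are
illustrative):

```python
import os
import shutil


def rename(src, dst):
    """Move src to dst, replacing dst if it already exists.

    os.replace overwrites an existing file on both POSIX and Windows,
    whereas os.rename raises FileExistsError on Windows in that case.
    """
    os.replace(src, dst)


def copy(src, dst):
    """Copy src's contents to dst, replacing dst if it already exists."""
    shutil.copyfile(src, dst)
```

The choice of os.replace over os.rename matters here: on Windows,
os.rename fails when dst exists, so it would not satisfy an overwrite
contract portably.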
>>>>>>>
>>>>>>> On Wed, Jan 31, 2018 at 1:49 PM Raghu Angadi <[email protected]>
>>>>>>> wrote:
>>>>>>>>
>>>>>>>> The original mail mentions that output from the second run of wordcount
>>>>>>>> is ignored. That does not seem as safe as ignoring an error from a second
>>>>>>>> attempt of a step. How do we know the second run didn't produce different
>>>>>>>> output? Overwriting seems more accurate than ignoring. Does handling this
>>>>>>>> error at the sink level distinguish between the two (another run vs. a
>>>>>>>> second attempt)?
>>>>>>>>
>>>>>>>>
>>>>>>>> On Wed, Jan 31, 2018 at 12:32 PM, Udi Meiri <[email protected]>
>>>>>>>> wrote:
>>>>>>>>>
>>>>>>>>> Yeah, another round of refactoring is due to move the rename via
>>>>>>>>> copy+delete logic up to the file-based sink level.
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> On Wed, Jan 31, 2018, 10:42 Chamikara Jayalath
>>>>>>>>> <[email protected]> wrote:
>>>>>>>>>>
>>>>>>>>>> Good point. There's always the chance of the step that performs the
>>>>>>>>>> final rename being retried, so we'll have to ignore this error at the
>>>>>>>>>> sink level. We don't necessarily have to do this at the FileSystem
>>>>>>>>>> level, though. I think the proper behavior might be to raise an error
>>>>>>>>>> for the rename at the FileSystem level if the destination already
>>>>>>>>>> exists (or the source doesn't exist), while ignoring that error (and
>>>>>>>>>> possibly logging a warning) at the sink level.
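
A sketch of that split, assuming local files and hypothetical names
(strict_rename for the FileSystem level, sink_finalize for the sink level).
The existence checks and the rename are not atomic together, so this only
illustrates where each error is raised and where it is swallowed.

```python
import logging
import os


def strict_rename(src, dst):
    """FileSystem-level rename that refuses to guess.

    Raises FileNotFoundError if src is missing and FileExistsError if dst
    already exists. Check-then-act is not atomic; illustration only.
    """
    if not os.path.exists(src):
        raise FileNotFoundError(src)
    if os.path.exists(dst):
        raise FileExistsError(dst)
    os.rename(src, dst)


def sink_finalize(pairs):
    """Sink-level loop that tolerates the errors a retried step can cause."""
    for src, dst in pairs:
        try:
            strict_rename(src, dst)
        except (FileNotFoundError, FileExistsError) as e:
            logging.warning("Rename %s -> %s not performed: %r", src, dst, e)
```

This is close to the behavior reported in the original mail: a retried
finalize and a second pipeline run both end up as the same logged warning,
which is the distinction Raghu asks about.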
>>>>>>>>>>
>>>>>>>>>> - Cham
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> On Tue, Jan 30, 2018 at 6:47 PM Reuven Lax <[email protected]>
>>>>>>>>>> wrote:
>>>>>>>>>>>
>>>>>>>>>>> I think the idea was to ignore "already exists" errors. The reason is
>>>>>>>>>>> that any step in Beam can be executed multiple times, including the
>>>>>>>>>>> rename step. If the rename step gets run twice, the second run should
>>>>>>>>>>> succeed vacuously.
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> On Tue, Jan 30, 2018 at 6:19 PM, Udi Meiri <[email protected]>
>>>>>>>>>>> wrote:
>>>>>>>>>>>>
>>>>>>>>>>>> Hi,
>>>>>>>>>>>> I've been working on HDFS code for the Python SDK and I've noticed
>>>>>>>>>>>> some surprising behaviors. I wanted to know if these behaviors are
>>>>>>>>>>>> known and intended.
>>>>>>>>>>>>
>>>>>>>>>>>> 1. When renaming files during finalize_write, rename errors are
>>>>>>>>>>>> ignored. For example, if I run wordcount twice using the HDFS code, I
>>>>>>>>>>>> get a warning the second time because the file already exists:
>>>>>>>>>>>>
>>>>>>>>>>>> WARNING:root:Rename not successful:
>>>>>>>>>>>> hdfs://beam-temp-counts2-7cb0a78005f211e8b6a08851fb5da245/1059f870-d64f-4f63-b1de-e4bd20fcd70a.counts2
>>>>>>>>>>>> -> hdfs://counts2-00000-of-00001, libhdfs error in renaming
>>>>>>>>>>>> hdfs://beam-temp-counts2-7cb0a78005f211e8b6a08851fb5da245/1059f870-d64f-4f63-b1de-e4bd20fcd70a.counts2
>>>>>>>>>>>> to hdfs://counts2-00000-of-00001 with exceptions Unable to rename
>>>>>>>>>>>> '/beam-temp-counts2-7cb0a78005f211e8b6a08851fb5da245/1059f870-d64f-4f63-b1de-e4bd20fcd70a.counts2'
>>>>>>>>>>>> to '/counts2-00000-of-00001'.
>>>>>>>>>>>>
>>>>>>>>>>>> For GCS and local files there are no rename errors (in this case),
>>>>>>>>>>>> since the rename operation silently overwrites existing destination
>>>>>>>>>>>> files. However, blindly ignoring these errors might make the pipeline
>>>>>>>>>>>> report success even though output files are missing.
>>>>>>>>>>>>
>>>>>>>>>>>> 2. Output files (--output) overwrite existing files.
>>>>>>>>>>>>
>>>>>>>>>>>> 3. The Python SDK doesn't use FileSystems.copy(). The Java SDK
>>>>>>>>>>>> doesn't use FileSystems.rename().
>>>>>>>>>>>>
>>>>>>>>>>>> Thanks,
>>>>>>>>>>>> - Udi
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>
>
