However this code might run in streaming as well, right?

On Fri, Feb 2, 2018 at 9:54 AM, Raghu Angadi <rang...@google.com> wrote:

> In a batch pipeline, is it considered a data loss if the the stage fails
> (assuming it does not set IGNORE_MISSING_FILES and fails hard)? If not, it
> might be better to favor correctness and fail in current implementation.
>
>
> On Thu, Feb 1, 2018 at 4:07 PM, Robert Bradshaw <rober...@google.com>
> wrote:
>
>> You could add a step to delete all of dest before a barrier and the
>> step that does the rename as outlined. In that case, any dest file
>> that exists must be good.
>>
>> On Thu, Feb 1, 2018 at 2:52 PM, Eugene Kirpichov <kirpic...@google.com>
>> wrote:
>> > I think this is still unsafe in case exists(dst) (e.g. this is a re-run
>> of a
>> > pipeline) but src is missing due to some bad reason. However it's
>> probably
>> > better than what we have (e.g. we currently certainly don't perform
>> checksum
>> > checks).
>> >
>> > On Thu, Feb 1, 2018 at 2:45 PM Udi Meiri <eh...@google.com> wrote:
>> >>
>> >> For GCS, I would do what I believe we already do.
>> >> rename(src, dst):
>> >> - if !exists(src) and exists(dst) return 0
>> >> - if !exists(src) and !exists(dst) return error
>> >> - if exists(src) and exists(dst) { if checksum(src) == checksum(dst)
>> >> return 0 else delete(dst) }
>> >> - Start a GCS copy from src to dst.
>> >> - Wait for GCS copy to complete.
>> >> - delete(src)
>> >>
>> >> For filesystems that don't have checksum() metadata, size() can be used
>> >> instead.
>> >>
>> >> I've opened a bug to track this:
>> >> https://issues.apache.org/jira/browse/BEAM-3600
>> >>
>> >> On Thu, Feb 1, 2018 at 2:25 PM Eugene Kirpichov <kirpic...@google.com>
>> >> wrote:
>> >>>
>> >>> Yes, IGNORE_MISSING_FILES is unsafe because it will - well - ignore
>> files
>> >>> that are missing for more ominous reasons than just this being a
>> non-first
>> >>> attempt at renaming src to dst. E.g. if there was a bug in
>> constructing the
>> >>> filename to be renamed, or if we somehow messed up the order of
>> rename vs
>> >>> cleanup, etc. - these situations with IGNORE_MISSING_FILES would lead
>> to
>> >>> silent data loss (likely caught by unit tests though - so this is not
>> a
>> >>> super serious issue).
>> >>>
>> >>> Basically I just can't think of a case when I was copying files and
>> >>> thinking "oh man, I wish it didn't give an error if the stuff I'm
>> copying
>> >>> doesn't exist" - the option exists only because we couldn't come up
>> with
>> >>> another way to implement idempotent rename on GCS.
>> >>>
>> >>> What's your idea of how a safe retryable GCS rename() could work?
>> >>>
>> >>> On Wed, Jan 31, 2018 at 6:45 PM Udi Meiri <eh...@google.com> wrote:
>> >>>>
>> >>>> Eugene, if I get this right, you're saying that IGNORE_MISSING_FILES
>> is
>> >>>> unsafe because it will skip (src, dst) pairs where neither exist?
>> (it only
>> >>>> looks if src exists)
>> >>>>
>> >>>> For GCS, we can construct a safe retryable rename() operation,
>> assuming
>> >>>> that copy() and delete() are atomic for a single file or pair of
>> files.
>> >>>>
>> >>>>
>> >>>>
>> >>>> On Wed, Jan 31, 2018 at 4:00 PM Raghu Angadi <rang...@google.com>
>> wrote:
>> >>>>>
>> >>>>> On Wed, Jan 31, 2018 at 2:43 PM, Eugene Kirpichov
>> >>>>> <kirpic...@google.com> wrote:
>> >>>>>>
>> >>>>>> As far as I know, the current implementation of file sinks is the
>> only
>> >>>>>> reason why the flag IGNORE_MISSING for copying even exists -
>> there's no
>> >>>>>> other compelling reason to justify it. We implement "rename" as
>> "copy, then
>> >>>>>> delete" (in a single DoFn), so for idempodency of this operation
>> we need to
>> >>>>>> ignore the copying of a non-existent file.
>> >>>>>>
>> >>>>>> I think the right way to go would be to change the implementation
>> of
>> >>>>>> renaming to have a @RequiresStableInput (or reshuffle) in the
>> middle, so
>> >>>>>> it's made of 2 individually idempotent operations:
>> >>>>>> 1) copy, which would fail if input is missing, and would overwrite
>> >>>>>> output if it exists
>> >>>>>> -- reshuffle --
>> >>>>>> 2) delete, which would not fail if input is missing.
>> >>>>>
>> >>>>>
>> >>>>> Something like this is needed only in streaming, right?
>> >>>>>
>> >>>>> Raghu.
>> >>>>>
>> >>>>>>
>> >>>>>> That way first everything is copied (possibly via multiple
>> attempts),
>> >>>>>> and then old files are deleted (possibly via multiple attempts).
>> >>>>>>
>> >>>>>> On Wed, Jan 31, 2018 at 2:26 PM Udi Meiri <eh...@google.com>
>> wrote:
>> >>>>>>>
>> >>>>>>> I agree that overwriting is more in line with user expectations.
>> >>>>>>> I believe that the sink should not ignore errors from the
>> filesystem
>> >>>>>>> layer. Instead, the FileSystem API should be more well defined.
>> >>>>>>> Examples: rename() and copy() should overwrite existing files at
>> the
>> >>>>>>> destination, copy() should have an ignore_missing flag.
>> >>>>>>>
>> >>>>>>> On Wed, Jan 31, 2018 at 1:49 PM Raghu Angadi <rang...@google.com>
>> >>>>>>> wrote:
>> >>>>>>>>
>> >>>>>>>> Original mail mentions that output from second run of word_count
>> is
>> >>>>>>>> ignored. That does not seem as safe as ignoring error from a
>> second attempt
>> >>>>>>>> of a step. How do we know second run didn't run on different
>> output?
>> >>>>>>>> Overwriting seems more accurate than ignoring. Does handling
>> this error at
>> >>>>>>>> sink level distinguish between the two (another run vs second
>> attempt)?
>> >>>>>>>>
>> >>>>>>>>
>> >>>>>>>> On Wed, Jan 31, 2018 at 12:32 PM, Udi Meiri <eh...@google.com>
>> >>>>>>>> wrote:
>> >>>>>>>>>
>> >>>>>>>>> Yeah, another round of refactoring is due to move the rename via
>> >>>>>>>>> copy+delete logic up to the file-based sink level.
>> >>>>>>>>>
>> >>>>>>>>>
>> >>>>>>>>> On Wed, Jan 31, 2018, 10:42 Chamikara Jayalath
>> >>>>>>>>> <chamik...@google.com> wrote:
>> >>>>>>>>>>
>> >>>>>>>>>> Good point. There's always the chance of step that performs
>> final
>> >>>>>>>>>> rename being retried. So we'll have to ignore this error at
>> the sink level.
>> >>>>>>>>>> We don't necessarily have to do this at the FileSystem level
>> though. I think
>> >>>>>>>>>> the proper behavior might be to raise an error for the rename
>> at the
>> >>>>>>>>>> FileSystem level if the destination already exists (or source
>> doesn't exist)
>> >>>>>>>>>> while ignoring that error (and possibly logging a warning) at
>> the sink
>> >>>>>>>>>> level.
>> >>>>>>>>>>
>> >>>>>>>>>> - Cham
>> >>>>>>>>>>
>> >>>>>>>>>>
>> >>>>>>>>>> On Tue, Jan 30, 2018 at 6:47 PM Reuven Lax <re...@google.com>
>> >>>>>>>>>> wrote:
>> >>>>>>>>>>>
>> >>>>>>>>>>> I think the idea was to ignore "already exists" errors. The
>> >>>>>>>>>>> reason being that any step in Beam can be executed multiple
>> times, including
>> >>>>>>>>>>> the rename step. If the rename step gets run twice, the
>> second run should
>> >>>>>>>>>>> succeed vacuously.
>> >>>>>>>>>>>
>> >>>>>>>>>>>
>> >>>>>>>>>>> On Tue, Jan 30, 2018 at 6:19 PM, Udi Meiri <eh...@google.com>
>> >>>>>>>>>>> wrote:
>> >>>>>>>>>>>>
>> >>>>>>>>>>>> Hi,
>> >>>>>>>>>>>> I've been working on HDFS code for the Python SDK and I've
>> >>>>>>>>>>>> noticed some behaviors which are surprising. I wanted to
>> know if these
>> >>>>>>>>>>>> behaviors are known and intended.
>> >>>>>>>>>>>>
>> >>>>>>>>>>>> 1. When renaming files during finalize_write, rename errors
>> are
>> >>>>>>>>>>>> ignored. For example, if I run wordcount twice using HDFS
>> code I get a
>> >>>>>>>>>>>> warning the second time because the file already exists:
>> >>>>>>>>>>>>
>> >>>>>>>>>>>> WARNING:root:Rename not successful:
>> >>>>>>>>>>>> hdfs://beam-temp-counts2-7cb0a78005f211e8b6a08851fb5da245/
>> 1059f870-d64f-4f63-b1de-e4bd20fcd70a.counts2
>> >>>>>>>>>>>> -> hdfs://counts2-00000-of-00001, libhdfs error in renaming
>> >>>>>>>>>>>> hdfs://beam-temp-counts2-7cb0a78005f211e8b6a08851fb5da245/
>> 1059f870-d64f-4f63-b1de-e4bd20fcd70a.counts2
>> >>>>>>>>>>>> to hdfs://counts2-00000-of-00001 with exceptions Unable to
>> rename
>> >>>>>>>>>>>> '/beam-temp-counts2-7cb0a78005f211e8b6a08851fb5da245/
>> 1059f870-d64f-4f63-b1de-e4bd20fcd70a.counts2'
>> >>>>>>>>>>>> to '/counts2-00000-of-00001'.
>> >>>>>>>>>>>>
>> >>>>>>>>>>>> For GCS and local files there are no rename errors (in this
>> >>>>>>>>>>>> case), since the rename operation silently overwrites
>> existing destination
>> >>>>>>>>>>>> files. However, blindly ignoring these errors might make the
>> pipeline to
>> >>>>>>>>>>>> report success even though output files are missing.
>> >>>>>>>>>>>>
>> >>>>>>>>>>>> 2. Output files (--ouput) overwrite existing files.
>> >>>>>>>>>>>>
>> >>>>>>>>>>>> 3. The Python SDK doesn't use Filesystems.copy(). The Java
>> SDK
>> >>>>>>>>>>>> doesn't use Filesystem.Rename().
>> >>>>>>>>>>>>
>> >>>>>>>>>>>> Thanks,
>> >>>>>>>>>>>> - Udi
>> >>>>>>>>>>>
>> >>>>>>>>>>>
>> >>>>>>>>
>> >
>>
>
>

Reply via email to