On Fri, Feb 2, 2018 at 11:17 AM, Chamikara Jayalath <chamik...@google.com>
wrote:

> Currently, Python file-based sink is batch only.
>

Sure, but that won't be true forever.


>
> Regarding Raghu's question, stage/pipeline failure should not be
> considered as a data loss but I prefer overriding existing output and
> completing a possibly expensive pipeline over failing the whole pipeline
> due to one or more existing files.
>
> - Cham
>
>
> On Fri, Feb 2, 2018 at 10:21 AM Reuven Lax <re...@google.com> wrote:
>
>> However this code might run in streaming as well, right?
>>
>> On Fri, Feb 2, 2018 at 9:54 AM, Raghu Angadi <rang...@google.com> wrote:
>>
>>> In a batch pipeline, is it considered a data loss if the the stage fails
>>> (assuming it does not set IGNORE_MISSING_FILES and fails hard)? If not, it
>>> might be better to favor correctness and fail in current implementation.
>>>
>>>
>>> On Thu, Feb 1, 2018 at 4:07 PM, Robert Bradshaw <rober...@google.com>
>>> wrote:
>>>
>>>> You could add a step to delete all of dest before a barrier and the
>>>> step that does the rename as outlined. In that case, any dest file
>>>> that exists must be good.
>>>>
>>>> On Thu, Feb 1, 2018 at 2:52 PM, Eugene Kirpichov <kirpic...@google.com>
>>>> wrote:
>>>> > I think this is still unsafe in case exists(dst) (e.g. this is a
>>>> re-run of a
>>>> > pipeline) but src is missing due to some bad reason. However it's
>>>> probably
>>>> > better than what we have (e.g. we currently certainly don't perform
>>>> checksum
>>>> > checks).
>>>> >
>>>> > On Thu, Feb 1, 2018 at 2:45 PM Udi Meiri <eh...@google.com> wrote:
>>>> >>
>>>> >> For GCS, I would do what I believe we already do.
>>>> >> rename(src, dst):
>>>> >> - if !exists(src) and exists(dst) return 0
>>>> >> - if !exists(src) and !exists(dst) return error
>>>> >> - if exists(src) and exists(dst) { if checksum(src) == checksum(dst)
>>>> >> return 0 else delete(dst) }
>>>> >> - Start a GCS copy from src to dst.
>>>> >> - Wait for GCS copy to complete.
>>>> >> - delete(src)
>>>> >>
>>>> >> For filesystems that don't have checksum() metadata, size() can be
>>>> used
>>>> >> instead.
>>>> >>
>>>> >> I've opened a bug to track this:
>>>> >> https://issues.apache.org/jira/browse/BEAM-3600
>>>> >>
>>>> >> On Thu, Feb 1, 2018 at 2:25 PM Eugene Kirpichov <
>>>> kirpic...@google.com>
>>>> >> wrote:
>>>> >>>
>>>> >>> Yes, IGNORE_MISSING_FILES is unsafe because it will - well - ignore
>>>> files
>>>> >>> that are missing for more ominous reasons than just this being a
>>>> non-first
>>>> >>> attempt at renaming src to dst. E.g. if there was a bug in
>>>> constructing the
>>>> >>> filename to be renamed, or if we somehow messed up the order of
>>>> rename vs
>>>> >>> cleanup, etc. - these situations with IGNORE_MISSING_FILES would
>>>> lead to
>>>> >>> silent data loss (likely caught by unit tests though - so this is
>>>> not a
>>>> >>> super serious issue).
>>>> >>>
>>>> >>> Basically I just can't think of a case when I was copying files and
>>>> >>> thinking "oh man, I wish it didn't give an error if the stuff I'm
>>>> copying
>>>> >>> doesn't exist" - the option exists only because we couldn't come up
>>>> with
>>>> >>> another way to implement idempotent rename on GCS.
>>>> >>>
>>>> >>> What's your idea of how a safe retryable GCS rename() could work?
>>>> >>>
>>>> >>> On Wed, Jan 31, 2018 at 6:45 PM Udi Meiri <eh...@google.com> wrote:
>>>> >>>>
>>>> >>>> Eugene, if I get this right, you're saying that
>>>> IGNORE_MISSING_FILES is
>>>> >>>> unsafe because it will skip (src, dst) pairs where neither exist?
>>>> (it only
>>>> >>>> looks if src exists)
>>>> >>>>
>>>> >>>> For GCS, we can construct a safe retryable rename() operation,
>>>> assuming
>>>> >>>> that copy() and delete() are atomic for a single file or pair of
>>>> files.
>>>> >>>>
>>>> >>>>
>>>> >>>>
>>>> >>>> On Wed, Jan 31, 2018 at 4:00 PM Raghu Angadi <rang...@google.com>
>>>> wrote:
>>>> >>>>>
>>>> >>>>> On Wed, Jan 31, 2018 at 2:43 PM, Eugene Kirpichov
>>>> >>>>> <kirpic...@google.com> wrote:
>>>> >>>>>>
>>>> >>>>>> As far as I know, the current implementation of file sinks is
>>>> the only
>>>> >>>>>> reason why the flag IGNORE_MISSING for copying even exists -
>>>> there's no
>>>> >>>>>> other compelling reason to justify it. We implement "rename" as
>>>> "copy, then
>>>> >>>>>> delete" (in a single DoFn), so for idempodency of this operation
>>>> we need to
>>>> >>>>>> ignore the copying of a non-existent file.
>>>> >>>>>>
>>>> >>>>>> I think the right way to go would be to change the
>>>> implementation of
>>>> >>>>>> renaming to have a @RequiresStableInput (or reshuffle) in the
>>>> middle, so
>>>> >>>>>> it's made of 2 individually idempotent operations:
>>>> >>>>>> 1) copy, which would fail if input is missing, and would
>>>> overwrite
>>>> >>>>>> output if it exists
>>>> >>>>>> -- reshuffle --
>>>> >>>>>> 2) delete, which would not fail if input is missing.
>>>> >>>>>
>>>> >>>>>
>>>> >>>>> Something like this is needed only in streaming, right?
>>>> >>>>>
>>>> >>>>> Raghu.
>>>> >>>>>
>>>> >>>>>>
>>>> >>>>>> That way first everything is copied (possibly via multiple
>>>> attempts),
>>>> >>>>>> and then old files are deleted (possibly via multiple attempts).
>>>> >>>>>>
>>>> >>>>>> On Wed, Jan 31, 2018 at 2:26 PM Udi Meiri <eh...@google.com>
>>>> wrote:
>>>> >>>>>>>
>>>> >>>>>>> I agree that overwriting is more in line with user expectations.
>>>> >>>>>>> I believe that the sink should not ignore errors from the
>>>> filesystem
>>>> >>>>>>> layer. Instead, the FileSystem API should be more well defined.
>>>> >>>>>>> Examples: rename() and copy() should overwrite existing files
>>>> at the
>>>> >>>>>>> destination, copy() should have an ignore_missing flag.
>>>> >>>>>>>
>>>> >>>>>>> On Wed, Jan 31, 2018 at 1:49 PM Raghu Angadi <
>>>> rang...@google.com>
>>>> >>>>>>> wrote:
>>>> >>>>>>>>
>>>> >>>>>>>> Original mail mentions that output from second run of
>>>> word_count is
>>>> >>>>>>>> ignored. That does not seem as safe as ignoring error from a
>>>> second attempt
>>>> >>>>>>>> of a step. How do we know second run didn't run on different
>>>> output?
>>>> >>>>>>>> Overwriting seems more accurate than ignoring. Does handling
>>>> this error at
>>>> >>>>>>>> sink level distinguish between the two (another run vs second
>>>> attempt)?
>>>> >>>>>>>>
>>>> >>>>>>>>
>>>> >>>>>>>> On Wed, Jan 31, 2018 at 12:32 PM, Udi Meiri <eh...@google.com>
>>>> >>>>>>>> wrote:
>>>> >>>>>>>>>
>>>> >>>>>>>>> Yeah, another round of refactoring is due to move the rename
>>>> via
>>>> >>>>>>>>> copy+delete logic up to the file-based sink level.
>>>> >>>>>>>>>
>>>> >>>>>>>>>
>>>> >>>>>>>>> On Wed, Jan 31, 2018, 10:42 Chamikara Jayalath
>>>> >>>>>>>>> <chamik...@google.com> wrote:
>>>> >>>>>>>>>>
>>>> >>>>>>>>>> Good point. There's always the chance of step that performs
>>>> final
>>>> >>>>>>>>>> rename being retried. So we'll have to ignore this error at
>>>> the sink level.
>>>> >>>>>>>>>> We don't necessarily have to do this at the FileSystem level
>>>> though. I think
>>>> >>>>>>>>>> the proper behavior might be to raise an error for the
>>>> rename at the
>>>> >>>>>>>>>> FileSystem level if the destination already exists (or
>>>> source doesn't exist)
>>>> >>>>>>>>>> while ignoring that error (and possibly logging a warning)
>>>> at the sink
>>>> >>>>>>>>>> level.
>>>> >>>>>>>>>>
>>>> >>>>>>>>>> - Cham
>>>> >>>>>>>>>>
>>>> >>>>>>>>>>
>>>> >>>>>>>>>> On Tue, Jan 30, 2018 at 6:47 PM Reuven Lax <re...@google.com
>>>> >
>>>> >>>>>>>>>> wrote:
>>>> >>>>>>>>>>>
>>>> >>>>>>>>>>> I think the idea was to ignore "already exists" errors. The
>>>> >>>>>>>>>>> reason being that any step in Beam can be executed multiple
>>>> times, including
>>>> >>>>>>>>>>> the rename step. If the rename step gets run twice, the
>>>> second run should
>>>> >>>>>>>>>>> succeed vacuously.
>>>> >>>>>>>>>>>
>>>> >>>>>>>>>>>
>>>> >>>>>>>>>>> On Tue, Jan 30, 2018 at 6:19 PM, Udi Meiri <
>>>> eh...@google.com>
>>>> >>>>>>>>>>> wrote:
>>>> >>>>>>>>>>>>
>>>> >>>>>>>>>>>> Hi,
>>>> >>>>>>>>>>>> I've been working on HDFS code for the Python SDK and I've
>>>> >>>>>>>>>>>> noticed some behaviors which are surprising. I wanted to
>>>> know if these
>>>> >>>>>>>>>>>> behaviors are known and intended.
>>>> >>>>>>>>>>>>
>>>> >>>>>>>>>>>> 1. When renaming files during finalize_write, rename
>>>> errors are
>>>> >>>>>>>>>>>> ignored. For example, if I run wordcount twice using HDFS
>>>> code I get a
>>>> >>>>>>>>>>>> warning the second time because the file already exists:
>>>> >>>>>>>>>>>>
>>>> >>>>>>>>>>>> WARNING:root:Rename not successful:
>>>> >>>>>>>>>>>> hdfs://beam-temp-counts2-7cb0a78005f211e8b6a08851fb5da2
>>>> 45/1059f870-d64f-4f63-b1de-e4bd20fcd70a.counts2
>>>> >>>>>>>>>>>> -> hdfs://counts2-00000-of-00001, libhdfs error in renaming
>>>> >>>>>>>>>>>> hdfs://beam-temp-counts2-7cb0a78005f211e8b6a08851fb5da2
>>>> 45/1059f870-d64f-4f63-b1de-e4bd20fcd70a.counts2
>>>> >>>>>>>>>>>> to hdfs://counts2-00000-of-00001 with exceptions Unable to
>>>> rename
>>>> >>>>>>>>>>>> '/beam-temp-counts2-7cb0a78005f211e8b6a08851fb5da2
>>>> 45/1059f870-d64f-4f63-b1de-e4bd20fcd70a.counts2'
>>>> >>>>>>>>>>>> to '/counts2-00000-of-00001'.
>>>> >>>>>>>>>>>>
>>>> >>>>>>>>>>>> For GCS and local files there are no rename errors (in this
>>>> >>>>>>>>>>>> case), since the rename operation silently overwrites
>>>> existing destination
>>>> >>>>>>>>>>>> files. However, blindly ignoring these errors might make
>>>> the pipeline to
>>>> >>>>>>>>>>>> report success even though output files are missing.
>>>> >>>>>>>>>>>>
>>>> >>>>>>>>>>>> 2. Output files (--ouput) overwrite existing files.
>>>> >>>>>>>>>>>>
>>>> >>>>>>>>>>>> 3. The Python SDK doesn't use Filesystems.copy(). The Java
>>>> SDK
>>>> >>>>>>>>>>>> doesn't use Filesystem.Rename().
>>>> >>>>>>>>>>>>
>>>> >>>>>>>>>>>> Thanks,
>>>> >>>>>>>>>>>> - Udi
>>>> >>>>>>>>>>>
>>>> >>>>>>>>>>>
>>>> >>>>>>>>
>>>> >
>>>>
>>>
>>>
>>

Reply via email to