On Fri, Feb 2, 2018 at 11:17 AM, Chamikara Jayalath <chamik...@google.com> wrote:
> Currently, Python file-based sink is batch only.
> Sure, but that won't be true forever.
>
> Regarding Raghu's question, stage/pipeline failure should not be considered as a data loss, but I prefer overriding existing output and completing a possibly expensive pipeline over failing the whole pipeline due to one or more existing files.
>
> - Cham
>
>
> On Fri, Feb 2, 2018 at 10:21 AM Reuven Lax <re...@google.com> wrote:
>
>> However this code might run in streaming as well, right?
>>
>> On Fri, Feb 2, 2018 at 9:54 AM, Raghu Angadi <rang...@google.com> wrote:
>>
>>> In a batch pipeline, is it considered a data loss if the stage fails (assuming it does not set IGNORE_MISSING_FILES and fails hard)? If not, it might be better to favor correctness and fail in the current implementation.
>>>
>>>
>>> On Thu, Feb 1, 2018 at 4:07 PM, Robert Bradshaw <rober...@google.com> wrote:
>>>
>>>> You could add a step to delete all of dest before a barrier and the step that does the rename as outlined. In that case, any dest file that exists must be good.
>>>>
>>>> On Thu, Feb 1, 2018 at 2:52 PM, Eugene Kirpichov <kirpic...@google.com> wrote:
>>>> > I think this is still unsafe in case exists(dst) (e.g. this is a re-run of a pipeline) but src is missing due to some bad reason. However, it's probably better than what we have (e.g. we currently certainly don't perform checksum checks).
>>>> >
>>>> > On Thu, Feb 1, 2018 at 2:45 PM Udi Meiri <eh...@google.com> wrote:
>>>> >>
>>>> >> For GCS, I would do what I believe we already do.
>>>> >> rename(src, dst):
>>>> >> - if !exists(src) and exists(dst) return 0
>>>> >> - if !exists(src) and !exists(dst) return error
>>>> >> - if exists(src) and exists(dst) { if checksum(src) == checksum(dst) return 0 else delete(dst) }
>>>> >> - Start a GCS copy from src to dst.
>>>> >> - Wait for GCS copy to complete.
>>>> >> - delete(src)
>>>> >>
>>>> >> For filesystems that don't have checksum() metadata, size() can be used instead.
>>>> >>
>>>> >> I've opened a bug to track this: https://issues.apache.org/jira/browse/BEAM-3600
>>>> >>
>>>> >> On Thu, Feb 1, 2018 at 2:25 PM Eugene Kirpichov <kirpic...@google.com> wrote:
>>>> >>>
>>>> >>> Yes, IGNORE_MISSING_FILES is unsafe because it will - well - ignore files that are missing for more ominous reasons than just this being a non-first attempt at renaming src to dst. E.g. if there was a bug in constructing the filename to be renamed, or if we somehow messed up the order of rename vs cleanup, etc. - these situations with IGNORE_MISSING_FILES would lead to silent data loss (likely caught by unit tests though - so this is not a super serious issue).
>>>> >>>
>>>> >>> Basically I just can't think of a case when I was copying files and thinking "oh man, I wish it didn't give an error if the stuff I'm copying doesn't exist" - the option exists only because we couldn't come up with another way to implement idempotent rename on GCS.
>>>> >>>
>>>> >>> What's your idea of how a safe retryable GCS rename() could work?
>>>> >>>
>>>> >>> On Wed, Jan 31, 2018 at 6:45 PM Udi Meiri <eh...@google.com> wrote:
>>>> >>>>
>>>> >>>> Eugene, if I get this right, you're saying that IGNORE_MISSING_FILES is unsafe because it will skip (src, dst) pairs where neither exists? (It only checks whether src exists.)
>>>> >>>>
>>>> >>>> For GCS, we can construct a safe retryable rename() operation, assuming that copy() and delete() are atomic for a single file or pair of files.
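The rename(src, dst) steps Udi lists for GCS above can be sketched in Python as follows. This is a minimal, hypothetical sketch: `InMemoryFileSystem`, `retryable_rename` and the `fingerprint` parameter are illustrative names, not Beam's FileSystem API, and a dict stands in for GCS under the thread's assumption that single-file copy() and delete() are atomic. One deliberate change from the listed steps: when src and dst already match, the sketch also deletes src so that a retry finishes the cleanup, whereas the list simply returns at that point.

```python
import hashlib


class InMemoryFileSystem:
    """Hypothetical stand-in for an object store such as GCS.

    Assumes, as in the thread, that copy() and delete() of a single file
    are atomic.
    """

    def __init__(self):
        self.files = {}  # path -> bytes

    def exists(self, path):
        return path in self.files

    def checksum(self, path):
        # Stands in for checksum metadata kept by the store; any content
        # hash is enough for this sketch.
        return hashlib.sha256(self.files[path]).hexdigest()

    def size(self, path):
        # Weaker fallback for filesystems without checksum metadata.
        return len(self.files[path])

    def copy(self, src, dst):
        self.files[dst] = self.files[src]

    def delete(self, path):
        self.files.pop(path, None)


def retryable_rename(fs, src, dst, fingerprint=None):
    """Rename src to dst so that re-running after a partial failure is safe."""
    fingerprint = fingerprint or fs.checksum
    if not fs.exists(src):
        if fs.exists(dst):
            return  # an earlier attempt already completed the rename
        raise FileNotFoundError(f"neither {src} nor {dst} exists")
    if fs.exists(dst):
        if fingerprint(src) == fingerprint(dst):
            # An earlier attempt copied but did not delete src. The listed
            # steps return here; this sketch also removes src.
            fs.delete(src)
            return
        fs.delete(dst)  # dst differs from src: discard it before copying
    fs.copy(src, dst)  # in GCS: start the copy and wait for it to complete
    fs.delete(src)
```

Passing `fingerprint=fs.size` gives the size() fallback mentioned for filesystems without checksum metadata. It is weaker: a stale destination that happens to have the same length is treated as a match and kept.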
>>>> >>>>
>>>> >>>> On Wed, Jan 31, 2018 at 4:00 PM Raghu Angadi <rang...@google.com> wrote:
>>>> >>>>>
>>>> >>>>> On Wed, Jan 31, 2018 at 2:43 PM, Eugene Kirpichov <kirpic...@google.com> wrote:
>>>> >>>>>>
>>>> >>>>>> As far as I know, the current implementation of file sinks is the only reason why the flag IGNORE_MISSING for copying even exists - there's no other compelling reason to justify it. We implement "rename" as "copy, then delete" (in a single DoFn), so for idempotency of this operation we need to ignore the copying of a non-existent file.
>>>> >>>>>>
>>>> >>>>>> I think the right way to go would be to change the implementation of renaming to have a @RequiresStableInput (or reshuffle) in the middle, so it's made of 2 individually idempotent operations:
>>>> >>>>>> 1) copy, which would fail if input is missing, and would overwrite output if it exists
>>>> >>>>>> -- reshuffle --
>>>> >>>>>> 2) delete, which would not fail if input is missing.
>>>> >>>>>
>>>> >>>>>
>>>> >>>>> Something like this is needed only in streaming, right?
>>>> >>>>>
>>>> >>>>> Raghu.
>>>> >>>>>
>>>> >>>>>>
>>>> >>>>>> That way first everything is copied (possibly via multiple attempts), and then old files are deleted (possibly via multiple attempts).
>>>> >>>>>>
>>>> >>>>>> On Wed, Jan 31, 2018 at 2:26 PM Udi Meiri <eh...@google.com> wrote:
>>>> >>>>>>>
>>>> >>>>>>> I agree that overwriting is more in line with user expectations.
>>>> >>>>>>> I believe that the sink should not ignore errors from the filesystem layer. Instead, the FileSystem API should be better defined.
>>>> >>>>>>> Examples: rename() and copy() should overwrite existing files at the destination, copy() should have an ignore_missing flag.
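Eugene's proposal of two individually idempotent steps separated by a reshuffle can be illustrated on a local filesystem. This is a sketch under stated assumptions: `copy_phase`, `delete_phase` and `rename_all` are hypothetical names, and the barrier that Beam would provide (a reshuffle or @RequiresStableInput) is modeled simply by finishing every copy before any delete starts.

```python
import os
import shutil


def copy_phase(pairs):
    """Phase 1: copy each src to its dst.

    Fails if a source is missing, and overwrites a destination that already
    exists, so retrying the whole phase after a partial failure converges to
    the same result.
    """
    for src, dst in pairs:
        if not os.path.exists(src):
            raise FileNotFoundError(src)
        shutil.copyfile(src, dst)  # replaces dst if it already exists


def delete_phase(pairs):
    """Phase 2: delete each src, treating an already-deleted src as done."""
    for src, _ in pairs:
        try:
            os.remove(src)
        except FileNotFoundError:
            pass  # removed by an earlier attempt of this phase


def rename_all(pairs):
    # Every copy must finish before any delete starts. In a Beam pipeline the
    # proposal gets that ordering from a reshuffle or @RequiresStableInput
    # between the two steps; here it is plain sequential code, so only the
    # per-phase idempotency is illustrated.
    copy_phase(pairs)
    delete_phase(pairs)
```

Retrying `copy_phase` after `delete_phase` has begun would raise on the missing source. That is intended: the barrier keeps copy retries on one side and delete retries on the other, so a source that is missing before the barrier surfaces as an error instead of being skipped as with IGNORE_MISSING_FILES.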
>>>> >>>>>>>
>>>> >>>>>>> On Wed, Jan 31, 2018 at 1:49 PM Raghu Angadi <rang...@google.com> wrote:
>>>> >>>>>>>>
>>>> >>>>>>>> The original mail mentions that output from the second run of word_count is ignored. That does not seem as safe as ignoring the error from a second attempt of a step. How do we know the second run didn't run on different output? Overwriting seems more accurate than ignoring. Does handling this error at the sink level distinguish between the two (another run vs. a second attempt)?
>>>> >>>>>>>>
>>>> >>>>>>>>
>>>> >>>>>>>> On Wed, Jan 31, 2018 at 12:32 PM, Udi Meiri <eh...@google.com> wrote:
>>>> >>>>>>>>>
>>>> >>>>>>>>> Yeah, another round of refactoring is due to move the rename via copy+delete logic up to the file-based sink level.
>>>> >>>>>>>>>
>>>> >>>>>>>>>
>>>> >>>>>>>>> On Wed, Jan 31, 2018, 10:42 Chamikara Jayalath <chamik...@google.com> wrote:
>>>> >>>>>>>>>>
>>>> >>>>>>>>>> Good point. There's always the chance of the step that performs the final rename being retried. So we'll have to ignore this error at the sink level. We don't necessarily have to do this at the FileSystem level though. I think the proper behavior might be to raise an error for the rename at the FileSystem level if the destination already exists (or the source doesn't exist) while ignoring that error (and possibly logging a warning) at the sink level.
>>>> >>>>>>>>>>
>>>> >>>>>>>>>> - Cham
>>>> >>>>>>>>>>
>>>> >>>>>>>>>>
>>>> >>>>>>>>>> On Tue, Jan 30, 2018 at 6:47 PM Reuven Lax <re...@google.com> wrote:
>>>> >>>>>>>>>>>
>>>> >>>>>>>>>>> I think the idea was to ignore "already exists" errors.
>>>> >>>>>>>>>>> The reason being that any step in Beam can be executed multiple times, including the rename step. If the rename step gets run twice, the second run should succeed vacuously.
>>>> >>>>>>>>>>>
>>>> >>>>>>>>>>>
>>>> >>>>>>>>>>> On Tue, Jan 30, 2018 at 6:19 PM, Udi Meiri <eh...@google.com> wrote:
>>>> >>>>>>>>>>>>
>>>> >>>>>>>>>>>> Hi,
>>>> >>>>>>>>>>>> I've been working on HDFS code for the Python SDK and I've noticed some behaviors which are surprising. I wanted to know if these behaviors are known and intended.
>>>> >>>>>>>>>>>>
>>>> >>>>>>>>>>>> 1. When renaming files during finalize_write, rename errors are ignored. For example, if I run wordcount twice using HDFS code I get a warning the second time because the file already exists:
>>>> >>>>>>>>>>>>
>>>> >>>>>>>>>>>> WARNING:root:Rename not successful: hdfs://beam-temp-counts2-7cb0a78005f211e8b6a08851fb5da245/1059f870-d64f-4f63-b1de-e4bd20fcd70a.counts2 -> hdfs://counts2-00000-of-00001, libhdfs error in renaming hdfs://beam-temp-counts2-7cb0a78005f211e8b6a08851fb5da245/1059f870-d64f-4f63-b1de-e4bd20fcd70a.counts2 to hdfs://counts2-00000-of-00001 with exceptions Unable to rename '/beam-temp-counts2-7cb0a78005f211e8b6a08851fb5da245/1059f870-d64f-4f63-b1de-e4bd20fcd70a.counts2' to '/counts2-00000-of-00001'.
>>>> >>>>>>>>>>>>
>>>> >>>>>>>>>>>> For GCS and local files there are no rename errors (in this case), since the rename operation silently overwrites existing destination files. However, blindly ignoring these errors might make the pipeline report success even though output files are missing.
>>>> >>>>>>>>>>>>
>>>> >>>>>>>>>>>> 2. Output files (--output) overwrite existing files.
>>>> >>>>>>>>>>>>
>>>> >>>>>>>>>>>> 3. The Python SDK doesn't use Filesystems.copy(). The Java SDK doesn't use Filesystem.Rename().
>>>> >>>>>>>>>>>>
>>>> >>>>>>>>>>>> Thanks,
>>>> >>>>>>>>>>>> - Udi
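Cham's suggestion, a FileSystem-level rename that raises when the destination exists or the source is missing while the sink-level step only logs a warning, can be sketched as below. As an added assumption addressing Udi's concern that ignored errors can hide missing output, this sketch tolerates the error only when the destination file is present afterwards. `RenameError`, `strict_rename` and `finalize_renames` are hypothetical names, not Beam APIs.

```python
import logging
import os


class RenameError(Exception):
    """Hypothetical error raised by the filesystem-level rename."""


def strict_rename(src, dst):
    """FileSystem level: refuse a missing source and an existing destination.

    The checks and the rename are separate calls, so this is not atomic;
    it only illustrates where the error is raised.
    """
    if not os.path.exists(src):
        raise RenameError(f"source missing: {src}")
    if os.path.exists(dst):
        raise RenameError(f"destination exists: {dst}")
    os.rename(src, dst)


def finalize_renames(pairs):
    """Sink level: ignore rename errors, but only if the output is present."""
    for src, dst in pairs:
        try:
            strict_rename(src, dst)
        except RenameError as e:
            if not os.path.exists(dst):
                # Nothing usable at the destination: fail instead of
                # reporting success with missing output.
                raise
            logging.warning("Rename not successful, keeping %s: %s", dst, e)
```

With this layering, a retried rename step succeeds vacuously, while a second run that finds dst from an earlier run keeps the old file and only logs a warning, which is the case Raghu points out is less safe than overwriting.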