However this code might run in streaming as well, right?

On Fri, Feb 2, 2018 at 9:54 AM, Raghu Angadi <rang...@google.com> wrote:
> In a batch pipeline, is it considered data loss if the stage fails
> (assuming it does not set IGNORE_MISSING_FILES and fails hard)? If not, it
> might be better to favor correctness and fail in the current implementation.
>
>
> On Thu, Feb 1, 2018 at 4:07 PM, Robert Bradshaw <rober...@google.com>
> wrote:
>
>> You could add a step to delete all of dest before a barrier and the
>> step that does the rename as outlined. In that case, any dest file
>> that exists must be good.
>>
>> On Thu, Feb 1, 2018 at 2:52 PM, Eugene Kirpichov <kirpic...@google.com>
>> wrote:
>> > I think this is still unsafe in case exists(dst) (e.g. this is a re-run
>> > of a pipeline) but src is missing due to some bad reason. However, it's
>> > probably better than what we have (e.g. we currently certainly don't
>> > perform checksum checks).
>> >
>> > On Thu, Feb 1, 2018 at 2:45 PM Udi Meiri <eh...@google.com> wrote:
>> >>
>> >> For GCS, I would do what I believe we already do.
>> >> rename(src, dst):
>> >> - if !exists(src) and exists(dst) return 0
>> >> - if !exists(src) and !exists(dst) return error
>> >> - if exists(src) and exists(dst) { if checksum(src) == checksum(dst)
>> >>   return 0 else delete(dst) }
>> >> - Start a GCS copy from src to dst.
>> >> - Wait for GCS copy to complete.
>> >> - delete(src)
>> >>
>> >> For filesystems that don't have checksum() metadata, size() can be used
>> >> instead.
>> >>
>> >> I've opened a bug to track this:
>> >> https://issues.apache.org/jira/browse/BEAM-3600
>> >>
>> >> On Thu, Feb 1, 2018 at 2:25 PM Eugene Kirpichov <kirpic...@google.com>
>> >> wrote:
>> >>>
>> >>> Yes, IGNORE_MISSING_FILES is unsafe because it will - well - ignore files
>> >>> that are missing for more ominous reasons than just this being a
>> >>> non-first attempt at renaming src to dst. E.g. if there was a bug in
>> >>> constructing the filename to be renamed, or if we somehow messed up the
>> >>> order of rename vs cleanup, etc. - these situations with
>> >>> IGNORE_MISSING_FILES would lead to silent data loss (likely caught by
>> >>> unit tests though - so this is not a super serious issue).
>> >>>
>> >>> Basically I just can't think of a case when I was copying files and
>> >>> thinking "oh man, I wish it didn't give an error if the stuff I'm
>> >>> copying doesn't exist" - the option exists only because we couldn't
>> >>> come up with another way to implement idempotent rename on GCS.
>> >>>
>> >>> What's your idea of how a safe retryable GCS rename() could work?
>> >>>
>> >>> On Wed, Jan 31, 2018 at 6:45 PM Udi Meiri <eh...@google.com> wrote:
>> >>>>
>> >>>> Eugene, if I get this right, you're saying that IGNORE_MISSING_FILES
>> >>>> is unsafe because it will skip (src, dst) pairs where neither exists?
>> >>>> (It only checks whether src exists.)
>> >>>>
>> >>>> For GCS, we can construct a safe retryable rename() operation,
>> >>>> assuming that copy() and delete() are atomic for a single file or
>> >>>> pair of files.
>> >>>>
>> >>>>
>> >>>>
>> >>>> On Wed, Jan 31, 2018 at 4:00 PM Raghu Angadi <rang...@google.com>
>> >>>> wrote:
>> >>>>>
>> >>>>> On Wed, Jan 31, 2018 at 2:43 PM, Eugene Kirpichov
>> >>>>> <kirpic...@google.com> wrote:
>> >>>>>>
>> >>>>>> As far as I know, the current implementation of file sinks is the
>> >>>>>> only reason why the flag IGNORE_MISSING for copying even exists -
>> >>>>>> there's no other compelling reason to justify it. We implement
>> >>>>>> "rename" as "copy, then delete" (in a single DoFn), so for
>> >>>>>> idempotency of this operation we need to ignore the copying of a
>> >>>>>> non-existent file.
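The retryable GCS rename that Udi outlines earlier in the thread (check src and dst, compare checksums, copy, then delete src) can be made concrete. The sketch below is a minimal Python illustration, not Beam code: `InMemoryFS` and `retryable_rename` are hypothetical names, the "object store" is a dict, and per-object copy() and delete() are assumed atomic, as the thread assumes. It deviates from the outline in one place: when src and dst already match, it also deletes src so the temporary file is not left behind. On filesystems without checksum metadata, comparing sizes would be the weaker substitute Udi mentions.

```python
import hashlib


class InMemoryFS:
    """Hypothetical stand-in for an object store such as GCS.

    Each copy() and delete() acts on one whole object and is atomic,
    matching the assumption stated in the thread.
    """

    def __init__(self):
        self.objects = {}  # path -> bytes

    def exists(self, path):
        return path in self.objects

    def checksum(self, path):
        # A real store would return stored metadata; hashing the bytes
        # stands in for that here.
        return hashlib.sha256(self.objects[path]).hexdigest()

    def copy(self, src, dst):
        self.objects[dst] = self.objects[src]

    def delete(self, path):
        del self.objects[path]


def retryable_rename(fs, src, dst):
    """Rename src to dst so that re-running after a partial failure is safe."""
    if not fs.exists(src):
        if fs.exists(dst):
            # An earlier attempt already copied and then deleted src.
            return
        raise FileNotFoundError(f"neither {src} nor {dst} exists")
    if fs.exists(dst):
        if fs.checksum(src) == fs.checksum(dst):
            # An earlier attempt copied but died before deleting src.
            fs.delete(src)
            return
        # Destination is stale or partial: replace it.
        fs.delete(dst)
    fs.copy(src, dst)  # stands in for "start the copy and wait for it"
    fs.delete(src)
```

Eugene's objection still applies to this shape: if src is missing for a bad reason while a dst from an earlier run exists, the first branch returns silently.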
>> >>>>>>
>> >>>>>> I think the right way to go would be to change the implementation
>> >>>>>> of renaming to have a @RequiresStableInput (or reshuffle) in the
>> >>>>>> middle, so it's made of 2 individually idempotent operations:
>> >>>>>> 1) copy, which would fail if input is missing, and would overwrite
>> >>>>>>    output if it exists
>> >>>>>> -- reshuffle --
>> >>>>>> 2) delete, which would not fail if input is missing.
>> >>>>>
>> >>>>>
>> >>>>> Something like this is needed only in streaming, right?
>> >>>>>
>> >>>>> Raghu.
>> >>>>>
>> >>>>>>
>> >>>>>> That way first everything is copied (possibly via multiple
>> >>>>>> attempts), and then old files are deleted (possibly via multiple
>> >>>>>> attempts).
>> >>>>>>
>> >>>>>> On Wed, Jan 31, 2018 at 2:26 PM Udi Meiri <eh...@google.com>
>> >>>>>> wrote:
>> >>>>>>>
>> >>>>>>> I agree that overwriting is more in line with user expectations.
>> >>>>>>> I believe that the sink should not ignore errors from the
>> >>>>>>> filesystem layer. Instead, the FileSystem API should be better
>> >>>>>>> defined. Examples: rename() and copy() should overwrite existing
>> >>>>>>> files at the destination, copy() should have an ignore_missing flag.
>> >>>>>>>
>> >>>>>>> On Wed, Jan 31, 2018 at 1:49 PM Raghu Angadi <rang...@google.com>
>> >>>>>>> wrote:
>> >>>>>>>>
>> >>>>>>>> The original mail mentions that output from the second run of
>> >>>>>>>> word_count is ignored. That does not seem as safe as ignoring an
>> >>>>>>>> error from a second attempt of a step. How do we know the second
>> >>>>>>>> run didn't run on different output? Overwriting seems more accurate
>> >>>>>>>> than ignoring. Does handling this error at the sink level
>> >>>>>>>> distinguish between the two (another run vs. a second attempt)?
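Eugene's proposal splits rename into two individually idempotent phases separated by a barrier. A minimal sketch on the local filesystem follows; `copy_phase`, `delete_phase` and `two_phase_rename` are hypothetical names, and the barrier is modeled simply by finishing every copy before any delete starts, not by an actual Beam reshuffle or @RequiresStableInput.

```python
import os
import shutil


def copy_phase(pairs):
    """Phase 1: copy every src to its dst.

    Fails if a source is missing (never silently skips) and overwrites any
    existing destination, so re-running it yields the same result.
    """
    for src, dst in pairs:
        if not os.path.exists(src):
            raise FileNotFoundError(src)
        shutil.copyfile(src, dst)  # replaces dst if it already exists


def delete_phase(pairs):
    """Phase 2: delete every src.

    A missing source means an earlier attempt of this phase already removed
    it, so that case is not treated as an error.
    """
    for src, _ in pairs:
        try:
            os.remove(src)
        except FileNotFoundError:
            pass


def two_phase_rename(pairs):
    copy_phase(pairs)
    # Barrier: in a pipeline the reshuffle or @RequiresStableInput would sit
    # here, so no delete starts until all copies are done and each phase can
    # be retried on its own.
    delete_phase(pairs)
```

Because the copy phase overwrites, it also gives the overwrite-on-rerun behavior that Raghu and Udi argue for.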
>> >>>>>>>>
>> >>>>>>>>
>> >>>>>>>> On Wed, Jan 31, 2018 at 12:32 PM, Udi Meiri <eh...@google.com>
>> >>>>>>>> wrote:
>> >>>>>>>>>
>> >>>>>>>>> Yeah, another round of refactoring is due to move the rename via
>> >>>>>>>>> copy+delete logic up to the file-based sink level.
>> >>>>>>>>>
>> >>>>>>>>>
>> >>>>>>>>> On Wed, Jan 31, 2018, 10:42 Chamikara Jayalath
>> >>>>>>>>> <chamik...@google.com> wrote:
>> >>>>>>>>>>
>> >>>>>>>>>> Good point. There's always a chance that the step performing the
>> >>>>>>>>>> final rename is retried. So we'll have to ignore this error at
>> >>>>>>>>>> the sink level. We don't necessarily have to do this at the
>> >>>>>>>>>> FileSystem level though. I think the proper behavior might be to
>> >>>>>>>>>> raise an error for the rename at the FileSystem level if the
>> >>>>>>>>>> destination already exists (or the source doesn't exist) while
>> >>>>>>>>>> ignoring that error (and possibly logging a warning) at the sink
>> >>>>>>>>>> level.
>> >>>>>>>>>>
>> >>>>>>>>>> - Cham
>> >>>>>>>>>>
>> >>>>>>>>>>
>> >>>>>>>>>> On Tue, Jan 30, 2018 at 6:47 PM Reuven Lax <re...@google.com>
>> >>>>>>>>>> wrote:
>> >>>>>>>>>>>
>> >>>>>>>>>>> I think the idea was to ignore "already exists" errors. The
>> >>>>>>>>>>> reason is that any step in Beam can be executed multiple times,
>> >>>>>>>>>>> including the rename step. If the rename step gets run twice,
>> >>>>>>>>>>> the second run should succeed vacuously.
>> >>>>>>>>>>>
>> >>>>>>>>>>>
>> >>>>>>>>>>> On Tue, Jan 30, 2018 at 6:19 PM, Udi Meiri <eh...@google.com>
>> >>>>>>>>>>> wrote:
>> >>>>>>>>>>>>
>> >>>>>>>>>>>> Hi,
>> >>>>>>>>>>>> I've been working on HDFS code for the Python SDK and I've
>> >>>>>>>>>>>> noticed some behaviors which are surprising. I wanted to know
>> >>>>>>>>>>>> if these behaviors are known and intended.
>> >>>>>>>>>>>>
>> >>>>>>>>>>>> 1. When renaming files during finalize_write, rename errors are
>> >>>>>>>>>>>> ignored.
>> >>>>>>>>>>>> For example, if I run wordcount twice using HDFS code I get a
>> >>>>>>>>>>>> warning the second time because the file already exists:
>> >>>>>>>>>>>>
>> >>>>>>>>>>>> WARNING:root:Rename not successful:
>> >>>>>>>>>>>> hdfs://beam-temp-counts2-7cb0a78005f211e8b6a08851fb5da245/1059f870-d64f-4f63-b1de-e4bd20fcd70a.counts2
>> >>>>>>>>>>>> -> hdfs://counts2-00000-of-00001, libhdfs error in renaming
>> >>>>>>>>>>>> hdfs://beam-temp-counts2-7cb0a78005f211e8b6a08851fb5da245/1059f870-d64f-4f63-b1de-e4bd20fcd70a.counts2
>> >>>>>>>>>>>> to hdfs://counts2-00000-of-00001 with exceptions Unable to rename
>> >>>>>>>>>>>> '/beam-temp-counts2-7cb0a78005f211e8b6a08851fb5da245/1059f870-d64f-4f63-b1de-e4bd20fcd70a.counts2'
>> >>>>>>>>>>>> to '/counts2-00000-of-00001'.
>> >>>>>>>>>>>>
>> >>>>>>>>>>>> For GCS and local files there are no rename errors (in this
>> >>>>>>>>>>>> case), since the rename operation silently overwrites existing
>> >>>>>>>>>>>> destination files. However, blindly ignoring these errors might
>> >>>>>>>>>>>> make the pipeline report success even though output files are
>> >>>>>>>>>>>> missing.
>> >>>>>>>>>>>>
>> >>>>>>>>>>>> 2. Output files (--output) overwrite existing files.
>> >>>>>>>>>>>>
>> >>>>>>>>>>>> 3. The Python SDK doesn't use Filesystems.copy(). The Java SDK
>> >>>>>>>>>>>> doesn't use Filesystem.Rename().
>> >>>>>>>>>>>>
>> >>>>>>>>>>>> Thanks,
>> >>>>>>>>>>>> - Udi
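Cham's suggested split of responsibilities (a strict rename at the FileSystem level, with the sink downgrading the error to a warning) might look roughly like the sketch below. `strict_rename` and `finalize_rename` are hypothetical names, not Beam APIs. One extra check is an assumption added here, prompted by Udi's concern that ignored errors can hide missing output: the sink re-raises when the destination does not exist either, since neither a retry nor a re-run explains that case.

```python
import logging
import os


def strict_rename(src, dst):
    """Hypothetical FileSystem-level rename: refuses a missing source and
    refuses to overwrite, instead of silently succeeding."""
    if not os.path.exists(src):
        raise FileNotFoundError(src)
    if os.path.exists(dst):
        raise FileExistsError(dst)
    # Check-then-act is not atomic; acceptable for a sketch only.
    os.rename(src, dst)


def finalize_rename(src, dst):
    """Hypothetical sink-level wrapper around strict_rename.

    Returns True if the rename happened, False if it was skipped with a
    warning because a retried step or an earlier run already left a dst.
    """
    try:
        strict_rename(src, dst)
        return True
    except (FileNotFoundError, FileExistsError) as e:
        if not os.path.exists(dst):
            # Output is genuinely missing: do not report success.
            raise
        logging.warning("Rename not successful: %s -> %s (%r)", src, dst, e)
        return False
```

When both files exist, this keeps the old destination, which is exactly the behavior Raghu questions; overwriting instead would mean replacing the existence check and `os.rename` in `strict_rename` with `os.replace`.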