Correct, but if you then have a subsequent DoFn write a new file, you might be surprised if a zombie execution of the first DoFn reexecutes and deletes that file!
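For illustration, a minimal sketch of the kind of idempotent delete step discussed below, using Beam's FileSystems API; the glob and the class name are hypothetical. IGNORE_MISSING_FILES makes a retried or re-executed delete a harmless no-op, though as noted above it cannot protect files written by a *later* step from a zombie re-execution of this one.

import java.io.IOException;
import java.util.List;
import java.util.stream.Collectors;
import org.apache.beam.sdk.io.FileSystems;
import org.apache.beam.sdk.io.fs.MatchResult;
import org.apache.beam.sdk.io.fs.MoveOptions.StandardMoveOptions;
import org.apache.beam.sdk.io.fs.ResourceId;
import org.apache.beam.sdk.transforms.DoFn;

// Deletes every file matching the incoming glob, e.g.
// "s3://my-bucket/output/*.parquet" (hypothetical path).
class DeleteMatchingFilesFn extends DoFn<String, Void> {
  @ProcessElement
  public void processElement(@Element String glob) throws IOException {
    List<ResourceId> toDelete =
        FileSystems.match(glob).metadata().stream()
            .map(MatchResult.Metadata::resourceId)
            .collect(Collectors.toList());
    // IGNORE_MISSING_FILES makes deleting an already-deleted file a no-op,
    // so sequential re-execution of this bundle does not fail.
    FileSystems.delete(toDelete, StandardMoveOptions.IGNORE_MISSING_FILES);
  }
}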
On Wed, Jan 27, 2021 at 5:23 PM Robert Bradshaw <rober...@google.com> wrote:

> Fortunately, making deleting files idempotent is much easier than writing
> them :). But one needs to handle the case of concurrent execution as well
> as sequential re-execution due to possible zombie workers.
>
> On Wed, Jan 27, 2021 at 5:04 PM Reuven Lax <re...@google.com> wrote:
>
>> Keep in mind that DoFns might be reexecuted (even if you think they have
>> completed successfully). This makes DoFns with side effects such as
>> deleting files tricky to write correctly.
>>
>> On Wed, Jan 27, 2021 at 4:36 PM Tao Li <t...@zillow.com> wrote:
>>
>>> Thanks @Chamikara Jayalath <chamik...@google.com>. I think it's a good
>>> idea to define a DoFn for this deletion operation, or maybe a composite
>>> PTransform that does the deletion first, followed by ParquetIO.Write.
>>>
>>> From: Chamikara Jayalath <chamik...@google.com>
>>> Reply-To: "user@beam.apache.org" <user@beam.apache.org>
>>> Date: Wednesday, January 27, 2021 at 3:45 PM
>>> To: user <user@beam.apache.org>
>>> Cc: Alexey Romanenko <aromanenko....@gmail.com>
>>> Subject: Re: Overwrite support from ParquetIO
>>>
>>> On Wed, Jan 27, 2021 at 12:06 PM Tao Li <t...@zillow.com> wrote:
>>>
>>> @Alexey Romanenko <aromanenko....@gmail.com> thanks for your response.
>>> Regarding your questions:
>>>
>>> 1. Yes, I can purge this directory (e.g. using the S3 client from the
>>> AWS SDK) before using ParquetIO to save files. The caveat is that this
>>> deletion operation is not part of the Beam pipeline, so it will kick off
>>> before the pipeline starts. Ideally, the purge operation could be baked
>>> into the write operation with ParquetIO, so that the deletion happens
>>> right before the file writes.
>>>
>>> 2. Regarding the naming strategy: yes, the old files will be overwritten
>>> by the new files if they have the same file names. However, this does
>>> not always guarantee that all the old files in the directory are wiped
>>> out (which is actually my requirement). For example, we may change the
>>> shard count (through the withNumShards() method) across pipeline runs,
>>> and there could be old files from a previous run that won't get
>>> overwritten in the current run.
>>>
>>> Please let me know if this makes sense to you. Thanks!
>>>
>>> In general, Beam file-based sinks are intended for writing new files,
>>> so I don't think the existing file-based sinks (including Parquet) will
>>> work out of the box for replacing existing files or for appending to
>>> such files. But you should be able to delete existing files separately,
>>> for example:
>>>
>>> (1) As a function that is performed before executing the pipeline.
>>>
>>> (2) As a function that is performed from a ParDo step that is executed
>>> before the ParquetIO.Write step. You will also have to make sure that
>>> the runner does not fuse the ParDo step and the Write step. Usually this
>>> can be done by feeding the deletion step's result as a side input to a
>>> ParDo that precedes the sink, or by adding a GBK/Reshuffle between the
>>> two steps.
>>>
>>> Thanks,
>>>
>>> Cham
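A minimal sketch of Cham's option (2), under stated assumptions: Beam's built-in Wait.on transform implements the side-input pattern described above, so the write can neither start before nor be fused with the delete. The output directory, schema, and the DeleteMatchingFilesFn from the earlier sketch are illustrative, not existing Beam utilities.

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.beam.sdk.io.FileIO;
import org.apache.beam.sdk.io.parquet.ParquetIO;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.Wait;
import org.apache.beam.sdk.values.PCollection;

// Delete old output inside the pipeline, then write only after the
// delete signal completes.
static void deleteThenWrite(
    PCollection<GenericRecord> records, Schema schema, String outputDir) {
  PCollection<Void> deleteDone =
      records.getPipeline()
          .apply("DeleteSpec", Create.of(outputDir + "*.parquet"))
          .apply("DeleteOldFiles", ParDo.of(new DeleteMatchingFilesFn()));

  records
      // Wait.on holds back the write until deleteDone is complete; it is
      // built on the side-input pattern, which also prevents fusion.
      .apply("WaitForDelete", Wait.on(deleteDone))
      .apply("WriteParquet",
          FileIO.<GenericRecord>write()
              .via(ParquetIO.sink(schema))
              .to(outputDir)
              .withSuffix(".parquet"));
}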
>>> From: Alexey Romanenko <aromanenko....@gmail.com>
>>> Reply-To: "user@beam.apache.org" <user@beam.apache.org>
>>> Date: Wednesday, January 27, 2021 at 9:10 AM
>>> To: "user@beam.apache.org" <user@beam.apache.org>
>>> Subject: Re: Overwrite support from ParquetIO
>>>
>>> What do you mean by "wipe out all existing parquet files before a write
>>> operation"? Are these all files that already exist in the same output
>>> directory? Can you purge this directory beforehand, or just use a new
>>> output directory for every pipeline run?
>>>
>>> To write Parquet files you need to use ParquetIO.sink() with
>>> FileIO.write(), and I don't think it will clean up the output directory
>>> before writing. Though, if there are name collisions between existing
>>> and new output files (this depends on the naming strategy used), then I
>>> think the old files will be overwritten by the new ones.
>>>
>>> On 25 Jan 2021, at 19:10, Tao Li <t...@zillow.com> wrote:
>>>
>>> Hi Beam community,
>>>
>>> Does ParquetIO support an overwrite behavior when saving files? More
>>> specifically, I would like to wipe out all existing parquet files
>>> before a write operation. Is there a ParquetIO API to support that?
>>> Thanks!
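For reference, the ParquetIO.sink() plus FileIO.write() combination Alexey mentions looks roughly like the sketch below; the path, schema, and shard count are illustrative. With deterministic naming and the same shard count, a re-run overwrites the colliding file names, but, as discussed above, a re-run with a lower shard count leaves stale shards from the previous run behind.

import org.apache.avro.generic.GenericRecord;
import org.apache.beam.sdk.io.FileIO;
import org.apache.beam.sdk.io.parquet.ParquetIO;

// records is a PCollection<GenericRecord>; schema is its Avro schema.
records.apply(
    "WriteParquet",
    FileIO.<GenericRecord>write()
        .via(ParquetIO.sink(schema))
        .to("s3://my-bucket/output/")  // hypothetical output directory
        .withNumShards(10)             // fixed shard count => stable names
        .withSuffix(".parquet"));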