Fortunately, making file deletion idempotent is much easier than making file writes idempotent :). But one needs to handle the case of concurrent execution, as well as sequential re-execution due to possible zombie workers.
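A minimal sketch of such an idempotent purge, using only java.nio.file and no Beam APIs (the class and method names here are illustrative, not anything from the thread): deleteIfExists returns false rather than failing when another worker already removed the file, which covers both the concurrent and the re-execution case.

```java
import java.io.IOException;
import java.nio.file.DirectoryStream;
import java.nio.file.Files;
import java.nio.file.Path;

/**
 * Deletes every file in a directory matching a glob. Safe to run twice
 * (zombie-worker re-execution) and safe to run concurrently: deleteIfExists
 * simply returns false when another worker got to the file first.
 */
public class IdempotentPurge {
  public static void purge(Path dir, String glob) throws IOException {
    if (!Files.isDirectory(dir)) {
      return; // nothing to purge, or an earlier attempt already removed it
    }
    try (DirectoryStream<Path> matches = Files.newDirectoryStream(dir, glob)) {
      for (Path file : matches) {
        Files.deleteIfExists(file); // false (not an error) if already gone
      }
    }
  }
}
```

For S3 output the same idea applies with the AWS SDK's delete calls, which are likewise tolerant of already-deleted keys.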
On Wed, Jan 27, 2021 at 5:04 PM Reuven Lax <re...@google.com> wrote:

> Keep in mind that DoFns might be reexecuted (even if you think they have
> completed successfully). This makes DoFns with side effects such as
> deleting files tricky to write correctly.
>
> On Wed, Jan 27, 2021 at 4:36 PM Tao Li <t...@zillow.com> wrote:
>
>> Thanks @Chamikara Jayalath <chamik...@google.com> I think it's a good
>> idea to define a DoFn for this deletion operation, or maybe a composite
>> PTransform that does deletion first followed by ParquetIO.Write.
>>
>> *From:* Chamikara Jayalath <chamik...@google.com>
>> *Reply-To:* "user@beam.apache.org" <user@beam.apache.org>
>> *Date:* Wednesday, January 27, 2021 at 3:45 PM
>> *To:* user <user@beam.apache.org>
>> *Cc:* Alexey Romanenko <aromanenko....@gmail.com>
>> *Subject:* Re: Overwrite support from ParquetIO
>>
>> On Wed, Jan 27, 2021 at 12:06 PM Tao Li <t...@zillow.com> wrote:
>>
>> @Alexey Romanenko <aromanenko....@gmail.com> thanks for your response.
>> Regarding your questions:
>>
>> 1. Yes, I can purge this directory (e.g. using the S3 client from the
>> AWS SDK) before using ParquetIO to save files. The caveat is that this
>> deletion operation is not part of the Beam pipeline, so it will kick off
>> before the pipeline starts. More ideally, this purge operation could be
>> baked into the write operation with ParquetIO, so the deletion happens
>> right before the file writes.
>> 2. Regarding the naming strategy, yes, the old files will be overwritten
>> by the new files if they have the same file names. However, this does
>> not always guarantee that all the old files in this directory are wiped
>> out (which is actually my requirement). For example, we may change the
>> shard count (through the withNumShards() method) in different pipeline
>> runs, and there could be old files from a previous run that won't get
>> overwritten in the current run.
>>
>> In general, Beam file-based sinks are intended for writing new files, so
>> I don't think existing file-based sinks (including Parquet) will work
>> out of the box for replacing existing files or for appending to such
>> files. But you should be able to delete existing files separately, for
>> example:
>>
>> (1) As a function that is performed before executing the pipeline.
>>
>> (2) As a function that is performed from a ParDo step that is executed
>> before the ParquetIO.Write step. Also, you will have to make sure that
>> the runner does not fuse the ParDo step and the Write step. Usually,
>> this can be done by performing it in a side-input step (to a ParDo that
>> precedes the sink) or by adding a GBK/Reshuffle between the two steps.
>>
>> Thanks,
>> Cham
>>
>> Please let me know if this makes sense to you. Thanks!
>>
>> *From:* Alexey Romanenko <aromanenko....@gmail.com>
>> *Reply-To:* "user@beam.apache.org" <user@beam.apache.org>
>> *Date:* Wednesday, January 27, 2021 at 9:10 AM
>> *To:* "user@beam.apache.org" <user@beam.apache.org>
>> *Subject:* Re: Overwrite support from ParquetIO
>>
>> What do you mean by "wipe out all existing parquet files before a write
>> operation"? Are these all files that already exist in the same output
>> directory? Can you purge this directory before, or just use a new output
>> directory for every pipeline run?
>>
>> To write Parquet files you need to use ParquetIO.sink() with
>> FileIO.write(), and I don't think it will clean up the output directory
>> before the write. Though, if there are name collisions between existing
>> and new output files (it depends on the naming strategy used), then I
>> think the old files will be overwritten by the new ones.
>>
>> On 25 Jan 2021, at 19:10, Tao Li <t...@zillow.com> wrote:
>>
>> Hi Beam community,
>>
>> Does ParquetIO support an overwrite behavior when saving files?
>> More specifically, I would like to wipe out all existing parquet files
>> before a write operation. Is there a ParquetIO API to support that?
>> Thanks!
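Putting the thread's suggestions together, here is a hedged Java sketch of Cham's option (2): a delete ParDo whose output is consumed as a side input by a pass-through step ahead of the ParquetIO write, so the runner cannot fuse deletion with writing and the write only starts once deletion is complete. The `pipeline`, `records`, `schema`, and `outputDir` variables are assumptions for illustration, not code from the thread; `IGNORE_MISSING_FILES` is what makes re-execution of the delete DoFn a no-op, per Reuven's caveat.

```java
// Assumed to exist already:
//   Pipeline pipeline; PCollection<GenericRecord> records;
//   org.apache.avro.Schema schema; String outputDir;

PCollection<String> deleted =
    pipeline
        .apply("OldFilesGlob", Create.of(outputDir + "/*.parquet"))
        .apply("DeleteOldFiles", ParDo.of(new DoFn<String, String>() {
          @ProcessElement
          public void process(@Element String glob, OutputReceiver<String> out)
              throws IOException {
            List<ResourceId> files = new ArrayList<>();
            for (MatchResult.Metadata m : FileSystems.match(glob).metadata()) {
              files.add(m.resourceId());
            }
            // IGNORE_MISSING_FILES tolerates zombie-worker re-execution and
            // concurrent deletes: already-gone files are simply skipped.
            FileSystems.delete(files,
                MoveOptions.StandardMoveOptions.IGNORE_MISSING_FILES);
            out.output(glob);
          }
        }));

// Consuming `deleted` as a side input blocks the write until deletion is
// done and prevents the runner from fusing the two steps.
PCollectionView<List<String>> deletionDone = deleted.apply(View.asList());

records
    .apply("WaitForDeletion", ParDo.of(new DoFn<GenericRecord, GenericRecord>() {
      @ProcessElement
      public void process(ProcessContext c) {
        c.sideInput(deletionDone); // forces the ordering dependency
        c.output(c.element());
      }
    }).withSideInputs(deletionDone))
    .apply(FileIO.<GenericRecord>write()
        .via(ParquetIO.sink(schema))
        .to(outputDir)
        .withSuffix(".parquet"));
```

A GBK/Reshuffle between the delete step and the write, as Cham also mentions, would break fusion equally well; the side-input form is shown here because it additionally expresses the "delete finishes before write starts" dependency.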