Fortunately, making file deletion idempotent is much easier than making
file writes idempotent :). But one still needs to handle concurrent
execution, as well as sequential re-execution due to possible zombie
workers.
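
For example, a rough sketch of an idempotent delete DoFn (my own
illustrative code, not an existing Beam transform). Beam's FileSystems
API with IGNORE_MISSING_FILES turns a second deletion attempt, whether
a sequential retry or a racing duplicate worker, into a no-op:

    import java.util.ArrayList;
    import java.util.List;
    import org.apache.beam.sdk.io.FileSystems;
    import org.apache.beam.sdk.io.fs.MatchResult;
    import org.apache.beam.sdk.io.fs.MoveOptions.StandardMoveOptions;
    import org.apache.beam.sdk.io.fs.ResourceId;
    import org.apache.beam.sdk.transforms.DoFn;

    // Deletes every file matching the incoming glob. Idempotent: files
    // already removed by an earlier attempt (or by a concurrent duplicate
    // execution) are silently skipped rather than failing the bundle.
    class DeleteMatchingFilesFn extends DoFn<String, Void> {
      @ProcessElement
      public void processElement(@Element String glob) throws Exception {
        MatchResult match = FileSystems.match(glob);
        List<ResourceId> toDelete = new ArrayList<>();
        for (MatchResult.Metadata m : match.metadata()) {
          toDelete.add(m.resourceId());
        }
        FileSystems.delete(toDelete,
            StandardMoveOptions.IGNORE_MISSING_FILES);
      }
    }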

On Wed, Jan 27, 2021 at 5:04 PM Reuven Lax <re...@google.com> wrote:

> Keep in mind that DoFns might be re-executed (even if you think they have
> completed successfully). This makes DoFns with side effects, such as
> deleting files, tricky to write correctly.
>
> On Wed, Jan 27, 2021 at 4:36 PM Tao Li <t...@zillow.com> wrote:
>
>> Thanks @Chamikara Jayalath <chamik...@google.com>. I think it’s a good
>> idea to define a DoFn for this deletion operation, or maybe a composite
>> PTransform that does the deletion first, followed by ParquetIO.Write.
>>
>>
>>
>> *From: *Chamikara Jayalath <chamik...@google.com>
>> *Reply-To: *"user@beam.apache.org" <user@beam.apache.org>
>> *Date: *Wednesday, January 27, 2021 at 3:45 PM
>> *To: *user <user@beam.apache.org>
>> *Cc: *Alexey Romanenko <aromanenko....@gmail.com>
>> *Subject: *Re: Overwrite support from ParquetIO
>>
>>
>>
>>
>>
>>
>>
>> On Wed, Jan 27, 2021 at 12:06 PM Tao Li <t...@zillow.com> wrote:
>>
>> @Alexey Romanenko <aromanenko....@gmail.com> thanks for your response.
>> Regarding your questions:
>>
>>
>>
>>    1. Yes, I can purge this directory (e.g. using the S3 client from the
>>    AWS SDK) before using ParquetIO to save files; a rough sketch follows
>>    this list. The caveat is that this deletion operation is not part of
>>    the Beam pipeline, so it will kick off before the pipeline starts.
>>    Ideally, this purge operation could be baked into the write operation
>>    with ParquetIO, so the deletion happens right before the file writes.
>>    2. Regarding the naming strategy, yes, the old files will be
>>    overwritten by the new files if they have the same file names. However,
>>    this does not always guarantee that all the old files in this directory
>>    are wiped out (which is actually my requirement). For example, we may
>>    change the shard count (through the withNumShards() method) between
>>    pipeline runs, and there could be old files from a previous run that
>>    won’t get overwritten in the current run.
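>>
>> A rough sketch of that pre-pipeline purge, assuming the AWS SDK v2 for
>> Java (bucket name and prefix are placeholders):
>>
>>    import java.util.ArrayList;
>>    import java.util.List;
>>    import software.amazon.awssdk.services.s3.S3Client;
>>    import software.amazon.awssdk.services.s3.model.Delete;
>>    import software.amazon.awssdk.services.s3.model.DeleteObjectsRequest;
>>    import software.amazon.awssdk.services.s3.model.ListObjectsV2Request;
>>    import software.amazon.awssdk.services.s3.model.ListObjectsV2Response;
>>    import software.amazon.awssdk.services.s3.model.ObjectIdentifier;
>>    import software.amazon.awssdk.services.s3.model.S3Object;
>>
>>    // Batch-delete everything under the output prefix, page by page.
>>    S3Client s3 = S3Client.create();
>>    ListObjectsV2Request list = ListObjectsV2Request.builder()
>>        .bucket("my-bucket").prefix("output/parquet/").build();
>>    for (ListObjectsV2Response page : s3.listObjectsV2Paginator(list)) {
>>      if (page.contents().isEmpty()) { continue; }
>>      List<ObjectIdentifier> ids = new ArrayList<>();
>>      for (S3Object o : page.contents()) {
>>        ids.add(ObjectIdentifier.builder().key(o.key()).build());
>>      }
>>      s3.deleteObjects(DeleteObjectsRequest.builder()
>>          .bucket("my-bucket")
>>          .delete(Delete.builder().objects(ids).build())
>>          .build());
>>    }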
>>
>>
>>
>> In general, Beam file-based sinks are intended for writing new files, so
>> I don't think existing file-based sinks (including Parquet) will work out
>> of the box for replacing existing files or for appending to such files.
>>
>> But you should be able to delete existing files separately, for example:
>>
>> (1) As a function that is performed before executing the pipeline.
>>
>> (2) As a function that is performed from a ParDo step that is executed
>> before the ParquetIO.Write step. You will also have to make sure that the
>> runner does not fuse the ParDo step and the Write step. Usually, this can
>> be done by consuming the delete step's output as a side input (in a ParDo
>> that precedes the sink) or by adding a GBK/Reshuffle between the two
>> steps.
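>>
>> A rough sketch of (2); here DeleteFilesFn stands in for a DoFn that
>> deletes the old files, records is the PCollection<GenericRecord> to be
>> written, and imports/pipeline setup are omitted:
>>
>>    // Run the delete in its own step and expose its (empty) output as
>>    // a side input. Consuming the side input forces the delete stage
>>    // to complete before the write stage starts, and it also prevents
>>    // the runner from fusing the two steps.
>>    final PCollectionView<List<Void>> deleteSignal =
>>        p.apply(Create.of("s3://my-bucket/output/parquet/*"))
>>         .apply("DeleteOldFiles", ParDo.of(new DeleteFilesFn()))
>>         .apply(View.asList());
>>
>>    records
>>        .apply("WaitForDeletes", ParDo.of(
>>            new DoFn<GenericRecord, GenericRecord>() {
>>              @ProcessElement
>>              public void processElement(ProcessContext c) {
>>                c.sideInput(deleteSignal);  // waits on the deletes
>>                c.output(c.element());
>>              }
>>            }).withSideInputs(deleteSignal))
>>        .apply(FileIO.<GenericRecord>write()
>>            .via(ParquetIO.sink(schema))
>>            .to("s3://my-bucket/output/parquet/"));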
>>
>>
>>
>> Thanks,
>>
>> Cham
>>
>>
>>
>>
>>
>>
>> Please let me know if this makes sense to you. Thanks!
>>
>>
>>
>>
>>
>> *From: *Alexey Romanenko <aromanenko....@gmail.com>
>> *Reply-To: *"user@beam.apache.org" <user@beam.apache.org>
>> *Date: *Wednesday, January 27, 2021 at 9:10 AM
>> *To: *"user@beam.apache.org" <user@beam.apache.org>
>> *Subject: *Re: Overwrite support from ParquetIO
>>
>>
>>
>> What do you mean by “wipe out all existing parquet files before a write
>> operation”? Are these all files that already exist in the same output
>> directory? Can you purge this directory beforehand, or just use a new
>> output directory for every pipeline run?
>>
>>
>>
>> To write Parquet files you need to use ParquetIO.sink()
>> with FileIO.write(), and I don’t think it will clean up the output
>> directory before writing. Though, if there are name collisions between
>> existing and new output files (it depends on the naming strategy used),
>> then I think the old files will be overwritten by new ones.
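>>
>> For reference, that combination looks roughly like this (records is a
>> PCollection<GenericRecord>, schema is its Avro schema, and the path,
>> shard count, and suffix are placeholders):
>>
>>    records.apply(
>>        FileIO.<GenericRecord>write()
>>            .via(ParquetIO.sink(schema))
>>            .to("s3://my-bucket/output/parquet/")
>>            .withNumShards(10)
>>            .withSuffix(".parquet"));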
>>
>>
>>
>>
>>
>>
>>
>> On 25 Jan 2021, at 19:10, Tao Li <t...@zillow.com> wrote:
>>
>>
>>
>> Hi Beam community,
>>
>>
>>
>> Does ParquetIO support overwrite behavior when saving files? More
>> specifically, I would like to wipe out all existing Parquet files before a
>> write operation. Is there a ParquetIO API to support that? Thanks!
>>
>>
>>
>>
