Correct, but if you then have a subsequent DoFn write a new file, you might
be surprised if a zombie execution of the first DoFn reexecutes and deletes
that file!
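
One way to guard against that, as a rough sketch: have the delete step skip anything that belongs to the current run. The example below uses plain java.nio rather than Beam's own filesystem API so that it runs on its own, and it assumes the write step names its files with a per-run prefix. The class name RunScopedCleanup, the method deleteStaleFiles, and the prefix convention are all hypothetical, made up for illustration; none of them are part of Beam or ParquetIO.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.DirectoryStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.List;

/** Hypothetical helper: removes stale output files but never the current run's files. */
final class RunScopedCleanup {

  /**
   * Deletes every regular file directly inside outputDir whose name does not
   * start with currentRunPrefix, and returns the names this call deleted.
   * Assumption: the current run writes all of its files with currentRunPrefix.
   */
  static List<String> deleteStaleFiles(Path outputDir, String currentRunPrefix) {
    List<String> deleted = new ArrayList<>();
    try (DirectoryStream<Path> entries = Files.newDirectoryStream(outputDir)) {
      for (Path entry : entries) {
        String name = entry.getFileName().toString();
        // Skip subdirectories and anything written by the current run, so a
        // late (zombie) re-execution cannot remove freshly written output.
        if (!Files.isRegularFile(entry) || name.startsWith(currentRunPrefix)) {
          continue;
        }
        // deleteIfExists returns false rather than throwing when the file is
        // already gone, e.g. because a concurrent execution deleted it first.
        if (Files.deleteIfExists(entry)) {
          deleted.add(name);
        }
      }
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
    return deleted;
  }
}
```

With this guard, a re-execution of the delete step for a given run can only remove leftovers from earlier runs, and deleting a file that is already gone is a no-op. It does not help if two runs end up sharing the same prefix.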

On Wed, Jan 27, 2021 at 5:23 PM Robert Bradshaw <rober...@google.com> wrote:

> Fortunately making deleting files idempotent is much easier than writing
> them :). But one needs to handle the case of concurrent execution as well
> as sequential re-execution due to possible zombie workers.
>
> On Wed, Jan 27, 2021 at 5:04 PM Reuven Lax <re...@google.com> wrote:
>
>> Keep in mind that DoFns might be reexecuted (even if you think they have
>> completed successfully). This makes DoFns with side effects such as
>> deleting files tricky to write correctly.
>>
>> On Wed, Jan 27, 2021 at 4:36 PM Tao Li <t...@zillow.com> wrote:
>>
>>> Thanks @Chamikara Jayalath <chamik...@google.com>. I think it’s a good
>>> idea to define a DoFn for this deletion operation, or maybe a composite
>>> PTransform that does the deletion first, followed by ParquetIO.Write.
>>>
>>>
>>>
>>> *From: *Chamikara Jayalath <chamik...@google.com>
>>> *Reply-To: *"user@beam.apache.org" <user@beam.apache.org>
>>> *Date: *Wednesday, January 27, 2021 at 3:45 PM
>>> *To: *user <user@beam.apache.org>
>>> *Cc: *Alexey Romanenko <aromanenko....@gmail.com>
>>> *Subject: *Re: Overwrite support from ParquetIO
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>> On Wed, Jan 27, 2021 at 12:06 PM Tao Li <t...@zillow.com> wrote:
>>>
>>> @Alexey Romanenko <aromanenko....@gmail.com> thanks for your response.
>>> Regarding your questions:
>>>
>>>
>>>
>>>    1. Yes, I can purge this directory (e.g. using the S3 client from the
>>>    AWS SDK) before using ParquetIO to save files. The caveat is that this
>>>    deletion operation is not part of the Beam pipeline, so it will kick
>>>    off before the pipeline starts. Ideally, this purge operation could be
>>>    baked into the write operation with ParquetIO, so that the deletion
>>>    happens right before the file writes.
>>>    2. Regarding the naming strategy, yes, the old files will be
>>>    overwritten by the new files if they have the same file names. However,
>>>    this does not guarantee that all the old files in this directory are
>>>    wiped out (which is actually my requirement). For example, we may
>>>    change the shard count (through the withNumShards() method) between
>>>    pipeline runs, and there could be old files from a previous run that
>>>    won’t get overwritten in the current run.
>>>
>>>
>>>
>>> In general, Beam file-based sinks are intended for writing new files.
>>> So I don't think existing file-based sinks (including Parquet) will work
>>> out of the box for replacing existing files or for appending to such files.
>>>
>>> But you should be able to delete existing files separately, for example:
>>>
>>> (1) As a function that is performed before executing the pipeline.
>>>
>>> (2) As a function that is performed from a ParDo step that is executed
>>> before the ParquetIO.Write step. You will also have to make sure that the
>>> runner does not fuse the ParDo step and the Write step. Usually, this can
>>> be done by performing the deletion in a side-input step (to a ParDo that
>>> precedes the sink) or by adding a GBK/Reshuffle between the two steps.
>>>
>>>
>>>
>>> Thanks,
>>>
>>> Cham
>>>
>>>
>>> Please let me know if this makes sense to you. Thanks!
>>>
>>>
>>>
>>>
>>>
>>> *From: *Alexey Romanenko <aromanenko....@gmail.com>
>>> *Reply-To: *"user@beam.apache.org" <user@beam.apache.org>
>>> *Date: *Wednesday, January 27, 2021 at 9:10 AM
>>> *To: *"user@beam.apache.org" <user@beam.apache.org>
>>> *Subject: *Re: Overwrite support from ParquetIO
>>>
>>>
>>>
>>> What do you mean by “wipe out all existing parquet files before a write
>>> operation”? Are these all files that already exist in the same output
>>> directory? Can you purge this directory beforehand, or just use a new
>>> output directory for every pipeline run?
>>>
>>>
>>>
>>> To write Parquet files you need to use ParquetIO.sink()
>>> with FileIO.write(), and I don’t think it will clean up the output
>>> directory before the write. However, if there are name collisions between
>>> existing and new output files (this depends on the naming strategy used),
>>> then I think the old files will be overwritten by the new ones.
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>> On 25 Jan 2021, at 19:10, Tao Li <t...@zillow.com> wrote:
>>>
>>>
>>>
>>> Hi Beam community,
>>>
>>>
>>>
>>> Does ParquetIO support an overwrite behavior when saving files? More
>>> specifically, I would like to wipe out all existing parquet files before a
>>> write operation. Is there a ParquetIO API to support that? Thanks!
>>>
>>>
>>>
>>>
