Re: Overwrite support from ParquetIO

2021-01-27 Thread Reuven Lax
Correct, but if a subsequent DoFn then writes a new file, you might be
surprised when a zombie execution of the first DoFn re-runs and deletes
that file!

On Wed, Jan 27, 2021 at 5:23 PM Robert Bradshaw  wrote:

> Fortunately making deleting files idempotent is much easier than writing
> them :). But one needs to handle the case of concurrent execution as well
> as sequential re-execution due to possible zombie workers.
>
> On Wed, Jan 27, 2021 at 5:04 PM Reuven Lax  wrote:
>
>> Keep in mind that DoFns might be re-executed (even if you think they have
>> completed successfully). This makes DoFns with side effects, such as
>> deleting files, tricky to write correctly.
>>
>> On Wed, Jan 27, 2021 at 4:36 PM Tao Li  wrote:
>>
>>> Thanks @Chamikara Jayalath. I think it’s a good
>>> idea to define a DoFn for this deletion operation, or maybe a composite
>>> PTransform that does the deletion first, followed by ParquetIO.Write.
>>>
>>>
>>>
>>> *From: *Chamikara Jayalath 
>>> *Reply-To: *"user@beam.apache.org" 
>>> *Date: *Wednesday, January 27, 2021 at 3:45 PM
>>> *To: *user 
>>> *Cc: *Alexey Romanenko 
>>> *Subject: *Re: Overwrite support from ParquetIO
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>> On Wed, Jan 27, 2021 at 12:06 PM Tao Li  wrote:
>>>
>>> @Alexey Romanenko  thanks for your response.
>>> Regarding your questions:
>>>
>>>
>>>
>>>    1. Yes, I can purge this directory (e.g. using the S3 client from the
>>>    AWS SDK) before using ParquetIO to save files. The caveat is that this
>>>    deletion operation is not part of the Beam pipeline, so it will kick off
>>>    before the pipeline starts. Ideally, this purge operation would be baked
>>>    into the write operation with ParquetIO so that the deletion happens
>>>    right before the files are written.
>>>    2. Regarding the naming strategy, yes, the old files will be overwritten
>>>    by the new files if they have the same file names. However, this does
>>>    not always guarantee that all the old files in this directory are wiped
>>>    out (which is actually my requirement). For example, we may change the
>>>    shard count (through the withNumShards() method) in different pipeline
>>>    runs, and there could be old files from a previous run that won’t get
>>>    overwritten in the current run.
>>>
>>>
>>>
>>> In general, Beam file-based sinks are intended for writing new files.
>>> So I don't think existing file-based sinks (including Parquet) will work
>>> out of the box for replacing existing files or for appending to such files.
>>>
>>> But you should be able to delete existing files separately, for example:
>>>
>>> (1) As a function that is performed before executing the pipeline.
>>>
>>> (2) As a function that is performed from a ParDo step that is executed
>>> before the ParquetIO.Write step. You will also have to make sure that the
>>> runner does not fuse the ParDo step and the Write step. Usually, this can
>>> be done by performing the deletion in a step that is consumed as a side
>>> input (by a ParDo that precedes the sink) or by adding a GBK/Reshuffle
>>> between the two steps.
>>>
>>>
>>>
>>> Thanks,
>>>
>>> Cham
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>> Please let me know if this makes sense to you. Thanks!
>>>
>>>
>>>
>>>
>>>
>>> *From: *Alexey Romanenko 
>>> *Reply-To: *"user@beam.apache.org" 
>>> *Date: *Wednesday, January 27, 2021 at 9:10 AM
>>> *To: *"user@beam.apache.org" 
>>> *Subject: *Re: Overwrite support from ParquetIO
>>>
>>>
>>>
>>> What do you mean by “wipe out all existing parquet files before a write
>>> operation”? Are these all files that already exist in the same output
>>> directory? Can you purge this directory beforehand or just use a new output
>>> directory for every pipeline run?
>>>
>>>
>>>
>>> To write Parquet files you need to use ParquetIO.sink()
>>> with FileIO.write(), and I don’t think it will clean up the output directory
>>> before writing. Though, if there are name collisions between existing and
>>> new output files (it depends on the naming strategy used), then I think the
>>> old files will be overwritten by the new ones.
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>> On 25 Jan 2021, at 19:10, Tao Li  wrote:
>>>
>>>
>>>
>>> Hi Beam community,
>>>
>>>
>>>
>>> Does ParquetIO support an overwrite behavior when saving files? More
>>> specifically, I would like to wipe out all existing parquet files before a
>>> write operation. Is there a ParquetIO API to support that? Thanks!
>>>
>>>
>>>
>>>


Re: Overwrite support from ParquetIO

2021-01-27 Thread Robert Bradshaw
Fortunately making deleting files idempotent is much easier than writing
them :). But one needs to handle the case of concurrent execution as well
as sequential re-execution due to possible zombie workers.
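
For what it’s worth, an idempotent delete can stay fairly small with Beam’s
FileSystems API. A rough, untested sketch (the class and method names are
illustrative; the caller supplies the glob):

import java.io.IOException;
import java.util.List;
import java.util.stream.Collectors;

import org.apache.beam.sdk.io.FileSystems;
import org.apache.beam.sdk.io.fs.MatchResult;
import org.apache.beam.sdk.io.fs.MoveOptions.StandardMoveOptions;
import org.apache.beam.sdk.io.fs.ResourceId;

class IdempotentDeletes {
  /** Deletes every file matching the glob; safe to call again after a retry. */
  static void deleteMatchingFiles(String glob) throws IOException {
    MatchResult match = FileSystems.match(glob);
    List<ResourceId> toDelete =
        match.metadata().stream()
            .map(MatchResult.Metadata::resourceId)
            .collect(Collectors.toList());
    // IGNORE_MISSING_FILES means a zombie re-execution, or a concurrent copy
    // of the same step that already removed some files, still succeeds.
    FileSystems.delete(toDelete, StandardMoveOptions.IGNORE_MISSING_FILES);
  }
}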

On Wed, Jan 27, 2021 at 5:04 PM Reuven Lax  wrote:

> Keep in mind that DoFns might be re-executed (even if you think they have
> completed successfully). This makes DoFns with side effects, such as
> deleting files, tricky to write correctly.
>
> On Wed, Jan 27, 2021 at 4:36 PM Tao Li  wrote:
>
>> Thanks @Chamikara Jayalath. I think it’s a good
>> idea to define a DoFn for this deletion operation, or maybe a composite
>> PTransform that does the deletion first, followed by ParquetIO.Write.
>>
>>
>>
>> *From: *Chamikara Jayalath 
>> *Reply-To: *"user@beam.apache.org" 
>> *Date: *Wednesday, January 27, 2021 at 3:45 PM
>> *To: *user 
>> *Cc: *Alexey Romanenko 
>> *Subject: *Re: Overwrite support from ParquetIO
>>
>>
>>
>>
>>
>>
>>
>> On Wed, Jan 27, 2021 at 12:06 PM Tao Li  wrote:
>>
>> @Alexey Romanenko  thanks for your response.
>> Regarding your questions:
>>
>>
>>
>>    1. Yes, I can purge this directory (e.g. using the S3 client from the
>>    AWS SDK) before using ParquetIO to save files. The caveat is that this
>>    deletion operation is not part of the Beam pipeline, so it will kick off
>>    before the pipeline starts. Ideally, this purge operation would be baked
>>    into the write operation with ParquetIO so that the deletion happens
>>    right before the files are written.
>>    2. Regarding the naming strategy, yes, the old files will be overwritten
>>    by the new files if they have the same file names. However, this does
>>    not always guarantee that all the old files in this directory are wiped
>>    out (which is actually my requirement). For example, we may change the
>>    shard count (through the withNumShards() method) in different pipeline
>>    runs, and there could be old files from a previous run that won’t get
>>    overwritten in the current run.
>>
>>
>>
>> In general, Beam file-based sinks are intended for writing new files. So
>> I don't think existing file-based sinks (including Parquet) will work out
>> of the box for replacing existing files or for appending to such files.
>>
>> But you should be able to delete existing files separately, for example:
>>
>> (1) As a function that is performed before executing the pipeline.
>>
>> (2) As a function that is performed from a ParDo step that is executed
>> before the ParquetIO.Write step. You will also have to make sure that the
>> runner does not fuse the ParDo step and the Write step. Usually, this can
>> be done by performing the deletion in a step that is consumed as a side
>> input (by a ParDo that precedes the sink) or by adding a GBK/Reshuffle
>> between the two steps.
>>
>>
>>
>> Thanks,
>>
>> Cham
>>
>>
>>
>>
>>
>>
>>
>>
>>
>> Please let me know if this makes sense to you. Thanks!
>>
>>
>>
>>
>>
>> *From: *Alexey Romanenko 
>> *Reply-To: *"user@beam.apache.org" 
>> *Date: *Wednesday, January 27, 2021 at 9:10 AM
>> *To: *"user@beam.apache.org" 
>> *Subject: *Re: Overwrite support from ParquetIO
>>
>>
>>
>> What do you mean by “wipe out all existing parquet files before a write
>> operation”? Are these all files that already exist in the same output
>> directory? Can you purge this directory beforehand or just use a new output
>> directory for every pipeline run?
>>
>>
>>
>> To write Parquet files you need to use ParquetIO.sink()
>> with FileIO.write(), and I don’t think it will clean up the output directory
>> before writing. Though, if there are name collisions between existing and
>> new output files (it depends on the naming strategy used), then I think the
>> old files will be overwritten by the new ones.
>>
>>
>>
>>
>>
>>
>>
>> On 25 Jan 2021, at 19:10, Tao Li  wrote:
>>
>>
>>
>> Hi Beam community,
>>
>>
>>
>> Does ParquetIO support an overwrite behavior when saving files? More
>> specifically, I would like to wipe out all existing parquet files before a
>> write operation. Is there a ParquetIO API to support that? Thanks!
>>
>>
>>
>>


Re: Overwrite support from ParquetIO

2021-01-27 Thread Reuven Lax
Keep in mind that DoFns might be re-executed (even if you think they have
completed successfully). This makes DoFns with side effects, such as
deleting files, tricky to write correctly.

On Wed, Jan 27, 2021 at 4:36 PM Tao Li  wrote:

> Thanks @Chamikara Jayalath. I think it’s a good
> idea to define a DoFn for this deletion operation, or maybe a composite
> PTransform that does the deletion first, followed by ParquetIO.Write.
>
>
>
> *From: *Chamikara Jayalath 
> *Reply-To: *"user@beam.apache.org" 
> *Date: *Wednesday, January 27, 2021 at 3:45 PM
> *To: *user 
> *Cc: *Alexey Romanenko 
> *Subject: *Re: Overwrite support from ParquetIO
>
>
>
>
>
>
>
> On Wed, Jan 27, 2021 at 12:06 PM Tao Li  wrote:
>
> @Alexey Romanenko  thanks for your response.
> Regarding your questions:
>
>
>
>    1. Yes, I can purge this directory (e.g. using the S3 client from the
>    AWS SDK) before using ParquetIO to save files. The caveat is that this
>    deletion operation is not part of the Beam pipeline, so it will kick off
>    before the pipeline starts. Ideally, this purge operation would be baked
>    into the write operation with ParquetIO so that the deletion happens
>    right before the files are written.
>    2. Regarding the naming strategy, yes, the old files will be overwritten
>    by the new files if they have the same file names. However, this does
>    not always guarantee that all the old files in this directory are wiped
>    out (which is actually my requirement). For example, we may change the
>    shard count (through the withNumShards() method) in different pipeline
>    runs, and there could be old files from a previous run that won’t get
>    overwritten in the current run.
>
>
>
> In general, Beam file-based sinks are intended for writing new files. So
> I don't think existing file-based sinks (including Parquet) will work out
> of the box for replacing existing files or for appending to such files.
>
> But you should be able to delete existing files separately, for example:
>
> (1) As a function that is performed before executing the pipeline.
>
> (2) As a function that is performed from a ParDo step that is executed
> before the ParquetIO.Write step. You will also have to make sure that the
> runner does not fuse the ParDo step and the Write step. Usually, this can
> be done by performing the deletion in a step that is consumed as a side
> input (by a ParDo that precedes the sink) or by adding a GBK/Reshuffle
> between the two steps.
>
>
>
> Thanks,
>
> Cham
>
>
>
>
>
>
>
>
>
> Please let me know if this makes sense to you. Thanks!
>
>
>
>
>
> *From: *Alexey Romanenko 
> *Reply-To: *"user@beam.apache.org" 
> *Date: *Wednesday, January 27, 2021 at 9:10 AM
> *To: *"user@beam.apache.org" 
> *Subject: *Re: Overwrite support from ParquetIO
>
>
>
> What do you mean by “wipe out all existing parquet files before a write
> operation”? Are these all files that already exist in the same output
> directory? Can you purge this directory beforehand or just use a new output
> directory for every pipeline run?
>
>
>
> To write Parquet files you need to use ParquetIO.sink()
> with FileIO.write(), and I don’t think it will clean up the output directory
> before writing. Though, if there are name collisions between existing and
> new output files (it depends on the naming strategy used), then I think the
> old files will be overwritten by the new ones.
>
>
>
>
>
>
>
> On 25 Jan 2021, at 19:10, Tao Li  wrote:
>
>
>
> Hi Beam community,
>
>
>
> Does ParquetIO support an overwrite behavior when saving files? More
> specifically, I would like to wipe out all existing parquet files before a
> write operation. Is there a ParquetIO API to support that? Thanks!
>
>
>
>


Re: Overwrite support from ParquetIO

2021-01-27 Thread Tao Li
Thanks @Chamikara Jayalath. I think it’s a good
idea to define a DoFn for this deletion operation, or maybe a composite
PTransform that does the deletion first, followed by ParquetIO.Write.
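
A rough, untested sketch of what such a composite could look like. The names
DeleteThenWrite and DeleteMatchingFilesFn, the seed glob, and the output-directory
handling are illustrative assumptions, not anything provided by ParquetIO;
FileSystems, Wait, FileIO, and ParquetIO are the actual Beam classes. Wait.on()
holds the write back until the purge has completed, which also keeps a runner
from fusing the two steps (see Chamikara's notes further down the thread):

import java.io.IOException;
import java.util.List;
import java.util.stream.Collectors;

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.beam.sdk.io.FileIO;
import org.apache.beam.sdk.io.FileSystems;
import org.apache.beam.sdk.io.WriteFilesResult;
import org.apache.beam.sdk.io.fs.MatchResult;
import org.apache.beam.sdk.io.fs.MoveOptions.StandardMoveOptions;
import org.apache.beam.sdk.io.fs.ResourceId;
import org.apache.beam.sdk.io.parquet.ParquetIO;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.Wait;
import org.apache.beam.sdk.values.PCollection;

/** Purges old Parquet files under outputDir, then writes the input records there. */
class DeleteThenWrite
    extends PTransform<PCollection<GenericRecord>, WriteFilesResult<Void>> {

  private final String outputDir;
  private final String jsonSchema; // keep the Avro schema as JSON so the transform stays serializable

  DeleteThenWrite(String outputDir, Schema schema) {
    this.outputDir = outputDir;
    this.jsonSchema = schema.toString();
  }

  /** Deletes every file matching the glob it receives; tolerates re-execution. */
  static class DeleteMatchingFilesFn extends DoFn<String, Void> {
    @ProcessElement
    public void process(@Element String glob) throws IOException {
      MatchResult match = FileSystems.match(glob);
      List<ResourceId> toDelete =
          match.metadata().stream()
              .map(MatchResult.Metadata::resourceId)
              .collect(Collectors.toList());
      // IGNORE_MISSING_FILES keeps retries and concurrent attempts harmless.
      FileSystems.delete(toDelete, StandardMoveOptions.IGNORE_MISSING_FILES);
    }
  }

  @Override
  public WriteFilesResult<Void> expand(PCollection<GenericRecord> records) {
    // A single seed element drives one purge of the output directory.
    PCollection<Void> purged =
        records
            .getPipeline()
            .apply("Seed", Create.of(outputDir + "/*.parquet"))
            .apply("PurgeOldFiles", ParDo.of(new DeleteMatchingFilesFn()));

    Schema schema = new Schema.Parser().parse(jsonSchema);

    // Wait.on() delays the write until the purge signal is complete.
    return records
        .apply("WaitForPurge", Wait.on(purged))
        .apply(
            "WriteParquet",
            FileIO.<GenericRecord>write()
                .via(ParquetIO.sink(schema))
                .to(outputDir)
                .withSuffix(".parquet"));
  }
}

Usage would then be something like records.apply(new DeleteThenWrite("s3://my-bucket/output", schema))
(the path is made up), with the re-execution caveats raised elsewhere in this
thread in mind: the purge only stays safe because deleting an already-missing
file is treated as success.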

From: Chamikara Jayalath 
Reply-To: "user@beam.apache.org" 
Date: Wednesday, January 27, 2021 at 3:45 PM
To: user 
Cc: Alexey Romanenko 
Subject: Re: Overwrite support from ParquetIO



On Wed, Jan 27, 2021 at 12:06 PM Tao Li wrote:
@Alexey Romanenko thanks for your response. 
Regarding your questions:


  1.  Yes, I can purge this directory (e.g. using the S3 client from the AWS
SDK) before using ParquetIO to save files. The caveat is that this deletion
operation is not part of the Beam pipeline, so it will kick off before the
pipeline starts. Ideally, this purge operation would be baked into the write
operation with ParquetIO so that the deletion happens right before the files
are written.
  2.  Regarding the naming strategy, yes, the old files will be overwritten by
the new files if they have the same file names. However, this does not always
guarantee that all the old files in this directory are wiped out (which is
actually my requirement). For example, we may change the shard count (through
the withNumShards() method) in different pipeline runs, and there could be old
files from a previous run that won’t get overwritten in the current run.

In general, Beam file-based sinks are intended for writing new files. So I
don't think existing file-based sinks (including Parquet) will work out of the 
box for replacing existing files or for appending to such files.
But you should be able to delete existing files separately, for example:
(1) As a function that is performed before executing the pipeline.
(2) As a function that is performed from a ParDo step that is executed before
the ParquetIO.Write step. You will also have to make sure that the runner does
not fuse the ParDo step and the Write step. Usually, this can be done by
performing the deletion in a step that is consumed as a side input (by a ParDo
that precedes the sink) or by adding a GBK/Reshuffle between the two steps.

Thanks,
Cham




Please let me know if this makes sense to you. Thanks!


From: Alexey Romanenko 
Reply-To: "user@beam.apache.org" 
Date: Wednesday, January 27, 2021 at 9:10 AM
To: "user@beam.apache.org" 
Subject: Re: Overwrite support from ParquetIO

What do you mean by “wipe out all existing parquet files before a write 
operation”? Are these all files that already exist in the same output 
directory? Can you purge this directory beforehand or just use a new output
directory for every pipeline run?

To write Parquet files you need to use ParquetIO.sink() with FileIO.write(), and
I don’t think it will clean up the output directory before writing. Though, if
there are name collisions between existing and new output files (it depends on
the naming strategy used), then I think the old files will be overwritten by the
new ones.



On 25 Jan 2021, at 19:10, Tao Li wrote:

Hi Beam community,

Does ParquetIO support an overwrite behavior when saving files? More 
specifically, I would like to wipe out all existing parquet files before a 
write operation. Is there a ParquetIO API to support that? Thanks!



Re: Overwrite support from ParquetIO

2021-01-27 Thread Chamikara Jayalath
On Wed, Jan 27, 2021 at 12:06 PM Tao Li  wrote:

> @Alexey Romanenko  thanks for your response.
> Regarding your questions:
>
>
>
>    1. Yes, I can purge this directory (e.g. using the S3 client from the
>    AWS SDK) before using ParquetIO to save files. The caveat is that this
>    deletion operation is not part of the Beam pipeline, so it will kick off
>    before the pipeline starts. Ideally, this purge operation would be baked
>    into the write operation with ParquetIO so that the deletion happens
>    right before the files are written.
>    2. Regarding the naming strategy, yes, the old files will be overwritten
>    by the new files if they have the same file names. However, this does
>    not always guarantee that all the old files in this directory are wiped
>    out (which is actually my requirement). For example, we may change the
>    shard count (through the withNumShards() method) in different pipeline
>    runs, and there could be old files from a previous run that won’t get
>    overwritten in the current run.
>
>
In general, Beam file-based sinks are intended for writing new files. So I
don't think existing file-based sinks (including Parquet) will work out of
the box for replacing existing files or for appending to such files.
But you should be able to delete existing files separately, for example:
(1) As a function that is performed before executing the pipeline.
(2) As a function that is performed from a ParDo step that is executed
before the ParquetIO.Write step. You will also have to make sure that the
runner does not fuse the ParDo step and the Write step. Usually, this can
be done by performing the deletion in a step that is consumed as a side
input (by a ParDo that precedes the sink) or by adding a GBK/Reshuffle
between the two steps (see the sketch below).
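
To make that concrete, here is a fragment illustrating the side-input variant.
It is an untested sketch, assuming records (a PCollection of GenericRecord),
outputDir, and schema are already in scope, and that DeleteMatchingFilesFn is
any DoFn that deletes the files matching a glob (such as the one sketched
earlier in this digest):

final PCollectionView<List<Void>> purgeDone =
    records
        .getPipeline()
        .apply("Seed", Create.of(outputDir + "/*.parquet"))
        .apply("PurgeOldFiles", ParDo.of(new DeleteMatchingFilesFn()))
        .apply("AsSignal", View.asList());

records
    // The side input cannot be read until the purge branch has finished, so
    // every old file is gone before any record reaches the sink, and the
    // purge cannot be fused into the write path.
    .apply("AfterPurge",
        ParDo.of(new DoFn<GenericRecord, GenericRecord>() {
              @ProcessElement
              public void process(ProcessContext c) {
                c.sideInput(purgeDone); // consume the signal only
                c.output(c.element());
              }
            })
            .withSideInputs(purgeDone))
    .apply("WriteParquet",
        FileIO.<GenericRecord>write()
            .via(ParquetIO.sink(schema))
            .to(outputDir)
            .withSuffix(".parquet"));

The GBK/Reshuffle alternative looks similar, except the purge DoFn passes the
records through and a Reshuffle.viaRandomKey() sits between it and the write.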

Thanks,
Cham



>
>
>
>
> Please let me know if this makes sense to you. Thanks!
>
>
>
>
>
> *From: *Alexey Romanenko 
> *Reply-To: *"user@beam.apache.org" 
> *Date: *Wednesday, January 27, 2021 at 9:10 AM
> *To: *"user@beam.apache.org" 
> *Subject: *Re: Overwrite support from ParquetIO
>
>
>
> What do you mean by “wipe out all existing parquet files before a write
> operation”? Are these all files that already exist in the same output
> directory? Can you purge this directory beforehand or just use a new output
> directory for every pipeline run?
>
>
>
> To write Parquet files you need to use ParquetIO.sink()
> with FileIO.write(), and I don’t think it will clean up the output directory
> before writing. Though, if there are name collisions between existing and
> new output files (it depends on the naming strategy used), then I think the
> old files will be overwritten by the new ones.
>
>
>
>
>
>
>
> On 25 Jan 2021, at 19:10, Tao Li  wrote:
>
>
>
> Hi Beam community,
>
>
>
> Does ParquetIO support an overwrite behavior when saving files? More
> specifically, I would like to wipe out all existing parquet files before a
> write operation. Is there a ParquetIO API to support that? Thanks!
>
>
>


Re: Overwrite support from ParquetIO

2021-01-27 Thread Tao Li
@Alexey Romanenko, thanks for your response.
Regarding your questions:


  1.  Yes, I can purge this directory (e.g. using the S3 client from the AWS
SDK) before using ParquetIO to save files. The caveat is that this deletion
operation is not part of the Beam pipeline, so it will kick off before the
pipeline starts. Ideally, this purge operation would be baked into the write
operation with ParquetIO so that the deletion happens right before the files
are written.
  2.  Regarding the naming strategy, yes, the old files will be overwritten by
the new files if they have the same file names. However, this does not always
guarantee that all the old files in this directory are wiped out (which is
actually my requirement). For example, we may change the shard count (through
the withNumShards() method) in different pipeline runs, and there could be old
files from a previous run that won’t get overwritten in the current run.

Please let me know if this makes sense to you. Thanks!


From: Alexey Romanenko 
Reply-To: "user@beam.apache.org" 
Date: Wednesday, January 27, 2021 at 9:10 AM
To: "user@beam.apache.org" 
Subject: Re: Overwrite support from ParquetIO

What do you mean by “wipe out all existing parquet files before a write 
operation”? Are these all files that already exist in the same output 
directory? Can you purge this directory beforehand or just use a new output
directory for every pipeline run?

To write Parquet files you need to use ParquetIO.sink() with FileIO.write(), and
I don’t think it will clean up the output directory before writing. Though, if
there are name collisions between existing and new output files (it depends on
the naming strategy used), then I think the old files will be overwritten by the
new ones.




On 25 Jan 2021, at 19:10, Tao Li wrote:

Hi Beam community,

Does ParquetIO support an overwrite behavior when saving files? More 
specifically, I would like to wipe out all existing parquet files before a 
write operation. Is there a ParquetIO API to support that? Thanks!



Re: Beam support Flink Async I/O operator

2021-01-27 Thread Eleanore Jin
ha, yes, sorry I forgot about that

On Wed, Jan 27, 2021 at 4:31 AM Alexey Romanenko 
wrote:

> I guess it was already discussed a while ago [1] and the conclusion was
> that Flink Async I/O is a specific Flink operator and it can’t be used
> directly from Beam since Beam should provide a unified model and its
> implementation for all supported runners.
>
> Did a proposed workaround, based on GroupIntoBatches [2], work for you?
>
> [1]
> https://lists.apache.org/thread.html/rc1e087a15036c18564d3147c3570c7d80c4963ff7d48cf8eaf180758%40%3Cuser.beam.apache.org%3E
> [2]
> https://beam.apache.org/documentation/transforms/java/aggregation/groupintobatches/
>
> On 26 Jan 2021, at 22:09, Boyuan Zhang  wrote:
>
> +dev 
>
> On Tue, Jan 26, 2021 at 1:07 PM Eleanore Jin 
> wrote:
>
>> Hi community,
>>
>> Does Beam support the Flink Async I/O operator? If so, can you please share
>> the doc, and if not, is there any workaround to achieve the same in Beam
>> semantics?
>>
>>
>> https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/operators/asyncio.html
>>
>> https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=65870673
>>
>> Thanks a lot!
>> Eleanore
>>
>
>


Re: Overwrite support from ParquetIO

2021-01-27 Thread Alexey Romanenko
What do you mean by “wipe out all existing parquet files before a write 
operation”? Are these all files that already exist in the same output 
directory? Can you purge this directory beforehand or just use a new output
directory for every pipeline run?

To write Parquet files you need to use ParquetIO.sink() with FileIO.write(), and
I don’t think it will clean up the output directory before writing. Though, if
there are name collisions between existing and new output files (it depends on
the naming strategy used), then I think the old files will be overwritten by the
new ones.
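
For reference, a minimal sketch of that combination, assuming records is a
PCollection of GenericRecord whose Avro schema is schema (the bucket path and
shard count below are made-up values):

records.apply(
    "WriteParquet",
    FileIO.<GenericRecord>write()
        .via(ParquetIO.sink(schema))
        .to("s3://my-bucket/output/")   // hypothetical output location
        .withSuffix(".parquet")
        .withNumShards(10));            // fixed shard count, so names repeat across runs

With FileIO’s default naming, file names are derived from the shard index and
shard count, which is why a re-run with the same settings tends to overwrite the
previous files, while a run with fewer shards leaves the extra old files behind.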



> On 25 Jan 2021, at 19:10, Tao Li  wrote:
> 
> Hi Beam community,
>  
> Does ParquetIO support an overwrite behavior when saving files? More 
> specifically, I would like to wipe out all existing parquet files before a 
> write operation. Is there a ParquetIO API to support that? Thanks!



Re: Beam support Flink Async I/O operator

2021-01-27 Thread Alexey Romanenko
I guess it was already discussed a while ago [1] and the conclusion was that 
Flink Async I/O is a specific Flink operator and it can’t be used directly from 
Beam since Beam should provide a unified model and its implementation for all 
supported runners.

Did a proposed workaround, based on GroupIntoBatches [2], work for you?

[1] 
https://lists.apache.org/thread.html/rc1e087a15036c18564d3147c3570c7d80c4963ff7d48cf8eaf180758%40%3Cuser.beam.apache.org%3E
[2] 
https://beam.apache.org/documentation/transforms/java/aggregation/groupintobatches/
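
For anyone finding this in the archives, a rough fragment of what that
workaround can look like, assuming the input is already keyed and that
asyncClient is some external client whose lookup() returns a
CompletableFuture of String (both hypothetical):

input  // PCollection<KV<String, String>>
    .apply(GroupIntoBatches.<String, String>ofSize(50))
    .apply("BatchedAsyncLookup",
        ParDo.of(new DoFn<KV<String, Iterable<String>>, String>() {
          @ProcessElement
          public void process(ProcessContext c) {
            // In a real DoFn the client would be created in @Setup.
            List<CompletableFuture<String>> futures = new ArrayList<>();
            for (String value : c.element().getValue()) {
              futures.add(asyncClient.lookup(value)); // hypothetical async call
            }
            // Requests for the whole batch are in flight concurrently;
            // block once per batch rather than once per element.
            for (CompletableFuture<String> future : futures) {
              c.output(future.join());
            }
          }
        }));

It is not the same as Flink's operator (the bundle still blocks while the batch
completes), but it amortizes the waiting across the batch.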

> On 26 Jan 2021, at 22:09, Boyuan Zhang  wrote:
> 
> +dev  
> 
> On Tue, Jan 26, 2021 at 1:07 PM Eleanore Jin wrote:
> Hi community, 
> 
> Does Beam support the Flink Async I/O operator? If so, can you please share the
> doc, and if not, is there any workaround to achieve the same in Beam
> semantics?
> 
> https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/operators/asyncio.html
>  
> 
> 
> https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=65870673 
> 
> 
> Thanks a lot!
> Eleanore