Re: A lesson about DoFn retries

2022-09-05 Thread Jan Lukavský
Yes, all portable runners use fusion, it is built into the machinery 
that translates Pipeline into protobuf representation. It is needed for 
the ability to run the pipeline efficiently, otherwise there would be 
too many calls between the runner and SDK harness. Which is why the 
translation creates fused "executable stages".


On 9/3/22 04:34, Ahmed Abualsaud via dev wrote:
Yes you’re right, I forgot to mention that important piece of 
information  thanks for catching it.  The GBK keeps the DoFns 
separate at pipeline execution.


From what I’ve learned fusion is a Dataflow thing, do other runners do 
it too?


On Thu, Sep 1, 2022 at 6:08 PM Brian Hulette  wrote:

Thanks for sharing the learnings Ahmed!

> The solution lies in keeping the retry of each step separate. A
good example of this is in how steps 2 and 3 are implemented [3].
They are separated into different DoFns and step 3 can start only
after step 2 completes successfully. This way, any failure in step
3 does not go back to affect step 2. Is it enough just that
they're in different DoFns? I thought the key was that the DoFns
are separated by a GroupByKey, so they will be in different fused
stages, which are retried independently.
Brian

On Thu, Sep 1, 2022 at 1:43 PM Ahmed Abualsaud via dev
 wrote:

Hi all,


TLDR: When writing IO connectors, be wary of how bundle
retries can affect the work flow.


A faulty implementation of a step in BigQuery batch loads was
discovered recently.I raised an issue [1] but also wanted to
mention it here as a potentially helpful lesson for those
developing new/existing IO connectors.


For those unfamiliar with BigQueryIO file loads, a write that
is too large for a single load job [2] looks roughly something
like this:


1.

Take input rows and write them to temporary files.

2.

Load temporary files to temporary BQ tables.

3.

Delete temporary files.

4.

Copy the contents of temporary tables over to the final table.

5.

Delete temporary tables.


The faulty part here is that steps 4 and 5 are done in the
same DoFn (4 in processElementand 5 in finishBundle). In the
case a bundle fails in the middle of table deletion, let’s say
an error occurs when deleting the nthtable, the whole bundle
will retry and we will perform the copy again. But tables 1~n
have already been deleted and so we get stuck trying to copy
from non-existent sources.


The solution lies in keeping the retry of each step separate.
A good example of this is in how steps 2 and 3 are implemented
[3]. They are separated into different DoFns and step 3 can
start only after step 2 completes successfully. This way, any
failure in step 3 does not go back to affect step 2.


That's all, thanks for your attention :)

Ahmed


[1] https://github.com/apache/beam/issues/22920


[2]

https://github.com/apache/beam/blob/f921a2f1996cf906d994a9d62aeb6978bab09dd5/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BatchLoads.java#L100-L105



[3]

https://github.com/apache/beam/blob/149ed074428ff9b5344169da7d54e8ee271aaba1/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteTables.java#L437-L454





Re: A lesson about DoFn retries

2022-09-02 Thread Ahmed Abualsaud via dev
Yes you’re right, I forgot to mention that important piece of information
 thanks for catching it.  The GBK keeps the DoFns separate at pipeline
execution.

>From what I’ve learned fusion is a Dataflow thing, do other runners do it
too?

On Thu, Sep 1, 2022 at 6:08 PM Brian Hulette  wrote:

> Thanks for sharing the learnings Ahmed!
>
> > The solution lies in keeping the retry of each step separate. A good
> example of this is in how steps 2 and 3 are implemented [3]. They are
> separated into different DoFns and step 3 can start only after step 2
> completes successfully. This way, any failure in step 3 does not go back to
> affect step 2. Is it enough just that they're in different DoFns? I thought
> the key was that the DoFns are separated by a GroupByKey, so they will be
> in different fused stages, which are retried independently.
>
> Brian
>
> On Thu, Sep 1, 2022 at 1:43 PM Ahmed Abualsaud via dev <
> dev@beam.apache.org> wrote:
>
>> Hi all,
>>
>> TLDR: When writing IO connectors, be wary of how bundle retries can
>> affect the work flow.
>>
>> A faulty implementation of a step in BigQuery batch loads was discovered
>> recently. I raised an issue [1] but also wanted to mention it here as a
>> potentially helpful lesson for those developing new/existing IO connectors.
>>
>> For those unfamiliar with BigQueryIO file loads, a write that is too
>> large for a single load job [2] looks roughly something like this:
>>
>>
>>1.
>>
>>Take input rows and write them to temporary files.
>>2.
>>
>>Load temporary files to temporary BQ tables.
>>3.
>>
>>Delete temporary files.
>>4.
>>
>>Copy the contents of temporary tables over to the final table.
>>5.
>>
>>Delete temporary tables.
>>
>>
>> The faulty part here is that steps 4 and 5 are done in the same DoFn (4
>> in processElement and 5 in finishBundle). In the case a bundle fails in
>> the middle of table deletion, let’s say an error occurs when deleting the n
>> th table, the whole bundle will retry and we will perform the copy
>> again. But tables 1~n have already been deleted and so we get stuck trying
>> to copy from non-existent sources.
>>
>> The solution lies in keeping the retry of each step separate. A good
>> example of this is in how steps 2 and 3 are implemented [3]. They are
>> separated into different DoFns and step 3 can start only after step 2
>> completes successfully. This way, any failure in step 3 does not go back to
>> affect step 2.
>>
>> That's all, thanks for your attention :)
>>
>> Ahmed
>>
>> [1] https://github.com/apache/beam/issues/22920
>>
>> [2]
>> https://github.com/apache/beam/blob/f921a2f1996cf906d994a9d62aeb6978bab09dd5/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BatchLoads.java#L100-L105
>>
>>
>> [3]
>> https://github.com/apache/beam/blob/149ed074428ff9b5344169da7d54e8ee271aaba1/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteTables.java#L437-L454
>>
>>
>>


Re: A lesson about DoFn retries

2022-09-01 Thread Brian Hulette via dev
Thanks for sharing the learnings Ahmed!

> The solution lies in keeping the retry of each step separate. A good
example of this is in how steps 2 and 3 are implemented [3]. They are
separated into different DoFns and step 3 can start only after step 2
completes successfully. This way, any failure in step 3 does not go back to
affect step 2. Is it enough just that they're in different DoFns? I thought
the key was that the DoFns are separated by a GroupByKey, so they will be
in different fused stages, which are retried independently.

Brian

On Thu, Sep 1, 2022 at 1:43 PM Ahmed Abualsaud via dev 
wrote:

> Hi all,
>
> TLDR: When writing IO connectors, be wary of how bundle retries can affect
> the work flow.
>
> A faulty implementation of a step in BigQuery batch loads was discovered
> recently. I raised an issue [1] but also wanted to mention it here as a
> potentially helpful lesson for those developing new/existing IO connectors.
>
> For those unfamiliar with BigQueryIO file loads, a write that is too large
> for a single load job [2] looks roughly something like this:
>
>
>1.
>
>Take input rows and write them to temporary files.
>2.
>
>Load temporary files to temporary BQ tables.
>3.
>
>Delete temporary files.
>4.
>
>Copy the contents of temporary tables over to the final table.
>5.
>
>Delete temporary tables.
>
>
> The faulty part here is that steps 4 and 5 are done in the same DoFn (4 in
> processElement and 5 in finishBundle). In the case a bundle fails in the
> middle of table deletion, let’s say an error occurs when deleting the nth
> table, the whole bundle will retry and we will perform the copy again. But
> tables 1~n have already been deleted and so we get stuck trying to copy
> from non-existent sources.
>
> The solution lies in keeping the retry of each step separate. A good
> example of this is in how steps 2 and 3 are implemented [3]. They are
> separated into different DoFns and step 3 can start only after step 2
> completes successfully. This way, any failure in step 3 does not go back to
> affect step 2.
>
> That's all, thanks for your attention :)
>
> Ahmed
>
> [1] https://github.com/apache/beam/issues/22920
>
> [2]
> https://github.com/apache/beam/blob/f921a2f1996cf906d994a9d62aeb6978bab09dd5/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BatchLoads.java#L100-L105
>
>
> [3]
> https://github.com/apache/beam/blob/149ed074428ff9b5344169da7d54e8ee271aaba1/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteTables.java#L437-L454
>
>
>


A lesson about DoFn retries

2022-09-01 Thread Ahmed Abualsaud via dev
Hi all,

TLDR: When writing IO connectors, be wary of how bundle retries can affect
the work flow.

A faulty implementation of a step in BigQuery batch loads was discovered
recently. I raised an issue [1] but also wanted to mention it here as a
potentially helpful lesson for those developing new/existing IO connectors.

For those unfamiliar with BigQueryIO file loads, a write that is too large
for a single load job [2] looks roughly something like this:


   1.

   Take input rows and write them to temporary files.
   2.

   Load temporary files to temporary BQ tables.
   3.

   Delete temporary files.
   4.

   Copy the contents of temporary tables over to the final table.
   5.

   Delete temporary tables.


The faulty part here is that steps 4 and 5 are done in the same DoFn (4 in
processElement and 5 in finishBundle). In the case a bundle fails in the
middle of table deletion, let’s say an error occurs when deleting the nth
table, the whole bundle will retry and we will perform the copy again. But
tables 1~n have already been deleted and so we get stuck trying to copy
from non-existent sources.

The solution lies in keeping the retry of each step separate. A good
example of this is in how steps 2 and 3 are implemented [3]. They are
separated into different DoFns and step 3 can start only after step 2
completes successfully. This way, any failure in step 3 does not go back to
affect step 2.

That's all, thanks for your attention :)

Ahmed

[1] https://github.com/apache/beam/issues/22920

[2]
https://github.com/apache/beam/blob/f921a2f1996cf906d994a9d62aeb6978bab09dd5/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BatchLoads.java#L100-L105


[3]
https://github.com/apache/beam/blob/149ed074428ff9b5344169da7d54e8ee271aaba1/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteTables.java#L437-L454