Re: A lesson about DoFn retries

2022-09-01 Thread Brian Hulette via dev
Thanks for sharing the learnings, Ahmed!

> The solution lies in keeping the retry of each step separate. A good
> example of this is in how steps 2 and 3 are implemented [3]. They are
> separated into different DoFns and step 3 can start only after step 2
> completes successfully. This way, any failure in step 3 does not go back to
> affect step 2.

Is it enough just that they're in different DoFns? I thought the key was
that the DoFns are separated by a GroupByKey, so they will be in different
fused stages, which are retried independently.

Brian

On Thu, Sep 1, 2022 at 1:43 PM Ahmed Abualsaud via dev wrote:

> Hi all,
>
> TLDR: When writing IO connectors, be wary of how bundle retries can affect
> the workflow.
>
> A faulty implementation of a step in BigQuery batch loads was discovered
> recently. I raised an issue [1] but also wanted to mention it here as a
> potentially helpful lesson for those developing new/existing IO connectors.
>
> For those unfamiliar with BigQueryIO file loads, a write that is too large
> for a single load job [2] looks roughly like this:
>
> 1. Take input rows and write them to temporary files.
> 2. Load temporary files to temporary BQ tables.
> 3. Delete temporary files.
> 4. Copy the contents of temporary tables over to the final table.
> 5. Delete temporary tables.
>
> The faulty part here is that steps 4 and 5 are done in the same DoFn (4 in
> processElement and 5 in finishBundle). If a bundle fails in the middle of
> table deletion, say with an error while deleting the nth table, the whole
> bundle is retried and the copy is performed again. But tables 1 through n
> have already been deleted, so the retry gets stuck trying to copy from
> non-existent sources.
>
> The solution lies in keeping the retry of each step separate. A good
> example of this is in how steps 2 and 3 are implemented [3]. They are
> separated into different DoFns and step 3 can start only after step 2
> completes successfully. This way, any failure in step 3 does not go back to
> affect step 2.
>
> That's all, thanks for your attention :)
>
> Ahmed
>
> [1] https://github.com/apache/beam/issues/22920
>
> [2]
> https://github.com/apache/beam/blob/f921a2f1996cf906d994a9d62aeb6978bab09dd5/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BatchLoads.java#L100-L105
>
>
> [3]
> https://github.com/apache/beam/blob/149ed074428ff9b5344169da7d54e8ee271aaba1/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteTables.java#L437-L454
>
>
>


A lesson about DoFn retries

2022-09-01 Thread Ahmed Abualsaud via dev
Hi all,

TLDR: When writing IO connectors, be wary of how bundle retries can affect
the workflow.

A faulty implementation of a step in BigQuery batch loads was discovered
recently. I raised an issue [1] but also wanted to mention it here as a
potentially helpful lesson for those developing new/existing IO connectors.

For those unfamiliar with BigQueryIO file loads, a write that is too large
for a single load job [2] looks roughly like this:


   1. Take input rows and write them to temporary files.
   2. Load temporary files to temporary BQ tables.
   3. Delete temporary files.
   4. Copy the contents of temporary tables over to the final table.
   5. Delete temporary tables.


The faulty part here is that steps 4 and 5 are done in the same DoFn (4 in
processElement and 5 in finishBundle). If a bundle fails in the middle of
table deletion, say with an error while deleting the nth table, the whole
bundle is retried and the copy is performed again. But tables 1 through n
have already been deleted, so the retry gets stuck trying to copy from
non-existent sources.
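
To make the shape of the problem concrete, here is a minimal sketch of the
faulty pattern (hypothetical names, not the actual BatchLoads code;
copyToFinalTable() and deleteTable() stand in for the BigQuery calls):

import java.util.ArrayList;
import java.util.List;
import org.apache.beam.sdk.transforms.DoFn;

class CopyThenDeleteFn extends DoFn<List<String>, Void> {
  private transient List<String> tempTables;

  @StartBundle
  public void startBundle() {
    tempTables = new ArrayList<>();
  }

  @ProcessElement
  public void processElement(@Element List<String> tables) {
    copyToFinalTable(tables);  // step 4: copy temp tables to the final table
    tempTables.addAll(tables);
  }

  @FinishBundle
  public void finishBundle() {
    // step 5: if deleting the nth table throws here, the runner retries the
    // whole bundle, re-running the copy against tables that no longer exist.
    for (String table : tempTables) {
      deleteTable(table);
    }
  }

  private void copyToFinalTable(List<String> tables) { /* BQ copy job */ }
  private void deleteTable(String table) { /* BQ tables.delete */ }
}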

The solution lies in keeping the retry of each step separate. A good
example of this is in how steps 2 and 3 are implemented [3]. They are
separated into different DoFns and step 3 can start only after step 2
completes successfully. This way, any failure in step 3 does not go back to
affect step 2.
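
As a minimal sketch of that decoupling (assuming tempFiles is the
PCollection<String> of file paths from step 1, and LoadFn/DeleteFn are
hypothetical DoFns where LoadFn emits the name of each file once its load
succeeds):

import org.apache.beam.sdk.transforms.GroupByKey;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.Values;
import org.apache.beam.sdk.transforms.WithKeys;
import org.apache.beam.sdk.values.PCollection;

// step 2: load each temp file and emit its name once loaded
PCollection<String> loadedFiles =
    tempFiles.apply("LoadToTempTables", ParDo.of(new LoadFn()));

// step 3: the GroupByKey is a shuffle boundary, so the delete runs in a
// separate fused stage and its retries cannot re-run the load above
loadedFiles
    .apply(WithKeys.of("cleanup"))
    .apply(GroupByKey.create())
    .apply(Values.create())
    .apply("DeleteTempFiles", ParDo.of(new DeleteFn()));

Note that after the GroupByKey, DeleteFn receives Iterable<String> elements
rather than individual file names.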

That's all, thanks for your attention :)

Ahmed

[1] https://github.com/apache/beam/issues/22920

[2]
https://github.com/apache/beam/blob/f921a2f1996cf906d994a9d62aeb6978bab09dd5/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BatchLoads.java#L100-L105


[3]
https://github.com/apache/beam/blob/149ed074428ff9b5344169da7d54e8ee271aaba1/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteTables.java#L437-L454



Re: [JmsIO] => Pull Request to fix message acknowledgement issue

2022-09-01 Thread Luke Cwik via dev
I have a better understanding of the problem after reviewing the doc, and we
need to decide what lifecycle scope we want the `Connection`, `Session`,
and `MessageConsumer` to have.

It looks like for the `Connection` we should try to have at most one
instance for the entire process per connection factory.
https://docs.oracle.com/javaee/7/api/javax/jms/Connection.html says that
the connection should be re-used. Having fewer connections would likely be
beneficial, unless you think there would be a performance limitation in
using a single connection per process for all the messages?
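
For concreteness, a minimal sketch of that sharing, assuming a static
per-process cache keyed by the factory instance (illustrative names, not
JmsIO's current code):

import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.JMSException;

final class SharedConnections {
  // assumes factory instances are stable enough to serve as cache keys
  private static final ConcurrentMap<ConnectionFactory, Connection> CACHE =
      new ConcurrentHashMap<>();

  static Connection get(ConnectionFactory factory) {
    return CACHE.computeIfAbsent(factory, f -> {
      try {
        Connection connection = f.createConnection();
        connection.start();  // start delivery once for the whole process
        return connection;
      } catch (JMSException e) {
        throw new RuntimeException("Failed to create shared JMS connection", e);
      }
    });
  }

  private SharedConnections() {}
}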

For the `Session` and `MessageConsumer`, I'm not sure what lifecycle scope
they should have. Some ideas:
1. we could make it so that each `Session` is unique per checkpoint, e.g.
we hand off the ownership of the `Session` to the JmsCheckpointMark every
time we checkpoint and create a new `Session` for the next set of
messages we receive. This would mean that we would also close the
`MessageConsumer` at every checkpoint and create a new one.
2. we could make it so that the `Session` is scoped to the reader
start/close and possibly multiple checkpoint marks and effectively close
the `Session` once the reader is closed and all checkpoint marks are
finalized/expired. We would close the `MessageConsumer` whenever the reader
is closed.
3. we could make it so that the `Session` is scoped to the `Connection` and
would only close it when the `Connection` closes.

1 seems pretty simple since the `Session` always has a single distinct
owner. This seems like it would make the most sense if `Session` creation
and management is cheap. Another positive is that once the `Session`
closes, any messages that weren't acknowledged are returned to the queue,
and we will not have to wait for the reader to be closed or all the
checkpoint marks to be finalized.
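
Roughly, 1 could look like the sketch below (illustrative shape only; it
elides the coder/serialization concerns a real CheckpointMark has, since
the `Session` itself can't be encoded into the checkpoint):

import java.io.IOException;
import java.util.List;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.Session;
import org.apache.beam.sdk.io.UnboundedSource;

class SessionOwningCheckpointMark implements UnboundedSource.CheckpointMark {
  private final Session session;
  private final List<Message> messagesToAck;

  // the reader hands over ownership of the Session here and opens a fresh
  // Session/MessageConsumer for the next set of messages
  SessionOwningCheckpointMark(Session session, List<Message> messagesToAck) {
    this.session = session;
    this.messagesToAck = messagesToAck;
  }

  @Override
  public void finalizeCheckpoint() throws IOException {
    try {
      for (Message message : messagesToAck) {
        message.acknowledge();  // acks go to the Session that received them
      }
      session.close();  // closing returns any un-acked messages to the queue
    } catch (JMSException e) {
      throw new IOException(e);
    }
  }
}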

What do you think?

On Mon, Aug 29, 2022 at 10:06 PM Jean-Baptiste Onofré wrote:

> Hi Vincent,
>
> thanks, I will take a look (as original JmsIO author ;)).
>
> Regards
> JB
>
> On Mon, Aug 29, 2022 at 6:43 PM BALLADA Vincent wrote:
> >
> > Hi all,
> >
> >
> >
> > Here is a PR related to the following issue (Runner acknowledges
> messages on closed session):
> >
> > https://github.com/apache/beam/issues/20814
> >
> >
> >
> > And here is a documentation explaining the fix:
> >
> >
> https://docs.google.com/document/d/19HiNPoJeIlzCFyWGdlw7WEFutceL2AmN_5Z0Vmi1s-I/edit?usp=sharing
> >
> >
> >
> > And finally the PR:
> >
> > https://github.com/apache/beam/pull/22932
> >
> >
> >
> > Regards
> >
> >
> >
> > Vincent BALLADA


Beam High Priority Issue Report (68)

2022-09-01 Thread beamactions
This is your daily summary of Beam's current high priority issues that may need 
attention.

See https://beam.apache.org/contribute/issue-priorities for the meaning and 
expectations around issue priorities.

Unassigned P1 Issues:

https://github.com/apache/beam/issues/22969 Discrepancy in behavior of 
`DoFn.process()` when `yield` is combined with `return` statement, or vice versa
https://github.com/apache/beam/issues/22913 [Bug]: 
beam_PostCommit_Java_ValidatesRunner_Flink is failing
https://github.com/apache/beam/issues/22749 [Bug]: Bytebuddy version update 
causes Invisible parameter type error
https://github.com/apache/beam/issues/22743 [Bug]: Test flake: 
org.apache.beam.sdk.io.gcp.bigquery.BigQueryServicesImplTest.testInsertWithinRowCountLimits
https://github.com/apache/beam/issues/22440 [Bug]: Python Batch Dataflow 
SideInput LoadTests failing
https://github.com/apache/beam/issues/22321 
PortableRunnerTestWithExternalEnv.test_pardo_large_input is regularly failing 
on jenkins
https://github.com/apache/beam/issues/22303 [Task]: Add tests to Kafka SDF and 
fix known and discovered issues
https://github.com/apache/beam/issues/22299 [Bug]: JDBCIO Write freeze at 
getConnection() in WriteFn
https://github.com/apache/beam/issues/22283 [Bug]: Python Lots of fn runner 
test items cost exactly 5 seconds to run
https://github.com/apache/beam/issues/21794 Dataflow runner creates a new timer 
whenever the output timestamp is change
https://github.com/apache/beam/issues/21713 404s in BigQueryIO don't get output 
to Failed Inserts PCollection
https://github.com/apache/beam/issues/21704 beam_PostCommit_Java_DataflowV2 
failures parent bug
https://github.com/apache/beam/issues/21703 pubsublite.ReadWriteIT failing in 
beam_PostCommit_Java_DataflowV1 and V2
https://github.com/apache/beam/issues/21702 SpannerWriteIT failing in beam 
PostCommit Java V1
https://github.com/apache/beam/issues/21701 beam_PostCommit_Java_DataflowV1 
failing with a variety of flakes and errors
https://github.com/apache/beam/issues/21700 
--dataflowServiceOptions=use_runner_v2 is broken
https://github.com/apache/beam/issues/21696 Flink Tests failure :  
java.lang.NoClassDefFoundError: Could not initialize class 
org.apache.beam.runners.core.construction.SerializablePipelineOptions 
https://github.com/apache/beam/issues/21695 DataflowPipelineResult does not 
raise exception for unsuccessful states.
https://github.com/apache/beam/issues/21694 BigQuery Storage API insert with 
writeResult retry and write to error table
https://github.com/apache/beam/issues/21480 flake: 
FlinkRunnerTest.testEnsureStdoutStdErrIsRestored
https://github.com/apache/beam/issues/21472 Dataflow streaming tests failing 
new AfterSynchronizedProcessingTime test
https://github.com/apache/beam/issues/21471 Flakes: Failed to load cache entry
https://github.com/apache/beam/issues/21470 Test flake: test_split_half_sdf
https://github.com/apache/beam/issues/21469 beam_PostCommit_XVR_Flink flaky: 
Connection refused
https://github.com/apache/beam/issues/21468 
beam_PostCommit_Python_Examples_Dataflow failing
https://github.com/apache/beam/issues/21467 GBK and CoGBK streaming Java load 
tests failing
https://github.com/apache/beam/issues/21465 Kafka commit offset drop data on 
failure for runners that have non-checkpointing shuffle
https://github.com/apache/beam/issues/21463 NPE in Flink Portable 
ValidatesRunner streaming suite
https://github.com/apache/beam/issues/21462 Flake in 
org.apache.beam.sdk.io.mqtt.MqttIOTest.testReadObject: Address already in use
https://github.com/apache/beam/issues/21271 pubsublite.ReadWriteIT flaky in 
beam_PostCommit_Java_DataflowV2  
https://github.com/apache/beam/issues/21270 
org.apache.beam.sdk.transforms.CombineTest$WindowingTests.testWindowedCombineGloballyAsSingletonView
 flaky on Dataflow Runner V2
https://github.com/apache/beam/issues/21268 Race between member variable being 
accessed due to leaking uninitialized state via OutboundObserverFactory
https://github.com/apache/beam/issues/21267 WriteToBigQuery submits a duplicate 
BQ load job if a 503 error code is returned from googleapi
https://github.com/apache/beam/issues/21266 
org.apache.beam.sdk.transforms.ParDoLifecycleTest.testTeardownCalledAfterExceptionInProcessElementStateful
 is flaky in Java ValidatesRunner Flink suite.
https://github.com/apache/beam/issues/21265 
apache_beam.runners.portability.fn_api_runner.translations_test.TranslationsTest.test_run_packable_combine_globally
 'apache_beam.coders.coder_impl._AbstractIterable' object is not reversible
https://github.com/apache/beam/issues/21263 (Broken Pipe induced) Bricked 
Dataflow Pipeline 
https://github.com/apache/beam/issues/21262 Python AfterAny, AfterAll do not 
follow spec
https://github.com/apache/beam/issues/21261 
org.apache.beam.runners.dataflow.worker.fn.logging.BeamFnLoggingServiceTest.testMultipleClientsFailingIsHandledGracefullyByServer
 is flaky
https://github.com/apache/beam/issues/21260 Python DirectRunne