Re: A lesson about DoFn retries
Thanks for sharing the learnings, Ahmed!

> The solution lies in keeping the retry of each step separate. A good
> example of this is in how steps 2 and 3 are implemented [3]. They are
> separated into different DoFns and step 3 can start only after step 2
> completes successfully. This way, any failure in step 3 does not go back
> to affect step 2.

Is it enough just that they're in different DoFns? I thought the key was that the DoFns are separated by a GroupByKey, so they will be in different fused stages, which are retried independently.

Brian

On Thu, Sep 1, 2022 at 1:43 PM Ahmed Abualsaud via dev wrote:

> Hi all,
>
> TLDR: When writing IO connectors, be wary of how bundle retries can
> affect the workflow.
>
> A faulty implementation of a step in BigQuery batch loads was discovered
> recently. I raised an issue [1] but also wanted to mention it here as a
> potentially helpful lesson for those developing new/existing IO connectors.
>
> For those unfamiliar with BigQueryIO file loads, a write that is too
> large for a single load job [2] looks roughly like this:
>
> 1. Take input rows and write them to temporary files.
> 2. Load the temporary files into temporary BQ tables.
> 3. Delete the temporary files.
> 4. Copy the contents of the temporary tables over to the final table.
> 5. Delete the temporary tables.
>
> The faulty part here is that steps 4 and 5 are done in the same DoFn
> (4 in processElement and 5 in finishBundle). If a bundle fails in the
> middle of table deletion, say an error occurs when deleting the nth
> table, the whole bundle will retry and we will perform the copy again.
> But tables 1 through n have already been deleted, so we get stuck trying
> to copy from non-existent sources.
>
> The solution lies in keeping the retry of each step separate. A good
> example of this is in how steps 2 and 3 are implemented [3]. They are
> separated into different DoFns and step 3 can start only after step 2
> completes successfully. This way, any failure in step 3 does not go back
> to affect step 2.
>
> That's all, thanks for your attention :)
>
> Ahmed
>
> [1] https://github.com/apache/beam/issues/22920
> [2] https://github.com/apache/beam/blob/f921a2f1996cf906d994a9d62aeb6978bab09dd5/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BatchLoads.java#L100-L105
> [3] https://github.com/apache/beam/blob/149ed074428ff9b5344169da7d54e8ee271aaba1/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteTables.java#L437-L454
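To make the GroupByKey point concrete, here is a minimal sketch of the pattern in question. The DoFns (`LoadToTempTableFn`, `DeleteTempFileFn`) are hypothetical stand-ins, not the actual WriteTables code; the two steps are separated by a `Reshuffle`, which performs a shuffle internally and so acts as the fusion barrier between them on runners like Dataflow:

```java
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.Reshuffle;

public class RetryIsolationSketch {

  // Step 2 (hypothetical): load one temp file into a temp BQ table,
  // then emit the file name for the cleanup step.
  static class LoadToTempTableFn extends DoFn<String, String> {
    @ProcessElement
    public void processElement(@Element String tempFile, OutputReceiver<String> out) {
      // ... submit the load job for tempFile here ...
      out.output(tempFile);
    }
  }

  // Step 3 (hypothetical): delete the temp file. A failure here retries
  // only this fused stage, because the Reshuffle below materialized the
  // upstream results at a shuffle boundary.
  static class DeleteTempFileFn extends DoFn<String, Void> {
    @ProcessElement
    public void processElement(@Element String tempFile) {
      // ... delete tempFile here ...
    }
  }

  public static void main(String[] args) {
    Pipeline p = Pipeline.create();
    p.apply(Create.of("temp-file-1", "temp-file-2"))
        .apply("LoadToTempTables", ParDo.of(new LoadToTempTableFn()))
        .apply(Reshuffle.viaRandomKey()) // shuffle => separate fused stages
        .apply("DeleteTempFiles", ParDo.of(new DeleteTempFileFn()));
    p.run();
  }
}
```

With the shuffle in place, the load results are materialized at the stage boundary, so a retry of the delete stage replays only the delete DoFn and never re-runs the loads.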
A lesson about DoFn retries
Hi all,

TLDR: When writing IO connectors, be wary of how bundle retries can affect the workflow.

A faulty implementation of a step in BigQuery batch loads was discovered recently. I raised an issue [1] but also wanted to mention it here as a potentially helpful lesson for those developing new/existing IO connectors.

For those unfamiliar with BigQueryIO file loads, a write that is too large for a single load job [2] looks roughly like this:

1. Take input rows and write them to temporary files.
2. Load the temporary files into temporary BQ tables.
3. Delete the temporary files.
4. Copy the contents of the temporary tables over to the final table.
5. Delete the temporary tables.

The faulty part here is that steps 4 and 5 are done in the same DoFn (4 in processElement and 5 in finishBundle). If a bundle fails in the middle of table deletion, say an error occurs when deleting the nth table, the whole bundle will retry and we will perform the copy again. But tables 1 through n have already been deleted, so we get stuck trying to copy from non-existent sources.

The solution lies in keeping the retry of each step separate. A good example of this is in how steps 2 and 3 are implemented [3]. They are separated into different DoFns and step 3 can start only after step 2 completes successfully. This way, any failure in step 3 does not go back to affect step 2.

That's all, thanks for your attention :)

Ahmed

[1] https://github.com/apache/beam/issues/22920
[2] https://github.com/apache/beam/blob/f921a2f1996cf906d994a9d62aeb6978bab09dd5/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BatchLoads.java#L100-L105
[3] https://github.com/apache/beam/blob/149ed074428ff9b5344169da7d54e8ee271aaba1/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteTables.java#L437-L454
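For illustration, here is a minimal sketch of the faulty shape described above. The names (`CopyThenDeleteFn`, `copyToFinalTable`, `deleteTable`) are hypothetical stubs, not the actual BatchLoads code:

```java
import java.util.ArrayList;
import java.util.List;
import org.apache.beam.sdk.transforms.DoFn;

// The faulty shape: step 4 (copy) in processElement and step 5 (delete)
// in finishBundle share one bundle, so they share one retry unit.
class CopyThenDeleteFn extends DoFn<String, Void> {
  private transient List<String> copiedTables;

  @StartBundle
  public void startBundle() {
    copiedTables = new ArrayList<>();
  }

  @ProcessElement
  public void processElement(@Element String tempTable) {
    copyToFinalTable(tempTable); // step 4
    copiedTables.add(tempTable);
  }

  @FinishBundle
  public void finishBundle() {
    // Step 5. If deleting the nth table throws, the runner retries the
    // WHOLE bundle: the copies re-run against tables 1..n, which are
    // already gone, so the retry can never succeed.
    for (String tempTable : copiedTables) {
      deleteTable(tempTable);
    }
  }

  private void copyToFinalTable(String table) { /* illustrative stub */ }

  private void deleteTable(String table) { /* illustrative stub */ }
}
```

Because processElement and finishBundle belong to the same bundle, the runner's retry unit spans both steps; moving the delete into its own DoFn downstream of a shuffle keeps the retry units separate.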
Beam Dependency Check Report (2022-09-01)
Re: [JmsIO] => Pull Request to fix message acknowledgement issue
I have a better understanding of the problem after reviewing the doc. We need to decide what lifecycle scope we want the `Connection`, `Session`, and `MessageConsumer` to have.

It looks like for the `Connection` we should try to have at most one instance for the entire process per connection factory. https://docs.oracle.com/javaee/7/api/javax/jms/Connection.html says that the connection should be re-used. Having fewer connections would likely be beneficial, unless you think there would be a performance limitation in using a single connection per process for all the messages?

For the `Session` and `MessageConsumer`, I'm not sure what lifecycle scope they should have. Some ideas:

1. We could make each `Session` unique per checkpoint, e.g. we hand off ownership of the `Session` to the JmsCheckpointMark every time we checkpoint and create a new `Session` for the next set of messages we receive. This would mean that we also close the `MessageConsumer` at every checkpoint and create a new one.

2. We could scope the `Session` to the reader start/close (and possibly multiple checkpoint marks) and effectively close the `Session` once the reader is closed and all checkpoint marks are finalized/expired. We would close the `MessageConsumer` whenever the reader is closed.

3. We could scope the `Session` to the `Connection` and only close it when the `Connection` closes.

1 seems pretty simple since the `Session` always has a single distinct owner. It would make the most sense if `Session` creation and management is cheap. Another positive is that once the `Session` closes, any messages that weren't acknowledged are returned to the queue, and we will not have to wait for the reader to be closed or all the checkpoint marks to be finalized. A sketch of this option follows below.

What do you think?

On Mon, Aug 29, 2022 at 10:06 PM Jean-Baptiste Onofré wrote:

> Hi Vincent,
>
> thanks, I will take a look (as original JmsIO author ;)).
>
> Regards
> JB
>
> On Mon, Aug 29, 2022 at 6:43 PM BALLADA Vincent wrote:
> >
> > Hi all,
> >
> > Here is a PR related to the following issue (Runner acknowledges messages on closed session):
> > https://github.com/apache/beam/issues/20814
> >
> > And here is a document explaining the fix:
> > https://docs.google.com/document/d/19HiNPoJeIlzCFyWGdlw7WEFutceL2AmN_5Z0Vmi1s-I/edit?usp=sharing
> >
> > And finally the PR:
> > https://github.com/apache/beam/pull/22932
> >
> > Regards
> >
> > Vincent BALLADA
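To make option 1 concrete, here is a minimal sketch, assuming a single shared `Connection` per connection factory. `SessionOwningCheckpointMark` and `ReaderSketch` are hypothetical names for illustration, not the actual JmsIO/JmsCheckpointMark classes:

```java
import java.util.ArrayList;
import java.util.List;
import javax.jms.Connection;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.Session;

// Each checkpoint takes ownership of the Session that received its messages.
class SessionOwningCheckpointMark {
  private final Session session;
  private final List<Message> messages;

  SessionOwningCheckpointMark(Session session, List<Message> messages) {
    this.session = session;
    this.messages = messages;
  }

  // Called when the runner finalizes the checkpoint.
  void finalizeCheckpoint() throws JMSException {
    for (Message message : messages) {
      message.acknowledge(); // acked on the Session that delivered it
    }
    session.close(); // unacknowledged messages return to the queue
  }
}

class ReaderSketch {
  private final Connection connection; // shared, one per connection factory
  private Session session;
  private MessageConsumer consumer;
  private List<Message> pending = new ArrayList<>();

  ReaderSketch(Connection connection, String queueName) throws JMSException {
    this.connection = connection;
    openSession(queueName);
  }

  private void openSession(String queueName) throws JMSException {
    session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
    consumer = session.createConsumer(session.createQueue(queueName));
  }

  SessionOwningCheckpointMark checkpoint(String queueName) throws JMSException {
    consumer.close(); // the consumer is tied to the outgoing Session
    SessionOwningCheckpointMark mark =
        new SessionOwningCheckpointMark(session, pending);
    pending = new ArrayList<>();
    openSession(queueName); // fresh Session for the next set of messages
    return mark;
  }
}
```

The appeal of this shape is the single distinct owner: the reader owns the `Session` until checkpoint, the mark owns it afterwards, and closing the `Session` releases any unacknowledged messages back to the queue.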
Beam High Priority Issue Report (68)
This is your daily summary of Beam's current high priority issues that may need attention. See https://beam.apache.org/contribute/issue-priorities for the meaning and expectations around issue priorities.

Unassigned P1 Issues:

https://github.com/apache/beam/issues/22969 Discrepancy in behavior of `DoFn.process()` when `yield` is combined with `return` statement, or vice versa
https://github.com/apache/beam/issues/22913 [Bug]: beam_PostCommit_Java_ValidatesRunner_Flink is failing
https://github.com/apache/beam/issues/22749 [Bug]: Bytebuddy version update causes Invisible parameter type error
https://github.com/apache/beam/issues/22743 [Bug]: Test flake: org.apache.beam.sdk.io.gcp.bigquery.BigQueryServicesImplTest.testInsertWithinRowCountLimits
https://github.com/apache/beam/issues/22440 [Bug]: Python Batch Dataflow SideInput LoadTests failing
https://github.com/apache/beam/issues/22321 PortableRunnerTestWithExternalEnv.test_pardo_large_input is regularly failing on jenkins
https://github.com/apache/beam/issues/22303 [Task]: Add tests to Kafka SDF and fix known and discovered issues
https://github.com/apache/beam/issues/22299 [Bug]: JDBCIO Write freeze at getConnection() in WriteFn
https://github.com/apache/beam/issues/22283 [Bug]: Python Lots of fn runner test items cost exactly 5 seconds to run
https://github.com/apache/beam/issues/21794 Dataflow runner creates a new timer whenever the output timestamp is change
https://github.com/apache/beam/issues/21713 404s in BigQueryIO don't get output to Failed Inserts PCollection
https://github.com/apache/beam/issues/21704 beam_PostCommit_Java_DataflowV2 failures parent bug
https://github.com/apache/beam/issues/21703 pubsublite.ReadWriteIT failing in beam_PostCommit_Java_DataflowV1 and V2
https://github.com/apache/beam/issues/21702 SpannerWriteIT failing in beam PostCommit Java V1
https://github.com/apache/beam/issues/21701 beam_PostCommit_Java_DataflowV1 failing with a variety of flakes and errors
https://github.com/apache/beam/issues/21700 --dataflowServiceOptions=use_runner_v2 is broken
https://github.com/apache/beam/issues/21696 Flink Tests failure: java.lang.NoClassDefFoundError: Could not initialize class org.apache.beam.runners.core.construction.SerializablePipelineOptions
https://github.com/apache/beam/issues/21695 DataflowPipelineResult does not raise exception for unsuccessful states.
https://github.com/apache/beam/issues/21694 BigQuery Storage API insert with writeResult retry and write to error table
https://github.com/apache/beam/issues/21480 flake: FlinkRunnerTest.testEnsureStdoutStdErrIsRestored
https://github.com/apache/beam/issues/21472 Dataflow streaming tests failing new AfterSynchronizedProcessingTime test
https://github.com/apache/beam/issues/21471 Flakes: Failed to load cache entry
https://github.com/apache/beam/issues/21470 Test flake: test_split_half_sdf
https://github.com/apache/beam/issues/21469 beam_PostCommit_XVR_Flink flaky: Connection refused
https://github.com/apache/beam/issues/21468 beam_PostCommit_Python_Examples_Dataflow failing
https://github.com/apache/beam/issues/21467 GBK and CoGBK streaming Java load tests failing
https://github.com/apache/beam/issues/21465 Kafka commit offset drop data on failure for runners that have non-checkpointing shuffle
https://github.com/apache/beam/issues/21463 NPE in Flink Portable ValidatesRunner streaming suite
https://github.com/apache/beam/issues/21462 Flake in org.apache.beam.sdk.io.mqtt.MqttIOTest.testReadObject: Address already in use
https://github.com/apache/beam/issues/21271 pubsublite.ReadWriteIT flaky in beam_PostCommit_Java_DataflowV2
https://github.com/apache/beam/issues/21270 org.apache.beam.sdk.transforms.CombineTest$WindowingTests.testWindowedCombineGloballyAsSingletonView flaky on Dataflow Runner V2
https://github.com/apache/beam/issues/21268 Race between member variable being accessed due to leaking uninitialized state via OutboundObserverFactory
https://github.com/apache/beam/issues/21267 WriteToBigQuery submits a duplicate BQ load job if a 503 error code is returned from googleapi
https://github.com/apache/beam/issues/21266 org.apache.beam.sdk.transforms.ParDoLifecycleTest.testTeardownCalledAfterExceptionInProcessElementStateful is flaky in Java ValidatesRunner Flink suite.
https://github.com/apache/beam/issues/21265 apache_beam.runners.portability.fn_api_runner.translations_test.TranslationsTest.test_run_packable_combine_globally 'apache_beam.coders.coder_impl._AbstractIterable' object is not reversible
https://github.com/apache/beam/issues/21263 (Broken Pipe induced) Bricked Dataflow Pipeline
https://github.com/apache/beam/issues/21262 Python AfterAny, AfterAll do not follow spec
https://github.com/apache/beam/issues/21261 org.apache.beam.runners.dataflow.worker.fn.logging.BeamFnLoggingServiceTest.testMultipleClientsFailingIsHandledGracefullyByServer is flaky
https://github.com/apache/beam/issues/21260 Python DirectRunne