>> It is likely that the incorrect transform was edited...
>
> It appears you're right; I tried to reproduce, but this time I was able to
> clear the issue by making "the same" code change and updating the pipeline.
> I believe it was just a change in the wrong place in the code.
Good to know this works! Thanks Luke 🙏

On Tue, Aug 10, 2021 at 1:19 PM Luke Cwik <lc...@google.com> wrote:

> On Tue, Aug 10, 2021 at 10:11 AM Evan Galpin <evan.gal...@gmail.com> wrote:
>
>> Thanks for your responses Luke. One point I have confusion over:
>>
>>> * Modify the sink implementation to do what you want with the bad data
>>> and update the pipeline.
>>
>> I modified the sink implementation to ignore the specific error that was
>> the problem and updated the pipeline. The update succeeded. However, the
>> problem persisted. How might that happen? Is there caching involved?
>> Checkpointing? I changed the very last method called in the pipeline in
>> order to ensure the validation would apply, but the problem persisted.
>
> It is likely that the incorrect transform was edited. You should take a
> look at the worker logs, find the exception that is being thrown, and find
> the step name it is associated with (e.g. BigQueryIO/Write/StreamingInserts).
> Then browse the source for a "StreamingInserts" transform that is applied
> from the "Write" transform, which is applied from the BigQueryIO transform.
>
>> And in the case where one is using a Sink which is a built-in IO module
>> of Beam, modification of the Sink may not be immediately feasible. Is the
>> only recourse in that case to drain a job and start a new one?
>
> The Beam IOs are open source, allowing you to edit the code and build a
> new version locally which you would consume in your project. Dataflow does
> have an optimization that replaces the Pub/Sub source/sink, but all others
> to my knowledge should be based upon the Apache Beam source.
>
>> On Tue, Aug 10, 2021 at 12:54 PM Luke Cwik <lc...@google.com> wrote:
>>
>>> On Tue, Aug 10, 2021 at 8:54 AM Evan Galpin <evan.gal...@gmail.com> wrote:
>>>
>>>> Hi all,
>>>>
>>>> I recently had an experience where a streaming pipeline became
>>>> "clogged" due to invalid data reaching the final step in my pipeline,
>>>> such that the data was causing a non-transient error when writing to
>>>> my Sink. Since the job is a streaming job, the element (bundle) was
>>>> continuously retrying.
>>>>
>>>> What options are there for getting out of this state when it occurs?
>>>
>>> * Modify the sink implementation to do what you want with the bad data
>>> and update the pipeline.
>>> * Cancel the pipeline, update the implementation to handle the bad
>>> records, and rerun from the last known good position, reprocessing
>>> whatever was necessary.
>>>
>>>> I attempted to add validation and update the streaming job to remove
>>>> the bad entity; though the update was successful, I believe the bad
>>>> entity was already checkpointed (?) further downstream in the
>>>> pipeline. What then?
>>>>
>>>> And for something like a database schema and evolving it over time,
>>>> what is the typical solution?
>>>
>>> * Pipeline update containing the newly updated schema before data with
>>> the new schema starts rolling in.
>>> * Use a format and encoding that is agnostic to changes, with a
>>> source/sink that can consume this agnostic format. See this thread [1]
>>> and others like it in the user and dev mailing lists.
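Both of Luke's suggestions (handling bad records just ahead of the sink, and the dead-letter pattern discussed below) rest on Beam's support for multi-output DoFns. A minimal sketch of that guard pattern in the Java SDK might look like the following; the class name, the tags, and the empty-string validity check are illustrative, not from the thread:

```java
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionTuple;
import org.apache.beam.sdk.values.TupleTag;
import org.apache.beam.sdk.values.TupleTagList;

public class ValidateWithDeadLetter {
  // One tag per output: records the sink can accept, and records for the DLQ.
  static final TupleTag<String> VALID = new TupleTag<String>() {};
  static final TupleTag<String> DEAD_LETTER = new TupleTag<String>() {};

  public static void main(String[] args) {
    Pipeline p = Pipeline.create(PipelineOptionsFactory.fromArgs(args).create());

    PCollection<String> input =
        p.apply(Create.of("good-record", "", "another-good-record"));

    // Route each element to the main output or the dead-letter output,
    // instead of letting a bad element fail (and retry forever) in the sink.
    PCollectionTuple routed = input.apply("Validate",
        ParDo.of(new DoFn<String, String>() {
          @ProcessElement
          public void process(@Element String record, MultiOutputReceiver out) {
            if (!record.isEmpty()) { // stand-in for real business validation
              out.get(VALID).output(record);
            } else {
              out.get(DEAD_LETTER).output(record);
            }
          }
        }).withOutputTags(VALID, TupleTagList.of(DEAD_LETTER)));

    routed.get(VALID);       // .apply(...) the real sink goes here
    routed.get(DEAD_LETTER); // .apply(...) a DLQ: Pub/Sub topic, GCS, a table, etc.

    p.run();
  }
}
```

On a running streaming job, a guard like this can be added or tightened via a pipeline update, per the discussion above; as Evan found, though, the edit has to land on the transform that is actually throwing, and elements already checkpointed downstream of the guard may not pass through it again.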
>>>> - Should pipelines mirror a DB schema and do validation of all data
>>>> types in the pipeline?
>>>
>>> Perform validation at critical points in the pipeline, like data
>>> ingress and egress. Catching failed DB insertions via a dead letter
>>> queue works for the cases that are loud and throw exceptions, but
>>> records that are inserted successfully yet are still invalid from a
>>> business-logic standpoint won't be caught without validation.
>>>
>>>> - Should all sinks implement a way to remove non-transient failures
>>>> from retrying and output them via PCollectionTuple (such as with
>>>> BigQuery failed inserts)?
>>>
>>> Yes, dead letter queues (DLQs) are quite a good solution for this,
>>> since they provide a lot of flexibility and allow for a process to fix
>>> things up (typically a manual process).
>>>
>>> 1:
>>> https://lists.apache.org/thread.html/r4b31c8b76fa81dcb130397077b981ab6429f2999b9d864c815214c5a%40%3Cuser.beam.apache.org%3E
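For the BigQuery case raised in that last question, the Java SDK already exposes failed streaming inserts as an output. A rough sketch, assuming an existing PCollection<TableRow> named `rows` and the GCP IO module on the classpath; the table name is a placeholder:

```java
import com.google.api.services.bigquery.model.TableRow;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryInsertError;
import org.apache.beam.sdk.io.gcp.bigquery.InsertRetryPolicy;
import org.apache.beam.sdk.io.gcp.bigquery.WriteResult;
import org.apache.beam.sdk.values.PCollection;

public class BigQueryDeadLetterSketch {
  static void writeWithDeadLetter(PCollection<TableRow> rows) {
    WriteResult result = rows.apply("WriteToBQ",
        BigQueryIO.writeTableRows()
            .to("my-project:my_dataset.my_table") // placeholder table
            .withMethod(BigQueryIO.Write.Method.STREAMING_INSERTS)
            .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_NEVER)
            // Retry only transient errors; permanently failing rows are
            // emitted on the failed-inserts output instead of retrying forever.
            .withFailedInsertRetryPolicy(InsertRetryPolicy.retryTransientErrors())
            .withExtendedErrorInfo());

    PCollection<BigQueryInsertError> failed = result.getFailedInsertsWithErr();
    // failed.apply(...): log it, write it to a dead-letter table or topic,
    // alert on it, etc. The main pipeline keeps flowing either way.
  }
}
```

The default retry policy retries every failure, which reproduces the endless-retry "clog" described at the top of the thread; pairing retryTransientErrors with a dead-letter destination for the failed-inserts output is what lets the sink shed bad rows instead of blocking on them.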