Thanks for your responses Luke. One point I have confusion over:

> * Modify the sink implementation to do what you want with the bad data
> and update the pipeline.
I modified the sink implementation to ignore the specific error that was
causing the problem and updated the pipeline. The update succeeded, but the
problem persisted. How might that happen? Is there caching involved?
Checkpointing? I changed the very last method called in the pipeline in
order to ensure the validation would apply, and still the problem persisted.

And in the case where one is using a Sink which is a built-in IO module of
Beam, modifying the Sink may not be immediately feasible. Is the only
recourse in that case to drain the job and start a new one?

On Tue, Aug 10, 2021 at 12:54 PM Luke Cwik <lc...@google.com> wrote:

> On Tue, Aug 10, 2021 at 8:54 AM Evan Galpin <evan.gal...@gmail.com> wrote:
>
>> Hi all,
>>
>> I recently had an experience where a streaming pipeline became "clogged"
>> due to invalid data reaching the final step in my pipeline, such that the
>> data was causing a non-transient error when writing to my Sink. Since the
>> job is a streaming job, the element (bundle) was continuously retrying.
>>
>> What options are there for getting out of this state when it occurs?
>
> * Modify the sink implementation to do what you want with the bad data and
> update the pipeline.
> * Cancel the pipeline, update the implementation to handle the bad
> records, and rerun from the last known good position, reprocessing
> whatever is necessary.
>
>> I attempted to add validation and update the streaming job to remove the
>> bad entity; though the update was successful, I believe the bad entity was
>> already checkpointed (?) further downstream in the pipeline. What then?
>>
>> And for something like a database schema and evolving it over time, what
>> is the typical solution?
>
> * Pipeline update containing the newly updated schema before data with the
> new schema starts rolling in.
> * Use a format and encoding that is agnostic to changes, with a source/sink
> that can consume this agnostic format. See this thread [1] and others like
> it in the user and dev mailing lists.
>
>> - Should pipelines mirror a DB schema and do validation of all data types
>> in the pipeline?
>
> Perform validation at critical points in the pipeline, such as data ingress
> and egress. Catching failed DB insertions via a dead letter queue works for
> the cases that are loud and throw exceptions, but records that are inserted
> successfully yet are still invalid from a business-logic standpoint won't
> be caught without validation.
>
>> - Should all sinks implement a way to remove non-transient failures from
>> retrying and output them via PCollectionTuple (such as with BigQuery
>> failed inserts)?
>
> Yes, dead letter queues (DLQs) are quite a good solution for this since
> they provide a lot of flexibility and allow for a process (typically a
> manual one) to fix up the failed records.
>
> 1:
> https://lists.apache.org/thread.html/r4b31c8b76fa81dcb130397077b981ab6429f2999b9d864c815214c5a%40%3Cuser.beam.apache.org%3E
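For concreteness, here is a minimal sketch of the PCollectionTuple / dead
letter queue pattern discussed above, using the Beam Java SDK. The class and
tag names (DeadLetterSketch, ValidateFn, VALID, DEAD_LETTER) and the
validation check are placeholders for illustration, not code from my actual
pipeline, and the sink transforms themselves are omitted:

import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionTuple;
import org.apache.beam.sdk.values.TupleTag;
import org.apache.beam.sdk.values.TupleTagList;

public class DeadLetterSketch {

  // One tag per output: records that passed validation, and the raw
  // payloads that did not (the dead letter output).
  static final TupleTag<String> VALID = new TupleTag<String>() {};
  static final TupleTag<String> DEAD_LETTER = new TupleTag<String>() {};

  // Placeholder validation; in a real pipeline this would mirror whatever
  // the sink or DB schema actually requires.
  static class ValidateFn extends DoFn<String, String> {
    @ProcessElement
    public void processElement(@Element String raw, MultiOutputReceiver out) {
      if (!raw.isEmpty()) {            // stand-in for real schema checks
        out.get(VALID).output(raw);    // continues on to the real sink
      } else {
        // Diverted instead of thrown, so the bundle never retries forever.
        out.get(DEAD_LETTER).output(raw);
      }
    }
  }

  public static void main(String[] args) {
    Pipeline p = Pipeline.create(PipelineOptionsFactory.fromArgs(args).create());

    PCollectionTuple results =
        p.apply(Create.of("good record", ""))  // stand-in for the streaming source
         .apply("Validate",
             ParDo.of(new ValidateFn())
                  .withOutputTags(VALID, TupleTagList.of(DEAD_LETTER)));

    PCollection<String> good = results.get(VALID);       // -> real sink
    PCollection<String> bad = results.get(DEAD_LETTER);  // -> side output for manual repair

    // good.apply(...); bad.apply(...);  // sink transforms omitted in this sketch
    p.run();
  }
}

My understanding is that BigQueryIO already exposes this shape via the
WriteResult returned by the write (e.g. getFailedInsertsWithErr() when using
streaming inserts), though I haven't verified the exact configuration
required.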