>
> It is likely that the incorrect transform was edited...
>

It appears you're right; I tried to reproduce but  this time was able to
clear the issue by making "the same" code change and updating the
pipeline.  I believe it was just a change in the wrong place in code.

Good to know this works! Thanks Luke 🙏

On Tue, Aug 10, 2021 at 1:19 PM Luke Cwik <lc...@google.com> wrote:

>
>
> On Tue, Aug 10, 2021 at 10:11 AM Evan Galpin <evan.gal...@gmail.com>
> wrote:
>
>> Thanks for your responses Luke. One point I have confusion over:
>>
>> * Modify the sink implementation to do what you want with the bad data
>>> and update the pipeline.
>>>
>>
>>  I modified the sink implementation to ignore the specific error that was
>> the problem and updated the pipeline. The update succeeded.  However, the
>> problem persisted.  How might that happen?  Is there caching involved?
>> Checkpointing? I changed the very last method called in the pipeline in
>> order to ensure the validation would apply, but the problem persisted.
>>
>
> It is likely that the incorrect transform was edited. You should take a
> look at the worker logs and find the exception that is being thrown and
> find the step name it is associated with (e.g.
> BigQueryIO/Write/StreamingInserts) and browse the source for a
> "StreamingInserts" transform that is applied from the "Write" transform
> which is applied from the BigQueryIO transform.
>
>
>>
>> And in the case where one is using a Sink which is a built-in IO module
>> of Beam, modification of the Sink may not be immediately feasible. Is the
>> only recourse in that case to drain a job an start a new one?
>>
>> The Beam IOs are open source allowing you to edit the code and build a
> new version locally which you would consume in your project. Dataflow does
> have an optimization and replaces the pubsub source/sink but all others to
> my knowledge should be based upon the Apache Beam source.
>
>
>> On Tue, Aug 10, 2021 at 12:54 PM Luke Cwik <lc...@google.com> wrote:
>>
>>>
>>>
>>> On Tue, Aug 10, 2021 at 8:54 AM Evan Galpin <evan.gal...@gmail.com>
>>> wrote:
>>>
>>>> Hi all,
>>>>
>>>> I recently had an experience where a streaming pipeline became
>>>> "clogged" due to invalid data reaching the final step in my pipeline such
>>>> that the data was causing a non-transient error when writing to my Sink.
>>>> Since the job is a streaming job, the element (bundle) was continuously
>>>> retrying.
>>>>
>>>> What options are there for getting out of this state when it occurs?
>>>>
>>>
>>> * Modify the sink implementation to do what you want with the bad data
>>> and update the pipeline.
>>> * Cancel the pipeline and update the implementation to handle the bad
>>> records and rerun from last known good position reprocessing whatever was
>>> necessary.
>>>
>>> I attempted to add validation and update the streaming job to remove the
>>>> bad entity; though the update was successful, I believe the bad entity was
>>>> already checkpointed (?) further downstream in the pipeline. What then?
>>>>
>>> And for something like a database schema and evolving it over time, what
>>>> is the typical solution?
>>>>
>>>
>>> * Pipeline update containing newly updated schema before data with new
>>> schema starts rolling in.
>>> * Use a format and encoding that is agnostic to changes with a
>>> source/sink that can consume this agnostic format. See this thread[1] and
>>> others like it in the user and dev mailing lists.
>>>
>>>
>>>> - Should pipelines mirror a DB schema and do validation of all data
>>>> types in the pipeline?
>>>>
>>>
>>> Perform validation at critical points in the pipeline like data ingress
>>> and egress. Insertion of the data into the DB failing via a dead letter
>>> queue works for the cases that are loud and throw exceptions but for the
>>> cases where they are inserted successfully but are still invalid from a
>>> business logic standpoint won't be caught without validation.
>>>
>>>
>>>> - Should all sinks implement a way to remove non-transient failures
>>>> from retrying and output them via PCollectionTuple (such as with BigQuery
>>>> failed inserts)?
>>>>
>>>
>>> Yes, dead letter queues (DLQs) are quite a good solution for this since
>>> it provides a lot of flexibility and allows for a process to fix it up
>>> (typically a manual process).
>>>
>>> 1:
>>> https://lists.apache.org/thread.html/r4b31c8b76fa81dcb130397077b981ab6429f2999b9d864c815214c5a%40%3Cuser.beam.apache.org%3E
>>>
>>

Reply via email to