Using provenance to explain bad data in a general manner requires deep
support from your data processing engine and is still a research topic (for
example,
https://blog.acolyer.org/2017/02/01/explaining-outputs-in-modern-data-analytics/)
so I wouldn't go down that path. I expect that putting in the effort to
make something customized for your domain will pay off, anyhow.

Eugene, are the best practices you mention somewhat like this section of
the PTransform Style Guide?
https://beam.apache.org/contribute/ptransform-style-guide/#runtime-errors-and-data-consistency

That document is under "contribute" instead of the programming guide; it
may be a bit terse. But the same guidelines we use for our contributions
are naturally also what we'd encourage all PTransform authors to use.
Adding [email protected] because this is veering into that sort of
territory.

Kenn

On Thu, Jun 22, 2017 at 6:40 PM, Eugene Kirpichov <[email protected]>
wrote:

> I don't think it's possible to come up with something that will catch
> failing elements at the level of whole PTransforms, because:
> - You can't return elements failing some internal transform of the
> composite PTransform, because 1) they are a private implementation detail
> of that transform and should not be exposed 2) their type is unknown and
> different from the type of input elements
> - That leaves you only with the option of returning elements of (one of)
> the transform's input(s) that in some way caused something else inside the
> transform to fail. Tracking this across a GroupByKey is, I believe,
> near-impossible (or if possible, it would be very hard to interpret the
> result of that), e.g. imagine you're computing a sum of your elements, and
> then a downstream function fails declaring that the sum is too large: which
> of the elements that went into the sum should the failure be attributed to?
>
> I think the only practical thing you can do here is 1) come up with a
> utility "DoFn that emits failing elements to an additional output tag" 2)
> come up with "best practices" for structuring your composite transform in
> ways that surface failures nicely, without violating abstraction boundaries
> (per bullet 1 above). Not sure what the best design for that would look
> like, but seems doable, suggestions welcome :)
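
Stripped of the Beam plumbing, the utility in bullet 1 is just exception routing: apply the user's function, and divert any element that raises to a separate failure output. A minimal plain-Python sketch of that pattern (all names here are hypothetical; in a real pipeline the two lists would be the main output and an additional tagged output of a single ParDo):

```python
def run_with_failures(fn, elements):
    """Apply fn to each element; divert elements that raise to a failure list.

    Plain-Python sketch of the "emit failing elements to an additional
    output tag" pattern.
    """
    good, failed = [], []
    for element in elements:
        try:
            good.extend(fn(element))
        except Exception:
            failed.append(element)
    return good, failed

def third_char_is_an_a(word):
    if word[2] == 'a':
        return [word]
    return []

good, failed = run_with_failures(third_char_is_an_a, ['league', 'an', 'aardvark'])
# 'an' is too short to index, so it lands in the failure list.
```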
>
> On Thu, Jun 22, 2017 at 6:25 PM Dmitry Demeshchuk <[email protected]>
> wrote:
>
>> I guess an even more ideal approach would be something like this, which
>> also seems more doable:
>>
>> def third_char_is_an_a(word):
>>     if word[2] == 'a':
>>         return [word]
>>     return []
>>
>> output = (p
>>         | 'read' >> ReadFromText('gs://dataflow-samples/shakespeare/kinglear.txt')
>>         | 'recorder' >> CaptureDownstreamFailures('my_failure_key')
>>         | 'split' >> beam.FlatMap(lambda x: re.findall(r'[A-Za-z\']+', x))
>>         | 'find_words_where_third_char_is_an_a' >> beam.FlatMap(third_char_is_an_a))
>>
>> output | 'failures' >> Failures('my_failure_key') >> WriteToText('gs://my-bucket/failures')
>> output | 'write' >> WriteToText('gs://my-bucket/output')
>>
>>
>> On Thu, Jun 22, 2017 at 5:34 PM, Dmitry Demeshchuk <[email protected]>
>> wrote:
>>
>>> Hi folks,
>>>
>>> I’ve been recently struggling with the following problem.
>>>
>>> Data is quirky. It can have unicode, it can have poor escaping, or can
>>> be truncated, etc. Sometimes it happens due to problems with the processing
>>> code, sometimes it’s the data producing code, sometimes both.
>>>
>>> That said, if my data pipeline fails somewhere, I’d like to dump the
>>> problematic piece of data somewhere for later analysis. Suppose we have a
>>> pipeline:
>>>
>>> def third_char_is_an_a(word):
>>>     if word[2] == 'a':
>>>         return [word]
>>>     return []
>>>
>>> output = (p
>>>         | 'read' >> ReadFromText('gs://dataflow-samples/shakespeare/kinglear.txt')
>>>         | 'split' >> beam.FlatMap(lambda x: re.findall(r'[A-Za-z\']+', x))
>>>         | 'find_words_where_third_char_is_an_a' >> beam.FlatMap(third_char_is_an_a))
>>>
>>> output | 'write' >> WriteToText('gs://my-bucket/output')
>>>
>>> This pipeline will fail, because I'm an idiot, and English has some
>>> words shorter than 3 characters. What I'd like, however, is to be able
>>> to easily record these failures. For example, I can just rewrite the
>>> whole pipeline:
>>>
>>> def third_char_is_an_a(word):
>>>     try:
>>>         if word[2] == 'a':
>>>             return []
>>>         return []
>>>     except Exception:
>>>         return [word]
>>>
>>> output = (p
>>>         | 'read' >> ReadFromText('gs://dataflow-samples/shakespeare/kinglear.txt')
>>>         | 'split' >> beam.FlatMap(lambda x: re.findall(r'[A-Za-z\']+', x))
>>>         | 'find_words_where_third_char_is_an_a' >> beam.FlatMap(third_char_is_an_a))
>>>
>>> output | 'write' >> WriteToText('gs://my-bucket/failed_words')
>>>
>>> If I wanted to still keep the succeeded results, I’d normally need to
>>> write something more complicated:
>>>
>>> def third_char_is_an_a(word):
>>>     try:
>>>         if word[2] == 'a':
>>>             return [(1, word)]
>>>         return []
>>>     except Exception:
>>>         return [(0, word)]
>>>
>>> output = (p
>>>         | 'read' >> ReadFromText('gs://dataflow-samples/shakespeare/kinglear.txt')
>>>         | 'split' >> beam.FlatMap(lambda x: re.findall(r'[A-Za-z\']+', x))
>>>         | 'find_words_where_third_char_is_an_a' >> beam.FlatMap(third_char_is_an_a)
>>>         | 'partition' >> beam.Partition(lambda x, n: x[0], 2))
>>>
>>> successes = output[1] | 'extract_successes' >> beam.Map(lambda x: x[1])
>>> failures = output[0] | 'extract_failures' >> beam.Map(lambda x: x[1])
>>>
>>> successes | 'write_successes' >> WriteToText('gs://my-bucket/output')
>>> failures | 'write_failures' >> WriteToText('gs://my-bucket/failed_words')
>>>
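
Stripped of the Beam plumbing, the tag-then-partition workaround above amounts to the following plain-Python logic (same tagging scheme as the snippet; the sample line is made up). Note that extracting the word means taking `x[1]` for both partitions:

```python
import re

def third_char_is_an_a(word):
    # Tag each word: (1, word) on success, (0, word) when indexing fails.
    try:
        if word[2] == 'a':
            return [(1, word)]
        return []
    except Exception:
        return [(0, word)]

words = re.findall(r"[A-Za-z']+", 'so be it an award')
tagged = [t for w in words for t in third_char_is_an_a(w)]
successes = [w for tag, w in tagged if tag == 1]  # words whose third char is 'a'
failures = [w for tag, w in tagged if tag == 0]   # words too short to index
```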
>>> Would it be possible to instead make a generic PTransform, named
>>> something like DoOrRecordBadData, that allows doing something like this?
>>>
>>> def third_char_is_an_a(word):
>>>     if word[2] == 'a':
>>>         return [word]
>>>     return []
>>>
>>> failure_sink = WriteToText('gs://my-bucket/failed_words')
>>>
>>> output = (p
>>>         | 'read' >> ReadFromText('gs://dataflow-samples/shakespeare/kinglear.txt')
>>>         | 'split' >> beam.FlatMap(lambda x: re.findall(r'[A-Za-z\']+', x))
>>>         | 'find_words_where_third_char_is_an_a' >>
>>>             DoOrRecordBadData(beam.FlatMap(third_char_is_an_a), failure_sink=failure_sink))
>>>
>>> output | 'write' >> WriteToText('gs://my-bucket/output')
>>>
>>> I’ve been trying to think of a way to implement that for any arbitrary
>>> PTransform, but in vain. It’s easy enough to implement that for a DoFn,
>>> and maybe that’s what I should do for starters?
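
For the DoFn-level starter version, the wrapper could be as simple as catching exceptions around the wrapped function and handing the raw element to the sink. A plain-Python sketch (`do_or_record_bad_data` and the callable-sink interface are hypothetical; a Beam version would instead emit failures to a tagged output feeding the sink transform):

```python
def do_or_record_bad_data(fn, failure_sink):
    # Wrap an element-wise fn so that elements raising an exception are
    # handed to failure_sink instead of crashing the pipeline.
    def wrapped(element):
        try:
            return fn(element)
        except Exception:
            failure_sink(element)
            return []
    return wrapped

def third_char_is_an_a(word):
    if word[2] == 'a':
        return [word]
    return []

bad_words = []
safe_fn = do_or_record_bad_data(third_char_is_an_a, bad_words.append)
results = [out for word in ['league', 'an'] for out in safe_fn(word)]
# 'an' raises IndexError inside third_char_is_an_a and ends up in bad_words.
```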
>>>
>>> Also, this raises a second question. Can we somehow report the failures
>>> back to the upstream step? Say, instead of recording the actual word that
>>> failed, I’d rather record the initial data. For example:
>>>
>>> def third_char_is_an_a(word):
>>>     if word[2] == 'a':
>>>         return [word]
>>>     return []
>>>
>>> output = (p
>>>         | 'read' >> ReadFromText('gs://dataflow-samples/shakespeare/kinglear.txt')
>>>         | 'reporter' >> ReportDownstreamFailures(WriteToText('gs://my-bucket/failed_words'))
>>>         | 'split' >> beam.FlatMap(lambda x: re.findall(r'[A-Za-z\']+', x))
>>>         | 'find_words_where_third_char_is_an_a' >> beam.FlatMap(third_char_is_an_a))
>>>
>>> output | 'write' >> WriteToText('gs://my-bucket/output')
>>>
>>> My guess is that the second option isn't possible, but I'm still
>>> learning Beam, so I may be wrong about that. I think either option (but
>>> especially the second one) would be super useful down the road for stream
>>> processing of data, so that pipelines can have some sort of dumping
>>> ground for problematic items (which can then be looked into by human
>>> beings), while the pipeline as a whole keeps running.
>>>
>>> Any thoughts would be very appreciated.
>>> --
>>> Best regards,
>>> Dmitry Demeshchuk.
>>>
>>
>>
>>
>> --
>> Best regards,
>> Dmitry Demeshchuk.
>>
>
