I agree that chaining of transforms that might produce errors is a nice
pattern to be able to support, so I very much do want to provide something
like the promise interface you've described.
My currently posted PR supports this by having error-producing transforms
return a WithExceptions.Result collection, which gives access to the
"output" and "error" collections and also provides an `errorsTo` method
that appends the error collection to a list and returns the output
collection for further chaining:
PCollection<Integer> input = ...
List<PCollection<Map<String, String>>> errorCollections = new ArrayList<>();
PCollection<SomeClass> output = input
    .apply(MapElements...withExceptions()...)
    .errorsTo(errorCollections)
    .apply(MapElements...withExceptions()...)
    .errorsTo(errorCollections);
PCollection<Map<String, String>> errors =
    PCollectionList.of(errorCollections)
        .apply("FlattenErrorCollections", Flatten.pCollections());
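A minimal plain-Java model can make the `errorsTo` control flow concrete without a runner. It assumes in-memory lists in place of PCollections and uses hypothetical names (`ErrorsToSketch`, `Stage`, `ListResult`, `tryMap`, `flatten`) that only imitate the shape of `WithExceptions.Result`; it is not the PR's code or a Beam API.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

// Plain-Java model of the errorsTo chaining pattern. All names are
// hypothetical; in-memory lists stand in for PCollections.
public class ErrorsToSketch {

  /** A step's input elements, standing in for a PCollection. */
  public static final class Stage<T> {
    public final List<T> elements;

    public Stage(List<T> elements) {
      this.elements = elements;
    }

    /** Applies fn to each element, routing any thrown exception to the error list. */
    public <OutputT> ListResult<OutputT> tryMap(Function<T, OutputT> fn) {
      List<OutputT> output = new ArrayList<>();
      List<String> errors = new ArrayList<>();
      for (T element : elements) {
        try {
          output.add(fn.apply(element));
        } catch (RuntimeException e) {
          errors.add(element + ": " + e);
        }
      }
      return new ListResult<>(output, errors);
    }
  }

  /** Successful outputs plus error records, loosely mirroring WithExceptions.Result. */
  public static final class ListResult<OutputT> {
    public final List<OutputT> output;
    public final List<String> errors;

    ListResult(List<OutputT> output, List<String> errors) {
      this.output = output;
      this.errors = errors;
    }

    /** Appends this step's errors to a shared list and returns the outputs for chaining. */
    public Stage<OutputT> errorsTo(List<List<String>> errorCollections) {
      errorCollections.add(errors);
      return new Stage<>(output);
    }
  }

  /** Concatenates the per-step error lists, playing the role of Flatten.pCollections(). */
  public static List<String> flatten(List<List<String>> errorCollections) {
    List<String> all = new ArrayList<>();
    errorCollections.forEach(all::addAll);
    return all;
  }
}
```

Chaining `tryMap` and `errorsTo` twice leaves the final `Stage` with only the elements that survived both steps, while the shared list collects one error list per step, ready to be flattened.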
The need for collections with full type information and coders places
different constraints on Beam's APIs. Deeper changes may well be possible
(I believe Reuven previously suggested investigating building support for
error handling into the DoFn interface), but I see that as an independent
project of larger scope than this effort. Given the current structure of
DoFns, PTransforms, and coders, the exception handler has to live within
the DoFn's own processing logic in order to produce a codable error
collection that can be passed to later processing steps.
To restate, my proposal is about establishing a pattern that makes it easy
for users to inject exception handling code into a DoFn wrapped in a
PTransform and get an error collection out. It's an approach that's viable
in the short term without significant changes to deeper layers of the Beam
Java SDK. My intention with the PR was to factor as much reusable logic as
possible out into the new WithExceptions class, so that other transforms
within the SDK and user-defined transforms could use the same pattern that
we're building into MapElements and FlatMapElements.
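One way to picture that factoring is a single try/catch core that both a map-style and a flatMap-style operation delegate to. This is a plain-Java sketch with hypothetical names (`SharedHandlingSketch`, `Split`, `processAll`, `map`, `flatMap`); it does not reproduce the contents of the PR's `WithExceptions` class.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.function.Function;

// Plain-Java model of sharing one try/catch core between a map-style and a
// flatMap-style operation. All names are hypothetical.
public class SharedHandlingSketch {

  /** Outputs plus error strings from one operation. */
  public static final class Split<OutputT> {
    public final List<OutputT> output = new ArrayList<>();
    public final List<String> errors = new ArrayList<>();
  }

  /** Shared core: fn produces zero or more outputs per element, or throws. */
  static <InputT, OutputT> Split<OutputT> processAll(
      List<InputT> input, Function<InputT, List<OutputT>> fn) {
    Split<OutputT> split = new Split<>();
    for (InputT element : input) {
      try {
        // fn returns a complete list before anything is added, so a failing
        // element contributes no partial output.
        split.output.addAll(fn.apply(element));
      } catch (RuntimeException e) {
        split.errors.add(element + ": " + e);
      }
    }
    return split;
  }

  /** Map-style wrapper: exactly one output per successful element. */
  public static <InputT, OutputT> Split<OutputT> map(
      List<InputT> input, Function<InputT, OutputT> fn) {
    return processAll(input, element -> Collections.singletonList(fn.apply(element)));
  }

  /** FlatMap-style wrapper: the user function already returns a list. */
  public static <InputT, OutputT> Split<OutputT> flatMap(
      List<InputT> input, Function<InputT, List<OutputT>> fn) {
    return processAll(input, fn);
  }
}
```

Only `processAll` knows about exceptions, so adding another wrapper (a filter, say) would not need its own catch block.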
Sam - Does the above seem to provide the kind of functionality you like
from promises? Do you see ways to evolve this approach to better
incorporate successful ideas from promises? Or do you see opportunities for
deeper changes that we should pursue instead of the above?
On Mon, Feb 11, 2019 at 5:17 PM Sam Rohde wrote:
> Sure, I was thinking of treating the apply as a promise (making use of
> your CodableException idea as well):
>
> ```
> PCollection<...> result = words.apply(new SomeUserDoFn())
>     .then(new SomeOtherDoFn())
>     .then(new OtherDoFn(),
>         // Error Handler
>         (CodableException<...> e) -> {
>           logger.info(e.getMessage());
>           return e;
>         });
> ```
>
> The idea is to treat the pipeline with each apply as an asynchronous
> operation where each step can either be "fulfilled" or "rejected". The
> promises can then be chained together like above.
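For comparison, the JDK's `CompletableFuture` already offers this fulfilled/rejected chaining for a single value: once a `thenApply` stage throws, later `thenApply` stages are skipped and `exceptionally` receives the failure. The class and method names below are hypothetical, and the sketch works on one value at a time, whereas the proposal above would chain transforms over whole collections, so treat it only as an analogy.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;

// Per-value analogy for promise-style chaining using the JDK's CompletableFuture.
public class PromiseAnalogy {

  public static String process(String word) {
    return CompletableFuture.completedFuture(word)
        .thenApply(String::trim)
        .thenApply(w -> 10 / w.length())   // "rejects" when the trimmed word is empty
        .thenApply(n -> "ok:" + n)         // skipped once an earlier stage has failed
        .exceptionally(e -> {
          // Failures thrown inside thenApply reach this handler wrapped in a
          // CompletionException; unwrap it to report the original type.
          Throwable cause =
              (e instanceof CompletionException && e.getCause() != null) ? e.getCause() : e;
          return "error:" + cause.getClass().getSimpleName();
        })
        .join();
  }
}
```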
>
>
> On Mon, Feb 11, 2019 at 1:47 PM Jeff Klukas wrote:
>
>> Vallery Lancey's post is definitely one of the viewpoints incorporated
>> into this approach. I neglected to include that link in this iteration, but
>> it was discussed in the previous thread.
>>
>> Can you explain more about "another option that adds A+ Promise spec into
>> the apply method"? I'm failing to parse what that means.
>>
>> On Mon, Feb 11, 2019 at 4:23 PM Sam Rohde wrote:
>>
>>> Interesting ideas! I think you're really homing in on what the Apache
>>> Beam API is missing: error handling for bad data and runtime errors. I like
>>> your method because it coalesces all the errors into a single collection to
>>> be looked at later. It also makes it easy to add a PAssert on the errors
>>> collection.
>>>
>>> Looks like others are also taking a stab at exception handling:
>>> https://medium.com/@vallerylancey/error-handling-elements-in-apache-beam-pipelines-fffdea91af2a
>>>
>>> I would also like to add another option that adds A+ Promise spec into
>>> the apply method. This makes exception handling more general than with only
>>> the Map method.
>>>
>>> On Fri, Feb 8, 2019 at 9:53 AM Jeff Klukas wrote:
>>>
>>>> I'm looking for feedback on a new attempt at implementing an exception
>>>> handling interface for map transforms as previously discussed on this list
>>>> [0] and documented in JIRA [1]. I'd like for users to be able to pass a
>>>> function into MapElements, FlatMapElements, etc. that potentially raises an
>>>> exception without having to resort to rolling a completely custom ParDo
>>>> with an additional output for failing elements.
>>>>
>>>> I have a PR open for review [2] that introduces an exception-handling
>>>> interface that mimics the existing `into` and `via` methods of MapElements:
>>>>
>>>> Result<PCollection<Integer>, String> result = words.apply(
>>>>     MapElements.into(TypeDescriptors.integers())
>>>>         .via((String word) -> 1 / word.length())  // throws ArithmeticException for empty words
>>>>         .withExceptions()  // returns a MapWithFailures transform
>>>>         .into(TypeDescriptors.strings())
>>>>         .via(e -> e.exception().getMessage()));
>>>> PCollection<String> errors = result.errors();
>>>> PCollection<Integer> output = result.output();
>>>>
>>>> The example above is a bit more complex than I'd like, but it gives users
>>>> full control over the type that handled exceptions are transformed into.
>>>> It would be nice if we could simply create an error collection of some
>>>> type that wraps the input element and the Exception directly, but there is
>>>> still no general solution for encoding subclasses of Exception, hence the
>>>> need for some exception handling function (in this example, the lambda
>>>> passed to the second `via`).
>>>>
>>>> Let's call the above option 1.
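Stripped of Beam types, option 1 amounts to calling a user-supplied handler inside the catch block, while the exception object is still in hand, so that only the handler's encodable result has to leave the processing function. This plain-Java model uses hypothetical names (`Option1Sketch`, `Outcome`, `mapWithExceptions`) and lists instead of PCollections.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.BiFunction;
import java.util.function.Function;

// Plain-Java model of option 1. All names are hypothetical.
public class Option1Sketch {

  /** Successful outputs and handler-produced error values. */
  public static final class Outcome<OutputT, ErrorT> {
    public final List<OutputT> output = new ArrayList<>();
    public final List<ErrorT> errors = new ArrayList<>();
  }

  public static <InputT, OutputT, ErrorT> Outcome<OutputT, ErrorT> mapWithExceptions(
      List<InputT> input,
      Function<InputT, OutputT> fn,
      BiFunction<InputT, Exception, ErrorT> handler) {
    Outcome<OutputT, ErrorT> outcome = new Outcome<>();
    for (InputT element : input) {
      try {
        outcome.output.add(fn.apply(element));
      } catch (RuntimeException e) {
        // The handler runs while the exception object is still available,
        // so only its (encodable) result has to leave this method.
        outcome.errors.add(handler.apply(element, e));
      }
    }
    return outcome;
  }
}
```

With the same `1 / word.length()` function, an empty word lands in `errors` as its exception message, while `"a"` and `"abc"` map to `1` and `0` (integer division).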
>>>>
>>>> If we expect that most users will simply want to preserve the failing
>>>> input element and know general metadata about the exception (className,
>>>> message, and stackTrace), we could instead optimize for a default solution
>>>> that returns an instance of a new CodableException<InputT> type, which
>>>> wraps the input element and has additional string fields for className,
>>>> message, and stackTrace:
>>>>
>>>> Result<PCollection<Integer>, CodableException<String>> result = words.apply(
>>>>     MapElements.into(TypeDescriptors.integers())
>>>>         .via((String word) -> 1 / word.length())
>>>>         .withExceptions());
>>>> PCollection<CodableException<String>> errors = result.errors();
>>>> PCollection<Integer> output = result.output();
>>>>
>>>> Let's call this option 2.
>>>>
>>>> It's less user code than option 1 and returns a richer error collection.
>>>> I believe we'd be able to set an appropriate coder behind the scenes: a
>>>> composite coder that reuses the input collection's coder to encode the
>>>> InputT instance.
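A plain-Java model of option 2's default path: an error record holding the failing input plus the exception's class name, message, and stack trace as strings, and a composite coder that delegates the input element to whatever coder the input collection already uses. `CompositeCoderSketch`, `ElementCoder`, `ErrorRecord`, `ErrorRecordCoder`, and `STRING_CODER` are hypothetical stand-ins, not Beam's `Coder` classes, and the length-prefixed UTF-8 format is just one possible encoding.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.PrintWriter;
import java.io.StringWriter;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;

// Hypothetical plain-Java stand-ins for option 2's error record and a composite
// coder; none of these types are Beam's Coder classes.
public class CompositeCoderSketch {

  public interface ElementCoder<T> {
    void encode(T value, DataOutputStream out) throws IOException;
    T decode(DataInputStream in) throws IOException;
  }

  /** The failing input plus exception metadata kept as plain strings. */
  public static final class ErrorRecord<InputT> {
    public final InputT input;
    public final String className;
    public final String message; // may be null, like Throwable.getMessage()
    public final String stackTrace;
    ErrorRecord(InputT input, String className, String message, String stackTrace) {
      this.input = input;
      this.className = className;
      this.message = message;
      this.stackTrace = stackTrace;
    }
    /** Captures the metadata at the point where the exception is caught. */
    public static <InputT> ErrorRecord<InputT> of(InputT input, Exception e) {
      StringWriter trace = new StringWriter();
      PrintWriter writer = new PrintWriter(trace);
      e.printStackTrace(writer);
      writer.flush();
      return new ErrorRecord<>(input, e.getClass().getName(), e.getMessage(), trace.toString());
    }
  }

  /** Encodes the input with the input collection's coder, then the string fields. */
  public static final class ErrorRecordCoder<InputT> implements ElementCoder<ErrorRecord<InputT>> {
    private final ElementCoder<InputT> inputCoder;
    public ErrorRecordCoder(ElementCoder<InputT> inputCoder) {
      this.inputCoder = inputCoder;
    }
    @Override
    public void encode(ErrorRecord<InputT> value, DataOutputStream out) throws IOException {
      inputCoder.encode(value.input, out);
      writeNullable(value.className, out);
      writeNullable(value.message, out);
      writeNullable(value.stackTrace, out);
    }

    @Override
    public ErrorRecord<InputT> decode(DataInputStream in) throws IOException {
      // Java evaluates arguments left to right, matching the encode order.
      return new ErrorRecord<>(
          inputCoder.decode(in), readNullable(in), readNullable(in), readNullable(in));
    }
  }

  /** Length-prefixed UTF-8; a length of -1 marks null. */
  static void writeNullable(String s, DataOutputStream out) throws IOException {
    byte[] bytes = s == null ? new byte[0] : s.getBytes(StandardCharsets.UTF_8);
    out.writeInt(s == null ? -1 : bytes.length);
    out.write(bytes);
  }

  static String readNullable(DataInputStream in) throws IOException {
    int length = in.readInt();
    if (length < 0) {
      return null;
    }
    byte[] bytes = new byte[length];
    in.readFully(bytes);
    return new String(bytes, StandardCharsets.UTF_8);
  }

  public static final ElementCoder<String> STRING_CODER = new ElementCoder<String>() {
    public void encode(String v, DataOutputStream out) throws IOException { writeNullable(v, out); }
    public String decode(DataInputStream in) throws IOException { return readNullable(in); }
  };

  public static <T> T roundTrip(ElementCoder<T> coder, T value) {
    try {
      ByteArrayOutputStream bytes = new ByteArrayOutputStream();
      coder.encode(value, new DataOutputStream(bytes));
      return coder.decode(new DataInputStream(new ByteArrayInputStream(bytes.toByteArray())));
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
  }
}
```

Because the composite coder only needs an `ElementCoder<InputT>` for the input, the same record type works for any input collection whose coder is already known.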
>>>>
>>>> I think we'd still need to provide some additional methods, though, for
>>>> users who want to set a custom exception handling function and a custom
>>>> coder. That would cover cases where a user wants to catch only a
>>>> particular subclass of Exception, call additional methods of Exception
>>>> (getCause(), perhaps), or call methods specific to an Exception subclass.
>>>> The full API would end up being more complex than option 1, but it does
>>>> make the default case much easier to use.
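A custom handler of that kind might treat only one exception subclass as error output, let every other failure propagate so the job still fails loudly, and walk `getCause()` to report the underlying problem. This is a plain-Java sketch with hypothetical names (`SelectiveCatchSketch`, `rootCause`, `mapCatching`), not a proposed API.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

// Sketch of a custom handler that keeps only one exception subclass as error
// output and lets every other failure propagate. All names are hypothetical.
public class SelectiveCatchSketch {

  /** Follows getCause() to the innermost exception. */
  public static Throwable rootCause(Throwable t) {
    Throwable current = t;
    while (current.getCause() != null) {
      current = current.getCause();
    }
    return current;
  }

  /**
   * Applies fn to each element, appending successes to output. Exceptions of
   * type handled become error strings; any other exception is rethrown.
   */
  public static <InputT, OutputT> List<String> mapCatching(
      List<InputT> input,
      Function<InputT, OutputT> fn,
      Class<? extends RuntimeException> handled,
      List<OutputT> output) {
    List<String> errors = new ArrayList<>();
    for (InputT element : input) {
      try {
        output.add(fn.apply(element));
      } catch (RuntimeException e) {
        if (!handled.isInstance(e)) {
          throw e; // unexpected failure types still surface
        }
        Throwable root = rootCause(e);
        errors.add(element + ": " + root.getClass().getSimpleName() + ": " + root.getMessage());
      }
    }
    return errors;
  }
}
```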
>>>>
>>>> If it's not fairly obvious what's going on in either of the above
>>>> examples, then we likely haven't figured out an appropriate API yet.
>>>> Reviews on the PR or commentary on the above two options would be
>>>> appreciated.
>>>>
>>>> [0] https://lists.apache.org/thread.html/936ed2a5f2c01be066fd903abf70130625e0b8cf4028c11b89b8b23f@%3Cdev.beam.apache.org%3E
>>>> [1] https://issues.apache.org/jira/browse/BEAM-5638
>>>> [2] https://github.com/apache/beam/pull/7736
>>>>
>>>