I'm looking for feedback on a new attempt at implementing an exception
handling interface for map transforms as previously discussed on this list
[0] and documented in JIRA [1]. I'd like for users to be able to pass a
function into MapElements, FlatMapElements, etc. that potentially raises an
exception without having to resort to rolling a completely custom ParDo
with an additional output for failing elements.

I have a PR open for review [2] that introduces an exception-handling
interface that mimics the existing `into` and `via` methods of MapElements:

 Result<PCollection<Integer>, String> result = words.apply(
     MapElements.into(TypeDescriptors.integers())
                .via((String word) -> 1 / word.length()) // throws ArithmeticException
                .withExceptions() // returns a MapWithFailures transform
                .into(TypeDescriptors.strings())
                .via(ee -> ee.exception().getMessage()));
 PCollection<String> errors = result.errors();
 PCollection<Integer> output = result.output();



The example above is a bit more complex than I'd like, but gives users full
control over what type handled exceptions are transformed into. It would be
nice if we could simply create an error collection of some type that wraps
the input element and the Exception directly, but there is still no general
solution for encoding subclasses of exception, thus the need for some
exception handling function (which in this example is the lambda passed to
the second `via`).

Let's call the above option 1.
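
To make the intended semantics of option 1 concrete, here is a rough
plain-Java model that runs over a List instead of a PCollection. It is only
a sketch and not the code in the PR: ExceptionElement, MapResult, and
MapWithFailuresSketch are hypothetical stand-ins I made up for illustration,
and no Beam classes are involved.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Function;

// Hypothetical stand-in for the value handed to the exception handler:
// the input element that failed plus the exception it raised.
final class ExceptionElement<T> {
  private final T element;
  private final Exception exception;

  ExceptionElement(T element, Exception exception) {
    this.element = element;
    this.exception = exception;
  }

  T element() { return element; }
  Exception exception() { return exception; }
}

// Hypothetical stand-in for Result: successful outputs and converted errors.
final class MapResult<OutT, ErrT> {
  final List<OutT> output = new ArrayList<>();
  final List<ErrT> errors = new ArrayList<>();
}

final class MapWithFailuresSketch {
  // Applies fn to each element; any exception is passed to the handler and
  // the handler's return value goes to the error collection instead.
  static <InT, OutT, ErrT> MapResult<OutT, ErrT> map(
      List<InT> input,
      Function<InT, OutT> fn,
      Function<ExceptionElement<InT>, ErrT> handler) {
    MapResult<OutT, ErrT> result = new MapResult<>();
    for (InT element : input) {
      try {
        result.output.add(fn.apply(element));
      } catch (Exception e) {
        result.errors.add(handler.apply(new ExceptionElement<>(element, e)));
      }
    }
    return result;
  }

  public static void main(String[] args) {
    MapResult<Integer, String> result = map(
        Arrays.asList("beam", "", "io"),
        word -> 1 / word.length(),          // fails for the empty string
        ee -> ee.exception().getMessage()); // the handler picks the error type
    System.out.println("output: " + result.output);
    System.out.println("errors: " + result.errors);
  }
}
```

The point of the model is that the handler alone decides the error type, so
the transform never has to encode an Exception instance itself.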

If we expect that most users will simply want to preserve the input element
that failed and know general metadata about the exception (className,
message, and stackTrace), we could instead optimize for a default solution
where we return an instance of a new CodableException<InputT> type that
wraps the input element and has additional string fields for className,
message, and stackTrace:

 Result<PCollection<Integer>, CodableException<String>> result = words.apply(
     MapElements.into(TypeDescriptors.integers())
                .via((String word) -> 1 / word.length())
                .withExceptions());
 PCollection<CodableException<String>> errors = result.errors();
 PCollection<Integer> output = result.output();

Let's call this option 2.

It's less user code compared to option 1 and returns a richer error
collection. I believe we could set an appropriate coder behind the scenes:
a composite coder that reuses the coder for the input collection to encode
the InputT instance.
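
As a rough sketch of what the CodableException type from option 2 could look
like, assuming it carries nothing but the input element and three strings:
the field names follow the description above, but the factory method of()
and the accessor names are my own assumptions, and the coder is not shown.

```java
import java.io.PrintWriter;
import java.io.StringWriter;

// Hypothetical shape for the proposed type: the failed input element plus
// string-only metadata, so no Exception instance ever has to be encoded.
final class CodableException<InputT> {
  private final InputT element;
  private final String className;
  private final String message;
  private final String stackTrace;

  private CodableException(
      InputT element, String className, String message, String stackTrace) {
    this.element = element;
    this.className = className;
    this.message = message;
    this.stackTrace = stackTrace;
  }

  // Captures the metadata eagerly and drops the Throwable itself.
  static <InputT> CodableException<InputT> of(InputT element, Throwable t) {
    StringWriter trace = new StringWriter();
    t.printStackTrace(new PrintWriter(trace, true));
    return new CodableException<>(
        element, t.getClass().getName(), t.getMessage(), trace.toString());
  }

  InputT element() { return element; }
  String className() { return className; }
  String message() { return message; }     // may be null, as in Throwable
  String stackTrace() { return stackTrace; }
}
```

With this shape, the only part that needs a non-trivial coder is the element,
which is why reusing the input collection's coder looks feasible.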

I think we'd still need to provide some additional methods, though, for
users who want to set a custom exception handling function and a custom
coder. That would cover cases where a user wants to catch only a particular
subclass of exception, call additional methods of Exception (getCause(),
perhaps), or call methods particular to an Exception subclass. The full API
would end up being more complex than option 1, but it would make the
default case much easier to use.
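
One way a "catch only a particular subclass" handler might be expressed, as a
plain-Java sketch: SelectiveHandlers.only() is a hypothetical helper and not
part of the PR, and rethrowing unmatched exceptions is an assumption about
the behavior we'd want.

```java
import java.util.function.Function;

final class SelectiveHandlers {
  // Returns a handler that converts only exceptions of the given type.
  // Anything else is rethrown (checked exceptions wrapped in
  // RuntimeException) so that it still fails the caller.
  static <E extends Exception, ErrT> Function<Exception, ErrT> only(
      Class<E> type, Function<E, ErrT> handler) {
    return e -> {
      if (type.isInstance(e)) {
        // The handler sees the subclass type, so subclass-specific
        // methods are available to it.
        return handler.apply(type.cast(e));
      }
      if (e instanceof RuntimeException) {
        throw (RuntimeException) e;
      }
      throw new RuntimeException(e);
    };
  }

  public static void main(String[] args) {
    Function<Exception, String> handler =
        only(ArithmeticException.class, ae -> "math: " + ae.getMessage());
    System.out.println(handler.apply(new ArithmeticException("bad input")));
  }
}
```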

If it's not fairly obvious what's going on in either of the above examples,
then we likely haven't figured out an appropriate API yet. Reviews on the
PR or commentary on the above two options would be appreciated.

[0] https://lists.apache.org/thread.html/936ed2a5f2c01be066fd903abf70130625e0b8cf4028c11b89b8b23f@%3Cdev.beam.apache.org%3E
[1] https://issues.apache.org/jira/browse/BEAM-5638
[2] https://github.com/apache/beam/pull/7736
