Interesting ideas! I think you're really homing in on what the Apache Beam API is missing: error handling for bad data and runtime errors. I like your method because it coalesces all the errors into a single collection to be looked at later. It also makes it easy to add a PAssert on the errors collection.
It looks like others are also taking a stab at exception handling: https://medium.com/@vallerylancey/error-handling-elements-in-apache-beam-pipelines-fffdea91af2a

I would also like to propose another option: bringing the Promises/A+ spec into the `apply` method. That would make exception handling more general than supporting it only in the map transforms.

On Fri, Feb 8, 2019 at 9:53 AM Jeff Klukas wrote:

> I'm looking for feedback on a new attempt at implementing an exception-handling interface for map transforms, as previously discussed on this list [0] and documented in JIRA [1]. I'd like users to be able to pass a function that potentially raises an exception into MapElements, FlatMapElements, etc., without having to resort to rolling a completely custom ParDo with an additional output for failing elements.
>
> I have a PR open for review [2] that introduces an exception-handling interface mimicking the existing `into` and `via` methods of MapElements:
>
>     Result<PCollection<Integer>, String> result = words.apply(
>         MapElements.into(TypeDescriptors.integers())
>             .via((String word) -> 1 / word.length()) // throws ArithmeticException for an empty word
>             .withExceptions() // returns a MapWithFailures transform
>             .into(TypeDescriptors.strings())
>             .via(ee -> ee.exception().getMessage()));
>     PCollection<String> errors = result.errors();
>     PCollection<Integer> output = result.output();
>
> The example above is a bit more complex than I'd like, but it gives users full control over what type handled exceptions are transformed into. It would be nice if we could simply create an error collection of some type that wraps the input element and the Exception directly, but there is still no general solution for encoding subclasses of Exception, hence the need for an exception-handling function (in this example, the lambda passed to the second `via`).
>
> Let's call the above option 1.
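To make option 1's semantics concrete, here is a minimal plain-Java sketch with no Beam dependency: a function is applied to each input, successes go to one list, and each failure is passed through a user-supplied handler into an error list. `MapWithFailuresSketch`, `Result`, `ExceptionElement`, and `mapWithFailures` are hypothetical names for illustration only (the `exception()` accessor just mirrors the `ee.exception()` call in the quoted example); this is not the PR's implementation.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

/** Plain-Java sketch of option 1's routing; no Beam types are involved. */
public class MapWithFailuresSketch {

  /** Hypothetical holder for the two outputs (not Beam's Result class). */
  public static final class Result<OutputT, FailureT> {
    public final List<OutputT> output = new ArrayList<>();
    public final List<FailureT> errors = new ArrayList<>();
  }

  /** Hypothetical pairing of a failed input with the exception it raised. */
  public static final class ExceptionElement<InputT> {
    private final InputT element;
    private final Exception exception;

    ExceptionElement(InputT element, Exception exception) {
      this.element = element;
      this.exception = exception;
    }

    public InputT element() { return element; }
    public Exception exception() { return exception; }
  }

  /**
   * Applies fn to each input. Successes go to output; each failure is turned
   * into a user-chosen type by exceptionHandler and goes to errors.
   */
  public static <InputT, OutputT, FailureT> Result<OutputT, FailureT> mapWithFailures(
      List<InputT> inputs,
      Function<InputT, OutputT> fn,
      Function<ExceptionElement<InputT>, FailureT> exceptionHandler) {
    Result<OutputT, FailureT> result = new Result<>();
    for (InputT input : inputs) {
      try {
        result.output.add(fn.apply(input));
      } catch (Exception e) {
        result.errors.add(exceptionHandler.apply(new ExceptionElement<>(input, e)));
      }
    }
    return result;
  }

  public static void main(String[] args) {
    List<String> words = List.of("a", "", "bb");
    Result<Integer, String> result = mapWithFailures(
        words,
        (String word) -> 1 / word.length(),  // "" throws ArithmeticException
        ee -> ee.exception().getMessage());  // keep only the message
    System.out.println(result.output);  // prints [1, 0]
    System.out.println(result.errors);  // prints [/ by zero]
  }
}
```

Because the handler decides the error type (here just the message `String`), the error collection never has to encode an arbitrary Exception subclass, which is exactly the coder problem the quoted email describes.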
>
> If we expect that most users will simply want to preserve the input element that failed and know general metadata about the exception (className, message, and stackTrace), we could instead optimize for a default solution where we return an instance of a new CodableException<InputT> type that wraps the input element and has additional string fields for className, message, and stackTrace:
>
>     Result<PCollection<Integer>, CodableException<String>> result = words.apply(
>         MapElements.into(TypeDescriptors.integers())
>             .via((String word) -> 1 / word.length())
>             .withExceptions());
>     PCollection<CodableException<String>> errors = result.errors();
>     PCollection<Integer> output = result.output();
>
> Let's call this option 2.
>
> It requires less user code than option 1 and returns a richer error collection. I believe we'd be able to set an appropriate coder behind the scenes: a composite coder that reuses the coder of the input collection to encode the InputT instance.
>
> I think we'd still need to provide some additional methods, though, for users who want to set a custom exception-handling function and custom coder. That would cover needs such as catching only a particular subclass of Exception, or accessing additional methods of Exception (getCause(), perhaps) or methods specific to an Exception subclass. The full API would end up being more complex than option 1, but it does make the default case much easier to use.
>
> If it's not fairly obvious what's going on in either of the above examples, then we likely haven't figured out an appropriate API yet. Reviews on the PR or commentary on the above two options would be appreciated.
>
> [0] https://lists.apache.org/thread.html/936ed2a5f2c01be066fd903abf70130625e0b8cf4028c11b89b8b23f@%3Cdev.beam.apache.org%3E
> [1] https://issues.apache.org/jira/browse/BEAM-5638
> [2] https://github.com/apache/beam/pull/7736
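Similarly, the CodableException<InputT> type from option 2 can be sketched as a plain value class that keeps the failed input and flattens the exception into the three string fields the email lists. This is a hypothetical illustration, not an existing Beam class; the `of` factory and the accessor names are assumptions. Since every field except the input is a String, a coder for it would only need the input collection's coder plus string coders, which is the composite-coder idea in option 2.

```java
import java.io.PrintWriter;
import java.io.StringWriter;

/**
 * Hypothetical sketch of option 2's CodableException<InputT>: the failed input
 * plus string-only exception metadata. Not an existing Beam class, and despite
 * the name it does not extend Exception; it only carries data.
 */
public final class CodableException<InputT> {
  private final InputT input;
  private final String className;
  private final String message;
  private final String stackTrace;

  private CodableException(InputT input, String className, String message, String stackTrace) {
    this.input = input;
    this.className = className;
    this.message = message;
    this.stackTrace = stackTrace;
  }

  /** Flattens e into strings so only the input needs a non-string coder. */
  public static <InputT> CodableException<InputT> of(InputT input, Exception e) {
    StringWriter trace = new StringWriter();
    e.printStackTrace(new PrintWriter(trace, true));
    return new CodableException<>(input, e.getClass().getName(), e.getMessage(), trace.toString());
  }

  public InputT input() { return input; }
  public String className() { return className; }
  public String message() { return message; }
  public String stackTrace() { return stackTrace; }

  public static void main(String[] args) {
    String word = "";
    try {
      System.out.println(1 / word.length());
    } catch (ArithmeticException e) {
      CodableException<String> failure = CodableException.of(word, e);
      System.out.println(failure.className());  // prints java.lang.ArithmeticException
      System.out.println(failure.message());    // prints / by zero
    }
  }
}
```

A custom handler that catches only a particular subclass, or reads getCause(), would need the original Exception object, which this flattened form deliberately drops; that is the trade-off behind the extra methods the email mentions.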
