Vallery Lancey's post is definitely one of the viewpoints incorporated into this approach. I neglected to include that link in this iteration, but it was discussed in the previous thread.
Can you explain more about "another option that adds A+ Promise spec into the apply method"? I'm failing to parse what that means.

On Mon, Feb 11, 2019 at 4:23 PM Sam Rohde wrote:
> Interesting ideas! I think you're really honing in on what the Apache Beam
> API is missing: error handling for bad data and runtime errors. I like your
> method because it coalesces all the errors into a single collection to be
> looked at later. Also easy to add a PAssert on the errors collection.
>
> Looks like others are also taking a stab at exception handling:
> https://medium.com/@vallerylancey/error-handling-elements-in-apache-beam-pipelines-fffdea91af2a
>
> I would also like to add another option that adds A+ Promise spec into the
> apply method. This makes exception handling more general than with only the
> Map method.
>
> On Fri, Feb 8, 2019 at 9:53 AM Jeff Klukas wrote:
>
>> I'm looking for feedback on a new attempt at implementing an exception
>> handling interface for map transforms as previously discussed on this list
>> [0] and documented in JIRA [1]. I'd like for users to be able to pass a
>> function into MapElements, FlatMapElements, etc. that potentially raises an
>> exception without having to resort to rolling a completely custom ParDo
>> with an additional output for failing elements.
>>
>> I have a PR open for review [2] that introduces an exception-handling
>> interface that mimics the existing `into` and `via` methods of MapElements:
>>
>> Result<PCollection<Integer>, String> result = words.apply(
>>     MapElements.into(TypeDescriptors.integers())
>>         .via((String word) -> 1 / word.length()) // throws ArithmeticException
>>         .withExceptions() // returns a MapWithFailures transform
>>         .into(TypeDescriptors.strings())
>>         .via(ee -> ee.exception().getMessage()));
>> PCollection<String> errors = result.errors();
>> PCollection<Integer> output = result.output();
>>
>> The example above is a bit more complex than I'd like, but it gives users
>> full control over the type that handled exceptions are transformed into. It
>> would be nice if we could simply create an error collection of some type
>> that wraps the input element and the Exception directly, but there is still
>> no general solution for encoding subclasses of Exception, hence the need
>> for an exception-handling function (in this example, the lambda passed to
>> the second `via`).
>>
>> Let's call the above option 1.
>>
>> If we expect that most users will simply want to preserve the input
>> element that failed and know general metadata about the exception
>> (className, message, and stackTrace), we could instead optimize for a
>> default solution that returns instances of a new CodableException<InputT>
>> type wrapping the input element, with additional string fields for
>> className, message, and stackTrace:
>>
>> Result<PCollection<Integer>, CodableException<String>> result = words.apply(
>>     MapElements.into(TypeDescriptors.integers())
>>         .via((String word) -> 1 / word.length())
>>         .withExceptions());
>> PCollection<CodableException<String>> errors = result.errors();
>> PCollection<Integer> output = result.output();
>>
>> Let's call this option 2.
>>
>> It requires less user code than option 1 and returns a richer error
>> collection.
>> I believe we'd be able to set an appropriate coder behind the scenes: a
>> composite coder that reuses the input collection's coder to encode the
>> InputT instance.
>>
>> I think we'd still need to provide some additional methods, though, for
>> users who want to set a custom exception-handling function and a custom
>> coder. That would cover cases where a user wants to catch only a
>> particular subclass of Exception, or to access additional methods of
>> Exception (getCause(), perhaps) or methods specific to an Exception
>> subclass. The full API would end up being more complex than option 1,
>> but it does make the default case much easier to use.
>>
>> If it's not fairly obvious what's going on in either of the above
>> examples, then we likely haven't figured out an appropriate API yet.
>> Reviews on the PR or commentary on the above two options would be
>> appreciated.
>>
>> [0] https://lists.apache.org/thread.html/936ed2a5f2c01be066fd903abf70130625e0b8cf4028c11b89b8b23f@%3Cdev.beam.apache.org%3E
>> [1] https://issues.apache.org/jira/browse/BEAM-5638
>> [2] https://github.com/apache/beam/pull/7736
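For readers weighing the two options, here is a plain-Java, in-memory sketch of the behavior each one describes. It is an illustration under stated assumptions, not the PR's code or Beam's API. `ExceptionElement`, `CodableException`, and `MapResult` are hypothetical names, and Java lists stand in for PCollections. The three-argument `map` follows option 1, where a caller-supplied handler decides what each failure becomes. The two-argument overload follows option 2, where a default handler keeps the failed input plus className, message, and stackTrace.

```java
import java.io.PrintWriter;
import java.io.StringWriter;
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

// Hypothetical pairing of a failed input element with the exception it raised;
// it plays the role of the `ee` argument in option 1's handler lambda.
final class ExceptionElement<T> {
  private final T element;
  private final Exception exception;

  ExceptionElement(T element, Exception exception) {
    this.element = element;
    this.exception = exception;
  }

  T element() { return element; }

  Exception exception() { return exception; }
}

// Hypothetical stand-in for option 2's CodableException<InputT>: the failed input
// plus string-only metadata, so no Exception subclass ever has to be encoded.
final class CodableException<InputT> {
  final InputT element;
  final String className;
  final String message;
  final String stackTrace;

  CodableException(ExceptionElement<InputT> ee) {
    this.element = ee.element();
    this.className = ee.exception().getClass().getName();
    this.message = ee.exception().getMessage();
    StringWriter trace = new StringWriter();
    ee.exception().printStackTrace(new PrintWriter(trace));
    this.stackTrace = trace.toString();
  }
}

// Hypothetical in-memory analogue of Result: successful outputs plus handled failures.
final class MapResult<OutputT, FailureT> {
  private final List<OutputT> output = new ArrayList<>();
  private final List<FailureT> errors = new ArrayList<>();

  List<OutputT> output() { return output; }

  List<FailureT> errors() { return errors; }

  // Option 1 shape: the caller's handler decides what each caught exception becomes.
  static <InputT, OutputT, FailureT> MapResult<OutputT, FailureT> map(
      List<InputT> inputs,
      Function<InputT, OutputT> fn,
      Function<ExceptionElement<InputT>, FailureT> handler) {
    MapResult<OutputT, FailureT> result = new MapResult<>();
    for (InputT input : inputs) {
      try {
        result.output.add(fn.apply(input));
      } catch (RuntimeException e) {
        // java.util.function.Function can only throw unchecked exceptions.
        result.errors.add(handler.apply(new ExceptionElement<>(input, e)));
      }
    }
    return result;
  }

  // Option 2 shape: a default handler that keeps the input and exception metadata.
  static <InputT, OutputT> MapResult<OutputT, CodableException<InputT>> map(
      List<InputT> inputs, Function<InputT, OutputT> fn) {
    return map(inputs, fn, ee -> new CodableException<InputT>(ee));
  }
}
```

Consider the inputs `"a"`, `""`, and `"hello"`. Both calls produce the outputs 1 and 0 and one failure for the empty string. Option 1 reduces that failure to the message `/ by zero`. Option 2 keeps the empty string itself together with `java.lang.ArithmeticException` and its stack trace, which is the richer error collection described for option 2.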
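The composite coder idea for option 2 can also be sketched in plain Java. This is an assumption-laden illustration, not Beam code. `ElementCoder`, `NullableUtf8Coder`, `CodableException`, and `CodableExceptionCoder` are hypothetical names, and in Beam the input collection's own coder would fill the `ElementCoder` role. The point it shows is that the wrapper's coder only needs the input coder for the element, plus a string encoding for className, message, and stackTrace. No Exception subclass is ever serialized.

```java
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.charset.StandardCharsets;

// Hypothetical minimal coder contract (not Beam's Coder API). Implementations must be
// self-delimiting so that several values can be written back to back on one stream.
interface ElementCoder<T> {
  void encode(T value, OutputStream out) throws IOException;
  T decode(InputStream in) throws IOException;
}

// Length-prefixed UTF-8 strings; a length of -1 marks null (getMessage() may return null).
final class NullableUtf8Coder implements ElementCoder<String> {
  @Override
  public void encode(String value, OutputStream out) throws IOException {
    DataOutputStream data = new DataOutputStream(out); // unbuffered; not closed on purpose
    if (value == null) {
      data.writeInt(-1);
    } else {
      byte[] bytes = value.getBytes(StandardCharsets.UTF_8);
      data.writeInt(bytes.length);
      data.write(bytes);
    }
    data.flush();
  }

  @Override
  public String decode(InputStream in) throws IOException {
    DataInputStream data = new DataInputStream(in); // unbuffered, so no bytes are over-read
    int length = data.readInt();
    if (length < 0) {
      return null;
    }
    byte[] bytes = new byte[length];
    data.readFully(bytes);
    return new String(bytes, StandardCharsets.UTF_8);
  }
}

// Hypothetical holder with the fields described for option 2.
final class CodableException<InputT> {
  final InputT element;
  final String className;
  final String message;
  final String stackTrace;

  CodableException(InputT element, String className, String message, String stackTrace) {
    this.element = element;
    this.className = className;
    this.message = message;
    this.stackTrace = stackTrace;
  }
}

// Composite coder: reuses the input collection's coder for the element and a
// string coder for the three metadata fields, always in the same order.
final class CodableExceptionCoder<InputT> implements ElementCoder<CodableException<InputT>> {
  private final ElementCoder<InputT> inputCoder;
  private final ElementCoder<String> stringCoder = new NullableUtf8Coder();

  CodableExceptionCoder(ElementCoder<InputT> inputCoder) {
    this.inputCoder = inputCoder;
  }

  @Override
  public void encode(CodableException<InputT> value, OutputStream out) throws IOException {
    inputCoder.encode(value.element, out);
    stringCoder.encode(value.className, out);
    stringCoder.encode(value.message, out);
    stringCoder.encode(value.stackTrace, out);
  }

  @Override
  public CodableException<InputT> decode(InputStream in) throws IOException {
    InputT element = inputCoder.decode(in);
    String className = stringCoder.decode(in);
    String message = stringCoder.decode(in);
    String stackTrace = stringCoder.decode(in);
    return new CodableException<>(element, className, message, stackTrace);
  }
}
```

Because records are written back to back on one stream, the element coder has to be self-delimiting. The length prefix in `NullableUtf8Coder` provides that, and its -1 marker covers exceptions whose `getMessage()` returns null.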
