[ https://issues.apache.org/jira/browse/BEAM-5638?focusedWorklogId=195689&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-195689 ]
ASF GitHub Bot logged work on BEAM-5638:
----------------------------------------

                Author: ASF GitHub Bot
            Created on: 07/Feb/19 14:38
            Start Date: 07/Feb/19 14:38
    Worklog Time Spent: 10m
      Work Description: jklukas commented on pull request #7736: [BEAM-5638] Exception handling for Java MapElements and FlatMapElements
URL: https://github.com/apache/beam/pull/7736#discussion_r254707600

 ##########
 File path: sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/FlatMapElements.java
 ##########
@@ -170,4 +174,191 @@ public void populateDisplayData(DisplayData.Builder builder) {
       builder.include("fn", (HasDisplayData) originalFnForDisplayData);
     }
   }
+
+  /**
+   * Return a modified {@code PTransform} that catches exceptions raised while mapping elements.
+   *
+   * <p>The user must call {@code via} on the returned {@link FlatMapWithExceptions} instance to
+   * define an exception handler. If the handler does not provide sufficient type information, the
+   * user must also call {@code into} to define a type descriptor for the error collection.
+   *
+   * <p>See {@link WithExceptions} documentation for usage patterns of the returned {@link
+   * WithExceptions.Result}.
+   *
+   * @return a {@link WithExceptions.Result} wrapping the output and error collections
+   */
+  public FlatMapWithExceptions<InputT, OutputT, ?> withExceptions() {
+    return new FlatMapWithExceptions<>(
+        fn, originalFnForDisplayData, inputType, outputType, null, null);
+  }
+
+  /** Implementation of {@link FlatMapElements#withExceptions()}. */
+  public static class FlatMapWithExceptions<InputT, OutputT, FailureT>
+      extends PTransform<
+          PCollection<InputT>, WithExceptions.Result<PCollection<OutputT>, FailureT>> {
+
+    private final transient TypeDescriptor<InputT> inputType;
+    private final transient TypeDescriptor<OutputT> outputType;
+    @Nullable private final transient TypeDescriptor<FailureT> failureType;
+    private final transient Object originalFnForDisplayData;
+    @Nullable private final Contextful<Fn<InputT, Iterable<OutputT>>> fn;
+    @Nullable private final ProcessFunction<ExceptionElement<InputT>, FailureT> exceptionHandler;
+
+    FlatMapWithExceptions(
+        @Nullable Contextful<Fn<InputT, Iterable<OutputT>>> fn,
+        Object originalFnForDisplayData,
+        TypeDescriptor<InputT> inputType,
+        TypeDescriptor<OutputT> outputType,
+        @Nullable ProcessFunction<ExceptionElement<InputT>, FailureT> exceptionHandler,
+        @Nullable TypeDescriptor<FailureT> failureType) {
+      this.fn = fn;
+      this.originalFnForDisplayData = originalFnForDisplayData;
+      this.inputType = inputType;
+      this.outputType = outputType;
+      this.exceptionHandler = exceptionHandler;
+      this.failureType = failureType;
+    }
+
+    /**
+     * Returns a new {@link FlatMapWithExceptions} transform with the given type descriptor for the
+     * error collection, but the exception handler yet to be specified using {@link
+     * #via(ProcessFunction)}.
+     */
+    public <NewFailureT> FlatMapWithExceptions<InputT, OutputT, NewFailureT> into(
+        TypeDescriptor<NewFailureT> failureTypeDescriptor) {
+      return new FlatMapWithExceptions<>(
+          fn, originalFnForDisplayData, inputType, outputType, null, failureTypeDescriptor);
+    }
+
+    /**
+     * Returns a {@code PTransform} that catches exceptions raised while mapping elements, passing
+     * the raised exception instance and the input element being processed through the given {@code
+     * exceptionHandler} and emitting the result to an error collection.
+     *
+     * <p>Example usage:
+     *
+     * <pre>{@code
+     * Result<PCollection<String>, String> result = words.apply(
+     *     FlatMapElements.into(TypeDescriptors.strings())
+     *         .via((String line) -> Arrays.asList(Arrays.copyOfRange(line.split(" "), 1, 5)))
+     *         .withExceptions()
+     *         .into(TypeDescriptors.strings())
+     *         .via(ee -> ee.exception().getMessage()));

 Review comment:
   I've chosen to use `into` and `via` to keep the interface for exception handling as close as possible to the existing interface of `MapElements` and `FlatMapElements`.

   An alternative direction here is a `withCoder` or `withFailureCoder` method analogous to [AvroIO.Parse#withCoder](https://beam.apache.org/releases/javadoc/2.9.0/org/apache/beam/sdk/io/AvroIO.Parse.html#withCoder-org.apache.beam.sdk.coders.Coder-). If we used that instead of a method to supply a type descriptor, though, I think we would lose the possibility of taking advantage of InferableFunction; withCoder would always be required.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
-------------------

            Worklog Id:     (was: 195689)
            Time Spent: 5h  (was: 4h 50m)
    Remaining Estimate: 163h  (was: 163h 10m)

> Add exception handling to single message transforms in Java SDK
> ---------------------------------------------------------------
>
>                 Key: BEAM-5638
>                 URL: https://issues.apache.org/jira/browse/BEAM-5638
>             Project: Beam
>          Issue Type: Improvement
>          Components: sdk-java-core
>            Reporter: Jeff Klukas
>            Assignee: Jeff Klukas
>            Priority: Minor
>              Labels: triaged
>   Original Estimate: 168h
>          Time Spent: 5h
>  Remaining Estimate: 163h
>
> Add methods to MapElements, FlatMapElements, and Filter that allow users to
> specify expected exceptions and tuple tags to associate with the
> collections of the successfully and unsuccessfully processed elements.
> See discussion on dev list:
> https://lists.apache.org/thread.html/936ed2a5f2c01be066fd903abf70130625e0b8cf4028c11b89b8b23f@%3Cdev.beam.apache.org%3E



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)