[ https://issues.apache.org/jira/browse/BEAM-5638?focusedWorklogId=195689&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-195689 ]

ASF GitHub Bot logged work on BEAM-5638:
----------------------------------------

                Author: ASF GitHub Bot
            Created on: 07/Feb/19 14:38
            Start Date: 07/Feb/19 14:38
    Worklog Time Spent: 10m 
    Work Description: jklukas commented on pull request #7736: [BEAM-5638] Exception handling for Java MapElements and FlatMapElements
URL: https://github.com/apache/beam/pull/7736#discussion_r254707600
 
 

 ##########
 File path: sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/FlatMapElements.java
 ##########
 @@ -170,4 +174,191 @@ public void populateDisplayData(DisplayData.Builder builder) {
       builder.include("fn", (HasDisplayData) originalFnForDisplayData);
     }
   }
+
+  /**
+   * Return a modified {@code PTransform} that catches exceptions raised while mapping elements.
+   *
+   * <p>The user must call {@code via} on the returned {@link FlatMapWithExceptions} instance to
+   * define an exception handler. If the handler does not provide sufficient type information, the
+   * user must also call {@code into} to define a type descriptor for the error collection.
+   *
+   * <p>See {@link WithExceptions} documentation for usage patterns of the returned {@link
+   * WithExceptions.Result}.
+   *
+   * @return a {@link WithExceptions.Result} wrapping the output and error collections
+   */
+  public FlatMapWithExceptions<InputT, OutputT, ?> withExceptions() {
+    return new FlatMapWithExceptions<>(
+        fn, originalFnForDisplayData, inputType, outputType, null, null);
+  }
+
+  /** Implementation of {@link FlatMapElements#withExceptions()}. */
+  public static class FlatMapWithExceptions<InputT, OutputT, FailureT>
+      extends PTransform<
+          PCollection<InputT>, WithExceptions.Result<PCollection<OutputT>, FailureT>> {
+
+    private final transient TypeDescriptor<InputT> inputType;
+    private final transient TypeDescriptor<OutputT> outputType;
+    @Nullable private final transient TypeDescriptor<FailureT> failureType;
+    private final transient Object originalFnForDisplayData;
+    @Nullable private final Contextful<Fn<InputT, Iterable<OutputT>>> fn;
+    @Nullable private final ProcessFunction<ExceptionElement<InputT>, FailureT> exceptionHandler;
+
+    FlatMapWithExceptions(
+        @Nullable Contextful<Fn<InputT, Iterable<OutputT>>> fn,
+        Object originalFnForDisplayData,
+        TypeDescriptor<InputT> inputType,
+        TypeDescriptor<OutputT> outputType,
+        @Nullable ProcessFunction<ExceptionElement<InputT>, FailureT> exceptionHandler,
+        @Nullable TypeDescriptor<FailureT> failureType) {
+      this.fn = fn;
+      this.originalFnForDisplayData = originalFnForDisplayData;
+      this.inputType = inputType;
+      this.outputType = outputType;
+      this.exceptionHandler = exceptionHandler;
+      this.failureType = failureType;
+    }
+
+    /**
+     * Returns a new {@link FlatMapWithExceptions} transform with the given type descriptor for the
+     * error collection, but the exception handler yet to be specified using {@link
+     * #via(ProcessFunction)}.
+     */
+    public <NewFailureT> FlatMapWithExceptions<InputT, OutputT, NewFailureT> into(
+        TypeDescriptor<NewFailureT> failureTypeDescriptor) {
+      return new FlatMapWithExceptions<>(
+          fn, originalFnForDisplayData, inputType, outputType, null, failureTypeDescriptor);
+    }
+
+    /**
+     * Returns a {@code PTransform} that catches exceptions raised while mapping elements, passing
+     * the raised exception instance and the input element being processed through the given {@code
+     * exceptionHandler} and emitting the result to an error collection.
+     *
+     * <p>Example usage:
+     *
+     * <pre>{@code
+     * Result<PCollection<String>, String> result = words.apply(
+     *     FlatMapElements.into(TypeDescriptors.strings())
+     *         .via((String line) -> Arrays.asList(Arrays.copyOfRange(line.split(" "), 1, 5)))
+     *         .withExceptions()
+     *         .into(TypeDescriptors.strings())
+     *         .via(ee -> ee.exception().getMessage()));
 
 Review comment:
   I chose `into` and `via` to keep the interface for exception handling as close as possible
   to the existing interface of `MapElements` and `FlatMapElements`. A possible alternative is a
   `withCoder` or `withFailureCoder` method analogous to
   [AvroIO.Parse#withCoder](https://beam.apache.org/releases/javadoc/2.9.0/org/apache/beam/sdk/io/AvroIO.Parse.html#withCoder-org.apache.beam.sdk.coders.Coder-).
   If we used that instead of a method that supplies a type descriptor, though, I think we would
   lose the ability to take advantage of `InferableFunction`: `withCoder` would always be required.
 
----------------------------------------------------------------
This is an automated message from the Apache Git Service.


Issue Time Tracking
-------------------

            Worklog Id:     (was: 195689)
            Time Spent: 5h  (was: 4h 50m)
    Remaining Estimate: 163h  (was: 163h 10m)

> Add exception handling to single message transforms in Java SDK
> ---------------------------------------------------------------
>
>                 Key: BEAM-5638
>                 URL: https://issues.apache.org/jira/browse/BEAM-5638
>             Project: Beam
>          Issue Type: Improvement
>          Components: sdk-java-core
>            Reporter: Jeff Klukas
>            Assignee: Jeff Klukas
>            Priority: Minor
>              Labels: triaged
>   Original Estimate: 168h
>          Time Spent: 5h
>  Remaining Estimate: 163h
>
> Add methods to MapElements, FlatMapElements, and Filter that allow users to
> specify expected exceptions and tuple tags to associate with the
> collections of successfully and unsuccessfully processed elements.
> See discussion on dev list:
> https://lists.apache.org/thread.html/936ed2a5f2c01be066fd903abf70130625e0b8cf4028c11b89b8b23f@%3Cdev.beam.apache.org%3E
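The issue description asks for two output collections, one for successfully processed elements and one for failures, with only expected exceptions being caught. The following is a minimal plain-Java sketch of that routing, assuming in-memory lists in place of Beam `PCollection`s and tuple tags; `FailedElement`, `MapResult`, and `ExceptionRouting` are hypothetical names, not part of the Beam API.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

// Hypothetical pairing of a failed input element with the exception it raised.
final class FailedElement<T> {
  final T element;
  final RuntimeException exception;

  FailedElement(T element, RuntimeException exception) {
    this.element = element;
    this.exception = exception;
  }
}

// Hypothetical holder for the two outputs; stands in for tagged collections.
final class MapResult<OutputT, FailureT> {
  final List<OutputT> output = new ArrayList<>();
  final List<FailureT> failures = new ArrayList<>();
}

final class ExceptionRouting {
  private ExceptionRouting() {}

  // Applies fn to every input. An exception thrown by fn whose type appears in
  // `expected` is passed to `handler` together with the input, and the handler's
  // result is added to `failures`; any other exception propagates to the caller.
  static <InputT, OutputT, FailureT> MapResult<OutputT, FailureT> mapWithExceptions(
      List<InputT> inputs,
      Function<InputT, OutputT> fn,
      List<Class<? extends RuntimeException>> expected,
      Function<FailedElement<InputT>, FailureT> handler) {
    MapResult<OutputT, FailureT> result = new MapResult<>();
    for (InputT input : inputs) {
      OutputT mapped;
      try {
        mapped = fn.apply(input); // only exceptions from fn are caught here
      } catch (RuntimeException e) {
        if (!isExpected(e, expected)) {
          throw e; // unexpected failures are not swallowed
        }
        result.failures.add(handler.apply(new FailedElement<>(input, e)));
        continue;
      }
      result.output.add(mapped);
    }
    return result;
  }

  private static boolean isExpected(
      RuntimeException e, List<Class<? extends RuntimeException>> expected) {
    for (Class<? extends RuntimeException> type : expected) {
      if (type.isInstance(e)) {
        return true;
      }
    }
    return false;
  }
}
```

Catching only the listed exception types follows the issue's wording about specifying expected exceptions: in this sketch, an unrelated bug such as an out-of-range string index still propagates to the caller instead of being silently routed to the failure output.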



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
