[ 
https://issues.apache.org/jira/browse/BEAM-5638?focusedWorklogId=201033&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-201033
 ]

ASF GitHub Bot logged work on BEAM-5638:
----------------------------------------

                Author: ASF GitHub Bot
            Created on: 20/Feb/19 02:03
            Start Date: 20/Feb/19 02:03
    Worklog Time Spent: 10m 
      Work Description: reuvenlax commented on pull request #7736: [BEAM-5638] 
Exception handling for Java MapElements and FlatMapElements
URL: https://github.com/apache/beam/pull/7736#discussion_r258307310
 
 

 ##########
 File path: 
sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/FlatMapElements.java
 ##########
 @@ -170,4 +174,193 @@ public void populateDisplayData(DisplayData.Builder 
builder) {
       builder.include("fn", (HasDisplayData) originalFnForDisplayData);
     }
   }
+
+  /**
+   * Return a modified {@code PTransform} that catches exceptions raised while 
mapping elements.
+   *
+   * <p>The user must call {@code via} on the returned {@link 
FlatMapWithExceptions} instance to
+   * define an exception handler. If the handler does not provide sufficient 
type information, the
+   * user must also call {@code into} to define a type descriptor for the 
error collection.
+   *
+   * <p>See {@link WithExceptions} documentation for usage patterns of the 
returned {@link
+   * WithExceptions.Result}.
+   *
+   * @return a {@link WithExceptions.Result} wrapping the output and error 
collections
+   */
+  @Experimental(Experimental.Kind.WITH_EXCEPTIONS)
+  public FlatMapWithExceptions<InputT, OutputT, ?> withExceptions() {
+    return new FlatMapWithExceptions<>(
+        fn, originalFnForDisplayData, inputType, outputType, null, null);
+  }
+
+  /** Implementation of {@link FlatMapElements#withExceptions()}. */
+  @Experimental(Experimental.Kind.WITH_EXCEPTIONS)
+  public static class FlatMapWithExceptions<InputT, OutputT, FailureT>
+      extends PTransform<
+          PCollection<InputT>, WithExceptions.Result<PCollection<OutputT>, 
FailureT>> {
+
+    private final transient TypeDescriptor<InputT> inputType;
+    private final transient TypeDescriptor<OutputT> outputType;
+    @Nullable private final transient TypeDescriptor<FailureT> failureType;
+    private final transient Object originalFnForDisplayData;
+    @Nullable private final Contextful<Fn<InputT, Iterable<OutputT>>> fn;
+    @Nullable private final ProcessFunction<ExceptionElement<InputT>, 
FailureT> exceptionHandler;
+
+    FlatMapWithExceptions(
+        @Nullable Contextful<Fn<InputT, Iterable<OutputT>>> fn,
+        Object originalFnForDisplayData,
+        TypeDescriptor<InputT> inputType,
+        TypeDescriptor<OutputT> outputType,
+        @Nullable ProcessFunction<ExceptionElement<InputT>, FailureT> 
exceptionHandler,
+        @Nullable TypeDescriptor<FailureT> failureType) {
+      this.fn = fn;
+      this.originalFnForDisplayData = originalFnForDisplayData;
+      this.inputType = inputType;
+      this.outputType = outputType;
+      this.exceptionHandler = exceptionHandler;
+      this.failureType = failureType;
+    }
+
+    /**
+     * Returns a new {@link FlatMapWithExceptions} transform with the given 
type descriptor for the
+     * error collection, but the exception handler yet to be specified using 
{@link
+     * #via(ProcessFunction)}.
+     */
+    public <NewFailureT> FlatMapWithExceptions<InputT, OutputT, NewFailureT> 
into(
+        TypeDescriptor<NewFailureT> failureTypeDescriptor) {
+      return new FlatMapWithExceptions<>(
+          fn, originalFnForDisplayData, inputType, outputType, null, 
failureTypeDescriptor);
+    }
+
+    /**
+     * Returns a {@code PTransform} that catches exceptions raised while 
mapping elements, passing
+     * the raised exception instance and the input element being processed 
through the given {@code
+     * exceptionHandler} and emitting the result to an error collection.
+     *
+     * <p>Example usage:
+     *
+     * <pre>{@code
+     * Result<PCollection<String>, String> result = words.apply(
+     *     FlatMapElements.into(TypeDescriptors.strings())
+     *         .via((String line) -> Arrays.asList(Arrays.copyOfRange(line.split(" "), 1, 5)))
+     *         .withExceptions()
+     *         .into(TypeDescriptors.strings())
+     *         .via(ee -> ee.exception().getMessage()));
+     * PCollection<String> errors = result.errors();
+     * }</pre>
+     */
+    public FlatMapWithExceptions<InputT, OutputT, FailureT> via(
+        ProcessFunction<ExceptionElement<InputT>, FailureT> exceptionHandler) {
+      return new FlatMapWithExceptions<>(
+          fn, originalFnForDisplayData, inputType, outputType, 
exceptionHandler, failureType);
+    }
+
+    /**
+     * Like {@link #via(ProcessFunction)}, but takes advantage of the type 
information provided by
+     * {@link InferableFunction}, meaning that a call to {@link 
#into(TypeDescriptor)} may not be
+     * necessary.
+     *
+     * <p>Example usage:
+     *
+     * <pre>{@code
+     * Result<PCollection<String>, KV<String, Map<String, String>>> result = words.apply(
+     *     FlatMapElements.into(TypeDescriptors.strings())
+     *         .via((String line) -> Arrays.asList(Arrays.copyOfRange(line.split(" "), 1, 5)))
+     *         .withExceptions()
+     *         .via(new WithExceptions.ExceptionAsMapHandler<String>() {}));
+     * PCollection<KV<String, Map<String, String>>> errors = result.errors();
+     * }</pre>
+     */
+    public <NewFailureT> FlatMapWithExceptions<InputT, OutputT, NewFailureT> 
via(
+        InferableFunction<ExceptionElement<InputT>, NewFailureT> 
exceptionHandler) {
+      return new FlatMapWithExceptions<>(
+          fn,
+          originalFnForDisplayData,
+          inputType,
+          outputType,
+          exceptionHandler,
+          exceptionHandler.getOutputTypeDescriptor());
+    }
+
+    @Override
+    public WithExceptions.Result<PCollection<OutputT>, FailureT> 
expand(PCollection<InputT> input) {
+      final TupleTag<OutputT> outputTag = new TupleTag<OutputT>() {};
+      final TupleTag<FailureT> failureTag;
+      if (failureType == null) {
+        failureTag = new TupleTag<>();
+      } else {
+        failureTag =
+            new TupleTag<FailureT>() {
+              @Override
+              public TypeDescriptor<FailureT> getTypeDescriptor() {
+                return failureType;
+              }
+            };
+      }
+      DoFn<InputT, OutputT> doFn =
+          new DoFn<InputT, OutputT>() {
+            @ProcessElement
+            public void processElement(
+                @Element InputT element, MultiOutputReceiver receiver, 
ProcessContext c)
+                throws Exception {
+              boolean exceptionWasThrown = false;
+              Iterable<OutputT> res = null;
+              try {
+                res = fn.getClosure().apply(c.element(), 
Fn.Context.wrapProcessContext(c));
+              } catch (Exception e) {
+                exceptionWasThrown = true;
+                ExceptionElement<InputT> exceptionElement = ExceptionElement.of(element, e);
+                receiver.get(failureTag).output(exceptionHandler.apply(exceptionElement));
+              }
+              if (!exceptionWasThrown) {
+                for (OutputT output : res) {
 
 Review comment:
   Ah, this is a very good point. I believe that output() immediately invokes 
the next fused DoFn with that element, so an exception thrown downstream will 
bubble all the way back up to this call.
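
   To make the concern concrete, here is a minimal sketch in plain Java with 
no Beam dependency. It assumes that a fused downstream step behaves like a 
synchronous callback invoked from output(); the class FusedOutputSketch, both 
method names, and the use of Consumer as a stand-in for the next DoFn are 
hypothetical and are not part of the PR. The first variant emits inside the 
try block and so reports downstream failures as if the mapping function had 
thrown them; the second mirrors the structure in the diff, where only the 
mapping call is guarded.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Function;

// Hypothetical illustration only; none of these names come from Beam.
class FusedOutputSketch {

  // Variant A: emitting inside the try block. A failure thrown by the
  // downstream callback is caught here and recorded as a mapping failure.
  public static List<String> outputInsideTry(
      String line, Function<String, List<String>> fn, Consumer<String> downstream) {
    List<String> failures = new ArrayList<>();
    try {
      for (String out : fn.apply(line)) {
        downstream.accept(out); // stands in for output() calling the fused DoFn
      }
    } catch (Exception e) {
      failures.add(e.getMessage());
    }
    return failures;
  }

  // Variant B: only the mapping call is guarded. A downstream failure
  // propagates to the caller instead of landing in the failure list.
  public static List<String> outputOutsideTry(
      String line, Function<String, List<String>> fn, Consumer<String> downstream) {
    List<String> failures = new ArrayList<>();
    List<String> res = null;
    try {
      res = fn.apply(line);
    } catch (Exception e) {
      failures.add(e.getMessage());
    }
    if (res != null) {
      for (String out : res) {
        downstream.accept(out); // not guarded: exceptions escape this method
      }
    }
    return failures;
  }

  public static void main(String[] args) {
    Function<String, List<String>> split = line -> Arrays.asList(line.split(" "));
    Consumer<String> downstream = s -> {
      if (s.equals("bad")) {
        throw new IllegalStateException("downstream failure");
      }
    };
    // Variant A swallows the downstream exception into the failure list.
    System.out.println(outputInsideTry("ok bad", split, downstream));
    try {
      outputOutsideTry("ok bad", split, downstream);
    } catch (IllegalStateException e) {
      // Variant B lets the downstream exception surface unchanged.
      System.out.println("propagated: " + e.getMessage());
    }
  }
}
```

   Under that assumption, keeping the output loop outside the try block is 
what stops unrelated downstream errors from being routed to the failure 
collection.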
 
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
-------------------

            Worklog Id:     (was: 201033)
            Time Spent: 8h  (was: 7h 50m)
    Remaining Estimate: 160h  (was: 160h 10m)

> Add exception handling to single message transforms in Java SDK
> ---------------------------------------------------------------
>
>                 Key: BEAM-5638
>                 URL: https://issues.apache.org/jira/browse/BEAM-5638
>             Project: Beam
>          Issue Type: Improvement
>          Components: sdk-java-core
>            Reporter: Jeff Klukas
>            Assignee: Jeff Klukas
>            Priority: Minor
>              Labels: triaged
>   Original Estimate: 168h
>          Time Spent: 8h
>  Remaining Estimate: 160h
>
> Add methods to MapElements, FlatMapElements, and Filter that allow users to 
> specify expected exceptions and tuple tags to associate with the 
> collections of successfully and unsuccessfully processed elements.
> See discussion on dev list:
> https://lists.apache.org/thread.html/936ed2a5f2c01be066fd903abf70130625e0b8cf4028c11b89b8b23f@%3Cdev.beam.apache.org%3E



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
