[ https://issues.apache.org/jira/browse/BEAM-5638?focusedWorklogId=151995&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-151995 ]
ASF GitHub Bot logged work on BEAM-5638:
----------------------------------------

Author: ASF GitHub Bot
Created on: 06/Oct/18 18:27
Start Date: 06/Oct/18 18:27
Worklog Time Spent: 10m
Work Description: tims commented on a change in pull request #6586: [BEAM-5638] Exception handling for Java single message transforms
URL: https://github.com/apache/beam/pull/6586#discussion_r223189848

##########
File path: sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Failure.java
##########
@@ -0,0 +1,101 @@
+package org.apache.beam.sdk.transforms;
+
+import com.google.auto.value.AutoValue;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ObjectArrays;
+import java.io.Serializable;
+import java.util.List;
+import org.apache.beam.sdk.coders.SerializableCoder;
+import org.apache.beam.sdk.transforms.DoFn.MultiOutputReceiver;
+import org.apache.beam.sdk.values.PCollectionTuple;
+import org.apache.beam.sdk.values.TupleTag;
+import org.apache.beam.sdk.values.TupleTagList;
+import org.apache.beam.sdk.values.TypeDescriptor;
+
+/**
+ * Wraps an exception along with an input value; this is the element type of failure collections
+ * returned by single message transforms configured to catch exceptions.
+ *
+ * @param <T> type of the wrapped input value that caused an exception to be raised
+ */
+@AutoValue
+public abstract class Failure<T> implements Serializable {
+  public static <T> Failure<T> of(Exception exception, T value) {
+    return new AutoValue_Failure<>(exception, value);
+  }
+
+  public abstract Exception exception();
+
+  public abstract T value();
+
+  /**
+   * Internal class for collecting tuple tags associated with collections of {@link Exception}
+   * classes that should route to them. Also contains helper methods to simplify implementation of
+   * the {@code WithFailures} nested classes of {@link MapElements}, {@link FlatMapElements}, etc.
+   */
+  @AutoValue
+  abstract static class TaggedExceptionsList<T> implements Serializable {
+    abstract ImmutableList<TupleTag<Failure<T>>> tags();
+
+    abstract ImmutableList<List<Class<?>>> exceptionLists();
+
+    static <T> TaggedExceptionsList<T> empty() {
+      return new AutoValue_Failure_TaggedExceptionsList<>(ImmutableList.of(), ImmutableList.of());
+    }
+
+    /**
+     * Return a new {@link TaggedExceptionsList} that has all the tags and exceptions of this {@link
+     * TaggedExceptionsList} plus a new element representing the arguments passed in here.
+     */
+    TaggedExceptionsList<T> and(
+        TupleTag<Failure<T>> tag, Class<?> exceptionToCatch, Class<?>[] additionalExceptions) {
+      final ImmutableList<TupleTag<Failure<T>>> newTags =
+          ImmutableList.<TupleTag<Failure<T>>>builder().addAll(tags()).add(tag).build();
+      final ImmutableList<List<Class<?>>> newExceptionLists =
+          ImmutableList.<List<Class<?>>>builder()
+              .addAll(exceptionLists())
+              .add(
+                  ImmutableList.copyOf(ObjectArrays.concat(exceptionToCatch, additionalExceptions)))
+              .build();
+      return new AutoValue_Failure_TaggedExceptionsList<>(newTags, newExceptionLists);
+    }
+
+    /** Return the internal typed list of tags as an untyped {@link TupleTagList}. */
+    TupleTagList tupleTagList() {
+      TupleTagList l = TupleTagList.empty();
+      for (TupleTag<?> tag : tags()) {
+        l = l.and(tag);
+      }
+      return l;
+    }
+
+    /**
+     * Check the registered exception classes to see if the exception passed in here matches. If it
+     * does, wrap the exception and value together in a {@link Failure} and send to the output
+     * receiver. If not, rethrow so processing stops on the unexpected failure.
+     */
+    void outputOrRethrow(Exception e, T value, MultiOutputReceiver receiver) throws Exception {
+      for (int i = 0; i < tags().size(); i++) {
+        for (Class<?> cls : exceptionLists().get(i)) {
+          if (cls.isInstance(e)) {
+            receiver.get(tags().get(i)).output(Failure.of(e, value));
+            return;
+          }
+        }
+      }
+      throw e;
+    }
+
+    /**
+     * Set appropriate coders on all the failure collections in the given {@link PCollectionTuple}.
+     */
+    PCollectionTuple applyFailureCoders(PCollectionTuple pcs) {
+      final SerializableCoder<Failure<T>> failureCoder =
+          SerializableCoder.of(new TypeDescriptor<Failure<T>>() {});

Review comment:
I recently implemented less general exception handling, just as a base class for my DoFns. I ended up with a proto that contains the stack trace and other info as plain strings, partly because I was worried about the efficiency of SerializableCoder.

One option might be to make the failure output type generic and have the user provide a function for handling the exception instead, e.g.:

withFailureHandler(TupleTag<F> failureTag, SerializableFunction<Exception, F> failureHandler, Coder<F> failureCoder)

Issue Time Tracking
-------------------

Worklog Id: (was: 151995)
Time Spent: 2h 40m (was: 2.5h)
Remaining Estimate: 165h 20m (was: 165.5h)

> Add exception handling to single message transforms in Java SDK
> ---------------------------------------------------------------
>
> Key: BEAM-5638
> URL: https://issues.apache.org/jira/browse/BEAM-5638
> Project: Beam
> Issue Type: Improvement
> Components: sdk-java-core
> Reporter: Jeff Klukas
> Assignee: Jeff Klukas
> Priority: Minor
> Original Estimate: 168h
> Time Spent: 2h 40m
> Remaining Estimate: 165h 20m
>
> Add methods to MapElements, FlatMapElements, and Filter that allow users to
> specify expected exceptions and tuple tags to associate with the
> collections of successfully and unsuccessfully processed elements.
> See discussion on dev list:
> https://lists.apache.org/thread.html/936ed2a5f2c01be066fd903abf70130625e0b8cf4028c11b89b8b23f@%3Cdev.beam.apache.org%3E
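A minimal, Beam-free sketch of the routing rule that outputOrRethrow implements in the diff above may help when reading the review: tags are checked in registration order, Class#isInstance lets subclasses match, the first matching tag receives the failure, and anything unmatched is rethrown. Everything here is hypothetical and simplified: FailureRouter, its nested Failure, parseAll, and the tag names are invented for illustration; plain strings stand in for TupleTags, and in-memory lists stand in for the MultiOutputReceiver.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/**
 * Hypothetical, Beam-free model of Failure.TaggedExceptionsList#outputOrRethrow. String names
 * stand in for TupleTags; in-memory lists stand in for the MultiOutputReceiver.
 */
final class FailureRouter<T> {

  /** Simplified stand-in for Failure<T>: the caught exception plus the input that caused it. */
  static final class Failure<V> {
    final Exception exception;
    final V value;

    Failure(Exception exception, V value) {
      this.exception = exception;
      this.value = value;
    }
  }

  // Parallel lists, as in TaggedExceptionsList: tags.get(i) receives exceptionLists.get(i).
  private final List<String> tags = new ArrayList<>();
  private final List<List<Class<?>>> exceptionLists = new ArrayList<>();
  private final Map<String, List<Failure<T>>> outputs = new LinkedHashMap<>();

  /** Registers a tag and the classes routed to it (mutable here, unlike the diff's and()). */
  FailureRouter<T> and(String tag, Class<?>... exceptionsToCatch) {
    tags.add(tag);
    exceptionLists.add(Arrays.asList(exceptionsToCatch));
    outputs.put(tag, new ArrayList<>());
    return this;
  }

  /** First tag (in registration order) with a matching class wins; otherwise rethrow. */
  void outputOrRethrow(Exception e, T value) throws Exception {
    for (int i = 0; i < tags.size(); i++) {
      for (Class<?> cls : exceptionLists.get(i)) {
        if (cls.isInstance(e)) { // isInstance means subclasses of cls also match
          outputs.get(tags.get(i)).add(new Failure<>(e, value));
          return;
        }
      }
    }
    throw e;
  }

  List<Failure<T>> output(String tag) {
    return outputs.get(tag);
  }

  /** Hypothetical demo transform: parse each input, keep successes, route expected failures. */
  static List<Integer> parseAll(List<String> inputs, FailureRouter<String> router)
      throws Exception {
    List<Integer> successes = new ArrayList<>();
    for (String input : inputs) {
      try {
        successes.add(Integer.parseInt(input));
      } catch (Exception e) {
        router.outputOrRethrow(e, input);
      }
    }
    return successes;
  }
}
```

Because matching follows registration order, registering NumberFormatException before its superclass IllegalArgumentException sends parse errors to the more specific tag; registering them the other way round would route parse errors to the broader tag.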
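The reviewer's alternative, a caller-chosen failure type produced by a failure handler, can be sketched in plain Java the same way. This illustrates the idea under stated assumptions and is not Beam API: java.util.function.Function stands in for SerializableFunction, the Coder<F> parameter is left out, and HandledMapper and FailureInfo are hypothetical names. FailureInfo holds only strings (class name, message, stack trace), loosely mirroring the proto the reviewer describes.

```java
import java.io.PrintWriter;
import java.io.StringWriter;
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

/** Hypothetical string-only failure record, in the spirit of the reviewer's proto. */
final class FailureInfo {
  final String exceptionClass;
  final String message;
  final String stackTrace;

  FailureInfo(String exceptionClass, String message, String stackTrace) {
    this.exceptionClass = exceptionClass;
    this.message = message;
    this.stackTrace = stackTrace;
  }

  /** One possible failure handler: flattens the exception into plain strings. */
  static FailureInfo fromException(Exception e) {
    StringWriter buffer = new StringWriter();
    PrintWriter writer = new PrintWriter(buffer);
    e.printStackTrace(writer);
    writer.flush();
    return new FailureInfo(
        e.getClass().getName(), String.valueOf(e.getMessage()), buffer.toString());
  }
}

/**
 * Hypothetical sketch of the withFailureHandler idea: the caller picks the failure type F and
 * supplies the Exception -> F conversion, so F can use a compact encoding of its own choosing.
 */
final class HandledMapper<InputT, OutputT, F> {
  private final Function<InputT, OutputT> fn;
  private final Function<Exception, F> failureHandler;
  final List<OutputT> successes = new ArrayList<>();
  final List<F> failures = new ArrayList<>();

  HandledMapper(Function<InputT, OutputT> fn, Function<Exception, F> failureHandler) {
    this.fn = fn;
    this.failureHandler = failureHandler;
  }

  /** Applies fn; on any Exception, stores the handler's F instead of the raw exception. */
  void process(InputT input) {
    try {
      successes.add(fn.apply(input));
    } catch (Exception e) {
      failures.add(failureHandler.apply(e));
    }
  }
}
```

As in the suggested withFailureHandler signature, the handler receives only the exception, so the failed input element is not recorded. This sketch also catches every Exception instead of routing specific classes to specific tags the way TaggedExceptionsList does.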