Jeff Klukas created BEAM-5413: --------------------------------- Summary: Add method for defining composite transforms as lambda expressions Key: BEAM-5413 URL: https://issues.apache.org/jira/browse/BEAM-5413 Project: Beam Issue Type: Improvement Components: sdk-java-core Reporter: Jeff Klukas Assignee: Kenneth Knowles Fix For: 2.8.0
Defining a composite transform today requires writing a full named subclass of PTransform (as [the programming guide documents|[https://beam.apache.org/documentation/programming-guide/#composite-transforms]),] but there are cases where users may want to define a fairly trivial composite transform using a less verbose Java 8 lambda expression. Consider an example where the user has defined MyDeserializationTransform that attempts to deserialize byte arrays into some object, returning a PCollectionTuple with tags for successfully deserialized records (mainTag) and for errors (errorTag). If we introduce a PTransform::compose method that takes in a SerializableFunction, the user can handle errors in a small lambda expression: {code:java} byteArrays .apply("attempt to deserialize messages", new MyDeserializationTransform()) .apply("write deserialization errors", PTransform.compose((PCollectionTuple input) -> { input .get(errorTag) .apply(new MyErrorOutputTransform()); return input.get(mainTag); }) .apply("more processing on the deserialized messages", new MyOtherTransform()) {code} This style allows a more concise and fluent pipeline definition than is currently possible. -- This message was sent by Atlassian JIRA (v7.6.3#76005)